From cd8cf6068328ecf201ee9b49609f1b362c78a176 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Fri, 14 Nov 2025 10:02:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BA=91=E5=89=8D=E7=BD=AE=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 60 ++++++++++--------- .../stat/service/impl/StatServiceImpl.java | 11 +++- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 4138cb3..8d7045e 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -380,32 +380,27 @@ public class MqttMessageHandler { if (Objects.equals(res.getDid(),1)){ log.info("{},设备数据应答--->更新治理监测点信息和设备容量", nDid); List list3 = new ArrayList<>(); - devInfo.forEach(item->{ - //1.更新治理监测点信息 - CsLineParam csLineParam = new CsLineParam(); - if (Objects.equals(item.getClDid(),0)){ - csLineParam.setLineId(nDid.concat("0")); + boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0); + //治理设备 + if (hasZeroClDid) { + devInfo.forEach(item->{ + if (Objects.equals(item.getClDid(),0)){ + updateLineInfo(nDid,item); + } //2.录入各个模块设备容量 CsDevCapacityPO csDevCapacity = new CsDevCapacityPO(); csDevCapacity.setLineId(nDid.concat("0")); csDevCapacity.setCldid(item.getClDid()); csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA()); list3.add(csDevCapacity); - } else { - csLineParam.setLineId(nDid.concat(item.getClDid().toString())); - } - csLineParam.setVolGrade(item.getVolGrade()); - csLineParam.setPtRatio(item.getPtRatio()); - csLineParam.setCtRatio(item.getCtRatio()); - csLineParam.setConType(item.getConType()); - csLineParam.setLineInterval(item.getStatCycle()); - csLineFeignClient.updateLine(csLineParam); - //生成监测点限值 - Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0); - overlimit.setId(nDid.concat(item.getClDid().toString())); - overlimitMapper.deleteById(nDid.concat(item.getClDid().toString())); - overlimitMapper.insert(overlimit); - }); + }); + } + //其余设备 + else { + devInfo.forEach(item->{ + updateLineInfo(nDid,item); + }); + } if (CollectionUtil.isNotEmpty(list3)) { devCapacityFeignClient.addList(list3); //3.更新设备模块个数 @@ -416,14 +411,7 @@ public class MqttMessageHandler { logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息"); //1.更新电网侧、负载侧监测点相关信息 devInfo.forEach(item->{ - CsLineParam csLineParam = new CsLineParam(); - csLineParam.setLineId(nDid.concat(item.getClDid().toString())); - csLineParam.setVolGrade(item.getVolGrade()); - csLineParam.setPtRatio(item.getPtRatio()); - csLineParam.setCtRatio(item.getCtRatio()); - csLineParam.setConType(item.getConType()); - csLineParam.setLineInterval(item.getStatCycle()); - csLineFeignClient.updateLine(csLineParam); + updateLineInfo(nDid,item); }); } } @@ -468,6 +456,22 @@ public class MqttMessageHandler { } } + public void updateLineInfo(String nDid,RspDataDto.LdevInfo item) { + CsLineParam csLineParam = new CsLineParam(); + csLineParam.setLineId(nDid.concat(item.getClDid().toString())); + csLineParam.setVolGrade(item.getVolGrade()); + csLineParam.setPtRatio(item.getPtRatio()); + csLineParam.setCtRatio(item.getCtRatio()); + csLineParam.setConType(item.getConType()); + csLineParam.setLineInterval(item.getStatCycle()); + csLineFeignClient.updateLine(csLineParam); + //生成监测点限值 + Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0); + overlimit.setId(nDid.concat(item.getClDid().toString())); + overlimitMapper.deleteById(nDid.concat(item.getClDid().toString())); + overlimitMapper.insert(overlimit); + } + /** * 装置心跳 && 主动数据上送 * fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件 diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 30af952..20cfe75 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -57,6 +57,11 @@ public class StatServiceImpl implements IStatService { private final RedisUtil redisUtil; private final ChannelObjectUtil channelObjectUtil; private final CsLineLatestDataFeignClient csLineLatestDataFeignClient; + private static final Map PHASE_MAPPING = new HashMap() {{ + put("AB", "A"); + put("BC", "B"); + put("CA", "C"); + }}; @Override @Transactional(rollbackFor = Exception.class) @@ -215,10 +220,11 @@ public class StatServiceImpl implements IStatService { String tableName = map.get(dataArrayList.get(i).getName()); Map tags = new HashMap<>(); tags.put(InfluxDBTableConstant.LINE_ID,lineId); - tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); + tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase())); tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod); tags.put(InfluxDBTableConstant.CL_DID,clDid.toString()); tags.put(InfluxDBTableConstant.PROCESS,process.toString()); + tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0"); Map fields = new HashMap<>(); //这边特殊处理,如果数据为3.14159,则将数据置为null fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); @@ -244,4 +250,7 @@ public class StatServiceImpl implements IStatService { return urlList; } + //相别处理 + + }