From 4740f2b4d003687628857d8fea218c91bc1919c6 Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Wed, 16 Aug 2023 19:47:54 +0800 Subject: [PATCH] =?UTF-8?q?=E7=89=A9=E8=A7=A3=E6=9E=90=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/enums/AccessResponseEnum.java | 1 + .../com/njcn/access/pojo/dto/CsModelDto.java | 2 + .../access/handler/MqttMessageHandler.java | 21 ++++- .../service/ICsEquipmentDeliveryService.java | 2 +- .../service/impl/CsDeviceServiceImpl.java | 30 ++++-- .../impl/CsEquipmentDeliveryServiceImpl.java | 4 +- iot-analysis/analysis-stat/stat-boot/pom.xml | 4 + .../stat/service/impl/StatServiceImpl.java | 91 +++++++++++++------ 8 files changed, 108 insertions(+), 47 deletions(-) diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 6710833..24d142a 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -54,6 +54,7 @@ public enum AccessResponseEnum { MODEL_VERSION_ERROR("A0308","询问装置模板信息错误"), CLDID_IS_NULL("A0309","逻辑子设备标识为空"), + MODULE_NUMBER_IS_NULL("A0309","设备子模块个数为空"), LDEVINFO_IS_NULL("A0309","逻辑设备信息为空"), SOFTINFO_IS_NULL("A0309","软件信息为空"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java index b76b537..66fe989 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java @@ -20,4 +20,6 @@ public class CsModelDto { private Integer type; + private Integer moduleNumber; + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 5adceea..5a7f4c5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -21,15 +21,17 @@ import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsLineModelService; import com.njcn.access.service.ICsTopicService; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.DataSetFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; +import com.njcn.csdevice.pojo.po.CsDataSet; import com.njcn.csdevice.pojo.po.CsDevModelPO; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.template.AppAutoDataMessageTemplate; +import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -65,6 +67,8 @@ public class MqttMessageHandler { private final AppAutoDataMessageTemplate appAutoDataMessageTemplate; + private final DataSetFeignClient dataSetFeignClient; + @Autowired Validator validator; @@ -172,6 +176,13 @@ public class MqttMessageHandler { log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); } + if (Objects.equals(po.getType(),0)){ + List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); + if (CollectionUtils.isEmpty(dataSetList)){ + throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); + } + csModelDto.setModuleNumber(dataSetList.size()); + } csModelDto.setDevType(po.getDevTypeName()); csModelDto.setModelId(po.getId()); csModelDto.setDid(did); @@ -179,12 +190,12 @@ public class MqttMessageHandler { modelList.add(csModelDto); }); //存储模板id - String key2 = "MODEL" + nDid; + String key2 = AppRedisKey.MODEL + nDid; redisUtil.saveByKeyWithExpire(key2,modelList,600L); //存储监测点模板信息,用于界面回显 List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); List lineList = csLineModelService.getMonitorNumByModelId(modelId); - String key = "LINE" + nDid; + String key = AppRedisKey.LINE + nDid; redisUtil.saveByKeyWithExpire(key,lineList,600L); } } @@ -219,12 +230,12 @@ public class MqttMessageHandler { switch (rspDataDto.getDataType()){ case 1: RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class); - redisUtil.saveByKeyWithExpire("SOFTINFO"+nDid,softInfo,600L); + redisUtil.saveByKeyWithExpire(AppRedisKey.SOFTINFO+nDid,softInfo,600L); break; case 2: List ldevInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); //fixme 默认第一个监测点是负载侧,第二个是电网测,后期数据错误可以在移动端调整 - redisUtil.saveByKeyWithExpire("LINEDATA"+nDid,ldevInfo,600L); + redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_DATA+nDid,ldevInfo,600L); break; default: break; diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index 31d2499..e106450 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -33,7 +33,7 @@ public interface ICsEquipmentDeliveryService extends IService modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid())); - Integer clDid = null; + List modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + devAccessParam.getNDid())); + Integer clDid = null, moduleNumber = null; //2.新增装置-模板关系、获取电能质量的逻辑设备id for (CsModelDto item : modelId) { CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); @@ -167,15 +168,21 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (Objects.equals(item.getType(),1)){ clDid = item.getDid(); } + if (Objects.equals(item.getType(),0)){ + moduleNumber = item.getModuleNumber(); + } } if (Objects.isNull(clDid)){ throw new BusinessException(AccessResponseEnum.CLDID_IS_NULL); } + if (Objects.isNull(moduleNumber)){ + throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); + } askDevData(devAccessParam.getNDid(),AccessEnum.L_DEV_INFO.getCode(),version,clDid); List list = new ArrayList<>(); //等待mqtt数据 Thread.sleep(500); - String key = "LINEDATA" + devAccessParam.getNDid(); + String key = AppRedisKey.LINE_DATA + devAccessParam.getNDid(); list = objectToList2(redisUtil.getObjectByKey(key)); if (CollectionUtils.isEmpty(list)){ throw new BusinessException(AccessResponseEnum.LDEVINFO_IS_NULL); @@ -188,18 +195,21 @@ public class CsDeviceServiceImpl implements ICsDeviceService { po.setLineId(id); po.setName(item.getName()); po.setPosition(item.getPosition()); + po.setClDid(0); if (Objects.equals(DicDataEnum.LOAD_SIDE.getCode(),location)){ RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),1)).findFirst().orElse(null); po.setVolGrade(po1.getVolGrade()); po.setPtRatio(po1.getPtRatio()); po.setCtRatio(po1.getCtRatio()); po.setConType(po1.getConType()); + po.setClDid(1); } else if (Objects.equals(DicDataEnum.GRID_SIDE.getCode(),location)){ RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),2)).findFirst().orElse(null); po.setVolGrade(po1.getVolGrade()); po.setPtRatio(po1.getPtRatio()); po.setCtRatio(po1.getCtRatio()); po.setConType(po1.getConType()); + po.setClDid(2); } po.setStatus(1); csLinePoList.add(po); @@ -232,7 +242,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { askDevData(devAccessParam.getNDid(),AccessEnum.SOFT_INFO.getCode(),version,0); //等待mqtt数据 Thread.sleep(500); - String key2 = "SOFTINFO" + devAccessParam.getNDid(); + String key2 = AppRedisKey.SOFTINFO + devAccessParam.getNDid(); RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(redisUtil.getObjectByKey(key2)), RspDataDto.SoftInfo.class); if (Objects.isNull(softInfo)){ throw new BusinessException(AccessResponseEnum.SOFTINFO_IS_NULL); @@ -243,16 +253,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csSoftInfoPo.setAppDate(new SimpleDateFormat("yyyy-MM-dd").parse(softInfo.getAppDate())); csSoftInfoService.save(csSoftInfoPo); //更新设备表软件信息 - csEquipmentDeliveryService.updateSoftInfoBynDid(devAccessParam.getNDid(),csSoftInfoPo.getId()); + csEquipmentDeliveryService.updateSoftInfoBynDid(devAccessParam.getNDid(),csSoftInfoPo.getId(),moduleNumber); //修改装置状态 csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode()); //7.发起自动接入请求 devAccess(devAccessParam.getNDid(),version); //8.删除redis监测点模板信息 - redisUtil.delete("MODEL" + devAccessParam.getNDid()); - redisUtil.delete("LINE" + devAccessParam.getNDid()); - redisUtil.delete("LINEDATA" + devAccessParam.getNDid()); - redisUtil.delete("SOFTINFO" + devAccessParam.getNDid()); + redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid()); + redisUtil.delete(AppRedisKey.LINE + devAccessParam.getNDid()); + redisUtil.delete(AppRedisKey.LINE_DATA + devAccessParam.getNDid()); + redisUtil.delete(AppRedisKey.SOFTINFO + devAccessParam.getNDid()); } catch (Exception e) { throw new BusinessException(CommonResponseEnum.FAIL); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index 09c0735..b22ec5d 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -34,9 +34,9 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); - lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getSoftinfoId,id).eq(CsEquipmentDeliveryPO::getNdid,nDid); + lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getSoftinfoId,id).set(CsEquipmentDeliveryPO::getModuleNumber,moduleNumber).eq(CsEquipmentDeliveryPO::getNdid,nDid); this.update(lambdaUpdateWrapper); } diff --git a/iot-analysis/analysis-stat/stat-boot/pom.xml b/iot-analysis/analysis-stat/stat-boot/pom.xml index d204c56..39f762d 100644 --- a/iot-analysis/analysis-stat/stat-boot/pom.xml +++ b/iot-analysis/analysis-stat/stat-boot/pom.xml @@ -76,6 +76,10 @@ ${project.version} compile + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 5919b6b..84efc2d 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -1,10 +1,10 @@ package com.njcn.stat.service.impl; import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.gson.Gson; -import com.njcn.access.enums.AccessResponseEnum; +import com.njcn.access.pojo.dto.CsModelDto; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.DataArrayFeignClient; @@ -14,24 +14,20 @@ import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.influx.pojo.constant.InfluxDBTableConstant; import com.njcn.influxdb.utils.InfluxDbUtils; import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; import com.njcn.stat.service.IStatService; import com.njcn.system.api.DicDataFeignClient; import com.njcn.system.api.EpdFeignClient; import com.njcn.system.enums.DicDataEnum; -import com.njcn.system.enums.SystemResponseEnum; import com.njcn.system.pojo.dto.EpdDTO; -import com.njcn.system.pojo.po.Dic; import com.njcn.system.pojo.po.DictData; -import com.njcn.system.pojo.po.EleEpdPqd; import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.net.Inet4Address; import java.util.*; import java.util.concurrent.TimeUnit; @@ -73,11 +69,23 @@ public class StatServiceImpl implements IStatService { dataArrayParam.setCldId(appAutoDataMessage.getMsg().getClDid()); List list = appAutoDataMessage.getMsg().getDataArray(); //获取监测点id - String lineId = lineInfo(appAutoDataMessage.getId(),appAutoDataMessage.getMsg().getClDid()); + String lineId = null; + Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId()); + if (Objects.isNull(object1)){ + lineInfo(appAutoDataMessage.getId()); + } + if (Objects.equals(appAutoDataMessage.getDid(),1)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get("0").toString(); + } else if (Objects.equals(appAutoDataMessage.getDid(),2)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString(); + } //缓存指标和influxDB表关系 - saveData(); + Object object2 = redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD); + if(Objects.isNull(object2)) { + saveData(); + } if (CollectionUtil.isNotEmpty(list)){ - list.forEach(item->{ + for (AppAutoDataMessage.DataArray item : list) { switch (item.getDataAttr()) { case 1: log.info("处理最大值"); @@ -98,21 +106,28 @@ public class StatServiceImpl implements IStatService { default: break; } - insertData(lineId,dataArrayParam,item); - }); + Object object = redisUtil.getObjectByKey(appAutoDataMessage.getId() + appAutoDataMessage.getDid() + appAutoDataMessage.getMsg().getClDid()); + List dataArrayList; + if (Objects.isNull(object)){ + dataArrayList = saveModelData(dataArrayParam); + } else { + dataArrayList = objectToList(object); + } + insertData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod()); + } } } /** - * 获取监测点相关信息 + * 缓存监测点相关信息 */ - public String lineInfo(String id, Integer clDid) { + public void lineInfo(String id) { Map map = new HashMap<>(); List lineList = csLineFeignClient.findByNdid(id).getData(); if (CollectionUtil.isEmpty(lineList)){ throw new BusinessException(StatResponseEnum.LINE_NULL); } - lineList.forEach(item->{ + for (CsLinePO item : lineList) { DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData(); if (Objects.isNull(dictData)){ throw new BusinessException(StatResponseEnum.DICT_NULL); @@ -124,8 +139,8 @@ public class StatServiceImpl implements IStatService { } else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){ map.put(2,item.getLineId()); } - }); - return map.get(clDid); + } + redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L); } /** @@ -140,19 +155,27 @@ public class StatServiceImpl implements IStatService { list.forEach(item->{ map.put(item.getDictName(),item.getTableName()); }); - redisUtil.saveByKeyWithExpire("ELEEPDPQD",map,180L); + redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,600L); + } + + /** + * 缓存设备模板信息 + */ + public List saveModelData(DataArrayParam dataArrayParam) { + String key = dataArrayParam.getId() + dataArrayParam.getDid() + dataArrayParam.getCldId(); + List dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData(); + if (CollectionUtil.isEmpty(dataArrayList)){ + throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL); + } + redisUtil.saveByKeyWithExpire(key,dataArrayList,600L); + return dataArrayList; } /** * influxDB数据入库 */ - public void insertData(String lineId,DataArrayParam dataArrayParam,AppAutoDataMessage.DataArray item) { - //获取详细数据 - List dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData(); - if (CollectionUtil.isEmpty(dataArrayList)){ - throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL); - } + public void insertData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) { //解码 List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); if (CollectionUtil.isEmpty(floats)){ @@ -163,14 +186,12 @@ public class StatServiceImpl implements IStatService { throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); } for (int i = 0; i < dataArrayList.size(); i++) { - if (Objects.isNull(redisUtil.getObjectByKey("ELEEPDPQD"))){ - saveData(); - } - String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey("ELEEPDPQD")), Map.class).get(dataArrayList.get(i).getName()).toString(); + String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class).get(dataArrayList.get(i).getName()).toString(); Map tags = new HashMap<>(); tags.put(InfluxDBTableConstant.LINE_ID,lineId); tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); - tags.put(InfluxDBTableConstant.VALUE_TYPE,dataArrayList.get(i).getStatMethod()); + tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod); + tags.put(InfluxDBTableConstant.CL_DID,clDid.toString()); Map fields = new HashMap<>(); fields.put(dataArrayList.get(i).getName(),floats.get(i)); fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); @@ -178,4 +199,16 @@ public class StatServiceImpl implements IStatService { } } + public List objectToList(Object object) { + List urlList = new ArrayList<>(); + if (object != null) { + if (object instanceof ArrayList) { + for (Object o : (List) object) { + urlList.add((CsDataArray) o); + } + } + } + return urlList; + } + }