diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index fef3c81..a60626e 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -14,6 +14,7 @@ import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; +import com.njcn.access.mapper.OverlimitMapper; import com.njcn.access.pojo.RspDataDto; import com.njcn.access.pojo.dto.*; import com.njcn.access.pojo.dto.file.FileDto; @@ -31,6 +32,8 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.*; import com.njcn.csdevice.pojo.param.CsLineParam; import com.njcn.csdevice.pojo.po.*; +import com.njcn.device.biz.pojo.po.Overlimit; +import com.njcn.device.biz.utils.COverlimitUtil; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppFileMessage; @@ -89,6 +92,7 @@ public class MqttMessageHandler { private final CsLineFeignClient csLineFeignClient; private final DevCapacityFeignClient devCapacityFeignClient; private final EquipmentFeignClient equipmentFeignClient; + private final OverlimitMapper overlimitMapper; @Autowired Validator validator; @@ -197,97 +201,97 @@ public class MqttMessageHandler { } } - /** - * 装置类型模板应答 - * 1.判断网关的类型 - * 2.直联设备的DevCfg和DevMod是以直联设备为准,上送平台端,平台端保存。通过校验DevMod模板信息来从平台端模板池中选取对应的模板,如果找不到匹配模板需告警提示人工干预处理。 - * 3.平台端需读取装置的DevMod来判断网关支持的设备模板(包含设备型号和模板版本),根据app提交的接入子设备DID匹配数据模板(型号及版本),生成DevCfg下发给网关,网关根据下发信息生成就地设备点表。 - * @param topic - * @param message - * @param nDid - * @param payload - */ - @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) - @Transactional(rollbackFor = Exception.class) - public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ - log.info("收到当前设备所用模板响应--->" + nDid); - DeviceLogDTO logDto = new DeviceLogDTO(); - try{ - logDto.setUserName(RequestUtil.getUsername()); - logDto.setLoginName(RequestUtil.getLoginName()); - } catch (Exception e) { - logDto.setUserName("系统重启或定时任务创建"); - logDto.setLoginName(null); - } - logDto.setOperate(nDid + "设备类型模板应答"); - logDto.setResult(1); - //业务处理 - Gson gson = new Gson(); - ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); - if (Objects.equals(modelDto.getType(),Integer.parseInt(TypeEnum.TYPE_18.getCode()))){ - List list = modelDto.getMsg().getDevMod(); - List list2 = modelDto.getMsg().getDevCfg(); - if (CollectionUtils.isEmpty(list)){ - log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); - csLogsFeignClient.addUserLog(logDto); - //有异常删除缓存的模板信息 - redisUtil.delete(AppRedisKey.MODEL + nDid); - throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); - } - //校验前置传递的装置模板库中是否存在 - List modelList = new ArrayList<>(); - list.forEach(item->{ - Integer did = null; - for (DevCfgDto item2 : list2) { - if (Objects.equals(item.getDevType(),item2.getDevType())){ - did = item2.getDid(); - } - } - CsModelDto csModelDto = new CsModelDto(); - CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); - if (Objects.isNull(po)){ - log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); - csLogsFeignClient.addUserLog(logDto); - //有异常删除缓存的模板信息 - redisUtil.delete(AppRedisKey.MODEL + nDid); - throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); - } - if (Objects.equals(po.getType(),0)){ - List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); - if (CollectionUtils.isEmpty(dataSetList)){ - logDto.setResult(0); - logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); - csLogsFeignClient.addUserLog(logDto); - //有异常删除缓存的模板信息 - redisUtil.delete(AppRedisKey.MODEL + nDid); - throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); - } - csModelDto.setModuleNumber(dataSetList.size()); - } - csModelDto.setDevType(po.getDevTypeName()); - csModelDto.setModelId(po.getId()); - csModelDto.setDid(did); - csModelDto.setType(po.getType()); - modelList.add(csModelDto); - }); - //存储模板id - String key2 = AppRedisKey.MODEL + nDid; - redisUtil.saveByKeyWithExpire(key2,modelList,600L); - //存储监测点模板信息,用于界面回显 - List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); - List lineList = csLineModelService.getMonitorNumByModelId(modelId); - String key = AppRedisKey.LINE + nDid; - redisUtil.saveByKeyWithExpire(key,lineList,600L); - //csLogsFeignClient.addUserLog(logDto); - } - } +// /** +// * 装置类型模板应答 +// * 1.判断网关的类型 +// * 2.直联设备的DevCfg和DevMod是以直联设备为准,上送平台端,平台端保存。通过校验DevMod模板信息来从平台端模板池中选取对应的模板,如果找不到匹配模板需告警提示人工干预处理。 +// * 3.平台端需读取装置的DevMod来判断网关支持的设备模板(包含设备型号和模板版本),根据app提交的接入子设备DID匹配数据模板(型号及版本),生成DevCfg下发给网关,网关根据下发信息生成就地设备点表。 +// * @param topic +// * @param message +// * @param nDid +// * @param payload +// */ +// @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) +// @Transactional(rollbackFor = Exception.class) +// public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ +// log.info("收到当前设备所用模板响应--->" + nDid); +// DeviceLogDTO logDto = new DeviceLogDTO(); +// try{ +// logDto.setUserName(RequestUtil.getUsername()); +// logDto.setLoginName(RequestUtil.getLoginName()); +// } catch (Exception e) { +// logDto.setUserName("系统重启或定时任务创建"); +// logDto.setLoginName(null); +// } +// logDto.setOperate(nDid + "设备类型模板应答"); +// logDto.setResult(1); +// //业务处理 +// Gson gson = new Gson(); +// ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); +// if (Objects.equals(modelDto.getType(),Integer.parseInt(TypeEnum.TYPE_18.getCode()))){ +// List list = modelDto.getMsg().getDevMod(); +// List list2 = modelDto.getMsg().getDevCfg(); +// if (CollectionUtils.isEmpty(list)){ +// log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); +// logDto.setResult(0); +// logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); +// csLogsFeignClient.addUserLog(logDto); +// //有异常删除缓存的模板信息 +// redisUtil.delete(AppRedisKey.MODEL + nDid); +// throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); +// } +// //校验前置传递的装置模板库中是否存在 +// List modelList = new ArrayList<>(); +// list.forEach(item->{ +// Integer did = null; +// for (DevCfgDto item2 : list2) { +// if (Objects.equals(item.getDevType(),item2.getDevType())){ +// did = item2.getDid(); +// } +// } +// CsModelDto csModelDto = new CsModelDto(); +// CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); +// if (Objects.isNull(po)){ +// log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); +// logDto.setResult(0); +// logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); +// csLogsFeignClient.addUserLog(logDto); +// //有异常删除缓存的模板信息 +// redisUtil.delete(AppRedisKey.MODEL + nDid); +// throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); +// } +// if (Objects.equals(po.getType(),0)){ +// List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); +// if (CollectionUtils.isEmpty(dataSetList)){ +// logDto.setResult(0); +// logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); +// csLogsFeignClient.addUserLog(logDto); +// //有异常删除缓存的模板信息 +// redisUtil.delete(AppRedisKey.MODEL + nDid); +// throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); +// } +// csModelDto.setModuleNumber(dataSetList.size()); +// } +// csModelDto.setDevType(po.getDevTypeName()); +// csModelDto.setModelId(po.getId()); +// csModelDto.setDid(did); +// csModelDto.setType(po.getType()); +// modelList.add(csModelDto); +// }); +// //存储模板id +// String key2 = AppRedisKey.MODEL + nDid; +// redisUtil.saveByKeyWithExpire(key2,modelList,600L); +// //存储监测点模板信息,用于界面回显 +// List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); +// List lineList = csLineModelService.getMonitorNumByModelId(modelId); +// String key = AppRedisKey.LINE + nDid; +// redisUtil.saveByKeyWithExpire(key,lineList,600L); +// //csLogsFeignClient.addUserLog(logDto); +// } +// } /** - * 设备接入平台应答 + * 设备响应 * @param topic * @param message * @param version @@ -313,9 +317,76 @@ public class MqttMessageHandler { redisUtil.saveByKeyWithExpire("devResponse",res.getCode(),5L); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ switch (res.getType()){ + /** + * 装置类型模板应答 + * 1.判断网关的类型 + * 2.直联设备的DevCfg和DevMod是以直联设备为准,上送平台端,平台端保存。通过校验DevMod模板信息来从平台端模板池中选取对应的模板,如果找不到匹配模板需告警提示人工干预处理。 + * 3.平台端需读取装置的DevMod来判断网关支持的设备模板(包含设备型号和模板版本),根据app提交的接入子设备DID匹配数据模板(型号及版本),生成DevCfg下发给网关,网关根据下发信息生成就地设备点表。 + */ + case 4611: + log.info("{},装置模板应答,应答code {}",nDid,res.getCode()); + ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); + List list = modelDto.getMsg().getDevMod(); + List list2 = modelDto.getMsg().getDevCfg(); + if (CollectionUtils.isEmpty(list)){ + log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); + //有异常删除缓存的模板信息 + redisUtil.delete(AppRedisKey.MODEL + nDid); + throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); + } + //校验前置传递的装置模板库中是否存在 + List modelList = new ArrayList<>(); + list.forEach(item->{ + Integer did = null; + for (DevCfgDto item2 : list2) { + if (Objects.equals(item.getDevType(),item2.getDevType())){ + did = item2.getDid(); + } + } + CsModelDto csModelDto = new CsModelDto(); + CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); + if (Objects.isNull(po)){ + log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); + //有异常删除缓存的模板信息 + redisUtil.delete(AppRedisKey.MODEL + nDid); + throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); + } + if (Objects.equals(po.getType(),0)){ + List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); + if (CollectionUtils.isEmpty(dataSetList)){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); + csLogsFeignClient.addUserLog(logDto); + //有异常删除缓存的模板信息 + redisUtil.delete(AppRedisKey.MODEL + nDid); + throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); + } + csModelDto.setModuleNumber(dataSetList.size()); + } + csModelDto.setDevType(po.getDevTypeName()); + csModelDto.setModelId(po.getId()); + csModelDto.setDid(did); + csModelDto.setType(po.getType()); + modelList.add(csModelDto); + }); + //存储模板id + String key2 = AppRedisKey.MODEL + nDid; + redisUtil.saveByKeyWithExpire(key2,modelList,600L); + //存储监测点模板信息,用于界面回显 + List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); + List lineList = csLineModelService.getMonitorNumByModelId(modelId); + String key = AppRedisKey.LINE + nDid; + redisUtil.saveByKeyWithExpire(key,lineList,600L); + break; case 4613: logDto.setOperate(nDid + "设备接入"); - log.info("{}收到接入应答响应,应答code {}",nDid,res.getCode()); + log.info("{},收到接入应答响应,应答code {}",nDid,res.getCode()); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ int mid = 1; //修改装置状态 @@ -362,99 +433,107 @@ public class MqttMessageHandler { csLogsFeignClient.addUserLog(logDto); break; case 4614: - log.info("设备数据应答--->" + nDid); RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class); - switch (rspDataDto.getDataType()){ - case 1: - logDto.setOperate(nDid + "更新设备软件信息"); - RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class); - //记录设备软件信息 - CsSoftInfoPO csSoftInfoPo = new CsSoftInfoPO(); - BeanUtils.copyProperties(softInfo,csSoftInfoPo); - String id = IdUtil.fastSimpleUUID(); - csSoftInfoPo.setId(id); - DateTimeFormatter formatter = new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd[[HH][:mm][:ss]]") - .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) - .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) - .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) - .parseDefaulting(ChronoField.MILLI_OF_SECOND, 0) - .toFormatter(); - LocalDateTime localDateTime = LocalDateTime.parse(softInfo.getAppDate(), formatter); - assertThat(localDateTime).isNotNull(); - csSoftInfoPo.setAppDate(localDateTime); - csSoftInfoFeignClient.saveSoftInfo(csSoftInfoPo); - //更新设备软件id 先看是否存在软件信息,删除 然后在录入 - CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); - String soft = po.getSoftinfoId(); - if (StringUtil.isNotBlank(soft)){ - csSoftInfoFeignClient.removeSoftInfo(soft); - } - equipmentFeignClient.updateSoftInfo(nDid,csSoftInfoPo.getId()); - break; - case 2: - List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); - if (CollectionUtil.isNotEmpty(devInfo)){ - if (Objects.equals(res.getDid(),1)){ - logDto.setOperate(nDid + "更新治理监测点信息和设备容量"); - List list = new ArrayList<>(); - devInfo.forEach(item->{ - //1.更新治理监测点信息 - CsLineParam csLineParam = new CsLineParam(); - if (Objects.equals(item.getClDid(),0)){ - csLineParam.setLineId(nDid.concat("0")); - //2.录入各个模块设备容量 - CsDevCapacityPO csDevCapacity = new CsDevCapacityPO(); - csDevCapacity.setLineId(nDid.concat("0")); - csDevCapacity.setCldid(item.getClDid()); - csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA()); - list.add(csDevCapacity); - } else { - csLineParam.setLineId(nDid.concat(item.getClDid().toString())); - } - csLineParam.setVolGrade(item.getVolGrade()); - csLineParam.setPtRatio(item.getPtRatio()); - csLineParam.setCtRatio(item.getCtRatio()); - csLineParam.setConType(item.getConType()); - csLineFeignClient.updateLine(csLineParam); - }); - if (CollectionUtil.isNotEmpty(list)) { - devCapacityFeignClient.addList(list); - //3.更新设备模块个数 - equipmentFeignClient.updateModuleNumber(nDid,(devInfo.size()-1)); - } - } else if (Objects.equals(res.getDid(),2)) { - logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息"); - //1.更新电网侧、负载侧监测点相关信息 - devInfo.forEach(item->{ - CsLineParam csLineParam = new CsLineParam(); - csLineParam.setLineId(nDid.concat(item.getClDid().toString())); - csLineParam.setVolGrade(item.getVolGrade()); - csLineParam.setPtRatio(item.getPtRatio()); - csLineParam.setCtRatio(item.getCtRatio()); - csLineParam.setConType(item.getConType()); - csLineFeignClient.updateLine(csLineParam); - }); + if (!Objects.isNull(rspDataDto.getDataType())) { + switch (rspDataDto.getDataType()){ + case 1: + log.info("{},设备数据应答--->更新设备软件信息", nDid); + logDto.setOperate(nDid + "更新设备软件信息"); + RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class); + //记录设备软件信息 + CsSoftInfoPO csSoftInfoPo = new CsSoftInfoPO(); + BeanUtils.copyProperties(softInfo,csSoftInfoPo); + String id = IdUtil.fastSimpleUUID(); + csSoftInfoPo.setId(id); + DateTimeFormatter formatter = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd[[HH][:mm][:ss]]") + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .parseDefaulting(ChronoField.MILLI_OF_SECOND, 0) + .toFormatter(); + LocalDateTime localDateTime = LocalDateTime.parse(softInfo.getAppDate(), formatter); + assertThat(localDateTime).isNotNull(); + csSoftInfoPo.setAppDate(localDateTime); + csSoftInfoFeignClient.saveSoftInfo(csSoftInfoPo); + //更新设备软件id 先看是否存在软件信息,删除 然后在录入 + CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); + String soft = po.getSoftinfoId(); + if (StringUtil.isNotBlank(soft)){ + csSoftInfoFeignClient.removeSoftInfo(soft); } - } - break; - case 48: - log.info("询问装置项目列表"); - logDto.setUserName("询问装置项目列表"); - logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表"); - List projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class); - String key = AppRedisKey.PROJECT_INFO + nDid + rspDataDto.getClDid(); - redisUtil.saveByKeyWithExpire(key,projectInfoList,60L); - break; - default: - break; + equipmentFeignClient.updateSoftInfo(nDid,csSoftInfoPo.getId()); + break; + case 2: + List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); + if (CollectionUtil.isNotEmpty(devInfo)){ + if (Objects.equals(res.getDid(),1)){ + log.info("{},设备数据应答--->更新治理监测点信息和设备容量", nDid); + List list3 = new ArrayList<>(); + devInfo.forEach(item->{ + //1.更新治理监测点信息 + CsLineParam csLineParam = new CsLineParam(); + if (Objects.equals(item.getClDid(),0)){ + csLineParam.setLineId(nDid.concat("0")); + //2.录入各个模块设备容量 + CsDevCapacityPO csDevCapacity = new CsDevCapacityPO(); + csDevCapacity.setLineId(nDid.concat("0")); + csDevCapacity.setCldid(item.getClDid()); + csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA()); + list3.add(csDevCapacity); + } else { + csLineParam.setLineId(nDid.concat(item.getClDid().toString())); + } + csLineParam.setVolGrade(item.getVolGrade()); + csLineParam.setPtRatio(item.getPtRatio()); + csLineParam.setCtRatio(item.getCtRatio()); + csLineParam.setConType(item.getConType()); + csLineFeignClient.updateLine(csLineParam); + //生成监测点限值 + Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0); + overlimit.setId(nDid.concat(item.getClDid().toString())); + overlimitMapper.deleteById(nDid.concat(item.getClDid().toString())); + overlimitMapper.insert(overlimit); + }); + if (CollectionUtil.isNotEmpty(list3)) { + devCapacityFeignClient.addList(list3); + //3.更新设备模块个数 + equipmentFeignClient.updateModuleNumber(nDid,(devInfo.size()-1)); + } + } else if (Objects.equals(res.getDid(),2)) { + log.info("{},设备数据应答--->更新电网侧、负载侧监测点信息", nDid); + logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息"); + //1.更新电网侧、负载侧监测点相关信息 + devInfo.forEach(item->{ + CsLineParam csLineParam = new CsLineParam(); + csLineParam.setLineId(nDid.concat(item.getClDid().toString())); + csLineParam.setVolGrade(item.getVolGrade()); + csLineParam.setPtRatio(item.getPtRatio()); + csLineParam.setCtRatio(item.getCtRatio()); + csLineParam.setConType(item.getConType()); + csLineFeignClient.updateLine(csLineParam); + }); + } + } + break; + case 48: + log.info("询问装置项目列表"); + logDto.setUserName("询问装置项目列表"); + logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表"); + List projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class); + String key3 = AppRedisKey.PROJECT_INFO + nDid + rspDataDto.getClDid(); + redisUtil.saveByKeyWithExpire(key3,projectInfoList,60L); + break; + default: + break; + } } break; case 4663: log.info("装置操作应答"); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ - String key = AppRedisKey.CONTROL + nDid; - redisUtil.saveByKeyWithExpire(key,"success",10L); + String key4 = AppRedisKey.CONTROL + nDid; + redisUtil.saveByKeyWithExpire(key4,"success",10L); } break; default: @@ -529,7 +608,8 @@ public class MqttMessageHandler { switch (dataDto.getMsg().getDataAttr()) { //暂态事件、录波处理、工程信息 case 0: - log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); + log.info(nDid + "处理事件"); + //log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class); JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto)); AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class); @@ -648,7 +728,7 @@ public class MqttMessageHandler { public void devErrorInfo(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) { //解析数据 Gson gson = new Gson(); - log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); + //log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class); JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto)); AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class); @@ -721,7 +801,7 @@ public class MqttMessageHandler { break; } reqAndResParam.setMsg(askDataDto); - log.info("askDevData的请求报文:" + new Gson().toJson(reqAndResParam)); + //log.info("askDevData的请求报文:" + new Gson().toJson(reqAndResParam)); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 078cebb..354ca08 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -122,7 +122,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene //1.装置心跳断连 //2.MQTT客户端不在线 private void executeMainTask(ScheduledExecutorService scheduler, String nDid, String version) { - System.out.println("正在执行主任务..."); + log.info("正在执行主任务..."); DeviceLogDTO logDto = new DeviceLogDTO(); logDto.setUserName("装置失去心跳触发"); //判断mqtt @@ -133,7 +133,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene csDeviceService.devAccessAskTemplate(nDid,version,1); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); try { - Thread.sleep(3000); + Thread.sleep(5000); Object object = redisUtil.getObjectByKey("online" + nDid); if (Objects.nonNull(object)) { scheduler.shutdown(); @@ -165,7 +165,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene addLogs(dto); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { - System.out.println(nDid + "执行重连定时任务..."); + log.info(nDid + "执行重连定时任务..."); DeviceLogDTO logDto = new DeviceLogDTO(); logDto.setOperate(nDid + "重连定时任务"); //判断客户端 @@ -173,7 +173,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene if (mqttClient) { csDeviceService.devAccessAskTemplate(nDid,version,1); try { - Thread.sleep(3000); + Thread.sleep(5000); Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ logDto.setResult(1); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/OverlimitMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/OverlimitMapper.java index d7f6da3..610eb86 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/OverlimitMapper.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/OverlimitMapper.java @@ -1,8 +1,10 @@ package com.njcn.access.mapper; +import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.njcn.device.biz.pojo.po.Overlimit; +import org.apache.ibatis.annotations.Mapper; /** @@ -12,6 +14,8 @@ import com.njcn.device.biz.pojo.po.Overlimit; * * @author xy */ +@DS("sjzx") +@Mapper public interface OverlimitMapper extends BaseMapper { } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java index 5801ec4..795de11 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java @@ -31,6 +31,6 @@ public class CsDataSetServiceImpl extends ServiceImpl getDataSetData(String modelId) { - return this.lambdaQuery().eq(CsDataSet::getPid, modelId).list(); + return this.lambdaQuery().eq(CsDataSet::getPid, modelId).eq(CsDataSet::getDataType,"Stat").list(); } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 2b9ac03..1ac2178 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -27,8 +27,6 @@ import com.njcn.csdevice.pojo.param.CsLedgerParam; import com.njcn.csdevice.pojo.param.CsLineParam; import com.njcn.csdevice.pojo.po.*; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; -import com.njcn.device.biz.pojo.po.Overlimit; -import com.njcn.device.biz.utils.COverlimitUtil; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.DicDataFeignClient; @@ -80,7 +78,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final ChannelObjectUtil channelObjectUtil; private final CsLineFeignClient csLineFeignClient; private final DataSetFeignClient dataSetFeignClient; - private final OverlimitMapper overlimitMapper; @Override @Transactional(rollbackFor = {Exception.class}) @@ -372,108 +369,101 @@ public class CsDeviceServiceImpl implements ICsDeviceService { @Override @Transactional(rollbackFor = {Exception.class}) public String wlDevRegister(String nDid) { + String result = "fail"; + // 设备状态判断 + checkDeviceStatus(nDid); + // 询问设备支持的主题信息,并将支持的主题入库 + askAndStoreTopics(nDid); + // MQTT询问装置用的模板,并判断库中是否存在模板 + checkDeviceModel(nDid); + // 根据模板接入设备 + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName(RequestUtil.getUserNickname()); + logDto.setLoginName(RequestUtil.getUsername()); + logDto.setOperate("便携式设备"+nDid+"注册、接入"); + logDto.setResult(1); try { - // 设备状态判断 - checkDeviceStatus(nDid); - // 询问设备支持的主题信息,并将支持的主题入库 - askAndStoreTopics(nDid); - // MQTT询问装置用的模板,并判断库中是否存在模板 - checkDeviceModel(nDid); - // 根据模板接入设备 - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName(RequestUtil.getUserNickname()); - logDto.setLoginName(RequestUtil.getUsername()); - logDto.setOperate("便携式设备"+nDid+"注册、接入"); - logDto.setResult(1); - try { - Thread.sleep(2000); - //获取版本 - String version = csTopicService.getVersion(nDid); - CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); - List csLinePoList = new ArrayList<>(); - //1.录入装置台账信息 - CsLedgerParam csLedgerParam = new CsLedgerParam(); - csLedgerParam.setId(vo.getId()); - csLedgerParam.setPid("0"); - csLedgerParam.setName(vo.getName()); - csLedgerParam.setLevel(2); - csLedgerParam.setSort(0); - csLedgerService.addLedgerTree(csLedgerParam); - //2.根据模板获取监测点个数,插入监测点表 - Thread.sleep(2000); - List modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class); - if (CollUtil.isEmpty(modelList)){ - try { - throwExceptionAndLog(AccessResponseEnum.MODEL_ERROR, logDto); - } catch (Exception e) { - throw new BusinessException(e.getMessage()); - } - } - List list = csDataSetService.getDataSetData(modelList.get(0).getModelId()); - list.forEach(item->{ - CsLinePO po = new CsLinePO(); - po.setLineId(nDid + item.getClDev().toString()); - po.setName(item.getClDev().toString() + "#监测点"); - po.setStatus(1); - po.setClDid(item.getClDev()); - po.setDeviceId(vo.getId()); - po.setDataSetId(item.getId()); - po.setDataModelId(item.getPid()); - //防止主键重复 - QueryWrapper qw = new QueryWrapper<>(); - qw.eq("line_id",po.getLineId()); - if(csLineService.getBaseMapper().selectList(qw).isEmpty()){ - csLinePoList.add(po); - } - //3.生成台账树监测点数据 - CsLedgerParam param = new CsLedgerParam(); - param.setId(nDid + item.getClDev().toString()); - param.setPid(vo.getId()); - param.setName(item.getClDev().toString() + "#监测点"); - param.setLevel(3); - param.setSort(0); - csLedgerService.addLedgerTree(param); - }); - csLineService.saveBatch(csLinePoList); - //生成监测点限值 - for(CsLinePO csLinePO: csLinePoList){ - Overlimit overlimit = COverlimitUtil.globalAssemble(csLinePO.getVolGrade().floatValue(),10f,10f,10f,0,1); - overlimit.setId(csLinePO.getLineId()); - overlimitMapper.deleteById(csLinePO.getLineId()); - overlimitMapper.insert(overlimit); - } - //4.生成装置和模板的关系表 - CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); - csDevModelRelationAddParm.setDevId(vo.getId()); - csDevModelRelationAddParm.setModelId(modelList.get(0).getModelId()); - csDevModelRelationAddParm.setDid(modelList.get(0).getDid()); - csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm); - //5.发起自动接入请求 - devAccessAskTemplate(nDid,version,1); - //6.存储日志 - csLogsFeignClient.addUserLog(logDto); - //7.存储设备调试日志表 - CsEquipmentProcessPO csEquipmentProcess = new CsEquipmentProcessPO(); - csEquipmentProcess.setDevId(nDid); - csEquipmentProcess.setOperator(RequestUtil.getUserIndex()); - csEquipmentProcess.setStartTime(LocalDateTime.now()); - csEquipmentProcess.setEndTime(LocalDateTime.now()); - csEquipmentProcess.setProcess(4); - csEquipmentProcess.setStatus(1); - processFeignClient.add(csEquipmentProcess); - //8.删除redis监测点模板信息 - redisUtil.delete(AppRedisKey.MODEL + nDid); - redisUtil.delete(AppRedisKey.LINE + nDid); - } catch (Exception e) { - logDto.setResult(0); - logDto.setFailReason(e.getMessage()); - csLogsFeignClient.addUserLog(logDto); - throw new BusinessException(CommonResponseEnum.FAIL); + Thread.sleep(2000); + //获取版本 + String version = csTopicService.getVersion(nDid); + CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData(); + List csLinePoList = new ArrayList<>(); + //1.录入装置台账信息 + CsLedgerParam csLedgerParam = new CsLedgerParam(); + csLedgerParam.setId(vo.getId()); + csLedgerParam.setPid("0"); + csLedgerParam.setName(vo.getName()); + csLedgerParam.setLevel(2); + csLedgerParam.setSort(0); + csLedgerService.addLedgerTree(csLedgerParam); + //2.根据模板获取监测点个数,插入监测点表 + Thread.sleep(2000); + List modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class); + if (CollUtil.isEmpty(modelList)) { + throwExceptionAndLog(AccessResponseEnum.MODEL_ERROR, logDto); } - return "success"; - } catch (BusinessException e) { - throw new BusinessException(e.getMessage()); + List list = csDataSetService.getDataSetData(modelList.get(0).getModelId()); + list.forEach(item->{ + CsLinePO po = new CsLinePO(); + po.setLineId(nDid + item.getClDev().toString()); + po.setName(item.getClDev().toString() + "#监测点"); + po.setStatus(1); + po.setClDid(item.getClDev()); + po.setDeviceId(vo.getId()); + po.setDataSetId(item.getId()); + po.setDataModelId(item.getPid()); + //防止主键重复 + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("line_id",po.getLineId()); + if(csLineService.getBaseMapper().selectList(qw).isEmpty()){ + csLinePoList.add(po); + } + //3.生成台账树监测点数据 + CsLedgerParam param = new CsLedgerParam(); + param.setId(nDid + item.getClDev().toString()); + param.setPid(vo.getId()); + param.setName(item.getClDev().toString() + "#监测点"); + param.setLevel(3); + param.setSort(0); + csLedgerService.addLedgerTree(param); + }); + csLineService.saveBatch(csLinePoList); + redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L); + //4.生成装置和模板的关系表 + CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); + csDevModelRelationAddParm.setDevId(vo.getId()); + csDevModelRelationAddParm.setModelId(modelList.get(0).getModelId()); + csDevModelRelationAddParm.setDid(modelList.get(0).getDid()); + csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm); + //5.发起自动接入请求 + devAccessAskTemplate(nDid,version,1); + //6.存储日志 + csLogsFeignClient.addUserLog(logDto); + //7.存储设备调试日志表 + CsEquipmentProcessPO csEquipmentProcess = new CsEquipmentProcessPO(); + csEquipmentProcess.setDevId(nDid); + csEquipmentProcess.setOperator(RequestUtil.getUserIndex()); + csEquipmentProcess.setStartTime(LocalDateTime.now()); + csEquipmentProcess.setEndTime(LocalDateTime.now()); + csEquipmentProcess.setProcess(4); + csEquipmentProcess.setStatus(1); + processFeignClient.add(csEquipmentProcess); + //8.删除redis监测点模板信息 + redisUtil.delete(AppRedisKey.MODEL + nDid); + redisUtil.delete(AppRedisKey.LINE + nDid); + //判断接入状态 + Thread.sleep(5000); + Object object = redisUtil.getObjectByKey("online" + nDid); + if (Objects.nonNull(object)) { + result = "success"; + } + } catch (Exception e) { + logDto.setResult(0); + logDto.setFailReason(e.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AccessResponseEnum.ACCESS_ERROR); } + return result; } @Override @@ -549,13 +539,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { try { redisUtil.delete(AppRedisKey.MODEL + nDid); //询问装置当前所用模板 - ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); - reqAndResParam.setMid(mid); - reqAndResParam.setDid(0); - reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_3.getCode())); - reqAndResParam.setExpire(-1); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false); //接收到模板,判断模板是否存在,替换模板,发起接入 Thread.sleep(2000); List modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class); @@ -572,17 +556,20 @@ public class CsDeviceServiceImpl implements ICsDeviceService { modelMap.put(item.getType(),item.getModelId()); } //修改监测点使用的模板和数据集 - List lineList = csLineFeignClient.findByNdid(nDid).getData(); - for (CsLinePO item : lineList) { - if (item.getClDid() == 0) { - updateLineIds(modelMap.get(0),item.getClDid(),nDid); - } else { - updateLineIds(modelMap.get(1),item.getClDid(),nDid); + List lineList; + Object object = redisUtil.getObjectByKey("accessLineInfo:" + nDid); + if (Objects.isNull(object)) { + lineList = csLineFeignClient.findByNdid(nDid).getData(); + for (CsLinePO item : lineList) { + if (item.getClDid() == 0) { + updateLineIds(modelMap.get(0),item.getClDid(),nDid); + } else { + updateLineIds(modelMap.get(1),item.getClDid(),nDid); + } } } //发起接入 - reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())),1,false); //录波任务倒计时 redisUtil.saveByKeyWithExpire("startFile",null,120L); result = true; @@ -593,6 +580,19 @@ public class CsDeviceServiceImpl implements ICsDeviceService { return result; } + /** + * 组装报文 + */ + public ReqAndResDto.Req getJson(Integer mid, String code) { + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setExpire(-1); + reqAndResParam.setType(Integer.parseInt(code)); + return reqAndResParam; + } + /** * 修改监测点的模板id和数据集id */ @@ -605,8 +605,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csLineFeignClient.updateIds(csLineParam); } - - /** * 平台对设备发起主题询问命令 */ diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java index 3e284b3..e3419b5 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java @@ -70,7 +70,7 @@ public class RtServiceImpl implements IRtService { AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0); //fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整 //基础数据 - if (Objects.equals(dataSet.getName(),"Ds$Pqd$Rt$Basic$01")) { + if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) { BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType()); baseRealDataSet.setLineId(lineId); baseRealDataSet.setPt(po.getPtRatio().floatValue());