diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java b/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java index b738e34..9716ab0 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java @@ -35,6 +35,6 @@ public interface AskDeviceDataFeignClient { HttpResult askRealData(@RequestParam("nDid") String nDid, @RequestParam("idx") Integer idx, @RequestParam("clDId") Integer clDId); @PostMapping("/askCldRealData") - HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId); + HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId, @RequestParam("idx") Integer idx); } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java index cafce2d..f5e45c3 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java @@ -75,7 +75,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory askCldRealData(String devId, String lineId, String nodeId) { + public HttpResult askCldRealData(String devId, String lineId, String nodeId, Integer idx) { log.error("{}异常,降级处理,异常为:{}","询问云前置实时数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index b134855..108cefa 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -56,7 +56,7 @@ public enum AccessResponseEnum { CTRL_DICT_MISSING("A0307","Ctrl字典数据缺失!"), WAVE_INFO_MISSING("A0307","波形参数缺失!"), - MODEL_MISS("A0308","模板信息缺失!"), + MODEL_MISS("A0308","询问模板信息超时,设备未响应!"), MODEL_VERSION_ERROR("A0308","询问装置模板信息错误"), UPLOAD_ERROR("A0308","平台上送文件异常"), RELOAD_UPLOAD_ERROR("A0308","平台重新上送文件异常"), diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java index 0223c49..a45f9c0 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java @@ -129,11 +129,12 @@ public class AskDeviceDataController extends BaseController { @ApiImplicitParams({ @ApiImplicitParam(name = "devId", value = "装置id"), @ApiImplicitParam(name = "lineId", value = "监测点id"), - @ApiImplicitParam(name = "nodeId", value = "前置id") + @ApiImplicitParam(name = "nodeId", value = "前置id"), + @ApiImplicitParam(name = "idx", value = "数据集编号") }) - public HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId){ + public HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId, @RequestParam("idx") Integer idx){ String methodDescribe = getMethodDescribe("askCldRealData"); - askDeviceDataService.askCldRealData(devId,lineId,nodeId); + askDeviceDataService.askCldRealData(devId,lineId,nodeId,idx); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java index f9c4188..897f07c 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java @@ -22,5 +22,5 @@ public interface AskDeviceDataService { */ void askRealData(String nDid, Integer idx, Integer size); - void askCldRealData(String devId, String lineId, String nodeId); + void askCldRealData(String devId, String lineId, String nodeId, Integer idx); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index 86602d3..b48cec4 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -213,7 +213,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { } @Override - public void askCldRealData(String devId, String lineId, String nodeId) { + public void askCldRealData(String devId, String lineId, String nodeId, Integer idx) { RealDataMessage realDataMessage = new RealDataMessage(); realDataMessage.setDevSeries(devId); int lastDigit = Character.getNumericValue(lineId.charAt(lineId.length() - 1)); @@ -221,6 +221,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { realDataMessage.setRealData(true); realDataMessage.setSoeData(true); realDataMessage.setLimit(20); + realDataMessage.setIdx(idx); realDataMessageTemplate.sendMember(realDataMessage,nodeId); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 6084d34..f47031d 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -2,6 +2,7 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjectUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.tocrhz.mqtt.publisher.MqttPublisher; @@ -21,11 +22,13 @@ import com.njcn.access.utils.CRC32Utils; import com.njcn.access.utils.JsonUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.csdevice.api.CsLogsFeignClient; -import com.njcn.csdevice.api.DevModelFeignClient; +import com.njcn.csdevice.api.*; import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.pojo.param.CsDevModelAddParm; -import com.njcn.csdevice.pojo.po.*; +import com.njcn.csdevice.pojo.po.CsDataArray; +import com.njcn.csdevice.pojo.po.CsDataSet; +import com.njcn.csdevice.pojo.po.CsDevModelPO; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; import com.njcn.redis.pojo.enums.AppRedisKey; @@ -46,7 +49,6 @@ import org.springframework.web.multipart.MultipartFile; import java.sql.Date; import java.util.*; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -78,6 +80,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final MqttPublisher publisher; private final ICsTopicService csTopicService; private final RedisUtil redisUtil; + private final EquipmentFeignClient eequipmentFeignClient; + private final DevModelRelationFeignClient devModelRelationFeignClient; + private final CsLineFeignClient csLineFeignClient; @Override @Transactional(rollbackFor = {Exception.class}) @@ -115,6 +120,21 @@ public class CsDevModelServiceImpl implements ICsDevModelService { analysisDataSet(templateDto,csDevModelPo.getId()); //3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来) addCsLineModel(templateDto,csDevModelPo.getId()); + //4.如果是云前置的模板录入,重新录入完需要将模板关系信息重新录入 + if (Objects.equals(devType,DicDataEnum.DEV_CLD.getCode())) { + List list = eequipmentFeignClient.getAll().getData(); + if (CollectionUtil.isNotEmpty(list)) { + List devList = list.stream() + .filter(item -> Objects.equals(item.getDevAccessMethod(),"CLD")) + .map(CsEquipmentDeliveryPO::getId) + .collect(Collectors.toList()); + devModelRelationFeignClient.updateDataByList(devList,csDevModelPo.getId()); + Object object = redisUtil.getObjectByKey("setId:" + csDevModelPo.getId()); + if (ObjectUtil.isNotNull(object)) { + csLineFeignClient.updateDataByList(devList,csDevModelPo.getId(),object.toString()); + } + } + } csLogsFeignClient.addUserLog(logDto); } catch (Exception e) { logDto.setResult(0); @@ -943,6 +963,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } if(CollectionUtil.isNotEmpty(setList)) { csDataSetService.addList(setList); + setList.forEach(item->{ + if (Objects.equals(item.getName(),"统计数据")) { + redisUtil.saveByKeyWithExpire("setId:" + pId,item.getId(),30L); + } + }); } if(CollectionUtil.isNotEmpty(arrayList)) { csDataArrayService.addList(arrayList); diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java index 26a2ee6..0b5a08b 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java @@ -102,12 +102,13 @@ public class RtServiceImpl implements IRtService { baseRealDataSet.setDataLevel(dataSet.getDataLevel()); long timestamp = item.getDataTimeSec(); baseRealDataSet.setDataTime(getTime(timestamp)); - publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false); + publisher.send("/Web/RealData/" + userId, new Gson().toJson(baseRealDataSet), 1, false); }); } } //fixme 目前实时数据只有基础数据和谐波数据,后期拓展,这边需要再判断 else { + long timestamp; //用户Id String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString(); HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio()); @@ -116,7 +117,11 @@ public class RtServiceImpl implements IRtService { harmRealDataSet.setPt(po.getPtRatio().floatValue()); harmRealDataSet.setCt(po.getCtRatio().floatValue()); harmRealDataSet.setDataLevel(dataSet.getDataLevel()); - long timestamp = item.getDataTimeSec() - 8*3600; + if (ObjectUtil.isNotNull(po.getLineNo())) { + timestamp = item.getDataTimeSec(); + } else { + timestamp = item.getDataTimeSec() - 8*3600; + } harmRealDataSet.setDataTime(getTime(timestamp)); publisher.send("/Web/RealData/" + lineId, new Gson().toJson(harmRealDataSet), 1, false); } @@ -323,6 +328,8 @@ public class RtServiceImpl implements IRtService { } } else if (Objects.equals(item.getHarmName(),"Pq_RmsFundU")) { harmRealDataSet.setData1(FloatUtils.get2Float(item.getData())); + } else if (Objects.equals(item.getHarmName(),"Pq_ThdU")) { + harmRealDataSet.setData1(FloatUtils.get2Float(item.getData())); } else { String numberStr = item.getHarmName().substring(item.getHarmName().lastIndexOf('_') + 1); String fieldName = "data" + numberStr; diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java index 7584a29..40adc37 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java @@ -1,16 +1,10 @@ package com.njcn.zlevent.api; -import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.ServerInfo; -import com.njcn.common.pojo.enums.common.LogEnum; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; -import com.njcn.common.utils.HttpResultUtil; import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.CldLogMessage; import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java index cc0322c..341753b 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java @@ -47,6 +47,8 @@ public class RealDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private CsTerminalReplyFeignClient csTerminalReplyFeignClient; + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(UpdateLedgerMessage updateLedgerMessage) { + log.info("分发至更新台账响应处理程序"); + //收到消息修改(cs_terminal_reply) + List data = updateLedgerMessage.getData(); + if (ObjectUtil.isNotEmpty(data)) { + data.forEach(item->{ + if (item.getCode() == 200) { + csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId()); + } else { + csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId()); + } + }); + } + } + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(UpdateLedgerMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(UpdateLedgerMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(UpdateLedgerMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(UpdateLedgerMessage message) { + super.dispatchMessage(message); + } +} diff --git a/pom.xml b/pom.xml index 8786e24..e2f2e51 100644 --- a/pom.xml +++ b/pom.xml @@ -31,16 +31,47 @@ 物联网交互模块 - - 192.168.1.22 - - 127.0.0.1 - - 192.168.1.22 - + + + + + + + + + + 192.168.1.103 + 192.168.1.103 + 192.168.1.103 ${middle.server.url}:18848 - - 415a1c87-33aa-47bd-8e25-13cc456c87ed + + + + + + + + + + + + + + + + + + + + + + + + + + + + ${middle.server.url}:8080