diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java b/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java index 1900add..2584b2d 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/AskDeviceDataFeignClient.java @@ -1,15 +1,8 @@ package com.njcn.access.api; import com.njcn.access.api.fallback.AskDeviceDataClientFallbackFactory; -import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.ServerInfo; -import com.njcn.common.pojo.enums.common.LogEnum; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; -import com.njcn.common.utils.HttpResultUtil; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -41,4 +34,7 @@ public interface AskDeviceDataFeignClient { @PostMapping("/askRealData") HttpResult askRealData(@RequestParam("nDid") String nDid, @RequestParam("idx") Integer idx, @RequestParam("clDId") Integer clDId); + @PostMapping("/askCldRealData") + HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId); + } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java index f905e0a..a53bd82 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java @@ -73,6 +73,12 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory askCldRealData(String devId, String lineId) { + log.error("{}异常,降级处理,异常为:{}","询问云前置实时数据",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/utils/RedisSetUtil.java b/iot-access/access-api/src/main/java/com/njcn/access/utils/RedisSetUtil.java new file mode 100644 index 0000000..304f16f --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/utils/RedisSetUtil.java @@ -0,0 +1,68 @@ +package com.njcn.access.utils; + +import com.njcn.redis.utils.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * @author 徐扬 + */ +@Component +@Slf4j +public class RedisSetUtil { + + @Autowired + private RedisUtil redisUtil; + + /** + * 向Redis Set中添加元素 + */ + public void addToSet(String key, String value, long expireSeconds) { + try { + Object existing = redisUtil.getObjectByKey(key); + Set set = convertToSet(existing); + set.add(value); + redisUtil.saveByKeyWithExpire(key, set, expireSeconds); + } catch (Exception e) { + log.error("向Redis Set添加元素失败,key: {}", key, e); + } + } + + /** + * 从Redis Set中移除元素 + */ + public void removeFromSet(String key, String value) { + try { + Object existing = redisUtil.getObjectByKey(key); + if (existing != null) { + Set set = convertToSet(existing); + set.remove(value); + redisUtil.saveByKey(key, set); + } + } catch (Exception e) { + log.error("从Redis Set移除元素失败,key: {}", key, e); + } + } + + /** + * 安全的对象到Set转换 + */ + public Set convertToSet(Object obj) { + if (obj == null) { + return new HashSet<>(); + } + if (obj instanceof Set) { + return new HashSet<>((Set) obj); + } + if (obj instanceof Collection) { + return new HashSet<>((Collection) obj); + } + log.warn("无法转换的对象类型: {}", obj.getClass().getName()); + return new HashSet<>(); + } +} \ No newline at end of file diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java index f0c0ffd..99ecc1c 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/AskDeviceDataController.java @@ -123,5 +123,17 @@ public class AskDeviceDataController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/askCldRealData") + @ApiOperation("询问云前置实时数据") + @ApiImplicitParams({ + @ApiImplicitParam(name = "devId", value = "装置id"), + @ApiImplicitParam(name = "lineId", value = "监测点id") + }) + public HttpResult askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId){ + String methodDescribe = getMethodDescribe("askCldRealData"); + askDeviceDataService.askCldRealData(devId,lineId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 80b6d6d..bd0e13e 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,14 +1,17 @@ package com.njcn.access.listener; import cn.hutool.core.date.DatePattern; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.pojo.dto.NoticeUserDto; import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.access.utils.MqttUtil; +import com.njcn.access.utils.RedisSetUtil; import com.njcn.access.utils.SendMessageUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.csdevice.api.*; @@ -17,6 +20,7 @@ import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; +import com.njcn.rt.pojo.dto.BaseRealDataSet; import com.njcn.user.api.AppUserFeignClient; import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; @@ -32,6 +36,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -46,8 +51,6 @@ import java.util.stream.Collectors; @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { - @Resource - private ICsTopicService csTopicService; @Resource private ICsEquipmentDeliveryService csEquipmentDeliveryService; @Resource @@ -73,9 +76,11 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private SendMessageUtil sendMessageUtil; @Resource - private CsDeviceServiceImpl csDeviceServiceImpl; - @Resource private CsCommunicateFeignClient csCommunicateFeignClient; + @Resource + private MqttPublisher publisher; + @Resource + private RedisSetUtil redisSetUtil; private final Object lock = new Object(); @@ -101,6 +106,20 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene String nDid = expiredKey.split(":")[1]; executeMainTask(nDid); } + if(expiredKey.startsWith("cldRtDataOverTime:")){ + String lineId = expiredKey.split(":")[1]; + Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId); + if (ObjectUtil.isNotNull(redisObject)) { + Set userSet = redisSetUtil.convertToSet(redisObject); + userSet.forEach(userId->{ + BaseRealDataSet baseRealDataSet = new BaseRealDataSet(); + baseRealDataSet.setUserId(userId); + baseRealDataSet.setResult(false); + baseRealDataSet.setContent("设备未响应,超时中断"); + publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false); + }); + } + } } //主任务 @@ -118,10 +137,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene logDto.setOperate(nDid +"装置离线"); sendMessage(nDid); //记录装置掉线时间 -// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); -// record.setOfflineTime(LocalDateTime.now()); -// onlineLogsService.updateById(record); - //记录装置掉线时间 PqsCommunicateDto dto = new PqsCommunicateDto(); dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); dto.setDevId(nDid); @@ -131,68 +146,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene csLogsFeignClient.addUserLog(logDto); } - //主任务 - //1.装置心跳断连 - //2.MQTT客户端不在线 -// private void executeMainTask(String nDid, String version) { -// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); -// log.info("{}->装置离线", nDid); -// DeviceLogDTO logDto = new DeviceLogDTO(); -// logDto.setUserName("运维管理员"); -// logDto.setLoginName("njcnyw"); -// //判断mqtt -// String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); -// boolean mqttClient = mqttUtil.judgeClientOnline(clientName); -// //心跳异常,但是客户端在线,则发送接入请求 -// if (mqttClient) { -// csDeviceService.devAccessAskTemplate(nDid,version,1); -// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); -// try { -// Thread.sleep(5000); -// Object object = redisUtil.getObjectByKey("online" + nDid); -// if (Objects.nonNull(object)) { -// scheduler.shutdown(); -// logDto.setOperate(nDid + "客户端在线重连成功"); -// } else { -// //装置下线 -// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); -// //装置调整为注册状态 -// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); -// logDto.setOperate(nDid +"装置离线"); -// sendMessage(nDid); -// -// //记录装置掉线时间 -// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); -// record.setOfflineTime(LocalDateTime.now()); -// onlineLogsService.updateById(record); -// -// scheduler.shutdown(); -// } -// } catch (InterruptedException e) { -// scheduler.shutdown(); -// throw new RuntimeException(e); -// } -// csLogsFeignClient.addUserLog(logDto); -// } -// //客户端不在线则修改装置状态,进入定时任务 -// else { -// //装置下线 -// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); -// //装置调整为注册状态 -// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode()); -// logDto.setOperate(nDid +"主任务执行失败,装置下线"); -// csLogsFeignClient.addUserLog(logDto); -// sendMessage(nDid); -// -// //记录装置掉线时间 -// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); -// record.setOfflineTime(LocalDateTime.now()); -// onlineLogsService.updateById(record); -// -// scheduler.shutdown(); -// } -// } - private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { synchronized (lock) { //判断是否推送消息 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java index dda1777..97852ad 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -203,8 +203,7 @@ public class AutoAccessTimer implements ApplicationRunner { // } // }; // //第一次执行的时间为120s,然后在前一个任务执行完毕后,等待120s再执行下一个任务 -//// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); -// scheduler.scheduleWithFixedDelay(task, 10, 10, TimeUnit.SECONDS); +// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); // } // // public void accessDev(List list) { diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java index 9fdae95..e16342f 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/AskDeviceDataService.java @@ -21,4 +21,6 @@ public interface AskDeviceDataService { * 实时数据请求报文 */ void askRealData(String nDid, Integer idx, Integer size); + + void askCldRealData(String devId, String lineId); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index 1f72c2d..c317ab3 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -11,6 +11,8 @@ import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.service.AskDeviceDataService; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.enums.AlgorithmResponseEnum; +import com.njcn.mq.message.RealDataMessage; +import com.njcn.mq.template.RealDataMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import lombok.RequiredArgsConstructor; @@ -32,6 +34,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { private final MqttPublisher publisher; private final CsTopicFeignClient csTopicFeignClient; private final RedisUtil redisUtil; + private final RealDataMessageTemplate realDataMessageTemplate; private static Integer mid = 1; private static Integer range = 51200; @@ -209,6 +212,16 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } + @Override + public void askCldRealData(String devId, String lineId) { + RealDataMessage realDataMessage = new RealDataMessage(); + realDataMessage.setDevSeries(devId); + realDataMessage.setLine(lineId); + realDataMessage.setRealData(true); + realDataMessage.setSoeData(true); + realDataMessage.setLimit(20); + realDataMessageTemplate.sendMember(realDataMessage); + } public Object getDeviceMid(String nDid) { return redisUtil.getObjectByKey(AppRedisKey.DEVICE_MID + nDid); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 20da7c3..a7d9e0a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -726,7 +726,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { } } publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false); - redisUtil.saveByKeyWithExpire("startFile:" + nDid, null, 60L); +// redisUtil.saveByKeyWithExpire("startFile:" + nDid, null, 60L); result = true; } catch (Exception e) { DeviceLogDTO logDto = new DeviceLogDTO(); diff --git a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/pojo/dto/BaseRealDataSet.java b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/pojo/dto/BaseRealDataSet.java index 13588b8..88b3751 100644 --- a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/pojo/dto/BaseRealDataSet.java +++ b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/pojo/dto/BaseRealDataSet.java @@ -14,6 +14,12 @@ public class BaseRealDataSet implements Serializable { @ApiModelProperty("用户ID") private String userId; + @ApiModelProperty("结果(仅超时使用)") + private boolean result = true; + + @ApiModelProperty("描述") + private String content; + @ApiModelProperty("监测点id") private String lineId; diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java index 59359ef..bfa860b 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java @@ -1,9 +1,11 @@ package com.njcn.rt.service.impl; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ObjectUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.utils.ChannelObjectUtil; +import com.njcn.access.utils.RedisSetUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.CsLineFeignClient; @@ -21,6 +23,7 @@ import com.njcn.rt.pojo.dto.HarmRealDataSet; import com.njcn.rt.service.IRtService; import com.njcn.web.utils.FloatUtils; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.lang.reflect.Field; @@ -36,6 +39,7 @@ import java.util.stream.Collectors; /** * @author xy */ +@Slf4j @Service @RequiredArgsConstructor public class RtServiceImpl implements IRtService { @@ -46,14 +50,14 @@ public class RtServiceImpl implements IRtService { private final RedisUtil redisUtil; private final ChannelObjectUtil channelObjectUtil; private final MqttPublisher publisher; + private final RedisSetUtil redisSetUtil; @Override public void analysis(AppAutoDataMessage appAutoDataMessage) { List dataArrayList; //监测点id String lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid(); - //用户Id - String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString(); + redisUtil.delete("cldRtDataOverTime:"+lineId); //获取监测点基础信息 CsLinePO po = csLineFeignClient.getById(lineId).getData(); //获取数据集 dataSet @@ -73,6 +77,8 @@ public class RtServiceImpl implements IRtService { //fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整 //基础数据 if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) { + //用户Id + String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString(); BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType()); baseRealDataSet.setUserId(userId); baseRealDataSet.setLineId(lineId); @@ -82,9 +88,28 @@ public class RtServiceImpl implements IRtService { long timestamp = item.getDataTimeSec() - 8*3600; baseRealDataSet.setDataTime(getTime(timestamp)); publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false); + } else if (dataSet.getName().contains("实时数据集合")) { + //用户Id + Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId); + if (ObjectUtil.isNotNull(redisObject)) { + Set userSet = redisSetUtil.convertToSet(redisObject); + userSet.forEach(userId->{ + BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType()); + baseRealDataSet.setUserId(userId); + baseRealDataSet.setLineId(lineId); + baseRealDataSet.setPt(po.getPtRatio().floatValue()); + baseRealDataSet.setCt(po.getCtRatio().floatValue()); + baseRealDataSet.setDataLevel(dataSet.getDataLevel()); + long timestamp = item.getDataTimeSec(); + baseRealDataSet.setDataTime(getTime(timestamp)); + publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false); + }); + } } //fixme 目前实时数据只有基础数据和谐波数据,后期拓展,这边需要再判断 else { + //用户Id + String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString(); HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio()); harmRealDataSet.setUserId(userId); harmRealDataSet.setLineId(lineId); @@ -322,4 +347,26 @@ public class RtServiceImpl implements IRtService { return harmRealDataSet; } + private Set convertObjectToSetSafe(Object obj) { + if (obj == null) { + return new HashSet<>(); + } + if (obj instanceof Set) { + // 类型安全的转换 + Set rawSet = (Set) obj; + return rawSet.stream() + .filter(Objects::nonNull) + .map(Object::toString) + .collect(Collectors.toSet()); + } else if (obj instanceof Collection) { + return ((Collection) obj).stream() + .filter(Objects::nonNull) + .map(Object::toString) + .collect(Collectors.toSet()); + } else { + log.warn("Redis中的对象类型不是Set或Collection: {}", obj.getClass().getName()); + return new HashSet<>(); + } + } + } diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 330469f..85d7538 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -33,7 +33,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.Instant; -import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.*; @@ -85,7 +84,7 @@ public class StatServiceImpl implements IStatService { List poList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DEVICE_LIST),CsEquipmentDeliveryPO.class); CsEquipmentDeliveryPO po = poList.stream().filter(item->Objects.equals(item.getNdid(),appAutoDataMessage.getId())).findFirst().orElse(null); List dictTreeList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE),SysDicTreePO.class); - String code = dictTreeList.stream().filter(item->Objects.equals(item.getId(),po.getDevType())).findFirst().orElse(null).getCode(); + String code = Objects.requireNonNull(dictTreeList.stream().filter(item -> Objects.equals(item.getId(), po.getDevType())).findFirst().orElse(null)).getCode(); //便携式设备 if (Objects.equals(DicDataEnum.PORTABLE.getCode(),code)) { @@ -93,13 +92,19 @@ public class StatServiceImpl implements IStatService { } //直连设备 else if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) { - if (Objects.equals(appAutoDataMessage.getDid(),1)){lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get("0").toString(); - } else if (Objects.equals(appAutoDataMessage.getDid(),2)){ + if (Objects.equals(appAutoDataMessage.getDid(),1)) { + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get("0").toString(); + } else if (Objects.equals(appAutoDataMessage.getDid(),2)) { lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString(); } } + //云前置设备 + else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) { + lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid(); + } + //获取当前设备信息 - if (CollectionUtil.isNotEmpty(list)){ + if (CollectionUtil.isNotEmpty(list)) { List recordList = new ArrayList<>(); for (AppAutoDataMessage.DataArray item : list) { switch (item.getDataAttr()) { diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index 30adf51..8572115 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -3,6 +3,7 @@ package com.njcn.zlevent.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient; @@ -92,7 +93,6 @@ public class EventServiceImpl implements IEventService { //获取设备类型 true:治理设备 false:其他类型的设备 boolean devModel = equipmentFeignClient.judgeDevModel(appEventMessage.getId()).getData(); try { -// lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); if (devModel) { if (Objects.equals(appEventMessage.getDid(),1)){ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); @@ -103,74 +103,81 @@ public class EventServiceImpl implements IEventService { lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString(); } - - //处理事件数据 List dataArray = appEventMessage.getMsg().getDataArray(); for (AppEventMessage.DataArray item : dataArray) { eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - id = IdUtil.fastSimpleUUID(); - //事件入库 - CsEventPO csEvent = new CsEventPO(); - csEvent.setId(id); - csEvent.setDeviceId(po.getId()); - csEvent.setProcess(po.getProcess()); - csEvent.setCode(item.getCode()); - eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - csEvent.setStartTime(eventTime); - tag = item.getName(); - csEvent.setTag(tag); - if (Objects.equals(item.getType(),"2")){ - csEvent.setType(0); - } else if (Objects.equals(item.getType(),"3")){ - csEvent.setType(1); - } else if (Objects.equals(item.getType(),"1")){ - csEvent.setType(2); - } - csEvent.setLevel(Integer.parseInt(item.getType())); - csEvent.setClDid(appEventMessage.getMsg().getClDid()); - csEvent.setLineId(lineId); - //参数入库 - Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); - //判断是否有参数 - List params = item.getParam(); - if (CollectionUtil.isNotEmpty(params)) { - String tableName = map.get(item.getName()); - Map tags = new HashMap<>(); - tags.put(InfluxDBTableConstant.UUID,id); - Map fields = new HashMap<>(); - for (AppEventMessage.Param param : params) { - if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){ - csEvent.setPersistTime(Double.parseDouble(param.getData().toString())); - } - fields.put(param.getName(),param.getData()); + //判断事件是否存在,如果存在则不处理 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CsEventPO::getDeviceId,po.getId()) + .eq(CsEventPO::getTag,tag) + .eq(CsEventPO::getStartTime,eventTime) + .eq(CsEventPO::getLineId,lineId); + List eventList = csEventService.list(queryWrapper); + if (CollectionUtil.isEmpty(eventList)) { + id = IdUtil.fastSimpleUUID(); + //事件入库 + CsEventPO csEvent = new CsEventPO(); + csEvent.setId(id); + csEvent.setDeviceId(po.getId()); + csEvent.setProcess(po.getProcess()); + csEvent.setCode(item.getCode()); + eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); + csEvent.setStartTime(eventTime); + tag = item.getName(); + csEvent.setTag(tag); + if (Objects.equals(item.getType(),"2")){ + csEvent.setType(0); + } else if (Objects.equals(item.getType(),"3")){ + csEvent.setType(1); + } else if (Objects.equals(item.getType(),"1")){ + csEvent.setType(2); } - //只有治理型号的设备有监测位置 - if (devModel) { - if (appEventMessage.getMsg().getClDid() == 1) { - fields.put("Evt_Param_Position","电网侧"); - csEvent.setLocation("grid"); - } else if (appEventMessage.getMsg().getClDid() == 2) { - fields.put("Evt_Param_Position","负载侧"); - csEvent.setLocation("load"); + csEvent.setLevel(Integer.parseInt(item.getType())); + csEvent.setClDid(appEventMessage.getMsg().getClDid()); + csEvent.setLineId(lineId); + //参数入库 + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + //判断是否有参数 + List params = item.getParam(); + if (CollectionUtil.isNotEmpty(params)) { + String tableName = map.get(item.getName()); + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.UUID,id); + Map fields = new HashMap<>(); + for (AppEventMessage.Param param : params) { + if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){ + csEvent.setPersistTime(Double.parseDouble(param.getData().toString())); + } + fields.put(param.getName(),param.getData()); } + //只有治理型号的设备有监测位置 + if (devModel) { + if (appEventMessage.getMsg().getClDid() == 1) { + fields.put("Evt_Param_Position","电网侧"); + csEvent.setLocation("grid"); + } else if (appEventMessage.getMsg().getClDid() == 2) { + fields.put("Evt_Param_Position","负载侧"); + csEvent.setLocation("load"); + } + } + //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); } - //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 - Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); - batchPoints.point(point); - records.add(batchPoints.lineProtocol()); + list1.add(csEvent); + //事件处理日志库 + CsEventLogs csEventLogs = new CsEventLogs(); + csEventLogs.setLineId(lineId); + csEventLogs.setDeviceId(po.getId()); + csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); + csEventLogs.setTag(item.getName()); + csEventLogs.setStatus(1); + csEventLogs.setTime(LocalDateTime.now()); + csEventLogsService.save(csEventLogs); } - list1.add(csEvent); - //事件处理日志库 - CsEventLogs csEventLogs = new CsEventLogs(); - csEventLogs.setLineId(lineId); - csEventLogs.setDeviceId(po.getId()); - csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); - csEventLogs.setTag(item.getName()); - csEventLogs.setStatus(1); - csEventLogs.setTime(LocalDateTime.now()); - csEventLogsService.save(csEventLogs); } //cs_event入库 if (CollectionUtil.isNotEmpty(list1)){ diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java index 36b620a..71e35d2 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java @@ -15,7 +15,6 @@ import org.springframework.context.annotation.DependsOn; @Slf4j @EnableFeignClients(basePackages = "com.njcn") @SpringBootApplication(scanBasePackages = "com.njcn") -@DependsOn("proxyMapperRegister") public class MessageBootApplication { public static void main(String[] args) { diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java new file mode 100644 index 0000000..cc0322c --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/RealDataConsumer.java @@ -0,0 +1,135 @@ +package com.njcn.message.consumer; + +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.rt.api.RtFeignClient; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.CLD_HANDLE_REAL_DATA_TOPIC, + consumerGroup = BusinessTopic.CLD_HANDLE_REAL_DATA_TOPIC, + selectorExpression = BusinessTopic.AppDataTag.RT_TAG, + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class RealDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private RtFeignClient rtFeignClient; + + @Override + protected void handleMessage(AppAutoDataMessage appAutoDataMessage) { + log.info("分发至实时数据"); + rtFeignClient.analysis(appAutoDataMessage); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(AppAutoDataMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(AppAutoDataMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(AppAutoDataMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(AppAutoDataMessage appAutoDataMessage) { + super.dispatchMessage(appAutoDataMessage); + } +}