From 0b32c09fdb42a00f8bbd9f0c28d906b9659994f7 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Thu, 3 Jul 2025 08:59:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B2=BB=E7=90=86=E8=AE=BE=E5=A4=87=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/pojo/po/CsLineRunData.java | 48 +++++++++++ iot-access/access-boot/pom.xml | 6 ++ .../controller/CsLineRunDataController.java | 52 ++++++++++++ .../access/handler/MqttMessageHandler.java | 79 ++++++++++++++----- .../listener/RedisKeyExpirationListener.java | 23 ++++-- .../access/mapper/CsLineRunDataMapper.java | 16 ++++ .../njcn/access/runner/AutoAccessTimer.java | 2 - .../access/service/ICsLineRunDataService.java | 20 +++++ .../service/impl/CsDeviceServiceImpl.java | 1 + .../impl/CsEquipmentDeliveryServiceImpl.java | 2 + .../impl/CsLineRunDataServiceImpl.java | 26 ++++++ .../java/com/njcn/rt/api/RtFeignClient.java | 7 ++ .../api/fallback/RtClientFallbackFactory.java | 10 ++- .../com/njcn/rt/controller/RtController.java | 11 +++ .../java/com/njcn/rt/service/IRtService.java | 6 ++ .../njcn/rt/service/impl/RtServiceImpl.java | 30 +++++++ .../service/impl/CsAlarmServiceImpl.java | 19 ++++- 17 files changed, 327 insertions(+), 31 deletions(-) create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineRunData.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/controller/CsLineRunDataController.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsLineRunDataMapper.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/ICsLineRunDataService.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsLineRunDataServiceImpl.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineRunData.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineRunData.java new file mode 100644 index 0000000..88351ea --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineRunData.java @@ -0,0 +1,48 @@ +package com.njcn.access.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + *

+ * 治理设备模块运行状态记录表 + *

+ * + * @author xy + * @since 2025-06-26 + */ +@Getter +@Setter +@TableName("cs_line_run_data") +public class CsLineRunData implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 监测点id + */ + private String lineId; + + /** + * 子模块编号id(没有子模块,则为0) + */ + private Integer moduleId; + + /** + * 最新数据时间 + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime timeId; + + /** + * 子模块通讯状态(0:离线 1:连接) + */ + private Integer runState; + + +} diff --git a/iot-access/access-boot/pom.xml b/iot-access/access-boot/pom.xml index 3dc01ea..5598be1 100644 --- a/iot-access/access-boot/pom.xml +++ b/iot-access/access-boot/pom.xml @@ -20,6 +20,12 @@ + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + com.njcn access-api diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsLineRunDataController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsLineRunDataController.java new file mode 100644 index 0000000..204afd5 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsLineRunDataController.java @@ -0,0 +1,52 @@ +package com.njcn.access.controller; + +import com.njcn.access.pojo.po.CsLineRunData; +import com.njcn.access.service.ICsLineRunDataService; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + *

+ * 治理设备模块运行状态记录表 前端控制器 + *

+ * + * @author xy + * @since 2025-06-26 + */ +@RestController +@RequestMapping("/csLineRunData") +@Slf4j +@Api(tags = "治理设备数据运行记录") +@AllArgsConstructor +public class CsLineRunDataController extends BaseController { + + private final ICsLineRunDataService csLineRunDataService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/add") + @ApiOperation("新增数据") + @ApiImplicitParam(name = "list", value = "参数", required = true) + public HttpResult addData(@RequestBody @Validated List list){ + String methodDescribe = getMethodDescribe("addData"); + csLineRunDataService.addData(list); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} + diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 08cef38..44a00db 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -1,6 +1,7 @@ package com.njcn.access.handler; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DatePattern; import cn.hutool.core.util.IdUtil; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.excel.util.CollectionUtils; @@ -20,7 +21,6 @@ import com.njcn.access.pojo.dto.*; import com.njcn.access.pojo.dto.file.FileDto; import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.param.ReqAndResParam; -import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.pojo.po.CsTopic; import com.njcn.access.service.ICsDeviceOnlineLogsService; @@ -31,6 +31,7 @@ import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.*; +import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; import com.njcn.csdevice.pojo.param.CsLineParam; import com.njcn.csdevice.pojo.po.*; import com.njcn.device.biz.pojo.po.Overlimit; @@ -99,6 +100,7 @@ public class MqttMessageHandler { private final ChannelObjectUtil channelObjectUtil; private final WaveFeignClient waveFeignClient; private final RtFeignClient rtFeignClient; + private final CsCommunicateFeignClient csCommunicateFeignClient; @Autowired Validator validator; @@ -311,23 +313,12 @@ public class MqttMessageHandler { csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode()); csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); //记录设备上线 - CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); - CsDeviceOnlineLogs csDeviceOnlineLogs = new CsDeviceOnlineLogs(); - if(Objects.isNull(record)) { - csDeviceOnlineLogs.setNdid(nDid); - csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now()); - onlineLogsService.save(csDeviceOnlineLogs); - } else { - LocalDateTime time = record.getOfflineTime(); - if (!Objects.isNull(time)){ - csDeviceOnlineLogs.setNdid(nDid); - csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now()); - onlineLogsService.save(csDeviceOnlineLogs); - } else { - record.setOnlineTime(LocalDateTime.now()); - onlineLogsService.updateById(record); - } - } + PqsCommunicateDto dto = new PqsCommunicateDto(); + dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + dto.setDevId(nDid); + dto.setType(1); + dto.setDescription("通讯正常"); + csCommunicateFeignClient.insertion(dto); //询问设备软件信息 askDevData(nDid,version,1,mid); //更新治理监测点信息和设备容量 @@ -437,6 +428,13 @@ public class MqttMessageHandler { } } break; + case 15: + log.info("{}模块{}:处理实时数据", nDid, rspDataDto.getClDid()); + JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res)); + AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); + appAutoDataMessage.setId(nDid); + rtFeignClient.apfRtAnalysis(appAutoDataMessage); + break; case 48: log.info("询问装置项目列表"); logDto.setUserName("运维管理员"); @@ -461,10 +459,16 @@ public class MqttMessageHandler { break; } //csLogsFeignClient.addUserLog(logDto); + } else { + String result = getEnum(res.getCode()); + log.info(result); + logDto.setResult(0); + logDto.setFailReason(result); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(result); } } - /** * 装置心跳 && 主动数据上送 * fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件 @@ -742,4 +746,41 @@ public class MqttMessageHandler { publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); } + public String getEnum(Integer code) { + String result = null; + switch (code) { + case 201: + result = AccessEnum.START_CHANNEL.getMessage(); + break; + case 202: + result = AccessEnum.WAIT_CHANNEL.getMessage(); + break; + case 400: + result = AccessEnum.FAIL.getMessage(); + break; + case 401: + result = AccessEnum.ERROR.getMessage(); + break; + case 402: + result = AccessEnum.REFUSE_WAIT.getMessage(); + break; + case 403: + result = AccessEnum.REFUSE_UNKNOWN.getMessage(); + break; + case 404: + result = AccessEnum.NOT_FIND.getMessage(); + break; + case 405: + result = AccessEnum.BUSY.getMessage(); + break; + case 406: + result = AccessEnum.TIME_OUT.getMessage(); + break; + default: + result = AccessEnum.OTHER_ERROR.getMessage(); + break; + } + return result; + } + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 6d58a3b..80b6d6d 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,5 +1,6 @@ package com.njcn.access.listener; +import cn.hutool.core.date.DatePattern; import com.njcn.access.enums.AccessEnum; import com.njcn.access.pojo.dto.NoticeUserDto; import com.njcn.access.pojo.po.CsDeviceOnlineLogs; @@ -10,11 +11,9 @@ import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.access.utils.MqttUtil; import com.njcn.access.utils.SendMessageUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; -import com.njcn.csdevice.api.CsDeviceUserFeignClient; -import com.njcn.csdevice.api.CsLedgerFeignClient; -import com.njcn.csdevice.api.CsLogsFeignClient; -import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.api.*; import com.njcn.csdevice.pojo.dto.DevDetailDTO; +import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; @@ -75,6 +74,9 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private SendMessageUtil sendMessageUtil; @Resource private CsDeviceServiceImpl csDeviceServiceImpl; + @Resource + private CsCommunicateFeignClient csCommunicateFeignClient; + private final Object lock = new Object(); public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { @@ -116,9 +118,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene logDto.setOperate(nDid +"装置离线"); sendMessage(nDid); //记录装置掉线时间 - CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); - record.setOfflineTime(LocalDateTime.now()); - onlineLogsService.updateById(record); +// CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); +// record.setOfflineTime(LocalDateTime.now()); +// onlineLogsService.updateById(record); + //记录装置掉线时间 + PqsCommunicateDto dto = new PqsCommunicateDto(); + dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + dto.setDevId(nDid); + dto.setType(0); + dto.setDescription("通讯中断"); + csCommunicateFeignClient.insertion(dto); csLogsFeignClient.addUserLog(logDto); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsLineRunDataMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsLineRunDataMapper.java new file mode 100644 index 0000000..d29c0ad --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsLineRunDataMapper.java @@ -0,0 +1,16 @@ +package com.njcn.access.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.access.pojo.po.CsLineRunData; + +/** + *

+ * 治理设备模块运行状态记录表 Mapper 接口 + *

+ * + * @author xy + * @since 2025-06-26 + */ +public interface CsLineRunDataMapper extends BaseMapper { + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java index 81736ff..f004acd 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -16,7 +16,6 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.*; @@ -80,7 +79,6 @@ public class AutoAccessTimer implements ApplicationRunner { log.error("任务执行异常", e.getCause()); } } - // 关闭ExecutorService executor.shutdown(); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsLineRunDataService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsLineRunDataService.java new file mode 100644 index 0000000..08e5599 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsLineRunDataService.java @@ -0,0 +1,20 @@ +package com.njcn.access.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.access.pojo.po.CsLineRunData; + +import java.util.List; + +/** + *

+ * 治理设备模块运行状态记录表 服务类 + *

+ * + * @author xy + * @since 2025-06-26 + */ +public interface ICsLineRunDataService extends IService { + + void addData(List list); + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 2524a31..0f77c12 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -613,6 +613,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { logDto.setResult(1); logDto.setOperate(nDid + "接入失败,装置客户端不在线"); csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); } boolean result = false; Map modelMap = new HashMap<>(); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index 05fd8c2..b1c944c 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -17,6 +17,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -123,6 +124,7 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl list = this.lambdaQuery() .eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.OFFLINE.getCode()) .eq(CsEquipmentDeliveryPO::getUsageStatus,1) + .in(CsEquipmentDeliveryPO::getStatus, Arrays.asList(2,3)) .list(); if (CollUtil.isNotEmpty(list)) { list.forEach(item->{ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsLineRunDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsLineRunDataServiceImpl.java new file mode 100644 index 0000000..975c1f1 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsLineRunDataServiceImpl.java @@ -0,0 +1,26 @@ +package com.njcn.access.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.access.mapper.CsLineRunDataMapper; +import com.njcn.access.pojo.po.CsLineRunData; +import com.njcn.access.service.ICsLineRunDataService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + *

+ * 治理设备模块运行状态记录表 服务实现类 + *

+ * + * @author xy + * @since 2025-06-26 + */ +@Service +public class CsLineRunDataServiceImpl extends ServiceImpl implements ICsLineRunDataService { + + @Override + public void addData(List list) { + this.saveBatch(list); + } +} diff --git a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/RtFeignClient.java b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/RtFeignClient.java index c6bba3f..da3d915 100644 --- a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/RtFeignClient.java +++ b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/RtFeignClient.java @@ -5,7 +5,11 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.rt.api.fallback.RtClientFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +import java.util.Map; /** * @author xy @@ -15,4 +19,7 @@ public interface RtFeignClient { @PostMapping("/rtAnalysis") HttpResult analysis(AppAutoDataMessage appAutoDataMessage); + + @PostMapping("/apfRtAnalysis") + HttpResult> apfRtAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage); } diff --git a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/fallback/RtClientFallbackFactory.java b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/fallback/RtClientFallbackFactory.java index 5d38043..ad8497f 100644 --- a/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/fallback/RtClientFallbackFactory.java +++ b/iot-analysis/analysis-rt/rt-api/src/main/java/com/njcn/rt/api/fallback/RtClientFallbackFactory.java @@ -9,6 +9,8 @@ import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; + /** * @author xy */ @@ -27,7 +29,13 @@ public class RtClientFallbackFactory implements FallbackFactory { @Override public HttpResult analysis(AppAutoDataMessage appAutoDataMessage) { - log.error("{}异常,降级处理,异常为:{}","实时数据解析",cause.toString()); + log.error("{}异常,降级处理,异常为:{}","便携式实时数据解析",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + @Override + public HttpResult> apfRtAnalysis(AppAutoDataMessage appAutoDataMessage) { + log.error("{}异常,降级处理,异常为:{}","APF实时数据解析",cause.toString()); throw new BusinessException(finalExceptionEnum); } }; diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java index 7a4aac8..6e61a8c 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java @@ -19,6 +19,8 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.Map; + /** * 类的介绍: * @@ -45,5 +47,14 @@ public class RtController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/apfRtAnalysis") + @ApiOperation("APF实时数据解析") + @ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true) + public HttpResult apfRtAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){ + String methodDescribe = getMethodDescribe("apfRtAnalysis"); + rtService.apfRtAnalysis(appAutoDataMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } } diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/IRtService.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/IRtService.java index 0cb125d..7f4a87a 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/IRtService.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/IRtService.java @@ -8,4 +8,10 @@ import com.njcn.mq.message.AppAutoDataMessage; public interface IRtService { void analysis(AppAutoDataMessage appAutoDataMessage); + + /** + * APF数据解析 + * @param appAutoDataMessage + */ + void apfRtAnalysis(AppAutoDataMessage appAutoDataMessage); } diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java index 444d7f7..59359ef 100644 --- a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/service/impl/RtServiceImpl.java @@ -97,6 +97,36 @@ public class RtServiceImpl implements IRtService { } } + @Override + public void apfRtAnalysis(AppAutoDataMessage appAutoDataMessage) { + List dataArrayList; + String lineId; + //监测点id + if (appAutoDataMessage.getDid() == 1){ + lineId = appAutoDataMessage.getId() + "0"; + } else { + lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid(); + } + //获取监测点基础信息 + CsLinePO po = csLineFeignClient.getById(lineId).getData(); + //获取数据集 dataSet + Integer idx = appAutoDataMessage.getMsg().getDsNameIdx(); + CsDataSet dataSet = dataSetFeignClient.getDataSetByIdx(po.getDataModelId(),idx).getData(); + + String key = "BaseRealData:" + lineId + idx; + Object object = redisUtil.getObjectByKey(key); + if (Objects.isNull(object)){ + dataArrayList = saveBaseRealDataSet(key,dataSet.getId()); + } else { + dataArrayList = channelObjectUtil.objectToList(object,CsDataArray.class); + } + //根据dataArray解析数据 + AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0); + Map map = getData(dataArrayList,item); + int data = Math.round(map.get("Apf_ModWorkingSts" + "M")); + redisUtil.saveByKeyWithExpire("ApfRtData:" + appAutoDataMessage.getMid(),data,10L); + } + /** * 时间处理 */ diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java index 93ce47d..9b5fc4b 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java @@ -3,8 +3,11 @@ package com.njcn.zlevent.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.utils.ChannelObjectUtil; +import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csharmonic.pojo.po.CsEventPO; @@ -33,6 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static com.njcn.csdevice.enums.AlgorithmResponseEnum.DATA_ERROR; + /** *

* 告警事件表 服务实现类 @@ -63,7 +68,7 @@ public class CsAlarmServiceImpl extends ServiceImpl im //获取装置id CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData(); List dictTreeList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE),SysDicTreePO.class); - String code = dictTreeList.stream().filter(item->Objects.equals(item.getId(),po.getDevType())).findFirst().orElse(null).getCode(); + String code = dictTreeList.stream().filter(item->Objects.equals(item.getId(),po.getDevType())).findFirst().orElse(null).getCode(); try { //便携式设备 @@ -82,6 +87,17 @@ public class CsAlarmServiceImpl extends ServiceImpl im List dataArray = appEventMessage.getMsg().getDataArray(); for (AppEventMessage.DataArray item : dataArray) { eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); + tag = item.getName(); + //判断各模块事件,如果上次模块事件和这次一致,则不记录 + CsEventPO csEventPO = this.lambdaQuery().eq(CsEventPO::getLineId,lineId) + .eq(CsEventPO::getClDid,appEventMessage.getMsg().getClDid()) + .eq(CsEventPO::getProcess,po.getProcess()) + .orderByDesc(CsEventPO::getStartTime).last("LIMIT 1").one(); + if (csEventPO != null) { + if (Objects.equals(csEventPO.getTag(),tag)) { + throw new BusinessException(DATA_ERROR); + } + } //事件入库 CsEventPO csEvent = new CsEventPO(); csEvent.setLineId(lineId); @@ -90,7 +106,6 @@ public class CsAlarmServiceImpl extends ServiceImpl im csEvent.setProcess(po.getProcess()); csEvent.setCode(item.getCode()); csEvent.setStartTime(eventTime); - tag = item.getName(); csEvent.setTag(tag); csEvent.setType(3); csEvent.setClDid(appEventMessage.getMsg().getClDid());