From 335997cdf65e8a7c25c82b82c9bfdbadd50dcaff Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Fri, 13 Sep 2024 20:41:18 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=BD=95=E6=B3=A2=E6=96=87=E4=BB=B6=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E5=A4=84=E7=90=86=EF=BC=9B=202.=E8=A3=85=E7=BD=AE?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=91=8A=E8=AD=A6=E4=BA=8B=E4=BB=B6=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AskDeviceDataClientFallbackFactory.java | 2 +- .../com/njcn/access/pojo/dto/AutoDataDto.java | 4 + .../com/njcn/access/pojo/dto/EventDto.java | 4 + .../access/handler/MqttMessageHandler.java | 21 +++ .../listener/RedisKeyExpirationListener.java | 8 +- .../impl/AskDeviceDataServiceImpl.java | 4 +- .../service/impl/CsDevModelServiceImpl.java | 7 +- .../njcn/zlevent/api/EventFeignClient.java | 4 + .../njcn/zlevent/api/EvtErrorFeignClient.java | 19 +++ .../com/njcn/zlevent/api/WaveFeignClient.java | 4 +- .../fallback/EventClientFallbackFactory.java | 6 + .../EvtErrorClientFallbackFactory.java | 35 +++++ .../fallback/WaveClientFallbackFactory.java | 7 +- .../njcn/zlevent/pojo/dto/FileStreamDto.java | 3 +- .../njcn/zlevent/pojo/dto/WaveTimeDto.java | 4 + .../com/njcn/zlevent/pojo/po/CsDevErrEvt.java | 43 +++++ .../controller/CsDevErrEvtController.java | 50 ++++++ .../zlevent/controller/WaveController.java | 2 +- .../listener/RedisKeyExpirationListener.java | 117 +++++++++++--- .../zlevent/mapper/CsDevErrEvtMapper.java | 16 ++ .../zlevent/service/ICsDevErrEvtService.java | 23 +++ .../service/ICsWaveAnalysisService.java | 2 + .../service/impl/CsAlarmServiceImpl.java | 38 ++--- .../service/impl/CsDevErrEvtServiceImpl.java | 61 ++++++++ .../impl/CsWaveAnalysisServiceImpl.java | 57 ++++--- .../service/impl/EventServiceImpl.java | 147 ++++++++---------- .../zlevent/service/impl/FileServiceImpl.java | 56 +++---- .../message/consumer/AppEventConsumer.java | 7 + 28 files changed, 567 insertions(+), 184 deletions(-) create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EvtErrorFeignClient.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EvtErrorClientFallbackFactory.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsDevErrEvt.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsDevErrEvtController.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsDevErrEvtMapper.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsDevErrEvtService.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsDevErrEvtServiceImpl.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java index 238c80a..0ff180f 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/AskDeviceDataClientFallbackFactory.java @@ -45,7 +45,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory downloadFile(String nDid, String name, Integer size, String fileCheck) { log.error("{}异常,降级处理,异常为:{}","文件下载",cause.toString()); - redisUtil.delete("fileDowning"); + redisUtil.delete("fileDowning:" + nDid); redisUtil.delete("fileCheck"+name); throw new BusinessException(finalExceptionEnum); } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java index 97adeaa..d241fe5 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java @@ -70,6 +70,10 @@ public class AutoDataDto { @ApiModelProperty("数据是否参与合格率统计") private Integer dataTag; + @SerializedName("Code") + @ApiModelProperty("事件码") + private Integer code; + @SerializedName("Data") private String data; diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java index 95b38da..a05bb4a 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java @@ -76,6 +76,10 @@ public class EventDto { @ApiModelProperty("告警故障编码(一般显示为Hex)") private String code; + @SerializedName("DataTag") + @ApiModelProperty("数据标识,1-标识数据异常") + private Integer dataTag; + @SerializedName("Parm") private List param; } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 5f15eb8..6e6366a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -638,6 +638,27 @@ public class MqttMessageHandler { } } + /** + * 装置异常事件记录 + * @param topic + * @param message + * @param version + * @param nDid + * @param payload + */ + @MqttSubscribe(value = "/Dev/Error/{edgeId}",qos = 1) + @Transactional(rollbackFor = Exception.class) + public void devErrorInfo(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) { + //解析数据 + Gson gson = new Gson(); + log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); + EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class); + JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto)); + AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class); + appEventMessage.setId(nDid); + appEventMessageTemplate.sendMember(appEventMessage); + } + private void saveDirectoryInfo(List dirInfo, String key) { if (!CollectionUtil.isEmpty(dirInfo)) { redisUtil.saveByKeyWithExpire(key, dirInfo, 10L); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index c11fab3..c100461 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -167,17 +167,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene return; } else { logDto.setResult(0); - //重连三次仍未成功,则推送告警消息 + //一个小时未连接上,则推送告警消息 MAX_WARNING_TIMES++; - if (MAX_WARNING_TIMES == 3) { + if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); sendEventToUser(dto2); } } } else { - //重连三次仍未成功,则推送告警消息 + //一个小时未连接上,则推送告警消息 MAX_WARNING_TIMES++; - if (MAX_WARNING_TIMES == 3) { + if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); sendEventToUser(dto2); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java index b979e53..76d37c7 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/AskDeviceDataServiceImpl.java @@ -84,7 +84,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { @Override public boolean downloadFile(String nDid, String name, Integer size, String fileCheck) { boolean result = true; - redisUtil.saveByKeyWithExpire("fileDowning","fileDowning",300L); + redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",300L); redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,300L); int length = size/51200 + 1; try { @@ -110,7 +110,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService { redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); } } catch (Exception e) { - redisUtil.delete("fileDowning"); + redisUtil.delete("fileDowning:"+nDid); redisUtil.delete("fileCheck"+name); throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 35e9246..776aebe 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -170,7 +170,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (Objects.nonNull(object)) { throw new BusinessException(AlgorithmResponseEnum.FILE_UPLOADING); } - Object object2 = redisUtil.getObjectByKey("fileDowning"); + Object object2 = redisUtil.getObjectByKey("fileDowning:" + id); if (Objects.nonNull(object2)) { throw new BusinessException(AlgorithmResponseEnum.FILE_BUSY); } @@ -190,6 +190,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService { //需要循环的次数 int times = bytes.length / cap + 1; for (int i = 1; i <= times; i++) { + //发送数据给前端 + String json = "{allStep:\""+times+"\",nowStep:"+i+"}"; + publisher.send("/Web/Progress", new Gson().toJson(json), 1, false); DeviceLogDTO logDto = new DeviceLogDTO(); byte[] lsBytes; if (length > 50*1024) { @@ -219,6 +222,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { csLogsFeignClient.addUserLog(logDto); } } else { + String json = "{allStep:\""+1+"\",nowStep:"+1+"}"; + publisher.send("/Web/Progress", new Gson().toJson(json), 1, false); ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0,hexString); publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); DeviceLogDTO logDto = new DeviceLogDTO(); diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java index 42a4fad..c9d02b9 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java @@ -15,4 +15,8 @@ public interface EventFeignClient { @PostMapping("/analysis") HttpResult analysis(AppEventMessage appEventMessage); + + @PostMapping("/errorEvent") + HttpResult insertErrorEvent(AppEventMessage appEventMessage); + } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EvtErrorFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EvtErrorFeignClient.java new file mode 100644 index 0000000..bdd5de5 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EvtErrorFeignClient.java @@ -0,0 +1,19 @@ +package com.njcn.zlevent.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.fallback.EvtErrorClientFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/csDevErrEvt", fallbackFactory = EvtErrorClientFallbackFactory.class,contextId = "csDevErrEvt") +public interface EvtErrorFeignClient { + + @PostMapping("/errorEvent") + HttpResult insertErrorEvent(AppEventMessage appEventMessage); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java index 17be4ec..ac8a313 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java @@ -3,14 +3,14 @@ package com.njcn.zlevent.api; import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppEventMessage; -import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; +import com.njcn.zlevent.api.fallback.WaveClientFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; /** * @author xy */ -@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/wave", fallbackFactory = EventClientFallbackFactory.class,contextId = "wave") +@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/wave", fallbackFactory = WaveClientFallbackFactory.class,contextId = "wave") public interface WaveFeignClient { @PostMapping("/analysis") diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java index 0be5fa7..495882d 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java @@ -30,6 +30,12 @@ public class EventClientFallbackFactory implements FallbackFactory insertErrorEvent(AppEventMessage appEventMessage) { + log.error("{}异常,降级处理,异常为:{}","异常事件统计",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EvtErrorClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EvtErrorClientFallbackFactory.java new file mode 100644 index 0000000..63222ea --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EvtErrorClientFallbackFactory.java @@ -0,0 +1,35 @@ +package com.njcn.zlevent.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.EvtErrorFeignClient; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class EvtErrorClientFallbackFactory implements FallbackFactory { + @Override + public EvtErrorFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new EvtErrorFeignClient() { + + @Override + public HttpResult insertErrorEvent(AppEventMessage appEventMessage) { + log.error("{}异常,降级处理,异常为:{}","异常事件统计",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java index 2fcdf17..dcd6b8a 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppEventMessage; import com.njcn.zlevent.api.EventFeignClient; +import com.njcn.zlevent.api.WaveFeignClient; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -14,16 +15,16 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -public class WaveClientFallbackFactory implements FallbackFactory { +public class WaveClientFallbackFactory implements FallbackFactory { @Override - public EventFeignClient create(Throwable cause) { + public WaveFeignClient create(Throwable cause) { //判断抛出异常是否为解码器抛出的业务异常 Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; if (cause.getCause() instanceof BusinessException) { BusinessException businessException = (BusinessException) cause.getCause(); } Enum finalExceptionEnum = exceptionEnum; - return new EventFeignClient() { + return new WaveFeignClient() { @Override public HttpResult analysis(AppEventMessage appEventMessage) { diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java index 3596013..848883e 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java @@ -5,6 +5,7 @@ import lombok.Data; import java.io.Serializable; import java.util.HashSet; import java.util.List; +import java.util.Set; /** * 类的介绍: @@ -22,6 +23,6 @@ public class FileStreamDto implements Serializable { private Integer frameLen; - private List list; + private Set list; } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java index d131772..5abaa5a 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java @@ -12,8 +12,12 @@ import lombok.Data; @Data public class WaveTimeDto { + private String fileName; + private String deviceId; + private String nDid; + private String lineId; private String startTime; diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsDevErrEvt.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsDevErrEvt.java new file mode 100644 index 0000000..300e462 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsDevErrEvt.java @@ -0,0 +1,43 @@ +package com.njcn.zlevent.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + *

+ * 装置异常事件统计 + *

+ * + * @author xy + * @since 2024-09-12 + */ +@Data +@TableName("cs_dev_err_evt") +public class CsDevErrEvt { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 设备识别码 + */ + private String ndid; + + /** + * 事件发生时间 + */ + private LocalDateTime evtTime; + + /** + * 事件code编码 + */ + private String code; + + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsDevErrEvtController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsDevErrEvtController.java new file mode 100644 index 0000000..46e6d87 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsDevErrEvtController.java @@ -0,0 +1,50 @@ +package com.njcn.zlevent.controller; + + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.service.ICsDevErrEvtService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + *

+ * 装置异常事件统计 前端控制器 + *

+ * + * @author xy + * @since 2024-09-12 + */ +@RestController +@Slf4j +@RequestMapping("/csDevErrEvt") +@Api(tags = "装置异常事件处理") +@AllArgsConstructor +public class CsDevErrEvtController extends BaseController { + + private final ICsDevErrEvtService csDevErrEvtService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/errorEvent") + @ApiOperation("异常事件统计") + @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true) + public HttpResult insertErrorEvent(@RequestBody AppEventMessage appEventMessage){ + String methodDescribe = getMethodDescribe("insertErrorEvent"); + csDevErrEvtService.insertErrorEvent(appEventMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} + diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java index f480bff..d6b41ee 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java @@ -36,7 +36,7 @@ public class WaveController extends BaseController { @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/analysis") - @ApiOperation("录波解析") + @ApiOperation("录波事件") @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true) public HttpResult analysis(@RequestBody AppEventMessage appEventMessage){ String methodDescribe = getMethodDescribe("analysis"); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index 8e7fb5e..f85f94f 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -8,13 +8,18 @@ import com.njcn.access.api.CsTopicFeignClient; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.access.pojo.dto.file.FileRedisDto; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.stat.enums.StatResponseEnum; import com.njcn.system.api.EpdFeignClient; import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.zlevent.pojo.dto.FileStreamDto; +import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.service.ICsWaveAnalysisService; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; import org.apache.commons.lang3.StringUtils; @@ -25,10 +30,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.IntStream; /** @@ -48,6 +50,13 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private CsTopicFeignClient csTopicFeignClient; @Resource private MqttPublisher publisher; + @Resource + private ChannelObjectUtil channelObjectUtil; + @Resource + private ICsWaveAnalysisService iCsWaveAnalysisService; + private static Integer mid = 1; + private static Integer range = 51200; + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @@ -85,35 +94,103 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene int end = dto.getTotal(); IntStream.rangeClosed(start, end) .filter(i -> !dto.getList().contains(i)) - .forEach(missingNumber -> { - log.info("缺失的数字:{}",missingNumber); - missingList.add(missingNumber); - }); - redisUtil.saveByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName), missingList); - Integer offset = (missingList.get(0) - 1) * dto.getFrameLen(); - askMissingFileStream(dto.getNDid(),missingList.get(0),fileName,offset,dto.getFrameLen()); + .forEach(missingList::add); + Object object = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName)); + if (CollectionUtil.isNotEmpty(missingList) && Objects.isNull(object)) { + downloadFile(missingList,dto.getNDid(),fileName); + } } } + //请求缺失的数据 + public void downloadFile( List missingList, String nDid, String name) { + for (Integer missingNumber : missingList) { + int i = missingNumber - 1; + Object object = getDeviceMid(nDid); + if (!Objects.isNull(object)) { + mid = (Integer) object; + } + ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i); + publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); + //判断是否重发 + sendNextStep(name,nDid,mid,i,nDid); + FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid); + //重发之后判断继续循环还是跳出循环 + if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) { + break; + } + mid = mid + 1; + if (mid > 10000) { + mid = 1; + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); + } + } - public void askMissingFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) { - String version = csTopicFeignClient.find(nDid).getData(); + public Object getDeviceMid(String nDid) { + return redisUtil.getObjectByKey(AppRedisKey.DEVICE_MID + nDid); + } + + /** + * 文件下载请求报文 + */ + public ReqAndResDto.Req getPojo(Integer mid, String fileName, Integer step) { + String json; ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); reqAndResParam.setMid(mid); reqAndResParam.setDid(0); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode())); reqAndResParam.setExpire(-1); - String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}"; + json = "{Name:\""+fileName+"\",TransferMode:"+1+",Offset:"+(step*range)+",Len:"+range+"}"; JSONObject jsonObject = JSONObject.fromObject(json); reqAndResParam.setMsg(jsonObject); - publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); - log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam)); + return reqAndResParam; } - - - - + /** + * 根据装置响应来判断是否询问下一帧数据 + */ + public void sendNextStep(String fileName, String id, int mid,int step, String nDid) { + try { + for (int i = 1; i < 31; i++) { + if (step == 0 ){ + Thread.sleep(5000); + } else { + Thread.sleep(2000); + } + FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + fileName + mid); + if (Objects.isNull(fileRedisDto)) { + FileRedisDto failDto = new FileRedisDto(); + failDto.setCode(400); + redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L); + } else { + if (Objects.equals(fileRedisDto.getCode(),200)) { + redisUtil.delete("handleEvent:" + nDid); + redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName)); + redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + //删除已经处理完的文件,之后再判断还有是否需要下载的文件 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class); + fileDto.removeIf(item2 -> item2.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + nDid, fileDto); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(nDid); + } + break; + } else { + log.info("第" +i+"次尝试"); + //尝试失败则设置code为400,如果装置响应了,则会将code置为200 + FileRedisDto failDto = new FileRedisDto(); + failDto.setCode(400); + redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L); + ReqAndResDto.Req req = getPojo(mid,fileName,step); + publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false); + } + } + } + } catch (Exception e) { + throw new BusinessException(AlgorithmResponseEnum.ASK_DEVICE_DIR_ERROR); + } + } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsDevErrEvtMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsDevErrEvtMapper.java new file mode 100644 index 0000000..ac729dd --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsDevErrEvtMapper.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsDevErrEvt; + +/** + *

+ * 装置异常事件统计 Mapper 接口 + *

+ * + * @author xy + * @since 2024-09-12 + */ +public interface CsDevErrEvtMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsDevErrEvtService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsDevErrEvtService.java new file mode 100644 index 0000000..72b9023 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsDevErrEvtService.java @@ -0,0 +1,23 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.pojo.po.CsDevErrEvt; + +/** + *

+ * 装置异常事件统计 服务类 + *

+ * + * @author xy + * @since 2024-09-12 + */ +public interface ICsDevErrEvtService extends IService { + + /** + * 将装置推送的异常事件统计,目前先入库 + * @param appEventMessage + */ + void insertErrorEvent(AppEventMessage appEventMessage); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveAnalysisService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveAnalysisService.java index 929cc0b..b09a0c5 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveAnalysisService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveAnalysisService.java @@ -20,4 +20,6 @@ public interface ICsWaveAnalysisService { */ void analysis(AppEventMessage appEventMessage); + void channelWave(String nDid); + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java index 6876220..e82f7e4 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java @@ -2,7 +2,6 @@ package com.njcn.zlevent.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; @@ -58,29 +57,20 @@ public class CsAlarmServiceImpl extends ServiceImpl im List dataArray = appEventMessage.getMsg().getDataArray(); for (AppEventMessage.DataArray item : dataArray) { eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(CsEventPO::getDeviceId,po.getId()) - .eq(CsEventPO::getProcess,po.getProcess()) - .eq(CsEventPO::getCode,item.getCode()) - .eq(CsEventPO::getStartTime,eventTime) - .eq(CsEventPO::getTag,item.getName()); - List list = csEventService.list(wrapper); - if (CollectionUtil.isEmpty(list)) { - //事件入库 - CsEventPO csEvent = new CsEventPO(); - csEvent.setId(id); - csEvent.setDeviceId(po.getId()); - csEvent.setProcess(po.getProcess()); - csEvent.setCode(item.getCode()); - csEvent.setStartTime(eventTime); - tag = item.getName(); - csEvent.setTag(tag); - csEvent.setType(3); - csEvent.setClDid(appEventMessage.getMsg().getClDid()); - csEvent.setLevel(Integer.parseInt(item.getType())); - csEvent.setCode(item.getCode()); - list1.add(csEvent); - } + //事件入库 + CsEventPO csEvent = new CsEventPO(); + csEvent.setId(id); + csEvent.setDeviceId(po.getId()); + csEvent.setProcess(po.getProcess()); + csEvent.setCode(item.getCode()); + csEvent.setStartTime(eventTime); + tag = item.getName(); + csEvent.setTag(tag); + csEvent.setType(3); + csEvent.setClDid(appEventMessage.getMsg().getClDid()); + csEvent.setLevel(Integer.parseInt(item.getType())); + csEvent.setCode(item.getCode()); + list1.add(csEvent); } if (CollectionUtil.isNotEmpty(list1)){ csEventService.saveBatch(list1); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsDevErrEvtServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsDevErrEvtServiceImpl.java new file mode 100644 index 0000000..10f4dd5 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsDevErrEvtServiceImpl.java @@ -0,0 +1,61 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.mapper.CsDevErrEvtMapper; +import com.njcn.zlevent.pojo.po.CsDevErrEvt; +import com.njcn.zlevent.service.ICsDevErrEvtService; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +/** + *

+ * 装置异常事件统计 服务实现类 + *

+ * + * @author xy + * @since 2024-09-12 + */ +@Service +public class CsDevErrEvtServiceImpl extends ServiceImpl implements ICsDevErrEvtService { + + @Override + public void insertErrorEvent(AppEventMessage appEventMessage) { + List list = new ArrayList<>(); + List dataArrayList = appEventMessage.getMsg().getDataArray(); + for (AppEventMessage.DataArray dataArray : dataArrayList) { + CsDevErrEvt evt = new CsDevErrEvt(); + evt.setNdid(appEventMessage.getId()); + evt.setEvtTime(timeFormat(dataArray.getDataTimeSec(),dataArray.getDataTimeUSec())); + evt.setCode(dataArray.getCode()); + list.add(evt); + } + this.saveBatch(list); + } + + /** + * 时间转换 + */ + public LocalDateTime timeFormat(Long time1, Long time2) { + String time; + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + time1 = time1 - 8*3600; + long t1 = time1 * 1000000 + time2; + String time1String = String.valueOf(t1); + String time11 = time1String.substring(time1String.length() - 6); + String time111 = time1String.substring(0,time1String.length() - 6); + String formatTime1 = format.format(Long.parseLong(time111) * 1000); + if (time2 == 0){ + time = formatTime1 + ".000000"; + } else { + time = formatTime1 + "." + time11; + } + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + return LocalDateTime.parse(time, fmt); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index 07699e9..8b6f830 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -7,9 +7,11 @@ import com.njcn.access.api.CsTopicFeignClient; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.mq.message.AppEventMessage; import com.njcn.redis.pojo.enums.AppRedisKey; @@ -24,6 +26,7 @@ import com.njcn.zlevent.service.ICsWaveAnalysisService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; @@ -54,11 +57,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { private final DicDataFeignClient dicDataFeignClient; + private final ChannelObjectUtil channelObjectUtil; + @Override public void analysis(AppEventMessage appEventMessage) { - int mid = 1; - //获取监测点 - String lineId = null; + List list = new ArrayList<>(); Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); if (Objects.isNull(object1)){ lineInfo(appEventMessage.getId()); @@ -69,30 +72,43 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { List dataArrayList = appEventMessage.getMsg().getDataArray(); if (CollectionUtil.isNotEmpty(dataArrayList)){ for (AppEventMessage.DataArray item : dataArrayList) { - //处理mid - if (Objects.equals(mid,10000)){ - mid = 1; - } List paramList = item.getParam(); Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData(); Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData(); Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData(); - if (Objects.equals(object3.toString(),ZlConstant.GRID)){ - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString(); - } else if (Objects.equals(object3.toString(),ZlConstant.LOAD)){ - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString(); - } else { - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); - } + String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); String fileName = object.toString().replaceAll("\\[","").replaceAll("]",""); List fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList()); + //获取到录波文件,将文件信息存储起来 for (String file : fileList) { file = file.trim(); - askFileInfo(appEventMessage.getId(),mid,file); - mid++; - channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString()); + WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString()); + list.add(dto); + } + Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId()); + if (!Objects.isNull(obj)) { + List oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class); + oldList.addAll(list); + redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList); + } else { + redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list); } } + //处理缓存数据 + channelWave(appEventMessage.getId()); + } + } + + @Override + @Async("asyncExecutor") + public void channelWave(String nDid) { + //有相同文件处理时,则不进行数据处理 + Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid); + if (Objects.isNull(obj)) { + WaveTimeDto dto = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class).get(0); + askFileInfo(nDid,1,dto.getFileName()); + } else { + throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); } } @@ -114,10 +130,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); } + /** * 时间处理 */ - public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) { + public WaveTimeDto channelTimeRange(String fileName,String nDid, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); time = time - 8*3600; // 将millisecond转换为长整型,并乘以1000以获取微秒 @@ -136,12 +153,14 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { String formatTime2 = format.format(Long.parseLong(time222) * 1000); WaveTimeDto waveTimeDto = new WaveTimeDto(); + waveTimeDto.setFileName(fileName); waveTimeDto.setStartTime(formatTime1 + "." + time11); waveTimeDto.setEndTime(formatTime2 + "." + time22); waveTimeDto.setDeviceId(deviceId); + waveTimeDto.setNDid(nDid); waveTimeDto.setLineId(lineId); waveTimeDto.setLocation(location); - redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,600L); + return waveTimeDto; } /** diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index 869f148..869d48d 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -3,7 +3,6 @@ package com.njcn.zlevent.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient; @@ -94,85 +93,75 @@ public class EventServiceImpl implements IEventService { List dataArray = appEventMessage.getMsg().getDataArray(); for (AppEventMessage.DataArray item : dataArray) { eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - //此处做限制,如果已有事件则不在记录通知 - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(CsEventPO::getDeviceId,po.getId()) - .eq(CsEventPO::getProcess,po.getProcess()) - .eq(CsEventPO::getCode,item.getCode()) - .eq(CsEventPO::getStartTime,eventTime) - .eq(CsEventPO::getTag,item.getName()); - List list = csEventService.list(wrapper); - if (CollectionUtil.isEmpty(list)) { - id = IdUtil.fastSimpleUUID(); - //事件入库 - CsEventPO csEvent = new CsEventPO(); - csEvent.setId(id); - csEvent.setDeviceId(po.getId()); - csEvent.setProcess(po.getProcess()); - csEvent.setCode(item.getCode()); - eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); - csEvent.setStartTime(eventTime); - tag = item.getName(); - csEvent.setTag(tag); - if (Objects.equals(item.getType(),"2")){ - csEvent.setType(0); - } else if (Objects.equals(item.getType(),"3")){ - csEvent.setType(1); - } else if (Objects.equals(item.getType(),"1")){ - csEvent.setType(2); - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); - csEvent.setClDid(appEventMessage.getMsg().getClDid()); - } - csEvent.setLevel(Integer.parseInt(item.getType())); - //参数入库 - Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); - if (!Objects.isNull(item.getParam())){ - String tableName = map.get(item.getName()); - List params = item.getParam(); - for (AppEventMessage.Param param : params) { - Map tags = new HashMap<>(); - tags.put(InfluxDBTableConstant.UUID,id); - Map fields = new HashMap<>(); - if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){ - if (Objects.equals(param.getData(),ZlConstant.GRID)){ - fields.put(param.getName(),"电网侧"); - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString(); - csEvent.setClDid(1); - } else if (Objects.equals(param.getData(),ZlConstant.LOAD)){ - fields.put(param.getName(),"负载侧"); - lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString(); - csEvent.setClDid(2); - } - csEvent.setLocation(param.getData().toString()); - } else { - if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){ - csEvent.setPersistTime(Double.parseDouble(param.getData().toString())); - } - lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); - fields.put(param.getName(),appEventMessage.getMsg().getClDid()==1?"电网侧":"负载侧"); - csEvent.setLocation(appEventMessage.getMsg().getClDid()==1?ZlConstant.GRID:ZlConstant.LOAD); - csEvent.setClDid(appEventMessage.getMsg().getClDid()); - fields.put(param.getName(),param.getData()); - } - //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 - Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); - BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); - batchPoints.point(point); - records.add(batchPoints.lineProtocol()); - } - } - csEvent.setLineId(lineId); - list1.add(csEvent); - //事件处理日志库 - CsEventLogs csEventLogs = new CsEventLogs(); - csEventLogs.setLineId(lineId); - csEventLogs.setDeviceId(po.getId()); - csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); - csEventLogs.setTag(item.getName()); - csEventLogs.setStatus(1); - csEventLogs.setTime(LocalDateTime.now()); - csEventLogsService.save(csEventLogs); + id = IdUtil.fastSimpleUUID(); + //事件入库 + CsEventPO csEvent = new CsEventPO(); + csEvent.setId(id); + csEvent.setDeviceId(po.getId()); + csEvent.setProcess(po.getProcess()); + csEvent.setCode(item.getCode()); + eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); + csEvent.setStartTime(eventTime); + tag = item.getName(); + csEvent.setTag(tag); + if (Objects.equals(item.getType(),"2")){ + csEvent.setType(0); + } else if (Objects.equals(item.getType(),"3")){ + csEvent.setType(1); + } else if (Objects.equals(item.getType(),"1")){ + csEvent.setType(2); + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); + csEvent.setClDid(appEventMessage.getMsg().getClDid()); } + csEvent.setLevel(Integer.parseInt(item.getType())); + //参数入库 + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + if (!Objects.isNull(item.getParam())){ + String tableName = map.get(item.getName()); + List params = item.getParam(); + for (AppEventMessage.Param param : params) { + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.UUID,id); + Map fields = new HashMap<>(); + if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){ + if (Objects.equals(param.getData(),ZlConstant.GRID)){ + fields.put(param.getName(),"电网侧"); + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString(); + csEvent.setClDid(1); + } else if (Objects.equals(param.getData(),ZlConstant.LOAD)){ + fields.put(param.getName(),"负载侧"); + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString(); + csEvent.setClDid(2); + } + csEvent.setLocation(param.getData().toString()); + } else { + if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){ + csEvent.setPersistTime(Double.parseDouble(param.getData().toString())); + } + lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); + fields.put(param.getName(),appEventMessage.getMsg().getClDid()==1?"电网侧":"负载侧"); + csEvent.setLocation(appEventMessage.getMsg().getClDid()==1?ZlConstant.GRID:ZlConstant.LOAD); + csEvent.setClDid(appEventMessage.getMsg().getClDid()); + fields.put(param.getName(),param.getData()); + } + //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); + } + } + csEvent.setLineId(lineId); + list1.add(csEvent); + //事件处理日志库 + CsEventLogs csEventLogs = new CsEventLogs(); + csEventLogs.setLineId(lineId); + csEventLogs.setDeviceId(po.getId()); + csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); + csEventLogs.setTag(item.getName()); + csEventLogs.setStatus(1); + csEventLogs.setTime(LocalDateTime.now()); + csEventLogsService.save(csEventLogs); } //cs_event入库 if (CollectionUtil.isNotEmpty(list1)){ diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index e11f371..64d98b2 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -12,6 +12,7 @@ import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.utils.CRC32Utils; +import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.config.GeneralInfo; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csharmonic.api.WavePicFeignClient; @@ -27,10 +28,7 @@ import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsWave; -import com.njcn.zlevent.service.ICsEventFileLogsService; -import com.njcn.zlevent.service.ICsEventService; -import com.njcn.zlevent.service.ICsWaveService; -import com.njcn.zlevent.service.IFileService; +import com.njcn.zlevent.service.*; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; @@ -54,22 +52,16 @@ import java.util.*; public class FileServiceImpl implements IFileService { private final RedisUtil redisUtil; - private final CsTopicFeignClient csTopicFeignClient; - private final MqttPublisher publisher; - private final FileStorageUtil fileStorageUtil; - private final ICsEventService csEventService; - private final ICsEventFileLogsService csEventLogsService; - private final WavePicFeignClient wavePicFeignClient; - private final ICsWaveService csWaveService; - private final GeneralInfo generalInfo; + private final ICsWaveAnalysisService iCsWaveAnalysisService; + private final ChannelObjectUtil channelObjectUtil; @Override public void analysisFileInfo(AppFileMessage appFileMessage) { @@ -80,12 +72,7 @@ public class FileServiceImpl implements IFileService { String fileName = appFileMessage.getMsg().getFileInfo().getName(); //缓存文件信息用于文件流拼接 FileInfoDto fileInfoDto = new FileInfoDto(); - //获取波形文件起始结束时间 - Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.TIME+fileName); - if (Objects.isNull(fileInfo)){ - throw new BusinessException(AccessResponseEnum.WAVE_INFO_MISSING); - } - WaveTimeDto waveTimeDto = JSON.parseObject(JSON.toJSONString(fileInfo), WaveTimeDto.class); + WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0); fileInfoDto.setStartTime(waveTimeDto.getStartTime()); fileInfoDto.setEndTime(waveTimeDto.getEndTime()); fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); @@ -110,7 +97,7 @@ public class FileServiceImpl implements IFileService { csWave.setStatus(0); csWaveService.save(csWave); //请求当前文件的数据 - askFileStream(appFileMessage.getId(),mid,fileName,0,range); + askFileStream(appFileMessage.getId(),mid,fileName,-1,range); redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); redisUtil.delete(AppRedisKey.TIME+fileName); } else { @@ -122,7 +109,7 @@ public class FileServiceImpl implements IFileService { public void analysisFileStream(AppFileMessage appFileMessage) { DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); String filePath = null; - List list = new ArrayList<>(); + Set list = new HashSet<>(); FileStreamDto fileStreamDto = new FileStreamDto(); //波形文件上传成功后,将文件信息存储一下,方便后期查看 CsWave csWave = new CsWave(); @@ -139,7 +126,7 @@ public class FileServiceImpl implements IFileService { String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1]; File lsFile =new File(generalInfo.getBusinessTempPath()); //如果文件夹不存在则创建 - if (!lsFile.exists() && !lsFile.isDirectory()){ + if (!lsFile.exists() && !lsFile.isDirectory()) { lsFile .mkdirs(); } //获取缓存的文件信息 @@ -180,6 +167,7 @@ public class FileServiceImpl implements IFileService { //2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存 //3.超时判断: 30s未收到相关文件信息,核查文件个数,看丢失哪些帧,重新请求 else { + redisUtil.saveByKey("handleEvent:" + appFileMessage.getId(),"doing"); if (appFileMessage.getMsg().getFrameTotal() == 1){ //解析文件入库 filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileInfoDto.getFileCheck(),"event"); @@ -197,8 +185,18 @@ public class FileServiceImpl implements IFileService { if (CollectionUtil.isNotEmpty(eventList)){ eventList.forEach(wavePicFeignClient::getWavePics); } + redisUtil.delete("handleEvent:" + appFileMessage.getId()); redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); + //删除已经处理完的文件,之后再判断还有是否需要下载的文件 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); + fileDto.removeIf(item -> item.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(appFileMessage.getId()); + } } else { + //收到数据就刷新缓存值 + redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L); Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); if (Objects.isNull(object1)){ fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); @@ -211,7 +209,6 @@ public class FileServiceImpl implements IFileService { csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); csEventLogs.setIsAll(0); - redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 5L); redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); //将数据写入临时文件 appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); @@ -245,13 +242,20 @@ public class FileServiceImpl implements IFileService { redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName())); + redisUtil.delete("handleEvent:" + appFileMessage.getId()); //删除临时文件 File file = new File(lsFileName); if (file.exists()) { file.delete(); } + //删除已经处理完的文件,之后再判断还有是否需要下载的文件 + List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); + fileDto.removeIf(item -> item.getFileName().equals(fileName)); + redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto); + if (CollectionUtil.isNotEmpty(fileDto)) { + iCsWaveAnalysisService.channelWave(appFileMessage.getId()); + } } else { - Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName)); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); @@ -261,14 +265,12 @@ public class FileServiceImpl implements IFileService { //将数据写入临时文件 appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); - if (Objects.isNull(object2)) { - redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L); - } else { + Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName)); + if (!Objects.isNull(object2)) { redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L); } } } else { - redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java index d02f0ca..3f26345 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java @@ -11,6 +11,7 @@ import com.njcn.system.api.RocketMqLogFeignClient; import com.njcn.system.pojo.po.RocketmqMsgErrorLog; import com.njcn.zlevent.api.AlarmFeignClient; import com.njcn.zlevent.api.EventFeignClient; +import com.njcn.zlevent.api.EvtErrorFeignClient; import com.njcn.zlevent.api.WaveFeignClient; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -48,6 +49,8 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler