From d537021ffd4d42eccc89fc3ea2abeb28078d0f19 Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Thu, 7 Sep 2023 20:37:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B2=BB=E7=90=86=E6=9A=82=E6=80=81=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E3=80=81=E6=96=87=E4=BB=B6=E8=A7=A3=E6=9E=90=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/api/CsTopicFeignClient.java | 18 ++ .../CsTopicClientFallbackFactory.java | 34 +++ .../njcn/access/enums/AccessResponseEnum.java | 1 + .../njcn/access/pojo/dto/file/FileDto.java | 74 +++++ .../access/controller/CsTopicController.java | 45 +++ .../access/handler/MqttMessageHandler.java | 45 ++- .../service/impl/CsDeviceServiceImpl.java | 18 +- .../stat/service/impl/StatServiceImpl.java | 3 +- .../com/njcn/zlevent/api/FileFeignClient.java | 22 ++ .../com/njcn/zlevent/api/WaveFeignClient.java | 18 ++ .../fallback/FileClientFallbackFactory.java | 41 +++ .../fallback/WaveClientFallbackFactory.java | 35 +++ .../com/njcn/zlevent/param/CsEventParam.java | 27 ++ .../njcn/zlevent/pojo/dto/FileInfoDto.java | 37 +++ .../njcn/zlevent/pojo/dto/FileStreamDto.java | 20 ++ .../njcn/zlevent/pojo/dto/WaveTimeDto.java | 23 ++ .../com/njcn/zlevent/pojo/po/CsEvent.java | 48 --- .../njcn/zlevent/pojo/po/CsEventDetail.java | 61 ---- .../analysis-zl-event/zl-event-boot/pom.xml | 20 ++ .../zlevent/controller/FileController.java | 57 ++++ .../zlevent/controller/WaveController.java | 10 +- .../njcn/zlevent/mapper/CsEventMapper.java | 4 +- ...tailMapper.java => CsEventUserMapper.java} | 6 +- .../njcn/zlevent/service/ICsEventService.java | 10 +- ...lService.java => ICsEventUserService.java} | 6 +- .../njcn/zlevent/service/ICsWaveService.java | 23 ++ .../njcn/zlevent/service/IEventService.java | 4 + .../njcn/zlevent/service/IFileService.java | 29 ++ .../impl/CsEventDetailServiceImpl.java | 20 -- .../service/impl/CsEventServiceImpl.java | 16 +- .../service/impl/CsEventUserServiceImpl.java | 21 ++ .../service/impl/CsWaveServiceImpl.java | 160 ++++++++++ .../service/impl/EventServiceImpl.java | 124 ++++++-- .../zlevent/service/impl/FileServiceImpl.java | 282 ++++++++++++++++++ .../message/consumer/AppEventConsumer.java | 7 +- .../message/consumer/AppFileInfoConsumer.java | 139 +++++++++ .../consumer/AppFileStreamConsumer.java | 137 +++++++++ 37 files changed, 1466 insertions(+), 179 deletions(-) create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/api/CsTopicFeignClient.java create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsTopicClientFallbackFactory.java create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/controller/CsTopicController.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/FileFeignClient.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/FileClientFallbackFactory.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/param/CsEventParam.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java delete mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java delete mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java rename iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/{CsEventDetailMapper.java => CsEventUserMapper.java} (50%) rename iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/{ICsEventDetailService.java => ICsEventUserService.java} (51%) create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java delete mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventUserServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java create mode 100644 iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppFileInfoConsumer.java create mode 100644 iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppFileStreamConsumer.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/CsTopicFeignClient.java b/iot-access/access-api/src/main/java/com/njcn/access/api/CsTopicFeignClient.java new file mode 100644 index 0000000..d7eb713 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/CsTopicFeignClient.java @@ -0,0 +1,18 @@ +package com.njcn.access.api; + +import com.njcn.access.api.fallback.CsTopicClientFallbackFactory; +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.ACCESS_BOOT, path = "/topic", fallbackFactory = CsTopicClientFallbackFactory.class,contextId = "topic") +public interface CsTopicFeignClient { + + @PostMapping("/find") + HttpResult find(@RequestParam("nDid") String nDid); +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsTopicClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsTopicClientFallbackFactory.java new file mode 100644 index 0000000..84b0ba0 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsTopicClientFallbackFactory.java @@ -0,0 +1,34 @@ +package com.njcn.access.api.fallback; + +import com.njcn.access.api.CsTopicFeignClient; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class CsTopicClientFallbackFactory implements FallbackFactory { + @Override + public CsTopicFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new CsTopicFeignClient() { + + @Override + public HttpResult find(String nDid) { + log.error("{}异常,降级处理,异常为:{}","获取设备支持的主题版本",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 50426e2..3c14bf9 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -51,6 +51,7 @@ public enum AccessResponseEnum { SET_DICT_MISSING("A0307","Set字典数据缺失!"), INSET_DICT_MISSING("A0307","InSet字典数据缺失!"), CTRL_DICT_MISSING("A0307","Ctrl字典数据缺失!"), + WAVE_INFO_MISSING("A0307","波形参数缺失!"), MODEL_MISS("A0308","模板信息缺失!"), MODEL_VERSION_ERROR("A0308","询问装置模板信息错误"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java new file mode 100644 index 0000000..1274fac --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java @@ -0,0 +1,74 @@ +package com.njcn.access.pojo.dto.file; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 8:51 + */ +@Data +public class FileDto implements Serializable { + + @SerializedName("Mid") + private Integer mid; + + @SerializedName("Did") + @ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2") + private Integer did; + + @SerializedName("Pri") + private Integer pri; + + @SerializedName("Type") + private Integer type; + + @SerializedName("Code") + private Integer code; + + @SerializedName("Msg") + private FileDto.Msg msg; + + @Data + public static class Msg{ + + @SerializedName("Type") + private String type; + + @SerializedName("FileInfo") + private FileDto.FileInfo fileInfo; + + @SerializedName("Data") + private String data; + + @SerializedName("Name") + private String name; + + } + + @Data + public static class FileInfo{ + + @SerializedName("Name") + private String name; + + @SerializedName("FileTime") + private Long fileTime; + + @SerializedName("FileSize") + private Integer fileSize; + + @SerializedName("FileCheck") + private String fileCheck; + + @SerializedName("FileChkType") + private String fileChkType; + } + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsTopicController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsTopicController.java new file mode 100644 index 0000000..3fc63b8 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsTopicController.java @@ -0,0 +1,45 @@ +package com.njcn.access.controller; + +import com.njcn.access.pojo.po.CsLineModel; +import com.njcn.access.service.ICsTopicService; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; +import springfox.documentation.annotations.ApiIgnore; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 11:07 + */ +@Slf4j +@RestController +@RequestMapping("/topic") +@Api(tags = "设备主题") +@AllArgsConstructor +@ApiIgnore +public class CsTopicController extends BaseController { + + private final ICsTopicService csTopicService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/find") + @ApiOperation("获取设备支持的主题版本") + @ApiImplicitParam(name = "nDid", value = "nDid", required = true) + public HttpResult find(@RequestParam String nDid){ + String methodDescribe = getMethodDescribe("find"); + String version = csTopicService.getVersion(nDid); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, version, methodDescribe); + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index a750508..e9fb3ec 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -13,6 +13,7 @@ import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.RspDataDto; import com.njcn.access.pojo.dto.*; +import com.njcn.access.pojo.dto.file.FileDto; import com.njcn.access.pojo.dto.heart.HeartBeatDto; import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsLineModel; @@ -27,10 +28,14 @@ import com.njcn.csdevice.api.DataSetFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.CsDataSet; import com.njcn.csdevice.pojo.po.CsDevModelPO; +import com.njcn.mq.constant.BusinessTopic; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.message.AppEventMessage; +import com.njcn.mq.message.AppFileMessage; import com.njcn.mq.template.AppAutoDataMessageTemplate; import com.njcn.mq.template.AppEventMessageTemplate; +import com.njcn.mq.template.AppFileMessageTemplate; +import com.njcn.mq.template.AppFileStreamMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.web.utils.RequestUtil; @@ -78,6 +83,10 @@ public class MqttMessageHandler { private final CsLogsFeignClient csLogsFeignClient; + private final AppFileMessageTemplate appFileMessageTemplate; + + private final AppFileStreamMessageTemplate appFileStreamMessageTemplate; + @Autowired Validator validator; @@ -357,7 +366,8 @@ public class MqttMessageHandler { //处理心跳 ReqAndResDto.Res reqAndResParam = new ReqAndResDto.Res(); HeartBeatDto heartBeatDto = new HeartBeatDto(); - heartBeatDto.setTime(System.currentTimeMillis()/1000); + //fixme 前置处理的时间应该是UTC时间,所以需要加8小时。 + heartBeatDto.setTime(System.currentTimeMillis()/1000+8*3600); reqAndResParam.setMid(res.getMid()); reqAndResParam.setDid(0); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); @@ -421,4 +431,37 @@ public class MqttMessageHandler { break; } } + + /** + * 文件传输 + * @param topic + * @param message + * @param version + * @param nDid + * @param payload + */ + @MqttSubscribe(value = "/Pfm/DevFileRsp/{version}/{edgeId}",qos = 1) + @Transactional(rollbackFor = Exception.class) + public void file(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) { + //解析数据 + Gson gson = new Gson(); + FileDto fileDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), FileDto.class); + JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(fileDto)); + AppFileMessage appFileMessage = JSONObject.toJavaObject(jsonObject, AppFileMessage.class); + appFileMessage.setId(nDid); + //响应请求 + switch (fileDto.getType()){ + case 4657: + log.info("获取文件信息"); + appFileMessageTemplate.sendMember(appFileMessage); + break; + case 4658: + log.info("获取文件流信息"); + appFileStreamMessageTemplate.sendMember(appFileMessage); + break; + default: + break; + } + } + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 881656c..2663aa1 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -243,14 +243,17 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //3.监测点表录入关系 for (DevAccessParam.LineParam item : devAccessParam.getList()) { String location = dicDataFeignClient.getDicDataById(item.getPosition()).getData().getCode(); - String id = IdUtil.fastSimpleUUID(); CsLinePO po = new CsLinePO(); - po.setLineId(id); + CsLedgerParam param = new CsLedgerParam(); + AppLineTopologyDiagramPO appLineTopologyDiagramPo = new AppLineTopologyDiagramPO(); po.setName(item.getName()); po.setPosition(item.getPosition()); po.setClDid(0); if (Objects.equals(DicDataEnum.LOAD_SIDE.getCode(),location)){ RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),1)).findFirst().orElse(null); + po.setLineId(devAccessParam.getNDid() + "1"); + param.setId(devAccessParam.getNDid() + "1"); + appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "1"); po.setVolGrade(po1.getVolGrade()); po.setPtRatio(po1.getPtRatio()); po.setCtRatio(po1.getCtRatio()); @@ -258,24 +261,27 @@ public class CsDeviceServiceImpl implements ICsDeviceService { po.setClDid(1); } else if (Objects.equals(DicDataEnum.GRID_SIDE.getCode(),location)){ RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),2)).findFirst().orElse(null); + po.setLineId(devAccessParam.getNDid() + "2"); + param.setId(devAccessParam.getNDid() + "2"); + appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "2"); po.setVolGrade(po1.getVolGrade()); po.setPtRatio(po1.getPtRatio()); po.setCtRatio(po1.getCtRatio()); po.setConType(po1.getConType()); po.setClDid(2); + } else { + po.setLineId(devAccessParam.getNDid() + "0"); + param.setId(devAccessParam.getNDid() + "0"); + appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "0"); } po.setStatus(1); csLinePoList.add(po); - CsLedgerParam param = new CsLedgerParam(); - param.setId(id); param.setPid(vo.getId()); param.setName(item.getName()); param.setLevel(3); param.setSort(0); csLedgerService.addLedgerTree(param); - AppLineTopologyDiagramPO appLineTopologyDiagramPo = new AppLineTopologyDiagramPO(); appLineTopologyDiagramPo.setId(devAccessParam.getTopologyDiagram()); - appLineTopologyDiagramPo.setLineId(id); appLineTopologyDiagramPo.setLat(item.getLat()); appLineTopologyDiagramPo.setLng(item.getLng()); appLineTopologyDiagramPo.setStatus("1"); diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index c30a3cc..ef2a74a 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -209,7 +209,8 @@ public class StatServiceImpl implements IStatService { Map fields = new HashMap<>(); fields.put(dataArrayList.get(i).getName(),floats.get(i)); fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); - Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec(), TimeUnit.SECONDS, tags, fields); + //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/FileFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/FileFeignClient.java new file mode 100644 index 0000000..673f9d9 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/FileFeignClient.java @@ -0,0 +1,22 @@ +package com.njcn.zlevent.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppFileMessage; +import com.njcn.zlevent.api.fallback.FileClientFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/file", fallbackFactory = FileClientFallbackFactory.class,contextId = "file") +public interface FileFeignClient { + + @PostMapping("/fileInfo") + HttpResult fileInfo(AppFileMessage appFileMessage); + + @PostMapping("/fileStream") + HttpResult fileStream(AppFileMessage appFileMessage); +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java new file mode 100644 index 0000000..17be4ec --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/WaveFeignClient.java @@ -0,0 +1,18 @@ +package com.njcn.zlevent.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/wave", fallbackFactory = EventClientFallbackFactory.class,contextId = "wave") +public interface WaveFeignClient { + + @PostMapping("/analysis") + HttpResult analysis(AppEventMessage appEventMessage); +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/FileClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/FileClientFallbackFactory.java new file mode 100644 index 0000000..3116920 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/FileClientFallbackFactory.java @@ -0,0 +1,41 @@ +package com.njcn.zlevent.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppFileMessage; +import com.njcn.zlevent.api.FileFeignClient; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class FileClientFallbackFactory implements FallbackFactory { + @Override + public FileFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new FileFeignClient() { + + @Override + public HttpResult fileInfo(AppFileMessage appFileMessage) { + log.error("{}异常,降级处理,异常为:{}","文件信息",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + @Override + public HttpResult fileStream(AppFileMessage appFileMessage) { + log.error("{}异常,降级处理,异常为:{}","解析文件流",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java new file mode 100644 index 0000000..2fcdf17 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/WaveClientFallbackFactory.java @@ -0,0 +1,35 @@ +package com.njcn.zlevent.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.EventFeignClient; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class WaveClientFallbackFactory implements FallbackFactory { + @Override + public EventFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new EventFeignClient() { + + @Override + public HttpResult analysis(AppEventMessage appEventMessage) { + log.error("{}异常,降级处理,异常为:{}","波形报文解析",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/param/CsEventParam.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/param/CsEventParam.java new file mode 100644 index 0000000..06660b3 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/param/CsEventParam.java @@ -0,0 +1,27 @@ +package com.njcn.zlevent.param; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/7 19:26 + */ +@Data +public class CsEventParam implements Serializable { + + private String lineId; + + private String deviceId; + + private String startTime; + + private String endTime; + + private String path; + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java new file mode 100644 index 0000000..6b10ae2 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java @@ -0,0 +1,37 @@ +package com.njcn.zlevent.pojo.dto; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 14:10 + */ +@Data +public class FileInfoDto { + + private String name; + + private Long fileTime; + + private Integer fileSize; + + private String fileCheck; + + private String fileChkType; + + @ApiModelProperty("报文数量") + private Integer number; + + private String startTime; + + private String endTime; + + private String deviceId; + + private String lineId; + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java new file mode 100644 index 0000000..69af185 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java @@ -0,0 +1,20 @@ +package com.njcn.zlevent.pojo.dto; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/7 9:02 + */ +@Data +public class FileStreamDto implements Serializable { + + private Map map; + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java new file mode 100644 index 0000000..6fba9d9 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WaveTimeDto.java @@ -0,0 +1,23 @@ +package com.njcn.zlevent.pojo.dto; + +import lombok.Data; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/7 18:54 + */ +@Data +public class WaveTimeDto { + + private String deviceId; + + private String lineId; + + private String startTime; + + private String endTime; + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java deleted file mode 100644 index dd626e3..0000000 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.njcn.zlevent.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableName; -import com.njcn.db.bo.BaseEntity; -import lombok.Data; -import lombok.EqualsAndHashCode; - -import java.time.LocalDateTime; - -/** - *

- * 暂态事件表 - *

- * - * @author xuyang - * @since 2023-08-23 - */ -@Data -@TableName("cs_event") -public class CsEvent { - - private static final long serialVersionUID = 1L; - - /** - * id - */ - private String id; - - /** - * 监测点id - */ - private String lineId; - - /** - * 事件名称 - */ - private String name; - - /** - * 展示名称 - */ - private String showName; - - /** - * 开始时间 - */ - private LocalDateTime startTime; -} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java deleted file mode 100644 index 043aad2..0000000 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.njcn.zlevent.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; - -/** - *

- * 暂态事件详情表 - *

- * - * @author xuyang - * @since 2023-08-23 - */ -@Data -@TableName("cs_event_detail") -public class CsEventDetail { - - private static final long serialVersionUID = 1L; - - /** - * id - */ - private String id; - - /** - * 暂态事件id - */ - private String pid; - - /** - * 指标名称 - */ - private String name; - - /** - * 指标别名 - */ - private String showName; - - /** - * 数据类型 - */ - private String type; - - /** - * 单位 - */ - private String unit; - - /** - * 数值 - */ - private Double data; - - /** - * 相别 - */ - private String phasic; - - -} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml index 3ed6e00..8e1a370 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml +++ b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml @@ -73,6 +73,26 @@ system-api ${project.version} + + com.njcn + cs-harmonic-api + ${project.version} + + + com.njcn + common-influxDB + ${project.version} + + + com.njcn + access-api + ${project.version} + + + com.njcn + common-oss + ${project.version} + diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java new file mode 100644 index 0000000..f70540d --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java @@ -0,0 +1,57 @@ +package com.njcn.zlevent.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.AppFileMessage; +import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.service.IFileService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 9:29 + */ +@Slf4j +@RestController +@RequestMapping("/file") +@Api(tags = "文件处理") +@AllArgsConstructor +public class FileController extends BaseController { + + private final IFileService fileService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/fileInfo") + @ApiOperation("文件信息") + @ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true) + public HttpResult fileInfo(@RequestBody AppFileMessage appFileMessage){ + String methodDescribe = getMethodDescribe("fileInfo"); + fileService.analysisFileInfo(appFileMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/fileStream") + @ApiOperation("解析文件") + @ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true) + public HttpResult fileStream(@RequestBody AppFileMessage appFileMessage){ + String methodDescribe = getMethodDescribe("fileStream"); + fileService.analysisFileStream(appFileMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java index 75a6fc8..c8d8d27 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java @@ -5,13 +5,16 @@ import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.AppEventMessage; import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.service.ICsWaveService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -29,12 +32,15 @@ import org.springframework.web.bind.annotation.RestController; @AllArgsConstructor public class WaveController extends BaseController { + private final ICsWaveService csWaveService; + @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/analysis") @ApiOperation("录波解析") - @ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true) - public HttpResult analysis(){ + @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true) + public HttpResult analysis(@RequestBody AppEventMessage appEventMessage){ String methodDescribe = getMethodDescribe("analysis"); + csWaveService.analysis(appEventMessage); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java index c7f7e3f..4a94a88 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java @@ -1,7 +1,7 @@ package com.njcn.zlevent.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.njcn.zlevent.pojo.po.CsEvent; +import com.njcn.csharmonic.pojo.po.CsEventPO; /** *

@@ -11,6 +11,6 @@ import com.njcn.zlevent.pojo.po.CsEvent; * @author xuyang * @since 2023-08-23 */ -public interface CsEventMapper extends BaseMapper { +public interface CsEventMapper extends BaseMapper { } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventUserMapper.java similarity index 50% rename from iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java rename to iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventUserMapper.java index b663106..39f1e55 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventUserMapper.java @@ -1,16 +1,16 @@ package com.njcn.zlevent.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.njcn.zlevent.pojo.po.CsEventDetail; +import com.njcn.csharmonic.pojo.po.CsEventUserPO; /** *

- * 暂态事件详情表 Mapper 接口 + * 暂态事件表 Mapper 接口 *

* * @author xuyang * @since 2023-08-23 */ -public interface CsEventDetailMapper extends BaseMapper { +public interface CsEventUserMapper extends BaseMapper { } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java index 28a9afa..9a4f092 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java @@ -1,7 +1,8 @@ package com.njcn.zlevent.service; import com.baomidou.mybatisplus.extension.service.IService; -import com.njcn.zlevent.pojo.po.CsEvent; +import com.njcn.csharmonic.pojo.po.CsEventPO; +import com.njcn.zlevent.param.CsEventParam; /** *

@@ -11,6 +12,11 @@ import com.njcn.zlevent.pojo.po.CsEvent; * @author xuyang * @since 2023-08-23 */ -public interface ICsEventService extends IService { +public interface ICsEventService extends IService { + + /** + * 事件添加波形文件地址 + */ + void updateCsEvent(CsEventParam csEventParam); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventUserService.java similarity index 51% rename from iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java rename to iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventUserService.java index 1754b42..4e63ca2 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventUserService.java @@ -1,16 +1,16 @@ package com.njcn.zlevent.service; import com.baomidou.mybatisplus.extension.service.IService; -import com.njcn.zlevent.pojo.po.CsEventDetail; +import com.njcn.csharmonic.pojo.po.CsEventUserPO; /** *

- * 暂态事件详情表 服务类 + * 暂态事件表 服务类 *

* * @author xuyang * @since 2023-08-23 */ -public interface ICsEventDetailService extends IService { +public interface ICsEventUserService extends IService { } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java new file mode 100644 index 0000000..eb867cc --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java @@ -0,0 +1,23 @@ +package com.njcn.zlevent.service; + +import com.njcn.mq.message.AppEventMessage; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/5 14:52 + */ + +public interface ICsWaveService { + + /** + * 1.获取波形报文的文件名称 + * 2.循环获取文件。将文件存储至文件服务器,获取文件路径 + * 3.根据波形持续的时间,更新事件里面的暂态波形路径 + * @param appEventMessage + */ + void analysis(AppEventMessage appEventMessage); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java index 31b1df2..59a27e0 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java @@ -9,6 +9,10 @@ public interface IEventService { /** * 解析事件数据 + * 1.解析事件数据入库MySQL + * 2.解析模板数据入库influxDB + * 3.记录用户事件表,将状态默认置为未读取 + * 4.查看用户推送的开关是否开启,推送已开启状态的用户 * @param */ void analysis(AppEventMessage appEventMessage); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java new file mode 100644 index 0000000..b43a1ad --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java @@ -0,0 +1,29 @@ +package com.njcn.zlevent.service; + +import com.njcn.mq.message.AppFileMessage; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 9:31 + */ + +public interface IFileService { + + /** + * 解析文件流之前需要获取文件的信息,可能要特殊处理 + * 1.文件过大要分片获取(单次请求文件大小不超过50k) + * 2.校验文件(md5或者crc) + * @param appFileMessage + */ + void analysisFileInfo(AppFileMessage appFileMessage); + + /** + * 获取文件流,解析文件 + * @param appFileMessage + */ + void analysisFileStream(AppFileMessage appFileMessage); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java deleted file mode 100644 index c75975e..0000000 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.njcn.zlevent.service.impl; - -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.njcn.zlevent.mapper.CsEventDetailMapper; -import com.njcn.zlevent.pojo.po.CsEventDetail; -import com.njcn.zlevent.service.ICsEventDetailService; -import org.springframework.stereotype.Service; - -/** - *

- * 暂态事件详情表 服务实现类 - *

- * - * @author xuyang - * @since 2023-08-23 - */ -@Service -public class CsEventDetailServiceImpl extends ServiceImpl implements ICsEventDetailService { - -} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java index 49d23c8..b5fe78c 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java @@ -1,8 +1,11 @@ package com.njcn.zlevent.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.csharmonic.pojo.po.CsEventPO; import com.njcn.zlevent.mapper.CsEventMapper; -import com.njcn.zlevent.pojo.po.CsEvent; +import com.njcn.zlevent.param.CsEventParam; import com.njcn.zlevent.service.ICsEventService; import org.springframework.stereotype.Service; @@ -15,6 +18,15 @@ import org.springframework.stereotype.Service; * @since 2023-08-23 */ @Service -public class CsEventServiceImpl extends ServiceImpl implements ICsEventService { +public class CsEventServiceImpl extends ServiceImpl implements ICsEventService { + @Override + public void updateCsEvent(CsEventParam csEventParam) { + LambdaUpdateWrapper lambdaUpdateWrapper = new LambdaUpdateWrapper<>(); + lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId()) + .eq(CsEventPO::getDeviceId,csEventParam.getDeviceId()) + .eq(CsEventPO::getType,0) + .between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime()); + this.update(lambdaUpdateWrapper); + } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventUserServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventUserServiceImpl.java new file mode 100644 index 0000000..0457e38 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventUserServiceImpl.java @@ -0,0 +1,21 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.csharmonic.pojo.po.CsEventPO; +import com.njcn.csharmonic.pojo.po.CsEventUserPO; +import com.njcn.zlevent.mapper.CsEventUserMapper; +import com.njcn.zlevent.service.ICsEventUserService; +import org.springframework.stereotype.Service; + +/** + *

+ * 暂态事件表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +@Service +public class CsEventUserServiceImpl extends ServiceImpl implements ICsEventUserService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java new file mode 100644 index 0000000..5d60354 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java @@ -0,0 +1,160 @@ +package com.njcn.zlevent.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.access.api.CsTopicFeignClient; +import com.njcn.access.enums.AccessEnum; +import com.njcn.access.enums.TypeEnum; +import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.enums.StatResponseEnum; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.po.DictData; +import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.service.ICsWaveService; +import lombok.AllArgsConstructor; +import net.sf.json.JSONObject; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.stream.Collectors; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/5 14:52 + */ +@Service +@AllArgsConstructor +public class CsWaveServiceImpl implements ICsWaveService { + + private final EquipmentFeignClient equipmentFeignClient; + + private final MqttPublisher publisher; + + private final CsTopicFeignClient csTopicFeignClient; + + private final RedisUtil redisUtil; + + private final CsLineFeignClient csLineFeignClient; + + private final DicDataFeignClient dicDataFeignClient; + + @Override + public void analysis(AppEventMessage appEventMessage) { + int mid = 1; + //获取监测点 + String lineId = null; + Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); + if (Objects.isNull(object1)){ + lineInfo(appEventMessage.getId()); + } + if (Objects.equals(appEventMessage.getDid(),1)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); + } else if (Objects.equals(appEventMessage.getDid(),2)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString(); + } + //获取装置id + String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId(); + //获取波形文件名称 + List dataArrayList = appEventMessage.getMsg().getDataArray(); + if (CollectionUtil.isNotEmpty(dataArrayList)){ + for (AppEventMessage.DataArray item : dataArrayList) { + //处理mid + if (Objects.equals(mid,10000)){ + mid = 1; + } + List paramList = item.getParam(); + Object object = paramList.stream().filter(item2 -> "Wave_RcdName".equals(item2.getName())).findFirst().get().getData(); + Object object2 = paramList.stream().filter(item2 -> "Wave_RcdKeepTime".equals(item2.getName())).findFirst().get().getData(); + String fileName = object.toString().replaceAll("\\[","").replaceAll("]",""); + List fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList()); + for (String file : fileList) { + file = file.trim(); + askFileInfo(appEventMessage.getId(),mid,file); + mid++; + channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId); + } + } + } + } + + /** + * 询问文件信息 + */ + public void askFileInfo(String nDid, Integer mid, String fileName) { + String version = csTopicFeignClient.find(nDid).getData(); + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_8.getCode())); + reqAndResParam.setExpire(-1); + String json = "{Name:\""+fileName+"\"}"; + JSONObject jsonObject = JSONObject.fromObject(json); + reqAndResParam.setMsg(jsonObject); + publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + } + + /** + * 时间处理 + */ + public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId) { + // 将startTime减去8小时(8 * 3600秒) + time -= 8 * 3600; + // 将millisecond转换为长整型,并乘以1000以获取微秒 + long millisecondValue = millisecond.longValue() * 1000; +// long millisecondValue = Long.parseLong(String.valueOf(millisecond))*1000; + // 计算最终时间 + long finalTime = subtleTime + millisecondValue; + // 如果finalTime大于等于1000000,将startTime增加1秒,finalTime减去1000000 + if (finalTime >= 1000000) { + time += 1; + finalTime -= 1000000; + } + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String formatTime = format.format(time * 1000); + WaveTimeDto waveTimeDto = new WaveTimeDto(); + waveTimeDto.setStartTime(formatTime + "." + subtleTime); + waveTimeDto.setEndTime(formatTime + "." + finalTime); + waveTimeDto.setDeviceId(deviceId); + waveTimeDto.setLineId(lineId); + redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,60L); + } + + /** + * 缓存监测点相关信息 + */ + public void lineInfo(String id) { + Map map = new HashMap<>(); + List lineList = csLineFeignClient.findByNdid(id).getData(); + if (CollectionUtil.isEmpty(lineList)){ + throw new BusinessException(StatResponseEnum.LINE_NULL); + } + for (CsLinePO item : lineList) { + DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData(); + if (Objects.isNull(dictData)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){ + map.put(0,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){ + map.put(1,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){ + map.put(2,item.getLineId()); + } + } + redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index 358ee38..09ce802 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -4,8 +4,15 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.po.CsDeviceUserPO; import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csharmonic.pojo.po.CsEventPO; +import com.njcn.csharmonic.pojo.po.CsEventUserPO; +import com.njcn.influx.pojo.constant.InfluxDBTableConstant; +import com.njcn.influxdb.utils.InfluxDbUtils; import com.njcn.mq.message.AppEventMessage; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; @@ -13,14 +20,16 @@ import com.njcn.stat.enums.StatResponseEnum; import com.njcn.system.api.DicDataFeignClient; import com.njcn.system.api.EpdFeignClient; import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.system.pojo.po.DictData; -import com.njcn.zlevent.pojo.po.CsEvent; -import com.njcn.zlevent.pojo.po.CsEventDetail; -import com.njcn.zlevent.service.ICsEventDetailService; import com.njcn.zlevent.service.ICsEventService; +import com.njcn.zlevent.service.ICsEventUserService; import com.njcn.zlevent.service.IEventService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -28,6 +37,7 @@ import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.TimeUnit; /** * 类的介绍: @@ -51,16 +61,27 @@ public class EventServiceImpl implements IEventService { private final ICsEventService csEventService; - private final ICsEventDetailService csEventDetailService; + private final EquipmentFeignClient equipmentFeignClient; + + private final InfluxDbUtils influxDbUtils; + + private final ICsEventUserService csEventUserService; + + private final CsDeviceUserFeignClient csDeviceUserFeignClient; @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppEventMessage appEventMessage) { - List list1 = new ArrayList<>(); - List list2 = new ArrayList<>(); + List list1 = new ArrayList<>(); + List records = new ArrayList(); + List list2 = new ArrayList<>(); //获取监测点id String lineId = null; Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); + //判断字典数据是否存在 + if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){ + saveData(); + } if (Objects.isNull(object1)){ lineInfo(appEventMessage.getId()); } @@ -69,36 +90,53 @@ public class EventServiceImpl implements IEventService { } else if (Objects.equals(appEventMessage.getDid(),2)){ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString(); } + //获取装置id + String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId(); //处理事件数据 List dataArray = appEventMessage.getMsg().getDataArray(); for (AppEventMessage.DataArray item : dataArray) { String id = IdUtil.fastSimpleUUID(); //事件入库 - CsEvent csEvent = new CsEvent(); + CsEventPO csEvent = new CsEventPO(); csEvent.setId(id); csEvent.setLineId(lineId); - csEvent.setName(item.getName()); - csEvent.setShowName(epdFeignClient.findByName(item.getName()).getData().getShowName()); + csEvent.setDeviceId(deviceId); csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); - List params = item.getParam(); - for (AppEventMessage.Param item2 : params) { - CsEventDetail csEventDetail = new CsEventDetail(); - csEventDetail.setPid(id); - csEventDetail.setName(item2.getName()); - csEventDetail.setShowName(epdFeignClient.findByName(item2.getName()).getData().getShowName()); - csEventDetail.setType(item2.getType()); - csEventDetail.setUnit(item2.getUnit()); - if (Objects.equals(item2.getName(),"Evt_VVaPhas")){ - csEventDetail.setPhasic(item2.getData().toString()); - } else { - csEventDetail.setData(Double.parseDouble(item2.getData().toString())); - } - list2.add(csEventDetail); - } + csEvent.setTag(item.getName()); + csEvent.setType(0); list1.add(csEvent); + //参数入库 + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + if (!Objects.isNull(item.getParam())){ + String tableName = map.get(item.getName()); + List params = item.getParam(); + params.forEach(param->{ + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.UUID,id); + Map fields = new HashMap<>(); + fields.put(param.getName(),param.getData()); + //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); + }); + } + //事件用户关系入库 + list2 = deviceUserList(deviceId,id); + } + //cs_event入库 + if (CollectionUtil.isNotEmpty(list1)){ + csEventService.saveBatch(list1); + } + //cs_device_user入库 + if (CollectionUtil.isNotEmpty(list2)){ + csEventUserService.saveBatch(list2); + } + //evt_data入库 + if (CollectionUtil.isNotEmpty(records)) { + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records); } - csEventService.saveBatch(list1); - csEventDetailService.saveBatch(list2); } /** @@ -126,16 +164,48 @@ public class EventServiceImpl implements IEventService { redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L); } + /** + * 缓存字典和influxDB表关系 + */ + public void saveData() { + Map map = new HashMap<>(); + List list = epdFeignClient.findAll().getData(); + if (CollectionUtil.isEmpty(list)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + list.forEach(item->{ + map.put(item.getDictName(),item.getTableName()); + }); + redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L); + } + /** * 时间转换 */ public LocalDateTime timeFormat(Long time1, Long time2) { //设置格式 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String timeText = format.format(time1 * 1000); + //todo 这边暂时先这样处理,减去8小时。 + String timeText = format.format((time1-8*3600) * 1000); String time = timeText + "." + time2; DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); return LocalDateTime.parse(time, fmt); } + /** + * 获取用户设备关系 + */ + public List deviceUserList(String devId, String id) { + List result = new ArrayList<>(); + List list = csDeviceUserFeignClient.findUserById(devId).getData(); + list.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); + return result; + } + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java new file mode 100644 index 0000000..0ca15eb --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -0,0 +1,282 @@ +package com.njcn.zlevent.service.impl; + +import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.access.api.CsTopicFeignClient; +import com.njcn.access.enums.AccessEnum; +import com.njcn.access.enums.AccessResponseEnum; +import com.njcn.access.enums.TypeEnum; +import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.mq.message.AppFileMessage; +import com.njcn.oss.constant.GeneralConstant; +import com.njcn.oss.constant.OssPath; +import com.njcn.oss.utils.FileStorageUtil; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.zlevent.param.CsEventParam; +import com.njcn.zlevent.pojo.dto.FileInfoDto; +import com.njcn.zlevent.pojo.dto.FileStreamDto; +import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.service.ICsEventService; +import com.njcn.zlevent.service.IFileService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.sf.json.JSONObject; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.*; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 9:32 + */ +@Service +@AllArgsConstructor +@Slf4j +public class FileServiceImpl implements IFileService { + + private final RedisUtil redisUtil; + + private final CsTopicFeignClient csTopicFeignClient; + + private final MqttPublisher publisher; + + private final FileStorageUtil fileStorageUtil; + + private final ICsEventService csEventService; + + @Override + public void analysisFileInfo(AppFileMessage appFileMessage) { + if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){ + int mid = 1; + int range = 51200; + Integer fileSize = appFileMessage.getMsg().getFileInfo().getFileSize(); + String fileName = appFileMessage.getMsg().getFileInfo().getName(); + //缓存文件信息用于文件流拼接 + FileInfoDto fileInfoDto = new FileInfoDto(); + //文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k + if (fileSize <= range){ + askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize); + fileInfoDto.setNumber(1); + } else { + int total = (int)Math.ceil(fileSize*1.0/range) ; + for (int i = 0; i < total; i++) { + askFileStream(appFileMessage.getId(),mid,fileName,i*range,range-1); + mid++; + } + fileInfoDto.setNumber(mid-1); + } + //获取波形文件起始结束时间 + Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.TIME+fileName); + if (Objects.isNull(fileInfo)){ + throw new BusinessException(AccessResponseEnum.WAVE_INFO_MISSING); + } + WaveTimeDto waveTimeDto = JSON.parseObject(JSON.toJSONString(fileInfo), WaveTimeDto.class); + fileInfoDto.setStartTime(waveTimeDto.getStartTime()); + fileInfoDto.setEndTime(waveTimeDto.getEndTime()); + fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); + fileInfoDto.setLineId(waveTimeDto.getLineId()); + fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName()); + fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime()); + fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize()); + fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck()); + fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType()); + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto, 3600L); + redisUtil.delete(AppRedisKey.TIME+fileName); + } else { + throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR); + } + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void analysisFileStream(AppFileMessage appFileMessage) { + //todo 目前文件先只处理暂态事件的,后续有其他文件再做处理 + String fileName = appFileMessage.getMsg().getName(); + if(fileName.contains(".cfg") || fileName.contains(".dat")) { + FileStreamDto fileStreamDto = new FileStreamDto(); + String filePath; + Map map = new HashMap<>(); + //获取缓存的文件信息 + Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); + //文件流 + Object object = redisUtil.getObjectByKey(fileName); + /* + * 文件解析存储逻辑 + * 1.如果文件只有1帧,那就直接解析文件流; + * 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件 + */ + if (Objects.isNull(object)){ + //第一次录入 + if(fileInfoDto.getNumber() == 1) { + //直接解析文件 + filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId()); + log.info(fileName + "解析成功"); + redisUtil.delete(fileName); + //波形文件关联事件 + filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); + correlateEvents(fileInfoDto,filePath); + } else { + //缓存文件 + map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); + fileStreamDto.setMap(map); + redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + } + } else { + //分帧传递数据,需要校验收到的文件个数 + fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class); + Map l1 = fileStreamDto.getMap(); + l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); + if (l1.size() == fileInfoDto.getNumber()){ + //对数据进行排序 + // 将Map的Entry集合转换成List + List> entryList = new ArrayList<>(l1.entrySet()); + // 使用Comparator按Key进行排序 + entryList.sort(new Comparator>() { + @Override + public int compare(Map.Entry entry1, Map.Entry entry2) { + return entry1.getKey().compareTo(entry2.getKey()); + } + }); + //解析文件 + filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId()); + log.info(fileName + "解析成功"); + redisUtil.delete(fileName); + //波形文件关联事件 + filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); + correlateEvents(fileInfoDto,filePath); + } else { + //缓存 + fileStreamDto = new FileStreamDto(); + fileStreamDto.setMap(l1); + redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + } + } + //记录日志 + + } else { + //todo 处理其他文件 + log.info("暂未做其他文件处理"); + } + } + + + /** + * 请求文件流信息 + */ + public void askFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) { + String version = csTopicFeignClient.find(nDid).getData(); + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode())); + reqAndResParam.setExpire(-1); + String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}"; + JSONObject jsonObject = JSONObject.fromObject(json); + reqAndResParam.setMsg(jsonObject); + publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + } + + /** + * 组装文件 + */ + public String fileStream(Integer number, Map map, String data, String fileName, String nDid) { + String filePath; + if (number == 1){ + filePath = stream(true,data,nDid,fileName,null); + } else { + int lengthByte = 0; + for (int i = 1; i <= number; i++) { + byte[] byteArray = Base64.getDecoder().decode(map.get(i)); + lengthByte += byteArray.length; + } + byte[] allByte = new byte[lengthByte]; + int countLength = 0; + for (int i = 1; i <= number; i++) { + byte[] byteArray = Base64.getDecoder().decode(map.get(i)); + System.arraycopy(byteArray, 0, allByte, countLength, byteArray.length); + countLength += byteArray.length; + } + filePath = stream(false,null,nDid,fileName,allByte); + } + return filePath; + } + + /** + * 解析存储文件信息 + */ + public String stream(boolean bool, String stream, String folder, String fileName, byte[] bytes) { + byte[] byteArray = null; + //将文件后缀替换成大写 + String[] parts = fileName.split(StrUtil.SLASH); + fileName = parts[parts.length - 1].replaceAll(".cfg", GeneralConstant.CFG).replaceAll(".dat",GeneralConstant.DAT); + //处理文件层级 + folder = createPath(folder); + //解析二进制流成byte数组 + if (bool){ + byteArray = Base64.getDecoder().decode(stream); + } else { + byteArray = bytes; + } + InputStream inputStream = new ByteArrayInputStream(byteArray); + String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName); + try { + inputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return path; + } + + /** + * 组装文件路径 + */ + public String createPath(String nDid) { + // 使用StringBuilder来构建新的字符串 + StringBuilder output = new StringBuilder(); + // 遍历输入字符串,每两个字符分割并添加冒号 + for (int i = 0; i < nDid.length(); i += 2) { + if (i > 0) { + //添加横线分隔符 + output.append(StrPool.DASHED); + } + if (i + 1 < nDid.length()) { + // 检查是否有足够的字符来分割成两组 + // 每两个字符添加到新字符串 + output.append(nDid.substring(i, i + 2)); + } else { + // 如果只剩一个字符,可以根据需要添加处理逻辑 + output.append(nDid.charAt(i)); + } + } + return output.toString(); + } + + /** + * 波形文件关联事件 + */ + public void correlateEvents(FileInfoDto fileInfoDto,String path) { + CsEventParam csEventParam = new CsEventParam(); + csEventParam.setLineId(fileInfoDto.getLineId()); + csEventParam.setDeviceId(fileInfoDto.getDeviceId()); + csEventParam.setStartTime(fileInfoDto.getStartTime()); + csEventParam.setEndTime(fileInfoDto.getEndTime()); + csEventParam.setPath(path); + csEventService.updateCsEvent(csEventParam); + + } +} diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java index 8c97286..614c0b6 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java @@ -10,6 +10,7 @@ import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.RocketMqLogFeignClient; import com.njcn.system.pojo.po.RocketmqMsgErrorLog; import com.njcn.zlevent.api.EventFeignClient; +import com.njcn.zlevent.api.WaveFeignClient; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; @@ -45,6 +46,9 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + + @Resource + private FileFeignClient fileFeignClient; + + @Override + protected void handleMessage(AppFileMessage appFileMessage) { + log.info("处理文件信息"); + fileFeignClient.fileInfo(appFileMessage); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(AppFileMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(AppFileMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(AppFileMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(AppFileMessage appFileMessage) { + super.dispatchMessage(appFileMessage); + } +} diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppFileStreamConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppFileStreamConsumer.java new file mode 100644 index 0000000..8539a97 --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppFileStreamConsumer.java @@ -0,0 +1,137 @@ +package com.njcn.message.consumer; + +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.AppFileMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import com.njcn.zlevent.api.FileFeignClient; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.NJCN_APP_FILE_STREAM_TOPIC, + consumerGroup = BusinessTopic.NJCN_APP_FILE_STREAM_TOPIC, + selectorExpression = BusinessTopic.FileTag.STREAM_TAG, + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class AppFileStreamConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + + @Resource + private FileFeignClient fileFeignClient; + + @Override + protected void handleMessage(AppFileMessage appFileMessage) { + log.info("处理文件流信息"); + fileFeignClient.fileStream(appFileMessage); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(AppFileMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(AppFileMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(AppFileMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(AppFileMessage appFileMessage) { + super.dispatchMessage(appFileMessage); + } +}