From 6f87784ddfbae4d8fe0cd0714f6c172ad57bc783 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Thu, 23 Apr 2026 08:58:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=B3=BB=E7=BB=9F=EF=BC=8C?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/enums/AccessResponseEnum.java | 10 +- .../java/com/njcn/access/enums/TypeEnum.java | 13 + .../njcn/access/utils/SendMessageUtil.java | 28 ++ iot-access/access-boot/pom.xml | 4 + iot-analysis/analysis-rt/rt-boot/pom.xml | 4 + iot-analysis/analysis-stat/stat-boot/pom.xml | 4 + .../zlevent/pojo/dto/CommonBaseMessage.java | 46 ++ .../pojo/dto/DevVersionResponeDTO.java | 205 ++++++++ .../pojo/dto/DeviceVersionRequestDTO.java | 55 +++ .../pojo/dto/FileDownloadRequestDTO.java | 60 +++ .../pojo/dto/FileDownloadResponeDTO.java | 42 ++ .../zlevent/pojo/dto/FileInfoRequestDTO.java | 57 +++ .../zlevent/pojo/dto/FileInfoResponseDTO.java | 79 +++ .../pojo/dto/FileOrDirDeleteRequestDTO.java | 58 +++ .../pojo/dto/FileOrDirDeleteResponeDTO.java | 39 ++ .../pojo/dto/FileUploadRequestDTO.java | 60 +++ .../pojo/dto/FileUploadResponeDTO.java | 39 ++ .../zlevent/pojo/dto/MkdirRequestDTO.java | 57 +++ .../zlevent/pojo/dto/MkdirResponeDTO.java | 39 ++ .../zlevent/pojo/dto/RebootRequestDTO.java | 55 +++ .../zlevent/pojo/dto/RebootResponeDTO.java | 39 ++ .../zlevent/pojo/dto/UpgradeRequestDTO.java | 55 +++ .../zlevent/pojo/dto/UpgradeResponeDTO.java | 39 ++ .../pojo/dto/WorkingLogRequestDTO.java | 55 +++ .../pojo/dto/WorkingLogResponeDTO.java | 101 ++++ .../njcn/zlevent/pojo/param/FileParam.java | 13 + .../analysis-zl-event/zl-event-boot/pom.xml | 4 + .../zlevent/config/TaskSchedulerConfig.java | 63 +++ .../zlevent/controller/DeviceController.java | 80 +++ .../zlevent/controller/FileController.java | 62 ++- .../njcn/zlevent/producer/CommonProducer.java | 33 ++ .../njcn/zlevent/service/IDeviceService.java | 67 +++ .../njcn/zlevent/service/IFileService.java | 38 ++ .../service/impl/DeviceServiceImpl.java | 289 +++++++++++ .../zlevent/service/impl/FileServiceImpl.java | 466 +++++++++++++++--- .../com/njcn/zlevent/FileDownloadTest.java | 33 ++ .../consumer/CldDevRunFlagConsumer.java | 2 +- .../njcn/message/consumer/CommonConsumer.java | 133 +++++ .../src/main/resources/bootstrap.yml | 2 +- pom.xml | 4 +- 40 files changed, 2459 insertions(+), 73 deletions(-) create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/CommonBaseMessage.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DevVersionResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DeviceVersionRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoResponseDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogRequestDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogResponeDTO.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/param/FileParam.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/config/TaskSchedulerConfig.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/DeviceController.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/producer/CommonProducer.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IDeviceService.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/DeviceServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/test/java/com/njcn/zlevent/FileDownloadTest.java create mode 100644 iot-message/message-boot/src/main/java/com/njcn/message/consumer/CommonConsumer.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 5d976af..258e5f0 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -75,7 +75,15 @@ public enum AccessResponseEnum { FILE_CHECK_ERROR("A0312","文件校验码不一致!"), CLD_MODEL_EXIST("A0313","云前置模板已存在,请先删除再录入!"), - ; + + /** + * A3001 ~ A3099 用于zlevent模块的枚举 + *

+ */ + FILE_DOWNLOAD_FAIL("A3002", "文件下载失败!"), + UNKNOWN_BUSINESS_TYPE_CODE("A3003", "未知业务type码"), + FILE_UPLOAD_FAIL("A3004", "文件上传失败!"), + TIME_OUT("A3005", "前置响应超时!"),; private final String code; diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java index 5652355..b7db8b1 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java @@ -45,6 +45,19 @@ public enum TypeEnum { TYPE_29("9217","设备心跳请求"), TYPE_30("4865","设备数据主动上送"), TYPE_31("8503","设备控制命令"), + READ_FILE_DIR("1101", "读取文件目录"), + FILE_DOWNLOAD("1102", "文件下载"), + FIXED_VALUE("1103", "定值读取/写入"), + INNER_FIXED_VALUE("1104", "内部定值读取/写入"), + + WORKING_LOG("1111","设备运行日志"), + DEVICE_VERSION("1112","设备版本信息"), + DEVICE_REBOOT("1114","设备重启"), + DEVICE_UPGRADE("1115","设备升级"), + FILE_UPLOAD("1116","文件上传"), + FILE_DELETE("1117","文件删除"), + MKDIR("1118","目录创建"), + DIR_DELETE("1119","目录删除"), /** * 数据类型 diff --git a/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java b/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java index 567e755..03acd74 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java @@ -1,7 +1,11 @@ package com.njcn.access.utils; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.pojo.dto.NoticeUserDto; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -25,6 +29,7 @@ import java.nio.charset.StandardCharsets; @AllArgsConstructor @RequiredArgsConstructor public class SendMessageUtil { + private final RedisUtil redisUtil; @Value("${app.sendUrl:https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push}") private String appSendUrl; @@ -67,4 +72,27 @@ public class SendMessageUtil { e.getMessage(); } } + + /** + * 轮询 Redis 等待响应 + */ + public String waitForResponse(String guid, int timeoutSeconds) { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < timeoutSeconds * 1000L) { + String response = redisUtil.getStringByKey(AppRedisKey.COMMON_RESOPNSE + guid); + if (response != null) { + redisUtil.delete(AppRedisKey.COMMON_REQUEST + guid); + redisUtil.delete(AppRedisKey.COMMON_RESOPNSE + guid); + return response; + } + try { + Thread.sleep(100); // 100ms 轮询一次 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + throw new BusinessException(AccessResponseEnum.TIME_OUT); + } + } diff --git a/iot-access/access-boot/pom.xml b/iot-access/access-boot/pom.xml index 5598be1..fa26dd2 100644 --- a/iot-access/access-boot/pom.xml +++ b/iot-access/access-boot/pom.xml @@ -20,6 +20,10 @@ + + com.github.tocrhz + mqtt-spring-boot-starter + org.eclipse.paho diff --git a/iot-analysis/analysis-rt/rt-boot/pom.xml b/iot-analysis/analysis-rt/rt-boot/pom.xml index 6d3176c..207f28f 100644 --- a/iot-analysis/analysis-rt/rt-boot/pom.xml +++ b/iot-analysis/analysis-rt/rt-boot/pom.xml @@ -21,6 +21,10 @@ + + com.github.tocrhz + mqtt-spring-boot-starter + com.njcn common-web diff --git a/iot-analysis/analysis-stat/stat-boot/pom.xml b/iot-analysis/analysis-stat/stat-boot/pom.xml index 8865a24..1a41c8f 100644 --- a/iot-analysis/analysis-stat/stat-boot/pom.xml +++ b/iot-analysis/analysis-stat/stat-boot/pom.xml @@ -21,6 +21,10 @@ + + com.github.tocrhz + mqtt-spring-boot-starter + com.njcn common-web diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/CommonBaseMessage.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/CommonBaseMessage.java new file mode 100644 index 0000000..1c369d5 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/CommonBaseMessage.java @@ -0,0 +1,46 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-31 + */ +@Data +public class CommonBaseMessage extends BaseMessage { + /** + * 消息请求的唯一标识 + */ + @JsonProperty("guid") + @JsonAlias({"guid"}) + private String guid; + + /** + * 设备Mac + */ + @JsonProperty("devMac") + @JsonAlias({"Dev_mac"}) + private String devMac; + + /** + * 前置Id + */ + @JsonProperty("frontId") + @JsonAlias({"FrontId"}) + private String frontId; + + /** + * 前置进程号 + */ + @JsonProperty("node") + @JsonAlias({"Node"}) + private Integer node; + + @JsonProperty("detail") + @JsonAlias({"Detail"}) + private Object detail; +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DevVersionResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DevVersionResponeDTO.java new file mode 100644 index 0000000..0a1a777 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DevVersionResponeDTO.java @@ -0,0 +1,205 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.time.LocalDate; + +/** + * @author caozehui + * @data 2026-03-16 + */ +@Data +public class DevVersionResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private DevVersionResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + @JsonProperty("Msg") + private DevVersionResponeDTO.Msg msg; + } + + @Data + public static class Msg { + + @JsonProperty("Code") + private Integer code; + + + @JsonProperty("VersionInfo") + private VersionInfo versionInfo; + } + + @Data + public static class VersionInfo { + /** + * 装置基础型号(只用于程序升级鉴别) + */ + @JsonProperty("BaseModel") + private String baseModel; + + /** + * 云服务协议版本 + */ + @JsonProperty("CloudProtocolVer") + private String cloudProtocolVer; + + /** + * 应用程序版本号 + */ + @JsonProperty("AppVersion") + private String appVersion; + + /** + * 应用程序版本日期 + */ + @JsonProperty("AppDate") + private LocalDate appDate; + + /** + * 应用程序校验码 + */ + @JsonProperty("AppChecksum") + private String appChecksum; + + /** + * 电压接线方式(0-星1-三角2-V) + */ + @JsonProperty("VoltageWiring") + private String voltageWiring; + + /** + * 电流B相是否合成(0-否1-是) + */ + @JsonProperty("CurrentBSynthetic") + private String currentBSynthetic; + + /** + * 数据统计时间间隔(单位分钟) + */ + @JsonProperty("DataStatInterval") + private Integer dataStatInterval; + + /** + * 额定电压(二次值,单位V) + */ + @JsonProperty("RatedVoltage") + private Double ratedVoltage; + + /** + * PT变比 + */ + @JsonProperty("PTRatio") + private Integer ptRatio; + + /** + * CT变比 + */ + @JsonProperty("CTRatio") + private Integer ctRatio; + + /** + * sntp对时IP + */ + @JsonProperty("SntpIP") + private String sntpIP; + + /** + * sntp对时端口 + */ + @JsonProperty("SntpPort") + private Integer sntpPort; + + /** + * sntp对时间隔(单位分钟) + */ + @JsonProperty("SntpInterval") + private Integer sntpInterval; + + /** + * Web端口 + */ + @JsonProperty("WebPort") + private Integer webPort; + + /** + * FTP端口 + */ + @JsonProperty("FtpPort") + private Integer ftpPort; + + /** + * Pqdif文件时间间隔(单位小时) + */ + @JsonProperty("PqdifInterval") + private Integer pqdifInterval; + + /** + * 录波文件包含文件类型数 + */ + @JsonProperty("WaveFileTypeCount") + private Integer waveFileTypeCount; + + /** + * 特殊程序版本信息 + */ + @JsonProperty("SpecialVersion") + private String specialVersion; + + /** + * 装置型号(具体型号全称) + */ + @JsonProperty("DeviceModel") + private String deviceModel; + + /** + * 谐波电度版本标志(0-否1-是) + */ + @JsonProperty("HarmonicEnergyFlag") + private Integer harmonicEnergyFlag; + + /** + * 物理设备名称(仅用于上位机录波文件拼接) + */ + @JsonProperty("PhysicalName") + private String physicalName; + + /** + * 录波LD名称(仅用于上位机录波文件拼接) + */ + @JsonProperty("WaveLDName") + private String waveLDName; + + /** + * 高频谐波功能标志(0-否1-是) + */ + @JsonProperty("HighFreqHarmonicFlag") + private Integer highFreqHarmonicFlag; + + /** + * 投入的通讯协议(2字节十六进制数) + */ + @JsonProperty("CommProtocols") + private Integer commProtocols; + + /** + * 投入的对时方式选择(2字节十六进制数) + */ + @JsonProperty("TimeSyncMethods") + private Integer timeSyncMethods; + + /** + * 装置功能配置(2字节十六进制数) + */ + @JsonProperty("DeviceFunctions") + private Integer deviceFunctions; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DeviceVersionRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DeviceVersionRequestDTO.java new file mode 100644 index 0000000..7eb39bf --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/DeviceVersionRequestDTO.java @@ -0,0 +1,55 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-04-01 + */ +@Data +public class DeviceVersionRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private DeviceVersionRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + /** + * 详情 + */ + @JSONField(name = "Msg") + private Map msg; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadRequestDTO.java new file mode 100644 index 0000000..c82fbef --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadRequestDTO.java @@ -0,0 +1,60 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-16 + */ +@Data +public class FileDownloadRequestDTO { + + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private FileDownloadRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + @JSONField(name = "Msg") + private FileDownloadRequestDTO.Msg msg; + } + + @Data + public static class Msg { + /** + * 设备的文件名,例如:/etc/vol1_stat.txt + */ + @JSONField(name = "Name") + private String name; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadResponeDTO.java new file mode 100644 index 0000000..f144d6e --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileDownloadResponeDTO.java @@ -0,0 +1,42 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-16 + */ +@Data +public class FileDownloadResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private FileDownloadResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + @JsonProperty("Msg") + private FileDownloadResponeDTO.Msg msg; + } + + @Data + public static class Msg { + /** + * 文件名称,例如 /etc/vol1_stat.txt + */ + @JsonProperty("Name") + private String name; + + /** + * 远端文件名,例如 /download/vol1_stat.txt + */ + @JsonProperty("RemoteName") + private String remoteName; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoRequestDTO.java new file mode 100644 index 0000000..c7c66c9 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoRequestDTO.java @@ -0,0 +1,57 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-18 + */ +@Data +public class FileInfoRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private FileInfoRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + @JSONField(name = "Msg") + private FileInfoRequestDTO.Msg msg; + } + + @Data + public static class Msg { + + @JSONField(name = "Name") + private String name; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoResponseDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoResponseDTO.java new file mode 100644 index 0000000..fcba67c --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoResponseDTO.java @@ -0,0 +1,79 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-18 + */ +@Data +public class FileInfoResponseDTO extends CommonBaseMessage { + + @JsonProperty("detail") + @JsonAlias({"Detail"}) + private FileInfoResponseDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("type") + @JsonAlias({"Type"}) + private Integer type; + + /** + * 消息主体 + */ + @JsonProperty("msg") + @JsonAlias({"Msg"}) + private FileInfoResponseDTO.Msg msg; + + /** + * 响应码 + */ + @JsonProperty("code") + @JsonAlias({"Code"}) + private Integer code; + } + + @Data + public static class Msg { + /** + * 目录信息数组 + */ + @JsonProperty("dirInfo") + @JsonAlias({"DirInfo"}) + private List dirInfo; + } + + @Data + public static class ResourceElement { + /** + * 文件名/文件夹名称 + */ + @JsonProperty("name") + @JsonAlias({"Name"}) + private String name; + + /** + * 类型,文件/文件夹 + */ + @JsonProperty("type") + @JsonAlias({"Type"}) + private String type; + + /** + * 目录信息数组单个元素的数据成员大小 + */ + @JsonProperty("size") + @JsonAlias({"Size"}) + private Integer size; + + private String prjDataPath; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteRequestDTO.java new file mode 100644 index 0000000..78d5e48 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteRequestDTO.java @@ -0,0 +1,58 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-04-01 + */ +@Data +public class FileOrDirDeleteRequestDTO { + + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private FileOrDirDeleteRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + @JSONField(name = "Msg") + private FileOrDirDeleteRequestDTO.Msg msg; + } + + @Data + public static class Msg { + + @JSONField(name = "Name") + private String name; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteResponeDTO.java new file mode 100644 index 0000000..4836604 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileOrDirDeleteResponeDTO.java @@ -0,0 +1,39 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class FileOrDirDeleteResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private FileOrDirDeleteResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private Map msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadRequestDTO.java new file mode 100644 index 0000000..d7997c8 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadRequestDTO.java @@ -0,0 +1,60 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-18 + */ +@Data +public class FileUploadRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private FileUploadRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + @JSONField(name = "Msg") + private FileUploadRequestDTO.Msg msg; + } + + @Data + public static class Msg { + + @JSONField(name = "Name") + private String name; + + @JSONField(name = "RemoteName") + private String remoteName; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadResponeDTO.java new file mode 100644 index 0000000..43eb69b --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileUploadResponeDTO.java @@ -0,0 +1,39 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class FileUploadResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private FileUploadResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private Map msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirRequestDTO.java new file mode 100644 index 0000000..6617e81 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirRequestDTO.java @@ -0,0 +1,57 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-18 + */ +@Data +public class MkdirRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private MkdirRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + @JSONField(name = "Msg") + private MkdirRequestDTO.Msg msg; + } + + @Data + public static class Msg { + + @JSONField(name = "Name") + private String name; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirResponeDTO.java new file mode 100644 index 0000000..da55910 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/MkdirResponeDTO.java @@ -0,0 +1,39 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class MkdirResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private MkdirResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private Map msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootRequestDTO.java new file mode 100644 index 0000000..b5da35a --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootRequestDTO.java @@ -0,0 +1,55 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-04-01 + */ +@Data +public class RebootRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private RebootRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + /** + * 详情 + */ + @JSONField(name = "Msg") + private Map msg; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootResponeDTO.java new file mode 100644 index 0000000..81141b7 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/RebootResponeDTO.java @@ -0,0 +1,39 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class RebootResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private RebootResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private Map msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeRequestDTO.java new file mode 100644 index 0000000..8a7221e --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeRequestDTO.java @@ -0,0 +1,55 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-04-01 + */ +@Data +public class UpgradeRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private UpgradeRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + /** + * 详情 + */ + @JSONField(name = "Msg") + private Map msg; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeResponeDTO.java new file mode 100644 index 0000000..99a9acc --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/UpgradeResponeDTO.java @@ -0,0 +1,39 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class UpgradeResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private UpgradeResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private Map msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogRequestDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogRequestDTO.java new file mode 100644 index 0000000..5cc89af --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogRequestDTO.java @@ -0,0 +1,55 @@ +package com.njcn.zlevent.pojo.dto; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +import java.util.Map; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class WorkingLogRequestDTO { + /** + * 消息请求的唯一标识 + */ + @JSONField(name = "guid") + private String guid; + + /** + * 设备ID + */ + @JSONField(name = "Dev_id") + private String devId; + + /** + * 前置Id + */ + @JSONField(name = "FrontId") + private String frontId; + + /** + * 前置进程号 + */ + @JSONField(name = "Node") + private Integer node; + + @JSONField(name = "Detail") + private WorkingLogRequestDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JSONField(name = "Type") + private Integer type; + + /** + * 详情 + */ + @JSONField(name = "Msg") + private Map msg; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogResponeDTO.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogResponeDTO.java new file mode 100644 index 0000000..fff6af8 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/WorkingLogResponeDTO.java @@ -0,0 +1,101 @@ +package com.njcn.zlevent.pojo.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author caozehui + * @data 2026-03-30 + */ +@Data +public class WorkingLogResponeDTO extends CommonBaseMessage { + + @JsonProperty("Detail") + private WorkingLogResponeDTO.Detail detail; + + @Data + public static class Detail { + /** + * 数据类型,代表特定功能 + */ + @JsonProperty("Type") + private Integer type; + + /** + * 响应主体 + */ + @JsonProperty("Msg") + private WorkingLogResponeDTO.Msg msg; + + /** + * 响应码 + */ + @JsonProperty("Code") + private Integer code; + } + + @Data + public static class Msg { + /** + * 时标 + */ + @JsonProperty("Time") + private LocalDateTime time; + + /** + * CPU负荷(单位%) 39_38(双核,单核的为一个) + */ + @JsonProperty("CpuLoad") + private String cpuLoad; + + /** + * 装置剩余内存(单位MB) + */ + @JsonProperty("FreeMemory") + private String freeMemory; + + /** + * 装置总内存(单位MB) + */ + @JsonProperty("TotalMemory") + private String totalMemory; + + /** + * 装置主存储器剩余空间(单位GB) + */ + @JsonProperty("FreeStorage") + private String freeStorage; + + /** + * 装置主存储器总空间(单位GB) + */ + @JsonProperty("TotalStorage") + private String totalStorage; + + /** + * 硬对时最后时标(B码或秒秒冲) + */ + @JsonProperty("HardTimeSync") + private LocalDateTime hardTimeSync; + + /** + * Sntp对时最后时标 + */ + @JsonProperty("SntpTimeSync") + private LocalDateTime sntpTimeSync; + + /** + * 云服务协议对时最后时标 + */ + @JsonProperty("CloudTimeSync") + private LocalDateTime cloudTimeSync; + + /** + * 无线模块信号强度 + */ + @JsonProperty("SignalStrength") + private String signalStrength; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/param/FileParam.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/param/FileParam.java new file mode 100644 index 0000000..fbeceed --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/param/FileParam.java @@ -0,0 +1,13 @@ +package com.njcn.zlevent.pojo.param; + +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-17 + */ +@Data +public class FileParam { + private String filePath; + private String devId; +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml index b9fe57f..959136c 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml +++ b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml @@ -20,6 +20,10 @@ + + com.github.tocrhz + mqtt-spring-boot-starter + com.baomidou dynamic-datasource-spring-boot-starter diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/config/TaskSchedulerConfig.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/config/TaskSchedulerConfig.java new file mode 100644 index 0000000..14cc1d7 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/config/TaskSchedulerConfig.java @@ -0,0 +1,63 @@ +package com.njcn.zlevent.config; + +import cn.hutool.core.util.ObjectUtil; +import com.njcn.zlevent.service.IDeviceService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import java.time.Instant; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; + +@Slf4j +@Configuration +public class TaskSchedulerConfig { + + @Autowired + private IDeviceService deviceService; + + private final Map> scheduledTasks = new ConcurrentHashMap<>(); + + private final TaskScheduler taskScheduler; + + public TaskSchedulerConfig() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(3); + scheduler.setThreadNamePrefix("device-task-"); + scheduler.setWaitForTasksToCompleteOnShutdown(true); + scheduler.initialize(); + this.taskScheduler = scheduler; + } + + public void startTask(String devId, long intervalSeconds) { + if (scheduledTasks.containsKey(devId)) { + return; + } + + ScheduledFuture future = taskScheduler.schedule( + () -> deviceService.getWorkingLog(devId), + triggerContext -> Date.from(Instant.now().plusSeconds(intervalSeconds)) + ); + + scheduledTasks.put(devId, future); + log.info("启动设备 {} 的定时任务成功,间隔={}秒", devId, intervalSeconds); + } + + public void stopTask(String devId) { + ScheduledFuture future = scheduledTasks.remove(devId); + if (ObjectUtil.isNotNull(future)) { + future.cancel(false); + log.info("停止设备 {} 的定时任务成功", devId); + } + } + + public boolean isTaskRunning(String devId) { + ScheduledFuture future = scheduledTasks.get(devId); + return ObjectUtil.isNotNull(future) && !future.isDone() && !future.isCancelled(); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/DeviceController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/DeviceController.java new file mode 100644 index 0000000..4fc9120 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/DeviceController.java @@ -0,0 +1,80 @@ +package com.njcn.zlevent.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.service.IDeviceService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; + +/** + * @author caozehui + * @data 2026-03-20 + */ +@Slf4j +@RestController +@RequestMapping("/device") +@Api(tags = "操作设备") +@AllArgsConstructor +public class DeviceController extends BaseController { + + private IDeviceService deviceService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @GetMapping("/workingLog") + @ApiOperation("开始获取装置运行日志") + public HttpResult startWorkingLog(@RequestParam("devId") String devId) { + String methodDescribe = getMethodDescribe("startWorkingLog"); + deviceService.startWorkingLog(devId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @GetMapping("/stopWorkingLog") + @ApiOperation("停止获取装置运行日志") + public HttpResult stopWorkingLog(@RequestParam("devId") String devId) { + String methodDescribe = getMethodDescribe("stopWorkingLog"); + deviceService.stopWorkingLogTask(devId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @GetMapping("/upgrade") + @ApiOperation("装置升级") + @ApiImplicitParams({ + @ApiImplicitParam(name = "devId", value = "设备ID", required = true), + @ApiImplicitParam(name = "edDataId", value = "程序版本Id", required = true) + }) + public HttpResult upgrade(@RequestParam("devId") String devId, @RequestParam("edDataId") String edDataId) { + String methodDescribe = getMethodDescribe("upgrade"); + boolean res = deviceService.upgrade(devId, edDataId); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @GetMapping("/reboot") + @ApiOperation("装置重启") + public HttpResult reboot(@RequestParam("devId") String devId) { + String methodDescribe = getMethodDescribe("reboot"); + boolean res = deviceService.reboot(devId); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @GetMapping("/timeSync") + @ApiOperation("装置对时") + public HttpResult timeSync(@RequestParam("devId") String devId) { + String methodDescribe = getMethodDescribe("listDir"); + boolean res = deviceService.timeSync(devId); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, methodDescribe); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java index 5b96c2b..957341c 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/FileController.java @@ -7,13 +7,20 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; import com.njcn.mq.message.AppFileMessage; import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.pojo.dto.FileInfoResponseDTO; +import com.njcn.zlevent.pojo.param.FileParam; import com.njcn.zlevent.service.IFileService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; + +import javax.servlet.http.HttpServletResponse; +import java.util.List; /** * 类的介绍: @@ -35,7 +42,7 @@ public class FileController extends BaseController { @PostMapping("/fileInfo") @ApiOperation("文件信息") @ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true) - public HttpResult fileInfo(@RequestBody AppFileMessage appFileMessage){ + public HttpResult fileInfo(@RequestBody AppFileMessage appFileMessage) { String methodDescribe = getMethodDescribe("fileInfo"); fileService.analysisFileInfo(appFileMessage); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); @@ -45,7 +52,7 @@ public class FileController extends BaseController { @PostMapping("/fileStream") @ApiOperation("解析文件") @ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true) - public HttpResult fileStream(@RequestBody AppFileMessage appFileMessage){ + public HttpResult fileStream(@RequestBody AppFileMessage appFileMessage) { String methodDescribe = getMethodDescribe("fileStream"); fileService.analysisFileStream(appFileMessage); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); @@ -55,10 +62,59 @@ public class FileController extends BaseController { @PostMapping("/downloadMakeUpFile") @ApiOperation("下载补召文件") @ApiImplicitParam(name = "nDid", value = "nDid", required = true) - public HttpResult downloadMakeUpFile(@RequestParam("nDid") String nDid){ + public HttpResult downloadMakeUpFile(@RequestParam("nDid") String nDid) { String methodDescribe = getMethodDescribe("downloadMakeUpFile"); fileService.downloadMakeUpFile(nDid); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/listDir") + @ApiOperation("获取目录列表") + @ApiImplicitParam(name = "fileParam", value = "文件路径", required = true) + public HttpResult> listDir(@RequestBody FileParam fileParam) { + String methodDescribe = getMethodDescribe("listDir"); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, fileService.listDir(fileParam.getFilePath(), fileParam.getDevId()), methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/downloadFileFromFront") + @ApiOperation("下载文件") + @ApiImplicitParam(name = "fileParam", value = "文件参数", required = true) + public void downloadFileFromFront(@RequestBody FileParam fileParam, HttpServletResponse response) { + fileService.downloadFileFromFront(fileParam.getFilePath(), fileParam.getDevId(), response); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/mkdir") + @ApiOperation("创建目录") + @ApiImplicitParam(name = "fileParam", value = "文件参数", required = true) + public HttpResult mkdir(@RequestBody FileParam fileParam) { + String methodDescribe = getMethodDescribe("mkdir"); + boolean res = fileService.mkdir(fileParam.getFilePath(), fileParam.getDevId()); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, methodDescribe); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/uploadFileToFront") + @ApiOperation("上传文件") + @ApiImplicitParams({ + @ApiImplicitParam(name = "file", value = "文件", required = true), + @ApiImplicitParam(name = "devId", value = "设备ID", required = true), + @ApiImplicitParam(name = "dirPath", value = "文件所在路径", required = true) + }) + public HttpResult uploadFileToFront(@RequestPart("file") MultipartFile file, @RequestParam("devId") String devId, @RequestParam("dirPath") String dirPath) { + boolean res = fileService.uploadFileToFront(file, devId, dirPath); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, getMethodDescribe("uploadFileToFront")); + } + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/delete") + @ApiOperation("删除文件/目录") + @ApiImplicitParam(name = "fileParam", value = "文件参数", required = true) + public HttpResult delete(@RequestBody FileParam fileParam) { + boolean res = fileService.delete(fileParam.getFilePath(), fileParam.getDevId()); + return HttpResultUtil.assembleCommonResponseResult(res ? CommonResponseEnum.SUCCESS : CommonResponseEnum.FAIL, res, getMethodDescribe("delete")); + } + } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/producer/CommonProducer.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/producer/CommonProducer.java new file mode 100644 index 0000000..e108eb9 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/producer/CommonProducer.java @@ -0,0 +1,33 @@ +package com.njcn.zlevent.producer; + +import com.njcn.middle.rocket.domain.BaseMessage; +import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; +import com.njcn.mq.constant.BusinessTopic; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Component; + +/** + * @author caozehui + * @data 2026-03-16 + */ +@Component +public class CommonProducer extends RocketMQEnhanceTemplate { + + + public CommonProducer(RocketMQTemplate template) { + super(template); + } + + /** + * @param message + * @param frontId + * @return + */ + public SendResult send(BaseMessage message, String frontId) { + return send(BusinessTopic.CLOUD_TOPIC, frontId, message); + } + public SendResult send(BaseMessage message) { + return send(BusinessTopic.CLOUD_REPLY_TOPIC, message); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IDeviceService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IDeviceService.java new file mode 100644 index 0000000..cf51482 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IDeviceService.java @@ -0,0 +1,67 @@ +package com.njcn.zlevent.service; + +import com.njcn.zlevent.pojo.dto.DevVersionResponeDTO; +import com.njcn.zlevent.pojo.dto.DeviceVersionRequestDTO; +import org.springframework.web.multipart.MultipartFile; + +/** + * @author caozehui + * @data 2026-03-20 + */ +public interface IDeviceService { + + /** + * 开始运行日志任务 + * + * @param devId 设备id + */ + void startWorkingLog(String devId); + + /** + * 停止运行日志任务 + * + * @param devId + */ + void stopWorkingLogTask(String devId); + + /** + * 运行日志任务是否正在运行 + * + * @param devId + * @return + */ + boolean isWorkingLogTaskRunning(String devId); + + void getWorkingLog(String devId); + + /** + * 设备对时 + * + * @param devId + * @return + */ + boolean timeSync(String devId); + + /** + * 获取设备版本信息 + * + * @param devId + * @return + */ + DevVersionResponeDTO.VersionInfo getDeviceVersion(String devId); + + /** + * 设备升级 + * + * @param devId + * @param edDataId + */ + boolean upgrade(String devId, String edDataId); + + /** + * 重启设备 + * + * @param devId + */ + boolean reboot(String devId); +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java index e7bc8c4..5a17af6 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IFileService.java @@ -1,6 +1,11 @@ package com.njcn.zlevent.service; import com.njcn.mq.message.AppFileMessage; +import com.njcn.zlevent.pojo.dto.FileInfoResponseDTO; +import org.springframework.web.multipart.MultipartFile; + +import javax.servlet.http.HttpServletResponse; +import java.util.List; /** * 类的介绍: @@ -16,12 +21,14 @@ public interface IFileService { * 解析文件流之前需要获取文件的信息,可能要特殊处理 * 1.文件过大要分片获取(单次请求文件大小不超过50k) * 2.校验文件(md5或者crc) + * * @param appFileMessage */ void analysisFileInfo(AppFileMessage appFileMessage); /** * 获取文件流,解析文件 + * * @param appFileMessage */ void analysisFileStream(AppFileMessage appFileMessage); @@ -30,4 +37,35 @@ public interface IFileService { * 下载补召文件 */ void downloadMakeUpFile(String nDid); + + /** + * 获取目录列表 + * + * @param filePath + * @param devId + * @return + */ + List listDir(String filePath, String devId); + + /** + * 从前置下载文件 + * + * @param filePath + * @param devId + * @param response + */ + void downloadFileFromFront(String filePath, String devId, HttpServletResponse response); + + /** + * 上传文件到文件服务器 + * + * @param file + * @param devId + * @param dirPath + */ + boolean uploadFileToFront(MultipartFile file, String devId, String dirPath); + + boolean mkdir(String filePath, String devId); + + boolean delete(String filePath, String devId); } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/DeviceServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/DeviceServiceImpl.java new file mode 100644 index 0000000..de9ec3e --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/DeviceServiceImpl.java @@ -0,0 +1,289 @@ +package com.njcn.zlevent.service.impl; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.access.enums.TypeEnum; +import com.njcn.access.utils.SendMessageUtil; +import com.njcn.csdevice.api.CsEdDataFeignClient; +import com.njcn.csdevice.api.CsSoftInfoFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; +import com.njcn.csdevice.pojo.po.CsSoftInfoPO; +import com.njcn.csdevice.pojo.vo.CsEdDataVO; +import com.njcn.middle.rocket.domain.BaseMessage; +import com.njcn.oss.utils.FileStorageUtil; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.zlevent.config.TaskSchedulerConfig; +import com.njcn.zlevent.pojo.dto.*; +import com.njcn.zlevent.producer.CommonProducer; +import com.njcn.zlevent.service.IDeviceService; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Service; + +import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-20 + */ +@Service +@AllArgsConstructor +public class DeviceServiceImpl implements IDeviceService { + + private final EquipmentFeignClient equipmentFeignClient; + private final CsSoftInfoFeignClient csSoftInfoFeignClient; + private final CsEdDataFeignClient csEdDataFeignClient; + + private final CommonProducer commonProducer; + private final SendMessageUtil sendMessageUtil; + private final TaskSchedulerConfig taskSchedulerConfig; + private final RedisUtil redisUtil; + private final MqttPublisher publisher; + private final FileStorageUtil fileStorageUtil; + + @Override + public void startWorkingLog(String devId) { + taskSchedulerConfig.startTask(devId, 5); + } + + @Override + public void stopWorkingLogTask(String devId) { + taskSchedulerConfig.stopTask(devId); + } + + @Override + public boolean isWorkingLogTaskRunning(String devId) { + return taskSchedulerConfig.isTaskRunning(devId); + } + + public void getWorkingLog(String devId) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + + WorkingLogRequestDTO requestDTO = new WorkingLogRequestDTO(); + requestDTO.setDevId(devId); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setFrontId(listHttpResult.get(0).getNodeId()); + + WorkingLogRequestDTO.Detail detail = new WorkingLogRequestDTO.Detail(); + detail.setType(Integer.valueOf(TypeEnum.WORKING_LOG.getCode())); + detail.setMsg(new HashMap<>()); + + requestDTO.setDetail(detail); + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + WorkingLogResponeDTO workingLogResponeDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), WorkingLogResponeDTO.class); + + WorkingLogResponeDTO.Detail detail1 = workingLogResponeDTO.getDetail(); + WorkingLogResponeDTO.Msg msg1 = detail1.getMsg(); + + //mqtt推送给前端 + publisher.send("/afafaidfasd", JSON.toJSONString(msg1), 1, false); + } + + + @Override + public boolean timeSync(String devId) { +// List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); +// +// BaseMessage message = new BaseMessage(); +// FileDownloadRequestDTO requestDTO = new FileDownloadRequestDTO(); +// requestDTO.setGuid(listHttpResult.get(0).getNodeId()); +// requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + + +// BaseRequestDTO message = new BaseRequestDTO<>(); +// message.setGuid(IdUtil.simpleUUID()); +// message.setFrontId(listHttpResult.get(0).getNodeId()); +// message.setNode(listHttpResult.get(0).getNodeProcess()); +// message.setDevId(devId); +// BaseRequestDTO.Detail detail = new BaseRequestDTO.Detail<>(); +// detail.setType(1113); +// detail.setMsg(null); +// message.setDetail(detail); + + // pendingResponsesMap.put(message.getGuid(), new CompletableFuture<>()); + + // 发送 + //deviceProducer.send(message); + +// BaseMessage baseMessage = null; +// CompletableFuture future = pendingResponsesMap.get(message.getGuid()); +// try { +// baseMessage = future.get(5, TimeUnit.SECONDS); +// } catch (Exception e) { +// throw new BusinessException(ZleventResoponseEnum.RESPONSE_ERROR); +// } finally { +// pendingResponsesMap.remove(message.getGuid()); +// } +// +// Integer code = baseMessage.getDetail().getCode(); + +// return code == 200; + return true; + } + + @Override + public DevVersionResponeDTO.VersionInfo getDeviceVersion(String devId) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + CsEquipmentDeliveryDTO csEquipmentDeliveryDTO = listHttpResult.get(0); + // 先询问一下旧的版本信息 + DeviceVersionRequestDTO deviceVersionRequestDTO = new DeviceVersionRequestDTO(); + deviceVersionRequestDTO.setDevId(devId); + deviceVersionRequestDTO.setGuid(IdUtil.simpleUUID()); + deviceVersionRequestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess()); + deviceVersionRequestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId()); + + DeviceVersionRequestDTO.Detail detail1 = new DeviceVersionRequestDTO.Detail(); + detail1.setMsg(new HashMap<>()); + detail1.setType(Integer.parseInt(TypeEnum.DEVICE_VERSION.getCode())); + deviceVersionRequestDTO.setDetail(detail1); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(deviceVersionRequestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + deviceVersionRequestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, deviceVersionRequestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + DevVersionResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(deviceVersionRequestDTO.getGuid(), 10), DevVersionResponeDTO.class); + DevVersionResponeDTO.Detail detail2 = responseDTO.getDetail(); + + if (detail2.getMsg().getCode() == 200) { + DevVersionResponeDTO.VersionInfo versionInfo = detail2.getMsg().getVersionInfo(); + return versionInfo; + } + return null; + } + + @Override + public boolean upgrade(String devId, String edDataId) { + // 先获取旧的版本信息 + //DevVersionResponeDTO.VersionInfo oldVersionInfo = this.getDeviceVersion(devId); + + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + CsEquipmentDeliveryDTO csEquipmentDeliveryDTO = listHttpResult.get(0); + + CsEdDataVO csEdDataVO = csEdDataFeignClient.findByDevTypeId(edDataId).getData(); + String filePath = csEdDataVO.getFilePath(); + + FileUploadRequestDTO fileUploadRequestDTO = new FileUploadRequestDTO(); + fileUploadRequestDTO.setDevId(devId); + fileUploadRequestDTO.setGuid(IdUtil.simpleUUID()); + fileUploadRequestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess()); + fileUploadRequestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId()); + + FileUploadRequestDTO.Detail detail1 = new FileUploadRequestDTO.Detail(); + detail1.setType(Integer.parseInt(TypeEnum.FILE_UPLOAD.getCode())); + + FileUploadRequestDTO.Msg msg = new FileUploadRequestDTO.Msg(); + msg.setName(filePath); + msg.setRemoteName(""); + detail1.setMsg(msg); + + fileUploadRequestDTO.setDetail(detail1); + + BaseMessage message1 = new BaseMessage(); + message1.setMessageBody(JSON.toJSONString(fileUploadRequestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + fileUploadRequestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message1, fileUploadRequestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + FileUploadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(fileUploadRequestDTO.getGuid(), 10), FileUploadResponeDTO.class); + FileUploadResponeDTO.Detail detail2 = responseDTO.getDetail(); + + if (detail2.getCode() == 200) { + UpgradeRequestDTO requestDTO = new UpgradeRequestDTO(); + requestDTO.setDevId(devId); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess()); + requestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId()); + + UpgradeRequestDTO.Detail detail3 = new UpgradeRequestDTO.Detail(); + detail3.setType(Integer.valueOf(TypeEnum.DEVICE_UPGRADE.getCode())); + detail3.setMsg(new HashMap<>()); + + requestDTO.setDetail(detail3); + BaseMessage message2 = new BaseMessage(); + message2.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message2, requestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + UpgradeResponeDTO responeDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), UpgradeResponeDTO.class); + UpgradeResponeDTO.Detail detail4 = responeDTO.getDetail(); + + if (detail4.getCode() == 200) { + // 修改数据库记录 + String softinfoId = csEquipmentDeliveryDTO.getSoftinfoId(); + + CsSoftInfoPO softInfoPO = null; + if (StrUtil.isNotBlank(softinfoId)) { + csSoftInfoFeignClient.removeSoftInfo(softinfoId); + } + softInfoPO = new CsSoftInfoPO(); + softInfoPO.setId(IdUtil.fastSimpleUUID()); + softInfoPO.setAppCheck(csEdDataVO.getCrc()); + softInfoPO.setAppDate(csEdDataVO.getVersionDate().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime()); + softInfoPO.setAppVersion(csEdDataVO.getVersionNo()); + softInfoPO.setOpAttr("r"); + softInfoPO.setOsName("VxWorks"); + softInfoPO.setOsVersion("VxWorks"); + softInfoPO.setSoftUpdate("yes"); + csSoftInfoFeignClient.saveSoftInfo(softInfoPO); + equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId()); + + // 重新获取升级后的版本信息 + DevVersionResponeDTO.VersionInfo newVersionInfo = this.getDeviceVersion(devId); + if (newVersionInfo.getAppVersion().equals(csEdDataVO.getVersionNo()) && newVersionInfo.getCloudProtocolVer().equals(csEdDataVO.getVersionAgreement())) { + // 修改数据库记录 + equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId()); + return true; + } + return false; + } else { + return false; + } + } else { + return false; + } + } + + @Override + public boolean reboot(String devId) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + + RebootRequestDTO requestDTO = new RebootRequestDTO(); + requestDTO.setDevId(devId); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setFrontId(listHttpResult.get(0).getNodeId()); + + return true; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 9fbcde7..65ad0e1 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -2,6 +2,7 @@ package com.njcn.zlevent.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.nacos.shaded.com.google.gson.Gson; @@ -12,19 +13,18 @@ import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.file.FileDto; -import com.njcn.access.utils.CRC32Utils; -import com.njcn.access.utils.ChannelObjectUtil; -import com.njcn.access.utils.FileCommonUtils; -import com.njcn.access.utils.MqttUtil; +import com.njcn.access.utils.*; import com.njcn.common.config.GeneralInfo; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.DeviceFtpFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.PortableOffLogFeignClient; import com.njcn.csdevice.enums.AlgorithmResponseEnum; +import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; import com.njcn.csharmonic.api.WavePicFeignClient; import com.njcn.csharmonic.enums.CsHarmonicResponseEnum; import com.njcn.csharmonic.pojo.dto.DownloadMakeUpDto; +import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.mq.message.AppFileMessage; import com.njcn.oss.constant.GeneralConstant; import com.njcn.oss.constant.OssPath; @@ -32,11 +32,10 @@ import com.njcn.oss.utils.FileStorageUtil; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.zlevent.param.CsEventParam; -import com.njcn.zlevent.pojo.dto.FileInfoDto; -import com.njcn.zlevent.pojo.dto.FileStreamDto; -import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.pojo.dto.*; import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsWave; +import com.njcn.zlevent.producer.CommonProducer; import com.njcn.zlevent.service.ICsEventFileLogsService; import com.njcn.zlevent.service.ICsEventService; import com.njcn.zlevent.service.ICsWaveService; @@ -46,12 +45,16 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; +import javax.servlet.http.HttpServletResponse; import java.io.*; +import java.net.URLEncoder; import java.nio.file.Files; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.CompletableFuture; /** * 类的介绍: @@ -83,17 +86,22 @@ public class FileServiceImpl implements IFileService { private final DeviceFtpFeignClient deviceFtpFeignClient; private final PortableOffLogFeignClient portableOffLogFeignClient; + private final CommonProducer commonProducer; + private final SendMessageUtil sendMessageUtil; + public final static String UPLOAD_PATH = "upload"; + + @Override public void analysisFileInfo(AppFileMessage appFileMessage) { - if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){ + if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())) { DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); int range = 51200; String fileName = appFileMessage.getMsg().getFileInfo().getName(); //缓存文件信息用于文件流拼接 FileInfoDto fileInfoDto = new FileInfoDto(); - List list = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class); + List list = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()), WaveTimeDto.class); if (CollectionUtil.isNotEmpty(list)) { - WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0); + WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()), WaveTimeDto.class).get(0); fileInfoDto.setStartTime(waveTimeDto.getStartTime()); fileInfoDto.setEndTime(waveTimeDto.getEndTime()); fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); @@ -123,14 +131,14 @@ public class FileServiceImpl implements IFileService { mid = (Integer) object; } //请求当前文件的数据 - askFileStream(appFileMessage.getId(),mid,fileName,-1,range); + askFileStream(appFileMessage.getId(), mid, fileName, -1, range); redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); - redisUtil.delete(AppRedisKey.TIME+fileName); + redisUtil.delete(AppRedisKey.TIME + fileName); mid = mid + 1; if (mid > 10000) { mid = 1; } - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + appFileMessage.getId(),mid); + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + appFileMessage.getId(), mid); } } else { throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR); @@ -161,16 +169,16 @@ public class FileServiceImpl implements IFileService { File lsFile = new File(generalInfo.getBusinessTempPath()); //如果文件夹不存在则创建 if (!lsFile.exists() && !lsFile.isDirectory()) { - lsFile .mkdirs(); + lsFile.mkdirs(); } //获取缓存的文件信息 Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); if (Objects.isNull(fileInfoDto)) { - String fileCheck = redisUtil.getObjectByKey("fileCheck"+appFileMessage.getId()+fileName).toString(); + String fileCheck = redisUtil.getObjectByKey("fileCheck" + appFileMessage.getId() + fileName).toString(); if (appFileMessage.getMsg().getFrameTotal() == 1) { //解析文件入库 - filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileCheck,"download"); + filePath = fileStream(1, null, appFileMessage.getMsg().getData(), fileName, appFileMessage.getId(), fileCheck, "download"); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); csEventLogs.setNowStep(1); @@ -188,29 +196,29 @@ public class FileServiceImpl implements IFileService { String key = AppRedisKey.MAKE_UP_FILES + appFileMessage.getId(); Object object = redisUtil.getObjectByKey(key); //清空redis缓存 - fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName); + fileCommonUtils.cleanRedisData(appFileMessage.getId(), fileName); if (Objects.nonNull(object)) { DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class); - channelMakeUpFile(dto,appFileMessage.getId(),fileName,filePath,lsFileName); + channelMakeUpFile(dto, appFileMessage.getId(), fileName, filePath, lsFileName); } } else { //收到数据就刷新缓存值 redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_DOWN_TIME.concat(appFileMessage.getMsg().getName()), null, 60L); Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); - if (Objects.isNull(object1)){ + if (Objects.isNull(object1)) { fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); fileStreamDto.setNDid(appFileMessage.getId()); fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen()); list.add(appFileMessage.getMsg().getFrameCurr()); fileStreamDto.setList(list); csEventLogs.setStatus(1); - csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!"); + csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); csEventLogs.setIsAll(0); redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); //将数据写入临时文件 - appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); + appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); } else { FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class); @@ -221,7 +229,7 @@ public class FileServiceImpl implements IFileService { Map filePartMap = readFile(lsFileName); filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); //解析文件入库 - filePath = fileStream(dto.getTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileCheck,"download"); + filePath = fileStream(dto.getTotal(), filePartMap, null, fileName, appFileMessage.getId(), fileCheck, "download"); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); @@ -237,10 +245,10 @@ public class FileServiceImpl implements IFileService { String key = AppRedisKey.MAKE_UP_FILES + appFileMessage.getId(); Object object = redisUtil.getObjectByKey(key); //清空redis缓存 - fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName); + fileCommonUtils.cleanRedisData(appFileMessage.getId(), fileName); if (Objects.nonNull(object)) { DownloadMakeUpDto dto2 = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class); - channelMakeUpFile(dto2,appFileMessage.getId(),fileName,filePath,lsFileName); + channelMakeUpFile(dto2, appFileMessage.getId(), fileName, filePath, lsFileName); } } else { csEventLogs.setStatus(1); @@ -262,7 +270,7 @@ public class FileServiceImpl implements IFileService { } } } - String userIndex = redisUtil.getObjectByKey("fileDownUserId"+appFileMessage.getId()+appFileMessage.getMsg().getName()).toString(); + String userIndex = redisUtil.getObjectByKey("fileDownUserId" + appFileMessage.getId() + appFileMessage.getMsg().getName()).toString(); //推送mqtt String json = "{fileName:" + appFileMessage.getMsg().getName() + ",allStep:" + appFileMessage.getMsg().getFrameTotal() @@ -283,7 +291,7 @@ public class FileServiceImpl implements IFileService { redisUtil.saveByKey("handleEvent:" + appFileMessage.getId(),"doing"); if (appFileMessage.getMsg().getFrameTotal() == 1){ //解析文件入库 - filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileInfoDto.getFileCheck(),"event"); + filePath = fileStream(1, null, appFileMessage.getMsg().getData(), fileName, appFileMessage.getId(), fileInfoDto.getFileCheck(), "event"); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); csEventLogs.setNowStep(1); @@ -305,25 +313,25 @@ public class FileServiceImpl implements IFileService { } } //解析完删除、处理缓存 - removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName); + removeInfoUtils.deleteEventInfo(appFileMessage.getId(), fileName); } else { //收到数据就刷新缓存值 redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 60L); Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); - if (Objects.isNull(object1)){ + if (Objects.isNull(object1)) { fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); fileStreamDto.setNDid(appFileMessage.getId()); fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen()); list.add(appFileMessage.getMsg().getFrameCurr()); fileStreamDto.setList(list); csEventLogs.setStatus(1); - csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!"); + csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); csEventLogs.setIsAll(0); redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); //将数据写入临时文件 - appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); + appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); } else { FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class); @@ -334,7 +342,7 @@ public class FileServiceImpl implements IFileService { Map filePartMap = readFile(lsFileName); filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); //解析文件 - filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId(),fileInfoDto.getFileCheck(),"event"); + filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId(), fileInfoDto.getFileCheck(), "event"); csEventLogs.setStatus(1); csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!"); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); @@ -359,7 +367,7 @@ public class FileServiceImpl implements IFileService { redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); //解析完删除、处理缓存 - removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName); + removeInfoUtils.deleteEventInfo(appFileMessage.getId(), fileName); //删除临时文件 File file = new File(lsFileName); if (file.exists()) { @@ -392,7 +400,7 @@ public class FileServiceImpl implements IFileService { //记录日志 csEventLogsService.save(csEventLogs); } - } catch (Exception e){ + } catch (Exception e) { csEventLogs.setStatus(0); csEventLogs.setRemark("文件解析失败,失败原因:" + e.getMessage()); csEventLogs.setCompleteTime(LocalDateTime.now()); @@ -406,9 +414,9 @@ public class FileServiceImpl implements IFileService { file.delete(); } //继续消费 - removeInfoUtils.deleteEventInfo(appFileMessage.getId(),appFileMessage.getMsg().getName()); + removeInfoUtils.deleteEventInfo(appFileMessage.getId(), appFileMessage.getMsg().getName()); //清空redis缓存 - fileCommonUtils.cleanRedisData(appFileMessage.getId(),fileName); + fileCommonUtils.cleanRedisData(appFileMessage.getId(), fileName); } } @@ -418,27 +426,27 @@ public class FileServiceImpl implements IFileService { //判断客户端是否在线,在线再处理文件 String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); boolean mqttClient = mqttUtil.judgeClientOnline(clientName); - if (mqttClient){ + if (mqttClient) { String key = AppRedisKey.MAKE_UP_FILES + nDid; Object object = redisUtil.getObjectByKey(key); if (Objects.nonNull(object)) { DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class); - if (CollectionUtil.isNotEmpty(dto.getFileList())){ + if (CollectionUtil.isNotEmpty(dto.getFileList())) { Object object1 = channelObjectUtil.getDeviceMid(nDid); if (!Objects.isNull(object1)) { mid = (Integer) object1; } String file = dto.getFileList().get(0); - fileCommonUtils.askFileInfo(nDid,mid,file); + fileCommonUtils.askFileInfo(nDid, mid, file); mid = mid + 1; if (mid > 10000) { mid = 1; } - redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid, mid); Thread.sleep(10000); String infoKey = AppRedisKey.PROJECT_INFO + nDid; FileDto.FileInfo info = channelObjectUtil.objectToSingleObject(redisUtil.getObjectByKey(infoKey), FileDto.FileInfo.class); - deviceFtpFeignClient.downloadFile(nDid,file,info.getFileSize(),info.getFileCheck()).getData(); + deviceFtpFeignClient.downloadFile(nDid, file, info.getFileSize(), info.getFileCheck()).getData(); } } } else { @@ -450,21 +458,347 @@ public class FileServiceImpl implements IFileService { Object object = redisUtil.getObjectByKey(AppRedisKey.MAKE_UP_FILES + nDid); if (Objects.nonNull(object)) { DownloadMakeUpDto dto = channelObjectUtil.objectToSingleObject(object, DownloadMakeUpDto.class); - if (CollectionUtil.isNotEmpty(dto.getFileList())){ + if (CollectionUtil.isNotEmpty(dto.getFileList())) { String file = dto.getFileList().get(0); - fileCommonUtils.cleanRedisData(nDid,file); + fileCommonUtils.cleanRedisData(nDid, file); } } } } + @Override + public List listDir(String filePath, String devId) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + + FileInfoRequestDTO requestDTO = new FileInfoRequestDTO(); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setFrontId(listHttpResult.get(0).getNodeId()); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setDevId(devId); + + FileInfoRequestDTO.Detail detail = new FileInfoRequestDTO.Detail(); + detail.setType(Integer.parseInt(TypeEnum.READ_FILE_DIR.getCode())); + + FileInfoRequestDTO.Msg msg = new FileInfoRequestDTO.Msg(); + msg.setName(filePath); + detail.setMsg(msg); + + requestDTO.setDetail(detail); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + //this.simulation1(requestDTO.getGuid()); + + // 轮询 Redis 等待响应 + FileInfoResponseDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), FileInfoResponseDTO.class); + + FileInfoResponseDTO.Detail detail1 = responseDTO.getDetail(); + FileInfoResponseDTO.Msg msg1 = detail1.getMsg(); + msg1.getDirInfo().forEach(resourceElement -> { + resourceElement.setPrjDataPath(StrUtil.SLASH.equals(filePath) ? resourceElement.getName() : filePath + StrUtil.SLASH + resourceElement.getName()); + }); + + return msg1.getDirInfo(); + } + + + private void simulation1(String guid) { + // 模拟异步处理,实际场景中应由消息队列或回调触发 + CompletableFuture.runAsync(() -> { + // 模拟耗时操作,例如等待设备响应 + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + FileInfoResponseDTO message = new FileInfoResponseDTO(); + List dirInfo = new ArrayList<>(); + + FileInfoResponseDTO.ResourceElement resourceElement1 = new FileInfoResponseDTO.ResourceElement(); + resourceElement1.setName("/ram0"); + resourceElement1.setType("dir"); + resourceElement1.setSize(1); + dirInfo.add(resourceElement1); + + FileInfoResponseDTO.ResourceElement resourceElement2 = new FileInfoResponseDTO.ResourceElement(); + resourceElement2.setName("/etc"); + resourceElement2.setType("dir"); + resourceElement2.setSize(1); + dirInfo.add(resourceElement2); + + FileInfoResponseDTO.ResourceElement resourceElement3 = new FileInfoResponseDTO.ResourceElement(); + resourceElement3.setName("/sd0:1"); + resourceElement3.setType("dir"); + resourceElement3.setSize(1); + dirInfo.add(resourceElement3); + + FileInfoResponseDTO.ResourceElement resourceElement4 = new FileInfoResponseDTO.ResourceElement(); + resourceElement4.setName("1773986668375094.xls"); + resourceElement4.setType("file"); + resourceElement4.setSize(1000); + dirInfo.add(resourceElement4); + + FileInfoResponseDTO.Detail detail = new FileInfoResponseDTO.Detail(); + FileInfoResponseDTO.Msg msg = new FileInfoResponseDTO.Msg(); + msg.setDirInfo(dirInfo); + detail.setMsg(msg); + detail.setCode(200); + detail.setType(4657); + detail.setMsg(msg); + + message.setGuid(guid); + message.setFrontId("sdghsfdhfdhdfhdfghd234234534534"); + message.setNode(1); + message.setDevMac("A0BC7B4A5D8A"); + message.setDetail(detail); + + + BaseMessage baseMessage = new BaseMessage(); + baseMessage.setSendTime(LocalDateTime.now()); + baseMessage.setMessageBody(JSON.toJSONString(message)); + commonProducer.send(baseMessage); + }); + } + + private void simulation2(String guid) { + // 模拟异步处理,实际场景中应由消息队列或回调触发 + CompletableFuture.runAsync(() -> { + // 模拟耗时操作,例如等待设备响应 + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + FileDownloadResponeDTO message = new FileDownloadResponeDTO(); + message.setGuid(guid); + message.setFrontId("sdghsfdhfdhdfhdfghd234234534534"); + message.setNode(1); + message.setDevMac("A0BC7B4A5D8A"); + + FileDownloadResponeDTO.Detail detail = new FileDownloadResponeDTO.Detail(); + detail.setType(Integer.parseInt(TypeEnum.FILE_DOWNLOAD.getCode())); + FileDownloadResponeDTO.Msg msg = new FileDownloadResponeDTO.Msg(); + msg.setName("a.txt"); + msg.setRemoteName("https://www.jswsrc.com.cn/data/upload/ueditor/file/20260320/1773986668375094.xls"); + detail.setMsg(msg); + + message.setDetail(detail); + + BaseMessage baseMessage = new BaseMessage(); + baseMessage.setSendTime(LocalDateTime.now()); + baseMessage.setMessageBody(JSON.toJSONString(message)); + commonProducer.send(baseMessage); + }); + } + + @Override + public void downloadFileFromFront(String filePath, String devId, HttpServletResponse response) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + + FileDownloadRequestDTO requestDTO = new FileDownloadRequestDTO(); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setFrontId(listHttpResult.get(0).getNodeId()); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setDevId(devId); + + FileDownloadRequestDTO.Detail detail = new FileDownloadRequestDTO.Detail(); + detail.setType(Integer.parseInt(TypeEnum.FILE_DOWNLOAD.getCode())); + + FileDownloadRequestDTO.Msg msg = new FileDownloadRequestDTO.Msg(); + msg.setName(filePath); + detail.setMsg(msg); + + requestDTO.setDetail(detail); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + //this.simulation2(requestDTO.getGuid()); + + // 轮询 Redis 等待响应 + FileDownloadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), FileDownloadResponeDTO.class); + + + String remoteName = responseDTO.getDetail().getMsg().getRemoteName(); +// String remoteName = "https://yunpan.360.cn/uploads/20230710/037ca576a421eb0bc23d717a7b076c5f.jpg"; +// String remoteName = "/PQ_PQLD1_001429_20251010_143805_792.dat"; + String fileName = remoteName.substring(remoteName.lastIndexOf(StrUtil.SLASH) + 1); + + + try { + response.setHeader("Content-Disposition", "attachment; filename=" + URLEncoder.encode(fileName, "UTF-8") + "\";filename*=UTF-8''" + URLEncoder.encode(fileName, "UTF-8")); + fileStorageUtil.downloadStream(response, remoteName); + // 下载完后删除文件 + //fileStorageUtil.deleteFile(filePath); + } catch (Exception e) { + throw new BusinessException(AccessResponseEnum.FILE_DOWNLOAD_FAIL); + } + } + + @Override + public boolean uploadFileToFront(MultipartFile file, String devId, String dirPath) { + dirPath = (dirPath.endsWith(StrUtil.SLASH) ? dirPath : dirPath + StrUtil.SLASH); + String remotePath = StrUtil.SLASH + UPLOAD_PATH + StrUtil.SLASH + devId + dirPath; + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + String frontId = listHttpResult.get(0).getNodeId(); + try { + fileStorageUtil.uploadMultipart(file, remotePath, true); + } catch (Exception e) { + throw new BusinessException(AccessResponseEnum.FILE_UPLOAD_FAIL); + } + // 告诉前置上传文件所在的路径、设备等信息 + FileUploadRequestDTO requestDTO = new FileUploadRequestDTO(); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setFrontId(frontId); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setDevId(devId); + + FileUploadRequestDTO.Detail detail = new FileUploadRequestDTO.Detail(); + detail.setType(Integer.parseInt(TypeEnum.FILE_UPLOAD.getCode())); + + FileUploadRequestDTO.Msg msg = new FileUploadRequestDTO.Msg(); + msg.setName(remotePath + file.getOriginalFilename()); + msg.setRemoteName(dirPath + file.getOriginalFilename()); + detail.setMsg(msg); + + requestDTO.setDetail(detail); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + FileUploadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), FileUploadResponeDTO.class); + + FileUploadResponeDTO.Detail detail1 = responseDTO.getDetail(); + + return detail1.getCode()==200; + } + + @Override + public boolean mkdir(String filePath, String devId) { + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + String frontId = listHttpResult.get(0).getNodeId(); + + // 告诉前置上传文件所在的路径、设备等信息 + MkdirRequestDTO requestDTO = new MkdirRequestDTO(); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setFrontId(frontId); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setDevId(devId); + + MkdirRequestDTO.Detail detail = new MkdirRequestDTO.Detail(); + detail.setType(Integer.parseInt(TypeEnum.MKDIR.getCode())); + + MkdirRequestDTO.Msg msg = new MkdirRequestDTO.Msg(); + msg.setName(filePath.endsWith(StrUtil.SLASH) ? filePath : filePath + StrUtil.SLASH); + detail.setMsg(msg); + + requestDTO.setDetail(detail); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + MkdirResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), MkdirResponeDTO.class); + + MkdirResponeDTO.Detail detail1 = responseDTO.getDetail(); + + return detail1.getCode()==200; + } + + @Override + public boolean delete(String filePath, String devId) { + boolean isDir = isDirectory(filePath); + + List listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData(); + String frontId = listHttpResult.get(0).getNodeId(); + + // 告诉前置上传文件所在的路径、设备等信息 + FileOrDirDeleteRequestDTO requestDTO = new FileOrDirDeleteRequestDTO(); + requestDTO.setGuid(IdUtil.simpleUUID()); + requestDTO.setFrontId(frontId); + requestDTO.setNode(listHttpResult.get(0).getNodeProcess()); + requestDTO.setDevId(devId); + + FileOrDirDeleteRequestDTO.Detail detail = new FileOrDirDeleteRequestDTO.Detail(); + detail.setType(Integer.parseInt(isDir ? TypeEnum.DIR_DELETE.getCode() : TypeEnum.FILE_DELETE.getCode())); + + FileOrDirDeleteRequestDTO.Msg msg = new FileOrDirDeleteRequestDTO.Msg(); + msg.setName(isDir ? (filePath.endsWith(StrUtil.SLASH) ? filePath : filePath + StrUtil.SLASH) : filePath); + detail.setMsg(msg); + + requestDTO.setDetail(detail); + + BaseMessage message = new BaseMessage(); + message.setMessageBody(JSON.toJSONString(requestDTO)); + + // 使用 Redis 存储 guid 用于后续查询 + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L); + + // 发送 + commonProducer.send(message, requestDTO.getFrontId()); + + // 轮询 Redis 等待响应 + FileOrDirDeleteResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), FileOrDirDeleteResponeDTO.class); + + FileOrDirDeleteResponeDTO.Detail detail1 = responseDTO.getDetail(); + + return detail1.getCode()==200; + } + + /** + * 根据文件扩展名判断是文件还是目录 + * 规则:包含小数点的是文件,否则是目录 + * + * @param filePath 文件路径 + * @return true=目录,false=文件 + */ + private boolean isDirectory(String filePath) { + String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); + + // 判断文件名中是否包含小数点(从第二个字符开始,排除隐藏文件的情况) + // 例如:.gitignore 是文件,不是目录 + if (fileName.startsWith(".")) { + // 隐藏文件/目录:判断除第一个点外是否还有其他点 + return !fileName.substring(1).contains("."); + } else { + // 普通文件/目录:直接判断是否包含点 + return !fileName.contains("."); + } + } + /** * 处理补召文件 */ - public void channelMakeUpFile(DownloadMakeUpDto dto, String nDid, String fileName, String oldPath, String lsFileName){ + public void channelMakeUpFile(DownloadMakeUpDto dto, String nDid, String fileName, String oldPath, String lsFileName) { try { //如果是补召文件,则将文件复制到补召目录下 - moveFile(oldPath,getFilePath(fileName,nDid),lsFileName); + moveFile(oldPath, getFilePath(fileName, nDid), lsFileName); //删除临时文件 File file = new File(lsFileName); if (file.exists()) { @@ -475,9 +809,9 @@ public class FileServiceImpl implements IFileService { List list = dto.getFileList(); list.removeIf(item -> item.equals(fileName)); dto.setFileList(list); - redisUtil.saveByKey(AppRedisKey.MAKE_UP_FILES + nDid,dto); + redisUtil.saveByKey(AppRedisKey.MAKE_UP_FILES + nDid, dto); //判断是否还有缓存的文件 - if (CollectionUtil.isNotEmpty(list)){ + if (CollectionUtil.isNotEmpty(list)) { //推送进度条 String json = "{allStep:" + dto.getAllStep() * 2 + ",nowStep:" + (dto.getAllStep() - list.size()) + "}"; publisher.send("/dataOnlineRecruitment/Progress/" + dto.getLineId(), new Gson().toJson(json), 1, false); @@ -491,12 +825,12 @@ public class FileServiceImpl implements IFileService { String json = "{allStep:" + dto.getAllStep() * 2 + ",nowStep:" + dto.getAllStep() + "}"; publisher.send("/dataOnlineRecruitment/Progress/" + dto.getLineId(), new Gson().toJson(json), 1, false); //调用方法 - portableOffLogFeignClient.dataOnlineRecruitment(dto.getDevId(),dto.getLineId(),dto.getEngineeringName()); + portableOffLogFeignClient.dataOnlineRecruitment(dto.getDevId(), dto.getLineId(), dto.getEngineeringName()); } } catch (Exception e) { String key = AppRedisKey.MAKE_UP_FILES + nDid; redisUtil.delete(key); - fileCommonUtils.cleanRedisData(nDid,fileName); + fileCommonUtils.cleanRedisData(nDid, fileName); } } @@ -511,20 +845,20 @@ public class FileServiceImpl implements IFileService { reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode())); reqAndResParam.setExpire(-1); - String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}"; + String json = "{Name:\"" + fileName + "\",Offset:" + offset + ",Len:" + len + "}"; JSONObject jsonObject = JSONObject.fromObject(json); reqAndResParam.setMsg(jsonObject); - publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); + publisher.send("/Pfm/DevFileCmd/" + version + "/" + nDid, new Gson().toJson(reqAndResParam), 1, false); log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam)); } /** * 组装文件 */ - public String fileStream(Integer number, Map map, String data, String fileName, String nDid, String fileCheck,String type) { + public String fileStream(Integer number, Map map, String data, String fileName, String nDid, String fileCheck, String type) { String filePath; - if (number == 1){ - filePath = stream(true,data,nDid,fileName,null,fileCheck,type); + if (number == 1) { + filePath = stream(true, data, nDid, fileName, null, fileCheck, type); } else { int lengthByte = 0; for (int i = 1; i <= number; i++) { @@ -538,39 +872,39 @@ public class FileServiceImpl implements IFileService { System.arraycopy(byteArray, 0, allByte, countLength, byteArray.length); countLength += byteArray.length; } - filePath = stream(false,null,nDid,fileName,allByte,fileCheck,type); + filePath = stream(false, null, nDid, fileName, allByte, fileCheck, type); } return filePath; } /** - * 解析存储文件信息 + * 解析存储文件信息 */ public String stream(boolean bool, String stream, String folder, String fileName, byte[] bytes, String fileCheck, String type) { String path; byte[] byteArray = null; //将文件后缀替换成大写 String[] parts = fileName.split(StrUtil.SLASH); - fileName = parts[parts.length - 1].replaceAll(".cfg", GeneralConstant.CFG).replaceAll(".dat",GeneralConstant.DAT); + fileName = parts[parts.length - 1].replaceAll(".cfg", GeneralConstant.CFG).replaceAll(".dat", GeneralConstant.DAT); //处理文件层级 folder = createPath(folder); //解析二进制流成byte数组 - if (bool){ + if (bool) { byteArray = Base64.getDecoder().decode(stream); } else { byteArray = bytes; } //文件校验 - int crc = CRC32Utils.calculateCRC32(byteArray,byteArray.length,0xffffffff); + int crc = CRC32Utils.calculateCRC32(byteArray, byteArray.length, 0xffffffff); String hexString = String.format("%08X", crc); - if (!Objects.equals(hexString,fileCheck)) { + if (!Objects.equals(hexString, fileCheck)) { throw new BusinessException(AccessResponseEnum.FILE_CHECK_ERROR); } InputStream inputStream = new ByteArrayInputStream(byteArray); - if (Objects.equals(type,"download")) { - path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.DOWNLOAD_DIR + folder + StrUtil.SLASH,fileName); + if (Objects.equals(type, "download")) { + path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.DOWNLOAD_DIR + folder + StrUtil.SLASH, fileName); } else { - path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName); + path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH, fileName); } try { inputStream.close(); @@ -612,7 +946,7 @@ public class FileServiceImpl implements IFileService { String[] parts = fileName.split(StrUtil.SLASH); fileName = parts[parts.length - 1].split("\\.")[0]; boolean result = csWaveService.findCountByName(fileName); - if (result){ + if (result) { CsEventParam csEventParam = new CsEventParam(); csEventParam.setLineId(fileInfoDto.getLineId()); csEventParam.setDeviceId(fileInfoDto.getDeviceId()); @@ -666,7 +1000,7 @@ public class FileServiceImpl implements IFileService { } for (Map map : mapList) { for (Map.Entry entry : map.entrySet()) { - readMap.put(entry.getKey(),entry.getValue()); + readMap.put(entry.getKey(), entry.getValue()); } } return readMap; @@ -680,6 +1014,7 @@ public class FileServiceImpl implements IFileService { public MyObjectOutputStream(OutputStream out) throws IOException { super(out); } + @Override protected void writeStreamHeader() throws IOException { //重写读取头部信息方法:不写入头部信息 @@ -691,6 +1026,7 @@ public class FileServiceImpl implements IFileService { public MyObjectInputStream(InputStream in) throws IOException { super(in); } + @Override protected void readStreamHeader() throws IOException { //重写读取头部信息方法:什么也不做 @@ -718,7 +1054,7 @@ public class FileServiceImpl implements IFileService { File src = new File(lsPath); src.getParentFile().mkdirs(); InputStream is = Files.newInputStream(src.toPath()); - fileStorageUtil.uploadStreamSpecifyName(is, OssPath.DEV_MAKE_UP_PATH,newPath); + fileStorageUtil.uploadStreamSpecifyName(is, OssPath.DEV_MAKE_UP_PATH, newPath); inputStream.close(); fileOutputStream.close(); is.close(); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/test/java/com/njcn/zlevent/FileDownloadTest.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/test/java/com/njcn/zlevent/FileDownloadTest.java new file mode 100644 index 0000000..7809fd5 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/test/java/com/njcn/zlevent/FileDownloadTest.java @@ -0,0 +1,33 @@ +package com.njcn.zlevent; + +import org.springframework.boot.test.context.SpringBootTest; + +/** + * @author caozehui + * @data 2026-03-17 + */ +@SpringBootTest +public class FileDownloadTest { + +// @Resource +// private FileDownloadProducer fileDownloadProducer; +// +// @Test +// public void testSend() { +// BaseRequestDTO message = new BaseRequestDTO<>(); +// message.setGuid(IdUtil.simpleUUID()); +// message.setNode(1); +// message.setDevId("167456737637374567"); +// message.setFrontId("dhdfhdfghd2342"); +// // 设置 detail(重要!) +// BaseRequestDTO.Detail detail = new BaseRequestDTO.Detail<>(); +// detail.setType(8498); // 设置类型 +// FileDownloadRequestDTO fileDownloadRequestDTO = new FileDownloadRequestDTO(); +// fileDownloadRequestDTO.setName("/etc/vol1_stat.txt"); +// detail.setMsg(fileDownloadRequestDTO); // 设置消息体 +// message.setDetail(detail); +// SendResult send = fileDownloadProducer.send(message); +// System.out.println(JSON.toJSON(send)); +// } + +} diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java index b882757..03426ad 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java @@ -49,7 +49,7 @@ public class CldDevRunFlagConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + + @Override + protected void handleMessage(CommonBaseMessage message) throws Exception { + log.info("@@@@@处理Common信息"); + System.out.println(JSON.toJSON(message)); + String guid = message.getGuid(); + + // 将响应结果存入 Redis + redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_RESOPNSE + guid, JSON.toJSONString(message), 120L); + } + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + protected boolean filter(CommonBaseMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(CommonBaseMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(CommonBaseMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if (exceptionMsg.length() > 200) { + exceptionMsg = exceptionMsg.substring(0, 180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if (!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)) { + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(CommonBaseMessage message) { + super.dispatchMessage(message); + } +} diff --git a/iot-message/message-boot/src/main/resources/bootstrap.yml b/iot-message/message-boot/src/main/resources/bootstrap.yml index 4144174..58b0cdd 100644 --- a/iot-message/message-boot/src/main/resources/bootstrap.yml +++ b/iot-message/message-boot/src/main/resources/bootstrap.yml @@ -51,4 +51,4 @@ mybatis-plus: mqtt: - client-id: message-boot${random.value} \ No newline at end of file + client-id: message-boot${random.value} diff --git a/pom.xml b/pom.xml index 07c0b08..dfc396e 100644 --- a/pom.xml +++ b/pom.xml @@ -47,10 +47,10 @@ 192.168.1.103 - 192.168.1.103 + 192.168.2.124 192.168.1.103 ${middle.server.url}:18848 - + cb3a03dd-47f6-4a83-9408-b3182b3d0619