diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/RspDataDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/RspDataDto.java
index d88aa1a..73a192e 100644
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/RspDataDto.java
+++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/RspDataDto.java
@@ -59,7 +59,7 @@ public class RspDataDto {
}
/**
- * 软件信息
+ * 监测点信息
*/
@Data
public static class LdevInfo {
@@ -79,6 +79,9 @@ public class RspDataDto {
@SerializedName("CtRatio")
private Double ctRatio;
+
+ @SerializedName("Capacity_A")
+ private Double capacity;
}
}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
index 8f23e28..95b38da 100644
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
+++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
@@ -72,6 +72,10 @@ public class EventDto {
@ApiModelProperty("事件类型")
private String type;
+ @SerializedName("Code")
+ @ApiModelProperty("告警故障编码(一般显示为Hex)")
+ private String code;
+
@SerializedName("Parm")
private List param;
}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/CpuInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/CpuInfoDto.java
deleted file mode 100644
index 4352033..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/CpuInfoDto.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-import java.io.Serializable;
-
-;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2022/3/13 12:09
- */
-@Data
-public class CpuInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @ApiModelProperty("读写操作属性")
- private String opAttr;
-
- @SerializedName("CpuCore")
- @ApiModelProperty("CPU核心数")
- private Integer cpuCore;
-
- @SerializedName("CpuFreq")
- @ApiModelProperty("CPU主频(单位MHz)")
- private Float cpuFreq;
-
- @SerializedName("Arch")
- @ApiModelProperty("CPU架构")
- private String arch;
-
- @SerializedName("CpuLmt")
- @ApiModelProperty("CPU监控阈值(单位%)")
- private Float cpuLmt;
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DevInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DevInfoDto.java
deleted file mode 100644
index 21a24a4..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DevInfoDto.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-import java.io.Serializable;
-
-;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2022/3/13 12:08
- */
-@Data
-public class DevInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @ApiModelProperty("读写操作属性")
- private String opAttr;
-
- @SerializedName("DevType")
- @ApiModelProperty("设备型号")
- private String devType;
-
- @SerializedName("DevName ")
- @ApiModelProperty("设备名称")
- private String devName;
-
- @SerializedName("MsgInfo")
- @ApiModelProperty("设备厂商信息")
- private String msgInfo;
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DiskInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DiskInfoDto.java
deleted file mode 100644
index 115f79c..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/DiskInfoDto.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import java.io.Serializable;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2022/3/13 12:09
- */
-@Data
-public class DiskInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @ApiModelProperty("读写操作属性")
- private String opAttr;
-
- @SerializedName("DiskPhy")
- @ApiModelProperty("磁盘空间(单位MB)")
- private Float diskPhy;
-
- @SerializedName("DiskLmt")
- @ApiModelProperty("内存监控阈值(单位%)")
- private Float diskLmt;
-
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/LDevInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/LDevInfoDto.java
deleted file mode 100644
index 5665743..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/LDevInfoDto.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import org.apache.poi.hpsf.Decimal;
-
-import javax.validation.constraints.NotEmpty;
-import java.io.Serializable;
-import java.math.BigDecimal;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2023/5/4 10:04
- */
-@Data
-public class LDevInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- private String opAttr;
-
- @SerializedName("Cldid")
- @ApiModelProperty(value = "逻辑子设备Id")
- private Integer cldId;
-
- @SerializedName("VolGrade")
- @ApiModelProperty(value = "电压等级")
- private Double volGrade;
-
- @SerializedName("ConType")
- @ApiModelProperty(value = "接线方式")
- private Integer conType;
-
- @SerializedName("PtRatio")
- @ApiModelProperty(value = "PT变比")
- private BigDecimal ptRatio;
-
- @SerializedName("CtRatio")
- @ApiModelProperty(value = "CT变比")
- private BigDecimal ctRatio;
-
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/MemInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/MemInfoDto.java
deleted file mode 100644
index e3585fd..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/MemInfoDto.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import java.io.Serializable;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2022/3/13 12:09
- */
-@Data
-public class MemInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @ApiModelProperty("读写操作属性")
- private String opAttr;
-
- @SerializedName("MemPhy")
- @ApiModelProperty("物理内存(单位MB)")
- private Float memPhy;
-
- @SerializedName("MemLmt")
- @ApiModelProperty("内存监控阈值(单位%)")
- private Float memLmt;
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/PrjInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/PrjInfoDto.java
deleted file mode 100644
index fd04d83..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/PrjInfoDto.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import lombok.Data;
-
-import javax.validation.constraints.NotEmpty;
-import java.io.Serializable;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2023/5/16 15:30
- */
-@Data
-public class PrjInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @NotEmpty(message = "读写操作属性不可为空")
- private String opAttr;
-
- @SerializedName("Province")
- @NotEmpty(message = "安装区域省,不可为空")
- private String province;
-
- @SerializedName("City")
- @NotEmpty(message = "安装区域市,不可为空")
- private String city;
-
- @SerializedName("County")
- @NotEmpty(message = "安装区域县或区,不可为空")
- private String county;
-
- @SerializedName("Address")
- @NotEmpty(message = "安装地址(厂区或变电站),不可为空")
- private String address;
-
- @SerializedName("Position")
- @NotEmpty(message = "安装位置,不可为空")
- private String position;
-
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/SoftInfoDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/SoftInfoDto.java
deleted file mode 100644
index 84aa87e..0000000
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/devModel/SoftInfoDto.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.njcn.access.pojo.dto.devModel;
-
-import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.Null;
-import java.io.Serializable;
-
-/**
- * 类的介绍:
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2022/3/13 12:09
- */
-@Data
-public class SoftInfoDto implements Serializable {
-
- @SerializedName("OpAttr")
- @NotEmpty(message = "读写操作属性不可为空")
- private String opAttr;
-
- @SerializedName("OsName")
- @NotEmpty(message = "操作系统名称,裸机系统填Null,不可为空")
- private String osName;
-
- @SerializedName("OsVersion")
- @NotEmpty(message = "操作系统版本,裸机系统填Null,不可为空")
- private String osVersion;
-
- @SerializedName("AppVersion")
- @NotEmpty(message = "应用程序版本号,不可为空")
- private String appVersion;
-
- @SerializedName("AppDate")
- @NotEmpty(message = "应用程序发布日期,不可为空")
- private String appDate;
-
- @SerializedName("AppCheck")
- @NotEmpty(message = "应用程序校验码,不可为空")
- private String appCheck;
-
- @SerializedName("Softupdate")
- @NotEmpty(message = "是否支持远程升级程序,不可为空")
- private String softUpdate;
-
-}
diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java
index 1274fac..2b0d3ec 100644
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java
+++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileDto.java
@@ -50,6 +50,21 @@ public class FileDto implements Serializable {
@SerializedName("Name")
private String name;
+ @SerializedName("FrameCurr")
+ @ApiModelProperty("当前帧")
+ private Integer frameCurr;
+
+ @SerializedName("FrameTotal")
+ @ApiModelProperty("总帧数")
+ private Integer frameTotal;
+
+ @SerializedName("FrameLen")
+ @ApiModelProperty("单帧大小")
+ private Integer frameLen;
+
+ @SerializedName("Offset")
+ private Integer offset;
+
}
@Data
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java
index ff265d3..e801344 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java
@@ -1,5 +1,7 @@
package com.njcn.access.handler;
+import cn.hutool.core.collection.CollectionUtil;
+import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.excel.util.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@@ -17,18 +19,17 @@ import com.njcn.access.pojo.dto.file.FileDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.pojo.po.CsLineModel;
+import com.njcn.access.pojo.po.CsSoftInfoPO;
import com.njcn.access.pojo.po.CsTopic;
-import com.njcn.access.service.ICsDeviceOnlineLogsService;
-import com.njcn.access.service.ICsEquipmentDeliveryService;
-import com.njcn.access.service.ICsLineModelService;
-import com.njcn.access.service.ICsTopicService;
+import com.njcn.access.service.*;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException;
-import com.njcn.csdevice.api.CsLogsFeignClient;
-import com.njcn.csdevice.api.DataSetFeignClient;
-import com.njcn.csdevice.api.DevModelFeignClient;
+import com.njcn.csdevice.api.*;
+import com.njcn.csdevice.pojo.param.CsLineParam;
import com.njcn.csdevice.pojo.po.CsDataSet;
+import com.njcn.csdevice.pojo.po.CsDevCapacityPO;
import com.njcn.csdevice.pojo.po.CsDevModelPO;
+import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.mq.message.AppFileMessage;
@@ -42,19 +43,19 @@ import com.njcn.web.utils.RequestUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@@ -93,6 +94,14 @@ public class MqttMessageHandler {
private final ICsDeviceOnlineLogsService onlineLogsService;
+ private final ICsSoftInfoService csSoftInfoService;
+
+ private final CsLineFeignClient csLineFeignClient;
+
+ private final DevCapacityFeignClient devCapacityFeignClient;
+
+ private final EquipmentFeignClient equipmentFeignClient;
+
@Autowired
Validator validator;
@@ -292,6 +301,7 @@ public class MqttMessageHandler {
* @param payload
*/
@MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1)
+ @Transactional(rollbackFor = Exception.class)
public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
@@ -306,59 +316,122 @@ public class MqttMessageHandler {
//业务处理
Gson gson = new Gson();
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
- switch (res.getType()){
- case 4613:
- logDto.setOperate(nDid + "设备接入");
- log.info("收到接入应答响应--->" + nDid);
- if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
- log.info("接入应答成功--->" + nDid);
- //修改装置状态
- csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode());
- csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
- //记录设备上线
- CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
- CsDeviceOnlineLogs csDeviceOnlineLogs = new CsDeviceOnlineLogs();
- if(Objects.isNull(record)) {
- csDeviceOnlineLogs.setNdid(nDid);
- csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now());
- onlineLogsService.save(csDeviceOnlineLogs);
- } else {
- LocalDateTime time = record.getOfflineTime();
- if (!Objects.isNull(time)){
+ if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
+ switch (res.getType()){
+ case 4613:
+ logDto.setOperate(nDid + "设备接入");
+ log.info("{}收到接入应答响应,应答code {}",nDid,res.getCode());
+ if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
+ int mid = 1;
+ //修改装置状态
+ csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode());
+ csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
+ //记录设备上线
+ CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
+ CsDeviceOnlineLogs csDeviceOnlineLogs = new CsDeviceOnlineLogs();
+ if(Objects.isNull(record)) {
csDeviceOnlineLogs.setNdid(nDid);
csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now());
onlineLogsService.save(csDeviceOnlineLogs);
+ } else {
+ LocalDateTime time = record.getOfflineTime();
+ if (!Objects.isNull(time)){
+ csDeviceOnlineLogs.setNdid(nDid);
+ csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now());
+ onlineLogsService.save(csDeviceOnlineLogs);
+ }
}
+ //询问设备软件信息
+ askDevData(nDid,version,1,mid);
+ mid++;
+ //询问设备容量信息
+ askDevData(nDid,version,2,mid);
+ mid++;
+ //询问监测点pt/ct信息
+ askDevData(nDid,version,3,mid);
+ } else {
+ log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
+ logDto.setResult(0);
+ logDto.setFailReason(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
+ csLogsFeignClient.addUserLog(logDto);
+ throw new BusinessException(AccessResponseEnum.ACCESS_RESPONSE_ERROR);
}
- } else {
- log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
- logDto.setResult(0);
- logDto.setFailReason(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
- csLogsFeignClient.addUserLog(logDto);
- throw new BusinessException(AccessResponseEnum.ACCESS_RESPONSE_ERROR);
- }
- break;
- case 4614:
- logDto.setOperate(nDid + "设备数据应答");
- log.info("设备数据应答--->" + nDid);
- RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class);
- switch (rspDataDto.getDataType()){
- case 1:
- RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class);
- redisUtil.saveByKeyWithExpire(AppRedisKey.SOFTINFO+nDid,softInfo,600L);
- break;
- case 2:
- List ldevInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
- redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_DATA+nDid,ldevInfo,600L);
- break;
- default:
- break;
- }
- break;
- default:
- break;
+ break;
+ case 4614:
+ log.info("设备数据应答--->" + nDid);
+ RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class);
+ switch (rspDataDto.getDataType()){
+ case 1:
+ logDto.setOperate(nDid + "更新设备软件信息");
+ RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class);
+ //记录设备软件信息
+ CsSoftInfoPO csSoftInfoPo = new CsSoftInfoPO();
+ BeanUtils.copyProperties(softInfo,csSoftInfoPo);
+ try {
+ csSoftInfoPo.setAppDate(new SimpleDateFormat("yyyy-MM-dd").parse(softInfo.getAppDate()));
+ csSoftInfoService.save(csSoftInfoPo);
+ //更新设备软件id 先看是否存在软件信息,删除 然后在录入
+ CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
+ String soft = po.getSoftinfoId();
+ if (StringUtil.isNotBlank(soft)){
+ csSoftInfoService.removeById(soft);
+ }
+ equipmentFeignClient.updateSoftInfo(nDid,csSoftInfoPo.getId());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ break;
+ case 2:
+ List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
+ if (CollectionUtil.isNotEmpty(devInfo)){
+ if (Objects.equals(res.getDid(),1)){
+ logDto.setOperate(nDid + "更新治理监测点信息和设备容量");
+ List list = new ArrayList<>();
+ devInfo.forEach(item->{
+ //1.更新治理监测点信息
+ if (Objects.equals(item.getClDid(),0)){
+ CsLineParam csLineParam = new CsLineParam();
+ csLineParam.setLineId(nDid.concat("0"));
+ csLineParam.setVolGrade(item.getVolGrade());
+ csLineParam.setPtRatio(item.getPtRatio());
+ csLineParam.setCtRatio(item.getCtRatio());
+ csLineParam.setConType(item.getConType());
+ csLineFeignClient.updateLine(csLineParam);
+ }
+ //2.录入各个模块设备容量
+ CsDevCapacityPO csDevCapacity = new CsDevCapacityPO();
+ csDevCapacity.setLineId(nDid.concat("0"));
+ csDevCapacity.setCldid(item.getClDid());
+ csDevCapacity.setCapacity(item.getCapacity());
+ list.add(csDevCapacity);
+ });
+ devCapacityFeignClient.addList(list);
+ //3.更新设备模块个数
+ equipmentFeignClient.updateModuleNumber(nDid,(devInfo.size()-1));
+ } else if (Objects.equals(res.getDid(),2)) {
+ logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息");
+ //1.更新电网侧、负载侧监测点相关信息
+ devInfo.forEach(item->{
+ CsLineParam csLineParam = new CsLineParam();
+ csLineParam.setLineId(nDid.concat(item.getClDid().toString()));
+ csLineParam.setVolGrade(item.getVolGrade());
+ csLineParam.setPtRatio(item.getPtRatio());
+ csLineParam.setCtRatio(item.getCtRatio());
+ csLineParam.setConType(item.getConType());
+ csLineFeignClient.updateLine(csLineParam);
+ });
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+ csLogsFeignClient.addUserLog(logDto);
}
- csLogsFeignClient.addUserLog(logDto);
}
@@ -481,11 +554,59 @@ public class MqttMessageHandler {
break;
case 4658:
log.info("获取文件流信息");
- redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName() + appFileMessage.getMid()), appFileMessage.getMid(),3600L);
- appFileStreamMessageTemplate.sendMember(appFileMessage);
+ if (Objects.equals(fileDto.getCode(),AccessEnum.SUCCESS.getCode())){
+ appFileStreamMessageTemplate.sendMember(appFileMessage);
+ }
+ //todo 处理文件信息,先缓存起来,后期在询问
+ else if (Objects.equals(fileDto.getCode(),AccessEnum.REFUSE_WAIT.getCode())) {
+ log.info("需要缓存请求的文件信息");
+ }
break;
default:
break;
}
}
+
+
+ /**
+ * type含义
+ * 1:询问设备软件信息
+ * 2:模块信息
+ * 3:监测点pt/ct信息
+ */
+ public void askDevData(String nDid,String version,Integer type,Integer mid){
+ ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
+ reqAndResParam.setMid(mid);
+ reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
+ reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_6.getCode()));
+ reqAndResParam.setExpire(1);
+ AskDataDto askDataDto = new AskDataDto();
+ askDataDto.setDataAttr(0);
+ askDataDto.setOperate(1);
+ askDataDto.setStartTime(-1);
+ askDataDto.setEndTime(-1);
+ switch (type) {
+ case 1:
+ reqAndResParam.setDid(2);
+ askDataDto.setCldid(0);
+ askDataDto.setDataType(1);
+ break;
+ case 2:
+ reqAndResParam.setDid(1);
+ askDataDto.setCldid(-1);
+ askDataDto.setDataType(2);
+ break;
+ case 3:
+ reqAndResParam.setDid(2);
+ askDataDto.setCldid(-1);
+ askDataDto.setDataType(2);
+ break;
+ default:
+ break;
+ }
+ reqAndResParam.setMsg(askDataDto);
+ publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
+ }
+
+
}
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
index fc726b4..716417d 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
@@ -1,47 +1,47 @@
-package com.njcn.access.runner;
-
-import com.njcn.access.service.ICsEquipmentDeliveryService;
-import com.njcn.access.service.ICsTopicService;
-import com.njcn.access.service.impl.CsDeviceServiceImpl;
-import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
- *
- * @author xuyang
- * @version 1.0.0
- * @createTime 2023/8/28 13:57
- */
-@Component
-@Slf4j
-public class AccessApplicationRunner implements ApplicationRunner {
-
- @Resource
- private CsDeviceServiceImpl csDeviceService;
-
- @Resource
- private ICsTopicService csTopicService;
-
- @Resource
- private ICsEquipmentDeliveryService csEquipmentDeliveryService;
-
- @Override
- public void run(ApplicationArguments args){
- List list = csEquipmentDeliveryService.getAll();
- list.forEach(item->{
- String version = csTopicService.getVersion(item.getNdid());
- if (!Objects.isNull(version)){
- csDeviceService.devAccess(item.getNdid(),version);
- }
- });
- }
-
-}
+//package com.njcn.access.runner;
+//
+//import com.njcn.access.service.ICsEquipmentDeliveryService;
+//import com.njcn.access.service.ICsTopicService;
+//import com.njcn.access.service.impl.CsDeviceServiceImpl;
+//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.boot.ApplicationArguments;
+//import org.springframework.boot.ApplicationRunner;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.Resource;
+//import java.util.List;
+//import java.util.Objects;
+//
+///**
+// * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
+// *
+// * @author xuyang
+// * @version 1.0.0
+// * @createTime 2023/8/28 13:57
+// */
+//@Component
+//@Slf4j
+//public class AccessApplicationRunner implements ApplicationRunner {
+//
+// @Resource
+// private CsDeviceServiceImpl csDeviceService;
+//
+// @Resource
+// private ICsTopicService csTopicService;
+//
+// @Resource
+// private ICsEquipmentDeliveryService csEquipmentDeliveryService;
+//
+// @Override
+// public void run(ApplicationArguments args){
+// List list = csEquipmentDeliveryService.getAll();
+// list.forEach(item->{
+// String version = csTopicService.getVersion(item.getNdid());
+// if (!Objects.isNull(version)){
+// csDeviceService.devAccess(item.getNdid(),version);
+// }
+// });
+// }
+//
+//}
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java
index 8b23d13..9a50303 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java
@@ -217,7 +217,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csLedgerParam.setSort(0);
csLedgerService.addLedgerTree(csLedgerParam);
List modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + devAccessParam.getNDid()));
- Integer clDid = null, moduleNumber = null;
//2.新增装置-模板关系、获取电能质量的逻辑设备id
for (CsModelDto item : modelId) {
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
@@ -225,36 +224,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csDevModelRelationAddParm.setModelId(item.getModelId());
csDevModelRelationAddParm.setDid(item.getDid());
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
- if (Objects.equals(item.getType(),1)){
- clDid = item.getDid();
- }
- if (Objects.equals(item.getType(),0)){
- moduleNumber = item.getModuleNumber();
- }
- }
- if (Objects.isNull(clDid)){
- logDto.setResult(0);
- logDto.setFailReason(AccessResponseEnum.CLDID_IS_NULL.getMessage());
- csLogsFeignClient.addUserLog(logDto);
- throw new BusinessException(AccessResponseEnum.CLDID_IS_NULL);
- }
- if (Objects.isNull(moduleNumber)){
- logDto.setResult(0);
- logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage());
- csLogsFeignClient.addUserLog(logDto);
- throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL);
- }
- askDevData(devAccessParam.getNDid(),AccessEnum.L_DEV_INFO.getCode(),version,clDid);
- List list = new ArrayList<>();
- //等待mqtt数据
- Thread.sleep(500);
- String key = AppRedisKey.LINE_DATA + devAccessParam.getNDid();
- list = objectToList2(redisUtil.getObjectByKey(key));
- if (CollectionUtils.isEmpty(list)){
- logDto.setResult(0);
- logDto.setFailReason(AccessResponseEnum.LDEVINFO_IS_NULL.getMessage());
- csLogsFeignClient.addUserLog(logDto);
- throw new BusinessException(AccessResponseEnum.LDEVINFO_IS_NULL);
}
//3.监测点表录入关系
for (DevAccessParam.LineParam item : devAccessParam.getList()) {
@@ -266,24 +235,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
po.setPosition(item.getPosition());
po.setClDid(0);
if (Objects.equals(DicDataEnum.GRID_SIDE.getCode(),location)){
- RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),1)).findFirst().orElse(null);
po.setLineId(devAccessParam.getNDid() + "1");
param.setId(devAccessParam.getNDid() + "1");
appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "1");
- po.setVolGrade(po1.getVolGrade());
- po.setPtRatio(po1.getPtRatio());
- po.setCtRatio(po1.getCtRatio());
- po.setConType(po1.getConType());
po.setClDid(1);
} else if (Objects.equals(DicDataEnum.LOAD_SIDE.getCode(),location)){
- RspDataDto.LdevInfo po1 = list.stream().filter(s -> Objects.equals(s.getClDid(),2)).findFirst().orElse(null);
po.setLineId(devAccessParam.getNDid() + "2");
param.setId(devAccessParam.getNDid() + "2");
appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "2");
- po.setVolGrade(po1.getVolGrade());
- po.setPtRatio(po1.getPtRatio());
- po.setCtRatio(po1.getCtRatio());
- po.setConType(po1.getConType());
po.setClDid(2);
} else {
po.setLineId(devAccessParam.getNDid() + "0");
@@ -321,34 +280,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserService.saveBatch(Collections.singletonList(po));
- //6.录入软件信息 SoftInfo
- askDevData(devAccessParam.getNDid(),AccessEnum.SOFT_INFO.getCode(),version,0);
- //等待mqtt数据
- Thread.sleep(500);
- String key2 = AppRedisKey.SOFTINFO + devAccessParam.getNDid();
- RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(redisUtil.getObjectByKey(key2)), RspDataDto.SoftInfo.class);
- if (Objects.isNull(softInfo)){
- logDto.setResult(0);
- logDto.setFailReason(AccessResponseEnum.SOFTINFO_IS_NULL.getMessage());
- csLogsFeignClient.addUserLog(logDto);
- throw new BusinessException(AccessResponseEnum.SOFTINFO_IS_NULL);
- }
- //记录设备软件信息
- CsSoftInfoPO csSoftInfoPo = new CsSoftInfoPO();
- BeanUtils.copyProperties(softInfo,csSoftInfoPo);
- csSoftInfoPo.setAppDate(new SimpleDateFormat("yyyy-MM-dd").parse(softInfo.getAppDate()));
- csSoftInfoService.save(csSoftInfoPo);
- //更新设备表软件信息
- csEquipmentDeliveryService.updateSoftInfoBynDid(devAccessParam.getNDid(),csSoftInfoPo.getId(),moduleNumber);
- //修改装置状态
+ //6.修改装置状态
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
//7.发起自动接入请求
devAccess(devAccessParam.getNDid(),version);
//8.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid());
redisUtil.delete(AppRedisKey.LINE + devAccessParam.getNDid());
- redisUtil.delete(AppRedisKey.LINE_DATA + devAccessParam.getNDid());
- redisUtil.delete(AppRedisKey.SOFTINFO + devAccessParam.getNDid());
//存储日志
csLogsFeignClient.addUserLog(logDto);
//存储设备调试日志表
@@ -437,33 +375,4 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
return urlList;
}
-
- /**
- * 平台向设备发送数据命令
- */
- public void askDevData(String nDid,Integer dataType,String version,Integer did){
- ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
- reqAndResParam.setMid(1);
- reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
- reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_6.getCode()));
- reqAndResParam.setExpire(-1);
- AskDataDto askDataDto = new AskDataDto();
- askDataDto.setDataAttr(0);
- askDataDto.setOperate(1);
- askDataDto.setStartTime(-1);
- askDataDto.setEndTime(-1);
- if (Objects.equals(dataType,AccessEnum.SOFT_INFO.getCode())){
- askDataDto.setCldid(0);
- reqAndResParam.setDid(0);
- askDataDto.setDataType(1);
- } else if (Objects.equals(dataType,AccessEnum.L_DEV_INFO.getCode())){
- askDataDto.setCldid(-1);
- reqAndResParam.setDid(did);
- askDataDto.setDataType(2);
- }
- reqAndResParam.setMsg(askDataDto);
- logger.info("询问设备软件信息(监测点信息):" + new Gson().toJson(reqAndResParam));
- publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
- }
-
}
diff --git a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java
index 3e0212c..c57eca2 100644
--- a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java
+++ b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java
@@ -12,8 +12,16 @@ import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.mqtt.MqttClientDto;
+import com.njcn.access.service.ICsEquipmentDeliveryService;
+import com.njcn.access.service.impl.CsDeviceServiceImpl;
+import com.njcn.common.pojo.enums.response.CommonResponseEnum;
+import com.njcn.common.pojo.exception.BusinessException;
+import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.pojo.po.CsLinePO;
+import com.njcn.oss.utils.FileStorageUtil;
+import com.njcn.redis.utils.RedisUtil;
import io.lettuce.core.protocol.CompleteableCommand;
+import lombok.extern.slf4j.Slf4j;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -26,15 +34,15 @@ import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
+import java.util.zip.CRC32;
/**
* Unit test for simple App.
@@ -42,6 +50,7 @@ import java.util.stream.Collectors;
@RunWith(SpringRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = AccessBootApplication.class)
+@Slf4j
public class AppTest
{
/**
@@ -56,6 +65,35 @@ public class AppTest
@Resource
private MqttPublisher publisher;
+ @Resource
+ private FileStorageUtil fileStorageUtil;
+
+ @Resource
+ private ICsEquipmentDeliveryService csEquipmentDeliveryService;
+
+ @Resource
+ private CsDeviceServiceImpl csDeviceService;
+
+ @Resource
+ private RedisUtil redisUtil;
+
+ @Test
+ public void lossTest() {
+ String nDid = "0008C0A801C8";
+ Integer status = csEquipmentDeliveryService.queryEquipmentByndid(nDid).getRunStatus();
+ if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.ONLINE.getCode())){
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
+ ScheduledFuture> runnableFuture = executor.scheduleAtFixedRate(() -> {
+ log.info("定时发送接入请求...");
+ Integer status2 = csEquipmentDeliveryService.queryEquipmentByndid(nDid).getRunStatus();
+ if (Objects.equals(status2,AccessEnum.OFFLINE.getCode())){
+ throw new BusinessException(CommonResponseEnum.SUCCESS);
+ }
+ }, 1, 5, TimeUnit.SECONDS);
+ }
+ }
+
+
@Test
public void test() {
@@ -69,49 +107,43 @@ public class AppTest
// String key = String.valueOf(IdUtil.getSnowflake().nextId());
// System.out.println("key==:" + key);
- List csLinePoList = new ArrayList<>();
- CsLinePO po1 = new CsLinePO();
- po1.setPosition("1");
- CsLinePO po2= new CsLinePO();
- po2.setPosition("2");
- CsLinePO po3= new CsLinePO();
- po3.setPosition("3");
- CsLinePO po4= new CsLinePO();
- po4.setPosition("1");
+// List csLinePoList = new ArrayList<>();
+// CsLinePO po1 = new CsLinePO();
+// po1.setPosition("1");
+// CsLinePO po2= new CsLinePO();
+// po2.setPosition("2");
+// CsLinePO po3= new CsLinePO();
+// po3.setPosition("3");
+// CsLinePO po4= new CsLinePO();
+// po4.setPosition("1");
+//
+// csLinePoList.add(po1);
+// csLinePoList.add(po2);
+// csLinePoList.add(po3);
+// csLinePoList.add(po4);
+// List l = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
+// System.out.println("l===:" + l);
+// List lineList = l.stream().filter(e-> Collections.frequency(l,e) > 1).distinct().collect(Collectors.toList());
+// System.out.println("lineList==:" + lineList);
- csLinePoList.add(po1);
- csLinePoList.add(po2);
- csLinePoList.add(po3);
- csLinePoList.add(po4);
- List l = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
- System.out.println("l===:" + l);
- List lineList = l.stream().filter(e-> Collections.frequency(l,e) > 1).distinct().collect(Collectors.toList());
- System.out.println("lineList==:" + lineList);
- }
-
-// public static void main(String[] args) {
+// String text = "TkosUFEsMTk5OQ0KNiw2QSwwRA0KMSxBz+C159G5LEEstefRuSxWLDAuMDYyMjU2LDAuMDAwMDAwLDAuMDAwMDAwLC0zMjc2NywzMjc2NywzODAsMzgwLFMNCjIsQs/gtefRuSxCLLXn0bksViwwLjA2MjI1NiwwLjAwMDAwMCwwLjAwMDAwMCwtMzI3NjcsMzI3NjcsMzgwLDM4MCxTDQozLEPP4LXn0bksQyy159G5LFYsMC4wNjIyNTYsMC4wMDAwMDAsMC4wMDAwMDAsLTMyNzY3LDMyNzY3LDM4MCwzODAsUw0KNCxBz+C158H3LEEstefB9yxBLDAuMDE1MjU5LDAuMDAwMDAwLDAuMDAwMDAwLC0zMjc2NywzMjc2NywyMDAsNSxTDQo1LELP4LXnwfcsQiy158H3LEEsMC4wMTUyNTksMC4wMDAwMDAsMC4wMDAwMDAsLTMyNzY3LDMyNzY3LDIwMCw1LFMNCjYsQ8/gtefB9yxDLLXnwfcsQSwwLjAxNTI1OSwwLjAwMDAwMCwwLjAwMDAwMCwtMzI3NjcsMzI3NjcsMjAwLDUsUw0KNTANCjENCjEyODAwLDcxNjgNCjA1LzA5LzIwMjMsMTU6NTQ6MDIuMTM2MDAwDQowNS8wOS8yMDIzLDE1OjU0OjAyLjIzNjAwMA0KQklOQVJZDQoxDQo=";
+// byte[] byteArray = Base64.getDecoder().decode(text);
+// InputStream inputStream = new ByteArrayInputStream(byteArray);
+// fileStorageUtil.uploadStreamSpecifyName(inputStream, "configuration/","xuyang.cfg");
// try {
-// String username = "ac760c62395cecec";
-// String password = "k0vGfe5xOE2Bl4DCF73uahcknvcwoKOEDPnOkMvuSBB";
-//
-// OkHttpClient client = new OkHttpClient();
-//
-// Request request = new Request.Builder()
-// .url("http://192.168.1.18:18083/api/v5/clients/access-boot123456")
-// .header("Content-Type", "application/json")
-// .header("Authorization", Credentials.basic(username, password))
-// .build();
-//
-// Response response = client.newCall(request).execute();
-// response.body();
-// String res = Objects.requireNonNull(response.body()).string();
-//
-// Gson gson = new Gson();
-// MqttClientDto mqttClientDto = gson.fromJson(res, new TypeToken(){}.getType());
-// System.out.println("mqttClientDto==:" + mqttClientDto.isConnected());
-//
+// inputStream.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
-// }
+
+ // 要计算CRC32的数据
+ String data = "Hello, World!";
+ CRC32 crc32 = new CRC32();
+ crc32.update(data.getBytes());
+ long crc32Value = crc32.getValue();
+ // 将CRC32校验值转换为16进制字符串
+ String crc32Str = String.format("%08X", crc32Value);
+ System.out.println("CRC32校验值为: " + crc32Str);
+
+ }
}
diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java
index f7ffc9a..07f74d9 100644
--- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java
+++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java
@@ -6,9 +6,13 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
+import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.param.DataArrayParam;
import com.njcn.csdevice.pojo.po.CsDataArray;
+import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
+import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
+import com.njcn.cswarn.api.CsEquipmentAlarmFeignClient;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppAutoDataMessage;
@@ -56,6 +60,8 @@ public class StatServiceImpl implements IStatService {
private final RedisUtil redisUtil;
+ private final EquipmentFeignClient equipmentFeignClient;
+
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppAutoDataMessage appAutoDataMessage) {
@@ -86,6 +92,8 @@ public class StatServiceImpl implements IStatService {
if(Objects.isNull(object2)) {
saveData();
}
+ //获取当前设备信息
+ CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appAutoDataMessage.getId()).getData();
if (CollectionUtil.isNotEmpty(list)){
List recordList = new ArrayList<>();
for (AppAutoDataMessage.DataArray item : list) {
@@ -116,7 +124,7 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
- List result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod());
+ List result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess());
recordList.addAll(result);
}
if (CollectionUtil.isNotEmpty(recordList)){
@@ -183,7 +191,7 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据组装
*/
- public List assembleData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) {
+ public List assembleData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process) {
List records = new ArrayList();
//解码
List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
@@ -206,6 +214,7 @@ public class StatServiceImpl implements IStatService {
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
+ tags.put(InfluxDBTableConstant.PROCESS,process.toString());
Map fields = new HashMap<>();
fields.put(dataArrayList.get(i).getName(),floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
diff --git a/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java b/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java
index f255d81..f945b5a 100644
--- a/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java
+++ b/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java
@@ -1,29 +1,33 @@
package com.njcn;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import com.alibaba.nacos.shaded.com.google.gson.Gson;
-import com.njcn.common.utils.PubUtils;
-import com.njcn.csdevice.pojo.po.CsDataArray;
-import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
-import com.njcn.influxdb.utils.InfluxDbUtils;
-import com.njcn.mq.message.AppAutoDataMessage;
-import com.njcn.redis.pojo.enums.AppRedisKey;
+import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.redis.utils.RedisUtil;
+import com.njcn.stat.StatBootApplication;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Test;
-import org.springframework.web.bind.annotation.ResponseBody;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
/**
* Unit test for simple App.
*/
+@RunWith(SpringRunner.class)
+@WebAppConfiguration
+@SpringBootTest(classes = StatBootApplication.class)
public class AppTest
{
@@ -33,6 +37,7 @@ public class AppTest
@Resource
private InfluxDbUtils influxDbUtils;
+
/**
* Rigorous Test :-)
*/
@@ -42,4 +47,39 @@ public class AppTest
assertTrue( true );
}
+ @Test
+ public void addRedis() {
+ Map tags1 = new HashMap<>();
+ tags1.put("LineID", "4");
+ tags1.put("Phasic_Type", "A");
+ Map fields1 = new HashMap<>();
+ fields1.put("RMS", 4.1111);
+ fields1.put("RMS_AB", 4.1111);
+ fields1.put("RMS_BC", 4.1111);
+ fields1.put("RMS_CA", 4.1111);
+ Map tags2 = new HashMap<>();
+ tags2.put("LineID", "5");
+ tags2.put("Phasic_Type", "A");
+ Map fields2 = new HashMap<>();
+ fields2.put("RMS", 5.1111);
+ fields2.put("RMS_AB", 5.1111);
+ fields2.put("RMS_BC", 5.1111);
+ // 一条记录值。(注意:生产环境不要用System.currentTimeMillis(),因为数据量大会产生重复时间戳,导致数据丢失,要用数据自己的时间戳,这里只做演示)
+ Point point1 = influxDbUtils.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags1, fields1);
+ Point point2 = influxDbUtils.pointBuilder("Data_v", System.currentTimeMillis(), TimeUnit.MILLISECONDS, tags2, fields2);
+// BatchPoints batchPoints1 = BatchPoints.database("Data_v").tag("LineID", "4").tag("Phasic_Type","A").retentionPolicy("").consistency(ConsistencyLevel.ALL).precision(TimeUnit.MILLISECONDS).build();
+ BatchPoints batchPoints1 = BatchPoints.database("test").tag("LineID", "4").tag("Phasic_Type", "A").retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
+ batchPoints1.point(point1);
+ BatchPoints batchPoints2 = BatchPoints.database("test").tag("LineID", "5").tag("Phasic_Type", "A").retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
+ // 将两条记录添加到batchPoints中
+ batchPoints2.point(point2);
+ // 将不同的batchPoints序列化后,一次性写入数据库,提高写入速度
+ List records = new ArrayList();
+ records.add(batchPoints1.lineProtocol());
+ records.add(batchPoints2.lineProtocol());
+ // 将两条数据批量插入到数据库中
+ influxDbUtils.batchInsert("test", "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
+
+ }
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java
index b6c6c04..0c66a94 100644
--- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileInfoDto.java
@@ -23,9 +23,6 @@ public class FileInfoDto {
private String fileChkType;
- @ApiModelProperty("报文数量")
- private Integer number;
-
private String startTime;
private String endTime;
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java
index 69af185..3596013 100644
--- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/FileStreamDto.java
@@ -3,7 +3,8 @@ package com.njcn.zlevent.pojo.dto;
import lombok.Data;
import java.io.Serializable;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
/**
* 类的介绍:
@@ -15,6 +16,12 @@ import java.util.Map;
@Data
public class FileStreamDto implements Serializable {
- private Map map;
+ private Integer total;
+
+ private String nDid;
+
+ private Integer frameLen;
+
+ private List list;
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java
index 5667c29..97edbe1 100644
--- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsWave.java
@@ -73,5 +73,10 @@ public class CsWave {
*/
private LocalDateTime createTime;
+ /**
+ * 文件获取状态 (0:获取失败 1:获取成功)
+ */
+ private Integer status;
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java
index 4e5bf30..714623e 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java
@@ -1,13 +1,23 @@
package com.njcn.zlevent.listener;
import cn.hutool.core.collection.CollectionUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import com.github.tocrhz.mqtt.publisher.MqttPublisher;
+import com.njcn.access.api.CsTopicFeignClient;
+import com.njcn.access.enums.AccessEnum;
+import com.njcn.access.enums.TypeEnum;
+import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.pojo.dto.EpdDTO;
+import com.njcn.zlevent.pojo.dto.FileStreamDto;
+import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import lombok.extern.slf4j.Slf4j;
+import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.Message;
@@ -16,9 +26,11 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.IntStream;
/**
* @author hongawen
@@ -35,6 +47,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Resource
private RedisUtil redisUtil;
+ @Resource
+ private CsTopicFeignClient csTopicFeignClient;
+
+ @Resource
+ private MqttPublisher publisher;
+
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@@ -63,5 +81,45 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
});
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
}
+ //文件流规定时间未收全,请求缺失数据,需要一帧一帧询问
+ else if (expiredKey.startsWith(AppRedisKey.FILE_PART_TIME)) {
+ List missingList = new ArrayList<>();
+ String fileName = expiredKey.split(AppRedisKey.FILE_PART_TIME)[1];
+ Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
+ FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
+ int start = 1;
+ int end = dto.getTotal();
+ IntStream.rangeClosed(start, end)
+ .filter(i -> !dto.getList().contains(i))
+ .forEach(missingNumber -> {
+ log.info("缺失的数字:{}",missingNumber);
+ missingList.add(missingNumber);
+ });
+ redisUtil.saveByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName), missingList);
+ Integer offset = (missingList.get(0) - 1) * dto.getFrameLen();
+ askMissingFileStream(dto.getNDid(),missingList.get(0),fileName,offset,dto.getFrameLen());
+ }
}
+
+
+ public void askMissingFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) {
+ String version = csTopicFeignClient.find(nDid).getData();
+ ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
+ reqAndResParam.setMid(mid);
+ reqAndResParam.setDid(0);
+ reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
+ reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode()));
+ reqAndResParam.setExpire(-1);
+ String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}";
+ JSONObject jsonObject = JSONObject.fromObject(json);
+ reqAndResParam.setMsg(jsonObject);
+ publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
+ log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam));
+ }
+
+
+
+
+
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java
index 60023eb..e19b497 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsWaveService.java
@@ -20,4 +20,10 @@ public interface ICsWaveService extends IService {
*/
int findCountByName(String fileName);
+ /**
+ * 修改文件状态
+ * @param fileName
+ */
+ void updateCsWave(String fileName);
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
index cf54967..10841c6 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
@@ -1,6 +1,7 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.zlevent.mapper.CsWaveMapper;
import com.njcn.zlevent.pojo.po.CsWave;
@@ -21,7 +22,14 @@ public class CsWaveServiceImpl extends ServiceImpl impleme
@Override
public int findCountByName(String fileName) {
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
- lambdaQueryWrapper.like(CsWave::getRcdName,fileName);
+ lambdaQueryWrapper.like(CsWave::getRcdName,fileName).eq(CsWave::getStatus,1);
return this.baseMapper.selectCount(lambdaQueryWrapper);
}
+
+ @Override
+ public void updateCsWave(String fileName) {
+ LambdaUpdateWrapper lambdaQueryWrapper = new LambdaUpdateWrapper<>();
+ lambdaQueryWrapper.eq(CsWave::getRcdName,fileName).set(CsWave::getStatus,1);
+ this.update(lambdaQueryWrapper);
+ }
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
index 235adcf..0d9f65f 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
@@ -6,6 +6,7 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
+import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
@@ -86,7 +87,7 @@ public class EventServiceImpl implements IEventService {
lineInfo(appEventMessage.getId());
}
//获取装置id
- String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
+ CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData();
try {
//处理事件数据
List dataArray = appEventMessage.getMsg().getDataArray();
@@ -95,7 +96,9 @@ public class EventServiceImpl implements IEventService {
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
- csEvent.setDeviceId(deviceId);
+ csEvent.setDeviceId(po.getId());
+ csEvent.setProcess(po.getProcess());
+ csEvent.setCode(item.getCode());
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
@@ -145,7 +148,7 @@ public class EventServiceImpl implements IEventService {
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
- csEventLogs.setDeviceId(deviceId);
+ csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
@@ -162,12 +165,12 @@ public class EventServiceImpl implements IEventService {
}
//推送事件逻辑处理 && cs_event_user入库
for (AppEventMessage.DataArray item : dataArray) {
- sendEventUtils.sendUser(1,item.getType(),deviceId,item.getName(),eventTime,appEventMessage.getId(),id);
+ sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,appEventMessage.getId(),id);
}
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
- csEventLogs.setDeviceId(deviceId);
+ csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(eventTime);
csEventLogs.setTag(tag);
csEventLogs.setStatus(0);
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
index b731281..431f6a2 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
+import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import com.alibaba.nacos.shaded.com.google.gson.GsonBuilder;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
@@ -34,9 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@@ -73,9 +73,9 @@ public class FileServiceImpl implements IFileService {
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
+ DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
int mid = 1;
int range = 51200;
- Integer fileSize = appFileMessage.getMsg().getFileInfo().getFileSize();
String fileName = appFileMessage.getMsg().getFileInfo().getName();
//缓存文件信息用于文件流拼接
FileInfoDto fileInfoDto = new FileInfoDto();
@@ -95,18 +95,22 @@ public class FileServiceImpl implements IFileService {
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
fileInfoDto.setLocation(waveTimeDto.getLocation());
- //文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k
- if (fileSize <= range){
- askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize);
- fileInfoDto.setNumber(1);
- redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + mid));
- redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
- } else {
- int total = (int)Math.ceil(fileSize*1.0/range);
- fileInfoDto.setNumber(total);
- redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
- askFileStream(appFileMessage.getId(), 1, fileName, 0, range);
- }
+ //存储波形文件
+ CsWave csWave = new CsWave();
+ csWave.setNdid(appFileMessage.getId());
+ csWave.setCreateTime(LocalDateTime.now());
+ csWave.setStartTime(LocalDateTime.parse(waveTimeDto.getStartTime(), fmt));
+ csWave.setEndTime(LocalDateTime.parse(waveTimeDto.getEndTime(), fmt));
+ csWave.setRcdName(fileName);
+ csWave.setLocation(waveTimeDto.getLocation());
+ csWave.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
+ csWave.setCheckType(appFileMessage.getMsg().getFileInfo().getFileChkType());
+ csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck());
+ csWave.setStatus(0);
+ csWaveService.save(csWave);
+ //请求当前文件的数据
+ askFileStream(appFileMessage.getId(),mid,fileName,-1,range);
+ redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
redisUtil.delete(AppRedisKey.TIME+fileName);
} else {
throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR);
@@ -115,7 +119,10 @@ public class FileServiceImpl implements IFileService {
@Override
public void analysisFileStream(AppFileMessage appFileMessage) {
- int range = 51200;
+ DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+ String filePath;
+ List list = new ArrayList<>();
+ FileStreamDto fileStreamDto = new FileStreamDto();
//波形文件上传成功后,将文件信息存储一下,方便后期查看
CsWave csWave = new CsWave();
csWave.setNdid(appFileMessage.getId());
@@ -124,99 +131,112 @@ public class FileServiceImpl implements IFileService {
CsEventFileLogs csEventLogs = new CsEventFileLogs();
csEventLogs.setNdid(appFileMessage.getId());
csEventLogs.setFileName(appFileMessage.getMsg().getName());
+ csEventLogs.setStatus(0);
try {
- //todo 目前文件先只处理暂态事件的,后续有其他文件再做处理
+ //todo 目前文件先只处理波形事件的,后续有其他文件再做处理
String fileName = appFileMessage.getMsg().getName();
+ String lsFileName =fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
+ //获取缓存的文件信息
+ Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
+ FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
+ //1.判断当前文件是否之前缓存过,没缓存,则先缓存(这边缓存两条记录,一条是用来判断超时的,还有一条记录文件数据,文件数据目前没有过期时间,文件数据收完才会删除)
+ //2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存
+ //3.超时判断: 30s分钟未收到相关文件信息,核查文件个数,看丢失哪些帧,重新请求
if(fileName.contains(".cfg") || fileName.contains(".dat")) {
- DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
- FileStreamDto fileStreamDto = new FileStreamDto();
- String filePath;
- Map map = new HashMap<>();
- //获取缓存的文件信息
- Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
- FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
- //文件流
- Object object = redisUtil.getObjectByKey(fileName);
- /*
- * 文件解析存储逻辑
- * 1.如果文件只有1帧,那就直接解析文件流;
- * 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件
- */
- if (Objects.isNull(object)){
- //第一次录入
- if(fileInfoDto.getNumber() == 1) {
- //直接解析文件
- filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
- csEventLogs.setStatus(1);
- csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
- csEventLogs.setNowStep(1);
- csEventLogs.setAllStep(1);
- csEventLogs.setIsAll(1);
- //记录文件信息
- createCsWave(csWave,fileInfoDto,fmt,fileName);
- //波形文件关联事件
- filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
- List eventList = correlateEvents(fileInfoDto,filePath,fileName);
- //波形文件解析成图片
- if (CollectionUtil.isNotEmpty(eventList)){
- eventList.forEach(wavePicFeignClient::getWavePics);
- }
- redisUtil.delete(fileName);
- redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
- } else {
- //缓存文件
- map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
- fileStreamDto.setMap(map);
- redisUtil.saveByKey(fileName, fileStreamDto);
- csEventLogs.setStatus(1);
- csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
- csEventLogs.setNowStep(appFileMessage.getMid());
- csEventLogs.setAllStep(fileInfoDto.getNumber());
- csEventLogs.setIsAll(0);
- log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid());
- //收到文件后,继续请求下一帧报文
- askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range);
- redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid()));
+ if (appFileMessage.getMsg().getFrameTotal() == 1){
+ //解析文件入库
+ filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
+ csEventLogs.setStatus(1);
+ csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
+ csEventLogs.setNowStep(1);
+ csEventLogs.setAllStep(1);
+ csEventLogs.setIsAll(1);
+ //更新文件信息
+ csWaveService.updateCsWave(fileName);
+ //波形文件关联事件
+ filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
+ List eventList = correlateEvents(fileInfoDto,filePath,fileName);
+ //波形文件解析成图片
+ if (CollectionUtil.isNotEmpty(eventList)){
+ eventList.forEach(wavePicFeignClient::getWavePics);
}
+ redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
} else {
- //分帧传递数据,需要校验收到的文件个数
- fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class);
- Map l1 = fileStreamDto.getMap();
- l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
- if (l1.size() == fileInfoDto.getNumber()){
- //解析文件
- filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId());
+ Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
+ if (Objects.isNull(object1)){
+ fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal());
+ fileStreamDto.setNDid(appFileMessage.getId());
+ fileStreamDto.setFrameLen(appFileMessage.getMsg().getFrameLen());
+ list.add(appFileMessage.getMsg().getFrameCurr());
+ fileStreamDto.setList(list);
csEventLogs.setStatus(1);
- csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!");
- csEventLogs.setNowStep(l1.size());
- csEventLogs.setAllStep(l1.size());
- csEventLogs.setIsAll(1);
- log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", l1.size(), l1.size());
- //记录文件信息
- createCsWave(csWave,fileInfoDto,fmt,fileName);
- //波形文件关联事件
- filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
- List eventList = correlateEvents(fileInfoDto,filePath,fileName);
- //波形文件解析成图片
- if (CollectionUtil.isNotEmpty(eventList)){
- eventList.forEach(wavePicFeignClient::getWavePics);
- }
- redisUtil.delete(fileName);
- redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
- } else {
- //缓存
- fileStreamDto = new FileStreamDto();
- fileStreamDto.setMap(l1);
- redisUtil.saveByKey(fileName, fileStreamDto);
- csEventLogs.setStatus(1);
- csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
- csEventLogs.setNowStep(appFileMessage.getMid());
- csEventLogs.setAllStep(fileInfoDto.getNumber());
+ csEventLogs.setRemark("当前文件"+appFileMessage.getMsg().getFrameTotal()+"帧,这是第"+appFileMessage.getMsg().getFrameCurr()+"帧,记录成功!");
+ csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
+ csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0);
- log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", fileInfoDto.getNumber(), appFileMessage.getMid());
- //收到文件后,继续请求下一帧报文
- askFileStream(appFileMessage.getId(), appFileMessage.getMid()+1, fileName, appFileMessage.getMid() * range, range);
- redisUtil.delete(AppRedisKey.FILE_PART.concat(fileInfoDto.getName() + appFileMessage.getMid()));
+ redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 30L);
+ redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto);
+ //将数据写入临时文件
+ appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData());
+ log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
+ } else {
+ FileStreamDto dto = JSON.parseObject(JSON.toJSONString(object1), FileStreamDto.class);
+ //防止出现录入重复数据,做个判断
+ if (!dto.getList().contains(appFileMessage.getMsg().getFrameCurr())) {
+ dto.getList().add(appFileMessage.getMsg().getFrameCurr());
+ if (Objects.equals(dto.getTotal(), dto.getList().size())) {
+ Map filePartMap = readFile(lsFileName);
+ filePartMap.put(appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
+ //解析文件
+ filePath = fileStream(appFileMessage.getMsg().getFrameTotal(), filePartMap, null, fileName, appFileMessage.getId());
+ csEventLogs.setStatus(1);
+ csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,全部收到,解析成功!");
+ csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
+ csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
+ csEventLogs.setIsAll(1);
+ log.info("当前文件 {} 帧,这是第 {} 帧报文,全部收到,解析成功!", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
+ //修改文件信息
+ csWaveService.updateCsWave(fileName);
+ //波形文件关联事件
+ filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, "");
+ List eventList = correlateEvents(fileInfoDto, filePath, fileName);
+ //波形文件解析成图片
+ if (CollectionUtil.isNotEmpty(eventList)) {
+ eventList.forEach(wavePicFeignClient::getWavePics);
+ }
+ redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
+ redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
+ redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
+ redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName()));
+ //删除临时文件
+ File file = new File(lsFileName);
+ if (file.exists()) {
+ file.delete();
+ }
+ } else {
+ Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
+ csEventLogs.setStatus(1);
+ csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
+ csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
+ csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
+ csEventLogs.setIsAll(0);
+ if (Objects.isNull(object2)) {
+ redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 30L);
+ } else {
+ redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L);
+ }
+ redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), dto);
+ //将数据写入临时文件
+ appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
+ log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
+ }
+ } else {
+ csEventLogs.setStatus(1);
+ csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!");
+ csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
+ csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
+ csEventLogs.setIsAll(0);
+ }
}
}
csEventLogs.setCompleteTime(LocalDateTime.now());
@@ -238,7 +258,6 @@ public class FileServiceImpl implements IFileService {
}
}
-
/**
* 请求文件流信息
*/
@@ -254,6 +273,7 @@ public class FileServiceImpl implements IFileService {
JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
+ log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam));
}
/**
@@ -354,18 +374,81 @@ public class FileServiceImpl implements IFileService {
}
/**
- * 生成波形记录
+ * 数据写入文件
*/
- public void createCsWave(CsWave csWave, FileInfoDto fileInfoDto, DateTimeFormatter fmt, String fileName) {
- csWave.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
- csWave.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
- csWave.setRcdName(fileName);
- csWave.setLocation(fileInfoDto.getLocation());
- csWave.setFileSize(fileInfoDto.getFileSize());
- csWave.setCheckType(fileInfoDto.getFileChkType());
- csWave.setCheckNumber(fileInfoDto.getFileCheck());
- csWaveService.save(csWave);
+ public void appendFile(String filePath, Integer key, String value) {
+ Map map = new HashMap<>();
+ map.put(key, value);
+ try {
+ FileOutputStream fileOutputStream = new FileOutputStream(filePath, true);
+ MyObjectOutputStream outputStream = new MyObjectOutputStream(fileOutputStream);
+ outputStream.writeObject(map);
+ outputStream.close();
+ fileOutputStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
+ /**
+ * 文件读取数据
+ */
+ public Map readFile(String filePath) {
+ Map readMap = new HashMap<>();
+ List