20 Commits

Author SHA1 Message Date
xy
0b32c09fdb 治理设备新增功能 2025-07-03 08:59:57 +08:00
xy
28b23e9c52 治理事件更新 2025-06-24 15:08:14 +08:00
xy
7277299c66 新增数据 2025-06-24 11:32:37 +08:00
xy
49642066f5 bug调整 2025-06-19 09:39:44 +08:00
xy
826970357e 代码优化 2025-04-22 15:55:57 +08:00
11713e9b68 优化处理,避免任务重叠问题
异常处理
2025-03-12 16:02:57 +08:00
xy
6d3a1dd735 代码优化 2025-01-16 10:24:52 +08:00
xy
72150a3acc 微调 2025-01-14 16:36:17 +08:00
xy
19999a582e 微调 2025-01-14 16:26:15 +08:00
xy
7293d9b84d 代码优化 2025-01-14 16:06:28 +08:00
xy
40a6cd608c 代码优化 2025-01-14 11:47:40 +08:00
xy
1fb08ab66c 代码优化 2025-01-14 11:33:35 +08:00
xy
c6e938e7a0 代码优化 2025-01-14 10:31:45 +08:00
xy
52d2dda01c 代码优化 2025-01-13 16:39:20 +08:00
xy
f9926d16f8 代码优化 2025-01-13 16:32:55 +08:00
xy
87fc735969 代码优化 2025-01-13 16:25:41 +08:00
xy
cc22afd877 代码优化 2025-01-13 14:58:00 +08:00
xy
2ec0024d0f 代码优化 2025-01-09 14:37:28 +08:00
xy
f0a22192fb 代码优化 2025-01-08 15:45:17 +08:00
xy
98b901e6ab 代码优化 2025-01-06 18:09:29 +08:00
22 changed files with 590 additions and 125 deletions

View File

@@ -1,6 +1,7 @@
package com.njcn.access.pojo.dto; package com.njcn.access.pojo.dto;
import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
@@ -17,39 +18,49 @@ import java.util.List;
public class AutoDataDto { public class AutoDataDto {
@SerializedName("Mid") @SerializedName("Mid")
@JsonProperty("Mid")
private Integer mid; private Integer mid;
@SerializedName("Did") @SerializedName("Did")
@JsonProperty("Did")
@ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2") @ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2")
private Integer did; private Integer did;
@SerializedName("Pri") @SerializedName("Pri")
@JsonProperty("Pri")
private Integer pri; private Integer pri;
@SerializedName("Type") @SerializedName("Type")
@JsonProperty("Type")
private Integer type; private Integer type;
@SerializedName("Msg") @SerializedName("Msg")
@JsonProperty("Msg")
private Msg msg; private Msg msg;
@Data @Data
public static class Msg{ public static class Msg{
@SerializedName("Cldid") @SerializedName("Cldid")
@JsonProperty("Cldid")
@ApiModelProperty("逻辑子设备 治理逻辑设备为0 电能质量设备为1、2") @ApiModelProperty("逻辑子设备 治理逻辑设备为0 电能质量设备为1、2")
private Integer clDid; private Integer clDid;
@SerializedName("DataType") @SerializedName("DataType")
@JsonProperty("DataType")
private Integer dataType; private Integer dataType;
@SerializedName("DataAttr") @SerializedName("DataAttr")
@JsonProperty("DataAttr")
@ApiModelProperty("数据属性:无-0、实时-1、统计-2") @ApiModelProperty("数据属性:无-0、实时-1、统计-2")
private Integer dataAttr; private Integer dataAttr;
@SerializedName("DsNameIdx") @SerializedName("DsNameIdx")
@JsonProperty("DsNameIdx")
private Integer dsNameIdx; private Integer dsNameIdx;
@SerializedName("DataArray") @SerializedName("DataArray")
@JsonProperty("DataArray")
private List<DataArray> dataArray; private List<DataArray> dataArray;
} }
@@ -57,95 +68,119 @@ public class AutoDataDto {
public static class DataArray{ public static class DataArray{
@SerializedName("DataAttr") @SerializedName("DataAttr")
@JsonProperty("DataAttr")
@ApiModelProperty("数据属性 -1-无 0-Rt(实时) 1-Max 2-Min 3-Avg 4-Cp95") @ApiModelProperty("数据属性 -1-无 0-Rt(实时) 1-Max 2-Min 3-Avg 4-Cp95")
private Integer dataAttr; private Integer dataAttr;
@SerializedName("DataTimeSec") @SerializedName("DataTimeSec")
@JsonProperty("DataTimeSec")
private Long dataTimeSec; private Long dataTimeSec;
@SerializedName("DataTimeUSec") @SerializedName("DataTimeUSec")
@JsonProperty("DataTimeUSec")
private Integer dataTimeUSec; private Integer dataTimeUSec;
@SerializedName("DataTag") @SerializedName("DataTag")
@JsonProperty("DataTag")
@ApiModelProperty("数据是否参与合格率统计") @ApiModelProperty("数据是否参与合格率统计")
private Integer dataTag; private Integer dataTag;
@SerializedName("Code") @SerializedName("Code")
@JsonProperty("Code")
@ApiModelProperty("事件码") @ApiModelProperty("事件码")
private Integer code; private String code;
@SerializedName("Data") @SerializedName("Data")
@JsonProperty("Data")
private String data; private String data;
@SerializedName("PrjName") @SerializedName("PrjName")
@JsonProperty("PrjName")
@ApiModelProperty("工程名称") @ApiModelProperty("工程名称")
private String prjName; private String prjName;
@SerializedName("PrjTimeStart") @SerializedName("PrjTimeStart")
@JsonProperty("PrjTimeStart")
@ApiModelProperty("装置启动时间") @ApiModelProperty("装置启动时间")
private Long prjTimeStart; private Long prjTimeStart;
@SerializedName("PrjTimeEnd") @SerializedName("PrjTimeEnd")
@JsonProperty("PrjTimeEnd")
@ApiModelProperty("装置结束时间") @ApiModelProperty("装置结束时间")
private Long prjTimeEnd; private Long prjTimeEnd;
@SerializedName("PrjDataPath") @SerializedName("PrjDataPath")
@JsonProperty("PrjDataPath")
@ApiModelProperty("装置数据路径") @ApiModelProperty("装置数据路径")
private String prjDataPath; private String prjDataPath;
@SerializedName("DevType") @SerializedName("DevType")
@JsonProperty("DevType")
@ApiModelProperty("装置型号") @ApiModelProperty("装置型号")
private String devType; private String devType;
@SerializedName("DevMac") @SerializedName("DevMac")
@JsonProperty("DevMac")
@ApiModelProperty("装置mac地址") @ApiModelProperty("装置mac地址")
private String devMac; private String devMac;
@SerializedName("AppVersion") @SerializedName("AppVersion")
@JsonProperty("AppVersion")
@ApiModelProperty("装置程序版本") @ApiModelProperty("装置程序版本")
private String appVersion; private String appVersion;
@SerializedName("Cldid") @SerializedName("Cldid")
@JsonProperty("Cldid")
@ApiModelProperty("逻辑子设备id") @ApiModelProperty("逻辑子设备id")
private Integer clDid; private Integer clDid;
@SerializedName("StatCycle") @SerializedName("StatCycle")
@JsonProperty("StatCycle")
@ApiModelProperty("统计间隔") @ApiModelProperty("统计间隔")
private Integer statCycle; private Integer statCycle;
@SerializedName("VolGrade") @SerializedName("VolGrade")
@JsonProperty("VolGrade")
@ApiModelProperty("电压等级") @ApiModelProperty("电压等级")
private Float volGrade; private Float volGrade;
@SerializedName("VolConType") @SerializedName("VolConType")
@JsonProperty("VolConType")
@ApiModelProperty("电压接线方式0-星型, 1-角型, 2-V型") @ApiModelProperty("电压接线方式0-星型, 1-角型, 2-V型")
private Integer volConType; private Integer volConType;
@SerializedName("CurConSel") @SerializedName("CurConSel")
@JsonProperty("CurConSel")
@ApiModelProperty("电流接线方式0-正常, 1-合成IB, 2-合成IC") @ApiModelProperty("电流接线方式0-正常, 1-合成IB, 2-合成IC")
private Integer curConSel; private Integer curConSel;
@SerializedName("PtRatio") @SerializedName("PtRatio")
@JsonProperty("PtRatio")
@ApiModelProperty("PT变比") @ApiModelProperty("PT变比")
private Integer ptRatio; private Integer ptRatio;
@SerializedName("CtRatio") @SerializedName("CtRatio")
@JsonProperty("CtRatio")
@ApiModelProperty("ct变比") @ApiModelProperty("ct变比")
private Integer ctRatio; private Integer ctRatio;
@SerializedName("CapacitySscb") @SerializedName("CapacitySscb")
@JsonProperty("CapacitySscb")
@ApiModelProperty("基准短路容量") @ApiModelProperty("基准短路容量")
private Float capacitySscb; private Float capacitySscb;
@SerializedName("CapacitySscmin") @SerializedName("CapacitySscmin")
@JsonProperty("CapacitySscmin")
@ApiModelProperty("最小短路容量") @ApiModelProperty("最小短路容量")
private Float capacitySscmin; private Float capacitySscmin;
@SerializedName("CapacitySt") @SerializedName("CapacitySt")
@JsonProperty("CapacitySt")
@ApiModelProperty("供电设备容量") @ApiModelProperty("供电设备容量")
private Float capacitySt; private Float capacitySt;
@SerializedName("CapacitySi") @SerializedName("CapacitySi")
@JsonProperty("CapacitySi")
@ApiModelProperty("用户协议容量") @ApiModelProperty("用户协议容量")
private Float capacitySi; private Float capacitySi;
} }

View File

@@ -0,0 +1,48 @@
package com.njcn.access.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 治理设备模块运行状态记录表
* </p>
*
* @author xy
* @since 2025-06-26
*/
@Getter
@Setter
@TableName("cs_line_run_data")
public class CsLineRunData implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 监测点id
*/
private String lineId;
/**
* 子模块编号id(没有子模块则为0)
*/
private Integer moduleId;
/**
* 最新数据时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime timeId;
/**
* 子模块通讯状态(0离线 1连接)
*/
private Integer runState;
}

View File

@@ -20,6 +20,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency> <dependency>
<groupId>com.njcn</groupId> <groupId>com.njcn</groupId>
<artifactId>access-api</artifactId> <artifactId>access-api</artifactId>

View File

@@ -0,0 +1,52 @@
package com.njcn.access.controller;
import com.njcn.access.pojo.po.CsLineRunData;
import com.njcn.access.service.ICsLineRunDataService;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* <p>
* 治理设备模块运行状态记录表 前端控制器
* </p>
*
* @author xy
* @since 2025-06-26
*/
@RestController
@RequestMapping("/csLineRunData")
@Slf4j
@Api(tags = "治理设备数据运行记录")
@AllArgsConstructor
public class CsLineRunDataController extends BaseController {
private final ICsLineRunDataService csLineRunDataService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/add")
@ApiOperation("新增数据")
@ApiImplicitParam(name = "list", value = "参数", required = true)
public HttpResult<String> addData(@RequestBody @Validated List<CsLineRunData> list){
String methodDescribe = getMethodDescribe("addData");
csLineRunDataService.addData(list);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -1,6 +1,7 @@
package com.njcn.access.handler; package com.njcn.access.handler;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.excel.util.CollectionUtils; import com.alibaba.excel.util.CollectionUtils;
@@ -20,7 +21,6 @@ import com.njcn.access.pojo.dto.*;
import com.njcn.access.pojo.dto.file.FileDto; import com.njcn.access.pojo.dto.file.FileDto;
import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.pojo.po.CsLineModel;
import com.njcn.access.pojo.po.CsTopic; import com.njcn.access.pojo.po.CsTopic;
import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsDeviceOnlineLogsService;
@@ -31,6 +31,7 @@ import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.*; import com.njcn.csdevice.api.*;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.param.CsLineParam; import com.njcn.csdevice.pojo.param.CsLineParam;
import com.njcn.csdevice.pojo.po.*; import com.njcn.csdevice.pojo.po.*;
import com.njcn.device.biz.pojo.po.Overlimit; import com.njcn.device.biz.pojo.po.Overlimit;
@@ -99,6 +100,7 @@ public class MqttMessageHandler {
private final ChannelObjectUtil channelObjectUtil; private final ChannelObjectUtil channelObjectUtil;
private final WaveFeignClient waveFeignClient; private final WaveFeignClient waveFeignClient;
private final RtFeignClient rtFeignClient; private final RtFeignClient rtFeignClient;
private final CsCommunicateFeignClient csCommunicateFeignClient;
@Autowired @Autowired
Validator validator; Validator validator;
@@ -311,34 +313,24 @@ public class MqttMessageHandler {
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode()); csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode());
csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
//记录设备上线 //记录设备上线
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); PqsCommunicateDto dto = new PqsCommunicateDto();
CsDeviceOnlineLogs csDeviceOnlineLogs = new CsDeviceOnlineLogs(); dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
if(Objects.isNull(record)) { dto.setDevId(nDid);
csDeviceOnlineLogs.setNdid(nDid); dto.setType(1);
csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now()); dto.setDescription("通讯正常");
onlineLogsService.save(csDeviceOnlineLogs); csCommunicateFeignClient.insertion(dto);
} else {
LocalDateTime time = record.getOfflineTime();
if (!Objects.isNull(time)){
csDeviceOnlineLogs.setNdid(nDid);
csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now());
onlineLogsService.save(csDeviceOnlineLogs);
} else {
record.setOnlineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
}
}
//接入后系统重置装置心跳
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
//修改redis的mid
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
redisUtil.saveByKeyWithExpire("online" + nDid,"online",10L);
//询问设备软件信息 //询问设备软件信息
askDevData(nDid,version,1,mid); askDevData(nDid,version,1,mid);
//更新治理监测点信息和设备容量 //更新治理监测点信息和设备容量
askDevData(nDid,version,2,(res.getMid()+1)); askDevData(nDid,version,2,(res.getMid()+1));
//更新电网侧、负载侧监测点信息 //更新电网侧、负载侧监测点信息
askDevData(nDid,version,3,(res.getMid()+1)); askDevData(nDid,version,3,(res.getMid()+1));
//接入后系统重置装置心跳
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
//修改redis的mid
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
//接入成功标识
redisUtil.saveByKeyWithExpire("online" + nDid,"online",10L);
//录波任务倒计时 //录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
} else { } else {
@@ -436,6 +428,13 @@ public class MqttMessageHandler {
} }
} }
break; break;
case 15:
log.info("{}模块{}:处理实时数据", nDid, rspDataDto.getClDid());
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res));
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class);
appAutoDataMessage.setId(nDid);
rtFeignClient.apfRtAnalysis(appAutoDataMessage);
break;
case 48: case 48:
log.info("询问装置项目列表"); log.info("询问装置项目列表");
logDto.setUserName("运维管理员"); logDto.setUserName("运维管理员");
@@ -460,10 +459,16 @@ public class MqttMessageHandler {
break; break;
} }
//csLogsFeignClient.addUserLog(logDto); //csLogsFeignClient.addUserLog(logDto);
} else {
String result = getEnum(res.getCode());
log.info(result);
logDto.setResult(0);
logDto.setFailReason(result);
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(result);
} }
} }
/** /**
* 装置心跳 && 主动数据上送 * 装置心跳 && 主动数据上送
* fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件 * fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件
@@ -741,4 +746,41 @@ public class MqttMessageHandler {
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
} }
public String getEnum(Integer code) {
String result = null;
switch (code) {
case 201:
result = AccessEnum.START_CHANNEL.getMessage();
break;
case 202:
result = AccessEnum.WAIT_CHANNEL.getMessage();
break;
case 400:
result = AccessEnum.FAIL.getMessage();
break;
case 401:
result = AccessEnum.ERROR.getMessage();
break;
case 402:
result = AccessEnum.REFUSE_WAIT.getMessage();
break;
case 403:
result = AccessEnum.REFUSE_UNKNOWN.getMessage();
break;
case 404:
result = AccessEnum.NOT_FIND.getMessage();
break;
case 405:
result = AccessEnum.BUSY.getMessage();
break;
case 406:
result = AccessEnum.TIME_OUT.getMessage();
break;
default:
result = AccessEnum.OTHER_ERROR.getMessage();
break;
}
return result;
}
} }

View File

@@ -1,5 +1,6 @@
package com.njcn.access.listener; package com.njcn.access.listener;
import cn.hutool.core.date.DatePattern;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto; import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
@@ -10,11 +11,9 @@ import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.MqttUtil; import com.njcn.access.utils.MqttUtil;
import com.njcn.access.utils.SendMessageUtil; import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.*;
import com.njcn.csdevice.api.CsLedgerFeignClient;
import com.njcn.csdevice.api.CsLogsFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.dto.DevDetailDTO; import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
@@ -75,6 +74,9 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private SendMessageUtil sendMessageUtil; private SendMessageUtil sendMessageUtil;
@Resource @Resource
private CsDeviceServiceImpl csDeviceServiceImpl; private CsDeviceServiceImpl csDeviceServiceImpl;
@Resource
private CsCommunicateFeignClient csCommunicateFeignClient;
private final Object lock = new Object(); private final Object lock = new Object();
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
@@ -116,9 +118,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
logDto.setOperate(nDid +"装置离线"); logDto.setOperate(nDid +"装置离线");
sendMessage(nDid); sendMessage(nDid);
//记录装置掉线时间 //记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); // CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now()); // record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record); // onlineLogsService.updateById(record);
//记录装置掉线时间
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(nDid);
dto.setType(0);
dto.setDescription("通讯中断");
csCommunicateFeignClient.insertion(dto);
csLogsFeignClient.addUserLog(logDto); csLogsFeignClient.addUserLog(logDto);
} }

View File

@@ -0,0 +1,16 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.access.pojo.po.CsLineRunData;
/**
* <p>
* 治理设备模块运行状态记录表 Mapper 接口
* </p>
*
* @author xy
* @since 2025-06-26
*/
public interface CsLineRunDataMapper extends BaseMapper<CsLineRunData> {
}

View File

@@ -9,7 +9,6 @@ import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum; import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.enums.DicTreeEnum;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@@ -88,20 +87,28 @@ public class AccessApplicationRunner implements ApplicationRunner {
} }
public void accessDev(List<CsEquipmentDeliveryPO> list) { public void accessDev(List<CsEquipmentDeliveryPO> list) {
list.forEach(item->{ if (CollUtil.isNotEmpty(list)) {
System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid()); try {
String version = csTopicService.getVersion(item.getNdid()); list.forEach(item->{
if (!Objects.isNull(version)){ System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) {
csDeviceService.wlDevRegister(item.getNdid()); //csDeviceService.wlDevRegister(item.getNdid());
} else { log.info("请先手动注册、接入");
csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); } else {
} String version = csTopicService.getVersion(item.getNdid());
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); if (Objects.isNull(version)) {
version = "V1";
}
csDeviceService.autoAccess(item.getNdid(),version,1);
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1);
});
} catch (Exception e) {
log.error(e.getMessage());
} }
}); }
} }
} }

View File

@@ -9,7 +9,6 @@ import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum; import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.enums.DicTreeEnum;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@@ -45,57 +44,71 @@ public class AutoAccessTimer implements ApplicationRunner {
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
ExecutorService executor = Executors.newFixedThreadPool(10); ExecutorService executor = Executors.newFixedThreadPool(10);
// 将任务平均分配给10个子列表 // 将任务平均分配给10个子列表
List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>(); List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
int partitionSize = list.size() / 10; // List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
for (int i = 0; i < 10; i++) { // int partitionSize = list.size() / 10;
int start = i * partitionSize; // for (int i = 0; i < 10; i++) {
int end = (i == 9) ? list.size() : start + partitionSize; // int start = i * partitionSize;
subLists.add(list.subList(start, end)); // int end = (i == 9) ? list.size() : start + partitionSize;
} // subLists.add(list.subList(start, end));
// }
// 创建一个ExecutorService来处理这些任务 // 创建一个ExecutorService来处理这些任务
List<Future<Void>> futures = new ArrayList<>(); List<Future<Void>> futures = new ArrayList<>();
// 提交任务给线程池执行 // 提交任务给线程池执行
for (int i = 0; i < 10; i++) { // for (int i = 0; i < 10; i++) {
int index = i; // int index = i;
futures.add(executor.submit(new Callable<Void>() { // futures.add(executor.submit(() -> {
@Override // accessDev(subLists.get(index));
public Void call() { // return null;
accessDev(subLists.get(index)); // }));
return null; // }
} for (List<CsEquipmentDeliveryPO> subList : subLists) {
futures.add(executor.submit(() -> {
accessDev(subList);
return null;
})); }));
} }
// 等待所有任务完成 // 等待所有任务完成
for (Future<Void> future : futures) { for (Future<Void> future : futures) {
try { try {
future.get(); future.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); Thread.currentThread().interrupt();
log.error("任务被中断", e);
} catch (ExecutionException e) {
log.error("任务执行异常", e.getCause());
} }
} }
// 关闭ExecutorService // 关闭ExecutorService
executor.shutdown(); executor.shutdown();
} }
}; };
//第一次执行的时间为120s然后每隔120s执行一次 //第一次执行的时间为120s然后在前一个任务执行完毕后,等待120s执行下一个任务
scheduler.scheduleAtFixedRate(task,AUTO_TIME,AUTO_TIME,TimeUnit.SECONDS); scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS);
} }
public void accessDev(List<CsEquipmentDeliveryPO> list) { public void accessDev(List<CsEquipmentDeliveryPO> list) {
list.forEach(item->{ if (CollUtil.isNotEmpty(list)) {
System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); try {
String version = csTopicService.getVersion(item.getNdid()); list.forEach(item -> {
if (!Objects.isNull(version)){ System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
csDeviceService.wlDevRegister(item.getNdid()); //csDeviceService.wlDevRegister(item.getNdid());
} else { log.info("请先手动注册、接入");
csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); } else {
} String version = csTopicService.getVersion(item.getNdid());
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); if (Objects.isNull(version)) {
version = "V1";
}
csDeviceService.autoAccess(item.getNdid(), version, 1);
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
});
} catch (Exception e) {
log.error(e.getMessage());
} }
}); }
} }
} }

View File

@@ -0,0 +1,20 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.po.CsLineRunData;
import java.util.List;
/**
* <p>
* 治理设备模块运行状态记录表 服务类
* </p>
*
* @author xy
* @since 2025-06-26
*/
public interface ICsLineRunDataService extends IService<CsLineRunData> {
void addData(List<CsLineRunData> list);
}

View File

@@ -91,6 +91,12 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
json = JsonUtil.convertStreamToString(file.getInputStream()); json = JsonUtil.convertStreamToString(file.getInputStream());
Gson gson = new Gson(); Gson gson = new Gson();
TemplateDto templateDto = gson.fromJson(json, TemplateDto.class); TemplateDto templateDto = gson.fromJson(json, TemplateDto.class);
//判断设备型号
String devType = templateDto.getDevType();
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
if (Objects.isNull(dictTreeVO)){
throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND);
}
logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType()); logDto.setOperate("新增设备模板,模板名称:" + templateDto.getDevType());
//模板文件存入文件服务器 //模板文件存入文件服务器
String filePath = fileStorageUtil.uploadMultipart(file, OssPath.DEV_MODEL + templateDto.getDevType() + "_"); String filePath = fileStorageUtil.uploadMultipart(file, OssPath.DEV_MODEL + templateDto.getDevType() + "_");
@@ -838,9 +844,6 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
List<DataSetDto> dataSetList = templateDto.getDataSet(); List<DataSetDto> dataSetList = templateDto.getDataSet();
String devType = templateDto.getDevType(); String devType = templateDto.getDevType();
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData(); DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
if (Objects.isNull(dictTreeVO)){
throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND);
}
String code = dictTreeFeignClient.queryById(dictTreeVO.getPid()).getData().getCode(); String code = dictTreeFeignClient.queryById(dictTreeVO.getPid()).getData().getCode();
//逻辑设备录入 //逻辑设备录入
if (CollectionUtil.isNotEmpty(dataSetList)){ if (CollectionUtil.isNotEmpty(dataSetList)){

View File

@@ -38,6 +38,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -370,6 +371,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
} }
@Override @Override
// @Transactional(rollbackFor = Exception.class)
public String wlDevRegister(String nDid) { public String wlDevRegister(String nDid) {
String result = "fail"; String result = "fail";
// 设备状态判断 // 设备状态判断
@@ -444,15 +446,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csEquipmentDeliveryService.updateProcessBynDid(nDid,4); csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
//7.存储日志 //7.存储日志
csLogsFeignClient.addUserLog(logDto); csLogsFeignClient.addUserLog(logDto);
//8.存储设备调试日志表
CsEquipmentProcessPO csEquipmentProcess = new CsEquipmentProcessPO();
csEquipmentProcess.setDevId(nDid);
csEquipmentProcess.setOperator(RequestUtil.getUserIndex());
csEquipmentProcess.setStartTime(LocalDateTime.now());
csEquipmentProcess.setEndTime(LocalDateTime.now());
csEquipmentProcess.setProcess(4);
csEquipmentProcess.setStatus(1);
processFeignClient.add(csEquipmentProcess);
//9.删除redis监测点模板信息 //9.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid); redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid); redisUtil.delete(AppRedisKey.LINE + nDid);
@@ -604,6 +597,78 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return result; return result;
} }
/**
* 装置重新接入系统,需要校验所用的模板
* @param nDid
* @param version
*/
@Transactional(rollbackFor = Exception.class)
public boolean autoAccess(String nDid,String version,Integer mid) {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
}
boolean result = false;
Map<Integer,String> modelMap = new HashMap<>();
try {
//删除缓存数据
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
//询问装置当前所用模板
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(getJson(mid,TypeEnum.TYPE_3.getCode())),1,false);
//接收到模板,判断模板是否存在,替换模板,发起接入
Thread.sleep(2000);
List<CsModelDto> modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
if (CollUtil.isNotEmpty(modelId)) {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
//重新录入装置和模板关系信息
for (CsModelDto item : modelId) {
CsDevModelRelationPO po = new CsDevModelRelationPO();
po.setDevId(vo.getId());
po.setModelId(item.getModelId());
po.setDid(item.getDid());
po.setUpdateTime(LocalDateTime.now());
csDevModelRelationService.addRelation(po);
modelMap.put(item.getType(),item.getModelId());
}
//修改监测点使用的模板和数据集
List<CsLinePO> lineList;
Object object = redisUtil.getObjectByKey("accessLineInfo:" + nDid);
if (Objects.isNull(object)) {
lineList = csLineFeignClient.findByNdid(nDid).getData();
for (CsLinePO item : lineList) {
if (item.getClDid() == 0) {
updateLineIds(modelMap.get(0),item.getClDid(),nDid);
} else {
updateLineIds(modelMap.get(1),item.getClDid(),nDid);
}
}
}
//发起接入
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())),1,false);
//录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
result = true;
}
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "装置接入失败");
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(e.getMessage());
}
return result;
}
/** /**
* 组装报文 * 组装报文
*/ */

View File

@@ -1,5 +1,6 @@
package com.njcn.access.service.impl; package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
@@ -16,6 +17,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@@ -97,20 +99,22 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
.ne(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.DEL.getCode()) .ne(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.DEL.getCode())
.eq(CsEquipmentDeliveryPO::getUsageStatus,1) .eq(CsEquipmentDeliveryPO::getUsageStatus,1)
.list(); .list();
list.forEach(item->{ if (CollUtil.isNotEmpty(list)) {
String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6); list.forEach(item->{
boolean mqttClient = mqttUtil.judgeClientOnline(clientName); String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6);
if (mqttClient) { boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
result.add(item); if (mqttClient) {
} else { result.add(item);
DeviceLogDTO logDto = new DeviceLogDTO(); } else {
logDto.setUserName("运维管理员"); DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setLoginName("njcnyw"); logDto.setUserName("运维管理员");
logDto.setResult(1); logDto.setLoginName("njcnyw");
logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线"); logDto.setResult(1);
csLogsFeignClient.addUserLog(logDto); logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线");
} csLogsFeignClient.addUserLog(logDto);
}); }
});
}
return result; return result;
} }
@@ -120,21 +124,24 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
List<CsEquipmentDeliveryPO> list = this.lambdaQuery() List<CsEquipmentDeliveryPO> list = this.lambdaQuery()
.eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.OFFLINE.getCode()) .eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.OFFLINE.getCode())
.eq(CsEquipmentDeliveryPO::getUsageStatus,1) .eq(CsEquipmentDeliveryPO::getUsageStatus,1)
.in(CsEquipmentDeliveryPO::getStatus, Arrays.asList(2,3))
.list(); .list();
list.forEach(item->{ if (CollUtil.isNotEmpty(list)) {
String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6); list.forEach(item->{
boolean mqttClient = mqttUtil.judgeClientOnline(clientName); String clientName = "NJCN-" + item.getNdid().substring(item.getNdid().length() - 6);
if (mqttClient) { boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
result.add(item); if (mqttClient) {
} else { result.add(item);
DeviceLogDTO logDto = new DeviceLogDTO(); } else {
logDto.setUserName("运维管理员"); DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setLoginName("njcnyw"); logDto.setUserName("运维管理员");
logDto.setResult(1); logDto.setLoginName("njcnyw");
logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线"); logDto.setResult(1);
csLogsFeignClient.addUserLog(logDto); logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线");
} csLogsFeignClient.addUserLog(logDto);
}); }
});
}
return result; return result;
} }

View File

@@ -0,0 +1,26 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsLineRunDataMapper;
import com.njcn.access.pojo.po.CsLineRunData;
import com.njcn.access.service.ICsLineRunDataService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* <p>
* 治理设备模块运行状态记录表 服务实现类
* </p>
*
* @author xy
* @since 2025-06-26
*/
@Service
public class CsLineRunDataServiceImpl extends ServiceImpl<CsLineRunDataMapper, CsLineRunData> implements ICsLineRunDataService {
@Override
public void addData(List<CsLineRunData> list) {
this.saveBatch(list);
}
}

View File

@@ -5,7 +5,11 @@ import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.rt.api.fallback.RtClientFallbackFactory; import com.njcn.rt.api.fallback.RtClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.Map;
/** /**
* @author xy * @author xy
@@ -15,4 +19,7 @@ public interface RtFeignClient {
@PostMapping("/rtAnalysis") @PostMapping("/rtAnalysis")
HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage); HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage);
@PostMapping("/apfRtAnalysis")
HttpResult<Map<String,Float>> apfRtAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage);
} }

View File

@@ -9,6 +9,8 @@ import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
/** /**
* @author xy * @author xy
*/ */
@@ -27,7 +29,13 @@ public class RtClientFallbackFactory implements FallbackFactory<RtFeignClient> {
@Override @Override
public HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage) { public HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage) {
log.error("{}异常,降级处理,异常为:{}","实时数据解析",cause.toString()); log.error("{}异常,降级处理,异常为:{}","便携式实时数据解析",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<Map<String,Float>> apfRtAnalysis(AppAutoDataMessage appAutoDataMessage) {
log.error("{}异常,降级处理,异常为:{}","APF实时数据解析",cause.toString());
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }
}; };

View File

@@ -19,6 +19,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/** /**
* 类的介绍: * 类的介绍:
* *
@@ -45,5 +47,14 @@ public class RtController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/apfRtAnalysis")
@ApiOperation("APF实时数据解析")
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
public HttpResult<String> apfRtAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){
String methodDescribe = getMethodDescribe("apfRtAnalysis");
rtService.apfRtAnalysis(appAutoDataMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
} }

View File

@@ -8,4 +8,10 @@ import com.njcn.mq.message.AppAutoDataMessage;
public interface IRtService { public interface IRtService {
void analysis(AppAutoDataMessage appAutoDataMessage); void analysis(AppAutoDataMessage appAutoDataMessage);
/**
* APF数据解析
* @param appAutoDataMessage
*/
void apfRtAnalysis(AppAutoDataMessage appAutoDataMessage);
} }

View File

@@ -97,6 +97,36 @@ public class RtServiceImpl implements IRtService {
} }
} }
@Override
public void apfRtAnalysis(AppAutoDataMessage appAutoDataMessage) {
List<CsDataArray> dataArrayList;
String lineId;
//监测点id
if (appAutoDataMessage.getDid() == 1){
lineId = appAutoDataMessage.getId() + "0";
} else {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
}
//获取监测点基础信息
CsLinePO po = csLineFeignClient.getById(lineId).getData();
//获取数据集 dataSet
Integer idx = appAutoDataMessage.getMsg().getDsNameIdx();
CsDataSet dataSet = dataSetFeignClient.getDataSetByIdx(po.getDataModelId(),idx).getData();
String key = "BaseRealData:" + lineId + idx;
Object object = redisUtil.getObjectByKey(key);
if (Objects.isNull(object)){
dataArrayList = saveBaseRealDataSet(key,dataSet.getId());
} else {
dataArrayList = channelObjectUtil.objectToList(object,CsDataArray.class);
}
//根据dataArray解析数据
AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0);
Map<String,Float> map = getData(dataArrayList,item);
int data = Math.round(map.get("Apf_ModWorkingSts" + "M"));
redisUtil.saveByKeyWithExpire("ApfRtData:" + appAutoDataMessage.getMid(),data,10L);
}
/** /**
* 时间处理 * 时间处理
*/ */

View File

@@ -2,14 +2,23 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csharmonic.pojo.po.CsEventPO; import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.EpdFeignClient; import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.param.EleEpdPqdParam; import com.njcn.system.pojo.param.EleEpdPqdParam;
import com.njcn.system.pojo.po.EleEpdPqd; import com.njcn.system.pojo.po.EleEpdPqd;
import com.njcn.system.pojo.po.SysDicTreePO;
import com.njcn.zlevent.mapper.CsEventMapper; import com.njcn.zlevent.mapper.CsEventMapper;
import com.njcn.zlevent.pojo.po.CsEventLogs; import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsAlarmService; import com.njcn.zlevent.service.ICsAlarmService;
@@ -24,8 +33,11 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import static com.njcn.csdevice.enums.AlgorithmResponseEnum.DATA_ERROR;
/** /**
* <p> * <p>
* 告警事件表 服务实现类 * 告警事件表 服务实现类
@@ -43,28 +55,57 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
private final SendEventUtils sendEventUtils; private final SendEventUtils sendEventUtils;
private final ICsEventLogsService csEventLogsService; private final ICsEventLogsService csEventLogsService;
private final EpdFeignClient epdFeignClient; private final EpdFeignClient epdFeignClient;
private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) { public void analysis(AppEventMessage appEventMessage) {
List<CsEventPO> list1 = new ArrayList<>(); List<CsEventPO> list1 = new ArrayList<>();
LocalDateTime eventTime = null; LocalDateTime eventTime = null;
String tag = null; String tag = null, lineId = null;
String id = IdUtil.fastSimpleUUID(); String id = IdUtil.fastSimpleUUID();
//获取装置id //获取装置id
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData(); CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData();
List<SysDicTreePO> dictTreeList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE),SysDicTreePO.class);
String code = dictTreeList.stream().filter(item->Objects.equals(item.getId(),po.getDevType())).findFirst().orElse(null).getCode();
try { try {
//便携式设备
if (Objects.equals(DicDataEnum.PORTABLE.getCode(),code)) {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//直连设备
else if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) {
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
}
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray(); List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {
eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
tag = item.getName();
//判断各模块事件,如果上次模块事件和这次一致,则不记录
CsEventPO csEventPO = this.lambdaQuery().eq(CsEventPO::getLineId,lineId)
.eq(CsEventPO::getClDid,appEventMessage.getMsg().getClDid())
.eq(CsEventPO::getProcess,po.getProcess())
.orderByDesc(CsEventPO::getStartTime).last("LIMIT 1").one();
if (csEventPO != null) {
if (Objects.equals(csEventPO.getTag(),tag)) {
throw new BusinessException(DATA_ERROR);
}
}
//事件入库 //事件入库
CsEventPO csEvent = new CsEventPO(); CsEventPO csEvent = new CsEventPO();
csEvent.setLineId(lineId);
csEvent.setId(id); csEvent.setId(id);
csEvent.setDeviceId(po.getId()); csEvent.setDeviceId(po.getId());
csEvent.setProcess(po.getProcess()); csEvent.setProcess(po.getProcess());
csEvent.setCode(item.getCode()); csEvent.setCode(item.getCode());
csEvent.setStartTime(eventTime); csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag); csEvent.setTag(tag);
csEvent.setType(3); csEvent.setType(3);
csEvent.setClDid(appEventMessage.getMsg().getClDid()); csEvent.setClDid(appEventMessage.getMsg().getClDid());
@@ -91,6 +132,7 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
} }
} catch (Exception e) { } catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs(); CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId()); csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(eventTime); csEventLogs.setStartTime(eventTime);
csEventLogs.setTag(tag); csEventLogs.setTag(tag);

View File

@@ -92,7 +92,19 @@ public class EventServiceImpl implements IEventService {
//获取设备类型 true:治理设备 false:其他类型的设备 //获取设备类型 true:治理设备 false:其他类型的设备
boolean devModel = equipmentFeignClient.judgeDevModel(appEventMessage.getId()).getData(); boolean devModel = equipmentFeignClient.judgeDevModel(appEventMessage.getId()).getData();
try { try {
lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid(); // lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
if (devModel) {
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
} else {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//处理事件数据 //处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray(); List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {

View File

@@ -59,7 +59,7 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDa
rtFeignClient.analysis(appAutoDataMessage); rtFeignClient.analysis(appAutoDataMessage);
break; break;
case 2: case 2:
log.info(appAutoDataMessage.getKey() + "分发至统计数据"); log.info("{}分发至统计数据", appAutoDataMessage.getKey());
statFeignClient.analysis(appAutoDataMessage); statFeignClient.analysis(appAutoDataMessage);
break; break;
default: default: