4 Commits

Author SHA1 Message Date
xy
0f532033b0 fix(mq): 修正消息消费成功后的Redis缓存时间 2026-05-07 13:29:08 +08:00
xy
b418855d02 refactor(realtime): 优化实时数据分析服务设备接入处理逻辑
- 移除CldDevRunFlagConsumer中的注释代码,启用设备状态翻转功能
- 添加channelData2方法支持第二种设备类型的数据处理
- 在统计服务中新增通信记录功能,记录设备上线状态
2026-04-29 13:57:22 +08:00
caozehui
9774172c0b 升级调整 2026-04-29 08:52:39 +08:00
xy
da75f74218 refactor(rt): 移除FloatUtils依赖并优化数据处理
- 优化harmRealDataSet数据赋值方式,移除不必要的浮点数精度处理
2026-04-24 09:42:26 +08:00
9 changed files with 325 additions and 197 deletions

View File

@@ -11,8 +11,10 @@ import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DataSetFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.utils.RedisUtil;
@@ -21,7 +23,6 @@ import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.rt.pojo.dto.HarmData;
import com.njcn.rt.pojo.dto.HarmRealDataSet;
import com.njcn.rt.service.IRtService;
import com.njcn.web.utils.FloatUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -51,6 +52,7 @@ public class RtServiceImpl implements IRtService {
private final ChannelObjectUtil channelObjectUtil;
private final MqttPublisher publisher;
private final RedisSetUtil redisSetUtil;
private final EquipmentFeignClient equipmentFeignClient;
@Override
public void analysis(AppAutoDataMessage appAutoDataMessage) {
@@ -74,12 +76,14 @@ public class RtServiceImpl implements IRtService {
}
//根据dataArray解析数据
AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0);
//获取设备类型
CsEquipmentDeliveryPO po1 =equipmentFeignClient.getDevByLineId(lineId).getData();
//fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整
//基础数据
if (dataSet.getName().contains("Ds$Pqd$Rt$Basic$")) {
//用户Id
String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString();
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType());
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
@@ -94,7 +98,7 @@ public class RtServiceImpl implements IRtService {
if (ObjectUtil.isNotNull(redisObject)) {
Set<String> userSet = redisSetUtil.convertToSet(redisObject);
userSet.forEach(userId->{
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType());
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType(),po1.getDevAccessMethod());
baseRealDataSet.setUserId(userId);
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
@@ -102,7 +106,7 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec();
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + userId, new Gson().toJson(baseRealDataSet), 1, false);
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
});
}
}
@@ -201,114 +205,194 @@ public class RtServiceImpl implements IRtService {
return dataMap;
}
public BaseRealDataSet assembleData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType) {
public BaseRealDataSet assembleData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType,String accessMethod) {
Map<String,Float> dataMap = getData(dataArrayList,dataArray);
return channelData(dataMap,conType);
if (Objects.equals("CLD",accessMethod)) {
return channelData(dataMap,conType);
} else {
return channelData2(dataMap);
}
}
public BaseRealDataSet channelData(Map<String,Float> map,Integer conType) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(FloatUtils.get2Float(map.get("Pq_FreqM")));
baseRealDataSet.setFreq(map.get("Pq_FreqM"));
//频率偏差
baseRealDataSet.setFreqDev(FloatUtils.get2Float(map.get("Pq_FreqDevM")));
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM"));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
if (conType == 0) {
baseRealDataSet.setVRmsA(FloatUtils.get2Float(map.get("Pq_RmsUA")));
baseRealDataSet.setVRmsB(FloatUtils.get2Float(map.get("Pq_RmsUB")));
baseRealDataSet.setVRmsC(FloatUtils.get2Float(map.get("Pq_RmsUC")));
baseRealDataSet.setVRmsA(map.get("Pq_RmsUA"));
baseRealDataSet.setVRmsB(map.get("Pq_RmsUB"));
baseRealDataSet.setVRmsC(map.get("Pq_RmsUC"));
} else {
baseRealDataSet.setVRmsA(FloatUtils.get2Float(map.get("Pq_RmsLUAB")));
baseRealDataSet.setVRmsB(FloatUtils.get2Float(map.get("Pq_RmsLUBC")));
baseRealDataSet.setVRmsC(FloatUtils.get2Float(map.get("Pq_RmsLUCA")));
baseRealDataSet.setVRmsA(map.get("Pq_RmsLUAB"));
baseRealDataSet.setVRmsB(map.get("Pq_RmsLUBC"));
baseRealDataSet.setVRmsC(map.get("Pq_RmsLUCA"));
}
//基波电压幅值
if (conType == 0) {
baseRealDataSet.setV1A(FloatUtils.get2Float(map.get("Pq_RmsFundUA")));
baseRealDataSet.setV1B(FloatUtils.get2Float(map.get("Pq_RmsFundUB")));
baseRealDataSet.setV1C(FloatUtils.get2Float(map.get("Pq_RmsFundUC")));
baseRealDataSet.setV1A(map.get("Pq_RmsFundUA"));
baseRealDataSet.setV1B(map.get("Pq_RmsFundUB"));
baseRealDataSet.setV1C(map.get("Pq_RmsFundUC"));
} else {
baseRealDataSet.setV1A(FloatUtils.get2Float(map.get("Pq_RmsFundLUAB")));
baseRealDataSet.setV1B(FloatUtils.get2Float(map.get("Pq_RmsFundLUBC")));
baseRealDataSet.setV1C(FloatUtils.get2Float(map.get("Pq_RmsFundLUCA")));
baseRealDataSet.setV1A(map.get("Pq_RmsFundLUAB"));
baseRealDataSet.setV1B(map.get("Pq_RmsFundLUBC"));
baseRealDataSet.setV1C(map.get("Pq_RmsFundLUCA"));
}
//电流有效值
baseRealDataSet.setIRmsA(FloatUtils.get2Float(map.get("Pq_RmsIA")));
baseRealDataSet.setIRmsB(FloatUtils.get2Float(map.get("Pq_RmsIB")));
baseRealDataSet.setIRmsC(FloatUtils.get2Float(map.get("Pq_RmsIC")));
baseRealDataSet.setIRmsA(map.get("Pq_RmsIA"));
baseRealDataSet.setIRmsB(map.get("Pq_RmsIB"));
baseRealDataSet.setIRmsC(map.get("Pq_RmsIC"));
//基波电流幅值
baseRealDataSet.setI1A(FloatUtils.get2Float(map.get("Pq_RmsFundIA")));
baseRealDataSet.setI1B(FloatUtils.get2Float(map.get("Pq_RmsFundIB")));
baseRealDataSet.setI1C(FloatUtils.get2Float(map.get("Pq_RmsFundIC")));
baseRealDataSet.setI1A(map.get("Pq_RmsFundIA"));
baseRealDataSet.setI1B(map.get("Pq_RmsFundIB"));
baseRealDataSet.setI1C(map.get("Pq_RmsFundIC"));
//电压偏差
if (conType == 0) {
baseRealDataSet.setVDevA(FloatUtils.get2Float(map.get("Pq_UDevA")));
baseRealDataSet.setVDevB(FloatUtils.get2Float(map.get("Pq_UDevB")));
baseRealDataSet.setVDevC(FloatUtils.get2Float(map.get("Pq_UDevC")));
baseRealDataSet.setVDevA(map.get("Pq_UDevA"));
baseRealDataSet.setVDevB(map.get("Pq_UDevB"));
baseRealDataSet.setVDevC(map.get("Pq_UDevC"));
} else {
baseRealDataSet.setVDevA(FloatUtils.get2Float(map.get("Pq_LUDevAB")));
baseRealDataSet.setVDevB(FloatUtils.get2Float(map.get("Pq_LUDevBC")));
baseRealDataSet.setVDevC(FloatUtils.get2Float(map.get("Pq_LUDevCA")));
baseRealDataSet.setVDevA(map.get("Pq_LUDevAB"));
baseRealDataSet.setVDevB(map.get("Pq_LUDevBC"));
baseRealDataSet.setVDevC(map.get("Pq_LUDevCA"));
}
//基波电压相位
if (conType == 0) {
baseRealDataSet.setV1AngA(FloatUtils.get2Float(map.get("Pq_FundUAngA")));
baseRealDataSet.setV1AngB(FloatUtils.get2Float(map.get("Pq_FundUAngB")));
baseRealDataSet.setV1AngC(FloatUtils.get2Float(map.get("Pq_FundUAngC")));
baseRealDataSet.setV1AngA(map.get("Pq_FundUAngA"));
baseRealDataSet.setV1AngB(map.get("Pq_FundUAngB"));
baseRealDataSet.setV1AngC(map.get("Pq_FundUAngC"));
} else {
baseRealDataSet.setV1AngA(FloatUtils.get2Float(map.get("Pq_FundLUAngAB")));
baseRealDataSet.setV1AngB(FloatUtils.get2Float(map.get("Pq_FundLUAngBC")));
baseRealDataSet.setV1AngC(FloatUtils.get2Float(map.get("Pq_FundLUAngCA")));
baseRealDataSet.setV1AngA(map.get("Pq_FundLUAngAB"));
baseRealDataSet.setV1AngB(map.get("Pq_FundLUAngBC"));
baseRealDataSet.setV1AngC(map.get("Pq_FundLUAngCA"));
}
//基波电流相位
baseRealDataSet.setI1AngA(FloatUtils.get2Float(map.get("Pq_FundIAngA")));
baseRealDataSet.setI1AngB(FloatUtils.get2Float(map.get("Pq_FundIAngB")));
baseRealDataSet.setI1AngC(FloatUtils.get2Float(map.get("Pq_FundIAngC")));
baseRealDataSet.setI1AngA(map.get("Pq_FundIAngA"));
baseRealDataSet.setI1AngB(map.get("Pq_FundIAngB"));
baseRealDataSet.setI1AngC(map.get("Pq_FundIAngC"));
//电压总谐波畸变率
if (conType == 0) {
baseRealDataSet.setVThdA(FloatUtils.get2Float(map.get("Pq_ThdUA")));
baseRealDataSet.setVThdB(FloatUtils.get2Float(map.get("Pq_ThdUB")));
baseRealDataSet.setVThdC(FloatUtils.get2Float(map.get("Pq_ThdUC")));
baseRealDataSet.setVThdA(map.get("Pq_ThdUA"));
baseRealDataSet.setVThdB(map.get("Pq_ThdUB"));
baseRealDataSet.setVThdC(map.get("Pq_ThdUC"));
} else {
baseRealDataSet.setVThdA(FloatUtils.get2Float(map.get("Pq_ThdLUAB")));
baseRealDataSet.setVThdB(FloatUtils.get2Float(map.get("Pq_ThdLUBC")));
baseRealDataSet.setVThdC(FloatUtils.get2Float(map.get("Pq_ThdLUCA")));
baseRealDataSet.setVThdA(map.get("Pq_ThdLUAB"));
baseRealDataSet.setVThdB(map.get("Pq_ThdLUBC"));
baseRealDataSet.setVThdC(map.get("Pq_ThdLUCA"));
}
//电流总谐波畸变率
baseRealDataSet.setIThdA(FloatUtils.get2Float(map.get("Pq_ThdIA")));
baseRealDataSet.setIThdB(FloatUtils.get2Float(map.get("Pq_ThdIB")));
baseRealDataSet.setIThdC(FloatUtils.get2Float(map.get("Pq_ThdIC")));
baseRealDataSet.setIThdA(map.get("Pq_ThdIA"));
baseRealDataSet.setIThdB(map.get("Pq_ThdIB"));
baseRealDataSet.setIThdC(map.get("Pq_ThdIC"));
//电压不平衡度
baseRealDataSet.setVUnbalance(FloatUtils.get2Float(map.get("Pq_UnbalNegUM")));
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM"));
//电流不平衡度
baseRealDataSet.setIUnbalance(FloatUtils.get2Float(map.get("Pq_UnbalNegIM")));
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM"));
//有功功率
baseRealDataSet.setPA(FloatUtils.get2Float(map.get("Pq_PA")));
baseRealDataSet.setPB(FloatUtils.get2Float(map.get("Pq_PB")));
baseRealDataSet.setPC(FloatUtils.get2Float(map.get("Pq_PC")));
baseRealDataSet.setPTot(FloatUtils.get2Float(map.get("Pq_TotPM")));
baseRealDataSet.setPA(map.get("Pq_PA"));
baseRealDataSet.setPB(map.get("Pq_PB"));
baseRealDataSet.setPC(map.get("Pq_PC"));
baseRealDataSet.setPTot(map.get("Pq_TotPM"));
//无功功率
baseRealDataSet.setQA(FloatUtils.get2Float(map.get("Pq_QA")));
baseRealDataSet.setQB(FloatUtils.get2Float(map.get("Pq_QB")));
baseRealDataSet.setQC(FloatUtils.get2Float(map.get("Pq_QC")));
baseRealDataSet.setQTot(FloatUtils.get2Float(map.get("Pq_TotQM")));
baseRealDataSet.setQA(map.get("Pq_QA"));
baseRealDataSet.setQB(map.get("Pq_QB"));
baseRealDataSet.setQC(map.get("Pq_QC"));
baseRealDataSet.setQTot(map.get("Pq_TotQM"));
//视在功率
baseRealDataSet.setSA(FloatUtils.get2Float(map.get("Pq_SA")));
baseRealDataSet.setSB(FloatUtils.get2Float(map.get("Pq_SB")));
baseRealDataSet.setSC(FloatUtils.get2Float(map.get("Pq_SC")));
baseRealDataSet.setSTot(FloatUtils.get2Float(map.get("Pq_TotSM")));
baseRealDataSet.setSA(map.get("Pq_SA"));
baseRealDataSet.setSB(map.get("Pq_SB"));
baseRealDataSet.setSC(map.get("Pq_SC"));
baseRealDataSet.setSTot(map.get("Pq_TotSM"));
//视在功率因数
baseRealDataSet.setPfA(FloatUtils.get2Float(map.get("Pq_PFA")));
baseRealDataSet.setPfB(FloatUtils.get2Float(map.get("Pq_PFB")));
baseRealDataSet.setPfC(FloatUtils.get2Float(map.get("Pq_PFC")));
baseRealDataSet.setPfTot(FloatUtils.get2Float(map.get("Pq_TotPFM")));
baseRealDataSet.setPfA(map.get("Pq_PFA"));
baseRealDataSet.setPfB(map.get("Pq_PFB"));
baseRealDataSet.setPfC(map.get("Pq_PFC"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFM"));
//位移功率因数
baseRealDataSet.setDpfA(FloatUtils.get2Float(map.get("Pq_DFA")));
baseRealDataSet.setDpfB(FloatUtils.get2Float(map.get("Pq_DFB")));
baseRealDataSet.setDpfC(FloatUtils.get2Float(map.get("Pq_DFC")));
baseRealDataSet.setDpfTot(FloatUtils.get2Float(map.get("Pq_TotDFM")));
baseRealDataSet.setDpfA(map.get("Pq_DFA"));
baseRealDataSet.setDpfB(map.get("Pq_DFB"));
baseRealDataSet.setDpfC(map.get("Pq_DFC"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFM"));
return baseRealDataSet;
}
public BaseRealDataSet channelData2(Map<String,Float> map) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(map.get("Pq_FreqM"));
//频率偏差
baseRealDataSet.setFreqDev(map.get("Pq_FreqDevM"));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
baseRealDataSet.setVRmsA(map.get("Pq_RmsUA"));
baseRealDataSet.setVRmsB(map.get("Pq_RmsUB"));
baseRealDataSet.setVRmsC(map.get("Pq_RmsUC"));
//基波电压幅值
baseRealDataSet.setV1A(map.get("Pq_RmsFundUA"));
baseRealDataSet.setV1B(map.get("Pq_RmsFundUB"));
baseRealDataSet.setV1C(map.get("Pq_RmsFundUC"));
//电流有效值
baseRealDataSet.setIRmsA(map.get("Pq_RmsIA"));
baseRealDataSet.setIRmsB(map.get("Pq_RmsIB"));
baseRealDataSet.setIRmsC(map.get("Pq_RmsIC"));
//基波电流幅值
baseRealDataSet.setI1A(map.get("Pq_RmsFundIA"));
baseRealDataSet.setI1B(map.get("Pq_RmsFundIB"));
baseRealDataSet.setI1C(map.get("Pq_RmsFundIC"));
//电压偏差
baseRealDataSet.setVDevA(map.get("Pq_UDevA"));
baseRealDataSet.setVDevB(map.get("Pq_UDevB"));
baseRealDataSet.setVDevC(map.get("Pq_UDevC"));
//基波电压相位
baseRealDataSet.setV1AngA(map.get("Pq_FundUAngA"));
baseRealDataSet.setV1AngB(map.get("Pq_FundUAngB"));
baseRealDataSet.setV1AngC(map.get("Pq_FundUAngC"));
//基波电流相位
baseRealDataSet.setI1AngA(map.get("Pq_FundIAngA"));
baseRealDataSet.setI1AngB(map.get("Pq_FundIAngB"));
baseRealDataSet.setI1AngC(map.get("Pq_FundIAngC"));
//电压总谐波畸变率
baseRealDataSet.setVThdA(map.get("Pq_ThdUA"));
baseRealDataSet.setVThdB(map.get("Pq_ThdUB"));
baseRealDataSet.setVThdC(map.get("Pq_ThdUC"));
//电流总谐波畸变率
baseRealDataSet.setIThdA(map.get("Pq_ThdIA"));
baseRealDataSet.setIThdB(map.get("Pq_ThdIB"));
baseRealDataSet.setIThdC(map.get("Pq_ThdIC"));
//电压不平衡度
baseRealDataSet.setVUnbalance(map.get("Pq_UnbalNegUM"));
//电流不平衡度
baseRealDataSet.setIUnbalance(map.get("Pq_UnbalNegIM"));
//有功功率
baseRealDataSet.setPA(map.get("Pq_PA"));
baseRealDataSet.setPB(map.get("Pq_PB"));
baseRealDataSet.setPC(map.get("Pq_PC"));
baseRealDataSet.setPTot(map.get("Pq_TotPM"));
//无功功率
baseRealDataSet.setQA(map.get("Pq_QA"));
baseRealDataSet.setQB(map.get("Pq_QB"));
baseRealDataSet.setQC(map.get("Pq_QC"));
baseRealDataSet.setQTot(map.get("Pq_TotQM"));
//视在功率
baseRealDataSet.setSA(map.get("Pq_SA"));
baseRealDataSet.setSB(map.get("Pq_SB"));
baseRealDataSet.setSC(map.get("Pq_SC"));
baseRealDataSet.setSTot(map.get("Pq_TotSM"));
//视在功率因数
baseRealDataSet.setPfA(map.get("Pq_PFA"));
baseRealDataSet.setPfB(map.get("Pq_PFB"));
baseRealDataSet.setPfC(map.get("Pq_PFC"));
baseRealDataSet.setPfTot(map.get("Pq_TotPFM"));
//位移功率因数
baseRealDataSet.setDpfA(map.get("Pq_DFA"));
baseRealDataSet.setDpfB(map.get("Pq_DFB"));
baseRealDataSet.setDpfC(map.get("Pq_DFC"));
baseRealDataSet.setDpfTot(map.get("Pq_TotDFM"));
return baseRealDataSet;
}
@@ -346,14 +430,14 @@ public class RtServiceImpl implements IRtService {
if (Objects.equals(item.getHarmName(),"Pq_RmsFundI")) {
if ("Secondary".equals(dataLevel)) {
double data = item.getData() * ct;
harmRealDataSet.setData1(FloatUtils.get2Float((float)data));
harmRealDataSet.setData1((float)data);
} else {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
harmRealDataSet.setData1(item.getData());
}
} else if (Objects.equals(item.getHarmName(),"Pq_RmsFundU")) {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
harmRealDataSet.setData1(item.getData());
} else if (Objects.equals(item.getHarmName(),"Pq_ThdU")) {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
harmRealDataSet.setData1(item.getData());
} else {
String numberStr = item.getHarmName().substring(item.getHarmName().lastIndexOf('_') + 1);
String fieldName = "data" + numberStr;
@@ -363,12 +447,12 @@ public class RtServiceImpl implements IRtService {
if (item.getHarmName().contains("Pq_HarmI_")) {
if ("Secondary".equals(dataLevel)) {
double data = item.getData() * ct;
field.set(harmRealDataSet,FloatUtils.get2Float((float)data));
field.set(harmRealDataSet,(float)data);
} else {
field.set(harmRealDataSet,FloatUtils.get2Float(item.getData()));
field.set(harmRealDataSet,item.getData());
}
} else {
field.set(harmRealDataSet,FloatUtils.get2Float(item.getData()));
field.set(harmRealDataSet,item.getData());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -378,26 +462,4 @@ public class RtServiceImpl implements IRtService {
return harmRealDataSet;
}
private Set<String> convertObjectToSetSafe(Object obj) {
if (obj == null) {
return new HashSet<>();
}
if (obj instanceof Set) {
// 类型安全的转换
Set<?> rawSet = (Set<?>) obj;
return rawSet.stream()
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.toSet());
} else if (obj instanceof Collection) {
return ((Collection<?>) obj).stream()
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.toSet());
} else {
log.warn("Redis中的对象类型不是Set或Collection: {}", obj.getClass().getName());
return new HashSet<>();
}
}
}

View File

@@ -1,6 +1,7 @@
package com.njcn.stat.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.api.CsDeviceFeignClient;
import com.njcn.access.api.CsLineLatestDataFeignClient;
@@ -9,13 +10,13 @@ import com.njcn.access.pojo.po.CsLineLatestData;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.CsCommunicateFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.param.DataArrayParam;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppAutoDataMessage;
@@ -23,9 +24,7 @@ import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.stat.service.IStatService;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.system.pojo.po.SysDicTreePO;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -38,6 +37,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -60,11 +60,7 @@ public class StatServiceImpl implements IStatService {
private final CsLineLatestDataFeignClient csLineLatestDataFeignClient;
private final CsDeviceFeignClient csDeviceFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
put("AB", "A");
put("BC", "B");
put("CA", "C");
}};
private final CsCommunicateFeignClient csCommunicateFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -166,6 +162,13 @@ public class StatServiceImpl implements IStatService {
//判断设备运行状态
if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
//记录设备上线
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(appAutoDataMessage.getId());
dto.setType(1);
dto.setDescription("通讯正常");
csCommunicateFeignClient.insertion(dto);
}
}
System.gc();

View File

@@ -0,0 +1,56 @@
package com.njcn.zlevent.pojo.dto;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Map;
/**
* @author caozehui
* @data 2026-04-27
*/
@Data
public class TimeSyncRequestDTO {
/**
* 消息请求的唯一标识
*/
@JSONField(name = "guid")
private String guid;
/**
* 设备ID
*/
@JSONField(name = "Dev_id")
private String devId;
/**
* 前置Id
*/
@JSONField(name = "FrontId")
private String frontId;
/**
* 前置进程号
*/
@JSONField(name = "Node")
private Integer node;
@JSONField(name = "Detail")
private TimeSyncRequestDTO.Detail detail;
@Data
public static class Detail {
/**
* 数据类型,代表特定功能
*/
@JSONField(name = "Type")
private Integer type;
/**
* 详情
*/
@JSONField(name = "Msg")
private Map<String, Object> msg;
}
}

View File

@@ -50,6 +50,12 @@ public class UpgradeRequestDTO {
* 详情
*/
@JSONField(name = "Msg")
private Map<String, Object> msg;
private UpgradeRequestDTO.Msg msg;
}
@Data
public static class Msg {
@JSONField(name = "Name")
private String name;
}
}

View File

@@ -8,10 +8,12 @@ import com.njcn.access.enums.TypeEnum;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.csdevice.api.CsEdDataFeignClient;
import com.njcn.csdevice.api.CsSoftInfoFeignClient;
import com.njcn.csdevice.api.CsUpgradeLogsFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO;
import com.njcn.csdevice.pojo.po.CsEdDataPO;
import com.njcn.csdevice.pojo.po.CsSoftInfoPO;
import com.njcn.csdevice.pojo.vo.CsEdDataVO;
import com.njcn.csdevice.pojo.po.CsUpgradeLogs;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -23,7 +25,6 @@ import com.njcn.zlevent.service.IDeviceService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -46,6 +47,7 @@ public class DeviceServiceImpl implements IDeviceService {
private final RedisUtil redisUtil;
private final MqttPublisher publisher;
private final FileStorageUtil fileStorageUtil;
private final CsUpgradeLogsFeignClient csUpgradeLogsFeignClient;
@Override
public void startWorkingLog(String devId) {
@@ -98,9 +100,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public boolean timeSync(String devId) {
// List<CsEquipmentDeliveryDTO> listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData();
//
// BaseMessage message = new BaseMessage();
List<CsEquipmentDeliveryDTO> listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData();
BaseMessage message = new BaseMessage();
// FileDownloadRequestDTO requestDTO = new FileDownloadRequestDTO();
// requestDTO.setGuid(listHttpResult.get(0).getNodeId());
// requestDTO.setNode(listHttpResult.get(0).getNodeProcess());
@@ -181,97 +183,75 @@ public class DeviceServiceImpl implements IDeviceService {
List<CsEquipmentDeliveryDTO> listHttpResult = equipmentFeignClient.queryDeviceById(Collections.singletonList(devId)).getData();
CsEquipmentDeliveryDTO csEquipmentDeliveryDTO = listHttpResult.get(0);
CsEdDataVO csEdDataVO = csEdDataFeignClient.findByDevTypeId(edDataId).getData();
String filePath = csEdDataVO.getFilePath();
CsEdDataPO csEdDataPO = csEdDataFeignClient.getById(edDataId).getData();
String filePath = csEdDataPO.getFilePath();
FileUploadRequestDTO fileUploadRequestDTO = new FileUploadRequestDTO();
fileUploadRequestDTO.setDevId(devId);
fileUploadRequestDTO.setGuid(IdUtil.simpleUUID());
fileUploadRequestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess());
fileUploadRequestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId());
// 装置升级日志
CsUpgradeLogs csUpgradeLogs = new CsUpgradeLogs();
csUpgradeLogs.setDevId(devId);
csUpgradeLogs.setVersionNo(csEdDataPO.getVersionNo());
csUpgradeLogs.setResult(0);
FileUploadRequestDTO.Detail detail1 = new FileUploadRequestDTO.Detail();
detail1.setType(Integer.parseInt(TypeEnum.FILE_UPLOAD.getCode()));
UpgradeRequestDTO requestDTO = new UpgradeRequestDTO();
requestDTO.setDevId(devId);
requestDTO.setGuid(IdUtil.simpleUUID());
requestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess());
requestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId());
FileUploadRequestDTO.Msg msg = new FileUploadRequestDTO.Msg();
UpgradeRequestDTO.Detail detail1 = new UpgradeRequestDTO.Detail();
detail1.setType(Integer.valueOf(TypeEnum.DEVICE_UPGRADE.getCode()));
UpgradeRequestDTO.Msg msg = new UpgradeRequestDTO.Msg();
msg.setName(filePath);
msg.setRemoteName("");
detail1.setMsg(msg);
requestDTO.setDetail(detail1);
fileUploadRequestDTO.setDetail(detail1);
BaseMessage message1 = new BaseMessage();
message1.setMessageBody(JSON.toJSONString(fileUploadRequestDTO));
BaseMessage message = new BaseMessage();
message.setMessageBody(JSON.toJSONString(requestDTO));
// 使用 Redis 存储 guid 用于后续查询
redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + fileUploadRequestDTO.getGuid(), "pending", 120L);
redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L);
// 发送
commonProducer.send(message1, fileUploadRequestDTO.getFrontId());
commonProducer.send(message, requestDTO.getFrontId());
// 轮询 Redis 等待响应
FileUploadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(fileUploadRequestDTO.getGuid(), 10), FileUploadResponeDTO.class);
FileUploadResponeDTO.Detail detail2 = responseDTO.getDetail();
UpgradeResponeDTO responeDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), UpgradeResponeDTO.class);
UpgradeResponeDTO.Detail detail2 = responeDTO.getDetail();
if (detail2.getCode() == 200) {
UpgradeRequestDTO requestDTO = new UpgradeRequestDTO();
requestDTO.setDevId(devId);
requestDTO.setGuid(IdUtil.simpleUUID());
requestDTO.setNode(csEquipmentDeliveryDTO.getNodeProcess());
requestDTO.setFrontId(csEquipmentDeliveryDTO.getNodeId());
// 修改数据库记录
String softinfoId = csEquipmentDeliveryDTO.getSoftinfoId();
UpgradeRequestDTO.Detail detail3 = new UpgradeRequestDTO.Detail();
detail3.setType(Integer.valueOf(TypeEnum.DEVICE_UPGRADE.getCode()));
detail3.setMsg(new HashMap<>());
requestDTO.setDetail(detail3);
BaseMessage message2 = new BaseMessage();
message2.setMessageBody(JSON.toJSONString(requestDTO));
// 使用 Redis 存储 guid 用于后续查询
redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L);
// 发送
commonProducer.send(message2, requestDTO.getFrontId());
// 轮询 Redis 等待响应
UpgradeResponeDTO responeDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), UpgradeResponeDTO.class);
UpgradeResponeDTO.Detail detail4 = responeDTO.getDetail();
if (detail4.getCode() == 200) {
// 修改数据库记录
String softinfoId = csEquipmentDeliveryDTO.getSoftinfoId();
CsSoftInfoPO softInfoPO = null;
if (StrUtil.isNotBlank(softinfoId)) {
csSoftInfoFeignClient.removeSoftInfo(softinfoId);
}
softInfoPO = new CsSoftInfoPO();
softInfoPO.setId(IdUtil.fastSimpleUUID());
softInfoPO.setAppCheck(csEdDataVO.getCrc());
softInfoPO.setAppDate(csEdDataVO.getVersionDate().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
softInfoPO.setAppVersion(csEdDataVO.getVersionNo());
softInfoPO.setOpAttr("r");
softInfoPO.setOsName("VxWorks");
softInfoPO.setOsVersion("VxWorks");
softInfoPO.setSoftUpdate("yes");
csSoftInfoFeignClient.saveSoftInfo(softInfoPO);
equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId());
// 重新获取升级后的版本信息
DevVersionResponeDTO.VersionInfo newVersionInfo = this.getDeviceVersion(devId);
if (newVersionInfo.getAppVersion().equals(csEdDataVO.getVersionNo()) && newVersionInfo.getCloudProtocolVer().equals(csEdDataVO.getVersionAgreement())) {
// 修改数据库记录
equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId());
return true;
}
return false;
} else {
return false;
if (StrUtil.isNotBlank(softinfoId)) {
csSoftInfoFeignClient.removeSoftInfo(softinfoId);
}
} else {
CsSoftInfoPO softInfoPO = new CsSoftInfoPO();
softInfoPO.setId(IdUtil.fastSimpleUUID());
softInfoPO.setAppCheck(csEdDataPO.getCrc());
softInfoPO.setAppDate(csEdDataPO.getVersionDate());
softInfoPO.setAppVersion(csEdDataPO.getVersionNo());
softInfoPO.setOpAttr("r");
softInfoPO.setOsName("VxWorks");
softInfoPO.setOsVersion("VxWorks");
softInfoPO.setSoftUpdate("yes");
csSoftInfoFeignClient.saveSoftInfo(softInfoPO);
equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId());
// 重新获取升级后的版本信息
DevVersionResponeDTO.VersionInfo newVersionInfo = this.getDeviceVersion(devId);
if (newVersionInfo.getAppVersion().equals(csEdDataPO.getVersionNo()) && newVersionInfo.getCloudProtocolVer().equals(csEdDataPO.getVersionAgreement())) {
// 修改数据库记录
equipmentFeignClient.updateSoftInfo(csEquipmentDeliveryDTO.getNdid(), softInfoPO.getId());
csUpgradeLogs.setResult(1);
csUpgradeLogsFeignClient.add(csUpgradeLogs);
return true;
}
csUpgradeLogsFeignClient.add(csUpgradeLogs);
return false;
}
csUpgradeLogsFeignClient.add(csUpgradeLogs);
return false;
}
@Override
@@ -284,6 +264,25 @@ public class DeviceServiceImpl implements IDeviceService {
requestDTO.setNode(listHttpResult.get(0).getNodeProcess());
requestDTO.setFrontId(listHttpResult.get(0).getNodeId());
return true;
RebootRequestDTO.Detail detail1 = new RebootRequestDTO.Detail();
detail1.setType(Integer.parseInt(TypeEnum.DEVICE_REBOOT.getCode()));
detail1.setMsg(new HashMap<>());
requestDTO.setDetail(detail1);
BaseMessage message = new BaseMessage();
message.setMessageBody(JSON.toJSONString(requestDTO));
// 使用 Redis 存储 guid 用于后续查询
redisUtil.saveByKeyWithExpire(AppRedisKey.COMMON_REQUEST + requestDTO.getGuid(), "pending", 120L);
// 发送
commonProducer.send(message, requestDTO.getFrontId());
// 轮询 Redis 等待响应
RebootResponeDTO responeDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), RebootResponeDTO.class);
if (responeDTO.getDetail().getCode() == 200) {
return true;
}
return false;
}
}

View File

@@ -86,11 +86,12 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDa
}
/**
* 消费成功缓存到redis72小时,避免重复消费
* 消费成功缓存到redis 5分钟,避免重复消费
*/
@Override
protected void consumeSuccess(AppAutoDataMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
// redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 5 * 60L);
}
/**

View File

@@ -49,7 +49,7 @@ public class CldDevRunFlagConsumer extends EnhanceConsumerMessageHandler<CldDevi
protected void handleMessage(CldDeviceRunFlagMessage cldDeviceRunFlagMessage) {
log.info("分发至翻转设备状态");
int status = Objects.equals(cldDeviceRunFlagMessage.getStatus(),"0") ? 1 : 2;
//equipmentFeignClient.flipCldDevStatus(cldDeviceRunFlagMessage.getDate(),cldDeviceRunFlagMessage.getId(), status);
equipmentFeignClient.flipCldDevStatus(cldDeviceRunFlagMessage.getDate(),cldDeviceRunFlagMessage.getId(), status);
}

View File

@@ -68,11 +68,12 @@ public class CldHeartConsumer extends EnhanceConsumerMessageHandler<CldHeartBeat
}
/**
* 消费成功缓存到redis72小时,避免重复消费
* 消费成功缓存到redis 5分钟,避免重复消费
*/
@Override
protected void consumeSuccess(CldHeartBeatMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
// redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 5 * 60L);
}
/**

View File

@@ -71,11 +71,11 @@ public class RealDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDataM
}
/**
* 消费成功缓存到redis72小时,避免重复消费
* 消费成功缓存到redis 5分钟,避免重复消费
*/
@Override
protected void consumeSuccess(AppAutoDataMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 5 * 60L);
}
/**