新增云协议设备实时数据功能

This commit is contained in:
xy
2025-09-04 14:00:15 +08:00
parent f88baa52be
commit 1a3e443be1
15 changed files with 398 additions and 150 deletions

View File

@@ -3,6 +3,7 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
@@ -92,7 +93,6 @@ public class EventServiceImpl implements IEventService {
//获取设备类型 true:治理设备 false:其他类型的设备
boolean devModel = equipmentFeignClient.judgeDevModel(appEventMessage.getId()).getData();
try {
// lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
if (devModel) {
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
@@ -103,74 +103,81 @@ public class EventServiceImpl implements IEventService {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) {
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
id = IdUtil.fastSimpleUUID();
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setDeviceId(po.getId());
csEvent.setProcess(po.getProcess());
csEvent.setCode(item.getCode());
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
if (Objects.equals(item.getType(),"2")){
csEvent.setType(0);
} else if (Objects.equals(item.getType(),"3")){
csEvent.setType(1);
} else if (Objects.equals(item.getType(),"1")){
csEvent.setType(2);
}
csEvent.setLevel(Integer.parseInt(item.getType()));
csEvent.setClDid(appEventMessage.getMsg().getClDid());
csEvent.setLineId(lineId);
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
//判断是否有参数
List<AppEventMessage.Param> params = item.getParam();
if (CollectionUtil.isNotEmpty(params)) {
String tableName = map.get(item.getName());
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
for (AppEventMessage.Param param : params) {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){
csEvent.setPersistTime(Double.parseDouble(param.getData().toString()));
}
fields.put(param.getName(),param.getData());
//判断事件是否存在,如果存在则不处理
LambdaQueryWrapper<CsEventPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CsEventPO::getDeviceId,po.getId())
.eq(CsEventPO::getTag,tag)
.eq(CsEventPO::getStartTime,eventTime)
.eq(CsEventPO::getLineId,lineId);
List<CsEventPO> eventList = csEventService.list(queryWrapper);
if (CollectionUtil.isEmpty(eventList)) {
id = IdUtil.fastSimpleUUID();
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setDeviceId(po.getId());
csEvent.setProcess(po.getProcess());
csEvent.setCode(item.getCode());
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
if (Objects.equals(item.getType(),"2")){
csEvent.setType(0);
} else if (Objects.equals(item.getType(),"3")){
csEvent.setType(1);
} else if (Objects.equals(item.getType(),"1")){
csEvent.setType(2);
}
//只有治理型号的设备有监测位置
if (devModel) {
if (appEventMessage.getMsg().getClDid() == 1) {
fields.put("Evt_Param_Position","电网侧");
csEvent.setLocation("grid");
} else if (appEventMessage.getMsg().getClDid() == 2) {
fields.put("Evt_Param_Position","负载侧");
csEvent.setLocation("load");
csEvent.setLevel(Integer.parseInt(item.getType()));
csEvent.setClDid(appEventMessage.getMsg().getClDid());
csEvent.setLineId(lineId);
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
//判断是否有参数
List<AppEventMessage.Param> params = item.getParam();
if (CollectionUtil.isNotEmpty(params)) {
String tableName = map.get(item.getName());
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
for (AppEventMessage.Param param : params) {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){
csEvent.setPersistTime(Double.parseDouble(param.getData().toString()));
}
fields.put(param.getName(),param.getData());
}
//只有治理型号的设备有监测位置
if (devModel) {
if (appEventMessage.getMsg().getClDid() == 1) {
fields.put("Evt_Param_Position","电网侧");
csEvent.setLocation("grid");
} else if (appEventMessage.getMsg().getClDid() == 2) {
fields.put("Evt_Param_Position","负载侧");
csEvent.setLocation("load");
}
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
list1.add(csEvent);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
}
list1.add(csEvent);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
}
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){