暂态事件同步

This commit is contained in:
xy
2026-03-26 18:52:02 +08:00
parent 56c9c69fc9
commit f227fe3c3f
4 changed files with 101 additions and 31 deletions

View File

@@ -160,6 +160,7 @@ public class StatServiceImpl implements IStatService {
csLineLatestDataFeignClient.addData(csLineLatestData);
}
}
System.gc();
}
/**

View File

@@ -41,6 +41,9 @@ public interface ZlConstant {
*/
String EVT_PARAM_TM = "Evt_Param_Tm";
/**
* 幅值
*/
String EVT_PARAM_VVADEPTH = "Evt_Param_VVaDepth";
}

View File

@@ -2,7 +2,9 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
@@ -14,6 +16,8 @@ import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csdevice.pojo.po.WlRecord;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.event.common.mapper.WlRmpEventDetailMapper;
import com.njcn.event.pojo.po.RmpEventDetailPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppEventMessage;
@@ -27,7 +31,6 @@ import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.IEventService;
@@ -49,6 +52,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 类的介绍:
@@ -72,9 +76,10 @@ public class EventServiceImpl implements IEventService {
private final ICsEventLogsService csEventLogsService;
private final SendEventUtils sendEventUtils;
private final WlRecordFeignClient wlRecordFeignClient;
private final WlRmpEventDetailMapper wlRmpEventDetailMapper;
@Override
@Transactional(rollbackFor = Exception.class)
@DSTransactional
public void analysis(AppEventMessage appEventMessage) {
List<CsEventPO> list1 = new ArrayList<>();
List<String> records = new ArrayList<String>();
@@ -96,12 +101,21 @@ public class EventServiceImpl implements IEventService {
try {
if (devModel) {
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
Object object = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0");
if (ObjectUtil.isNotNull(object)) {
lineId = object.toString();
}
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
Object object = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString());
if (ObjectUtil.isNotNull(object)) {
lineId = object.toString();
}
}
} else {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
Object object = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString());
if (ObjectUtil.isNotNull(object)) {
lineId = object.toString();
}
}
//处理事件数据
@@ -111,9 +125,9 @@ public class EventServiceImpl implements IEventService {
//判断事件是否存在,如果存在则不处理
LambdaQueryWrapper<CsEventPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CsEventPO::getDeviceId,po.getId())
.eq(CsEventPO::getTag,tag)
.eq(CsEventPO::getTag,item.getName())
.eq(CsEventPO::getStartTime,eventTime)
.eq(CsEventPO::getLineId,lineId);
.eq(ObjectUtil.isNotNull(lineId),CsEventPO::getLineId,lineId);
List<CsEventPO> eventList = csEventService.list(queryWrapper);
if (CollectionUtil.isEmpty(eventList)) {
id = IdUtil.fastSimpleUUID();
@@ -150,6 +164,9 @@ public class EventServiceImpl implements IEventService {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){
csEvent.setPersistTime(Double.parseDouble(param.getData().toString()));
}
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_VVADEPTH)) {
csEvent.setAmplitude(Double.parseDouble(param.getData().toString()));
}
fields.put(param.getName(),param.getData());
}
//只有治理型号的设备有监测位置
@@ -169,20 +186,16 @@ public class EventServiceImpl implements IEventService {
records.add(batchPoints.lineProtocol());
}
list1.add(csEvent);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
}
}
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1);
//同步数据到 r_mp_event_detail 只有暂态事件再同步
List<CsEventPO> filterList = list1.stream().filter(csEvent -> Objects.equals(csEvent.getType(), 0)).collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(filterList)) {
filterList.forEach(this::insertEvent);
}
//推送事件逻辑处理 && cs_event_user入库
for (AppEventMessage.DataArray item : dataArray) {
sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid());
@@ -193,18 +206,63 @@ public class EventServiceImpl implements IEventService {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(eventTime);
csEventLogs.setTag(tag);
csEventLogs.setStatus(0);
csEventLogs.setTime(LocalDateTime.now());
csEventLogs.setRemark(e.getMessage());
csEventLogsService.save(csEventLogs);
log.error("事件入库异常:{}",e.getMessage());
}
}
public void insertEvent(CsEventPO item) {
RmpEventDetailPO rmpEventDetailPo = new RmpEventDetailPO();
rmpEventDetailPo.setEventId(item.getId());
rmpEventDetailPo.setMeasurementPointId(item.getLineId());
rmpEventDetailPo.setStartTime(item.getStartTime());
rmpEventDetailPo.setEventType(getEventType(item.getTag()));
rmpEventDetailPo.setFeatureAmplitude(item.getAmplitude());
rmpEventDetailPo.setDuration(item.getPersistTime());
rmpEventDetailPo.setEventDescribe(getTag(item.getTag()));
rmpEventDetailPo.setDealFlag(0);
rmpEventDetailPo.setFileFlag(0);
wlRmpEventDetailMapper.insert(rmpEventDetailPo);
}
public String getEventType(String tag) {
switch (tag) {
case "Evt_Sys_DipStr":
DictData dip = dicDataFeignClient.getDicDataByCode(DicDataEnum.VOLTAGE_DIP.getCode()).getData();
tag = dip.getId();
break;
case "Evt_Sys_SwlStr":
DictData rise = dicDataFeignClient.getDicDataByCode(DicDataEnum.VOLTAGE_RISE.getCode()).getData();
tag = rise.getId();
break;
case "Evt_Sys_IntrStr":
DictData interruptions = dicDataFeignClient.getDicDataByCode(DicDataEnum.SHORT_INTERRUPTIONS.getCode()).getData();
tag = interruptions.getId();
break;
default:
tag = "Un_Know";
break;
}
return tag;
}
public String getTag(String tag) {
switch (tag) {
case "Evt_Sys_DipStr":
tag = DicDataEnum.VOLTAGE_DIP.getCode();
break;
case "Evt_Sys_SwlStr":
tag = DicDataEnum.VOLTAGE_RISE.getCode();
break;
case "Evt_Sys_IntrStr":
tag = DicDataEnum.SHORT_INTERRUPTIONS.getCode();
break;
default:
tag = "Un_Know";
break;
}
return tag;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void getPortableData(AppEventMessage appEventMessage) {

View File

@@ -294,9 +294,13 @@ public class FileServiceImpl implements IFileService {
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList) && devModel){
eventList.forEach(wavePicFeignClient::getWavePics);
eventList.forEach(item -> {
//波形文件解析成图片
wavePicFeignClient.getWavePics(item);
//同步更新r_mp_event_detail将波形路径录入
wavePicFeignClient.updateEventById(item);
});
}
//解析完删除、处理缓存
removeInfoUtils.deleteEventInfo(appFileMessage.getId(),fileName);
@@ -340,9 +344,13 @@ public class FileServiceImpl implements IFileService {
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, "");
List<String> eventList = correlateEvents(fileInfoDto, filePath, fileName);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList) && devModel) {
eventList.forEach(wavePicFeignClient::getWavePics);
if (CollectionUtil.isNotEmpty(eventList) && devModel){
eventList.forEach(item -> {
//波形文件解析成图片
wavePicFeignClient.getWavePics(item);
//同步更新r_mp_event_detail将波形路径录入
wavePicFeignClient.updateEventById(item);
});
}
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));