feat(device): 便携式设备数据补召功能重构,补召数据拆分到13张基础数据表中

This commit is contained in:
xy
2026-05-25 19:59:47 +08:00
parent ea2962840c
commit ff7b05bbb6
5 changed files with 119 additions and 114 deletions

View File

@@ -167,6 +167,11 @@
<artifactId>message-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>event-common</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>

View File

@@ -16,6 +16,10 @@ import com.njcn.csdevice.service.AppLineTopologyDiagramService;
import com.njcn.csdevice.service.AppTopologyDiagramService;
import com.njcn.csdevice.service.CsLinePOService;
import com.njcn.csdevice.service.ICsLedgerService;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.pojo.vo.DictTreeVO;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -39,6 +43,9 @@ public class AppLineTopologyDiagramServiceImpl extends ServiceImpl<AppLineTopolo
private final CsLinePOService csLinePOService;
private final AppTopologyDiagramService appTopologyDiagramService;
private final ICsLedgerService iCsLedgerService;
private final RedisUtil redisUtil;
private final DictTreeFeignClient dictTreeFeignClient;
@Override
public AppTopologyDiagramVO query(String devId) {
CsLedger one = iCsLedgerService.lambdaQuery().eq(CsLedger::getId, devId).eq(CsLedger::getState,1).one();
@@ -88,7 +95,15 @@ public class AppLineTopologyDiagramServiceImpl extends ServiceImpl<AppLineTopolo
set(AppLineTopologyDiagramPO::getLng,temp.getLng()).set(AppLineTopologyDiagramPO::getId,linePostionParam.getId())
.set(AppLineTopologyDiagramPO::getTarget,temp.getTarget())
.update();
iCsLedgerService.lambdaUpdate().eq(CsLedger::getId,temp.getLineId()).set(CsLedger::getName,temp.getName()).update();
iCsLedgerService.lambdaUpdate().eq(CsLedger::getId,temp.getLineId()).set(CsLedger::getName,temp.getName()).update();
});
//修改指标后,删除缓存数据
String deviceId = csLinePOService.getById(linePostionParam.getPointList().get(0).getLineId()).getDeviceId();
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode("baseData").getData();
String key = deviceId + "#";
if (dictTreeVO != null) {
key = deviceId + "#" + dictTreeVO.getId();
}
redisUtil.delete(key);
}
}

View File

@@ -7,6 +7,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.advance.api.EventCauseFeignClient;
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
import com.njcn.common.pojo.enums.common.DataStateEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient;
@@ -33,6 +35,8 @@ import com.njcn.csharmonic.offline.mincfg.AnalyseComtradeCfg;
import com.njcn.csharmonic.offline.mincfg.vo.CmnModeCfg;
import com.njcn.csharmonic.offline.vo.Response;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.event.common.mapper.WlRmpEventDetailMapper;
import com.njcn.event.pojo.po.RmpEventDetailPO;
import com.njcn.influx.imapper.*;
import com.njcn.influx.pojo.po.*;
import com.njcn.influx.pojo.po.cs.EntData;
@@ -40,6 +44,8 @@ import com.njcn.influx.pojo.po.cs.PqdData;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.EleEvtFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.enums.DicDataTypeEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.web.pojo.param.BaseParam;
import lombok.RequiredArgsConstructor;
@@ -107,6 +113,8 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
private final PortableOffMainLogService portableOffMainLogService;
private final IWlRecordService wlRecordService;
private final CsLinePOService csLinePOService;
private final WlRmpEventDetailMapper wlRmpEventDetailMapper;
private final EventCauseFeignClient eventCauseFeignClient;
@Override
public Page<PortableOfflLog> queryPage(BaseParam baseParam) {
@@ -356,9 +364,9 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
List<List<PqdData>> partition = ListUtils.partition(pqdData, 1500);
for (List<PqdData> sliceList : partition) {
List<PqdData> sublistAsOriginalListType = new ArrayList<>(sliceList);
// Map<String, Object> map = pqdDataSplitService.splitPqdData(sublistAsOriginalListType);
// insertData(map);
pqdDataMapper.insertBatch(sublistAsOriginalListType);
Map<String, Object> map = pqdDataSplitService.splitPqdData(sublistAsOriginalListType);
insertData(map);
// pqdDataMapper.insertBatch(sublistAsOriginalListType);
}
//min结果集解析入库后就不需要在解析了
minFlag = false;
@@ -386,6 +394,8 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
}
List<NewTaglogbuffer> newTaglogbuffers = (List<NewTaglogbuffer>) response.getObj();
if(newTaglogbuffers != null && !newTaglogbuffers.isEmpty()){
List<DictData> list1 = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.EVENT_REASON.getCode()).getData();
List<DictData> list2 = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.EVENT_TYPE.getCode()).getData();
//否则正常标记为成功解析
portableOfflLog.setState(1);
portableOfflLog.setAllCount(newTaglogbuffers.size());
@@ -456,14 +466,32 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
}
}
csEventPO.setWavePath(wavePath);
if (Objects.equals(csEventPO.getTag(),DataParam.dipStrName)) {
EventAnalysisDTO var1 = new EventAnalysisDTO();
var1.setWlFilePath(wavePath);
EventAnalysisDTO dto = eventCauseFeignClient.analysisCauseAndType(var1).getData();
String id1 = list1.stream()
.filter(item -> Objects.equals(item.getAlgoDescribe(), dto.getCause()))
.map(DictData::getId)
.findFirst()
.orElse(null);
csEventPO.setAdvanceReason(id1);
String id2 = list2.stream()
.filter(item -> Objects.equals(item.getAlgoDescribe(), dto.getType()))
.map(DictData::getId)
.findFirst()
.orElse(null);
csEventPO.setAdvanceType(id2);
}
//默认暂态事件
csEventPO.setType(0);
String clDid = influxDbParamUtil.getClDidByLineId(uploadDataParam.getLineId());
csEventPO.setClDid(clDid == null ? null : Integer.parseInt(clDid));
csEventPO.setProcess(csEquipmentDeliveryDTO.getProcess());
csEventPOS.add(csEventPO);
EntData entData = new EntData();
entData.setUuid(csEventPO.getId());
entData.setTime(new Date().toInstant());
@@ -508,12 +536,19 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
log.info("类型log,插入infulxDb的evtData");
try {
evtDataMapper.insertOne(entData);
//添加其余数据
csEventPO.setPhase(entData.getEvtParamPhase());
csEventPO.setPersistTime(entData.getEvtParamTm());
csEventPO.setAmplitude(entData.getEvtParamVVaDepth());
csEventPOS.add(csEventPO);
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("类型log,插入mysql事件表cs_event");
eventFeignClient.saveBatchEventList(csEventPOS);
//同步数据到 r_mp_event_detail
csEventPOS.forEach(this::insertEvent);
}
}
}
@@ -534,6 +569,62 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
portableOffMainLogService.save(portableOffMainLog);
}
public void insertEvent(CsEventPO item) {
RmpEventDetailPO rmpEventDetailPo = new RmpEventDetailPO();
rmpEventDetailPo.setEventId(item.getId());
rmpEventDetailPo.setMeasurementPointId(item.getLineId());
rmpEventDetailPo.setStartTime(item.getStartTime());
rmpEventDetailPo.setEventType(getEventType(item.getTag()));
rmpEventDetailPo.setFeatureAmplitude(item.getAmplitude());
rmpEventDetailPo.setDuration(item.getPersistTime());
rmpEventDetailPo.setEventDescribe(getTag(item.getTag()));
rmpEventDetailPo.setDealFlag(0);
rmpEventDetailPo.setFileFlag(0);
rmpEventDetailPo.setPhase(item.getPhase());
rmpEventDetailPo.setAdvanceReason(item.getAdvanceReason());
rmpEventDetailPo.setAdvanceType(item.getAdvanceType());
wlRmpEventDetailMapper.insert(rmpEventDetailPo);
}
public String getTag(String tag) {
switch (tag) {
case "Evt_Sys_DipStr":
tag = DicDataEnum.VOLTAGE_DIP.getCode();
break;
case "Evt_Sys_SwlStr":
tag = DicDataEnum.VOLTAGE_RISE.getCode();
break;
case "Evt_Sys_IntrStr":
tag = DicDataEnum.SHORT_INTERRUPTIONS.getCode();
break;
default:
tag = "Un_Know";
break;
}
return tag;
}
public String getEventType(String tag) {
switch (tag) {
case "Evt_Sys_DipStr":
DictData dip = dicDataFeignClient.getDicDataByCode(DicDataEnum.VOLTAGE_DIP.getCode()).getData();
tag = dip.getId();
break;
case "Evt_Sys_SwlStr":
DictData rise = dicDataFeignClient.getDicDataByCode(DicDataEnum.VOLTAGE_RISE.getCode()).getData();
tag = rise.getId();
break;
case "Evt_Sys_IntrStr":
DictData interruptions = dicDataFeignClient.getDicDataByCode(DicDataEnum.SHORT_INTERRUPTIONS.getCode()).getData();
tag = interruptions.getId();
break;
default:
tag = "Un_Know";
break;
}
return tag;
}
@Override
public Page<PortableOffMainLog> queryMainLogPage(BaseParam baseParam) {
Page<PortableOffMainLog> returnpage = new Page<> (baseParam.getPageNum(), baseParam.getPageSize ());

View File

@@ -73,6 +73,7 @@ public class PqdDataSplitServiceImpl implements IPqdDataSplitService {
for (int i = item.getHarmStart(); i <= item.getHarmEnd(); i++) {
EleEpdPqd newItem = copyEleEpdPqd(item);
newItem.setName(item.getName() + "_" + i);
newItem.setOtherName(item.getOtherName() + "_" + i);
expandedList.add(newItem);
}
return expandedList.stream();

View File

@@ -10,8 +10,6 @@ import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.CsLineTopologyFeignClient;
import com.njcn.csdevice.api.DevCapacityFeignClient;
import com.njcn.csdevice.api.fallback.CsLineTopologyClientFallbackFactory;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO;
import com.njcn.csharmonic.param.CommonStatisticalQueryParam;
import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam;
@@ -122,111 +120,6 @@ public class MqttMessageHandler {
publisher.send("/zl/TemperData/" + pageId, gson.toJson(recallReplyDTO), 1, false);
}
/**
* 实时数据应答
*/
// @MqttSubscribe(value = "/zl/askDevData/{devId}",qos = 1)
// public void responseTopoData(String topic, @NamedValue("devId") String devId, MqttMessage message, @Payload String payload) {
// String topoDataJson =redisUtil.getStringByKey (devId);
// if(StringUtils.isEmpty(topoDataJson)){
// List<ThdDataVO> result = new ArrayList<>();
// List<ThdDataVO> tempList = new ArrayList<>();
//
// //1.查询拓扑图配置的指标:拓扑图扑图配置7677f94c749dedaff30f911949cbd724
// List<EleEpdPqd> data = csStatisticalSetFeignClient.queryStatisticalSelect("b934664f9592d1c5e92caa90695b7103").getData();
// data.forEach(temp->{
// if(Objects.nonNull(temp.getHarmStart())&&Objects.nonNull(temp.getHarmEnd())){
// FrequencyStatisticalQueryParam frequencyStatisticalQueryParam = new FrequencyStatisticalQueryParam();
// frequencyStatisticalQueryParam.setDevId(devId);
// frequencyStatisticalQueryParam.setStatisticalId(temp.getId());
// frequencyStatisticalQueryParam.setValueType("avg");
// frequencyStatisticalQueryParam.setFrequencyStart(temp.getHarmStart());
// frequencyStatisticalQueryParam.setFrequencyEnd(temp.getHarmEnd());
// List<ThdDataVO> thdDataVOList = stableDataService.QuerySqlData(frequencyStatisticalQueryParam);
// tempList.addAll(thdDataVOList);
//
// }else {
// CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam();
// commonStatisticalQueryParam.setDevId(devId);
// commonStatisticalQueryParam.setStatisticalId(temp.getId());
// commonStatisticalQueryParam.setValueType("avg");
// List<ThdDataVO> listFuture= stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam);
// tempList.addAll(listFuture);
// }
//
// });
// //过滤M相
// List<ThdDataVO> m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
// m.stream().forEach(temp->{
// Stream.of("A","B","C").forEach(phase->{
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(temp,thdDataVO);
// thdDataVO.setPhase(phase);
// result.add(thdDataVO);
// });
// });
// List<ThdDataVO> apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList());
// Map<String, List<ThdDataVO>> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
// collect3.forEach((k,v)->{
// if(!CollectionUtil.isEmpty(v)){
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(v.get(0),thdDataVO);
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
// thdDataVO.setPhase("avg");
// result.add(thdDataVO);
// }
// });
// List<ThdDataVO> apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList());
// Map<String, List<ThdDataVO>> collect2 = apfRmsI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
// collect2.forEach((k,v)->{
// if(!CollectionUtil.isEmpty(v)){
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(v.get(0),thdDataVO);
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
// thdDataVO.setPhase("avg");
// result.add(thdDataVO);
// }
// });
// List<ThdDataVO> apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList());
// Map<String, List<ThdDataVO>> collect4 = apfThdISys.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId));
// collect4.forEach((k,v)->{
// if(!CollectionUtil.isEmpty(v)){
// double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble();
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(v.get(0),thdDataVO);
// thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble)));
// thdDataVO.setPhase("avg");
// result.add(thdDataVO);
//
// }
// });
// Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData();
// apfRmsI.forEach(temp->{
// ThdDataVO thdDataVO = new ThdDataVO();
// BeanUtils.copyProperties(temp,thdDataVO);
// thdDataVO.setUnit("%");
// thdDataVO.setStatisticalName("load_Rate");
// thdDataVO.setAnotherName("负载率");
// if (capacity<=0){
// thdDataVO.setStatisticalData(3.1415926);
// }else {
// double v = temp.getStatisticalData()*100 / capacity;
// thdDataVO.setStatisticalData(Double.valueOf(df.format(v)));
// }
// result.add(thdDataVO);
// });
// List<ThdDataVO> notM = tempList.stream().filter(temp -> !Objects.equals(temp.getPhase(), "M")).collect(Collectors.toList());
// result.addAll(notM);
// Gson gson = new Gson();
// topoDataJson = gson.toJson(result);
// redisUtil.saveByKeyWithExpire(devId, (Object) topoDataJson, 30L);
// }
// publisher.send("/zl/devData/"+devId,topoDataJson,1,false);
// }
/**
* 实时数据应答
*/