无线系统实时数据功能

This commit is contained in:
xy
2024-10-11 18:18:30 +08:00
parent 68e28880d0
commit cfd395a11c
28 changed files with 868 additions and 122 deletions

View File

@@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
/**
@@ -13,6 +14,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
* @date 2021年12月09日 20:59
*/
@Slf4j
@DependsOn("proxyMapperRegister")
@MapperScan("com.njcn.**.mapper")
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")

View File

@@ -5,6 +5,8 @@ import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.rt.service.IRtService;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@@ -31,16 +33,17 @@ import org.springframework.web.bind.annotation.RestController;
@AllArgsConstructor
public class RtController extends BaseController {
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @PostMapping("/analysis")
// @ApiOperation("数据解析")
// @ApiImplicitParam(name = "csDataEffectiveAddParm", value = "新增app数据有效性表参数", required = true)
// public HttpResult<Boolean> addDataEffective(@RequestBody @Validated CsDataEffectiveAddParm csDataEffectiveAddParm){
// String methodDescribe = getMethodDescribe("addDataEffective");
//
// boolean save = csDataEffectiveService.add (csDataEffectiveAddParm);
// return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, save, methodDescribe);
// }
private final IRtService rtService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/rtAnalysis")
@ApiOperation("实时数据解析")
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
public HttpResult<String> analysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){
String methodDescribe = getMethodDescribe("analysis");
rtService.analysis(appAutoDataMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,11 @@
package com.njcn.rt.service;
import com.njcn.mq.message.AppAutoDataMessage;
/**
* @author xy
*/
public interface IRtService {
void analysis(AppAutoDataMessage appAutoDataMessage);
}

View File

@@ -0,0 +1,275 @@
package com.njcn.rt.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DataSetFeignClient;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.enums.RtResponseEnum;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.rt.pojo.dto.HarmData;
import com.njcn.rt.pojo.dto.HarmRealDataSet;
import com.njcn.rt.service.IRtService;
import com.njcn.web.utils.FloatUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.lang.reflect.Field;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author xy
*/
@Service
@RequiredArgsConstructor
public class RtServiceImpl implements IRtService {
private final CsLineFeignClient csLineFeignClient;
private final DataSetFeignClient dataSetFeignClient;
private final DataArrayFeignClient dataArrayFeignClient;
private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil;
private final MqttPublisher publisher;
@Override
public void analysis(AppAutoDataMessage appAutoDataMessage) {
List<CsDataArray> dataArrayList;
//监测点id
String lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
//获取监测点基础信息
CsLinePO po = csLineFeignClient.getById(lineId).getData();
//获取数据集 dataSet
Integer idx = appAutoDataMessage.getMsg().getDsNameIdx();
CsDataSet dataSet = dataSetFeignClient.getDataSetByIdx(po.getDataModelId(),idx).getData();
//根据数据集获取指标 dataArray
//实时数据数据集不区分最大最小类型,因此数据集取平均值用于解析
String key = "BaseRealData:" + lineId + idx;
Object object = redisUtil.getObjectByKey(key);
if (Objects.isNull(object)){
dataArrayList = saveBaseRealDataSet(key,dataSet.getId());
} else {
dataArrayList = channelObjectUtil.objectToList(object,CsDataArray.class);
}
//根据dataArray解析数据
AppAutoDataMessage.DataArray item = appAutoDataMessage.getMsg().getDataArray().get(0);
//fixme 这边先根据数据集的名称来返回对应实体,这边感觉不太合适,后期有好方案再调整
//基础数据
if (Objects.equals(dataSet.getName(),"Ds$Pqd$Rt$Basic$01")) {
BaseRealDataSet baseRealDataSet = assembleData(dataArrayList,item,po.getConType());
baseRealDataSet.setLineId(lineId);
baseRealDataSet.setPt(po.getPtRatio().floatValue());
baseRealDataSet.setCt(po.getCtRatio().floatValue());
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
}
//fixme 目前实时数据只有基础数据和谐波数据,后期拓展,这边需要再判断
else {
HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item);
harmRealDataSet.setLineId(lineId);
harmRealDataSet.setPt(po.getPtRatio().floatValue());
harmRealDataSet.setCt(po.getCtRatio().floatValue());
harmRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
harmRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(harmRealDataSet), 1, false);
}
}
/**
* 时间处理
*/
public String getTime(long timestamp) {
Instant instant = Instant.ofEpochSecond(timestamp);
LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return dateTime.format(formatter);
}
/**
* 缓存实时数据数据集
* @param key
* @param dataSetId
* @return
*/
public List<CsDataArray> saveBaseRealDataSet(String key, String dataSetId) {
List<CsDataArray> dataArrays = dataArrayFeignClient.getArrayBySet(dataSetId).getData();
List<CsDataArray> dataArrayList = dataArrays.stream().filter(item->Objects.equals(item.getStatMethod(),"avg")).collect(Collectors.toList());
redisUtil.saveByKeyWithExpire(key,dataArrayList,600L);
return dataArrayList;
}
/**
* 数据解码
* @return
*/
public Map<String,Float> getData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray) {
Map<String,Float> dataMap = new LinkedHashMap<>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(dataArray.getData()));
if (CollectionUtil.isEmpty(floats)){
throw new BusinessException(RtResponseEnum.AUTO_DATA_NULL);
}
//校验模板和解码数据数量能否对应上
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(RtResponseEnum.ARRAY_DATA_NOT_MATCH);
}
for (int i = 0; i < dataArrayList.size(); i++) {
dataMap.put(dataArrayList.get(i).getName() + dataArrayList.get(i).getPhase(),floats.get(i));
}
return dataMap;
}
public BaseRealDataSet assembleData(List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray dataArray,Integer conType) {
Map<String,Float> dataMap = getData(dataArrayList,dataArray);
return channelData(dataMap,conType);
}
public BaseRealDataSet channelData(Map<String,Float> map,Integer conType) {
BaseRealDataSet baseRealDataSet = new BaseRealDataSet();
//频率
baseRealDataSet.setFreq(FloatUtils.get2Float(map.get("Pq_FreqM")));
//频率偏差
baseRealDataSet.setFreqDev(FloatUtils.get2Float(map.get("Pq_FreqDevM")));
//判断监测点的接线方式,不同接线方式电压有效值取值不同
//星型-相电压 角形、V型-线电压
//电压有效值
if (conType == 0) {
baseRealDataSet.setVRmsA(FloatUtils.get2Float(map.get("Pq_RmsUA")));
baseRealDataSet.setVRmsB(FloatUtils.get2Float(map.get("Pq_RmsUB")));
baseRealDataSet.setVRmsC(FloatUtils.get2Float(map.get("Pq_RmsUC")));
} else {
baseRealDataSet.setVRmsA(FloatUtils.get2Float(map.get("Pq_RmsLUAB")));
baseRealDataSet.setVRmsB(FloatUtils.get2Float(map.get("Pq_RmsLUBC")));
baseRealDataSet.setVRmsC(FloatUtils.get2Float(map.get("Pq_RmsLUCA")));
}
//基波电压幅值
baseRealDataSet.setV1A(FloatUtils.get2Float(map.get("Pq_RmsFundUA")));
baseRealDataSet.setV1B(FloatUtils.get2Float(map.get("Pq_RmsFundUB")));
baseRealDataSet.setV1C(FloatUtils.get2Float(map.get("Pq_RmsFundUC")));
//电流有效值
baseRealDataSet.setIRmsA(FloatUtils.get2Float(map.get("Pq_RmsIA")));
baseRealDataSet.setIRmsB(FloatUtils.get2Float(map.get("Pq_RmsIB")));
baseRealDataSet.setIRmsC(FloatUtils.get2Float(map.get("Pq_RmsIC")));
//基波电流幅值
baseRealDataSet.setI1A(FloatUtils.get2Float(map.get("Pq_RmsFundIA")));
baseRealDataSet.setI1B(FloatUtils.get2Float(map.get("Pq_RmsFundIB")));
baseRealDataSet.setI1C(FloatUtils.get2Float(map.get("Pq_RmsFundIC")));
//电压偏差
baseRealDataSet.setVDevA(FloatUtils.get2Float(map.get("Pq_UDevA")));
baseRealDataSet.setVDevB(FloatUtils.get2Float(map.get("Pq_UDevB")));
baseRealDataSet.setVDevC(FloatUtils.get2Float(map.get("Pq_UDevC")));
//基波电压相位
baseRealDataSet.setV1AngA(FloatUtils.get2Float(map.get("Pq_FundUAngA")));
baseRealDataSet.setV1AngB(FloatUtils.get2Float(map.get("Pq_FundUAngB")));
baseRealDataSet.setV1AngC(FloatUtils.get2Float(map.get("Pq_FundUAngC")));
//基波电流相位
baseRealDataSet.setI1AngA(FloatUtils.get2Float(map.get("Pq_FundIAngA")));
baseRealDataSet.setI1AngB(FloatUtils.get2Float(map.get("Pq_FundIAngB")));
baseRealDataSet.setI1AngC(FloatUtils.get2Float(map.get("Pq_FundIAngC")));
//电压总谐波畸变率
baseRealDataSet.setVThdA(FloatUtils.get2Float(map.get("Pq_ThdUA")));
baseRealDataSet.setVThdB(FloatUtils.get2Float(map.get("Pq_ThdUB")));
baseRealDataSet.setVThdC(FloatUtils.get2Float(map.get("Pq_ThdUC")));
//电流总谐波畸变率
baseRealDataSet.setIThdA(FloatUtils.get2Float(map.get("Pq_ThdIA")));
baseRealDataSet.setIThdB(FloatUtils.get2Float(map.get("Pq_ThdIB")));
baseRealDataSet.setIThdC(FloatUtils.get2Float(map.get("Pq_ThdIC")));
//电压不平衡度
baseRealDataSet.setVUnbalance(FloatUtils.get2Float(map.get("Pq_UnbalNegUM")));
//电流不平衡度
baseRealDataSet.setIUnbalance(FloatUtils.get2Float(map.get("Pq_UnbalNegIM")));
//有功功率
baseRealDataSet.setPA(FloatUtils.get2Float(map.get("Pq_PA")));
baseRealDataSet.setPB(FloatUtils.get2Float(map.get("Pq_PB")));
baseRealDataSet.setPC(FloatUtils.get2Float(map.get("Pq_PC")));
baseRealDataSet.setPTot(FloatUtils.get2Float(map.get("Pq_TotPM")));
//无功功率
baseRealDataSet.setQA(FloatUtils.get2Float(map.get("Pq_QA")));
baseRealDataSet.setQB(FloatUtils.get2Float(map.get("Pq_QB")));
baseRealDataSet.setQC(FloatUtils.get2Float(map.get("Pq_QC")));
baseRealDataSet.setQTot(FloatUtils.get2Float(map.get("Pq_TotQM")));
//视在功率
baseRealDataSet.setSA(FloatUtils.get2Float(map.get("Pq_SA")));
baseRealDataSet.setSB(FloatUtils.get2Float(map.get("Pq_SB")));
baseRealDataSet.setSC(FloatUtils.get2Float(map.get("Pq_SC")));
baseRealDataSet.setSTot(FloatUtils.get2Float(map.get("Pq_TotSM")));
//功率因数
baseRealDataSet.setPfA(FloatUtils.get2Float(map.get("Pq_PFA")));
baseRealDataSet.setPfB(FloatUtils.get2Float(map.get("Pq_PFB")));
baseRealDataSet.setPfC(FloatUtils.get2Float(map.get("Pq_PFC")));
baseRealDataSet.setPfTot(FloatUtils.get2Float(map.get("Pq_TotPFM")));
//基波功率因数
baseRealDataSet.setDpfA(FloatUtils.get2Float(map.get("Pq_DPFA")));
baseRealDataSet.setDpfB(FloatUtils.get2Float(map.get("Pq_DPFB")));
baseRealDataSet.setDpfC(FloatUtils.get2Float(map.get("Pq_DPFC")));
baseRealDataSet.setDpfTot(FloatUtils.get2Float(map.get("Pq_TotDPFM")));
return baseRealDataSet;
}
public HarmRealDataSet harmData(List<CsDataArray> dataArrayList, AppAutoDataMessage.DataArray dataArray) {
HarmRealDataSet harmRealDataSet = new HarmRealDataSet();
List<HarmData> harmDataList = new ArrayList<>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(dataArray.getData()));
if (CollectionUtil.isEmpty(floats)){
throw new BusinessException(RtResponseEnum.AUTO_DATA_NULL);
}
//校验模板和解码数据数量能否对应上
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(RtResponseEnum.ARRAY_DATA_NOT_MATCH);
}
for (int i = 0; i < dataArrayList.size(); i++) {
HarmData harmData = new HarmData();
harmData.setHarmName(dataArrayList.get(i).getName());
harmData.setPhase(dataArrayList.get(i).getPhase());
harmData.setSort(dataArrayList.get(i).getSort());
harmData.setData(floats.get(i));
harmDataList.add(harmData);
}
//根据名称分组,然后在不同相别的数据中取最大值
List<HarmData> maxDataList = new ArrayList<>(harmDataList.stream()
.collect(Collectors.toMap(
HarmData::getHarmName,
Function.identity(),
BinaryOperator.maxBy(Comparator.comparingDouble(HarmData::getData))
))
.values());
//通过反射将数据赋值
Class<?> clazz = HarmRealDataSet.class;
maxDataList.forEach(item->{
if (Objects.equals(item.getHarmName(),"Pq_RmsFundI") || Objects.equals(item.getHarmName(),"Pq_RmsFundU")) {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
} else {
String numberStr = item.getHarmName().substring(item.getHarmName().lastIndexOf('_') + 1);
String fieldName = "data" + numberStr;
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(harmRealDataSet,FloatUtils.get2Float(item.getData()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
return harmRealDataSet;
}
}