1.物解析功能-统计数据解析完成

2.物消息路由转发功能完成
This commit is contained in:
2023-08-17 20:25:19 +08:00
parent 4740f2b4d0
commit f8b240cf5f
6 changed files with 112 additions and 23 deletions

View File

@@ -0,0 +1,67 @@
package com.njcn.stat.listener;
import cn.hutool.core.collection.CollectionUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.pojo.dto.EpdDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author hongawen
* @version 1.0.0
* @date 2022年04月02日 14:31
*/
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Resource
private EpdFeignClient epdFeignClient;
@Resource
private RedisUtil redisUtil;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 针对redis数据失效事件进行数据处理
* 注意message.toString()可以获取失效的key
*/
@Override
@Order(0)
public void onMessage(Message message, byte[] pattern) {
if (StringUtils.isBlank(message.toString())) {
return;
}
//判断失效的key
String expiredKey = message.toString();
if(expiredKey.equals(AppRedisKey.ELE_EPD_PQD)){
Map<String,String> map = new HashMap<>();
List<EpdDTO> list = epdFeignClient.findAll().getData();
if (CollectionUtil.isEmpty(list)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
list.forEach(item->{
map.put(item.getDictName(),item.getTableName());
});
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
}
}
}

View File

@@ -1,9 +1,7 @@
package com.njcn.stat.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.pojo.dto.CsModelDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
@@ -25,6 +23,9 @@ import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -55,9 +56,12 @@ public class StatServiceImpl implements IStatService {
private final RedisUtil redisUtil;
private final Integer NUMBER = 200;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppAutoDataMessage appAutoDataMessage) {
log.info("开始消费{},发送时间{}",appAutoDataMessage.getKey(),appAutoDataMessage.getSendTime());
//1.根据设备网络识别码获取设备id查询到所用的模板用来判断模板的类型(治理模板还是电能质量模板)
//2.解析appAutoDataMessage的Did来判断当前数据是治理数据还是电能质量数据
//3-1.治理数据则获取治理的dataArray并且查询治理的监测点
@@ -85,6 +89,7 @@ public class StatServiceImpl implements IStatService {
saveData();
}
if (CollectionUtil.isNotEmpty(list)){
List<String> recordList = new ArrayList<>();
for (AppAutoDataMessage.DataArray item : list) {
switch (item.getDataAttr()) {
case 1:
@@ -113,7 +118,12 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
insertData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod());
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod());
recordList.addAll(result);
}
if (CollectionUtil.isNotEmpty(recordList)){
//influx数据批量入库
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, recordList);
}
}
}
@@ -155,7 +165,7 @@ public class StatServiceImpl implements IStatService {
list.forEach(item->{
map.put(item.getDictName(),item.getTableName());
});
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,600L);
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
}
/**
@@ -173,9 +183,10 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据入库
* influxDB数据组装
*/
public void insertData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) {
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) {
List<String> records = new ArrayList<String>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
if (CollectionUtil.isEmpty(floats)){
@@ -185,8 +196,13 @@ public class StatServiceImpl implements IStatService {
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
}
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
saveData();
}
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
for (int i = 0; i < dataArrayList.size(); i++) {
String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class).get(dataArrayList.get(i).getName()).toString();
String tableName = map.get(dataArrayList.get(i).getName());
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
@@ -195,8 +211,12 @@ public class StatServiceImpl implements IStatService {
Map<String,Object> fields = new HashMap<>();
fields.put(dataArrayList.get(i).getName(),floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
influxDbUtils.insert(tableName, tags, fields, item.getDataTimeSec(), TimeUnit.SECONDS);
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec(), TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
return records;
}
public List<CsDataArray> objectToList(Object object) {