1.物消息路由转发功能

2.物解析功能
This commit is contained in:
2023-08-14 21:06:19 +08:00
parent 02f5f7c031
commit 1d75cce63d
34 changed files with 869 additions and 449 deletions

View File

@@ -11,8 +11,34 @@
<artifactId>stat-api</artifactId>
<version>1.0.0</version>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-db</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-microservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-mq</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<name>stat-api</name>
<name>stat-api</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

View File

@@ -0,0 +1,19 @@
package com.njcn.stat.api;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.stat.api.fallback.RtClientFallbackFactory;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
/**
* @author xy
*/
@FeignClient(value = ServerInfo.CS_STAT_BOOT, path = "/stat", fallbackFactory = RtClientFallbackFactory.class,contextId = "stat")
public interface RtFeignClient {
@PostMapping("/analysis")
HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage);
}

View File

@@ -0,0 +1,35 @@
package com.njcn.stat.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.stat.api.RtFeignClient;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author xy
*/
@Slf4j
@Component
public class RtClientFallbackFactory implements FallbackFactory<RtFeignClient> {
@Override
public RtFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause();
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new RtFeignClient() {
@Override
public HttpResult<String> analysis(AppAutoDataMessage appAutoDataMessage) {
log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -0,0 +1,37 @@
package com.njcn.stat.enums;
import lombok.Getter;
/**
* @author xuyang
* @version 1.0.0
* @date 2023年04月17日 10:50
*/
@Getter
public enum StatResponseEnum {
/**
* A1001 ~ A1099 用于用户模块的枚举
* <p>
*/
STAT_ERROR("A10001","统计数据模块错误"),
DATA_ARRAY_NULL("A10002","详细数据为空"),
AUTO_DATA_NULL("A10002","上送数据为空"),
DICT_NULL("A10002","字典数据为空"),
LINE_NULL("A10002","监测点为空"),
ARRAY_DATA_NOT_MATCH("A10003","上送数据与模板匹配失败"),
;
private final String code;
private final String message;
StatResponseEnum(String code, String message) {
this.code = code;
this.message = message;
}
}

View File

@@ -40,6 +40,42 @@
<artifactId>common-db</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>access-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-mq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>cs-device-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>stat-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>system-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pqs-influx</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-influxDB</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,48 @@
package com.njcn.stat.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.stat.service.IStatService;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/14 9:23
*/
@Slf4j
@RestController
@RequestMapping("/stat")
@Api(tags = "统计数据")
@AllArgsConstructor
public class StatController extends BaseController {
private final IStatService statService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/analysis")
@ApiOperation("数据解析")
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
public HttpResult<String> analysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){
String methodDescribe = getMethodDescribe("analysis");
statService.analysis(appAutoDataMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,63 @@
package com.njcn.stat.handler;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.redis.utils.RedisUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.validation.Validator;
import java.nio.charset.StandardCharsets;
/**
* @author hongawen
* @version 1.0.0
* @date 2022年03月23日 09:41
*/
@Slf4j
@Component
@AllArgsConstructor
public class MqttMessageHandler {
private final MqttPublisher publisher;
private final RedisUtil redisUtil;
@Autowired
Validator validator;
/**
* 装置心跳
* @param topic
* @param message
* @param version
* @param nDid
* @param payload
*/
@MqttSubscribe(value = "/Dev/Data/{version}/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devHeartBeat(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) {
//解析数据
Gson gson = new Gson();
ReqAndResDto.Req res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Req.class);
//响应请求
switch (res.getType()){
case 4866:
//处理主动上送数据
break;
default:
break;
}
}
}

View File

@@ -0,0 +1,16 @@
package com.njcn.stat.service;
import com.njcn.mq.message.AppAutoDataMessage;
/**
* @author xy
*/
public interface IStatService {
/**
* 解析统计数据
* @param appAutoDataMessage
*/
void analysis(AppAutoDataMessage appAutoDataMessage);
}

View File

@@ -0,0 +1,177 @@
package com.njcn.stat.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.pojo.param.DataArrayParam;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.stat.service.IStatService;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.enums.SystemResponseEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.Dic;
import com.njcn.system.pojo.po.DictData;
import com.njcn.system.pojo.po.EleEpdPqd;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.net.Inet4Address;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/14 9:32
*/
@Service
@Slf4j
@AllArgsConstructor
public class StatServiceImpl implements IStatService {
private final DataArrayFeignClient dataArrayFeignClient;
private final EpdFeignClient epdFeignClient;
private final DicDataFeignClient dicDataFeignClient;
private final InfluxDbUtils influxDbUtils;
private final CsLineFeignClient csLineFeignClient;
private final RedisUtil redisUtil;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppAutoDataMessage appAutoDataMessage) {
//1.根据设备网络识别码获取设备id查询到所用的模板用来判断模板的类型(治理模板还是电能质量模板)
//2.解析appAutoDataMessage的Did来判断当前数据是治理数据还是电能质量数据
//3-1.治理数据则获取治理的dataArray并且查询治理的监测点
//3-2.电能质量数据则还需要判断Cldid的值在dataSet里面筛选正确的数据集查询dataArray(默认Cldid=1为负载侧监测点、Cldid=2为电网侧监测点)
//4.查询dataArray数据查询对应的字典数据然后将数据组装录入influxDB
DataArrayParam dataArrayParam = new DataArrayParam();
dataArrayParam.setId(appAutoDataMessage.getId());
dataArrayParam.setDid(appAutoDataMessage.getDid());
dataArrayParam.setCldId(appAutoDataMessage.getMsg().getClDid());
List<AppAutoDataMessage.DataArray> list = appAutoDataMessage.getMsg().getDataArray();
//获取监测点id
String lineId = lineInfo(appAutoDataMessage.getId(),appAutoDataMessage.getMsg().getClDid());
//缓存指标和influxDB表关系
saveData();
if (CollectionUtil.isNotEmpty(list)){
list.forEach(item->{
switch (item.getDataAttr()) {
case 1:
log.info("处理最大值");
dataArrayParam.setStatMethod("max");
break;
case 2:
log.info("处理最小值");
dataArrayParam.setStatMethod("min");
break;
case 3:
log.info("处理avg");
dataArrayParam.setStatMethod("avg");
break;
case 4:
log.info("处理cp95");
dataArrayParam.setStatMethod("cp95");
break;
default:
break;
}
insertData(lineId,dataArrayParam,item);
});
}
}
/**
* 获取监测点相关信息
*/
public String lineInfo(String id, Integer clDid) {
Map<Integer,String> map = new HashMap<>();
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
if (CollectionUtil.isEmpty(lineList)){
throw new BusinessException(StatResponseEnum.LINE_NULL);
}
lineList.forEach(item->{
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.isNull(dictData)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(2,item.getLineId());
}
});
return map.get(clDid);
}
/**
* 缓存字典和influxDB表关系
*/
public void saveData() {
Map<String,String> map = new HashMap<>();
List<EpdDTO> list = epdFeignClient.findAll().getData();
if (CollectionUtil.isEmpty(list)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
list.forEach(item->{
map.put(item.getDictName(),item.getTableName());
});
redisUtil.saveByKeyWithExpire("ELEEPDPQD",map,180L);
}
/**
* influxDB数据入库
*/
public void insertData(String lineId,DataArrayParam dataArrayParam,AppAutoDataMessage.DataArray item) {
//获取详细数据
List<CsDataArray> dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData();
if (CollectionUtil.isEmpty(dataArrayList)){
throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL);
}
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
if (CollectionUtil.isEmpty(floats)){
throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL);
}
//校验模板和解码数据数量能否对应上
if (!Objects.equals(dataArrayList.size(),floats.size())){
throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH);
}
for (int i = 0; i < dataArrayList.size(); i++) {
String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey("ELEEPDPQD")), Map.class).get(dataArrayList.get(i).getName()).toString();
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.VALUE_TYPE,dataArrayList.get(i).getStatMethod());
Map<String,Object> fields = new HashMap<>();
fields.put(dataArrayList.get(i).getName(),floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
influxDbUtils.insert(tableName, tags, fields, item.getDataTimeSec(), TimeUnit.SECONDS);
}
}
}