通讯表入库开发

This commit is contained in:
hzj
2025-03-11 18:18:11 +08:00
parent 7b6fd159c0
commit 49daa2cc47
14 changed files with 294 additions and 2 deletions

View File

@@ -1,10 +1,14 @@
package com.njcn.stat.api;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.stat.api.fallback.MessAnalysisFeignClientFallbackFactory;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -26,4 +30,9 @@ import java.util.List;
public interface MessAnalysisFeignClient {
@PostMapping("/analysis")
HttpResult<String> analysis(@RequestBody List<MessageDataDTO> messageList);
@ApiOperation("处理设备状态数据")
@PostMapping("/handleDevRunflag")
HttpResult<String> handleDevRunflag(@RequestBody DevComFlagDTO devComFlagDTO);
}

View File

@@ -4,6 +4,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.stat.utils.StatEnumUtil;
@@ -40,6 +41,12 @@ public class MessAnalysisFeignClientFallbackFactory implements FallbackFactory<M
log.error("{}异常,降级处理,异常为:{}", "消息数据解析", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<String> handleDevRunflag(DevComFlagDTO devComFlagDTO) {
log.error("{}异常,降级处理,异常为:{}", "处理设备状态数据", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.stat.service.MessageAnalysisService;
import com.njcn.web.controller.BaseController;
@@ -44,6 +45,16 @@ public class MessageAnalysisController extends BaseController {
messageAnalysisService.analysis(messageList);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, CommonResponseEnum.SUCCESS.getMessage(), methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/handleDevRunflag")
@ApiOperation("处理设备状态数据")
public HttpResult<String> handleDevRunflag(@RequestBody DevComFlagDTO devComFlagDTO){
String methodDescribe = getMethodDescribe("handleDevRunflag");
messageAnalysisService.handleDevRunflag(devComFlagDTO);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, CommonResponseEnum.SUCCESS.getMessage(), methodDescribe);
}
}

View File

@@ -1,6 +1,7 @@
package com.njcn.stat.service;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import java.util.List;
@@ -14,4 +15,6 @@ import java.util.List;
*/
public interface MessageAnalysisService {
void analysis(List<MessageDataDTO> messageList);
void handleDevRunflag(DevComFlagDTO devComFlagDTO);
}

View File

@@ -1,11 +1,16 @@
package com.njcn.stat.service.impl;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.alibaba.fastjson.JSONObject;
import com.njcn.dataProcess.api.LnDataDealFeignClient;
import com.njcn.dataProcess.api.PqsCommunicateFeignClient;
import com.njcn.dataProcess.api.RmpEventDetailFeignClient;
import com.njcn.dataProcess.dto.*;
import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto;
import com.njcn.message.enums.DataTypeEnum;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.stat.messagedto.*;
import com.njcn.stat.service.MessageAnalysisService;
@@ -33,7 +38,8 @@ public class MessageAnalysisServiceImpl implements MessageAnalysisService {
@Autowired
private LnDataDealFeignClient lnDataDealFeignClient;
@Autowired
private PqsCommunicateFeignClient pqsCommunicateFeignClient;
// @Autowired
// private RmpEventDetailFeignClient rmpEventDetailFeignClient;
@Override
@@ -509,4 +515,14 @@ public class MessageAnalysisServiceImpl implements MessageAnalysisService {
lnDataDealFeignClient.batchInsertion(lnDataDTO);
}
@Override
public void handleDevRunflag(DevComFlagDTO devComFlagDTO) {
PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto();
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(devComFlagDTO.getDate(), DatePattern.NORM_DATETIME_PATTERN));
pqsCommunicateDto.setDevId(devComFlagDTO.getId());
pqsCommunicateDto.setType(devComFlagDTO.getStatus());
pqsCommunicateFeignClient.insertion(pqsCommunicateDto);
}
}

View File

@@ -5,6 +5,7 @@ import com.njcn.common.pojo.response.HttpResult;
import com.njcn.dataProcess.api.fallback.DataInharmVFeignClientFallbackFactory;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -44,4 +45,8 @@ public interface PqsCommunicateFeignClient {
@PostMapping("/getRawDataEnd")
HttpResult<List<PqsCommunicateDto>> getRawDataEnd(@RequestBody LineCountEvaluateParam lineParam);
@PostMapping("/insertion")
@ApiOperation("插入数据")
HttpResult<String> insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto);
}

View File

@@ -57,6 +57,12 @@ public class PqsCommunicateFeignClientFallbackFactory implements FallbackFactory
log.error("{}异常,降级处理,异常为:{}","获取是否有当天最后一条数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<String> insertion(PqsCommunicateDto pqsCommunicateDto) {
log.error("{}异常,降级处理,异常为:{}","通讯表入库",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -62,6 +62,17 @@
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pq-device-api</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>pqs-influx</artifactId>
<groupId>com.njcn</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View File

@@ -71,4 +71,13 @@ public class PqsCommunicateController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, rawData, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/insertion")
@ApiOperation("插入数据")
public HttpResult<String> insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto) {
String methodDescribe = getMethodDescribe("insertion");
pqsCommunicateInsert.insertion(pqsCommunicateDto);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -34,4 +34,5 @@ public interface IPqsCommunicate {
*/
List<PqsCommunicateDto> getRawDataEnd(LineCountEvaluateParam lineParam);
void insertion(PqsCommunicateDto pqsCommunicateDto);
}

View File

@@ -2,21 +2,30 @@ package com.njcn.dataProcess.service.impl.influxdb;
import com.njcn.dataProcess.dao.imapper.PqsCommunicateMapper;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.po.influx.DataHarmrateV;
import com.njcn.dataProcess.po.influx.DataV;
import com.njcn.dataProcess.po.influx.PqsCommunicate;
import com.njcn.dataProcess.pojo.dto.DataHarmDto;
import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto;
import com.njcn.dataProcess.service.IPqsCommunicate;
import com.njcn.device.pq.api.DeviceFeignClient;
import com.njcn.device.pq.pojo.dto.DevComFlagDTO;
import com.njcn.influx.query.InfluxQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
@@ -31,7 +40,8 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
@Resource
private PqsCommunicateMapper pqsCommunicateMapper;
private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
@Autowired
private DeviceFeignClient deviceFeignClient;
@Override
public List<PqsCommunicateDto> getRawDataLatest(LineCountEvaluateParam lineParam) {
List<PqsCommunicateDto> result = new ArrayList<>();
@@ -96,6 +106,36 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
return result;
}
@Override
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
//获取最新一条数据
PqsCommunicate dto = new PqsCommunicate();
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class);
influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1);
List<PqsCommunicate> pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper);
PqsCommunicate pqsCommunicate = new PqsCommunicate();
pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant());
pqsCommunicate.setDevId(pqsCommunicateDto.getDevId());
pqsCommunicate.setType(pqsCommunicateDto.getType());
//如果不存数据或者状态不一样则插入数据
if(CollectionUtils.isEmpty(pqsCommunicates)|| !Objects.equals( pqsCommunicates.get(0).getType(),pqsCommunicateDto.getType())){
pqsCommunicateMapper.insertOne(pqsCommunicate);
//更新mysql数据
DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
devComFlagDTO.setId(pqsCommunicateDto.getDevId());
devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER));
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
deviceFeignClient.updateDevComFlag(devComFlagDTO);
}
}
/**
* 按监测点集合、时间条件获取dataV分钟数据
* timeMap参数来判断是否进行数据出来 timeMap为空则不进行数据处理

View File

@@ -33,4 +33,9 @@ public class RelationPqsCommunicateImpl implements IPqsCommunicate {
public List<PqsCommunicateDto> getRawDataEnd(LineCountEvaluateParam lineParam) {
return Collections.emptyList();
}
@Override
public void insertion(PqsCommunicateDto pqsCommunicateDto) {
}
}

View File

@@ -0,0 +1,26 @@
package com.njcn.message.messagedto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Description:
* Date: 2024/11/5 15:07【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class DevComFlagDTO extends BaseMessage implements Serializable {
private String id;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime date;
private Integer status;
}

View File

@@ -0,0 +1,143 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.DevComFlagDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.message.websocket.WebSocketServer;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "Device_Run_Flag_Topic",
consumerGroup = "Device_Run_Flag_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<DevComFlagDTO> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Autowired
private MessAnalysisFeignClient messAnalysisFeignClient;
@Override
public void onMessage(String message) {
DevComFlagDTO devComFlagDTO = JSONObject.parseObject(message,DevComFlagDTO.class);
super.dispatchMessage(devComFlagDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(DevComFlagDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(DevComFlagDTO message) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(DevComFlagDTO message) {
messAnalysisFeignClient.handleDevRunflag(message);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(DevComFlagDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}