前置日志联调

This commit is contained in:
hzj
2025-05-16 11:18:33 +08:00
parent 976d4bbfd1
commit bb1bc6a324
4 changed files with 180 additions and 4 deletions

View File

@@ -3,7 +3,6 @@ package com.njcn.dataProcess.controller;//package com.njcn.message.websocket;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
@@ -11,7 +10,6 @@ import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.dataProcess.annotation.QueryBean;
import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.service.IDataV;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
@@ -19,7 +17,6 @@ import com.njcn.device.pq.api.DeviceFeignClient;
import com.njcn.device.pq.pojo.dto.DeviceDTO;
import com.njcn.message.api.ProduceFeignClient;
import com.njcn.message.message.RecallMessage;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@@ -38,7 +35,6 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

View File

@@ -0,0 +1,25 @@
package com.njcn.message.messagedto;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
/**
* Description:
* Date: 2024/12/13 10:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class FrontLogslMessage extends BaseMessage implements Serializable {
private String nodeId;
private String processNo;
private String businessId;
private String level;
private String logType;
private String grade;
private String frontType;
private String log;
}

View File

@@ -75,6 +75,11 @@
<artifactId>rt-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>system-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pq-device-api</artifactId>

View File

@@ -0,0 +1,150 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.message.messagedto.FrontLogslMessage;
import com.njcn.message.messagedto.TopicReplyDTO;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.FrontLogsFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.dto.PqFrontLogsDTO;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "log_Topic",
consumerGroup = "Log_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class TopicLogsConsumer extends EnhanceConsumerMessageHandler<FrontLogslMessage> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private FrontLogsFeignClient frontLogsFeignClient;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Override
public void onMessage(String message) {
FrontLogslMessage frontLogslMessage = JSONObject.parseObject(message,FrontLogslMessage.class);
super.dispatchMessage(frontLogslMessage);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(FrontLogslMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(FrontLogslMessage message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(FrontLogslMessage message) {
//业务处理
PqFrontLogsDTO pqFrontLogsDTO = new PqFrontLogsDTO();
BeanUtils.copyProperties(message,pqFrontLogsDTO);
frontLogsFeignClient.addFrontLogs(pqFrontLogsDTO);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(FrontLogslMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}