提交配置文件

This commit is contained in:
hzj
2025-04-07 11:24:58 +08:00
parent 72e63be140
commit 92c64873e6

View File

@@ -1,173 +1,173 @@
package com.njcn.message.consumer; //package com.njcn.message.consumer;
//
import com.alibaba.fastjson.JSONObject; //import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus; //import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.MessageDataDTO; //import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant; //import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; //import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.RedisKeyEnum; //import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil; //import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient; //import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient; //import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog; //import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; //import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; //import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import javax.annotation.Resource; //import javax.annotation.Resource;
import java.util.ArrayList; //import java.util.ArrayList;
import java.util.List; //import java.util.List;
import java.util.Objects; //import java.util.Objects;
//
//
/** ///**
* 类的介绍: // * 类的介绍:
* // *
* @author xuyang // * @author xuyang
* @version 1.0.0 // * @version 1.0.0
* @createTime 2023/8/11 15:32 // * @createTime 2023/8/11 15:32
*/ // */
@Component //@Component
@RocketMQMessageListener( //@RocketMQMessageListener(
topic = "Test_Topic", // topic = "Test_Topic",
consumerGroup = "Test_consumer", // consumerGroup = "Test_consumer",
selectorExpression = "Test_Tag||Test_Keys", // selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10, // consumeThreadNumber = 10,
enableMsgTrace = true // enableMsgTrace = true
) //)
@Slf4j //@Slf4j
public class FrontDataConsumerTest extends EnhanceConsumerMessageHandler<MessageDataDTO> implements RocketMQListener<String> { //public class FrontDataConsumerTest extends EnhanceConsumerMessageHandler<MessageDataDTO> implements RocketMQListener<String> {
//
@Autowired // @Autowired
private MessAnalysisFeignClient messAnalysisFeignClient; // private MessAnalysisFeignClient messAnalysisFeignClient;
//
//
//
@Resource // @Resource
private RedisUtil redisUtil; // private RedisUtil redisUtil;
//
@Resource // @Resource
private RocketMqLogFeignClient rocketMqLogFeignClient; // private RocketMqLogFeignClient rocketMqLogFeignClient;
//
//
private List<MessageDataDTO> messageList = new ArrayList<>(1); // private List<MessageDataDTO> messageList = new ArrayList<>(1);
//
@Override // @Override
public void onMessage(String baseMessage) { // public void onMessage(String baseMessage) {
MessageDataDTO messageDataDTO = JSONObject.parseObject(baseMessage,MessageDataDTO.class); // MessageDataDTO messageDataDTO = JSONObject.parseObject(baseMessage,MessageDataDTO.class);
super.dispatchMessage(messageDataDTO); // super.dispatchMessage(messageDataDTO);
//
} // }
//
/*** // /***
* 通过redis分布式锁判断当前消息所处状态 // * 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行 // * 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行 // * 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去 // * 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去 // * 4、success 最近72小时消费成功避免重复消费打回去
*/ // */
@Override // @Override
public boolean filter(MessageDataDTO message) { // public boolean filter(MessageDataDTO message) {
String keyStatus = redisUtil.getStringByKey(message.getKey()); // String keyStatus = redisUtil.getStringByKey(message.getKey());
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { // if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L); // redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L);
return false; // return false;
} // }
return true; // return true;
} // }
/** // /**
* 消费成功缓存到redis72小时避免重复消费 // * 消费成功缓存到redis72小时避免重复消费
*/ // */
@Override // @Override
protected void consumeSuccess(MessageDataDTO message) { // protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); // redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
} // }
//
//
@Override // @Override
protected void handleMessage(MessageDataDTO message) { // protected void handleMessage(MessageDataDTO message) {
synchronized (messageList) { // synchronized (messageList) {
messageList.add(message); // messageList.add(message);
if (messageList.size() >= 1) { // if (messageList.size() >= 1) {
saveToDatabase(); // saveToDatabase();
} // }
} // }
} // }
//
//
//
//
/** // /**
* 发生异常时,进行错误信息入库保存 // * 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存 // * 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/ // */
@Override // @Override
protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) { // protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); // redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); // RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey()); // rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource()); // rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { // if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因 // //数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage(); // String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){ // if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180); // exceptionMsg = exceptionMsg.substring(0,180);
} // }
rocketmqMsgErrorLog.setRecord(exceptionMsg); // rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过 // //如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ // if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常 // //单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog); // rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
} // }
} else { // } else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); // rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常 // //重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog); // rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
} // }
} // }
//
//
/*** // /***
* 处理失败后,是否重试 // * 处理失败后,是否重试
* 一般开启 // * 一般开启
*/ // */
@Override // @Override
protected boolean isRetry() { // protected boolean isRetry() {
return false; // return false;
} // }
//
//
/*** // /***
* 消费失败是否抛出异常,抛出异常后就不再消费了 // * 消费失败是否抛出异常,抛出异常后就不再消费了
*/ // */
@Override // @Override
protected boolean throwException() { // protected boolean throwException() {
return false; // return false;
} // }
//
//
//
//50个消息做一组插入数据库 // //50个消息做一组插入数据库
public void saveToDatabase(){ // public void saveToDatabase(){
try { // try {
long start = System.currentTimeMillis(); // long start = System.currentTimeMillis();
//
messAnalysisFeignClient.analysis(messageList); // messAnalysisFeignClient.analysis(messageList);
//
long end = System.currentTimeMillis(); // long end = System.currentTimeMillis();
log.info("处理120条消息所需时间------------"+(end-start)); // log.info("处理120条消息所需时间------------"+(end-start));
}catch (Exception e){{ // }catch (Exception e){{
log.info(e.toString()); // log.info(e.toString());
} // }
}finally{ // }finally{
messageList.clear(); // messageList.clear();
} // }
//
} // }
//
//
//
//
} //}