新增云协议相关内容

This commit is contained in:
xy
2025-10-11 09:20:41 +08:00
parent 37ffaa3ad4
commit 0d4db672e1
12 changed files with 237 additions and 29 deletions

View File

@@ -47,6 +47,8 @@ public class RealDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDataM
@Override
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
log.info("分发至实时数据");
String lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
redisUtil.saveByKeyWithExpire("devResponse:" + lineId,200,5L);
rtFeignClient.analysis(appAutoDataMessage);
}

View File

@@ -0,0 +1,147 @@
package com.njcn.message.consumer;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.UpdateLedgerMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:接收前置响应台账更新相关信息
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.REPLY_TOPIC,
consumerGroup = BusinessTopic.REPLY_TOPIC,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler<UpdateLedgerMessage> implements RocketMQListener<UpdateLedgerMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void handleMessage(UpdateLedgerMessage updateLedgerMessage) {
log.info("分发至更新台账响应处理程序");
//收到消息修改(cs_terminal_reply)
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
if (ObjectUtil.isNotEmpty(data)) {
data.forEach(item->{
if (item.getCode() == 200) {
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId());
} else {
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId());
}
});
}
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(UpdateLedgerMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(UpdateLedgerMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(UpdateLedgerMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(UpdateLedgerMessage message) {
super.dispatchMessage(message);
}
}