云前置改造-补召功能调整
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package com.njcn.access.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
@@ -18,7 +17,10 @@ import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -151,7 +153,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
|
||||
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
|
||||
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
|
||||
wrapper.eq("usage_status", 1);
|
||||
wrapper.eq("run_status", 2);
|
||||
wrapper.isNotNull("node_id");
|
||||
return baseMapper.selectObjs(wrapper)
|
||||
.stream()
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
package com.njcn.message.consumer;
|
||||
|
||||
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
|
||||
import com.njcn.csdevice.param.IcdBzReplyParam;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
import com.njcn.mq.constant.BusinessTopic;
|
||||
import com.njcn.mq.constant.MessageStatus;
|
||||
import com.njcn.mq.message.BzMessage;
|
||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 类的介绍:接收前置响应台账更新相关信息
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/8/11 15:32
|
||||
*/
|
||||
@Service
|
||||
@RocketMQMessageListener(
|
||||
topic = BusinessTopic.REPLY_TOPIC,
|
||||
consumerGroup = BusinessTopic.REPLY_TOPIC,
|
||||
selectorExpression = "RECALL",
|
||||
consumeThreadNumber = 10,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@Slf4j
|
||||
public class BzConsumer extends EnhanceConsumerMessageHandler<BzMessage> implements RocketMQListener<BzMessage> {
|
||||
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
@Resource
|
||||
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||
@Resource
|
||||
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
|
||||
|
||||
@Override
|
||||
public void handleMessage(BzMessage message) {
|
||||
log.info("分发至补召响应处理程序");
|
||||
//收到消息修改(cs_terminal_reply)
|
||||
IcdBzReplyParam param = new IcdBzReplyParam();
|
||||
param.setId(message.getGuid());
|
||||
param.setDeviceId(message.getTerminalId());
|
||||
param.setLineId(message.getMonitorId());
|
||||
param.setCode(message.getCode());
|
||||
param.setMsg(message.getResult());
|
||||
if (param.getCode() == 200) {
|
||||
param.setState(1);
|
||||
} else {
|
||||
param.setState(2);
|
||||
}
|
||||
csTerminalReplyFeignClient.updateBzData(param);
|
||||
}
|
||||
|
||||
/***
|
||||
* 通过redis分布式锁判断当前消息所处状态
|
||||
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||
* 2、fail 上次消息消费时发生异常,放行
|
||||
* 3、being processed 正在处理,打回去
|
||||
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||
*/
|
||||
@Override
|
||||
public boolean filter(BzMessage message) {
|
||||
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费成功,缓存到redis72小时,避免重复消费
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(BzMessage message) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常时,进行错误信息入库保存
|
||||
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(BzMessage message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||
String exceptionMsg = exception.getMessage();
|
||||
if(exceptionMsg.length() > 200){
|
||||
exceptionMsg = exceptionMsg.substring(0,180);
|
||||
}
|
||||
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||
//如果是当前消息重试的则略过
|
||||
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||
//单次消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
} else {
|
||||
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||
//重试N次后,依然消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 处理失败后,是否重试
|
||||
* 一般开启
|
||||
*/
|
||||
@Override
|
||||
protected boolean isRetry() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||
*/
|
||||
@Override
|
||||
protected boolean throwException() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 调用父类handler处理消息的元信息
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(BzMessage message) {
|
||||
super.dispatchMessage(message);
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -31,6 +32,7 @@ import java.util.Objects;
|
||||
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
||||
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
||||
consumeThreadNumber = 10,
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -30,6 +31,7 @@ import java.util.Objects;
|
||||
topic = BusinessTopic.HEART_BEAT_TOPIC,
|
||||
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
|
||||
consumeThreadNumber = 10,
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.njcn.message.consumer;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
|
||||
import com.njcn.csdevice.param.IcdBzReplyParam;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
import com.njcn.mq.constant.BusinessTopic;
|
||||
@@ -32,7 +33,7 @@ import java.util.Objects;
|
||||
@RocketMQMessageListener(
|
||||
topic = BusinessTopic.REPLY_TOPIC,
|
||||
consumerGroup = BusinessTopic.REPLY_TOPIC,
|
||||
selectorExpression = "Test_Tag",
|
||||
selectorExpression = "LEDGER",
|
||||
consumeThreadNumber = 10,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@@ -53,11 +54,17 @@ public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler<UpdateLe
|
||||
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
|
||||
if (ObjectUtil.isNotEmpty(data)) {
|
||||
data.forEach(item->{
|
||||
IcdBzReplyParam param = new IcdBzReplyParam();
|
||||
param.setId(updateLedgerMessage.getGuid());
|
||||
param.setDeviceId(item.getDeviceId());
|
||||
param.setCode(item.getCode());
|
||||
param.setMsg(item.getResult());
|
||||
if (item.getCode() == 200) {
|
||||
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId());
|
||||
param.setState(1);
|
||||
} else {
|
||||
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId());
|
||||
param.setState(2);
|
||||
}
|
||||
csTerminalReplyFeignClient.updateData(param);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user