From 1d29a03a3c14afbd5afa7aa8737dae26874d9f02 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Thu, 23 Oct 2025 09:41:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BA=91=E5=89=8D=E7=BD=AE=E6=94=B9=E9=80=A0-?= =?UTF-8?q?=E8=A1=A5=E5=8F=AC=E5=8A=9F=E8=83=BD=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/CsEquipmentDeliveryServiceImpl.java | 7 +- .../com/njcn/message/consumer/BzConsumer.java | 146 ++++++++++++++++++ .../consumer/CldDevRunFlagConsumer.java | 2 + .../message/consumer/CldHeartConsumer.java | 2 + .../consumer/UpdateLedgerConsumer.java | 13 +- 5 files changed, 164 insertions(+), 6 deletions(-) create mode 100644 iot-message/message-boot/src/main/java/com/njcn/message/consumer/BzConsumer.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index 28c68d7..e0f1741 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -1,7 +1,6 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollUtil; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -18,7 +17,10 @@ import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -151,7 +153,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl wrapper = new QueryWrapper<>(); wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated"); wrapper.eq("usage_status", 1); - wrapper.eq("run_status", 2); wrapper.isNotNull("node_id"); return baseMapper.selectObjs(wrapper) .stream() diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/BzConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/BzConsumer.java new file mode 100644 index 0000000..2057c81 --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/BzConsumer.java @@ -0,0 +1,146 @@ +package com.njcn.message.consumer; + +import com.njcn.csdevice.api.CsTerminalReplyFeignClient; +import com.njcn.csdevice.param.IcdBzReplyParam; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.BzMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍:接收前置响应台账更新相关信息 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.REPLY_TOPIC, + consumerGroup = BusinessTopic.REPLY_TOPIC, + selectorExpression = "RECALL", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class BzConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private CsTerminalReplyFeignClient csTerminalReplyFeignClient; + + @Override + public void handleMessage(BzMessage message) { + log.info("分发至补召响应处理程序"); + //收到消息修改(cs_terminal_reply) + IcdBzReplyParam param = new IcdBzReplyParam(); + param.setId(message.getGuid()); + param.setDeviceId(message.getTerminalId()); + param.setLineId(message.getMonitorId()); + param.setCode(message.getCode()); + param.setMsg(message.getResult()); + if (param.getCode() == 200) { + param.setState(1); + } else { + param.setState(2); + } + csTerminalReplyFeignClient.updateBzData(param); + } + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(BzMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(BzMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(BzMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(BzMessage message) { + super.dispatchMessage(message); + } +} diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java index 92440b3..2d1bd20 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldDevRunFlagConsumer.java @@ -12,6 +12,7 @@ import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.RocketMqLogFeignClient; import com.njcn.system.pojo.po.RocketmqMsgErrorLog; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @@ -31,6 +32,7 @@ import java.util.Objects; topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC, consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC, consumeThreadNumber = 10, + consumeMode = ConsumeMode.ORDERLY, enableMsgTrace = true ) @Slf4j diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldHeartConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldHeartConsumer.java index 1b256fc..ee6defd 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldHeartConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/CldHeartConsumer.java @@ -11,6 +11,7 @@ import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.RocketMqLogFeignClient; import com.njcn.system.pojo.po.RocketmqMsgErrorLog; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @@ -30,6 +31,7 @@ import java.util.Objects; topic = BusinessTopic.HEART_BEAT_TOPIC, consumerGroup = BusinessTopic.HEART_BEAT_TOPIC, consumeThreadNumber = 10, + consumeMode = ConsumeMode.ORDERLY, enableMsgTrace = true ) @Slf4j diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/UpdateLedgerConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/UpdateLedgerConsumer.java index 6b98eb5..2ea56d6 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/UpdateLedgerConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/UpdateLedgerConsumer.java @@ -2,6 +2,7 @@ package com.njcn.message.consumer; import cn.hutool.core.util.ObjectUtil; import com.njcn.csdevice.api.CsTerminalReplyFeignClient; +import com.njcn.csdevice.param.IcdBzReplyParam; import com.njcn.middle.rocket.constant.EnhanceMessageConstant; import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; import com.njcn.mq.constant.BusinessTopic; @@ -32,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = BusinessTopic.REPLY_TOPIC, consumerGroup = BusinessTopic.REPLY_TOPIC, - selectorExpression = "Test_Tag", + selectorExpression = "LEDGER", consumeThreadNumber = 10, enableMsgTrace = true ) @@ -53,11 +54,17 @@ public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler data = updateLedgerMessage.getData(); if (ObjectUtil.isNotEmpty(data)) { data.forEach(item->{ + IcdBzReplyParam param = new IcdBzReplyParam(); + param.setId(updateLedgerMessage.getGuid()); + param.setDeviceId(item.getDeviceId()); + param.setCode(item.getCode()); + param.setMsg(item.getResult()); if (item.getCode() == 200) { - csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId()); + param.setState(1); } else { - csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId()); + param.setState(2); } + csTerminalReplyFeignClient.updateData(param); }); } }