From 1bf5436f0c3e3b109e631e3443e93fb5b054da47 Mon Sep 17 00:00:00 2001 From: hongawen <83944980@qq.com> Date: Wed, 1 Apr 2026 20:33:59 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../push/dal/redis/MessageRetryRedisDAO.java | 15 +++++++++++---- .../ChannelProviderConfigServiceImpl.java | 1 + .../message/MessageRecordServiceImpl.java | 4 ++-- .../ratelimit/RateLimitConfigServiceImpl.java | 2 +- .../retry/MessageRetryQueueServiceImpl.java | 19 ++++++++++++++++--- 5 files changed, 31 insertions(+), 10 deletions(-) diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java index ea0be3a..a8d4578 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java @@ -7,6 +7,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.time.ZoneId; +import java.util.Collections; import java.util.Set; /** @@ -38,7 +39,7 @@ public class MessageRetryRedisDAO { * * @param message 消息 */ - public void addToRetryQueue(MessageRecordDO message) { + public boolean addToRetryQueue(MessageRecordDO message) { String key = getRetryQueueKey(message.getChannel()); double score = message.getNextRetryTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); @@ -46,9 +47,12 @@ public class MessageRetryRedisDAO { redisTemplate.opsForZSet().add(key, message.getMessageId(), score); log.debug("添加消息到重试队列成功: channel={}, messageId={}, nextRetryTime={}", message.getChannel(), message.getMessageId(), message.getNextRetryTime()); + return true; } catch (Exception e) { log.error("添加消息到重试队列失败: channel={}, messageId={}", message.getChannel(), message.getMessageId(), e); - throw e; + log.warn("Redis 入队失败,当前仅保留数据库重试记录等待后续恢复: channel={}, messageId={}", + message.getChannel(), message.getMessageId()); + return false; } } @@ -58,14 +62,17 @@ public class MessageRetryRedisDAO { * @param channel 渠道类型 * @param messageId 消息ID */ - public void removeFromRetryQueue(String channel, String messageId) { + public boolean removeFromRetryQueue(String channel, String messageId) { String key = getRetryQueueKey(channel); try { redisTemplate.opsForZSet().remove(key, messageId); log.debug("从重试队列移除消息成功: channel={}, messageId={}", channel, messageId); + return true; } catch (Exception e) { log.error("从重试队列移除消息失败: channel={}, messageId={}", channel, messageId, e); + log.warn("Redis 出队失败,当前消息可能存在待清理的 Redis 脏数据: channel={}, messageId={}", channel, messageId); + return false; } } @@ -84,7 +91,7 @@ public class MessageRetryRedisDAO { return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime, 0, limit); } catch (Exception e) { log.error("获取需要重试的消息ID失败: channel={}", channel, e); - return null; + return Collections.emptySet(); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigServiceImpl.java index 41161ab..0ef88f9 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigServiceImpl.java @@ -32,6 +32,7 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl ids) { - return this.lambdaUpdate().in(MessageRecordDO::getId, ids).set(MessageRecordDO::getDeleted, false).update(); + return this.removeByIds(ids); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java index 30dfdd0..d713242 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java @@ -61,7 +61,7 @@ public class RateLimitConfigServiceImpl extends ServiceImpl= dailyQuota) { - messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED); + messageRecordDO.setStatus(MsgStatusConstant.RATE_LIMITED); } } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index 61be118..54d7690 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -3,6 +3,7 @@ package com.njcn.msgpush.module.push.service.retry; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.module.push.constant.MsgStatusConstant; @@ -63,7 +64,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl getPage(MessageRetryQueueReqVO reqVO) { - return null; + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(StrUtil.isNotBlank(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId()) + .eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRetryQueueDO::getChannel, reqVO.getChannel()) + .eq(StrUtil.isNotBlank(reqVO.getReceiver()), MessageRetryQueueDO::getReceiver, reqVO.getReceiver()) + .ge(ObjectUtil.isNotNull(reqVO.getMinRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMinRetryCount()) + .le(ObjectUtil.isNotNull(reqVO.getMaxRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMaxRetryCount()) + .orderByAsc(MessageRetryQueueDO::getNextRetryTime) + .orderByDesc(MessageRetryQueueDO::getId); + return this.baseMapper.selectPage(reqVO, queryWrapper); } @Override