From cb9a75084fb53a952337a593721f463923b722d2 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Mon, 20 Apr 2026 08:46:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86Redis=E8=AE=A4=E9=A2=86?= =?UTF-8?q?=E5=90=8E=EF=BC=8C=E6=95=B0=E6=8D=AE=E5=BA=93=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E8=BF=98=E6=9C=AA=E6=9B=B4=E6=96=B0=E6=97=B6=EF=BC=8C=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E8=A1=A5=E6=8B=9B=EF=BC=8C=E5=AF=BC=E8=87=B4=E9=87=8D?= =?UTF-8?q?=E5=A4=8D=E8=A1=A5=E6=8B=9B=E5=90=8C=E4=B8=80=E4=B8=AA=E6=9D=A1?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataobject/retry/MessageRetryQueueDO.java | 40 +--- .../mysql/retry/MessageRetryQueueMapper.java | 5 + .../retry/mapping/MessageRetryQueueMapper.xml | 22 ++ .../retry/MessageRetryQueueServiceImpl.java | 215 ++++++++++-------- 4 files changed, 155 insertions(+), 127 deletions(-) diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java index 08b4e08..6d0cc98 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java @@ -7,62 +7,32 @@ import lombok.EqualsAndHashCode; import java.time.LocalDateTime; -/** - * @author caozehui - * @data 2026-02-27 - * @description 消息重试队列表对应数据对象 - */ @Data @TableName("push_message_retry_queue") @EqualsAndHashCode(callSuper = true) public class MessageRetryQueueDO extends BaseDO { - /** - * 主键ID - */ + private Long id; - /** - * 关联message_record的message_id - */ private Long messageId; - /** - * 渠道类型 - */ private String channel; - /** - * 接收者 - */ private String receiver; - /** - * 已重试次数 - */ private Integer retryCount; - /** - * 最大重试次数 - */ private Integer maxRetry; - /** - * 下次重试时间 - */ private LocalDateTime nextRetryTime; - /** - * 首次失败时间 - */ + private Integer processStatus; + + private LocalDateTime lockUntil; + private LocalDateTime firstFailTime; - /** - * 最后一次重试时间 - */ private LocalDateTime lastRetryTime; - /** - * 最后失败原因 - */ private String lastErrorMsg; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java index 905d125..db4be47 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java @@ -21,4 +21,9 @@ public interface MessageRetryQueueMapper extends BaseMapperX selectNeedRetryMessages(@Param("channel") String channel, @Param("currentTime") LocalDateTime currentTime, @Param("limit") int limit); + + int claimRetryMessage(@Param("messageId") Long messageId, + @Param("channel") String channel, + @Param("currentTime") LocalDateTime currentTime, + @Param("lockUntil") LocalDateTime lockUntil); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml index 061c71e..c21c22f 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml @@ -8,9 +8,31 @@ WHERE channel = #{channel} AND next_retry_time #{currentTime} AND deleted = 0 + AND ( + process_status = 0 + OR process_status IS NULL + OR lock_until IS NULL + OR lock_until #{currentTime} + ) ORDER BY next_retry_time ASC LIMIT #{limit} + + UPDATE push_message_retry_queue + SET process_status = 1, + lock_until = #{lockUntil} + WHERE message_id = #{messageId} + AND channel = #{channel} + AND deleted = 0 + AND next_retry_time #{currentTime} + AND ( + process_status = 0 + OR process_status IS NULL + OR lock_until IS NULL + OR lock_until #{currentTime} + ) + + diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index f481394..ff023eb 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -12,7 +12,6 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper; -import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO; import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.enums.RetrySourceEnum; @@ -33,34 +32,34 @@ import java.util.stream.Collectors; @Slf4j @Service -public class MessageRetryQueueServiceImpl extends ServiceImpl implements MessageRetryQueueService, CommandLineRunner { - @Autowired - private MessageRetryRedisDAO messageRetryRedisDAO; - @Autowired - private MessageRecordService messageRecordService; - @Autowired - public RetryStrategyConfigService retryStrategyConfigService; - @Autowired - private MessageConfirmRedisDAO messageConfirmRedisDAO; +public class MessageRetryQueueServiceImpl extends ServiceImpl + implements MessageRetryQueueService, CommandLineRunner { private static final int DEFAULT_BATCH_SIZE = 100; - private static final int DEFAULT_MAX_RETRY_COUNT = 5; + private static final int PROCESS_STATUS_WAITING = 0; + private static final int PROCESS_STATUS_PROCESSING = 1; + private static final int DEFAULT_LOCK_SECONDS = 120; private static final List CHANNELS = Arrays.asList( ChannelTypeEnum.SMS.getCode(), ChannelTypeEnum.EMAIL.getCode(), ChannelTypeEnum.APP.getCode()); + @Autowired + private MessageRetryRedisDAO messageRetryRedisDAO; + @Autowired + private MessageRecordService messageRecordService; + @Autowired + private RetryStrategyConfigService retryStrategyConfigService; + private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false); @Override @Transactional(rollbackFor = Exception.class) public void saveOrUpdateRetryMessage(MessageRecordDO message) { - MessageRetryQueueDO existing = super.baseMapper.selectOne( - new LambdaQueryWrapper() - .eq(MessageRetryQueueDO::getMessageId, message.getId()) - ); + MessageRetryQueueDO existing = baseMapper.selectOne(new LambdaQueryWrapper() + .eq(MessageRetryQueueDO::getMessageId, message.getId())); message.setLastRetryTime(message.getSendTime()); if (ObjectUtil.isNull(existing)) { @@ -68,17 +67,18 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl= split.length) { - plusSeconds = Long.parseLong(split[split.length - 1]); - } else { - plusSeconds = Long.parseLong(split[retryCount - 1]); - } + plusSeconds = retryCount >= split.length + ? Long.parseLong(split[split.length - 1]) + : Long.parseLong(split[retryCount - 1]); } return LocalDateTime.now().plusSeconds(plusSeconds); } @@ -159,7 +159,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE); if (CollUtil.isEmpty(claimedMessageIds)) { @@ -168,46 +167,48 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl messageRecordDOList = messageRecordService.listByIds(claimedMessageIds); + LocalDateTime currentTime = LocalDateTime.now(); + LocalDateTime lockUntil = currentTime.plusSeconds(DEFAULT_LOCK_SECONDS); + List dbClaimedMessageIds = claimedMessageIds.stream() + .map(Long::valueOf) + .filter(messageId -> claimRetryMessage(messageId, channel, currentTime, lockUntil)) + .collect(Collectors.toList()); + + if (CollUtil.isEmpty(dbClaimedMessageIds)) { + return; + } + + List messageRecordDOList = messageRecordService.listByIds(dbClaimedMessageIds); for (MessageRecordDO messageRecordDO : messageRecordDOList) { processSingleRetry(messageRecordDO, RetrySourceEnum.AUTO_RETRY); } } - /** - * 处理单条消息重试,根据执行结果决定是否重新入队 - * 成功:不再回 Redis,并逻辑删除数据库队列记录 - * 失败且继续重试:重新计算 nextRetryTime,重新写入 Redis 和数据库队列表 - * 失败且终止:不再回 Redis,并逻辑删除数据库队列记录 - * - * @param messageRecordDO 消息记录 - */ private void processSingleRetry(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) { - // 校验一下消息的状态是否为 retryable_failed,防止误推送已经成功的消息 if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) { messageRecordDO.setStatus(MsgStatusConstant.PENDING); messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO), retrySource); messageRecordService.updateRetryCount(messageRecordDO.getId()); + finalizeRetryLease(messageRecordDO); } else { - // 从retryQueue中删除(已经从redis中移除过了,所以不用再次移除了) - this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getId())); + deleteByMessageIds(Collections.singletonList(messageRecordDO.getId())); } } @Override @Transactional(rollbackFor = Exception.class) public void manualRetry(List messageIds) { - if (CollUtil.isNotEmpty(messageIds)) { - List messageRecordDOList = messageRecordService.listByIds(messageIds); - for (MessageRecordDO messageRecordDO : messageRecordDOList) { - processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY); - } + if (CollUtil.isEmpty(messageIds)) { + return; + } + List messageRecordDOList = messageRecordService.listByIds(messageIds); + for (MessageRecordDO messageRecordDO : messageRecordDOList) { + processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY); } } @@ -221,7 +222,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl() - .in(MessageRetryQueueDO::getMessageId, messageIds)); + return remove(new LambdaQueryWrapper().in(MessageRetryQueueDO::getMessageId, messageIds)); } - @Override - public void run(String... args) throws Exception { + public void run(String... args) { rebuildRetryQueueOnStartup(); startupRebuildCompleted.set(true); } @@ -245,18 +243,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl dbRecords = this.lambdaQuery() - .eq(MessageRetryQueueDO::getChannel, channel) - .isNotNull(MessageRetryQueueDO::getNextRetryTime) - .list(); - + List dbRecords = listEligibleQueueRecords(channel); if (CollUtil.isEmpty(dbRecords)) { log.info("渠道 [{}] 没有待重试的消息", channel); continue; @@ -264,21 +257,17 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); List messages = messageRecordService.listByIds(messageIds); - if (CollUtil.isEmpty(messages)) { log.warn("渠道 [{}] 的消息记录不存在", channel); continue; } int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages); - log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", - channel, dbRecords.size(), successCount); - + log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", channel, dbRecords.size(), successCount); } catch (Exception e) { log.error("重建渠道 [{}] 的重试队列失败", channel, e); } } - log.info("========== 重试队列重建完成 =========="); } @@ -289,35 +278,31 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl dbRecords = this.lambdaQuery() - .eq(MessageRetryQueueDO::getChannel, channel) - .isNotNull(MessageRetryQueueDO::getNextRetryTime) - .list(); - + List dbRecords = listEligibleQueueRecords(channel); if (CollUtil.isEmpty(dbRecords)) { continue; } for (MessageRetryQueueDO dbRecord : dbRecords) { boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId()); + if (existsInRedis) { + continue; + } - if (!existsInRedis) { - Long messageId = dbRecord.getMessageId(); - MessageRecordDO message = messageRecordService.getById(messageId); - - if (message != null && message.getNextRetryTime() != null) { - boolean added = messageRetryRedisDAO.addToRetryQueue(message); - if (added) { - log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId); - } else { - log.warn("补回消息到 Redis 失败:channel={}, messageId={}", channel, messageId); - } + Long messageId = dbRecord.getMessageId(); + MessageRecordDO message = messageRecordService.getById(messageId); + if (message != null && message.getNextRetryTime() != null) { + boolean added = messageRetryRedisDAO.addToRetryQueue(message); + if (added) { + log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId); } else { - log.warn("消息记录不存在或 next_retry_time 为空:channel={}, messageId={}", channel, messageId); + log.warn("补回消息到 Redis 失败:channel={}, messageId={}", channel, messageId); } + } else { + log.warn("消息记录不存在或 next_retry_time 为空:channel={}, messageId={}", channel, messageId); } } } @@ -325,31 +310,21 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl dbRecords = baseMapper.selectNeedRetryMessages(channel, LocalDateTime.now(), DEFAULT_BATCH_SIZE); - if (CollUtil.isEmpty(dbRecords)) { return; } List messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); List messages = messageRecordService.listByIds(messageIds); - if (CollUtil.isEmpty(messages)) { return; } @@ -358,4 +333,60 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl 0; + } + + /** + * 完成重试租约处理,根据消息状态执行相应的清理或重置操作 + * - 如果消息状态为可重试失败,直接返回,不进行任何处理 + * - 如果消息状态为待处理,重置队列记录的处理状态和锁时间,并将消息重新加入Redis重试队列 + * - 其他状态(成功、最终失败等),从Redis重试队列移除并删除数据库中的重试队列记录 + * + * @param messageRecordDO 消息记录对象 + */ + private void finalizeRetryLease(MessageRecordDO messageRecordDO) { + if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) { + return; + } + + // “处理过程中断,但消息仍停留在 PENDING 时,主动释放 lease 并回队列”的兜底补偿逻辑 + if (MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) { + lambdaUpdate() + .eq(MessageRetryQueueDO::getMessageId, messageRecordDO.getId()) + .set(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING) + .set(MessageRetryQueueDO::getLockUntil, null) + .update(); + if (messageRecordDO.getNextRetryTime() != null) { + messageRetryRedisDAO.addToRetryQueue(messageRecordDO); + } + return; + } + + messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getId()); + deleteByMessageIds(Collections.singletonList(messageRecordDO.getId())); + } + + /** + * 查询指定通道下符合重试条件的队列记录 + * 筛选条件: + * - 匹配指定通道 + * - 下次重试时间不为空 + * - 处理状态为等待中,或处理状态为空,或锁时间为空,或锁时间已过期 + * + * @param channel 消息通道 + * @return 符合条件的重试队列记录列表 + */ + private List listEligibleQueueRecords(String channel) { + LocalDateTime now = LocalDateTime.now(); + return lambdaQuery() + .eq(MessageRetryQueueDO::getChannel, channel) + .isNotNull(MessageRetryQueueDO::getNextRetryTime) + .and(wrapper -> wrapper.eq(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING) + .or().isNull(MessageRetryQueueDO::getProcessStatus) + .or().isNull(MessageRetryQueueDO::getLockUntil) + .or().lt(MessageRetryQueueDO::getLockUntil, now)) + .list(); + } }