处理Redis认领后,数据库状态还未更新时,触发补招,导致重复补招同一个条记录问题

This commit is contained in:
caozehui
2026-04-20 08:46:27 +08:00
parent 47c00e2bd4
commit cb9a75084f
4 changed files with 155 additions and 127 deletions

View File

@@ -7,62 +7,32 @@ import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author caozehui
* @data 2026-02-27
* @description 消息重试队列表对应数据对象
*/
@Data
@TableName("push_message_retry_queue")
@EqualsAndHashCode(callSuper = true)
public class MessageRetryQueueDO extends BaseDO {
/**
* 主键ID
*/
private Long id;
/**
* 关联message_record的message_id
*/
private Long messageId;
/**
* 渠道类型
*/
private String channel;
/**
* 接收者
*/
private String receiver;
/**
* 已重试次数
*/
private Integer retryCount;
/**
* 最大重试次数
*/
private Integer maxRetry;
/**
* 下次重试时间
*/
private LocalDateTime nextRetryTime;
/**
* 首次失败时间
*/
private Integer processStatus;
private LocalDateTime lockUntil;
private LocalDateTime firstFailTime;
/**
* 最后一次重试时间
*/
private LocalDateTime lastRetryTime;
/**
* 最后失败原因
*/
private String lastErrorMsg;
}

View File

@@ -21,4 +21,9 @@ public interface MessageRetryQueueMapper extends BaseMapperX<MessageRetryQueueDO
List<MessageRetryQueueDO> selectNeedRetryMessages(@Param("channel") String channel,
@Param("currentTime") LocalDateTime currentTime,
@Param("limit") int limit);
int claimRetryMessage(@Param("messageId") Long messageId,
@Param("channel") String channel,
@Param("currentTime") LocalDateTime currentTime,
@Param("lockUntil") LocalDateTime lockUntil);
}

View File

@@ -8,9 +8,31 @@
WHERE channel = #{channel}
AND next_retry_time <![CDATA[ <= ]]> #{currentTime}
AND deleted = 0
AND (
process_status = 0
OR process_status IS NULL
OR lock_until IS NULL
OR lock_until <![CDATA[ < ]]> #{currentTime}
)
ORDER BY next_retry_time ASC
LIMIT #{limit}
</select>
<update id="claimRetryMessage">
UPDATE push_message_retry_queue
SET process_status = 1,
lock_until = #{lockUntil}
WHERE message_id = #{messageId}
AND channel = #{channel}
AND deleted = 0
AND next_retry_time <![CDATA[ <= ]]> #{currentTime}
AND (
process_status = 0
OR process_status IS NULL
OR lock_until IS NULL
OR lock_until <![CDATA[ < ]]> #{currentTime}
)
</update>
</mapper>

View File

@@ -12,7 +12,6 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
@@ -33,34 +32,34 @@ import java.util.stream.Collectors;
@Slf4j
@Service
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO> implements MessageRetryQueueService, CommandLineRunner {
@Autowired
private MessageRetryRedisDAO messageRetryRedisDAO;
@Autowired
private MessageRecordService messageRecordService;
@Autowired
public RetryStrategyConfigService retryStrategyConfigService;
@Autowired
private MessageConfirmRedisDAO messageConfirmRedisDAO;
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO>
implements MessageRetryQueueService, CommandLineRunner {
private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final int PROCESS_STATUS_WAITING = 0;
private static final int PROCESS_STATUS_PROCESSING = 1;
private static final int DEFAULT_LOCK_SECONDS = 120;
private static final List<String> CHANNELS = Arrays.asList(
ChannelTypeEnum.SMS.getCode(),
ChannelTypeEnum.EMAIL.getCode(),
ChannelTypeEnum.APP.getCode());
@Autowired
private MessageRetryRedisDAO messageRetryRedisDAO;
@Autowired
private MessageRecordService messageRecordService;
@Autowired
private RetryStrategyConfigService retryStrategyConfigService;
private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false);
@Override
@Transactional(rollbackFor = Exception.class)
public void saveOrUpdateRetryMessage(MessageRecordDO message) {
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getId())
);
MessageRetryQueueDO existing = baseMapper.selectOne(new LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getId()));
message.setLastRetryTime(message.getSendTime());
if (ObjectUtil.isNull(existing)) {
@@ -68,17 +67,18 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
retryRecord.setMessageId(message.getId());
retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(1);
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel());
retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount());
retryRecord.setFirstFailTime(message.getSendTime());
retryRecord.setLastRetryTime(message.getSendTime());
retryRecord.setLastErrorMsg(message.getErrorMsg());
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount());
LocalDateTime nextRetryTime = calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount());
retryRecord.setNextRetryTime(nextRetryTime);
retryRecord.setProcessStatus(PROCESS_STATUS_WAITING);
retryRecord.setLockUntil(null);
message.setNextRetryTime(nextRetryTime);
this.save(retryRecord);
save(retryRecord);
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
}
@@ -93,17 +93,19 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
message.setStatus(MsgStatusConstant.FINALFAILED);
message.setNextRetryTime(null);
existing.setNextRetryTime(null);
this.updateById(existing);
this.deleteByMessageIds(Collections.singletonList(message.getId()));
updateById(existing);
deleteByMessageIds(Collections.singletonList(message.getId()));
messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getId());
return;
}
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), newRetryCount);
LocalDateTime nextRetryTime = calculateNextRetryTime(message.getChannel(), newRetryCount);
existing.setRetryCount(newRetryCount);
existing.setNextRetryTime(nextRetryTime);
existing.setProcessStatus(PROCESS_STATUS_WAITING);
existing.setLockUntil(null);
message.setNextRetryTime(nextRetryTime);
this.updateById(existing);
updateById(existing);
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
}
@@ -147,11 +149,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
} else {
String retryIntervals = strategyConfig.getRetryIntervals();
String[] split = retryIntervals.split(String.valueOf(StrUtil.C_COMMA));
if (retryCount >= split.length) {
plusSeconds = Long.parseLong(split[split.length - 1]);
} else {
plusSeconds = Long.parseLong(split[retryCount - 1]);
}
plusSeconds = retryCount >= split.length
? Long.parseLong(split[split.length - 1])
: Long.parseLong(split[retryCount - 1]);
}
return LocalDateTime.now().plusSeconds(plusSeconds);
}
@@ -159,7 +159,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
public void processRetryBatch(String channel) {
long epochMilli = System.currentTimeMillis();
// 使用原子认领机制,一次性完成读取 + 删除
List<String> claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) {
@@ -168,46 +167,48 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
epochMilli = System.currentTimeMillis();
claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) {
return;
}
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(claimedMessageIds);
LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime lockUntil = currentTime.plusSeconds(DEFAULT_LOCK_SECONDS);
List<Long> dbClaimedMessageIds = claimedMessageIds.stream()
.map(Long::valueOf)
.filter(messageId -> claimRetryMessage(messageId, channel, currentTime, lockUntil))
.collect(Collectors.toList());
if (CollUtil.isEmpty(dbClaimedMessageIds)) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(dbClaimedMessageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO, RetrySourceEnum.AUTO_RETRY);
}
}
/**
* 处理单条消息重试,根据执行结果决定是否重新入队
* 成功:不再回 Redis并逻辑删除数据库队列记录
* 失败且继续重试:重新计算 nextRetryTime重新写入 Redis 和数据库队列表
* 失败且终止:不再回 Redis并逻辑删除数据库队列记录
*
* @param messageRecordDO 消息记录
*/
private void processSingleRetry(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
// 校验一下消息的状态是否为 retryable_failed防止误推送已经成功的消息
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO), retrySource);
messageRecordService.updateRetryCount(messageRecordDO.getId());
finalizeRetryLease(messageRecordDO);
} else {
// 从retryQueue中删除(已经从redis中移除过了所以不用再次移除了)
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void manualRetry(List<String> messageIds) {
if (CollUtil.isNotEmpty(messageIds)) {
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY);
}
if (CollUtil.isEmpty(messageIds)) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY);
}
}
@@ -221,7 +222,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
.le(ObjectUtil.isNotNull(reqVO.getMaxRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMaxRetryCount())
.orderByAsc(MessageRetryQueueDO::getNextRetryTime)
.orderByDesc(MessageRetryQueueDO::getId);
return this.baseMapper.selectPage(reqVO, queryWrapper);
return baseMapper.selectPage(reqVO, queryWrapper);
}
@Override
@@ -229,14 +230,11 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
if (CollUtil.isEmpty(messageIds)) {
return false;
}
return this.remove(new LambdaQueryWrapper<MessageRetryQueueDO>()
.in(MessageRetryQueueDO::getMessageId, messageIds));
return remove(new LambdaQueryWrapper<MessageRetryQueueDO>().in(MessageRetryQueueDO::getMessageId, messageIds));
}
@Override
public void run(String... args) throws Exception {
public void run(String... args) {
rebuildRetryQueueOnStartup();
startupRebuildCompleted.set(true);
}
@@ -245,18 +243,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Transactional(rollbackFor = Exception.class)
public void rebuildRetryQueueOnStartup() {
log.info("========== 开始重建重试队列 ==========");
for (String channel : CHANNELS) {
try {
log.info("正在清空渠道 [{}] 的 Redis 重试队列", channel);
messageRetryRedisDAO.clearRetryQueue(channel);
log.info("正在从数据库加载渠道 [{}] 的待重试消息", channel);
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
List<MessageRetryQueueDO> dbRecords = listEligibleQueueRecords(channel);
if (CollUtil.isEmpty(dbRecords)) {
log.info("渠道 [{}] 没有待重试的消息", channel);
continue;
@@ -264,21 +257,17 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) {
log.warn("渠道 [{}] 的消息记录不存在", channel);
continue;
}
int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages);
log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}",
channel, dbRecords.size(), successCount);
log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", channel, dbRecords.size(), successCount);
} catch (Exception e) {
log.error("重建渠道 [{}] 的重试队列失败", channel, e);
}
}
log.info("========== 重试队列重建完成 ==========");
}
@@ -289,35 +278,31 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
log.debug("启动重建未完成,跳过本次同步");
return;
}
log.info("开始同步redis与数据库重试队列一致性");
log.info("开始同步 Redis 与数据库重试队列一致性");
for (String channel : CHANNELS) {
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
List<MessageRetryQueueDO> dbRecords = listEligibleQueueRecords(channel);
if (CollUtil.isEmpty(dbRecords)) {
continue;
}
for (MessageRetryQueueDO dbRecord : dbRecords) {
boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId());
if (existsInRedis) {
continue;
}
if (!existsInRedis) {
Long messageId = dbRecord.getMessageId();
MessageRecordDO message = messageRecordService.getById(messageId);
if (message != null && message.getNextRetryTime() != null) {
boolean added = messageRetryRedisDAO.addToRetryQueue(message);
if (added) {
log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId);
} else {
log.warn("补回消息到 Redis 失败channel={}, messageId={}", channel, messageId);
}
Long messageId = dbRecord.getMessageId();
MessageRecordDO message = messageRecordService.getById(messageId);
if (message != null && message.getNextRetryTime() != null) {
boolean added = messageRetryRedisDAO.addToRetryQueue(message);
if (added) {
log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId);
} else {
log.warn("消息记录不存在或 next_retry_time 为空channel={}, messageId={}", channel, messageId);
log.warn("补回消息到 Redis 失败channel={}, messageId={}", channel, messageId);
}
} else {
log.warn("消息记录不存在或 next_retry_time 为空channel={}, messageId={}", channel, messageId);
}
}
}
@@ -325,31 +310,21 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
public void fallbackToDatabase(String channel) {
boolean redisAvailable = messageRetryRedisDAO.isRedisAvailable();
if (!redisAvailable) {
if (!messageRetryRedisDAO.isRedisAvailable()) {
log.warn("Redis 不可用channel={}", channel);
return;
}
fallbackToDatabaseDirectly(channel);
}
/**
* 从数据库补拉
*
* @param channel
*/
private void fallbackToDatabaseDirectly(String channel) {
List<MessageRetryQueueDO> dbRecords = baseMapper.selectNeedRetryMessages(channel, LocalDateTime.now(), DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(dbRecords)) {
return;
}
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) {
return;
}
@@ -358,4 +333,60 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
messageRetryRedisDAO.batchAddToRetryQueue(messages);
}
}
private boolean claimRetryMessage(Long messageId, String channel, LocalDateTime currentTime, LocalDateTime lockUntil) {
return baseMapper.claimRetryMessage(messageId, channel, currentTime, lockUntil) > 0;
}
/**
* 完成重试租约处理,根据消息状态执行相应的清理或重置操作
* - 如果消息状态为可重试失败,直接返回,不进行任何处理
* - 如果消息状态为待处理重置队列记录的处理状态和锁时间并将消息重新加入Redis重试队列
* - 其他状态成功、最终失败等从Redis重试队列移除并删除数据库中的重试队列记录
*
* @param messageRecordDO 消息记录对象
*/
private void finalizeRetryLease(MessageRecordDO messageRecordDO) {
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
return;
}
// “处理过程中断,但消息仍停留在 PENDING 时,主动释放 lease 并回队列”的兜底补偿逻辑
if (MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
lambdaUpdate()
.eq(MessageRetryQueueDO::getMessageId, messageRecordDO.getId())
.set(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING)
.set(MessageRetryQueueDO::getLockUntil, null)
.update();
if (messageRecordDO.getNextRetryTime() != null) {
messageRetryRedisDAO.addToRetryQueue(messageRecordDO);
}
return;
}
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getId());
deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
}
/**
* 查询指定通道下符合重试条件的队列记录
* 筛选条件:
* - 匹配指定通道
* - 下次重试时间不为空
* - 处理状态为等待中,或处理状态为空,或锁时间为空,或锁时间已过期
*
* @param channel 消息通道
* @return 符合条件的重试队列记录列表
*/
private List<MessageRetryQueueDO> listEligibleQueueRecords(String channel) {
LocalDateTime now = LocalDateTime.now();
return lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.and(wrapper -> wrapper.eq(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING)
.or().isNull(MessageRetryQueueDO::getProcessStatus)
.or().isNull(MessageRetryQueueDO::getLockUntil)
.or().lt(MessageRetryQueueDO::getLockUntil, now))
.list();
}
}