This commit is contained in:
caozehui
2026-05-11 15:49:05 +08:00
parent fe26aa8670
commit c4b56a727c
2 changed files with 5 additions and 1 deletions

View File

@@ -276,7 +276,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) { public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>(); QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
wrapper.lambda() wrapper.lambda()
.eq(StrUtil.isNotBlank(reqVO.getMessageType()), MessageRecordDO::getChannel, reqVO.getChannel()); .eq(StrUtil.isNotBlank(reqVO.getMessageType()), MessageRecordDO::getMessageType, reqVO.getMessageType());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
} }
} }

View File

@@ -271,6 +271,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
log.info("========== 重试队列重建完成 =========="); log.info("========== 重试队列重建完成 ==========");
} }
// 每60分钟主动从数据库中扫描一次将消息重试数据同步到 Redis 中
@Scheduled(fixedRate = 600000) @Scheduled(fixedRate = 600000)
@Override @Override
public void syncRetryQueueConsistency() { public void syncRetryQueueConsistency() {
@@ -334,6 +335,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
} }
} }
/**
* 通过设置lockUntil时间、process_status=1来从数据库层抢占该条重试消息。
*/
private boolean claimRetryMessage(Long messageId, String channel, LocalDateTime currentTime, LocalDateTime lockUntil) { private boolean claimRetryMessage(Long messageId, String channel, LocalDateTime currentTime, LocalDateTime lockUntil) {
return baseMapper.claimRetryMessage(messageId, channel, currentTime, lockUntil) > 0; return baseMapper.claimRetryMessage(messageId, channel, currentTime, lockUntil) > 0;
} }