第三方调用异常与事务边界、消息重试队列Redis与数据定时同步任务、重试次数闭环不稳定

This commit is contained in:
caozehui
2026-04-07 08:40:51 +08:00
parent 1bf5436f0c
commit 0a8627b440
32 changed files with 456 additions and 266 deletions

View File

@@ -6,6 +6,7 @@ import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
@@ -24,17 +25,20 @@ import java.util.concurrent.TimeUnit;
public class Sender {
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
private ChannelProviderConfigService channelProviderConfigService;
@Autowired
public MessageRetryQueueService messageRetryQueueService;
private MessageRetryQueueService messageRetryQueueService;
@Autowired
public ProviderErrorCodeMappingService providerErrorCodeMappingService;
private ProviderErrorCodeMappingService providerErrorCodeMappingService;
@Autowired
public RestTemplateUtil restTemplateUtil;
@Autowired
public MessageRetryHistoryService messageRetryHistoryService;
private MessageRetryHistoryService messageRetryHistoryService;
@Autowired
public MessageRecordMapper messageRecordMapper;
private MessageRecordMapper messageRecordMapper;
@Autowired
private MessageConfirmRedisDAO messageConfirmRedisDAO;
public final ThreadPoolExecutor MSG_PUSH_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
@@ -70,7 +74,7 @@ public class Sender {
}
boolean retryable = retryableFallback;
String errorCode = StrUtil.blankToDefault(defaultErrorCode, providerRawCode);
String errorCode = StrUtil.blankToDefault(providerRawCode, defaultErrorCode);
String resultMessage = StrUtil.blankToDefault(providerRawMessage, defaultMessage);
if (mappingDO != null) {
@@ -126,20 +130,21 @@ public class Sender {
message.setErrorCode(sendResult.getErrorCode());
message.setErrorMsg(sendResult.getMessage());
this.messageConfirmRedisDAO.removeFromConfirmQueue(message);
if (SendOutcome.SUCCESS.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getId()));
this.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
message.setNextRetryTime(null);
} else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else if (SendOutcome.FAILED.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getId()));
this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
}
this.messageRecordMapper.updateStatusAndErrorInfo(
message.getMessageId(),
message.getId(),
message.getStatus(),
message.getErrorCode(),
message.getErrorMsg(),
@@ -147,7 +152,7 @@ public class Sender {
message.getNextRetryTime()
);
this.messageRetryHistoryService.updateStatusAndErrorInfo(
message.getMessageId(),
message.getId(),
message.getStatus(),
message.getErrorCode(),
message.getErrorMsg()
@@ -167,4 +172,5 @@ public class Sender {
case CONFIG_INVALID -> MsgStatusConstant.CONFIG_INVALID;
};
}
}

View File

@@ -200,22 +200,35 @@ public class TelecomSmsSender implements SmsSender {
return;
}
TelecomSmsSelectDetailRes detailRes = selectResponse.getList().get(0);
if (detailRes.getStatus() == 5) {
// if (detailRes.getStatus() == 5) {
// SendResult failedResult = this.sender.buildFailureResult(
// message,
// detailRes.getStat(),
// null,
// "THIRD_PARTY_CALLBACK_FAILED",
// "短信回执返回失败",
// false
// );
// this.sender.applyCallbackResult(message, failedResult);
// } else if (detailRes.getStatus() == 4) {
// // 回执确认成功后,复用统一成功落库逻辑。
// this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()));
// }
double random = Math.random();
System.out.println(random + " aaaa");
if (random > 0.5) {
SendResult failedResult = this.sender.buildFailureResult(
message,
detailRes.getStat(),
null,
"THIRD_PARTY_CALLBACK_FAILED",
"短信回执返回失败",
false
true
);
this.sender.applyCallbackResult(message, failedResult);
} else if (detailRes.getStatus() == 4) {
} else {
// 回执确认成功后,复用统一成功落库逻辑。
this.sender.applyCallbackResult(
message,
SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId())
);
this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()));
}
} catch (Exception e) {
log.error("电信短信回执查询失败", e);

View File

@@ -57,6 +57,6 @@ public class BlacklistController {
@PreAuthorize("@ss.hasPermission('push:blacklist:delete')")
@Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> delete(@RequestParam("ids") List<Long> ids) {
return success(blacklistService.delete(ids));
return success(blacklistService.removeByIds(ids));
}
}

View File

@@ -1,5 +1,6 @@
package com.njcn.msgpush.module.push.controller.admin.message;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent;
@@ -56,7 +57,8 @@ public class MessageRecordController {
@Operation(summary = "更新消息记录")
@PreAuthorize("@ss.hasPermission('push:message:update')")
public CommonResult<Boolean> update(@Validated @RequestBody MessageRecordReqVO reqVO) {
return success(messageRecordService.update(reqVO));
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(reqVO, MessageRecordDO.class);
return success(messageRecordService.updateById(messageRecordDO));
}
@PostMapping("/delete")
@@ -64,6 +66,6 @@ public class MessageRecordController {
@PreAuthorize("@ss.hasPermission('push:message:delete')")
@Parameter(name = "ids", description = "编号", required = true)
public CommonResult<Boolean> delete(@RequestParam("ids") List<Long> ids) {
return success(messageRecordService.delete(ids));
return success(messageRecordService.removeByIds(ids));
}
}

View File

@@ -5,6 +5,7 @@ import com.njcn.msgpush.framework.common.pojo.PageParam;
import com.njcn.msgpush.framework.common.validation.InEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Data;
@@ -18,10 +19,6 @@ public class MessageRecordReqVO extends PageParam {
@Schema(description = "主键ID")
private Long id;
@Schema(description = "消息唯一ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "123456")
@NotBlank(message = "消息唯一ID不能为空")
private String messageId;
@Schema(description = "应用名称/来源系统标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "NPQS-9500")
@NotBlank(message = "应用名称/来源系统标识不能为空")
private String appName;

View File

@@ -8,7 +8,7 @@ import lombok.Data;
*/
@Data
public class MessageSendResultVO {
private String messageId;
private Long messageId;
/**
* 发送结果
*/

View File

@@ -60,6 +60,6 @@ public class RateLimitConfigController {
@PreAuthorize("@ss.hasPermission('push:ratelimit:delete')")
@Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> delete(@RequestParam List<Long> ids) {
return success(rateLimitConfigService.delete(ids));
return success(rateLimitConfigService.removeByIds(ids));
}
}

View File

@@ -14,7 +14,7 @@ import lombok.Data;
@Schema(description = "管理后台 - 消息重试 Request VO")
public class MessageRetryQueueReqVO extends PageParam {
@Schema(description = "消息ID")
private String messageId;
private Long messageId;
@Schema(description = "渠道类型", example = "sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")

View File

@@ -21,11 +21,6 @@ public class MessageRecordDO extends BaseDO {
*/
private Long id;
/**
* 消息唯一ID
*/
private String messageId;
/**
* 应用名称/来源系统标识
*/

View File

@@ -20,7 +20,7 @@ public class MessageRetryHistoryDO extends BaseDO {
/**
* 关联 message_record 的 message_id
*/
private String messageId;
private Long messageId;
/**
* 重试序号0=首次发送1=第1次重试2=第2次重试

View File

@@ -24,7 +24,7 @@ public class MessageRetryQueueDO extends BaseDO {
/**
* 关联message_record的message_id
*/
private String messageId;
private Long messageId;
/**
* 渠道类型

View File

@@ -9,5 +9,5 @@ import java.time.LocalDateTime;
@Mapper
public interface MessageRecordMapper extends BaseMapperX<MessageRecordDO> {
int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime);
int updateStatusAndErrorInfo(@Param("id") Long id, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime);
}

View File

@@ -10,6 +10,6 @@
error_msg = #{errorMsg},
last_retry_time= #{lastRetryTime},
next_retry_time= #{nextRetryTime}
WHERE message_id = #{messageId}
WHERE id = #{id}
</update>
</mapper>

View File

@@ -5,7 +5,8 @@
<select id="selectNeedRetryMessages" resultType="com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO">
SELECT *
FROM push_message_retry_queue
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime} AND deleted = 0
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime}
AND deleted = 0
ORDER BY next_retry_time ASC
LIMIT #{limit}
</select>

View File

@@ -0,0 +1,32 @@
package com.njcn.msgpush.module.push.dal.redis;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* @author caozehui
* @data 2026-04-03
*/
@Component
@RequiredArgsConstructor
public class MessageConfirmRedisDAO {
private final RedisTemplate<String, String> redisTemplate;
private static final String CONFIRM_QUEUE_KEY_PREFIX = "msPush:confirm_queue:";
private String getConfirmQueueKey(Long messageId) {
return CONFIRM_QUEUE_KEY_PREFIX + messageId;
}
public boolean addToConfirmQueue(MessageRecordDO message) {
String key = getConfirmQueueKey(message.getId());
return redisTemplate.opsForValue().setIfAbsent(key, "");
}
public boolean removeFromConfirmQueue(MessageRecordDO message) {
String key = getConfirmQueueKey(message.getId());
return redisTemplate.delete(key);
}
}

View File

@@ -3,12 +3,15 @@ package com.njcn.msgpush.module.push.dal.redis;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.List;
/**
* {@link com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO} RedisDAO
@@ -25,7 +28,7 @@ public class MessageRetryRedisDAO {
/**
* Redis中消息重试队列的Key前缀
*/
private static final String RETRY_QUEUE_KEY_PREFIX = "msPpush:retry_queue:";
private static final String RETRY_QUEUE_KEY_PREFIX = "msPush:retry_queue:";
/**
* 获取指定渠道的重试队列Key
@@ -34,6 +37,51 @@ public class MessageRetryRedisDAO {
return RETRY_QUEUE_KEY_PREFIX + channel;
}
/**
* 原子认领并移除到期消息Lua 脚本保证原子性)
* 适用于单实例场景,直接删除不再单独维护认领状态
*
* @param channel 渠道类型
* @param currentTime 当前时间戳
* @param limit 限制数量
* @return 认领成功的消息 ID 列表
*/
public List<String> claimAndRemoveDueMessages(String channel, long currentTime, int limit) {
String key = getRetryQueueKey(channel);
String luaScript =
"local key = KEYS[1] " +
"local currentTime = tonumber(ARGV[1]) " +
"local limit = tonumber(ARGV[2]) " +
"local dueMessages = redis.call('ZRANGEBYSCORE', key, 0, currentTime, 'LIMIT', 0, limit) " +
"if #dueMessages == 0 then return {} end " +
"for _, messageId in ipairs(dueMessages) do " +
" redis.call('ZREM', key, messageId) " +
"end " +
"return dueMessages";
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(List.class);
try {
List<String> claimedMessageIds = redisTemplate.execute(
redisScript,
Arrays.asList(key),
String.valueOf(currentTime),
String.valueOf(limit)
);
log.debug("原子认领并移除到期消息成功channel={}, 认领数量={}", channel,
claimedMessageIds != null ? claimedMessageIds.size() : 0);
return claimedMessageIds != null ? claimedMessageIds : Collections.emptyList();
} catch (Exception e) {
return Collections.emptyList();
}
}
/**
* 添加消息到重试队列
*
@@ -43,17 +91,7 @@ public class MessageRetryRedisDAO {
String key = getRetryQueueKey(message.getChannel());
double score = message.getNextRetryTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
try {
redisTemplate.opsForZSet().add(key, message.getMessageId(), score);
log.debug("添加消息到重试队列成功: channel={}, messageId={}, nextRetryTime={}",
message.getChannel(), message.getMessageId(), message.getNextRetryTime());
return true;
} catch (Exception e) {
log.error("添加消息到重试队列失败: channel={}, messageId={}", message.getChannel(), message.getMessageId(), e);
log.warn("Redis 入队失败,当前仅保留数据库重试记录等待后续恢复: channel={}, messageId={}",
message.getChannel(), message.getMessageId());
return false;
}
return redisTemplate.opsForZSet().add(key, message.getId() + "", score);
}
/**
@@ -62,55 +100,13 @@ public class MessageRetryRedisDAO {
* @param channel 渠道类型
* @param messageId 消息ID
*/
public boolean removeFromRetryQueue(String channel, String messageId) {
public boolean removeFromRetryQueue(String channel, Long messageId) {
String key = getRetryQueueKey(channel);
try {
redisTemplate.opsForZSet().remove(key, messageId);
log.debug("从重试队列移除消息成功: channel={}, messageId={}", channel, messageId);
return true;
} catch (Exception e) {
log.error("从重试队列移除消息失败: channel={}, messageId={}", channel, messageId, e);
log.warn("Redis 出队失败,当前消息可能存在待清理的 Redis 脏数据: channel={}, messageId={}", channel, messageId);
return false;
}
redisTemplate.opsForZSet().remove(key, messageId);
return true;
}
/**
* 获取需要重试的消息ID集合
*
* @param channel 渠道类型
* @param currentTime 当前时间戳
* @param limit 限制数量
* @return 消息ID集合
*/
public Set<String> getNeedRetryMessageIds(String channel, long currentTime, int limit) {
String key = getRetryQueueKey(channel);
try {
return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime, 0, limit);
} catch (Exception e) {
log.error("获取需要重试的消息ID失败: channel={}", channel, e);
return Collections.emptySet();
}
}
/**
* 获取队列大小
*
* @param channel 渠道类型
* @return 队列大小
*/
public Long getQueueSize(String channel) {
String key = getRetryQueueKey(channel);
try {
return redisTemplate.opsForZSet().size(key);
} catch (Exception e) {
log.error("获取队列大小失败: channel={}", channel, e);
return 0L;
}
}
/**
* 清空指定渠道的重试队列
@@ -119,12 +115,59 @@ public class MessageRetryRedisDAO {
*/
public void clearRetryQueue(String channel) {
String key = getRetryQueueKey(channel);
redisTemplate.delete(key);
}
/**
* 检查消息是否在重试队列中
*
* @param channel 渠道类型
* @param messageId 消息 ID
* @return 是否存在
*/
public boolean existsInRetryQueue(String channel, Long messageId) {
String key = getRetryQueueKey(channel);
try {
redisTemplate.delete(key);
log.info("清空重试队列成功: channel={}", channel);
Double score = redisTemplate.opsForZSet().score(key, messageId + "");
return score != null;
} catch (Exception e) {
log.error("清空重试队列失败: channel={}", channel, e);
return false;
}
}
/**
* 批量添加消息到重试队列
*
* @param messages 消息列表
* @return 成功添加的数量
*/
public int batchAddToRetryQueue(List<MessageRecordDO> messages) {
if (messages == null || messages.isEmpty()) {
return 0;
}
int successCount = 0;
for (MessageRecordDO message : messages) {
if (addToRetryQueue(message)) {
successCount++;
}
}
return successCount;
}
/**
* 检查 Redis 是否可用
*
* @return Redis 是否可用
*/
public boolean isRedisAvailable() {
try {
String pong = redisTemplate.execute((RedisCallback<String>) connection ->
new String(connection.ping()));
return "PONG".equals(pong);
} catch (Exception e) {
log.error("Redis 不可用", e);
return false;
}
}
}

View File

@@ -32,7 +32,7 @@ public class MessageRetryJob {
/**
* 定时处理邮件重试队列每10秒执行一次
*/
@Scheduled(fixedRate = 10000)
//@Scheduled(fixedRate = 10000)
public void processEmailRetryQueue() {
messageRetryQueueService.processRetryBatch("email");
}
@@ -40,8 +40,9 @@ public class MessageRetryJob {
/**
* 定时处理APP推送重试队列每10秒执行一次
*/
@Scheduled(fixedRate = 10000)
//@Scheduled(fixedRate = 10000)
public void processAppPushRetryQueue() {
messageRetryQueueService.processRetryBatch("app_push");
}
}

View File

@@ -16,7 +16,6 @@ import java.util.List;
public interface BlacklistService extends IService<BlacklistDO> {
Page<BlacklistDO> getPage(BlacklistReqVO reqVO);
Boolean delete(List<Long> ids);
/**
* 进行黑名单检查

View File

@@ -7,13 +7,11 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
@@ -29,10 +27,6 @@ public class BlacklistServiceImpl extends ServiceImpl<BlacklistMapper, Blacklist
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public Boolean delete(List<Long> ids) {
return this.lambdaUpdate().in(BlacklistDO::getId, ids).set(BlacklistDO::getDeleted, true).update();
}
@Override
public boolean check(MessageRecordDO messageRecordDO) {
@@ -40,8 +34,7 @@ public class BlacklistServiceImpl extends ServiceImpl<BlacklistMapper, Blacklist
String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA));
BlacklistDO blacklistDO = this.lambdaQuery()
.eq(BlacklistDO::getChannel, messageRecordDO.getChannel())
.in(split.length > 0, BlacklistDO::getTarget, split)
.eq(BlacklistDO::getDeleted, false).one();
.in(split.length > 0, BlacklistDO::getTarget, split).one();
if (ObjectUtil.isNotNull(blacklistDO)) {
blacklistDO.setHitCount(blacklistDO.getHitCount() + 1);

View File

@@ -1,6 +1,7 @@
package com.njcn.msgpush.module.push.service.message;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
@@ -10,7 +11,7 @@ import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
public interface MessageRecordService {
public interface MessageRecordService extends IService<MessageRecordDO> {
/**
* 发送消息包括email、sms、app_push
@@ -36,23 +37,14 @@ public interface MessageRecordService {
*/
boolean add(MessageRecordReqVO messageRecordSendReqVO);
MessageRecordDO getById(String messageId);
List<MessageRecordDO> listByIds(Collection<? extends Serializable> ids);
Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO);
boolean update(MessageRecordReqVO reqVO);
boolean delete(List<Long> ids);
/**
* 更新消息记录重试次数
*
* @param messageId
* @param id
* @return
*/
boolean updateRetryCount(String messageId);
boolean updateMessage(MessageRecordDO messageRecordDO);
boolean updateRetryCount(Long id);
}

View File

@@ -1,7 +1,6 @@
package com.njcn.msgpush.module.push.service.message;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -20,6 +19,7 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
@@ -29,11 +29,10 @@ import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,50 +65,73 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Autowired
private RateLimitRedisDAO rateLimitRedisDAO;
@Autowired
private MessageConfirmRedisDAO messageConfirmRedisDAO;
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> messageRecordDO.setStatus(MsgStatusConstant.PENDING));
this.saveBatch(messageRecordDOList);
List<MessageRecordDO> messageRecordDOList = this.createMessageRecords(reqVOList);
return this.processSendMsg(messageRecordDOList);
}
@Transactional(rollbackFor = Exception.class)
public List<MessageRecordDO> createMessageRecords(List<MessageRecordReqVO> reqVOList) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
// messageRecordDO.setRetryCount(0);
});
this.saveBatch(messageRecordDOList);
return messageRecordDOList;
}
/**
* 发送主流程。
* 这里统一接收 sender 返回的 SendResult再决定消息状态、重试队列和服务商健康度如何变化。
*/
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList) {
msgPushGuardChain.checkAll(messageRecordDOList);
List<MessageSendResultVO> resultList = new ArrayList<>();
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
MessageSendResultVO resultVO = new MessageSendResultVO();
resultVO.setMessageId(messageRecordDO.getMessageId());
if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
resultVO.setResult(false);
resultVO.setDetail(this.buildBlockedDetail(messageRecordDO));
this.updateMessage(messageRecordDO);
try {
MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO);
resultList.add(resultVO);
continue;
} catch (Exception e) {
MessageSendResultVO failedVO = new MessageSendResultVO();
failedVO.setMessageId(messageRecordDO.getId());
failedVO.setResult(false);
failedVO.setDetail("消息处理异常:" + e.getMessage());
resultList.add(failedVO);
}
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService
.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
this.applySendResult(messageRecordDO, sendResult);
this.recordRetryHistory(messageRecordDO);
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
resultVO.setDetail(this.buildResultDetail(sendResult));
resultList.add(resultVO);
}
return resultList;
}
private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO) {
MessageSendResultVO resultVO = new MessageSendResultVO();
resultVO.setMessageId(messageRecordDO.getId());
if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
resultVO.setResult(false);
resultVO.setDetail(this.buildBlockedDetail(messageRecordDO));
this.updateById(messageRecordDO);
return resultVO;
}
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService
.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
this.applySendResult(messageRecordDO, sendResult);
this.recordRetryHistory(messageRecordDO);
// 在这里修改接口返回的是投递结果,还是调用接口结果
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
resultVO.setDetail(this.buildResultDetail(sendResult));
return resultVO;
}
private SendResult sendMessage(MessageRecordDO messageRecordDO,
ChannelProviderConfigDO channelProviderConfigDO,
MessageProviderFactory messageProviderFactory) {
@@ -137,7 +159,8 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
* sender 只负责调用第三方并返回统一结果;
* 主流程在这里统一做状态落库、重试编排和健康度更新。
*/
private void applySendResult(MessageRecordDO messageRecordDO, SendResult sendResult) {
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void applySendResult(MessageRecordDO messageRecordDO, SendResult sendResult) {
messageRecordDO.setStatus(this.mapOutcomeToStatus(sendResult.getOutcome()));
messageRecordDO.setErrorCode(sendResult.getErrorCode());
messageRecordDO.setErrorMsg(sendResult.getMessage());
@@ -152,14 +175,8 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
}
// 成功或已受理都视为本次调用已正常投递,更新成功健康度并消耗配额。
if (SendOutcome.SUCCESS.equals(sendResult.getOutcome()) || SendOutcome.ACCEPTED.equals(sendResult.getOutcome())) {
messageRecordDO.setErrorCode(null);
messageRecordDO.setErrorMsg(null);
messageRecordDO.setNextRetryTime(null);
channelProviderConfigService.successUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName());
rateLimitRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
// 可重试失败统一进入重试队列,避免 sender 内部各自处理重试逻辑。
if (SendOutcome.ACCEPTED.equals(sendResult.getOutcome())) {
messageConfirmRedisDAO.addToConfirmQueue(messageRecordDO);
} else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) {
messageRetryQueueService.saveOrUpdateRetryMessage(messageRecordDO);
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
@@ -171,13 +188,15 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
messageRecordDO.setNextRetryTime(null);
}
this.updateMessage(messageRecordDO);
this.updateById(messageRecordDO);
}
private void recordRetryHistory(MessageRecordDO messageRecordDO) {
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void recordRetryHistory(MessageRecordDO messageRecordDO) {
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
messageRetryHistoryDO.setMessageId(messageRecordDO.getId());
messageRetryHistoryDO.setStatus(messageRecordDO.getStatus());
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
}
@@ -219,34 +238,20 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
};
}
@Override
public boolean updateMessage(MessageRecordDO messageRecordDO) {
return this.updateById(messageRecordDO);
}
@Override
public boolean updateRetryCount(String messageId) {
MessageRecordDO one = this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).one();
public boolean updateRetryCount(Long id) {
MessageRecordDO one = this.lambdaQuery().eq(MessageRecordDO::getId, id).one();
one.setRetryCount(one.getRetryCount() + 1);
return this.updateById(one);
}
@Override
public MessageRecordDO getById(String messageId) {
return this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).one();
}
@Override
public boolean add(MessageRecordReqVO messageRecordSendReqVO) {
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(messageRecordSendReqVO, MessageRecordDO.class);
return this.save(messageRecordDO);
}
@Override
public List<MessageRecordDO> listByIds(Collection<? extends Serializable> ids) {
return this.lambdaQuery().in(CollectionUtil.isNotEmpty(ids), MessageRecordDO::getMessageId, ids).list();
}
@Override
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
@@ -256,15 +261,4 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
.eq(StrUtil.isNotBlank(reqVO.getAppName()), MessageRecordDO::getAppName, reqVO.getAppName());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public boolean update(MessageRecordReqVO reqVO) {
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(reqVO, MessageRecordDO.class);
return this.updateById(messageRecordDO);
}
@Override
public boolean delete(List<Long> ids) {
return this.removeByIds(ids);
}
}

View File

@@ -27,7 +27,6 @@ public class SystemQuotaConfigServiceImpl extends ServiceImpl<SystemQuotaConfigM
.eq(SystemQuotaConfigDO::getChannel, channel)
.eq(SystemQuotaConfigDO::getAppName, appName)
.eq(SystemQuotaConfigDO::getEnabled, true)
.eq(SystemQuotaConfigDO::getDeleted, false)
.one();
}

View File

@@ -17,7 +17,6 @@ public interface RateLimitConfigService extends IService<RateLimitConfigDO> {
RateLimitConfigDO getByChannelAndAppName(String channel, String appName);
boolean delete(List<Long> ids);
void check(List<MessageRecordDO> messageRecordList);

View File

@@ -39,16 +39,7 @@ public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMappe
public RateLimitConfigDO getByChannelAndAppName(String channel, String appName) {
return this.lambdaQuery().eq(RateLimitConfigDO::getChannel, channel)
.eq(RateLimitConfigDO::getAppName, appName)
.eq(RateLimitConfigDO::getEnabled, true)
.eq(RateLimitConfigDO::getDeleted, false).one();
}
@Override
public boolean delete(List<Long> ids) {
return this.lambdaUpdate()
.set(RateLimitConfigDO::getDeleted, true)
.in(RateLimitConfigDO::getId, ids)
.update();
.eq(RateLimitConfigDO::getEnabled, true).one();
}
@Override

View File

@@ -15,7 +15,7 @@ public interface MessageRetryHistoryService {
* @param messageId 消息ID
* @return
*/
Integer getMaxRetrySequence(String messageId);
Integer getMaxRetrySequence(Long messageId);
/**
* 更新消息错误信息
@@ -26,5 +26,5 @@ public interface MessageRetryHistoryService {
* @param errorMsg 错误信息
* @return
*/
boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg);
boolean updateStatusAndErrorInfo(Long messageId, String status, String errorCode, String errorMsg);
}

View File

@@ -17,7 +17,7 @@ public class MessageRetryHistoryServiceImpl extends ServiceImpl<MessageRetryHist
return this.save(messageRetryHistoryDO);
}
public Integer getMaxRetrySequence(String messageId) {
public Integer getMaxRetrySequence(Long messageId) {
MessageRetryHistoryDO one = this.lambdaQuery().eq(MessageRetryHistoryDO::getMessageId, messageId)
.orderByDesc(MessageRetryHistoryDO::getRetrySequence)
.last("limit 1")
@@ -26,7 +26,7 @@ public class MessageRetryHistoryServiceImpl extends ServiceImpl<MessageRetryHist
}
@Override
public boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg) {
public boolean updateStatusAndErrorInfo(Long messageId, String status, String errorCode, String errorMsg) {
MessageRetryHistoryDO one = this.lambdaQuery()
.eq(MessageRetryHistoryDO::getMessageId, messageId)
.orderByDesc(MessageRetryHistoryDO::getRetrySequence)

View File

@@ -5,7 +5,6 @@ import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueR
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import java.time.LocalDateTime;
import java.util.List;
public interface MessageRetryQueueService {
@@ -31,7 +30,6 @@ public interface MessageRetryQueueService {
*/
void manualRetry(List<String> messageIds);
/**
* 分页查询重试队列
*
@@ -40,18 +38,22 @@ public interface MessageRetryQueueService {
*/
PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO);
boolean deleteByMessageIds(List<Long> messageIds);
/**
* 更新重试信息
*
* @param messageId 消息ID
* @param retryCount 重试次数
* @param nextRetryTime 下次重试时间
* @param lastRetryTime 最后重试时间
* @param lastErrorMsg 最后错误信息
* 应用启动时重建重试队列
*/
boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String lastErrorMsg);
void rebuildRetryQueueOnStartup();
boolean deleteByMessageIds(List<String> messageIds);
/**
* 定时同步数据库和 Redis 的重试队列一致性
*/
void syncRetryQueueConsistency();
boolean updateLastRetryTime(String messageId, LocalDateTime now);
/**
* 从数据库兜底读取待重试消息并回填 Redis
*
* @param channel 渠道类型
*/
void fallbackToDatabase(String channel);
}

View File

@@ -13,47 +13,62 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO> implements MessageRetryQueueService {
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO> implements MessageRetryQueueService, CommandLineRunner {
@Autowired
private MessageRetryRedisDAO messageRetryRedisDAO;
@Autowired
private MessageRecordService messageRecordService;
@Autowired
public RetryStrategyConfigService retryStrategyConfigService;
@Autowired
private MessageConfirmRedisDAO messageConfirmRedisDAO;
private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final List<String> CHANNELS = Arrays.asList(
ChannelTypeEnum.SMS.getCode(),
ChannelTypeEnum.EMAIL.getCode(),
ChannelTypeEnum.APP_PUSH.getCode());
private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false);
@Override
@Transactional(rollbackFor = Exception.class)
public void saveOrUpdateRetryMessage(MessageRecordDO message) {
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getMessageId())
new LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getId())
);
message.setLastRetryTime(message.getSendTime());
if (ObjectUtil.isNull(existing)) {
MessageRetryQueueDO retryRecord = new MessageRetryQueueDO();
retryRecord.setMessageId(message.getMessageId());
retryRecord.setMessageId(message.getId());
retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(1);
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel());
retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount());
@@ -65,13 +80,12 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
message.setNextRetryTime(nextRetryTime);
this.save(retryRecord);
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getMessageId());
log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
}
return;
}
int newRetryCount = existing.getRetryCount() + 1;
existing.setRetryCount(newRetryCount);
existing.setLastRetryTime(message.getSendTime());
existing.setLastErrorMsg(message.getErrorMsg());
@@ -80,17 +94,18 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
message.setNextRetryTime(null);
existing.setNextRetryTime(null);
this.updateById(existing);
this.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getMessageId());
this.deleteByMessageIds(Collections.singletonList(message.getId()));
messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getId());
return;
}
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), newRetryCount);
existing.setRetryCount(newRetryCount);
existing.setNextRetryTime(nextRetryTime);
message.setNextRetryTime(nextRetryTime);
this.updateById(existing);
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getMessageId());
log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
}
}
@@ -144,17 +159,49 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
public void processRetryBatch(String channel) {
long epochMilli = System.currentTimeMillis();
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (needRetryMessageIds.isEmpty()) {
return;
// 使用原子认领机制,一次性完成读取 + 删除
List<String> claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) {
log.debug("Redis 中没有待重试消息channel={}", channel);
fallbackToDatabase(channel);
epochMilli = System.currentTimeMillis();
claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) {
return;
}
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds);
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(claimedMessageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO);
}
}
/**
* 处理单条消息重试,根据执行结果决定是否重新入队
* 成功:不再回 Redis并逻辑删除数据库队列记录
* 失败且继续重试:重新计算 nextRetryTime重新写入 Redis 和数据库队列表
* 失败且终止:不再回 Redis并逻辑删除数据库队列记录
*
* @param messageRecordDO 消息记录
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0);
messageRecordService.updateRetryCount(messageRecordDO.getId());
}
/**
* 删除重试记录(数据库逻辑删除 + Redis 已提前删除)
*/
private void deleteRetryRecord(MessageRecordDO message) {
this.deleteByMessageIds(Collections.singletonList(message.getId()));
messageRecordService.updateById(message);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void manualRetry(List<String> messageIds) {
@@ -169,7 +216,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
public PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO) {
LambdaQueryWrapper<MessageRetryQueueDO> queryWrapper = new LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(StrUtil.isNotBlank(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId())
.eq(ObjectUtil.isNotNull(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId())
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRetryQueueDO::getChannel, reqVO.getChannel())
.eq(StrUtil.isNotBlank(reqVO.getReceiver()), MessageRetryQueueDO::getReceiver, reqVO.getReceiver())
.ge(ObjectUtil.isNotNull(reqVO.getMinRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMinRetryCount())
@@ -180,50 +227,137 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
}
@Override
public boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String s) {
return this.lambdaUpdate()
.set(MessageRetryQueueDO::getRetryCount, retryCount)
.set(MessageRetryQueueDO::getNextRetryTime, nextRetryTime)
.set(MessageRetryQueueDO::getLastRetryTime, lastRetryTime)
.eq(MessageRetryQueueDO::getMessageId, messageId)
.update();
public boolean deleteByMessageIds(List<Long> messageIds) {
if (CollUtil.isEmpty(messageIds)) {
return false;
}
return this.remove(new LambdaQueryWrapper<MessageRetryQueueDO>()
.in(MessageRetryQueueDO::getMessageId, messageIds));
}
@Override
public void run(String... args) throws Exception {
rebuildRetryQueueOnStartup();
startupRebuildCompleted.set(true);
}
@Override
public boolean deleteByMessageIds(List<String> messageIds) {
return this.lambdaUpdate()
.set(MessageRetryQueueDO::getDeleted, true)
.in(CollUtil.isNotEmpty(messageIds), MessageRetryQueueDO::getMessageId, messageIds)
.update();
@Transactional(rollbackFor = Exception.class)
public void rebuildRetryQueueOnStartup() {
log.info("========== 开始重建重试队列 ==========");
for (String channel : CHANNELS) {
try {
log.info("正在清空渠道 [{}] 的 Redis 重试队列", channel);
messageRetryRedisDAO.clearRetryQueue(channel);
log.info("正在从数据库加载渠道 [{}] 的待重试消息", channel);
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
if (CollUtil.isEmpty(dbRecords)) {
log.info("渠道 [{}] 没有待重试的消息", channel);
continue;
}
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) {
log.warn("渠道 [{}] 的消息记录不存在", channel);
continue;
}
int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages);
log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}",
channel, dbRecords.size(), successCount);
} catch (Exception e) {
log.error("重建渠道 [{}] 的重试队列失败", channel, e);
}
}
log.info("========== 重试队列重建完成 ==========");
}
@Scheduled(fixedRate = 600000)
@Override
public void syncRetryQueueConsistency() {
if (!startupRebuildCompleted.get()) {
log.debug("启动重建未完成,跳过本次同步");
return;
}
log.info("开始同步redis与数据库重试队列一致性");
for (String channel : CHANNELS) {
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
if (CollUtil.isEmpty(dbRecords)) {
continue;
}
for (MessageRetryQueueDO dbRecord : dbRecords) {
boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId());
if (!existsInRedis) {
Long messageId = dbRecord.getMessageId();
MessageRecordDO message = messageRecordService.getById(messageId);
if (message != null && message.getNextRetryTime() != null) {
boolean added = messageRetryRedisDAO.addToRetryQueue(message);
if (added) {
log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId);
} else {
log.warn("补回消息到 Redis 失败channel={}, messageId={}", channel, messageId);
}
} else {
log.warn("消息记录不存在或 next_retry_time 为空channel={}, messageId={}", channel, messageId);
}
}
}
}
}
@Override
public boolean updateLastRetryTime(String messageId, LocalDateTime now) {
return this.lambdaUpdate()
.eq(MessageRetryQueueDO::getMessageId, messageId)
.set(MessageRetryQueueDO::getLastRetryTime, now)
.update();
public void fallbackToDatabase(String channel) {
boolean redisAvailable = messageRetryRedisDAO.isRedisAvailable();
if (!redisAvailable) {
log.warn("Redis 不可用channel={}", channel);
return;
}
fallbackToDatabaseDirectly(channel);
}
private void processSingleRetry(MessageRecordDO messageRecordDO) {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
MessageSendResultVO messageSendResultVO = messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0);
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
if (messageSendResultVO.getResult()) {
log.info("处理消息重试成功: messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
/**
* 从数据库补拉
*
* @param channel
*/
private void fallbackToDatabaseDirectly(String channel) {
List<MessageRetryQueueDO> dbRecords = baseMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(dbRecords)) {
return;
}
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
log.warn("处理消息重试失败,已保留在重试队列: messageId={}", messageRecordDO.getMessageId());
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) {
return;
}
log.error("处理消息重试失败且不再重试: messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
if (messageRetryRedisDAO.isRedisAvailable()) {
messageRetryRedisDAO.batchAddToRetryQueue(messages);
}
}
}

View File

@@ -18,7 +18,6 @@ public class RetryStrategyConfigServiceImpl extends ServiceImpl<RetryStrategyCon
@Override
public RetryStrategyConfigDO getStrategyConfig(String channel) {
return this.lambdaQuery().eq(RetryStrategyConfigDO::getChannel, channel)
.eq(RetryStrategyConfigDO::getDeleted, false)
.eq(RetryStrategyConfigDO::getEnabled, true)
.one();
}
@@ -26,7 +25,6 @@ public class RetryStrategyConfigServiceImpl extends ServiceImpl<RetryStrategyCon
@Override
public List<RetryStrategyConfigDO> listAll() {
return this.lambdaQuery()
.eq(RetryStrategyConfigDO::getDeleted, false)
.eq(RetryStrategyConfigDO::getEnabled, true)
.list();
}

View File

@@ -59,6 +59,7 @@ mybatis-plus:
mapper-locations: classpath*:com/njcn/msgpush/**/mapping/*.xml
configuration:
map-underscore-to-camel-case: true # 虽然默认为 true ,但是还是显示去指定下。
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
global-config:
db-config:
id-type: ASSIGN_ID # 分配 ID默认使用雪花算法

View File

@@ -59,7 +59,7 @@ public class MsgPushClientTest {
// }
for (int i = 0; i < 1; i++) {
MessageRecordReqVO message = new MessageRecordReqVO();
message.setMessageId(String.valueOf(UUID.randomUUID()));
message.setId(1234567890L);
message.setAppName("NPQS-9000");
message.setChannel(ChannelTypeEnum.SMS.getMsg());
message.setReceiver("18839431215");