diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java index 31c3080..ec62288 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java @@ -6,6 +6,7 @@ import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper; +import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService; import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService; @@ -24,17 +25,20 @@ import java.util.concurrent.TimeUnit; public class Sender { @Autowired - public ChannelProviderConfigService channelProviderConfigService; + private ChannelProviderConfigService channelProviderConfigService; @Autowired - public MessageRetryQueueService messageRetryQueueService; + private MessageRetryQueueService messageRetryQueueService; @Autowired - public ProviderErrorCodeMappingService providerErrorCodeMappingService; + private ProviderErrorCodeMappingService providerErrorCodeMappingService; @Autowired public RestTemplateUtil restTemplateUtil; @Autowired - public MessageRetryHistoryService messageRetryHistoryService; + private MessageRetryHistoryService messageRetryHistoryService; @Autowired - public MessageRecordMapper messageRecordMapper; + private MessageRecordMapper messageRecordMapper; + + @Autowired + private MessageConfirmRedisDAO messageConfirmRedisDAO; public final ThreadPoolExecutor MSG_PUSH_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor( 5, @@ -70,7 +74,7 @@ public class Sender { } boolean retryable = retryableFallback; - String errorCode = StrUtil.blankToDefault(defaultErrorCode, providerRawCode); + String errorCode = StrUtil.blankToDefault(providerRawCode, defaultErrorCode); String resultMessage = StrUtil.blankToDefault(providerRawMessage, defaultMessage); if (mappingDO != null) { @@ -126,20 +130,21 @@ public class Sender { message.setErrorCode(sendResult.getErrorCode()); message.setErrorMsg(sendResult.getMessage()); + this.messageConfirmRedisDAO.removeFromConfirmQueue(message); if (SendOutcome.SUCCESS.equals(sendResult.getOutcome())) { - this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId())); + this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getId())); this.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); message.setNextRetryTime(null); } else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) { this.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } else if (SendOutcome.FAILED.equals(sendResult.getOutcome())) { - this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId())); + this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getId())); this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } this.messageRecordMapper.updateStatusAndErrorInfo( - message.getMessageId(), + message.getId(), message.getStatus(), message.getErrorCode(), message.getErrorMsg(), @@ -147,7 +152,7 @@ public class Sender { message.getNextRetryTime() ); this.messageRetryHistoryService.updateStatusAndErrorInfo( - message.getMessageId(), + message.getId(), message.getStatus(), message.getErrorCode(), message.getErrorMsg() @@ -167,4 +172,5 @@ public class Sender { case CONFIG_INVALID -> MsgStatusConstant.CONFIG_INVALID; }; } + } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/sms/TelecomSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/sms/TelecomSmsSender.java index 571eefe..d0a8012 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/sms/TelecomSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/sms/TelecomSmsSender.java @@ -200,22 +200,35 @@ public class TelecomSmsSender implements SmsSender { return; } TelecomSmsSelectDetailRes detailRes = selectResponse.getList().get(0); - if (detailRes.getStatus() == 5) { +// if (detailRes.getStatus() == 5) { +// SendResult failedResult = this.sender.buildFailureResult( +// message, +// detailRes.getStat(), +// null, +// "THIRD_PARTY_CALLBACK_FAILED", +// "短信回执返回失败", +// false +// ); +// this.sender.applyCallbackResult(message, failedResult); +// } else if (detailRes.getStatus() == 4) { +// // 回执确认成功后,复用统一成功落库逻辑。 +// this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId())); +// } + double random = Math.random(); + System.out.println(random + " aaaa"); + if (random > 0.5) { SendResult failedResult = this.sender.buildFailureResult( message, detailRes.getStat(), null, "THIRD_PARTY_CALLBACK_FAILED", "短信回执返回失败", - false + true ); this.sender.applyCallbackResult(message, failedResult); - } else if (detailRes.getStatus() == 4) { + } else { // 回执确认成功后,复用统一成功落库逻辑。 - this.sender.applyCallbackResult( - message, - SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()) - ); + this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId())); } } catch (Exception e) { log.error("电信短信回执查询失败", e); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java index 3a13659..1613cc9 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java @@ -57,6 +57,6 @@ public class BlacklistController { @PreAuthorize("@ss.hasPermission('push:blacklist:delete')") @Parameter(name = "ids", description = "id列表", required = true) public CommonResult delete(@RequestParam("ids") List ids) { - return success(blacklistService.delete(ids)); + return success(blacklistService.removeByIds(ids)); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java index 5e595b1..5b612e8 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java @@ -1,5 +1,6 @@ package com.njcn.msgpush.module.push.controller.admin.message; +import cn.hutool.core.bean.BeanUtil; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent; @@ -56,7 +57,8 @@ public class MessageRecordController { @Operation(summary = "更新消息记录") @PreAuthorize("@ss.hasPermission('push:message:update')") public CommonResult update(@Validated @RequestBody MessageRecordReqVO reqVO) { - return success(messageRecordService.update(reqVO)); + MessageRecordDO messageRecordDO = BeanUtil.copyProperties(reqVO, MessageRecordDO.class); + return success(messageRecordService.updateById(messageRecordDO)); } @PostMapping("/delete") @@ -64,6 +66,6 @@ public class MessageRecordController { @PreAuthorize("@ss.hasPermission('push:message:delete')") @Parameter(name = "ids", description = "编号", required = true) public CommonResult delete(@RequestParam("ids") List ids) { - return success(messageRecordService.delete(ids)); + return success(messageRecordService.removeByIds(ids)); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java index 3b67b24..f9737e3 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java @@ -5,6 +5,7 @@ import com.njcn.msgpush.framework.common.pojo.PageParam; import com.njcn.msgpush.framework.common.validation.InEnum; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Pattern; import lombok.Data; @@ -18,10 +19,6 @@ public class MessageRecordReqVO extends PageParam { @Schema(description = "主键ID") private Long id; - @Schema(description = "消息唯一ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "123456") - @NotBlank(message = "消息唯一ID不能为空") - private String messageId; - @Schema(description = "应用名称/来源系统标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "NPQS-9500") @NotBlank(message = "应用名称/来源系统标识不能为空") private String appName; diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java index a6c05c3..1f4b026 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java @@ -8,7 +8,7 @@ import lombok.Data; */ @Data public class MessageSendResultVO { - private String messageId; + private Long messageId; /** * 发送结果 */ diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java index b986c42..9a6de9a 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java @@ -60,6 +60,6 @@ public class RateLimitConfigController { @PreAuthorize("@ss.hasPermission('push:ratelimit:delete')") @Parameter(name = "ids", description = "id列表", required = true) public CommonResult delete(@RequestParam List ids) { - return success(rateLimitConfigService.delete(ids)); + return success(rateLimitConfigService.removeByIds(ids)); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java index 6252046..992b48c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java @@ -14,7 +14,7 @@ import lombok.Data; @Schema(description = "管理后台 - 消息重试 Request VO") public class MessageRetryQueueReqVO extends PageParam { @Schema(description = "消息ID") - private String messageId; + private Long messageId; @Schema(description = "渠道类型", example = "sms/email/app_push") @NotBlank(message = "渠道类型不能为空") diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/message/MessageRecordDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/message/MessageRecordDO.java index 57a1468..aa15938 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/message/MessageRecordDO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/message/MessageRecordDO.java @@ -21,11 +21,6 @@ public class MessageRecordDO extends BaseDO { */ private Long id; - /** - * 消息唯一ID - */ - private String messageId; - /** * 应用名称/来源系统标识 */ diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryHistoryDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryHistoryDO.java index c08820a..a3a1230 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryHistoryDO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryHistoryDO.java @@ -20,7 +20,7 @@ public class MessageRetryHistoryDO extends BaseDO { /** * 关联 message_record 的 message_id */ - private String messageId; + private Long messageId; /** * 重试序号:0=首次发送,1=第1次重试,2=第2次重试 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java index c7687b6..08b4e08 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java @@ -24,7 +24,7 @@ public class MessageRetryQueueDO extends BaseDO { /** * 关联message_record的message_id */ - private String messageId; + private Long messageId; /** * 渠道类型 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java index edec7b7..1033b95 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java @@ -9,5 +9,5 @@ import java.time.LocalDateTime; @Mapper public interface MessageRecordMapper extends BaseMapperX { - int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime); + int updateStatusAndErrorInfo(@Param("id") Long id, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml index 2104003..ed2bf63 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml @@ -10,6 +10,6 @@ error_msg = #{errorMsg}, last_retry_time= #{lastRetryTime}, next_retry_time= #{nextRetryTime} - WHERE message_id = #{messageId} + WHERE id = #{id} \ No newline at end of file diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml index cfadc63..1bdae88 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml @@ -5,7 +5,8 @@ diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageConfirmRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageConfirmRedisDAO.java new file mode 100644 index 0000000..ed1e167 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageConfirmRedisDAO.java @@ -0,0 +1,32 @@ +package com.njcn.msgpush.module.push.dal.redis; + +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * @author caozehui + * @data 2026-04-03 + */ +@Component +@RequiredArgsConstructor +public class MessageConfirmRedisDAO { + private final RedisTemplate redisTemplate; + + private static final String CONFIRM_QUEUE_KEY_PREFIX = "msPush:confirm_queue:"; + + private String getConfirmQueueKey(Long messageId) { + return CONFIRM_QUEUE_KEY_PREFIX + messageId; + } + + public boolean addToConfirmQueue(MessageRecordDO message) { + String key = getConfirmQueueKey(message.getId()); + return redisTemplate.opsForValue().setIfAbsent(key, ""); + } + + public boolean removeFromConfirmQueue(MessageRecordDO message) { + String key = getConfirmQueueKey(message.getId()); + return redisTemplate.delete(key); + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java index a8d4578..b5578d0 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/MessageRetryRedisDAO.java @@ -3,12 +3,15 @@ package com.njcn.msgpush.module.push.dal.redis; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.time.ZoneId; +import java.util.Arrays; import java.util.Collections; -import java.util.Set; +import java.util.List; /** * {@link com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO} RedisDAO @@ -25,7 +28,7 @@ public class MessageRetryRedisDAO { /** * Redis中消息重试队列的Key前缀 */ - private static final String RETRY_QUEUE_KEY_PREFIX = "msPpush:retry_queue:"; + private static final String RETRY_QUEUE_KEY_PREFIX = "msPush:retry_queue:"; /** * 获取指定渠道的重试队列Key @@ -34,6 +37,51 @@ public class MessageRetryRedisDAO { return RETRY_QUEUE_KEY_PREFIX + channel; } + /** + * 原子认领并移除到期消息(Lua 脚本保证原子性) + * 适用于单实例场景,直接删除不再单独维护认领状态 + * + * @param channel 渠道类型 + * @param currentTime 当前时间戳 + * @param limit 限制数量 + * @return 认领成功的消息 ID 列表 + */ + public List claimAndRemoveDueMessages(String channel, long currentTime, int limit) { + String key = getRetryQueueKey(channel); + + String luaScript = + "local key = KEYS[1] " + + "local currentTime = tonumber(ARGV[1]) " + + "local limit = tonumber(ARGV[2]) " + + "local dueMessages = redis.call('ZRANGEBYSCORE', key, 0, currentTime, 'LIMIT', 0, limit) " + + "if #dueMessages == 0 then return {} end " + + "for _, messageId in ipairs(dueMessages) do " + + " redis.call('ZREM', key, messageId) " + + "end " + + "return dueMessages"; + + DefaultRedisScript redisScript = new DefaultRedisScript<>(); + redisScript.setScriptText(luaScript); + redisScript.setResultType(List.class); + + try { + List claimedMessageIds = redisTemplate.execute( + redisScript, + Arrays.asList(key), + String.valueOf(currentTime), + String.valueOf(limit) + ); + + log.debug("原子认领并移除到期消息成功:channel={}, 认领数量={}", channel, + claimedMessageIds != null ? claimedMessageIds.size() : 0); + + return claimedMessageIds != null ? claimedMessageIds : Collections.emptyList(); + } catch (Exception e) { + return Collections.emptyList(); + } + } + + /** * 添加消息到重试队列 * @@ -43,17 +91,7 @@ public class MessageRetryRedisDAO { String key = getRetryQueueKey(message.getChannel()); double score = message.getNextRetryTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - try { - redisTemplate.opsForZSet().add(key, message.getMessageId(), score); - log.debug("添加消息到重试队列成功: channel={}, messageId={}, nextRetryTime={}", - message.getChannel(), message.getMessageId(), message.getNextRetryTime()); - return true; - } catch (Exception e) { - log.error("添加消息到重试队列失败: channel={}, messageId={}", message.getChannel(), message.getMessageId(), e); - log.warn("Redis 入队失败,当前仅保留数据库重试记录等待后续恢复: channel={}, messageId={}", - message.getChannel(), message.getMessageId()); - return false; - } + return redisTemplate.opsForZSet().add(key, message.getId() + "", score); } /** @@ -62,55 +100,13 @@ public class MessageRetryRedisDAO { * @param channel 渠道类型 * @param messageId 消息ID */ - public boolean removeFromRetryQueue(String channel, String messageId) { + public boolean removeFromRetryQueue(String channel, Long messageId) { String key = getRetryQueueKey(channel); - try { - redisTemplate.opsForZSet().remove(key, messageId); - log.debug("从重试队列移除消息成功: channel={}, messageId={}", channel, messageId); - return true; - } catch (Exception e) { - log.error("从重试队列移除消息失败: channel={}, messageId={}", channel, messageId, e); - log.warn("Redis 出队失败,当前消息可能存在待清理的 Redis 脏数据: channel={}, messageId={}", channel, messageId); - return false; - } + redisTemplate.opsForZSet().remove(key, messageId); + return true; } - /** - * 获取需要重试的消息ID集合 - * - * @param channel 渠道类型 - * @param currentTime 当前时间戳 - * @param limit 限制数量 - * @return 消息ID集合 - */ - public Set getNeedRetryMessageIds(String channel, long currentTime, int limit) { - String key = getRetryQueueKey(channel); - - try { - return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime, 0, limit); - } catch (Exception e) { - log.error("获取需要重试的消息ID失败: channel={}", channel, e); - return Collections.emptySet(); - } - } - - /** - * 获取队列大小 - * - * @param channel 渠道类型 - * @return 队列大小 - */ - public Long getQueueSize(String channel) { - String key = getRetryQueueKey(channel); - - try { - return redisTemplate.opsForZSet().size(key); - } catch (Exception e) { - log.error("获取队列大小失败: channel={}", channel, e); - return 0L; - } - } /** * 清空指定渠道的重试队列 @@ -119,12 +115,59 @@ public class MessageRetryRedisDAO { */ public void clearRetryQueue(String channel) { String key = getRetryQueueKey(channel); + redisTemplate.delete(key); + } + /** + * 检查消息是否在重试队列中 + * + * @param channel 渠道类型 + * @param messageId 消息 ID + * @return 是否存在 + */ + public boolean existsInRetryQueue(String channel, Long messageId) { + String key = getRetryQueueKey(channel); try { - redisTemplate.delete(key); - log.info("清空重试队列成功: channel={}", channel); + Double score = redisTemplate.opsForZSet().score(key, messageId + ""); + return score != null; } catch (Exception e) { - log.error("清空重试队列失败: channel={}", channel, e); + return false; + } + } + + /** + * 批量添加消息到重试队列 + * + * @param messages 消息列表 + * @return 成功添加的数量 + */ + public int batchAddToRetryQueue(List messages) { + if (messages == null || messages.isEmpty()) { + return 0; + } + + int successCount = 0; + for (MessageRecordDO message : messages) { + if (addToRetryQueue(message)) { + successCount++; + } + } + return successCount; + } + + /** + * 检查 Redis 是否可用 + * + * @return Redis 是否可用 + */ + public boolean isRedisAvailable() { + try { + String pong = redisTemplate.execute((RedisCallback) connection -> + new String(connection.ping())); + return "PONG".equals(pong); + } catch (Exception e) { + log.error("Redis 不可用", e); + return false; } } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java index 2790332..5cab0d0 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java @@ -32,7 +32,7 @@ public class MessageRetryJob { /** * 定时处理邮件重试队列(每10秒执行一次) */ - @Scheduled(fixedRate = 10000) + //@Scheduled(fixedRate = 10000) public void processEmailRetryQueue() { messageRetryQueueService.processRetryBatch("email"); } @@ -40,8 +40,9 @@ public class MessageRetryJob { /** * 定时处理APP推送重试队列(每10秒执行一次) */ - @Scheduled(fixedRate = 10000) + //@Scheduled(fixedRate = 10000) public void processAppPushRetryQueue() { messageRetryQueueService.processRetryBatch("app_push"); } + } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java index f39ada8..b4f2e1f 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java @@ -16,7 +16,6 @@ import java.util.List; public interface BlacklistService extends IService { Page getPage(BlacklistReqVO reqVO); - Boolean delete(List ids); /** * 进行黑名单检查 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java index f232f22..95562fb 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java @@ -7,13 +7,11 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.util.object.PageUtils; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; -import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper; import org.springframework.stereotype.Service; -import java.util.ArrayList; import java.util.List; /** @@ -29,10 +27,6 @@ public class BlacklistServiceImpl extends ServiceImpl(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); } - @Override - public Boolean delete(List ids) { - return this.lambdaUpdate().in(BlacklistDO::getId, ids).set(BlacklistDO::getDeleted, true).update(); - } @Override public boolean check(MessageRecordDO messageRecordDO) { @@ -40,8 +34,7 @@ public class BlacklistServiceImpl extends ServiceImpl 0, BlacklistDO::getTarget, split) - .eq(BlacklistDO::getDeleted, false).one(); + .in(split.length > 0, BlacklistDO::getTarget, split).one(); if (ObjectUtil.isNotNull(blacklistDO)) { blacklistDO.setHitCount(blacklistDO.getHitCount() + 1); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java index b2a00f1..f3a8331 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java @@ -1,6 +1,7 @@ package com.njcn.msgpush.module.push.service.message; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.IService; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; @@ -10,7 +11,7 @@ import java.time.LocalDateTime; import java.util.Collection; import java.util.List; -public interface MessageRecordService { +public interface MessageRecordService extends IService { /** * 发送消息(包括email、sms、app_push) @@ -36,23 +37,14 @@ public interface MessageRecordService { */ boolean add(MessageRecordReqVO messageRecordSendReqVO); - MessageRecordDO getById(String messageId); - - List listByIds(Collection ids); Page getPage(MessageRecordReqVO reqVO); - boolean update(MessageRecordReqVO reqVO); - - boolean delete(List ids); - /** * 更新消息记录重试次数 * - * @param messageId + * @param id * @return */ - boolean updateRetryCount(String messageId); - - boolean updateMessage(MessageRecordDO messageRecordDO); + boolean updateRetryCount(Long id); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java index 828b8ab..ae58c8d 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java @@ -1,7 +1,6 @@ package com.njcn.msgpush.module.push.service.message; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -20,6 +19,7 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO; import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper; +import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO; import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO; import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; @@ -29,11 +29,10 @@ import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; -import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,50 +65,73 @@ public class MessageRecordServiceImpl extends ServiceImpl send(List reqVOList) { - List messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class); - messageRecordDOList.forEach(messageRecordDO -> messageRecordDO.setStatus(MsgStatusConstant.PENDING)); - this.saveBatch(messageRecordDOList); + List messageRecordDOList = this.createMessageRecords(reqVOList); return this.processSendMsg(messageRecordDOList); } + @Transactional(rollbackFor = Exception.class) + public List createMessageRecords(List reqVOList) { + List messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class); + messageRecordDOList.forEach(messageRecordDO -> { + messageRecordDO.setStatus(MsgStatusConstant.PENDING); +// messageRecordDO.setRetryCount(0); + }); + this.saveBatch(messageRecordDOList); + return messageRecordDOList; + } + /** * 发送主流程。 * 这里统一接收 sender 返回的 SendResult,再决定消息状态、重试队列和服务商健康度如何变化。 */ @Override - @Transactional(rollbackFor = Exception.class) public List processSendMsg(List messageRecordDOList) { msgPushGuardChain.checkAll(messageRecordDOList); List resultList = new ArrayList<>(); for (MessageRecordDO messageRecordDO : messageRecordDOList) { - MessageSendResultVO resultVO = new MessageSendResultVO(); - resultVO.setMessageId(messageRecordDO.getMessageId()); - - if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) { - resultVO.setResult(false); - resultVO.setDetail(this.buildBlockedDetail(messageRecordDO)); - this.updateMessage(messageRecordDO); + try { + MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO); resultList.add(resultVO); - continue; + } catch (Exception e) { + MessageSendResultVO failedVO = new MessageSendResultVO(); + failedVO.setMessageId(messageRecordDO.getId()); + failedVO.setResult(false); + failedVO.setDetail("消息处理异常:" + e.getMessage()); + resultList.add(failedVO); } - - ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService - .getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); - MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType()); - SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory); - this.applySendResult(messageRecordDO, sendResult); - this.recordRetryHistory(messageRecordDO); - - resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome())); - resultVO.setDetail(this.buildResultDetail(sendResult)); - resultList.add(resultVO); } return resultList; } + private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO) { + MessageSendResultVO resultVO = new MessageSendResultVO(); + resultVO.setMessageId(messageRecordDO.getId()); + + if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) { + resultVO.setResult(false); + resultVO.setDetail(this.buildBlockedDetail(messageRecordDO)); + this.updateById(messageRecordDO); + return resultVO; + } + + ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService + .getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); + MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType()); + SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory); + this.applySendResult(messageRecordDO, sendResult); + this.recordRetryHistory(messageRecordDO); + + // 在这里修改接口返回的是投递结果,还是调用接口结果 + resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome())); + resultVO.setDetail(this.buildResultDetail(sendResult)); + return resultVO; + } + private SendResult sendMessage(MessageRecordDO messageRecordDO, ChannelProviderConfigDO channelProviderConfigDO, MessageProviderFactory messageProviderFactory) { @@ -137,7 +159,8 @@ public class MessageRecordServiceImpl extends ServiceImpl listByIds(Collection ids) { - return this.lambdaQuery().in(CollectionUtil.isNotEmpty(ids), MessageRecordDO::getMessageId, ids).list(); - } - @Override public Page getPage(MessageRecordReqVO reqVO) { QueryWrapper wrapper = new QueryWrapper<>(); @@ -256,15 +261,4 @@ public class MessageRecordServiceImpl extends ServiceImpl(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); } - - @Override - public boolean update(MessageRecordReqVO reqVO) { - MessageRecordDO messageRecordDO = BeanUtil.copyProperties(reqVO, MessageRecordDO.class); - return this.updateById(messageRecordDO); - } - - @Override - public boolean delete(List ids) { - return this.removeByIds(ids); - } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java index 1aa27b0..5ab0958 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java @@ -27,7 +27,6 @@ public class SystemQuotaConfigServiceImpl extends ServiceImpl { RateLimitConfigDO getByChannelAndAppName(String channel, String appName); - boolean delete(List ids); void check(List messageRecordList); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java index d713242..b511f04 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java @@ -39,16 +39,7 @@ public class RateLimitConfigServiceImpl extends ServiceImpl ids) { - return this.lambdaUpdate() - .set(RateLimitConfigDO::getDeleted, true) - .in(RateLimitConfigDO::getId, ids) - .update(); + .eq(RateLimitConfigDO::getEnabled, true).one(); } @Override diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryService.java index 3875552..0bcf993 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryService.java @@ -15,7 +15,7 @@ public interface MessageRetryHistoryService { * @param messageId 消息ID * @return */ - Integer getMaxRetrySequence(String messageId); + Integer getMaxRetrySequence(Long messageId); /** * 更新消息错误信息 @@ -26,5 +26,5 @@ public interface MessageRetryHistoryService { * @param errorMsg 错误信息 * @return */ - boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg); + boolean updateStatusAndErrorInfo(Long messageId, String status, String errorCode, String errorMsg); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryServiceImpl.java index 12f3f0b..8f2bc5c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryHistoryServiceImpl.java @@ -17,7 +17,7 @@ public class MessageRetryHistoryServiceImpl extends ServiceImpl messageIds); - /** * 分页查询重试队列 * @@ -40,18 +38,22 @@ public interface MessageRetryQueueService { */ PageResult getPage(MessageRetryQueueReqVO reqVO); + boolean deleteByMessageIds(List messageIds); + /** - * 更新重试信息 - * - * @param messageId 消息ID - * @param retryCount 重试次数 - * @param nextRetryTime 下次重试时间 - * @param lastRetryTime 最后重试时间 - * @param lastErrorMsg 最后错误信息 + * 应用启动时重建重试队列 */ - boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String lastErrorMsg); + void rebuildRetryQueueOnStartup(); - boolean deleteByMessageIds(List messageIds); + /** + * 定时同步数据库和 Redis 的重试队列一致性 + */ + void syncRetryQueueConsistency(); - boolean updateLastRetryTime(String messageId, LocalDateTime now); + /** + * 从数据库兜底读取待重试消息并回填 Redis + * + * @param channel 渠道类型 + */ + void fallbackToDatabase(String channel); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index 54d7690..a1af3a1 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -13,47 +13,62 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper; +import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO; import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.service.message.MessageRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; @Slf4j @Service -public class MessageRetryQueueServiceImpl extends ServiceImpl implements MessageRetryQueueService { +public class MessageRetryQueueServiceImpl extends ServiceImpl implements MessageRetryQueueService, CommandLineRunner { @Autowired private MessageRetryRedisDAO messageRetryRedisDAO; @Autowired private MessageRecordService messageRecordService; @Autowired public RetryStrategyConfigService retryStrategyConfigService; + @Autowired + private MessageConfirmRedisDAO messageConfirmRedisDAO; private static final int DEFAULT_BATCH_SIZE = 100; private static final int DEFAULT_MAX_RETRY_COUNT = 5; + private static final List CHANNELS = Arrays.asList( + ChannelTypeEnum.SMS.getCode(), + ChannelTypeEnum.EMAIL.getCode(), + ChannelTypeEnum.APP_PUSH.getCode()); + + private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false); + @Override @Transactional(rollbackFor = Exception.class) public void saveOrUpdateRetryMessage(MessageRecordDO message) { MessageRetryQueueDO existing = super.baseMapper.selectOne( - new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() - .eq(MessageRetryQueueDO::getMessageId, message.getMessageId()) + new LambdaQueryWrapper() + .eq(MessageRetryQueueDO::getMessageId, message.getId()) ); message.setLastRetryTime(message.getSendTime()); if (ObjectUtil.isNull(existing)) { MessageRetryQueueDO retryRecord = new MessageRetryQueueDO(); - retryRecord.setMessageId(message.getMessageId()); + retryRecord.setMessageId(message.getId()); retryRecord.setChannel(message.getChannel()); retryRecord.setReceiver(message.getReceiver()); + retryRecord.setRetryCount(1); RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel()); retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount()); @@ -65,13 +80,12 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE); - if (needRetryMessageIds.isEmpty()) { - return; + // 使用原子认领机制,一次性完成读取 + 删除 + List claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE); + + if (CollUtil.isEmpty(claimedMessageIds)) { + log.debug("Redis 中没有待重试消息:channel={}", channel); + fallbackToDatabase(channel); + + epochMilli = System.currentTimeMillis(); + claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE); + + if (CollUtil.isEmpty(claimedMessageIds)) { + return; + } } - List messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds); + List messageRecordDOList = messageRecordService.listByIds(claimedMessageIds); for (MessageRecordDO messageRecordDO : messageRecordDOList) { processSingleRetry(messageRecordDO); } } + /** + * 处理单条消息重试,根据执行结果决定是否重新入队 + * 成功:不再回 Redis,并逻辑删除数据库队列记录 + * 失败且继续重试:重新计算 nextRetryTime,重新写入 Redis 和数据库队列表 + * 失败且终止:不再回 Redis,并逻辑删除数据库队列记录 + * + * @param messageRecordDO 消息记录 + */ + private void processSingleRetry(MessageRecordDO messageRecordDO) { + messageRecordDO.setStatus(MsgStatusConstant.PENDING); + messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0); + messageRecordService.updateRetryCount(messageRecordDO.getId()); + } + + /** + * 删除重试记录(数据库逻辑删除 + Redis 已提前删除) + */ + private void deleteRetryRecord(MessageRecordDO message) { + this.deleteByMessageIds(Collections.singletonList(message.getId())); + messageRecordService.updateById(message); + } + @Override @Transactional(rollbackFor = Exception.class) public void manualRetry(List messageIds) { @@ -169,7 +216,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl getPage(MessageRetryQueueReqVO reqVO) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(StrUtil.isNotBlank(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId()) + .eq(ObjectUtil.isNotNull(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId()) .eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRetryQueueDO::getChannel, reqVO.getChannel()) .eq(StrUtil.isNotBlank(reqVO.getReceiver()), MessageRetryQueueDO::getReceiver, reqVO.getReceiver()) .ge(ObjectUtil.isNotNull(reqVO.getMinRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMinRetryCount()) @@ -180,50 +227,137 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl messageIds) { + if (CollUtil.isEmpty(messageIds)) { + return false; + } + + return this.remove(new LambdaQueryWrapper() + .in(MessageRetryQueueDO::getMessageId, messageIds)); + } + + + @Override + public void run(String... args) throws Exception { + rebuildRetryQueueOnStartup(); + startupRebuildCompleted.set(true); } @Override - public boolean deleteByMessageIds(List messageIds) { - return this.lambdaUpdate() - .set(MessageRetryQueueDO::getDeleted, true) - .in(CollUtil.isNotEmpty(messageIds), MessageRetryQueueDO::getMessageId, messageIds) - .update(); + @Transactional(rollbackFor = Exception.class) + public void rebuildRetryQueueOnStartup() { + log.info("========== 开始重建重试队列 =========="); + + for (String channel : CHANNELS) { + try { + log.info("正在清空渠道 [{}] 的 Redis 重试队列", channel); + messageRetryRedisDAO.clearRetryQueue(channel); + + log.info("正在从数据库加载渠道 [{}] 的待重试消息", channel); + List dbRecords = this.lambdaQuery() + .eq(MessageRetryQueueDO::getChannel, channel) + .isNotNull(MessageRetryQueueDO::getNextRetryTime) + .list(); + + if (CollUtil.isEmpty(dbRecords)) { + log.info("渠道 [{}] 没有待重试的消息", channel); + continue; + } + + List messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); + List messages = messageRecordService.listByIds(messageIds); + + if (CollUtil.isEmpty(messages)) { + log.warn("渠道 [{}] 的消息记录不存在", channel); + continue; + } + + int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages); + log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", + channel, dbRecords.size(), successCount); + + } catch (Exception e) { + log.error("重建渠道 [{}] 的重试队列失败", channel, e); + } + } + + log.info("========== 重试队列重建完成 =========="); + } + + @Scheduled(fixedRate = 600000) + @Override + public void syncRetryQueueConsistency() { + if (!startupRebuildCompleted.get()) { + log.debug("启动重建未完成,跳过本次同步"); + return; + } + log.info("开始同步redis与数据库重试队列一致性"); + + for (String channel : CHANNELS) { + List dbRecords = this.lambdaQuery() + .eq(MessageRetryQueueDO::getChannel, channel) + .isNotNull(MessageRetryQueueDO::getNextRetryTime) + .list(); + + if (CollUtil.isEmpty(dbRecords)) { + continue; + } + + for (MessageRetryQueueDO dbRecord : dbRecords) { + boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId()); + + if (!existsInRedis) { + Long messageId = dbRecord.getMessageId(); + MessageRecordDO message = messageRecordService.getById(messageId); + + if (message != null && message.getNextRetryTime() != null) { + boolean added = messageRetryRedisDAO.addToRetryQueue(message); + if (added) { + log.debug("补回消息到 Redis: channel={}, messageId={}", channel, messageId); + } else { + log.warn("补回消息到 Redis 失败:channel={}, messageId={}", channel, messageId); + } + } else { + log.warn("消息记录不存在或 next_retry_time 为空:channel={}, messageId={}", channel, messageId); + } + } + } + } } @Override - public boolean updateLastRetryTime(String messageId, LocalDateTime now) { - return this.lambdaUpdate() - .eq(MessageRetryQueueDO::getMessageId, messageId) - .set(MessageRetryQueueDO::getLastRetryTime, now) - .update(); + public void fallbackToDatabase(String channel) { + boolean redisAvailable = messageRetryRedisDAO.isRedisAvailable(); + if (!redisAvailable) { + log.warn("Redis 不可用,channel={}", channel); + return; + } + + fallbackToDatabaseDirectly(channel); } - private void processSingleRetry(MessageRecordDO messageRecordDO) { - messageRecordDO.setStatus(MsgStatusConstant.PENDING); - MessageSendResultVO messageSendResultVO = messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0); - messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); - if (messageSendResultVO.getResult()) { - log.info("处理消息重试成功: messageId={}", messageRecordDO.getMessageId()); - messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); - this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); + /** + * 从数据库补拉 + * + * @param channel + */ + private void fallbackToDatabaseDirectly(String channel) { + List dbRecords = baseMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE); + + if (CollUtil.isEmpty(dbRecords)) { return; } - if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) { - log.warn("处理消息重试失败,已保留在重试队列: messageId={}", messageRecordDO.getMessageId()); + List messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); + List messages = messageRecordService.listByIds(messageIds); + + if (CollUtil.isEmpty(messages)) { return; } - log.error("处理消息重试失败且不再重试: messageId={}", messageRecordDO.getMessageId()); - messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); - this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); + if (messageRetryRedisDAO.isRedisAvailable()) { + messageRetryRedisDAO.batchAddToRetryQueue(messages); + } } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java index 9172493..1bd4725 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java @@ -18,7 +18,6 @@ public class RetryStrategyConfigServiceImpl extends ServiceImpl listAll() { return this.lambdaQuery() - .eq(RetryStrategyConfigDO::getDeleted, false) .eq(RetryStrategyConfigDO::getEnabled, true) .list(); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/resources/application.yaml b/msgpush-module-push/msgpush-module-push-server/src/main/resources/application.yaml index 0cb6755..8199441 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/resources/application.yaml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/resources/application.yaml @@ -59,6 +59,7 @@ mybatis-plus: mapper-locations: classpath*:com/njcn/msgpush/**/mapping/*.xml configuration: map-underscore-to-camel-case: true # 虽然默认为 true ,但是还是显示去指定下。 + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl global-config: db-config: id-type: ASSIGN_ID # 分配 ID,默认使用雪花算法 diff --git a/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java b/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java index 9231093..34deb0d 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java +++ b/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java @@ -59,7 +59,7 @@ public class MsgPushClientTest { // } for (int i = 0; i < 1; i++) { MessageRecordReqVO message = new MessageRecordReqVO(); - message.setMessageId(String.valueOf(UUID.randomUUID())); + message.setId(1234567890L); message.setAppName("NPQS-9000"); message.setChannel(ChannelTypeEnum.SMS.getMsg()); message.setReceiver("18839431215"); diff --git a/msgpush-module-system/msgpush-module-system-boot/pom.xml b/msgpush-module-system/msgpush-module-system-boot/pom.xml index de6632d..d5987af 100644 --- a/msgpush-module-system/msgpush-module-system-boot/pom.xml +++ b/msgpush-module-system/msgpush-module-system-boot/pom.xml @@ -27,12 +27,11 @@ com.njcn - msgpush-system-api + msgpush-module-system-api ${revision} - com.njcn msgpush-spring-boot-starter-biz-ip @@ -80,7 +79,6 @@ - com.njcn