From 5feae9e40387070c2f932c9b066fc536ec390d29 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Mon, 16 Mar 2026 10:40:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=91=E9=80=81=E9=A2=91=E7=8E=87=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=95=B0=E6=8D=AE=E8=AE=BF=E9=97=AE=E5=B1=82=E3=80=81?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../msgpush/module/push/checker/IChecker.java | 4 +- .../push/checker/MsgPushGuardChain.java | 15 +-- .../push/checker/impl/BlacklistChecker.java | 13 ++- .../push/checker/impl/QuotaChecker.java | 6 +- .../push/checker/impl/RateLimitChecker.java | 6 +- .../module/push/client/sender/Sender.java | 4 + .../client/sender/impl/AliyunEmailSender.java | 6 +- .../client/sender/impl/AliyunSmsSender.java | 24 ++-- .../client/sender/impl/TelecomSmsSender.java | 41 ++----- .../admin/blacklist/BlacklistController.java | 19 ++-- .../admin/blacklist/vo/BlacklistReqVO.java | 21 ---- .../ChannelProviderConfigController.java | 9 +- .../vo/ChannelProviderConfigReqVO.java | 15 ++- .../message/MessageRecordController.java | 19 ++-- .../admin/message/vo/MessageRecordReqVO.java | 8 +- .../ratelimit/RateLimitConfigController.java | 65 +++++++++++ .../ratelimit/VO/RateLimitConfigReqVO.java | 37 ++++++ .../retry/MessageRetryQueueController.java | 6 +- ...gVO.java => RetryStrategyConfigReqVO.java} | 22 ++-- .../ratelimit/RateLimitConfigDO.java | 38 +++++++ .../mysql/message/MessageRecordMapper.java | 4 +- .../message/mapping/MessageRecordMapper.xml | 8 +- .../ratelimit/RateLimitConfigMapper.java | 13 +++ .../service/blacklist/BlacklistService.java | 12 ++ .../blacklist/BlacklistServiceImpl.java | 34 +++++- .../service/message/MessageRecordService.java | 16 +-- .../message/MessageRecordServiceImpl.java | 15 +-- .../ratelimit/RateLimitConfigService.java | 19 ++++ .../ratelimit/RateLimitConfigServiceImpl.java | 35 ++++++ .../retry/MessageRetryQueueServiceImpl.java | 107 ++++++++++-------- .../retry/RetryStrategyConfigService.java | 9 +- .../retry/RetryStrategyConfigServiceImpl.java | 6 +- 32 files changed, 437 insertions(+), 219 deletions(-) create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/VO/RateLimitConfigReqVO.java rename msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/{RetryStrategyConfigVO.java => RetryStrategyConfigReqVO.java} (65%) create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/ratelimit/RateLimitConfigDO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/ratelimit/RateLimitConfigMapper.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java index eda2720..1099fc1 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java @@ -2,6 +2,8 @@ package com.njcn.msgpush.module.push.checker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import java.util.List; + /** * @author caozehui * @data 2026-02-27 @@ -9,5 +11,5 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq */ public interface IChecker { - boolean check(MessageRecordReqVO reqVO); + void check(List messageRecordReqVOList); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java index f9c22cd..1fad7b7 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java @@ -23,20 +23,9 @@ public class MsgPushGuardChain { this.checkers.add(new RateLimitChecker()); } - public boolean checkAll(MessageRecordReqVO reqVO) { + public void checkAll(List messageRecordReqVOList) { for (IChecker checker : checkers) { - boolean result = checker.check(reqVO); - if (!result) { - // 任何一层检查失败,立即返回拒绝 - logRejection(reqVO); - return result; - } + checker.check(messageRecordReqVOList); } - return true; - } - - private void logRejection(MessageRecordReqVO reqVO) { - // 记录拒绝日志,用于监控和分析 - System.out.printf("消息请求被拒绝: receiver=%s, messageId=%s, reason=%s%n", reqVO.getReceiver(), reqVO.getMessageId()); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java index d81b7f0..28024f4 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java @@ -2,15 +2,24 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.service.blacklist.BlacklistService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; /** * @author caozehui * @data 2026-02-27 * @description 黑名单检查器 */ +@Component public class BlacklistChecker implements IChecker { + @Autowired + private BlacklistService blacklistService; + @Override - public boolean check(MessageRecordReqVO reqVO) { - return true; + public void check(List messageRecordReqVOList) { + blacklistService.check(messageRecordReqVOList); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java index 553431f..2b5ea57 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java @@ -3,6 +3,8 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import java.util.List; + /** * @author caozehui * @data 2026-02-27 @@ -10,7 +12,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq */ public class QuotaChecker implements IChecker { @Override - public boolean check(MessageRecordReqVO reqVO) { - return true; + public void check(List messageRecordReqVOList) { + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java index 2dc986e..c31398c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java @@ -4,6 +4,8 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import java.util.List; + /** * @author caozehui * @data 2026-02-27 @@ -11,7 +13,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq */ public class RateLimitChecker implements IChecker { @Override - public boolean check(MessageRecordReqVO reqVO) { - return true; + public void check(List messageRecordReqVOList) { + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java index 7282baf..feafcd5 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/Sender.java @@ -9,6 +9,8 @@ import com.njcn.msgpush.module.push.util.RestTemplateUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -47,4 +49,6 @@ public class Sender { new ThreadPoolExecutor.CallerRunsPolicy() ); + public final ScheduledExecutorService MSG_CALLBACK_THREAD_POOL_SCHEDULER = Executors.newScheduledThreadPool(5); + } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java index 85fe96c..40a4a90 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java @@ -66,6 +66,8 @@ public class AliyunEmailSender implements EmailSender { @Override public boolean sendEmail(MessageRecordDO message, Map params) { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { + message.setLastRetryTime(message.getSendTime()); + message.setNextRetryTime(null); message.setStatus(MessageStatusConstant.SENDING); RuntimeOptions runtimeOptions = new RuntimeOptions(); runtimeOptions.autoretry = true; @@ -92,7 +94,7 @@ public class AliyunEmailSender implements EmailSender { System.out.println(toJSONString(response)); if (HttpStatus.OK.value() == response.getStatusCode()) { message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); return true; @@ -108,7 +110,7 @@ public class AliyunEmailSender implements EmailSender { } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); return false; diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java index a735b6b..b9781bd 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java @@ -22,9 +22,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.aliyun.teautil.Common.toJSONString; @@ -42,8 +40,6 @@ public class AliyunSmsSender implements SmsSender { private Client smsClient; - private ScheduledExecutorService scheduledExecutorService; - public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) { this.sender = sender; if (ObjectUtil.isNotNull(aliYunSmsSetting)) { @@ -64,6 +60,8 @@ public class AliyunSmsSender implements SmsSender { @Override public boolean sendSms(MessageRecordDO message) { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { + message.setLastRetryTime(message.getSendTime()); + message.setNextRetryTime(null); message.setStatus(MessageStatusConstant.SENDING); RuntimeOptions runtimeOptions = new RuntimeOptions(); // 设置自动重试,默认是不开启的。重试次数默认是3次 @@ -95,7 +93,7 @@ public class AliyunSmsSender implements SmsSender { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - if(providerErrorCodeMappingDO.getShouldRetry() == 1){ + if (providerErrorCodeMappingDO.getShouldRetry() == 1) { this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } } else { @@ -140,10 +138,7 @@ public class AliyunSmsSender implements SmsSender { * @param message */ private void getDownInfo(String bizId, MessageRecordDO message) { - if (ObjectUtil.isNull(this.scheduledExecutorService)) { - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); - } - this.scheduledExecutorService.schedule(() -> { + this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> { QuerySendDetailsRequest request = new QuerySendDetailsRequest() .setPhoneNumber(message.getReceiver()) .setBizId(bizId) @@ -156,22 +151,23 @@ public class AliyunSmsSender implements SmsSender { // if (response.statusCode != HttpStatus.OK.value()) { response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> { if (!"DELIVERED".equals(detail.errCode)) { + message.setStatus(MessageStatusConstant.FAILED); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode); message.setErrorCode(detail.errCode); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - if(providerErrorCodeMappingDO.getShouldRetry() == 1){ + if (providerErrorCodeMappingDO.getShouldRetry() == 1) { this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } else { message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null, message.getLastRetryTime(), null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); } @@ -180,8 +176,6 @@ public class AliyunSmsSender implements SmsSender { } catch (Exception e) { throw new RuntimeException(e); } - this.scheduledExecutorService.shutdown(); - this.scheduledExecutorService = null; - }, 10, TimeUnit.SECONDS); + }, 20, TimeUnit.SECONDS); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java index e6af233..15841eb 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java @@ -22,9 +22,7 @@ import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.alibaba.fastjson.JSON.toJSON; @@ -43,8 +41,6 @@ public class TelecomSmsSender implements SmsSender { private TelecomSmsSetting telecomSmsSetting; private Sender sender; - private ScheduledExecutorService scheduledExecutorService; - @Data private static class TelecomSmsSendResponse { private String status; @@ -87,6 +83,8 @@ public class TelecomSmsSender implements SmsSender { @Override public boolean sendSms(MessageRecordDO message) { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { + message.setLastRetryTime(message.getSendTime()); + message.setNextRetryTime(null); message.setStatus(MessageStatusConstant.SENDING); // 构建请求参数 Map request = new HashMap<>(); @@ -143,7 +141,7 @@ public class TelecomSmsSender implements SmsSender { message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - if(providerErrorCodeMappingDO.getShouldRetry() == 1){ + if (providerErrorCodeMappingDO.getShouldRetry() == 1) { this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } } else { @@ -239,10 +237,7 @@ public class TelecomSmsSender implements SmsSender { * @param message */ private void getDownInfo(String mid, MessageRecordDO message) { - if (ObjectUtil.isNull(this.scheduledExecutorService)) { - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); - } - this.scheduledExecutorService.schedule(() -> { + this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> { // 构建请求参数 Map request = new HashMap<>(); request.put("action", "select"); @@ -266,48 +261,34 @@ public class TelecomSmsSender implements SmsSender { ); System.out.println(JSON.toJSONString(JSON.toJSONString(response))); -// if (response.getStatusCode() == HttpStatus.OK) { TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class); TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0); -// if (StrUtil.isBlank(message.getErrorMsg())) { -// message.setStatus(MessageStatusConstant.FAILED); -// ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), "MA:0006"); -// message.setErrorCode(telecomSmsSelectDetailRes.getStat()); -// if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { -// message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); -// } else { -// message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); -// } -// this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); -// this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); -// } else { -// message.setStatus(MessageStatusConstant.SUCCESS); -// } + boolean b = Math.random() > 0.5; if (telecomSmsSelectDetailRes.getStatus() == 5) { message.setStatus(MessageStatusConstant.FAILED); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat()); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - if(providerErrorCodeMappingDO.getShouldRetry() == 1){ + if (providerErrorCodeMappingDO.getShouldRetry() == 1) { this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); + //this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } if (telecomSmsSelectDetailRes.getStatus() == 4) { message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); } - this.scheduledExecutorService.shutdown(); - this.scheduledExecutorService = null; - }, 10, TimeUnit.SECONDS); + + }, 20, TimeUnit.SECONDS); } /** diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java index 04d8c1c..3a13659 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/BlacklistController.java @@ -8,13 +8,11 @@ import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; import com.njcn.msgpush.module.push.service.blacklist.BlacklistService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.List; @@ -24,7 +22,10 @@ import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; * @author caozehui * @data 2026-03-12 */ -@RestController("/push/blacklist") +@Tag(name = "管理后台 - 黑名单") +@Validated +@RestController +@RequestMapping("/push/blacklist") public class BlacklistController { @Autowired private BlacklistService blacklistService; @@ -41,21 +42,21 @@ public class BlacklistController { @Operation(summary = "添加黑名单") @PreAuthorize("@ss.hasPermission('push:blacklist:add')") public CommonResult add(@RequestBody BlacklistReqVO reqVO) { - return CommonResult.success(blacklistService.save(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); + return success(blacklistService.save(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); } @PostMapping("/update") @Operation(summary = "更新黑名单") @PreAuthorize("@ss.hasPermission('push:blacklist:update')") public CommonResult update(@RequestBody BlacklistReqVO reqVO) { - return CommonResult.success(blacklistService.updateById(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); + return success(blacklistService.updateById(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); } @PostMapping("/delete") @Operation(summary = "删除黑名单") @PreAuthorize("@ss.hasPermission('push:blacklist:delete')") @Parameter(name = "ids", description = "id列表", required = true) - public CommonResult delete(@RequestParam("ids") List ids){ - return CommonResult.success(blacklistService.removeByIds(ids)); + public CommonResult delete(@RequestParam("ids") List ids) { + return success(blacklistService.delete(ids)); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/vo/BlacklistReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/vo/BlacklistReqVO.java index dcbacbb..d840377 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/vo/BlacklistReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/blacklist/vo/BlacklistReqVO.java @@ -16,50 +16,29 @@ import lombok.Data; @Schema(description = "管理后台 - 黑名单 Request VO") public class BlacklistReqVO extends PageParam { - /** - * 主键 ID - */ @Schema(description = "主键 ID", example = "123444") private Long id; - /** - * 渠道类型:sms/email/app_push - */ @Schema(description = "渠道类型:sms/email/app_push", example = "sms") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") private String channel; - /** - * 黑名单目标:手机号/邮箱/设备 Token - */ @Schema(description = "黑名单目标:手机号/邮箱/设备 Token", example = "15601691000") @NotBlank(message = "黑名单目标不能为空") private String target; - /** - * 加入原因:用户投诉/无效号码/频繁退订等 - */ @Schema(description = "加入原因:用户投诉/无效号码/频繁退订等", example = "用户投诉") private String reason; - /** - * 操作来源:manual/auto/import - */ @Schema(description = "操作来源:manual/auto/import", example = "manual") @InEnum(value = com.njcn.msgpush.module.push.enums.BlacklistSourceEnum.class, message = "操作来源必须是 {value}") private String source; - /** - * 过期时间:-1=永久,时间戳=到期时间 - */ @Schema(description = "过期时间:-1=永久,时间戳=到期时间", example = "15601691000") @NotNull(message = "过期时间不能为空") @Min(value = -1L, message = "过期时间不能小于 -1") private Long expireTime; - /** - * 备注 - */ @Schema(description = "备注", example = "备注信息") private String remark; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java index 3f84d87..b251e7e 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java @@ -45,14 +45,13 @@ public class ChannelProviderConfigController { @Operation(summary = "分页查询渠道服务商列表") @PreAuthorize("@ss.hasPermission('push:channel:page')") public CommonResult> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { - Page res = channelProviderConfigService.getPage(reqVO); - return success(res); + return success(channelProviderConfigService.getPage(reqVO)); } @PostMapping("/add") @Operation(summary = "新增渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:add')") - public CommonResult addChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { + public CommonResult add(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); return success(res); } @@ -60,7 +59,7 @@ public class ChannelProviderConfigController { @PostMapping("/update") @Operation(summary = "更新渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:update')") - public CommonResult updateChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { + public CommonResult update(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); return success(res); } @@ -69,7 +68,7 @@ public class ChannelProviderConfigController { @Operation(summary = "删除渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:delete')") @Parameter(name = "ids", description = "id列表", required = true) - public CommonResult deleteChannelProvider(@RequestParam("ids") List ids) { + public CommonResult delete(@RequestParam("ids") List ids) { boolean res = channelProviderConfigService.removeBatchByIds(ids); return success(res); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/vo/ChannelProviderConfigReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/vo/ChannelProviderConfigReqVO.java index 9ac9bbf..4000d42 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/vo/ChannelProviderConfigReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/vo/ChannelProviderConfigReqVO.java @@ -1,7 +1,9 @@ package com.njcn.msgpush.module.push.controller.admin.channel.vo; import com.njcn.msgpush.framework.common.pojo.PageParam; +import com.njcn.msgpush.framework.common.validation.InEnum; import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotBlank; import lombok.Data; @@ -11,8 +13,8 @@ public class ChannelProviderConfigReqVO extends PageParam { @Schema(description = "渠道ID") private String id; - @Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push") - @NotBlank(message = "渠道类型不能为空") + @Schema(description = "渠道类型:sms/email/app_push", example = "sms") + @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") private String channel; @Schema(description = "服务商名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "中国电信/阿里云/UniPush") @@ -20,18 +22,18 @@ public class ChannelProviderConfigReqVO extends PageParam { private String providerName; @Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio/unipush") - @NotBlank(message = "服务商类型不能为空") + @InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}") private String providerType; - @Schema(description = "API地址", example = "https://api.example.com") + @Schema(description = "API地址") @NotBlank(message = "API地址不能为空") private String apiUrl; - @Schema(description = "AppKey", example = "123456") + @Schema(description = "AppKey") @NotBlank(message = "AppKey不能为空") private String appKey; - @Schema(description = "AppSecret", example = "123456") + @Schema(description = "AppSecret") @NotBlank(message = "AppSecret不能为空") private String appSecret; @@ -39,5 +41,6 @@ public class ChannelProviderConfigReqVO extends PageParam { private String extraConfig; @Schema(description = "优先级(数字越小优先级越高)", example = "1") + @Min(value = 1L, message = "优先级不能小于 1") private Integer priority; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java index 7478d2a..3f904e3 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java @@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; -import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.service.message.MessageRecordService; import io.swagger.v3.oas.annotations.Operation; @@ -12,19 +11,17 @@ import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.annotation.security.PermitAll; import jakarta.validation.Valid; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import java.util.List; -import java.util.concurrent.TimeUnit; import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; @Tag(name = "管理后台 - 消息") -@Slf4j +@Validated @RestController @RequestMapping("/push/message") public class MessageRecordController { @@ -37,37 +34,35 @@ public class MessageRecordController { @Operation(summary = "消息推送") @Idempotent(timeout = 60) public CommonResult send(@Valid @RequestBody List reqVOList) { - Boolean result = messageRecordService.send(reqVOList); - return CommonResult.success(result); + return success(messageRecordService.send(reqVOList)); } @PostMapping("/page") @Operation(summary = "分页查询渠道服务商列表") @PreAuthorize("@ss.hasPermission('push:message:page')") public CommonResult> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) { - Page res = messageRecordService.getPage(reqVO); - return success(res); + return success(messageRecordService.getPage(reqVO)); } @PostMapping("/add") @Operation(summary = "添加消息记录") @PreAuthorize("@ss.hasPermission('push:message:add')") public CommonResult add(@Validated @RequestBody MessageRecordReqVO reqVO) { - return messageRecordService.add(reqVO) ? success(true) : success(false); + return success(messageRecordService.add(reqVO)); } @PostMapping("/update") @Operation(summary = "更新消息记录") @PreAuthorize("@ss.hasPermission('push:message:update')") public CommonResult update(@Validated @RequestBody MessageRecordReqVO reqVO) { - return messageRecordService.update(reqVO) ? success(true) : success(false); + return success(messageRecordService.update(reqVO)); } @PostMapping("/delete") @Operation(summary = "删除消息记录") @PreAuthorize("@ss.hasPermission('push:message:delete')") @Parameter(name = "ids", description = "编号", required = true) - public CommonResult delete(@RequestParam("ids") List ids) { - return messageRecordService.delete(ids) ? success(true) : success(false); + public CommonResult delete(@RequestParam("ids") List ids) { + return success(messageRecordService.delete(ids)); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java index 2d15b16..3b67b24 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java @@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.controller.admin.message.vo; import cn.hutool.core.lang.RegexPool; import com.njcn.msgpush.framework.common.pojo.PageParam; +import com.njcn.msgpush.framework.common.validation.InEnum; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.Pattern; @@ -25,13 +26,12 @@ public class MessageRecordReqVO extends PageParam { @NotBlank(message = "应用名称/来源系统标识不能为空") private String appName; - @Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push") - @NotBlank(message = "渠道类型不能为空") + @Schema(description = "渠道类型:sms/email/app_push", example = "sms") + @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") private String channel; @Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify") @NotBlank(message = "消息类型不能为空") -// @InEnum(value = CommonStatusEnum.class,message = "消息类型错误") private String messageType; @Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED) @@ -52,7 +52,7 @@ public class MessageRecordReqVO extends PageParam { private String templateParams; @Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio") - @NotBlank(message = "服务商类型不能为空") + @InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}") private String providerType; @Schema(description = "第三方消息ID") diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java new file mode 100644 index 0000000..b986c42 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/RateLimitConfigController.java @@ -0,0 +1,65 @@ +package com.njcn.msgpush.module.push.controller.admin.ratelimit; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.log.Log; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.njcn.msgpush.framework.common.pojo.CommonResult; +import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO; +import com.njcn.msgpush.module.push.service.ratelimit.RateLimitConfigService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; + +/** + * @author caozehui + * @data 2026-03-13 + */ +@Tag(name = "管理后台 - 单人配额") +@Validated +@RestController +@RequestMapping("/push/ratelimit") +public class RateLimitConfigController { + @Autowired + private RateLimitConfigService rateLimitConfigService; + + @PostMapping("/page") + @Operation(summary = "分页单人配额列表") + @PreAuthorize("@ss.hasPermission('push:ratelimit:page')") + public CommonResult> pageRateLimitConfig(@Validated @RequestBody RateLimitConfigReqVO reqVO) { + Page res = rateLimitConfigService.getPage(reqVO); + return success(res); + } + + @PostMapping("/add") + @Operation(summary = "添加单人配额") + @PreAuthorize("@ss.hasPermission('push:ratelimit:add')") + public CommonResult add(@Validated @RequestBody RateLimitConfigReqVO reqVO) { + boolean res = rateLimitConfigService.save(BeanUtil.copyProperties(reqVO, RateLimitConfigDO.class)); + return success(res); + } + + @PostMapping("/update") + @Operation(summary = "更新单人配额") + @PreAuthorize("@ss.hasPermission('push:ratelimit:update')") + public CommonResult update(@Validated @RequestBody RateLimitConfigReqVO reqVO) { + boolean res = rateLimitConfigService.updateById(BeanUtil.copyProperties(reqVO, RateLimitConfigDO.class)); + return success(res); + } + + @PostMapping("/delete") + @Operation(summary = "删除单人配额") + @PreAuthorize("@ss.hasPermission('push:ratelimit:delete')") + @Parameter(name = "ids", description = "id列表", required = true) + public CommonResult delete(@RequestParam List ids) { + return success(rateLimitConfigService.delete(ids)); + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/VO/RateLimitConfigReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/VO/RateLimitConfigReqVO.java new file mode 100644 index 0000000..ed84304 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/ratelimit/VO/RateLimitConfigReqVO.java @@ -0,0 +1,37 @@ +package com.njcn.msgpush.module.push.controller.admin.ratelimit.VO; + +import com.njcn.msgpush.framework.common.pojo.PageParam; +import com.njcn.msgpush.framework.common.validation.InEnum; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-13 + */ +@Data +@Schema(description = "管理后台 - 限流配置 Request VO") +public class RateLimitConfigReqVO extends PageParam { + @Schema(description = "主键 ID") + private Long id; + + @Schema(description = "渠道类型:sms/email/app_push", example = "sms") + @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") + private String channel; + + @Schema(description = "来源系统", example = "NPQS-9000") + @NotBlank(message = "来源系统不能为空") + private String appName; + + @Schema(description = "单接收者每日上限", example = "100") + @NotNull(message = "单接收者每日上限不能为空") + @Min(value = 1L, message = "单接收者每日上限不能小于 1") + private Integer dailyLimit; + + @Schema(description = "是否启用:0-否 1-是", example = "1") + @NotNull(message = "是否启用不能为空") + private Boolean enabled; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java index f105a50..8c2ff2c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java @@ -3,7 +3,7 @@ package com.njcn.msgpush.module.push.controller.admin.retry; import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; -import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; +import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigReqVO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; @@ -11,7 +11,6 @@ import com.njcn.msgpush.module.push.service.retry.RetryStrategyConfigService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; @@ -26,7 +25,6 @@ import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; * @data 2026-02-27 */ @Tag(name = "管理后台 - 消息重试") -@Slf4j @Validated @RestController @RequestMapping("/push/retry") @@ -64,7 +62,7 @@ public class MessageRetryQueueController { @PostMapping("/config/update") @Operation(summary = "更新消息重试配置") @PreAuthorize("@ss.hasPermission('push:retry:config:update')") - public CommonResult updateRetryConfig(@Validated @RequestBody RetryStrategyConfigVO retryStrategyConfigVO) { + public CommonResult updateRetryConfig(@Validated @RequestBody RetryStrategyConfigReqVO retryStrategyConfigVO) { return success(retryStrategyConfigService.updateStrategyConfig(retryStrategyConfigVO)); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigReqVO.java similarity index 65% rename from msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java rename to msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigReqVO.java index 5f22f1a..4213874 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigReqVO.java @@ -1,6 +1,8 @@ package com.njcn.msgpush.module.push.controller.admin.retry.vo; +import com.njcn.msgpush.framework.common.validation.InEnum; import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import lombok.Data; @@ -11,29 +13,19 @@ import lombok.Data; */ @Data @Schema(description = "管理后台 - 重试策略配置 Request VO") -public class RetryStrategyConfigVO { - /** - * 主键 ID - */ +public class RetryStrategyConfigReqVO { @Schema(description = "主键 ID") private Long id; - /** - * 渠道类型:sms/email/app_push - */ - @Schema(description = "渠道类型:sms/email/app_push") - @NotBlank(message = "渠道类型不能为空") + + @Schema(description = "渠道类型:sms/email/app_push", example = "sms") + @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") private String channel; - /** - * 最大重试次数 - */ @Schema(description = "最大重试次数") @NotNull(message = "最大重试次数不能为空") + @Min(value = 1, message = "最大重试次数不能小于0") private Integer maxRetryCount; - /** - * 重试间隔(秒),逗号分隔,如:300,600,1800 - */ @Schema(description = "重试间隔(秒),逗号分隔,如:300,600,1800") @NotBlank(message = "重试间隔不能为空") private String retryIntervals; diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/ratelimit/RateLimitConfigDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/ratelimit/RateLimitConfigDO.java new file mode 100644 index 0000000..6649797 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/ratelimit/RateLimitConfigDO.java @@ -0,0 +1,38 @@ +package com.njcn.msgpush.module.push.dal.dataobject.ratelimit; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-13 + */ +@Data +@TableName("push_rate_limit_config") +public class RateLimitConfigDO extends BaseDO { + /** + * 主键 ID + */ + private Long id; + + /** + * 渠道类型:sms/email/app_push + */ + private String channel; + + /** + * 来源系统 + */ + private String appName; + + /** + * 单接收者每日上限 + */ + private Integer dailyLimit; + + /** + * 是否启用:0-否 1-是 + */ + private Boolean enabled; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java index f792fa2..edec7b7 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/MessageRecordMapper.java @@ -5,7 +5,9 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; +import java.time.LocalDateTime; + @Mapper public interface MessageRecordMapper extends BaseMapperX { - int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg); + int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml index 643ddd9..2104003 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/message/mapping/MessageRecordMapper.xml @@ -5,9 +5,11 @@ UPDATE push_message_record - SET status = #{status}, - error_code = #{errorCode}, - error_msg = #{errorMsg} + SET status = #{status}, + error_code = #{errorCode}, + error_msg = #{errorMsg}, + last_retry_time= #{lastRetryTime}, + next_retry_time= #{nextRetryTime} WHERE message_id = #{messageId} \ No newline at end of file diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/ratelimit/RateLimitConfigMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/ratelimit/RateLimitConfigMapper.java new file mode 100644 index 0000000..97d3f0e --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/ratelimit/RateLimitConfigMapper.java @@ -0,0 +1,13 @@ +package com.njcn.msgpush.module.push.dal.mysql.ratelimit; + +import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX; +import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author caozehui + * @data 2026-03-13 + */ +@Mapper +public interface RateLimitConfigMapper extends BaseMapperX { +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java index eaf910e..7990894 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java @@ -3,12 +3,24 @@ package com.njcn.msgpush.module.push.service.blacklist; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.IService; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; +import java.util.List; + /** * @author caozehui * @data 2026-03-12 */ public interface BlacklistService extends IService { Page getPage(BlacklistReqVO reqVO); + + Boolean delete(List ids); + + /** + * 进行黑名单检查 + * + * @param messageRecordReqVOList + */ + void check(List messageRecordReqVOList); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java index e7c6af0..07aae8d 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java @@ -1,15 +1,20 @@ package com.njcn.msgpush.module.push.service.blacklist; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.util.object.PageUtils; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; -import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; + /** * @author caozehui * @data 2026-03-12 @@ -22,4 +27,31 @@ public class BlacklistServiceImpl extends ServiceImpl(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); } + + @Override + public Boolean delete(List ids) { + return this.lambdaUpdate().in(BlacklistDO::getId, ids).set(BlacklistDO::getDeleted, true).update(); + } + + @Override + public void check(List messageRecordReqVOList) { + List exists = new ArrayList<>(); + for (int i = messageRecordReqVOList.size() - 1; i >= 0; i--) { + MessageRecordReqVO messageRecordReqVO = messageRecordReqVOList.get(i); + String receiver = messageRecordReqVO.getReceiver(); + String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA)); + BlacklistDO blacklistDO = this.lambdaQuery() + .eq(BlacklistDO::getChannel, messageRecordReqVO.getChannel()) + .in(split.length > 0, BlacklistDO::getTarget, split) + .eq(BlacklistDO::getDeleted, false).one(); + + if (ObjectUtil.isNotNull(blacklistDO)) { + messageRecordReqVOList.remove(i); + blacklistDO.setHitCount(blacklistDO.getHitCount() + 1); + exists.add(blacklistDO); + } + } + // 更新 + this.updateBatchById(exists); + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java index 3a21fb8..bac595e 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java @@ -5,6 +5,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import java.io.Serializable; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; @@ -42,18 +43,7 @@ public interface MessageRecordService { boolean update(MessageRecordReqVO reqVO); - boolean delete(List ids); - - /** - * 更新消息记录状态 - * - * @param messageId - * @param status - * @param errorCode - * @param errorMsg - * @return - */ - boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg); + boolean delete(List ids); /** * 更新消息记录重试次数 @@ -62,4 +52,6 @@ public interface MessageRecordService { * @return */ boolean updateRetryCount(String messageId); + + boolean updateMessage(MessageRecordDO messageRecordDO); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java index a9cc965..019c1d0 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java @@ -25,6 +25,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.Serializable; +import java.time.LocalDateTime; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -82,10 +83,10 @@ public class MessageRecordServiceImpl extends ServiceImpl 0; + public boolean updateMessage(MessageRecordDO messageRecordDO) { + return this.updateById(messageRecordDO); } @Override @@ -140,7 +141,7 @@ public class MessageRecordServiceImpl extends ServiceImpl ids) { - return this.lambdaUpdate().set(MessageRecordDO::getDeleted, false).update(); + public boolean delete(List ids) { + return this.lambdaUpdate().in(MessageRecordDO::getId, ids).set(MessageRecordDO::getDeleted, false).update(); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java new file mode 100644 index 0000000..1fd6bc8 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigService.java @@ -0,0 +1,19 @@ +package com.njcn.msgpush.module.push.service.ratelimit; + +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-13 + */ +public interface RateLimitConfigService extends IService { + Page getPage(RateLimitConfigReqVO reqVO); + + boolean delete(List ids); +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java new file mode 100644 index 0000000..7bf0c18 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/ratelimit/RateLimitConfigServiceImpl.java @@ -0,0 +1,35 @@ +package com.njcn.msgpush.module.push.service.ratelimit; + +import cn.hutool.core.util.StrUtil; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.msgpush.framework.common.util.object.PageUtils; +import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO; +import com.njcn.msgpush.module.push.dal.mysql.ratelimit.RateLimitConfigMapper; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-13 + */ +@Service +public class RateLimitConfigServiceImpl extends ServiceImpl implements RateLimitConfigService { + @Override + public Page getPage(RateLimitConfigReqVO reqVO) { + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.lambda().eq(StrUtil.isNotBlank(reqVO.getChannel()), RateLimitConfigDO::getChannel, reqVO.getChannel()); + return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); + } + + @Override + public boolean delete(List ids) { + return this.lambdaUpdate() + .set(RateLimitConfigDO::getDeleted, true) + .in(RateLimitConfigDO::getId, ids) + .update(); + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index 89e98f7..2c2ac1a 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -25,9 +25,6 @@ import java.time.ZoneId; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j @Service @@ -41,21 +38,20 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl(1000), - r -> { - Thread thread = new Thread(r); - thread.setName("msgRetryThreadPool-" + thread.getId()); - thread.setDaemon(false); - return thread; - }, - new ThreadPoolExecutor.CallerRunsPolicy() - ); +// public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor( +// 3, +// 3, +// 1000, +// TimeUnit.MILLISECONDS, +// new java.util.concurrent.ArrayBlockingQueue<>(1000), +// r -> { +// Thread thread = new Thread(r); +// thread.setName("msgRetryThreadPool-" + thread.getId()); +// thread.setDaemon(false); +// return thread; +// }, +// new ThreadPoolExecutor.CallerRunsPolicy() +// ); /** * 默认每次处理的消息数量 @@ -80,6 +76,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl() .eq(MessageRetryQueueDO::getMessageId, message.getMessageId()) ); + message.setLastRetryTime(message.getSendTime()); // 消息不在重试队列中 if (ObjectUtil.isNull(existing)) { @@ -249,34 +246,49 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR); + //CompletableFuture sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR); - sendFuture.orTimeout(5, TimeUnit.SECONDS) - .thenAccept(sendResult -> { - messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); - if (sendResult) { - log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId()); - messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); - this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); - // todo 重试成功后是否要删除retry_history表中的数据??? +// sendFuture.orTimeout(5, TimeUnit.SECONDS) +// .thenAccept(sendResult -> { +// messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); +// if (sendResult) { +// log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId()); +// messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); +// this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); +// +// } else { +// // 重试失败,更新重试信息 +// log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId()); +// channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); +// handleRetryFailure(messageRecordDO); +// } +// }).exceptionally(ex -> { +// messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); +// log.error("异步执行消息重试发生异常:messageId={}", messageRecordDO.getMessageId(), ex); +// // 发生异常时也尝试处理失败逻辑,避免消息丢失 +// try { +// handleRetryFailure(messageRecordDO); +// } catch (Exception e) { +// log.error("异常处理后再次失败:messageId={}", messageRecordDO.getMessageId(), e); +// } +// return null; +// }); - } else { - // 重试失败,更新重试信息 - log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId()); - channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); - handleRetryFailure(messageRecordDO); - } - }).exceptionally(ex -> { - messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); - log.error("异步执行消息重试发生异常:messageId={}", messageRecordDO.getMessageId(), ex); - // 发生异常时也尝试处理失败逻辑,避免消息丢失 - try { - handleRetryFailure(messageRecordDO); - } catch (Exception e) { - log.error("异常处理后再次失败:messageId={}", messageRecordDO.getMessageId(), e); - } - return null; - }); + boolean sendResult = messageRecordService.processSendMsg(messageRecordDO); + + messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); + if (sendResult) { + log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId()); + messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); + this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); + + } else { + // 重试失败,更新重试信息 + log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId()); + channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); + handleRetryFailure(messageRecordDO); + } + messageRecordService.updateMessage(messageRecordDO); } /** @@ -296,10 +308,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl= messageRetryQueueDO.getMaxRetry()) { + messageRecordDO.setStatus(MessageStatusConstant.FINALFAILED); + messageRecordDO.setNextRetryTime(null); + // 达到最大重试次数,标记为最终失败 // 更新消息的状态为final_failed - messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg()); + //messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg(), messageRecordDO.getSendTime(), null); // 数据库中不能删除 // retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId()); @@ -308,6 +323,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl listAll(); @@ -28,10 +29,10 @@ public interface RetryStrategyConfigService { /** * 修改重试策略配置 * - * @param strategyConfigVO + * @param strategyConfigReqVO * @return */ - boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO); + boolean updateStrategyConfig(RetryStrategyConfigReqVO strategyConfigReqVO); boolean toggleEnableField(String id); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java index 1635942..9172493 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java @@ -2,7 +2,7 @@ package com.njcn.msgpush.module.push.service.retry; import cn.hutool.core.bean.BeanUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; +import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigReqVO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.mysql.retry.RetryStrategyConfigMapper; import org.springframework.stereotype.Service; @@ -32,8 +32,8 @@ public class RetryStrategyConfigServiceImpl extends ServiceImpl