From be7c1b7b26d7bdb02084c71c9db692a6a5540971 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Mon, 23 Mar 2026 15:39:10 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E9=85=8D=E9=A2=9D=E3=80=81re?= =?UTF-8?q?dis=E7=BC=93=E5=AD=98=E5=90=84=E4=B8=AA=E7=B3=BB=E7=BB=9F&?= =?UTF-8?q?=E6=B8=A0=E9=81=93=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../msgpush/module/push/checker/IChecker.java | 3 +- .../push/checker/MsgPushGuardChain.java | 26 ++--- .../push/checker/impl/BlacklistChecker.java | 13 ++- .../push/checker/impl/QuotaChecker.java | 13 ++- .../push/checker/impl/RateLimitChecker.java | 5 +- .../client/sender/impl/AliyunEmailSender.java | 16 +-- .../client/sender/impl/AliyunSmsSender.java | 20 ++-- .../client/sender/impl/TelecomSmsSender.java | 19 ++-- ...usConstant.java => MsgStatusConstant.java} | 2 +- .../message/MessageRecordController.java | 3 +- .../admin/message/vo/MessageSendResultVO.java | 21 ++++ .../dataobject/quota/SystemQuotaConfigDO.java | 40 +++++++ .../mysql/quota/SystemQuotaConfigMapper.java | 14 +++ .../push/dal/redis/SystemQuotaRedisDAO.java | 44 ++++++++ .../service/blacklist/BlacklistService.java | 5 +- .../blacklist/BlacklistServiceImpl.java | 30 +++--- .../service/message/MessageRecordService.java | 11 +- .../message/MessageRecordServiceImpl.java | 100 +++++++++++------- .../quota/SystemQuotaConfigService.java | 18 ++++ .../quota/SystemQuotaConfigServiceImpl.java | 57 ++++++++++ .../retry/MessageRetryQueueServiceImpl.java | 58 ++-------- .../module/push/sms/MsgPushClientTest.java | 6 +- 22 files changed, 359 insertions(+), 165 deletions(-) rename msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/{MessageStatusConstant.java => MsgStatusConstant.java} (96%) create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/quota/SystemQuotaConfigDO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/quota/SystemQuotaConfigMapper.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigService.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java index 1099fc1..32b3b78 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/IChecker.java @@ -1,6 +1,7 @@ package com.njcn.msgpush.module.push.checker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import java.util.List; @@ -11,5 +12,5 @@ import java.util.List; */ public interface IChecker { - void check(List messageRecordReqVOList); + void check(List messageRecordList); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java index 1fad7b7..3c452bd 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/MsgPushGuardChain.java @@ -3,9 +3,10 @@ package com.njcn.msgpush.module.push.checker; import com.njcn.msgpush.module.push.checker.impl.BlacklistChecker; import com.njcn.msgpush.module.push.checker.impl.QuotaChecker; import com.njcn.msgpush.module.push.checker.impl.RateLimitChecker; -import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; /** @@ -13,19 +14,18 @@ import java.util.List; * @data 2026-02-28 * @description 检查链 */ +@Component public class MsgPushGuardChain { - private final List checkers; + @Autowired + private BlacklistChecker blacklistChecker; + @Autowired + private QuotaChecker quotaChecker; + @Autowired + private RateLimitChecker rateLimitChecker; - public MsgPushGuardChain() { - this.checkers = new ArrayList<>(); - this.checkers.add(new BlacklistChecker()); - this.checkers.add(new QuotaChecker()); - this.checkers.add(new RateLimitChecker()); - } - public void checkAll(List messageRecordReqVOList) { - for (IChecker checker : checkers) { - checker.check(messageRecordReqVOList); - } + public void checkAll(List messageRecordList) { + blacklistChecker.check(messageRecordList); + quotaChecker.check(messageRecordList); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java index 28024f4..59a45eb 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/BlacklistChecker.java @@ -1,7 +1,8 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; -import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.service.blacklist.BlacklistService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -19,7 +20,13 @@ public class BlacklistChecker implements IChecker { private BlacklistService blacklistService; @Override - public void check(List messageRecordReqVOList) { - blacklistService.check(messageRecordReqVOList); + public void check(List messageRecordList) { + for (int i = 0; i < messageRecordList.size(); i++) { + MessageRecordDO messageRecordDO = messageRecordList.get(i); + boolean check = blacklistService.check(messageRecordDO); + if (!check) { + messageRecordDO.setStatus(MsgStatusConstant.BLACKLISTED); + } + } } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java index 2b5ea57..83430bc 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/QuotaChecker.java @@ -2,6 +2,10 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import com.njcn.msgpush.module.push.service.quota.SystemQuotaConfigService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.util.List; @@ -10,9 +14,14 @@ import java.util.List; * @data 2026-02-27 * @description 系统配额检查器 */ +@Component public class QuotaChecker implements IChecker { - @Override - public void check(List messageRecordReqVOList) { + @Autowired + private SystemQuotaConfigService systemQuotaConfigService; + + @Override + public void check(List messageRecordList) { + systemQuotaConfigService.check(messageRecordList); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java index c31398c..915f726 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/checker/impl/RateLimitChecker.java @@ -3,6 +3,8 @@ package com.njcn.msgpush.module.push.checker.impl; import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import org.springframework.stereotype.Component; import java.util.List; @@ -11,9 +13,10 @@ import java.util.List; * @data 2026-02-27 * @description 接收者频率检查器 */ +@Component public class RateLimitChecker implements IChecker { @Override - public void check(List messageRecordReqVOList) { + public void check(List messageRecordList) { } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java index 40a4a90..74de773 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java @@ -11,8 +11,8 @@ import com.aliyun.teautil.models.RuntimeOptions; import com.njcn.msgpush.module.push.client.sender.EmailSender; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; -import com.njcn.msgpush.module.push.constant.MessageStatusConstant; import com.njcn.msgpush.module.push.constant.MsgPushConstant; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.extern.slf4j.Slf4j; @@ -68,7 +68,7 @@ public class AliyunEmailSender implements EmailSender { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { message.setLastRetryTime(message.getSendTime()); message.setNextRetryTime(null); - message.setStatus(MessageStatusConstant.SENDING); + message.setStatus(MsgStatusConstant.SENDING); RuntimeOptions runtimeOptions = new RuntimeOptions(); runtimeOptions.autoretry = true; @@ -93,13 +93,13 @@ public class AliyunEmailSender implements EmailSender { message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); System.out.println(toJSONString(response)); if (HttpStatus.OK.value() == response.getStatusCode()) { - message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); + message.setStatus(MsgStatusConstant.SUCCESS); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); return true; } else { - message.setStatus(MessageStatusConstant.FAILED); + message.setStatus(MsgStatusConstant.FAILED); message.setErrorCode(response.getStatusCode() + ""); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + ""); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { @@ -110,8 +110,8 @@ public class AliyunEmailSender implements EmailSender { } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); return false; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java index b9781bd..600094c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java @@ -11,8 +11,8 @@ import com.aliyun.teautil.models.RuntimeOptions; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; -import com.njcn.msgpush.module.push.constant.MessageStatusConstant; import com.njcn.msgpush.module.push.constant.MsgPushConstant; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.extern.slf4j.Slf4j; @@ -62,7 +62,7 @@ public class AliyunSmsSender implements SmsSender { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { message.setLastRetryTime(message.getSendTime()); message.setNextRetryTime(null); - message.setStatus(MessageStatusConstant.SENDING); + message.setStatus(MsgStatusConstant.SENDING); RuntimeOptions runtimeOptions = new RuntimeOptions(); // 设置自动重试,默认是不开启的。重试次数默认是3次 runtimeOptions.autoretry = true; @@ -82,12 +82,12 @@ public class AliyunSmsSender implements SmsSender { message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); System.out.println(toJSONString(response)); if (HttpStatus.OK.value() == response.getStatusCode()) { - message.setStatus(MessageStatusConstant.SUCCESS); + message.setStatus(MsgStatusConstant.SUCCESS); this.getDownInfo(response.body.bizId, message); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); return true; } else { - message.setStatus(MessageStatusConstant.FAILED); + message.setStatus(MsgStatusConstant.FAILED); message.setErrorCode(response.body.code); message.setErrorMsg(response.body.message); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code); @@ -151,7 +151,7 @@ public class AliyunSmsSender implements SmsSender { // if (response.statusCode != HttpStatus.OK.value()) { response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> { if (!"DELIVERED".equals(detail.errCode)) { - message.setStatus(MessageStatusConstant.FAILED); + message.setStatus(MsgStatusConstant.FAILED); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode); message.setErrorCode(detail.errCode); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { @@ -162,13 +162,13 @@ public class AliyunSmsSender implements SmsSender { } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } else { - message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null, message.getLastRetryTime(), null); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null); + message.setStatus(MsgStatusConstant.SUCCESS); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null, message.getLastRetryTime(), null); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); } }); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java index 3533040..cf5368a 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java @@ -7,8 +7,8 @@ import com.alibaba.fastjson.JSONObject; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting; -import com.njcn.msgpush.module.push.constant.MessageStatusConstant; import com.njcn.msgpush.module.push.constant.MsgPushConstant; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.Data; @@ -85,7 +85,7 @@ public class TelecomSmsSender implements SmsSender { Future future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { message.setLastRetryTime(message.getSendTime()); message.setNextRetryTime(null); - message.setStatus(MessageStatusConstant.SENDING); + message.setStatus(MsgStatusConstant.SENDING); // 构建请求参数 Map request = new HashMap<>(); boolean isTemplateSend = StrUtil.isNotBlank(message.getTemplateCode()); @@ -137,7 +137,7 @@ public class TelecomSmsSender implements SmsSender { return true; } else { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + ""); - message.setStatus(MessageStatusConstant.FAILED); + message.setStatus(MsgStatusConstant.FAILED); message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); @@ -264,7 +264,7 @@ public class TelecomSmsSender implements SmsSender { TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class); TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0); if (telecomSmsSelectDetailRes.getStatus() == 5) { - message.setStatus(MessageStatusConstant.FAILED); + message.setStatus(MsgStatusConstant.FAILED); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat()); if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { @@ -274,16 +274,15 @@ public class TelecomSmsSender implements SmsSender { } } else { message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); - //this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime()); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); } if (telecomSmsSelectDetailRes.getStatus() == 4) { - message.setStatus(MessageStatusConstant.SUCCESS); - this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); - this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); + message.setStatus(MsgStatusConstant.SUCCESS); + this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null); + this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MessageStatusConstant.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgStatusConstant.java similarity index 96% rename from msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MessageStatusConstant.java rename to msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgStatusConstant.java index 4306b15..1c53df1 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MessageStatusConstant.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgStatusConstant.java @@ -4,7 +4,7 @@ package com.njcn.msgpush.module.push.constant; * @author caozehui * @data 2026-02-26 */ -public class MessageStatusConstant { +public class MsgStatusConstant { //待发送 public static final String PENDING = "pending"; //发送中 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java index 3f904e3..f61c820 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.service.message.MessageRecordService; import io.swagger.v3.oas.annotations.Operation; @@ -33,7 +34,7 @@ public class MessageRecordController { @PostMapping("/send") @Operation(summary = "消息推送") @Idempotent(timeout = 60) - public CommonResult send(@Valid @RequestBody List reqVOList) { + public CommonResult> send(@Valid @RequestBody List reqVOList) { return success(messageRecordService.send(reqVOList)); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java new file mode 100644 index 0000000..a6c05c3 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageSendResultVO.java @@ -0,0 +1,21 @@ +package com.njcn.msgpush.module.push.controller.admin.message.vo; + +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-23 + */ +@Data +public class MessageSendResultVO { + private String messageId; + /** + * 发送结果 + */ + private Boolean result; + + /** + * 详情。成功为success,失败为黑名单拦截、配额超限、调用第三方服务发送消息失败 + */ + private String detail; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/quota/SystemQuotaConfigDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/quota/SystemQuotaConfigDO.java new file mode 100644 index 0000000..2cda705 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/quota/SystemQuotaConfigDO.java @@ -0,0 +1,40 @@ +package com.njcn.msgpush.module.push.dal.dataobject.quota; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author caozehui + * @data 2026-03-23 + */ +@Data +@TableName("push_system_quota_config") +@EqualsAndHashCode(callSuper = true) +public class SystemQuotaConfigDO extends BaseDO { + /** + * 主键 ID + */ + private Long id; + + /** + * 渠道类型:sms/email/app_push + */ + private String channel; + + /** + * 来源系统 + */ + private String appName; + + /** + * 系统每日总配额 + */ + private Integer dailyQuota; + + /** + * 是否启用:0-否 1-是 + */ + private Boolean enabled; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/quota/SystemQuotaConfigMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/quota/SystemQuotaConfigMapper.java new file mode 100644 index 0000000..c85ae9c --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/quota/SystemQuotaConfigMapper.java @@ -0,0 +1,14 @@ +package com.njcn.msgpush.module.push.dal.mysql.quota; + +import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX; +import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author caozehui + * @data 2026-03-23 + */ +@Mapper +public interface SystemQuotaConfigMapper extends BaseMapperX { + +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java new file mode 100644 index 0000000..aa0a91c --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/redis/SystemQuotaRedisDAO.java @@ -0,0 +1,44 @@ +package com.njcn.msgpush.module.push.dal.redis; + +import cn.hutool.core.util.ObjectUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SystemQuotaRedisDAO { + private final RedisTemplate redisTemplate; + + private static final String QUOTA_KEY_PREFIX = "msgpush:quota:"; + + public void set(String channel, String appName, boolean isSchedule) { + String key = buildKey(channel, appName); + + if (isSchedule) { + redisTemplate.opsForValue().set(key, "0"); + } else { + String countStr = redisTemplate.opsForValue().get(key); + Integer count = 0; + if (ObjectUtil.isNull(countStr)) { + count = 1; + } else { + count = Integer.parseInt(countStr) + 1; + } + + redisTemplate.opsForValue().set(key, String.valueOf(count)); + } + } + + public Integer get(String channel, String appName) { + String key = buildKey(channel, appName); + String countStr = redisTemplate.opsForValue().get(key); + return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr); + } + + private String buildKey(String channel, String appName) { + return QUOTA_KEY_PREFIX + channel + ":" + appName; + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java index 7990894..f39ada8 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistService.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import java.util.List; @@ -20,7 +21,7 @@ public interface BlacklistService extends IService { /** * 进行黑名单检查 * - * @param messageRecordReqVOList + * @param messageRecordDO */ - void check(List messageRecordReqVOList); + boolean check(MessageRecordDO messageRecordDO); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java index 07aae8d..f232f22 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/blacklist/BlacklistServiceImpl.java @@ -9,6 +9,7 @@ import com.njcn.msgpush.framework.common.util.object.PageUtils; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper; import org.springframework.stereotype.Service; @@ -34,24 +35,19 @@ public class BlacklistServiceImpl extends ServiceImpl messageRecordReqVOList) { - List exists = new ArrayList<>(); - for (int i = messageRecordReqVOList.size() - 1; i >= 0; i--) { - MessageRecordReqVO messageRecordReqVO = messageRecordReqVOList.get(i); - String receiver = messageRecordReqVO.getReceiver(); - String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA)); - BlacklistDO blacklistDO = this.lambdaQuery() - .eq(BlacklistDO::getChannel, messageRecordReqVO.getChannel()) - .in(split.length > 0, BlacklistDO::getTarget, split) - .eq(BlacklistDO::getDeleted, false).one(); + public boolean check(MessageRecordDO messageRecordDO) { + String receiver = messageRecordDO.getReceiver(); + String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA)); + BlacklistDO blacklistDO = this.lambdaQuery() + .eq(BlacklistDO::getChannel, messageRecordDO.getChannel()) + .in(split.length > 0, BlacklistDO::getTarget, split) + .eq(BlacklistDO::getDeleted, false).one(); - if (ObjectUtil.isNotNull(blacklistDO)) { - messageRecordReqVOList.remove(i); - blacklistDO.setHitCount(blacklistDO.getHitCount() + 1); - exists.add(blacklistDO); - } + if (ObjectUtil.isNotNull(blacklistDO)) { + blacklistDO.setHitCount(blacklistDO.getHitCount() + 1); + this.updateById(blacklistDO); + return false; } - // 更新 - this.updateBatchById(exists); + return true; } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java index bac595e..b2a00f1 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java @@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.service.message; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import java.io.Serializable; @@ -15,17 +16,17 @@ public interface MessageRecordService { * 发送消息(包括email、sms、app_push) * * @param reqVOList - * @return 发送是否成功的结果 + * @return 发送的结果 */ - boolean send(List reqVOList); + List send(List reqVOList); /** * 处理发送消息 * - * @param messageRecordDO - * @return 发送是否成功的结果 + * @param messageRecordDOList + * @return 发送的结果 */ - boolean processSendMsg(MessageRecordDO messageRecordDO); + List processSendMsg(List messageRecordDOList); /** * 添加消息记录 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java index 019c1d0..06fb2b6 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java @@ -8,14 +8,17 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.util.object.PageUtils; +import com.njcn.msgpush.module.push.checker.MsgPushGuardChain; import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory; import com.njcn.msgpush.module.push.client.sender.Sender; -import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO; import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper; +import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService; @@ -25,11 +28,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.Serializable; -import java.time.LocalDateTime; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @Service public class MessageRecordServiceImpl extends ServiceImpl implements MessageRecordService { @@ -47,52 +46,77 @@ public class MessageRecordServiceImpl extends ServiceImpl messageProviderFactoryMap; + @Autowired + private MsgPushGuardChain msgPushGuardChain; + + @Autowired + private SystemQuotaRedisDAO systemQuotaRedisDAO; + @Override @Transactional(rollbackFor = Exception.class) - public boolean send(List reqVOList) { + public List send(List reqVOList) { List messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class); messageRecordDOList.forEach(messageRecordDO -> { - messageRecordDO.setStatus(MessageStatusConstant.PENDING); + messageRecordDO.setStatus(MsgStatusConstant.PENDING); }); this.saveBatch(messageRecordDOList); - boolean sendResult = true; - for (MessageRecordDO messageRecordDO : messageRecordDOList) { - sendResult &= this.processSendMsg(messageRecordDO); - } - return sendResult; + return this.processSendMsg(messageRecordDOList); } @Override @Transactional(rollbackFor = Exception.class) - public boolean processSendMsg(MessageRecordDO messageRecordDO) { - ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); - MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType()); - if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) { - throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType()); - } - boolean sendResult = switch (ChannelTypeEnum.getByCode(messageRecordDO.getChannel())) { - case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO); - case EMAIL -> { - Map params = new HashMap<>(); - yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params); + public List processSendMsg(List messageRecordDOList) { + msgPushGuardChain.checkAll(messageRecordDOList); + List resultList = new ArrayList<>(); + for (MessageRecordDO messageRecordDO : messageRecordDOList) { + MessageSendResultVO messageSendResultVO = new MessageSendResultVO(); + messageSendResultVO.setMessageId(messageRecordDO.getMessageId()); + if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) { + messageSendResultVO.setResult(false); + if (messageRecordDO.getStatus().equals(MsgStatusConstant.BLACKLISTED)) { + messageSendResultVO.setDetail("黑名单拦截"); + } + if (messageRecordDO.getStatus().equals(MsgStatusConstant.QUOTAEXCEEDED)) { + messageSendResultVO.setDetail("配额超限"); + } + resultList.add(messageSendResultVO); + continue; } - case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO); - default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel()); - }; - MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id"); - if (sendResult) { - this.updateMessage(messageRecordDO); - messageRetryHistoryDO.setStatus(MessageStatusConstant.SUCCESS); - } else { - this.updateMessage(messageRecordDO); - messageRetryHistoryDO.setStatus(MessageStatusConstant.FAILED); + ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); + MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType()); + if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) { + throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType()); + } + boolean sendResult = switch (ChannelTypeEnum.getByCode(messageRecordDO.getChannel())) { + case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO); + case EMAIL -> { + Map params = new HashMap<>(); + yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params); + } + case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO); + default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel()); + }; + + MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id"); + if (sendResult) { + this.updateMessage(messageRecordDO); + messageRetryHistoryDO.setStatus(MsgStatusConstant.SUCCESS); + messageSendResultVO.setResult(true); + } else { + this.updateMessage(messageRecordDO); + messageRetryHistoryDO.setStatus(MsgStatusConstant.FAILED); + messageSendResultVO.setResult(false); + messageSendResultVO.setDetail("调用第三方服务发送消息失败"); + } + resultList.add(messageSendResultVO); + messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId())); + messageRetryHistoryService.add(messageRetryHistoryDO); + // 更新配额 + systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(),false); } - messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId())); - messageRetryHistoryService.add(messageRetryHistoryDO); - - return sendResult; + return resultList; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigService.java new file mode 100644 index 0000000..c3ebff2 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigService.java @@ -0,0 +1,18 @@ +package com.njcn.msgpush.module.push.service.quota; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-23 + */ +public interface SystemQuotaConfigService extends IService { + SystemQuotaConfigDO getByChannelAndAppName(String channel, String appName); + + void check(List messageRecordList); +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java new file mode 100644 index 0000000..ca2f673 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/quota/SystemQuotaConfigServiceImpl.java @@ -0,0 +1,57 @@ +package com.njcn.msgpush.module.push.service.quota; + +import cn.hutool.core.util.ObjectUtil; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; +import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; +import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO; +import com.njcn.msgpush.module.push.dal.mysql.quota.SystemQuotaConfigMapper; +import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-23 + */ +@Service +public class SystemQuotaConfigServiceImpl extends ServiceImpl implements SystemQuotaConfigService { + @Autowired + private SystemQuotaRedisDAO systemQuotaRedisDAO; + + @Override + public SystemQuotaConfigDO getByChannelAndAppName(String channel, String appName) { + return this.lambdaQuery() + .eq(SystemQuotaConfigDO::getChannel, channel) + .eq(SystemQuotaConfigDO::getAppName, appName) + .eq(SystemQuotaConfigDO::getDeleted, false) + .one(); + } + + @Override + public void check(List messageRecordList) { + for (int i = 0; i < messageRecordList.size(); i++) { + MessageRecordDO messageRecordDO = messageRecordList.get(i); + SystemQuotaConfigDO systemQuotaConfigDO = this.getByChannelAndAppName(messageRecordDO.getChannel(), messageRecordDO.getAppName()); + + if (ObjectUtil.isNotNull(systemQuotaConfigDO)) { + Integer dailyQuota = systemQuotaConfigDO.getDailyQuota(); + Integer count = systemQuotaRedisDAO.get(messageRecordDO.getChannel(), messageRecordDO.getAppName()); + if (count >= dailyQuota) { + messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED); + } + } + } + } + + @Scheduled(cron = "0 0 0 * * ?") + public void resetDailyQuota() { + List list = this.lambdaQuery().eq(SystemQuotaConfigDO::getEnabled, true).eq(SystemQuotaConfigDO::getDeleted, false).list(); + for (SystemQuotaConfigDO systemQuotaConfigDO : list) { + systemQuotaRedisDAO.set(systemQuotaConfigDO.getChannel(), systemQuotaConfigDO.getAppName(), true); + } + } +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index 2c2ac1a..6e6c2a9 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -5,7 +5,8 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.pojo.PageResult; -import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.constant.MsgStatusConstant; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; @@ -21,7 +22,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Collections; import java.util.List; import java.util.Set; @@ -177,7 +177,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE); // 从redis中查询需要重试的消息 - long epochMilli = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + long epochMilli = System.currentTimeMillis(); Set needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE); // 没有需要重试的消息 @@ -245,36 +245,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR); - -// sendFuture.orTimeout(5, TimeUnit.SECONDS) -// .thenAccept(sendResult -> { -// messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); -// if (sendResult) { -// log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId()); -// messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); -// this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); -// -// } else { -// // 重试失败,更新重试信息 -// log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId()); -// channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); -// handleRetryFailure(messageRecordDO); -// } -// }).exceptionally(ex -> { -// messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); -// log.error("异步执行消息重试发生异常:messageId={}", messageRecordDO.getMessageId(), ex); -// // 发生异常时也尝试处理失败逻辑,避免消息丢失 -// try { -// handleRetryFailure(messageRecordDO); -// } catch (Exception e) { -// log.error("异常处理后再次失败:messageId={}", messageRecordDO.getMessageId(), e); -// } -// return null; -// }); - - boolean sendResult = messageRecordService.processSendMsg(messageRecordDO); + MessageSendResultVO messageSendResultVO = messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0); + boolean sendResult = messageSendResultVO.getResult(); messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); if (sendResult) { @@ -291,15 +263,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl 0.3; // 70%成功率 - } - /** * 处理重试失败的情况 */ @@ -308,22 +271,15 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl= messageRetryQueueDO.getMaxRetry()) { - messageRecordDO.setStatus(MessageStatusConstant.FINALFAILED); + messageRecordDO.setStatus(MsgStatusConstant.FINALFAILED); messageRecordDO.setNextRetryTime(null); - // 达到最大重试次数,标记为最终失败 - - // 更新消息的状态为final_failed - //messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg(), messageRecordDO.getSendTime(), null); - - // 数据库中不能删除 - // retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId()); // redis中可以删除 messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); } else { // 还可以继续重试,更新重试信息 LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount); - messageRecordDO.setStatus(MessageStatusConstant.FAILED); + messageRecordDO.setStatus(MsgStatusConstant.FAILED); messageRecordDO.setNextRetryTime(nextRetryTime); this.updateRetryInfo( diff --git a/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java b/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java index e8c56a7..9231093 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java +++ b/msgpush-module-push/msgpush-module-push-server/src/test/java/com/njcn/msgpush/module/push/sms/MsgPushClientTest.java @@ -1,9 +1,11 @@ package com.njcn.msgpush.module.push.sms; +import com.alibaba.fastjson.JSON; import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; +import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.enums.ProviderTypeEnum; @@ -65,10 +67,10 @@ public class MsgPushClientTest { message.setProviderType(ProviderTypeEnum.TELECOM.getCode()); messageIdList.add(message); } - boolean sendResult = messageRecordService.send(messageIdList); + List sendResult = messageRecordService.send(messageIdList); Thread.sleep(9000); - System.out.println(sendResult); + System.out.println(JSON.toJSONString(sendResult)); } @Test