From 8806ba7afac9488440ad2c3408d791c0f14c3f79 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Fri, 6 Mar 2026 08:42:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8E=B7=E5=8F=96=E4=B8=8B=E8=A1=8C=E7=8A=B6?= =?UTF-8?q?=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yaml | 9 +++ .../client/sender/impl/AliyunSmsSender.java | 56 ++++++++++++- .../client/sender/impl/TelecomSmsSender.java | 81 ++++++++----------- .../ChannelProviderConfigController.java | 5 +- .../message/MessageRecordController.java | 11 +-- .../retry/MessageRetryQueueController.java | 34 ++++++-- .../retry/vo/MessageRetryQueueReqVO.java | 3 + .../admin/retry/vo/RetryStrategyConfigVO.java | 40 +++++++++ .../retry/RetryStrategyConfigDO.java | 40 +++++++++ .../retry/RetryStrategyConfigMapper.java | 13 +++ .../module/push/enums/RetryStrategyEnum.java | 70 ---------------- .../retry/MessageRetryQueueService.java | 6 +- .../retry/MessageRetryQueueServiceImpl.java | 73 +++++++++++++++-- .../retry/RetryStrategyConfigService.java | 37 +++++++++ .../retry/RetryStrategyConfigServiceImpl.java | 46 +++++++++++ 15 files changed, 378 insertions(+), 146 deletions(-) create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/RetryStrategyConfigDO.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/RetryStrategyConfigMapper.java delete mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/enums/RetryStrategyEnum.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigService.java create mode 100644 msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java diff --git a/msgpush-gateway/src/main/resources/application.yaml b/msgpush-gateway/src/main/resources/application.yaml index c5c3510..a302153 100644 --- a/msgpush-gateway/src/main/resources/application.yaml +++ b/msgpush-gateway/src/main/resources/application.yaml @@ -66,6 +66,12 @@ spring: uri: grayLb://infra-server predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组 - Path=/infra/ws/** + - id: push-admin-api # 路由的编号 + uri: grayLb://push-server + predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组 + - Path=/admin-api/push/** + filters: + - RewritePath=/admin-api/push/v3/api-docs, /v3/api-docs x-forwarded: prefix-enabled: false # 避免 Swagger 重复带上额外的 /admin-api/system 前缀 @@ -87,6 +93,9 @@ knife4j: - name: infra-server service-name: infra-server url: /admin-api/infra/v3/api-docs + - name: push-server + service-name: push-server + url: /admin-api/push/v3/api-docs --- #################### 灿能相关配置 #################### diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java index e882999..fa739e0 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java @@ -2,6 +2,8 @@ package com.njcn.msgpush.module.push.client.sender.impl; import cn.hutool.core.util.ObjectUtil; import com.aliyun.dysmsapi20170525.Client; +import com.aliyun.dysmsapi20170525.models.QuerySendDetailsRequest; +import com.aliyun.dysmsapi20170525.models.QuerySendDetailsResponse; import com.aliyun.dysmsapi20170525.models.SendSmsRequest; import com.aliyun.dysmsapi20170525.models.SendSmsResponse; import com.aliyun.teaopenapi.models.Config; @@ -10,14 +12,17 @@ import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; -import com.njcn.msgpush.module.push.enums.RetryStrategyEnum; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.aliyun.teautil.Common.toJSONString; @@ -35,6 +40,13 @@ public class AliyunSmsSender implements SmsSender { private Client smsClient; + /** + * 存放发送完成的消息。key为其返回的mid + */ +// private Map completeSendMessageMap = new HashMap<>(); + + private ScheduledExecutorService scheduledExecutorService; + public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) { this.sender = sender; if (ObjectUtil.isNotNull(aliYunSmsSetting)) { @@ -43,7 +55,6 @@ public class AliyunSmsSender implements SmsSender { .setAccessKeySecret(aliYunSmsSetting.getAccessKeySecret()) .setRegionId(aliYunSmsSetting.getRegionId()) .setEndpoint(aliYunSmsSetting.getEndpoint()); - try { this.smsClient = new Client(config); } catch (Exception e) { @@ -74,11 +85,12 @@ public class AliyunSmsSender implements SmsSender { message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); System.out.println(toJSONString(response)); if (OK.equals(response.body.code)) { + this.getDownInfo(response.body.bizId, message); return true; } else { message.setErrorCode(response.body.code); message.setErrorMsg(response.body.message); - this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF); + this.sender.messageRetryQueueService.addRetryMessage(message); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); return false; } @@ -105,4 +117,42 @@ public class AliyunSmsSender implements SmsSender { } return res; } + + /** + * 获取下行信息 + * + * @param bizId + * @param message + */ + private void getDownInfo(String bizId, MessageRecordDO message) { + if (ObjectUtil.isNull(this.scheduledExecutorService)) { + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + } + this.scheduledExecutorService.schedule(() -> { + QuerySendDetailsRequest request = new QuerySendDetailsRequest() + .setPhoneNumber(message.getReceiver()) + .setBizId(bizId) + .setSendDate(message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))) + .setCurrentPage(1L) + .setPageSize(10L); + try { + QuerySendDetailsResponse response = this.smsClient.querySendDetails(request); + System.out.println(toJSONString(response)); +// if (response.statusCode != HttpStatus.OK.value()) { + response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> { + if (!"DELIVERED".equals(detail.errCode)) { + ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode); + message.setErrorCode(detail.errCode); + message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); + this.sender.messageRetryQueueService.addRetryMessage(message); + } + }); +// } + } catch (Exception e) { + throw new RuntimeException(e); + } + this.scheduledExecutorService.shutdown(); + this.scheduledExecutorService = null; + }, 10, TimeUnit.SECONDS); + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java index 500168e..99f38fe 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java @@ -8,7 +8,6 @@ import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting; import com.njcn.msgpush.module.push.constant.MessageStatusConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; -import com.njcn.msgpush.module.push.enums.RetryStrategyEnum; import lombok.Data; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -39,11 +38,6 @@ public class TelecomSmsSender implements SmsSender { private TelecomSmsSetting telecomSmsSetting; private Sender sender; - /** - * 存放发送完成的消息。key为其返回的mid - */ - private Map completeSendMessageMap = new HashMap<>(); - private ScheduledExecutorService scheduledExecutorService; @Data @@ -118,22 +112,14 @@ public class TelecomSmsSender implements SmsSender { if (response1.getStatusCode() == HttpStatus.OK) { String mid = telecomSmsSendResponse.list.get(0).mid; - completeSendMessageMap.put(mid, message); // 定时任务,指定时间间隔后获取下行信息 - if (ObjectUtil.isNull(this.scheduledExecutorService)) { - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); - } - this.scheduledExecutorService.schedule(() -> { - this.getDownInfo(mid); - this.scheduledExecutorService.shutdown(); - this.scheduledExecutorService = null; - }, 10, TimeUnit.SECONDS); + this.getDownInfo(mid, message); return true; } else { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + ""); message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF); + this.sender.messageRetryQueueService.addRetryMessage(message); return false; } }); @@ -193,45 +179,48 @@ public class TelecomSmsSender implements SmsSender { * 获取下行信息 * * @param mid + * @param message */ - private void getDownInfo(String mid) { - System.out.println("getDownInfo" + LocalDateTime.now()); - // 构建请求参数 - Map request = new HashMap<>(); - request.put("action", "select"); - request.put("account", telecomSmsSetting.getAccount()); - request.put("password", telecomSmsSetting.getPassword()); - request.put("date", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))); - request.put("condition", "APMID"); - request.put("valueList", mid); + private void getDownInfo(String mid, MessageRecordDO message) { + if (ObjectUtil.isNull(this.scheduledExecutorService)) { + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + } + this.scheduledExecutorService.schedule(() -> { + // 构建请求参数 + Map request = new HashMap<>(); + request.put("action", "select"); + request.put("account", telecomSmsSetting.getAccount()); + request.put("password", telecomSmsSetting.getPassword()); + request.put("date", message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))); + request.put("condition", "APMID"); + request.put("valueList", mid); // request.put("condition", "MOBILE"); // request.put("valueList", message.getReceiver()); - request.remove("mobile"); - request.remove("content"); - // 设置请求头 - HttpHeaders headers = new HttpHeaders(); - headers.set("Content-Type", CONTENT_TYPE); - ResponseEntity response = this.sender.restTemplateUtil.post( - telecomSmsSetting.getApiUrl(), - request, - headers, - String.class - ); - System.out.println(JSON.toJSONString(JSON.toJSONString(response))); + request.remove("mobile"); + request.remove("content"); + // 设置请求头 + HttpHeaders headers = new HttpHeaders(); + headers.set("Content-Type", CONTENT_TYPE); + ResponseEntity response = this.sender.restTemplateUtil.post( + telecomSmsSetting.getApiUrl(), + request, + headers, + String.class + ); + System.out.println(JSON.toJSONString(JSON.toJSONString(response))); - if (response.getStatusCode() == HttpStatus.OK) { +// if (response.getStatusCode() == HttpStatus.OK) { TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class); TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0); - if (telecomSmsSelectDetailRes.getStatus() == 4) { - return; - } else if (telecomSmsSelectDetailRes.getStatus() == 5) { - MessageRecordDO message = completeSendMessageMap.get(mid); + if (telecomSmsSelectDetailRes.getStatus() == 5) { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF); - completeSendMessageMap.remove(mid); + this.sender.messageRetryQueueService.addRetryMessage(message); } - } +// } + this.scheduledExecutorService.shutdown(); + this.scheduledExecutorService = null; + }, 10, TimeUnit.SECONDS); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java index 061ce53..8105e0f 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/channel/ChannelProviderConfigController.java @@ -49,7 +49,6 @@ public class ChannelProviderConfigController { @PostMapping("/page") @Operation(summary = "分页查询渠道服务商列表") @PreAuthorize("@ss.hasPermission('push:channel:page')") - @Parameter(name = "reqVO", description = "分页查询参数", required = true) public CommonResult> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { Page res = channelProviderConfigService.getPage(reqVO); return success(res); @@ -58,7 +57,6 @@ public class ChannelProviderConfigController { @PostMapping("/add") @Operation(summary = "新增渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:add')") - @Parameter(name = "reqVO", description = "新增参数", required = true) public CommonResult addChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); return success(res); @@ -67,7 +65,6 @@ public class ChannelProviderConfigController { @PostMapping("/update") @Operation(summary = "更新渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:update')") - @Parameter(name = "reqVO", description = "更新参数", required = true) public CommonResult updateChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); return success(res); @@ -77,7 +74,7 @@ public class ChannelProviderConfigController { @Operation(summary = "删除渠道服务商") @PreAuthorize("@ss.hasPermission('push:channel:delete')") @Parameter(name = "ids", description = "id列表", required = true) - public CommonResult deleteChannelProvider(@RequestBody List ids) { + public CommonResult deleteChannelProvider(@RequestParam("ids") List ids) { boolean res = channelProviderConfigService.removeBatchByIds(ids); return success(res); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java index a01f6d5..0cd7325 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/MessageRecordController.java @@ -15,10 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.List; @@ -36,7 +33,6 @@ public class MessageRecordController { @PermitAll @PostMapping("/send") @Operation(summary = "消息推送") - @Parameter(name = "reqVOList", description = "消息列表") public CommonResult send(@Valid @RequestBody List reqVOList) { Boolean result = messageRecordService.send(reqVOList); return CommonResult.success(result); @@ -45,7 +41,6 @@ public class MessageRecordController { @PostMapping("/page") @Operation(summary = "分页查询渠道服务商列表") @PreAuthorize("@ss.hasPermission('push:message:page')") - @Parameter(name = "reqVO", description = "分页查询参数", required = true) public CommonResult> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) { Page res = messageRecordService.getPage(reqVO); return success(res); @@ -54,7 +49,6 @@ public class MessageRecordController { @PostMapping("/add") @Operation(summary = "添加消息记录") @PreAuthorize("@ss.hasPermission('push:message:add')") - @Parameter(name = "reqVO", description = "添加参数", required = true) public CommonResult add(@Validated @RequestBody MessageRecordReqVO reqVO) { return messageRecordService.add(reqVO) ? success(true) : success(false); } @@ -62,7 +56,6 @@ public class MessageRecordController { @PostMapping("/update") @Operation(summary = "更新消息记录") @PreAuthorize("@ss.hasPermission('push:message:update')") - @Parameter(name = "reqVO", description = "更新参数", required = true) public CommonResult update(@Validated @RequestBody MessageRecordReqVO reqVO) { return messageRecordService.update(reqVO) ? success(true) : success(false); } @@ -71,7 +64,7 @@ public class MessageRecordController { @Operation(summary = "删除消息记录") @PreAuthorize("@ss.hasPermission('push:message:delete')") @Parameter(name = "ids", description = "编号", required = true) - public CommonResult delete(@RequestBody List ids) { + public CommonResult delete(@RequestParam("ids") List ids) { return messageRecordService.delete(ids) ? success(true) : success(false); } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java index 3d37e88..f105a50 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/MessageRetryQueueController.java @@ -3,8 +3,11 @@ package com.njcn.msgpush.module.push.controller.admin.retry; import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; +import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; +import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; +import com.njcn.msgpush.module.push.service.retry.RetryStrategyConfigService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; @@ -12,10 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.List; @@ -34,11 +34,12 @@ public class MessageRetryQueueController { @Autowired private MessageRetryQueueService messageRetryQueueService; + @Autowired + private RetryStrategyConfigService retryStrategyConfigService; @PostMapping("/page") @Operation(summary = "分页获得消息重试队列") @PreAuthorize("@ss.hasPermission('push:retry:page')") - @Parameter(name = "reqVO", description = "分页查询参数", required = true) public CommonResult> getRetryPage(@Validated @RequestBody MessageRetryQueueReqVO reqVO) { PageResult result = messageRetryQueueService.getPage(reqVO); return success(result); @@ -47,8 +48,31 @@ public class MessageRetryQueueController { @PostMapping("/manual") @Operation(summary = "批量手动重试消息") @PreAuthorize("@ss.hasPermission('push:retry:manual')") + @Parameter(name = "messageIds", description = "消息ID列表", required = true) public CommonResult manualRetry(@RequestBody List messageIds) { messageRetryQueueService.manualRetry(messageIds); return success(null); } + + @PostMapping("/config/list") + @Operation(summary = "获得消息重试配置列表") + @PreAuthorize("@ss.hasPermission('push:retry:config:list')") + public CommonResult> listRetryConfig() { + return success(retryStrategyConfigService.listAll()); + } + + @PostMapping("/config/update") + @Operation(summary = "更新消息重试配置") + @PreAuthorize("@ss.hasPermission('push:retry:config:update')") + public CommonResult updateRetryConfig(@Validated @RequestBody RetryStrategyConfigVO retryStrategyConfigVO) { + return success(retryStrategyConfigService.updateStrategyConfig(retryStrategyConfigVO)); + } + + @PostMapping("/config/toggle") + @Operation(summary = "启用/禁用重试策略配置") + @PreAuthorize("@ss.hasPermission('push:retry:config:toggle')") + @Parameter(name = "id", description = "id", required = true) + public CommonResult toggleRetryConfig(@RequestParam("id") String id) { + return success(retryStrategyConfigService.toggleEnableField(id)); + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java index dd5cb5c..6252046 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/MessageRetryQueueReqVO.java @@ -3,6 +3,7 @@ package com.njcn.msgpush.module.push.controller.admin.retry.vo; import com.njcn.msgpush.framework.common.pojo.PageParam; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; import lombok.Data; /** @@ -16,9 +17,11 @@ public class MessageRetryQueueReqVO extends PageParam { private String messageId; @Schema(description = "渠道类型", example = "sms/email/app_push") + @NotBlank(message = "渠道类型不能为空") private String channel; @Schema(description = "接收者", example = "10086") + @NotBlank(message = "接收者不能为空") private String receiver; @Schema(description = "最小重试次数") diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java new file mode 100644 index 0000000..5f22f1a --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/retry/vo/RetryStrategyConfigVO.java @@ -0,0 +1,40 @@ +package com.njcn.msgpush.module.push.controller.admin.retry.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * @author caozehui + * @data 2026-03-05 + */ +@Data +@Schema(description = "管理后台 - 重试策略配置 Request VO") +public class RetryStrategyConfigVO { + /** + * 主键 ID + */ + @Schema(description = "主键 ID") + private Long id; + /** + * 渠道类型:sms/email/app_push + */ + @Schema(description = "渠道类型:sms/email/app_push") + @NotBlank(message = "渠道类型不能为空") + private String channel; + + /** + * 最大重试次数 + */ + @Schema(description = "最大重试次数") + @NotNull(message = "最大重试次数不能为空") + private Integer maxRetryCount; + + /** + * 重试间隔(秒),逗号分隔,如:300,600,1800 + */ + @Schema(description = "重试间隔(秒),逗号分隔,如:300,600,1800") + @NotBlank(message = "重试间隔不能为空") + private String retryIntervals; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/RetryStrategyConfigDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/RetryStrategyConfigDO.java new file mode 100644 index 0000000..702081e --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/RetryStrategyConfigDO.java @@ -0,0 +1,40 @@ +package com.njcn.msgpush.module.push.dal.dataobject.retry; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author caozehui + * @data 2026-03-05 + */ +@Data +@TableName("push_retry_strategy_config") +@EqualsAndHashCode(callSuper = true) +public class RetryStrategyConfigDO extends BaseDO { + /** + * 主键 ID + */ + private Long id; + + /** + * 渠道类型:sms/email/app_push + */ + private String channel; + + /** + * 最大重试次数 + */ + private Integer maxRetryCount; + + /** + * 重试间隔(秒),逗号分隔,如:300,600,1800 + */ + private String retryIntervals; + + /** + * 是否启用:0-否 1-是 + */ + private Integer enabled; +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/RetryStrategyConfigMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/RetryStrategyConfigMapper.java new file mode 100644 index 0000000..b72dc91 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/RetryStrategyConfigMapper.java @@ -0,0 +1,13 @@ +package com.njcn.msgpush.module.push.dal.mysql.retry; + +import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX; +import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author caozehui + * @data 2026-03-05 + */ +@Mapper +public interface RetryStrategyConfigMapper extends BaseMapperX { +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/enums/RetryStrategyEnum.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/enums/RetryStrategyEnum.java deleted file mode 100644 index f165d75..0000000 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/enums/RetryStrategyEnum.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.njcn.msgpush.module.push.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -import java.time.LocalDateTime; - -/** - * @author caozehui - * @date 2026-02-27 - * @description 消息重试策略枚举 - */ -@Getter -@AllArgsConstructor -public enum RetryStrategyEnum { - - /** - * 固定间隔重试 - */ - FIXED_INTERVAL(1, "固定间隔重试") { - @Override - public LocalDateTime calculateNextRetryTime(int retryCount, int intervalSeconds) { - return LocalDateTime.now().plusSeconds(intervalSeconds); - } - }, - - /** - * 指数退避重试 - */ - EXPONENTIAL_BACKOFF(2, "指数退避重试") { - @Override - public LocalDateTime calculateNextRetryTime(int retryCount, int baseIntervalSeconds) { - // 基础间隔 * 2^重试次数,最大不超过1小时 - long delay = Math.min(baseIntervalSeconds * (1L << retryCount), 3600); - return LocalDateTime.now().plusSeconds(delay); - } - }; - - /** - * 自定义时间重试 - */ -// CUSTOM(3, "自定义时间重试") { -// @Override -// public LocalDateTime calculateNextRetryTime(int retryCount, int unused) { -// -// return LocalDateTime.now().plusMinutes(5 * retryCount); -// } -// }; - - private Integer code; - private String description; - - /** - * 计算下次重试时间 - * - * @param retryCount 当前重试次数 - * @param param 参数(根据策略不同含义不同) - * @return 下次重试时间 - */ - public abstract LocalDateTime calculateNextRetryTime(int retryCount, int param); - - public static RetryStrategyEnum fromCode(Integer code) { - for (RetryStrategyEnum strategy : values()) { - if (strategy.getCode().equals(code)) { - return strategy; - } - } - return FIXED_INTERVAL; // 默认返回固定间隔 - } -} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java index 2f657fb..c28898e 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java @@ -4,7 +4,6 @@ import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; -import com.njcn.msgpush.module.push.enums.RetryStrategyEnum; import java.util.List; @@ -12,10 +11,9 @@ public interface MessageRetryQueueService { /** * 添加消息到重试队列 * - * @param message 消息 - * @param strategy 重试策略 + * @param message 消息 */ - void addRetryMessage(MessageRecordDO message, RetryStrategyEnum strategy); + void addRetryMessage(MessageRecordDO message); /** * 批量处理重试消息 diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index e0a960f..97c8538 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -2,15 +2,17 @@ package com.njcn.msgpush.module.push.service.retry; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.constant.MsgPushConstant; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; +import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper; import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO; -import com.njcn.msgpush.module.push.enums.RetryStrategyEnum; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; import com.njcn.msgpush.module.push.service.message.MessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -35,6 +37,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl() @@ -88,7 +92,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl= split.length) { + plusSeconds = Long.parseLong(split[split.length - 1]); + } else { + plusSeconds = Long.parseLong(split[retryCount - 1]); + } + } + return LocalDateTime.now().plusSeconds(plusSeconds); + } + @Override public void processRetryBatch(String channel) { // 从数据库查询需要重试的消息 @@ -224,8 +288,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl listAll(); + + /** + * 修改重试策略配置 + * + * @param strategyConfigVO + * @return + */ + boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO); + + boolean toggleEnableField(String id); +} diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java new file mode 100644 index 0000000..1635942 --- /dev/null +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/RetryStrategyConfigServiceImpl.java @@ -0,0 +1,46 @@ +package com.njcn.msgpush.module.push.service.retry; + +import cn.hutool.core.bean.BeanUtil; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; +import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; +import com.njcn.msgpush.module.push.dal.mysql.retry.RetryStrategyConfigMapper; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author caozehui + * @data 2026-03-05 + */ +@Service +public class RetryStrategyConfigServiceImpl extends ServiceImpl implements RetryStrategyConfigService { + @Override + public RetryStrategyConfigDO getStrategyConfig(String channel) { + return this.lambdaQuery().eq(RetryStrategyConfigDO::getChannel, channel) + .eq(RetryStrategyConfigDO::getDeleted, false) + .eq(RetryStrategyConfigDO::getEnabled, true) + .one(); + } + + @Override + public List listAll() { + return this.lambdaQuery() + .eq(RetryStrategyConfigDO::getDeleted, false) + .eq(RetryStrategyConfigDO::getEnabled, true) + .list(); + } + + @Override + public boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO) { + RetryStrategyConfigDO retryStrategyConfigDO = BeanUtil.copyProperties(strategyConfigVO, RetryStrategyConfigDO.class); + return this.updateById(retryStrategyConfigDO); + } + + @Override + public boolean toggleEnableField(String id) { + RetryStrategyConfigDO retryStrategyConfigDO = this.getById(id); + retryStrategyConfigDO.setEnabled(retryStrategyConfigDO.getEnabled() ^ 0X0001); + return this.updateById(retryStrategyConfigDO); + } +}