发送频率限制数据访问层、服务层

This commit is contained in:
caozehui
2026-03-16 10:40:42 +08:00
parent 2fa61b350c
commit 5feae9e403
32 changed files with 437 additions and 219 deletions

View File

@@ -2,6 +2,8 @@ package com.njcn.msgpush.module.push.checker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-02-27 * @data 2026-02-27
@@ -9,5 +11,5 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq
*/ */
public interface IChecker { public interface IChecker {
boolean check(MessageRecordReqVO reqVO); void check(List<MessageRecordReqVO> messageRecordReqVOList);
} }

View File

@@ -23,20 +23,9 @@ public class MsgPushGuardChain {
this.checkers.add(new RateLimitChecker()); this.checkers.add(new RateLimitChecker());
} }
public boolean checkAll(MessageRecordReqVO reqVO) { public void checkAll(List<MessageRecordReqVO> messageRecordReqVOList) {
for (IChecker checker : checkers) { for (IChecker checker : checkers) {
boolean result = checker.check(reqVO); checker.check(messageRecordReqVOList);
if (!result) {
// 任何一层检查失败,立即返回拒绝
logRejection(reqVO);
return result;
}
} }
return true;
}
private void logRejection(MessageRecordReqVO reqVO) {
// 记录拒绝日志,用于监控和分析
System.out.printf("消息请求被拒绝: receiver=%s, messageId=%s, reason=%s%n", reqVO.getReceiver(), reqVO.getMessageId());
} }
} }

View File

@@ -2,15 +2,24 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.service.blacklist.BlacklistService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-02-27 * @data 2026-02-27
* @description 黑名单检查器 * @description 黑名单检查器
*/ */
@Component
public class BlacklistChecker implements IChecker { public class BlacklistChecker implements IChecker {
@Autowired
private BlacklistService blacklistService;
@Override @Override
public boolean check(MessageRecordReqVO reqVO) { public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
return true; blacklistService.check(messageRecordReqVOList);
} }
} }

View File

@@ -3,6 +3,8 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-02-27 * @data 2026-02-27
@@ -10,7 +12,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq
*/ */
public class QuotaChecker implements IChecker { public class QuotaChecker implements IChecker {
@Override @Override
public boolean check(MessageRecordReqVO reqVO) { public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
return true;
} }
} }

View File

@@ -4,6 +4,8 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker; import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-02-27 * @data 2026-02-27
@@ -11,7 +13,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq
*/ */
public class RateLimitChecker implements IChecker { public class RateLimitChecker implements IChecker {
@Override @Override
public boolean check(MessageRecordReqVO reqVO) { public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
return true;
} }
} }

View File

@@ -9,6 +9,8 @@ import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -47,4 +49,6 @@ public class Sender {
new ThreadPoolExecutor.CallerRunsPolicy() new ThreadPoolExecutor.CallerRunsPolicy()
); );
public final ScheduledExecutorService MSG_CALLBACK_THREAD_POOL_SCHEDULER = Executors.newScheduledThreadPool(5);
} }

View File

@@ -66,6 +66,8 @@ public class AliyunEmailSender implements EmailSender {
@Override @Override
public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) { public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING); message.setStatus(MessageStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions(); RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true; runtimeOptions.autoretry = true;
@@ -92,7 +94,7 @@ public class AliyunEmailSender implements EmailSender {
System.out.println(toJSONString(response)); System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) { if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MessageStatusConstant.SUCCESS); message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true; return true;
@@ -108,7 +110,7 @@ public class AliyunEmailSender implements EmailSender {
} else { } else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
} }
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false; return false;

View File

@@ -22,9 +22,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString; import static com.aliyun.teautil.Common.toJSONString;
@@ -42,8 +40,6 @@ public class AliyunSmsSender implements SmsSender {
private Client smsClient; private Client smsClient;
private ScheduledExecutorService scheduledExecutorService;
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) { public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
this.sender = sender; this.sender = sender;
if (ObjectUtil.isNotNull(aliYunSmsSetting)) { if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
@@ -64,6 +60,8 @@ public class AliyunSmsSender implements SmsSender {
@Override @Override
public boolean sendSms(MessageRecordDO message) { public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING); message.setStatus(MessageStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions(); RuntimeOptions runtimeOptions = new RuntimeOptions();
// 设置自动重试默认是不开启的。重试次数默认是3次 // 设置自动重试默认是不开启的。重试次数默认是3次
@@ -95,7 +93,7 @@ public class AliyunSmsSender implements SmsSender {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){ if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
} else { } else {
@@ -140,10 +138,7 @@ public class AliyunSmsSender implements SmsSender {
* @param message * @param message
*/ */
private void getDownInfo(String bizId, MessageRecordDO message) { private void getDownInfo(String bizId, MessageRecordDO message) {
if (ObjectUtil.isNull(this.scheduledExecutorService)) { this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> {
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
}
this.scheduledExecutorService.schedule(() -> {
QuerySendDetailsRequest request = new QuerySendDetailsRequest() QuerySendDetailsRequest request = new QuerySendDetailsRequest()
.setPhoneNumber(message.getReceiver()) .setPhoneNumber(message.getReceiver())
.setBizId(bizId) .setBizId(bizId)
@@ -156,22 +151,23 @@ public class AliyunSmsSender implements SmsSender {
// if (response.statusCode != HttpStatus.OK.value()) { // if (response.statusCode != HttpStatus.OK.value()) {
response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> { response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> {
if (!"DELIVERED".equals(detail.errCode)) { if (!"DELIVERED".equals(detail.errCode)) {
message.setStatus(MessageStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode);
message.setErrorCode(detail.errCode); message.setErrorCode(detail.errCode);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){ if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
} else { } else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
} }
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else { } else {
message.setStatus(MessageStatusConstant.SUCCESS); message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
} }
@@ -180,8 +176,6 @@ public class AliyunSmsSender implements SmsSender {
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.scheduledExecutorService.shutdown(); }, 20, TimeUnit.SECONDS);
this.scheduledExecutorService = null;
}, 10, TimeUnit.SECONDS);
} }
} }

View File

@@ -22,9 +22,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.alibaba.fastjson.JSON.toJSON; import static com.alibaba.fastjson.JSON.toJSON;
@@ -43,8 +41,6 @@ public class TelecomSmsSender implements SmsSender {
private TelecomSmsSetting telecomSmsSetting; private TelecomSmsSetting telecomSmsSetting;
private Sender sender; private Sender sender;
private ScheduledExecutorService scheduledExecutorService;
@Data @Data
private static class TelecomSmsSendResponse { private static class TelecomSmsSendResponse {
private String status; private String status;
@@ -87,6 +83,8 @@ public class TelecomSmsSender implements SmsSender {
@Override @Override
public boolean sendSms(MessageRecordDO message) { public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> { Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING); message.setStatus(MessageStatusConstant.SENDING);
// 构建请求参数 // 构建请求参数
Map<String, Object> request = new HashMap<>(); Map<String, Object> request = new HashMap<>();
@@ -143,7 +141,7 @@ public class TelecomSmsSender implements SmsSender {
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){ if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
} else { } else {
@@ -239,10 +237,7 @@ public class TelecomSmsSender implements SmsSender {
* @param message * @param message
*/ */
private void getDownInfo(String mid, MessageRecordDO message) { private void getDownInfo(String mid, MessageRecordDO message) {
if (ObjectUtil.isNull(this.scheduledExecutorService)) { this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> {
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
}
this.scheduledExecutorService.schedule(() -> {
// 构建请求参数 // 构建请求参数
Map<String, Object> request = new HashMap<>(); Map<String, Object> request = new HashMap<>();
request.put("action", "select"); request.put("action", "select");
@@ -266,48 +261,34 @@ public class TelecomSmsSender implements SmsSender {
); );
System.out.println(JSON.toJSONString(JSON.toJSONString(response))); System.out.println(JSON.toJSONString(JSON.toJSONString(response)));
// if (response.getStatusCode() == HttpStatus.OK) {
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class); TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0); TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
// if (StrUtil.isBlank(message.getErrorMsg())) { boolean b = Math.random() > 0.5;
// message.setStatus(MessageStatusConstant.FAILED);
// ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), "MA:0006");
// message.setErrorCode(telecomSmsSelectDetailRes.getStat());
// if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
// message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
// } else {
// message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
// }
// this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
// this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
// } else {
// message.setStatus(MessageStatusConstant.SUCCESS);
// }
if (telecomSmsSelectDetailRes.getStatus() == 5) { if (telecomSmsSelectDetailRes.getStatus() == 5) {
message.setStatus(MessageStatusConstant.FAILED); message.setStatus(MessageStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat());
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){ if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
} else { } else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
//this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg()); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} }
if (telecomSmsSelectDetailRes.getStatus() == 4) { if (telecomSmsSelectDetailRes.getStatus() == 4) {
message.setStatus(MessageStatusConstant.SUCCESS); message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
} }
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null; }, 20, TimeUnit.SECONDS);
}, 10, TimeUnit.SECONDS);
} }
/** /**

View File

@@ -8,13 +8,11 @@ import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import com.njcn.msgpush.module.push.service.blacklist.BlacklistService; import com.njcn.msgpush.module.push.service.blacklist.BlacklistService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
@@ -24,7 +22,10 @@ import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
* @author caozehui * @author caozehui
* @data 2026-03-12 * @data 2026-03-12
*/ */
@RestController("/push/blacklist") @Tag(name = "管理后台 - 黑名单")
@Validated
@RestController
@RequestMapping("/push/blacklist")
public class BlacklistController { public class BlacklistController {
@Autowired @Autowired
private BlacklistService blacklistService; private BlacklistService blacklistService;
@@ -41,21 +42,21 @@ public class BlacklistController {
@Operation(summary = "添加黑名单") @Operation(summary = "添加黑名单")
@PreAuthorize("@ss.hasPermission('push:blacklist:add')") @PreAuthorize("@ss.hasPermission('push:blacklist:add')")
public CommonResult<Boolean> add(@RequestBody BlacklistReqVO reqVO) { public CommonResult<Boolean> add(@RequestBody BlacklistReqVO reqVO) {
return CommonResult.success(blacklistService.save(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); return success(blacklistService.save(BeanUtil.copyProperties(reqVO, BlacklistDO.class)));
} }
@PostMapping("/update") @PostMapping("/update")
@Operation(summary = "更新黑名单") @Operation(summary = "更新黑名单")
@PreAuthorize("@ss.hasPermission('push:blacklist:update')") @PreAuthorize("@ss.hasPermission('push:blacklist:update')")
public CommonResult<Boolean> update(@RequestBody BlacklistReqVO reqVO) { public CommonResult<Boolean> update(@RequestBody BlacklistReqVO reqVO) {
return CommonResult.success(blacklistService.updateById(BeanUtil.copyProperties(reqVO, BlacklistDO.class))); return success(blacklistService.updateById(BeanUtil.copyProperties(reqVO, BlacklistDO.class)));
} }
@PostMapping("/delete") @PostMapping("/delete")
@Operation(summary = "删除黑名单") @Operation(summary = "删除黑名单")
@PreAuthorize("@ss.hasPermission('push:blacklist:delete')") @PreAuthorize("@ss.hasPermission('push:blacklist:delete')")
@Parameter(name = "ids", description = "id列表", required = true) @Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids){ public CommonResult<Boolean> delete(@RequestParam("ids") List<Long> ids) {
return CommonResult.success(blacklistService.removeByIds(ids)); return success(blacklistService.delete(ids));
} }
} }

View File

@@ -16,50 +16,29 @@ import lombok.Data;
@Schema(description = "管理后台 - 黑名单 Request VO") @Schema(description = "管理后台 - 黑名单 Request VO")
public class BlacklistReqVO extends PageParam { public class BlacklistReqVO extends PageParam {
/**
* 主键 ID
*/
@Schema(description = "主键 ID", example = "123444") @Schema(description = "主键 ID", example = "123444")
private Long id; private Long id;
/**
* 渠道类型sms/email/app_push
*/
@Schema(description = "渠道类型sms/email/app_push", example = "sms") @Schema(description = "渠道类型sms/email/app_push", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;
/**
* 黑名单目标:手机号/邮箱/设备 Token
*/
@Schema(description = "黑名单目标:手机号/邮箱/设备 Token", example = "15601691000") @Schema(description = "黑名单目标:手机号/邮箱/设备 Token", example = "15601691000")
@NotBlank(message = "黑名单目标不能为空") @NotBlank(message = "黑名单目标不能为空")
private String target; private String target;
/**
* 加入原因:用户投诉/无效号码/频繁退订等
*/
@Schema(description = "加入原因:用户投诉/无效号码/频繁退订等", example = "用户投诉") @Schema(description = "加入原因:用户投诉/无效号码/频繁退订等", example = "用户投诉")
private String reason; private String reason;
/**
* 操作来源manual/auto/import
*/
@Schema(description = "操作来源manual/auto/import", example = "manual") @Schema(description = "操作来源manual/auto/import", example = "manual")
@InEnum(value = com.njcn.msgpush.module.push.enums.BlacklistSourceEnum.class, message = "操作来源必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.BlacklistSourceEnum.class, message = "操作来源必须是 {value}")
private String source; private String source;
/**
* 过期时间:-1=永久,时间戳=到期时间
*/
@Schema(description = "过期时间:-1=永久,时间戳=到期时间", example = "15601691000") @Schema(description = "过期时间:-1=永久,时间戳=到期时间", example = "15601691000")
@NotNull(message = "过期时间不能为空") @NotNull(message = "过期时间不能为空")
@Min(value = -1L, message = "过期时间不能小于 -1") @Min(value = -1L, message = "过期时间不能小于 -1")
private Long expireTime; private Long expireTime;
/**
* 备注
*/
@Schema(description = "备注", example = "备注信息") @Schema(description = "备注", example = "备注信息")
private String remark; private String remark;
} }

View File

@@ -45,14 +45,13 @@ public class ChannelProviderConfigController {
@Operation(summary = "分页查询渠道服务商列表") @Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:channel:page')") @PreAuthorize("@ss.hasPermission('push:channel:page')")
public CommonResult<Page<ChannelProviderConfigDO>> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { public CommonResult<Page<ChannelProviderConfigDO>> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
Page<ChannelProviderConfigDO> res = channelProviderConfigService.getPage(reqVO); return success(channelProviderConfigService.getPage(reqVO));
return success(res);
} }
@PostMapping("/add") @PostMapping("/add")
@Operation(summary = "新增渠道服务商") @Operation(summary = "新增渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:add')") @PreAuthorize("@ss.hasPermission('push:channel:add')")
public CommonResult<Boolean> addChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { public CommonResult<Boolean> add(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return success(res); return success(res);
} }
@@ -60,7 +59,7 @@ public class ChannelProviderConfigController {
@PostMapping("/update") @PostMapping("/update")
@Operation(summary = "更新渠道服务商") @Operation(summary = "更新渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:update')") @PreAuthorize("@ss.hasPermission('push:channel:update')")
public CommonResult<Boolean> updateChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) { public CommonResult<Boolean> update(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class)); boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return success(res); return success(res);
} }
@@ -69,7 +68,7 @@ public class ChannelProviderConfigController {
@Operation(summary = "删除渠道服务商") @Operation(summary = "删除渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:delete')") @PreAuthorize("@ss.hasPermission('push:channel:delete')")
@Parameter(name = "ids", description = "id列表", required = true) @Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> deleteChannelProvider(@RequestParam("ids") List<String> ids) { public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids) {
boolean res = channelProviderConfigService.removeBatchByIds(ids); boolean res = channelProviderConfigService.removeBatchByIds(ids);
return success(res); return success(res);
} }

View File

@@ -1,7 +1,9 @@
package com.njcn.msgpush.module.push.controller.admin.channel.vo; package com.njcn.msgpush.module.push.controller.admin.channel.vo;
import com.njcn.msgpush.framework.common.pojo.PageParam; import com.njcn.msgpush.framework.common.pojo.PageParam;
import com.njcn.msgpush.framework.common.validation.InEnum;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import lombok.Data; import lombok.Data;
@@ -11,8 +13,8 @@ public class ChannelProviderConfigReqVO extends PageParam {
@Schema(description = "渠道ID") @Schema(description = "渠道ID")
private String id; private String id;
@Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push") @Schema(description = "渠道类型sms/email/app_push", example = "sms")
@NotBlank(message = "渠道类型不能为空") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;
@Schema(description = "服务商名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "中国电信/阿里云/UniPush") @Schema(description = "服务商名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "中国电信/阿里云/UniPush")
@@ -20,18 +22,18 @@ public class ChannelProviderConfigReqVO extends PageParam {
private String providerName; private String providerName;
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio/unipush") @Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio/unipush")
@NotBlank(message = "服务商类型不能为空") @InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}")
private String providerType; private String providerType;
@Schema(description = "API地址", example = "https://api.example.com") @Schema(description = "API地址")
@NotBlank(message = "API地址不能为空") @NotBlank(message = "API地址不能为空")
private String apiUrl; private String apiUrl;
@Schema(description = "AppKey", example = "123456") @Schema(description = "AppKey")
@NotBlank(message = "AppKey不能为空") @NotBlank(message = "AppKey不能为空")
private String appKey; private String appKey;
@Schema(description = "AppSecret", example = "123456") @Schema(description = "AppSecret")
@NotBlank(message = "AppSecret不能为空") @NotBlank(message = "AppSecret不能为空")
private String appSecret; private String appSecret;
@@ -39,5 +41,6 @@ public class ChannelProviderConfigReqVO extends PageParam {
private String extraConfig; private String extraConfig;
@Schema(description = "优先级(数字越小优先级越高)", example = "1") @Schema(description = "优先级(数字越小优先级越高)", example = "1")
@Min(value = 1L, message = "优先级不能小于 1")
private Integer priority; private Integer priority;
} }

View File

@@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent; import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.message.MessageRecordService; import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@@ -12,19 +11,17 @@ import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.security.PermitAll; import jakarta.annotation.security.PermitAll;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - 消息") @Tag(name = "管理后台 - 消息")
@Slf4j @Validated
@RestController @RestController
@RequestMapping("/push/message") @RequestMapping("/push/message")
public class MessageRecordController { public class MessageRecordController {
@@ -37,37 +34,35 @@ public class MessageRecordController {
@Operation(summary = "消息推送") @Operation(summary = "消息推送")
@Idempotent(timeout = 60) @Idempotent(timeout = 60)
public CommonResult<Boolean> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) { public CommonResult<Boolean> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
Boolean result = messageRecordService.send(reqVOList); return success(messageRecordService.send(reqVOList));
return CommonResult.success(result);
} }
@PostMapping("/page") @PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表") @Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:message:page')") @PreAuthorize("@ss.hasPermission('push:message:page')")
public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) { public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) {
Page<MessageRecordDO> res = messageRecordService.getPage(reqVO); return success(messageRecordService.getPage(reqVO));
return success(res);
} }
@PostMapping("/add") @PostMapping("/add")
@Operation(summary = "添加消息记录") @Operation(summary = "添加消息记录")
@PreAuthorize("@ss.hasPermission('push:message:add')") @PreAuthorize("@ss.hasPermission('push:message:add')")
public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) { public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.add(reqVO) ? success(true) : success(false); return success(messageRecordService.add(reqVO));
} }
@PostMapping("/update") @PostMapping("/update")
@Operation(summary = "更新消息记录") @Operation(summary = "更新消息记录")
@PreAuthorize("@ss.hasPermission('push:message:update')") @PreAuthorize("@ss.hasPermission('push:message:update')")
public CommonResult<Boolean> update(@Validated @RequestBody MessageRecordReqVO reqVO) { public CommonResult<Boolean> update(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.update(reqVO) ? success(true) : success(false); return success(messageRecordService.update(reqVO));
} }
@PostMapping("/delete") @PostMapping("/delete")
@Operation(summary = "删除消息记录") @Operation(summary = "删除消息记录")
@PreAuthorize("@ss.hasPermission('push:message:delete')") @PreAuthorize("@ss.hasPermission('push:message:delete')")
@Parameter(name = "ids", description = "编号", required = true) @Parameter(name = "ids", description = "编号", required = true)
public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids) { public CommonResult<Boolean> delete(@RequestParam("ids") List<Long> ids) {
return messageRecordService.delete(ids) ? success(true) : success(false); return success(messageRecordService.delete(ids));
} }
} }

View File

@@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.controller.admin.message.vo;
import cn.hutool.core.lang.RegexPool; import cn.hutool.core.lang.RegexPool;
import com.njcn.msgpush.framework.common.pojo.PageParam; import com.njcn.msgpush.framework.common.pojo.PageParam;
import com.njcn.msgpush.framework.common.validation.InEnum;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern; import jakarta.validation.constraints.Pattern;
@@ -25,13 +26,12 @@ public class MessageRecordReqVO extends PageParam {
@NotBlank(message = "应用名称/来源系统标识不能为空") @NotBlank(message = "应用名称/来源系统标识不能为空")
private String appName; private String appName;
@Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push") @Schema(description = "渠道类型sms/email/app_push", example = "sms")
@NotBlank(message = "渠道类型不能为空") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;
@Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify") @Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify")
@NotBlank(message = "消息类型不能为空") @NotBlank(message = "消息类型不能为空")
// @InEnum(value = CommonStatusEnum.class,message = "消息类型错误")
private String messageType; private String messageType;
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED)
@@ -52,7 +52,7 @@ public class MessageRecordReqVO extends PageParam {
private String templateParams; private String templateParams;
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio") @Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio")
@NotBlank(message = "服务商类型不能为空") @InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}")
private String providerType; private String providerType;
@Schema(description = "第三方消息ID") @Schema(description = "第三方消息ID")

View File

@@ -0,0 +1,65 @@
package com.njcn.msgpush.module.push.controller.admin.ratelimit;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.log.Log;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO;
import com.njcn.msgpush.module.push.service.ratelimit.RateLimitConfigService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
/**
* @author caozehui
* @data 2026-03-13
*/
@Tag(name = "管理后台 - 单人配额")
@Validated
@RestController
@RequestMapping("/push/ratelimit")
public class RateLimitConfigController {
@Autowired
private RateLimitConfigService rateLimitConfigService;
@PostMapping("/page")
@Operation(summary = "分页单人配额列表")
@PreAuthorize("@ss.hasPermission('push:ratelimit:page')")
public CommonResult<Page<RateLimitConfigDO>> pageRateLimitConfig(@Validated @RequestBody RateLimitConfigReqVO reqVO) {
Page<RateLimitConfigDO> res = rateLimitConfigService.getPage(reqVO);
return success(res);
}
@PostMapping("/add")
@Operation(summary = "添加单人配额")
@PreAuthorize("@ss.hasPermission('push:ratelimit:add')")
public CommonResult<Boolean> add(@Validated @RequestBody RateLimitConfigReqVO reqVO) {
boolean res = rateLimitConfigService.save(BeanUtil.copyProperties(reqVO, RateLimitConfigDO.class));
return success(res);
}
@PostMapping("/update")
@Operation(summary = "更新单人配额")
@PreAuthorize("@ss.hasPermission('push:ratelimit:update')")
public CommonResult<Boolean> update(@Validated @RequestBody RateLimitConfigReqVO reqVO) {
boolean res = rateLimitConfigService.updateById(BeanUtil.copyProperties(reqVO, RateLimitConfigDO.class));
return success(res);
}
@PostMapping("/delete")
@Operation(summary = "删除单人配额")
@PreAuthorize("@ss.hasPermission('push:ratelimit:delete')")
@Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> delete(@RequestParam List<Long> ids) {
return success(rateLimitConfigService.delete(ids));
}
}

View File

@@ -0,0 +1,37 @@
package com.njcn.msgpush.module.push.controller.admin.ratelimit.VO;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import com.njcn.msgpush.framework.common.validation.InEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author caozehui
* @data 2026-03-13
*/
@Data
@Schema(description = "管理后台 - 限流配置 Request VO")
public class RateLimitConfigReqVO extends PageParam {
@Schema(description = "主键 ID")
private Long id;
@Schema(description = "渠道类型sms/email/app_push", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel;
@Schema(description = "来源系统", example = "NPQS-9000")
@NotBlank(message = "来源系统不能为空")
private String appName;
@Schema(description = "单接收者每日上限", example = "100")
@NotNull(message = "单接收者每日上限不能为空")
@Min(value = 1L, message = "单接收者每日上限不能小于 1")
private Integer dailyLimit;
@Schema(description = "是否启用0-否 1-是", example = "1")
@NotNull(message = "是否启用不能为空")
private Boolean enabled;
}

View File

@@ -3,7 +3,7 @@ package com.njcn.msgpush.module.push.controller.admin.retry;
import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
@@ -11,7 +11,6 @@ import com.njcn.msgpush.module.push.service.retry.RetryStrategyConfigService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
@@ -26,7 +25,6 @@ import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
* @data 2026-02-27 * @data 2026-02-27
*/ */
@Tag(name = "管理后台 - 消息重试") @Tag(name = "管理后台 - 消息重试")
@Slf4j
@Validated @Validated
@RestController @RestController
@RequestMapping("/push/retry") @RequestMapping("/push/retry")
@@ -64,7 +62,7 @@ public class MessageRetryQueueController {
@PostMapping("/config/update") @PostMapping("/config/update")
@Operation(summary = "更新消息重试配置") @Operation(summary = "更新消息重试配置")
@PreAuthorize("@ss.hasPermission('push:retry:config:update')") @PreAuthorize("@ss.hasPermission('push:retry:config:update')")
public CommonResult<Boolean> updateRetryConfig(@Validated @RequestBody RetryStrategyConfigVO retryStrategyConfigVO) { public CommonResult<Boolean> updateRetryConfig(@Validated @RequestBody RetryStrategyConfigReqVO retryStrategyConfigVO) {
return success(retryStrategyConfigService.updateStrategyConfig(retryStrategyConfigVO)); return success(retryStrategyConfigService.updateStrategyConfig(retryStrategyConfigVO));
} }

View File

@@ -1,6 +1,8 @@
package com.njcn.msgpush.module.push.controller.admin.retry.vo; package com.njcn.msgpush.module.push.controller.admin.retry.vo;
import com.njcn.msgpush.framework.common.validation.InEnum;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import lombok.Data; import lombok.Data;
@@ -11,29 +13,19 @@ import lombok.Data;
*/ */
@Data @Data
@Schema(description = "管理后台 - 重试策略配置 Request VO") @Schema(description = "管理后台 - 重试策略配置 Request VO")
public class RetryStrategyConfigVO { public class RetryStrategyConfigReqVO {
/**
* 主键 ID
*/
@Schema(description = "主键 ID") @Schema(description = "主键 ID")
private Long id; private Long id;
/**
* 渠道类型sms/email/app_push @Schema(description = "渠道类型sms/email/app_push", example = "sms")
*/ @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
@Schema(description = "渠道类型sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")
private String channel; private String channel;
/**
* 最大重试次数
*/
@Schema(description = "最大重试次数") @Schema(description = "最大重试次数")
@NotNull(message = "最大重试次数不能为空") @NotNull(message = "最大重试次数不能为空")
@Min(value = 1, message = "最大重试次数不能小于0")
private Integer maxRetryCount; private Integer maxRetryCount;
/**
* 重试间隔逗号分隔300,600,1800
*/
@Schema(description = "重试间隔逗号分隔300,600,1800") @Schema(description = "重试间隔逗号分隔300,600,1800")
@NotBlank(message = "重试间隔不能为空") @NotBlank(message = "重试间隔不能为空")
private String retryIntervals; private String retryIntervals;

View File

@@ -0,0 +1,38 @@
package com.njcn.msgpush.module.push.dal.dataobject.ratelimit;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data;
/**
* @author caozehui
* @data 2026-03-13
*/
@Data
@TableName("push_rate_limit_config")
public class RateLimitConfigDO extends BaseDO {
/**
* 主键 ID
*/
private Long id;
/**
* 渠道类型sms/email/app_push
*/
private String channel;
/**
* 来源系统
*/
private String appName;
/**
* 单接收者每日上限
*/
private Integer dailyLimit;
/**
* 是否启用0-否 1-是
*/
private Boolean enabled;
}

View File

@@ -5,7 +5,9 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
@Mapper @Mapper
public interface MessageRecordMapper extends BaseMapperX<MessageRecordDO> { public interface MessageRecordMapper extends BaseMapperX<MessageRecordDO> {
int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg); int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg,@Param("lastRetryTime") LocalDateTime lastRetryTime, @Param("nextRetryTime") LocalDateTime nextRetryTime);
} }

View File

@@ -5,9 +5,11 @@
<update id="updateStatusAndErrorInfo"> <update id="updateStatusAndErrorInfo">
UPDATE push_message_record UPDATE push_message_record
SET status = #{status}, SET status = #{status},
error_code = #{errorCode}, error_code = #{errorCode},
error_msg = #{errorMsg} error_msg = #{errorMsg},
last_retry_time= #{lastRetryTime},
next_retry_time= #{nextRetryTime}
WHERE message_id = #{messageId} WHERE message_id = #{messageId}
</update> </update>
</mapper> </mapper>

View File

@@ -0,0 +1,13 @@
package com.njcn.msgpush.module.push.dal.mysql.ratelimit;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO;
import org.apache.ibatis.annotations.Mapper;
/**
* @author caozehui
* @data 2026-03-13
*/
@Mapper
public interface RateLimitConfigMapper extends BaseMapperX<RateLimitConfigDO> {
}

View File

@@ -3,12 +3,24 @@ package com.njcn.msgpush.module.push.service.blacklist;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-03-12 * @data 2026-03-12
*/ */
public interface BlacklistService extends IService<BlacklistDO> { public interface BlacklistService extends IService<BlacklistDO> {
Page<BlacklistDO> getPage(BlacklistReqVO reqVO); Page<BlacklistDO> getPage(BlacklistReqVO reqVO);
Boolean delete(List<Long> ids);
/**
* 进行黑名单检查
*
* @param messageRecordReqVOList
*/
void check(List<MessageRecordReqVO> messageRecordReqVOList);
} }

View File

@@ -1,15 +1,20 @@
package com.njcn.msgpush.module.push.service.blacklist; package com.njcn.msgpush.module.push.service.blacklist;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils; import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO; import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO; import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper; import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/** /**
* @author caozehui * @author caozehui
* @data 2026-03-12 * @data 2026-03-12
@@ -22,4 +27,31 @@ public class BlacklistServiceImpl extends ServiceImpl<BlacklistMapper, Blacklist
wrapper.lambda().eq(BlacklistDO::getChannel, reqVO.getChannel()); wrapper.lambda().eq(BlacklistDO::getChannel, reqVO.getChannel());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
} }
@Override
public Boolean delete(List<Long> ids) {
return this.lambdaUpdate().in(BlacklistDO::getId, ids).set(BlacklistDO::getDeleted, true).update();
}
@Override
public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
List<BlacklistDO> exists = new ArrayList<>();
for (int i = messageRecordReqVOList.size() - 1; i >= 0; i--) {
MessageRecordReqVO messageRecordReqVO = messageRecordReqVOList.get(i);
String receiver = messageRecordReqVO.getReceiver();
String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA));
BlacklistDO blacklistDO = this.lambdaQuery()
.eq(BlacklistDO::getChannel, messageRecordReqVO.getChannel())
.in(split.length > 0, BlacklistDO::getTarget, split)
.eq(BlacklistDO::getDeleted, false).one();
if (ObjectUtil.isNotNull(blacklistDO)) {
messageRecordReqVOList.remove(i);
blacklistDO.setHitCount(blacklistDO.getHitCount() + 1);
exists.add(blacklistDO);
}
}
// 更新
this.updateBatchById(exists);
}
} }

View File

@@ -5,6 +5,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReq
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@@ -42,18 +43,7 @@ public interface MessageRecordService {
boolean update(MessageRecordReqVO reqVO); boolean update(MessageRecordReqVO reqVO);
boolean delete(List<String> ids); boolean delete(List<Long> ids);
/**
* 更新消息记录状态
*
* @param messageId
* @param status
* @param errorCode
* @param errorMsg
* @return
*/
boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg);
/** /**
* 更新消息记录重试次数 * 更新消息记录重试次数
@@ -62,4 +52,6 @@ public interface MessageRecordService {
* @return * @return
*/ */
boolean updateRetryCount(String messageId); boolean updateRetryCount(String messageId);
boolean updateMessage(MessageRecordDO messageRecordDO);
} }

View File

@@ -25,6 +25,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -82,10 +83,10 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id"); MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) { if (sendResult) {
this.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.SUCCESS, null, null); this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MessageStatusConstant.SUCCESS); messageRetryHistoryDO.setStatus(MessageStatusConstant.SUCCESS);
} else { } else {
this.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg()); this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MessageStatusConstant.FAILED); messageRetryHistoryDO.setStatus(MessageStatusConstant.FAILED);
} }
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId())); messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
@@ -94,10 +95,10 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
return sendResult; return sendResult;
} }
@Override @Override
@Transactional(rollbackFor = Exception.class) public boolean updateMessage(MessageRecordDO messageRecordDO) {
public boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg) { return this.updateById(messageRecordDO);
return this.baseMapper.updateStatusAndErrorInfo(messageId, status, errorCode, errorMsg) > 0;
} }
@Override @Override
@@ -140,7 +141,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
} }
@Override @Override
public boolean delete(List<String> ids) { public boolean delete(List<Long> ids) {
return this.lambdaUpdate().set(MessageRecordDO::getDeleted, false).update(); return this.lambdaUpdate().in(MessageRecordDO::getId, ids).set(MessageRecordDO::getDeleted, false).update();
} }
} }

View File

@@ -0,0 +1,19 @@
package com.njcn.msgpush.module.push.service.ratelimit;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-13
*/
public interface RateLimitConfigService extends IService<RateLimitConfigDO> {
Page<RateLimitConfigDO> getPage(RateLimitConfigReqVO reqVO);
boolean delete(List<Long> ids);
}

View File

@@ -0,0 +1,35 @@
package com.njcn.msgpush.module.push.service.ratelimit;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.ratelimit.RateLimitConfigMapper;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-13
*/
@Service
public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMapper, RateLimitConfigDO> implements RateLimitConfigService {
@Override
public Page<RateLimitConfigDO> getPage(RateLimitConfigReqVO reqVO) {
QueryWrapper<RateLimitConfigDO> wrapper = new QueryWrapper<>();
wrapper.lambda().eq(StrUtil.isNotBlank(reqVO.getChannel()), RateLimitConfigDO::getChannel, reqVO.getChannel());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public boolean delete(List<Long> ids) {
return this.lambdaUpdate()
.set(RateLimitConfigDO::getDeleted, true)
.in(RateLimitConfigDO::getId, ids)
.update();
}
}

View File

@@ -25,9 +25,6 @@ import java.time.ZoneId;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Service @Service
@@ -41,21 +38,20 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Autowired @Autowired
public RetryStrategyConfigService retryStrategyConfigService; public RetryStrategyConfigService retryStrategyConfigService;
// public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor( // 3,
5, // 3,
5, // 1000,
1000, // TimeUnit.MILLISECONDS,
TimeUnit.MILLISECONDS, // new java.util.concurrent.ArrayBlockingQueue<>(1000),
new java.util.concurrent.ArrayBlockingQueue<>(1000), // r -> {
r -> { // Thread thread = new Thread(r);
Thread thread = new Thread(r); // thread.setName("msgRetryThreadPool-" + thread.getId());
thread.setName("msgRetryThreadPool-" + thread.getId()); // thread.setDaemon(false);
thread.setDaemon(false); // return thread;
return thread; // },
}, // new ThreadPoolExecutor.CallerRunsPolicy()
new ThreadPoolExecutor.CallerRunsPolicy() // );
);
/** /**
* 默认每次处理的消息数量 * 默认每次处理的消息数量
@@ -80,6 +76,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>() new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getMessageId()) .eq(MessageRetryQueueDO::getMessageId, message.getMessageId())
); );
message.setLastRetryTime(message.getSendTime());
// 消息不在重试队列中 // 消息不在重试队列中
if (ObjectUtil.isNull(existing)) { if (ObjectUtil.isNull(existing)) {
@@ -249,34 +246,49 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
*/ */
private void processSingleRetry(MessageRecordDO messageRecordDO) { private void processSingleRetry(MessageRecordDO messageRecordDO) {
// 异步调用消息发送接口进行重试 // 异步调用消息发送接口进行重试
CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR); //CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR);
sendFuture.orTimeout(5, TimeUnit.SECONDS) // sendFuture.orTimeout(5, TimeUnit.SECONDS)
.thenAccept(sendResult -> { // .thenAccept(sendResult -> {
messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); // messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
if (sendResult) { // if (sendResult) {
log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId()); // log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); // messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); // this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
// todo 重试成功后是否要删除retry_history表中的数据 //
// } else {
// // 重试失败,更新重试信息
// log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
// channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
// handleRetryFailure(messageRecordDO);
// }
// }).exceptionally(ex -> {
// messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
// log.error("异步执行消息重试发生异常messageId={}", messageRecordDO.getMessageId(), ex);
// // 发生异常时也尝试处理失败逻辑,避免消息丢失
// try {
// handleRetryFailure(messageRecordDO);
// } catch (Exception e) {
// log.error("异常处理后再次失败messageId={}", messageRecordDO.getMessageId(), e);
// }
// return null;
// });
} else { boolean sendResult = messageRecordService.processSendMsg(messageRecordDO);
// 重试失败,更新重试信息
log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId()); messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); if (sendResult) {
handleRetryFailure(messageRecordDO); log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
} messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
}).exceptionally(ex -> { this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
log.error("异步执行消息重试发生异常messageId={}", messageRecordDO.getMessageId(), ex); } else {
// 发生异常时也尝试处理失败逻辑,避免消息丢失 // 重试失败,更新重试信息
try { log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
handleRetryFailure(messageRecordDO); channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
} catch (Exception e) { handleRetryFailure(messageRecordDO);
log.error("异常处理后再次失败messageId={}", messageRecordDO.getMessageId(), e); }
} messageRecordService.updateMessage(messageRecordDO);
return null;
});
} }
/** /**
@@ -296,10 +308,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
int newRetryCount = messageRetryQueueDO.getRetryCount() + 1; int newRetryCount = messageRetryQueueDO.getRetryCount() + 1;
if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) { if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) {
messageRecordDO.setStatus(MessageStatusConstant.FINALFAILED);
messageRecordDO.setNextRetryTime(null);
// 达到最大重试次数,标记为最终失败 // 达到最大重试次数,标记为最终失败
// 更新消息的状态为final_failed // 更新消息的状态为final_failed
messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg()); //messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg(), messageRecordDO.getSendTime(), null);
// 数据库中不能删除 // 数据库中不能删除
// retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId()); // retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId());
@@ -308,6 +323,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
} else { } else {
// 还可以继续重试,更新重试信息 // 还可以继续重试,更新重试信息
LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount); LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount);
messageRecordDO.setStatus(MessageStatusConstant.FAILED);
messageRecordDO.setNextRetryTime(nextRetryTime);
this.updateRetryInfo( this.updateRetryInfo(
messageRecordDO.getMessageId(), messageRecordDO.getMessageId(),

View File

@@ -1,7 +1,7 @@
package com.njcn.msgpush.module.push.service.retry; package com.njcn.msgpush.module.push.service.retry;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import java.util.List; import java.util.List;
@@ -10,7 +10,7 @@ import java.util.List;
* @author caozehui * @author caozehui
* @data 2026-03-05 * @data 2026-03-05
*/ */
public interface RetryStrategyConfigService { public interface RetryStrategyConfigService {
/** /**
* 获得指定渠道的重试策略 * 获得指定渠道的重试策略
* *
@@ -21,6 +21,7 @@ public interface RetryStrategyConfigService {
/** /**
* 获得所有重试策略 * 获得所有重试策略
*
* @return * @return
*/ */
List<RetryStrategyConfigDO> listAll(); List<RetryStrategyConfigDO> listAll();
@@ -28,10 +29,10 @@ public interface RetryStrategyConfigService {
/** /**
* 修改重试策略配置 * 修改重试策略配置
* *
* @param strategyConfigVO * @param strategyConfigReqVO
* @return * @return
*/ */
boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO); boolean updateStrategyConfig(RetryStrategyConfigReqVO strategyConfigReqVO);
boolean toggleEnableField(String id); boolean toggleEnableField(String id);
} }

View File

@@ -2,7 +2,7 @@ package com.njcn.msgpush.module.push.service.retry;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.RetryStrategyConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.RetryStrategyConfigMapper; import com.njcn.msgpush.module.push.dal.mysql.retry.RetryStrategyConfigMapper;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -32,8 +32,8 @@ public class RetryStrategyConfigServiceImpl extends ServiceImpl<RetryStrategyCon
} }
@Override @Override
public boolean updateStrategyConfig(RetryStrategyConfigVO strategyConfigVO) { public boolean updateStrategyConfig(RetryStrategyConfigReqVO strategyConfigReqVO) {
RetryStrategyConfigDO retryStrategyConfigDO = BeanUtil.copyProperties(strategyConfigVO, RetryStrategyConfigDO.class); RetryStrategyConfigDO retryStrategyConfigDO = BeanUtil.copyProperties(strategyConfigReqVO, RetryStrategyConfigDO.class);
return this.updateById(retryStrategyConfigDO); return this.updateById(retryStrategyConfigDO);
} }