结构调整

This commit is contained in:
caozehui
2026-03-03 10:33:03 +08:00
parent c7f3687bf8
commit 8b4a7967cc
57 changed files with 1747 additions and 323 deletions

View File

@@ -18,10 +18,10 @@
<dependencies>
<!-- Spring Cloud 基础 -->
<dependency>
<groupId>com.njcn</groupId>
<artifactId>msgpush-spring-boot-starter-env</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.njcn</groupId>-->
<!-- <artifactId>msgpush-spring-boot-starter-env</artifactId>-->
<!-- </dependency>-->
<!-- 依赖服务 -->
<dependency>
@@ -119,6 +119,13 @@
<version>4.2.0</version>
</dependency>
<!-- 个推 -->
<dependency>
<groupId>com.getui.push</groupId>
<artifactId>restful-sdk</artifactId>
<version>1.0.0.1</version>
</dependency>
</dependencies>
<build>

View File

@@ -1,20 +1,21 @@
package com.njcn.msgpush.module.push;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
/**
* 项目的启动类
*
* @author hongawen
*/
@ComponentScan(basePackages = {"com.njcn.msgpush.module.push", "com.njcn.msgpush.framework.mybatis"})
@MapperScan("com.njcn.msgpush.module.*.dal.mysql")
@SpringBootApplication
public class PushServerApplication {
public static void main(String[] args) {
SpringApplication.run(PushServerApplication.class, args);
}
}

View File

@@ -1,6 +1,6 @@
package com.njcn.msgpush.module.push.checker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
/**
* @author caozehui
@@ -9,5 +9,5 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSen
*/
public interface IChecker {
boolean check(MessageRecordSendReqVO reqVO);
boolean check(MessageRecordReqVO reqVO);
}

View File

@@ -4,7 +4,7 @@ import com.njcn.msgpush.module.push.checker.impl.BlacklistChecker;
import com.njcn.msgpush.module.push.checker.impl.IdempotencyChecker;
import com.njcn.msgpush.module.push.checker.impl.QuotaChecker;
import com.njcn.msgpush.module.push.checker.impl.RateLimitChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import java.util.ArrayList;
import java.util.List;
@@ -25,7 +25,7 @@ public class MsgPushGuardChain {
this.checkers.add(new RateLimitChecker());
}
public boolean checkAll(MessageRecordSendReqVO reqVO) {
public boolean checkAll(MessageRecordReqVO reqVO) {
for (IChecker checker : checkers) {
boolean result = checker.check(reqVO);
if (!result) {
@@ -37,7 +37,7 @@ public class MsgPushGuardChain {
return true;
}
private void logRejection(MessageRecordSendReqVO reqVO) {
private void logRejection(MessageRecordReqVO reqVO) {
// 记录拒绝日志,用于监控和分析
System.out.printf("消息请求被拒绝: receiver=%s, messageId=%s, reason=%s%n", reqVO.getReceiver(), reqVO.getMessageId());
}

View File

@@ -1,7 +1,7 @@
package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
/**
* @author caozehui
@@ -10,7 +10,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSen
*/
public class BlacklistChecker implements IChecker {
@Override
public boolean check(MessageRecordSendReqVO reqVO) {
public boolean check(MessageRecordReqVO reqVO) {
return true;
}
}

View File

@@ -1,7 +1,7 @@
package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
/**
* @author caozehui
@@ -10,7 +10,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSen
*/
public class IdempotencyChecker implements IChecker {
@Override
public boolean check(MessageRecordSendReqVO reqVO) {
public boolean check(MessageRecordReqVO reqVO) {
return true;
}
}

View File

@@ -1,7 +1,7 @@
package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
/**
* @author caozehui
@@ -10,7 +10,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSen
*/
public class QuotaChecker implements IChecker {
@Override
public boolean check(MessageRecordSendReqVO reqVO) {
public boolean check(MessageRecordReqVO reqVO) {
return true;
}
}

View File

@@ -2,7 +2,7 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
/**
* @author caozehui
@@ -11,7 +11,7 @@ import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSen
*/
public class RateLimitChecker implements IChecker {
@Override
public boolean check(MessageRecordSendReqVO reqVO) {
public boolean check(MessageRecordReqVO reqVO) {
return true;
}
}

View File

@@ -2,13 +2,20 @@ package com.njcn.msgpush.module.push.client;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.AliyunProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.TelecomProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.UniPushProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.HashMap;
import java.util.List;
@@ -21,10 +28,6 @@ import java.util.Map;
@Configuration
public class ClientConfiguration {
@Autowired
private RestTemplateUtil restTemplateUtil;
@Autowired
private ChannelProviderConfigService channelProviderConfigService;
@@ -41,12 +44,12 @@ public class ClientConfiguration {
}
break;
case MsgPushConstant.PROVIDER_TYPE_TELECOM: {
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(MsgPushConstant.PROVIDER_TYPE_TELECOM, new AliyunProviderFactory());
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(MsgPushConstant.PROVIDER_TYPE_TELECOM, new TelecomProviderFactory());
messageProviderFactoryMap.put(MsgPushConstant.PROVIDER_TYPE_TELECOM, orDefault);
}
break;
case MsgPushConstant.PROVIDER_TYPE_UNI_PUSH: {
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(MsgPushConstant.PROVIDER_TYPE_UNI_PUSH, new AliyunProviderFactory());
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(MsgPushConstant.PROVIDER_TYPE_UNI_PUSH, new UniPushProviderFactory());
messageProviderFactoryMap.put(MsgPushConstant.PROVIDER_TYPE_UNI_PUSH, orDefault);
}
break;

View File

@@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.client.factory;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
@@ -13,15 +14,15 @@ public interface MessageProviderFactory {
/**
* 创建短信发送器
*/
SmsSender createSmsSender(ChannelProviderConfigDO config);
SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender);
/**
* 创建邮件发送器
*/
EmailSender createEmailSender(ChannelProviderConfigDO config) throws Exception;
EmailSender createEmailSender(ChannelProviderConfigDO config,Sender sender);
/**
* 创建APP推送发送器
*/
AppPushSender createAppPushSender(ChannelProviderConfigDO config);
AppPushSender createAppPushSender(ChannelProviderConfigDO config,Sender sender);
}

View File

@@ -3,6 +3,7 @@ package com.njcn.msgpush.module.push.client.factory.impl;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.AliyunEmailSender;
import com.njcn.msgpush.module.push.client.sender.impl.AliyunSmsSender;
@@ -16,31 +17,33 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class AliyunProviderFactory implements MessageProviderFactory {
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config) {
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret())
.regionId("cn-hangzhou")
.endpoint("dysmsapi.aliyuncs.com")
.build();
return new AliyunSmsSender(aliYunSmsSetting);
return new AliyunSmsSender(aliYunSmsSetting, sender);
}
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config) throws Exception {
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret())
.regionId("cn-hangzhou")
.endpoint("dm.aliyuncs.com")
.build();
return new AliyunEmailSender(aliYunSmsSetting);
return new AliyunEmailSender(aliYunSmsSetting, sender);
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config) {
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
log.error("阿里云暂不支持app推送");
return null;
}

View File

@@ -3,11 +3,11 @@ package com.njcn.msgpush.module.push.client.factory.impl;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.TelecomSmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import lombok.extern.slf4j.Slf4j;
/**
@@ -16,28 +16,27 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class TelecomProviderFactory implements MessageProviderFactory {
private RestTemplateUtil restTemplateUtil;
public TelecomProviderFactory(RestTemplateUtil restTemplateUtil) {
this.restTemplateUtil = restTemplateUtil;
}
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config) {
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
TelecomSmsSetting telecomSmsSetting = TelecomSmsSetting.builder()
.account(config.getAppKey())
.password(config.getAppSecret())
.apiUrl(config.getApiUrl())
.extno(config.getExtno())
.build();
return new TelecomSmsSender(telecomSmsSetting, restTemplateUtil);
return new TelecomSmsSender(telecomSmsSetting, sender);
}
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config) throws Exception {
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
log.error("电信暂不支持email推送");
return null;
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config) {
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
log.error("电信暂不支持app推送");
return null;
}

View File

@@ -3,7 +3,10 @@ package com.njcn.msgpush.module.push.client.factory.impl;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.UniPushAppPushSender;
import com.njcn.msgpush.module.push.client.setting.impl.UniPushAppPushSetting;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import lombok.extern.slf4j.Slf4j;
@@ -13,21 +16,27 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class UniPushProviderFactory implements MessageProviderFactory {
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config) {
public SmsSender createSmsSender(ChannelProviderConfigDO config,Sender sender) {
log.error("uniPush暂不支持短信推送");
return null;
}
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config) throws Exception {
public EmailSender createEmailSender(ChannelProviderConfigDO config,Sender sender) {
log.error("uniPush暂不支持email推送");
return null;
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config) {
log.error("阿里云暂不支持app推送");
return null;
public AppPushSender createAppPushSender(ChannelProviderConfigDO config,Sender sender) {
UniPushAppPushSetting uniPushAppPushSetting = UniPushAppPushSetting.builder()
.appId("")
.appKey(config.getAppKey())
.uniAppSecret(config.getAppSecret())
.masterSecret("")
.build();
return new UniPushAppPushSender(uniPushAppPushSetting, sender);
}
}

View File

@@ -8,5 +8,9 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
*/
public interface AppPushSender {
/**
* @param message 消息
* @return 发送结果
*/
boolean appPush(MessageRecordDO message);
}

View File

@@ -1,6 +1,7 @@
package com.njcn.msgpush.module.push.client.sender;
import java.util.List;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.Map;
/**
@@ -9,11 +10,11 @@ import java.util.Map;
*/
public interface EmailSender {
/**
* 发送邮件
* 发送单条邮件
*
* @param params 参数
* @param toAddressList 接收地址集合
* @param message 消息
* @param params 参数
* @return 发送结果
*/
boolean sendEmail(Map<String, Object> params, List<String> toAddressList);
boolean sendEmail(MessageRecordDO message, Map<String, Object> params);
}

View File

@@ -0,0 +1,54 @@
package com.njcn.msgpush.module.push.client.sender;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author caozehui
* @data 2026-03-02
*/
@Component
public class Sender {
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
@Autowired
public MessageRetryQueueService messageRetryQueueService;
@Autowired
public ProviderErrorCodeMappingService providerErrorCodeMappingService;
@Autowired
public RestTemplateUtil restTemplateUtil;
public final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
5,
1000,
TimeUnit.MILLISECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r);
thread.setName("threadName-" + thread.getId());
thread.setDaemon(false);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

View File

@@ -3,7 +3,6 @@ package com.njcn.msgpush.module.push.client.sender;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.List;
import java.util.Map;
/**
* @author caozehui
@@ -11,7 +10,7 @@ import java.util.Map;
*/
public interface SmsSender {
/**
* 向个手机号发送短信
* 向个手机号发送单条短信
*
* @param message 消息
* @return 发送结果
@@ -19,7 +18,7 @@ public interface SmsSender {
boolean sendSms(MessageRecordDO message);
/**
* 向多个手机号发送短信
* 向多个手机号发送短信,支持发送不同签名、不同的模板、不同模板变量的短信
*
* @param messageList 消息集合
* @return 发送结果

View File

@@ -1,17 +1,18 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.aliyun.dm20151123.Client;
import com.aliyun.dm20151123.models.SingleSendMailRequest;
import com.aliyun.dm20151123.models.SingleSendMailResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -22,30 +23,18 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
public class AliyunEmailSender implements EmailSender {
private Sender sender;
public static final String ACCOUNT_NAME = "accountName";
public static final String REPLY_TO_ADDRESS = "replyToAddress";
public static final String SUBJECT = "subject";
public static final String HTML_BODY = "htmlBody";
public static final String TEXT_BODY = "textBody";
private Client emailClient;
private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
5,
1000,
TimeUnit.MILLISECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r);
thread.setName("AliYunSmsClient-Pool-" + thread.getId());
thread.setDaemon(false);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public AliyunEmailSender(AliYunMailSetting aliYunMailSetting) throws Exception {
public AliyunEmailSender(AliYunMailSetting aliYunMailSetting, Sender sender) {
this.sender = sender;
if (ObjectUtil.isNotNull(aliYunMailSetting)) {
Config config = new Config()
.setAccessKeyId(aliYunMailSetting.getAccessKeyId())
@@ -57,35 +46,33 @@ public class AliyunEmailSender implements EmailSender {
this.emailClient = new com.aliyun.dm20151123.Client(config);
} catch (Exception e) {
log.error("阿里云-邮件服务初始化失败,请检查配置信息");
throw e;
throw new RuntimeException(e);
}
}
}
@Override
public boolean sendEmail(Map<String, Object> params, List<String> toAddressList) {
public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) {
RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true;
SingleSendMailRequest request = new SingleSendMailRequest()
.setAccountName(params.get(ACCOUNT_NAME).toString())
.setAddressType(1)
.setReplyToAddress((boolean) params.get(REPLY_TO_ADDRESS))
.setToAddress(StrUtil.join(StrUtil.COMMA, toAddressList))
.setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。
.setSubject(params.get(SUBJECT).toString())
.setHtmlBody(params.get(HTML_BODY).toString())
.setHtmlBody(params.get(HTML_BODY).toString()) //HtmlBody 和 TextBody 是针对不同类型的邮件
.setTextBody(params.get(TEXT_BODY).toString());
try {
SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions);
if (HttpStatus.OK.value() == response.getStatusCode()) {
return true;
} else {
return false;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
// if(OK.equals(response.)){
// return true;
// }else {
// return false;
// }
return false;
}
}

View File

@@ -6,17 +6,18 @@ import com.aliyun.dysmsapi20170525.models.SendSmsRequest;
import com.aliyun.dysmsapi20170525.models.SendSmsResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString;
@@ -27,29 +28,30 @@ import static com.aliyun.teautil.Common.toJSONString;
*/
@Slf4j
public class AliyunSmsSender implements SmsSender {
public static final String SIGN_NAME = "signName";
public static final String TEMPLATE_CODE = "templateCode";
public static final String TEMPLATE_PARAM = "templateParam";
public static final String OK = "OK";
private Sender sender;
private Client smsClient;
private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
5,
1000,
TimeUnit.MILLISECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r);
thread.setName("AliyunSmsSender-Pool-" + thread.getId());
thread.setDaemon(false);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
// private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
// 5,
// 5,
// 1000,
// TimeUnit.MILLISECONDS,
// new java.util.concurrent.ArrayBlockingQueue<>(1000),
// r -> {
// Thread thread = new Thread(r);
// thread.setName("AliyunSmsSender-Pool-" + thread.getId());
// thread.setDaemon(false);
// return thread;
// },
// new ThreadPoolExecutor.CallerRunsPolicy()
// );
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting) {
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
this.sender = sender;
if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
Config config = new Config()
.setAccessKeyId(aliYunSmsSetting.getAccessKeyId())
@@ -66,16 +68,15 @@ public class AliyunSmsSender implements SmsSender {
}
}
@Override
public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = THREAD_POOL_EXECUTOR.submit(() -> {
Future<Boolean> future = this.sender.THREAD_POOL_EXECUTOR.submit(() -> {
message.setStatus(MessageStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
// 设置自动重试默认是不开启的。重试次数默认是3次
runtimeOptions.autoretry = true;
SendSmsRequest request = new SendSmsRequest()
.setPhoneNumbers(message.getReceiver())
.setPhoneNumbers(message.getReceiver()) //手机号码之间以半角逗号(,)分隔。上限为 1000 个手机号码。
.setSignName(message.getTitle())
.setTemplateCode(message.getTemplateCode())
.setTemplateParam(message.getTemplateParams());
@@ -90,11 +91,15 @@ public class AliyunSmsSender implements SmsSender {
if (OK.equals(response.body.code)) {
return true;
} else {
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
} catch (Exception e) {
log.error("阿里云-短信服务发送失败");
throw new Exception(e);
throw new RuntimeException(e);
}
});
Boolean b = null;
@@ -104,11 +109,6 @@ public class AliyunSmsSender implements SmsSender {
throw new RuntimeException(e);
}
if (b) {
message.setStatus(MessageStatusConstant.SUCCESS);
} else {
message.setStatus(MessageStatusConstant.FAILED);
}
return b;
}

View File

@@ -1,12 +1,16 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import com.alibaba.fastjson.JSON;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import lombok.Data;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.time.LocalDateTime;
@@ -15,53 +19,31 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author caozehui
* @data 2026-02-11
*/
@Data
public class TelecomSmsSender implements SmsSender {
/**
* 短信接口地址
*/
private static final String API_URL = "https://sms.ymeeting.cn/smsv2";
/**
* 虚拟接入码
*/
private static final String ACCESS_CODE = "106905631";
/**
* 短信接口内容类型
*/
private static final String CONTENT_TYPE = "application/json;charset=utf-8";
private TelecomSmsSetting telecomSmsSetting;
private Sender sender;
private RestTemplateUtil restTemplateUtil;
private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
5,
1000,
TimeUnit.MILLISECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r);
thread.setName("TelecomSmsSender-Pool-" + thread.getId());
thread.setDaemon(false);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
private class TelecomSmsResponse {
@Data
private static class TelecomSmsResponse {
private String status;
private Double balance;
private List<TelecomSmsMessageRes> list;
}
private class TelecomSmsMessageRes {
@Data
private static class TelecomSmsMessageRes {
//消息ID用于状态报告匹配
private String mid;
//手机号码
@@ -71,14 +53,14 @@ public class TelecomSmsSender implements SmsSender {
}
public TelecomSmsSender(TelecomSmsSetting telecomSmsSetting, RestTemplateUtil restTemplateUtil) {
public TelecomSmsSender(TelecomSmsSetting telecomSmsSetting, Sender sender) {
this.telecomSmsSetting = telecomSmsSetting;
this.restTemplateUtil = restTemplateUtil;
this.sender = sender;
}
@Override
public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = THREAD_POOL_EXECUTOR.submit(() -> {
Future<Boolean> future = this.sender.THREAD_POOL_EXECUTOR.submit(() -> {
message.setStatus(MessageStatusConstant.SENDING);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
@@ -87,7 +69,7 @@ public class TelecomSmsSender implements SmsSender {
request.put("password", telecomSmsSetting.getPassword());
request.put("mobile", message.getReceiver());
request.put("content", message.getContent());
request.put("extno", ACCESS_CODE);
request.put("extno", telecomSmsSetting.getExtno());
// 设置请求头
HttpHeaders headers = new HttpHeaders();
@@ -97,19 +79,24 @@ public class TelecomSmsSender implements SmsSender {
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
// 发送请求
ResponseEntity<String> response = restTemplateUtil.post(
API_URL,
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
System.out.println(JSON.toJSONString(response));
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
TelecomSmsResponse telecomSmsResponse = JSON.parseObject(response.getBody(), TelecomSmsResponse.class);
if (telecomSmsResponse.list.get(0).result == 0) {
if (response.getStatusCode() == HttpStatus.OK && telecomSmsResponse.list.get(0).result == 0) {
return true;
} else {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsResponse.list.get(0).result + "");
message.setErrorCode(telecomSmsResponse.list.get(0).result + "");
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
return false;
}
});
@@ -121,11 +108,6 @@ public class TelecomSmsSender implements SmsSender {
throw new RuntimeException(e);
}
if (b) {
message.setStatus(MessageStatusConstant.SUCCESS);
} else {
message.setStatus(MessageStatusConstant.FAILED);
}
return b;
}
@@ -141,7 +123,7 @@ public class TelecomSmsSender implements SmsSender {
message.setStatus(MessageStatusConstant.SENDING);
}
request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp));
request.put("extno", ACCESS_CODE);
request.put("extno", telecomSmsSetting.getExtno());
// 设置请求头
HttpHeaders headers = new HttpHeaders();
@@ -151,8 +133,8 @@ public class TelecomSmsSender implements SmsSender {
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 发送请求
ResponseEntity<String> response = restTemplateUtil.post(
API_URL,
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class

View File

@@ -0,0 +1,132 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import com.getui.push.v2.sdk.ApiHelper;
import com.getui.push.v2.sdk.GtApiConfiguration;
import com.getui.push.v2.sdk.api.PushApi;
import com.getui.push.v2.sdk.common.ApiResult;
import com.getui.push.v2.sdk.dto.req.Audience;
import com.getui.push.v2.sdk.dto.req.Settings;
import com.getui.push.v2.sdk.dto.req.message.PushDTO;
import com.getui.push.v2.sdk.dto.req.message.PushMessage;
import com.getui.push.v2.sdk.dto.req.message.android.GTNotification;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.UniPushAppPushSetting;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-26
*/
public class UniPushAppPushSender implements AppPushSender {
// private UniPushAppPushSetting uniPushAppPushSetting;
private Sender sender;
private ApiHelper apiHelper;
public UniPushAppPushSender(UniPushAppPushSetting uniPushAppPushSetting, Sender sender) {
// this.uniPushAppPushSetting = uniPushAppPushSetting;
this.sender = sender;
GtApiConfiguration gtApiConfiguration = new GtApiConfiguration();
gtApiConfiguration.setAppId(uniPushAppPushSetting.getAppId());
gtApiConfiguration.setAppKey(uniPushAppPushSetting.getAppKey());
gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret());
gtApiConfiguration.setDomain("https://restapi.getui.com/v2/");
this.apiHelper = ApiHelper.build(gtApiConfiguration);
}
@Override
public boolean appPush(MessageRecordDO message) {
PushDTO<Audience> pushDTO = this.buildPushDTO(message.getTitle(), message.getContent());
// 进行cid单推
PushApi pushApi = apiHelper.creatApi(PushApi.class);
// 设置接收人信息
Audience audience = new Audience();
audience.addCid(message.getReceiver());
pushDTO.setAudience(audience);
ApiResult<Map<String, Map<String, String>>> apiResult = pushApi.pushToSingleByCid(pushDTO);
return apiResult.isSuccess();
}
/**
* 构建推送消息体
*
* @param title
* @param content
* @return
*/
private PushDTO<Audience> buildPushDTO(String title, String content) {
PushDTO<Audience> pushDTO = new PushDTO<>();
// 设置推送参数
pushDTO.setRequestId(System.currentTimeMillis() + "");
// pushDTO.setGroupName("hyqx-group");
//配置推送条件
Settings settings = new Settings();
//消息有效期,走厂商消息需要设置该值
settings.setTtl(3600000);
pushDTO.setSettings(settings);
//安卓在线通道走个推推送时的消息体在线通道不支持ios
PushMessage pushMessage = new PushMessage();
//通知消息
GTNotification notification = new GTNotification();
notification.setTitle(title);
notification.setBody(content);
/**
* intent打开应用内特定页面
* url打开网页地址
* payload自定义消息内容启动应用
* payload_custom自定义消息内容不启动应用
* startapp打开应用首页
* none纯通知无后续动作
*/
//进入特定的页面
// notification.setClickType("intent");
// notification.setIntent("intent://com.getui.push/detail?#Intent;scheme=gtpushscheme;launchFlags=0x4000000;package=com.getui.demo;component=com.getui.demo/com.getui.demo.DemoActivity;S.payload=payloadStr;end");
notification.setClickType("startapp");
pushMessage.setNotification(notification);
pushDTO.setPushMessage(pushMessage);
//透传消息 此格式的透传消息由 unipush 做了特殊处理,会自动展示通知栏。开发者也可自定义其它格式,在客户端自己处理。
// pushMessage.setTransmission(" {title:\"个推通道透传消息标题\",content:\"个推通道透传消息内容\",payload:\"自定义数据\"}");
//设置离线推送时的消息体
// PushChannel pushChannel = new PushChannel();
//安卓离线厂商通道推送的消息体
// AndroidDTO androidDTO = new AndroidDTO();
// Ups ups = new Ups();
// //通知消息
// ThirdNotification thirdNotification = new ThirdNotification();
// ups.setNotification(thirdNotification);
// thirdNotification.setTitle(title + "安卓离线厂商通道通知消息标题");
// thirdNotification.setBody(content + "安卓离线厂商通道通知消息内容");
// thirdNotification.setClickType("intent");
// thirdNotification.setIntent("intent://com.getui.push/detail?#Intent;scheme=gtpushscheme;launchFlags=0x4000000;package=com.getui.demo;component=com.getui.demo/com.getui.demo.DemoActivity;S.payload=payloadStr;end");
// //透传消息
//// ups.setTransmission(" {title:\"安卓离线厂商通道透传消息标题\",content:\"安卓离线厂商通道透传消息内容\",payload:\"自定义数据\"}");
//
// androidDTO.setUps(ups);
// pushChannel.setAndroid(androidDTO);
// //ios离线apn通道推送的消息体
// Alert alert = new Alert();
// alert.setTitle(title+"苹果离线通知栏标题");
// alert.setBody(content+"苹果离线通知栏内容");
// Aps aps = new Aps();
// aps.setContentAvailable(0);//0表示普通通知消息(默认为0);1表示静默推送(无通知栏消息)静默推送时不需要填写其他参数。苹果建议1小时最多推送3条静默消息
// aps.setSound("default");//自定义铃声:系统铃声设置为default; 无声设置为com.gexin.ios.silence或不填
// aps.setAlert(alert);
// IosDTO iosDTO = new IosDTO();
// iosDTO.setAps(aps);
// iosDTO.setType("notify");
// pushChannel.setIos(iosDTO);
// pushDTO.setPushChannel(pushChannel);
return pushDTO;
}
}

View File

@@ -1,27 +0,0 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.setting.impl.UniPushAppPushSetting;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
/**
* @author caozehui
* @data 2026-02-26
*/
public class UniPushEmailSender implements AppPushSender {
private UniPushAppPushSetting uniPushAppPushSetting;
private RestTemplateUtil restTemplateUtil;
public UniPushEmailSender(UniPushAppPushSetting uniPushAppPushSetting, RestTemplateUtil restTemplateUtil) {
this.uniPushAppPushSetting = uniPushAppPushSetting;
this.restTemplateUtil = restTemplateUtil;
}
@Override
public boolean appPush(MessageRecordDO message) {
return false;
}
}

View File

@@ -16,4 +16,6 @@ import lombok.EqualsAndHashCode;
public class TelecomSmsSetting extends SmsSetting {
private String account;
private String password;
private String extno;
private String apiUrl;
}

View File

@@ -14,6 +14,6 @@ import lombok.*;
public class UniPushAppPushSetting extends AppPushSetting {
private String appId;
private String appKey;
private String appSecret;
private String uniAppSecret;
private String masterSecret;
}

View File

@@ -7,7 +7,7 @@ package com.njcn.msgpush.module.push.constant;
public class MsgPushConstant {
public static final String PROVIDER_TYPE_ALI_YUN = "aliyun";
public static final String PROVIDER_TYPE_TELECOM = "telecom";
public static final String PROVIDER_TYPE_UNI_PUSH = "UniPush";
public static final String PROVIDER_TYPE_UNI_PUSH = "uniPush";
public static final String CHANNEL_SMS = "sms";
public static final String CHANNEL_EMAIL = "email";

View File

@@ -4,10 +4,20 @@ import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.AliyunProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.TelecomProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.UniPushProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.controller.admin.channel.vo.ChannelProviderConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,6 +29,8 @@ import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - 渠道服务商")
@Slf4j
@Validated
@@ -33,45 +45,55 @@ public class ChannelProviderConfigController {
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@PostMapping("/list")
@PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:channel:list')")
public CommonResult<Page<ChannelProviderConfigDO>> getChannelProviderConfigPage(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
Page<ChannelProviderConfigDO> res = channelProviderConfigService.listChannelProviderCfg(reqVO);
return CommonResult.success(res);
@PreAuthorize("@ss.hasPermission('push:channel:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<Page<ChannelProviderConfigDO>> pageChannelProviderConfig(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
Page<ChannelProviderConfigDO> res = channelProviderConfigService.getPage(reqVO);
return success(res);
}
@PostMapping("/add")
@Operation(summary = "新增渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:add')")
@Parameter(name = "reqVO", description = "新增参数", required = true)
public CommonResult<Boolean> addChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.save(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return CommonResult.success(res);
return success(res);
}
@PostMapping("/update")
@Operation(summary = "更新渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:update')")
@Parameter(name = "reqVO", description = "更新参数", required = true)
public CommonResult<Boolean> updateChannelProvider(@Validated @RequestBody ChannelProviderConfigReqVO reqVO) {
boolean res = channelProviderConfigService.updateById(BeanUtil.copyProperties(reqVO, ChannelProviderConfigDO.class));
return CommonResult.success(res);
return success(res);
}
@PostMapping("/delete")
@Operation(summary = "删除渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:delete')")
@Parameter(name = "ids", description = "id列表", required = true)
public CommonResult<Boolean> deleteChannelProvider(@RequestBody List<String> ids) {
boolean res = channelProviderConfigService.removeBatchByIds(ids);
return CommonResult.success(res);
return success(res);
}
@GetMapping("/toggle")
@Operation(summary = "启用/禁用渠道服务商")
@PreAuthorize("@ss.hasPermission('push:channel:toggle')")
public CommonResult<Boolean> toggleEnableChannelProvider(@RequestParam("id") String id) {
boolean res = channelProviderConfigService.toggleEnableField(id);
return CommonResult.success(res);
@Parameter(name = "id", description = "id", required = true)
public CommonResult<Void> toggleEnableChannelProvider(@RequestParam("id") String id) {
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.toggleEnableField(id);
if (channelProviderConfigDO.getEnabled() == 1) {
registerProviderBean(channelProviderConfigDO.getProviderType());
} else {
removeProviderBean(channelProviderConfigDO.getProviderType());
}
return success(null);
}
/**
@@ -80,13 +102,22 @@ public class ChannelProviderConfigController {
* @param providerType 服务商类型例如aliyun\telecom\UniPush
*/
public void registerProviderBean(String providerType) {
// DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
// BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClass);
// for (Object arg : args) {
// builder.addConstructorArgValue(arg);
// }
// beanFactory.registerBeanDefinition(beanName, builder.getBeanDefinition());
switch (providerType) {
case MsgPushConstant.PROVIDER_TYPE_ALI_YUN: {
messageProviderFactoryMap.put(MsgPushConstant.PROVIDER_TYPE_ALI_YUN, new AliyunProviderFactory());
}
break;
case MsgPushConstant.PROVIDER_TYPE_TELECOM: {
messageProviderFactoryMap.put(MsgPushConstant.PROVIDER_TYPE_TELECOM, new TelecomProviderFactory());
}
break;
case MsgPushConstant.PROVIDER_TYPE_UNI_PUSH: {
messageProviderFactoryMap.put(MsgPushConstant.PROVIDER_TYPE_UNI_PUSH, new UniPushProviderFactory());
}
break;
default:
throw new IllegalArgumentException("" + providerType + "服务商暂不支持");
}
}
/**
@@ -95,10 +126,6 @@ public class ChannelProviderConfigController {
* @param providerType 服务商类型例如aliyun\telecom\UniPush
*/
public void removeProviderBean(String providerType) {
// DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
// if (beanFactory.containsBeanDefinition(beanName)) {
// beanFactory.removeBeanDefinition(beanName);
// }
messageProviderFactoryMap.remove(providerType);
}
}

View File

@@ -2,32 +2,42 @@ package com.njcn.msgpush.module.push.controller.admin.channel.vo;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
@Data
@Schema(description = "管理后台 - 渠道服务商配置 Request VO")
public class ChannelProviderConfigReqVO extends PageParam {
@Schema(description = "渠道ID")
private String id;
@Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")
private String channel;
@Schema(description = "服务商名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "中国电信/阿里云/UniPush")
@NotBlank(message = "服务商名称不能为空")
private String providerName;
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio/unipush")
@NotBlank(message = "服务商类型不能为空")
private String providerType;
@Schema(description = "API地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "https://api.example.com")
@Schema(description = "API地址", example = "https://api.example.com")
@NotBlank(message = "API地址不能为空")
private String apiUrl;
@Schema(description = "AppKey", requiredMode = Schema.RequiredMode.REQUIRED, example = "123456")
@Schema(description = "AppKey", example = "123456")
@NotBlank(message = "AppKey不能为空")
private String appKey;
@Schema(description = "AppSecret", requiredMode = Schema.RequiredMode.REQUIRED, example = "123456")
@Schema(description = "AppSecret", example = "123456")
@NotBlank(message = "AppSecret不能为空")
private String appSecret;
@Schema(description = "额外配置JSON格式", requiredMode = Schema.RequiredMode.REQUIRED, example = "{}")
@Schema(description = "额外配置JSON格式", example = "{}")
private String extraConfig;
@Schema(description = "优先级(数字越小优先级越高)", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@Schema(description = "优先级(数字越小优先级越高)", example = "1")
private Integer priority;
}

View File

@@ -1,7 +1,10 @@
package com.njcn.msgpush.module.push.controller.admin.message;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -9,11 +12,17 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.security.PermitAll;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - 消息")
@Slf4j
@Validated
@@ -28,8 +37,41 @@ public class MessageRecordController {
@PostMapping("send")
@Operation(summary = "消息推送")
@Parameter(name = "id", description = "编号", required = true, example = "1024")
public CommonResult<Boolean> send(MessageRecordSendReqVO messageRecordSendReqVO) {
Boolean result = messageRecordService.send(messageRecordSendReqVO);
public CommonResult<Boolean> send(@Validated @RequestBody List<MessageRecordReqVO> reqVOList) {
Boolean result = messageRecordService.send(reqVOList);
return CommonResult.success(result);
}
@PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:message:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) {
Page<MessageRecordDO> res = messageRecordService.getPage(reqVO);
return success(res);
}
@PostMapping("/add")
@Operation(summary = "添加消息记录")
@PreAuthorize("@ss.hasPermission('push:message:add')")
@Parameter(name = "reqVO", description = "添加参数", required = true)
public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.add(reqVO) ? success(true) : success(false);
}
@PostMapping("/update")
@Operation(summary = "更新消息记录")
@PreAuthorize("@ss.hasPermission('push:message:update')")
@Parameter(name = "reqVO", description = "更新参数", required = true)
public CommonResult<Boolean> update(@Validated @RequestBody MessageRecordReqVO reqVO) {
return messageRecordService.update(reqVO) ? success(true) : success(false);
}
@PostMapping("/delete")
@Operation(summary = "删除消息记录")
@PreAuthorize("@ss.hasPermission('push:message:delete')")
@Parameter(name = "ids", description = "编号", required = true)
public CommonResult<Boolean> delete(@RequestBody List<String> ids) {
return messageRecordService.delete(ids) ? success(true) : success(false);
}
}

View File

@@ -1,25 +1,38 @@
package com.njcn.msgpush.module.push.controller.admin.message.vo;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
/**
* @author caozehui
* @data 2026-02-27
*/
@Data
@Schema(description = "管理后台 - 消息记录发送 Request VO")
public class MessageRecordSendReqVO {
public class MessageRecordReqVO extends PageParam {
@Schema(description = "主键ID")
private Long id;
@Schema(description = "消息唯一ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "123456")
@NotBlank(message = "消息唯一ID不能为空")
private String messageId;
@Schema(description = "应用名称/来源系统标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "NPQS-9500")
@NotBlank(message = "应用名称/来源系统标识不能为空")
private String appName;
@Schema(description = "渠道类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "sms/email/app_push")
@NotBlank(message = "渠道类型不能为空")
private String channel;
@Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify")
@NotBlank(message = "消息类型不能为空")
private String messageType;
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED, example = "15601691300")
@NotBlank(message = "接收者不能为空")
private String receiver;
@Schema(description = "标题", requiredMode = Schema.RequiredMode.REQUIRED)
@@ -35,6 +48,7 @@ public class MessageRecordSendReqVO {
private String templateParams;
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio")
@NotBlank(message = "服务商类型不能为空")
private String providerType;
@Schema(description = "第三方消息ID")

View File

@@ -0,0 +1,54 @@
package com.njcn.msgpush.module.push.controller.admin.retry;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
/**
* @author caozehui
* @data 2026-02-27
*/
@Tag(name = "管理后台 - 消息重试")
@Slf4j
@Validated
@RestController
@RequestMapping("/push/retry")
public class MessageRetryQueueController {
@Autowired
private MessageRetryQueueService messageRetryQueueService;
@PostMapping("/page")
@Operation(summary = "分页获得消息重试队列")
@PreAuthorize("@ss.hasPermission('push:retry:page')")
@Parameter(name = "reqVO", description = "分页查询参数", required = true)
public CommonResult<PageResult<MessageRetryQueueDO>> getRetryPage(@Validated @RequestBody MessageRetryQueueReqVO reqVO) {
PageResult<MessageRetryQueueDO> result = messageRetryQueueService.getPage(reqVO);
return success(result);
}
@PostMapping("/manual")
@Operation(summary = "批量手动重试消息")
@PreAuthorize("@ss.hasPermission('push:retry:manual')")
public CommonResult<Void> manualRetry(@RequestBody List<String> messageIds) {
messageRetryQueueService.manualRetry(messageIds);
return success(null);
}
}

View File

@@ -0,0 +1,31 @@
package com.njcn.msgpush.module.push.controller.admin.retry.vo;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import lombok.Data;
/**
* @author caozehui
* @data 2026-02-27
*/
@Data
@Schema(description = "管理后台 - 消息重试 Request VO")
public class MessageRetryQueueReqVO extends PageParam {
@Schema(description = "消息ID")
private String messageId;
@Schema(description = "渠道类型", example = "sms/email/app_push")
private String channel;
@Schema(description = "接收者", example = "10086")
private String receiver;
@Schema(description = "最小重试次数")
@Min(value = 0, message = "最小重试次数不能小于0")
private Integer minRetryCount;
@Schema(description = "最大重试次数")
@Min(value = 0, message = "最大重试次数不能小于0")
private Integer maxRetryCount;
}

View File

@@ -51,6 +51,11 @@ public class ChannelProviderConfigDO extends BaseDO {
*/
private String appSecret;
/**
* 电信sms服务所需接入码
*/
private String extno;
/**
* 额外配置JSON格式
*/

View File

@@ -0,0 +1,71 @@
package com.njcn.msgpush.module.push.dal.dataobject.channel;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author caozehui
* @data 2026-03-02
* @description 服务商错误码映射表
*/
@Data
@TableName("push_channel_provider_config")
@EqualsAndHashCode(callSuper = true)
public class ProviderErrorCodeMappingDO extends BaseDO {
/**
* 主键ID
*/
private Long id;
/**
* 服务商类型telecom/cmcc/aliyun/twilio
*/
private String providerType;
/**
* 渠道类型sms/email/app_push
*/
private String channel;
/**
* 服务商原始错误码
*/
private String originalCode;
/**
* 服务商原始错误描述
*/
private String originalMessage;
/**
* 统一错误码
*/
private String unifiedErrorCode;
/**
* 统一错误描述
*/
private String unifiedErrorMessage;
/**
* 错误类别NETWORK/BALANCE/INVALID_PARAM/PROVIDER_ERROR/BLACKLIST
*/
private String errorCategory;
/**
* 是否应该重试0-否 1-是
*/
private Integer shouldRetry;
/**
* 最终状态failed/final_failed/blacklisted/invalid_param
*/
private String finalStatus;
/**
* 备注说明
*/
private String remark;
}

View File

@@ -66,6 +66,11 @@ public class MessageRecordDO extends BaseDO {
*/
private String templateParams;
/**
* 额外信息
*/
private String extraInfo;
/**
* 状态pending/sending/success/failed/final_failed/blacklisted/quota_exceeded/rate_limited/abandoned
*/

View File

@@ -0,0 +1,68 @@
package com.njcn.msgpush.module.push.dal.dataobject.retry;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author caozehui
* @data 2026-02-27
* @description 消息重试队列表对应数据对象
*/
@Data
@TableName("message_retry_queue")
@EqualsAndHashCode(callSuper = true)
public class MessageRetryQueueDO extends BaseDO {
/**
* 主键ID
*/
private Long id;
/**
* 关联message_record的message_id
*/
private String messageId;
/**
* 渠道类型
*/
private String channel;
/**
* 接收者
*/
private String receiver;
/**
* 已重试次数
*/
private Integer retryCount;
/**
* 最大重试次数
*/
private Integer maxRetry;
/**
* 下次重试时间
*/
private LocalDateTime nextRetryTime;
/**
* 首次失败时间
*/
private LocalDateTime firstFailTime;
/**
* 最后一次重试时间
*/
private LocalDateTime lastRetryTime;
/**
* 最后失败原因
*/
private String lastErrorMsg;
}

View File

@@ -4,6 +4,10 @@ import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface ChannelProviderConfigMapper extends BaseMapperX<ChannelProviderConfigDO> {
List<ChannelProviderConfigDO> getActiveProviders();
}

View File

@@ -0,0 +1,11 @@
package com.njcn.msgpush.module.push.dal.mysql.channel;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
/**
* @author caozehui
* @data 2026-03-02
*/
public interface ProviderErrorCodeMappingMappper extends BaseMapperX<ProviderErrorCodeMappingDO> {
}

View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper">
<select id="getActiveProviders"
resultType="com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO">
select *
from push_channel_provider_config
where enabled = 1
</select>
</mapper>

View File

@@ -2,7 +2,9 @@ package com.njcn.msgpush.module.push.dal.mysql.message;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import jakarta.validation.constraints.Pattern;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface MessageRecordMapper extends BaseMapperX<MessageRecordDO> {

View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper">
</mapper>

View File

@@ -0,0 +1,55 @@
package com.njcn.msgpush.module.push.dal.mysql.retry;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
import java.util.List;
@Mapper
public interface MessageRetryQueueMapper extends BaseMapperX<MessageRetryQueueDO> {
/**
* 查询需要重试的消息列表(按下次重试时间升序)
*
* @param currentTime 当前时间
* @param limit 限制数量
* @return 待重试消息列表
*/
List<MessageRetryQueueDO> selectNeedRetryMessages(@Param("currentTime") LocalDateTime currentTime,
@Param("limit") int limit);
/**
* 更新重试信息
*
* @param messageId 消息ID
* @param retryCount 重试次数
* @param nextRetryTime 下次重试时间
* @param lastRetryTime 最后重试时间
* @param lastErrorMsg 最后错误信息
* @return 影响行数
*/
int updateRetryInfo(@Param("messageId") String messageId,
@Param("retryCount") Integer retryCount,
@Param("nextRetryTime") LocalDateTime nextRetryTime,
@Param("lastRetryTime") LocalDateTime lastRetryTime,
@Param("lastErrorMsg") String lastErrorMsg);
/**
* 删除成功的重试记录
*
* @param messageId 消息ID
* @return 影响行数
*/
int deleteByMessageId(@Param("messageId") String messageId);
/**
* 批量删除多个消息ID的重试记录
*
* @param messageIds 消息ID列表
* @return 影响行数
*/
int deleteByMessageIds(@Param("messageIds") List<String> messageIds);
}

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper">
<select id="selectNeedRetryMessages" resultType="com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO">
SELECT *
FROM message_retry_queue
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime} and deleted != 0
ORDER BY next_retry_time ASC
LIMIT #{limit}
</select>
<update id="updateRetryInfo">
UPDATE message_retry_queue
SET retry_count = #{retryCount},
next_retry_time = #{nextRetryTime},
last_retry_time = #{lastRetryTime},
last_error_msg = #{lastErrorMsg},
update_time = NOW()
WHERE message_id = #{messageId} and deleted != 0
</update>
<delete id="deleteByMessageId">
DELETE
FROM message_retry_queue
WHERE message_id = #{messageId}
</delete>
<delete id="deleteByMessageIds">
DELETE FROM message_retry_queue
WHERE message_id IN
<foreach collection="messageIds" item="messageId" open="(" separator="," close=")">
#{messageId}
</foreach>
</delete>
</mapper>

View File

@@ -0,0 +1,123 @@
package com.njcn.msgpush.module.push.dal.redis;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.ZoneId;
import java.util.Set;
/**
* {@link com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO} RedisDAO
*
* @author caozehui
* @data 2026-02-27
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageRetryRedisDAO {
private final RedisTemplate<String, String> redisTemplate;
/**
* Redis中消息重试队列的Key前缀
*/
private static final String RETRY_QUEUE_KEY_PREFIX = "msgpush:retry_queue:";
/**
* 获取指定渠道的重试队列Key
*/
private String getRetryQueueKey(String channel) {
return RETRY_QUEUE_KEY_PREFIX + channel;
}
/**
* 添加消息到重试队列
*
* @param message 消息
*/
public void addToRetryQueue(MessageRecordDO message) {
String key = getRetryQueueKey(message.getChannel());
double score = message.getNextRetryTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
try {
redisTemplate.opsForZSet().add(key, message.getMessageId(), score);
log.debug("添加消息到重试队列成功: channel={}, messageId={}, nextRetryTime={}",
message.getChannel(), message.getMessageId(), message.getNextRetryTime());
} catch (Exception e) {
log.error("添加消息到重试队列失败: channel={}, messageId={}", message.getChannel(), message.getMessageId(), e);
throw e;
}
}
/**
* 从重试队列移除消息
*
* @param channel 渠道类型
* @param messageId 消息ID
*/
public void removeFromRetryQueue(String channel, String messageId) {
String key = getRetryQueueKey(channel);
try {
redisTemplate.opsForZSet().remove(key, messageId);
log.debug("从重试队列移除消息成功: channel={}, messageId={}", channel, messageId);
} catch (Exception e) {
log.error("从重试队列移除消息失败: channel={}, messageId={}", channel, messageId, e);
}
}
/**
* 获取需要重试的消息ID集合
*
* @param channel 渠道类型
* @param currentTime 当前时间戳
* @param limit 限制数量
* @return 消息ID集合
*/
public Set<String> getNeedRetryMessageIds(String channel, long currentTime, int limit) {
String key = getRetryQueueKey(channel);
try {
return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime, 0, limit);
} catch (Exception e) {
log.error("获取需要重试的消息ID失败: channel={}", channel, e);
return null;
}
}
/**
* 获取队列大小
*
* @param channel 渠道类型
* @return 队列大小
*/
public Long getQueueSize(String channel) {
String key = getRetryQueueKey(channel);
try {
return redisTemplate.opsForZSet().size(key);
} catch (Exception e) {
log.error("获取队列大小失败: channel={}", channel, e);
return 0L;
}
}
/**
* 清空指定渠道的重试队列
*
* @param channel 渠道类型
*/
public void clearRetryQueue(String channel) {
String key = getRetryQueueKey(channel);
try {
redisTemplate.delete(key);
log.info("清空重试队列成功: channel={}", channel);
} catch (Exception e) {
log.error("清空重试队列失败: channel={}", channel, e);
}
}
}

View File

@@ -0,0 +1,70 @@
package com.njcn.msgpush.module.push.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.time.LocalDateTime;
/**
* @author caozehui
* @date 2026-02-27
* @description 消息重试策略枚举
*/
@Getter
@AllArgsConstructor
public enum RetryStrategyEnum {
/**
* 固定间隔重试
*/
FIXED_INTERVAL(1, "固定间隔重试") {
@Override
public LocalDateTime calculateNextRetryTime(int retryCount, int intervalSeconds) {
return LocalDateTime.now().plusSeconds(intervalSeconds);
}
},
/**
* 指数退避重试
*/
EXPONENTIAL_BACKOFF(2, "指数退避重试") {
@Override
public LocalDateTime calculateNextRetryTime(int retryCount, int baseIntervalSeconds) {
// 基础间隔 * 2^重试次数最大不超过1小时
long delay = Math.min(baseIntervalSeconds * (1L << retryCount), 3600);
return LocalDateTime.now().plusSeconds(delay);
}
};
/**
* 自定义时间重试
*/
// CUSTOM(3, "自定义时间重试") {
// @Override
// public LocalDateTime calculateNextRetryTime(int retryCount, int unused) {
//
// return LocalDateTime.now().plusMinutes(5 * retryCount);
// }
// };
private Integer code;
private String description;
/**
* 计算下次重试时间
*
* @param retryCount 当前重试次数
* @param param 参数(根据策略不同含义不同)
* @return 下次重试时间
*/
public abstract LocalDateTime calculateNextRetryTime(int retryCount, int param);
public static RetryStrategyEnum fromCode(Integer code) {
for (RetryStrategyEnum strategy : values()) {
if (strategy.getCode().equals(code)) {
return strategy;
}
}
return FIXED_INTERVAL; // 默认返回固定间隔
}
}

View File

@@ -0,0 +1,44 @@
package com.njcn.msgpush.module.push.job;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author caozehui
* @date 2026-02-27
* @description 消息重试定时任务
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageRetryJob {
private final MessageRetryQueueService messageRetryQueueService;
/**
* 定时处理短信重试队列每3秒执行一次
*/
@Scheduled(fixedRate = 3000)
public void processSmsRetryQueue() {
messageRetryQueueService.processRetryBatch("sms");
}
/**
* 定时处理邮件重试队列每3秒执行一次
*/
@Scheduled(fixedRate = 3000)
public void processEmailRetryQueue() {
messageRetryQueueService.processRetryBatch("email");
}
/**
* 定时处理APP推送重试队列每3秒执行一次
*/
@Scheduled(fixedRate = 3000)
public void processAppPushRetryQueue() {
messageRetryQueueService.processRetryBatch("app_push");
}
}

View File

@@ -8,7 +8,7 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
import java.util.List;
public interface ChannelProviderConfigService extends IService<ChannelProviderConfigDO> {
Page<ChannelProviderConfigDO> listChannelProviderCfg(ChannelProviderConfigReqVO pageReqVO);
Page<ChannelProviderConfigDO> getPage(ChannelProviderConfigReqVO reqVO);
List<ChannelProviderConfigDO> getActiveProviders();
@@ -18,7 +18,7 @@ public interface ChannelProviderConfigService extends IService<ChannelProviderCo
* @param id 服务提供商id
* @return
*/
boolean toggleEnableField(String id);
ChannelProviderConfigDO toggleEnableField(String id);
/**
* 根据类型和渠道获取服务提供商
@@ -28,4 +28,6 @@ public interface ChannelProviderConfigService extends IService<ChannelProviderCo
* @return
*/
ChannelProviderConfigDO getByTypeAndChannel(String providerType, String channel);
void failureUpdate(String providerType, String channel);
}

View File

@@ -9,33 +9,29 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
import com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
@Service
public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProviderConfigMapper, ChannelProviderConfigDO> implements ChannelProviderConfigService {
@Override
public Page<ChannelProviderConfigDO> listChannelProviderCfg(ChannelProviderConfigReqVO pageReqVO) {
public Page<ChannelProviderConfigDO> getPage(ChannelProviderConfigReqVO reqVO) {
QueryWrapper<ChannelProviderConfigDO> wrapper = new QueryWrapper<>();
wrapper.lambda().eq(ChannelProviderConfigDO::getChannel, pageReqVO.getChannel());
return this.page(new Page<>(PageUtils.getPageNum(pageReqVO), PageUtils.getPageSize(pageReqVO)), wrapper);
wrapper.lambda().eq(ChannelProviderConfigDO::getChannel, reqVO.getChannel());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public List<ChannelProviderConfigDO> getActiveProviders() {
List<ChannelProviderConfigDO> configDOList = List.of(
new ChannelProviderConfigDO()
.setAppKey("LTAI4FxsR76x2dq3w9c5puUe")
.setAppSecret("GxkTR8fsrvHtixTlD9UPmOGli35tZs")
.setProviderName("阿里云")
.setProviderType("aliyun")
);
return configDOList;
return this.lambdaQuery().eq(ChannelProviderConfigDO::getEnabled, true).list();
}
@Override
public boolean toggleEnableField(String id) {
return false;
public ChannelProviderConfigDO toggleEnableField(String id) {
ChannelProviderConfigDO channelProviderConfigDO = this.getById(id);
channelProviderConfigDO.setEnabled(channelProviderConfigDO.getEnabled() ^ 0X0001);
return channelProviderConfigDO;
}
@Override
@@ -44,4 +40,13 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
.eq(ChannelProviderConfigDO::getChannel, channel)
.one();
}
@Override
public void failureUpdate(String providerType, String channel) {
ChannelProviderConfigDO byTypeAndChannel = this.getByTypeAndChannel(providerType, channel);
byTypeAndChannel.setFailureCount(byTypeAndChannel.getFailureCount() + 1);
byTypeAndChannel.setLastFailureTime(LocalDateTime.now());
byTypeAndChannel.setHealthStatus(byTypeAndChannel.getFailureCount() > 5 ? 0 : 1);
this.updateById(byTypeAndChannel);
}
}

View File

@@ -0,0 +1,13 @@
package com.njcn.msgpush.module.push.service.channel;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
/**
* @author caozehui
* @data 2026-03-02
*/
public interface ProviderErrorCodeMappingService extends IService<ProviderErrorCodeMappingDO> {
ProviderErrorCodeMappingDO getByProviderErrorCode(String providerType,String channel,String originalCode);
}

View File

@@ -0,0 +1,23 @@
package com.njcn.msgpush.module.push.service.channel;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.mysql.channel.ProviderErrorCodeMappingMappper;
import org.springframework.stereotype.Service;
/**
* @author caozehui
* @data 2026-03-02
*/
@Service
public class ProviderErrorCodeMappingServiceImpl extends ServiceImpl<ProviderErrorCodeMappingMappper, ProviderErrorCodeMappingDO> implements ProviderErrorCodeMappingService {
@Override
public ProviderErrorCodeMappingDO getByProviderErrorCode(String providerType, String channel, String originalCode) {
return this.lambdaQuery().eq(ProviderErrorCodeMappingDO::getProviderType, providerType)
.eq(ProviderErrorCodeMappingDO::getChannel, channel)
.eq(ProviderErrorCodeMappingDO::getOriginalCode, originalCode)
.one();
}
}

View File

@@ -1,16 +1,31 @@
package com.njcn.msgpush.module.push.service.message;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import org.apache.ibatis.annotations.Param;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
public interface MessageRecordService {
/**
* 发送消息包括email、sms、app_push
*
* @param messageRecordSendReqVO
* @param reqVOList
* @return 发送是否成功的结果
*/
Boolean send(MessageRecordSendReqVO messageRecordSendReqVO);
boolean send(List<MessageRecordReqVO> reqVOList);
/**
* 处理发送消息
* @param messageRecordDO
* @return 发送是否成功的结果
*/
boolean processSendMsg(MessageRecordDO messageRecordDO);
/**
* 添加消息记录
@@ -18,5 +33,24 @@ public interface MessageRecordService {
* @param messageRecordSendReqVO
* @return
*/
Boolean add(MessageRecordSendReqVO messageRecordSendReqVO);
boolean add(MessageRecordReqVO messageRecordSendReqVO);
MessageRecordDO getById(String messageId);
List<MessageRecordDO> listByIds(Collection<? extends Serializable> ids);
Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO);
boolean update(MessageRecordReqVO reqVO);
boolean delete(List<String> ids);
/**
* 更新消息记录状态
*
* @param messageId
* @param status
* @return
*/
boolean updateStatus(String messageId, String status);
}

View File

@@ -1,12 +1,17 @@
package com.njcn.msgpush.module.push.service.message;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordSendReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
@@ -14,42 +19,102 @@ import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, MessageRecordDO> implements MessageRecordService {
@Autowired
private Sender sender;
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
@Autowired
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@Autowired
private ChannelProviderConfigService channelProviderConfigService;
@Override
public Boolean send(MessageRecordSendReqVO messageRecordSendReqVO) {
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(messageRecordSendReqVO, MessageRecordDO.class);
messageRecordDO.setStatus(MessageStatusConstant.PENDING);
this.save(messageRecordDO);
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
//channelProviderConfigDO.setAppKey("LTAI4FxsR76x2dq3w9c5puUe");
//channelProviderConfigDO.setAppSecret("GxkTR8fsrvHtixTlD9UPmOGli35tZs");
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
boolean sendResult = switch (messageRecordDO.getChannel()) {
case MsgPushConstant.CHANNEL_SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO).sendSms(messageRecordDO);
//case MsgPushConstant.CHANNEL_EMAIL ->
//messageProviderFactory.createEmailSender(channelProviderConfigDO).sendEmail(messageRecordDO.getTemplateParams(), messageRecordDO.getReceiver());
case MsgPushConstant.CHANNEL_APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
@Transactional(rollbackFor = Exception.class)
public boolean send(List<MessageRecordReqVO> reqVOList) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> {
messageRecordDO.setStatus(MessageStatusConstant.PENDING);
});
this.saveBatch(messageRecordDOList);
boolean sendResult = true;
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
sendResult &= this.processSendMsg(messageRecordDO);
}
return sendResult;
}
@Override
public Boolean add(MessageRecordSendReqVO messageRecordSendReqVO) {
return null;
@Transactional(rollbackFor = Exception.class)
public boolean processSendMsg(MessageRecordDO messageRecordDO) {
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
if (ObjectUtil.isNull(messageProviderFactory)) {
throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType());
}
boolean sendResult = switch (messageRecordDO.getChannel()) {
case MsgPushConstant.CHANNEL_SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case MsgPushConstant.CHANNEL_EMAIL ->
messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, new HashMap<>());
case MsgPushConstant.CHANNEL_APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
this.updateStatus(messageRecordDO.getMessageId(), sendResult ? MessageStatusConstant.SUCCESS : MessageStatusConstant.FAILED);
return sendResult;
}
@Override
public boolean updateStatus(String messageId, String status) {
return this.lambdaUpdate().eq(MessageRecordDO::getMessageId, messageId).set(MessageRecordDO::getStatus, status).update();
}
@Override
public MessageRecordDO getById(String messageId) {
return this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).eq(MessageRecordDO::getDeleted, true).one();
}
@Override
public boolean add(MessageRecordReqVO messageRecordSendReqVO) {
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(messageRecordSendReqVO, MessageRecordDO.class);
return this.save(messageRecordDO);
}
@Override
public List<MessageRecordDO> listByIds(Collection<? extends Serializable> ids) {
return this.baseMapper.selectByIds(ids);
}
@Override
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
wrapper.lambda()
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRecordDO::getChannel, reqVO.getChannel())
.eq(StrUtil.isNotBlank(reqVO.getProviderType()), MessageRecordDO::getProviderType, reqVO.getProviderType())
.eq(StrUtil.isNotBlank(reqVO.getAppName()), MessageRecordDO::getAppName, reqVO.getAppName());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public boolean update(MessageRecordReqVO reqVO) {
MessageRecordDO messageRecordDO = BeanUtil.copyProperties(reqVO, MessageRecordDO.class);
return this.updateById(messageRecordDO);
}
@Override
public boolean delete(List<String> ids) {
return this.lambdaUpdate().set(MessageRecordDO::getDeleted, false).update();
}
}

View File

@@ -0,0 +1,50 @@
package com.njcn.msgpush.module.push.service.retry;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import java.util.List;
public interface MessageRetryQueueService {
/**
* 添加消息到重试队列
*
* @param message 消息
* @param strategy 重试策略
*/
void addRetryMessage(MessageRecordDO message, RetryStrategyEnum strategy);
/**
* 批量处理重试消息
*
* @param channel 渠道类型
*/
void processRetryBatch(String channel);
/**
* 手动触发重试
*
* @param messageIds 消息ID列表
* @return 是否成功
*/
void manualRetry(List<String> messageIds);
/**
* 移除重试记录
*
* @param messageId 消息ID
* @return 是否成功
*/
boolean removeRetryRecord(String messageId);
/**
* 分页查询重试队列
*
* @param reqVO 查询条件
* @return 分页结果
*/
PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO);
}

View File

@@ -0,0 +1,211 @@
package com.njcn.msgpush.module.push.service.retry;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.RetryStrategyEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Set;
@Slf4j
@Service
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO> implements MessageRetryQueueService {
@Autowired
private MessageRetryRedisDAO messageRetryRedisDAO;
@Autowired
private MessageRecordService messageRecordService;
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
/**
* 默认每次处理的消息数量
*/
private static final int DEFAULT_BATCH_SIZE = 100;
/**
* 默认重试间隔(秒)
*/
private static final int DEFAULT_RETRY_INTERVAL = 300; // 5分钟
/**
* 默认最大重试次数
*/
private static final int DEFAULT_MAX_RETRY_COUNT = 3;
@Override
@Transactional(rollbackFor = Exception.class)
public void addRetryMessage(MessageRecordDO message, RetryStrategyEnum strategy) {
// 检查是否已存在重试记录
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getMessageId())
);
// 消息不在重试队列中
if (ObjectUtil.isNull(existing)) {
// 创建重试记录
MessageRetryQueueDO retryRecord = new MessageRetryQueueDO();
retryRecord.setMessageId(message.getMessageId());
retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(0);
retryRecord.setMaxRetry(message.getRetryCount());
retryRecord.setFirstFailTime(LocalDateTime.now());
retryRecord.setLastErrorMsg(message.getErrorMsg());
// 计算下次重试时间
LocalDateTime nextRetryTime = strategy.calculateNextRetryTime(0, DEFAULT_RETRY_INTERVAL);
retryRecord.setNextRetryTime(nextRetryTime);
super.baseMapper.insert(retryRecord);
// 同步到Redis
messageRetryRedisDAO.addToRetryQueue(message);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void processRetryBatch(String channel) {
// 从数据库查询需要重试的消息
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
// 从redis中查询需要重试的消息
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), DEFAULT_BATCH_SIZE);
// 没有需要重试的消息
if (needRetryMessageIds.isEmpty()) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void manualRetry(List<String> messageIds) {
if (CollUtil.isNotEmpty(messageIds)) {
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
// 立即重试(设置下次重试时间为现在)
//retryRecord.setNextRetryTime(LocalDateTime.now());
//retryQueueMapper.updateById(retryRecord);
// 更新redis中的分数
//messageRetryRedisDAO.addToRetryQueue(retryRecord.getChannel(), messageId, LocalDateTime.now());
processSingleRetry(messageRecordDO);
}
}
}
@Override
public PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO) {
return null;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean removeRetryRecord(String messageId) {
MessageRetryQueueDO retryRecord = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, messageId)
);
if (retryRecord == null) {
log.warn("未找到重试记录: messageId={}", messageId);
return false;
}
// 从数据库删除
super.baseMapper.deleteByMessageId(messageId);
// 从Redis删除
messageRetryRedisDAO.removeFromRetryQueue(retryRecord.getChannel(), messageId);
log.info("删除重试记录成功: messageId={}", messageId);
return true;
}
/**
* 处理单个消息的重试逻辑
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
// todo 调用消息发送接口进行重试
boolean sendResult = messageRecordService.processSendMsg(messageRecordDO);
if (sendResult) {
// 重试成功,删除重试记录
super.baseMapper.deleteByMessageId(messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
log.info("消息重试成功并已清除: messageId={}", messageRecordDO.getMessageId());
} else {
// 重试失败,更新重试信息
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
handleRetryFailure(messageRecordDO);
}
}
/**
* 模拟重试过程(实际应用中应替换为真实的消息发送逻辑)
*/
private boolean simulateRetryProcess(String messageId) {
// 这里应该是调用实际的消息发送服务
// 暂时随机返回成功或失败用于演示
return Math.random() > 0.3; // 70%成功率
}
/**
* 处理重试失败的情况
*/
private void handleRetryFailure(MessageRecordDO messageRecordDO) {
MessageRetryQueueDO messageRetryQueueDO = this.getById(messageRecordDO.getMessageId());
int newRetryCount = messageRetryQueueDO.getRetryCount() + 1;
if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) {
// 达到最大重试次数,标记为最终失败
// 更新消息的状态为final_failed
messageRecordService.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED);
// 数据库中不能删除
// retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId());
// redis中可以删除
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
} else {
// 还可以继续重试,更新重试信息
RetryStrategyEnum strategy = RetryStrategyEnum.EXPONENTIAL_BACKOFF;
LocalDateTime nextRetryTime = strategy.calculateNextRetryTime(newRetryCount, DEFAULT_RETRY_INTERVAL);
super.baseMapper.updateRetryInfo(
messageRecordDO.getMessageId(),
newRetryCount,
nextRetryTime,
LocalDateTime.now(),
"重试失败:" + LocalDateTime.now()
);
// 更新Redis中的下次重试时间
messageRetryRedisDAO.addToRetryQueue(messageRecordDO);
}
}
}

View File

@@ -7,11 +7,6 @@ import org.springframework.web.client.RestTemplate;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-10
* @description restTemplate工具类
*/
@Slf4j
@Component
public class RestTemplateUtil {
@@ -50,9 +45,9 @@ public class RestTemplateUtil {
}
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Object> request = new HttpEntity<>(requestBody, headers);
HttpEntity<Object> entity = new HttpEntity<>(requestBody, headers);
log.info("发送POST请求到: {}", url);
ResponseEntity<T> response = restTemplate.postForEntity(url, request, responseType);
ResponseEntity<T> response = restTemplate.postForEntity(url, entity, responseType);
log.info("POST请求响应状态: {}", response.getStatusCode());
return response;
} catch (Exception e) {
@@ -72,9 +67,9 @@ public class RestTemplateUtil {
}
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<Map<String, String>> request = new HttpEntity<>(formData, headers);
HttpEntity<Map<String, String>> entity = new HttpEntity<>(formData, headers);
log.info("发送POST表单请求到: {}", url);
ResponseEntity<T> response = restTemplate.postForEntity(url, request, responseType);
ResponseEntity<T> response = restTemplate.postForEntity(url, entity, responseType);
log.info("POST表单请求响应状态: {}", response.getStatusCode());
return response;
} catch (Exception e) {
@@ -99,10 +94,10 @@ public class RestTemplateUtil {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Object> request = new HttpEntity<>(requestBody, headers);
HttpEntity<Object> entity = new HttpEntity<>(requestBody, headers);
log.info("发送POST请求获取对象到: {}", url);
T result = restTemplate.postForObject(url, request, responseType);
T result = restTemplate.postForObject(url, entity, responseType);
log.info("POST请求成功获取对象");
return result;
} catch (Exception e) {

View File

@@ -0,0 +1,133 @@
package com.njcn.msgpush.module.push.util;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-28
*/
@Slf4j
@Component
public class SmsApiUtil {
@Resource
private RestTemplateUtil restTemplateUtil;
/**
* 群发短信接口
*
* @param mobiles 手机号列表,多个手机号用逗号分隔
* @param content 短信内容
* @return 发送结果
*/
public ResponseEntity<String> sendBatchSms(String mobiles, String content) {
return sendBatchSms(SmsConstant.SMS_ACCOUNT, SmsConstant.SMS_PASSWORD, mobiles, content);
}
/**
* 群发短信接口
*
* @param account 账号
* @param password 密码
* @param mobiles 手机号列表,多个手机号用逗号分隔
* @param content 短信内容
* @return 发送结果
*/
public ResponseEntity<String> sendBatchSms(String account, String password, String mobiles, String content) {
try {
String[] mobileArray = mobiles.split(",");
log.info("开始群发短信,手机号数量: {}, 扩展号码: {}", mobileArray.length, SmsConstant.SMS_ACCESS_CODE);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "send");
request.put("account", account);
request.put("password", password);
request.put("mobile", mobiles);
request.put("content", content);
request.put("extno", SmsConstant.SMS_ACCESS_CODE);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", SmsConstant.SMS_CONTENT_TYPE);
// 发送请求
ResponseEntity<String> response = restTemplateUtil.post(
SmsConstant.SMS_API_URL,
request,
headers,
String.class
);
log.info("群发短信完成,响应状态: {}, 响应内容: {}",
response.getStatusCode(), response.getBody());
return response;
} catch (Exception e) {
log.error("群发短信失败: {}", e.getMessage(), e);
throw new RuntimeException("群发短信失败", e);
}
}
/**
* 消息状态报告查询接口
*
* @param size 查询数量建议不超过1000
* @return 状态报告查询结果
*/
public ResponseEntity<String> messageReport(Integer size) {
return messageReport(SmsConstant.SMS_ACCOUNT, SmsConstant.SMS_PASSWORD, size);
}
/**
* 消息状态报告查询接口
*
* @param account 账号
* @param password 密码
* @param size 查询数量建议不超过1000
* @return 状态报告查询结果
*/
public ResponseEntity<String> messageReport(String account, String password, Integer size) {
try {
log.info("开始查询消息状态报告,查询数量: {}", size);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "report");
request.put("account", account);
request.put("password", password);
request.put("size", size != null ? size : 1000);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", SmsConstant.SMS_CONTENT_TYPE);
// 发送请求
ResponseEntity<String> response = restTemplateUtil.post(
SmsConstant.SMS_API_URL,
request,
headers,
String.class
);
log.info("消息状态报告查询完成,响应状态: {}, 响应内容: {}",
response.getStatusCode(), response.getBody());
return response;
} catch (Exception e) {
log.error("消息状态报告查询失败: {}", e.getMessage(), e);
throw new RuntimeException("消息状态报告查询失败", e);
}
}
}

View File

@@ -0,0 +1,64 @@
package com.njcn.msgpush.module.push.util;
/**
* 短信发送的一些常量微服务添加在nacos中
*
* @author hongawen
* @version 1.0.0
* @date 2023年08月24日 18:25
*/
public interface SmsConstant {
String DEFAULT_CONNECT_TIME_OUT = "sun.net.client.defaultConnectTimeout";
String DEFAULT_READ_TIME_OUT = "sun.net.client.defaultReadTimeout";
//短信API产品名称短信产品名固定无需修改
String PRODUCT = "Dysmsapi";
//短信API产品域名接口地址固定无需修改
String DOMAIN = "dysmsapi.aliyuncs.com";
//accessKeyId
String ACCESS_KEY_ID = "LTAI4FxsR76x2dq3w9c5puUe";
//accessKeySecret
String ACCESS_KEY_SECRET = "GxkTR8fsrvHtixTlD9UPmOGli35tZs";
//短信所属地
String LOCATION = "cn-hangzhou";
/**
* 通知签名
*/
String SGIN = "灿能云";
/**
* 验证码签名
*/
String VERIFICATION_SIGNATURE = "南京灿能电力自动化股份";
/**
* 短信接口地址
*/
String SMS_API_URL = "https://sms.ymeeting.cn/smsv2";
/**
* 短信接口内容类型
*/
String SMS_CONTENT_TYPE = "application/json;charset=utf-8";
/**
* 接口编码方式
*/
String SMS_CHARSET = "UTF-8";
/**
* 短信接口账号
*/
String SMS_ACCOUNT = "925631";
/**
* 短信接口密码
*/
String SMS_PASSWORD = "AMW2pOVrdky";
/**
* 虚拟接入码
*/
String SMS_ACCESS_CODE = "106905631";
}

View File

@@ -1,78 +1,59 @@
package com.njcn.msgpush.module.push.sms;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.AliyunSmsSender;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-06
*/
@Slf4j
@SpringBootTest
public class MsgPushClientTest {
@Autowired
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
private MessageRecordService messageRecordService;
@Test
public void testSendSms() throws Exception {
ChannelProviderConfigDO channelProviderConfigDO = new ChannelProviderConfigDO();
channelProviderConfigDO.setAppKey("LTAI4FxsR76x2dq3w9c5puUe");
channelProviderConfigDO.setAppSecret("GxkTR8fsrvHtixTlD9UPmOGli35tZs");
SmsSender smsSender = messageProviderFactoryMap.get(MsgPushConstant.PROVIDER_TYPE_ALI_YUN).createSmsSender(channelProviderConfigDO);
MessageRecordDO message = new MessageRecordDO();
message.setMessageId("1c2w1e2a3c456");
message.setChannel(MsgPushConstant.CHANNEL_SMS);
message.setTitle("灿能云");
message.setReceiver("18839431215");
message.setTemplateCode("SMS_481710295");
message.setTemplateParams("{\"code\":\"123456\"}");
message.setProviderType(MsgPushConstant.PROVIDER_TYPE_ALI_YUN);
boolean b = smsSender.sendSms(message);
System.out.println(System.currentTimeMillis() + " " + b);
}
@Test
public void testSendBatchSms() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(AliyunSmsSender.SIGN_NAME, List.of("灿能云"));
params.put(AliyunSmsSender.TEMPLATE_CODE, "SMS_481710295");
params.put(AliyunSmsSender.TEMPLATE_PARAM, List.of("{\"code\":\"123456\"}"));
List<MessageRecordDO> messageIdList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
MessageRecordDO message = new MessageRecordDO();
message.setMessageId(i + "c518e2a3c");
public void testSend() {
List<MessageRecordReqVO> messageIdList = new ArrayList<>();
// for (int i = 0; i < 5; i++) {
// MessageRecordReqVO message = new MessageRecordReqVO();
// message.setMessageId(i + "c518e2a3c");
// message.setAppName("NPQS-9500");
// message.setChannel(MsgPushConstant.CHANNEL_SMS);
// message.setTitle("灿能云");
// message.setReceiver("18839431215");
// message.setTemplateCode("SMS_481710295");
// message.setTemplateParams("{\"code\":\"65432" + i + "\"}");
// message.setProviderType(MsgPushConstant.PROVIDER_TYPE_ALI_YUN);
// messageIdList.add(message);
// }
for (int i = 0; i < 1; i++) {
MessageRecordReqVO message = new MessageRecordReqVO();
message.setMessageId(i + "2345dc");
message.setAppName("NPQS-9000");
message.setChannel(MsgPushConstant.CHANNEL_SMS);
message.setTitle("灿能云");
message.setReceiver("18839431215");
message.setTemplateCode("SMS_481710295");
message.setTemplateParams("{\"code\":\"65432" + i + "\"}");
message.setProviderType(MsgPushConstant.PROVIDER_TYPE_ALI_YUN);
message.setContent("【南京灿能电力】测试短信" + i + ",请忽略。" + LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
message.setProviderType(MsgPushConstant.PROVIDER_TYPE_TELECOM);
messageIdList.add(message);
}
boolean sendResult = messageRecordService.send(messageIdList);
System.out.println(sendResult);
}
ChannelProviderConfigDO channelProviderConfigDO = new ChannelProviderConfigDO();
channelProviderConfigDO.setAppKey("LTAI4FxsR76x2dq3w9c5puUe");
channelProviderConfigDO.setAppSecret("GxkTR8fsrvHtixTlD9UPmOGli35tZs");
SmsSender smsSender = messageProviderFactoryMap.get(MsgPushConstant.PROVIDER_TYPE_ALI_YUN).createSmsSender(channelProviderConfigDO);
@Test
public void test() {
boolean b = smsSender.sendBatchSms(messageIdList);
System.out.println(System.currentTimeMillis() + " " + b);
}
}