1、结构调整

2、抽象工厂优化
This commit is contained in:
2026-03-31 19:35:21 +08:00
parent 87757b352c
commit ebdbdbeb41
667 changed files with 1240 additions and 50173 deletions

View File

@@ -29,11 +29,6 @@
<artifactId>msgpush-module-system-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>msgpush-module-infra-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>msgpush-module-push-api</artifactId>

View File

@@ -7,8 +7,8 @@ import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
/**
* @author caozehui
* @data 2026-02-11
* 各服务商自己的 sender 工厂。
* 约束是:无论是否支持该渠道,都必须返回 sender 实例,不能返回 null。
*/
public interface MessageProviderFactory {
/**
@@ -19,10 +19,10 @@ public interface MessageProviderFactory {
/**
* 创建邮件发送器
*/
EmailSender createEmailSender(ChannelProviderConfigDO config,Sender sender);
EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender);
/**
* 创建APP推送发送器
*/
AppPushSender createAppPushSender(ChannelProviderConfigDO config,Sender sender);
AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender);
}

View File

@@ -1,26 +1,28 @@
package com.njcn.msgpush.module.push.client.factory.impl;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.ProviderFactorySupport;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.AliyunEmailSender;
import com.njcn.msgpush.module.push.client.sender.impl.AliyunSmsSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultAppPushSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultEmailSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultSmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.email.AliyunEmailSender;
import com.njcn.msgpush.module.push.client.sender.impl.sms.AliyunSmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import lombok.extern.slf4j.Slf4j;
/**
* @author caozehui
* @data 2026-02-11
*/
@Slf4j
public class AliyunProviderFactory implements MessageProviderFactory {
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret())) {
return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整");
}
AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret())
@@ -30,21 +32,24 @@ public class AliyunProviderFactory implements MessageProviderFactory {
return new AliyunSmsSender(aliYunSmsSetting, sender);
}
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder()
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret())) {
return new FixedResultEmailSender(SendOutcome.CONFIG_INVALID, "当前服务商邮件渠道配置不完整");
}
AliYunMailSetting aliYunMailSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret())
.regionId("cn-hangzhou")
.endpoint("dm.aliyuncs.com")
.build();
return new AliyunEmailSender(aliYunSmsSetting, sender);
return new AliyunEmailSender(aliYunMailSetting, sender);
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
log.error("阿里云暂不支持app推送");
return null;
// 当前服务商没有该能力时,返回固定结果 sender避免主流程处理 null。
return new FixedResultAppPushSender(SendOutcome.UNSUPPORTED_CHANNEL, "当前服务商不支持 APP 推送");
}
}

View File

@@ -1,24 +1,27 @@
package com.njcn.msgpush.module.push.client.factory.impl;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.ProviderFactorySupport;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.TelecomSmsSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultAppPushSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultEmailSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultSmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.sms.TelecomSmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import lombok.extern.slf4j.Slf4j;
/**
* @author caozehui
* @data 2026-02-11
*/
@Slf4j
public class TelecomProviderFactory implements MessageProviderFactory {
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret(), config.getApiUrl())) {
return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整");
}
TelecomSmsSetting telecomSmsSetting = TelecomSmsSetting.builder()
.account(config.getAppKey())
.password(config.getAppSecret())
@@ -31,13 +34,13 @@ public class TelecomProviderFactory implements MessageProviderFactory {
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
log.error("电信暂不支持email推送");
return null;
// 当前服务商没有该能力时,返回固定结果 sender避免主流程处理 null。
return new FixedResultEmailSender(SendOutcome.UNSUPPORTED_CHANNEL, "当前服务商不支持邮件发送");
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
log.error("电信暂不支持app推送");
return null;
// 当前服务商没有该能力时,返回固定结果 sender避免主流程处理 null。
return new FixedResultAppPushSender(SendOutcome.UNSUPPORTED_CHANNEL, "当前服务商不支持 APP 推送");
}
}

View File

@@ -1,42 +1,54 @@
package com.njcn.msgpush.module.push.client.factory.impl;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.ProviderFactorySupport;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.UniPushAppPushSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultAppPushSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultEmailSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultSmsSender;
import com.njcn.msgpush.module.push.client.sender.impl.apppush.UniPushAppPushSender;
import com.njcn.msgpush.module.push.client.setting.impl.UniPushAppPushSetting;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import lombok.extern.slf4j.Slf4j;
/**
* @author caozehui
* @data 2026-02-11
*/
@Slf4j
public class UniPushProviderFactory implements MessageProviderFactory {
private static final String APP_ID = "appId";
private static final String MASTER_SECRET = "masterSecret";
@Override
public SmsSender createSmsSender(ChannelProviderConfigDO config,Sender sender) {
log.error("uniPush暂不支持短信推送");
return null;
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
// 当前服务商没有该能力时,返回固定结果 sender避免主流程处理 null。
return new FixedResultSmsSender(SendOutcome.UNSUPPORTED_CHANNEL, "当前服务商不支持短信发送");
}
@Override
public EmailSender createEmailSender(ChannelProviderConfigDO config,Sender sender) {
log.error("uniPush暂不支持email推送");
return null;
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
// 当前服务商没有该能力时,返回固定结果 sender避免主流程处理 null。
return new FixedResultEmailSender(SendOutcome.UNSUPPORTED_CHANNEL, "当前服务商不支持邮件发送");
}
@Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config,Sender sender) {
UniPushAppPushSetting uniPushAppPushSetting = UniPushAppPushSetting.builder()
.appId("")
.appKey(config.getAppKey())
.uniAppSecret(config.getAppSecret())
.masterSecret("")
.build();
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
JSONObject extraConfig = StrUtil.isBlank(config.getExtraConfig()) ? new JSONObject() : JSON.parseObject(config.getExtraConfig());
String appId = extraConfig.getString(APP_ID);
String masterSecret = extraConfig.getString(MASTER_SECRET);
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(appId, config.getAppKey(), config.getAppSecret(), masterSecret)) {
return new FixedResultAppPushSender(SendOutcome.CONFIG_INVALID, "当前服务商 APP 推送渠道配置不完整");
}
UniPushAppPushSetting uniPushAppPushSetting = new UniPushAppPushSetting(
appId,
config.getAppKey(),
config.getAppSecret(),
masterSecret
);
return new UniPushAppPushSender(uniPushAppPushSetting, sender);
}
}

View File

@@ -3,14 +3,13 @@ package com.njcn.msgpush.module.push.client.sender;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
/**
* @author caozehui
* @data 2026-02-11
* APP 推送发送器。
* 统一返回 SendResult由主流程决定状态流转和是否重试。
*/
public interface AppPushSender {
/**
* @param message 消息
* @return 发送结果
* 发送 APP 推送。
*/
boolean appPush(MessageRecordDO message);
SendResult appPush(MessageRecordDO message);
}

View File

@@ -5,16 +5,13 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-11
* 邮件发送器。
* 统一返回 SendResult由主流程决定状态流转和是否重试。
*/
public interface EmailSender {
/**
* 发送单条邮件
*
* @param message 消息
* @param params 参数
* @return 发送结果
* 发送邮件
*/
boolean sendEmail(MessageRecordDO message, Map<String, Object> params);
SendResult sendEmail(MessageRecordDO message, Map<String, Object> params);
}

View File

@@ -1,5 +1,10 @@
package com.njcn.msgpush.module.push.client.sender;
import cn.hutool.core.util.StrUtil;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
@@ -9,15 +14,12 @@ import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author caozehui
* @data 2026-03-02
*/
@Component
public class Sender {
@@ -51,4 +53,118 @@ public class Sender {
public final ScheduledExecutorService MSG_CALLBACK_THREAD_POOL_SCHEDULER = Executors.newScheduledThreadPool(5);
/**
* 将服务商原始错误码映射为平台统一错误结果。
* 这里不直接修改消息状态,只负责产出统一 SendResult。
*/
public SendResult buildFailureResult(MessageRecordDO message,
String providerRawCode,
String providerRawMessage,
String defaultErrorCode,
String defaultMessage,
boolean retryableFallback) {
ProviderErrorCodeMappingDO mappingDO = null;
if (StrUtil.isNotBlank(providerRawCode)) {
mappingDO = this.providerErrorCodeMappingService.getByProviderErrorCode(
message.getProviderType(), message.getChannel(), providerRawCode);
}
boolean retryable = retryableFallback;
String errorCode = StrUtil.blankToDefault(defaultErrorCode, providerRawCode);
String resultMessage = StrUtil.blankToDefault(providerRawMessage, defaultMessage);
if (mappingDO != null) {
retryable = mappingDO.getShouldRetry() != null && mappingDO.getShouldRetry() == 1;
errorCode = StrUtil.blankToDefault(mappingDO.getUnifiedErrorCode(), errorCode);
resultMessage = StrUtil.blankToDefault(
mappingDO.getUnifiedErrorMessage(),
StrUtil.blankToDefault(mappingDO.getOriginalMessage(), resultMessage)
);
}
if (StrUtil.isBlank(errorCode)) {
errorCode = "PROVIDER_ERROR";
}
if (StrUtil.isBlank(resultMessage)) {
resultMessage = StrUtil.blankToDefault(defaultMessage, MsgPushConstant.ERROR_MSG_UNKNOWN);
}
return SendResult.builder()
.outcome(retryable ? SendOutcome.RETRYABLE_FAILED : SendOutcome.FAILED)
.errorCode(errorCode)
.message(resultMessage)
.providerRawCode(providerRawCode)
.retryable(retryable)
.build();
}
public SendResult buildTimeoutResult() {
return SendResult.builder()
.outcome(SendOutcome.RETRYABLE_FAILED)
.errorCode("THIRD_PARTY_TIMEOUT")
.message("第三方服务调用超时")
.retryable(true)
.build();
}
public SendResult buildCallFailedResult(String message) {
return SendResult.builder()
.outcome(SendOutcome.RETRYABLE_FAILED)
.errorCode("THIRD_PARTY_CALL_FAILED")
.message(message)
.retryable(true)
.build();
}
/**
* 异步回执场景下,沿用和主流程一致的状态落库方式。
* 这样可以保证同步发送结果和异步回执结果使用同一套状态语义。
*/
public void applyCallbackResult(MessageRecordDO message, SendResult sendResult) {
String status = this.mapOutcomeToStatus(sendResult.getOutcome());
message.setStatus(status);
message.setErrorCode(sendResult.getErrorCode());
message.setErrorMsg(sendResult.getMessage());
if (SendOutcome.SUCCESS.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
this.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
message.setNextRetryTime(null);
} else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else if (SendOutcome.FAILED.equals(sendResult.getOutcome())) {
this.messageRetryQueueService.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
this.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
}
this.messageRecordMapper.updateStatusAndErrorInfo(
message.getMessageId(),
message.getStatus(),
message.getErrorCode(),
message.getErrorMsg(),
message.getLastRetryTime(),
message.getNextRetryTime()
);
this.messageRetryHistoryService.updateStatusAndErrorInfo(
message.getMessageId(),
message.getStatus(),
message.getErrorCode(),
message.getErrorMsg()
);
}
private String mapOutcomeToStatus(SendOutcome outcome) {
if (outcome == null) {
return MsgStatusConstant.FAILED;
}
return switch (outcome) {
case SUCCESS -> MsgStatusConstant.SUCCESS;
case ACCEPTED -> MsgStatusConstant.SENDING;
case FAILED -> MsgStatusConstant.FAILED;
case RETRYABLE_FAILED -> MsgStatusConstant.RETRYABLE_FAILED;
case UNSUPPORTED_CHANNEL -> MsgStatusConstant.UNSUPPORTED_CHANNEL;
case CONFIG_INVALID -> MsgStatusConstant.CONFIG_INVALID;
};
}
}

View File

@@ -5,30 +5,23 @@ import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.List;
/**
* @author caozehui
* @data 2026-02-11
* 短信发送器。
* 统一返回 SendResult由主流程决定状态流转和是否重试。
*/
public interface SmsSender {
/**
* 向多个手机号发送单条短信
*
* @param message 消息
* @return 发送结果
*/
boolean sendSms(MessageRecordDO message);
/**
* 向多个手机号发送短信,支持发送不同签名、不同的模板、不同模板变量的短信
*
* @param messageList 消息集合
* @return 发送结果
* 发送单条短信
*/
boolean sendBatchSms(List<MessageRecordDO> messageList);
SendResult sendSms(MessageRecordDO message);
/**
* 查询模板信息
*
* @param templateIdentifier 模板唯一标识符例如模板id、模板code
* 批量发送短信。
*/
List<SendResult> sendBatchSms(List<MessageRecordDO> messageList);
/**
* 查询短信模板。
*/
void queryTemplate(String templateIdentifier);
}

View File

@@ -1,131 +0,0 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dm20151123.Client;
import com.aliyun.dm20151123.models.SingleSendMailRequest;
import com.aliyun.dm20151123.models.SingleSendMailResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString;
/**
* @author caozehui
* @data 2026-02-11
*/
@Slf4j
public class AliyunEmailSender implements EmailSender {
private Sender sender;
public static final String ACCOUNT_NAME = "accountName";
public static final String REPLY_TO_ADDRESS = "replyToAddress";
public static final String FROM_ALIAS = "fromAlias";
public static final String SUBJECT = "subject";
public static final String HTML_BODY = "htmlBody";
public static final String TEXT_BODY = "textBody";
private Client emailClient;
public AliyunEmailSender(AliYunMailSetting aliYunMailSetting, Sender sender) {
this.sender = sender;
if (ObjectUtil.isNotNull(aliYunMailSetting)) {
Config config = new Config()
.setAccessKeyId(aliYunMailSetting.getAccessKeyId())
.setAccessKeySecret(aliYunMailSetting.getAccessKeySecret())
.setRegionId(aliYunMailSetting.getRegionId())
.setEndpoint(aliYunMailSetting.getEndpoint());
try {
this.emailClient = new com.aliyun.dm20151123.Client(config);
} catch (Exception e) {
log.error("阿里云-邮件服务初始化失败,请检查配置信息");
throw new RuntimeException(e);
}
}
}
@Override
public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MsgStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true;
String extraInfo = message.getExtraInfo();
JSONObject jsonObject = JSON.parseObject(extraInfo);
SingleSendMailRequest request = new SingleSendMailRequest()
.setAccountName(jsonObject.get(ACCOUNT_NAME).toString())
.setAddressType(1)
.setReplyToAddress((boolean) jsonObject.get(REPLY_TO_ADDRESS))
.setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。
.setSubject(message.getTitle())
.setHtmlBody(message.getContent()) //HtmlBody 和 TextBody 是针对不同类型的邮件
.setTextBody("")
.setFromAlias(jsonObject.get(FROM_ALIAS).toString());
try {
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions);
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Boolean b = null;
try {
b = future.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
return b;
}
}

View File

@@ -1,181 +0,0 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import com.aliyun.dysmsapi20170525.Client;
import com.aliyun.dysmsapi20170525.models.QuerySendDetailsRequest;
import com.aliyun.dysmsapi20170525.models.QuerySendDetailsResponse;
import com.aliyun.dysmsapi20170525.models.SendSmsRequest;
import com.aliyun.dysmsapi20170525.models.SendSmsResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString;
/**
* @author caozehui
* @data 2026-02-11
*/
@Slf4j
public class AliyunSmsSender implements SmsSender {
public static final String OK = "OK";
private Sender sender;
private Client smsClient;
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
this.sender = sender;
if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
Config config = new Config()
.setAccessKeyId(aliYunSmsSetting.getAccessKeyId())
.setAccessKeySecret(aliYunSmsSetting.getAccessKeySecret())
.setRegionId(aliYunSmsSetting.getRegionId())
.setEndpoint(aliYunSmsSetting.getEndpoint());
try {
this.smsClient = new Client(config);
} catch (Exception e) {
log.error("阿里云-短信服务初始化失败,请检查配置信息");
throw new RuntimeException(e);
}
}
}
@Override
public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MsgStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
// 设置自动重试默认是不开启的。重试次数默认是3次
runtimeOptions.autoretry = true;
SendSmsRequest request = new SendSmsRequest()
.setPhoneNumbers(message.getReceiver()) //手机号码之间以半角逗号(,)分隔。上限为 1000 个手机号码。
.setSignName(message.getTitle())
.setTemplateCode(message.getTemplateCode())
.setTemplateParam(message.getTemplateParams());
try {
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
// 更新上次重试时间
this.sender.messageRetryQueueService.updateLastRetryTime(message.getMessageId(), now);
SendSmsResponse response = this.smsClient.sendSmsWithOptions(request, runtimeOptions);
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MsgStatusConstant.SUCCESS);
this.getDownInfo(response.body.bizId, message);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
} catch (Exception e) {
log.error("阿里云-短信服务发送失败");
throw new RuntimeException(e);
}
});
Boolean b = null;
try {
b = future.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
return b;
}
@Override
public boolean sendBatchSms(List<MessageRecordDO> messageList) {
boolean res = true;
for (MessageRecordDO message : messageList) {
res &= this.sendSms(message);
}
return res;
}
@Override
public void queryTemplate(String templateIdentifier) {
}
/**
* 获取下行信息
*
* @param bizId
* @param message
*/
private void getDownInfo(String bizId, MessageRecordDO message) {
this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> {
QuerySendDetailsRequest request = new QuerySendDetailsRequest()
.setPhoneNumber(message.getReceiver())
.setBizId(bizId)
.setSendDate(message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")))
.setCurrentPage(1L)
.setPageSize(10L);
try {
QuerySendDetailsResponse response = this.smsClient.querySendDetails(request);
System.out.println(toJSONString(response));
// if (response.statusCode != HttpStatus.OK.value()) {
response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> {
if (!"DELIVERED".equals(detail.errCode)) {
message.setStatus(MsgStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode);
message.setErrorCode(detail.errCode);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else {
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}
});
// }
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 20, TimeUnit.SECONDS);
}
}

View File

@@ -1,315 +0,0 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.Data;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.alibaba.fastjson.JSON.toJSON;
/**
* @author caozehui
* @data 2026-02-11
*/
@Data
public class TelecomSmsSender implements SmsSender {
/**
* 短信接口内容类型
*/
private static final String CONTENT_TYPE = "application/json;charset=utf-8";
private TelecomSmsSetting telecomSmsSetting;
private Sender sender;
@Data
private static class TelecomSmsSendResponse {
private String status;
private Double balance;
private List<TelecomSmsSendDetailRes> list;
}
@Data
private static class TelecomSmsSendDetailRes {
//消息ID用于状态报告匹配
private String mid;
//手机号码
private String mobile;
//短信提交错误代码
private Integer result;
}
@Data
private static class TelecomSmsSelectResponse {
private String status;
private Double balance;
private List<TelecomSmsSelectDetailRes> list;
}
@Data
private static class TelecomSmsSelectDetailRes {
private String apmid;
private String apSubmitTime;
private String mobile;
private Integer status;
private String stat;
private String deliverTime;
}
public TelecomSmsSender(TelecomSmsSetting telecomSmsSetting, Sender sender) {
this.telecomSmsSetting = telecomSmsSetting;
this.sender = sender;
}
@Override
public boolean sendSms(MessageRecordDO message) {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MsgStatusConstant.SENDING);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
boolean isTemplateSend = StrUtil.isNotBlank(message.getTemplateCode());
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("extno", telecomSmsSetting.getExtno());
if (isTemplateSend) {
request.put("action", "templatep2p");
Map<String, Object> templateJsonMap = new HashMap<>();
templateJsonMap.put("templateID", message.getTemplateCode());
JSONObject jsonObject = JSON.parseObject(message.getTemplateParams());
Map<String, String> variable = new HashMap<>();
variable.put("mobile", message.getReceiver());
jsonObject.forEach((key, value) -> variable.put(key, value.toString()));
templateJsonMap.put("variable", "[" + toJSON(variable) + "]");
request.put("templateJson", toJSON(templateJsonMap));
} else {
request.put("action", "send");
request.put("mobile", message.getReceiver());
request.put("content", message.getContent());
}
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
// 更新上次重试时间
this.sender.messageRetryQueueService.updateLastRetryTime(message.getMessageId(), now);
// this.setMessageIdToRedis(message);
// 发送请求
ResponseEntity<String> response1 = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response1.getBody(), TelecomSmsSendResponse.class);
if (response1.getStatusCode() == HttpStatus.OK) {
String mid = telecomSmsSendResponse.list.get(0).mid;
// 定时任务,指定时间间隔后获取下行信息
this.getDownInfo(mid, message);
return true;
} else {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
});
Boolean b = null;
try {
b = future.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
return b;
}
@Override
public boolean sendBatchSms(List<MessageRecordDO> messageList) {
// Map<String, Object> request = new HashMap<>();
// request.put("action", "p2p");
// request.put("account", telecomSmsSetting.getAccount());
// request.put("password", telecomSmsSetting.getPassword());
// Map<String, String> mobileContentKvp = new HashMap<>();
// for (MessageRecordDO message : messageList) {
// mobileContentKvp.put(message.getReceiver(), "【" + message.getTitle() + "】" + message.getContent());
// message.setStatus(MessageStatusConstant.SENDING);
// }
// request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp));
// request.put("extno", telecomSmsSetting.getExtno());
//
// // 设置请求头
// HttpHeaders headers = new HttpHeaders();
// headers.set("Content-Type", CONTENT_TYPE);
//
// LocalDateTime now = LocalDateTime.now();
// long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
//
// // 发送请求
// ResponseEntity<String> response = this.sender.restTemplateUtil.post(
// telecomSmsSetting.getApiUrl(),
// request,
// headers,
// String.class
// );
// LocalDateTime end = LocalDateTime.now();
// for (MessageRecordDO message : messageList) {
// message.setSendTime(now);
// message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
// }
// TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class);
// boolean res = true;
// for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) {
// res &= telecomSmsSendDetailRes.result == 0;
//
// // 定时任务,指定时间间隔后获取下行信息
// //this.getDownInfo(telecomSmsSendDetailRes.getMid(), message);
// }
// return res;
boolean res = true;
for (MessageRecordDO message : messageList) {
res &= this.sendSms(message);
}
return res;
}
@Override
public void queryTemplate(String templateIdentifier) {
Map<String, Object> request = new HashMap<>();
request.put("action", "templateSelect");
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("templateJson", StrUtil.isBlank(templateIdentifier) ? 0 : templateIdentifier);
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
System.out.println(toJSON(response));
}
/**
* 获取下行信息
*
* @param mid
* @param message
*/
private void getDownInfo(String mid, MessageRecordDO message) {
this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> {
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "select");
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("date", message.getSendTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
request.put("condition", "APMID");
request.put("valueList", mid);
// request.put("condition", "MOBILE");
// request.put("valueList", message.getReceiver());
request.remove("mobile");
request.remove("content");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
System.out.println(JSON.toJSONString(JSON.toJSONString(response)));
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
if (telecomSmsSelectDetailRes.getStatus() == 5) {
message.setStatus(MsgStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
}
if (telecomSmsSelectDetailRes.getStatus() == 4) {
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}
}, 20, TimeUnit.SECONDS);
}
/**
* 从redis中判断是否可以发送消息
*
* @param message
* @return 将消息ID缓存到redis中
* @param message
* <p>
* 将消息ID缓存到redis中
* @param message
*/
// private boolean canSend(MessageRecordDO message) {
// String cachedData = this.sender.redisTemplate.opsForValue().get(message.getMessageId());
// return ObjectUtil.isNull(cachedData);
// }
/**
* 将消息ID缓存到redis中
*
* @param message
*/
// private void setMessageIdToRedis(MessageRecordDO message) {
// this.sender.redisTemplate.opsForValue().set(message.getMessageId(), message.getMessageId(), 60, TimeUnit.SECONDS);
// }
}

View File

@@ -1,134 +0,0 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import com.getui.push.v2.sdk.ApiHelper;
import com.getui.push.v2.sdk.GtApiConfiguration;
import com.getui.push.v2.sdk.api.PushApi;
import com.getui.push.v2.sdk.common.ApiResult;
import com.getui.push.v2.sdk.dto.req.Audience;
import com.getui.push.v2.sdk.dto.req.Settings;
import com.getui.push.v2.sdk.dto.req.message.PushDTO;
import com.getui.push.v2.sdk.dto.req.message.PushMessage;
import com.getui.push.v2.sdk.dto.req.message.android.GTNotification;
import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.UniPushAppPushSetting;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-26
*/
public class UniPushAppPushSender implements AppPushSender {
// private UniPushAppPushSetting uniPushAppPushSetting;
private Sender sender;
private ApiHelper apiHelper;
private PushApi pushApi;
public UniPushAppPushSender(UniPushAppPushSetting uniPushAppPushSetting, Sender sender) {
// this.uniPushAppPushSetting = uniPushAppPushSetting;
this.sender = sender;
GtApiConfiguration gtApiConfiguration = new GtApiConfiguration();
gtApiConfiguration.setAppId(uniPushAppPushSetting.getAppId());
gtApiConfiguration.setAppKey(uniPushAppPushSetting.getAppKey());
gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret());
gtApiConfiguration.setDomain("https://restapi.getui.com/v2/");
this.apiHelper = ApiHelper.build(gtApiConfiguration);
this.pushApi = apiHelper.creatApi(PushApi.class);
}
@Override
public boolean appPush(MessageRecordDO message) {
PushDTO<Audience> pushDTO = this.buildPushDTO(message.getTitle(), message.getContent());
// 进行cid单推
// 设置接收人信息
Audience audience = new Audience();
audience.addCid(message.getReceiver());
pushDTO.setAudience(audience);
ApiResult<Map<String, Map<String, String>>> apiResult = pushApi.pushToSingleByCid(pushDTO);
return apiResult.isSuccess();
}
/**
* 构建推送消息体
*
* @param title
* @param content
* @return
*/
private PushDTO<Audience> buildPushDTO(String title, String content) {
PushDTO<Audience> pushDTO = new PushDTO<>();
// 设置推送参数
pushDTO.setRequestId(System.currentTimeMillis() + "");
// pushDTO.setGroupName("hyqx-group");
//配置推送条件
Settings settings = new Settings();
//消息有效期,走厂商消息需要设置该值
settings.setTtl(3600000);
pushDTO.setSettings(settings);
//安卓在线通道走个推推送时的消息体在线通道不支持ios
PushMessage pushMessage = new PushMessage();
//通知消息
GTNotification notification = new GTNotification();
notification.setTitle(title);
notification.setBody(content);
/**
* intent打开应用内特定页面
* url打开网页地址
* payload自定义消息内容启动应用
* payload_custom自定义消息内容不启动应用
* startapp打开应用首页
* none纯通知无后续动作
*/
//进入特定的页面
// notification.setClickType("intent");
// notification.setIntent("intent://com.getui.push/detail?#Intent;scheme=gtpushscheme;launchFlags=0x4000000;package=com.getui.demo;component=com.getui.demo/com.getui.demo.DemoActivity;S.payload=payloadStr;end");
notification.setClickType("startapp");
pushMessage.setNotification(notification);
pushDTO.setPushMessage(pushMessage);
//透传消息 此格式的透传消息由 unipush 做了特殊处理,会自动展示通知栏。开发者也可自定义其它格式,在客户端自己处理。
// pushMessage.setTransmission(" {title:\"个推通道透传消息标题\",content:\"个推通道透传消息内容\",payload:\"自定义数据\"}");
//设置离线推送时的消息体
// PushChannel pushChannel = new PushChannel();
//安卓离线厂商通道推送的消息体
// AndroidDTO androidDTO = new AndroidDTO();
// Ups ups = new Ups();
// //通知消息
// ThirdNotification thirdNotification = new ThirdNotification();
// ups.setNotification(thirdNotification);
// thirdNotification.setTitle(title + "安卓离线厂商通道通知消息标题");
// thirdNotification.setBody(content + "安卓离线厂商通道通知消息内容");
// thirdNotification.setClickType("intent");
// thirdNotification.setIntent("intent://com.getui.push/detail?#Intent;scheme=gtpushscheme;launchFlags=0x4000000;package=com.getui.demo;component=com.getui.demo/com.getui.demo.DemoActivity;S.payload=payloadStr;end");
// //透传消息
//// ups.setTransmission(" {title:\"安卓离线厂商通道透传消息标题\",content:\"安卓离线厂商通道透传消息内容\",payload:\"自定义数据\"}");
//
// androidDTO.setUps(ups);
// pushChannel.setAndroid(androidDTO);
// //ios离线apn通道推送的消息体
// Alert alert = new Alert();
// alert.setTitle(title+"苹果离线通知栏标题");
// alert.setBody(content+"苹果离线通知栏内容");
// Aps aps = new Aps();
// aps.setContentAvailable(0);//0表示普通通知消息(默认为0);1表示静默推送(无通知栏消息)静默推送时不需要填写其他参数。苹果建议1小时最多推送3条静默消息
// aps.setSound("default");//自定义铃声:系统铃声设置为default; 无声设置为com.gexin.ios.silence或不填
// aps.setAlert(alert);
// IosDTO iosDTO = new IosDTO();
// iosDTO.setAps(aps);
// iosDTO.setType("notify");
// pushChannel.setIos(iosDTO);
// pushDTO.setPushChannel(pushChannel);
return pushDTO;
}
}

View File

@@ -1,7 +1,8 @@
package com.njcn.msgpush.module.push.client.setting.impl;
import com.njcn.msgpush.module.push.client.setting.AppPushSetting;
import lombok.*;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author caozehui
@@ -9,11 +10,17 @@ import lombok.*;
* @description UniPush应用推送配置
*/
@Data
@Builder
@EqualsAndHashCode(callSuper = true)
public class UniPushAppPushSetting extends AppPushSetting {
private String appId;
private String appKey;
private String uniAppSecret;
private String masterSecret;
public UniPushAppPushSetting(String appId, String appKey, String uniAppSecret, String masterSecret) {
this.appId = appId;
this.appKey = appKey;
this.uniAppSecret = uniAppSecret;
this.masterSecret = masterSecret;
}
}

View File

@@ -1,27 +1,16 @@
package com.njcn.msgpush.module.push.constant;
/**
* @author caozehui
* @data 2026-02-26
*/
public class MsgStatusConstant {
//待发送
public static final String PENDING = "pending";
//发送中
public static final String SENDING = "sending";
//发送成功
public static final String SUCCESS = "success";
//发送失败(可重试)
public static final String FAILED = "failed";
//最终失败(超过最大重试次数,系统自动判定)
public static final String RETRYABLE_FAILED = "retryable_failed";
public static final String FINALFAILED = "final_failed";
//黑名单拦截
public static final String BLACKLISTED = "blacklisted";
//配额超限
public static final String QUOTAEXCEEDED = "quota_exceeded";
//频率限制
public static final String RATE_LIMITED = "rate_limited";
//手动终止重试(运维在重试队列管理中主动终止区别于系统自动判定的final_failed)
public static final String UNSUPPORTED_CHANNEL = "unsupported_channel";
public static final String CONFIG_INVALID = "config_invalid";
public static final String ABANDONED = "abandoned";
}

View File

@@ -10,6 +10,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.checker.MsgPushGuardChain;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.SendResult;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
@@ -23,13 +25,18 @@ import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, MessageRecordDO> implements MessageRecordService {
@@ -40,6 +47,9 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Autowired
private MessageRetryHistoryService messageRetryHistoryService;
@Autowired
private MessageRetryQueueService messageRetryQueueService;
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
@@ -60,71 +70,154 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Transactional(rollbackFor = Exception.class)
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
});
messageRecordDOList.forEach(messageRecordDO -> messageRecordDO.setStatus(MsgStatusConstant.PENDING));
this.saveBatch(messageRecordDOList);
return this.processSendMsg(messageRecordDOList);
}
/**
* 发送主流程。
* 这里统一接收 sender 返回的 SendResult再决定消息状态、重试队列和服务商健康度如何变化。
*/
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList) {
msgPushGuardChain.checkAll(messageRecordDOList);
List<MessageSendResultVO> resultList = new ArrayList<>();
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
MessageSendResultVO messageSendResultVO = new MessageSendResultVO();
messageSendResultVO.setMessageId(messageRecordDO.getMessageId());
MessageSendResultVO resultVO = new MessageSendResultVO();
resultVO.setMessageId(messageRecordDO.getMessageId());
if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
messageSendResultVO.setResult(false);
if (messageRecordDO.getStatus().equals(MsgStatusConstant.BLACKLISTED)) {
messageSendResultVO.setDetail("黑名单拦截");
}
if (messageRecordDO.getStatus().equals(MsgStatusConstant.QUOTAEXCEEDED)) {
messageSendResultVO.setDetail("配额超限");
}
resultList.add(messageSendResultVO);
resultVO.setResult(false);
resultVO.setDetail(this.buildBlockedDetail(messageRecordDO));
this.updateMessage(messageRecordDO);
resultList.add(resultVO);
continue;
}
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService
.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) {
throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType());
}
boolean sendResult = switch (ChannelTypeEnum.getByCode(messageRecordDO.getChannel())) {
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case EMAIL -> {
Map<String, Object> params = new HashMap<>();
yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params);
}
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
this.applySendResult(messageRecordDO, sendResult);
this.recordRetryHistory(messageRecordDO);
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MsgStatusConstant.SUCCESS);
messageSendResultVO.setResult(true);
} else {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MsgStatusConstant.FAILED);
messageSendResultVO.setResult(false);
messageSendResultVO.setDetail("调用第三方服务发送消息失败");
}
resultList.add(messageSendResultVO);
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
// 更新配额
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName());
rateLimitRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
resultVO.setDetail(this.buildResultDetail(sendResult));
resultList.add(resultVO);
}
return resultList;
}
private SendResult sendMessage(MessageRecordDO messageRecordDO,
ChannelProviderConfigDO channelProviderConfigDO,
MessageProviderFactory messageProviderFactory) {
if (ObjectUtil.isNull(messageProviderFactory)) {
return SendResult.configInvalid("当前服务商未启用或未注册");
}
if (ObjectUtil.isNull(channelProviderConfigDO)) {
return SendResult.configInvalid("当前服务商渠道配置不存在");
}
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
if (channelType == null) {
return SendResult.unsupported("当前发送渠道类型暂不支持");
}
return switch (channelType) {
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case EMAIL -> messageProviderFactory.createEmailSender(channelProviderConfigDO, sender)
.sendEmail(messageRecordDO, new HashMap<>());
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
};
}
/**
* sender 只负责调用第三方并返回统一结果;
* 主流程在这里统一做状态落库、重试编排和健康度更新。
*/
private void applySendResult(MessageRecordDO messageRecordDO, SendResult sendResult) {
messageRecordDO.setStatus(this.mapOutcomeToStatus(sendResult.getOutcome()));
messageRecordDO.setErrorCode(sendResult.getErrorCode());
messageRecordDO.setErrorMsg(sendResult.getMessage());
if (sendResult.getSendTime() != null) {
messageRecordDO.setSendTime(sendResult.getSendTime());
}
if (sendResult.getCostTime() != null) {
messageRecordDO.setCostTime(sendResult.getCostTime());
}
if (StrUtil.isNotBlank(sendResult.getThirdPartyId())) {
messageRecordDO.setThirdPartyId(sendResult.getThirdPartyId());
}
// 成功或已受理都视为本次调用已正常投递,更新成功健康度并消耗配额。
if (SendOutcome.SUCCESS.equals(sendResult.getOutcome()) || SendOutcome.ACCEPTED.equals(sendResult.getOutcome())) {
messageRecordDO.setErrorCode(null);
messageRecordDO.setErrorMsg(null);
messageRecordDO.setNextRetryTime(null);
channelProviderConfigService.successUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName());
rateLimitRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
// 可重试失败统一进入重试队列,避免 sender 内部各自处理重试逻辑。
} else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) {
messageRetryQueueService.saveOrUpdateRetryMessage(messageRecordDO);
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
} else if (SendOutcome.FAILED.equals(sendResult.getOutcome())) {
messageRecordDO.setNextRetryTime(null);
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
} else {
// 不支持能力和配置异常都不自动重试。
messageRecordDO.setNextRetryTime(null);
}
this.updateMessage(messageRecordDO);
}
private void recordRetryHistory(MessageRecordDO messageRecordDO) {
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
messageRetryHistoryDO.setStatus(messageRecordDO.getStatus());
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
}
private boolean isSuccessOutcome(SendOutcome outcome) {
return SendOutcome.SUCCESS.equals(outcome) || SendOutcome.ACCEPTED.equals(outcome);
}
private String buildBlockedDetail(MessageRecordDO messageRecordDO) {
if (MsgStatusConstant.BLACKLISTED.equals(messageRecordDO.getStatus())) {
return "黑名单拦截";
}
if (MsgStatusConstant.QUOTAEXCEEDED.equals(messageRecordDO.getStatus())) {
return "配额超限";
}
if (MsgStatusConstant.RATE_LIMITED.equals(messageRecordDO.getStatus())) {
return "限流拦截";
}
return StrUtil.blankToDefault(messageRecordDO.getErrorMsg(), "消息状态不允许发送");
}
private String buildResultDetail(SendResult sendResult) {
if (StrUtil.isNotBlank(sendResult.getMessage())) {
return sendResult.getMessage();
}
return sendResult.getOutcome() == null ? "发送失败" : sendResult.getOutcome().getDesc();
}
private String mapOutcomeToStatus(SendOutcome outcome) {
if (outcome == null) {
return MsgStatusConstant.FAILED;
}
return switch (outcome) {
case SUCCESS -> MsgStatusConstant.SUCCESS;
case ACCEPTED -> MsgStatusConstant.SENDING;
case FAILED -> MsgStatusConstant.FAILED;
case RETRYABLE_FAILED -> MsgStatusConstant.RETRYABLE_FAILED;
case UNSUPPORTED_CHANNEL -> MsgStatusConstant.UNSUPPORTED_CHANNEL;
case CONFIG_INVALID -> MsgStatusConstant.CONFIG_INVALID;
};
}
@Override
public boolean updateMessage(MessageRecordDO messageRecordDO) {

View File

@@ -14,7 +14,6 @@ import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,97 +33,68 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Autowired
private MessageRecordService messageRecordService;
@Autowired
public ChannelProviderConfigService channelProviderConfigService;
@Autowired
public RetryStrategyConfigService retryStrategyConfigService;
// public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
// 3,
// 3,
// 1000,
// TimeUnit.MILLISECONDS,
// new java.util.concurrent.ArrayBlockingQueue<>(1000),
// r -> {
// Thread thread = new Thread(r);
// thread.setName("msgRetryThreadPool-" + thread.getId());
// thread.setDaemon(false);
// return thread;
// },
// new ThreadPoolExecutor.CallerRunsPolicy()
// );
/**
* 默认每次处理的消息数量
*/
private static final int DEFAULT_BATCH_SIZE = 100;
/**
* 默认重试间隔(秒)
*/
private static final int DEFAULT_RETRY_INTERVAL = 300; // 5分钟
/**
* 默认最大重试次数
*/
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
@Override
@Transactional(rollbackFor = Exception.class)
public void saveOrUpdateRetryMessage(MessageRecordDO message) {
// 检查是否已存在重试记录
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, message.getMessageId())
);
message.setLastRetryTime(message.getSendTime());
// 消息不在重试队列中
if (ObjectUtil.isNull(existing)) {
// 创建重试记录
MessageRetryQueueDO retryRecord = new MessageRetryQueueDO();
retryRecord.setMessageId(message.getMessageId());
retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(1);
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel());
if (ObjectUtil.isNull(strategyConfig)) {
retryRecord.setMaxRetry(DEFAULT_MAX_RETRY_COUNT);
} else {
retryRecord.setMaxRetry(strategyConfig.getMaxRetryCount());
}
retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount());
retryRecord.setFirstFailTime(message.getSendTime());
retryRecord.setLastRetryTime(message.getSendTime());
retryRecord.setLastErrorMsg(message.getErrorMsg());
// 计算下次重试时间
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount());
retryRecord.setNextRetryTime(nextRetryTime);
message.setNextRetryTime(nextRetryTime);
this.save(retryRecord);
} else {
existing.setRetryCount(existing.getRetryCount() + 1);
// 计算下次重试时间
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), existing.getRetryCount());
existing.setNextRetryTime(nextRetryTime);
message.setNextRetryTime(nextRetryTime);
this.updateById(existing);
messageRetryRedisDAO.addToRetryQueue(message);
return;
}
// 同步到Redis
int newRetryCount = existing.getRetryCount() + 1;
existing.setRetryCount(newRetryCount);
existing.setLastRetryTime(message.getSendTime());
existing.setLastErrorMsg(message.getErrorMsg());
if (newRetryCount >= existing.getMaxRetry()) {
message.setStatus(MsgStatusConstant.FINALFAILED);
message.setNextRetryTime(null);
existing.setNextRetryTime(null);
this.updateById(existing);
this.deleteByMessageIds(Collections.singletonList(message.getMessageId()));
messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getMessageId());
return;
}
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), newRetryCount);
existing.setNextRetryTime(nextRetryTime);
message.setNextRetryTime(nextRetryTime);
this.updateById(existing);
messageRetryRedisDAO.addToRetryQueue(message);
}
/**
* 计算下次重试时间
*
* @param channel
* @param retryCount
* @return
*/
private LocalDateTime calculateNextRetryTime(String channel, int retryCount) {
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(channel);
long plusSeconds = 0;
if (ObjectUtil.isNull(strategyConfig)) {
// 默认策略
switch (ChannelTypeEnum.getByCode(channel)) {
case SMS: {
case SMS -> {
if (retryCount == 1) {
plusSeconds = 60 * 5;
} else if (retryCount == 2) {
@@ -133,10 +103,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
plusSeconds = 60 * 30;
}
}
break;
case EMAIL: {
case EMAIL -> {
if (retryCount == 1) {
// plusSeconds = 60 * 10;
plusSeconds = 60 * 2;
} else if (retryCount == 2) {
plusSeconds = 60 * 30;
@@ -148,17 +116,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
plusSeconds = 60 * 60 * 4;
}
}
break;
case APP_PUSH: {
case APP_PUSH -> {
if (retryCount == 1) {
plusSeconds = 60 * 1;
plusSeconds = 60;
} else {
plusSeconds = 60 * 60;
}
}
break;
default:
plusSeconds = 60 * 5;
}
} else {
String retryIntervals = strategyConfig.getRetryIntervals();
@@ -174,20 +138,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override
public void processRetryBatch(String channel) {
// 从数据库查询需要重试的消息
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
// 从redis中查询需要重试的消息
long epochMilli = System.currentTimeMillis();
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE);
// 没有需要重试的消息
if (needRetryMessageIds.isEmpty()) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds);
System.out.println("messageRecordDOList.size()=:" + messageRecordDOList.size());
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO);
}
@@ -199,12 +156,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
if (CollUtil.isNotEmpty(messageIds)) {
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
// 立即重试(设置下次重试时间为现在)
//retryRecord.setNextRetryTime(LocalDateTime.now());
//retryQueueMapper.updateById(retryRecord);
// 更新redis中的分数
//messageRetryRedisDAO.addToRetryQueue(retryRecord.getChannel(), messageId, LocalDateTime.now());
processSingleRetry(messageRecordDO);
}
}
@@ -241,57 +192,25 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
.update();
}
/**
* 处理单个消息的重试逻辑
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
MessageSendResultVO messageSendResultVO = messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0);
boolean sendResult = messageSendResultVO.getResult();
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
if (sendResult) {
log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
if (messageSendResultVO.getResult()) {
log.info("处理消息重试成功: messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
} else {
// 重试失败,更新重试信息
log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
handleRetryFailure(messageRecordDO);
return;
}
messageRecordService.updateMessage(messageRecordDO);
}
/**
* 处理重试失败的情况
*/
private void handleRetryFailure(MessageRecordDO messageRecordDO) {
MessageRetryQueueDO messageRetryQueueDO = this.lambdaQuery().eq(MessageRetryQueueDO::getMessageId, messageRecordDO.getMessageId()).one();
int newRetryCount = messageRetryQueueDO.getRetryCount() + 1;
if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) {
messageRecordDO.setStatus(MsgStatusConstant.FINALFAILED);
messageRecordDO.setNextRetryTime(null);
// redis中可以删除
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
} else {
// 还可以继续重试,更新重试信息
LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount);
messageRecordDO.setStatus(MsgStatusConstant.FAILED);
messageRecordDO.setNextRetryTime(nextRetryTime);
this.updateRetryInfo(
messageRecordDO.getMessageId(),
newRetryCount,
nextRetryTime,
LocalDateTime.now(),
"重试失败:" + LocalDateTime.now()
);
// 更新Redis中的下次重试时间
messageRetryRedisDAO.addToRetryQueue(messageRecordDO);
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
log.warn("处理消息重试失败,已保留在重试队列: messageId={}", messageRecordDO.getMessageId());
return;
}
log.error("处理消息重试失败且不再重试: messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
}
}

View File

@@ -7,12 +7,12 @@ spring:
username: # Nacos 账号
password: # Nacos 密码
discovery: # 【配置中心】配置项
namespace: dev # 命名空间。这里使用 dev 开发环境
namespace: 7e15dd2b-2aa2-487d-9e80-1c01c7b9f742 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
metadata:
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
config: # 【注册中心】配置项
namespace: dev # 命名空间。这里使用 dev 开发环境
namespace: 7e15dd2b-2aa2-487d-9e80-1c01c7b9f742 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
#spring:
# cloud:

View File

@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn</groupId>
<artifactId>msgpush</artifactId>
<artifactId>cn-msgpush</artifactId>
<version>${revision}</version>
</parent>
<artifactId>msgpush-module-push</artifactId>