14 Commits

Author SHA1 Message Date
caozehui
c2bbdd865d 系统凭证调整 2026-05-13 09:47:54 +08:00
caozehui
e9f66ca0b2 微调 2026-05-12 16:02:11 +08:00
caozehui
8f86c563f0 微调 2026-05-12 11:22:26 +08:00
caozehui
db15799440 微调 2026-05-12 08:42:07 +08:00
caozehui
c4b56a727c 微调 2026-05-11 15:49:05 +08:00
caozehui
fe26aa8670 微调 2026-05-11 13:57:02 +08:00
caozehui
9343799290 引入阿里云邮件推送 2026-05-11 10:57:40 +08:00
caozehui
9be824777b 开放 生成系统凭证 接口 2026-04-20 19:55:21 +08:00
caozehui
8c2f5a0e17 微调 2026-04-20 11:15:16 +08:00
caozehui
c147ef602f 删除消息重试队列delete字段 2026-04-20 11:13:21 +08:00
caozehui
cb9a75084f 处理Redis认领后,数据库状态还未更新时,触发补招,导致重复补招同一个条记录问题 2026-04-20 08:46:27 +08:00
caozehui
47c00e2bd4 服务提供商启用\禁用逻辑调整,确保只有一个渠道只有一个服务提供商 2026-04-17 16:11:00 +08:00
caozehui
d44f6423e0 微调 2026-04-09 18:31:55 +08:00
caozehui
fc6fc9642b 重试逻辑调整 2026-04-09 13:39:10 +08:00
55 changed files with 1138 additions and 480 deletions

View File

@@ -72,7 +72,7 @@
<!-- Swagger 注解,用于 API 文档生成(@Schema、@Operation 等) --> <!-- Swagger 注解,用于 API 文档生成(@Schema、@Operation 等) -->
<dependency> <dependency>
<groupId>io.swagger.core.v3</groupId> <groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId> <artifactId>swagger-annotations-jakarta</artifactId>
</dependency> </dependency>
<!-- RPC 远程调用相关 --> <!-- RPC 远程调用相关 -->

View File

@@ -1,5 +1,7 @@
package com.njcn.msgpush.framework.common.exception.enums; package com.njcn.msgpush.framework.common.exception.enums;
import com.njcn.msgpush.framework.common.exception.ErrorCode;
/** /**
* 业务异常的错误码区间,解决:解决各模块错误码定义,避免重复,在此只声明不做实际使用 * 业务异常的错误码区间,解决:解决各模块错误码定义,避免重复,在此只声明不做实际使用
* *
@@ -27,7 +29,7 @@ package com.njcn.msgpush.framework.common.exception.enums;
* *
* @author hongawen * @author hongawen
*/ */
public class ServiceErrorCodeRange { public interface ServiceErrorCodeRange {
// 模块 infra 错误码区间 [1-001-000-000 ~ 1-002-000-000) // 模块 infra 错误码区间 [1-001-000-000 ~ 1-002-000-000)
// 模块 system 错误码区间 [1-002-000-000 ~ 1-003-000-000) // 模块 system 错误码区间 [1-002-000-000 ~ 1-003-000-000)
@@ -43,6 +45,7 @@ public class ServiceErrorCodeRange {
// 模块 crm 错误码区间 [1-020-000-000 ~ 1-021-000-000) // 模块 crm 错误码区间 [1-020-000-000 ~ 1-021-000-000)
// 模块 ai 错误码区间 [1-022-000-000 ~ 1-023-000-000) // 模块 push 错误码区间 [1-022-000-000 ~ 1-023-000-000)
ErrorCode VALIDATE_SYSTEM_SECRET_FAIL = new ErrorCode(1-022-000-000, "校验系统密钥失败");
ErrorCode GENERATE_CREDENTIAL_FAIL = new ErrorCode(1-022-000-001, "生成凭证失败");
} }

View File

@@ -93,9 +93,10 @@ public class MsgpushSwaggerAutoConfiguration {
} }
public static GroupedOpenApi buildGroupedOpenApi(String group, String path) { public static GroupedOpenApi buildGroupedOpenApi(String group, String path) {
String pathSuffix = path == null || path.isBlank() ? "" : "/" + path;
return GroupedOpenApi.builder() return GroupedOpenApi.builder()
.group(group) .group(group)
.pathsToMatch("/admin-api/" + path + "/**", "/app-api/" + path + "/**") .pathsToMatch("/admin-api" + pathSuffix + "/**", "/app-api" + pathSuffix + "/**")
.addOperationCustomizer((operation, handlerMethod) -> operation .addOperationCustomizer((operation, handlerMethod) -> operation
.addParametersItem(buildSecurityHeaderParameter())) .addParametersItem(buildSecurityHeaderParameter()))
.addOperationCustomizer(buildOperationIdCustomizer()) .addOperationCustomizer(buildOperationIdCustomizer())

View File

@@ -1,60 +0,0 @@
package com.njcn.msgpush.module.push.client;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.AliyunProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.TelecomProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.UniPushProviderFactory;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.enums.ProviderTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author caozehui
* @data 2026-02-10
*/
@Configuration
public class ClientConfiguration {
@Autowired
private ChannelProviderConfigService channelProviderConfigService;
@Bean
public Map<String, MessageProviderFactory> messageProviderFactoryMap() throws Exception {
List<ChannelProviderConfigDO> activeProviders = channelProviderConfigService.getActiveProviders();
Map<String, MessageProviderFactory> messageProviderFactoryMap = new HashMap<>();
for (ChannelProviderConfigDO config : activeProviders) {
switch (ProviderTypeEnum.getByCode(config.getProviderType())) {
case ALIYUN: {
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(ProviderTypeEnum.ALIYUN.getCode(), new AliyunProviderFactory());
messageProviderFactoryMap.put(ProviderTypeEnum.ALIYUN.getCode(), orDefault);
}
break;
case TELECOM: {
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(ProviderTypeEnum.TELECOM.getCode(), new TelecomProviderFactory());
messageProviderFactoryMap.put(ProviderTypeEnum.TELECOM.getCode(), orDefault);
}
break;
case UNIPUSH: {
MessageProviderFactory orDefault = messageProviderFactoryMap.getOrDefault(ProviderTypeEnum.UNIPUSH.getCode(), new UniPushProviderFactory());
messageProviderFactoryMap.put(ProviderTypeEnum.UNIPUSH.getCode(), orDefault);
}
break;
default:
throw new IllegalArgumentException("" + config.getProviderType() + "服务商暂不支持");
}
}
if (messageProviderFactoryMap.isEmpty()) {
throw new Exception("当前没有激活的渠道服务商!");
}
return messageProviderFactoryMap;
}
}

View File

@@ -0,0 +1,29 @@
package com.njcn.msgpush.module.push.client.factory;
import com.njcn.msgpush.module.push.client.factory.impl.AliyunProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.TelecomProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.UniPushProviderFactory;
import com.njcn.msgpush.module.push.enums.ProviderTypeEnum;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Component
public class MessageProviderFactoryRegistry {
private final Map<String, MessageProviderFactory> factoryMap;
public MessageProviderFactoryRegistry() {
Map<String, MessageProviderFactory> factories = new HashMap<>();
factories.put(ProviderTypeEnum.ALIYUN.getCode(), new AliyunProviderFactory());
factories.put(ProviderTypeEnum.TELECOM.getCode(), new TelecomProviderFactory());
factories.put(ProviderTypeEnum.UNIPUSH.getCode(), new UniPushProviderFactory());
this.factoryMap = Collections.unmodifiableMap(factories);
}
public MessageProviderFactory getFactory(String providerType) {
return factoryMap.get(providerType);
}
}

View File

@@ -20,12 +20,12 @@ public class AliyunProviderFactory implements MessageProviderFactory {
@Override @Override
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) { public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。 // 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret())) { if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getSecret())) {
return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整"); return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整");
} }
AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder() AliYunMailSetting aliYunSmsSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey()) .accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret()) .accessKeySecret(config.getSecret())
.regionId("cn-hangzhou") .regionId("cn-hangzhou")
.endpoint("dysmsapi.aliyuncs.com") .endpoint("dysmsapi.aliyuncs.com")
.build(); .build();
@@ -35,12 +35,12 @@ public class AliyunProviderFactory implements MessageProviderFactory {
@Override @Override
public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) { public EmailSender createEmailSender(ChannelProviderConfigDO config, Sender sender) {
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。 // 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret())) { if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getSecret())) {
return new FixedResultEmailSender(SendOutcome.CONFIG_INVALID, "当前服务商邮件渠道配置不完整"); return new FixedResultEmailSender(SendOutcome.CONFIG_INVALID, "当前服务商邮件渠道配置不完整");
} }
AliYunMailSetting aliYunMailSetting = AliYunMailSetting.builder() AliYunMailSetting aliYunMailSetting = AliYunMailSetting.builder()
.accessKeyId(config.getAppKey()) .accessKeyId(config.getAppKey())
.accessKeySecret(config.getAppSecret()) .accessKeySecret(config.getSecret())
.regionId("cn-hangzhou") .regionId("cn-hangzhou")
.endpoint("dm.aliyuncs.com") .endpoint("dm.aliyuncs.com")
.build(); .build();

View File

@@ -19,12 +19,12 @@ public class TelecomProviderFactory implements MessageProviderFactory {
@Override @Override
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) { public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。 // 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getAppSecret(), config.getApiUrl())) { if (ProviderFactorySupport.hasBlank(config.getAppKey(), config.getSecret(), config.getApiUrl())) {
return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整"); return new FixedResultSmsSender(SendOutcome.CONFIG_INVALID, "当前服务商短信渠道配置不完整");
} }
TelecomSmsSetting telecomSmsSetting = TelecomSmsSetting.builder() TelecomSmsSetting telecomSmsSetting = TelecomSmsSetting.builder()
.account(config.getAppKey()) .account(config.getAppKey())
.password(config.getAppSecret()) .password(config.getSecret())
.apiUrl(config.getApiUrl()) .apiUrl(config.getApiUrl())
.extno(config.getExtno()) .extno(config.getExtno())
.build(); .build();

View File

@@ -1,15 +1,8 @@
package com.njcn.msgpush.module.push.client.factory.impl; package com.njcn.msgpush.module.push.client.factory.impl;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory; import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.ProviderFactorySupport; import com.njcn.msgpush.module.push.client.factory.ProviderFactorySupport;
import com.njcn.msgpush.module.push.client.sender.AppPushSender; import com.njcn.msgpush.module.push.client.sender.*;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultAppPushSender; import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultAppPushSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultEmailSender; import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultEmailSender;
import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultSmsSender; import com.njcn.msgpush.module.push.client.sender.fallback.FixedResultSmsSender;
@@ -19,8 +12,6 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
public class UniPushProviderFactory implements MessageProviderFactory { public class UniPushProviderFactory implements MessageProviderFactory {
private static final String APP_ID = "appId";
private static final String MASTER_SECRET = "masterSecret";
@Override @Override
public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) { public SmsSender createSmsSender(ChannelProviderConfigDO config, Sender sender) {
@@ -36,18 +27,15 @@ public class UniPushProviderFactory implements MessageProviderFactory {
@Override @Override
public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) { public AppPushSender createAppPushSender(ChannelProviderConfigDO config, Sender sender) {
JSONObject extraConfig = StrUtil.isBlank(config.getExtraConfig()) ? new JSONObject() : JSON.parseObject(config.getExtraConfig());
String appId = extraConfig.getString(APP_ID);
String masterSecret = extraConfig.getString(MASTER_SECRET);
// 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。 // 配置缺失时返回固定结果 sender由主流程统一落库为配置异常。
if (ProviderFactorySupport.hasBlank(appId, config.getAppKey(), config.getAppSecret(), masterSecret)) { if (ProviderFactorySupport.hasBlank(config.getSecret())) {
return new FixedResultAppPushSender(SendOutcome.CONFIG_INVALID, "当前服务商 APP 推送渠道配置不完整"); return new FixedResultAppPushSender(SendOutcome.CONFIG_INVALID, "当前服务商 APP 推送渠道配置不完整");
} }
UniPushAppPushSetting uniPushAppPushSetting = new UniPushAppPushSetting( UniPushAppPushSetting uniPushAppPushSetting = new UniPushAppPushSetting(
appId, config.getAppId(),
config.getAppKey(), config.getAppKey(),
config.getAppSecret(), config.getSecret(),
masterSecret config.getApiUrl()
); );
return new UniPushAppPushSender(uniPushAppPushSetting, sender); return new UniPushAppPushSender(uniPushAppPushSetting, sender);
} }

View File

@@ -1,14 +1,22 @@
package com.njcn.msgpush.module.push.client.sender.impl.apppush; package com.njcn.msgpush.module.push.client.sender.impl.apppush;
import cn.hutool.core.util.StrUtil;
import com.getui.push.v2.sdk.ApiHelper; import com.getui.push.v2.sdk.ApiHelper;
import com.getui.push.v2.sdk.GtApiConfiguration; import com.getui.push.v2.sdk.GtApiConfiguration;
import com.getui.push.v2.sdk.api.PushApi; import com.getui.push.v2.sdk.api.PushApi;
import com.getui.push.v2.sdk.common.ApiResult; import com.getui.push.v2.sdk.common.ApiResult;
import com.getui.push.v2.sdk.dto.req.Audience; import com.getui.push.v2.sdk.dto.req.Audience;
import com.getui.push.v2.sdk.dto.req.Settings; import com.getui.push.v2.sdk.dto.req.Settings;
import com.getui.push.v2.sdk.dto.req.message.PushChannel;
import com.getui.push.v2.sdk.dto.req.message.PushDTO; import com.getui.push.v2.sdk.dto.req.message.PushDTO;
import com.getui.push.v2.sdk.dto.req.message.PushMessage; import com.getui.push.v2.sdk.dto.req.message.PushMessage;
import com.getui.push.v2.sdk.dto.req.message.android.AndroidDTO;
import com.getui.push.v2.sdk.dto.req.message.android.GTNotification; import com.getui.push.v2.sdk.dto.req.message.android.GTNotification;
import com.getui.push.v2.sdk.dto.req.message.android.ThirdNotification;
import com.getui.push.v2.sdk.dto.req.message.android.Ups;
import com.getui.push.v2.sdk.dto.req.message.ios.Alert;
import com.getui.push.v2.sdk.dto.req.message.ios.Aps;
import com.getui.push.v2.sdk.dto.req.message.ios.IosDTO;
import com.njcn.msgpush.module.push.client.sender.AppPushSender; import com.njcn.msgpush.module.push.client.sender.AppPushSender;
import com.njcn.msgpush.module.push.client.sender.SendResult; import com.njcn.msgpush.module.push.client.sender.SendResult;
import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.Sender;
@@ -34,7 +42,13 @@ public class UniPushAppPushSender implements AppPushSender {
gtApiConfiguration.setAppId(uniPushAppPushSetting.getAppId()); gtApiConfiguration.setAppId(uniPushAppPushSetting.getAppId());
gtApiConfiguration.setAppKey(uniPushAppPushSetting.getAppKey()); gtApiConfiguration.setAppKey(uniPushAppPushSetting.getAppKey());
gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret()); gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret());
// 使用配置的 API 地址,如果未配置则使用默认地址
String apiUrl = uniPushAppPushSetting.getApiUrl();
if (StrUtil.isNotBlank(apiUrl)) {
gtApiConfiguration.setDomain(apiUrl);
} else {
gtApiConfiguration.setDomain("https://restapi.getui.com/v2/"); gtApiConfiguration.setDomain("https://restapi.getui.com/v2/");
}
ApiHelper apiHelper = ApiHelper.build(gtApiConfiguration); ApiHelper apiHelper = ApiHelper.build(gtApiConfiguration);
this.pushApi = apiHelper.creatApi(PushApi.class); this.pushApi = apiHelper.creatApi(PushApi.class);
} catch (Exception e) { } catch (Exception e) {
@@ -89,13 +103,48 @@ public class UniPushAppPushSender implements AppPushSender {
settings.setTtl(3600000); settings.setTtl(3600000);
pushDTO.setSettings(settings); pushDTO.setSettings(settings);
// 创建 PushMessage设置透传消息可选
PushMessage pushMessage = new PushMessage(); PushMessage pushMessage = new PushMessage();
GTNotification notification = new GTNotification(); pushMessage.setTransmission("{\"title\":\"" + title + "\",\"body\":\"" + content + "\"}");
notification.setTitle(title);
notification.setBody(content); // 在线推送 - 使用 GTNotification
notification.setClickType("startapp"); GTNotification gtNotification = new GTNotification();
pushMessage.setNotification(notification); gtNotification.setTitle(title);
gtNotification.setBody(content);
gtNotification.setClickType("startapp");
pushMessage.setNotification(gtNotification);
pushDTO.setPushMessage(pushMessage); pushDTO.setPushMessage(pushMessage);
// 创建 PushChannel 用于配置各平台的通知
PushChannel pushChannel = new PushChannel();
// Android 离线推送配置 - 使用 ThirdNotification
AndroidDTO androidDTO = new AndroidDTO();
Ups ups = new Ups();
ThirdNotification thirdNotification = new ThirdNotification();
thirdNotification.setTitle(title);
thirdNotification.setBody(content);
thirdNotification.setClickType("startapp");
ups.setNotification(thirdNotification);
androidDTO.setUps(ups);
pushChannel.setAndroid(androidDTO);
// iOS 通知配置
IosDTO iosDTO = new IosDTO();
Aps aps = new Aps();
Alert alert = new Alert();
alert.setTitle(title);
alert.setBody(content);
aps.setAlert(alert);
aps.setSound("default");
aps.setContentAvailable(0);
iosDTO.setAps(aps);
pushChannel.setIos(iosDTO);
// 将 PushChannel 设置到 PushDTO 中
pushDTO.setPushChannel(pushChannel);
return pushDTO; return pushDTO;
} }
} }

View File

@@ -1,11 +1,8 @@
package com.njcn.msgpush.module.push.client.sender.impl.email; package com.njcn.msgpush.module.push.client.sender.impl.email;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dm20151123.Client; import com.aliyun.dm20151123.Client;
import com.aliyun.dm20151123.models.SingleSendMailRequest; import com.aliyun.dm20151123.models.*;
import com.aliyun.dm20151123.models.SingleSendMailResponse;
import com.aliyun.teaopenapi.models.Config; import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions; import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender; import com.njcn.msgpush.module.push.client.sender.EmailSender;
@@ -18,6 +15,8 @@ import org.springframework.http.HttpStatus;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -25,9 +24,10 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class AliyunEmailSender implements EmailSender { public class AliyunEmailSender implements EmailSender {
private static final String ACCOUNT_NAME = "accountName"; private static final String ACCOUNT_NAME = "njcn@shining-electric.cn";
private static final String REPLY_TO_ADDRESS = "replyToAddress"; private static final Integer ADDRESS_TYPE = 1;
private static final String FROM_ALIAS = "fromAlias"; private static final Boolean REPLY_TO_ADDRESS = false;
private static final String FROM_ALIAS = "南京灿能";
private final Sender sender; private final Sender sender;
@@ -60,16 +60,15 @@ public class AliyunEmailSender implements EmailSender {
RuntimeOptions runtimeOptions = new RuntimeOptions(); RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true; runtimeOptions.autoretry = true;
JSONObject jsonObject = JSON.parseObject(message.getExtraInfo());
SingleSendMailRequest request = new SingleSendMailRequest() SingleSendMailRequest request = new SingleSendMailRequest()
.setAccountName(jsonObject.getString(ACCOUNT_NAME)) .setAccountName(ACCOUNT_NAME)
.setAddressType(1) .setAddressType(ADDRESS_TYPE)
.setReplyToAddress(jsonObject.getBooleanValue(REPLY_TO_ADDRESS)) .setReplyToAddress(REPLY_TO_ADDRESS)
.setToAddress(message.getReceiver()) .setToAddress(message.getReceiver())
.setSubject(message.getTitle()) .setSubject(message.getTitle())
.setHtmlBody(message.getContent()) .setHtmlBody(message.getContent())
.setTextBody("") .setTextBody("")
.setFromAlias(jsonObject.getString(FROM_ALIAS)); .setFromAlias(FROM_ALIAS);
try { try {
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
@@ -81,8 +80,11 @@ public class AliyunEmailSender implements EmailSender {
message.setCostTime(costTime); message.setCostTime(costTime);
if (HttpStatus.OK.value() == response.getStatusCode()) { if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setThirdPartyId(response.getBody().getEnvId());
// 电信短信同步返回成功同样只代表已受理,最终状态以后续回执为准。
this.getDownInfo(message);
// 邮件接口同步返回成功时,当前平台直接认定本次发送成功。 // 邮件接口同步返回成功时,当前平台直接认定本次发送成功。
return SendResult.success(now, costTime, null); return SendResult.accepted(now, costTime, response.getBody().getEnvId());
} }
// 邮件服务失败同样统一转换为平台错误码和中文错误信息。 // 邮件服务失败同样统一转换为平台错误码和中文错误信息。
@@ -107,4 +109,50 @@ public class AliyunEmailSender implements EmailSender {
return this.sender.buildTimeoutResult(); return this.sender.buildTimeoutResult();
} }
} }
private void getDownInfo(MessageRecordDO message) {
// 回执查询延后执行,给第三方落库和状态变更留出时间。
this.sender.MSG_CALLBACK_THREAD_POOL_SCHEDULER.schedule(() -> {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
SenderStatisticsDetailByParamRequest request = new SenderStatisticsDetailByParamRequest()
.setToAddress(message.getReceiver())
.setLength(1);
try {
SenderStatisticsDetailByParamResponse detail = this.emailClient.senderStatisticsDetailByParam(request);
if (detail.statusCode == 200) {
List<SenderStatisticsDetailByParamResponseBody.SenderStatisticsDetailByParamResponseBodyDataMailDetail> mailDetailList = detail.body.getData().mailDetail;
if (mailDetailList.size() > 0) {
SenderStatisticsDetailByParamResponseBody.SenderStatisticsDetailByParamResponseBodyDataMailDetail mailDetail = mailDetailList.get(0);
if (mailDetail.getStatus() == 0) {
// 回执确认成功后,复用统一成功落库逻辑。
this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()));
} else {
SendResult failedResult = this.sender.buildFailureResult(
message,
mailDetail.getErrorClassification(),
mailDetail.getErrorClassification(),
"THIRD_PARTY_CALLBACK_FAILED",
"邮件回执返回失败",
false
);
this.sender.applyCallbackResult(message, failedResult);
}
} else {
SendResult failedResult = this.sender.buildFailureResult(
message,
null,
null,
"THIRD_PARTY_CALLBACK_FAILED",
"邮件回执返回失败",
false
);
this.sender.applyCallbackResult(message, failedResult);
}
}
} catch (Exception e) {
log.error("电信短信回执查询失败", e);
}
}, 30, TimeUnit.SECONDS);
}
} }

View File

@@ -200,33 +200,17 @@ public class TelecomSmsSender implements SmsSender {
return; return;
} }
TelecomSmsSelectDetailRes detailRes = selectResponse.getList().get(0); TelecomSmsSelectDetailRes detailRes = selectResponse.getList().get(0);
// if (detailRes.getStatus() == 5) { if (detailRes.getStatus() == 5) {
// SendResult failedResult = this.sender.buildFailureResult(
// message,
// detailRes.getStat(),
// null,
// "THIRD_PARTY_CALLBACK_FAILED",
// "短信回执返回失败",
// false
// );
// this.sender.applyCallbackResult(message, failedResult);
// } else if (detailRes.getStatus() == 4) {
// // 回执确认成功后,复用统一成功落库逻辑。
// this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()));
// }
double random = Math.random();
System.out.println(random + " aaaa");
if (random > 0.5) {
SendResult failedResult = this.sender.buildFailureResult( SendResult failedResult = this.sender.buildFailureResult(
message, message,
detailRes.getStat(), detailRes.getStat(),
null, null,
"THIRD_PARTY_CALLBACK_FAILED", "THIRD_PARTY_CALLBACK_FAILED",
"短信回执返回失败", "短信回执返回失败",
true false
); );
this.sender.applyCallbackResult(message, failedResult); this.sender.applyCallbackResult(message, failedResult);
} else { } else if (detailRes.getStatus() == 4) {
// 回执确认成功后,复用统一成功落库逻辑。 // 回执确认成功后,复用统一成功落库逻辑。
this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId())); this.sender.applyCallbackResult(message, SendResult.success(message.getSendTime(), message.getCostTime(), message.getThirdPartyId()));
} }

View File

@@ -12,15 +12,17 @@ import lombok.EqualsAndHashCode;
@Data @Data
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class UniPushAppPushSetting extends AppPushSetting { public class UniPushAppPushSetting extends AppPushSetting {
// 个推应用ID
private String appId; private String appId;
private String appKey; private String appKey;
private String uniAppSecret; // 个推主密钥
private String masterSecret; private String masterSecret;
private String apiUrl;
public UniPushAppPushSetting(String appId, String appKey, String uniAppSecret, String masterSecret) { public UniPushAppPushSetting(String appId, String appKey, String masterSecret, String apiUrl) {
this.appId = appId; this.appId = appId;
this.appKey = appKey; this.appKey = appKey;
this.uniAppSecret = uniAppSecret;
this.masterSecret = masterSecret; this.masterSecret = masterSecret;
this.apiUrl = apiUrl;
} }
} }

View File

@@ -19,7 +19,7 @@ public class BlacklistReqVO extends PageParam {
@Schema(description = "主键 ID", example = "123444") @Schema(description = "主键 ID", example = "123444")
private Long id; private Long id;
@Schema(description = "渠道类型sms/email/app_push", example = "sms") @Schema(description = "渠道类型sms/email/app", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;

View File

@@ -3,30 +3,27 @@ package com.njcn.msgpush.module.push.controller.admin.channel;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.AliyunProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.TelecomProviderFactory;
import com.njcn.msgpush.module.push.client.factory.impl.UniPushProviderFactory;
import com.njcn.msgpush.module.push.controller.admin.channel.vo.ChannelProviderConfigReqVO; import com.njcn.msgpush.module.push.controller.admin.channel.vo.ChannelProviderConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.enums.ProviderTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
import java.util.Map;
import static com.njcn.msgpush.framework.common.pojo.CommonResult.success; import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - 渠道服务商") @Tag(name = "管理后台 - 渠道服务商")
@Slf4j @Slf4j
@Validated @Validated
@@ -37,10 +34,6 @@ public class ChannelProviderConfigController {
@Autowired @Autowired
private ChannelProviderConfigService channelProviderConfigService; private ChannelProviderConfigService channelProviderConfigService;
@Autowired
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@PostMapping("/page") @PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表") @Operation(summary = "分页查询渠道服务商列表")
@PreAuthorize("@ss.hasPermission('push:channel:page')") @PreAuthorize("@ss.hasPermission('push:channel:page')")
@@ -69,7 +62,7 @@ public class ChannelProviderConfigController {
@PreAuthorize("@ss.hasPermission('push:channel:delete')") @PreAuthorize("@ss.hasPermission('push:channel:delete')")
@Parameter(name = "ids", description = "id 列表", required = true) @Parameter(name = "ids", description = "id 列表", required = true)
public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids) { public CommonResult<Boolean> delete(@RequestParam("ids") List<String> ids) {
boolean res = channelProviderConfigService.removeBatchByIds(ids); boolean res = channelProviderConfigService.deleteByIds(ids);
return success(res); return success(res);
} }
@@ -78,46 +71,7 @@ public class ChannelProviderConfigController {
@PreAuthorize("@ss.hasPermission('push:channel:toggle')") @PreAuthorize("@ss.hasPermission('push:channel:toggle')")
@Parameter(name = "id", description = "id", required = true) @Parameter(name = "id", description = "id", required = true)
public CommonResult<Void> toggleEnableChannelProvider(@RequestParam("id") String id) { public CommonResult<Void> toggleEnableChannelProvider(@RequestParam("id") String id) {
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.toggleEnableField(id); channelProviderConfigService.toggleEnableField(id);
if (channelProviderConfigDO.getEnabled() == 1) {
registerProviderBean(channelProviderConfigDO.getProviderType());
} else {
removeProviderBean(channelProviderConfigDO.getProviderType());
}
return success(null); return success(null);
} }
/**
* 添加指定providerType服务商对应的bean
*
* @param providerType 服务商类型例如aliyun\telecom\UniPush
*/
public void registerProviderBean(String providerType) {
switch (ProviderTypeEnum.getByCode(providerType)) {
case ALIYUN: {
messageProviderFactoryMap.put(ProviderTypeEnum.ALIYUN.getCode(), new AliyunProviderFactory());
}
break;
case TELECOM: {
messageProviderFactoryMap.put(ProviderTypeEnum.TELECOM.getCode(), new TelecomProviderFactory());
}
break;
case UNIPUSH: {
messageProviderFactoryMap.put(ProviderTypeEnum.UNIPUSH.getCode(), new UniPushProviderFactory());
}
break;
default:
throw new IllegalArgumentException("" + providerType + "服务商暂不支持");
}
}
/**
* 移除指定providerType服务商对应的bean
*
* @param providerType 服务商类型例如aliyun\telecom\UniPush
*/
public void removeProviderBean(String providerType) {
messageProviderFactoryMap.remove(providerType);
}
} }

View File

@@ -13,7 +13,7 @@ public class ChannelProviderConfigReqVO extends PageParam {
@Schema(description = "渠道ID") @Schema(description = "渠道ID")
private String id; private String id;
@Schema(description = "渠道类型sms/email/app_push", example = "sms") @Schema(description = "渠道类型sms/email/app", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;

View File

@@ -0,0 +1,37 @@
package com.njcn.msgpush.module.push.controller.admin.credential;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
import com.njcn.msgpush.module.push.service.credential.ICredentialService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.security.PermitAll;
import jakarta.validation.Valid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 系统凭证 API 接口
*
* @author msgpush
*/
@Tag(name = "系统凭证管理")
@RestController
@RequestMapping("/push/credential")
public class CredentialController {
@Autowired
private ICredentialService credentialService;
@PermitAll
@PostMapping("/generate")
@Operation(summary = "生成系统凭证")
public CommonResult<CredentialRespDTO> generateCredential(@Valid @RequestBody CredentialReqDTO reqDTO) {
CredentialRespDTO respDTO = credentialService.generateCredential(reqDTO);
return CommonResult.success(respDTO);
}
}

View File

@@ -2,11 +2,15 @@ package com.njcn.msgpush.module.push.controller.admin.message;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.exception.enums.GlobalErrorCodeConstants;
import com.njcn.msgpush.framework.common.pojo.CommonResult; import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent; import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.credential.CredentialServiceImpl;
import com.njcn.msgpush.module.push.service.credential.ICredentialService;
import com.njcn.msgpush.module.push.service.message.MessageRecordService; import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
@@ -16,7 +20,12 @@ import jakarta.validation.Valid;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
@@ -30,24 +39,55 @@ public class MessageRecordController {
@Autowired @Autowired
private MessageRecordService messageRecordService; private MessageRecordService messageRecordService;
@Autowired
private ICredentialService credentialService;
@PermitAll @PermitAll
@PostMapping("/send") @PostMapping("/send/sms")
@Operation(summary = "消息推送") @Operation(summary = "短信推送")
@Idempotent(timeout = 2) @Idempotent(timeout = 2)
public CommonResult<List<MessageSendResultVO>> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) { public CommonResult<List<MessageSendResultVO>> sendSms(@Valid @RequestBody List<MessageRecordReqVO> reqVOList,
return success(messageRecordService.send(reqVOList)); @RequestHeader(value = "X-Credential-Token", required = true) String credentialToken) {
return doSend(reqVOList, credentialToken, ChannelTypeEnum.SMS);
}
@PermitAll
@PostMapping("/send/email")
@Operation(summary = "邮件推送")
@Idempotent(timeout = 2)
public CommonResult<List<MessageSendResultVO>> sendEmail(@Valid @RequestBody List<MessageRecordReqVO> reqVOList,
@RequestHeader(value = "X-Credential-Token", required = true) String credentialToken) {
return doSend(reqVOList, credentialToken, ChannelTypeEnum.EMAIL);
}
@PermitAll
@PostMapping("/send/app")
@Operation(summary = "APP 推送")
@Idempotent(timeout = 2)
public CommonResult<List<MessageSendResultVO>> sendApp(@Valid @RequestBody List<MessageRecordReqVO> reqVOList,
@RequestHeader(value = "X-Credential-Token", required = true) String credentialToken) {
return doSend(reqVOList, credentialToken, ChannelTypeEnum.APP);
}
private CommonResult<List<MessageSendResultVO>> doSend(List<MessageRecordReqVO> reqVOList,
String credentialToken,
ChannelTypeEnum channelTypeEnum) {
CredentialServiceImpl.CredentialInfo credentialInfo = credentialService.verifyCredential(credentialToken);
if (credentialInfo == null) {
return CommonResult.error(GlobalErrorCodeConstants.UNAUTHORIZED.getCode(), "凭证无效或已过期");
}
return success(messageRecordService.send(reqVOList, channelTypeEnum, credentialInfo.getSystemName()));
} }
@PostMapping("/page") @PostMapping("/page")
@Operation(summary = "分页查询渠道服务商列表") @Operation(summary = "分页查询消息记录")
@PreAuthorize("@ss.hasPermission('push:message:page')") @PreAuthorize("@ss.hasPermission('push:message:page')")
public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) { public CommonResult<Page<MessageRecordDO>> pageChannelProviderConfig(@Validated @RequestBody MessageRecordReqVO reqVO) {
return success(messageRecordService.getPage(reqVO)); return success(messageRecordService.getPage(reqVO));
} }
@PostMapping("/add") @PostMapping("/add")
@Operation(summary = "添加消息记录") @Operation(summary = "新增消息记录")
@PreAuthorize("@ss.hasPermission('push:message:add')") @PreAuthorize("@ss.hasPermission('push:message:add')")
public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) { public CommonResult<Boolean> add(@Validated @RequestBody MessageRecordReqVO reqVO) {
return success(messageRecordService.add(reqVO)); return success(messageRecordService.add(reqVO));

View File

@@ -16,15 +16,7 @@ import lombok.Data;
@Data @Data
@Schema(description = "管理后台 - 消息记录发送 Request VO") @Schema(description = "管理后台 - 消息记录发送 Request VO")
public class MessageRecordReqVO extends PageParam { public class MessageRecordReqVO extends PageParam {
@Schema(description = "主键ID")
private Long id;
@Schema(description = "应用名称/来源系统标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "NPQS-9500")
@NotBlank(message = "应用名称/来源系统标识不能为空")
private String appName;
@Schema(description = "渠道类型sms/email/app_push", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;
@Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify") @Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify")
@@ -33,7 +25,7 @@ public class MessageRecordReqVO extends PageParam {
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED)
@NotBlank(message = "接收者不能为空") @NotBlank(message = "接收者不能为空")
@Pattern(regexp = RegexPool.EMAIL + "|" + RegexPool.MOBILE, message = "必须是有效的邮箱或手机号格式") // @Pattern(regexp = RegexPool.EMAIL + "|" + RegexPool.MOBILE, message = "必须是有效的邮箱或手机号格式")
private String receiver; private String receiver;
@Schema(description = "标题", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "标题", requiredMode = Schema.RequiredMode.REQUIRED)
@@ -47,14 +39,4 @@ public class MessageRecordReqVO extends PageParam {
@Schema(description = "模板参数") @Schema(description = "模板参数")
private String templateParams; private String templateParams;
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio")
@InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}")
private String providerType;
@Schema(description = "第三方消息ID")
private String thirdPartyId;
@Schema(description = "额外信息")
private String extraInfo;
} }

View File

@@ -18,7 +18,7 @@ public class RateLimitConfigReqVO extends PageParam {
@Schema(description = "主键 ID") @Schema(description = "主键 ID")
private Long id; private Long id;
@Schema(description = "渠道类型sms/email/app_push", example = "sms") @Schema(description = "渠道类型sms/email/app", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;

View File

@@ -16,7 +16,7 @@ public class MessageRetryQueueReqVO extends PageParam {
@Schema(description = "消息ID") @Schema(description = "消息ID")
private Long messageId; private Long messageId;
@Schema(description = "渠道类型", example = "sms/email/app_push") @Schema(description = "渠道类型", example = "sms/email/app")
@NotBlank(message = "渠道类型不能为空") @NotBlank(message = "渠道类型不能为空")
private String channel; private String channel;

View File

@@ -17,7 +17,7 @@ public class RetryStrategyConfigReqVO {
@Schema(description = "主键 ID") @Schema(description = "主键 ID")
private Long id; private Long id;
@Schema(description = "渠道类型sms/email/app_push", example = "sms") @Schema(description = "渠道类型sms/email/app", example = "sms")
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}") @InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
private String channel; private String channel;

View File

@@ -19,7 +19,7 @@ public class BlacklistDO extends BaseDO {
private Long id; private Long id;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;

View File

@@ -22,7 +22,7 @@ public class ChannelProviderConfigDO extends BaseDO {
private Long id; private Long id;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;
@@ -32,30 +32,30 @@ public class ChannelProviderConfigDO extends BaseDO {
private String providerName; private String providerName;
/** /**
* 服务商类型telecom/cmcc/aliyun/twilio/unipush * 服务商类型telecom/aliyun/unipush
*/ */
private String providerType; private String providerType;
/** /**
* API地址 * API地址
* 对于 UniPush可配置为自定义API地址如未配置则使用默认地址 https://restapi.getui.com/v2/
*/ */
private String apiUrl; private String apiUrl;
/**
* AppKey
*/
private String appKey; private String appKey;
/** private String secret;
* AppSecret
*/
private String appSecret;
/** /**
* 电信sms服务所需接入码 * 电信sms服务所需接入码
*/ */
private String extno; private String extno;
/**
* app推送服务所需的appId
*/
private String appId;
/** /**
* 额外配置JSON格式 * 额外配置JSON格式
*/ */

View File

@@ -25,7 +25,7 @@ public class ProviderErrorCodeMappingDO extends BaseDO {
private String providerType; private String providerType;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;

View File

@@ -0,0 +1,16 @@
package com.njcn.msgpush.module.push.dal.dataobject.credential;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* @author caozehui
* @data 2026-04-09
*/
@Data
@TableName("push_sys_secret")
public class SystemSecretDO {
private String id;
private String systemName;
private String secret;
}

View File

@@ -0,0 +1,35 @@
package com.njcn.msgpush.module.push.dal.dataobject.credential.dto;
/**
* @author caozehui
* @data 2026-03-31
*/
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
import java.io.Serializable;
/**
* 系统凭证请求 DTO
*
* @author msgpush
*/
@Data
public class CredentialReqDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 上游系统名称
*/
@NotEmpty(message = "上游系统名称不能为空")
private String systemName;
/**
* 密钥(用于生成凭证)
*/
@NotEmpty(message = "密钥不能为空")
private String secretKey;
}

View File

@@ -0,0 +1,37 @@
package com.njcn.msgpush.module.push.dal.dataobject.credential.dto;
/**
* @author caozehui
* @data 2026-03-31
*/
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 系统凭证响应 DTO
*
* @author msgpush
*/
@Data
public class CredentialRespDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 凭证令牌
*/
private String credentialToken;
/**
* 上游系统名称
*/
private String systemName;
/**
* 过期时间
*/
private LocalDateTime expiresTime;
}

View File

@@ -1,5 +1,6 @@
package com.njcn.msgpush.module.push.dal.dataobject.message; package com.njcn.msgpush.module.push.dal.dataobject.message;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO; import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data; import lombok.Data;
@@ -27,7 +28,7 @@ public class MessageRecordDO extends BaseDO {
private String appName; private String appName;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;
@@ -61,10 +62,6 @@ public class MessageRecordDO extends BaseDO {
*/ */
private String templateParams; private String templateParams;
/**
* 额外信息
*/
private String extraInfo;
/** /**
* 状态pending/sending/success/failed/final_failed/blacklisted/quota_exceeded/rate_limited/abandoned * 状态pending/sending/success/failed/final_failed/blacklisted/quota_exceeded/rate_limited/abandoned

View File

@@ -19,7 +19,7 @@ public class SystemQuotaConfigDO extends BaseDO {
private Long id; private Long id;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;

View File

@@ -17,7 +17,7 @@ public class RateLimitConfigDO extends BaseDO {
private Long id; private Long id;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;

View File

@@ -1,5 +1,6 @@
package com.njcn.msgpush.module.push.dal.dataobject.retry; package com.njcn.msgpush.module.push.dal.dataobject.retry;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO; import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data; import lombok.Data;
@@ -61,4 +62,9 @@ public class MessageRetryHistoryDO extends BaseDO {
* 第三方消息ID * 第三方消息ID
*/ */
private String thirdPartyId; private String thirdPartyId;
/**
* 重试来源AUTO_RETRY-系统自动重试MANUAL_RETRY-人工手动重试
*/
private String retrySource;
} }

View File

@@ -7,62 +7,32 @@ import lombok.EqualsAndHashCode;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/**
* @author caozehui
* @data 2026-02-27
* @description 消息重试队列表对应数据对象
*/
@Data @Data
@TableName("push_message_retry_queue") @TableName("push_message_retry_queue")
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class MessageRetryQueueDO extends BaseDO { public class MessageRetryQueueDO extends BaseDO {
/**
* 主键ID
*/
private Long id; private Long id;
/**
* 关联message_record的message_id
*/
private Long messageId; private Long messageId;
/**
* 渠道类型
*/
private String channel; private String channel;
/**
* 接收者
*/
private String receiver; private String receiver;
/**
* 已重试次数
*/
private Integer retryCount; private Integer retryCount;
/**
* 最大重试次数
*/
private Integer maxRetry; private Integer maxRetry;
/**
* 下次重试时间
*/
private LocalDateTime nextRetryTime; private LocalDateTime nextRetryTime;
/** private Integer processStatus;
* 首次失败时间
*/ private LocalDateTime lockUntil;
private LocalDateTime firstFailTime; private LocalDateTime firstFailTime;
/**
* 最后一次重试时间
*/
private LocalDateTime lastRetryTime; private LocalDateTime lastRetryTime;
/**
* 最后失败原因
*/
private String lastErrorMsg; private String lastErrorMsg;
} }

View File

@@ -19,7 +19,7 @@ public class RetryStrategyConfigDO extends BaseDO {
private Long id; private Long id;
/** /**
* 渠道类型sms/email/app_push * 渠道类型sms/email/app
*/ */
private String channel; private String channel;

View File

@@ -4,10 +4,6 @@ import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper @Mapper
public interface ChannelProviderConfigMapper extends BaseMapperX<ChannelProviderConfigDO> { public interface ChannelProviderConfigMapper extends BaseMapperX<ChannelProviderConfigDO> {
List<ChannelProviderConfigDO> getActiveProviders();
} }

View File

@@ -1,11 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper">
<select id="getActiveProviders"
resultType="com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO">
select *
from push_channel_provider_config
where enabled = 1
</select>
</mapper>

View File

@@ -0,0 +1,11 @@
package com.njcn.msgpush.module.push.dal.mysql.credential;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
/**
* @author caozehui
* @data 2026-04-09
*/
public interface SystemSecretMappper extends BaseMapperX<SystemSecretDO> {
}

View File

@@ -18,6 +18,12 @@ public interface MessageRetryQueueMapper extends BaseMapperX<MessageRetryQueueDO
* @param limit 限制数量 * @param limit 限制数量
* @return 待重试消息列表 * @return 待重试消息列表
*/ */
List<MessageRetryQueueDO> selectNeedRetryMessages(@Param("currentTime") LocalDateTime currentTime, List<MessageRetryQueueDO> selectNeedRetryMessages(@Param("channel") String channel,
@Param("currentTime") LocalDateTime currentTime,
@Param("limit") int limit); @Param("limit") int limit);
int claimRetryMessage(@Param("messageId") Long messageId,
@Param("channel") String channel,
@Param("currentTime") LocalDateTime currentTime,
@Param("lockUntil") LocalDateTime lockUntil);
} }

View File

@@ -5,11 +5,32 @@
<select id="selectNeedRetryMessages" resultType="com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO"> <select id="selectNeedRetryMessages" resultType="com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO">
SELECT * SELECT *
FROM push_message_retry_queue FROM push_message_retry_queue
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime} WHERE channel = #{channel}
AND deleted = 0 AND next_retry_time <![CDATA[ <= ]]> #{currentTime}
AND (
process_status = 0
OR process_status IS NULL
OR lock_until IS NULL
OR lock_until <![CDATA[ < ]]> #{currentTime}
)
ORDER BY next_retry_time ASC ORDER BY next_retry_time ASC
LIMIT #{limit} LIMIT #{limit}
</select> </select>
<update id="claimRetryMessage">
UPDATE push_message_retry_queue
SET process_status = 1,
lock_until = #{lockUntil}
WHERE message_id = #{messageId}
AND channel = #{channel}
AND next_retry_time <![CDATA[ <= ]]> #{currentTime}
AND (
process_status = 0
OR process_status IS NULL
OR lock_until IS NULL
OR lock_until <![CDATA[ < ]]> #{currentTime}
)
</update>
</mapper> </mapper>

View File

@@ -18,7 +18,7 @@ import java.util.Arrays;
public enum ChannelTypeEnum implements ArrayValuable<String> { public enum ChannelTypeEnum implements ArrayValuable<String> {
SMS("sms", "短信"), SMS("sms", "短信"),
EMAIL("email", "邮箱"), EMAIL("email", "邮箱"),
APP_PUSH("app_push", "APP 推送"),; APP("app", "APP 推送"),;
public static final String[] ARRAYS = Arrays.stream(values()).map(ChannelTypeEnum::getCode).toArray(String[]::new); public static final String[] ARRAYS = Arrays.stream(values()).map(ChannelTypeEnum::getCode).toArray(String[]::new);

View File

@@ -0,0 +1,30 @@
package com.njcn.msgpush.module.push.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author caozehui
* @data 2026-04-07
*/
@Getter
@AllArgsConstructor
public enum RetrySourceEnum {
AUTO_RETRY("AUTO_RETRY", "系统自动重试"),
MANUAL_RETRY("MANUAL_RETRY", "人工手动重试");
private final String code;
private final String desc;
public static RetrySourceEnum getByCode(String code) {
if (code == null) {
return null;
}
for (RetrySourceEnum value : values()) {
if (value.getCode().equals(code)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,115 @@
package com.njcn.msgpush.module.push.filter;
/**
* @author caozehui
* @data 2026-03-31
*/
import cn.hutool.core.util.StrUtil;
import com.njcn.msgpush.framework.common.exception.enums.GlobalErrorCodeConstants;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.common.util.servlet.ServletUtils;
import com.njcn.msgpush.framework.web.config.WebProperties;
import com.njcn.msgpush.framework.web.core.filter.ApiRequestFilter;
import com.njcn.msgpush.module.push.service.credential.CredentialServiceImpl;
import com.njcn.msgpush.module.push.service.credential.ICredentialService;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 凭证认证过滤器
*
* @author msgpush
*/
@Component
@Slf4j
public class CredentialAuthenticationFilter extends ApiRequestFilter implements Ordered {
@Autowired
private ICredentialService credentialService;
/**
* 凭证 Header 名称
*/
private static final String CREDENTIAL_HEADER = "X-Credential-Token";
/**
* 需要凭证认证的接口路径
*/
private static final String[] NOT_IGNORE_PATHS = {
"/admin-api/push/message/send/**",
};
private static final String[] IGNORE_PATHS = {
"/admin-api/push/credential/generate",
};
public CredentialAuthenticationFilter(WebProperties webProperties) {
super(webProperties);
}
@Override
protected boolean shouldNotFilter(HttpServletRequest request) {
// 检查是否在排除列表中
String path = request.getRequestURI().substring(request.getContextPath().length());
for (String notExcludePath : NOT_IGNORE_PATHS) {
if (path.equals(notExcludePath)) {
return false;
}
if (notExcludePath.endsWith("/**") && path.startsWith(notExcludePath.substring(0, notExcludePath.length() - 3))) {
return false;
}
}
for (String excludePath : IGNORE_PATHS){
if (path.equals(excludePath)) {
return true;
}
if (excludePath.endsWith("/**") && path.startsWith(excludePath.substring(0, excludePath.length() - 3))) {
return true;
}
}
return super.shouldNotFilter(request);
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
// 1. 获取凭证 token
String credentialToken = request.getHeader(CREDENTIAL_HEADER);
if (StrUtil.isEmpty(credentialToken)) {
credentialToken = request.getParameter("credential");
}
// 2. 如果没有凭证,继续过滤链
if (StrUtil.isEmpty(credentialToken)) {
ServletUtils.writeJSON(response, CommonResult.error(GlobalErrorCodeConstants.UNAUTHORIZED.getCode(), "缺少凭证"));
return;
}
// 3. 验证凭证
CredentialServiceImpl.CredentialInfo credentialInfo = credentialService.verifyCredential(credentialToken);
if (credentialInfo == null) {
// 校验失败
ServletUtils.writeJSON(response, CommonResult.error(GlobalErrorCodeConstants.UNAUTHORIZED.getCode(), "凭证无效或已过期"));
return;
}
filterChain.doFilter(request, response);
}
@Override
public int getOrder() {
return 1000;
}
}

View File

@@ -1,5 +1,6 @@
package com.njcn.msgpush.module.push.job; package com.njcn.msgpush.module.push.job;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -25,8 +26,7 @@ public class MessageRetryJob {
*/ */
@Scheduled(fixedRate = 10000) @Scheduled(fixedRate = 10000)
public void processSmsRetryQueue() { public void processSmsRetryQueue() {
log.info("开始处理短信重试队列:{}", LocalDateTime.now()); messageRetryQueueService.processRetryBatch(ChannelTypeEnum.SMS.getCode());
messageRetryQueueService.processRetryBatch("sms");
} }
/** /**
@@ -34,7 +34,7 @@ public class MessageRetryJob {
*/ */
//@Scheduled(fixedRate = 10000) //@Scheduled(fixedRate = 10000)
public void processEmailRetryQueue() { public void processEmailRetryQueue() {
messageRetryQueueService.processRetryBatch("email"); messageRetryQueueService.processRetryBatch(ChannelTypeEnum.EMAIL.getCode());
} }
/** /**
@@ -42,7 +42,7 @@ public class MessageRetryJob {
*/ */
//@Scheduled(fixedRate = 10000) //@Scheduled(fixedRate = 10000)
public void processAppPushRetryQueue() { public void processAppPushRetryQueue() {
messageRetryQueueService.processRetryBatch("app_push"); messageRetryQueueService.processRetryBatch(ChannelTypeEnum.APP.getCode());
} }
} }

View File

@@ -8,28 +8,18 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
import java.util.List; import java.util.List;
public interface ChannelProviderConfigService extends IService<ChannelProviderConfigDO> { public interface ChannelProviderConfigService extends IService<ChannelProviderConfigDO> {
Page<ChannelProviderConfigDO> getPage(ChannelProviderConfigReqVO reqVO); Page<ChannelProviderConfigDO> getPage(ChannelProviderConfigReqVO reqVO);
List<ChannelProviderConfigDO> getActiveProviders();
/**
* 切换服务提供商enable字段
*
* @param id 服务提供商id
* @return
*/
ChannelProviderConfigDO toggleEnableField(String id); ChannelProviderConfigDO toggleEnableField(String id);
/**
* 根据类型和渠道获取服务提供商
*
* @param providerType 服务提供商类型
* @param channel 渠道
* @return
*/
ChannelProviderConfigDO getByTypeAndChannel(String providerType, String channel); ChannelProviderConfigDO getByTypeAndChannel(String providerType, String channel);
void failureUpdate(String providerType, String channel); void failureUpdate(String providerType, String channel);
void successUpdate(String providerType, String channel); void successUpdate(String providerType, String channel);
List<ChannelProviderConfigDO> getEnabledProviders(String channel);
boolean deleteByIds(List<String> ids);
} }

View File

@@ -1,5 +1,7 @@
package com.njcn.msgpush.module.push.service.channel; package com.njcn.msgpush.module.push.service.channel;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.stream.CollectorUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -12,6 +14,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static com.njcn.msgpush.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
@Service @Service
public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProviderConfigMapper, ChannelProviderConfigDO> implements ChannelProviderConfigService { public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProviderConfigMapper, ChannelProviderConfigDO> implements ChannelProviderConfigService {
@@ -24,14 +29,31 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
} }
@Override @Override
public List<ChannelProviderConfigDO> getActiveProviders() { @Transactional(rollbackFor = Exception.class)
return this.lambdaQuery().eq(ChannelProviderConfigDO::getEnabled, true).list();
}
@Override
public ChannelProviderConfigDO toggleEnableField(String id) { public ChannelProviderConfigDO toggleEnableField(String id) {
ChannelProviderConfigDO channelProviderConfigDO = this.getById(id); ChannelProviderConfigDO channelProviderConfigDO = this.getById(id);
channelProviderConfigDO.setEnabled(channelProviderConfigDO.getEnabled() ^ 0X0001); if (channelProviderConfigDO == null) {
throw invalidParamException("渠道服务商配置不存在id={}", id);
}
boolean enable = !Integer.valueOf(1).equals(channelProviderConfigDO.getEnabled());
if (enable) {
this.lambdaUpdate()
.eq(ChannelProviderConfigDO::getChannel, channelProviderConfigDO.getChannel())
.ne(ChannelProviderConfigDO::getId, channelProviderConfigDO.getId())
.set(ChannelProviderConfigDO::getEnabled, 0)
.update();
} else {
long enabledCount = this.lambdaQuery()
.eq(ChannelProviderConfigDO::getChannel, channelProviderConfigDO.getChannel())
.eq(ChannelProviderConfigDO::getEnabled, 1)
.count();
if (enabledCount <= 1) {
throw invalidParamException("渠道 {} 必须保留一个启用服务商,请先启用其它服务商后再禁用当前配置", channelProviderConfigDO.getChannel());
}
}
channelProviderConfigDO.setEnabled(enable ? 1 : 0);
this.updateById(channelProviderConfigDO); this.updateById(channelProviderConfigDO);
return channelProviderConfigDO; return channelProviderConfigDO;
} }
@@ -61,4 +83,29 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
byTypeAndChannel.setHealthStatus(1); byTypeAndChannel.setHealthStatus(1);
this.updateById(byTypeAndChannel); this.updateById(byTypeAndChannel);
} }
@Override
public List<ChannelProviderConfigDO> getEnabledProviders(String channel) {
return this.lambdaQuery()
.eq(ChannelProviderConfigDO::getChannel, channel)
.eq(ChannelProviderConfigDO::getEnabled, 1)
.list();
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean deleteByIds(List<String> ids) {
List<ChannelProviderConfigDO> configs = this.listByIds(ids);
List<ChannelProviderConfigDO> enabledConfigs = configs.stream()
.filter(config -> Integer.valueOf(1).equals(config.getEnabled()))
.collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(enabledConfigs)) {
String channels = enabledConfigs.stream()
.map(ChannelProviderConfigDO::getChannel)
.distinct()
.collect(Collectors.joining(","));
throw invalidParamException("启用中的服务商配置禁止删除,请先禁用后再删除。涉及渠道:{}", channels);
}
return this.removeBatchByIds(ids);
}
} }

View File

@@ -0,0 +1,218 @@
package com.njcn.msgpush.module.push.service.credential;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.Mode;
import cn.hutool.crypto.Padding;
import cn.hutool.crypto.symmetric.AES;
import com.njcn.msgpush.framework.common.exception.ServiceException;
import com.njcn.msgpush.framework.common.exception.enums.ServiceErrorCodeRange;
import com.njcn.msgpush.framework.common.util.json.JsonUtils;
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author caozehui
* @data 2026-03-31
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class CredentialServiceImpl implements ICredentialService {
private static final long CREDENTIAL_EXPIRE_HOURS = 24L;
private static final long LOCK_WAIT_SECONDS = 3L;
private static final long LOCK_LEASE_SECONDS = 5L;
private static final String SYSTEM_KEY_PREFIX = "credential:";
private static final String LOCK_KEY_PREFIX = "credential:lock:";
private String credentialSecretKey = "88888888888888888888888888888888"; // 32 瀛楄妭
private final ISystemSecretService systemSecretService;
private final StringRedisTemplate stringRedisTemplate;
private final RedissonClient redissonClient;
@Override
public CredentialRespDTO generateCredential(CredentialReqDTO reqDTO) {
String systemName = reqDTO.getSystemName();
boolean validatedSecretRes = validateSecret(systemName, reqDTO.getSecretKey());
if (!validatedSecretRes) {
throw new ServiceException(ServiceErrorCodeRange.VALIDATE_SYSTEM_SECRET_FAIL);
}
LocalDateTime expiresTime = LocalDateTime.now().plusHours(CREDENTIAL_EXPIRE_HOURS);
String credential = encryptCredential(systemName, expiresTime);
RLock lock = redissonClient.getLock(formatLockKey(systemName));
boolean locked = false;
try {
locked = lock.tryLock(LOCK_WAIT_SECONDS, LOCK_LEASE_SECONDS, TimeUnit.SECONDS);
if (!locked) {
log.warn("[generateCredential][systemName({}) failed to acquire lock within {} seconds]",
systemName, LOCK_WAIT_SECONDS);
throw new ServiceException(ServiceErrorCodeRange.GENERATE_CREDENTIAL_FAIL);
}
deleteCredential(systemName);
CredentialInfo credentialInfo = new CredentialInfo(systemName, expiresTime, credential);
cacheCredential(systemName, credentialInfo);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[generateCredential][systemName({}) interrupted while waiting for lock]", systemName, e);
throw new ServiceException(ServiceErrorCodeRange.GENERATE_CREDENTIAL_FAIL);
} finally {
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return new CredentialRespDTO()
.setCredentialToken(systemName + StrUtil.COLON + credential)
.setSystemName(systemName)
.setExpiresTime(expiresTime);
}
/**
* 验证凭证
*
* @param credentialToken 凭证 credentialToken
* @return
*/
@Override
public CredentialInfo verifyCredential(String credentialToken) {
if (StrUtil.isEmpty(credentialToken)) {
return null;
}
String[] split = credentialToken.split(StrUtil.COLON);
if (split.length != 2) {
return null;
}
String systemName = split[0];
CredentialInfo credentialInfo = getCredentialFromRedis(systemName);
if (ObjectUtil.isNull(credentialInfo)) {
return null;
}
if (credentialInfo.getExpiresTime().isBefore(LocalDateTime.now()) || !credentialInfo.getCredential().equals(split[1])) {
return null;
}
return credentialInfo;
}
/**
* 校验系统secretKey
*
* @param systemName
* @param secretKey
* @return
*/
private boolean validateSecret(String systemName, String secretKey) {
SystemSecretDO systemSecretDO = systemSecretService.getBySystemName(systemName);
return systemSecretDO != null && StrUtil.equals(systemSecretDO.getSecret(), secretKey);
}
/**
* 获取加密后的凭证credential
*
* @param systemName 系统名称
* @param expiresTime 过期时间
* @return
*/
private String encryptCredential(String systemName, LocalDateTime expiresTime) {
// 生成随机 IV
byte[] iv = RandomUtil.randomBytes(16);
// 创建 AES 实例
AES aes = new AES(
Mode.CBC,
Padding.PKCS5Padding,
credentialSecretKey.getBytes(StandardCharsets.UTF_8),
iv
);
String origin = systemName + StrUtil.C_COLON + expiresTime.getNano();
byte[] encrypted = aes.encrypt(origin.getBytes(StandardCharsets.UTF_8));
return Base64.encode(encrypted);
}
/**
* 缓存系统凭证credential
*
* @param systemName
* @param info
*/
private void cacheCredential(String systemName, CredentialInfo info) {
String credentialKey = formatCredentialKey(systemName);
String jsonValue = JsonUtils.toJsonString(info);
long remainingSeconds = Duration.between(LocalDateTime.now(), info.getExpiresTime()).getSeconds();
if (remainingSeconds > 0) {
stringRedisTemplate.opsForValue().set(credentialKey, jsonValue, remainingSeconds, TimeUnit.SECONDS);
}
}
/**
* 从redis中获取系统凭证信息
*
* @param systemName
* @return
*/
private CredentialInfo getCredentialFromRedis(String systemName) {
String credentialKey = formatCredentialKey(systemName);
String jsonValue = stringRedisTemplate.opsForValue().get(credentialKey);
if (StrUtil.isEmpty(jsonValue)) {
return null;
}
return JsonUtils.parseObject(jsonValue, CredentialInfo.class);
}
private void deleteCredential(String systemName) {
String credentialKey = formatCredentialKey(systemName);
String oldToken = stringRedisTemplate.opsForValue().get(credentialKey);
if (StrUtil.isNotEmpty(oldToken)) {
stringRedisTemplate.delete(credentialKey);
}
}
private String formatCredentialKey(String systemName) {
return SYSTEM_KEY_PREFIX + systemName;
}
/**
* 获取锁的key用于针对同一时刻多个请求保证多个请求只对同一个key进行加锁
*
* @param systemName
* @return
*/
private String formatLockKey(String systemName) {
return LOCK_KEY_PREFIX + systemName;
}
@Data
@AllArgsConstructor
public static class CredentialInfo {
private String systemName;
private LocalDateTime expiresTime;
private String credential;
}
}

View File

@@ -0,0 +1,26 @@
package com.njcn.msgpush.module.push.service.credential;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
/**
* @author caozehui
* @data 2026-03-31
*/
public interface ICredentialService {
/**
* 生成系统凭证
*
* @param reqDTO 请求参数
* @return 凭证响应
*/
CredentialRespDTO generateCredential(CredentialReqDTO reqDTO);
/**
* 验证凭证是否有效
*
* @param token 凭证 token
* @return 凭证信息
*/
CredentialServiceImpl.CredentialInfo verifyCredential(String token);
}

View File

@@ -0,0 +1,12 @@
package com.njcn.msgpush.module.push.service.credential;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
/**
* @author caozehui
* @data 2026-04-09
*/
public interface ISystemSecretService extends IService<SystemSecretDO> {
SystemSecretDO getBySystemName(String systemName);
}

View File

@@ -0,0 +1,18 @@
package com.njcn.msgpush.module.push.service.credential;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
import com.njcn.msgpush.module.push.dal.mysql.credential.SystemSecretMappper;
import org.springframework.stereotype.Service;
/**
* @author caozehui
* @data 2026-04-09
*/
@Service
public class SystemSecretServiceImpl extends ServiceImpl<SystemSecretMappper, SystemSecretDO> implements ISystemSecretService {
@Override
public SystemSecretDO getBySystemName(String systemName) {
return this.lambdaQuery().eq(SystemSecretDO::getSystemName, systemName).one();
}
}

View File

@@ -5,29 +5,31 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List; import java.util.List;
public interface MessageRecordService extends IService<MessageRecordDO> { public interface MessageRecordService extends IService<MessageRecordDO> {
/** /**
* 发送消息包括email、sms、app_push * 发送消息包括email、sms、app
* *
* @param reqVOList * @param reqVOList
* @param systemName
* @return channelTypeEnum
* @return 发送的结果 * @return 发送的结果
*/ */
List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList); List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum, String systemName);
/** /**
* 处理发送消息 * 处理发送消息
* *
* @param messageRecordDOList * @param messageRecordDOList
* @param retrySource
* @return 发送的结果 * @return 发送的结果
*/ */
List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList); List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList, RetrySourceEnum retrySource);
/** /**
* 添加消息记录 * 添加消息记录

View File

@@ -1,6 +1,7 @@
package com.njcn.msgpush.module.push.service.message; package com.njcn.msgpush.module.push.service.message;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -9,6 +10,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils; import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.checker.MsgPushGuardChain; import com.njcn.msgpush.module.push.checker.MsgPushGuardChain;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory; import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactoryRegistry;
import com.njcn.msgpush.module.push.client.sender.SendOutcome; import com.njcn.msgpush.module.push.client.sender.SendOutcome;
import com.njcn.msgpush.module.push.client.sender.SendResult; import com.njcn.msgpush.module.push.client.sender.SendResult;
import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.Sender;
@@ -23,11 +25,11 @@ import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO; import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO; import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService; import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService; import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService; import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -35,7 +37,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
@Service @Service
public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, MessageRecordDO> implements MessageRecordService { public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, MessageRecordDO> implements MessageRecordService {
@@ -53,8 +54,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
public ChannelProviderConfigService channelProviderConfigService; public ChannelProviderConfigService channelProviderConfigService;
@Autowired @Autowired
@Qualifier("messageProviderFactoryMap") private MessageProviderFactoryRegistry messageProviderFactoryRegistry;
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@Autowired @Autowired
private MsgPushGuardChain msgPushGuardChain; private MsgPushGuardChain msgPushGuardChain;
@@ -69,16 +69,18 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
private MessageConfirmRedisDAO messageConfirmRedisDAO; private MessageConfirmRedisDAO messageConfirmRedisDAO;
@Override @Override
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) { public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum, String systemName) {
List<MessageRecordDO> messageRecordDOList = this.createMessageRecords(reqVOList); List<MessageRecordDO> messageRecordDOList = this.createMessageRecords(reqVOList, channelTypeEnum, systemName);
return this.processSendMsg(messageRecordDOList); return this.processSendMsg(messageRecordDOList, null);
} }
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public List<MessageRecordDO> createMessageRecords(List<MessageRecordReqVO> reqVOList) { public List<MessageRecordDO> createMessageRecords(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum, String systemName) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class); List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> { messageRecordDOList.forEach(messageRecordDO -> {
messageRecordDO.setChannel(channelTypeEnum.getCode());
messageRecordDO.setStatus(MsgStatusConstant.PENDING); messageRecordDO.setStatus(MsgStatusConstant.PENDING);
messageRecordDO.setAppName(systemName);
// messageRecordDO.setRetryCount(0); // messageRecordDO.setRetryCount(0);
}); });
this.saveBatch(messageRecordDOList); this.saveBatch(messageRecordDOList);
@@ -90,12 +92,12 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
* 这里统一接收 sender 返回的 SendResult再决定消息状态、重试队列和服务商健康度如何变化。 * 这里统一接收 sender 返回的 SendResult再决定消息状态、重试队列和服务商健康度如何变化。
*/ */
@Override @Override
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList) { public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList, RetrySourceEnum retrySource) {
msgPushGuardChain.checkAll(messageRecordDOList); msgPushGuardChain.checkAll(messageRecordDOList);
List<MessageSendResultVO> resultList = new ArrayList<>(); List<MessageSendResultVO> resultList = new ArrayList<>();
for (MessageRecordDO messageRecordDO : messageRecordDOList) { for (MessageRecordDO messageRecordDO : messageRecordDOList) {
try { try {
MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO); MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO, retrySource);
resultList.add(resultVO); resultList.add(resultVO);
} catch (Exception e) { } catch (Exception e) {
MessageSendResultVO failedVO = new MessageSendResultVO(); MessageSendResultVO failedVO = new MessageSendResultVO();
@@ -108,7 +110,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
return resultList; return resultList;
} }
private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO) { private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
MessageSendResultVO resultVO = new MessageSendResultVO(); MessageSendResultVO resultVO = new MessageSendResultVO();
resultVO.setMessageId(messageRecordDO.getId()); resultVO.setMessageId(messageRecordDO.getId());
@@ -118,13 +120,24 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
this.updateById(messageRecordDO); this.updateById(messageRecordDO);
return resultVO; return resultVO;
} }
List<ChannelProviderConfigDO> enabledProviders = channelProviderConfigService.getEnabledProviders(messageRecordDO.getChannel());
SendResult sendResult = null;
if (CollectionUtil.isNotEmpty(enabledProviders)) {
ChannelProviderConfigDO channelProviderConfigDO = enabledProviders.get(0);
MessageProviderFactory messageProviderFactory = messageProviderFactoryRegistry.getFactory(channelProviderConfigDO.getProviderType());
sendResult = this.validateProviderAndChannel(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService messageRecordDO.setProviderType(channelProviderConfigDO.getProviderType());
.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); if (sendResult == null) {
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType()); sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory); }
this.applySendResult(messageRecordDO, sendResult); this.applySendResult(messageRecordDO, sendResult);
this.recordRetryHistory(messageRecordDO); this.recordRetryHistory(messageRecordDO, retrySource);
} else {
sendResult = this.validateProviderAndChannel(messageRecordDO, null, null);
this.applySendResult(messageRecordDO, sendResult);
}
// 在这里修改接口返回的是投递结果,还是调用接口结果 // 在这里修改接口返回的是投递结果,还是调用接口结果
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome())); resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
@@ -132,26 +145,29 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
return resultVO; return resultVO;
} }
private SendResult sendMessage(MessageRecordDO messageRecordDO, private SendResult validateProviderAndChannel(MessageRecordDO messageRecordDO, ChannelProviderConfigDO channelProviderConfigDO, MessageProviderFactory messageProviderFactory) {
ChannelProviderConfigDO channelProviderConfigDO, ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory) { if (channelType == null) {
return SendResult.unsupported("当前发送渠道类型暂不支持");
}
if (ObjectUtil.isNull(messageProviderFactory)) { if (ObjectUtil.isNull(messageProviderFactory)) {
return SendResult.configInvalid("当前服务商未启用或未注册"); return SendResult.configInvalid("当前服务商未启用或未注册");
} }
if (ObjectUtil.isNull(channelProviderConfigDO)) { if (ObjectUtil.isNull(channelProviderConfigDO)) {
return SendResult.configInvalid("当前服务商渠道配置不存在"); return SendResult.configInvalid("当前服务商渠道配置不存在");
} }
return null;
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
if (channelType == null) {
return SendResult.unsupported("当前发送渠道类型暂不支持");
} }
private SendResult sendMessage(MessageRecordDO messageRecordDO,
ChannelProviderConfigDO channelProviderConfigDO,
MessageProviderFactory messageProviderFactory) {
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
return switch (channelType) { return switch (channelType) {
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO); case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case EMAIL -> messageProviderFactory.createEmailSender(channelProviderConfigDO, sender) case EMAIL -> messageProviderFactory.createEmailSender(channelProviderConfigDO, sender)
.sendEmail(messageRecordDO, new HashMap<>()); .sendEmail(messageRecordDO, new HashMap<>());
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO); case APP -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
}; };
} }
@@ -177,6 +193,9 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
// 成功或已受理都视为本次调用已正常投递,更新成功健康度并消耗配额。 // 成功或已受理都视为本次调用已正常投递,更新成功健康度并消耗配额。
if (SendOutcome.ACCEPTED.equals(sendResult.getOutcome())) { if (SendOutcome.ACCEPTED.equals(sendResult.getOutcome())) {
messageConfirmRedisDAO.addToConfirmQueue(messageRecordDO); messageConfirmRedisDAO.addToConfirmQueue(messageRecordDO);
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName());
rateLimitRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
} else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) { } else if (SendOutcome.RETRYABLE_FAILED.equals(sendResult.getOutcome())) {
messageRetryQueueService.saveOrUpdateRetryMessage(messageRecordDO); messageRetryQueueService.saveOrUpdateRetryMessage(messageRecordDO);
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel()); channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
@@ -192,11 +211,12 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
} }
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void recordRetryHistory(MessageRecordDO messageRecordDO) { public void recordRetryHistory(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id"); MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
messageRetryHistoryDO.setMessageId(messageRecordDO.getId()); messageRetryHistoryDO.setMessageId(messageRecordDO.getId());
messageRetryHistoryDO.setStatus(messageRecordDO.getStatus()); messageRetryHistoryDO.setStatus(messageRecordDO.getStatus());
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getId())); messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getId()));
messageRetryHistoryDO.setRetrySource(retrySource != null ? retrySource.getCode() : null);
messageRetryHistoryService.add(messageRetryHistoryDO); messageRetryHistoryService.add(messageRetryHistoryDO);
} }
@@ -256,9 +276,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) { public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>(); QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
wrapper.lambda() wrapper.lambda()
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRecordDO::getChannel, reqVO.getChannel()) .eq(StrUtil.isNotBlank(reqVO.getMessageType()), MessageRecordDO::getMessageType, reqVO.getMessageType());
.eq(StrUtil.isNotBlank(reqVO.getProviderType()), MessageRecordDO::getProviderType, reqVO.getProviderType())
.eq(StrUtil.isNotBlank(reqVO.getAppName()), MessageRecordDO::getAppName, reqVO.getAppName());
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper); return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
} }
} }

View File

@@ -7,15 +7,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.pojo.PageResult; import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant; import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO; import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.RetryStrategyConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper; import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO; import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum; import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
import com.njcn.msgpush.module.push.service.message.MessageRecordService; import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -33,34 +32,34 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Service @Service
public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO> implements MessageRetryQueueService, CommandLineRunner { public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueMapper, MessageRetryQueueDO>
implements MessageRetryQueueService, CommandLineRunner {
private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final int PROCESS_STATUS_WAITING = 0;
private static final int PROCESS_STATUS_PROCESSING = 1;
private static final int DEFAULT_LOCK_SECONDS = 120;
private static final List<String> CHANNELS = Arrays.asList(
ChannelTypeEnum.SMS.getCode(),
ChannelTypeEnum.EMAIL.getCode(),
ChannelTypeEnum.APP.getCode());
@Autowired @Autowired
private MessageRetryRedisDAO messageRetryRedisDAO; private MessageRetryRedisDAO messageRetryRedisDAO;
@Autowired @Autowired
private MessageRecordService messageRecordService; private MessageRecordService messageRecordService;
@Autowired @Autowired
public RetryStrategyConfigService retryStrategyConfigService; private RetryStrategyConfigService retryStrategyConfigService;
@Autowired
private MessageConfirmRedisDAO messageConfirmRedisDAO;
private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final List<String> CHANNELS = Arrays.asList(
ChannelTypeEnum.SMS.getCode(),
ChannelTypeEnum.EMAIL.getCode(),
ChannelTypeEnum.APP_PUSH.getCode());
private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false); private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false);
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void saveOrUpdateRetryMessage(MessageRecordDO message) { public void saveOrUpdateRetryMessage(MessageRecordDO message) {
MessageRetryQueueDO existing = super.baseMapper.selectOne( MessageRetryQueueDO existing = baseMapper.selectOne(new LambdaQueryWrapper<MessageRetryQueueDO>()
new LambdaQueryWrapper<MessageRetryQueueDO>() .eq(MessageRetryQueueDO::getMessageId, message.getId()));
.eq(MessageRetryQueueDO::getMessageId, message.getId())
);
message.setLastRetryTime(message.getSendTime()); message.setLastRetryTime(message.getSendTime());
if (ObjectUtil.isNull(existing)) { if (ObjectUtil.isNull(existing)) {
@@ -68,17 +67,18 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
retryRecord.setMessageId(message.getId()); retryRecord.setMessageId(message.getId());
retryRecord.setChannel(message.getChannel()); retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver()); retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(1); retryRecord.setRetryCount(1);
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel()); RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel());
retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount()); retryRecord.setMaxRetry(ObjectUtil.isNull(strategyConfig) ? DEFAULT_MAX_RETRY_COUNT : strategyConfig.getMaxRetryCount());
retryRecord.setFirstFailTime(message.getSendTime()); retryRecord.setFirstFailTime(message.getSendTime());
retryRecord.setLastRetryTime(message.getSendTime()); retryRecord.setLastRetryTime(message.getSendTime());
retryRecord.setLastErrorMsg(message.getErrorMsg()); retryRecord.setLastErrorMsg(message.getErrorMsg());
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount()); LocalDateTime nextRetryTime = calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount());
retryRecord.setNextRetryTime(nextRetryTime); retryRecord.setNextRetryTime(nextRetryTime);
retryRecord.setProcessStatus(PROCESS_STATUS_WAITING);
retryRecord.setLockUntil(null);
message.setNextRetryTime(nextRetryTime); message.setNextRetryTime(nextRetryTime);
this.save(retryRecord); save(retryRecord);
if (!messageRetryRedisDAO.addToRetryQueue(message)) { if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getId()); log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
} }
@@ -93,17 +93,19 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
message.setStatus(MsgStatusConstant.FINALFAILED); message.setStatus(MsgStatusConstant.FINALFAILED);
message.setNextRetryTime(null); message.setNextRetryTime(null);
existing.setNextRetryTime(null); existing.setNextRetryTime(null);
this.updateById(existing); updateById(existing);
this.deleteByMessageIds(Collections.singletonList(message.getId())); deleteByMessageIds(Collections.singletonList(message.getId()));
messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getId()); messageRetryRedisDAO.removeFromRetryQueue(message.getChannel(), message.getId());
return; return;
} }
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), newRetryCount); LocalDateTime nextRetryTime = calculateNextRetryTime(message.getChannel(), newRetryCount);
existing.setRetryCount(newRetryCount); existing.setRetryCount(newRetryCount);
existing.setNextRetryTime(nextRetryTime); existing.setNextRetryTime(nextRetryTime);
existing.setProcessStatus(PROCESS_STATUS_WAITING);
existing.setLockUntil(null);
message.setNextRetryTime(nextRetryTime); message.setNextRetryTime(nextRetryTime);
this.updateById(existing); updateById(existing);
if (!messageRetryRedisDAO.addToRetryQueue(message)) { if (!messageRetryRedisDAO.addToRetryQueue(message)) {
log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getId()); log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getId());
} }
@@ -136,7 +138,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
plusSeconds = 60 * 60 * 4; plusSeconds = 60 * 60 * 4;
} }
} }
case APP_PUSH -> { case APP -> {
if (retryCount == 1) { if (retryCount == 1) {
plusSeconds = 60; plusSeconds = 60;
} else { } else {
@@ -147,11 +149,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
} else { } else {
String retryIntervals = strategyConfig.getRetryIntervals(); String retryIntervals = strategyConfig.getRetryIntervals();
String[] split = retryIntervals.split(String.valueOf(StrUtil.C_COMMA)); String[] split = retryIntervals.split(String.valueOf(StrUtil.C_COMMA));
if (retryCount >= split.length) { plusSeconds = retryCount >= split.length
plusSeconds = Long.parseLong(split[split.length - 1]); ? Long.parseLong(split[split.length - 1])
} else { : Long.parseLong(split[retryCount - 1]);
plusSeconds = Long.parseLong(split[retryCount - 1]);
}
} }
return LocalDateTime.now().plusSeconds(plusSeconds); return LocalDateTime.now().plusSeconds(plusSeconds);
} }
@@ -159,7 +159,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Override @Override
public void processRetryBatch(String channel) { public void processRetryBatch(String channel) {
long epochMilli = System.currentTimeMillis(); long epochMilli = System.currentTimeMillis();
// 使用原子认领机制,一次性完成读取 + 删除
List<String> claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE); List<String> claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) { if (CollUtil.isEmpty(claimedMessageIds)) {
@@ -168,48 +167,48 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
epochMilli = System.currentTimeMillis(); epochMilli = System.currentTimeMillis();
claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE); claimedMessageIds = messageRetryRedisDAO.claimAndRemoveDueMessages(channel, epochMilli, DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(claimedMessageIds)) { if (CollUtil.isEmpty(claimedMessageIds)) {
return; return;
} }
} }
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(claimedMessageIds); LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime lockUntil = currentTime.plusSeconds(DEFAULT_LOCK_SECONDS);
List<Long> dbClaimedMessageIds = claimedMessageIds.stream()
.map(Long::valueOf)
.filter(messageId -> claimRetryMessage(messageId, channel, currentTime, lockUntil))
.collect(Collectors.toList());
if (CollUtil.isEmpty(dbClaimedMessageIds)) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(dbClaimedMessageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) { for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO); processSingleRetry(messageRecordDO, RetrySourceEnum.AUTO_RETRY);
} }
} }
/** private void processSingleRetry(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
* 处理单条消息重试,根据执行结果决定是否重新入队 if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
* 成功:不再回 Redis并逻辑删除数据库队列记录
* 失败且继续重试:重新计算 nextRetryTime重新写入 Redis 和数据库队列表
* 失败且终止:不再回 Redis并逻辑删除数据库队列记录
*
* @param messageRecordDO 消息记录
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
messageRecordDO.setStatus(MsgStatusConstant.PENDING); messageRecordDO.setStatus(MsgStatusConstant.PENDING);
messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0); messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO), retrySource);
messageRecordService.updateRetryCount(messageRecordDO.getId()); messageRecordService.updateRetryCount(messageRecordDO.getId());
finalizeRetryLease(messageRecordDO);
} else {
deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
} }
/**
* 删除重试记录(数据库逻辑删除 + Redis 已提前删除)
*/
private void deleteRetryRecord(MessageRecordDO message) {
this.deleteByMessageIds(Collections.singletonList(message.getId()));
messageRecordService.updateById(message);
} }
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void manualRetry(List<String> messageIds) { public void manualRetry(List<String> messageIds) {
if (CollUtil.isNotEmpty(messageIds)) { if (CollUtil.isEmpty(messageIds)) {
return;
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds); List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
for (MessageRecordDO messageRecordDO : messageRecordDOList) { for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO); processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY);
}
} }
} }
@@ -223,7 +222,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
.le(ObjectUtil.isNotNull(reqVO.getMaxRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMaxRetryCount()) .le(ObjectUtil.isNotNull(reqVO.getMaxRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMaxRetryCount())
.orderByAsc(MessageRetryQueueDO::getNextRetryTime) .orderByAsc(MessageRetryQueueDO::getNextRetryTime)
.orderByDesc(MessageRetryQueueDO::getId); .orderByDesc(MessageRetryQueueDO::getId);
return this.baseMapper.selectPage(reqVO, queryWrapper); return baseMapper.selectPage(reqVO, queryWrapper);
} }
@Override @Override
@@ -231,14 +230,11 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
if (CollUtil.isEmpty(messageIds)) { if (CollUtil.isEmpty(messageIds)) {
return false; return false;
} }
return this.baseMapper.delete(new LambdaQueryWrapper<MessageRetryQueueDO>().in(MessageRetryQueueDO::getMessageId, messageIds)) > 0;
return this.remove(new LambdaQueryWrapper<MessageRetryQueueDO>()
.in(MessageRetryQueueDO::getMessageId, messageIds));
} }
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) {
rebuildRetryQueueOnStartup(); rebuildRetryQueueOnStartup();
startupRebuildCompleted.set(true); startupRebuildCompleted.set(true);
} }
@@ -247,18 +243,13 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void rebuildRetryQueueOnStartup() { public void rebuildRetryQueueOnStartup() {
log.info("========== 开始重建重试队列 =========="); log.info("========== 开始重建重试队列 ==========");
for (String channel : CHANNELS) { for (String channel : CHANNELS) {
try { try {
log.info("正在清空渠道 [{}] 的 Redis 重试队列", channel); log.info("正在清空渠道 [{}] 的 Redis 重试队列", channel);
messageRetryRedisDAO.clearRetryQueue(channel); messageRetryRedisDAO.clearRetryQueue(channel);
log.info("正在从数据库加载渠道 [{}] 的待重试消息", channel); log.info("正在从数据库加载渠道 [{}] 的待重试消息", channel);
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery() List<MessageRetryQueueDO> dbRecords = listEligibleQueueRecords(channel);
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
if (CollUtil.isEmpty(dbRecords)) { if (CollUtil.isEmpty(dbRecords)) {
log.info("渠道 [{}] 没有待重试的消息", channel); log.info("渠道 [{}] 没有待重试的消息", channel);
continue; continue;
@@ -266,24 +257,21 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds); List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) { if (CollUtil.isEmpty(messages)) {
log.warn("渠道 [{}] 的消息记录不存在", channel); log.warn("渠道 [{}] 的消息记录不存在", channel);
continue; continue;
} }
int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages); int successCount = messageRetryRedisDAO.batchAddToRetryQueue(messages);
log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", log.info("渠道 [{}] 重建完成:数据库记录数={}, 成功入队 Redis 数={}", channel, dbRecords.size(), successCount);
channel, dbRecords.size(), successCount);
} catch (Exception e) { } catch (Exception e) {
log.error("重建渠道 [{}] 的重试队列失败", channel, e); log.error("重建渠道 [{}] 的重试队列失败", channel, e);
} }
} }
log.info("========== 重试队列重建完成 =========="); log.info("========== 重试队列重建完成 ==========");
} }
// 每60分钟主动从数据库中扫描一次将消息重试数据同步到 Redis 中
@Scheduled(fixedRate = 600000) @Scheduled(fixedRate = 600000)
@Override @Override
public void syncRetryQueueConsistency() { public void syncRetryQueueConsistency() {
@@ -291,25 +279,22 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
log.debug("启动重建未完成,跳过本次同步"); log.debug("启动重建未完成,跳过本次同步");
return; return;
} }
log.info("开始同步redis与数据库重试队列一致性"); log.info("开始同步 Redis 与数据库重试队列一致性");
for (String channel : CHANNELS) { for (String channel : CHANNELS) {
List<MessageRetryQueueDO> dbRecords = this.lambdaQuery() List<MessageRetryQueueDO> dbRecords = listEligibleQueueRecords(channel);
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.list();
if (CollUtil.isEmpty(dbRecords)) { if (CollUtil.isEmpty(dbRecords)) {
continue; continue;
} }
for (MessageRetryQueueDO dbRecord : dbRecords) { for (MessageRetryQueueDO dbRecord : dbRecords) {
boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId()); boolean existsInRedis = messageRetryRedisDAO.existsInRetryQueue(channel, dbRecord.getMessageId());
if (existsInRedis) {
continue;
}
if (!existsInRedis) {
Long messageId = dbRecord.getMessageId(); Long messageId = dbRecord.getMessageId();
MessageRecordDO message = messageRecordService.getById(messageId); MessageRecordDO message = messageRecordService.getById(messageId);
if (message != null && message.getNextRetryTime() != null) { if (message != null && message.getNextRetryTime() != null) {
boolean added = messageRetryRedisDAO.addToRetryQueue(message); boolean added = messageRetryRedisDAO.addToRetryQueue(message);
if (added) { if (added) {
@@ -323,35 +308,24 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
} }
} }
} }
}
@Override @Override
public void fallbackToDatabase(String channel) { public void fallbackToDatabase(String channel) {
boolean redisAvailable = messageRetryRedisDAO.isRedisAvailable(); if (!messageRetryRedisDAO.isRedisAvailable()) {
if (!redisAvailable) {
log.warn("Redis 不可用channel={}", channel); log.warn("Redis 不可用channel={}", channel);
return; return;
} }
fallbackToDatabaseDirectly(channel); fallbackToDatabaseDirectly(channel);
} }
/**
* 从数据库补拉
*
* @param channel
*/
private void fallbackToDatabaseDirectly(String channel) { private void fallbackToDatabaseDirectly(String channel) {
List<MessageRetryQueueDO> dbRecords = baseMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE); List<MessageRetryQueueDO> dbRecords = baseMapper.selectNeedRetryMessages(channel, LocalDateTime.now(), DEFAULT_BATCH_SIZE);
if (CollUtil.isEmpty(dbRecords)) { if (CollUtil.isEmpty(dbRecords)) {
return; return;
} }
List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList()); List<Long> messageIds = dbRecords.stream().map(MessageRetryQueueDO::getMessageId).collect(Collectors.toList());
List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds); List<MessageRecordDO> messages = messageRecordService.listByIds(messageIds);
if (CollUtil.isEmpty(messages)) { if (CollUtil.isEmpty(messages)) {
return; return;
} }
@@ -360,4 +334,63 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
messageRetryRedisDAO.batchAddToRetryQueue(messages); messageRetryRedisDAO.batchAddToRetryQueue(messages);
} }
} }
/**
* 通过设置lockUntil时间、process_status=1来从数据库层抢占该条重试消息。
*/
private boolean claimRetryMessage(Long messageId, String channel, LocalDateTime currentTime, LocalDateTime lockUntil) {
return baseMapper.claimRetryMessage(messageId, channel, currentTime, lockUntil) > 0;
}
/**
* 完成重试租约处理,根据消息状态执行相应的清理或重置操作
* - 如果消息状态为可重试失败,直接返回,不进行任何处理
* - 如果消息状态为待处理重置队列记录的处理状态和锁时间并将消息重新加入Redis重试队列
* - 其他状态成功、最终失败等从Redis重试队列移除并删除数据库中的重试队列记录
*
* @param messageRecordDO 消息记录对象
*/
private void finalizeRetryLease(MessageRecordDO messageRecordDO) {
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
return;
}
// “处理过程中断,但消息仍停留在 PENDING 时,主动释放 lease 并回队列”的兜底补偿逻辑
if (MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
lambdaUpdate()
.eq(MessageRetryQueueDO::getMessageId, messageRecordDO.getId())
.set(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING)
.set(MessageRetryQueueDO::getLockUntil, null)
.update();
if (messageRecordDO.getNextRetryTime() != null) {
messageRetryRedisDAO.addToRetryQueue(messageRecordDO);
}
return;
}
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getId());
deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
}
/**
* 查询指定通道下符合重试条件的队列记录
* 筛选条件:
* - 匹配指定通道
* - 下次重试时间不为空
* - 处理状态为等待中,或处理状态为空,或锁时间为空,或锁时间已过期
*
* @param channel 消息通道
* @return 符合条件的重试队列记录列表
*/
private List<MessageRetryQueueDO> listEligibleQueueRecords(String channel) {
LocalDateTime now = LocalDateTime.now();
return lambdaQuery()
.eq(MessageRetryQueueDO::getChannel, channel)
.isNotNull(MessageRetryQueueDO::getNextRetryTime)
.and(wrapper -> wrapper.eq(MessageRetryQueueDO::getProcessStatus, PROCESS_STATUS_WAITING)
.or().isNull(MessageRetryQueueDO::getProcessStatus)
.or().isNull(MessageRetryQueueDO::getLockUntil)
.or().lt(MessageRetryQueueDO::getLockUntil, now))
.list();
}
} }

View File

@@ -34,5 +34,11 @@ public interface RetryStrategyConfigService {
*/ */
boolean updateStrategyConfig(RetryStrategyConfigReqVO strategyConfigReqVO); boolean updateStrategyConfig(RetryStrategyConfigReqVO strategyConfigReqVO);
/**
* 切换重试策略的启用/禁用状态
*
* @param id 重试策略配置ID
* @return 是否切换成功
*/
boolean toggleEnableField(String id); boolean toggleEnableField(String id);
} }

View File

@@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.sms;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory; import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactoryRegistry;
import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO; import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
@@ -13,15 +14,12 @@ import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID;
/** /**
* @author caozehui * @author caozehui
@@ -36,8 +34,7 @@ public class MsgPushClientTest {
private MessageRecordService messageRecordService; private MessageRecordService messageRecordService;
@Autowired @Autowired
@Qualifier("messageProviderFactoryMap") private MessageProviderFactoryRegistry messageProviderFactoryRegistry;
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@Autowired @Autowired
private Sender sender; private Sender sender;
@@ -57,29 +54,27 @@ public class MsgPushClientTest {
// message.setProviderType(MsgPushConstant.PROVIDER_TYPE_ALI_YUN); // message.setProviderType(MsgPushConstant.PROVIDER_TYPE_ALI_YUN);
// messageIdList.add(message); // messageIdList.add(message);
// } // }
for (int i = 0; i < 1; i++) { // for (int i = 0; i < 1; i++) {
MessageRecordReqVO message = new MessageRecordReqVO(); // MessageRecordReqVO message = new MessageRecordReqVO();
message.setId(1234567890L); // message.setId(1234567890L);
message.setAppName("NPQS-9000"); // message.setChannel(ChannelTypeEnum.SMS.getMsg());
message.setChannel(ChannelTypeEnum.SMS.getMsg()); // message.setReceiver("18839431215");
message.setReceiver("18839431215"); // message.setContent("【南京灿能电力】测试短信" + i + ",请忽略。" + LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
message.setContent("【南京灿能电力】测试短信" + i + ",请忽略。" + LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); // messageIdList.add(message);
message.setProviderType(ProviderTypeEnum.TELECOM.getCode()); // }
messageIdList.add(message); // List<MessageSendResultVO> sendResult = messageRecordService.send(messageIdList, ChannelTypeEnum.SMS);
} //
List<MessageSendResultVO> sendResult = messageRecordService.send(messageIdList); // Thread.sleep(9000);
// System.out.println(JSON.toJSONString(sendResult));
Thread.sleep(9000);
System.out.println(JSON.toJSONString(sendResult));
} }
@Test @Test
public void templateSelect() { public void templateSelect() {
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(ProviderTypeEnum.TELECOM.getCode()); MessageProviderFactory messageProviderFactory = messageProviderFactoryRegistry.getFactory(ProviderTypeEnum.TELECOM.getCode());
ChannelProviderConfigDO config = new ChannelProviderConfigDO(); ChannelProviderConfigDO config = new ChannelProviderConfigDO();
config.setApiUrl("https://sms.ymeeting.cn/smsv2"); config.setApiUrl("https://sms.ymeeting.cn/smsv2");
config.setAppKey("925631"); config.setAppKey("925631");
config.setAppSecret("AMW2pOVrdky"); config.setSecret("AMW2pOVrdky");
config.setExtno("106905631"); config.setExtno("106905631");
SmsSender smsSender = messageProviderFactory.createSmsSender(config, sender); SmsSender smsSender = messageProviderFactory.createSmsSender(config, sender);
String templateIdentifier = "SMS_481710295"; String templateIdentifier = "SMS_481710295";

View File

@@ -24,7 +24,7 @@
<!-- Web 相关 --> <!-- Web 相关 -->
<dependency> <dependency>
<groupId>io.swagger.core.v3</groupId> <groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId> <artifactId>swagger-annotations-jakarta</artifactId>
</dependency> </dependency>
<!-- 参数校验 --> <!-- 参数校验 -->

View File

@@ -27,6 +27,7 @@
<!-- maven-surefire-plugin 暂时无法通过 bom 的依赖读取(兼容老版本 IDEA 2024 及以前版本) --> <!-- maven-surefire-plugin 暂时无法通过 bom 的依赖读取(兼容老版本 IDEA 2024 及以前版本) -->
<lombok.version>1.18.42</lombok.version> <lombok.version>1.18.42</lombok.version>
<spring.boot.version>3.5.9</spring.boot.version> <spring.boot.version>3.5.9</spring.boot.version>
<swagger.jakarta.version>2.2.38</swagger.jakarta.version>
<mapstruct.version>1.6.3</mapstruct.version> <mapstruct.version>1.6.3</mapstruct.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
@@ -119,6 +120,12 @@
<artifactId>msgpush-spring-boot-starter-websocket</artifactId> <artifactId>msgpush-spring-boot-starter-websocket</artifactId>
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations-jakarta</artifactId>
<version>${swagger.jakarta.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>