重试逻辑调整
This commit is contained in:
@@ -72,7 +72,7 @@
|
|||||||
<!-- Swagger 注解,用于 API 文档生成(@Schema、@Operation 等) -->
|
<!-- Swagger 注解,用于 API 文档生成(@Schema、@Operation 等) -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.swagger.core.v3</groupId>
|
<groupId>io.swagger.core.v3</groupId>
|
||||||
<artifactId>swagger-annotations</artifactId>
|
<artifactId>swagger-annotations-jakarta</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- RPC 远程调用相关 -->
|
<!-- RPC 远程调用相关 -->
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
package com.njcn.msgpush.framework.common.exception.enums;
|
package com.njcn.msgpush.framework.common.exception.enums;
|
||||||
|
|
||||||
|
import com.njcn.msgpush.framework.common.exception.ErrorCode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 业务异常的错误码区间,解决:解决各模块错误码定义,避免重复,在此只声明不做实际使用
|
* 业务异常的错误码区间,解决:解决各模块错误码定义,避免重复,在此只声明不做实际使用
|
||||||
*
|
*
|
||||||
@@ -27,7 +29,7 @@ package com.njcn.msgpush.framework.common.exception.enums;
|
|||||||
*
|
*
|
||||||
* @author hongawen
|
* @author hongawen
|
||||||
*/
|
*/
|
||||||
public class ServiceErrorCodeRange {
|
public interface ServiceErrorCodeRange {
|
||||||
|
|
||||||
// 模块 infra 错误码区间 [1-001-000-000 ~ 1-002-000-000)
|
// 模块 infra 错误码区间 [1-001-000-000 ~ 1-002-000-000)
|
||||||
// 模块 system 错误码区间 [1-002-000-000 ~ 1-003-000-000)
|
// 模块 system 错误码区间 [1-002-000-000 ~ 1-003-000-000)
|
||||||
@@ -43,6 +45,7 @@ public class ServiceErrorCodeRange {
|
|||||||
|
|
||||||
// 模块 crm 错误码区间 [1-020-000-000 ~ 1-021-000-000)
|
// 模块 crm 错误码区间 [1-020-000-000 ~ 1-021-000-000)
|
||||||
|
|
||||||
// 模块 ai 错误码区间 [1-022-000-000 ~ 1-023-000-000)
|
// 模块 push 错误码区间 [1-022-000-000 ~ 1-023-000-000)
|
||||||
|
ErrorCode VALIDATE_SYSTEM_SECRET_FAIL = new ErrorCode(1-022-000-000, "校验系统密钥失败");
|
||||||
|
ErrorCode GENERATE_CREDENTIAL_FAIL = new ErrorCode(1-022-000-001, "生成凭证失败");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,9 +93,10 @@ public class MsgpushSwaggerAutoConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static GroupedOpenApi buildGroupedOpenApi(String group, String path) {
|
public static GroupedOpenApi buildGroupedOpenApi(String group, String path) {
|
||||||
|
String pathSuffix = path == null || path.isBlank() ? "" : "/" + path;
|
||||||
return GroupedOpenApi.builder()
|
return GroupedOpenApi.builder()
|
||||||
.group(group)
|
.group(group)
|
||||||
.pathsToMatch("/admin-api/" + path + "/**", "/app-api/" + path + "/**")
|
.pathsToMatch("/admin-api" + pathSuffix + "/**", "/app-api" + pathSuffix + "/**")
|
||||||
.addOperationCustomizer((operation, handlerMethod) -> operation
|
.addOperationCustomizer((operation, handlerMethod) -> operation
|
||||||
.addParametersItem(buildSecurityHeaderParameter()))
|
.addParametersItem(buildSecurityHeaderParameter()))
|
||||||
.addOperationCustomizer(buildOperationIdCustomizer())
|
.addOperationCustomizer(buildOperationIdCustomizer())
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class BlacklistReqVO extends PageParam {
|
|||||||
@Schema(description = "主键 ID", example = "123444")
|
@Schema(description = "主键 ID", example = "123444")
|
||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
@Schema(description = "渠道类型:sms/email/app_push", example = "sms")
|
@Schema(description = "渠道类型:sms/email/app", example = "sms")
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ public class ChannelProviderConfigReqVO extends PageParam {
|
|||||||
@Schema(description = "渠道ID")
|
@Schema(description = "渠道ID")
|
||||||
private String id;
|
private String id;
|
||||||
|
|
||||||
@Schema(description = "渠道类型:sms/email/app_push", example = "sms")
|
@Schema(description = "渠道类型:sms/email/app", example = "sms")
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package com.njcn.msgpush.module.push.controller.admin.credential;
|
||||||
|
|
||||||
|
import com.njcn.msgpush.framework.common.pojo.CommonResult;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
|
||||||
|
import com.njcn.msgpush.module.push.service.credential.ICredentialService;
|
||||||
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
|
import jakarta.validation.Valid;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 系统凭证 API 接口
|
||||||
|
*
|
||||||
|
* @author msgpush
|
||||||
|
*/
|
||||||
|
@Tag(name = "系统凭证管理")
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/credential")
|
||||||
|
public class CredentialController {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ICredentialService credentialService;
|
||||||
|
|
||||||
|
@PostMapping("/generate")
|
||||||
|
@Operation(summary = "生成系统凭证")
|
||||||
|
public CommonResult<CredentialRespDTO> generateCredential(@Valid @RequestBody CredentialReqDTO reqDTO) {
|
||||||
|
CredentialRespDTO respDTO = credentialService.generateCredential(reqDTO);
|
||||||
|
return CommonResult.success(respDTO);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent;
|
|||||||
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
|
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
|
||||||
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
|
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
||||||
|
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
||||||
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
|
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
@@ -32,11 +33,27 @@ public class MessageRecordController {
|
|||||||
private MessageRecordService messageRecordService;
|
private MessageRecordService messageRecordService;
|
||||||
|
|
||||||
@PermitAll
|
@PermitAll
|
||||||
@PostMapping("/send")
|
@PostMapping("/send/sms")
|
||||||
@Operation(summary = "消息推送")
|
@Operation(summary = "短信推送")
|
||||||
@Idempotent(timeout = 2)
|
@Idempotent(timeout = 2)
|
||||||
public CommonResult<List<MessageSendResultVO>> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
|
public CommonResult<List<MessageSendResultVO>> sendSms(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
|
||||||
return success(messageRecordService.send(reqVOList));
|
return success(messageRecordService.send(reqVOList, ChannelTypeEnum.SMS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@PermitAll
|
||||||
|
@PostMapping("/send/email")
|
||||||
|
@Operation(summary = "邮箱推送")
|
||||||
|
@Idempotent(timeout = 2)
|
||||||
|
public CommonResult<List<MessageSendResultVO>> sendEmail(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
|
||||||
|
return success(messageRecordService.send(reqVOList, ChannelTypeEnum.EMAIL));
|
||||||
|
}
|
||||||
|
|
||||||
|
@PermitAll
|
||||||
|
@PostMapping("/send/app")
|
||||||
|
@Operation(summary = "app推送")
|
||||||
|
@Idempotent(timeout = 2)
|
||||||
|
public CommonResult<List<MessageSendResultVO>> sendApp(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
|
||||||
|
return success(messageRecordService.send(reqVOList, ChannelTypeEnum.APP));
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/page")
|
@PostMapping("/page")
|
||||||
|
|||||||
@@ -19,11 +19,7 @@ public class MessageRecordReqVO extends PageParam {
|
|||||||
@Schema(description = "主键ID")
|
@Schema(description = "主键ID")
|
||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
@Schema(description = "应用名称/来源系统标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "NPQS-9500")
|
@Schema(description = "渠道类型:sms/email/app", example = "sms")
|
||||||
@NotBlank(message = "应用名称/来源系统标识不能为空")
|
|
||||||
private String appName;
|
|
||||||
|
|
||||||
@Schema(description = "渠道类型:sms/email/app_push", example = "sms")
|
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
@@ -48,13 +44,6 @@ public class MessageRecordReqVO extends PageParam {
|
|||||||
@Schema(description = "模板参数")
|
@Schema(description = "模板参数")
|
||||||
private String templateParams;
|
private String templateParams;
|
||||||
|
|
||||||
@Schema(description = "服务商类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "telecom/cmcc/aliyun/twilio")
|
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ProviderTypeEnum.class, message = "服务商类型必须是 {value}")
|
|
||||||
private String providerType;
|
|
||||||
|
|
||||||
@Schema(description = "第三方消息ID")
|
@Schema(description = "第三方消息ID")
|
||||||
private String thirdPartyId;
|
private String thirdPartyId;
|
||||||
|
|
||||||
@Schema(description = "额外信息")
|
|
||||||
private String extraInfo;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ public class RateLimitConfigReqVO extends PageParam {
|
|||||||
@Schema(description = "主键 ID")
|
@Schema(description = "主键 ID")
|
||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
@Schema(description = "渠道类型:sms/email/app_push", example = "sms")
|
@Schema(description = "渠道类型:sms/email/app", example = "sms")
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ public class MessageRetryQueueReqVO extends PageParam {
|
|||||||
@Schema(description = "消息ID")
|
@Schema(description = "消息ID")
|
||||||
private Long messageId;
|
private Long messageId;
|
||||||
|
|
||||||
@Schema(description = "渠道类型", example = "sms/email/app_push")
|
@Schema(description = "渠道类型", example = "sms/email/app")
|
||||||
@NotBlank(message = "渠道类型不能为空")
|
@NotBlank(message = "渠道类型不能为空")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ public class RetryStrategyConfigReqVO {
|
|||||||
@Schema(description = "主键 ID")
|
@Schema(description = "主键 ID")
|
||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
@Schema(description = "渠道类型:sms/email/app_push", example = "sms")
|
@Schema(description = "渠道类型:sms/email/app", example = "sms")
|
||||||
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
@InEnum(value = com.njcn.msgpush.module.push.enums.ChannelTypeEnum.class, message = "渠道类型必须是 {value}")
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class BlacklistDO extends BaseDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ public class ChannelProviderConfigDO extends BaseDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ public class ProviderErrorCodeMappingDO extends BaseDO {
|
|||||||
private String providerType;
|
private String providerType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package com.njcn.msgpush.module.push.dal.dataobject.credential;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-04-09
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class SystemSecretDO {
|
||||||
|
private String id;
|
||||||
|
private String systemName;
|
||||||
|
private String secret;
|
||||||
|
}
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package com.njcn.msgpush.module.push.dal.dataobject.credential.dto;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-03-31
|
||||||
|
*/
|
||||||
|
|
||||||
|
import jakarta.validation.constraints.NotEmpty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 系统凭证请求 DTO
|
||||||
|
*
|
||||||
|
* @author msgpush
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class CredentialReqDTO implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上游系统名称
|
||||||
|
*/
|
||||||
|
@NotEmpty(message = "上游系统名称不能为空")
|
||||||
|
private String systemName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 密钥(用于生成凭证)
|
||||||
|
*/
|
||||||
|
@NotEmpty(message = "密钥不能为空")
|
||||||
|
private String secretKey;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package com.njcn.msgpush.module.push.dal.dataobject.credential.dto;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-03-31
|
||||||
|
*/
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 系统凭证响应 DTO
|
||||||
|
*
|
||||||
|
* @author msgpush
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class CredentialRespDTO implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 凭证令牌
|
||||||
|
*/
|
||||||
|
private String credentialToken;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上游系统名称
|
||||||
|
*/
|
||||||
|
private String systemName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 过期时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime expiresTime;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.njcn.msgpush.module.push.dal.dataobject.message;
|
package com.njcn.msgpush.module.push.dal.dataobject.message;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
|
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@@ -27,7 +28,7 @@ public class MessageRecordDO extends BaseDO {
|
|||||||
private String appName;
|
private String appName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class SystemQuotaConfigDO extends BaseDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ public class RateLimitConfigDO extends BaseDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -61,4 +61,9 @@ public class MessageRetryHistoryDO extends BaseDO {
|
|||||||
* 第三方消息ID
|
* 第三方消息ID
|
||||||
*/
|
*/
|
||||||
private String thirdPartyId;
|
private String thirdPartyId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重试来源:AUTO_RETRY-系统自动重试,MANUAL_RETRY-人工手动重试
|
||||||
|
*/
|
||||||
|
private String retrySource;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class RetryStrategyConfigDO extends BaseDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 渠道类型:sms/email/app_push
|
* 渠道类型:sms/email/app
|
||||||
*/
|
*/
|
||||||
private String channel;
|
private String channel;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,11 @@
|
|||||||
|
package com.njcn.msgpush.module.push.dal.mysql.credential;
|
||||||
|
|
||||||
|
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-04-09
|
||||||
|
*/
|
||||||
|
public interface SystemSecretMappper extends BaseMapperX<SystemSecretDO> {
|
||||||
|
}
|
||||||
@@ -18,7 +18,7 @@ import java.util.Arrays;
|
|||||||
public enum ChannelTypeEnum implements ArrayValuable<String> {
|
public enum ChannelTypeEnum implements ArrayValuable<String> {
|
||||||
SMS("sms", "短信"),
|
SMS("sms", "短信"),
|
||||||
EMAIL("email", "邮箱"),
|
EMAIL("email", "邮箱"),
|
||||||
APP_PUSH("app_push", "APP 推送"),;
|
APP("app", "APP 推送"),;
|
||||||
|
|
||||||
public static final String[] ARRAYS = Arrays.stream(values()).map(ChannelTypeEnum::getCode).toArray(String[]::new);
|
public static final String[] ARRAYS = Arrays.stream(values()).map(ChannelTypeEnum::getCode).toArray(String[]::new);
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package com.njcn.msgpush.module.push.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-04-07
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum RetrySourceEnum {
|
||||||
|
AUTO_RETRY("AUTO_RETRY", "系统自动重试"),
|
||||||
|
MANUAL_RETRY("MANUAL_RETRY", "人工手动重试");
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String desc;
|
||||||
|
|
||||||
|
public static RetrySourceEnum getByCode(String code) {
|
||||||
|
if (code == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
for (RetrySourceEnum value : values()) {
|
||||||
|
if (value.getCode().equals(code)) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,115 @@
|
|||||||
|
package com.njcn.msgpush.module.push.filter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-03-31
|
||||||
|
*/
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.njcn.msgpush.framework.common.exception.enums.GlobalErrorCodeConstants;
|
||||||
|
import com.njcn.msgpush.framework.common.pojo.CommonResult;
|
||||||
|
import com.njcn.msgpush.framework.common.util.servlet.ServletUtils;
|
||||||
|
import com.njcn.msgpush.framework.web.config.WebProperties;
|
||||||
|
import com.njcn.msgpush.framework.web.core.filter.ApiRequestFilter;
|
||||||
|
import com.njcn.msgpush.module.push.service.credential.CredentialServiceImpl;
|
||||||
|
import com.njcn.msgpush.module.push.service.credential.ICredentialService;
|
||||||
|
import jakarta.servlet.FilterChain;
|
||||||
|
import jakarta.servlet.ServletException;
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.core.Ordered;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 凭证认证过滤器
|
||||||
|
*
|
||||||
|
* @author msgpush
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class CredentialAuthenticationFilter extends ApiRequestFilter implements Ordered {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ICredentialService credentialService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 凭证 Header 名称
|
||||||
|
*/
|
||||||
|
private static final String CREDENTIAL_HEADER = "X-Credential-Token";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 需要凭证认证的接口路径
|
||||||
|
*/
|
||||||
|
private static final String[] NOT_IGNORE_PATHS = {
|
||||||
|
"/admin-api/push/message/send/**",
|
||||||
|
};
|
||||||
|
|
||||||
|
private static final String[] IGNORE_PATHS = {
|
||||||
|
"/admin-api/push/credential/generate",
|
||||||
|
};
|
||||||
|
|
||||||
|
public CredentialAuthenticationFilter(WebProperties webProperties) {
|
||||||
|
super(webProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean shouldNotFilter(HttpServletRequest request) {
|
||||||
|
// 检查是否在排除列表中
|
||||||
|
String path = request.getRequestURI().substring(request.getContextPath().length());
|
||||||
|
for (String notExcludePath : NOT_IGNORE_PATHS) {
|
||||||
|
if (path.equals(notExcludePath)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (notExcludePath.endsWith("/**") && path.startsWith(notExcludePath.substring(0, notExcludePath.length() - 3))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (String excludePath : IGNORE_PATHS){
|
||||||
|
if (path.equals(excludePath)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (excludePath.endsWith("/**") && path.startsWith(excludePath.substring(0, excludePath.length() - 3))) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return super.shouldNotFilter(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
|
||||||
|
// 1. 获取凭证 token
|
||||||
|
String credentialToken = request.getHeader(CREDENTIAL_HEADER);
|
||||||
|
if (StrUtil.isEmpty(credentialToken)) {
|
||||||
|
credentialToken = request.getParameter("credential");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 如果没有凭证,继续过滤链
|
||||||
|
if (StrUtil.isEmpty(credentialToken)) {
|
||||||
|
filterChain.doFilter(request, response);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 验证凭证
|
||||||
|
CredentialServiceImpl.CredentialInfo credentialInfo = credentialService.verifyCredential(credentialToken);
|
||||||
|
|
||||||
|
if (credentialInfo == null) {
|
||||||
|
// 校验失败
|
||||||
|
ServletUtils.writeJSON(response, CommonResult.error(GlobalErrorCodeConstants.UNAUTHORIZED.getCode(), "凭证无效或已过期"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
filterChain.doFilter(request, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getOrder() {
|
||||||
|
return 1000;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.njcn.msgpush.module.push.job;
|
package com.njcn.msgpush.module.push.job;
|
||||||
|
|
||||||
|
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
||||||
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -25,8 +26,7 @@ public class MessageRetryJob {
|
|||||||
*/
|
*/
|
||||||
@Scheduled(fixedRate = 10000)
|
@Scheduled(fixedRate = 10000)
|
||||||
public void processSmsRetryQueue() {
|
public void processSmsRetryQueue() {
|
||||||
log.info("开始处理短信重试队列:{}", LocalDateTime.now());
|
messageRetryQueueService.processRetryBatch(ChannelTypeEnum.SMS.getCode());
|
||||||
messageRetryQueueService.processRetryBatch("sms");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -34,7 +34,7 @@ public class MessageRetryJob {
|
|||||||
*/
|
*/
|
||||||
//@Scheduled(fixedRate = 10000)
|
//@Scheduled(fixedRate = 10000)
|
||||||
public void processEmailRetryQueue() {
|
public void processEmailRetryQueue() {
|
||||||
messageRetryQueueService.processRetryBatch("email");
|
messageRetryQueueService.processRetryBatch(ChannelTypeEnum.EMAIL.getCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -42,7 +42,7 @@ public class MessageRetryJob {
|
|||||||
*/
|
*/
|
||||||
//@Scheduled(fixedRate = 10000)
|
//@Scheduled(fixedRate = 10000)
|
||||||
public void processAppPushRetryQueue() {
|
public void processAppPushRetryQueue() {
|
||||||
messageRetryQueueService.processRetryBatch("app_push");
|
messageRetryQueueService.processRetryBatch(ChannelTypeEnum.APP.getCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,4 +32,6 @@ public interface ChannelProviderConfigService extends IService<ChannelProviderCo
|
|||||||
void failureUpdate(String providerType, String channel);
|
void failureUpdate(String providerType, String channel);
|
||||||
|
|
||||||
void successUpdate(String providerType, String channel);
|
void successUpdate(String providerType, String channel);
|
||||||
|
|
||||||
|
List<ChannelProviderConfigDO> getEnabledProviders(String channel);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,4 +61,9 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
|
|||||||
byTypeAndChannel.setHealthStatus(1);
|
byTypeAndChannel.setHealthStatus(1);
|
||||||
this.updateById(byTypeAndChannel);
|
this.updateById(byTypeAndChannel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ChannelProviderConfigDO> getEnabledProviders(String channel) {
|
||||||
|
return this.lambdaQuery().eq(ChannelProviderConfigDO::getChannel, channel).list();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,212 @@
|
|||||||
|
package com.njcn.msgpush.module.push.service.credential;
|
||||||
|
|
||||||
|
import cn.hutool.core.codec.Base64;
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import cn.hutool.crypto.SecureUtil;
|
||||||
|
import cn.hutool.crypto.symmetric.AES;
|
||||||
|
import com.njcn.msgpush.framework.common.exception.ServiceException;
|
||||||
|
import com.njcn.msgpush.framework.common.exception.enums.ServiceErrorCodeRange;
|
||||||
|
import com.njcn.msgpush.framework.common.util.json.JsonUtils;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-03-31
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class CredentialServiceImpl implements ICredentialService {
|
||||||
|
/**
|
||||||
|
* 凭证加密密钥(生产环境应通过配置中心或环境变量注入)
|
||||||
|
*/
|
||||||
|
private String credentialSecretKey = "88888888";
|
||||||
|
@Autowired
|
||||||
|
private ISystemSecretService systemSecretService;
|
||||||
|
|
||||||
|
private final StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
|
public CredentialServiceImpl(StringRedisTemplate stringRedisTemplate) {
|
||||||
|
this.stringRedisTemplate = stringRedisTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CredentialRespDTO generateCredential(CredentialReqDTO reqDTO) {
|
||||||
|
try {
|
||||||
|
// 1. 验证系统密钥
|
||||||
|
boolean validatedSecretRes = validateSecret(reqDTO.getSystemName(), reqDTO.getSecretKey());
|
||||||
|
if (!validatedSecretRes) {
|
||||||
|
throw new ServiceException(ServiceErrorCodeRange.VALIDATE_SYSTEM_SECRET_FAIL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 创建凭证信息
|
||||||
|
CredentialInfo credentialInfo = new CredentialInfo(
|
||||||
|
reqDTO.getSystemName(),
|
||||||
|
LocalDateTime.now().plusHours(24)
|
||||||
|
);
|
||||||
|
|
||||||
|
// 3. 生成凭证 token(加密后的 JSON)
|
||||||
|
String token = encryptCredential(credentialInfo);
|
||||||
|
|
||||||
|
// 4. 缓存凭证信息到 Redis
|
||||||
|
cacheCredential(token, credentialInfo);
|
||||||
|
|
||||||
|
// 5. 构建响应
|
||||||
|
return new CredentialRespDTO()
|
||||||
|
.setCredentialToken(token)
|
||||||
|
.setSystemName(reqDTO.getSystemName())
|
||||||
|
.setExpiresTime(credentialInfo.getExpiresTime());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ServiceException(ServiceErrorCodeRange.GENERATE_CREDENTIAL_FAIL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CredentialInfo verifyCredential(String token) {
|
||||||
|
if (StrUtil.isEmpty(token)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. 从 Redis 获取凭证信息
|
||||||
|
CredentialInfo credentialInfo = getCredentialFromRedis(token);
|
||||||
|
if (credentialInfo == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 检查是否过期
|
||||||
|
if (credentialInfo.getExpiresTime().isBefore(LocalDateTime.now())) {
|
||||||
|
// 删除过期的凭证
|
||||||
|
deleteCredential(token);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return credentialInfo;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[verifyCredential] 验证凭证失败,token={}", token, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证系统密钥
|
||||||
|
*
|
||||||
|
* @param systemName 系统名称
|
||||||
|
* @param secretKey 密钥
|
||||||
|
*/
|
||||||
|
private boolean validateSecret(String systemName, String secretKey) {
|
||||||
|
SystemSecretDO systemSecretDO = systemSecretService.getBySystemName(systemName);
|
||||||
|
|
||||||
|
String storedSecret = systemSecretDO.getSecret();
|
||||||
|
if (!storedSecret.equals(secretKey)) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 加密凭证信息
|
||||||
|
*
|
||||||
|
* @param info 凭证信息
|
||||||
|
* @return 加密后的 token
|
||||||
|
*/
|
||||||
|
private String encryptCredential(CredentialInfo info) {
|
||||||
|
AES aes = SecureUtil.aes(credentialSecretKey.getBytes(StandardCharsets.UTF_8));
|
||||||
|
String json = JsonUtils.toJsonString(info);
|
||||||
|
byte[] encrypted = aes.encrypt(json.getBytes(StandardCharsets.UTF_8));
|
||||||
|
return Base64.encode(encrypted);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解密凭证信息
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
* @return 凭证信息
|
||||||
|
*/
|
||||||
|
private CredentialInfo decryptCredential(String token) {
|
||||||
|
try {
|
||||||
|
AES aes = SecureUtil.aes(credentialSecretKey.getBytes(StandardCharsets.UTF_8));
|
||||||
|
byte[] decrypted = aes.decrypt(Base64.decode(token));
|
||||||
|
String json = new String(decrypted, StandardCharsets.UTF_8);
|
||||||
|
return JsonUtils.parseObject(json, CredentialInfo.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[decryptCredential] 解密凭证失败", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存凭证信息到 Redis
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
* @param info 凭证信息
|
||||||
|
*/
|
||||||
|
private void cacheCredential(String token, CredentialInfo info) {
|
||||||
|
String redisKey = formatRedisKey(token);
|
||||||
|
String jsonValue = JsonUtils.toJsonString(info);
|
||||||
|
|
||||||
|
// 计算剩余过期时间(秒)
|
||||||
|
long remainingSeconds = Duration.between(LocalDateTime.now(), info.getExpiresTime()).getSeconds();
|
||||||
|
if (remainingSeconds > 0) {
|
||||||
|
stringRedisTemplate.opsForValue().set(redisKey, jsonValue, remainingSeconds, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从 Redis 获取凭证信息
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
* @return 凭证信息
|
||||||
|
*/
|
||||||
|
private CredentialInfo getCredentialFromRedis(String token) {
|
||||||
|
String redisKey = formatRedisKey(token);
|
||||||
|
String jsonValue = stringRedisTemplate.opsForValue().get(redisKey);
|
||||||
|
if (StrUtil.isEmpty(jsonValue)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return JsonUtils.parseObject(jsonValue, CredentialInfo.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除凭证
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
*/
|
||||||
|
private void deleteCredential(String token) {
|
||||||
|
String redisKey = formatRedisKey(token);
|
||||||
|
stringRedisTemplate.delete(redisKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 格式化 Redis Key
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
* @return Redis Key
|
||||||
|
*/
|
||||||
|
private String formatRedisKey(String token) {
|
||||||
|
return "credential:token:" + token;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 凭证信息内部类
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
public static class CredentialInfo {
|
||||||
|
private String systemName;
|
||||||
|
private LocalDateTime expiresTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.njcn.msgpush.module.push.service.credential;
|
||||||
|
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialReqDTO;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.dto.CredentialRespDTO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-03-31
|
||||||
|
*/
|
||||||
|
public interface ICredentialService {
|
||||||
|
/**
|
||||||
|
* 生成系统凭证
|
||||||
|
*
|
||||||
|
* @param reqDTO 请求参数
|
||||||
|
* @return 凭证响应
|
||||||
|
*/
|
||||||
|
CredentialRespDTO generateCredential(CredentialReqDTO reqDTO);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证凭证是否有效
|
||||||
|
*
|
||||||
|
* @param token 凭证 token
|
||||||
|
* @return 凭证信息
|
||||||
|
*/
|
||||||
|
CredentialServiceImpl.CredentialInfo verifyCredential(String token);
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
package com.njcn.msgpush.module.push.service.credential;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-04-09
|
||||||
|
*/
|
||||||
|
public interface ISystemSecretService extends IService<SystemSecretDO> {
|
||||||
|
SystemSecretDO getBySystemName(String systemName);
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package com.njcn.msgpush.module.push.service.credential;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.njcn.msgpush.module.push.dal.dataobject.credential.SystemSecretDO;
|
||||||
|
import com.njcn.msgpush.module.push.dal.mysql.credential.SystemSecretMappper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author caozehui
|
||||||
|
* @data 2026-04-09
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class SystemSecretServiceImpl extends ServiceImpl<SystemSecretMappper, SystemSecretDO> implements ISystemSecretService {
|
||||||
|
@Override
|
||||||
|
public SystemSecretDO getBySystemName(String systemName) {
|
||||||
|
return this.lambdaQuery().eq(SystemSecretDO::getSystemName, systemName).one();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,29 +5,30 @@ import com.baomidou.mybatisplus.extension.service.IService;
|
|||||||
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
|
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
|
||||||
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
|
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
||||||
|
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
||||||
|
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public interface MessageRecordService extends IService<MessageRecordDO> {
|
public interface MessageRecordService extends IService<MessageRecordDO> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送消息(包括email、sms、app_push)
|
* 发送消息(包括email、sms、app)
|
||||||
*
|
*
|
||||||
* @param reqVOList
|
* @param reqVOList
|
||||||
|
* @return channelTypeEnum
|
||||||
* @return 发送的结果
|
* @return 发送的结果
|
||||||
*/
|
*/
|
||||||
List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList);
|
List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理发送消息
|
* 处理发送消息
|
||||||
*
|
*
|
||||||
* @param messageRecordDOList
|
* @param messageRecordDOList
|
||||||
|
* @param retrySource
|
||||||
* @return 发送的结果
|
* @return 发送的结果
|
||||||
*/
|
*/
|
||||||
List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList);
|
List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList, RetrySourceEnum retrySource);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加消息记录
|
* 添加消息记录
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.njcn.msgpush.module.push.service.message;
|
package com.njcn.msgpush.module.push.service.message;
|
||||||
|
|
||||||
import cn.hutool.core.bean.BeanUtil;
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
@@ -23,6 +24,7 @@ import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
|
|||||||
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
|
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
|
||||||
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
|
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
|
||||||
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
||||||
|
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
|
||||||
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
|
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
|
||||||
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
|
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
|
||||||
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
||||||
@@ -69,15 +71,16 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
private MessageConfirmRedisDAO messageConfirmRedisDAO;
|
private MessageConfirmRedisDAO messageConfirmRedisDAO;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) {
|
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum) {
|
||||||
List<MessageRecordDO> messageRecordDOList = this.createMessageRecords(reqVOList);
|
List<MessageRecordDO> messageRecordDOList = this.createMessageRecords(reqVOList, channelTypeEnum);
|
||||||
return this.processSendMsg(messageRecordDOList);
|
return this.processSendMsg(messageRecordDOList, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public List<MessageRecordDO> createMessageRecords(List<MessageRecordReqVO> reqVOList) {
|
public List<MessageRecordDO> createMessageRecords(List<MessageRecordReqVO> reqVOList, ChannelTypeEnum channelTypeEnum) {
|
||||||
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
|
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
|
||||||
messageRecordDOList.forEach(messageRecordDO -> {
|
messageRecordDOList.forEach(messageRecordDO -> {
|
||||||
|
messageRecordDO.setChannel(channelTypeEnum.getCode());
|
||||||
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
|
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
|
||||||
// messageRecordDO.setRetryCount(0);
|
// messageRecordDO.setRetryCount(0);
|
||||||
});
|
});
|
||||||
@@ -90,12 +93,12 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
* 这里统一接收 sender 返回的 SendResult,再决定消息状态、重试队列和服务商健康度如何变化。
|
* 这里统一接收 sender 返回的 SendResult,再决定消息状态、重试队列和服务商健康度如何变化。
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList) {
|
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList, RetrySourceEnum retrySource) {
|
||||||
msgPushGuardChain.checkAll(messageRecordDOList);
|
msgPushGuardChain.checkAll(messageRecordDOList);
|
||||||
List<MessageSendResultVO> resultList = new ArrayList<>();
|
List<MessageSendResultVO> resultList = new ArrayList<>();
|
||||||
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
||||||
try {
|
try {
|
||||||
MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO);
|
MessageSendResultVO resultVO = this.processSingleMessage(messageRecordDO, retrySource);
|
||||||
resultList.add(resultVO);
|
resultList.add(resultVO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
MessageSendResultVO failedVO = new MessageSendResultVO();
|
MessageSendResultVO failedVO = new MessageSendResultVO();
|
||||||
@@ -108,7 +111,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
return resultList;
|
return resultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO) {
|
private MessageSendResultVO processSingleMessage(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
|
||||||
MessageSendResultVO resultVO = new MessageSendResultVO();
|
MessageSendResultVO resultVO = new MessageSendResultVO();
|
||||||
resultVO.setMessageId(messageRecordDO.getId());
|
resultVO.setMessageId(messageRecordDO.getId());
|
||||||
|
|
||||||
@@ -118,13 +121,34 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
this.updateById(messageRecordDO);
|
this.updateById(messageRecordDO);
|
||||||
return resultVO;
|
return resultVO;
|
||||||
}
|
}
|
||||||
|
List<ChannelProviderConfigDO> enabledProviders = channelProviderConfigService.getEnabledProviders(messageRecordDO.getChannel());
|
||||||
|
SendResult sendResult = null;
|
||||||
|
if (CollectionUtil.isNotEmpty(enabledProviders)) {
|
||||||
|
ChannelProviderConfigDO channelProviderConfigDO = enabledProviders.get(0);
|
||||||
|
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(channelProviderConfigDO.getProviderType());
|
||||||
|
sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
|
||||||
|
this.applySendResult(messageRecordDO, sendResult);
|
||||||
|
this.recordRetryHistory(messageRecordDO, retrySource);
|
||||||
|
} else {
|
||||||
|
sendResult = this.sendMessage(messageRecordDO, null, null);
|
||||||
|
this.applySendResult(messageRecordDO, sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CollectionUtil.isNotEmpty(enabledProviders)) {
|
||||||
|
ChannelProviderConfigDO channelProviderConfigDO = enabledProviders.get(0);
|
||||||
|
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(channelProviderConfigDO.getProviderType());
|
||||||
|
sendResult = this.validateProviderAndChannel(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
|
||||||
|
|
||||||
|
if (sendResult == null) {
|
||||||
|
sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
|
||||||
|
}
|
||||||
|
this.applySendResult(messageRecordDO, sendResult);
|
||||||
|
this.recordRetryHistory(messageRecordDO, retrySource);
|
||||||
|
} else {
|
||||||
|
sendResult = this.validateProviderAndChannel(messageRecordDO, null, null);
|
||||||
|
this.applySendResult(messageRecordDO, sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService
|
|
||||||
.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
|
|
||||||
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
|
|
||||||
SendResult sendResult = this.sendMessage(messageRecordDO, channelProviderConfigDO, messageProviderFactory);
|
|
||||||
this.applySendResult(messageRecordDO, sendResult);
|
|
||||||
this.recordRetryHistory(messageRecordDO);
|
|
||||||
|
|
||||||
// 在这里修改接口返回的是投递结果,还是调用接口结果
|
// 在这里修改接口返回的是投递结果,还是调用接口结果
|
||||||
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
|
resultVO.setResult(this.isSuccessOutcome(sendResult.getOutcome()));
|
||||||
@@ -132,26 +156,29 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
return resultVO;
|
return resultVO;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SendResult sendMessage(MessageRecordDO messageRecordDO,
|
private SendResult validateProviderAndChannel(MessageRecordDO messageRecordDO, ChannelProviderConfigDO channelProviderConfigDO, MessageProviderFactory messageProviderFactory) {
|
||||||
ChannelProviderConfigDO channelProviderConfigDO,
|
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
|
||||||
MessageProviderFactory messageProviderFactory) {
|
if (channelType == null) {
|
||||||
|
return SendResult.unsupported("当前发送渠道类型暂不支持");
|
||||||
|
}
|
||||||
if (ObjectUtil.isNull(messageProviderFactory)) {
|
if (ObjectUtil.isNull(messageProviderFactory)) {
|
||||||
return SendResult.configInvalid("当前服务商未启用或未注册");
|
return SendResult.configInvalid("当前服务商未启用或未注册");
|
||||||
}
|
}
|
||||||
if (ObjectUtil.isNull(channelProviderConfigDO)) {
|
if (ObjectUtil.isNull(channelProviderConfigDO)) {
|
||||||
return SendResult.configInvalid("当前服务商渠道配置不存在");
|
return SendResult.configInvalid("当前服务商渠道配置不存在");
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private SendResult sendMessage(MessageRecordDO messageRecordDO,
|
||||||
|
ChannelProviderConfigDO channelProviderConfigDO,
|
||||||
|
MessageProviderFactory messageProviderFactory) {
|
||||||
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
|
ChannelTypeEnum channelType = ChannelTypeEnum.getByCode(messageRecordDO.getChannel());
|
||||||
if (channelType == null) {
|
|
||||||
return SendResult.unsupported("当前发送渠道类型暂不支持");
|
|
||||||
}
|
|
||||||
|
|
||||||
return switch (channelType) {
|
return switch (channelType) {
|
||||||
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
|
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
|
||||||
case EMAIL -> messageProviderFactory.createEmailSender(channelProviderConfigDO, sender)
|
case EMAIL -> messageProviderFactory.createEmailSender(channelProviderConfigDO, sender)
|
||||||
.sendEmail(messageRecordDO, new HashMap<>());
|
.sendEmail(messageRecordDO, new HashMap<>());
|
||||||
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
|
case APP -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,11 +219,12 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
|
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
|
||||||
public void recordRetryHistory(MessageRecordDO messageRecordDO) {
|
public void recordRetryHistory(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
|
||||||
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
|
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
|
||||||
messageRetryHistoryDO.setMessageId(messageRecordDO.getId());
|
messageRetryHistoryDO.setMessageId(messageRecordDO.getId());
|
||||||
messageRetryHistoryDO.setStatus(messageRecordDO.getStatus());
|
messageRetryHistoryDO.setStatus(messageRecordDO.getStatus());
|
||||||
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getId()));
|
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getId()));
|
||||||
|
messageRetryHistoryDO.setRetrySource(retrySource != null ? retrySource.getCode() : null);
|
||||||
messageRetryHistoryService.add(messageRetryHistoryDO);
|
messageRetryHistoryService.add(messageRetryHistoryDO);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,9 +284,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
|
public Page<MessageRecordDO> getPage(MessageRecordReqVO reqVO) {
|
||||||
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
|
QueryWrapper<MessageRecordDO> wrapper = new QueryWrapper<>();
|
||||||
wrapper.lambda()
|
wrapper.lambda()
|
||||||
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRecordDO::getChannel, reqVO.getChannel())
|
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRecordDO::getChannel, reqVO.getChannel());
|
||||||
.eq(StrUtil.isNotBlank(reqVO.getProviderType()), MessageRecordDO::getProviderType, reqVO.getProviderType())
|
|
||||||
.eq(StrUtil.isNotBlank(reqVO.getAppName()), MessageRecordDO::getAppName, reqVO.getAppName());
|
|
||||||
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
|
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
import com.njcn.msgpush.framework.common.pojo.PageResult;
|
import com.njcn.msgpush.framework.common.pojo.PageResult;
|
||||||
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
|
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
|
||||||
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
|
|
||||||
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
|
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
|
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
|
||||||
@@ -16,6 +15,7 @@ import com.njcn.msgpush.module.push.dal.mysql.retry.MessageRetryQueueMapper;
|
|||||||
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
|
import com.njcn.msgpush.module.push.dal.redis.MessageConfirmRedisDAO;
|
||||||
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
|
import com.njcn.msgpush.module.push.dal.redis.MessageRetryRedisDAO;
|
||||||
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
|
||||||
|
import com.njcn.msgpush.module.push.enums.RetrySourceEnum;
|
||||||
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
|
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -50,7 +50,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
private static final List<String> CHANNELS = Arrays.asList(
|
private static final List<String> CHANNELS = Arrays.asList(
|
||||||
ChannelTypeEnum.SMS.getCode(),
|
ChannelTypeEnum.SMS.getCode(),
|
||||||
ChannelTypeEnum.EMAIL.getCode(),
|
ChannelTypeEnum.EMAIL.getCode(),
|
||||||
ChannelTypeEnum.APP_PUSH.getCode());
|
ChannelTypeEnum.APP.getCode());
|
||||||
|
|
||||||
private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false);
|
private final AtomicBoolean startupRebuildCompleted = new AtomicBoolean(false);
|
||||||
|
|
||||||
@@ -136,7 +136,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
plusSeconds = 60 * 60 * 4;
|
plusSeconds = 60 * 60 * 4;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case APP_PUSH -> {
|
case APP -> {
|
||||||
if (retryCount == 1) {
|
if (retryCount == 1) {
|
||||||
plusSeconds = 60;
|
plusSeconds = 60;
|
||||||
} else {
|
} else {
|
||||||
@@ -176,7 +176,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
|
|
||||||
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(claimedMessageIds);
|
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(claimedMessageIds);
|
||||||
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
||||||
processSingleRetry(messageRecordDO);
|
processSingleRetry(messageRecordDO, RetrySourceEnum.AUTO_RETRY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,18 +188,16 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
*
|
*
|
||||||
* @param messageRecordDO 消息记录
|
* @param messageRecordDO 消息记录
|
||||||
*/
|
*/
|
||||||
private void processSingleRetry(MessageRecordDO messageRecordDO) {
|
private void processSingleRetry(MessageRecordDO messageRecordDO, RetrySourceEnum retrySource) {
|
||||||
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
|
// 校验一下消息的状态是否为 retryable_failed,防止误推送已经成功的消息
|
||||||
messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0);
|
if (MsgStatusConstant.RETRYABLE_FAILED.equals(messageRecordDO.getStatus())) {
|
||||||
messageRecordService.updateRetryCount(messageRecordDO.getId());
|
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
|
||||||
}
|
messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO), retrySource);
|
||||||
|
messageRecordService.updateRetryCount(messageRecordDO.getId());
|
||||||
/**
|
} else {
|
||||||
* 删除重试记录(数据库逻辑删除 + Redis 已提前删除)
|
// 从retryQueue中删除(已经从redis中移除过了,所以不用再次移除了)
|
||||||
*/
|
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getId()));
|
||||||
private void deleteRetryRecord(MessageRecordDO message) {
|
}
|
||||||
this.deleteByMessageIds(Collections.singletonList(message.getId()));
|
|
||||||
messageRecordService.updateById(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -208,7 +206,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
if (CollUtil.isNotEmpty(messageIds)) {
|
if (CollUtil.isNotEmpty(messageIds)) {
|
||||||
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
|
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(messageIds);
|
||||||
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
|
||||||
processSingleRetry(messageRecordDO);
|
processSingleRetry(messageRecordDO, RetrySourceEnum.MANUAL_RETRY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author caozehui
|
* @author caozehui
|
||||||
@@ -60,14 +59,12 @@ public class MsgPushClientTest {
|
|||||||
for (int i = 0; i < 1; i++) {
|
for (int i = 0; i < 1; i++) {
|
||||||
MessageRecordReqVO message = new MessageRecordReqVO();
|
MessageRecordReqVO message = new MessageRecordReqVO();
|
||||||
message.setId(1234567890L);
|
message.setId(1234567890L);
|
||||||
message.setAppName("NPQS-9000");
|
|
||||||
message.setChannel(ChannelTypeEnum.SMS.getMsg());
|
message.setChannel(ChannelTypeEnum.SMS.getMsg());
|
||||||
message.setReceiver("18839431215");
|
message.setReceiver("18839431215");
|
||||||
message.setContent("【南京灿能电力】测试短信" + i + ",请忽略。" + LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
message.setContent("【南京灿能电力】测试短信" + i + ",请忽略。" + LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||||
message.setProviderType(ProviderTypeEnum.TELECOM.getCode());
|
|
||||||
messageIdList.add(message);
|
messageIdList.add(message);
|
||||||
}
|
}
|
||||||
List<MessageSendResultVO> sendResult = messageRecordService.send(messageIdList);
|
List<MessageSendResultVO> sendResult = messageRecordService.send(messageIdList, ChannelTypeEnum.SMS);
|
||||||
|
|
||||||
Thread.sleep(9000);
|
Thread.sleep(9000);
|
||||||
System.out.println(JSON.toJSONString(sendResult));
|
System.out.println(JSON.toJSONString(sendResult));
|
||||||
|
|||||||
@@ -24,7 +24,7 @@
|
|||||||
<!-- Web 相关 -->
|
<!-- Web 相关 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.swagger.core.v3</groupId>
|
<groupId>io.swagger.core.v3</groupId>
|
||||||
<artifactId>swagger-annotations</artifactId>
|
<artifactId>swagger-annotations-jakarta</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- 参数校验 -->
|
<!-- 参数校验 -->
|
||||||
|
|||||||
7
pom.xml
7
pom.xml
@@ -27,6 +27,7 @@
|
|||||||
<!-- maven-surefire-plugin 暂时无法通过 bom 的依赖读取(兼容老版本 IDEA 2024 及以前版本) -->
|
<!-- maven-surefire-plugin 暂时无法通过 bom 的依赖读取(兼容老版本 IDEA 2024 及以前版本) -->
|
||||||
<lombok.version>1.18.42</lombok.version>
|
<lombok.version>1.18.42</lombok.version>
|
||||||
<spring.boot.version>3.5.9</spring.boot.version>
|
<spring.boot.version>3.5.9</spring.boot.version>
|
||||||
|
<swagger.jakarta.version>2.2.38</swagger.jakarta.version>
|
||||||
<mapstruct.version>1.6.3</mapstruct.version>
|
<mapstruct.version>1.6.3</mapstruct.version>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
</properties>
|
</properties>
|
||||||
@@ -119,6 +120,12 @@
|
|||||||
<artifactId>msgpush-spring-boot-starter-websocket</artifactId>
|
<artifactId>msgpush-spring-boot-starter-websocket</artifactId>
|
||||||
<version>${revision}</version>
|
<version>${revision}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.swagger.core.v3</groupId>
|
||||||
|
<artifactId>swagger-annotations-jakarta</artifactId>
|
||||||
|
<version>${swagger.jakarta.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user