微调
This commit is contained in:
@@ -18,10 +18,10 @@
|
|||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<!-- Spring Cloud 基础 -->
|
<!-- Spring Cloud 基础 -->
|
||||||
<!-- <dependency>-->
|
<dependency>
|
||||||
<!-- <groupId>com.njcn</groupId>-->
|
<groupId>com.njcn</groupId>
|
||||||
<!-- <artifactId>msgpush-spring-boot-starter-env</artifactId>-->
|
<artifactId>msgpush-spring-boot-starter-env</artifactId>
|
||||||
<!-- </dependency>-->
|
</dependency>
|
||||||
|
|
||||||
<!-- 依赖服务 -->
|
<!-- 依赖服务 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import org.mybatis.spring.annotation.MapperScan;
|
|||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 项目的启动类
|
* 项目的启动类
|
||||||
@@ -13,6 +14,7 @@ import org.springframework.context.annotation.ComponentScan;
|
|||||||
@ComponentScan(basePackages = {"com.njcn.msgpush.module.push", "com.njcn.msgpush.framework.mybatis"})
|
@ComponentScan(basePackages = {"com.njcn.msgpush.module.push", "com.njcn.msgpush.framework.mybatis"})
|
||||||
@MapperScan("com.njcn.msgpush.module.*.dal.mysql")
|
@MapperScan("com.njcn.msgpush.module.*.dal.mysql")
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
|
@EnableScheduling
|
||||||
public class PushServerApplication {
|
public class PushServerApplication {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(PushServerApplication.class, args);
|
SpringApplication.run(PushServerApplication.class, args);
|
||||||
|
|||||||
@@ -1,22 +1,12 @@
|
|||||||
package com.njcn.msgpush.module.push.client.sender;
|
package com.njcn.msgpush.module.push.client.sender;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
|
||||||
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
|
|
||||||
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
|
|
||||||
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
|
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
|
|
||||||
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
|
|
||||||
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
|
|
||||||
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
|
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
|
||||||
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
|
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
|
||||||
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
|
||||||
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
|
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@@ -36,7 +26,7 @@ public class Sender {
|
|||||||
@Autowired
|
@Autowired
|
||||||
public RestTemplateUtil restTemplateUtil;
|
public RestTemplateUtil restTemplateUtil;
|
||||||
|
|
||||||
public final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
|
public final ThreadPoolExecutor MSG_PUSH_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
|
||||||
5,
|
5,
|
||||||
5,
|
5,
|
||||||
1000,
|
1000,
|
||||||
@@ -44,7 +34,7 @@ public class Sender {
|
|||||||
new java.util.concurrent.ArrayBlockingQueue<>(1000),
|
new java.util.concurrent.ArrayBlockingQueue<>(1000),
|
||||||
r -> {
|
r -> {
|
||||||
Thread thread = new Thread(r);
|
Thread thread = new Thread(r);
|
||||||
thread.setName("threadName-" + thread.getId());
|
thread.setName("msgPushThreadPool-" + thread.getId());
|
||||||
thread.setDaemon(false);
|
thread.setDaemon(false);
|
||||||
return thread;
|
return thread;
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -35,21 +35,6 @@ public class AliyunSmsSender implements SmsSender {
|
|||||||
|
|
||||||
private Client smsClient;
|
private Client smsClient;
|
||||||
|
|
||||||
// private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
|
|
||||||
// 5,
|
|
||||||
// 5,
|
|
||||||
// 1000,
|
|
||||||
// TimeUnit.MILLISECONDS,
|
|
||||||
// new java.util.concurrent.ArrayBlockingQueue<>(1000),
|
|
||||||
// r -> {
|
|
||||||
// Thread thread = new Thread(r);
|
|
||||||
// thread.setName("AliyunSmsSender-Pool-" + thread.getId());
|
|
||||||
// thread.setDaemon(false);
|
|
||||||
// return thread;
|
|
||||||
// },
|
|
||||||
// new ThreadPoolExecutor.CallerRunsPolicy()
|
|
||||||
// );
|
|
||||||
|
|
||||||
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
|
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
|
||||||
this.sender = sender;
|
this.sender = sender;
|
||||||
if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
|
if (ObjectUtil.isNotNull(aliYunSmsSetting)) {
|
||||||
@@ -70,7 +55,7 @@ public class AliyunSmsSender implements SmsSender {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sendSms(MessageRecordDO message) {
|
public boolean sendSms(MessageRecordDO message) {
|
||||||
Future<Boolean> future = this.sender.THREAD_POOL_EXECUTOR.submit(() -> {
|
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
|
||||||
message.setStatus(MessageStatusConstant.SENDING);
|
message.setStatus(MessageStatusConstant.SENDING);
|
||||||
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
||||||
// 设置自动重试,默认是不开启的。重试次数默认是3次
|
// 设置自动重试,默认是不开启的。重试次数默认是3次
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.njcn.msgpush.module.push.client.sender.impl;
|
package com.njcn.msgpush.module.push.client.sender.impl;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.njcn.msgpush.module.push.client.sender.Sender;
|
import com.njcn.msgpush.module.push.client.sender.Sender;
|
||||||
import com.njcn.msgpush.module.push.client.sender.SmsSender;
|
import com.njcn.msgpush.module.push.client.sender.SmsSender;
|
||||||
@@ -15,10 +16,13 @@ import org.springframework.http.ResponseEntity;
|
|||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -35,15 +39,22 @@ public class TelecomSmsSender implements SmsSender {
|
|||||||
private TelecomSmsSetting telecomSmsSetting;
|
private TelecomSmsSetting telecomSmsSetting;
|
||||||
private Sender sender;
|
private Sender sender;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 存放发送完成的消息。key为其返回的mid
|
||||||
|
*/
|
||||||
|
private Map<String, MessageRecordDO> completeSendMessageMap = new HashMap<>();
|
||||||
|
|
||||||
|
private ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
private static class TelecomSmsResponse {
|
private static class TelecomSmsSendResponse {
|
||||||
private String status;
|
private String status;
|
||||||
private Double balance;
|
private Double balance;
|
||||||
private List<TelecomSmsMessageRes> list;
|
private List<TelecomSmsSendDetailRes> list;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
private static class TelecomSmsMessageRes {
|
private static class TelecomSmsSendDetailRes {
|
||||||
//消息ID(用于状态报告匹配)
|
//消息ID(用于状态报告匹配)
|
||||||
private String mid;
|
private String mid;
|
||||||
//手机号码
|
//手机号码
|
||||||
@@ -52,6 +63,22 @@ public class TelecomSmsSender implements SmsSender {
|
|||||||
private Integer result;
|
private Integer result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
private static class TelecomSmsSelectResponse {
|
||||||
|
private String status;
|
||||||
|
private Double balance;
|
||||||
|
private List<TelecomSmsSelectDetailRes> list;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
private static class TelecomSmsSelectDetailRes {
|
||||||
|
private String apmid;
|
||||||
|
private String apSubmitTime;
|
||||||
|
private String mobile;
|
||||||
|
private Integer status;
|
||||||
|
private String stat;
|
||||||
|
private String deliverTime;
|
||||||
|
}
|
||||||
|
|
||||||
public TelecomSmsSender(TelecomSmsSetting telecomSmsSetting, Sender sender) {
|
public TelecomSmsSender(TelecomSmsSetting telecomSmsSetting, Sender sender) {
|
||||||
this.telecomSmsSetting = telecomSmsSetting;
|
this.telecomSmsSetting = telecomSmsSetting;
|
||||||
@@ -60,7 +87,7 @@ public class TelecomSmsSender implements SmsSender {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sendSms(MessageRecordDO message) {
|
public boolean sendSms(MessageRecordDO message) {
|
||||||
Future<Boolean> future = this.sender.THREAD_POOL_EXECUTOR.submit(() -> {
|
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
|
||||||
message.setStatus(MessageStatusConstant.SENDING);
|
message.setStatus(MessageStatusConstant.SENDING);
|
||||||
// 构建请求参数
|
// 构建请求参数
|
||||||
Map<String, Object> request = new HashMap<>();
|
Map<String, Object> request = new HashMap<>();
|
||||||
@@ -79,22 +106,32 @@ public class TelecomSmsSender implements SmsSender {
|
|||||||
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||||
message.setSendTime(now);
|
message.setSendTime(now);
|
||||||
// 发送请求
|
// 发送请求
|
||||||
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
|
ResponseEntity<String> response1 = this.sender.restTemplateUtil.post(
|
||||||
telecomSmsSetting.getApiUrl(),
|
telecomSmsSetting.getApiUrl(),
|
||||||
request,
|
request,
|
||||||
headers,
|
headers,
|
||||||
String.class
|
String.class
|
||||||
);
|
);
|
||||||
System.out.println(JSON.toJSONString(response));
|
|
||||||
LocalDateTime end = LocalDateTime.now();
|
LocalDateTime end = LocalDateTime.now();
|
||||||
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
|
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
|
||||||
TelecomSmsResponse telecomSmsResponse = JSON.parseObject(response.getBody(), TelecomSmsResponse.class);
|
TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response1.getBody(), TelecomSmsSendResponse.class);
|
||||||
|
|
||||||
if (response.getStatusCode() == HttpStatus.OK && telecomSmsResponse.list.get(0).result == 0) {
|
if (response1.getStatusCode() == HttpStatus.OK) {
|
||||||
|
String mid = telecomSmsSendResponse.list.get(0).mid;
|
||||||
|
completeSendMessageMap.put(mid, message);
|
||||||
|
// 定时任务,指定时间间隔后获取下行信息
|
||||||
|
if (ObjectUtil.isNull(this.scheduledExecutorService)) {
|
||||||
|
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
|
||||||
|
}
|
||||||
|
this.scheduledExecutorService.schedule(() -> {
|
||||||
|
this.getDownInfo(mid);
|
||||||
|
this.scheduledExecutorService.shutdown();
|
||||||
|
this.scheduledExecutorService = null;
|
||||||
|
}, 2, TimeUnit.SECONDS);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsResponse.list.get(0).result + "");
|
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
|
||||||
message.setErrorCode(telecomSmsResponse.list.get(0).result + "");
|
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
|
||||||
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
|
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
|
||||||
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
|
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
|
||||||
return false;
|
return false;
|
||||||
@@ -144,11 +181,57 @@ public class TelecomSmsSender implements SmsSender {
|
|||||||
message.setSendTime(now);
|
message.setSendTime(now);
|
||||||
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
|
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
|
||||||
}
|
}
|
||||||
TelecomSmsResponse telecomSmsResponse = JSON.parseObject(response.getBody(), TelecomSmsResponse.class);
|
TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class);
|
||||||
boolean res = true;
|
boolean res = true;
|
||||||
for (TelecomSmsMessageRes telecomSmsMessageRes : telecomSmsResponse.list) {
|
for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) {
|
||||||
res &= telecomSmsMessageRes.result == 0;
|
res &= telecomSmsSendDetailRes.result == 0;
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取下行信息
|
||||||
|
*
|
||||||
|
* @param mid
|
||||||
|
*/
|
||||||
|
private void getDownInfo(String mid) {
|
||||||
|
System.out.println("getDownInfo" + LocalDateTime.now());
|
||||||
|
// 构建请求参数
|
||||||
|
Map<String, Object> request = new HashMap<>();
|
||||||
|
request.put("action", "select");
|
||||||
|
request.put("account", telecomSmsSetting.getAccount());
|
||||||
|
request.put("password", telecomSmsSetting.getPassword());
|
||||||
|
request.put("date", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
||||||
|
request.put("condition", "APMID");
|
||||||
|
request.put("valueList", mid);
|
||||||
|
// request.put("condition", "MOBILE");
|
||||||
|
// request.put("valueList", message.getReceiver());
|
||||||
|
request.remove("mobile");
|
||||||
|
request.remove("content");
|
||||||
|
// 设置请求头
|
||||||
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.set("Content-Type", CONTENT_TYPE);
|
||||||
|
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
|
||||||
|
telecomSmsSetting.getApiUrl(),
|
||||||
|
request,
|
||||||
|
headers,
|
||||||
|
String.class
|
||||||
|
);
|
||||||
|
System.out.println(JSON.toJSONString(response.getBody()));
|
||||||
|
|
||||||
|
if (response.getStatusCode() == HttpStatus.OK) {
|
||||||
|
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
|
||||||
|
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
|
||||||
|
if (telecomSmsSelectDetailRes.getStatus() == 4) {
|
||||||
|
return;
|
||||||
|
} else if (telecomSmsSelectDetailRes.getStatus() == 5) {
|
||||||
|
MessageRecordDO message = completeSendMessageMap.get(mid);
|
||||||
|
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
|
||||||
|
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
|
||||||
|
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
|
||||||
|
this.sender.messageRetryQueueService.addRetryMessage(message, RetryStrategyEnum.EXPONENTIAL_BACKOFF);
|
||||||
|
completeSendMessageMap.remove(mid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import io.swagger.v3.oas.annotations.Operation;
|
|||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import jakarta.annotation.security.PermitAll;
|
import jakarta.annotation.security.PermitAll;
|
||||||
|
import jakarta.validation.Valid;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.security.access.prepost.PreAuthorize;
|
import org.springframework.security.access.prepost.PreAuthorize;
|
||||||
@@ -25,7 +26,6 @@ import static com.njcn.msgpush.framework.common.pojo.CommonResult.success;
|
|||||||
|
|
||||||
@Tag(name = "管理后台 - 消息")
|
@Tag(name = "管理后台 - 消息")
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Validated
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/push/message")
|
@RequestMapping("/push/message")
|
||||||
public class MessageRecordController {
|
public class MessageRecordController {
|
||||||
@@ -34,10 +34,10 @@ public class MessageRecordController {
|
|||||||
private MessageRecordService messageRecordService;
|
private MessageRecordService messageRecordService;
|
||||||
|
|
||||||
@PermitAll
|
@PermitAll
|
||||||
@PostMapping("send")
|
@PostMapping("/send")
|
||||||
@Operation(summary = "消息推送")
|
@Operation(summary = "消息推送")
|
||||||
@Parameter(name = "id", description = "编号", required = true, example = "1024")
|
@Parameter(name = "reqVOList", description = "消息列表")
|
||||||
public CommonResult<Boolean> send(@Validated @RequestBody List<MessageRecordReqVO> reqVOList) {
|
public CommonResult<Boolean> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
|
||||||
Boolean result = messageRecordService.send(reqVOList);
|
Boolean result = messageRecordService.send(reqVOList);
|
||||||
return CommonResult.success(result);
|
return CommonResult.success(result);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ public class MessageRecordReqVO extends PageParam {
|
|||||||
@NotBlank(message = "消息类型不能为空")
|
@NotBlank(message = "消息类型不能为空")
|
||||||
private String messageType;
|
private String messageType;
|
||||||
|
|
||||||
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED, example = "15601691300")
|
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED)
|
||||||
@NotBlank(message = "接收者不能为空")
|
@NotBlank(message = "接收者不能为空")
|
||||||
private String receiver;
|
private String receiver;
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author caozehui
|
* @author caozehui
|
||||||
* @date 2026-02-27
|
* @date 2026-02-27
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import com.njcn.msgpush.module.push.controller.admin.channel.vo.ChannelProviderC
|
|||||||
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
|
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
|
||||||
import com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper;
|
import com.njcn.msgpush.module.push.dal.mysql.channel.ChannelProviderConfigMapper;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -42,6 +43,7 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void failureUpdate(String providerType, String channel) {
|
public void failureUpdate(String providerType, String channel) {
|
||||||
ChannelProviderConfigDO byTypeAndChannel = this.getByTypeAndChannel(providerType, channel);
|
ChannelProviderConfigDO byTypeAndChannel = this.getByTypeAndChannel(providerType, channel);
|
||||||
byTypeAndChannel.setFailureCount(byTypeAndChannel.getFailureCount() + 1);
|
byTypeAndChannel.setFailureCount(byTypeAndChannel.getFailureCount() + 1);
|
||||||
|
|||||||
@@ -77,6 +77,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public boolean updateStatus(String messageId, String status) {
|
public boolean updateStatus(String messageId, String status) {
|
||||||
return this.lambdaUpdate().eq(MessageRecordDO::getMessageId, messageId).set(MessageRecordDO::getStatus, status).update();
|
return this.lambdaUpdate().eq(MessageRecordDO::getMessageId, messageId).set(MessageRecordDO::getStatus, status).update();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ import java.time.LocalDateTime;
|
|||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@@ -32,6 +35,22 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
private MessageRecordService messageRecordService;
|
private MessageRecordService messageRecordService;
|
||||||
@Autowired
|
@Autowired
|
||||||
public ChannelProviderConfigService channelProviderConfigService;
|
public ChannelProviderConfigService channelProviderConfigService;
|
||||||
|
|
||||||
|
public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
|
||||||
|
5,
|
||||||
|
5,
|
||||||
|
1000,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new java.util.concurrent.ArrayBlockingQueue<>(1000),
|
||||||
|
r -> {
|
||||||
|
Thread thread = new Thread(r);
|
||||||
|
thread.setName("msgRetryThreadPool-" + thread.getId());
|
||||||
|
thread.setDaemon(false);
|
||||||
|
return thread;
|
||||||
|
},
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy()
|
||||||
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 默认每次处理的消息数量
|
* 默认每次处理的消息数量
|
||||||
*/
|
*/
|
||||||
@@ -80,7 +99,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
|
||||||
public void processRetryBatch(String channel) {
|
public void processRetryBatch(String channel) {
|
||||||
// 从数据库查询需要重试的消息
|
// 从数据库查询需要重试的消息
|
||||||
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
|
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
|
||||||
@@ -149,20 +167,33 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
|||||||
* 处理单个消息的重试逻辑
|
* 处理单个消息的重试逻辑
|
||||||
*/
|
*/
|
||||||
private void processSingleRetry(MessageRecordDO messageRecordDO) {
|
private void processSingleRetry(MessageRecordDO messageRecordDO) {
|
||||||
// todo 调用消息发送接口进行重试
|
// 异步调用消息发送接口进行重试
|
||||||
boolean sendResult = messageRecordService.processSendMsg(messageRecordDO);
|
CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() ->
|
||||||
|
messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR
|
||||||
|
);
|
||||||
|
|
||||||
if (sendResult) {
|
sendFuture.orTimeout(5, TimeUnit.SECONDS)
|
||||||
// 重试成功,删除重试记录
|
.thenAccept(sendResult -> {
|
||||||
super.baseMapper.deleteByMessageId(messageRecordDO.getMessageId());
|
if (sendResult) {
|
||||||
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
|
log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId());
|
||||||
|
super.baseMapper.deleteByMessageId(messageRecordDO.getMessageId());
|
||||||
log.info("消息重试成功并已清除: messageId={}", messageRecordDO.getMessageId());
|
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
|
||||||
} else {
|
} else {
|
||||||
// 重试失败,更新重试信息
|
// 重试失败,更新重试信息
|
||||||
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
|
log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId());
|
||||||
handleRetryFailure(messageRecordDO);
|
channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
|
||||||
}
|
handleRetryFailure(messageRecordDO);
|
||||||
|
}
|
||||||
|
}).exceptionally(ex -> {
|
||||||
|
log.error("异步执行消息重试发生异常:messageId={}", messageRecordDO.getMessageId(), ex);
|
||||||
|
// 发生异常时也尝试处理失败逻辑,避免消息丢失
|
||||||
|
try {
|
||||||
|
handleRetryFailure(messageRecordDO);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("异常处理后再次失败:messageId={}", messageRecordDO.getMessageId(), e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -106,20 +106,5 @@ msgpush:
|
|||||||
|
|
||||||
|
|
||||||
debug: false
|
debug: false
|
||||||
aliyun:
|
|
||||||
sms:
|
|
||||||
access-key-id: LTAI4FxsR76x2dq3w9c5puUe
|
|
||||||
access-key-secret: GxkTR8fsrvHtixTlD9UPmOGli35tZs
|
|
||||||
regionId: cn-hangzhou
|
|
||||||
endpoint: dysmsapi.aliyuncs.com
|
|
||||||
mail:
|
|
||||||
access-key-id: LTAI4FxsR76x2dq3w9c5puUe
|
|
||||||
access-key-secret: GxkTR8fsrvHtixTlD9UPmOGli35tZs
|
|
||||||
regionId: cn-hangzhou
|
|
||||||
endpoint: dm.aliyuncs.com
|
|
||||||
|
|
||||||
telecom:
|
|
||||||
sms:
|
|
||||||
account: 925631
|
|
||||||
password: AMW2pOVrdky
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,10 +7,12 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author caozehui
|
* @author caozehui
|
||||||
@@ -18,13 +20,14 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@SpringBootTest
|
@SpringBootTest
|
||||||
|
@EnableScheduling
|
||||||
public class MsgPushClientTest {
|
public class MsgPushClientTest {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private MessageRecordService messageRecordService;
|
private MessageRecordService messageRecordService;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSend() {
|
public void testSend() throws InterruptedException {
|
||||||
List<MessageRecordReqVO> messageIdList = new ArrayList<>();
|
List<MessageRecordReqVO> messageIdList = new ArrayList<>();
|
||||||
// for (int i = 0; i < 5; i++) {
|
// for (int i = 0; i < 5; i++) {
|
||||||
// MessageRecordReqVO message = new MessageRecordReqVO();
|
// MessageRecordReqVO message = new MessageRecordReqVO();
|
||||||
@@ -40,7 +43,7 @@ public class MsgPushClientTest {
|
|||||||
// }
|
// }
|
||||||
for (int i = 0; i < 1; i++) {
|
for (int i = 0; i < 1; i++) {
|
||||||
MessageRecordReqVO message = new MessageRecordReqVO();
|
MessageRecordReqVO message = new MessageRecordReqVO();
|
||||||
message.setMessageId(i + "2345dc");
|
message.setMessageId(String.valueOf(UUID.randomUUID()));
|
||||||
message.setAppName("NPQS-9000");
|
message.setAppName("NPQS-9000");
|
||||||
message.setChannel(MsgPushConstant.CHANNEL_SMS);
|
message.setChannel(MsgPushConstant.CHANNEL_SMS);
|
||||||
message.setReceiver("18839431215");
|
message.setReceiver("18839431215");
|
||||||
@@ -49,11 +52,8 @@ public class MsgPushClientTest {
|
|||||||
messageIdList.add(message);
|
messageIdList.add(message);
|
||||||
}
|
}
|
||||||
boolean sendResult = messageRecordService.send(messageIdList);
|
boolean sendResult = messageRecordService.send(messageIdList);
|
||||||
|
|
||||||
|
Thread.sleep(9000);
|
||||||
System.out.println(sendResult);
|
System.out.println(sendResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user