前置联调

This commit is contained in:
hzj
2025-05-14 10:14:18 +08:00
parent 4b10860997
commit 6d8df57ad5
20 changed files with 534 additions and 58 deletions

View File

@@ -75,10 +75,10 @@
<artifactId>rt-api</artifactId>
<version>1.0.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.njcn</groupId>-->
<!-- <artifactId>pq-device-api</artifactId>-->
<!-- <version>1.0.0</version>-->
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pq-device-api</artifactId>
<version>1.0.0</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>pqs-influx</artifactId>-->
@@ -89,7 +89,7 @@
<!--&lt;!&ndash; <groupId>com.njcn</groupId>&ndash;&gt;-->
<!--&lt;!&ndash; </exclusion>&ndash;&gt;-->
<!-- </exclusions>-->
<!-- </dependency>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>

View File

@@ -0,0 +1,147 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.enums.FrontTypeEnum;
import com.njcn.message.messagedto.FrontHeartBeatDTO;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "Heart_Beat_Topic",
consumerGroup = "Heartb_Beat_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler<FrontHeartBeatDTO> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Autowired
private MessAnalysisFeignClient messAnalysisFeignClient;
@Override
public void onMessage(String message) {
FrontHeartBeatDTO frontHeartBeatDTO = JSONObject.parseObject(message, FrontHeartBeatDTO.class);
super.dispatchMessage(frontHeartBeatDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(FrontHeartBeatDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(FrontHeartBeatDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(FrontHeartBeatDTO message) {
//将心跳状态存到redis失效时间为30s如果持续有心跳则进程在线反之redis找不到则不在线
if(Objects.equals(message.getFronttype(), FrontTypeEnum.STAT.getCode())){
redisUtil.saveByKeyWithExpire(message.getNodeId().concat(":").concat(message.getProcessNum()+""),message.getStatus(),30L);
}
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(FrontHeartBeatDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}

View File

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.message.redis.RedisKeyPrefix;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.message.websocket.WebSocketServer;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;

View File

@@ -0,0 +1,143 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.TopicReplyDTO;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "Topic_Reply_Topic",
consumerGroup = "Topic_Reply_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class TopicReplyConsumer extends EnhanceConsumerMessageHandler<TopicReplyDTO> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Autowired
private MessAnalysisFeignClient messAnalysisFeignClient;
@Override
public void onMessage(String message) {
TopicReplyDTO topicReplyDTO = JSONObject.parseObject(message,TopicReplyDTO.class);
super.dispatchMessage(topicReplyDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(TopicReplyDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(TopicReplyDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(TopicReplyDTO message) {
//业务处理
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),message.getResult(),60*60L);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(TopicReplyDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}

View File

@@ -19,6 +19,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.*;
/**
@@ -48,7 +49,7 @@ public class ProduceController extends BaseController {
String methodDescribe = getMethodDescribe("recall");
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBody(JSONObject.toJSONString(message));
recallMessaggeTemplate.sendMember(baseMessage);
recallMessaggeTemplate.sendMember(baseMessage,message.getNodeId());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@@ -60,7 +61,7 @@ public class ProduceController extends BaseController {
String methodDescribe = getMethodDescribe("recall");
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBody(JSONObject.toJSONString(message));
deviceRebootMessageTemplate.sendMember(baseMessage);
deviceRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@@ -72,7 +73,7 @@ public class ProduceController extends BaseController {
String methodDescribe = getMethodDescribe("recall");
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBody(JSONObject.toJSONString(message));
processRebootMessageTemplate.sendMember(baseMessage);
processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,72 @@
package com.njcn.message.produce;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.messagedto.FrontHeartBeatDTO;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message;
/**
* Description:
* Date: 2025/05/08 下午 3:35【需求编号】
*
* @author clam
* @version V1.0.0
*/
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者实例
// 参数为生产者组名(需要确保唯一性)
DefaultMQProducer producer = new DefaultMQProducer("scheduled_producer_group");
// 2. 设置NameServer地址多个地址用分号分隔
producer.setNamesrvAddr("192.168.1.24:9876");
// 3. 设置发送超时时间(毫秒)
producer.setSendMsgTimeout(5000);
// 4. 启动生产者
producer.start();
System.out.println("Producer Started");
// 5. 创建定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
FrontHeartBeatDTO frontHeartBeatDTO = new FrontHeartBeatDTO();
frontHeartBeatDTO.setNodeId("7d67349a44a9a4e1d02417f31e310a28");
frontHeartBeatDTO.setStatus("1");
frontHeartBeatDTO.setProcessNum(1);
// 6. 每30秒执行一次消息发送
scheduler.scheduleAtFixedRate(() -> {
try {
// 创建消息对象
// 参数Topic名称、Tag标签、消息内容
Message msg = new Message(
"Heartb_Beat_Topic",
"Test_Tag","Test_Keys",
JSONObject.toJSONString(frontHeartBeatDTO).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息(同步方式)
producer.send(msg);
System.out.printf("%s 发送消息成功: %s %n",
new java.util.Date(), msg);
} catch (Exception e) {
System.err.println("消息发送失败: " + e.getMessage());
e.printStackTrace();
}
}, 0, 30, TimeUnit.SECONDS); // 初始延迟0秒周期30秒
// 7. 添加JVM关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("关闭生产者...");
scheduler.shutdown();
producer.shutdown();
}));
}
}

View File

@@ -22,9 +22,9 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate {
super(template);
}
public SendResult sendMember(BaseMessage askRealDataMessage) {
public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) {
askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE);
askRealDataMessage.setKey("Test_Keys");
return send(BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage);
return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage);
}
}

View File

@@ -22,9 +22,9 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate {
super(template);
}
public SendResult sendMember(BaseMessage baseMessage) {
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey("Test_Keys");
return send(BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage);
return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage);
}
}

View File

@@ -22,9 +22,9 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate {
super(template);
}
public SendResult sendMember(BaseMessage baseMessage) {
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey("Test_Keys");
return send(BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage);
return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage);
}
}

View File

@@ -22,9 +22,9 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate {
super(template);
}
public SendResult sendMember(BaseMessage recallMessage) {
public SendResult sendMember(BaseMessage recallMessage,String nodeId) {
recallMessage.setSource(BusinessResource.WEB_RESOURCE);
recallMessage.setKey("Test_Keys");
return send(BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage);
return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage);
}
}

View File

@@ -1,5 +1,6 @@
package com.njcn.message.redis;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.message.websocket.WebSocketServer;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;

View File

@@ -1,18 +0,0 @@
package com.njcn.message.redis;
/**
* Description:
* Date: 2025/03/12 下午 4:23【需求编号】
*前缀
* @author clam
* @version V1.0.0
*/
public interface RedisKeyPrefix {
String REAL_TIME_DATA = "REAL_TIME_DATA:";
String HARMMONIC_TOPIC = "HARMMONIC_TOPIC:";
String DEVICE_RUN_FLAG = "DEVICE_RUN_FLAG:";
String ASK_REAL_DATA = "ASK_REAL_DATA:";
}

View File

@@ -3,9 +3,11 @@ package com.njcn.message.websocket;
import com.alibaba.fastjson.JSONObject;
import com.njcn.device.pq.api.DeviceFeignClient;
import com.njcn.device.pq.pojo.dto.DeviceDTO;
import com.njcn.message.message.AskRealDataMessage;
import com.njcn.message.produce.template.AskRealDataMessaggeTemplate;
import com.njcn.message.redis.RedisKeyPrefix;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
@@ -46,13 +48,13 @@ public class WebSocketServer {
}
// private static LineFeignClient lineFeignClient;
//
// // 注入的时候,给类的 service 注入
// @Autowired
// public void setLineFeignClient(LineFeignClient lineFeignClient) {
// WebSocketServer.lineFeignClient = lineFeignClient;
// }
private static DeviceFeignClient deviceFeignClient;
// 注入的时候,给类的 service 注入
@Autowired
public void setLineFeignClient(DeviceFeignClient deviceFeignClient) {
WebSocketServer.deviceFeignClient = deviceFeignClient;
}
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
@@ -126,9 +128,10 @@ public class WebSocketServer {
// LineDetailDataVO data = lineFeignClient.getLineDetailData(split[1]).getData();
askRealDataMessage.setDevSeries(split[2]);
BaseMessage baseMessage = new BaseMessage();
DeviceDTO data = deviceFeignClient.getDeviceInfo(split[2]).getData();
baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage));
// 发送消息到topic1
askRealDataMessaggeTemplate.sendMember(baseMessage);
askRealDataMessaggeTemplate.sendMember(baseMessage,data.getNodeId());
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),20L);
log.info("监测点连接:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount());