diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java index 33a523a..559197d 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java @@ -67,35 +67,46 @@ public class DataRecallController extends BaseController { RecallMessage message = new RecallMessage(); if(CollectionUtils.isEmpty(param.getData())){ - LocalDate localDate = Objects.isNull(param.getReCallTime())?LocalDate.now().plusDays(-1):param.getReCallTime(); - List runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData(); - List recallDTOList = new ArrayList<>(); - runMonitorIds.forEach(temp->{ - LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData(); - Integer timeInterval = data.getTimeInterval(); - List localDateTimeList = generateTimeIntervals(localDate, timeInterval); - List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(localDate, DatePattern.NORM_DATE_PATTERN)); - localDateTimeList.removeAll(data1); - if(!CollectionUtils.isEmpty(localDateTimeList)){ - List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); - RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); - recallDTO.setDataType("0"); - recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); - recallDTO.setTimeInterval(timePeriod); - recallDTOList.add(recallDTO); - } - }); + List runMonitorIds = new ArrayList<>(); + List recallDTOList = new ArrayList<>(); + if(CollectionUtils.isEmpty(param.getMonitorId())){ + runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData(); + }else { + runMonitorIds = param.getMonitorId(); + } + LocalDate currentDate = param.getReCallStartTime(); + //循环每一天 + while (!currentDate.isAfter(param.getReCallEndTime())) { + LocalDate finalCurrentDate = currentDate; + runMonitorIds.forEach(temp->{ + LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData(); + Integer timeInterval = data.getTimeInterval(); + List localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval); + List data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN)); + localDateTimeList.removeAll(data1); + if(!CollectionUtils.isEmpty(localDateTimeList)){ + List timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval); + RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO(); + recallDTO.setDataType("0"); + recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList())); + recallDTO.setTimeInterval(timePeriod); + + recallDTOList.add(recallDTO); + } + }); + currentDate = currentDate.plusDays(1); + } + + if(!CollectionUtils.isEmpty(recallDTOList)){ message.setData(recallDTOList); -// baseMessage.setMessageBody(JSONObject.toJSONString(message)); produceFeignClient.recall(message); } }else { message.setData(param.getData()); -// baseMessage.setMessageBody(JSONObject.toJSONString(message)); produceFeignClient.recall(message); } diff --git a/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyPrefix.java b/message/message-api/src/main/java/com/njcn/message/constant/RedisKeyPrefix.java similarity index 74% rename from message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyPrefix.java rename to message/message-api/src/main/java/com/njcn/message/constant/RedisKeyPrefix.java index 73f1b46..0c75469 100644 --- a/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyPrefix.java +++ b/message/message-api/src/main/java/com/njcn/message/constant/RedisKeyPrefix.java @@ -1,4 +1,4 @@ -package com.njcn.message.redis; +package com.njcn.message.constant; /** * Description: @@ -14,5 +14,9 @@ public interface RedisKeyPrefix { String DEVICE_RUN_FLAG = "DEVICE_RUN_FLAG:"; + String HEART_BEAT = "HEART_BEAT:"; + + String TOPIC_REPLY = "TOPIC_REPLY:"; + String ASK_REAL_DATA = "ASK_REAL_DATA:"; } diff --git a/message/message-api/src/main/java/com/njcn/message/enums/FrontTypeEnum.java b/message/message-api/src/main/java/com/njcn/message/enums/FrontTypeEnum.java new file mode 100644 index 0000000..09b5021 --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/enums/FrontTypeEnum.java @@ -0,0 +1,32 @@ +package com.njcn.message.enums; + +import lombok.Getter; + +/** + * Description: + * Date: 2024/11/13 15:24【需求编号】 + *01:稳态,02:短时闪变,03:长时闪变,04:暂态,05:告警 + * @author clam + * @version V1.0.0 + */ +@Getter +public enum FrontTypeEnum { + //前置进程类型"stat/recall/3s/comtrade + + + STAT("stat", "稳态"), + + RECALL("recall", "补招"), + + REALTIME("realTime", "实时数据"), + COMTRADE("comtrade", "暂态"); + + private final String code; + + private final String message; + + FrontTypeEnum(String code, String message) { + this.code = code; + this.message = message; + } +} diff --git a/message/message-api/src/main/java/com/njcn/message/message/DeviceRebootMessage.java b/message/message-api/src/main/java/com/njcn/message/message/DeviceRebootMessage.java index 64768d8..b772e2b 100644 --- a/message/message-api/src/main/java/com/njcn/message/message/DeviceRebootMessage.java +++ b/message/message-api/src/main/java/com/njcn/message/message/DeviceRebootMessage.java @@ -15,6 +15,11 @@ import java.util.List; public class DeviceRebootMessage { private String code; private Integer index; + private Integer processNo; + + + private String guid; + private String nodeId; private List data; diff --git a/message/message-api/src/main/java/com/njcn/message/message/ProcessRebootMessage.java b/message/message-api/src/main/java/com/njcn/message/message/ProcessRebootMessage.java index 5990a32..2435f76 100644 --- a/message/message-api/src/main/java/com/njcn/message/message/ProcessRebootMessage.java +++ b/message/message-api/src/main/java/com/njcn/message/message/ProcessRebootMessage.java @@ -16,20 +16,22 @@ public class ProcessRebootMessage { //set_process private String code="set_process"; + private String guid; private Integer index; - private List data; + private String nodeId; + + //reset/add",重置或者添加进程 + private String fun; + //重置启动多少个进程,或者添加第几个进程 + private Integer processNum; + + private Integer processNo; + + //"stat/recall/all"//重置的进程类型:stat或者recall或者所有进程;添加的进程类型:stat或者recall或者stat和recall;只有稳态和补招是多进程 + private String frontType; - @Data - public static class RebootData { - //reset/add",重置或者添加进程 - private String fun; - //重置启动多少个进程,或者添加第几个进程 - private Integer processNum; - //"stat/recall/all"//重置的进程类型:stat或者recall或者所有进程;添加的进程类型:stat或者recall或者stat和recall;只有稳态和补招是多进程 - private String frontType; - } } diff --git a/message/message-api/src/main/java/com/njcn/message/message/RecallMessage.java b/message/message-api/src/main/java/com/njcn/message/message/RecallMessage.java index f60140b..f518370 100644 --- a/message/message-api/src/main/java/com/njcn/message/message/RecallMessage.java +++ b/message/message-api/src/main/java/com/njcn/message/message/RecallMessage.java @@ -17,7 +17,13 @@ import java.util.List; @Data public class RecallMessage { @JsonFormat(pattern = "yyyy-MM-dd") - private LocalDate reCallTime; + private LocalDate reCallStartTime; + + @JsonFormat(pattern = "yyyy-MM-dd") + private LocalDate reCallEndTime; + private String nodeId; + + private List monitorId; private List data; @Data diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/FrontHeartBeatDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/FrontHeartBeatDTO.java new file mode 100644 index 0000000..4d2498c --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/FrontHeartBeatDTO.java @@ -0,0 +1,25 @@ +package com.njcn.message.messagedto; + +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +import java.io.Serializable; + +/** + * Description: + * Date: 2025/05/08 下午 3:09【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class FrontHeartBeatDTO extends BaseMessage implements Serializable { + //前置id + private String nodeId; + //前置进程号 + private Integer processNum; + //前置进程类型"stat/recall/realTime/comtrade + private String fronttype; + //进程状态(0:异常,1:正常) + private String status; +} diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java new file mode 100644 index 0000000..a6646d5 --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/TopicReplyDTO.java @@ -0,0 +1,24 @@ +package com.njcn.message.messagedto; + +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +import java.io.Serializable; + +/** + * Description: + * Date: 2025/05/08 下午 3:09【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class TopicReplyDTO extends BaseMessage implements Serializable { + //消息id + private String guid; + + private String step; + + private String result; + +} diff --git a/message/message-boot/pom.xml b/message/message-boot/pom.xml index 3a60588..b7676be 100644 --- a/message/message-boot/pom.xml +++ b/message/message-boot/pom.xml @@ -75,10 +75,10 @@ rt-api 1.0.0 - - - - + + com.njcn + pq-device-api + 1.0.0 @@ -89,7 +89,7 @@ - + org.springframework.boot spring-boot-starter-websocket diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java new file mode 100644 index 0000000..283754a --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java @@ -0,0 +1,147 @@ +package com.njcn.message.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.MessageStatus; +import com.njcn.message.enums.FrontTypeEnum; +import com.njcn.message.messagedto.FrontHeartBeatDTO; +import com.njcn.message.constant.RedisKeyPrefix; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.api.MessAnalysisFeignClient; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * Description: + * Date: 2024/12/13 10:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@RocketMQMessageListener( + topic = "Heart_Beat_Topic", + consumerGroup = "Heartb_Beat_Topic_Consumer", + selectorExpression = "Test_Tag||Test_Keys", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Autowired + private MessAnalysisFeignClient messAnalysisFeignClient; + @Override + public void onMessage(String message) { + FrontHeartBeatDTO frontHeartBeatDTO = JSONObject.parseObject(message, FrontHeartBeatDTO.class); + super.dispatchMessage(frontHeartBeatDTO); + + } + + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(FrontHeartBeatDTO message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); + return false; + } + return true; + } + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(FrontHeartBeatDTO message) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + + @Override + protected void handleMessage(FrontHeartBeatDTO message) { + //将心跳状态存到redis失效时间为30s,如果持续有心跳则进程在线反之,redis找不到则不在线 + if(Objects.equals(message.getFronttype(), FrontTypeEnum.STAT.getCode())){ + redisUtil.saveByKeyWithExpire(message.getNodeId().concat(":").concat(message.getProcessNum()+""),message.getStatus(),30L); + + } + } + + + + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(FrontHeartBeatDTO message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + + +} diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java index afddb15..12ef05d 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.njcn.message.constant.MessageStatus; import com.njcn.message.messagedto.MessageDataDTO; -import com.njcn.message.redis.RedisKeyPrefix; +import com.njcn.message.constant.RedisKeyPrefix; import com.njcn.message.websocket.WebSocketServer; import com.njcn.middle.rocket.constant.EnhanceMessageConstant; import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java new file mode 100644 index 0000000..3303e6c --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java @@ -0,0 +1,143 @@ +package com.njcn.message.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.MessageStatus; +import com.njcn.message.messagedto.TopicReplyDTO; +import com.njcn.message.constant.RedisKeyPrefix; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.api.MessAnalysisFeignClient; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * Description: + * Date: 2024/12/13 10:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@RocketMQMessageListener( + topic = "Topic_Reply_Topic", + consumerGroup = "Topic_Reply_Topic_Consumer", + selectorExpression = "Test_Tag||Test_Keys", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class TopicReplyConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Autowired + private MessAnalysisFeignClient messAnalysisFeignClient; + @Override + public void onMessage(String message) { + TopicReplyDTO topicReplyDTO = JSONObject.parseObject(message,TopicReplyDTO.class); + super.dispatchMessage(topicReplyDTO); + + } + + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(TopicReplyDTO message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); + return false; + } + return true; + } + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(TopicReplyDTO message) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + + @Override + protected void handleMessage(TopicReplyDTO message) { + //业务处理 + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),message.getResult(),60*60L); + } + + + + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(TopicReplyDTO message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + + +} diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java b/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java index a3e2139..eb80028 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java @@ -19,6 +19,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.client.producer.SendResult; import org.springframework.web.bind.annotation.*; /** @@ -48,7 +49,7 @@ public class ProduceController extends BaseController { String methodDescribe = getMethodDescribe("recall"); BaseMessage baseMessage = new BaseMessage(); baseMessage.setMessageBody(JSONObject.toJSONString(message)); - recallMessaggeTemplate.sendMember(baseMessage); + recallMessaggeTemplate.sendMember(baseMessage,message.getNodeId()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -60,7 +61,7 @@ public class ProduceController extends BaseController { String methodDescribe = getMethodDescribe("recall"); BaseMessage baseMessage = new BaseMessage(); baseMessage.setMessageBody(JSONObject.toJSONString(message)); - deviceRebootMessageTemplate.sendMember(baseMessage); + deviceRebootMessageTemplate.sendMember(baseMessage,message.getNodeId()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -72,7 +73,7 @@ public class ProduceController extends BaseController { String methodDescribe = getMethodDescribe("recall"); BaseMessage baseMessage = new BaseMessage(); baseMessage.setMessageBody(JSONObject.toJSONString(message)); - processRebootMessageTemplate.sendMember(baseMessage); + processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/ScheduledProducer.java b/message/message-boot/src/main/java/com/njcn/message/produce/ScheduledProducer.java new file mode 100644 index 0000000..ccef87c --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/produce/ScheduledProducer.java @@ -0,0 +1,72 @@ +package com.njcn.message.produce; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.messagedto.FrontHeartBeatDTO; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.common.message.Message; +/** + * Description: + * Date: 2025/05/08 下午 3:35【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public class ScheduledProducer { + public static void main(String[] args) throws Exception { + // 1. 创建生产者实例 + // 参数为生产者组名(需要确保唯一性) + DefaultMQProducer producer = new DefaultMQProducer("scheduled_producer_group"); + + // 2. 设置NameServer地址(多个地址用分号分隔) + producer.setNamesrvAddr("192.168.1.24:9876"); + + // 3. 设置发送超时时间(毫秒) + producer.setSendMsgTimeout(5000); + + // 4. 启动生产者 + producer.start(); + System.out.println("Producer Started"); + + // 5. 创建定时任务线程池 + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + FrontHeartBeatDTO frontHeartBeatDTO = new FrontHeartBeatDTO(); + frontHeartBeatDTO.setNodeId("7d67349a44a9a4e1d02417f31e310a28"); + frontHeartBeatDTO.setStatus("1"); + frontHeartBeatDTO.setProcessNum(1); + // 6. 每30秒执行一次消息发送 + scheduler.scheduleAtFixedRate(() -> { + try { + // 创建消息对象 + // 参数:Topic名称、Tag标签、消息内容 + + Message msg = new Message( + "Heartb_Beat_Topic", + "Test_Tag","Test_Keys", + JSONObject.toJSONString(frontHeartBeatDTO).getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + + // 发送消息(同步方式) + producer.send(msg); + + System.out.printf("%s 发送消息成功: %s %n", + new java.util.Date(), msg); + } catch (Exception e) { + System.err.println("消息发送失败: " + e.getMessage()); + e.printStackTrace(); + } + }, 0, 30, TimeUnit.SECONDS); // 初始延迟0秒,周期30秒 + + // 7. 添加JVM关闭钩子 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("关闭生产者..."); + scheduler.shutdown(); + producer.shutdown(); + })); + } +} diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java index 49524a5..893330d 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java @@ -22,9 +22,9 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate { super(template); } - public SendResult sendMember(BaseMessage askRealDataMessage) { + public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) { askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE); askRealDataMessage.setKey("Test_Keys"); - return send(BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage); + return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java index 10a784e..f0dd9c1 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java @@ -22,9 +22,9 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate { super(template); } - public SendResult sendMember(BaseMessage baseMessage) { + public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); baseMessage.setKey("Test_Keys"); - return send(BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage); + return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java index 366bc29..155c510 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java @@ -22,9 +22,9 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate { super(template); } - public SendResult sendMember(BaseMessage baseMessage) { + public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); baseMessage.setKey("Test_Keys"); - return send(BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage); + return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java index 7798e8a..34df5ee 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java @@ -22,9 +22,9 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate { super(template); } - public SendResult sendMember(BaseMessage recallMessage) { + public SendResult sendMember(BaseMessage recallMessage,String nodeId) { recallMessage.setSource(BusinessResource.WEB_RESOURCE); recallMessage.setKey("Test_Keys"); - return send(BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage); + return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyExpirationListener.java b/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyExpirationListener.java index ed5dfff..87f37c1 100644 --- a/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyExpirationListener.java +++ b/message/message-boot/src/main/java/com/njcn/message/redis/RedisKeyExpirationListener.java @@ -1,5 +1,6 @@ package com.njcn.message.redis; +import com.njcn.message.constant.RedisKeyPrefix; import com.njcn.message.websocket.WebSocketServer; import com.njcn.redis.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; diff --git a/message/message-boot/src/main/java/com/njcn/message/websocket/WebSocketServer.java b/message/message-boot/src/main/java/com/njcn/message/websocket/WebSocketServer.java index 741cf5f..4c7437c 100644 --- a/message/message-boot/src/main/java/com/njcn/message/websocket/WebSocketServer.java +++ b/message/message-boot/src/main/java/com/njcn/message/websocket/WebSocketServer.java @@ -3,9 +3,11 @@ package com.njcn.message.websocket; import com.alibaba.fastjson.JSONObject; +import com.njcn.device.pq.api.DeviceFeignClient; +import com.njcn.device.pq.pojo.dto.DeviceDTO; import com.njcn.message.message.AskRealDataMessage; import com.njcn.message.produce.template.AskRealDataMessaggeTemplate; -import com.njcn.message.redis.RedisKeyPrefix; +import com.njcn.message.constant.RedisKeyPrefix; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.redis.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; @@ -46,13 +48,13 @@ public class WebSocketServer { } -// private static LineFeignClient lineFeignClient; -// -// // 注入的时候,给类的 service 注入 -// @Autowired -// public void setLineFeignClient(LineFeignClient lineFeignClient) { -// WebSocketServer.lineFeignClient = lineFeignClient; -// } + private static DeviceFeignClient deviceFeignClient; + + // 注入的时候,给类的 service 注入 + @Autowired + public void setLineFeignClient(DeviceFeignClient deviceFeignClient) { + WebSocketServer.deviceFeignClient = deviceFeignClient; + } /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 */ @@ -126,9 +128,10 @@ public class WebSocketServer { // LineDetailDataVO data = lineFeignClient.getLineDetailData(split[1]).getData(); askRealDataMessage.setDevSeries(split[2]); BaseMessage baseMessage = new BaseMessage(); + DeviceDTO data = deviceFeignClient.getDeviceInfo(split[2]).getData(); baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage)); // 发送消息到topic1 - askRealDataMessaggeTemplate.sendMember(baseMessage); + askRealDataMessaggeTemplate.sendMember(baseMessage,data.getNodeId()); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),20L); log.info("监测点连接:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount());