From e982fa960e2a3f6e2a3a63b6410a4422f0ec85c7 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Sat, 9 May 2026 08:43:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=80=E5=8C=97=E7=89=88=E6=9C=AC=E9=80=82?= =?UTF-8?q?=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/message/consumer/DeviceRunFlagDataConsumer.java | 2 +- .../java/com/njcn/message/consumer/FrontDataConsumer.java | 2 +- .../com/njcn/message/consumer/FrontHeartBeatConsumer.java | 2 +- .../com/njcn/message/consumer/RealTimeDataConsumer.java | 2 +- .../java/com/njcn/message/consumer/TopicLogsConsumer.java | 2 +- .../com/njcn/message/consumer/TopicReplyConsumer.java | 2 +- .../produce/template/AskRealDataMessaggeTemplate.java | 7 +++++-- .../produce/template/DeviceRebootMessageTemplate.java | 6 ++++-- .../produce/template/ProcessRebootMessageTemplate.java | 8 ++++++-- .../message/produce/template/RecallMessaggeTemplate.java | 3 +-- 10 files changed, 22 insertions(+), 14 deletions(-) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java index 755432a..d553e1d 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Device_Run_Flag_Topic", consumerGroup = "Device_Run_Flag_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java index a60a78c..335b9b1 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; @RocketMQMessageListener( topic = "LN_Topic", consumerGroup = "ln_consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java index 3b8724b..0fe447f 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Heart_Beat_Topic", consumerGroup = "Heartb_Beat_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java index a6036fe..2d09140 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java @@ -31,7 +31,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Real_Time_Data_Topic", consumerGroup = "real_time_consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java index 54194f0..560d6b7 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "log_Topic", consumerGroup = "Log_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java index 7e3715b..f01c0d9 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java @@ -32,7 +32,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Topic_Reply_Topic", consumerGroup = "Topic_Reply_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java index 893330d..18745af 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java @@ -1,8 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +26,8 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) { askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE); - askRealDataMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage); + AskRealDataMessage dto = JSON.parseObject(askRealDataMessage.getMessageBody(), AskRealDataMessage.class); + askRealDataMessage.setKey(dto.getLine()); + return send(BusinessTopic.ASK_REAL_DATA_TOPIC,nodeId , askRealDataMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java index f0dd9c1..d5b9338 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java @@ -1,7 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; +import com.njcn.message.message.DeviceRebootMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +27,6 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); - baseMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage); + return send(BusinessTopic.CONTROL_TOPIC,nodeId, baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java index 155c510..8645108 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java @@ -1,7 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; +import com.njcn.message.message.ProcessRebootMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +27,8 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); - baseMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage); + ProcessRebootMessage dto = JSON.parseObject(baseMessage.getMessageBody(), ProcessRebootMessage.class); + baseMessage.setKey(dto.getIndex()+""); + return send(BusinessTopic.PROCESS_TOPIC,nodeId, baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java index 34df5ee..f05d3d4 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java @@ -24,7 +24,6 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage recallMessage,String nodeId) { recallMessage.setSource(BusinessResource.WEB_RESOURCE); - recallMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage); + return send(BusinessTopic.RECALL_TOPIC,nodeId , recallMessage); } }