Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37959a4a7e | ||
| 2d07a60001 | |||
|
|
0c8b17d336 | ||
|
|
88e0d71718 | ||
|
|
e982fa960e |
@@ -247,6 +247,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
|
|||||||
long differ = DateUtil.between(date, newDate, DateUnit.MINUTE);
|
long differ = DateUtil.between(date, newDate, DateUnit.MINUTE);
|
||||||
if (online.equals(type)) {
|
if (online.equals(type)) {
|
||||||
minute = InfluxDBPublicParam.DAY_MINUTE - (int) differ;
|
minute = InfluxDBPublicParam.DAY_MINUTE - (int) differ;
|
||||||
|
} else {
|
||||||
|
minute = (int) differ;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
List<PqsCommunicateDto> communicateDataOld = communicateFeignClient.getRawDataEnd(lineParam).getData();
|
List<PqsCommunicateDto> communicateDataOld = communicateFeignClient.getRawDataEnd(lineParam).getData();
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ public class DataVDTO {
|
|||||||
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
|
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
|
||||||
@JsonSerialize(using = LocalDateTimeSerializer.class)
|
@JsonSerialize(using = LocalDateTimeSerializer.class)
|
||||||
private LocalDateTime timeid;
|
private LocalDateTime timeid;
|
||||||
|
private String devId;
|
||||||
private String lineid;
|
private String lineid;
|
||||||
private String phasicType;
|
private String phasicType;
|
||||||
private Double rms;
|
private Double rms;
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ public class PqsCommunicateDto {
|
|||||||
|
|
||||||
private String description;
|
private String description;
|
||||||
|
|
||||||
|
//1是在线 0是离线
|
||||||
private Integer type;
|
private Integer type;
|
||||||
//是否更新updateTime标志;数据上送更新1,状态翻转不更新0
|
//是否更新updateTime标志;数据上送更新1,状态翻转不更新0
|
||||||
private Integer flag=0;
|
private Integer flag=0;
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import org.springframework.util.CollectionUtils;
|
|||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -84,16 +85,18 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
//更新mysqldevice表最新数据时间
|
//更新mysqldevice表最新数据时间
|
||||||
|
|
||||||
if(!CollectionUtils.isEmpty(lnDataDTO.getDataVList())){
|
if(!CollectionUtils.isEmpty(lnDataDTO.getDataVList())){
|
||||||
DataVDTO dataVDTO =lnDataDTO.getDataVList().stream().max(Comparator.comparing(DataVDTO::getTimeid)).get();
|
|
||||||
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
List<String> lineIdList = lnDataDTO.getDataVList().stream().map(DataVDTO::getLineid).distinct().collect(Collectors.toList());
|
||||||
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
List<LineDeviceStateVO> data = lineFeignClient.getAllLine(lineIdList).getData();
|
||||||
//获取所有设备id并去重
|
Map<String, String> map = data.stream().collect(Collectors.toMap(LineDeviceStateVO::getId, temp -> temp.getPids().split(",")[4]));
|
||||||
List<String> devIds = data.stream().map(temp -> {
|
lnDataDTO.getDataVList().forEach(temp->{
|
||||||
return temp.getPids().split(",")[4];
|
temp.setDevId(map.get(temp.getLineid()));
|
||||||
}).distinct().collect(Collectors.toList());
|
});
|
||||||
|
Map<String, List<DataVDTO>> collect = lnDataDTO.getDataVList().stream().collect(Collectors.groupingBy(DataVDTO::getDevId));
|
||||||
devIds.forEach(temp->{
|
collect.forEach((temp,dataVDTOList)->{
|
||||||
PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto();
|
PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto();
|
||||||
|
DataVDTO dataVDTO =dataVDTOList.stream().max(Comparator.comparing(DataVDTO::getTimeid)).get();
|
||||||
|
|
||||||
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN));
|
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN));
|
||||||
pqsCommunicateDto.setDevId(temp);
|
pqsCommunicateDto.setDevId(temp);
|
||||||
pqsCommunicateDto.setType(1);
|
pqsCommunicateDto.setType(1);
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.common.LogEnum;
|
|||||||
import com.njcn.common.pojo.response.HttpResult;
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
|
||||||
import com.njcn.message.api.fallback.ProduceFeignClientFallbackFactory;
|
import com.njcn.message.api.fallback.ProduceFeignClientFallbackFactory;
|
||||||
|
import com.njcn.message.message.AskFileSysMessage;
|
||||||
import com.njcn.message.message.DeviceRebootMessage;
|
import com.njcn.message.message.DeviceRebootMessage;
|
||||||
import com.njcn.message.message.ProcessRebootMessage;
|
import com.njcn.message.message.ProcessRebootMessage;
|
||||||
import com.njcn.message.message.RecallMessage;
|
import com.njcn.message.message.RecallMessage;
|
||||||
@@ -38,4 +39,7 @@ public interface ProduceFeignClient {
|
|||||||
|
|
||||||
public HttpResult<Boolean> askRestartProcess(@RequestBody ProcessRebootMessage message);
|
public HttpResult<Boolean> askRestartProcess(@RequestBody ProcessRebootMessage message);
|
||||||
|
|
||||||
|
@PostMapping("/askFileSys")
|
||||||
|
public HttpResult<String> askFileSys(@RequestBody AskFileSysMessage message);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException;
|
|||||||
import com.njcn.common.pojo.response.HttpResult;
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
|
||||||
import com.njcn.message.api.ProduceFeignClient;
|
import com.njcn.message.api.ProduceFeignClient;
|
||||||
|
import com.njcn.message.message.AskFileSysMessage;
|
||||||
import com.njcn.message.message.DeviceRebootMessage;
|
import com.njcn.message.message.DeviceRebootMessage;
|
||||||
import com.njcn.message.message.ProcessRebootMessage;
|
import com.njcn.message.message.ProcessRebootMessage;
|
||||||
import com.njcn.message.message.RecallMessage;
|
import com.njcn.message.message.RecallMessage;
|
||||||
@@ -53,6 +54,12 @@ public class ProduceFeignClientFallbackFactory implements FallbackFactory<Produc
|
|||||||
log.error("{}异常,降级处理,异常为:{}", "进程重启", throwable.toString());
|
log.error("{}异常,降级处理,异常为:{}", "进程重启", throwable.toString());
|
||||||
throw new BusinessException(finalExceptionEnum);
|
throw new BusinessException(finalExceptionEnum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpResult<String> askFileSys(AskFileSysMessage message) {
|
||||||
|
log.error("{}异常,降级处理,异常为:{}", "请求装置文件系统", throwable.toString());
|
||||||
|
throw new BusinessException(finalExceptionEnum);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,9 @@ public interface BusinessTopic {
|
|||||||
String ASK_REAL_DATA_TOPIC = "ask_real_data_topic";
|
String ASK_REAL_DATA_TOPIC = "ask_real_data_topic";
|
||||||
String RECALL_TOPIC = "recall_Topic";
|
String RECALL_TOPIC = "recall_Topic";
|
||||||
|
|
||||||
|
String FILE_TOPIC = "File_Topic";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/********************************数据中心*********************************/
|
/********************************数据中心*********************************/
|
||||||
|
|||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.njcn.message.message;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 10:15【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class AskFileSysMessage {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* guid : sdghsfdhfdhdfhdfghd234234534534
|
||||||
|
* frontid : dhdfhdfghd2342
|
||||||
|
* ProcessNo : 1
|
||||||
|
* devid : 167456737637374567
|
||||||
|
* type : 2
|
||||||
|
* Path : /remote/vol1_stat.txt
|
||||||
|
* DevPath : /etc/vol1_stat.txt
|
||||||
|
*/
|
||||||
|
|
||||||
|
private String guid;
|
||||||
|
private String nodeId;
|
||||||
|
private int processNo;
|
||||||
|
private String devId;
|
||||||
|
private int type;
|
||||||
|
private String path;
|
||||||
|
private String remotePath;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.njcn.message.messagedto;
|
||||||
|
|
||||||
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2026/04/29 下午 3:16【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class FileSysDTO<T> extends BaseMessage implements Serializable {
|
||||||
|
private String guid;
|
||||||
|
private String nodeId;
|
||||||
|
private int processNo;
|
||||||
|
private String devId;
|
||||||
|
private int type;
|
||||||
|
private String path;
|
||||||
|
private String devPath;
|
||||||
|
private int result;
|
||||||
|
private T detail;
|
||||||
|
}
|
||||||
@@ -33,7 +33,7 @@ import java.util.Objects;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "Device_Run_Flag_Topic",
|
topic = "Device_Run_Flag_Topic",
|
||||||
consumerGroup = "Device_Run_Flag_Consumer",
|
consumerGroup = "Device_Run_Flag_Consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -0,0 +1,151 @@
|
|||||||
|
package com.njcn.message.consumer;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.njcn.message.constant.MessageStatus;
|
||||||
|
import com.njcn.message.constant.RedisKeyPrefix;
|
||||||
|
import com.njcn.message.messagedto.FileSysDTO;
|
||||||
|
import com.njcn.message.messagedto.MessageDataDTO;
|
||||||
|
import com.njcn.message.websocket.WebSocketServer;
|
||||||
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
|
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 10:06【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
topic = "File_Reply_Topic",
|
||||||
|
consumerGroup = "file_sys_consumer",
|
||||||
|
consumeThreadNumber = 10,
|
||||||
|
enableMsgTrace = true
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class FileSysDataConsumer extends EnhanceConsumerMessageHandler<FileSysDTO> implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
@Resource
|
||||||
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) {
|
||||||
|
|
||||||
|
FileSysDTO messageDataDTO = JSONObject.parseObject(message, FileSysDTO.class);
|
||||||
|
super.dispatchMessage(messageDataDTO);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 通过redis分布式锁判断当前消息所处状态
|
||||||
|
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||||
|
* 2、fail 上次消息消费时发生异常,放行
|
||||||
|
* 3、being processed 正在处理,打回去
|
||||||
|
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean filter(FileSysDTO message) {
|
||||||
|
String keyStatus = redisUtil.getStringByKey(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()));
|
||||||
|
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||||
|
|
||||||
|
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 消费成功,缓存到redis5分钟,避免重复消费
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void consumeSuccess(FileSysDTO message) {
|
||||||
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handleMessage(FileSysDTO message) {
|
||||||
|
|
||||||
|
String msgId = message.getGuid();
|
||||||
|
// 1. 将结果存入 Redis,过期时间 60 秒(足够完成请求)
|
||||||
|
String key = "pending:" + msgId;
|
||||||
|
redisUtil.saveByKeyWithExpire(key, message, 60L);
|
||||||
|
// 2. 发布 Redis 通知,让等待的实例知道结果已就绪
|
||||||
|
stringRedisTemplate.convertAndSend("result_ready_channel", msgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发生异常时,进行错误信息入库保存
|
||||||
|
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void saveExceptionMsgLog(FileSysDTO message, String identity, Exception exception) {
|
||||||
|
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||||
|
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||||
|
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||||
|
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||||
|
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||||
|
String exceptionMsg = exception.getMessage();
|
||||||
|
if(exceptionMsg.length() > 200){
|
||||||
|
exceptionMsg = exceptionMsg.substring(0,180);
|
||||||
|
}
|
||||||
|
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||||
|
//如果是当前消息重试的则略过
|
||||||
|
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||||
|
//单次消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||||
|
//重试N次后,依然消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 处理失败后,是否重试
|
||||||
|
* 一般开启
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean isRetry() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean throwException() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "LN_Topic",
|
topic = "LN_Topic",
|
||||||
consumerGroup = "ln_consumer",
|
consumerGroup = "ln_consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ import java.util.Objects;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "Heart_Beat_Topic",
|
topic = "Heart_Beat_Topic",
|
||||||
consumerGroup = "Heartb_Beat_Topic_Consumer",
|
consumerGroup = "Heartb_Beat_Topic_Consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ import java.util.Objects;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "Real_Time_Data_Topic",
|
topic = "Real_Time_Data_Topic",
|
||||||
consumerGroup = "real_time_consumer",
|
consumerGroup = "real_time_consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ import java.util.Objects;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "log_Topic",
|
topic = "log_Topic",
|
||||||
consumerGroup = "Log_Topic_Consumer",
|
consumerGroup = "Log_Topic_Consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ import java.util.Objects;
|
|||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = "Topic_Reply_Topic",
|
topic = "Topic_Reply_Topic",
|
||||||
consumerGroup = "Topic_Reply_Topic_Consumer",
|
consumerGroup = "Topic_Reply_Topic_Consumer",
|
||||||
selectorExpression = "Test_Tag||Test_Keys",
|
selectorExpression = "*",
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -7,9 +7,11 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
|||||||
import com.njcn.common.pojo.response.HttpResult;
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
import com.njcn.common.utils.HttpResultUtil;
|
import com.njcn.common.utils.HttpResultUtil;
|
||||||
|
|
||||||
|
import com.njcn.message.message.AskFileSysMessage;
|
||||||
import com.njcn.message.message.DeviceRebootMessage;
|
import com.njcn.message.message.DeviceRebootMessage;
|
||||||
import com.njcn.message.message.ProcessRebootMessage;
|
import com.njcn.message.message.ProcessRebootMessage;
|
||||||
import com.njcn.message.message.RecallMessage;
|
import com.njcn.message.message.RecallMessage;
|
||||||
|
import com.njcn.message.produce.template.AskFileSysMessaggeTemplate;
|
||||||
import com.njcn.message.produce.template.DeviceRebootMessageTemplate;
|
import com.njcn.message.produce.template.DeviceRebootMessageTemplate;
|
||||||
import com.njcn.message.produce.template.ProcessRebootMessageTemplate;
|
import com.njcn.message.produce.template.ProcessRebootMessageTemplate;
|
||||||
import com.njcn.message.produce.template.RecallMessaggeTemplate;
|
import com.njcn.message.produce.template.RecallMessaggeTemplate;
|
||||||
@@ -40,6 +42,7 @@ public class ProduceController extends BaseController {
|
|||||||
|
|
||||||
private final ProcessRebootMessageTemplate processRebootMessageTemplate;
|
private final ProcessRebootMessageTemplate processRebootMessageTemplate;
|
||||||
|
|
||||||
|
private final AskFileSysMessaggeTemplate askFileSysMessaggeTemplate;
|
||||||
|
|
||||||
@PostMapping("/recall")
|
@PostMapping("/recall")
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@@ -76,4 +79,14 @@ public class ProduceController extends BaseController {
|
|||||||
processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
|
processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
|
||||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/askFileSys")
|
||||||
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
|
@ApiOperation("请求文件系统")
|
||||||
|
@ApiImplicitParam(name = "message", value = "参数", required = true)
|
||||||
|
public HttpResult<String> askFileSys(@RequestBody AskFileSysMessage message){
|
||||||
|
String methodDescribe = getMethodDescribe("askFileSys");
|
||||||
|
SendResult sendResult = askFileSysMessaggeTemplate.sendMember(message);
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,message.getGuid() , methodDescribe);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package com.njcn.message.produce.template;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.njcn.message.constant.BusinessResource;
|
||||||
|
import com.njcn.message.constant.BusinessTopic;
|
||||||
|
import com.njcn.message.message.AskFileSysMessage;
|
||||||
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
|
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 15:15【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class AskFileSysMessaggeTemplate extends RocketMQEnhanceTemplate {
|
||||||
|
public AskFileSysMessaggeTemplate(RocketMQTemplate template) {
|
||||||
|
super(template);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SendResult sendMember(AskFileSysMessage askRealDataMessage) {
|
||||||
|
BaseMessage baseMessage = new BaseMessage();
|
||||||
|
baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage));
|
||||||
|
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||||
|
baseMessage.setKey(askRealDataMessage.getGuid());
|
||||||
|
return send(BusinessTopic.FILE_TOPIC,askRealDataMessage.getNodeId() , baseMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
package com.njcn.message.produce.template;
|
package com.njcn.message.produce.template;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.njcn.message.constant.BusinessResource;
|
import com.njcn.message.constant.BusinessResource;
|
||||||
import com.njcn.message.constant.BusinessTopic;
|
import com.njcn.message.constant.BusinessTopic;
|
||||||
|
import com.njcn.message.message.AskRealDataMessage;
|
||||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
@@ -24,7 +26,8 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate {
|
|||||||
|
|
||||||
public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) {
|
public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) {
|
||||||
askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE);
|
askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||||
askRealDataMessage.setKey("Test_Keys");
|
AskRealDataMessage dto = JSON.parseObject(askRealDataMessage.getMessageBody(), AskRealDataMessage.class);
|
||||||
return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage);
|
askRealDataMessage.setKey(dto.getLine());
|
||||||
|
return send(BusinessTopic.ASK_REAL_DATA_TOPIC,nodeId , askRealDataMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
package com.njcn.message.produce.template;
|
package com.njcn.message.produce.template;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.njcn.message.constant.BusinessResource;
|
import com.njcn.message.constant.BusinessResource;
|
||||||
import com.njcn.message.constant.BusinessTopic;
|
import com.njcn.message.constant.BusinessTopic;
|
||||||
|
import com.njcn.message.message.AskRealDataMessage;
|
||||||
|
import com.njcn.message.message.DeviceRebootMessage;
|
||||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
@@ -24,7 +27,6 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate {
|
|||||||
|
|
||||||
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
|
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
|
||||||
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||||
baseMessage.setKey("Test_Keys");
|
return send(BusinessTopic.CONTROL_TOPIC,nodeId, baseMessage);
|
||||||
return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
package com.njcn.message.produce.template;
|
package com.njcn.message.produce.template;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.njcn.message.constant.BusinessResource;
|
import com.njcn.message.constant.BusinessResource;
|
||||||
import com.njcn.message.constant.BusinessTopic;
|
import com.njcn.message.constant.BusinessTopic;
|
||||||
|
import com.njcn.message.message.AskRealDataMessage;
|
||||||
|
import com.njcn.message.message.ProcessRebootMessage;
|
||||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
@@ -24,7 +27,8 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate {
|
|||||||
|
|
||||||
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
|
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
|
||||||
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||||
baseMessage.setKey("Test_Keys");
|
ProcessRebootMessage dto = JSON.parseObject(baseMessage.getMessageBody(), ProcessRebootMessage.class);
|
||||||
return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage);
|
baseMessage.setKey(dto.getIndex()+"");
|
||||||
|
return send(BusinessTopic.PROCESS_TOPIC,nodeId, baseMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate {
|
|||||||
|
|
||||||
public SendResult sendMember(BaseMessage recallMessage,String nodeId) {
|
public SendResult sendMember(BaseMessage recallMessage,String nodeId) {
|
||||||
recallMessage.setSource(BusinessResource.WEB_RESOURCE);
|
recallMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||||
recallMessage.setKey("Test_Keys");
|
return send(BusinessTopic.RECALL_TOPIC,nodeId , recallMessage);
|
||||||
return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,16 +19,13 @@ spring:
|
|||||||
cloud:
|
cloud:
|
||||||
nacos:
|
nacos:
|
||||||
discovery:
|
discovery:
|
||||||
ip: @service.server.url@
|
|
||||||
server-addr: @nacos.url@
|
server-addr: @nacos.url@
|
||||||
namespace: @nacos.namespace@
|
namespace: @nacos.namespace@
|
||||||
username: @nacos.username@
|
|
||||||
password: @nacos.password@
|
|
||||||
config:
|
config:
|
||||||
server-addr: @nacos.url@
|
|
||||||
namespace: @nacos.namespace@
|
|
||||||
username: @nacos.username@
|
username: @nacos.username@
|
||||||
password: @nacos.password@
|
password: @nacos.password@
|
||||||
|
server-addr: @nacos.url@
|
||||||
|
namespace: @nacos.namespace@
|
||||||
file-extension: yaml
|
file-extension: yaml
|
||||||
shared-configs:
|
shared-configs:
|
||||||
- data-id: share-config.yaml
|
- data-id: share-config.yaml
|
||||||
|
|||||||
Reference in New Issue
Block a user