4 Commits

Author SHA1 Message Date
wr
37959a4a7e 微调 2026-05-27 10:56:01 +08:00
2d07a60001 Merge pull request 'master-temp' (#2) from master-temp into master-jb
Reviewed-on: #2
2026-05-15 09:19:38 +08:00
hzj
0c8b17d336 装置文件系统管理 2026-05-15 09:12:46 +08:00
hzj
e982fa960e 冀北版本适配 2026-05-09 08:43:57 +08:00
21 changed files with 306 additions and 24 deletions

View File

@@ -85,7 +85,7 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
Map<String, List<LineDevGetDTO>> LineDevGetMap = devList.stream()
.filter(x -> devIdList.contains(x.getDevId()))
.collect(Collectors.groupingBy(LineDevGetDTO::getDevId));
if (ObjectUtil.isNotNull(calculatedParam.getType())&&calculatedParam.getType() == 1) {
if (ObjectUtil.isNotNull(calculatedParam.getType()) && calculatedParam.getType() == 1) {
LineDevGetMap.forEach((key, value) -> {
DataOnlineRateDto.Detail onlineRateDpo = new DataOnlineRateDto.Detail();
List<String> collect = value.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
@@ -130,9 +130,9 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
List<String> lineIds = value.stream().map(LineDevGetDTO::getPointId).collect(Collectors.toList());
lineParam.setLineId(lineIds);
List<DataIntegrityDto> integrityDS = dataIntegrityFeignClient.getRawData(lineParam).getData();
Double max=0.0;
if(CollUtil.isNotEmpty(integrityDS)){
List<Double> info=new ArrayList<>();
Double max = 0.0;
if (CollUtil.isNotEmpty(integrityDS)) {
List<Double> info = new ArrayList<>();
for (DataIntegrityDto integrityD : integrityDS) {
double realTime = integrityD.getRealTime();
double dueTime = integrityD.getDueTime();
@@ -144,7 +144,7 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
}
max = info.stream().max(Comparator.naturalOrder()).orElse(0.0);
}
int v = (int)Math.ceil(InfluxDBPublicParam.DAY_MINUTE * max);
int v = (int) Math.ceil(InfluxDBPublicParam.DAY_MINUTE * max);
onlineRateDpo.setOnlineMin(v);
onlineRateDpo.setOfflineMin(InfluxDBPublicParam.DAY_MINUTE - v);
list.add(onlineRateDpo);
@@ -247,6 +247,8 @@ public class IDataOnlineRateServiceImpl implements IDataOnlineRateService {
long differ = DateUtil.between(date, newDate, DateUnit.MINUTE);
if (online.equals(type)) {
minute = InfluxDBPublicParam.DAY_MINUTE - (int) differ;
} else {
minute = (int) differ;
}
} else {
List<PqsCommunicateDto> communicateDataOld = communicateFeignClient.getRawDataEnd(lineParam).getData();

View File

@@ -17,6 +17,7 @@ public class PqsCommunicateDto {
private String description;
//1是在线 0是离线
private Integer type;
//是否更新updateTime标志数据上送更新1状态翻转不更新0
private Integer flag=0;

View File

@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.api.fallback.ProduceFeignClientFallbackFactory;
import com.njcn.message.message.AskFileSysMessage;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.ProcessRebootMessage;
import com.njcn.message.message.RecallMessage;
@@ -38,4 +39,7 @@ public interface ProduceFeignClient {
public HttpResult<Boolean> askRestartProcess(@RequestBody ProcessRebootMessage message);
@PostMapping("/askFileSys")
public HttpResult<String> askFileSys(@RequestBody AskFileSysMessage message);
}

View File

@@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.api.ProduceFeignClient;
import com.njcn.message.message.AskFileSysMessage;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.ProcessRebootMessage;
import com.njcn.message.message.RecallMessage;
@@ -53,6 +54,12 @@ public class ProduceFeignClientFallbackFactory implements FallbackFactory<Produc
log.error("{}异常,降级处理,异常为:{}", "进程重启", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<String> askFileSys(AskFileSysMessage message) {
log.error("{}异常,降级处理,异常为:{}", "请求装置文件系统", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -37,6 +37,9 @@ public interface BusinessTopic {
String ASK_REAL_DATA_TOPIC = "ask_real_data_topic";
String RECALL_TOPIC = "recall_Topic";
String FILE_TOPIC = "File_Topic";
/********************************数据中心*********************************/

View File

@@ -0,0 +1,36 @@
package com.njcn.message.message;
import lombok.Data;
/**
* Description:
* Date: 2024/12/13 10:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class AskFileSysMessage {
/**
* guid : sdghsfdhfdhdfhdfghd234234534534
* frontid : dhdfhdfghd2342
* ProcessNo : 1
* devid : 167456737637374567
* type : 2
* Path : /remote/vol1_stat.txt
* DevPath : /etc/vol1_stat.txt
*/
private String guid;
private String nodeId;
private int processNo;
private String devId;
private int type;
private String path;
private String remotePath;
}

View File

@@ -0,0 +1,26 @@
package com.njcn.message.messagedto;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
/**
* Description:
* Date: 2026/04/29 下午 3:16【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class FileSysDTO<T> extends BaseMessage implements Serializable {
private String guid;
private String nodeId;
private int processNo;
private String devId;
private int type;
private String path;
private String devPath;
private int result;
private T detail;
}

View File

@@ -33,7 +33,7 @@ import java.util.Objects;
@RocketMQMessageListener(
topic = "Device_Run_Flag_Topic",
consumerGroup = "Device_Run_Flag_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -0,0 +1,151 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.constant.RedisKeyPrefix;
import com.njcn.message.messagedto.FileSysDTO;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.message.websocket.WebSocketServer;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "File_Reply_Topic",
consumerGroup = "file_sys_consumer",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class FileSysDataConsumer extends EnhanceConsumerMessageHandler<FileSysDTO> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Override
public void onMessage(String message) {
FileSysDTO messageDataDTO = JSONObject.parseObject(message, FileSysDTO.class);
super.dispatchMessage(messageDataDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(FileSysDTO message) {
String keyStatus = redisUtil.getStringByKey(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis5分钟避免重复消费
*/
@Override
protected void consumeSuccess(FileSysDTO message) {
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
}
@Override
protected void handleMessage(FileSysDTO message) {
String msgId = message.getGuid();
// 1. 将结果存入 Redis过期时间 60 秒(足够完成请求)
String key = "pending:" + msgId;
redisUtil.saveByKeyWithExpire(key, message, 60L);
// 2. 发布 Redis 通知,让等待的实例知道结果已就绪
stringRedisTemplate.convertAndSend("result_ready_channel", msgId);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(FileSysDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}

View File

@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
@RocketMQMessageListener(
topic = "LN_Topic",
consumerGroup = "ln_consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -33,7 +33,7 @@ import java.util.Objects;
@RocketMQMessageListener(
topic = "Heart_Beat_Topic",
consumerGroup = "Heartb_Beat_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -31,7 +31,7 @@ import java.util.Objects;
@RocketMQMessageListener(
topic = "Real_Time_Data_Topic",
consumerGroup = "real_time_consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -33,7 +33,7 @@ import java.util.Objects;
@RocketMQMessageListener(
topic = "log_Topic",
consumerGroup = "Log_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -32,7 +32,7 @@ import java.util.Objects;
@RocketMQMessageListener(
topic = "Topic_Reply_Topic",
consumerGroup = "Topic_Reply_Topic_Consumer",
selectorExpression = "Test_Tag||Test_Keys",
selectorExpression = "*",
consumeThreadNumber = 10,
enableMsgTrace = true
)

View File

@@ -7,9 +7,11 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.message.message.AskFileSysMessage;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.ProcessRebootMessage;
import com.njcn.message.message.RecallMessage;
import com.njcn.message.produce.template.AskFileSysMessaggeTemplate;
import com.njcn.message.produce.template.DeviceRebootMessageTemplate;
import com.njcn.message.produce.template.ProcessRebootMessageTemplate;
import com.njcn.message.produce.template.RecallMessaggeTemplate;
@@ -40,6 +42,7 @@ public class ProduceController extends BaseController {
private final ProcessRebootMessageTemplate processRebootMessageTemplate;
private final AskFileSysMessaggeTemplate askFileSysMessaggeTemplate;
@PostMapping("/recall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@@ -76,4 +79,14 @@ public class ProduceController extends BaseController {
processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@PostMapping("/askFileSys")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("请求文件系统")
@ApiImplicitParam(name = "message", value = "参数", required = true)
public HttpResult<String> askFileSys(@RequestBody AskFileSysMessage message){
String methodDescribe = getMethodDescribe("askFileSys");
SendResult sendResult = askFileSysMessaggeTemplate.sendMember(message);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,message.getGuid() , methodDescribe);
}
}

View File

@@ -0,0 +1,34 @@
package com.njcn.message.produce.template;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.message.message.AskFileSysMessage;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
/**
* Description:
* Date: 2024/12/13 15:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
public class AskFileSysMessaggeTemplate extends RocketMQEnhanceTemplate {
public AskFileSysMessaggeTemplate(RocketMQTemplate template) {
super(template);
}
public SendResult sendMember(AskFileSysMessage askRealDataMessage) {
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage));
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey(askRealDataMessage.getGuid());
return send(BusinessTopic.FILE_TOPIC,askRealDataMessage.getNodeId() , baseMessage);
}
}

View File

@@ -1,8 +1,10 @@
package com.njcn.message.produce.template;
import com.alibaba.fastjson.JSON;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.message.message.AskRealDataMessage;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
@@ -24,7 +26,8 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate {
public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) {
askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE);
askRealDataMessage.setKey("Test_Keys");
return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage);
AskRealDataMessage dto = JSON.parseObject(askRealDataMessage.getMessageBody(), AskRealDataMessage.class);
askRealDataMessage.setKey(dto.getLine());
return send(BusinessTopic.ASK_REAL_DATA_TOPIC,nodeId , askRealDataMessage);
}
}

View File

@@ -1,7 +1,10 @@
package com.njcn.message.produce.template;
import com.alibaba.fastjson.JSON;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.message.message.AskRealDataMessage;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
@@ -24,7 +27,6 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate {
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey("Test_Keys");
return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage);
return send(BusinessTopic.CONTROL_TOPIC,nodeId, baseMessage);
}
}

View File

@@ -1,7 +1,10 @@
package com.njcn.message.produce.template;
import com.alibaba.fastjson.JSON;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.message.message.AskRealDataMessage;
import com.njcn.message.message.ProcessRebootMessage;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
@@ -24,7 +27,8 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate {
public SendResult sendMember(BaseMessage baseMessage,String nodeId) {
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey("Test_Keys");
return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage);
ProcessRebootMessage dto = JSON.parseObject(baseMessage.getMessageBody(), ProcessRebootMessage.class);
baseMessage.setKey(dto.getIndex()+"");
return send(BusinessTopic.PROCESS_TOPIC,nodeId, baseMessage);
}
}

View File

@@ -24,7 +24,6 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate {
public SendResult sendMember(BaseMessage recallMessage,String nodeId) {
recallMessage.setSource(BusinessResource.WEB_RESOURCE);
recallMessage.setKey("Test_Keys");
return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage);
return send(BusinessTopic.RECALL_TOPIC,nodeId , recallMessage);
}
}

View File

@@ -19,16 +19,13 @@ spring:
cloud:
nacos:
discovery:
ip: @service.server.url@
server-addr: @nacos.url@
namespace: @nacos.namespace@
username: @nacos.username@
password: @nacos.password@
config:
server-addr: @nacos.url@
namespace: @nacos.namespace@
username: @nacos.username@
password: @nacos.password@
server-addr: @nacos.url@
namespace: @nacos.namespace@
file-extension: yaml
shared-configs:
- data-id: share-config.yaml