新增云协议相关内容
This commit is contained in:
@@ -19,7 +19,6 @@ import com.njcn.csdevice.pojo.dto.DevDetailDTO;
|
||||
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
|
||||
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
|
||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.rt.pojo.dto.BaseRealDataSet;
|
||||
import com.njcn.user.api.AppUserFeignClient;
|
||||
@@ -122,12 +121,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
|
||||
}
|
||||
}
|
||||
//云前置设备心跳丢失处理
|
||||
if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){
|
||||
String node = expiredKey.split(":")[1];
|
||||
String nodeId = node.substring(0, node.length() - 1);
|
||||
int processNo = Integer.parseInt(node.substring(node.length() - 1));
|
||||
equipmentFeignClient.updateCldDevStatus(nodeId,processNo);
|
||||
}
|
||||
// if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){
|
||||
// String node = expiredKey.split(":")[1];
|
||||
// String nodeId = node.substring(0, node.length() - 1);
|
||||
// int processNo = Integer.parseInt(node.substring(node.length() - 1));
|
||||
// equipmentFeignClient.updateCldDevStatus(nodeId,processNo);
|
||||
// }
|
||||
}
|
||||
|
||||
//主任务
|
||||
|
||||
@@ -0,0 +1,143 @@
|
||||
package com.njcn.access.runner;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.njcn.access.service.ICsEquipmentDeliveryService;
|
||||
import com.njcn.csdevice.api.EquipmentFeignClient;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* @author xy
|
||||
* 定时轮询离线设备接入
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class CldHeartTimer implements ApplicationRunner {
|
||||
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
private static final long OUT_TIME = 120L;
|
||||
private final RedisUtil redisUtil;
|
||||
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
if (scheduler.isShutdown() || scheduler.isTerminated()) {
|
||||
scheduler = Executors.newScheduledThreadPool(1);
|
||||
}
|
||||
Runnable task = () -> {
|
||||
try {
|
||||
executeScheduledTask();
|
||||
}
|
||||
// 捕获所有Throwable,包括Error
|
||||
catch (Throwable t) {
|
||||
log.error("定时任务发生严重异常,尝试恢复", t);
|
||||
// 可以添加重启逻辑或告警
|
||||
}
|
||||
};
|
||||
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(task, OUT_TIME, OUT_TIME, TimeUnit.SECONDS);
|
||||
// 添加监控,如果任务被取消则重新调度
|
||||
monitorScheduledTask(future);
|
||||
}
|
||||
|
||||
//10分钟检查一下调度任务
|
||||
private void monitorScheduledTask(ScheduledFuture<?> future) {
|
||||
Thread monitorThread = new Thread(() -> {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
//每10分钟检查一次
|
||||
Thread.sleep(600000);
|
||||
if (future.isCancelled() || future.isDone()) {
|
||||
log.warn("定时任务被取消或完成,重新调度...");
|
||||
// 重新启动任务
|
||||
run(null);
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("监控线程被中断");
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
log.error("监控任务异常", e);
|
||||
}
|
||||
}
|
||||
}, "Schedule-Monitor-Thread");
|
||||
monitorThread.setDaemon(true);
|
||||
monitorThread.start();
|
||||
}
|
||||
|
||||
private void executeScheduledTask() {
|
||||
log.info("定时检查云前置心跳!");
|
||||
//获取在运设备的所有前置和进程号,循环查询redis里面是否存在,不存在则将所有前置下的设备翻转
|
||||
List<String> list = csEquipmentDeliveryService.getFrontAndProcess();
|
||||
if (CollUtil.isNotEmpty(list)) {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
List<List<String>> subLists = CollUtil.split(list, 2);
|
||||
List<Future<Void>> futures = new ArrayList<>();
|
||||
for (List<String> subList : subLists) {
|
||||
futures.add(executor.submit(() -> {
|
||||
try {
|
||||
accessDevSafely(subList);
|
||||
} catch (Exception e) {
|
||||
log.error("处理设备子列表异常", e);
|
||||
}
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
for (Future<Void> future : futures) {
|
||||
try {
|
||||
future.get(5, TimeUnit.MINUTES);
|
||||
} catch (TimeoutException e) {
|
||||
log.error("任务执行超时", e);
|
||||
} catch (Exception e) {
|
||||
log.error("任务执行异常", e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
try {
|
||||
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
executor.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//安全的accessDev版本
|
||||
private void accessDevSafely(List<String> list) {
|
||||
if (CollUtil.isNotEmpty(list)) {
|
||||
for (String item : list) {
|
||||
try {
|
||||
processSingleDevice(item);
|
||||
} catch (Exception e) {
|
||||
log.error("处理设备 {} 失败: {}", item, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processSingleDevice(String item) {
|
||||
Object object = redisUtil.getObjectByKey(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey() + item);
|
||||
if (Objects.isNull(object)) {
|
||||
String nodeId = item.substring(0, item.length() - 1);
|
||||
int processNo = Integer.parseInt(item.substring(item.length() - 1));
|
||||
equipmentFeignClient.updateCldDevStatus(nodeId,processNo);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -69,4 +69,9 @@ public interface ICsEquipmentDeliveryService extends IService<CsEquipmentDeliver
|
||||
* 获取离线、启用、客户端在线的装置
|
||||
*/
|
||||
List<CsEquipmentDeliveryPO> getOfflineDev();
|
||||
|
||||
/**
|
||||
* 获取在运且在线的装置 所属前置和进程号
|
||||
*/
|
||||
List<String> getFrontAndProcess();
|
||||
}
|
||||
|
||||
@@ -97,10 +97,10 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
|
||||
if (Objects.isNull(dictTreeVO)){
|
||||
throw new BusinessException(AccessResponseEnum.DEV_TYPE_NOT_FIND);
|
||||
} else if (Objects.equals(devType,"CLD")) {
|
||||
} else if (Objects.equals(devType,DicDataEnum.DEV_CLD.getCode())) {
|
||||
//查询是否已存在云前置模板
|
||||
LambdaQueryWrapper<CsDevModelPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(CsDevModelPO::getName,"CLD").eq(CsDevModelPO::getStatus,1);
|
||||
lambdaQueryWrapper.eq(CsDevModelPO::getName,DicDataEnum.DEV_CLD.getCode()).eq(CsDevModelPO::getStatus,1);
|
||||
List<CsDevModelPO> list = csDevModelMapper.selectList(lambdaQueryWrapper);
|
||||
if (CollectionUtil.isNotEmpty(list)) {
|
||||
throw new BusinessException(AccessResponseEnum.CLD_MODEL_EXIST);
|
||||
@@ -946,37 +946,38 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
}
|
||||
if(CollectionUtil.isNotEmpty(arrayList)) {
|
||||
csDataArrayService.addList(arrayList);
|
||||
List<CsGroup> ls = new ArrayList<>();
|
||||
List<CsGroArr> groArrList = new ArrayList<>();
|
||||
Map<String,List<CsDataArray>> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArray::getPid,LinkedHashMap::new,Collectors.toList()));
|
||||
setMap.forEach((k0,v0)->{
|
||||
AtomicReference<Integer> sort = new AtomicReference<>(0);
|
||||
Map<String,List<CsDataArray>> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArray::getAnotherName,LinkedHashMap::new,Collectors.toList()));
|
||||
map.forEach((k,v)->{
|
||||
//录入组数据
|
||||
String groupId = IdUtil.simpleUUID();
|
||||
CsGroup csGroup = new CsGroup();
|
||||
csGroup.setId(groupId);
|
||||
csGroup.setDataSetId(k0);
|
||||
csGroup.setGroupName(k);
|
||||
csGroup.setSort(sort.getAndSet(sort.get() + 1));
|
||||
csGroup.setIsShow(1);
|
||||
ls.add(csGroup);
|
||||
//录入组和指标关系
|
||||
v.forEach(item->{
|
||||
CsGroArr csGroArr = new CsGroArr();
|
||||
csGroArr.setGroupId(groupId);
|
||||
csGroArr.setArrayId(item.getId());
|
||||
groArrList.add(csGroArr);
|
||||
});
|
||||
});
|
||||
});
|
||||
if(CollectionUtil.isNotEmpty(ls)) {
|
||||
csGroupService.addList(ls);
|
||||
}
|
||||
if(CollectionUtil.isNotEmpty(groArrList)) {
|
||||
csGroArrService.addGroArrList(groArrList);
|
||||
}
|
||||
//物联这边没有分组的要求,这部分代码先注释,用能那边用到这个代码
|
||||
// List<CsGroup> ls = new ArrayList<>();
|
||||
// List<CsGroArr> groArrList = new ArrayList<>();
|
||||
// Map<String,List<CsDataArray>> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArray::getPid,LinkedHashMap::new,Collectors.toList()));
|
||||
// setMap.forEach((k0,v0)->{
|
||||
// AtomicReference<Integer> sort = new AtomicReference<>(0);
|
||||
// Map<String,List<CsDataArray>> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArray::getAnotherName,LinkedHashMap::new,Collectors.toList()));
|
||||
// map.forEach((k,v)->{
|
||||
// //录入组数据
|
||||
// String groupId = IdUtil.simpleUUID();
|
||||
// CsGroup csGroup = new CsGroup();
|
||||
// csGroup.setId(groupId);
|
||||
// csGroup.setDataSetId(k0);
|
||||
// csGroup.setGroupName(k);
|
||||
// csGroup.setSort(sort.getAndSet(sort.get() + 1));
|
||||
// csGroup.setIsShow(1);
|
||||
// ls.add(csGroup);
|
||||
// //录入组和指标关系
|
||||
// v.forEach(item->{
|
||||
// CsGroArr csGroArr = new CsGroArr();
|
||||
// csGroArr.setGroupId(groupId);
|
||||
// csGroArr.setArrayId(item.getId());
|
||||
// groArrList.add(csGroArr);
|
||||
// });
|
||||
// });
|
||||
// });
|
||||
// if(CollectionUtil.isNotEmpty(ls)) {
|
||||
// csGroupService.addList(ls);
|
||||
// }
|
||||
// if(CollectionUtil.isNotEmpty(groArrList)) {
|
||||
// csGroArrService.addGroArrList(groArrList);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.njcn.access.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.access.enums.AccessEnum;
|
||||
@@ -16,10 +18,8 @@ import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -125,6 +125,7 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
|
||||
.eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.OFFLINE.getCode())
|
||||
.eq(CsEquipmentDeliveryPO::getUsageStatus,1)
|
||||
.in(CsEquipmentDeliveryPO::getStatus, Arrays.asList(2,3))
|
||||
.isNull(CsEquipmentDeliveryPO::getNodeId)
|
||||
.list();
|
||||
if (CollUtil.isNotEmpty(list)) {
|
||||
list.forEach(item->{
|
||||
@@ -145,4 +146,18 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getFrontAndProcess() {
|
||||
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
|
||||
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
|
||||
wrapper.eq("usage_status", 1);
|
||||
wrapper.eq("run_status", 2);
|
||||
wrapper.isNotNull("node_id");
|
||||
return baseMapper.selectObjs(wrapper)
|
||||
.stream()
|
||||
.map(obj -> (String) obj)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user