Compare commits
5 Commits
fd398a85de
...
2026-02
| Author | SHA1 | Date | |
|---|---|---|---|
| d3e46f5ba4 | |||
| 35939e6f8f | |||
| ba1f5a2e00 | |||
|
|
97e3386d53 | ||
| 4f8c34d21b |
@@ -4,26 +4,29 @@ import com.njcn.device.biz.pojo.po.Overlimit;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public interface IDataLimitRateAsync {
|
public interface IDataLimitRateAsync {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* limit_rate多线程算法
|
* limit_rate多线程算法
|
||||||
|
*
|
||||||
* @param dataDate
|
* @param dataDate
|
||||||
* @param list
|
* @param list
|
||||||
* @param phase
|
* @param phase
|
||||||
* @param overLimitMap
|
* @param overLimitMap
|
||||||
* @param size
|
* @param size
|
||||||
* @param i
|
* @param i
|
||||||
* @param type 系统类型
|
* @param type 系统类型
|
||||||
|
* @return
|
||||||
* @Author: wr
|
* @Author: wr
|
||||||
* @Date: 2025/12/17 12:16
|
* @Date: 2025/12/17 12:16
|
||||||
*/
|
*/
|
||||||
void lineDataRate(String dataDate,
|
CompletableFuture<Void> lineDataRate(String dataDate,
|
||||||
List<String> list,
|
List<String> list,
|
||||||
List<String> phase,
|
List<String> phase,
|
||||||
Map<String, Overlimit> overLimitMap,
|
Map<String, Overlimit> overLimitMap,
|
||||||
int size,
|
int size,
|
||||||
int i,
|
int i,
|
||||||
int type);
|
int type);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -108,11 +109,26 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
List<List<String>> pendingIds = ListUtils.partition(lineIds, 1);
|
List<List<String>> pendingIds = ListUtils.partition(lineIds, 1);
|
||||||
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
|
||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
|
|
||||||
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||||
for (int i = 0; i < pendingIds.size(); i++) {
|
for (int i = 0; i < pendingIds.size(); i++) {
|
||||||
logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
|
logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
|
||||||
List<String> list = pendingIds.get(i);
|
List<String> list = pendingIds.get(i);
|
||||||
dataLimitRateAsync.lineDataRate(calculatedParam.getDataDate(), list, phase, overLimitMap, pendingIds.size(), (i + 1), lineParam.getType());
|
// 获取Future
|
||||||
|
CompletableFuture<Void> future = dataLimitRateAsync.lineDataRate(
|
||||||
|
calculatedParam.getDataDate(),
|
||||||
|
list,
|
||||||
|
phase,
|
||||||
|
overLimitMap,
|
||||||
|
pendingIds.size(),
|
||||||
|
(i + 1),
|
||||||
|
lineParam.getType()
|
||||||
|
);
|
||||||
|
|
||||||
|
futures.add(future);
|
||||||
}
|
}
|
||||||
|
// 等待所有任务完成
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
System.gc();
|
System.gc();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils;
|
|||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -56,13 +57,13 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Async("asyncExecutor")
|
@Async("asyncExecutor")
|
||||||
public void lineDataRate(String dataDate,
|
public CompletableFuture<Void> lineDataRate(String dataDate,
|
||||||
List<String> list,
|
List<String> list,
|
||||||
List<String> phase,
|
List<String> phase,
|
||||||
Map<String, Overlimit> overLimitMap,
|
Map<String, Overlimit> overLimitMap,
|
||||||
int size,
|
int size,
|
||||||
int i,
|
int i,
|
||||||
int type) {
|
int type) {
|
||||||
List<DataLimitDetailDto> result = new ArrayList<>();
|
List<DataLimitDetailDto> result = new ArrayList<>();
|
||||||
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
lineParam.setStartTime(TimeUtils.getBeginOfDay(dataDate));
|
lineParam.setStartTime(TimeUtils.getBeginOfDay(dataDate));
|
||||||
@@ -123,7 +124,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
*/
|
*/
|
||||||
Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream()
|
Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream()
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataHarmDto::getLineId));
|
.collect(Collectors.groupingBy(DataHarmDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -131,7 +132,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
*/
|
*/
|
||||||
Map<String, List<DataIDto>> dataI = dataIList.stream()
|
Map<String, List<DataIDto>> dataI = dataIList.stream()
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataIDto::getLineId));
|
.collect(Collectors.groupingBy(DataIDto::getLineId));
|
||||||
|
|
||||||
|
|
||||||
@@ -140,7 +141,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
*/
|
*/
|
||||||
Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream()
|
Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream()
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataHarmDto::getLineId));
|
.collect(Collectors.groupingBy(DataHarmDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -148,49 +149,57 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
*/
|
*/
|
||||||
Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream()
|
Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream()
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()))
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值
|
* 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值
|
||||||
*/
|
*/
|
||||||
Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream()
|
Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream()
|
||||||
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
|
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) ||
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()) ||
|
||||||
InfluxDBTableConstant.MAX.equals(x.getValueType()))
|
InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值
|
* 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值
|
||||||
*/
|
*/
|
||||||
Map<String, List<DataIDto>> dataINeg = dataIList.stream()
|
Map<String, List<DataIDto>> dataINeg = dataIList.stream()
|
||||||
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
|
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) ||
|
.filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()) ||
|
||||||
InfluxDBTableConstant.MAX.equals(x.getValueType()))
|
InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataIDto::getLineId));
|
.collect(Collectors.groupingBy(DataIDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值
|
* 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值
|
||||||
*/
|
*/
|
||||||
Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream()
|
Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream()
|
||||||
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType()))
|
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) ||
|
.filter(x -> InfluxDBTableConstant.MIN.equalsIgnoreCase(x.getValueType()) ||
|
||||||
InfluxDBTableConstant.MAX.equals(x.getValueType()))
|
InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
||||||
/**
|
/**
|
||||||
* 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值
|
* 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值
|
||||||
*/
|
*/
|
||||||
Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream()
|
Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream()
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
.filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType()))
|
.filter(x -> InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
|
||||||
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
.collect(Collectors.groupingBy(DataVDto::getLineId));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较)
|
* 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较)
|
||||||
*/
|
*/
|
||||||
Map<String, List<DataPltDto>> dataPlt = dataPltAllTime.stream()
|
Map<String, List<DataPltDto>> dataPlt;
|
||||||
.filter(x -> phase.contains(x.getPhasicType()))
|
if (type == 2) {
|
||||||
.collect(Collectors.groupingBy(DataPltDto::getLineId));
|
dataPlt = dataPltAllTime.stream()
|
||||||
|
.filter(x -> phase.contains(x.getPhasicType()))
|
||||||
|
.filter(x -> InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
|
||||||
|
.collect(Collectors.groupingBy(DataPltDto::getLineId));
|
||||||
|
} else {
|
||||||
|
dataPlt = dataPltAllTime.stream()
|
||||||
|
.filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType()))
|
||||||
|
.collect(Collectors.groupingBy(DataPltDto::getLineId));
|
||||||
|
}
|
||||||
|
|
||||||
for (String item : list) {
|
for (String item : list) {
|
||||||
if (ObjectUtil.isNotNull(overLimitMap.get(item))) {
|
if (ObjectUtil.isNotNull(overLimitMap.get(item))) {
|
||||||
@@ -229,6 +238,8 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
|
|||||||
MemorySizeUtil.getNowMemory();
|
MemorySizeUtil.getNowMemory();
|
||||||
}
|
}
|
||||||
System.gc();
|
System.gc();
|
||||||
|
|
||||||
|
return CompletableFuture.completedFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ public class PqsCommunicateDto {
|
|||||||
private String description;
|
private String description;
|
||||||
|
|
||||||
private Integer type;
|
private Integer type;
|
||||||
|
//是否更新updateTime标志;数据上送更新1,状态翻转不更新0
|
||||||
|
private Integer flag=0;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ public class LnDataDealServiceImpl implements LnDataDealService {
|
|||||||
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN));
|
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN));
|
||||||
pqsCommunicateDto.setDevId(temp);
|
pqsCommunicateDto.setDevId(temp);
|
||||||
pqsCommunicateDto.setType(1);
|
pqsCommunicateDto.setType(1);
|
||||||
|
pqsCommunicateDto.setFlag(1);
|
||||||
|
|
||||||
iPqsCommunicate.insertion(pqsCommunicateDto);
|
iPqsCommunicate.insertion(pqsCommunicateDto);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -129,7 +129,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
|
|||||||
//更新mysql数据
|
//更新mysql数据
|
||||||
DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
|
DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
|
||||||
devComFlagDTO.setId(pqsCommunicateDto.getDevId());
|
devComFlagDTO.setId(pqsCommunicateDto.getDevId());
|
||||||
devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER));
|
if(Objects.equals(pqsCommunicateDto.getFlag(),1)){
|
||||||
|
devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER));
|
||||||
|
}
|
||||||
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
devComFlagDTO.setStatus(pqsCommunicateDto.getType());
|
||||||
|
|
||||||
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
deviceFeignClient.updateDevComFlag(devComFlagDTO);
|
||||||
|
|||||||
Reference in New Issue
Block a user