7 Commits

15 changed files with 164 additions and 70 deletions

View File

@@ -123,9 +123,9 @@ public class ExecutionCenter extends BaseController {
startDate = DateUtil.offsetDay(startDate, 1); startDate = DateUtil.offsetDay(startDate, 1);
} }
calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN)); calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// CalculatedParam repairParam = BeanUtil.copyProperties(calculatedParam, CalculatedParam.class); CalculatedParam repairParam = BeanUtil.copyProperties(calculatedParam, CalculatedParam.class);
calculatedParam.setType(0); repairParam.setType(0);
flowService.execute2Resp(methodDescribe, calculatedParam); flowService.execute2Resp(methodDescribe, repairParam);
} }
} else { } else {
//非补招 //非补招

View File

@@ -4,11 +4,13 @@ import com.njcn.device.biz.pojo.po.Overlimit;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
public interface IDataLimitRateAsync { public interface IDataLimitRateAsync {
/** /**
* limit_rate多线程算法 * limit_rate多线程算法
*
* @param dataDate * @param dataDate
* @param list * @param list
* @param phase * @param phase
@@ -16,10 +18,11 @@ public interface IDataLimitRateAsync {
* @param size * @param size
* @param i * @param i
* @param type 系统类型 * @param type 系统类型
* @return
* @Author: wr * @Author: wr
* @Date: 2025/12/17 12:16 * @Date: 2025/12/17 12:16
*/ */
void lineDataRate(String dataDate, CompletableFuture<Void> lineDataRate(String dataDate,
List<String> list, List<String> list,
List<String> phase, List<String> phase,
Map<String, Overlimit> overLimitMap, Map<String, Overlimit> overLimitMap,

View File

@@ -58,7 +58,7 @@ public class DataComAssServiceImpl implements IDataComAssService {
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate())); lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate())); lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
List<String> lineIdList = calculatedParam.getIdList(); List<String> lineIdList = calculatedParam.getIdList();
getAbnormalData(lineParam); //getAbnormalData(lineParam);
for (String lineId : lineIdList) { for (String lineId : lineIdList) {
DataComassesDPO rStatComassesDpo = new DataComassesDPO(); DataComassesDPO rStatComassesDpo = new DataComassesDPO();
rStatComassesDpo.setTime(calculatedParam.getDataDate()); rStatComassesDpo.setTime(calculatedParam.getDataDate());

View File

@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -108,11 +109,26 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
List<List<String>> pendingIds = ListUtils.partition(lineIds, 1); List<List<String>> pendingIds = ListUtils.partition(lineIds, 1);
ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C); ArrayList<String> phase = ListUtil.toList(PhaseType.PHASE_A, PhaseType.PHASE_B, PhaseType.PHASE_C);
MemorySizeUtil.getNowMemory(); MemorySizeUtil.getNowMemory();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < pendingIds.size(); i++) { for (int i = 0; i < pendingIds.size(); i++) {
logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区"); logger.info(calculatedParam.getDataDate()+" 总分区数据:" + pendingIds.size() + "=====》当前第"+(i + 1)+"小分区");
List<String> list = pendingIds.get(i); List<String> list = pendingIds.get(i);
dataLimitRateAsync.lineDataRate(calculatedParam.getDataDate(), list, phase, overLimitMap, pendingIds.size(), (i + 1), lineParam.getType()); // 获取Future
CompletableFuture<Void> future = dataLimitRateAsync.lineDataRate(
calculatedParam.getDataDate(),
list,
phase,
overLimitMap,
pendingIds.size(),
(i + 1),
lineParam.getType()
);
futures.add(future);
} }
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
System.gc(); System.gc();
} }

View File

@@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -56,7 +57,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
@Override @Override
@Async("asyncExecutor") @Async("asyncExecutor")
public void lineDataRate(String dataDate, public CompletableFuture<Void> lineDataRate(String dataDate,
List<String> list, List<String> list,
List<String> phase, List<String> phase,
Map<String, Overlimit> overLimitMap, Map<String, Overlimit> overLimitMap,
@@ -123,7 +124,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
*/ */
Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream() Map<String, List<DataHarmDto>> harmRateV = dataVHarmList.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataHarmDto::getLineId)); .collect(Collectors.groupingBy(DataHarmDto::getLineId));
/** /**
@@ -131,7 +132,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
*/ */
Map<String, List<DataIDto>> dataI = dataIList.stream() Map<String, List<DataIDto>> dataI = dataIList.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataIDto::getLineId)); .collect(Collectors.groupingBy(DataIDto::getLineId));
@@ -140,7 +141,7 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
*/ */
Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream() Map<String, List<DataHarmDto>> inHarmV = dataVInHarmList.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataHarmDto::getLineId)); .collect(Collectors.groupingBy(DataHarmDto::getLineId));
/** /**
@@ -148,49 +149,57 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
*/ */
Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream() Map<String, List<DataVDto>> dataVThd = dataVAllTime.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType())) .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataVDto::getLineId)); .collect(Collectors.groupingBy(DataVDto::getLineId));
/** /**
* 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值 * 功能描述:获取influxDB -> data_v -> 负序电压不平衡度 -> 最大值 && 日95%概率值
*/ */
Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream() Map<String, List<DataVDto>> dataVUnbalance = dataVAllTime.stream()
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()) ||
InfluxDBTableConstant.MAX.equals(x.getValueType())) InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataVDto::getLineId)); .collect(Collectors.groupingBy(DataVDto::getLineId));
/** /**
* 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值 * 功能描述:获取influxDB -> data_i -> 负序电流 -> 最大值 && 日95%概率值
*/ */
Map<String, List<DataIDto>> dataINeg = dataIList.stream() Map<String, List<DataIDto>> dataINeg = dataIList.stream()
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.CP95.equals(x.getValueType()) || .filter(x -> InfluxDBTableConstant.CP95.equalsIgnoreCase(x.getValueType()) ||
InfluxDBTableConstant.MAX.equals(x.getValueType())) InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataIDto::getLineId)); .collect(Collectors.groupingBy(DataIDto::getLineId));
/** /**
* 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值 * 功能描述:获取influxDB -> data_v -> 频率偏差 -> 最大值 && 最小值
*/ */
Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream() Map<String, List<DataVDto>> dataVFreq = dataVAllTime.stream()
.filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equals(x.getPhasicType())) .filter(x -> InfluxDBTableConstant.PHASE_TYPE_T.equalsIgnoreCase(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.MIN.equals(x.getValueType()) || .filter(x -> InfluxDBTableConstant.MIN.equalsIgnoreCase(x.getValueType()) ||
InfluxDBTableConstant.MAX.equals(x.getValueType())) InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataVDto::getLineId)); .collect(Collectors.groupingBy(DataVDto::getLineId));
/** /**
* 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值 * 功能描述:获取influxDB -> data_v -> 电压偏差 -> 最大值
*/ */
Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream() Map<String, List<DataVDto>> dataVDev = dataVAllTime.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.MAX.equals(x.getValueType())) .filter(x -> InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataVDto::getLineId)); .collect(Collectors.groupingBy(DataVDto::getLineId));
/** /**
* 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较) * 功能描述:获取influxDB -> data_plt -> 长时间闪变 -> 注(取最大值原始算法去掉了,现没有根据最大值比较)
*/ */
Map<String, List<DataPltDto>> dataPlt = dataPltAllTime.stream() Map<String, List<DataPltDto>> dataPlt;
if (type == 2) {
dataPlt = dataPltAllTime.stream()
.filter(x -> phase.contains(x.getPhasicType())) .filter(x -> phase.contains(x.getPhasicType()))
.filter(x -> InfluxDBTableConstant.MAX.equalsIgnoreCase(x.getValueType()))
.collect(Collectors.groupingBy(DataPltDto::getLineId)); .collect(Collectors.groupingBy(DataPltDto::getLineId));
} else {
dataPlt = dataPltAllTime.stream()
.filter(x -> PhaseType.PHASE_A.equals(x.getPhasicType()))
.collect(Collectors.groupingBy(DataPltDto::getLineId));
}
for (String item : list) { for (String item : list) {
if (ObjectUtil.isNotNull(overLimitMap.get(item))) { if (ObjectUtil.isNotNull(overLimitMap.get(item))) {
@@ -229,6 +238,8 @@ public class IDataLimitRateAsyncImpl implements IDataLimitRateAsync {
MemorySizeUtil.getNowMemory(); MemorySizeUtil.getNowMemory();
} }
System.gc(); System.gc();
return CompletableFuture.completedFuture(null);
} }
/** /**

View File

@@ -80,19 +80,21 @@ public class PollutionCalcImpl implements IPollutionCalc {
LineDevGetDTO line = lineDetailMap.get(id); LineDevGetDTO line = lineDetailMap.get(id);
if (limitMap.containsKey(id)) { if (limitMap.containsKey(id)) {
Overlimit overlimit = limitMap.get(id); Overlimit overlimit = limitMap.get(id);
lineParam.setValueType(Arrays.asList(line.getTimeInterval() + ""));
lineParam.setLineId(Arrays.asList(id));
if(!dataVFeignClient.excludeZeroData(lineParam).getData()){
dataPollutionD = new DataPollutionD(); dataPollutionD = new DataPollutionD();
dataPollutionD.setLineId(id); dataPollutionD.setLineId(id);
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate())); dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));
dataPollutionD.setPollutionType(vHarmonicLimit); dataPollutionD.setPollutionType(vHarmonicLimit);
lineParam.setValueType(Arrays.asList(line.getTimeInterval() + ""));
lineParam.setLineId(Arrays.asList(id));
List<DataVDto> dataVDtoList = dataVFeignClient.getGroupByTimeDataV(lineParam).getData(); List<DataVDto> dataVDtoList = dataVFeignClient.getGroupByTimeDataV(lineParam).getData();
List<DataHarmDto> dataHarmDtoList = dataHarmRateVFeignClient.getGroupByTimeHarmRateV(lineParam).getData(); List<DataHarmDto> dataHarmDtoList = dataHarmRateVFeignClient.getGroupByTimeHarmRateV(lineParam).getData();
//计算谐波电压污染值 //计算谐波电压污染值
dataPollutionD.setValue(PubUtils.doubleRound(2, calcVAllPollutionValue(dataVDtoList, dataHarmDtoList, overlimit) * line.getTimeInterval())); dataPollutionD.setValue(PubUtils.doubleRound(2, calcVAllPollutionValue(dataVDtoList, dataHarmDtoList, overlimit) * line.getTimeInterval()));
list.add(dataPollutionD); list.add(dataPollutionD);
}
dataPollutionD = new DataPollutionD(); dataPollutionD = new DataPollutionD();
dataPollutionD.setLineId(id); dataPollutionD.setLineId(id);
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate())); dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));

View File

@@ -68,4 +68,8 @@ public interface DataVFeignClient {
//按时间分组获取原始数据 //按时间分组获取原始数据
@PostMapping("/getGroupByTimeDataV") @PostMapping("/getGroupByTimeDataV")
HttpResult<List<DataVDto>> getGroupByTimeDataV(@RequestBody LineCountEvaluateParam lineParam); HttpResult<List<DataVDto>> getGroupByTimeDataV(@RequestBody LineCountEvaluateParam lineParam);
@PostMapping("/excludeZeroData")
HttpResult<Boolean> excludeZeroData(@RequestBody LineCountEvaluateParam lineParam);
} }

View File

@@ -116,6 +116,12 @@ public class DataVFeignClientFallbackFactory implements FallbackFactory<DataVFei
log.error("{}异常,降级处理,异常为:{}","DataV按时间分组获取原始数据",cause.toString()); log.error("{}异常,降级处理,异常为:{}","DataV按时间分组获取原始数据",cause.toString());
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }
@Override
public HttpResult<Boolean> excludeZeroData(LineCountEvaluateParam lineParam) {
log.error("{}异常,降级处理,异常为:{}","判断是否存在是零飘数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
}; };
} }
} }

View File

@@ -18,6 +18,8 @@ public class PqsCommunicateDto {
private String description; private String description;
private Integer type; private Integer type;
//是否更新updateTime标志数据上送更新1状态翻转不更新0
private Integer flag=0;
} }

View File

@@ -166,6 +166,15 @@ public class DataVController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, dataV, methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/excludeZeroData")
@ApiOperation("判断是否存在是零飘数据")
public HttpResult<Boolean> excludeZeroData(@RequestBody LineCountEvaluateParam lineParam) {
String methodDescribe = getMethodDescribe("excludeZeroData");
Boolean b = dataVQuery.excludeZeroData(lineParam);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, b, methodDescribe);
}

View File

@@ -81,4 +81,17 @@ public interface IDataV extends IMppService<RStatDataVD> {
List<DataVDto> getDataV(LineCountEvaluateParam lineParam); List<DataVDto> getDataV(LineCountEvaluateParam lineParam);
List<DataVDto> getGroupByTimeDataV(LineCountEvaluateParam lineParam); List<DataVDto> getGroupByTimeDataV(LineCountEvaluateParam lineParam);
/**
* 查看监测点的电压,判断是否存在是零飘数据
* 如果零飘数据占比一半多,则抛弃该监测点数据
* @param lineParam
* @return: java.lang.Boolean
* @Author: wr
* @Date: 2026/1/15 9:26
*/
Boolean excludeZeroData(LineCountEvaluateParam lineParam);
} }

View File

@@ -97,7 +97,7 @@ public class LnDataDealServiceImpl implements LnDataDealService {
pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN)); pqsCommunicateDto.setTime(LocalDateTimeUtil.format(dataVDTO.getTimeid(), DatePattern.NORM_DATETIME_PATTERN));
pqsCommunicateDto.setDevId(temp); pqsCommunicateDto.setDevId(temp);
pqsCommunicateDto.setType(1); pqsCommunicateDto.setType(1);
pqsCommunicateDto.setFlag(1);
iPqsCommunicate.insertion(pqsCommunicateDto); iPqsCommunicate.insertion(pqsCommunicateDto);
}); });

View File

@@ -425,6 +425,27 @@ public class InfluxdbDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return result; return result;
} }
@Override
public Boolean excludeZeroData(LineCountEvaluateParam lineParam) {
InfluxQueryWrapper dataVQueryWrapper = new InfluxQueryWrapper(DataV.class);
dataVQueryWrapper.eq(DataV::getLineId, lineParam.getLineId().get(0))
.eq(DataV::getValueType, InfluxDbSqlConstant.AVG_WEB)
.ne(DataV::getPhasicType, InfluxDBTableConstant.PHASE_TYPE_T)
.select(DataV::getRms)
.between(DataV::getTime, lineParam.getStartTime(), lineParam.getEndTime());
List<DataV> rmsResult = dataVMapper.selectByQueryWrapper(dataVQueryWrapper);
if (CollUtil.isNotEmpty(rmsResult)) {
List<DataV> exceptionData = rmsResult.stream().filter(dataV -> dataV.getRms() < 1.0).collect(Collectors.toList());
if (CollUtil.isNotEmpty(exceptionData)) {
if (exceptionData.size() * 2 >= rmsResult.size()) {
return true;
}
}
}
return false;
}
/** /**
* 按监测点集合、时间条件获取dataV分钟数据 * 按监测点集合、时间条件获取dataV分钟数据
* timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理 * timeMap参数来判断是否进行数据处理 timeMap为空则不进行数据处理

View File

@@ -129,7 +129,9 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate {
//更新mysql数据 //更新mysql数据
DevComFlagDTO devComFlagDTO = new DevComFlagDTO(); DevComFlagDTO devComFlagDTO = new DevComFlagDTO();
devComFlagDTO.setId(pqsCommunicateDto.getDevId()); devComFlagDTO.setId(pqsCommunicateDto.getDevId());
if(Objects.equals(pqsCommunicateDto.getFlag(),1)){
devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER)); devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER));
}
devComFlagDTO.setStatus(pqsCommunicateDto.getType()); devComFlagDTO.setStatus(pqsCommunicateDto.getType());
deviceFeignClient.updateDevComFlag(devComFlagDTO); deviceFeignClient.updateDevComFlag(devComFlagDTO);

View File

@@ -167,6 +167,11 @@ public class RelationDataVImpl extends MppServiceImpl<RStatDataVRelationMapper,
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public Boolean excludeZeroData(LineCountEvaluateParam lineParam) {
return null;
}
private List<DataVDto> quality(List<DataVDto> list, LineCountEvaluateParam lineParam) { private List<DataVDto> quality(List<DataVDto> list, LineCountEvaluateParam lineParam) {
List<DataVDto> result = new ArrayList<>(); List<DataVDto> result = new ArrayList<>();
Map<String, List<DataVDto>> lineMap = list.stream().collect(Collectors.groupingBy(DataVDto::getLineId)); Map<String, List<DataVDto>> lineMap = list.stream().collect(Collectors.groupingBy(DataVDto::getLineId));