Compare commits
10 Commits
0383bff7fd
...
14a13d631c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
14a13d631c | ||
|
|
11ee847cac | ||
|
|
9261adb79a | ||
|
|
e0154fb3ef | ||
|
|
7bee39f076 | ||
|
|
99f12e74fb | ||
|
|
648a5e1947 | ||
|
|
f519fd3e56 | ||
|
|
536ee24888 | ||
|
|
2159525ed9 |
@@ -652,7 +652,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//电压总谐波畸变率
|
//电压总谐波畸变率
|
||||||
if (!CollectionUtils.isEmpty(dataVPOList)) {
|
if (!CollectionUtils.isEmpty(dataVPOList)) {
|
||||||
for (DataVDto item : dataVPOList) {
|
for (DataVDto item : dataVPOList) {
|
||||||
if (item.getVThd() > overlimit.getUaberrance()) {
|
if (ObjectUtil.isNull(item.getVThd())||item.getVThd() > overlimit.getUaberrance()) {
|
||||||
addAbnormalData(thd, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVThd(), overlimit.getUaberrance());
|
addAbnormalData(thd, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVThd(), overlimit.getUaberrance());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -660,7 +660,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//电压上偏差、电压下偏差(根据vl_dev的正负判断是用哪个值判断越限)
|
//电压上偏差、电压下偏差(根据vl_dev的正负判断是用哪个值判断越限)
|
||||||
if (!CollectionUtils.isEmpty(dataVPODevList)) {
|
if (!CollectionUtils.isEmpty(dataVPODevList)) {
|
||||||
for (DataVDto item : dataVPODevList) {
|
for (DataVDto item : dataVPODevList) {
|
||||||
if (item.getVlDev() >= 0) {
|
if (ObjectUtil.isNull(item.getVlDev())||item.getVlDev() >= 0) {
|
||||||
if (item.getVlDev() > overlimit.getVoltageDev()) {
|
if (item.getVlDev() > overlimit.getVoltageDev()) {
|
||||||
addAbnormalData(uDev, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVlDev(), overlimit.getVoltageDev());
|
addAbnormalData(uDev, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVlDev(), overlimit.getVoltageDev());
|
||||||
}
|
}
|
||||||
@@ -674,7 +674,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//长时间闪变
|
//长时间闪变
|
||||||
if (!CollectionUtils.isEmpty(dataPltPOList)) {
|
if (!CollectionUtils.isEmpty(dataPltPOList)) {
|
||||||
for (DataPltDto item : dataPltPOList) {
|
for (DataPltDto item : dataPltPOList) {
|
||||||
if (item.getPlt() > overlimit.getFlicker()) {
|
if (ObjectUtil.isNull(item.getPlt())||item.getPlt() > overlimit.getFlicker()) {
|
||||||
addAbnormalData(flicker, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getPlt(), overlimit.getFlicker());
|
addAbnormalData(flicker, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getPlt(), overlimit.getFlicker());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -799,7 +799,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//电压总谐波畸变率
|
//电压总谐波畸变率
|
||||||
if (!CollectionUtils.isEmpty(dataVPOList)) {
|
if (!CollectionUtils.isEmpty(dataVPOList)) {
|
||||||
for (DataVDto item : dataVPOList) {
|
for (DataVDto item : dataVPOList) {
|
||||||
if (item.getVThd() > overlimit.getUaberrance()) {
|
if (ObjectUtil.isNull(item.getVThd())||item.getVThd() > overlimit.getUaberrance()) {
|
||||||
addAbnormalData(thd, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVThd(), overlimit.getUaberrance());
|
addAbnormalData(thd, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVThd(), overlimit.getUaberrance());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -807,7 +807,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//三相电压不平衡度
|
//三相电压不平衡度
|
||||||
if (!CollectionUtils.isEmpty(dataVPOUnbalanceList)) {
|
if (!CollectionUtils.isEmpty(dataVPOUnbalanceList)) {
|
||||||
for (DataVDto item : dataVPOUnbalanceList) {
|
for (DataVDto item : dataVPOUnbalanceList) {
|
||||||
if (item.getVUnbalance() > overlimit.getUbalance()) {
|
if (ObjectUtil.isNull(item.getVUnbalance())||item.getVUnbalance() > overlimit.getUbalance()) {
|
||||||
addAbnormalData(uAberrance, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVUnbalance(), overlimit.getUbalance());
|
addAbnormalData(uAberrance, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getVUnbalance(), overlimit.getUbalance());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -829,7 +829,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//频率偏差
|
//频率偏差
|
||||||
if (!CollectionUtils.isEmpty(dataVPOFreqList)) {
|
if (!CollectionUtils.isEmpty(dataVPOFreqList)) {
|
||||||
for (DataVDto item : dataVPOFreqList) {
|
for (DataVDto item : dataVPOFreqList) {
|
||||||
if (item.getFreqDev() > overlimit.getFreqDev()) {
|
if (ObjectUtil.isNull(item.getFreqDev())||item.getFreqDev() > overlimit.getFreqDev()) {
|
||||||
addAbnormalData(freqDev, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getFreqDev(), overlimit.getFreqDev());
|
addAbnormalData(freqDev, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getFreqDev(), overlimit.getFreqDev());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -838,7 +838,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//电压上偏差、电压下偏差(根据vl_dev的正负判断是用哪个值判断越限)
|
//电压上偏差、电压下偏差(根据vl_dev的正负判断是用哪个值判断越限)
|
||||||
if (!CollectionUtils.isEmpty(dataVPODevList)) {
|
if (!CollectionUtils.isEmpty(dataVPODevList)) {
|
||||||
for (DataVDto item : dataVPODevList) {
|
for (DataVDto item : dataVPODevList) {
|
||||||
if (item.getVlDev() >= 0) {
|
if (ObjectUtil.isNull(item.getVlDev())||item.getVlDev() >= 0) {
|
||||||
if (item.getVlDev() > overlimit.getVoltageDev()) {
|
if (item.getVlDev() > overlimit.getVoltageDev()) {
|
||||||
addAbnormalData(uDev, item.getPhasicType(), item.getMinTime(), item.getValueType(), item.getVlDev(), overlimit.getVoltageDev());
|
addAbnormalData(uDev, item.getPhasicType(), item.getMinTime(), item.getValueType(), item.getVlDev(), overlimit.getVoltageDev());
|
||||||
}
|
}
|
||||||
@@ -852,7 +852,7 @@ public class IDataCrossingServiceImpl implements IDataCrossingService {
|
|||||||
//长时间闪变
|
//长时间闪变
|
||||||
if (!CollectionUtils.isEmpty(dataPltPOList)) {
|
if (!CollectionUtils.isEmpty(dataPltPOList)) {
|
||||||
for (DataPltDto item : dataPltPOList) {
|
for (DataPltDto item : dataPltPOList) {
|
||||||
if (item.getPlt() > overlimit.getFlicker()) {
|
if (ObjectUtil.isNull(item.getPlt())||item.getPlt() > overlimit.getFlicker()) {
|
||||||
addAbnormalData(flicker, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getPlt(), overlimit.getFlicker());
|
addAbnormalData(flicker, item.getPhasicType(), item.getValueType(), item.getMinTime(), item.getPlt(), overlimit.getFlicker());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,9 +93,10 @@ public class PollutionCalcImpl implements IPollutionCalc {
|
|||||||
if (CollUtil.isNotEmpty(dataVDtoList) && CollUtil.isNotEmpty(dataHarmDtoList)) {
|
if (CollUtil.isNotEmpty(dataVDtoList) && CollUtil.isNotEmpty(dataHarmDtoList)) {
|
||||||
//计算谐波电压污染值
|
//计算谐波电压污染值
|
||||||
dataPollutionD.setValue(PubUtils.doubleRound(2, calcVAllPollutionValue(dataVDtoList, dataHarmDtoList, overlimit) * line.getTimeInterval()));
|
dataPollutionD.setValue(PubUtils.doubleRound(2, calcVAllPollutionValue(dataVDtoList, dataHarmDtoList, overlimit) * line.getTimeInterval()));
|
||||||
list.add(dataPollutionD);
|
}else{
|
||||||
|
dataPollutionD.setValue(0.0);
|
||||||
}
|
}
|
||||||
|
list.add(dataPollutionD);
|
||||||
dataPollutionD = new DataPollutionD();
|
dataPollutionD = new DataPollutionD();
|
||||||
dataPollutionD.setLineId(id);
|
dataPollutionD.setLineId(id);
|
||||||
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));
|
dataPollutionD.setDataDate(LocalDateTimeUtil.parseDate(calculatedParam.getDataDate()));
|
||||||
@@ -104,9 +105,10 @@ public class PollutionCalcImpl implements IPollutionCalc {
|
|||||||
if (CollUtil.isNotEmpty(data)) {
|
if (CollUtil.isNotEmpty(data)) {
|
||||||
//计算谐波电流污染值
|
//计算谐波电流污染值
|
||||||
dataPollutionD.setValue(PubUtils.doubleRound(2, calcIAllPollutionValue(data, overlimit) * line.getTimeInterval()));
|
dataPollutionD.setValue(PubUtils.doubleRound(2, calcIAllPollutionValue(data, overlimit) * line.getTimeInterval()));
|
||||||
list.add(dataPollutionD);
|
}else{
|
||||||
|
dataPollutionD.setValue(0.0);
|
||||||
}
|
}
|
||||||
|
list.add(dataPollutionD);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import com.njcn.common.pojo.response.HttpResult;
|
|||||||
import com.njcn.dataProcess.api.fallback.DataIFeignClientFallbackFactory;
|
import com.njcn.dataProcess.api.fallback.DataIFeignClientFallbackFactory;
|
||||||
import com.njcn.dataProcess.api.fallback.DataRecallFeignClientFallbackFactory;
|
import com.njcn.dataProcess.api.fallback.DataRecallFeignClientFallbackFactory;
|
||||||
import com.njcn.dataProcess.dto.DataIDTO;
|
import com.njcn.dataProcess.dto.DataIDTO;
|
||||||
|
import com.njcn.dataProcess.param.FullRecallMessage;
|
||||||
import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
||||||
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
|
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
|
||||||
import com.njcn.dataProcess.pojo.dto.DataIDto;
|
import com.njcn.dataProcess.pojo.dto.DataIDto;
|
||||||
@@ -32,4 +33,10 @@ public interface DataRecallFeignClient {
|
|||||||
@ApiOperation("数据补招")
|
@ApiOperation("数据补招")
|
||||||
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||||
public HttpResult<List<String>> recall(@RequestBody RecallMessage param);
|
public HttpResult<List<String>> recall(@RequestBody RecallMessage param);
|
||||||
|
|
||||||
|
@PostMapping("/fullHourRecall")
|
||||||
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
|
@ApiOperation("数据全量补招")
|
||||||
|
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||||
|
public HttpResult<List<String>> fullHourRecall(@RequestBody FullRecallMessage param);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException;
|
|||||||
import com.njcn.common.pojo.response.HttpResult;
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
import com.njcn.dataProcess.api.DataRecallFeignClient;
|
import com.njcn.dataProcess.api.DataRecallFeignClient;
|
||||||
|
|
||||||
|
import com.njcn.dataProcess.param.FullRecallMessage;
|
||||||
import com.njcn.dataProcess.util.DataProcessingEnumUtil;
|
import com.njcn.dataProcess.util.DataProcessingEnumUtil;
|
||||||
import com.njcn.message.message.RecallMessage;
|
import com.njcn.message.message.RecallMessage;
|
||||||
import feign.hystrix.FallbackFactory;
|
import feign.hystrix.FallbackFactory;
|
||||||
@@ -44,6 +45,12 @@ public class DataRecallFeignClientFallbackFactory implements FallbackFactory<Dat
|
|||||||
log.error("{}异常,降级处理,异常为:{}","补招接口调用异常",cause.toString());
|
log.error("{}异常,降级处理,异常为:{}","补招接口调用异常",cause.toString());
|
||||||
throw new BusinessException(finalExceptionEnum);
|
throw new BusinessException(finalExceptionEnum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpResult<List<String>> fullHourRecall(FullRecallMessage param) {
|
||||||
|
log.error("{}异常,降级处理,异常为:{}","按小时补招接口调用异常",cause.toString());
|
||||||
|
throw new BusinessException(finalExceptionEnum);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,23 @@
|
|||||||
|
package com.njcn.dataProcess.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2025/08/25 下午 2:42【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class RecallReplyDTO {
|
||||||
|
//code 200 完成,500错误;300补招进行中
|
||||||
|
private Integer code;
|
||||||
|
private String message;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -79,6 +79,17 @@
|
|||||||
<version>1.0.0</version>
|
<version>1.0.0</version>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||||
|
<version>2.7.12</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn.platform</groupId>
|
||||||
|
<artifactId>algorithm-api</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.njcn.dataProcess.controller;//package com.njcn.message.websocket;
|
package com.njcn.dataProcess.controller;//package com.njcn.message.websocket;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.date.DatePattern;
|
import cn.hutool.core.date.DatePattern;
|
||||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||||
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.IdUtil;
|
||||||
@@ -10,8 +11,12 @@ import com.njcn.common.pojo.exception.BusinessException;
|
|||||||
import com.njcn.common.pojo.response.HttpResult;
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
import com.njcn.common.utils.HttpResultUtil;
|
import com.njcn.common.utils.HttpResultUtil;
|
||||||
|
|
||||||
|
import com.njcn.dataProcess.annotation.InsertBean;
|
||||||
import com.njcn.dataProcess.annotation.QueryBean;
|
import com.njcn.dataProcess.annotation.QueryBean;
|
||||||
import com.njcn.dataProcess.param.FullRecallMessage;
|
import com.njcn.dataProcess.param.FullRecallMessage;
|
||||||
|
import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
||||||
|
import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
|
||||||
|
import com.njcn.dataProcess.service.IDataIntegrity;
|
||||||
import com.njcn.dataProcess.service.IDataV;
|
import com.njcn.dataProcess.service.IDataV;
|
||||||
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
||||||
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
||||||
@@ -57,13 +62,14 @@ public class DataRecallController extends BaseController {
|
|||||||
private final ProduceFeignClient produceFeignClient;
|
private final ProduceFeignClient produceFeignClient;
|
||||||
private final DeviceFeignClient deviceFeignClient;
|
private final DeviceFeignClient deviceFeignClient;
|
||||||
|
|
||||||
|
@InsertBean
|
||||||
|
private IDataIntegrity iDataIntegrityInsert;
|
||||||
//页面补招按时间段带小时的全部补招不查datav数据去筛选
|
//页面补招按时间段带小时的全部补招不查datav数据去筛选
|
||||||
@PostMapping("/FullRecall")
|
@PostMapping("/FullRecall")
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@ApiOperation("数据全量补招")
|
@ApiOperation("数据全量补招")
|
||||||
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||||
public HttpResult<List<String>> recall(@RequestBody FullRecallMessage param) {
|
public HttpResult<List<String>> FullRecall(@RequestBody FullRecallMessage param) {
|
||||||
String methodDescribe = getMethodDescribe("recall");
|
String methodDescribe = getMethodDescribe("recall");
|
||||||
List<String> guidList = new ArrayList<>();
|
List<String> guidList = new ArrayList<>();
|
||||||
List<String> runMonitorIds = new ArrayList<>();
|
List<String> runMonitorIds = new ArrayList<>();
|
||||||
@@ -80,16 +86,185 @@ public class DataRecallController extends BaseController {
|
|||||||
}else {
|
}else {
|
||||||
runMonitorIds = param.getMonitorId();
|
runMonitorIds = param.getMonitorId();
|
||||||
}
|
}
|
||||||
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
|
lineParam.setLineId(runMonitorIds);
|
||||||
|
//查看监测点的数据完整性
|
||||||
|
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
|
||||||
|
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
|
||||||
|
|
||||||
runMonitorIds.forEach(temp->{
|
runMonitorIds.forEach(temp->{
|
||||||
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
|
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
|
||||||
//后续根据不同前置下的测点发送补招密令
|
//后续根据不同前置下的测点发送补招密令
|
||||||
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
LocalDateTime currentDate = param.getReCallStartTime();
|
||||||
|
//循环每一天
|
||||||
|
while (!currentDate.isAfter(param.getReCallEndTime())) {
|
||||||
|
|
||||||
|
|
||||||
|
LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
|
||||||
|
//查看数据完整性,根据数据完整性进行补招;
|
||||||
|
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
|
||||||
|
Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
|
||||||
|
String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
|
||||||
|
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
|
||||||
|
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
|
||||||
|
if( dto.getDueTime()!=0){
|
||||||
|
double i =(double)dto.getRealTime() / dto.getDueTime();
|
||||||
|
if( i>=0.98){
|
||||||
|
recallType=3;
|
||||||
|
} else if(i<0.98&&i>0.8){
|
||||||
|
recallType=2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
if(recallType ==3){
|
||||||
|
currentDate = tempTime;
|
||||||
|
continue;
|
||||||
|
}else if(recallType ==2){
|
||||||
|
Integer timeInterval = data.getTimeInterval();
|
||||||
|
List<LocalDateTime> localDateTimeList = generateTimeIntervals( currentDate.toLocalDate(), timeInterval);
|
||||||
|
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
|
||||||
|
localDateTimeList.removeAll(data1);
|
||||||
|
if(!CollectionUtils.isEmpty(localDateTimeList)){
|
||||||
|
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
|
||||||
|
//最大时间段为300
|
||||||
|
if(timePeriod.size()<300){
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(timePeriod);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
}else {
|
||||||
|
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
|
||||||
|
timePeriods.forEach(period->{
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(period);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}else {
|
||||||
|
//暂态补招
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
//不设置dataType暂态稳态全部补招
|
||||||
|
recallDTO.setDataType("");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
if(tempTime.isAfter(param.getReCallEndTime())){
|
||||||
|
String eventTime = formatInterval(currentDate,param.getReCallEndTime());
|
||||||
|
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
}else {
|
||||||
|
String eventTime = formatInterval(currentDate,tempTime);
|
||||||
|
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
currentDate = tempTime;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
});
|
||||||
|
if(!CollectionUtils.isEmpty(recallDTOList)){
|
||||||
|
Map<String, List<RecallMessage.RecallDTO>> collect = recallDTOList.stream().collect(Collectors.groupingBy(RecallMessage.RecallDTO::getNodeId));
|
||||||
|
|
||||||
|
collect.forEach((k,v)->{
|
||||||
|
RecallMessage message = new RecallMessage();
|
||||||
|
message.setNodeId(k);
|
||||||
|
message.setData(v);
|
||||||
|
String guid = IdUtil.simpleUUID();
|
||||||
|
message.setGuid(guid);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
produceFeignClient.recall(message);
|
||||||
|
guidList.add(guid);
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, guidList, methodDescribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/fullHourRecall")
|
||||||
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
|
@ApiOperation("按小时数据补招")
|
||||||
|
@ApiImplicitParam(name = "param", value = "参数", required = true)
|
||||||
|
public HttpResult<List<String>> fullHourRecall(@RequestBody FullRecallMessage param){
|
||||||
|
String methodDescribe = getMethodDescribe("recall");
|
||||||
|
List<String> guidList = new ArrayList<>();
|
||||||
|
List<String> runMonitorIds = new ArrayList<>();
|
||||||
|
List<RecallMessage.RecallDTO> recallDTOList = new ArrayList<>();
|
||||||
|
// Duration duration = Duration.between(param.getReCallStartTime(), param.getReCallEndTime());
|
||||||
|
// // 获取剩余的分钟数,如果要精确到分钟可以这样做
|
||||||
|
// long minutes = duration.toMinutes() ;
|
||||||
|
// if(minutes>60){
|
||||||
|
// throw new BusinessException("全量补招时间区间不能超过一个小时");
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
if(CollectionUtils.isEmpty(param.getMonitorId())){
|
||||||
|
runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
|
||||||
|
}else {
|
||||||
|
runMonitorIds = param.getMonitorId();
|
||||||
|
}
|
||||||
|
|
||||||
|
runMonitorIds.forEach(temp->{
|
||||||
|
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
|
||||||
|
//后续根据不同前置下的测点发送补招密令
|
||||||
|
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
||||||
|
Integer timeInterval = data.getTimeInterval();
|
||||||
|
|
||||||
|
List<LocalDateTime> localDateTimeList = generateTimeIntervals(param.getReCallStartTime(),param.getReCallEndTime(), timeInterval);
|
||||||
|
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(param.getReCallStartTime().toLocalDate(), DatePattern.NORM_DATE_PATTERN));
|
||||||
|
List<LocalDateTime> tempTime = data1.stream().filter(dateTime -> (!dateTime.isAfter(param.getReCallEndTime())) &&
|
||||||
|
(!dateTime.isBefore(param.getReCallStartTime()))).collect(Collectors.toList());
|
||||||
|
localDateTimeList.removeAll(tempTime);
|
||||||
|
if(!CollectionUtils.isEmpty(localDateTimeList)){
|
||||||
|
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
|
||||||
|
//最大时间段为300
|
||||||
|
if(timePeriod.size()<300){
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(timePeriod);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
}else {
|
||||||
|
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
|
||||||
|
timePeriods.forEach(period->{
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(period);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
//暂态补招
|
//暂态补招
|
||||||
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
//不设置dataType暂态稳态全部补招
|
//不设置dataType暂态稳态全部补招
|
||||||
recallDTO.setDataType("");
|
recallDTO.setDataType("1");
|
||||||
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
String eventTime = formatInterval(param.getReCallStartTime(),param.getReCallEndTime());
|
String eventTime = formatInterval(param.getReCallStartTime(),param.getReCallEndTime());
|
||||||
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
||||||
@@ -117,7 +292,6 @@ public class DataRecallController extends BaseController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@PostMapping("/recall")
|
@PostMapping("/recall")
|
||||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
@ApiOperation("数据补招")
|
@ApiOperation("数据补招")
|
||||||
@@ -134,6 +308,14 @@ public class DataRecallController extends BaseController {
|
|||||||
}else {
|
}else {
|
||||||
runMonitorIds = param.getMonitorId();
|
runMonitorIds = param.getMonitorId();
|
||||||
}
|
}
|
||||||
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
|
lineParam.setLineId(runMonitorIds);
|
||||||
|
//查看监测点的数据完整性
|
||||||
|
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
|
||||||
|
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
|
||||||
|
|
||||||
LocalDate currentDate = param.getReCallStartTime();
|
LocalDate currentDate = param.getReCallStartTime();
|
||||||
//循环每一天
|
//循环每一天
|
||||||
while (!currentDate.isAfter(param.getReCallEndTime())) {
|
while (!currentDate.isAfter(param.getReCallEndTime())) {
|
||||||
@@ -143,19 +325,72 @@ public class DataRecallController extends BaseController {
|
|||||||
//后续根据不同前置下的测点发送补招密令
|
//后续根据不同前置下的测点发送补招密令
|
||||||
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
||||||
|
|
||||||
Integer timeInterval = data.getTimeInterval();
|
//查看数据完整性,根据数据完整性进行补招;
|
||||||
List<LocalDateTime> localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
|
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
|
||||||
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
|
Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
|
||||||
localDateTimeList.removeAll(data1);
|
String curDateString = LocalDateTimeUtil.format(finalCurrentDate.atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
|
||||||
if(!CollectionUtils.isEmpty(localDateTimeList)){
|
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
|
||||||
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
|
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
|
||||||
|
if( dto.getDueTime()!=0){
|
||||||
|
double i = (double)dto.getRealTime() / dto.getDueTime();
|
||||||
|
if( i>=0.98){
|
||||||
|
recallType=3;
|
||||||
|
} else if(i<0.98&&i>0.8){
|
||||||
|
recallType=2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if(recallType ==3){
|
||||||
|
return;
|
||||||
|
}else if(recallType ==2){
|
||||||
|
Integer timeInterval = data.getTimeInterval();
|
||||||
|
List<LocalDateTime> localDateTimeList = generateTimeIntervals(finalCurrentDate, timeInterval);
|
||||||
|
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format(finalCurrentDate, DatePattern.NORM_DATE_PATTERN));
|
||||||
|
localDateTimeList.removeAll(data1);
|
||||||
|
if(!CollectionUtils.isEmpty(localDateTimeList)){
|
||||||
|
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
|
||||||
|
//最大时间段为300
|
||||||
|
if(timePeriod.size()<300){
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(timePeriod);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
}else {
|
||||||
|
List<List<String>> timePeriods = CollUtil.split(timePeriod, 300);
|
||||||
|
timePeriods.forEach(period->{
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(period);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
recallDTOList.add(recallDTO);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}else {
|
||||||
|
//暂态补招
|
||||||
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
//不设置dataType暂态稳态全部补招
|
||||||
recallDTO.setDataType("0");
|
recallDTO.setDataType("0");
|
||||||
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
recallDTO.setTimeInterval(timePeriod);
|
|
||||||
|
String eventTime = formatInterval(finalCurrentDate.atTime(0,0,0),finalCurrentDate.atTime(23,59,0));
|
||||||
|
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
||||||
recallDTO.setNodeId(data2.getNodeId());
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
recallDTOList.add(recallDTO);
|
recallDTOList.add(recallDTO);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//暂态补招
|
//暂态补招
|
||||||
RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
|
RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
|
||||||
recallDTO2.setDataType("1");
|
recallDTO2.setDataType("1");
|
||||||
@@ -177,10 +412,10 @@ public class DataRecallController extends BaseController {
|
|||||||
RecallMessage message = new RecallMessage();
|
RecallMessage message = new RecallMessage();
|
||||||
message.setNodeId(k);
|
message.setNodeId(k);
|
||||||
message.setData(v);
|
message.setData(v);
|
||||||
String guid = IdUtil.simpleUUID();
|
String guid = IdUtil.simpleUUID();
|
||||||
message.setGuid(guid);
|
message.setGuid(guid);
|
||||||
produceFeignClient.recall(message);
|
produceFeignClient.recall(message);
|
||||||
guidList.add(guid);
|
guidList.add(guid);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@@ -225,6 +460,22 @@ public class DataRecallController extends BaseController {
|
|||||||
return dateTimeList;
|
return dateTimeList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<LocalDateTime> generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) {
|
||||||
|
List<LocalDateTime> dateTimeList = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Generate LocalDateTime list with the given interval
|
||||||
|
LocalDateTime currentDateTime = startDateTime;
|
||||||
|
while (!currentDateTime.isAfter(endDateTime)) {
|
||||||
|
dateTimeList.add(currentDateTime);
|
||||||
|
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dateTimeList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
|
public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
|
||||||
List<String> mergedIntervals = new ArrayList<>();
|
List<String> mergedIntervals = new ArrayList<>();
|
||||||
if (times == null || times.isEmpty()) {
|
if (times == null || times.isEmpty()) {
|
||||||
@@ -259,10 +510,18 @@ public class DataRecallController extends BaseController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
// 创建两个LocalDateTime对象
|
LocalDate localDate1 = LocalDate.now();
|
||||||
LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0); // 例如:2023年1月1日 10:00
|
LocalDate localDate2 = LocalDate.now();
|
||||||
LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 2, 15, 30); // 例如:2023年1月1日 15:30
|
while (!localDate1.isAfter(localDate2)) {
|
||||||
|
LocalDate finalCurrentDate = localDate1;
|
||||||
|
System.out.println(finalCurrentDate);
|
||||||
|
localDate1 = localDate1.plusDays(1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建两个LocalDateTime对象
|
||||||
|
LocalDateTime dateTime1 = LocalDateTime.of(2023, 1, 1, 10, 0,11); // 例如:2023年1月1日 10:00
|
||||||
|
LocalDateTime dateTime2 = LocalDateTime.of(2023, 1, 1, 10, 30,22); // 例如:2023年1月1日 15:30
|
||||||
// 使用Duration计算两个时间之间的差异
|
// 使用Duration计算两个时间之间的差异
|
||||||
Duration duration = Duration.between(dateTime1, dateTime2);
|
Duration duration = Duration.between(dateTime1, dateTime2);
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,592 @@
|
|||||||
|
package com.njcn.dataProcess.websocket;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.hutool.core.date.DatePattern;
|
||||||
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
|
||||||
|
import com.njcn.algorithm.pojo.bo.BaseParam;
|
||||||
|
import com.njcn.algorithm.pojo.liteflow.LiteFlowAlgorithmFeignClient;
|
||||||
|
import com.njcn.dataProcess.annotation.InsertBean;
|
||||||
|
import com.njcn.dataProcess.annotation.QueryBean;
|
||||||
|
import com.njcn.dataProcess.dto.RecallReplyDTO;
|
||||||
|
import com.njcn.dataProcess.param.FullRecallMessage;
|
||||||
|
import com.njcn.dataProcess.param.LineCountEvaluateParam;
|
||||||
|
import com.njcn.dataProcess.pojo.dto.DataIntegrityDto;
|
||||||
|
import com.njcn.dataProcess.service.IDataIntegrity;
|
||||||
|
import com.njcn.dataProcess.service.IDataV;
|
||||||
|
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
||||||
|
import com.njcn.device.biz.pojo.dto.LineDevGetDTO;
|
||||||
|
import com.njcn.device.pq.api.DeviceFeignClient;
|
||||||
|
import com.njcn.device.pq.pojo.dto.DeviceDTO;
|
||||||
|
import com.njcn.message.api.ProduceFeignClient;
|
||||||
|
import com.njcn.message.constant.RedisKeyPrefix;
|
||||||
|
import com.njcn.message.message.RecallMessage;
|
||||||
|
import com.njcn.message.messagedto.TopicReplyDTO;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import javax.websocket.*;
|
||||||
|
import javax.websocket.server.PathParam;
|
||||||
|
import javax.websocket.server.ServerEndpoint;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.LocalTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 15:11【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@ServerEndpoint(value ="/api/recell/{userId}")
|
||||||
|
public class RecallWebSocketServer {
|
||||||
|
@QueryBean
|
||||||
|
private static IDataV dataVQuery;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("InfluxdbDataVImpl")
|
||||||
|
public void setDataVQuery( IDataV dataVQuery) {
|
||||||
|
RecallWebSocketServer.dataVQuery = dataVQuery;
|
||||||
|
}
|
||||||
|
private static CommTerminalGeneralClient commTerminalGeneralClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public void setCommTerminalGeneralClient(CommTerminalGeneralClient commTerminalGeneralClient) {
|
||||||
|
RecallWebSocketServer.commTerminalGeneralClient = commTerminalGeneralClient;
|
||||||
|
}
|
||||||
|
private static ProduceFeignClient produceFeignClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public void setProduceFeignClient(ProduceFeignClient produceFeignClient) {
|
||||||
|
RecallWebSocketServer.produceFeignClient = produceFeignClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
@InsertBean
|
||||||
|
private static IDataIntegrity iDataIntegrityInsert;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("RelationDataIntegrityImpl")
|
||||||
|
public void setiDataIntegrityInsert(IDataIntegrity iDataIntegrityInsert) {
|
||||||
|
RecallWebSocketServer.iDataIntegrityInsert = iDataIntegrityInsert;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public void setLiteFlowAlgorithmFeignClient(LiteFlowAlgorithmFeignClient liteFlowAlgorithmFeignClient) {
|
||||||
|
RecallWebSocketServer.liteFlowAlgorithmFeignClient = liteFlowAlgorithmFeignClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RedisUtil redisUtil;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public void setRedisUtil( RedisUtil redisUtil) {
|
||||||
|
RecallWebSocketServer.redisUtil = redisUtil;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static DeviceFeignClient deviceFeignClient;
|
||||||
|
|
||||||
|
// 注入的时候,给类的 service 注入
|
||||||
|
@Autowired
|
||||||
|
public void setLineFeignClient(DeviceFeignClient deviceFeignClient) {
|
||||||
|
RecallWebSocketServer.deviceFeignClient = deviceFeignClient;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
||||||
|
*/
|
||||||
|
private static int onlineCount = 0;
|
||||||
|
/**
|
||||||
|
* concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
|
||||||
|
*/
|
||||||
|
private static ConcurrentHashMap<String, RecallWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
|
||||||
|
/**
|
||||||
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
||||||
|
*/
|
||||||
|
private Session session;
|
||||||
|
/**
|
||||||
|
* 接收userId
|
||||||
|
*/
|
||||||
|
private String userId = "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接建立成
|
||||||
|
* 功调用的方法
|
||||||
|
*/
|
||||||
|
@OnOpen
|
||||||
|
public void onOpen(Session session, @PathParam("userId") String userId) {
|
||||||
|
//lineId 是 userid+","+lineId+","+Devid
|
||||||
|
this.session = session;
|
||||||
|
this.userId = userId;
|
||||||
|
if (webSocketMap.containsKey(userId)) {
|
||||||
|
webSocketMap.remove(userId);
|
||||||
|
//加入set中
|
||||||
|
webSocketMap.put(userId, this);
|
||||||
|
} else {
|
||||||
|
//加入set中
|
||||||
|
webSocketMap.put(userId, this);
|
||||||
|
//在线数加1
|
||||||
|
addOnlineCount();
|
||||||
|
}
|
||||||
|
sendMessage("连接成功");
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接关闭
|
||||||
|
* 调用的方法
|
||||||
|
*/
|
||||||
|
@OnClose
|
||||||
|
public void onClose() {
|
||||||
|
if (webSocketMap.containsKey(userId)) {
|
||||||
|
webSocketMap.remove(userId);
|
||||||
|
//从set中删除
|
||||||
|
subOnlineCount();
|
||||||
|
}
|
||||||
|
log.info("用户退出:" + userId + ",当前在线监测点数为:" + getOnlineCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 收到客户端消
|
||||||
|
* 息后调用的方法
|
||||||
|
*
|
||||||
|
* @param message 客户端发送过来的消息
|
||||||
|
**/
|
||||||
|
@OnMessage
|
||||||
|
public void onMessage(String message, Session session) {
|
||||||
|
//会每30s发送请求1次
|
||||||
|
log.info("监测点消息:" + userId + ",报文:" + message);
|
||||||
|
if(Objects.equals(message,"alive")){
|
||||||
|
sendInfo("connect");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
FullRecallMessage param = JSONUtil.toBean(message,FullRecallMessage.class,true);
|
||||||
|
|
||||||
|
if(Objects.isNull(message)){
|
||||||
|
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(500,"参数有误");
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO));
|
||||||
|
}else {
|
||||||
|
List<String> runMonitorIds = param.getMonitorId();
|
||||||
|
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
|
||||||
|
lineParam.setLineId(runMonitorIds);
|
||||||
|
//查看监测点的数据完整性
|
||||||
|
lineParam.setStartTime( LocalDateTimeUtil.format( param.getReCallStartTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
lineParam.setEndTime(LocalDateTimeUtil.format( param.getReCallEndTime(), DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
|
List<DataIntegrityDto> rawData = iDataIntegrityInsert.getRawData(lineParam);
|
||||||
|
Map<String, Map<String, List<DataIntegrityDto>>> collect1 = rawData.stream().collect(Collectors.groupingBy(DataIntegrityDto::getLineIndex, Collectors.groupingBy(DataIntegrityDto::getTimeId)));
|
||||||
|
List<BaseParam> bsParmList = new ArrayList<>();
|
||||||
|
runMonitorIds.forEach(temp->{
|
||||||
|
LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
|
||||||
|
//后续根据不同前置下的测点发送补招密令
|
||||||
|
DeviceDTO data2 = deviceFeignClient.getDeviceInfo(data.getDevId()).getData();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
LocalDateTime currentDate = param.getReCallStartTime();
|
||||||
|
//循环每一天
|
||||||
|
while (!currentDate.isAfter(param.getReCallEndTime())) {
|
||||||
|
|
||||||
|
String key ="";
|
||||||
|
LocalDateTime tempTime = currentDate.toLocalDate().plusDays(1).atTime(0,0,0);
|
||||||
|
//查看数据完整性,根据数据完整性进行补招;
|
||||||
|
//校验数据完整性完整率>=98%不补招完整率80%~98%,差量补招,完整率<=80%,全量补招
|
||||||
|
Integer recallType = 1; //1全量补 2:差量补 3:不需要补招
|
||||||
|
String curDateString = LocalDateTimeUtil.format(currentDate.toLocalDate().atTime(0, 0, 0), DatePattern.NORM_DATETIME_FORMATTER);
|
||||||
|
if(collect1.containsKey(temp)&&collect1.get(temp).containsKey(curDateString)){
|
||||||
|
DataIntegrityDto dto = collect1.get(temp).get(curDateString).get(0);
|
||||||
|
if( dto.getDueTime()!=0){
|
||||||
|
double i = (double)dto.getRealTime() / dto.getDueTime();
|
||||||
|
if( i>=0.98){
|
||||||
|
recallType=3;
|
||||||
|
} else if(i<0.98&&i>0.8){
|
||||||
|
recallType=2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// 确定补招时间段
|
||||||
|
LocalDateTime toDayBeginTime,toDayEndTime;
|
||||||
|
if(tempTime.isAfter(param.getReCallEndTime())){
|
||||||
|
toDayBeginTime =currentDate;
|
||||||
|
toDayEndTime = param.getReCallEndTime();
|
||||||
|
|
||||||
|
}else {
|
||||||
|
toDayBeginTime =currentDate;
|
||||||
|
toDayEndTime = tempTime;
|
||||||
|
|
||||||
|
}
|
||||||
|
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"开始补招");
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO));
|
||||||
|
//暂态补招
|
||||||
|
RecallMessage.RecallDTO recallDTO2 = new RecallMessage.RecallDTO();
|
||||||
|
recallDTO2.setDataType("1");
|
||||||
|
recallDTO2.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
String tempTimeInterval = formatInterval(toDayBeginTime, toDayEndTime);
|
||||||
|
recallDTO2.setTimeInterval(Stream.of(tempTimeInterval).collect(Collectors.toList()));
|
||||||
|
recallDTO2.setNodeId(data2.getNodeId());
|
||||||
|
|
||||||
|
RecallMessage eventMessage = new RecallMessage();
|
||||||
|
eventMessage.setNodeId(data2.getNodeId());
|
||||||
|
eventMessage.setData(Stream.of(recallDTO2).collect(Collectors.toList()));
|
||||||
|
String guid1 = IdUtil.simpleUUID();
|
||||||
|
eventMessage.setGuid(guid1);
|
||||||
|
produceFeignClient.recall(eventMessage);
|
||||||
|
|
||||||
|
if(recallType ==3){
|
||||||
|
|
||||||
|
}else if(recallType ==2){
|
||||||
|
Integer timeInterval = data.getTimeInterval();
|
||||||
|
List<LocalDateTime> localDateTimeList = generateTimeIntervals( toDayBeginTime,toDayBeginTime, timeInterval);
|
||||||
|
List<LocalDateTime> data1 = dataVQuery.monitoringTime(temp, LocalDateTimeUtil.format( currentDate.toLocalDate(), DatePattern.NORM_DATE_PATTERN));
|
||||||
|
localDateTimeList.removeAll(data1);
|
||||||
|
if(!CollectionUtils.isEmpty(localDateTimeList)){
|
||||||
|
List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
//
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
recallDTO.setTimeInterval(timePeriod);
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
RecallMessage recallMessage = new RecallMessage();
|
||||||
|
recallMessage.setNodeId(data2.getNodeId());
|
||||||
|
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
|
||||||
|
String guid = IdUtil.simpleUUID();
|
||||||
|
recallMessage.setGuid(guid);
|
||||||
|
|
||||||
|
produceFeignClient.recall(recallMessage);
|
||||||
|
// boolean flag =true;
|
||||||
|
// LocalDateTime beginTaskTime = LocalDateTime.now();
|
||||||
|
// while (flag){
|
||||||
|
// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
|
||||||
|
// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
//
|
||||||
|
// String jsonString =redisUtil.getStringByKey(key);
|
||||||
|
// if(Objects.nonNull(jsonString)){
|
||||||
|
// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
|
||||||
|
// redisUtil.delete(key);
|
||||||
|
// flag =false;
|
||||||
|
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
|
||||||
|
//
|
||||||
|
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
//
|
||||||
|
// BaseParam baseParam = new BaseParam();
|
||||||
|
// baseParam.setFullChain(false);
|
||||||
|
// baseParam.setRepair(false);
|
||||||
|
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
// bsParmList.add(baseParam);
|
||||||
|
// }
|
||||||
|
// }else {
|
||||||
|
// flag =false;
|
||||||
|
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
|
||||||
|
//
|
||||||
|
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
//
|
||||||
|
// BaseParam baseParam = new BaseParam();
|
||||||
|
// baseParam.setFullChain(false);
|
||||||
|
// baseParam.setRepair(false);
|
||||||
|
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
// bsParmList.add(baseParam);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}else {
|
||||||
|
//暂态补招
|
||||||
|
RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
|
||||||
|
//不设置dataType暂态稳态全部补招
|
||||||
|
recallDTO.setDataType("0");
|
||||||
|
recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
|
||||||
|
String eventTime = formatInterval(toDayBeginTime,toDayEndTime);
|
||||||
|
recallDTO.setTimeInterval(Stream.of(eventTime).collect(Collectors.toList()));
|
||||||
|
recallDTO.setNodeId(data2.getNodeId());
|
||||||
|
|
||||||
|
RecallMessage recallMessage = new RecallMessage();
|
||||||
|
recallMessage.setNodeId(data2.getNodeId());
|
||||||
|
recallMessage.setData(Stream.of(recallDTO).collect(Collectors.toList()));
|
||||||
|
String guid = IdUtil.simpleUUID();
|
||||||
|
recallMessage.setGuid(guid);
|
||||||
|
produceFeignClient.recall(recallMessage);
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// boolean flag =true;
|
||||||
|
// LocalDateTime beginTaskTime = LocalDateTime.now();
|
||||||
|
// while (flag){
|
||||||
|
// if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=5){
|
||||||
|
// key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
//
|
||||||
|
// String jsonString =redisUtil.getStringByKey(key);
|
||||||
|
// if(Objects.nonNull(jsonString)){
|
||||||
|
// TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
|
||||||
|
// redisUtil.delete(key);
|
||||||
|
//
|
||||||
|
// flag =false;
|
||||||
|
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
|
||||||
|
//
|
||||||
|
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
//
|
||||||
|
// BaseParam baseParam = new BaseParam();
|
||||||
|
// baseParam.setFullChain(false);
|
||||||
|
// baseParam.setRepair(false);
|
||||||
|
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
// bsParmList.add(baseParam);
|
||||||
|
// }
|
||||||
|
// }else {
|
||||||
|
// flag =false;
|
||||||
|
// RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"超时");
|
||||||
|
//
|
||||||
|
// sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
// BaseParam baseParam = new BaseParam();
|
||||||
|
// baseParam.setFullChain(false);
|
||||||
|
// baseParam.setRepair(false);
|
||||||
|
// baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
// baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
// baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
// bsParmList.add(baseParam);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
boolean flag =true;
|
||||||
|
LocalDateTime beginTaskTime = LocalDateTime.now();
|
||||||
|
while (flag){
|
||||||
|
if(Duration.between(beginTaskTime, LocalDateTime.now()).toMinutes()<=10){
|
||||||
|
key =RedisKeyPrefix.TOPIC_REPLY.concat(temp).concat(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
|
||||||
|
String jsonString =redisUtil.getStringByKey(key);
|
||||||
|
if(Objects.nonNull(jsonString)){
|
||||||
|
TopicReplyDTO bean = JSONUtil.toBean(jsonString, TopicReplyDTO.class, true);
|
||||||
|
redisUtil.delete(key);
|
||||||
|
flag =false;
|
||||||
|
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,bean.getResult());
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
|
||||||
|
BaseParam baseParam = new BaseParam();
|
||||||
|
baseParam.setFullChain(false);
|
||||||
|
baseParam.setRepair(false);
|
||||||
|
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
bsParmList.add(baseParam);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
flag =false;
|
||||||
|
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"监测点:"+data.getPointName()+";日期:"+currentDate.toLocalDate()+"执行补招程序超时");
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
|
||||||
|
BaseParam baseParam = new BaseParam();
|
||||||
|
baseParam.setFullChain(false);
|
||||||
|
baseParam.setRepair(false);
|
||||||
|
baseParam.setDataDate(currentDate.toLocalDate().format(DatePattern.NORM_DATE_FORMATTER));
|
||||||
|
baseParam.setTagNames(Stream.of("dataIntegrity").collect(Collectors.toSet()));
|
||||||
|
baseParam.setIdList(Stream.of(temp).collect(Collectors.toList()));
|
||||||
|
bsParmList.add(baseParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
currentDate = tempTime;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
});
|
||||||
|
if(!CollectionUtils.isEmpty(bsParmList)){
|
||||||
|
RecallReplyDTO recallReplyDTO = new RecallReplyDTO(300,"开始执行数据完整性算法");
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO));
|
||||||
|
bsParmList.forEach(temp->{
|
||||||
|
liteFlowAlgorithmFeignClient.measurementPointExecutor(temp);
|
||||||
|
});
|
||||||
|
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(300,"执行完成");
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
RecallReplyDTO recallReplyDTO2 = new RecallReplyDTO(200,"补招任务结束");
|
||||||
|
|
||||||
|
sendInfo(JSONObject.toJSONString(recallReplyDTO2));
|
||||||
|
}
|
||||||
|
public List<LocalDateTime> generateTimeIntervals(LocalDate date, int intervalMinutes) {
|
||||||
|
List<LocalDateTime> dateTimeList = new ArrayList<>();
|
||||||
|
|
||||||
|
// Create the starting LocalDateTime
|
||||||
|
LocalDateTime startDateTime = LocalDateTime.of(date, LocalTime.MIDNIGHT);
|
||||||
|
|
||||||
|
// Create the ending LocalDateTime
|
||||||
|
LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX);
|
||||||
|
|
||||||
|
// Generate LocalDateTime list with the given interval
|
||||||
|
LocalDateTime currentDateTime = startDateTime;
|
||||||
|
while (!currentDateTime.isAfter(endDateTime)) {
|
||||||
|
dateTimeList.add(currentDateTime);
|
||||||
|
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dateTimeList;
|
||||||
|
}
|
||||||
|
public List<LocalDateTime> generateTimeIntervals(LocalDateTime startDateTime,LocalDateTime endDateTime , int intervalMinutes) {
|
||||||
|
List<LocalDateTime> dateTimeList = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Generate LocalDateTime list with the given interval
|
||||||
|
LocalDateTime currentDateTime = startDateTime;
|
||||||
|
while (!currentDateTime.isAfter(endDateTime)) {
|
||||||
|
dateTimeList.add(currentDateTime);
|
||||||
|
currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dateTimeList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
|
||||||
|
List<String> mergedIntervals = new ArrayList<>();
|
||||||
|
if (times == null || times.isEmpty()) {
|
||||||
|
return mergedIntervals;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort the list to ensure the times are in order
|
||||||
|
times.sort(LocalDateTime::compareTo);
|
||||||
|
|
||||||
|
LocalDateTime start = times.get(0);
|
||||||
|
LocalDateTime end = start;
|
||||||
|
|
||||||
|
for (int i = 1; i < times.size(); i++) {
|
||||||
|
LocalDateTime current = times.get(i);
|
||||||
|
if (current.isAfter(end.plusMinutes(intervalMinutes))) {
|
||||||
|
// If the current time is more than interval minutes after the end, close the current interval
|
||||||
|
mergedIntervals.add(formatInterval(start, end));
|
||||||
|
start = current; // Start a new interval
|
||||||
|
}
|
||||||
|
end = current; // Update the end of the current interval
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the last interval
|
||||||
|
mergedIntervals.add(formatInterval(start, end));
|
||||||
|
|
||||||
|
return mergedIntervals;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String formatInterval(LocalDateTime start, LocalDateTime end) {
|
||||||
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
|
return start.format(formatter) + "~" + end.format(formatter);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param session
|
||||||
|
* @param error
|
||||||
|
*/
|
||||||
|
@OnError
|
||||||
|
public void onError(Session session, Throwable error) {
|
||||||
|
|
||||||
|
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
|
||||||
|
error.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 实现服务
|
||||||
|
* 器主动推送
|
||||||
|
*/
|
||||||
|
public void sendMessage(String message) {
|
||||||
|
try {
|
||||||
|
this.session.getBasicRemote().sendText(message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送自定
|
||||||
|
* 义消息
|
||||||
|
**/
|
||||||
|
public static void sendInfo(String message) {
|
||||||
|
|
||||||
|
|
||||||
|
webSocketMap.forEach((k,v)->{
|
||||||
|
webSocketMap.get(k).sendMessage(message);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获得此时的
|
||||||
|
* 在线监测点
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static synchronized int getOnlineCount() {
|
||||||
|
return onlineCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 在线监测点
|
||||||
|
* 数加1
|
||||||
|
*/
|
||||||
|
public static synchronized void addOnlineCount() {
|
||||||
|
RecallWebSocketServer.onlineCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 在线监测点
|
||||||
|
* 数减1
|
||||||
|
*/
|
||||||
|
public static synchronized void subOnlineCount() {
|
||||||
|
RecallWebSocketServer.onlineCount--;
|
||||||
|
}
|
||||||
|
|
||||||
|
// /**
|
||||||
|
// * 过滤所有键包含指定字符串的条目
|
||||||
|
// * @param map 原始的Map
|
||||||
|
// * @param substring 要检查的子字符串
|
||||||
|
// * @return 过滤的Map
|
||||||
|
// */
|
||||||
|
// public static Map<String, String> filterMapByKey(ConcurrentHashMap<String, RecallWebSocketServer> map, String substring) {
|
||||||
|
// Map<String, String> result = new HashMap<>();
|
||||||
|
// for (Map.Entry<String, RecallWebSocketServer> entry : map.entrySet()) {
|
||||||
|
// if (entry.getKey().contains(substring)) {
|
||||||
|
// result.put(entry.getKey(), entry.getValue().toString());
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// return result;
|
||||||
|
// }
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.njcn.dataProcess.websocket;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||||
|
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 15:09【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class WebSocketConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServerEndpointExporter serverEndpointExporter() {
|
||||||
|
return new ServerEndpointExporter();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通信文本消息和二进制缓存区大小
|
||||||
|
* 避免对接 第三方 报文过大时,Websocket 1009 错误
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServletServerContainerFactoryBean createWebSocketContainer() {
|
||||||
|
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
|
||||||
|
// 在此处设置bufferSize
|
||||||
|
container.setMaxTextMessageBufferSize(10240000);
|
||||||
|
container.setMaxBinaryMessageBufferSize(10240000);
|
||||||
|
container.setMaxSessionIdleTimeout(15 * 60000L);
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -49,6 +49,7 @@ public class DeviceRebootMessage {
|
|||||||
private String series;
|
private String series;
|
||||||
//终端识别码
|
//终端识别码
|
||||||
private String devKey;
|
private String devKey;
|
||||||
|
private Integer processNo;
|
||||||
//
|
//
|
||||||
private List<MonitorInfo> monitorData;
|
private List<MonitorInfo> monitorData;
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import java.io.Serializable;
|
|||||||
public class FrontLogslMessage extends BaseMessage implements Serializable {
|
public class FrontLogslMessage extends BaseMessage implements Serializable {
|
||||||
private String nodeId;
|
private String nodeId;
|
||||||
private String processNo;
|
private String processNo;
|
||||||
|
private String code;
|
||||||
private String businessId;
|
private String businessId;
|
||||||
private String level;
|
private String level;
|
||||||
private String logType;
|
private String logType;
|
||||||
|
|||||||
@@ -1,9 +1,14 @@
|
|||||||
package com.njcn.message.messagedto;
|
package com.njcn.message.messagedto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||||
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
|
||||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Description:
|
* Description:
|
||||||
@@ -14,11 +19,22 @@ import java.io.Serializable;
|
|||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class TopicReplyDTO extends BaseMessage implements Serializable {
|
public class TopicReplyDTO extends BaseMessage implements Serializable {
|
||||||
//消息id
|
//消息id guid="12345"是补招回复结果
|
||||||
private String guid;
|
private String guid;
|
||||||
|
|
||||||
private String step;
|
private String step;
|
||||||
|
//guid="12345"是补招回复结果
|
||||||
private String result;
|
private String result;
|
||||||
|
|
||||||
|
private String lineIndex;
|
||||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||||
|
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
|
||||||
|
private LocalDateTime recallStartDate;
|
||||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||||
|
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
|
||||||
|
private LocalDateTime recallEndDate;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,19 +64,19 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean filter(DevComFlagDTO message) {
|
public boolean filter(DevComFlagDTO message) {
|
||||||
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
// String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
// if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||||
return false;
|
// return false;
|
||||||
}
|
// }
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 消费成功,缓存到redis72小时,避免重复消费
|
* 消费成功,缓存到redis72小时,避免重复消费
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void consumeSuccess(DevComFlagDTO message) {
|
protected void consumeSuccess(DevComFlagDTO message) {
|
||||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler<FrontH
|
|||||||
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||||
// return false;
|
// return false;
|
||||||
// }
|
// }
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 消费成功,缓存到redis72小时,避免重复消费
|
* 消费成功,缓存到redis72小时,避免重复消费
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void consumeSuccess(MessageDataDTO message) {
|
protected void consumeSuccess(MessageDataDTO message) {
|
||||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
package com.njcn.message.consumer;
|
package com.njcn.message.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.core.date.DatePattern;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.njcn.message.constant.MessageStatus;
|
import com.njcn.message.constant.MessageStatus;
|
||||||
import com.njcn.message.messagedto.TopicReplyDTO;
|
import com.njcn.message.messagedto.TopicReplyDTO;
|
||||||
import com.njcn.message.constant.RedisKeyPrefix;
|
import com.njcn.message.constant.RedisKeyPrefix;
|
||||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
|
||||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||||
import com.njcn.redis.utils.RedisUtil;
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
import com.njcn.stat.api.MessAnalysisFeignClient;
|
import com.njcn.stat.api.MessAnalysisFeignClient;
|
||||||
@@ -76,14 +76,20 @@ public class TopicReplyConsumer extends EnhanceConsumerMessageHandler<TopicReply
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void consumeSuccess(TopicReplyDTO message) {
|
protected void consumeSuccess(TopicReplyDTO message) {
|
||||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void handleMessage(TopicReplyDTO message) {
|
protected void handleMessage(TopicReplyDTO message) {
|
||||||
//业务处理
|
//“12345”补招回复
|
||||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L);
|
if(Objects.equals(message.getGuid(),"12345")){
|
||||||
|
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getLineIndex().concat(message.getRecallStartDate().toLocalDate().format(DatePattern.NORM_DATE_FORMATTER))),JSONObject.toJSONString(message),1*60L);
|
||||||
|
}else {
|
||||||
|
//业务处理
|
||||||
|
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getGuid()),JSONObject.toJSONString(message),60*60L);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,11 +67,24 @@ public class MigrationInfluxDBJob {
|
|||||||
// 加上59分钟59秒
|
// 加上59分钟59秒
|
||||||
LocalDateTime modifiedResultOld = resultOld.plusMinutes(59).plusSeconds(59);
|
LocalDateTime modifiedResultOld = resultOld.plusMinutes(59).plusSeconds(59);
|
||||||
LineCountEvaluateParam paramOld = new LineCountEvaluateParam();
|
LineCountEvaluateParam paramOld = new LineCountEvaluateParam();
|
||||||
paramOld.setIsManual(false);
|
paramOld.setIsManual(true);
|
||||||
paramOld.setStartTime(resultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
paramOld.setStartTime(resultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||||
paramOld.setEndTime(modifiedResultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
paramOld.setEndTime(modifiedResultOld.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||||
migrationService.hourseLineDataBacthSysc(paramOld);
|
migrationService.hourseLineDataBacthSysc(paramOld);
|
||||||
migrationService.hourseDevDataBacthSysc(paramOld);
|
migrationService.hourseDevDataBacthSysc(paramOld);
|
||||||
|
|
||||||
|
// //定时任务在往前补俩小时的
|
||||||
|
// LocalDateTime oneHourAgoOld2 = now.minusHours(4);
|
||||||
|
// // 将分钟和秒设置为0
|
||||||
|
// LocalDateTime resultOld2 = oneHourAgoOld2.truncatedTo(ChronoUnit.HOURS);
|
||||||
|
// // 加上59分钟59秒
|
||||||
|
// LocalDateTime modifiedResultOld2 = resultOld2.plusMinutes(59).plusSeconds(59);
|
||||||
|
// LineCountEvaluateParam paramOld2 = new LineCountEvaluateParam();
|
||||||
|
// paramOld2.setIsManual(true);
|
||||||
|
// paramOld2.setStartTime(resultOld2.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||||
|
// paramOld2.setEndTime(modifiedResultOld2.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||||
|
// migrationService.hourseLineDataBacthSysc(paramOld2);
|
||||||
|
// migrationService.hourseDevDataBacthSysc(paramOld2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Scheduled(cron = "0 0 22 * * ?")
|
@Scheduled(cron = "0 0 22 * * ?")
|
||||||
|
|||||||
Reference in New Issue
Block a user