dataV分钟数据转日表

This commit is contained in:
xy
2025-02-10 16:32:56 +08:00
parent ec2ea472b5
commit af9ead546f
27 changed files with 2197 additions and 342 deletions

View File

@@ -1,9 +1,11 @@
package com.njcn.algorithm;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
/**
@@ -12,6 +14,8 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
@Slf4j
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")
@MapperScan("com.njcn.**.mapper")
@DependsOn("proxyMapperRegister")
public class AlgorithmBootApplication {
public static void main(String[] args) {

View File

@@ -0,0 +1,439 @@
package com.njcn.algorithm;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.njcn.algorithm.pojo.bo.BaseParam;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.pojo.enums.PrepareResponseEnum;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
import com.njcn.device.biz.pojo.dto.*;
import com.njcn.device.biz.pojo.param.DeptGetLineParam;
import com.njcn.user.api.DeptFeignClient;
import com.njcn.user.pojo.po.Dept;
import com.njcn.web.controller.BaseController;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.entity.CmpStep;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年11月01日 10:20
*/
@Slf4j
@Api(tags = "算法执行中心")
@RestController
@RequestMapping("/executor")
@RequiredArgsConstructor
public class ExecutionCenter extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(ExecutionCenter.class);
@Resource
private CommTerminalGeneralClient commTerminalGeneralClient;
@Resource
private DeptFeignClient deptFeignClient;
@Resource
private FlowExecutor flowExecutor;
/***
* 1、校验非全链执行时tagNames节点标签集合必须为非空否则提示---无可执行节点
* 2、补招标识为true时起始日期&截止日期不可为空
* 3、算法需要的索引集合
* @author hongawen
* @date 2023/11/3 11:36
* @param baseParam 执行的基础参数
*/
private CalculatedParam judgeExecuteParam(BaseParam baseParam) {
CalculatedParam calculatedParam = new CalculatedParam();
if (!baseParam.isFullChain() && CollectionUtil.isEmpty(baseParam.getTagNames())) {
throw new BusinessException(PrepareResponseEnum.NO_EXECUTOR_NODE);
}
if (baseParam.isRepair() && StrUtil.isAllEmpty(baseParam.getBeginTime(), baseParam.getEndTime())) {
throw new BusinessException(PrepareResponseEnum.NO_REPAIR_DATE);
}
BeanUtil.copyProperties(baseParam, calculatedParam);
return calculatedParam;
}
/***
*
* @author hongawen
* @date 2023/11/7 14:44
*/
private void dealResponse(CalculatedParam calculatedParam, LiteflowResponse liteflowResponse, String methodDescribe) {
if (liteflowResponse.isSuccess()) {
logger.info("日期{}{}执行{}成功", calculatedParam.getDataDate(), methodDescribe, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames())));
} else {
Map<String, List<CmpStep>> executeSteps = liteflowResponse.getExecuteSteps();
CmpStep failStep = null;
for (String key : executeSteps.keySet()) {
List<CmpStep> cmpSteps = executeSteps.get(key);
cmpSteps = cmpSteps.stream().filter(cmpStep -> !cmpStep.isSuccess()).collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(cmpSteps)) {
failStep = cmpSteps.get(0);
}
}
logger.error("日期{}{}执行{}失败,在执行{}失败,失败原因:{}"
, calculatedParam.getDataDate()
, methodDescribe
, calculatedParam.isFullChain() ? "全链" : "指定节点:".concat(String.join(StrPool.COMMA, calculatedParam.getTagNames()))
, failStep.getNodeId().concat(Objects.isNull(failStep.getTag()) ? "" : StrPool.DASHED.concat(failStep.getTag()))
, Objects.isNull(failStep.getException()) ? null : failStep.getException().getMessage());
}
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("监测点算法执行链")
@PostMapping("/measurementPointExecutor")
@Async("asyncExecutor")
public void measurementPointExecutor(@RequestBody BaseParam baseParam) {
String methodDescribe = getMethodDescribe("measurementPointExecutor");
//手动判断参数是否合法,
CalculatedParam calculatedParam = judgeExecuteParam(baseParam);
// 测点索引
if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
calculatedParam.setIdList(commTerminalGeneralClient.getRunMonitorIds().getData());
}
LiteflowResponse liteflowResponse;
if (baseParam.isRepair()) {
//补招时,起始日期、截止日期必填
DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
//递增日期执行算法链
for (int i = 0; i < betweenDay; i++) {
if (i != 0) {
startDate = DateUtil.offsetDay(startDate, 1);
}
calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam);
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
}
} else {
//非补招
liteflowResponse = flowExecutor.execute2Resp("measurement_point", calculatedParam);
dealResponse(calculatedParam, liteflowResponse, methodDescribe);
}
}
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("装置算法执行链")
// @PostMapping("/deviceExecutor")
// @Async("asyncExecutor")
// public void deviceExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("deviceExecutor");
// //手动判断参数是否合法,
// CalculatedParam calculatedParam = judgeExecuteParam(baseParam);
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
// // 设备索引
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// List<DeptGetDeviceDTO> list = commTerminalGeneralClient.deptGetDevice(deptGetLineParam).getData();
// DeptGetDeviceDTO dto = list.stream().filter(po -> Objects.equals(po.getUnitId(), data.getId())).collect(Collectors.toList()).get(0);
// List<LineDevGetDTO> devList = dto.getDeviceList();
// calculatedParam.setIdList(devList.stream().map(LineDevGetDTO::getDevId).distinct().collect(Collectors.toList()));
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("device", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("device", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("单位监测点算法执行链")
// @PostMapping("/orgPointExecutor")
// @Async("asyncExecutor")
// public void orgPointExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("OrgPointExecutor");
// //手动判断参数是否合法,
// CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
//
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// calculatedParam.setIdList(commTerminalGeneralClient.deptGetLine(deptGetLineParam).getData());
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("org_point", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("org_point", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
//
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("pms国网上送单位层级算法执行链")
// @PostMapping("/uploadOrgExecutor")
// @Async("asyncExecutor")
// public void uploadOrgExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("uploadOrgExecutor");
// //手动判断参数是否合法,
// CalculatedParam<DeptGetChildrenMoreDTO> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
//
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// calculatedParam.setIdList(commTerminalGeneralClient.deptGetLine(deptGetLineParam).getData());
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("upload_org", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("upload_org", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
//
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("变电站算法执行链")
// @PostMapping("/substationExecutor")
// @Async("asyncExecutor")
// public void substationExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("substationExecutor");
// //手动判断参数是否合法,
// CalculatedParam<String> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
//
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// List<DeptGetSubStationDTO> data1 = commTerminalGeneralClient.deptSubStation(deptGetLineParam).getData();
// List<String> stationIds = new ArrayList<>();
// for (DeptGetSubStationDTO deptGetSubStationDTO : data1) {
// Collection<String> union = CollectionUtils.union(Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()),
// Optional.ofNullable(deptGetSubStationDTO.getStationIds()).orElse(new ArrayList<String>()));
// List<String> collect = union.stream().distinct().collect(Collectors.toList());
// stationIds.addAll(collect);
// }
// stationIds = stationIds.stream().distinct().collect(Collectors.toList());
// calculatedParam.setIdList(stationIds);
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("sub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
//
// }
//
// /**
// * pms dim母线电站运行情况
// *
// * @author cdf
// * @date 2023/11/17
// */
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("pms变电站母线算法执行链")
// @PostMapping("/pmsdimexecutor")
// public void pmsDimExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("pmsdimexecutor");
// //手动判断参数是否合法,
// CalculatedParam<DeptGetChildrenDTO> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
//
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// deptGetLineParam.setSystemType(0);
// List<DeptGetChildrenDTO> dept = commTerminalGeneralClient.deptGetLineList(deptGetLineParam).getData();
// calculatedParam.setIdList(dept);
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("dim_station_busbar", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("dim_station_busbar", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
//
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("母线算法执行链(主网测点)")
// @PostMapping("/generaTrixExecutor")
// @Async("asyncExecutor")
// public void generaTrixExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("generaTrixExecutor");
// //手动判断参数是否合法,
// CalculatedParam<DeptGetBusBarDTO> calculatedParam = judgeExecuteParam(baseParam);
// //母线索引
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept dept = deptFeignClient.getRootDept().getData();
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
// deptGetLineParam.setDeptId(dept.getId());
// deptGetLineParam.setSystemType(0);
// List<DeptGetBusBarDTO> busBarList = commTerminalGeneralClient.deptBusBar(deptGetLineParam).getData();
// calculatedParam.setIdList(busBarList);
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("genera_trix", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("genera_trix", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// }
//
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("单位变电站算法执行链")
// @PostMapping("/orgSubStationExecutor")
// @Async("asyncExecutor")
// public void orgSubStationExecutor(@RequestBody BaseParam baseParam) {
// String methodDescribe = getMethodDescribe("orgSubStationExecutor");
// //手动判断参数是否合法,
// CalculatedParam<DeptGetSubStationDTO.Info> calculatedParam = judgeExecuteParam(baseParam);
// // 测点索引
// DeptGetLineParam deptGetLineParam = new DeptGetLineParam();
// if (CollectionUtils.isEmpty(calculatedParam.getIdList())) {
// Dept data = deptFeignClient.getRootDept().getData();
// deptGetLineParam.setDeptId(data.getId());
// List<DeptGetSubStationDTO.Info> data1 = commTerminalGeneralClient.deptGetSubStationInfo(deptGetLineParam).getData();
// calculatedParam.setIdList(data1);
// }
// LiteflowResponse liteflowResponse;
// if (baseParam.isRepair()) {
// //补招时,起始日期、截止日期必填
// DateTime startDate = DateUtil.parse(baseParam.getBeginTime(), DatePattern.NORM_DATE_FORMAT);
// DateTime endDate = DateUtil.parse(baseParam.getEndTime(), DatePattern.NORM_DATE_FORMAT);
// long betweenDay = DateUtil.betweenDay(startDate, endDate, true);
// //递增日期执行算法链
// for (int i = 0; i < betweenDay; i++) {
// if (i != 0) {
// startDate = DateUtil.offsetDay(startDate, 1);
// }
// calculatedParam.setDataDate(DateUtil.format(startDate, DatePattern.NORM_DATE_PATTERN));
// liteflowResponse = flowExecutor.execute2Resp("orgSub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// } else {
// //非补招
// liteflowResponse = flowExecutor.execute2Resp("orgSub_station", calculatedParam);
// dealResponse(calculatedParam, liteflowResponse, methodDescribe);
// }
// }
}

View File

@@ -0,0 +1,38 @@
package com.njcn.algorithm.executor;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.yomahub.liteflow.core.NodeComponent;
import java.util.Set;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年11月07日 09:02
*/
public class BaseExecutor {
/***
* 判断当前节点是否执行
* @author hongawen
* @date 2023/11/6 16:17
* @param bindCmp 执行链绑定的上下文
* @return boolean
*/
public boolean isAccess(NodeComponent bindCmp) {
String tag = bindCmp.getTag();
if (StrUtil.isBlank(tag)) {
//没有指定tag的使用node自己的名称判断
tag = bindCmp.getNodeId();
}
CalculatedParam requestData = bindCmp.getRequestData();
Set<String> tagNames = requestData.getTagNames();
if (requestData.isFullChain() || (CollectionUtil.isNotEmpty(tagNames) && tagNames.contains(tag))) {
return true;
}
return false;
}
}

View File

@@ -0,0 +1,45 @@
package com.njcn.algorithm.executor;
import com.njcn.algorithm.service.line.DayDataService;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
* @author xy
* @version 1.0.0
* @date 2025年1月16日
*/
@Slf4j
@LiteflowComponent
@RequiredArgsConstructor
public class MeasurementExecutor extends BaseExecutor {
@Resource
private DayDataService dayDataService;
/********************************************算法负责人:xy***********************************************************/
/**
* 算法名: 3.4.1.1-----监测点报表_日表(r_stat_data_*_d)
* @author xuyang
*/
@LiteflowMethod(value = LiteFlowMethodEnum.IS_ACCESS, nodeId = "dataV", nodeType = NodeTypeEnum.COMMON)
public boolean dataVToDayAccess(NodeComponent bindCmp) {
return isAccess(bindCmp);
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "dataV", nodeType = NodeTypeEnum.COMMON)
public void dataVToDayProcess(NodeComponent bindCmp) {
dayDataService.dataVHandler(bindCmp.getRequestData());
}
/********************************************算法负责人:xy结束***********************************************************/
}

View File

@@ -0,0 +1,36 @@
package com.njcn.algorithm.liteflow;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.yomahub.liteflow.core.NodeComponent;
import java.util.Set;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年11月03日 14:42
*/
public abstract class NjcnNodeComponent extends NodeComponent {
/***
* 判断是否进入该节点
* 1、全链路执行时
* 2、非全链路执行但是需要执行的tag集合中包含了当前tag
*/
@Override
public boolean isAccess() {
String tag = this.getTag();
if (StrUtil.isBlank(tag)) {
//没有指定tag的使用node自己的名称判断
tag = this.getNodeId();
}
CalculatedParam requestData = this.getRequestData();
Set<String> tagNames = requestData.getTagNames();
if (requestData.isFullChain() || (CollectionUtil.isNotEmpty(tagNames) && tagNames.contains(tag))) {
return true;
}
return false;
}
}

View File

@@ -0,0 +1,19 @@
package com.njcn.algorithm.service.line;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
/**
* @author xy
*/
public interface DayDataService {
/***
* dataV转r_stat_data_v_d
* @author xuyang
* @date 2025/01/18 21:18
* @param calculatedParam 查询条件
*/
void dataVHandler(CalculatedParam calculatedParam);
}

View File

@@ -0,0 +1,216 @@
package com.njcn.algorithm.serviceimpl.line;
import cn.hutool.core.collection.CollUtil;
import com.njcn.algorithm.pojo.bo.CalculatedParam;
import com.njcn.algorithm.service.line.DayDataService;
import com.njcn.dataProcess.api.DataVFeignClient;
import com.njcn.dataProcess.param.LineCountEvaluateParam;
import com.njcn.dataProcess.pojo.dto.CommonMinuteDto;
import com.njcn.dataProcess.pojo.dto.DataVDto;
import com.njcn.dataProcess.util.TimeUtils;
import com.njcn.influx.constant.InfluxDbSqlConstant;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
/**
* @author xy
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DayDataServiceImpl implements DayDataService {
private static final Logger logger = LoggerFactory.getLogger(DayDataServiceImpl.class);
private final static Integer NUM = 100;
@Resource
private DataVFeignClient dataVFeignClient;
@Override
public void dataVHandler(CalculatedParam calculatedParam) {
logger.info("{},dataV表转r_stat_data_v_d开始=====》", LocalDateTime.now());
List<DataVDto> result = new ArrayList<>();
//远程接口获取分钟数据
LineCountEvaluateParam lineParam = new LineCountEvaluateParam();
lineParam.setStartTime(TimeUtils.getBeginOfDay(calculatedParam.getDataDate()));
lineParam.setEndTime(TimeUtils.getEndOfDay(calculatedParam.getDataDate()));
//以100个监测点分片处理
List<List<String>> pendingIds = ListUtils.partition(calculatedParam.getIdList(),NUM);
pendingIds.forEach(list->{
lineParam.setLineId(list);
List<CommonMinuteDto> partList = dataVFeignClient.getBaseData(lineParam).getData();
if (CollUtil.isNotEmpty(partList)) {
partList.forEach(item->{
//相别
List<CommonMinuteDto.PhasicType> phasicTypeList = item.getPhasicTypeList();
phasicTypeList.forEach(item2->{
//数据类型
List<CommonMinuteDto.ValueType> valueTypeList = item2.getValueTypeList();
//获取平均值集合
CommonMinuteDto.ValueType valueTypes = valueTypeList.stream().filter(type-> type.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.AVG_WEB)).findFirst().orElse(null);
valueTypeList.forEach(item3->{
DataVDto dto = new DataVDto();
dto.setTime(item.getTime());
dto.setLineId(item.getLineId());
dto.setPhasicType(item2.getPhasicType());
dto.setValueType(item3.getValueType());
//todo 数据清洗的功能需要讨论在哪完成
dto.setQualityFlag(0);
channelDataVHandler(item3,valueTypes,dto,true);
result.add(dto);
});
});
});
}
});
if (CollUtil.isNotEmpty(result)) {
//存储数据
dataVFeignClient.addList(result);
}
}
//指标处理
//pojo1 为正常数据集合
//pojo2 为平均值数据集合根据不同情况用来计算cp95
public void channelDataVHandler(CommonMinuteDto.ValueType pojo1, CommonMinuteDto.ValueType pojo2, DataVDto dto, boolean scheme) {
CommonMinuteDto.ValueType valueType;
if (dto.getValueType().equalsIgnoreCase(InfluxDbSqlConstant.CP95) && !scheme) {
valueType = pojo2;
} else {
valueType = pojo1;
}
//todo 按照集合排列顺序取值
dto.setFreq(getData(valueType.getValueType(),valueType.getValueList().get(0),scheme));
dto.setFreqDev(getData(valueType.getValueType(),valueType.getValueList().get(1),scheme));
dto.setRms(getData(valueType.getValueType(),valueType.getValueList().get(2),scheme));
dto.setRmsLvr(getData(valueType.getValueType(),valueType.getValueList().get(3),scheme));
dto.setVNeg(getData(valueType.getValueType(),valueType.getValueList().get(4),scheme));
dto.setVPos(getData(valueType.getValueType(),valueType.getValueList().get(5),scheme));
dto.setVThd(getData(valueType.getValueType(),valueType.getValueList().get(6),scheme));
dto.setVUnbalance(getData(valueType.getValueType(),valueType.getValueList().get(7),scheme));
dto.setVZero(getData(valueType.getValueType(),valueType.getValueList().get(8),scheme));
dto.setVlDev(getData(valueType.getValueType(),valueType.getValueList().get(9),scheme));
dto.setVuDev(getData(valueType.getValueType(),valueType.getValueList().get(10),scheme));
dto.setV1(getData(valueType.getValueType(),valueType.getValueList().get(11),scheme));
dto.setV2(getData(valueType.getValueType(),valueType.getValueList().get(12),scheme));
dto.setV3(getData(valueType.getValueType(),valueType.getValueList().get(13),scheme));
dto.setV4(getData(valueType.getValueType(),valueType.getValueList().get(14),scheme));
dto.setV5(getData(valueType.getValueType(),valueType.getValueList().get(15),scheme));
dto.setV6(getData(valueType.getValueType(),valueType.getValueList().get(16),scheme));
dto.setV7(getData(valueType.getValueType(),valueType.getValueList().get(17),scheme));
dto.setV8(getData(valueType.getValueType(),valueType.getValueList().get(18),scheme));
dto.setV9(getData(valueType.getValueType(),valueType.getValueList().get(19),scheme));
dto.setV10(getData(valueType.getValueType(),valueType.getValueList().get(20),scheme));
dto.setV11(getData(valueType.getValueType(),valueType.getValueList().get(21),scheme));
dto.setV12(getData(valueType.getValueType(),valueType.getValueList().get(22),scheme));
dto.setV13(getData(valueType.getValueType(),valueType.getValueList().get(23),scheme));
dto.setV14(getData(valueType.getValueType(),valueType.getValueList().get(24),scheme));
dto.setV15(getData(valueType.getValueType(),valueType.getValueList().get(25),scheme));
dto.setV16(getData(valueType.getValueType(),valueType.getValueList().get(26),scheme));
dto.setV17(getData(valueType.getValueType(),valueType.getValueList().get(27),scheme));
dto.setV18(getData(valueType.getValueType(),valueType.getValueList().get(28),scheme));
dto.setV19(getData(valueType.getValueType(),valueType.getValueList().get(29),scheme));
dto.setV20(getData(valueType.getValueType(),valueType.getValueList().get(30),scheme));
dto.setV21(getData(valueType.getValueType(),valueType.getValueList().get(31),scheme));
dto.setV22(getData(valueType.getValueType(),valueType.getValueList().get(32),scheme));
dto.setV23(getData(valueType.getValueType(),valueType.getValueList().get(33),scheme));
dto.setV24(getData(valueType.getValueType(),valueType.getValueList().get(34),scheme));
dto.setV25(getData(valueType.getValueType(),valueType.getValueList().get(35),scheme));
dto.setV26(getData(valueType.getValueType(),valueType.getValueList().get(36),scheme));
dto.setV27(getData(valueType.getValueType(),valueType.getValueList().get(37),scheme));
dto.setV28(getData(valueType.getValueType(),valueType.getValueList().get(38),scheme));
dto.setV29(getData(valueType.getValueType(),valueType.getValueList().get(39),scheme));
dto.setV30(getData(valueType.getValueType(),valueType.getValueList().get(40),scheme));
dto.setV31(getData(valueType.getValueType(),valueType.getValueList().get(41),scheme));
dto.setV32(getData(valueType.getValueType(),valueType.getValueList().get(42),scheme));
dto.setV33(getData(valueType.getValueType(),valueType.getValueList().get(43),scheme));
dto.setV34(getData(valueType.getValueType(),valueType.getValueList().get(44),scheme));
dto.setV35(getData(valueType.getValueType(),valueType.getValueList().get(45),scheme));
dto.setV36(getData(valueType.getValueType(),valueType.getValueList().get(46),scheme));
dto.setV37(getData(valueType.getValueType(),valueType.getValueList().get(47),scheme));
dto.setV38(getData(valueType.getValueType(),valueType.getValueList().get(48),scheme));
dto.setV39(getData(valueType.getValueType(),valueType.getValueList().get(49),scheme));
dto.setV40(getData(valueType.getValueType(),valueType.getValueList().get(50),scheme));
dto.setV41(getData(valueType.getValueType(),valueType.getValueList().get(51),scheme));
dto.setV42(getData(valueType.getValueType(),valueType.getValueList().get(52),scheme));
dto.setV43(getData(valueType.getValueType(),valueType.getValueList().get(53),scheme));
dto.setV44(getData(valueType.getValueType(),valueType.getValueList().get(54),scheme));
dto.setV45(getData(valueType.getValueType(),valueType.getValueList().get(55),scheme));
dto.setV46(getData(valueType.getValueType(),valueType.getValueList().get(56),scheme));
dto.setV47(getData(valueType.getValueType(),valueType.getValueList().get(57),scheme));
dto.setV48(getData(valueType.getValueType(),valueType.getValueList().get(58),scheme));
dto.setV49(getData(valueType.getValueType(),valueType.getValueList().get(59),scheme));
dto.setV50(getData(valueType.getValueType(),valueType.getValueList().get(60),scheme));
}
//数据类型处理
//cp95值的计算有点区别会用到cp95的集合或者平均值的集合
public Double getData(String valueType, List<Double> list, boolean scheme) {
Double result = null;
valueType = valueType.toUpperCase();
if (scheme) {
switch (valueType) {
case InfluxDbSqlConstant.MAX:
case InfluxDbSqlConstant.CP95:
Optional<Double> max = list.stream().filter(Objects::nonNull).max(Double::compare);
result = max.orElse(null);
break;
case InfluxDbSqlConstant.MIN:
Optional<Double> min = list.stream().filter(Objects::nonNull).min(Double::compare);
result = min.orElse(null);
break;
case InfluxDbSqlConstant.AVG_WEB:
OptionalDouble average = list.stream()
.filter(Objects::nonNull)
.mapToDouble(Double::doubleValue)
.average();
result = average.isPresent() ? average.getAsDouble() : null;
break;
default:
break;
}
} else {
switch (valueType) {
case InfluxDbSqlConstant.MAX:
Optional<Double> max = list.stream().filter(Objects::nonNull).max(Double::compare);
result = max.orElse(null);
break;
case InfluxDbSqlConstant.MIN:
Optional<Double> min = list.stream().filter(Objects::nonNull).min(Double::compare);
result = min.orElse(null);
break;
case InfluxDbSqlConstant.AVG_WEB:
OptionalDouble average = list.stream()
.filter(Objects::nonNull)
.mapToDouble(Double::doubleValue)
.average();
result = average.isPresent() ? average.getAsDouble() : null;
break;
case InfluxDbSqlConstant.CP95:
list.sort(Collections.reverseOrder());
int discardCount = (int) Math.ceil(list.size() * 0.05);
List<Double> remainingList = list.subList(discardCount, list.size());
result = remainingList.isEmpty() ? null : remainingList.get(0);
break;
default:
break;
}
}
return result;
}
}

View File

@@ -29,11 +29,20 @@ spring:
shared-configs:
- data-id: share-config.yaml
refresh: true
- data-Id: algorithm-config.yaml
- data-Id: share-config-datasource-db.yaml
refresh: true
main:
allow-bean-definition-overriding: true
liteflow:
rule-source-ext-data-map:
serverAddr: @nacos.url@
dataId: prepare_liteflow
group: DEFAULT_GROUP
namespace: @nacos.namespace@
when-max-wait-time: 600000
print-banner: false
#项目日志的配置
logging: