diff --git a/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/LiteFlowFeignClient.java b/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/LiteFlowFeignClient.java index e3036e2c7..c5e83fc71 100644 --- a/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/LiteFlowFeignClient.java +++ b/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/LiteFlowFeignClient.java @@ -6,6 +6,7 @@ import com.njcn.prepare.harmonic.api.liteflow.fallback.LiteFlowFeignClientFallba import com.njcn.prepare.harmonic.pojo.bo.BaseParam; import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.scheduling.annotation.Async; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -23,6 +24,9 @@ public interface LiteFlowFeignClient { @PostMapping("/measurementPointExecutor") void measurementPointExecutor(@RequestBody BaseParam baseParam); + @ApiOperation("监测点算法执行链(按小时执行的任务)") + @PostMapping("/measurementPointExecutorByHour") + public void measurementPointExecutorByHour(@RequestBody BaseParam baseParam); @ApiOperation("单位监测点算法执行链") @PostMapping("/orgPointExecutor") void orgPointExecutor(@RequestBody BaseParam baseParam); diff --git a/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/fallback/LiteFlowFeignClientFallbackFactory.java b/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/fallback/LiteFlowFeignClientFallbackFactory.java index b62129719..e032ab8c7 100644 --- a/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/fallback/LiteFlowFeignClientFallbackFactory.java +++ b/pqs-prepare/prepare-api/src/main/java/com/njcn/prepare/harmonic/api/liteflow/fallback/LiteFlowFeignClientFallbackFactory.java @@ -28,6 +28,12 @@ public class LiteFlowFeignClientFallbackFactory implements FallbackFactory { + LocalDateTime getNewTime(); +} \ No newline at end of file diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/mapper/mysql/newalgorithm/mapping/CvtJobLogMapper.xml b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/mapper/mysql/newalgorithm/mapping/CvtJobLogMapper.xml new file mode 100644 index 000000000..0fa052ecf --- /dev/null +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/mapper/mysql/newalgorithm/mapping/CvtJobLogMapper.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + excute_date_time, `row_count`, `state`, update_time, duration + + + + \ No newline at end of file diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/CvtJobLogService.java b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/CvtJobLogService.java new file mode 100644 index 000000000..c287132ea --- /dev/null +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/CvtJobLogService.java @@ -0,0 +1,20 @@ +package com.njcn.prepare.harmonic.service.mysql.cvt; + +import com.njcn.prepare.harmonic.pojo.mysql.CvtJobLog; +import com.baomidou.mybatisplus.extension.service.IService; + +import java.time.LocalDateTime; + +/** + * + * Description: + * Date: 2025/03/06 下午 2:07【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface CvtJobLogService extends IService{ + + + LocalDateTime getNewTime(); + } diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/IDataHarmRateVCvtService.java b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/IDataHarmRateVCvtService.java new file mode 100644 index 000000000..7b955ace6 --- /dev/null +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/IDataHarmRateVCvtService.java @@ -0,0 +1,15 @@ +package com.njcn.prepare.harmonic.service.mysql.cvt; + + +import com.njcn.prepare.harmonic.pojo.bo.CalculatedParam; + +/** + * Description: + * Date: 2025/02/24 上午 9:10【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface IDataHarmRateVCvtService { + void dataHarmRateVCvtHandler(CalculatedParam requestData); +} diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/CvtJobLogServiceImpl.java b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/CvtJobLogServiceImpl.java new file mode 100644 index 000000000..f26d5da99 --- /dev/null +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/CvtJobLogServiceImpl.java @@ -0,0 +1,28 @@ +package com.njcn.prepare.harmonic.service.mysql.cvt.impl; + +import com.njcn.prepare.harmonic.mapper.mysql.CvtJobLogMapper; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.prepare.harmonic.pojo.mysql.CvtJobLog; +import com.njcn.prepare.harmonic.service.mysql.cvt.CvtJobLogService; +/** + * + * Description: + * Date: 2025/03/06 下午 2:07【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +public class CvtJobLogServiceImpl extends ServiceImpl implements CvtJobLogService{ + + @Override + public LocalDateTime getNewTime() { + return this.getBaseMapper().getNewTime(); + } + + +} diff --git a/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/DataHarmRateVCvtServiceImpl.java b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/DataHarmRateVCvtServiceImpl.java new file mode 100644 index 000000000..b1cf09e41 --- /dev/null +++ b/pqs-prepare/prepare-boot/src/main/java/com/njcn/prepare/harmonic/service/mysql/cvt/impl/DataHarmRateVCvtServiceImpl.java @@ -0,0 +1,187 @@ +package com.njcn.prepare.harmonic.service.mysql.cvt.impl; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; + +import cn.hutool.core.date.LocalDateTimeUtil; +import com.njcn.device.pq.api.CvtRelationFeignClient; +import com.njcn.device.pq.pojo.dto.CvtHarmonicCorrectionFactorsDTO; +import com.njcn.influx.imapper.DataHarmRateVCvtMapper; +import com.njcn.influx.imapper.DataHarmRateVMapper; +import com.njcn.influx.pojo.po.DataHarmRateV; +import com.njcn.influx.pojo.po.DataHarmRateVCvt; +import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.prepare.harmonic.pojo.bo.CalculatedParam; +import com.njcn.prepare.harmonic.pojo.mysql.CvtJobLog; +import com.njcn.prepare.harmonic.service.mysql.cvt.CvtJobLogService; +import com.njcn.prepare.harmonic.service.mysql.cvt.IDataHarmRateVCvtService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Component; +import org.springframework.util.StopWatch; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Description: + * Date: 2025/02/24 上午 9:11【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DataHarmRateVCvtServiceImpl implements IDataHarmRateVCvtService { + private final CvtRelationFeignClient cvtRelationFeignClient; + + private final DataHarmRateVMapper dataHarmRateVMapper; + + private final DataHarmRateVCvtMapper dataHarmRateCvtVMapper; + private final CvtJobLogService cvtJobLogService; + private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); + + @Override + public void dataHarmRateVCvtHandler(CalculatedParam calculatedParam) { + + CvtJobLog one = cvtJobLogService.lambdaQuery() + .eq(CvtJobLog::getExcuteDateTime, calculatedParam.getDataDate()) + .one(); + + //日志记录 + //程序监听 + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + if (Objects.nonNull(one) && (one.getState() == 1 || one.getState() == 0)) { + //当前时间段已执行或正在执行,直接跳出循环 + return; + } + if (Objects.isNull(one)) { + one = new CvtJobLog(LocalDateTimeUtil.parse(calculatedParam.getDataDate(),DATE_TIME_FORMATTER), 0, 0, LocalDateTime.now(),0.0 ); + cvtJobLogService.save(one); + } + + try { + + List idList = calculatedParam.getIdList(); + DateTime beginTime1= DateUtil.beginOfHour(DateUtil.parse(calculatedParam.getDataDate(), DatePattern.NORM_DATETIME_FORMAT)); + + InfluxQueryWrapper cvtMapper = new InfluxQueryWrapper(DataHarmRateVCvt.class); + + LocalDateTime time =cvtJobLogService.getNewTime(); + DateTime beginTime ; + if(Objects.nonNull(time)){ + String format = LocalDateTimeUtil.format(time, DatePattern.NORM_DATETIME_PATTERN); + DateTime beginTime2 = DateUtil.beginOfHour(DateUtil.parse(format, DatePattern.NORM_DATETIME_FORMAT)); + beginTime = beginTime1.isBefore(beginTime2) ? beginTime1 : beginTime2; + }else { + beginTime = beginTime1; + } + + DateTime endTime = DateUtil.endOfHour(beginTime1); + cvtMapper.regular(DataHarmRateVCvt::getLineId,calculatedParam.getIdList()).between(DataHarmRateVCvt::getTime,DateUtil.format(beginTime,DATE_TIME_FORMATTER) , DateUtil.format(endTime,DATE_TIME_FORMATTER)); + + List dataHarmRateVCvtList = dataHarmRateCvtVMapper.selectByQueryWrapper(cvtMapper); + + + List result = new ArrayList<>(); + Map> collect = dataHarmRateVCvtList.stream().collect(Collectors.groupingBy(DataHarmRateVCvt::getLineId)); + collect.forEach((k,v)->{ + CvtHarmonicCorrectionFactorsDTO cvtHarmonicCorrectionFactorsDTO = cvtRelationFeignClient.queryByLineId(k).getData(); + if(Objects.nonNull(cvtHarmonicCorrectionFactorsDTO)&& + StringUtils.isNotEmpty(cvtHarmonicCorrectionFactorsDTO.getId())){ + v = cvtChange(v,cvtHarmonicCorrectionFactorsDTO); + } + List dataVCvtDtoList = v.stream().map(temp -> { + DataHarmRateV dataHarmRateVDto = new DataHarmRateV(); + BeanUtils.copyProperties(temp, dataHarmRateVDto); + return dataHarmRateVDto; + }).collect(Collectors.toList()); + + result.addAll(dataVCvtDtoList); + }); + dataHarmRateVMapper.insertBatch(result); + stopWatch.stop(); + one.setRowCount(result.size()); + one.setState(1); + one.setDuration(stopWatch.getTotalTimeSeconds()); + cvtJobLogService.updateById(one); + + }catch (Exception exception) { + exception.printStackTrace(); + one.setState(2); + one.setUpdateTime(LocalDateTime.now()); + cvtJobLogService.updateById(one); + } + + + } + + private List cvtChange(List v, CvtHarmonicCorrectionFactorsDTO cvtHarmonicCorrectionFactorsDTO) { + v.stream().forEach(temp->{ + + temp.setV2(cvtHarmonicCorrectionFactorsDTO.getH2().doubleValue()*temp.getV2()); + temp.setV3(cvtHarmonicCorrectionFactorsDTO.getH3().doubleValue()*temp.getV3()); + temp.setV4(cvtHarmonicCorrectionFactorsDTO.getH4().doubleValue()*temp.getV4()); + temp.setV5(cvtHarmonicCorrectionFactorsDTO.getH5().doubleValue()*temp.getV5()); + temp.setV6(cvtHarmonicCorrectionFactorsDTO.getH6().doubleValue()*temp.getV6()); + temp.setV7(cvtHarmonicCorrectionFactorsDTO.getH7().doubleValue()*temp.getV7()); + temp.setV8(cvtHarmonicCorrectionFactorsDTO.getH8().doubleValue()*temp.getV8()); + temp.setV9(cvtHarmonicCorrectionFactorsDTO.getH9().doubleValue()*temp.getV9()); + temp.setV10(cvtHarmonicCorrectionFactorsDTO.getH10().doubleValue()*temp.getV10()); + temp.setV11(cvtHarmonicCorrectionFactorsDTO.getH11().doubleValue()*temp.getV11()); + temp.setV12(cvtHarmonicCorrectionFactorsDTO.getH12().doubleValue()*temp.getV12()); + temp.setV13(cvtHarmonicCorrectionFactorsDTO.getH13().doubleValue()*temp.getV13()); + temp.setV14(cvtHarmonicCorrectionFactorsDTO.getH14().doubleValue()*temp.getV14()); + temp.setV15(cvtHarmonicCorrectionFactorsDTO.getH15().doubleValue()*temp.getV15()); + temp.setV16(cvtHarmonicCorrectionFactorsDTO.getH16().doubleValue()*temp.getV16()); + temp.setV17(cvtHarmonicCorrectionFactorsDTO.getH17().doubleValue()*temp.getV17()); + temp.setV18(cvtHarmonicCorrectionFactorsDTO.getH18().doubleValue()*temp.getV18()); + temp.setV19(cvtHarmonicCorrectionFactorsDTO.getH19().doubleValue()*temp.getV19()); + temp.setV20(cvtHarmonicCorrectionFactorsDTO.getH20().doubleValue()*temp.getV20()); + temp.setV21(cvtHarmonicCorrectionFactorsDTO.getH21().doubleValue()*temp.getV21()); + temp.setV22(cvtHarmonicCorrectionFactorsDTO.getH22().doubleValue()*temp.getV22()); + temp.setV23(cvtHarmonicCorrectionFactorsDTO.getH23().doubleValue()*temp.getV23()); + temp.setV24(cvtHarmonicCorrectionFactorsDTO.getH24().doubleValue()*temp.getV24()); + temp.setV25(cvtHarmonicCorrectionFactorsDTO.getH25().doubleValue()*temp.getV25()); + temp.setV26(cvtHarmonicCorrectionFactorsDTO.getH26().doubleValue()*temp.getV26()); + temp.setV27(cvtHarmonicCorrectionFactorsDTO.getH27().doubleValue()*temp.getV27()); + temp.setV28(cvtHarmonicCorrectionFactorsDTO.getH28().doubleValue()*temp.getV28()); + temp.setV29(cvtHarmonicCorrectionFactorsDTO.getH29().doubleValue()*temp.getV29()); + temp.setV30(cvtHarmonicCorrectionFactorsDTO.getH30().doubleValue()*temp.getV30()); + temp.setV31(cvtHarmonicCorrectionFactorsDTO.getH31().doubleValue()*temp.getV31()); + temp.setV32(cvtHarmonicCorrectionFactorsDTO.getH32().doubleValue()*temp.getV32()); + temp.setV33(cvtHarmonicCorrectionFactorsDTO.getH33().doubleValue()*temp.getV33()); + temp.setV34(cvtHarmonicCorrectionFactorsDTO.getH34().doubleValue()*temp.getV34()); + temp.setV35(cvtHarmonicCorrectionFactorsDTO.getH35().doubleValue()*temp.getV35()); + temp.setV36(cvtHarmonicCorrectionFactorsDTO.getH36().doubleValue()*temp.getV36()); + temp.setV37(cvtHarmonicCorrectionFactorsDTO.getH37().doubleValue()*temp.getV37()); + temp.setV38(cvtHarmonicCorrectionFactorsDTO.getH38().doubleValue()*temp.getV38()); + temp.setV39(cvtHarmonicCorrectionFactorsDTO.getH39().doubleValue()*temp.getV39()); + temp.setV40(cvtHarmonicCorrectionFactorsDTO.getH40().doubleValue()*temp.getV40()); + temp.setV41(cvtHarmonicCorrectionFactorsDTO.getH41().doubleValue()*temp.getV41()); + temp.setV42(cvtHarmonicCorrectionFactorsDTO.getH42().doubleValue()*temp.getV42()); + temp.setV43(cvtHarmonicCorrectionFactorsDTO.getH43().doubleValue()*temp.getV43()); + temp.setV44(cvtHarmonicCorrectionFactorsDTO.getH44().doubleValue()*temp.getV44()); + temp.setV45(cvtHarmonicCorrectionFactorsDTO.getH45().doubleValue()*temp.getV45()); + temp.setV46(cvtHarmonicCorrectionFactorsDTO.getH46().doubleValue()*temp.getV46()); + temp.setV47(cvtHarmonicCorrectionFactorsDTO.getH47().doubleValue()*temp.getV47()); + temp.setV48(cvtHarmonicCorrectionFactorsDTO.getH48().doubleValue()*temp.getV48()); + temp.setV49(cvtHarmonicCorrectionFactorsDTO.getH49().doubleValue()*temp.getV49()); + temp.setV50(cvtHarmonicCorrectionFactorsDTO.getH50().doubleValue()*temp.getV50()); + + }); + return v; + } + +} diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementHourTaskRunner.java b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementHourTaskRunner.java new file mode 100644 index 000000000..26baaa34b --- /dev/null +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementHourTaskRunner.java @@ -0,0 +1,39 @@ +package com.njcn.system.timer.tasks; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import com.njcn.prepare.harmonic.api.liteflow.LiteFlowFeignClient; +import com.njcn.prepare.harmonic.pojo.bo.BaseParam; +import com.njcn.system.timer.TimerTaskRunner; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * 类的介绍:监测点算法执行链定时任务 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/12/6 9:35 + */ +@Component +@RequiredArgsConstructor +public class MeasurementHourTaskRunner implements TimerTaskRunner { + + private final LiteFlowFeignClient liteFlowFeignClient; + + @Override + public void action(String date) { + BaseParam baseParam = new BaseParam(); + baseParam.setFullChain(true); + baseParam.setRepair(false); + if(StrUtil.isBlank(date)){ + baseParam.setDataDate(DateUtil.yesterday().toString(DatePattern.NORM_DATETIME_PATTERN)); + }else { + baseParam.setDataDate(date); + } + liteFlowFeignClient.measurementPointExecutorByHour(baseParam); + } + + +} diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementTaskRunner.java b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementTaskRunner.java index 31e0c13a9..9e93dd792 100644 --- a/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementTaskRunner.java +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/MeasurementTaskRunner.java @@ -28,7 +28,7 @@ public class MeasurementTaskRunner implements TimerTaskRunner { baseParam.setFullChain(true); baseParam.setRepair(false); if(StrUtil.isBlank(date)){ - baseParam.setDataDate(DateUtil.yesterday().toString(DatePattern.NORM_DATE_PATTERN)); + baseParam.setDataDate(DateUtil.now()); }else { baseParam.setDataDate(date); }