diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java new file mode 100644 index 000000000..41eeff0cd --- /dev/null +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java @@ -0,0 +1,28 @@ +package com.njcn.energy.pojo.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.energy.pojo.api.fallback.EleOnlineRateFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; + + +/** + * pqs + * + * @author cdf + * @date 2022/4/22 + */ +@FeignClient(value = ServerInfo.ENERGY,path = "/energyJob",fallbackFactory = EleOnlineRateFallbackFactory.class) +public interface EnergyStatisticFeignClient { + + /** + * 获取策略下的监测点 + * @author cdf + * @date 2022/4/22 + */ + @GetMapping("electricCalJob") + HttpResult electricCalJob(); + +} diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java new file mode 100644 index 000000000..45079700f --- /dev/null +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java @@ -0,0 +1,43 @@ +package com.njcn.energy.pojo.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.energy.pojo.api.EleAirStrategyFeignClient; +import com.njcn.energy.pojo.api.EnergyStatisticFeignClient; +import com.njcn.energy.pojo.utils.EnergyEnumUtil; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2022/4/20 15:09 + */ +@Slf4j +@Component +public class EnergyStatisticFallbackFactory implements FallbackFactory { + + + @Override + public EnergyStatisticFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if(cause.getCause() instanceof BusinessException){ + BusinessException businessException = (BusinessException) cause.getCause(); + exceptionEnum = EnergyEnumUtil.getExceptionEnum(businessException.getResult()); + } + Enum finalExceptionEnum = exceptionEnum; + return new EnergyStatisticFeignClient() { + @Override + public HttpResult electricCalJob() { + log.error("{}异常,降级处理,异常为:{}","调度统计电量增量",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java new file mode 100644 index 000000000..125ac0c6c --- /dev/null +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java @@ -0,0 +1,152 @@ +package com.njcn.energy.controller; + + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.influx.config.InfluxDbConfig; +import com.njcn.influx.utils.InfluxDbUtils; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.springframework.scheduling.annotation.Async; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@RestController +@Api(tags = "用能系统数据调度") +@RequiredArgsConstructor +@Slf4j +@RequestMapping("energyJob") +public class StatisticDataRunController { + + private final InfluxDbUtils influxDbUtils; + + private final InfluxDbConfig influxDbConfig; + + + /** + * 调度统计电量增量 + */ + @GetMapping("electricCalJob") + @Async("asyncExecutor") + public void electricCalJob() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Calendar startTime = Calendar.getInstance(); + + startTime.add(Calendar.MINUTE, -40); + String start = sdf.format(startTime.getTime()); + + Calendar endTime = Calendar.getInstance(); + //endTime.add(Calendar.DAY_OF_MONTH,-8); + String end = sdf.format(endTime.getTime()); + + String sql = "select * from power_data where time > '" + start + "' and time < '" + end + "' order by time"; + List> list = influxDbUtils.getResult(sql); + + if (CollectionUtil.isNotEmpty(list)) { + Map lineTemMap = new HashMap<>(); + List> insertObj = new ArrayList<>(); + list.stream().collect(Collectors.groupingBy((item) -> item.get("LineId"), Collectors.toList())).forEach(lineTemMap::put); + lineTemMap.forEach((lineId, mapList) -> { + List> tmpList = (List) mapList; + if (CollectionUtil.isNotEmpty(tmpList)) { + for (int i = 0; i < tmpList.size(); i++) { + if (i != 0) { + Map mapEnd = tmpList.get(i); + Map mapStart = tmpList.get(i - 1); + Long endT = (Long) mapEnd.get("time") + 28800; + Long startT = (Long) mapStart.get("time") + 28800; + if (endT > startT) { + Map map = new HashMap<>(); + if (endT - startT == 60 * 15) { + for (Map.Entry entry : tmpList.get(i).entrySet()) { + if (entry.getKey().equals("Phase") || entry.getKey().equals("LineId") || entry.getKey().equals("Stat_Method")) { + map.put(entry.getKey(), entry.getValue()); + } else if (entry.getKey().equals("time")) { + map.put(entry.getKey(), endT); + } else { + //定义 float最大值 + Float endThis = Float.parseFloat(entry.getValue().toString()); + Float startThis = Float.parseFloat(mapStart.get(entry.getKey()).toString()); + if (-1000000f == endThis || -1000000f == startThis) { + map.put(entry.getKey(), -1000000f); + } else { + map.put(entry.getKey(), endThis - startThis); + } + } + } + insertObj.add(map); + } else { + //缺少数据处理 + + } + } else { + log.error("时间起始顺序错误!"); + } + } + } + } + }); + + if (CollectionUtil.isNotEmpty(insertObj)) { + BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).build(); + for (Map it : insertObj) { + Point.Builder point = Point.measurement("power_data_add"); + it.forEach((key, val) -> { + if (key.equals("LineId")) { + point.tag(key, val.toString()); + } else if (key.equals("time")) { + point.time((Long) val, TimeUnit.SECONDS); + } else if (key.equals("Phase") || key.equals("Stat_Method")) { + point.addField(key, val.toString()); + } else { + point.addField(key, (Float) val); + } + }); + Point p = point.build(); + batchPoints.point(p); + } + + try { + influxDbUtils.batchInsert(batchPoints, TimeUnit.SECONDS); + HashMap> hashMap = new HashMap<>(); + for (int j = 0; j < insertObj.size(); j++) { + hashMap.put(insertObj.get(j).get("LineId").toString(), insertObj.get(j)); + } + hashMap.forEach((key, val) -> { + Long t = (Long) val.get("time") * 1000 - 28800000; + String tt = Instant.ofEpochMilli((Long) val.get("time") * 1000).toString(); + String sqlQs = "delete from power_data_add_Real where time< '" + tt + "' and LineId ='" + key + "'"; + influxDbUtils.deleteMeasurementData(sqlQs); + Map filedss = new HashMap<>(); + Map tagss = new HashMap<>(); + val.forEach((keytem, valtem) -> { + if (keytem.equals("time")) { + + } else if (keytem.equals("LineId")) { + tagss.put(keytem, valtem.toString()); + } else { + filedss.put(keytem, valtem); + } + }); + influxDbUtils.insert("power_data_add_Real", tagss, filedss, t, TimeUnit.MILLISECONDS); + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + } else { + log.error(start + "——————" + end + "数据为空"); + } + + + } +} diff --git a/pqs-system/system-boot/pom.xml b/pqs-system/system-boot/pom.xml index fa0467979..12112519d 100644 --- a/pqs-system/system-boot/pom.xml +++ b/pqs-system/system-boot/pom.xml @@ -80,6 +80,12 @@ ${project.version} + + com.njcn + energy-api + ${project.version} + + diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/energy/EnergyStatisticRunner.java b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/energy/EnergyStatisticRunner.java new file mode 100644 index 000000000..97929e628 --- /dev/null +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/timer/tasks/energy/EnergyStatisticRunner.java @@ -0,0 +1,21 @@ +package com.njcn.system.timer.tasks.energy; + + +import com.njcn.energy.pojo.api.EnergyStatisticFeignClient; +import com.njcn.system.timer.TimerTaskRunner; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class EnergyStatisticRunner implements TimerTaskRunner { + + private final EnergyStatisticFeignClient energyStatisticFeignClient; + + @Override + public void action(String date) { + + energyStatisticFeignClient.electricCalJob(); + + } +}