From ea2962840ca9d1f88554b1d4709225b956dfd666 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Mon, 25 May 2026 14:38:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(topology):=20=E6=B7=BB=E5=8A=A0=E6=8B=93?= =?UTF-8?q?=E6=89=91=E5=9B=BE=E7=9B=AE=E6=A0=87=E6=8C=87=E6=A0=87=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=B9=B6=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在AppLineTopologyDiagram相关类中添加target字段用于绑定指标 - 重构DeviceMessageClient的getLineInfo方法参数结构 - 实现CsLineTopologyFeignClient的queryTopologyDiagram接口及降级处理 - 更新MqttMessageHandler中拓扑图数据处理逻辑,支持自定义指标查询 - 添加PqdData数据拆分服务IPqdDataSplitService及其实现 - 优化PortableOfflLogServiceImpl中的数据入库逻辑 - 修复CsTerminalReplyServiceImpl中时间范围查询条件 - 移除未使用的导入和代码,优化服务依赖注入 --- .../api/CsLineTopologyFeignClient.java | 7 + .../api/DeviceMessageFeignClient.java | 10 +- .../CsLineTopologyClientFallbackFactory.java | 7 + .../DeviceMessageClientFallbackFactory.java | 4 +- .../njcn/csdevice/param/LineInfoParam.java | 21 + .../param/AppLineTopologyDiagramParm.java | 1 + .../pojo/po/AppLineTopologyDiagramPO.java | 2 + .../pojo/vo/AppLineTopologyDiagramVO.java | 7 +- .../message/DeviceMessageController.java | 11 +- .../service/IPqdDataSplitService.java | 25 + .../AppLineTopologyDiagramServiceImpl.java | 11 +- .../impl/CsTerminalReplyServiceImpl.java | 24 +- .../impl/PortableOfflLogServiceImpl.java | 98 ++- .../service/impl/PqdDataSplitServiceImpl.java | 425 +++++++++++ .../handler/MqttMessageHandler.java | 175 ++++- .../csreport/controller/TestController.java | 8 +- .../njcn/csreport/job/ReportCovertJob.java | 696 +++++++++--------- 17 files changed, 1079 insertions(+), 453 deletions(-) create mode 100644 cs-device/cs-device-api/src/main/java/com/njcn/csdevice/param/LineInfoParam.java create mode 100644 cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/IPqdDataSplitService.java create mode 100644 cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PqdDataSplitServiceImpl.java diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/CsLineTopologyFeignClient.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/CsLineTopologyFeignClient.java index 1c4315b..cd3e56a 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/CsLineTopologyFeignClient.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/CsLineTopologyFeignClient.java @@ -4,9 +4,12 @@ import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.response.HttpResult; import com.njcn.csdevice.api.fallback.CsLineTopologyClientFallbackFactory; import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO; +import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO; +import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import java.util.List; @@ -19,4 +22,8 @@ public interface CsLineTopologyFeignClient { @PostMapping("/addList") HttpResult addList(@RequestBody List list); + @PostMapping("/queryTopologyDiagram") + @ApiOperation("查询装置拓扑图") + HttpResult queryTopologyDiagram(@RequestParam(value="devId") String devId); + } diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/DeviceMessageFeignClient.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/DeviceMessageFeignClient.java index 08bb32a..0473b07 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/DeviceMessageFeignClient.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/DeviceMessageFeignClient.java @@ -1,17 +1,11 @@ package com.njcn.csdevice.api; -import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.ServerInfo; -import com.njcn.common.pojo.enums.common.LogEnum; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; -import com.njcn.common.utils.HttpResultUtil; import com.njcn.csdevice.api.fallback.DeviceMessageClientFallbackFactory; import com.njcn.csdevice.param.DeviceMessageParam; -import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.param.LineInfoParam; import com.njcn.user.pojo.po.User; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; @@ -36,6 +30,6 @@ public interface DeviceMessageFeignClient { @PostMapping("/getLineInfo") @ApiOperation("获取监测点信息") - HttpResult getLineInfo(@RequestParam("id") String id, @RequestParam(value = "list", required = false) List list); + HttpResult getLineInfo(@RequestBody LineInfoParam param); } diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/CsLineTopologyClientFallbackFactory.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/CsLineTopologyClientFallbackFactory.java index 07b5423..9d6bf2d 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/CsLineTopologyClientFallbackFactory.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/CsLineTopologyClientFallbackFactory.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.csdevice.api.CsLineTopologyFeignClient; import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO; +import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -33,6 +34,12 @@ public class CsLineTopologyClientFallbackFactory implements FallbackFactory queryTopologyDiagram(String devId) { + log.error("{}异常,降级处理,异常为:{}","查询装置拓扑图异常",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; } } diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/DeviceMessageClientFallbackFactory.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/DeviceMessageClientFallbackFactory.java index 59c9c6d..940c21d 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/DeviceMessageClientFallbackFactory.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/api/fallback/DeviceMessageClientFallbackFactory.java @@ -5,7 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.csdevice.api.DeviceMessageFeignClient; import com.njcn.csdevice.param.DeviceMessageParam; -import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.param.LineInfoParam; import com.njcn.user.pojo.po.User; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; @@ -42,7 +42,7 @@ public class DeviceMessageClientFallbackFactory implements FallbackFactory getLineInfo(String id, List list) { + public HttpResult getLineInfo(LineInfoParam param) { log.error("{}异常,降级处理,异常为:{}","获取监测点信息数据异常",cause.toString()); throw new BusinessException(finalExceptionEnum); } diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/param/LineInfoParam.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/param/LineInfoParam.java new file mode 100644 index 0000000..76784e6 --- /dev/null +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/param/LineInfoParam.java @@ -0,0 +1,21 @@ +package com.njcn.csdevice.param; + +import com.njcn.csdevice.pojo.po.CsLinePO; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +/** + * @author xy + */ +@Data +public class LineInfoParam { + + @ApiModelProperty("nDid") + private String nDid; + + @ApiModelProperty("监测点集合") + List list; + +} diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/param/AppLineTopologyDiagramParm.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/param/AppLineTopologyDiagramParm.java index 0966e9d..5743fc6 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/param/AppLineTopologyDiagramParm.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/param/AppLineTopologyDiagramParm.java @@ -44,5 +44,6 @@ public class AppLineTopologyDiagramParm extends BaseEntity { private Double lat; + private String target; } \ No newline at end of file diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/po/AppLineTopologyDiagramPO.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/po/AppLineTopologyDiagramPO.java index 4343fe3..841bbe8 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/po/AppLineTopologyDiagramPO.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/po/AppLineTopologyDiagramPO.java @@ -45,5 +45,7 @@ public class AppLineTopologyDiagramPO extends BaseEntity { @TableField(value = "lat") private Double lat; + @TableField(value = "target") + private String target; } \ No newline at end of file diff --git a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/vo/AppLineTopologyDiagramVO.java b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/vo/AppLineTopologyDiagramVO.java index 506c7d7..aecf191 100644 --- a/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/vo/AppLineTopologyDiagramVO.java +++ b/cs-device/cs-device-api/src/main/java/com/njcn/csdevice/pojo/vo/AppLineTopologyDiagramVO.java @@ -1,8 +1,6 @@ package com.njcn.csdevice.pojo.vo; -import com.baomidou.mybatisplus.annotation.TableField; -import com.baomidou.mybatisplus.annotation.TableName; -import com.njcn.db.bo.BaseEntity; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; /** @@ -43,5 +41,6 @@ public class AppLineTopologyDiagramVO { private String linePostion; - + @ApiModelProperty(value="绑定的指标") + private String target; } \ No newline at end of file diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/controller/message/DeviceMessageController.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/controller/message/DeviceMessageController.java index dfb5a8c..6ebbd3d 100644 --- a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/controller/message/DeviceMessageController.java +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/controller/message/DeviceMessageController.java @@ -7,7 +7,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; import com.njcn.csdevice.param.DeviceMessageParam; -import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.param.LineInfoParam; import com.njcn.csdevice.service.DeviceMessageService; import com.njcn.user.pojo.po.User; import com.njcn.web.controller.BaseController; @@ -64,13 +64,10 @@ public class DeviceMessageController extends BaseController { @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/getLineInfo") @ApiOperation("获取监测点信息") - @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "参数", paramType = "query"), - @ApiImplicitParam(name = "list", value = "监测点id集合", paramType = "query",required = false) - }) - public HttpResult getLineInfo(@RequestParam(value = "id") String id, @RequestParam(value = "list",required = false) List list){ + @ApiImplicitParam(name = "param", value = "参数", required = true) + public HttpResult getLineInfo(@RequestBody LineInfoParam param){ String methodDescribe = getMethodDescribe("getLineInfo"); - deviceMessageService.getLineInfo(id,list); + deviceMessageService.getLineInfo(param.getNDid(),param.getList()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe); } diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/IPqdDataSplitService.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/IPqdDataSplitService.java new file mode 100644 index 0000000..fee0208 --- /dev/null +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/IPqdDataSplitService.java @@ -0,0 +1,25 @@ +package com.njcn.csdevice.service; + +import com.njcn.influx.pojo.po.cs.PqdData; + +import java.util.List; +import java.util.Map; + +/** + * PqdData数据拆分服务 + * + * @author system + * @since 2026-05-22 + */ +public interface IPqdDataSplitService { + + /** + * 将PqdData数据拆分到不同的表中 + * + * @param pqdDataList 原始PqdData数据列表 + * @return 包含两部分数据: + * - "splitData": 已拆分到各表的数据 Map<表名, 数据列表> + * - "remainingPqdData": 未匹配的PqdData数据(仍需插入InfluxDB) + */ + Map splitPqdData(List pqdDataList); +} diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/AppLineTopologyDiagramServiceImpl.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/AppLineTopologyDiagramServiceImpl.java index 2b36fbc..26820a0 100644 --- a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/AppLineTopologyDiagramServiceImpl.java +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/AppLineTopologyDiagramServiceImpl.java @@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.mapper.AppLineTopologyDiagramMapper; -import com.njcn.csdevice.mapper.CsLedgerMapper; import com.njcn.csdevice.pojo.param.AppTopologyDiagramQueryParm; import com.njcn.csdevice.pojo.param.LinePostionParam; import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO; @@ -13,7 +12,6 @@ import com.njcn.csdevice.pojo.po.CsLedger; import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.csdevice.pojo.vo.AppLineTopologyDiagramVO; import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO; -import com.njcn.csdevice.pojo.vo.CsLineTopologyTemplateVO; import com.njcn.csdevice.service.AppLineTopologyDiagramService; import com.njcn.csdevice.service.AppTopologyDiagramService; import com.njcn.csdevice.service.CsLinePOService; @@ -73,7 +71,7 @@ public class AppLineTopologyDiagramServiceImpl extends ServiceImpl queryByLineIds(List lineIds) { List result = new ArrayList<>(); - if (lineIds != null && lineIds.size() > 0) { + if (lineIds != null && !lineIds.isEmpty()) { result = this.getBaseMapper().queryByLineIds( lineIds); } return result; @@ -83,14 +81,13 @@ public class AppLineTopologyDiagramServiceImpl extends ServiceImpl{ csLinePOService.lambdaUpdate().eq(CsLinePO::getLineId,temp.getLineId()).set(CsLinePO::getName,temp.getName()).set(CsLinePO::getPosition,temp.getLinePostion()).update(); this.lambdaUpdate().eq(AppLineTopologyDiagramPO::getId,temp.getId()). eq(AppLineTopologyDiagramPO::getLineId,temp.getLineId()).set(AppLineTopologyDiagramPO::getLat,temp.getLat()). - set(AppLineTopologyDiagramPO::getLng,temp.getLng()).set(AppLineTopologyDiagramPO::getId,linePostionParam.getId()).update(); - + set(AppLineTopologyDiagramPO::getLng,temp.getLng()).set(AppLineTopologyDiagramPO::getId,linePostionParam.getId()) + .set(AppLineTopologyDiagramPO::getTarget,temp.getTarget()) + .update(); iCsLedgerService.lambdaUpdate().eq(CsLedger::getId,temp.getLineId()).set(CsLedger::getName,temp.getName()).update(); }); } diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/CsTerminalReplyServiceImpl.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/CsTerminalReplyServiceImpl.java index cb25ac2..44f08f7 100644 --- a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/CsTerminalReplyServiceImpl.java +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/CsTerminalReplyServiceImpl.java @@ -1,6 +1,7 @@ package com.njcn.csdevice.service.impl; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -177,27 +178,10 @@ public class CsTerminalReplyServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper<>(); if (StrUtil.isNotBlank(param.getSearchValue())) { queryWrapper.like("cs_terminal_reply.line_id", param.getSearchValue()); -// //获取监测点id -// List list = csLinePOService.getLineByName(param.getSearchValue()); -// if (CollectionUtil.isEmpty(list)) { -// return page; -// } else { -// queryWrapper.and(pr -> { -// // 获取所有 lineId -// List lineIds = list.stream() -// .map(CsLinePO::getLineId) -// .collect(Collectors.toList()); -// // 使用 OR 条件连接多个 lineId -// for (int i = 0; i < lineIds.size(); i++) { -// if (i == 0) { -// pr.like("cs_terminal_reply.line_id", lineIds.get(i)); -// } else { -// pr.or().like("cs_terminal_reply.line_id", lineIds.get(i)); -// } -// } -// }); -// } } + queryWrapper.between("cs_terminal_reply.Create_Time" + , DateUtil.beginOfDay(DateUtil.parse(param.getSearchBeginTime())) + , DateUtil.endOfDay(DateUtil.parse(param.getSearchEndTime()))); //排序 queryWrapper.orderBy(true, false, "cs_terminal_reply.create_time"); queryWrapper.in("cs_terminal_reply.code", Arrays.asList("allFile", "allEvent", "oneFile")); diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PortableOfflLogServiceImpl.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PortableOfflLogServiceImpl.java index b74e3a5..f8eee5e 100644 --- a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PortableOfflLogServiceImpl.java +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PortableOfflLogServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.common.pojo.enums.common.DataStateEnum; import com.njcn.common.pojo.exception.BusinessException; @@ -12,17 +13,13 @@ import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.constant.DataParam; import com.njcn.csdevice.enums.AlgorithmResponseEnum; import com.njcn.csdevice.mapper.PortableOfflLogMapper; +import com.njcn.csdevice.param.UploadDataParam; import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO; import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.csdevice.pojo.po.PortableOffMainLog; import com.njcn.csdevice.pojo.po.PortableOfflLog; import com.njcn.csdevice.pojo.po.WlRecord; -import com.njcn.csdevice.service.CsLinePOService; -import com.njcn.csdevice.service.IPortableOfflLogService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.njcn.csdevice.param.UploadDataParam; -import com.njcn.csdevice.service.IWlRecordService; -import com.njcn.csdevice.service.PortableOffMainLogService; +import com.njcn.csdevice.service.*; import com.njcn.csdevice.util.InfluxDbParamUtil; import com.njcn.csharmonic.api.EventFeignClient; import com.njcn.csharmonic.api.OfflineDataUploadFeignClient; @@ -36,8 +33,8 @@ import com.njcn.csharmonic.offline.mincfg.AnalyseComtradeCfg; import com.njcn.csharmonic.offline.mincfg.vo.CmnModeCfg; import com.njcn.csharmonic.offline.vo.Response; import com.njcn.csharmonic.pojo.po.CsEventPO; -import com.njcn.influx.imapper.EvtDataMapper; -import com.njcn.influx.imapper.PqdDataMapper; +import com.njcn.influx.imapper.*; +import com.njcn.influx.pojo.po.*; import com.njcn.influx.pojo.po.cs.EntData; import com.njcn.influx.pojo.po.cs.PqdData; import com.njcn.oss.utils.FileStorageUtil; @@ -50,11 +47,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.ListUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; -import org.apache.poi.ss.formula.functions.T; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.web.multipart.MultipartFile; + import java.io.*; import java.math.BigDecimal; import java.text.DecimalFormat; @@ -81,25 +78,31 @@ public class PortableOfflLogServiceImpl extends ServiceImpl> partition = ListUtils.partition(pqdData, 1500); for (List sliceList : partition) { List sublistAsOriginalListType = new ArrayList<>(sliceList); +// Map map = pqdDataSplitService.splitPqdData(sublistAsOriginalListType); +// insertData(map); pqdDataMapper.insertBatch(sublistAsOriginalListType); } //min结果集解析入库后就不需要在解析了 @@ -604,4 +609,61 @@ public class PortableOfflLogServiceImpl extends ServiceImpl map) { + //13张表 + Map> splitData = (Map>) map.get("splitData"); + splitData.forEach((key, value) -> { + switch (key) { + case "data_v": + dataVMapper.insertBatch((List) value); + break; + case "data_i": + dataIMapper.insertBatch((List) value); + break; + case "data_flicker": + dataFlicker.insertBatch((List) value); + break; + case "data_fluc": + dataFluc.insertBatch((List) value); + break; + case "data_harmphasic_i": + dataHarmPhasicI.insertBatch((List) value); + break; + case "data_harmphasic_v": + dataHarmPhasicV.insertBatch((List) value); + break; + case "data_harmpower_p": + dataHarmPowerP.insertBatch((List) value); + break; + case "data_harmpower_q": + dataHarmPowerQ.insertBatch((List) value); + break; + case "data_harmpower_s": + dataHarmPowerS.insertBatch((List) value); + break; + case "data_harmrate_v": + dataHarmRateV.insertBatch((List) value); + break; + case "data_harmrate_i": + dataHarmRateI.insertBatch((List) value); + break; + case "data_inharm_v": + dataInHarmV.insertBatch((List) value); + break; + case "data_plt": + dataPlt.insertBatch((List) value); + break; + default: + log.warn("不支持的目标表: {}", key); + break; + } + }); + + //其余数据 + List remainingPqdData = (List) map.get("remainingPqdData"); + if (!CollectionUtils.isEmpty(remainingPqdData)) { + pqdDataMapper.insertBatch(remainingPqdData); + } + } } diff --git a/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PqdDataSplitServiceImpl.java b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PqdDataSplitServiceImpl.java new file mode 100644 index 0000000..53cc6ff --- /dev/null +++ b/cs-device/cs-device-boot/src/main/java/com/njcn/csdevice/service/impl/PqdDataSplitServiceImpl.java @@ -0,0 +1,425 @@ +package com.njcn.csdevice.service.impl; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.csdevice.service.IPqdDataSplitService; +import com.njcn.influx.pojo.po.*; +import com.njcn.influx.pojo.po.cs.PqdData; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.po.DictData; +import com.njcn.system.pojo.po.EleEpdPqd; +import com.njcn.system.pojo.vo.EleEpdPqdListVO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.lang.reflect.Field; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * PqdData数据拆分服务实现 + * + * @author system + * @since 2026-05-22 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class PqdDataSplitServiceImpl implements IPqdDataSplitService { + + private final RedisUtil redisUtil; + private final EpdFeignClient epdFeignClient; + private final DicDataFeignClient dicDataFeignClient; + + @Override + public Map splitPqdData(List pqdDataList) { + Map result = new HashMap<>(); + + if (CollectionUtils.isEmpty(pqdDataList)) { + result.put("splitData", Collections.emptyMap()); + result.put("remainingPqdData", Collections.emptyList()); + return result; + } + + log.info("开始拆分PqdData数据,共{}条记录", pqdDataList.size()); + + Map> splitDataMap = new HashMap<>(); + List remainingPqdData = new ArrayList<>(); + + //获取表映射关系 + Map map1 = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + //获取字段映射关系 + DictData pqd = dicDataFeignClient.getDicDataByCode(DicDataEnum.PQD.getCode()).getData(); + List list = epdFeignClient.selectAll().getData(); + List epdList = list.stream().filter(item -> Objects.equals(item.getDataType(), pqd.getId())) + .findFirst() + .map(EleEpdPqdListVO::getEleEpdPqdVOS) + .orElse(new ArrayList<>()); + // 处理 harmStart 和 harmEnd + List resultList = epdList.stream() + .flatMap(item -> { + if (item.getHarmStart() != null && item.getHarmEnd() != null) { + // 生成从 harmStart 到 harmEnd 的新对象 + List expandedList = new ArrayList<>(); + for (int i = item.getHarmStart(); i <= item.getHarmEnd(); i++) { + EleEpdPqd newItem = copyEleEpdPqd(item); + newItem.setName(item.getName() + "_" + i); + expandedList.add(newItem); + } + return expandedList.stream(); + } else { + // 保持原对象 + return Stream.of(item); + } + }) + .collect(Collectors.toList()); + Map> map2 = resultList.stream().collect(Collectors.groupingBy(EleEpdPqd::getName,LinkedHashMap::new,Collectors.toList())); + + try { + for (PqdData pqdData : pqdDataList) { + boolean hasMatchedField = false; + + Map fieldValues = extractFieldValues(pqdData); + + for (Map.Entry entry : fieldValues.entrySet()) { + String sourceFieldName = StringUtils.capitalize(entry.getKey()); + Object value = entry.getValue(); + + if (value == null || isCommonField(sourceFieldName)) { + continue; + } + String tableName = map1.get(sourceFieldName); + String otherName = sourceFieldName; + List v = map2.get(sourceFieldName); + if (!CollectionUtils.isEmpty(v)) { + otherName = toCamelCase(v.get(0).getOtherName()); + } + + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(otherName)) { + continue; + } + + hasMatchedField = true; + + if (!splitDataMap.containsKey(tableName)) { + splitDataMap.put(tableName, new ArrayList<>()); + } + + createAndAddTargetData((List) splitDataMap.get(tableName), tableName, pqdData, otherName, value); + } + + if (!hasMatchedField) { + remainingPqdData.add(pqdData); + } + } + + result.put("splitData", splitDataMap); + result.put("remainingPqdData", remainingPqdData); + + log.info("PqdData数据拆分完成,拆分为{}个表,剩余{}条未匹配数据需插入InfluxDB", + splitDataMap.size(), remainingPqdData.size()); + + } catch (Exception e) { + log.error("PqdData数据拆分失败", e); + throw new RuntimeException("PqdData数据拆分失败", e); + } + + return result; + } + + private Map extractFieldValues(PqdData pqdData) { + Map fieldValues = new HashMap<>(); + Field[] fields = PqdData.class.getDeclaredFields(); + + for (Field field : fields) { + field.setAccessible(true); + try { + Object value = field.get(pqdData); + if (value != null) { + fieldValues.put(field.getName(), value); + } + } catch (IllegalAccessException e) { + log.warn("获取字段 {} 的值失败", field.getName(), e); + } + } + + return fieldValues; + } + + private boolean isCommonField(String fieldName) { + Set commonFields = new HashSet<>(Arrays.asList( + "time", "lineId", "phasicType", "valueType", "clDid", + "process", "qualityFlag" + )); + return commonFields.contains(fieldName); + } + + private void createAndAddTargetData(List dataList, String tableName, + PqdData pqdData, String targetField, Object value) { + switch (tableName.toLowerCase()) { + case "data_v": + dataList.add(createRStatDataVD(pqdData, targetField, value)); + break; + case "data_i": + dataList.add(createRStatDataID(pqdData, targetField, value)); + break; + case "data_flicker": + dataList.add(createRStatDataFlicker(pqdData, targetField, value)); + break; + case "data_fluc": + dataList.add(createRStatDataFluc(pqdData, targetField, value)); + break; + case "data_harmphasic_i": + dataList.add(createRStatDataHarmphasicI(pqdData, targetField, value)); + break; + case "data_harmphasic_v": + dataList.add(createRStatDataHarmphasicV(pqdData, targetField, value)); + break; + case "data_harmpower_p": + dataList.add(createRStatDataHarmpowerP(pqdData, targetField, value)); + break; + case "data_harmpower_q": + dataList.add(createRStatDataHarmpowerQ(pqdData, targetField, value)); + break; + case "data_harmpower_s": + dataList.add(createRStatDataHarmpowerS(pqdData, targetField, value)); + break; + case "data_harmrate_v": + dataList.add(createRStatDataHarmrateV(pqdData, targetField, value)); + break; + case "data_harmrate_i": + dataList.add(createRStatDataHarmrateI(pqdData, targetField, value)); + break; + case "data_inharm_v": + dataList.add(createRStatDataInharmV(pqdData, targetField, value)); + break; + case "data_plt": + dataList.add(createRStatDataPlt(pqdData, targetField, value)); + break; + default: + log.warn("不支持的目标表: {}", tableName); + break; + } + } + + private Object createRStatDataVD(PqdData pqdData, String targetField, Object value) { + DataV data = new DataV(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataID(PqdData pqdData, String targetField, Object value) { + DataI data = new DataI(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setValueType(pqdData.getPhaseType()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataFlicker(PqdData pqdData, String targetField, Object value) { + DataFlicker data = new DataFlicker(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataFluc(PqdData pqdData, String targetField, Object value) { + DataFluc data = new DataFluc(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmphasicI(PqdData pqdData, String targetField, Object value) { + DataHarmPhasicI data = new DataHarmPhasicI(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmphasicV(PqdData pqdData, String targetField, Object value) { + DataHarmPhasicV data = new DataHarmPhasicV(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmpowerP(PqdData pqdData, String targetField, Object value) { + DataHarmPowerP data = new DataHarmPowerP(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmpowerQ(PqdData pqdData, String targetField, Object value) { + DataHarmPowerQ data = new DataHarmPowerQ(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmpowerS(PqdData pqdData, String targetField, Object value) { + DataHarmPowerS data = new DataHarmPowerS(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmrateV(PqdData pqdData, String targetField, Object value) { + DataHarmRateV data = new DataHarmRateV(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataHarmrateI(PqdData pqdData, String targetField, Object value) { + DataHarmRateI data = new DataHarmRateI(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataInharmV(PqdData pqdData, String targetField, Object value) { + DataInHarmV data = new DataInHarmV(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private Object createRStatDataPlt(PqdData pqdData, String targetField, Object value) { + DataPlt data = new DataPlt(); + data.setTime(pqdData.getTime()); + data.setLineId(pqdData.getLineId()); + data.setPhaseType(pqdData.getPhaseType()); + data.setValueType(pqdData.getValueType()); + data.setClDid(pqdData.getClDid()); + data.setProcess(pqdData.getProcess()); + data.setQualityFlag("0"); + setFieldValue(data, targetField, value); + return data; + } + + private void setFieldValue(Object obj, String fieldName, Object value) { + try { + String camelCaseFieldName = toCamelCase(fieldName); + Field field = obj.getClass().getDeclaredField(camelCaseFieldName); + field.setAccessible(true); + field.set(obj, value); + } catch (NoSuchFieldException | IllegalAccessException e) { + log.warn("设置字段 {} 的值失败", fieldName, e); + } + } + + public static String toCamelCase(String input) { + if (input == null || input.isEmpty()) { + return input; + } + StringBuilder result = new StringBuilder(); + boolean toUpper = false; + + for (char c : input.toCharArray()) { + if (c == '_') { + toUpper = true; + } else { + if (toUpper) { + result.append(Character.toUpperCase(c)); + toUpper = false; + } else { + result.append(c); + } + } + } + // 确保首字母小写,以匹配Java实体字段命名规范 + if (result.length() > 0 && Character.isUpperCase(result.charAt(0))) { + result.setCharAt(0, Character.toLowerCase(result.charAt(0))); + } + return result.toString(); + } + + private EleEpdPqd copyEleEpdPqd(EleEpdPqd source) { + if (source == null) { + return null; + } + EleEpdPqd target = new EleEpdPqd(); + BeanUtils.copyProperties(source, target); + return target; + } +} diff --git a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java index b141453..a0e7a31 100644 --- a/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java +++ b/cs-harmonic/cs-harmonic-boot/src/main/java/com/njcn/csharmonic/handler/MqttMessageHandler.java @@ -7,7 +7,12 @@ import com.github.tocrhz.mqtt.annotation.MqttSubscribe; import com.github.tocrhz.mqtt.annotation.NamedValue; import com.github.tocrhz.mqtt.annotation.Payload; import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.CsLineTopologyFeignClient; import com.njcn.csdevice.api.DevCapacityFeignClient; +import com.njcn.csdevice.api.fallback.CsLineTopologyClientFallbackFactory; +import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.pojo.vo.AppTopologyDiagramVO; import com.njcn.csharmonic.param.CommonStatisticalQueryParam; import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.pojo.dto.RealTimeDataDTO; @@ -21,6 +26,10 @@ import com.njcn.csharmonic.service.TemperatureService; import com.njcn.influx.pojo.dto.StatisticalDataDTO; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.CsStatisticalSetFeignClient; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.enums.DicDataTypeEnum; +import com.njcn.system.pojo.po.DictData; import com.njcn.system.pojo.po.EleEpdPqd; import com.njcn.zlevent.api.FileFeignClient; import lombok.AllArgsConstructor; @@ -33,6 +42,7 @@ import org.springframework.util.CollectionUtils; import java.text.DecimalFormat; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -56,6 +66,9 @@ public class MqttMessageHandler { private final DecimalFormat df = new DecimalFormat("#0.000"); private final FileFeignClient fileFeignClient; private final CsEventPOService csEventPOService; + private final CsLineTopologyFeignClient csLineTopologyClient; + private final CsLineFeignClient csLineFeignClient; + private final DicDataFeignClient dicDataFeignClient; /** * 实时数据应答 @@ -248,7 +261,7 @@ public class MqttMessageHandler { }); - //过滤M相 + //过滤T相 List m = tempList.stream().filter(temp -> Objects.equals(temp.getPhase(), "T")).collect(Collectors.toList()); m.stream().forEach(temp -> { Stream.of("A", "B", "C").forEach(phase -> { @@ -260,41 +273,134 @@ public class MqttMessageHandler { }); //如果是基础数据则添加拓扑图的数据 if (Objects.equals("fc8c86dbc3f2d9810f5cd8f53c295415", typeId)) { - List apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList()); - Map> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); - collect3.forEach((k, v) -> { - if (!CollectionUtil.isEmpty(v)) { - double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); - ThdDataVO thdDataVO = new ThdDataVO(); - BeanUtils.copyProperties(v.get(0), thdDataVO); - thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); - thdDataVO.setPhase("avg"); - result.add(thdDataVO); - } - }); List apfRmsI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_RmsI_TolOut(A)")).collect(Collectors.toList()); - Map> collect2 = apfRmsI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); - collect2.forEach((k, v) -> { - if (!CollectionUtil.isEmpty(v)) { - double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); - ThdDataVO thdDataVO = new ThdDataVO(); - BeanUtils.copyProperties(v.get(0), thdDataVO); - thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); - thdDataVO.setPhase("avg"); - result.add(thdDataVO); - } - }); - List apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList()); - Map> collect4 = apfThdISys.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); - collect4.forEach((k, v) -> { - if (!CollectionUtil.isEmpty(v)) { - double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); - ThdDataVO thdDataVO = new ThdDataVO(); - BeanUtils.copyProperties(v.get(0), thdDataVO); - thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); - thdDataVO.setPhase("avg"); - result.add(thdDataVO); + List dictData = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.LINE_POSITION.getCode()).getData(); + Map codeToDictData = dictData.stream().collect(Collectors.toMap(DictData::getId, Function.identity())); + //添加拓扑图上自定义的指标 + //根据设备id查询拓扑图 + AppTopologyDiagramVO vo = csLineTopologyClient.queryTopologyDiagram(devId).getData(); + vo.getAppsLineTopologyDiagramPO().forEach(item -> { + String linePositionCode = codeToDictData.get(item.getLinePostion()).getCode(); + if (Objects.equals(linePositionCode, DicDataEnum.OUTPUT_SIDE.getCode())) { + //输出侧 + if (item.getTarget() != null && !item.getTarget().isEmpty()) { + List l1 = new ArrayList<>(); + //根据指标查询 + List data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData(); + data1.forEach(temp -> { + CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam(); + commonStatisticalQueryParam.setDevId(devId); + commonStatisticalQueryParam.setStatisticalId(temp.getId()); + commonStatisticalQueryParam.setValueType("avg"); + List listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); + l1.addAll(listFuture); + }); + Map> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect3.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + } + }); + } else { + Map> collect2 = apfRmsI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect2.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + } + }); + } + } else if (Objects.equals(linePositionCode, DicDataEnum.GRID_SIDE.getCode())) { + //电网侧 + if (item.getTarget() != null && !item.getTarget().isEmpty()) { + //根据指标查询 + List l1 = new ArrayList<>(); + //根据指标查询 + List data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData(); + data1.forEach(temp -> { + CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam(); + commonStatisticalQueryParam.setDevId(devId); + commonStatisticalQueryParam.setStatisticalId(temp.getId()); + commonStatisticalQueryParam.setValueType("avg"); + List listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); + l1.addAll(listFuture); + }); + Map> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect3.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + } + }); + } else { + List apfThdISys = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Sys(%)")).collect(Collectors.toList()); + Map> collect4 = apfThdISys.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect4.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + + } + }); + } + } else if (Objects.equals(linePositionCode, DicDataEnum.LOAD_SIDE.getCode())) { + //负载侧 + if (item.getTarget() != null && !item.getTarget().isEmpty()) { + //根据指标查询 + List l1 = new ArrayList<>(); + //根据指标查询 + List data1 = csStatisticalSetFeignClient.queryStatisticalSelect(item.getTarget()).getData(); + data1.forEach(temp -> { + CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam(); + commonStatisticalQueryParam.setDevId(devId); + commonStatisticalQueryParam.setStatisticalId(temp.getId()); + commonStatisticalQueryParam.setValueType("avg"); + List listFuture = stableDataService.queryFisrtCommonStatistical(commonStatisticalQueryParam); + l1.addAll(listFuture); + }); + Map> collect3 = l1.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect3.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + } + }); + } else { + List apfThdI = tempList.stream().filter(temp -> Objects.equals(temp.getStatisticalName(), "Apf_ThdA_Load(%)")).collect(Collectors.toList()); + Map> collect3 = apfThdI.stream().collect(Collectors.groupingBy(ThdDataVO::getLineId)); + collect3.forEach((k, v) -> { + if (!CollectionUtil.isEmpty(v)) { + double asDouble = v.stream().mapToDouble(ThdDataVO::getStatisticalData).average().getAsDouble(); + ThdDataVO thdDataVO = new ThdDataVO(); + BeanUtils.copyProperties(v.get(0), thdDataVO); + thdDataVO.setStatisticalData(Double.valueOf(df.format(asDouble))); + thdDataVO.setPhase("avg"); + result.add(thdDataVO); + } + }); + } } }); Double capacity = devCapacityFeignClient.getDevCapacity(devId).getData(); @@ -323,7 +429,6 @@ public class MqttMessageHandler { publisher.send("/zl/devData/" + devId + "/" + typeId, topoDataJson, 1, false); } - public String getCldidName(String cldid) { switch (cldid) { diff --git a/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/controller/TestController.java b/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/controller/TestController.java index 23b4c5c..c03586a 100644 --- a/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/controller/TestController.java +++ b/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/controller/TestController.java @@ -125,7 +125,7 @@ public class TestController { collect.forEach((tempPhase,byNameMap)->{ //最小值 InfluxQueryWrapper influxQueryWrapperMin = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperMin.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "min") + influxQueryWrapperMin.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "MIN") .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); // .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); @@ -156,7 +156,7 @@ public class TestController { //最大值 InfluxQueryWrapper influxQueryWrapperMax = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperMax.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "max") + influxQueryWrapperMax.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "MAX") .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); // .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); @@ -186,7 +186,7 @@ public class TestController { //平均值 InfluxQueryWrapper influxQueryWrapperAvg = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperAvg.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") + influxQueryWrapperAvg.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "AVG") .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); // .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); @@ -216,7 +216,7 @@ public class TestController { //CP95 InfluxQueryWrapper influxQueryWrapperCp95 = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperCp95.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") + influxQueryWrapperCp95.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "AVG") .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); // .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); diff --git a/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/job/ReportCovertJob.java b/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/job/ReportCovertJob.java index 89bc1b2..c051b7b 100644 --- a/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/job/ReportCovertJob.java +++ b/cs-report/cs-report-boot/src/main/java/com/njcn/csreport/job/ReportCovertJob.java @@ -1,348 +1,348 @@ -package com.njcn.csreport.job; - -import cn.hutool.extra.spring.SpringUtil; -import com.github.jeffreyning.mybatisplus.service.IMppService; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; -import com.njcn.common.utils.HarmonicTimesUtil; -import com.njcn.common.utils.HttpResultUtil; -import com.njcn.csdevice.api.CsLedgerFeignClient; -import com.njcn.csdevice.pojo.dto.LineParamDTO; -import com.njcn.csdevice.pojo.po.CsLedger; - -import com.njcn.csreport.enums.ReportTableEnum; -import com.njcn.csreport.service.RStatDataVDService; -import com.njcn.influx.constant.InfluxDbSqlConstant; -import com.njcn.influx.imapper.CommonMapper; -import com.njcn.influx.imapper.PqdDataMapper; -import com.njcn.influx.imapper.day.GovernReportMapper; -import com.njcn.influx.pojo.constant.InfluxDBTableConstant; -import com.njcn.influx.pojo.po.DataV; -import com.njcn.influx.pojo.po.cs.PqdData; -import com.njcn.influx.query.InfluxQueryWrapper; -import com.njcn.system.api.CsStatisticalSetFeignClient; -import com.njcn.system.api.DicDataFeignClient; - -import com.njcn.system.api.EpdFeignClient; -import com.njcn.system.enums.DicDataEnum; -import com.njcn.system.pojo.po.DictData; -import com.njcn.system.pojo.po.EleEpdPqd; -import com.njcn.system.pojo.vo.EleEpdPqdListVO; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.lang.reflect.Field; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Description: - * Date: 2024/1/18 10:15【需求编号】 - * - * @author clam - * @version V1.0.0 - */ -@Component -@EnableScheduling -@RequiredArgsConstructor -@Slf4j -public class ReportCovertJob { - - private final CsLedgerFeignClient csLedgerFeignClient; - private final DicDataFeignClient dicDataFeignClient; - private final EpdFeignClient epdFeignClient; - private final GovernReportMapper reportMapper; - private final RStatDataVDService rStatDataVDService; - - private final static String SERVICE_PACKAGE_PREFIX = "com.njcn.csreport.service."; - private final static String ENTITY_PACKAGE_PREFIX = "com.njcn.csreport.pojo.po."; - private final static String SERVICE_SUFFIX = "Service"; - - - /** - * 每天同步influxdb数据到mysql - * - * @date 2024/3/5 - */ - @Scheduled(cron = "0 0 1 * * ? ") - public void executeEvent() throws ClassNotFoundException { - DateTimeFormatter formatter1 = DateTimeFormatter.ISO_DATE; - // 获取当前日期 - LocalDate today = LocalDate.now(); - - - // 计算前一天的日期 - LocalDate yesterday = today.minusDays(1); - // 获取前一天的开始时间(00:00:00) - LocalDateTime startOfYesterday = yesterday.atStartOfDay(); - // 获取前一天的结束时间(23:59:59) - LocalDateTime endOfYesterday = yesterday.plusDays(1).atStartOfDay().minusSeconds(1); - - // 定义时间格式 - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - // 格式化时间为字符串 - String startTime = startOfYesterday.format(formatter); - String endTime = endOfYesterday.format(formatter); -// String startTime ="2024-07-31 00:00:00"; -// String endTime ="2024-09-20 23:00:00"; - List data = csLedgerFeignClient.queryLine(new LineParamDTO()).getData(); - List lineIndex = data.stream().map(CsLedger::getId).collect(Collectors.toList()); -// List lineIndex = Stream.of("00B78D016AB32","00B78D016AB51").collect(Collectors.toList()); - - //查询电能质量数据指标 - DictData pqd = dicDataFeignClient.getDicDataByCode(DicDataEnum.PQD.getCode()).getData(); - List tableNameList = dicDataFeignClient.getDicDataByTypeCode("Data_Day").getData(); - Map tableMap = tableNameList.stream().collect(Collectors.toMap(DictData::getId, DictData::getCode, (code, code2) -> code)); - - - List eleEpdPqdListVOList = epdFeignClient.selectAll().getData(); - EleEpdPqdListVO eleEpdPqdListVO = eleEpdPqdListVOList.stream().filter(temp -> Objects.equals(temp.getDataType(), pqd.getId())).collect(Collectors.toList()).get(0); - - - Map> eleEpdMap = eleEpdPqdListVO.getEleEpdPqdVOS().stream().filter(temp -> StringUtils.isNotEmpty(temp.getOtherName()) - && StringUtils.isNotEmpty(temp.getResourcesId())) - .collect(Collectors.groupingBy(EleEpdPqd::getResourcesId));//不能全表表分组查,根据映射的 - - for (Map.Entry> entry : eleEpdMap.entrySet()) { - String k = entry.getKey(); - List v = entry.getValue(); - String tableName = tableMap.get(k); - List result = new ArrayList<>(); - Class clazz = Class.forName(ENTITY_PACKAGE_PREFIX + ReportTableEnum.getEntityName(tableName)); - IMppService bean = (IMppService) SpringUtil.getBean(Class.forName(SERVICE_PACKAGE_PREFIX+ReportTableEnum.getEntityName(tableName)+SERVICE_SUFFIX)); - - //由于epd配置表A,B,C三项对应指标一样,这里做去重处理,由于功率表功率,和三项总功率都对应P只是项别不一一样因此,M相要和,A,B,C相分开计算 - Map>> collect = v.stream().collect(Collectors.groupingBy(EleEpdPqd::getPhase, Collectors.groupingBy(EleEpdPqd::getName))); - - collect.forEach((tempPhase,byNameMap)->{ - //最小值 - InfluxQueryWrapper influxQueryWrapperMin = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperMin.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "min") - .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); -// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); - - - byNameMap.forEach((name, eleEpdPqdList) -> { - EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); - if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { - //0-49转成1-50 - if(tempEleEpdPqd.getHarmStart()==0){ - tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); - tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); - } - for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { - influxQueryWrapperMin.min(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); - } - } else { - influxQueryWrapperMin.min(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); - } - - }); - - influxQueryWrapperMin.groupBy(InfluxDBTableConstant.LINE_ID) - .groupBy(InfluxDBTableConstant.PHASIC_TYPE) - .groupBy(InfluxDBTableConstant.VALUE_TYPE) - .between(InfluxDBTableConstant.TIME, startTime, endTime); - List list1 = reportMapper.selectByQueryWrapper(influxQueryWrapperMin); - - - //最大值 - InfluxQueryWrapper influxQueryWrapperMax = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperMax.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "max") - .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); -// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); - - - byNameMap.forEach((name, eleEpdPqdList) -> { - EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); - if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { - //0-49转成1-50 - if(tempEleEpdPqd.getHarmStart()==0){ - tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); - tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); - } - for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { - influxQueryWrapperMax.max(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); - } - } else { - influxQueryWrapperMax.max(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); - } - - }); - - influxQueryWrapperMax.groupBy(InfluxDBTableConstant.LINE_ID) - .groupBy(InfluxDBTableConstant.PHASIC_TYPE) - .groupBy(InfluxDBTableConstant.VALUE_TYPE) - .between(InfluxDBTableConstant.TIME, startTime, endTime); - List list2 = reportMapper.selectByQueryWrapper(influxQueryWrapperMax); - - //平均值 - InfluxQueryWrapper influxQueryWrapperAvg = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperAvg.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") - .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); -// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); - - byNameMap.forEach((name, eleEpdPqdList) -> { - EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); - if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { - //0-49转成1-50 - if(tempEleEpdPqd.getHarmStart()==0){ - tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); - tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); - } - for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { - influxQueryWrapperAvg.mean(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); - } - } else { - influxQueryWrapperAvg.mean(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); - } - - }); - - influxQueryWrapperAvg.groupBy(InfluxDBTableConstant.LINE_ID) - .groupBy(InfluxDBTableConstant.PHASIC_TYPE) - .groupBy(InfluxDBTableConstant.VALUE_TYPE) - .between(InfluxDBTableConstant.TIME, startTime, endTime); - List list3 = reportMapper.selectByQueryWrapper(influxQueryWrapperAvg); - - - //CP95 - InfluxQueryWrapper influxQueryWrapperCp95 = new InfluxQueryWrapper(PqdData.class, clazz); - influxQueryWrapperCp95.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") - .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); -// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); - - byNameMap.forEach((name, eleEpdPqdList) -> { - EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); - if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { - //0-49转成1-50 - if(tempEleEpdPqd.getHarmStart()==0){ - tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); - tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); - } - for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { - influxQueryWrapperCp95.percentile(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i, 95); - } - } else { - influxQueryWrapperCp95.percentile(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName(),95); - } - - }); - - influxQueryWrapperCp95.groupBy(InfluxDBTableConstant.LINE_ID) - .groupBy(InfluxDBTableConstant.PHASIC_TYPE) - .groupBy(InfluxDBTableConstant.VALUE_TYPE) - .between(InfluxDBTableConstant.TIME, startTime, endTime); - List list4 = reportMapper.selectByQueryWrapper(influxQueryWrapperCp95); - //为ValueType转成cp95 - list4.forEach(temp -> { - //获取 - Field vauleField = null; - try { - vauleField = temp.getClass().getDeclaredField("valueType"); - - vauleField.setAccessible(true); //暴力访问id - vauleField.set(temp, "CP95"); - - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - - - }); - - - result.addAll(list1); - result.addAll(list2); - result.addAll(list3); - result.addAll(list4); - - - } ); - //1.为time赋值2.映射 - result.forEach(temp -> { - //获取 - Field phasicTypeeField = null; - Field timeField = null; - Field vauleTypeField = null; - Field qualityFlagField = null; - - try { - - timeField = temp.getClass().getDeclaredField("time"); - - timeField.setAccessible(true); - timeField.set(temp, yesterday); - phasicTypeeField = temp.getClass().getDeclaredField("phasicType"); - phasicTypeeField.setAccessible(true); - String phase = phasicTypeeField.get(temp).toString(); - phasicTypeeField.set(temp, getPhase(phase)); - - - vauleTypeField = temp.getClass().getDeclaredField("valueType"); - vauleTypeField.setAccessible(true); - String valueType = vauleTypeField.get(temp).toString(); - vauleTypeField.set(temp, valueType.toUpperCase()); - - qualityFlagField = temp.getClass().getDeclaredField("qualityFlag"); - - qualityFlagField.setAccessible(true); - qualityFlagField.set(temp, 0); - - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - - - }); - log.info("插入数据"); - //由于AB,A对应的都是A不能批量插入会爆主键冲突 - result.forEach(temp->{ - bean.saveOrUpdateByMultiId(temp); - }); - - - } - - - } - - public String getPhase( String phase) { - - switch (phase) { - case "A": - return "A"; - case "B": - return "B"; - case "C": - return "C"; - case "M": - return "T"; - case "AB": - return "A"; - case "BC": - return "B"; - case "CA": - return "C"; - default: - break; - } - return ""; - } - - -} +//package com.njcn.csreport.job; +// +//import cn.hutool.extra.spring.SpringUtil; +//import com.github.jeffreyning.mybatisplus.service.IMppService; +//import com.njcn.common.pojo.enums.response.CommonResponseEnum; +//import com.njcn.common.utils.HarmonicTimesUtil; +//import com.njcn.common.utils.HttpResultUtil; +//import com.njcn.csdevice.api.CsLedgerFeignClient; +//import com.njcn.csdevice.pojo.dto.LineParamDTO; +//import com.njcn.csdevice.pojo.po.CsLedger; +// +//import com.njcn.csreport.enums.ReportTableEnum; +//import com.njcn.csreport.service.RStatDataVDService; +//import com.njcn.influx.constant.InfluxDbSqlConstant; +//import com.njcn.influx.imapper.CommonMapper; +//import com.njcn.influx.imapper.PqdDataMapper; +//import com.njcn.influx.imapper.day.GovernReportMapper; +//import com.njcn.influx.pojo.constant.InfluxDBTableConstant; +//import com.njcn.influx.pojo.po.DataV; +//import com.njcn.influx.pojo.po.cs.PqdData; +//import com.njcn.influx.query.InfluxQueryWrapper; +//import com.njcn.system.api.CsStatisticalSetFeignClient; +//import com.njcn.system.api.DicDataFeignClient; +// +//import com.njcn.system.api.EpdFeignClient; +//import com.njcn.system.enums.DicDataEnum; +//import com.njcn.system.pojo.po.DictData; +//import com.njcn.system.pojo.po.EleEpdPqd; +//import com.njcn.system.pojo.vo.EleEpdPqdListVO; +//import lombok.RequiredArgsConstructor; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.commons.lang3.StringUtils; +//import org.springframework.scheduling.annotation.EnableScheduling; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import java.lang.reflect.Field; +//import java.time.LocalDate; +//import java.time.LocalDateTime; +//import java.time.format.DateTimeFormatter; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.Map; +//import java.util.Objects; +//import java.util.stream.Collectors; +// +///** +// * Description: +// * Date: 2024/1/18 10:15【需求编号】 +// * +// * @author clam +// * @version V1.0.0 +// */ +//@Component +//@EnableScheduling +//@RequiredArgsConstructor +//@Slf4j +//public class ReportCovertJob { +// +// private final CsLedgerFeignClient csLedgerFeignClient; +// private final DicDataFeignClient dicDataFeignClient; +// private final EpdFeignClient epdFeignClient; +// private final GovernReportMapper reportMapper; +// private final RStatDataVDService rStatDataVDService; +// +// private final static String SERVICE_PACKAGE_PREFIX = "com.njcn.csreport.service."; +// private final static String ENTITY_PACKAGE_PREFIX = "com.njcn.csreport.pojo.po."; +// private final static String SERVICE_SUFFIX = "Service"; +// +// +// /** +// * 每天同步influxdb数据到mysql +// * +// * @date 2024/3/5 +// */ +// @Scheduled(cron = "0 0 1 * * ? ") +// public void executeEvent() throws ClassNotFoundException { +// DateTimeFormatter formatter1 = DateTimeFormatter.ISO_DATE; +// // 获取当前日期 +// LocalDate today = LocalDate.now(); +// +// +// // 计算前一天的日期 +// LocalDate yesterday = today.minusDays(1); +// // 获取前一天的开始时间(00:00:00) +// LocalDateTime startOfYesterday = yesterday.atStartOfDay(); +// // 获取前一天的结束时间(23:59:59) +// LocalDateTime endOfYesterday = yesterday.plusDays(1).atStartOfDay().minusSeconds(1); +// +// // 定义时间格式 +// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); +// +// // 格式化时间为字符串 +// String startTime = startOfYesterday.format(formatter); +// String endTime = endOfYesterday.format(formatter); +//// String startTime ="2024-07-31 00:00:00"; +//// String endTime ="2024-09-20 23:00:00"; +// List data = csLedgerFeignClient.queryLine(new LineParamDTO()).getData(); +// List lineIndex = data.stream().map(CsLedger::getId).collect(Collectors.toList()); +//// List lineIndex = Stream.of("00B78D016AB32","00B78D016AB51").collect(Collectors.toList()); +// +// //查询电能质量数据指标 +// DictData pqd = dicDataFeignClient.getDicDataByCode(DicDataEnum.PQD.getCode()).getData(); +// List tableNameList = dicDataFeignClient.getDicDataByTypeCode("Data_Day").getData(); +// Map tableMap = tableNameList.stream().collect(Collectors.toMap(DictData::getId, DictData::getCode, (code, code2) -> code)); +// +// +// List eleEpdPqdListVOList = epdFeignClient.selectAll().getData(); +// EleEpdPqdListVO eleEpdPqdListVO = eleEpdPqdListVOList.stream().filter(temp -> Objects.equals(temp.getDataType(), pqd.getId())).collect(Collectors.toList()).get(0); +// +// +// Map> eleEpdMap = eleEpdPqdListVO.getEleEpdPqdVOS().stream().filter(temp -> StringUtils.isNotEmpty(temp.getOtherName()) +// && StringUtils.isNotEmpty(temp.getResourcesId())) +// .collect(Collectors.groupingBy(EleEpdPqd::getResourcesId));//不能全表表分组查,根据映射的 +// +// for (Map.Entry> entry : eleEpdMap.entrySet()) { +// String k = entry.getKey(); +// List v = entry.getValue(); +// String tableName = tableMap.get(k); +// List result = new ArrayList<>(); +// Class clazz = Class.forName(ENTITY_PACKAGE_PREFIX + ReportTableEnum.getEntityName(tableName)); +// IMppService bean = (IMppService) SpringUtil.getBean(Class.forName(SERVICE_PACKAGE_PREFIX+ReportTableEnum.getEntityName(tableName)+SERVICE_SUFFIX)); +// +// //由于epd配置表A,B,C三项对应指标一样,这里做去重处理,由于功率表功率,和三项总功率都对应P只是项别不一一样因此,M相要和,A,B,C相分开计算 +// Map>> collect = v.stream().collect(Collectors.groupingBy(EleEpdPqd::getPhase, Collectors.groupingBy(EleEpdPqd::getName))); +// +// collect.forEach((tempPhase,byNameMap)->{ +// //最小值 +// InfluxQueryWrapper influxQueryWrapperMin = new InfluxQueryWrapper(PqdData.class, clazz); +// influxQueryWrapperMin.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "min") +// .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); +//// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); +// +// +// byNameMap.forEach((name, eleEpdPqdList) -> { +// EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); +// if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { +// //0-49转成1-50 +// if(tempEleEpdPqd.getHarmStart()==0){ +// tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); +// tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); +// } +// for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { +// influxQueryWrapperMin.min(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); +// } +// } else { +// influxQueryWrapperMin.min(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); +// } +// +// }); +// +// influxQueryWrapperMin.groupBy(InfluxDBTableConstant.LINE_ID) +// .groupBy(InfluxDBTableConstant.PHASIC_TYPE) +// .groupBy(InfluxDBTableConstant.VALUE_TYPE) +// .between(InfluxDBTableConstant.TIME, startTime, endTime); +// List list1 = reportMapper.selectByQueryWrapper(influxQueryWrapperMin); +// +// +// //最大值 +// InfluxQueryWrapper influxQueryWrapperMax = new InfluxQueryWrapper(PqdData.class, clazz); +// influxQueryWrapperMax.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "max") +// .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); +//// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); +// +// +// byNameMap.forEach((name, eleEpdPqdList) -> { +// EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); +// if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { +// //0-49转成1-50 +// if(tempEleEpdPqd.getHarmStart()==0){ +// tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); +// tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); +// } +// for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { +// influxQueryWrapperMax.max(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); +// } +// } else { +// influxQueryWrapperMax.max(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); +// } +// +// }); +// +// influxQueryWrapperMax.groupBy(InfluxDBTableConstant.LINE_ID) +// .groupBy(InfluxDBTableConstant.PHASIC_TYPE) +// .groupBy(InfluxDBTableConstant.VALUE_TYPE) +// .between(InfluxDBTableConstant.TIME, startTime, endTime); +// List list2 = reportMapper.selectByQueryWrapper(influxQueryWrapperMax); +// +// //平均值 +// InfluxQueryWrapper influxQueryWrapperAvg = new InfluxQueryWrapper(PqdData.class, clazz); +// influxQueryWrapperAvg.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") +// .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); +//// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); +// +// byNameMap.forEach((name, eleEpdPqdList) -> { +// EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); +// if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { +// //0-49转成1-50 +// if(tempEleEpdPqd.getHarmStart()==0){ +// tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); +// tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); +// } +// for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { +// influxQueryWrapperAvg.mean(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i); +// } +// } else { +// influxQueryWrapperAvg.mean(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName()); +// } +// +// }); +// +// influxQueryWrapperAvg.groupBy(InfluxDBTableConstant.LINE_ID) +// .groupBy(InfluxDBTableConstant.PHASIC_TYPE) +// .groupBy(InfluxDBTableConstant.VALUE_TYPE) +// .between(InfluxDBTableConstant.TIME, startTime, endTime); +// List list3 = reportMapper.selectByQueryWrapper(influxQueryWrapperAvg); +// +// +// //CP95 +// InfluxQueryWrapper influxQueryWrapperCp95 = new InfluxQueryWrapper(PqdData.class, clazz); +// influxQueryWrapperCp95.regular(PqdData::getLineId, lineIndex).eq(InfluxDBTableConstant.VALUE_TYPE, "avg") +// .eq(InfluxDBTableConstant.PHASIC_TYPE,tempPhase); +//// .eq(InfluxDBTableConstant.IS_ABNORMAL, 0); +// +// byNameMap.forEach((name, eleEpdPqdList) -> { +// EleEpdPqd tempEleEpdPqd = eleEpdPqdList.get(0); +// if (Objects.nonNull(tempEleEpdPqd.getHarmStart()) && Objects.nonNull(tempEleEpdPqd.getHarmEnd())) { +// //0-49转成1-50 +// if(tempEleEpdPqd.getHarmStart()==0){ +// tempEleEpdPqd.setHarmStart(tempEleEpdPqd.getHarmStart()+1); +// tempEleEpdPqd.setHarmEnd(tempEleEpdPqd.getHarmEnd()+1); +// } +// for (int i = tempEleEpdPqd.getHarmStart(); i < tempEleEpdPqd.getHarmEnd() + 1; i++) { +// influxQueryWrapperCp95.percentile(tempEleEpdPqd.getName() + "_" + i, tempEleEpdPqd.getOtherName() + "_" + i, 95); +// } +// } else { +// influxQueryWrapperCp95.percentile(tempEleEpdPqd.getName(), tempEleEpdPqd.getOtherName(),95); +// } +// +// }); +// +// influxQueryWrapperCp95.groupBy(InfluxDBTableConstant.LINE_ID) +// .groupBy(InfluxDBTableConstant.PHASIC_TYPE) +// .groupBy(InfluxDBTableConstant.VALUE_TYPE) +// .between(InfluxDBTableConstant.TIME, startTime, endTime); +// List list4 = reportMapper.selectByQueryWrapper(influxQueryWrapperCp95); +// //为ValueType转成cp95 +// list4.forEach(temp -> { +// //获取 +// Field vauleField = null; +// try { +// vauleField = temp.getClass().getDeclaredField("valueType"); +// +// vauleField.setAccessible(true); //暴力访问id +// vauleField.set(temp, "CP95"); +// +// } catch (NoSuchFieldException e) { +// throw new RuntimeException(e); +// } catch (IllegalAccessException e) { +// throw new RuntimeException(e); +// } +// +// +// }); +// +// +// result.addAll(list1); +// result.addAll(list2); +// result.addAll(list3); +// result.addAll(list4); +// +// +// } ); +// //1.为time赋值2.映射 +// result.forEach(temp -> { +// //获取 +// Field phasicTypeeField = null; +// Field timeField = null; +// Field vauleTypeField = null; +// Field qualityFlagField = null; +// +// try { +// +// timeField = temp.getClass().getDeclaredField("time"); +// +// timeField.setAccessible(true); +// timeField.set(temp, yesterday); +// phasicTypeeField = temp.getClass().getDeclaredField("phasicType"); +// phasicTypeeField.setAccessible(true); +// String phase = phasicTypeeField.get(temp).toString(); +// phasicTypeeField.set(temp, getPhase(phase)); +// +// +// vauleTypeField = temp.getClass().getDeclaredField("valueType"); +// vauleTypeField.setAccessible(true); +// String valueType = vauleTypeField.get(temp).toString(); +// vauleTypeField.set(temp, valueType.toUpperCase()); +// +// qualityFlagField = temp.getClass().getDeclaredField("qualityFlag"); +// +// qualityFlagField.setAccessible(true); +// qualityFlagField.set(temp, 0); +// +// } catch (NoSuchFieldException e) { +// throw new RuntimeException(e); +// } catch (IllegalAccessException e) { +// throw new RuntimeException(e); +// } +// +// +// }); +// log.info("插入数据"); +// //由于AB,A对应的都是A不能批量插入会爆主键冲突 +// result.forEach(temp->{ +// bean.saveOrUpdateByMultiId(temp); +// }); +// +// +// } +// +// +// } +// +// public String getPhase( String phase) { +// +// switch (phase) { +// case "A": +// return "A"; +// case "B": +// return "B"; +// case "C": +// return "C"; +// case "M": +// return "T"; +// case "AB": +// return "A"; +// case "BC": +// return "B"; +// case "CA": +// return "C"; +// default: +// break; +// } +// return ""; +// } +// +// +//}