diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java index 8272de9..95570e7 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/param/TableEnum.java @@ -28,7 +28,9 @@ public enum TableEnum { DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4), DATAI("DataI","谐波电流幅值数据表", 4), DATAPLT("DataPlt","长时闪变数据表", 1), - DATAV("DataV","谐波电压幅值数据表", 4); + DATAV("DataV","谐波电压幅值数据表", 4), + COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4), + ; private final String code; diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/InfluxDBComInfoRmation.java b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/InfluxDBComInfoRmation.java new file mode 100644 index 0000000..9833968 --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/bo/po/InfluxDBComInfoRmation.java @@ -0,0 +1,53 @@ +package com.njcn.influx.bo.po; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.njcn.influx.utils.InstantDateSerializer; +import com.njcn.oracle.bo.po.ComInfoRmation; +import lombok.Data; +import org.influxdb.annotation.Column; +import org.influxdb.annotation.Measurement; +import org.influxdb.annotation.TimeColumn; + +import java.time.Instant; +import java.time.ZoneId; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2022/7/12 9:55 + */ +@Data +@Measurement(name = "pqs_communicate") +public class InfluxDBComInfoRmation { + + @TimeColumn + @Column(name = "time") + @JsonSerialize(using = InstantDateSerializer.class) + private Instant time; + + @Column(name = "dev_id") + private String devId; + + @Column(name = "description") + private String description; + + @Column(name = "type") + private Integer type; + + public static InfluxDBComInfoRmation oralceToInfluxDB(ComInfoRmation comInfoRmation) { + if (comInfoRmation == null) { + return null; + } + InfluxDBComInfoRmation influxDBDataCommunicate = new InfluxDBComInfoRmation(); + Instant instant = comInfoRmation.getUpdateTime().atZone(ZoneId.systemDefault()).toInstant(); + + influxDBDataCommunicate.setTime(instant); + influxDBDataCommunicate.setDevId(comInfoRmation.getLineIndex()); + influxDBDataCommunicate.setDescription(comInfoRmation.getDescription()); + influxDBDataCommunicate.setType(comInfoRmation.getType()); + + return influxDBDataCommunicate; + } +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/imapper/InfluxDBComInfoRmationMapper.java b/influx-data/influx-source/src/main/java/com/njcn/influx/imapper/InfluxDBComInfoRmationMapper.java new file mode 100644 index 0000000..e3eedff --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/imapper/InfluxDBComInfoRmationMapper.java @@ -0,0 +1,7 @@ +package com.njcn.influx.imapper; + +import com.njcn.influx.base.InfluxDbBaseMapper; +import com.njcn.influx.bo.po.InfluxDBComInfoRmation; + +public interface InfluxDBComInfoRmationMapper extends InfluxDbBaseMapper { +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java index b9857ce..78c4b67 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java @@ -1,9 +1,11 @@ package com.njcn.influx.service.impl; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.extra.spring.SpringUtil; import com.njcn.influx.bo.param.TableEnum; import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; @@ -245,36 +247,68 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { StopWatch stopWatch = new StopWatch(); stopWatch.start(); List list = new ArrayList(Collections.emptyList()); - //获取监测点最新的数据时间,单监测点查询数据 - List lineTimeList = lineTimeMapper.getLineTime(); - lineTimeList.forEach(item->{ + if("ComInfoRmation".equals(tableName)){ MigrationParam migration = new MigrationParam(); - migration.setLineIds(Collections.singletonList(item.getLineIndex())); - migration.setStartTime(item.getUpdateTime().minusHours(2)); - migration.setEndTime(item.getUpdateTime()); - System.out.println("当前监测点为------------------------------------"+item.getLineIndex()); - System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); - System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + migration.setStartTime(dataAsynParam.getStartDateTime()); + migration.setEndTime(dataAsynParam.getEndDateTime()); + System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); list.addAll(executor.queryData(migration)); - }); - System.out.println("查询到的数据++++++++++++++"+list.size()); - //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid - Iterator iterator = list.iterator(); - while (iterator.hasNext()) { - try{ - Object obj = iterator.next(); - //获取 - Field id = obj.getClass().getDeclaredField("lineid"); - id.setAccessible(true); //暴力访问id - String id1 = id.get(obj).toString(); - if (!IdMappingCache.LineIdMapping.containsKey(id1)){ - log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); - iterator.remove(); - }else { - id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + System.out.println("查询到的数据++++++++++++++"+list.size()); + //反射获取linid的值并把linid的值替换成mysql对应的devid,并记录未匹配的devid + Iterator iterator = list.iterator(); + while (iterator.hasNext()) { + try{ + Object obj = iterator.next(); + //获取 + Field id = obj.getClass().getDeclaredField("lineIndex"); + id.setAccessible(true); //暴力访问id + Object o = id.get(obj); + if(ObjectUtil.isNotNull(o)){ + int index = Integer.parseInt(o.toString())/10; + if (!IdMappingCache.DevIdMapping.containsKey(index+"")){ + log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到devid匹配的devid"+index); + iterator.remove(); + }else { + id.set(obj, IdMappingCache.DevIdMapping.get(index+"")); + } + } + }catch (Exception e){ + e.printStackTrace(); + } + } + }else{ + //获取监测点最新的数据时间,单监测点查询数据 + List lineTimeList = lineTimeMapper.getLineTime(); + lineTimeList.forEach(item->{ + MigrationParam migration = new MigrationParam(); + migration.setLineIds(Collections.singletonList(item.getLineIndex())); + migration.setStartTime(item.getUpdateTime().minusHours(2)); + migration.setEndTime(item.getUpdateTime()); + System.out.println("当前监测点为------------------------------------"+item.getLineIndex()); + System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + list.addAll(executor.queryData(migration)); + }); + System.out.println("查询到的数据++++++++++++++"+list.size()); + //反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid + Iterator iterator = list.iterator(); + while (iterator.hasNext()) { + try{ + Object obj = iterator.next(); + //获取 + Field id = obj.getClass().getDeclaredField("lineid"); + id.setAccessible(true); //暴力访问id + String id1 = id.get(obj).toString(); + if (!IdMappingCache.LineIdMapping.containsKey(id1)){ + log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); + iterator.remove(); + }else { + id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + } + }catch (Exception e){ + e.printStackTrace(); } - }catch (Exception e){ - e.printStackTrace(); } } //采用弱引用接受,后续手动调用gc后,会清空该对象 @@ -351,7 +385,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { DataAsynParam dataAsynParam1 = new DataAsynParam(); dataAsynParam1.setEndDateTime(startDateTime1.minusHours(-1).minusSeconds(1)); dataAsynParam1.setStartDateTime(startDateTime1); - dataAsynParam1.setTableNames(TableEnum.getExecutableTypes()); + if(CollUtil.isEmpty(dataAsynParam.getTableNames())){ + dataAsynParam1.setTableNames(TableEnum.getExecutableTypes()); + }else{ + dataAsynParam1.setTableNames(dataAsynParam.getTableNames()); + } log.info("执行"+startDateTime1+"时刻数据"); this.hourseDataBacthSysc(dataAsynParam1); } diff --git a/influx-data/influx-target/src/main/java/com/njcn/influx/controller/OracleToInfluxDBController.java b/influx-data/influx-target/src/main/java/com/njcn/influx/controller/OracleToInfluxDBController.java index da86da3..2486728 100644 --- a/influx-data/influx-target/src/main/java/com/njcn/influx/controller/OracleToInfluxDBController.java +++ b/influx-data/influx-target/src/main/java/com/njcn/influx/controller/OracleToInfluxDBController.java @@ -2,6 +2,7 @@ package com.njcn.influx.controller; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.StrUtil; import com.njcn.influx.service.OracleEventDetailToMysqlService; import com.njcn.influx.service.OracleMonitorStatusToMysqlService; import com.njcn.influx.service.OracleToInfluxDBService; @@ -18,6 +19,7 @@ import org.springframework.web.bind.annotation.*; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; +import java.util.Collections; /** @@ -59,6 +61,22 @@ public class OracleToInfluxDBController { return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); } + @GetMapping("/dataSyncTable") + @ApiOperation("数据同步") + public Boolean dataSyncTable(@RequestParam("startDateTime") String startDateTime, + @RequestParam("endDateTime") String endDateTime, + @RequestParam("tableName") String tableName + ) { + DataAsynParam dataAsynParam = new DataAsynParam(); + dataAsynParam.setStartDateTime(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATETIME_PATTERN)); + dataAsynParam.setEndDateTime(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATETIME_PATTERN)); + if(StrUtil.isNotBlank(tableName)){ + dataAsynParam.setTableNames(Collections.singletonList(tableName)); + } + oracleToInfluxDBService.AsyncData(dataAsynParam); + return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); + } + @PostMapping("/oneMonitorDataTransport") diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/ComInfoRmation.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/ComInfoRmation.java new file mode 100644 index 0000000..ba76321 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/bo/po/ComInfoRmation.java @@ -0,0 +1,39 @@ +package com.njcn.oracle.bo.po; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * @author wr + * @description + * @date 2024/9/25 11:01 + */ +@Data +@TableName("PQS_COMINFORMATION") +public class ComInfoRmation implements Serializable { + + private static final long serialVersionUID = 1L; + + @TableField("UPDATETIME") + private LocalDateTime updateTime; + + @TableField("LINE_INDEX") + private String lineIndex; + + @TableField("TYPE") + private Integer type; + + @TableField("DESCRIPTION") + private String description; + + @TableField("REMARK") + private String remark; + + @TableField("STATE") + private Integer state; + +} diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/ComInfoRmationMapper.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/ComInfoRmationMapper.java new file mode 100644 index 0000000..f8b5c54 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/ComInfoRmationMapper.java @@ -0,0 +1,16 @@ +package com.njcn.oracle.mapper; + +import com.njcn.oracle.bo.po.ComInfoRmation; +import com.njcn.oracle.mybatis.mapper.BatchBaseMapper; + +/** + *

+ * Mapper 接口 + *

+ * + * @author hongawen + * @since 2023-12-28 + */ +public interface ComInfoRmationMapper extends BatchBaseMapper { + +} diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/ComInfoRmationMapper.xml b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/ComInfoRmationMapper.xml new file mode 100644 index 0000000..3270aa4 --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/mapper/mapping/ComInfoRmationMapper.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/IComInfoRmationService.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/IComInfoRmationService.java new file mode 100644 index 0000000..59a36cb --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/IComInfoRmationService.java @@ -0,0 +1,16 @@ +package com.njcn.oracle.service; + +import com.njcn.oracle.bo.po.ComInfoRmation; +import com.njcn.oracle.mybatis.service.IReplenishMybatisService; + + +/** + * @Description: 监测点状态监测数据 + * @param + * @return: + * @Author: wr + * @Date: 2024/9/25 13:52 + */ +public interface IComInfoRmationService extends IReplenishMybatisService { + +} diff --git a/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/ComInfoRmationServiceImpl.java b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/ComInfoRmationServiceImpl.java new file mode 100644 index 0000000..0fbca5b --- /dev/null +++ b/oracle-data/oracle-source/src/main/java/com/njcn/oracle/service/impl/ComInfoRmationServiceImpl.java @@ -0,0 +1,65 @@ +package com.njcn.oracle.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.njcn.oracle.bo.param.MigrationParam; +import com.njcn.oracle.bo.po.ComInfoRmation; +import com.njcn.oracle.mapper.ComInfoRmationMapper; +import com.njcn.oracle.mybatis.service.impl.ReplenishMybatisServiceImpl; +import com.njcn.oracle.service.IComInfoRmationService; +import org.springframework.stereotype.Service; +import org.springframework.util.StopWatch; + +import java.util.List; + +/** + * @Description: + * @Author: wr + * @Date: 2024/9/25 13:58 + */ +@Service +public class ComInfoRmationServiceImpl extends ReplenishMybatisServiceImpl implements IComInfoRmationService { + + @Override + public List queryData(MigrationParam migrationParam) { + //查询时间范围内的数据 + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime()); + if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) { + lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds()); + } + return this.baseMapper.selectList(lambdaQueryWrapper); + } + + + @Override + @DS("target") + public void clearTargetData(MigrationParam migrationParam) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime()); + if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) { + lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds()); + } + this.baseMapper.delete(lambdaQueryWrapper); + } + + + + /** + * 默认500分片,如果字段超过20以上,建议重写方法调整为1000分片,重写时不要忘记@DS注解 + * + * @param data 数据集合 + */ + @Override + @DS("target") + public void insertBatchByDB(List data) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + this.insertBatchBySlice(data, 100); + stopWatch.stop(); + System.out.printf("pq_ComInfoRmation总计:%d条,耗时执行时长:%f 秒.%n", data.size(), stopWatch.getTotalTimeSeconds()); + } + + +}