Compare commits

...

10 Commits

Author SHA1 Message Date
026d71a060 1.暂降同步调整 2025-09-05 15:58:58 +08:00
hzj
b43728e154 添加项目部署说明 2025-07-07 10:11:25 +08:00
hzj
85659629de 添加项目部署说明 2024-12-13 10:03:53 +08:00
wr
6518adc8ce 1.暂态同步程序提交 2024-12-06 14:06:02 +08:00
wr
47518928e0 1.mysql基本信息同步 2024-11-25 12:10:31 +08:00
hzj
00beb8efa8 添加redis配置,将id缓存迁移由缓存移动到redis 2024-11-04 16:25:43 +08:00
wr
401d2a4e97 1.oracle同步influxdb代码,监测点运行中断状态 2024-09-29 16:22:34 +08:00
53cc3c85e3 1.添加登录模块 2024-08-13 16:15:59 +08:00
43b21859af 1.添加登录模块 2024-08-09 14:32:44 +08:00
cdf
a857953854 河北转换提交 2024-08-09 11:02:01 +08:00
35 changed files with 1337 additions and 173 deletions

24
README.md Normal file
View File

@@ -0,0 +1,24 @@
# 数据同步注意事项
Oralce同步到InfluxDB
1.确认数据库oralcemysql,influxdb的连接信息准确性
2.确认mysql数据库的pq_line_bakpq_device_bak两张表,mysql与oracle台账监测点映射关系正确
3.历史数据与实时数据必须分成两个jar包跑,且两个jar包的端口不能冲突与占用。
4.在历史jar中需要将实时数据的批处理任务停止否知只会跑实时数据,历史数据接口调用无效
5.在调用历史数据接口时将OracleToInfluxDBServiceImpl的
//获取监测点最新的数据时间,单监测点查询数据
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
lineTimeList.forEach(item->{
MigrationParam migration = new MigrationParam();
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
migration.setStartTime(item.getUpdateTime().minusHours(2));
migration.setEndTime(item.getUpdateTime());
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
list.addAll(executor.queryData(migration));
});![img.png](img.png)这一部分代码注释掉,这段代码是冀北特有的,如果不注释掉始终同步最新的数据时间的数据导致历史数据接口调用失败。
6.同步暂态数据接口如下例http://10.95.53.49:8092/data/eventBatch?startDateTime=2024-01-01&endDateTime=2024-12-06 (吴瑞开发的有问题咨询吴瑞)
7.同步稳态数据接口如下例http://10.95.53.49:9091/data/dataSyncHours?startDateTime=2024-11-25 00:00:00&endDateTime=2024-01-01 00:00:00(需注意是重最新时间朝更久的时间同步的,因此
startDateTime>endDateTime,否则无法同步).
8.当新老系统并行期间,可能老系统依旧会新建台账,但是pq_line_bak不能及时更新部分新增监测点实时数据同步不了。

View File

@@ -98,6 +98,22 @@
<artifactId>oracle-source</artifactId> <artifactId>oracle-source</artifactId>
<version>1.0.0</version> <version>1.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging</groupId>
<artifactId>logging-parent</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>

View File

@@ -28,7 +28,9 @@ public enum TableEnum {
DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4), DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4),
DATAI("DataI","谐波电流幅值数据表", 4), DATAI("DataI","谐波电流幅值数据表", 4),
DATAPLT("DataPlt","长时闪变数据表", 1), DATAPLT("DataPlt","长时闪变数据表", 1),
DATAV("DataV","谐波电压幅值数据表", 4); DATAV("DataV","谐波电压幅值数据表", 4),
COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
;
private final String code; private final String code;

View File

@@ -0,0 +1,53 @@
package com.njcn.influx.bo.po;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.influx.utils.InstantDateSerializer;
import com.njcn.oracle.bo.po.ComInfoRmation;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import java.time.Instant;
import java.time.ZoneId;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2022/7/12 9:55
*/
@Data
@Measurement(name = "pqs_communicate")
public class InfluxDBComInfoRmation {
@TimeColumn
@Column(name = "time")
@JsonSerialize(using = InstantDateSerializer.class)
private Instant time;
@Column(name = "dev_id")
private String devId;
@Column(name = "description")
private String description;
@Column(name = "type")
private Integer type;
public static InfluxDBComInfoRmation oralceToInfluxDB(ComInfoRmation comInfoRmation) {
if (comInfoRmation == null) {
return null;
}
InfluxDBComInfoRmation influxDBDataCommunicate = new InfluxDBComInfoRmation();
Instant instant = comInfoRmation.getUpdateTime().atZone(ZoneId.systemDefault()).toInstant();
influxDBDataCommunicate.setTime(instant);
influxDBDataCommunicate.setDevId(comInfoRmation.getLineIndex());
influxDBDataCommunicate.setDescription(comInfoRmation.getDescription());
influxDBDataCommunicate.setType(comInfoRmation.getType());
return influxDBDataCommunicate;
}
}

View File

@@ -58,8 +58,8 @@ public class InfluxDBDataFlicker {
influxDBDataFlicker.setLineId(dataFlicker.getLineid()); influxDBDataFlicker.setLineId(dataFlicker.getLineid());
influxDBDataFlicker.setPhaseType(dataFlicker.getPhasicType()); influxDBDataFlicker.setPhaseType(dataFlicker.getPhasicType());
influxDBDataFlicker.setFluc(dataFlicker.getFluc()); influxDBDataFlicker.setFluc(dataFlicker.getFluc());
influxDBDataFlicker.setPlt(dataFlicker.getPst()); influxDBDataFlicker.setPlt(dataFlicker.getPlt());
influxDBDataFlicker.setPst(dataFlicker.getPlt()); influxDBDataFlicker.setPst(dataFlicker.getPst());
influxDBDataFlicker.setQualityFlag(dataFlicker.getQualityflag()+""); influxDBDataFlicker.setQualityFlag(dataFlicker.getQualityflag()+"");

View File

@@ -275,9 +275,9 @@ public class InfluxDBDataHarmpowerP {
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50()); influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50());
} }
else if (valueType.equals("MAX")){ else if (valueType.equals("MAX")){
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf()); influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfMax());
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf()); influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfMax());
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP()); influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPMax());
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Max()); influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Max());
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Max()); influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Max());
@@ -331,9 +331,9 @@ public class InfluxDBDataHarmpowerP {
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Max()); influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Max());
} }
else if (valueType.equals("MIN")){ else if (valueType.equals("MIN")){
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf()); influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfMin());
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf()); influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfMin());
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP()); influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPMin());
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Min()); influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Min());
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Min()); influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Min());
@@ -387,9 +387,9 @@ public class InfluxDBDataHarmpowerP {
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Min()); influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Min());
} }
else if (valueType.equals("CP95")){ else if (valueType.equals("CP95")){
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf()); influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfCp95());
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf()); influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfCp95());
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP()); influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPCp95());
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Cp95()); influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Cp95());
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Cp95()); influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Cp95());

View File

@@ -268,7 +268,7 @@ public class InfluxDBDataHarmpowerQ {
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50()); influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50());
} }
else if (valueType.equals("MAX")){ else if (valueType.equals("MAX")){
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ()); influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQMax());
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Max()); influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Max());
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Max()); influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Max());
@@ -322,7 +322,7 @@ public class InfluxDBDataHarmpowerQ {
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Max()); influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Max());
} }
else if (valueType.equals("MIN")){ else if (valueType.equals("MIN")){
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ()); influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQMin());
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Min()); influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Min());
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Min()); influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Min());
@@ -376,7 +376,7 @@ public class InfluxDBDataHarmpowerQ {
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Min()); influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Min());
} }
else if (valueType.equals("CP95")){ else if (valueType.equals("CP95")){
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ()); influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQCp95());
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Cp95()); influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Cp95());
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Cp95()); influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Cp95());

View File

@@ -266,7 +266,7 @@ public class InfluxDBDataHarmpowerS {
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50()); influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50());
} }
else if (valueType.equals("MAX")){ else if (valueType.equals("MAX")){
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS()); influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSMax());
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Max()); influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Max());
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Max()); influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Max());
@@ -320,7 +320,7 @@ public class InfluxDBDataHarmpowerS {
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Max()); influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Max());
} }
else if (valueType.equals("MIN")){ else if (valueType.equals("MIN")){
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS()); influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSMin());
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Min()); influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Min());
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Min()); influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Min());
@@ -374,7 +374,7 @@ public class InfluxDBDataHarmpowerS {
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Min()); influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Min());
} }
else if (valueType.equals("CP95")){ else if (valueType.equals("CP95")){
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS()); influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSCp95());
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Cp95()); influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Cp95());
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Cp95()); influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Cp95());

View File

@@ -106,7 +106,7 @@ public class OraclePqLineDetail {
* 统计类型 * 统计类型
*/ */
@TableField(value = "STATFLAG") @TableField(value = "STATFLAG")
private Short statflag; private Integer statflag;
/** /**
* *

View File

@@ -96,7 +96,7 @@ public class OracleRmpEventDetailPO implements Serializable {
* 处理结果第一条事件发生时间(读comtra文件获取) * 处理结果第一条事件发生时间(读comtra文件获取)
*/ */
@TableField(value = "FIRSTTIME") @TableField(value = "FIRSTTIME")
private Date firstTime; private LocalDateTime firstTime;
/** /**
* 处理结果第一条事件暂降类型字典表PQS_Dicdata * 处理结果第一条事件暂降类型字典表PQS_Dicdata

View File

@@ -94,7 +94,7 @@ public class RmpEventDetailPO implements Serializable {
* 处理结果第一条事件发生时间(读comtra文件获取) * 处理结果第一条事件发生时间(读comtra文件获取)
*/ */
@TableField(value = "first_time") @TableField(value = "first_time")
private Date firstTime; private LocalDateTime firstTime;
/** /**
* 处理结果第一条事件暂降类型字典表PQS_Dicdata * 处理结果第一条事件暂降类型字典表PQS_Dicdata

View File

@@ -2,16 +2,14 @@ package com.njcn.influx.config;
import com.njcn.influx.bo.po.PqDeviceBak; import com.njcn.influx.bo.po.PqDeviceBak;
import com.njcn.influx.bo.po.PqLineBak; import com.njcn.influx.bo.po.PqLineBak;
import com.njcn.influx.mapper.PqLineBakMapper;
import com.njcn.influx.service.IPqDeviceBakService; import com.njcn.influx.service.IPqDeviceBakService;
import com.njcn.influx.service.PqLineBakService; import com.njcn.influx.service.PqLineBakService;
import io.swagger.v3.oas.annotations.servers.Server;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -30,17 +28,24 @@ public class IdMappingCache {
private PqLineBakService pqLineBakService; private PqLineBakService pqLineBakService;
@Autowired @Autowired
private IPqDeviceBakService pqDeviceBakService; private IPqDeviceBakService pqDeviceBakService;
public static Map<String, String> DevIdMapping = new HashMap<>();
public static Map<String, String> LineIdMapping = new HashMap<>();
@Autowired
private RedisUtil redisUtil;
public static List<Integer> oracleIds = new ArrayList<>();
@PostConstruct @PostConstruct
public void init() { public void init() {
Map<String, String> DevIdMapping = new HashMap<>();
Map<String, String> LineIdMapping = new HashMap<>();
List<PqLineBak> resultList = pqLineBakService.list(); List<PqLineBak> resultList = pqLineBakService.list();
for (PqLineBak row : resultList) { for (PqLineBak row : resultList) {
String id = row.getId(); String id = row.getId();
String lineId = row.getLineId(); String lineId = row.getLineId();
oracleIds.add(Integer.valueOf(lineId));
LineIdMapping.put(lineId,id ); LineIdMapping.put(lineId,id );
} }
redisUtil.saveByKey("LineIdMapping",LineIdMapping);
List<PqDeviceBak> list = pqDeviceBakService.list(); List<PqDeviceBak> list = pqDeviceBakService.list();
for (PqDeviceBak row : list) { for (PqDeviceBak row : list) {
@@ -48,9 +53,8 @@ public class IdMappingCache {
String devId = row.getDevId()+""; String devId = row.getDevId()+"";
DevIdMapping.put(devId,id ); DevIdMapping.put(devId,id );
} }
redisUtil.saveByKey("DevIdMapping",DevIdMapping);
} }
// public String getDataById(String id) {
// return IdMapping.get(id);
// }
} }

View File

@@ -0,0 +1,63 @@
package com.njcn.influx.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.njcn.influx.utils.InstantDateDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Instant;
/**
* @author hongawen
* @version 1.0.0
* @date 2021年12月08日 17:53
*/
@Configuration
public class RedisConfig {
/**
* 修改RedisTemplate序列化由JdkSerializationRedisSerializer二进制调整为JSON格式
*/
@Bean("redisTemplate")
public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
// 为了开发方便,一般直接使用<String,object>
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
// 用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
// 指定要序列化的域(field,get,set),访问修饰符(public,private,protected)
//解决Java 8 date/time type java.time.Instant not supported
objectMapper.registerModule(new JavaTimeModule());
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.registerModule(new SimpleModule().addDeserializer(Instant.class, new InstantDateDeserializer()));
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// key采用string的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash的key采用string的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

View File

@@ -0,0 +1,353 @@
package com.njcn.influx.config;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author hongawen
* @version 1.0.0
* @date 2021年04月30日 09:38
*/
@Component
@RequiredArgsConstructor
public class RedisUtil {
private final RedisTemplate<String, Object> redisTemplate;
/**
* 指定key的失效时间
* 秒级别的过期时间
*/
public void expire(String key, long time) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
/**
* 根据key获取过期时间
*/
public long getExpire(String key) {
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
return Objects.isNull(expireTime) ? 0 : expireTime;
}
/**
* 根据key获取过期时间(切库,切完之后自动换为原来库)
*/
public long getExpire(Integer dbIndex,String key) {
return getExpire(dbIndex,key,true);
}
/**
* 根据key获取过期时间(切库)true:切回原库 false:不切回原库
*/
public long getExpire(Integer dbIndex,String key,Boolean fly) {
Integer index = setDbIndex(dbIndex);
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
if(fly){
setDbIndex(index);
}
return Objects.isNull(expireTime) ? 0 : expireTime;
}
/**
* 判断key是否存在
*/
public boolean hasKey(String key) {
Boolean hasKeyFlag = redisTemplate.hasKey(key);
if (Objects.isNull(hasKeyFlag)) {
return false;
}
return hasKeyFlag;
}
/**
* 删除某个Key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
/**
* 批量删除keys
*/
public void deleteKeys(String... keys) {
redisTemplate.delete(Arrays.asList(keys));
}
/**
* 指定字符模糊匹配批量删除keys
*/
public void deleteKeysByString(String str) {
Set<String> keys = redisTemplate.keys(str.concat("*"));
// 删除所有匹配的key
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}
}
/**
* 获取key对应的字符数据
*/
public String getStringByKey(String key) {
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
*
* @param dbIndex 指定库索引
* @param key key值
*/
public String getStringByKey(Integer dbIndex,String key) {
return getStringByKey(dbIndex,key,true);
}
/**
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
*
* @param dbIndex 指定库索引
* @param key key值
* @param fly 是否切回原库
*/
public String getStringByKey(Integer dbIndex,String key,Boolean fly) {
Integer index = setDbIndex(dbIndex);
String s = (String) redisTemplate.opsForValue().get(key);
if(fly){
setDbIndex(index);
}
return s;
}
/**
* 获取key对应对象数据
*/
public Object getObjectByKey(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* 保存数据
*
* @param key 键
* @param value 值
*/
public void saveByKey(String key, Object value) {
redisTemplate.boundValueOps(key).set(value);
}
/**
* 保存数据,指定生命周期(秒)
*
* @param key 键
* @param value 值
* @param expireTime 生命时间
*/
public void saveByKeyWithExpire(String key, Object value, Long expireTime) {
if (expireTime <= 0) {
saveByKey(key, value);
} else {
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
}
}
/**
* 保存数据,指定生命周期(秒)
*
* @param dbIndex 指定库索引
* @param key 键
* @param value 值
* @param expireTime 生命时间
* @param expireTime 生命时间
*/
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime) {
saveByKeyWithExpire(dbIndex,key,value,expireTime,true);
}
/**
* 保存数据,指定生命周期(秒)
*
* @param dbIndex 指定库索引
* @param key 键
* @param value 值
* @param fly 是否切回原库
*/
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime,Boolean fly) {
Integer index = setDbIndex(dbIndex);
if (expireTime <= 0) {
saveByKey(key, value);
} else {
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
}
if(fly){
setDbIndex(index);
}
}
/**
* 顺序的递增和递减
*
* @param value 增减根据数值的正负来判断
*/
public void increment(String key, Long value) {
redisTemplate.boundValueOps(key).increment(value);
}
/**
* 添加一个Map集合
*/
public void saveMapValue(String key, Map<String, ?> map, long expireTime) {
redisTemplate.boundHashOps(key).putAll(map);
if (expireTime > 0) {
expire(key, expireTime);
}
}
/**
* 获取map中所有的keys
*/
public Set<?> getMapKeys(String key) {
return redisTemplate.boundHashOps(key).keys();
}
/**
* 获取map中所有的values
*/
public List<?> getMapValues(String key) {
return redisTemplate.boundHashOps(key).values();
}
/**
* 根据map中某个key获取对应的value
*/
public Object getMapValueByMapKey(String redisKey, String mapKey) {
return redisTemplate.boundHashOps(redisKey).get(mapKey);
}
/**
* 根据map中的某个key删除对应的value
*/
public void deleteMapValueByMapKey(String redisKey, String mapKey) {
redisTemplate.boundHashOps(redisKey).delete(mapKey);
}
/**
* 判断map中是否有指定的key
*/
public boolean hasMapKey(String redisKey, String mapKey) {
Boolean hasKey = redisTemplate.boundHashOps(redisKey).hasKey(mapKey);
if (Objects.isNull(hasKey)) {
return false;
}
return hasKey;
}
/**
* 右存放List
*/
public void saveRightListByKey(String key, List<?> values, long expireTime) {
redisTemplate.boundListOps(key).rightPushAll(values);
if (expireTime > 0) {
expire(key, expireTime);
}
}
/**
* 左存放List
*/
public void saveLeftListByKey(String key, List<?> values, long expireTime) {
redisTemplate.boundListOps(key).leftPushAll(values);
if (expireTime > 0) {
expire(key, expireTime);
}
}
/**
* 获取List某范围的值
*
* @param start 起始索引
* @param end 截止索引
*/
public List<?> getListRangeValues(String key, long start, long end) {
long size = getListSize(key);
if ((start < 0 && end < 0) || (start > end)) {
start = 0;
end = size;
} else if (end > size) {
end = size;
} else if (start < 0) {
start = 0;
}
return redisTemplate.boundListOps(key).range(start, end);
}
/**
* 获取List所有数据
*/
public List<?> getListAllValues(String key) {
long size = getListSize(key);
return redisTemplate.boundListOps(key).range(0, size);
}
/**
* 根据指定索引获取指定value
*/
public Object getListValueByIndex(String key, int index) {
return redisTemplate.boundListOps(key).index(index);
}
/**
* 获取list长度
*/
public Long getListSize(String key) {
return Objects.isNull(redisTemplate.boundListOps(key).size()) ? 0 : redisTemplate.boundListOps(key).size();
}
/**
* 根据索引修改某个值
*/
public void updateListValueByIndex(String key, int index, Object newObj) {
redisTemplate.boundListOps(key).set(index, newObj);
}
/**
* 根据某个key模糊查询,并取出value
* @param key
* @return
*/
public List<?> getLikeListAllValues(String key) {
List<Object> info=new ArrayList<>();
for (String s : redisTemplate.keys(key + "*")) {
info.add(redisTemplate.opsForValue().get(s));
}
return info;
}
/**
* 数据切库
* @param dbIndex
* @return
*/
private Integer setDbIndex(Integer dbIndex) {
if (dbIndex == null || dbIndex > 15 || dbIndex < 0) {
dbIndex = 0;
}
LettuceConnectionFactory jedisConnectionFactory = (LettuceConnectionFactory) redisTemplate
.getConnectionFactory();
int database = jedisConnectionFactory.getDatabase();
jedisConnectionFactory.setDatabase(dbIndex);
redisTemplate.setConnectionFactory(jedisConnectionFactory);
//需要刷新才能生效
jedisConnectionFactory.afterPropertiesSet();
// 重置连接
jedisConnectionFactory.resetConnection();
return database;
}
}

View File

@@ -0,0 +1,7 @@
package com.njcn.influx.imapper;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.bo.po.InfluxDBComInfoRmation;
public interface InfluxDBComInfoRmationMapper extends InfluxDbBaseMapper<InfluxDBComInfoRmation> {
}

View File

@@ -16,10 +16,7 @@ import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -78,7 +75,10 @@ public class OracleEventDetailToMysqlServiceImpl extends ServiceImpl<RmpEventDet
po.setNum(oracleDetail.getNum()); po.setNum(oracleDetail.getNum());
po.setFileFlag(oracleDetail.getFileFlag()); po.setFileFlag(oracleDetail.getFileFlag());
po.setDealFlag(oracleDetail.getDealFlag()); po.setDealFlag(oracleDetail.getDealFlag());
po.setFirstTime(oracleDetail.getFirstTime());
if(Objects.nonNull(oracleDetail.getFirstTime())) {
po.setFirstTime(oracleDetail.getFirstTime().plus(oracleDetail.getFirstMs().intValue(), ChronoUnit.MILLIS));
}
po.setFirstType(oracleDetail.getFirstType()); po.setFirstType(oracleDetail.getFirstType());
po.setFirstMs(oracleDetail.getFirstMs()); po.setFirstMs(oracleDetail.getFirstMs());
po.setEnergy(oracleDetail.getEnergy()); po.setEnergy(oracleDetail.getEnergy());

View File

@@ -5,12 +5,13 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.njcn.influx.bo.po.*; import com.njcn.influx.bo.po.*;
import com.njcn.influx.config.IdMappingCache; import com.njcn.influx.config.RedisUtil;
import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper; import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper;
import com.njcn.influx.mapper.PqDeviceMapper; import com.njcn.influx.mapper.PqDeviceMapper;
import com.njcn.influx.mapper.RmpEventDetailPOMapper; import com.njcn.influx.mapper.RmpEventDetailPOMapper;
import com.njcn.influx.service.*; import com.njcn.influx.service.*;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@@ -29,6 +30,9 @@ import java.util.stream.Collectors;
@Service @Service
public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatusToMysqlService { public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatusToMysqlService {
@Value("${business.type}")
private Integer systemType;
private final RmpEventDetailPOMapper rmpEventDetailPOMapper; private final RmpEventDetailPOMapper rmpEventDetailPOMapper;
private final IPqDeviceBakService pqDeviceBakService; private final IPqDeviceBakService pqDeviceBakService;
private final PqDeviceMapper pqDeviceMapper; private final PqDeviceMapper pqDeviceMapper;
@@ -39,6 +43,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
private final OraclePqLineDetailService oraclePqLineDetailService; private final OraclePqLineDetailService oraclePqLineDetailService;
private final IPqLineDetailService pqLineDetailService; private final IPqLineDetailService pqLineDetailService;
private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper; private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper;
private final RedisUtil redisUtil;
/** /**
* 1.查询oracle装置表信息 * 1.查询oracle装置表信息
@@ -47,6 +52,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
*/ */
@Override @Override
public void monitorStatusSync() { public void monitorStatusSync() {
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
List<DictData> monitorDicList = rmpEventDetailPOMapper.selectByDicCodeList("Line_State"); List<DictData> monitorDicList = rmpEventDetailPOMapper.selectByDicCodeList("Line_State");
List<DictData> devDicList = rmpEventDetailPOMapper.selectByDicCodeList("Dev_Status"); List<DictData> devDicList = rmpEventDetailPOMapper.selectByDicCodeList("Dev_Status");
List<PqDeviceBak> list = pqDeviceBakService.list(); List<PqDeviceBak> list = pqDeviceBakService.list();
@@ -54,13 +60,14 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
Map<Long, String> oracleDevMysql = list.stream().collect(Collectors.toMap(PqDeviceBak::getDevId, PqDeviceBak::getId)); Map<Long, String> oracleDevMysql = list.stream().collect(Collectors.toMap(PqDeviceBak::getDevId, PqDeviceBak::getId));
List<PqDevice> pqDevices = pqDeviceMapper.selectList(null); List<PqDevice> pqDevices = pqDeviceMapper.selectList(null);
//pq的设备表更改通信状态 //pq的设备表更改通信状态
if(systemType == 0){
List<PqDeviceMysql> tempList = new ArrayList<>(); List<PqDeviceMysql> tempList = new ArrayList<>();
pqDevices.stream().forEach(temp->{ pqDevices.stream().forEach(temp->{
String id =""; String id ="";
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){ if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
return; return;
}else { }else {
id=IdMappingCache.DevIdMapping.get(temp.getDevIndex()+""); id=DevIdMapping.get(temp.getDevIndex()+"");
} }
pqDeviceMysqlService.update(new LambdaUpdateWrapper<PqDeviceMysql>() pqDeviceMysqlService.update(new LambdaUpdateWrapper<PqDeviceMysql>()
.set(PqDeviceMysql::getRunFlag,temp.getDevFlag()) .set(PqDeviceMysql::getRunFlag,temp.getDevFlag())
@@ -69,7 +76,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
.eq(PqDeviceMysql::getId,id)); .eq(PqDeviceMysql::getId,id));
}); });
}else {
Map<Integer, List<Long>> oracleDevMap = pqDevices.stream().collect(Collectors.groupingBy(PqDevice::getDevFlag Map<Integer, List<Long>> oracleDevMap = pqDevices.stream().collect(Collectors.groupingBy(PqDevice::getDevFlag
, Collectors.mapping(PqDevice::getDevIndex, Collectors.toList()))); , Collectors.mapping(PqDevice::getDevIndex, Collectors.toList())));
oracleDevMap.forEach((key, value) -> { oracleDevMap.forEach((key, value) -> {
@@ -96,6 +103,10 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
); );
} }
}); });
}
} }
@@ -156,6 +167,12 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
.set(PqLineDetail::getDealCapacity, oraclePqLineDetail.getXycMp()) .set(PqLineDetail::getDealCapacity, oraclePqLineDetail.getXycMp())
.set(PqLineDetail::getPtType, oraclePqLineDetail.getPttype()) .set(PqLineDetail::getPtType, oraclePqLineDetail.getPttype())
.set(PqLineDetail::getTimeInterval, oraclePqLineDetail.getTinterval()) .set(PqLineDetail::getTimeInterval, oraclePqLineDetail.getTinterval())
.set(PqLineDetail::getPowerFlag, oraclePqLineDetail.getPowerid())
.set(PqLineDetail::getObjName, oraclePqLineDetail.getObjname())
.set(PqLineDetail::getMonitorId, oraclePqLineDetail.getMonitorId())
.set(PqLineDetail::getStatFlag, oraclePqLineDetail.getStatflag())
.set(PqLineDetail::getPowerSubstationName, oraclePqLineDetail.getPowerSubstationName())
.eq(PqLineDetail::getId, line.getId()); .eq(PqLineDetail::getId, line.getId());
if(loadTypeMap.containsKey(oraclePqLineDetail.getLoadtype())){ if(loadTypeMap.containsKey(oraclePqLineDetail.getLoadtype())){
DictData loadType= rmpEventDetailPOMapper.getDicDataByNameAndTypeName("干扰源类型", loadTypeMap.get(oraclePqLineDetail.getLoadtype())); DictData loadType= rmpEventDetailPOMapper.getDicDataByNameAndTypeName("干扰源类型", loadTypeMap.get(oraclePqLineDetail.getLoadtype()));

View File

@@ -1,15 +1,18 @@
package com.njcn.influx.service.impl; package com.njcn.influx.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.text.StrPool; import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import com.njcn.influx.bo.param.TableEnum; import com.njcn.influx.bo.param.TableEnum;
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.influx.bo.po.JobHistoryLogInfluxdb; import com.njcn.influx.bo.po.JobHistoryLogInfluxdb;
import com.njcn.influx.config.IdMappingCache; import com.njcn.influx.config.IdMappingCache;
import com.njcn.influx.config.RedisUtil;
import com.njcn.influx.service.JobDetailHoursInfluxDBService; import com.njcn.influx.service.JobDetailHoursInfluxDBService;
import com.njcn.influx.service.JobDetailInfluxDBService; import com.njcn.influx.service.JobDetailInfluxDBService;
import com.njcn.influx.service.JobHistoryLogInfluxdbService; import com.njcn.influx.service.JobHistoryLogInfluxdbService;
@@ -65,13 +68,14 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService; private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService;
private final LineTimeMapper lineTimeMapper; private final LineTimeMapper lineTimeMapper;
private final RedisUtil redisUtil;
@Value("${business.slice:2}") @Value("${business.slice:2}")
private int slice; private int slice;
@Override @Override
@Async @Async
public void dataBacthSysc(DataAsynParam dataAsynParam) { public void dataBacthSysc(DataAsynParam dataAsynParam) {
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
Runtime runtime = Runtime.getRuntime(); Runtime runtime = Runtime.getRuntime();
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
@@ -129,11 +133,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
Field id = obj.getClass().getDeclaredField("lineid"); Field id = obj.getClass().getDeclaredField("lineid");
id.setAccessible(true); //暴力访问id id.setAccessible(true); //暴力访问id
String id1 = id.get(obj).toString(); String id1 = id.get(obj).toString();
if (!IdMappingCache.LineIdMapping.containsKey(id1)){ if (!LineIdMapping.containsKey(id1)){
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
iterator.remove(); iterator.remove();
}else { }else {
id.set(obj, IdMappingCache.LineIdMapping.get(id1)); id.set(obj, LineIdMapping.get(id1));
} }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
@@ -215,6 +219,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
//冀北现场存在数据更新时间不在1小时内会丢失数据的情况这边根据装置最新时间往前推1个小时查询数据 //冀北现场存在数据更新时间不在1小时内会丢失数据的情况这边根据装置最新时间往前推1个小时查询数据
@Override @Override
public void hourseDataBacthSysc(DataAsynParam dataAsynParam) { public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
Runtime runtime = Runtime.getRuntime(); Runtime runtime = Runtime.getRuntime();
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
@@ -245,6 +252,37 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
List list = new ArrayList(Collections.emptyList()); List list = new ArrayList(Collections.emptyList());
if("ComInfoRmation".equals(tableName)){
MigrationParam migration = new MigrationParam();
migration.setStartTime(dataAsynParam.getStartDateTime());
migration.setEndTime(dataAsynParam.getEndDateTime());
System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
list.addAll(executor.queryData(migration));
System.out.println("查询到的数据++++++++++++++"+list.size());
//反射获取linid的值并把linid的值替换成mysql对应的devid并记录未匹配的devid
Iterator iterator = list.iterator();
while (iterator.hasNext()) {
try{
Object obj = iterator.next();
//获取
Field id = obj.getClass().getDeclaredField("lineIndex");
id.setAccessible(true); //暴力访问id
Object o = id.get(obj);
if(ObjectUtil.isNotNull(o)){
int index = Integer.parseInt(o.toString())/10;
if (!DevIdMapping.containsKey(index+"")){
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到devid匹配的devid"+index);
iterator.remove();
}else {
id.set(obj, DevIdMapping.get(index+""));
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}else{
//获取监测点最新的数据时间,单监测点查询数据 //获取监测点最新的数据时间,单监测点查询数据
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime(); List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
lineTimeList.forEach(item->{ lineTimeList.forEach(item->{
@@ -267,16 +305,17 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
Field id = obj.getClass().getDeclaredField("lineid"); Field id = obj.getClass().getDeclaredField("lineid");
id.setAccessible(true); //暴力访问id id.setAccessible(true); //暴力访问id
String id1 = id.get(obj).toString(); String id1 = id.get(obj).toString();
if (!IdMappingCache.LineIdMapping.containsKey(id1)){ if (!LineIdMapping.containsKey(id1)){
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
iterator.remove(); iterator.remove();
}else { }else {
id.set(obj, IdMappingCache.LineIdMapping.get(id1)); id.set(obj,LineIdMapping.get(id1));
} }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
} }
} }
}
//采用弱引用接受后续手动调用gc后会清空该对象 //采用弱引用接受后续手动调用gc后会清空该对象
WeakReference<List> weakReferenceData = new WeakReference<>(list); WeakReference<List> weakReferenceData = new WeakReference<>(list);
int size = 0; int size = 0;
@@ -351,9 +390,130 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
DataAsynParam dataAsynParam1 = new DataAsynParam(); DataAsynParam dataAsynParam1 = new DataAsynParam();
dataAsynParam1.setEndDateTime(startDateTime1.minusHours(-1).minusSeconds(1)); dataAsynParam1.setEndDateTime(startDateTime1.minusHours(-1).minusSeconds(1));
dataAsynParam1.setStartDateTime(startDateTime1); dataAsynParam1.setStartDateTime(startDateTime1);
if(CollUtil.isEmpty(dataAsynParam.getTableNames())){
dataAsynParam1.setTableNames(TableEnum.getExecutableTypes()); dataAsynParam1.setTableNames(TableEnum.getExecutableTypes());
}else{
dataAsynParam1.setTableNames(dataAsynParam.getTableNames());
}
log.info("执行"+startDateTime1+"时刻数据"); log.info("执行"+startDateTime1+"时刻数据");
this.hourseDataBacthSysc(dataAsynParam1); this.hourseDataBacthSysc(dataAsynParam1);
} }
} }
@Override
public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
Runtime runtime = Runtime.getRuntime();
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
//目前且作为2片后续将该属性提取到配置文件中
List<String> tableNames = dataAsynParam.getTableNames();
//嵌套循环,先循环指标,再循环日期
tableNames.stream().forEach(tableName -> {
IReplenishMybatisService executor;
try {
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime());
for (LocalDate date : dateList) {
//分片下,每段时间的小时数
int sliceHour = 24 / slice;
for (int i = 0; i < slice; i++) {
String dateStr = LocalDateTimeUtil.format(date, DatePattern.NORM_DATE_PATTERN);
LocalDateTime startTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * i < 10 ? "0" + sliceHour * i : sliceHour * i) + ":00:00", DatePattern.NORM_DATETIME_PATTERN);
LocalDateTime endTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * (i + 1) - 1 < 10 ? "0" + (sliceHour * (i + 1) - 1) : sliceHour * (i + 1) - 1) + ":59:59", DatePattern.NORM_DATETIME_PATTERN);
//查询该时区的数据,并准备入库
MigrationParam migration = new MigrationParam();
migration.setStartTime(startTime);
migration.setEndTime(endTime);
if (Objects.nonNull(dataAsynParam.getMonitorId())) {
migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList()));
}
List list = executor.queryData(migration);
//反射獲取linid的值并把linid的值替换成mysql对应的linid并记录未匹配的lineid
Iterator iterator = list.iterator();
while (iterator.hasNext()) {
try {
Object obj = iterator.next();
//获取
Field id = obj.getClass().getDeclaredField("lineid");
id.setAccessible(true); //暴力访问id
String id1 = id.get(obj).toString();
if (!LineIdMapping.containsKey(id1)) {
log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1);
iterator.remove();
} else {
id.set(obj, LineIdMapping.get(id1));
}
} catch (Exception e) {
e.printStackTrace();
}
}
//采用弱引用接受后续手动调用gc后会清空该对象
WeakReference<List> weakReferenceData = new WeakReference<>(list);
int size = 0;
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
size = weakReferenceData.get().size();
}
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
try {
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
//执行目标库的数据处理
Class<?> clazz;
Class<?> clazz2;
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
try {
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Method method;
try {
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
method.setAccessible(true);
Method finalMethod = method;
List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
try {
Object invoke = finalMethod.invoke(null, po);
Object invoke1 = invoke;
//返回oracle转influxflicker等表是1-1还有1-4这是判断返回是否是集合如何是集合继续扁平化
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
//插入influxdb
influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
// size = list1.size();
//最后一片时修改状态
}
//手动执行GC
System.gc();
} catch (Exception exception) {
exception.printStackTrace();
}
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
}
}
});
}
} }

View File

@@ -3,30 +3,24 @@ package com.njcn.influx.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.njcn.influx.bo.param.TableEnum;
import com.njcn.influx.bo.po.PqsOnlinerateMysql; import com.njcn.influx.bo.po.PqsOnlinerateMysql;
import com.njcn.influx.config.IdMappingCache; import com.njcn.influx.config.RedisUtil;
import com.njcn.influx.service.IPqDeviceBakService;
import com.njcn.influx.service.PqsOnlinerateMysqlService; import com.njcn.influx.service.PqsOnlinerateMysqlService;
import com.njcn.oracle.bo.param.DataAsynParam; import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.po.DataFlicker;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.Map;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.influx.bo.po.PqsOnlineratePO; import com.njcn.influx.bo.po.PqsOnlineratePO;
import com.njcn.influx.mapper.PqsOnlineratePOMapper; import com.njcn.influx.mapper.PqsOnlineratePOMapper;
import com.njcn.influx.service.PqsOnlineratePOService; import com.njcn.influx.service.PqsOnlineratePOService;
import org.springframework.transaction.annotation.Transactional;
/** /**
* *
@@ -41,9 +35,10 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j @Slf4j
public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMapper, PqsOnlineratePO> implements PqsOnlineratePOService{ public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMapper, PqsOnlineratePO> implements PqsOnlineratePOService{
private final PqsOnlinerateMysqlService pqsOnlinerateMysqlService; private final PqsOnlinerateMysqlService pqsOnlinerateMysqlService;
private final RedisUtil redisUtil;
@Override @Override
public void minutesDataBacthSysc(DataAsynParam dataAsynParam) { public void minutesDataBacthSysc(DataAsynParam dataAsynParam) {
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
List<PqsOnlineratePO> list = this.lambdaQuery().between(PqsOnlineratePO::getTimeid, LocalDateTimeUtil.beginOfDay(dataAsynParam.getStartDateTime()), LocalDateTimeUtil.endOfDay(dataAsynParam.getStartDateTime()).minusSeconds(1)).list(); List<PqsOnlineratePO> list = this.lambdaQuery().between(PqsOnlineratePO::getTimeid, LocalDateTimeUtil.beginOfDay(dataAsynParam.getStartDateTime()), LocalDateTimeUtil.endOfDay(dataAsynParam.getStartDateTime()).minusSeconds(1)).list();
log.info(dataAsynParam.getStartDateTime()+"-----数据量:"+list.size()); log.info(dataAsynParam.getStartDateTime()+"-----数据量:"+list.size());
if (CollectionUtil.isNotEmpty(list)) { if (CollectionUtil.isNotEmpty(list)) {
@@ -51,10 +46,10 @@ public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMappe
list.stream().forEach(temp -> { list.stream().forEach(temp -> {
PqsOnlinerateMysql pqsOnlinerateMysql = new PqsOnlinerateMysql(); PqsOnlinerateMysql pqsOnlinerateMysql = new PqsOnlinerateMysql();
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){ if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
return; return;
}else { }else {
pqsOnlinerateMysql.setDevIndex(IdMappingCache.DevIdMapping.get(temp.getDevIndex()+"")); pqsOnlinerateMysql.setDevIndex(DevIdMapping.get(temp.getDevIndex()+""));
} }
pqsOnlinerateMysql.setTimeId(temp.getTimeid()); pqsOnlinerateMysql.setTimeId(temp.getTimeid());
pqsOnlinerateMysql.setOnlineMin(temp.getOnlinemin()); pqsOnlinerateMysql.setOnlineMin(temp.getOnlinemin());

View File

@@ -19,7 +19,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync @EnableAsync
@DependsOn("proxyMapperRegister") @DependsOn("proxyMapperRegister")
@MapperScan("com.njcn.**.mapper") @MapperScan("com.njcn.**.mapper")
@SpringBootApplication(scanBasePackages = "com.njcn", exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class}) @SpringBootApplication(scanBasePackages = "com.njcn")
public class InfluxDataApplication { public class InfluxDataApplication {
public static void main(String[] args) { public static void main(String[] args) {

View File

@@ -0,0 +1,19 @@
package com.njcn.influx.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration
@EnableWebSecurity
public class Security extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.formLogin().and().authorizeRequests().anyRequest().authenticated().and().csrf().disable();
}
}

View File

@@ -3,7 +3,9 @@ package com.njcn.influx.controller;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.influx.bo.po.JobDetailInfluxDB;
import com.njcn.influx.config.IdMappingCache;
import com.njcn.influx.service.JobDetailInfluxDBService; import com.njcn.influx.service.JobDetailInfluxDBService;
import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.JobQueryParam; import com.njcn.oracle.bo.param.JobQueryParam;
import com.njcn.oracle.bo.po.JobDetail; import com.njcn.oracle.bo.po.JobDetail;
import com.njcn.oracle.service.JobDetailService; import com.njcn.oracle.service.JobDetailService;
@@ -13,10 +15,7 @@ import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/** /**
* Description: * Description:
@@ -34,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController;
public class JobDetailInfluxDBController { public class JobDetailInfluxDBController {
private final JobDetailInfluxDBService jobDetailInfluxDBService; private final JobDetailInfluxDBService jobDetailInfluxDBService;
private final IdMappingCache idMappingCache;
@PostMapping("/jobQuery") @PostMapping("/jobQuery")
@ApiOperation("任务查询") @ApiOperation("任务查询")
@ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true) @ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true)
@@ -48,4 +48,12 @@ public class JobDetailInfluxDBController {
boolean b = jobDetailInfluxDBService.jobRemove(jobDetail); boolean b = jobDetailInfluxDBService.jobRemove(jobDetail);
return b; return b;
} }
@GetMapping("/refreshIdCache")
@ApiOperation("刷新缓存")
public Boolean refreshIdCache() {
DataAsynParam dataAsynParam = new DataAsynParam();
idMappingCache.init();
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}
} }

View File

@@ -2,6 +2,7 @@ package com.njcn.influx.controller;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import com.njcn.influx.service.OracleEventDetailToMysqlService; import com.njcn.influx.service.OracleEventDetailToMysqlService;
import com.njcn.influx.service.OracleMonitorStatusToMysqlService; import com.njcn.influx.service.OracleMonitorStatusToMysqlService;
import com.njcn.influx.service.OracleToInfluxDBService; import com.njcn.influx.service.OracleToInfluxDBService;
@@ -16,8 +17,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Collections;
/** /**
@@ -59,6 +63,22 @@ public class OracleToInfluxDBController {
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
} }
@GetMapping("/dataSyncTable")
@ApiOperation("数据同步")
public Boolean dataSyncTable(@RequestParam("startDateTime") String startDateTime,
@RequestParam("endDateTime") String endDateTime,
@RequestParam("tableName") String tableName
) {
DataAsynParam dataAsynParam = new DataAsynParam();
dataAsynParam.setStartDateTime(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATETIME_PATTERN));
dataAsynParam.setEndDateTime(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATETIME_PATTERN));
if(StrUtil.isNotBlank(tableName)){
dataAsynParam.setTableNames(Collections.singletonList(tableName));
}
oracleToInfluxDBService.AsyncData(dataAsynParam);
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
}
@PostMapping("/oneMonitorDataTransport") @PostMapping("/oneMonitorDataTransport")
@@ -89,10 +109,10 @@ public class OracleToInfluxDBController {
LocalDateTime startDate = LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATE_PATTERN)); LocalDateTime startDate = LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATE_PATTERN));
LocalDateTime endDate = LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATE_PATTERN)); LocalDateTime endDate = LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATE_PATTERN));
long betweenDay = LocalDateTimeUtil.between(startDate, endDate, ChronoUnit.DAYS); long betweenDay = LocalDateTimeUtil.between(startDate, endDate, ChronoUnit.DAYS);
oracleEventDetailToMysqlService.eventBatch(startDate, endDate); oracleEventDetailToMysqlService.eventBatch(startDate, LocalDateTime.of(LocalDate.from(startDate), LocalTime.MAX));
for (int i = 0; i <=betweenDay; i++) { for (int i = 0; i <=betweenDay; i++) {
startDate = LocalDateTimeUtil.offset(startDate, 1, ChronoUnit.DAYS); startDate = LocalDateTimeUtil.offset(startDate, 1, ChronoUnit.DAYS);
oracleEventDetailToMysqlService.eventBatch(startDate, endDate); oracleEventDetailToMysqlService.eventBatch(startDate, LocalDateTime.of(LocalDate.from(startDate), LocalTime.MAX));
} }
} catch (Exception exception) { } catch (Exception exception) {
exception.printStackTrace(); exception.printStackTrace();

View File

@@ -48,14 +48,14 @@ public class OracleToInfluxDBJob {
//每小时03分钟时执行上一个小时的数据同步 //每小时03分钟时执行上一个小时的数据同步
//河北这边比较特殊, //河北这边比较特殊,
@Scheduled(cron="0 40 * * * ?") @Scheduled(cron="0 15 * * * ?")
public void executeHours() { public void executeHours() {
DataAsynParam dataAsynParam = new DataAsynParam(); DataAsynParam dataAsynParam = new DataAsynParam();
// 获取当前时间 // 获取当前时间
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
// 减去一个小时 // 减去一个小时
LocalDateTime oneHourAgo = now.minusHours(1); LocalDateTime oneHourAgo = now.minusHours(2);
// 将分钟和秒设置为0 // 将分钟和秒设置为0
LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS); LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
@@ -116,8 +116,8 @@ public class OracleToInfluxDBJob {
* 每天同步台账监测点部分信息 仅数据中心使用 * 每天同步台账监测点部分信息 仅数据中心使用
* @date 2024/3/5 * @date 2024/3/5
*/ */
@Scheduled(cron="0 30 0 * * ?") /* @Scheduled(cron="0 30 0 * * ?")
public void synLedgerMonitor() { public void synLedgerMonitor() {
oracleMonitorStatusToMysqlService.monitorTimeSync(); oracleMonitorStatusToMysqlService.monitorTimeSync();
} }*/
} }

View File

@@ -1,13 +1,20 @@
#文件位置配置
business: business:
#分片次数一定为24的约数1 2 3 4 6 8 12 24 #分片次数一定为24的约数1 2 3 4 6 8 12 24
slice: 4 slice: 4
# 0.pq 1.pms
type: 0
server: server:
port: 8090 port: 8090
#springsecurity默认过期时间30m
servlet:
session:
timeout: 1440m
spring: spring:
security: security:
user: user:
name: njcn name: data_njcn
password: dnzl@#002 password: dnzl@#002
#influxDB内容配置 #influxDB内容配置
influx: influx:
@@ -76,10 +83,23 @@ spring:
password: Pqsadmin123 password: Pqsadmin123
driver-class-name: oracle.jdbc.driver.OracleDriver driver-class-name: oracle.jdbc.driver.OracleDriver
target: target:
url: jdbc:mysql://192.168.1.24:13306/pqsinfo_jb?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT url: jdbc:mysql://192.168.1.102:13306/pqsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
username: root username: root
password: njcnpqs password: njcnpqs
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
redis:
database: 15
host: 192.168.1.22
port: 16379
password: njcnpqs
timeout: 5000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
#不做限制的参数配置
#mybatis配置信息 #mybatis配置信息
mybatis-plus: mybatis-plus:
#别名扫描 #别名扫描
@@ -89,9 +109,9 @@ mybatis-plus:
#驼峰命名 #驼峰命名
map-underscore-to-camel-case: true map-underscore-to-camel-case: true
#配置sql日志输出 #配置sql日志输出
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#关闭日志输出 #关闭日志输出
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
global-config: global-config:
db-config: db-config:
#指定主键生成策略 #指定主键生成策略

View File

@@ -2,9 +2,16 @@
business: business:
#分片次数一定为24的约数1 2 3 4 6 8 12 24 #分片次数一定为24的约数1 2 3 4 6 8 12 24
slice: 4 slice: 4
# 0.pq 1.pms
type: 1
server: server:
port: 8090 port: 8090
spring: spring:
security:
user:
name: data_njcn
password: dnzl@#002
#influxDB内容配置 #influxDB内容配置
influx: influx:
url: http://25.36.232.36:8086 url: http://25.36.232.36:8086

View File

@@ -2,9 +2,15 @@
business: business:
#分片次数一定为24的约数1 2 3 4 6 8 12 24 #分片次数一定为24的约数1 2 3 4 6 8 12 24
slice: 4 slice: 4
# 0.pq 1.pms
type: 0
server: server:
port: 8090 port: 8090
spring: spring:
security:
user:
name: data_njcn
password: dnzl@#002
#influxDB内容配置 #influxDB内容配置
influx: influx:
url: http://192.168.1.102:8086 url: http://192.168.1.102:8086

View File

@@ -0,0 +1,243 @@
/**
* Created by SLICE_30_K on 2017/5/22.
*
* 支持一般Base64的编码和解码
* 支持符合RFC_4648标准中"URL and Filename Safe Alphabet"的URL安全Base64编解码
* 支持中文字符的编解码(Unicode编码)
*/
;(function (root, factory) {
if (typeof exports === "object") {
// CommonJS
module.exports = exports = factory();
}
else if (typeof define === "function" && define.amd) {
// AMD
define(factory);
}
else {
// Global (browser)
window.BASE64 = factory();
}
}(this, function () {
var BASE64_MAPPING = [
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'
];
var URLSAFE_BASE64_MAPPING = [
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '-', '_'
];
var _toBinary = function (ascii) {
var binary = [];
while (ascii > 0) {
var b = ascii % 2;
ascii = Math.floor(ascii / 2);
binary.push(b);
}
binary.reverse();
return binary;
};
var _toDecimal = function (binary) {
var dec = 0;
var p = 0;
for (var i = binary.length - 1; i >= 0; --i) {
var b = binary[i];
if (b == 1) {
dec += Math.pow(2, p);
}
++p;
}
return dec;
};
var _toUTF8Binary = function (c, binaryArray) {
var mustLen = (8 - (c + 1)) + ((c - 1) * 6);
var fatLen = binaryArray.length;
var diff = mustLen - fatLen;
while (--diff >= 0) {
binaryArray.unshift(0);
}
var binary = [];
var _c = c;
while (--_c >= 0) {
binary.push(1);
}
binary.push(0);
var i = 0, len = 8 - (c + 1);
for (; i < len; ++i) {
binary.push(binaryArray[i]);
}
for (var j = 0; j < c - 1; ++j) {
binary.push(1);
binary.push(0);
var sum = 6;
while (--sum >= 0) {
binary.push(binaryArray[i++]);
}
}
return binary;
};
var _toBinaryArray = function (str) {
var binaryArray = [];
for (var i = 0, len = str.length; i < len; ++i) {
var unicode = str.charCodeAt(i);
var _tmpBinary = _toBinary(unicode);
if (unicode < 0x80) {
var _tmpdiff = 8 - _tmpBinary.length;
while (--_tmpdiff >= 0) {
_tmpBinary.unshift(0);
}
binaryArray = binaryArray.concat(_tmpBinary);
} else if (unicode >= 0x80 && unicode <= 0x7FF) {
binaryArray = binaryArray.concat(_toUTF8Binary(2, _tmpBinary));
} else if (unicode >= 0x800 && unicode <= 0xFFFF) {//UTF-8 3byte
binaryArray = binaryArray.concat(_toUTF8Binary(3, _tmpBinary));
} else if (unicode >= 0x10000 && unicode <= 0x1FFFFF) {//UTF-8 4byte
binaryArray = binaryArray.concat(_toUTF8Binary(4, _tmpBinary));
} else if (unicode >= 0x200000 && unicode <= 0x3FFFFFF) {//UTF-8 5byte
binaryArray = binaryArray.concat(_toUTF8Binary(5, _tmpBinary));
} else if (unicode >= 4000000 && unicode <= 0x7FFFFFFF) {//UTF-8 6byte
binaryArray = binaryArray.concat(_toUTF8Binary(6, _tmpBinary));
}
}
return binaryArray;
};
var _toUnicodeStr = function (binaryArray) {
var unicode;
var unicodeBinary = [];
var str = "";
for (var i = 0, len = binaryArray.length; i < len;) {
if (binaryArray[i] == 0) {
unicode = _toDecimal(binaryArray.slice(i, i + 8));
str += String.fromCharCode(unicode);
i += 8;
} else {
var sum = 0;
while (i < len) {
if (binaryArray[i] == 1) {
++sum;
} else {
break;
}
++i;
}
unicodeBinary = unicodeBinary.concat(binaryArray.slice(i + 1, i + 8 - sum));
i += 8 - sum;
while (sum > 1) {
unicodeBinary = unicodeBinary.concat(binaryArray.slice(i + 2, i + 8));
i += 8;
--sum;
}
unicode = _toDecimal(unicodeBinary);
str += String.fromCharCode(unicode);
unicodeBinary = [];
}
}
return str;
};
var _encode = function (str, url_safe) {
var base64_Index = [];
var binaryArray = _toBinaryArray(str);
var dictionary = url_safe ? URLSAFE_BASE64_MAPPING : BASE64_MAPPING;
var extra_Zero_Count = 0;
for (var i = 0, len = binaryArray.length; i < len; i += 6) {
var diff = (i + 6) - len;
if (diff == 2) {
extra_Zero_Count = 2;
} else if (diff == 4) {
extra_Zero_Count = 4;
}
var _tmpExtra_Zero_Count = extra_Zero_Count;
while (--_tmpExtra_Zero_Count >= 0) {
binaryArray.push(0);
}
base64_Index.push(_toDecimal(binaryArray.slice(i, i + 6)));
}
var base64 = '';
for (var i = 0, len = base64_Index.length; i < len; ++i) {
base64 += dictionary[base64_Index[i]];
}
for (var i = 0, len = extra_Zero_Count / 2; i < len; ++i) {
base64 += '=';
}
return base64;
};
var _decode = function (_base64Str, url_safe) {
var _len = _base64Str.length;
var extra_Zero_Count = 0;
var dictionary = url_safe ? URLSAFE_BASE64_MAPPING : BASE64_MAPPING;
if (_base64Str.charAt(_len - 1) == '=') {
if (_base64Str.charAt(_len - 2) == '=') {//两个等号说明补了4个0
extra_Zero_Count = 4;
_base64Str = _base64Str.substring(0, _len - 2);
} else {//一个等号说明补了2个0
extra_Zero_Count = 2;
_base64Str = _base64Str.substring(0, _len - 1);
}
}
var binaryArray = [];
for (var i = 0, len = _base64Str.length; i < len; ++i) {
var c = _base64Str.charAt(i);
for (var j = 0, size = dictionary.length; j < size; ++j) {
if (c == dictionary[j]) {
var _tmp = _toBinary(j);
/*不足6位的补0*/
var _tmpLen = _tmp.length;
if (6 - _tmpLen > 0) {
for (var k = 6 - _tmpLen; k > 0; --k) {
_tmp.unshift(0);
}
}
binaryArray = binaryArray.concat(_tmp);
break;
}
}
}
if (extra_Zero_Count > 0) {
binaryArray = binaryArray.slice(0, binaryArray.length - extra_Zero_Count);
}
var str = _toUnicodeStr(binaryArray);
return str;
};
var __BASE64 = {
encode: function (str) {
return _encode(str, false);
},
decode: function (base64Str) {
return _decode(base64Str, false);
},
urlsafe_encode: function (str) {
return _encode(str, true);
},
urlsafe_decode: function (base64Str) {
return _decode(base64Str, true);
}
};
return __BASE64;
}));

View File

@@ -8,6 +8,7 @@
<script src="./vue.js"></script> <script src="./vue.js"></script>
<script src="./element.js"></script> <script src="./element.js"></script>
<script src="./locale.js"></script> <script src="./locale.js"></script>
<script src="./base.js"></script>
</head> </head>
<body> <body>
<div id="app" > <div id="app" >
@@ -126,7 +127,7 @@
<script> <script>
const { createApp, ref, reactive } = Vue const { createApp, ref, reactive } = Vue
createApp({ createApp({
setup() { setup: function () {
const tableData = ref([]) const tableData = ref([])
const message = ref('Hello Vue!') const message = ref('Hello Vue!')
// 格式化时间YYYY-MM-DD // 格式化时间YYYY-MM-DD
@@ -145,18 +146,18 @@
const formInline = reactive({ const formInline = reactive({
loading: true, loading: true,
states: ['0', '1'], states: ['0', '1'],
checkAll:true, checkAll: true,
tableNames: [ tableNames: [
'DataFlicker', 'DataFlicker',
<!-- 'DataFluc',--> <!-- 'DataFluc',-->
<!-- 'DataHarmphasicI',--> <!-- 'DataHarmphasicI',-->
<!-- 'DataHarmphasicV',--> <!-- 'DataHarmphasicV',-->
<!-- 'DataHarmpowerP',--> <!-- 'DataHarmpowerP',-->
<!-- 'DataHarmpowerQ',--> <!-- 'DataHarmpowerQ',-->
<!-- 'DataHarmpowerS',--> <!-- 'DataHarmpowerS',-->
<!-- 'DataHarmrateI',--> <!-- 'DataHarmrateI',-->
'DataHarmrateV', 'DataHarmrateV',
<!-- 'DataInharmI',--> <!-- 'DataInharmI',-->
'DataInharmV', 'DataInharmV',
'DataI', 'DataI',
'DataPlt', 'DataPlt',
@@ -165,34 +166,40 @@
], ],
tableNames2: [ tableNames2: [
'DataFlicker', 'DataFlicker',
<!-- 'DataFluc',--> <!-- 'DataFluc',-->
<!-- 'DataHarmphasicI',--> <!-- 'DataHarmphasicI',-->
<!-- 'DataHarmphasicV',--> <!-- 'DataHarmphasicV',-->
<!-- 'DataHarmpowerP',--> <!-- 'DataHarmpowerP',-->
<!-- 'DataHarmpowerQ',--> <!-- 'DataHarmpowerQ',-->
<!-- 'DataHarmpowerS',--> <!-- 'DataHarmpowerS',-->
<!-- 'DataHarmrateI',--> <!-- 'DataHarmrateI',-->
'DataHarmrateV', 'DataHarmrateV',
<!-- 'DataInharmI',--> <!-- 'DataInharmI',-->
'DataInharmV', 'DataInharmV',
'DataI', 'DataI',
'DataPlt', 'DataPlt',
'DataV', 'DataV',
], ],
date: [formatTime(new Date()), formatTime(new Date())], date: [formatTime(new Date()), formatTime(new Date())],
monitorId:'', monitorId: '',
total: 0, total: 0,
currentPage: 1, currentPage: 1,
pageSize: 20, pageSize: 20,
}) })
const query = () => { const query = () => {
const username = "data_njcn"
const password = "dnzl@#002"
const auth = username + ":" + password
const encodedAuth = BASE64.encode(auth);
formInline.loading = true formInline.loading = true
console.log('submit!') console.log('submit!')
let url = window.location.origin + '/jobDetail/jobQuery'; let url = window.location.origin + '/jobDetail/jobQuery';
fetch(url, { fetch(url, {
method: 'POST', method: 'POST',
headers: { headers: {
"Authorization":'Basic '+ btoa(auth),
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
body: JSON.stringify({ body: JSON.stringify({
@@ -210,9 +217,9 @@
}) })
.then((res) => { .then((res) => {
console.log(res) console.log(res)
res.records.forEach(item=>{ res.records.forEach(item => {
for(let key in item){ for (let key in item) {
if(!item[key] && item[key] !== 0){ if (!item[key] && item[key] !== 0) {
item[key] = '/' item[key] = '/'
} }
} }
@@ -268,7 +275,7 @@
const oneMonitorMove = () => { const oneMonitorMove = () => {
if(!formInline.monitorId){ if (!formInline.monitorId) {
ElementPlus.ElMessage.error('请填写监测点id') ElementPlus.ElMessage.error('请填写监测点id')
return return
} }
@@ -277,7 +284,7 @@
tableNames: formInline.tableNames, tableNames: formInline.tableNames,
startTime: formInline.date[0], startTime: formInline.date[0],
endTime: formInline.date[1], endTime: formInline.date[1],
monitorId:formInline.monitorId monitorId: formInline.monitorId
} }
fetch('/data/oneMonitorDataTransport', { fetch('/data/oneMonitorDataTransport', {
@@ -301,11 +308,11 @@
} }
// 处理数据精度 // 处理数据精度
const formatNumber = (row,column) =>{ const formatNumber = (row, column) => {
const duration = row[column]; const duration = row[column];
if(/^-?\d*\.?\d+$/.test(duration)){ if (/^-?\d*\.?\d+$/.test(duration)) {
let time = Number(duration); let time = Number(duration);
return (time/60).toFixed(2) + " 分钟"; return (time / 60).toFixed(2) + " 分钟";
} }
return duration; return duration;
} }
@@ -332,18 +339,17 @@
} }
}) })
} }
const handleCheckAllChange = ()=>{ const handleCheckAllChange = () => {
console.log(formInline.checkAll) console.log(formInline.checkAll)
if(formInline.checkAll){ if (formInline.checkAll) {
formInline.tableNames = JSON.parse(JSON.stringify(formInline.tableNames2)) formInline.tableNames = JSON.parse(JSON.stringify(formInline.tableNames2))
}else{ } else {
formInline.tableNames = [] formInline.tableNames = []
} }
} }
return { return {
handleCheckAllChange, handleCheckAllChange,
message, message,

View File

@@ -0,0 +1,39 @@
package com.njcn.oracle.bo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* @author wr
* @description
* @date 2024/9/25 11:01
*/
@Data
@TableName("PQS_COMINFORMATION")
public class ComInfoRmation implements Serializable {
private static final long serialVersionUID = 1L;
@TableField("UPDATETIME")
private LocalDateTime updateTime;
@TableField("LINE_INDEX")
private String lineIndex;
@TableField("TYPE")
private Integer type;
@TableField("DESCRIPTION")
private String description;
@TableField("REMARK")
private String remark;
@TableField("STATE")
private Integer state;
}

View File

@@ -0,0 +1,16 @@
package com.njcn.oracle.mapper;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mybatis.mapper.BatchBaseMapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author hongawen
* @since 2023-12-28
*/
public interface ComInfoRmationMapper extends BatchBaseMapper<ComInfoRmation> {
}

View File

@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.oracle.mapper.ComInfoRmationMapper">
</mapper>

View File

@@ -11,7 +11,7 @@
LEFT JOIN PQ_DEVICE pd ON LEFT JOIN PQ_DEVICE pd ON
PL.DEV_INDEX = PD.DEV_INDEX PL.DEV_INDEX = PD.DEV_INDEX
WHERE WHERE
PL.STATUS = 0 pd.DEVFLAG = 0
</select> </select>
</mapper> </mapper>

View File

@@ -0,0 +1,16 @@
package com.njcn.oracle.service;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
/**
* @Description: 监测点状态监测数据
* @param
* @return:
* @Author: wr
* @Date: 2024/9/25 13:52
*/
public interface IComInfoRmationService extends IReplenishMybatisService<ComInfoRmation> {
}

View File

@@ -0,0 +1,65 @@
package com.njcn.oracle.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.oracle.bo.param.MigrationParam;
import com.njcn.oracle.bo.po.ComInfoRmation;
import com.njcn.oracle.mapper.ComInfoRmationMapper;
import com.njcn.oracle.mybatis.service.impl.ReplenishMybatisServiceImpl;
import com.njcn.oracle.service.IComInfoRmationService;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import java.util.List;
/**
* @Description:
* @Author: wr
* @Date: 2024/9/25 13:58
*/
@Service
public class ComInfoRmationServiceImpl extends ReplenishMybatisServiceImpl<ComInfoRmationMapper, ComInfoRmation> implements IComInfoRmationService {
@Override
public List<ComInfoRmation> queryData(MigrationParam migrationParam) {
//查询时间范围内的数据
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
}
return this.baseMapper.selectList(lambdaQueryWrapper);
}
@Override
@DS("target")
public void clearTargetData(MigrationParam migrationParam) {
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
}
this.baseMapper.delete(lambdaQueryWrapper);
}
/**
* 默认500分片如果字段超过20以上建议重写方法调整为1000分片重写时不要忘记@DS注解
*
* @param data 数据集合
*/
@Override
@DS("target")
public void insertBatchByDB(List<ComInfoRmation> data) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
this.insertBatchBySlice(data, 100);
stopWatch.stop();
System.out.printf("pq_ComInfoRmation总计:%d条耗时执行时长%f 秒.%n", data.size(), stopWatch.getTotalTimeSeconds());
}
}