Compare commits
10 Commits
33fcf37b12
...
026d71a060
| Author | SHA1 | Date | |
|---|---|---|---|
| 026d71a060 | |||
|
|
b43728e154 | ||
|
|
85659629de | ||
|
|
6518adc8ce | ||
|
|
47518928e0 | ||
|
|
00beb8efa8 | ||
|
|
401d2a4e97 | ||
| 53cc3c85e3 | |||
| 43b21859af | |||
| a857953854 |
24
README.md
Normal file
24
README.md
Normal file
@@ -0,0 +1,24 @@
|
||||
# 数据同步注意事项
|
||||
Oralce同步到InfluxDB
|
||||
1.确认数据库oralce,mysql,influxdb的连接信息准确性
|
||||
2.确认mysql数据库的pq_line_bak,pq_device_bak两张表,mysql与oracle台账监测点映射关系正确
|
||||
3.历史数据与实时数据必须分成两个jar包跑,且两个jar包的端口不能冲突与占用。
|
||||
4.在历史jar中需要将实时数据的批处理任务停止,否知只会跑实时数据,历史数据接口调用无效
|
||||
5.在调用历史数据接口时将OracleToInfluxDBServiceImpl的
|
||||
//获取监测点最新的数据时间,单监测点查询数据
|
||||
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
|
||||
lineTimeList.forEach(item->{
|
||||
MigrationParam migration = new MigrationParam();
|
||||
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
|
||||
migration.setStartTime(item.getUpdateTime().minusHours(2));
|
||||
migration.setEndTime(item.getUpdateTime());
|
||||
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
|
||||
System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
list.addAll(executor.queryData(migration));
|
||||
});这一部分代码注释掉,这段代码是冀北特有的,如果不注释掉始终同步最新的数据时间的数据导致历史数据接口调用失败。
|
||||
6.同步暂态数据接口:如下例http://10.95.53.49:8092/data/eventBatch?startDateTime=2024-01-01&endDateTime=2024-12-06 (吴瑞开发的有问题咨询吴瑞)
|
||||
7.同步稳态数据接口:如下例http://10.95.53.49:9091/data/dataSyncHours?startDateTime=2024-11-25 00:00:00&endDateTime=2024-01-01 00:00:00(需注意是重最新时间朝更久的时间同步的,因此
|
||||
startDateTime>endDateTime,否则无法同步).
|
||||
8.当新老系统并行期间,可能老系统依旧会新建台账,但是pq_line_bak不能及时更新,部分新增监测点实时数据同步不了。
|
||||
|
||||
@@ -98,6 +98,22 @@
|
||||
<artifactId>oracle-source</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.logging</groupId>
|
||||
<artifactId>logging-parent</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-pool2</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -28,7 +28,9 @@ public enum TableEnum {
|
||||
DATAINHARMV("DataInharmV","电压间谐波幅值数据表", 4),
|
||||
DATAI("DataI","谐波电流幅值数据表", 4),
|
||||
DATAPLT("DataPlt","长时闪变数据表", 1),
|
||||
DATAV("DataV","谐波电压幅值数据表", 4);
|
||||
DATAV("DataV","谐波电压幅值数据表", 4),
|
||||
COMINFORMATION("ComInfoRmation","监测点状态监测数据", 4),
|
||||
;
|
||||
|
||||
|
||||
private final String code;
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.njcn.influx.bo.po;
|
||||
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.njcn.influx.utils.InstantDateSerializer;
|
||||
import com.njcn.oracle.bo.po.ComInfoRmation;
|
||||
import lombok.Data;
|
||||
import org.influxdb.annotation.Column;
|
||||
import org.influxdb.annotation.Measurement;
|
||||
import org.influxdb.annotation.TimeColumn;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2022/7/12 9:55
|
||||
*/
|
||||
@Data
|
||||
@Measurement(name = "pqs_communicate")
|
||||
public class InfluxDBComInfoRmation {
|
||||
|
||||
@TimeColumn
|
||||
@Column(name = "time")
|
||||
@JsonSerialize(using = InstantDateSerializer.class)
|
||||
private Instant time;
|
||||
|
||||
@Column(name = "dev_id")
|
||||
private String devId;
|
||||
|
||||
@Column(name = "description")
|
||||
private String description;
|
||||
|
||||
@Column(name = "type")
|
||||
private Integer type;
|
||||
|
||||
public static InfluxDBComInfoRmation oralceToInfluxDB(ComInfoRmation comInfoRmation) {
|
||||
if (comInfoRmation == null) {
|
||||
return null;
|
||||
}
|
||||
InfluxDBComInfoRmation influxDBDataCommunicate = new InfluxDBComInfoRmation();
|
||||
Instant instant = comInfoRmation.getUpdateTime().atZone(ZoneId.systemDefault()).toInstant();
|
||||
|
||||
influxDBDataCommunicate.setTime(instant);
|
||||
influxDBDataCommunicate.setDevId(comInfoRmation.getLineIndex());
|
||||
influxDBDataCommunicate.setDescription(comInfoRmation.getDescription());
|
||||
influxDBDataCommunicate.setType(comInfoRmation.getType());
|
||||
|
||||
return influxDBDataCommunicate;
|
||||
}
|
||||
}
|
||||
@@ -58,8 +58,8 @@ public class InfluxDBDataFlicker {
|
||||
influxDBDataFlicker.setLineId(dataFlicker.getLineid());
|
||||
influxDBDataFlicker.setPhaseType(dataFlicker.getPhasicType());
|
||||
influxDBDataFlicker.setFluc(dataFlicker.getFluc());
|
||||
influxDBDataFlicker.setPlt(dataFlicker.getPst());
|
||||
influxDBDataFlicker.setPst(dataFlicker.getPlt());
|
||||
influxDBDataFlicker.setPlt(dataFlicker.getPlt());
|
||||
influxDBDataFlicker.setPst(dataFlicker.getPst());
|
||||
influxDBDataFlicker.setQualityFlag(dataFlicker.getQualityflag()+"");
|
||||
|
||||
|
||||
|
||||
@@ -275,9 +275,9 @@ public class InfluxDBDataHarmpowerP {
|
||||
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50());
|
||||
}
|
||||
else if (valueType.equals("MAX")){
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP());
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfMax());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfMax());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPMax());
|
||||
|
||||
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Max());
|
||||
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Max());
|
||||
@@ -331,9 +331,9 @@ public class InfluxDBDataHarmpowerP {
|
||||
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Max());
|
||||
}
|
||||
else if (valueType.equals("MIN")){
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP());
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfMin());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfMin());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPMin());
|
||||
|
||||
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Min());
|
||||
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Min());
|
||||
@@ -387,9 +387,9 @@ public class InfluxDBDataHarmpowerP {
|
||||
influxDBDataHarmPhasicV.setP50(dataHarmpowerP.getP50Min());
|
||||
}
|
||||
else if (valueType.equals("CP95")){
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDf());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPf());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getP());
|
||||
influxDBDataHarmPhasicV.setDf(dataHarmpowerP.getDfCp95());
|
||||
influxDBDataHarmPhasicV.setPf(dataHarmpowerP.getPfCp95());
|
||||
influxDBDataHarmPhasicV.setP(dataHarmpowerP.getPCp95());
|
||||
|
||||
influxDBDataHarmPhasicV.setP1(dataHarmpowerP.getP1Cp95());
|
||||
influxDBDataHarmPhasicV.setP2(dataHarmpowerP.getP2Cp95());
|
||||
|
||||
@@ -268,7 +268,7 @@ public class InfluxDBDataHarmpowerQ {
|
||||
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50());
|
||||
}
|
||||
else if (valueType.equals("MAX")){
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ());
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQMax());
|
||||
|
||||
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Max());
|
||||
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Max());
|
||||
@@ -322,7 +322,7 @@ public class InfluxDBDataHarmpowerQ {
|
||||
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Max());
|
||||
}
|
||||
else if (valueType.equals("MIN")){
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ());
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQMin());
|
||||
|
||||
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Min());
|
||||
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Min());
|
||||
@@ -376,7 +376,7 @@ public class InfluxDBDataHarmpowerQ {
|
||||
influxDBDataHarmPhasicV.setQ50(dataHarmpowerQ.getQ50Min());
|
||||
}
|
||||
else if (valueType.equals("CP95")){
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQ());
|
||||
influxDBDataHarmPhasicV.setQ(dataHarmpowerQ.getQCp95());
|
||||
|
||||
influxDBDataHarmPhasicV.setQ1(dataHarmpowerQ.getQ1Cp95());
|
||||
influxDBDataHarmPhasicV.setQ2(dataHarmpowerQ.getQ2Cp95());
|
||||
|
||||
@@ -266,7 +266,7 @@ public class InfluxDBDataHarmpowerS {
|
||||
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50());
|
||||
}
|
||||
else if (valueType.equals("MAX")){
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS());
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSMax());
|
||||
|
||||
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Max());
|
||||
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Max());
|
||||
@@ -320,7 +320,7 @@ public class InfluxDBDataHarmpowerS {
|
||||
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Max());
|
||||
}
|
||||
else if (valueType.equals("MIN")){
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS());
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSMin());
|
||||
|
||||
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Min());
|
||||
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Min());
|
||||
@@ -374,7 +374,7 @@ public class InfluxDBDataHarmpowerS {
|
||||
influxDBDataHarmPhasicV.setS50(dataHarmpowerS.getS50Min());
|
||||
}
|
||||
else if (valueType.equals("CP95")){
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getS());
|
||||
influxDBDataHarmPhasicV.setS(dataHarmpowerS.getSCp95());
|
||||
|
||||
influxDBDataHarmPhasicV.setS1(dataHarmpowerS.getS1Cp95());
|
||||
influxDBDataHarmPhasicV.setS2(dataHarmpowerS.getS2Cp95());
|
||||
|
||||
@@ -106,7 +106,7 @@ public class OraclePqLineDetail {
|
||||
* 统计类型
|
||||
*/
|
||||
@TableField(value = "STATFLAG")
|
||||
private Short statflag;
|
||||
private Integer statflag;
|
||||
|
||||
/**
|
||||
*
|
||||
|
||||
@@ -96,7 +96,7 @@ public class OracleRmpEventDetailPO implements Serializable {
|
||||
* 处理结果第一条事件发生时间(读comtra文件获取)
|
||||
*/
|
||||
@TableField(value = "FIRSTTIME")
|
||||
private Date firstTime;
|
||||
private LocalDateTime firstTime;
|
||||
|
||||
/**
|
||||
* 处理结果第一条事件暂降类型(字典表PQS_Dicdata)
|
||||
|
||||
@@ -94,7 +94,7 @@ public class RmpEventDetailPO implements Serializable {
|
||||
* 处理结果第一条事件发生时间(读comtra文件获取)
|
||||
*/
|
||||
@TableField(value = "first_time")
|
||||
private Date firstTime;
|
||||
private LocalDateTime firstTime;
|
||||
|
||||
/**
|
||||
* 处理结果第一条事件暂降类型(字典表PQS_Dicdata)
|
||||
|
||||
@@ -2,16 +2,14 @@ package com.njcn.influx.config;
|
||||
|
||||
import com.njcn.influx.bo.po.PqDeviceBak;
|
||||
import com.njcn.influx.bo.po.PqLineBak;
|
||||
import com.njcn.influx.mapper.PqLineBakMapper;
|
||||
import com.njcn.influx.service.IPqDeviceBakService;
|
||||
import com.njcn.influx.service.PqLineBakService;
|
||||
import io.swagger.v3.oas.annotations.servers.Server;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -30,17 +28,24 @@ public class IdMappingCache {
|
||||
private PqLineBakService pqLineBakService;
|
||||
@Autowired
|
||||
private IPqDeviceBakService pqDeviceBakService;
|
||||
public static Map<String, String> DevIdMapping = new HashMap<>();
|
||||
public static Map<String, String> LineIdMapping = new HashMap<>();
|
||||
|
||||
@Autowired
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
public static List<Integer> oracleIds = new ArrayList<>();
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Map<String, String> DevIdMapping = new HashMap<>();
|
||||
Map<String, String> LineIdMapping = new HashMap<>();
|
||||
List<PqLineBak> resultList = pqLineBakService.list();
|
||||
for (PqLineBak row : resultList) {
|
||||
String id = row.getId();
|
||||
String lineId = row.getLineId();
|
||||
oracleIds.add(Integer.valueOf(lineId));
|
||||
LineIdMapping.put(lineId,id );
|
||||
}
|
||||
redisUtil.saveByKey("LineIdMapping",LineIdMapping);
|
||||
|
||||
List<PqDeviceBak> list = pqDeviceBakService.list();
|
||||
|
||||
for (PqDeviceBak row : list) {
|
||||
@@ -48,9 +53,8 @@ public class IdMappingCache {
|
||||
String devId = row.getDevId()+"";
|
||||
DevIdMapping.put(devId,id );
|
||||
}
|
||||
redisUtil.saveByKey("DevIdMapping",DevIdMapping);
|
||||
|
||||
}
|
||||
|
||||
// public String getDataById(String id) {
|
||||
// return IdMapping.get(id);
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.njcn.influx.config;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.njcn.influx.utils.InstantDateDeserializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2021年12月08日 17:53
|
||||
*/
|
||||
@Configuration
|
||||
public class RedisConfig {
|
||||
|
||||
|
||||
/**
|
||||
* 修改RedisTemplate序列化由JdkSerializationRedisSerializer二进制调整为:JSON格式
|
||||
*/
|
||||
@Bean("redisTemplate")
|
||||
public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
|
||||
// 为了开发方便,一般直接使用<String,object>
|
||||
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
|
||||
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
|
||||
// 用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
|
||||
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
// 指定要序列化的域(field,get,set),访问修饰符(public,private,protected)
|
||||
//解决Java 8 date/time type java.time.Instant not supported
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
objectMapper.registerModule(new SimpleModule().addDeserializer(Instant.class, new InstantDateDeserializer()));
|
||||
|
||||
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
|
||||
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
|
||||
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
|
||||
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
|
||||
// key采用string的序列化方式
|
||||
redisTemplate.setKeySerializer(stringRedisSerializer);
|
||||
// value序列化方式采用jackson
|
||||
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
|
||||
// hash的key采用string的序列化方式
|
||||
redisTemplate.setHashKeySerializer(stringRedisSerializer);
|
||||
// hash的value序列化方式采用jackson
|
||||
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
|
||||
redisTemplate.afterPropertiesSet();
|
||||
|
||||
return redisTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,353 @@
|
||||
package com.njcn.influx.config;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2021年04月30日 09:38
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RedisUtil {
|
||||
|
||||
private final RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 指定key的失效时间
|
||||
* 秒级别的过期时间
|
||||
*/
|
||||
public void expire(String key, long time) {
|
||||
redisTemplate.expire(key, time, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间
|
||||
*/
|
||||
public long getExpire(String key) {
|
||||
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
return Objects.isNull(expireTime) ? 0 : expireTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间(切库,切完之后自动换为原来库)
|
||||
*/
|
||||
public long getExpire(Integer dbIndex,String key) {
|
||||
return getExpire(dbIndex,key,true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间(切库)true:切回原库 false:不切回原库
|
||||
*/
|
||||
public long getExpire(Integer dbIndex,String key,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
return Objects.isNull(expireTime) ? 0 : expireTime;
|
||||
}
|
||||
/**
|
||||
* 判断key是否存在
|
||||
*/
|
||||
public boolean hasKey(String key) {
|
||||
Boolean hasKeyFlag = redisTemplate.hasKey(key);
|
||||
if (Objects.isNull(hasKeyFlag)) {
|
||||
return false;
|
||||
}
|
||||
return hasKeyFlag;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除某个Key
|
||||
*/
|
||||
public void delete(String key) {
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量删除keys
|
||||
*/
|
||||
public void deleteKeys(String... keys) {
|
||||
redisTemplate.delete(Arrays.asList(keys));
|
||||
}
|
||||
|
||||
/**
|
||||
* 指定字符模糊匹配,批量删除keys
|
||||
*/
|
||||
public void deleteKeysByString(String str) {
|
||||
Set<String> keys = redisTemplate.keys(str.concat("*"));
|
||||
// 删除所有匹配的key
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
redisTemplate.delete(keys);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取key对应的字符数据
|
||||
*/
|
||||
public String getStringByKey(String key) {
|
||||
return (String) redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key key值
|
||||
*/
|
||||
public String getStringByKey(Integer dbIndex,String key) {
|
||||
return getStringByKey(dbIndex,key,true);
|
||||
}
|
||||
/**
|
||||
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key key值
|
||||
* @param fly 是否切回原库
|
||||
*/
|
||||
public String getStringByKey(Integer dbIndex,String key,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
String s = (String) redisTemplate.opsForValue().get(key);
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取key对应对象数据
|
||||
*/
|
||||
public Object getObjectByKey(String key) {
|
||||
return redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 保存数据
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
*/
|
||||
public void saveByKey(String key, Object value) {
|
||||
redisTemplate.boundValueOps(key).set(value);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param expireTime 生命时间
|
||||
*/
|
||||
public void saveByKeyWithExpire(String key, Object value, Long expireTime) {
|
||||
if (expireTime <= 0) {
|
||||
saveByKey(key, value);
|
||||
} else {
|
||||
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param expireTime 生命时间
|
||||
* @param expireTime 生命时间
|
||||
*/
|
||||
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime) {
|
||||
saveByKeyWithExpire(dbIndex,key,value,expireTime,true);
|
||||
}
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param fly 是否切回原库
|
||||
*/
|
||||
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
if (expireTime <= 0) {
|
||||
saveByKey(key, value);
|
||||
} else {
|
||||
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 顺序的递增和递减
|
||||
*
|
||||
* @param value 增减根据数值的正负来判断
|
||||
*/
|
||||
public void increment(String key, Long value) {
|
||||
redisTemplate.boundValueOps(key).increment(value);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 添加一个Map集合
|
||||
*/
|
||||
public void saveMapValue(String key, Map<String, ?> map, long expireTime) {
|
||||
redisTemplate.boundHashOps(key).putAll(map);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取map中所有的keys
|
||||
*/
|
||||
public Set<?> getMapKeys(String key) {
|
||||
return redisTemplate.boundHashOps(key).keys();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取map中所有的values
|
||||
*/
|
||||
public List<?> getMapValues(String key) {
|
||||
return redisTemplate.boundHashOps(key).values();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据map中某个key获取对应的value
|
||||
*/
|
||||
public Object getMapValueByMapKey(String redisKey, String mapKey) {
|
||||
return redisTemplate.boundHashOps(redisKey).get(mapKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据map中的某个key删除对应的value
|
||||
*/
|
||||
public void deleteMapValueByMapKey(String redisKey, String mapKey) {
|
||||
redisTemplate.boundHashOps(redisKey).delete(mapKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断map中是否有指定的key
|
||||
*/
|
||||
public boolean hasMapKey(String redisKey, String mapKey) {
|
||||
Boolean hasKey = redisTemplate.boundHashOps(redisKey).hasKey(mapKey);
|
||||
if (Objects.isNull(hasKey)) {
|
||||
return false;
|
||||
}
|
||||
return hasKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* 右存放List
|
||||
*/
|
||||
public void saveRightListByKey(String key, List<?> values, long expireTime) {
|
||||
redisTemplate.boundListOps(key).rightPushAll(values);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 左存放List
|
||||
*/
|
||||
public void saveLeftListByKey(String key, List<?> values, long expireTime) {
|
||||
redisTemplate.boundListOps(key).leftPushAll(values);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取List某范围的值
|
||||
*
|
||||
* @param start 起始索引
|
||||
* @param end 截止索引
|
||||
*/
|
||||
public List<?> getListRangeValues(String key, long start, long end) {
|
||||
long size = getListSize(key);
|
||||
if ((start < 0 && end < 0) || (start > end)) {
|
||||
start = 0;
|
||||
end = size;
|
||||
} else if (end > size) {
|
||||
end = size;
|
||||
} else if (start < 0) {
|
||||
start = 0;
|
||||
}
|
||||
return redisTemplate.boundListOps(key).range(start, end);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取List所有数据
|
||||
*/
|
||||
public List<?> getListAllValues(String key) {
|
||||
long size = getListSize(key);
|
||||
return redisTemplate.boundListOps(key).range(0, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据指定索引获取指定value
|
||||
*/
|
||||
public Object getListValueByIndex(String key, int index) {
|
||||
return redisTemplate.boundListOps(key).index(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取list长度
|
||||
*/
|
||||
public Long getListSize(String key) {
|
||||
return Objects.isNull(redisTemplate.boundListOps(key).size()) ? 0 : redisTemplate.boundListOps(key).size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据索引修改某个值
|
||||
*/
|
||||
public void updateListValueByIndex(String key, int index, Object newObj) {
|
||||
redisTemplate.boundListOps(key).set(index, newObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据某个key模糊查询,并取出value
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public List<?> getLikeListAllValues(String key) {
|
||||
List<Object> info=new ArrayList<>();
|
||||
for (String s : redisTemplate.keys(key + "*")) {
|
||||
info.add(redisTemplate.opsForValue().get(s));
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据切库
|
||||
* @param dbIndex
|
||||
* @return
|
||||
*/
|
||||
private Integer setDbIndex(Integer dbIndex) {
|
||||
if (dbIndex == null || dbIndex > 15 || dbIndex < 0) {
|
||||
dbIndex = 0;
|
||||
}
|
||||
LettuceConnectionFactory jedisConnectionFactory = (LettuceConnectionFactory) redisTemplate
|
||||
.getConnectionFactory();
|
||||
int database = jedisConnectionFactory.getDatabase();
|
||||
jedisConnectionFactory.setDatabase(dbIndex);
|
||||
redisTemplate.setConnectionFactory(jedisConnectionFactory);
|
||||
//需要刷新才能生效
|
||||
jedisConnectionFactory.afterPropertiesSet();
|
||||
// 重置连接
|
||||
jedisConnectionFactory.resetConnection();
|
||||
return database;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.njcn.influx.imapper;
|
||||
|
||||
import com.njcn.influx.base.InfluxDbBaseMapper;
|
||||
import com.njcn.influx.bo.po.InfluxDBComInfoRmation;
|
||||
|
||||
public interface InfluxDBComInfoRmationMapper extends InfluxDbBaseMapper<InfluxDBComInfoRmation> {
|
||||
}
|
||||
@@ -16,10 +16,7 @@ import org.springframework.stereotype.Service;
|
||||
import java.math.BigDecimal;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -78,7 +75,10 @@ public class OracleEventDetailToMysqlServiceImpl extends ServiceImpl<RmpEventDet
|
||||
po.setNum(oracleDetail.getNum());
|
||||
po.setFileFlag(oracleDetail.getFileFlag());
|
||||
po.setDealFlag(oracleDetail.getDealFlag());
|
||||
po.setFirstTime(oracleDetail.getFirstTime());
|
||||
|
||||
if(Objects.nonNull(oracleDetail.getFirstTime())) {
|
||||
po.setFirstTime(oracleDetail.getFirstTime().plus(oracleDetail.getFirstMs().intValue(), ChronoUnit.MILLIS));
|
||||
}
|
||||
po.setFirstType(oracleDetail.getFirstType());
|
||||
po.setFirstMs(oracleDetail.getFirstMs());
|
||||
po.setEnergy(oracleDetail.getEnergy());
|
||||
|
||||
@@ -5,12 +5,13 @@ import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.njcn.influx.bo.po.*;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper;
|
||||
import com.njcn.influx.mapper.PqDeviceMapper;
|
||||
import com.njcn.influx.mapper.RmpEventDetailPOMapper;
|
||||
import com.njcn.influx.service.*;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -29,6 +30,9 @@ import java.util.stream.Collectors;
|
||||
@Service
|
||||
public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatusToMysqlService {
|
||||
|
||||
@Value("${business.type}")
|
||||
private Integer systemType;
|
||||
|
||||
private final RmpEventDetailPOMapper rmpEventDetailPOMapper;
|
||||
private final IPqDeviceBakService pqDeviceBakService;
|
||||
private final PqDeviceMapper pqDeviceMapper;
|
||||
@@ -39,6 +43,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
private final OraclePqLineDetailService oraclePqLineDetailService;
|
||||
private final IPqLineDetailService pqLineDetailService;
|
||||
private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper;
|
||||
private final RedisUtil redisUtil;
|
||||
|
||||
/**
|
||||
* 1.查询oracle装置表信息
|
||||
@@ -47,6 +52,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
*/
|
||||
@Override
|
||||
public void monitorStatusSync() {
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
List<DictData> monitorDicList = rmpEventDetailPOMapper.selectByDicCodeList("Line_State");
|
||||
List<DictData> devDicList = rmpEventDetailPOMapper.selectByDicCodeList("Dev_Status");
|
||||
List<PqDeviceBak> list = pqDeviceBakService.list();
|
||||
@@ -54,48 +60,53 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
Map<Long, String> oracleDevMysql = list.stream().collect(Collectors.toMap(PqDeviceBak::getDevId, PqDeviceBak::getId));
|
||||
List<PqDevice> pqDevices = pqDeviceMapper.selectList(null);
|
||||
//pq的设备表更改通信状态
|
||||
List<PqDeviceMysql> tempList = new ArrayList<>();
|
||||
pqDevices.stream().forEach(temp->{
|
||||
String id ="";
|
||||
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
return;
|
||||
}else {
|
||||
id=IdMappingCache.DevIdMapping.get(temp.getDevIndex()+"");
|
||||
}
|
||||
pqDeviceMysqlService.update(new LambdaUpdateWrapper<PqDeviceMysql>()
|
||||
.set(PqDeviceMysql::getRunFlag,temp.getDevFlag())
|
||||
.set(PqDeviceMysql::getComFlag,temp.getStatus())
|
||||
.set(PqDeviceMysql::getUpdateTime,temp.getUpdateTime())
|
||||
.eq(PqDeviceMysql::getId,id));
|
||||
|
||||
});
|
||||
|
||||
Map<Integer, List<Long>> oracleDevMap = pqDevices.stream().collect(Collectors.groupingBy(PqDevice::getDevFlag
|
||||
, Collectors.mapping(PqDevice::getDevIndex, Collectors.toList())));
|
||||
oracleDevMap.forEach((key, value) -> {
|
||||
List<String> info = new ArrayList<>();
|
||||
for (Long aLong : value) {
|
||||
if (oracleDevMysql.containsKey(aLong)) {
|
||||
info.add(oracleDevMysql.get(aLong));
|
||||
if(systemType == 0){
|
||||
List<PqDeviceMysql> tempList = new ArrayList<>();
|
||||
pqDevices.stream().forEach(temp->{
|
||||
String id ="";
|
||||
if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
return;
|
||||
}else {
|
||||
id=DevIdMapping.get(temp.getDevIndex()+"");
|
||||
}
|
||||
}
|
||||
String devStatus = devStatusId(key, devDicList);
|
||||
String monitorStatus = monitorStatusId(key, monitorDicList);
|
||||
if (StrUtil.isNotBlank(devStatus) && CollUtil.isNotEmpty(info)) {
|
||||
//修改终端状态
|
||||
pmsTerminalService.update(new LambdaUpdateWrapper<PmsTerminal>()
|
||||
.set(PmsTerminal::getTerminalState, devStatusId(key, devDicList))
|
||||
.in(PmsTerminal::getId, info)
|
||||
);
|
||||
}
|
||||
if (StrUtil.isNotBlank(monitorStatus) && CollUtil.isNotEmpty(info)) {
|
||||
//修改监测点状态
|
||||
pmsMonitorService.update(new LambdaUpdateWrapper<PmsMonitor>()
|
||||
.set(PmsMonitor::getMonitorState, monitorStatusId(key, monitorDicList))
|
||||
.in(PmsMonitor::getTerminalId, info)
|
||||
);
|
||||
}
|
||||
});
|
||||
pqDeviceMysqlService.update(new LambdaUpdateWrapper<PqDeviceMysql>()
|
||||
.set(PqDeviceMysql::getRunFlag,temp.getDevFlag())
|
||||
.set(PqDeviceMysql::getComFlag,temp.getStatus())
|
||||
.set(PqDeviceMysql::getUpdateTime,temp.getUpdateTime())
|
||||
.eq(PqDeviceMysql::getId,id));
|
||||
|
||||
});
|
||||
}else {
|
||||
Map<Integer, List<Long>> oracleDevMap = pqDevices.stream().collect(Collectors.groupingBy(PqDevice::getDevFlag
|
||||
, Collectors.mapping(PqDevice::getDevIndex, Collectors.toList())));
|
||||
oracleDevMap.forEach((key, value) -> {
|
||||
List<String> info = new ArrayList<>();
|
||||
for (Long aLong : value) {
|
||||
if (oracleDevMysql.containsKey(aLong)) {
|
||||
info.add(oracleDevMysql.get(aLong));
|
||||
}
|
||||
}
|
||||
String devStatus = devStatusId(key, devDicList);
|
||||
String monitorStatus = monitorStatusId(key, monitorDicList);
|
||||
if (StrUtil.isNotBlank(devStatus) && CollUtil.isNotEmpty(info)) {
|
||||
//修改终端状态
|
||||
pmsTerminalService.update(new LambdaUpdateWrapper<PmsTerminal>()
|
||||
.set(PmsTerminal::getTerminalState, devStatusId(key, devDicList))
|
||||
.in(PmsTerminal::getId, info)
|
||||
);
|
||||
}
|
||||
if (StrUtil.isNotBlank(monitorStatus) && CollUtil.isNotEmpty(info)) {
|
||||
//修改监测点状态
|
||||
pmsMonitorService.update(new LambdaUpdateWrapper<PmsMonitor>()
|
||||
.set(PmsMonitor::getMonitorState, monitorStatusId(key, monitorDicList))
|
||||
.in(PmsMonitor::getTerminalId, info)
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -156,6 +167,12 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
.set(PqLineDetail::getDealCapacity, oraclePqLineDetail.getXycMp())
|
||||
.set(PqLineDetail::getPtType, oraclePqLineDetail.getPttype())
|
||||
.set(PqLineDetail::getTimeInterval, oraclePqLineDetail.getTinterval())
|
||||
|
||||
.set(PqLineDetail::getPowerFlag, oraclePqLineDetail.getPowerid())
|
||||
.set(PqLineDetail::getObjName, oraclePqLineDetail.getObjname())
|
||||
.set(PqLineDetail::getMonitorId, oraclePqLineDetail.getMonitorId())
|
||||
.set(PqLineDetail::getStatFlag, oraclePqLineDetail.getStatflag())
|
||||
.set(PqLineDetail::getPowerSubstationName, oraclePqLineDetail.getPowerSubstationName())
|
||||
.eq(PqLineDetail::getId, line.getId());
|
||||
if(loadTypeMap.containsKey(oraclePqLineDetail.getLoadtype())){
|
||||
DictData loadType= rmpEventDetailPOMapper.getDicDataByNameAndTypeName("干扰源类型", loadTypeMap.get(oraclePqLineDetail.getLoadtype()));
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
package com.njcn.influx.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.njcn.influx.bo.param.TableEnum;
|
||||
import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
||||
import com.njcn.influx.bo.po.JobHistoryLogInfluxdb;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.service.JobDetailHoursInfluxDBService;
|
||||
import com.njcn.influx.service.JobDetailInfluxDBService;
|
||||
import com.njcn.influx.service.JobHistoryLogInfluxdbService;
|
||||
@@ -65,13 +68,14 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService;
|
||||
|
||||
private final LineTimeMapper lineTimeMapper;
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
@Value("${business.slice:2}")
|
||||
private int slice;
|
||||
|
||||
@Override
|
||||
@Async
|
||||
public void dataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
@@ -129,11 +133,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)){
|
||||
if (!LineIdMapping.containsKey(id1)){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
id.set(obj, LineIdMapping.get(id1));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
@@ -215,6 +219,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
//冀北现场存在数据更新时间不在1小时内,会丢失数据的情况,这边根据装置最新时间往前推1个小时查询数据
|
||||
@Override
|
||||
public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
@@ -245,36 +252,68 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
List list = new ArrayList(Collections.emptyList());
|
||||
//获取监测点最新的数据时间,单监测点查询数据
|
||||
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
|
||||
lineTimeList.forEach(item->{
|
||||
if("ComInfoRmation".equals(tableName)){
|
||||
MigrationParam migration = new MigrationParam();
|
||||
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
|
||||
migration.setStartTime(item.getUpdateTime().minusHours(2));
|
||||
migration.setEndTime(item.getUpdateTime());
|
||||
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
|
||||
System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
migration.setStartTime(dataAsynParam.getStartDateTime());
|
||||
migration.setEndTime(dataAsynParam.getEndDateTime());
|
||||
System.out.println("执行扫描起始时间------------------------------------"+dataAsynParam.getStartDateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
System.out.println("执行扫描结束时间------------------------------------"+dataAsynParam.getEndDateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
list.addAll(executor.queryData(migration));
|
||||
});
|
||||
System.out.println("查询到的数据++++++++++++++"+list.size());
|
||||
//反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
|
||||
Iterator iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
try{
|
||||
Object obj = iterator.next();
|
||||
//获取
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
System.out.println("查询到的数据++++++++++++++"+list.size());
|
||||
//反射获取linid的值并把linid的值替换成mysql对应的devid,并记录未匹配的devid
|
||||
Iterator iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
try{
|
||||
Object obj = iterator.next();
|
||||
//获取
|
||||
Field id = obj.getClass().getDeclaredField("lineIndex");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
Object o = id.get(obj);
|
||||
if(ObjectUtil.isNotNull(o)){
|
||||
int index = Integer.parseInt(o.toString())/10;
|
||||
if (!DevIdMapping.containsKey(index+"")){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到devid匹配的devid"+index);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, DevIdMapping.get(index+""));
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}else{
|
||||
//获取监测点最新的数据时间,单监测点查询数据
|
||||
List<lineTimeDto> lineTimeList = lineTimeMapper.getLineTime();
|
||||
lineTimeList.forEach(item->{
|
||||
MigrationParam migration = new MigrationParam();
|
||||
migration.setLineIds(Collections.singletonList(item.getLineIndex()));
|
||||
migration.setStartTime(item.getUpdateTime().minusHours(2));
|
||||
migration.setEndTime(item.getUpdateTime());
|
||||
System.out.println("当前监测点为------------------------------------"+item.getLineIndex());
|
||||
System.out.println("执行扫描起始时间------------------------------------"+item.getUpdateTime().minusHours(2).format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
System.out.println("执行扫描结束时间------------------------------------"+item.getUpdateTime().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
|
||||
list.addAll(executor.queryData(migration));
|
||||
});
|
||||
System.out.println("查询到的数据++++++++++++++"+list.size());
|
||||
//反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
|
||||
Iterator iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
try{
|
||||
Object obj = iterator.next();
|
||||
//获取
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!LineIdMapping.containsKey(id1)){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj,LineIdMapping.get(id1));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
//采用弱引用接受,后续手动调用gc后,会清空该对象
|
||||
@@ -351,9 +390,130 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
DataAsynParam dataAsynParam1 = new DataAsynParam();
|
||||
dataAsynParam1.setEndDateTime(startDateTime1.minusHours(-1).minusSeconds(1));
|
||||
dataAsynParam1.setStartDateTime(startDateTime1);
|
||||
dataAsynParam1.setTableNames(TableEnum.getExecutableTypes());
|
||||
if(CollUtil.isEmpty(dataAsynParam.getTableNames())){
|
||||
dataAsynParam1.setTableNames(TableEnum.getExecutableTypes());
|
||||
}else{
|
||||
dataAsynParam1.setTableNames(dataAsynParam.getTableNames());
|
||||
}
|
||||
log.info("执行"+startDateTime1+"时刻数据");
|
||||
this.hourseDataBacthSysc(dataAsynParam1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
//目前且作为2片,后续将该属性提取到配置文件中
|
||||
List<String> tableNames = dataAsynParam.getTableNames();
|
||||
//嵌套循环,先循环指标,再循环日期
|
||||
tableNames.stream().forEach(tableName -> {
|
||||
IReplenishMybatisService executor;
|
||||
try {
|
||||
executor = (IReplenishMybatisService) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + tableName + PACKAGE_SUFFIX));
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
List<LocalDate> dateList = LocalDateUtil.getDateList(dataAsynParam.getStartTime(), dataAsynParam.getEndTime());
|
||||
for (LocalDate date : dateList) {
|
||||
|
||||
//分片下,每段时间的小时数
|
||||
int sliceHour = 24 / slice;
|
||||
for (int i = 0; i < slice; i++) {
|
||||
String dateStr = LocalDateTimeUtil.format(date, DatePattern.NORM_DATE_PATTERN);
|
||||
LocalDateTime startTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * i < 10 ? "0" + sliceHour * i : sliceHour * i) + ":00:00", DatePattern.NORM_DATETIME_PATTERN);
|
||||
LocalDateTime endTime = LocalDateTimeUtil.parse(dateStr + StrPool.C_SPACE + (sliceHour * (i + 1) - 1 < 10 ? "0" + (sliceHour * (i + 1) - 1) : sliceHour * (i + 1) - 1) + ":59:59", DatePattern.NORM_DATETIME_PATTERN);
|
||||
//查询该时区的数据,并准备入库
|
||||
MigrationParam migration = new MigrationParam();
|
||||
migration.setStartTime(startTime);
|
||||
migration.setEndTime(endTime);
|
||||
if (Objects.nonNull(dataAsynParam.getMonitorId())) {
|
||||
migration.setLineIds(Stream.of(dataAsynParam.getMonitorId()).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
List list = executor.queryData(migration);
|
||||
//反射獲取linid的值并把linid的值替换成mysql对应的linid,并记录未匹配的lineid
|
||||
Iterator iterator = list.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
try {
|
||||
Object obj = iterator.next();
|
||||
//获取
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!LineIdMapping.containsKey(id1)) {
|
||||
log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1);
|
||||
iterator.remove();
|
||||
} else {
|
||||
id.set(obj, LineIdMapping.get(id1));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//采用弱引用接受,后续手动调用gc后,会清空该对象
|
||||
WeakReference<List> weakReferenceData = new WeakReference<>(list);
|
||||
int size = 0;
|
||||
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||
size = weakReferenceData.get().size();
|
||||
}
|
||||
System.out.println(tableName + "查到" + size + "条数据后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println(tableName + "查到" + size + "条数据后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println(tableName + "查到" + size + "条数据后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
try {
|
||||
if (CollectionUtil.isNotEmpty(weakReferenceData.get())) {
|
||||
//执行目标库的数据处理
|
||||
Class<?> clazz;
|
||||
Class<?> clazz2;
|
||||
//获取Table表对应的influxdb对应的表的实体类调用oralceToInfluxDB方法及oralceToInfluxDB的入参clazz2
|
||||
try {
|
||||
clazz = Class.forName("com.njcn.influx.bo.po.InfluxDB" + tableName);
|
||||
clazz2 = Class.forName("com.njcn.oracle.bo.po." + tableName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
Method method;
|
||||
try {
|
||||
method = clazz.getDeclaredMethod("oralceToInfluxDB", clazz2);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
method.setAccessible(true);
|
||||
Method finalMethod = method;
|
||||
List list1 = (List) weakReferenceData.get().stream().flatMap(po -> {
|
||||
try {
|
||||
Object invoke = finalMethod.invoke(null, po);
|
||||
Object invoke1 = invoke;
|
||||
//返回oracle转influx,flicker等表是1-1,还有1-4,这是判断返回是否是集合如何是集合继续扁平化
|
||||
return invoke1 instanceof List ? ((List<?>) invoke1).stream() : Stream.of(invoke1);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
//插入influxdb
|
||||
influxDBBaseService.insertBatchBySlice(tableName, list1, 10000);
|
||||
// size = list1.size();
|
||||
//最后一片时修改状态
|
||||
}
|
||||
//手动执行GC
|
||||
System.gc();
|
||||
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
System.out.println("执行后总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("执行后已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
System.out.println("执行后空闲堆内存为:" + runtime.freeMemory() / (1024 * 1024) + " MB");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,30 +3,24 @@ package com.njcn.influx.service.impl;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.njcn.influx.bo.param.TableEnum;
|
||||
import com.njcn.influx.bo.po.PqsOnlinerateMysql;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.service.IPqDeviceBakService;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.service.PqsOnlinerateMysqlService;
|
||||
import com.njcn.oracle.bo.param.DataAsynParam;
|
||||
import com.njcn.oracle.bo.po.DataFlicker;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Map;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.influx.bo.po.PqsOnlineratePO;
|
||||
import com.njcn.influx.mapper.PqsOnlineratePOMapper;
|
||||
import com.njcn.influx.service.PqsOnlineratePOService;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -41,9 +35,10 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
@Slf4j
|
||||
public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMapper, PqsOnlineratePO> implements PqsOnlineratePOService{
|
||||
private final PqsOnlinerateMysqlService pqsOnlinerateMysqlService;
|
||||
private final RedisUtil redisUtil;
|
||||
@Override
|
||||
public void minutesDataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
List<PqsOnlineratePO> list = this.lambdaQuery().between(PqsOnlineratePO::getTimeid, LocalDateTimeUtil.beginOfDay(dataAsynParam.getStartDateTime()), LocalDateTimeUtil.endOfDay(dataAsynParam.getStartDateTime()).minusSeconds(1)).list();
|
||||
log.info(dataAsynParam.getStartDateTime()+"-----数据量:"+list.size());
|
||||
if (CollectionUtil.isNotEmpty(list)) {
|
||||
@@ -51,10 +46,10 @@ public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMappe
|
||||
list.stream().forEach(temp -> {
|
||||
PqsOnlinerateMysql pqsOnlinerateMysql = new PqsOnlinerateMysql();
|
||||
|
||||
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
return;
|
||||
}else {
|
||||
pqsOnlinerateMysql.setDevIndex(IdMappingCache.DevIdMapping.get(temp.getDevIndex()+""));
|
||||
pqsOnlinerateMysql.setDevIndex(DevIdMapping.get(temp.getDevIndex()+""));
|
||||
}
|
||||
pqsOnlinerateMysql.setTimeId(temp.getTimeid());
|
||||
pqsOnlinerateMysql.setOnlineMin(temp.getOnlinemin());
|
||||
|
||||
@@ -19,7 +19,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
||||
@EnableAsync
|
||||
@DependsOn("proxyMapperRegister")
|
||||
@MapperScan("com.njcn.**.mapper")
|
||||
@SpringBootApplication(scanBasePackages = "com.njcn", exclude = {SecurityAutoConfiguration.class, SecurityFilterAutoConfiguration.class})
|
||||
@SpringBootApplication(scanBasePackages = "com.njcn")
|
||||
public class InfluxDataApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.njcn.influx.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
|
||||
|
||||
@Configuration
|
||||
@EnableWebSecurity
|
||||
public class Security extends WebSecurityConfigurerAdapter {
|
||||
|
||||
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http.formLogin().and().authorizeRequests().anyRequest().authenticated().and().csrf().disable();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -3,7 +3,9 @@ package com.njcn.influx.controller;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.service.JobDetailInfluxDBService;
|
||||
import com.njcn.oracle.bo.param.DataAsynParam;
|
||||
import com.njcn.oracle.bo.param.JobQueryParam;
|
||||
import com.njcn.oracle.bo.po.JobDetail;
|
||||
import com.njcn.oracle.service.JobDetailService;
|
||||
@@ -13,10 +15,7 @@ import io.swagger.annotations.ApiOperation;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
@@ -34,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
public class JobDetailInfluxDBController {
|
||||
|
||||
private final JobDetailInfluxDBService jobDetailInfluxDBService;
|
||||
private final IdMappingCache idMappingCache;
|
||||
@PostMapping("/jobQuery")
|
||||
@ApiOperation("任务查询")
|
||||
@ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true)
|
||||
@@ -48,4 +48,12 @@ public class JobDetailInfluxDBController {
|
||||
boolean b = jobDetailInfluxDBService.jobRemove(jobDetail);
|
||||
return b;
|
||||
}
|
||||
|
||||
@GetMapping("/refreshIdCache")
|
||||
@ApiOperation("刷新缓存")
|
||||
public Boolean refreshIdCache() {
|
||||
DataAsynParam dataAsynParam = new DataAsynParam();
|
||||
idMappingCache.init();
|
||||
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.njcn.influx.controller;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.njcn.influx.service.OracleEventDetailToMysqlService;
|
||||
import com.njcn.influx.service.OracleMonitorStatusToMysqlService;
|
||||
import com.njcn.influx.service.OracleToInfluxDBService;
|
||||
@@ -16,8 +17,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Collections;
|
||||
|
||||
|
||||
/**
|
||||
@@ -59,6 +63,22 @@ public class OracleToInfluxDBController {
|
||||
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
|
||||
}
|
||||
|
||||
@GetMapping("/dataSyncTable")
|
||||
@ApiOperation("数据同步")
|
||||
public Boolean dataSyncTable(@RequestParam("startDateTime") String startDateTime,
|
||||
@RequestParam("endDateTime") String endDateTime,
|
||||
@RequestParam("tableName") String tableName
|
||||
) {
|
||||
DataAsynParam dataAsynParam = new DataAsynParam();
|
||||
dataAsynParam.setStartDateTime(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATETIME_PATTERN));
|
||||
dataAsynParam.setEndDateTime(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATETIME_PATTERN));
|
||||
if(StrUtil.isNotBlank(tableName)){
|
||||
dataAsynParam.setTableNames(Collections.singletonList(tableName));
|
||||
}
|
||||
oracleToInfluxDBService.AsyncData(dataAsynParam);
|
||||
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
|
||||
}
|
||||
|
||||
|
||||
|
||||
@PostMapping("/oneMonitorDataTransport")
|
||||
@@ -89,10 +109,10 @@ public class OracleToInfluxDBController {
|
||||
LocalDateTime startDate = LocalDateTimeUtil.beginOfDay(LocalDateTimeUtil.parse(startDateTime, DatePattern.NORM_DATE_PATTERN));
|
||||
LocalDateTime endDate = LocalDateTimeUtil.endOfDay(LocalDateTimeUtil.parse(endDateTime, DatePattern.NORM_DATE_PATTERN));
|
||||
long betweenDay = LocalDateTimeUtil.between(startDate, endDate, ChronoUnit.DAYS);
|
||||
oracleEventDetailToMysqlService.eventBatch(startDate, endDate);
|
||||
oracleEventDetailToMysqlService.eventBatch(startDate, LocalDateTime.of(LocalDate.from(startDate), LocalTime.MAX));
|
||||
for (int i = 0; i <=betweenDay; i++) {
|
||||
startDate = LocalDateTimeUtil.offset(startDate, 1, ChronoUnit.DAYS);
|
||||
oracleEventDetailToMysqlService.eventBatch(startDate, endDate);
|
||||
oracleEventDetailToMysqlService.eventBatch(startDate, LocalDateTime.of(LocalDate.from(startDate), LocalTime.MAX));
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
|
||||
@@ -48,14 +48,14 @@ public class OracleToInfluxDBJob {
|
||||
|
||||
//每小时03分钟时执行上一个小时的数据同步
|
||||
//河北这边比较特殊,
|
||||
@Scheduled(cron="0 40 * * * ?")
|
||||
@Scheduled(cron="0 15 * * * ?")
|
||||
public void executeHours() {
|
||||
DataAsynParam dataAsynParam = new DataAsynParam();
|
||||
// 获取当前时间
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
// 减去一个小时
|
||||
LocalDateTime oneHourAgo = now.minusHours(1);
|
||||
LocalDateTime oneHourAgo = now.minusHours(2);
|
||||
|
||||
// 将分钟和秒设置为0
|
||||
LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
|
||||
@@ -116,8 +116,8 @@ public class OracleToInfluxDBJob {
|
||||
* 每天同步台账监测点部分信息 仅数据中心使用
|
||||
* @date 2024/3/5
|
||||
*/
|
||||
@Scheduled(cron="0 30 0 * * ?")
|
||||
/* @Scheduled(cron="0 30 0 * * ?")
|
||||
public void synLedgerMonitor() {
|
||||
oracleMonitorStatusToMysqlService.monitorTimeSync();
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
@@ -1,13 +1,20 @@
|
||||
#文件位置配置
|
||||
|
||||
business:
|
||||
#分片次数,一定为24的约数,1 2 3 4 6 8 12 24
|
||||
#分片次数,一定为24的约数,1 2 3 4 6 8 12 24
|
||||
slice: 4
|
||||
# 0.pq 1.pms
|
||||
type: 0
|
||||
server:
|
||||
port: 8090
|
||||
#springsecurity默认过期时间30m
|
||||
servlet:
|
||||
session:
|
||||
timeout: 1440m
|
||||
|
||||
spring:
|
||||
security:
|
||||
user:
|
||||
name: njcn
|
||||
name: data_njcn
|
||||
password: dnzl@#002
|
||||
#influxDB内容配置
|
||||
influx:
|
||||
@@ -76,10 +83,23 @@ spring:
|
||||
password: Pqsadmin123
|
||||
driver-class-name: oracle.jdbc.driver.OracleDriver
|
||||
target:
|
||||
url: jdbc:mysql://192.168.1.24:13306/pqsinfo_jb?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
||||
url: jdbc:mysql://192.168.1.102:13306/pqsinfo?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
||||
username: root
|
||||
password: njcnpqs
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
redis:
|
||||
database: 15
|
||||
host: 192.168.1.22
|
||||
port: 16379
|
||||
password: njcnpqs
|
||||
timeout: 5000
|
||||
lettuce:
|
||||
pool:
|
||||
max-active: 8
|
||||
max-wait: -1
|
||||
max-idle: 8
|
||||
min-idle: 0
|
||||
#不做限制的参数配置
|
||||
#mybatis配置信息
|
||||
mybatis-plus:
|
||||
#别名扫描
|
||||
@@ -89,9 +109,9 @@ mybatis-plus:
|
||||
#驼峰命名
|
||||
map-underscore-to-camel-case: true
|
||||
#配置sql日志输出
|
||||
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
|
||||
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
|
||||
#关闭日志输出
|
||||
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
|
||||
log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
|
||||
global-config:
|
||||
db-config:
|
||||
#指定主键生成策略
|
||||
|
||||
@@ -2,9 +2,16 @@
|
||||
business:
|
||||
#分片次数,一定为24的约数,1 2 3 4 6 8 12 24
|
||||
slice: 4
|
||||
# 0.pq 1.pms
|
||||
type: 1
|
||||
|
||||
server:
|
||||
port: 8090
|
||||
spring:
|
||||
security:
|
||||
user:
|
||||
name: data_njcn
|
||||
password: dnzl@#002
|
||||
#influxDB内容配置
|
||||
influx:
|
||||
url: http://25.36.232.36:8086
|
||||
|
||||
@@ -2,9 +2,15 @@
|
||||
business:
|
||||
#分片次数,一定为24的约数,1 2 3 4 6 8 12 24
|
||||
slice: 4
|
||||
# 0.pq 1.pms
|
||||
type: 0
|
||||
server:
|
||||
port: 8090
|
||||
spring:
|
||||
security:
|
||||
user:
|
||||
name: data_njcn
|
||||
password: dnzl@#002
|
||||
#influxDB内容配置
|
||||
influx:
|
||||
url: http://192.168.1.102:8086
|
||||
|
||||
243
influx-data/influx-target/src/main/resources/static/base.js
Normal file
243
influx-data/influx-target/src/main/resources/static/base.js
Normal file
@@ -0,0 +1,243 @@
|
||||
/**
|
||||
* Created by SLICE_30_K on 2017/5/22.
|
||||
*
|
||||
* 支持一般Base64的编码和解码
|
||||
* 支持符合RFC_4648标准中"URL and Filename Safe Alphabet"的URL安全Base64编解码
|
||||
* 支持中文字符的编解码(Unicode编码)
|
||||
*/
|
||||
;(function (root, factory) {
|
||||
if (typeof exports === "object") {
|
||||
// CommonJS
|
||||
module.exports = exports = factory();
|
||||
}
|
||||
else if (typeof define === "function" && define.amd) {
|
||||
// AMD
|
||||
define(factory);
|
||||
}
|
||||
else {
|
||||
// Global (browser)
|
||||
window.BASE64 = factory();
|
||||
}
|
||||
}(this, function () {
|
||||
var BASE64_MAPPING = [
|
||||
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
|
||||
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
|
||||
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
|
||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
|
||||
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
|
||||
'w', 'x', 'y', 'z', '0', '1', '2', '3',
|
||||
'4', '5', '6', '7', '8', '9', '+', '/'
|
||||
];
|
||||
var URLSAFE_BASE64_MAPPING = [
|
||||
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
|
||||
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
|
||||
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
|
||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
|
||||
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
|
||||
'w', 'x', 'y', 'z', '0', '1', '2', '3',
|
||||
'4', '5', '6', '7', '8', '9', '-', '_'
|
||||
];
|
||||
|
||||
var _toBinary = function (ascii) {
|
||||
var binary = [];
|
||||
while (ascii > 0) {
|
||||
var b = ascii % 2;
|
||||
ascii = Math.floor(ascii / 2);
|
||||
binary.push(b);
|
||||
}
|
||||
binary.reverse();
|
||||
return binary;
|
||||
};
|
||||
|
||||
var _toDecimal = function (binary) {
|
||||
var dec = 0;
|
||||
var p = 0;
|
||||
for (var i = binary.length - 1; i >= 0; --i) {
|
||||
var b = binary[i];
|
||||
if (b == 1) {
|
||||
dec += Math.pow(2, p);
|
||||
}
|
||||
++p;
|
||||
}
|
||||
return dec;
|
||||
};
|
||||
|
||||
var _toUTF8Binary = function (c, binaryArray) {
|
||||
var mustLen = (8 - (c + 1)) + ((c - 1) * 6);
|
||||
var fatLen = binaryArray.length;
|
||||
var diff = mustLen - fatLen;
|
||||
while (--diff >= 0) {
|
||||
binaryArray.unshift(0);
|
||||
}
|
||||
var binary = [];
|
||||
var _c = c;
|
||||
while (--_c >= 0) {
|
||||
binary.push(1);
|
||||
}
|
||||
binary.push(0);
|
||||
var i = 0, len = 8 - (c + 1);
|
||||
for (; i < len; ++i) {
|
||||
binary.push(binaryArray[i]);
|
||||
}
|
||||
|
||||
for (var j = 0; j < c - 1; ++j) {
|
||||
binary.push(1);
|
||||
binary.push(0);
|
||||
var sum = 6;
|
||||
while (--sum >= 0) {
|
||||
binary.push(binaryArray[i++]);
|
||||
}
|
||||
}
|
||||
return binary;
|
||||
};
|
||||
|
||||
var _toBinaryArray = function (str) {
|
||||
var binaryArray = [];
|
||||
for (var i = 0, len = str.length; i < len; ++i) {
|
||||
var unicode = str.charCodeAt(i);
|
||||
var _tmpBinary = _toBinary(unicode);
|
||||
if (unicode < 0x80) {
|
||||
var _tmpdiff = 8 - _tmpBinary.length;
|
||||
while (--_tmpdiff >= 0) {
|
||||
_tmpBinary.unshift(0);
|
||||
}
|
||||
binaryArray = binaryArray.concat(_tmpBinary);
|
||||
} else if (unicode >= 0x80 && unicode <= 0x7FF) {
|
||||
binaryArray = binaryArray.concat(_toUTF8Binary(2, _tmpBinary));
|
||||
} else if (unicode >= 0x800 && unicode <= 0xFFFF) {//UTF-8 3byte
|
||||
binaryArray = binaryArray.concat(_toUTF8Binary(3, _tmpBinary));
|
||||
} else if (unicode >= 0x10000 && unicode <= 0x1FFFFF) {//UTF-8 4byte
|
||||
binaryArray = binaryArray.concat(_toUTF8Binary(4, _tmpBinary));
|
||||
} else if (unicode >= 0x200000 && unicode <= 0x3FFFFFF) {//UTF-8 5byte
|
||||
binaryArray = binaryArray.concat(_toUTF8Binary(5, _tmpBinary));
|
||||
} else if (unicode >= 4000000 && unicode <= 0x7FFFFFFF) {//UTF-8 6byte
|
||||
binaryArray = binaryArray.concat(_toUTF8Binary(6, _tmpBinary));
|
||||
}
|
||||
}
|
||||
return binaryArray;
|
||||
};
|
||||
|
||||
var _toUnicodeStr = function (binaryArray) {
|
||||
var unicode;
|
||||
var unicodeBinary = [];
|
||||
var str = "";
|
||||
for (var i = 0, len = binaryArray.length; i < len;) {
|
||||
if (binaryArray[i] == 0) {
|
||||
unicode = _toDecimal(binaryArray.slice(i, i + 8));
|
||||
str += String.fromCharCode(unicode);
|
||||
i += 8;
|
||||
} else {
|
||||
var sum = 0;
|
||||
while (i < len) {
|
||||
if (binaryArray[i] == 1) {
|
||||
++sum;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
++i;
|
||||
}
|
||||
unicodeBinary = unicodeBinary.concat(binaryArray.slice(i + 1, i + 8 - sum));
|
||||
i += 8 - sum;
|
||||
while (sum > 1) {
|
||||
unicodeBinary = unicodeBinary.concat(binaryArray.slice(i + 2, i + 8));
|
||||
i += 8;
|
||||
--sum;
|
||||
}
|
||||
unicode = _toDecimal(unicodeBinary);
|
||||
str += String.fromCharCode(unicode);
|
||||
unicodeBinary = [];
|
||||
}
|
||||
}
|
||||
return str;
|
||||
};
|
||||
|
||||
var _encode = function (str, url_safe) {
|
||||
var base64_Index = [];
|
||||
var binaryArray = _toBinaryArray(str);
|
||||
var dictionary = url_safe ? URLSAFE_BASE64_MAPPING : BASE64_MAPPING;
|
||||
|
||||
var extra_Zero_Count = 0;
|
||||
for (var i = 0, len = binaryArray.length; i < len; i += 6) {
|
||||
var diff = (i + 6) - len;
|
||||
if (diff == 2) {
|
||||
extra_Zero_Count = 2;
|
||||
} else if (diff == 4) {
|
||||
extra_Zero_Count = 4;
|
||||
}
|
||||
var _tmpExtra_Zero_Count = extra_Zero_Count;
|
||||
while (--_tmpExtra_Zero_Count >= 0) {
|
||||
binaryArray.push(0);
|
||||
}
|
||||
base64_Index.push(_toDecimal(binaryArray.slice(i, i + 6)));
|
||||
}
|
||||
|
||||
var base64 = '';
|
||||
for (var i = 0, len = base64_Index.length; i < len; ++i) {
|
||||
base64 += dictionary[base64_Index[i]];
|
||||
}
|
||||
|
||||
for (var i = 0, len = extra_Zero_Count / 2; i < len; ++i) {
|
||||
base64 += '=';
|
||||
}
|
||||
return base64;
|
||||
};
|
||||
|
||||
var _decode = function (_base64Str, url_safe) {
|
||||
var _len = _base64Str.length;
|
||||
var extra_Zero_Count = 0;
|
||||
var dictionary = url_safe ? URLSAFE_BASE64_MAPPING : BASE64_MAPPING;
|
||||
|
||||
if (_base64Str.charAt(_len - 1) == '=') {
|
||||
if (_base64Str.charAt(_len - 2) == '=') {//两个等号说明补了4个0
|
||||
extra_Zero_Count = 4;
|
||||
_base64Str = _base64Str.substring(0, _len - 2);
|
||||
} else {//一个等号说明补了2个0
|
||||
extra_Zero_Count = 2;
|
||||
_base64Str = _base64Str.substring(0, _len - 1);
|
||||
}
|
||||
}
|
||||
|
||||
var binaryArray = [];
|
||||
for (var i = 0, len = _base64Str.length; i < len; ++i) {
|
||||
var c = _base64Str.charAt(i);
|
||||
for (var j = 0, size = dictionary.length; j < size; ++j) {
|
||||
if (c == dictionary[j]) {
|
||||
var _tmp = _toBinary(j);
|
||||
/*不足6位的补0*/
|
||||
var _tmpLen = _tmp.length;
|
||||
if (6 - _tmpLen > 0) {
|
||||
for (var k = 6 - _tmpLen; k > 0; --k) {
|
||||
_tmp.unshift(0);
|
||||
}
|
||||
}
|
||||
binaryArray = binaryArray.concat(_tmp);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (extra_Zero_Count > 0) {
|
||||
binaryArray = binaryArray.slice(0, binaryArray.length - extra_Zero_Count);
|
||||
}
|
||||
var str = _toUnicodeStr(binaryArray);
|
||||
return str;
|
||||
};
|
||||
|
||||
var __BASE64 = {
|
||||
encode: function (str) {
|
||||
return _encode(str, false);
|
||||
},
|
||||
decode: function (base64Str) {
|
||||
return _decode(base64Str, false);
|
||||
},
|
||||
urlsafe_encode: function (str) {
|
||||
return _encode(str, true);
|
||||
},
|
||||
urlsafe_decode: function (base64Str) {
|
||||
return _decode(base64Str, true);
|
||||
}
|
||||
};
|
||||
|
||||
return __BASE64;
|
||||
}));
|
||||
@@ -8,6 +8,7 @@
|
||||
<script src="./vue.js"></script>
|
||||
<script src="./element.js"></script>
|
||||
<script src="./locale.js"></script>
|
||||
<script src="./base.js"></script>
|
||||
</head>
|
||||
<body>
|
||||
<div id="app" >
|
||||
@@ -126,7 +127,7 @@
|
||||
<script>
|
||||
const { createApp, ref, reactive } = Vue
|
||||
createApp({
|
||||
setup() {
|
||||
setup: function () {
|
||||
const tableData = ref([])
|
||||
const message = ref('Hello Vue!')
|
||||
// 格式化时间YYYY-MM-DD
|
||||
@@ -145,18 +146,18 @@
|
||||
const formInline = reactive({
|
||||
loading: true,
|
||||
states: ['0', '1'],
|
||||
checkAll:true,
|
||||
checkAll: true,
|
||||
tableNames: [
|
||||
'DataFlicker',
|
||||
<!-- 'DataFluc',-->
|
||||
<!-- 'DataHarmphasicI',-->
|
||||
<!-- 'DataHarmphasicV',-->
|
||||
<!-- 'DataHarmpowerP',-->
|
||||
<!-- 'DataHarmpowerQ',-->
|
||||
<!-- 'DataHarmpowerS',-->
|
||||
<!-- 'DataHarmrateI',-->
|
||||
<!-- 'DataFluc',-->
|
||||
<!-- 'DataHarmphasicI',-->
|
||||
<!-- 'DataHarmphasicV',-->
|
||||
<!-- 'DataHarmpowerP',-->
|
||||
<!-- 'DataHarmpowerQ',-->
|
||||
<!-- 'DataHarmpowerS',-->
|
||||
<!-- 'DataHarmrateI',-->
|
||||
'DataHarmrateV',
|
||||
<!-- 'DataInharmI',-->
|
||||
<!-- 'DataInharmI',-->
|
||||
'DataInharmV',
|
||||
'DataI',
|
||||
'DataPlt',
|
||||
@@ -165,34 +166,40 @@
|
||||
],
|
||||
tableNames2: [
|
||||
'DataFlicker',
|
||||
<!-- 'DataFluc',-->
|
||||
<!-- 'DataHarmphasicI',-->
|
||||
<!-- 'DataHarmphasicV',-->
|
||||
<!-- 'DataHarmpowerP',-->
|
||||
<!-- 'DataHarmpowerQ',-->
|
||||
<!-- 'DataHarmpowerS',-->
|
||||
<!-- 'DataHarmrateI',-->
|
||||
<!-- 'DataFluc',-->
|
||||
<!-- 'DataHarmphasicI',-->
|
||||
<!-- 'DataHarmphasicV',-->
|
||||
<!-- 'DataHarmpowerP',-->
|
||||
<!-- 'DataHarmpowerQ',-->
|
||||
<!-- 'DataHarmpowerS',-->
|
||||
<!-- 'DataHarmrateI',-->
|
||||
'DataHarmrateV',
|
||||
<!-- 'DataInharmI',-->
|
||||
<!-- 'DataInharmI',-->
|
||||
'DataInharmV',
|
||||
'DataI',
|
||||
'DataPlt',
|
||||
'DataV',
|
||||
],
|
||||
date: [formatTime(new Date()), formatTime(new Date())],
|
||||
monitorId:'',
|
||||
monitorId: '',
|
||||
total: 0,
|
||||
currentPage: 1,
|
||||
pageSize: 20,
|
||||
})
|
||||
|
||||
const query = () => {
|
||||
const username = "data_njcn"
|
||||
const password = "dnzl@#002"
|
||||
const auth = username + ":" + password
|
||||
const encodedAuth = BASE64.encode(auth);
|
||||
|
||||
formInline.loading = true
|
||||
console.log('submit!')
|
||||
let url = window.location.origin + '/jobDetail/jobQuery';
|
||||
fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
"Authorization":'Basic '+ btoa(auth),
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
@@ -210,9 +217,9 @@
|
||||
})
|
||||
.then((res) => {
|
||||
console.log(res)
|
||||
res.records.forEach(item=>{
|
||||
for(let key in item){
|
||||
if(!item[key] && item[key] !== 0){
|
||||
res.records.forEach(item => {
|
||||
for (let key in item) {
|
||||
if (!item[key] && item[key] !== 0) {
|
||||
item[key] = '/'
|
||||
}
|
||||
}
|
||||
@@ -268,7 +275,7 @@
|
||||
|
||||
const oneMonitorMove = () => {
|
||||
|
||||
if(!formInline.monitorId){
|
||||
if (!formInline.monitorId) {
|
||||
ElementPlus.ElMessage.error('请填写监测点id')
|
||||
return
|
||||
}
|
||||
@@ -277,7 +284,7 @@
|
||||
tableNames: formInline.tableNames,
|
||||
startTime: formInline.date[0],
|
||||
endTime: formInline.date[1],
|
||||
monitorId:formInline.monitorId
|
||||
monitorId: formInline.monitorId
|
||||
}
|
||||
|
||||
fetch('/data/oneMonitorDataTransport', {
|
||||
@@ -301,11 +308,11 @@
|
||||
}
|
||||
|
||||
// 处理数据精度
|
||||
const formatNumber = (row,column) =>{
|
||||
const formatNumber = (row, column) => {
|
||||
const duration = row[column];
|
||||
if(/^-?\d*\.?\d+$/.test(duration)){
|
||||
if (/^-?\d*\.?\d+$/.test(duration)) {
|
||||
let time = Number(duration);
|
||||
return (time/60).toFixed(2) + " 分钟";
|
||||
return (time / 60).toFixed(2) + " 分钟";
|
||||
}
|
||||
return duration;
|
||||
}
|
||||
@@ -332,20 +339,19 @@
|
||||
}
|
||||
})
|
||||
}
|
||||
const handleCheckAllChange = ()=>{
|
||||
console.log(formInline.checkAll)
|
||||
if(formInline.checkAll){
|
||||
formInline.tableNames = JSON.parse(JSON.stringify(formInline.tableNames2))
|
||||
}else{
|
||||
formInline.tableNames = []
|
||||
}
|
||||
const handleCheckAllChange = () => {
|
||||
console.log(formInline.checkAll)
|
||||
if (formInline.checkAll) {
|
||||
formInline.tableNames = JSON.parse(JSON.stringify(formInline.tableNames2))
|
||||
} else {
|
||||
formInline.tableNames = []
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
return {
|
||||
handleCheckAllChange,
|
||||
handleCheckAllChange,
|
||||
message,
|
||||
formInline,
|
||||
query,
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.njcn.oracle.bo.po;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* @author wr
|
||||
* @description
|
||||
* @date 2024/9/25 11:01
|
||||
*/
|
||||
@Data
|
||||
@TableName("PQS_COMINFORMATION")
|
||||
public class ComInfoRmation implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@TableField("UPDATETIME")
|
||||
private LocalDateTime updateTime;
|
||||
|
||||
@TableField("LINE_INDEX")
|
||||
private String lineIndex;
|
||||
|
||||
@TableField("TYPE")
|
||||
private Integer type;
|
||||
|
||||
@TableField("DESCRIPTION")
|
||||
private String description;
|
||||
|
||||
@TableField("REMARK")
|
||||
private String remark;
|
||||
|
||||
@TableField("STATE")
|
||||
private Integer state;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.njcn.oracle.mapper;
|
||||
|
||||
import com.njcn.oracle.bo.po.ComInfoRmation;
|
||||
import com.njcn.oracle.mybatis.mapper.BatchBaseMapper;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Mapper 接口
|
||||
* </p>
|
||||
*
|
||||
* @author hongawen
|
||||
* @since 2023-12-28
|
||||
*/
|
||||
public interface ComInfoRmationMapper extends BatchBaseMapper<ComInfoRmation> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.njcn.oracle.mapper.ComInfoRmationMapper">
|
||||
|
||||
</mapper>
|
||||
@@ -11,7 +11,7 @@
|
||||
LEFT JOIN PQ_DEVICE pd ON
|
||||
PL.DEV_INDEX = PD.DEV_INDEX
|
||||
WHERE
|
||||
PL.STATUS = 0
|
||||
pd.DEVFLAG = 0
|
||||
</select>
|
||||
|
||||
</mapper>
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.njcn.oracle.service;
|
||||
|
||||
import com.njcn.oracle.bo.po.ComInfoRmation;
|
||||
import com.njcn.oracle.mybatis.service.IReplenishMybatisService;
|
||||
|
||||
|
||||
/**
|
||||
* @Description: 监测点状态监测数据
|
||||
* @param
|
||||
* @return:
|
||||
* @Author: wr
|
||||
* @Date: 2024/9/25 13:52
|
||||
*/
|
||||
public interface IComInfoRmationService extends IReplenishMybatisService<ComInfoRmation> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.njcn.oracle.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.njcn.oracle.bo.param.MigrationParam;
|
||||
import com.njcn.oracle.bo.po.ComInfoRmation;
|
||||
import com.njcn.oracle.mapper.ComInfoRmationMapper;
|
||||
import com.njcn.oracle.mybatis.service.impl.ReplenishMybatisServiceImpl;
|
||||
import com.njcn.oracle.service.IComInfoRmationService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StopWatch;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Description:
|
||||
* @Author: wr
|
||||
* @Date: 2024/9/25 13:58
|
||||
*/
|
||||
@Service
|
||||
public class ComInfoRmationServiceImpl extends ReplenishMybatisServiceImpl<ComInfoRmationMapper, ComInfoRmation> implements IComInfoRmationService {
|
||||
|
||||
@Override
|
||||
public List<ComInfoRmation> queryData(MigrationParam migrationParam) {
|
||||
//查询时间范围内的数据
|
||||
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
|
||||
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
|
||||
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
|
||||
}
|
||||
return this.baseMapper.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@DS("target")
|
||||
public void clearTargetData(MigrationParam migrationParam) {
|
||||
LambdaQueryWrapper<ComInfoRmation> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.between(ComInfoRmation::getUpdateTime, migrationParam.getStartTime(), migrationParam.getEndTime());
|
||||
if (CollectionUtil.isNotEmpty(migrationParam.getLineIds())) {
|
||||
lambdaQueryWrapper.in(ComInfoRmation::getLineIndex, migrationParam.getLineIds());
|
||||
}
|
||||
this.baseMapper.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 默认500分片,如果字段超过20以上,建议重写方法调整为1000分片,重写时不要忘记@DS注解
|
||||
*
|
||||
* @param data 数据集合
|
||||
*/
|
||||
@Override
|
||||
@DS("target")
|
||||
public void insertBatchByDB(List<ComInfoRmation> data) {
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
this.insertBatchBySlice(data, 100);
|
||||
stopWatch.stop();
|
||||
System.out.printf("pq_ComInfoRmation总计:%d条,耗时执行时长:%f 秒.%n", data.size(), stopWatch.getTotalTimeSeconds());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user