From 00beb8efa87c62c6a5eeb524bb839b0e8b576449 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 4 Nov 2024 16:25:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0redis=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=EF=BC=8C=E5=B0=86id=E7=BC=93=E5=AD=98=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E7=94=B1=E7=BC=93=E5=AD=98=E7=A7=BB=E5=8A=A8=E5=88=B0redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- influx-data/influx-source/pom.xml | 16 + .../njcn/influx/config/IdMappingCache.java | 20 +- .../com/njcn/influx/config/RedisConfig.java | 63 ++++ .../com/njcn/influx/config/RedisUtil.java | 353 ++++++++++++++++++ ...OracleMonitorStatusToMysqlServiceImpl.java | 7 +- .../impl/OracleToInfluxDBServiceImpl.java | 25 +- .../impl/PqsOnlineratePOServiceImpl.java | 17 +- .../JobDetailInfluxDBController.java | 16 +- .../src/main/resources/application.yml | 15 +- 9 files changed, 496 insertions(+), 36 deletions(-) create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisConfig.java create mode 100644 influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisUtil.java diff --git a/influx-data/influx-source/pom.xml b/influx-data/influx-source/pom.xml index 9114d8b..ca65b46 100644 --- a/influx-data/influx-source/pom.xml +++ b/influx-data/influx-source/pom.xml @@ -98,6 +98,22 @@ oracle-source 1.0.0 + + org.springframework.boot + spring-boot-starter-data-redis + + + org.apache.logging + logging-parent + + + + + + org.apache.commons + commons-pool2 + + org.apache.commons diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/config/IdMappingCache.java b/influx-data/influx-source/src/main/java/com/njcn/influx/config/IdMappingCache.java index 2870d5b..4aaba2f 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/config/IdMappingCache.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/config/IdMappingCache.java @@ -2,14 +2,11 @@ package com.njcn.influx.config; import com.njcn.influx.bo.po.PqDeviceBak; import com.njcn.influx.bo.po.PqLineBak; -import com.njcn.influx.mapper.PqLineBakMapper; import com.njcn.influx.service.IPqDeviceBakService; import com.njcn.influx.service.PqLineBakService; -import io.swagger.v3.oas.annotations.servers.Server; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.HashMap; @@ -30,17 +27,23 @@ public class IdMappingCache { private PqLineBakService pqLineBakService; @Autowired private IPqDeviceBakService pqDeviceBakService; - public static Map DevIdMapping = new HashMap<>(); - public static Map LineIdMapping = new HashMap<>(); + + @Autowired + private RedisUtil redisUtil; + @PostConstruct public void init() { + Map DevIdMapping = new HashMap<>(); + Map LineIdMapping = new HashMap<>(); List resultList = pqLineBakService.list(); for (PqLineBak row : resultList) { String id = row.getId(); String lineId = row.getLineId(); LineIdMapping.put(lineId,id ); } + redisUtil.saveByKey("LineIdMapping",LineIdMapping); + List list = pqDeviceBakService.list(); for (PqDeviceBak row : list) { @@ -48,9 +51,8 @@ public class IdMappingCache { String devId = row.getDevId()+""; DevIdMapping.put(devId,id ); } + redisUtil.saveByKey("DevIdMapping",DevIdMapping); + } -// public String getDataById(String id) { -// return IdMapping.get(id); -// } } diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisConfig.java b/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisConfig.java new file mode 100644 index 0000000..369f40b --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisConfig.java @@ -0,0 +1,63 @@ +package com.njcn.influx.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.njcn.influx.utils.InstantDateDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import java.time.Instant; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2021年12月08日 17:53 + */ +@Configuration +public class RedisConfig { + + + /** + * 修改RedisTemplate序列化由JdkSerializationRedisSerializer二进制调整为:JSON格式 + */ + @Bean("redisTemplate") + public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { + // 为了开发方便,一般直接使用 + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(lettuceConnectionFactory); + // 用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); + ObjectMapper objectMapper = new ObjectMapper(); + // 指定要序列化的域(field,get,set),访问修饰符(public,private,protected) + //解决Java 8 date/time type java.time.Instant not supported + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.registerModule(new SimpleModule().addDeserializer(Instant.class, new InstantDateDeserializer())); + + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + jackson2JsonRedisSerializer.setObjectMapper(objectMapper); + // key采用string的序列化方式 + redisTemplate.setKeySerializer(stringRedisSerializer); + // value序列化方式采用jackson + redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); + // hash的key采用string的序列化方式 + redisTemplate.setHashKeySerializer(stringRedisSerializer); + // hash的value序列化方式采用jackson + redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); + redisTemplate.afterPropertiesSet(); + + return redisTemplate; + } + +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisUtil.java b/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisUtil.java new file mode 100644 index 0000000..ecf80ff --- /dev/null +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/config/RedisUtil.java @@ -0,0 +1,353 @@ +package com.njcn.influx.config; + +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.TimeUnit; + + +/** + * @author hongawen + * @version 1.0.0 + * @date 2021年04月30日 09:38 + */ +@Component +@RequiredArgsConstructor +public class RedisUtil { + + private final RedisTemplate redisTemplate; + + /** + * 指定key的失效时间 + * 秒级别的过期时间 + */ + public void expire(String key, long time) { + redisTemplate.expire(key, time, TimeUnit.SECONDS); + } + + /** + * 根据key获取过期时间 + */ + public long getExpire(String key) { + Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS); + return Objects.isNull(expireTime) ? 0 : expireTime; + } + + /** + * 根据key获取过期时间(切库,切完之后自动换为原来库) + */ + public long getExpire(Integer dbIndex,String key) { + return getExpire(dbIndex,key,true); + } + + /** + * 根据key获取过期时间(切库)true:切回原库 false:不切回原库 + */ + public long getExpire(Integer dbIndex,String key,Boolean fly) { + Integer index = setDbIndex(dbIndex); + Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS); + if(fly){ + setDbIndex(index); + } + return Objects.isNull(expireTime) ? 0 : expireTime; + } + /** + * 判断key是否存在 + */ + public boolean hasKey(String key) { + Boolean hasKeyFlag = redisTemplate.hasKey(key); + if (Objects.isNull(hasKeyFlag)) { + return false; + } + return hasKeyFlag; + } + + /** + * 删除某个Key + */ + public void delete(String key) { + redisTemplate.delete(key); + } + + /** + * 批量删除keys + */ + public void deleteKeys(String... keys) { + redisTemplate.delete(Arrays.asList(keys)); + } + + /** + * 指定字符模糊匹配,批量删除keys + */ + public void deleteKeysByString(String str) { + Set keys = redisTemplate.keys(str.concat("*")); + // 删除所有匹配的key + if (keys != null && !keys.isEmpty()) { + redisTemplate.delete(keys); + } + } + + + /** + * 获取key对应的字符数据 + */ + public String getStringByKey(String key) { + return (String) redisTemplate.opsForValue().get(key); + } + + /** + * 根据key和库索引获取内容字(切库之后,之后查询都是切库数据) + * + * @param dbIndex 指定库索引 + * @param key key值 + */ + public String getStringByKey(Integer dbIndex,String key) { + return getStringByKey(dbIndex,key,true); + } + /** + * 根据key和库索引获取内容字(切库之后,之后查询都是切库数据) + * + * @param dbIndex 指定库索引 + * @param key key值 + * @param fly 是否切回原库 + */ + public String getStringByKey(Integer dbIndex,String key,Boolean fly) { + Integer index = setDbIndex(dbIndex); + String s = (String) redisTemplate.opsForValue().get(key); + if(fly){ + setDbIndex(index); + } + return s; + } + + /** + * 获取key对应对象数据 + */ + public Object getObjectByKey(String key) { + return redisTemplate.opsForValue().get(key); + } + + + /** + * 保存数据 + * + * @param key 键 + * @param value 值 + */ + public void saveByKey(String key, Object value) { + redisTemplate.boundValueOps(key).set(value); + + } + + /** + * 保存数据,指定生命周期(秒) + * + * @param key 键 + * @param value 值 + * @param expireTime 生命时间 + */ + public void saveByKeyWithExpire(String key, Object value, Long expireTime) { + if (expireTime <= 0) { + saveByKey(key, value); + } else { + redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS); + } + } + /** + * 保存数据,指定生命周期(秒) + * + * @param dbIndex 指定库索引 + * @param key 键 + * @param value 值 + * @param expireTime 生命时间 + * @param expireTime 生命时间 + */ + public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime) { + saveByKeyWithExpire(dbIndex,key,value,expireTime,true); + } + /** + * 保存数据,指定生命周期(秒) + * + * @param dbIndex 指定库索引 + * @param key 键 + * @param value 值 + * @param fly 是否切回原库 + */ + public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime,Boolean fly) { + Integer index = setDbIndex(dbIndex); + if (expireTime <= 0) { + saveByKey(key, value); + } else { + redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS); + } + if(fly){ + setDbIndex(index); + } + } + + /** + * 顺序的递增和递减 + * + * @param value 增减根据数值的正负来判断 + */ + public void increment(String key, Long value) { + redisTemplate.boundValueOps(key).increment(value); + } + + + /** + * 添加一个Map集合 + */ + public void saveMapValue(String key, Map map, long expireTime) { + redisTemplate.boundHashOps(key).putAll(map); + if (expireTime > 0) { + expire(key, expireTime); + } + } + + /** + * 获取map中所有的keys + */ + public Set getMapKeys(String key) { + return redisTemplate.boundHashOps(key).keys(); + } + + /** + * 获取map中所有的values + */ + public List getMapValues(String key) { + return redisTemplate.boundHashOps(key).values(); + } + + /** + * 根据map中某个key获取对应的value + */ + public Object getMapValueByMapKey(String redisKey, String mapKey) { + return redisTemplate.boundHashOps(redisKey).get(mapKey); + } + + /** + * 根据map中的某个key删除对应的value + */ + public void deleteMapValueByMapKey(String redisKey, String mapKey) { + redisTemplate.boundHashOps(redisKey).delete(mapKey); + } + + /** + * 判断map中是否有指定的key + */ + public boolean hasMapKey(String redisKey, String mapKey) { + Boolean hasKey = redisTemplate.boundHashOps(redisKey).hasKey(mapKey); + if (Objects.isNull(hasKey)) { + return false; + } + return hasKey; + } + + /** + * 右存放List + */ + public void saveRightListByKey(String key, List values, long expireTime) { + redisTemplate.boundListOps(key).rightPushAll(values); + if (expireTime > 0) { + expire(key, expireTime); + } + } + + /** + * 左存放List + */ + public void saveLeftListByKey(String key, List values, long expireTime) { + redisTemplate.boundListOps(key).leftPushAll(values); + if (expireTime > 0) { + expire(key, expireTime); + } + } + + + /** + * 获取List某范围的值 + * + * @param start 起始索引 + * @param end 截止索引 + */ + public List getListRangeValues(String key, long start, long end) { + long size = getListSize(key); + if ((start < 0 && end < 0) || (start > end)) { + start = 0; + end = size; + } else if (end > size) { + end = size; + } else if (start < 0) { + start = 0; + } + return redisTemplate.boundListOps(key).range(start, end); + } + + /** + * 获取List所有数据 + */ + public List getListAllValues(String key) { + long size = getListSize(key); + return redisTemplate.boundListOps(key).range(0, size); + } + + /** + * 根据指定索引获取指定value + */ + public Object getListValueByIndex(String key, int index) { + return redisTemplate.boundListOps(key).index(index); + } + + /** + * 获取list长度 + */ + public Long getListSize(String key) { + return Objects.isNull(redisTemplate.boundListOps(key).size()) ? 0 : redisTemplate.boundListOps(key).size(); + } + + /** + * 根据索引修改某个值 + */ + public void updateListValueByIndex(String key, int index, Object newObj) { + redisTemplate.boundListOps(key).set(index, newObj); + } + + /** + * 根据某个key模糊查询,并取出value + * @param key + * @return + */ + public List getLikeListAllValues(String key) { + List info=new ArrayList<>(); + for (String s : redisTemplate.keys(key + "*")) { + info.add(redisTemplate.opsForValue().get(s)); + } + return info; + } + + /** + * 数据切库 + * @param dbIndex + * @return + */ + private Integer setDbIndex(Integer dbIndex) { + if (dbIndex == null || dbIndex > 15 || dbIndex < 0) { + dbIndex = 0; + } + LettuceConnectionFactory jedisConnectionFactory = (LettuceConnectionFactory) redisTemplate + .getConnectionFactory(); + int database = jedisConnectionFactory.getDatabase(); + jedisConnectionFactory.setDatabase(dbIndex); + redisTemplate.setConnectionFactory(jedisConnectionFactory); + //需要刷新才能生效 + jedisConnectionFactory.afterPropertiesSet(); + // 重置连接 + jedisConnectionFactory.resetConnection(); + return database; + } + +} diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleMonitorStatusToMysqlServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleMonitorStatusToMysqlServiceImpl.java index 6da4dc5..1c1a31a 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleMonitorStatusToMysqlServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleMonitorStatusToMysqlServiceImpl.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.njcn.influx.bo.po.*; import com.njcn.influx.config.IdMappingCache; +import com.njcn.influx.config.RedisUtil; import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper; import com.njcn.influx.mapper.PqDeviceMapper; import com.njcn.influx.mapper.RmpEventDetailPOMapper; @@ -43,6 +44,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu private final OraclePqLineDetailService oraclePqLineDetailService; private final IPqLineDetailService pqLineDetailService; private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper; + private final RedisUtil redisUtil; /** * 1.查询oracle装置表信息 @@ -51,6 +53,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu */ @Override public void monitorStatusSync() { + Map DevIdMapping = (Map) redisUtil.getObjectByKey("DevIdMapping"); List monitorDicList = rmpEventDetailPOMapper.selectByDicCodeList("Line_State"); List devDicList = rmpEventDetailPOMapper.selectByDicCodeList("Dev_Status"); List list = pqDeviceBakService.list(); @@ -62,10 +65,10 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu List tempList = new ArrayList<>(); pqDevices.stream().forEach(temp->{ String id =""; - if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){ + if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){ return; }else { - id=IdMappingCache.DevIdMapping.get(temp.getDevIndex()+""); + id=DevIdMapping.get(temp.getDevIndex()+""); } pqDeviceMysqlService.update(new LambdaUpdateWrapper() .set(PqDeviceMysql::getRunFlag,temp.getDevFlag()) diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java index 78c4b67..c2adb48 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/OracleToInfluxDBServiceImpl.java @@ -12,6 +12,7 @@ import com.njcn.influx.bo.po.JobDetailHoursInfluxDB; import com.njcn.influx.bo.po.JobDetailInfluxDB; import com.njcn.influx.bo.po.JobHistoryLogInfluxdb; import com.njcn.influx.config.IdMappingCache; +import com.njcn.influx.config.RedisUtil; import com.njcn.influx.service.JobDetailHoursInfluxDBService; import com.njcn.influx.service.JobDetailInfluxDBService; import com.njcn.influx.service.JobHistoryLogInfluxdbService; @@ -67,13 +68,14 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService; private final LineTimeMapper lineTimeMapper; - + private final RedisUtil redisUtil; @Value("${business.slice:2}") private int slice; @Override @Async public void dataBacthSysc(DataAsynParam dataAsynParam) { + Map LineIdMapping = (Map) redisUtil.getObjectByKey("LineIdMapping"); Runtime runtime = Runtime.getRuntime(); System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); @@ -131,11 +133,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { Field id = obj.getClass().getDeclaredField("lineid"); id.setAccessible(true); //暴力访问id String id1 = id.get(obj).toString(); - if (!IdMappingCache.LineIdMapping.containsKey(id1)){ + if (!LineIdMapping.containsKey(id1)){ log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); iterator.remove(); }else { - id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + id.set(obj, LineIdMapping.get(id1)); } }catch (Exception e){ e.printStackTrace(); @@ -217,6 +219,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { //冀北现场存在数据更新时间不在1小时内,会丢失数据的情况,这边根据装置最新时间往前推1个小时查询数据 @Override public void hourseDataBacthSysc(DataAsynParam dataAsynParam) { + Map LineIdMapping = (Map) redisUtil.getObjectByKey("LineIdMapping"); + Map DevIdMapping = (Map) redisUtil.getObjectByKey("DevIdMapping"); + Runtime runtime = Runtime.getRuntime(); System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); @@ -266,11 +271,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { Object o = id.get(obj); if(ObjectUtil.isNotNull(o)){ int index = Integer.parseInt(o.toString())/10; - if (!IdMappingCache.DevIdMapping.containsKey(index+"")){ + if (!DevIdMapping.containsKey(index+"")){ log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到devid匹配的devid"+index); iterator.remove(); }else { - id.set(obj, IdMappingCache.DevIdMapping.get(index+"")); + id.set(obj, DevIdMapping.get(index+"")); } } }catch (Exception e){ @@ -300,11 +305,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { Field id = obj.getClass().getDeclaredField("lineid"); id.setAccessible(true); //暴力访问id String id1 = id.get(obj).toString(); - if (!IdMappingCache.LineIdMapping.containsKey(id1)){ + if (!LineIdMapping.containsKey(id1)){ log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1); iterator.remove(); }else { - id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + id.set(obj,LineIdMapping.get(id1)); } }catch (Exception e){ e.printStackTrace(); @@ -399,6 +404,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { @Override public void oneMonitorDataTransport(DataAsynParam dataAsynParam) { + Map LineIdMapping = (Map) redisUtil.getObjectByKey("LineIdMapping"); + Runtime runtime = Runtime.getRuntime(); System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB"); System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB"); @@ -440,11 +447,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService { Field id = obj.getClass().getDeclaredField("lineid"); id.setAccessible(true); //暴力访问id String id1 = id.get(obj).toString(); - if (!IdMappingCache.LineIdMapping.containsKey(id1)) { + if (!LineIdMapping.containsKey(id1)) { log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1); iterator.remove(); } else { - id.set(obj, IdMappingCache.LineIdMapping.get(id1)); + id.set(obj, LineIdMapping.get(id1)); } } catch (Exception e) { e.printStackTrace(); diff --git a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/PqsOnlineratePOServiceImpl.java b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/PqsOnlineratePOServiceImpl.java index 8e7d720..c1f1e5d 100644 --- a/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/PqsOnlineratePOServiceImpl.java +++ b/influx-data/influx-source/src/main/java/com/njcn/influx/service/impl/PqsOnlineratePOServiceImpl.java @@ -3,30 +3,24 @@ package com.njcn.influx.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.LocalDateTimeUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.njcn.influx.bo.param.TableEnum; import com.njcn.influx.bo.po.PqsOnlinerateMysql; -import com.njcn.influx.config.IdMappingCache; -import com.njcn.influx.service.IPqDeviceBakService; +import com.njcn.influx.config.RedisUtil; import com.njcn.influx.service.PqsOnlinerateMysqlService; import com.njcn.oracle.bo.param.DataAsynParam; -import com.njcn.oracle.bo.po.DataFlicker; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import org.springframework.beans.factory.annotation.Autowired; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.influx.bo.po.PqsOnlineratePO; import com.njcn.influx.mapper.PqsOnlineratePOMapper; import com.njcn.influx.service.PqsOnlineratePOService; -import org.springframework.transaction.annotation.Transactional; /** * @@ -41,9 +35,10 @@ import org.springframework.transaction.annotation.Transactional; @Slf4j public class PqsOnlineratePOServiceImpl extends ServiceImpl implements PqsOnlineratePOService{ private final PqsOnlinerateMysqlService pqsOnlinerateMysqlService; + private final RedisUtil redisUtil; @Override public void minutesDataBacthSysc(DataAsynParam dataAsynParam) { - + Map DevIdMapping = (Map) redisUtil.getObjectByKey("DevIdMapping"); List list = this.lambdaQuery().between(PqsOnlineratePO::getTimeid, LocalDateTimeUtil.beginOfDay(dataAsynParam.getStartDateTime()), LocalDateTimeUtil.endOfDay(dataAsynParam.getStartDateTime()).minusSeconds(1)).list(); log.info(dataAsynParam.getStartDateTime()+"-----数据量:"+list.size()); if (CollectionUtil.isNotEmpty(list)) { @@ -51,10 +46,10 @@ public class PqsOnlineratePOServiceImpl extends ServiceImpl { PqsOnlinerateMysql pqsOnlinerateMysql = new PqsOnlinerateMysql(); - if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){ + if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){ return; }else { - pqsOnlinerateMysql.setDevIndex(IdMappingCache.DevIdMapping.get(temp.getDevIndex()+"")); + pqsOnlinerateMysql.setDevIndex(DevIdMapping.get(temp.getDevIndex()+"")); } pqsOnlinerateMysql.setTimeId(temp.getTimeid()); pqsOnlinerateMysql.setOnlineMin(temp.getOnlinemin()); diff --git a/influx-data/influx-target/src/main/java/com/njcn/influx/controller/JobDetailInfluxDBController.java b/influx-data/influx-target/src/main/java/com/njcn/influx/controller/JobDetailInfluxDBController.java index 086363e..e5f3a31 100644 --- a/influx-data/influx-target/src/main/java/com/njcn/influx/controller/JobDetailInfluxDBController.java +++ b/influx-data/influx-target/src/main/java/com/njcn/influx/controller/JobDetailInfluxDBController.java @@ -3,7 +3,9 @@ package com.njcn.influx.controller; import com.baomidou.mybatisplus.core.metadata.IPage; import com.njcn.influx.bo.po.JobDetailInfluxDB; +import com.njcn.influx.config.IdMappingCache; import com.njcn.influx.service.JobDetailInfluxDBService; +import com.njcn.oracle.bo.param.DataAsynParam; import com.njcn.oracle.bo.param.JobQueryParam; import com.njcn.oracle.bo.po.JobDetail; import com.njcn.oracle.service.JobDetailService; @@ -13,10 +15,7 @@ import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; /** * Description: @@ -34,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController; public class JobDetailInfluxDBController { private final JobDetailInfluxDBService jobDetailInfluxDBService; + private final IdMappingCache idMappingCache; @PostMapping("/jobQuery") @ApiOperation("任务查询") @ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true) @@ -48,4 +48,12 @@ public class JobDetailInfluxDBController { boolean b = jobDetailInfluxDBService.jobRemove(jobDetail); return b; } + + @GetMapping("/refreshIdCache") + @ApiOperation("刷新缓存") + public Boolean refreshIdCache() { + DataAsynParam dataAsynParam = new DataAsynParam(); + idMappingCache.init(); + return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步"); + } } diff --git a/influx-data/influx-target/src/main/resources/application.yml b/influx-data/influx-target/src/main/resources/application.yml index 2a5b2fe..faed30b 100644 --- a/influx-data/influx-target/src/main/resources/application.yml +++ b/influx-data/influx-target/src/main/resources/application.yml @@ -1,4 +1,4 @@ -#文件位置配置 + business: #分片次数,一定为24的约数,1 2 3 4 6 8 12 24 slice: 4 @@ -87,6 +87,19 @@ spring: username: root password: njcnpqs driver-class-name: com.mysql.cj.jdbc.Driver + redis: + database: 15 + host: 192.168.1.22 + port: 16379 + password: njcnpqs + timeout: 5000 + lettuce: + pool: + max-active: 8 + max-wait: -1 + max-idle: 8 + min-idle: 0 + #不做限制的参数配置 #mybatis配置信息 mybatis-plus: #别名扫描