添加redis配置,将id缓存迁移由缓存移动到redis
This commit is contained in:
@@ -98,6 +98,22 @@
|
||||
<artifactId>oracle-source</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.logging</groupId>
|
||||
<artifactId>logging-parent</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-pool2</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -2,14 +2,11 @@ package com.njcn.influx.config;
|
||||
|
||||
import com.njcn.influx.bo.po.PqDeviceBak;
|
||||
import com.njcn.influx.bo.po.PqLineBak;
|
||||
import com.njcn.influx.mapper.PqLineBakMapper;
|
||||
import com.njcn.influx.service.IPqDeviceBakService;
|
||||
import com.njcn.influx.service.PqLineBakService;
|
||||
import io.swagger.v3.oas.annotations.servers.Server;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.HashMap;
|
||||
@@ -30,17 +27,23 @@ public class IdMappingCache {
|
||||
private PqLineBakService pqLineBakService;
|
||||
@Autowired
|
||||
private IPqDeviceBakService pqDeviceBakService;
|
||||
public static Map<String, String> DevIdMapping = new HashMap<>();
|
||||
public static Map<String, String> LineIdMapping = new HashMap<>();
|
||||
|
||||
@Autowired
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
Map<String, String> DevIdMapping = new HashMap<>();
|
||||
Map<String, String> LineIdMapping = new HashMap<>();
|
||||
List<PqLineBak> resultList = pqLineBakService.list();
|
||||
for (PqLineBak row : resultList) {
|
||||
String id = row.getId();
|
||||
String lineId = row.getLineId();
|
||||
LineIdMapping.put(lineId,id );
|
||||
}
|
||||
redisUtil.saveByKey("LineIdMapping",LineIdMapping);
|
||||
|
||||
List<PqDeviceBak> list = pqDeviceBakService.list();
|
||||
|
||||
for (PqDeviceBak row : list) {
|
||||
@@ -48,9 +51,8 @@ public class IdMappingCache {
|
||||
String devId = row.getDevId()+"";
|
||||
DevIdMapping.put(devId,id );
|
||||
}
|
||||
redisUtil.saveByKey("DevIdMapping",DevIdMapping);
|
||||
|
||||
}
|
||||
|
||||
// public String getDataById(String id) {
|
||||
// return IdMapping.get(id);
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.njcn.influx.config;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.njcn.influx.utils.InstantDateDeserializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2021年12月08日 17:53
|
||||
*/
|
||||
@Configuration
|
||||
public class RedisConfig {
|
||||
|
||||
|
||||
/**
|
||||
* 修改RedisTemplate序列化由JdkSerializationRedisSerializer二进制调整为:JSON格式
|
||||
*/
|
||||
@Bean("redisTemplate")
|
||||
public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
|
||||
// 为了开发方便,一般直接使用<String,object>
|
||||
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
|
||||
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
|
||||
// 用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
|
||||
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
// 指定要序列化的域(field,get,set),访问修饰符(public,private,protected)
|
||||
//解决Java 8 date/time type java.time.Instant not supported
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
objectMapper.registerModule(new SimpleModule().addDeserializer(Instant.class, new InstantDateDeserializer()));
|
||||
|
||||
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
|
||||
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
|
||||
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
|
||||
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
|
||||
// key采用string的序列化方式
|
||||
redisTemplate.setKeySerializer(stringRedisSerializer);
|
||||
// value序列化方式采用jackson
|
||||
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
|
||||
// hash的key采用string的序列化方式
|
||||
redisTemplate.setHashKeySerializer(stringRedisSerializer);
|
||||
// hash的value序列化方式采用jackson
|
||||
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
|
||||
redisTemplate.afterPropertiesSet();
|
||||
|
||||
return redisTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,353 @@
|
||||
package com.njcn.influx.config;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
* @version 1.0.0
|
||||
* @date 2021年04月30日 09:38
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RedisUtil {
|
||||
|
||||
private final RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 指定key的失效时间
|
||||
* 秒级别的过期时间
|
||||
*/
|
||||
public void expire(String key, long time) {
|
||||
redisTemplate.expire(key, time, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间
|
||||
*/
|
||||
public long getExpire(String key) {
|
||||
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
return Objects.isNull(expireTime) ? 0 : expireTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间(切库,切完之后自动换为原来库)
|
||||
*/
|
||||
public long getExpire(Integer dbIndex,String key) {
|
||||
return getExpire(dbIndex,key,true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key获取过期时间(切库)true:切回原库 false:不切回原库
|
||||
*/
|
||||
public long getExpire(Integer dbIndex,String key,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
Long expireTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
return Objects.isNull(expireTime) ? 0 : expireTime;
|
||||
}
|
||||
/**
|
||||
* 判断key是否存在
|
||||
*/
|
||||
public boolean hasKey(String key) {
|
||||
Boolean hasKeyFlag = redisTemplate.hasKey(key);
|
||||
if (Objects.isNull(hasKeyFlag)) {
|
||||
return false;
|
||||
}
|
||||
return hasKeyFlag;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除某个Key
|
||||
*/
|
||||
public void delete(String key) {
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量删除keys
|
||||
*/
|
||||
public void deleteKeys(String... keys) {
|
||||
redisTemplate.delete(Arrays.asList(keys));
|
||||
}
|
||||
|
||||
/**
|
||||
* 指定字符模糊匹配,批量删除keys
|
||||
*/
|
||||
public void deleteKeysByString(String str) {
|
||||
Set<String> keys = redisTemplate.keys(str.concat("*"));
|
||||
// 删除所有匹配的key
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
redisTemplate.delete(keys);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取key对应的字符数据
|
||||
*/
|
||||
public String getStringByKey(String key) {
|
||||
return (String) redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key key值
|
||||
*/
|
||||
public String getStringByKey(Integer dbIndex,String key) {
|
||||
return getStringByKey(dbIndex,key,true);
|
||||
}
|
||||
/**
|
||||
* 根据key和库索引获取内容字(切库之后,之后查询都是切库数据)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key key值
|
||||
* @param fly 是否切回原库
|
||||
*/
|
||||
public String getStringByKey(Integer dbIndex,String key,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
String s = (String) redisTemplate.opsForValue().get(key);
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取key对应对象数据
|
||||
*/
|
||||
public Object getObjectByKey(String key) {
|
||||
return redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 保存数据
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
*/
|
||||
public void saveByKey(String key, Object value) {
|
||||
redisTemplate.boundValueOps(key).set(value);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param expireTime 生命时间
|
||||
*/
|
||||
public void saveByKeyWithExpire(String key, Object value, Long expireTime) {
|
||||
if (expireTime <= 0) {
|
||||
saveByKey(key, value);
|
||||
} else {
|
||||
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param expireTime 生命时间
|
||||
* @param expireTime 生命时间
|
||||
*/
|
||||
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime) {
|
||||
saveByKeyWithExpire(dbIndex,key,value,expireTime,true);
|
||||
}
|
||||
/**
|
||||
* 保存数据,指定生命周期(秒)
|
||||
*
|
||||
* @param dbIndex 指定库索引
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param fly 是否切回原库
|
||||
*/
|
||||
public void saveByKeyWithExpire(Integer dbIndex,String key, Object value, Integer expireTime,Boolean fly) {
|
||||
Integer index = setDbIndex(dbIndex);
|
||||
if (expireTime <= 0) {
|
||||
saveByKey(key, value);
|
||||
} else {
|
||||
redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
if(fly){
|
||||
setDbIndex(index);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 顺序的递增和递减
|
||||
*
|
||||
* @param value 增减根据数值的正负来判断
|
||||
*/
|
||||
public void increment(String key, Long value) {
|
||||
redisTemplate.boundValueOps(key).increment(value);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 添加一个Map集合
|
||||
*/
|
||||
public void saveMapValue(String key, Map<String, ?> map, long expireTime) {
|
||||
redisTemplate.boundHashOps(key).putAll(map);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取map中所有的keys
|
||||
*/
|
||||
public Set<?> getMapKeys(String key) {
|
||||
return redisTemplate.boundHashOps(key).keys();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取map中所有的values
|
||||
*/
|
||||
public List<?> getMapValues(String key) {
|
||||
return redisTemplate.boundHashOps(key).values();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据map中某个key获取对应的value
|
||||
*/
|
||||
public Object getMapValueByMapKey(String redisKey, String mapKey) {
|
||||
return redisTemplate.boundHashOps(redisKey).get(mapKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据map中的某个key删除对应的value
|
||||
*/
|
||||
public void deleteMapValueByMapKey(String redisKey, String mapKey) {
|
||||
redisTemplate.boundHashOps(redisKey).delete(mapKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断map中是否有指定的key
|
||||
*/
|
||||
public boolean hasMapKey(String redisKey, String mapKey) {
|
||||
Boolean hasKey = redisTemplate.boundHashOps(redisKey).hasKey(mapKey);
|
||||
if (Objects.isNull(hasKey)) {
|
||||
return false;
|
||||
}
|
||||
return hasKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* 右存放List
|
||||
*/
|
||||
public void saveRightListByKey(String key, List<?> values, long expireTime) {
|
||||
redisTemplate.boundListOps(key).rightPushAll(values);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 左存放List
|
||||
*/
|
||||
public void saveLeftListByKey(String key, List<?> values, long expireTime) {
|
||||
redisTemplate.boundListOps(key).leftPushAll(values);
|
||||
if (expireTime > 0) {
|
||||
expire(key, expireTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取List某范围的值
|
||||
*
|
||||
* @param start 起始索引
|
||||
* @param end 截止索引
|
||||
*/
|
||||
public List<?> getListRangeValues(String key, long start, long end) {
|
||||
long size = getListSize(key);
|
||||
if ((start < 0 && end < 0) || (start > end)) {
|
||||
start = 0;
|
||||
end = size;
|
||||
} else if (end > size) {
|
||||
end = size;
|
||||
} else if (start < 0) {
|
||||
start = 0;
|
||||
}
|
||||
return redisTemplate.boundListOps(key).range(start, end);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取List所有数据
|
||||
*/
|
||||
public List<?> getListAllValues(String key) {
|
||||
long size = getListSize(key);
|
||||
return redisTemplate.boundListOps(key).range(0, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据指定索引获取指定value
|
||||
*/
|
||||
public Object getListValueByIndex(String key, int index) {
|
||||
return redisTemplate.boundListOps(key).index(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取list长度
|
||||
*/
|
||||
public Long getListSize(String key) {
|
||||
return Objects.isNull(redisTemplate.boundListOps(key).size()) ? 0 : redisTemplate.boundListOps(key).size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据索引修改某个值
|
||||
*/
|
||||
public void updateListValueByIndex(String key, int index, Object newObj) {
|
||||
redisTemplate.boundListOps(key).set(index, newObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据某个key模糊查询,并取出value
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public List<?> getLikeListAllValues(String key) {
|
||||
List<Object> info=new ArrayList<>();
|
||||
for (String s : redisTemplate.keys(key + "*")) {
|
||||
info.add(redisTemplate.opsForValue().get(s));
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据切库
|
||||
* @param dbIndex
|
||||
* @return
|
||||
*/
|
||||
private Integer setDbIndex(Integer dbIndex) {
|
||||
if (dbIndex == null || dbIndex > 15 || dbIndex < 0) {
|
||||
dbIndex = 0;
|
||||
}
|
||||
LettuceConnectionFactory jedisConnectionFactory = (LettuceConnectionFactory) redisTemplate
|
||||
.getConnectionFactory();
|
||||
int database = jedisConnectionFactory.getDatabase();
|
||||
jedisConnectionFactory.setDatabase(dbIndex);
|
||||
redisTemplate.setConnectionFactory(jedisConnectionFactory);
|
||||
//需要刷新才能生效
|
||||
jedisConnectionFactory.afterPropertiesSet();
|
||||
// 重置连接
|
||||
jedisConnectionFactory.resetConnection();
|
||||
return database;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.njcn.influx.bo.po.*;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper;
|
||||
import com.njcn.influx.mapper.PqDeviceMapper;
|
||||
import com.njcn.influx.mapper.RmpEventDetailPOMapper;
|
||||
@@ -43,6 +44,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
private final OraclePqLineDetailService oraclePqLineDetailService;
|
||||
private final IPqLineDetailService pqLineDetailService;
|
||||
private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper;
|
||||
private final RedisUtil redisUtil;
|
||||
|
||||
/**
|
||||
* 1.查询oracle装置表信息
|
||||
@@ -51,6 +53,7 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
*/
|
||||
@Override
|
||||
public void monitorStatusSync() {
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
List<DictData> monitorDicList = rmpEventDetailPOMapper.selectByDicCodeList("Line_State");
|
||||
List<DictData> devDicList = rmpEventDetailPOMapper.selectByDicCodeList("Dev_Status");
|
||||
List<PqDeviceBak> list = pqDeviceBakService.list();
|
||||
@@ -62,10 +65,10 @@ public class OracleMonitorStatusToMysqlServiceImpl implements OracleMonitorStatu
|
||||
List<PqDeviceMysql> tempList = new ArrayList<>();
|
||||
pqDevices.stream().forEach(temp->{
|
||||
String id ="";
|
||||
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
return;
|
||||
}else {
|
||||
id=IdMappingCache.DevIdMapping.get(temp.getDevIndex()+"");
|
||||
id=DevIdMapping.get(temp.getDevIndex()+"");
|
||||
}
|
||||
pqDeviceMysqlService.update(new LambdaUpdateWrapper<PqDeviceMysql>()
|
||||
.set(PqDeviceMysql::getRunFlag,temp.getDevFlag())
|
||||
|
||||
@@ -12,6 +12,7 @@ import com.njcn.influx.bo.po.JobDetailHoursInfluxDB;
|
||||
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
||||
import com.njcn.influx.bo.po.JobHistoryLogInfluxdb;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.service.JobDetailHoursInfluxDBService;
|
||||
import com.njcn.influx.service.JobDetailInfluxDBService;
|
||||
import com.njcn.influx.service.JobHistoryLogInfluxdbService;
|
||||
@@ -67,13 +68,14 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
private final JobDetailHoursInfluxDBService jobDetailHoursInfluxDBService;
|
||||
|
||||
private final LineTimeMapper lineTimeMapper;
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
@Value("${business.slice:2}")
|
||||
private int slice;
|
||||
|
||||
@Override
|
||||
@Async
|
||||
public void dataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
@@ -131,11 +133,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)){
|
||||
if (!LineIdMapping.containsKey(id1)){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
id.set(obj, LineIdMapping.get(id1));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
@@ -217,6 +219,9 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
//冀北现场存在数据更新时间不在1小时内,会丢失数据的情况,这边根据装置最新时间往前推1个小时查询数据
|
||||
@Override
|
||||
public void hourseDataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
@@ -266,11 +271,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
Object o = id.get(obj);
|
||||
if(ObjectUtil.isNotNull(o)){
|
||||
int index = Integer.parseInt(o.toString())/10;
|
||||
if (!IdMappingCache.DevIdMapping.containsKey(index+"")){
|
||||
if (!DevIdMapping.containsKey(index+"")){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到devid匹配的devid"+index);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, IdMappingCache.DevIdMapping.get(index+""));
|
||||
id.set(obj, DevIdMapping.get(index+""));
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
@@ -300,11 +305,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)){
|
||||
if (!LineIdMapping.containsKey(id1)){
|
||||
log.info(tableName+"表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid"+id1);
|
||||
iterator.remove();
|
||||
}else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
id.set(obj,LineIdMapping.get(id1));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
@@ -399,6 +404,8 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
|
||||
@Override
|
||||
public void oneMonitorDataTransport(DataAsynParam dataAsynParam) {
|
||||
Map<String, String> LineIdMapping = (Map<String, String>) redisUtil.getObjectByKey("LineIdMapping");
|
||||
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
System.out.println("开始执行前总堆内存为:" + runtime.totalMemory() / (1024 * 1024) + " MB");
|
||||
System.out.println("开始执行前已用堆内存为:" + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + " MB");
|
||||
@@ -440,11 +447,11 @@ public class OracleToInfluxDBServiceImpl implements OracleToInfluxDBService {
|
||||
Field id = obj.getClass().getDeclaredField("lineid");
|
||||
id.setAccessible(true); //暴力访问id
|
||||
String id1 = id.get(obj).toString();
|
||||
if (!IdMappingCache.LineIdMapping.containsKey(id1)) {
|
||||
if (!LineIdMapping.containsKey(id1)) {
|
||||
log.info(tableName + "表---Oralcet数据同步到InfluxDB未找mysql中到lineid匹配的lineid" + id1);
|
||||
iterator.remove();
|
||||
} else {
|
||||
id.set(obj, IdMappingCache.LineIdMapping.get(id1));
|
||||
id.set(obj, LineIdMapping.get(id1));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
||||
@@ -3,30 +3,24 @@ package com.njcn.influx.service.impl;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.njcn.influx.bo.param.TableEnum;
|
||||
import com.njcn.influx.bo.po.PqsOnlinerateMysql;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.service.IPqDeviceBakService;
|
||||
import com.njcn.influx.config.RedisUtil;
|
||||
import com.njcn.influx.service.PqsOnlinerateMysqlService;
|
||||
import com.njcn.oracle.bo.param.DataAsynParam;
|
||||
import com.njcn.oracle.bo.po.DataFlicker;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Map;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.influx.bo.po.PqsOnlineratePO;
|
||||
import com.njcn.influx.mapper.PqsOnlineratePOMapper;
|
||||
import com.njcn.influx.service.PqsOnlineratePOService;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -41,9 +35,10 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
@Slf4j
|
||||
public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMapper, PqsOnlineratePO> implements PqsOnlineratePOService{
|
||||
private final PqsOnlinerateMysqlService pqsOnlinerateMysqlService;
|
||||
private final RedisUtil redisUtil;
|
||||
@Override
|
||||
public void minutesDataBacthSysc(DataAsynParam dataAsynParam) {
|
||||
|
||||
Map<String, String> DevIdMapping = (Map<String, String>) redisUtil.getObjectByKey("DevIdMapping");
|
||||
List<PqsOnlineratePO> list = this.lambdaQuery().between(PqsOnlineratePO::getTimeid, LocalDateTimeUtil.beginOfDay(dataAsynParam.getStartDateTime()), LocalDateTimeUtil.endOfDay(dataAsynParam.getStartDateTime()).minusSeconds(1)).list();
|
||||
log.info(dataAsynParam.getStartDateTime()+"-----数据量:"+list.size());
|
||||
if (CollectionUtil.isNotEmpty(list)) {
|
||||
@@ -51,10 +46,10 @@ public class PqsOnlineratePOServiceImpl extends ServiceImpl<PqsOnlineratePOMappe
|
||||
list.stream().forEach(temp -> {
|
||||
PqsOnlinerateMysql pqsOnlinerateMysql = new PqsOnlinerateMysql();
|
||||
|
||||
if (!IdMappingCache.DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
if (!DevIdMapping.containsKey(temp.getDevIndex()+"")){
|
||||
return;
|
||||
}else {
|
||||
pqsOnlinerateMysql.setDevIndex(IdMappingCache.DevIdMapping.get(temp.getDevIndex()+""));
|
||||
pqsOnlinerateMysql.setDevIndex(DevIdMapping.get(temp.getDevIndex()+""));
|
||||
}
|
||||
pqsOnlinerateMysql.setTimeId(temp.getTimeid());
|
||||
pqsOnlinerateMysql.setOnlineMin(temp.getOnlinemin());
|
||||
|
||||
@@ -3,7 +3,9 @@ package com.njcn.influx.controller;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.njcn.influx.bo.po.JobDetailInfluxDB;
|
||||
import com.njcn.influx.config.IdMappingCache;
|
||||
import com.njcn.influx.service.JobDetailInfluxDBService;
|
||||
import com.njcn.oracle.bo.param.DataAsynParam;
|
||||
import com.njcn.oracle.bo.param.JobQueryParam;
|
||||
import com.njcn.oracle.bo.po.JobDetail;
|
||||
import com.njcn.oracle.service.JobDetailService;
|
||||
@@ -13,10 +15,7 @@ import io.swagger.annotations.ApiOperation;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
@@ -34,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
public class JobDetailInfluxDBController {
|
||||
|
||||
private final JobDetailInfluxDBService jobDetailInfluxDBService;
|
||||
private final IdMappingCache idMappingCache;
|
||||
@PostMapping("/jobQuery")
|
||||
@ApiOperation("任务查询")
|
||||
@ApiImplicitParam(name = "jobQueryParam", value = "任务查询参数", required = true)
|
||||
@@ -48,4 +48,12 @@ public class JobDetailInfluxDBController {
|
||||
boolean b = jobDetailInfluxDBService.jobRemove(jobDetail);
|
||||
return b;
|
||||
}
|
||||
|
||||
@GetMapping("/refreshIdCache")
|
||||
@ApiOperation("刷新缓存")
|
||||
public Boolean refreshIdCache() {
|
||||
DataAsynParam dataAsynParam = new DataAsynParam();
|
||||
idMappingCache.init();
|
||||
return true;// HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, "数据同步");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#文件位置配置
|
||||
|
||||
business:
|
||||
#分片次数,一定为24的约数,1 2 3 4 6 8 12 24
|
||||
slice: 4
|
||||
@@ -87,6 +87,19 @@ spring:
|
||||
username: root
|
||||
password: njcnpqs
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
redis:
|
||||
database: 15
|
||||
host: 192.168.1.22
|
||||
port: 16379
|
||||
password: njcnpqs
|
||||
timeout: 5000
|
||||
lettuce:
|
||||
pool:
|
||||
max-active: 8
|
||||
max-wait: -1
|
||||
max-idle: 8
|
||||
min-idle: 0
|
||||
#不做限制的参数配置
|
||||
#mybatis配置信息
|
||||
mybatis-plus:
|
||||
#别名扫描
|
||||
|
||||
Reference in New Issue
Block a user