1.eventDetail数据迁移

This commit is contained in:
wr
2024-03-01 16:21:31 +08:00
parent 6f76a4f420
commit 43df281522
10 changed files with 225 additions and 38 deletions

View File

@@ -0,0 +1,43 @@
package com.njcn.influx.bo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
*
* @author hongawen
* @since 2021-12-13
*/
@Data
public class DictData {
private static final long serialVersionUID = 1L;
/**
* 字典数据表Id
*/
private String id;
/**
* 字典类型表Id
*/
private String typeId;
/**
* 名称
*/
private String name;
/**
* 编码
*/
private String code;
/**
* 与高级算法内部Id描述对应
*/
private Integer algoDescribe;
}

View File

@@ -81,13 +81,13 @@ public class OracleRmpEventDetailPO implements Serializable {
* 波形文件是否从装置招到本地(0未招1已招)默认值为0 * 波形文件是否从装置招到本地(0未招1已招)默认值为0
*/ */
@TableField(value = "FILEFLAG") @TableField(value = "FILEFLAG")
private Boolean fileFlag; private Integer fileFlag;
/** /**
* 特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0 * 特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0
*/ */
@TableField(value = "DEALFLAG") @TableField(value = "DEALFLAG")
private Boolean dealFlag; private Integer dealFlag;
/** /**
* 处理结果第一条事件发生时间(读comtra文件获取) * 处理结果第一条事件发生时间(读comtra文件获取)

View File

@@ -82,13 +82,13 @@ public class RmpEventDetailPO implements Serializable {
* 波形文件是否从装置招到本地(0未招1已招)默认值为0 * 波形文件是否从装置招到本地(0未招1已招)默认值为0
*/ */
@TableField(value = "file_flag") @TableField(value = "file_flag")
private Boolean fileFlag; private Integer fileFlag;
/** /**
* 特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0 * 特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0
*/ */
@TableField(value = "deal_flag") @TableField(value = "deal_flag")
private Boolean dealFlag; private Integer dealFlag;
/** /**
* 处理结果第一条事件发生时间(读comtra文件获取) * 处理结果第一条事件发生时间(读comtra文件获取)

View File

@@ -2,8 +2,11 @@ package com.njcn.influx.mapper;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.influx.base.InfluxDbBaseMapper; import com.njcn.influx.bo.po.DictData;
import com.njcn.influx.bo.po.OracleRmpEventDetailPO; import com.njcn.influx.bo.po.OracleRmpEventDetailPO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/** /**
* data-migration * data-migration
@@ -14,4 +17,5 @@ import com.njcn.influx.bo.po.OracleRmpEventDetailPO;
@DS("master") @DS("master")
public interface OracleRmpEventDetailPOMapper extends BaseMapper<OracleRmpEventDetailPO> { public interface OracleRmpEventDetailPOMapper extends BaseMapper<OracleRmpEventDetailPO> {
List<DictData> selectByDicCodeList(@Param("code") String code);
} }

View File

@@ -2,9 +2,11 @@ package com.njcn.influx.mapper;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.influx.base.InfluxDbBaseMapper; import com.njcn.influx.bo.po.DictData;
import com.njcn.influx.bo.po.OracleRmpEventDetailPO;
import com.njcn.influx.bo.po.RmpEventDetailPO; import com.njcn.influx.bo.po.RmpEventDetailPO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/** /**
* data-migration * data-migration
@@ -15,4 +17,5 @@ import com.njcn.influx.bo.po.RmpEventDetailPO;
@DS("target") @DS("target")
public interface RmpEventDetailPOMapper extends BaseMapper<RmpEventDetailPO> { public interface RmpEventDetailPOMapper extends BaseMapper<RmpEventDetailPO> {
List<DictData> selectByDicCodeList(@Param("code") String code);
} }

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.influx.mapper.OracleRmpEventDetailPOMapper">
<select id="selectByDicCodeList" resultType="com.njcn.influx.bo.po.DictData">
SELECT
sys_dict_data.DIC_INDEX as id,
sys_dict_data.DIC_TYPE as typeId,
sys_dict_data.DIC_NAME as name,
sys_dict_data.DIC_NUMBER as code,
sys_dict_data.TRIPHASE as algoDescribe
FROM
PQS_DICDATA sys_dict_data,
PQS_DICTYPE sys_dict_type
WHERE
sys_dict_data.DIC_TYPE = sys_dict_type.DICTYPE_INDEX
AND sys_dict_type.DICTYPE_NUMBER = #{code}
ORDER BY DIC_NUMBER
</select>
</mapper>

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.influx.mapper.RmpEventDetailPOMapper">
<select id="selectByDicCodeList" resultType="com.njcn.influx.bo.po.DictData">
SELECT sys_dict_data.*
FROM sys_dict_data sys_dict_data,
sys_dict_type sys_dict_type
WHERE sys_dict_data.type_id = sys_dict_type.id
AND sys_dict_type.code = #{code}
order by sort
</select>
</mapper>

View File

@@ -1,6 +1,8 @@
package com.njcn.influx.service; package com.njcn.influx.service;
import java.time.LocalDate; import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.influx.bo.po.RmpEventDetailPO;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
@@ -9,9 +11,7 @@ import java.time.LocalDateTime;
* @author cdf * @author cdf
* @date 2024/2/19 * @date 2024/2/19
*/ */
public interface OracleEventDetailToMysqlService { public interface OracleEventDetailToMysqlService extends IService<RmpEventDetailPO> {
void eventBatch(LocalDateTime start, LocalDateTime end); void eventBatch(LocalDateTime start, LocalDateTime end);
} }

View File

@@ -1,9 +1,11 @@
package com.njcn.influx.service.impl; package com.njcn.influx.service.impl;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.influx.bo.po.OracleRmpEventDetailPO; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.influx.bo.po.RmpEventDetailPO; import com.njcn.influx.bo.po.*;
import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper; import com.njcn.influx.mapper.OracleRmpEventDetailPOMapper;
import com.njcn.influx.mapper.RmpEventDetailPOMapper; import com.njcn.influx.mapper.RmpEventDetailPOMapper;
import com.njcn.influx.service.OracleEventDetailToMysqlService; import com.njcn.influx.service.OracleEventDetailToMysqlService;
@@ -12,7 +14,11 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* data-migration * data-migration
@@ -22,7 +28,8 @@ import java.util.List;
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
@Service @Service
public class OracleEventDetailToMysqlServiceImpl implements OracleEventDetailToMysqlService { @DS("target")
public class OracleEventDetailToMysqlServiceImpl extends ServiceImpl<RmpEventDetailPOMapper, RmpEventDetailPO> implements OracleEventDetailToMysqlService {
private final PqLineBakService pqLineBakService; private final PqLineBakService pqLineBakService;
@@ -30,21 +37,117 @@ public class OracleEventDetailToMysqlServiceImpl implements OracleEventDetailToM
private final RmpEventDetailPOMapper rmpEventDetailPOMapper; private final RmpEventDetailPOMapper rmpEventDetailPOMapper;
/**
* 每小时执行一次该方法
* 1.读取Oracle中暂态事件表信息(PQS_EVENTDETAIL)
* 2.读取Mysql中pq_line_bak表获取Oracle和Mysql中监测点关系
* 3.读取Mysql中r_mp_event_detail
*/
@Override @Override
public void eventBatch(LocalDateTime start, LocalDateTime end){ public void eventBatch(LocalDateTime start, LocalDateTime end){
List<PqLineBak> list = pqLineBakService.list();
//lineId:Oracle监测点ID id:Mysql监测点ID
Map<String, String> oracleRelationMysql = list.stream().collect(Collectors.toMap(PqLineBak::getLineId, PqLineBak::getId));
//获取Oracle字典 暂降类型12 暂降原因13
List<DictData> oracleReason= oracleRmpEventDetailPOMapper.selectByDicCodeList("13");
List<DictData> oracleType= oracleRmpEventDetailPOMapper.selectByDicCodeList("12");
//获取Mysql字典
List<DictData> eventType = rmpEventDetailPOMapper.selectByDicCodeList("Event_Statis");
List<DictData> mysqlReason = rmpEventDetailPOMapper.selectByDicCodeList("Event_Reason");
List<DictData> mysqlType = rmpEventDetailPOMapper.selectByDicCodeList("Event_Type");
//字典类型
Map<String, String> mapReason = getRelationList(oracleReason, mysqlReason);
Map<String, String> mapType = getRelationList(oracleType, mysqlType);
List<RmpEventDetailPO> poList=new ArrayList<>();
List<OracleRmpEventDetailPO> oracleRmpEventDetailPOList = oracleRmpEventDetailPOMapper.selectList(new LambdaQueryWrapper<OracleRmpEventDetailPO>().between(OracleRmpEventDetailPO::getStartTime,start,end)); List<OracleRmpEventDetailPO> oracleRmpEventDetailPOList = oracleRmpEventDetailPOMapper.selectList(new LambdaQueryWrapper<OracleRmpEventDetailPO>().between(OracleRmpEventDetailPO::getStartTime,start,end));
RmpEventDetailPO po;
for (OracleRmpEventDetailPO oracleDetail : oracleRmpEventDetailPOList) {
List<RmpEventDetailPO> poList = BeanUtil.copyToList(oracleRmpEventDetailPOList, RmpEventDetailPO.class); if(oracleRelationMysql.containsKey(oracleDetail.getMeasurementPointId())){
String mysqlLineID = oracleRelationMysql.get(oracleDetail.getMeasurementPointId());
po=new RmpEventDetailPO();
po.setMeasurementPointId(mysqlLineID);
po.setEventType(eventTypeId(oracleDetail.getEventType(),eventType));
System.out.println("55555"); po.setAdvanceReason(getDicId(oracleDetail.getAdvanceReason(),mapReason));
po.setAdvanceType(getDicId(oracleDetail.getAdvanceType(),mapType));
po.setEventassIndex(oracleDetail.getEventassIndex());
po.setDqTime(oracleDetail.getDqTime());
po.setDealTime(oracleDetail.getDealTime());
po.setNum(oracleDetail.getNum());
po.setFileFlag(oracleDetail.getFileFlag());
po.setDealFlag(oracleDetail.getDealFlag());
po.setFirstTime(oracleDetail.getFirstTime());
po.setFirstType(oracleDetail.getFirstType());
po.setFirstMs(oracleDetail.getFirstMs());
po.setEnergy(oracleDetail.getEnergy());
po.setSeverity(oracleDetail.getSeverity());
po.setSagsource(oracleDetail.getSagsource());
po.setStartTime(oracleDetail.getStartTime());
po.setDuration(oracleDetail.getDuration());
po.setFeatureAmplitude(oracleDetail.getFeatureAmplitude());
po.setPhase(oracleDetail.getPhase());
po.setEventDescribe(oracleDetail.getEventDescribe());
po.setWavePath(oracleDetail.getWavePath());
po.setTransientValue(oracleDetail.getTransientValue());
poList.add(po);
}
}
if(CollUtil.isNotEmpty(poList)){
this.remove(new LambdaQueryWrapper<RmpEventDetailPO>().between(RmpEventDetailPO::getStartTime, start, end));
this.saveBatch(poList,500);
}
} }
//获取暂降类型id
private String eventTypeId(String eventType, List<DictData> eventTypeList){
String code="";
//事件类型0扰动1暂降2暂升3中断4其他5录波
if("0".equals(eventType)){
code="Disturbance";
}
if("1".equals(eventType)){
code="Voltage_Dip";
}
if("2".equals(eventType)){
code="Voltage_Rise";
}
if("3".equals(eventType)){
code="Short_Interruptions";
}
if("4".equals(eventType)){
code="Other";
}
if("5".equals(eventType)){
code="Recording_Wave";
}
String finalCode = code;
DictData dictData = eventTypeList.stream().filter(x -> finalCode.equals(x.getCode())).findFirst().get();
if(ObjectUtil.isNotNull(dictData)){
return dictData.getId();
}
return "";
}
//类型匹配
private Map<String,String> getRelationList(List<DictData> oracleReason,List<DictData> mysqlReason){
Map<String,String> map=new HashMap<>();
for (DictData dictData : oracleReason) {
for (DictData data : mysqlReason) {
if(dictData.getAlgoDescribe()==data.getAlgoDescribe()){
map.put(dictData.getId(),data.getId());
}
}
}
return map;
}
private String getDicId(String dicReason,Map<String,String> map){
if(map.containsKey(dicReason)){
return map.get(dicReason);
}
return "";
}
} }

View File

@@ -3,6 +3,7 @@ package com.njcn.influx.job;
import com.njcn.influx.bo.param.TableEnum; import com.njcn.influx.bo.param.TableEnum;
import com.njcn.influx.service.OracleEventDetailToMysqlService; import com.njcn.influx.service.OracleEventDetailToMysqlService;
import com.njcn.influx.service.OracleToInfluxDBService; import com.njcn.influx.service.OracleToInfluxDBService;
import com.njcn.influx.service.impl.OracleEventDetailToMysqlServiceImpl;
import com.njcn.oracle.bo.param.DataAsynParam; import com.njcn.oracle.bo.param.DataAsynParam;
import com.njcn.oracle.bo.param.ServiceTypeEnum; import com.njcn.oracle.bo.param.ServiceTypeEnum;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@@ -64,16 +65,16 @@ public class OracleToInfluxDBJob {
} }
/* @Scheduled(cron="0 55 23 * * ?") // @Scheduled(cron="0 55 23 * * ?")
public void executeEvent() { // public void executeEvent() {
DataAsynParam dataAsynParam = new DataAsynParam(); // // 获取当前时间
// 获取当前时间 // LocalDateTime now = LocalDateTime.now();
LocalDateTime end = LocalDateTime.now(); // // 减去一个小时
// LocalDateTime oneHourAgo = now.minusHours(1);
// 减去24时 // // 将分钟和秒设置为0
LocalDateTime begin = end.minusHours(24); // LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS);
// // 加上59分钟59秒
// LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59);
oracleEventDetailToMysqlService.eventBatch(begin,end); // oracleEventDetailToMysqlService.eventBatch(result,modifiedResult);
}*/ // }
} }