离线数据上传入库
This commit is contained in:
@@ -56,12 +56,20 @@ public interface DataParam {
|
||||
|
||||
String wlRecordPath = "test/";
|
||||
|
||||
//中断标志
|
||||
List<Integer> intrStr = Arrays.asList(0,26,40,54,68,82);
|
||||
//暂降标志
|
||||
List<Integer> dipStr = Arrays.asList(1,27,41,55,69,83);
|
||||
//暂升标志
|
||||
List<Integer> swlStr = Arrays.asList(13,36,50,64,78,92);
|
||||
//中断标志及ele_epd_pqd表中中断事件ID、NAME
|
||||
List<Short> intrStr = Arrays.asList((short)0,(short)26,(short)40,(short)54,(short)68,(short)82);
|
||||
String intrStrId = "84aed85e7be48d59fcd59e1c8c8622ed";
|
||||
String intrStrName = "Evt_Sys_IntrStr";
|
||||
|
||||
//暂降标志及ele_epd_pqd表中暂降事件ID
|
||||
List<Short> dipStr = Arrays.asList((short)1,(short)27,(short)41,(short)55,(short)69,(short)83);
|
||||
String dipStrId = "145c96b0e2d359f7c1c00312eeeacaec";
|
||||
String dipStrName = "Evt_Sys_DipStr";
|
||||
|
||||
//暂升标志及ele_epd_pqd表中暂升事件ID
|
||||
List<Short> swlStr = Arrays.asList((short)13,(short)36,(short)50,(short)64,(short)78,(short)92);
|
||||
String swlStrId = "f9be85a431084a729748eee786ee1450";
|
||||
String swlStrName = "Evt_Sys_SwlStr";
|
||||
|
||||
String wlRecordUpload = "Offline_Data_Upload";
|
||||
|
||||
|
||||
@@ -3,21 +3,34 @@ package com.njcn.csdevice.service.impl;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.csdevice.api.CsLineFeignClient;
|
||||
import com.njcn.csdevice.api.EquipmentFeignClient;
|
||||
import com.njcn.csdevice.constant.DataParam;
|
||||
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
|
||||
import com.njcn.csdevice.mapper.PortableOfflLogMapper;
|
||||
import com.njcn.csdevice.pojo.dto.CsEquipmentDeliveryDTO;
|
||||
import com.njcn.csdevice.pojo.po.PortableOfflLog;
|
||||
import com.njcn.csdevice.service.IPortableOfflLogService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.csdevice.param.UploadDataParam;
|
||||
import com.njcn.csharmonic.api.EventFeignClient;
|
||||
import com.njcn.csharmonic.api.OfflineDataUploadFeignClient;
|
||||
import com.njcn.csharmonic.offline.constant.OfflineConstant;
|
||||
import com.njcn.csharmonic.offline.log.vo.NewHeadTaglogbuffer;
|
||||
import com.njcn.csharmonic.offline.log.vo.NewTaglogbuffer;
|
||||
import com.njcn.csharmonic.offline.mincfg.tagComtradeCfg;
|
||||
import com.njcn.csharmonic.offline.vo.Response;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import com.njcn.influx.imapper.PqdDataMapper;
|
||||
import com.njcn.influx.pojo.po.cs.PqdData;
|
||||
import com.njcn.influx.query.InfluxQueryWrapper;
|
||||
import com.njcn.influx.utils.InfluxDbUtils;
|
||||
import com.njcn.oss.utils.FileStorageUtil;
|
||||
import com.njcn.system.api.DicDataFeignClient;
|
||||
import com.njcn.system.api.EleEvtFeignClient;
|
||||
import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.pojo.po.DictData;
|
||||
import com.njcn.system.pojo.po.EleEvtParm;
|
||||
import com.njcn.web.pojo.param.BaseParam;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -28,6 +41,8 @@ import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@@ -47,6 +62,16 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
|
||||
|
||||
private final OfflineDataUploadFeignClient offlineDataUploadFeignClient;
|
||||
|
||||
private final EleEvtFeignClient eleEvtFeignClient;
|
||||
|
||||
private final EventFeignClient eventFeignClient;
|
||||
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
private final CsLineFeignClient csLineFeignClient;
|
||||
|
||||
private final PqdDataMapper pqdDataMapper;
|
||||
|
||||
@Override
|
||||
public Page<PortableOfflLog> queryPage(BaseParam baseParam) {
|
||||
Page<PortableOfflLog> returnpage = new Page<> (baseParam.getPageNum(), baseParam.getPageSize ());
|
||||
@@ -95,6 +120,8 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
|
||||
}
|
||||
}
|
||||
|
||||
List<CsEquipmentDeliveryDTO> data1 = equipmentFeignClient.queryDeviceById(Stream.of(uploadDataParam.getDevId()).collect(Collectors.toList())).getData();
|
||||
|
||||
//min文件夹下的文件是否解析过
|
||||
boolean minFlag = true;
|
||||
//开始上传文件、记录上传日志、解析的文件结果入库
|
||||
@@ -115,40 +142,90 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
|
||||
//min解析结果较为特殊所有的文件解析后只有一条结果
|
||||
if(OfflineConstant.MIN.equals(response.getFilename()) && checkPrevsFolder(file.getOriginalFilename(),OfflineConstant.MIN)!=null){
|
||||
List<PqdData> pqdData = (List<PqdData>) response.getObj();
|
||||
//如果明确返回了state 那么当前文件解析出错
|
||||
if(response.getState() != null){
|
||||
portableOfflLog.setState(response.getState());
|
||||
}
|
||||
if(pqdData != null && !pqdData.isEmpty()){
|
||||
//否则正常标记为成功解析
|
||||
portableOfflLog.setState(1);
|
||||
portableOfflLog.setAllCount(pqdData.size());
|
||||
portableOfflLog.setRealCount(pqdData.size());
|
||||
if(minFlag){
|
||||
//高频谐波数据入库
|
||||
//pqdDataMapper.insertBatch(pqdData);
|
||||
//min结果集解析入库后就不需要在解析了
|
||||
minFlag = false;
|
||||
portableOfflLog.setRealCount(pqdData.size());
|
||||
}
|
||||
}
|
||||
}else if(file.getOriginalFilename().equals(response.getFilename())){
|
||||
//判断当前解析的数据属于哪种结果(目前来说三种:comtrade、log、min)
|
||||
if(checkPrevsFolder(file.getOriginalFilename(),OfflineConstant.COMTRADE)!=null){
|
||||
//判断当前解析的数据属于哪种结果(目前来说三种:comtrade、log、min(除外))
|
||||
if(checkPrevsFolder(file.getOriginalFilename(),OfflineConstant.COMTRADE)!=null){ //cfg
|
||||
//如果明确返回了state 那么当前文件解析出错
|
||||
if(response.getState() != null){
|
||||
portableOfflLog.setState(response.getState());
|
||||
}
|
||||
tagComtradeCfg tagComtradeCfg = (tagComtradeCfg) response.getObj();
|
||||
if(tagComtradeCfg != null){
|
||||
//否则正常标记为成功解析
|
||||
portableOfflLog.setState(1);
|
||||
portableOfflLog.setAllCount(1);
|
||||
portableOfflLog.setRealCount(1);
|
||||
//cfg入库
|
||||
}
|
||||
}
|
||||
if(checkPrevsFolder(file.getOriginalFilename(),OfflineConstant.LOG)!=null){
|
||||
if(checkPrevsFolder(file.getOriginalFilename(),OfflineConstant.LOG)!=null){ //事件
|
||||
//如果明确返回了state 那么当前文件解析出错
|
||||
if(response.getState() != null){
|
||||
portableOfflLog.setState(response.getState());
|
||||
}
|
||||
List<NewTaglogbuffer> newTaglogbuffers = (List<NewTaglogbuffer>) response.getObj();
|
||||
if(newTaglogbuffers != null && !newTaglogbuffers.isEmpty()){
|
||||
//否则正常标记为成功解析
|
||||
portableOfflLog.setState(1);
|
||||
portableOfflLog.setAllCount(newTaglogbuffers.size());
|
||||
portableOfflLog.setRealCount(newTaglogbuffers.size());
|
||||
|
||||
List<CsEventPO> csEventPOS = new ArrayList<>();
|
||||
//事件解析入库
|
||||
for(NewTaglogbuffer newTaglogbuffer : newTaglogbuffers){
|
||||
NewHeadTaglogbuffer newHeadTaglogbuffer = newTaglogbuffer.getNewHeadTaglogbuffer();
|
||||
String strId = "";
|
||||
String strName = "";
|
||||
//中断标志
|
||||
if(DataParam.intrStr.contains(newHeadTaglogbuffer.getLogCode())){
|
||||
strId = DataParam.intrStrId;
|
||||
strName = DataParam.intrStrName;
|
||||
}
|
||||
//暂降标志
|
||||
if(DataParam.dipStr.contains(newHeadTaglogbuffer.getLogCode())){
|
||||
strId = DataParam.dipStrId;
|
||||
strName = DataParam.dipStrName;
|
||||
}
|
||||
//暂升标志
|
||||
if(DataParam.swlStr.contains(newHeadTaglogbuffer.getLogCode())){
|
||||
strId = DataParam.swlStrId;
|
||||
strName = DataParam.swlStrName;
|
||||
}
|
||||
List<EleEvtParm> eleEvtParms = eleEvtFeignClient.queryByPid(strId).getData();
|
||||
//插入事件表
|
||||
CsEventPO csEventPO = new CsEventPO();
|
||||
csEventPO.setId(IdUtil.simpleUUID());
|
||||
csEventPO.setLineId(uploadDataParam.getLineId());
|
||||
csEventPO.setDeviceId(uploadDataParam.getDevId());
|
||||
csEventPO.setStartTime(newTaglogbuffer.getStart());
|
||||
csEventPO.setTag(strName);
|
||||
//默认暂态事件
|
||||
csEventPO.setType(0);
|
||||
csEventPO.setClDid(getClDidByLineId(uploadDataParam.getLineId()));
|
||||
//默认告警等级2
|
||||
csEventPO.setLevel(2);
|
||||
//默认事件发生位置:电网侧
|
||||
csEventPO.setLocation("grid");
|
||||
csEventPO.setProcess(data1.get(0).getProcess());
|
||||
csEventPOS.add(csEventPO);
|
||||
}
|
||||
//eventFeignClient.saveBatchEventList(csEventPOS);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,6 +234,23 @@ public class PortableOfflLogServiceImpl extends ServiceImpl<PortableOfflLogMappe
|
||||
}
|
||||
}
|
||||
|
||||
private Integer getClDidByLineId(String lineId){
|
||||
String position = csLineFeignClient.getPositionById(lineId).getData();
|
||||
if (Objects.isNull(position)){
|
||||
throw new BusinessException(AlgorithmResponseEnum.POSITION_ERROR);
|
||||
}
|
||||
String clDid = null;
|
||||
String areaCode = dicDataFeignClient.getDicDataById(position).getData().getCode();
|
||||
if (Objects.equals(areaCode, DicDataEnum.OUTPUT_SIDE.getCode())){
|
||||
clDid = "0";
|
||||
} else if (Objects.equals(areaCode, DicDataEnum.GRID_SIDE.getCode())){
|
||||
clDid = "1";
|
||||
} else if (Objects.equals(areaCode, DicDataEnum.LOAD_SIDE.getCode())){
|
||||
clDid = "2";
|
||||
}
|
||||
return clDid == null ? null : Integer.parseInt(clDid);
|
||||
}
|
||||
|
||||
//根据文件全路径(包含文件夹)解析文件的分类
|
||||
private String checkPrevsFolder(String filePath,String type){
|
||||
List<String> paths = Arrays.asList(filePath.split("/"));
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.njcn.csharmonic.api;
|
||||
|
||||
import com.njcn.common.pojo.constant.ServerInfo;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.csharmonic.api.fallback.EventFeignClientFallbackFactory;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author gfh
|
||||
*/
|
||||
@FeignClient(value = ServerInfo.CS_HARMONIC_BOOT, path = "/event", fallbackFactory = EventFeignClientFallbackFactory.class,contextId = "event")
|
||||
public interface EventFeignClient {
|
||||
|
||||
@PostMapping("/saveBatchEventList")
|
||||
HttpResult<String> saveBatchEventList(@RequestBody List<CsEventPO> csEventPOS);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.njcn.csharmonic.api.fallback;
|
||||
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.csharmonic.api.EventFeignClient;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import feign.hystrix.FallbackFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author gfh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class EventFeignClientFallbackFactory implements FallbackFactory<EventFeignClient> {
|
||||
@Override
|
||||
public EventFeignClient create(Throwable cause) {
|
||||
//判断抛出异常是否为解码器抛出的业务异常
|
||||
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
|
||||
if (cause.getCause() instanceof BusinessException) {
|
||||
BusinessException businessException = (BusinessException) cause.getCause();
|
||||
}
|
||||
Enum<?> finalExceptionEnum = exceptionEnum;
|
||||
return new EventFeignClient() {
|
||||
@Override
|
||||
public HttpResult<String> saveBatchEventList(List<CsEventPO> csEventPOS) {
|
||||
log.error("{}异常,降级处理,异常为:{}","获取当天事件未读消息未读消息",cause.toString());
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.HttpResultUtil;
|
||||
import com.njcn.csharmonic.param.CsEventUserQueryParam;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import com.njcn.csharmonic.pojo.vo.CsEventVO;
|
||||
import com.njcn.csharmonic.pojo.vo.EventDetailVO;
|
||||
import com.njcn.csharmonic.service.CsEventPOService;
|
||||
@@ -66,7 +67,13 @@ public class EventController extends BaseController {
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, eventVO, methodDescribe);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@GetMapping("/saveBatchEventList")
|
||||
@ApiOperation("批量新增事件")
|
||||
@ApiImplicitParam(name = "csEventPOS", value = "事件集合", required = true)
|
||||
public HttpResult<String> saveBatchEventList(@RequestBody List<CsEventPO> csEventPOS) {
|
||||
String methodDescribe = getMethodDescribe("saveBatchEventList");
|
||||
csEventPOService.saveBatchEventList(csEventPOS);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,4 +39,6 @@ public interface CsEventPOService extends IService<CsEventPO>{
|
||||
CsEventVO getWavePics(String eventId, int i);
|
||||
|
||||
List<EventDetailVO> queryEventList(CsEventUserQueryParam csEventUserQueryParam);
|
||||
|
||||
void saveBatchEventList(List<CsEventPO> csEventPOS);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.csharmonic.mapper.CsEventPOMapper;
|
||||
import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import com.njcn.csharmonic.service.CsEventPOService;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -110,6 +111,12 @@ public class CsEventPOServiceImpl extends ServiceImpl<CsEventPOMapper, CsEventPO
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void saveBatchEventList(List<CsEventPO> csEventPOS) {
|
||||
this.saveBatch(csEventPOS);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return WaveDataDTO
|
||||
|
||||
Reference in New Issue
Block a user