治理暂态事件、文件解析功能

This commit is contained in:
2023-09-07 20:37:25 +08:00
parent 58feaf2aa0
commit d537021ffd
37 changed files with 1466 additions and 179 deletions

View File

@@ -0,0 +1,57 @@
package com.njcn.zlevent.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.web.controller.BaseController;
import com.njcn.zlevent.service.IFileService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/6 9:29
*/
@Slf4j
@RestController
@RequestMapping("/file")
@Api(tags = "文件处理")
@AllArgsConstructor
public class FileController extends BaseController {
private final IFileService fileService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/fileInfo")
@ApiOperation("文件信息")
@ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true)
public HttpResult<String> fileInfo(@RequestBody AppFileMessage appFileMessage){
String methodDescribe = getMethodDescribe("fileInfo");
fileService.analysisFileInfo(appFileMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/fileStream")
@ApiOperation("解析文件")
@ApiImplicitParam(name = "appFileMessage", value = "数据实体", required = true)
public HttpResult<String> fileStream(@RequestBody AppFileMessage appFileMessage){
String methodDescribe = getMethodDescribe("fileStream");
fileService.analysisFileStream(appFileMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -5,13 +5,16 @@ import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.web.controller.BaseController;
import com.njcn.zlevent.service.ICsWaveService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -29,12 +32,15 @@ import org.springframework.web.bind.annotation.RestController;
@AllArgsConstructor
public class WaveController extends BaseController {
private final ICsWaveService csWaveService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/analysis")
@ApiOperation("录波解析")
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
public HttpResult<String> analysis(){
@ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true)
public HttpResult<String> analysis(@RequestBody AppEventMessage appEventMessage){
String methodDescribe = getMethodDescribe("analysis");
csWaveService.analysis(appEventMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}

View File

@@ -1,7 +1,7 @@
package com.njcn.zlevent.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.zlevent.pojo.po.CsEvent;
import com.njcn.csharmonic.pojo.po.CsEventPO;
/**
* <p>
@@ -11,6 +11,6 @@ import com.njcn.zlevent.pojo.po.CsEvent;
* @author xuyang
* @since 2023-08-23
*/
public interface CsEventMapper extends BaseMapper<CsEvent> {
public interface CsEventMapper extends BaseMapper<CsEventPO> {
}

View File

@@ -1,16 +1,16 @@
package com.njcn.zlevent.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.zlevent.pojo.po.CsEventDetail;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
/**
* <p>
* 暂态事件详情 Mapper 接口
* 暂态事件表 Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-08-23
*/
public interface CsEventDetailMapper extends BaseMapper<CsEventDetail> {
public interface CsEventUserMapper extends BaseMapper<CsEventUserPO> {
}

View File

@@ -1,7 +1,8 @@
package com.njcn.zlevent.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.zlevent.pojo.po.CsEvent;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.param.CsEventParam;
/**
* <p>
@@ -11,6 +12,11 @@ import com.njcn.zlevent.pojo.po.CsEvent;
* @author xuyang
* @since 2023-08-23
*/
public interface ICsEventService extends IService<CsEvent> {
public interface ICsEventService extends IService<CsEventPO> {
/**
* 事件添加波形文件地址
*/
void updateCsEvent(CsEventParam csEventParam);
}

View File

@@ -1,16 +1,16 @@
package com.njcn.zlevent.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.zlevent.pojo.po.CsEventDetail;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
/**
* <p>
* 暂态事件详情 服务类
* 暂态事件表 服务类
* </p>
*
* @author xuyang
* @since 2023-08-23
*/
public interface ICsEventDetailService extends IService<CsEventDetail> {
public interface ICsEventUserService extends IService<CsEventUserPO> {
}

View File

@@ -0,0 +1,23 @@
package com.njcn.zlevent.service;
import com.njcn.mq.message.AppEventMessage;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/5 14:52
*/
public interface ICsWaveService {
/**
* 1.获取波形报文的文件名称
* 2.循环获取文件。将文件存储至文件服务器,获取文件路径
* 3.根据波形持续的时间,更新事件里面的暂态波形路径
* @param appEventMessage
*/
void analysis(AppEventMessage appEventMessage);
}

View File

@@ -9,6 +9,10 @@ public interface IEventService {
/**
* 解析事件数据
* 1.解析事件数据入库MySQL
* 2.解析模板数据入库influxDB
* 3.记录用户事件表,将状态默认置为未读取
* 4.查看用户推送的开关是否开启,推送已开启状态的用户
* @param
*/
void analysis(AppEventMessage appEventMessage);

View File

@@ -0,0 +1,29 @@
package com.njcn.zlevent.service;
import com.njcn.mq.message.AppFileMessage;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/6 9:31
*/
public interface IFileService {
/**
* 解析文件流之前需要获取文件的信息,可能要特殊处理
* 1.文件过大要分片获取(单次请求文件大小不超过50k)
* 2.校验文件(md5或者crc)
* @param appFileMessage
*/
void analysisFileInfo(AppFileMessage appFileMessage);
/**
* 获取文件流,解析文件
* @param appFileMessage
*/
void analysisFileStream(AppFileMessage appFileMessage);
}

View File

@@ -1,20 +0,0 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.zlevent.mapper.CsEventDetailMapper;
import com.njcn.zlevent.pojo.po.CsEventDetail;
import com.njcn.zlevent.service.ICsEventDetailService;
import org.springframework.stereotype.Service;
/**
* <p>
* 暂态事件详情表 服务实现类
* </p>
*
* @author xuyang
* @since 2023-08-23
*/
@Service
public class CsEventDetailServiceImpl extends ServiceImpl<CsEventDetailMapper, CsEventDetail> implements ICsEventDetailService {
}

View File

@@ -1,8 +1,11 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.mapper.CsEventMapper;
import com.njcn.zlevent.pojo.po.CsEvent;
import com.njcn.zlevent.param.CsEventParam;
import com.njcn.zlevent.service.ICsEventService;
import org.springframework.stereotype.Service;
@@ -15,6 +18,15 @@ import org.springframework.stereotype.Service;
* @since 2023-08-23
*/
@Service
public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEvent> implements ICsEventService {
public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> implements ICsEventService {
@Override
public void updateCsEvent(CsEventParam csEventParam) {
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
.eq(CsEventPO::getType,0)
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
this.update(lambdaUpdateWrapper);
}
}

View File

@@ -0,0 +1,21 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
import com.njcn.zlevent.mapper.CsEventUserMapper;
import com.njcn.zlevent.service.ICsEventUserService;
import org.springframework.stereotype.Service;
/**
* <p>
* 暂态事件表 服务实现类
* </p>
*
* @author xuyang
* @since 2023-08-23
*/
@Service
public class CsEventUserServiceImpl extends ServiceImpl<CsEventUserMapper, CsEventUserPO> implements ICsEventUserService {
}

View File

@@ -0,0 +1,160 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveService;
import lombok.AllArgsConstructor;
import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/5 14:52
*/
@Service
@AllArgsConstructor
public class CsWaveServiceImpl implements ICsWaveService {
private final EquipmentFeignClient equipmentFeignClient;
private final MqttPublisher publisher;
private final CsTopicFeignClient csTopicFeignClient;
private final RedisUtil redisUtil;
private final CsLineFeignClient csLineFeignClient;
private final DicDataFeignClient dicDataFeignClient;
@Override
public void analysis(AppEventMessage appEventMessage) {
int mid = 1;
//获取监测点
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//获取波形文件名称
List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray();
if (CollectionUtil.isNotEmpty(dataArrayList)){
for (AppEventMessage.DataArray item : dataArrayList) {
//处理mid
if (Objects.equals(mid,10000)){
mid = 1;
}
List<AppEventMessage.Param> paramList = item.getParam();
Object object = paramList.stream().filter(item2 -> "Wave_RcdName".equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> "Wave_RcdKeepTime".equals(item2.getName())).findFirst().get().getData();
String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
for (String file : fileList) {
file = file.trim();
askFileInfo(appEventMessage.getId(),mid,file);
mid++;
channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId);
}
}
}
}
/**
* 询问文件信息
*/
public void askFileInfo(String nDid, Integer mid, String fileName) {
String version = csTopicFeignClient.find(nDid).getData();
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_8.getCode()));
reqAndResParam.setExpire(-1);
String json = "{Name:\""+fileName+"\"}";
JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
}
/**
* 时间处理
*/
public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId) {
// 将startTime减去8小时8 * 3600秒
time -= 8 * 3600;
// 将millisecond转换为长整型并乘以1000以获取微秒
long millisecondValue = millisecond.longValue() * 1000;
// long millisecondValue = Long.parseLong(String.valueOf(millisecond))*1000;
// 计算最终时间
long finalTime = subtleTime + millisecondValue;
// 如果finalTime大于等于1000000将startTime增加1秒finalTime减去1000000
if (finalTime >= 1000000) {
time += 1;
finalTime -= 1000000;
}
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formatTime = format.format(time * 1000);
WaveTimeDto waveTimeDto = new WaveTimeDto();
waveTimeDto.setStartTime(formatTime + "." + subtleTime);
waveTimeDto.setEndTime(formatTime + "." + finalTime);
waveTimeDto.setDeviceId(deviceId);
waveTimeDto.setLineId(lineId);
redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,60L);
}
/**
* 缓存监测点相关信息
*/
public void lineInfo(String id) {
Map<Integer,String> map = new HashMap<>();
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
if (CollectionUtil.isEmpty(lineList)){
throw new BusinessException(StatResponseEnum.LINE_NULL);
}
for (CsLinePO item : lineList) {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.isNull(dictData)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L);
}
}

View File

@@ -4,8 +4,15 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
@@ -13,14 +20,16 @@ import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.po.CsEvent;
import com.njcn.zlevent.pojo.po.CsEventDetail;
import com.njcn.zlevent.service.ICsEventDetailService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsEventUserService;
import com.njcn.zlevent.service.IEventService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -28,6 +37,7 @@ import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 类的介绍:
@@ -51,16 +61,27 @@ public class EventServiceImpl implements IEventService {
private final ICsEventService csEventService;
private final ICsEventDetailService csEventDetailService;
private final EquipmentFeignClient equipmentFeignClient;
private final InfluxDbUtils influxDbUtils;
private final ICsEventUserService csEventUserService;
private final CsDeviceUserFeignClient csDeviceUserFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) {
List<CsEvent> list1 = new ArrayList<>();
List<CsEventDetail> list2 = new ArrayList<>();
List<CsEventPO> list1 = new ArrayList<>();
List<String> records = new ArrayList<String>();
List<CsEventUserPO> list2 = new ArrayList<>();
//获取监测点id
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
saveData();
}
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
@@ -69,36 +90,53 @@ public class EventServiceImpl implements IEventService {
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//处理事件数据
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) {
String id = IdUtil.fastSimpleUUID();
//事件入库
CsEvent csEvent = new CsEvent();
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setLineId(lineId);
csEvent.setName(item.getName());
csEvent.setShowName(epdFeignClient.findByName(item.getName()).getData().getShowName());
csEvent.setDeviceId(deviceId);
csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
List<AppEventMessage.Param> params = item.getParam();
for (AppEventMessage.Param item2 : params) {
CsEventDetail csEventDetail = new CsEventDetail();
csEventDetail.setPid(id);
csEventDetail.setName(item2.getName());
csEventDetail.setShowName(epdFeignClient.findByName(item2.getName()).getData().getShowName());
csEventDetail.setType(item2.getType());
csEventDetail.setUnit(item2.getUnit());
if (Objects.equals(item2.getName(),"Evt_VVaPhas")){
csEventDetail.setPhasic(item2.getData().toString());
} else {
csEventDetail.setData(Double.parseDouble(item2.getData().toString()));
}
list2.add(csEventDetail);
}
csEvent.setTag(item.getName());
csEvent.setType(0);
list1.add(csEvent);
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
String tableName = map.get(item.getName());
List<AppEventMessage.Param> params = item.getParam();
params.forEach(param->{
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
fields.put(param.getName(),param.getData());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
});
}
//事件用户关系入库
list2 = deviceUserList(deviceId,id);
}
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1);
}
//cs_device_user入库
if (CollectionUtil.isNotEmpty(list2)){
csEventUserService.saveBatch(list2);
}
//evt_data入库
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
csEventService.saveBatch(list1);
csEventDetailService.saveBatch(list2);
}
/**
@@ -126,16 +164,48 @@ public class EventServiceImpl implements IEventService {
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L);
}
/**
* 缓存字典和influxDB表关系
*/
public void saveData() {
Map<String,String> map = new HashMap<>();
List<EpdDTO> list = epdFeignClient.findAll().getData();
if (CollectionUtil.isEmpty(list)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
list.forEach(item->{
map.put(item.getDictName(),item.getTableName());
});
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
}
/**
* 时间转换
*/
public LocalDateTime timeFormat(Long time1, Long time2) {
//设置格式
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String timeText = format.format(time1 * 1000);
//todo 这边暂时先这样处理,减去8小时。
String timeText = format.format((time1-8*3600) * 1000);
String time = timeText + "." + time2;
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
return LocalDateTime.parse(time, fmt);
}
/**
* 获取用户设备关系
*/
public List<CsEventUserPO> deviceUserList(String devId, String id) {
List<CsEventUserPO> result = new ArrayList<>();
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
list.forEach(item->{
CsEventUserPO csEventUser = new CsEventUserPO();
csEventUser.setUserId(item);
csEventUser.setStatus(0);
csEventUser.setEventId(id);
result.add(csEventUser);
});
return result;
}
}

View File

@@ -0,0 +1,282 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.oss.constant.GeneralConstant;
import com.njcn.oss.constant.OssPath;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.zlevent.param.CsEventParam;
import com.njcn.zlevent.pojo.dto.FileInfoDto;
import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.IFileService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/6 9:32
*/
@Service
@AllArgsConstructor
@Slf4j
public class FileServiceImpl implements IFileService {
private final RedisUtil redisUtil;
private final CsTopicFeignClient csTopicFeignClient;
private final MqttPublisher publisher;
private final FileStorageUtil fileStorageUtil;
private final ICsEventService csEventService;
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
int mid = 1;
int range = 51200;
Integer fileSize = appFileMessage.getMsg().getFileInfo().getFileSize();
String fileName = appFileMessage.getMsg().getFileInfo().getName();
//缓存文件信息用于文件流拼接
FileInfoDto fileInfoDto = new FileInfoDto();
//文件流请求 判断文件大小是否需要分片请求,单次文件大小为50k
if (fileSize <= range){
askFileStream(appFileMessage.getId(),mid,fileName,0,fileSize);
fileInfoDto.setNumber(1);
} else {
int total = (int)Math.ceil(fileSize*1.0/range) ;
for (int i = 0; i < total; i++) {
askFileStream(appFileMessage.getId(),mid,fileName,i*range,range-1);
mid++;
}
fileInfoDto.setNumber(mid-1);
}
//获取波形文件起始结束时间
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.TIME+fileName);
if (Objects.isNull(fileInfo)){
throw new BusinessException(AccessResponseEnum.WAVE_INFO_MISSING);
}
WaveTimeDto waveTimeDto = JSON.parseObject(JSON.toJSONString(fileInfo), WaveTimeDto.class);
fileInfoDto.setStartTime(waveTimeDto.getStartTime());
fileInfoDto.setEndTime(waveTimeDto.getEndTime());
fileInfoDto.setDeviceId(waveTimeDto.getDeviceId());
fileInfoDto.setLineId(waveTimeDto.getLineId());
fileInfoDto.setName(appFileMessage.getMsg().getFileInfo().getName());
fileInfoDto.setFileTime(appFileMessage.getMsg().getFileInfo().getFileTime());
fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto, 3600L);
redisUtil.delete(AppRedisKey.TIME+fileName);
} else {
throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void analysisFileStream(AppFileMessage appFileMessage) {
//todo 目前文件先只处理暂态事件的,后续有其他文件再做处理
String fileName = appFileMessage.getMsg().getName();
if(fileName.contains(".cfg") || fileName.contains(".dat")) {
FileStreamDto fileStreamDto = new FileStreamDto();
String filePath;
Map<Integer,String> map = new HashMap<>();
//获取缓存的文件信息
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class);
//文件流
Object object = redisUtil.getObjectByKey(fileName);
/*
* 文件解析存储逻辑
* 1.如果文件只有1帧那就直接解析文件流
* 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件
*/
if (Objects.isNull(object)){
//第一次录入
if(fileInfoDto.getNumber() == 1) {
//直接解析文件
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId());
log.info(fileName + "解析成功");
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
correlateEvents(fileInfoDto,filePath);
} else {
//缓存文件
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
fileStreamDto.setMap(map);
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
}
} else {
//分帧传递数据,需要校验收到的文件个数
fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class);
Map<Integer,String> l1 = fileStreamDto.getMap();
l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
if (l1.size() == fileInfoDto.getNumber()){
//对数据进行排序
// 将Map的Entry集合转换成List
List<Map.Entry<Integer, String>> entryList = new ArrayList<>(l1.entrySet());
// 使用Comparator按Key进行排序
entryList.sort(new Comparator<Map.Entry<Integer, String>>() {
@Override
public int compare(Map.Entry<Integer, String> entry1, Map.Entry<Integer, String> entry2) {
return entry1.getKey().compareTo(entry2.getKey());
}
});
//解析文件
filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId());
log.info(fileName + "解析成功");
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
correlateEvents(fileInfoDto,filePath);
} else {
//缓存
fileStreamDto = new FileStreamDto();
fileStreamDto.setMap(l1);
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
}
}
//记录日志
} else {
//todo 处理其他文件
log.info("暂未做其他文件处理");
}
}
/**
* 请求文件流信息
*/
public void askFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) {
String version = csTopicFeignClient.find(nDid).getData();
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode()));
reqAndResParam.setExpire(-1);
String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}";
JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
}
/**
* 组装文件
*/
public String fileStream(Integer number, Map<Integer,String> map, String data, String fileName, String nDid) {
String filePath;
if (number == 1){
filePath = stream(true,data,nDid,fileName,null);
} else {
int lengthByte = 0;
for (int i = 1; i <= number; i++) {
byte[] byteArray = Base64.getDecoder().decode(map.get(i));
lengthByte += byteArray.length;
}
byte[] allByte = new byte[lengthByte];
int countLength = 0;
for (int i = 1; i <= number; i++) {
byte[] byteArray = Base64.getDecoder().decode(map.get(i));
System.arraycopy(byteArray, 0, allByte, countLength, byteArray.length);
countLength += byteArray.length;
}
filePath = stream(false,null,nDid,fileName,allByte);
}
return filePath;
}
/**
* 解析存储文件信息
*/
public String stream(boolean bool, String stream, String folder, String fileName, byte[] bytes) {
byte[] byteArray = null;
//将文件后缀替换成大写
String[] parts = fileName.split(StrUtil.SLASH);
fileName = parts[parts.length - 1].replaceAll(".cfg", GeneralConstant.CFG).replaceAll(".dat",GeneralConstant.DAT);
//处理文件层级
folder = createPath(folder);
//解析二进制流成byte数组
if (bool){
byteArray = Base64.getDecoder().decode(stream);
} else {
byteArray = bytes;
}
InputStream inputStream = new ByteArrayInputStream(byteArray);
String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName);
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
return path;
}
/**
* 组装文件路径
*/
public String createPath(String nDid) {
// 使用StringBuilder来构建新的字符串
StringBuilder output = new StringBuilder();
// 遍历输入字符串,每两个字符分割并添加冒号
for (int i = 0; i < nDid.length(); i += 2) {
if (i > 0) {
//添加横线分隔符
output.append(StrPool.DASHED);
}
if (i + 1 < nDid.length()) {
// 检查是否有足够的字符来分割成两组
// 每两个字符添加到新字符串
output.append(nDid.substring(i, i + 2));
} else {
// 如果只剩一个字符,可以根据需要添加处理逻辑
output.append(nDid.charAt(i));
}
}
return output.toString();
}
/**
* 波形文件关联事件
*/
public void correlateEvents(FileInfoDto fileInfoDto,String path) {
CsEventParam csEventParam = new CsEventParam();
csEventParam.setLineId(fileInfoDto.getLineId());
csEventParam.setDeviceId(fileInfoDto.getDeviceId());
csEventParam.setStartTime(fileInfoDto.getStartTime());
csEventParam.setEndTime(fileInfoDto.getEndTime());
csEventParam.setPath(path);
csEventService.updateCsEvent(csEventParam);
}
}