事件、波形文件解析功能调整

This commit is contained in:
2023-09-26 20:31:56 +08:00
parent 4808106429
commit b76aed1a56
16 changed files with 474 additions and 189 deletions

View File

@@ -37,4 +37,8 @@ public class AlmDto implements Serializable {
@ApiModelProperty("相别")
private String phase;
@SerializedName("Code")
@ApiModelProperty("告警编码")
private String code;
}

View File

@@ -208,6 +208,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setPhase(apf.getPhase());
}
eleEpdPqdParam.setClassId(classId);
if (!Objects.isNull(apf.getHarmStart())){
if (Objects.equals(apf.getHarmStart(),0.5) && Objects.equals(apf.getHarmEnd(),49.5)){
eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()+49.5));
@@ -215,6 +216,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setHarmStart((int)(apf.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(apf.getHarmEnd()*1.0));
}
}
result.add(eleEpdPqdParam);
}
});
@@ -265,6 +267,12 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEvtParam.setPid(dictId);
eleEvtParam.setData(param.getData());
eleEvtParam.setName(param.getName());
String showName = dataSetName(param.getName());
if (Objects.isNull(showName)){
eleEvtParam.setShowName(param.getName());
} else {
eleEvtParam.setShowName(showName);
}
eleEvtParam.setType(param.getType());
eleEvtParam.setUnit(param.getUnit());
eleEvtFeignClient.add(eleEvtParam);
@@ -287,6 +295,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setShowName(alm.getName());
eleEpdPqdParam.setSort(alm.getIdx());
eleEpdPqdParam.setDataType(id);
//告警code到时候推送给用户告警码+事件时间
eleEpdPqdParam.setDefaultValue(alm.getCode());
if (Objects.isNull(alm.getPhase())){
eleEpdPqdParam.setPhase("M");
} else {
@@ -437,6 +447,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setPhase(epd.getPhase());
}
eleEpdPqdParam.setUnit(epd.getUnit());
if (!Objects.isNull(epd.getHarmStart())){
if (Objects.equals(epd.getHarmStart(),0.5) && Objects.equals(epd.getHarmEnd(),49.5)){
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()+49.5));
@@ -444,6 +455,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmEnd()*1.0));
}
}
eleEpdPqdParam.setStatMethod(epd.getStatMethod());
eleEpdPqdParam.setDataType(id);
eleEpdPqdParam.setClassId(classId);
@@ -471,6 +483,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setPhase(pqd.getPhase());
}
eleEpdPqdParam.setUnit(pqd.getUnit());
if (!Objects.isNull(pqd.getHarmStart())){
if (Objects.equals(pqd.getHarmStart(),0.5) && Objects.equals(pqd.getHarmEnd(),49.5)){
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()+49.5));
@@ -478,6 +491,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmEnd()*1.0));
}
}
eleEpdPqdParam.setDataType(id);
eleEpdPqdParam.setClassId(classId);
result.add(eleEpdPqdParam);
@@ -633,7 +647,12 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
CsWaveParam csWaveParam = new CsWaveParam();
csWaveParam.setPid(dictId);
csWaveParam.setName(param.getName());
String showName = dataSetName(param.getName());
if (Objects.isNull(showName)){
csWaveParam.setShowName(param.getName());
} else {
csWaveParam.setShowName(showName);
}
csWaveParam.setType(param.getType());
csWaveParam.setData(param.getData().toString());
waveFeignClient.add(csWaveParam);
@@ -1008,6 +1027,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
public String dataSetName(String name){
String showName = null;
switch (name) {
//数据集
case "Ds$Apf$Master$01":
showName = "主模块数据";
break;
@@ -1023,12 +1043,74 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
case "Ds$Apf$module$04":
showName = "模块4数据";
break;
case "Ds$Apf$module$05":
showName = "模块5数据";
break;
case "Ds$Apf$module$06":
showName = "模块6数据";
break;
case "Ds$Apf$module$07":
showName = "模块7数据";
break;
case "Ds$Apf$module$08":
showName = "模块8数据";
break;
case "Ds$Pqd$Stat$01":
showName = "电网侧数据";
break;
case "Ds$Pqd$Stat$02":
showName = "负载侧数据";
break;
//波形参数名称
case "Wave_Param_Position":
showName = "录波记录位置";
break;
case "Wave_Param_RcdName":
showName = "录波记录名称";
break;
case "Wave_Param_RcdIdx":
showName = "录波记录序号";
break;
case "Wave_Param_RcdKeepTime":
showName = "录波文件持续时间";
break;
//暂态事件参数名称
case "Evt_Param_Position":
showName = "事件发生位置";
break;
case "Evt_Param_VVa":
showName = "特征幅值";
break;
case "Evt_Param_VVaDepth":
showName = "暂降深度";
break;
case "Evt_Param_Tm":
showName = "持续时间";
break;
case "Evt_Param_Phase":
showName = "相别";
break;
case "Evt_Param_Rms":
showName = "有效值";
break;
case "Evt_Param_UDev":
showName = "电压偏差";
break;
case "Evt_Param_Freq":
showName = "频率";
break;
case "Evt_Param_Thd":
showName = "畸变率";
break;
case "Evt_Param_Con":
showName = "含有率";
break;
case "Evt_Param_ImbNg":
showName = "不平衡度";
break;
case "Evt_Param_Flk":
showName = "闪变";
break;
default:
break;
}

View File

@@ -24,4 +24,6 @@ public class CsEventParam implements Serializable {
private String path;
private String location;
}

View File

@@ -0,0 +1,41 @@
package com.njcn.zlevent.pojo.constant;
/**
* 字典通用常量
* @author xuyang
*/
public interface ZlConstant {
/**
* 事件发生位置
*/
String EVENT_POSITION = "Evt_Param_Position";
/**
* 波形文件位置
*/
String WAVE_POSITION = "Wave_Param_Position";
/**
* 负载侧
*/
String LOAD = "load";
/**
* 电网侧
*/
String GRID = "grid";
/**
* 波形文件名称
*/
String WAVE_NAME = "Wave_Param_RcdName";
/**
* 波形持续时间
*/
String WAVE_PARAM_RCDKEEPTIME = "Wave_Param_RcdKeepTime";
}

View File

@@ -34,4 +34,6 @@ public class FileInfoDto {
private String lineId;
private String location;
}

View File

@@ -32,7 +32,7 @@ public class NoticeUserDto implements Serializable {
@Data
public static class Payload implements Serializable {
@ApiModelProperty("事件类型 0:设备运行事件 1:态事件 2:稳态事件 3:设备告警")
@ApiModelProperty("事件类型 0:暂态事件 1:态事件 2:设备事件 3:设备告警")
@ParamName("type")
private Integer type;
}

View File

@@ -20,4 +20,6 @@ public class WaveTimeDto {
private String endTime;
private String location;
}

View File

@@ -63,5 +63,9 @@ public class CsEventFileLogs {
*/
private LocalDateTime endTime;
/**
* 位置信息(grid:电网侧load:负载侧)
*/
private String location;
}

View File

@@ -78,9 +78,10 @@
<artifactId>cs-harmonic-api</artifactId>
<version>${project.version}</version>
</dependency>
<!--pqs-influx-->
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-influxDB</artifactId>
<artifactId>pqs-influx</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
/**
@@ -13,6 +14,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
* @date 2021年12月09日 20:59
*/
@Slf4j
@DependsOn("proxyMapperRegister")
@MapperScan("com.njcn.**.mapper")
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")

View File

@@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.param.CsEventParam;
import java.util.List;
/**
* <p>
* 暂态事件表 服务类
@@ -17,6 +19,6 @@ public interface ICsEventService extends IService<CsEventPO> {
/**
* 事件添加波形文件地址
*/
void updateCsEvent(CsEventParam csEventParam);
List<String> updateCsEvent(CsEventParam csEventParam);
}

View File

@@ -1,12 +1,20 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.mapper.CsEventMapper;
import com.njcn.zlevent.param.CsEventParam;
import com.njcn.zlevent.service.ICsEventService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
@@ -17,15 +25,25 @@ import org.springframework.stereotype.Service;
* @since 2023-08-23
*/
@Service
@AllArgsConstructor
public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> implements ICsEventService {
@Override
public void updateCsEvent(CsEventParam csEventParam) {
@Transactional(rollbackFor = Exception.class)
public List<String> updateCsEvent(CsEventParam csEventParam) {
List<String> eventList = new ArrayList<>();
//1.将波形文件关联事件
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
.eq(CsEventPO::getType,0)
.in(CsEventPO::getType, Arrays.asList(0,1))
.eq(CsEventPO::getLocation,csEventParam.getLocation())
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
this.update(lambdaUpdateWrapper);
List<CsEventPO> list = this.baseMapper.selectList(lambdaUpdateWrapper);
if (CollectionUtil.isNotEmpty(list)){
eventList = list.stream().map(CsEventPO::getId).collect(Collectors.toList());
}
return eventList;
}
}

View File

@@ -18,6 +18,7 @@ import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveService;
import lombok.AllArgsConstructor;
@@ -60,11 +61,6 @@ public class CsWaveServiceImpl implements ICsWaveService {
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//获取波形文件名称
@@ -76,15 +72,23 @@ public class CsWaveServiceImpl implements ICsWaveService {
mid = 1;
}
List<AppEventMessage.Param> paramList = item.getParam();
Object object = paramList.stream().filter(item2 -> "Wave_RcdName".equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> "Wave_RcdKeepTime".equals(item2.getName())).findFirst().get().getData();
Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData();
Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData();
if (Objects.equals(object3.toString(),ZlConstant.GRID)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
} else if (Objects.equals(object3.toString(),ZlConstant.LOAD)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
} else {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
}
String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
for (String file : fileList) {
file = file.trim();
askFileInfo(appEventMessage.getId(),mid,file);
mid++;
channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId);
channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString());
}
}
}
@@ -110,7 +114,7 @@ public class CsWaveServiceImpl implements ICsWaveService {
/**
* 时间处理
*/
public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId) {
public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) {
long endTime;
// 将startTime减去8小时8 * 3600秒
time -= 8 * 3600;
@@ -133,6 +137,7 @@ public class CsWaveServiceImpl implements ICsWaveService {
waveTimeDto.setEndTime(formatTime2 + "." + finalTime);
waveTimeDto.setDeviceId(deviceId);
waveTimeDto.setLineId(lineId);
waveTimeDto.setLocation(location);
redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,60L);
}

View File

@@ -7,13 +7,11 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.api.EventLogsFeignClient;
import com.njcn.csdevice.pojo.po.CsEventSendMsg;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
@@ -23,16 +21,13 @@ import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsEventUserService;
import com.njcn.zlevent.service.IEventService;
import com.njcn.zlevent.utils.SendEventUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
@@ -40,21 +35,13 @@ import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.yaml.snakeyaml.scanner.Constant;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 类的介绍:
@@ -86,37 +73,27 @@ public class EventServiceImpl implements IEventService {
private final CsDeviceUserFeignClient csDeviceUserFeignClient;
private final AppInfoSetFeignClient appInfoSetFeignClient;
private final UserFeignClient userFeignClient;
private final EventLogsFeignClient eventLogsFeignClient;
private final ICsEventLogsService csEventLogsService;
private final SendEventUtils sendEventUtils;
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) {
//todo 这边到时候装置事件、暂态事件需要分开处理
List<CsEventPO> list1 = new ArrayList<>();
List<String> records = new ArrayList<String>();
List<CsEventUserPO> list2 = new ArrayList<>();
//获取监测点id
String lineId = null,id = null,tag = null;;
String lineId = null,id = null,tag = null;
LocalDateTime eventTime = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
saveData();
}
//判断监测点是否存在
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
if (Objects.equals(appEventMessage.getDid(),1)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
} else if (Objects.equals(appEventMessage.getDid(),2)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
try {
@@ -127,17 +104,21 @@ public class EventServiceImpl implements IEventService {
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
csEvent.setLineId(lineId);
csEvent.setDeviceId(deviceId);
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
if (Objects.equals(item.getType(),"2")){
csEvent.setType(0);
} else if (Objects.equals(item.getType(),"3")){
csEvent.setType(1);
} else if (Objects.equals(item.getType(),"1")){
csEvent.setType(2);
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
csEvent.setClDid(appEventMessage.getMsg().getClDid());
//todo 默认等级先设为1.后期根据事件再做调整
csEvent.setLevel(1);
list1.add(csEvent);
}
csEvent.setLevel(Integer.parseInt(item.getType()));
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
@@ -147,7 +128,20 @@ public class EventServiceImpl implements IEventService {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){
if (Objects.equals(param.getData(),ZlConstant.GRID)){
fields.put(param.getName(),"电网侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
csEvent.setClDid(1);
} else if (Objects.equals(param.getData(),ZlConstant.LOAD)){
fields.put(param.getName(),"负载侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
csEvent.setClDid(2);
}
csEvent.setLocation(param.getData().toString());
} else {
fields.put(param.getName(),param.getData());
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
@@ -155,6 +149,8 @@ public class EventServiceImpl implements IEventService {
records.add(batchPoints.lineProtocol());
}
}
csEvent.setLineId(lineId);
list1.add(csEvent);
//事件用户关系入库
list2 = deviceUserList(deviceId,id);
//事件处理日志库
@@ -179,29 +175,9 @@ public class EventServiceImpl implements IEventService {
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
//todo 根据不同事件需要做处理,目前先测试消息通知
List<User> userList = getEventUser(deviceId);
if (CollectionUtil.isNotEmpty(userList)){
List<CsEventSendMsg> csEventSendMsgList = new ArrayList<>();
List<String> devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
String content = appEventMessage.getId() + "" +eventTime+ "发生暂态事件";
sendEventToUser(devCodeList,"暂态事件",content,1);
//记录推送日志
for (User item : userList) {
CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
csEventSendMsg.setUserId(item.getId());
csEventSendMsg.setEventId(id);
csEventSendMsg.setSendTime(LocalDateTime.now());
if (Objects.isNull(item.getDevCode())){
csEventSendMsg.setStatus(0);
csEventSendMsg.setRemark("用户设备识别码为空");
} else {
csEventSendMsg.setDevCode(item.getDevCode());
csEventSendMsg.setStatus(1);
}
csEventSendMsgList.add(csEventSendMsg);
}
eventLogsFeignClient.addLogs(csEventSendMsgList);
//推送事件逻辑处理
for (AppEventMessage.DataArray item : dataArray) {
sendEventUtils.sendUser(1,item.getType(),deviceId,item.getName(),eventTime,appEventMessage.getId(),id);
}
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
@@ -289,78 +265,4 @@ public class EventServiceImpl implements IEventService {
});
return result;
}
/**
* 获取需要通知暂态事件的用户
*/
public List<User> getEventUser(String devId) {
List<User> users = new ArrayList<>();
List<String> result = new ArrayList<>();
List<String> devCode = new ArrayList<>();
//获取设备下主用户和子用户集合
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
//查询哪些用户打开了事件提示
if (CollectionUtil.isNotEmpty(list)){
List<AppInfoSet> appInfoSet = appInfoSetFeignClient.getListById(list).getData();
result = appInfoSet.stream()
.filter(person -> person.getEventInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
}
if (CollectionUtil.isNotEmpty(list)){
users = userFeignClient.getUserByIdList(result).getData();
}
return users;
}
/**
* 发送通知消息
*/
public void sendEventToUser(List<String> devCodeList, String title, String content, Integer type) {
try {
if (CollectionUtil.isNotEmpty(devCodeList)){
// 创建一个URL对象指定目标HTTPS接口地址
URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push");
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 设置请求头指定Content-Type为application/json
connection.setRequestProperty("Content-Type", "application/json");
// 启用输出流以发送JSON数据
connection.setDoOutput(true);
// 创建JSON
NoticeUserDto noticeUserDto = new NoticeUserDto();
noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setTitle(title);
noticeUserDto.setContent(content);
NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
payload.setType(type);
noticeUserDto.setPayload(payload);
// 将JSON数据写入输出流
OutputStream outputStream = connection.getOutputStream();
System.out.println("json==:" + new Gson().toJson(noticeUserDto));
outputStream.write(new Gson().toJson(noticeUserDto).getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
// 获取响应代码
int responseCode = connection.getResponseCode();
System.out.println("Response Code: " + responseCode);
// 读取响应数据
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuilder response = new StringBuilder();
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
// 打印响应内容
System.out.println("Response Content: ");
System.out.println(response.toString());
// 关闭连接
connection.disconnect();
}
} catch (IOException e) {
e.getMessage();
}
}
}

View File

@@ -1,5 +1,6 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
@@ -11,6 +12,7 @@ import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.oss.constant.GeneralConstant;
import com.njcn.oss.constant.OssPath;
@@ -35,10 +37,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* 类的介绍:
@@ -64,6 +63,8 @@ public class FileServiceImpl implements IFileService {
private final ICsEventFileLogsService csEventLogsService;
private final WavePicFeignClient wavePicFeignClient;
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
@@ -100,6 +101,7 @@ public class FileServiceImpl implements IFileService {
fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
fileInfoDto.setLocation(waveTimeDto.getLocation());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto, 3600L);
redisUtil.delete(AppRedisKey.TIME+fileName);
} else {
@@ -139,12 +141,13 @@ public class FileServiceImpl implements IFileService {
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
correlateEvents(fileInfoDto,filePath);
List<String> eventList = correlateEvents(fileInfoDto,filePath);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
csEventLogs.setCompleteTime(LocalDateTime.now());
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
} else {
//缓存文件
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
@@ -152,9 +155,6 @@ public class FileServiceImpl implements IFileService {
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
csEventLogs.setCompleteTime(LocalDateTime.now());
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
}
} else {
//分帧传递数据,需要校验收到的文件个数
@@ -167,12 +167,13 @@ public class FileServiceImpl implements IFileService {
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
correlateEvents(fileInfoDto,filePath);
List<String> eventList = correlateEvents(fileInfoDto,filePath);
//波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics);
}
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!");
csEventLogs.setCompleteTime(LocalDateTime.now());
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
} else {
//缓存
fileStreamDto = new FileStreamDto();
@@ -180,11 +181,12 @@ public class FileServiceImpl implements IFileService {
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
}
}
csEventLogs.setCompleteTime(LocalDateTime.now());
csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
}
}
csEventLogs.setLocation(fileInfoDto.getLocation());
//记录日志
csEventLogsService.save(csEventLogs);
} else {
@@ -200,6 +202,7 @@ public class FileServiceImpl implements IFileService {
}
}
/**
* 请求文件流信息
*/
@@ -258,8 +261,8 @@ public class FileServiceImpl implements IFileService {
} else {
byteArray = bytes;
}
InputStream inputStream = new ByteArrayInputStream(byteArray);
//todo 此处需要做文件crc校验或者md5校验目前不知道怎么处理先放一下
InputStream inputStream = new ByteArrayInputStream(byteArray);
String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName);
try {
inputStream.close();
@@ -296,13 +299,14 @@ public class FileServiceImpl implements IFileService {
/**
* 波形文件关联事件
*/
public void correlateEvents(FileInfoDto fileInfoDto,String path) {
public List<String> correlateEvents(FileInfoDto fileInfoDto, String path) {
CsEventParam csEventParam = new CsEventParam();
csEventParam.setLineId(fileInfoDto.getLineId());
csEventParam.setDeviceId(fileInfoDto.getDeviceId());
csEventParam.setStartTime(fileInfoDto.getStartTime());
csEventParam.setEndTime(fileInfoDto.getEndTime());
csEventParam.setPath(path);
csEventService.updateCsEvent(csEventParam);
csEventParam.setLocation(fileInfoDto.getLocation());
return csEventService.updateCsEvent(csEventParam);
}
}

View File

@@ -0,0 +1,214 @@
package com.njcn.zlevent.utils;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.EventLogsFeignClient;
import com.njcn.csdevice.pojo.po.CsEventSendMsg;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/25 16:08
*/
@Slf4j
@Component
public class SendEventUtils {
@Resource
private UserFeignClient userFeignClient;
@Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource
private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource
private EventLogsFeignClient eventLogsFeignClient;
@Resource
private EpdFeignClient epdFeignClient;
/**
* 事件推送给相关用户
* @param eventType 事件类型 1:事件 2:告警
* @param type 等级 事件分为设备事件、暂态事件、稳态事件 告警分为Ⅰ级告警、Ⅱ级告警、Ⅲ级告警
* @param devId 设备id
* @param eventName 事件名称
* @param eventTime 事件发生事件
* @param ndid 设备ndid
* @param id 事件id
*/
@Transactional(rollbackFor = Exception.class)
public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String ndid, String id) {
List<User> users = new ArrayList<>();
List<String> devCodeList = new ArrayList<>();
List<CsEventSendMsg> csEventSendMsgList = new ArrayList<>();
NoticeUserDto noticeUserDto = new NoticeUserDto();
NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
String content;
eventName = epdFeignClient.findByName(eventName).getData().getShowName();
//事件处理
if (eventType == 1){
switch (type) {
case "1":
//设备自身事件 不推送给用户,推送给业务管理
users = userFeignClient.getAdminInfo().getData();
noticeUserDto.setPushClientId(Collections.singletonList(users.get(0).getDevCode()));
noticeUserDto.setTitle("设备事件");
break;
case "2":
//暂态事件
users = getEventUser(devId);
devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setTitle("暂态事件");
break;
case "3":
//稳态事件
users = getEventUser(devId);
devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setTitle("稳态事件");
break;
default:
break;
}
content = ndid + "" +eventTime+ "发生" + eventName;
noticeUserDto.setContent(content);
payload.setType(Integer.parseInt(type));
noticeUserDto.setPayload(payload);
}
//告警处理
else if (eventType == 2){
switch (type) {
case "1":
//Ⅰ级告警 不推送给用户,推送给业务管理
users = userFeignClient.getAdminInfo().getData();
noticeUserDto.setPushClientId(Collections.singletonList(users.get(0).getDevCode()));
break;
case "2":
case "3":
//Ⅱ、Ⅲ级告警推送相关用户
users = getEventUser(devId);
devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
noticeUserDto.setPushClientId(devCodeList);
break;
default:
break;
}
noticeUserDto.setTitle("告警事件");
content = ndid + "" +eventTime+ "发生告警,告警故障编码为:" + eventName;
noticeUserDto.setContent(content);
payload.setType(4);
noticeUserDto.setPayload(payload);
}
sendEventToUser(noticeUserDto);
//记录推送日志
for (User item : users) {
CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
csEventSendMsg.setUserId(item.getId());
csEventSendMsg.setEventId(id);
csEventSendMsg.setSendTime(LocalDateTime.now());
if (Objects.isNull(item.getDevCode())){
csEventSendMsg.setStatus(0);
csEventSendMsg.setRemark("用户设备识别码为空");
} else {
csEventSendMsg.setDevCode(item.getDevCode());
csEventSendMsg.setStatus(1);
}
csEventSendMsgList.add(csEventSendMsg);
}
eventLogsFeignClient.addLogs(csEventSendMsgList);
}
/**
* 发送通知消息
*/
public void sendEventToUser(NoticeUserDto noticeUserDto) {
try {
// 创建一个URL对象指定目标HTTPS接口地址
URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push");
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 设置请求头指定Content-Type为application/json
connection.setRequestProperty("Content-Type", "application/json");
// 启用输出流以发送JSON数据
connection.setDoOutput(true);
// 将JSON数据写入输出流
OutputStream outputStream = connection.getOutputStream();
outputStream.write(new Gson().toJson(noticeUserDto).getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
// 获取响应代码
int responseCode = connection.getResponseCode();
log.info("Response Code: " + responseCode);
// 读取响应数据
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuilder response = new StringBuilder();
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
// 打印响应内容
log.info("Response Content: " + response.toString());
// 关闭连接
connection.disconnect();
} catch (IOException e) {
e.getMessage();
}
}
/**
* 获取需要通知暂态事件的用户
*/
public List<User> getEventUser(String devId) {
List<User> users = new ArrayList<>();
List<String> result = new ArrayList<>();
//获取设备下主用户和子用户集合
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
//查询哪些用户打开了事件提示
if (CollectionUtil.isNotEmpty(list)){
List<AppInfoSet> appInfoSet = appInfoSetFeignClient.getListById(list).getData();
result = appInfoSet.stream()
.filter(person -> person.getEventInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
}
if (CollectionUtil.isNotEmpty(list)){
users = userFeignClient.getUserByIdList(result).getData();
}
return users;
}
}