diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/ZlEventBootApplication.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/ZlEventBootApplication.java
index c5e3ae3..b94df08 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/ZlEventBootApplication.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/ZlEventBootApplication.java
@@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.context.annotation.DependsOn;
/**
@@ -13,6 +14,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
* @date 2021年12月09日 20:59
*/
@Slf4j
+@DependsOn("proxyMapperRegister")
@MapperScan("com.njcn.**.mapper")
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java
index 9a4f092..5996df2 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java
@@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.param.CsEventParam;
+import java.util.List;
+
/**
*
* 暂态事件表 服务类
@@ -17,6 +19,6 @@ public interface ICsEventService extends IService {
/**
* 事件添加波形文件地址
*/
- void updateCsEvent(CsEventParam csEventParam);
+ List updateCsEvent(CsEventParam csEventParam);
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java
index 22991e7..f13f759 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java
@@ -1,12 +1,20 @@
package com.njcn.zlevent.service.impl;
+import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.zlevent.mapper.CsEventMapper;
import com.njcn.zlevent.param.CsEventParam;
import com.njcn.zlevent.service.ICsEventService;
+import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
*
@@ -17,15 +25,25 @@ import org.springframework.stereotype.Service;
* @since 2023-08-23
*/
@Service
+@AllArgsConstructor
public class CsEventServiceImpl extends ServiceImpl implements ICsEventService {
@Override
- public void updateCsEvent(CsEventParam csEventParam) {
+ @Transactional(rollbackFor = Exception.class)
+ public List updateCsEvent(CsEventParam csEventParam) {
+ List eventList = new ArrayList<>();
+ //1.将波形文件关联事件
LambdaUpdateWrapper lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
- .eq(CsEventPO::getType,0)
+ .in(CsEventPO::getType, Arrays.asList(0,1))
+ .eq(CsEventPO::getLocation,csEventParam.getLocation())
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
this.update(lambdaUpdateWrapper);
+ List list = this.baseMapper.selectList(lambdaUpdateWrapper);
+ if (CollectionUtil.isNotEmpty(list)){
+ eventList = list.stream().map(CsEventPO::getId).collect(Collectors.toList());
+ }
+ return eventList;
}
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
index cdf2117..056bfac 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java
@@ -18,6 +18,7 @@ import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
+import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveService;
import lombok.AllArgsConstructor;
@@ -60,11 +61,6 @@ public class CsWaveServiceImpl implements ICsWaveService {
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
- if (Objects.equals(appEventMessage.getDid(),1)){
- lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
- } else if (Objects.equals(appEventMessage.getDid(),2)){
- lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
- }
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
//获取波形文件名称
@@ -76,15 +72,23 @@ public class CsWaveServiceImpl implements ICsWaveService {
mid = 1;
}
List paramList = item.getParam();
- Object object = paramList.stream().filter(item2 -> "Wave_RcdName".equals(item2.getName())).findFirst().get().getData();
- Object object2 = paramList.stream().filter(item2 -> "Wave_RcdKeepTime".equals(item2.getName())).findFirst().get().getData();
+ Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData();
+ Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData();
+ Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData();
+ if (Objects.equals(object3.toString(),ZlConstant.GRID)){
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
+ } else if (Objects.equals(object3.toString(),ZlConstant.LOAD)){
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
+ } else {
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
+ }
String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
for (String file : fileList) {
file = file.trim();
askFileInfo(appEventMessage.getId(),mid,file);
mid++;
- channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId);
+ channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString());
}
}
}
@@ -110,7 +114,7 @@ public class CsWaveServiceImpl implements ICsWaveService {
/**
* 时间处理
*/
- public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId) {
+ public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) {
long endTime;
// 将startTime减去8小时(8 * 3600秒)
time -= 8 * 3600;
@@ -133,6 +137,7 @@ public class CsWaveServiceImpl implements ICsWaveService {
waveTimeDto.setEndTime(formatTime2 + "." + finalTime);
waveTimeDto.setDeviceId(deviceId);
waveTimeDto.setLineId(lineId);
+ waveTimeDto.setLocation(location);
redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,60L);
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
index c96cda0..0c7492c 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
@@ -7,13 +7,11 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
-import com.njcn.csdevice.api.EventLogsFeignClient;
-import com.njcn.csdevice.pojo.po.CsEventSendMsg;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
-import com.njcn.influxdb.utils.InfluxDbUtils;
+import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
@@ -23,16 +21,13 @@ import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
-import com.njcn.user.api.AppInfoSetFeignClient;
-import com.njcn.user.api.UserFeignClient;
-import com.njcn.user.pojo.po.User;
-import com.njcn.user.pojo.po.app.AppInfoSet;
-import com.njcn.zlevent.pojo.dto.NoticeUserDto;
+import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsEventUserService;
import com.njcn.zlevent.service.IEventService;
+import com.njcn.zlevent.utils.SendEventUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
@@ -40,21 +35,13 @@ import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.CollectionUtils;
+import org.yaml.snakeyaml.scanner.Constant;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* 类的介绍:
@@ -86,37 +73,27 @@ public class EventServiceImpl implements IEventService {
private final CsDeviceUserFeignClient csDeviceUserFeignClient;
- private final AppInfoSetFeignClient appInfoSetFeignClient;
-
- private final UserFeignClient userFeignClient;
-
- private final EventLogsFeignClient eventLogsFeignClient;
-
private final ICsEventLogsService csEventLogsService;
+ private final SendEventUtils sendEventUtils;
+
@Override
@Transactional(rollbackFor = Exception.class)
public void analysis(AppEventMessage appEventMessage) {
- //todo 这边到时候装置事件、暂态事件需要分开处理
List list1 = new ArrayList<>();
List records = new ArrayList();
List list2 = new ArrayList<>();
- //获取监测点id
- String lineId = null,id = null,tag = null;;
+ String lineId = null,id = null,tag = null;
LocalDateTime eventTime = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
//判断字典数据是否存在
if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){
saveData();
}
+ //判断监测点是否存在
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
}
- if (Objects.equals(appEventMessage.getDid(),1)){
- lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
- } else if (Objects.equals(appEventMessage.getDid(),2)){
- lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
- }
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
try {
@@ -127,17 +104,21 @@ public class EventServiceImpl implements IEventService {
//事件入库
CsEventPO csEvent = new CsEventPO();
csEvent.setId(id);
- csEvent.setLineId(lineId);
csEvent.setDeviceId(deviceId);
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
- csEvent.setType(0);
- csEvent.setClDid(appEventMessage.getMsg().getClDid());
- //todo 默认等级先设为1.后期根据事件再做调整
- csEvent.setLevel(1);
- list1.add(csEvent);
+ if (Objects.equals(item.getType(),"2")){
+ csEvent.setType(0);
+ } else if (Objects.equals(item.getType(),"3")){
+ csEvent.setType(1);
+ } else if (Objects.equals(item.getType(),"1")){
+ csEvent.setType(2);
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
+ csEvent.setClDid(appEventMessage.getMsg().getClDid());
+ }
+ csEvent.setLevel(Integer.parseInt(item.getType()));
//参数入库
Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
@@ -147,7 +128,20 @@ public class EventServiceImpl implements IEventService {
Map tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map fields = new HashMap<>();
- fields.put(param.getName(),param.getData());
+ if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){
+ if (Objects.equals(param.getData(),ZlConstant.GRID)){
+ fields.put(param.getName(),"电网侧");
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
+ csEvent.setClDid(1);
+ } else if (Objects.equals(param.getData(),ZlConstant.LOAD)){
+ fields.put(param.getName(),"负载侧");
+ lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
+ csEvent.setClDid(2);
+ }
+ csEvent.setLocation(param.getData().toString());
+ } else {
+ fields.put(param.getName(),param.getData());
+ }
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
@@ -155,6 +149,8 @@ public class EventServiceImpl implements IEventService {
records.add(batchPoints.lineProtocol());
}
}
+ csEvent.setLineId(lineId);
+ list1.add(csEvent);
//事件用户关系入库
list2 = deviceUserList(deviceId,id);
//事件处理日志库
@@ -179,29 +175,9 @@ public class EventServiceImpl implements IEventService {
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
- //todo 根据不同事件需要做处理,目前先测试消息通知
- List userList = getEventUser(deviceId);
- if (CollectionUtil.isNotEmpty(userList)){
- List csEventSendMsgList = new ArrayList<>();
- List devCodeList = userList.stream().map(User::getDevCode).collect(Collectors.toList());
- String content = appEventMessage.getId() + "于" +eventTime+ "发生暂态事件";
- sendEventToUser(devCodeList,"暂态事件",content,1);
- //记录推送日志
- for (User item : userList) {
- CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
- csEventSendMsg.setUserId(item.getId());
- csEventSendMsg.setEventId(id);
- csEventSendMsg.setSendTime(LocalDateTime.now());
- if (Objects.isNull(item.getDevCode())){
- csEventSendMsg.setStatus(0);
- csEventSendMsg.setRemark("用户设备识别码为空");
- } else {
- csEventSendMsg.setDevCode(item.getDevCode());
- csEventSendMsg.setStatus(1);
- }
- csEventSendMsgList.add(csEventSendMsg);
- }
- eventLogsFeignClient.addLogs(csEventSendMsgList);
+ //推送事件逻辑处理
+ for (AppEventMessage.DataArray item : dataArray) {
+ sendEventUtils.sendUser(1,item.getType(),deviceId,item.getName(),eventTime,appEventMessage.getId(),id);
}
} catch (Exception e) {
CsEventLogs csEventLogs = new CsEventLogs();
@@ -289,78 +265,4 @@ public class EventServiceImpl implements IEventService {
});
return result;
}
-
- /**
- * 获取需要通知暂态事件的用户
- */
- public List getEventUser(String devId) {
- List users = new ArrayList<>();
- List result = new ArrayList<>();
- List devCode = new ArrayList<>();
- //获取设备下主用户和子用户集合
- List list = csDeviceUserFeignClient.findUserById(devId).getData();
- //查询哪些用户打开了事件提示
- if (CollectionUtil.isNotEmpty(list)){
- List appInfoSet = appInfoSetFeignClient.getListById(list).getData();
- result = appInfoSet.stream()
- .filter(person -> person.getEventInfo() == 1)
- .map(AppInfoSet::getUserId).collect(Collectors.toList());
- }
- if (CollectionUtil.isNotEmpty(list)){
- users = userFeignClient.getUserByIdList(result).getData();
- }
- return users;
- }
-
- /**
- * 发送通知消息
- */
- public void sendEventToUser(List devCodeList, String title, String content, Integer type) {
- try {
- if (CollectionUtil.isNotEmpty(devCodeList)){
- // 创建一个URL对象,指定目标HTTPS接口地址
- URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push");
- // 打开HTTPS连接
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- // 设置请求方法为POST
- connection.setRequestMethod("POST");
- // 设置请求头,指定Content-Type为application/json
- connection.setRequestProperty("Content-Type", "application/json");
- // 启用输出流以发送JSON数据
- connection.setDoOutput(true);
- // 创建JSON
- NoticeUserDto noticeUserDto = new NoticeUserDto();
- noticeUserDto.setPushClientId(devCodeList);
- noticeUserDto.setTitle(title);
- noticeUserDto.setContent(content);
- NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
- payload.setType(type);
- noticeUserDto.setPayload(payload);
- // 将JSON数据写入输出流
- OutputStream outputStream = connection.getOutputStream();
- System.out.println("json==:" + new Gson().toJson(noticeUserDto));
- outputStream.write(new Gson().toJson(noticeUserDto).getBytes(StandardCharsets.UTF_8));
- outputStream.flush();
- outputStream.close();
- // 获取响应代码
- int responseCode = connection.getResponseCode();
- System.out.println("Response Code: " + responseCode);
- // 读取响应数据
- BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
- String inputLine;
- StringBuilder response = new StringBuilder();
- while ((inputLine = reader.readLine()) != null) {
- response.append(inputLine);
- }
- reader.close();
- // 打印响应内容
- System.out.println("Response Content: ");
- System.out.println(response.toString());
- // 关闭连接
- connection.disconnect();
- }
- } catch (IOException e) {
- e.getMessage();
- }
- }
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
index 4a1865f..89152a4 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java
@@ -1,5 +1,6 @@
package com.njcn.zlevent.service.impl;
+import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
@@ -11,6 +12,7 @@ import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.common.pojo.exception.BusinessException;
+import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.oss.constant.GeneralConstant;
import com.njcn.oss.constant.OssPath;
@@ -35,10 +37,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
/**
* 类的介绍:
@@ -64,6 +63,8 @@ public class FileServiceImpl implements IFileService {
private final ICsEventFileLogsService csEventLogsService;
+ private final WavePicFeignClient wavePicFeignClient;
+
@Override
public void analysisFileInfo(AppFileMessage appFileMessage) {
if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){
@@ -100,6 +101,7 @@ public class FileServiceImpl implements IFileService {
fileInfoDto.setFileSize(appFileMessage.getMsg().getFileInfo().getFileSize());
fileInfoDto.setFileCheck(appFileMessage.getMsg().getFileInfo().getFileCheck());
fileInfoDto.setFileChkType(appFileMessage.getMsg().getFileInfo().getFileChkType());
+ fileInfoDto.setLocation(waveTimeDto.getLocation());
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto, 3600L);
redisUtil.delete(AppRedisKey.TIME+fileName);
} else {
@@ -139,12 +141,13 @@ public class FileServiceImpl implements IFileService {
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
- correlateEvents(fileInfoDto,filePath);
+ List eventList = correlateEvents(fileInfoDto,filePath);
+ //波形文件解析成图片
+ if (CollectionUtil.isNotEmpty(eventList)){
+ eventList.forEach(wavePicFeignClient::getWavePics);
+ }
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!");
- csEventLogs.setCompleteTime(LocalDateTime.now());
- csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
- csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
} else {
//缓存文件
map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData());
@@ -152,9 +155,6 @@ public class FileServiceImpl implements IFileService {
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
- csEventLogs.setCompleteTime(LocalDateTime.now());
- csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
- csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
}
} else {
//分帧传递数据,需要校验收到的文件个数
@@ -167,12 +167,13 @@ public class FileServiceImpl implements IFileService {
redisUtil.delete(fileName);
//波形文件关联事件
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
- correlateEvents(fileInfoDto,filePath);
+ List eventList = correlateEvents(fileInfoDto,filePath);
+ //波形文件解析成图片
+ if (CollectionUtil.isNotEmpty(eventList)){
+ eventList.forEach(wavePicFeignClient::getWavePics);
+ }
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!");
- csEventLogs.setCompleteTime(LocalDateTime.now());
- csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
- csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
} else {
//缓存
fileStreamDto = new FileStreamDto();
@@ -180,11 +181,12 @@ public class FileServiceImpl implements IFileService {
redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L);
csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!");
- csEventLogs.setCompleteTime(LocalDateTime.now());
- csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
- csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
}
}
+ csEventLogs.setCompleteTime(LocalDateTime.now());
+ csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt));
+ csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt));
+ csEventLogs.setLocation(fileInfoDto.getLocation());
//记录日志
csEventLogsService.save(csEventLogs);
} else {
@@ -200,6 +202,7 @@ public class FileServiceImpl implements IFileService {
}
}
+
/**
* 请求文件流信息
*/
@@ -258,8 +261,8 @@ public class FileServiceImpl implements IFileService {
} else {
byteArray = bytes;
}
- InputStream inputStream = new ByteArrayInputStream(byteArray);
//todo 此处需要做文件crc校验或者md5校验,目前不知道怎么处理,先放一下
+ InputStream inputStream = new ByteArrayInputStream(byteArray);
String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName);
try {
inputStream.close();
@@ -296,13 +299,14 @@ public class FileServiceImpl implements IFileService {
/**
* 波形文件关联事件
*/
- public void correlateEvents(FileInfoDto fileInfoDto,String path) {
+ public List correlateEvents(FileInfoDto fileInfoDto, String path) {
CsEventParam csEventParam = new CsEventParam();
csEventParam.setLineId(fileInfoDto.getLineId());
csEventParam.setDeviceId(fileInfoDto.getDeviceId());
csEventParam.setStartTime(fileInfoDto.getStartTime());
csEventParam.setEndTime(fileInfoDto.getEndTime());
csEventParam.setPath(path);
- csEventService.updateCsEvent(csEventParam);
+ csEventParam.setLocation(fileInfoDto.getLocation());
+ return csEventService.updateCsEvent(csEventParam);
}
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java
new file mode 100644
index 0000000..a1df40d
--- /dev/null
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java
@@ -0,0 +1,214 @@
+package com.njcn.zlevent.utils;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import com.njcn.csdevice.api.CsDeviceUserFeignClient;
+import com.njcn.csdevice.api.EventLogsFeignClient;
+import com.njcn.csdevice.pojo.po.CsEventSendMsg;
+import com.njcn.system.api.EpdFeignClient;
+import com.njcn.user.api.AppInfoSetFeignClient;
+import com.njcn.user.api.UserFeignClient;
+import com.njcn.user.pojo.po.User;
+import com.njcn.user.pojo.po.app.AppInfoSet;
+import com.njcn.zlevent.pojo.dto.NoticeUserDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * 类的介绍:
+ *
+ * @author xuyang
+ * @version 1.0.0
+ * @createTime 2023/9/25 16:08
+ */
+@Slf4j
+@Component
+public class SendEventUtils {
+
+ @Resource
+ private UserFeignClient userFeignClient;
+
+ @Resource
+ private CsDeviceUserFeignClient csDeviceUserFeignClient;
+
+ @Resource
+ private AppInfoSetFeignClient appInfoSetFeignClient;
+
+ @Resource
+ private EventLogsFeignClient eventLogsFeignClient;
+
+ @Resource
+ private EpdFeignClient epdFeignClient;
+
+ /**
+ * 事件推送给相关用户
+ * @param eventType 事件类型 1:事件 2:告警
+ * @param type 等级 事件分为设备事件、暂态事件、稳态事件 告警分为Ⅰ级告警、Ⅱ级告警、Ⅲ级告警
+ * @param devId 设备id
+ * @param eventName 事件名称
+ * @param eventTime 事件发生事件
+ * @param ndid 设备ndid
+ * @param id 事件id
+ */
+ @Transactional(rollbackFor = Exception.class)
+ public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String ndid, String id) {
+ List users = new ArrayList<>();
+ List devCodeList = new ArrayList<>();
+ List csEventSendMsgList = new ArrayList<>();
+ NoticeUserDto noticeUserDto = new NoticeUserDto();
+ NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
+ String content;
+ eventName = epdFeignClient.findByName(eventName).getData().getShowName();
+ //事件处理
+ if (eventType == 1){
+ switch (type) {
+ case "1":
+ //设备自身事件 不推送给用户,推送给业务管理
+ users = userFeignClient.getAdminInfo().getData();
+ noticeUserDto.setPushClientId(Collections.singletonList(users.get(0).getDevCode()));
+ noticeUserDto.setTitle("设备事件");
+ break;
+ case "2":
+ //暂态事件
+ users = getEventUser(devId);
+ devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
+ noticeUserDto.setPushClientId(devCodeList);
+ noticeUserDto.setTitle("暂态事件");
+ break;
+ case "3":
+ //稳态事件
+ users = getEventUser(devId);
+ devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
+ noticeUserDto.setPushClientId(devCodeList);
+ noticeUserDto.setTitle("稳态事件");
+ break;
+ default:
+ break;
+ }
+ content = ndid + "于" +eventTime+ "发生" + eventName;
+ noticeUserDto.setContent(content);
+ payload.setType(Integer.parseInt(type));
+ noticeUserDto.setPayload(payload);
+ }
+ //告警处理
+ else if (eventType == 2){
+ switch (type) {
+ case "1":
+ //Ⅰ级告警 不推送给用户,推送给业务管理
+ users = userFeignClient.getAdminInfo().getData();
+ noticeUserDto.setPushClientId(Collections.singletonList(users.get(0).getDevCode()));
+ break;
+ case "2":
+ case "3":
+ //Ⅱ、Ⅲ级告警推送相关用户
+ users = getEventUser(devId);
+ devCodeList = users.stream().map(User::getDevCode).collect(Collectors.toList());
+ noticeUserDto.setPushClientId(devCodeList);
+ break;
+ default:
+ break;
+ }
+ noticeUserDto.setTitle("告警事件");
+ content = ndid + "于" +eventTime+ "发生告警,告警故障编码为:" + eventName;
+ noticeUserDto.setContent(content);
+ payload.setType(4);
+ noticeUserDto.setPayload(payload);
+ }
+ sendEventToUser(noticeUserDto);
+ //记录推送日志
+ for (User item : users) {
+ CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
+ csEventSendMsg.setUserId(item.getId());
+ csEventSendMsg.setEventId(id);
+ csEventSendMsg.setSendTime(LocalDateTime.now());
+ if (Objects.isNull(item.getDevCode())){
+ csEventSendMsg.setStatus(0);
+ csEventSendMsg.setRemark("用户设备识别码为空");
+ } else {
+ csEventSendMsg.setDevCode(item.getDevCode());
+ csEventSendMsg.setStatus(1);
+ }
+ csEventSendMsgList.add(csEventSendMsg);
+ }
+ eventLogsFeignClient.addLogs(csEventSendMsgList);
+ }
+
+ /**
+ * 发送通知消息
+ */
+ public void sendEventToUser(NoticeUserDto noticeUserDto) {
+ try {
+ // 创建一个URL对象,指定目标HTTPS接口地址
+ URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push");
+ // 打开HTTPS连接
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ // 设置请求方法为POST
+ connection.setRequestMethod("POST");
+ // 设置请求头,指定Content-Type为application/json
+ connection.setRequestProperty("Content-Type", "application/json");
+ // 启用输出流以发送JSON数据
+ connection.setDoOutput(true);
+ // 将JSON数据写入输出流
+ OutputStream outputStream = connection.getOutputStream();
+ outputStream.write(new Gson().toJson(noticeUserDto).getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ outputStream.close();
+ // 获取响应代码
+ int responseCode = connection.getResponseCode();
+ log.info("Response Code: " + responseCode);
+ // 读取响应数据
+ BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ String inputLine;
+ StringBuilder response = new StringBuilder();
+ while ((inputLine = reader.readLine()) != null) {
+ response.append(inputLine);
+ }
+ reader.close();
+ // 打印响应内容
+ log.info("Response Content: " + response.toString());
+ // 关闭连接
+ connection.disconnect();
+ } catch (IOException e) {
+ e.getMessage();
+ }
+ }
+
+ /**
+ * 获取需要通知暂态事件的用户
+ */
+ public List getEventUser(String devId) {
+ List users = new ArrayList<>();
+ List result = new ArrayList<>();
+ //获取设备下主用户和子用户集合
+ List list = csDeviceUserFeignClient.findUserById(devId).getData();
+ //查询哪些用户打开了事件提示
+ if (CollectionUtil.isNotEmpty(list)){
+ List appInfoSet = appInfoSetFeignClient.getListById(list).getData();
+ result = appInfoSet.stream()
+ .filter(person -> person.getEventInfo() == 1)
+ .map(AppInfoSet::getUserId).collect(Collectors.toList());
+ }
+ if (CollectionUtil.isNotEmpty(list)){
+ users = userFeignClient.getUserByIdList(result).getData();
+ }
+ return users;
+ }
+
+}