代码优化

1.推送消息屏蔽,只推送治理设备信息;
2.文件下载优化
3.公共方法提取
This commit is contained in:
xy
2024-11-01 11:22:38 +08:00
parent 749a814bfd
commit e2f46ebcde
13 changed files with 272 additions and 279 deletions

View File

@@ -1,4 +1,4 @@
package com.njcn.zlevent.pojo.dto; package com.njcn.access.pojo.dto;
import com.njcn.access.annotation.ParamName; import com.njcn.access.annotation.ParamName;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;

View File

@@ -0,0 +1,63 @@
package com.njcn.access.utils;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.pojo.dto.NoticeUserDto;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
/**
* 推送消息
* @author xy
*/
@Component
@Slf4j
@AllArgsConstructor
public class SendMessageUtil {
public void sendEventToUser(NoticeUserDto noticeUserDto) {
try {
// 创建一个URL对象指定目标HTTPS接口地址
URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push");
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 设置请求头指定Content-Type为application/json
connection.setRequestProperty("Content-Type", "application/json");
// 启用输出流以发送JSON数据
connection.setDoOutput(true);
// 将JSON数据写入输出流
OutputStream outputStream = connection.getOutputStream();
log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid"));
outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
// 获取响应代码
int responseCode = connection.getResponseCode();
log.info("Response Code: " + responseCode);
// 读取响应数据
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuilder response = new StringBuilder();
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
// 打印响应内容
log.info("Response Content: " + response.toString());
// 关闭连接
connection.disconnect();
} catch (IOException e) {
e.getMessage();
}
}
}

View File

@@ -339,7 +339,7 @@ public class MqttMessageHandler {
//更新电网侧、负载侧监测点信息 //更新电网侧、负载侧监测点信息
askDevData(nDid,version,3,(res.getMid()+1)); askDevData(nDid,version,3,(res.getMid()+1));
//录波任务倒计时 //录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile",null,120L); redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
} else { } else {
log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
logDto.setResult(0); logDto.setResult(0);

View File

@@ -1,13 +1,14 @@
package com.njcn.access.listener; package com.njcn.access.listener;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.MqttUtil; import com.njcn.access.utils.MqttUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLedgerFeignClient; import com.njcn.csdevice.api.CsLedgerFeignClient;
@@ -20,7 +21,6 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.user.api.AppUserFeignClient; import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient; import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User; import com.njcn.user.pojo.po.User;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
@@ -29,13 +29,6 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
@@ -79,6 +72,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private UserFeignClient userFeignClient; private UserFeignClient userFeignClient;
@Resource @Resource
private RedisUtil redisUtil; private RedisUtil redisUtil;
@Resource
private SendMessageUtil sendMessageUtil;
private final Object lock = new Object(); private final Object lock = new Object();
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
@@ -153,7 +148,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) {
synchronized (lock) { synchronized (lock) {
NoticeUserDto dto = sendOffLine(nDid); NoticeUserDto dto = sendOffLine(nDid);
sendEventToUser(dto); sendMessageUtil.sendEventToUser(dto);
addLogs(dto); addLogs(dto);
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> { ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
@@ -178,7 +173,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
MAX_WARNING_TIMES++; MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30) { if (MAX_WARNING_TIMES == 30) {
NoticeUserDto dto2 = sendConnectFail(nDid); NoticeUserDto dto2 = sendConnectFail(nDid);
sendEventToUser(dto2); sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2); addLogs(dto2);
} }
//记录装置掉线时间 //记录装置掉线时间
@@ -194,7 +189,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
MAX_WARNING_TIMES++; MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30) { if (MAX_WARNING_TIMES == 30) {
NoticeUserDto dto2 = sendConnectFail(nDid); NoticeUserDto dto2 = sendConnectFail(nDid);
sendEventToUser(dto2); sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2); addLogs(dto2);
} }
logDto.setResult(0); logDto.setResult(0);
@@ -260,42 +255,4 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
List<User> users = userFeignClient.appuserByIdList(adminList).getData(); List<User> users = userFeignClient.appuserByIdList(adminList).getData();
return users.stream().map(User::getDevCode).filter(Objects::nonNull).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()); return users.stream().map(User::getDevCode).filter(Objects::nonNull).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList());
} }
public void sendEventToUser(NoticeUserDto noticeUserDto) {
try {
// 创建一个URL对象指定目标HTTPS接口地址
URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push");
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 设置请求头指定Content-Type为application/json
connection.setRequestProperty("Content-Type", "application/json");
// 启用输出流以发送JSON数据
connection.setDoOutput(true);
// 将JSON数据写入输出流
OutputStream outputStream = connection.getOutputStream();
log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid"));
outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
// 获取响应代码
int responseCode = connection.getResponseCode();
log.info("Response Code: " + responseCode);
// 读取响应数据
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuilder response = new StringBuilder();
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
// 打印响应内容
log.info("Response Content: " + response.toString());
// 关闭连接
connection.disconnect();
} catch (IOException e) {
e.getMessage();
}
}
} }

View File

@@ -34,6 +34,7 @@ import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.SysDicTreePO; import com.njcn.system.pojo.po.SysDicTreePO;
import com.njcn.web.utils.RequestUtil; import com.njcn.web.utils.RequestUtil;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -52,6 +53,7 @@ import java.util.stream.Collectors;
*/ */
@Service @Service
@AllArgsConstructor @AllArgsConstructor
@Slf4j
public class CsDeviceServiceImpl implements ICsDeviceService { public class CsDeviceServiceImpl implements ICsDeviceService {
private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class);
@@ -575,7 +577,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//发起接入 //发起接入
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())),1,false); publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())),1,false);
//录波任务倒计时 //录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile",null,120L); redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
result = true; result = true;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@@ -98,6 +98,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
downloadFile(missingList,dto.getNDid(),fileName); downloadFile(missingList,dto.getNDid(),fileName);
} }
} }
//重新接入之后,装置60s后开始消费缓存的录波文件
else if (expiredKey.startsWith("startFile:")) {
String nDid = expiredKey.split(":")[1];
//处理缓存数据
csWaveAnalysisService.channelWave(nDid);
}
//手动文件下载 //手动文件下载
else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) { else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) {
List<Integer> missingList = new ArrayList<>(); List<Integer> missingList = new ArrayList<>();

View File

@@ -77,9 +77,9 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
//推送事件逻辑处理 && cs_event_user入库 && 修改字典中告警事件的编码 //推送事件逻辑处理 && cs_event_user入库 && 修改字典中告警事件的编码
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {
if (Objects.isNull(item.getCode())){ if (Objects.isNull(item.getCode())){
sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getName(),eventTime,id); sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid());
} else { } else {
sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getCode(),eventTime,id); sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getCode(),eventTime,id,po.getNdid());
//更新字典信息 //更新字典信息
EleEpdPqd eleEpdPqd = epdFeignClient.findByName(item.getName()).getData(); EleEpdPqd eleEpdPqd = epdFeignClient.findByName(item.getName()).getData();
EleEpdPqdParam.EleEpdPqdUpdateParam updateParam = new EleEpdPqdParam.EleEpdPqdUpdateParam(); EleEpdPqdParam.EleEpdPqdUpdateParam updateParam = new EleEpdPqdParam.EleEpdPqdUpdateParam();

View File

@@ -129,6 +129,8 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
} else { } else {
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);
} }
} else {
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
} }
} }

View File

@@ -169,7 +169,7 @@ public class EventServiceImpl implements IEventService {
csEventService.saveBatch(list1); csEventService.saveBatch(list1);
//推送事件逻辑处理 && cs_event_user入库 //推送事件逻辑处理 && cs_event_user入库
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {
sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id); sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid());
} }
} }
//evt_data入库 //evt_data入库

View File

@@ -16,7 +16,6 @@ import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.config.GeneralInfo; import com.njcn.common.config.GeneralInfo;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csharmonic.api.WavePicFeignClient; import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.mq.message.AppFileMessage; import com.njcn.mq.message.AppFileMessage;
import com.njcn.oss.constant.GeneralConstant; import com.njcn.oss.constant.GeneralConstant;
@@ -24,15 +23,16 @@ import com.njcn.oss.constant.OssPath;
import com.njcn.oss.utils.FileStorageUtil; import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicTreeEnum;
import com.njcn.zlevent.param.CsEventParam; import com.njcn.zlevent.param.CsEventParam;
import com.njcn.zlevent.pojo.dto.FileInfoDto; import com.njcn.zlevent.pojo.dto.FileInfoDto;
import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsEventFileLogs;
import com.njcn.zlevent.pojo.po.CsWave; import com.njcn.zlevent.pojo.po.CsWave;
import com.njcn.zlevent.service.*; import com.njcn.zlevent.service.ICsEventFileLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsWaveService;
import com.njcn.zlevent.service.IFileService;
import com.njcn.zlevent.utils.RemoveInfoUtils; import com.njcn.zlevent.utils.RemoveInfoUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -65,11 +65,9 @@ public class FileServiceImpl implements IFileService {
private final WavePicFeignClient wavePicFeignClient; private final WavePicFeignClient wavePicFeignClient;
private final ICsWaveService csWaveService; private final ICsWaveService csWaveService;
private final GeneralInfo generalInfo; private final GeneralInfo generalInfo;
private final ICsWaveAnalysisService iCsWaveAnalysisService;
private final ChannelObjectUtil channelObjectUtil; private final ChannelObjectUtil channelObjectUtil;
private final RemoveInfoUtils removeInfoUtils; private final RemoveInfoUtils removeInfoUtils;
private static Integer mid = 1; private static Integer mid = 1;
private final DictTreeFeignClient dictTreeFeignClient;
private final EquipmentFeignClient equipmentFeignClient; private final EquipmentFeignClient equipmentFeignClient;
@Override @Override
@@ -141,10 +139,8 @@ public class FileServiceImpl implements IFileService {
csEventLogs.setNdid(appFileMessage.getId()); csEventLogs.setNdid(appFileMessage.getId());
csEventLogs.setFileName(appFileMessage.getMsg().getName()); csEventLogs.setFileName(appFileMessage.getMsg().getName());
csEventLogs.setStatus(0); csEventLogs.setStatus(0);
//获取装置id //判断设备类型 true:治理设备 false:其他设备
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appFileMessage.getId()).getData(); boolean devModel = equipmentFeignClient.judgeDevModel(appFileMessage.getId()).getData();
//设备型号
String code = dictTreeFeignClient.queryById(po.getDevModel()).getData().getCode();
try { try {
//todo 目前文件先只处理波形事件的,后续有其他文件再做处理 //todo 目前文件先只处理波形事件的,后续有其他文件再做处理
String fileName = appFileMessage.getMsg().getName(); String fileName = appFileMessage.getMsg().getName();
@@ -266,7 +262,7 @@ public class FileServiceImpl implements IFileService {
filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,"");
List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName); List<String> eventList = correlateEvents(fileInfoDto,filePath,fileName);
//波形文件解析成图片 //波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList) && Objects.equals(DicTreeEnum.PQ_COM.getCode(),code)){ if (CollectionUtil.isNotEmpty(eventList) && devModel){
eventList.forEach(wavePicFeignClient::getWavePics); eventList.forEach(wavePicFeignClient::getWavePics);
} }
//解析完删除、处理缓存 //解析完删除、处理缓存
@@ -312,7 +308,7 @@ public class FileServiceImpl implements IFileService {
filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, ""); filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, "");
List<String> eventList = correlateEvents(fileInfoDto, filePath, fileName); List<String> eventList = correlateEvents(fileInfoDto, filePath, fileName);
//波形文件解析成图片 //波形文件解析成图片
if (CollectionUtil.isNotEmpty(eventList) && Objects.equals(DicTreeEnum.PQ_COM.getCode(),code)) { if (CollectionUtil.isNotEmpty(eventList) && devModel) {
eventList.forEach(wavePicFeignClient::getWavePics); eventList.forEach(wavePicFeignClient::getWavePics);
} }
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
@@ -357,6 +353,18 @@ public class FileServiceImpl implements IFileService {
csEventLogs.setCompleteTime(LocalDateTime.now()); csEventLogs.setCompleteTime(LocalDateTime.now());
//记录日志 //记录日志
csEventLogsService.save(csEventLogs); csEventLogsService.save(csEventLogs);
//清空缓存
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
//删除临时文件
String fileName = appFileMessage.getMsg().getName();
String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
File file = new File(lsFileName);
if (file.exists()) {
file.delete();
}
//继续消费
removeInfoUtils.deleteEventInfo(appFileMessage.getId(),appFileMessage.getMsg().getName());
} }
} }

View File

@@ -46,7 +46,7 @@ public class RemoveInfoUtils {
// 删除相关的 Redis 键 // 删除相关的 Redis 键
deleteRelatedKeys(nDid, fileName); deleteRelatedKeys(nDid, fileName);
// 处理重试逻辑 // 处理重试逻辑
if (retryTimes < 3) { if (retryTimes < 4) {
// 增加重试次数并保存 // 增加重试次数并保存
redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1); redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1);
// 重排文件列表 // 重排文件列表
@@ -54,6 +54,8 @@ public class RemoveInfoUtils {
} else { } else {
// 从列表中移除文件 // 从列表中移除文件
removeFileFromList(nDid, fileName); removeFileFromList(nDid, fileName);
// 清空计数
redisUtil.delete(("retryEvent:" + nDid + fileName));
} }
// 检查是否还有其他文件需要处理 // 检查是否还有其他文件需要处理
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class);

View File

@@ -2,9 +2,11 @@ package com.njcn.zlevent.utils;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLedgerFeignClient; import com.njcn.csdevice.api.CsLedgerFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.api.EventLogsFeignClient; import com.njcn.csdevice.api.EventLogsFeignClient;
import com.njcn.csdevice.pojo.dto.DevDetailDTO; import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.po.CsEventSendMsg; import com.njcn.csdevice.pojo.po.CsEventSendMsg;
@@ -15,20 +17,12 @@ import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient; import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User; import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet; import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.pojo.dto.NoticeUserDto;
import com.njcn.zlevent.service.ICsEventUserService; import com.njcn.zlevent.service.ICsEventUserService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -48,27 +42,24 @@ public class SendEventUtils {
@Resource @Resource
private UserFeignClient userFeignClient; private UserFeignClient userFeignClient;
@Resource @Resource
private AppUserFeignClient appUserFeignClient; private AppUserFeignClient appUserFeignClient;
@Resource @Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient; private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource @Resource
private AppInfoSetFeignClient appInfoSetFeignClient; private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource @Resource
private EventLogsFeignClient eventLogsFeignClient; private EventLogsFeignClient eventLogsFeignClient;
@Resource @Resource
private EpdFeignClient epdFeignClient; private EpdFeignClient epdFeignClient;
@Resource @Resource
private ICsEventUserService csEventUserService; private ICsEventUserService csEventUserService;
@Resource @Resource
private CsLedgerFeignClient csLedgerFeignclient; private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private SendMessageUtil sendMessageUtil;
@Resource
private EquipmentFeignClient equipmentFeignClient;
/** /**
* 事件推送给相关用户 * 事件推送给相关用户
@@ -80,7 +71,7 @@ public class SendEventUtils {
* @param id 事件id * @param id 事件id
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String id) { public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String id, String nDid) {
int code; int code;
List<User> users = new ArrayList<>(); List<User> users = new ArrayList<>();
List<String> eventUser; List<String> eventUser;
@@ -91,209 +82,171 @@ public class SendEventUtils {
NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
String content; String content;
List<CsEventUserPO> result = new ArrayList<>(); List<CsEventUserPO> result = new ArrayList<>();
//事件处理 //获取设备类型 true:治理设备 false:其他类型的设备
if (eventType == 1){ boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
eventName = epdFeignClient.findByName(eventName).getData().getShowName(); if (devModel) {
switch (type) { //事件处理
case "1": if (eventType == 1){
code = 2; eventName = epdFeignClient.findByName(eventName).getData().getShowName();
//设备自身事件 不推送给用户,推送给业务管理 switch (type) {
eventUser = getEventUser(devId,false); case "1":
if (CollectionUtil.isNotEmpty(eventUser)) { code = 2;
eventUser.forEach(item->{ //设备自身事件 不推送给用户,推送给业务管理
CsEventUserPO csEventUser = new CsEventUserPO(); eventUser = getEventUser(devId,false);
csEventUser.setUserId(item); if (CollectionUtil.isNotEmpty(eventUser)) {
csEventUser.setStatus(0); eventUser.forEach(item->{
csEventUser.setEventId(id); CsEventUserPO csEventUser = new CsEventUserPO();
result.add(csEventUser); csEventUser.setUserId(item);
}); csEventUser.setStatus(0);
csEventUser.setEventId(id);
result.add(csEventUser);
});
users = getSendUser(eventUser,2); users = getSendUser(eventUser,2);
if (CollectionUtil.isNotEmpty(users)){ if (CollectionUtil.isNotEmpty(users)){
for (User user : users){ for (User user : users){
userList.add(user.getDevCode()); userList.add(user.getDevCode());
}
noticeUserDto.setPushClientId(userList);
noticeUserDto.setTitle("设备事件");
} }
noticeUserDto.setPushClientId(userList);
noticeUserDto.setTitle("设备事件");
} }
} break;
break; case "2":
case "2": code = 0;
code = 0; //暂态事件
//暂态事件 eventUser = getEventUser(devId,true);
eventUser = getEventUser(devId,true); if (CollectionUtil.isNotEmpty(eventUser)) {
if (CollectionUtil.isNotEmpty(eventUser)) { eventUser.forEach(item->{
eventUser.forEach(item->{ CsEventUserPO csEventUser = new CsEventUserPO();
CsEventUserPO csEventUser = new CsEventUserPO(); csEventUser.setUserId(item);
csEventUser.setUserId(item); csEventUser.setStatus(0);
csEventUser.setStatus(0); csEventUser.setEventId(id);
csEventUser.setEventId(id); result.add(csEventUser);
result.add(csEventUser); });
}); users = getSendUser(eventUser,0);
users = getSendUser(eventUser,0); if (CollectionUtil.isNotEmpty(users)){
if (CollectionUtil.isNotEmpty(users)){ devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setPushClientId(devCodeList); noticeUserDto.setTitle("暂态事件");
noticeUserDto.setTitle("暂态事件"); }
} }
} break;
break; case "3":
case "3": code = 1;
code = 1; //稳态事件
//稳态事件 eventUser = getEventUser(devId,true);
eventUser = getEventUser(devId,true); if (CollectionUtil.isNotEmpty(eventUser)) {
if (CollectionUtil.isNotEmpty(eventUser)) { eventUser.forEach(item->{
eventUser.forEach(item->{ CsEventUserPO csEventUser = new CsEventUserPO();
CsEventUserPO csEventUser = new CsEventUserPO(); csEventUser.setUserId(item);
csEventUser.setUserId(item); csEventUser.setStatus(0);
csEventUser.setStatus(0); csEventUser.setEventId(id);
csEventUser.setEventId(id); result.add(csEventUser);
result.add(csEventUser); });
}); users = getSendUser(eventUser,1);
users = getSendUser(eventUser,1); if (CollectionUtil.isNotEmpty(users)){
if (CollectionUtil.isNotEmpty(users)){ devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setPushClientId(devCodeList); noticeUserDto.setTitle("稳态事件");
noticeUserDto.setTitle("稳态事件"); }
} }
} break;
break; default:
default: code = 0;
code = 0; break;
break; }
//获取台账信息
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData();
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName;
noticeUserDto.setContent(content);
payload.setType(code);
payload.setPath("/pages/message/message?type="+payload.getType());
noticeUserDto.setPayload(payload);
} }
//获取台账信息 //告警处理
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); else if (eventType == 2){
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName; switch (type) {
noticeUserDto.setContent(content); case "1":
payload.setType(code); //Ⅰ级告警 不推送给用户,推送给业务管理
payload.setPath("/pages/message/message?type="+payload.getType()); eventUser = getEventUser(devId,false);
noticeUserDto.setPayload(payload); if (CollectionUtil.isNotEmpty(eventUser)) {
} eventUser.forEach(item->{
//告警处理 CsEventUserPO csEventUser = new CsEventUserPO();
else if (eventType == 2){ csEventUser.setUserId(item);
switch (type) { csEventUser.setStatus(0);
case "1": csEventUser.setEventId(id);
//Ⅰ级告警 不推送给用户,推送给业务管理 result.add(csEventUser);
eventUser = getEventUser(devId,false); });
if (CollectionUtil.isNotEmpty(eventUser)) { users = getSendUser(eventUser,3);
eventUser.forEach(item->{ if (CollectionUtil.isNotEmpty(users)){
CsEventUserPO csEventUser = new CsEventUserPO(); eventName = epdFeignClient.findByName(eventName).getData().getShowName();
csEventUser.setUserId(item); devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
csEventUser.setStatus(0); noticeUserDto.setPushClientId(devCodeList);
csEventUser.setEventId(id); }
result.add(csEventUser);
});
users = getSendUser(eventUser,3);
if (CollectionUtil.isNotEmpty(users)){
eventName = epdFeignClient.findByName(eventName).getData().getShowName();
devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
noticeUserDto.setPushClientId(devCodeList);
} }
} break;
break; case "2":
case "2": eventName = epdFeignClient.findByName(eventName).getData().getShowName();
eventName = epdFeignClient.findByName(eventName).getData().getShowName(); case "3":
case "3": //Ⅱ、Ⅲ级告警推送相关用户及业务管理员
//Ⅱ、Ⅲ级告警推送相关用户及业务管理员 eventUser = getEventUser(devId,true);
eventUser = getEventUser(devId,true); if (CollectionUtil.isNotEmpty(eventUser)) {
if (CollectionUtil.isNotEmpty(eventUser)) { eventUser.forEach(item->{
eventUser.forEach(item->{ CsEventUserPO csEventUser = new CsEventUserPO();
CsEventUserPO csEventUser = new CsEventUserPO(); csEventUser.setUserId(item);
csEventUser.setUserId(item); csEventUser.setStatus(0);
csEventUser.setStatus(0); csEventUser.setEventId(id);
csEventUser.setEventId(id); result.add(csEventUser);
result.add(csEventUser); });
}); users = getSendUser(eventUser,3);
users = getSendUser(eventUser,3); if (CollectionUtil.isNotEmpty(users)){
if (CollectionUtil.isNotEmpty(users)){ devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setPushClientId(devCodeList); }
} }
} break;
break; default:
default: break;
break; }
noticeUserDto.setTitle("告警事件");
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData();
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生告警,告警信息:" + eventName;
noticeUserDto.setContent(content);
payload.setType(3);
payload.setPath("/pages/message/message?type="+payload.getType());
noticeUserDto.setPayload(payload);
} }
noticeUserDto.setTitle("告警事件"); if (CollectionUtil.isNotEmpty(noticeUserDto.getPushClientId())) {
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); List<String> filteredList = noticeUserDto.getPushClientId().stream()
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生告警,告警信息:" + eventName; .filter(Objects::nonNull)
noticeUserDto.setContent(content); .distinct()
payload.setType(3); .collect(Collectors.toList());
payload.setPath("/pages/message/message?type="+payload.getType()); if (CollectionUtil.isNotEmpty(filteredList)) {
noticeUserDto.setPayload(payload); noticeUserDto.setPushClientId(filteredList);
} sendMessageUtil.sendEventToUser(noticeUserDto);
if (CollectionUtil.isNotEmpty(noticeUserDto.getPushClientId())) { }
List<String> filteredList = noticeUserDto.getPushClientId().stream()
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(filteredList)) {
noticeUserDto.setPushClientId(filteredList);
sendEventToUser(noticeUserDto);
} }
} //记录推送日志
//记录推送日志 for (User item : users) {
for (User item : users) { CsEventSendMsg csEventSendMsg = new CsEventSendMsg();
CsEventSendMsg csEventSendMsg = new CsEventSendMsg(); csEventSendMsg.setUserId(item.getId());
csEventSendMsg.setUserId(item.getId()); csEventSendMsg.setEventId(id);
csEventSendMsg.setEventId(id); csEventSendMsg.setSendTime(LocalDateTime.now());
csEventSendMsg.setSendTime(LocalDateTime.now()); if (Objects.isNull(item.getDevCode())){
if (Objects.isNull(item.getDevCode())){ csEventSendMsg.setStatus(0);
csEventSendMsg.setStatus(0); csEventSendMsg.setRemark("用户设备识别码为空");
csEventSendMsg.setRemark("用户设备识别码为空"); } else {
} else { csEventSendMsg.setDevCode(item.getDevCode());
csEventSendMsg.setDevCode(item.getDevCode()); csEventSendMsg.setStatus(1);
csEventSendMsg.setStatus(1); }
csEventSendMsgList.add(csEventSendMsg);
} }
csEventSendMsgList.add(csEventSendMsg); eventLogsFeignClient.addLogs(csEventSendMsgList);
} //事件用户关系入库
eventLogsFeignClient.addLogs(csEventSendMsgList); if (CollectionUtil.isNotEmpty(result)){
//事件用户关系入库 csEventUserService.saveBatch(result);
if (CollectionUtil.isNotEmpty(result)){
csEventUserService.saveBatch(result);
}
}
/**
* 发送通知消息
*/
public void sendEventToUser(NoticeUserDto noticeUserDto) {
try {
// 创建一个URL对象指定目标HTTPS接口地址
//URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push");
URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push");
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 设置请求头指定Content-Type为application/json
connection.setRequestProperty("Content-Type", "application/json");
// 启用输出流以发送JSON数据
connection.setDoOutput(true);
// 将JSON数据写入输出流
OutputStream outputStream = connection.getOutputStream();
log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid"));
outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
outputStream.close();
// 获取响应代码
int responseCode = connection.getResponseCode();
log.info("Response Code: " + responseCode);
// 读取响应数据
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuilder response = new StringBuilder();
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
} }
reader.close();
// 打印响应内容
log.info("Response Content: " + response.toString());
// 关闭连接
connection.disconnect();
} catch (IOException e) {
e.getMessage();
} }
} }

View File

@@ -1,10 +1,10 @@
package com.njcn.message; package com.njcn.message;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
/** /**
@@ -13,9 +13,9 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
* @date 2021年12月09日 20:59 * @date 2021年12月09日 20:59
*/ */
@Slf4j @Slf4j
//@MapperScan("com.njcn.**.mapper")
@EnableFeignClients(basePackages = "com.njcn") @EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn") @SpringBootApplication(scanBasePackages = "com.njcn")
@DependsOn("proxyMapperRegister")
public class MessageBootApplication { public class MessageBootApplication {
public static void main(String[] args) { public static void main(String[] args) {