diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/NoticeUserDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/NoticeUserDto.java similarity index 96% rename from iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/NoticeUserDto.java rename to iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/NoticeUserDto.java index 7aea205..29d7bb2 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/dto/NoticeUserDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/NoticeUserDto.java @@ -1,4 +1,4 @@ -package com.njcn.zlevent.pojo.dto; +package com.njcn.access.pojo.dto; import com.njcn.access.annotation.ParamName; import io.swagger.annotations.ApiModelProperty; diff --git a/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java b/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java new file mode 100644 index 0000000..5e52613 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/utils/SendMessageUtil.java @@ -0,0 +1,63 @@ +package com.njcn.access.utils; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.access.pojo.dto.NoticeUserDto; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +/** + * 推送消息 + * @author xy + */ +@Component +@Slf4j +@AllArgsConstructor +public class SendMessageUtil { + + public void sendEventToUser(NoticeUserDto noticeUserDto) { + try { + // 创建一个URL对象,指定目标HTTPS接口地址 + URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push"); + // 打开HTTPS连接 + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + // 设置请求方法为POST + connection.setRequestMethod("POST"); + // 设置请求头,指定Content-Type为application/json + connection.setRequestProperty("Content-Type", "application/json"); + // 启用输出流以发送JSON数据 + connection.setDoOutput(true); + // 将JSON数据写入输出流 + OutputStream outputStream = connection.getOutputStream(); + log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid")); + outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + outputStream.close(); + // 获取响应代码 + int responseCode = connection.getResponseCode(); + log.info("Response Code: " + responseCode); + // 读取响应数据 + BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + StringBuilder response = new StringBuilder(); + while ((inputLine = reader.readLine()) != null) { + response.append(inputLine); + } + reader.close(); + // 打印响应内容 + log.info("Response Content: " + response.toString()); + // 关闭连接 + connection.disconnect(); + } catch (IOException e) { + e.getMessage(); + } + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 0b967d0..c90e802 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -339,7 +339,7 @@ public class MqttMessageHandler { //更新电网侧、负载侧监测点信息 askDevData(nDid,version,3,(res.getMid()+1)); //录波任务倒计时 - redisUtil.saveByKeyWithExpire("startFile",null,120L); + redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); } else { log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); logDto.setResult(0); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index ee02ecf..dfe0cc5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,13 +1,14 @@ package com.njcn.access.listener; -import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.njcn.access.enums.AccessEnum; +import com.njcn.access.pojo.dto.NoticeUserDto; import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.access.utils.MqttUtil; +import com.njcn.access.utils.SendMessageUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.CsLedgerFeignClient; @@ -20,7 +21,6 @@ import com.njcn.redis.utils.RedisUtil; import com.njcn.user.api.AppUserFeignClient; import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; -import com.njcn.zlevent.pojo.dto.NoticeUserDto; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; @@ -29,13 +29,6 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; @@ -79,6 +72,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private UserFeignClient userFeignClient; @Resource private RedisUtil redisUtil; + @Resource + private SendMessageUtil sendMessageUtil; private final Object lock = new Object(); public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { @@ -153,7 +148,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) { synchronized (lock) { NoticeUserDto dto = sendOffLine(nDid); - sendEventToUser(dto); + sendMessageUtil.sendEventToUser(dto); addLogs(dto); String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { @@ -178,7 +173,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene MAX_WARNING_TIMES++; if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); - sendEventToUser(dto2); + sendMessageUtil.sendEventToUser(dto2); addLogs(dto2); } //记录装置掉线时间 @@ -194,7 +189,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene MAX_WARNING_TIMES++; if (MAX_WARNING_TIMES == 30) { NoticeUserDto dto2 = sendConnectFail(nDid); - sendEventToUser(dto2); + sendMessageUtil.sendEventToUser(dto2); addLogs(dto2); } logDto.setResult(0); @@ -260,42 +255,4 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene List users = userFeignClient.appuserByIdList(adminList).getData(); return users.stream().map(User::getDevCode).filter(Objects::nonNull).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()); } - - public void sendEventToUser(NoticeUserDto noticeUserDto) { - try { - // 创建一个URL对象,指定目标HTTPS接口地址 - URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push"); - // 打开HTTPS连接 - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - // 设置请求方法为POST - connection.setRequestMethod("POST"); - // 设置请求头,指定Content-Type为application/json - connection.setRequestProperty("Content-Type", "application/json"); - // 启用输出流以发送JSON数据 - connection.setDoOutput(true); - // 将JSON数据写入输出流 - OutputStream outputStream = connection.getOutputStream(); - log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid")); - outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8)); - outputStream.flush(); - outputStream.close(); - // 获取响应代码 - int responseCode = connection.getResponseCode(); - log.info("Response Code: " + responseCode); - // 读取响应数据 - BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream())); - String inputLine; - StringBuilder response = new StringBuilder(); - while ((inputLine = reader.readLine()) != null) { - response.append(inputLine); - } - reader.close(); - // 打印响应内容 - log.info("Response Content: " + response.toString()); - // 关闭连接 - connection.disconnect(); - } catch (IOException e) { - e.getMessage(); - } - } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 2dffcd5..bb3cbf7 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -34,6 +34,7 @@ import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.po.SysDicTreePO; import com.njcn.web.utils.RequestUtil; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -52,6 +53,7 @@ import java.util.stream.Collectors; */ @Service @AllArgsConstructor +@Slf4j public class CsDeviceServiceImpl implements ICsDeviceService { private static final Logger logger = LoggerFactory.getLogger(CsDeviceServiceImpl.class); @@ -575,7 +577,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //发起接入 publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())),1,false); //录波任务倒计时 - redisUtil.saveByKeyWithExpire("startFile",null,120L); + redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L); result = true; } } catch (InterruptedException e) { diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java index 2be63fe..606c75c 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -98,6 +98,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene downloadFile(missingList,dto.getNDid(),fileName); } } + //重新接入之后,装置60s后开始消费缓存的录波文件 + else if (expiredKey.startsWith("startFile:")) { + String nDid = expiredKey.split(":")[1]; + //处理缓存数据 + csWaveAnalysisService.channelWave(nDid); + } //手动文件下载 else if (expiredKey.startsWith(AppRedisKey.FILE_DOWN_TIME)) { List missingList = new ArrayList<>(); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java index e82f7e4..1a979a6 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsAlarmServiceImpl.java @@ -77,9 +77,9 @@ public class CsAlarmServiceImpl extends ServiceImpl im //推送事件逻辑处理 && cs_event_user入库 && 修改字典中告警事件的编码 for (AppEventMessage.DataArray item : dataArray) { if (Objects.isNull(item.getCode())){ - sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getName(),eventTime,id); + sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid()); } else { - sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getCode(),eventTime,id); + sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getCode(),eventTime,id,po.getNdid()); //更新字典信息 EleEpdPqd eleEpdPqd = epdFeignClient.findByName(item.getName()).getData(); EleEpdPqdParam.EleEpdPqdUpdateParam updateParam = new EleEpdPqdParam.EleEpdPqdUpdateParam(); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java index aa1793c..2ee70a9 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveAnalysisServiceImpl.java @@ -129,6 +129,8 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService { } else { throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING); } + } else { + throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE); } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index c949da9..868ef59 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -169,7 +169,7 @@ public class EventServiceImpl implements IEventService { csEventService.saveBatch(list1); //推送事件逻辑处理 && cs_event_user入库 for (AppEventMessage.DataArray item : dataArray) { - sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id); + sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid()); } } //evt_data入库 diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 961a85d..27d7777 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -16,7 +16,6 @@ import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.config.GeneralInfo; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.EquipmentFeignClient; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csharmonic.api.WavePicFeignClient; import com.njcn.mq.message.AppFileMessage; import com.njcn.oss.constant.GeneralConstant; @@ -24,15 +23,16 @@ import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; -import com.njcn.system.api.DictTreeFeignClient; -import com.njcn.system.enums.DicTreeEnum; import com.njcn.zlevent.param.CsEventParam; import com.njcn.zlevent.pojo.dto.FileInfoDto; import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsWave; -import com.njcn.zlevent.service.*; +import com.njcn.zlevent.service.ICsEventFileLogsService; +import com.njcn.zlevent.service.ICsEventService; +import com.njcn.zlevent.service.ICsWaveService; +import com.njcn.zlevent.service.IFileService; import com.njcn.zlevent.utils.RemoveInfoUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -65,11 +65,9 @@ public class FileServiceImpl implements IFileService { private final WavePicFeignClient wavePicFeignClient; private final ICsWaveService csWaveService; private final GeneralInfo generalInfo; - private final ICsWaveAnalysisService iCsWaveAnalysisService; private final ChannelObjectUtil channelObjectUtil; private final RemoveInfoUtils removeInfoUtils; private static Integer mid = 1; - private final DictTreeFeignClient dictTreeFeignClient; private final EquipmentFeignClient equipmentFeignClient; @Override @@ -141,10 +139,8 @@ public class FileServiceImpl implements IFileService { csEventLogs.setNdid(appFileMessage.getId()); csEventLogs.setFileName(appFileMessage.getMsg().getName()); csEventLogs.setStatus(0); - //获取装置id - CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appFileMessage.getId()).getData(); - //设备型号 - String code = dictTreeFeignClient.queryById(po.getDevModel()).getData().getCode(); + //判断设备类型 true:治理设备 false:其他设备 + boolean devModel = equipmentFeignClient.judgeDevModel(appFileMessage.getId()).getData(); try { //todo 目前文件先只处理波形事件的,后续有其他文件再做处理 String fileName = appFileMessage.getMsg().getName(); @@ -266,7 +262,7 @@ public class FileServiceImpl implements IFileService { filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); List eventList = correlateEvents(fileInfoDto,filePath,fileName); //波形文件解析成图片 - if (CollectionUtil.isNotEmpty(eventList) && Objects.equals(DicTreeEnum.PQ_COM.getCode(),code)){ + if (CollectionUtil.isNotEmpty(eventList) && devModel){ eventList.forEach(wavePicFeignClient::getWavePics); } //解析完删除、处理缓存 @@ -312,7 +308,7 @@ public class FileServiceImpl implements IFileService { filePath = filePath.replaceAll(GeneralConstant.CFG, "").replaceAll(GeneralConstant.DAT, ""); List eventList = correlateEvents(fileInfoDto, filePath, fileName); //波形文件解析成图片 - if (CollectionUtil.isNotEmpty(eventList) && Objects.equals(DicTreeEnum.PQ_COM.getCode(),code)) { + if (CollectionUtil.isNotEmpty(eventList) && devModel) { eventList.forEach(wavePicFeignClient::getWavePics); } redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); @@ -357,6 +353,18 @@ public class FileServiceImpl implements IFileService { csEventLogs.setCompleteTime(LocalDateTime.now()); //记录日志 csEventLogsService.save(csEventLogs); + //清空缓存 + redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); + redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); + //删除临时文件 + String fileName = appFileMessage.getMsg().getName(); + String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1]; + File file = new File(lsFileName); + if (file.exists()) { + file.delete(); + } + //继续消费 + removeInfoUtils.deleteEventInfo(appFileMessage.getId(),appFileMessage.getMsg().getName()); } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java index 2033128..8a12690 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/RemoveInfoUtils.java @@ -46,7 +46,7 @@ public class RemoveInfoUtils { // 删除相关的 Redis 键 deleteRelatedKeys(nDid, fileName); // 处理重试逻辑 - if (retryTimes < 3) { + if (retryTimes < 4) { // 增加重试次数并保存 redisUtil.saveByKey(("retryEvent:" + nDid + fileName), retryTimes + 1); // 重排文件列表 @@ -54,6 +54,8 @@ public class RemoveInfoUtils { } else { // 从列表中移除文件 removeFileFromList(nDid, fileName); + // 清空计数 + redisUtil.delete(("retryEvent:" + nDid + fileName)); } // 检查是否还有其他文件需要处理 List fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid), WaveTimeDto.class); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java index b022dcb..6d6b7e9 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/SendEventUtils.java @@ -2,9 +2,11 @@ package com.njcn.zlevent.utils; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; -import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.access.pojo.dto.NoticeUserDto; +import com.njcn.access.utils.SendMessageUtil; import com.njcn.csdevice.api.CsDeviceUserFeignClient; import com.njcn.csdevice.api.CsLedgerFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EventLogsFeignClient; import com.njcn.csdevice.pojo.dto.DevDetailDTO; import com.njcn.csdevice.pojo.po.CsEventSendMsg; @@ -15,20 +17,12 @@ import com.njcn.user.api.AppUserFeignClient; import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; import com.njcn.user.pojo.po.app.AppInfoSet; -import com.njcn.zlevent.pojo.dto.NoticeUserDto; import com.njcn.zlevent.service.ICsEventUserService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; @@ -48,27 +42,24 @@ public class SendEventUtils { @Resource private UserFeignClient userFeignClient; - @Resource private AppUserFeignClient appUserFeignClient; - @Resource private CsDeviceUserFeignClient csDeviceUserFeignClient; - @Resource private AppInfoSetFeignClient appInfoSetFeignClient; - @Resource private EventLogsFeignClient eventLogsFeignClient; - @Resource private EpdFeignClient epdFeignClient; - @Resource private ICsEventUserService csEventUserService; - @Resource private CsLedgerFeignClient csLedgerFeignclient; + @Resource + private SendMessageUtil sendMessageUtil; + @Resource + private EquipmentFeignClient equipmentFeignClient; /** * 事件推送给相关用户 @@ -80,7 +71,7 @@ public class SendEventUtils { * @param id 事件id */ @Transactional(rollbackFor = Exception.class) - public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String id) { + public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String id, String nDid) { int code; List users = new ArrayList<>(); List eventUser; @@ -91,209 +82,171 @@ public class SendEventUtils { NoticeUserDto.Payload payload = new NoticeUserDto.Payload(); String content; List result = new ArrayList<>(); - //事件处理 - if (eventType == 1){ - eventName = epdFeignClient.findByName(eventName).getData().getShowName(); - switch (type) { - case "1": - code = 2; - //设备自身事件 不推送给用户,推送给业务管理 - eventUser = getEventUser(devId,false); - if (CollectionUtil.isNotEmpty(eventUser)) { - eventUser.forEach(item->{ - CsEventUserPO csEventUser = new CsEventUserPO(); - csEventUser.setUserId(item); - csEventUser.setStatus(0); - csEventUser.setEventId(id); - result.add(csEventUser); - }); + //获取设备类型 true:治理设备 false:其他类型的设备 + boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData(); + if (devModel) { + //事件处理 + if (eventType == 1){ + eventName = epdFeignClient.findByName(eventName).getData().getShowName(); + switch (type) { + case "1": + code = 2; + //设备自身事件 不推送给用户,推送给业务管理 + eventUser = getEventUser(devId,false); + if (CollectionUtil.isNotEmpty(eventUser)) { + eventUser.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); - users = getSendUser(eventUser,2); - if (CollectionUtil.isNotEmpty(users)){ - for (User user : users){ - userList.add(user.getDevCode()); + users = getSendUser(eventUser,2); + if (CollectionUtil.isNotEmpty(users)){ + for (User user : users){ + userList.add(user.getDevCode()); + } + noticeUserDto.setPushClientId(userList); + noticeUserDto.setTitle("设备事件"); } - noticeUserDto.setPushClientId(userList); - noticeUserDto.setTitle("设备事件"); } - } - break; - case "2": - code = 0; - //暂态事件 - eventUser = getEventUser(devId,true); - if (CollectionUtil.isNotEmpty(eventUser)) { - eventUser.forEach(item->{ - CsEventUserPO csEventUser = new CsEventUserPO(); - csEventUser.setUserId(item); - csEventUser.setStatus(0); - csEventUser.setEventId(id); - result.add(csEventUser); - }); - users = getSendUser(eventUser,0); - if (CollectionUtil.isNotEmpty(users)){ - devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); - noticeUserDto.setPushClientId(devCodeList); - noticeUserDto.setTitle("暂态事件"); + break; + case "2": + code = 0; + //暂态事件 + eventUser = getEventUser(devId,true); + if (CollectionUtil.isNotEmpty(eventUser)) { + eventUser.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); + users = getSendUser(eventUser,0); + if (CollectionUtil.isNotEmpty(users)){ + devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); + noticeUserDto.setPushClientId(devCodeList); + noticeUserDto.setTitle("暂态事件"); + } } - } - break; - case "3": - code = 1; - //稳态事件 - eventUser = getEventUser(devId,true); - if (CollectionUtil.isNotEmpty(eventUser)) { - eventUser.forEach(item->{ - CsEventUserPO csEventUser = new CsEventUserPO(); - csEventUser.setUserId(item); - csEventUser.setStatus(0); - csEventUser.setEventId(id); - result.add(csEventUser); - }); - users = getSendUser(eventUser,1); - if (CollectionUtil.isNotEmpty(users)){ - devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); - noticeUserDto.setPushClientId(devCodeList); - noticeUserDto.setTitle("稳态事件"); + break; + case "3": + code = 1; + //稳态事件 + eventUser = getEventUser(devId,true); + if (CollectionUtil.isNotEmpty(eventUser)) { + eventUser.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); + users = getSendUser(eventUser,1); + if (CollectionUtil.isNotEmpty(users)){ + devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); + noticeUserDto.setPushClientId(devCodeList); + noticeUserDto.setTitle("稳态事件"); + } } - } - break; - default: - code = 0; - break; + break; + default: + code = 0; + break; + } + //获取台账信息 + DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); + content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName; + noticeUserDto.setContent(content); + payload.setType(code); + payload.setPath("/pages/message/message?type="+payload.getType()); + noticeUserDto.setPayload(payload); } - //获取台账信息 - DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); - content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName; - noticeUserDto.setContent(content); - payload.setType(code); - payload.setPath("/pages/message/message?type="+payload.getType()); - noticeUserDto.setPayload(payload); - } - //告警处理 - else if (eventType == 2){ - switch (type) { - case "1": - //Ⅰ级告警 不推送给用户,推送给业务管理 - eventUser = getEventUser(devId,false); - if (CollectionUtil.isNotEmpty(eventUser)) { - eventUser.forEach(item->{ - CsEventUserPO csEventUser = new CsEventUserPO(); - csEventUser.setUserId(item); - csEventUser.setStatus(0); - csEventUser.setEventId(id); - result.add(csEventUser); - }); - users = getSendUser(eventUser,3); - if (CollectionUtil.isNotEmpty(users)){ - eventName = epdFeignClient.findByName(eventName).getData().getShowName(); - devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); - noticeUserDto.setPushClientId(devCodeList); + //告警处理 + else if (eventType == 2){ + switch (type) { + case "1": + //Ⅰ级告警 不推送给用户,推送给业务管理 + eventUser = getEventUser(devId,false); + if (CollectionUtil.isNotEmpty(eventUser)) { + eventUser.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); + users = getSendUser(eventUser,3); + if (CollectionUtil.isNotEmpty(users)){ + eventName = epdFeignClient.findByName(eventName).getData().getShowName(); + devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); + noticeUserDto.setPushClientId(devCodeList); + } } - } - break; - case "2": - eventName = epdFeignClient.findByName(eventName).getData().getShowName(); - case "3": - //Ⅱ、Ⅲ级告警推送相关用户及业务管理员 - eventUser = getEventUser(devId,true); - if (CollectionUtil.isNotEmpty(eventUser)) { - eventUser.forEach(item->{ - CsEventUserPO csEventUser = new CsEventUserPO(); - csEventUser.setUserId(item); - csEventUser.setStatus(0); - csEventUser.setEventId(id); - result.add(csEventUser); - }); - users = getSendUser(eventUser,3); - if (CollectionUtil.isNotEmpty(users)){ - devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); - noticeUserDto.setPushClientId(devCodeList); + break; + case "2": + eventName = epdFeignClient.findByName(eventName).getData().getShowName(); + case "3": + //Ⅱ、Ⅲ级告警推送相关用户及业务管理员 + eventUser = getEventUser(devId,true); + if (CollectionUtil.isNotEmpty(eventUser)) { + eventUser.forEach(item->{ + CsEventUserPO csEventUser = new CsEventUserPO(); + csEventUser.setUserId(item); + csEventUser.setStatus(0); + csEventUser.setEventId(id); + result.add(csEventUser); + }); + users = getSendUser(eventUser,3); + if (CollectionUtil.isNotEmpty(users)){ + devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList()); + noticeUserDto.setPushClientId(devCodeList); + } } - } - break; - default: - break; + break; + default: + break; + } + noticeUserDto.setTitle("告警事件"); + DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); + content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生告警,告警信息:" + eventName; + noticeUserDto.setContent(content); + payload.setType(3); + payload.setPath("/pages/message/message?type="+payload.getType()); + noticeUserDto.setPayload(payload); } - noticeUserDto.setTitle("告警事件"); - DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData(); - content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生告警,告警信息:" + eventName; - noticeUserDto.setContent(content); - payload.setType(3); - payload.setPath("/pages/message/message?type="+payload.getType()); - noticeUserDto.setPayload(payload); - } - if (CollectionUtil.isNotEmpty(noticeUserDto.getPushClientId())) { - List filteredList = noticeUserDto.getPushClientId().stream() - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toList()); - if (CollectionUtil.isNotEmpty(filteredList)) { - noticeUserDto.setPushClientId(filteredList); - sendEventToUser(noticeUserDto); + if (CollectionUtil.isNotEmpty(noticeUserDto.getPushClientId())) { + List filteredList = noticeUserDto.getPushClientId().stream() + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtil.isNotEmpty(filteredList)) { + noticeUserDto.setPushClientId(filteredList); + sendMessageUtil.sendEventToUser(noticeUserDto); + } } - } - //记录推送日志 - for (User item : users) { - CsEventSendMsg csEventSendMsg = new CsEventSendMsg(); - csEventSendMsg.setUserId(item.getId()); - csEventSendMsg.setEventId(id); - csEventSendMsg.setSendTime(LocalDateTime.now()); - if (Objects.isNull(item.getDevCode())){ - csEventSendMsg.setStatus(0); - csEventSendMsg.setRemark("用户设备识别码为空"); - } else { - csEventSendMsg.setDevCode(item.getDevCode()); - csEventSendMsg.setStatus(1); + //记录推送日志 + for (User item : users) { + CsEventSendMsg csEventSendMsg = new CsEventSendMsg(); + csEventSendMsg.setUserId(item.getId()); + csEventSendMsg.setEventId(id); + csEventSendMsg.setSendTime(LocalDateTime.now()); + if (Objects.isNull(item.getDevCode())){ + csEventSendMsg.setStatus(0); + csEventSendMsg.setRemark("用户设备识别码为空"); + } else { + csEventSendMsg.setDevCode(item.getDevCode()); + csEventSendMsg.setStatus(1); + } + csEventSendMsgList.add(csEventSendMsg); } - csEventSendMsgList.add(csEventSendMsg); - } - eventLogsFeignClient.addLogs(csEventSendMsgList); - //事件用户关系入库 - if (CollectionUtil.isNotEmpty(result)){ - csEventUserService.saveBatch(result); - } - } - - /** - * 发送通知消息 - */ - public void sendEventToUser(NoticeUserDto noticeUserDto) { - try { - // 创建一个URL对象,指定目标HTTPS接口地址 - //URL url = new URL("https://fc-mp-b46c4dff-7244-4f7c-ae8b-7c1194d8cce8.next.bspapp.com/push"); - URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push"); - // 打开HTTPS连接 - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - // 设置请求方法为POST - connection.setRequestMethod("POST"); - // 设置请求头,指定Content-Type为application/json - connection.setRequestProperty("Content-Type", "application/json"); - // 启用输出流以发送JSON数据 - connection.setDoOutput(true); - // 将JSON数据写入输出流 - OutputStream outputStream = connection.getOutputStream(); - log.info(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid")); - outputStream.write(new Gson().toJson(noticeUserDto).replace("pushClientId", "push_clientid").getBytes(StandardCharsets.UTF_8)); - outputStream.flush(); - outputStream.close(); - // 获取响应代码 - int responseCode = connection.getResponseCode(); - log.info("Response Code: " + responseCode); - // 读取响应数据 - BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream())); - String inputLine; - StringBuilder response = new StringBuilder(); - while ((inputLine = reader.readLine()) != null) { - response.append(inputLine); + eventLogsFeignClient.addLogs(csEventSendMsgList); + //事件用户关系入库 + if (CollectionUtil.isNotEmpty(result)){ + csEventUserService.saveBatch(result); } - reader.close(); - // 打印响应内容 - log.info("Response Content: " + response.toString()); - // 关闭连接 - connection.disconnect(); - } catch (IOException e) { - e.getMessage(); } } diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java index 2a8a37e..36b620a 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java @@ -1,10 +1,10 @@ package com.njcn.message; import lombok.extern.slf4j.Slf4j; -import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.context.annotation.DependsOn; /** @@ -13,9 +13,9 @@ import org.springframework.cloud.openfeign.EnableFeignClients; * @date 2021年12月09日 20:59 */ @Slf4j -//@MapperScan("com.njcn.**.mapper") @EnableFeignClients(basePackages = "com.njcn") @SpringBootApplication(scanBasePackages = "com.njcn") +@DependsOn("proxyMapperRegister") public class MessageBootApplication { public static void main(String[] args) {