diff --git a/iot-access/access-api/pom.xml b/iot-access/access-api/pom.xml index 49baaac..117a9eb 100644 --- a/iot-access/access-api/pom.xml +++ b/iot-access/access-api/pom.xml @@ -20,6 +20,11 @@ + + com.njcn + common-mq + ${project.version} + com.github.tocrhz mqtt-spring-boot-starter diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/CsHeartbeatFeignClient.java b/iot-access/access-api/src/main/java/com/njcn/access/api/CsHeartbeatFeignClient.java new file mode 100644 index 0000000..f591d02 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/CsHeartbeatFeignClient.java @@ -0,0 +1,22 @@ +package com.njcn.access.api; + +import com.njcn.access.api.fallback.CsHeartbeatClientFallbackFactory; +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import io.swagger.annotations.ApiOperation; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.ACCESS_BOOT, path = "/heartbeat", fallbackFactory = CsHeartbeatClientFallbackFactory.class,contextId = "heartbeat") +public interface CsHeartbeatFeignClient { + + @PostMapping("/handleHeartbeat") + @ApiOperation("处理物联设备心跳") + HttpResult handleHeartbeat(@RequestBody HeartbeatTimeoutMessage message); + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsHeartbeatClientFallbackFactory.java b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsHeartbeatClientFallbackFactory.java new file mode 100644 index 0000000..2839eec --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/api/fallback/CsHeartbeatClientFallbackFactory.java @@ -0,0 +1,35 @@ +package com.njcn.access.api.fallback; + +import com.njcn.access.api.CsHeartbeatFeignClient; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class CsHeartbeatClientFallbackFactory implements FallbackFactory { + @Override + public CsHeartbeatFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new CsHeartbeatFeignClient() { + + @Override + public HttpResult handleHeartbeat(HeartbeatTimeoutMessage message) { + log.error("{}异常,降级处理,异常为:{}","处理物联设备心跳数据异常",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsHeartController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsHeartController.java new file mode 100644 index 0000000..e3ab18c --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsHeartController.java @@ -0,0 +1,48 @@ +package com.njcn.access.controller; + +import com.njcn.access.service.ICsHeartService; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import springfox.documentation.annotations.ApiIgnore; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/6 11:07 + */ +@Slf4j +@RestController +@RequestMapping("/heartbeat") +@Api(tags = "心跳") +@AllArgsConstructor +@ApiIgnore +public class CsHeartController extends BaseController { + + private final ICsHeartService csHeartService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/handleHeartbeat") + @ApiOperation("处理物联设备心跳") + @ApiImplicitParam(name = "message", value = "message", required = true) + public HttpResult handleHeartbeat(@RequestBody HeartbeatTimeoutMessage message){ + String methodDescribe = getMethodDescribe("handleHeartbeat"); + csHeartService.handleHeartbeat(message); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index e2483f8..34e8547 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -22,10 +22,7 @@ import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.pojo.po.CsTopic; -import com.njcn.access.service.ICsDeviceOnlineLogsService; -import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.access.service.ICsLineModelService; -import com.njcn.access.service.ICsTopicService; +import com.njcn.access.service.*; import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.exception.BusinessException; @@ -103,6 +100,7 @@ public class MqttMessageHandler { private final WaveFeignClient waveFeignClient; private final RtFeignClient rtFeignClient; private final CsCommunicateFeignClient csCommunicateFeignClient; + private final IHeartbeatService heartbeatService; @Autowired Validator validator; @@ -328,7 +326,7 @@ public class MqttMessageHandler { //更新电网侧、负载侧监测点信息 askDevData(nDid,version,3,(res.getMid()+1)); //接入后系统重置装置心跳 - redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L); + heartbeatService.receiveHeartbeat(nDid); //修改redis的mid redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1); //接入成功标识 @@ -349,7 +347,6 @@ public class MqttMessageHandler { if (!Objects.isNull(rspDataDto.getDataType())) { switch (rspDataDto.getDataType()){ case 1: - log.info("{},设备数据应答--->更新设备软件信息", nDid); logDto.setOperate(nDid + "更新设备软件信息"); RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.SoftInfo.class); //记录设备软件信息 @@ -380,7 +377,6 @@ public class MqttMessageHandler { List devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class); if (CollectionUtil.isNotEmpty(devInfo)){ if (Objects.equals(res.getDid(),1)){ - log.info("{},设备数据应答--->更新治理监测点信息和设备容量", nDid); List list3 = new ArrayList<>(); boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0); //治理设备 @@ -409,7 +405,6 @@ public class MqttMessageHandler { equipmentFeignClient.updateModuleNumber(nDid,(devInfo.size()-1)); } } else if (Objects.equals(res.getDid(),2)) { - log.info("{},设备数据应答--->更新电网侧、负载侧监测点信息", nDid); logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息"); //1.更新电网侧、负载侧监测点相关信息 devInfo.forEach(item->{ @@ -419,14 +414,12 @@ public class MqttMessageHandler { } break; case 15: - log.info("{}模块{}:处理实时数据", nDid, rspDataDto.getClDid()); JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); appAutoDataMessage.setId(nDid); rtFeignClient.apfRtAnalysis(appAutoDataMessage); break; case 48: - log.info("询问装置项目列表"); logDto.setUserName("运维管理员"); logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表"); List projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class); @@ -439,7 +432,6 @@ public class MqttMessageHandler { } break; case 4663: - log.info("装置操作应答"); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ String key4 = AppRedisKey.CONTROL + nDid; redisUtil.saveByKeyWithExpire(key4,"success",10L); @@ -492,8 +484,7 @@ public class MqttMessageHandler { //响应请求 switch (res.getType()){ case 4865: - //设置心跳时间,超时改为掉线 - redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L); + heartbeatService.receiveHeartbeat(nDid); //有心跳,则将装置改成在线 //csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); //处理心跳 @@ -531,15 +522,12 @@ public class MqttMessageHandler { response.setPri(AccessEnum.FIRST_CHANNEL.getCode()); response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode())); response.setCode(200); - log.info("应答事件:{}", new Gson().toJson(response)); publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false); } //判断事件类型 switch (dataDto.getMsg().getDataAttr()) { //暂态事件、录波处理、工程信息 case 0: - log.info("{}处理事件", nDid); - //log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8)); EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class); JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto)); AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class); @@ -548,7 +536,6 @@ public class MqttMessageHandler { break; //实时数据 case 1: - log.info("{}处理实时数据", nDid); JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class); appAutoDataMessage.setId(nDid); @@ -559,9 +546,6 @@ public class MqttMessageHandler { JSONObject jsonObject3 = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage2 = JSONObject.toJavaObject(jsonObject3, AppAutoDataMessage.class); appAutoDataMessage2.setId(nDid); - appAutoDataMessage2.getMsg().getDataArray().forEach(item->{ - log.info("{}处理统计数据{}", nDid, item.getDataAttr()); - }); appAutoDataMessageTemplate.sendMember(appAutoDataMessage2); break; default: @@ -593,7 +577,6 @@ public class MqttMessageHandler { //响应请求 switch (fileDto.getType()){ case 4657: - log.info("获取文件信息{}", fileDto); if (Objects.equals(fileDto.getCode(),AccessEnum.SUCCESS.getCode())) { String key = AppRedisKey.PROJECT_INFO + nDid; if (Objects.isNull(fileDto.getMsg().getType())) { @@ -626,7 +609,6 @@ public class MqttMessageHandler { } break; case 4658: - log.info("获取文件流信息"); FileRedisDto dto = new FileRedisDto(); dto.setCode(fileDto.getCode()); redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileDto.getMsg().getName() + fileDto.getMid(),dto,60L); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 24e57b6..d933915 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,25 +1,14 @@ package com.njcn.access.listener; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.DatePattern; import cn.hutool.core.util.ObjectUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; -import com.njcn.access.enums.AccessEnum; -import com.njcn.access.pojo.dto.NoticeUserDto; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.utils.RedisSetUtil; import com.njcn.access.utils.SendMessageUtil; -import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.csdevice.api.*; -import com.njcn.csdevice.param.DeviceMessageParam; -import com.njcn.csdevice.pojo.dto.DevDetailDTO; -import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.rt.pojo.dto.BaseRealDataSet; -import com.njcn.user.pojo.po.User; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; @@ -28,12 +17,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; /** * @author xy @@ -80,10 +64,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene } //判断失效的key是否为MQTT消费端存入的 String expiredKey = message.toString(); - if(expiredKey.startsWith("MQTT:")){ - String nDid = expiredKey.split(":")[1]; - executeMainTask(nDid); - } +// if(expiredKey.startsWith("MQTT:")){ +// String nDid = expiredKey.split(":")[1]; +// executeMainTask(nDid); +// } if(expiredKey.startsWith("cldRtDataOverTime:")){ String lineId = expiredKey.split(":")[1]; Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId); @@ -100,75 +84,75 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene } } - //主任务 - //1.装置心跳断连 - //2.MQTT客户端不在线 - private void executeMainTask(String nDid) { - log.info("{}->装置离线", nDid); - DeviceLogDTO logDto = new DeviceLogDTO(); - logDto.setUserName("运维管理员"); - logDto.setLoginName("njcnyw"); - //装置下线 - csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); - //装置调整为注册状态 - csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null); - logDto.setOperate(nDid +"装置离线"); - sendMessage(nDid); - //记录装置掉线时间 - PqsCommunicateDto dto = new PqsCommunicateDto(); - dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); - dto.setDevId(nDid); - dto.setType(0); - dto.setDescription("通讯中断"); - csCommunicateFeignClient.insertion(dto); - csLogsFeignClient.addUserLog(logDto); - //清空缓存 - redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid); - } +// //主任务 +// //1.装置心跳断连 +// //2.MQTT客户端不在线 +// private void executeMainTask(String nDid) { +// log.info("{}->装置离线", nDid); +// DeviceLogDTO logDto = new DeviceLogDTO(); +// logDto.setUserName("运维管理员"); +// logDto.setLoginName("njcnyw"); +// //装置下线 +// csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); +// //装置调整为注册状态 +// csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null); +// logDto.setOperate(nDid +"装置离线"); +// sendMessage(nDid); +// //记录装置掉线时间 +// PqsCommunicateDto dto = new PqsCommunicateDto(); +// dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); +// dto.setDevId(nDid); +// dto.setType(0); +// dto.setDescription("通讯中断"); +// csCommunicateFeignClient.insertion(dto); +// csLogsFeignClient.addUserLog(logDto); +// //清空缓存 +// redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid); +// } - //判断设备型号发送数据 - private void sendMessage(String nDid) { - boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData(); - if (devModel) { - NoticeUserDto dto = sendOffLine(nDid); - if (CollectionUtil.isNotEmpty(dto.getPushClientId())) { - sendMessageUtil.sendEventToUser(dto); - addLogs(dto); - } - } - } +// //判断设备型号发送数据 +// private void sendMessage(String nDid) { +// boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData(); +// if (devModel) { +// NoticeUserDto dto = sendOffLine(nDid); +// if (CollectionUtil.isNotEmpty(dto.getPushClientId())) { +// sendMessageUtil.sendEventToUser(dto); +// addLogs(dto); +// } +// } +// } - //掉线通知 - private NoticeUserDto sendOffLine(String nDid) { - NoticeUserDto dto = new NoticeUserDto(); - dto.setTitle("设备离线"); - CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); - DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData(); - DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - LocalDateTime localDateTime = LocalDateTime.now(); - String dateStr = localDateTime.format(fmt); - String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "离线"); - dto.setContent(content); - //获取设备关联的用户 - List eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData(); - DeviceMessageParam param1 = new DeviceMessageParam(); - param1.setUserList(eventUser); - param1.setEventType(2); - //获取打开推送的用户 - List users = deviceMessageFeignClient.getSendUserByType(param1).getData(); - if (CollectionUtil.isNotEmpty(users)){ - dto.setPushClientId( - users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList())); - } - return dto; - } - - //日志记录 - private void addLogs(NoticeUserDto noticeUserDto) { - DeviceLogDTO dto = new DeviceLogDTO(); - dto.setUserName("运维管理员"); - dto.setLoginName("njcnyw"); - dto.setOperate(noticeUserDto.getContent()); - csLogsFeignClient.addUserLog(dto); - } +// //掉线通知 +// private NoticeUserDto sendOffLine(String nDid) { +// NoticeUserDto dto = new NoticeUserDto(); +// dto.setTitle("设备离线"); +// CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); +// DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData(); +// DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); +// LocalDateTime localDateTime = LocalDateTime.now(); +// String dateStr = localDateTime.format(fmt); +// String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "离线"); +// dto.setContent(content); +// //获取设备关联的用户 +// List eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData(); +// DeviceMessageParam param1 = new DeviceMessageParam(); +// param1.setUserList(eventUser); +// param1.setEventType(2); +// //获取打开推送的用户 +// List users = deviceMessageFeignClient.getSendUserByType(param1).getData(); +// if (CollectionUtil.isNotEmpty(users)){ +// dto.setPushClientId( +// users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList())); +// } +// return dto; +// } +// +// //日志记录 +// private void addLogs(NoticeUserDto noticeUserDto) { +// DeviceLogDTO dto = new DeviceLogDTO(); +// dto.setUserName("运维管理员"); +// dto.setLoginName("njcnyw"); +// dto.setOperate(noticeUserDto.getContent()); +// csLogsFeignClient.addUserLog(dto); +// } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java deleted file mode 100644 index 907d875..0000000 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java +++ /dev/null @@ -1,190 +0,0 @@ -package com.njcn.access.runner; - -import cn.hutool.core.collection.CollUtil; -import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; -import com.njcn.redis.utils.RedisUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -/** - * @author xy - * 定时1小时轮询设备列表,并使用多线程进行业务处理 - * 1. 定时查询所有设备列表 - * 2. 使用多线程处理每个设备的业务逻辑 - * 3. 处理完成后释放资源 - */ -@Component -@Slf4j -@RequiredArgsConstructor -public class MqttHeartCheckTimer implements ApplicationRunner { - - private final ICsEquipmentDeliveryService csEquipmentDeliveryService; - private final RedisUtil redisUtil; - private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - // 1小时间隔 - private static final long INTERVAL_MINUTES = 60L; - - - @Override - public void run(ApplicationArguments args) { - if (scheduler.isShutdown() || scheduler.isTerminated()) { - scheduler = Executors.newScheduledThreadPool(1); - } - - Runnable task = () -> { - try { - executeScheduledTask(); - } catch (Throwable t) { - log.error("定时设备处理任务发生严重异常", t); - } - }; - - // 每小时执行一次 - ScheduledFuture future = scheduler.scheduleWithFixedDelay( - task, - 0, - INTERVAL_MINUTES, - TimeUnit.MINUTES - ); - - // 添加监控,如果任务被取消则重新调度 - monitorScheduledTask(future); - } - - /** - * 监控定时任务 - */ - private void monitorScheduledTask(ScheduledFuture future) { - Thread monitorThread = new Thread(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - // 每10分钟检查一次任务状态 - Thread.sleep(600000); - if (future.isCancelled() || future.isDone()) { - log.warn("定时设备处理任务被取消或完成,重新调度..."); - // 重新启动任务 - run(null); - break; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("监控线程被中断"); - break; - } catch (Exception e) { - log.error("监控任务异常", e); - } - } - }, "Device-Processor-Monitor-Thread"); - monitorThread.setDaemon(true); - monitorThread.start(); - } - - /** - * 执行定时任务的主要逻辑 - */ - private void executeScheduledTask() { - log.info("开始执行定时设备处理任务(查看在线设备和心跳数据是否一致) - 时间: {}", System.currentTimeMillis()); - - try { - // 查询所有设备列表 - List deviceList = csEquipmentDeliveryService.getUseOnlineDevice(); - - if (CollUtil.isEmpty(deviceList)) { - log.info("设备列表为空,跳过处理"); - return; - } - - log.info("查询到 {} 个设备,开始多线程处理", deviceList.size()); - - // 创建线程池进行多线程处理,根据设备数量动态调整线程数 - // 最大10个线程 - int threadCount = Math.min(deviceList.size(), 10); - ExecutorService executor = Executors.newFixedThreadPool(threadCount); - - try { - // 将设备列表分批处理,每批处理一定数量的设备 - List> batches = CollUtil.split(deviceList, threadCount); - List> futures = new ArrayList<>(); - - for (List batch : batches) { - futures.add(executor.submit(() -> { - try { - processDeviceBatch(batch); - } catch (Exception e) { - log.error("处理设备批次异常", e); - } - return null; - })); - } - - // 等待所有任务完成,设置超时时间防止长时间阻塞 - for (Future future : futures) { - try { - // 设置10分钟超时 - future.get(10, TimeUnit.MINUTES); - } catch (TimeoutException e) { - log.error("设备批次处理超时", e); - } catch (Exception e) { - log.error("设备批次处理异常", e); - } - } - } finally { - executor.shutdown(); - try { - if (!executor.awaitTermination(2, TimeUnit.MINUTES)) { - log.warn("设备处理线程池未在规定时间内关闭,强制关闭"); - executor.shutdownNow(); - } - } catch (InterruptedException e) { - log.warn("等待线程池关闭时被中断"); - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - log.info("设备批量处理任务完成 - 时间: {}", System.currentTimeMillis()); - } catch (Exception e) { - log.error("执行设备处理任务异常", e); - } - } - - /** - * 批量处理设备 - */ - private void processDeviceBatch(List deviceBatch) { - if (CollUtil.isEmpty(deviceBatch)) { - return; - } - for (CsEquipmentDeliveryPO device : deviceBatch) { - try { - processSingleDevice(device); - } catch (Exception e) { - log.error("处理单个设备失败: 设备ID={}, 错误={}", device.getNdid(), e.getMessage(), e); - } - } - } - - /** - * 处理单个设备的业务逻辑 - * 注意:这里需要根据实际业务需求实现具体的业务逻辑 - */ - private void processSingleDevice(CsEquipmentDeliveryPO device) { - log.info("正在处理设备: {}", device.getNdid()); - // 1. 检查设备在Redis中的状态 - String deviceKey = "MQTT:" + device.getNdid(); - Object deviceStatus = redisUtil.getObjectByKey(deviceKey); - // 2. 如果没有心跳,则模拟补充个心跳 - if (deviceStatus == null) { - // 如果Redis中没有该设备的状态信息,可以设置默认值或执行相应处理 - redisUtil.saveByKeyWithExpire(deviceKey, device.getNdid(), 100L); - } - log.info("设备处理完成: {}", device.getNdid()); - } -} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsHeartService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsHeartService.java new file mode 100644 index 0000000..637bc0d --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsHeartService.java @@ -0,0 +1,12 @@ +package com.njcn.access.service; + +import com.njcn.mq.message.HeartbeatTimeoutMessage; + +/** + * @author xy + */ +public interface ICsHeartService { + + void handleHeartbeat(HeartbeatTimeoutMessage message); + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/IHeartbeatService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/IHeartbeatService.java new file mode 100644 index 0000000..ddf1d0a --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/IHeartbeatService.java @@ -0,0 +1,8 @@ +package com.njcn.access.service; + +public interface IHeartbeatService { + + void receiveHeartbeat(String nDid); + + Boolean isHeartbeatUpdated(String nDid, Long sendTime); +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsHeartServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsHeartServiceImpl.java new file mode 100644 index 0000000..2de481b --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsHeartServiceImpl.java @@ -0,0 +1,146 @@ +package com.njcn.access.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DatePattern; +import com.njcn.access.enums.AccessEnum; +import com.njcn.access.pojo.dto.NoticeUserDto; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.access.service.ICsHeartService; +import com.njcn.access.service.IHeartbeatService; +import com.njcn.access.utils.SendMessageUtil; +import com.njcn.common.pojo.dto.DeviceLogDTO; +import com.njcn.csdevice.api.*; +import com.njcn.csdevice.param.DeviceMessageParam; +import com.njcn.csdevice.pojo.dto.DevDetailDTO; +import com.njcn.csdevice.pojo.dto.PqsCommunicateDto; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.user.api.AppUserFeignClient; +import com.njcn.user.api.UserFeignClient; +import com.njcn.user.pojo.po.User; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + *

+ * 数据集表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-08-01 + */ +@Service +@Slf4j +public class CsHeartServiceImpl implements ICsHeartService { + + @Resource + private ICsEquipmentDeliveryService csEquipmentDeliveryService; + @Resource + private CsLogsFeignClient csLogsFeignClient; + @Resource + private EquipmentFeignClient equipmentFeignClient; + @Resource + private SendMessageUtil sendMessageUtil; + @Resource + private CsLedgerFeignClient csLedgerFeignclient; + @Resource + private AppUserFeignClient appUserFeignClient; + @Resource + private CsDeviceUserFeignClient csDeviceUserFeignClient; + @Resource + private UserFeignClient userFeignClient; + @Resource + private IHeartbeatService heartbeatService; + @Resource + private CsCommunicateFeignClient csCommunicateFeignClient; + @Resource + private RedisUtil redisUtil; + @Resource + private DeviceMessageFeignClient deviceMessageFeignClient; + + @Override + public void handleHeartbeat(HeartbeatTimeoutMessage message) { + String nDid = message.getNDid(); + Long sendTime = message.getTimestamp(); + if (heartbeatService.isHeartbeatUpdated(nDid, sendTime)) { + return; + } + log.info("{}->装置离线,执行业务处理", nDid); + handleDeviceOffline(nDid); + } + + private void handleDeviceOffline(String nDid) { + DeviceLogDTO logDto = new DeviceLogDTO(); + logDto.setUserName("运维管理员"); + logDto.setLoginName("njcnyw"); + //装置下线 + csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); + //装置调整为注册状态 + csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null); + logDto.setOperate(nDid +"装置离线"); + sendMessage(nDid); + //记录装置掉线时间 + PqsCommunicateDto dto = new PqsCommunicateDto(); + dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + dto.setDevId(nDid); + dto.setType(0); + dto.setDescription("通讯中断"); + csCommunicateFeignClient.insertion(dto); + csLogsFeignClient.addUserLog(logDto); + //清空缓存 + redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid); + } + + private void sendMessage(String nDid) { + boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData(); + if (devModel) { + NoticeUserDto dto = sendOffLine(nDid); + if (CollectionUtil.isNotEmpty(dto.getPushClientId())) { + sendMessageUtil.sendEventToUser(dto); + addLogs(dto); + } + } + } + + //掉线通知 + private NoticeUserDto sendOffLine(String nDid) { + NoticeUserDto dto = new NoticeUserDto(); + dto.setTitle("设备离线"); + CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData(); + DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData(); + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.now(); + String dateStr = localDateTime.format(fmt); + String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "于" + dateStr + "离线"); + dto.setContent(content); + //获取设备关联的用户 + List eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData(); + DeviceMessageParam param1 = new DeviceMessageParam(); + param1.setUserList(eventUser); + param1.setEventType(2); + //获取打开推送的用户 + List users = deviceMessageFeignClient.getSendUserByType(param1).getData(); + if (CollectionUtil.isNotEmpty(users)){ + dto.setPushClientId( + users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(org.apache.commons.lang3.StringUtils::isNotBlank).distinct().collect(Collectors.toList())); + } + return dto; + } + + private void addLogs(NoticeUserDto noticeUserDto) { + DeviceLogDTO dto = new DeviceLogDTO(); + dto.setUserName("运维管理员"); + dto.setLoginName("njcnyw"); + dto.setOperate(noticeUserDto.getContent()); + csLogsFeignClient.addUserLog(dto); + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/HeartbeatServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/HeartbeatServiceImpl.java new file mode 100644 index 0000000..82b550a --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/HeartbeatServiceImpl.java @@ -0,0 +1,55 @@ +package com.njcn.access.service.impl; + +import com.njcn.access.service.IHeartbeatService; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import com.njcn.mq.template.HeartbeatTimeoutMessageTemplate; +import com.njcn.redis.utils.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author xy + */ +@Service +@Slf4j +public class HeartbeatServiceImpl implements IHeartbeatService { + + @Resource + private HeartbeatTimeoutMessageTemplate heartbeatTimeoutMessageTemplate; + @Resource + private RedisUtil redisUtil; + private static final String HEARTBEAT_REDIS_KEY_PREFIX = "HEARTBEAT:"; + private static final int DELAY_LEVEL_4MIN = 7; + private static final long HEARTBEAT_EXPIRE_SECONDS = 180; + + @Override + public void receiveHeartbeat(String nDid) { + String redisKey = HEARTBEAT_REDIS_KEY_PREFIX + nDid; + long currentTime = System.currentTimeMillis(); + + redisUtil.saveByKey(redisKey, currentTime); + redisUtil.expire(redisKey, HEARTBEAT_EXPIRE_SECONDS); + + HeartbeatTimeoutMessage message = new HeartbeatTimeoutMessage(); + + message.setNDid(nDid); + message.setTimestamp(currentTime); + message.setDelayLevel(DELAY_LEVEL_4MIN); + heartbeatTimeoutMessageTemplate.sendMember(message); + } + + @Override + public Boolean isHeartbeatUpdated(String nDid, Long sendTime) { + String redisKey = HEARTBEAT_REDIS_KEY_PREFIX + nDid; + Object lastHeartbeat = redisUtil.getObjectByKey(redisKey); + + if (lastHeartbeat == null) { + return false; + } + + long lastUpdateTime = Long.parseLong(lastHeartbeat.toString()); + return lastUpdateTime > sendTime; + } +} diff --git a/iot-access/access-boot/src/main/resources/bootstrap.yml b/iot-access/access-boot/src/main/resources/bootstrap.yml index 48e3e85..6aabb75 100644 --- a/iot-access/access-boot/src/main/resources/bootstrap.yml +++ b/iot-access/access-boot/src/main/resources/bootstrap.yml @@ -45,6 +45,7 @@ logging: config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml level: root: info + com.njcn.middle.rocket.template.RocketMQEnhanceTemplate: ERROR #mybatis配置信息 diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/HeartbeatTimeoutConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/HeartbeatTimeoutConsumer.java new file mode 100644 index 0000000..e06bcbc --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/HeartbeatTimeoutConsumer.java @@ -0,0 +1,134 @@ +package com.njcn.message.consumer; + +import com.njcn.access.api.CsHeartbeatFeignClient; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.constant.MessageStatus; +import com.njcn.mq.message.HeartbeatTimeoutMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.HEARTBEAT_TIMEOUT_TOPIC, + consumerGroup = BusinessTopic.HEARTBEAT_TIMEOUT_TOPIC, + selectorExpression = BusinessTopic.HeartTag.APF_TAG, + consumeThreadNumber = 1, + enableMsgTrace = true +) +@Slf4j +public class HeartbeatTimeoutConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Resource + private CsHeartbeatFeignClient csHeartbeatFeignClient; + + @Override + protected void handleMessage(HeartbeatTimeoutMessage appFileMessage) { + csHeartbeatFeignClient.handleHeartbeat(appFileMessage); + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(HeartbeatTimeoutMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(HeartbeatTimeoutMessage message) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 300L); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(HeartbeatTimeoutMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(HeartbeatTimeoutMessage appFileMessage) { + super.dispatchMessage(appFileMessage); + } +}