diff --git a/pqs-device/pq-device/pq-device-api/src/main/java/com/njcn/device/pq/pojo/param/AskFileSysParam.java b/pqs-device/pq-device/pq-device-api/src/main/java/com/njcn/device/pq/pojo/param/AskFileSysParam.java new file mode 100644 index 000000000..5703e2a52 --- /dev/null +++ b/pqs-device/pq-device/pq-device-api/src/main/java/com/njcn/device/pq/pojo/param/AskFileSysParam.java @@ -0,0 +1,19 @@ +package com.njcn.device.pq.pojo.param; + +import lombok.Data; + +/** + * Description:请求装置文件系统参数 + * Date: 2026/04/30 上午 10:51【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class AskFileSysParam { + + private String devId; + private String path; + private String remotePath; + +} diff --git a/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/DirectoryController.java b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/DirectoryController.java new file mode 100644 index 000000000..bfe665e29 --- /dev/null +++ b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/DirectoryController.java @@ -0,0 +1,181 @@ +package com.njcn.device.pq.controller.file; + +import cn.hutool.core.date.TimeInterval; +import cn.hutool.core.util.IdUtil; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.device.device.service.DeviceProcessService; +import com.njcn.device.device.service.IDeviceService; +import com.njcn.device.pq.pojo.param.AskFileSysParam; +import com.njcn.device.pq.pojo.po.Device; +import com.njcn.device.pq.pojo.po.DeviceProcess; +import com.njcn.message.api.ProduceFeignClient; +import com.njcn.message.message.AskFileSysMessage; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; + +/** + * Description: + * Date: 2026/04/29 上午 11:41【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@RestController +@RequestMapping("/dir") +@RequiredArgsConstructor +@Slf4j +@Api(tags = "装置文件系统") +public class DirectoryController extends BaseController { + + + private final ProduceFeignClient produceFeignClient; + + + private final PendingRequestManager pendingRequestManager; + + private final IDeviceService iDeviceService; + private final DeviceProcessService deviceProcessService; + + @PostMapping("/list") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("获取装置文件目录") + public DeferredResult listDirectory(@RequestBody AskFileSysParam askFileSysParam) { + String methodDescribe = getMethodDescribe("listDirectory"); + + long timeoutMs = 15000L; // 15 秒超时 + // 构造指令 body + Device device = iDeviceService.getById(askFileSysParam.getDevId()); + DeviceProcess deviceProcess = deviceProcessService.getById(askFileSysParam.getDevId()); + // 发送 MQ 指令,获取 msgId + AskFileSysMessage askFileSysMessage = new AskFileSysMessage(); + askFileSysMessage.setGuid(IdUtil.simpleUUID()); + askFileSysMessage.setNodeId(device.getNodeId()); + askFileSysMessage.setProcessNo(deviceProcess.getProcessNo()); + askFileSysMessage.setDevId(askFileSysParam.getDevId()); + askFileSysMessage.setType(0); + askFileSysMessage.setPath(askFileSysParam.getPath()); + + produceFeignClient.askFileSys(askFileSysMessage); + // 创建挂起请求 + DeferredResult deferredResult = pendingRequestManager.createPendingRequest(askFileSysMessage.getGuid(), timeoutMs); + // 可额外在 Redis 中记录初始状态(可选) + // 返回 DeferredResult,Spring 将挂起此请求 + return deferredResult; + } + + @PostMapping("/downLoadFile") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("获取装置文件文件") + public DeferredResult downLoadFile(@RequestBody AskFileSysParam askFileSysParam) { + String methodDescribe = getMethodDescribe("downLoadFile"); + + long timeoutMs = 15000L; // 15 秒超时 + // 构造指令 body + Device device = iDeviceService.getById(askFileSysParam.getDevId()); + DeviceProcess deviceProcess = deviceProcessService.getById(askFileSysParam.getDevId()); + // 发送 MQ 指令,获取 msgId + AskFileSysMessage askFileSysMessage = new AskFileSysMessage(); + askFileSysMessage.setGuid(IdUtil.simpleUUID()); + askFileSysMessage.setNodeId(device.getNodeId()); + askFileSysMessage.setProcessNo(deviceProcess.getProcessNo()); + askFileSysMessage.setDevId(askFileSysParam.getDevId()); + askFileSysMessage.setType(1); + askFileSysMessage.setPath(askFileSysParam.getPath()); + + produceFeignClient.askFileSys(askFileSysMessage); + // 创建挂起请求 + DeferredResult deferredResult = pendingRequestManager.createPendingRequest(askFileSysMessage.getGuid(), timeoutMs); + // 可额外在 Redis 中记录初始状态(可选) + // 返回 DeferredResult,Spring 将挂起此请求 + return deferredResult; + } + + @PostMapping("/upLoadFile") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("上传装置文件文件") + public DeferredResult upLoadFile(@RequestBody AskFileSysParam askFileSysParam) { + String methodDescribe = getMethodDescribe("upLoadFile"); + + long timeoutMs = 15000L; // 15 秒超时 + // 构造指令 body + Device device = iDeviceService.getById(askFileSysParam.getDevId()); + DeviceProcess deviceProcess = deviceProcessService.getById(askFileSysParam.getDevId()); + // 发送 MQ 指令,获取 msgId + AskFileSysMessage askFileSysMessage = new AskFileSysMessage(); + askFileSysMessage.setGuid(IdUtil.simpleUUID()); + askFileSysMessage.setNodeId(device.getNodeId()); + askFileSysMessage.setProcessNo(deviceProcess.getProcessNo()); + askFileSysMessage.setDevId(askFileSysParam.getDevId()); + askFileSysMessage.setType(2); + askFileSysMessage.setPath(askFileSysParam.getPath()); + askFileSysMessage.setRemotePath(askFileSysParam.getRemotePath()); + produceFeignClient.askFileSys(askFileSysMessage); + // 创建挂起请求 + DeferredResult deferredResult = pendingRequestManager.createPendingRequest(askFileSysMessage.getGuid(), timeoutMs); + // 可额外在 Redis 中记录初始状态(可选) + // 返回 DeferredResult,Spring 将挂起此请求 + return deferredResult; + } + + @PostMapping("/delete") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("装置目录,文件删除") + public DeferredResult delete(@RequestBody AskFileSysParam askFileSysParam) { + String methodDescribe = getMethodDescribe("delete"); + + long timeoutMs = 15000L; // 15 秒超时 + // 构造指令 body + Device device = iDeviceService.getById(askFileSysParam.getDevId()); + DeviceProcess deviceProcess = deviceProcessService.getById(askFileSysParam.getDevId()); + // 发送 MQ 指令,获取 msgId + AskFileSysMessage askFileSysMessage = new AskFileSysMessage(); + askFileSysMessage.setGuid(IdUtil.simpleUUID()); + askFileSysMessage.setNodeId(device.getNodeId()); + askFileSysMessage.setProcessNo(deviceProcess.getProcessNo()); + askFileSysMessage.setDevId(askFileSysParam.getDevId()); + askFileSysMessage.setType(3); + askFileSysMessage.setPath(askFileSysParam.getPath()); + askFileSysMessage.setRemotePath(askFileSysParam.getRemotePath()); + produceFeignClient.askFileSys(askFileSysMessage); + // 创建挂起请求 + DeferredResult deferredResult = pendingRequestManager.createPendingRequest(askFileSysMessage.getGuid(), timeoutMs); + // 可额外在 Redis 中记录初始状态(可选) + // 返回 DeferredResult,Spring 将挂起此请求 + return deferredResult; + } + + @PostMapping("/restart") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("装置重启") + public DeferredResult restart(@RequestBody AskFileSysParam askFileSysParam) { + String methodDescribe = getMethodDescribe("restart"); + + long timeoutMs = 15000L; // 15 秒超时 + // 构造指令 body + Device device = iDeviceService.getById(askFileSysParam.getDevId()); + DeviceProcess deviceProcess = deviceProcessService.getById(askFileSysParam.getDevId()); + // 发送 MQ 指令,获取 msgId + AskFileSysMessage askFileSysMessage = new AskFileSysMessage(); + askFileSysMessage.setGuid(IdUtil.simpleUUID()); + askFileSysMessage.setNodeId(device.getNodeId()); + askFileSysMessage.setProcessNo(deviceProcess.getProcessNo()); + askFileSysMessage.setDevId(askFileSysParam.getDevId()); + askFileSysMessage.setPath("reboot"); + askFileSysMessage.setType(4); + produceFeignClient.askFileSys(askFileSysMessage); + // 创建挂起请求 + DeferredResult deferredResult = pendingRequestManager.createPendingRequest(askFileSysMessage.getGuid(), timeoutMs); + // 可额外在 Redis 中记录初始状态(可选) + // 返回 DeferredResult,Spring 将挂起此请求 + return deferredResult; + } + + +} diff --git a/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/PendingRequestManager.java b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/PendingRequestManager.java new file mode 100644 index 000000000..3f6ca6f5e --- /dev/null +++ b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/PendingRequestManager.java @@ -0,0 +1,61 @@ +package com.njcn.device.pq.controller.file; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.utils.HttpResultUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.context.request.async.DeferredResult; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +public class PendingRequestManager { + + @Autowired + private RedisTemplate redisTemplate; + + // 本地映射:msgId -> DeferredResult,主要用于超时清理和主动完成 + private final ConcurrentHashMap> deferredResultMap = new ConcurrentHashMap<>(); + + // 创建挂起请求 + public DeferredResult createPendingRequest(String msgId, long timeoutMs) { + DeferredResult deferredResult = new DeferredResult<>(timeoutMs); + // 超时回调 + deferredResult.onTimeout(() -> { + // 清理 Redis 中的等待记录 + redisTemplate.delete("pending:" + msgId); + deferredResultMap.remove(msgId); + deferredResult.setErrorResult(new BusinessException("前置超时....")); + }); + // 完成回调(正常或异常后移出本地映射) + deferredResult.onCompletion(() -> deferredResultMap.remove(msgId)); + + deferredResultMap.put(msgId, deferredResult); + return deferredResult; + } + + // 收到 Redis 通知后,根据 msgId 完成请求 + public void completeRequest(String msgId) { + if (deferredResultMap.containsKey(msgId)) { + DeferredResult deferredResult = deferredResultMap.get(msgId); + + // 从 Redis 中获取结果内容 + String key = "pending:" + msgId; + Object result = redisTemplate.opsForValue().get(key); + if (result != null) { + deferredResult.setResult( HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result,"")); + } else { + log.info("Receive notification for unknown msgId: " + msgId); + deferredResult.setErrorResult(new BusinessException("前置未返回结果")); + } + // 清理 Redis 中的记录(可选,利用过期时间自动删除也可) + redisTemplate.delete(key); + } else { + // 可能请求已超时被移除了,仅记录日志即可 + log.info("Receive notification for unknown msgId: " + msgId); + } + } +} \ No newline at end of file diff --git a/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisMessageSubscriber.java b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisMessageSubscriber.java new file mode 100644 index 000000000..317732bc1 --- /dev/null +++ b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisMessageSubscriber.java @@ -0,0 +1,20 @@ +package com.njcn.device.pq.controller.file; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +public class RedisMessageSubscriber implements MessageListener { + @Autowired + private PendingRequestManager pendingRequestManager; + + @Override + public void onMessage(Message message, byte[] pattern) { + String msgId = new String(message.getBody(), StandardCharsets.UTF_8); + pendingRequestManager.completeRequest(msgId); + } +} diff --git a/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisSubscriptionConfig.java b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisSubscriptionConfig.java new file mode 100644 index 000000000..8baaf63bd --- /dev/null +++ b/pqs-device/pq-device/pq-device-boot/src/main/java/com/njcn/device/pq/controller/file/RedisSubscriptionConfig.java @@ -0,0 +1,44 @@ +package com.njcn.device.pq.controller.file; + +import com.njcn.redis.config.RedisConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.stereotype.Component; +import org.springframework.data.redis.connection.MessageListener; + +import java.nio.charset.StandardCharsets; + +/** + * Description: + * Date: 2026/04/29 下午 3:37【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Configuration +@Import(RedisConfig.class) // 引入公共配置 +public class RedisSubscriptionConfig { + + @Bean + public RedisMessageListenerContainer redisContainer( + RedisConnectionFactory connectionFactory, + MessageListenerAdapter resultListenerAdapter) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(resultListenerAdapter, new ChannelTopic("result_ready_channel")); + return container; + } + + @Bean + public MessageListenerAdapter resultListenerAdapter(RedisMessageSubscriber subscriber) { + return new MessageListenerAdapter(subscriber, "onMessage"); + } +} +