diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java b/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java deleted file mode 100644 index 8618acb8..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.njcn.gather.detection.controller; - -import com.njcn.common.pojo.annotation.OperateInfo; -import com.njcn.common.pojo.enums.common.LogEnum; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; -import com.njcn.common.pojo.response.HttpResult; -import com.njcn.gather.detection.sntp.SntpServerManager; -import com.njcn.web.controller.BaseController; -import com.njcn.web.utils.HttpResultUtil; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@Slf4j -@Api(tags = "SNTP对时") -@RestController -@RequestMapping("/sntp") -@RequiredArgsConstructor -public class SntpController extends BaseController { - - private final SntpServerManager sntpServerManager; - - @OperateInfo(info = LogEnum.SYSTEM_COMMON) - @PostMapping("/start") - @ApiOperation("启动SNTP对时服务") - public HttpResult start() { - String methodDescribe = getMethodDescribe("start"); - sntpServerManager.start(); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); - } - - @OperateInfo(info = LogEnum.SYSTEM_COMMON) - @PostMapping("/stop") - @ApiOperation("停止SNTP对时服务") - public HttpResult stop() { - String methodDescribe = getMethodDescribe("stop"); - sntpServerManager.stop(); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); - } -} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java deleted file mode 100644 index 737482fc..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.njcn.gather.detection.sntp; - -import lombok.AllArgsConstructor; -import lombok.Data; - -import java.net.InetSocketAddress; -import java.time.Instant; - -@Data -@AllArgsConstructor -public class SntpExchange { - - private InetSocketAddress clientAddress; - - private int version; - - private Instant deviceInstant; - - private byte[] clientTransmitTimestamp; -} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java deleted file mode 100644 index 15f088a6..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.njcn.gather.detection.sntp; - -import org.springframework.stereotype.Service; - -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.Arrays; - -@Service -public class SntpPacketService { - - private static final int MIN_PACKET_LENGTH = 48; - private static final int CLIENT_MODE = 3; - private static final int SERVER_MODE = 4; - private static final long NTP_EPOCH_OFFSET = 2208988800L; - private static final ZoneId SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai"); - private static final byte[] REFERENCE_ID = "LOCL".getBytes(StandardCharsets.US_ASCII); - - public SntpExchange parseRequest(byte[] request, InetSocketAddress clientAddress) { - if (request == null || request.length < MIN_PACKET_LENGTH) { - return null; - } - - int mode = request[0] & 0x07; - if (mode != CLIENT_MODE) { - return null; - } - - int version = (request[0] >> 3) & 0x07; - byte[] clientTransmitTimestamp = Arrays.copyOfRange(request, 40, 48); - Instant deviceInstant = fromNtpTimestamp(clientTransmitTimestamp); - return new SntpExchange(clientAddress, version == 0 ? 3 : version, deviceInstant, clientTransmitTimestamp); - } - - public byte[] buildResponse(SntpExchange exchange, Instant receiveInstant, Instant transmitInstant) { - byte[] response = new byte[MIN_PACKET_LENGTH]; - int version = exchange.getVersion() == 0 ? 3 : exchange.getVersion(); - response[0] = (byte) ((version << 3) | SERVER_MODE); - response[1] = 0x01; - response[2] = 0x04; - response[3] = (byte) 0xEC; - System.arraycopy(REFERENCE_ID, 0, response, 12, REFERENCE_ID.length); - - byte[] receiveTimestamp = toNtpTimestamp(receiveInstant); - byte[] transmitTimestamp = toNtpTimestamp(transmitInstant); - - System.arraycopy(receiveTimestamp, 0, response, 16, receiveTimestamp.length); - System.arraycopy(exchange.getClientTransmitTimestamp(), 0, response, 24, exchange.getClientTransmitTimestamp().length); - System.arraycopy(receiveTimestamp, 0, response, 32, receiveTimestamp.length); - System.arraycopy(transmitTimestamp, 0, response, 40, transmitTimestamp.length); - return response; - } - - public SntpPushMessage toPushMessage(Instant computerInstant, Instant deviceInstant) { - long computerTimestampMs = computerInstant.toEpochMilli(); - long deviceTimestampMs = deviceInstant.toEpochMilli(); - return new SntpPushMessage( - "sntp_time_update", - formatShanghaiTime(computerInstant), - formatShanghaiTime(deviceInstant), - computerTimestampMs, - deviceTimestampMs, - computerTimestampMs - deviceTimestampMs - ); - } - - public static byte[] toNtpTimestamp(Instant instant) { - long ntpSeconds = instant.getEpochSecond() + NTP_EPOCH_OFFSET; - long fraction = ((long) instant.getNano() << 32) / 1_000_000_000L; - byte[] bytes = new byte[8]; - writeUnsignedInt(bytes, 0, ntpSeconds); - writeUnsignedInt(bytes, 4, fraction); - return bytes; - } - - private static Instant fromNtpTimestamp(byte[] bytes) { - long seconds = readUnsignedInt(bytes, 0); - long fraction = readUnsignedInt(bytes, 4); - long epochSeconds = seconds - NTP_EPOCH_OFFSET; - long nanos = ((fraction * 1_000_000_000L) + 0x80000000L) >>> 32; - if (nanos >= 1_000_000_000L) { - epochSeconds += 1; - nanos -= 1_000_000_000L; - } - return Instant.ofEpochSecond(epochSeconds, nanos); - } - - private String formatShanghaiTime(Instant instant) { - return LocalDateTime.ofInstant(instant, SHANGHAI_ZONE).toString().replace('T', ' '); - } - - private static long readUnsignedInt(byte[] bytes, int offset) { - return ((long) bytes[offset] & 0xFF) << 24 - | ((long) bytes[offset + 1] & 0xFF) << 16 - | ((long) bytes[offset + 2] & 0xFF) << 8 - | ((long) bytes[offset + 3] & 0xFF); - } - - private static void writeUnsignedInt(byte[] target, int offset, long value) { - target[offset] = (byte) ((value >>> 24) & 0xFF); - target[offset + 1] = (byte) ((value >>> 16) & 0xFF); - target[offset + 2] = (byte) ((value >>> 8) & 0xFF); - target[offset + 3] = (byte) (value & 0xFF); - } -} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java deleted file mode 100644 index cf0a3e3c..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.njcn.gather.detection.sntp; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class SntpPushMessage { - - private String type = "sntp_time_update"; - - private String computerTime; - - private String deviceTime; - - private Long computerTimestampMs; - - private Long deviceTimestampMs; - - private Long errorMs; -} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java deleted file mode 100644 index 97a7a657..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java +++ /dev/null @@ -1,166 +0,0 @@ -package com.njcn.gather.detection.sntp; - -import com.alibaba.fastjson.JSON; -import com.njcn.common.pojo.enums.response.CommonResponseEnum; -import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import javax.annotation.PreDestroy; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.time.Instant; -import java.util.Arrays; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -@Slf4j -@Service -@RequiredArgsConstructor -public class SntpServerManager { - - private final SntpServerProperties sntpServerProperties; - private final SntpPacketService sntpPacketService; - - private final AtomicBoolean running = new AtomicBoolean(false); - private final Object lifecycleMonitor = new Object(); - - private volatile DatagramSocket datagramSocket; - private volatile ExecutorService executorService; - - public void start() { - // 使用同步锁和原子变量防止重复启动 - synchronized (lifecycleMonitor) { - if (running.get()) { - return; - } - int port = resolvePort(); - DatagramSocket socket = createSocket(port); - ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { - Thread thread = new Thread(runnable, "sntp-server"); - thread.setDaemon(true); - return thread; - }); - - datagramSocket = socket; - executorService = executor; - running.set(true); - executor.submit(this::receiveLoop); - log.info("SNTP服务已启动,监听端口: {}", port); - } - } - - public void stop() { - synchronized (lifecycleMonitor) { - if (!running.get()) { - return; - } - running.set(false); - closeSocketQuietly(datagramSocket); - datagramSocket = null; - if (executorService != null) { - executorService.shutdownNow(); - executorService = null; - } - log.info("SNTP服务已停止"); - } - } - - public boolean isRunning() { - return running.get(); - } - - @PreDestroy - public void destroy() { - stop(); - } - - private void receiveLoop() { - byte[] buffer = new byte[512]; - while (running.get()) { - DatagramSocket socket = datagramSocket; - if (socket == null || socket.isClosed()) { - break; - } - - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - try { - socket.receive(packet); - handlePacket(socket, packet); - log.info("SNTP服务接收报文: {}", Arrays.toString(packet.getData())); - } catch (SocketException e) { - if (running.get()) { - log.error("SNTP服务接收报文失败", e); - } - break; - } catch (Exception e) { - log.error("SNTP服务处理报文失败", e); - } - } - - synchronized (lifecycleMonitor) { - if (running.get()) { - running.set(false); - closeSocketQuietly(datagramSocket); - datagramSocket = null; - if (executorService != null) { - executorService.shutdownNow(); - executorService = null; - } - } - } - } - - private void handlePacket(DatagramSocket socket, DatagramPacket packet) throws IOException { - byte[] request = Arrays.copyOf(packet.getData(), packet.getLength()); - InetSocketAddress clientAddress = new InetSocketAddress(packet.getAddress(), packet.getPort()); - SntpExchange exchange = sntpPacketService.parseRequest(request, clientAddress); - if (exchange == null) { - return; - } - - Instant receiveInstant = Instant.now(); - Instant transmitInstant = receiveInstant; - byte[] response = sntpPacketService.buildResponse(exchange, receiveInstant, transmitInstant); - DatagramPacket responsePacket = new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort()); - socket.send(responsePacket); - - SntpPushMessage pushMessage = sntpPacketService.toPushMessage(transmitInstant, exchange.getDeviceInstant()); - WebServiceManager.broadcast(JSON.toJSONString(pushMessage)); - } - - private DatagramSocket createSocket(int port) { - try { - DatagramSocket socket = new DatagramSocket(port); - socket.setReuseAddress(true); - return socket; - } catch (SocketException e) { - throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口绑定异常"); - } - } - - private int resolvePort() { - Integer port = sntpServerProperties.getPort(); - if (port == null || port < 1 || port > 65535) { - throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口配置无效"); - } - return port; - } - - /** - * 关闭UDP Socket - * - * @param socket - */ - private void closeSocketQuietly(DatagramSocket socket) { - if (socket != null && !socket.isClosed()) { - socket.close(); - } - } -} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java deleted file mode 100644 index 1a4f3227..00000000 --- a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.njcn.gather.detection.sntp; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "sntp") -public class SntpServerProperties { - - private Integer port = 123; -} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java index 935ba89a..2e1a0107 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java @@ -142,19 +142,6 @@ public class WebServiceManager { } } - public static void broadcast(String msg) { - for (Map.Entry entry : userSessions.entrySet()) { - String userId = entry.getKey(); - Channel channel = entry.getValue(); - if (Objects.nonNull(channel) && channel.isActive()) { - channel.writeAndFlush(new TextWebSocketFrame(msg)); - } else { - log.error("WebSocket broadcast failed, disconnected user, time: {}, userId: {}", LocalDateTime.now(), userId); - WebSocketHandler.cleanupSocketResources(userId); - } - } - } - /** * 存储检测参数(基于用户ID) * 支持多用户并发检测,每个用户的检测参数独立存储 @@ -328,4 +315,4 @@ public class WebServiceManager { sendMessage(userId, webSocketVO); } } - + \ No newline at end of file diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index 51a9593a..a98719ca 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -66,9 +66,6 @@ webSocket: port: 7778 #源参数下发,暂态数据默认值 -sntp: - port: 123 - Dip: #暂态前时间(s) fPreTime: 2f