From a48dad40f6f262f5930cd8e94bcfdd27d6bfd06e Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Thu, 4 Jun 2026 17:23:44 +0800 Subject: [PATCH] sntp --- .../detection/controller/SntpController.java | 44 +++++ .../gather/detection/sntp/SntpExchange.java | 20 +++ .../detection/sntp/SntpPacketService.java | 108 ++++++++++++ .../detection/sntp/SntpPushMessage.java | 23 +++ .../detection/sntp/SntpServerManager.java | 166 ++++++++++++++++++ .../detection/sntp/SntpServerProperties.java | 13 ++ .../socket/websocket/WebServiceManager.java | 15 +- entrance/src/main/resources/application.yml | 3 + 8 files changed, 391 insertions(+), 1 deletion(-) create mode 100644 detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java create mode 100644 detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java b/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java new file mode 100644 index 00000000..8618acb8 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/controller/SntpController.java @@ -0,0 +1,44 @@ +package com.njcn.gather.detection.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.gather.detection.sntp.SntpServerManager; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@Api(tags = "SNTP对时") +@RestController +@RequestMapping("/sntp") +@RequiredArgsConstructor +public class SntpController extends BaseController { + + private final SntpServerManager sntpServerManager; + + @OperateInfo(info = LogEnum.SYSTEM_COMMON) + @PostMapping("/start") + @ApiOperation("启动SNTP对时服务") + public HttpResult start() { + String methodDescribe = getMethodDescribe("start"); + sntpServerManager.start(); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + + @OperateInfo(info = LogEnum.SYSTEM_COMMON) + @PostMapping("/stop") + @ApiOperation("停止SNTP对时服务") + public HttpResult stop() { + String methodDescribe = getMethodDescribe("stop"); + sntpServerManager.stop(); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java new file mode 100644 index 00000000..737482fc --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpExchange.java @@ -0,0 +1,20 @@ +package com.njcn.gather.detection.sntp; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.net.InetSocketAddress; +import java.time.Instant; + +@Data +@AllArgsConstructor +public class SntpExchange { + + private InetSocketAddress clientAddress; + + private int version; + + private Instant deviceInstant; + + private byte[] clientTransmitTimestamp; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java new file mode 100644 index 00000000..15f088a6 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPacketService.java @@ -0,0 +1,108 @@ +package com.njcn.gather.detection.sntp; + +import org.springframework.stereotype.Service; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Arrays; + +@Service +public class SntpPacketService { + + private static final int MIN_PACKET_LENGTH = 48; + private static final int CLIENT_MODE = 3; + private static final int SERVER_MODE = 4; + private static final long NTP_EPOCH_OFFSET = 2208988800L; + private static final ZoneId SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai"); + private static final byte[] REFERENCE_ID = "LOCL".getBytes(StandardCharsets.US_ASCII); + + public SntpExchange parseRequest(byte[] request, InetSocketAddress clientAddress) { + if (request == null || request.length < MIN_PACKET_LENGTH) { + return null; + } + + int mode = request[0] & 0x07; + if (mode != CLIENT_MODE) { + return null; + } + + int version = (request[0] >> 3) & 0x07; + byte[] clientTransmitTimestamp = Arrays.copyOfRange(request, 40, 48); + Instant deviceInstant = fromNtpTimestamp(clientTransmitTimestamp); + return new SntpExchange(clientAddress, version == 0 ? 3 : version, deviceInstant, clientTransmitTimestamp); + } + + public byte[] buildResponse(SntpExchange exchange, Instant receiveInstant, Instant transmitInstant) { + byte[] response = new byte[MIN_PACKET_LENGTH]; + int version = exchange.getVersion() == 0 ? 3 : exchange.getVersion(); + response[0] = (byte) ((version << 3) | SERVER_MODE); + response[1] = 0x01; + response[2] = 0x04; + response[3] = (byte) 0xEC; + System.arraycopy(REFERENCE_ID, 0, response, 12, REFERENCE_ID.length); + + byte[] receiveTimestamp = toNtpTimestamp(receiveInstant); + byte[] transmitTimestamp = toNtpTimestamp(transmitInstant); + + System.arraycopy(receiveTimestamp, 0, response, 16, receiveTimestamp.length); + System.arraycopy(exchange.getClientTransmitTimestamp(), 0, response, 24, exchange.getClientTransmitTimestamp().length); + System.arraycopy(receiveTimestamp, 0, response, 32, receiveTimestamp.length); + System.arraycopy(transmitTimestamp, 0, response, 40, transmitTimestamp.length); + return response; + } + + public SntpPushMessage toPushMessage(Instant computerInstant, Instant deviceInstant) { + long computerTimestampMs = computerInstant.toEpochMilli(); + long deviceTimestampMs = deviceInstant.toEpochMilli(); + return new SntpPushMessage( + "sntp_time_update", + formatShanghaiTime(computerInstant), + formatShanghaiTime(deviceInstant), + computerTimestampMs, + deviceTimestampMs, + computerTimestampMs - deviceTimestampMs + ); + } + + public static byte[] toNtpTimestamp(Instant instant) { + long ntpSeconds = instant.getEpochSecond() + NTP_EPOCH_OFFSET; + long fraction = ((long) instant.getNano() << 32) / 1_000_000_000L; + byte[] bytes = new byte[8]; + writeUnsignedInt(bytes, 0, ntpSeconds); + writeUnsignedInt(bytes, 4, fraction); + return bytes; + } + + private static Instant fromNtpTimestamp(byte[] bytes) { + long seconds = readUnsignedInt(bytes, 0); + long fraction = readUnsignedInt(bytes, 4); + long epochSeconds = seconds - NTP_EPOCH_OFFSET; + long nanos = ((fraction * 1_000_000_000L) + 0x80000000L) >>> 32; + if (nanos >= 1_000_000_000L) { + epochSeconds += 1; + nanos -= 1_000_000_000L; + } + return Instant.ofEpochSecond(epochSeconds, nanos); + } + + private String formatShanghaiTime(Instant instant) { + return LocalDateTime.ofInstant(instant, SHANGHAI_ZONE).toString().replace('T', ' '); + } + + private static long readUnsignedInt(byte[] bytes, int offset) { + return ((long) bytes[offset] & 0xFF) << 24 + | ((long) bytes[offset + 1] & 0xFF) << 16 + | ((long) bytes[offset + 2] & 0xFF) << 8 + | ((long) bytes[offset + 3] & 0xFF); + } + + private static void writeUnsignedInt(byte[] target, int offset, long value) { + target[offset] = (byte) ((value >>> 24) & 0xFF); + target[offset + 1] = (byte) ((value >>> 16) & 0xFF); + target[offset + 2] = (byte) ((value >>> 8) & 0xFF); + target[offset + 3] = (byte) (value & 0xFF); + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java new file mode 100644 index 00000000..cf0a3e3c --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpPushMessage.java @@ -0,0 +1,23 @@ +package com.njcn.gather.detection.sntp; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SntpPushMessage { + + private String type = "sntp_time_update"; + + private String computerTime; + + private String deviceTime; + + private Long computerTimestampMs; + + private Long deviceTimestampMs; + + private Long errorMs; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java new file mode 100644 index 00000000..97a7a657 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerManager.java @@ -0,0 +1,166 @@ +package com.njcn.gather.detection.sntp; + +import com.alibaba.fastjson.JSON; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.gather.detection.util.socket.websocket.WebServiceManager; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.time.Instant; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SntpServerManager { + + private final SntpServerProperties sntpServerProperties; + private final SntpPacketService sntpPacketService; + + private final AtomicBoolean running = new AtomicBoolean(false); + private final Object lifecycleMonitor = new Object(); + + private volatile DatagramSocket datagramSocket; + private volatile ExecutorService executorService; + + public void start() { + // 使用同步锁和原子变量防止重复启动 + synchronized (lifecycleMonitor) { + if (running.get()) { + return; + } + int port = resolvePort(); + DatagramSocket socket = createSocket(port); + ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "sntp-server"); + thread.setDaemon(true); + return thread; + }); + + datagramSocket = socket; + executorService = executor; + running.set(true); + executor.submit(this::receiveLoop); + log.info("SNTP服务已启动,监听端口: {}", port); + } + } + + public void stop() { + synchronized (lifecycleMonitor) { + if (!running.get()) { + return; + } + running.set(false); + closeSocketQuietly(datagramSocket); + datagramSocket = null; + if (executorService != null) { + executorService.shutdownNow(); + executorService = null; + } + log.info("SNTP服务已停止"); + } + } + + public boolean isRunning() { + return running.get(); + } + + @PreDestroy + public void destroy() { + stop(); + } + + private void receiveLoop() { + byte[] buffer = new byte[512]; + while (running.get()) { + DatagramSocket socket = datagramSocket; + if (socket == null || socket.isClosed()) { + break; + } + + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + socket.receive(packet); + handlePacket(socket, packet); + log.info("SNTP服务接收报文: {}", Arrays.toString(packet.getData())); + } catch (SocketException e) { + if (running.get()) { + log.error("SNTP服务接收报文失败", e); + } + break; + } catch (Exception e) { + log.error("SNTP服务处理报文失败", e); + } + } + + synchronized (lifecycleMonitor) { + if (running.get()) { + running.set(false); + closeSocketQuietly(datagramSocket); + datagramSocket = null; + if (executorService != null) { + executorService.shutdownNow(); + executorService = null; + } + } + } + } + + private void handlePacket(DatagramSocket socket, DatagramPacket packet) throws IOException { + byte[] request = Arrays.copyOf(packet.getData(), packet.getLength()); + InetSocketAddress clientAddress = new InetSocketAddress(packet.getAddress(), packet.getPort()); + SntpExchange exchange = sntpPacketService.parseRequest(request, clientAddress); + if (exchange == null) { + return; + } + + Instant receiveInstant = Instant.now(); + Instant transmitInstant = receiveInstant; + byte[] response = sntpPacketService.buildResponse(exchange, receiveInstant, transmitInstant); + DatagramPacket responsePacket = new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort()); + socket.send(responsePacket); + + SntpPushMessage pushMessage = sntpPacketService.toPushMessage(transmitInstant, exchange.getDeviceInstant()); + WebServiceManager.broadcast(JSON.toJSONString(pushMessage)); + } + + private DatagramSocket createSocket(int port) { + try { + DatagramSocket socket = new DatagramSocket(port); + socket.setReuseAddress(true); + return socket; + } catch (SocketException e) { + throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口绑定异常"); + } + } + + private int resolvePort() { + Integer port = sntpServerProperties.getPort(); + if (port == null || port < 1 || port > 65535) { + throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口配置无效"); + } + return port; + } + + /** + * 关闭UDP Socket + * + * @param socket + */ + private void closeSocketQuietly(DatagramSocket socket) { + if (socket != null && !socket.isClosed()) { + socket.close(); + } + } +} diff --git a/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java new file mode 100644 index 00000000..1a4f3227 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/detection/sntp/SntpServerProperties.java @@ -0,0 +1,13 @@ +package com.njcn.gather.detection.sntp; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "sntp") +public class SntpServerProperties { + + private Integer port = 123; +} diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java index 2e1a0107..935ba89a 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java @@ -142,6 +142,19 @@ public class WebServiceManager { } } + public static void broadcast(String msg) { + for (Map.Entry entry : userSessions.entrySet()) { + String userId = entry.getKey(); + Channel channel = entry.getValue(); + if (Objects.nonNull(channel) && channel.isActive()) { + channel.writeAndFlush(new TextWebSocketFrame(msg)); + } else { + log.error("WebSocket broadcast failed, disconnected user, time: {}, userId: {}", LocalDateTime.now(), userId); + WebSocketHandler.cleanupSocketResources(userId); + } + } + } + /** * 存储检测参数(基于用户ID) * 支持多用户并发检测,每个用户的检测参数独立存储 @@ -315,4 +328,4 @@ public class WebServiceManager { sendMessage(userId, webSocketVO); } } - \ No newline at end of file + diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index a98719ca..51a9593a 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -66,6 +66,9 @@ webSocket: port: 7778 #源参数下发,暂态数据默认值 +sntp: + port: 123 + Dip: #暂态前时间(s) fPreTime: 2f