sntp
This commit is contained in:
@@ -0,0 +1,44 @@
|
|||||||
|
package com.njcn.gather.detection.controller;
|
||||||
|
|
||||||
|
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||||
|
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||||
|
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||||
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
import com.njcn.gather.detection.sntp.SntpServerManager;
|
||||||
|
import com.njcn.web.controller.BaseController;
|
||||||
|
import com.njcn.web.utils.HttpResultUtil;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Api(tags = "SNTP对时")
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/sntp")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class SntpController extends BaseController {
|
||||||
|
|
||||||
|
private final SntpServerManager sntpServerManager;
|
||||||
|
|
||||||
|
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
|
||||||
|
@PostMapping("/start")
|
||||||
|
@ApiOperation("启动SNTP对时服务")
|
||||||
|
public HttpResult<?> start() {
|
||||||
|
String methodDescribe = getMethodDescribe("start");
|
||||||
|
sntpServerManager.start();
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
|
||||||
|
@PostMapping("/stop")
|
||||||
|
@ApiOperation("停止SNTP对时服务")
|
||||||
|
public HttpResult<?> stop() {
|
||||||
|
String methodDescribe = getMethodDescribe("stop");
|
||||||
|
sntpServerManager.stop();
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package com.njcn.gather.detection.sntp;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class SntpExchange {
|
||||||
|
|
||||||
|
private InetSocketAddress clientAddress;
|
||||||
|
|
||||||
|
private int version;
|
||||||
|
|
||||||
|
private Instant deviceInstant;
|
||||||
|
|
||||||
|
private byte[] clientTransmitTimestamp;
|
||||||
|
}
|
||||||
@@ -0,0 +1,108 @@
|
|||||||
|
package com.njcn.gather.detection.sntp;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class SntpPacketService {
|
||||||
|
|
||||||
|
private static final int MIN_PACKET_LENGTH = 48;
|
||||||
|
private static final int CLIENT_MODE = 3;
|
||||||
|
private static final int SERVER_MODE = 4;
|
||||||
|
private static final long NTP_EPOCH_OFFSET = 2208988800L;
|
||||||
|
private static final ZoneId SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
|
||||||
|
private static final byte[] REFERENCE_ID = "LOCL".getBytes(StandardCharsets.US_ASCII);
|
||||||
|
|
||||||
|
public SntpExchange parseRequest(byte[] request, InetSocketAddress clientAddress) {
|
||||||
|
if (request == null || request.length < MIN_PACKET_LENGTH) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mode = request[0] & 0x07;
|
||||||
|
if (mode != CLIENT_MODE) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
int version = (request[0] >> 3) & 0x07;
|
||||||
|
byte[] clientTransmitTimestamp = Arrays.copyOfRange(request, 40, 48);
|
||||||
|
Instant deviceInstant = fromNtpTimestamp(clientTransmitTimestamp);
|
||||||
|
return new SntpExchange(clientAddress, version == 0 ? 3 : version, deviceInstant, clientTransmitTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] buildResponse(SntpExchange exchange, Instant receiveInstant, Instant transmitInstant) {
|
||||||
|
byte[] response = new byte[MIN_PACKET_LENGTH];
|
||||||
|
int version = exchange.getVersion() == 0 ? 3 : exchange.getVersion();
|
||||||
|
response[0] = (byte) ((version << 3) | SERVER_MODE);
|
||||||
|
response[1] = 0x01;
|
||||||
|
response[2] = 0x04;
|
||||||
|
response[3] = (byte) 0xEC;
|
||||||
|
System.arraycopy(REFERENCE_ID, 0, response, 12, REFERENCE_ID.length);
|
||||||
|
|
||||||
|
byte[] receiveTimestamp = toNtpTimestamp(receiveInstant);
|
||||||
|
byte[] transmitTimestamp = toNtpTimestamp(transmitInstant);
|
||||||
|
|
||||||
|
System.arraycopy(receiveTimestamp, 0, response, 16, receiveTimestamp.length);
|
||||||
|
System.arraycopy(exchange.getClientTransmitTimestamp(), 0, response, 24, exchange.getClientTransmitTimestamp().length);
|
||||||
|
System.arraycopy(receiveTimestamp, 0, response, 32, receiveTimestamp.length);
|
||||||
|
System.arraycopy(transmitTimestamp, 0, response, 40, transmitTimestamp.length);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SntpPushMessage toPushMessage(Instant computerInstant, Instant deviceInstant) {
|
||||||
|
long computerTimestampMs = computerInstant.toEpochMilli();
|
||||||
|
long deviceTimestampMs = deviceInstant.toEpochMilli();
|
||||||
|
return new SntpPushMessage(
|
||||||
|
"sntp_time_update",
|
||||||
|
formatShanghaiTime(computerInstant),
|
||||||
|
formatShanghaiTime(deviceInstant),
|
||||||
|
computerTimestampMs,
|
||||||
|
deviceTimestampMs,
|
||||||
|
computerTimestampMs - deviceTimestampMs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] toNtpTimestamp(Instant instant) {
|
||||||
|
long ntpSeconds = instant.getEpochSecond() + NTP_EPOCH_OFFSET;
|
||||||
|
long fraction = ((long) instant.getNano() << 32) / 1_000_000_000L;
|
||||||
|
byte[] bytes = new byte[8];
|
||||||
|
writeUnsignedInt(bytes, 0, ntpSeconds);
|
||||||
|
writeUnsignedInt(bytes, 4, fraction);
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Instant fromNtpTimestamp(byte[] bytes) {
|
||||||
|
long seconds = readUnsignedInt(bytes, 0);
|
||||||
|
long fraction = readUnsignedInt(bytes, 4);
|
||||||
|
long epochSeconds = seconds - NTP_EPOCH_OFFSET;
|
||||||
|
long nanos = ((fraction * 1_000_000_000L) + 0x80000000L) >>> 32;
|
||||||
|
if (nanos >= 1_000_000_000L) {
|
||||||
|
epochSeconds += 1;
|
||||||
|
nanos -= 1_000_000_000L;
|
||||||
|
}
|
||||||
|
return Instant.ofEpochSecond(epochSeconds, nanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String formatShanghaiTime(Instant instant) {
|
||||||
|
return LocalDateTime.ofInstant(instant, SHANGHAI_ZONE).toString().replace('T', ' ');
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long readUnsignedInt(byte[] bytes, int offset) {
|
||||||
|
return ((long) bytes[offset] & 0xFF) << 24
|
||||||
|
| ((long) bytes[offset + 1] & 0xFF) << 16
|
||||||
|
| ((long) bytes[offset + 2] & 0xFF) << 8
|
||||||
|
| ((long) bytes[offset + 3] & 0xFF);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeUnsignedInt(byte[] target, int offset, long value) {
|
||||||
|
target[offset] = (byte) ((value >>> 24) & 0xFF);
|
||||||
|
target[offset + 1] = (byte) ((value >>> 16) & 0xFF);
|
||||||
|
target[offset + 2] = (byte) ((value >>> 8) & 0xFF);
|
||||||
|
target[offset + 3] = (byte) (value & 0xFF);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
package com.njcn.gather.detection.sntp;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class SntpPushMessage {
|
||||||
|
|
||||||
|
private String type = "sntp_time_update";
|
||||||
|
|
||||||
|
private String computerTime;
|
||||||
|
|
||||||
|
private String deviceTime;
|
||||||
|
|
||||||
|
private Long computerTimestampMs;
|
||||||
|
|
||||||
|
private Long deviceTimestampMs;
|
||||||
|
|
||||||
|
private Long errorMs;
|
||||||
|
}
|
||||||
@@ -0,0 +1,166 @@
|
|||||||
|
package com.njcn.gather.detection.sntp;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||||
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
|
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.DatagramPacket;
|
||||||
|
import java.net.DatagramSocket;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class SntpServerManager {
|
||||||
|
|
||||||
|
private final SntpServerProperties sntpServerProperties;
|
||||||
|
private final SntpPacketService sntpPacketService;
|
||||||
|
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private final Object lifecycleMonitor = new Object();
|
||||||
|
|
||||||
|
private volatile DatagramSocket datagramSocket;
|
||||||
|
private volatile ExecutorService executorService;
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
// 使用同步锁和原子变量防止重复启动
|
||||||
|
synchronized (lifecycleMonitor) {
|
||||||
|
if (running.get()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int port = resolvePort();
|
||||||
|
DatagramSocket socket = createSocket(port);
|
||||||
|
ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
|
||||||
|
Thread thread = new Thread(runnable, "sntp-server");
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
});
|
||||||
|
|
||||||
|
datagramSocket = socket;
|
||||||
|
executorService = executor;
|
||||||
|
running.set(true);
|
||||||
|
executor.submit(this::receiveLoop);
|
||||||
|
log.info("SNTP服务已启动,监听端口: {}", port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
synchronized (lifecycleMonitor) {
|
||||||
|
if (!running.get()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
running.set(false);
|
||||||
|
closeSocketQuietly(datagramSocket);
|
||||||
|
datagramSocket = null;
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdownNow();
|
||||||
|
executorService = null;
|
||||||
|
}
|
||||||
|
log.info("SNTP服务已停止");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRunning() {
|
||||||
|
return running.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receiveLoop() {
|
||||||
|
byte[] buffer = new byte[512];
|
||||||
|
while (running.get()) {
|
||||||
|
DatagramSocket socket = datagramSocket;
|
||||||
|
if (socket == null || socket.isClosed()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||||
|
try {
|
||||||
|
socket.receive(packet);
|
||||||
|
handlePacket(socket, packet);
|
||||||
|
log.info("SNTP服务接收报文: {}", Arrays.toString(packet.getData()));
|
||||||
|
} catch (SocketException e) {
|
||||||
|
if (running.get()) {
|
||||||
|
log.error("SNTP服务接收报文失败", e);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("SNTP服务处理报文失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (lifecycleMonitor) {
|
||||||
|
if (running.get()) {
|
||||||
|
running.set(false);
|
||||||
|
closeSocketQuietly(datagramSocket);
|
||||||
|
datagramSocket = null;
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdownNow();
|
||||||
|
executorService = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handlePacket(DatagramSocket socket, DatagramPacket packet) throws IOException {
|
||||||
|
byte[] request = Arrays.copyOf(packet.getData(), packet.getLength());
|
||||||
|
InetSocketAddress clientAddress = new InetSocketAddress(packet.getAddress(), packet.getPort());
|
||||||
|
SntpExchange exchange = sntpPacketService.parseRequest(request, clientAddress);
|
||||||
|
if (exchange == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Instant receiveInstant = Instant.now();
|
||||||
|
Instant transmitInstant = receiveInstant;
|
||||||
|
byte[] response = sntpPacketService.buildResponse(exchange, receiveInstant, transmitInstant);
|
||||||
|
DatagramPacket responsePacket = new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort());
|
||||||
|
socket.send(responsePacket);
|
||||||
|
|
||||||
|
SntpPushMessage pushMessage = sntpPacketService.toPushMessage(transmitInstant, exchange.getDeviceInstant());
|
||||||
|
WebServiceManager.broadcast(JSON.toJSONString(pushMessage));
|
||||||
|
}
|
||||||
|
|
||||||
|
private DatagramSocket createSocket(int port) {
|
||||||
|
try {
|
||||||
|
DatagramSocket socket = new DatagramSocket(port);
|
||||||
|
socket.setReuseAddress(true);
|
||||||
|
return socket;
|
||||||
|
} catch (SocketException e) {
|
||||||
|
throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口绑定异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int resolvePort() {
|
||||||
|
Integer port = sntpServerProperties.getPort();
|
||||||
|
if (port == null || port < 1 || port > 65535) {
|
||||||
|
throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口配置无效");
|
||||||
|
}
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭UDP Socket
|
||||||
|
*
|
||||||
|
* @param socket
|
||||||
|
*/
|
||||||
|
private void closeSocketQuietly(DatagramSocket socket) {
|
||||||
|
if (socket != null && !socket.isClosed()) {
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package com.njcn.gather.detection.sntp;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Component
|
||||||
|
@ConfigurationProperties(prefix = "sntp")
|
||||||
|
public class SntpServerProperties {
|
||||||
|
|
||||||
|
private Integer port = 123;
|
||||||
|
}
|
||||||
@@ -142,6 +142,19 @@ public class WebServiceManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void broadcast(String msg) {
|
||||||
|
for (Map.Entry<String, Channel> entry : userSessions.entrySet()) {
|
||||||
|
String userId = entry.getKey();
|
||||||
|
Channel channel = entry.getValue();
|
||||||
|
if (Objects.nonNull(channel) && channel.isActive()) {
|
||||||
|
channel.writeAndFlush(new TextWebSocketFrame(msg));
|
||||||
|
} else {
|
||||||
|
log.error("WebSocket broadcast failed, disconnected user, time: {}, userId: {}", LocalDateTime.now(), userId);
|
||||||
|
WebSocketHandler.cleanupSocketResources(userId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 存储检测参数(基于用户ID)
|
* 存储检测参数(基于用户ID)
|
||||||
* 支持多用户并发检测,每个用户的检测参数独立存储
|
* 支持多用户并发检测,每个用户的检测参数独立存储
|
||||||
|
|||||||
@@ -66,6 +66,9 @@ webSocket:
|
|||||||
port: 7778
|
port: 7778
|
||||||
|
|
||||||
#源参数下发,暂态数据默认值
|
#源参数下发,暂态数据默认值
|
||||||
|
sntp:
|
||||||
|
port: 123
|
||||||
|
|
||||||
Dip:
|
Dip:
|
||||||
#暂态前时间(s)
|
#暂态前时间(s)
|
||||||
fPreTime: 2f
|
fPreTime: 2f
|
||||||
|
|||||||
Reference in New Issue
Block a user