@@ -1,44 +0,0 @@
|
|||||||
package com.njcn.gather.detection.controller;
|
|
||||||
|
|
||||||
import com.njcn.common.pojo.annotation.OperateInfo;
|
|
||||||
import com.njcn.common.pojo.enums.common.LogEnum;
|
|
||||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
|
||||||
import com.njcn.common.pojo.response.HttpResult;
|
|
||||||
import com.njcn.gather.detection.sntp.SntpServerManager;
|
|
||||||
import com.njcn.web.controller.BaseController;
|
|
||||||
import com.njcn.web.utils.HttpResultUtil;
|
|
||||||
import io.swagger.annotations.Api;
|
|
||||||
import io.swagger.annotations.ApiOperation;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Api(tags = "SNTP对时")
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/sntp")
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class SntpController extends BaseController {
|
|
||||||
|
|
||||||
private final SntpServerManager sntpServerManager;
|
|
||||||
|
|
||||||
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
|
|
||||||
@PostMapping("/start")
|
|
||||||
@ApiOperation("启动SNTP对时服务")
|
|
||||||
public HttpResult<?> start() {
|
|
||||||
String methodDescribe = getMethodDescribe("start");
|
|
||||||
sntpServerManager.start();
|
|
||||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
|
|
||||||
@PostMapping("/stop")
|
|
||||||
@ApiOperation("停止SNTP对时服务")
|
|
||||||
public HttpResult<?> stop() {
|
|
||||||
String methodDescribe = getMethodDescribe("stop");
|
|
||||||
sntpServerManager.stop();
|
|
||||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
package com.njcn.gather.detection.sntp;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.time.Instant;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class SntpExchange {
|
|
||||||
|
|
||||||
private InetSocketAddress clientAddress;
|
|
||||||
|
|
||||||
private int version;
|
|
||||||
|
|
||||||
private Instant deviceInstant;
|
|
||||||
|
|
||||||
private byte[] clientTransmitTimestamp;
|
|
||||||
}
|
|
||||||
@@ -1,108 +0,0 @@
|
|||||||
package com.njcn.gather.detection.sntp;
|
|
||||||
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class SntpPacketService {
|
|
||||||
|
|
||||||
private static final int MIN_PACKET_LENGTH = 48;
|
|
||||||
private static final int CLIENT_MODE = 3;
|
|
||||||
private static final int SERVER_MODE = 4;
|
|
||||||
private static final long NTP_EPOCH_OFFSET = 2208988800L;
|
|
||||||
private static final ZoneId SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
|
|
||||||
private static final byte[] REFERENCE_ID = "LOCL".getBytes(StandardCharsets.US_ASCII);
|
|
||||||
|
|
||||||
public SntpExchange parseRequest(byte[] request, InetSocketAddress clientAddress) {
|
|
||||||
if (request == null || request.length < MIN_PACKET_LENGTH) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
int mode = request[0] & 0x07;
|
|
||||||
if (mode != CLIENT_MODE) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
int version = (request[0] >> 3) & 0x07;
|
|
||||||
byte[] clientTransmitTimestamp = Arrays.copyOfRange(request, 40, 48);
|
|
||||||
Instant deviceInstant = fromNtpTimestamp(clientTransmitTimestamp);
|
|
||||||
return new SntpExchange(clientAddress, version == 0 ? 3 : version, deviceInstant, clientTransmitTimestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] buildResponse(SntpExchange exchange, Instant receiveInstant, Instant transmitInstant) {
|
|
||||||
byte[] response = new byte[MIN_PACKET_LENGTH];
|
|
||||||
int version = exchange.getVersion() == 0 ? 3 : exchange.getVersion();
|
|
||||||
response[0] = (byte) ((version << 3) | SERVER_MODE);
|
|
||||||
response[1] = 0x01;
|
|
||||||
response[2] = 0x04;
|
|
||||||
response[3] = (byte) 0xEC;
|
|
||||||
System.arraycopy(REFERENCE_ID, 0, response, 12, REFERENCE_ID.length);
|
|
||||||
|
|
||||||
byte[] receiveTimestamp = toNtpTimestamp(receiveInstant);
|
|
||||||
byte[] transmitTimestamp = toNtpTimestamp(transmitInstant);
|
|
||||||
|
|
||||||
System.arraycopy(receiveTimestamp, 0, response, 16, receiveTimestamp.length);
|
|
||||||
System.arraycopy(exchange.getClientTransmitTimestamp(), 0, response, 24, exchange.getClientTransmitTimestamp().length);
|
|
||||||
System.arraycopy(receiveTimestamp, 0, response, 32, receiveTimestamp.length);
|
|
||||||
System.arraycopy(transmitTimestamp, 0, response, 40, transmitTimestamp.length);
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SntpPushMessage toPushMessage(Instant computerInstant, Instant deviceInstant) {
|
|
||||||
long computerTimestampMs = computerInstant.toEpochMilli();
|
|
||||||
long deviceTimestampMs = deviceInstant.toEpochMilli();
|
|
||||||
return new SntpPushMessage(
|
|
||||||
"sntp_time_update",
|
|
||||||
formatShanghaiTime(computerInstant),
|
|
||||||
formatShanghaiTime(deviceInstant),
|
|
||||||
computerTimestampMs,
|
|
||||||
deviceTimestampMs,
|
|
||||||
computerTimestampMs - deviceTimestampMs
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] toNtpTimestamp(Instant instant) {
|
|
||||||
long ntpSeconds = instant.getEpochSecond() + NTP_EPOCH_OFFSET;
|
|
||||||
long fraction = ((long) instant.getNano() << 32) / 1_000_000_000L;
|
|
||||||
byte[] bytes = new byte[8];
|
|
||||||
writeUnsignedInt(bytes, 0, ntpSeconds);
|
|
||||||
writeUnsignedInt(bytes, 4, fraction);
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Instant fromNtpTimestamp(byte[] bytes) {
|
|
||||||
long seconds = readUnsignedInt(bytes, 0);
|
|
||||||
long fraction = readUnsignedInt(bytes, 4);
|
|
||||||
long epochSeconds = seconds - NTP_EPOCH_OFFSET;
|
|
||||||
long nanos = ((fraction * 1_000_000_000L) + 0x80000000L) >>> 32;
|
|
||||||
if (nanos >= 1_000_000_000L) {
|
|
||||||
epochSeconds += 1;
|
|
||||||
nanos -= 1_000_000_000L;
|
|
||||||
}
|
|
||||||
return Instant.ofEpochSecond(epochSeconds, nanos);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String formatShanghaiTime(Instant instant) {
|
|
||||||
return LocalDateTime.ofInstant(instant, SHANGHAI_ZONE).toString().replace('T', ' ');
|
|
||||||
}
|
|
||||||
|
|
||||||
private static long readUnsignedInt(byte[] bytes, int offset) {
|
|
||||||
return ((long) bytes[offset] & 0xFF) << 24
|
|
||||||
| ((long) bytes[offset + 1] & 0xFF) << 16
|
|
||||||
| ((long) bytes[offset + 2] & 0xFF) << 8
|
|
||||||
| ((long) bytes[offset + 3] & 0xFF);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void writeUnsignedInt(byte[] target, int offset, long value) {
|
|
||||||
target[offset] = (byte) ((value >>> 24) & 0xFF);
|
|
||||||
target[offset + 1] = (byte) ((value >>> 16) & 0xFF);
|
|
||||||
target[offset + 2] = (byte) ((value >>> 8) & 0xFF);
|
|
||||||
target[offset + 3] = (byte) (value & 0xFF);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
package com.njcn.gather.detection.sntp;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class SntpPushMessage {
|
|
||||||
|
|
||||||
private String type = "sntp_time_update";
|
|
||||||
|
|
||||||
private String computerTime;
|
|
||||||
|
|
||||||
private String deviceTime;
|
|
||||||
|
|
||||||
private Long computerTimestampMs;
|
|
||||||
|
|
||||||
private Long deviceTimestampMs;
|
|
||||||
|
|
||||||
private Long errorMs;
|
|
||||||
}
|
|
||||||
@@ -1,166 +0,0 @@
|
|||||||
package com.njcn.gather.detection.sntp;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
|
||||||
import com.njcn.common.pojo.exception.BusinessException;
|
|
||||||
import com.njcn.gather.detection.util.socket.websocket.WebServiceManager;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.DatagramPacket;
|
|
||||||
import java.net.DatagramSocket;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketException;
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Service
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class SntpServerManager {
|
|
||||||
|
|
||||||
private final SntpServerProperties sntpServerProperties;
|
|
||||||
private final SntpPacketService sntpPacketService;
|
|
||||||
|
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
|
||||||
private final Object lifecycleMonitor = new Object();
|
|
||||||
|
|
||||||
private volatile DatagramSocket datagramSocket;
|
|
||||||
private volatile ExecutorService executorService;
|
|
||||||
|
|
||||||
public void start() {
|
|
||||||
// 使用同步锁和原子变量防止重复启动
|
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
if (running.get()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int port = resolvePort();
|
|
||||||
DatagramSocket socket = createSocket(port);
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
|
|
||||||
Thread thread = new Thread(runnable, "sntp-server");
|
|
||||||
thread.setDaemon(true);
|
|
||||||
return thread;
|
|
||||||
});
|
|
||||||
|
|
||||||
datagramSocket = socket;
|
|
||||||
executorService = executor;
|
|
||||||
running.set(true);
|
|
||||||
executor.submit(this::receiveLoop);
|
|
||||||
log.info("SNTP服务已启动,监听端口: {}", port);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
if (!running.get()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
running.set(false);
|
|
||||||
closeSocketQuietly(datagramSocket);
|
|
||||||
datagramSocket = null;
|
|
||||||
if (executorService != null) {
|
|
||||||
executorService.shutdownNow();
|
|
||||||
executorService = null;
|
|
||||||
}
|
|
||||||
log.info("SNTP服务已停止");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isRunning() {
|
|
||||||
return running.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void destroy() {
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void receiveLoop() {
|
|
||||||
byte[] buffer = new byte[512];
|
|
||||||
while (running.get()) {
|
|
||||||
DatagramSocket socket = datagramSocket;
|
|
||||||
if (socket == null || socket.isClosed()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
|
||||||
try {
|
|
||||||
socket.receive(packet);
|
|
||||||
handlePacket(socket, packet);
|
|
||||||
log.info("SNTP服务接收报文: {}", Arrays.toString(packet.getData()));
|
|
||||||
} catch (SocketException e) {
|
|
||||||
if (running.get()) {
|
|
||||||
log.error("SNTP服务接收报文失败", e);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("SNTP服务处理报文失败", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
if (running.get()) {
|
|
||||||
running.set(false);
|
|
||||||
closeSocketQuietly(datagramSocket);
|
|
||||||
datagramSocket = null;
|
|
||||||
if (executorService != null) {
|
|
||||||
executorService.shutdownNow();
|
|
||||||
executorService = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handlePacket(DatagramSocket socket, DatagramPacket packet) throws IOException {
|
|
||||||
byte[] request = Arrays.copyOf(packet.getData(), packet.getLength());
|
|
||||||
InetSocketAddress clientAddress = new InetSocketAddress(packet.getAddress(), packet.getPort());
|
|
||||||
SntpExchange exchange = sntpPacketService.parseRequest(request, clientAddress);
|
|
||||||
if (exchange == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Instant receiveInstant = Instant.now();
|
|
||||||
Instant transmitInstant = receiveInstant;
|
|
||||||
byte[] response = sntpPacketService.buildResponse(exchange, receiveInstant, transmitInstant);
|
|
||||||
DatagramPacket responsePacket = new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort());
|
|
||||||
socket.send(responsePacket);
|
|
||||||
|
|
||||||
SntpPushMessage pushMessage = sntpPacketService.toPushMessage(transmitInstant, exchange.getDeviceInstant());
|
|
||||||
WebServiceManager.broadcast(JSON.toJSONString(pushMessage));
|
|
||||||
}
|
|
||||||
|
|
||||||
private DatagramSocket createSocket(int port) {
|
|
||||||
try {
|
|
||||||
DatagramSocket socket = new DatagramSocket(port);
|
|
||||||
socket.setReuseAddress(true);
|
|
||||||
return socket;
|
|
||||||
} catch (SocketException e) {
|
|
||||||
throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口绑定异常");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private int resolvePort() {
|
|
||||||
Integer port = sntpServerProperties.getPort();
|
|
||||||
if (port == null || port < 1 || port > 65535) {
|
|
||||||
throw new BusinessException(CommonResponseEnum.FAIL, "SNTP服务启动失败,端口配置无效");
|
|
||||||
}
|
|
||||||
return port;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 关闭UDP Socket
|
|
||||||
*
|
|
||||||
* @param socket
|
|
||||||
*/
|
|
||||||
private void closeSocketQuietly(DatagramSocket socket) {
|
|
||||||
if (socket != null && !socket.isClosed()) {
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
package com.njcn.gather.detection.sntp;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Component
|
|
||||||
@ConfigurationProperties(prefix = "sntp")
|
|
||||||
public class SntpServerProperties {
|
|
||||||
|
|
||||||
private Integer port = 123;
|
|
||||||
}
|
|
||||||
@@ -142,19 +142,6 @@ public class WebServiceManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void broadcast(String msg) {
|
|
||||||
for (Map.Entry<String, Channel> entry : userSessions.entrySet()) {
|
|
||||||
String userId = entry.getKey();
|
|
||||||
Channel channel = entry.getValue();
|
|
||||||
if (Objects.nonNull(channel) && channel.isActive()) {
|
|
||||||
channel.writeAndFlush(new TextWebSocketFrame(msg));
|
|
||||||
} else {
|
|
||||||
log.error("WebSocket broadcast failed, disconnected user, time: {}, userId: {}", LocalDateTime.now(), userId);
|
|
||||||
WebSocketHandler.cleanupSocketResources(userId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 存储检测参数(基于用户ID)
|
* 存储检测参数(基于用户ID)
|
||||||
* 支持多用户并发检测,每个用户的检测参数独立存储
|
* 支持多用户并发检测,每个用户的检测参数独立存储
|
||||||
@@ -328,4 +315,4 @@ public class WebServiceManager {
|
|||||||
sendMessage(userId, webSocketVO);
|
sendMessage(userId, webSocketVO);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,9 +66,6 @@ webSocket:
|
|||||||
port: 7778
|
port: 7778
|
||||||
|
|
||||||
#源参数下发,暂态数据默认值
|
#源参数下发,暂态数据默认值
|
||||||
sntp:
|
|
||||||
port: 123
|
|
||||||
|
|
||||||
Dip:
|
Dip:
|
||||||
#暂态前时间(s)
|
#暂态前时间(s)
|
||||||
fPreTime: 2f
|
fPreTime: 2f
|
||||||
|
|||||||
Reference in New Issue
Block a user