初始化检测模块

This commit is contained in:
wr
2024-12-11 11:33:34 +08:00
parent 35ee76888d
commit 208ba4984b
15 changed files with 721 additions and 0 deletions

41
detection/pom.xml Normal file
View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn.gather</groupId>
<artifactId>CN_Gather</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>detection</artifactId>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>mybatis-plus</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,41 @@
package com.njcn.gather.detection.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wr
* @description
* @date 2024/12/10 14:25
*/
@Slf4j
@Api(tags = "守时检测")
@RestController
@RequestMapping("/punctuality")
@RequiredArgsConstructor
public class PunctualityController extends BaseController {
// private final PunctualityService punctualityService;
@OperateInfo
@PostMapping("/deliveryTime")
@ApiOperation("下发守时检测")
@ApiImplicitParam(name = "queryParam", value = "查询参数", required = true)
public HttpResult<?> list(@RequestBody Object param) {
String methodDescribe = getMethodDescribe("list");
// punctualityService.triggerTimeMark();
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.detection.service;
/**
* @author wr
* @description 预检测流程
* @date 2024/12/10 13:44
*/
public interface PreDetectionService {
/**
* 源通讯校验
*/
void sourceCommunicationCheck();
/**
* 装置通讯校验
*/
void deviceCommunicationCheck();
/**
* 协议校验
*/
void agreementCheck();
/**
* 相序校验
*/
void phaseSequenceCheck();
}

View File

@@ -0,0 +1,14 @@
package com.njcn.gather.detection.service;
/**
* @Description: 守时检测流程
* @Author: wr
* @Date: 2024/12/10 14:23
*/
public interface PunctualityService {
/**
* 触发时间标识
*/
void triggerTimeMark();
}

View File

@@ -0,0 +1,132 @@
package com.njcn.gather.detection.util.scoket;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
/**
* @Description: 心跳检测服务端 对应的服务端在netty-server 包下的NettyClient
* @Author: wr
* @Date: 2024/12/10 14:16
*/
public class NettyClient {
private static Bootstrap bootstrap;
public static void socketClient(String msg) {
try {
NioEventLoopGroup group = new NioEventLoopGroup();
if (ObjectUtil.isNull(bootstrap)) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
//空闲状态的handler
// .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new NettyClientHandler());
}
});
}
//连接
connect(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重连方法
*/
public static void connect(String msg) {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect(msg);
}, 3L, TimeUnit.SECONDS);
} else {
if (StrUtil.isNotBlank(msg)) {
ch.channel().writeAndFlush(msg);
}
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect(msg);
}
}
/**
* 重连方法
*/
public static void connect() {
try {
bootstrap.connect("127.0.0.1", 8787).sync()
.addListener((ChannelFutureListener) ch -> {
if (!ch.isSuccess()) {
ch.channel().close();
final EventLoop loop = ch.channel().eventLoop();
loop.schedule(() -> {
System.err.println("服务端链接不上,开始重连操作...");
//重连
connect();
}, 3L, TimeUnit.SECONDS);
} else {
ch.channel().writeAndFlush("ping");
System.out.println("服务端链接成功...");
}
});
} catch (Exception e) {
System.out.println(e.getMessage());
try {
Thread.sleep(3000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//再重连
connect();
}
}
}

View File

@@ -0,0 +1,95 @@
package com.njcn.gather.detection.util.scoket;
import com.njcn.gather.detection.util.webscoket.UserSessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
/**
* @Description: 客户端业务处理
* @Author: wr
* @Date: 2024/12/10 14:16
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 当通道进行连接时推送消息
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道已建立" + ctx.channel().remoteAddress());
}
/**
* 处理服务端消息消息信息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
if(msg.endsWith("结束")){
Channel userId = UserSessionManager.getChannelByUserId("userId");
TextWebSocketFrame wd1 = new TextWebSocketFrame("流程结束");
userId.writeAndFlush(wd1);
ctx.close();
}else{
System.out.println("服务端消息 == " + msg);
Scanner scanner = new Scanner(System.in);
System.out.print("一个信息返回客户端:");
String text = scanner.nextLine();
ctx.writeAndFlush(text);
}
}
/**
* 当通道断线时,支持重连
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// System.out.println("断线了......" + ctx.channel());
// ctx.channel().eventLoop().schedule(() -> {
// System.out.println("断线重连......");
// //重连
// NettyClient.connect();
// }, 3L, TimeUnit.SECONDS);
}
/**
* 用户事件的回调方法(自定义事件用于心跳机制)
*
* @param ctx
* @param evt
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
//如果是空闲状态事件
// if (evt instanceof IdleStateEvent) {
// if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
// System.out.println("空闲" + ctx.channel().id());
// //发送ping 保持心跳链接
// TextWebSocketFrame resp = new TextWebSocketFrame(ctx.channel().id() + " ping");
// ctx.writeAndFlush(resp);
// }
// }else {
// userEventTriggered(ctx,evt);
// }
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("有通道接入" + ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@@ -0,0 +1,66 @@
package com.njcn.gather.detection.util.scoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @Description: NettyServer 心跳检测服务端
*
* Netty心跳检测与断线重连
* 需求:
* 1、客户端利用空闲状态给服务端发送心跳ping命令保持长连接不被关闭
* 2、服务端如果超过指定的时间没有收到客户端心跳则关闭连接
* 3、服务端关闭连接触发客户端的channelInactive方法在此方法中进行重连
* @Author: wr
* @Date: 2024/12/10 14:18
*/
public class NettyServer {
public static final int port = 8787;
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer();
nettyServer.run();
}
private void run() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work);
bootstrap.channel(NioServerSocketChannel.class)
//这个处理器可以不写
.handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
protected void initChannel(ServerSocketChannel ch) {
System.out.println("服务正在启动中......");
}
})
//业务处理
.childHandler(NettyServerChannelInitializer.INSTANCE);
ChannelFuture future = bootstrap.bind(port).sync();
future.addListener(f -> {
if (future.isSuccess()) {
System.out.println("服务启动成功");
} else {
System.out.println("服务启动失败");
}
});
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}

View File

@@ -0,0 +1,35 @@
package com.njcn.gather.detection.util.scoket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* @Description: 服务端初始化配置
* @Author: wr
* @Date: 2024/12/10 14:18
*/
@ChannelHandler.Sharable
public class NettyServerChannelInitializer extends ChannelInitializer<NioSocketChannel> {
public static final NettyServerChannelInitializer INSTANCE = new NettyServerChannelInitializer();
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
//空闲状态的处理器
// .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(NettyServerHandler.INSTANCE);
}
}

View File

@@ -0,0 +1,71 @@
package com.njcn.gather.detection.util.scoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import java.util.Scanner;
/**
* @Description: 客户端业务处理
* @Author: wr
* @Date: 2024/12/10 14:18
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
public static final NettyServerHandler INSTANCE = new NettyServerHandler();
/**
* @Description: 当通道进行连接时推送消息
* @Author: wr
* @Date: 2024/12/10 14:19
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 处理消息信息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
Channel channel = ctx.channel();
System.out.println("socket客户端接收成功"+msg);
if(msg.endsWith("结束")) {
channel.writeAndFlush("socket指令结果"+msg);
}else{
channel.writeAndFlush("socket指令结果成功指令");
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("有新连接加入了++++......" + ctx.channel());
}
/**
* 用户事件的回调方法(自定义事件用于心跳机制)
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//空闲状态的事件
// if (evt instanceof IdleStateEvent) {
// IdleStateEvent event = (IdleStateEvent) evt;
// System.out.println(event.state() + ">>>" + ctx.channel().id());
// //已经10秒钟没有读时间了
// if (event.state().equals(IdleState.READER_IDLE)){
// // 心跳包丢失10秒没有收到客户端心跳 (断开连接)
// ctx.channel().close().sync();
// System.out.println("已与 "+ctx.channel().remoteAddress()+" 断开连接");
// }
// }
}
}

View File

@@ -0,0 +1,23 @@
package com.njcn.gather.detection.util.webscoket;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class UserSessionManager {
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) {
userSessions.put(userId, channel);
}
public static void removeUser(String userId) {
userSessions.remove(userId);
}
public static Channel getChannelByUserId(String userId) {
return userSessions.get(userId);
}
}

View File

@@ -0,0 +1,74 @@
package com.njcn.gather.detection.util.webscoket;
import com.njcn.gather.detection.util.scoket.NettyClient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Map;
/**
* @Description: 泛型 代表的是处理数据的单位
* TextWebSocketFrame : 文本信息帧
* @Author: wr
* @Date: 2024/12/10 13:56
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("成功连接");
ctx.writeAndFlush("22222222222222222222");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
System.out.println("服务端消息 == " + msg.text());
// Channel userId = UserSessionManager.getChannelByUserId("userId");
// TextWebSocketFrame wd1 = new TextWebSocketFrame("------------------cs");
// userId.writeAndFlush(wd1);
if(msg.text().equals("下发指令")){
NettyClient.socketClient(msg.text());
}
//可以直接调用text 拿到文本信息帧中的信息
Channel channel = ctx.channel();
TextWebSocketFrame resp = new TextWebSocketFrame(msg.text());
channel.writeAndFlush(resp);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
String userId = "userId";
UserSessionManager.addUser(userId, ctx.channel());
System.out.println("有新的连接接入:"+ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 假设用户 ID 是从某个地方获取的,这里简单示例为 "userId"
String userId = "userId";
UserSessionManager.removeUser(userId);
System.out.println("有连接退出: " + ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
//空闲状态的事件
// if (evt instanceof IdleStateEvent) {
// IdleStateEvent event = (IdleStateEvent) evt;
// System.out.println(event.state() + ">>>" + ctx.channel().id());
// //已经10秒钟没有读时间了
// if (event.state().equals(IdleState.READER_IDLE)){
// // 心跳包丢失10秒没有收到客户端心跳 (断开连接)
// ctx.channel().close().sync();
// System.out.println("已与 "+ctx.channel().remoteAddress()+" 断开连接");
// }
// }
}
}

View File

@@ -0,0 +1,42 @@
package com.njcn.gather.detection.util.webscoket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @Description: webSocket服务端自定义配置
* @Author: wr
* @Date: 2024/12/10 14:20
*/
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//设置心跳机制
// ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
//增加编解码器 的另一种方式
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpResponseDecoder());
//块方式写的处理器 适合处理大数据
pipeline.addLast(new ChunkedWriteHandler());
//聚合
pipeline.addLast(new HttpObjectAggregator(512 * 1024));
/*
* 这个时候 我们需要声明我们使用的是 websocket 协议
* netty为websocket也准备了对应处理器 设置的是访问路径
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
* */
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
}
}

View File

@@ -0,0 +1,49 @@
package com.njcn.gather.detection.util.webscoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
/**
* @Description: websocket服务端
* @Author: wr
* @Date: 2024/12/10 13:59
*/
public class WebSocketService {
public static void main(String[] args) {
//可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitializer());
try {
ChannelFuture future = serverBootstrap.bind(7777).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@@ -26,6 +26,11 @@
<artifactId>user</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn.gather</groupId>
<artifactId>detection</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>

View File

@@ -12,6 +12,7 @@
<module>system</module>
<module>user</module>
<module>device</module>
<module>detection</module>
</modules>
<packaging>pom</packaging>
<name>融合各工具的项目</name>