webSocket心跳问题处理
This commit is contained in:
@@ -24,15 +24,10 @@ public class WebServiceManager {
|
|||||||
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
|
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static void addUser(String userId, Channel channel) {
|
public static void addUser(String userId, Channel channel) {
|
||||||
userSessions.put(userId, channel);
|
userSessions.put(userId, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void removeUser(String userId) {
|
|
||||||
String id = userSessions.get(userId).id().toString();
|
|
||||||
userSessions.remove(userId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void removeChannel(String channelId) {
|
public static void removeChannel(String channelId) {
|
||||||
// 遍历并删除
|
// 遍历并删除
|
||||||
|
|||||||
@@ -1,11 +1,17 @@
|
|||||||
package com.njcn.gather.detection.util.socket.web;
|
package com.njcn.gather.detection.util.socket.web;
|
||||||
|
|
||||||
|
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
|
||||||
|
import com.njcn.gather.detection.util.socket.CnSocketUtil;
|
||||||
|
import com.njcn.gather.detection.util.socket.SocketManager;
|
||||||
import com.njcn.gather.detection.util.socket.WebServiceManager;
|
import com.njcn.gather.detection.util.socket.WebServiceManager;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||||
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@@ -22,12 +28,15 @@ import org.springframework.stereotype.Component;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
||||||
|
|
||||||
|
private int times;
|
||||||
|
|
||||||
private final static String QUESTION_MARK = "?";
|
private final static String QUESTION_MARK = "?";
|
||||||
private final static String EQUAL_TO = "=";
|
private final static String EQUAL_TO = "=";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
System.out.println("服务端通道已建立" + ctx.channel().remoteAddress());
|
System.out.println("webSocket服务端通道已建立" + ctx.channel().remoteAddress());
|
||||||
|
super.channelActive(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -49,7 +58,11 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
|
|||||||
} else if (msg instanceof TextWebSocketFrame) {
|
} else if (msg instanceof TextWebSocketFrame) {
|
||||||
//正常的TEXT消息类型
|
//正常的TEXT消息类型
|
||||||
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
|
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
|
||||||
log.info("客户端收到服务器数据:{}", frame.text());
|
log.info("webSocket服务器收到客户端心跳信息:{}", frame.text());
|
||||||
|
/*if ("alive".equals(frame.text())) {
|
||||||
|
TextWebSocketFrame wd = new TextWebSocketFrame("over");
|
||||||
|
ctx.writeAndFlush(wd);
|
||||||
|
}*/
|
||||||
}
|
}
|
||||||
super.channelRead(ctx, msg);
|
super.channelRead(ctx, msg);
|
||||||
|
|
||||||
@@ -92,7 +105,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
|
|||||||
@Override
|
@Override
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) {
|
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||||
//WebServiceManager.addUser(userId, ctx.channel());
|
//WebServiceManager.addUser(userId, ctx.channel());
|
||||||
System.out.println("有新的连接接入:" + ctx);
|
System.out.println("webSocket有新的连接接入:" + ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -103,20 +116,42 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
System.out.println("weoSocket断线");
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
PreDetectionParam param = new PreDetectionParam();
|
||||||
|
param.setUserPageId("cdf_Dev");
|
||||||
|
CnSocketUtil.quitSend(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
//空闲状态的事件
|
IdleStateEvent event = (IdleStateEvent) evt;
|
||||||
// if (evt instanceof IdleStateEvent) {
|
String eventDesc = null;
|
||||||
// IdleStateEvent event = (IdleStateEvent) evt;
|
switch (event.state()) {
|
||||||
// System.out.println(event.state() + ">>>" + ctx.channel().id());
|
case READER_IDLE:
|
||||||
// //已经10秒钟没有读时间了
|
eventDesc = "读空闲";
|
||||||
// if (event.state().equals(IdleState.READER_IDLE)){
|
System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + eventDesc);
|
||||||
// // 心跳包丢失,10秒没有收到客户端心跳 (断开连接)
|
times++;
|
||||||
// ctx.channel().close().sync();
|
if (times > 3) {
|
||||||
// System.out.println("已与 "+ctx.channel().remoteAddress()+" 断开连接");
|
System.out.println("空闲次数超过三次 关闭连接");
|
||||||
// }
|
ctx.channel().close();
|
||||||
// }
|
}
|
||||||
|
break;
|
||||||
|
case WRITER_IDLE:
|
||||||
|
eventDesc = "写空闲";
|
||||||
|
break;
|
||||||
|
case ALL_IDLE:
|
||||||
|
eventDesc = "读写空闲";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//super.userEventTriggered(ctx, evt);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -10,9 +10,12 @@ import io.netty.handler.codec.http.HttpResponseDecoder;
|
|||||||
import io.netty.handler.codec.http.HttpServerCodec;
|
import io.netty.handler.codec.http.HttpServerCodec;
|
||||||
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
||||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||||
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Description: webSocket服务端自定义配置
|
* @Description: webSocket服务端自定义配置
|
||||||
* @Author: wr
|
* @Author: wr
|
||||||
@@ -44,6 +47,7 @@ public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
|
|||||||
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
|
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
|
||||||
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
|
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
|
||||||
* */
|
* */
|
||||||
|
pipeline.addLast(new IdleStateHandler(10, -1, -1, TimeUnit.SECONDS));
|
||||||
pipeline.addLast(webSocketHandler);
|
pipeline.addLast(webSocketHandler);
|
||||||
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
|
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user