//springboot(version2.0.2)整合netty(version4.1.25)搭建的websocket服务端,握手阻塞,握手成功(handlerAdded调用)但是未返回握手成功信息给前端,
//前端websocket握手阻塞,连接超时
@Component
public class NettySocket {
private static class SingletionSocketServer {
static final NettySocket INSTANCE = new NettySocket();
}
public static NettySocket getInstance() {
return SingletionSocketServer.INSTANCE;
}
private EventLoopGroup parentGroup;
private EventLoopGroup childGroup;
private ServerBootstrap serverBootstrap;
private ChannelFuture future;
public NettySocket() {
parentGroup = new NioEventLoopGroup();
childGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new NettySocketInitializer());
}
public void start() {
this.future = serverBootstrap.bind(8065);
System.out.println("netty WebSocket start ok");
}
}初始化器public class NettySocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
////////////////http协议的支持 ///////////////
//Http编解码器
channelPipeline.addLast(new HttpServerCodec());
//对写大数据流的支持
channelPipeline.addLast(new ChunkedWriteHandler());
//Http对象聚合器,,参数:消息的最大长度
//几乎在Netty中的编程都会使用到这个handler
channelPipeline.addLast(new HttpObjectAggregator(1024 * 64));
////////////////http协议的支持 END///////////////
channelPipeline.addLast(new WebSocketServerProtocolHandler("/nt-ws"));
channelPipeline.addLast(new ChatHandler());
}
}消息处理@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 记录和管理所有客户端
*/
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String content = msg.text();
log.info("accept: {}", content);
for (Channel channel : clients) {
channel.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content));
}
// clients.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel());
log.info("客户端连接 —— channel id : {}", ctx.channel().id().asLongText());
log.info("剩余客户端:{}", clients.size());
// ctx.channel().writeAndFlush(new TextWebSocketFrame("连接成功!当前在线人数:" + clients.size()));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//触发该方法ChannelGroup会自动移除对应客户端的channel,所以不需要专门移除
// clients.remove(ctx.channel());
log.info("客户端连接断开 —— channel id : {}", ctx.channel().id().asLongText());
log.info("剩余客户端:{}", clients.size());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
ctx.channel().close();
clients.remove(ctx.channel());
}
}Booter启动类@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
try {
NettySocket.getInstance().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
添加回答
举报
0/150
提交
取消