为了账号安全,请及时绑定邮箱和手机立即绑定

为什么 Netty HTTP 处理程序不可共享?

为什么 Netty HTTP 处理程序不可共享?

慕盖茨4494581 2023-12-13 17:04:25
每当打开新连接时, Netty 都会实例化一组请求处理程序类。对于像 Websocket 这样的东西来说,这似乎很好,因为连接将在 Websocket 的生命周期内保持打开状态。当使用 Netty 作为每秒可以接收数千个请求的 HTTP 服务器时,这似乎会对垃圾收集造成相当大的负担。每个请求都会实例化几个类(在我的例子中是 10 个处理程序类),然后垃圾收集器会在几毫秒后收集它们。在中等负载 ~1000 req/sec 的 HTTP 服务器中,每秒需要实例化和垃圾收集一万个类。看来我们可以简单地 看到下面的答案,创建可共享的处理程序,这将使用 消除这种巨大的 GC 开销ChannelHandler.Sharable。它们只需是线程安全的。但是,我发现库中打包的所有非常基本的 HTTP 处理程序都是不可共享的,例如HttpServerCodec和HttpObjectAggregator。此外,所有HTTP 处理程序示例都是不可共享的。99% 的示例代码和教程似乎都不关心它。Norman Maurer 的书(Netty 作者)中只有一则简介给出了使用共享处理程序的原因:为什么要共享 ChannelHandler?在多个 ChannelPipelines 中安装单个 ChannelHandler 的常见原因是跨多个 Channel 收集统计信息。任何地方都没有提到 GC 负载问题。Netty 已在常规生产环境中使用了近十年。它可以说是目前最常用的高度并发非阻塞 IO 的 Java 库。换句话说,它的设计目的是远远超过我每秒 1000 个适度的请求。我是否错过了一些使 GC 负载不再成为问题的东西?或者,我是否应该尝试实现自己的Sharable处理程序,具有类似的功能来解码、编码和写入 HTTP 请求和响应?
查看完整描述

1 回答

?
呼唤远方

TA贡献1856条经验 获得超11个赞

虽然我们始终致力于在 Netty 中产生尽可能少的 GC,但在某些情况下这是不可能的。例如,http 编解码器等保留每个连接的状态,因此这些无法共享(即使它们是线程安全的)。

解决这个问题的唯一方法是对它们进行池化,但我认为还有其他对象更有可能导致 GC 问题,对于这些对象,我们会在可能的情况下尝试进行池化。


查看完整回答
反对 回复 2023-12-13
?
叮当猫咪

TA贡献1776条经验 获得超12个赞

长话短说:

如果使用默认 HTTP 处理程序达到了使 GC 成为问题所需的量,那么无论如何都应该使用代理服务器进行扩展。


在 Norman 的回答之后,我最终尝试了一个非常简单的可共享 HTTP 编解码器/聚合器 POC,看看这是否值得追求。

我的可共享解码器距离RFC 7230还很远,但它满足了我当前项目的足够要求。

然后我使用httperfVisualVM来了解 GC 负载差异的概念。经过我的努力,GC 率仅降低了 10%。换句话说,这确实没有多大区别。

唯一真正值得赞赏的效果是,与使用打包的非共享 HTTP 编解码器 + 聚合器和我的可共享编解码器相比,在运行 1000 个请求/秒时,我的错误减少了 5%。只有当我每秒执行 1000 个请求并持续超过 10 秒时才会出现这种情况。

最后我不会去追求它。为了获得通过使用代理服务器可以解决的微小好处而将其变成完全兼容 HTTP 的解码器所需的时间根本不值得。

出于参考目的,这里是我尝试过的组合可共享解码器/聚合器:

import java.util.concurrent.ConcurrentHashMap;


import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandler.Sharable;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelId;

import io.netty.channel.ChannelInboundHandlerAdapter;


@Sharable

public class SharableHttpDecoder extends ChannelInboundHandlerAdapter {


    private static final ConcurrentHashMap<ChannelId, SharableHttpRequest> MAP = 

            new ConcurrentHashMap<ChannelId, SharableHttpRequest>();

    

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) 

        throws Exception 

    {        

        if (msg instanceof ByteBuf) 

        {

            ByteBuf buf = (ByteBuf) msg;

            ChannelId channelId = ctx.channel().id();

            SharableHttpRequest request = MAP.get(channelId);

                                    

            if (request == null)

            {

                request = new SharableHttpRequest(buf);

                buf.release();

                if (request.isComplete()) 

                {

                    ctx.fireChannelRead(request);

                }

                else

                {

                    MAP.put(channelId, request);

                }

            }

            else

            {

                request.append(buf);

                buf.release();

                if (request.isComplete()) 

                {

                    ctx.fireChannelRead(request);

                }

            }

        }

        else

        {

            // TODO send 501

            System.out.println("WTF is this? " + msg.getClass().getName());

            ctx.fireChannelRead(msg);

        }

    }

    

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

        throws Exception 

    {

        System.out.println("Unable to handle request on channel: " + 

            ctx.channel().id().asLongText());

        cause.printStackTrace(System.err);

        

        // TODO send 500

        ctx.fireExceptionCaught(cause);

        ctx.close();

    }

    

}

解码器创建的用于在管道上处理的结果对象:


import java.util.Arrays;

import java.util.HashMap;

import io.netty.buffer.ByteBuf;


public class SharableHttpRequest

{

    

    private static final byte SPACE = 32;

    private static final byte COLON = 58;

    private static final byte CARRAIGE_RETURN = 13;

    

    private HashMap<Header,String> myHeaders;

    private Method myMethod;

    private String myPath;

    private byte[] myBody;

    private int myIndex = 0;

    

    public SharableHttpRequest(ByteBuf buf)

    {

        try

        {

            myHeaders = new HashMap<Header,String>();

            final StringBuilder builder = new StringBuilder(8);

            parseRequestLine(buf, builder);

            while (parseNextHeader(buf, builder));

            parseBody(buf);

        }

        catch (Exception e)

        {

            e.printStackTrace(System.err);

        }

    }

    

    public String getHeader(Header name)

    {

        return myHeaders.get(name);

    }

    

    public Method getMethod()

    {

        return myMethod;

    }

    

    public String getPath()

    {

        return myPath;

    }

    

    public byte[] getBody()

    {

        return myBody;

    }

    

    public boolean isComplete()

    {

        return myIndex >= myBody.length;

    }

    

    public void append(ByteBuf buf)

    {

        int length = buf.readableBytes();

        buf.getBytes(buf.readerIndex(), myBody, myIndex, length);

        myIndex += length;

    }


    private void parseRequestLine(ByteBuf buf, StringBuilder builder)

    {

        int idx = buf.readerIndex();

        int end = buf.writerIndex();

        for (; idx < end; ++idx)

        {

            byte next = buf.getByte(idx);

            

            // break on CR

            if (next == CARRAIGE_RETURN)

            {

                break;

            }

            

            // we need the method

            else if (myMethod == null)

            {

                if (next == SPACE)

                {

                    myMethod = Method.fromBuilder(builder);

                    builder.delete(0, builder.length());

                    builder.ensureCapacity(100);

                }

                else

                {

                    builder.append((char) next);

                }

            }

            

            // we need the path

            else if (myPath == null)

            {

                if (next == SPACE)

                {

                    myPath = builder.toString();

                    builder.delete(0, builder.length());

                }

                else

                {

                    builder.append((char) next);

                }

            }

            

            // don't need the version right now

        }

        idx += 2; // skip line endings

        buf.readerIndex(idx);

    }

    

    private boolean parseNextHeader(ByteBuf buf, StringBuilder builder)

    {

        Header header = null;

        int idx = buf.readerIndex();

        int end = buf.writerIndex();

        for (; idx < end; ++idx)

        {

            byte next = buf.getByte(idx);

            

            // break on CR

            if (next == CARRAIGE_RETURN)

            {

                if (header != Header.UNHANDLED)

                {

                    myHeaders.put(header,builder.toString());

                    builder.delete(0, builder.length());

                }

                break;

            }

            

            else if (header == null)

            {

                // we have the full header name

                if (next == COLON)

                {

                    header = Header.fromBuilder(builder);

                    builder.delete(0, builder.length());

                }


                // get header name as lower case for mapping purposes

                else

                {

                    builder.append(next > 64 && next < 91 ? 

                        (char) ( next | 32 ) : (char) next);

                }

            }

            

            // we don't care about some headers

            else if (header == Header.UNHANDLED)

            {

                continue;

            }

            

            // skip initial spaces

            else if (builder.length() == 0 && next == SPACE)

            {

                continue;

            }

            

            // get the header value

            else

            {

                builder.append((char) next);

            }

        }

        

        idx += 2; // skip line endings

        buf.readerIndex(idx);

        

        if (buf.getByte(idx) == CARRAIGE_RETURN)

        {

            idx += 2; // skip line endings

            buf.readerIndex(idx);

            return false;

        }

        else

        {

            return true;

        }

    }

    

    private void parseBody(ByteBuf buf)

    {

        int length = buf.readableBytes();

        if (length == 0)

        {

            myBody = new byte[0];

            myIndex = 1;

        }

        else

        {

            System.out.println("Content-Length: " + myHeaders.get(Header.CONTENT_LENGTH));

            if (myHeaders.get(Header.CONTENT_LENGTH) != null)

            {

                int totalLength = Integer.valueOf(myHeaders.get(Header.CONTENT_LENGTH));

                myBody = new byte[totalLength];

                buf.getBytes(buf.readerIndex(), myBody, myIndex, length);

                myIndex += length;

            }

            

            // TODO handle chunked

        }

    }

    

    

    

    

    public enum Method

    {

        GET(new char[]{71, 69, 84}), 

        POST(new char[]{80, 79, 83, 84}),

        UNHANDLED(new char[]{}); // could be expanded if needed

        

        private char[] chars;


        Method(char[] chars) 

        {

            this.chars = chars;

        }

        

        public static Method fromBuilder(StringBuilder builder) 

        {

            for (Method method : Method.values()) 

            {

                if (method.chars.length == builder.length()) 

                {

                    boolean match = true;

                    for (int i = 0; i < builder.length(); i++) 

                    {

                        if (method.chars[i] != builder.charAt(i)) 

                        {

                            match = false;

                            break;

                        }

                    }

                    

                    if (match)

                    {

                        return method;

                    }

                }

            }

            return null;

        }

    }

    

    public enum Header

    {

        HOST(new char[]{104, 111, 115, 116}), 

        CONNECTION(new char[]{99, 111, 110, 110, 101, 99, 116, 105, 111, 110}),

        IF_MODIFIED_SINCE(new char[]{

            105, 102, 45, 109, 111, 100, 105, 102, 105, 101, 100, 45, 115, 

            105, 110, 99, 101}),

        COOKIE(new char[]{99, 111, 111, 107, 105, 101}),

        CONTENT_LENGTH(new char[]{

            99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104}),

        UNHANDLED(new char[]{}); // could be expanded if needed

        

        private char[] chars;


        Header(char[] chars) 

        {

            this.chars = chars;

        }

        

        public static Header fromBuilder(StringBuilder builder) 

        {

            for (Header header : Header.values()) 

            {

                if (header.chars.length == builder.length()) 

                {                    

                    boolean match = true;

                    for (int i = 0; i < builder.length(); i++) 

                    {

                        if (header.chars[i] != builder.charAt(i)) 

                        {

                            match = false;

                            break;

                        }

                    }

                    

                    if (match)

                    {

                        return header;

                    }

                }

            }

            return UNHANDLED;

        }

    }

}

用于测试的简单处理程序:


import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandler.Sharable;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.util.CharsetUtil;


@Sharable

public class SharableHttpHandler extends SimpleChannelInboundHandler<SharableHttpRequest>

{    

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, SharableHttpRequest msg) 

        throws Exception

    {

        String message = "HTTP/1.1 200 OK\r\n" +

                "Content-type: text/html\r\n" + 

                "Content-length: 42\r\n\r\n" + 

                "<html><body>Hello sharedworld</body><html>";

        

        ByteBuf buffer = ctx.alloc().buffer(message.length());

        buffer.writeCharSequence(message, CharsetUtil.UTF_8);

        ChannelFuture flushPromise = ctx.channel().writeAndFlush(buffer);

        flushPromise.addListener(ChannelFutureListener.CLOSE);

        if (!flushPromise.isSuccess()) 

        {

            flushPromise.cause().printStackTrace(System.err);

        }

    }    

}

使用这些可共享处理程序的完整管道:


import tests.SharableHttpDecoder;

import tests.SharableHttpHandler;


import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;


public class ServerPipeline extends ChannelInitializer<SocketChannel>

{

    private final SharableHttpDecoder decoder = new SharableHttpDecoder();

    private final SharableHttpHandler handler = new SharableHttpHandler();


    @Override

    public void initChannel(SocketChannel channel)

    {

        ChannelPipeline pipeline = channel.pipeline();

        pipeline.addLast(decoder);

        pipeline.addLast(handler);

        

    }

}

上面的内容针对这个(更常见的)非共享管道进行了测试:


import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;


import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.DefaultFullHttpResponse;

import io.netty.handler.codec.http.FullHttpRequest;

import io.netty.handler.codec.http.FullHttpResponse;

import io.netty.handler.codec.http.HttpHeaderNames;

import io.netty.handler.codec.http.HttpHeaderValues;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.HttpUtil;

import io.netty.util.CharsetUtil;


public class ServerPipeline extends ChannelInitializer<SocketChannel>

{


    @Override

    public void initChannel(SocketChannel channel)

    {

        ChannelPipeline pipeline = channel.pipeline();

        pipeline.addLast(new HttpServerCodec());

        pipeline.addLast(new HttpObjectAggregator(65536));

        pipeline.addLast(new UnsharedHttpHandler());

        

    }

    

    class UnsharedHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest>

    {


        @Override

        public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) 

            throws Exception

        {

            String message = "<html><body>Hello sharedworld</body><html>";

            ByteBuf buffer = ctx.alloc().buffer(message.length());

            buffer.writeCharSequence(message.toString(), CharsetUtil.UTF_8);


            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buffer);

            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");

            HttpUtil.setContentLength(response, response.content().readableBytes());

            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

            ChannelFuture flushPromise = ctx.writeAndFlush(response);

            flushPromise.addListener(ChannelFutureListener.CLOSE);

                        

        }

    }

}



查看完整回答
反对 回复 2023-12-13
  • 1 回答
  • 0 关注
  • 137 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信