The netty read/write process

2017-12-10 16:08:32

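This source code analysis is based on netty 4.

An earlier post covered how netty handles accept events; this one walks through netty's read/write process.

Before we start, a closer look at netty's Handler and Pipeline.

netty's handlers fall into two groups: ChannelInboundHandler and ChannelOutboundHandler. On the server side, a ChannelInboundHandler processes messages sent from the client to the server, and is typically used to decode, read client data, and run business logic; a ChannelOutboundHandler processes messages sent from the server to the client, and is typically used to encode and send messages to the client.

Handlers are the extension point netty provides, and they matter a great deal. With handlers you can encode and decode messages, intercept specific messages, centralize error handling, count requests, and so on. Many network protocols are implemented as handlers, for example http or a custom rpc protocol.

In netty you can register multiple handlers, which together form a Pipeline. ChannelInboundHandlers run in the order they were registered; ChannelOutboundHandlers run in the reverse of that order.

The Javadoc of the ChannelPipeline class gives the following diagram: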


                                                 I/O Request
                                            via Channel or
                                        ChannelHandlerContext
                                                      |
  +---------------------------------------------------+---------------+
  |                           ChannelPipeline         |               |
  |                                                  \|/              |
  |    +---------------------+            +-----------+----------+    |
  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  .               |
  |               .                                   .               |
  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  |        [ method call]                       [method call]         |
  |               .                                   .               |
  |               .                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  +---------------+-----------------------------------+---------------+
                  |                                  \|/
  +---------------+-----------------------------------+---------------+
  |               |                                   |               |
  |       [ Socket.read() ]                    [ Socket.write() ]     |
  |                                                                   |
  |  Netty Internal I/O Threads (Transport Implementation)            |
  +-------------------------------------------------------------------+

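In other words, a write issued through Channel/ChannelHandlerContext passes through the outbound handlers one by one and is finally written out through the socket.
Data read from the socket is likewise handed to the inbound handlers one by one.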
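
The ordering rule can be sketched without netty at all. The class below is a toy model, not netty code: HandlerOrderSketch, Entry and the handler names are made up for illustration, and it only covers operations that enter at the ends of the pipeline (an inbound event from the socket, or a write issued on the Channel).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: handlers are plain names tagged inbound/outbound, not netty types.
class HandlerOrderSketch {

    static final class Entry {
        final String name;
        final boolean inbound; // false means outbound

        Entry(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Inbound events visit the inbound handlers in registration order.
    static List<String> inboundOrder(List<Entry> registered) {
        List<String> order = new ArrayList<>();
        for (Entry e : registered) {
            if (e.inbound) {
                order.add(e.name);
            }
        }
        return order;
    }

    // Outbound operations visit the outbound handlers in reverse registration order.
    static List<String> outboundOrder(List<Entry> registered) {
        List<String> order = new ArrayList<>();
        for (Entry e : registered) {
            if (!e.inbound) {
                order.add(e.name);
            }
        }
        Collections.reverse(order);
        return order;
    }

    public static void main(String[] args) {
        List<Entry> registered = Arrays.asList(
                new Entry("in1", true),
                new Entry("out1", false),
                new Entry("in2", true),
                new Entry("out2", false));
        System.out.println(inboundOrder(registered));  // [in1, in2]
        System.out.println(outboundOrder(registered)); // [out2, out1]
    }
}
```

A write issued on a ChannelHandlerContext in the middle of the pipeline starts from that context instead of from the end, so it only passes the outbound handlers registered before it.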

[Figure: channelHandler.png]

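netty also defines ChannelInboundInvoker and ChannelOutboundInvoker. The invoker interfaces largely mirror the ChannelHandler interfaces, except that their methods do not take a ChannelHandlerContext parameter. For example, ChannelInboundHandler has channelRead(ctx, msg), while ChannelInboundInvoker has fireChannelRead(msg).

Both ChannelHandlerContext and ChannelPipeline extend ChannelInboundInvoker and ChannelOutboundInvoker.

DefaultChannelPipeline is netty's core Pipeline implementation. It aggregates ChannelHandlerContexts, and each ChannelHandlerContext can be seen as a node of the pipeline. HeadContext and TailContext (not shown in the figure) are the first and last nodes of the pipeline.

When an event is fired on a ChannelPipeline, netty delegates it to a ChannelHandlerContext, which in turn delegates it to its ChannelHandler for processing.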
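
To make the "pipeline as a list of context nodes" picture concrete, here is a minimal sketch of a doubly linked list with fixed head and tail sentinels. It is a toy model under my own naming (ContextListSketch, Ctx), not the DefaultChannelPipeline source; the real contexts also carry a handler, an executor and state flags.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a doubly linked list of context nodes between two fixed sentinels.
class ContextListSketch {

    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    ContextListSketch() {
        // An empty pipeline is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // New nodes are linked in just before the tail sentinel.
    ContextListSketch addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Walk head -> tail and collect the node names.
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        ContextListSketch p = new ContextListSketch();
        p.addLast("a").addLast("b");
        System.out.println(p.names()); // [head, a, b, tail]
    }
}
```

Because head and tail are always present, inserting or walking never has to special-case an empty list.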

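ChannelHandler has handlerAdded/handlerRemoved methods, which are invoked when a ChannelHandler is added to or removed from a Pipeline.
ChannelInitializer is a special InboundHandler. It declares the abstract initChannel method, which is where users add their own ChannelHandlers to the pipeline.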

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

As you can see, once a ChannelInitializer has been added to the Pipeline (and the channel is already registered), it calls initChannel.

Note that DefaultChannelPipeline may run handlerAdded as a deferred task rather than inline (though in some cases it is invoked directly).
Take a look at DefaultChannelPipeline.addLast:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If registered is false, the channel is not yet registered with any eventloop,
        // so callHandlerCallbackLater defers the handlerAdded call
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        // Already registered: if the caller is not on the handler's executor thread, submit the call to it as a task
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

callHandlerCallbackLater appends the task to the list headed by pendingHandlerCallbackHead; these tasks run later, once the channel has been registered with its event loop.

Recall AbstractUnsafe.register0 from the startup/registration process: it calls pipeline.invokeHandlerAddedIfNeeded(), which forces the callbacks queued on pendingHandlerCallbackHead to run.
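
The deferral can be modeled in a few lines. This is a simplified sketch under my own assumptions: PendingCallbackSketch and its log field are hypothetical, and an ArrayList stands in for the linked list that netty keeps behind pendingHandlerCallbackHead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: handlerAdded callbacks are queued until the "channel" is registered.
class PendingCallbackSketch {
    private boolean registered;
    private final List<Runnable> pending = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void addLast(final String name) {
        Runnable callback = () -> log.add("handlerAdded:" + name);
        if (!registered) {
            // Not registered yet: remember the callback instead of running it.
            pending.add(callback);
            return;
        }
        // Already registered: the callback can run right away.
        callback.run();
    }

    // Stand-in for the call made during registration: flush everything that was deferred.
    void invokeHandlerAddedIfNeeded() {
        registered = true;
        List<Runnable> toRun = new ArrayList<>(pending);
        pending.clear();
        for (Runnable r : toRun) {
            r.run();
        }
    }

    public static void main(String[] args) {
        PendingCallbackSketch p = new PendingCallbackSketch();
        p.addLast("initializer");
        System.out.println(p.log); // []
        p.invokeHandlerAddedIfNeeded();
        System.out.println(p.log); // [handlerAdded:initializer]
    }
}
```

This is why a ChannelInitializer added before registration still gets its handlerAdded call: the callback is only postponed, not dropped.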

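Let's start with a small example.
We split the content of a tcp request on "|" into a string array, process it, then join the resulting array with "|" into a single string and return it to the user.

The InboundHandler that does the splitting: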

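import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SplitHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("split in inboundHandler channelRead");
        ByteBuf in = (ByteBuf) msg;
        String s;
        try {
            // Size the array by readableBytes(); writerIndex() over-counts once readerIndex() > 0
            byte[] bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            s = new String(bytes);
        } finally {
            in.release(); // the ByteBuf is consumed here and not passed on, so release it
        }

        System.out.println("read string : " + s);
        ctx.fireChannelRead(s.split("\\|"));
    }
}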

The OutboundHandler that does the merging:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class MergeHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("fire outboundHandler read");
        ctx.read();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("merge in outboundHandler write");

        // Join the processed parts back into one "|"-separated string
        String[] arr = (String[]) msg;
        String response = String.join("|", arr);
        System.out.println("response str : " + response);

        // Copy the bytes into a ByteBuf and pass it on towards the head of the pipeline
        byte[] bytes = response.getBytes();
        ByteBuf byteBuf = ctx.alloc().buffer(bytes.length);
        byteBuf.writeBytes(bytes);
        ctx.write(byteBuf, promise);
    }
}

read is overridden here mainly to observe when that method is triggered.

Some simple business logic:

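import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class LogicHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("logic handle in inboundHandler channelRead");
        String[] arr = (String[]) msg;

        // Prefix every part in place, then send the array back through the outbound handlers
        for (int i = 0; i < arr.length; i++) {
            String s = arr[i];
            System.out.println("split result : " + s);
            arr[i] = "ok for " + s;
        }
        ctx.writeAndFlush(arr);
    }
}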

Server side:

ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();

                p.addLast(new MergeHandler());
                p.addLast(new SplitHandler());
                p.addLast(new LogicHandler());

            }
        });

After starting the example, send the string 123|456 via telnet; the result is:

binecy ~/work/shadowsocks $ telnet 127.0.0.1 8007
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
123|456
ok for 123|ok for 456

Server output:

split in inboundHandler channelRead
read string : 123|456

logic handle in inboundHandler channelRead
split result : 123
split result : 456

merge in outboundHandler write
response str : ok for 123|ok for 456

fire outboundHandler read
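
Stripped of netty, the data transformation that the three handlers perform together looks roughly like this. It is a plain-Java sketch for orientation only: SplitMergeSketch and its method names are made up, and it ignores the trailing line ending that telnet sends.

```java
// Toy model only: the split -> process -> merge steps as plain static methods.
class SplitMergeSketch {

    // What SplitHandler does with the decoded request string.
    static String[] split(String request) {
        return request.split("\\|");
    }

    // What LogicHandler does with each part.
    static String[] process(String[] parts) {
        String[] result = new String[parts.length];
        for (int i = 0; i < parts.length; i++) {
            result[i] = "ok for " + parts[i];
        }
        return result;
    }

    // What MergeHandler does before the bytes go out.
    static String merge(String[] parts) {
        return String.join("|", parts);
    }

    static String handle(String request) {
        return merge(process(split(request)));
    }

    public static void main(String[] args) {
        System.out.println(handle("123|456")); // ok for 123|ok for 456
    }
}
```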

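With this example as the starting point, let's analyze netty's read/write process.
Note that after accept, the pipeline of the accepted channel looks like this (ServerBootstrapAcceptor sits in the server channel's pipeline, not in this one):
head > MergeHandler > SplitHandler > LogicHandler > tail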

read

Recall how NioEventLoop handles the read event:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();

    ...
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            return;
        }
    }
}

unsafe.read() ends up in AbstractNioByteChannel.NioByteUnsafe.read() (abridged):

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;

do {
    // Allocate buffer space
    byteBuf = allocHandle.allocate(allocator);
    // Read data from the socket into the buffer
    allocHandle.lastBytesRead(doReadBytes(byteBuf));

    allocHandle.incMessagesRead(1);

    // Fire the ChannelRead event
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
} while (allocHandle.continueReading());

allocHandle.readComplete();
// Fire the ChannelReadComplete event
pipeline.fireChannelReadComplete();
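
The shape of this loop, one channelRead per filled buffer followed by a single channelReadComplete, can be imitated with plain JDK classes. The sketch below is an illustration under my own assumptions: ReadLoopSketch is hypothetical, a ByteArrayInputStream stands in for the socket, and the "keep reading while the buffer was filled and a message limit is not reached" condition is a simplification of what allocHandle.continueReading() decides.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model only: chunked reads that record which events would be fired.
class ReadLoopSketch {

    static List<String> readOnce(ByteArrayInputStream socket, int bufferSize, int maxMessages) {
        List<String> events = new ArrayList<>();
        int messages = 0;
        int lastBytesRead;
        do {
            byte[] buf = new byte[bufferSize];                 // "allocate" a buffer
            lastBytesRead = socket.read(buf, 0, buf.length);   // read from the "socket"
            if (lastBytesRead <= 0) {
                break;                                         // nothing more to read
            }
            messages++;
            // One channelRead event per chunk that was read
            events.add("channelRead:" + new String(buf, 0, lastBytesRead, StandardCharsets.UTF_8));
        } while (lastBytesRead == bufferSize && messages < maxMessages);

        // Exactly one channelReadComplete after the loop ends
        events.add("channelReadComplete");
        return events;
    }

    public static void main(String[] args) {
        byte[] data = "123|456".getBytes(StandardCharsets.UTF_8);
        System.out.println(readOnce(new ByteArrayInputStream(data), 4, 16));
        // [channelRead:123|, channelRead:456, channelReadComplete]
    }
}
```

The example also shows why a handler cannot assume that one channelRead carries one complete request: with a small buffer the same request arrives in two pieces.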

pipeline.fireChannelRead fires the read event. Here is how DefaultChannelPipeline handles it:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

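Simple enough: the pipeline delegates the event to the ChannelHandlerContexts, starting from head.

This is the static AbstractChannelHandlerContext.invokeChannelRead, which in turn calls the instance method of the same name: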

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}


private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {    // check the handler's state
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

invokeHandler() checks whether handlerAdded has already been called on the ChannelHandler.
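
The executor.inEventLoop() check in the static invokeChannelRead above is a pattern that shows up all over netty: run inline when already on the loop thread, otherwise hand the work over as a task. Here is a minimal sketch of the idea on top of a JDK single-thread executor; ToyEventLoop and runInLoop are hypothetical names, not netty API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: "run inline if on the loop thread, otherwise submit a task".
class InEventLoopSketch {

    static final class ToyEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-event-loop");
            loopThread = t; // remember which thread is the loop thread
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void runInLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();             // already on the loop thread: call directly
            } else {
                executor.execute(task); // other thread: queue it for the loop thread
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    // Returns the thread names on which the outer and the nested task ran.
    static String[] demo() {
        ToyEventLoop loop = new ToyEventLoop();
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        // Called from the caller's thread, so the task is queued.
        loop.runInLoop(() -> {
            threads[0] = Thread.currentThread().getName();
            // Called from the loop thread itself, so this runs inline.
            loop.runInLoop(() -> threads[1] = Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println(threads[0] + " / " + threads[1]); // toy-event-loop / toy-event-loop
    }
}
```

Either way the handler code runs on one well-known thread, which is what lets handlers avoid their own locking.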

Now look at HeadContext.channelRead:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

ctx.fireChannelRead here is AbstractChannelHandlerContext.fireChannelRead:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

findContextInbound finds the next InboundHandler:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

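ctx.fireChannelRead(msg) finds the next InboundHandler and invokes its channelRead method. So when we override channelRead in an InboundHandler, the method should end by calling ctx.fireChannelRead(msg); otherwise the call chain stops there, unless the handler answers with a write instead. In this way the event travels along the pipeline, visiting each InboundHandler's channelRead in turn.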
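
A small model makes the "chain breaks if you do not propagate" point visible. This is a sketch with hypothetical names (InboundChainSketch, Handler, Ctx), not netty's classes; unlike netty, where the tail node is always inbound, the toy lookup guards against running off the end of the list.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: inbound propagation along a singly linked chain of contexts.
class InboundChainSketch {

    interface Handler {
        void channelRead(Ctx ctx, Object msg);
    }

    static final class Ctx {
        final String name;
        final boolean inbound;
        final Handler handler;
        Ctx next;

        Ctx(String name, boolean inbound, Handler handler) {
            this.name = name;
            this.inbound = inbound;
            this.handler = handler;
        }

        // Same idea as findContextInbound(): skip ahead to the next inbound node, then call it.
        void fireChannelRead(Object msg) {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx != null && !ctx.inbound);
            if (ctx != null) {
                ctx.handler.channelRead(ctx, msg);
            }
        }
    }

    // Returns the names of the handlers whose channelRead was reached.
    static List<String> run(final boolean bPropagates) {
        final List<String> visited = new ArrayList<>();
        Ctx head = new Ctx("head", true, null);
        Ctx out = new Ctx("out", false, null); // outbound node: skipped by the lookup
        Ctx a = new Ctx("a", true, (ctx, msg) -> {
            visited.add(ctx.name);
            ctx.fireChannelRead(msg);
        });
        Ctx b = new Ctx("b", true, (ctx, msg) -> {
            visited.add(ctx.name);
            if (bPropagates) {
                ctx.fireChannelRead(msg);
            }
        });
        Ctx c = new Ctx("c", true, (ctx, msg) -> visited.add(ctx.name));
        head.next = out;
        out.next = a;
        a.next = b;
        b.next = c;
        head.fireChannelRead("msg");
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [a, b, c]
        System.out.println(run(false)); // [a, b]
    }
}
```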

write

Now for the write process.
LogicHandler calls ctx.writeAndFlush, which starts the write and ends up in AbstractChannelHandlerContext.write:

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

findContextOutbound finds the nearest OutboundHandler before the current node:

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

next.invokeWriteAndFlush again lands in AbstractChannelHandlerContext.invokeWriteAndFlush:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

invokeWrite0 is also simple: it just calls the handler.

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

The OutboundHandler before LogicHandler is MergeHandler, so MergeHandler.write is called and joins the string array.
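
The backwards walk can be replayed on a toy version of the example's handlers. The sketch models only head and the three example handlers, with hypothetical names (OutboundLookupSketch, Ctx); it relies on head being an outbound node so the walk always terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: following prev links to find the outbound handlers a write passes.
class OutboundLookupSketch {

    static final class Ctx {
        final String name;
        final boolean outbound;
        final Ctx prev;

        Ctx(String name, boolean outbound, Ctx prev) {
            this.name = name;
            this.outbound = outbound;
            this.prev = prev;
        }
    }

    // Same idea as findContextOutbound(): step backwards until an outbound node is found.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Names of the nodes a write started at LogicHandler passes through.
    static List<String> writePath() {
        Ctx head = new Ctx("head", true, null);
        Ctx merge = new Ctx("MergeHandler", true, head);
        Ctx split = new Ctx("SplitHandler", false, merge);
        Ctx logic = new Ctx("LogicHandler", false, split);

        List<String> path = new ArrayList<>();
        Ctx ctx = logic;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            path.add(ctx.name);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(writePath()); // [MergeHandler, head]
    }
}
```

SplitHandler is skipped because it is inbound only. This is also why MergeHandler has to be registered before LogicHandler: an outbound handler added after it would never see a write issued with ctx.writeAndFlush from LogicHandler.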

MergeHandler.write then calls ctx.write, which reaches HeadContext. Note that HeadContext is both an InboundHandler and an OutboundHandler.

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

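This calls AbstractUnsafe.write, which queues the message in the channel's outbound buffer; the bytes are only written to the socket when the flush runs. The details are not covered here.

invokeFlush0() follows a similar path and is not repeated here.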
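
As far as I understand netty 4, write and flush are two separate steps: write only queues the message, and flush is what pushes the queued bytes to the socket. The sketch below models just that split. WriteFlushSketch is a hypothetical class, an ArrayDeque stands in for netty's outbound buffer, and a ByteArrayOutputStream stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: write() queues, flush() drains the queue into the "socket".
class WriteFlushSketch {
    private final Queue<byte[]> outboundBuffer = new ArrayDeque<>();
    final ByteArrayOutputStream socket = new ByteArrayOutputStream();

    void write(byte[] msg) {
        outboundBuffer.add(msg); // nothing reaches the socket yet
    }

    void flush() {
        byte[] msg;
        while ((msg = outboundBuffer.poll()) != null) {
            socket.write(msg, 0, msg.length);
        }
    }

    void writeAndFlush(byte[] msg) {
        write(msg);
        flush();
    }

    public static void main(String[] args) {
        WriteFlushSketch ch = new WriteFlushSketch();
        ch.write("ok for 123|".getBytes(StandardCharsets.UTF_8));
        System.out.println(ch.socket.size()); // 0
        ch.writeAndFlush("ok for 456".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(ch.socket.toByteArray(), StandardCharsets.UTF_8));
        // ok for 123|ok for 456
    }
}
```

Separating the two lets a handler queue several small messages and pay for one socket write, which is the reason LogicHandler in the example uses writeAndFlush rather than write alone: without the flush the client would see nothing.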

So where is the OutboundHandler's read event triggered? In fact, it happens in fireChannelReadComplete.
pipeline.fireChannelReadComplete() ends up in HeadContext.channelReadComplete (HeadContext is an inner class of DefaultChannelPipeline):

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelReadComplete();

    readIfIsAutoRead();
}


private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

If AutoRead is enabled, channel.read() is called, which calls pipeline.read() and eventually triggers MergeHandler.read.

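That completes the walk through netty's startup, accept, and read/write flow.
netty is an excellent framework: it is well modularized, it extends JDK facilities such as future and buffer, and it even does its own memory management.
Once you are familiar with the overall flow, you can go on to study these highlights; there are plenty of good tutorials online.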

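Here are some excellent netty blogs:
Netty Source Code Analysis - 占小狼
A Few Things About Netty - 黄亿华
Netty Series: The Netty Threading Model - 李林锋
Netty Series: The Road to Netty's High Performance - 李林锋
Netty in Action (translation)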