netty accept事件处理

2017-12-10 09:46:53

源码分析基于netty 4

前面已经说了netty启动过程。接着讲讲netty对accept事件的处理

前面说过,NioEventLoop的run方法正是loop的核心
run()比较复杂, 涉及wakenUp,ioRatio等内容, 抽取主要流程如下:

for (;;) {
    try {
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            runAllTasks();
        }
    } catch (Throwable t) {
                handleLoopException(t);
            }                
}

processSelectedKeys处理io事件

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

一般会走processSelectedKeysOptimized逻辑

NioEventLoop中的selectedKeys是netty自定义的SelectedSelectionKeySet,继承自AbstractSet,里面有两个SelectionKey数组, add时会添加key到其中一个数组, flip方法会取出当前存储的数组, 并修改存储标示使add添加到另一数组。

NioEventLoop中聚合了java.nio.channels.Selector。注意openSelector方法,通过反射, 将SelectedSelectionKeySet设置到Selector的selectedKeys属性, 这样jdk底层产生的selectedKey会add到SelectedSelectionKeySet中

    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

    selectedKeysField.setAccessible(true);
    publicSelectedKeysField.setAccessible(true);

    selectedKeysField.set(selector, selectedKeySet);
    publicSelectedKeysField.set(selector, selectedKeySet);

回到processSelectedKeysOptimized方法

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];


        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 网络事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
    }
}

网络事件会触发processSelectedKey(SelectionKey k, AbstractNioChannel ch):

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);

        unsafe.finishConnect();
    }

    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            return;
        }
    }
}

可以看到这里netty对各种网络事件做了处理

这里只关注OP_READ和OP_ACCEPT事件,来看对应的处理方法,主要是unsafe.read()方法(虽然方法名为read,但netty将它作为基本逻辑处理抽象,它也可以处理其他事件,如accept),该方法是抽象方法,有两个实现类:NioByteUnsafe和AbstractNioMessageChannel

回顾前面说得的启动过程,channel注册时,NioServerSocketChannel注册了eventloop.selector的OP_READ事件, 所以当OP_READ发生时,会调用到NioMessageUnsafe.read(NioServerSocketChannel继承自AbstractNioMessageChannel)

public void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }

        allocHandle.incMessagesRead(localRead);
    } while (allocHandle.continueReading());

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        // 触发read事件
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    // 触发readComplete事件
    pipeline.fireChannelReadComplete();
}

这里主要是通过模板方法doReadMessages, 将readBuf交给子类处理, 所以来看看NioServerSocketChannel实现的doReadMessages方法:

SocketChannel ch = javaChannel().accept();

if (ch != null) {
    buf.add(new NioSocketChannel(this, ch));
    return 1;
}

比较简单, 就是把accpet的java.nio.channels.SocketChannel封装为netty的NioSocketChannel对象
值得注意的是, NioSocketChannel的构造过程中, 会调用到父类AbstractNioByteChannel如下构造方法:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

主要之前说过的readInterestOp属性,这里NioSocketChannel关注的事件是OP_READ

接下来,会触发pipeline事件,
这里的pipeline.fireChannelRead事件比较重要(pipeline机制后面会讲), 他会触发ServerBootstrapAcceptor.channelRead方法(前面说过了, channl注册时会把ServerBootstrapAcceptor添加到pipeline中):

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // 添加用户定义的Handler到pipeline
    child.pipeline().addLast(childHandler);

    // 注册NioSocketChannel到childGroup
    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
}

这里把NioSocketChannel注册到EventLoop中, 注册过程前面说过了。
要注意的是,这里注册的loop是childGroup,注册的事件是OP_READ。到这里,childGroup总算启动了。