netty启动过程源码简析

2017-03-02 19:21:47

一个小栗子

EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, 与childGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(...);

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();

栗子中创建了 parentGroup和childGroup,
parentGroup负责处理请求的accept事件, 类似于reactor模式中的acceptor
childGroup负责接收请求的read、write事件,并分发给对应的处理器,类似于reactor模式中的reactor对象。

创建NioEventLoopGroup
NioEventLoopGroup实现了EventLoopGroup接口,负责对EventLoop进行管理。 NioEventLoopGroup的构造过程主要在MultithreadEventExecutorGroup构造方法中

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {                                    
    // 执行器
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    children = new EventExecutor[nThreads];
    // 选择器
    chooser = chooserFactory.newChooser(children);    
    // 创建EventLoop    
    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(threadFactory, args);
    }
}

protected EventExecutor newChild(  
            ThreadFactory threadFactory, Object... args) throws Exception {  
      return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  
}

启动

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ...
    }
}

initAndRegister中创建并注册了Channel。

final ChannelFuture initAndRegister() {
    Channel channel = null;
    channel = channelFactory.newChannel();
    init(channel);

    ChannelFuture regFuture = config().group().register(channel);
}

channelFactory.newChannel()中的channelFactory为ReflectiveChannelFactory对象,这时通过反射生成NioServerSocketChannel对象(通过ServerBootstrap.channel(NioServerSocketChannel.class)设置)

NioServerSocketChannel是对java.nio.channels.ServerSocketChannel的封装,实现了io.netty.channel.Channel接口。

创建NioServerSocketChannel

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

DEFAULT_SELECTOR_PROVIDER为java.nio.channels.spi.SelectorProvider,
newSocket(DEFAULT_SELECTOR_PROVIDER)将创建一个ServerSocketChannel。

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);    
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    // 该readInterestOp即为NioServerSocketChannel构造方法中的SelectionKey.OP_ACCEPT
    this.readInterestOp = readInterestOp;    

    ch.configureBlocking(false);    // 设置channel为非阻塞
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();    // 初始化unsafe
    pipeline = newChannelPipeline();    // 初始化pipeline
}

AbstractChannel构造时创建了两个关键的属性:
unsafe为NioMessageUnsafe,通过AbstractNioMessageChannel重写的newUnsafe()创建
unsafe负责真正的操作,如register,bind,read等操作
pipeline则创建了DefaultChannelPipeline

创建pipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

pipeline是一个链表,链表中的结点为ChannelHandlerContext,ChannelHandlerContext是包含ChannelHandler的上下文。

需要注意,DefaultChannelPipeline创建后,就已经包括了head,tail两个结点。

初始化channel
到此,channel已经成功创建,init(channel)将初始化channel

void init(Channel channel) {
    // 初始化options和attr
    ...

    p.addLast(new ChannelInitializer<Channel>() {    // 添加ChannelInitializer handler
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
}

这时添加了一个ChannelInitializer handler了,注意,addLast不是将handler添加到最后结点,而是倒数第二结点,tail总是最后一个结点。

注册channel
config().group().register(channel)进行了注册操作
注意config().group()返回的是parent EventLoopGroup

EventLoop继承自java.util.concurrent.ScheduledExecutorService,是一个任务执行器,可以将任务提交给它,则它来执行。

SingleThreadEventLoop实现了EventLoop,并继承自SingleThreadEventExecutor,Executor封装了任务执行细节。
nioEventLoop继承了SingleThreadEventLoop,并封装了java.nio.channels.Selector,响应其中的事件。

注册过程
MultithreadEventLoopGroup

public ChannelFuture register(Channel channel) {
    return next().register(channel);    // 通过选择器选择一个合适的EventLoop
}

SingleThreadEventLoop

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    promise.channel().unsafe().register(this, promise);
    return promise;
}

上面已经说了,channel中的unsafe对应为AbstractUnsafe,查看其register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {

        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}

如果当前线程是eventLoop的执行线程,则直接在当前线程执行register0,否则将提交一个新任务到eventLoop中。

注意:调用eventLoop.execute时,如果eventLoop没有启动,将调用eventLoop。

private void register0(ChannelPromise promise) {
    doRegister();
    ...
    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    // user may already fire events through the pipeline in the ChannelFutureListener.
    pipeline.invokeHandlerAddedIfNeeded();

// Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    pipeline.fireChannelRegistered();

    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            // This channel was registered before and autoRead() is set. This means we need to begin read
            // again so that we process inbound data.
            //
            // See https://github.com/netty/netty/issues/4805
            beginRead();
        }
    }

}

doRegister将调用到AbstractNioChannel.doRegister

protected void doRegister() throws Exception {
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    return;    
}

需要注意,register中的ops参数为0,这时只是把select注册到channel上,将没有监听任何事件。

监听事件在beginRead()方法中实现,该方法将调用到AbstractNioChannel.doBeginRead

protected void doBeginRead() throws Exception {
    ...
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

interestOps为0,readInterestOp在NioServerSocketChannel创建时已经被赋值为SelectionKey.OP_ACCEPT,所以interestOps | readInterestOp为SelectionKey.OP_ACCEPT,这时成功设置事件监听。

绑定端口
启动的最后一步,就是绑定端口,在AbstractBootstrap.doBind0中实现

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

DefaultChannelPipeline.bind

public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

HeadContext.bind

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
}

findContextOutbound()将找到HeadContext,HeadContext.bind

unsafe.bind(localAddress, promise);

NioMessageUnsafe.bind将调用外部类的doBind(localAddress)方法,最终调用NioServerSocketChannel.doBind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

到此,启动完成。