netty启动过程分析

2017-12-09 18:25:05

源码分析基于netty 4

接口结构

evenloop

先看看evenloop的接口
evenloop.png

除了jdk原生的ScheduledExecutorService/ExecutorService/Executor接口, netty主要实现了两大接口体系:EventExecutor, EventLoop

EventExecutor继承了ScheduledExecutorService,负责处理netty产生的task,它扩展了submit, inEventLoop等方法
EventLoop是核心接口,loop表示循环,一旦一个Channel 注册到loop, 它将处理Channel所以的I/O操作。

以grop结尾的接口,通过next方法返回一个实际操作的接口

先简单看看evenloop的关键实现
SingleThreadEventExecutor是EventExecutor的一个简单的实现类,在一个线程中执行所有提交的task。

SingleThreadEventExecutor.execute将执行一个task

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        ...
    }

inEventLoop判断当前线程是否为loop的线程
如果inEventLoop,直接把task添加到队列中
否则先调用startThread, 该方法如果检测到当前loop线程没启动,会启动loop线程

SingleThreadEventExecutor定义了一个关键的抽象方法run(), 该run方法负责进行loop循环, 处理各种事件/task, NioEventLoop主要的逻辑就在run方法中

Channel

在看看Channel的接口结构
channel.png

图中没有展示,channel中还定义了一个重要的接口Unsafe
Unsafe定义了一些不提供给用户调用的方法, 这些方法实现了实际的网络运输操作
AbstractNioChannel中也定义了NioUnsafe接口, 扩展了一些方法

AbstractNioChannel分为两个不同的体系, AbstractNioByteChannel和 AbstractNioMessageChannel
AbstractNioByteChannel 负责底层的字节操作
可以看一下AbstractNioByteChannel.NioMessageUnsafe 中的read, doWrite
AbstractNioMessageChannel中的read方法, 通过模板方法doReadMessages(readBuf)将缓存区交给子类处理

实例

一个netty实现的tcp server端小栗子


EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup(1);

ServerBootstrap b = new ServerBootstrap();

b.group(parentGroup, childGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            public void initChannel(SocketChannel ch) throws Exception {
                ...
            }
        });

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();

代码不完整, 但已经足够描述问题

源码分析

Reactor

注意上面栗子创建了两个EventLoopGroup, 他们的责任不同, 注意观察他们的工作

Netty基于Multiple Reactors模式。Mutilple Reactors模式有多个reactor:mainReactor和subReactor,其中mainReactor负责客户端的连接请求,并将请求转交给subReactor,后由subReactor负责相应通道的IO请求,非IO请求(具体逻辑处理)的task则会直接写入队列,等待worker threads进行处理。
Multi-reactors3.png.jpeg
Netty的线程模型基于Multiple Reactors模式,借用了mainReactor和subReactor的结构,但它并没有Thread Pool。Netty的subReactor与worker thread是同一个线程。
parentGroup就是mainReactor。
childGroup就是subReactor + Thread Pool。

childGroup聚合在ServerBootstrap.childGroup中
parentGroup聚合在ServerBootstrap.group中

AbstractBootstrap.doBind

final ChannelFuture regFuture = initAndRegister();

if (regFuture.isDone()) {
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
} else {
    ...
}

initAndRegister

initAndRegister负责初始化channle, 并注册到loop中

final ChannelFuture initAndRegister() {
    Channel channel = null;
    channel = channelFactory.newChannel();
    init(channel);

    ChannelFuture regFuture = config().group().register(channel);
}

AbstractBootstrap.channelFactory为ReflectiveChannelFactory对象,他通过反射生成NioServerSocketChannel对象(NioServerSocketChannel对象由ServerBootstrap.channel(NioServerSocketChannel.class)配置)

AbstractNioChannel中有一个int readInterestOp属性, 会影响到channle注册的事件
注意一下NioServerSocketChannel的构造方法:

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

这里的readInterestOp是OP_ACCEPT,

初始化

init(channel)会调用到子类ServerBootstrap

void init(Channel channel) throws Exception {
    // 初始化options和attr
    ...

    ChannelPipeline p = channel.pipeline();
    p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // ch.eventLoop()是parentGroup
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

}

这里给ChannelPipeline添加了ChannelInitializer,ChannelInitializer是一个特殊的handler, 它被添加后就会执行initChannel方法,所以这里工作 主要是给ChannelPipeline添加了ServerBootstrapAcceptor, 注意, ServerBootstrapAcceptor构建参数中的currentChildGroup是childGroup

注册

在看看config().group()中的注册

config().group().register(channel)

注意:config().group()返回的是parent EventLoopGroup

这里会调用的SingleThreadEventLoop.register

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

unsafe()将返回AbstractUnsafe, 看看它的register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}

private void register0(ChannelPromise promise) {

    doRegister();

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);    // 设置promise完成状态

    ...

     if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }

}

inEventLoop和execute前面已经说过了
这里将启动parentGroup了

doRegister调用到子类AbstractNioChannel.doRegister

protected void doRegister() throws Exception {
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    return;    
}

AbstractChannel中聚合了一个eventloop, 这里是parent EventLoopGroup
AbstractNioChannel还聚合了java.nio.channels.SelectableChannel, javaChannel()正是返回该对象
NioEventLoop中聚合了java.nio.channels.Selector, eventLoop().selector正是该对象
这里实现了jdk原生的channle注册到selector, 并将this作为att, 但还没有注册关注的事件

注册事件是在beginRead中实现, 该方法将调用到AbstractNioChannel.doBeginRead

protected void doBeginRead() throws Exception {
    ...
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

selectionKey.interestOps()获取已注册的事件, 当前是0, readInterestOp在NioServerSocketChannel创建时已经被赋值为SelectionKey.OP_ACCEPT, 这里终于注册了ACCEPT事件。

绑定端口

启动的最后一步,就是绑定端口,在AbstractBootstrap.doBind0中实现

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel.bind会依次调用DefaultChannelPipeline.bindTailContext.bindHeadContext.bindAbstractUnsafe.bindNioServerSocketChannel.doBind(跳转过程后面会讲):

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

到此,启动完成。

关于Reactor模式, 推荐看看Doug Lea大神的《Scalable IO in Java》

参考:
Netty 4.x学习笔记 - 线程模型