dubbo server处理请求

2018-02-05 22:28:23

源码分析基于dubbo 2.6.0

前面讲到,RegistryProtocol.doLocalExport负责启动server端网络通讯服务

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    // 对Invoker进行了代理
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

这里invokerDelegete中url的格式为dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?class=com.dubbo.start.service.HelloServiceImpl&dubbo=2.6.0&interface=com.dubbo.start.service.HelloService&methods=hello2,hello&..., 所以会调用DubboProtocol,但会先经过两个装饰类ProtocolListenerWrapper,ProtocolFilterWrapper。

先看看DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // 创建DubboExporter
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    ...

    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

注意一下,这里会构建DubboExporter,并缓存到exporterMap。(后面会用到)

关键就是openServer

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

如果ExchangeServer不存在,就createServer

private ExchangeServer createServer(URL url) {
    // url 参数处理
    ...
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    ...

    return server;
}

Exchangers.bind会调用HeaderExchanger.bind(注意第二个参数DubboProtocol.requestHandler)

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

出现了一个新的概念Transporter(运输层)。

Transporters同样通过ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()找到一个Transporter实现类,默认使用netty3, 会根据url中server/transporter参数转换。

我配置了netty4,所以会使用com.alibaba.dubbo.remoting.transport.netty4包下的netty代码。
NettyTransporter

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

NettyServer构造方法

  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

先看看ChannelHandlers.wrap

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }

这里出现一个新的概念Dispatcher,它是dubbo中的调度器,默认是AllDispatcher

public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    return new AllChannelHandler(handler, url);
}

再回调一下HeaderExchanger.bind方法, Transporters.bind(URL url, ChannelHandler... handlers)第二个参数ChannelHandler为new DecodeHandler(new HeaderExchangeHandler(handler)) (注意new HeaderExchangeHandler(handler)中的handler为DubboProtocol.requestHandler)
ChannelHandler是一条责任链,到这里,责任链国家完成,节点依次为 MultiMessageHandler > HeartbeatHandler > AllChannelHandler > DecodeHandler > HeaderExchangeHandler > DubboProtocol.requestHandler

NettyServer继承了AbstractServer,AbstractServer的构造方法会调用NettyServer.doOpen

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();
        // bossGroup中线程数为1
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

关键是netty handler的实现类NettyServerHandler。注意它的构造方法第二个参数为this,它接受的是ChannelHandler类型,就是说NettyServer也继承了ChannelHandler。

NettyServerHandler继承了netty的ChannelDuplexHandler,这里只关注channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

handler就是NettyServer, 他会调用上面说的ChannelHandler责任链。会调用到AllChannelHandler.received

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        ...
    }
}

就是给ExecutorService提交一个任务,异步处理请求。

看看ChannelEventRunnable.run,会根据state进行不同的处理。但实际请求都转发到handler处理(就是继续调用责任链)。

再看看DecodeHandler.received

    public void received(Channel channel, Object message) throws RemotingException {
        // 解码
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
        // 调用下一个节点
        handler.received(channel, message);
    }

HeaderExchangeHandler

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // 处理请求
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 处理响应
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            ...
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

twoWay表示需要响应的请求。这里调用handleRequest方法,将返回一个Response,最后通过channel将它send到client。

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());

    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

终于调用到DubboProtocol.requestHandler了

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        ...
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        return invoker.invoke(inv);
    }
    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

这里也是通过invoker调用,逐渐调用到我们的业务方法。

先看看getInvoker方法

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    ...

    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    return exporter.getInvoker();
}

很简单,从exporterMap中获取DubboExporter,再获取对应的Invoker。

这时回顾一下上文说到的DubboProtocol.export方法,它用invoker构建了DubboExporter,缓存到exporterMap中。
invoker也是责任链。那么invoker在哪里创建的呢?

这时要回到dubbo server启动,那里说到ServiceConfig.doExportUrlsFor1Protocol

if (registryURLs != null && registryURLs.size() > 0) {
    for (URL registryURL : registryURLs) {
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

        Exporter<?> exporter = protocol.export(wrapperInvoker);
    }
}

是的,invoker就是在这里创建的。(注意,参数ref就是方法的实现类,栗子中就是HelloServiceImpl)

proxyFactory默认使用的是JavassistProxyFactory。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

前面也说过了,DubboProtocol处理前,会经过两个包装类ProtocolListenerWrapper,ProtocolFilterWrapper。

ProtocolFilterWrapper.export 会调用buildInvokerChain,为每一个filter创建一个invoker,

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                ...
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
            };
        }
    }
    return last;
}

dubbo默认有如下

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • TraceFilter
  • TimeoutFilter
  • MonitorFilter
  • ExceptionFilter
  • ValidationFilter

再回顾一下上文的RegistryProtocol.doLocalExport方法,对Invoker进行代理包装,生成了InvokerDelegete。所以invoker责任链节点为ProtocolFilterWrapper&Invoker > RegistryProtocol&InvokerDelegete > DelegateProviderMetaDataInvoker > JavassistProxyFactory&AbstractProxyInvoker

JavassistProxyFactory中创建的invoke,会通过Wrapper调用到实际的业务方法。

com.alibaba.dubbo.common.bytecode.Wrapper也是动态生成代码,JavassistProxyFactory正是通过它调用业务方法。
看看getWrapper方法

public static Wrapper getWrapper(Class<?> c) {
    while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
        c = c.getSuperclass();

    if (c == Object.class)
        return OBJECT_WRAPPER;

    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }
    return ret;
}

关键在makeWrapper,makeWrapper会拼凑代码字符串,再通过javassist生成代理类。
过程比较繁琐,直接看生成的代理类吧。

原接口

public interface HelloService {
    String hello(String user) ;
    String hello2(String user) ;
}

生成的Wrapper,关键的方法在invokeMethod

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{ 
    com.dubbo.start.service.HelloService w; 
    try{
        w = ((com.dubbo.start.service.HelloService)$1); 
    } catch(Throwable e){ 
        throw new IllegalArgumentException(e); 
    } 
    try{ 
        if( "hello".equals( $2 )  &&  $3.length == 1 ) {
            return ($w)w.hello((java.lang.String)$4[0]); 
        } 
        if( "hello2".equals( $2 )  &&  $3.length == 1 ) {  
            return ($w)w.hello2((java.lang.String)$4[0]);
        } 
    } catch(Throwable e) {     
        throw new java.lang.reflect.InvocationTargetException(e);  
    }
}

$1, $2, $3在javassist中表示方法参数。
可以看到,通过方法名和参数数调用对应的逻辑方法。
所以dubbo暴露的接口就尽量不要方法重构了