dubbo client启动

2018-02-08 22:32:50

源码分析基于dubbo 2.6.0

ReferenceBean继承了ReferenceConfig, 还实现了FactoryBean。
spring启动时,会通过FactoryBean.getObject创建bean。这里会调用到ReferenceConfig.createProxy

private T createProxy(Map<String, String> map) {
    ...

    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
    } else {
        // 构建url,放入到urls中
        ...

        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    return (T) proxyFactory.getProxy(invoker);
}

refprotocol.refer会调用到RegistryProtocol.refer。它会调用doRefer方法。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    ...
    // 订阅
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
    return invoker;
}

这里只关注cluster.join,因为它创建了invoker。
cluster默认为FailoverCluster

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}

但FailoverCluster处理前,要经过装饰者MockClusterWrapper

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
}

前面dubbo 注册说过了,订阅会调用到RegistryDirectory.notify,进而调用refreshInvoker方法。

private void refreshInvoker(List<URL> invokerUrls) {
    ...
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
    Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

    this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
    this.urlInvokerMap = newUrlInvokerMap;
}

看看toInvokers

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    for (URL providerUrl : urls) {
        ...
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again

            invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
            if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    ;
    return newUrlInvokerMap;
}

protocol.refer会调用DubboProtocol.refer,进而启动client端的网络通讯服务(如netty)。

proxyFactory.getProxy默认通过JavassistProxyFactory实现。

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

跟server端处理请求一样,这里也有invoker责任链,节点为InvokerInvocationHandler > MockClusterInvoker > FailoverClusterInvoker

动态类Proxy由com.alibaba.dubbo.common.bytecode.getProxy(ClassLoader cl, Class<?>... ics)方法生成。跟com.alibaba.dubbo.common.bytecode.Wrapper一样,拼凑代码字符串,通过Javassist生成动态类。
这里直接看生成动态类,生成动态类的方法体为

Object[] args = new Object[1]; 
args[0] = ($w)$1; 
Object ret = handler.invoke(this, methods[0], args); 
return (java.lang.String)ret;

就是通过handler来调用业务方法。
先看看InvokerInvocationHandler

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    ...
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

Invocation是rpc调用上下文信息,包括methodName,arguments等信息。

invoker的请求调用最后会调用FailoverClusterInvoker,它会调用父类AbstractClusterInvoker

public Result invoke(final Invocation invocation) throws RpcException {
    ...

    LoadBalance loadbalance;
    // 路由
    List<Invoker<T>> invokers = list(invocation);
    // 负载均衡
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 调用请求
    return doInvoke(invocation, invokers, loadbalance);
}

list方法会调用AbstractDirectory.list

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && localRouters.size() > 0) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

RegistryDirectory.doList方法通过Invocation中的methodName,Arguments等信息从methodInvokerMap中获取对应的invokers。(这里的invokers是用于发送网络请求到server进行逻辑处理,而上面说的invoker责任链是用于实现MockCluster/FailoverCluster等扩展功能,注意两者使用场合不同)

Router接口负责实现路由选择操作,Router共有三个实现MockInvokersSelector/ConditionRouter/ScriptRouter。
MockInvokersSelector是专用于处理MOCK请求的。

LoadBalance接口负责实现负载均衡,有RandomLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance。

AbstractClusterInvoker有FailoverClusterInvoker/FailfastClusterInvoker/FailbackClusterInvoker/AvailableClusterInvoker/…
不同的实现类支持不同的集群容错。

看看默认的FailoverClusterInvoker.doInvoke

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    // 重试次数
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    // 已调用的invoked
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        ...
        // 选择invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 调用invoker,发送网络请求
            Result result = invoker.invoke(invocation);

            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(...);
}

注意,如果是server端抛出的业务异常,不会重发请求,只有rpc异常(如连接超时),才会重发请求。