Dubbo ZooKeeper Registration

2018-02-08 22:27:13

This source-code analysis is based on Dubbo 2.6.0.

As mentioned in an earlier post, RegistryProtocol is responsible for registration.

RegistryProtocol implements Protocol: export exposes a service interface, and refer references one.

Exposing an interface

When a Dubbo server starts, it calls RegistryProtocol.export to expose the interface.

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Start the server-side network communication service
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    // Register
    if (register) {
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    // The URL to subscribe to
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);


    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new Exporter<T>() {
        ...
    };
}

(doLocalExport starts the server-side network communication service; it will be covered in a later post.)

Registration

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

The registryUrl has the form zookeeper://(zk ip):(zk port)/com.alibaba.dubbo.registry.RegistryService. registryFactory uses the registryUrl to obtain the matching Registry implementation.

The registedProviderUrl has the form dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?anyhost=true&methods=hello2,hello... and this is the content that actually gets registered. HelloService is the interface we want to expose.
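To make the shape of these two URLs concrete, here is a minimal sketch that pulls out the protocol, the service interface, and the parameters. It uses java.net.URI rather than Dubbo's own URL class, and the class name UrlShapeSketch, the helper names, and the addresses (127.0.0.1:2181, 192.168.1.10:20880) are assumptions made up for the example.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: Dubbo has its own URL class; this sketch uses java.net.URI instead.
class UrlShapeSketch {

    // The protocol (scheme) is the part that selects a Registry implementation, e.g. "zookeeper".
    static String protocolOf(String url) {
        return URI.create(url).getScheme();
    }

    // The path, minus the leading slash, is the service interface name.
    static String serviceOf(String url) {
        return URI.create(url).getPath().substring(1);
    }

    // Split the query string into key/value parameters.
    static Map<String, String> parametersOf(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(url).getQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // Boolean lookup with a default, similar in spirit to getParameter("register", true).
    static boolean getParameter(String url, String key, boolean defaultValue) {
        String value = parametersOf(url).get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        // Hypothetical addresses, chosen only for the example.
        String registryUrl = "zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService";
        String providerUrl = "dubbo://192.168.1.10:20880/com.dubbo.start.service.HelloService"
                + "?anyhost=true&methods=hello2,hello";
        System.out.println(protocolOf(registryUrl));
        System.out.println(serviceOf(providerUrl));
        System.out.println(parametersOf(providerUrl));
        System.out.println(getParameter(providerUrl, "register", true));
    }
}
```

Because the provider URL carries no register parameter, the last lookup falls back to the default, which mirrors why export registers the provider unless told otherwise.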

The Registry interface has implementations such as FailbackRegistry, RedisRegistry, and ZookeeperRegistry.

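Here we look at ZookeeperRegistry.doRegister. ZookeeperRegistry extends FailbackRegistry, whose register method delegates to doRegister.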

protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

Registering an interface simply means creating the corresponding zk node, whose path is /dubbo/com.dubbo.start.service.HelloService/providers/...
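The path layout can be sketched as follows. This is a simplified illustration, not the actual toUrlPath implementation: it assumes the last path segment is the URL-encoded provider URL, and the class name ProviderPathSketch, the helper signatures, and the provider address are made up for the example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Illustrative only: builds a node path in the /root/interface/category/leaf layout.
class ProviderPathSketch {

    // e.g. /dubbo/com.dubbo.start.service.HelloService/providers
    static String toCategoryPath(String root, String serviceInterface, String category) {
        return "/" + root + "/" + serviceInterface + "/" + category;
    }

    // Assumption: the leaf node name is the provider URL, encoded so that it
    // contains no '/' characters and can serve as a single path segment.
    static String toUrlPath(String root, String serviceInterface, String providerUrl) {
        try {
            return toCategoryPath(root, serviceInterface, "providers")
                    + "/" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical provider address.
        String providerUrl = "dubbo://192.168.1.10:20880/com.dubbo.start.service.HelloService?anyhost=true";
        System.out.println(toUrlPath("dubbo", "com.dubbo.start.service.HelloService", providerUrl));
    }
}
```

In doRegister the second argument to zkClient.create comes from the dynamic parameter, which defaults to true; as far as I can tell, this flag decides whether the node is created as an ephemeral one that disappears when the provider's session ends.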

Subscription

Now let's look at the subscribe operation, ZookeeperRegistry.doSubscribe.

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            ...
        } else {
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
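                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                // Create the listener map for this url on first use
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }

                // Look up the ChildListener
                ChildListener zkListener = listeners.get(listener);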
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // Create the configurators path
                zkClient.create(path, false);
                // Add the ChildListener
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // Notify the NotifyListener
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

Subscribing involves three steps:

  1. Create the configurators path
  2. Add a ChildListener on the configurators path
  3. Notify the NotifyListener

Here the configurators node path has the form /dubbo/com.dubbo.start.service.HelloService/configurators
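The three steps can be sketched with an in-memory stand-in for the ZooKeeper client. Everything here (SubscribeSketch, FakeZkClient, addChild, the node name override-1) is hypothetical and only mimics the create / addChildListener / notify sequence; it is not Dubbo's ZookeeperClient.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SubscribeSketch {

    interface ChildListener {
        void childChanged(String parentPath, List<String> currentChildren);
    }

    // Hypothetical in-memory stand-in for a ZooKeeper client.
    static class FakeZkClient {
        private final Map<String, List<String>> children = new HashMap<>();
        private final Map<String, List<ChildListener>> listeners = new HashMap<>();

        // Create the node if it does not exist yet.
        void create(String path) {
            children.computeIfAbsent(path, p -> new ArrayList<>());
        }

        // Register a listener and return the children present right now.
        List<String> addChildListener(String path, ChildListener listener) {
            listeners.computeIfAbsent(path, p -> new ArrayList<>()).add(listener);
            return new ArrayList<>(children.getOrDefault(path, Collections.<String>emptyList()));
        }

        // Simulate a later change under the watched path.
        void addChild(String parent, String child) {
            create(parent);
            children.get(parent).add(child);
            List<String> snapshot = new ArrayList<>(children.get(parent));
            for (ChildListener l : listeners.getOrDefault(parent, Collections.<ChildListener>emptyList())) {
                l.childChanged(parent, snapshot);
            }
        }
    }

    // Returns every child list the subscriber was notified with, in order.
    static List<List<String>> run() {
        List<List<String>> notifications = new ArrayList<>();
        FakeZkClient zk = new FakeZkClient();
        String path = "/dubbo/com.dubbo.start.service.HelloService/configurators";

        zk.create(path);                                              // step 1: create the path
        List<String> current = zk.addChildListener(path,
                (parent, kids) -> notifications.add(kids));           // step 2: add the listener
        notifications.add(current);                                   // step 3: initial notification

        zk.addChild(path, "override-1");                              // a later change fires the listener
        return notifications;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the initial notification in step 3 is that the subscriber sees the current state immediately, rather than waiting for the first change under the path.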

Referencing an interface

When a Dubbo client starts, it calls RegistryProtocol.doRefer to reference the interface.

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // ... (creation of directory and subscribeUrl is omitted in this excerpt)
    // Subscribe
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    // cluster produces an invoker for the chosen cluster fault-tolerance strategy
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
    return invoker;
}

First, look at RegistryDirectory.subscribe

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

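Note that the second argument of registry.subscribe(url, this) is this. The parameter type is NotifyListener, so RegistryDirectory also implements NotifyListener.
As described above, the subscribe method ends up calling RegistryDirectory.notify, which in turn calls the key refreshInvoker method; that method starts the client-side network communication service.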
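The "pass this as the listener" arrangement can be sketched as below. The interfaces are cut down to a single method each and use plain strings instead of Dubbo's URL type; DirectorySketch, the inline registry, and the provider addresses are assumptions for illustration, and the notify body is only a stand-in for what refreshInvoker does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class DirectorySketch {

    // Simplified: the real interfaces work with Dubbo URL objects.
    interface NotifyListener {
        void notify(List<String> urls);
    }

    interface Registry {
        void subscribe(String url, NotifyListener listener);
    }

    // The directory is both the subscriber and the listener.
    static class Directory implements NotifyListener {
        private final Registry registry;
        private volatile List<String> providers = Collections.emptyList();

        Directory(Registry registry) {
            this.registry = registry;
        }

        void subscribe(String url) {
            registry.subscribe(url, this); // the directory hands itself over as the callback
        }

        @Override
        public void notify(List<String> urls) {
            // Stand-in for refreshInvoker: replace the known provider list.
            providers = new ArrayList<>(urls);
        }

        List<String> list() {
            return providers;
        }
    }

    static List<String> run() {
        // Hypothetical registry that answers every subscription with two providers.
        Registry registry = (url, listener) -> listener.notify(
                Arrays.asList("dubbo://192.168.1.10:20880", "dubbo://192.168.1.11:20880"));
        Directory directory = new Directory(registry);
        directory.subscribe("consumer://192.168.1.20/com.dubbo.start.service.HelloService");
        return directory.list();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```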

Cluster defines the cluster fault-tolerance strategy, and different strategies produce different invokers. FailoverCluster, for example:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}
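As a rough idea of what a failover-style invoker does, the sketch below tries each candidate in turn until one succeeds. It is not FailoverClusterInvoker itself: FailoverSketch, its Invoker interface, and the failover helper are hypothetical, and the sketch leaves out details such as retry limits and load balancing.

```java
import java.util.Arrays;
import java.util.List;

class FailoverSketch {

    // Simplified invoker: takes a request string and returns a response string.
    interface Invoker {
        String invoke(String request);
    }

    // Wrap several invokers into one that falls over to the next on failure.
    static Invoker failover(List<Invoker> candidates) {
        return request -> {
            RuntimeException last = null;
            for (Invoker candidate : candidates) {
                try {
                    return candidate.invoke(request);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try the next candidate
                }
            }
            throw new IllegalStateException("all " + candidates.size() + " invokers failed", last);
        };
    }

    static String demo() {
        Invoker broken = r -> {
            throw new IllegalStateException("connection refused");
        };
        Invoker healthy = r -> "hello " + r;
        return failover(Arrays.asList(broken, healthy)).invoke("world");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller only ever sees a single Invoker, which is the design idea behind cluster.join(directory): the fault-tolerance strategy is hidden behind the same interface as a plain remote call.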