本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
在 dubbo-remoting-zookeeper
模块,实现了 Dubbo 对 Zookeeper 客户端的封装。在该模块中,抽象了通用的 Zookeeper Client API 接口,实现了两种 Zookeeper Client 库的接入:
基于 Apache Curator 实现。
<dubbo:registry address="zookeeper://127.0.0.1:2181" client="curator" />
|
基于 ZkClient 实现。
<dubbo:registry address="zookeeper://127.0.0.1:2181" client="zkclient" />
|
默认不配置 client
的情况下,使用 Curator 。
dubbo-remoting-zookeeper
的整体类图如下:
类图
下面,我们按照接口到实现的顺序,往下看。
2. 接口
2.1 StateListener
com.alibaba.dubbo.remoting.zookeeper.StateListener
,状态监听器接口,代码如下:
public interface StateListener {
int DISCONNECTED = 0;
int CONNECTED = 1;
int RECONNECTED = 2;
void stateChanged(int connected);
}
|
2.2 ChildListener
com.alibaba.dubbo.remoting.zookeeper.StateListener
,节点监听器接口,代码如下:
public interface ChildListener {
void childChanged(String path, List<String> children);
}
|
2.3 ZookeeperClient
com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient
,Zookeeper 客户端接口,代码如下:
public interface ZookeeperClient {
void create(String path, boolean ephemeral);
void delete(String path);
List<String> getChildren(String path);
List<String> addChildListener(String path, ChildListener listener);
void removeChildListener(String path, ChildListener listener);
void addStateListener(StateListener listener);
void removeStateListener(StateListener listener);
boolean isConnected();
void close();
URL getUrl();
}
|
- 状态相关方法
#isConnected()
#close()
#getUrl()
- 数据相关方法
#create(path, ephemeral)
#getChildren(path)
- 从 API 上,Dubbo 只使用节点的路径,而不使用节点的值(内容)。
- 监听相关方法
#addChildListener(path, listener)
#removeChildListener(path, listener)
#addStateListener(listener)
#removeStateListener(listener)
2.4 AbstractZookeeperClient
com.alibaba.dubbo.remoting.zookeeper.support.AbstractZookeeperClient
,实现 ZookeeperClient 接口,Zookeeper 客户端抽象类,实现通用的逻辑。
2.4.1 属性
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
private final URL url;
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
private volatile boolean closed = false;
public AbstractZookeeperClient(URL url) { this.url = url; }
@Override public URL getUrl() { return url; } }
|
🙂 见代码注释。
2.4.2 create
@Override public void create(String path, boolean ephemeral) { int i = path.lastIndexOf('/'); if (i > 0) { String parentPath = path.substring(0, i); if (!checkExists(parentPath)) { create(parentPath, false); } } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
|
#checkExists(path)
抽象方法,节点是否存在。代码如下:
protected abstract boolean checkExists(String path);
|
#createEphemeral(path)
抽象方法,创建临时节点。代码如下:
protected abstract void createPersistent(String path);
|
#createPersistent(path)
抽象方法,创建持久节点。代码如下:
protected abstract void createEphemeral(String path);
|
2.4.3 StateListener 相关方法
@Override public void addStateListener(StateListener listener) { stateListeners.add(listener); }
@Override public void removeStateListener(StateListener listener) { stateListeners.remove(listener); }
public Set<StateListener> getSessionListeners() { return stateListeners; }
protected void stateChanged(int state) { for (StateListener sessionListener : getSessionListeners()) { sessionListener.stateChanged(state); } }
|
2.4.4 ChildListener 相关方法
@Override public List<String> addChildListener(String path, final ChildListener listener) { ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); if (listeners == null) { childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>()); listeners = childListeners.get(path); } TargetChildListener targetListener = listeners.get(listener); if (targetListener == null) { listeners.putIfAbsent(listener, createTargetChildListener(path, listener)); targetListener = listeners.get(listener); } return addTargetChildListener(path, targetListener); }
@Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); if (listeners != null) { TargetChildListener targetListener = listeners.remove(listener); if (targetListener != null) { removeTargetChildListener(path, targetListener); } } }
|
#createTargetChildListener(path, listener)
抽象方法,创建真正的 ChildListener 对象。因为,每个 Zookeeper 的库,实现不同。代码如下:
protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener);
|
#addTargetChildListener(path, targetListener)
抽象方法,向 Zookeeper ,真正发起订阅。代码如下:
protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener);
|
#removeTargetChildListener(path, targetListener)
抽象方法,向 Zookeeper ,真正发起取消订阅。代码如下:
protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
|
2.4.5 close
@Override public void close() { if (closed) { return; } closed = true; try { doClose(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
|
2.5 ZookeeperTransporter
com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter
,Zookeeper 工厂接口,代码如下:
@SPI("curator") public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url);
}
|
#connect(url)
方法,连接创建 ZookeeperClient 对象。
@SPI("curator")
注解,使用 Dubbo SPI 机制,默认使用 Curator 实现。
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
注解,使用 Dubbo SPI Adaptive 机制,根据 url
参数,加载对应的 ZookeeperTransporter 拓展实现类。
3. 基于 Curator 实现
Apache Curator ,作为 Zookeeper 客户端的库,基本是最佳的选择,在 Sharding-JDBC、Elastic-Job 等等中间都选择了 Curator 连接 Zookeeper 。
友情提示,如果对 Curator 的 API 不熟悉的胖友,推荐看下官方文档,进行下学习。
🙂 本文不会写 Curator 使用方法。
3.1 CuratorZookeeperClient
com.alibaba.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient
,实现 ZookeeperTransporter 抽象类,基于 Curator 的 Zookeeper 客户端实现类。
3.1.1 属性
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) { super(url); try { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
- 在连接状态发生变化时,调用
#stateChange(state)
方法,进行 StateListener 的回调。
3.1.2 create 相关方法
3.1.3 ChildListener 相关方法
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) { return new CuratorWatcherImpl(listener); }
public List<String> addTargetChildListener(String path, CuratorWatcher listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
public void removeTargetChildListener(String path, CuratorWatcher listener) { ((CuratorWatcherImpl) listener).unwatch(); }
private class CuratorWatcherImpl implements CuratorWatcher {
private volatile ChildListener listener;
public CuratorWatcherImpl(ChildListener listener) { this.listener = listener; }
public void unwatch() { this.listener = null; }
@Override public void process(WatchedEvent event) throws Exception { if (listener != null) { String path = event.getPath() == null ? "" : event.getPath(); listener.childChanged(path, StringUtils.isNotEmpty(path) ? client.getChildren().usingWatcher(this).forPath(path) : Collections.<String>emptyList()); } } }
|
3.1.4 close 相关方法
3.1.5 delete
@Override public void delete(String path) { try { client.delete().forPath(path); } catch (NoNodeException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
无法通用,所以子类实现。
3.1.6 getChildren
@Override public List<String> getChildren(String path) { try { return client.getChildren().forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
3.2 CuratorZookeeperTransporter
com.alibaba.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
,实现 ZookeeperTransporter 接口,CuratorZookeeper 工厂实现类。
public class CuratorZookeeperTransporter implements ZookeeperTransporter {
public ZookeeperClient connect(URL url) { return new CuratorZookeeperClient(url); }
}
|
4. 基于 ZkClient 实现
🙂 基于 ZkClient 的实现,本文仅列出标题。感兴趣的胖友,可以自己去看。
当然,良心如笔者,已经添加了中文注释。
4.1 ZkclientZookeeperClient
com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient
4.1.1 ZkClientWrapper
com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkClientWrapper
4.2 ZkclientZookeeperTransporter
com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter
666. 彩蛋
知识星球
小文一篇。
发现,即使很简单的文章,例如本文,也要写 3 小时左右。