registry层
通过dubbo解析一我们分析了整个ServiceConfig,留下了RegistryProtocol这个没有分析,接下来我们来分析分析RegistryProtocol
在ServiceConfig的loadRegistries方法里面,当spring容器初始化,服务提供方(消费方也差不多)暴露过程,就会拼接生成registryURL
registryURL:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=9966&qos.port=22222®istry=zookeeper×tamp=1522650082230
RegistryProtocol
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public <T> Exporter<T> export (final Invoker<T> originInvoker) throws RpcException { final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register" , true ); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true ); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private <T> ExporterChangeableWrapper<T> doLocalExport (final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null ) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null ) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
protocol和registryFactory这里又是啥呢?我们看下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static RegistryProtocol getRegistryProtocol () { if (INSTANCE == null ) { ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); } return INSTANCE; } public void setProtocol (Protocol protocol) { this .protocol = protocol; } public void setRegistryFactory (RegistryFactory registryFactory) { this .registryFactory = registryFactory; }
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter (ZookeeperTransporter zookeeperTransporter) { this .zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry (URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
ZookeeperRegistry
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private final ZookeeperClient zkClient;public ZookeeperRegistry (URL url, ZookeeperTransporter zookeeperTransporter) { super (url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null" ); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this .root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged (int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
注册,父类的register最终调用了子类的doRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected void doRegister (URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true )); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } private String toUrlPath (URL url) { return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); }
订阅,registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);,先缓存到
1 ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
里面最终调用了doSubscribe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 protected void doSubscribe (final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null ) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null ) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged (String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false )), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root, false ); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false )), listener); } } } else { List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null ) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null ) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged (String parentPath, List<String> currentChilds) { ZookeeperRegistry.this .notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false ); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null ) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
override协议:
禁用提供者:(通常用于临时踢除某台提供者机器,相似的,禁止消费者访问请使用路由规则)
1 override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&disbaled=true
调整权重:(通常用于容量评估,缺省权重为 100)
1 override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&weight=200
调整负载均衡策略:(缺省负载均衡策略为 random)
1 override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&loadbalance=leastactive
服务降级:(通常用于临时屏蔽某个出错的非关键服务)
1 override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null
0.0.0.0表示所有的host
OverrideListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private class OverrideListener implements NotifyListener { private final URL subscribeUrl; private final Invoker originInvoker; public OverrideListener (URL subscribeUrl, Invoker originalInvoker) { this .subscribeUrl = subscribeUrl; this .originInvoker = originalInvoker; } public synchronized void notify (List<URL> urls) { logger.debug("original override urls: " + urls); List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl); logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls); if (matchedUrls.isEmpty()) { return ; } List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls); final Invoker<?> invoker; if (originInvoker instanceof InvokerDelegete) { invoker = ((InvokerDelegete<?>) originInvoker).getInvoker(); } else { invoker = originInvoker; } URL originUrl = RegistryProtocol.this .getProviderUrl(invoker); String key = getCacheKey(originInvoker); ExporterChangeableWrapper<?> exporter = bounds.get(key); if (exporter == null ) { logger.warn(new IllegalStateException("error state, exporter should not be null" )); return ; } URL currentUrl = exporter.getInvoker().getUrl(); URL newUrl = getConfigedInvokerUrl(configurators, originUrl); if (!currentUrl.equals(newUrl)) { RegistryProtocol.this .doChangeLocalExport(originInvoker, newUrl); logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl); } }
合并override协议和absent协议过程:
List configurators = RegistryDirectory.toConfigurators(matchedUrls); 先获取到所有的Configurator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static List<Configurator> toConfigurators (List<URL> urls) { if (urls == null || urls.isEmpty()) { return Collections.emptyList(); } List<Configurator> configurators = new ArrayList<Configurator>(urls.size()); for (URL url : urls) { if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) { configurators.clear(); break ; } Map<String, String> override = new HashMap<String, String>(url.getParameters()); override.remove(Constants.ANYHOST_KEY); if (override.size() == 0 ) { configurators.clear(); continue ; } configurators.add(configuratorFactory.getConfigurator(url)); } Collections.sort(configurators); return configurators; }
原始url和Configurators进行合并
1 2 3 4 5 6 private URL getConfigedInvokerUrl (List<Configurator> configurators, URL url) { for (Configurator configurator : configurators) { url = configurator.configure(url); } return url; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public URL configure (URL url) { if (configuratorUrl == null || configuratorUrl.getHost() == null || url == null || url.getHost() == null ) { return url; } if (configuratorUrl.getPort() != 0 ) { if (url.getPort() == configuratorUrl.getPort()) { return configureIfMatch(url.getHost(), url); } } else { if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) { return configureIfMatch(NetUtils.getLocalHost(), url); } else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) { return configureIfMatch(Constants.ANYHOST_VALUE, url); } } return url; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private URL configureIfMatch (String host, URL url) { if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) { String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY, configuratorUrl.getUsername()); String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername()); if (configApplication == null || Constants.ANY_VALUE.equals(configApplication) || configApplication.equals(currentApplication)) { Set<String> condtionKeys = new HashSet<String>(); condtionKeys.add(Constants.CATEGORY_KEY); condtionKeys.add(Constants.CHECK_KEY); condtionKeys.add(Constants.DYNAMIC_KEY); condtionKeys.add(Constants.ENABLED_KEY); for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) { String key = entry.getKey(); String value = entry.getValue(); if (key.startsWith("~" ) || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) { condtionKeys.add(key); if (value != null && !Constants.ANY_VALUE.equals(value) && !value.equals(url.getParameter(key.startsWith("~" ) ? key.substring(1 ) : key))) { return url; } } } return doConfigure(url, configuratorUrl.removeParameters(condtionKeys)); } } return url; }
url变更了,doChangeLocalExport重新暴露
1 2 3 4 5 6 7 8 9 10 private <T> void doChangeLocalExport (final Invoker<T> originInvoker, URL newInvokerUrl) { String key = getCacheKey(originInvoker); final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null ) { logger.warn(new IllegalStateException("error state, exporter should not be null" )); } else { final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl); exporter.setExporter(protocol.export(invokerDelegete)); } }
/Users/wangwenwei/.dubbo/dubbo-registry-demo-provider-localhost:2181.cache
当doSubscribe订阅失败的时候才会拿cache的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void saveProperties (URL url) { if (file == null ) { return ; } try { StringBuilder buf = new StringBuilder(); Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified != null ) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0 ) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } properties.setProperty(url.getServiceKey(), buf.toString()); long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public void doSaveProperties (long version) { if (version < lastCacheChanged.get()) { return ; } if (file == null ) { return ; } try { File lockfile = new File(file.getAbsolutePath() + ".lock" ); if (!lockfile.exists()) { lockfile.createNewFile(); } RandomAccessFile raf = new RandomAccessFile(lockfile, "rw" ); try { FileChannel channel = raf.getChannel(); try { FileLock lock = channel.tryLock(); if (lock == null ) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties" ); } try { if (!file.exists()) { file.createNewFile(); } FileOutputStream outputFile = new FileOutputStream(file); try { properties.store(outputFile, "Dubbo Registry Cache" ); } finally { outputFile.close(); } } finally { lock.release(); } } finally { channel.close(); } } finally { raf.close(); } } catch (Throwable e) { if (version < lastCacheChanged.get()) { return ; } else { registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e); } }
注册中心扩展 扩展说明 负责服务的注册与发现。
扩展接口
com.alibaba.dubbo.registry.RegistryFactory
com.alibaba.dubbo.registry.Registry
扩展配置 1 2 3 4 5 6 <dubbo:registry id ="xxx1" address ="xxx://ip:port" /> <dubbo:service registry ="xxx1" /> <dubbo:provider registry ="xxx1" />
扩展契约 RegistryFactory.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface RegistryFactory { Registry getRegistry (URL url) ; }
RegistryService.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public interface RegistryService { void register (URL url) ; void unregister (URL url) ; void subscribe (URL url, NotifyListener listener) ; void unsubscribe (URL url, NotifyListener listener) ; List<URL> lookup (URL url) ; }
NotifyListener.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface NotifyListener { void notify (List<URL> urls) ; }
已知扩展 com.alibaba.dubbo.registry.support.dubbo.DubboRegistryFactory
扩展示例 Maven 项目结构:
1 2 3 4 5 6 7 8 9 10 11 src |-main |-java |-com |-xxx |-XxxRegistryFactory.java (实现RegistryFactory接口) |-XxxRegistry.java (实现Registry接口) |-resources |-META-INF |-dubbo |-com.alibaba.dubbo.registry.RegistryFactory (纯文本文件,内容为:xxx=com.xxx.XxxRegistryFactory)
XxxRegistryFactory.java:
1 2 3 4 5 6 7 8 9 10 11 package com.xxx;import com.alibaba.dubbo.registry.RegistryFactory;import com.alibaba.dubbo.registry.Registry;import com.alibaba.dubbo.common.URL;public class XxxRegistryFactory implements RegistryFactory { public Registry getRegistry (URL url) { return new XxxRegistry(url); } }
XxxRegistry.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.xxx;import com.alibaba.dubbo.registry.Registry;import com.alibaba.dubbo.registry.NotifyListener;import com.alibaba.dubbo.common.URL;public class XxxRegistry implements Registry { public void register (URL url) { } public void unregister (URL url) { } public void subscribe (URL url, NotifyListener listener) { } public void unsubscribe (URL url, NotifyListener listener) { } }
META-INF/dubbo/com.alibaba.dubbo.registry.RegistryFactory:
1 xxx =com.xxx.XxxRegistryFactory
比如dubbo官方给出的基于redis的注册方式
1 redis =com.alibaba.dubbo.registry.redis.RedisRegistryFactory
1 2 3 4 5 6 public class RedisRegistryFactory extends AbstractRegistryFactory { @Override protected Registry createRegistry (URL url) { return new RedisRegistry(url); } }
RedisRegistry就不具体分析了,注册的时候主要使用hset放入所有url、publish通知订阅者要订阅,订阅的时候使用hgetAll获取所有url、psubscribe进行订阅通知