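When a cluster call fails, Dubbo offers several fault-tolerance strategies. The default is failover, which retries the call on another provider.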
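How the components relate:

- Invoker: an abstraction of a callable Service on a Provider. An Invoker encapsulates the Provider's address and the Service interface information.
- Directory: represents multiple Invokers. You can think of it as a List<Invoker>. Unlike a List, though, its contents may change at runtime, for example when the registry pushes an update.
- Cluster: disguises the multiple Invokers in a Directory as a single Invoker, so the layer above does not see the difference. The disguise contains the fault-tolerance logic, such as retrying another Invoker after a failed call.
- Router: selects a subset of the Invokers according to routing rules, for example for read/write splitting or application isolation.
- LoadBalance: selects the one Invoker used for the current call. The selection applies a load-balancing algorithm, and it has to be repeated after a failed call.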
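Cluster layer: configuring the cluster mode

Configure the cluster mode on the provider side or the consumer side, as in the following examples:

```xml
<dubbo:service cluster="failsafe" />
```

or

```xml
<dubbo:reference cluster="failsafe" />
```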
```java
@SPI(FailoverCluster.NAME)
public interface Cluster {

    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
```
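One way to picture the `@SPI(FailoverCluster.NAME)` default together with the `cluster` attribute is as a name lookup. The sketch below stands in for Dubbo's extension loading and is not its actual code. It assumes the adaptive key is the URL parameter named `cluster`. `ClusterSelectionSketch` and its name-to-invoker table are hypothetical, and the real mechanism instantiates extension classes declared in SPI configuration files.

```java
import java.util.Map;

// Hypothetical stand-in for @SPI/@Adaptive: the "cluster" URL parameter names an extension,
// and "failover" is used when the parameter is absent.
final class ClusterSelectionSketch {
    static final String DEFAULT_CLUSTER = "failover";

    // Extension name -> the invoker type that the corresponding Cluster would create.
    private static final Map<String, String> EXTENSIONS = Map.of(
            "failover", "FailoverClusterInvoker",
            "failfast", "FailfastClusterInvoker",
            "failsafe", "FailsafeClusterInvoker",
            "failback", "FailbackClusterInvoker",
            "forking", "ForkingClusterInvoker",
            "broadcast", "BroadcastClusterInvoker",
            "available", "AvailableClusterInvoker",
            "mergeable", "MergeableClusterInvoker");

    /** Resolves the invoker type for a set of URL parameters, falling back to the SPI default. */
    static String invokerFor(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("cluster", DEFAULT_CLUSTER);
        String invoker = EXTENSIONS.get(name);
        if (invoker == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return invoker;
    }
}
```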
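Failover Cluster

failover: automatic failover. When a call fails, it is retried on another server. This is typically used for read operations, but retries add latency. Set the number of retries (not counting the first call) with retries="2".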
The number of retries can be configured as follows:

```xml
<dubbo:service retries="2" />
```

or

```xml
<dubbo:reference retries="2" />
```

or, per method:

```xml
<dubbo:reference>
    <dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
```
```java
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}
```
When the service is invoked, the call therefore goes through FailoverClusterInvoker.
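The body of FailoverClusterInvoker is not reproduced here, so the following is only a minimal sketch of failover semantics, under these assumptions: at most `retries + 1` calls in total, each retry preferring a provider that has not been tried yet, and business exceptions rethrown without a retry. `FailoverSketch`, `Provider` and `BizException` are hypothetical names. The "first untried provider" selection stands in for the real LoadBalance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of failover: retries="2" means up to 3 calls, each on a different provider if possible.
final class FailoverSketch {

    /** A business error; failover rethrows it instead of retrying. */
    static final class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    /** A provider address plus the function that handles a request there. */
    record Provider(String address, Function<String, String> handler) {}

    static String invoke(List<Provider> providers, String request, int retries, List<String> calledAddresses) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int maxCalls = Math.max(retries, 0) + 1;   // the first call plus the retries
        List<Provider> tried = new ArrayList<>();
        RuntimeException lastError = null;
        for (int i = 0; i < maxCalls; i++) {
            Provider provider = selectUntried(providers, tried);
            tried.add(provider);
            calledAddresses.add(provider.address());
            try {
                return provider.handler().apply(request);
            } catch (BizException e) {
                throw e;                            // business errors are not retried
            } catch (RuntimeException e) {
                lastError = e;                      // remember the error and try another provider
            }
        }
        throw new IllegalStateException("Failed after " + maxCalls + " calls", lastError);
    }

    /** Prefers a provider that has not been tried; falls back to the first one if all were tried. */
    private static Provider selectUntried(List<Provider> providers, List<Provider> tried) {
        for (Provider p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

With `retries = 2` and the first two providers failing, the call succeeds on the third provider. With `retries = 1`, only two calls are made and the error surfaces to the caller.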
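Failfast Cluster

failfast: fail fast. Only one call is made, and an error is reported immediately if it fails. This is typically used for non-idempotent writes, such as inserting a record.

It uses FailfastClusterInvoker: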
```java
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }
}
```
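Failsafe Cluster

failsafe: fail safe. If an exception occurs, it is logged and ignored, and an empty result is returned. This is typically used for operations such as writing audit logs.

It uses FailsafeClusterInvoker: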
```java
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}
```
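Failback Cluster

failback: automatic recovery. Failed requests are recorded in the background and resent on a schedule (every 5 seconds in the code below). This is typically used for message notifications.

It uses FailbackClusterInvoker: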
```java
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5 * 1000;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
            new NamedThreadFactory("failback-cluster-timer", true));

    // Failed invocations waiting to be resent, each mapped to the invoker that should resend it
    private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
    private volatile ScheduledFuture<?> retryFuture;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) { // start the periodic retry task lazily, exactly once
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        public void run() {
                            try {
                                retryFailed();
                            } catch (Throwable t) {
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        // Iterate over a snapshot so that the map can be modified during the retries
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            addFailed(invocation, this); // queue the invocation for the background retry
            return new RpcResult();      // the caller receives an empty result
        }
    }
}
```
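Forking Cluster

forking: several servers are called in parallel, and the call returns as soon as one of them succeeds. This is typically used for read operations with strict real-time requirements, at the cost of consuming more service resources. Set the maximum number of parallel calls with forks="2".

It uses ForkingClusterInvoker: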
```java
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            for (int i = 0; i < forks; i++) {
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) { // avoid adding the same invoker twice
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) { // only the last failing fork publishes the error
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // The first entry in the queue wins: a success, or the error once every fork has failed
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    }
}
```
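In ForkingClusterInvoker, a success is published at once, but a failure is published only by the last fork to fail. The caller's single `poll` therefore sees either the first success or the final error. The sketch below expresses the same rule with `CompletableFuture` instead of a `BlockingQueue`. It is an illustration, not Dubbo code, and `ForkingSketch.forkAll` is a hypothetical helper.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustration only: the "first success / last failure" rule of ForkingClusterInvoker,
// written with CompletableFuture. ForkingSketch and forkAll are hypothetical names.
final class ForkingSketch {

    /**
     * Runs every call in parallel and returns the first successful result.
     * Throws ExecutionException only after all calls have failed, or
     * TimeoutException if nothing is published within timeoutMs.
     */
    static <R> R forkAll(List<Supplier<R>> calls, long timeoutMs, ExecutorService pool)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (calls.isEmpty()) {
            throw new IllegalArgumentException("no calls to fork");
        }
        CompletableFuture<R> winner = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        for (Supplier<R> call : calls) {
            CompletableFuture.supplyAsync(call, pool).whenComplete((value, error) -> {
                if (error == null) {
                    winner.complete(value);               // only the first complete() takes effect
                } else if (failures.incrementAndGet() == calls.size()) {
                    winner.completeExceptionally(error);  // the last failure reports the error
                }
            });
        }
        return winner.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

One design difference is worth noting. When `poll` times out, the original returns `null` as the result, while this sketch surfaces a `TimeoutException`.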
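Broadcast Cluster

broadcast: every provider is called, one after another. If any of them fails, the whole call fails. Per the code below, all providers are still called, and the last exception is thrown at the end. This is typically used to tell all providers to update local resources such as caches or logs.

It uses BroadcastClusterInvoker: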
```java
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        for (Invoker<T> invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}
```
Available Cluster

available: the call goes to the first available provider. If none is available, an RpcException is thrown.
```java
public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public AvailableClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        throw new RpcException("No provider available in " + invokers);
    }
}
```
Mergeable Cluster

Merge the results of all groups:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" />
```

Merge the results of specific groups:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="aaa,bbb" merger="true" />
```
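Conceptually, `group="aaa,bbb" merger="true"` calls the same method on each listed group and combines the results. The simplified model below assumes a method that returns a list of strings. `GroupMergeSketch` and the map from group name to provider function are hypothetical. The real invoker calls the groups in parallel and picks a merger based on the return type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of group="..." merger="true" for a method returning List<String>:
// call every selected group and concatenate the lists.
final class GroupMergeSketch {

    static List<String> callGroups(String groupAttr,
                                   Map<String, Function<String, List<String>>> providersByGroup,
                                   String arg) {
        // "*" selects every known group; otherwise the attribute is a comma-separated list
        List<String> groups = groupAttr.equals("*")
                ? new ArrayList<>(providersByGroup.keySet())
                : List.of(groupAttr.split(","));
        List<String> merged = new ArrayList<>();
        for (String group : groups) {
            Function<String, List<String>> provider = providersByGroup.get(group.trim());
            if (provider != null) {
                merged.addAll(provider.apply(arg));   // concatenate this group's result
            }
        }
        return merged;
    }
}
```

The sketch concatenates in the order the groups are listed, so its output is deterministic. In MergeableClusterInvoker, group results are collected by iterating over a `HashMap`, so the order in which they are merged should not be relied upon.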
Merge results only for the specified method. Other methods call just one group:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>
```
Leave the results of one method unmerged while merging all the others:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true">
    <dubbo:method name="getMenuItems" merger="false" />
</dubbo:reference>
```
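Specify the merge strategy. By default, a merger is matched automatically by return type. If two mergers exist for the same type, specify the merger by name:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference>
```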
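A named merger such as `mymerge` is an extension of Dubbo's `Merger` interface. So that it compiles on its own, the sketch below uses a local copy of the shape assumed for that interface (`T merge(T... items)`). `DistinctListMerger` is a hypothetical example. Registering it under the name `mymerge` through Dubbo's SPI configuration file is not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Local stand-in for Dubbo's Merger extension point (assumed shape: T merge(T... items)).
// A real extension would implement Dubbo's interface and be registered as "mymerge".
final class CustomMergerSketch {

    interface Merger<T> {
        @SuppressWarnings("unchecked")
        T merge(T... items);
    }

    /** Hypothetical "mymerge": concatenates the lists and drops duplicates, keeping first-seen order. */
    static final class DistinctListMerger implements Merger<List<String>> {
        @SafeVarargs
        @Override
        public final List<String> merge(List<String>... items) {
            LinkedHashSet<String> seen = new LinkedHashSet<>();
            for (List<String> item : items) {
                if (item != null) {
                    seen.addAll(item);
                }
            }
            return new ArrayList<>(seen);
        }
    }
}
```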
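Specify a merge method. The named method of the returned result is called to merge the results, and that method's parameter type must be the result type itself:

```xml
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>
```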
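The `.addAll` form uses reflection to look up a method on the return type whose single parameter is the return type itself. It then folds every other group's result into the first one. The sketch follows that rule as it appears in the MergeableClusterInvoker source. `MethodMergeSketch` and the `Menu` type with its `addAll(Menu)` method are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Sketch of the ".methodName" merge rule: find returnType.methodName(returnType) by reflection
// and fold the remaining results into the first one. Menu is a hypothetical return type.
final class MethodMergeSketch {

    /** Hypothetical return type whose merge method takes its own type as the parameter. */
    static final class Menu {
        final List<String> items = new ArrayList<>();

        Menu(String... items) {
            this.items.addAll(List.of(items));
        }

        public void addAll(Menu other) {
            items.addAll(other.items);
        }
    }

    static <T> T mergeWith(String merger, Class<T> returnType, List<T> results) throws ReflectiveOperationException {
        if (results.isEmpty()) {
            return null;
        }
        Method method = returnType.getMethod(merger.substring(1), returnType); // ".addAll" -> "addAll"
        Object result = results.get(0);
        for (T next : results.subList(1, results.size())) {
            if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
                result = method.invoke(result, next);   // non-void merge method: keep its return value
            } else {
                method.invoke(result, next);            // void merge method: merge in place
            }
        }
        return returnType.cast(result);
    }
}
```

This exact-type lookup is why the parameter type matters. `Class.getMethod` matches parameter types exactly, so if the return type were `java.util.List`, looking up `addAll(List)` would throw `NoSuchMethodException`, because `List` declares `addAll(Collection)`.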
MergeableClusterInvoker merges the results of the groups:
```java
public class MergeableClusterInvoker<T> implements Invoker<T> {

    private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
    private final Directory<T> directory;
    private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));

    public MergeableClusterInvoker(Directory<T> directory) {
        this.directory = directory;
    }

    @SuppressWarnings("rawtypes")
    public Result invoke(final Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);

        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) { // no merger for this method: call a single (preferably available) group
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }

        Class<?> returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        // Call every group in parallel, keyed by service key
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for (final Invoker<T> invoker : invokers) {
            Future<Result> future = executor.submit(new Callable<Result>() {
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        List<Result> resultList = new ArrayList<Result>(results.size());

        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
            Future<Result> future = entry.getValue();
            try {
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) { // a failed group is logged and left out of the merge
                    log.error(new StringBuilder(32).append("Invoke ")
                                    .append(getGroupDescFromServiceKey(entry.getKey()))
                                    .append(" failed: ")
                                    .append(r.getException().getMessage()).toString(),
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException(new StringBuilder(32)
                        .append("Failed to invoke service ")
                        .append(entry.getKey())
                        .append(": ")
                        .append(e.getMessage()).toString(),
                        e);
            }
        }

        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        if (merger.startsWith(".")) { // merger=".methodName": merge through a method of the result type
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException(new StringBuilder(32)
                        .append("Can not merge result because missing method [ ")
                        .append(merger)
                        .append(" ] in class [ ")
                        .append(returnType.getClass().getName())
                        .append(" ]")
                        .toString());
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else { // default merger for the return type, or a named Merger extension
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<Object>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }

    public Class<T> getInterface() {
        return directory.getInterface();
    }

    public URL getUrl() {
        return directory.getUrl();
    }

    public boolean isAvailable() {
        return directory.isAvailable();
    }

    public void destroy() {
        directory.destroy();
    }

    private String getGroupDescFromServiceKey(String key) {
        int index = key.indexOf("/");
        if (index > 0) {
            return new StringBuilder(32).append("group [ ")
                    .append(key.substring(0, index)).append(" ]").toString();
        }
        return key;
    }
}
```