dubbo源码解析(七) Cluster

在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。

image

各节点关系:

  • 这里的 InvokerProvider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息

  • Directory 代表多个 Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更

  • ClusterDirectory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个

  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等

  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

cluster层

集群模式配置

按照以下示例在服务提供方和消费方配置集群模式

1
<dubbo:service cluster="failsafe" />

1
<dubbo:reference cluster="failsafe" />
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SPI(FailoverCluster.NAME)
public interface Cluster {

/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

Failover Cluster

failover失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。

重试次数配置如下:

1
<dubbo:service retries="2" />

1
<dubbo:reference retries="2" />

1
2
3
<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
1
2
3
4
5
6
7
8
9
public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}

}

当服务被调用的时候,将会使用FailoverClusterInvoker

Failfast Cluster

failfast快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

使用FailfastClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
//通过负载均衡选出invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
//只要失败就报错
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
}
}

Failsafe Cluster

failsafe失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

使用FailsafeClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

public FailsafeClusterInvoker(Directory<T> directory) {
super(directory);
}

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
//忽略异常,返回一个空的RpcResult
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
}
}
}

Failback Cluster

failback失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

使用FailbackClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

private static final long RETRY_FAILED_PERIOD = 5 * 1000;

private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;

public FailbackClusterInvoker(Directory<T> directory) {
super(directory);
}

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
//启动一个定时任务用来重发
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

public void run() {
// collect retry statistics
try {
retryFailed();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
failed.put(invocation, router);
}

void retryFailed() {
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
}
}
}

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
//失败记录下重试
addFailed(invocation, this);
return new RpcResult(); // ignore
}
}

}

Forking Cluster

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。

使用ForkingClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));

public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
}

@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
//可能取同一个,所以去除下
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
//拿出时间较少的
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
}
}

Broadcast Cluster

broadcast 广播调用所有提供者,逐个调用,任意一台报错则报错 2。通常用于通知所有提供者更新缓存或日志等本地资源信息。

使用BroadcastClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
}

@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
for (Invoker<T> invoker : invokers) {
try {
//遍历广播
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}
return result;
}

}

Available Cluster

available选取第一个可用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {

public AvailableClusterInvoker(Directory<T> directory) {
super(directory);
}

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}

}

Mergeable Cluster

搜索所有分组

1
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" />

合并指定分组

1
<dubbo:reference interface="com.xxx.MenuService" group="aaa,bbb" merger="true" />

指定方法合并结果,其它未指定的方法,将只调用一个 Group

1
2
3
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="true" />
</dubbo:service>

某个方法不合并结果,其它都合并结果

1
2
3
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true">
<dubbo:method name="getMenuItems" merger="false" />
</dubbo:service>

指定合并策略,缺省根据返回值类型自动匹配,如果同一类型有两个合并器时,需指定合并器的名称 2

1
2
3
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:service>

指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型本身

1
2
3
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:service>

mergeable 合并分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
public class MergeableClusterInvoker<T> implements Invoker<T> {

private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
private final Directory<T> directory;
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));

public MergeableClusterInvoker(Directory<T> directory) {
this.directory = directory;
}

@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
//获取merger配置
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
//不需要合并,调用第一个可用的group
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}

Class<?> returnType;
try {
//获取返回的类型
returnType = getInterface().getMethod(
invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}

Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
//遍历结果
results.put(invoker.getUrl().getServiceKey(), future);
}

Object result = null;

List<Result> resultList = new ArrayList<Result>(results.size());
//获取timeout 超时配置
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
log.error(new StringBuilder(32).append("Invoke ")
.append(getGroupDescFromServiceKey(entry.getKey()))
.append(" failed: ")
.append(r.getException().getMessage()).toString(),
r.getException());
} else {
//获取未超时结果
resultList.add(r);
}
} catch (Exception e) {
throw new RpcException(new StringBuilder(32)
.append("Failed to invoke service ")
.append(entry.getKey())
.append(": ")
.append(e.getMessage()).toString(),
e);
}
}

if (resultList.isEmpty()) {
return new RpcResult((Object) null);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}

if (returnType == void.class) {
return new RpcResult((Object) null);
}
//.开头则按方法合并
if (merger.startsWith(".")) {
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
throw new RpcException(new StringBuilder(32)
.append("Can not merge result because missing method [ ")
.append(merger)
.append(" ] in class [ ")
.append(returnType.getClass().getName())
.append(" ]")
.toString());
}
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
//执行.xxx获取结果
result = method.invoke(result, r.getValue());
}
} else {
//结果为空
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException(
new StringBuilder(32)
.append("Can not merge result: ")
.append(e.getMessage()).toString(),
e);
}
} else {
Merger resultMerger;
//true或者default,按类型加载
if (ConfigUtils.isDefault(merger)) {
resultMerger = MergerFactory.getMerger(returnType);
} else {
//指定了,则按指定加载
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(
rets.toArray((Object[]) Array.newInstance(returnType, 0)));
} else {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
}

public Class<T> getInterface() {
return directory.getInterface();
}

public URL getUrl() {
return directory.getUrl();
}

public boolean isAvailable() {
return directory.isAvailable();
}

public void destroy() {
directory.destroy();
}

private String getGroupDescFromServiceKey(String key) {
int index = key.indexOf("/");
if (index > 0) {
return new StringBuilder(32).append("group [ ")
.append(key.substring(0, index)).append(" ]").toString();
}
return key;
}
}