dubbo源码解析(九) Filter & Listener

Filter解析

通过Filter接口我们可以轻松地实现服务提供方和消费方的拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Extension point (marked {@code @SPI}) for intercepting an invocation before it reaches
 * the invoker it is handed.
 */
@SPI
public interface Filter {

/**
* Intercepts a single invocation. A typical implementation looks like:
* <pre>{@code
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* }</pre>
*
* @param invoker the invoker the call should be delegated to
* @param invocation the invocation being intercepted
* @return the invoke result
* @throws RpcException declared by the signature; presumably raised when the invocation fails
* @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

}

我们注意到 ReferenceConfig 和 ServiceConfig 的共同父类 AbstractInterfaceConfig 中定义了 filter 属性(下面第二个 getFilter() 调用了 super.getFilter(),是服务端子类中的重写):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Filter names configured for this interface, e.g. "xxx,yyy" (see setFilter below).
protected String filter;
// The parameter key is reference.filter.
@Parameter(key = Constants.REFERENCE_FILTER_KEY, append = true)
public String getFilter() {
return filter;
}

// @Parameter causes filter to be parsed and put into the URL under the key service.filter.
// NOTE(review): this method calls super.getFilter(), so it presumably comes from a
// service-side subclass rather than from the class that declares the field above.
@Parameter(key = Constants.SERVICE_FILTER_KEY, append = true)
public String getFilter() {
return super.getFilter();
}

// Value format, e.g. "xxx,yyy"; the value is passed to checkMultiExtension before being stored.
public void setFilter(String filter) {
checkMultiExtension(Filter.class, "filter", filter);
this.filter = filter;
}

自定义调用拦截使用方法

服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,请注意对性能的影响。

约定:

  • 用户自定义 filter 默认在内置 filter 之后。
  • 特殊值 default,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy",表示 xxx 在缺省 filter 之前,yyy 在缺省 filter 之后。
  • 特殊符号 -,表示剔除。比如:filter="-foo1",表示剔除缺省扩展点 foo1;filter="-default",表示剔除所有缺省扩展点。
  • provider 和 service 同时配置 filter 时,累加所有 filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/> 和 <dubbo:service filter="aaa,bbb" />,则 xxx,yyy,aaa,bbb 均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />

扩展接口

com.alibaba.dubbo.rpc.Filter

扩展配置

1
2
3
4
5
6
7
8
<!-- 消费方调用过程拦截 -->
<dubbo:reference filter="xxx,yyy" />
<!-- 消费方调用过程缺省拦截器,将拦截所有reference -->
<dubbo:consumer filter="xxx,yyy"/>
<!-- 提供方调用过程拦截 -->
<dubbo:service filter="xxx,yyy" />
<!-- 提供方调用过程缺省拦截器,将拦截所有service -->
<dubbo:provider filter="xxx,yyy"/>

已知扩展

  • com.alibaba.dubbo.rpc.filter.EchoFilter
  • com.alibaba.dubbo.rpc.filter.GenericFilter
  • com.alibaba.dubbo.rpc.filter.GenericImplFilter
  • com.alibaba.dubbo.rpc.filter.TokenFilter
  • com.alibaba.dubbo.rpc.filter.AccessLogFilter
  • com.alibaba.dubbo.rpc.filter.CountFilter
  • com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
  • com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
  • com.alibaba.dubbo.rpc.filter.ContextFilter
  • com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
  • com.alibaba.dubbo.rpc.filter.ExceptionFilter
  • com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
  • com.alibaba.dubbo.rpc.filter.DeprecatedFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

扩展示例

Maven 项目结构:

1
2
3
4
5
6
7
8
9
10
src
|-main
|-java
|-com
|-xxx
|-XxxFilter.java (实现Filter接口)
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.Filter (纯文本文件,内容为:xxx=com.xxx.XxxFilter)

XxxFilter.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.xxx;

import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;

/**
 * Sample {@link Filter} implementation showing where custom logic can be placed around
 * the delegated call.
 */
public class XxxFilter implements Filter {

    /**
     * Delegates the invocation to the given invoker and returns its result unchanged.
     *
     * @param invoker the invoker to delegate to
     * @param invocation the invocation being intercepted
     * @return the result produced by {@code invoker.invoke(invocation)}
     * @throws RpcException declared by the {@link Filter} contract
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Pre-processing hook: code placed here runs before the delegated call.
        final Result outcome = invoker.invoke(invocation);
        // Post-processing hook: code placed here runs after the delegated call.
        return outcome;
    }
}

META-INF/dubbo/com.alibaba.dubbo.rpc.Filter:

1
xxx=com.xxx.XxxFilter

具体是如何调用的呢?

ProtocolFilterWrapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * {@link Protocol} decorator that surrounds invokers with the {@link Filter}s activated for
 * their URL. Calls whose URL uses the registry protocol are delegated without any filters.
 */
public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    /**
     * @param protocol the protocol to delegate to
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /**
     * Builds the filter chain around {@code invoker}.
     *
     * <p>The filters are walked from last to first, each one wrapping the chain built so far,
     * so the first filter in the list ends up outermost and {@code invoker} innermost.
     *
     * @param invoker the invoker to protect with filters
     * @param key the URL parameter key naming the configured filters
     * @param group the activation group passed to the extension loader
     * @return the head of the chain, or {@code invoker} itself when no filter is active
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // Obtain every filter activated by the URL configuration.
        final List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
                .getActivateExtension(invoker.getUrl(), key, group);
        Invoker<T> head = invoker;
        int index = filters.size();
        while (--index >= 0) {
            head = link(invoker, filters.get(index), head);
        }
        return head;
    }

    /**
     * Creates one chain node: {@code invoke} goes through {@code filter} with {@code next} as
     * the downstream invoker, every other method is answered by {@code origin}.
     */
    private static <T> Invoker<T> link(final Invoker<T> origin, final Filter filter, final Invoker<T> next) {
        return new Invoker<T>() {

            public Class<T> getInterface() {
                return origin.getInterface();
            }

            public URL getUrl() {
                return origin.getUrl();
            }

            public boolean isAvailable() {
                return origin.isAvailable();
            }

            public Result invoke(Invocation invocation) throws RpcException {
                return filter.invoke(next, invocation);
            }

            public void destroy() {
                origin.destroy();
            }

            @Override
            public String toString() {
                return origin.toString();
            }
        };
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Service export: the invoker is wrapped with the filters read from the service filter
     * key (provider group) before being exported, unless its URL uses the registry protocol.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        final Invoker<T> target = registry
                ? invoker
                : buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
        return protocol.export(target);
    }

    /**
     * Service reference: the referred invoker is wrapped with the filters read from the
     * reference filter key (consumer group), unless the URL uses the registry protocol.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
        final Invoker<T> referred = protocol.refer(type, url);
        if (registry) {
            return referred;
        }
        return buildInvokerChain(referred, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

    public void destroy() {
        protocol.destroy();
    }

}

Listener解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Extension point (marked {@code @SPI}) that is notified when an invoker is referred and
 * when it is destroyed.
 */
@SPI
public interface InvokerListener {

/**
* Called when an invoker has been referred.
*
* @param invoker the referred invoker
* @throws RpcException declared by the signature for listener failures
* @see com.alibaba.dubbo.rpc.Protocol#refer(Class, URL)
*/
void referred(Invoker<?> invoker) throws RpcException;

/**
* Called when an invoker has been destroyed.
*
* @param invoker the destroyed invoker
* @see com.alibaba.dubbo.rpc.Invoker#destroy()
*/
void destroyed(Invoker<?> invoker);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Extension point (marked {@code @SPI}) that is notified when an exporter is exported and
 * when it is unexported.
 */
@SPI
public interface ExporterListener {

/**
* Called when an exporter has been exported.
*
* @param exporter the exported exporter
* @throws RpcException declared by the signature for listener failures
* @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
*/
void exported(Exporter<?> exporter) throws RpcException;

/**
* Called when an exporter has been unexported.
*
* @param exporter the unexported exporter
* @throws RpcException NOTE(review): not declared in the signature; presumably unchecked - confirm
* @see com.alibaba.dubbo.rpc.Exporter#unexport()
*/
void unexported(Exporter<?> exporter);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Listener names configured for this interface (stored as given to setListener).
protected String listener;
// The parameter key is invoker.listener.
// Note that the value is passed to checkMultiExtension here, in the getter, not in the setter.
@Parameter(key = Constants.INVOKER_LISTENER_KEY, append = true)
public String getListener() {
checkMultiExtension(InvokerListener.class, "listener", listener);
return listener;
}

// The parameter key is exporter.listener.
// NOTE(review): this method calls super.getListener(), so it presumably comes from a
// service-side subclass rather than from the class that declares the field above.
@Parameter(key = Constants.EXPORTER_LISTENER_KEY, append = true)
public String getListener() {
return super.getListener();
}

// Stores the value as-is; no check is performed at assignment time.
public void setListener(String listener) {
this.listener = listener;
}

引用监听扩展

扩展说明

当有服务引用时,触发该事件。

扩展接口

com.alibaba.dubbo.rpc.InvokerListener

扩展配置

1
2
3
4
<!-- 引用服务监听 -->
<dubbo:reference listener="xxx,yyy" />
<!-- 引用服务缺省监听器 -->
<dubbo:consumer listener="xxx,yyy" />

已知扩展

com.alibaba.dubbo.rpc.listener.DeprecatedInvokerListener

扩展示例

Maven 项目结构:

1
2
3
4
5
6
7
8
9
10
src
|-main
|-java
|-com
|-xxx
|-XxxInvokerListener.java (实现InvokerListener接口)
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.InvokerListener (纯文本文件,内容为:xxx=com.xxx.XxxInvokerListener)

XxxInvokerListener.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.xxx;

import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;

/**
 * Sample {@link InvokerListener} implementation; both callbacks are placeholders for
 * custom handling.
 */
public class XxxInvokerListener implements InvokerListener {

    /**
     * Called when an invoker has been referred.
     *
     * @param invoker the referred invoker
     * @throws RpcException declared to match {@code InvokerListener#referred}
     */
    public void referred(Invoker<?> invoker) throws RpcException {
        // ...
    }

    /**
     * Called when an invoker has been destroyed. No {@code throws} clause is declared, so
     * the signature matches {@code InvokerListener#destroyed}, which declares none either.
     *
     * @param invoker the destroyed invoker
     */
    public void destroyed(Invoker<?> invoker) {
        // ...
    }
}

META-INF/dubbo/com.alibaba.dubbo.rpc.InvokerListener:

1
xxx=com.xxx.XxxInvokerListener

暴露监听扩展

扩展说明

当有服务暴露时,触发该事件。

扩展接口

com.alibaba.dubbo.rpc.ExporterListener

扩展配置

1
2
3
4
<!-- 暴露服务监听 -->
<dubbo:service listener="xxx,yyy" />
<!-- 暴露服务缺省监听器 -->
<dubbo:provider listener="xxx,yyy" />

已知扩展

com.alibaba.dubbo.registry.directory.RegistryExporterListener

扩展示例

Maven 项目结构:

1
2
3
4
5
6
7
8
9
10
src
|-main
|-java
|-com
|-xxx
|-XxxExporterListener.java (实现ExporterListener接口)
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.ExporterListener (纯文本文件,内容为:xxx=com.xxx.XxxExporterListener)

XxxExporterListener.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.xxx;

import com.alibaba.dubbo.rpc.ExporterListener;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.RpcException;


/**
 * Sample {@link ExporterListener} implementation; both callbacks are placeholders for
 * custom handling.
 */
public class XxxExporterListener implements ExporterListener {

    /**
     * Called when an exporter has been exported.
     *
     * @param exporter the exported exporter
     * @throws RpcException declared to match {@code ExporterListener#exported}
     */
    public void exported(Exporter<?> exporter) throws RpcException {
        // ...
    }

    /**
     * Called when an exporter has been unexported. No {@code throws} clause is declared, so
     * the signature matches {@code ExporterListener#unexported}, which declares none either.
     *
     * @param exporter the unexported exporter
     */
    public void unexported(Exporter<?> exporter) {
        // ...
    }
}

META-INF/dubbo/com.alibaba.dubbo.rpc.ExporterListener:

1
xxx=com.xxx.XxxExporterListener

ProtocolListenerWrapper解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * {@link Protocol} decorator that attaches the listeners activated for a URL to the
 * exporters and invokers produced by the wrapped protocol. Calls whose URL uses the
 * registry protocol are delegated without any listeners.
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    /**
     * @param protocol the protocol to delegate to
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Export: the exporter returned by the wrapped protocol is paired with the
     * {@link ExporterListener}s activated under the exporter listener key.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        final Exporter<T> exported = protocol.export(invoker);
        if (registry) {
            return exported;
        }
        // The listeners are resolved only after the underlying export has succeeded.
        return new ListenerExporterWrapper<T>(
                exported,
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(ExporterListener.class)
                                .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

    /**
     * Refer: the invoker returned by the wrapped protocol is paired with the
     * {@link InvokerListener}s activated under the invoker listener key.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
        final Invoker<T> referred = protocol.refer(type, url);
        if (registry) {
            return referred;
        }
        // The listeners are resolved only after the underlying refer has succeeded.
        return new ListenerInvokerWrapper<T>(
                referred,
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

    public void destroy() {
        protocol.destroy();
    }

}

ListenerExporterWrapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * {@link Exporter} decorator that notifies a list of {@link ExporterListener}s when it is
 * created and again when it is unexported.
 *
 * <p>A {@code RuntimeException} thrown by a listener is logged and does not stop the
 * remaining listeners from being notified; the last such exception is rethrown afterwards.
 */
public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;

    private final List<ExporterListener> listeners;

    /**
     * Wraps {@code exporter} and immediately fires {@code exported} on every non-null listener.
     *
     * @param exporter the exporter to wrap
     * @param listeners the listeners to notify; may be {@code null} or empty
     * @throws IllegalArgumentException if {@code exporter} is {@code null}
     */
    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        fireExported();
    }

    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    /**
     * Unexports the wrapped exporter and then, even if that fails, fires {@code unexported}
     * on every non-null listener.
     */
    public void unexport() {
        try {
            exporter.unexport();
        } finally {
            fireUnexported();
        }
    }

    /** Notifies the listeners of the export; rethrows the last listener failure, if any. */
    private void fireExported() {
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        RuntimeException lastFailure = null;
        for (ExporterListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.exported(this);
            } catch (RuntimeException failure) {
                logger.error(failure.getMessage(), failure);
                lastFailure = failure;
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /** Notifies the listeners of the unexport; rethrows the last listener failure, if any. */
    private void fireUnexported() {
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        RuntimeException lastFailure = null;
        for (ExporterListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.unexported(this);
            } catch (RuntimeException failure) {
                logger.error(failure.getMessage(), failure);
                lastFailure = failure;
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

}

ListenerInvokerWrapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
 * {@link Invoker} decorator that notifies a list of {@link InvokerListener}s when it is
 * created and again when it is destroyed.
 *
 * <p>Anything thrown by a listener is logged and otherwise ignored. The listeners receive
 * the wrapped invoker, not this wrapper.
 */
public class ListenerInvokerWrapper<T> implements Invoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerInvokerWrapper.class);

    private final Invoker<T> invoker;

    private final List<InvokerListener> listeners;

    /**
     * Wraps {@code invoker} and immediately fires {@code referred} on every non-null listener.
     *
     * @param invoker the invoker to wrap
     * @param listeners the listeners to notify; may be {@code null} or empty
     * @throws IllegalArgumentException if {@code invoker} is {@code null}
     */
    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        fireReferred();
    }

    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    public URL getUrl() {
        return invoker.getUrl();
    }

    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public String toString() {
        final Class<T> type = getInterface();
        final String target = getUrl() == null ? " " : getUrl().toString();
        return type + " -> " + target;
    }

    /**
     * Destroys the wrapped invoker and then, even if that fails, fires {@code destroyed}
     * on every non-null listener.
     */
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            fireDestroyed();
        }
    }

    /** Notifies the listeners that the wrapped invoker was referred; failures are only logged. */
    private void fireReferred() {
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        for (InvokerListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.referred(invoker);
            } catch (Throwable failure) {
                logger.error(failure.getMessage(), failure);
            }
        }
    }

    /** Notifies the listeners that the wrapped invoker was destroyed; failures are only logged. */
    private void fireDestroyed() {
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        for (InvokerListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.destroyed(invoker);
            } catch (Throwable failure) {
                logger.error(failure.getMessage(), failure);
            }
        }
    }

}