Dubbo源码学习--服务发布(DubboProtocol、Exporter)

Dubbo服务发布的整体流程一文中,只是分析了服务发布的整体流程,具体的细节还没有进一步分析。本节将继续分析服务暴露的过程。在ServiceConfig中通过一句话即可暴露服务,如下:

1
Exporter<?> exporter = protocol.export(invoker);

此时Invoker对象携带的URL信息中定义的是”registry”,则此处”protocol”加载的是RegistryProtocol对象。也即调用RegistryProtocol的export方法处理Invoker。

RegistryProtocol实现了Protocol接口。Protocol接口是一个顶级接口,定义内容比较简单,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
@SPI("dubbo")
public interface Protocol {
/**
* 获取缺省端口,当用户没有配置端口时使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}

通过中文注释,很容易理解接口中定义的四个方法的功能。通过SPI注解,设置默认的实现类为DubboProtocol。实现类RegistryProtocol中方法比较多,但大多数都是private方法,public方法主要是一些setter和getter方法,还有就是Protocol接口中定义的方法实现。

在这里主要看下RegistryProtocol中export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}

export方法的入参是Invoker实例,并且定义了final类型,表明在方法内部,Invoker实例不能被再修改。方法中首先对传入的Invoker进行暴露,具体方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}

doLocalExport方法中先检查服务是否已经暴露过,如果已经暴露则不再重复暴露,如果没有暴露,则执行暴露过程。

1
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

执行暴露的protocol是DubboProtocol实例。和RegistryProtocol一样,DubboProtocol也实现了Protocol接口,大多数方法也是private方法。不一样的是RegistryProtocol直接实现Protocol接口,而DubboProtocol是继承AbstractProtocol抽象类,AbstractProtocol实现Protocol接口。如下所示:

1
2
3
4
5
public class RegistryProtocol implements Protocol{}
public class DubboProtocol extends AbstractProtocol{}
public abstract class AbstractProtocol implements Protocol{}

直接来看DubboProtocol中export方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}

在发布服务之前,先根据要发布的服务产生与服务相对应的key值。key是一个字符串类型,格式为“group/ServiceName:version:port”。具体生成方法在ProtocolUtils工具类中定义了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
StringBuilder buf = new StringBuilder();
if (serviceGroup != null && serviceGroup.length() > 0) {
buf.append(serviceGroup);
buf.append("/");
}
buf.append(serviceName);
if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
buf.append(":");
buf.append(serviceVersion);
}
buf.append(":");
buf.append(port);
return buf.toString();
}

生成key值之后,结合Invoker和exportMap生成服务暴露,然后将生成的服务暴露作为value值放入map中,从而实现服务发布。服务暴露是用DubboExporter封装的,DubboExporter类比较简单,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.alibaba.dubbo.rpc.protocol.dubbo;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.protocol.AbstractExporter;
import java.util.Map;
public class DubboExporter<T> extends AbstractExporter<T> {
private final String key;
private final Map<String, Exporter<?>> exporterMap;
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}

DubboExporter引入了泛型,继承了AbstractExporter抽象类,AbstractExporter又实现了Exporter接口。抽象类AbstractExporter和接口Exporter都比较简单,这里就不再叙述,请自行查阅。

DubboProtocol的export方法中需要重点看下exporterMap属性。exporterMap是在AbstractProtocol抽象类中定义的,如下:

1
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

exporterMap是Map类型,精确的说,是ConcurrentHashMap类型。ConcurrentHashMap是一个高并发的HashMap容器。发布的服务就存储在这里,通过key值可以得到服务发布DubboExporter,再根据DubboExporter的getInvoker方法得到服务调用,从而调用服务。具体的服务调用细节将在后续文章中分析,敬请期待。

以上就是Dubbo发布服务的整个过程,过程中各种封装、调用、继承、多态应用得淋漓尽致。大体的流程是:i. 应用Spring Schema机制读入provider配置文件信息;ii. 将读入后的信息组装成URL对象;iii. 根据组装后的URL对象创建服务调用Invoker;iv. 根据服务调用Invoker创建服务发布Exporter及相应key值;v. 将key值和Exporter装入ConcurrentHashMap中,实现发布服务功能。在这个过程中,Dubbo自定义的URL对象发挥着重要作用,将provider配置文件、Invoker、Exporter串联了起来。

分析完了Dubbo发布服务的过程,下文将继续分析Dubbo服务的注册过程