motan源碼分析一:服務發佈及註冊

motan源碼分析一:服務發佈及註冊

motan是新浪微博開源的服務治理框架,具體介紹請看:http://tech.sina.com.cn/i/2016-05-10/doc-ifxryhhh1869879.shtml.

本系列的文章將分析它的底層源碼,分析的源碼版本為:0.1.2。第一篇文章將以服務的發佈和註冊開始,註冊服務使用zookeeper來分析。源碼地址:https://github.com/weibocom/motan

本文涉及到的主要類和接口:MotanApiExportDemo、MotanDemoService、MotanDemoServiceImpl、ServiceConfig、RegistryConfig、ProtocolConfig、DefaultProvider、ZookeeperRegistryFactory、ZookeeperRegistry、SimpleConfigHandler、ProtocolFilterDecorator等。

1.首先來看demo源碼:MotanApiExportDemo

demo中先後創建了ServiceConfig、RegistryConfig和ProtocolConfig相關的對象,其中ServiceConfig是我們提供服務的相關配置(每個服務一個配置,例如一個服務接口一個配置,本文中的具體服務是:MotanDemoServiceImpl)、RegistryConfig是註冊中心相關的配置信息、ProtocolConfig是應用協議相關的配置(在客戶端還負責集群相關的配置)。


ServiceConfig<motandemoservice> motanDemoService = new ServiceConfig<motandemoservice>();
// 設置接口及實現類
motanDemoService.setInterface(MotanDemoService.class);//設置服務接口,客戶端在rpc調用時,會在協議中傳遞接口名稱,從而實現與具體實現類一一對應
motanDemoService.setRef(new MotanDemoServiceImpl());//設置接口實現類,實際的業務代碼

// 配置服務的group以及版本號
motanDemoService.setGroup("motan-demo-rpc");//服務所屬的組

motanDemoService.setVersion("1.0");

// 配置註冊中心直連調用
RegistryConfig registry = new RegistryConfig();
//use local registry
registry.setRegProtocol("local");
// use ZooKeeper registry
// registry.setRegProtocol("zookeeper");
// registry.setAddress("127.0.0.1:2181");
// registry.setCheck("false"); //是否檢查是否註冊成功
motanDemoService.setRegistry(registry);

// 配置RPC協議
ProtocolConfig protocol = new ProtocolConfig();
protocol.setId("motan");//使用motan應用協議
protocol.setName("motan");
motanDemoService.setProtocol(protocol);
motanDemoService.setExport("motan:8002");//本服務的監控端口號是8002
motanDemoService.export();//發佈及在zookeeper上註冊此服務
MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true);
System.out.println("server start...");
/<motandemoservice>/<motandemoservice>

2.從上面的代碼可知ServiceConfig類是服務的發佈及註冊的核心是motanDemoService.export()方法,我們來看一下此方法的實現細節:

public synchronized void export() {
if (exported.get()) {
LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
return;
}
checkInterfaceAndMethods(interfaceClass, methods);
List registryUrls = loadRegistryUrls();//加載註冊中心的url,支持多個註冊中心
if (registryUrls == null || registryUrls.size() == 0) {
throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
}
Map<string> protocolPorts = getProtocolAndPort();
for (ProtocolConfig protocolConfig : protocols) {

Integer port = protocolPorts.get(protocolConfig.getId());
if (port == null) {
throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(), protocolConfig.getId()));
}
doExport(protocolConfig, port, registryUrls);//發佈服務
}
afterExport();
}
/<string>

方法中調用了doExport和afterExport方法:

private void doExport(ProtocolConfig protocolConfig, int port, List registryURLs) {
String protocolName = protocolConfig.getName();//獲取協議名稱,此處為motan
if (protocolName == null || protocolName.length() == 0) {
protocolName = URLParamType.protocol.getValue();
}
String hostAddress = host;//本機地址
if (StringUtils.isBlank(hostAddress) && basicService != null) {
hostAddress = basicService.getHost();
}
if (NetUtils.isInvalidLocalHost(hostAddress)) {
hostAddress = getLocalHostAddress(registryURLs);
}
Map<string> map = new HashMap<string>();
map.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_SERVICE);
map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
collectConfigParams(map, protocolConfig, basicService, extConfig, this);
collectMethodConfigParams(map, this.getMethods());
URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);//組裝serviceUrl信息
if (serviceExists(serviceUrl)) {//判斷服務之前是否已經加載過
LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", interfaceClass.getName(),
serviceUrl.getIdentity()));
throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ",
interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//拋出同名服務異常
}
List urls = new ArrayList();
// injvm 協議只支持註冊到本地,其他協議可以註冊到local、remote
if (MotanConstants.PROTOCOL_INJVM.equals(protocolConfig.getId())) {

URL localRegistryUrl = null;
for (URL ru : registryURLs) {
if (MotanConstants.REGISTRY_PROTOCOL_LOCAL.equals(ru.getProtocol())) {
localRegistryUrl = ru.createCopy();
break;
}
}
if (localRegistryUrl == null) {
localRegistryUrl =
new URL(MotanConstants.REGISTRY_PROTOCOL_LOCAL, hostAddress, MotanConstants.DEFAULT_INT_VALUE,
RegistryService.class.getName());
}
urls.add(localRegistryUrl);
} else {
for (URL ru : registryURLs) {
urls.add(ru.createCopy());
}
}
for (URL u : urls) {
u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
registereUrls.add(u.createCopy());
}
//使用spi機制加載SimpleConfigHandler
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
//調用SimpleConfigHandler的export方法
exporters.add(configHandler.export(interfaceClass, ref, urls));
}


private void afterExport() {
exported.set(true);
for (Exporter ep : exporters) {
existingServices.add(ep.getProvider().getUrl().getIdentity());
}
}

/<string>/<string>

再來看一下SimpleConfigHandler的export方法

@Override
public Exporter export(Class
interfaceClass, T ref, List registryUrls) {
String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
URL serviceUrl = URL.valueOf(serviceStr);
// export service
// 利用protocol decorator來增加filter特性
String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
Protocol orgProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName);//對於Protocol對象增強filter
Provider provider = getProvider(orgProtocol, ref, serviceUrl, interfaceClass);//服務的代理提供者,包裝ref的服務
Protocol protocol = new ProtocolFilterDecorator(orgProtocol);
Exporter exporter = protocol.export(provider, serviceUrl);//發佈服務,將代理對象provider與具體的serviceUrl關聯
// register service
register(registryUrls, serviceUrl);
return exporter;
}

3.下面我們來看一下,motan如何對filter進行相應的增強處理

/**
*
* Decorate the protocol, to add more features.
*
* @author fishermen
* @version V1.0 created at: 2013-5-30
*/
public class ProtocolFilterDecorator implements Protocol { //實現Protocol的接口,聯繫到上文中使用此類對實際的Protocol進行包裝
private Protocol protocol;
public ProtocolFilterDecorator(Protocol protocol) {
if (protocol == null) {
throw new MotanFrameworkException("Protocol is null when construct ProtocolFilterDecorator",
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);

}
this.protocol = protocol;//給實際的Protocol進行賦值
}
@Override
public Exporter export(Provider provider, URL url) {
return protocol.export(decorateWithFilter(provider, url), url);//發佈服務時,調用filter增強處理方法
}
private Provider decorateWithFilter(final Provider provider, URL url) {
List<filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE);//獲取實際需要增強的filter
if (filters == null || filters.size() == 0) {
return provider;
}
Provider lastProvider = provider;
for (Filter filter : filters) { //對於代理對象provider進行包裝,包裝成一個provider鏈,返回最後一個provider
final Filter f = filter;
if (f instanceof InitializableFilter) {
((InitializableFilter) f).init(lastProvider);
}
final Provider lp = lastProvider;
lastProvider = new Provider() {
@Override
public Response call(Request request) {
return f.filter(lp, request);//對於後面調用的call方法時,首先調用最外層的filter,最後再調用實際的provider的call方法
}
@Override
public String desc() {
return lp.desc();
}
@Override
public void destroy() {
lp.destroy();
}
@Override

public Class getInterface() {
return lp.getInterface();
}
@Override
public Method lookupMethod(String methodName, String methodDesc) {
return lp.lookupMethod(methodName, methodDesc);
}
@Override
public URL getUrl() {
return lp.getUrl();
}
@Override
public void init() {
lp.init();
}
@Override
public boolean isAvailable() {
return lp.isAvailable();
}
@Override
public T getImpl() {
return provider.getImpl();
}
};
}
return lastProvider;
}
/**
*

* 獲取方式:
* 1)先獲取默認的filter列表;
* 2)根據filter配置獲取新的filters,並和默認的filter列表合併;
* 3)再根據一些其他配置判斷是否需要增加其他filter,如根據accessLog進行判斷,是否需要增加accesslog
*

*
* @param url
* @param key
* @return
*/
private List<filter> getFilters(URL url, String key) {

// load default filters
List<filter> filters = new ArrayList<filter>();
List<filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);//使用spi機制初始化filer對象
if (defaultFilters != null && defaultFilters.size() > 0) {
filters.addAll(defaultFilters);
}
// add filters via "filter" config
String filterStr = url.getParameter(URLParamType.filter.getName());
if (StringUtils.isNotBlank(filterStr)) {
String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
for (String fn : filterNames) {
addIfAbsent(filters, fn);
}
}
// add filter via other configs, like accessLog and so on
boolean accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue());
if (accessLog) {
addIfAbsent(filters, AccessLogFilter.class.getAnnotation(SpiMeta.class).name());
}
// sort the filters
Collections.sort(filters, new ActivationComparator<filter>());
Collections.reverse(filters);
return filters;
}
}
/<filter>/<filter>/<filter>/<filter>/<filter>
/<filter>

4.服務發佈完成後,需要像註冊中心註冊此服務

private void register(List registryUrls, URL serviceUrl) {
for (URL url : registryUrls) { //循環遍歷多個註冊中心的信息
// 根據check參數的設置,register失敗可能會拋異常,上層應該知曉

RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());//文中使用的是zookeeper
if (registryFactory == null) {
throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
"register error! Could not find extension for registry protocol:" + url.getProtocol()
+ ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
}
Registry registry = registryFactory.getRegistry(url);//獲取registry
registry.register(serviceUrl);//將服務註冊到zookeeper,也就是把節點信息寫入到zookeeper中
}
}

我們來看一下zookeeper註冊中心的工廠類:每個Registry都需要獨立維護一個ZkClient與zookeeper的鏈接

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.registry.Registry;
import com.weibo.api.motan.registry.support.AbstractRegistryFactory;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.LoggerUtil;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;

@SpiMeta(name = "zookeeper")
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
@Override
protected Registry createRegistry(URL registryUrl) {
try {
int timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
int sessionTimeout =
registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
URLParamType.registrySessionTimeout.getIntValue());
ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout, new StringSerializer());//創建zookeeper的客戶端
return new ZookeeperRegistry(registryUrl, zkClient);//創建實際的Registry
} catch (ZkException e) {
LoggerUtil.error("[ZookeeperRegistry] fail to connect zookeeper, cause: " + e.getMessage());
throw e;
}
}
}

我們再來分析ZookeeperRegistry中的代碼

 public ZookeeperRegistry(URL url, ZkClient client) {
super(url);
this.zkClient = client;
IZkStateListener zkStateListener = new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
// do nothing
}
@Override
public void handleNewSession() throws Exception { //響應zkClient的事件
LoggerUtil.info("zkRegistry get new session notify.");
reconnectService();//重新註冊服務
reconnectClient();
}
};
zkClient.subscribeStateChanges(zkStateListener);
ShutDownHook.registerShutdownHook(this);
}

private void reconnectService() {
Collection allRegisteredServices = getRegisteredServiceUrls();
if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
try {
serverLock.lock();
for (URL url : getRegisteredServiceUrls()) {
doRegister(url);//註冊
}
LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);
for (URL url : availableServices) {
if (!getRegisteredServiceUrls().contains(url)) {
LoggerUtil.warn("reconnect url not register. url:{}", url);
continue;
}
doAvailable(url);//標識服務可以提供服務
}
LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
} finally {
serverLock.unlock();
}
}
}

@Override
protected void doRegister(URL url) {
try {
serverLock.lock();

// 防止舊節點未正常註銷
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
} finally {
serverLock.unlock();
}
}

@Override
protected void doUnregister(URL url) {
try {
serverLock.lock();
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to unregister %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
} finally {
serverLock.unlock();
}
}

@Override
protected void doAvailable(URL url) {
try{
serverLock.lock();
if (url == null) {
availableServices.addAll(getRegisteredServiceUrls());
for (URL u : getRegisteredServiceUrls()) {
removeNode(u, ZkNodeType.AVAILABLE_SERVER);
removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
createNode(u, ZkNodeType.AVAILABLE_SERVER);
}
} else {
availableServices.add(url);
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.AVAILABLE_SERVER);
}
} finally {
serverLock.unlock();
}
}

@Override
protected void doUnavailable(URL url) {
try{

serverLock.lock();
if (url == null) {
availableServices.removeAll(getRegisteredServiceUrls());
for (URL u : getRegisteredServiceUrls()) {
removeNode(u, ZkNodeType.AVAILABLE_SERVER);
removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
createNode(u, ZkNodeType.UNAVAILABLE_SERVER);
}
} else {
availableServices.remove(url);
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
}
} finally {
serverLock.unlock();
}
}

private void createNode(URL url, ZkNodeType nodeType) {
String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
if (!zkClient.exists(nodeTypePath)) {
zkClient.createPersistent(nodeTypePath, true);//對於服務的標識信息,創建持久化節點
}
//對於服務的ip和端口號信息使用臨時節點,當服務斷了後,zookeeper自動摘除目標服務器
zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());
}

private void removeNode(URL url, ZkNodeType nodeType) {
String nodePath = ZkUtils.toNodePath(url, nodeType);
if (zkClient.exists(nodePath)) {
zkClient.delete(nodePath);
}
}

本文分析了motan的服務發佈及註冊到zookeeper的流程相關的源碼,主要涉及到的知識點:

1.利用相關的配置對象進行信息的存儲及傳遞;

2.利用provider對具體的業務類進行封裝代理;

3.利用filter鏈的結構,來包裝實際的provider,把所有的過濾器都處理完畢後,最後調用實際的業務類,大家可以想象一下aop相關的原理,有些類似;

4.代碼中大量使用jdk的標準spi技術進行類的加載;

5.支持多個註冊中心,也就是同一個服務可以註冊到不同的註冊中心上,每個registry對應一個具體的zkclient;

6.利用了zookeeper的臨時節點來維護服務器的host和port信息;

7.支持多個服務發佈到同一個端口,在本文中並沒分析netty使用相關的代碼,後面會分析到。


分享到:


相關文章: