03.08 Spring cloud Ribbon 客戶端負載均衡詳解(二)負載均衡器

通過上一篇 分析,我們已經對Spring Cloud如何使用Ribbon有了基本的瞭解。雖然Spring Cloud中定義了LoadBalancerClient作為負載均衡的通用接口,並且針對Ribbon實現了RibbonLoadBalancerClient,但是他在具體實現客戶端負載均衡時,是通過Ribbon的ILoadBalancer接口實現的。下面我們根據ILoadBalancer接口的實現做個看看它是如何實現客戶端負載均衡的。


AbstractLoadBalancer


Spring cloud Ribbon 客戶端負載均衡詳解(二)負載均衡器

AbstractLoadBalancer

AbstractLoadBalancer 是ILoadBalancer接口的抽象實現,在該抽象類中定義一個關於微服務實例的枚舉類ServerGroup,它包含三種不同類型。

1、ALL:所有服務實例。

2、STATUS_UP:正常服務的實例

3、STATUS_NOT_UP:停止服務的實例

另外還實現了chooseServer()方法,該方法通過調用接口中的chooseServer(Object key) 實現,其中參數為null,表示在選擇具體服務實例時忽略key的條件判斷。

最後還定義了兩個抽象函數,getServerList(ServerGroup serverGroup)定義了根據分組類型來獲取不同的服務實例列表,getLoadBalancerStats()定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負載均衡器中各個服務實例當前的屬性和統計信息,這些信息非常有用,我們可以利用這些信息來觀察負載均衡器的運行情況,同時這些信息也是用來制定負載均衡策略的重要依據。

<code>package com.netflix.loadbalancer;

import java.util.List;

public abstract class AbstractLoadBalancer implements ILoadBalancer {
public AbstractLoadBalancer() {
}

public Server chooseServer() {
return this.chooseServer((Object)null);

}
\t\t// 定義了根據分組類型來獲取不同的服務實例列表
public abstract List<server> getServerList(AbstractLoadBalancer.ServerGroup var1);
//getLoadBalancerStats()定義了獲取LoadBalancerStats對象的方法,
//LoadBalancerStats對象被用來存儲負載均衡器中各個服務實例當前的屬性和統計信息,這些信息非常有用,
//我們可以利用這些信息來觀察負載均衡器的運行情況,同時這些信息也是用來制定負載均衡策略的重要依據。
public abstract LoadBalancerStats getLoadBalancerStats();

public static enum ServerGroup {
ALL,
STATUS_UP,
STATUS_NOT_UP;

private ServerGroup() {
}
}
}/<server>/<code>

BaseLoadBalancer

BaseLoadBalancer類是Ribbon負載均衡器的基礎實現類,在該類中定義很多關於均衡負載器相關的基礎內容:

  • 定義並維護了兩個存儲服務實例Server對象的列表。一個用於存儲所有服務實例的清單,一個用於存儲正常服務的實例清單。
<code>@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<server> allServerList = Collections
.synchronizedList(new ArrayList<server>()); // 所有服務實例

@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<server> upServerList = Collections
.synchronizedList(new ArrayList<server>()); // 正常服務實例/<server>/<server>/<server>/<server>/<code>
  • 定義了之前我們提到的用來存儲負載均衡器各服務實例屬性和統計信息的LoadBalancerStats對象。
  • 定義了檢查服務實例是否正常服務的IPing對象,在BaseLoadBalancer中默認為null,需要在構造時注入它的具體實現。
  • 定義了檢查服務實例操作的執行策略對象IPingStrategy,在BaseLoadBalancer中默認使用了該類中定義的靜態內部類SerialPingStrategy實現。根據源碼,我們可以看到該策略採用線性遍歷ping服務實例的方式實現檢查。該策略在當我們實現的IPing速度不理想,或是Server列表過大時,可能變的不是很為理想,這時候我們需要通過實現IPingStrategy接口並實現pingServers(IPing ping, Server[] servers)函數去擴展ping的執行策略。
  • <code> private static class SerialPingStrategy implements IPingStrategy {

    @Override public boolean[] pingServers(IPing ping, Server[] servers) {
    int numCandidates = servers.length;
    boolean[] results = new boolean[numCandidates];

    logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);

    for (int i = 0; i < numCandidates; i++) {
    results[i] = false;
    try {
    if (ping != null) {
    \t\tresults[i] = ping.isAlive(servers[i]);
    \t}
    }
    } catch (Exception e) {
    logger.error("Exception while pinging Server: '{}'", servers[i], e);
    }
    }

    return results;
    }
    }
    }/<code>
    • 定義了負載均衡的處理規則IRule對象,從BaseLoadBalancer中chooseServer(Object key)的實現源碼,我們可以知道負載均衡器實際進行服務實例選擇任務是委託給了IRule實例中的choose函數來實現。而在這裡,默認初始化了RoundRobinRule為IRule的實現對象。RoundRobinRule實現了最基本且常用的線性負載均衡規則。
    <code>public Server chooseServer(Object key) {
    if (counter == null) {
    counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
    return null;
    } else {
    try {
    return rule.choose(key);
    } catch (Throwable t) {
    return null;
    }
    }
    }/<code>
  • 啟動ping任務:在BaseLoadBalancer的默認構造函數中,會直接啟動一個用於定時檢查Server是否健康的任務。該任務默認的執行間隔為:10秒。
  • 實現了ILoadBalancer接口定義的負載均衡器應具備的一系列基本操作:
    • addServers(List newServers):向負載均衡器中增加新的服務實例列表,該實現將原本已經維護著的所有服務實例清單allServerList和新傳入的服務實例清單newServers都加入到newList中,然後通過調用setServersList函數對newList進行處理,在BaseLoadBalancer中實現的時候會使用新的列表覆蓋舊的列表。而之後介紹的幾個擴展實現類對於服務實例清單更新的優化都是對setServersList函數的重寫來實現的。
    <code> public void addServer(Server newServer) {
    if (newServer != null) {
    try {
    ArrayList<server> newList = new ArrayList();
    newList.addAll(this.allServerList);
    newList.add(newServer);
    this.setServersList(newList);
    } catch (Exception var3) {
    logger.error("LoadBalancer [{}]: Error adding newServer {}", new Object[]{this.name, newServer.getHost(), var3});
    }
    }

    }/<server>/<code>
    • chooseServer(Object key):挑選一個具體的服務實例,在上面介紹IRule的時候,已經做了說明,這裡不再贅述。
    <code>public Server chooseServer(Object key) {
    if (this.counter == null) {
    this.counter = this.createCounter();
    }

    this.counter.increment();
    if (this.rule == null) {
    return null;
    } else {
    try {
    return this.rule.choose(key);
    } catch (Exception var3) {
    logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
    return null;
    }
    }
    }/<code>
    • markServerDown(Server server):標記某個服務實例暫停服務。
    <code> public void markServerDown(Server server) {
    if (server != null && server.isAlive()) {
    logger.error("LoadBalancer [{}]: markServerDown called on [{}]", this.name, server.getId());

    server.setAlive(false);
    this.notifyServerStatusChangeListener(Collections.singleton(server));
    }
    }/<code>
    • getReachableServers():獲取可用的服務實例列表。由於BaseLoadBalancer中單獨維護了一個正常服務的實例清單,所以直接返回即可。
    <code>  public List<server> getReachableServers() {
    return Collections.unmodifiableList(this.upServerList);
    }
    /<server>/<code>
    • getAllServers():獲取所有的服務實例列表。由於BaseLoadBalancer中單獨維護了一個所有服務的實例清單,所以也直接返回它即可。
    <code>public List<server> getAllServers() {
    return Collections.unmodifiableList(this.allServerList);
    }/<server>/<code>


    DynamicServerListLoadBalancer

    DynamicServerListLoadBalancer類繼承於BaseLoadBalancer類,它是對基礎負載均衡器的擴展。在該負載均衡器中,實現了服務實例清單的在運行期的動態更新能力;同時,它還具備了對服務實例清單的過濾功能,也就是說我們可以通過過濾器來選擇性的獲取一批服務實例清單。下面我們具體來看看在該類中增加了一些什麼內容:

    ServerList:

    從DynamicServerListLoadBalancer的成員定義中,我們馬上可以發現新增了一個關於服務列表的操作對象:ServerList serverListImpl。其中泛型T從類名中對於T的限定DynamicServerListLoadBalancer可以獲知它是一個Server的子類,即代表了一個具體的服務實例的擴展類。而ServerList接口定義如下所示:

    <code>public interface ServerList {
    \t\t// 獲取初始化的服務清單
    public List getInitialListOfServers();

    /**
    * Return updated list of servers. This is called say every 30 secs
    * (configurable) by the Loadbalancer's Ping cycle
    * 獲取更新的服務實例清單
    */
    public List getUpdatedListOfServers();

    }
    /<code>

    它定義了兩個抽象方法:getInitialListOfServers用於獲取初始化的服務實例清單,而getUpdatedListOfServers用於獲取更新的服務實例清單。那麼該接口的實現有哪些呢?通過搜索源碼,我們可以整出如下圖的結構:

    Spring cloud Ribbon 客戶端負載均衡詳解(二)負載均衡器

    從圖中我們可以看到有很多個ServerList的實現類,那麼在DynamicServerListLoadBalancer中的ServerList默認配置到底使用了哪個具體實現呢?既然在該負載均衡器中需要實現服務實例的動態更新,那麼勢必需要ribbon具備訪問eureka來獲取服務實例的能力,所以我們從Spring Cloud整合ribbon與eureka的包org.springframework.cloud.netflix.ribbon.eureka下探索,可以找到配置類EurekaRibbonClientConfiguration,在該類中可以找到看到下面創建ServerList實例的內容:

    <code>@Bean
    @ConditionalOnMissingBean
    public ServerList> ribbonServerList(IClientConfig config) {
    DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
    config);
    DomainExtractingServerList serverList = new DomainExtractingServerList(
    discoveryServerList, config, this.approximateZoneFromHostname);
    return serverList;
    }/<code>

    可以看到,這裡創建的是一個DomainExtractingServerList實例,從下面它的源碼中我們可以看到在它內部還定義了一個ServerList list。同時,DomainExtractingServerList類中對getInitialListOfServers和getUpdatedListOfServers的具體實現,其實委託給了內部定義的ServerList list對象,而該對象是通過創建DomainExtractingServerList時候,由構造函數傳入的DiscoveryEnabledNIWSServerList實現的。

    <code>public class DomainExtractingServerList implements ServerList<discoveryenabledserver> {

    \tprivate ServerList<discoveryenabledserver> list;
    \tprivate final RibbonProperties ribbon;

    \tprivate boolean approximateZoneFromHostname;

    \tpublic DomainExtractingServerList(ServerList<discoveryenabledserver> list,
    \t\t\tIClientConfig clientConfig, boolean approximateZoneFromHostname) {
    \t\tthis.list = list;
    \t\tthis.ribbon = RibbonProperties.from(clientConfig);
    \t\tthis.approximateZoneFromHostname = approximateZoneFromHostname;
    \t}


    \t@Override
    \tpublic List<discoveryenabledserver> getInitialListOfServers() {
    \t\tList<discoveryenabledserver> servers = setZones(this.list
    \t\t\t\t.getInitialListOfServers());
    \t\treturn servers;
    \t}

    \t@Override
    \tpublic List<discoveryenabledserver> getUpdatedListOfServers() {
    \t\tList<discoveryenabledserver> servers = setZones(this.list
    \t\t\t\t.getUpdatedListOfServers());
    \t\treturn servers;
    \t}

    \tprivate List<discoveryenabledserver> setZones(List<discoveryenabledserver> servers) {
    \t\tList<discoveryenabledserver> result = new ArrayList<>();
    \t\tboolean isSecure = this.ribbon.isSecure(true);
    \t\tboolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
    \t\tfor (DiscoveryEnabledServer server : servers) {
    \t\t\tresult.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
    \t\t\t\t\tthis.approximateZoneFromHostname));
    \t\t}
    \t\treturn result;
    \t}

    }/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>

    那麼DiscoveryEnabledNIWSServerList是如何實現這兩個服務實例的獲取的呢?我們從源碼中可以看到這兩個方法都是通過該類中的一個私有函數obtainServersViaDiscovery來通過服務發現機制來實現服務實例的獲取。

    <code>
    @Override
    public List<discoveryenabledserver> getInitialListOfServers(){
    return obtainServersViaDiscovery();
    }

    @Override
    public List<discoveryenabledserver> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
    }/<discoveryenabledserver>/<discoveryenabledserver>/<code>

    而obtainServersViaDiscovery的實現邏輯如下,主要依靠EurekaClient從服務註冊中心中獲取到具體的服務實例InstanceInfo列表(EurekaClient的具體實現,我們在分析Eureka的源碼時已經做了詳細的介紹,這裡傳入的vipAddress可以理解為邏輯上的服務名,比如“USER-SERVICE”)。接著,對這些服務實例進行遍歷,將狀態為“UP”(正常服務)的實例轉換成DiscoveryEnabledServer對象,最後將這些實例組織成列表返回。

    <code>private List<discoveryenabledserver> obtainServersViaDiscovery() {
    List<discoveryenabledserver> serverList = new ArrayList<discoveryenabledserver>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
    logger.warn("EurekaClient has not been initialized yet, returning an empty list");
    return new ArrayList<discoveryenabledserver>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
    for (String vipAddress : vipAddresses.split(",")) {
    List<instanceinfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(
    vipAddress, isSecure, targetRegion);
    for (InstanceInfo ii : listOfInstanceInfo) {
    if (ii.getStatus().equals(InstanceStatus.UP)) {
    // 省略了一些實例信息的加工邏輯
    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
    des.setZone(DiscoveryClient.getZone(ii));
    serverList.add(des);
    }
    }
    if (serverList.size()>0 && prioritizeVipAddressBasedServers){
    break;
    }
    }
    }
    return serverList;
    }/<instanceinfo>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>

    在DiscoveryEnabledNIWSServerList中通過EurekaClient從服務註冊中心獲取到最新的服務實例清單後,返回的List到了DomainExtractingServerList類中,將繼續通過setZones函數進行處理,而這裡的處理具體內容如下,主要完成將DiscoveryEnabledNIWSServerList返回的List列表中的元素,轉換成內部定義的DiscoveryEnabledServer的子類對象DomainExtractingServer,在該對象的構造函數中將為服務實例對象設置一些必要的屬性,比如id、zone、isAliveFlag、readyToServe等信息。

    <code>private List<discoveryenabledserver> setZones(List<discoveryenabledserver> servers) {
    List<discoveryenabledserver> result = new ArrayList<>();
    boolean isSecure = this.clientConfig.getPropertyAsBoolean(
    CommonClientConfigKey.IsSecure, Boolean.TRUE);
    boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(

    CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
    for (DiscoveryEnabledServer server : servers) {
    result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
    this.approximateZoneFromHostname));
    }
    return result;
    }/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>

    ServerListUpdater

    通過上面的分析我們已經知道了Ribbon與Eureka整合後,如何實現從Eureka Server中獲取服務實例清單。那麼它又是如何觸發向Eureka Server去獲取服務實例清單以及如何在獲取到服務實例清單後更新本地的服務實例清單的呢?繼續來看DynamicServerListLoadBalancer中的實現內容,我們可以很容易的找到下面定義的關於ServerListUpdater的內容:

    <code>protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
    updateListOfServers();
    }
    };

    protected volatile ServerListUpdater serverListUpdater;/<code>

    根據該接口的命名,我們基本就能猜到,這個對象實現的是對ServerList的更新,所以可以稱它為“服務更新器”,從下面的源碼中可以看到,在ServerListUpdater內部還定義了一個UpdateAction接口,上面定義的updateAction對象就是以匿名內部類的方式創建了一個它的具體實現,其中doUpdate實現的內容就是對ServerList的具體更新操作。除此之外,“服務更新器”中還定義了一系列控制它和獲取它一些信息的操作。

    <code>public interface ServerListUpdater { 


    public interface UpdateAction {
    void doUpdate();
    }

    // 啟動服務更新器,傳入的UpdateAction對象為更新操作的具體實現。
    void start(UpdateAction updateAction);

    // 停止服務更新器
    void stop();

    // 獲取最近的更新時間戳
    String getLastUpdate();

    // 獲取上一次更新到現在的時間間隔,單位為毫秒
    long getDurationSinceLastUpdateMs();

    // 獲取錯過的更新週期數
    int getNumberMissedCycles();

    // 獲取核心線程數
    int getCoreThreads();
    }/<code>

    而ServerListUpdater的實現類不多,具體下圖所示。

    Spring cloud Ribbon 客戶端負載均衡詳解(二)負載均衡器

    根據兩個類的註釋,我們可以很容易的知道它們的作用分別是:

    • PollingServerListUpdater:動態服務列表更新的默認策略,也就是說DynamicServerListLoadBalancer負載均衡器中的默認實現就是它,它通過定時任務的方式進行服務列表的更新。
    • EurekaNotificationServerListUpdater:該更新器也可服務於DynamicServerListLoadBalancer負載均衡器,但是它的觸發機制與PollingServerListUpdater不同,它需要利用Eureka的事件監聽器來驅動服務列表的更新操作。

    下面我們來詳細看看它默認實現的PollingServerListUpdater。先從用於啟動“服務更新器”的start函數源碼看起,具體如下。我們可以看到start函數的實現內容驗證了之前提到的:以定時任務的方式進行服務列表的更新。它先創建了一個Runnable的線程實現,在該實現中調用了上面提到的具體更新服務實例列表的方法updateAction.doUpdate(),最後再為這個Runnable的線程實現啟動了一個定時任務來執行。

    <code>@Override
    public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
    final Runnable wrapperRunnable = new Runnable() {
    @Override
    public void run() {
    if (!isActive.get()) {
    if (scheduledFuture != null) {
    scheduledFuture.cancel(true);
    }

    return;
    }
    try {
    updateAction.doUpdate();
    lastUpdated = System.currentTimeMillis();
    } catch (Exception e) {
    logger.warn("Failed one update cycle", e);
    }
    }
    };

    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
    wrapperRunnable,
    initialDelayMs,
    refreshIntervalMs,
    TimeUnit.MILLISECONDS
    );
    } else {
    logger.info("Already active, no-op");
    }
    }/<code>

    繼續看PollingServerListUpdater的其他內容,我們可以找到用於啟動定時任務的2個重要參數initialDelayMs和refreshIntervalMs的默認定義分別為1000和30*1000,單位為毫秒。也就是說更新服務實例在初始化之後延遲1秒後開始執行,並以30秒為週期重複執行。除了這些內容之外,我們還能看到它還會記錄最後更新時間、是否存活等信息,同時也實現了ServerListUpdater中定義的一些其他操作內容,這些操作相對比較簡單,這裡不再具體說明,有興趣的讀者可以自己查看源碼瞭解其實現原理。

    ServerListFilter

    在瞭解了更新服務實例的定時任務是如何啟動的之後,我們繼續回到updateAction.doUpdate()調用的具體實現位置,在DynamicServerListLoadBalancer中,它的實際實現委託給了updateListOfServers函數,具體實現如下:

    <code>public void updateListOfServers() {
    List servers = new ArrayList();
    if (serverListImpl != null) {
    servers = serverListImpl.getUpdatedListOfServers();
    LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
    getIdentifier(), servers);

    if (filter != null) {
    servers = filter.getFilteredListOfServers(servers);
    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
    getIdentifier(), servers);
    }
    }
    updateAllServerList(servers);
    }
    /<code>

    可以看到,這裡終於用到了我們之前提到的ServerList的getUpdatedListOfServers,通過之前的介紹我們已經可以知道這一步實現了從Eureka Server中獲取服務可用實例的列表。在獲得了服務實例列表之後,這裡又將引入一個新的對象filter,追朔該對象的定義,我們可以找到它是ServerListFilter定義的。

    ServerListFilter接口非常簡單,該接口中之定義了一個方法List getFilteredListOfServers(List servers),主要用於實現對服務實例列表的過濾,通過傳入的服務實例清單,根據一些規則返回過濾後的服務實例清單。該接口的實現如下圖所示:

    Spring cloud Ribbon 客戶端負載均衡詳解(二)負載均衡器

    其中,除了ZonePreferenceServerListFilter的實現是Spring Cloud Netflix中對Ribbon的擴展實現外,其他均是Netflix Ribbon中的實現類。我們可以分別看看這些過濾器實現都有什麼特點:

    • AbstractServerListFilter:這是一個抽象過濾器,在這裡定義了過濾時需要的一個重要依據對象LoadBalancerStats,我們在之前介紹過的,該對象存儲了關於負載均衡器的一些屬性和統計信息等。
    <code>public abstract class AbstractServerListFilter implements ServerListFilter {

    private volatile LoadBalancerStats stats;

    public void setLoadBalancerStats(LoadBalancerStats stats) {
    this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
    return stats;
    }
    }
    /<code>

    ZoneAffinityServerListFilter:該過濾器基於“區域感知(Zone Affinity)”的方式實現服務實例的過濾,也就是說它會根據提供服務的實例所處區域(Zone)與消費者自身的所處區域(Zone)進行比較,過濾掉那些不是同處一個區域的實例。

    <code>public List getFilteredListOfServers(List servers) { 

    if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
    List filteredServers = Lists.newArrayList(Iterables.filter(
    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
    if (shouldEnableZoneAffinity(filteredServers)) {
    return filteredServers;
    } else if (zoneAffinity) {
    overrideCounter.increment();
    }
    }
    return servers;
    }
    /<code>

    從上面的源碼中我們可以看到,對於服務實例列表的過濾是通過Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())來實現的,其中判斷依據由ZoneAffinityPredicate實現服務實例與消費者的Zone比較。而在過濾之後,這裡並不會馬上返回過濾的結果,而是通過shouldEnableZoneAffinity函數來判斷是否要啟用“區域感知”的功能,從下面shouldEnableZoneAffinity的實現中,我們可以看到,它使用了LoadBalancerStats的getZoneSnapshot方法來獲取這些過濾後的同區域實例的基礎指標(包含了:實例數量、斷路器斷開數、活動請求數、實例平均負載等),根據一系列的算法求出下面的幾個評價值並與設置的閾值對比(下面的為默認值),若有一個條件符合,就不啟用“區域感知”過濾的服務實例清單。這一算法實現對於集群出現區域故障時,依然可以依靠其他區域的實例進行正常服務提供了完善的高可用保障。同時,通過這裡的介紹,我們也可以關聯著來理解之前介紹Eureka的時候提到的對於區域分配設計來保證跨區域故障的高可用問題。

    • blackOutServerPercentage:故障實例百分比(斷路器斷開數 / 實例數量) >= 0.8
    • activeReqeustsPerServer:實例平均負載 >= 0.6
    • availableServers:可用實例數(實例數量 - 斷路器斷開數) < 2
    <code>private boolean shouldEnableZoneAffinity(List filtered) {
    if (!zoneAffinity && !zoneExclusive) {
    return false;
    }
    if (zoneExclusive) {
    return true;
    }
    LoadBalancerStats stats = getLoadBalancerStats();
    if (stats == null) {
    return zoneAffinity;
    } else {
    logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
    ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
    double loadPerServer = snapshot.getLoadPerServer();
    int instanceCount = snapshot.getInstanceCount();
    int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
    if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
    || loadPerServer >= activeReqeustsPerServerThreshold.get()
    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
    logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
    new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
    return false;
    } else {
    return true;
    }
    }
    }
    /<code>
  • DefaultNIWSServerListFilter:該過濾器完全繼承自ZoneAffinityServerListFilter,是默認的NIWS(Netflix Internal Web Service)過濾器。
  • ServerListSubsetFilter:該過濾器也繼承自ZoneAffinityServerListFilter,它非常適用於擁有大規模服務器集群(上百或更多)的系統。因為它可以產生一個“區域感知”結果的子集列表,同時它還能夠通過比較服務實例的通信失敗數量和併發連接數來判定該服務是否健康來選擇性的從服務實例列表中剔除那些相對不夠健康的實例。該過濾器的實現主要分為三步: 獲取“區域感知”的過濾結果,來作為候選的服務實例清單 從當前消費者維護的服務實例子集中剔除那些相對不夠健康的實例(同時也將這些實例從候選清單中剔除,防止第三步的時候又被選入),不夠健康的標準如下:

    a. 服務實例的併發連接數超過客戶端配置的值,默認為0,配置參數為:<clientname>.<namespace>.ServerListSubsetFilter.eliminationConnectionThresold
    b. 服務實例的失敗數超過客戶端配置的值,默認為0,配置參數為:<clientname>.<namespace>.ServerListSubsetFilter.eliminationFailureThresold
    c. 如果按符合上面任一規則的服務實例剔除後,剔除比例小於客戶端默認配置的百分比,默認為0.1(10%),配置參數為:<clientname>.<namespace>.ServerListSubsetFilter.forceEliminatePercent。那麼就先對剩下的實例列表進行健康排序,再開始從最不健康實例進行剔除,直到達到配置的剔除百分比。 在完成剔除後,清單已經少了至少10%(默認值)的服務實例,最後通過隨機的方式從候選清單中選出一批實例加入到清單中,以保持服務實例子集與原來的數量一致,而默認的實例子集數量為20,其配置參數為:<clientname>.<namespace>.ServerListSubsetFilter.size。/<namespace>/<clientname>/<namespace>/<clientname>/<namespace>/<clientname>/<namespace>/<clientname>
  • ZonePreferenceServerListFilter:Spring Cloud整合時新增的過濾器。若使用Spring Cloud整合Eureka和Ribbon時會默認使用該過濾器。它實現了通過配置或者Eureka實例元數據的所屬區域(Zone)來過濾出同區域的服務實例。如下面的源碼所示,它的實現非常簡單,首先通過父類ZoneAffinityServerListFilter的過濾器來獲得“區域感知”的服務實例列表,然後遍歷這個結果取出根據消費者配置預設的區域Zone來進行過濾,如果過濾的結果是空的就直接返回父類獲取的結果,如果不為空就返回通過消費者配置的Zone過濾後的結果。
  • <code>@Override 

    public List<server> getFilteredListOfServers(List<server> servers) {
    List<server> output = super.getFilteredListOfServers(servers);
    if (this.zone != null && output.size() == servers.size()) {
    List<server> local = new ArrayList<server>();
    for (Server server : output) {
    if (this.zone.equalsIgnoreCase(server.getZone())) {
    local.add(server);
    }
    }
    if (!local.isEmpty()) {
    return local;
    }
    }
    return output;
    }/<server>/<server>/<server>/<server>/<server>/<code>

    ZoneAwareLoadBalancer

    ZoneAwareLoadBalancer負載均衡器是對DynamicServerListLoadBalancer的擴展。在DynamicServerListLoadBalancer中,我們可以看到它並沒有重寫選擇具體服務實例的chooseServer函數,所以它依然會採用在BaseLoadBalancer中實現的算法,使用RoundRobinRule規則,以線性輪詢的方式來選擇調用的服務實例,該算法實現簡單並沒有區域(Zone)的概念,所以它會把所有實例視為一個Zone下的節點來看待,這樣就會週期性的產生跨區域(Zone)訪問的情況,由於跨區域會產生更高的延遲,這些實例主要以防止區域性故障實現高可用為目的而不能作為常規訪問的實例,所以在多區域部署的情況下會有一定的性能問題,而該負載均衡器則可以避免這樣的問題。那麼它是如何實現的呢?

    首先,在ZoneAwareLoadBalancer中,我們可以發現,它並沒有重寫setServersList,說明實現服務實例清單的更新主邏輯沒有修改。但是我們可以發現它重寫了這個函數:setServerListForZones(Map<string>> zoneServersMap)。看到這裡可能會有一些陌生,因為它並不是接口中定義的必備函數,所以我們不妨去父類DynamicServerListLoadBalancer中尋找一下該函數,我們可以找到下面的定義了:/<string>

    <code>public void setServersList(List lsrv) {
    super.setServersList(lsrv);
    List serverList = (List) lsrv;
    Map<string>> serversInZones = new HashMap<string>>();
    ...
    setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(Map<string>> zoneServersMap) {
    LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
    getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }/<string>/<string>/<string>
    /<code>

    setServerListForZones函數的調用位於更新服務實例清單函數setServersList的最後,同時從其實現內容來看,它在父類DynamicServerListLoadBalancer中的作用是根據按區域Zone分組的實例列表,為負載均衡器中的LoadBalancerStats對象創建ZoneStats並放入Map zoneStatsMap集合中,每一個區域Zone會對應一個ZoneStats,它用於存儲每個Zone的一些狀態和統計信息。

    在ZoneAwareLoadBalancer中對setServerListForZones的重寫如下:

    <code>protected void setServerListForZones(Map<string>> zoneServersMap) {
    super.setServerListForZones(zoneServersMap);
    if (balancers == null) {
    balancers = new ConcurrentHashMap<string>();
    }
    for (Map.Entry<string>> entry: zoneServersMap.entrySet()) {
    String zone = entry.getKey().toLowerCase();
    getLoadBalancer(zone).setServersList(entry.getValue());
    }
    for (Map.Entry<string> existingLBEntry: balancers.entrySet()) {
    if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
    existingLBEntry.getValue().setServersList(Collections.emptyList());
    }
    }
    }/<string>/<string>/<string>/<string>/<code>

    可以看到,在該實現中創建了一個ConcurrentHashMap()類型的balancers對象,它將用來存儲每個Zone區域對應的負載均衡器,而具體的負載均衡器的創建則是通過下面的第一個循環中調用getLoadBalancer函數來完成,同時在創建負載均衡器的時候會創建它的規則(如果當前實現中沒有IRULE的實例,就創建一個AvailabilityFilteringRule規則;如果已經有具體實例,就clone一個),在創建完負載均衡器後又馬上調用setServersList函數為其設置對應Zone區域的實例清單。而第二個循環則是對Zone區域中實例清單的檢查,看看是否有Zone區域下已經沒有實例了,是的話就將balancers中對應Zone區域的實例列表清空,該操作的作用是為了後續選擇節點時,防止過時的Zone區域統計信息干擾具體實例的選擇算法。

    在瞭解了該負載均衡器是如何擴展服務實例清單的實現後,我們來具體看看它是如何挑選服務實例,來實現對區域的識別的:

    <code>public Server chooseServer(Object key) {
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
    logger.debug("Zone aware logic disabled or there is only one zone");
    return super.chooseServer(key);
    }
    Server server = null;
    try {
    LoadBalancerStats lbStats = getLoadBalancerStats();
    Map<string> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    ...
    Set<string> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
    logger.debug("Zone chosen: {}", zone);
    if (zone != null) {
    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
    server = zoneLoadBalancer.chooseServer(key);
    }
    }
    } catch (Throwable e) {
    logger.error("Unexpected exception when choosing server using zone aware logic", e);
    }
    if (server != null) {
    return server;
    } else {
    logger.debug("Zone avoidance logic is not invoked.");
    return super.chooseServer(key);
    }
    }/<string>/<string>/<code>

    從源碼中我們可以看到,只有當負載均衡器中維護的實例所屬Zone區域個數大於1的時候才會執行這裡的選擇策略,否則還是將使用父類的實現。當Zone區域個數大於1個的時候,它的實現步驟主要如下:

    • 調用ZoneAvoidanceRule中的靜態方法createSnapshot(lbStats),為當前負載均衡器中所有的Zone區域分別創建快照,保存在Map zoneSnapshot中,這些快照中的數據將用於後續的算法。
    • 調用ZoneAvoidanceRule中的靜態方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),來獲取可用的Zone區域集合,在該函數中會通過Zone區域快照中的統計數據來實現可用區的挑選。 首先它會剔除符合這些規則的Zone區域:所屬實例數為零的Zone區域;Zone區域內實例平均負載小於零,或者實例故障率(斷路器斷開次數/實例數)大於等於閾值(默認為0.99999)。 然後根據Zone區域的實例平均負載來計算出最差的Zone區域,這裡的最差指的是實例平均負載最高的Zone區域。 如果在上面的過程中沒有符合剔除要求的區域,同時實例最大平均負載小於閾值(默認為20%),就直接返回所有Zone區域為可用區域。否則,從最壞Zone區域集合中隨機的選擇一個,將它從可用Zone區域集合中剔除。
    • 當獲得的可用Zone區域集合不為空,並且個數小於Zone區域總數,就隨機的選擇一個Zone區域。
    • 在確定了某個Zone區域後,則獲取對應Zone區域的服務均衡器,並調用chooseServer來選擇具體的服務實例,而在chooseServer中將使用IRule接口的choose函數來選擇具體的服務實例。在這裡IRule接口的實現會使用ZoneAvoidanceRule來挑選出具體的服務實例。


    分享到:


    相關文章: