快速、安全、可靠!Yarn!

Yarn 的全稱是 Yet Anther Resource Negotiator(另一種資源協商者)。它作為 Hadoop 的一個組件,官方對它的定義是

一個工作調度和集群資源管理的框架。關於 Yarn 的發展歷史我們在之前的文章曾介紹過,在這裡就不贅述了。

快速、安全、可靠!Yarn!

如上圖所示是 Yarn 的系統調度框架,Yarn 主要由四個核心組件進行協調工作,其中 ResourceManager 負責資源調度;NodeManager 負責資源管理,可啟動 Container;ApplicationMaster 管理具體的應用程序,負責啟動具體的任務;Container 設計比較精巧,將機器資源封裝後用於計算任務,它是具體執行任務最小的單元,若任務較大可並行多個 Container 共同執行,這也是分佈式任務的優勢。

Yarn VS Mesos

說到 Yarn 自然要聊聊 Mesos,Mesos 是以與 Linux 內核同樣的原則而創建的分佈式操作系統內核,Mesos 內核運行在每一個機器上同時通過 API 為各種應用提供跨數據中心和雲的資源管理調度能力。這些應用包括 Hadoop、Spark、Kafka、Elastic Search。還可配合框架 Marathon 來管理大規模的 Docker 等容器化應用,如下圖所示是它的架構圖:

快速、安全、可靠!Yarn!

與 Yarn 相同 Mesos 也主要由四個組件構成,它們之間根據功能可一一對應。

快速、安全、可靠!Yarn!

自然它們之間也存在一些差異,具體可總結為以下 3 點:

1.框架擔任的角色不同,在 Mesos 中各種計算框架是完全融入Mesos中的,若想在 Mesos 中添加一個新的計算框架,首先需要在 Mesos 中部署一套該框架;而在 Yarn 中,各種框架作為 client 端的 library 使用,不需要事先部署一套該框架,過程更簡易。

2.在資源調度方面,Mesos 只是簡單的將資源推送給各個應用程序,由應用程序選擇是否接受資源,Mesos 本身並不知道各個應用程序資源需求。而應用程序的 ApplicationMaster 會把各個任務的資源要求彙報給 Yarn,Yarn 則根據需要為應用程序分配資源;

3.Yarn 是 MapReduce 進化的產物,它是為 Hadoop jobs 管理資源而誕生的,只為 Hadoop jobs提供 static partitioning,而 Mesos 的設計目標是為各個框架提供dynamical partitioning,讓各個集群框架共用數據中心機器。因此 Mesos 定位在數據中心級別,Yarn 則更適合運行 Apache 生態圈的應用。


底層實現技術

Yarn 的底層實現技術主要分為 3 個部分:序列化、狀態機和通信模型。

序列化是在網絡傳輸中把一個對象按照一定的編碼格式通過 bit 數組序列化傳輸到另外一邊,再通過反序列化將其組裝成一個對象,它的目標是傳輸更少的東西卻包含更大的數據量。

Java 的 Serializable 雖然很簡便但傳輸量太大,不適合 Hadoop 的大數據量溝通。因此 Yarn 實現了一套 Writable 框架,在 MapReduce 1.0 被提出。這個框架更輕量級,但由於結構簡單因此性能和兼容性都較差,所以在 MapReduce 2.0 版本更新了

Protocol Buffers,這是谷歌開源的一款序列化框架,支持 Java、C++、Python 三種語言,性能有大幅度提升。後期又推出的 Avro 是 Hadoop 生態圈的序列化組件,它同時也是一個 rpc 框架,說不定未來會代替 Protocol Buffers~目前 Avro 僅支持 Java,值得一提的是它可以通過 Json 格式表達數據結構。

由於 Yarn 有很多任務狀態,如 finish、running 等,都是通過狀態機觸發的。Yarn 的狀態機通過一個前置狀態後置狀態是通過什麼事件觸發的,觸發之後調用回調函數,這四個組件組成一個最基本的狀態模型,這種設計十分契合狀態轉移的業務。

快速、安全、可靠!Yarn!

由於 Hadoop 組件是有心跳的,如果頻繁溝通使用 Java 的 rpc 組件 RMI 容易出現性能問題,因此 Yarn 需要通過更輕量級的框架來實現,它通過 Java 延伸了三個組件組合成了自有的 rpc 框架,分別是 Reflect 反射、Proxy 動態代理和 Socket 網絡編程。

接下來通過代碼實例更形象地感受下該組件的應用。YarnRPC 的核心代碼是兩個類:Invocation 和 Method,用來代表需要傳輸並實例化的類,其中 Invocation 的屬性如下:

private Class interfaces; // 實例類實現的接口
private Method method; // 實例類需要調用的方法
private Object[] params; // 方法的參數值
private Object result; // 返回結果


Method 的屬性如下:

private String methodName; // 方法的名稱
private Class[] params; // 方法的參數類型

從兩個類的屬性可以看出,YarnRPC 需要遠程調用的有用屬性都在這兩個類裡定義好了。

Server 端的 RPC 類需要通過反射實例化對應的類,過程如下:

public void call(Invocation invo) { 

System.out.println(invo.getClass().getName());
Object obj = serviceEngine.get(invo.getInterfaces().getName());
if(obj!=null) {
try {
Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
Object result = m.invoke(obj, invo.getParams());
invo.setResult(result);
} catch (Throwable th) {
th.printStackTrace();
}
} else {
throw new IllegalArgumentException("has no these class");
}
}

Client 端的 RPC 類需要通過動態代理調用對應的類,過程如下:

public static  T getProxy(final Class clazz,String host,int port) {

final Client client = new Client(host,port);
InvocationHandler handler = new InvocationHandler() {

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invo = new Invocation();
invo.setInterfaces(clazz);
invo.setMethod(new com.protocal.Method(method.getName(),method.getParameterTypes()));
invo.setParams(args);
client.invoke(invo);
return invo.getResult();
}
};
T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
return t;
}


結構模型

快速、安全、可靠!Yarn!

如上圖所示是 Yarn 的結構模型,各個組件協調將任務從客戶端提交到 Yarn 上面運行大致分為 6 個步驟。

1. 作業提交

Client 調用 job.waitForCompletion 方法,向整個集群提交 MapReduce program。 新的 Appcation ID 由 ResourceManager 分配,MapReduce program 的 client 核實作業的輸出,計算輸入的 split,將作業的資源(如 Jar 包、 配置文件等)同步至HDFS。最後通過調用 ResourceManager 的 submitApplication() 提交作業。

2. 作業初始化

當 ResourceManager 收到 submitApplciation() 請求時就將該請求發給調度器,調度器分配 container。然後 ResourceManager 在該 container 內啟動 ApplicationMaster 進程,由 NodeManager 監控。

MapReduce program 的 ApplicationMaster 是一個主類為 MRAppMaster 的 Java 應用,它通過創造一些 bookkeeping 對象來監控作業的進度,得到任務的進度和完成報告。然後通過分佈式文件系統得到由客戶端計算好的輸入 split,接著為每個輸入 split 創建一個 map 任務,根據 mapreduce.job.reduces 創建 reduce 任務對象。

3. 任務分配

若作業很小 ApplicationMaster 會選擇在其自己的 JVM 中運行任務,如果不是 ApplicationMaster 則會向 ResourceManager 請求 container 來運行所有的 map 和 reduce 任務。這些請求通過心跳來傳輸的,包括每個 map 任務的數據位置(比如存放輸入 split 的主機名和機架)。 調度器利用這些信息來調度任務,儘量將任務分配給存儲數據的節點,或者分配給和存放輸入 split 的節點相同機架的節點。

4. 任務運行

當一個任務由 ResourceManager 的調度器分配給一個 container後,ApplicationMaster 通過聯繫 NodeManager 來啟動 container。任務由一個主類為 YarnChild 的 Java 應用執行,在運行任務之前首先本地化任務需要的資源(如作業配置、JAR 文件以及分佈式緩存的所有文件),最後運行 map 或 reduce 任務。

5. 進度和狀態更新

Yarn 中的任務將其進度和狀態返回給 ApplicationMaster,客戶端每秒(通過 mapreduce.client.progressmonitor.pollinterval 設置)向 ApplicationMaster 請求進度更新並展示給用戶。

6. 作業完成

除了向 ApplicationMaster 請求作業進度外,客戶端每 5 分鐘都會通過調用 waitForCompletion() 來檢查作業是否完成,時間間隔可以通過 mapreduce.client.completion.pollinterval 來設置。作業完成之後 ApplicationMaster 和 container 會清理工作狀態,OutputCommiter 的作業清理方法也會被調用,作業的信息會被作業歷史服務器存儲以備之後用戶核查。

資源調度器

Yarn 是通過將資源分配給 queue 來進行資源分配的,每個 queue 可以設置它的資源分配方式,接著展開介紹 Yarn 的三種資源分配方式。

FIFO Scheduler

如果沒有配置策略的話,所有的任務都提交到一個 default 隊列,根據它們的提交順序執行。富裕資源就執行任務,若資源不富裕就等待前面的任務執行完畢後釋放資源,這就是 FIFO Scheduler 先入先出的分配方式。

快速、安全、可靠!Yarn!

如上圖所示,在 Job1 提交時佔用了所有的資源,不久後 Job2提交了,但是此時系統中已經沒有資源可以分配給它了。加入 Job1 是一個大任務,那麼 Job2 就只能等待一段很長的時間才能獲得執行的資源。所以先入先出的分配方式存在一個問題就是大任務會佔用很多資源,造成後面的小任務等待時間太長而餓死,因此一般不使用這個默認配置。

Capacity Scheduler

Capacity Scheduler 是一種多租戶、彈性的分配方式。每個租戶一個隊列,每個隊列可以配置能使用的資源上限與下限(譬如 50%,達到這個上限後即使其他的資源空置著,也不可使用),通過配置可以令隊列至少有資源下限配置的資源可使用。

快速、安全、可靠!Yarn!

上圖隊列 A 和隊列 B 分配了相互獨立的資源。Job1 提交給隊列 A 執行,它只能使用隊列 A 的資源。接著 Job2 提交給了隊列B 就不必等待 Job1 釋放資源了。這樣就可以將大任務和小任務分配在兩個隊列中,這兩個隊列的資源相互獨立,就不會造成小任務餓死的情況了。

Fair Scheduler

Fair Scheduler 是一種公平的分配方式,所謂的公平就是集群會儘可能地按配置的比例分配資源給隊列。

快速、安全、可靠!Yarn!

如上圖所示,Job1 提交給隊列 A,它佔用了集群的所有資源。接著 Job2 提交給了隊列 B,這時 Job1 就需要釋放它的一半的資源給隊列 A 中的 Job2 使用。接著 Job3 也提交給了隊列 B,這個時候 Job2 如果還未執行完畢的話也必須釋放一半的資源給 Job3。這就是公平的分配方式,在隊列範圍內所有任務享用到的資源都是均分的。

目前這三種調度方式用得最廣泛的就是 Capacity Scheduler

資源隔離原理

既然在調度中存在隊列的概念,自然免不了資源隔離。

Yarn 採用線程監控的方法判斷任務是否超量使用內存,一旦發現超量則直接將其 kill 掉。這種方式比較暴力,容易誤傷一些啟動時內存比較大的任務。默認情況下 NodeManager 不會對進行 CPU 隔離,我們可以通過啟用 Cgroup 支持,它會生成指定的配置文件設置資源佔有 CPU 的百分比,運行過程中會嚴格限制其佔用的百分比,但 Cgroup 只能隔離百分比而不能進行分核隔離。

通信接口

在分佈式內組件之間的溝通是十分重要的,因此接著介紹幾個常用的通信接口。如下圖所示是 Yarn 的基礎通信模型圖。

快速、安全、可靠!Yarn!

其中 JobClient 與 ResourceManager的通信協議為 ApplicationClientProtocol,客戶端通過該協議提交應用程序,查詢應用狀態;ApplicationMaster 與 ResourceManager 通信協議為 ApplicationMasterProtocol,ApplicationMaster 向 ResourceManager 註冊自己,併為各個任務申請資源;ApplicationMaster 與 NodeManager 的通信協議為 ContainerManagementProtocol,ApplicationMaster 要求 NodeManager 啟動或停止 Container,並獲取 Container 的狀態信息;NodeManager 與 ResourceManager 的通信協議為 ResourceTracker,NodeManager 向 ResourceManager 註冊自己,並定時發送心跳信息彙報當前節點的資源使用情況。

編程概述

編寫一個可以在 Yarn 上運行的任務,需要寫兩個類:用於啟動 AppMaster 的 Client 類和用於啟動 Container 的 AppMaster 類。

Client 類的具體過程描述如下:

Configuration conf = new Configuration();

// 1、創建並啟動一個yarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// 2、創建一個application
YarnClientApplication app = yarnClient.createApplication();
// 3、獲取application運行時的context
ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
ApplicationId appId = context.getApplicationId();
System.out.println("create new applicationId ==> " + appId);

// 4、設置container運行完成後自動銷燬
context.setKeepContainersAcrossApplicationAttempts(false);
// 設置application的名稱
context.setApplicationName("ApplicationMaster");
// 5、定義該application所使用的內存和cpu,10mb,1cpu

Resource capability = Resource.newInstance(10, 1);
context.setResource(capability);
// 6、構建appmaster運行的container, 需要定義本地資源,運行時環境,和運行命令
ContainerLaunchContext amContainer = createAMContainerLanunchContext(conf, context.getApplicationId());
context.setAMContainerSpec(amContainer);
// 7、設置優先級,默認是0
context.setPriority(Priority.newInstance(0));
// 8、設置運行隊列
context.setQueue("default");
// 9、提交任務
yarnClient.submitApplication(context);

AppMaster 類的具體過程描述如下:

Configuration conf = new Configuration();
// 1、創建 AM 到 RM 的客戶端
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
amRMClient.init(conf);
amRMClient.start();
// 2、創建 AM 到 NM 的客戶端
amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());
amNMClient.init(conf);
amNMClient.start();
// 3、將AM註冊到RM上,註冊成功後 會啟動AM和RM之間的心跳線程
amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");
for (int i = 0; i < numTotalContainers.get(); i++) {
// 4、申請container資源
ContainerRequest containerAsk = new ContainerRequest(Resource.newInstance(100, 1), null, null, Priority.newInstance(0));
amRMClient.addContainerRequest(containerAsk);
}

儘管 Yarn 自帶的編程 API 已經得到了極大的簡化,但從頭開發一個 Yarn 應用程序仍是一件非常困難的事。

Apache Twill 這個項目則是為簡化 Yarn 上應用程序開發而成立的項目,該項目把與 Yarn 相關的重複性的工作封裝成庫,使得用戶可以專注於自己的應用程序邏輯,最後通過一個簡單的 helloworld 實例感受一下:

public class HelloWorld {
static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
static class HelloWorldRunnable extends AbstractTwillRunnable {
@Override
public void run() {
LOG.info("Hello World");
}
}
public static void main(String[] args) throws Exception {
YarnConfiguration conf = new YarnConfiguration();
TwillRunnerService runner = new YarnTwillRunnerService(conf, "localhost:2181");
runner.startAndWait();
TwillController controller = runner.prepare(new HelloWorldRunnable()).start();
Services.getCompletionFuture(controller).get();
}


分享到:


相關文章: