Yarn 的全稱是 Yet Anther Resource Negotiator(另一種資源協商者)。它作為 Hadoop 的一個組件,官方對它的定義是
一個工作調度和集群資源管理的框架。關於 Yarn 的發展歷史我們在之前的文章曾介紹過,在這裡就不贅述了。如上圖所示是 Yarn 的系統調度框架,Yarn 主要由四個核心組件進行協調工作,其中 ResourceManager 負責資源調度;NodeManager 負責資源管理,可啟動 Container;ApplicationMaster 管理具體的應用程序,負責啟動具體的任務;Container 設計比較精巧,將機器資源封裝後用於計算任務,它是具體執行任務最小的單元,若任務較大可並行多個 Container 共同執行,這也是分佈式任務的優勢。
Yarn VS Mesos
說到 Yarn 自然要聊聊 Mesos,Mesos 是以與 Linux 內核同樣的原則而創建的分佈式操作系統內核,Mesos 內核運行在每一個機器上同時通過 API 為各種應用提供跨數據中心和雲的資源管理調度能力。這些應用包括 Hadoop、Spark、Kafka、Elastic Search。還可配合框架 Marathon 來管理大規模的 Docker 等容器化應用,如下圖所示是它的架構圖:
與 Yarn 相同 Mesos 也主要由四個組件構成,它們之間根據功能可一一對應。
自然它們之間也存在一些差異,具體可總結為以下 3 點:
1.框架擔任的角色不同,在 Mesos 中各種計算框架是完全融入Mesos中的,若想在 Mesos 中添加一個新的計算框架,首先需要在 Mesos 中部署一套該框架;而在 Yarn 中,各種框架作為 client 端的 library 使用,不需要事先部署一套該框架,過程更簡易。
2.在資源調度方面,Mesos 只是簡單的將資源推送給各個應用程序,由應用程序選擇是否接受資源,Mesos 本身並不知道各個應用程序資源需求。而應用程序的 ApplicationMaster 會把各個任務的資源要求彙報給 Yarn,Yarn 則根據需要為應用程序分配資源;
3.Yarn 是 MapReduce 進化的產物,它是為 Hadoop jobs 管理資源而誕生的,只為 Hadoop jobs提供 static partitioning,而 Mesos 的設計目標是為各個框架提供dynamical partitioning,讓各個集群框架共用數據中心機器。因此 Mesos 定位在數據中心級別,Yarn 則更適合運行 Apache 生態圈的應用。
底層實現技術
Yarn 的底層實現技術主要分為 3 個部分:序列化、狀態機和通信模型。
序列化是在網絡傳輸中把一個對象按照一定的編碼格式通過 bit 數組序列化傳輸到另外一邊,再通過反序列化將其組裝成一個對象,它的目標是傳輸更少的東西卻包含更大的數據量。
Java 的 Serializable 雖然很簡便但傳輸量太大,不適合 Hadoop 的大數據量溝通。因此 Yarn 實現了一套 Writable 框架,在 MapReduce 1.0 被提出。這個框架更輕量級,但由於結構簡單因此性能和兼容性都較差,所以在 MapReduce 2.0 版本更新了
Protocol Buffers,這是谷歌開源的一款序列化框架,支持 Java、C++、Python 三種語言,性能有大幅度提升。後期又推出的 Avro 是 Hadoop 生態圈的序列化組件,它同時也是一個 rpc 框架,說不定未來會代替 Protocol Buffers~目前 Avro 僅支持 Java,值得一提的是它可以通過 Json 格式表達數據結構。由於 Yarn 有很多任務狀態,如 finish、running 等,都是通過狀態機觸發的。Yarn 的狀態機通過一個前置狀態到後置狀態是通過什麼事件觸發的,觸發之後調用回調函數,這四個組件組成一個最基本的狀態模型,這種設計十分契合狀態轉移的業務。
由於 Hadoop 組件是有心跳的,如果頻繁溝通使用 Java 的 rpc 組件 RMI 容易出現性能問題,因此 Yarn 需要通過更輕量級的框架來實現,它通過 Java 延伸了三個組件組合成了自有的 rpc 框架,分別是 Reflect 反射、Proxy 動態代理和 Socket 網絡編程。
接下來通過代碼實例更形象地感受下該組件的應用。YarnRPC 的核心代碼是兩個類:Invocation 和 Method,用來代表需要傳輸並實例化的類,其中 Invocation 的屬性如下:
private Class interfaces; // 實例類實現的接口
private Method method; // 實例類需要調用的方法
private Object[] params; // 方法的參數值
private Object result; // 返回結果
Method 的屬性如下:
private String methodName; // 方法的名稱
private Class[] params; // 方法的參數類型
從兩個類的屬性可以看出,YarnRPC 需要遠程調用的有用屬性都在這兩個類裡定義好了。
Server 端的 RPC 類需要通過反射實例化對應的類,過程如下:
public void call(Invocation invo) {
System.out.println(invo.getClass().getName());
Object obj = serviceEngine.get(invo.getInterfaces().getName());
if(obj!=null) {
try {
Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
Object result = m.invoke(obj, invo.getParams());
invo.setResult(result);
} catch (Throwable th) {
th.printStackTrace();
}
} else {
throw new IllegalArgumentException("has no these class");
}
}
Client 端的 RPC 類需要通過動態代理調用對應的類,過程如下:
public staticT getProxy(final Class clazz,String host,int port) {
final Client client = new Client(host,port);
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invo = new Invocation();
invo.setInterfaces(clazz);
invo.setMethod(new com.protocal.Method(method.getName(),method.getParameterTypes()));
invo.setParams(args);
client.invoke(invo);
return invo.getResult();
}
};
T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
return t;
}
結構模型
如上圖所示是 Yarn 的結構模型,各個組件協調將任務從客戶端提交到 Yarn 上面運行大致分為 6 個步驟。
1. 作業提交
Client 調用 job.waitForCompletion 方法,向整個集群提交 MapReduce program。 新的 Appcation ID 由 ResourceManager 分配,MapReduce program 的 client 核實作業的輸出,計算輸入的 split,將作業的資源(如 Jar 包、 配置文件等)同步至HDFS。最後通過調用 ResourceManager 的 submitApplication() 提交作業。
2. 作業初始化
當 ResourceManager 收到 submitApplciation() 請求時就將該請求發給調度器,調度器分配 container。然後 ResourceManager 在該 container 內啟動 ApplicationMaster 進程,由 NodeManager 監控。
MapReduce program 的 ApplicationMaster 是一個主類為 MRAppMaster 的 Java 應用,它通過創造一些 bookkeeping 對象來監控作業的進度,得到任務的進度和完成報告。然後通過分佈式文件系統得到由客戶端計算好的輸入 split,接著為每個輸入 split 創建一個 map 任務,根據 mapreduce.job.reduces 創建 reduce 任務對象。
3. 任務分配
若作業很小 ApplicationMaster 會選擇在其自己的 JVM 中運行任務,如果不是 ApplicationMaster 則會向 ResourceManager 請求 container 來運行所有的 map 和 reduce 任務。這些請求通過心跳來傳輸的,包括每個 map 任務的數據位置(比如存放輸入 split 的主機名和機架)。 調度器利用這些信息來調度任務,儘量將任務分配給存儲數據的節點,或者分配給和存放輸入 split 的節點相同機架的節點。
4. 任務運行
當一個任務由 ResourceManager 的調度器分配給一個 container後,ApplicationMaster 通過聯繫 NodeManager 來啟動 container。任務由一個主類為 YarnChild 的 Java 應用執行,在運行任務之前首先本地化任務需要的資源(如作業配置、JAR 文件以及分佈式緩存的所有文件),最後運行 map 或 reduce 任務。
5. 進度和狀態更新
Yarn 中的任務將其進度和狀態返回給 ApplicationMaster,客戶端每秒(通過 mapreduce.client.progressmonitor.pollinterval 設置)向 ApplicationMaster 請求進度更新並展示給用戶。
6. 作業完成
除了向 ApplicationMaster 請求作業進度外,客戶端每 5 分鐘都會通過調用 waitForCompletion() 來檢查作業是否完成,時間間隔可以通過 mapreduce.client.completion.pollinterval 來設置。作業完成之後 ApplicationMaster 和 container 會清理工作狀態,OutputCommiter 的作業清理方法也會被調用,作業的信息會被作業歷史服務器存儲以備之後用戶核查。
資源調度器
Yarn 是通過將資源分配給 queue 來進行資源分配的,每個 queue 可以設置它的資源分配方式,接著展開介紹 Yarn 的三種資源分配方式。
FIFO Scheduler
如果沒有配置策略的話,所有的任務都提交到一個 default 隊列,根據它們的提交順序執行。富裕資源就執行任務,若資源不富裕就等待前面的任務執行完畢後釋放資源,這就是 FIFO Scheduler 先入先出的分配方式。
如上圖所示,在 Job1 提交時佔用了所有的資源,不久後 Job2提交了,但是此時系統中已經沒有資源可以分配給它了。加入 Job1 是一個大任務,那麼 Job2 就只能等待一段很長的時間才能獲得執行的資源。所以先入先出的分配方式存在一個問題就是大任務會佔用很多資源,造成後面的小任務等待時間太長而餓死,因此一般不使用這個默認配置。
Capacity Scheduler
Capacity Scheduler 是一種多租戶、彈性的分配方式。每個租戶一個隊列,每個隊列可以配置能使用的資源上限與下限(譬如 50%,達到這個上限後即使其他的資源空置著,也不可使用),通過配置可以令隊列至少有資源下限配置的資源可使用。
上圖隊列 A 和隊列 B 分配了相互獨立的資源。Job1 提交給隊列 A 執行,它只能使用隊列 A 的資源。接著 Job2 提交給了隊列B 就不必等待 Job1 釋放資源了。這樣就可以將大任務和小任務分配在兩個隊列中,這兩個隊列的資源相互獨立,就不會造成小任務餓死的情況了。
Fair Scheduler
Fair Scheduler 是一種公平的分配方式,所謂的公平就是集群會儘可能地按配置的比例分配資源給隊列。
如上圖所示,Job1 提交給隊列 A,它佔用了集群的所有資源。接著 Job2 提交給了隊列 B,這時 Job1 就需要釋放它的一半的資源給隊列 A 中的 Job2 使用。接著 Job3 也提交給了隊列 B,這個時候 Job2 如果還未執行完畢的話也必須釋放一半的資源給 Job3。這就是公平的分配方式,在隊列範圍內所有任務享用到的資源都是均分的。
目前這三種調度方式用得最廣泛的就是 Capacity Scheduler 。
資源隔離原理
既然在調度中存在隊列的概念,自然免不了資源隔離。
Yarn 採用線程監控的方法判斷任務是否超量使用內存,一旦發現超量則直接將其 kill 掉。這種方式比較暴力,容易誤傷一些啟動時內存比較大的任務。默認情況下 NodeManager 不會對進行 CPU 隔離,我們可以通過啟用 Cgroup 支持,它會生成指定的配置文件設置資源佔有 CPU 的百分比,運行過程中會嚴格限制其佔用的百分比,但 Cgroup 只能隔離百分比而不能進行分核隔離。
通信接口
在分佈式內組件之間的溝通是十分重要的,因此接著介紹幾個常用的通信接口。如下圖所示是 Yarn 的基礎通信模型圖。
其中 JobClient 與 ResourceManager的通信協議為 ApplicationClientProtocol,客戶端通過該協議提交應用程序,查詢應用狀態;ApplicationMaster 與 ResourceManager 通信協議為 ApplicationMasterProtocol,ApplicationMaster 向 ResourceManager 註冊自己,併為各個任務申請資源;ApplicationMaster 與 NodeManager 的通信協議為 ContainerManagementProtocol,ApplicationMaster 要求 NodeManager 啟動或停止 Container,並獲取 Container 的狀態信息;NodeManager 與 ResourceManager 的通信協議為 ResourceTracker,NodeManager 向 ResourceManager 註冊自己,並定時發送心跳信息彙報當前節點的資源使用情況。
編程概述
編寫一個可以在 Yarn 上運行的任務,需要寫兩個類:用於啟動 AppMaster 的 Client 類和用於啟動 Container 的 AppMaster 類。
Client 類的具體過程描述如下:
Configuration conf = new Configuration();
// 1、創建並啟動一個yarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// 2、創建一個application
YarnClientApplication app = yarnClient.createApplication();
// 3、獲取application運行時的context
ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
ApplicationId appId = context.getApplicationId();
System.out.println("create new applicationId ==> " + appId);
// 4、設置container運行完成後自動銷燬
context.setKeepContainersAcrossApplicationAttempts(false);
// 設置application的名稱
context.setApplicationName("ApplicationMaster");
// 5、定義該application所使用的內存和cpu,10mb,1cpu
Resource capability = Resource.newInstance(10, 1);
context.setResource(capability);
// 6、構建appmaster運行的container, 需要定義本地資源,運行時環境,和運行命令
ContainerLaunchContext amContainer = createAMContainerLanunchContext(conf, context.getApplicationId());
context.setAMContainerSpec(amContainer);
// 7、設置優先級,默認是0
context.setPriority(Priority.newInstance(0));
// 8、設置運行隊列
context.setQueue("default");
// 9、提交任務
yarnClient.submitApplication(context);
AppMaster 類的具體過程描述如下:
Configuration conf = new Configuration();
// 1、創建 AM 到 RM 的客戶端
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
amRMClient.init(conf);
amRMClient.start();
// 2、創建 AM 到 NM 的客戶端
amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());
amNMClient.init(conf);
amNMClient.start();
// 3、將AM註冊到RM上,註冊成功後 會啟動AM和RM之間的心跳線程
amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");
for (int i = 0; i < numTotalContainers.get(); i++) {
// 4、申請container資源
ContainerRequest containerAsk = new ContainerRequest(Resource.newInstance(100, 1), null, null, Priority.newInstance(0));
amRMClient.addContainerRequest(containerAsk);
}
儘管 Yarn 自帶的編程 API 已經得到了極大的簡化,但從頭開發一個 Yarn 應用程序仍是一件非常困難的事。
Apache Twill 這個項目則是為簡化 Yarn 上應用程序開發而成立的項目,該項目把與 Yarn 相關的重複性的工作封裝成庫,使得用戶可以專注於自己的應用程序邏輯,最後通過一個簡單的 helloworld 實例感受一下:
public class HelloWorld {
static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
static class HelloWorldRunnable extends AbstractTwillRunnable {
@Override
public void run() {
LOG.info("Hello World");
}
}
public static void main(String[] args) throws Exception {
YarnConfiguration conf = new YarnConfiguration();
TwillRunnerService runner = new YarnTwillRunnerService(conf, "localhost:2181");
runner.startAndWait();
TwillController controller = runner.prepare(new HelloWorldRunnable()).start();
Services.getCompletionFuture(controller).get();
}
閱讀更多 美圖數據技術團隊 的文章