如何在 Kylin 中優雅地使用 Spark

前言

Kylin 用戶在使用 Spark的過程中,經常會遇到任務提交緩慢、構建節點不穩定的問題。為了更方便地向 Spark 提交、管理和監控任務,有些用戶會使用 Livy 作為 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了通過 Apache Livy 遞交 Spark 任務的新功能[KYLIN-3795],特此感謝滴滴靳國衛同學對此功能的貢獻。

Livy 介紹

Apache Livy 是一個基於 Spark 的開源 REST 服務,是 Apache 基金會的一個孵化項目,它能夠通過 REST 的方式將代碼片段或是序列化的二進制代碼提交到 Spark 集群中去執行。它提供瞭如下基本功能:

提交 Scala、Python 或是 R 代碼片段到遠端的 Spark 集群上執行。提交 Java、Scala、Python 所編寫的 Spark 作業到遠端的 Spark 集群上執行。

△ Apache Livy 架構

為什麼使用 Livy

1. 當前 Spark 存在的問題

Spark 當前支持兩種交互方式:

交互式處理用戶使用 spark-shell 或 pyspark 腳本啟動 Spark 應用程序,伴隨應用程序啟動的同時,Spark 會在當前終端啟動 REPL(Read–Eval–Print Loop) 來接收用戶的代碼輸入,並將其編譯成 Spark 作業。批處理批處理的程序邏輯由用戶實現並編譯打包成 jar 包,spark-submit 腳本啟動 Spark 應用程序來執行用戶所編寫的邏輯,與交互式處理不同的是批處理程序在執行過程中用戶沒有與 Spark 進行任何的交互。

兩種方式都需要用戶登錄到 Gateway 節點上通過腳本啟動 Spark 進程,但是會出現以下問題:

增加 Gateway 節點的資源使用負擔和故障發生的可能性。同時 Gateway 節點的故障會帶來單點問題,造成 Spark 程序的失敗。難以管理、審計以及與已有的權限管理工具的集成。由於 Spark 採用腳本的方式啟動應用程序,因此相比於 WEB 方式少了許多管理、審計的便利性,同時也難以與已有的工具結合,如 Apache Knox 等。將 Gateway 節點上的部署細節以及配置不可避免地暴露給了登陸用戶。

2. Livy 優勢

一方面,接受並解析用戶的 REST 請求,轉換成相應的操作;另一方面,它管理著用戶所啟動的所有的 Spark 集群。

Livy 具有如下功能:

通過 Livy session 實時提交代碼片段與 Spark 的 REPL 進行交互。通過 Livy batch 提交 Scala、Java、Python 編寫的二進制包來提交批處理任務。多用戶能夠使用同一個服務器(支持用戶模擬)。能夠通過 REST 接口在任何設備上提交任務、查看任務執行狀態和結果。

Kylin with Livy

1. 引入 Livy 之前 Kylin 是如何使用 Spark 的

Spark 是在 Kylin v2.0 引入的,主要應用於 Cube 構建,構建過程介紹可以查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

下面是 SparkExecutable 類的 doWork 方法關於提交 Spark job 的一段代碼,我們可以看到 Kylin 會從配置中獲取 Spark job 包的路徑(默認為 $KYLIN_HOME/lib),通過本地指令的形式提交 Spark job,然後循環獲取 Spark job 的執行狀態和結果。我們可以看到 Kylin 單獨開了一個線程在本地向 Spark 客戶端發送來 job 請求並且循環獲取結果,額外增加了節點系統壓力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

String jobJar = config.getKylinJobJarPath(); //獲取job jar的路徑

//略...

final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //構建本地command

//略...

//創建指令執行線程

Callable callable = new Callable>() {

@Override

public Pair call() throws Exception {

Pair result;

try {

result = exec.execute(cmd, patternedLogger);

} catch (Exception e) {

logger.error("error run spark job:", e);

result = new Pair<>(-1, e.getMessage());

}

return result;

}

};

//略...

try {

Future> future = executorService.submit(callable);

Pair result = null;

while (!isDiscarded() && !isPaused()) {

if (future.isDone()) {

result = future.get(); //循環獲取指令執行結果

break;

} else {

Thread.sleep(5000); //每隔5秒檢查一次job執行狀態

}

}

//略...

} catch (Exception e) {

logger.error("Error run spark job:", e);

return ExecuteResult.createError(e);

}

//略...

}

2. Livy for Kylin 詳細解析

Livy 向 Spark 提交 job 一共有兩種,分別是 Session 和 Batch,Kylin 是通過 Batch 的方式提交 job 的,需要提前構建好 Spark job 對應的 jar 包並上傳到 HDFS 中,並且將配置項 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。

Batch 一共具有如下九種狀態:

public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down;}下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 通過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,然後通過 restClient 向 Livy 發送 Http 請求。在提交 job 之後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變為 shutting_down, error, dead, success 中的一種。每一次都是通過 Http 的方式發送請求,相比較於通過本地 Spark 客戶端提交任務,更加穩定而且減少了 Kylin 節點系統壓力。

下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 通過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,然後通過 restClient 向 Livy 發送 Http 請求。在提交 job 之後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變為 shutting_down, error, dead, success 中的一種。每一次都是通過 Http 的方式發送請求,相比較於通過本地 Spark 客戶端提交任務,更加穩定而且減少了 Kylin 節點系統壓力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);

executor.execute(livyRestBuilder, patternedLogger); //調用LivyRestExecutor類的execute方法

if (isDiscarded()) {

return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");

}

if (isPaused()) {

return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");

}

//略...

}

public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {

LivyRestClient restClient = new LivyRestClient();

String result = restClient.livySubmitJobBatches(dataJson); //向Livy發送http請求

JSONObject resultJson = new JSONObject(result);

String state = resultJson.getString("state"); //得到Livy請求結果

final String livyTaskId = resultJson.getString("id");

while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.error.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.dead.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {

String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //獲取Spark job執行狀態

JSONObject stateJson = new JSONObject(statusResult);

if (!state.equalsIgnoreCase(stateJson.getString("state"))) {

logAppender.log("Livy status Result: " + stateJson.getString("state"));

}

state = stateJson.getString("state");

Thread.sleep(10*1000); //每10秒檢查一次結果

}

}

3. Livy 在 Kylin 中的應用

構建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 原本都是通過 Hive 客戶端(Cli 或 Beeline)進行構建的,引入 Livy 之後,Kylin 通過 Livy 來調用 SparkSQL 進行構建,提高了平表的構建速度。在引入 Livy 之後,Cube 的構建主要改變的是以下幾個步驟,對應的任務日誌輸出如下:

構建 Intermediate Flat Hive Table

△ 點擊圖片查看大圖

構建 Redistribute Flat Hive Table

△ 點擊圖片查看大圖

使用 Spark-Submit 的地方都用 Livy 的 Batch API 進行替換

1)構建 Cube

△ 點擊圖片查看大圖

2)轉換 Cuboid 為 HFile

△ 點擊圖片查看大圖

4. 引入 Livy 對 Kylin 的好處

無需準備 Spark 的客戶端配置,Kylin 部署更加輕量化。Kylin 節點系統壓力更低,無需在 Kylin 節點啟動 Spark 客戶端。構建 Flat Hive Table 更快,通過 Livy 可以使用 Spark SQL 構建平表,而 Spark SQL 要快於 Hive。提交 job 更快,job 狀態獲取更方便。

5. 如何在 Kylin 中啟用 Livy

在 Kylin 啟用 Livy 前,請先確保 Livy 能夠正常工作

1)在 Kylin.properties 中,加入如下配置,並重啟使之生效。

其中 livy-key.file 和 livy-arr.jars 地址之間不要有空格,否則可能會出不可預知的錯誤。

2)Cube 構建引擎選用 Spark。

常見問題

以下問題往往為使用不當和配置錯誤的原因,非 Kylin 本身存在的問題,此處僅為友情提示。

1. Table or view not found

輸出日誌:

解決方法:

2. livy request 400 error

解決方法:

3. NoClassDefFoundError

輸出日誌:

解決方法:

4. livy sql 執行錯誤

解決方法:

總結

Livy 本質上是在 Spark 上的 REST 服務,對於 Kylin cube 的構建沒有本質上的性能提升,但是通過引入 Livy,Kylin 能夠直接通過 Spark SQL 代替 Hive 構建 Flat Table,而且管理 Spark job 也更加方便。但是,Livy 當前也存在一些問題,比如使用較低或較高版本的 Spark 無法正常工作以及單點故障等問題,用戶可以考慮自身的實際場景選擇是否需要在 Kylin 中使用 Livy。

參考文章

https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/https://wiki.apache.org/incubator/LivyProposalhttps://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/