如何在 Kylin 中優雅地使用 Spark

前言

Kylin 用戶在使用 Spark的過程中,經常會遇到任務提交緩慢、構建節點不穩定的問題。為了更方便地向 Spark 提交、管理和監控任務,有些用戶會使用 Livy 作為 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了通過 Apache Livy 遞交 Spark 任務的新功能[KYLIN-3795],特此感謝滴滴靳國衛同學對此功能的貢獻。

Livy 介紹

Apache Livy 是一個基於 Spark 的開源 REST 服務,是 Apache 基金會的一個孵化項目,它能夠通過 REST 的方式將代碼片段或是序列化的二進制代碼提交到 Spark 集群中去執行。它提供瞭如下基本功能:

  • 提交 Scala、Python 或是 R 代碼片段到遠端的 Spark 集群上執行。
  • 提交 Java、Scala、Python 所編寫的 Spark 作業到遠端的 Spark 集群上執行。
如何在 Kylin 中優雅地使用 Spark

△ Apache Livy 架構

為什麼使用 Livy

1. 當前 Spark 存在的問題

Spark 當前支持兩種交互方式:

  • 交互式處理用戶使用 spark-shell 或 pyspark 腳本啟動 Spark 應用程序,伴隨應用程序啟動的同時,Spark 會在當前終端啟動 REPL(Read–Eval–Print Loop) 來接收用戶的代碼輸入,並將其編譯成 Spark 作業。
  • 批處理批處理的程序邏輯由用戶實現並編譯打包成 jar 包,spark-submit 腳本啟動 Spark 應用程序來執行用戶所編寫的邏輯,與交互式處理不同的是批處理程序在執行過程中用戶沒有與 Spark 進行任何的交互。

兩種方式都需要用戶登錄到 Gateway 節點上通過腳本啟動 Spark 進程,但是會出現以下問題:

  • 增加 Gateway 節點的資源使用負擔和故障發生的可能性。
  • 同時 Gateway 節點的故障會帶來單點問題,造成 Spark 程序的失敗。
  • 難以管理、審計以及與已有的權限管理工具的集成。由於 Spark 採用腳本的方式啟動應用程序,因此相比於 WEB 方式少了許多管理、審計的便利性,同時也難以與已有的工具結合,如 Apache Knox 等。
  • 將 Gateway 節點上的部署細節以及配置不可避免地暴露給了登陸用戶。

2. Livy 優勢

一方面,接受並解析用戶的 REST 請求,轉換成相應的操作;另一方面,它管理著用戶所啟動的所有的 Spark 集群。

Livy 具有如下功能:

  • 通過 Livy session 實時提交代碼片段與 Spark 的 REPL 進行交互。
  • 通過 Livy batch 提交 Scala、Java、Python 編寫的二進制包來提交批處理任務。
  • 多用戶能夠使用同一個服務器(支持用戶模擬)。
  • 能夠通過 REST 接口在任何設備上提交任務、查看任務執行狀態和結果。

Kylin with Livy

1. 引入 Livy 之前 Kylin 是如何使用 Spark 的

Spark 是在 Kylin v2.0 引入的,主要應用於 Cube 構建,構建過程介紹可以查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

下面是 SparkExecutable 類的 doWork 方法關於提交 Spark job 的一段代碼,我們可以看到 Kylin 會從配置中獲取 Spark job 包的路徑(默認為 $KYLIN_HOME/lib),通過本地指令的形式提交 Spark job,然後循環獲取 Spark job 的執行狀態和結果。我們可以看到 Kylin 單獨開了一個線程在本地向 Spark 客戶端發送來 job 請求並且循環獲取結果,額外增加了節點系統壓力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

String jobJar = config.getKylinJobJarPath(); //獲取job jar的路徑

//略...

final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //構建本地command

//略...

//創建指令執行線程

Callable callable = new Callable>() {

@Override

public Pair call() throws Exception {

Pair result;

try {

result = exec.execute(cmd, patternedLogger);

} catch (Exception e) {

logger.error("error run spark job:", e);

result = new Pair<>(-1, e.getMessage());

}

return result;

}

};

//略...

try {

Future> future = executorService.submit(callable);

Pair result = null;

while (!isDiscarded() && !isPaused()) {

if (future.isDone()) {

result = future.get(); //循環獲取指令執行結果

break;

} else {

Thread.sleep(5000); //每隔5秒檢查一次job執行狀態

}

}

//略...

} catch (Exception e) {

logger.error("Error run spark job:", e);

return ExecuteResult.createError(e);

}

//略...

}

2. Livy for Kylin 詳細解析

Livy 向 Spark 提交 job 一共有兩種,分別是 Session 和 Batch,Kylin 是通過 Batch 的方式提交 job 的,需要提前構建好 Spark job 對應的 jar 包並上傳到 HDFS 中,並且將配置項 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。

Batch 一共具有如下九種狀態:

public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down;}下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 通過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,然後通過 restClient 向 Livy 發送 Http 請求。在提交 job 之後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變為 shutting_down, error, dead, success 中的一種。每一次都是通過 Http 的方式發送請求,相比較於通過本地 Spark 客戶端提交任務,更加穩定而且減少了 Kylin 節點系統壓力。

下面是 SparkExecutableLivy 類的 doWork 方法和 LivyRestExecutor 類的 execute 方法關於提交 Spark job 的一段代碼,Kylin 通過 livyRestBuilder 讀取配置文件獲取 Spark job 的包路徑,然後通過 restClient 向 Livy 發送 Http 請求。在提交 job 之後會每隔 10 秒查詢一次 job 執行的結果,直到 job 的狀態變為 shutting_down, error, dead, success 中的一種。每一次都是通過 Http 的方式發送請求,相比較於通過本地 Spark 客戶端提交任務,更加穩定而且減少了 Kylin 節點系統壓力。

@Override

protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

//略...

livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job);

executor.execute(livyRestBuilder, patternedLogger); //調用LivyRestExecutor類的execute方法

if (isDiscarded()) {

return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");

}

if (isPaused()) {

return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");

}

//略...

}

public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) {

LivyRestClient restClient = new LivyRestClient();

String result = restClient.livySubmitJobBatches(dataJson); //向Livy發送http請求

JSONObject resultJson = new JSONObject(result);

String state = resultJson.getString("state"); //得到Livy請求結果

final String livyTaskId = resultJson.getString("id");

while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.error.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.dead.toString().equalsIgnoreCase(state)

&& !LivyStateEnum.success.toString().equalsIgnoreCase(state)) {

String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //獲取Spark job執行狀態

JSONObject stateJson = new JSONObject(statusResult);

if (!state.equalsIgnoreCase(stateJson.getString("state"))) {

logAppender.log("Livy status Result: " + stateJson.getString("state"));

}

state = stateJson.getString("state");

Thread.sleep(10*1000); //每10秒檢查一次結果

}

}

3. Livy 在 Kylin 中的應用

構建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 原本都是通過 Hive 客戶端(Cli 或 Beeline)進行構建的,引入 Livy 之後,Kylin 通過 Livy 來調用 SparkSQL 進行構建,提高了平表的構建速度。在引入 Livy 之後,Cube 的構建主要改變的是以下幾個步驟,對應的任務日誌輸出如下:

  • 構建 Intermediate Flat Hive Table
如何在 Kylin 中優雅地使用 Spark

△ 點擊圖片查看大圖

  • 構建 Redistribute Flat Hive Table
如何在 Kylin 中優雅地使用 Spark

△ 點擊圖片查看大圖

  • 使用 Spark-Submit 的地方都用 Livy 的 Batch API 進行替換

1)構建 Cube

如何在 Kylin 中優雅地使用 Spark

△ 點擊圖片查看大圖

2)轉換 Cuboid 為 HFile

如何在 Kylin 中優雅地使用 Spark

△ 點擊圖片查看大圖

4. 引入 Livy 對 Kylin 的好處

  • 無需準備 Spark 的客戶端配置,Kylin 部署更加輕量化。
  • Kylin 節點系統壓力更低,無需在 Kylin 節點啟動 Spark 客戶端。
  • 構建 Flat Hive Table 更快,通過 Livy 可以使用 Spark SQL 構建平表,而 Spark SQL 要快於 Hive。
  • 提交 job 更快,job 狀態獲取更方便。

5. 如何在 Kylin 中啟用 Livy

在 Kylin 啟用 Livy 前,請先確保 Livy 能夠正常工作

1)在 Kylin.properties 中,加入如下配置,並重啟使之生效。

其中 livy-key.file 和 livy-arr.jars 地址之間不要有空格,否則可能會出不可預知的錯誤。

2)Cube 構建引擎選用 Spark。

常見問題

以下問題往往為使用不當和配置錯誤的原因,非 Kylin 本身存在的問題,此處僅為友情提示。

1. Table or view not found

輸出日誌:

解決方法:

2. livy request 400 error

解決方法:

3. NoClassDefFoundError

輸出日誌:

解決方法:

4. livy sql 執行錯誤

解決方法:

總結

Livy 本質上是在 Spark 上的 REST 服務,對於 Kylin cube 的構建沒有本質上的性能提升,但是通過引入 Livy,Kylin 能夠直接通過 Spark SQL 代替 Hive 構建 Flat Table,而且管理 Spark job 也更加方便。但是,Livy 當前也存在一些問題,比如使用較低或較高版本的 Spark 無法正常工作以及單點故障等問題,用戶可以考慮自身的實際場景選擇是否需要在 Kylin 中使用 Livy。

參考文章

  1. https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
  2. https://wiki.apache.org/incubator/LivyProposal
  3. https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/


分享到:


相關文章: