Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

點擊右上方,關注開源中國OSC頭條號,獲取最新技術資訊

在前面的文章中,我們分別介紹了 Ignite 和 Spark 這兩種技術,從功能上對兩者進行了全面深入的對比。經過分析,可以得出這樣一個結論:兩者都很強大,但是差別很大,定位不同,因此會有不同的適用領域。

但是,這兩種技術也是可以互補的,那麼它們互補適用於場景是什麼呢?主要是這麼幾個方面:如果覺得 Spark 中的 SQL 等運行速度較慢,那麼 Ignite 通過自己的方式提供了對 Spark 應用進行進一步加速的解決方案,這方面可選的解決方案並不多,推薦開發者考慮,另外就是數據和狀態的共享,當然這方面的解決方案也有很多,並不是一定要用 Ignite 實現。

Ignite 原生提供了對 Spark 的支持,本文主要探討為何如何將 Ignite 和 Spark 進行集成。

1.將 Ignite 與 Spark 整合

整合這兩種技術會為 Spark 應用帶來若干明顯的好處:

  • 通過避免大量的數據移動,獲得真正可擴展的內存級性能;
  • 提高 RDD、DataFrame 和 SQL 的性能;
  • 在 Spark 作業之間更方便地共享狀態和數據。

下圖顯示瞭如何整合這兩種技術,並且標註了顯著的優勢:

Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

通過該圖,可以從整體架構的角度看到 Ignite 在整個 Spark 應用中的位置和作用。

Ignite 對 Spark 的支持主要體現為兩個方面,一個是 Ignite RDD,一個是 Ignite DataFrame。本文會首先聚焦於 Ignite RDD,之後再講講 Ignite DataFrame。

2.Ignite RDD

Ignite 提供了一個SparkRDD的實現,叫做IgniteRDD,這個實現可以在內存中跨 Spark 作業共享任何數據和狀態,IgniteRDD為 Ignite 中相同的內存數據提供了一個共享的、可變的視圖,它可以跨多個不同的 Spark 作業、工作節點或者應用,相反,原生的 SparkRDD 無法在 Spark 作業或者應用之間進行共享。

IgniteRDD作為 Ignite 分佈式緩存的視圖,既可以在 Spark 作業執行進程中部署,也可以在 Spark 工作節點中部署,也可以在它自己的集群中部署。因此,根據預配置的部署模型,狀態共享既可以只存在於一個 Spark 應用的生命週期內部(嵌入式模式),也可以存在於 Spark 應用的外部(獨立模式)。

Ignite 還可以幫助 Spark 應用提高 SQL 的性能,雖然 SparkSQL 支持豐富的 SQL 語法,但是它沒有實現索引。從結果上來說,即使在普通較小的數據集上,Spark 查詢也可能花費幾分鐘的時間,因為需要進行全表掃描。如果使用 Ignite,Spark 用戶可以配置主索引和二級索引,這樣可以帶來上千倍的性能提升。

2.1.IgniteRDD 示例

下面通過一些代碼以及創建若干應用的方式,展示 IgniteRDD 帶來的好處。

可以使用多種語言來訪問 Ignite RDD,這對於有跨語言需求的團隊來說有友好的,下邊代碼共包括兩個簡單的 Scala 應用和兩個 Java 應用。此外,會從兩個不同的環境運行應用:從終端運行 Scala 應用以及通過 IDE 運行 Java 應用。另外還會在 Java 應用中運行一些 SQL 查詢。

對於 Scala 應用,一個應用會用於往 IgniteRDD 中寫入數據,而另一個應用會執行部分過濾然後返回結果集。使用 Maven 將代碼構建為一個 jar 文件後在終端窗口中執行這個程序,下面是詳細的代碼:

object RDDWriter extends App {
val conf = new SparkConf().setAppName("RDDWriter")
val sc = new SparkContext(conf)
val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))
ic.close(true)
sc.stop()
}
object RDDReader extends App {
val conf = new SparkConf().setAppName("RDDReader")
val sc = new SparkContext(conf)
val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)
println("The count is " + greaterThanFiveHundred.count())
ic.close(true)
sc.stop()
}

在這個 Scala 的RDDWriter中,首先創建了包含應用名的SparkConf,之後基於這個配置創建了SparkContext,最後,根據這個SparkContext創建一個IgniteContext。創建IgniteContext有很多種方法,本例中使用一個叫做example-shared-rdd.xml的 XML 文件,該文件會結合 Ignite 發行版然後根據需求進行預配置。顯然,需要根據自己的環境修改路徑(Ignite 主目錄),之後指定 IgniteRDD 持有的整數值元組,最後,將從 1 到 1000 的整數值存入 IgniteRDD,數值的存儲使用了 10 個 parallel 操作。

在這個 Scala 的RDDReader中,初始化和配置與 Scala RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,然後關注存儲了多少大於 500 的值,答案最後會輸出。

關於IgniteContext和IgniteRDD的更多信息,可以看 Ignite 的文檔。

要構建 jar 文件,可以使用下面的 maven 命令:

mvn clean install

接下來,看下 Java 代碼,先寫一個 Java 應用往IgniteRDD中寫入多個記錄,然後另一個應用會執行部分過濾然後返回結果集,下面是RDDWriter的代碼細節:

public class RDDWriter {
public static void main(String args[]) {
SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
JavaIgniteContext<integer> igniteContext = new JavaIgniteContext<integer>(
sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

JavaIgniteRDD<integer> sharedRDD = igniteContext.<integer>fromCache("sharedRDD");
List<integer> data = new ArrayList<>(20);
for (int i = 1001; i <= 1020; i++) {
data.add(i);
}
JavaRDD<integer> javaRDD = sparkContext.<integer>parallelize(data);
sharedRDD.savePairs(javaRDD.<integer>mapToPair(new PairFunction<integer>() {
public Tuple2<integer> call(Integer val) throws Exception {
return new Tuple2<integer>(val, val);
}
}));
igniteContext.close(true);
sparkContext.close();
}
}
/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>/<integer>

在這個 Java 的RDDWriter中,首先創建了包含應用名和執行器數量的SparkConf,之後基於這個配置創建了SparkContext,最後,根據這個SparkContext創建一個IgniteContext。最後,往 IgniteRDD 中添加了額外的 20 個值。

在這個 Java 的RDDReader中,初始化和配置與 Java RDDWriter相同,也會使用同一個 XML 配置文件,應用會執行部分過濾,然後關注存儲了多少大於 500 的值,答案最後會輸出,下面是 Java RDDReader的代碼:

public class RDDReader {
public static void main(String args[]) {
SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
JavaIgniteContext<integer> igniteContext = new JavaIgniteContext<integer>(
sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);
JavaIgniteRDD<integer> sharedRDD = igniteContext.<integer>fromCache("sharedRDD");
JavaPairRDD<integer> greaterThanFiveHundred = sharedRDD.filter(new Function<tuple2>, Boolean>() {
public Boolean call(Tuple2<integer> tuple) throws Exception {
return tuple._2() > 500;
}
});
System.out.println("The count is " + greaterThanFiveHundred.count());
System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");

df.show();
igniteContext.close(true);
sparkContext.close();
}
}
/<integer>/<tuple2>/<integer>/<integer>/<integer>/<integer>/<integer>

到這裡就可以對代碼進行測試了。

2.2.運行應用

在第一個終端窗口中,啟動 Spark 的主節點,如下:

$SPARK_HOME/sbin/start-master.sh

在第二個終端窗口中,啟動 Spark 工作節點,如下:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

根據自己的環境,修改 IP 地址和端口號(ip:port)。

在第三個終端窗口中,啟動一個 Ignite 節點,如下:

$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

這裡使用了之前討論過的example-shared-rdd.xml文件。

在第四個終端窗口中,可以運行 Scala 版的 RDDWriter 應用,如下:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

根據自己的環境修改 IP 地址和端口(ip:port),以及 jar 文件的路徑(/path_to_jar_file)。

會產生如下的輸出:

The count is 500

這是期望的輸出。

接下來,殺掉 Spark 的主節點和工作節點,而 Ignite 節點仍然在運行中並且IgniteRDD對於其它應用仍然可用,下面會使用 IDE 通過 Java 應用接入IgniteRDD。

運行 Java 版RDDWriter會擴展之前存儲於 IgniteRDD 中的記錄列表,通過運行 Java 版RDDReader可以進行測試,它會產生如下的輸出:

The count is 520

這也是期望的輸出。

最後,SQL 查詢會在IgniteRDD中執行一個 SELECT 語句,返回範圍在 10 到 100 之間的最初 10 個值,輸出如下:

Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

結果正確。

3.IgniteDataframes

Spark 的 DataFrame API 為描述數據引入了模式的概念,Spark 通過表格的形式進行模式的管理和數據的組織。

DataFrame 是一個組織為命名列形式的分佈式數據集,從概念上講,DataFrame 等同於關係數據庫中的表,並允許 Spark 使用 Catalyst 查詢優化器來生成高效的查詢執行計劃。而 RDD 只是跨集群節點分區化的元素集合。

Ignite 擴展了 DataFrames,簡化了開發,改進了將 Ignite 作為 Spark 的內存存儲時的數據訪問時間,好處包括:

  • 通過 Ignite 讀寫 DataFrames 時,可以在 Spark 作業之間共享數據和狀態;
  • 通過優化 Spark 的查詢執行計劃加快 SparkSQL 查詢,這些主要是通過 IgniteSQL 引擎的高級索引以及避免了 Ignite 和 Spark 之間的網絡數據移動實現的。

3.1.IgniteDataframes 示例

下面通過一些代碼以及搭建幾個小程序的方式,瞭解如何通過 Ignite DataFrames 整合 Ignite 與 Spark。

一共會寫兩個 Java 的小應用,然後在 IDE 中運行,還會在這些 Java 應用中執行一些 SQL 查詢。

一個 Java 應用會從 JSON 文件中讀取一些數據,然後創建一個存儲於 Ignite 的 DataFrame,這個 JSON 文件 Ignite 的發行版中已經提供,另一個 Java 應用會從 Ignite 的 DataFrame 中讀取數據然後使用 SQL 進行查詢。

下面是寫應用的代碼:

public class DFWriter {
private static final String CONFIG = "config/example-ignite.xml";
public static void main(String args[]) {
Ignite ignite = Ignition.start(CONFIG);
SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
Dataset peopleDF = spark.read().json(
resolveIgnitePath("resources/people.json").getAbsolutePath());
System.out.println("JSON file contents:");
peopleDF.show();
System.out.println("Writing DataFrame to Ignite.");
peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();
System.out.println("Done!");
Ignition.stop(false);
}
}

在DFWriter中,首先創建了SparkSession,它包含了應用名,之後會使用spark.read().json()讀取 JSON 文件並且輸出文件內容,下一步是將數據寫入 Ignite 存儲。下面是DFReader的代碼:

public class DFReader {
private static final String CONFIG = "config/example-ignite.xml";
public static void main(String args[]) {
Ignite ignite = Ignition.start(CONFIG);
SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
System.out.println("Reading data from Ignite table.");
Dataset peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();

peopleDF.createOrReplaceTempView("people");
Dataset sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");
sqlDF.show();
System.out.println("Done!");
Ignition.stop(false);
}
}

在DFReader中,初始化和配置與DFWriter相同,這個應用會執行一些過濾,需求是查找所有的 id > 0 以及 < 6 的人,然後輸出結果。

在 IDE 中,通過下面的代碼可以啟動一個 Ignite 節點:

public class ExampleNodeStartup {
public static void main(String[] args) throws IgniteException {
Ignition.start("config/example-ignite.xml");
}
}

到此,就可以對代碼進行測試了。

3.2.運行應用

首先在 IDE 中啟動一個 Ignite 節點,然後運行DFWriter應用,輸出如下:

Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

如果將上面的結果與 JSON 文件的內容進行對比,會顯示兩者是一致的,這也是期望的結果。

下一步運行DFReader,輸出如下:

Ignite 與 Spark 都很強,那如果把它們整合起來會怎樣?

這也是期望的輸出。

4.總結

通過本文,會發現 Ignite 與 Spark 的集成很簡單,也可以看到如何從多個環境中使用多個編程語言輕鬆地訪問IgniteRDD。可以對IgniteRDD進行數據的讀寫,並且即使 Spark 已經關閉狀態也能通過 Ignite 得以保持,也看到了通過 Ignite 進行 DataFrame 的讀寫。讀者可以輕鬆嘗試一下。

如果想要這些示例的源代碼,可以從這裡下載。

作者

李玉珏,架構師,有豐富的架構設計和技術研發團隊管理經驗,社區技術翻譯作者以及撰稿人,開源技術貢獻者。Apache Ignite 技術中文文檔翻譯作者,長期在國內進行 Ignite 技術的推廣/技術支持/諮詢工作。

  • 互聯網技術相關,包括但不限於開發語言、網絡、數據庫、架構、運維、前端、DevOps(DevXXX)、AI、區塊鏈、存儲、移動、安全、技術團隊管理等內容。
  • 文章不需要首發,可以是已經在開源中國博客或網上其它平臺發佈過的。但是鼓勵首發,首發內容被收錄可能性較大。
  • 如果你是記錄某一次解決了某一個問題(這在博客中佔絕大比例),那麼需要將問題的前因後果描述清楚,最直接的就是結合圖文等方式將問題復現,同時完整地說明解決思路與最終成功的方案。
  • 如果你是分析某一技術理論知識,請從定義、應用場景、實際案例、關鍵技術細節、觀點等方面,對其進行較為全面地介紹。
  • 如果你是以實際案例分享自己或者公司對諸如某一架構模型、通用技術、編程語言、運維工具的實踐,那麼請將事件相關背景、具體技術細節、演進過程、思考、應用效果等方面描述清楚
  • 其它未盡 case 具體情況具體分析,不虛的,文章投過來試試先,比如我們並不拒絕就某個熱點事件對其進行的報導、深入解析。

重要說明

  • 作者需要擁有所投文章的所有權,不能將別人的文章拿過來投遞。
  • 投遞的文章需要經過審核,如果開源中國編輯覺得需要的話,將與作者一起進一步完善文章,意在使文章更佳、傳播更廣。
  • 文章版權歸作者所有,開源中國獲得文章的傳播權,可在開源中國各個平臺進行文章傳播,同時保留文章原始出處和作者信息,可在官方博客中標原創標籤。

↓↓↓


分享到:


相關文章: