Spark Datasets 介紹

背景

開發者一直很喜歡Spark提供的簡簡潔而強大的API接口, 最小的編碼工作實現複雜的分析過程。

Databricks 通過引入DataFrames 和 Spark SQL 來改善Spark的可用性和性能。

DataFrames 和 Spark SQL 都是基於結構化的數據(比如數據庫表,JSON文件等)的上層API, 通過他們Spark可以自動優化存儲(Storage)和計算(computation)。

這些優化得益於Catalyst optimizer 和 Tungsten。 但是Spark的RDD API做不到這些,比如操作原始二進制形式的數據。

Datasets,他是DataFrame API的擴展, 提供了類型安全的,面向對象的變成接口。

Spark1.6 包含了Datasets的API預覽版本, 也是下幾個版本開發重點。

Datasets 利用Spark 的 Catalyst optimizer解析表達式和數據field 來生成執行計劃。

Datasets 還支持Tungsten的快速內存編碼(fast in-memory encoding).

Dataset繼承了這些優點並且是類型安全的,這意味著在編譯時能夠檢查錯誤。同時還擁有面向對象風格的接口。

Datasets 使用

Datasets 是強類型的、不可變的對象集合,集合中的對象映射為關係型數據的schema。

Datasets 的核心是一個新的概念Encoder,它負責JVM對象和tabular representation之間的轉換。

tabular representation 使用Spark的Tungsten 二進制格式,允許直接操作序列化的數據,提升內存使用。

Spark1.6 支持自動為一下類型生成encoder, 基本類型(比如:String, Integer ,Long), Scala case class, JavaBeans.

Datasets的API和RDD非常相似,提供了很多相同轉換函數(比如:map, flatMap, filter)。

下面的代碼,實現了從文本中讀取行,然後分割成單詞。

RDD:

Spark Datasets 介紹

Datasets:

Spark Datasets 介紹

RDD 和 Datasets 都可以很輕鬆通過lambda函數實現這些轉換。編譯器和IDE知道用那種類型,並且可以在構造數據管道時提供幫助提示和錯誤信息。

雖然這些高層次的代碼在句法上很相似,但是使用Datasets可以獲得關係型執行引擎的能力。

比如,執行一個聚合操作

RDD:

Spark Datasets 介紹

Datasets:

Spark Datasets 介紹

Datasets

Datasets 版本不止在代碼上更簡潔,執行速度上也更快。

從下面的對比圖,可以看出Datasets比RDD快。

相比之下,使用RDD要獲得同樣的性能,需要用戶手動考慮並行的執行計算。

Spark Datasets 介紹

Datasets API 的另一個優點就是降低內存使用。

因為Spark 知道Datasets中數據的結構,所以在緩存Dataset時能夠進行更多的優化。

下圖比較了RDD和Datasets緩存幾百萬字符串使用的內存。緩存,對於雙方來說都能夠導致性能的提升。但是,由於Dataset Encoder給Spark提供更多關於數據存儲的信息,所以緩存佔用空間更小,大概比RDD小4.5倍。

Spark Datasets 介紹

Encoders 光速序列化

Encoder 是被優化過的,在序列化和反序列化時,使用運行時代碼生成器構建bytecode。

所以速度明顯快於java或kryo序列化器。

除了速度,Encoder序列化佔用的空間更小(2倍),降低了網絡傳輸成本。此外,序列化的數據是Tungsten二進制格式,這意味著許多時候都可以直接操作數據,而不需要實例化整個對象。Spark支持的Encoder有原始類型(如String, Integer, Long),Scala case class和Java bean。未來會加入可自定義類型的Encoder。

Spark Datasets 介紹

無縫支持半結構化數據

Encoder還可以作為一個強大的橋樑,連接半結構化的格式(例如JSON)和類型安全的語言如Java和Scala。

例如,以下關於大學的數據集:

{"name": "UC Berkeley", "yearFounded": 1868, numStudents: 37581}

{"name": "MIT", "yearFounded": 1860, numStudents: 11318}

你可以簡單地定義一個類,並將輸入數據映射到類上,而不是人工提取字段,再把它們轉換成期望的類型。Spark會自動識別名稱和類型。

Spark Datasets 介紹

Encoder在映射的過程中,會先檢查定義的類的類型是否與數據相符,如果不相符,則能夠提供有用的錯誤信息,防止以不正確的方式處理TB級的數據。例如,如果我們使用的數據類型太小,轉換到一個對象時會導致截斷,這時Analyzer會拋出AnalysisException,如:如果numStudents是byte類型的,當有數據超過255時就會報錯。

case class University(numStudents: Byte)

val schools = sqlContext.read.json("/schools.json").as[University]

org.apache.spark.sql.AnalysisException: Cannot upcast yearFounded from bigint to smallint as it may truncate

執行映射時,Encoder將自動處理複雜類型,包括嵌套類、array和map。

Java和Scala的簡單API

Dataset的另一個目標是為Scala和java提供統一的接口。這種統一對應Java用戶來說是個好消息,因為它確保了Java的接口不會落後於Scala。代碼實例也會變得更通用,也不同處理輸入類型的少許區別了。對於Java用戶唯一不同的是需要指定Encoder,因為編譯器不提供類型信息。例如,如果想要處理JSON數據使用Java可以這樣做:

Spark Datasets 介紹

下一步展望

Dataset是一個新的API,它很容易與RDD和現有Spark項目相融合。只需要通過Dataset的rdd方法,就能將Dataset轉換成RDD。

從長遠來看,我們希望,Dataset可以成為編程時的首選。

Spark 2.0 版本的Dataset,我們計劃做以下改進:

  • 性能優化:在很多場景中,現在的Datasets實現不能利用額外信息增加性能,可能比RDD還慢 。在接下來的幾個版本中,我們將致力於改善這些新API的性能。
  • 自定義Encoder:開放自定義Encoder的API。
  • Python支持。
  • 統一DataFrame和Dataset:為了保證兼容性,DataFrame和Dataset目前不是繼承自共同的父類。Spark 2.0 版本,我們將會統一這些抽象,並儘可能不改變現有的 API,使得開發庫時能夠更容易的兼容DataFrame和Dataset。

  • 如果你想嘗試一下Dataset。我們提供的以下例子:Working with Classes, Word Count。https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Classes.html


分享到:


相關文章: