Apache Spark,Parquet和麻煩的Null

關於類型安全性的經驗教訓,並承擔過多

介紹

在將SQL分析ETL管道遷移到客戶端的新Apache Spark批處理ETL基礎結構時,我注意到了一些奇特的東西。 開發的基礎結構具有可為空的DataFrame列架構的概念。 乍看起來似乎並不奇怪。 大多數(如果不是全部)SQL數據庫都允許列為可空或不可空,對嗎? 讓我們研究一下在創建Spark DataFrame時,這種看似明智的概念為什麼會帶來問題。

<code>from pyspark.sql import types
schema = types.StructType([
types.StructField("index", types.LongType(), False),
types.StructField("long", types.LongType(), True),
])
df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df.printSchema()/<code>

此代碼塊在將為空的DataFrame df上強制實施模式。 df.printSchema()將為我們提供以下內容:

root
|-- index: long (nullable = false)
|-- long: long (nullable = true)

可以看出,內存中的DataFrame繼承了已定義模式的可空性。 但是,這有點誤導。 Spark中的列可空性是一個優化語句; 不是對象類型的強制。

在本文中,我們將主要介紹通過Parquet創建和保存DataFrame的行為。 實木複合地板的文件格式和設計將不作深入介紹。

Nullable對DataFrame列意味著什麼?

…當您定義一個架構,在該架構中所有列均聲明為不具有空值時– Spark不會強制執行該操作,並且會很樂意讓空值進入該列。 可為空的信號只是為了幫助Spark SQL優化處理該列。 如果列中的空值不應包含空值,則可能會得到錯誤的結果,或者會看到難以調試的奇怪異常。 —《 Apache Spark數據工程師指南》; 第74頁

當一列被聲明為不具有空值時,Spark不會強制執行此聲明。 無論用戶定義的調用代碼是否聲明為可空,Spark都不會執行空檢查。 列的可為空特性是與Catalyst Optimizer簽訂的一項合同,該協議不會產生空數據。 如有任何疑問,健康的做法是始終將其設置為true。 在像JSON / CSV這樣的實例中將默認值默認為null以支持更寬鬆類型的數據源是有意義的。 更重要的是,忽略可空性是Spark的保守選擇。 Apache Spark無法控制要查詢的數據及其存儲,因此默認為代碼安全行為。 例如,始終可以以特殊方式將文件添加到DFS(分佈式文件服務器),這將違反任何已定義的數據完整性約束。

從Parquet創建DataFrame

從Parquet文件路徑創建DataFrame對用戶來說很容易。 可以通過調用實例化DataFrameReader的SparkSession.read.parquet()或SparkSession.read.load('path / to / data.parquet')來完成。¹在將外部數據轉換為DataFrame的過程中,數據 由Spark推斷模式,併為攝取Parquet零件文件的Spark作業設計查詢計劃。

調用模式推斷時,將設置一個標誌來回答問題:"是否應合併所有Parquet零件文件中的模式?"當多個Parquet文件具有不同的架構時,可以將它們合併。默認行為是不合並架構。²然後區分出解析架構所需的文件。如果不需要合併,Spark總是首先嚐試摘要文件。在這種情況下,_common_metadata比_metadata更可取,因為它不包含行組信息,並且對於具有許多行組的大型Parquet文件而言,它可能要小得多。如果摘要文件不可用,則行為是回退到隨機的零件文件。³在默認情況下(未將架構合併標記為必要),Spark將首先嚐試任意_common_metadata文件,然後回退至任意_metadata,最後是任意部分文件,並假定(正確或不正確)方案是一致的。設置了要合併的文件後,該操作將由分佈式Spark作業完成。⁴請務必注意,數據架構始終斷言為可空值。簡而言之,這是因為QueryPlan()重新創建了保存架構的StructType,但強制所有包含的字段為空。

Parquet的書寫方式

"…編寫Parquet文件時,出於兼容性原因,所有列都將自動轉換為可為空。" -Spark Docs

因此,說您找到了在Spark作業的列級強制執行null的一種方法。 不幸的是,一旦您寫信給Parquet,該執行就失效了。 為了從更高層次描述SparkSession.write.parquet(),它將從給定的DataFrame中創建一個DataSource,實施為Parquet提供的默認壓縮,構建優化的查詢,並使用可為空的模式複製數據。 可以將其大致描述為DataFrame創建的逆過程。

一些實驗

在最後一部分中,我將提供一些有關默認行為的預期示例。

在調查對Parquet的寫入時,有兩種選擇:

· 在建立DataFrame上使用手動定義的架構

<code>schema = types.StructType([
types.StructField("index", types.LongType(), False),
types.StructField("long", types.LongType(), True),
])
data = [
(1, 6),
(2, 7),
(3, None),
(4, 8),
(5, 9)
]
df_w_schema = sqlContext.createDataFrame(data, schema)
df_w_schema.collect()
df_w_schema.write.parquet('nullable_check_w_schema')
df_parquet_w_schema = sqlContext.read.schema(schema).parquet('nullable_check_w_schema')

df_parquet_w_schema.printSchema()/<code>

此處完成的工作是定義模式和數據集。 在寫入之前,該模式的可空性得到了加強。 但是,一旦將DataFrame寫入Parquet,就可以看到所有列的空性都從窗口中消失了,就像從傳入的DataFrame中獲得printSchema()的輸出一樣。

root
|-- index: long (nullable = true)
|-- long: long (nullable = true)

2.未定義架構

<code>df_wo_schema = sqlContext.createDataFrame(data)
df_wo_schema.collect()
df_wo_schema.write.mode('overwrite').parquet('nullable_check_wo_schema')
df_parquet_wo_schema = sqlContext.read.parquet('nullable_check_wo_schema')
df_parquet_wo_schema.printSchema()/<code>

與1一樣,我們定義了相同的數據集,但是缺少"強制"模式。 結果可以看作是

root
|-- _1: long (nullable = true)
|-- _2: long (nullable = true)

無論是否聲明架構,都不會強制實現可空性。

腳註

[1] DataFrameReader是DataFrame與外部存儲之間的接口。

[2] PARQUET_SCHEMA_MERGING_ENABLED:為true時,Parquet數據源合併從所有數據文件收集的模式,否則從摘要文件或隨機數據文件中選擇該模式(如果沒有可用的摘要文件)。

[3]摘要文件中存儲的元數據將從所有零件文件中合併。 但是,對於用戶定義的鍵值元數據(我們在其中存儲Spark SQL模式),如果鍵與單獨的零件文件中的不同值相關聯,Parquet不知道如何正確合併它們。 發生這種情況時,Parquet停止生成摘要文件,這意味著存在摘要文件時,則:

一種。 所有部分文件都具有完全相同的Spark SQL模式或orb。 有些部分文件根本不在鍵值元數據中包含Spark SQL模式(因此它們的模式可能彼此不同)。

Spark扮演悲觀主義者,並考慮了第二種情況。 這意味著如果用戶需要合併的架構,並且必須分析所有零件文件以進行合併,則摘要文件將不受信任。

[4]不考慮地點。 此優化對於S3記錄系統主要有用。 由於S3節點的計算限制,S3文件元數據操作可能很慢,並且本地性不可用。

並行性受合併文件的數量限制。 因此,並行度為2的SparkSession只有一個合併文件,它將使用一個執行程序啟動一個Spark作業。

(本文翻譯自Wesley Hoffman的文章《Apache Spark, Parquet, and Troublesome Nulls》,參考:https://medium.com/@weshoffman/apache-spark-parquet-and-troublesome-nulls-28712b06f836)


分享到:


相關文章: