這次,我將嘗試解釋如何將Apache Arrow與Apache Spark和Python結合使用。 首先,讓我分享有關此開源項目的一些基本概念。
Apache Arrow是用於內存數據的跨語言開發平臺。 它為平面和分層數據指定了一種與語言無關的標準化列式存儲格式,該格式組織用於在現代硬件上進行有效的分析操作。 [Apache箭頭頁面]
簡而言之,它促進了許多組件之間的通信,例如,使用Python(熊貓)讀取實木複合地板文件並轉換為Spark數據框,Falcon Data Visualization或Cassandra,而無需擔心轉換。
一個好問題是問數據在內存中的外觀如何? 好吧,Apache Arrow利用列緩衝區來減少IO並加快分析處理性能。
在我們的例子中,我們將使用pyarrow庫執行一些基本代碼並檢查一些功能。 為了安裝,我們有兩個使用conda或pip命令*的選項。
<code>conda
install -c conda-forge pyarrow
pip
install pyarrow
/<code>
*建議在Python 3環境中使用conda。
帶有HDFS的Apache Arrow(遠程文件系統)
Apache Arrow附帶了到Hadoop File System的基於C ++的接口的綁定。 這意味著我們可以從HDFS讀取或下載所有文件,並直接使用Python進行解釋。
連接
主機是名稱節點,端口通常是RPC或WEBHDFS,允許使用更多參數,例如user,kerberos ticket。 強烈建議您閱讀所需的環境變量。
<code>import
pyarrow as pa
host
='1970.x.x.x'
port
=8022
fs
=pa.hdfs.connect(host, port)
/<code>
· 如果您的連接位於數據或邊緣節點的前面,則可以選擇使用
<code>fs
= pa.hdfs.connect()/<code>
將Parquet文件寫入HDFS
<code>pq
.write_to_dataset
(table, root_path='dataset_name'
, partition_cols=['one'
,'two'
], filesystem=fs)/<code>
從HDFS讀取CSV
<code>import
pandasas
pdfrom
pyarrowimport
csvimport
pyarrowas
pa fs = pa.hdfs.connect()with
fs.open('iris.csv'
,'rb'
)as
f: df = pd.read_csv(f, nrows =10
) df.head()/<code>
從HDFS讀取Parquet文件
有兩種形式可以從HDFS讀取實木複合地板文件
使用Pandas和Pyarrow引擎
<code>import
pandasas
pd pdIris = pd.read_parquet('hdfs:///iris/part-00000–27c8e2d3-fcc9–47ff-8fd1–6ef0b079f30e-c000.snappy.parquet'
, engine='pyarrow'
) pdTrain.head()/<code>
Parquet
<code>import pyarrow.parquet as pqpath
='hdfs:///iris/part-00000–71c8h2d3-fcc9–47ff-8fd1–6ef0b079f30e-c000.snappy.parquet'
table
= pq.read_table(path
)table
.schema df =table
.to_pandas() df.head()/<code>
其他文件擴展名
由於我們可以存儲任何類型的文件(SAS,STATA,Excel,JSON或對象),因此Python可以輕鬆解釋其中的大多數文件。 為此,我們將使用open函數,該函數返回一個緩衝區對象,許多pandas函數(如read_sas,read_json)都可以接收該緩衝區對象作為輸入,而不是字符串URL。
SAS
<code>import
pandasas
pdimport
pyarrowas
pa fs = pa.hdfs.connect()with
fs.open('/datalake/airplane.sas7bdat'
,'rb'
)as
f: sas_df = pd.read_sas(f, format='sas7bdat'
) sas_df.head()/<code>
電子表格
<code>import
pandasas
pdimport
pyarrowas
pa fs = pa.hdfs.connect()with
fs.open('/datalake/airplane.xlsx'
,'rb'
)as
f: g.download('airplane.xlsx'
) ex_df = pd.read_excel('airplane.xlsx'
)/<code>
JSON格式
<code>import
pandasas
pdimport
pyarrowas
pa fs = pa.hdfs.connect()with
fs.open('/datalake/airplane.json'
,'rb'
)as
f: g.download('airplane.json'
) js_df = pd.read_json('airplane.json'
)/<code>
從HDFS下載文件
如果我們只需要下載文件,Pyarrow為我們提供了下載功能,可以將文件保存在本地。
<code>import
pandasas
pdimport
pyarrowas
pa fs = pa.hdfs.connect()with
fs.open('/datalake/airplane.cs'
,'rb'
)as
f: g.download('airplane.cs'
)/<code>
上傳文件到HDFS
如果我們只需要下載文件,Pyarrow為我們提供了下載功能,可以將文件保存在本地。
<code>import
pyarrowas
pa fs = pa.hdfs.connect()with
open('settings.xml'
)as
f: pa.hdfs.HadoopFileSystem.upload(fs,'/datalake/settings.xml'
, f)/<code>
Apache Arrow with Pandas(本地文件系統)
將Pandas Dataframe轉換為Apache Arrow Table
<code>import
numpyas
npimport
pandasas
pdimport
pyarrowas
pa df = pd.DataFrame({'one'
: [20
, np.nan,2.5
],'two'
: ['january'
,'february'
,'march'
],'three'
: [True
,False
,True
]},index=list('abc'
)) table = pa.Table.from_pandas(df)/<code>
Pyarrow表到Pandas數據框
<code>df_new
= table.to_pandas()/<code>
讀取CSV
<code>from
pyarrowimport
csv fn ='data/demo.csv'
table = csv.read_csv(fn) Ω/<code>
從Apache Arrow編寫Parquet文件
<code>import
pyarrow.parquetas
pq pq.write_table(table,'example.parquet'
)/<code>
讀取Parquet文件
<code>table2
= pq.read_table('example.parquet'
) table2/<code>
從parquet文件中讀取一些列
<code>table2
= pq.read_table('example.parquet'
, columns=['one'
,'three'
])/<code>
從分區數據集讀取
<code>dataset = pq.ParquetDataset('dataset_name_directory/'
)table
= dataset.read
()table
/<code>
將Parquet文件轉換為Pandas DataFrame
<code>'example.parquet'
, columns=['two'
]).to_pandas() pdf/<code>
避免Pandas指數
<code>table
= pa.Table.from_pandas(df, preserve_index=False) pq.write_table(table
,'example_noindex.parquet'
) t = pq.read_table('example_noindex.parquet'
) t.to_pandas()/<code>
檢查元數據
<code>parquet_file
= pq.ParquetFile('example.parquet'
) parquet_file.metadata/<code>
查看數據模式
<code>parquet_file
.schema
/<code>
時間戳記
請記住,Pandas使用納秒,因此您可以以毫秒為單位截斷兼容性。
<code>pq.write_table(table,where
, coerce_timestamps='ms'
) pq.write_table(table,where
, coerce_timestamps='ms'
, allow_truncated_timestamps=True)/<code>
壓縮
默認情況下,儘管允許其他編解碼器,但Apache arrow使用快速壓縮(壓縮程度不高,但更易於訪問)。
<code>pq.write_table(table,where
, compression='snappy'
) pq.write_table(table,where
, compression='gzip'
) pq.write_table(table,where
, compression='brotli'
) pq.write_table(table,where
, compression='none'
)/<code>
另外,在一個表中可以使用多個壓縮
<code>pq.write_table(table
,'example_diffcompr.parquet'
, compression={b'one'
:'snappy'
, b'two'
:'gzip'
})/<code>
編寫分區的Parquet表
<code>df = pd.DataFrame({'one'
: [1
,2.5
,3
],'two'
: ['Peru'
,'Brasil'
,'Canada'
],'three'
: [True
,False
,True
]}, index=list
('abc'
)) table = pa.Table.from_pandas(df) pq.write_to_dataset(table, root_path='dataset_name'
,partition_cols=['one'
,'two'
])/<code>
· 兼容性說明:如果您使用pq.write_to_dataset創建一個供HIVE使用的表,則分區列值必須與您正在運行的HIVE版本的允許字符集兼容。
帶有Apache Spark的Apache Arrow
Apache Arrow自2.3版本以來已與Spark集成在一起,它很好地演示瞭如何優化時間以避免序列化和反序列化過程,並與其他庫進行了集成,例如Holden Karau上關於在Spark上加速Tensorflow Apache Arrow的演示。
存在其他有用的文章,例如Brian Cutler發表的文章以及Spark官方文檔中的非常好的示例
Apache Arrow的一些有趣用法是:
· 加快從Pandas數據框到Spark數據框的轉換
· 加快從Spark數據框到Pandas數據框的轉換
· 與Pandas UDF(也稱為矢量化UDF)一起使用
· 使用Apache Spark優化R
第三項是下一篇文章的一部分,因為這是一個非常有趣的主題,目的是在不損失性能的情況下擴展Pandas和Spark之間的集成,對於第四項,我建議您閱讀該文章(於2019年發佈!)以獲得 瞭解更多。
讓我們先測試Pandas和Spark之間的轉換,而不進行任何修改,然後再使用Arrow。
<code>from pyspark.sql import SparkSession warehouseLocation ="/antonio"
spark = SparkSession\ .builder.appName("demoMedium"
)\ .config("spark.sql.warehouse.dir"
, warehouseLocation)\ .enableHiveSupport()\ .getOrCreate() from pyspark.sql.functions import rand df = spark.range(1 << 22).toDF("id"
).withColumn("x"
, rand()) df.printSchema() pdf = df.toPandas()spark.conf.set("spark.sql.execution.arrow.enabled"
,"true"
) %time pdf = df.toPandas() pdf.describe()/<code>
結果顯然是使用Arrow減少時間轉換更方便。
如果我們需要測試相反的情況(Pandas來激發df),那麼我們也會及時發現優化。
<code>%time
df = spark.createDataFrame(pdf) spark.conf.set("spark.sql.execution.arrow.enabled"
,"false"
) %time
df = spark.createDataFrame(pdf) df.describe().show()/<code>
結論
本文的目的是發現並瞭解Apache Arrow以及它如何與Apache Spark和Pandas一起使用,我也建議您查看It的官方頁面,以進一步瞭解CUDA或C ++等其他可能的集成,如果您想更深入地瞭解它, 並瞭解有關Apache Spark的更多信息,我認為Spark:權威指南是一本很好的書。
附註:如果您有任何疑問,或者想澄清一些問題,可以在Twitter和LinkedIn上找到我。 我最近發表了Apache Druid的簡要介紹,這是一個新的Apache項目,非常適合分析數十億行。
(本文翻譯自Antonio Cachuan的文章《A gentle introduction to Apache Arrow with Apache Spark and Pandas》,參考:
https://towardsdatascience.com/a-gentle-introduction-to-apache-arrow-with-apache-spark-and-pandas-bb19ffe0ddae)