大數據Join指南-Python,SQL,Pandas,Spark,Dask

如何最佳地Join大型數據集-多種方法的指南

大數據Join指南-Python,SQL,Pandas,Spark,Dask

The Big Data — Photo by Patrick Lindenberg on Unsplash


旋轉磁盤,魚片或使用大數據有很多方法-這是快速指南。

樣本數據集

Kaggle電影數據庫[8]-25,000萬條額定值,用於45,000部電影,數據分為5個文件:

大數據Join指南-Python,SQL,Pandas,Spark,Dask

© Foostack.Ai


要查找電影的最高平均收視率,您需要將指向元數據的鏈接添加到收視率中:

SELECT m.title, avg(r.rating) FROM links l INNER JOIN to metas m ON m.imdbId=l.imdbId INNER JOIN to ratings ON r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5

老派SQL RDBMS

經典的方法是加載數據庫,建立索引並運行前面提到的SQL(或使用下面的經典SQL):

SELECT m.title, avg(r.rating) FROM links l, metas m, ratings r WHERE m.imdbId=l.imdbId and r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5

RDBMS的加入通過3種主要方式進行,並帶有一些平臺變體:

· 嵌套循環-對錶A的每一行查找表B中的匹配鍵。B上的索引使查找為O(A * log B),否則聯接為SLOW-O(A * B)。

· 哈希聯接—通過查找關鍵字構建表B的哈希/映射,使聯接查找非常快— O(A * 1)

· 合併排序-對兩個表進行排序並在一次通過中合併,除非預先排序,否則不會超快-O(A + B + A log A + B log B)→O(A log A + B log B)

使用Sqlite3 [1]-創建數據庫,表和加載非常容易:

<code>import sqlite3 

import csv

con = sqlite3.connect('mydatabase.db')
c = con.cursor()

# load file into data array
with open(file, 'r', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
fields = next(csvreader) # strip header
for row in csvreader:
data.append(row)

# create table
c.execute("CREATE TABLE ratings(movieId text, rating float")

# execute in batch, commit in 500's
c.execute("INSERT INTO ratings(movieId, rating) VALUES (?,?)", data)

# create indexes
c.execute("create index ridx on ratings(movieId, rating)")/<code>

加載和查詢2600萬行數據大約需要10分鐘的時間(也許我們可以組合調整前兩個步驟..)

· 從csv /磁盤加載-35秒

· 插入數據庫— 8分鐘

· 添加索引— 30秒

· 按查詢分組— 20秒

您還可以使用sqlite3命令行來測試和查看查詢執行計劃,如下所示。 在聯接列上添加一些額外的索引後,我們的目標查詢大約需要21秒才能執行。

大數據Join指南-Python,SQL,Pandas,Spark,Dask

SQLite query execution (from cmdline tools)

使用SQL DB是可擴展的,但是比較老套。 接下來,我們將嘗試行家技巧。

Python —適用於終極黑客

我們可以節省數據庫開銷,編寫數據負載,並在Python中直接繁瑣地進行Join:

<code># double nested loop join
def merge(links, ratings, metas):
merged = []
for link in links[1]:
mlink = link.copy()
mlink += (['','']) # rating and name
for rating in ratings[1]:
if (mlink[0] == rating[1]):
mlink[3] = rating[2]
break
for meta in metas[1]:
if (mlink[1] == meta[6]): # stripped tt off meta imdb columns FYI
mlink[4] = meta[20]
break
merged.append(mlink)
return merged/<code>


" merge()"是一個沒有索引的嵌套循環連接。 循環必須掃描metas和links表以獲取每個等級(26m * 50k * 2)。 100k條評論需要5分鐘,因此2600萬條評論將需要很長時間...

" merge_wmap()"是一個哈希聯接-我們為元數據和鏈接構建了一個Map映射,從而產生了O(n * 1)性能。 加入2600萬行只需3秒!

<code># optimized single loop + w/ hash lookups of ratings and metadata
def merge_wmap(links, ratings, metas) -> []:
ratings_map = make_map(ratings[1], 1)
metas_map = make_map(metas[1], 6)
merged = []
for link in links[1]:
mlink = link.copy() + ['','']
mlink[3] = ratings_map.get(link[0])[2] if ratings_map.get(link[0]) else ''
mlink[4] = metas_map.get(link[1])[20] if metas_map.get(link[1]) else ''
merged.append(mlink)
return merged/<code>


我沒有實現分組過濾器-相對較快(需要26m行結果的排序-掃描-結合)-我估計加載和處理的總時間約為0:53

· 將原始CSV加載到數組-35秒

· 手動合併索引— 3秒

· 手動分組和過濾器-15秒(待定〜估算)

原始Python很快但很醜。 全速本地PC並完全控制所有錯誤。

Pandas數據框

Pandas [2]是Python上用於數據準備的事實上的軟件包。 極其快速且易於使用,我們可以用最少的代碼進行加載,連接和分組:

<code>import pandas as pd

# load files
ratings_df = pd.read_csv('ratings.csv')

metas_df = pd.read_csv('movies_metadata.csv')
links_df = pd.read_csv('links.csv')

# 1st join
merged_df = pd.merge(links_df[['movieId','imdbId']],
ratings_df[['movieId','rating']], on='movieId', how='right',
validate='one_to_many')

# 2nd join
merged_df = pd.merge(merged_df, metas_df[['title','imdb_id']],
left_on='imdbId', right_on='imdb_id', how='inner')

# group-by having
grouped_df = merged_df[['title','rating']].groupby('title').
agg(Mean=('rating', 'mean' ), Count=('rating','count')).
query('Mean > 4.5 and Count > 2')/<code>

Pandas很聰明。 您無需預定義哈希或索引,它似乎可以動態生成優化連接所需的內容。 最大的限制是它存在於單個計算機上。 在大約0:17的時間內處理26m行,使用更少的代碼,並且沒有外部系統(數據庫,集群等)。

· 將3個csv加載到DataFrames — 5秒

· 加入3個DataFrames — 8秒

· 加入,分組和過濾-+4秒

Pands文件加載比我自定義的py〜35sec和5sec快得多! 要顯示不是黑客,請使用庫。 從理論上講,Pandas是它的單線程/進程(在我的TaskManager中看起來不是這樣),因此數據集的大小受PC內存的限制。

儘管如此,Pandas是處理中小型數據的最終方法,但我們需要大數據!

Spark Clusters FTW(致勝)

SQL很棒,但是並行化和破解能力有限。 Python和Pandas超級靈活,但缺乏可伸縮性。 Apache Spark [5]是在大數據上並行化內存中操作的實際方法。

Spark有一個稱為DataFrame的對象(是另一個!),它就像Pandas DataFrame一樣,甚至可以從中加載/竊取數據(儘管您可能應該通過HDFS或Cloud加載數據,以避免BIG數據傳輸問題):

<code>from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# context config/setup
sc = SparkContext(conf=SparkConf().setMaster('local[8]'))
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# copy pandas df's into spark df's
t1 = spark.createDataFrame(ratings_df)
t2 = spark.createDataFrame(links_df)
t3 = spark.createDataFrame(metas_df)

# normal spark join (runs on cluster default partioning)
df = t1.join(t2, t1['movieId']==t2['movieId']).
join(t3, t2['imdbId'] == t3['imbd_id'])

# broadcast smaller tables to worker
bc_df = t1.join(func.broadcast(t2), t1['movieId']==t2['movieId']).
join(func.broadcast(t3), t2['imdbId'] == t3['imbd_id'])

# group by results
df.groupBy('title').agg(func.mean('rating').
alias('avg_rating'),func.count('rating').
alias('r_count')).filter('r_count >2').
filter('avg_rating > 4.5').show()/<code>

我編寫了兩個Spark連接方法。 兩者並行運行。 默認模式(第15行)將對數據進行分區,並在群集節點上隨機(分散)數據。 後者的"廣播"模式(第18行)僅複製一次較小的表,並且僅分區併發送較大的表內容。 使用較小的聯接表,廣播模式可以更快。

Spark將工作劃分為多個工作節點(JVM(設置為8,以匹配我的CPU內核數)),以劃分並征服到一個聚合中。 Spark代碼和結果輸出如下:

<code>df.groupBy('title').agg(func.mean('rating').   
alias('avg_rating'),func.count('rating').
alias('r_count')).filter('r_count >2').
filter('avg_rating > 4.5').show()/<code>


大數據Join指南-Python,SQL,Pandas,Spark,Dask

Output from Spark's join and group-by operation on 26m rows

性能摘要

(我的筆記本電腦的非實驗室認證結果)

首先請注意將3個數據集連接在一起的運行時間:

大數據Join指南-Python,SQL,Pandas,Spark,Dask

Performance for joins


令人驚訝的是,原始的Python解決方案是最快的嗎? 哈克哈克!

頂級分組的最終結果(包括Spark):

大數據Join指南-Python,SQL,Pandas,Spark,Dask

Performance for join and group-by + filtering


總結:

· Pandas具有卓越的快速性和高效性,因此您擁有核心的記憶力。 在某些時候,Python / Pandas將耗盡內存並崩潰。

· 儘管集群管理可能很棘手,但Spark是一個很好的擴展解決方案。 內存中分佈式處理,對作業和數據進行分區以及分區存儲策略(HDFS或其他)是正確的方向。

· RDBMS可靠,但在移動數據和處理方面有擴展限制

下一章將進一步介紹Spark。 糟糕,我忘記了Dask(原生Python群集),也許是下次。

SQLite3資源配置文件

大數據Join指南-Python,SQL,Pandas,Spark,Dask

Only 2 cores really active, extra I/O (even though its mostly cached)


Pandas資源簡介

大數據Join指南-Python,SQL,Pandas,Spark,Dask

Surprisingly good CPU utilization for what I thought was as single threaded/task process ?


Spark資源配置文件(8個工作程序,10個分區)

大數據Join指南-Python,SQL,Pandas,Spark,Dask

All cores utilized w/ 8 workers — good CPU and memory distribution


以上數據來自我的MSFT Surface Laptop 3 — i7 / 16gb / 256gb SSD

參考和啟示

[0]測試代碼的完整來源(不僅僅是要點)-DougFoo的GitHub

[1] SQLite Python指南-官方Python文檔

[2]Pandas指南-10分鐘教程

[3]較舊的分析SQLite vs Pandas — Wes McKiney博客

[4] Spark加入DB Deck — DataBricks演示

[5]關於Spark的詳細介紹-A. Ialenti的TDS文章

[6] PYArrow用於在Spark中快速加載DataFrame — Bryan Cutler IBM

[7]在10分鐘內安裝PySpark Win — TDS文章作者:烏瑪·格(Uma G)

[8]電影評論文件-Kaggle數據集


(本文翻譯自Doug Foo的文章《Guide to Big Data Joins — Python, SQL, Pandas, Spark, Dask》,參考:https://towardsdatascience.com/guide-to-big-data-joins-python-sql-pandas-spark-dask-51b7f4fec810)


分享到:


相關文章: