大数据Join指南-Python,SQL,Pandas,Spark,Dask

如何最佳地Join大型数据集-多种方法的指南

The Big Data — Photo by Patrick Lindenberg on Unsplash


旋转磁盘,鱼片或使用大数据有很多方法-这是快速指南。

样本数据集

Kaggle电影数据库[8]-25,000万条额定值,用于45,000部电影,数据分为5个文件:

© Foostack.Ai


要查找电影的最高平均收视率,您需要将指向元数据的链接添加到收视率中:

SELECT m.title, avg(r.rating) FROM links l INNER JOIN to metas m ON m.imdbId=l.imdbId INNER JOIN to ratings ON r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5

老派SQL RDBMS

经典的方法是加载数据库,建立索引并运行前面提到的SQL(或使用下面的经典SQL):

SELECT m.title, avg(r.rating) FROM links l, metas m, ratings r WHERE m.imdbId=l.imdbId and r.movieId=l.movieId GROUP BY m.title HAVING count(r.rating) > 2 and avg(r.rating) > 4.5

RDBMS的加入通过3种主要方式进行,并带有一些平台变体:

· 嵌套循环-对表A的每一行查找表B中的匹配键。B上的索引使查找为O(A * log B),否则联接为SLOW-O(A * B)。

· 哈希联接—通过查找关键字构建表B的哈希/映射,使联接查找非常快— O(A * 1)

· 合并排序-对两个表进行排序并在一次通过中合并,除非预先排序,否则不会超快-O(A + B + A log A + B log B)→O(A log A + B log B)

使用Sqlite3 [1]-创建数据库,表和加载非常容易:

<code>import sqlite3
import csv

con = sqlite3.connect('mydatabase.db')
c = con.cursor()

# load file into data array
with open(file, 'r', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
fields = next(csvreader) # strip header
for row in csvreader:
data.append(row)

# create table
c.execute("CREATE TABLE ratings(movieId text, rating float")

# execute in batch, commit in 500's
c.execute("INSERT INTO ratings(movieId, rating) VALUES (?,?)", data)

# create indexes
c.execute("create index ridx on ratings(movieId, rating)")/<code>

加载和查询2600万行数据大约需要10分钟的时间(也许我们可以组合调整前两个步骤..)

· 从csv /磁盘加载-35秒

· 插入数据库— 8分钟

· 添加索引— 30秒

· 按查询分组— 20秒

您还可以使用sqlite3命令行来测试和查看查询执行计划,如下所示。 在联接列上添加一些额外的索引后,我们的目标查询大约需要21秒才能执行。

SQLite query execution (from cmdline tools)

使用SQL DB是可扩展的,但是比较老套。 接下来,我们将尝试行家技巧。

Python —适用于终极黑客

我们可以节省数据库开销,编写数据负载,并在Python中直接繁琐地进行Join:

<code># double nested loop join
def merge(links, ratings, metas):
merged = []
for link in links[1]:
mlink = link.copy()
mlink += (['','']) # rating and name
for rating in ratings[1]:
if (mlink[0] == rating[1]):
mlink[3] = rating[2]
break
for meta in metas[1]:
if (mlink[1] == meta[6]): # stripped tt off meta imdb columns FYI
mlink[4] = meta[20]
break
merged.append(mlink)
return merged/<code>


" merge()"是一个没有索引的嵌套循环连接。 循环必须扫描metas和links表以获取每个等级(26m * 50k * 2)。 100k条评论需要5分钟,因此2600万条评论将需要很长时间...

" merge_wmap()"是一个哈希联接-我们为元数据和链接构建了一个Map映射,从而产生了O(n * 1)性能。 加入2600万行只需3秒!

<code># optimized single loop + w/ hash lookups of ratings and metadata
def merge_wmap(links, ratings, metas) -> []:
ratings_map = make_map(ratings[1], 1)
metas_map = make_map(metas[1], 6)
merged = []
for link in links[1]:
mlink = link.copy() + ['','']
mlink[3] = ratings_map.get(link[0])[2] if ratings_map.get(link[0]) else ''
mlink[4] = metas_map.get(link[1])[20] if metas_map.get(link[1]) else ''
merged.append(mlink)
return merged/<code>


我没有实现分组过滤器-相对较快(需要26m行结果的排序-扫描-结合)-我估计加载和处理的总时间约为0:53

· 将原始CSV加载到数组-35秒

· 手动合并索引— 3秒

· 手动分组和过滤器-15秒(待定〜估算)

原始Python很快但很丑。 全速本地PC并完全控制所有错误。

Pandas数据框

Pandas [2]是Python上用于数据准备的事实上的软件包。 极其快速且易于使用,我们可以用最少的代码进行加载,连接和分组:

<code>import pandas as pd

# load files
ratings_df = pd.read_csv('ratings.csv')
metas_df = pd.read_csv('movies_metadata.csv')
links_df = pd.read_csv('links.csv')

# 1st join
merged_df = pd.merge(links_df[['movieId','imdbId']],
ratings_df[['movieId','rating']], on='movieId', how='right',
validate='one_to_many')

# 2nd join
merged_df = pd.merge(merged_df, metas_df[['title','imdb_id']],
left_on='imdbId', right_on='imdb_id', how='inner')

# group-by having
grouped_df = merged_df[['title','rating']].groupby('title').
agg(Mean=('rating', 'mean' ), Count=('rating','count')).
query('Mean > 4.5 and Count > 2')/<code>

Pandas很聪明。 您无需预定义哈希或索引,它似乎可以动态生成优化连接所需的内容。 最大的限制是它存在于单个计算机上。 在大约0:17的时间内处理26m行,使用更少的代码,并且没有外部系统(数据库,集群等)。

· 将3个csv加载到DataFrames — 5秒

· 加入3个DataFrames — 8秒

· 加入,分组和过滤-+4秒

Pands文件加载比我自定义的py〜35sec和5sec快得多! 要显示不是黑客,请使用库。 从理论上讲,Pandas是它的单线程/进程(在我的TaskManager中看起来不是这样),因此数据集的大小受PC内存的限制。

尽管如此,Pandas是处理中小型数据的最终方法,但我们需要大数据!

Spark Clusters FTW(致胜)

SQL很棒,但是并行化和破解能力有限。 Python和Pandas超级灵活,但缺乏可伸缩性。 Apache Spark [5]是在大数据上并行化内存中操作的实际方法。

Spark有一个称为DataFrame的对象(是另一个!),它就像Pandas DataFrame一样,甚至可以从中加载/窃取数据(尽管您可能应该通过HDFS或Cloud加载数据,以避免BIG数据传输问题):

<code>from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# context config/setup
sc = SparkContext(conf=SparkConf().setMaster('local[8]'))
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


# copy pandas df's into spark df's
t1 = spark.createDataFrame(ratings_df)
t2 = spark.createDataFrame(links_df)
t3 = spark.createDataFrame(metas_df)

# normal spark join (runs on cluster default partioning)
df = t1.join(t2, t1['movieId']==t2['movieId']).
join(t3, t2['imdbId'] == t3['imbd_id'])

# broadcast smaller tables to worker
bc_df = t1.join(func.broadcast(t2), t1['movieId']==t2['movieId']).
join(func.broadcast(t3), t2['imdbId'] == t3['imbd_id'])

# group by results
df.groupBy('title').agg(func.mean('rating').
alias('avg_rating'),func.count('rating').
alias('r_count')).filter('r_count >2').
filter('avg_rating > 4.5').show()/<code>

我编写了两个Spark连接方法。 两者并行运行。 默认模式(第15行)将对数据进行分区,并在群集节点上随机(分散)数据。 后者的"广播"模式(第18行)仅复制一次较小的表,并且仅分区并发送较大的表内容。 使用较小的联接表,广播模式可以更快。

Spark将工作划分为多个工作节点(JVM(设置为8,以匹配我的CPU内核数)),以划分并征服到一个聚合中。 Spark代码和结果输出如下:

<code>df.groupBy('title').agg(func.mean('rating').
alias('avg_rating'),func.count('rating').
alias('r_count')).filter('r_count >2').
filter('avg_rating > 4.5').show()/<code>


Output from Spark's join and group-by operation on 26m rows

性能摘要

(我的笔记本电脑的非实验室认证结果)

首先请注意将3个数据集连接在一起的运行时间:

Performance for joins


令人惊讶的是,原始的Python解决方案是最快的吗? 哈克哈克!

顶级分组的最终结果(包括Spark):

Performance for join and group-by + filtering


总结:

· Pandas具有卓越的快速性和高效性,因此您拥有核心的记忆力。 在某些时候,Python / Pandas将耗尽内存并崩溃。

· 尽管集群管理可能很棘手,但Spark是一个很好的扩展解决方案。 内存中分布式处理,对作业和数据进行分区以及分区存储策略(HDFS或其他)是正确的方向。

· RDBMS可靠,但在移动数据和处理方面有扩展限制

下一章将进一步介绍Spark。 糟糕,我忘记了Dask(原生Python群集),也许是下次。

SQLite3资源配置文件

Only 2 cores really active, extra I/O (even though its mostly cached)


Pandas资源简介

Surprisingly good CPU utilization for what I thought was as single threaded/task process ?


Spark资源配置文件(8个工作程序,10个分区)

All cores utilized w/ 8 workers — good CPU and memory distribution


以上数据来自我的MSFT Surface Laptop 3 — i7 / 16gb / 256gb SSD

参考和启示

[0]测试代码的完整来源(不仅仅是要点)-DougFoo的GitHub

[1] SQLite Python指南-官方Python文档

[2]Pandas指南-10分钟教程

[3]较旧的分析SQLite vs Pandas — Wes McKiney博客

[4] Spark加入DB Deck — DataBricks演示

[5]关于Spark的详细介绍-A. Ialenti的TDS文章

[6] PYArrow用于在Spark中快速加载DataFrame — Bryan Cutler IBM

[7]在10分钟内安装PySpark Win — TDS文章作者:乌玛·格(Uma G)

[8]电影评论文件-Kaggle数据集


(本文翻译自Doug Foo的文章《Guide to Big Data Joins — Python, SQL, Pandas, Spark, Dask》,参考:https://towardsdatascience.com/guide-to-big-data-joins-python-sql-pandas-spark-dask-51b7f4fec810)