並行計算入門-PySpark的使用


並行計算入門-PySpark的使用

Spark應用程序作為獨立的進程集運行,由驅動程序中的Spark context進行協調。

它可以自動創建(例如,如果您從shell中調用pyspark(然後將Spark上下文稱為sc)。

但是我們還沒有建立它,所以需要進行定義:

<code>from pyspark import SparkContext
sc = SparkContext('local[2]', 'Spark 101')
/<code>

如果想要使用全部可用資源,也可以使用:

<code>sc = SparkContext('local[*]', 'Spark 101') 
/<code>

創建完之後,首先處理一個簡單的攝氏華氏溫度轉換的並行計算

<code>temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]

rdd_temp_c = sc.parallelize(temp_c)
rdd_temp_K = rdd_temp_c.map(lambda x: x + 273.15).collect()

print(rdd_temp_K)
/<code>

結果

<code>[283.15, 276.15, 268.15, 298.15, 274.15, 282.15, 302.15, 263.15, 278.15]
/<code>

通過使用sc.parallelize來並行處理待計算的攝氏溫度,然後使用map來進行一一映射。

除了map這種映射外,常用還有reduce函數,例如

<code>numbers = [1, 4, 6,2, 9, 10]

rdd_numbers=sc.parallelize(numbers)

# Use reduce to combine numbers
rdd_reduce = rdd_numbers.reduce(lambda x,y: "(" + str(x) + ", " + str(y) + "]")
#rdd_reduce = rdd_numbers.reduce(lambda x,y: "(" + str(x) + str(y))


print(rdd_reduce)
/<code>

結果

<code>(((1, (4, 6]], 2], (9, 10]]
/<code>

通常我們使用PySpark是為了部署機器學習項目,在PySpark中也有多樣的數據處理手段。

首先我們創建一個會話(Session)

<code>from pyspark.sql import SparkSession

session = SparkSession.builder.appName('data_processing').getOrCreate()
/<code>

讀入數據集

<code>df = session.read.csv('iris.csv',inferSchema=True,header=True)
df.show()
/<code>


<code>df.printSchema()
/<code>

結果

<code>root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)

|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
/<code>

也可以選擇指定列輸出

<code>df.select('sepal_width','petal_width').show(3)
/<code>

結果

<code>+-----------+-----------+
|sepal_width|petal_width|
+-----------+-----------+
| 3.5| 0.2|
| 3.0| 0.2|
| 3.2| 0.2|
+-----------+-----------+
only showing top 3 rows
/<code>

創建新列

<code>df.withColumn("new_col_name",(df["sepal_width"]*10)).show(6)
/<code>

篩選特徵

<code>df.filter(df["species"]=="setosa").show(9)
df.filter((df["species"]=="setosa") | (df["sepal_width"]<3.0)).show(9)
/<code>

查看列的不同值

<code>df.select("species").distinct().show()
/<code>

按列統計

<code>df.groupBy('petal_length').count().show()
/<code>

按列統計排序

<code>df.groupBy('petal_length').count().orderBy("count",ascending=False).show()
/<code>

按列聚合

<code>df.groupBy('species').agg({'sepal_length':'sum'}).show()
/<code>

以上只是簡單的數據處理部分,PySpark還有很多其他吸引人的特性,將在後續文章中繼續介紹。


分享到:


相關文章: