Spark應用程序作為獨立的進程集運行,由驅動程序中的Spark context進行協調。
它可以自動創建(例如,如果您從shell中調用pyspark(然後將Spark上下文稱為sc)。
但是我們還沒有建立它,所以需要進行定義:
<code>from pyspark import SparkContext
sc = SparkContext('local[2]', 'Spark 101')
/<code>
如果想要使用全部可用資源,也可以使用:
<code>sc = SparkContext('local[*]', 'Spark 101')
/<code>
創建完之後,首先處理一個簡單的攝氏華氏溫度轉換的並行計算
<code>temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]
rdd_temp_c = sc.parallelize(temp_c)
rdd_temp_K = rdd_temp_c.map(lambda x: x + 273.15).collect()
print(rdd_temp_K)
/<code>
結果
<code>[283.15, 276.15, 268.15, 298.15, 274.15, 282.15, 302.15, 263.15, 278.15]
/<code>
通過使用sc.parallelize來並行處理待計算的攝氏溫度,然後使用map來進行一一映射。
除了map這種映射外,常用還有reduce函數,例如
<code>numbers = [1, 4, 6,2, 9, 10]
rdd_numbers=sc.parallelize(numbers)
# Use reduce to combine numbers
rdd_reduce = rdd_numbers.reduce(lambda x,y: "(" + str(x) + ", " + str(y) + "]")
#rdd_reduce = rdd_numbers.reduce(lambda x,y: "(" + str(x) + str(y))
print(rdd_reduce)
/<code>
結果
<code>(((1, (4, 6]], 2], (9, 10]]
/<code>
通常我們使用PySpark是為了部署機器學習項目,在PySpark中也有多樣的數據處理手段。
首先我們創建一個會話(Session)
<code>from pyspark.sql import SparkSession
session = SparkSession.builder.appName('data_processing').getOrCreate()
/<code>
讀入數據集
<code>df = session.read.csv('iris.csv',inferSchema=True,header=True)
df.show()
/<code>
<code>df.printSchema()
/<code>
結果
<code>root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
/<code>
也可以選擇指定列輸出
<code>df.select('sepal_width','petal_width').show(3)
/<code>
結果
<code>+-----------+-----------+
|sepal_width|petal_width|
+-----------+-----------+
| 3.5| 0.2|
| 3.0| 0.2|
| 3.2| 0.2|
+-----------+-----------+
only showing top 3 rows
/<code>
創建新列
<code>df.withColumn("new_col_name",(df["sepal_width"]*10)).show(6)
/<code>
篩選特徵
<code>df.filter(df["species"]=="setosa").show(9)
df.filter((df["species"]=="setosa") | (df["sepal_width"]<3.0)).show(9)
/<code>
查看列的不同值
<code>df.select("species").distinct().show()
/<code>
按列統計
<code>df.groupBy('petal_length').count().show()
/<code>
按列統計排序
<code>df.groupBy('petal_length').count().orderBy("count",ascending=False).show()
/<code>
按列聚合
<code>df.groupBy('species').agg({'sepal_length':'sum'}).show()
/<code>
以上只是簡單的數據處理部分,PySpark還有很多其他吸引人的特性,將在後續文章中繼續介紹。
閱讀更多 leo的學習之旅 的文章