在以如此驚人的速度生成數據的世界中,在正確的時間對數據進行正確分析非常有用。實時處理大數據並執行分析的最令人驚奇的框架之一是Apache Spark,如果我們談論現在用於處理複雜數據分析和數據修改任務的編程語言,我相信Python會超越這個圖表。所以在這個
PySpark教程中,我將討論以下主題:什麼是PySpark?
PySpark在業界
為什麼選擇Python?
Spark RDDs
使用PySpark進行機器學習
PySpark教程:什麼是PySpark?
Apache Spark是一個快速的集群計算框架,用於處理,查詢和分析大數據。基於內存計算,它具有優於其他幾個大數據框架的優勢。
開源社區最初是用Scala編程語言編寫的,它開發了一個支持Apache Spark的神奇工具。PySpark通過其庫Py4j幫助數據科學家與Apache Spark和Python中的RDD進行交互。有許多功能使PySpark成為比其他更好的框架:
- 速度:比傳統的大規模數據處理框架快100倍。
- 強大的緩存:簡單的編程層提供強大的緩存和磁盤持久性功能。
- 部署:可以通過Mesos,Hadoop通過Yarn或Spark自己的集群管理器進行部署。
- 實時:由於內存計算,實時計算和低延遲。
- Polyglot: 支持Scala,Java,Python和R編程。
PySpark在業界
讓我們繼續我們的PySpark教程,看看Spark在業界的使用位置。
每個行業都圍繞大數據展開,而大數據則涉及分析。那麼讓我們來看看使用Apache Spark的各個行業。
Media是向在線流媒體發展的最大行業之一。Netflix使用Apache Spark進行實時流處理,為其客戶提供個性化的在線推薦。它每天處理4500億個事件,流向服務器端應用程序。
財務是Apache Spark的實時處理發揮重要作用的另一個領域。銀行正在使用Spark訪問和分析社交媒體資料,以獲取洞察力,從而幫助他們為
信用風險評估,有針對性的廣告和客戶細分做出正確的業務決策。使用Spark還可以減少客戶流失。欺詐檢測是涉及Spark的最廣泛使用的機器學習領域之一。醫療保健提供商正在使用Apache Spark來分析患者記錄以及過去的臨床數據,以確定哪些患者在從診所出院後可能面臨健康問題。Apache Spark用於基因組測序,以減少處理基因組數據所需的時間。
零售和電子商務是一個人們無法想象它在沒有使用分析和有針對性的廣告的情況下運行的行業。作為當今最大的電子商務平臺之一,Alibabaruns是世界上一些最大的Spark職位,用於分析數PB的數據。阿里巴巴在圖像數據中執行特徵提取。易趣使用Apache Spark提供有針對性的優惠,增強客戶體驗並優化整體性能。
旅遊業也使用Apache Spark。TripAdvisor是一家幫助用戶計劃完美旅行的領先旅遊網站,它正在使用Apache Spark來加速其個性化的客戶推薦。TripAdvisor使用Apache Spark通過比較數百個網站為數百萬旅客提供建議,以便為其客戶找到最佳的酒店價格。
這個PySpark教程的一個重要方面是理解為什麼我們需要使用Python。為什麼不使用Java,Scala或R?
易於學習: 對於程序員來說,Python因其語法和標準庫而相對容易學習。而且,它是一種動態類型語言,這意味著RDD可以保存多種類型的對象。
大量的庫: Scala沒有足夠的數據科學工具和Python,如機器學習和自然語言處理。此外,Scala缺乏良好的可視化和本地數據轉換。
巨大的社區支持: Python擁有一個全球社區,擁有數百萬開發人員,可在數千個虛擬和物理位置進行在線和離線交互。
Spark RDDs
當涉及到迭代分佈式計算,即在計算中處理多個作業的數據時,我們需要在多個作業之間重用或共享數據。像Hadoop這樣的早期框架在處理多個操作/作業時遇到了問題:
- 將數據存儲在HDFS等中間存儲中。
- 多個I / O作業使計算變慢。
- 複製和序列化反過來使進程更慢。
RDD嘗試通過啟用容錯分佈式內存計算來解決所有問題。RDD是彈性分佈式數據集的縮寫。RDD是一種分佈式內存抽象,它允許程序員以容錯的方式在大型集群上執行內存計算。它們是在一組計算機上分區的對象的只讀集合,如果分區丟失,可以重建這些對象。在RDD上執行了幾個操作:
- 轉換:轉換從現有數據集創建新數據集。懶惰的評價。
- 操作:僅當在RDD上調用操作時, Spark才會強制執行計算。
讓我們理解一些轉換,動作和函數。
讀取文件並顯示前n個元素:
rdd = sc.textFile("file:///home/edureka/Desktop/Sample")
rdd.take(n)
轉換為小寫和拆分:(降低和拆分)
def Func(lines):
lines = lines.lower()
lines = lines.split()
return lines
rdd1 = rdd.map(Func)
rdd1.take(5)
刪除StopWords :(過濾器)
stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I'd','why','with']
rdd2 = rdd1.filter(lambda z: z not in stop_words)
rdd2.take(10)
數字總和從1到500 :(減少)
sum_rdd = sc.parallelize(range(1,500))
sum_rdd.reduce(lambda x,y: x+y)
124750
使用PySpark進行機器學習
繼續我們的PySpark教程,讓我們分析一些籃球數據並進行一些預測。所以,在這裡我們將使用自1980年以來NBA所有球員的數據[引入3指針的年份]。
df = spark.read.option('header','true')\
.option('inferSchema','true')
.csv("file:///home/edureka/Downloads/season_totals.csv")
print(df.columns)
排序玩家(OrderBy)和 toPandas:
在這裡,我們根據一個賽季得分來排序球員。
df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]
使用DSL和matplotlib:
在這裡,我們分析了每個賽季
3次嘗試的平均次數,在36分鐘 的時間限制內[對應於足夠休息的近似完整的NBA比賽的間隔]。我們使用3點射門次數(fg3a)和分鐘數(mp)來計算此指標,然後使用matlplotlib。來自 pyspark。sql。函數 import col
from pyspark.sql.functions import col
fga_py = df.groupBy('yr')\
.agg({'mp' : 'sum', 'fg3a' : 'sum'})
.select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m'))\
.orderBy('yr')
from matplotlib import pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')
_df = fga_py.toPandas()
plt.plot(_df.yr,_df.fg3a_p36m, color = '#CD5C5C')
plt.xlabel('Year')
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved back\n3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
線性迴歸和VectorAssembler:
我們可以在此曲線上擬合線性迴歸模型,以模擬未來5年的射擊次數。我們必須使用VectorAssembler 函數將數據轉換 為單個列。這是一個必要條件為在MLlib線性迴歸API。
來自 pyspark。毫升。功能 導入 VectorAssembler
from pyspark.ml.feature import VectorAssembler
t = VectorAssembler(inputCols=['yr'], outputCol = 'features')
training = t.transform(fga_py)\
.withColumn('yr',fga_py.yr)\
.withColumn('label',fga_py.fg3a_p36m)
training.toPandas().head()
然後,我們使用轉換後的數據構建線性迴歸模型對象。
來自 pyspark。毫升。迴歸 導入 LinearRegression
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)
model = lr.fit(training)
將訓練模型應用於數據集:
我們將訓練有素的模型對象模型應用於我們的原始訓練集以及5年的未來數據:
from pyspark.sql.types import Row
# apply model for the 1979-80 season thru 2020-21 season
training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()
training_y = training.select('fg3a_p36m').rdd.map(lambda x: x[0]).collect()
prediction_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + prediction_yrs
# built testing DataFrame
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')<
all_years_features = t.transform(test_rdd.map(row).toDF())
# apply linear regression model
df_results = model.transform(all_years_features).toPandas()
繪製最終預測:
然後,我們可以繪製結果並將圖表保存在指定位置。
plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit')
plt.plot(training_yrs, training_y, color = '#f08080', label = None)
plt.xlabel('Year')
plt.ylabel('Number of attempts')
plt.legend(loc = 4)
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.tight_layout()
plt.savefig("/home/edureka/Downloads/Images/REGRESSION.png")
而且,通過這個圖,我們來到這個PySpark教程的末尾。
夥計們,這就是它!
我希望你們知道PySpark是什麼,為什麼Python最適合Spark,RDD和Pyspark機器學習的一瞥。恭喜,您不再是PySpark的新手了。
閱讀更多 故事你真的zai聽嗎 的文章