並行處理是一種在同一臺計算機的多個處理器中同時運行任務的工作模式。 這種工作模式的目的就是減少總的任務處理時間。 在本教程中,您將瞭解使用python多進程模塊對任何類型的邏輯過程進行並行處理的流程。
內容:
1. 簡介
2. 最多可以進行多少個並行處理?
3. 什麼是同步和異步執行?
4. 問題討論:計算每行中給定數值範圍內的元素個數
不使用並行處理的解答
5. 如何進行函數並行化?
6. 異步並行處理
7. 如何對Pandas DataFrame進行並行處理?
8. 練習
9. 總結
1. 簡介
並行處理是一種在同一臺計算機的多個處理器中同時運行任務的工作模式。 這種工作模式的目的就是減少總的任務處理時間。但是進程之間的通信會有額外的開銷,因此對小的任務而言,總的任務時間會有所增加而不是減少。
在Python語言中,multiprocessing模塊通過使用子進程(而不是線程)來運行獨立的並行進程。 它可以讓您利用機器上的多個處理器(Windows和Unix),也就是說,多個進程可以完全獨立的在內存中運行。
學習了本教程的內容之後,您將瞭解:
在使用multiprocessing進行並行處理時,如何理解語法並組織代碼?
如何實現同步和異步並行處理?
使用multiprocessing.Pool接口完成3個不同的用例。
2. 最多可以進行多少個並行處理?
您一次可以運行的最大進程數受計算機中處理器數量的限制。 如果您不知道機器中有多少處理器,可以使用multiprocessing模塊中的cpu_count函數進行顯示。
Python代碼:
3. 同步執行和異步執行?
在並行處理中,有兩種執行類型:同步和異步。
同步執行就是各個進程按照啟動的先後,順序完成。 這是通過鎖定主程序直到相應的進程運行完畢來實現的。
而異步執行,換句話說,進程的執行不涉及鎖定。這樣做的結果就是,進程結果返回的順序可能會混淆,但通常情況下,異步執行會更快完成。
multiprocessing 模塊中有兩個對象是用來實現函數並行執行的:Pool 類和Process 類。
接下來,我們討論一個典型的問題,並使用上述技術實現並行處理。在本教程中,我們將重點使用Pool類,因為它使用起來很方便,並可以滿足幾乎所有的並行處理需求。
4. 問題討論:計算每行中給定數值範圍內的元素個數
第一個問題:給定一個二維矩陣(或者列表和多維列表),計算每行中給定數值範圍內的元素個數。我們可以在下面的列表基礎上開始工作。
不使用並行處理的參考代碼:
我們先看看不用並行計算它需要多長時間。為此,我們對函數howmany_within_range(如下)進行重複以檢查在範圍內的數有多少個並返回計數。
5. 如何對函數進行並行化處理?
對代碼進行並行處理通常的做法是取出其中可以多次運行的特定函數,將其放在不同的處理器上並行運行。
要做到這一點,就需要使用 Pool類對數目為n的處理器進行初始化,之後將想要並行運行的函數傳遞給Pool類中的並行方法。
multiprocessing.Pool 中提供了 apply, map 和 starmap 等方法對傳入的函數並行運行。
這簡直太完美了!那麼apply和 map之間又有什麼區別呢?
apply和 map都是把要進行並行化的函數作為主要參數。但是不同的是, apply接受args參數, 通過args將各個參數傳送給被並行化處理的函數,而map 僅將一個迭代器作為參數。
因此,對於簡單的可迭代的操作,使用map進行並行處理更適合,而且能更快完成工作。
當我們看到如何使用apply和map對函數howmany_within_range進行並行化處理之後,我們還會介紹starmap。
5.1. Pool.apply 進行並行化處理
我們來使用multiprocessing.Pool,對howmany_within_range 函數進行並行化處理。
5.2. Parallelizing using Pool.map
Pool.map僅接受一個迭代器參數。 為了變通起見,我把howmany_within_range函數做了修改,為 minimum 和 maximum參數設定了缺省值,並另存為新的函數 howmany_within_range_rowonly,這個函數可以只接受行數據列表迭代器作為輸入。我知道這種做法不是map的一個最好的用法,但它已經清楚地顯示出它與apply的不同之處。
5.3. 使用Pool.starmap 進行並行化
在前面的例子中,我們必須重新定義howmany_within_range函數,讓其中的一對參數使用默認值。 而使用starmap,您就能避免這樣做。 你怎麼問?
與Pool.map一樣,Pool.starmap也只僅接受一個迭代器參數,但在starmap中,迭代器種的每一個元件也是一個迭代器。你可以通過這個內部迭代器向被並行化處理的函數傳遞參數,在執行時再順序解開,只要傳遞和解開的順序一致就可以。
實際上,Pool.starmap就像是一個接受參數的Pool.map版本。
6. 異步並行處理
和同步並行處理對等的異步並行處理函數 apply_async,map_async和starmap_async允許您以異步方式並行執行進程,即下一個進程可以在前一個進程完成時立即啟動,而不考慮啟動順序。 因此,無法保證結果與輸入的順序相同。
6.1 使用Pool.apply_async進行並行化
apply_async的使用與apply非常相似,只是你需要提供一個回調函數來告訴如何存儲計算結果。
但是,使用apply_async時需要注意的是,結果中的數字順序會混亂,表明進程沒有按照啟動的順序完成。
變通的辦法就是,我們重新定義一個新的howmany_within_range2,接受並返回迭代序號(i),然後對最終結果進行排序。
使用apply_async時,不提供回調函數也是可以的。只是這時候,如果您不提供回調函數,那麼您將獲得pool.ApplyResult對象的列表,其中包含來自每個進程的計算輸出值。 從這裡,您需要使用pool.ApplyResult.get方法來得到所需的最終結果。
6.2 使用Pool.starmap_async進行並行化
你已經見識了apply_async的使用。你是否可以想象一下或者寫一個 starmap_async and map_async的對應版本呢? 實現代碼如下:
7. 如何對Pandas DataFrame進行並行處理?
到目前為止,您已經瞭解瞭如何通過使函數在列表上工作來進行函數並行化。
但是,在處理數據分析或機器學習項目時,您可能希望對Pandas Dataframe 進行並行化,Pandas Dataframe是除了numpy數組之外,最常用的存儲表格數據對象。
在對DataFrame進行並行化時,您可以把要被並行化的函數作為輸入參數:
DataFrame的一行
DataFrame的一列
整個DataFrame
前兩個可以使用multiprocessing本身就可以完成。 但是對於最後一個,即對整個dataframe進行並行化,我們將使用pathos包,pathos包內部使用了dill進行序列化。
首先,讓我們創建一個簡單的dataframe,看看如何進行逐行和逐列進行並行化。 在用戶定義的函數種使用了類似pd.apply的寫法,但這是並行處理。
現在已經有了dataframe。之後使用hypotenuse對每一行進行處理,每次同時運行4個進程。
為了做到這一點,在下面的代碼中,可以看到我們使用了df.itertuples(name=False)。設定name=False, 就可以把dataframe中的每一行作為一個簡單的元組送入hypotenuse函數
上面就是對dataframe每一行進行並行化的例子。我們來試試對每一列進行並行化。這裡,我使用了 df.iteritems將一列數據作為一個系列傳遞給sum_of_squares 函數。
接下來是第三部分——完成一個能接收Pandas Dataframe、NumPy數組的並行化函數。Pathos遵循multiprocessing的風格:Pool > Map > Close > Join > Clear。請查看pathos docs文檔以獲取更多信息。
8. 練習
問題1: 使用 Pool.apply 獲取list_a和list_b每一行相同的元素
參考答案:
問題2: 使用 Pool.map 並行運行下面的 python代碼
Python代碼名稱: ‘script1.py’, ‘script2.py’, ‘script3.py’
參考答案:
問題3: 將一個二維列表中的每一行歸一化到0到1之間
參考答案:
9. 總結
希望你能完成上面的練習,恭喜你們!
在這篇文章中,我們看到了使用multiprocessing模塊實現並行處理的整個過程和各種方法。 哪怕是在具有更多處理器數量的大型計算機上工作,上述過程也幾乎相同,您可以通過並行處理獲得真正的速度優勢。
祝各位編碼快樂,下次再見!
英文原文:https://www.machinelearningplus.com/python/parallel-processing-python/ 譯者:Xindong
閱讀更多 Python部落 的文章