03.30 Python的 concurrent.futures 模塊介紹

Python3.2帶來了 concurrent.futures 模塊,這個模塊具有線程池和進程池、管理並行編程任務、處理非確定性的執行流程、進程/線程同步等功能。

此模塊由以下部分組成:

  • concurrent.futures.Executor: 這是一個虛擬基類,提供了異步執行的方法。
  • submit(function, argument): 調度函數(可調用的對象)的執行,將 argument 作為參數傳入。
  • map(function, argument): 將 argument 作為參數執行函數,以 異步 的方式。
  • shutdown(Wait=True): 發出讓執行者釋放所有資源的信號。
  • concurrent.futures.Future: 其中包括函數的異步執行。Future對象是submit任務(即帶有參數的functions)到executor的實例。

Executor是抽象類,可以通過子類訪問,即線程或進程的 ExecutorPools 。因為,線程或進程的實例是依賴於資源的任務,所以最好以“池”的形式將他們組織在一起,作為可以重用的launcher或executor。

使用線程池和進程池

線程池或進程池是用於在程序中優化和簡化線程/進程的使用。通過池,你可以提交任務給executor。池由兩部分組成,一部分是內部的隊列,存放著待執行的任務;另一部分是一系列的進程或線程,用於執行這些任務。池的概念主要目的是為了重用:讓線程或進程在生命週期內可以多次使用。它減少了創建創建線程和進程的開銷,提高了程序性能。重用不是必須的規則,但它是程序員在應用中使用池的主要原因。

Python的 concurrent.futures 模塊介紹

current.Futures 模塊提供了兩種 Executor 的子類,各自獨立操作一個線程池和一個進程池。這兩個子類分別是:

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)

max_workers 參數表示最多有多少個worker並行執行任務。

下面的示例代碼展示了線程池和進程池的功能。這裡的任務是,給一個list number_list ,包含1到10。對list中的每一個數字,乘以1+2+3…+10000000的和(這個任務只是為了消耗時間)。

下面的代碼分別測試了:

  • 順序執行
  • 通過有5個worker的線程池執行
  • 通過有5個worker的進程池執行
import concurrent.futures
import time
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
# 計算總和,這裡只是為了消耗時間
result_item = count(x)
# 打印輸入和輸出結果
return result_item


def count(number) :
for i in range(0, 10000000):
i=i+1
return i * number

if __name__ == "__main__":
# 順序執行
start_time = time.time()
for item in number_list:
print(evaluate_item(item))
print("Sequential execution in " + str(time.time() - start_time), "seconds")
# 線程池執行
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print ("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
# 進程池
start_time_2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print ("Process pool execution in " + str(time.time() - start_time_2), "seconds")

運行這個代碼,我們可以看到運行時間的輸出:

10000000
20000000
30000000
40000000
50000000
60000000
70000000
80000000
90000000
100000000
Sequential execution in 7.936585903167725 seconds
10000000
30000000
40000000
20000000
50000000
70000000

90000000
100000000
80000000
60000000
Thread pool execution in 7.633088827133179 seconds
40000000
50000000
10000000
30000000
20000000
70000000
90000000
60000000
80000000
100000000
Process pool execution in 4.787093639373779 seconds


分享到:


相關文章: