********進程池********
****進程池****
為什麼要有進程池?
答:
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千
萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麼?首先,創建進程需要消耗時間,銷燬
程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響
序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那麼我們要怎麼做呢?
在這裡,要給大家介紹一個進程池的概念,定義一個池子,在裡面放上固定數量的進程,有需求來了,就
一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。
果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閒進
才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣
增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現併發效果。
******multiprocess.Pool模塊******
****概念介紹
Pool([numprocess [,initializer [, initargs]]]):創建進程池
**參數介紹
numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值(這是os模塊的一個方法)
initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
initargs:是要傳給initialiizer的參數組
**主要方法
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
def apply_async(self, func, args=(), kwds={}, callback=None,
error_callback=None):
'''
Asynchronous version of `apply()` method.
'''
if self._state != RUN:
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result
1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,
必須從不同線程調用p.apply()函數或者使用p.apply_async()'''
同步,只有func被執行完後才會繼續執行代碼,返回值為func的return值
同步處理任務,進程池中的所有進程都是普通進程
2 p.map(self, func, iterable, chunksize=None):
異步,自帶close和join,返回值為func返回值組成的列表
3 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,
將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''
異步,當func被註冊進入一個進程後,程序就繼續向下執行,返回一個對象,這個對象有get方法可以取到值(這是func的返回值)
obj.get() 會阻塞,知道對應的func執行完畢拿到結果。需要先close後join來保持多進程和主進程代碼的同步性。
異步處理任務時,進程池中的所有進程都是守護進程
有回調函數 callback
4 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
5 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用
**其他方法
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果
遠程操作中引發了異常,它將在調用此方法時再次被引發。
3 obj.ready():如果調用完成,返回True
4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
5 obj.wait([timeout]):等待結果變為可用。
6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
**例子
*同步和異步*
同步
from multiprocessing import Pool
import os, time
def work(n):
print('%s run' % os.getpid())
time.sleep(1)
return n * 3
if __name__ == '__main__':
p = Pool(3) # 進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務
for i in range(10):
res = p.apply(work, args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務是否存在阻塞,同步調用都會在原地等著
print(res) # res為func的返回值
異步
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid())
time.sleep(random.random())
return n ** 2
if __name__ == '__main__':
p = Pool(3) # 進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務
res_l = []
for i in range(10):
res = p.apply_async(work, args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
# 返回結果之後,將結果放入列表,歸還進程,之後再執行新的任務
# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。
res_l.append(res)
# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然後可以用get收集結果
# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
回調函數
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:
我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),
這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數
閱讀更多 程序猿Monster 的文章