python-進程池

********進程池********

****進程池****

為什麼要有進程池?

答:

在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千

萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麼?首先,創建進程需要消耗時間,銷燬

程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響

序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那麼我們要怎麼做呢?

在這裡,要給大家介紹一個進程池的概念,定義一個池子,在裡面放上固定數量的進程,有需求來了,就

一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。

果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閒進

才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣

增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現併發效果。

******multiprocess.Pool模塊******

****概念介紹

Pool([numprocess [,initializer [, initargs]]]):創建進程池

**參數介紹

numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值(這是os模塊的一個方法)

initializer:是每個工作進程啟動時要執行的可調用對象,默認為None

initargs:是要傳給initialiizer的參數組

**主要方法

def apply(self, func, args=(), kwds={}):

'''

Equivalent of `func(*args, **kwds)`.

'''

assert self._state == RUN

return self.apply_async(func, args, kwds).get()

def map(self, func, iterable, chunksize=None):

'''

Apply `func` to each element in `iterable`, collecting the results

in a list that is returned.

'''

return self._map_async(func, iterable, mapstar, chunksize).get()

def apply_async(self, func, args=(), kwds={}, callback=None,

error_callback=None):

'''

Asynchronous version of `apply()` method.

'''

if self._state != RUN:

raise ValueError("Pool not running")

result = ApplyResult(self._cache, callback, error_callback)

self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))

return result

1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。

'''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數併發地執行func函數,

必須從不同線程調用p.apply()函數或者使用p.apply_async()'''

同步,只有func被執行完後才會繼續執行代碼,返回值為func的return值

同步處理任務,進程池中的所有進程都是普通進程

2 p.map(self, func, iterable, chunksize=None):

異步,自帶close和join,返回值為func返回值組成的列表

3 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。

'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,

將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''

異步,當func被註冊進入一個進程後,程序就繼續向下執行,返回一個對象,這個對象有get方法可以取到值(這是func的返回值)

obj.get() 會阻塞,知道對應的func執行完畢拿到結果。需要先close後join來保持多進程和主進程代碼的同步性。

異步處理任務時,進程池中的所有進程都是守護進程

有回調函數 callback

4 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成

5 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用

**其他方法

1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法

2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果

遠程操作中引發了異常,它將在調用此方法時再次被引發。

3 obj.ready():如果調用完成,返回True

4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常

5 obj.wait([timeout]):等待結果變為可用。

6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

**例子

*同步和異步*

同步

from multiprocessing import Pool

import os, time

def work(n):

print('%s run' % os.getpid())

time.sleep(1)

return n * 3

if __name__ == '__main__':

p = Pool(3) # 進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務

for i in range(10):

res = p.apply(work, args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞

# 但不管該任務是否存在阻塞,同步調用都會在原地等著

print(res) # res為func的返回值

異步

import os

import time

import random

from multiprocessing import Pool

def work(n):

print('%s run' % os.getpid())

time.sleep(random.random())

return n ** 2

if __name__ == '__main__':

p = Pool(3) # 進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務

res_l = []

for i in range(10):

res = p.apply_async(work, args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行

# 返回結果之後,將結果放入列表,歸還進程,之後再執行新的任務

# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束

# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。

res_l.append(res)

# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然後可以用get收集結果

# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了

p.close()

p.join()

for res in res_l:

print(res.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

回調函數

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:

我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),

這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

python-進程池


分享到:


相關文章: