Python多線程、多進程、協程!配合案例講解,這波到位!

一、前言

很多時候我們寫了一個爬蟲,實現了需求後會發現了很多值得改進的地方,其中很重要的一點就是爬取速度。本文 就通過代碼講解如何使用多進程、多線程、協程 來提升爬取速度。注意:我們不深入介紹理論和原理,一切都在代碼中。

二、同步

首先我們寫一個簡化的爬蟲,對各個功能細分,有意識進行函數式編程。下面代碼的目的是訪問300次百度頁面並返回狀態碼,其中 parse_1 函數可以設定循環次數,每次循環將當前循環數(從0開始)和url傳入 parse_2 函數。

import requests

def parse_1():

url = 'https://www.baidu.com'

for i in range(300):

parse_2(url)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引起等待

示例代碼就是典型的串行邏輯, parse_1 將url和循環數傳遞給 parse_2 , parse_2 請求並返回狀態碼後 parse_1 繼續迭代一次,重複之前步驟

三、多線程

因為CPU在執行程序時每個時間刻度上只會存在一個線程,因此多線程實際上提高了進程的使用率從而提高了CPU的使用率

實現多線程的庫有很多,這裡用 concurrent.futures 中的 ThreadPoolExecutor 來演示。介紹 ThreadPoolExecutor 庫是因為它相比其他庫代碼更簡潔

為了方便說明問題,下面代碼中如果是新增加的部分,代碼行前會加上 > 符號便於觀察說明問題,實際運行需要去掉

import requests

> from concurrent.futures import ThreadPoolExecutor

def parse_1():

url = 'https://www.baidu.com'

# 建立線程池

> pool = ThreadPoolExecutor(6)

for i in range(300):

> pool.submit(parse_2, url)

> pool.shutdown(wait=True)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

跟同步相對的就是異步 。異步就是彼此獨立,在等待某事件的過程中繼續做自己的事,不需要等待這一事件完成後再工作。線程就是實現異步的一個方式,也就是說多線程是異步處理異步就意味著不知道處理結果,有時候我們需要了解處理結果,就可以採用回調

import requests

from concurrent.futures import ThreadPoolExecutor

# 增加回調函數

> def callback(future):

> print(future.result())

def parse_1():

url = 'https://www.baidu.com'

pool = ThreadPoolExecutor(6)

for i in range(300):

> results = pool.submit(parse_2, url)

# 回調的關鍵步驟

> results.add_done_callback(callback)

pool.shutdown(wait=True)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

Python實現多線程有一個無數人詬病的GIL(全局解釋器鎖) ,但多線程對於爬取網頁這種多數屬於IO密集型的任務依舊很合適。

四、多進程

多進程用兩個方法實現: ProcessPoolExecutor 和 multiprocessing

1. ProcessPoolExecutor

和實現多線程的 ThreadPoolExecutor 類似

import requests

> from concurrent.futures import ProcessPoolExecutor

def parse_1():

url = 'https://www.baidu.com'

# 建立線程池

> pool = ProcessPoolExecutor(6)

for i in range(300):

> pool.submit(parse_2, url)

> pool.shutdown(wait=True)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

可以看到改動了兩次類名,代碼依舊很簡潔,同理也可以添加回調 函數

import requests

from concurrent.futures import ProcessPoolExecutor

> def callback(future):

> print(future.result())

def parse_1():

url = 'https://www.baidu.com'

pool = ProcessPoolExecutor(6)

for i in range(300):

> results = pool.submit(parse_2, url)

> results.add_done_callback(callback)

pool.shutdown(wait=True)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

2. multiprocessing

直接看代碼,一切都在註釋中。

import requests

> from multiprocessing import Pool

def parse_1():

url = 'https://www.baidu.com'

# 建池

> pool = Pool(processes=5)

# 存放結果

> res_lst = []

for i in range(300):

# 把任務加入池中

> res = pool.apply_async(func=parse_2, args=(url,))

# 獲取完成的結果(需要取出)

> res_lst.append(res)

# 存放最終結果(也可以直接存儲或者print)

> good_res_lst = []

> for res in res_lst:

# 利用get獲取處理後的結果

> good_res = res.get()

# 判斷結果的好壞

> if good_res:

> good_res_lst.append(good_res)

# 關閉和等待完成

> pool.close()

> pool.join()

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

可以看到 multiprocessing 庫的代碼稍繁瑣,但支持更多的拓展。多進程和多線程確實能夠達到加速的目的,但如果遇到IO阻塞會出現線程或者進程的浪費 ,因此有一個更好的方法……

五、異步非阻塞

協程+回調 配合動態協作就可以達到異步非阻塞的目的,本質只用了一個線程,所以很大程度利用了資源

實現異步非阻塞經典是利用 asyncio 庫+ yield ,為了方便利用逐漸出現了更上層的封裝 aiohttp ,要想更好的理解異步非阻塞最好還是深入瞭解 asyncio 庫。而 gevent 是一個非常方便實現協程的庫

import requests

> from gevent import monkey

# 猴子補丁是協作運行的靈魂

> monkey.patch_all()

> import gevent

def parse_1():

url = 'https://www.baidu.com'

# 建立任務列表

> tasks_list = []

for i in range(300):

> task = gevent.spawn(parse_2, url)

> tasks_list.append(task)

> gevent.joinall(tasks_list)

def parse_2(url):

response = requests.get(url)

print(response.status_code)

if __name__ == '__main__':

parse_1()

gevent能很大提速,也引入了新的問題:如果我們不想速度太快給服務器造成太大負擔怎麼辦? 如果是多進程多線程的建池方法,可以控制池內數量。如果用gevent想要控制速度也有一個不錯的方法: 建立隊列。 gevent中也提供了 Quene類 ,下面代碼改動較大

import requests

from gevent import monkey

monkey.patch_all()

import gevent

> from gevent.queue import Queue

def parse_1():

url = 'https://www.baidu.com'

tasks_list = []

# 實例化隊列

> quene = Queue()

for i in range(300):

# 全部url壓入隊列

> quene.put_nowait(url)

# 兩路隊列

> for _ in range(2):

> task = gevent.spawn(parse_2)

> tasks_list.append(task)

gevent.joinall(tasks_list)

# 不需要傳入參數,都在隊列中

> def parse_2():

# 循環判斷隊列是否為空

> while not quene.empty():

# 彈出隊列

> url = quene.get_nowait()

response = requests.get(url)

# 判斷隊列狀態

> print(quene.qsize(), response.status_code)

if __name__ == '__main__':

parse_1()

結束語

以上就是幾種常用的加速方法。如果對代碼測試感興趣可以利用time模塊判斷運行時間。爬蟲的加速是重要技能,但適當控制速度也是爬蟲工作者的良好習慣,不要給服務器太大壓力,拜拜~


分享到:


相關文章: