一、前言
很多時候我們寫了一個爬蟲,實現了需求後會發現了很多值得改進的地方,其中很重要的一點就是爬取速度。本文 就通過代碼講解如何使用多進程、多線程、協程 來提升爬取速度。注意:我們不深入介紹理論和原理,一切都在代碼中。
二、同步
首先我們寫一個簡化的爬蟲,對各個功能細分,有意識進行函數式編程。下面代碼的目的是訪問300次百度頁面並返回狀態碼,其中 parse_1 函數可以設定循環次數,每次循環將當前循環數(從0開始)和url傳入 parse_2 函數。
import requests
def parse_1():
url = 'https://www.baidu.com'
for i in range(300):
parse_2(url)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引起等待
示例代碼就是典型的串行邏輯, parse_1 將url和循環數傳遞給 parse_2 , parse_2 請求並返回狀態碼後 parse_1 繼續迭代一次,重複之前步驟
三、多線程
因為CPU在執行程序時每個時間刻度上只會存在一個線程,因此多線程實際上提高了進程的使用率從而提高了CPU的使用率
實現多線程的庫有很多,這裡用 concurrent.futures 中的 ThreadPoolExecutor 來演示。介紹 ThreadPoolExecutor 庫是因為它相比其他庫代碼更簡潔
為了方便說明問題,下面代碼中如果是新增加的部分,代碼行前會加上 > 符號便於觀察說明問題,實際運行需要去掉
import requests
> from concurrent.futures import ThreadPoolExecutor
def parse_1():
url = 'https://www.baidu.com'
# 建立線程池
> pool = ThreadPoolExecutor(6)
for i in range(300):
> pool.submit(parse_2, url)
> pool.shutdown(wait=True)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
跟同步相對的就是異步 。異步就是彼此獨立,在等待某事件的過程中繼續做自己的事,不需要等待這一事件完成後再工作。線程就是實現異步的一個方式,也就是說多線程是異步處理異步就意味著不知道處理結果,有時候我們需要了解處理結果,就可以採用回調
import requests
from concurrent.futures import ThreadPoolExecutor
# 增加回調函數
> def callback(future):
> print(future.result())
def parse_1():
url = 'https://www.baidu.com'
pool = ThreadPoolExecutor(6)
for i in range(300):
> results = pool.submit(parse_2, url)
# 回調的關鍵步驟
> results.add_done_callback(callback)
pool.shutdown(wait=True)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
Python實現多線程有一個無數人詬病的GIL(全局解釋器鎖) ,但多線程對於爬取網頁這種多數屬於IO密集型的任務依舊很合適。
四、多進程
多進程用兩個方法實現: ProcessPoolExecutor 和 multiprocessing
1. ProcessPoolExecutor
和實現多線程的 ThreadPoolExecutor 類似
import requests
> from concurrent.futures import ProcessPoolExecutor
def parse_1():
url = 'https://www.baidu.com'
# 建立線程池
> pool = ProcessPoolExecutor(6)
for i in range(300):
> pool.submit(parse_2, url)
> pool.shutdown(wait=True)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
可以看到改動了兩次類名,代碼依舊很簡潔,同理也可以添加回調 函數
import requests
from concurrent.futures import ProcessPoolExecutor
> def callback(future):
> print(future.result())
def parse_1():
url = 'https://www.baidu.com'
pool = ProcessPoolExecutor(6)
for i in range(300):
> results = pool.submit(parse_2, url)
> results.add_done_callback(callback)
pool.shutdown(wait=True)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
2. multiprocessing
直接看代碼,一切都在註釋中。
import requests
> from multiprocessing import Pool
def parse_1():
url = 'https://www.baidu.com'
# 建池
> pool = Pool(processes=5)
# 存放結果
> res_lst = []
for i in range(300):
# 把任務加入池中
> res = pool.apply_async(func=parse_2, args=(url,))
# 獲取完成的結果(需要取出)
> res_lst.append(res)
# 存放最終結果(也可以直接存儲或者print)
> good_res_lst = []
> for res in res_lst:
# 利用get獲取處理後的結果
> good_res = res.get()
# 判斷結果的好壞
> if good_res:
> good_res_lst.append(good_res)
# 關閉和等待完成
> pool.close()
> pool.join()
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
可以看到 multiprocessing 庫的代碼稍繁瑣,但支持更多的拓展。多進程和多線程確實能夠達到加速的目的,但如果遇到IO阻塞會出現線程或者進程的浪費 ,因此有一個更好的方法……
五、異步非阻塞
協程+回調 配合動態協作就可以達到異步非阻塞的目的,本質只用了一個線程,所以很大程度利用了資源
實現異步非阻塞經典是利用 asyncio 庫+ yield ,為了方便利用逐漸出現了更上層的封裝 aiohttp ,要想更好的理解異步非阻塞最好還是深入瞭解 asyncio 庫。而 gevent 是一個非常方便實現協程的庫
import requests
> from gevent import monkey
# 猴子補丁是協作運行的靈魂
> monkey.patch_all()
> import gevent
def parse_1():
url = 'https://www.baidu.com'
# 建立任務列表
> tasks_list = []
for i in range(300):
> task = gevent.spawn(parse_2, url)
> tasks_list.append(task)
> gevent.joinall(tasks_list)
def parse_2(url):
response = requests.get(url)
print(response.status_code)
if __name__ == '__main__':
parse_1()
gevent能很大提速,也引入了新的問題:如果我們不想速度太快給服務器造成太大負擔怎麼辦? 如果是多進程多線程的建池方法,可以控制池內數量。如果用gevent想要控制速度也有一個不錯的方法: 建立隊列。 gevent中也提供了 Quene類 ,下面代碼改動較大
import requests
from gevent import monkey
monkey.patch_all()
import gevent
> from gevent.queue import Queue
def parse_1():
url = 'https://www.baidu.com'
tasks_list = []
# 實例化隊列
> quene = Queue()
for i in range(300):
# 全部url壓入隊列
> quene.put_nowait(url)
# 兩路隊列
> for _ in range(2):
> task = gevent.spawn(parse_2)
> tasks_list.append(task)
gevent.joinall(tasks_list)
# 不需要傳入參數,都在隊列中
> def parse_2():
# 循環判斷隊列是否為空
> while not quene.empty():
# 彈出隊列
> url = quene.get_nowait()
response = requests.get(url)
# 判斷隊列狀態
> print(quene.qsize(), response.status_code)
if __name__ == '__main__':
parse_1()
結束語
以上就是幾種常用的加速方法。如果對代碼測試感興趣可以利用time模塊判斷運行時間。爬蟲的加速是重要技能,但適當控制速度也是爬蟲工作者的良好習慣,不要給服務器太大壓力,拜拜~
閱讀更多 有趣的程序媛 的文章