揭祕 Python 協程

揭秘 Python 協程

什麼是協程?

協程是實現併發編程的一種方式。一說併發,你肯定想到了多線程/多進程模型,沒錯,多線程/多進程,正是解決併發問題的經典模型之一。最初的互聯網世界,多線程/多進程在服務器併發中,起到舉足輕重的作用。

隨著互聯網的快速發展,你逐漸遇到了 C10K 瓶頸,也就是同時連接到服務器的客戶達到了一萬個。於是很多代碼跑崩了,進程上下文切換佔用了大量的資源,線程也頂不住如此巨大的壓力,這時, NGINX 帶著事件循環出來拯救世界了。

如果將多進程/多線程類比為起源於唐朝的藩鎮割據,那麼事件循環,就是宋朝加強的中央集權制。事件循環啟動一個統一的調度器,讓調度器來決定一個時刻去運行哪個任務,於是省卻了多線程中啟動線程、管理線程、同步鎖等各種開銷。同一時期的 NGINX,在高併發下能保持低資源低消耗高性能,相比 Apache 也支持更多的併發連接。

再到後來,出現了一個很有名的名詞,叫做回調地獄(callback hell),手擼過 JavaScript 的朋友肯定知道我在說什麼。我們大家驚喜地發現,這種工具完美地繼承了事件循環的優越性,同時還能提供 async / await 語法糖,解決了執行性和可讀性共存的難題。於是,協程逐漸被更多人發現並看好,也有越來越多的人嘗試用 Node.js 做起了後端開發。(講個笑話,JavaScript 是一門編程語言。)

回到我們的 Python。使用生成器,是 Python 2 開頭的時代實現協程的老方法了,Python 3.7 提供了新的基於 asyncio 和 async / await 的方法。同樣的,跟隨時代,拋棄掉不容易理解、也不容易寫的舊的基於生成器的方法,直接來講新方法。

我們先從一個爬蟲實例出發,用清晰的講解思路,帶你結合實戰來搞懂這個不算特別容易理解的概念。之後,我們再由淺入深,直擊協程的核心。

從一個爬蟲說起

爬蟲,就是互聯網的蜘蛛,在搜索引擎誕生之時,與其一同來到世上。爬蟲每秒鐘都會爬取大量的網頁,提取關鍵信息後存儲在數據庫中,以便日後分析。爬蟲有非常簡單的 Python 十行代碼實現,也有 Google 那樣的全球分佈式爬蟲的上百萬行代碼,分佈在內部上萬臺服務器上,對全世界的信息進行嗅探。

話不多說,我們先看一個簡單的爬蟲例子:

import time
def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
time.sleep(sleep_time)
print('OK {}'.format(url))

def main(urls):
for url in urls:

crawl_page(url)

%time main(['url_1', 'url_2', 'url_3', 'url_4'])

########## 輸出 ##########
crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4
Wall time: 10 s

(注意:本節的主要目的是協程的基礎概念,因此我們簡化爬蟲的 scrawl_page 函數為休眠數秒,休眠時間取決於 url 最後的那個數字。)

這是一個很簡單的爬蟲,main() 函數執行時,調取 crawl_page() 函數進行網絡通信,經過若干秒等待後收到結果,然後執行下一個。

看起來很簡單,但你仔細一算,它也佔用了不少時間,五個頁面分別用了 1 秒到 4 秒的時間,加起來一共用了 10 秒。這顯然效率低下,該怎麼優化呢?

於是,一個很簡單的思路出現了——我們這種爬取操作,完全可以併發化。我們就來看看使用協程怎麼寫。

import asyncio
async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)

print('OK {}'.format(url))

async def main(urls):
for url in urls:
await crawl_page(url)

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

########## 輸出 ##########
crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4
Wall time: 10 s

看到這段代碼,你應該發現了,在 Python 3.7 以上版本中,使用協程寫異步程序非常簡單。

首先來看 import asyncio,這個庫包含了大部分我們實現協程所需的魔法工具。

async 修飾詞聲明異步函數,於是,這裡的 crawl_page 和 main 都變成了異步函數。而調用異步函數,我們便可得到一個協程對象(coroutine object)。

舉個例子,如果你 print(crawl_page('')),便會輸出<coroutine>,提示你這是一個 Python 的協程對象,而並不會真正執行這個函數。/<coroutine>

再來說說協程的執行。執行協程有多種方法,這裡我介紹一下常用的三種。

首先,我們可以通過 await 來調用。await 執行的效果,和 Python 正常執行是一樣的,也就是說程序會阻塞在這裡,進入被調用的協程函數,執行完畢返回後再繼續,而這也是 await 的字面意思。代碼中 await asyncio.sleep(sleep_time) 會在這裡休息若干秒,await crawl_page(url) 則會執行 crawl_page() 函數。

其次,我們可以通過 asyncio.create_task() 來創建任務,這個我們下節課會詳細講一下,你先簡單知道即可。

最後,我們需要 asyncio.run 來觸發運行。asyncio.run 這個函數是 Python 3.7 之後才有的特性,可以讓 Python 的協程接口變得非常簡單,你不用去理會事件循環怎麼定義和怎麼使用的問題(我們會在下面講)。一個非常好的編程規範是,asyncio.run(main()) 作為主程序的入口函數,在程序運行週期內,只調用一次 asyncio.run。

這樣,你就大概看懂了協程是怎麼用的吧。不妨試著跑一下代碼,欸,怎麼還是 10 秒?

10 秒就對了,還記得上面所說的,await 是同步調用,因此, crawl_page(url) 在當前的調用結束之前,是不會觸發下一次調用的。於是,這個代碼效果就和上面完全一樣了,相當於我們用異步接口寫了個同步代碼。

現在又該怎麼辦呢?

其實很簡單,也正是我接下來要講的協程中的一個重要概念,任務(Task)。老規矩,先看代碼。

import asyncio
async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(url))


async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
for task in tasks:
await task

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
########## 輸出 ##########
crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
Wall time: 3.99 s

你可以看到,我們有了協程對象後,便可以通過 asyncio.create_task 來創建任務。任務創建後很快就會被調度執行,這樣,我們的代碼也不會阻塞在任務這裡。所以,我們要等所有任務都結束才行,用for task in tasks: await task 即可。

這次,你就看到效果了吧,結果顯示,運行總時長等於運行時間最長的爬蟲。

當然,你也可以想一想,這裡用多線程應該怎麼寫?而如果需要爬取的頁面有上萬個又該怎麼辦呢?再對比下協程的寫法,誰更清晰自是一目瞭然。

其實,對於執行 tasks,還有另一種做法:

import asyncio
async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])

await asyncio.sleep(sleep_time)
print('OK {}'.format(url))
async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
await asyncio.gather(*tasks)
%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
########## 輸出 ##########
crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
Wall time: 4.01 s

這裡的代碼也很好理解。唯一要注意的是,tasks 解包列表,將列表變成了函數的參數;與之對應的是, * dict 將字典變成了函數的參數。

另外,asyncio.create_task,asyncio.run 這些函數都是 Python 3.7 以上的版本才提供的,自然,相比於舊接口它們也更容易理解和閱讀。

解密協程運行時

說了這麼多,現在,我們不妨來深入代碼底層看看。有了前面的知識做基礎,你應該很容易理解這兩段代碼。

import asyncio
async def worker_1():
print('worker_1 start')
await asyncio.sleep(1)
print('worker_1 done')
async def worker_2():
print('worker_2 start')

await asyncio.sleep(2)
print('worker_2 done')
async def main():
print('before await')
await worker_1()
print('awaited worker_1')
await worker_2()
print('awaited worker_2')
%time asyncio.run(main())
########## 輸出 ##########
before await
worker_1 start
worker_1 done
awaited worker_1
worker_2 start
worker_2 done
awaited worker_2
Wall time: 3 s
import asyncio
async def worker_1():
print('worker_1 start')
await asyncio.sleep(1)
print('worker_1 done')
async def worker_2():
print('worker_2 start')
await asyncio.sleep(2)
print('worker_2 done')
async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print('before await')
await task1
print('awaited worker_1')
await task2
print('awaited worker_2')
%time asyncio.run(main())
########## 輸出 ##########
before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
Wall time: 2.01 s

不過,第二個代碼,到底發生了什麼呢?為了讓你更詳細瞭解到協程和線程的具體區別,這裡我詳細地分析了整個過程。步驟有點多,彆著急,我們慢慢來看。

  1. asyncio.run(main()),程序進入 main() 函數,事件循環開啟;
  2. task1 和 task2 任務被創建,並進入事件循環等待運行;運行到 print,輸出 'before await';
  3. await task1 執行,用戶選擇從當前的主任務中切出,事件調度器開始調度 worker_1;
  4. worker_1 開始運行,運行 print 輸出'worker_1 start',然後運行到 await asyncio.sleep(1), 從當前任務切出,事件調度器開始調度 worker_2;
  5. worker_2 開始運行,運行 print 輸出 'worker_2 start',然後運行 await asyncio.sleep(2) 從當前任務切出;
  6. 以上所有事件的運行時間,都應該在 1ms 到 10ms 之間,甚至可能更短,事件調度器從這個時候開始暫停調度;
  7. 一秒鐘後,worker_1 的 sleep 完成,事件調度器將控制權重新傳給 task_1,輸出 'worker_1 done',task_1 完成任務,從事件循環中退出;
  8. await task1 完成,事件調度器將控制器傳給主任務,輸出 'awaited worker_1',·然後在 await task2 處繼續等待;
  9. 兩秒鐘後,worker_2 的 sleep 完成,事件調度器將控制權重新傳給 task_2,輸出 'worker_2 done',task_2 完成任務,從事件循環中退出;
  10. 主任務輸出 'awaited worker_2',協程全任務結束,事件循環結束。

接下來,我們進階一下。如果我們想給某些協程任務限定運行時間,一旦超時就取消,又該怎麼做呢?再進一步,如果某些協程運行時出現錯誤,又該怎麼處理呢?同樣的,來看代碼。

import asyncio
async def worker_1():
await asyncio.sleep(1)
return 1
async def worker_2():
await asyncio.sleep(2)
return 2 / 0
async def worker_3():
await asyncio.sleep(3)
return 3
async def main():
task_1 = asyncio.create_task(worker_1())
task_2 = asyncio.create_task(worker_2())
task_3 = asyncio.create_task(worker_3())
await asyncio.sleep(2)
task_3.cancel()
res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
print(res)
%time asyncio.run(main())
########## 輸出 ##########
[1, ZeroDivisionError('division by zero'), CancelledError()]
Wall time: 2 s

你可以看到,worker_1 正常運行,worker_2 運行中出現錯誤,worker_3 執行時間過長被我們 cancel 掉了,這些信息會全部體現在最終的返回結果 res 中。

不過要注意return_exceptions=True這行代碼。如果不設置這個參數,錯誤就會完整地 throw 到我們這個執行層,從而需要 try except 來捕捉,這也就意味著其他還沒被執行的任務會被全部取消掉。為了避免這個局面,我們將 return_exceptions 設置為 True 即可。

到這裡,發現了沒,線程能實現的,協程都能做到。那就讓我們用協程來實現一個經典的生產者消費者模型吧。

import asyncio
import random
async def consumer(queue, id):
while True:
val = await queue.get()
print('{} get a val: {}'.format(id, val))
await asyncio.sleep(1)
async def producer(queue, id):
for i in range(5):
val = random.randint(1, 10)
await queue.put(val)
print('{} put a val: {}'.format(id, val))
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))
producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
producer_2 = asyncio.create_task(producer(queue, 'producer_2'))
await asyncio.sleep(10)
consumer_1.cancel()
consumer_2.cancel()
await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)
%time asyncio.run(main())
########## 輸出 ##########
producer_1 put a val: 5
producer_2 put a val: 3
consumer_1 get a val: 5
consumer_2 get a val: 3
producer_1 put a val: 1
producer_2 put a val: 3
consumer_2 get a val: 1
consumer_1 get a val: 3
producer_1 put a val: 6
producer_2 put a val: 10
consumer_1 get a val: 6
consumer_2 get a val: 10
producer_1 put a val: 4
producer_2 put a val: 5
consumer_2 get a val: 4
consumer_1 get a val: 5
producer_1 put a val: 2

producer_2 put a val: 8
consumer_1 get a val: 2
consumer_2 get a val: 8
Wall time: 10 s

實戰:豆瓣近日推薦電影爬蟲

最後,進入今天的實戰環節——實現一個完整的協程爬蟲。

任務描述:https://movie.douban.com/cinema/later/beijing/ 這個頁面描述了北京最近上映的電影,你能否通過 Python 得到這些電影的名稱、上映時間和海報呢?這個頁面的海報是縮小版的,我希望你能從具體的電影描述頁面中抓取到海報。

聽起來難度不是很大吧?我在下面給出了同步版本的代碼和協程版本的代碼,通過運行時間和代碼寫法的對比,希望你能對協程有更深的瞭解。(注意:為了突出重點、簡化代碼,這裡我省略了異常處理。)

不過,在參考我給出的代碼之前,你是不是可以自己先動手寫一下、跑一下呢?

import requests
from bs4 import BeautifulSoup
def main():
url = "https://movie.douban.com/cinema/later/beijing/"
init_page = requests.get(url).content
init_soup = BeautifulSoup(init_page, 'lxml')
all_movies = init_soup.find('div', id="showing-soon")
for each_movie in all_movies.find_all('div', class_="item"):
all_a_tag = each_movie.find_all('a')
all_li_tag = each_movie.find_all('li')

movie_name = all_a_tag[1].text
url_to_fetch = all_a_tag[1]['href']
movie_date = all_li_tag[0].text
response_item = requests.get(url_to_fetch).content
soup_item = BeautifulSoup(response_item, 'lxml')
img_tag = soup_item.find('img')
print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))
%time main()
########## 輸出 ##########
阿拉丁 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2553992741.jpg
龍珠超:布羅利 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2557371503.jpg
五月天人生無限公司 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2554324453.jpg
... ...
直播攻略 06月04日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2555957974.jpg
Wall time: 56.6 s
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch_content(url):
async with aiohttp.ClientSession(
headers=header, connector=aiohttp.TCPConnector(ssl=False)
) as session:
async with session.get(url) as response:
return await response.text()
async def main():
url = "https://movie.douban.com/cinema/later/beijing/"
init_page = await fetch_content(url)
init_soup = BeautifulSoup(init_page, 'lxml')
movie_names, urls_to_fetch, movie_dates = [], [], []
all_movies = init_soup.find('div', id="showing-soon")
for each_movie in all_movies.find_all('div', class_="item"):
all_a_tag = each_movie.find_all('a')
all_li_tag = each_movie.find_all('li')
movie_names.append(all_a_tag[1].text)
urls_to_fetch.append(all_a_tag[1]['href'])
movie_dates.append(all_li_tag[0].text)
tasks = [fetch_content(url) for url in urls_to_fetch]
pages = await asyncio.gather(*tasks)
for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
soup_item = BeautifulSoup(page, 'lxml')
img_tag = soup_item.find('img')
print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))
%time asyncio.run(main())
########## 輸出 ##########
阿拉丁 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2553992741.jpg
龍珠超:布羅利 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2557371503.jpg

五月天人生無限公司 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2554324453.jpg
... ...
直播攻略 06月04日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2555957974.jpg
Wall time: 4.98 s

總結

  • 協程和多線程的區別,主要在於兩點,一是協程為單線程;二是協程由用戶決定,在哪些地方交出控制權,切換到下一個任務。
  • 協程的寫法更加簡潔清晰,把async / await 語法和 create_task 結合來用,對於中小級別的併發需求已經毫無壓力。
  • 寫協程程序的時候,你的腦海中要有清晰的事件循環概念,知道程序在什麼時候需要暫停、等待 I/O,什麼時候需要一併執行到底。

最後的最後,請一定不要輕易炫技。多線程模型也一定有其優點,一個真正牛逼的程序員,應該懂得,在什麼時候用什麼模型能達到工程上的最優,而不是自覺某個技術非常牛逼,所有項目創造條件也要上。技術是工程,而工程則是時間、資源、人力等紛繁複雜的事情的折衷。


分享到:


相關文章: