生產者消費者之間的橋樑

前言

在這個多元的信息化時代,一個頗具規模的互聯網公司,運營的系統可能每天需要處理海量的數據。當系統的處理速度達不到數據產生的速度的時候,就需要考慮一下異步通信的手段了。因為有可能就是一段同步調用的代碼,在數據量上漲的時候,導致系統的崩潰。所以需要採用異步調用的方式,將海量的數據先暫時放到消息隊列中,等待關聯的程序逐步消費調用。所以本文將介紹一下異步調用的常用技術之消息隊列

注:本文的消息隊列針對於python語言自帶的Queue模塊

<code>知識拓展:
在一線互聯網的公司面試中,經常會問到消息隊列的使用場景。場景其實有很多,但是常用的一般總結為:
解耦,異步,削峰/<code>


生產者消費者之間的橋樑

圖1

接下來從以下三個方面介紹一下python的Queue模塊

  • Queue模塊的基本使用
  • Queue模塊的源碼分析
  • 對於Queue模塊的總結和思考

Queue模塊的基本使用

Queue是一個隊列,所以是先進先出的結構,一般可用於進程之間的通信。舉一個經典的例子,消費者生產者模式的例子


生產者消費者之間的橋樑

圖2

上圖兩個函數分別對應生產者和消費者功能,producer生產10個數字放到隊列中,消費者從隊列中依次將數字取出來

生產者消費者之間的橋樑

圖3

對於Queue類,內置的類方法有put()方法,功能是將生產的元素放到隊列中。get()方法是將隊列中的數據取出來。還有兩個類方法起到函數功能的輔助作用,empty()用來檢測隊列是否為空,隊列中是否還有元素,full()方法用來告訴程序隊列是否已經放滿,無法再繼續填充元素,一般用於設置了隊列大小的情況下,默認隊列是無限大的。qsize()返回目前隊列中元素的數量


生產者消費者之間的橋樑

圖4

Queue模塊的源碼分析

接下來我們一塊看下Queue類的源碼

生產者消費者之間的橋樑

圖5

首先init方法初始化了線程的條件變量,需要notify才可以喚醒線程繼續執行,從這裡也可以解釋為什麼python的Queue是線程安全的,因為每個線程相當於加鎖了,所以是線程安全的

生產者消費者之間的橋樑

圖6

上圖就是隊列的put()方法,我們如果不指定隊列的大小那麼就會調用self._put()方法將數據放到隊列中,同時還有一個標識self.unfinished_tasks,用來表明還有多少元素放到隊列後沒有被消費。並且調用self.not_empty.notify()喚醒get()方法去隊列中獲取元素消費。如果設置了隊列的大小和超時時間的話,那麼當元素數量等於隊列的容量的時候,就會raise Full這個錯誤,同理超時也是一樣的

生產者消費者之間的橋樑

圖7

put()方法向隊列輸入數據之後,會喚醒get()方法來消費,首先get方法會先調用self._qsize()來判斷當前隊列中是否還有數據可以消費,如果沒有,那麼就wait(),直到put方法又生產了新的數據

生產者消費者之間的橋樑

圖8

從圖8我們可以瞭解到,Queue消息隊列本質上用的deque這個數據結構,因為隊列需要先進先出,而deque通過append之後,可以popleft將最早的元素彈出來,符合隊列的規則。而empty()方法和full()方法本質上就是獲取deque的長度

對於Queue模塊的總結和思考

我們這篇文章主要分析了Queue這個類的使用場景和源碼實現,一般對於python編程而言,通過Queue這個隊列將兩個進程或者線程之間聯繫了起來,並且是線程安全的,使用非常的方便。大家還能想起來隊列的主要使用場景有哪些嗎,沒有印象的話可以到文章開頭再看一下,對於生產者消費者模式其實本質上就是異步通信的使用場景。那麼解耦和削峰的使用場景大家可以思考一下,削峰的目的其實是負載均衡,使流量的產生比較均勻,具體怎麼使用,還有哪些其他的使用場景,歡迎大家留言討論


以上就是作者的一些分析和看法,歡迎大家關注桓藝恆,多多留言討論哈


分享到:


相關文章: