SynchronousQueue 源碼解析

SynchronousQueue 源碼解析

圖片標題


從不浪費時間的人,沒有工夫抱怨時間不夠。 ——傑弗遜

0 前言

SynchronousQueue 一個阻塞隊列,其中每個插入操作必須等待另一個線程進行相應的刪除操作,反之亦然。 同步隊列沒有任何內部容量,甚至沒有一個容量。 你無法窺視SynchronousQueue,因為僅當你嘗試刪除它時,該元素才存在。 你不能插入元素(使用任何方法),除非另一個線程試圖將其刪除; 你無法進行迭代,因為沒有要迭代的內容。 隊列的頭部是第一個排隊的插入線程試圖添加到隊列中的元素; 如果沒有這樣的排隊線程,則沒有元素可用於刪除,並且poll()將返回null。 為了其他Collection方法(例如,contains)的目的,SynchronousQueue充當空集合。 此隊列不允許空元素.

同步隊列類似於CSP和Ada中使用的集合通道。 它們非常適合切換設計,在該設計中,在一個線程中運行的對象必須與在另一個線程中運行的對象同步,以便向其傳遞一些信息,事件或任務。

此類支持可選的公平性策略,用於訂購正在等待的生產者和使用者線程。 默認情況下,不保證此排序。 但是,將公平性設置為true構造的隊列將按FIFO順序授予線程訪問權限。

此類及其迭代器實現Collection和Iterator接口的所有可選方法。

此類是Java Collections Framework的成員。

1 繼承體系


SynchronousQueue 源碼解析

圖片標題


SynchronousQueue 源碼解析


  • 繼承 AbstractQueue 抽象類,定義了對隊列的基本操作
  • 實現 BlockingQueue 阻塞隊列接口,其對隊列的操作可能會拋出異常
  • 實現 Searializable接口,可以被序列化

2 數據結構

由於SynchronousQueue的支持公平策略和非公平策略,所以底層有兩種數據結構

  • 隊列(實現公平策略),有一個頭結點和尾結點
  • 棧(實現非公平策略),有一個頭結點

隊列與棧都是通過鏈表來實現的。具體的數據結構如下

圖片標題


內部類UML 圖

  • Transferer是TransferStack棧和TransferQueue隊列的公共類,定義了轉移數據的公共操作,由TransferStack和TransferQueue具體實現圖片標題
  • WaitQueue、LifoWaitQueue、FifoWaitQueue表示為了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遺留的,這裡不做具體的講解圖片標題

3 非公平的堆棧(默認策略)

3.1 棧元素

put 的時候,就往棧中放數據。take 的時候,就從棧中取數據,兩者操作都是在棧頂上操作數據.


SynchronousQueue 源碼解析


  • volatile SNode next 棧頂的下一個節點
  • volatile SNode match匹配,用來判斷阻塞棧元素能被喚醒的時機 比如我們先執行 take,此時隊列中沒有數據,take 被阻塞了,棧元素為 SNode1 當 put 時,會把當前 put 的棧元素賦值給 SNode1 的 match 屬性,並喚醒 take 當 take 被喚醒,發現 SNode1 的 match 屬性有值時,就能拿到 put 的數據
  • volatile Thread waiter 阻塞的線程
  • Object item 未投遞/未消費的消息

3.2 入棧和出棧

  • 入棧 使用 put 等方法,將數據放到棧中圖片標題
  • 出棧 使用 take 等方法,把數據從棧中拿出來圖片標題

操作的對象都是棧頂,底層實現的方法也是同一個:

<code>@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    
    // e 為空: take 方法,非空: put 方法
    int mode = (e == null) ? REQUEST : DATA;
    
    // 自旋
    for (;;) {
        // 頭節點情況分類
        // 1:為空,說明隊列中還沒有數據
        // 2:非空,並且是 take 類型的,說明頭節點線程正等著拿數據
        // 3:非空,並且是 put 類型的,說明頭節點線程正等著放數據
        SNode h = head;
        
        // 棧頭為空,說明隊列中還沒有數據。
        // 棧頭非空且棧頭的類型和本次操作一致
        //	比如都是 put,那麼就把本次 put 操作放到該棧頭的前面即可,讓本次 put 能夠先執行
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 設置了超時時間,並且 e 進棧或者出棧要超時了,
            // 就會丟棄本次操作,返回 null 值。
            // 如果棧頭此時被取消了,丟棄棧頭,取下一個節點繼續消費
            if (timed && nanos <= 0) {      // 無法等待
                // 棧頭操作被取消
                if (h != null && h.isCancelled())
                    // 丟棄棧頭,把棧頭的後一個元素作為棧頭
                    casHead(h, h.next);     // 將取消的節點彈棧
                // 棧頭為空,直接返回 null
                else
                    return null;
            // 沒有超時,直接把 e 作為新的棧頭
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // e 等待出棧,一種是空隊列 take,一種是 put
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // 本來 s 是棧頭的,現在 s 不是棧頭了,s 後面又來了一個數,把新的數據作為棧頭
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // 棧頭正在等待其他線程 put 或 take
        // 比如棧頭正在阻塞,並且是 put 類型,而此次操作正好是 take 類型,走此處
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 棧頭已經被取消,把下一個元素作為棧頭
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // snode 方法第三個參數 h 代表棧頭,賦值給 s 的 next 屬性
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m 就是棧頭,通過上面 snode 方法剛剛賦值
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                     // tryMatch 非常重要的方法,兩個作用:
                     // 1 喚醒被阻塞的棧頭 m,2 把當前節點 s 賦值給 m 的 match 屬性
                     // 這樣棧頭 m 被喚醒時,就能從 m.match 中得到本次操作 s
                     // 其中 s.item 記錄著本次的操作節點,也就是記錄本次操作的數據
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
/<code> 

執行流程:

  1. 判斷是 put 方法還是 take 方法
  2. 判斷棧頭數據是否為空,如果為空或者棧頭的操作和本次操作一致,是的話走 3,否則走 5
  3. 判斷操作有無設置超時時間,如果設置了超時時間並且已經超時,返回 null,否則走 4
  4. 如果棧頭為空,把當前操作設置成棧頭,或者棧頭不為空,但棧頭的操作和本次操作相同,也把當前操作設置成棧頭,並看看其它線程能否滿足自己,不能滿足則阻塞自己。比如當前操作是 take,但隊列中沒有數據,則阻塞自己
  5. 如果棧頭已經是阻塞住的,需要別人喚醒的,判斷當前操作能否喚醒棧頭,可以喚醒走 6,否則走 4
  6. 把自己當作一個節點,賦值到棧頭的 match 屬性上,並喚醒棧頭節點
  7. 棧頭被喚醒後,拿到 match 屬性,就是把自己喚醒的節點的信息,返回。

awaitFulfill

節點阻塞的方法

<code>/**
 * 旋轉/阻止,直到節點s通過執行操作匹配。
 * @param s 等待的節點
 * @param timed true if timed wait
 * @param nanos 超時時間
 * @return 匹配的節點, 或者是 s 如果被取消
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
	
    // deadline 死亡時間,如果設置了超時時間的話,死亡時間等於當前時間 + 超時時間,否則就是 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋的次數,如果設置了超時時間,會自旋 32 次,否則自旋 512 次。
    // 比如本次操作是 take 操作,自旋次數後,仍無其他線程 put 數據
    // 就會阻塞,有超時時間的,會阻塞固定的時間,否則一致阻塞下去
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 當前線程有無被打斷,如果過了超時時間,當前線程就會被打斷
        if (w.isInterrupted())
            s.tryCancel();

        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 超時了,取消當前線程的等待操作
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自選次數減1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 把當前線程設置成 waiter,主要是通過線程來完成阻塞和喚醒
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // park 阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
/<code>

當一個 節點/線程 將要阻塞時,它會設置其 waiter 字段,然後在真正 park 之前至少再檢查一次狀態,從而涵蓋了競爭與實現者的關係,並注意到 waiter 非空,因此應將其喚醒。

當由出現在調用點位於堆棧頂部的節點調用時,對停放的調用之前會進行旋轉,以避免在生產者和消費者及時到達時阻塞。 這可能只足以在多處理器上發生。

從主循環返回的檢查順序反映了這樣一個事實,即優先級: 中斷 > 正常的返回 > 超時。 (因此,在超時時,在放棄之前要進行最後一次匹配檢查。)除了來自非定時SynchronousQueue的調用。{poll / offer}不會檢查中斷,根本不等待,因此陷入了轉移方法中 而不是調用awaitFulfill。

而且可以發現其阻塞策略,並不是一上來就阻塞住,而是在自旋一定次數後,仍然沒有其它線程來滿足自己的要求時,才會真正的阻塞。

3.3 圖解非公平模型

  • 線程put1執行 put(1)操作,由於當前無配對的消費線程,所以put1線程入棧,自旋一小會後睡眠等待
  • 接著,線程put2再次執行了put(2)操作,put2線程入棧,自旋一小會後睡眠等待
  • 這時候,來了一個線程take1,執行 take 操作,這時候發現棧頂為put2線程,匹配成功,但是實現會先把take1線程入棧,然後take1線程循環執行匹配put2線程邏輯,一旦發現沒有併發衝突,就會把棧頂指針直接指向 put1線程
  • 最後,再來一個線程take2,執行take操作,這跟上一步的邏輯基本一致,take2線程入棧,然後在循環中匹配put1線程,最終全部匹配完畢,棧空

從上面流程看出,雖然put1線程先入棧了,但是卻是後匹配,這就是非公平策略.

4 公平隊列

4.1 隊列元素


SynchronousQueue 源碼解析

SynchronousQueue 源碼解析


  • volatile QNode next 當前元素的下一個元素
  • volatile Object item // CAS'ed to or from null 當前元素的值,如果當前元素被阻塞住了,等其他線程來喚醒自己時,其他線程會把自己 set 到 item 裡面
  • volatile Thread waiter // to control park/unpark 阻塞線程
  • final boolean isData true 是 put,false 是 take

4.2 transfer

TransferQueue 內部類的 transfer 方法

<code>E transfer(E e, boolean timed, long nanos) {
    /**
     *
     * 這個基本方法, 主要分為兩種情況
     *
     * 1. 若隊列為空 / 隊列中的尾節點和自己的 類型相同, 則添加 node
     *      到隊列中, 直到 timeout/interrupt/其他線程和這個線程匹配
     *      timeout/interrupt awaitFulfill方法返回的是 node 本身
     *      匹配成功的話, 要麼返回 null (producer返回的), 或正真的傳遞值 (consumer 返回的)
     *
     * 2. 隊列不為空, 且隊列的 head.next 節點是當前節點匹配的節點,
     *      進行數據的傳遞匹配, 並且通過 advanceHead 方法幫助 先前 block 的節點 dequeue
     */

    QNode s = null; // 根據需要構造/重用
    // true:put  false:get
    boolean isData = (e != null);

    for (;;) {
        // 隊列首尾的臨時變量,隊列空時,t=h
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null) // 看到未初始化的值
            continue;               // 自旋
        // 首尾節點相同,隊列空
        // 或隊尾節點的操作和當前節點操作相同
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // tail 被修改,重試
            if (t != tail)
                continue;
            // 隊尾後面的值還不為空,說明其他線程添加了 tail.next,t 還不是隊尾,直接把 tn 賦值給 t
            if (tn != null) {
                advanceTail(t, tn);
                // 自旋
                continue;
            }
            // 超時直接返回 null
            if (timed && nanos <= 0)        // 等不及了
                return null;
            // 創建節點
            if (s == null)
                s = new QNode(e, isData);
            // 如果把 s 放到隊尾失敗,繼續遞歸放進去
            if (!t.casNext(null, s))        // 鏈接失敗
                continue;

            advanceTail(t, s);              // 推進 tail 節點並等待
            // 阻塞住自己,直到有其他線程與之匹配, 或它自己進行線程的中斷
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s); //  對接點 s 進行清除, 若 s 不是鏈表的最後一個節點, 則直接 CAS 進行 節點的刪除, 若 s 是鏈表的最後一個節點, 則 要麼清除以前的 cleamMe 節點(cleamMe != null), 然後將 s.prev 設置為 cleanMe 節點, 下次進行刪除 或直接將 s.prev 設置為cleanMe
                return null;
            }

            if (!s.isOffList()) {           // 尚未取消鏈接
                advanceHead(t, s);          // unlink if head 推進head 節點, 下次就調用 s.next 節點進行匹配(這裡調用的是 advanceHead, 因為代碼能執行到這邊說明s已經是 head.next 節點了)
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;
        // 隊列不為空,並且當前操作和隊尾不一致
        // 也就是說當前操作是隊尾是對應的操作
        // 比如說隊尾是因為 take 被阻塞的,那麼當前操作必然是 put
        } el***plementary-mode
            // 如果是第一次執行,此處的 m 代表就是 tail
            // 也就是這行代碼體現出隊列的公平,每次操作時,從頭開始按照順序進行操作
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                // m 代表棧頭
                // 這裡把當前的操作值賦值給阻塞住的 m 的 item 屬性
                // 這樣 m 被釋放時,就可得到此次操作的值
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 當前操作放到隊頭
            advanceHead(h, m);              // successfully fulfilled
            // 釋放隊頭阻塞節點
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}
/<code> 

線程被阻塞住後,當前線程是如何把自己的數據傳給阻塞線程的。 假設線程 1 從隊列中 take 數據 ,被阻塞,變成阻塞線程 A 然後線程 2 開始往隊列中 put 數據 B,大致的流程如下:

  • 線程 1 從隊列 take 數據,發現隊列內無數據,於是被阻塞,成為 A
  • 線程 2 往隊尾 put 數據,會從隊尾往前找到第一個被阻塞的節點,假設此時能找到的就是節點 A,然後線程 B 把將 put 的數據放到節點 A 的 item 屬性裡面,並喚醒線程 1
  • 線程 1 被喚醒後,就能從 A.item 裡面拿到線程 2 put 的數據了,線程 1 成功返回。

在這個過程中,公平主要體現在,每次 put 數據的時候,都 put 到隊尾上,而每次拿數據時,並不是直接從堆頭拿數據,而是從隊尾往前尋找第一個被阻塞的線程,這樣就會按照順序釋放被阻塞的線程。

avanceTail

  • 嘗試 cas 將 nt 作為新的tail圖片標題

4.3 圖解公平隊列模型

公平模式下,底層實現使用的是 TransferQueue 隊列,它有一個head和tail指針,用於指向當前正在等待匹配的線程節點。

  • 初始化時的 TransferQueue
  • 線程 put1 執行 put(1) ,由於當前沒有配對的消費線程,所以 put1 線程入隊,自旋一小會後睡眠等待
  • 接著,線程 put2 執行 put(2),put2線程入隊,自旋一小會後睡眠等待
  • 這時來了一個線程 take1,執行了 take,由於 tail 指向 put2 線程,put2 線程跟 take1 線程匹配,這時take1 線程不需要入隊 注意了!這時要喚醒的線程並不是 put2,而是put1. 因為現在是公平策略,誰先入隊,誰優先被喚醒,這裡顯然 put1 應優先被喚醒. 公平策略總結一句話就是:隊尾匹配隊頭出隊
  • 執行後 put1 線程被喚醒,take1線程的 take()方法返回了1(put1線程的數據),這樣就實現了線程間的一對一通信
  • 最後,再來一個線程take2,執行take操作,這時候只有put2線程在等候,而且兩個線程匹配上了,線程put2被喚醒,take2線程take操作返回了2(線程put2的數據),這時候隊列又回到了起點

5 總結

SynchronousQueue 內沒有容器為什麼還能夠存儲一個元素呢?因為內部沒有容器指的是沒有像數組那樣的內存空間存多個元素,但是是有單地址內存空間,用於交換數據. SynchronousQueue 憑藉其獨有的線程配對通信機制,在大部分平常開發中,可能都不太會用到,但線程池技術中會有所使用,由於內部沒有使用AQS,而是直接使用CAS,所以代碼理解起來會比較困難,但這並不妨礙我們理解底層的實現模型,在理解了模型的基礎上,再翻閱源碼,就會有方向感,看起來也會比較容易!


分享到:


相關文章: