ReentrantLock 學習筆記

ReentrantLock

實現了 Lock 接口

public class ReentrantLock implements Lock, java.io.Serializable {}// ReentrantLock 的使用範式class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } }}

靜態內部抽象類 Sync

/** * 基本的鎖控制實現 */abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 具體的加鎖方式由子類去實現 */ abstract void lock(); /** * 執行非公平鎖的嘗試加鎖 acquires = 1 */ final boolean nonfairTryAcquire(int acquires) { // 先獲取當前線程和鎖狀態 final Thread current = Thread.currentThread(); int c = getState(); // 如果 c == 0 代表之前沒有加過鎖, 使用CAS操作把 state 從 0 改成 acquires if (c == 0) { // 如果 CAS 修改成功, 則把加鎖線程設置為當前線程並且返回 true。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已經加過鎖了, 判斷當前線程是否是之前加鎖的線程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 加鎖次數增加 (重入鎖) if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // 這裡不用原子操作, 因為加過鎖了所以沒有競爭。 return true; } return false; // 上面兩個狀態都不符合直接返回 false。 } /** * 嘗試釋放鎖 */ protected final boolean tryRelease(int releases) { int c = getState() - releases; // 減加鎖次數 // 判斷當前線程是否是加鎖線程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 判斷加鎖次數是否已經減到 0, true - 解鎖完成, 加鎖線程設置為 null。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c);  return free; } /** * 當前線程是否已經獲取到排他鎖 */ protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } /** * 創建 Condition */ final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class /** * 獲取當前加鎖的線程, 如果狀態為 0 表示未加鎖, 直接返回 null。 */ final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } /** * 獲取加鎖次數, 如果是之前加鎖的線程返回state, 不是返回 0。 */ final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /** * 是否已經加鎖了 */ final boolean isLocked() { return getState() != 0; } /** * Reconstitutes this lock instance from a stream. * 序列變化時使用, 從流中讀取的鎖, 鎖狀態清零。 * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state }}

非公平鎖 NonfairSync extens Sync

static final class NonfairSync extends Sync { /** * 加鎖 */ final void lock() { // CAS 修改 state , 如果成功代表之前沒有鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 否則調用 AQS 的 acquire 方法 } /** * 嘗試獲取 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); // 調用 Sync 的 nonfairTryAcquire 方法 }}

公平鎖 FairSync extens Sync

static final class FairSync extends Sync { /** * 加鎖 */ final void lock() { acquire(1); // 直接調用 AQS 的 acquire 方法 } /** * 嘗試獲取 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果未被加鎖則調用 hasQueuedPredecessors 判斷隊列中是否有等待執行的線程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 實現重入鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }} 

公平鎖與非公平鎖的總結

公平鎖與非公平鎖顧名思義一個是不允許插隊的鎖, 所有線程都按先後的排隊順序來。而另一個則是允許插隊的。

但是在插隊的定義上面 - 跟公平鎖的區別在於公平鎖中調用的 hasQueuedPredecessors() 方法, 這個方法會判斷這個節點前面是否還有節點在等待, 如果有則會放棄競爭。

加鎖流程總結

  1. 通過 CAS 操作直接修改鎖狀態 0 -> 1, 修改成功 = 加鎖成功, 鎖保存當前線程。
  2. 第一步獲取鎖失敗, 調用 tryAcquire 方法再次嘗試, 方法內判斷鎖狀態, 如果 state == 0 會重複第一步的操作嘗試獲取鎖, 前面沒成功會判斷是否是當前線程, 有一個重入鎖的實現。
  3. 前面操作全部失敗後會執行入隊操作。
  4. 把當前線程包裝成 Node 對象, 類似第一步, 先使用 CAS 操作嘗試直接把當前節點掛在隊列尾部, 如果不成功就死循環(自旋操作)嘗試掛在隊尾, 直到成功為止。
  5. 入隊成功後進入掛起階段, 在掛起之前判斷當前節點是否是頭結點的下一個, 如果是就嘗試獲取鎖, 成功後就把當前節點設置為頭結點並且把節點內的數據置空並終止返回。否則, 把當前節點的等待狀態改為 SIGNAL 狀態並把前面所有狀態為 CANCEL 的節點出隊。
  6. 最後掛起線程並檢查線程是否已經標識為中斷。
  7. 等待被釋放鎖的線程喚醒繼續死循環步驟 5 和 6。
  8. 如果入隊失敗瞭如果自己是最後一個節點直接移除返回。否則把當前節點和前面所有無效節點出隊並喚醒下一個有效節點 。

ReentrantLock 構造方法

/** * 默認使用非公平鎖 */public ReentrantLock() { sync = new NonfairSync();}/** * 根據 fair 是 true/false 來使用公平鎖還是非公平鎖 */public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}

ReentrantLock 成員方法

/** * 加鎖 - 默認是非公平鎖 */public void lock() { sync.lock();}/** * 加鎖 - 允許中斷的加鎖操作 */public void lockInterruptibly() throws InterruptedException { // 加鎖之前先判斷線程的中斷標識符, 如果中斷直接拋出異常 sync.acquireInterruptibly(1);}/** * 嘗試加鎖 */public boolean tryLock() { return sync.nonfairTryAcquire(1);}/** * 帶有超時的加鎖 * timeout = 超時時間; unit = 時間單位;  */public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { // 但是不管傳什麼時間單位都會被轉成納秒 return sync.tryAcquireNanos(1, unit.toNanos(timeout));}/** * 解鎖 */public void unlock() { // 調用 AQS 的 release 方法 sync.release(1);}..... 省略一堆 get 方法/** * toString  */public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]");}

使用到的 AQS 中的方法

概念

  1. AQS 隊列是一個雙向鏈表。
  2. AQS 隊列中的頭結點是不包含任何業務的, 是個空節點, 有效節點從首節點(第二個)開始算。
  3. 如果獲取鎖失敗的線程在入隊的時候也失敗了, 會被中斷。
  4. AQS 隊列中節點的 next 是 null 的時候代表排他, 是空節點時代表共享。Node.EXCLUSIVE = 排他; Node.SHARED = 共享。
  5. LockSupport.park() 方法獲取線程的執行許可, 獲取不到會使線程阻塞, 可以相應中斷請求但不會拋出InterruptedException。

入隊方法及涉及方法

/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */static final int PROPAGATE = -3;/** * 獲取鎖 */public final void acquire(int arg) { // 首先調用實現類的 tryAcquire 方法嘗試獲取鎖。 // 當前線程封裝成 Node 類型加入等待隊列 // 如果前兩步都失敗了, 直接調用 Thread.currentThread().interrupt() if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}/** * 增加等待節點 * Node.EXCLUSIVE = 排他; Node.SHARED = 共享 */private Node addWaiter(Node mode) { // 創建新 Node; mode = node.next() Node node = new Node(Thread.currentThread(), mode); // pred 指向隊列尾部 Node pred = tail; // 表示隊列不為空 if (pred != null) { node.prev = pred; // CAS 修改 pred 為 node, 也就相當於修改 tail 因為兩個引用指向同一塊內存。 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // enq 方法內含有初始化隊列邏輯 enq(node); return node;}/** * 入隊 + 初始化 */private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 如果隊列為空必須初始化一個頭結點 if (compareAndSetHead(new Node())) tail = head; } else { // 這裡邏輯同 addWaiter 一樣都是把 node 添加到鏈表尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // p = node 的前驅結點 final Node p = node.predecessor(); // 如果 node 是首節點 並且又獲取到了鎖 if (p == head && tryAcquire(arg)) { // 把 node 設置為頭結點, 並且 thread 和 prev 全部置 null setHead(node); // 把之前的頭結點的引用置 null, 變成不可達對象。 p.next = null; // help GC failed = false;  return interrupted; // 返回 false; } // 把 node 失效的前驅結點全部出隊 並且 掛起當前線程。 // 所有排隊線程都會在這阻塞, 等待解鎖方法喚醒後又會循環判斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前驅結點的狀態如果是待執行的狀態則直接返回 true */ return true; if (ws > 0) { /* * 如果是取消狀態, 則先把前驅結點踢出, 然後循環判斷新的前驅結點 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE.  * 0 或 PROPAGATE 則改成 SIGNAL 狀態 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}/** * 取消獲取 */private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 取消獲取 先把當前節點的線程置 null node.thread = null; // 跳過已經是取消狀態的前驅結點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 有效前驅結點的下一個節點 Node predNext = pred.next; // 把當前節點的狀態置為取消 node.waitStatus = Node.CANCELLED; // 如果當前節點是尾部, 把尾部設為之前篩選出的有效前驅結點 if (node == tail && compareAndSetTail(node, pred)) { // 把新的尾部的後續指針指向 null, 這樣就相當於出隊了當前節點+所有無效的前驅結點 compareAndSetNext(pred, predNext, null); } else { int ws; // 當前節點的前驅結點不是頭結點 // 前驅結點的狀態為 SIGNAL || (前驅結點的狀態 <= 0 && 把前驅結點設置為 SIGNAL 狀態) // 前驅結點的線程不為 null if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // 如果當前節點的後續節點不為 null 並且狀態不是取消狀態則把 prev.next = node.next // 完成了當前節點的出隊 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 喚醒繼任者 unparkSuccessor(node); } node.next = node; // help GC }}private void unparkSuccessor(Node node) { /* * 節點等待狀態改為 0  */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 查找下一個有效節點 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 釋放後續節點的線程 if (s != null) LockSupport.unpark(s.thread);} 

解鎖方法

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 釋放鎖成功後會喚醒下一個有效節點的線程 return true; } return false;}

筆記

  1. 獲取鎖方法 - 帶有 shared 結尾的方法是共享鎖的方法。
  2. 獨佔鎖和共享鎖會在 Node 的 mode 中體現
  3. AQS 中入隊方法的死循環 - 死循環中如果沒有獲取鎖成功則會調用 ThreadSuport.park() 方法, 使線程阻塞。 在釋放鎖方法裡面, 會獲取當前節點的 next 節點中的線程, 調用 LockSupport.unpark() 喚醒前面在死循環中的線程, 再次嘗試獲取鎖。
  4. 非公平鎖的插隊 - 跟公平鎖的區別在於公平鎖中調用的 hasQueuedPredecessors() 方法, 這個方法會判斷這個節點前面是否還有節點在等待, 如果有則會放棄競爭。
  5. 讀寫鎖 - 會把 int 類型的 state 屬性分割成兩部分, 高16位代表讀鎖狀態, 低16位代表寫鎖狀態。
ReentrantLock 學習筆記


分享到:


相關文章: