Semaphore是什麼
Semaphore是J.U.C包下的許可控制類,維護了一個許可集,通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目,或對資源訪問的許可控制。
常用方法
acquire()
從許可集中請求獲取一個許可,此時當前線程開始阻塞,直到獲得一個可用許可,或者當前線程被中斷。
acquire(int permits)
從許可集中請求獲取指定個數(permits)的許可,此時當前線程開始阻塞,直到獲得指定數據(permits)可用許可,或者當前線程被中斷。
release()
釋放一個許可,將其返回給許可集。
release(int permits)
釋放指定個數(permits)許可,將其返回給許可集。
tryAcquire()
嘗試獲取一個可用許可,如果此時有一個可用的許可,則立即返回true,同時許可集中許可個數減一;如果此時許可集中無可用許可,則立即返回false。
tryAcquire(int permits)
嘗試獲取指定個數(permits)可用許可,如果此時有指定個數(permits)可用的許可,則立即返回true,同時許可集中許可個數減指定個數(permits);如果此時許可集中許可個數不足指定個數(permits),則立即返回false。
tryAcquire(long timeout, TimeUnit unit)
在給定的等待時間內,嘗試獲取一個可用許可,如果此時有一個可用的許可,則立即返回true,同時許可集中許可個數減一;如果此時許可集中無可用許可,當前線程阻塞,直至其它某些線程調用此Semaphore的release()方法並且當前線程是下一個被分配許可的線程,或者其它某些線程中斷當前線程,或者已超出指定的等待時間。
tryAcquire(int permits, long timeout, TimeUnit unit)
在給定的等待時間內,嘗試獲取指定個數(permits)可用許可,如果此時有指定個數(permits)可用的許可,則立即返回true,同時許可集中許可個數減指定個數(permits);如果此時許可集中許可個數不足指定個數(permits),當前線程阻塞,直至其它某些線程調用此Semaphore的release()方法並且當前線程是下一個被分配許可的線程並且許可個數滿足指定個數,或者其它某些線程中斷當前線程,或者已超出指定的等待時間。
實現原理
Semaphore內部原理是通過AQS實現的。Semaphore中定義了Sync抽象類,而Sync又繼承了AbstractQueuedSynchronizer,Semaphore中對許可的獲取與釋放,是使用CAS通過對AQS中state的操作實現的。
Semaphore對許可的分配有兩種策略,公平策略和非公平策略,沒有明確指明時,默認為非公平策略。
公平策略:根據方法調用順序(即先進先出;FIFO)來選擇線程、獲得許可。 非公平策略:不對線程獲取許可的順序做任何保證。
Semaphore提供了兩個構造方法用於構建實例對象。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
下面分析非公平策略的Semaphore實現。首先是acquire()方法的實現代碼。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法的實現在AQS中。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取指定個數許可
// 如果許可個數不足,則執行doAcquireSharedInterruptibly
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
可以看到,如果當前線程被中斷,則直接拋出中斷異常,否則繼續執行tryAcquireShared方法,tryAcquireShared方法對公平策略和非公平策略在Semaphore中有不同的實現,這裡分析非公平策略的實現,進入Semaphore的靜態內部類NonfairSync中查看tryAcquireShared具體實現。
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
nonfairTryAcquireShared方法繼承自Sync。
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
可以看到,Semaphore中許可的分配是通過AQS中的state實現的。創建Semaphore對象時,初始化AQS的state值;當向Semaphore對象請求獲取許可時,會獲取state當前值,然後用當前值減去要獲取的許可個數,得到許可剩餘個數,如果剩餘個數不足(小於0)或者剩餘個數充足並且通過CAS成功修改state值,則直接返回許可剩餘個數,否則一直做輪訓獲取操作。
回到上面的acquireSharedInterruptibly方法,如果此時方法中tryAcquireShared執行結果是大於等於0,則獲取許可成功,否則執行doAcquireSharedInterruptibly方法,這個方法的實現在AQS中。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
把當前線程封裝成Node節點,並加入到等待隊列的尾部,通過循環再次嘗試獲取許可,如果不能獲取則當前線程阻塞,否則恢復當前線程並返回。
無參數的tryAcquire方法和有參數的tryAcquire方法,在具體實現上和acquire方法類似,這裡不再做具體分析。下面分析release方法。
public void release() {
sync.releaseShared(1);
}
releaseShared方法的具體實現在AQS中。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法在Sync類中進行了重寫。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
如果要歸還的許可個數和當前剩下的許可個數的總和超限,則拋出Error;否則通過CAS修改state,成功則返回true,失敗返回false。返回到上面的releaseShared方法,如果tryReleaseShared方法執行返回false,則直接返回false,歸還許可失敗;如果tryReleaseShared方法執行返回true,則可以繼續進行歸還許可操作,執行doReleaseShared方法。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared方法中,從等待隊列的頭結點開始,恢復符合條件的阻塞線程,使其恢復繼續執行。
閱讀更多 Java實戰技術 的文章