Java 多線程之Semaphore (限流Java 版)

Java 多線程之Semaphore (限流Java 版)

人生意義

概念

計數信號量。從概念上講,信號量維護一組許可證。

舉一個例子

某銀行分店只有三個窗口,所以同一時間最多隻有三個人辦理業務,其它人只能等待。可以把辦理業務的人比作成線程,三個窗口就相當於三個許可證。此時來了4個人,先到的三個領到人許可證然後辦理業務,第四個人呢只有等待,等待其中一個先辦好業務釋放許可證之後,然後再辦理業務。

簡單的用法

調用aquire方法是阻塞的直到有一個許可可用然後返回。每次調用release方法就會增加一個許可,隱式地釋放一個阻塞獲取者(調用aquire方法阻塞的線程)。當然沒有所謂實際的許可對象,Semaphore僅僅是維護了一個數字而已,然後執行相應的減加操作而已。

一個demo?

public class SemaphoreDemo {
 public static final int THREAD_SIZE = 10;
 public static void runSomething() throws InterruptedException {
 //模擬處理什麼事
 Thread.sleep(1000);
 System.out.println(String.format("current threadId is %d,current time is %d",
 Thread.currentThread().getId(), System.currentTimeMillis() / 1000));
 }
 public static void main(String[] args) throws InterruptedException {
 //創建一個包含4個許可證的信號量實例
 Semaphore semaphoreDemo = new Semaphore(4);
 for (int i = 0; i < THREAD_SIZE; i++) {
 //獲取許可
 Thread demoThread = new Thread(() -> {
 try {
 //獲取許可
 semaphoreDemo.acquire();
 //操作資源
 runSomething();
 } catch (InterruptedException e) {
 //拋出InterruptedException 會將該線程的中斷標誌設置為false
 Thread.currentThread().interrupt();
 } finally {
 semaphoreDemo.release();
 }
 });
 //開啟demo線程
 demoThread.start();
 }
 }
}

程序大家有認真看嘛,給大家一分鐘的時間看下。。。。。。大家發現有沒有啥問題?考慮一下?

程序在調用aquire方法時會阻塞住,如果此時該線程被中斷了finally 還執行release 方法.So,we can optimize it !

Semaphore 類提供了一個好用的方法

tryAcquire 方法,調用該方法在調用的時間如果有許可的話則會獲取到許可並返回true,但是如果當前沒有許可的話則會立即返回false

整改之後的代碼如下:

/**
 * @author 梁自強
 * @date 2019.09.20
 */
public class SemaphoreAdvancedDemo {
 public static void main(String[] args) {
 //新建一個擁有4個許可的信號量
 Semaphore semaphoreAdvance = new Semaphore(4);
 for (int i = 0; i < THREAD_SIZE; i++) {
 Thread demoThread = new Thread(() -> {
 boolean isAcquire = false;
 try {
 //tryAcquire 如果 沒有許可會立即返回false,否則會通過CAS 去修改被volatile修飾的許可總數即state
 while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
 Thread.sleep(100);
 }
 runSomething();
 } catch (InterruptedException e) {
 System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
 Thread.currentThread().interrupt();
 } finally {
 if (isAcquire) {
 semaphoreAdvance.release();
 System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
 }
 }
 });
 demoThread.start();
 //隨機調用interrupt 方法模仿實際被中斷
 if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
 demoThread.interrupt();
 }
 }
 }
}

上面有個while 循環感覺很不爽有木有?我們再來優化一下?

下面在看一個神奇的方法,再也不害怕中斷了

acquireUninterruptibly() :從信號量實例獲取許可,直到有一個許可可用否則一直阻塞。若阻塞中的線程被調用了interrupt方法,該線程會一直等待,當獲取到許可返回時中斷狀態會被設置為true

測試方法:

 public static void main(String[] ar) throws InterruptedException {
 
 /**
 *
 * {@link Semaphore#acquireUninterruptibly} 方法 獲取許可,如有許可則返回,
 * 若麼有阻塞,在阻塞的過程中線程調用中斷方法也不會影響線程的等待獲取許可,但是在返回時該線程的中斷狀態會被設置為true
 * 測試步驟:
 * 1.創建一個擁有一個許可的信號量實例
 * 2.在主線程中acquire一個許可
 * 3.創建一個線程a去獲取許可
 * 4.調用a.interrupt方法
 * 5.主線程釋放許可
 */
 Semaphore semaphore = new Semaphore(1);
 semaphore.acquire();
 Thread testThread = new Thread(()->{
 semaphore.acquireUninterruptibly();
 //測試線程是否是中斷狀態
 if (Thread.currentThread().isInterrupted()) {
 System.out.println("pass");
 }else {
 System.out.println("get error");
 }
 });
 //啟動測試線程
 testThread.start();
 //中斷測試線程
 testThread.interrupt();
 //釋放許可
 semaphore.release();
 }
結果:
pass

下面來使用下這個 acquireUninterruptibly 方法來改造一下我們的增加版demo,下面我們來看下,做出一些調整使用了策略模式修改了我們上面的類,使其易於擴展,因為需要傳到Thread裡執行,這裡我就偷個懶,直接使用java 的Runnable 為策略基類,實現了兩個子類實現run 接口,類圖關係如下:


Java 多線程之Semaphore (限流Java 版)

Semaphore 使用策略圖

並且在方法調用上還特別設計了一下,在調用的時候 傳的是Supplier 實現類對象,相當於函數式調用大家不僅學習了多線程的知識,還學習如何使用java 8 的新姿勢,此時不點個贊,有點對不住我這個博主φ(>ω

代碼如下:

public class SemaphoreAdvancedDemo {
 public static void main(String[] args) {
 //新建一個擁有4個許可的信號量
 Semaphore semaphoreAdvance = new Semaphore(4);
// testSemaphore(() -> new Advance(semaphoreAdvance));
 testSemaphore(()->new Final(semaphoreAdvance));
 }
 private static void testSemaphore(Supplier supplier) {
 for (int i = 0; i < THREAD_SIZE; i++) {
 Thread demoThread = new Thread(supplier.get());
 demoThread.start();
 //隨機調用interrupt 方法模仿實際被中斷
 if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
 demoThread.interrupt();
 }
 }
 }
 static class Advance implements Runnable {
 Advance(Semaphore semaphoreAdvance) {
 this.semaphoreAdvance = semaphoreAdvance;
 }
 private Semaphore semaphoreAdvance;
 @Override
 public void run() {
 boolean isAcquire = false;
 try {
 //tryAcquire 如果 沒有許可會立即返回false,否則會通過CAS 去修改被volatile修飾的許可總數即state
 while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
 Thread.sleep(100);
 }
 runSomething();
 } catch (InterruptedException e) {
 System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
 Thread.currentThread().interrupt();
 } finally {
 if (isAcquire) {
 semaphoreAdvance.release();
 System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
 }
 }
 }
 }
 static class Final implements Runnable {
 Final(Semaphore semaphoreFinal) {
 this.semaphoreFinal = semaphoreFinal;
 }
 private Semaphore semaphoreFinal;
 @Override
 public void run() {
 try {
 semaphoreFinal.acquireUninterruptibly();
 if (Thread.currentThread().isInterrupted()) {
 System.out.println(String.format("[final] %s have interrupt", Thread.currentThread().getName()));
 throw new InterruptedException();
 }
 runSomething();
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 } finally {
 semaphoreFinal.release();
 System.out.println(String.format("%s released a permit", Thread.currentThread().getName()));
 }
 }
 }
}

使用場景

通例:某類資源同時限定n 個線程訪問

實際使用場景

  • 信號量可以用來限制一次數據庫連接的數量
  • 可以實現java 版的限流工具
  • 信號量持有一個許可證的時侯可以當做同步資源來使用,不過使用需要小心因為信號量即使一個線程沒有獲取許可證,也可以釋放許可證,這就是和排它鎖的區別,但是如果你使用得當,它還可以解決線上死鎖的問題(大家可以思考下怎麼設計在評論區討論)

最後實現一個信號量版的令牌桶算法


Java 多線程之Semaphore (限流Java 版)

令牌桶示意圖


上源碼

/**
 * 限流:令牌桶算法
 */
public class RateLimiterOnSemaphore {
 private static final int DEFAULT_REQUEST_PER_SECOND = 200;
 private static final int SECOND_MILLIONS = 1000;
 private ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
 private final Semaphore tokenContainer;
 private final int requestPerSecond;
 public RateLimiterOnSemaphore() {
 this(DEFAULT_REQUEST_PER_SECOND);
 }
 public RateLimiterOnSemaphore(int requestPerSecond) {
 this.requestPerSecond = requestPerSecond;
 tokenContainer = new Semaphore(requestPerSecond);
 //定時任務往container 勻速的存放token
 //計算 定時任務執行的間隔時間
 long period = SECOND_MILLIONS / requestPerSecond;
 schedule.scheduleAtFixedRate(this::putToken, 0, period, TimeUnit.MILLISECONDS);
 }
 public static RateLimiterOnSemaphore create(int tokensPerSecond) {
 return new RateLimiterOnSemaphore(tokensPerSecond);
 }
 /**
 * 獲取token
 */
 public void acquire() {
 tokenContainer.acquireUninterruptibly();
 }
 /**
 * 嘗試獲取token
 *
 * @return true 如果獲取成功 否則返回false
 */
 public boolean tryAcquire() {
 return tokenContainer.tryAcquire();
 }
 /**
 * 往容器裡存放token
 */
 private void putToken() {
 //判斷是否達到每秒上限
 if (tokenContainer.availablePermits()  
< requestPerSecond) { tokenContainer.release(); } } }

小結

這篇主要講信號量的用法,下一篇講jdk是如何實現信號量的。請大家點贊關注下吧,動動你的小拇指是對我最大的幫助。


分享到:


相關文章: