java中有界隊列的飽和策略(reject policy)

我們在使用ExecutorService的時候知道,在ExecutorService中有個一個Queue來保存提交的任務,通過不同的構造函數,我們可以創建無界的隊列(ExecutorService.newCachedThreadPool)和有界的隊列(ExecutorService newFixedThreadPool(int nThreads))。

無界隊列很好理解,我們可以無限制的向ExecutorService提交任務。那麼對於有界隊列來說,如果隊列滿了該怎麼處理呢?

今天我們要介紹一下java中ExecutorService的飽和策略(reject policy)。

以ExecutorService的具體實現ThreadPoolExecutor來說,它定義了4種飽和策略。分別是AbortPolicy,DiscardPolicy,DiscardOldestPolicy和CallerRunsPolicy。

如果要在ThreadPoolExecutor中設定飽和策略可以調用setRejectedExecutionHandler方法,如下所示:

<code>        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<runnable>(20));
threadPoolExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy()
);
/<runnable>/<code>

Java

上面的例子中我們定義了一個初始5個,最大10個工作線程的Thread Pool,並且定義其中的Queue的容量是20。如果提交的任務超出了容量,則會使用AbortPolicy策略。

AbortPolicy

AbortPolicy意思是如果隊列滿了,最新的提交任務將會被拒絕,並拋出RejectedExecutionException異常:

<code>   public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.

*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/<code>

Java

上面的代碼中,rejectedExecution方法中我們直接拋出了RejectedExecutionException異常。

DiscardPolicy

DiscardPolicy將會悄悄的丟棄提交的任務,而不報任何異常。

<code>public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/<code>

Java

DiscardOldestPolicy

DiscardOldestPolicy將會丟棄最老的任務,保存最新插入的任務。

<code>   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
/<code>

Java

我們看到在rejectedExecution方法中,poll了最老的一個任務,然後使用ThreadPoolExecutor提交了一個最新的任務。

CallerRunsPolicy

CallerRunsPolicy和其他的幾個策略不同,它既不會拋棄任務,也不會拋出異常,而是將任務回退給調用者,使用調用者的線程來執行任務,從而降低調用者的調用速度。我們看下是怎麼實現的:

<code>public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**

* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/<code>

Java

在rejectedExecution方法中,直接調用了 r.run()方法,這會導致該方法直接在調用者的主線程中執行,而不是在線程池中執行。從而導致主線程在該任務執行結束之前不能提交任何任務。從而有效的阻止了任務的提交。

使用Semaphore

如果我們並沒有定義飽和策略,那麼有沒有什麼方法來控制任務的提交速度呢?考慮下之前我們講到的Semaphore,我們可以指定一定的資源信號量來控制任務的提交,如下所示:

<code>public class SemaphoreUsage {

private final Executor executor;
private final Semaphore semaphore;

public SemaphoreUsage(Executor executor, int count) {
this.executor = executor;
this.semaphore = new Semaphore(count);
}

public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();

try {
executor.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
}
);
} catch (RejectedExecutionException e) {
semaphore.release();
}
}

}
/<code>

Java

本文的例子可參考https://github.com/ddean2009/learn-java-concurrency/tree/master/rejectPolicy

更多內容請訪問 [www.flydean.com](www.flydean.com)


分享到:


相關文章: