java.util.concurrent 簡介


java.util.concurrent 簡介

java.util.concurrent包提供了很多有用的類,方便我們進行併發程序的開發。本文將會做一個總體的簡單介紹。

主要的組件

java.util.concurrent包含了很多內容, 本文將會挑選其中常用的一些類來進行大概的說明:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory

Executor

Executor是一個接口,它定義了一個execute方法,這個方法接收一個Runnable,並在其中調用Runnable的run方法。

我們看一個Executor的實現:

<code>public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}/<code>

現在我們可以直接調用該類中的方法:

<code>    public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
log.info("{}", Thread.currentThread().toString());
});
}/<code>

注意,Executor並不一定要求執行的任務是異步的。

ExecutorService

如果我們真正的需要使用多線程的話,那麼就需要用到ExecutorService了。

ExecutorService管理了一個內存的隊列,並定時提交可用的線程。

我們首先定義一個Runnable類:

<code>public class Task implements Runnable {
@Override
public void run() {
// task details
}
}/<code>

我們可以通過Executors來方便的創建ExecutorService:

<code>ExecutorService executor = Executors.newFixedThreadPool(10);/<code>

上面創建了一個ThreadPool, 我們也可以創建單線程的ExecutorService:

<code>ExecutorService executor =Executors.newSingleThreadExecutor();/<code>

我們這樣提交task:

<code>public void execute() {  

executor.submit(new Task());
}/<code>

因為ExecutorService維持了一個隊列,所以它不會自動關閉, 我們需要調用executor.shutdown() 或者executor.shutdownNow()來關閉它。

如果想要判斷ExecutorService中的線程在收到shutdown請求後是否全部執行完畢,可以調用如下的方法:

<code>try {
executor.awaitTermination( 5l, TimeUnit.SECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}/<code>

ScheduledExecutorService

ScheduledExecutorService和ExecutorService很類似,但是它可以週期性的執行任務。

我們這樣創建ScheduledExecutorService:

<code>ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();/<code>

executorService的schedule方法,可以傳入Runnable也可以傳入Callable:

<code>Future<string> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);

ScheduledFuture> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);/<string>/<code>

還有兩個比較相近的方法:

<code>scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )

scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) /<code>

兩者的區別是前者的period是以任務開始時間來計算的,後者是以任務結束時間來計算。

Future

Future用來獲取異步執行的結果。可以調用cancel(boolean mayInterruptIfRunning) 方法來取消線程的執行。

我們看下怎麼得到一個Future對象:

<code>public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<string> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}/<string>/<code>

我們看下怎麼獲取Future的結果:

<code>if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}/<code>

future還可以接受一個時間參數,超過指定的時間,將會報TimeoutException。

<code>try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}/<code>

CountDownLatch

CountDownLatch是一個併發中很有用的類,CountDownLatch會初始化一個counter,通過這個counter變量,來控制資源的訪問。我們會在後面的文章詳細介紹。

CyclicBarrier

CyclicBarrier和CountDownLatch很類似。CyclicBarrier主要用於多個線程互相等待的情況,可以通過調用await() 方法等待,知道達到要等的數量。

<code>public class Task implements Runnable {

private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}/<code>
<code>public void start() {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});

Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

if (!cyclicBarrier.isBroken()) {

t1.start();
t2.start();
t3.start();
}
}/<code>

Semaphore

Semaphore包含了一定數量的許可證,通過獲取許可證,從而獲得對資源的訪問權限。通過 tryAcquire()來獲取許可,如果獲取成功,許可證的數量將會減少。

一旦線程release()許可,許可的數量將會增加。

我們看下怎麼使用:

<code>static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());

if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}

}/<code>

ThreadFactory

ThreadFactory可以很方便的用來創建線程:

<code>public class ThreadFactoryUsage implements ThreadFactory {
private int threadId;
private String name;

public ThreadFactoryUsage(String name) {
threadId = 1;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
log.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}/<code>


分享到:


相關文章: