JDK 中定時器是如何實現的


JDK 中定時器是如何實現的

作者:Fooisart https://www.jianshu.com/p/e21eb60a2c41

jdk中能夠實現定時器功能的大致有三種方式:

  • java.util.Timer
  • java.util.concurrent.DelayQueue
  • java.util.concurrent.ScheduledThreadPoolExecutor

靜下心來,咱們一一探究。

一. java.util.Timer

示例代碼:

<code>/**
* 安排指定的任務task在指定的時間firstTime開始進行重複的固定速率period執行
* 每天中午12點都執行一次
*
* @author Fooisart
* Created on 21:46 14-01-2019
*/
public class TimerDemo {
public static void main(String[] args) {
Timer timer = new Timer();
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY, 12);//控制小時
calendar.set(Calendar.MINUTE, 0);//控制分鐘
calendar.set(Calendar.SECOND, 0);//控制秒
Date time = calendar.getTime();//執行任務時間為12:00:00

//每天定時12:00執行操作,每隔2秒執行一次
timer.schedule(new TimerTask() {

@Override
public void run() {
System.out.println(new Date() + "執行任務。。。");
}
}, time, 1000 * 2);
}
}/<code>

Demo中使用了Timer實現了一個定時任務,該任務在每天12點開始執行,並且每隔2秒執行一次。

順手牽羊:查看源碼時,無意發現Timer中有schedule與scheduleAtFixedRate,它倆都可以到約定時間按照指定時間間隔執行。然而它倆的區別是什麼呢?官方解釋:一個是Fixed-delay,一個是Fixed-rate。那麼這兩個詞到底是什麼意思呢?把demo中的代碼運行一遍,然後把schedule換成scheduleAtFixedRate,就全部瞭然了。

示例代碼中較為簡潔,能看出控制執行時間的方法應該是 timer.schedule(),跟進去看源碼:

<code>public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}/<code>
  • task 表示要執行的任務邏輯
  • firstTime 表示第一次執行的時間
  • period 表示每次間隔時間

繼續跟進:

<code>private void sched(TimerTask task, long time, long period) {
//省略非重點代碼
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");

synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}

queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}/<code>

這裡其實做了兩個事情

  • 給task設定了一些參數,類似於初始化task。這裡還給它加了把鎖,可以思考一下為甚要在此初始化?為何要加鎖?(不是本文範疇,各位夥伴自行思考)
  • 把初始化後的task加入到queue中。

讀到這裡,我們還是沒有看到到底是如何實現定時的?彆著急,繼續。進入queu.add(task)

<code>/**
* Adds a new task to the priority queue.
*/
void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length)

queue = Arrays.copyOf(queue, 2*queue.length);

queue[++size] = task;
fixUp(size);
}/<code>

這裡註釋提到,加入一個新任務到優先級隊列中去。其實這裡的TimerTask[]是一個優先級隊列,使用數組存儲方式。並且它的數據結構是heap。包括從fixUp()我們也能看出來,它是在保持堆屬性,即堆化(heapify)。

那麼能分析的都分析完了,還是沒能看到定時是如何實現的?再次靜下來想一想,定時任務如果想執行,首先得啟動定時器。所有咱們再次關注構造方法。

Timer一共有4個構造方法,看最底層的:

<code>public Timer(String name) {
thread.setName(name);
thread.start();
}/<code>

可以看到,這裡在啟動一個thread,那麼既然是一個Thread,那肯定就得關注它的 run()方法了。進入:

<code>public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}/<code>

繼續進入mainLoop():

<code>/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
//省略
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}/<code>

從上述源碼中,可以看出有兩個重要的if

  • if (taskFired = (executionTime<=currentTime)),表示已經到了執行時間,那麼下面執行任務就好了;
  • if (!taskFired),表示未到執行時間,那麼等待就好了。那麼是如何等待的呢?再仔細一看,原來是調用了Object.wait(long timeout)。

到這裡我們知道了,原來jdk中的定時器是這樣實現的啊,等待是使用最簡單的Object.wait()實現的啊!彆著急,這裡有個小提問:使用Therad.sleep()可以實現嘛?如果可以,為何不用呢?

java.util.concurrent.DelayQueue

比較細緻地分析了java.util.Timer,DelayQueue也大同小異。整理一下心情,重新出發。

先上示例代碼:

DelayQueue它本質上是一個隊列,而這個隊列裡也只有存放Delayed的子類才有意義,所有定義了DelayTask:

<code>public class DelayTask implements Delayed {
private Date startDate = new Date();
public DelayTask(Long delayMillions) {
this.startDate.setTime(new Date().getTime() + delayMillions);
}

@Override
public int compareTo(Delayed o) {
long result = this.getDelay(TimeUnit.NANOSECONDS)
- o.getDelay(TimeUnit.NANOSECONDS);
if (result < 0) {
return -1;
} else if (result > 0) {
return 1;
} else {
return 0;
}

}

@Override
public long getDelay(TimeUnit unit) {
Date now = new Date();
long diff = startDate.getTime() - now.getTime();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
}
public static void main(String[] args) throws Exception {
BlockingQueue<delaytask> queue = new DelayQueue<>();
DelayTask delayTask = new DelayTask(1000 * 5L);
queue.put(delayTask);
while (queue.size()>0){
queue.take();
}
}/<delaytask>/<code>

看main方法,主要做了三件事:

  • 構造DelayTask,其中的延遲時間是5秒
  • 將任務放入隊列
  • 從隊列中取任務

DelayQueue跟剛才的Timer.TaskQueue是比較相似的,都是優先級隊列,放入元素時,都得堆化(DelayQueue.put()如果元素滿了,會阻塞。自行研究)。重點看queue.take()。

<code>public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();

first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}/<code>

源碼中出現了三次await字眼:

  • 第一次是當隊列為空時,等待;
  • 第二次等待是因為,發現有任務,沒有到執行時間,並且有準備執行的線程(leader)。咱們得講理吧,既然已經有人在準備執行了,咱們就得等吧。
  • 第三次是真正延時的地方了,available.awaitNanos(delay),此時也沒有別的線程要執行,也就是我將要執行,所有等待剩下的延遲時間即可。

這裡咱們明白了,DelayQueue的等待是通過Condition.await()來實現的。請注意,這裡又有一個小問題了:Object.wait()與Conditon.await()有何異同?

java.util.concurrent.ScheduledThreadPoolExecutor

由於ScheduledThreadPoolExecutor涉及到的線程池(ThreadPoolExecutor)內容較多,所有就不詳細分析了,也考慮到讀到這裡,難免有些疲倦。直接簡述一下結論:在創建ScheduledThreadPoolExecutor時,線程池的工作隊列使用的是DelayedWorkQueue,它的take()方法,與DelayQueue.take()方法極其相似,也有三個等待。

至此,要結束了。總結一下,jdk中實現定時器一共有兩種方式:

  1. 使用Object.wait()
  2. 使用Conditon.await()

還記得文中的兩個小提問嘛:

  1. 使用Thread.sleep()可以實現嘛?如果可以,為何不用呢?
  2. Object.wait()與Conditon.await()有何異同?


分享到:


相關文章: