大家好,我是小图灵视界,最近在分析Java8源码,使用的JDK是OpenJDK8,打算将分析源码的笔记都会分享出来,在头条上代码排版比较难看,想要笔记的可以关注并私信我。
由于分析Objec类源码比较长,所以将Object类源码分析的笔记分为两部分,没有看第一部分分析的可以点击下面进行链接进行阅读。
notify、notifyAll、wait
这三个方法放在一起讲,都是native修饰的本地方法,另外这三个方法被final修饰,说明这三个方法是不能重写。
notify:唤醒任意一个等待的线程
notifyAll:唤醒所有等待的线程
wait:释放锁,线程进入等待。
这几个方法主要用于线程之间的通信,特别适合与生产者和消费者的场景,下面用消费者和生产者的例子看看这几个方法的使用:
<code>//生产者 public class Producer implements Runnable{ //产品容器 private final List container; //产品容器的大小 private final int size=5; public Producer( List container){ this.container=container; } private void produce() throws InterruptedException { synchronized (container){ //判断容器是否已满 while (container.size()==size){ System.out.println("容器已满,暂定生产。"); container.wait(); } Random random = new Random(); int num = random.nextInt(10); //模拟1秒生产一个产品 Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"时间"+new Date()+" 生产产品:" + num); container.add(num); //生产一个产品就可以通知消费者消费了 container.notifyAll(); } } public void run() { while (true){ try { produce(); } catch (InterruptedException e) { System.out.println("生产机器异常"); e.printStackTrace(); } } } }/<code>
生产者每秒生产一个产品,然后通知消费者消费,当容器满了,就进行等待消费者唤醒。
<code>//消费者 public class Consumer implements Runnable{ //消费容器 private final List container; public Consumer(List container){ this.container=container; } private void consume() throws InterruptedException { synchronized(container){ while (container.isEmpty()){ System.out.println("没有可消费的产品"); //等待 container.wait(); } //消费产品 Thread.sleep(1000); Integer num=container.remove(0); System.out.println(Thread.currentThread().getName()+"时间"+new Date()+" 消费产品:"+num); //消费一个就可以通知生产者消费了 container.notifyAll(); } } public void run() { while (true){ try { consume(); } catch (InterruptedException e) { System.out.println("消费错误"); e.printStackTrace(); } } } }/<code>
消费者每秒消费一个产品,当容器减少一个产品就可以通知生产者生产产品了,当容器为空时,进行等待生产者唤醒。
<code>//生产消费过程 public class Main { public static void main(String[] args){ List container = new ArrayList(); Thread producer = new Thread(new Producer(container)); Thread consumer = new Thread(new Consumer(container)); producer.start(); consumer.start(); } }/<code>
启动一个消费者和一个生产者,也可以启动多个消费者和多个生产者,消费者和生产者之间共用容器,上述消费者的消费速度和生产者的生产速度都是每秒一个产品,可以改变消费者消费的速度和生产者生产的速度观察程序运行的结果。
notify、notifyAll、wait一般配合着关键synchronized 一起使用,这三个方法在synchronized 代码块中使用,否则会抛出
IllegalMonitorStateException。当它们在synchronized 代码块中执行,说明当前线程一定获得了锁。优先级高的线程竞争到对象锁的概率大。notify只会唤醒一个等待的线程,而notifyAll唤醒所有的线程,唤醒所有的线程,并不意味着所有的线程会立刻执行,这些被唤醒的锁还需要竞争锁。
既然notify、notifyAll、wait这几个方法都是本地方法,那JVM层面是怎么实现的?先来看看wait方法对应的本地方法是JVM_MonitorWait,JVM_MonitorWait的实现如下:
<code>JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms)) JVMWrapper("JVM_MonitorWait"); //resolve_non_null将传入的对象强转为oop Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); JavaThreadInObjectWaitState jtiows(thread, ms != 0); //是否已经交于monitor监控等待了 if (JvmtiExport::should_post_monitor_wait()) { //触发等待事件 JVMTI [%s] montior wait event triggered // 将等待事件发送给线程 JVMTI [%s] monitor wait event sent JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms); } //重点分析这句 ObjectSynchronizer::wait(obj, ms, CHECK); JVM_END/<code>
monitor指的是监控器,monitor的作用监视线程,保证在同一时间内只有一个线程可以访问共享的数据和代码(加锁对象的数据),其他线程需要访问共享数据就先进行等待,等这个线程释放锁以后,其他线程才有机会进行访问。monitor有wait set(等待池)和entry set(实例池),wait set存放处于wait状态的线程队列,entry set存放处于等待锁block状态的线程队列。
JVM_MonitorWait方法中首先判断下是否已经交于monitor进行监控等待了,如果是的话,调用post_monitor_wait方法,这个方法主要作用触发等待事件,将等待事件发送给线程。
分割线上部分代码主要是消除偏向锁、判断等待的时间是否小于0,小于0的话,抛出抛出IllegalArgumentException异常。这个方法重要代码就在分割线的下部分,ObjectMonitor就是监视器对象,ObjectMonitor的结构如下:
<code>ObjectMonitor() { _header = NULL; _count = 0; _waiters = 0, _recursions = 0; //重试的次数 _object = NULL; _owner = NULL; //指向持有ObjectMonitor对象的线程 _WaitSet = NULL; //存放所有wait状态的线程的对象 _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ;//阻塞在Entry最近可达的线程列表,该列表其实是waitNode所构成的线程代理 FreeNext = NULL ; _EntryList = NULL ;//存放处于等待锁block状态的线程队列 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; _previous_owner_tid = 0; }/<code>
_WaitSet就是monitor的等待池,存放处于wait状态的线程队列,_EntryList就是实例池,存放处于等待锁block状态的线程队列。_cxq阻塞在Entry最近可达的线程列表,该列表其实是waitNode所构成的线程代理, _owner 是指向持有ObjectMonitor对象,ObjectSynchronizer::wait方法首先调用
ObjectSynchronizer::inflate获取到monitor以后才执行monitor的wait方法。由于monitor的wait方法的源码如下:
<code>// Wait/Notify/NotifyAll // // Note: a subset of changes to ObjectMonitor::wait() // will need to be replicated in complete_exit above void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { //获取线程 Thread * const Self = THREAD ; //断言是否是java线程 assert(Self->is_Java_thread(), "Must be Java thread!"); //强转为java线程 JavaThread *jt = (JavaThread *)THREAD; //一些全局初始化工作、 DeferredInitialize () ; // Throw IMSX or IEX. //检查是否拥有monitor(监视器) CHECK_OWNER(); EventJavaMonitorWait event; // check for a pending interrupt 检查线程是否中断和挂起 if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { // post monitor waited event. Note that this is past-tense, we are done waiting. //递交给monitor等待事件 if (JvmtiExport::should_post_monitor_waited()) { // Note: 'false' parameter is passed here because the // wait was not timed out due to thread interrupt. JvmtiExport::post_monitor_waited(jt, this, false); } if (event.should_commit()) { post_monitor_wait_event(&event, 0, millis, false); } //监听到异常事件 TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; } TEVENT (Wait) ; assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; //设置当前的等待monitor jt->set_current_waiting_monitor(this); // create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. //将当前线程包装为ObjectWaiter ObjectWaiter node(Self); //状态设置为TS_WAIT node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag // Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor *except* in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; //将ObjectWaiter对象放进等待池(wait set)中 AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; if ((SyncFlags & 4) == 0) { _Responsible = NULL ; } //保存旧的重试次数 intptr_t save = _recursions; // record the old recursion count //增加ObjectWaiter的数量 _waiters++; // increment the number of waiters _recursions = 0; // set the recursion level to be 1 exit (true, Self) ; // exit the monitor guarantee (_owner != Self, "invariant") ; // As soon as the ObjectMonitor's ownership is dropped in the exit() // call above, another thread can enter() the ObjectMonitor, do the // notify(), and exit() the ObjectMonitor. If the other thread's // exit() call chooses this thread as the successor and the unpark() // call happens to occur while this thread is posting a // MONITOR_CONTENDED_EXIT event, then we run the risk of the event // handler using RawMonitors and consuming the unpark(). // // To avoid the problem, we re-post the event. This does no harm // even if the original unpark() was not consumed because we are the // chosen successor for this monitor. if (node._notified != 0 && _succ == Self) { node._event->unpark(); } // The thread is on the WaitSet list - now park() it. // On MP systems it's conceivable that a brief spin before we park // could be profitable. // // TODO-FIXME: change the following logic to a loop of the form // while (!timeout && !interrupted && _notified == 0) park() int ret = OS_OK ; int WasNotified = 0 ; { // State transition wrappers OSThread* osthread = Self->osthread(); OSThreadWaitState osts(osthread, true); { ThreadBlockInVM tbivm(jt); // Thread is in thread_blocked state and oop access is unsafe. jt->set_suspend_equivalent(); if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { //当前线程通过park()方法开始挂起(suspend) if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; } } // were we externally suspended while we were waiting? if (ExitSuspendEquivalent (jt)) { // TODO-FIXME: add -- if succ == Self then succ = null. jt->java_suspend_self(); } } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm // Node may be on the WaitSet, the EntryList (or cxq), or in transition // from the WaitSet to the EntryList. // See if we need to remove Node from the WaitSet. // We use double-checked locking to avoid grabbing _WaitSetLock // if the thread is not on the wait queue. // // Note that we don't need a fence before the fetch of TState. // In the worst case we'll fetch a old-stale value of TS_WAIT previously // written by the is thread. (perhaps the fetch might even be satisfied // by a look-aside into the processor's own store buffer, although given // the length of the code path between the prior ST and this load that's // highly unlikely). If the following LD fetches a stale TS_WAIT value // then we'll acquire the lock and then re-fetch a fresh TState value. // That is, we fail toward safety. //当node的状态为TS_WAIT时,从WaitSet中删除 if (node.TState == ObjectWaiter::TS_WAIT) { Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ; if (node.TState == ObjectWaiter::TS_WAIT) { DequeueSpecificWaiter (&node) ; // unlink from WaitSet assert(node._notified == 0, "invariant"); //node状态更改为TS_RUN node.TState = ObjectWaiter::TS_RUN ; } Thread::SpinRelease (&_WaitSetLock) ; } // The thread is now either on off-list (TS_RUN), // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ). // The Node's TState variable is stable from the perspective of this thread. // No other threads will asynchronously modify TState. guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ; OrderAccess::loadload() ; if (_succ == Self) _succ = NULL ; WasNotified = node._notified ; // Reentry phase -- reacquire the monitor. // re-enter contended monitor after object.wait(). // retain OBJECT_WAIT state until re-enter successfully completes // Thread state is thread_in_vm and oop access is again safe, // although the raw address of the object may have changed. // (Don't cache naked oops over safepoints, of course). // post monitor waited event. Note that this is past-tense, we are done waiting. if (JvmtiExport::should_post_monitor_waited()) { JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT); } if (event.should_commit()) { post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT); } OrderAccess::fence() ; assert (Self->_Stalled != 0, "invariant") ; Self->_Stalled = 0 ; assert (_owner != Self, "invariant") ; ObjectWaiter::TStates v = node.TState ; if (v == ObjectWaiter::TS_RUN) { enter (Self) ; } else { guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ; ReenterI (Self, &node) ; node.wait_reenter_end(this); } // Self has reacquired the lock. // Lifecycle - the node representing Self must not appear on any queues. // Node is about to go out-of-scope, but even if it were immortal we wouldn't // want residual elements associated with this thread left on any lists. guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ; assert (_owner == Self, "invariant") ; assert (_succ != Self , "invariant") ; } // OSThreadWaitState() jt->set_current_waiting_monitor(NULL); guarantee (_recursions == 0, "invariant") ; _recursions = save; // restore the old recursion count _waiters--; // decrement the number of waiters // Verify a few postconditions assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; if (SyncFlags & 32) { OrderAccess::fence() ; } // check if the notification happened if (!WasNotified) { // no, it could be timeout or Thread.interrupt() or both // check for interrupt event, otherwise it is timeout if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } } // NOTE: Spurious wake up will be consider as timeout. // Monitor notify has precedence over thread interrupt. }/<code>
ObjectMonitor::wait方法的作用主要作用为:
- 将当前线程包装为ObjectWaiter,状态设置为TS_WAIT,ObjectWaiter的结构可以参考分析下面分析ObjectSynchronizer::notify的内容。
- 执行 AddWaiter (&node) ,将ObjectWaiter放进等待池(wait set)中,即_WaitSet,_WaitSet是ObjectWaiter的一个队列。AddWaiter方法就是将ObjectWaiter放进_WaitSet队尾中。
- 将当前线程挂起,在上述源码中并没与释放锁。也就是释放锁的工作不在方法内。
其他逻辑,源码中有详细的注释,感兴趣的可以直接深入下。ObjectMonitor::AddWaiter作用是将线程加入等待池(wait set)中,ObjectMonitor::AddWaiter的代码为:
<code>inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet ; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }/<code>
当_WaitSet为空,放在_WaitSet的头部,_WaitSet的_prev和_next都指向node,当_WaitSet不为空时,将node放在_WaitSet头部。
notify方法对应的JVM层面的函数是JVM_MonitorNotify,JVM_MonitorNotify的源码为:
<code>JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle)) JVMWrapper("JVM_MonitorNotify"); //转成oop,oop表示普通对象 Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); //调用notify方法 ObjectSynchronizer::notify(obj, CHECK); JVM_END/<code>
首先将传入的对象转换成oop,上述源码中,最重要的是调用了
ObjectSynchronizer::notify进行唤醒等待的线程,
ObjectSynchronizer::notify的源码如下:
<code>void ObjectSynchronizer::notify(Handle obj, TRAPS) { //是否使用偏向锁 if (UseBiasedLocking) { //消除偏向锁 BiasedLocking::revoke_and_rebias(obj, false, THREAD); assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } markOop mark = obj->mark(); //如果已经获取了锁和获取了监视器 if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { return; } //获取monitor(监视器),监视THREAD,然后调用notify方法 ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD); }/<code>
ObjectSynchronizer::notify方法先判断是否使用偏向锁,如果使用了就消除偏向锁。
ObjectSynchronizer::inflate是获取monitor,在调用wait、notify、notifyAll方法的时候,首先需要获取monitor(监视器),获取了monitor可以看成是获取了锁,monitor相当于是边界,没有monitor监控的线程就不能进来monitor中。获取了monitor,然后调用notify方法,notify在JVM层面的源码为:
<code>void ObjectMonitor::notify(TRAPS) { //OWNES就是ObjectMonitor的_owner,指向持有ObjectMonitor对象的线程 CHECK_OWNER(); //判断等待池(Wait Set)是是否为空,如果为空,直接返回 if (_WaitSet == NULL) { //触发Empty的Notify TEVENT (Empty-Notify) ; return ; } //追踪监视器探针,获取java线程的id以及获取java当前class的名字、字节大小、长度等 DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; ===============================分割线1============================= //线程自旋 Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; //等待池队列,将等待池(wait set)队列的第一个值取出并返回 ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { TEVENT (Notify1 - Transfer) ; //ObjectWaiterd的状态 guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ; guarantee (iterator->_notified == 0, "invariant") ; //如果Policy!=4 ,状态设置为在获取锁队列的状态 if (Policy != 4) { iterator->TState = ObjectWaiter::TS_ENTER ; } iterator->_notified = 1 ; Thread * Self = THREAD; //线程id iterator->_notifier_tid = Self->osthread()->thread_id(); //获取等待锁block状态的线程队列 ObjectWaiter * List = _EntryList ; if (List != NULL) { //断言,进行List前指针、状态的断言 assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } if (Policy == 0) { // prepend to EntryList //为空,等待锁block状态的线程队列为空 if (List == NULL) { //如果为空,将队列设置为空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { //放入_EntryList队列的排头位置 List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append to EntryList //Policy == 1:放入_EntryList队列的末尾位置; if (List == NULL) { //如果为空,队列设置为空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. //放入_EntryList队列的末尾位置; ObjectWaiter * Tail ; for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ; assert (Tail != NULL && Tail->_next == NULL, "invariant") ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; } //Policy == 2 时,将List放在_cxq队列的排头位置 } else if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { //如果为空,队列设置为空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; //放进_cxq队列时,CAS操作,有其他线程竞争 for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } //Policy == 3:放入_cxq队列中,末尾位置; } else if (Policy == 3) { // append to cxq iterator->TState = ObjectWaiter::TS_CXQ ; //同样CAS操作 for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; //尾指针为空,设置为空 if (Tail == NULL) { iterator->_next = NULL ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) { break ; } } else { //尾指针不为空,添加在队尾 while (Tail->_next != NULL) Tail = Tail->_next ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; break ; } } } else { //Policy等于其他值,立即唤醒ObjectWaiter对应的线程; ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; } //Policy<4,等待重试 if (Policy < 4) { iterator->wait_reenter_begin(this); } // _WaitSetLock protects the wait queue, not the EntryList. We could // move the add-to-EntryList operation, above, outside the critical section // protected by _WaitSetLock. In practice that's not useful. With the // exception of wait() timeouts and interrupts the monitor owner // is the only thread that grabs _WaitSetLock. There's almost no contention // on _WaitSetLock so it's not profitable to reduce the length of the // critical section. } //线程自旋释放 Thread::SpinRelease (&_WaitSetLock) ; if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) { ObjectMonitor::_sync_Notifications->inc() ; } }/<code>
ObjectMonitor::notify函数的逻辑主要分为两部分,第一部分做一些检查和准备唤醒操作过程需要的一些信息,第二部所及是根据Policy的大小将需要唤醒的线程放进等待锁block状态的线程队列,即ObjectMonitor的_EntryList和_cxq队列中,这两个队列的线程将等待获取锁。在分割线上部分, CHECK_OWNER()检测是否拥有monitor,只有拥有monitor,才可以唤醒等待的线程,当等待池(wait set)为空,说明没有等待池中没有需要唤醒的线程,直接返回。如果等待池不为空,则准备获取java线程以及获取java当前class(JVM层面读取的是class文件)的名字、字节大小、长度等。分割线下部分第二部分逻辑,当等待池中线程不为空的时候,首先调用 ObjectWaiter * iterator = DequeueWaiter() 从等待池中将第一个等待的线程取出来,DequeueWaiter() 的源代码为:
<code>inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter //将wait set赋值给waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; }/<code>
ObjectMonitor::DequeueWaiter()中,当_WaitSet不为空时,调用
ObjectMonitor::DequeueSpecificWaiter方法返回_WaitSet的第一个元素,
ObjectMonitor::DequeueSpecificWaiter方法的源代码为:
<code>//将_WaitSet中第一个线程返回回来 inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev != NULL, "node already removed from list"); assert(node->_next != NULL, "node already removed from list"); // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; // if (next == node) { assert(node->_prev == node, "invariant check"); _WaitSet = NULL; } else { ObjectWaiter* prev = node->_prev; assert(prev->_next == node, "invariant check"); assert(next->_prev == node, "invariant check"); next->_prev = prev; prev->_next = next; if (_WaitSet == node) { _WaitSet = next; } } node->_next = NULL; node->_prev = NULL; }/<code>
ObjectMonitor::DequeueSpecificWaiter方法中,首先判断ObjectWaiter的_next是否等于_WaitSet,如果是否,则说明_WaitSet为空,将_WaitSet设置为NULL,如果不是,则将第一元素返回。
ObjectWaiter是JVM层面的C++类,ObjectWaiter类为:
<code>// ObjectWaiter serves as a "proxy" or surrogate thread. // TODO-FIXME: Eliminate ObjectWaiter and use the thread-specific // ParkEvent instead. Beware, however, that the JVMTI code // knows about ObjectWaiters, so we'll have to reconcile that code. // See next_waiter(), first_waiter(), etc. class ObjectWaiter : public StackObj { public: //状态 enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; //下一个ObjectWaiter指针 ObjectWaiter * volatile _next; //前一个ObjectWaiter指针 ObjectWaiter * volatile _prev; //线程 Thread* _thread; //被唤醒的线程id jlong _notifier_tid; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };/<code>
ObjectWaiter类充当“代理”或代理线程,也就是ObjectWaiter充当_thread的代理角色,负责与其他对外的工作对接。_next指向下一个ObjectWaiter指针,_prev指向前一个ObjectWaiter指针。回到 ObjectMonitor::notify函数的第二部分逻辑,当从线程池中取出第一个ObjectWaiter(线程代理),根据Policy的值不同,将取出的线程放入等待锁的_EnterList或者_cxq队列中的起始或末尾位置。当Policy == 0时,将等待池取出的iterator(线程或者线程代理)放进 _EntryList中的排头位置;当Policy == 1时,将等待池取出的iterator放进_EntryList中的末尾位置;当Policy == 2时,将等待池中取出的iterator放进放在_cxq队列的排头位置。因为有其他线程的竞争,当放入_cxq队列时,进行CAS操作保证线程的安全;在讲解ObjectMonitor结构出现过,_cxq是阻塞在_EnterList最近可达的线程列表,该列表其实是waitNode所构成的线程代理;当Policy == 3时,将等待池中取出的iterator放入_cxq队列中的末尾位置;当Policy等于其他值,立即唤醒ObjectWaiter对应的线程,唤醒线程以后并没有释放锁。经过上面的分析,我们知道java中调用notify方法时,不一定是立即唤醒线程,可能先将等待池中取出的线程放在获取锁阻塞池中(_EntryList或_cxq)。
java的notifyAll方法在JVM中的实现跟java的notify方法基本一样,这里就不贴源码了,主要区别是遍历ObjectWaiter * iterator = DequeueWaiter() ,重复java的notify方法的JVM实现过程,把所有的_WaitSet中的ObjectWaiter对象放入到_EntryList中。
JVM中wait、notify、notifyAll 方法中都没有释放锁,锁的释放是在Synchronizer同步块结束的时候释放的。
释放锁调用ObjectMonitor::exit方法,主要将ObjectWaiter从_cxq或者_EntryList中取出后唤醒,唤醒的线程会继续执行挂起前的代码,通过CAS去竞争锁,exit方式释放锁后,被唤醒的线程占用了该锁。
protected void finalize()
Object类中最后一个方法是finalize(),由protected 修饰,由子类进行重写。当对象的引用不再被使用时,垃圾回收器进行调用finalize。这个方法是由垃圾回收器进行调用的,所以该方法可能不会被触发,finalize方法不会被任何对象调用多次。当子类重写了finalize方法,并且这个方法体不为空时,JVM层面则会调用register_finalizer函数进行注册这个方法,finalize方法是在Java对象初始化过程中注册的,当进行垃圾回收时,对象被回收并且在finalize中引用自身时,会逃过一次回收,这样对象不一定会被回,finalize方法也不可能调用第二次。
關鍵字: ObjectMonitor 源码 wait