字解JDK并发工具源码 - AbstractQueuedSynchronizer Condition Queue

向Doug Lea学习并发工具设计

Posted by Chris on May 16, 2017

简介

Condition是一个接口,其子类在AbstractQueuedSynchronizer中实现,名为ConditionObject。

在本文中如没有特殊说明的话Condition皆为ConditionObject,他的作用与Object提供的监视器方法一样,实现等待通知模式,在Object中wait()、notify()和notifyAll()分别对应Condition中的await()、signal()和signalAll()。但是Condition却可以提供忽略中断的等待方法,并且可以创建多个等待条件。

await() - 睡眠

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();

  Node node = addConditionWaiter();
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    // k1
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  if (//k2
    acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

我们需要的是各个击破,当然更需要一些预备的知识让我们更好地理解await(),当前调用顺序如下:

addConditionWaiter() -> fullyRelease(Node) -> isOnSyncQueue(Node) -> checkInterruptWhileWaiting(Node)

addConditionWaiter() - 添加元素至Condition Queue

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  // 根据上面的英文描述,推断condition queue中的节点类型只有两种: condition, cancelled
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    // 清除cancelled后重新获取WaitQueue中的尾节点
    t = lastWaiter;
  }
  // 构建WaitQueue的预节点。Node.nextWater的属性在sync queue表示节点类型mode,即exclusive还是shared;在condition queue中都是exclusive,nextWaiter用作本意--下一个waiter。
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  // 如果尾节点引用为null则说明队列未被初始化
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  // i1
  return node;
}

每一次添加节点到Condition Queue之前都会先进行一次整理,删除lastWaiter前面的cancelled类型节点。然后再其链接到Condition Queue的尾端,完成真正的入队操作。

unlinkCancelledWaiters() - 从前往后删除cancelled节点

private void unlinkCancelledWaiters() {
  Node t = firstWaiter;
  Node trail = null;
  while (t != null) {
    Node next = t.nextWaiter;
    if (t.waitStatus != Node.CONDITION) {
      // 不是condition就是cancelled,所以这里直接把t.nextWaiter置为null
      t.nextWaiter = null;
      if (trail == null)
        firstWaiter = next;
      else
        trail.nextWaiter = next;
      if (next == null)
        lastWaiter = trail;
    } else
      trail = t;
    t = next;
  }
}

Condition Queue是一个单向链表所以清理工作是从前往后的,而且await()是在获得排他锁之后方可调用,所以这些操作也是线程安全的。这个方法除了在addConditionWaiter()中调用以外,还会在await()唤醒后再调用一次。

至此完成了addConditionWaiter(),将视野拉到await(),当前的调用顺序如下:

addConditionWaiter() -> fullyRelease(Node) -> isOnSyncQueue(Node) -> checkInterruptWhileWaiting(Node)

fullyRelease(Node) - 释放锁从Sync Queue中出队

final int fullyRelease(Node node) {
  boolean failed = true;
  try {
    int savedState = getState();
    // 释放在Sync Queue的节点
    if (release(savedState)) {
      failed = false;
      return savedState;
    } else {
      throw new IllegalMonitorStateException();
    }
  } finally {
    if (failed)
      node.waitStatus = Node.CANCELLED;
  }
}

回顾一下await()的调用顺序,我们先将当前线程封装成Node添加至Condition Queue,在未执行fullyRelease(Node)之前,该节点同时存在于Condition Queue与Sync Queue(尽管head.thread为null,但事实上就是同一线程),所以此时要将Sync Queue中的head出队,并保存其“释放的层数”以便于重新入队,如果在release期间有任何异常或者是release返回false则直接将Condition Queue中的节点置为cancelled,由unlinkCancelledWaiters()进行清理。

当前的调用顺序如下:

addConditionWaiter() -> fullyRelease(Node) -> isOnSyncQueue(Node) -> checkInterruptWhileWaiting(Node)

isOnSyncQueue(Node) - 判断节点是否处于Sync Queue

final boolean isOnSyncQueue(Node node) {
  // 在sync queue中的节点有若干特点,waitStatus无condition,node.prev一定不为null
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  // 如果next不为null则一定在sync queue
  if (node.next != null) // If has successor, it must be on queue
    return true;
  /*
   * node.prev can be non-null, but not yet on queue because
   * the CAS to place it on queue can fail. So we have to
   * traverse from tail to make sure it actually made it.  It
   * will always be near the tail in calls to this method, and
   * unless the CAS failed (which is unlikely), it will be
   * there, so we hardly ever traverse much.
   * waitStatus不为condition,node.prev不为null,但是node.next为null,根据enq方法,应该是未完成入队操作,所以这里降级为全sync queue搜索
   */
  return findNodeFromTail(node);
}

本方法保证所传入的形参node一定不为head,要判断是否在Sync Queue那就要分析他和Condition Queue的特点了。Condition Queue中特有condition节点,而且prev域一定为null;如果next不为null的话则必然是位于Sync Queue,因为prev和next都是Sync Queue特有的域。那么那些“不为condition且node.prev不为null,但是node.next为null”的节点呢?这些节点有可能处于Sync Queue中(还未完全执行enq的时刻),这时就需要降级处理,从tail往前迭代以确保当前节点是否真的存在于Sync Queue中。

private boolean findNodeFromTail(Node node) {
  // 从tail往前所有节点node
  Node t = tail;
  for (; ; ) {
    if (t == node)
      return true;
    if (t == null)
      return false;
    t = t.prev;
  }
}

回顾一下await(),在执行完isOnSyncQueue(Node)确认过后,如果Node仍未回到Sync Queue的话那么当前线程将会挂起,等待唤醒。

当前的调用顺序如下:

addConditionWaiter() -> fullyRelease(Node) -> isOnSyncQueue(Node) -> checkInterruptWhileWaiting(Node)

checkInterruptWhileWaiting(Node) - 唤醒后检查是否有被中断

private int checkInterruptWhileWaiting(Node node) {
  // 线程被唤醒后检测中断状态,如果为被中断就返回0,如果被中断了就返回非0
  return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  0;
}

线程被唤醒后检测中断状态,如果为被中断就返回0,如果被中断了就返回非0。在确认被中断之后的行为也会有所不同,继续深入到transferAfterCancelledWait(Node)看看。

transferAfterCancelledWait(Node)

final boolean transferAfterCancelledWait(Node node) {
  // 如果在signal()之前醒来则将原来的节点放会sync queue
  // 将当前在condition queue的节点重新入队至sync queue,nextWaiter的值在sync queue无影响,因为sync queue只会检测shared
  if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    // enq是线程安全方法
    enq(node);
    return true;
  }
  /*
   * If we lost out to a signal(), then we can't proceed
   * until it finishes its enq().  Cancelling during an
   * incomplete transfer is both rare and transient, so just
   * spin.
   */
  // 如果在signal()之后醒来,则等待节点被放回sync queue
  // 可能会有线程竞争,也就是其他线程调用本方法的数据竞争,等待enq完成即可
  while (!isOnSyncQueue(node))
    // 最理想的情况下就是把时间片分给signal的那个线程
    Thread.yield();
  return false;
}

为什么有的选择抛出异常?有的却直接返回状态?我最开始也很疑惑,但是我在并发编程网上面看到Doug Lea在设计AQS时候的论文译文后就明白了。译文如下

JSR133修订以后,就要求如果中断发生在signal操作之前,await方法必须在重新获取到锁后,抛出InterruptedException。但是,如果中断发生在signal后,await必须返回且不抛异常,同时设置线程的中断状态。

照着上述规范我们再来看这个方法,整个方法分成两块:if块和while块+return语句。上半部分属于中断发生先于signal,此时需归还节点至Sync Queue。下半部分属于signal先于中断,此时只能等待signal的入队操作完成。如果是中断发生先于signal那么要告知调用方抛出InterruptedException,反之则仅需标记中断。

好,await()的大部分相关方法已经击破,我们重新研究await(),意在将其一举拿下

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  // 添加至Condition Queue
  Node node = addConditionWaiter();
  // 在拥有exclusive锁的前提下,释放该锁,当后继节点唤醒时就算将当前拥有锁的线程逐出sync queue
  // 如果在sync queue中释放锁失败,就会将condition queue中的节点置为cancelled
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 等待新节点入队: 1.其他线程调用signal()会入队,2.当前线程被中断也会入队
  while (!isOnSyncQueue(node)) {
    // 挂起直至被signal
    LockSupport.park(this);
    // 如果是LockSupport唤醒的,会循环然后被挂起,等待进入到sync queue才退出循环
    // 但如果是强行中断,也会添加至sync queue,并且返回中断标识(非0)
    // 顾名思义,就是检测在park期间有没有被执行过中断操作
    // k1
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 节点自旋在获取锁成功后返回, 如果没有中断就返回false,如果被中断了就返回true
  if (//k2
    acquireQueued(node, savedState) && interruptMode != THROW_IE)
    // 这是执行到k1处都没有中断,但是在执行acquireQueued方法中却中断了的情况
    interruptMode = REINTERRUPT;
  // 如果在重新获得锁的期间有其他节点被添加到condition queue,就再整理一下
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

其实能从acquireQueued返回,无论是true or false都是说明获得了锁,接下来仅需重新整理一下队列,根据checkInterruptWhileWaiting和acquireQueued返回的中断情况决定是否应该抛出InterruptedException,随后即可回到线程调用await()的地方。至此await方法已经被我们拿下,其他await的衍生方法也是大同小异,这里便不再赘述。

终于到signal出场了。

signal() - 唤醒Condition Queue中的首节点

public final void signal() {
// 检测是否获取了排他锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 如果队列不为null则释放第一个节点,可见condition queue是类似于公平队列
if (first != null)
doSignal(first);
}

需要注意的是isHeldExclusively()是需要子类去实现的,如果没有使用Condition那么就可以选择不实现,另一方面也可得知signal是需要持有锁才能调用的(与等待/通知的经典范式一致)。注意,signal选择的是最长等待时间的节点进行唤醒,我们看看其中doSignal(Node)的具体实现吧。

doSignal(Node)

private void doSignal(Node first) {
  do {
    // 将firstWaiter引用指向后面的节点,并且释放旧的firstWaiter的nextWaiter引用
    if ((firstWaiter = first.nextWaiter) == null)
      // 证明first是唯一一个节点,清空condition queue
      lastWaiter = null;
    // help GC
    first.nextWaiter = null;
  } while (!transferForSignal(first) &&
           // 直至condition queue没有节点或者已经将节点置入sync queue
           (first = firstWaiter) != null);
}

在do块中程序选取了最长等待时间的节点,直至transferForSignal成功或者Condition Queue已经没有更多的元素。如果你还记得await()中的transferAfterCancelledWait()方法,那么你肯定知道transferForSignal(Node)的作用必然是将Condition Queue的节点归还给Sync Queue。

transferForSignal(Node) - 迁移节点并唤醒await的线程

final boolean transferForSignal(Node node) {
  /*
   * If cannot change waitStatus, the node has been cancelled.
   * 说明该节点在释放锁的时候发生了异常,被修改为cancelled,返回false表示将传入的节点转移到sync queue失败
   */
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

  /*
   * Splice onto queue and try to set waitStatus of predecessor to
   * indicate that thread is (probably) waiting. If cancelled or
   * attempt to set waitStatus fails, wake up to resync (in which
   * case the waitStatus can be transiently and harmlessly wrong).
   * 入队至sync queue,会修改Node的prev和next域,其中next域的修改延后于prev域
   */
  Node p = enq(node);
  // p节点就是node的直接前驱
  int ws = p.waitStatus;
  // 如果前驱节点为cancelled或者修改为signal失败,那么就唤醒node所封装的线程,会继续在k1处执行,进而退出循环执行AcquireQueued,将前驱节点更改为signal
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  // 如果前驱节点不为cancelled且成功将其更改为signal的话就不用麻烦acquireQueued进行自旋了
  return true;
}

如果将condition转换为0的CAS失败,说明该节点被置为cancelled了(详见awaitNanos(long)),返回false告知调用方signal()需要传入下一个节点。如果设置成功,则将节点重新放回Sync Queue,enq所返回的是旧的tail,也就是node的前驱。有意思的来了,我们分析一下if块,首先如果前驱是cancelled节点或者CAS设置signal失败的话(排他锁的状态仅可能有0、signal和cancelled),将会唤醒node节点的线程。这些线程的唤醒点在await()中while循环里的k1处,因为节点已经处于Sync Queue,所以退出while后将会调用acquireQueued(Node, int)选择一个有效的前驱,并通过CAS自旋修改其waitStatus。那么如果ws<=0而且成功将前驱的状态设置为signal的话则不需要再唤醒,因为他的前驱就是一个有效的前驱

signalAll() - 迁移Condition Queue所有节点

public final void signalAll() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  // condition queue不为null时
  if (first != null)
    doSignalAll(first);
}

与signal()的结构完全一致,唯一不同的就是调用doSignalAll(Node)而不是doSignal(Node)

private void doSignalAll(Node first) {
  // 清空condition queue
  lastWaiter = firstWaiter = null;
  do {
    Node next = first.nextWaiter;
    first.nextWaiter = null;
    // 将condition queue的节点全部迁移至sync queue
    transferForSignal(first);
    first = next;
  } while (first != null);
}

本方法清空所有节点,全部迁移至Sync Queue,所以并不需要在乎transferForSignal的返回值。

Condition Queue特点总结

  • 必须在获得排他锁的前提下才能正确使用Condition
  • isHeldExclusively()必须要被子类实现
  • Condition Queue是一个单向链表
  • Node.nextWaiter在Sync Queue被复用成节点类型(Exclusive、Shared),因为Condition中都是Exclusive类型的节点,nextWaiter用作真正的本意 — 下一个waiter
  • Condition Queue的节点类型waitStatus仅有0、condition和cancelled

后话

至此,关于Condition Queue的源码介绍已经完毕,感谢你的耐心阅读!