Java之AQS

1、ReentrantLock介绍

  • 相对于synchronized它具备如下特点:

    • 可中断。
    • 可以设置超时时间。
    • 可以设置为公平锁。
    • 支持多个条件变量。
    • 与synchronized一样,都支持可重入。
  • 基本语法:

    1
    2
    3
    4
    5
    6
    7
    8
    // 获取锁
    reentrantLock.lock();
    try {
    // 临界区
    } finally {
    // 释放锁
    reentrantLock.unlock();
    }

1.1 可重入

  • 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。

  • 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Slf4j
    public class TestReentrantLock {
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
    lock.lock();
    try {
    log.debug("execute main");
    method1();
    }finally {

    }
    }

    public static void method1() {
    lock.lock();
    try {
    log.debug("execute method1");
    method2();
    } finally {
    lock.unlock();
    }
    }
    public static void method2() {
    lock.lock();
    try {
    log.debug("execute method2");
    } finally {
    lock.unlock();
    }
    }
    }

    执行结果:

    1
    2
    3
    11:41:05.613 org.example.TestReentrantLock [main] - execute main
    11:41:05.617 org.example.TestReentrantLock [main] - execute method1
    11:41:05.617 org.example.TestReentrantLock [main] - execute method2

1.2 可打断

  • 可打断是指当前线程在等待锁释放的过程中可以被其它线程所打断。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    public class TestReentrantLock {
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
    try {
    //如果没有竞争那么此方法会获取lock对象锁
    //如果有竞争就进入阻塞队列,可以被其它线程用interrupt方法打断
    log.debug("尝试获取锁");
    lock.lockInterruptibly();
    } catch (InterruptedException e) {
    e.printStackTrace();
    log.debug("没有获得锁,返回");
    return;
    }
    try {
    log.debug("获得到锁");
    } finally {
    lock.unlock();
    }
    },"t1");
    //主线程先获得锁
    lock.lock();
    t1.start();
    Thread.sleep(1000);
    log.debug("打断t1");
    t1.interrupt();
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    11:48:27.116 org.example.TestReentrantLock [t1] - 尝试获取锁
    11:48:28.117 org.example.TestReentrantLock [main] - 打断t1
    11:48:28.117 org.example.TestReentrantLock [t1] - 没有获得锁,返回
    java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at org.example.TestReentrantLock.lambda$main$0(TestReentrantLock.java:21)
    at java.lang.Thread.run(Thread.java:748)

1.3 锁超时

  • 锁超时是指在等待获得锁超过一定时间就会主动放弃等待,表示此次尝试获取锁失败,即可以避免该线程无限等待下去。

    • ①获取不到锁则立刻放弃等待的情况。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      @Slf4j
      public class TestReentrantLock {
      static ReentrantLock lock = new ReentrantLock();

      public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
      log.debug("尝试获得锁");
      if (!lock.tryLock()) {
      log.debug("获取不到锁");
      return;
      }
      try {
      log.debug("获得到锁");
      } finally {
      lock.unlock();
      }
      }, "t1");

      lock.lock();
      t1.start();
      }
      }

      运行结果:

      1
      2
      11:54:35.706 org.example.TestReentrantLock [t1] - 尝试获得锁
      11:54:35.706 org.example.TestReentrantLock [t1] - 获取不到锁
    • ②设置超时时间等待的情况。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      @Slf4j
      public class TestReentrantLock {
      static ReentrantLock lock = new ReentrantLock();

      public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
      log.debug("尝试获得锁");
      try {
      if (! lock.tryLock(1, TimeUnit.SECONDS)) {
      log.debug("获取不到锁");
      return;
      }
      } catch (InterruptedException e) {
      e.printStackTrace();
      log.debug("获取不到锁");
      return;
      }
      try {
      log.debug("获得到锁");
      } finally {
      lock.unlock();
      }
      }, "t1");

      lock.lock();
      log.debug("获得到锁");
      t1.start();
      Thread.sleep(2000);
      log.debug("释放了锁");
      lock.unlock();
      }
      }

      执行结果:

      1
      2
      3
      4
      11:58:01.187 org.example.TestReentrantLock [main] - 获得到锁
      11:58:01.195 org.example.TestReentrantLock [t1] - 尝试获得锁
      11:58:02.209 org.example.TestReentrantLock [t1] - 获取不到锁
      11:58:03.196 org.example.TestReentrantLock [main] - 释放了锁

1.4 条件变量

  • synchronized中也有条件变量,就是我们讲原理时那个waitSet休息室,当条件不满足时进入waitSet等待。
  • ReentrantLock的条件变量比synchronized强大之处在于,它是支持多个条件变量的,这就好比:
    • synchronized是那些不满足条件的线程都在一间休息室等消息。
    • 而 ReentrantLock支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒。
  • 使用要点:
    • await前需要获得锁。
    • await执行后,会释放锁,进入conditionObject等待。
    • await的线程被唤醒(或打断、或超时)取重新竞争lock锁。
    • 竞争lock锁成功后,从await后继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Slf4j
public class TestReentrantLock {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
static ReentrantLock ROOM = new ReentrantLock();
// 等待烟的休息室
static Condition waitCigaretteSet = ROOM.newCondition();
// 等外卖的休息室
static Condition waitTakeoutSet = ROOM.newCondition();

public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
ROOM.lock();
try {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
waitCigaretteSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小南").start();

new Thread(() -> {
ROOM.lock();
try {
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
waitTakeoutSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小女").start();

Thread.sleep(1000);
new Thread(() -> {
ROOM.lock();
try {
hasTakeout = true;
waitTakeoutSet.signal();
} finally {
ROOM.unlock();
}
}, "送外卖的").start();

Thread.sleep(1000);

new Thread(() -> {
ROOM.lock();
try {
hasCigarette = true;
waitCigaretteSet.signal();
} finally {
ROOM.unlock();
}
}, "送烟的").start();
}
}

执行结果:

1
2
3
4
5
6
14:12:06.972 org.example.TestReentrantLock [小南] - 有烟没?[false]
14:12:06.972 org.example.TestReentrantLock [小南] - 没烟,先歇会!
14:12:06.972 org.example.TestReentrantLock [小女] - 外卖送到没?[false]
14:12:06.972 org.example.TestReentrantLock [小女] - 没外卖,先歇会!
14:12:07.980 org.example.TestReentrantLock [小女] - 可以开始干活了
14:12:08.981 org.example.TestReentrantLock [小南] - 可以开始干活了

2、ReentrantLock原理

  • ReentrantLock是基于AQS(全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架)实现的。

  • AQS特点:

    • 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁。

    • getState - 获取state状态。

    • setState - 设置state状态。

    • compareAndSetState - cas机制设置state状态。

    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源。

    • 提供了基于FIFO的等待队列,类似于Monitor的EntryList。

    • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet。

    • 子类主要实现这样一些方法(默认抛出UnsupportedOperationException)

    • tryAcquire

    • tryRelease

    • tryAcquireShared

    • tryReleaseShared

    • isHeldExclusively

    • 获取锁的姿势:

    1
    2
    3
    4
    // 如果获取锁失败
    if (!tryAcquire(arg)) {
    // 入队, 可以选择阻塞当前线程 park unpark
    }
    • 释放锁的姿势:
    1
    2
    3
    4
    // 如果释放锁成功
    if (tryRelease(arg)) {
    // 让阻塞线程恢复运行
    }
  • ReentrantLock内部结构:

2.1 非公平锁实现原理

  • ①从构造器开始看,默认为非公平锁实现。
1
2
3
4
5
6
7
//同步器提供所有实施机制
private final Sync sync;

//创建ReentrantLock的实例,这等效于使用构造器ReentrantLock(false)。
public ReentrantLock() {
sync = new NonfairSync();
}

其中的NonfairSync继承自AQS:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//同步对象的非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

//执行锁定,并在出现故障时备份到正常状态。
final void lock() {
//首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果尝试失败
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
  • ②当线程Thread-0执行ReentrantLock.lock()时,尝试将线程状态设置为1,如果设置成功就设置当前线程为同步器的Owner。

    1
    2
    3
    4
    5
    6
    //如果没有其他线程持有该锁,则获取该锁并立即返回,将锁保持计数设置为1。
    //如果当前线程已经持有该锁,则持有计数将增加1,该方法将立即返回。
    //如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到获取了锁为止,此时,锁持有计数被设置为1。
    public void lock() {
    sync.lock();
    }
  • ③这时如果线程Thread-1来竞争资源。

    1
    2
    3
    4
    5
    6
    7
    //AQS抽象类
    //在独占模式下获取,忽略中断。通过至少调用一次tryAcquire()并返回成功来实现。否则,线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquire()直到成功。
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
    • ①CAS尝试将state由0改为1,结果失败。

    • ②进入acquire()的tryAcquire()逻辑,这时state已经是1,结果仍然失败。

    • ③接下来进入addWaiter逻辑,构造Node队列。

      • 图中黄色三角表示该Node的waitStatus状态,其中0为默认正常状态。
      • Node的创建是懒惰的。
      • 其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程。
    • ④接下来进入acquireQueued()逻辑。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      //AQS抽象类
      //以排他的不间断模式获取已在队列中的线程,用于条件等待方法以及获取。
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      //上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
      if (p == head && tryAcquire(arg)) {
      //获取成功, 设置自己(当前线程对应的 node)为 head
      setHead(node);
      p.next = null; // help GC
      failed = false;
      //返回中断标记 false
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
      • ①acquireQueued会在一个死循环中不断尝试获得锁,失败后进入park阻塞。

      • ②如果自己是紧邻着head(排第二位),那么再次tryAcquire尝试获取锁,当然这时state仍为1,失败。

      • ③进入shouldParkAfterFailedAcquire逻辑,将前驱node,即head的waitStatus改为-1,这次返回false。表示waitStatus为-1的节点有资格唤醒它的后继节点。

      • ④当再次进入shouldParkAfterFailedAcquire时,这时因为其前驱node的waitStatus已经是-1,这次返回true。

      • ⑤接着进入parkAndCheckInterrupt,Thread-1 park(灰色表示)。

        1
        2
        3
        4
        5
        6
        //AQS抽象类
        private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this);
        return Thread.interrupted();
        }
  • ④再次有多个线程经历上述过程竞争失败,变成这个样子。

  • ⑤这时如果Thread-0释放锁(调用ReentrantLock.unlock()),进入tryRelease流程,如果成功:

    1
    2
    3
    4
    //如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。
    public void unlock() {
    sync.release(1);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //AQS抽象类
    //以独占模式发布。如果tryRelease返回true,则通过解锁一个或多个线程tryRelease实现。
    public final boolean release(int arg) {
    //尝试释放锁
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //AQS抽象类
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    //支持锁重入, 只有 state 减为 0, 才释放成功
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }
    • ①设置exclusiveOwnerThread为null。

    • ②设置state=0。(此时执行完tryRelease())

    • ③这时回到release(),如果当前队列不为null,并且head的waitStatus=-1,进入unparkSuccessor流程。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      //AQS抽象类
      //唤醒节点的后继者(如果存在)。
      private void unparkSuccessor(Node node) {
      //如果状态是否定的(即可能需要信号),请尝试清除以预期发出信号。如果失败或等待线程更改状态,则可以。
      int ws = node.waitStatus;
      if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

      //释放线程保留在后续线程中,该线程通常只是下一个节点。但是,如果已取消或明显为空,请从尾部向后移动以找到实际的未取消后继。
      //找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
      Node s = node.next;
      //不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
      if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
      s = t;
      }
      if (s != null)
      LockSupport.unpark(s.thread);
      }
      • ①找到队列中离head最近的一个Node(没取消的),unpark恢复其运行,本例中即为Thread-1。

      • ②这时回到Thread-1的acquireQueued流程。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        //以排他的不间断模式获取已在队列中的线程,用于条件等待方法以及获取。
        final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        boolean interrupted = false;
        for (;;) {
        final Node p = node.predecessor();
        //上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
        if (p == head && tryAcquire(arg)) {
        //获取成功, 设置自己(当前线程对应的 node)为 head
        setHead(node);
        p.next = null; // help GC
        failed = false;
        //返回中断标记
        return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
        }
        } finally {
        if (failed)
        cancelAcquire(node);
        }
        }
        • ①如果加锁成功(没有竞争),会设置:

          • exclusiveOwnerThread为Thread-1,state=1(在tryAcquire(arg)方法中执行)。

          • head指向刚刚Thread-1所在的Node,该Node清空Thread。

          • 原本的head因为从链表断开,而可被垃圾回收。

        • ②如果这时候有其它线程来竞争(非公平的体现),例如这时有Thread-4来了, 且被 Thread-4占了先。

          +  Thread-4被设置为exclusiveOwnerThread,state=1。
          +  Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞。
          

2.2 可重入原理

  • 在调用ReentrantLock.lock()时如果发现有竞争则会进入acquire()–>tryAcquire()–>nonfairTryAcquire(),相关可重入的代码在nonfairTryAcquire()里:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    //Sync抽象类
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果还没有获得锁
    if (c == 0) {
    //尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    //如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
    else if (current == getExclusiveOwnerThread()) {
    //state++
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    //获取失败, 回到调用处
    return false;
    }
  • 在调用ReentrantLock.unlock()进行解锁时,会执行sync.release(1)–>tryRelease(arg),相关可重入的代码在tryRelease()里:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //Sync抽象类
    protected final boolean tryRelease(int releases) {
    //state--
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    //支持锁重入, 只有 state 减为 0, 才释放成功
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }

2.3 可打断原理

  • 不可打断模式

    • 在此模式下,即使它被打断,仍会驻留在AQS队列中,一直要等到获得锁后方能得知自己被打断了。

    • 在执行ReentrantLock.lock()时会执行到acquire():

      1
      2
      3
      4
      5
      6
      7
      //AQS抽象类
      public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      //如果打断状态为 true
      selfInterrupt();
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      //AQS抽象类
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      //还是需要获得锁后, 才能返回打断状态
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      //如果是因为 interrupt 被唤醒, 返回打断状态为 true
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
      1
      2
      3
      4
      5
      //AQS抽象类
      static void selfInterrupt() {
      //重新产生一次中断
      Thread.currentThread().interrupt();
      }
      1
      2
      3
      4
      5
      6
      7
      //AQS抽象类
      private final boolean parkAndCheckInterrupt() {
      //如果打断标记已经是 true, 则 park 会失效
      LockSupport.park(this);
      //interrupted 会清除打断标记
      return Thread.interrupted();
      }
  • 可打断模式

    • 当调用ReentrantLock.lockInterruptibly()时会执行acquireInterruptibly–>doAcquireInterruptibly():

      1
      2
      3
      public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      //AQS抽象类
      public final void acquireInterruptibly(int arg)
      throws InterruptedException {
      if (Thread.interrupted())
      throw new InterruptedException();
      //如果没有获得到锁
      if (!tryAcquire(arg))
      doAcquireInterruptibly(arg);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      //AQS抽象类
      private void doAcquireInterruptibly(int arg)
      throws InterruptedException {
      final Node node = addWaiter(Node.EXCLUSIVE);
      boolean failed = true;
      try {
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      //在park过程中如果被interrupt会抛出异常, 而不会再次进入 for (;;)
      throw new InterruptedException();
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

2.4 公平锁实现原理

  • 公平锁与非公平锁主要区别在于tryAcquire方法的实现:

    • 非公平锁的tryAcquire():

      1
      2
      3
      4
      //NonfairSync类
      protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      //Sync抽象类
      final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      //尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
      if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
    • 公平锁的tryAcquire():

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      //FairSync类
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      //先检查 AQS 队列中是否有前驱节点, 没有才去竞争
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      //AQS抽象类
      public final boolean hasQueuedPredecessors() {
      Node t = tail; // Read fields in reverse initialization order
      Node h = head;
      Node s;
      //h != t 时表示队列中有 Node
      return h != t &&
      //并且队列中没有第二个node
      ((s = h.next) == null ||
      //或者队列中第二个node不是当前线程
      s.thread != Thread.currentThread());
      }

2.5 条件变量实现原理

  • 每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject。

  • await流程

    • 开始Thread-0持有锁,调用await。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      //1.如果当前线程被中断,则抛出InterruptedException。
      //2.保存getState返回的锁定状态。
      //3.使用保存的状态作为参数调用release ,如果失败则抛出IllegalMonitorStateException。
      //4.阻塞直到发信号或被打断。
      //5.通过调用以保存状态作为参数的acquire专用版本来重新acquire。
      //6.如果在步骤4中被阻止而被中断,则抛出InterruptedException。
      public final void await() throws InterruptedException {
      if (Thread.interrupted())
      throw new InterruptedException();
      //添加一个 Node 至等待队列
      AbstractQueuedSynchronizer.Node node = addConditionWaiter();
      //释放节点持有的锁
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //如果该节点还没有转移至 AQS 队列, 阻塞
      while (!isOnSyncQueue(node)) {
      //当前线程阻塞
      LockSupport.park(this);
      //如果被打断, 退出等待队列
      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
      }
      //退出等待队列后, 还需要获得 AQS 队列的锁
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
      //所有已取消的 Node 从队列链表删除
      if (node.nextWaiter != null) // clean up if cancelled
      unlinkCancelledWaiters();
      if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);
      }
      • ①进入ConditionObject的addConditionWaiter流程,会创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入等待队列尾部。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        private AbstractQueuedSynchronizer.Node addConditionWaiter() {
        AbstractQueuedSynchronizer.Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
        }
        //创建一个关联当前线程的新 Node, 添加至队列尾部
        AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), AbstractQueuedSynchronizer.Node.CONDITION);
        if (t == null)
        firstWaiter = node;
        else
        t.nextWaiter = node;
        lastWaiter = node;
        return node;
        }
      • ②接下来进入AQS的fullyRelease流程,释放同步器上的锁。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        //因为某线程可能重入,需要将 state 全部释放
        final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        int savedState = getState();
        if (release(savedState)) {
        failed = false;
        return savedState;
        } else {
        throw new IllegalMonitorStateException();
        }
        } finally {
        if (failed)
        node.waitStatus = Node.CANCELLED;
        }
        }
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        public final boolean release(int arg) {
        if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        //唤醒后继节点
        unparkSuccessor(h);
        return true;
        }
        return false;
        }
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
        }
      • ③unpark AQS队列中的下一个节点竞争锁,假设没有其他竞争线程,那么Thread-1竞争成功。

      • ④阻塞Thread-0。

  • signal流程

    • 假设Thread-1要来唤醒Thread-0,会调用ConditionObject.signal():

      1
      2
      3
      4
      5
      6
      7
      8
      9
      //必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁。
      public final void signal() {
      //判断调用该方法的线程是不是锁的持有者,如果不是则抛异常。
      if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
      AbstractQueuedSynchronizer.Node first = firstWaiter;
      if (first != null)
      doSignal(first);
      }
      • 接着进入ConditionObject的doSignal流程,取得等待队列中第一个 Node,即Thread-0所在Node。

        1
        2
        3
        4
        5
        6
        7
        8
        private void doSignal(AbstractQueuedSynchronizer.Node first) {
        do {
        if ( (firstWaiter = first.nextWaiter) == null)
        lastWaiter = null;
        first.nextWaiter = null;
        } while (!transferForSignal(first) &&
        (first = firstWaiter) != null);
        }
        • 接着执行transferForSignal流程,将该Node加入AQS队列尾部,将Thread-0的waitStatus改为0,Thread-3的waitStatus改为-1。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          final boolean transferForSignal(Node node) {
          //如果无法更改waitStatus,则该节点已被取消。
          if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
          return false;

          //拼接到队列上并尝试设置前任的waitStatus来指示线程(可能)正在等待。如果取消设置或尝试设置waitStatus失败,请唤醒以重新同步(在这种情况下,waitStatus可能会短暂而无害地出现错误)。
          //将该Node加入AQS队列尾部,返回该结点的前驱节点
          Node p = enq(node);
          int ws = p.waitStatus;
          if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
          LockSupport.unpark(node.thread);
          return true;
          }

3、读写锁

3.1 使用ReentrantReadWriteLock

  • 当读操作远远高于写操作时,这时候使用读写锁让”读-读”可以并发,提高性能。

  • 提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
+ **测试读锁-读锁可以并发。**

  
1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
public class TestReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
}
执行结果如下,从这里可以看到Thread-0锁定期间,Thread-1的读操作不受影响:
1
2
3
4
5
6
7
8
20:49:41.307 org.example.DataContainer [t1] - 获取读锁...
20:49:41.307 org.example.DataContainer [t2] - 获取读锁...
20:49:41.307 org.example.DataContainer [t2] - 读取
20:49:41.307 org.example.DataContainer [t1] - 读取
20:49:42.311 org.example.DataContainer [t1] - 释放读锁...
20:49:42.311 org.example.DataContainer [t2] - 释放读锁...

Process finished with exit code 0
+ **测试读锁-写锁相互阻塞。**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
public class TestReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
}
执行结果:
1
2
3
4
5
6
7
8
20:51:45.399 org.example.DataContainer [t1] - 获取读锁...
20:51:45.407 org.example.DataContainer [t1] - 读取
20:51:45.504 org.example.DataContainer [t2] - 获取写锁...
20:51:46.408 org.example.DataContainer [t1] - 释放读锁...
20:51:46.408 org.example.DataContainer [t2] - 写入
20:51:47.421 org.example.DataContainer [t2] - 释放写锁...

Process finished with exit code 0
+ **写锁-写锁也是相互阻塞的。**
  • 注意事项

    • 读锁不支持条件变量。

    • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      r.lock();
      try {
      // ...
      w.lock();
      try {
      // ...
      } finally{
      w.unlock();
      }
      } finally{
      r.unlock();
      }
    • 重入时降级支持:即持有写锁的情况下去获取读锁。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      class CachedData {
      Object data;
      // 是否有效,如果失效,需要重新计算 data
      volatile boolean cacheValid;
      final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
      void processCachedData() {
      rwl.readLock().lock();
      if (!cacheValid) {
      // 获取写锁前必须释放读锁
      rwl.readLock().unlock();
      rwl.writeLock().lock();
      try {
      // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
      if (!cacheValid) {
      data = ...
      cacheValid = true;
      }
      // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
      rwl.readLock().lock();
      } finally {
      rwl.writeLock().unlock();
      }
      }
      // 自己用完数据, 释放读锁
      try {
      use(data);
      } finally {
      rwl.readLock().unlock();
      }
      }
      }

3.2 ReentrantReadWriteLock原理

  • 读写锁用的是同一个Sync同步器,因此等待队列、state等也是同一个。

  • t1 w.lock –> t2 r.lock–> t3 r.lock –> t4 w.lock –> t1 w.unlock –> t2 r.unlock –> t3 r.unlock为例。

    • ①t1调用w.lock()成功上锁,流程与ReentrantLock加锁相比没有特殊之处,不同是写锁状态占了state的低16位,而读锁使用的是state的高16位。

      1
      2
      3
      4
      //WriteLock类
      public void lock() {
      sync.acquire(1);
      }
      1
      2
      3
      4
      5
      6
      //AQS抽象类
      public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      //Sync抽象类
      protected final boolean tryAcquire(int acquires) {
      //获得低 16 位, 代表写锁的 state 计数
      Thread current = Thread.currentThread();
      int c = getState();
      int w = exclusiveCount(c);
      if (c != 0) {
      // (Note: if c != 0 and w == 0 then shared count != 0)
      //c != 0 and w == 0 表示有读锁, 或者如果 exclusiveOwnerThread 不是自己
      if (w == 0 || current != getExclusiveOwnerThread())
      //获得锁失败
      return false;
      //写锁计数超过低 16 位, 报异常
      if (w + exclusiveCount(acquires) > MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
      // Reentrant acquire
      //写锁重入, 获得锁成功
      setState(c + acquires);
      return true;
      }
      //判断写锁是否该阻塞, 或者尝试更改计数失败
      if (writerShouldBlock() ||
      !compareAndSetState(c, c + acquires))
      //获得锁失败
      return false;
      //获得锁成功
      setExclusiveOwnerThread(current);
      return true;
      }
      1
      2
      3
      4
      5
      //NonfairSync类
      //非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
      final boolean writerShouldBlock() {
      return false; // writers can always barge
      }
    • ②t2执行r.lock,这时进入读锁的sync.acquireShared(1) 流程,首先会进入tryAcquireShared流程。如果有写锁占据,那么tryAcquireShared返回-1表示失败。(tryAcquireShared 返回值表示:-1 表示失败;0 表示成功,但后继节点不会继续唤醒;正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

      1
      2
      3
      4
      //ReadLock类
      public void lock() {
      sync.acquireShared(1);
      }
      1
      2
      3
      4
      5
      //AQS抽象类
      public final void acquireShared(int arg) {
      if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      //Sync类
      protected final int tryAcquireShared(int unused) {
      Thread current = Thread.currentThread();
      int c = getState();
      //如果是其它线程持有写锁, 获取读锁失败
      if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
      return -1;
      int r = sharedCount(c);
      //读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且小于读锁计数, 并且尝试增加计数成功
      if (!readerShouldBlock() &&
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
      if (r == 0) {
      firstReader = current;
      firstReaderHoldCount = 1;
      } else if (firstReader == current) {
      firstReaderHoldCount++;
      } else {
      ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
      cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
      readHolds.set(rh);
      rh.count++;
      }
      return 1;
      }
      return fullTryAcquireShared(current);
      }

      这时会进入sync.doAcquireShared(1) 流程。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      private void doAcquireShared(int arg) {
      //将当前线程关联到一个 Node 对象上, 模式为共享模式
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
      //再一次尝试获取读锁
      int r = tryAcquireShared(arg);
      if (r >= 0) {
      setHeadAndPropagate(node, r);
      p.next = null; // help GC
      if (interrupted)
      selfInterrupt();
      failed = false;
      return;
      }
      }
      //是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
      • 首先也是调用addWaiter添加节点,不同之处在于节点被设置为Node.SHARED模式而非Node.EXCLUSIVE模式,注意此时t2仍处于活跃状态。
      • t2会看看自己的节点是不是老二,如果是,还会再次调用tryAcquireShared(1)来尝试获取锁。

      • 如果没有成功,在doAcquireShared内for(;;)循环一次,把前驱节点的waitStatus改为-1(在shouldParkAfterFailedAcquire()中实现),再for(;;)循环一 次尝试tryAcquireShared(1)如果还不成功,那么在parkAndCheckInterrupt()处park。

    • ③这种状态下,假设又有t3加读锁和t4加写锁,这期间t1仍然持有锁,就变成了下面的样子:

    • ④这时如果t1 w.unlock,会走到写锁的sync.release(1)流程,调用sync.tryRelease(1)成功,变成下面的样子。

      1
      2
      3
      4
      //WriteLock类
      public void unlock() {
      sync.release(1);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      //AQS抽象类
      public final boolean release(int arg) {
      if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
      return true;
      }
      return false;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      //Sync抽象类
      protected final boolean tryRelease(int releases) {
      if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
      int nextc = getState() - releases;
      //因为可重入的原因, 写锁计数为 0, 才算释放成功
      boolean free = exclusiveCount(nextc) == 0;
      if (free)
      setExclusiveOwnerThread(null);
      setState(nextc);
      return free;
      }

      接下来执行唤醒流程sync.unparkSuccessor,即让老二恢复运行,这时t2在doAcquireShared内parkAndCheckInterrupt()处恢复运行。 这回t2再来一次for (;;) 执行tryAcquireShared成功则让读锁计数加一。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      private void doAcquireShared(int arg) {
      //将当前线程关联到一个 Node 对象上, 模式为共享模式
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
      //再一次尝试获取读锁
      int r = tryAcquireShared(arg);
      if (r >= 0) {
      setHeadAndPropagate(node, r);
      p.next = null; // help GC
      if (interrupted)
      selfInterrupt();
      failed = false;
      return;
      }
      }
      //是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      //Sync类
      protected final int tryAcquireShared(int unused) {
      Thread current = Thread.currentThread();
      int c = getState();
      //如果是其它线程持有写锁, 获取读锁失败
      if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
      return -1;
      int r = sharedCount(c);
      //读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且小于读锁计数, 并且尝试增加计数成功
      if (!readerShouldBlock() &&
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
      if (r == 0) {
      firstReader = current;
      firstReaderHoldCount = 1;
      } else if (firstReader == current) {
      firstReaderHoldCount++;
      } else {
      ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
      cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
      readHolds.set(rh);
      rh.count++;
      }
      return 1;
      }
      return fullTryAcquireShared(current);
      }

      这时t2已经恢复运行,接下来t2调用setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      //AQS抽象类
      private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head; // Record old head for check below
      //设置自己为 head
      setHead(node);
      //propagate 表示有共享资源(例如共享读锁或信号量)
      if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
      Node s = node.next;
      //如果是最后一个节点或者是等待共享读锁的节点
      if (s == null || s.isShared())
      doReleaseShared();
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      //AQS抽象类
      private void doReleaseShared() {
      //如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
      //如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析
      for (;;) {
      Node h = head;
      //队列还有节点
      if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
      //将头节点的状态从-1改为0,避免并发导致重复唤醒
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue; // loop to recheck cases
      //唤醒后继节点
      unparkSuccessor(h);
      }
      else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue; // loop on failed CAS
      }
      if (h == head) // loop if head changed
      break;
      }
      }

      事情还没完,在setHeadAndPropagate方法内还会检查下一个节点是否是shared,如果是则调用 doReleaseShared()将head的状态从-1改为0并唤醒老二,这时t3在doAcquireShared内parkAndCheckInterrupt()处恢复运行。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      private void doAcquireShared(int arg) {
      //将当前线程关联到一个 Node 对象上, 模式为共享模式
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
      //再一次尝试获取读锁
      int r = tryAcquireShared(arg);
      if (r >= 0) {
      setHeadAndPropagate(node, r);
      p.next = null; // help GC
      if (interrupted)
      selfInterrupt();
      failed = false;
      return;
      }
      }
      //是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

      t3这回再来一次for (;;)执行tryAcquireShared成功则让读锁计数加一。

      这时t3已经恢复运行,接下来t3调用setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。且下一个节点不是 shared 了,因此不会继续唤醒t4所在节点。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      //AQS抽象类
      private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head; // Record old head for check below
      //设置自己为 head
      setHead(node);
      //propagate 表示有共享资源(例如共享读锁或信号量)
      if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
      Node s = node.next;
      //如果是最后一个节点或者是等待共享读锁的节点
      if (s == null || s.isShared())
      doReleaseShared();
      }
      }
    • ⑤现在假设t2 r.unlock, t2进入sync.releaseShared(1)中,调用tryReleaseShared(1)让计数减一,但由于计数还不为零。

      1
      2
      3
      4
      //ReadLock类
      public void unlock() {
      sync.releaseShared(1);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      //AQS抽象类
      public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
      }
      return false;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      //Sync抽象类
      protected final boolean tryReleaseShared(int unused) {
      Thread current = Thread.currentThread();
      if (firstReader == current) {
      // assert firstReaderHoldCount > 0;
      if (firstReaderHoldCount == 1)
      firstReader = null;
      else
      firstReaderHoldCount--;
      } else {
      ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
      rh = readHolds.get();
      int count = rh.count;
      if (count <= 1) {
      readHolds.remove();
      if (count <= 0)
      throw unmatchedUnlockException();
      }
      --rh.count;
      }
      for (;;) {
      int c = getState();
      int nextc = c - SHARED_UNIT;
      if (compareAndSetState(c, nextc))
      // Releasing the read lock has no effect on readers,
      // but it may allow waiting writers to proceed if
      // both read and write locks are now free.
      //读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
      //计数为 0 才是真正释放
      return nextc == 0;
      }
      }
    • ⑥接着如果t3 r.unlock,t3也会进入sync.releaseShared(1) 中,调用tryReleaseShared(1)让计数减一,这回计数为零了,进入doReleaseShared()将头节点从-1改为0并唤醒老二,即:

      1
      2
      3
      4
      5
      6
      7
      8
      //AQS抽象类
      public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
      }
      return false;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      //AQS抽象类
      private void doReleaseShared() {
      for (;;) {
      Node h = head;
      if (h != null && h != tail) {
      int ws = h.waitStatus;
      //如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0,防止 unparkSuccessor 被多次执行
      if (ws == Node.SIGNAL) {
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue; // loop to recheck cases
      unparkSuccessor(h);
      }
      //如果已经是 0 了,改为 -3,用来解决传播性
      else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue; // loop on failed CAS
      }
      if (h == head) // loop if head changed
      break;
      }
      }

      之后t4在acquireQueued中parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束。

      1
      2
      3
      4
      5
      6
      //AQS抽象类中
      public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      //AQS抽象类中
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

3.3 使用StampedLock

  • 该类自JDK 8加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用。

  • StampedLock不支持条件变量,也不支持可重入。

  • 加解读锁:

    1
    2
    long stamp = lock.readLock();
    lock.unlockRead(stamp);
  • 加解写锁:

    1
    2
    long stamp = lock.writeLock();
    lock.unlockWrite(stamp);
  • 乐观读,StampedLock支持tryOptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

    1
    2
    3
    4
    5
    long stamp = lock.tryOptimisticRead();
    // 验戳
    if(!lock.validate(stamp)){
    // 锁升级
    }
  • 提供一个数据容器类,内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
      @Slf4j
    class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();
    public DataContainerStamped(int data) {
    this.data = data;
    }

    public int read(int readTime) {
    long stamp = lock.tryOptimisticRead();
    log.debug("optimistic read locking...{}", stamp);
    try {
    Thread.sleep(readTime);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    if (lock.validate(stamp)) {
    log.debug("read finish...{}, data:{}", stamp, data);
    return data;
    }
    // 锁升级:乐观读-->读锁
    log.debug("updating to read lock... {}", stamp);
    try {
    stamp = lock.readLock();
    log.debug("read lock {}", stamp);
    try {
    Thread.sleep(readTime);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.debug("read finish...{}, data:{}", stamp, data);
    return data;
    } finally {
    log.debug("read unlock {}", stamp);
    lock.unlockRead(stamp);
    }
    }

    public void write(int newData) {
    long stamp = lock.writeLock();
    log.debug("write lock {}", stamp);
    try {
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    this.data = newData;
    } finally {
    log.debug("write unlock {}", stamp);
    lock.unlockWrite(stamp);
    }
    }
    }
    • 测试读-读可以优化。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
        public class TestStampedLock {
      public static void main(String[] args) {
      DataContainerStamped dataContainer = new DataContainerStamped(1);
      new Thread(() -> {
      dataContainer.read(1000);
      }, "t1").start();
      try {
      Thread.sleep(500);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      new Thread(() -> {
      dataContainer.read(0);
      }, "t2").start();
      }
      }

      执行结果如下,可以看到实际没有加读锁:

      1
      2
      3
      4
      5
      15:07:00.018 org.example.DataContainerStamped [t1] - optimistic read locking...256
      15:07:00.527 org.example.DataContainerStamped [t2] - optimistic read locking...256
      15:07:00.528 org.example.DataContainerStamped [t2] - read finish...256, data:1
      15:07:01.038 org.example.DataContainerStamped [t1] - read finish...256, data:1

Process finished with exit code 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  
+ **测试读-写时可以优化。**

```java
public class TestStampedLock {
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1000);
}, "t1").start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}
}

执行结果:

1
2
3
4
5
6
7
8
9
15:09:56.340 org.example.DataContainerStamped [t1] - optimistic read locking...256
15:09:56.843 org.example.DataContainerStamped [t2] - write lock 384
15:09:57.358 org.example.DataContainerStamped [t1] - updating to read lock... 256
15:09:58.854 org.example.DataContainerStamped [t2] - write unlock 384
15:09:58.854 org.example.DataContainerStamped [t1] - read lock 513
15:09:59.863 org.example.DataContainerStamped [t1] - read finish...513, data:100
15:09:59.863 org.example.DataContainerStamped [t1] - read unlock 513

Process finished with exit code 0

4、信号量

4.1 使用Semaphore

  • Semaphore即信号量,用来限制能同时访问共享资源的线程上限。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    public class TestSemaphore {
    public static void main(String[] args) {
    // 1. 创建 semaphore 对象
    Semaphore semaphore = new Semaphore(3);

    // 2. 10个线程同时运行
    for (int i = 0; i < 10; i++) {
    new Thread(() -> {
    try {
    semaphore.acquire();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    try {
    log.debug("running...");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.debug("end...");
    } finally {
    semaphore.release();
    }
    }).start();
    }
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    15:28:28.452 org.example.TestSemaphore [Thread-2] - running...
    15:28:28.452 org.example.TestSemaphore [Thread-0] - running...
    15:28:28.452 org.example.TestSemaphore [Thread-1] - running...
    15:28:29.458 org.example.TestSemaphore [Thread-1] - end...
    15:28:29.458 org.example.TestSemaphore [Thread-0] - end...
    15:28:29.458 org.example.TestSemaphore [Thread-2] - end...
    15:28:29.458 org.example.TestSemaphore [Thread-5] - running...
    15:28:29.458 org.example.TestSemaphore [Thread-3] - running...
    15:28:29.458 org.example.TestSemaphore [Thread-4] - running...
    15:28:30.460 org.example.TestSemaphore [Thread-3] - end...
    15:28:30.460 org.example.TestSemaphore [Thread-4] - end...
    15:28:30.461 org.example.TestSemaphore [Thread-6] - running...
    15:28:30.460 org.example.TestSemaphore [Thread-5] - end...
    15:28:30.461 org.example.TestSemaphore [Thread-7] - running...
    15:28:30.461 org.example.TestSemaphore [Thread-8] - running...
    15:28:31.464 org.example.TestSemaphore [Thread-8] - end...
    15:28:31.464 org.example.TestSemaphore [Thread-7] - end...
    15:28:31.464 org.example.TestSemaphore [Thread-6] - end...
    15:28:31.464 org.example.TestSemaphore [Thread-9] - running...
    15:28:32.467 org.example.TestSemaphore [Thread-9] - end...

    Process finished with exit code 0
  • Semaphore可以使用在自定义数据库连接池上。

    • 一个线上商城应用,QPS达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。

    • 使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数。

    • 原本自定义数据库连接池:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      @Slf4j
      class Pool {
      // 1. 连接池大小
      private final int poolSize;
      // 2. 连接对象数组
      private Connection[] connections;
      // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
      private AtomicIntegerArray states;
      // 4. 构造方法初始化
      public Pool(int poolSize) {
      this.poolSize = poolSize;
      this.connections = new Connection[poolSize];
      this.states = new AtomicIntegerArray(new int[poolSize]);
      for (int i = 0; i < poolSize; i++) {
      connections[i] = new MockConnection("连接" + (i+1));
      }
      }
      // 5. 借连接
      public Connection borrow() {
      while(true) {
      for (int i = 0; i < poolSize; i++) {
      // 获取空闲连接
      if(states.get(i) == 0) {
      if (states.compareAndSet(i, 0, 1)) {
      log.debug("borrow {}", connections[i]);
      return connections[i];
      }
      }
      }
      // 如果没有空闲连接,当前线程进入等待
      synchronized (this) {
      try {
      log.debug("wait...");
      this.wait();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }
      // 6. 归还连接
      public void free(Connection conn) {
      for (int i = 0; i < poolSize; i++) {
      if (connections[i] == conn) {
      states.set(i, 0);
      synchronized (this) {
      log.debug("free {}", conn);
      this.notifyAll();
      }
      break;
      }
      }
      }
      }

      class MockConnection implements Connection {
      private String name;

      public MockConnection(String name) {
      this.name = name;
      }

      @Override
      public String toString() {
      return "MockConnection{" +
      "name='" + name + '\'' +
      '}';
      }
      }

      测试代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      @Slf4j
      public class TestPools {
      public static void main(String[] args) {
      Pool pool = new Pool(2);
      for (int i = 0; i < 5; i++) {
      new Thread(() -> {
      Connection conn = pool.borrow();
      try {
      Thread.sleep(new Random().nextInt(1000));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      pool.free(conn);
      }).start();
      }

      }
      }

      执行结果:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      15:51:34.811 org.example.Pool [Thread-2] - wait...
      15:51:34.811 org.example.Pool [Thread-1] - borrow MockConnection{name='连接2'}
      15:51:34.813 org.example.Pool [Thread-4] - wait...
      15:51:34.813 org.example.Pool [Thread-3] - wait...
      15:51:34.811 org.example.Pool [Thread-0] - borrow MockConnection{name='连接1'}
      15:51:34.883 org.example.Pool [Thread-0] - free MockConnection{name='连接1'}
      15:51:34.883 org.example.Pool [Thread-4] - wait...
      15:51:34.883 org.example.Pool [Thread-3] - borrow MockConnection{name='连接1'}
      15:51:34.883 org.example.Pool [Thread-2] - wait...
      15:51:35.336 org.example.Pool [Thread-1] - free MockConnection{name='连接2'}
      15:51:35.336 org.example.Pool [Thread-2] - borrow MockConnection{name='连接2'}
      15:51:35.336 org.example.Pool [Thread-4] - wait...
      15:51:35.614 org.example.Pool [Thread-3] - free MockConnection{name='连接1'}
      15:51:35.615 org.example.Pool [Thread-4] - borrow MockConnection{name='连接1'}
      15:51:36.118 org.example.Pool [Thread-4] - free MockConnection{name='连接1'}
      15:51:36.295 org.example.Pool [Thread-2] - free MockConnection{name='连接2'}

      Process finished with exit code 0
    • 使用Semaphore改进后的连接池代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      @Slf4j
      class Pool {
      // 1. 连接池大小
      private final int poolSize;

      // 2. 连接对象数组
      private Connection[] connections;

      // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
      private AtomicIntegerArray states;

      private Semaphore semaphore;

      // 4. 构造方法初始化
      public Pool(int poolSize) {
      this.poolSize = poolSize;
      // 让许可数与资源数一致
      this.semaphore = new Semaphore(poolSize);
      this.connections = new Connection[poolSize];
      this.states = new AtomicIntegerArray(new int[poolSize]);
      for (int i = 0; i < poolSize; i++) {
      connections[i] = new MockConnection("连接" + (i+1));
      }
      }

      // 5. 借连接
      public Connection borrow() {// t1, t2, t3
      // 获取许可
      try {
      semaphore.acquire(); // 没有许可的线程,在此等待
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      for (int i = 0; i < poolSize; i++) {
      // 获取空闲连接
      if(states.get(i) == 0) {
      if (states.compareAndSet(i, 0, 1)) {
      log.debug("borrow {}", connections[i]);
      return connections[i];
      }
      }
      }
      // 不会执行到这里
      return null;
      }
      // 6. 归还连接
      public void free(Connection conn) {
      for (int i = 0; i < poolSize; i++) {
      if (connections[i] == conn) {
      states.set(i, 0);
      log.debug("free {}", conn);
      semaphore.release();
      break;
      }
      }
      }
      }

4.2 Semaphore原理

  • Semaphore也是基于AQS实现的,其内部结构如下图:

  • ①从调用构造方法开始,假设传入许可数为3,这时5个线程来获取资源。

    1
    2
    3
    public Semaphore(int permits) {
    sync = new NonfairSync(permits);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
    super(permits);
    }

    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
    setState(permits);
    }
    }
  • ②假设其中Thread-1,Thread-2,Thread-4 cas竞争成功(调用acquire()方法),而Thread-0和Thread-3竞争失败,进入AQS队列park阻塞。

    1
    2
    3
    4
    //Semaphore类
    public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    //AQS抽象类
    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
    1
    2
    3
    4
    5
    6
    7
    static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //Sync抽象类
    final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    int available = getState();
    int remaining = available - acquires;
    //如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
    if (remaining < 0 ||
    //如果 cas 重试成功, 返回正数, 表示获取成功
    compareAndSetState(available, remaining))
    return remaining;
    }
    }

    而如果获取不到许可,会执行doAcquireSharedInterruptibly(arg):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    //AQS抽象类
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    //再次尝试获取许可
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    //成功后本线程出队(AQS), 所在 Node设置为 head
    //r 表示可用资源数, 为 0 则不会继续传播
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    //不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  • ③假设这时Thread-4释放了permits(调用了release()),状态如下:

    1
    2
    3
    4
    //Semaphore类
    public void release() {
    sync.releaseShared(1);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    //AQS抽象类
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //Sync抽象类
    protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
    throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
    return true;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    //AQS抽象类
    private void doReleaseShared() {
    for (;;) {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    //唤醒后继节点
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }
  • ④这时Thread-0被唤醒,permits再次设置为0,设置自己为head节点,断开原来的head节点,unpark接下来的Thread-3节点,但由于permits是0,因此Thread-3在尝试不成功后再次进入park状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    //AQS抽象类
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    //再次尝试获取许可
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    //成功后本线程出队(AQS), 所在 Node设置为 head
    //r 表示可用资源数, 为 0 则不会继续传播
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    //不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //AQS抽象类
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    //下个节点不为null且为共享节点
    if (s == null || s.isShared())
    //唤醒后继共享节点
    doReleaseShared();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    //AQS抽象类
    private void doReleaseShared() {
    for (;;) {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    //将自己的Status从-1变成0后唤醒后继节点
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }

5、CountdownLatch

  • 用来进行线程同步协作,等待所有线程完成倒计时。

  • 其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减一。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @Slf4j
    public class TestCountdownLatch {
    public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3);
    new Thread(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    }).start();
    new Thread(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    }).start();
    new Thread(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(1500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    }).start();
    log.debug("waiting...");
    latch.await();
    log.debug("wait end...");
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    14:59:26.184 org.example.TestCountdownLatch [main] - waiting...
    14:59:26.184 org.example.TestCountdownLatch [Thread-2] - begin...
    14:59:26.184 org.example.TestCountdownLatch [Thread-1] - begin...
    14:59:26.184 org.example.TestCountdownLatch [Thread-0] - begin...
    14:59:27.200 org.example.TestCountdownLatch [Thread-0] - end...2
    14:59:27.688 org.example.TestCountdownLatch [Thread-2] - end...1
    14:59:28.192 org.example.TestCountdownLatch [Thread-1] - end...0
    14:59:28.192 org.example.TestCountdownLatch [main] - wait end...

    Process finished with exit code 0
  • 可以配合线程池使用,改进如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @Slf4j
    public class TestCountdownLatch {
    public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3);
    ExecutorService service = Executors.newFixedThreadPool(4);
    service.submit(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    });
    service.submit(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(1500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    });
    service.submit(() -> {
    log.debug("begin...");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...{}", latch.getCount());
    });
    service.submit(()->{
    try {
    log.debug("waiting...");
    latch.await();
    log.debug("wait end...");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    15:02:59.023 org.example.TestCountdownLatch [main] - waiting...
    15:02:59.023 org.example.TestCountdownLatch [Thread-2] - begin...
    15:02:59.023 org.example.TestCountdownLatch [Thread-1] - begin...
    15:02:59.023 org.example.TestCountdownLatch [Thread-0] - begin...
    15:03:00.035 org.example.TestCountdownLatch [Thread-0] - end...2
    15:03:00.533 org.example.TestCountdownLatch [Thread-2] - end...1
    15:03:01.039 org.example.TestCountdownLatch [Thread-1] - end...0
    15:03:01.039 org.example.TestCountdownLatch [main] - wait end...

    Process finished with exit code 0

6、CyclicBarrier

  • 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置计数个数,每个线程执行到某个需要“同步”的时刻调用await()方法进行等待,当等待的线程数满足计数个数时,继续执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    @Slf4j
    public class TestCyclicBarrier {
    public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(2);
    CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
    log.debug("task1, task2 finish...");
    });
    for (int i = 0; i < 3; i++) {
    service.submit(() -> {
    log.debug("task1 begin...");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    try {
    barrier.await(); // 2-1=1,阻塞
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    });
    service.submit(() -> {
    log.debug("task2 begin...");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    try {
    barrier.await(); // 1-1=0
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    });
    }
    service.shutdown();
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    15:09:03.303 org.example.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
    15:09:03.303 org.example.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
    15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-2] - task1, task2 finish...
    15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-2] - task1 begin...
    15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-1] - task2 begin...
    15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
    15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
    15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
    15:09:09.323 org.example.TestCyclicBarrier [pool-1-thread-2] - task1, task2 finish...

    Process finished with exit code 0