Java之AQS
1、ReentrantLock介绍
相对于synchronized它具备如下特点:
- 可中断。
- 可以设置超时时间。
- 可以设置为公平锁。
- 支持多个条件变量。
- 与synchronized一样,都支持可重入。
基本语法:
1
2
3
4
5
6
7
8// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
1.1 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestReentrantLock {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("execute main");
method1();
}finally {
}
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
} finally {
lock.unlock();
}
}
}执行结果:
1
2
311:41:05.613 org.example.TestReentrantLock [main] - execute main
11:41:05.617 org.example.TestReentrantLock [main] - execute method1
11:41:05.617 org.example.TestReentrantLock [main] - execute method2
1.2 可打断
可打断是指当前线程在等待锁释放的过程中可以被其它线程所打断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestReentrantLock {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
//如果没有竞争那么此方法会获取lock对象锁
//如果有竞争就进入阻塞队列,可以被其它线程用interrupt方法打断
log.debug("尝试获取锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("没有获得锁,返回");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
},"t1");
//主线程先获得锁
lock.lock();
t1.start();
Thread.sleep(1000);
log.debug("打断t1");
t1.interrupt();
}
}执行结果:
1
2
3
4
5
6
7
8
911:48:27.116 org.example.TestReentrantLock [t1] - 尝试获取锁
11:48:28.117 org.example.TestReentrantLock [main] - 打断t1
11:48:28.117 org.example.TestReentrantLock [t1] - 没有获得锁,返回
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at org.example.TestReentrantLock.lambda$main$0(TestReentrantLock.java:21)
at java.lang.Thread.run(Thread.java:748)
1.3 锁超时
锁超时是指在等待获得锁超过一定时间就会主动放弃等待,表示此次尝试获取锁失败,即可以避免该线程无限等待下去。
①获取不到锁则立刻放弃等待的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestReentrantLock {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("尝试获得锁");
if (!lock.tryLock()) {
log.debug("获取不到锁");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
}
}运行结果:
1
211:54:35.706 org.example.TestReentrantLock [t1] - 尝试获得锁
11:54:35.706 org.example.TestReentrantLock [t1] - 获取不到锁②设置超时时间等待的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestReentrantLock {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("尝试获得锁");
try {
if (! lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取不到锁");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("获取不到锁");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得到锁");
t1.start();
Thread.sleep(2000);
log.debug("释放了锁");
lock.unlock();
}
}执行结果:
1
2
3
411:58:01.187 org.example.TestReentrantLock [main] - 获得到锁
11:58:01.195 org.example.TestReentrantLock [t1] - 尝试获得锁
11:58:02.209 org.example.TestReentrantLock [t1] - 获取不到锁
11:58:03.196 org.example.TestReentrantLock [main] - 释放了锁
1.4 条件变量
- synchronized中也有条件变量,就是我们讲原理时那个waitSet休息室,当条件不满足时进入waitSet等待。
- ReentrantLock的条件变量比synchronized强大之处在于,它是支持多个条件变量的,这就好比:
- synchronized是那些不满足条件的线程都在一间休息室等消息。
- 而 ReentrantLock支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒。
- 使用要点:
- await前需要获得锁。
- await执行后,会释放锁,进入conditionObject等待。
- await的线程被唤醒(或打断、或超时)取重新竞争lock锁。
- 竞争lock锁成功后,从await后继续执行。
1 |
|
执行结果:
1 | 14:12:06.972 org.example.TestReentrantLock [小南] - 有烟没?[false] |
2、ReentrantLock原理
ReentrantLock是基于AQS(全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架)实现的。
AQS特点:
用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁。
getState - 获取state状态。
setState - 设置state状态。
compareAndSetState - cas机制设置state状态。
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源。
提供了基于FIFO的等待队列,类似于Monitor的EntryList。
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet。
子类主要实现这样一些方法(默认抛出UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
获取锁的姿势:
1
2
3
4// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}- 释放锁的姿势:
1
2
3
4// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}ReentrantLock内部结构:
2.1 非公平锁实现原理
- ①从构造器开始看,默认为非公平锁实现。
1 | //同步器提供所有实施机制 |
其中的NonfairSync继承自AQS:
1 | //同步对象的非公平锁 |
②当线程Thread-0执行ReentrantLock.lock()时,尝试将线程状态设置为1,如果设置成功就设置当前线程为同步器的Owner。
1
2
3
4
5
6//如果没有其他线程持有该锁,则获取该锁并立即返回,将锁保持计数设置为1。
//如果当前线程已经持有该锁,则持有计数将增加1,该方法将立即返回。
//如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到获取了锁为止,此时,锁持有计数被设置为1。
public void lock() {
sync.lock();
}③这时如果线程Thread-1来竞争资源。
1
2
3
4
5
6
7//AQS抽象类
//在独占模式下获取,忽略中断。通过至少调用一次tryAcquire()并返回成功来实现。否则,线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquire()直到成功。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}①CAS尝试将state由0改为1,结果失败。
②进入acquire()的tryAcquire()逻辑,这时state已经是1,结果仍然失败。
③接下来进入addWaiter逻辑,构造Node队列。
- 图中黄色三角表示该Node的waitStatus状态,其中0为默认正常状态。
- Node的创建是懒惰的。
- 其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程。
④接下来进入acquireQueued()逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26//AQS抽象类
//以排他的不间断模式获取已在队列中的线程,用于条件等待方法以及获取。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
//获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
p.next = null; // help GC
failed = false;
//返回中断标记 false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}①acquireQueued会在一个死循环中不断尝试获得锁,失败后进入park阻塞。
②如果自己是紧邻着head(排第二位),那么再次tryAcquire尝试获取锁,当然这时state仍为1,失败。
③进入shouldParkAfterFailedAcquire逻辑,将前驱node,即head的waitStatus改为-1,这次返回false。表示waitStatus为-1的节点有资格唤醒它的后继节点。
④当再次进入shouldParkAfterFailedAcquire时,这时因为其前驱node的waitStatus已经是-1,这次返回true。
⑤接着进入parkAndCheckInterrupt,Thread-1 park(灰色表示)。
1
2
3
4
5
6//AQS抽象类
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
return Thread.interrupted();
}
④再次有多个线程经历上述过程竞争失败,变成这个样子。
⑤这时如果Thread-0释放锁(调用ReentrantLock.unlock()),进入tryRelease流程,如果成功:
1
2
3
4//如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。
public void unlock() {
sync.release(1);
}1
2
3
4
5
6
7
8
9
10
11
12//AQS抽象类
//以独占模式发布。如果tryRelease返回true,则通过解锁一个或多个线程tryRelease实现。
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14//AQS抽象类
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}①设置exclusiveOwnerThread为null。
②设置state=0。(此时执行完tryRelease())
③这时回到release(),如果当前队列不为null,并且head的waitStatus=-1,进入unparkSuccessor流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//AQS抽象类
//唤醒节点的后继者(如果存在)。
private void unparkSuccessor(Node node) {
//如果状态是否定的(即可能需要信号),请尝试清除以预期发出信号。如果失败或等待线程更改状态,则可以。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//释放线程保留在后续线程中,该线程通常只是下一个节点。但是,如果已取消或明显为空,请从尾部向后移动以找到实际的未取消后继。
//找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
//不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}①找到队列中离head最近的一个Node(没取消的),unpark恢复其运行,本例中即为Thread-1。
②这时回到Thread-1的acquireQueued流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//以排他的不间断模式获取已在队列中的线程,用于条件等待方法以及获取。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
//获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
p.next = null; // help GC
failed = false;
//返回中断标记
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}①如果加锁成功(没有竞争),会设置:
exclusiveOwnerThread为Thread-1,state=1(在tryAcquire(arg)方法中执行)。
head指向刚刚Thread-1所在的Node,该Node清空Thread。
原本的head因为从链表断开,而可被垃圾回收。
②如果这时候有其它线程来竞争(非公平的体现),例如这时有Thread-4来了, 且被 Thread-4占了先。
+ Thread-4被设置为exclusiveOwnerThread,state=1。 + Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞。
2.2 可重入原理
在调用ReentrantLock.lock()时如果发现有竞争则会进入acquire()–>tryAcquire()–>nonfairTryAcquire(),相关可重入的代码在nonfairTryAcquire()里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//Sync抽象类
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果还没有获得锁
if (c == 0) {
//尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
//state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取失败, 回到调用处
return false;
}在调用ReentrantLock.unlock()进行解锁时,会执行sync.release(1)–>tryRelease(arg),相关可重入的代码在tryRelease()里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//Sync抽象类
protected final boolean tryRelease(int releases) {
//state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.3 可打断原理
不可打断模式
在此模式下,即使它被打断,仍会驻留在AQS队列中,一直要等到获得锁后方能得知自己被打断了。
在执行ReentrantLock.lock()时会执行到acquire():
1
2
3
4
5
6
7//AQS抽象类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果打断状态为 true
selfInterrupt();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//AQS抽象类
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}1
2
3
4
5//AQS抽象类
static void selfInterrupt() {
//重新产生一次中断
Thread.currentThread().interrupt();
}1
2
3
4
5
6
7//AQS抽象类
private final boolean parkAndCheckInterrupt() {
//如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
//interrupted 会清除打断标记
return Thread.interrupted();
}
可打断模式
当调用ReentrantLock.lockInterruptibly()时会执行acquireInterruptibly–>doAcquireInterruptibly():
1
2
3public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}1
2
3
4
5
6
7
8
9//AQS抽象类
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//如果没有获得到锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//AQS抽象类
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//在park过程中如果被interrupt会抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.4 公平锁实现原理
公平锁与非公平锁主要区别在于tryAcquire方法的实现:
非公平锁的tryAcquire():
1
2
3
4//NonfairSync类
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//Sync抽象类
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}公平锁的tryAcquire():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//FairSync类
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12//AQS抽象类
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//h != t 时表示队列中有 Node
return h != t &&
//并且队列中没有第二个node
((s = h.next) == null ||
//或者队列中第二个node不是当前线程
s.thread != Thread.currentThread());
}
2.5 条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject。
await流程
开始Thread-0持有锁,调用await。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31//1.如果当前线程被中断,则抛出InterruptedException。
//2.保存getState返回的锁定状态。
//3.使用保存的状态作为参数调用release ,如果失败则抛出IllegalMonitorStateException。
//4.阻塞直到发信号或被打断。
//5.通过调用以保存状态作为参数的acquire专用版本来重新acquire。
//6.如果在步骤4中被阻止而被中断,则抛出InterruptedException。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加一个 Node 至等待队列
AbstractQueuedSynchronizer.Node node = addConditionWaiter();
//释放节点持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
//当前线程阻塞
LockSupport.park(this);
//如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//所有已取消的 Node 从队列链表删除
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}①进入ConditionObject的addConditionWaiter流程,会创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入等待队列尾部。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private AbstractQueuedSynchronizer.Node addConditionWaiter() {
AbstractQueuedSynchronizer.Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个关联当前线程的新 Node, 添加至队列尾部
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), AbstractQueuedSynchronizer.Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}②接下来进入AQS的fullyRelease流程,释放同步器上的锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//因为某线程可能重入,需要将 state 全部释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}1
2
3
4
5
6
7
8
9
10public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}③unpark AQS队列中的下一个节点竞争锁,假设没有其他竞争线程,那么Thread-1竞争成功。
④阻塞Thread-0。
signal流程
假设Thread-1要来唤醒Thread-0,会调用ConditionObject.signal():
1
2
3
4
5
6
7
8
9//必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁。
public final void signal() {
//判断调用该方法的线程是不是锁的持有者,如果不是则抛异常。
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
AbstractQueuedSynchronizer.Node first = firstWaiter;
if (first != null)
doSignal(first);
}接着进入ConditionObject的doSignal流程,取得等待队列中第一个 Node,即Thread-0所在Node。
1
2
3
4
5
6
7
8private void doSignal(AbstractQueuedSynchronizer.Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}接着执行transferForSignal流程,将该Node加入AQS队列尾部,将Thread-0的waitStatus改为0,Thread-3的waitStatus改为-1。
1
2
3
4
5
6
7
8
9
10
11
12
13final boolean transferForSignal(Node node) {
//如果无法更改waitStatus,则该节点已被取消。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//拼接到队列上并尝试设置前任的waitStatus来指示线程(可能)正在等待。如果取消设置或尝试设置waitStatus失败,请唤醒以重新同步(在这种情况下,waitStatus可能会短暂而无害地出现错误)。
//将该Node加入AQS队列尾部,返回该结点的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
3、读写锁
3.1 使用ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让”读-读”可以并发,提高性能。
提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法。
1 |
|
+ **测试读锁-读锁可以并发。**
1
2
3
4
5
6
7
8
9
10
11
12
public class TestReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
}
执行结果如下,从这里可以看到Thread-0锁定期间,Thread-1的读操作不受影响:
1
2
3
4
5
6
7
8
20:49:41.307 org.example.DataContainer [t1] - 获取读锁...
20:49:41.307 org.example.DataContainer [t2] - 获取读锁...
20:49:41.307 org.example.DataContainer [t2] - 读取
20:49:41.307 org.example.DataContainer [t1] - 读取
20:49:42.311 org.example.DataContainer [t1] - 释放读锁...
20:49:42.311 org.example.DataContainer [t2] - 释放读锁...
Process finished with exit code 0
+ **测试读锁-写锁相互阻塞。**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestReadWriteLock {
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
}
执行结果:
1
2
3
4
5
6
7
8
20:51:45.399 org.example.DataContainer [t1] - 获取读锁...
20:51:45.407 org.example.DataContainer [t1] - 读取
20:51:45.504 org.example.DataContainer [t2] - 获取写锁...
20:51:46.408 org.example.DataContainer [t1] - 释放读锁...
20:51:46.408 org.example.DataContainer [t2] - 写入
20:51:47.421 org.example.DataContainer [t2] - 释放写锁...
Process finished with exit code 0
+ **写锁-写锁也是相互阻塞的。**
注意事项
读锁不支持条件变量。
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待。
1
2
3
4
5
6
7
8
9
10
11
12r.lock();
try {
// ...
w.lock();
try {
// ...
} finally{
w.unlock();
}
} finally{
r.unlock();
}重入时降级支持:即持有写锁的情况下去获取读锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
3.2 ReentrantReadWriteLock原理
读写锁用的是同一个Sync同步器,因此等待队列、state等也是同一个。
以t1 w.lock –> t2 r.lock–> t3 r.lock –> t4 w.lock –> t1 w.unlock –> t2 r.unlock –> t3 r.unlock为例。
①t1调用w.lock()成功上锁,流程与ReentrantLock加锁相比没有特殊之处,不同是写锁状态占了state的低16位,而读锁使用的是state的高16位。
1
2
3
4//WriteLock类
public void lock() {
sync.acquire(1);
}1
2
3
4
5
6//AQS抽象类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29//Sync抽象类
protected final boolean tryAcquire(int acquires) {
//获得低 16 位, 代表写锁的 state 计数
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//c != 0 and w == 0 表示有读锁, 或者如果 exclusiveOwnerThread 不是自己
if (w == 0 || current != getExclusiveOwnerThread())
//获得锁失败
return false;
//写锁计数超过低 16 位, 报异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//写锁重入, 获得锁成功
setState(c + acquires);
return true;
}
//判断写锁是否该阻塞, 或者尝试更改计数失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
//获得锁失败
return false;
//获得锁成功
setExclusiveOwnerThread(current);
return true;
}1
2
3
4
5//NonfairSync类
//非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
final boolean writerShouldBlock() {
return false; // writers can always barge
}②t2执行r.lock,这时进入读锁的sync.acquireShared(1) 流程,首先会进入tryAcquireShared流程。如果有写锁占据,那么tryAcquireShared返回-1表示失败。(tryAcquireShared 返回值表示:-1 表示失败;0 表示成功,但后继节点不会继续唤醒;正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1)
1
2
3
4//ReadLock类
public void lock() {
sync.acquireShared(1);
}1
2
3
4
5//AQS抽象类
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//Sync类
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果是其它线程持有写锁, 获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且小于读锁计数, 并且尝试增加计数成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}这时会进入sync.doAcquireShared(1) 流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private void doAcquireShared(int arg) {
//将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再一次尝试获取读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}- 首先也是调用addWaiter添加节点,不同之处在于节点被设置为Node.SHARED模式而非Node.EXCLUSIVE模式,注意此时t2仍处于活跃状态。
t2会看看自己的节点是不是老二,如果是,还会再次调用tryAcquireShared(1)来尝试获取锁。
如果没有成功,在doAcquireShared内for(;;)循环一次,把前驱节点的waitStatus改为-1(在shouldParkAfterFailedAcquire()中实现),再for(;;)循环一 次尝试tryAcquireShared(1)如果还不成功,那么在parkAndCheckInterrupt()处park。
③这种状态下,假设又有t3加读锁和t4加写锁,这期间t1仍然持有锁,就变成了下面的样子:
④这时如果t1 w.unlock,会走到写锁的sync.release(1)流程,调用sync.tryRelease(1)成功,变成下面的样子。
1
2
3
4//WriteLock类
public void unlock() {
sync.release(1);
}1
2
3
4
5
6
7
8
9
10//AQS抽象类
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12//Sync抽象类
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}接下来执行唤醒流程sync.unparkSuccessor,即让老二恢复运行,这时t2在doAcquireShared内parkAndCheckInterrupt()处恢复运行。 这回t2再来一次for (;;) 执行tryAcquireShared成功则让读锁计数加一。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private void doAcquireShared(int arg) {
//将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再一次尝试获取读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//Sync类
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果是其它线程持有写锁, 获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且小于读锁计数, 并且尝试增加计数成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}这时t2已经恢复运行,接下来t2调用setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14//AQS抽象类
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置自己为 head
setHead(node);
//propagate 表示有共享资源(例如共享读锁或信号量)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果是最后一个节点或者是等待共享读锁的节点
if (s == null || s.isShared())
doReleaseShared();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//AQS抽象类
private void doReleaseShared() {
//如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
//如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析
for (;;) {
Node h = head;
//队列还有节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//将头节点的状态从-1改为0,避免并发导致重复唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}事情还没完,在setHeadAndPropagate方法内还会检查下一个节点是否是shared,如果是则调用 doReleaseShared()将head的状态从-1改为0并唤醒老二,这时t3在doAcquireShared内parkAndCheckInterrupt()处恢复运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private void doAcquireShared(int arg) {
//将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再一次尝试获取读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}t3这回再来一次for (;;)执行tryAcquireShared成功则让读锁计数加一。
这时t3已经恢复运行,接下来t3调用setHeadAndPropagate(node, 1),它原本所在节点被置为头节点。且下一个节点不是 shared 了,因此不会继续唤醒t4所在节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14//AQS抽象类
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置自己为 head
setHead(node);
//propagate 表示有共享资源(例如共享读锁或信号量)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果是最后一个节点或者是等待共享读锁的节点
if (s == null || s.isShared())
doReleaseShared();
}
}⑤现在假设t2 r.unlock, t2进入sync.releaseShared(1)中,调用tryReleaseShared(1)让计数减一,但由于计数还不为零。
1
2
3
4//ReadLock类
public void unlock() {
sync.releaseShared(1);
}1
2
3
4
5
6
7
8//AQS抽象类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33//Sync抽象类
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
//计数为 0 才是真正释放
return nextc == 0;
}
}⑥接着如果t3 r.unlock,t3也会进入sync.releaseShared(1) 中,调用tryReleaseShared(1)让计数减一,这回计数为零了,进入doReleaseShared()将头节点从-1改为0并唤醒老二,即:
1
2
3
4
5
6
7
8//AQS抽象类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//AQS抽象类
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0,防止 unparkSuccessor 被多次执行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}之后t4在acquireQueued中parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束。
1
2
3
4
5
6//AQS抽象类中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22//AQS抽象类中
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3 使用StampedLock
该类自JDK 8加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用。
StampedLock不支持条件变量,也不支持可重入。
加解读锁:
1
2long stamp = lock.readLock();
lock.unlockRead(stamp);加解写锁:
1
2long stamp = lock.writeLock();
lock.unlockWrite(stamp);乐观读,StampedLock支持tryOptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1
2
3
4
5long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}提供一个数据容器类,内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
try {
Thread.sleep(readTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级:乐观读-->读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
try {
Thread.sleep(readTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}测试读-读可以优化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class TestStampedLock {
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1000);
}, "t1").start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
dataContainer.read(0);
}, "t2").start();
}
}执行结果如下,可以看到实际没有加读锁:
1
2
3
4
515:07:00.018 org.example.DataContainerStamped [t1] - optimistic read locking...256
15:07:00.527 org.example.DataContainerStamped [t2] - optimistic read locking...256
15:07:00.528 org.example.DataContainerStamped [t2] - read finish...256, data:1
15:07:01.038 org.example.DataContainerStamped [t1] - read finish...256, data:1
Process finished with exit code 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
+ **测试读-写时可以优化。**
```java
public class TestStampedLock {
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1000);
}, "t1").start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}
}
执行结果:
1
2
3
4
5
6
7
8
9
15:09:56.340 org.example.DataContainerStamped [t1] - optimistic read locking...256
15:09:56.843 org.example.DataContainerStamped [t2] - write lock 384
15:09:57.358 org.example.DataContainerStamped [t1] - updating to read lock... 256
15:09:58.854 org.example.DataContainerStamped [t2] - write unlock 384
15:09:58.854 org.example.DataContainerStamped [t1] - read lock 513
15:09:59.863 org.example.DataContainerStamped [t1] - read finish...513, data:100
15:09:59.863 org.example.DataContainerStamped [t1] - read unlock 513
Process finished with exit code 0
4、信号量
4.1 使用Semaphore
Semaphore即信号量,用来限制能同时访问共享资源的线程上限。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestSemaphore {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("end...");
} finally {
semaphore.release();
}
}).start();
}
}
}执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2215:28:28.452 org.example.TestSemaphore [Thread-2] - running...
15:28:28.452 org.example.TestSemaphore [Thread-0] - running...
15:28:28.452 org.example.TestSemaphore [Thread-1] - running...
15:28:29.458 org.example.TestSemaphore [Thread-1] - end...
15:28:29.458 org.example.TestSemaphore [Thread-0] - end...
15:28:29.458 org.example.TestSemaphore [Thread-2] - end...
15:28:29.458 org.example.TestSemaphore [Thread-5] - running...
15:28:29.458 org.example.TestSemaphore [Thread-3] - running...
15:28:29.458 org.example.TestSemaphore [Thread-4] - running...
15:28:30.460 org.example.TestSemaphore [Thread-3] - end...
15:28:30.460 org.example.TestSemaphore [Thread-4] - end...
15:28:30.461 org.example.TestSemaphore [Thread-6] - running...
15:28:30.460 org.example.TestSemaphore [Thread-5] - end...
15:28:30.461 org.example.TestSemaphore [Thread-7] - running...
15:28:30.461 org.example.TestSemaphore [Thread-8] - running...
15:28:31.464 org.example.TestSemaphore [Thread-8] - end...
15:28:31.464 org.example.TestSemaphore [Thread-7] - end...
15:28:31.464 org.example.TestSemaphore [Thread-6] - end...
15:28:31.464 org.example.TestSemaphore [Thread-9] - running...
15:28:32.467 org.example.TestSemaphore [Thread-9] - end...
Process finished with exit code 0Semaphore可以使用在自定义数据库连接池上。
一个线上商城应用,QPS达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数。
原本自定义数据库连接池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {
while(true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
public MockConnection(String name) {
this.name = name;
}
public String toString() {
return "MockConnection{" +
"name='" + name + '\'' +
'}';
}
}测试代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestPools {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1815:51:34.811 org.example.Pool [Thread-2] - wait...
15:51:34.811 org.example.Pool [Thread-1] - borrow MockConnection{name='连接2'}
15:51:34.813 org.example.Pool [Thread-4] - wait...
15:51:34.813 org.example.Pool [Thread-3] - wait...
15:51:34.811 org.example.Pool [Thread-0] - borrow MockConnection{name='连接1'}
15:51:34.883 org.example.Pool [Thread-0] - free MockConnection{name='连接1'}
15:51:34.883 org.example.Pool [Thread-4] - wait...
15:51:34.883 org.example.Pool [Thread-3] - borrow MockConnection{name='连接1'}
15:51:34.883 org.example.Pool [Thread-2] - wait...
15:51:35.336 org.example.Pool [Thread-1] - free MockConnection{name='连接2'}
15:51:35.336 org.example.Pool [Thread-2] - borrow MockConnection{name='连接2'}
15:51:35.336 org.example.Pool [Thread-4] - wait...
15:51:35.614 org.example.Pool [Thread-3] - free MockConnection{name='连接1'}
15:51:35.615 org.example.Pool [Thread-4] - borrow MockConnection{name='连接1'}
15:51:36.118 org.example.Pool [Thread-4] - free MockConnection{name='连接1'}
15:51:36.295 org.example.Pool [Thread-2] - free MockConnection{name='连接2'}
Process finished with exit code 0使用Semaphore改进后的连接池代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}
4.2 Semaphore原理
Semaphore也是基于AQS实现的,其内部结构如下图:
①从调用构造方法开始,假设传入许可数为3,这时5个线程来获取资源。
1
2
3public Semaphore(int permits) {
sync = new NonfairSync(permits);
}1
2
3
4
5
6
7
8
9
10
11static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}1
2
3
4
5
6
7abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
}②假设其中Thread-1,Thread-2,Thread-4 cas竞争成功(调用acquire()方法),而Thread-0和Thread-3竞争失败,进入AQS队列park阻塞。
1
2
3
4//Semaphore类
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}1
2
3
4
5
6
7
8//AQS抽象类
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}1
2
3
4
5
6
7static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}1
2
3
4
5
6
7
8
9
10
11
12//Sync抽象类
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
if (remaining < 0 ||
//如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining))
return remaining;
}
}而如果获取不到许可,会执行doAcquireSharedInterruptibly(arg):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//AQS抽象类
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再次尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功后本线程出队(AQS), 所在 Node设置为 head
//r 表示可用资源数, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}③假设这时Thread-4释放了permits(调用了release()),状态如下:
1
2
3
4//Semaphore类
public void release() {
sync.releaseShared(1);
}1
2
3
4
5
6
7
8//AQS抽象类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}1
2
3
4
5
6
7
8
9
10
11//Sync抽象类
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//AQS抽象类
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}④这时Thread-0被唤醒,permits再次设置为0,设置自己为head节点,断开原来的head节点,unpark接下来的Thread-3节点,但由于permits是0,因此Thread-3在尝试不成功后再次进入park状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//AQS抽象类
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再次尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功后本线程出队(AQS), 所在 Node设置为 head
//r 表示可用资源数, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13//AQS抽象类
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//下个节点不为null且为共享节点
if (s == null || s.isShared())
//唤醒后继共享节点
doReleaseShared();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//AQS抽象类
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//将自己的Status从-1变成0后唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
5、CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减一。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TestCountdownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
log.debug("waiting...");
latch.await();
log.debug("wait end...");
}
}执行结果:
1
2
3
4
5
6
7
8
9
1014:59:26.184 org.example.TestCountdownLatch [main] - waiting...
14:59:26.184 org.example.TestCountdownLatch [Thread-2] - begin...
14:59:26.184 org.example.TestCountdownLatch [Thread-1] - begin...
14:59:26.184 org.example.TestCountdownLatch [Thread-0] - begin...
14:59:27.200 org.example.TestCountdownLatch [Thread-0] - end...2
14:59:27.688 org.example.TestCountdownLatch [Thread-2] - end...1
14:59:28.192 org.example.TestCountdownLatch [Thread-1] - end...0
14:59:28.192 org.example.TestCountdownLatch [main] - wait end...
Process finished with exit code 0可以配合线程池使用,改进如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestCountdownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
service.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(()->{
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}执行结果:
1
2
3
4
5
6
7
8
9
1015:02:59.023 org.example.TestCountdownLatch [main] - waiting...
15:02:59.023 org.example.TestCountdownLatch [Thread-2] - begin...
15:02:59.023 org.example.TestCountdownLatch [Thread-1] - begin...
15:02:59.023 org.example.TestCountdownLatch [Thread-0] - begin...
15:03:00.035 org.example.TestCountdownLatch [Thread-0] - end...2
15:03:00.533 org.example.TestCountdownLatch [Thread-2] - end...1
15:03:01.039 org.example.TestCountdownLatch [Thread-1] - end...0
15:03:01.039 org.example.TestCountdownLatch [main] - wait end...
Process finished with exit code 0
6、CyclicBarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置计数个数,每个线程执行到某个需要“同步”的时刻调用await()方法进行等待,当等待的线程数满足计数个数时,继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TestCyclicBarrier {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task2 finish...");
});
for (int i = 0; i < 3; i++) {
service.submit(() -> {
log.debug("task1 begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await(); // 2-1=1,阻塞
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
log.debug("task2 begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await(); // 1-1=0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}执行结果:
1
2
3
4
5
6
7
8
9
10
1115:09:03.303 org.example.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
15:09:03.303 org.example.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-2] - task1, task2 finish...
15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-2] - task1 begin...
15:09:05.314 org.example.TestCyclicBarrier [pool-1-thread-1] - task2 begin...
15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
15:09:07.322 org.example.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
15:09:09.323 org.example.TestCyclicBarrier [pool-1-thread-2] - task1, task2 finish...
Process finished with exit code 0