༺歲月蹉跎༻

只要路是对的,就不怕路远!

0%

原子操作类之18罗汉增强

8、原子操作类之18罗汉增强

8.1 是什么

1654778836896

1654778841414

8.2 基本类型原子类Case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();

public void addPlusPlus() {
atomicInteger.getAndIncrement();
}
}


/**
* @auther zzyy
* @create 2022-02-25 21:59
*/
public class AtomicIntegerDemo {
public static final int SIZE = 50;

public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);

for (int i = 1; i <= SIZE; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 1000; j++) {
myNumber.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
//等待上面50个线程全部计算完成后,再去获得最终值

//暂停几秒钟线程
//try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

countDownLatch.await();

System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get());
}
}

1654778876059

8.3 数组类型原子类Case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AtomicIntegerArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}

System.out.println();

int tmpInt = 0;

tmpInt = atomicIntegerArray.getAndSet(0, 1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));

tmpInt = atomicIntegerArray.getAndIncrement(0);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
}
}

8.4 自旋锁SpinLockDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 题目:实现一个自旋锁,复习CAS思想
* 自旋锁好处:循环比较获取没有类似wait的阻塞。
* <p>
* 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
* 当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。
*/
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "----come in");
while (!atomicReference.compareAndSet(null, thread)) {

}
}

public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "----task over,unLock...");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(() -> {
spinLockDemo.lock();
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.unLock();
}, "A").start();

//暂停500毫秒,线程A先于B启动
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
spinLockDemo.lock();

spinLockDemo.unLock();
}, "B").start();
}
}

1654778969180

8.5 ABADemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp);

//暂停500毫秒,保证后面的t4线程初始化拿到的版本号和我一样
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "2次流水号:" + stampedReference.getStamp());

stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "3次流水号:" + stampedReference.getStamp());

}, "t3").start();

new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp);

//暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean b = stampedReference.compareAndSet(100, 2022, stamp, stamp + 1);

System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());

}, "t4").start();

}

private static void abaHappen() {
new Thread(() -> {
atomicInteger.compareAndSet(100, 101);
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.compareAndSet(101, 100);
}, "t1").start();

new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(100, 2022) + "\t" + atomicInteger.get());
}, "t2").start();
}
}

1654778998479

8.6 状态戳(true/false)原子引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* CAS----Unsafe----do while+ABA---AtomicStampedReference,AtomicMarkableReference
* <p>
* AtomicStampedReference,version号,+1;
* <p>
* AtomicMarkableReference,一次,false,true
*/
public class AtomicMarkableReferenceDemo {
static AtomicMarkableReference markableReference = new AtomicMarkableReference(100, false);

public static void main(String[] args) {
new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "默认标识:" + marked);
//暂停1秒钟线程,等待后面的T2线程和我拿到一样的模式flag标识,都是false
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
markableReference.compareAndSet(100, 1000, marked, !marked);
}, "t1").start();

new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "默认标识:" + marked);

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean b = markableReference.compareAndSet(100, 2000, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + "t2线程CASresult: " + b);
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked());
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference());
}, "t2").start();
}
}

1654779023392

8.7 AtomicIntegerFieldUpdaterDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class BankAccount//资源类
{
String bankName = "CCB";

//更新的对象属性必须使用 public volatile 修饰符。
public volatile int money = 0;//钱数

public void add() {
money++;
}

//因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须
// 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

//不加synchronized,保证高性能原子性,局部微创小手术
public void transMoney(BankAccount bankAccount) {
fieldUpdater.getAndIncrement(bankAccount);
}
}

/**
* 以一种线程安全的方式操作非线程安全对象的某些字段。
* <p>
* 需求:
* 10个线程,
* 每个线程转账1000,
* 不使用synchronized,尝试使用AtomicIntegerFieldUpdater来实现。
*/
public class AtomicIntegerFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(10);

for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 1000; j++) {
//bankAccount.add();
bankAccount.transMoney(bankAccount);
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}

countDownLatch.await();

System.out.println(Thread.currentThread().getName() + "\t" + "result: " + bankAccount.money);
}
}

1654779078640

8.8 AtomicReferenceFieldUpdaterDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MyVar //资源类
{
public volatile Boolean isInit = Boolean.FALSE;

AtomicReferenceFieldUpdater<MyVar, Boolean> referenceFieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");

public void init(MyVar myVar) {
if (referenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\t" + "----- start init,need 2 seconds");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "----- over init");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "----- 已经有线程在进行初始化工作。。。。。");
}
}
}


/**
* 需求:
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,
* 要求只能被初始化一次,只有一个线程操作成功
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) {
MyVar myVar = new MyVar();

for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myVar.init(myVar);
}, String.valueOf(i)).start();
}
}
}

1654779100514

8.9 阿里要命题目

1654779123218

8.10 原子操作增强类常用API

1654779151521

8.11 LongAdderAPIDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class LongAdderAPIDemo {
public static void main(String[] args) {
LongAdder longAdder = new LongAdder();

longAdder.increment();
longAdder.increment();
longAdder.increment();

System.out.println(longAdder.sum());

LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
@Override
public long applyAsLong(long left, long right) {
return left + right;
}
}, 0);

longAccumulator.accumulate(1);//1
longAccumulator.accumulate(3);//4

System.out.println(longAccumulator.get());
}
}

1654779223457

8.12 LongAdder高性能对比Code演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ClickNumber //资源类
{
int number = 0;

public synchronized void clickBySynchronized() {
number++;
}

AtomicLong atomicLong = new AtomicLong(0);

public void clickByAtomicLong() {
atomicLong.getAndIncrement();
}

LongAdder longAdder = new LongAdder();

public void clickByLongAdder() {
longAdder.increment();
}

LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}

}

/**
* @auther zzyy
* 需求: 50个线程,每个线程100W次,总点赞数出来
*/
public class AccumulatorCompareDemo {
public static final int _1W = 10000;
public static final int threadNumber = 50;

public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;

CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);

startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.clickBySynchronized();
}
} finally {
countDownLatch1.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickBySynchronized: " + clickNumber.number);

startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.clickByAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByAtomicLong: " + clickNumber.atomicLong.get());


startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.clickByLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAdder: " + clickNumber.longAdder.sum());

startTime = System.currentTimeMillis();
for (int i = 1; i <= threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * _1W; j++) {
clickNumber.clickByLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAccumulator: " + clickNumber.longAccumulator.get());

}
}

1654779254703

8.13 LongAdder架构

1654779277147

1654779284880

8.14 官网说明和阿里要求

1654779315216

1654779324379

8.15 Striped64有几个比较重要的成员函数

1654779343634

8.15.1 最重要2个

1654779367328

8.16 Striped64中一些变量或者方法的定义

1654779384794

8.17 Cell是java.util.concurrent.atomic下Striped64的一个内部类

1654779414327

8.18 LongAdder为什么这么快

  • LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

  • sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

    1654779447604

    1654779458872

8.19 longAdder.increment()源码解读深度分析

8.19.1 小总结

  • LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果。

    1654779504321

    1654779508489

8.19.2 add(1L)

1654779537429

8.19.3 上述小总结

1654779657019

1654779661821

1654779667136

1654779671141

8.19.4 longAccumulate入参说明

1654779691515

8.19.5 线程hash值:probe

1654779724742

1654779730196

8.19.6 总纲

1654779748446

1654779752912

8.19.7 未初始化过Cell数组,尝试占有锁并首次初始化cells数组

1654779782461

1654779797621

8.19.8 多个线程尝试CAS修改失败的线程会走到这个分支

1654779814826

  • 该分支实现直接操作base基数,将值加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

1654779832348

8.19.9 Cell数组不再为空且可能存在Cell数组扩容

8.19.9.1 总体代码

1654779867917

8.19.9.2 ①

1654779883627

1654779931496

8.19.9.3 ②

1654779950659

8.19.9.4 ③

1654779968034

8.19.9.5 ④

1654779985642

8.19.9.6 ⑤

1654780001335

8.19.9.7 ⑥

1654780024498

8.19.9.8 上6步骤总结

1654780040440

1654780045751

1654780049624

8.19.10 sum

  • sum()会将所有Cell数组中的value和base累加作为返回值。
  • 核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

8.19.10.1 为啥在并发情况下sum的值不精确

  • sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。

  • 首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

    1654780129930

8.20 AtomicLong的自旋会成为瓶颈

  • N个线程CAS操作修改线程的值,每次只有一个成功过,其它N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。