༺歲月蹉跎༻

只要路是对的,就不怕路远!

0%

CAS

7、CAS

7.1 java.util.concurrent.atomic

1654768267737

7.2 多线程环境不使用原子类保证线程安全i++(基本数据类型)

1654768282315

7.3 多线程环境使用原子类保证线程安全i++(基本数据类型)

1654768299055

7.4 CAS说明

  • CAS即compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。
  • 它包含三个操作数——内存位置、预期原值及更新值。
  • 执行CAS操作的时候,将内存位置的值与预期原值比较:
    • 如果相匹配,那么处理器会自动将该位置值更新为新值,
    • 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。

7.5 CAS原理

  • CAS(CompareAndSwap),CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来。当它重来重试的这种行为成为—-自旋!!

    1654768389503

7.6 CASDemo代码

1
2
3
4
5
6
7
8
9
10
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);

System.out.println(atomicInteger.compareAndSet(5, 2022) + "\t" + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 2022) + "\t" + atomicInteger.get());

atomicInteger.getAndIncrement();
}
}

1654768412126

7.7 硬件级别保证

  • CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。

  • CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的,比起用synchronized重量级锁,这里的排他时间要短很多,所以在多线程情况下性能会比较好。

7.8 源码分析compareAndSet(int expect,int update)

1654768458127

1654768462026

7.9 Unsafe

1654768479755

  • Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。

    • 注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用用操作系统底层资源热行相应任务。
  • 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。

    1654768536285

  • 变量value用volatile修饰,保证了多线程之间的内存可见性。

7.10 atomiclnteger.getAndlncrement()

  • CAS的全称为Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。 Atomiclnteger类主要利用CAS(compare and swap)+volatile和native方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提升。

    1654768593496

  • CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

7.11 new AtomicInteger(). getAndIncrement();

1654768622390

1654768627511

  • 假设线程A和线程B两个线程同时执行getAndAddlnt操作(分别跑在不同CPU上):
    • Atomiclnteger里面的value原始值为3,即主内存中Atomiclnteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。
    • 线程A通过getlntVolatile(var1,var2)拿到value值3,这时线程A被挂起。
    • 线程B也通过getilntVolatile(var1,var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwaplnt方法比较内存值也为3,成功修改内存值为4,线程B打完收工,一切OK。
    • 这时线程A恢复,执行compareAndSwaplnt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读重新来一遍了。
    • 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwaplnt进行比较替换,直到成功。

7.12 native修饰的方法代表是底层方法

1654768667103

7.13 cmpxchg

1654768679502

7.14 在不同的操作系统下会调用不同的cmpxchg重载函数

1654768707670

7.15 CAS总结

  • 你只需要记住:CAS是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上),使用的是汇编指令cmpxchg指令。
  • 核心思想就是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap)如果不相等自旋再来。

7.16 AtomicReferenceDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}

public class AtomicReferenceDemo {
public static void main(String[] args) {
AtomicReference<User> atomicReference = new AtomicReference<>();

User z3 = new User("z3", 22);
User li4 = new User("li4", 28);

atomicReference.set(z3);

System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
}
}

1654768781487

7.17 自旋锁是什么

  • CAS是实现自旋锁的基础,CAS利用CPU指令保证了操作的原子性,以达到锁的效果,至于自旋呢,看字面意思也很明白,自己旋转。是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。

    1654768818147

    1654768823073

7.18 自己实现一个自旋锁SpinLockDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "----come in");
while (!atomicReference.compareAndSet(null, thread)) {

}
}

public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "----task over,unLock...");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(() -> {
spinLockDemo.lock();
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.unLock();
}, "A").start();

//暂停500毫秒,线程A先于B启动
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
spinLockDemo.lock();

spinLockDemo.unLock();
}, "B").start();
}
}

1654768847382

7.19 循环时间长开销很大

  • 我们可以看到getAndAddlnt方法执行时,有个do while。

    1654768959091

  • 如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

7.20 ABA问题怎么产生的

  • CAS会导致“ABA问题”。

  • CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。

  • 比如说一个线程1从内存位置V中取出A,这时候另一个线程2也从内存中取出A,并且线程2进行了一些操作将值变成了B,然后线程2又将V位置的数据变成A,这时候线程1进行CAS操作发现内存中仍然是A,预期OK,然后线程1操作成功。

  • 尽管线程1的CAS操作成功,但是不代表这个过程就是没有问题的。

7.21 简单case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@NoArgsConstructor
@AllArgsConstructor
@Data
class Book {
private int id;
private String bookName;
}

public class AtomicStampedDemo {
public static void main(String[] args) {
Book javaBook = new Book(1, "javaBook");

AtomicStampedReference<Book> stampedReference = new AtomicStampedReference<>(javaBook, 1);

System.out.println(stampedReference.getReference() + "\t" + stampedReference.getStamp());

Book mysqlBook = new Book(2, "mysqlBook");

boolean b;
b = stampedReference.compareAndSet(javaBook, mysqlBook, stampedReference.getStamp(), stampedReference.getStamp() + 1);

System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());


b = stampedReference.compareAndSet(mysqlBook, javaBook, stampedReference.getStamp(), stampedReference.getStamp() + 1);

System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());
}
}

1654769014875

7.22 ABADemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp);

//暂停500毫秒,保证后面的t4线程初始化拿到的版本号和我一样
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "2次流水号:" + stampedReference.getStamp());

stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t" + "3次流水号:" + stampedReference.getStamp());

}, "t3").start();

new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号:" + stamp);

//暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean b = stampedReference.compareAndSet(100, 2022, stamp, stamp + 1);

System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());

}, "t4").start();

}

private static void abaHappen() {
new Thread(() -> {
atomicInteger.compareAndSet(100, 101);
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.compareAndSet(101, 100);
}, "t1").start();

new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(100, 2022) + "\t" + atomicInteger.get());
}, "t2").start();
}
}

1654769037323