线程安全集合类

1、概述

  • 线程安全集合类可以分为三大类:

    • 遗留的线程安全集合如Hashtable、Vector。
    • 使用Collections装饰的线程安全集合,如:
      • Collections.synchronizedCollection
      • Collections.synchronizedList
      • Collections.synchronizedMap
      • Collections.synchronizedSet
      • Collections.synchronizedNavigableMap
      • Collections.synchronizedNavigableSet
      • Collections.synchronizedSortedMap
      • Collections.synchronizedSortedSet
    • java.util.concurrent.*
  • 重点介绍java.util.concurrent.*下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent。

    • Blocking大部分实现基于锁,并提供用来阻塞的方法。
    • CopyOnWrite之类容器修改开销相对较重。
    • Concurrent类型的容器。
      • 内部很多操作使用cas优化,一般可以提供较高吞吐量。
      • 弱一致性
        • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的。(遍历时如果发生了修改,对于非安全容器来讲,使用fail-fast机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历)
        • 求大小弱一致性,size操作未必是100%准确。
        • 读取弱一致性。

2、ConcurrentHashMap原理

2.1 JDK 7 HashMap并发死链

  • ①在jdk7环境下运行以下代码,测试HashMap扩容前后对于整数范围0~63有哪些数在桶下标为1的位置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class TestDeadLink {
    public static void main(String[] args) {
    System.out.println("长度为16时,桶下标为1的key");
    for (int i = 0; i < 64; i++) {
    if (hash(i) % 16 == 1) {
    System.out.println(i);
    }
    }
    System.out.println("长度为32时,桶下标为1的key");
    for (int i = 0; i < 64; i++) {
    if (hash(i) % 32 == 1) {
    System.out.println(i);
    }
    }
    }

    final static int hash(Object k) {
    int h = 0;
    if (0 != h && k instanceof String) {
    return sun.misc.Hashing.stringHash32((String) k);
    }
    h ^= k.hashCode();
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);
    }
    }

    执行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    长度为16时,桶下标为1的key
    1
    16
    35
    50
    长度为32时,桶下标为1的key
    1
    35
  • ②模拟扩容,即jdk7在向HashMap中添加元素的个数超过数组长度的3/4时会发生扩容(默认的初始容量为16,发生第一次扩容时变为32)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class TestDeadLink {
    public static void main(String[] args) {
    final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
    // 放 12 个元素
    map.put(2, null);
    map.put(3, null);
    map.put(4, null);
    map.put(5, null);
    map.put(6, null);
    map.put(7, null);
    map.put(8, null);
    map.put(9, null);
    map.put(10, null);
    map.put(16, null);
    map.put(35, null);
    map.put(1, null);
    System.out.println("扩容前大小[main]:"+map.size());
    new Thread() {
    @Override
    public void run() {
    // 放第 13 个元素, 发生扩容
    map.put(50, null);
    System.out.println("扩容后大小[Thread-0]:"+map.size());
    }
    }.start();
    new Thread() {
    @Override
    public void run() {
    // 放第 13 个元素, 发生扩容
    map.put(50, null);
    System.out.println("扩容后大小[Thread-1]:"+map.size());
    }
    }.start();
    }
    }
  • ③在HashMap类的transfer()方法处加上断点(扩容时会调用此方法),并为了不想让其它无关线程也在此地方停下,需加上以下条件并将断点暂停类型设置为Thread:

    1
    2
    3
    4
    5
    newTable.length==32 &&
    (
    Thread.currentThread().getName().equals("Thread-0")||
    Thread.currentThread().getName().equals("Thread-1")
    )
  • ④打开debug模式运行程序,会来到线程Thread-0的此方法,且能看到此时HashMap中旧的table中桶下标为1中有1–>35–>16–>null的链表结构。会发现jdk7中采用的是头插法,即后加入链表的元素反而在最前面。

  • ⑤继续运行线程Thread-0到594行,并添加断点和以下条件:

    1
    2
    //此条件的目的是让线程Thread-1能够正常扩容结束,不在此断点处停下
    Thread.currentThread().getName().equals("Thread-0")

    同时会发现此时局部变量e(将要迁移的结点)的指针指向为1–>35–>16–>null,局部变量next(下一个结点)的指针指向为35–>16–>null,如下图。

  • ⑥在Threads面板选中线程Thread-1恢复运行,并让其运行完毕,此时Thread-1已经提前完成扩容机制,同时看到控制台中的第13个元素添加成功,这时又来到了线程Thread-0第594行处。

  • ⑦由于线程Thread-1的扩容完毕,导致此时线程Thread-0中的table已经变成是扩容后的新table了,此时新table桶下表为1的指针指向为35–>1–>null(因为刚才线程Thread-1扩容时是先遍历1再遍历35,从而使用头插法的结果是35反而在链表前面)。

    所以此时局部变量e的指针指向理所因当变成1–>null,而局部变量next指针指向变为35–>1–>null,如下图。(原先e指向的1和next指向的35的地址引用没有改变,但是1和35的next都被改变了)

  • ⑧按F8使线程Thread-0再次运行到594行代码,此时局部变量e的指针指向为35–>1–>null,而局部变量next指针指向为1–>null,且此时1已经被成功添加到newTable桶下标为1位置的链表头。

    再按一次F8使线程Thread-0再次运行到594行代码,此时局部变量e的指针指向为1–>null,而局部变量next指针指向null,且此时35已经被成功添加到newTable桶下标为1位置的链表头并且还链接着上一步加入的1。

    再按一次F8使线程Thread-0再次运行到594行代码,此时局部变量e的指针指向为null,局部变量next指针也指向null,且此时1再一次被添加到newTable桶下标为1位置的链表头并且还链接着上一步加入的35。此时新的newTable桶下标为1的死链已经产生,即不断以1–>35–>1–>35……无限制地循环下去导致程序崩溃。

  • 究其原因,是因为在多线程环境下使用了非线程安全的map集合。

  • JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能 够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)。

2.2 JDK 8 ConcurrentHashMap

  • ConcurrentHashMap中重要的属性和内部类有:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // 默认为 0
    // 当初始化时, 为 -1
    // 当扩容时, 为 -(1 + 扩容线程数)
    // 当初始化或扩容完成后,为 下一次的扩容的阈值大小
    private transient volatile int sizeCtl;

    // 整个 ConcurrentHashMap 就是一个 Node[]
    static class Node<K,V> implements Map.Entry<K,V> {}

    // hash 表
    transient volatile Node<K,V>[] table;

    // 扩容时的 新 hash 表
    private transient volatile Node<K,V>[] nextTable;

    // 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
    static final class ForwardingNode<K,V> extends Node<K,V> {}

    // 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
    static final class ReservationNode<K,V> extends Node<K,V> {}

    // 作为 treebin 的头节点, 存储 root 和 first
    static final class TreeBin<K,V> extends Node<K,V> {}

    // 作为 treebin 的节点, 存储 parent, left, right
    static final class TreeNode<K,V> extends Node<K,V> {}
  • 重要方法有:

    1
    2
    3
    4
    5
    6
    7
    8
    // 获取 Node[] 中第 i 个 Node
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)

    // cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

    // 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
  • 构造器

    • 可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      //initialCapacity –初始容量。给定指定的负载系数,该实现将执行内部大小调整以容纳许多元素。
      //loadFactor –用于建立初始表大小的负载因子(表密度)
      //concurrencyLevel –并发更新线程的估计数量。该实现可以使用该值作为调整提示。
      public ConcurrentHashMap(int initialCapacity,
      float loadFactor, int concurrencyLevel) {
      if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
      throw new IllegalArgumentException();
      if (initialCapacity < concurrencyLevel) // Use at least as many bins
      initialCapacity = concurrencyLevel; // as estimated threads
      long size = (long)(1.0 + (long)initialCapacity / loadFactor);
      //tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ...,所以调用此构造器传入的initialCapacity不是实际的初始大小。
      int cap = (size >= (long)MAXIMUM_CAPACITY) ?
      MAXIMUM_CAPACITY : tableSizeFor((int)size);
      this.sizeCtl = cap;
      }
  • get()流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //spread 方法能确保返回结果是正数,因为负数有特殊用途
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
    (e = tabAt(tab, (n - 1) & h)) != null) {
    //如果头结点已经是要查找的 key
    if ((eh = e.hash) == h) {
    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    return e.val;
    }
    //hash为负数表示该 bin 在扩容中是 ForwardingNode(-1) 或是 treebin(-2) , 这时调用 find 方法来查找
    else if (eh < 0)
    return (p = e.find(h, key)) != null ? p.val : null;
    //正常遍历链表, 用 equals 比较
    while ((e = e.next) != null) {
    if (e.hash == h &&
    ((ek = e.key) == key || (ek != null && key.equals(ek))))
    return e.val;
    }
    }
    return null;
    }
  • put()流程

    1
    2
    3
    public V put(K key, V value) {
    return putVal(key, value, false);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    //onlyIfAbsent为false时表示每次有相同key时用新的value覆盖旧的value
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //其中 spread 方法会综合高位低位, 具有更好的 hash 性
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
    //f 是链表头节点
    //fh 是链表头结点的 hash
    //i 是链表在 table 中的下标
    Node<K,V> f; int n, i, fh;
    //要创建 table
    if (tab == null || (n = tab.length) == 0)
    //初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
    tab = initTable();
    //要创建链表头节点
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    //添加链表头使用了 cas, 无需 synchronized
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
    //判断头结点的hash是否==-1,如果是则说明有其它线程正在扩容,并且可以进入帮忙扩容逻辑
    else if ((fh = f.hash) == MOVED)
    //帮忙之后, 进入下一轮循环
    tab = helpTransfer(tab, f);
    //表明hashtable不是创建table、初始化链表以及扩容的情况,而是发生了桶下标冲突的情况,此时才要加锁
    else {
    V oldVal = null;
    //锁住链表头节点
    synchronized (f) {
    //再次确认链表头节点没有被移动
    if (tabAt(tab, i) == f) {
    //判断头结点的hashcode是否大于0,即如果是普通结点
    if (fh >= 0) {
    binCount = 1;
    //遍历链表
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    //找到相同的 key
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    //更新
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    Node<K,V> pred = e;
    //已经是最后的节点了, 新增 Node, 追加至链表尾
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    //如果结点类型是红黑树
    else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    //putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    if (binCount != 0) {
    //如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
    if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
    }
    //增加 size 计数,设置多个累加单元
    addCount(1L, binCount);
    return null;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
    Thread.yield(); // lost initialization race; just spin
    //尝试将 sizeCtl 设置为 -1(表示初始化 table)
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    //获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建
    try {
    if ((tab = table) == null || tab.length == 0) {
    //DEFAULT_CAPACITY==16
    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    @SuppressWarnings("unchecked")
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    table = tab = nt;
    //此时sc代表下次要扩容的阈值
    sc = n - (n >>> 2);
    }
    } finally {
    sizeCtl = sc;
    }
    break;
    }
    }
    return tab;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    //check 是之前 binCount 的个数,即链表长度
    private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if (
    //已经有了 counterCells, 向 cell 累加
    (as = counterCells) != null ||
    //还没有, 向 baseCount 累加
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (
    //还没有 counterCells
    as == null || (m = as.length - 1) < 0 ||
    //还没有 cell
    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    //cell cas 增加计数失败
    !(uncontended =
    U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    //创建累加单元数组和cell, 累加重试
    fullAddCount(x, uncontended);
    return;
    }
    if (check <= 1)
    return;
    //获取整个哈希表元素个数
    s = sumCount();
    }
    if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    //判断元素个数是否大于等于扩容阈值
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    if (sc < 0) {
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    transferIndex <= 0)
    break;
    //newtable 已经创建了,帮忙扩容
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    transfer(tab, nt);
    }
    //需要扩容,这时 newtable 未创建
    else if (U.compareAndSwapInt(this, SIZECTL, sc,
    (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
    s = sumCount();
    }
    }
    }
  • 哈希表元素个数计算流程

    • size计算实际发生在 put,remove 改变集合元素的操作之中。

      • 没有竞争发生,向baseCount累加计数。
      • 有竞争发生,新建counterCells,向其中的一个cell累加计数。
        • counterCells初始有两个cell。
        • 如果计数竞争比较激烈,会创建新的cell来累加计数。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      public int size() {
      long n = sumCount();
      return ((n < 0L) ? 0 :
      (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
      (int)n);
      }

      final long sumCount() {
      CounterCell[] as = counterCells; CounterCell a;
      //将 baseCount 计数与所有 cell 计数累加
      long sum = baseCount;
      if (as != null) {
      for (int i = 0; i < as.length; ++i) {
      if ((a = as[i]) != null)
      sum += a.value;
      }
      }
      return sum;
      }
  • 总结

    • JDK8的ConcurrentHashMap由数组(Node) +(链表 Node | 红黑树 TreeNode)组成,以下数组简称(table),链表简称(bin):
      • 初始化,使用cas来保证并发安全,懒惰初始化table。
      • 树化,当table.length<64时,先尝试扩容,超过64时,并且bin.length>8时,会将链表树化,树化过程会用synchronized锁住链表头。
      • put,如果该bin尚未创建,只需要使用cas创建bin;如果已经有了,锁住链表头进行后续put操作,元素添加至bin的尾部。
      • get,无锁操作仅需要保证可见性,扩容过程中get操作拿到的是ForwardingNode它会让get操作在新 table进行搜索。
      • 扩容,扩容时以bin为单位进行,需要对bin进行synchronized,但这时妙的是其它竞争线程也不是无事可做,它们会帮助把其它bin进行扩容,扩容时平均只有1/6的节点会把复制到新table中。
      • size,元素个数保存在baseCount中,并发时的个数变动保存在CounterCell[]当中。最后统计数量时累加即可。

2.3 JDK 7 ConcurrentHashMap

  • 它维护了一个segment数组,每个segment对应一把锁。

    • 优点:如果多个线程访问不同的segment,实际是没有冲突的,这与jdk8中是类似的。
    • 缺点:Segments数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化。
  • 构造器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public ConcurrentHashMap(int initialCapacity,
    float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    //ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
    }
    //移位属性,默认是 32 - 4 = 28
    this.segmentShift = 32 - sshift;
    //掩码属性,默认是 15 即 0000 0000 0000 1111
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
    ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
    cap <<= 1;
    // create segments and segments[0]
    //创建 segments and segments[0]
    Segment<K,V> s0 =
    new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
    }
    • 构造完成,如下图所示:

    • 可以看到ConcurrentHashMap没有实现懒惰初始化,空间占用不友好。

    • 其中this.segmentShift和this.segmentMask的作用是决定将key的hash结果匹配到哪个segment。

      • 例如,根据某一hash值求segment位置,先将高位向低位移动this.segmentShift位。

        结果再与this.segmentMask做位与运算,最终得到1010即下标为10的segment。

  • put()流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
    throw new NullPointerException();
    int hash = hash(key);
    //计算出 segment 下标
    int j = (hash >>> segmentShift) & segmentMask;
    //获得 segment 对象, 判断是否为 null, 是则创建该 segment
    if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
    (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
    //这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null
    //因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
    s = ensureSegment(j);
    //进入 segment 的put 流程
    return s.put(key, hash, value, false);
    }
    • segment继承了可重入锁(ReentrantLock),它的put方法为:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      final V put(K key, int hash, V value, boolean onlyIfAbsent) {
      //尝试加锁
      ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
      //如果不成功, 进入 scanAndLockForPut 流程
      //如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
      //在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
      scanAndLockForPut(key, hash, value);
      //执行到这里 segment 已经被成功加锁, 可以安全执行
      V oldValue;
      try {
      ConcurrentHashMap.HashEntry<K,V>[] tab = table;
      //获得桶下标
      int index = (tab.length - 1) & hash;
      //根据桶下标找到链表的头结点
      ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
      for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
      if (e != null) {
      //更新
      K k;
      if ((k = e.key) == key ||
      (e.hash == hash && key.equals(k))) {
      oldValue = e.value;
      if (!onlyIfAbsent) {
      e.value = value;
      ++modCount;
      }
      break;
      }
      e = e.next;
      }
      else {
      //新增
      //之前等待锁时, node 已经被创建, next 指向链表头
      if (node != null)
      node.setNext(first);
      else
      //创建新 node
      node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
      int c = count + 1;
      //扩容
      if (c > threshold && tab.length < MAXIMUM_CAPACITY)
      rehash(node);
      else
      //将 node 作为链表头
      setEntryAt(tab, index, node);
      ++modCount;
      count = c;
      oldValue = null;
      break;
      }
      }
      } finally {
      unlock();
      }
      return oldValue;
      }
      • 其中扩容逻辑是rehash()方法。因为此时已经获得了锁,因此rehash时不需要考虑线程安全。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        private void rehash(ConcurrentHashMap.HashEntry<K,V> node) {
        ConcurrentHashMap.HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1;
        threshold = (int)(newCapacity * loadFactor);
        ConcurrentHashMap.HashEntry<K,V>[] newTable =
        (ConcurrentHashMap.HashEntry<K,V>[]) new ConcurrentHashMap.HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {
        ConcurrentHashMap.HashEntry<K,V> e = oldTable[i];
        if (e != null) {
        ConcurrentHashMap.HashEntry<K,V> next = e.next;
        int idx = e.hash & sizeMask;
        //如果该链表只有一个结点,则直接搬到相同的桶下标
        if (next == null) // Single node on list
        newTable[idx] = e;
        else { // Reuse consecutive sequence at same slot
        ConcurrentHashMap.HashEntry<K,V> lastRun = e;
        int lastIdx = idx;
        //遍历链表,将next的桶下标与当前结点的桶下标进行比较,如果不相同则记录,即尽可能把 rehash 后 idx 不变的节点重用
        for (ConcurrentHashMap.HashEntry<K,V> last = next;
        last != null;
        last = last.next) {
        int k = last.hash & sizeMask;
        if (k != lastIdx) {
        lastIdx = k;
        lastRun = last;
        }
        }
        newTable[lastIdx] = lastRun;
        // Clone remaining nodes
        //剩余节点需要新建
        for (ConcurrentHashMap.HashEntry<K,V> p = e; p != lastRun; p = p.next) {
        V v = p.value;
        int h = p.hash;
        int k = h & sizeMask;
        ConcurrentHashMap.HashEntry<K,V> n = newTable[k];
        newTable[k] = new ConcurrentHashMap.HashEntry<K,V>(h, p.key, v, n);
        }
        }
        }
        }
        //扩容完成, 才加入新的节点
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        //替换为新的 HashEntry table
        table = newTable;
        }
  • get()流程

    • get时并未加锁,用了UNSAFE方法保证了可见性,扩容过程中,get先发生就从旧表取内容,get后发生就从新 表取内容:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      public V get(Object key) {
      Segment<K,V> s; // manually integrate access methods to reduce overhead
      HashEntry<K,V>[] tab;
      int h = hash(key);
      //u 为 segment 对象在数组中的偏移量
      long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
      //s 即为 segment
      if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
      (tab = s.table) != null) {
      for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
      (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
      e != null; e = e.next) {
      K k;
      //如果找到就返回value
      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
      return e.value;
      }
      }
      return null;
      }
  • size()流程

    • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回。
    • 如果不一样,进行重试,重试次数超过3,将所有segment锁住,重新计算个数返回。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    for (;;) {
    //如果一直不一样则会从-1增长到2,即重试3次
    if (retries++ == RETRIES_BEFORE_LOCK) {
    //超过重试次数, 需要创建所有 segment 并加锁
    for (int j = 0; j < segments.length; ++j)
    ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    //modCount表示最近修改的次数
    sum += seg.modCount;
    //c代表每个seg内元素的个数
    int c = seg.count;
    //如果溢出,统计失败
    if (c < 0 || (size += c) < 0)
    overflow = true;
    }
    }
    //如果一致则说明这期间没有多线程干扰
    if (sum == last)
    break;
    last = sum;
    }
    } finally {
    //解锁
    if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
    }
    return overflow ? Integer.MAX_VALUE : size;
    }

2.4 总结

  • 整体结构:
    • jdk7:以数组+分段锁(segement)+链表的方式实现,Segment继承自ReentrantLock。
    • jdk8:放弃了Segment 臃肿的设计,取而代之的是采用Node+CAS+Synchronized来保证并发安全进行实现,synchronized只锁定当前链表或红黑二叉树的首节点。
  • put()方法。
    • jdk7:先定位到相应的Segment下标,判断是否为null,是则创建该segment然后再进行put操作。put操作中首先会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁(最多自旋64次,还获取不到锁则进入lock流程)。紧接着获取HashEntry的桶下标,遍历该桶下标下的链表结构,存在相同key则覆盖原来的value,否则进入判断是否扩容逻辑,如果不需要扩容则直接头插法插入,如果需要扩容则扩容后再用头插法进行插入。
    • jdk8:如果table为空使用CAS进行初始化table。再定位到桶下标,拿到头节点后进行判断,如果为空则CAS插入;如果不为空,则判断头结点的hash是否==-1,如果是则说明有其它线程正在扩容,并且可以进入帮忙扩容逻辑。否则说明发生了桶下标冲突,使用synchronized锁住头节点后进入添加结点流程,即如果是链表则进行尾插法插入后判断如果达到临界值则转换为红黑树;如果是树则进行红黑树方式插入。之后调用addCount()对元素个数进行累加并进入判断是否扩容逻辑。
  • get()方法。
    • jdk7:根据key计算出hash值定位到具体的Segment,再根据hash值获取定位HashEntry对象,并对HashEntry对象进行链表遍历,找到对应元素。由于HashEntry涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以每次获取时都是最新值(全程并未加锁)。
    • jdk8:根据key计算出hash值,判断数组和对应桶下标是否为空,如果不为空则判断如果是首节点,就直接返回;如果是红黑树结构,就从红黑树里面查询;如果是链表结构,循环遍历判断。
  • 扩容方法。
    • jdk7:调用的是rehash(node)方法,因为此时已经获得了锁,因此rehash时不需要考虑线程安全。首先创建一个2倍原数组空间的HashEntry,遍历原HashEntry,如果该链表只有一个结点,则直接搬到相同的桶下标;否则从头结点开始遍历,找到第一个后续所有节点在新table中index保持不变的节点,直接将这个节点以及它后续的链表中内容全部直接复用copy到newTable中,之后将剩余结点重新按照新容量计算相应的桶下标后创建新对象插入到newTable中。
    • jdk8:调用的是transfer(tab, null)方法,首先new 出新Node<K,V>[]节点数组,容量为原来的2倍,接着进入核心扩容逻辑,扩容逻辑和HashMap的resize()方法对链表或者红黑树的迁移逻辑差不多,不同的地方是会设置一个步长,每个线程根据步长大小拿到结点进行扩容,当某个线程对节点进行扩容后,会将节点转换为ForwardingNode节点类型(表示正在被迁移的Node,它的key,value,next都为null,hash为-1,其中有个nextTable属性指向新tab[]),别的线程拿到了ForwardingNode类型节点,则不再对此节点扩容。
  • size()方法。
    • jdk7:计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回。如果不一样,进行重试,重试次数超过3,将所有segment锁住,重新计算个数返回。
    • jdk8:使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount。发生竞争时部分元素的变化个数保存在CounterCell数组中,通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数。

3、LinkedBlockingQueue原理

  • 内部基本结构:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static class Node<E> {E item;
    /**
    * 下列三种情况之一
    * - 真正的后继节点
    * - 自己, 发生在出队时
    * - null, 表示是没有后继节点, 是最后了
    */
    Node<E> next;
    Node(E x) { item = x; }
    }
    }
  • 初始化链表, Dummy节点用来占位,item为null。

    1
    2
    3
    4
    5
    public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
    }
  • 当一个节点入队last = last.next = node:

    • ①put(E e):

      • 队列已满,阻塞等待。
      • 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      public void put(E e) throws InterruptedException {
      if (e == null) throw new NullPointerException();
      int c = -1;
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      // count 用来维护元素计数
      final AtomicInteger count = this.count;
      // 获取锁中断
      putLock.lockInterruptibly();
      try {
      //判断队列是否已满,如果已满阻塞等待
      while (count.get() == capacity) {
      //等待notFull
      notFull.await();
      }
      // 把node放入队列中
      enqueue(node);
      c = count.getAndIncrement();
      // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
      if (c + 1 < capacity)
      notFull.signal();
      } finally {
      putLock.unlock();
      }
      // 如果队列中有一条数据,唤醒消费线程进行消费
      if (c == 0)
      // 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争
      signalNotEmpty();
      }
      1
      2
      3
      private void enqueue(Node<E> node) {
      last = last.next = node;
      }
    • ②offer(E e):

      • offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      public boolean offer(E e) {
      if (e == null) throw new NullPointerException();
      final AtomicInteger count = this.count;
      if (count.get() == capacity)
      return false;
      int c = -1;
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
      // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
      // 如果有,唤醒下一个添加线程进行添加操作。
      if (count.get() < capacity) {
      enqueue(node);
      c = count.getAndIncrement();
      if (c + 1 < capacity)
      notFull.signal();
      }
      } finally {
      putLock.unlock();
      }
      if (c == 0)
      signalNotEmpty();
      return c >= 0;
      }
  • 再来一个节点入队 last = last.next = node:

  • 出队

    • ①E take():

      • 队列为空,阻塞等待。
      • 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果出之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      public E take() throws InterruptedException {
      E x;
      int c = -1;
      final AtomicInteger count = this.count;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lockInterruptibly();
      try {
      // 队列为空,阻塞等待
      while (count.get() == 0) {
      // 等待notEmpty
      notEmpty.await();
      }
      // 有空位, 入队且计数加一
      x = dequeue();
      c = count.getAndDecrement();
      // 队列中还有元素,唤醒下一个消费线程进行消费
      if (c > 1)
      notEmpty.signal();
      } finally {
      takeLock.unlock();
      }
      // 移除元素之前队列是满的,唤醒生产线程进行添加元素
      if (c == capacity)
      // 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
      signalNotFull();
      return x;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      private E dequeue() {
      // 获取到head节点
      Node<E> h = head;
      // 获取到head节点指向的下一个节点
      Node<E> first = h.next;
      // head节点原来指向的节点的next指向自己,等待下次gc回收
      h.next = h; // help GC
      // head节点指向新的节点
      head = first;
      // 获取到新的head节点的item值
      E x = first.item;
      // 新head节点的item值设置为null
      first.item = null;
      return x;
      }
      • ①Node h = head:

      • ②Node first = h.next:

      • ③h.next = h:

      • ④head = first:

      • ⑤E x = first.item–>first.item = null–>return x:

    • ②E poll():

      • poll方法去除了take方法中元素为空后阻塞等待这一步骤。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
        return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
        if (count.get() > 0) {
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
        notEmpty.signal();
        }
        } finally {
        takeLock.unlock();
        }
        if (c == capacity)
        signalNotFull();
        return x;
        }
  • 总结入队出队:

    • 用了两把锁和dummy节点。

      • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行。
      • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行。
      • 消费者与消费者线程仍然串行。
      • 生产者与生产者线程仍然串行。
    • 线程安全分析:

      1
      2
      3
      4
      5
      // 用于 put(阻塞) offer(非阻塞)
      private final ReentrantLock putLock = new ReentrantLock();

      // 用户 take(阻塞) poll(非阻塞)
      private final ReentrantLock takeLock = new ReentrantLock();
      • 当节点总数大于2时(包括 dummy 节点),putLock保证的是last节点的线程安全,takeLock保证的是head节点的线程安全。两把锁保证了入队和出队没有竞争。
      • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争。
      • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞。
    • LinkedBlockingQueue与ArrayBlockingQueue的性能比较。

      • Linked支持有界,Array强制有界。
      • Linked实现是链表,Array实现是数组。
      • Linked是懒惰的,而Array需要提前初始化Node数组。
      • Linked每次入队会生成新Node,而Array的Node是提前创建好的。
      • Linked两把锁,Array一把锁。

4、ConcurrentLinkedQueue原理

  • ConcurrentLinkedQueue的设计与LinkedBlockingQueue非常像,也是:

    • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行。
    • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争。
    • 只是这【锁】使用了cas来实现。
  • 事实上,ConcurrentLinkedQueue应用还是非常广泛的。

    • 例如之前讲的Tomcat的Connector结构时,Acceptor作为生产者向Poller消费者传递事件信息时,正是采用了ConcurrentLinkedQueue将SocketChannel给Poller使用。