您的位置:首页 > 科技 > 能源 > 【ConcurrentHashMap】JDK1.8版本源码解读与分析

【ConcurrentHashMap】JDK1.8版本源码解读与分析

2024/10/6 4:09:17 来源:https://blog.csdn.net/wdx7770/article/details/141096449  浏览:    关键词:【ConcurrentHashMap】JDK1.8版本源码解读与分析

ConcurrentHashMap(1.8)

HashTable 是早期的线程安全的哈希表, 但是锁的范围太大了, 其 put, get 方法都有 synchronized 关键字修饰, 锁的范围是 hashtable 对象, 并发度太低;

JDK1.7 的 ConcurrentHashMap ( 以下简称为 CHM ), 锁的范围是一个段, 段的数量可以在构造的时候指定, 又称并发级别;

JDK1.7版本 CHM 的详细实现原理移步【ConcurrentHashMap】JDK1.7版本源码解读与分析

但是1.7版本的 CHM 并发度还是不够高, 能不能进一步减小锁的粒度? 如果能锁 Entry 数组的一个下标, 并发度岂不是又可以提升一大截?

JDK1.8 版本 CHM 的实现方案, 就是通过锁住 Entry 数组的一个下标来实现的;

对 Entry 数组或哈希表原理有疑问请移步HashMap源码解读与分析;

JDK 1.8 版本的 CHM, 整体架构上来说, 和 JDK1.8 版本的 HashMap 是基本一致的, 只不过在一些细节上做了特殊处理, 重要的部分下文都会提到; 下文提到的 HashMap 和 CHM, 均指JDK1.8版本的实现;

关于红黑树, ConcurrentHashMap的数组中放入的不是TreeNode结点,而是将TreeNode包装起来的TreeBin对象

哈希值

CHM 对 key 计算哈希值的方法是 spread 方法, 在 HashMap 的基础上, 多了一个最高位恒置为 0 的操作;

  • 为什么要异或自己右移16位? 哈希表使用哈希值的时候, 是用来对数组长度取模的(虽然最终是通过取与实现的), 这样的话, 一个哈希值, 就只有低若干位有效, 具体取决于数组的长度; 通过异或自己右移 16 位, 可以将高位的信息带入到低位, 增加扰动;
  • 例如 Float 类型, 使用 IEEE754 编码, 底层是一个长 32 位的二进制值, 其 hashCode 方法返回的就是底层编码本身, 没有经过任何处理; 连续的 Float 类型的整数, 他们的二进制编码的低几位很有可能都是一样的(这是由 IEEE754 的特性导致的), 这样的话他们进行散列的时候, 一定会碰撞; 通过移位异或, 可以减少这种类型的碰撞;
  • 为什么最高位无效(恒为0)? 因为 CHM 会将结点的 Hash 值设置为负数来表示一些特殊状态, -1表示ForwardingNode结点,-2表示TreeBin结点
  • ForwardingNode节点是Node节点的子类,hash值固定为-1,只在扩容 transfer的时候出现,当旧数组中某个位置的全部节点都迁移到新数组中时,就在旧数组中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}

PUT

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 计算出 key 的哈希值int hash = spread(key.hashCode());int binCount = 0;// 死循环, 一次循环内代码只会进入四个分支中的一个;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 分支一: 新创建的CHM首次添加元素时, 才初始化底层数组;if (tab == null || (n = tab.length) == 0)tab = initTable();// 分支二: 计算散列到的下标, 使用tabAt通过 Unsafe 实现 volatile 地读取数组对应下标位置的元素// 如果对应下标为空, 通过 CAS 的方式实现线程安全的添加; // CAS 失败则会进入下次循环;else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}// 分支三: 如果对应位置是 ForwardingNode, 其哈希值为-1, 说明CHM正在扩容中, 当前下标已经迁移;// 当前线程去辅助进行扩容;else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);// 分支四: 当前位置已经有键值对了, 可能链表, 也可能是红黑树; else {V oldVal = null;// 这个分支, 使用 sychronized 保证线程安全, 锁的是下标上的第一个元素;// JDK1.6引入锁升级, synchronized 的性能已经比以前高很多了;synchronized (f) {// 防止其它线程改变了链表头结点; 如果改变了, 重新进入循环;if (tabAt(tab, i) == f) {// 当前是链表, 按链表的逻辑插入; 这里已经是尾插法了;if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value, null);break;}}}// 如果是红黑树结点, 走红黑树的插入逻辑;else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}} // sync 代码块结束// 这里还是分支四; 如果插入后链表结点数量 >= 9, 树化;if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 树化也是 synchroniezed, 也是锁第一个元素;treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 修改 baseCount, baseCount 记录的是元素个数// 很多线程同时添加的时候, 并发修改 baseCount, 都去用 CAS 的方式对 baseCount 进行修改// CAS 失败则转而去修改自己对应的 CounterCells;// CounterCells 是一个数组, 不同过的线程生成与当前线程相关的一个随机数(同一个线程多次生成的话, 也不会变), 散列到 CounterCells 的一个下标// 然后 CAS 的方式修改这个下标上的计数值;// 这样, 冲突的概率就大大降低了;// 调用 size 方法获取元素个数时, 会用 baseCount + 遍历CounterCells中的所有元素的值;// 这里面会检查是否需要扩容;// binCount < 0 时不会检查// remove 的时候也会调用 addCount, 调用addCount(-1, binCount);addCount(1L, binCount);return null;
}

初始化

// -1 表示正在初始化或扩容; -N 表示在参与扩容的线程个数为 N - 1 个;
// = 0 表示刚构造出CHM, 还没分配空间给底层数组;
// > 0表示下次进行扩容的阈值; 装填因子0.75
private transient volatile int sizeCtl;private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 如果当前已经在初始化, 让出时间片, 一直等到别人把数组初始化完毕if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// 没人在初始化, CAS 尝试获取初始化权利;else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {// 双重判断;if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 下次扩容的阈值 sc = cap * 3/4, 即装填因子 = 0.75; sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;
}

GET

可以看出, get 的逻辑全程没有涉及到加锁的操作, 在这种情况下, 如何保证其他线程的修改能立即可见? 如何保证扩容时不会因为元素转移而漏掉数据?

一, Node 也就是 Entry, 其 val 成员和 next 成员, 都由 volatile 修饰, 其它线程所做的修改或添加, 能够立即被感知;

需要注意, 底层的 Node 数组虽然加了 volatile, 但只能保证引用的修改具有可见性, 而非其中元素具有可见性

transient volatile Node<K,V>[] table;

Node 数组被 volatile 修饰, 能保证扩容后 table 引用的变化能够立即被感知;

二, 另外, 如果 get 的时候正在扩容, 读到 ForwardingNode 时会被转发到 nextTable 执行读操作;

ForwardingNode节点是Node节点的子类,hash值固定为-1,只在扩容 transfer的时候出现;

当旧数组中全部的节点都迁移到新数组中时,就在旧数组对应位置放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到 nextTable数组上去执行,写操作碰见它时,则尝试帮助转移元素, 转移完了再put。

三, 如果正在扩容, 但是当前结点又没有完成迁移, 没有放置 ForwardingNode, 怎么办? 不用担心, 迁移的过程并不是将Node一个一个从oldTable移除, 添加到 nextTable, 而是先复制到 nextTable 上, 最后再删除 oldTable 中的引用;

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());  // 计算哈希值if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {  // 计算桶的位置并获取对应的节点if ((eh = e.hash) == h) {  // 如果节点的哈希值与计算值匹配// 这里与 HashMap 判断键是否相同的逻辑是一样的;if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val;  // 返回对应的值}// 当前是 ForwardingNode 或 TreeBinelse if (eh < 0)  return (p = e.find(h, key)) != null ? p.val : null;  // 查找链表或树中节点的值while ((e = e.next) != null) {  // 遍历链表中可能存在的其他节点if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;  // 找到匹配的节点,返回对应的值}}return null;  // 没有找到匹配的节点,返回 null
}// TreeBin的find 方法
final Node<K,V> find(int h, Object k) {if (k != null) {for (Node<K,V> e = first; e != null; ) {int s; K ek;if (((s = lockState) & (WAITER|WRITER)) != 0) {if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;e = e.next;}else if (U.compareAndSwapInt(this, LOCKSTATE, s,s + READER)) {TreeNode<K,V> r, p;try {p = ((r = root) == null ? null :r.findTreeNode(h, k, null));} finally {Thread w;if (U.getAndAddInt(this, LOCKSTATE, -READER) ==(READER|WAITER) && (w = waiter) != null)LockSupport.unpark(w);}return p;}}}return null;
}// 构造 ForwardingNode 的时候, 会把当前 CHM 的 nextTable 给它;
// ForwardingNode 的find 方法就不多说了, 就是到 nextTable 上去寻找;
// 找的时候套娃, 还会判断是不是 hash < 0, 是 TreeBin 就到 TreeBin 里去找; 是 ForwardingNode 就到再nextTable中找;

扩容

使用了一个 volatile 的辅助数组进行扩容;

private transient volatile Node<K,V>[] nextTable;

支持多线程扩容; 每个线程负责转移某些下标;

转移的时候从右往左扫描, transferIndex 代表待转移的下标 + 1;

每个线程会负责迁移一定长度的区间内的下标, 转移完了以后尝试去取下一个转移范围;

可以理解为分段转移元素, 每个线程负责若干个段; 段的默认最小长度 (用变量 stride 表示)是 16, 也就是说, 如果当前 CHM 长度为 16, 那么最多只能有一个线程参与扩容;

每个线程开始转移的时候, 都会确认自己负责的边界,同时会更新 transferIndex, transferIndex 的初始值为扩容前数组的长度, 第一个参与扩容的线程, 由 transferIndex 计算出自己负责的范围, 即[tranferIndex - stride, transferIndex - 1], 然后将 transferIndex - stride 赋给 transferIndex;

转移时, 会用 synchronized 锁住当前下标的第一个结点;

一个下标转移完成后, 给扩容前的数组的对应位置, 插入一个 ForwardingNode

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com