ConcurrentHashMap源码分析 您所在的位置:网站首页 剑心插件聊天锁 ConcurrentHashMap源码分析

ConcurrentHashMap源码分析

2023-03-03 01:42| 来源: 网络整理| 查看: 265

image.png 概述

ConcurrentHashMap(CHM)是日常开发中使用频率非常高的一种数据结构,想对于普通的HashMap,CHM提供了线程安全的读写,CHM里面使用了许多比较精妙的优化&操作。本文主要对CHM的整体结构、初始化,查找,插入等做分析。CHM在1.8之前和之后有比较大的变动,1.8之前主要通过Segment 分段锁 来解决并发问题,1.8及之后就没有这些臃肿的数据结构了,其数据结构与普通的HashMap一样,都是Node数组+链表+红黑树image.png一颗红黑树应满足如下性质:

根节点是黑色的 外部节点均为黑色(图中的 leaf 节点,通常在表述的时候会省略) 红色节点的孩子节点必为黑色(通常插入的节点为红色) 从任一外部节点到根节点的沿途,黑节点的数目相等

image.png除了上面基本的数据结构之外,Node节点也是一个需要关心的数据结构,Node节点本质上是单向链表的节点,其中包含key、value、Hash、next属性

static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; } 复制代码 ForwardingNode节点

ForwardingNode节点(简称fwd节点)继承自Node节点,主要用于扩容,该节点里面固定Hash值为MOVED(值为-1),同时持有新表的引用

static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node find(int h, Object k) { ... } } 复制代码 TreeNode

TreeNode节点也继承自Node节点,用于表示红黑树上的节点,主要属性如下所示

static final class TreeNode extends Node { TreeNode parent; // 父节点 TreeNode left; // 左儿子 TreeNode right; // 右儿子 TreeNode prev; // 记录前驱节点,用于恢复链表 boolean red; } 复制代码 TreeBin

TreeBin节点内部持有TreeNode节点的引用,内部实现了读写锁用于控制多线程并发在红黑树上的操作,主要属性如下所示

static final class TreeBin extends Node { TreeNode root; // 红黑树根节点 volatile TreeNode first; // 链表根节点,读写分离时会用到 volatile Thread waiter; // 当前线程 volatile int lockState; // 当前红黑树的锁状态 // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // 读锁标记 } 复制代码 SizeCtl

除了数据结构需要说明外,SizeCtl也是理解CHM十分重要的一个字段,他是一个整数,不同的值表示不同的状态

当SizeCtl > 0时,表示下次扩展的阈值,其中阈值计算方式:数组长度 * 扩展阈值(注意这里是固定的0.75) 当SizeCtl = 0时,表示还没有开始初始化 当sizeCtl = -1是,表示此时正在进行初始化 当SizeCtl < -1时,表示此时正在进行扩展,其中高16位表示扩容标识戳,低16位表示参与扩容的线程数+1 初始化

CHM的初始化是惰性初始化的,即当我们使用ConCurrentHashMap map = new ConcurrentHashMap(20);创建一个CHM对象时,并不会真正的创建对象,而是只有在put时才会真正开始创建对象。

public ConcurrentHashMap(int initialCapacity) { // 只是检查参数是否合理,并设置好数组容量和扩容阈值 if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } 复制代码 初始化流程 private final Node[] initTable() { Node[] tab; int sc; // 判空,注意这里是while,当线程苏醒后会记性检查直到初始化完毕 while ((tab = table) == null || tab.length == 0) { // 如果其他线程正在初始化,则让出cpu if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 当前线程尝试获取创建数组的重任 try { // 这里需要再进行判断是否为空,防止当前线程创建完毕后又有其他线程进来重复创建 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; table = tab = nt; // 设置阈值为0.75n sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码 查找

get方法进行查找,针对不同情况有不同处理

public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; // 扰动运算 int h = spread(key.hashCode()); // 判断表是否为空,表长度是否为0,以及元素对应下标是否为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判断当前下边下是否是我们要找到值 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) // 判断Node节点的Hash值是否小于0,如果小于0的话,则会在他的子类上进行查找 //这里情况比较复杂,不同的节点有不同的处理,如果当前节点为fwd节点,则去新表上找,如果为红黑树 //节点,则在红黑树上进行查找,后文会展开红黑树上的查找流程 return (p = e.find(h, key)) != null ? p.val : null; // 普通链表查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } 复制代码 插入 final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { // 注意这里是个死循环 Node f; int n, i, fh; // 判断是否初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 判断对应节点是否是空节点,如果是 //直接通过cas创建节点 if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 如果当前节点是fwd节点(正在扩容),则帮助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { // 这里加锁 if (tabAt(tab, i) == f) { // 这里需要继续判断是否当前位置的节点没有变化,因为其他线程可能 // 改变此节点 if (fh >= 0) { // fh >= 0表示当前节点是链表节点,直接next往下找就行 binCount = 1; for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 如果此时节点是TreeBin节点,则需要再红黑树上进行插入,具体 // 插入流程后文展开 Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 判断是否需要树化 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数器加1,这里使用了计数器而不是AtomicLong这种 addCount(1L, binCount); return null; } 复制代码 扩容

CHM的扩容利用了多线程并发的去扩容CHM在两种条件下会发生扩容:

单个链表长度大于8,并且数组长度小于64时,会发生扩容 元素个数超过阈值会发生扩容

扩容流程:

创建新的Node表,长度为当前数组长度的两倍 从后往前分配任务区间,最小长度是16,即每个线程每次扩容最少需要迁移16个桶,具体迁移数量由cpu核数决定 判断当前元素是否为空,为空直接cas操作当前节点为fwd节点,否则判断当前元素是否为fwd节点,如果是,则说明其他线程再次区间扩容,此时需要重新选定区间,否则就对当前桶开始进行迁移 其他元素在put时如果发现当前桶位是fwd节点,会先协助扩容再put 最后一个扩容线程退出扩容时再次检查一遍旧桶,更新sizeCtl的值,同时引用新桶 private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; // 确定任务长度 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // 第一个扩容的线程需要创建新数组 try { @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n = bound || finishing) //当前任务是否完成 advance = false; else if ((nextIndex = transferIndex) stride ? nextIndex - stride : 0))) { //cas更新transferIndex bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { // 扩容完毕 int sc; if (finishing) { // 二次检查后引用新表 nextTable = null; table = nextTab; sizeCtl = (n >> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) = 0) { //链表节点 这里只需要判断对应位置是0还是1就可决定迁移到高桶位还是低桶位 int runBit = fh & n; Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { // 红黑树节点,通过判断对应位是否为0决定放到高 // 桶位还是低桶位 TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有