- 作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
- 系列专栏:Java规划模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列
- 假如感觉博主的文章还不错的话,请三连支持一下博主哦
- 博主正在尽力完成2023计划中:以梦为马,扬帆起航,2023追梦人
- 联系方式:hls1793929520,加我进群,大家一同学习,一同前进,一同对立互联网隆冬
一、导言
ConcurrentHashMap
技能在互联网技能运用如此广泛,几乎所有的后端技能面试官都要在 ConcurrentHashMap
技能的运用和原理方面临小伙伴们进行 360 的刁难。
作为一个在互联网公司面一次拿一次 Offer
的面霸,打败了许多竞争对手,每次都只能看到许多落寞的身影失望的脱离,略感愧疚(请允许我运用一下夸大的修辞手法)。
所以在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开端写 《吊打面试官》 系列,希望能帮忙各位读者今后边试势如破竹,对面试官进行 360
的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer
!
尽管现在是互联网隆冬,但天地未定,你我皆是黑马!
本篇文章大约读取时刻十分钟,带你彻底把握 ConcurrentHashMap
的 魔鬼式的多线程扩容 逻辑,目录如下:
二、扩容流程
咱们上一篇文章讲了关于 ConcurrentHashMap
的存储流程:美团二面:聊聊ConcurrentHashMap的存储流程,了解了其数据是如何的放到数组、链表、红黑树中
今日这篇文章,咱们要点讲一下 ConcurrentHashMap
魔鬼式的多线程扩容逻辑
系好安全带,咱们发车了!
1、treeifyBin办法触发扩容
// 在链表长度大于等于8时,测验将链表转为红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
// 数组不能为空
if (tab != null) {
// 数组的长度n,是否小于64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 假如数组长度小于64,不能将链表转为红黑树,先测验扩容操作
// 这儿的扩容传入的数据是当时数据的2倍
tryPresize(n << 1);
// 省掉部分代码……
}
}
2、tryPreSize办法-针对putAll的初始化操作
private final void tryPresize(int size) {
// 假如当时传过来的size大于最大值MAXIMUM_CAPACITY >>> 1,则取 MAXIMUM_CAPACITY
// 反之取 tableSizeFor(size + (size >>> 1) + 1)= size的1.5倍 + 1
// tableSizeFor:这个办法咱们之前也讲过,找size的最近二次幂
// 这个地方的c
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 当时的sizeCtl大于等于0,证明不属于异常状态
// static final int MOVED = -1; // 代表当时hash方位的数据正在扩容!
// static final int TREEBIN = -2; // 代表当时hash方位下挂载的是一个红黑树
// static final int RESERVED = -3; // 预留当时索引方位……
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 假如当时的数组为空,走初始化的逻辑
// 有人可能会好奇,这儿怎样会为空呢?
// 大家能够去看下putAll的源码(放在下面),源码里边榜首句调用的便是tryPresize的办法
if (tab == null || (n = tab.length) == 0) {
// 进来履行初始化
// sc是初始化长度,初始化长度假如比核算出来的c要大的话,直接运用sc,假如没有sc大,
// 阐明sc无法容纳下putAll中传入的map,运用更大的数组长度
n = (sc > c) ? sc : c;
// 仍是老配方,CAS将SIZECTL测验从sc变成-1
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// Step1:判别引证
// Step2:创建数组
// Step3:修正sizeCtl数值
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 假如核算出来的长度c假如小于等于sc,直接退出循环完毕办法
// 数组长度大于等于最大长度了,直接退出循环完毕办法
// 不需求扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 省掉部分代码
}
}
// putAll办法
public void putAll(Map<? extends K, ? extends V> m) {
// 直接走扩容的逻辑
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
3、tryPreSize办法-核算扩容戳
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table;
// 当时数组的长度
int n;
// 判别引证是否共同
if (tab == table) {
// 核算扩容标识戳,根据当时数组的长度核算一个16位的扩容戳
// 1、确保后边的SIZECTL赋值是负数
// 2、记录当时从什么长度开端扩容
int rs = resizeStamp(n);
// 这儿存在bug
// 因为咱们上面(sc = sizeCtl) >= 0的判别,sc只能大于0
// 这儿永远大于等于0
if (sc < 0) {
// 帮忙扩容的代码
}
// 代表当时没有线程进行扩容,我是榜首个扩容的
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
// 核算扩容标识戳
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
4、tryPreSize办法-对sizeCtl的修正
private final void tryPresize(int size) {
// sc默认为sizeCtl
while ((sc = sizeCtl) >= 0) {
else if (tab == table) {
// rs:扩容戳 00000000 00000000 10000000 00011010
int rs = resizeStamp(n);
if (sc < 0) {
// 阐明有线程正在扩容,过来帮忙扩容
// 咱们之前榜首个线程扩容的时候,将sizeCtl设置成:10000000 00011010 00000000 00000010
// 所以:sc = 10000000 00011010 00000000 00000010
Node<K,V>[] nt;
// 仍然有BUG
// 当时线程扩容时,老数组长度是否和我当时线程扩容时的老数组长度共同
// 00000000 00000000 10000000 00011010
if ((sc >>> RESIZE_STAMP_SHIFT) != rs
// 10000000 00011010 00000000 00000010
// 00000000 00000000 10000000 00011010
// 这两个判别都是有问题的,核心问题就应该先将rs左移16位,再追加当时值。
// 这两个判别是BUG
// 判别当时扩容是否现已行将完毕
|| sc == rs + 1 // sc == rs << 16 + 1 BUG
// 判别当时扩容的线程是否达到了最大限度
|| sc == rs + MAX_RESIZERS // sc == rs << 16 + MAX_RESIZERS BUG
// 扩容现已完毕了。
|| (nt = nextTable) == null
// 记录搬迁的索引方位,从高位往低位搬迁,也代表扩容行将完毕。
|| transferIndex <= 0)
break;
// 假如线程需求帮忙扩容,首要便是对sizeCtl进行+1操作,代表当时要进来一个线程帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 上面的判别没进去的话,nt就代表新数组
transfer(tab, nt);
}
// 是榜首个来扩容的线程
// 根据CAS将sizeCtl修正为:10000000 00011010 00000000 00000010
// 将扩容戳左移16位之后,符号位是1,就代码这个值为负数
// 低16位在表示当时正在扩容的线程有多少个:低16位为2,代表当时有一个线程正在扩容
// 为什么低16位值为2时,代表有一个线程正在扩容
// 每一个线程扩容完毕后,会对低16位进行-1操作,当最终一个线程扩容完毕后,减1的成果仍是-1,
// 当值为-1时,要对老数组进行一波扫描,检查是否有遗失的数据没有搬迁到新数组
else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
// 调用transfer办法,而且将第二个参数设置为null,就代表是榜首次来扩容!
transfer(tab, null);
}
}
}
5、transfer办法-核算每个线程搬迁的长度
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 数组长度
int n = tab.length;
// 每个线程一次性搬迁多少数据到新数组
int stride;
// 根据CPU的线程数来核算:NCPU = Runtime.getRuntime().availableProcessors()
// MIN_TRANSFER_STRIDE = 16
// NCPU = 4
// 举个例子:N = 1024 - 512 - 256 - 128 / 4 = 32
// 假如算出来每个线程的长度小于16的话,直接运用16
// 假如大于16的话,则运用N
// 假如线程数只有1的话,直接便是原数组长度
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE){
stride = MIN_TRANSFER_STRIDE;
}
}
6、 transfer办法-构建新数组并检查标识特点
// 以32位长度数组扩容到64位为例子
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 数组长度
int n = tab.length;
// 每个线程一次性搬迁多少数据到新数组
int stride;
// 榜首个进来扩容的线程需求把新数组构建出来
if (nextTab == null) {
try {
// 将原数组长度左移一位创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 赋值操作
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
// 将成员变量的新数组赋值volatile润饰
nextTable = nextTab;
// 搬迁数据时用到的标识volatile润饰
transferIndex = n;
}
// 新数组长度
int nextn = nextTab.length;
// 老数组搬迁完数据后,做的标识
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 搬迁数据时,需求用到的标识
boolean advance = true;
boolean finishing = false;
}
7、transfer办法-线程收取搬迁使命
// 以32位长度数组扩容到64位为例子
// 数组长度
int n = tab.length;
// 每个线程一次性搬迁多少数据到新数组
int stride;
// 搬迁数据时用到的标识volatile润饰
transferIndex = n;
// 新数组长度
int nextn = nextTab.length;
// 老数组搬迁完数据后,做的标识
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance:true,代表当时线程需求接纳使命,然后再履行搬迁
// 假如为false,代表现已接纳完使命
boolean advance = true;
// 标识搬迁是否完毕
boolean finishing = false;
// 开端循环
for (int i = 0, bound = 0;;) {
Node<K,V> f;
int fh;
while (advance) {
int nextIndex;
int nextBound;
// 榜首次:这儿必定进不去
// 首要判别当时使命是否履行完毕
if (--i >= bound || finishing)
advance = false;
// 榜首次:这儿必定也进不去
// 判别transferIndex是否小于等于0,代表没有使命可收取,完毕了。
// 在线程收取使命会,会对transferIndex进行修正,修正为transferIndex - stride
// 在使命都收取完之后,transferIndex必定是小于等于0的,代表没有搬迁数据的使命能够收取
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// nextIndex=32
// stride=16
// nextIndex > stride ? nextIndex - stride : 0:当时的nextIndex是否大于每个线程切开的
// 是:nextIndex - stride/否:0
// 将TRANSFERINDEX从nextIndex变成16
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {
// 对bound赋值(16)
bound = nextBound;
// 对i赋值(31)
i = nextIndex - 1;
// 赋值,代表当时收取使命完毕
// 该线程当时收取的使命是(16~31)
advance = false;
}
}
}
线程收取扩容使命的流程:
8、transfer办法-扩容是否现已完毕
// 判别扩容是否现已完毕!
// i < 0:当时线程没有接纳到使命!
// i >= n: 搬迁的索引方位,不可能大于数组的长度,不会建立
// i + n >= nextn:因为i最大值便是数组索引的最大值,不会建立
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 榜首次到这儿:必定是false
// 假如再次到这儿的时候,最终一个扩容的线程也扫描完了
if (finishing) {
nextTable = null;
table = nextTab;
// 重新更改当时的sizeCtl的数值:0.75N
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 到这儿代表当时线程现已不需求去扩容了
// 那么它要把当时并发扩容的线程数减一
// SIZECTL ----> sc - 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判别当时这个线程是不是最终一个扩容线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 假如不是的话,直接回来就好了
return;
// 假如是的话,当时线程还需求将旧数据悉数扫描一遍,校验一下是不是都搬迁完了
finishing = advance = true;
// 将当时的i重新变成原数组的长度
// 重新进行一轮校验
i = n; // recheck before commit
}
}
9、 transfer办法-搬迁数据(链表)
// 假如当时的i方位上没有数据,直接不需求搬迁
else if ((f = tabAt(tab, i)) == null)
// 将本来i方位上CAS标记成fwd
// fwd的Hash为Move
advance = casTabAt(tab, i, null, fwd);
// // 拿到当时i方位的hash值,假如为MOVED,证明数据现已搬迁过了。
else if ((fh = f.hash) == MOVED)
// 那么直接回来即可,首要是最终一个扩容线程扫描的时候运用
advance = true; // already processed
else {
// 锁住
synchronized (f) {
// 再次校验
if (tabAt(tab, i) == f) {
// ln:null - lowNode
Node<K,V> ln;
// hn:null - highNode
Node<K,V> hn;
// 假如当时的fh是正常的
if (fh >= 0) {
// 求当时的runBit
// 这儿只会有两个成果:0 或许 n
int runBit = fh & n;
// 用当时lastRun指向f
Node<K,V> lastRun = f;
// 进行链表的遍历
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 求每一个链表的p.hash & n
int b = p.hash & n;
// 假如当时的b不等于上一个节点
// 需求更新下runBit的数据和lastRun的指针
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 假如最终runBit=0的话
// lastRun赋值给ln
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 假如最终runBit=0的话
// lastRun赋值给hn
else {
hn = lastRun;
ln = null;
}
// 从数组头节点开端一直遍历到lastRun方位
for (Node<K,V> p = f; p != lastRun; p = p.next) {
// hash/key/value
int ph = p.hash; K pk = p.key; V pv = p.val;
// 假如当时的是0,则挂到ln下面
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 假如当时的是1,则挂到hn下面
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将新数组i方位设置成ln
setTabAt(nextTab, i, ln);
// 将新数组i+N方位设置成hn
setTabAt(nextTab, i + n, hn);
// 将本来数组的i方位设置成fwd,代表搬迁完毕
setTabAt(tab, i, fwd);
// 将advance设置成true,确保进行上面的while循环
// 履行上面的i--,进行下一节点的搬迁计划
advance = true;
}
}
}
}
}
9.1 LastRun机制
假如当时我需求将数组从 16 扩容至 32,咱们看下本来的结构:
当咱们进行下面这一步时:
int runBit = fh & n;
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 求每一个链表的p.hash & n
int b = p.hash & n;
// 假如当时的b不等于上一个节点
// 需求更新下runBit的数据和lastRun的指针
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
图片如下:紫色代表 0
,粉色代表 1
在搬迁之前,咱们要把握一个常识,因为咱们的数组是 2
倍扩容,所以原始数据的数据一定会落在新数组的 2
和 N + 2
的方位
最终将咱们的数组从原始数组直接搬迁过来,因为 lastRun
之后方位的数据都是相同的hash,所以直接全量搬迁即可,不需求挨个遍历,这也是实施 lastRun
的原因,削减时刻复杂度
最终搬迁效果如上,搬迁完毕,削减 i
的下标,继续搬迁下一个数组的方位。
10、帮忙扩容
// 假如当时的hash是正在扩容
if ((fh = f.hash) == MOVED){
// 进行协作式扩容
tab = helpTransfer(tab, f);
}
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab;
int sc;
// 榜首个判别:老数组不为null
// 第二个判别:新数组不为null (将新数组赋值给nextTab)
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 拿到扩容戳
int rs = resizeStamp(tab.length);
// 榜首个判别:fwd中的新数组,和当时正在扩容的新数组是否持平。
// 持平:能够帮忙扩容。不持平:要么扩容完毕,要么开启了新的扩容
// 第二个判别:老数组是否改变了。 持平:能够帮忙扩容。不持平:扩容完毕了
// 第三个判别:假如正在扩容,sizeCtl必定为负数,而且给sc赋值
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 判别扩容戳是否共同
// 看下transferIndex是否小于0,假如小于的话,阐明扩容的使命现已被领完了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 假如还有使命的话,CAS将当时的SIZECTL加一,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 帮忙扩容的代码,上面刚刚剖析过
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
三、流程图
四、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一同前进。互相毫无保留的分享经历,才是对立互联网隆冬的最佳挑选。
其实许多时候,并不是咱们不够尽力,很可能便是自己尽力的方向不对,假如有一个人能稍微指点你一下,你真的可能会少走几年弯路。
假如你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一同学习,一同成长
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜爱后端架构和中间件源码。
咱们下期再见。
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一同共勉。
往期文章推荐:
- 美团二面:聊聊ConcurrentHashMap的存储流程
- 从源码全面解析Java 线程池的来龙去脉
- 从源码全面解析LinkedBlockingQueue的来龙去脉
- 从源码全面解析 ArrayBlockingQueue 的来龙去脉
- 从源码全面解析ReentrantLock的来龙去脉
- 阅读完synchronized和ReentrantLock的源码后,我竟发现其十分相似
- 从源码全面解析 ThreadLocal 关键字的来龙去脉
- 从源码全面解析 synchronized 关键字的来龙去脉