ConcurrentHashMap 是咱们日常开发中使用频率最高的并发容器之一了,具有如下特点:
基于JDK8剖析
- 存储结构和
HashMap
相同,都是数组
+链表
+红黑树
- 是
线程安全
的容器,底层是经过CAS
自旋 +sychronized
来保证的 - key 和 value 都不答应为空,不然将抛出空指针异常
- 比照
HashTalbe
,锁的粒度控制在数组的桶元素上 - 能够多线程帮忙扩容
接下来从源码的角度来剖析下 ConcurrentHashMap。
1. 要害特点
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/**
* 数组的最大长度 (和HashMap相同)
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 数组的默许长度 (和HashMap相同)
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 数组中的元素个数最大值,超越这个阈值将抛出OOM
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* The default concurrency level for this table. Unused but defined for
* compatibility with previous versions of this class.
*
* 翻译过来是默许的并发级别,可是并没有使用
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 加载因子
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 当链表长度达到8,转成红黑树
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 当红黑树的长度小于6,转成链表
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 当链表长度大于8,可是数组table长度没有达到64,此刻不会转成红黑树,而是扩容
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 最小的搬迁步长,当多线程并发扩容时,每个线程担任的搬迁的最小桶的个数
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
* 表明并发扩容最大线程数
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The bit shift for recording size stamp in sizeCtl.
* 和扩容相关,用来标识sizeCtl的符号位
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
//旧数组搬迁后,桶位会放入ForwardingNode节点,hash值固定为-1
static final int MOVED = -1; // hash for forwarding nodes
//红黑树在桶中元素的节点(TreeBin)的hash值固定为-2
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
//保证hash值都是正数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/*
* 处理器的核数
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* table
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; //value是volatile的
volatile Node<K,V> next; //next是volatile的
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//.......
}
/* ---------------- Fields -------------- */
/**
* table数组,长度总是2的N次幂
*/
transient volatile Node<K,V>[] table;
/**
* 扩容时使用的
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 记载元素个数的基准值
*/
private transient volatile long baseCount;
/**
* table数组初始化和扩容的控制符,不同值有不同的意义:
* 1.假设值为-1,则表明table数组正在初始化;-(n + 1) 表明表明此刻有n个线程正在一起完结数组的扩容操作
* 2.假设值为0,表明table数组还没有被初始化,默许长度是16
* 3.值为正数,假设数组没有初始化,则记载的是数组的初始容量;假设数组现已初始化,记载的是数组的扩容阈值(capacity * loadFactor)
*
* 这个值是十分重要的
*/
private transient volatile int sizeCtl;
/**
* 搬迁时的索引
*/
private transient volatile int transferIndex;
/**
* 自旋锁标志位
* 在在调整CounterCell[]大小和或创立CounterCell[]时使用。
*/
private transient volatile int cellsBusy;
/**
* 当put元素并进行计数时,优先对baseCount进行加1操作,可是并发度高的状况下,只有一个线程能
* 成功,其他失利的线程将会使用CounterCell进行核算。
* 这样元素的总个数 = baseCount + CounterCell[]数组各个下标对应值的总和
*/
private transient volatile CounterCell[] counterCells;
}
sizeCtl 特点侧重阐明一下:
- 假设值为 -1,则表明table数组正在
初始化
;-(n + 1) 表明表明此刻有多个线程正在一起完结数组的扩容操作 - 假设值为0,表明table数组还没有被初始化,默许长度是16
- 值为正数,假设数组没有初始化,则记载的是数组的初始容量;假设数组现已初始化(table != null),记载的是数组的扩容阈值(capacity * loadFactor)
2.构造器剖析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/**
* 假设使用的空参数构造器,那么默许的长度16
*/
public ConcurrentHashMap() {
}
/**
* 指定初始容量
* 假设初始容量大于(最大值/2 = 2^29),那么初始容量便是默许的最大值2^30
* 不然去初始容量的(1.5倍+1),然后核算出大于该值的最小的2的N次幂
*
*
* 一起,将初始容量赋给sizeCtl, 表明数组还没有初始化
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//.....其他构造器......
}
tableSizeFor()
办法并不生疏,前面讲HashMap的时分,咱们有提到,不过二者在核算数组的初始容量稍有差异,HashMap
直接对指定容量进行位运算,核算出大于等于指定容量的最小的2的N次幂;而ConcurrentHashMap
是先对指定容量进行(1.5倍 + 1
), 然后对核算后的值进行位运算,得到大于该值的最小的2的N次幂。
HashMap
: 7 ——> 8,17 ——> 32,16 ——> 16,1 ——> 1ConcurrentHashMap
: 7 ——> 16,17 ——> 32,32 ——> 64,1 ——> 2
3.put()办法剖析
测试数据:
public class Test {
public static void main(String[] args) {
//此刻初始容量是64,由于内部的table还没有初始化,所以sizeCtl特点值为初始容量64
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(32);
map.put("abc", 12);
}
}
源码剖析:
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value 都不答应为空
if (key == null || value == null) throw new NullPointerException();
/**
* 经过扰动函数,核算出hash值,高16位也参加运算,这个和HashMap相同
* 不过核算出成果后,它会 &0x7fffffff,其意图是,这样核算出来的hash值都是正数!
*/
int hash = spread(key.hashCode());
int binCount = 0;
//死循环,调配CAS做自旋用的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//假设数组是空的,则需求初始化table数组 ①
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//数组不为空,核算当时key在数组中桶的下标,假设为空,则表明没有放元素,则经过cas设置元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; //设置成功,break
}
//假设桶中元素的hash值为-1,表明当时数组正在扩容
else if ((fh = f.hash) == MOVED)
//当时线程帮忙扩容
tab = helpTransfer(tab, f);
else { //不然就需求刺进到链表或许红黑树中
V oldVal = null;
//对数组中的桶节点元素加锁
synchronized (f) {
//double check, 防止其他线程将树转成了红黑树,或许其他线程移除了该元素
if (tabAt(tab, i) == f) {
/**
* fh >= 0, 表明一般的链表节点
* 能走到这儿,阐明数组中的桶元素不为空,一起hash值也不是-1
* 一起,假设是红黑树,则桶中元素放到的是TreeBin节点,它的hash值固定是-2,
* 所以,假设 fh>=0,则是一般链表
*/
if (fh >= 0) {
binCount = 1;
//整个for循环是处理链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//刺进到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//假设链表长度大于等于8,则将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//元素个数+1
addCount(1L, binCount);
return null;
}
接下来咱们研究一下功用点的细节办法
3.1 表的初始化——initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//while循环+CAS形成自旋
while ((tab = table) == null || tab.length == 0) {
/*
* 1.假设值为-1,则表明table数组正在初始化;-(n + 1) 表明表明此刻有n个线程正在一起完结数组的扩容操作
* 2.假设值为0,表明table数组还没有被初始化,默许长度是16
* 3.值为正数,假设数组没有初始化,则记载的是数组的初始容量;假设数组现已初始化,记载的是数组的扩容阈值(capacity * loadFactor)
*
*/
//假设值小于0,阐明正在初始化或许扩容,此刻当时的线程就直接放弃CPU的使用权
//由运行状况变为就绪状况,再次获得CPU的控制权后接着往下履行,这儿也便是持续履行while循环
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
/**
* 在测试数据中,咱们指定了初始容量,在没有初始化的状况下,sizeCtl等于初始容量也便是64 > 0
* 使用CAS将sizeCtl修改为-1
*
* U.compareAndSwapInt(this, SIZECTL, sc, -1)
* 假设SIZECTL(便是sizeCtl在内存中的偏移量)和sc相等,则将SIZECTL(便是sizeCtl)修改为-1
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
/*
* 为什么要double check ?
* 由于finally 中 sizeCtl = sc; 此刻 sizeCtl是扩容阈值,其他线程
* 来到else if 判别为true, 则也能够进来,这样就重复初始化了
*/
if ((tab = table) == null || tab.length == 0) {
//这儿的sc仍是初始容量,请看指定容量的构造器办法
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
/**
* 重新核算sc: 由于table现已初始化了,所以这sc值在这儿便是扩容阈值了
*
* n - (n >>> 2) = n - n/4 = 3/4(n) = 0.75 * n
* sc = 0.75 * table.length
*/
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
初始化完结之后,就不走其他判别了,直接重新循环,此刻table现已不为空了,然后根据key核算在数组中的索引,判别是否为空,假设为空,则经过CAS刺进元素。
3.2 CAS刺进元素
这儿比较简略,假设数组中该索引方位没有元素,则直接经过CAS刺进即可;假设刺进失利,则阐明有其他线程先一步刺进了该方位,则CAS失利,此刻就重新for循环然后刺进到链表方位。
接下来咱们先看下假设呈现hash抵触怎么解决,帮忙扩容晚点再探究。
3.3 hash抵触
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value 都不答应为空
if (key == null || value == null) throw new NullPointerException();
/**
* 经过扰动函数,核算出hash值,高16位也参加运算,这个和HashMap相同
* 不过核算出成果后,它会 &0x7fffffff,其意图是,这样核算出来的hash值都是正数!
*/
int hash = spread(key.hashCode());
int binCount = 0;
//死循环,调配CAS做自旋用的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//....省掉其他else if........
//hash碰撞的处理逻辑
else {
V oldVal = null;
//对数组中的桶节点元素加锁
synchronized (f) {
//double check, 防止其他线程将树转成了红黑树,或许其他线程移除了该元素
if (tabAt(tab, i) == f) {
/**
* fh >= 0, 表明一般的链表节点
* 能走到这儿,阐明数组中的桶元素不为空,一起hash值也不是-1
* 一起,假设是红黑树,则桶中元素放到的是TreeBin节点,它的hash值固定是-2,
* 所以,假设 fh>=0,则是一般链表
*/
if (fh >= 0) {
binCount = 1;
//处理链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//刺进到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//假设链表长度大于等于8,则将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//元素个数+1
addCount(1L, binCount);
return null;
}
不难发现,假设呈现hash抵触,则锁住当时桶位,比照HashTable
, 锁的粒度直接从锁整个表下降到锁表中的某一个桶位,并发度将大大提高。假设是链表,则采用尾插法刺进到链表尾部;假设是红黑树,则将元素刺进到红黑树中;假设链表长度 >=8,则将链表转成红黑树。接下来咱们简略看下链表转成红黑树的逻辑:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
/**
* 链表长度大于8,可是数组长度没有达到64,此刻不会转成红黑树,而是先扩容
* 这一点和HashMap相同
*/
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1); //扩容2倍,扩容的逻辑放到后边再说
/**
* 1.找到桶中的元素,而且不为空
* 2.hash值要大于0,为什么?
* 由于桶中元素的hash值小于0,只有两种状况
* 1)数组正在扩容,旧数组搬迁后的桶的方位会放一个 ForwardingNode 节点,它的hash值是-1
* 2)桶中的元素现已是树了,此刻桶中元素的hash值是-2
*/
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//锁住当时桶中的元素,开端树化
synchronized (b) {
/**
* double check
* 为什么要double check ?
* 由于有或许当时线程进来了else if, 可是还没有履行sychronized获得锁之前,有其他线程
* 现已获得了这个锁,然后开端树化,树化后节点元素或许随着左旋和右旋而改变,所以这儿必需求double check
*/
if (tabAt(tab, index) == b) {
//整个for循环就做一件事,将当时的单向链表转成双向链表
//hd 是双向链表的头节点
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
/**
* 将当时桶的方位设置为 TreeBin 节点,然后将双向链表经过构造器传入到
* 内部,然后再内部将双向链表转成红黑树。
* 留意:桶中的TreeBin 的hash值是 -2
*/
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
咱们用一张图来展示下红黑树的存储结构:
3.4 核算元素个数
咱们看下addCount()
办法的内部完结:
这个办法内部首要做两件事:一个是核算元素个数,另一个是扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/**
* 1.核算元素个数
* 优先对baseCount进行加,可是高并发下,只能有一个线程经过CAS增加成功,其他的线程CAS会失利
* 此刻那些失利的线程就会越过对baseCount的运算,而是去对CounterCell[]数组中的元素进行计数
* 这样总的元素个数 = baseCount + CounterCell[]数组各个元素的和
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
/**
* 假设check >= 0, 则需求判别是否扩容,由于remove()的话,也会调用调用这个办法
* 传入的是 -1,此刻是不需求扩容的。
*
*/
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
咱们先用一张图来全体概括下计数的逻辑:
咱们来详细剖析下核算元素个数的逻辑:
参考JUC下的原子类
LongAdder
,他们的逻辑是一模相同的。使用了分段CAS的思维
/**
* 1.核算元素个数
* 优先对baseCount进行加,可是高并发下,只能有一个线程经过CAS增加成功,其他的线程CAS会失利
* 此刻那些失利的线程就会越过对baseCount的运算,而是去对CounterCell[]数组中的元素进行计数
* 这样总的元素个数 = baseCount + CounterCell[]数组各个元素的和
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/**
* 不难发现,一旦并发比较高,对baseCount加失利后,就会对counterCells计数,之后就都会
* 对 counterCells 计数,不在对baseCount计数了。
*
* 一旦CAS对baseCount计数成功,就退出了
* 一旦CAS对baseCount计数失利,取反便是true,开端初始化counterCells然后计数
*
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//对counterCells计数的中心逻辑
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//......先省掉扩容的逻辑........
}
接下来咱们看下对 counterCells
计数的逻辑:
中心逻辑首要便是集中在这3个判别中:
- 第一个判别首要是对
counterCells
数组进行扩容,每次也是以2的N次幂进行扩容,初始容量是2 - 第二个判别首要是初始化
counterCells
数组 - 第三个判别是假设对
counterCells
计数失利了,回过头来在对baseCount
计数,吃回头草。
接下来咱们在详细展开看看:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
/**
* h: 能够了解为一个随机数,用来计数counterCells数组索引方位的
* h = ThreadLocalRandom.getProbe()) == 0 阐明还没有初始化,调用localInit()初始化,产生一个随机数
*/
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
//碰撞的意思
boolean collide = false; // True if last slot nonempty(假设最终一个槽位非空则为True)
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
/**
* 1.这儿现已完结了对counterCells的初始化操作。咱们先看进程2判别
*
*/
if ((as = counterCells) != null && (n = as.length) > 0) {
/**
* 1.1核算counterCells数组中元素的方位是否为空
*
* 不论h是什么,假设n便是初始容量2,h&1位运算成果只能是0或1(由于h只有最终一个位参加运算)
* (a = as[(n - 1) & h]) == null 阐明counterCells数组的方位没有值
*
*/
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy == 0 阐明counterCells没有初始化也没有扩容
if (cellsBusy == 0) {
//创立一个 CounterCell 对象,将保存到 counterCells数组中
CounterCell r = new CounterCell(x); // Optimistic create
/**
* 这儿为什么还要对cellsBusy CAS 判别呢?
* 说白了,来到这儿也有或许是多个线程,所以有必要保证只能有一个操作成功
*/
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
//仍然double check, 由于有或许履行完finally逻辑后,另一个线程就进来了
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//Uncontended 是无竞赛的意思
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/**
* 尝试用CAS对该单元格的值累加,假设成功则退出
*/
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
/**
* 屡次尝试操作后,仍然失利了,阐明对couterCells并发十分高,则此刻尝试扩容
* 扩容为本来的2倍(也是2的N次幂)
*/
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//产生一个新的随机数
h = ThreadLocalRandom.advanceProbe(h);
}
/**
* 2.初始化counterCells数组,先看这儿。
* cellsBusy == 0 表明不忙,也便是还没有初始化,然后经过CAS将其修改为1,表明忙
* 在调配double check 这样其他线程就无法再进来重复初始化了
*
*/
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// double check
if (counterCells == as) {
//创立一个长度为2的数组
CounterCell[] rs = new CounterCell[2];
/**
* 核算索引方位,h是一个随机数
* h & 1 的成果要么是0,要么是1,由于不论h是什么,它只有最终1位参加位运算
* 索引,这个x值或许放在index = 0的方位,或许放到index = 1的方位
*/
rs[h & 1] = new CounterCell(x);
//赋值,完结初始化操作
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//假设对counterCells计数失利,回过头来在对baseCount计数
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
整个流程咱们用下面这个图概括下:
怎么核算元素个数呢?
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
//先取出baseCount的值
long sum = baseCount;
//再取出counterCells数组中每个单元格的值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
好了,关于核算元素个数就梳理到这儿,接下来咱们看下是否扩容的逻辑。
3.5 扩容
private final void addCount(long x, int check) {
//......省掉核算元素个数的逻辑........
//假设是remove元素,则不要走扩容逻辑了,remove传过来的是-1
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
/**
* s = sumCount() 便是元素个数
* 由于table现已完结了初始化(table != null),所以sizeCtl此刻表明扩容阈值
* s >= (long)(sc = sizeCtl): 判别集合元素个数是否大于扩容阈值,超越则扩容
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
/**
* 内部完结:Integer.numberOfLeadingZeros(n) | (1 << (16 - 1));
* 先不看前面,二者按位或之后第17位肯定是1,其他先不论
*/
int rs = resizeStamp(n);
/**
* sc < 0 是什么状况?很明显便是扩容了
*
* 假设a,b两个线程一起来到while这儿,a线程优先履行了else if, 此刻将会把
* sizeCtl修改为负数,此刻b线程判别 sc < 0 就建立了。
*
* 假设a线程履行完CAS判别后,还没有履行transfer(tab, null)办法,此刻切换
* 到b线程,由于nextTable == null, 所以b线程直接退出了。这怎么了解 ?
* 1)b线程现已将元素增加到old table中了
* 2)元素个数现已核算结束了
* 所以,直接让b线程退出,由a线程完结搬迁工作就能够了。。。。。
*
* 假设a线程履行完CAS判别后,开端履行transfer(tab, null)办法,nextTable != null
* 此刻切换到b线程,b线程履行CAS判别后,履行transfer(tab, nt) 办法,这儿怎么了解?
* --这儿便是帮忙扩容了。
*
*
*/
if (sc < 0) {
/**
* 判别是否现已扩容结束了,nextTable便是扩容后的新数组
* 扩容完结后,会将nextTable 赋给 table
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//帮忙扩容
transfer(tab, nt);
}
/*
* 将rs左移16位,使sizeCtl变成负数
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//当时线程是触发扩容的第一个线程
transfer(tab, null);
s = sumCount();
}
}
}
咱们先来看下rs
变量的意义:
static final int resizeStamp(int n) {
//RESIZE_STAMP_BITS = 16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
首先将1左移15位:
00000000 00000000 00000000 00000001
00000000 00000000 10000000 00000000 #左移15位,前面15位去掉,后边补15位
不论
Integer.numberOfLeadingZeros(n)
的值是什么,他们按位或的成果,其间第17位一定是1.
咱们持续看 else if 中的CAS的逻辑:
//RESIZE_STAMP_SHIFT = 16
else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
其间要对rs的值左移16位:
00000000 00000000 10000000 00000000 #rs
10000000 11100111 00111111 11111000 # rs << 16
rs 左移16位,会将第17位的1移动到首位,这意味着什么?意味着rs 是一个小于0的数值。只看首尾就好,其他位是什么不重要。
接下来咱们就看下扩容的中心逻辑(十分杂乱):
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
/**
* 这个办法是能够完结多个线程的一起扩容的
* 这儿的意义便是对旧数组分块,每个线程担任多少块(步长)的数据搬迁。
* 默许是每个线程担任16个桶位的搬迁
* 当然了,假设是单核,那就彻底没有必要使用多线程扩容了,数组长度是多少,线程就担任多少个元素(步长)
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//创立新的数组,长度为本来的2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//便利帮忙扩容的线程拿到新表
nextTable = nextTab;
//transferIndex 便是旧数组的长度
transferIndex = n;
}
int nextn = nextTab.length;
/**
* 创立一个 fwd 节点,它的hash值固定为 -1
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//搬迁推动标识
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
/**
* 这个while的逻辑:
* 1)给每个线程分配搬迁的使命(默许16个长度)
* 2) 从后往前一个一个搬迁
*/
while (advance) {
int nextIndex, nextBound;
/**
* boud 是一个使命段的边界值,请看下面的图
* 没有达到边界值之前,一个一个元素的搬迁
*/
if (--i >= bound || finishing)
advance = false;
//搬迁到最终一个元素了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
/**
* 这儿的逻辑便是为每个线程分配一个使命段(默许便是16个桶位的长度)
*
*/
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
* 这个if的判别便是:查看是否一切线程都搬迁完结
*
*
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/**
* 一切线程都搬迁完结
*/
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 再次review旧数组每个节点,保证没有遗失的
* 来到这儿,i = oldTable.legnth, bound = 0
*/
i = n; // recheck before commit
}
}
//从这接下来的判别便是真实的搬迁了
//假设旧数组方位是null,那爽性不必搬迁了,直接放一个fwd节点,告诉其他线程正在扩容,
//不要再往旧数组放了,你能够过来帮忙扩容
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//阐明现已搬迁过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//搬迁真实有数据的桶位
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//处理链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//处理红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
整个搬迁的逻辑十分的杂乱,而且代码很长,咱们将其拆开看:
3.5.1 对搬迁步长的了解
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//构建每个线程搬迁的步长,默许每个便是16个
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/**
* 穿件扩容后新的table, 长度为本来的2倍
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//赋给nextTable, 便利其他帮忙扩容的线程能否拿到新表
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
/**
* 构建一个fwd节点,hash值固定为-1
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
//从后往前遍历旧数组,每搬迁一个就把旧数组的方位放一个fwd节点
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//省掉for循环的其他逻辑........
}
}
咱们将要点关注代码截图展示:
- 首先线程进来先履行1处,它便是给每个线程分配搬迁的使命段,默许便是16个,bound便是本次线程担任搬迁的段尾
- 然后循环履行2处,从后往前一个一个搬迁数据,搬迁完结后,将fwd节点覆盖
- 表明全体搬迁结束了
咱们用一个图来表明下它的分段:
咱们在用一个图来展示下
3.5.2 搬迁数据
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//....省掉其他代码......
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//1.省掉分配使命的while循环
//2.省掉搬迁完结的判别
//接下来便是搬迁工作了
//假设旧数组方位是null,则不必搬迁,直接放一个fwd节点即可
//假设其他线程想要put元素,发现是fwd节点,则知道在扩容,此刻将会参加扩容,将数据放到新数组中
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//阐明现已搬迁过了,当每个线程搬迁完结后,它还会重新查看一遍,保证没有遗失
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//这儿便是将有数据的节点真实的搬迁了
else {
/**
* 这儿要加锁,防止A线程搬迁的进程中,B线程往旧的数组里put元素
* put先抢到锁,就先增加,搬迁线程先抢到锁,则先搬迁
*/
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//处理链表
if (fh >= 0) {
//fh & n: 假设是0,阐明是放到原方位,这个和HashMap相同的
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//放到新数组的【原方位】
setTabAt(nextTab, i, ln);
//放到新数组的【原方位 + oldCap】方位
setTabAt(nextTab, i + n, hn);
//旧数组放一个fwd节点
setTabAt(tab, i, fwd);
//再次向前推动
advance = true;
}
//处理红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
这儿其实相对简略,咱们简略画个图来展示下:
3.5.3 搬迁完结
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//....省掉其他代码.......
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
/**
* 搬迁完结nextIndex = -1
*/
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//....省掉else if....
}
/**
* 搬迁完结的判别
* 搬迁到最终一个元素,i = -1
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//一切线程搬迁完结了,将nextTable 置为null
nextTable = null;
//将搬迁后的新数组赋给table,此刻table便是新数组了
table = nextTab;
/**
* 核算新的扩容阈值
* sizeCtl = 2n - 0.5n = 1.5n = 0.75 * 2n
* JDK的开发者将位运算用的真是神乎其神
*/
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
* 判别是否一切线程都完结了扩容
* sc - 1 是由于,帮忙扩容的时分sc + 1
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 将数据长度赋给i,重新查看旧数组的一切方位,保证都搬迁结束,没有遗失
*/
i = n;
}
}
//......省掉其他代码......
}
咱们要点看下CAS的判别逻辑:
外层判别(1) 为什么要对sizeCtl减1的操作呢?
这个首要是和帮忙扩容有关。
咱们看下帮忙扩容的代码:
我的初始容量是64,核算出来的rs值是固定的32793,然后触发扩容的第一个线程A会将sizeCtl
修改为负数 -2145845246
, 然后参加扩容的线程B会将sizeCtl
修改为 -2145845246 + 1
,参加扩容的线程C将sizeCtl修改为-2145845246 + 1 + 1
, 接下来咱们回到扩容完结的逻辑:
所以这个内层判别和外层判别的逻辑便是查看是否一切线程都完结了扩容操作。
假设从始至终都只有一个线程参加扩容这个判别也是建立的,由于sizeCtl的值先赋给了sc,然后经过CAS将sc – 1 赋给sizeCtl, 可是sc的值没变,所以内层判别肯定是建立的。
咱们在简略看下,put元素时呈现hash抵触
,而且hash=-1
时的帮忙扩容:
简略看下代码:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//double check
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//sc = sizeCtl) < 0 :小于0表明扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/**
* 判别是否扩容结束了。。
* (sc >>> RESIZE_STAMP_SHIFT) != rs 判别我看在JDK17中拿掉了
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//参加扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
3.6 put元素进程总结
-
核算扰动核算出hash值,这个hash值永远是正数
-
假设没有初始化,就调用 initTable() 办法来进行初始化
-
假设没有 hash 抵触就直接 CAS 无锁刺进
-
假设正在扩容,则当时线程帮忙扩容
-
假设存在 hash 抵触,就加锁来保证线程安全,两种状况:一种是链表形式就直接遍历到尾端刺进,一种是红黑树就按照红黑树结构刺进
-
假设该链表的数量大于阈值8而且数组长度大于64,就要先转换成红黑树的结构
-
假设增加成功就调用 addCount() 办法核算 size,而且查看是否需求扩容
4.get()办法剖析
了解了put办法,再来看get就十分简略了
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//核算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//从数组的桶中查找到
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
/**
* 从红黑树中查找
* hash < 0 只有是扩容或许红黑树在桶中的元素节点
*/
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//从链表中查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
5.debug 剖析多线程扩容
预备测试数据:
/**
* @author qiuguan
* @date 2022/11/24 23:40:24 周四
*/
public class ConcurrentHashMapTest {
public static void main(String[] args) {
/**
* 指定容量32,则table的初始容量是64
* 当达到 64 * 0.75 = 48 时触发扩容
*/
ConcurrentHashMap<String, Integer> chm = new ConcurrentHashMap<>(32);
//先放47个元素
for (int i = 1; i <= 47; i++) {
chm.put("jk" + i, i);
}
//Ea 的 hash值是: 2236, 在数组中的索引方位是: 60
//FB 的 hash值是: 2236, 在数组中的索引方位是: 60
int ea = "Ea".hashCode();
int fb = "FB".hashCode();
System.out.printf("Ea 的 hash值是: %s, 在数组中的索引方位是: %s\n", spread(ea), (64 -1) & spread(ea));
System.out.printf("FB 的 hash值是: %s, 在数组中的索引方位是: %s\n", spread(fb), (64 -1) & spread(fb));
//再次增加元素会触发扩容
new Thread(() -> chm.put("Ea", 11)).start();
new Thread(() -> chm.put("FB", 22)).start();
}
//将源码中核算hash值的逻辑拿出来s
static final int spread(int h) {
return (h ^ (h >>> 16)) & 0x7fffffff;
}
}
要害方位打上断点:
先让thread0
(Ea)线程履行:
然后让thread0
开端搬迁,搬迁几个就行,由于”Ea
“的在数组中方位是靠后的(index = 60),而搬迁是从后往前搬迁的,所以保证”Ea
“搬迁完结后,咱们切换到 “thread1“线程。
在thread1线程进行帮忙扩容前,我先用几张图来演示下进程:
-
thread0
(“Ea”)触发扩容
-
thread0
(“Ea”)开端扩容并完结”Ea”数据的搬迁工作
-
thread1
(“FB”)put元素产生hash碰撞而且桶中元素的hash值为-1,预备帮忙扩容
接下来咱们就从”thread1
“视角看它是怎么帮忙扩容的,首先要切换到”thread1
“线程
用一张图展示的话,大概便是下面这样
好了,关于ConcurrentHashMap的源码剖析就到这儿吧。
补充:
ConcurrentSkipListMap
是一个支撑排序的并发容器
限于作者水平,文中不免有过错之处,欢迎纠正,感谢感谢