前语
本文为笔者学习和思考后结合自己的了解所做的总结,参阅了很多大佬的讲解,但仍是不扫除存在了解有误的地方,如有错误请指正
我会别离总结Java1.7和1.8两个版别的ConcurrentHashMap的完结原理,主要是对初始化、put、get、扩容几个办法进行总结
为什么有了HashMap还需求ConcurrentHashMap
HashMap作为Java十分经典的调集数据结构,增删改查都能够做到O(1)的时刻复杂度,十分值得学习。可是HashMap在并发状况下会呈现死循环的问题。
HashMap并发死循环原理
HashMap扩容时会进行Rehash进程,而Rehash的进程是次序拜访旧数组中的链表,但选用头插法添加到到新数组对应方位的链表中,这个进程会将本来的链表次序倒置,一起也是在并发状况下产生死循环的原因
//次序遍历旧数组中的链表,取出链表的当时元素e,记住旧链表的下一个元素next
Entry<K,V> next = e.next; // 1
//从头在核算新数组中的下标
int i = indexFor(e.hash, newCapacity);
//运用头插法将元素刺进新数组
e.next = newTable[i];
newTable[i] = e;
//再取旧数组中的下一个元素
e = next;
假如线程1履行到代码 1 处时刻片用完了,当时的的链表状况如下:
e指向 key=3的节点1 next 指向 key=7的节点2
而此刻线程二取得时刻片开端履行
第一次:e指向节点1,next指向节点2,选用头插法把e刺进新数组中
第一次:e指向节点2,next指向节点3(key=5),选用头插法把e刺进新数组中
此刻在新链表中链表的次序发生了倒置 节点2在前,节点1在后了
第三次履行后完结了扩容,现在的链表状况就变成了这样:
之后线程1从头取得时刻片履行,可是他并不知道现已扩容结束了,他现在的 e指向 key=3的节点1 next 指向 key=7的节点2
开端持续依照扩容的代码履行,再次将e运用头插法添加到新数组中,这个时分就呈现环了节点e2.next=节点e1,而线程1又将e1运用头插法参加到链表头部,e1.next=e2
后续线程1不断取next进行头插法参加新数组,其实一直都是e1,e2两个在互换方位
所以官方供给了ConcurrentHashMap确保并发状况下HashMap的线程安全
ConcurrentHashMap源码
JDK1.7版别
存储结构如下
1.7版别的ConcurrentHashMap是引入了一个段(Segment)数组,每个段内是一个HashMap结构,运用分段锁确保线程安全,每次锁一个段(Segment),不同段上的数据互不影响,所以理论上最大并发度便是Segment数组的长度,Segment数组的长度在初始化后就不行再改变,因此最大并发度也在初试化时就承认
初始化
初始化函数除了一些参数校验之外主要做了三件事:
- 承认最大并发度ssize:依据concurrencyLevel向上取最接近的2的n次方做为最大并发度,也便是Segment数组的长度
- 依据总的容量承认每个Segment的容量巨细cap:(initialCapacity/段长度)向上取最接近的2的n次方,最小是2
- 依据承认的ssize创立Segment数组,并且依据cap和loadFactor初始化Segment[0]作为后续其他Segment节点初始化的模版
/**
* 默许初始化容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默许负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 默许并发级别
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//无参结构 调用的是有参结构函数,传入三个默许值
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//有参结构
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 校验并发级别巨细,大于 1<<16,重置为 65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2的多少次方
int sshift = 0;
int ssize = 1;
// 这个循环能够找到 concurrencyLevel 之上最近的 2的次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记载段偏移量
this.segmentShift = 32 - sshift;
// 记载段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 容量 / ssize ,默许 16 / 16 = 1,这儿是核算每个 Segment 中的相似于 HashMap 的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的相似于 HashMap 的容量至少是2或许2的倍数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创立 Segment 数组,设置 segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
put
put分三部分
- 段数组层面的put:经过key的hash值前几位j=(hash >>> segmentShift) 找到段数组对应的段
- 段初始化:段为null时调用ensureSegment(j) 对段进行初始化
- 段层面的put:一个段相当于一个HashMap,这儿是在段内进行put
段数组层面的put
经过key的hash值核算段地址j,找到段数组上对应的段,调用段的put
- segmentShift:每个段的容量cap需求占的位数,比方每个段的巨细为1024,则segmentShift=10
- segmentMask: 段掩码,拿来取段内地址,比方段巨细为1024,则段内地址为[0-1023],用低9位表明,segmentMask便是为了取hash值的低9位 所以segmentMask=2进制(111111111)
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// hash 值无符号右移 28位(初始化时取得),然后与 segmentMask=15 做与运算
// 其实也便是把高4位与segmentMask(1111)做与运算
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 假如查找到的 Segment 为空,调用ensureSegment初始化
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
段初始化ensureSegment(j)
段初始化其实就三个操作
- 获取模版Segment[0]的参数信息(容量,负载因子,阈值)
- 创立Segment
- CAS操作将新创立的段添加到Segments数组中
但本着尽或许早的发现Segement以及被其他线程初始化而停止当时的初始化,然后削减资源浪费的思想,在每进行一个阶段操作前都进行一次判别空,先后进行了两次判空操作才运用CAS初始化
- 进入后尽早判别Segment[k]是否null (第一次判别段是否为null)
- 获取Segment[0]段模版的容量,负载因子,阈值做初始化预备
- 第2次判别段是否为null
- 运用上面获取到的创立Segment
- CAS操作将新创立的段添加到Segments数组中
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 第一次判别 u 方位的 Segment 是否为null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
// 获取0号 segment 里的 HashEntry<K,V> 初始化长度
int cap = proto.table.length;
// 获取0号 segment 里的 hash 表里的扩容负载因子,一切的 segment 的 loadFactor 是相同的
float lf = proto.loadFactor;
// 核算扩容阀值
int threshold = (int)(cap * lf);
// 创立一个 cap 容量的 HashEntry 数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 第2次判别 u 方位的 Segment 是否为null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
// 再次查看 u 方位的 Segment 是否为null,由于这时或许有其他线程进行了操作
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋查看 u 方位的 Segment 是否为null
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
// 运用CAS 赋值,只会成功一次
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
段内put
在Segment数组中获取hash值对应的段Segment,并且承认段不为null后会进行段内put
由于段内自身便是一个HashMap结构所以段内put和HashMap的put几乎是相同的,唯一的差异便是在最开端需求先获取锁tryLock()
- 获取锁,获取成功往下履行,获取失利则进入scanAndLockForPut(key, hash, value) 后边会介绍
- 依据hash值核算key在段内HashEntry数组的方位,该方位上是一个链表(也或许为null)
- 遍历链表看是否存在key值相同的元素,假如有则判别是否答应掩盖,然后再对值进行掩盖
- 若不存在key值相同的,则运用头插法添加一个新节点,一起判别是否添加新节点后是否到达阈值,到达阈值则扩容 rehash(node)后边会介绍
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 核算要put的数据方位
int index = (tab.length - 1) & hash;
// CAS 获取 index 坐标的值
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
// 查看是否 key 现已存在,假如存在,则遍历链表寻觅方位,找到后替换 value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// first 有值没阐明 index 方位现已有值了,有冲突,链表头插法。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 容量大于扩容阀值,小于最大容量,进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// index 方位赋值 node,node 或许是一个元素,也或许是一个链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
再来看看scanAndLockForPut(key, hash, value)
tryLock()获取不到锁的线程则进入这个函数,他有两个效果
- 不断自旋获取锁while (!tryLock()) 这是最主要的效果
- 在自旋的进程他为了不是纯粹的空自旋,还添加了一点别的效果便是去遍历链表查看是否存在key值相同的元素,假如不存在就在自旋期间把 HashEntry节点先创立了,算是为后边取得锁之后的刺进操作节约了一点时刻
在承认链表查看是否存在key值相同的元素后的自旋便是空自旋了,进入空自旋有两个操作
- 他会对空自旋次数进行核算,到达自旋阈值MAX_SCAN_RETRIES就进入堵塞了
- 一起每两次空自旋就会判别一次当时的链表头节点是否变化,由于添加元素时运用头插法,经过判别头节点是否变化来判别是否有新的节点参加进来(也有或许是头节点被删去了) ,假如有新节点参加进来他就得从头判别是否存在key相同的元素
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 自旋获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 自旋到达指定次数后,堵塞等到只到获取到锁
lock();
break;
}
//判别一次当时的链表头节点是否变化
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//从头变成刚进入时的状况,从头判别是否存在key相同的元素
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
扩容
由于此处的扩容是在段内扩容,并且是在持有锁的状况下扩容,所以这儿的扩容和HashMap的扩容机制是几乎是相同的
- 创立一个新的数组,数组长度为旧数组的两倍
- 遍历旧数组每个方位的链表,将链表上的每个元素与新的掩码(newCapacity – 1)进行&核算在新数组的方位,并运用头插法把元素刺进新数组中
差异在于这儿的扩容会在第2步前先核算lastRun的方位,lastRun节点便是最终一串hash值在新数组中方位没变的节点中的头节点
找到lastRun节点后直接将lastRun节点先参加到新数组中,使得最终这一部分的节点得到复用,不需求从头创立
所以实践上的次序为
- 创立一个新的数组,数组长度为旧数组的两倍
- 找到lastRun节点后直接将lastRun节点先参加到新数组中
- 遍历旧数组每个方位的链表,将链表上的截止到lastRun节点之前到每个元素与新的掩码(newCapacity – 1)进行&核算在新数组的方位,并运用头插法把元素刺进新数组中
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
// 老容量
int oldCapacity = oldTable.length;
// 新容量,扩展两倍
int newCapacity = oldCapacity << 1;
// 新的扩容阀值
threshold = (int)(newCapacity * loadFactor);
// 创立新的数组
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,默许2扩容后是4,-1是3,二进制便是11。
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 遍历老数组
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 核算新的方位,新的方位只或许是不方便或许是老的方位+老的容量。
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 假如当时方位还不是链表,仅仅一个元素,直接赋值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 假如是链表了
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 新的方位只或许是不方便或许是老的方位+老的容量。
// 遍历结束后,lastRun 后边的元素方位都是相同的
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// ,lastRun 后边的元素方位都是相同的,直接作为链表赋值到新方位。
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
// 遍历剩下元素,头插法到指定 k 方位。
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 头插法刺进新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
get
get操作不需求加锁,直接读就行了
- 依据hash值核算段地址,Segment数组找到对应的段
- 依据hash值核算在段内的数组上的方位
- 遍历数组对应方位的链表,查看是否存在key相同的值,若存在则回来value,不存在则回来null
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 核算得到 key 的存放方位
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
// 假如是链表,遍历查找到相同 key 的 value。
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
锁的运用
- 选用了Segment数组+HashEntry数组+链表的结构,经过Segment来降低锁的粒度,进步并发度
- 段初始化时ensureSegment运用达观锁(CAS操作)为将初始化的段赋值到Segment数组中
- put操作时先对Segment段加锁,加锁成功才能进入段内操作
总的思想将一个大的HashMap拆分红n个小的HashMap(segment),每个segment在任意时刻只答应一个线程操作
JDK1.8版别
存储结构
JDK1.7中选用Segement数组+HashEntry数组+链表的结构进行存储,选用分段锁的办法解决并发问题,所以并发度取决于Segement数组的巨细,并且这个巨细是在初始化的进程就决定的,后期不能修正
JDK1.8版别中摒弃了这种设计,选用了和HashMap相似的Node数组+链表+红黑树的办法完结,并发的巨细取决于Node数组的巨细,锁的粒度更小,每次只锁一条链表的数据,并且Node数组是能够扩容的,所以并发度也能够跟着扩容而增大。
初始化
初始化的函数十分简单,由于他并没有做实践的初始化作业,真正的初始化作业是在添加元素的时分做的
关于无参数的结构办法,他甚至什么不做
关于有参数的办法
- 找到初始化容量1.5倍+1向上最接近的2的n次方cap
- 将cap暂存在sizeCtl中
// 这结构函数里,什么都不干
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//sizeCtl这个变量在整个ConcurrentHashMap中十分重要,后边会详细讲
this.sizeCtl = cap;
}
sizeCtl
sizeCtl在整个ConcurrentHashMap中十分重要,他有四种状况
- 表明未实践初始化前暂存初始化的容量巨细:sizeCtl=cap
- 表明处于实践初始化进程中:sizeCtl=-1
- 表明整个ConcurrentHashMap的阈值:sizeCtl = sc (sc是0.75*n)
- 表明并发扩容的线程个数 sizeCtl=-N -N表明有N个线程正在帮忙扩容
这儿没看懂没关系,跟着思路走,后边到详细运用到的地方就了解了
put
实践上的put办法只要5个步骤
-
依据hash值找到元素应该存放的Node数组的下标
-
对Node数组对应下标的链表头节点上锁 synchronized (f)
-
上锁成功之后第一步便是判别咱们锁的节点是否仍是链表头节点,假如不是就放弃操作,从头去竞赛表头节点的锁。为什么需求这一步判别呢? 由于咱们上锁的这个时刻内或许咱们上锁的表头节点现已不是表头节点了,比方remove了表头节点,这样咱们上的锁就无效了,由于别的线程经过synchronized锁的表头节点和咱们不是一个目标,所以也能成功取得锁
-
成功锁住链表后就开端真正的put操作了,判别是链表节点仍是红黑树节点(if (fh >= 0) // fh是头节点的 hash值,大于 0阐明是链表)
- 假如是链表则遍历链表查看是否有key相同的元素,若存在相同的元素则掩盖,若没有则在表尾刺进元素
- 假如是红黑树节点则查看是否调用红黑树的putVal办法刺进元素
-
成功刺进后调用addCount将元素个数+1,addCount()里边会判别存储的数据巨细是否到达阈值,假如到达则进入扩容操作
可是put办法在上述的操作之前还进行3个状况判别
-
判别Hash表是否初始化 if (tab == null || (n = tab.length) == 0)
在之前讲到init办法并未实践做初始化,而是在put操作的时分进行实践的初始化,便是经过这个判别分支调用初始化办法initTable(),后边会详解
-
判别Hash表对应下标的方位是否为空 else if ((f = tabAt(tab, i = (n – 1) & hash)) == null)
由于Hash表对应下标的方位为空则不存在表头节点,没有办法对表头节点进行上锁,所以经过CAS操作直接将咱们要刺进的节点变成表头节点
-
判别是否正在并发扩容 else if ((fh = f.hash) == MOVED)
hash值为MOVED是并发扩容进程中的一个标志状况,在这个状况下线程需求去先一起帮忙完结扩容后才能够进行其他操作 tab = helpTransfer(tab, f) 后边会详解
当put办法每次进入上述三个分支的时分表明本次刺进还未完结(第二个分支CAS失利也是刺进失利),所以在外层加了一个for循环来不断测验刺进元素,直到成功刺进才退出
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记载相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 假如数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组,后边会详细介绍
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 假如数组该方位为空,
// 用一次 CAS 操作将这个新值放入其间即可,这个 put 操作差不多就结束了,能够拉到最终边了
// 假如 CAS 失利,那便是有并发操作,进到下一个循环就好了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 竟然能够等于 MOVED,这个需求到后边才能看理解,不过从名字上也能猜到,必定是由于在扩容
else if ((fh = f.hash) == MOVED)
// 帮忙数据搬迁,这个等到看完数据搬迁部分的介绍后,再了解这个就很简单了
tab = helpTransfer(tab, f);
else { // 到这儿便是说,f 是该方位的头节点,并且不为空
V oldVal = null;
// 获取数组该方位的头节点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头节点的 hash 值大于 0,阐明是链表
// 用于累加,记载链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 假如发现了"相等"的 key,判别是否要进行值掩盖,然后也就能够 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最终边
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值办法刺进新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判别是否要将链表转换为红黑树,临界值和 HashMap 相同,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个办法和 HashMap 中稍微有一点点不同,那便是它不是一定会进行红黑树转换,
// 假如当时数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 详细源码咱们就不看了,扩容部分后边说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}
initTable
在put操作行进行了三次状况的判别,第一个便是判别Hash表是否初始化,假如未初始化则经过initTable函数完结初始化操作
-
判别当时Hash表是否还为null: (tab = table) == null || tab.length == 0
由于在咱们上次判别到进入initTable办法的进程中或许现已有其他线程完结了初始化作业
-
判别Hash表是否现已处于了初始化状况 (sc = sizeCtl) < 0
前面说到在初始化进程中sizeCtl=-1,初始化进程只能由一个线程完结,不能多个线程帮忙,所以在发现有线程现已开端初始化过后就会放弃时刻片进入等候
-
经过前两个判别后还能持续往下履行阐明Hash表还未初始化,并且还没有线程竞赛到初始化的使命,就先经过CAS操作去竞赛初始化的作业(也便是测验将sizeCtl改为-1),竞赛成功则开端担任初始化作业
-
实践创立作业
- 创立Node数组长度为初试容量巨细
- 将table指向新创立的Node数组
- 核算扩容阈值0.75 * n,最终保存到sizeCtl中
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 初始化的"功劳"被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// DEFAULT_CAPACITY 默许初始容量是 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化数组,长度为 16 或初始化时供给的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将这个数组赋值给 table,table 是 volatile 的
table = tab = nt;
// 假如 n 为 16 的话,那么这儿 sc = 12
// 其实便是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 为 sc,咱们就当是 12 吧
sizeCtl = sc;
}
break;
}
}
return tab;
}
addCount
addCount办法用于更新hash表中元素的个数,查看是否到达扩容阈值s >= (long)(sc = sizeCtl) ,假如到达阈值则敞开扩容. transfer(tab, null)
transfer是实践扩容的办法,留意transfer的第二个参数,第一个敞开扩容的线程传的是null,后续帮忙扩容的线程传递的都是第一个线程创立的容量为旧数组两倍的新数组
可是在判别扩容阈值的时分还会有一种特殊状况,便是hash表现已处于扩容的进程中,这个时分sizeCtl=-N,N表明帮忙扩容的线程个数,这种状况下线程会进行帮忙扩容 transfer(tab, nt)
//新增元素时,也便是在调用 putVal 办法后,为了通用,添加了个 check 入参,用于指定是否或许会呈现扩容的状况
//check >= 0 即为或许呈现扩容的状况,例如 putVal办法中的调用
private final void addCount(long x, int check){
... ...
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//查看当时调集元素个数 s 是否到达扩容阈值 sizeCtl ,扩容时 sizeCtl 为负数,仍旧建立,一起还得满足数组非空且数组长度不能大于答应的数组最大长度这两个条件才能持续
//这个 while 循环除了判别是否到达阈值然后进行扩容操作之外还有一个效果便是当一条线程完结自己的搬迁使命后,假如调集还在扩容,则会持续循环,持续参加扩容大军,恳求后边的搬迁使命
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// sc < 0 阐明调集正在扩容当中
if (sc < 0) {
//判别扩容是否结束或许并发扩容线程数是否已达最大值,假如是的话直接结束while循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
break;
//扩容还未结束,并且答应扩容线程参加,此刻参加扩容大军中
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//假如调集还未处于扩容状况中,则进入扩容办法,并首先初始化 nextTab 数组,也便是新数组
//(rs << RESIZE_STAMP_SHIFT) + 2 为首个扩容线程所设置的特定值,后边扩容时会依据线程是否为这个值来承认是否为最终一个线程
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
helpTransfer
在put办法中进行状况判别时发现当时hash表正在扩容(链表头节点的hash值为MOVED. else if ((fh = f.hash) == MOVED) ),则进入担任扩容函数helpTransfer,详细操作如下
- 判别当时是否还处于扩容状况 f instanceof ForwardingNode
- 判别扩容是否结束或许并发扩容线程数是否已达最大值,假如是的话直接结束while循环
- 帮忙进入扩容
//扩容状况下其他线程对调集进行刺进、修正、删去、兼并、compute等操作时遇到 ForwardingNode 节点会调用该帮忙扩容办法 (ForwardingNode 后边介绍)
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//判别扩容是否结束或许并发扩容线程数是否已达最大值,假如是的话直接结束while循环
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
扩容
进行实践扩容操作的函数是 transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) ,而进入这个扩容函数的途径有3种
- addCount中发现到达扩容阈值
- Hash表处于扩容状况时其他线程对hash进行增删改等操作是遇到ForwardingNode 节点
- 将链表转化为红黑树时发现数组长度小于64
扩容函数中详细的操作如下:
-
核算步长stride,也便是每个线程单次需求处理Node数组中Node节点个数
假如只要一个CPU则步长就整个数组长度
若多个CPU则每个线程处理 (n >>> 3) / NCPU个Node节点,可是最小不能小于16个节点
-
创立新数组,长度为旧数组的2倍(只要第一个敞开扩容的线程(nextTab == null)才会履行)
-
第一个for循环为当时线程分配节点长度为步长的待处理节点区间
-
遍历分配到的待处理节点,将表头节点上锁
-
判别链表时红黑树节点仍是链表节点
链表节点:
-
找到lastRun节点直接复用
-
遍历链表节点直到lastRun节点结束,将每个节点的hash值与于旧数组长度相&,将成果为1和0的拆分到两个不同的链表中,最终刺进到新数组对应方位
为什么将每个节点的hash值与于旧数组长度相&?
举个例子就了解了:
本来数组长度为16,扩容后数组长度为32
本来数组下标为1处的链表节点的hash值最终5位或许是00001,也或许是10001,他们与旧数组的掩码1111进行&操作后成果都是1所以都放在了下标为1的方位
新数组的掩码为11111,所以要求他们在新数组的方位能够运用hash值与新数组的掩码进行&操作,可是由于之前他们之前在旧数组中方位1现已承认了他们的低4位巨细为1,所以只需求承认第5位进行,将第5位为1的就放到新数组下标17的方位,第5位为0的就仍是放到新数组下标1的方位
红黑树节点
- 找到lastRun节点直接复用
- 运用for循环以链表办法遍历整棵红黑树,仍然将节点拆分为高位和低位两个链表
- 判别高位和低位两个链表的长度是否小于UNTREEIFY_THRESHOLD,假如小于就在新数组中转化为链表,假如不小于则仍是以红黑树结构保存
-
-
在处理完自己分配到的节点区间后会判别自己是不是最终一个处理扩容的线程
- 假如是则会再将整个旧数组遍历一遍,避免发生遗失,完结后将table指向新数组,阈值sizeCtl改成新数组的075倍
- 假如不是则将直接退出(假如扩容未结束还会经过addCount等函数的for循环再次进入扩容函数帮忙扩容)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//核算每条线程处理的桶个数,每条线程处理的桶数量相同,假如CPU为单核,则运用一条线程处理一切桶
//每条线程至少处理16个桶,假如核算出来的成果少于16,则一条线程处理16个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 初始化新数组(原数组长度的2倍)
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//将 transferIndex 指向最右边的桶,也便是数组索引下标最大的方位
transferIndex = n;
}
int nextn = nextTab.length;
//新建一个占位目标,该占位目标的 hash 值为 -1 该占位目标存在时表明调集正在扩容状况,key、value、next 特点均为 null ,nextTable 特点指向扩容后的数组
//该占位目标主要有两个用处:
// 1、占位效果,用于标识数组该方位的桶现已搬迁结束,处于扩容中的状况。
// 2、作为一个转发的效果,扩容期间假如遇到查询操作,遇到转发节点,会把该查询操作转发到新的数组上去,不会堵塞查询操作。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//该标识用于操控是否持续处理下一个桶,为 true 则表明现已处理完当时桶,能够持续搬迁下一个桶的数据
boolean advance = true;
//该标识用于操控扩容何时结束,该标识还有一个用处是最终一个扩容线程会担任从头查看一遍数组查看是否有遗失的桶
boolean finishing = false; // to ensure sweep before committing nextTab
//这个循环用于处理一个 stride 长度的使命,i 后边会被赋值为该 stride 内最大的下标,而 bound 后边会被赋值为该 stride 内最小的下标
//经过循环不断减小 i 的值,从右往左顺次搬迁桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的搬迁使命
//结束这次的使命后会经过外层 addCount、helpTransfer、tryPresize 办法的 while 循环到达持续领取其他使命的效果
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//每处理完一个hash桶就将 bound 进行减 1 操作
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex <= 0 阐明数组的hash桶已被线程分配结束,没有了待分配的hash桶,将 i 设置为 -1 ,后边的代码依据这个数值退出当时线的扩容操作
i = -1;
advance = false;
}
//只要首次进入for循环才会进入这个判别里边去,设置 bound 和 i 的值,也便是领取到的搬迁使命的数组区间
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后做后续作业,将 nextTable 设置为 null,表明扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//每逢一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 建立,阐明该线程不是扩容大军里边的最终一条线程,直接return回到上层while循环
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 阐明这条线程是最终一条扩容线程
//之所以能用这个来判别是否是最终一条线程,由于第一条扩容线程进行了如下操作:
// U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
//除了修正结束标识之外,还得设置 i = n; 以便从头查看一遍数组,防止有遗失未成功搬迁的桶
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//遇到数组上空的方位直接放置一个占位目标,以便查询操作的转发和标识当时处于扩容状况
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//数组上遇到hash值为MOVED,也便是 -1 的方位,阐明该方位现已被其他线程搬迁过了,将 advance 设置为 true ,以便持续往下一个桶查看并进行搬迁操作
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//该节点为链表结构
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//遍历整条链表,找出 lastRun 节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//依据 lastRun 节点的高位标识(0 或 1),首先将 lastRun设置为 ln 或许 hn 链的末尾部分节点,后续的节点运用头插法拼接
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//运用高位和低位两条链表进行搬迁,运用头插法拼接链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//setTabAt办法调用的是 Unsafe 类的 putObjectVolatile 办法
//运用 volatile 办法的 putObjectVolatile 办法,能够将数据直接更新回主内存,并使得其他线程作业内存的对应变量失效,到达各线程数据及时同步的效果
//运用 volatile 的办法将 ln 链设置到新数组下标为 i 的方位上
setTabAt(nextTab, i, ln);
//运用 volatile 的办法将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的方位上
setTabAt(nextTab, i + n, hn);
//搬迁完结后运用 volatile 的办法将占位目标设置到该 hash 桶上,该占位目标的用处是标识该hash桶已被处理过,以及查询恳求的转发效果
setTabAt(tab, i, fwd);
//advance 设置为 true 表明当时 hash 桶已处理完,能够持续处理下一个 hash 桶
advance = true;
}
//该节点为红黑树结构
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo 为低位链表头结点,loTail 为低位链表尾结点,hi 和 hiTail 为高位链表头尾结点
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//同样也是运用高位和低位两条链表进行搬迁
//运用for循环以链表办法遍历整棵红黑树,运用尾插法拼接 ln 和 hn 链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//这儿面形成的是以 TreeNode 为节点的链表
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//形成中心链表后会先判别是否需求转换为红黑树:
//1、假如符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
//2、假如不符合条件则将 TreeNode 转换为一般的 Node 节点,再将该一般链表设置到新数组中去
//(hc != 0) ? new TreeBin<K,V>(lo) : t 这行代码的用意在于,假如本来的红黑树没有被拆分红两份,那么搬迁后它仍旧是红黑树,能够直接运用本来的 TreeBin 目标
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//setTabAt办法调用的是 Unsafe 类的 putObjectVolatile 办法
//运用 volatile 办法的 putObjectVolatile 办法,能够将数据直接更新回主内存,并使得其他线程作业内存的对应变量失效,到达各线程数据及时同步的效果
//运用 volatile 的办法将 ln 链设置到新数组下标为 i 的方位上
setTabAt(nextTab, i, ln);
//运用 volatile 的办法将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的方位上
setTabAt(nextTab, i + n, hn);
//搬迁完结后运用 volatile 的办法将占位目标设置到该 hash 桶上,该占位目标的用处是标识该hash桶已被处理过,以及查询恳求的转发效果
setTabAt(tab, i, fwd);
//advance 设置为 true 表明当时 hash 桶已处理完,能够持续处理下一个 hash 桶
advance = true;
}
}
}
}
}
}
get
get办法由于不涉及到对hash表的修正,所以也不需求上锁
- 经过hash值找到Node数组对应方位的表头节点
- 查看当时状况是否在扩容,并且当时链表现已扩容完结,表头节点为ForwardingNode,假如是则进入新数组中找咱们需求的元素
- 经过遍历链表寻觅需求的节点,假如找到则回来value,没找到则回来null
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判别头节点是否便是咱们需求的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 假如头节点的 hash 小于 0,阐明 正在扩容,或许该方位是红黑树
else if (eh < 0)
// 参阅 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
长处
- 选用Node数组+链表+红黑树,锁的粒度更小,一起并发度能够跟着扩容而添加
- 经过锁Node节点确保一起只要一个线程操作链表,确保线程安全
- 经过多个线程并发帮忙扩容,大幅度进步了扩容的功率
特别感谢
Java全栈常识体系-ConcurrentHashMap详解
ConcurrentHashMap1.8 – 扩容详解
JavaGuide-ConcurrentHashMap 源码剖析