本文主要内容
- ConcurrentHashMap介绍
- ConcurrentHashMap初始化
- ConcurrentHashMap存储流程
- ConcurrentHashMap取出流程
- 总结
1、ConcurrentHashMap介绍
关于Java调集类,已经介绍过很多了,今日介绍完ConcurrentHashMap后,就暂时先告一段落。
ConcurrentHashMap,顾名思义,它也是map接口的完成者,从Concurrent一词来看,并发的,它是线程安全的。Hashtable也是线程安全的map接口完成类,这二者有什么不同吗?
Hashtable源码解析 中论述过Hashtable线程安全的原理,它运用 synchronized 关键字完成线程安全,比方 get 办法和 put 办法:
public synchronized V get(Object key) {}
public synchronized V put(K key, V value) {}
注意到,synchronized 关键字加在非静态办法上,阐明同步锁目标即是 Hashtable 目标自身,只要一个锁。
在线程竞赛激烈的状况下HashTable的功率十分低下。由于当一个线程访问HashTable的同步办法时,其他线程访问HashTable的同步办法时,或许会进入堵塞或轮询状态。如线程1运用put进行增加元素,线程2不光不能运用put办法增加元素,而且也不能运用get办法来获取元素,所以竞赛越激烈功率越低。
假如说HashTable 只要一个锁,那么 ConcurrentHashMap 则开创性的运用了锁分段技能。
假如容器里有多把锁,每一把锁用于锁容器其间一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞赛,然后可以有用的进步并发访问功率,这便是ConcurrentHashMap所运用的锁分段技能
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment承继可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的人物,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包括一个Segment数组,Segment的结构和HashMap相似,是一种数组和链表结构, 一个Segment里包括一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修正时,有必要首先获得它对应的Segment锁。
2、ConcurrentHashMap初始化
先来查看下ConcurrentHashMap中的重要成员变量。
//每个Segment都是一个ReentrantLock锁,一起它内部保存着一个HashEntry数组
final Segment<K,V>[] segments;
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics. (See above
* about use of putOrderedObject.)
* 设置next,注意unsafe的运用,ConcurrentHashMap中很多这种操作
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
//核算 nextOffset ,以运用设置next
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HashEntry比较简略,仅仅内部运用了Unsafe来设置next成员变量,不清楚Unsafe运用的,可以查看自己之前的文章,说说Java的Unsafe类 。
接下来咱们看看ConcurrentHashMap的结构办法。
//DEFAULT_INITIAL_CAPACITY和DEFAULT_CONCURRENCY_LEVEL一样,都是常量,16,而DEFAULT_LOAD_FACTOR为0.75
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//为简略剖析,咱们假定这些参数全是默认值,16,0.75,16
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//当concurrencyLevel值为16时,终究sshift 将为4,而ssize 值等于16
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//依据上边核算结果,segmentShift 等于28,而segmentMask 为15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//由于c值为1,所以cap值便是它的默认值,MIN_SEGMENT_TABLE_CAPACITY,为2
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//创立名为s0的Segment,其间包括一个长度为2的HashEntry数组
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//创立长度为16的Segment数组,而且将数组的第一个元素置为s0
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
为剖析更简略,咱们设ConcurrentHashMap结构函数的各参数为默认值,最终将创立一个长度为16的Segment数组,而且将它赋值给ConcurrentHashMap的成员变量segments 。一起创立一个名为s0的Segment,内部有一个长度为2的HashEntry数组,而且将s0作为segments 第一个元素。
这儿边有些参数可以记下来,等会剖析的时分有用。
3、ConcurrentHashMap存储流程
直接查看它的put办法:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//核算key的哈希
int hash = hash(key.hashCode());
//依据hash值核算此键值对应该刺进到segments数组中的哪个Segment当中
//在结构办法中,核算segmentShift等于28,segmentMask等于15,
//所以,j 就等于hash值的高4位
int j = (hash >>> segmentShift) & segmentMask;
//运用Unsafe获取segments中索引为 j 的元素。
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//假如segments数组中,索引 j 上的元素为null,则先初始化此元素
s = ensureSegment(j);
//终究调用Segment的put办法
return s.put(key, hash, value, false);
}
在ConcurrentHashMap初始化章节中,咱们剖析到,结构函数仅仅初始化了segments数组的第一个元素,所以其它元素或许为null,假如需求往某个元素上存储数据,必定先要将其初始化,咱们查看 ensureSegment 办法
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
//核算索引为k的元素的内存偏移值u,可依据u,运用Unsafe操作此元素
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//假如索引为k的元素为null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//将ss[0]赋值给proto ,
//注意它的命名,将ss[0]当成原型,本认为只能在js中看到原型这个概念,没想到啊。。。
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
//依据proto的HashEntry数组长度,重新结构一个如出一辙长度的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//重新查看索引为k上的元素是否为空,假如为空,则赋值
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//依然是重新查看,为了运用多线程的环境,注意到它运用了getObjectVolatile办法
//假如其它线程初始化此元素成功了,由于是volatile,所以此线程也是可见的
//这在必定程度上确保了此元素不会被重复赋值
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//在while循环中,测验运用CAS这种原子操作完成数组赋值操作,这必定是线程安全的
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
ensureSegment 运用 ss[0] 作为原型目标,重新结构一个新的Segment ,而且最终运用CAS原子操作为数组赋值,确认线程安全。值得一提的是,在检测数组元素是否为null时,运用了getObjectVolatile办法,Volatile将确保线程修正数组后对其它线程的可见性,必定程度为可以确保线程安全。
咱们持续查看Segment内部的put办法。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//由于Segment承继自ReentrantLock,tryLock测验获取锁
//假如没拿到锁,也不会导致堵塞,则履行scanAndLockForPut办法
//scanAndLockForPut将测验获取到锁,而且回来应该刺进的node,在获取必定次数后仍失利后,将堵塞线程
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//依据hash核算键值对应该存储在HashEntry数组的哪个方位上
int index = (tab.length - 1) & hash;
//获取相应索引位上的第1个元素,由于每个索引方位上理论上都是一个链表
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
//第一次循环时e便是first,假如e不为空而且key持平则更新value值
//否则将e赋值为它的next
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//当e为null时,node是scanAndLockForPut函数回来的要播放的entry
//假如node不为null,将node放置在链表头,设置它的next为first
if (node != null)
node.setNext(first);
else
//node为null时也是放置在链表头,它的next也为first,经过结构办法完成
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//假如c大于数组长度警戒值,则数组扩容,反之设置node值即可。
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//最终,开释锁
unlock();
}
return oldValue;
}
刚刚开始看此办法时,会很疑惑,哪里线程安全了,加锁在啥当地?都没看到。其实Segment承继自ReentrantLock,它自身便是一个锁目标。在刚开始的时分,它会测验性地去获取锁,调用 tryLock 办法,此办法也可以获取到锁,但假如没获取到也不会导致线程堵塞而已。假如没有获取到,则调用 scanAndLockForPut 办法,此办法回来一个要刺进的node,而且堵塞线程。之后的流程则比较简略了,假如其它线程开释锁了,当此线程抢到锁今后,则可以持续履行,后续的逻辑较为简略了,经过死循环,找到要刺进node的方位,刺进即可,一起size增大,假如数组长度大于警戒值,则数组扩容。
现在咱们来看看 scanAndLockForPut 办法:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//获取hash对应索引方位上的第一个元素值
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
//retries 某种状况下代表着测验获取锁的次数,初值为-1
int retries = -1; // negative while locating node
//经过循环,不停地获取锁,注意 tryLock 不会导致线程堵塞
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//由于是一个循环,假如first为null或者e不为null,履行了else的状况,e总有一个时分为null
//当e为null时,而且retries 为负数时,结构出要回来的node而且将retries 置为0
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//当retries不为担任时,它才真实代表着恳求锁的次数,每恳求锁一次就自增1
//假如恳求次数大于常量的话,调用lock办法,将真实堵塞线程
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//假如first值已经被其它线程所改动时,重新获取first值并将retries 设为-1
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
scanAndLockForPut 办法,便是不停地恳求锁,一起结构出一个要刺进HashEntry数组的node出来,假如恳求锁失利到达必定次数,就调用lock办法,这种恳求锁的方式会令线程堵塞,等待其它线程开释锁。
存储流程讲到这儿就差不多了,还有一些办法,比方rehash,有爱好的同学们可以自己看看,都差不多,而且已经也讲过HashMap办法的rehash办法了,尽管HashMap非线程安全,但原理相似。
4、ConcurrentHashMap取出流程
直接看ConcurrentHashMap的get办法:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
//核算key的hash值
int h = hash(key.hashCode());
//依据hash值核算它是由哪个Segment所持有,再从此Segment的HashEntry数组中寻找
//本文中有太多这样的Unsafe相关操作了,之前已经讲过Unsafe了,此处不再阐明
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//假如Segment不为null,而且内部的HashEntry数组也不为null
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
//在循环中,查看key是否持平,假如持平则回来
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get办法比较简略,但肯定有同学会提出疑问,get办法没有确保线程安全,没有加锁。get办法获取元素的值经过getObjectVolatile办法,getObjectVolatile办法与volatile关键字相似,volatile关键字有两个作用:
- 确保此变量对一切线程的可见性,这儿的可见性是指当一条线程修正了这个变量的值,新值关于其它线程当即可见
- 制止指令重排序优化,意思便是代码履行次序和代码自身次序共同
volatile关键字在只要单一线程能改动元素的值的状况下十分适用,假如是这种状况,它便是线程安全的,由于单一线程改动元素的值后,其它线程当即可见,读取的时分肯定也是最新的,所以 get 办法仍然是线程安全的。不过,volatile关键字,在多个线程都能改动元素值的状况下,就无法确保线程安全了,更多volatile相关的内容,可参考Java内存模型与线程一文。
回忆前一章论述的,put办法,有必要要获得锁才干更改元素内容,所以,在同一时刻,必定只要一个线程能改动元素内容。
5、总结
ConcurrentHashMap所运用的锁分段技能,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其间一个段数据的时分,其他段的数据也能被其他线程访问。有些办法需求跨段,比方size()和containsValue(),它们或许需求确定整个表而而不仅仅是某个段,这需求按次序确定一切段,操作完毕后,又按次序开释一切段的锁。
总归,ConcurrentHashMap真是一种很特殊的容器,它提出了一种令人眼前一亮的线程安全的计划,读完它的代码,长见识了