干货:多线程-ConcurrentHashMap源码详解

概述:

HashMap是集合中最常用的数据结构之一,由于HashMap非线程安全,因此不能用于并发访问场景。在jdk1.5之前,通常使用HashTable作为HashMap的线程安全版本。HashMap对读写操作进行全局加锁,在高并发的条件下会造成严重的锁竞争和等待,极大地降低了系统的吞吐量。

干货:多线程-ConcurrentHashMap源码详解

优点:

相比于HashTable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的的基础上提供了更好的写并发能力,并且读操作(get)通常不会阻塞,使得读写操作可并发执行,支持客户端修改ConcurrenHashMap的并发访问度,迭代期间也不会抛出ConcurrentModificationException等等。

缺点:

一致性问题:这是当前所有分布式系统都面临的问题。

注意:

ConcurrentHashMap中key和value值都不能为null,HashMap中key可以为null,HashTable中key不能为null。

ConcurrentHashMap是线程安全的类并不能保证使用ConcurrentHashMap的操作都是线程安全的。

返回目录

ConcurrentHashMap实现原理:

ConcurrentHashMap的基本策略是将table细分为多个Segment保存在数组segments中,每个Segment本身又是一个可并发的哈希表,同时每个Segment都是一把ReentrantLock锁,只有在同一个Segment内才存在竞争关系,不同的Segment之间没有锁竞争,这就是分段锁机制。Segment内部拥有一个HashEntry数组,数组中的每个元素又是一个链表。

为了减少占用空间,除了第一个Segment之外,剩余的Segment采用的是延迟初始化的机制,仅在第一次需要时才会创建(ensureSegment实现) 为了保证延迟初始化存在的可见性,访问segments数组以及table数组的元素,均通过volatile访问,主要借助于Unsafe中原子操作getObjectVolatile来实现,此外,segments中segment的写入以及table中元素和next域的写入均使用UNSAFE.putOrderedObject来完成。这些操作提供了AtomicReferenceArrays的功能。

源码解析:

继承关系:

public class ConcurrentHashMap
extends AbstractMap //集合一些基本功能的实现
implements ConcurrentMap, //需要实现几个删除添加操作
Serializable { //序列化

返回目录

成员变量:

 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认容量
static final float DEFAULT_LOAD_FACTOR = 0.75f; //加载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//默认并发度,该参数影响segments数组的长度
static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量
//最大容量,构造ConcurrentHashMap时指定的大小超过该值,则会使用该值替换
//ConcurrentHashMap的大小必须是2的幂,且小于等于1<<30 ,以确保不超过int的范围来索引条目
// 注意,这是集合元素最大容量,而不是线程最大个数
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//table数组的最小长度,必须是2的幂,至少为2,以免延迟构造后立即调整大小
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

//允许的最大segment数量,用于限定构造函数参数concurrent_level的边界
//也就是最大允许的线程数量。
static final int RETRIES_BEFORE_LOCK = 2;
//非锁定情况下调用size和containsValue方法的重试次数,
//避免由于table连续修改导致无限重试,次数超过则对全局加锁。
private transient final int hashSeed = randomHashSeed(this);
//和当前相关联,用于keyhash的随机值,用来减少hash冲突
private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}
final int segmentMask;
//用于索引segment的掩码值(只留下高位),key高位hash码用来选择segment
final int segmentShift; //用来索引segment偏移值
final Segment[] segments; //数组+数组+链表 //segments创建后其容量不可变
transient Set keySet;
transient Set<map.entry>> entrySet;
transient Collection values;
/<map.entry>

返回目录

构造函数:

initialCapacity: 创建ConcurrentHashMap对象的初始容量,即HashEntity的总数量,创建时未指定initialCapacity默认16 ,最大容量为MAXIMUM_CAPACITY.

LoadFactor: 负载因子,用于计算Segment的threshlod域

concurrencyLevel: 即ConcurrentHashMap的并发度,支持同时更新ConccurentHashMap且不发生锁竞争的最大线程数。 但是其并不代表实际并发度,因为会使用大于等于该值的2的幂指数的最小值作为实际并发度,实际并发度即为segments数组的长度。如未指定则默认为16 ;

注意:并发度对ConccurentHashMap性能具有举足轻重的作用,如果并发度设置过小,会带来严重的锁竞争问题;如果并发度设置过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

 @SuppressWarnings("unchecked")
//有参构造
public ConcurrentHashMap(int initialCapacity, //指定容量
float loadFactor, int concurrencyLevel) {
//加载因子和最大线程容量
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
//给定线程容量大于默认最大容量,采用默认最大容量

concurrencyLevel = MAX_SEGMENTS;
/*
* 寻找与给定参数concurrencylevel匹配的最佳数组ssize,
* 必须是2的幂,如果concurrencylevel是2的幂,那么ssize就是
* concurrencyevel,否则concurrencylevel为ssize大于concurrency最小2的幂
*例 : concurrencyLevel为7,则ssize为2^3=8;

*/

// Find power-of-two sizes best matching arguments
int sshift = 0; //记录左移次数,用来计算segment最大偏移值
int ssize = 1; //segment数组长度,也是最大线程数量
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; //采用位运算而不是直接使用concurrenylevel,
//因为此值可能不一定是2的幂
}
this.segmentShift = 32 - sshift; //索引偏移量
this.segmentMask = ssize - 1; //-1是为了将低位二进制全部变1,达到掩码目的
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//给定容量大于默认最大容量,采用默认最大容量
// 防止索引条目超出int值范围
int c = initialCapacity / ssize; //每个table数组的最大容量
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; //table数组最小长度
while (cap < c) //因为必须是2的幂,所以采用此方式,保证cap最终是比c的2的幂函数
cap <<= 1;
// create segments and segments[0]
Segment s0 = //创建segments和第一个segment
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize]; //segments
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

//指定最大容量和加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) { //指定最大容量
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap() { //默认构造
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//由一个集合构建
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
//集合元素数量处以加载因子就是当前集合容量
DEFAULT_INITIAL_CAPACITY), //默认容量,两个去最大值
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); //默认加载因子,默认并发度
putAll(m); //将集合中元素添加到当前集合中
}

返回目录

原子方法:

ConcurrentHashMap主要使用下面几个方法对segments数组和table数组进行读写,并且保证线程安全性。

其主要使用了UNSAFE.getObjectVolatile提供的volatile读语义,UNSAFE.getObjectVolatile提供了Volatile写语义。

使用其好处为:

UNSAFE.getObjectVolatile使得非volatile声明的对象具有volatile读的语义

要使非volatile声明的对象具有volatile写语义则需要借助操作UNSAFE.putObjectvolatile

UNSAFE.putOrderedObject操作的含义和作用是什么:

为了控制特定条件下的指令重排序和内存可见性问题,java编辑器使用了内存屏障的CPU指令来禁止指令重排序。java中volatile写入使用了内存的屏障中的LoadStore屏障规则,对于 Load1->LoadStore->Store2, 在Store2以及后续写入操作被刷出之前,要保证Load1要读取的数据被读取完毕。

volatile的写所插入的storeLoad是一个耗时的操作,因此出现了一个对volatile写的升级版本,利用lazySet方法对性能进行优化,在实现上对volatile的写只会在之前插入storestore屏障,对于这样的Store1 ;StoreStore;Store2,在store2及后续写入操作执行前,保证store1的写入对其它处理器是可见,也就是按顺序写入。 UNSAFE.putOrderedObject正是提供了这样的语义,避免了写写指定重排序,但是不保证内存可见性,因此需要借助volatile读来保证可见性。

ConcurrentHashMap正是利用了这些高性能的原子读写来避免加锁带来的开销。

// 获取给定table的第i个元素,使用volatile读语义。

 @SuppressWarnings("unchecked")
static final HashEntry
entryAt(HashEntry[] tab, int i) {
return (tab == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
/**
* Sets the ith element of given table, with volatile write
* semantics. (See above about use of putOrderedObject.)
*/
//设置给定table的第i个元素,使用volatile写入语义
static final void setEntryAt(HashEntry[] tab, int i,
HashEntry e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
/*
* 通过Unsafe提供的具有volatile元素访问语义的操作获取Segment数组的第j个元素(如果ss为空)
* 注意:因为Segment数组的每个元素只能设置一次(使用完全有序的写入)
* 所以,一些性能敏感的方法只能依靠此方法作为对空读取的重新检查。
*/
@SuppressWarnings("unchecked")
static final Segment segmentAt(Segment[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment) UNSAFE.getObjectVolatile(ss, u);
}
/*
* 根据给定的Hash获取segment
*/

@SuppressWarnings("unchecked")

private Segment segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment) UNSAFE.getObjectVolatile(segments, u);
}
/*
* 根据给定的segment和hash获取table entry 一条链表
*/
@SuppressWarnings("unchecked")
static final HashEntry entryForHash(Segment seg, int h) {
HashEntry[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}

返回目录

HashEntry:内部类,用来存储key-value的数据结构

注意:value和next声明为volatile,是为了保证内存的可见性,也就是保证读取的值都是最新的值,而不会从缓存读取。 写入next域使用volatile写入是为了保证原子性。写入使用原子性操作,读取使用volatile,保证多线程访问的安全性。

 //存储key-value的数据结构,
static final class HashEntry {
final int hash; //hash值
final K key;
volatile V value; //全局可见,用来实现containsValue方法
volatile HashEntry next; //全局可见,netxt节点
//初始化一个节点
HashEntry(int hash, K key, V value, HashEntry next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
设置下一节点
final void setNext(HashEntry n) { //原子性设置下next节点
UNSAFE.putOrderedObject(this, nextOffset, n);
}
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

返回目录

Segment:内部类,用来实行并发操作

segment为ConcurrentHashMap的专用数据结构,同时扩展了ReentrantLock,使得Segment本身就是一把重入锁,方便执行锁定。Segment内部持有一个始终处于一致状态的entry列表,使得读取状态无需加锁(通过volatile读table数组)。调整table大小期间通过复制节点实现,使旧版本的table仍然可以进行遍历。

Segment仅定义需要加锁的可变方法,针对ConcurrentHashMap中相应方法的调用都会被代理到Segment中的方法。这些可变方法使用scanAndLock和scanAndLockForPut在竞争中使用受控旋转(自旋次数受限制的自旋锁) 由于线程的阻塞与唤醒通常伴随着上下文切换,CPU抢占等,都是开销比较大的操作。使用自旋次数受限制的自旋锁,可以提高获取锁的概率,降低线程阻塞的概率,这样可极大提升性能。为什么受限自旋呢?(自旋会不断消耗CPU的时间片,无限制自旋会导致开销增加)所以自旋锁适合多核CPU下,同时线程等待所的时间非常短,若等待时间较长,应该尽早进入阻塞。

成员变量和继承关系:

 static final class Segment 
extends ReentrantLock //继承了重入锁
implements Serializable {

private static final long serialVersionUID = 2249069246763182397L;
//对segment加锁时,在阻塞之前进行的最大自旋次数,
//在多处理器上,使用有限数量的重试来维护在定位节点时获取的高速缓存

//最多自旋64
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//每个segment的table数组,访问数组中元素通过entryAt/setEntryAt提供的
//volatile语义来完成。
transient volatile HashEntry[] table;
//元素数量,只能在锁中或其它volatile读保证可见性之间进行访问。
transient int count;
//当前segment中可变操作发生的次数,put,remove等,可能会溢出32位,
//它为isEmpry()和size()方法中的稳定性检查提供了足够的准确性。
//只能在锁中或者其它volatile读保证可见性之间进行访问。
transient int modCount;
//table大小超过阈值对table进行扩容
transient int threshold; //阈值
final float loadFactor; //负载因子
//构造函数
Segment(float lf, int threshold, HashEntry[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

返回目录

scanAndLockForPut:

while每循环一次,都会尝试获取锁,成功则返回, retries初始值设为-1是为了遍历当前hash对应的桶的链表,找到则停止遍历,未找到则会预创建一个节点;同时,如果头节点发生变化,则会重新进行遍历,直到自旋次数大于MAX_SCAN_RETRIES,使用lock进行加锁,如果失败则会进入等待队列。

为何要遍历一次链表:

scanAndLockForPut使用自旋次数受限制的自旋锁进行优化加锁的方式,此外遍历链表也是一种优化方法,主要是尽可能使当前链表中的节点进入CPU高速缓存,提高缓存命中率,以便获取锁定后的遍历速度更快。 实际上加锁后并没有使用已经找到的节点,因为它们必须在锁定下重新获取,以确保更新的顺序一致性,但是遍历一次后可以更快的进行定位 ,这是一种预热优化方法。 scanAndLock中也使用了该优化方式。

scanAndLock内部实现方式与scanAndLockForPut更简单,scanAndLock不需要预创建节点。因此主要用于remove和replace操作。

 //自旋获取锁
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash); //根据key的hash值找到头节点
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
while (!tryLock()) { //尝试获取锁,成功返回,不成功开始自旋
HashEntry f; // 用于后续重新检查头结点
if (retries < 0) { //第一次自旋
if (e == null) { //结束遍历节点
if (node == null) //创建节点
node = new HashEntry
(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key)) //找到节点,结束遍历
retries = 0;
else
e = e.next; //下一节点
//如果链表有节点,那么如果没有找到,
//那么就会一致遍历(retries=-1)的状态下
//直到节点到达末尾
}
else if (++retries > MAX_SCAN_RETRIES) { //达到最大尝试次数,
lock(); //进入加锁方法,失败则会进入排队,阻塞当前线程
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//头节点发生变化需要重新遍历,说明油新结点加入或者被移除
e = first = f; // re-traverse if entry changed
retries = -1; //自旋次数归0,重新自旋
}
}
return node;
}

返回目录

put操作:

流程:

先对Segment加锁,然后根据(tab.length-1)&hash找到对应的slot

然后根据slot遍历对应的链表,

如果key对应的entry存在(根据onlyIfAbsent)决定是否替换新值

如果key对应的entry不存在,创建新节点头插法插入

若容量超出限制,则判断是否进行rehash;

几个优化:

在 scanAndLockForPut()中:

如果锁能够很快的获取到,有限次数的自旋可防止线程进入阻塞,有助于提升性能。

在自旋期间会遍历链表,希望遍历后的链表被 cache缓存,为实际put操作过程中的链表遍历操作提供性能(预热优化:遍历一次后可以更快进行定位)

并且还会预创建节点;

HashTable[] tab =table 好处:为什么不直接用table操作

table被声明为volatile,为了保证内存的可见性,table上的修改都必须立即更新到主内存,

volatile写实际是具有一定开销的。 由于put是中代码是加锁执行的,锁是既能保证可见性,也能保证原子性的,因此不需要在对table进行volatile写,将其赋给一个局部变量实现编译,运行时优化。

node.setNext(first)也是同样的道理,next同样是被声明为volatile,因此也是使用优化的方式UNSAFE.putOrderedObject进行volatile写入操作。

put已经加锁,为何访问tab元素不直接通过数组索引,而用entryAt(tab,index):

加锁保证了volatile同步语义,但是对table数组中元素的写入使用UNSAFE.putOrderedObject进行顺序写,该操作只是禁止写写重排序指令,不能保证写入后内存的可见性 所以必须使用使用entryAt(tab,index)提供的volatile读获取最新的数据

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null : //获得锁成功,
scanAndLockForPut(key, hash, value); //未成功获取锁,则自旋获取锁,
//如果超过自旋次数,则阻塞

V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash; // 得到桶位置
HashEntry first = entryAt(tab, index);
//volatile读语义获取index的头节点

for (HashEntry e = first;;) { //遍历链表

if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { //找到节点
oldValue = e.value;
if (!onlyIfAbsent) { //是否替换旧值
e.value = value;
++modCount;//被修改次数
}
break;
}
e = e.next; //不是,继续遍历
}
else { //e==null //没有找到key值
if (node != null) //scanAndLockForPut只有找不到节点才会不返回null
node.setNext(first);
//将node设置为头节点,此处可以看出其为头插法链表插入元素

else //处理tryLock成功返回null值,没有找到节点的情况
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY) //当前table中元素数量大于阈值,
//需要重新进行rehash
rehash(node); //重新hash
else //没有大于阈值
setEntryAt(tab, index, node);//将node头节点插入table中
++modCount; //被修改次数
count = c; //元素个数
oldValue = null; //旧址=null
break;
}
}
} finally {
unlock();
}
return oldValue;
}

返回目录

rehash操作:

rehash主要的作用是扩容,将扩容前table中的节点重新分配到新table中。由于table的capacity都是2的幂,按照2的幂扩容为原来的一倍,扩容前在slot i 中的元素,扩容后要么在slot i中或者 i+扩容前table的capacity的solt中,这样使得只需要移动原来桶中的部分元素即可将所有节点分配到新table中。

为了提高效率,rehash首先找到第一个后续所有节点在扩容后index都保持不变的结点,将这个结点加入扩容后的table的index对应的slot中,然后将节点之前的所有节点重排即可。

	 //重新进行扩容hash,这个操作是已经在put加锁的
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
HashEntry[] oldTable = table; //旧的table数组
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1; //二倍扩容
threshold = (int)(newCapacity * loadFactor); //新阈值
HashEntry[] newTable = //新数组长度
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;

//将旧数组中所有节点复制到新数组中,对旧数组中链表最后同index的进行复用(提高效率)
for (int i = 0; i < oldCapacity ; i++) {//遍历旧数组
HashEntry e = oldTable[i];
if (e != null) { //当前数组节点不为空
HashEntry next = e.next; //当前节点next节点
int idx = e.hash & sizeMask;//新数组角标
if (next == null) // Single node on list
newTable[idx] = e; //旧table数组此角标只有一个节点
else { //当前角标链表不止一个节点
// Reuse consecutive sequence at same slot
HashEntry lastRun = e; //链表头节点
int lastIdx = idx;
for (HashEntry last = next; //对链表进行遍历
//找到后续节点新index不变的节点
last != null;
last = last.next) {
int k = last.hash & sizeMask; //当前节点重新hash后的角标
if (k != lastIdx) { //当前节点k与lastIdx不同则进行替换
//目的是找到该链表最后相同新角标的节点,这样就可以
//最后一段链表一次性加入到新index
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
//后续节点新index不变节点前的所有节点均需要重新创建分配

for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k]; //当前角标头节点
newTable[k] = new HashEntry(h, p.key, v, n); //头插法
}
}
}
}
//找到当前要插入节点对应的角标
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]); //头插法插入
newTable[nodeIndex] = node; //设置为新的头节点
table = newTable;
}

返回目录

scanAndLock操作:自旋并获取锁

 private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry first = entryForHash(this, hash);
//通过volatile读获取指定坐标的链表
HashEntry e = first;
int retries = -1;

while (!tryLock()) { //自旋尝试获取锁
HashEntry f;
if (retries < 0) {
if (e == null || key.equals(e.key)) //链表为空||找到key对应节点
retries = 0;
else
e = e.next; //遍历链表
}
else if (++retries > MAX_SCAN_RETRIES) { //大于最大自旋次数,阻塞线程
lock();
break;
}
else if ((retries & 1) == 0 && //头结点发生变化,重新自旋并且遍历链表
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}

返回目录

remove操作:

final V remove(Object key, int hash, Object value) {
if (!tryLock()) //未成功获取锁,自旋获取
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry[] tab = table; //已经加锁,不需要volatile写
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
//volatile读,获取index角标的链表
HashEntry pred = null; //前驱

while (e != null) {
K k;
HashEntry next = e.next;
if ((k = e.key) == key || //找到要删除节点
(e.hash == hash && key.equals(k))) {
V v = e.value;
//
if (value == null || value == v || value.equals(v)) {
if (pred == null) //要删除的是头节点
setEntryAt(tab, index, next);
else //非头节点
pred.setNext(next);
++modCount; //修改次数+1
--count; //元素数量--
oldValue = v; //旧值
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}

返回目录

replace操作:

boolean replace(K key, int hash, V oldValue, V newValue):根据旧值替换新值,若旧值发生变化,则返回false

 final boolean replace(K key, int hash, V oldValue, V newValue) {
if (!tryLock()) //尝试获取锁,不成功则自旋获取
scanAndLock(key, hash);
boolean replaced = false;

try {
HashEntry e;
//遍历给定角标链表
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key || //找到key值
(e.hash == hash && key.equals(k))) {
//典型CAS操作
if (oldValue.equals(e.value)) { //旧址相同则替换为新值
e.value = newValue;
++modCount;
replaced = true;
}
break; //说明oldvValue已经被别的线程修改
}
}
} finally {
unlock();
}
return replaced;
}

final V replace(K key, int hash, V value) :直接进行替换,并返回旧值

final V replace(K key, int hash, V value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry e;
//遍历链表
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
//找到直接进行替换
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
e.value = value;
++modCount;
break;

}
}
} finally {
unlock();
}
return oldValue;
}

几个常用操作源码解析:

返回目录

get操作: 不需要进行加锁,只关心一个segment;非线程安全,得到的数据可能是过时数据。

public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key); //对key进行hash
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //得到对应segment
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) { //table数据存在
//遍历数组
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//找到指定key
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value; //返回对应value值
}
}
return null;
}

返回目录

boolean isEmpty():判断集合是否为空
public boolean isEmpty() {

long sum = 0L;
final Segment[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) { //遍历sgment数组
Segment seg = segmentAt(segments, j); //得到对应table
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount; //可变操作次数相加
}
}
//有过修改痕迹,再次遍历
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L) //一加一减不为0 说明这期间有被修改过,所以不为null
return false;
}
return true;

返回目录

int size() 得到集合元素个数:

size和containsValue与put和get最大的区别在于,都需要遍历所有的Segment才能得到结果。

这两个源码实现都是先给三次机会。不lock所有的segment,比较相邻两次的modCount和,如果相同则说明在这之间整个集合是没有进行更新操作的。得到的size是正确的。如果三次循环之后仍然没有得到正确答案。那么就对所有的segment进行加锁。计算完毕后在进行解锁。

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
//为true表示size溢出32
long sum; // sum of modCounts //modCount和
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
//第一次迭代不计入重试,所以会重试三次
try {
for (;;) {
//前三次(-1,0,1) 进行不加锁统计size,如果得不到准确值则第四次加锁统计
if (retries++ == RETRIES_BEFORE_LOCK) { //默认table大小为2
//sgments全部加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//遍历segment
for (int j = 0; j < segments.length; ++j) {
Segment
seg = segmentAt(segments, j);
if (seg != null) { //当前table不为空
sum += seg.modCount;
int c = seg.count; //当前segment元素个数
if (c < 0 || (size += c) < 0)
overflow = true; //size溢出
}
}
if (sum == last) //两次统计的可变操作修改次数相同,说明全部加锁成功
//并且获得准确size
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
// 三次以后才进行加锁,因此此时需要解锁
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size; //溢出返回最大值,否则返回size
}

返回目录

boolean containsValue(Object value) /boolean contains(Object value) :集合中是否包含此值
public boolean containsValue(Object value) {
// Same idea as size()
if (value == null) //hashMap不允许value为null
throw new NullPointerException();
final Segment[] segments = this.segments;
boolean found = false; //是否包含
long last = 0;
int retries = -1;
try {
outer: for (;;) {
//三次不加锁尝试寻找,如果未成功则进行加锁寻找

if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
//遍历segments
for (int j = 0; j < segments.length; ++j) {
HashEntry[] tab;
Segment seg = segmentAt(segments, j);

if (seg != null && (tab = seg.table) != null) {
//遍历table
for (int i = 0 ; i < tab.length; i++) {
HashEntry e;
//遍历链表
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) { //找到value
found = true;
break outer; //跳出死循环
}
}
}
sum += seg.modCount; //可变操作次数相加
}
}
if (retries > 0 && sum == last) //两次操作中间没有发生可变操作次数变化
//说明value值不存在
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) { //大于2 需要解锁
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}

返回目录

弱一致性:

ConcurrentHashMap是弱一致性的,它的get/containsKey/clear/iterator都是弱一致性的。

get和containsKey都是无锁操作,均通过getObjectVolatile()提供的原子读来获得Segment以及对应的链表,然后遍历链表。由于遍历期间其它线程可能对链表结构做了调整 ,所以返回的可能是过时数据。


分享到:


相關文章: