Java并发源码分析ConcurrentHashMap线程集合
简介
ConcurrentHashMap
是一个线程安全的集合,底层是通过对指定索引位置上的节点进行加锁,而不是对整个数组加锁,当一个线程对指定索引位置上的节点加了锁之后,其它线程就不能对该索引位置上的节点进行操作,但可以对其它的索引位置上的节点操作,ConcurrentHashMap与HashMap
有许多相似的地方,ConcurrentHashMap
只是在一些产生线程安全的地方加了锁,如果对HashMap
了解的话,再来看ConcurrentHashMap
就简单许多。
常量
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//节点正在移动
static final int MOVED = -1;
//节点已被树化
static final int TREEBIN = -2;
static final int RESERVED = -3; // hash for transient reservations
//32位二进制中正数最大的值,主要用于对key的hash值进行二次运算
static final int HASH_BITS = 0x7fffffff;
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Node<K, V>[] table;
private transient volatile Node<K, V>[] nextTable;
private transient volatile long baseCount;
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
构造方法
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//校验指定的初始容量大小是否大于等于最大容量大小的一半
//如果大于等于最大容量大小的一半则在后续第一次添加元素的时候使用最大容量大小创建数组
//如果小于最大容量大小的一半则根据指定容量来计算最接近指定容量大小并大于指定容量的2的次方
//此处有一个疑问,指定容量大小如果等于最大容量大小的一半则说明指定容量大小是2的29次方,为什么不使用指定容量大小,而是使用最大的容量大小?
//从tableSizeFor方法也能看出,如果指定的初始容量大小本来就是2的次方的话并不会使用指定的初始容量大小来创建数组对象
//而是使用大于指定初始容量大小的2的次方来创建数组对象,比如指定的初始容量大小为16,那就会使用32来创建数组对象
//在HashMap中如果指定的初始容量大小本来就是2的次方的话则会使用指定的初始容量大小来创建数组对象
//并不会像ConcurrentHashMap一样会取比当前指定的2的次方的初始容量大小大的2的次方的容量大小来创建数组对象
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//将计算后的容量大小赋值给sizeCtl等待初始化
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
//使用默认的初始容量大小等待初始化
this.sizeCtl = DEFAULT_CAPACITY;
//初始化新的数组对象并将指定集合中的元素添加到新的数组对象中
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
//初始容量大小小于并发数则使用并发数来创建数组对象
initialCapacity = concurrencyLevel;
//通过计算获取到扩容的阈值
long size = (long) (1.0 + (long) initialCapacity / loadFactor);
//如果阈值大于等于最大容量大小则使用最大容量大小来创建新的数组对象
//如果阈值小于最大容量大小则使用阈值来计算获取到大于阈值并是2的次方的值
//使用计算后的值来创建新的数组对象
int cap = (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size);
//将计算后的容量大小赋值给sizeCtl等待初始化
this.sizeCtl = cap;
}
在构造方法中并没有初始化数组对象,而是在第一次添加元素的时候才会进行初始化,根据指定的初始容量大小和指定的负载因子进行初始化,如果没有指定初始容量或负载因子则使用默认的,而数组的大小必须是2的次方,最大的数组大小则是2的30次方,如果大于2的30次方的话,数组则不会将继续进行扩容,当数组的容量大小已到达最大时,后续添加的元素则会挂载到链表或红黑树上,如果说指定的初始容量不是2的次方时则会调用tableSizeFor
方法计算出大于指定初始容量并且是2的次方的值,如果说指定的初始容量本身就是2的次方时通过计算出的值还是本身。
此处就有一个疑问,本身的值不是2的次方的时候会通过计算获取到大于指定的初始容量并且是最接近指定的初始容量的2的次方的值,如果本身的值就是2的次方的时候则会取本身的值,但是在第二个构造方法中的tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1))
方法是有问题的,假如initialCapacity 为16
,按理来说最终数组初始化的容量大小应该也为16,但是经过该方法计算最终的数组初始化的容量大小为32,虽然该构造方法有问题,但是并不影响。
put
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null)
//key和value不能为空
throw new NullPointerException();
//将key的hash值的高16位二进制参加运算获取到新的hash值
int hash = spread(key.hashCode());
//指定索引位置上节点的数量
int binCount = 0;
for (Node<K, V>[] tab = table; ; ) {
//数组中指定索引的节点
Node<K, V> f;
//n 数组长度
//i key所在的数组索引位置
//fh 指定索引的节点的hash值
int n, i, fh;
//校验数组是否已经初始化
if (tab == null || (n = tab.length) == 0)
//数组未初始化则初始化数组
//并返回初始化完成的数组对象
tab = initTable();
//通过数组索引长度与key的hash值进行与运算获取到指定索引位置上的元素节点,并校验元素节点是否为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//指定索引位置上的节点不存在则使用传递进来的key和value创建新的节点
//并将新的节点放置到指定索引位置上
if (casTabAt(tab, i, null,new Node<K, V>(hash, key, value, null)))
//节点添加成功则退出循环
break;
//校验索引位置上的节点是否正在移动
} else if ((fh = f.hash) == MOVED)
//如果索引位置上的节点正在移动则帮忙移动节点到新的数组中
//协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//使用锁锁住指定索引位置上的节点
//这样其它线程就不能对该索引位置上的节点进行操作
//其它线程可以对其它索引位置上的节点进行操作
synchronized (f) {
//校验指定索引位置上的节点是否被更改
if (tabAt(tab, i) == f) {
//校验节点的hash值是否大于等于0
//如果节点的hash值大于等于0则说明该索引位置上只有一个节点或节点是一个链表节点
if (fh >= 0) {
binCount = 1;
//从指定索引位置上的节点开始遍历
for (Node<K, V> e = f; ; ++binCount) {
K ek;
//校验遍历到的节点的hash值以及key是否与传递的key和计算的hash值相同
//如果相同则根据onlyIfAbsent参数来决定是否需要使用新的value替换旧的value
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
//获取旧的value
oldVal = e.val;
if (!onlyIfAbsent)
//使用新的value替换旧的value
e.val = value;
break;
}
//遍历到的节点的hash值以及key不相同
//则判断当前遍历到的节点是否是链表上的最后一个节点
//如果不是最后一个节点则继续遍历
//如果是最后一个节点则为key和value创建一个新的节点
//并将原尾节点的next指针指向新创建的节点
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key, value, null);
break;
}
}
//校验节点是否是一个红黑树节点
} else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
//将指定的key和value封装成一个树节点并添加到红黑树中
//如果红黑树中已经存在相同key的节点则根据onlyIfAbsent参数来决定是否使用新的value替换红黑树中的节点的value
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,value)) != null) {
//获取节点中的旧值
oldVal = p.val;
if (!onlyIfAbsent)
//替换节点中的value
p.val = value;
}
}
}
}
if (binCount != 0) {
//校验索引位置上的节点的数量是否到达树化的阈值
//如果该索引位置上的节点本来就是树节点则不会继续树化
//只有当索引位置上的节点是链表节点并且节点的数量大于等于8的时候才会树化
if (binCount >= TREEIFY_THRESHOLD)
//索引位置上的节点数量已经到达树化的阈值
//则将该索引位置上的所有节点转换为树节点
treeifyBin(tab, i);
if (oldVal != null)
//返回被替换的旧值
return oldVal;
break;
}
}
}
//更新集合中元素节点的数量,并校验是否需要扩容
addCount(1L, binCount);
return null;
}
其实putval
中的逻辑并不难,主要还是putval
中调用的其它一些方法比较难以理解,我们就一步步的来看。
首先如果key
和value
为null
的情况下是不能添加到当前集合中的,再看spread
方法,该方法是将key
本身的hash
值的高16位参加运算获取到新的hash
值,如果数组的长度的二进制是在低16位二进制中就会导致key
高16位的二进制没有参加运算,容易导致运算后的key
的索引位置发生冲突,所以需要将高16位二进制无符号右移16位与原hash
值的二进制进行运算减少索引冲突。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
为什么要与HASH_BITS进行与运算呢? HASH_BITS
是32位二进制中最大的正整数,与HASH_BITS
进行与运算其实就是让计算后的hash
值保持在正整数。
后续整个代码都被for
循环包裹着,只有当添加元素或替换元素成功时才会退出,而for
循环里总共有4个大的分支。
1.数组未初始化,此时就需要调用initTable
方法初始化数组对象,之前也说过数组对象是在第一次添加元素的时候初始化的,这是一种懒加载的思想,当你使用到的时候才会去初始化。
initTable
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
//校验数组是否未被初始化
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//数组长度小于0则说明其它线程正在准备初始化数组
//当前线程则需要让出cpu让其它线程初始化数组
Thread.yield();
//通过cas操作将sizeCtl修改成-1,告知其它线程当前线程正在准备初始化数组
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//如果指定了初始容量则使用指定的初始容量来创建数组
//如果没有指定则使用默认的初始容量大小16来创建数组
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//初始化数组
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
//将初始化的数组对象赋值给当前集合中的数组对象
//并赋值给tab,下一次循环校验则会发现tab不为空则会退出循环并退出当前方法
table = tab = nt;
//计算数组扩容的阈值
sc = n - (n >>> 2);
}
} finally {
//如果try中的代码块执行失败时sizeCtl则是原先数组的长度
//执行成功时sizeCtl则是扩容的阈值
sizeCtl = sc;
}
break;
}
}
//数组已被初始化返回数组
return tab;
}
我们来看initTable
方法中的代码,首先whlie
循环中校验了数组是否还没被初始化,当数组未被初始化的时候,当前线程则会去尝试初始化数组,直到数组被当前线程或者其它线程初始化成功。
先看(sc = sizeCtl) < 0
这段代码,sizeCtl
小于0分为两种情况,sizeCtl为-1
的时候则说明当前有其它线程正在初始化数组对象,sizeCtl小于-1
的时候则说明当前数组正在进行扩容,从当前情况来看数组正在进行扩容的这种情况是不大可能存在的,所以说只有可能其它线程正在进行初始化数组,当其它线程正在初始化数组对象时,那当前线程则需要让出cpu
的执行权,等待初始化数组对象的线程执行完成。
再看后一个判断,该判断是一个cas
操作,将sizeCtl
修改成-1,告知其它线程当前线程正在初始化数组,如果当前线程修改失败,则说明有其它线程抢先修改了sizeCtl
的值,那当前线程则需要重新走while
循环来查看数组是否初始化完成,如果没有完成那需要让出cpu
的执行权,如果数组已经被其它线程初始化完成,那当前线程则可以对数组进行操作,如果说当前线程执行cas
操作成功,则会先查看是否指定了数组初始的容量大小,如果指定了则使用指定的初始容量大小来创建数组对象,如果没有指定则使用默认的初始容量大小16来创建数组对象,该指定的初始容量并非是真正的初始容量,如果说指定的初始容量是2的次方则会使用该容量大小来创建数组对象,如果说指定的初始容量不是2的次方的时候则会通过计算来获取大于这个指定的初始容量并且是最接近该初始容量的2的次方,并使用计算后的值来初始化数组对象,当初始化完成数组对象时则会计算下一次数组扩容的阈值。
在initTable
方法中我们看到代码中使用一个变量sizeCtl
来区分多种情况,其实sizeCtl
一共分为4种情况,那我们先了解一下分为那4种。
sizeCtl = N && table = null
:当数组还未被初始化的时候,sizeCtl
则是数组初始化的容量大小sizeCtl = N
:当数组已经被初始化了的时候,sizeCtl
则是数组下一次扩容的阈值sizeCtl = -1
:说明当前数组对象正在被初始化sizeCtl = -N
:当前数组正在进行扩容,-N
的低16位二进制转换为十进制数M
,M-1
则是当前正在进行扩容的线程数量,-N
的高16位二进制则是本次扩容的标记
2.通过spread
方法计算后的hash
值与数组索引长度进行与运算获取到当前元素节点所在的索引位置,并调用tabAt
方法根据索引位置计算该索引中的元素节点在内存中的偏移量,再根据偏移量从table
数组中获取元素节点,如果该元素节点为空则根据指定的key
和value
创建一个新的元素节点并调用casTabAt
方法根据cas
操作将新的元素节点添加到数组中指定的索引位置上。
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
//获取数组中第一个元素在内存中开始的偏移量
ABASE = U.arrayBaseOffset(ak);
//获取数组中元素在内存中的增量地址
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//获取增量地址在32位二进制情况下高位连续包含0的个数
//使用31减去包含0的个数获取到需要位移的次数
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
在看tabAt
方法和casTabAt
方法之前,我们先了解一下ConcurrentHashMap
中静态代码块中的一些代码,通过getUnsafe()
方法获取到Unsafe
类,Unsafe
类是一个不安全的类,能通过该类直接对内存中的一些资源进行操作,U.objectFieldOffset
方法能获取到指定名称变量在内存中的偏移量,后续能根据该偏移量直接对变量进行操作,U.arrayBaseOffset
方法获取指定类型数组中第一个元素在内存中的偏移量,ABASE
则是数组中第一个元素在内存中开始的偏移量,U.arrayIndexScale
方法获取指定类型数组中元素在内存中的增量地址,ASHIFT
则是通过增量地址在32位二进制的情况下高位连续为0的个数,并使用31减去连续为0的个数获取到需要位移的次数。
tabAt
static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
//根据指定的索引位置计算该索引中的节点在内存中的偏移量
//并从tab数组中根据索引所在的内存的偏移量获取节点
return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
}
i << ASHIFT
获取到第一个索引位置到索引i
的增量地址,再加上ABASE
即可获取到索引i在内存中的偏移量,然后在根据偏移量从volatile
类型的tab
数组中获取节点。
casTabAt
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i,Node<K, V> c, Node<K, V> v) {
//根据指定的索引位置计算该索引中的节点在内存中的偏移量
//并根据偏移量获取节点,并将获取到的节点与传递的节点c进行比较
//如果两个节点相同则使用节点v进行替换
return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
}
根据指定的索引位置的内存偏移量获取节点,如果获取到的节点与传递的节点c
相同则使用节点v进行替换,该方法用于添加一个新的节点或替换节点,添加新的节点的时候,只需要将节点c
设置为null
,替换节点的时候,节点c
则需要设置为原索引位置上的节点。
3.当指定索引位置上的节点不为null
的时候,但是节点的hash
值为-1,此时说明有其它线程正在对数组进行扩容,并且指定索引位置上的节点已经转移到了新的数组中,此时当前线程就需要调用helpTransfer
方法协助其它线程进行扩容,当协助完成时则会对新的数组进行操作。
helpTransfer
final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
Node<K, V>[] nextTab;
int sc;
//f instanceof ForwardingNode 当前节点正在转移
//nextTab = ((ForwardingNode<K, V>) f).nextTable) != null 扩容的新数组是否不为空
//如果条件都为true则说明有线程正在扩容中,当前线程则需要协助扩容
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
//扩容标记
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
//(sc >>> RESIZE_STAMP_SHIFT) != rs 校验扩容标识是否一致,不一致则说明不是同一个数组
//sc == rs + 1 应该是sc == (rs << 16) + 1 校验扩容是否完成
//sc == rs + MAX_RESIZERS 应该是sc == (rs << 16) + MAX_RESIZERS 校验扩容的线程的数量是否到达最大
//transferIndex <= 0 旧数组中还有多少没有转移的索引节点长度,如果小于等于0则说明已经转移完成则不需要协助
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//扩容线程加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//协助扩容
transfer(tab, nextTab);
break;
}
}
//扩容完成则返回新的数组,对新的数组进行操作
return nextTab;
}
//当前节点还未被转移或新的数组还没有被初始化则对旧数组进行操作
return table;
}
我们来看一下协助扩容的方法,局部变量nextTab
和sc
则是最新的扩容数组和扩容标识以及扩容的线程数量。
tab != null和f instanceof FowardingNode
:该条件只是为了代码的健壮性,tab !=null
从代码来看,既然当前线程执行到了协助扩容的方法,那tab
肯定是不为null
的,再看f instanceof FowardingNode
条件,在之前的判断中f的节点的hash值为-1
,-1就已经代表节点已经被转移
,只有当节点已经转移的时候才会执行到当前方法中,此时你就可以发现之前为什么要将节点的hash
值控制在正整数,如果说没有控制在正整数的时候,当节点的hash值恰巧为-1
的时候,就会导致线程执行一些没有必要的代码,而且在后续也不好区分链表节点。(nextTab = ((ForwardingNode<K, V>) f).nextTable) != null
:该条件则是获取最新的扩容数组并校验最新的数组是否不为null
。
当上述条件都为true
的时候则会通过resizeStamp
方法使用旧数组的长度生成一个扩容标识,再看while
循环中的条件语句。
nextTab == nextTable
:校验当前线程协助扩容的数组是否是最新的数组,如果此时nextTable == null
的话则说明已经完成了扩容,那当前线程则不需要协助扩容了,只需要执行自己该执行的操作,如果说当前线程协助扩容的数组不是最新的扩容数组的时候,则会从nextTable
中继续获取f
节点,再从f
节点中获取nextTable
,继续校验是否是最新的扩容数组。
table == tab
:校验扩容是否完成。
(sc = sizeCtl) < 0
:校验是否正在扩容中。
上述条件都为true
的时候则说明数组正在进行扩容中,我们先来看一些在哪些条件下当前线程不会去协助扩容。
(sc >>> RESIZE_STAMP_SHIFT) != rs
:校验扩容标识是否一致,不一致则说明不是对同一长度的数组进行的扩容。
sc == rs + 1
:该条件应该是sc == (rs << 16) + 1
,校验扩容是否完成,在扩容开始的时候sizeCtl为(rs << 16) + 2
,这个2代表着开始扩容的线程以及最后一个扩容完成的线程,在transfer
扩容方法中,每一个线程完成了所否则的区域的时候都会将sizeCtl
的值减1,当最后一个线程执行完成时减1,此时sizeCtl
的值就等于(rs<< 16) + 1
则说明整个数组扩容都已经完成了,已经不需要线程协助了。
sc == rs + MAX_RESIZERS
:该条件应该是sc == (rs << 16) + MAX_RESIZERS
,校验当前正在扩容的线程的数量是否到达最大值(65535)。
transferIndex <= 0
:校验旧数组中还未完成转移的索引节点长度是否小于等于0,小于等于0则说明已经完成了转移则不需要协助。
当上述条件都不成立的时候则说明数组扩容需要协助,此时通过cas
操作将扩容线程的数量加1并调用transfer
方法来协助扩容,该扩容方法放在后面讲解。
4.当上面3个分支都不成立的时候,则说明该索引位置上的节点不为空并且没有被转移,此时就需要使用synchronized
对指定索引位置上的节点加锁,然后根据节点的类型来执行相关的操作。
- 链表节点:链表节点的
hash
值都是大于等于0的,当指定索引位置上的节点是一个链表节点的时候,则会从链表的头节点开始遍历并且记录链表中节点的数量,当链表中的节点的key
与指定的key
相同则根据onlyIfAbsent
参数来决定是否需要替换value
,如果没有相同的key
时,则会为指定的key
和value
创建一个新的节点,并将新的节点添加到链表的尾部。 - 红黑树节点:红黑树的头节点的
hash
值固定为-2,此处直接校验的是头节点的类型是否是红黑树,当指定索引位置上的头节点是一个红黑树节点,则会调用头节点的putTreeVal
方法将指定的key
和value
添加到红黑树中,如果说指定的key
已经存在在红黑树中则会根据onlyIfAbsent
参数来决定是否需要替换value
,此处为什么是调用头节点的putTreeVal
方法呢?看到后面链表转换为红黑树节点的时候就能明白了。
putTreeVal
final TreeNode<K, V> putTreeVal(int h, K k, V v) {
Class<?> kc = null;
boolean searched = false;
for (TreeNode<K, V> p = root; ; ) {
int dir, ph;
K pk;
if (p == null) {
first = root = new TreeNode<K, V>(h, k, v, null, null);
break;
} else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0) {
if (!searched) {
TreeNode<K, V> q, ch;
searched = true;
if (((ch = p.left) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
dir = tieBreakOrder(k, pk);
}
TreeNode<K, V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
TreeNode<K, V> x, f = first;
first = x = new TreeNode<K, V>(h, k, v, f, xp);
if (f != null)
//将新添加的树节点设置为链表的头节点
f.prev = x;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
if (!xp.red)
x.red = true;
else {
//获取写锁,如果获取写锁失败则会将当前线程挂起进行等待
//如果有线程正在读,那当前写的线程就要挂起进行等待
//当读的线程执行完成之后就会去唤醒挂起的线程
//如果说当前线程在写,有线程准备读
//那读线程并不会去读取红黑树中的节点
//而是读取链表的节点,因为TreeBin对象中包含了红黑树的根节点并且也包含了链表的头节点
//如果发生写写的问题呢?
//其实并不会发生写写的问题,因为当前整个方法都被外部的synchronized锁住了
lockRoot();
try {
//平衡红黑树中的节点
root = balanceInsertion(root, x);
} finally {
//释放锁
unlockRoot();
}
}
break;
}
}
assert checkInvariants(root);
return null;
}
putTreeVal
和balanceInsertion
方法在之前的HashMap
中已经讲过,此处就不再讲解,我们主要还是看一下lockRoot
和unlockRoot
方法。
锁状态
//锁状态
volatile int lockState;
//写锁
static final int WRITER = 1; // set while holding write lock
//等待
static final int WAITER = 2; // set when waiting for write lock
//读锁
static final int READER = 4; // increment value for setting read lock
lockRoot
private final void lockRoot() {
//尝试加写锁
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
//加写锁失败则尝试将线程挂起进行等待
contendedLock();
}
通过cas
操作尝试加写锁,当加写锁失败则会调用contendedLock
方法将线程挂起进行等待。
contendedLock
private final void contendedLock() {
boolean waiting = false;
for (int s; ; ) {
if (((s = lockState) & ~WAITER) == 0) {
//没有线程对TreeBin对象下的红黑树加锁
//当前线程则可以对TreeBin对象下的红黑树尝试加锁
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
if (waiting)
//将等待线程清空
waiter = null;
return;
}
//(s & WAITER) == 0 执行当前判断语句的时候则说明有线程对红黑树加了锁
//如果(s & WAITER)等于0则说明没有线程在等待加锁
//此时当前线程就可以进行等待
} else if ((s & WAITER) == 0) {
//将锁状态的32位二进制中的第2位设置为1
//代表当前有一个线程正在等待加锁
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
//将TreeBin中的等待线程设置为当前线程
waiter = Thread.currentThread();
}
} else if (waiting)
//当前线程已经设置成了等待状态了,此时就需要将线程挂起
LockSupport.park(this);
}
}
contendedLock
方法中总共有3个分支,我们就一个一个的开始讲解。
((s = lockState) & ~WAITER) == 0
:校验是否有线程对指定索引位置上的TreeBin
对象中的红黑树进行加锁。
~WAITER = -3
,32位二进制为 1111 1111 1111 1111 1111 1111 1111 1101
WRITER = 1
,1为写锁,32位二进制为 0000 0000 0000 0000 0000 0000 0000 0001
READER = 4
,4为读锁,32位二进制为 0000 0000 0000 0000 0000 0000 0000 0100
使用最新的锁状态与~WAITER
进行与运算,当加了读锁或写锁时,读锁和写锁的二进制标识与~WAITER
二进制标识都为1的情况下则说明已经有线程加了锁,如果有线程加了锁,那当前线程则会执行后续的代码来将自己挂起并等待。
(s & WAITER) == 0
:执行到当前语句的时候就说明已经有线程加了锁,此时就需要校验一下是否有线程挂起并在等待,从代码上来看,并不会出现多个线程同时等待,只会存在一个等待的线程,如果没有线程在等待,则会通过cas
操作将锁状态加上一个有线程在等待的标识,并将TreeBin
对象中的等待线程设置为当前线程。
waiting
:当有线程加了锁,并且将当前线程设置为了等待的线程,此时就需要将当前线程挂起。
为什么说只会存在一个等待的线程呢?
读锁与写锁本就互斥,在get
方法中,获取红黑树中的节点的时候则会加上一个读锁,此时写锁就需要挂起进行等待,当读锁释放完成之后就会唤醒加写锁的线程,如果已经有线程加了写锁,那get
方法则不会从红黑树中获取节点,而是从TreeBin
对象中记录的链表的头节点开始遍历进行匹配,那会不会发生多个线程同时去加写锁呢?其实并不会,因为加写锁的方法的外部整个都被synchronized
锁住了,所以并不会存在多个线程同时加写锁,也不会存在多个等待的线程。
链表添加节点和红黑树添加节点都已经讲解完毕,此时就要看一下后续代码,链表节点在什么情况下会转换为红黑树以及是怎么转换的。
if (binCount != 0) {
//校验索引位置上的节点的数量是否到达树化的阈值
//如果该索引位置上的节点本来就是树节点则不会继续树化
//只有当索引位置上的节点是链表节点并且节点的数量大于等于8的时候才会树化
if (binCount >= TREEIFY_THRESHOLD)
//索引位置上的节点数量已经到达树化的阈值
//则将该索引位置上的所有节点转换为树节点
treeifyBin(tab, i);
if (oldVal != null)
//返回被替换的旧值
return oldVal;
break;
}
在添加一个节点到链表中去,会从链表的头节点开始遍历并记录链表中的节点数量,该数量就是用binCount
记录的,当链表中的节点数量大于等于TREEIFY_THRESHOLD(8)
的时候则会调用treeifyBin
方法来决定是否需要将链表中的所有节点树化并转换为红黑树。
treeifyBin
private final void treeifyBin(Node<K, V>[] tab, int index) {
//指定索引位置上的节点
Node<K, V> b;
//n 数组长度
int n, sc;
if (tab != null) {
//校验数组的长度是否小于最小的树化阈值
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
//获取指定索引位置上的元素节点并校验元素节点是否不为空
//如果元素节点不为空则校验该节点的hash值是大于等于0
//如果hash值大于等于0则说明该节点需要转换为树节点
//如果hash值小于0则说明该节点正在移动或已经被树化了
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//使用锁锁住指定索引位置上的节点
//在添加元素的时候也会对指定索引位置上的节点加锁
//如果两个索引位置都是相同的索引位置的时候,如果添加元素的线程先加了锁
//那执行到当前方法的线程则会等待添加元素的线程释放锁
//当添加元素的线程释放了锁之后,当前方法的线程则会获取锁将索引位置上所有的节点都树化,包括新添加的元素节点
//如果是执行到当前方法的线程获取到了锁,那添加元素的线程则会等待
//当指定索引位置上的所有节点都树化了之后并释放了锁
//添加元素的线程则会去获取锁,此时添加元素的线程会发现当前索引位置上的节点已经是树节点
//则会将元素节点添加到红黑树中并使红黑树平衡
synchronized (b) {
//校验在获取锁的时候指定索引位置上的节点是否被更改
if (tabAt(tab, index) == b) {
//hd 头节点
//tl 当前遍历到的节点的上一个节点
TreeNode<K, V> hd = null, tl = null;
//从指定索引位置上的节点开始遍历,依次将链表上的所有节点转换为树节点
for (Node<K, V> e = b; e != null; e = e.next) {
//将普通节点转换为树节点
TreeNode<K, V> p = new TreeNode<K, V>(e.hash, e.key, e.val, null, null);
//校验当前遍历的节点是否是链表中的头节点
//如果是头节点则将该节点设置为了树节点的头节点
//如果不是头节点则将当前转换的树节点与上一个树节点进行关联
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//创建一个TreeBin对象并将树化的节点转换为红黑树
//TreeBin对象中保留着红黑树的根节点以及链表的头节点
//再调用setTabAt方法将TreeBin对象添加到指定的索引位置上
//在HashMap中并没有创建一个TreeBin对象来存放红黑树的根节点
//而是直接将红黑树的根节点放置到指定的索引位置上
setTabAt(tab, index, new TreeBin<K, V>(hd));
}
}
}
}
}
(n = tab.length) < MIN_TREEIFY_CAPACITY
:校验数组的长度是否小于最小树化的阈值,并不是说链表中的节点数量到达8了就要将链表节点转换为红黑树节点,前提是需要数组的长度到达了阈值(64)才会转换为红黑树节点,如果数组的长度没有到达阈值则会进行扩容。
(b = tabAt(tab, index)) != null && b.hash >= 0
:校验外部synchronized
释放锁之后,准备节点树化的期间是否有其它线程对该节点进行操作了,该操作分为两种,有线程将该索引位置上的所有节点都删除了或者说有线程将该索引位置上的节点都树化了,这两种情况下就不需要再进行树化了。 我们主要看一下是怎么树化的以及转换成红黑树的,树化的操作其实很简单,就是对指定索引位置上的节点加锁防止其它线程操作,依次从链表的头节点开始遍历,将链表中的Node
节点转换为TreeNode
节点,就这样树化就完成了,而转换为红黑树就是通过创建一个TreeBin
对象来完成的,在构造TreeBin
对象时会将树化的节点转换为红黑树中的节点。
TreeBin
TreeBin(TreeNode<K, V> b) {
//给当前TreeBin对象设置一个树化标识
super(TREEBIN, null, null, null);
//树化的头节点的设置为链表的头节点
this.first = b;
//根节点
TreeNode<K, V> r = null;
//从树化的头节点开始遍历将节点转换为红黑树中的节点,直到没有下一个节点
for (TreeNode<K, V> x = b, next; x != null; x = next) {
//下一个节点
next = (TreeNode<K, V>) x.next;
//初始化每个树节点的左右子节点
x.left = x.right = null;
if (r == null) {
//根节点为空则将当前遍历到的节点设置为根节点
//并将该节点的颜色设置为黑色
x.parent = null;
x.red = false;
r = x;
} else {
//待添加到红黑树中的节点的key
K k = x.key;
//待添加到红黑树中的节点的hash值
int h = x.hash;
Class<?> kc = null;
//从红黑树的根节点开始遍历来校验节点放置的位置
for (TreeNode<K, V> p = r; ; ) {
//dir 节点添加到红黑树中的位置
//ph 被遍历到的红黑树中的节点的key
int dir, ph;
//被遍历到的红黑树中的节点的key
K pk = p.key;
if ((ph = p.hash) > h)
//被遍历到的红黑树中的节点的hash值大于待添加到红黑树中的节点的hash值
//则需要将节点添加到红黑树中的左子节点
dir = -1;
else if (ph < h)
//被遍历到的红黑树中的节点的hash值小于待添加到红黑树中的节点的hash值
//则需要将节点添加到红黑树中的右子节点
dir = 1;
else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K, V> xp = p;
//校验待添加到红黑树中的节点所放置的位置是否有节点存在
//如果有节点存在则重新走循环与子节点继续比较直到没有了子节点
if ((p = (dir <= 0) ? p.left : p.right) == null) {
//放置待添加到红黑树中的节点的位置没有了节点
//则会将节点添加到红黑树中
//将待添加节点的父节点的指针指向当前遍历到的红黑树中的节点
x.parent = xp;
if (dir <= 0)
//将待添加的节点放置到被遍历到的节点的左子节点
xp.left = x;
else
//将待添加的节点放置到被遍历到的节点的右子节点
xp.right = x;
//红黑树中添加了新的节点,可能让红黑树不在平衡
//所以需要将红黑树中的节点进行平衡
r = balanceInsertion(r, x);
break;
}
}
}
}
//TreeBin对象记录根节点
this.root = r;
assert checkInvariants(root);
}
TreeBin
的构造方法比较简单,通过super
方法给当前的TreeBin
对象设置hash值为-2
,然后通过遍历树化后的链表节点,将链表的头节点设置为红黑树的根节点,后续链表中的节点从红黑树的根节点开始进行比较,来获取到链表的节点在红黑树中所存放的位置,红黑树按左小右大的方式存放节点,每添加一个节点到红黑树中,都有可能让红黑树不平衡,所以每次都会尝试是否需要平衡红黑树中的节点。
上图是链表转红黑树,转为红黑树之后,索引位置上是一个TreeBin
对象,并且TreeBin
对象中包含了红黑树的根节点以及链表的头节点,而红黑树中的节点中不仅有红黑树本身的指针,而且还有链表的一些指针,既能当做是红黑树也可以当做链表,上图中并没有将所有节点的链表指针都画出来,只是画了部分节点的链表指针。
当元素添加操作以及链表转换为红黑树操作完成之后,我们再来看ConcurrentHashMap
是如何记录元素数量的以及扩容的。
addCount
private final void addCount(long x, int check) {
//计数器数组
CounterCell[] as;
//b 基本计数器值
long b, s;
//计数器数组counterCells不为空则说明之前已经发生过多线程对基本计数器baseCount进行操作
//计数器数组counterCells为空则说明之前一直在单线程的情况下对基本计数器baseCount进行操作
//如果计数器数组为空则会尝试对基本计数器baseCount进行操作,如果对baseCount操作失败则说明当前处于多线程的情况下
//此时就需要初始化counterCells
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long v;
int m;
boolean uncontended = true;
//as == null || (m = as.length - 1) < 0 校验计数器数组是否初始化,如果没有初始化则调用fullAddCount方法进行初始化
//(a = as[ThreadLocalRandom.getProbe() & m]) == null 当前计数器数组不为空的时候则使用线程生成的随机数
//与计数器数组的长度进行与运算获取到指定索引位置上的计数器对象,并校验计数器对象是否为空
//如果计数器对象为空则调用fullAddCount方法对指定索引位置上的计数器对象进行初始化
//!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 计数器数组不为空并且运算获取到的指定索引位置上的计数器对象也不为空
//此时就可以对计数器对象进行操作,如果操作失败则说明有其它线程获取到了这个索引位置上的计数器对象并对这个计数器进行操作中
//此时就需要调用fullAddCount方法选择其它索引位置上的计数器对象进行操作
if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//对计数器数组以及计数器数组中的计数器对象进行初始化
//一个线程对计数器对象操作失败则会更换其它索引上的计数器对象进行操作
//如果这个线程多次对计数器对象操作失败并且当前计数器数组的长度小于cpu的数量
//此时就会尝试进行扩容,然后对扩容之后的数组中的计数器对象进行操作
//如果说计数器数组的长度不小于cpu的数量则不会进行扩容
//只会一直更换计数器对象进行操作,直到操作成功
fullAddCount(x, uncontended);
return;
}
//校验check是否小于等于1
//check=-1的情况下则说明删除了指定索引位置上的元素节点
//check=0的情况下则说明指定索引位置上没有元素节点,直接将指定的key和value封装成一个节点放置到指定的索引位置上
//check=1的情况在put方法下则说明指定索引位置上有元素节点,但是只是对索引位置上的元素节点中的value进行了替换
//在其它方法中check=1可能表示在指定的索引位置上添加了1个元素节点
//如果是这三种情况的话则不会去尝试进行扩容
//如果执行到了当前判断语句的话则说明并没有执行上面的fullAddCount方法
//而是通过上面判断语句中的cas操作对指定索引位置上的计数器对象的值操作成功
//但是在check=1的情况下的put方法只是将原本的元素节点中的value替换了
//如果此时还要对计数器对象中的value加1,那不就会造成1个元素节点变成了2个元素节点?
//其实并不会的,如果check=1的情况下put方法并不会走到当前的方法中
if (check <= 1)
return;
//将计数器数组中所有的计数器对象值以及基本的计数器值合并获取到当前集合中的元素节点的数量
s = sumCount();
}
//校验check是否大于等于0
//大于等于0则说明添加了新的元素节点
//此时就需要来校验是否需要进行扩容
//check一共分为5种情况,有三种已经在上面说过,现在来看一下剩下的两种情况
//check=2的情况下代表在红黑树种添加了一个树节点,也有可能是在一个链表节点中添加了一个节点
//check>2的情况下代表在链表节点中添加了一个节点
if (check >= 0) {
Node<K, V>[] tab, nt;
int n, sc;
//当前数组中的元素节点的数量已经到达扩容的阈值并且数组的长度小于数组最大容量大小
//此时会尝试将数组进行扩容,如果扩容失败则会一直进行尝试,直到扩容成功
while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
//获取扩容时的标记
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
break;
//扩容线程加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//协助扩容
transfer(tab, nt);
//将sizeCtl修改成一个很大的负数,告知其它线程现在有线程正在扩容
//为什么要+2而不是+1?+1代表当前扩容的线程数量+1,而+2可能是标识开启扩容
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
//扩容
transfer(tab, null);
//重新计算集合中元素节点的数量
s = sumCount();
}
}
}
addCount
中分为两个分支,分别为元素数量计数以及数组扩容,我们先看计数的分支。
在单线程的情况下只会对基本计数器baseCount
进行操作,如果在多线程的情况下同时对baseCount
进行操作,只会有一个线程操作成功,而其它线程并不会等待继续对baseCount
进行操作,而是调用fullAddCount
方法初始化计数器数组(CounterCells
)以及数组中的计数器对象(CounterCell
)并对数组中的计数器对象进行操作,只要出现过一次多线程的情况,往后都不会对baseCount
操作了,而是直接对计数器数组中的计数器对象进行操作,fullAddCount
中代码具体的意思可以参考下面代码中的注释,这里不再多讲。
fullAddCount
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//使用当前线程生成随机数,如果随机数为0则说明ThreadLocalRandom中的一些参数还未被初始化
//此时就需要进行初始化并重新获取随机数
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (; ; ) {
//计数器数组
CounterCell[] as;
//计数器对象
CounterCell a;
//计数器数组长度
int n;
//计数器对象中的value
long v;
//校验计数器数组是否已初始化
if ((as = counterCells) != null && (n = as.length) > 0) {
//校验当前线程生成的随机数与计数器数组索引长度运算后获取到的指定索引位置上的计数器对象是否为空
if ((a = as[(n - 1) & h]) == null) {
//校验计数器数组的锁状态是否为0
//如果不为0则说明有其它线程正在对计数器数组进行扩容或对计数器数组中的计数器对象进行初始化
if (cellsBusy == 0) {
//创建一个计数器对象,并将值添加到计数器对象中
CounterCell r = new CounterCell(x);
//加锁,防止多个线程同时对通一个索引位置上的计数器对象进行初始化
if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs;
int m, j;
//校验一下运算后的索引位置上的计数器对象是否为空
//如果不为空则说明已经被其它线程初始化了,当前线程就不需要再去初始化
if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
//将创建好的计数器对象添加到运算后的指定索引位置上
rs[j] = r;
//初始化成功
created = true;
}
} finally {
//修改计数器数组锁标识
cellsBusy = 0;
}
if (created)
//初始化成功则退出
break;
//初始化失败,说明已经有其它线程初始化了计数器对象,此时就需要重新走循环选择初始化成功的计数器对象进行操作
continue;
}
}
//其它线程正在对计数器数组进行扩容或初始化指定索引位置上的计数器对象
collide = false;
} else if (!wasUncontended)
//wasUncontended为false则说明有其它线程正在对指定索引位置上的计数器对象进行操作
//此时当前线程则需要获取其它索引位置上的计数器对象进行操作
wasUncontended = true;
//尝试对指定索引位置上的计数器对象进行操作
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
//执行成功则退出
break;
//校验当前是否有线程对计数器数组进行了扩容,将计数器数组中的计数器对象移动到了新的计数器数组中
//如果已经将计数器对象移动到了新的计数器数组中,那重新选择新的计数器数组中的计数器对象来进行操作
//如果没有线程对计数器数组进行扩容,那比较一下当前计数器数组的长度是否大于等于cpu的数量
//如果大于等于cpu的数量则不会进行扩容,只会重新选择计数器数组中的计数器对象来操作
//如果计数器数组的长度小于cpu的数量那就会对计数器数组进行2倍的扩容
else if (counterCells != as || n >= NCPU)
//此处将collide修改为false可能是想再次尝试一下是否能对计数器数组中的计数器对象进行操作
//如果能操作则不进行扩容,这样能减少扩容带来的性能问题
collide = false;
else if (!collide)
collide = true;
//多次获取计数器数组中的计数器对象而不能操作,并计数器数组的长度小于cpu的数量
//导致部分的cpu没有被使用到,此时就需要对计数器数组进行扩容,充分的使用cpu
//对计数器数组加锁
else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//校验计数器数组是否变更,防止其它线程已经对计数器数组进行了扩容
if (counterCells == as) {
//初始化一个新的计数器数组,大小是原来的2倍
CounterCell[] rs = new CounterCell[n << 1];
//循环将原来的计数器数组中的计数器对象移动到扩容后的计数器数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
//更新当前集合中的计数器数组
counterCells = rs;
}
} finally {
//修改计数器数组锁标识
cellsBusy = 0;
}
collide = false;
//使用扩容后的计数器数组进行操作
continue;
}
//计数器对象正在被其它线程操作,此时需要重新生成随机数,获取别的索引位置上的计数器对象来进行操作
h = ThreadLocalRandom.advanceProbe(h);
//计数器数组未被初始化,此时就需要校验一下当前是否有其它线程正在初始化计数器数组
//如果没有,那当前线程就需要将计数器数组初始化的标识设置为1,代表当前有线程正在初始化计数器数组
} else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == as) {
//创建计数器数组,计数器数组的长度为2的次方
CounterCell[] rs = new CounterCell[2];
//创建一个计数器对象,并将值添加到计数器对象中
//并使用线程生成的随机数与计数器数组索引长度进行运算获取到指定的索引位置
//并将计数器对象放置到指定的索引位置上
rs[h & 1] = new CounterCell(x);
//更新当前集合中的计数器数组
counterCells = rs;
//初始化完成
init = true;
}
} finally {
//修改计数器数组锁标识
cellsBusy = 0;
}
if (init)
//计数器数组初始化完成则退出
break;
//计数器数组正在被其它线程初始化或已经被其它线程初始化完成
//此时尝试对基本的计数器值进行操作
//如果失败则说明有其它线程也在对基本计数器值进行操作
//那就要重新走循环,来看计数器数组是否被初始化完成,如果被初始化完成那就对计数器数组中的计数器对象进行操作
//如果还没有初始化完成则会继续尝试对基本的计数器值进行操作
} else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
我们再看addCount
中的第二个分支,该分支中主要是先校验是否到达了扩容的阈值以及是否小于数组最大的容量大小,条件成立则会校验是否有线程在扩容,如果有线程在扩容,那当前线程则需要协助扩容如果没有那当前线程则开启扩容。
transfer
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
//n 数组长度
//stride 每个cpu需要负责转移的索引长度
int n = tab.length, stride;
//使用数组长度来计算每个cpu需要负责的长度
//每个cpu最少需要负责16的长度
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//nextTab为空则说明当前没有其它线程在初始化
//不为空则说明当前有其它线程正在初始化,当前线程则需要帮助初始化的线程将旧数组中的元素节点转移到新的数组中
if (nextTab == null) {
try {
//创建新的数组,容量是旧数组的2倍
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
//出现异常时则说明扩容失败,容量大小已经超出MAXIMUM_CAPACITY参数指定的最大的数组容量
//因为MAXIMUM_CAPACITY参数指定的容量大小是int类型中正整数最大的2的次方
//当数组的长度已经到达MAXIMUM_CAPACITY参数指定的容量大小时,如果再进行2倍的扩容就会导致数组的长度变成负数
//此时就会导致扩容失败,将int类型中最大的正整数设置成扩容的阈值来停止本次扩容
//如果数组的长度已到达数组最大的容量大小那就不会进行扩容了
sizeCtl = Integer.MAX_VALUE;
return;
}
//扩容后的数组,当其它扩容的线程发现nextTable不为空时则不会重复扩容
nextTable = nextTab;
//扩容后,默认需要转移整个旧数组
transferIndex = n;
}
//获取扩容后的数组长度
int nextn = nextTab.length;
//创建一个转移的节点
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
//是否继续转移
boolean advance = true;
//标识扩容之后旧数组中的节点是否全部完成转移
boolean finishing = false;
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
while (advance) {
//nextIndex 当前线程开始转移的长度位置
//nextBound 当前线程负责转移的索引位置边界,如果到达这个边界则说明当前线程已经完成了所负责的索引长度的节点
//转移旧数组中的元素节点是从数组的尾部向数组的头部开始转移
int nextIndex, nextBound;
//每次转移完一个索引位置上的节点的时候都会校验一下下一次转移的索引位置是否已经超出边界
//或旧数组中的元素节点都已经全部转移完成
//如果下一次转移的索引位置超出边界,但是剩下需要转移的节点没有被其它线程协助转移
//那么当前线程则继续选择部分的索引长度来转移
if (--i >= bound || finishing)
advance = false;
//校验剩余需要转移的索引长度是否为0,如果为0则说明旧数组中没有需要转移的元素节点了
else if ((nextIndex = transferIndex) <= 0) {
//将转移的节点的索引位置设置为-1,后续会根据该条件退出扩容方法
i = -1;
advance = false;
//根据开始转移的长度位置与每个线程需要转移的长度进行比较
//如果开始转移的长度位置大于每个线程需要转移的长度,那就使用开始转移的长度位置减去每个线程需要转移的长度
//获取到当前线程需要负责转移的索引位置边界
//如果开始转移的长度位置小于等于每个线程需要转移的长度,那就说明当前需要转移的长度不需要其它线程协助
//当前线程则会将剩下需要转移的节点转移到新的数组中
} else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
//将计算后的索引边界赋予bound,用于线程每次转移之后进行校验
//如果已经到达了边界说明线程已经完成了所负责的索引长度的节点转移
//线程不再执行节点的转移
bound = nextBound;
//开始转移的长度位置-1,获取到开始转移的节点的索引位置
i = nextIndex - 1;
advance = false;
}
}
//i < 0 小于0则说明当前线程所负责转移的节点已经完成并且没有其它需要转移的节点了
//i >= n
//i + n >= nextn
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//当前finishing为true时则说明旧数组中的所有节点都已经转移
//此时就需要将新数组设置为当前集合使用的数组并计算下一次的扩容阈值
if (finishing) {
//将扩容的数组置为空,代表当前没有线程正在进行扩容操作
nextTable = null;
//将扩容后的新数组设置为当前集合正在使用的数组
table = nextTab;
//计算下一次扩容的阈值
sizeCtl = (n << 1) - (n >>> 1);
//扩容完成,退出
return;
}
//finishing为false则会走到当前语句,则说明当前线程并不知道旧数组中的元素节点有没有转移完成
//但是当前线程所负责转移的索引节点已经完成,此时就需要将并发扩容中的线程数量减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//扩容标记左移16位获取到线程数量的标记
//使用sc-2获取到当前还剩下的线程数量的标记
//如果两个相等则说明当前线程是最后一个扩容结束的线程
//此时就需要当前线程执行收尾操作,需要从旧数组中的尾部开始向头部节点遍历来检查是否所有的元素节点都转移了
//如果都转移了,则将扩容后的新数组设置为当前集合正在使用的数组,并且计算下一次扩容的阈值
//如果还有元素节点没有转移,当前线程则会将剩下的元素节点进行转移
//如果两个不相等则说明当前线程不是最后一个扩容结束的线程
//当前线程已经完成了所负责的索引位置的元素节点,并且旧数组中没有其他需要转移的节点了,当前线程直接退出扩容
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//当前线程是最后一个结束扩容的线程,此时就需要检查旧数组中是否所有的元素节点都转移了
finishing = advance = true;
//从旧数组的尾部开始检查
i = n;
}
} else if ((f = tabAt(tab, i)) == null)
//如果指定索引位置上的节点为空则直接将旧数组中该索引位置上的节点设置成一个正在转移的节点进行占位
//当有新的线程要对该索引位置的节点操作的时候则会发现该索引位置上的节点是一个正在转移的节点
//则不会对该索引位置上的节点进行操作而是先协助扩容线程进行转移
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//指定索引位置上的节点已经被其它线程处理过
advance = true;
else {
//加锁,防止其它线程对当前需要移动的节点进行操作
synchronized (f) {
//校验节点是否变更
if (tabAt(tab, i) == f) {
//ln 低位节点,该节点放置到新数组中的索引位置与在旧数组中的索引位置相同
//hn 高位节点,该节点放置到新数组中的索引位置是在旧数组中的索引位置加上旧数组的长度
Node<K, V> ln, hn;
//校验节点的hash值是否大于等于0
//如果节点的hash值大于等于0则说明该索引位置上只有一个节点或节点是一个链表节点
if (fh >= 0) {
//使用节点的hash值与旧数组的长度进行与运算
//与运算后的结果分为0和1,0则将该节点放置到低位,1则将节点放置高位
int runBit = fh & n;
//避免后续转移节点的时候没有必要的循环以及创建节点
Node<K, V> lastRun = f;
//遍历整个链表来决定从那个节点开始以及后续的节点都是没有必要遍历和创建的
//而是直接使用指针指向旧数组中的节点,当扩容完成之后旧数组以及旧数组中部分没有被指针引用的节点则会被回收
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//没有必要循环和创建的节点应该放置到新数组中的低为还是高位
if (runBit == 0) {
//低位
ln = lastRun;
hn = null;
} else {
//高位
hn = lastRun;
ln = null;
}
//从头节点开始遍历将需要重新创建的节点进行创建并添加到高位或低位
//在添加到高位或低位的时候,新创建的节点使用的是头插法
//如果有节点不需要重新创建,那不需要重新创建的节点则会放到高位或低位节点的尾部
//在上面的时候如果不需要重新创建的节点其实就已经放在了ln或hn中
//当有节点重新创建了的时候,则ln或hn设置为新创建的节点的下一个节点
//其实整体来说就是头插法,如果说整个链表或部分连续的链表节点不需要重新创建的时候还是保持在旧数组中一样的顺序
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
//低位
ln = new Node<K, V>(ph, pk, pv, ln);
else
//高位
hn = new Node<K, V>(ph, pk, pv, hn);
}
//将低位节点链表添加到新数组中所在的索引位置与旧数组所在的索引位置相同
setTabAt(nextTab, i, ln);
//将高位节点链表添加到新数组中所在的索引位置是在旧数组中所在的索引位置加上旧数组的长度
setTabAt(nextTab, i + n, hn);
//将旧数组中的索引位置上的节点设置为一个已经转移的节点
setTabAt(tab, i, fwd);
//继续推进下一次节点转移
advance = true;
//节点是一个红黑树节点
} else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
//低位头节点和尾节点
TreeNode<K, V> lo = null, loTail = null;
//高位头节点和尾节点
TreeNode<K, V> hi = null, hiTail = null;
//低位节点和高位节点的数量
int lc = 0, hc = 0;
//从TreeBin对象中记录的链表的头节点开始遍历将每一个节点分为低位节点和高位节点
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//构造新的树节点
TreeNode<K, V> p = new TreeNode<K, V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
//将构造的树节点的上一个节点指针指向原低位尾节点
//如果原低尾节点为空则说明当前树节点是第一个节点
//那就将当前树节点设置为低位头节点
if ((p.prev = loTail) == null)
lo = p;
else
//原低位尾节点不为空则将原尾节点的下一个节点指针指向当前树节点
loTail.next = p;
//将新构造的树节点设置为低位尾节点
loTail = p;
//低位节点的数量自增
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//拆分后的高低位节点的数量是否小于等于红黑树转链表的阈值
//如果小于等于则调用untreeify方法将树节点转换为链表节点
//如果高位节点数量或低位节点数量大于阈值则会校验对方节点的数量是否不等于0
//如果说对方的节点数量等于0则说明节点并没有拆分为高位或低位节点
//那就使用原本的TreeBin对象进行转移
//如果对方的节点数量大于0则说明已经拆分为了高低位节点
//此时就需要将高低位节点转换为红黑树并封装成一个TreeBin对象
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t;
//将低位节点TreeBin对象添加到新数组中所在的索引位置与旧数组所在的索引位置相同
setTabAt(nextTab, i, ln);
//将高位节点TreeBin添加到新数组中所在的索引位置是在旧数组中所在的索引位置加上旧数组的长度
setTabAt(nextTab, i + n, hn);
//将旧数组中的索引位置上的节点设置为一个已经转移的节点
setTabAt(tab, i, fwd);
//继续推进下一次节点转移
advance = true;
}
}
}
}
}
}
transfer
方法中的代码比较多,我们就一段一段的讲解。
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
首先会通过旧数组的长度来计算每个cpu
在转移旧数组中的节点所需要负责的区域长度,每个cpu
最少需要负责16个区域的长度。 nextTab == null
则是校验是否有线程已经在对新的数组进行初始化了,如果没有那当前线程则会去初始化新的数组,新数组的长度则是旧数组的2倍,如果出现异常则说明旧数组的长度已经到达了数组最大的容量大小了,此时就不能再继续扩容了,数组最大的容量大小为2的30次方
,则是int
类型中正整数最大的2的次方,如果再次进行扩容的话就会导致数组的容量大小变成负数。
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
ForwardingNode
节点是一个占位节点,当将旧数组中指定索引位置上的所有节点都转移到了新的数组中,则会使用该节点进行占位,告知其它线程该索引位置上的节点都已经被转移到了新的数组中。
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
} else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
while
循环的作用主要是为每个线程分配所负责的区域以及推进转移的进度。
--i >= bound || finishing
:--i >= bound
则是每次转移完一个索引位置上的节点的时候都会校验一下下一次转移的索引位置的节点是否已经超出当前线程负责转移的边界了,而finishing
则是校验旧数组中所有的元素节点是否都已经完成了转移。
(nextIndex = transferIndex) <= 0
:剩余需要转移的区域的长度是否小于等于0,小于等于0则说明旧数组中的元素节点已经转移完成了,在开始准备转移的时候该值为旧数组的长度,每当有一个线程来协助扩容的时候则会从该值中取一部分长度来负责转移。
条件三则是根据开始转移的长度位置与每个线程需要转移的长度进行比较,如果开始转移的长度位置大于每个线程需要转移的长度,那就使用开始转移的长度位置减去每个线程需要转移的长度,获取到当前线程需要负责转移的索引位置的边界,再将剩余需要转移的长度存放到transferIndex
中,等待其它线程协助或者说等待当前线程转移完所负责的区域之后继续转移剩余的长度,如果开始转移的长度位置小于等于每个线程所需要转移的长度,那就说明当前线程自己可以完成转移,不需要其它线程协助,转移节点的时候则是从数组的尾部向前推进。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n;
}
}
i小于0
则说明当前线程所负责转移的区域节点已经转移完成,并且数组中没有其它需要转移的节点了,此时就需要通过下面的cas
操作将sizeCtl
中的扩容线程数量减1,线程数量减1之后会去校验当前线程是否是最后一个扩容的线程,如果不是则说明当前线程已经完成了所负责的索引位置的元素节点转移,并且旧数组中没有其他需要转移的节点了,当前线程直接退出扩容,如果是最后一个扩容的线程,此时就需要当前线程执行收尾操作,需要从旧数组中的尾部开始向头部节点变量来检查是否所有的元素节点都转移了,如果还有没被转移的元素节点,那当前线程则会去进行转移,当检查完成之后会将新的数组替换旧的数组,并计算下一次扩容的阈值。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
如果指定索引位置上的节点为空则通过cas操作将旧数组中该索引位置上的节点设置成ForwardingNode
节点进行占位,告知其它线程该索引位置上的节点已经进行了转移,其它线程则不会对该索引位置上的节点进行操作,而是先协助扩容线程进行节点转移。
else if ((fh = f.hash) == MOVED)
advance = true;
索引位置上的节点的hash
值为MOVED
,则说明该索引位置上的节点都已经转移了,当前线程则继续推进索引位置转节点。
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K, V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K, V> lastRun = f;
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K, V>(ph, pk, pv, ln);
else
hn = new Node<K, V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
} else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
我们再看真正转移节点的代码,分为转移链表节点和转移红黑树节点。
转移链表节点:链表节点会被分为高位节点链表(hn)
和低位节点链表(ln)
,首先通过头节点的hash
值与旧数组的长度进行与运算,与运算后的结果分为0和1,0则将节点放置到低位,1将该节点放置到高位,然后遍历整个链表来决定从那个节点开始以及后续的节点都是没有必要重新创建的,而是直接使用指针指向旧数组中的节点,lastRun
指针指向的节点以及后续的节点都是没有必要创建的,因为你后续的链表节点没有被拆分为高位或低位,是一个连续存放在高位或低位的链表节点,所以说不需要重新创建节点,比如说一个链表中的所有节点都是放在低位节点中的,那lastRun
指针指向的就是链表的头节点,该链表中的节点并没有改动,并不需要重新创建节点,而是直接将链表中的头节点放置到新的数组中即可,当区分了哪些节点是不需要重新创建的,则会将不需要重新创建的节点的头节点lastRun
赋值给高位或低位,然后从链表的头节点开始遍历将需要创建的节点进行创建并添加到高位链表或低位链表中,在添加到高位链表或低位链表的时候,节点使用的是头插法,创建完成之后,低位节点链表会放置到新数组中所在的索引位置与节点在旧数组中所在的索引位置相同,高位节点链表会放置到新数组中所在的索引位置是节点在旧数组中所在的索引位置加上旧数组的长度,当高低位节点链表都转移完成之后则会在旧数组中该索引位置上添加到一个占位节点。
下面的图中ln
则是低位节点链表,hn
则是高位节点链表,在某些情况下有些节点并不需要重新创建,而是使用原来的节点,最差的情况下就是只有链表节点的尾部的一个节点不需要重新创建。
红黑树节点:从TreeBin
对象中记录的链表的头节点开始遍历,将每一个节点转换为新的树节点,并分为高低位链表节点,校验高低位链表节点中的节点数量是否小于等于红黑树转链表的阈值,如果小于等于则会将高低位链表节点中的所有树节点都转换为普通的Node
节点,如果不小于等于则会将高低位链表节点转换为红黑树。 如果说高位或低位是一个链表节点的话,则会将链表的头节点放置到新的数组中,如果是红黑树的话,则会将TreeBin
对象放置到新的数组中,然后再将旧数组中的索引位置上添加一个占位节点。
注意:其实在转移旧数组中的节点的时候是有问题的,有可能会造成节点数据的丢失
线程A获取到了索引位置上的链表节点,头节点类型为Node
,准备对该链表节点加synchronized
锁进行转移时,此时线程B先加了synchronized
锁,对该索引位置上的链表节点添加了新的节点并将链表节点转换为了红黑树,并将TreeBin
对象放置在了该索引位置上,当线程B释放了锁之后,线程A获取到了锁后去判断之前获取到的头节点Node
是否与索引位置上最新的TreeBin
对象相同,显然是不相同的,当不相同的情况下就会跳过该索引位置上的节点的转移,在上面说过,当扩容完成时,最后一个线程会去检查旧数组中是否还有节点没有转移,如果有则会进行转移,如果说在检查转移的时候也遇到了上面类似的问题,删除节点的时候将TreeBin
转换为Node
,是不是就会跳过该索引位置上的检查,从而导致节点数据丢失。
以上就是Java并发源码分析ConcurrentHashMap线程集合的详细内容,更多关于Java ConcurrentHashMap的资料请关注编程网其它相关文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341