JUC容器类
线程安全的同步容器类
Java同步容器类通过Synchronized(内置锁)来实现同步的容器,比如Vector
、HashTable
以及SynchronizedList
等容器。线程安全的同步容器类主要有Vector、Stack、HashTable等。另外,Java还提供了一组包装方法,将一个普通的基础容器包装成一个线程安全的同步容器。例如通过Collections.synchronized包装方法能将一个普通的SortedSet容器包装成一个线程安全的SortedSet同步容器。
Collections.synchronize
Collections.synchronized
相关的包装方法是 Java 中用于将非线程安全的集合类转换为线程安全的集合类的工具方法,位于 java.util.Collections
类中。
以下是一些常见的 Collections.synchronized
包装方法:
1 | import java.util.ArrayList; |
同步容器面临的问题
可以通过查看Vector、HashTable、java.util.Collections同步包装内部类的源码,发现这些同步容器实现线程安全的方式是:在需要同步访问的方法上添加关键字synchronized。由于锁的存在,同步容器的操作在同一时刻只能有一个线程执行,这使得原本可以并行执行的操作变成了串行执行。比如多个线程对容器进行读写操作时,不能并发进行,会降低系统的吞吐量。
Collections 虽能把基础容器包装成线程安全的同步容器,但这类同步容器包装类在对元素进行迭代时,无法执行元素添加操作。
为了解决同步容器的性能问题,有了JUC高并发容器。
JUC高并发容器
JUC基于非阻塞算法(Lock Free,无锁编程)提供了一组高并发容器,包括高并发的List、Set、Queue、Map容器。
什么是高并发容器
JUC高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程算法主要通过CAS(Compare And Swap)+Volatile组合实现,通过CAS保障操作的原子性,通过volatile保障变量内存的可见性。
无锁编程算法的主要优点如下:
- 开销较小:不需要在内核态和用户态之间切换进程。
- 读写不互斥:只有写操作需要使用基于CAS机制的乐观锁,读读操作之间可以不用互斥。
JUC包中提供了List、Set、Queue、Map各种类型的高并发容器,如ConcurrentHashMap
、ConcurrentSkipListMap
、ConcurrentSkipListSet
、CopyOnWriteArrayList
和CopyOnWriteArraySet
。在性能上,ConcurrentHashMap通常优于同步的HashMap,ConcurrentSkipListMap通常优于同步的TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList优于同步的ArrayList。
List
JUC包中的高并发List主要有CopyOnWriteArrayList
,对应的基础容器为ArrayList。
CopyOnWriteArrayList
相当于线程安全的ArrayList,它实现了List接口。在读多写少的场景中,其性能远远高于ArrayList的同步包装容器。
Set
JUC包中的Set主要有CopyOnWriteArraySet
和ConcurrentSkipListSe
t。
CopyOnWriteArraySet
继承自AbstractSet类,对应的基础容器为HashSet
。其内部组合了一个CopyOnWriteArrayList对象,它的核心操作是基于CopyOnWriteArrayList实现的。ConcurrentSkipListSet
是线程安全的有序集合,对应的基础容器为TreeSet
。它继承自AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet
是通过ConcurrentSkipListMap实现的。
Map
JUC包中Map主要有ConcurrentHashMap
和ConcurrentSkipListMap
。
ConcurrentHashMap
对应的基础容器为HashMap
。JDK 6中的ConcurrentHashMap
采用一种更加细粒度的“分段锁”加锁机制,JDK8中采用CAS无锁算法。ConcurrentSkipListMap
对应的基础容器为TreeMap
。其内部的Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照Key值升序的。
Queue
JUC包中的Queue的实现类包括三类:单向队列、双向队列和阻塞队列。
ConcurrentLinkedQueue
是基于列表实现的单向队列,按照FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素则需要从队列头部获取。ConcurrentLinkedDeque
是基于链表的双向队列,但是该队列不允许null元素。作为双向队列,ConcurrentLinkedDeque
可以当作“栈来使用,并且高效地支持并发环境。除了提供普通的单向队列、双向队列外,JUC拓展了队列,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:
ArrayBlockingQueue
:基于数组实现的可阻塞的FIFO队列。LinkedBlockingQueue
:基于链表实现的可阻塞的FIFO队列。PriorityBlockingQueue
:按优先级排序的队列。DelayQueue
:按照元素的Delay时间进行排序的队列。SynchronousQueue
:无缓冲等待队列。
CopyOnWriteArrayList
在很多应用场景中读操作常远超写操作,读操作不修改原有数据,每次读取都加锁是资源浪费,应允许多线程同时访问List内部数据(读操作线程安全)。写时复制(Copy On Write,COW)是计算机程序设计领域的优化策略,其核心思想为多个访问器访问资源时指向同一资源,若有修改器要修改该资源,系统会给修改器复制专用副本,其他访问器看到的资源不变且修改过程对它们透明,COW的主要优点是无修改器修改资源时不会创建副本,多个访问器可共享同一份资源。
CopyOnWriteArrayList的使用
前面讲到,Collections可以将基础容器包装为线程安全的同步容器,但是这些同步容器包装类在进行元素迭代时并不能进行元素添加操作。下面是一个简单的例子:
1 | import java.util.ArrayList; |
该如何解决此问题呢?可使用CopyOnWriteArrayList
替代Collections.synchronizedList
同步包装实例,具体的代码如下:
1 | import java.util.ArrayList; |
CopyOnWriteArrayList的原理
CopyOnWrite(写时复制)就是在修改器对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存复制一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。
CopyOnWriteArrayList是写时复制思想的一种典型实现,其含有一个指向操作内存的内部指针array,而可变操作(add、set等)是在array数组的副本上进行的。当元素需要被修改或者增加时,并不直接在array指向的原有数组上操作,而是首先对array进行一次复制,将修改的内容写入复制的副本中。写完之后,再将内部指针array指向新的副本,这样就可以确保修改操作不会影响访问器的读取操作。
1 | public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { |
读取操作
访问器的读取操作没有任何同步控制和锁操作,理由是内部数组array不会发生修改,只会被另一个array替换,因此可以保证数据安
全。
1 | public E get(int index) { |
写入操作
CopyOnWriteArrayList的写入操作add()方法在执行时加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会复制出多个副本。
1 | public boolean add(E e) { |
从add()
操作可以看出,在每次进行添加操作时,CopyOnWriteArrayList
底层都是重新复制一份数组,再往新的数组中添加新元素,待添加完了,再将新的array引用指向新的数组。当add()
操作完成后,array的引用就已经指向另一个存储空间了。
既然每次添加元素的时候都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList并不适合进行添加操作。但是在并发场景下,迭代操作比较频繁,CopyOnWriteArrayList就是一个不错的选择。
迭代器实现
CopyOnWriteArray有自己的迭代器,该迭代器不会检查修改状态,也无须检查状态。为什么呢?因为被迭代的array数组可以说是只读的,不会有其他线程能够修改它。
1 | static final class COWIterator<E> implements ListIterator<E> { |
迭代器的快照成员会在构造迭代器的时候使用CopyOnWriteArrayList
的array成员去初始化,具体如下:
1 | //获取迭代器 |
CopyOnWriteArrayList的优点
CopyOnWriteArrayList
有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非常快。所以,CopyOnWriteArrayList
适用于读操作多、写操作相对较少的场景(读多写少),比如可以在进行“黑名单”拦截时使用CopyOnWriteArrayList
。
CopyOnWriteArrayList和ReentrantReadWriteLock的比较
CopyOnWriteArrayList
和ReentrantReadWriteLock
读写锁的思想非常类似,即读读共享、写写互斥、读写互斥、写读互斥。但是前者相比后者的更进一步:为了将读取的性能发挥到极致,**CopyOnWriteArrayList
读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升**。
BlockingQueue
在多线程环境中,通过BlockingQueue(阻塞队列)可以很容易地实现多线程之间的数据共享和通信,比如在经典的“生产者”和“消费者”模型中,通过BlockingQueue可以完成一个高性能的实现版本。
BlockingQueue的特点
阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法。
- 阻塞添加
- 阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直到队列元素不满时,才重新唤醒线程执行元素添加操作。
- 阻塞删除
- 阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程,再执行删除操作。
阻塞队列的常用方法
先来看看阻塞队列接口提供的主要方法:
1 | public interface BlockingQueue<E> extends Queue<E> { |
4个特征说明如下:
- 抛出异常: 如果试图的操作无法立即执行,就抛出一个异常。
- 特殊值: 如果尝试的操作无法立即执行,就返回一个特定的值(通常是true/false)。
- 阻塞:如果尝试的操作无法立即执行,该方法的调用就会发生阻塞,直到能够执行。
- 限时阻塞:如果尝试的操作无法立即执行,该方法的调用就会发生阻塞,直到能够执行,但等待时间不会超过设置的上限值。
操作类型 | 抛出异常 | 特殊值 | 阻塞 | 限时阻塞 |
---|---|---|---|---|
添加 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
获取元素 | element() | peek() | 不可用 | 不可用 |
添加类方法
add(E e)
:添加成功则返回true,失败就抛出IllegalStateException异常。offer(E e)
:成功则返回true,如果此队列已满,就返回false。put(E e)
:将元素添加至此队列的尾部,如果该队列已满,就一直阻塞。
删除类方法
poll()
:获取并移除此队列的头元素,若队列为空,则返回null。take()
:获取并移除此队列的头元素,若没有元素,则一直阻塞。remove(Object o)
:移除指定元素,成功则返回true,失败则返回false。
获取元素类方法
element()
:获取但不移除此队列的头元素,没有元素则抛出异常。peek()
:获取但不移除此队列的头元素,若队列为空,则返回null。
常见的BlockingQueue
在了解了BlockingQueue的主要方法后,接下来介绍一下BlockingQueue家族大致有哪些成员。BlockingQueue的实现类有
ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、PriorityBlockingQueue、SynchronousQueue等。
ArrayBlockingQueue
ArrayBlockingQueue是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的锁分离,从而添加和删除操作完全并行。Doug Lea之所以没这样去做,是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。
为什么ArrayBlockingQueue比LinkedBlockingQueue更加常用?前者在添加或删除元素时不会产生或销毁任何额外的Node(节点)实例,而后者会生成一个额外的Node实例。在长时间、高并发处理大批量数据的场景中,LinkedBlockingQueue产生的额外Node实例会加大系统的GC压力。
LinkedBlockingQueue
LinkedBlockingQueue是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下,生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
需要注意的是,在新建一个LinkedBlockingQueue对象时,若没有指定其容量大小,则LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就已经被消耗殆尽了。
DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
DelayQueue的使用场景较少,但是相当巧妙,常见的例子是使用DelayQueue来管理一个超时未响应的连接队列。
PriorityBlockingQueue
基于优先级的阻塞队列和DelayQueue类似,PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么大家都在集市等待。相对于有缓冲的阻塞队列(如LinkedBlockingQueue)来说,SynchronousQueue少了中间缓冲区(如仓库)的环节。如果有仓库,生产者直接把产品批发给仓库,不需要关心仓库最终会将这些产品发给哪些消费者,由于仓库可以中转部分商品,总体来说有仓库进行生产和消费的吞吐量高一些。反过来说,又因为仓库的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低,所以对单个消息的响应要求高的场景可以使用SynchronousQueue。
声明一个SynchronousQueue有两种不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体现出整体的公平特征。非公平模式(默认情况)的SynchronousQueue采用非公平锁,同时配合一个LIFO堆栈(TransferStack内部实例)来管理多余的生产者和消费者。对于后一种模式,如果生产者和消费者的处理速度有差距,就很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。
ArrayBlockingQueue的基本使用
下面通过ArrayBlockingQueue队列实现一个生产者-消费者的案例,通过该案例简单了解其使用方式和方法。具体的代码在前面的生
产者和消费者实现基础上进行迭代——Consumer(消费者)和Producer(生产者)通过ArrayBlockingQueue队列获取和添加元素。
- 其中,消费者调用take()方法获取元素,当队列没有元素时就阻塞;
- 生产者调用put()方法添加元素,当队列满时就阻塞。
通过这种方式便可以实现生产者-消费者模式,比直接使用等待唤醒机制或者Condition条件队列更加简单。
实现生产者-消费者模式
DataBuffer(共享数据区)使用一个ArrayBlockingQueue用于缓存数据,具体的代码如下:
1 | import java.util.concurrent.ArrayBlockingQueue; |
ArrayBlockingQueue构造器
创建公平与非公平阻塞队列的代码如下:
1 | //默认非公平阻塞队列 |
ArrayBlockingQueue的两个构造器的源码如下:
1 | //只带一个capacity参数的构造器 |
ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLock和Condition条件队列实现的,接下来看看其内部的成员变量
ArrayBlockingQueue内部的成员变量
ArrayBlockingQueue是一个基于数组(Array)实现的有界阻塞队列,内部成员变量如下:
1 | public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { |
ArrayBlockingQueue内部是通过数组对象items来存储所有的数据的,通过ReentrantLock类型的成员lock控制添加线程与删除线程的并
发访问。ArrayBlockingQueue使用等待条件对象notEmpty成员来存放或唤醒被阻塞的消费(take)线程,当数组对象items有元素时,告诉
take线程可以执行删除操作。同理,ArrayBlockingQueue使用等待条件对象notFull成员来存放或唤醒被阻塞的生产(put)线程,当队列
未满时,告诉put线程可以执行添加元素的操作。
ArrayBlockingQueue的takeIndex
成员为消费(或删除元素)的索引,标识的是下一个方法(take、poll、peek、remove)被调用时获
取数组元素的位置。**putIndex
成员为生产(或添加元素)的索引**,代表下一种方法(put、offer、add)被调用时元素添加到数组中的位
置。
add()方法的实现
1 | //AbstractQueue |
从源码可以看出,add()方法间接调用了offer()方法,如果offer()方法添加失败,那么add()将抛出IllegalStateException异常,如果offer()方法添加成功,那么add()返回true。
offer()方法的实现
offer()方法根据数组是否满了,分两种场景进行操作:
- 如果数组满了,就直接释放锁,然后返回false。
- 如果数组没满,就将元素入队(加入数组),然后返回true。
1 | // ArrayBlockingQueue |
enqueue()方法的实现
offer()调用了enqueue(E x)元素入队方法
1 | // ArrayBlockingQueue |
首先,由于进入enqueue()方法意味着数组没满,因此enqueue()方法可以通过putIndex索引直接将元素添加到数组items中,然后调整putIndex索引值。其次,enqueue()完成尾部的插入后,将自己的元素个数成员count+1。最后,enqueue()通过调用notFull.notEmpty()唤醒一个消费(或删除)线程。
这里大家可能会疑惑:当putIndex索引大小等于数组长度时,为什么需要将putIndex重新设置为0呢?
这是因为获取元素时总是在队列头部(takeIndex索引)操作,添加元素总是在队列尾部(putIndex索引)操作,而ArrayBlockingQueue将内部数组作为环形队列使用,所以在更新后索引值与数组长度相等时需要进行校正,下一个值就需要从数组的第一个元素(索引值0)开始操作。
阻塞式添加元素:put()方法的原理
首先来看阻塞式添加元素。在队列满而不能添加元素时,执行添加元素的线程会被阻塞。put()方法是一个阻塞的方法,如果队列元素已满,那么当前线程会被加入notFull条件对象的等待队列中,直到队列有空位置才会被唤醒执行添加操作。但如果队列没有满,就直接调用enqueue(e)方法将元素加入数组队列中。
1 | public void put(E e) throws InterruptedException { |
下面总结一下put()方法的添加操作流程。
- 获取putLock锁。
- 如果队列已满,就被阻塞,put线程进入notFull的等待队列中排队,等待被唤醒。
- 如果队列未满,元素通过enqueue方法入队。
- 释放putLock锁。
- 当队列已满时,新到来的put线程将被添加到notFull的条件队列中进行阻塞等待
非阻塞式删除元素:poll()方法的原理
poll()方法删除获取此队列的头元素,若队列为空,则立即返回null。poll()方法的实现比较简单,其具体的删除操作委托给了dequeue(E x)
元素出队方法。
1 | public E poll() { |
dequeue(E x)元素出队
1 | //删除队列头元素并返回 |
- 进入dequeue()方法,意味着takeIndex位置有元素可以删除,反过来说,如果takeIndex位置没有元素,就不会进入此方法。所以,第一步是拿到takeIndex位置的元素。
- 将takeIndex位置后移(自增),移动到下一个位置,无论一个位置有没有元素都没有关系,总之移动之后的takeIndex新位置会是下一轮删除元素的位置。
- 如果takeIndex自增之后值为items.length,说明takeIndex的索引已到数组尽头,就将其值校正为0,表示下一次从头部开始删除元素,达到环形队列的效果。
- 删除了元素说明队列有空位,唤醒notFull条件等待队列中的一个put线程,执行添加操作。
阻塞式删除元素:take()方法的原理
take()方法是一个可阻塞、可中断的删除方法,主要做了两件事:
如果队列没有数据,就将线程加入notEmpty等待队列并阻塞线程,一直到有生产者插入数据后通过notEmpty发出一个消息,notEmpty将从其等待队列唤醒一个消费(或者删除)节点,同时启动该消费线程。
如果队列有数据,就通过dequeue()执行元素的删除(或消费)操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14//从队列头部移除元素,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可中断
try {
//如果队列没有元素
while (count == 0)
//执行阻塞操作
notEmpty.await();
return dequeue(); //如果队列有元素就执行删除操作
} finally {
lock.unlock();
}
}
take()方法其实很简单,有就删除,没有就阻塞。如果队列没有数据,就将线程加入notEmpty条件队列等待,如果有新的put线程添加了数据,那么put操作将会唤醒一个处于阻塞状态的take线程执行消费(或删除)操作
peek()直接返回当前队列的头元素
peek()方法从takeIndex(头部位置)直接就可以获取最早被添加的元素,所以效率是比较高的,如果不存在就返回null。
1 | public E peek() { |
ConcurrentHashMap
ConcurrentHashMap是一个常用的高并发容器类,也是一种线程安全的哈希表。Java 7以及之前版本中的ConcurrentHashMap使用Segment(分段锁)技术将数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
Java 8对其内部的存储结构进行了优化,使之在性能上有进一步的提升。ConcurrentHashMap和同步容器HashTable的主要区别在锁的类型和粒度上:HashTable实现同步是利用synchronized关键字进行锁定的,其实是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。
HashMap和HashTable的问题
基础容器HashMap是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用率飙升,甚至接近100%,所以在高并发情况下是不能使用HashMap的。于是JDK提供了一个线程安全的Map——HashTable,HashTable虽然线程安全,但效率低下。HashTable和HashMap的实现原理几乎一样,区别有两点:
(1)HashTable不允许key和value为null。
(2)HashTable使用synchronized来保证线程安全,包含get()/put()在内的所有相关需要进行同步执行的方法都加上了synchronized关键字,对这个Hash表进行锁定。
HashTable线程安全策略的代价非常大,这相当于给整个哈希表加了一把大锁。当一个线程访问HashTable的同步方法时,其他访问HashTable同步方法的线程就会进入阻塞或轮询状态。若有一个线程在调用put()方法添加元素,则其他线程不但不能调用put()方法添加元素,而且不能调用get()方法来获取元素,相当于将所有的操作串行化。所以,HashTable的效率非常低下。
JDK 1.7版本ConcurrentHashMap的结构
JDK 1.7的ConcurrentHashMap的锁机制基于粒度更小的分段锁,分段锁也是提升多并发程序性能的重要手段之一,和LongAdder一样,属于热点分散型的削峰手段。
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,分段锁技术将Key分成一个一个小Segment存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
ConcurrentHashMap的内部结构的层次关系为ConcurrentHashMap→Segment[]→HashEntry[]
。这样设计的好处在于,每次访问的时候只需要将一个Segment锁定,而不需要将整个Map类型集合都锁定。
JDK 1.7中的ConcurrentHashMap由 Segment 数组结构和 HashEntry 数组构成的。一个ConcurrentHashMap中包含一个Segment数组,一个Segment中包含一个HashEntry数组,每个元素是一个链表结构(一个Hash表的桶)。
Segment 是一种可重入的锁 ReentrantLock,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。
ConcurrentHashMap 读写过程如下:
- get 方法
- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- put 方法
- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 获取可重入锁
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- 插入或覆盖 HashEntry 对象。
- 释放锁。
单一的 Segment 结构如下:
像这样的 Segment 对象,在 ConcurrentHashMap 集合中有多少个呢?有 2 的 N 次方个,共同保存在一个名为 segments 的数组当中。 因此整个 ConcurrentHashMap 的结构如下:
可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。
Case1:不同 Segment 的并发写入(可以并发执行)
Case2:同一 Segment 的一写一读(可以并发执行)
Case3:同一 Segment 的并发写入
Segment 的写入是需要上锁的,因此对同一 Segment 的并发写入会被阻塞。由此可见,ConcurrentHashMap 中每个 Segment 各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。
1.7版本ConcurrentHashMap的核心原理
ConcurrentHashMap类
ConcurrentHashMap类的核心源码如下:
1 | public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { |
Segment类
每个Segment实例用来守护其内部table成员对象,table是一个由HashEntry实例构成的数组,其每个元素就是哈希映射表的一个桶。
每个Segment实例都有一个count来表示该分段包含的HashEntry“Key-Value对”总数。具体来说,count变量是一个计数器,它表示每个Segment实例管理的table数组(若干个HashEntry组成的链表)包含的HashEntry实例的个数。之所以在每个Segment实例中包含一个计数器,而不是在ConcurrentHashMap中使用全局的计数器,是为了避免出现“全局热点”而影响并发性。
Segment类的代码如下:
1 | static final class Segment<K, V> extends ReentrantLock implements Serializable { |
HashEntry
HashEntry用来封装哈希映射表中的“Key-Value对”。在HashEntry类中,key、hash和next字段都被声明为final型,value字段被声明为volatile型。
1 | static final class HashEntry<K, V> { |
在ConcurrentHashMap中,哈希时如果产生“碰撞”,将采用“分离链接法”来处理:把“碰撞”的HashEntry对象链接成一个链表,形成一个桶。由于HashEntry的next字段为final型,因此新节点只能在链表的表头处插入。
注意:由于只能在表头插入,因此链表中节点的顺序和插入的顺序相反。
ConcurrentHashMap的get操作
从结构上我们可以看到Segment类似于一个小型的HashMap,ConcurrentHashMap就是HashMap集合。接下来就来看一下get操作:
1.8版本ConcurrentHashMap的结构
在JDK 1.8中,ConcurrentHashMap已经抛弃了Segment分段锁机制,存储结构采用数组+链表或者红黑树的组合方式,利用CAS+Synchronized来保证并发更新的安全。
ConcurrentHashMap的内部结构示例
ConcurrentHashMap实例的内部结构示例如图:
ConcurrentHashMap的成员属性
JDK 1.8版本ConcurrentHashMap的主要成员属性大致如下:
1 | public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { |
对以上清单中的重要属性介绍如下:
table
用于保存添加到哈希表中的桶。DEFAULT_CAPACITY
: table的默认长度。默认初期长度为16,在第一次添加元素时,会将table初始化成16个元素的数组。MIN_TREEIFY_CAPACITY
: 链式桶转成红黑树桶的阈值。在增加“Key-Value对”时,当链表长度大于该值时,将链表转换成红黑树。UNTREEIFY_THRESHOLD
: 红黑树桶还原回链式桶的阈值,也就是红黑树转为链表的阈值,当在容量变动时重新计算存储位置后,当原有的红黑树内节点数量小于6时,将红黑树转换成链表。MIN_TREEIFY_CAPACITY
: 链式桶转换成红黑树桶还有一个要求,table的容量达到最小树形化容量的阈值,只有当哈希表中的table容量大于该值时,才允许树将链表转换成红黑树的操作。否则,尽管单个桶内的元素太多,仍然选择直接扩容,而不是将桶树形化。sizeCtl
: sizeCtl用来控制table的初始化和扩容操作的过程,其值大致如下:-1
代表table正在初始化,其他线程应该交出CPU时间片。-N
表示有N-1个线程正在进行扩容操作,严格来说,当其为负数时,只用到其低16位,如果其低16位数值为M,此时有M-1个线程进行扩容。- 大于0分两种情况:如果table未初始化,sizeCtl表示table需要初始化的大小;如果table初始化完成,sizeCtl表示table的容量,默认是table大小的0.75倍。
- 涉及修改sizeCtl的方法有5个:
initTable()
: 初始化哈希表时,涉及sizeCtl的修改。addCount()
: 增加容量时,涉及sizeCtl的修改。tryPresize()
: ConcurrentHashMap扩容方法之一。transfer()
: table数据转移到nextTable。扩容操作的核心在于数据的转移,把旧数组中的数据迁移到新的数组。ConcurrentHashMap精华的部分是它可以利用多线程来进行协同扩容,简单来说,它把table数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的Bucket会被替换为一个ForwardingNode节点,标记当前Bucket已经被其他线程迁移完了。helpTransfer()
: ConcurrentHashMap鬼斧神工,并发添加元素时,如果正在扩容,其他线程会帮助扩容,也就是多线程扩容。
ConcurrentHashMap的数组扩容
JDK 1.8版本的ConcurrentHashMap中通过一个Node<K,V>[]
数组 table来保存添加到哈希表中的桶,而在同一个Bucket位置是通过链表和红黑树的形式来保存的。但是数组table是懒加载的,只有在第一次添加元素的时候才会初始化
第一次添加元素时,默认初期长度为16,当往table中继续添加元素时,通过Hash值跟数组长度取余来决定放在数组的哪个Bucket位置,如果出现放在同一个位置,就优先以链表的形式存放,在同一个位置的个数达到了8个以上,如果数组的长度还小于64,就会扩容数组。如果数组的长度大于等于64,就会将该节点的链表转换成树。
通过扩容数组的方式来把这些节点分散开。然后将这些元素复制到扩容后的新数组中,同一个Bucket中的元素通过Hash值的数组长度位来重新确定位置,可能还是放在原来的位置,也可能放到新的位置。
而且,在扩容完成之后,如果之前某个节点是树,但是现在该节点的“Key-Value对”数又小于等于6个,就会将该树转为链表。
什么时候扩容呢?当前容量超过阈值,也就是链表中的元素个数超过默认设定(8个)时,如果数组table的大小还未超过64,此时就
进行数组的扩容,如果超过就将链表转化成红黑树。
ConcurrentHashMap 的内部类
Node
Node 类实现了 Map.Entry 接口,主要存放 key-value 对,并且具有 next 域
1 | static class Node<K,V> implements Map.Entry<K,V> { |
另外可以看出很多属性都是用 volatile 关键字
修饰的,也是为了保证内存可见性。
TreeNode
树节点,继承于承载数据的 Node 类。红黑树的操作是针对 TreeBin 类的,从该类的注释也可以看出,TreeBin 是对 TreeNode 的再一次封装,下面会提到。
1 | ** |
TreeBin
这个类并不负责用户的 key、value 信息,而是封装了很多 TreeNode 节点。实际的 ConcurrentHashMap “数组”中,存放的都是 TreeBin 对象,而不是 TreeNode 对象。
1 | static final class TreeBin<K,V> extends Node<K,V> { |
ForwardingNode
在扩容时会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable 引用的新 table 数组。
1 | static final class ForwardingNode<K,V> extends Node<K,V> { |
put()方法源码
1 | public V put(K key, V value) { |
从put()源码可以看到,JDK 1.8版本在使用CAS自旋完成桶的设置时,使用synchronized内置锁保证桶内并发操作的线程安全。尽管对同一个Map操作的线程争用会非常激烈,但是在同一个桶内的线程争用
通常不会很激烈,所以使用CAS自旋(简单轻量级锁)、synchronized偏向锁或轻量级锁不会降低ConcurrentHashMap的性能。为什么不用ReentrantLock显式锁呢?如果为每一个桶都创建一个ReentrantLock
实例,就会带来大量的内存消耗,反过来,使用CAS自旋(简单轻量级锁)、synchronized偏向锁或轻量级锁,内存消耗的增加会微乎其微。
ConcurrentHashMap 的字段
1、table,volatile Node<K,V>[] table
:
装载 Node 的数组,作为 ConcurrentHashMap 的底层容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为 2 的幂次方,讲 HashMap 的时候讲过。
2、nextTable,volatile Node<K,V>[] nextTable
扩容时使用,平时为 null,只有在扩容的时候才为非 null
3、sizeCtl,volatile int sizeCtl
该属性用来控制 table 数组的大小,根据是否初始化和是否正在扩容有几种情况:
- 当值为负数时: 如果为-1 表示正在初始化,如果为 -N 则表示当前正有 N-1 个线程进行扩容操作;
- 当值为正数时: 如果当前数组为 null 的话表示 table 在初始化过程中,sizeCtl 表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table 数组)可用容量,也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度 n 乘以 加载因子 loadFactor;
- 当值为 0 时,即数组长度为默认初始值。
4、sun.misc.Unsafe U
在 ConcurrentHashMap 的实现中,可以看到用了大量的 U.compareAndSwapXXXX
方法去修改 ConcurrentHashMap 的一些属性。
这些方法实际上是利用了CAS 算法用于保证线程安全性,这是一种乐观策略:假设每一次操作都不会产生冲突,当且仅当冲突发生的时候再去尝试。
CAS 操作依赖于现代处理器指令集,通过底层的CMPXCHG指令实现。CAS(V,O,N)
核心思想为:若当前变量实际值 V 与期望的旧值 O 相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值 N 赋值给变量;若当前变量实际值 V 与期望的旧值 O 不相同,则表明该变量已经被其他线程做了处理,此时将新值 N 赋给变量操作就是不安全的,在进行重试。
在并发容器中,CAS 是通过sun.misc.Unsafe
类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为 Java 中的“指针”。该成员变量的获取是在静态代码块中:
1 | static { |
ConcurrentHashMap 的内部类
1、Node
Node 类实现了 Map.Entry 接口,主要存放 key-value 对,并且具有 next 域
1 | static class Node<K,V> implements Map.Entry<K,V> { |
另外可以看出很多属性都是用 volatile 关键字
2、TreeNode
树节点,继承于承载数据的 Node 类。红黑树的操作是针对 TreeBin 类的,从该类的注释也可以看出,TreeBin 是对 TreeNode 的再一次封装,下面会提到。
1 | ** |
3、TreeBin
这个类并不负责用户的 key、value 信息,而是封装了很多 TreeNode 节点。实际的 ConcurrentHashMap “数组”中,存放的都是 TreeBin 对象,而不是 TreeNode 对象。
1 | static final class TreeBin<K,V> extends Node<K,V> { |
4、ForwardingNode
在扩容时会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable 引用的新 table 数组。
1 | static final class ForwardingNode<K,V> extends Node<K,V> { |
ConcurrentHashMap 的 CAS
ConcurrentHashMap 会大量使用 CAS 来修改它的属性和进行一些操作。因此,在理解 ConcurrentHashMap 的方法前,我们需要了解几个常用的利用 CAS 算法来保障线程安全的操作。
1、tabAt
1 | static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { |
该方法用来获取 table 数组中索引为 i 的 Node 元素。
2、casTabAt
1 | static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, |
利用 CAS 操作设置 table 数组中索引为 i 的元素
3、setTabAt
1 | static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { |
该方法用来设置 table 数组中索引为 i 的元素
ConcurrentHashMap 的方法
构造方法
ConcurrentHashMap 一共提供了以下 5 个构造方法:
1 | // 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16 |
差别请看注释,我们来看看第 2 种构造方法,源码如下:
1 | public ConcurrentHashMap(int initialCapacity) { |
这段代码的逻辑请看注释,很容易理解,如果小于 0 就直接抛异常,如果指定值大于所允许的最大值就取最大值,否则再对指定值做进一步处理。最后将 cap 赋值给 sizeCtl。
当调用构造方法之后,sizeCtl 的大小就代表了 ConcurrentHashMap 的大小,即 table 数组的长度。
tableSizeFor 做了哪些事情呢?源码如下:
1 | /** |
注释写的很清楚,该方法会将构造方法指定的大小转换成一个 2 的幂次方数,也就是说 ConcurrentHashMap 的大小一定是 2 的幂次方,比如,当指定大小为 18 时,为了满足 2 的幂次方特性,实际上 ConcurrentHashMap 的大小为 2 的 5 次方(32)。
另外,需要注意的是,调用构造方法时并初始化 table 数组,而只算出了 table 数组的长度,当第一次向 ConcurrentHashMap 插入数据时才会真正的完成初始化,并创建 table 数组。
initTable 方法
直接上源码:
1 | private final Node<K,V>[] initTable() { |
代码的逻辑请见注释。
可能存在这样一种情况,多个线程同时进入到这个方法,为了保证能够正确地初始化,第 1 步会先通过 if 进行判断,如果当前已经有一个线程正在初始化,这时候其他线程会调用 Thread.yield()
让出 CPU 时间片。
正在进行初始化的线程会调用 U.compareAndSwapInt
方法将 sizeCtl 改为 -1,即正在初始化的状态。
另外还需要注意,在第四步中会进一步计算数组中可用的大小,即数组的实际大小 n 乘以加载因子 0.75,0.75 就是四分之三,这里n - (n >>> 2)
刚好是n-(1/4)n=(3/4)n
,挺有意思的吧?
如果选择是无参的构造方法,这里在 new Node 数组的时候会使用默认大小DEFAULT_CAPACITY
(16),然后乘以加载因子 0.75,结果为 12,也就是说数组当前的可用大小为 12。
put 方法
调用 put 方法时会调用 putVal 方法,源码如下:
1 | /** Implementation for put and putIfAbsent */ |
ConcurrentHashMap 是一个哈希桶数组,如果不出现哈希冲突的时候,每个元素均匀的分布在哈希桶数组中。当出现哈希冲突的时候,采用拉链法的解决方案,将 hash 值相同的节点转换成链表的形式,另外,在 JDK 1.8 版本中,为了防止拉链过长,当链表的长度大于 8 的时候会将链表转换成红黑树。
确定好数组的索引 i 后,可以调用 tabAt()
方法获取该位置上的元素,如果当前 Node 为 null 的话,可以直接用 casTabAt 方法将新值插入。
拉链法、确定索引 i 的知识在学习 HashMap
如果当前节点不为 null,且该节点为特殊节点(forwardingNode),就说明当前 concurrentHashMap 正在进行扩容操作。怎么确定当前这个 Node 是特殊节点呢?
通过判断该节点的 hash 值是不是等于 -1(MOVED):
1 | static final int MOVED = -1; // hash for forwarding nodes |
当 table[i]
不为 null 并且不是 forwardingNode 时,以及当前 Node 的 hash 值大于0(fh >= 0)
时,说明当前节点为链表的头节点,那么向 ConcurrentHashMap 插入新值就是向这个链表插入新值。通过 synchronized (f)
的方式进行加锁以实现线程安全。
往链表中插入节点的部分代码如下:
1 | if (fh >= 0) { |
这部分代码很好理解,就两种情况:
- 如果在链表中找到了与待插入的 key 相同的节点,就直接覆盖;
- 如果找到链表的末尾都还没找到的话,直接将待插入的键值对追加到链表的末尾。
当链表长度超过 8(默认值)时,链表就转换为红黑树,利用红黑树快速增删改查的特点可以提高 ConcurrentHashMap 的性能:
1 | if (f instanceof TreeBin) { |
这段代码很简单,调用 putTreeVal 方法向红黑树插入新节点,同样的逻辑,如果在红黑树中存在 Key 相同(hash 值相等并且 equals 方法判断为 true)的节点,就覆盖旧值,否则向红黑树追加新节点。
当完成数据新节点插入后,会进一步对当前链表大小进行调整:
1 | if (binCount != 0) { |
至此,put 方法就分析完了,我们来做个总结:
- 对每一个放入的值,先用 spread 方法对 key 的 hashcode 进行 hash 计算,由此来确定这个值在 table 中的位置;
- 如果当前 table 数组还未初始化,进行初始化操作;
- 如果这个位置是 null,那么使用 CAS 操作直接放入;
- 如果这个位置存在节点,说明发生了 hash 碰撞,先判断这个节点的类型,如果该节点
==MOVED
的话,说明正在进行扩容; - 如果是链表节点(
fh>0
),先获取头节点,再依次向后遍历确定这个新加入节点的位置。如果遇到 key 相同的节点,直接覆盖。否则在链表尾插入; - 如果这个节点的类型是 TreeBin,直接调用红黑树的插入方法插入新的节点;
- 插入完节点之后再次检查链表的长度,如果长度大于 8,就把这个链表转换成红黑树;
- 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。
get 方法
get 方法的源码如下:
1 | public V get(Object key) { |
- 哈希: 对传入的键的哈希值进行散列,这有助于减少哈希冲突的可能性。使用 spread 方法可以保证不同的键更均匀地分布在桶数组中。
- 直接查找: 查找的第一步是检查键的哈希值是否位于表的正确位置。如果在该桶的第一个元素中找到了键,则直接返回该元素的值。这里使用了 == 操作符和 equals 方法来比较键,这有助于处理可能的 null 值和确保正确的相等性比较。
- 红黑树查找: 如果第一个节点的哈希值小于 0,那么这个桶的数据结构是红黑树(Java 8 引入了树化结构来改进链表在哈希冲突时的性能)。在这种情况下,使用 find 方法在红黑树中查找键。
- 链表查找: 如果前两个条件都不满足,那么代码将遍历该桶中的链表。如果在链表中找到了具有相同哈希值和键的元素,则返回其值。如果遍历完整个链表都未找到,则返回 null。
transfer 方法
当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 很像,但由于支持并发扩容,所以要复杂一些。transfer 方法源码如下:
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
代码逻辑请看注释,整个扩容操作分为两个部分:
第一部分是构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。
第二个部分是将原来 table 中的元素复制到 nextTable 中,主要是遍历复制的过程。 得到当前遍历的数组位置 i,然后利用 tabAt 方法获得 i 位置的元素:
- 如果这个位置为空,就在原 table 中的 i 位置放入 forwardNode 节点,这个也是触发并发扩容的关键;
- 如果这个位置是 Node 节点(
fh>=0
),并且是链表的头节点,就把这个链表分裂成两个链表,把它们分别放在 nextTable 的 i 和 i+n 的位置上; - 如果这个位置是 TreeBin 节点(
fh<0
),也做一个反序处理,并且判断是否需要 untreefi,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上; - 遍历所有的节点,就完成复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。
size 相关的方法
对于 ConcurrentHashMap 来说,这个 table 里到底装了多少东西是不确定的,因为不可能在调用 size()
方法的时候“stop the world”让其他线程都停下来去统计,对于这个不确定的 size,ConcurrentHashMap 仍然花费了大量的力气。
为了统计元素的个数,ConcurrentHashMap 定义了一些变量和一个内部类。
1 | /** |
再来看如何统计的源码:
1 | public int size() { |
size 方法返回 Map 中的元素数量,但结果被限制在 Integer.MAX_VALUE 内。如果计算的大小超过这个值,则返回 Integer.MAX_VALUE。如果计算的大小小于 0,则返回 0。
mappingCount 方法也返回 Map 中的元素数量,但允许返回一个 long 值,因此可以表示大于 Integer.MAX_VALUE 的数量。与 size()
方法类似,该方法也会忽略负值,返回 0。
sumCount 方法计算 Map 的实际大小。ConcurrentHashMap 使用一个基础计数 baseCount 和一个 CounterCell 数组 counterCells 来跟踪大小。这种结构有助于减少多线程环境中的争用,因为不同的线程可能会更新不同的 CounterCell。
在计算总和时,sumCount()
方法将 baseCount 与 counterCells 数组中的所有非空单元的值相加。
在 put 方法结尾处调用了 addCount 方法,把当前 ConcurrentHashMap 的元素个数 +1,这个方法一共做了两件事,更新 baseCount 的值,检测是否进行扩容。
1 | private final void addCount(long x, int check) { |
ConcurrentHashMap 示例
假设我们想要构建一个线程安全的高并发统计用户访问次数的功能。在这里,ConcurrentHashMap 是一个很好的选择,因为它提供了高并发性能。
1 | import java.util.concurrent.ConcurrentHashMap; |
在上述示例中:
- 我们使用了 ConcurrentHashMap 来存储用户的访问次数。
- 当用户访问时,我们通过 userVisited 方法更新访问次数。
- 使用 ConcurrentHashMap 的 compute 方法可以确保原子地更新用户的访问次数。
- 可以通过 getVisitCount 方法检索任何用户的访问次数。
ConcurrentHashMap 使我们能够无需担心并发问题就能构建这样一个高效的统计系统。