线程安全的同步容器类

Java同步容器类通过Synchronized(内置锁)来实现同步的容器,比如VectorHashTable以及SynchronizedList等容器。线程安全的同步容器类主要有Vector、Stack、HashTable等。另外,Java还提供了一组包装方法,将一个普通的基础容器包装成一个线程安全的同步容器。例如通过Collections.synchronized包装方法能将一个普通的SortedSet容器包装成一个线程安全的SortedSet同步容器。

Collections.synchronize

Collections.synchronized 相关的包装方法是 Java 中用于将非线程安全的集合类转换为线程安全的集合类的工具方法,位于 java.util.Collections 类中。

以下是一些常见的 Collections.synchronized 包装方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListExample {
public static void main(String[] args) {
// 创建非线程安全的List
List<String> list = new ArrayList<>();
// 使用synchronizedList方法包装为线程安全的List
List<String> synchronizedList = Collections.synchronizedList(list);
}
}

public class SynchronizedSetExample {
public static void main(String[] args) {
// 创建非线程安全的Set
Set<String> set = new HashSet<>();
// 使用synchronizedSet方法包装为线程安全的Set
Set<String> synchronizedSet = Collections.synchronizedSet(set);
}
}

public class SynchronizedSortedSetIterationExample {
public static void main(String[] args) {
TreeSet<Integer> originalSet = new TreeSet<>();
originalSet.add(1);
originalSet.add(2);
originalSet.add(3);

SortedSet<Integer> synchronizedSortedSet = Collections.synchronizedSortedSet(originalSet);

// 错误的迭代方式(会导致并发问题)
// for (Integer num : synchronizedSortedSet) {
// System.out.println(num);
// }

// 正确的迭代方式,手动加锁
synchronized (synchronizedSortedSet) {
Iterator<Integer> iterator = synchronizedSortedSet.iterator();
while (iterator.hasNext()) {
Integer num = iterator.next();
System.out.println(num);
}
}
}
}

public class SynchronizedMapExample {
public static void main(String[] args) {
// 创建非线程安全的Map
Map<String, Integer> map = new HashMap<>();
// 使用synchronizedMap方法包装为线程安全的Map
Map<String, Integer> synchronizedMap = Collections.synchronizedMap(map);
}
}

public class SynchronizedSortedMapExample {
public static void main(String[] args) {
// 创建非线程安全的SortedMap
SortedMap<String, Integer> sortedMap = new TreeMap<>();
// 使用synchronizedSortedMap方法包装为线程安全的SortedMap
SortedMap<String, Integer> synchronizedSortedMap = Collections.synchronizedSortedMap(sortedMap);
}
}

同步容器面临的问题

可以通过查看Vector、HashTable、java.util.Collections同步包装内部类的源码,发现这些同步容器实现线程安全的方式是:在需要同步访问的方法上添加关键字synchronized。由于锁的存在,同步容器的操作在同一时刻只能有一个线程执行,这使得原本可以并行执行的操作变成了串行执行。比如多个线程对容器进行读写操作时,不能并发进行,会降低系统的吞吐量

Collections 虽能把基础容器包装成线程安全的同步容器,但这类同步容器包装类在对元素进行迭代时,无法执行元素添加操作。

为了解决同步容器的性能问题,有了JUC高并发容器。

JUC高并发容器

JUC基于非阻塞算法(Lock Free,无锁编程)提供了一组高并发容器,包括高并发的List、Set、Queue、Map容器。

什么是高并发容器

JUC高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程算法主要通过CAS(Compare And Swap)+Volatile组合实现,通过CAS保障操作的原子性,通过volatile保障变量内存的可见性。

无锁编程算法的主要优点如下:

  1. 开销较小:不需要在内核态和用户态之间切换进程。
  2. 读写不互斥:只有写操作需要使用基于CAS机制的乐观锁,读读操作之间可以不用互斥。

JUC包中提供了List、Set、Queue、Map各种类型的高并发容器,如ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetCopyOnWriteArrayListCopyOnWriteArraySet。在性能上,ConcurrentHashMap通常优于同步的HashMap,ConcurrentSkipListMap通常优于同步的TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList优于同步的ArrayList。

List

JUC包中的高并发List主要有CopyOnWriteArrayList,对应的基础容器为ArrayList。

CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。在读多写少的场景中,其性能远远高于ArrayList的同步包装容器。

Set

JUC包中的Set主要有CopyOnWriteArraySetConcurrentSkipListSet。

  • CopyOnWriteArraySet继承自AbstractSet类,对应的基础容器为HashSet。其内部组合了一个CopyOnWriteArrayList对象,它的核心操作是基于CopyOnWriteArrayList实现的。

  • ConcurrentSkipListSet是线程安全的有序集合,对应的基础容器为TreeSet。它继承自AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。

Map

JUC包中Map主要有ConcurrentHashMapConcurrentSkipListMap

  • ConcurrentHashMap对应的基础容器为HashMap。JDK 6中的ConcurrentHashMap采用一种更加细粒度的“分段锁”加锁机制,JDK8中采用CAS无锁算法。

  • ConcurrentSkipListMap对应的基础容器为TreeMap。其内部的Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照Key值升序的。

Queue

JUC包中的Queue的实现类包括三类:单向队列、双向队列和阻塞队列。

  • ConcurrentLinkedQueue是基于列表实现的单向队列,按照FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素则需要从队列头部获取。

  • ConcurrentLinkedDeque是基于链表的双向队列,但是该队列不允许null元素。作为双向队列,ConcurrentLinkedDeque可以当作“栈来使用,并且高效地支持并发环境。

  • 除了提供普通的单向队列、双向队列外,JUC拓展了队列,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:

    • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列。
    • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列。
    • PriorityBlockingQueue:按优先级排序的队列。
    • DelayQueue:按照元素的Delay时间进行排序的队列。
    • SynchronousQueue:无缓冲等待队列。

CopyOnWriteArrayList

在很多应用场景中读操作常远超写操作,读操作不修改原有数据,每次读取都加锁是资源浪费,应允许多线程同时访问List内部数据(读操作线程安全)。写时复制(Copy On Write,COW)是计算机程序设计领域的优化策略,其核心思想为多个访问器访问资源时指向同一资源,若有修改器要修改该资源,系统会给修改器复制专用副本,其他访问器看到的资源不变且修改过程对它们透明,COW的主要优点是无修改器修改资源时不会创建副本,多个访问器可共享同一份资源。

CopyOnWriteArrayList的使用

前面讲到,Collections可以将基础容器包装为线程安全的同步容器,但是这些同步容器包装类在进行元素迭代时并不能进行元素添加操作。下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectionsExample {
public static void main(String[] args) {
// 创建一个普通的ArrayList
List<String> list = new ArrayList<>();
list.add("元素1");
list.add("元素2");
list.add("元素3");

// 使用Collections将其包装为线程安全的同步容器
List<String> synchronizedList = Collections.synchronizedList(list);

// 尝试迭代并添加元素,会抛出异常
try {
for (String element : synchronizedList) {
// 这里尝试添加新元素,会导致异常
synchronizedList.add("新元素");
}
} catch (Exception e) {
System.out.println("出现异常: " + e);
// 输出的异常信息通常为:java.util.ConcurrentModificationException
// 表明在迭代过程中不允许进行这样的修改操作
}
}
}

该如何解决此问题呢?可使用CopyOnWriteArrayList替代Collections.synchronizedList同步包装实例,具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListExample {
public static void main(String[] args) {
// 使用Collections.synchronizedList包装的示例
List<String> synchronizedList = Collections.synchronizedList(new ArrayList<>());
// 向同步列表中添加元素
synchronizedList.add("元素1");
synchronizedList.add("元素2");

// 在多线程环境下尝试迭代并修改(添加元素)
Thread thread1 = new Thread(() -> {
try {
Iterator<String> iterator = synchronizedList.iterator();
while (iterator.hasNext()) {
String element = iterator.next();
System.out.println("迭代元素: " + element);
// 这里尝试添加元素,会导致异常
synchronizedList.add("新元素");
}
} catch (Exception e) {
System.out.println("使用Collections.synchronizedList出现异常: " + e);
}
});

// 使用CopyOnWriteArrayList的示例
CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
copyOnWriteArrayList.add("元素A");
copyOnWriteArrayList.add("元素B");

// 在多线程环境下尝试迭代并修改(添加元素)
Thread thread2 = new Thread(() -> {
Iterator<String> iterator = copyOnWriteArrayList.iterator();
while (iterator.hasNext()) {
String element = iterator.next();
System.out.println("迭代元素: " + element);
// 使用CopyOnWriteArrayList可以在迭代时添加元素,不会出现异常
copyOnWriteArrayList.add("新元素");
}
});

// 启动线程
thread1.start();
thread2.start();

try {
// 等待线程执行完毕
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Collections.synchronizedList最终元素个数: " + synchronizedList.size());
System.out.println("CopyOnWriteArrayList最终元素个数: " + copyOnWriteArrayList.size());
}
}

CopyOnWriteArrayList的原理

CopyOnWrite(写时复制)就是在修改器对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存复制一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收

CopyOnWriteArrayList是写时复制思想的一种典型实现,其含有一个指向操作内存的内部指针array,而可变操作(add、set等)是在array数组的副本上进行的。当元素需要被修改或者增加时,并不直接在array指向的原有数组上操作,而是首先对array进行一次复制,将修改的内容写入复制的副本中。写完之后,再将内部指针array指向新的副本,这样就可以确保修改操作不会影响访问器的读取操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

/** 对所有的修改器方法进行保护,访问器方法并不需要保护 */
final transient ReentrantLock lock = new ReentrantLock();


// 内部维护的就是一个数组
/** The array, accessed only via getArray/setArray. */
// 该数组被 volatile 修饰,能够保证数据的内存可见性。
private transient volatile Object[] array;

/**
* 获取内部对象数组
*/
final Object[] getArray() {
return array;
}

/**
* 设置内部对象数组
*/
final void setArray(Object[] a) {
array = a;
}
// 省略其他代码
}

读取操作

访问器的读取操作没有任何同步控制和锁操作,理由是内部数组array不会发生修改,只会被另一个array替换,因此可以保证数据安
全。

1
2
3
4
5
6
7
8
9
10
11
12
13
public E get(int index) {
return get(getArray(), index);
}
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}

写入操作

CopyOnWriteArrayList的写入操作add()方法在执行时加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会复制出多个副本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//1. 使用Lock,保证写线程在同一时刻只有一个
lock.lock();

try {
//2. 获取旧数组引用
Object[] elements = getArray();
int len = elements.length;

//3. 创建新的数组,并将旧数组的数据复制到新数组中
Object[] newElements = Arrays.copyOf(elements, len + 1);

//4. 往新数组中添加新的数据
newElements[len] = e;

//5. 将旧数组引用指向新的数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

add()操作可以看出,在每次进行添加操作时,CopyOnWriteArrayList底层都是重新复制一份数组,再往新的数组中添加新元素,待添加完了,再将新的array引用指向新的数组。当add()操作完成后,array的引用就已经指向另一个存储空间了。

既然每次添加元素的时候都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList并不适合进行添加操作。但是在并发场景下,迭代操作比较频繁,CopyOnWriteArrayList就是一个不错的选择。

迭代器实现

CopyOnWriteArray有自己的迭代器,该迭代器不会检查修改状态,也无须检查状态。为什么呢?因为被迭代的array数组可以说是只读的,不会有其他线程能够修改它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
/**对象数组的快照(snapshot)*/
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;

private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

public boolean hasNext() {
return cursor < snapshot.length;
}

@SuppressWarnings("unchecked")
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
}

迭代器的快照成员会在构造迭代器的时候使用CopyOnWriteArrayList的array成员去初始化,具体如下:

1
2
3
4
5
6
7
8
9
//获取迭代器
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

//返回操作内存
final Object[] getArray() {
return array;
}

CopyOnWriteArrayList的优点

CopyOnWriteArrayList有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非常快。所以,CopyOnWriteArrayList适用于读操作多、写操作相对较少的场景(读多写少),比如可以在进行“黑名单”拦截时使用CopyOnWriteArrayList

CopyOnWriteArrayList和ReentrantReadWriteLock的比较

CopyOnWriteArrayListReentrantReadWriteLock读写锁的思想非常类似,即读读共享、写写互斥、读写互斥、写读互斥。但是前者相比后者的更进一步:为了将读取的性能发挥到极致,**CopyOnWriteArrayList读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升**。

BlockingQueue

在多线程环境中,通过BlockingQueue(阻塞队列)可以很容易地实现多线程之间的数据共享和通信,比如在经典的“生产者”和“消费者”模型中,通过BlockingQueue可以完成一个高性能的实现版本。

BlockingQueue的特点

阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法。

  1. 阻塞添加
    • 阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直到队列元素不满时,才重新唤醒线程执行元素添加操作。
  2. 阻塞删除
    • 阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程,再执行删除操作。

阻塞队列的常用方法

先来看看阻塞队列接口提供的主要方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface BlockingQueue<E> extends Queue<E> {
//将指定的元素添加到此队列的尾部
//在成功时返回true,如果此队列已满,就抛出IllegalStateException
boolean add(E e);

//非阻塞式添加:将指定的元素添加到此队列的尾部(如果立即可行且不会超过该队列的容量)
//如果该队列已满,就直接返回
boolean offer(E e)

//限时阻塞式添加:将指定的元素添加到此队列的尾部
//如果该队列已满,那么在到达指定的等待时间之前,添加线程会阻塞,等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

//阻塞式添加:将指定的元素添加到此队列的尾部,如果该队列已满,就一直等待(阻塞)
void put(E e) throws InterruptedException;

//阻塞式删除:获取并移除此队列的头部,如果没有元素就等待(阻塞)
//直到有元素,将唤醒等待线程执行该操作
E take() throws InterruptedException;

//非阻塞式删除:获取并移除此队列的头部,如果没有元素就直接返回null(空)
E poll() throws InterruptedException;

//限时阻塞式删除:获取并移除此队列的头部,在指定的等待时间前一直等待获取元素,超过时间,方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;

//获取但不移除此队列的头元素,没有则抛出异常
NoSuchElementException E element();

//获取但不移除此队列的头元素,如果此队列为空,就返回null
E peek();

//从此队列中移除指定元素,返回删除是否成功
boolean remove(Object o);
}

4个特征说明如下:

  1. 抛出异常: 如果试图的操作无法立即执行,就抛出一个异常。
  2. 特殊值: 如果尝试的操作无法立即执行,就返回一个特定的值(通常是true/false)。
  3. 阻塞:如果尝试的操作无法立即执行,该方法的调用就会发生阻塞,直到能够执行。
  4. 限时阻塞:如果尝试的操作无法立即执行,该方法的调用就会发生阻塞,直到能够执行,但等待时间不会超过设置的上限值。
操作类型 抛出异常 特殊值 阻塞 限时阻塞
添加 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
获取元素 element() peek() 不可用 不可用

添加类方法

  • add(E e):添加成功则返回true,失败就抛出IllegalStateException异常。
  • offer(E e):成功则返回true,如果此队列已满,就返回false。
  • put(E e):将元素添加至此队列的尾部,如果该队列已满,就一直阻塞。

删除类方法

  • poll():获取并移除此队列的头元素,若队列为空,则返回null。
  • take():获取并移除此队列的头元素,若没有元素,则一直阻塞。
  • remove(Object o):移除指定元素,成功则返回true,失败则返回false。

获取元素类方法

  • element():获取但不移除此队列的头元素,没有元素则抛出异常。
  • peek():获取但不移除此队列的头元素,若队列为空,则返回null。

常见的BlockingQueue

在了解了BlockingQueue的主要方法后,接下来介绍一下BlockingQueue家族大致有哪些成员。BlockingQueue的实现类有
ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、PriorityBlockingQueue、SynchronousQueue等。

ArrayBlockingQueue

ArrayBlockingQueue是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的锁分离,从而添加和删除操作完全并行。Doug Lea之所以没这样去做,是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。

为什么ArrayBlockingQueue比LinkedBlockingQueue更加常用?前者在添加或删除元素时不会产生或销毁任何额外的Node(节点)实例,而后者会生成一个额外的Node实例。在长时间、高并发处理大批量数据的场景中,LinkedBlockingQueue产生的额外Node实例会加大系统的GC压力。

LinkedBlockingQueue

LinkedBlockingQueue是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下,生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

需要注意的是,在新建一个LinkedBlockingQueue对象时,若没有指定其容量大小,则LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就已经被消耗殆尽了。

DelayQueue

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

DelayQueue的使用场景较少,但是相当巧妙,常见的例子是使用DelayQueue来管理一个超时未响应的连接队列。

PriorityBlockingQueue

基于优先级的阻塞队列和DelayQueue类似,PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么大家都在集市等待。相对于有缓冲的阻塞队列(如LinkedBlockingQueue)来说,SynchronousQueue少了中间缓冲区(如仓库)的环节。如果有仓库,生产者直接把产品批发给仓库,不需要关心仓库最终会将这些产品发给哪些消费者,由于仓库可以中转部分商品,总体来说有仓库进行生产和消费的吞吐量高一些。反过来说,又因为仓库的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低,所以对单个消息的响应要求高的场景可以使用SynchronousQueue

声明一个SynchronousQueue有两种不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体现出整体的公平特征。非公平模式(默认情况)的SynchronousQueue采用非公平锁,同时配合一个LIFO堆栈(TransferStack内部实例)来管理多余的生产者和消费者。对于后一种模式,如果生产者和消费者的处理速度有差距,就很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。

ArrayBlockingQueue的基本使用

下面通过ArrayBlockingQueue队列实现一个生产者-消费者的案例,通过该案例简单了解其使用方式和方法。具体的代码在前面的生
产者和消费者实现基础上进行迭代——Consumer(消费者)和Producer(生产者)通过ArrayBlockingQueue队列获取和添加元素。

  • 其中,消费者调用take()方法获取元素,当队列没有元素时就阻塞;
  • 生产者调用put()方法添加元素,当队列满时就阻塞。

通过这种方式便可以实现生产者-消费者模式,比直接使用等待唤醒机制或者Condition条件队列更加简单。

实现生产者-消费者模式

DataBuffer(共享数据区)使用一个ArrayBlockingQueue用于缓存数据,具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.concurrent.ArrayBlockingQueue;

// 定义共享数据区DataBuffer类
class DataBuffer {
private final ArrayBlockingQueue<Integer> buffer; // 使用Integer类型举例,可根据实际替换为其他类型

public DataBuffer(int capacity) {
buffer = new ArrayBlockingQueue<>(capacity);
}

// 生产者调用的放入数据方法
public void putData(int data) throws InterruptedException {
buffer.put(data);
}

// 消费者调用的获取数据方法
public int getData() throws InterruptedException {
return buffer.take();
}
}

// 生产者类
class Producer implements Runnable {
private final DataBuffer dataBuffer;

public Producer(DataBuffer dataBuffer) {
this.dataBuffer = dataBuffer;
}

@Override
public void run() {
try {
for (int i = 0; i < 10; i++) { // 简单循环生产10个元素示例,可按需调整
dataBuffer.putData(i);
System.out.println("生产者生产了元素: " + i);
Thread.sleep(500); // 模拟生产耗时,可按需调整时间间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

// 消费者类
class Consumer implements Runnable {
private final DataBuffer dataBuffer;

public Consumer(DataBuffer dataBuffer) {
this.dataBuffer = dataBuffer;
}

@Override
public void run() {
try {
while (true) {
int element = dataBuffer.getData();
System.out.println("消费者消费了元素: " + element);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

public class ProducerConsumerWithDataBufferExample {
public static void main(String[] args) {
DataBuffer dataBuffer = new DataBuffer(5); // 创建共享数据区,队列大小设置为5,可按需调整
Thread producerThread = new Thread(new Producer(dataBuffer));
Thread consumerThread = new Thread(new Consumer(dataBuffer));

producerThread.start();
consumerThread.start();

try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

ArrayBlockingQueue构造器

创建公平与非公平阻塞队列的代码如下:

1
2
3
4
//默认非公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(capacity);
//公平阻塞队列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(capacity,true);

ArrayBlockingQueue的两个构造器的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//只带一个capacity参数的构造器
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

//带两个参数的构造器
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); //根据fair参数构造公平锁/获取非公平锁
notEmpty = lock.newCondition(); //有元素加入,队列为非空
notFull = lock.newCondition(); //有元素被取出,队列为未满
}

ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLock和Condition条件队列实现的,接下来看看其内部的成员变量

ArrayBlockingQueue内部的成员变量

ArrayBlockingQueue是一个基于数组(Array)实现的有界阻塞队列,内部成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

/**
* 存储数据的数组
*/
final Object[] items;

/**
* 获取、删除元素的索引,主要用于take、poll、peek、remove方法
*/
int takeIndex;

/**
* 添加元素的索引,主要用于 put、offer、add方法
*/
int putIndex;

/**
* 队列元素的个数
*/
int count;

/**
* 控制并发访问的显式锁
*/
final ReentrantLock lock;

/**
* notEmpty条件对象,用于通知take线程(消费队列),可执行删除操
* 作
*/
private final Condition notEmpty;

/**
* notFull条件对象,用于通知put线程(生产队列),可执行添加操作
*/
private final Condition notFull;

/**
* 迭代器
*/
transient Itrs itrs = null;

}

ArrayBlockingQueue内部是通过数组对象items来存储所有的数据的,通过ReentrantLock类型的成员lock控制添加线程与删除线程的并
发访问。ArrayBlockingQueue使用等待条件对象notEmpty成员来存放或唤醒被阻塞的消费(take)线程,当数组对象items有元素时,告诉
take线程可以执行删除操作。同理,ArrayBlockingQueue使用等待条件对象notFull成员来存放或唤醒被阻塞的生产(put)线程,当队列
未满时,告诉put线程可以执行添加元素的操作。

ArrayBlockingQueue的takeIndex成员为消费(或删除元素)的索引,标识的是下一个方法(take、poll、peek、remove)被调用时获
取数组元素的位置。**putIndex成员为生产(或添加元素)的索引**,代表下一种方法(put、offer、add)被调用时元素添加到数组中的位
置。

add()方法的实现

1
2
3
4
5
6
7
//AbstractQueue
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

从源码可以看出,add()方法间接调用了offer()方法,如果offer()方法添加失败,那么add()将抛出IllegalStateException异常,如果offer()方法添加成功,那么add()返回true。

offer()方法的实现

offer()方法根据数组是否满了,分两种场景进行操作:

  • 如果数组满了,就直接释放锁,然后返回false。
  • 如果数组没满,就将元素入队(加入数组),然后返回true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ArrayBlockingQueue
public boolean offer(E e) {
checkNotNull(e); //检查元素是否为null
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
if (count == items.length) //判断数组是否已满
return false;
else {
enqueue(e); //添加元素到队列
return true;
}
} finally {
lock.unlock();
}
}

enqueue()方法的实现

offer()调用了enqueue(E x)元素入队方法

1
2
3
4
5
6
7
8
9
10
11
12
// ArrayBlockingQueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items; //获取当前数组
items[putIndex] = x; //通过putIndex索引对数组进行赋值
//索引自增,如果已经是最后一个位置,重新设置 putIndex=0
if (++putIndex == items.length)
putIndex = 0;
count++; //队列中元素数量加1
notEmpty.signal(); //唤醒调用take()方法的线程,执行元素获取操作
}

首先,由于进入enqueue()方法意味着数组没满,因此enqueue()方法可以通过putIndex索引直接将元素添加到数组items中,然后调整putIndex索引值。其次,enqueue()完成尾部的插入后,将自己的元素个数成员count+1。最后,enqueue()通过调用notFull.notEmpty()唤醒一个消费(或删除)线程。

这里大家可能会疑惑:当putIndex索引大小等于数组长度时,为什么需要将putIndex重新设置为0呢?

这是因为获取元素时总是在队列头部(takeIndex索引)操作,添加元素总是在队列尾部(putIndex索引)操作,而ArrayBlockingQueue将内部数组作为环形队列使用,所以在更新后索引值与数组长度相等时需要进行校正,下一个值就需要从数组的第一个元素(索引值0)开始操作。

阻塞式添加元素:put()方法的原理

首先来看阻塞式添加元素。在队列满而不能添加元素时,执行添加元素的线程会被阻塞。put()方法是一个阻塞的方法,如果队列元素已满,那么当前线程会被加入notFull条件对象的等待队列中,直到队列有空位置才会被唤醒执行添加操作。但如果队列没有满,就直接调用enqueue(e)方法将元素加入数组队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中,等待被唤醒
notFull.await();
enqueue(e); ;//如果队列没有满,就直接添加
} finally {
lock.unlock();
}
}

下面总结一下put()方法的添加操作流程。

  1. 获取putLock锁。
  2. 如果队列已满,就被阻塞,put线程进入notFull的等待队列中排队,等待被唤醒。
  3. 如果队列未满,元素通过enqueue方法入队。
  4. 释放putLock锁。
  5. 当队列已满时,新到来的put线程将被添加到notFull的条件队列中进行阻塞等待

非阻塞式删除元素:poll()方法的原理

poll()方法删除获取此队列的头元素,若队列为空,则立即返回null。poll()方法的实现比较简单,其具体的删除操作委托给了dequeue(E x)元素出队方法。

1
2
3
4
5
6
7
8
9
10
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判断队列是否为null,不为null执行dequeue()方法,否则返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

dequeue(E x)元素出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//删除队列头元素并返回
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
//清空位置:将数组中的takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等
if (++takeIndex == items.length)
//如果相等就说明已到尽头,恢复为0
takeIndex = 0;
count--; ;//元素个数减1
if (itrs != null)
//同时更新迭代器中的元素数据
itrs.elementDequeued();
notFull.signal(); //删除了元素说明队列有空位,唤醒notFull条件等待队列中的put线程,执行添加操作
return x;
}
  1. 进入dequeue()方法,意味着takeIndex位置有元素可以删除,反过来说,如果takeIndex位置没有元素,就不会进入此方法。所以,第一步是拿到takeIndex位置的元素。
  2. 将takeIndex位置后移(自增),移动到下一个位置,无论一个位置有没有元素都没有关系,总之移动之后的takeIndex新位置会是下一轮删除元素的位置。
  3. 如果takeIndex自增之后值为items.length,说明takeIndex的索引已到数组尽头,就将其值校正为0,表示下一次从头部开始删除元素,达到环形队列的效果。
  4. 删除了元素说明队列有空位,唤醒notFull条件等待队列中的一个put线程,执行添加操作。

阻塞式删除元素:take()方法的原理

take()方法是一个可阻塞、可中断的删除方法,主要做了两件事:

  • 如果队列没有数据,就将线程加入notEmpty等待队列并阻塞线程,一直到有生产者插入数据后通过notEmpty发出一个消息,notEmpty将从其等待队列唤醒一个消费(或者删除)节点,同时启动该消费线程。

  • 如果队列有数据,就通过dequeue()执行元素的删除(或消费)操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //从队列头部移除元素,队列没有元素就阻塞,可中断
    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //可中断
    try {
    //如果队列没有元素
    while (count == 0)
    //执行阻塞操作
    notEmpty.await();
    return dequeue(); //如果队列有元素就执行删除操作
    } finally {
    lock.unlock();
    }
    }

take()方法其实很简单,有就删除,没有就阻塞。如果队列没有数据,就将线程加入notEmpty条件队列等待,如果有新的put线程添加了数据,那么put操作将会唤醒一个处于阻塞状态的take线程执行消费(或删除)操作

peek()直接返回当前队列的头元素

peek()方法从takeIndex(头部位置)直接就可以获取最早被添加的元素,所以效率是比较高的,如果不存在就返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接返回当前队列的头元素,但不删除
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

final E itemAt(int i) {
return (E) items[i];
}

ConcurrentHashMap

ConcurrentHashMap是一个常用的高并发容器类,也是一种线程安全的哈希表。Java 7以及之前版本中的ConcurrentHashMap使用Segment(分段锁)技术将数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问

Java 8对其内部的存储结构进行了优化,使之在性能上有进一步的提升。ConcurrentHashMap和同步容器HashTable的主要区别在锁的类型和粒度上:HashTable实现同步是利用synchronized关键字进行锁定的,其实是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。

HashMap和HashTable的问题

基础容器HashMap是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用率飙升,甚至接近100%,所以在高并发情况下是不能使用HashMap的。于是JDK提供了一个线程安全的Map——HashTable,HashTable虽然线程安全,但效率低下。HashTable和HashMap的实现原理几乎一样,区别有两点:
(1)HashTable不允许key和value为null。
(2)HashTable使用synchronized来保证线程安全,包含get()/put()在内的所有相关需要进行同步执行的方法都加上了synchronized关键字,对这个Hash表进行锁定。

HashTable线程安全策略的代价非常大,这相当于给整个哈希表加了一把大锁。当一个线程访问HashTable的同步方法时,其他访问HashTable同步方法的线程就会进入阻塞或轮询状态。若有一个线程在调用put()方法添加元素,则其他线程不但不能调用put()方法添加元素,而且不能调用get()方法来获取元素,相当于将所有的操作串行化。所以,HashTable的效率非常低下。

JDK 1.7版本ConcurrentHashMap的结构

JDK 1.7的ConcurrentHashMap的锁机制基于粒度更小的分段锁,分段锁也是提升多并发程序性能的重要手段之一,和LongAdder一样,属于热点分散型的削峰手段。

分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,分段锁技术将Key分成一个一个小Segment存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。

ConcurrentHashMap的内部结构的层次关系为ConcurrentHashMap→Segment[]→HashEntry[]。这样设计的好处在于,每次访问的时候只需要将一个Segment锁定,而不需要将整个Map类型集合都锁定。

JDK 1.7中的ConcurrentHashMap由 Segment 数组结构和 HashEntry 数组构成的。一个ConcurrentHashMap中包含一个Segment数组,一个Segment中包含一个HashEntry数组,每个元素是一个链表结构(一个Hash表的桶)。

Segment 是一种可重入的锁 ReentrantLock,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。

ConcurrentHashMap 读写过程如下:

  • get 方法
    1. 为输入的 Key 做 Hash 运算,得到 hash 值。
    2. 通过 hash 值,定位到对应的 Segment 对象
    3. 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
  • put 方法
    1. 为输入的 Key 做 Hash 运算,得到 hash 值。
    2. 通过 hash 值,定位到对应的 Segment 对象
    3. 获取可重入锁
    4. 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
    5. 插入或覆盖 HashEntry 对象。
    6. 释放锁。

单一的 Segment 结构如下:

像这样的 Segment 对象,在 ConcurrentHashMap 集合中有多少个呢?有 2 的 N 次方个,共同保存在一个名为 segments 的数组当中。 因此整个 ConcurrentHashMap 的结构如下:

可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。

  • Case1:不同 Segment 的并发写入(可以并发执行)

  • Case2:同一 Segment 的一写一读(可以并发执行)

  • Case3:同一 Segment 的并发写入

Segment 的写入是需要上锁的,因此对同一 Segment 的并发写入会被阻塞。由此可见,ConcurrentHashMap 中每个 Segment 各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

1.7版本ConcurrentHashMap的核心原理

ConcurrentHashMap类

ConcurrentHashMap类的核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

/**
* 哈希映射表的默认初始容量为16,即初始默认为16个桶
* 在构造器中没有指定这个参数时,使用本参数
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
* 哈希映射表的默认装载因子为0.75,该值是table中包含的
* HashEntry元素的个数与
* table数组长度的比值,当table中包含的HashEntry元素的个数超过
* 了table数组的长度与装载因子的乘积时, 将触发扩容操作
* 如果哈希在构造函数中没有指定这个参数,就使用本参数的值
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 集合最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

// 分段锁的最小数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// 分段锁的最大数量
static final int MAX_SEGMENTS = 1 << 16;

// 加锁前的重试次数
static final int RETRIES_BEFORE_LOCK = 2;

/**
* 哈希表的默认并发级别为16,该值表示当前更新线程的估计数
* 在构造器中没有指定这个参数时,使用本参数
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* segments 的掩码值
* key 的哈希码的高位用来选择具体的 segment
*/
final int segmentMask;

/**
* 偏移量
*/
final int segmentShift;

/**
* 由 Segment 对象组成的数组
*/
final Segment<K, V>[] segments;

/**
* 创建一个带有指定初始容量、加载因子和并发级别的新的空映射
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;

// 寻找最佳匹配参数(不小于给定参数最接近的 2 次幂)
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}

segmentShift = 32 - sshift; // 偏移量值
segmentMask = ssize - 1; // 掩码值
this.segments = Segment.newArray(ssize); // 创建数组

if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity) ++c;
int cap = 1;
while (cap < c) cap <<= 1;

// 依次遍历每个数组元素
for (int i = 0; i < this.segments.length; ++i)
// 初始化每个数组元素引用的 Segment 对象
this.segments[i] = new Segment<K, V>(cap, loadFactor);
}

/**
* 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并
* 发级别 (16)的
* 空哈希映射表
*/
public ConcurrentHashMap() {
// 使用三个默认参数调用上面重载的构造器来创建空哈希映射表
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

}

Segment类

每个Segment实例用来守护其内部table成员对象,table是一个由HashEntry实例构成的数组,其每个元素就是哈希映射表的一个桶。

每个Segment实例都有一个count来表示该分段包含的HashEntry“Key-Value对”总数。具体来说,count变量是一个计数器,它表示每个Segment实例管理的table数组(若干个HashEntry组成的链表)包含的HashEntry实例的个数。之所以在每个Segment实例中包含一个计数器,而不是在ConcurrentHashMap中使用全局的计数器,是为了避免出现“全局热点”而影响并发性。

Segment类的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static final class Segment<K, V> extends ReentrantLock implements Serializable {
/**
* 在本 segment范围内包含的HashEntry 元素的个数
* 该变量被声明为 volatile 型
*/
transient volatile int count;

/**
* table 被更新的次数
*/
transient int modCount;

/**
* 当table中包含的HashEntry元素的个数超过本变量值时,触发table的再哈希
*
*/
transient int threshold;

/**
* table 是由 HashEntry 实例组成的数组
* 如果HashEntry实例的哈希值发生碰撞,碰撞的HashEntry实例就以
* 链表的形式链接成一个链表
* table 数组的数组成员代表哈希映射表的一个桶
* 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
* 若并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶
* 总数的 1/16
*/
transient volatile HashEntry<K, V>[] table;

/**
* 装载因子
*/
final float loadFactor;

Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K, V>newArray(initialCapacity));
}

/**
* 设置table引用到这个新生成的 HashEntry 数组
* 只能在持有锁或构造器中调用本方法
*/
void setTable(HashEntry<K, V>[] newTable) {
// 计算临界阈值为新数组的长度与装载因子的乘积
threshold = (int) (newTable.length * loadFactor);
table = newTable;
}

/**
* 根据key的哈希值找到 table 中对应的那个桶(table 数组的某个
* 数组成员)
*/
HashEntry<K, V> getFirst(int hash) {
HashEntry<K, V>[] tab = table;
// 把哈希值与table数组长度减1的值相“与”得到哈希值对应的table 数组的下标
// 然后返回 table 数组中此下标对应的 HashEntry 元素
return tab[hash & (tab.length - 1)];
}
}

HashEntry

HashEntry用来封装哈希映射表中的“Key-Value对”。在HashEntry类中,key、hash和next字段都被声明为final型,value字段被声明为volatile型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final class HashEntry<K, V> {
final K key; // 声明 key 为final 型
final int hash; // 声明 hash 为final 型
volatile V value; // 声明 value 为volatile 型

final HashEntry<K, V> next; // 声明 next 为final 型

HashEntry(K key, int hash, HashEntry<K, V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
}

在ConcurrentHashMap中,哈希时如果产生“碰撞”,将采用“分离链接法”来处理:把“碰撞”的HashEntry对象链接成一个链表,形成一个桶。由于HashEntry的next字段为final型,因此新节点只能在链表的表头处插入。

注意:由于只能在表头插入,因此链表中节点的顺序和插入的顺序相反。

ConcurrentHashMap的get操作

从结构上我们可以看到Segment类似于一个小型的HashMap,ConcurrentHashMap就是HashMap集合。接下来就来看一下get操作:

1.8版本ConcurrentHashMap的结构

在JDK 1.8中,ConcurrentHashMap已经抛弃了Segment分段锁机制,存储结构采用数组+链表或者红黑树的组合方式,利用CAS+Synchronized来保证并发更新的安全。

ConcurrentHashMap的内部结构示例

ConcurrentHashMap实例的内部结构示例如图:

ConcurrentHashMap的成员属性

JDK 1.8版本ConcurrentHashMap的主要成员属性大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
// 常量:表示正在转移
static final int MOVED = -1;
// 常量:表示已经转换成树
static final int TREEBIN = -2;
// 常量:hash for transient reservations
static final int RESERVED = -3;
// 常量:usable bits of normal node hash
static final int HASH_BITS = 0x7fffffff;
//数组,用来保存元素
transient volatile Node<K,V>[] table;
//转移时用的数组
private transient volatile Node<K,V>[] nextTable;
/**
* 用来控制表初始化和扩容的控制属性
*/
private transient volatile int sizeCtl;

// 省略其他
}

对以上清单中的重要属性介绍如下:

  • table 用于保存添加到哈希表中的桶。
  • DEFAULT_CAPACITY: table的默认长度。默认初期长度为16,在第一次添加元素时,会将table初始化成16个元素的数组。
  • MIN_TREEIFY_CAPACITY: 链式桶转成红黑树桶的阈值。在增加“Key-Value对”时,当链表长度大于该值时,将链表转换成红黑树。
  • UNTREEIFY_THRESHOLD: 红黑树桶还原回链式桶的阈值,也就是红黑树转为链表的阈值,当在容量变动时重新计算存储位置后,当原有的红黑树内节点数量小于6时,将红黑树转换成链表。
  • MIN_TREEIFY_CAPACITY: 链式桶转换成红黑树桶还有一个要求,table的容量达到最小树形化容量的阈值,只有当哈希表中的table容量大于该值时,才允许树将链表转换成红黑树的操作。否则,尽管单个桶内的元素太多,仍然选择直接扩容,而不是将桶树形化。
  • sizeCtl: sizeCtl用来控制table的初始化和扩容操作的过程,其值大致如下:
    • -1代表table正在初始化,其他线程应该交出CPU时间片。
    • -N表示有N-1个线程正在进行扩容操作,严格来说,当其为负数时,只用到其低16位,如果其低16位数值为M,此时有M-1个线程进行扩容。
    • 大于0分两种情况:如果table未初始化,sizeCtl表示table需要初始化的大小;如果table初始化完成,sizeCtl表示table的容量,默认是table大小的0.75倍。
    • 涉及修改sizeCtl的方法有5个:
      • initTable(): 初始化哈希表时,涉及sizeCtl的修改。
      • addCount(): 增加容量时,涉及sizeCtl的修改。
      • tryPresize(): ConcurrentHashMap扩容方法之一。
      • transfer(): table数据转移到nextTable。扩容操作的核心在于数据的转移,把旧数组中的数据迁移到新的数组。ConcurrentHashMap精华的部分是它可以利用多线程来进行协同扩容,简单来说,它把table数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的Bucket会被替换为一个ForwardingNode节点,标记当前Bucket已经被其他线程迁移完了。
      • helpTransfer(): ConcurrentHashMap鬼斧神工,并发添加元素时,如果正在扩容,其他线程会帮助扩容,也就是多线程扩容。

ConcurrentHashMap的数组扩容

JDK 1.8版本的ConcurrentHashMap中通过一个Node<K,V>[] 数组 table来保存添加到哈希表中的桶,而在同一个Bucket位置是通过链表和红黑树的形式来保存的。但是数组table是懒加载的,只有在第一次添加元素的时候才会初始化

第一次添加元素时,默认初期长度为16,当往table中继续添加元素时,通过Hash值跟数组长度取余来决定放在数组的哪个Bucket位置,如果出现放在同一个位置,就优先以链表的形式存放,在同一个位置的个数达到了8个以上,如果数组的长度还小于64,就会扩容数组。如果数组的长度大于等于64,就会将该节点的链表转换成树。

通过扩容数组的方式来把这些节点分散开。然后将这些元素复制到扩容后的新数组中,同一个Bucket中的元素通过Hash值的数组长度位来重新确定位置,可能还是放在原来的位置,也可能放到新的位置。

而且,在扩容完成之后,如果之前某个节点是树,但是现在该节点的“Key-Value对”数又小于等于6个,就会将该树转为链表。

什么时候扩容呢?当前容量超过阈值,也就是链表中的元素个数超过默认设定(8个)时,如果数组table的大小还未超过64,此时就
进行数组的扩容,如果超过就将链表转化成红黑树。

ConcurrentHashMap 的内部类

Node

Node 类实现了 Map.Entry 接口,主要存放 key-value 对,并且具有 next 域

1
2
3
4
5
6
7
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}

另外可以看出很多属性都是用 volatile 关键字修饰的,也是为了保证内存可见性。

TreeNode

树节点,继承于承载数据的 Node 类。红黑树的操作是针对 TreeBin 类的,从该类的注释也可以看出,TreeBin 是对 TreeNode 的再一次封装,下面会提到。

1
2
3
4
5
6
7
8
9
10
11
**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
......
}

TreeBin

这个类并不负责用户的 key、value 信息,而是封装了很多 TreeNode 节点。实际的 ConcurrentHashMap “数组”中,存放的都是 TreeBin 对象,而不是 TreeNode 对象。

1
2
3
4
5
6
7
8
9
10
11
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
......
}

ForwardingNode

在扩容时会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable 引用的新 table 数组。

1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}

put()方法源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//自旋:并发情况下,也可以保障安全添加成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//第一次添加,先初始化node数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//计算出table[i]无节点,创建节点
//使用Unsafe.compareAndSwapObject 原子操作table[i]位置
//如果为null,就添加新建的node节点,跳出循环
//反之,再循环进入执行添加操作
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//如果当前处于转移状态,返回新的tab内部表,然后进入循环执行添加操作
tab = helpTransfer(tab, f);
else {
//在链表或红黑树中追加节点
V oldVal = null;
//使用synchronized 对 f 对象加锁
// f = tabAt(tab, i = (n - 1) & hash) :table[i] 的node对象(桶)
//注意:这里没用ReentrantLock,而是使用synchronized 进行同步
//在争用不激烈的场景中,synchronized 的性能和ReentrantLock不相上下
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//在红黑树上追加节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//节点数大于临界值,转换成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

从put()源码可以看到,JDK 1.8版本在使用CAS自旋完成桶的设置时,使用synchronized内置锁保证桶内并发操作的线程安全。尽管对同一个Map操作的线程争用会非常激烈,但是在同一个桶内的线程争用
通常不会很激烈,所以使用CAS自旋(简单轻量级锁)、synchronized偏向锁或轻量级锁不会降低ConcurrentHashMap的性能。为什么不用ReentrantLock显式锁呢?如果为每一个桶都创建一个ReentrantLock
实例,就会带来大量的内存消耗,反过来,使用CAS自旋(简单轻量级锁)、synchronized偏向锁或轻量级锁,内存消耗的增加会微乎其微。

ConcurrentHashMap 的字段

1、tablevolatile Node<K,V>[] table:

装载 Node 的数组,作为 ConcurrentHashMap 的底层容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为 2 的幂次方,讲 HashMap 的时候讲过。

2、nextTablevolatile Node<K,V>[] nextTable

扩容时使用,平时为 null,只有在扩容的时候才为非 null

3、sizeCtlvolatile int sizeCtl

该属性用来控制 table 数组的大小,根据是否初始化和是否正在扩容有几种情况:

  • 当值为负数时: 如果为-1 表示正在初始化,如果为 -N 则表示当前正有 N-1 个线程进行扩容操作;
  • 当值为正数时: 如果当前数组为 null 的话表示 table 在初始化过程中,sizeCtl 表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table 数组)可用容量,也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度 n 乘以 加载因子 loadFactor;
  • 当值为 0 时,即数组长度为默认初始值。

4、sun.misc.Unsafe U

在 ConcurrentHashMap 的实现中,可以看到用了大量的 U.compareAndSwapXXXX 方法去修改 ConcurrentHashMap 的一些属性。

这些方法实际上是利用了CAS 算法用于保证线程安全性,这是一种乐观策略:假设每一次操作都不会产生冲突,当且仅当冲突发生的时候再去尝试。

CAS 操作依赖于现代处理器指令集,通过底层的CMPXCHG指令实现。CAS(V,O,N)核心思想为:若当前变量实际值 V 与期望的旧值 O 相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值 N 赋值给变量;若当前变量实际值 V 与期望的旧值 O 不相同,则表明该变量已经被其他线程做了处理,此时将新值 N 赋给变量操作就是不安全的,在进行重试

在并发容器中,CAS 是通过sun.misc.Unsafe类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为 Java 中的“指针”。该成员变量的获取是在静态代码块中:

1
2
3
4
5
6
7
8
static {
try {
U = sun.misc.Unsafe.getUnsafe();
.......
} catch (Exception e) {
throw new Error(e);
}
}

ConcurrentHashMap 的内部类

1、Node

Node 类实现了 Map.Entry 接口,主要存放 key-value 对,并且具有 next 域

1
2
3
4
5
6
7
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}

另外可以看出很多属性都是用 volatile 关键字

2、TreeNode

树节点,继承于承载数据的 Node 类。红黑树的操作是针对 TreeBin 类的,从该类的注释也可以看出,TreeBin 是对 TreeNode 的再一次封装,下面会提到。

1
2
3
4
5
6
7
8
9
10
11
**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
......
}

3、TreeBin

这个类并不负责用户的 key、value 信息,而是封装了很多 TreeNode 节点。实际的 ConcurrentHashMap “数组”中,存放的都是 TreeBin 对象,而不是 TreeNode 对象。

1
2
3
4
5
6
7
8
9
10
11
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
......
}

4、ForwardingNode

在扩容时会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable 引用的新 table 数组。

1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}

ConcurrentHashMap 的 CAS

ConcurrentHashMap 会大量使用 CAS 来修改它的属性和进行一些操作。因此,在理解 ConcurrentHashMap 的方法前,我们需要了解几个常用的利用 CAS 算法来保障线程安全的操作。

1、tabAt

1
2
3
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

该方法用来获取 table 数组中索引为 i 的 Node 元素。

2、casTabAt

1
2
3
4
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

利用 CAS 操作设置 table 数组中索引为 i 的元素

3、setTabAt

1
2
3
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

该方法用来设置 table 数组中索引为 i 的元素

ConcurrentHashMap 的方法

构造方法

ConcurrentHashMap 一共提供了以下 5 个构造方法:

1
2
3
4
5
6
7
8
9
10
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16
ConcurrentHashMap()
// 2. 给定map的大小
ConcurrentHashMap(int initialCapacity)
// 3. 给定一个map
ConcurrentHashMap(Map<? extends K, ? extends V> m)
// 4. 给定map的大小以及加载因子
ConcurrentHashMap(int initialCapacity, float loadFactor)
// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

差别请看注释,我们来看看第 2 种构造方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}

这段代码的逻辑请看注释,很容易理解,如果小于 0 就直接抛异常,如果指定值大于所允许的最大值就取最大值,否则再对指定值做进一步处理。最后将 cap 赋值给 sizeCtl。

当调用构造方法之后,sizeCtl 的大小就代表了 ConcurrentHashMap 的大小,即 table 数组的长度

tableSizeFor 做了哪些事情呢?源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

注释写的很清楚,该方法会将构造方法指定的大小转换成一个 2 的幂次方数,也就是说 ConcurrentHashMap 的大小一定是 2 的幂次方,比如,当指定大小为 18 时,为了满足 2 的幂次方特性,实际上 ConcurrentHashMap 的大小为 2 的 5 次方(32)。

另外,需要注意的是,调用构造方法时并初始化 table 数组,而只算出了 table 数组的长度,当第一次向 ConcurrentHashMap 插入数据时才会真正的完成初始化,并创建 table 数组

initTable 方法

直接上源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

代码的逻辑请见注释。

可能存在这样一种情况,多个线程同时进入到这个方法,为了保证能够正确地初始化,第 1 步会先通过 if 进行判断,如果当前已经有一个线程正在初始化,这时候其他线程会调用 Thread.yield() 让出 CPU 时间片。

正在进行初始化的线程会调用 U.compareAndSwapInt 方法将 sizeCtl 改为 -1,即正在初始化的状态。

另外还需要注意,在第四步中会进一步计算数组中可用的大小,即数组的实际大小 n 乘以加载因子 0.75,0.75 就是四分之三,这里n - (n >>> 2)刚好是n-(1/4)n=(3/4)n,挺有意思的吧?

如果选择是无参的构造方法,这里在 new Node 数组的时候会使用默认大小DEFAULT_CAPACITY(16),然后乘以加载因子 0.75,结果为 12,也就是说数组当前的可用大小为 12。

put 方法

调用 put 方法时会调用 putVal 方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}

ConcurrentHashMap 是一个哈希桶数组,如果不出现哈希冲突的时候,每个元素均匀的分布在哈希桶数组中。当出现哈希冲突的时候,采用拉链法的解决方案,将 hash 值相同的节点转换成链表的形式,另外,在 JDK 1.8 版本中,为了防止拉链过长,当链表的长度大于 8 的时候会将链表转换成红黑树。

确定好数组的索引 i 后,可以调用 tabAt() 方法获取该位置上的元素,如果当前 Node 为 null 的话,可以直接用 casTabAt 方法将新值插入。

拉链法、确定索引 i 的知识在学习 HashMap

如果当前节点不为 null,且该节点为特殊节点(forwardingNode),就说明当前 concurrentHashMap 正在进行扩容操作。怎么确定当前这个 Node 是特殊节点呢?

通过判断该节点的 hash 值是不是等于 -1(MOVED):

1
static final int MOVED     = -1; // hash for forwarding nodes

table[i] 不为 null 并且不是 forwardingNode 时,以及当前 Node 的 hash 值大于0(fh >= 0)时,说明当前节点为链表的头节点,那么向 ConcurrentHashMap 插入新值就是向这个链表插入新值。通过 synchronized (f) 的方式进行加锁以实现线程安全。

往链表中插入节点的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到hash值相同的key,覆盖旧值即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}

这部分代码很好理解,就两种情况:

  1. 如果在链表中找到了与待插入的 key 相同的节点,就直接覆盖;
  2. 如果找到链表的末尾都还没找到的话,直接将待插入的键值对追加到链表的末尾。

当链表长度超过 8(默认值)时,链表就转换为红黑树,利用红黑树快速增删改查的特点可以提高 ConcurrentHashMap 的性能:

1
2
3
4
5
6
7
8
9
10
if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}

这段代码很简单,调用 putTreeVal 方法向红黑树插入新节点,同样的逻辑,如果在红黑树中存在 Key 相同(hash 值相等并且 equals 方法判断为 true)的节点,就覆盖旧值,否则向红黑树追加新节点

当完成数据新节点插入后,会进一步对当前链表大小进行调整:

1
2
3
4
5
6
7
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}

至此,put 方法就分析完了,我们来做个总结:

  1. 对每一个放入的值,先用 spread 方法对 key 的 hashcode 进行 hash 计算,由此来确定这个值在 table 中的位置;
  2. 如果当前 table 数组还未初始化,进行初始化操作;
  3. 如果这个位置是 null,那么使用 CAS 操作直接放入;
  4. 如果这个位置存在节点,说明发生了 hash 碰撞,先判断这个节点的类型,如果该节点 ==MOVED 的话,说明正在进行扩容;
  5. 如果是链表节点(fh>0),先获取头节点,再依次向后遍历确定这个新加入节点的位置。如果遇到 key 相同的节点,直接覆盖。否则在链表尾插入;
  6. 如果这个节点的类型是 TreeBin,直接调用红黑树的插入方法插入新的节点;
  7. 插入完节点之后再次检查链表的长度,如果长度大于 8,就把这个链表转换成红黑树;
  8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。

get 方法

get 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 重hash
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 2. table[i]桶节点的key与查找的key相同,则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  • 哈希: 对传入的键的哈希值进行散列,这有助于减少哈希冲突的可能性。使用 spread 方法可以保证不同的键更均匀地分布在桶数组中。
  • 直接查找: 查找的第一步是检查键的哈希值是否位于表的正确位置。如果在该桶的第一个元素中找到了键,则直接返回该元素的值。这里使用了 == 操作符和 equals 方法来比较键,这有助于处理可能的 null 值和确保正确的相等性比较。
  • 红黑树查找: 如果第一个节点的哈希值小于 0,那么这个桶的数据结构是红黑树(Java 8 引入了树化结构来改进链表在哈希冲突时的性能)。在这种情况下,使用 find 方法在红黑树中查找键。
  • 链表查找: 如果前两个条件都不满足,那么代码将遍历该桶中的链表。如果在链表中找到了具有相同哈希值和键的元素,则返回其值。如果遍历完整个链表都未找到,则返回 null。

transfer 方法

当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 很像,但由于支持并发扩容,所以要复杂一些。transfer 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//1. 新建Node数组,容量为之前的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//2. 新建forwardingNode引用,在之后会用到
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 3. 确定遍历中的索引i
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//4.将原数组中的元素复制到新数组中去
//4.5 for循环退出,扩容结束修改sizeCtl属性
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//4.1 当前数组中第i个元素为null,用CAS设置成特殊节点forwardingNode(可以理解成占位符)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//4.2 如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//4.3 处理当前节点为链表的头结点的情况,根据最高位为1还是为0(最高位指数组长度位),将原链表拆分为两个链表,分别放到新数组的i位置和i+n位置。这里还通过巧妙的处理措施,使得原链表中的一部分能直接平移到新链表(即lastRun及其后面跟着的一串节点),剩下部分才需要通过new方式克隆移动到新链表中(采用头插法)。
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln); //可以看到是逆序插入新节点的(头插)
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//4.4 处理当前节点是TreeBin时的情况,操作和上面的类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

代码逻辑请看注释,整个扩容操作分为两个部分

第一部分是构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。

第二个部分是将原来 table 中的元素复制到 nextTable 中,主要是遍历复制的过程。 得到当前遍历的数组位置 i,然后利用 tabAt 方法获得 i 位置的元素:

  1. 如果这个位置为空,就在原 table 中的 i 位置放入 forwardNode 节点,这个也是触发并发扩容的关键;
  2. 如果这个位置是 Node 节点(fh>=0),并且是链表的头节点,就把这个链表分裂成两个链表,把它们分别放在 nextTable 的 i 和 i+n 的位置上;
  3. 如果这个位置是 TreeBin 节点(fh<0),也做一个反序处理,并且判断是否需要 untreefi,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上;
  4. 遍历所有的节点,就完成复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。

ConcurrentHashMap扩容示意图

size 相关的方法

对于 ConcurrentHashMap 来说,这个 table 里到底装了多少东西是不确定的,因为不可能在调用 size() 方法的时候“stop the world”让其他线程都停下来去统计,对于这个不确定的 size,ConcurrentHashMap 仍然花费了大量的力气。

为了统计元素的个数,ConcurrentHashMap 定义了一些变量和一个内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

/******************************************/

/**
* 实际上保存的是HashMap中的元素个数 利用CAS锁进行更新
但它并不用返回当前HashMap的元素个数

*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;

/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;

再来看如何统计的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}

size 方法返回 Map 中的元素数量,但结果被限制在 Integer.MAX_VALUE 内。如果计算的大小超过这个值,则返回 Integer.MAX_VALUE。如果计算的大小小于 0,则返回 0。

mappingCount 方法也返回 Map 中的元素数量,但允许返回一个 long 值,因此可以表示大于 Integer.MAX_VALUE 的数量。与 size() 方法类似,该方法也会忽略负值,返回 0。

sumCount 方法计算 Map 的实际大小。ConcurrentHashMap 使用一个基础计数 baseCount 和一个 CounterCell 数组 counterCells 来跟踪大小。这种结构有助于减少多线程环境中的争用,因为不同的线程可能会更新不同的 CounterCell。

在计算总和时,sumCount() 方法将 baseCount 与 counterCells 数组中的所有非空单元的值相加。

在 put 方法结尾处调用了 addCount 方法,把当前 ConcurrentHashMap 的元素个数 +1,这个方法一共做了两件事,更新 baseCount 的值,检测是否进行扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

ConcurrentHashMap 示例

假设我们想要构建一个线程安全的高并发统计用户访问次数的功能。在这里,ConcurrentHashMap 是一个很好的选择,因为它提供了高并发性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.ConcurrentHashMap;

public class UserVisitCounter {

private final ConcurrentHashMap<String, Integer> visitCountMap;

public UserVisitCounter() {
this.visitCountMap = new ConcurrentHashMap<>();
}

// 用户访问时调用的方法
public void userVisited(String userId) {
visitCountMap.compute(userId, (key, value) -> value == null ? 1 : value + 1);
}

// 获取用户的访问次数
public int getVisitCount(String userId) {
return visitCountMap.getOrDefault(userId, 0);
}

public static void main(String[] args) {
UserVisitCounter counter = new UserVisitCounter();

// 模拟用户访问
counter.userVisited("user1");
counter.userVisited("user1");
counter.userVisited("user2");

System.out.println("User1 visit count: " + counter.getVisitCount("user1")); // 输出: User1 visit count: 2
System.out.println("User2 visit count: " + counter.getVisitCount("user2")); // 输出: User2 visit count: 1
}
}

在上述示例中:

  • 我们使用了 ConcurrentHashMap 来存储用户的访问次数。
  • 当用户访问时,我们通过 userVisited 方法更新访问次数。
  • 使用 ConcurrentHashMap 的 compute 方法可以确保原子地更新用户的访问次数。
  • 可以通过 getVisitCount 方法检索任何用户的访问次数。

ConcurrentHashMap 使我们能够无需担心并发问题就能构建这样一个高效的统计系统。