由于JVM的Synchronized重量级锁涉及操作系统(如Linux)内核态下互斥锁的使用,因此其线程阻塞和唤醒都涉及进程在用户态到内核态的频繁切换,导致重量级锁开销大、性能低。而JVM的Synchronized轻量级锁使用CAS(Compare And Swap,比较并交换)进行自旋抢锁,CAS是CPU指令级的原子操作,并处于用户态下,所以JVM轻量级锁的开销较小

什么是CAS

JDK 5所增加的JUC(java.util.concurrent)并发包对操作系统的底层CAS原子操作进行了封装,为上层Java程序提供了CAS操作的API。

Unsafe类中的CAS方法

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。Unsafe大量的方法都是native方法,基于C++语言实现,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。

Unsafe类的全限定名为sun.misc.Unsafe,从名字中可以看出这个类对普通程序员来说是“危险”的,一般的应用开发都不会涉及此类,Java官方也不建议直接在应用程序中使用这些类

为什么此类取名为Unsafe呢?由于使用Unsafe类可以像C语言一样使用指针操作内存空间,这无疑增加了指针相关问题、内存泄漏问题出现的概率。总之,在程序中过度使用Unsafe类会使得程序出错的概率变大,使得安全的语言Java变得不再安全,因此对Unsafe的使用一定要慎重。

操作系统层面的CAS是一条CPU的原子指令(cmpxchg指令),正是由于该指令具备原子性,因此使用CAS操作数据时不会造成数据不一致的问题,Unsafe提供的CAS方法直接通过native方式(封装C++代码)调用了底层的CPU指令cmpxchg。

完成Java应用层的CAS操作主要涉及Unsafe方法的调用,具体如下:

  1. 获取Unsafe实例。
  2. 调用Unsafe提供的CAS方法,这些方法主要封装了底层CPU的CAS原子操作。
  3. 调用Unsafe提供的字段偏移量方法,这些方法用于获取对象中的字段(属性)偏移量,此偏移量值需要作为参数提供给CAS操作。

获取Unsafe实例

Unsafe类是一个final修饰的不允许继承的最终类,而且其构造函数是private类型的方法,具体的源码如下:

1
2
3
4
5
6
7
8
9
10
public final class Unsafe {
private static final Unsafe theUnsafe;
public static final int INVALID_FIELD_OFFSET = -1;

private static native void registerNatives();
// 构造函数是private的,不允许外部实例化
private Unsafe() {
}
...
}

因此,我们无法在外部对Unsafe进行实例化,那么怎么获取Unsafe的实例呢?可以通过反射的方式自定义地获取Unsafe实例的辅助方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 省略import
public class JvmUtil {
//自定义地获取Unsafe实例的辅助方法
public static Unsafe getUnsafe() {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
throw new AssertionError(e);
}
} // 省略不相干代码
}

调用Unsafe提供的CAS方法

Unsafe提供的CAS方法包含4个操作数——字段所在的对象、字段内存位置、预期原值及新值。在执行Unsafe的CAS方法时,这些方法首先将内存位置的值与预期值(旧的值)比较,如果相匹配,那么CPU会
自动将该内存位置的值更新为新值,并返回true;如果不匹配,CPU不做任何操作,并返回false

Unsafe的CAS操作会将第一个参数(对象的指针、地址)与第二个参数(字段偏移量)组合在一起,计算出最终的内存操作地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 定义在Unsafe类中的三个“比较并交换”原子方法。
*
* @param o 需要操作的字段所在的对象
* @param offset 需要操作的字段的偏移量(相对的,相对于对象头)
* @param expected 期望值(旧的值)
* @param update 更新值(新的值)
* @return true 更新成功 | false 更新失败
*/
public final native boolean compareAndSwapObject(
Object o, long offset, Object expected, Object update);

public final native boolean compareAndSwapInt(
Object o, long offset, int expected, int update);

public final native boolean compareAndSwapLong(
Object o, long offset, long expected, long update);

调用Unsafe提供的偏移量相关

Unsafe提供的获取字段(属性)偏移量的相关操作主要如下:

  • staticFieldOffset()方法用于获取静态属性Field在Class对象中的偏移量,在CAS中操作静态属性时会用到这个偏移量。
  • objectFieldOffset()方法用于获取非静态Field(非静态属性)在Object实例中的偏移量,在CAS中操作对象的非静态属性时会用到这个偏移量。
1
2
3
4
5
6
7
8
9
/**
* 定义在Unsafe类中的几个获取字段偏移量的方法
* @param o 需要操作字段的反射
* @return 字段的偏移量
*/

public native long staticFieldOffset(Field field);

public native long objectFieldOffset(Field field);

一个获取非静态Field(非静态属性)在Object实例中的偏移量的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
static {
try {
// 获取反射的Field对象
Field field = OptimisticLockingPlus.class.getDeclaredField("value");

// 取得内存偏移
valueOffset = unsafe.objectFieldOffset(field);
} catch (Exception ex) {
throw new Error(ex);
}
}

使用CAS进行无锁编程

CAS是一种无锁算法,该算法关键依赖两个值——期望值(旧值)和新值,底层CPU利用原子操作判断内存原值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。

使用CAS进行无锁编程的步骤大致如下:

  1. 获得字段的期望值(oldValue)。
  2. 计算出需要替换的新值(newValue)。
  3. 通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋

使用CAS进行无锁编程的伪代码如下:

1
2
3
4
5
do {
获得字段的期望值(oldValue);
计算出需要替换的新值(newValue);
} while (!CAS(内存地址,oldValue,newValue))

假如某个内存地址(某对象的属性)的值为100,现在有两个线程(线程A和线程B)使用CAS无锁编程对该内存地址进行更新,线程A欲将其值更新为200,线程B欲将其值更新为300,线程是并发执行的,谁都有可能先执行。但是CAS是原子操作,对同一个内存地址的CAS操作在同一时刻只能执行一个。因此,在这个例
子中,要么线程A先执行,要么线程B先执行。

  • 假设线程A的CAS(100,200)执行在前,由于内存地址的旧值100与该CAS的期望值100相等,因此线程A会操作成功,内存地址的值被更新为200。
  • 接下来执行线程B的CAS(100,300)操作,此时内存地址的值为200,不等于CAS的期望值100,线程B操作失败。线程B只能自旋,开始新的循环,这一轮循环首先获取到内存地址的值200,然后进行CAS(200,300)操作,这一次内存地址的值与CAS的预期值(oldValue)相等,线程B操作成功。

当CAS将内存地址的值与预期值进行比较时,如果相等,就证明内存地址的值没有被修改,可以替换成新值,然后继续往下运行;如果不相等,就说明内存地址的值已经被修改,放弃替换操作,然后重新自旋。当并发修改的线程少,冲突出现的机会少时,自旋的次数也会很少,CAS的性能会很高;当并发修改的线程多,冲突出现的机会多时,自旋的次数也会很多,CAS的性能会大大降低。所以,提升CAS无
锁编程效率的关键在于减少冲突的机会

无锁编程实现轻量级安全自增

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Test {
private int value;
// 统计失败的次数
private static final AtomicLong failure = new AtomicLong(0);

public int increment() {
return ++value;
}

// 自增
public synchronized int synchronincrement() {
return ++value;
}

// cas自增
public int casincrement() {
int expected = value;
int newvalue = expected + 1;
while (!compareAndSwap(expected, newvalue)) {
expected = value;
newvalue = expected + 1;
}
return newvalue;
}

private boolean compareAndSwap(int expected, int newvalue) {
Unsafe unsafe = null;
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);

long offset = unsafe.objectFieldOffset(Test.class.getDeclaredField("value"));
System.out.println("offset:" + offset);
boolean result = unsafe.compareAndSwapInt(this, offset, expected, newvalue);
if (!result) {
failure.incrementAndGet();
}
return result;
} catch (Exception e) {
throw new AssertionError(e);
}
}


public static void main(String[] args) throws InterruptedException {
Test obj = new Test();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
// 对value进行CAS操作
for (int j = 0; j < 1000; j++) {
obj.casincrement();
}
});
threads[i] = thread;
thread.start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
System.out.println(obj.value);
System.out.println("失败次数:" + failure.get());
}
}

// offset:12
// 10000
// 失败次数:49430

字段偏移量的计算

调用Unsafe.objectFieldOffset(…)方法获取到的Object字段(也叫Object成员属性)的偏移量值是字段相对于Object头部的偏移
量,是一个相对的内存地址值,不是绝对的内存地址值

1
2
3
4
5
6
public class Test {
private int value;
// 统计失败的次数
private static final AtomicLong failure = new AtomicLong(0);
// ...
}

虽然Test类有2个字段,但是其中有1个是静态字段,属于类的成员而不是对象的成员,真正属于对象的字段只有其中的value字段。所以类的对象结构
如图所示:

在64位的JVM堆区中一个Test对象的Object Header(头部)占用了12字节,其中Mark Word占用了8字节(64位),压缩过的Class Pointer占用了4字节。接在Object Header之后的就是成员属性value的内存区域,所以value属性相对于Object Header的偏移量为12

JUC原子类

在多线程并发执行时,诸如“++”或“–”类的运算不具备原子性,不是线程安全的操作。通常情况下,大家会使用synchronized将这些线程不安全的操作变成同步操作,但是这样会降低并发程序的性能。所以,JDK为这些类型不安全的操作提供了一些原子类,与synchronized同步机制相比,JDK原子类是基于CAS轻量级原子操作的实现,使得程序运行效率变得更高。

Atomic原子操作包

Atomic操作翻译成中文是指一个不可中断的操作,即使在多个线程一起执行Atomic类型操作的时候,一个操作一旦开始,就不会被其
他线程中断。所谓Atomic类,指的是具有原子操作特征的类。JUC并发包中原子类的位置JUC并发包中的原子类都存放在java.util.concurrent.atomic类路径下.

根据操作的目标数据类型,可以将JUC包中的原子类分为4类:基本原子类数组原子类原子引用类字段更新原子类

基本原子类

  • 基本原子类的功能是通过原子方式更新Java基础类型变量的值。
  • 基本原子类主要包括以下三个:
    • AtomicInteger:整型原子类。
    • AtomicLong:长整型原子类。
    • AtomicBoolean:布尔型原子类。

数组原子类

  • 数组原子类的功能是通过原子方式更数组中的某个元素的值。数
  • 组原子类主要包括以下三个:
    • AtomicIntegerArray:整型数组原子类。
    • AtomicLongArray:长整型数组原子类。
    • AtomicReferenceArray:引用类型数组原子类。

引用原子类

  • 引用原子类主要包括以下三个:
    • AtomicReference:引用类型原子类。
    • AtomicMarkableReference:带有更新标记位的原子引用类型。
    • AtomicStampedReference:带有更新版本号的原子引用类型。
  • AtomicMarkableReference类将boolean标记与引用关联起来,可以解决使用AtomicBoolean进行原子更新时可能出现的ABA问题。
  • AtomicStampedReference类将整数值与引用关联起来,可以解决使用AtomicInteger进行原子更新时可能出现的ABA问题。

字段更新原子类

  • 字段更新原子类主要包括以下三个:
    • AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
    • AtomicReferenceFieldUpdater:原子更新引用类型中的字段

基础原子类AtomicInteger

基础原子类AtomicInteger常用的方法如下:

1
2
3
4
5
6
public final int get() //获取当前的值
public final int getAndSet(int newValue) //获取当前的值,然后设置新的值
public final int getAndIncrement() //获取当前的值,然后自增
public final int getAndDecrement() //获取当前的值,然后自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //通过CAS方式设置整数值

下面是一个基础原子类AtomicInteger的使用示例,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {
// 定义一个 Atomic Integer 实例
private static final AtomicInteger counter = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
// 创建并启动多个线程来更新计数器
Thread thread1 = new Thread(() -> updateCounter(5));
Thread thread2 = new Thread(() -> updateCounter(3));
Thread thread3 = new Thread(() -> updateCounter(7));

// 启动线程
thread1.start();
thread2.start();
thread3.start();

// 等待所有线程执行完毕
thread1.join();
thread2.join();
thread3.join();

System.out.println("Final counter value: " + counter.get());
}

private static void updateCounter(int increment) {
for (int i = 0; i < increment; i++) {
// 使用原子操作方法安全地增加计数值
int newValue = counter.addAndGet(1);
System.out.println(Thread.currentThread().getName() + " updated the counter to: " + newValue);
}
}
}

数组原子类AtomicIntegerArray

AtomicIntegerArray.AtomicLongArray,AtomicReferenceArray三个类提供的方法几乎相同,所以我们这里以AtomicIntegerArray为例来介绍。

AtomicIntegerArray类的常用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//获取 index=i 位置元素的值
public final int get(int i)

//返回 index=i 位置当前的值,并将其设置为新值:newValue
public final int getAndSet(int i, int newValue)

//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndIncrement(int i)

//获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndDecrement(int i)

//获取 index=i 位置元素的值,并加上预期的值
public final int getAndAdd(int delta)

//如果输入的数值等于预期值,就以原子方式将位置i的元素值设置为输入值(update)
boolean compareAndSet(int expect, int update)

//最终将位置i的元素设置为newValue
//lazySet()方法可能导致其他线程在之后的一小段时间内还是可以读到旧的值
public final void lazySet(int i, int newValue)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void testAtomicIntegerArray() {
int tempvalue = 0;
// 原始的数组
int[] array = {1, 2, 3, 4, 5, 6};

// 包装为原子数组
AtomicIntegerArray i = new AtomicIntegerArray(array);
// 获取第0个元素,然后设置为2
tempvalue = i.getAndSet(0, 2);
// 输出tempvalue:1; i:[2, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);
// 获取第0个元素,然后自增
tempvalue = i.getAndIncrement(0);
// 输出tempvalue:2; i:[3, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);

// 获取第0个元素,然后增加一个delta 5
tempvalue = i.getAndAdd(0, 5);
// 输出tempvalue:3; i:[8, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);
}

AtomicInteger线程安全原理

基础原子类(以AtomicInteger为例)主要通过CAS自旋+volatile的方案实现,既保障了变量操作的线程安全性,又避免了synchronized重量级锁的高开销,使得Java程序的执行效率大为提升。

AtomicInteger源码中的主要方法都是通过CAS自旋实现的。CAS自旋的主要操作为:如果一次CAS操作失败,获取最新的value值后,再
次进行CAS操作,直到成功

另外,AtomicInteger所包装的内部value成员是一个使用关键字volatile修饰的内部成员。关键字volatile的原理比较复杂,简单地
说,该关键字可以保证任何线程在任何时刻总能拿到该变量的最新值,其目的在于保障变量值的线程可见性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AtomicInteger extends Number implements java.io.Serializable {

// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 内部value值,使用volatile保证线程可见性
private volatile int value;

// value属性值的地址偏移量
private static final long valueOffset;

static {
try {
// 计算value 属性值的地址偏移量
valueOffset = unsafe.objectFieldOffset(

AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}

// 初始化
public AtomicInteger(int initialValue) {
value = initialValue;
}

// 获取当前value值
public final int get() {
return value;
}

// 方法:返回旧值并赋新值
public final int getAndSet(int newValue) {
for (; ; ) {// 自旋
int current = get();// 获取旧值

// 以CAS方式赋值,直到成功返回
if (compareAndSet(current, newValue)) return current;
}
}

// 方法:封装底层的CAS操作,对比expect(期望值)与value,若不同则返回false

// 若expect与value相同,则将新值赋给value,并返回true
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 方法:安全自增 i++
public final int getAndIncrement() {
for (; ; ) { // 自旋
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) return current;
}
}

// 方法:自定义增量数
public final int getAndAdd(int delta) {
for (; ; ) { // 自旋
int current = get();
int next = current + delta;
if (compareAndSet(current, next)) return current;
}
}

// 方法:类似++i,返回自增后的值
public final int incrementAndGet() {
for (; ; ) { // 自旋
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) return next;
}
}

// 方法:返回加上delta后的值
public final int addAndGet(int delta) {
for (; ; ) { // 自旋
int current = get();
int next = current + delta;
if (compareAndSet(current, next)) return next;
}
}
// 省略其他源码
}

引用类型原子类

  • 引用类型原子类包括以下种:
    • AtomicReference:基础的引用原子类。
    • AtomicStampedReference:带印戳的引用原子类。
    • AtomicMarkableReference:带修改标志的引用原子类。
  • 上面三个类提供的方法几乎相同,所以这里以AtomicReference为例来介绍。

下面为大家介绍一个简单的AtomicReference类的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

qwen2.5:7b
在Java中,AtomicReference 是一个可以支持原子操作的类。它允许你以线程安全的方式更新对象的引用,而不必担心同步问题。这在多线程环境中非常有用。

下面是一个使用 AtomicReference 对 User 引用进行原子性修改的例子:

java


import java.util.concurrent.atomic.AtomicReference;

class User {
private String name;
// 其他属性和方法...

public User(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

public class AtomicReferenceExample {

public static void main(String[] args) {
// 创建一个User对象
User user = new User("John");

// 使用AtomicReference包装user引用
AtomicReference<User> atomicUserRef = new AtomicReference<>(user);

// 第一个线程尝试更新用户的名字
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(500); // 模拟一些延迟
User newUser = new User("Jane");
atomicUserRef.set(newUser);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// 第二个线程尝试读取用户的名字
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(1000); // 模拟一些延迟
User readUser = atomicUserRef.get();
System.out.println("Name after update: " + readUser.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// 启动线程
thread1.start();
thread2.start();

try {
// 等待所有线程完成
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

属性更新原子类

此这里以AtomicIntegerFieldUpdater为例来介绍。使用属性更新原子类保障属性安全更新的流程大致需要两步:

  1. 第一步,更新的对象属性必须使用public volatile修饰符。
  2. 第二步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须调用静态方法newUpdater()创建一个更新器,并且需要设
    置想要更新的类和属性。

下面为大家介绍一个简单的AtomicIntegerFieldUpdater类的使用示例,原子性地更新User对象的age属性,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void testAtomicIntegerFieldUpdater() {
// 调用静态方法newUpdater()创建一个更新器updater
AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("1", "张三");
// 使用属性更新器的getAndIncrement、getAndAdd增加user的age值

Print.tco(updater.getAndIncrement(user));// 1
Print.tco(updater.getAndAdd(user, 100));// 101

// 使用属性更新器的get获取user的age值
Print.tco(updater.get(user));// 101
}

ABA问题

由于CAS原子操作性能高,因此其在JUC包中被广泛应用,只不过如果使用得不合理,CAS原子操作就会存在ABA问题。

了解ABA问题

什么是ABA问题?举一个例子来说明。比如一个线程A从内存位置M中取出V1,另一个线程B也取出V1。现在假设线程B进行了一些操作之
后将M位置的数据V1变成了V2,然后又在一些操作之后将V2变成了V1。之后,线程A进行CAS操作,但是线程A发现M位置的数据仍然是V1,然
后线程A操作成功。尽管线程A的CAS操作成功,但是不代表这个过程是没有问题的,线程A操作的数据V1可能已经不是之前的V1,而是被线程
B替换过的V1,这就是ABA问题

并发业务场景下,两个并发的查询库存操作,同时从数据库都得到了库存是5。用户1购买了3个库存,于是库存要设置为2, 用户2购买了2个库存,于是库存要设置为3,这两个设置库存的接口并发执行,库存会先变成2,再变成3,导致数据不一致(实际卖出了5件商品,但库存只扣减了2,最后一次设置库存会覆盖和掩盖前一次并发操作)

ABA问题解决方案

很多乐观锁的实现版本都是使用版本号(Version)方式来解决ABA问题。乐观锁每次在执行数据的修改操作时都会带上一个版本号,版本号和数据的版本号一致就可以执行修改操作并对版本号执行加1操作,否则执行失败。因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加,不会减少。

AtomicStampedReference

参考乐观锁的版本号,JDK提供了一个AtomicStampedReference类来解决ABA问题。AtomicStampReference在CAS的基础上增加了一个
Stamp(印戳或标记),使用这个印戳可以用来觉察数据是否发生变化,给数据带上了一种实效性的检验。

AtomicStampReference的compareAndSet()方法首先检查当前的对象引用值是否等于预期引用,并且当前印戳(Stamp)标志是否等于预
期标志,如果全部相等,就以原子方式将引用值和印戳(Stamp)标志的值更新为给定的更新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//构造器,V表示要引用的原始数据,initialStamp表示最初的版本印戳(版本号)
AtomicStampedReference(V initialRef, int initialStamp)

//获取被封装的数据
public V getRerference();

//获取被封装的数据的版本印戳
public int getStamp();


public boolean compareAndSet(
V expectedReference, //预期引用值
V newReference, //更新后的引用值
int expectedStamp, //预期印戳(Stamp)标志值
int newStamp) //更新后的印戳(Stamp)标志值
)

compareAndSet()方法的第一个参数是原来的CAS中的参数,第二个参数是替换后的新参数,第三个参数是原来CAS数据旧的版本号,第
四个参数表示替换后的新参数版本号。进行CAS操作时,若当前引用值等于预期引用值,并且当前印戳值等于预期印戳值,则以原子方式将引用值和印戳值更新为给定的更新值。
下面是一个简单的AtomicStampedReference使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 使用 AtomicStampedReference 存储当前的库存值和版本号
private static final AtomicStampedReference<Integer> stock = new AtomicStampedReference<>(10, 0);

private static void updateInventory(int quantity) {
int currentValue, newStamp;
do {
currentValue = stock.getReference(); // 获取当前库存值
newStamp = stock.getStamp() + 1; // 获取并增加版本号

if (currentValue >= quantity) { // 检查是否足够库存
int newValue = currentValue - quantity;
boolean updated = stock.compareAndSet(currentValue, newValue, newStamp, newStamp + 1);
if (!updated) {
System.out.println("Failed to update inventory in thread " + Thread.currentThread().getName());
}
} else {
System.out.println("Insufficient stock for update by thread " + Thread.currentThread().getName());
}
} while (true); // 重试循环,直到更新成功
}

AtomicMarkableReference

AtomicMarkableReference是AtomicStampedReference的简化版,不关心修改过几次,只关心是否修改过。因此,其标记属性mark是boolean类型,而不是数字类型,标记属性mark仅记录值是否修改过。AtomicMarkableReference适用于只要知道对象是否被修改过,而不适用于对象被反复修改的场景

下面是一个简单的AtomicMarkableReference使用示例,通过两个线程分别更新同一个stock 的值,第一个线程会更新成功,而第二个线程会更新失败,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 使用 AtomicMarkableReference 存储当前的库存值和标记(表示是否有效)
private static final AtomicMarkableReference<Integer> stock = new AtomicMarkableReference<>(10, false);

private static void updateInventory(int quantity) {
int currentValue, currentMark;

do {
// 获取当前的库存值和标记
currentValue = stock.getReference();
currentMark = stock.getMark();

if (!currentMark && currentValue >= quantity) { // 检查是否足够库存且标记未被更改
boolean updated = stock.compareAndSet(currentValue, currentValue - quantity, currentMark, true);
if (updated) {
System.out.println("Inventory updated by thread " + Thread.currentThread().getName() + ": " + currentValue - quantity);
} else {
System.out.println("Failed to update inventory in thread " + Thread.currentThread().getName());
}
} else {
System.out.println("Insufficient stock for update by thread " + Thread.currentThread().getName());
}
} while (!updated); // 重试循环,直到更新成功
}

提升高并发场景下CAS操作的性能

在争用激烈的场景下,会导致大量的CAS空自旋。比如,在大量线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。大量的CAS空自旋会浪费大量的CPU资源,大大降低了程序的性能。

除了存在CAS空自旋之外,在SMP架构的CPU平台上,大量的CAS操作还可能导致“总线风暴”

在高并发场景下如何提升CAS操作的性能呢?可以使用LongAdder替代AtomicInteger

以空间换时间:LongAdder

Java 8提供了一个新的类LongAdder,以空间换时间的方式提升高并发场景下CAS操作的性能。LongAdder的核心思想是热点分离,与ConcurrentHashMap的设计思想类似:将value值分离成一个数组,当多线程访问时,通过Hash算法将线程映射到数组的一个元素进行操作;而获取最终的value结果时,则将数组的元素求和。最终,通过LongAdder将内部操作对象从单个value值“演变”成一系列的数组元素,从而减小了内部竞争的粒度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

public class AtomicLongAdderTest {
public static void main(String[] args) throws Exception{
testAtomicLongAdder(1, 10000000);
testAtomicLongAdder(10, 10000000);
testAtomicLongAdder(100, 10000000);
}

static void testAtomicLongAdder(int threadCount, int times) throws Exception{
System.out.println("threadCount: " + threadCount + ", times: " + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("threadCount: " + threadCount + ", times: " + times);
long atomicStart = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms");
System.out.println("----------------------------------------");
}

static void testAtomicLong(int threadCount, int times) throws Exception{
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
}));
}

for (Thread thread : list) {
thread.start();
}

for (Thread thread : list) {
thread.join();
}

System.out.println("AtomicLong value is : " + atomicLong.get());
}

static void testLongAdder(int threadCount, int times) throws Exception{
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < times; j++) {
longAdder.increment();
}
}));
}

for (Thread thread : list) {
thread.start();
}

for (Thread thread : list) {
thread.join();
}

System.out.println("LongAdder value is : " + longAdder.longValue());
}
}


threadCount: 1, times: 10000000
LongAdder value is : 10000000
LongAdder 耗时:75ms
threadCount: 1, times: 10000000
AtomicLong value is : 10000000
AtomicLong 耗时:23ms
----------------------------------------
threadCount: 10, times: 10000000
LongAdder value is : 100000000
LongAdder 耗时:52ms
threadCount: 10, times: 10000000
AtomicLong value is : 100000000
AtomicLong 耗时:958ms
----------------------------------------
threadCount: 100, times: 10000000
LongAdder value is : 1000000000
LongAdder 耗时:266ms
threadCount: 100, times: 10000000
AtomicLong value is : 1000000000
AtomicLong 耗时:12221ms
----------------------------------------

这里可以看到随着并发的增加AtomicLong性能是急剧下降的,耗时是LongAdder的数倍。至于原因我们还是接着往后看。

AtomicLong可以弃用了吗?

看上去LongAdder的性能全面超越了AtomicLong,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观
锁的重试次数)
,但是我们真的就可以舍弃掉LongAdder了吗?

当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。

我们通过分析源码sum()方法后可以知道LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。而在高并发统计计数的场景下,才更适合使用LongAdder

LongAdder的原理

操作原理图

先看下LongAdder的操作原理图:

既然说到LongAdder可以显著提升高并发环境下的性能,那么它是如何做到的?

分段加锁思路

设计思想上,LongAdder采用”分段”的方式降低CAS失败的频次

我们知道,AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点数据,也就是N个线程竞争一个热点。

LongAdder的基本思路就是分散热点,将value值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个value值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。

LongAdder有一个全局变量volatile long base值,当并发不高的情况下都是通过CAS来直接操作base值,如果CAS失败,则针对LongAdder中的Cell[]数组中的Cell进行CAS操作,减少失败的概率。

例如当前类中base = 10,有三个线程进行CAS原子性的**+1操作线程一执行成功,此时base=11线程二、线程三执行失败后开始针对于Cell[]数组中的Cell元素进行+1操作**,同样也是CAS操作,此时数组index=1index=2Cellvalue都被设置为了1.

执行完成后,统计累加数据:sum = 11 + 1 + 1 = 13,利用LongAdder进行累加的操作就执行完了,流程图如下:

img

如果要获取真正的long值,只要将各个槽中的变量值累加返回。这种分段的做法类似于JDK7ConcurrentHashMap的分段锁。

消除伪共享

LongAdder 的父类 Striped64 中存在一个 volatile Cell[] cells; 数组,其长度是2 的幂次方,每个Cell都使用 @Contended 注解进行修饰,而@Contended注解可以进行缓存行填充,从而解决伪共享问题。伪共享会导致缓存行失效,缓存一致性开销变大。

1
2
3
@sun.misc.Contended static final class Cell {

}

伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU缓存失效。尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。这里对于伪共享我只是提一下概念,并不会深入去讲解,大家可以自行查阅一些资料。

解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程的变量存在于不同的 CacheLine 即可,使用多余的字节来填充可以做点这一点,这样就不会出现伪共享问题。

缓存行填充代码

缓存行填充

Striped64类中我们可以看看Doug LeaCell上加的注释也有说明这一点:

Cell注释

红框中的翻译如下:

Cell类是AtomicLong添加了padded(via@sun.misc.compended)来消除伪共享的变种版本。缓存行填充对于大多数原子来说是繁琐的,因为它们通常不规则地分散在内存中,因此彼此之间不会有太大的干扰。但是,驻留在数组中的原子对象往往彼此相邻,因此在没有这种预防措施的情况下,通常会共享缓存行数据(对性能有巨大的负面影响)。

惰性求值

LongAdder只有在使用longValue()获取当前累加值时才会真正的去结算计数的数据,longValue()方法底层就是调用sum()方法,对baseCell数组的数据累加然后返回,做到数据写入和读取分离。

AtomicLong使用incrementAndGet()每次都会返回long类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。

LongAdder实现原理

之前说了,AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

比如有三个线程同时对value增加1,那么value = 1 + 1 + 1 = 3

但是对于LongAdder来说,内部有一个base变量,一个Cell[]数组。

  • base变量:非竞态条件下,直接累加到该变量上
  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
  • 最终结果的计算是下面这个形式:

LongAdder源码剖析

前面已经用图分析了LongAdder高性能的原理,我们继续看下LongAdder实现的源码:

add()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LongAdder extends Striped64 implements Serializable {
public void increment() {
add(1L);
}

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
}

一般我们进行计数时都会使用increment()方法,每次进行**+1操作**,increment()会直接调用add()方法。

变量说明:

  • as 表示cells引用
  • b 表示获取的base值
  • v 表示 期望值,
  • m 表示 cells 数组的长度
  • a 表示当前线程命中的cell单元格

条件一:as == null || (m = as.length - 1) < 0
此条件成立说明cells数组未初始化。如果不成立则说明cells数组已经完成初始化,对应的线程需要找到Cell数组中的元素去写值。

条件一

条件二:(a = as[getProbe() & m]) == null

getProbe()获取当前线程的hash值,m表示cells长度-1,cells长度是2的幂次方数,原因之前也讲到过,与数组长度取模可以转化为按位与运算,提升计算性能。

当条件成立时说明当前线程通过hash计算出来数组位置处的cell为空,进一步去执行longAccumulate()方法。如果不成立则说明对应的cell不为空,下一步将要将x值通过CAS操作添加到cell中。

条件三:!(uncontended = a.cas(v = a.value, v + x)

主要看a.cas(v = a.value, v + x),接着条件二,说明当前线程hash与数组长度取模计算出的位置的cell有值,此时直接尝试一次CAS操作,如果成功则退出if条件,失败则继续往下执行longAccumulate()方法。

条件二/条件三

longAccumulate()方法

接着往下看核心的longAccumulate()方法,代码很长,后面会一步步分析,先上代码:java.util.concurrent.atomic.Striped64.:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}

代码很长,if else分支很多,除此看肯定会很头疼。这里一点点分析,然后结合画图一步步了解其中实现原理。

我们首先要清楚执行这个方法的前置条件,它们是或的关系,如上面条件一、二、三

  1. cells数组没有初始化
  2. cells数组已经初始化,但是当前线程对应的cell数据为空
  3. cells数组已经初始化, 当前线程对应的cell数据为空,且CAS操作+1失败

longAccumulate()方法的入参:

  • long x 需要增加的值,一般默认都是1
  • LongBinaryOperator fn 默认传递的是null
  • wasUncontended竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false

然后再看下Striped64中一些变量或者方法的定义:

  • base: 类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
  • collide:表示扩容意向,false 一定不会扩容,true可能会扩容。
  • cellsBusy:初始化cells或者扩容cells需要获取锁, 0:表示无锁状态 1:表示其他线程已经持有了锁
  • casCellsBusy(): 通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
  • NCPU:当前计算机CPU数量,Cell数组扩容时会使用到
  • getProbe(): 获取当前线程的hash值
  • advanceProbe(): 重置当前线程的hash值

接着开始正式解析longAccumulate()源码:

获取当前线程的hash值

1
2
3
4
5
6
7
8
9
10
11
private static final long PROBE;

if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}

static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

我们上面说过getProbe()方法是为了获取当前线程的hash值,具体实现是通过UNSAFE.getInt()实现的,PROBE是在初始化时候获取当前线程threadLocalRandomProbe的值。

注:Unsafe.getInt()有三个重载方法getInt(Object o, long offset)、getInt(long address) 和getIntVolatile(long address),都是从指定的位置获取变量的值,只不过第一个的offset是相对于对象o的相对偏移量,第二个address是绝对地址偏移量。如果第一个方法中o为null是,offset也会被作为绝对偏移量。第三个则是带有volatile语义的load读操作。

如果当前线程的hash值h=getProbe()为0,0与任何数取模都是0,会固定到数组第一个位置,所以这里做了优化,使用ThreadLocalRandom为当前线程重新计算一个hash值。最后设置wasUncontended = true,这里含义是重新计算了当前线程的hash后认为此次不算是一次竞争。hash值被重置就好比一个全新的线程一样,所以设置了竞争状态为true

可以画图理解为:

wasUncontended设置说明

接着执行for循环,我们可以把for循环代码拆分一下,每个if条件算作一个CASE来分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {

}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

}
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))

}
}

如上所示,第一个if语句代表CASE1,里面再有if判断会以CASE1.1这种形式来讲解,下面接着的else ifCASE2, 最后一个为CASE3

CASE1执行条件

1
2
3
if ((as = cells) != null && (n = as.length) > 0) {

}

cells数组不为空,且数组长度大于0的情况会执行CASE1CASE1的实现细节代码较多,放到最后面讲解。

CASE2执行条件和实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}

CASE2 标识cells数组还未初始化,因为判断cells == as,这个代表当前线程到了这里获取的cells还是之前的一致。我们可以先看这个case,最后再回头看最为麻烦的CASE1实现逻辑。

cellsBusy上面说了是加锁的状态,初始化cells数组和扩容的时候都要获取加锁的状态,这个是通过CAS来实现的,为0代表无锁状态,为1代表其他线程已经持有锁了。cells==as代表当前线程持有的数组未进行修改过,casCellsBusy()通过CAS操作去获取锁。但是里面的if条件又再次判断了cell==as,这一点是不是很奇怪?通过画图来说明下问题:

cells==as双重判断说明

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素value是x值,默认为1。

h & 1类似于我们之前HashMap或者ThreadLocal里面经常用到的计算散列桶index的算法,通常都是hash & (table.len - 1),这里就不做过多解释了。 执行完成后直接退出for循环

CASE3执行条件和实现原理

1
2
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;

进入到这里说明cells正在或者已经初始化过了,执行caseBase()方法,通过CAS操作来修改base的值,如果修改成功则跳出循环,这个CASE只有在初始化Cell数组的时候,多个线程尝试CAS修改cellsBusy加锁的时候,失败的线程会走到这个分支,然后直接CAS修改base数据。

CASE1 实现原理

分析完了CASE2和CASE3,我们再折头回看一下CASE1,进入CASE1的前提是:cells数组不为空,已经完成了初始化赋值操作。

接着还是一点点往下拆分代码,首先看第一个判断分支CASE1.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}

这个if条件中(a = as[(n - 1) & h]) == null代表当前线程对应的数组下标位置的cell数据为null,代表没有线程在此处创建Cell对象。

接着判断cellsBusy==0,代表当前锁未被占用。然后新创建Cell对象,接着又判断了一遍cellsBusy == 0,然后执行casCellsBusy()尝试通过CAS操作修改cellsBusy=1,加锁成功后修改扩容意向collide = false;

1
2
3
4
5
6
7
8
9
10
for (;;) {
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}

if (created)
break;
continue;
}

上面代码判断当前线程hash后指向的数据位置元素是否为空,如果为空则将cell数据放入数组中,跳出循环。如果不为空则继续循环。

CASE1.1

继续往下看代码,CASE1.2

1
2
3
4
else if (!wasUncontended)
wasUncontended = true;

h = advanceProbe(h);

wasUncontended表示cells初始化后,当前线程竞争修改失败wasUncontended =false,这里只是重新设置了这个值为true,紧接着执行advanceProbe(h)重置当前线程的hash,重新循环。

接着看CASE1.3

1
2
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;

进入CASE1.3说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。

接着看CASE1.4:

1
2
else if (n >= NCPU || cells != as)
collide = false;

如果cells数组的长度达到了CPU核心数,或者cells扩容了,设置扩容意向collide为false并通过下面的h = advanceProbe(h)方法修改线程的probe再重新尝试

至于这里为什么要提出和CPU数量做判断的问题:每个线程会通过线程对cells[threadHash%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上,如果超过CPU数量的时候就不再扩容是因为CPU的数量代表了机器处理能力,当超过CPU数量时,多出来的cells数组元素没有太大作用。

多线程更新Cell

接着看CASE1.5

1
2
else if (!collide)
collide = true;

如果扩容意向collidefalse则修改它为true,然后重新计算当前线程的hash值继续循环,在CASE1.4中,如果当前数组的长度已经大于了CPU的核数,就会再次设置扩容意向collide=false,这里的意义是保证扩容意向为false后不再继续往后执行CASE1.6的扩容操作。

接着看CASE1.6分支:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}

这里面执行的其实是扩容逻辑,首先是判断通过CAS改变cellsBusy来尝试加锁,如果CAS成功则代表获取锁成功,继续向下执行,判断当前的cells数组和最先赋值的as是同一个,代表没有被其他线程扩容过,然后进行扩容,扩容大小为之前的容量的两倍,这里用的按位左移1位来操作的。

1
Cell[] rs = new Cell[n << 1];

扩容后再将之前数组的元素拷贝到新数组中,释放锁设置cellsBusy = 0,设置扩容状态,然后继续循环执行。

到了这里,我们已经分析完了longAccumulate()所有的逻辑,逻辑分支挺多,仔细分析看看其实还是挺清晰的,流程图如下:

流程图

流程图

我们再举一些线程执行的例子里面场景覆盖不全,大家可以按照这种模式自己模拟场景分析代码流程:

多线程执行示例

如有问题也请及时指出,我会第一时间更正,不胜感激!

LongAdder的sum方法

当我们最终获取计数器值时,我们可以使用LongAdder.longValue()方法,其内部就是使用sum方法来汇总数据的。

java.util.concurrent.atomic.LongAdder.sum():

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

实现很简单,base + 遍历cells数组中的值,然后累加。

CAS在JDK中的广泛应用

CAS操作的弊端和规避措施

CAS操作的弊端主要有以下三点:

  1. ABA问题
    • 使用CAS操作内存数据时,数据发生过变化也能更新成功,如操作序列A==>B==>A时,最后一个CAS的预期数据A实际已经发生过更改,但也能更新成功,这就产生了ABA问题。
    • ABA问题的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候将版本号加1,那么操作序列A==>B==>A就会变成A1==>B2==>A3,如果将A1当作A3的预期数据,就会操作失败。
    • JDK提供了两个类AtomicStampedReferenceAtomicMarkableReference来解决ABA问题。比较常用的是AtomicStampedReference类,该类的compareAndSet()方法的作用是首先检查当前引用是否等于预期引用,以及当前印戳是否等于预期印戳,如果全部相等,就以原子方式将引用和印戳的值一同设置为新的值。
  2. 只能保证一个共享变量之间的原子性操作
    • 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,CAS就无法保证操作的原子
      性。
    • 一个比较简单的规避方法为:把多个共享变量合并成一个共享变量来操作。
    • JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个AtomicReference实例后再进行CAS操作。比如有两个共享变量i=1、j=2,可以将二者合并成一个对象,然后用CAS来操作该合并对象的AtomicReference引用。
  3. 开销问题自旋CAS如果长时间不成功(不成功就一直循环执行,直到成功),就会给CPU带来非常大的执行开销。解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案为:
    • 分散操作热点,使用LongAdder替代基础原子类AtomicLong,LongAdder将单个CAS热点(value值)分散到一个cells数组中。
    • 使用队列削峰,将发生CAS争用的线程加入一个队列中排队,降低CAS争用的激烈程度。JUC中非常重要的基础类AQS(抽象队列同步器)就是这么做的。

CAS操作在JDK中的应用

  • CAS在java.util.concurrent.atomic包中的原子类、Java AQS以及显式锁、CurrentHashMap等重要并发容器类的实现都有非常广泛的应用。
  • 在java.util.concurrent.atomic包的原子类(如AtomicXXX)中都使用了CAS来保障对数字成员进行操作的原子性。
  • java.util.concurrent的大多数类(包括显式锁、并发容器)都是基于AQS和AtomicXXX来实现的,其中AQS通过CAS保障它内部双向队列头部、尾部操作的原子性。

参考:

longadder原理 - r1-12king - 博客园