CAS原理与JUC原子类
由于JVM的Synchronized重量级锁涉及操作系统(如Linux)内核态下互斥锁的使用,因此其线程阻塞和唤醒都涉及进程在用户态到内核态的频繁切换,导致重量级锁开销大、性能低。而JVM的Synchronized轻量级锁使用CAS(Compare And Swap,比较并交换)进行自旋抢锁,CAS是CPU指令级的原子操作,并处于用户态下,所以JVM轻量级锁的开销较小。
什么是CAS
JDK 5所增加的JUC(java.util.concurrent)并发包对操作系统的底层CAS原子操作进行了封装,为上层Java程序提供了CAS操作的API。
Unsafe类中的CAS方法
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。Unsafe大量的方法都是native方法,基于C++语言实现,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。
Unsafe类的全限定名为sun.misc.Unsafe,从名字中可以看出这个类对普通程序员来说是“危险”的,一般的应用开发都不会涉及此类,Java官方也不建议直接在应用程序中使用这些类。
为什么此类取名为Unsafe呢?由于使用Unsafe类可以像C语言一样使用指针操作内存空间,这无疑增加了指针相关问题、内存泄漏问题出现的概率。总之,在程序中过度使用Unsafe类会使得程序出错的概率变大,使得安全的语言Java变得不再安全,因此对Unsafe的使用一定要慎重。
操作系统层面的CAS是一条CPU的原子指令(cmpxchg指令),正是由于该指令具备原子性,因此使用CAS操作数据时不会造成数据不一致的问题,Unsafe提供的CAS方法直接通过native方式(封装C++代码)调用了底层的CPU指令cmpxchg。
完成Java应用层的CAS操作主要涉及Unsafe方法的调用,具体如下:
- 获取Unsafe实例。
- 调用Unsafe提供的CAS方法,这些方法主要封装了底层CPU的CAS原子操作。
- 调用Unsafe提供的字段偏移量方法,这些方法用于获取对象中的字段(属性)偏移量,此偏移量值需要作为参数提供给CAS操作。
获取Unsafe实例
Unsafe类是一个final修饰的不允许继承的最终类,而且其构造函数是private类型的方法,具体的源码如下:
1 | public final class Unsafe { |
因此,我们无法在外部对Unsafe进行实例化,那么怎么获取Unsafe的实例呢?可以通过反射的方式自定义地获取Unsafe实例的辅助方法,代码如下:
1 | // 省略import |
调用Unsafe提供的CAS方法
Unsafe提供的CAS方法包含4个操作数——字段所在的对象、字段内存位置、预期原值及新值。在执行Unsafe的CAS方法时,这些方法首先将内存位置的值与预期值(旧的值)比较,如果相匹配,那么CPU会
自动将该内存位置的值更新为新值,并返回true;如果不匹配,CPU不做任何操作,并返回false。
Unsafe的CAS操作会将第一个参数(对象的指针、地址)与第二个参数(字段偏移量)组合在一起,计算出最终的内存操作地址。
1 | /** |
调用Unsafe提供的偏移量相关
Unsafe提供的获取字段(属性)偏移量的相关操作主要如下:
- staticFieldOffset()方法用于获取静态属性Field在Class对象中的偏移量,在CAS中操作静态属性时会用到这个偏移量。
- objectFieldOffset()方法用于获取非静态Field(非静态属性)在Object实例中的偏移量,在CAS中操作对象的非静态属性时会用到这个偏移量。
1 | /** |
一个获取非静态Field(非静态属性)在Object实例中的偏移量的示例代码如下:
1 | static { |
使用CAS进行无锁编程
CAS是一种无锁算法,该算法关键依赖两个值——期望值(旧值)和新值,底层CPU利用原子操作判断内存原值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。
使用CAS进行无锁编程的步骤大致如下:
- 获得字段的期望值(oldValue)。
- 计算出需要替换的新值(newValue)。
- 通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋。
使用CAS进行无锁编程的伪代码如下:
1 | do { |
假如某个内存地址(某对象的属性)的值为100,现在有两个线程(线程A和线程B)使用CAS无锁编程对该内存地址进行更新,线程A欲将其值更新为200,线程B欲将其值更新为300,线程是并发执行的,谁都有可能先执行。但是CAS是原子操作,对同一个内存地址的CAS操作在同一时刻只能执行一个。因此,在这个例
子中,要么线程A先执行,要么线程B先执行。
- 假设线程A的CAS(100,200)执行在前,由于内存地址的旧值100与该CAS的期望值100相等,因此线程A会操作成功,内存地址的值被更新为200。
- 接下来执行线程B的CAS(100,300)操作,此时内存地址的值为200,不等于CAS的期望值100,线程B操作失败。线程B只能自旋,开始新的循环,这一轮循环首先获取到内存地址的值200,然后进行CAS(200,300)操作,这一次内存地址的值与CAS的预期值(oldValue)相等,线程B操作成功。
当CAS将内存地址的值与预期值进行比较时,如果相等,就证明内存地址的值没有被修改,可以替换成新值,然后继续往下运行;如果不相等,就说明内存地址的值已经被修改,放弃替换操作,然后重新自旋。当并发修改的线程少,冲突出现的机会少时,自旋的次数也会很少,CAS的性能会很高;当并发修改的线程多,冲突出现的机会多时,自旋的次数也会很多,CAS的性能会大大降低。所以,提升CAS无
锁编程效率的关键在于减少冲突的机会。
无锁编程实现轻量级安全自增
1 | public class Test { |
字段偏移量的计算
调用Unsafe.objectFieldOffset(…)方法获取到的Object字段(也叫Object成员属性)的偏移量值是字段相对于Object头部的偏移
量,是一个相对的内存地址值,不是绝对的内存地址值。
1 | public class Test { |
虽然Test类有2个字段,但是其中有1个是静态字段,属于类的成员而不是对象的成员,真正属于对象的字段只有其中的value字段。所以类的对象结构
如图所示:
在64位的JVM堆区中一个Test对象的Object Header(头部)占用了12字节,其中Mark Word占用了8字节(64位),压缩过的Class Pointer占用了4字节。接在Object Header之后的就是成员属性value的内存区域,所以value属性相对于Object Header的偏移量为12。
JUC原子类
在多线程并发执行时,诸如“++”或“–”类的运算不具备原子性,不是线程安全的操作。通常情况下,大家会使用synchronized将这些线程不安全的操作变成同步操作,但是这样会降低并发程序的性能。所以,JDK为这些类型不安全的操作提供了一些原子类,与synchronized同步机制相比,JDK原子类是基于CAS轻量级原子操作的实现,使得程序运行效率变得更高。
Atomic原子操作包
Atomic操作翻译成中文是指一个不可中断的操作,即使在多个线程一起执行Atomic类型操作的时候,一个操作一旦开始,就不会被其
他线程中断。所谓Atomic类,指的是具有原子操作特征的类。JUC并发包中原子类的位置JUC并发包中的原子类都存放在java.util.concurrent.atomic
类路径下.
根据操作的目标数据类型,可以将JUC包中的原子类分为4类:基本原子类
、数组原子类
、原子引用类
和字段更新原子类
。
基本原子类
- 基本原子类的功能是通过原子方式更新Java基础类型变量的值。
- 基本原子类主要包括以下三个:
AtomicInteger
:整型原子类。AtomicLong
:长整型原子类。AtomicBoolean
:布尔型原子类。
数组原子类
- 数组原子类的功能是通过原子方式更数组中的某个元素的值。数
- 组原子类主要包括以下三个:
AtomicIntegerArray
:整型数组原子类。AtomicLongArray
:长整型数组原子类。AtomicReferenceArray
:引用类型数组原子类。
引用原子类
- 引用原子类主要包括以下三个:
AtomicReference
:引用类型原子类。AtomicMarkableReference
:带有更新标记位的原子引用类型。AtomicStampedReference
:带有更新版本号的原子引用类型。
AtomicMarkableReference
类将boolean标记与引用关联起来,可以解决使用AtomicBoolean进行原子更新时可能出现的ABA问题。AtomicStampedReference
类将整数值与引用关联起来,可以解决使用AtomicInteger进行原子更新时可能出现的ABA问题。
字段更新原子类
- 字段更新原子类主要包括以下三个:
AtomicIntegerFieldUpdater
:原子更新整型字段的更新器。AtomicLongFieldUpdater
:原子更新长整型字段的更新器。AtomicReferenceFieldUpdater
:原子更新引用类型中的字段
基础原子类AtomicInteger
基础原子类AtomicInteger常用的方法如下:
1 | public final int get() //获取当前的值 |
下面是一个基础原子类AtomicInteger的使用示例,具体代码如下:
1 | import java.util.concurrent.atomic.AtomicInteger; |
数组原子类AtomicIntegerArray
AtomicIntegerArray.AtomicLongArray,AtomicReferenceArray三个类提供的方法几乎相同,所以我们这里以AtomicIntegerArray为例来介绍。
AtomicIntegerArray类的常用方法如下:
1 | //获取 index=i 位置元素的值 |
1 | public void testAtomicIntegerArray() { |
AtomicInteger线程安全原理
基础原子类(以AtomicInteger为例)主要通过CAS自旋+volatile的方案实现,既保障了变量操作的线程安全性,又避免了synchronized重量级锁的高开销,使得Java程序的执行效率大为提升。
AtomicInteger源码中的主要方法都是通过CAS自旋实现的。CAS自旋的主要操作为:如果一次CAS操作失败,获取最新的value值后,再
次进行CAS操作,直到成功。
另外,AtomicInteger所包装的内部value成员是一个使用关键字volatile修饰的内部成员。关键字volatile的原理比较复杂,简单地
说,该关键字可以保证任何线程在任何时刻总能拿到该变量的最新值,其目的在于保障变量值的线程可见性。
1 | public class AtomicInteger extends Number implements java.io.Serializable { |
引用类型原子类
- 引用类型原子类包括以下种:
- AtomicReference:基础的引用原子类。
- AtomicStampedReference:带印戳的引用原子类。
- AtomicMarkableReference:带修改标志的引用原子类。
- 上面三个类提供的方法几乎相同,所以这里以AtomicReference为例来介绍。
下面为大家介绍一个简单的AtomicReference类的使用示例
1 |
|
属性更新原子类
此这里以AtomicIntegerFieldUpdater
为例来介绍。使用属性更新原子类保障属性安全更新的流程大致需要两步:
- 第一步,更新的对象属性必须使用
public volatile
修饰符。 - 第二步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须调用静态方法newUpdater()创建一个更新器,并且需要设
置想要更新的类和属性。
下面为大家介绍一个简单的AtomicIntegerFieldUpdater类的使用示例,原子性地更新User对象的age属性,代码如下:
1 | public void testAtomicIntegerFieldUpdater() { |
ABA问题
由于CAS原子操作性能高,因此其在JUC包中被广泛应用,只不过如果使用得不合理,CAS原子操作就会存在ABA问题。
了解ABA问题
什么是ABA问题?举一个例子来说明。比如一个线程A从内存位置M中取出V1,另一个线程B也取出V1。现在假设线程B进行了一些操作之
后将M位置的数据V1变成了V2,然后又在一些操作之后将V2变成了V1。之后,线程A进行CAS操作,但是线程A发现M位置的数据仍然是V1,然
后线程A操作成功。尽管线程A的CAS操作成功,但是不代表这个过程是没有问题的,线程A操作的数据V1可能已经不是之前的V1,而是被线程
B替换过的V1,这就是ABA问题。
并发业务场景下,两个并发的查询库存操作,同时从数据库都得到了库存是5。用户1购买了3个库存,于是库存要设置为2, 用户2购买了2个库存,于是库存要设置为3,这两个设置库存的接口并发执行,库存会先变成2,再变成3,导致数据不一致(实际卖出了5件商品,但库存只扣减了2,最后一次设置库存会覆盖和掩盖前一次并发操作)
ABA问题解决方案
很多乐观锁的实现版本都是使用版本号(Version)方式来解决ABA问题。乐观锁每次在执行数据的修改操作时都会带上一个版本号,版本号和数据的版本号一致就可以执行修改操作并对版本号执行加1操作,否则执行失败。因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加,不会减少。
AtomicStampedReference
参考乐观锁的版本号,JDK提供了一个AtomicStampedReference类来解决ABA问题。AtomicStampReference在CAS的基础上增加了一个
Stamp(印戳或标记),使用这个印戳可以用来觉察数据是否发生变化,给数据带上了一种实效性的检验。
AtomicStampReference的compareAndSet()方法首先检查当前的对象引用值是否等于预期引用,并且当前印戳(Stamp)标志是否等于预
期标志,如果全部相等,就以原子方式将引用值和印戳(Stamp)标志的值更新为给定的更新值。
1 | //构造器,V表示要引用的原始数据,initialStamp表示最初的版本印戳(版本号) |
compareAndSet()方法的第一个参数是原来的CAS中的参数,第二个参数是替换后的新参数,第三个参数是原来CAS数据旧的版本号,第
四个参数表示替换后的新参数版本号。进行CAS操作时,若当前引用值等于预期引用值,并且当前印戳值等于预期印戳值,则以原子方式将引用值和印戳值更新为给定的更新值。
下面是一个简单的AtomicStampedReference使用示例:
1 | // 使用 AtomicStampedReference 存储当前的库存值和版本号 |
AtomicMarkableReference
AtomicMarkableReference是AtomicStampedReference的简化版,不关心修改过几次,只关心是否修改过。因此,其标记属性mark是boolean类型,而不是数字类型,标记属性mark仅记录值是否修改过。AtomicMarkableReference适用于只要知道对象是否被修改过,而不适用于对象被反复修改的场景。
下面是一个简单的AtomicMarkableReference使用示例,通过两个线程分别更新同一个stock 的值,第一个线程会更新成功,而第二个线程会更新失败,具体代码如下:
1 | // 使用 AtomicMarkableReference 存储当前的库存值和标记(表示是否有效) |
提升高并发场景下CAS操作的性能
在争用激烈的场景下,会导致大量的CAS空自旋。比如,在大量线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。大量的CAS空自旋会浪费大量的CPU资源,大大降低了程序的性能。
除了存在CAS空自旋之外,在SMP架构的CPU平台上,大量的CAS操作还可能导致“总线风暴”
在高并发场景下如何提升CAS操作的性能呢?可以使用LongAdder替代AtomicInteger。
以空间换时间:LongAdder
Java 8提供了一个新的类LongAdder
,以空间换时间的方式提升高并发场景下CAS操作的性能。LongAdder的核心思想是热点分离,与ConcurrentHashMap的设计思想类似:将value值分离成一个数组,当多线程访问时,通过Hash算法将线程映射到数组的一个元素进行操作;而获取最终的value结果时,则将数组的元素求和。最终,通过LongAdder将内部操作对象从单个value值“演变”成一系列的数组元素,从而减小了内部竞争的粒度。
1 |
|
这里可以看到随着并发的增加,AtomicLong
性能是急剧下降的,耗时是LongAdder
的数倍。至于原因我们还是接着往后看。
AtomicLong可以弃用了吗?
看上去LongAdder
的性能全面超越了AtomicLong
,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观
锁的重试次数),但是我们真的就可以舍弃掉LongAdder
了吗?
当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用AtomicLong
可能会更好一些,而且内存需求也会小一些。
我们通过分析源码sum()
方法后可以知道LongAdder
在统计的时候如果有并发更新,可能导致统计的数据有误差。而在高并发统计计数的场景下,才更适合使用LongAdder
。
LongAdder的原理
操作原理图
先看下LongAdder
的操作原理图:
既然说到LongAdder
可以显著提升高并发环境下的性能,那么它是如何做到的?
分段加锁思路
设计思想上,
LongAdder
采用”分段”的方式降低CAS
失败的频次
我们知道,AtomicLong
中有个内部变量value
保存着实际的long
值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value
变量其实是一个热点数据,也就是N个线程竞争一个热点。
LongAdder
的基本思路就是分散热点,将value
值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个value
值进行CAS
操作,这样热点就被分散了,冲突的概率就小很多。
LongAdder
有一个全局变量volatile long base
值,当并发不高的情况下都是通过CAS
来直接操作base
值,如果CAS
失败,则针对LongAdder
中的Cell[]
数组中的Cell
进行CA
S操作,减少失败的概率。
例如当前类中base = 10
,有三个线程进行CAS
原子性的**+1操作,线程一执行成功,此时base=11,线程二、线程三执行失败后开始针对于Cell[]
数组中的Cell
元素进行+1操作**,同样也是CAS
操作,此时数组index=1
和index=2
中Cell
的value
都被设置为了1.
执行完成后,统计累加数据:sum = 11 + 1 + 1 = 13
,利用LongAdder
进行累加的操作就执行完了,流程图如下:
如果要获取真正的long
值,只要将各个槽中的变量值累加返回。这种分段的做法类似于JDK7
中ConcurrentHashMap
的分段锁。
消除伪共享
在 LongAdder
的父类 Striped64
中存在一个 volatile Cell[] cells;
数组,其长度是2 的幂次方,每个Cell
都使用 @Contended
注解进行修饰,而@Contended
注解可以进行缓存行填充,从而解决伪共享问题。伪共享会导致缓存行失效,缓存一致性开销变大。
1 | static final class Cell { .misc.Contended |
伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU缓存失效
。尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。这里对于伪共享我只是提一下概念,并不会深入去讲解,大家可以自行查阅一些资料。
解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程的变量存在于不同的 CacheLine
即可,使用多余的字节来填充可以做点这一点,这样就不会出现伪共享问题。
在Striped64
类中我们可以看看Doug Lea
在Cell
上加的注释也有说明这一点:
红框中的翻译如下:
Cell
类是AtomicLong
添加了padded(via@sun.misc.compended)
来消除伪共享的变种版本。缓存行填充对于大多数原子来说是繁琐的,因为它们通常不规则地分散在内存中,因此彼此之间不会有太大的干扰。但是,驻留在数组中的原子对象往往彼此相邻,因此在没有这种预防措施的情况下,通常会共享缓存行数据(对性能有巨大的负面影响)。
惰性求值
LongAdder
只有在使用longValue()
获取当前累加值时才会真正的去结算计数的数据,longValue()
方法底层就是调用sum()
方法,对base
和Cell数组
的数据累加然后返回,做到数据写入和读取分离。
而AtomicLong
使用incrementAndGet()
每次都会返回long
类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。
LongAdder实现原理
之前说了,AtomicLong
是多个线程针对单个热点值value进行原子操作。而LongAdder
是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
。
比如有三个线程同时对value增加1,那么value = 1 + 1 + 1 = 3
但是对于LongAdder来说,内部有一个base变量,一个Cell[]数组。
- base变量:非竞态条件下,直接累加到该变量上
- Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
- 最终结果的计算是下面这个形式:
LongAdder源码剖析
前面已经用图分析了LongAdder
高性能的原理,我们继续看下LongAdder
实现的源码:
add()
方法
1 | public class LongAdder extends Striped64 implements Serializable { |
一般我们进行计数时都会使用increment()
方法,每次进行**+1操作**,increment()
会直接调用add()
方法。
变量说明:
- as 表示cells引用
- b 表示获取的base值
- v 表示 期望值,
- m 表示 cells 数组的长度
- a 表示当前线程命中的cell单元格
条件一:as == null || (m = as.length - 1) < 0
此条件成立说明cells数组未初始化。如果不成立则说明cells数组已经完成初始化,对应的线程需要找到Cell数组中的元素去写值。
条件二:(a = as[getProbe() & m]) == null
getProbe()获取当前线程的hash值,m表示cells长度-1,cells长度是2的幂次方数,原因之前也讲到过,与数组长度取模可以转化为按位与运算,提升计算性能。
当条件成立时说明当前线程通过hash计算出来数组位置处的cell为空,进一步去执行longAccumulate()方法。如果不成立则说明对应的cell不为空,下一步将要将x值通过CAS操作添加到cell中。
条件三:!(uncontended = a.cas(v = a.value, v + x)
主要看a.cas(v = a.value, v + x),接着条件二,说明当前线程hash与数组长度取模计算出的位置的cell有值,此时直接尝试一次CAS操作,如果成功则退出if条件,失败则继续往下执行longAccumulate()方法。
longAccumulate()方法
接着往下看核心的longAccumulate()
方法,代码很长,后面会一步步分析,先上代码:java.util.concurrent.atomic.Striped64.
:
1 | final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { |
代码很长,if else
分支很多,除此看肯定会很头疼。这里一点点分析,然后结合画图一步步了解其中实现原理。
我们首先要清楚执行这个方法的前置条件,它们是或的关系,如上面条件一、二、三
- cells数组没有初始化
- cells数组已经初始化,但是当前线程对应的cell数据为空
- cells数组已经初始化, 当前线程对应的cell数据为空,且CAS操作+1失败
longAccumulate()方法的入参:
- long x 需要增加的值,一般默认都是1
- LongBinaryOperator fn 默认传递的是null
- wasUncontended竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false
然后再看下Striped64中一些变量或者方法的定义:
- base: 类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
- collide:表示扩容意向,false 一定不会扩容,true可能会扩容。
- cellsBusy:初始化cells或者扩容cells需要获取锁, 0:表示无锁状态 1:表示其他线程已经持有了锁
- casCellsBusy(): 通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
- NCPU:当前计算机CPU数量,Cell数组扩容时会使用到
- getProbe(): 获取当前线程的hash值
- advanceProbe(): 重置当前线程的hash值
接着开始正式解析longAccumulate()源码:
获取当前线程的hash值
1 | private static final long PROBE; |
我们上面说过getProbe()
方法是为了获取当前线程的hash值
,具体实现是通过UNSAFE.getInt()
实现的,PROBE
是在初始化时候获取当前线程threadLocalRandomProbe
的值。
注:Unsafe.getInt()有三个重载方法getInt(Object o, long offset)、getInt(long address) 和getIntVolatile(long address),都是从指定的位置获取变量的值,只不过第一个的offset是相对于对象o的相对偏移量,第二个address是绝对地址偏移量。如果第一个方法中o为null是,offset也会被作为绝对偏移量。第三个则是带有volatile语义的load读操作。
如果当前线程的hash值h=getProbe()为0,0与任何数取模都是0,会固定到数组第一个位置,所以这里做了优化,使用ThreadLocalRandom
为当前线程重新计算一个hash
值。最后设置wasUncontended = true
,这里含义是重新计算了当前线程的hash
后认为此次不算是一次竞争。hash
值被重置就好比一个全新的线程一样,所以设置了竞争状态为true
。
可以画图理解为:
接着执行for循环
,我们可以把for循环
代码拆分一下,每个if条件
算作一个CASE
来分析:
1 | final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { |
如上所示,第一个if语句代表CASE1
,里面再有if判断
会以CASE1.1
这种形式来讲解,下面接着的else if
为CASE2
, 最后一个为CASE3
CASE1执行条件:
1 | if ((as = cells) != null && (n = as.length) > 0) { |
cells数组
不为空,且数组长度大于0的情况会执行CASE1
,CASE1
的实现细节代码较多,放到最后面讲解。
CASE2执行条件和实现原理:
1 | else if (cellsBusy == 0 && cells == as && casCellsBusy()) { |
CASE2
标识cells数组
还未初始化,因为判断cells == as
,这个代表当前线程到了这里获取的cells
还是之前的一致。我们可以先看这个case
,最后再回头看最为麻烦的CASE1
实现逻辑。
cellsBusy
上面说了是加锁的状态,初始化cells数组
和扩容的时候都要获取加锁的状态,这个是通过CAS
来实现的,为0代表无锁状态,为1代表其他线程已经持有锁了。cells==as
代表当前线程持有的数组未进行修改过,casCellsBusy()
通过CAS操作
去获取锁。但是里面的if条件
又再次判断了cell==as
,这一点是不是很奇怪?通过画图来说明下问题:
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]
表示数组的长度为2,rs[h & 1] = new Cell(x)
表示创建一个新的Cell元素
,value是x值,默认为1。
h & 1
类似于我们之前HashMap
或者ThreadLocal
里面经常用到的计算散列桶index
的算法,通常都是hash & (table.len - 1)
,这里就不做过多解释了。 执行完成后直接退出for循环
。
CASE3执行条件和实现原理:
1 | else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) |
进入到这里说明cells
正在或者已经初始化过了,执行caseBase()
方法,通过CAS操作
来修改base
的值,如果修改成功则跳出循环,这个CAS
E只有在初始化Cell数组
的时候,多个线程尝试CAS
修改cellsBusy
加锁的时候,失败的线程会走到这个分支,然后直接CAS
修改base
数据。
CASE1 实现原理:
分析完了CASE2和CASE3
,我们再折头回看一下CASE1
,进入CASE1
的前提是:cells数组
不为空,已经完成了初始化赋值操作。
接着还是一点点往下拆分代码,首先看第一个判断分支CASE1.1
:
1 | if ((a = as[(n - 1) & h]) == null) { |
这个if条件中(a = as[(n - 1) & h]) == null
代表当前线程对应的数组下标位置的cell
数据为null
,代表没有线程在此处创建Cell
对象。
接着判断cellsBusy==0
,代表当前锁未被占用。然后新创建Cell对象
,接着又判断了一遍cellsBusy == 0
,然后执行casCellsBusy()
尝试通过CAS操作修改cellsBusy=1
,加锁成功后修改扩容意向collide = false;
1 | for (;;) { |
上面代码判断当前线程hash
后指向的数据位置元素是否为空,如果为空则将cell
数据放入数组中,跳出循环。如果不为空则继续循环。
继续往下看代码,CASE1.2:
1 | else if (!wasUncontended) |
wasUncontended
表示cells
初始化后,当前线程竞争修改失败wasUncontended =false
,这里只是重新设置了这个值为true
,紧接着执行advanceProbe(h)
重置当前线程的hash
,重新循环。
接着看CASE1.3:
1 | else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) |
进入CASE1.3
说明当前线程对应的数组中有了数据,也重置过hash值
,这时通过CAS操作尝试对当前数中的value值
进行累加x操作,x默认为1,如果CAS
成功则直接跳出循环。
接着看CASE1.4:
1 | else if (n >= NCPU || cells != as) |
如果cells数组
的长度达到了CPU核心数
,或者cells
扩容了,设置扩容意向collide为false
并通过下面的h = advanceProbe(h)
方法修改线程的probe
再重新尝试
至于这里为什么要提出和CPU数量
做判断的问题:每个线程会通过线程对cells[threadHash%cells.length]
位置的Cell
对象中的value
做累加,这样相当于将线程绑定到了cells
中的某个cell
对象上,如果超过CPU数量
的时候就不再扩容是因为CPU
的数量代表了机器处理能力,当超过CPU
数量时,多出来的cells
数组元素没有太大作用。
接着看CASE1.5:
1 | else if (!collide) |
如果扩容意向collide
是false
则修改它为true
,然后重新计算当前线程的hash
值继续循环,在CASE1.4
中,如果当前数组的长度已经大于了CPU
的核数,就会再次设置扩容意向collide=false
,这里的意义是保证扩容意向为false
后不再继续往后执行CASE1.6
的扩容操作。
接着看CASE1.6分支:
1 | else if (cellsBusy == 0 && casCellsBusy()) { |
这里面执行的其实是扩容逻辑,首先是判断通过CAS
改变cellsBusy
来尝试加锁,如果CAS
成功则代表获取锁成功,继续向下执行,判断当前的cells
数组和最先赋值的as
是同一个,代表没有被其他线程扩容过,然后进行扩容,扩容大小为之前的容量的两倍,这里用的按位左移1位来操作的。
1 | Cell[] rs = new Cell[n << 1]; |
扩容后再将之前数组的元素拷贝到新数组中,释放锁设置cellsBusy = 0
,设置扩容状态,然后继续循环执行。
到了这里,我们已经分析完了longAccumulate()
所有的逻辑,逻辑分支挺多,仔细分析看看其实还是挺清晰的,流程图如下:
流程图
我们再举一些线程执行的例子里面场景覆盖不全,大家可以按照这种模式自己模拟场景分析代码流程:
如有问题也请及时指出,我会第一时间更正,不胜感激!
LongAdder的sum方法
当我们最终获取计数器值时,我们可以使用LongAdder.longValue()
方法,其内部就是使用sum
方法来汇总数据的。
java.util.concurrent.atomic.LongAdder.sum()
:
1 | public long sum() { |
实现很简单,base + 遍历cells
数组中的值,然后累加。
CAS在JDK中的广泛应用
CAS操作的弊端和规避措施
CAS操作的弊端主要有以下三点:
- ABA问题
- 使用CAS操作内存数据时,数据发生过变化也能更新成功,如操作序列A==>B==>A时,最后一个CAS的预期数据A实际已经发生过更改,但也能更新成功,这就产生了ABA问题。
- ABA问题的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候将版本号加1,那么操作序列A==>B==>A就会变成A1==>B2==>A3,如果将A1当作A3的预期数据,就会操作失败。
- JDK提供了两个类
AtomicStampedReference
和AtomicMarkableReference
来解决ABA问题。比较常用的是AtomicStampedReference类,该类的compareAndSet()方法的作用是首先检查当前引用是否等于预期引用,以及当前印戳是否等于预期印戳,如果全部相等,就以原子方式将引用和印戳的值一同设置为新的值。
- 只能保证一个共享变量之间的原子性操作
- 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,CAS就无法保证操作的原子
性。 - 一个比较简单的规避方法为:把多个共享变量合并成一个共享变量来操作。
- JDK提供了
AtomicReference
类来保证引用对象之间的原子性,可以把多个变量放在一个AtomicReference实例后再进行CAS操作。比如有两个共享变量i=1、j=2,可以将二者合并成一个对象,然后用CAS来操作该合并对象的AtomicReference引用。
- 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,CAS就无法保证操作的原子
- 开销问题自旋CAS如果长时间不成功(不成功就一直循环执行,直到成功),就会给CPU带来非常大的执行开销。解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案为:
- 分散操作热点,使用LongAdder替代基础原子类AtomicLong,LongAdder将单个CAS热点(value值)分散到一个cells数组中。
- 使用队列削峰,将发生CAS争用的线程加入一个队列中排队,降低CAS争用的激烈程度。JUC中非常重要的基础类AQS(抽象队列同步器)就是这么做的。
CAS操作在JDK中的应用
- CAS在java.util.concurrent.atomic包中的原子类、Java AQS以及显式锁、CurrentHashMap等重要并发容器类的实现都有非常广泛的应用。
- 在java.util.concurrent.atomic包的原子类(如AtomicXXX)中都使用了CAS来保障对数字成员进行操作的原子性。
- java.util.concurrent的大多数类(包括显式锁、并发容器)都是基于AQS和AtomicXXX来实现的,其中AQS通过CAS保障它内部双向队列头部、尾部操作的原子性。