线程安全问题

  • 如果你的代码所在的进程中有多个线程在同时运行,而这些线程可能会同时运行这段代码。
  • 如果每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的,
  • 或者说:一个类或者程序所提供的接口对于线程来说是原子操作或者多个线程之间的切换不会导致该接口的执行结果存在二义性,也就是说我们不用考虑同步的问题 。

线程安全需要保证几个基本特性

  • 原子性,简单说就是相关操作不会中途被其他线程干扰,一般通过同步机制实现。
  • 可见性,是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 有序性,是保证线程内串行语义,避免指令重排等。

synchronized使用

简单介绍

  • synchronized 是java语言关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。

  • synchronized 关键字,它包括两种用法:synchronized 方法synchronized 块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //同步的方法
    pubilc synchronized void test() {

    }

    //同步代码块上
    public void test() {
    synchronized(obj) {
    System.out.println("===");
    }
    }

修饰内容

  • 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
  • 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
  • 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象
  • 修改一个,其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象

synchonized(this)和synchonized(object)区别

  • synchonized(this)和synchonized(object)区别
  • 其实并没有很大的区别,synchonized(object)本身就包含synchonized(this)这种情况,使用的场景都是对一个代码块进行加锁,效率比直接在方法名上加synchonized高一些(下面分析),唯一的区别就是对象的不同
  • 对synchronized(this)的一些理解
    • 当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块。
    • 然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(this)同步代码块。
    • 尤其关键的是,当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。
    • 当一个线程访问object的一个synchronized(this)同步代码块时,它就获得了这个object的对象锁。结果,其它线程对该object对象所有同步代码部分的访问都被暂时阻塞。

效率问题

  • sychonized method 和 synchonized代码块的效率问题
  • 使用前者的对象会获取该对象中所有synchonized的锁,也就是其他线程不能访问所有加了synchonized前缀的方法(不仅仅只是当前运行的方法),影响了其他线程多其他同步方法的访问,降低了效率。而后者只对当前代码块加锁,其他的同步方法不受影响。

代码使用案例

第一个案例代码

  • 简单使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    Thread01 t01 = new Thread01();
    System.out.println("synchronized 关键字使用 \n" +"--------------------------");
    Thread ta = new Thread(t01,"A");
    Thread tb = new Thread(t01,"B");
    ta.start();
    tb.start();

    private class Thread01 implements Runnable{
    @Override
    public void run() {
    synchronized (this) {
    for(int i=0;i<3;i++){
    System.out.println(Thread.currentThread().getName()+" synchronized loop "+i);
    }
    }
    }
    }

    执行结果:
    synchronized 关键字使用
    --------------------------
    B synchronized loop 0
    B synchronized loop 1
    B synchronized loop 2
    A synchronized loop 0
    A synchronized loop 1
    A synchronized loop 2

第二个案例代码

  • 同步代码块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    System.out.println("synchronized 关键字使用 \n" +"--------------------------");
    Thread t02A = new Thread(new Runnable() {
    @Override
    public void run() {
    method01();
    }
    },"A");
    Thread t02B = new Thread(new Runnable() {

    @Override
    public void run() {
    method02();
    }
    },"B");
    Thread t02C = new Thread(new Runnable() {
    @Override
    public void run() {
    method3();
    }
    },"C");
    t02A.start();
    t02B.start();
    t02C.start();


    public void method01(){
    synchronized (this) {
    int i=0;
    while(i++ < 3){
    System.out.println(Thread.currentThread().getName() +":"+ i);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    public void method02(){
    //第2种方式:当一个线程访问object的一个synchronized(this)同步代码块时,
    //其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。
    synchronized (this) {
    int j=0;
    while(j++ < 3){
    System.out.println(Thread.currentThread().getName() +":"+ j);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    /*
    * 当一个线程访问object的一个synchronized(this)同步代码块时,
    * 它就获得了这个object的对象锁。
    * 结果,其它线程对该object对象所有同步代码部分的访问都被暂时阻塞。
    */
    public synchronized void method3(){
    int k=0;
    while(k++ < 3){
    System.out.println(Thread.currentThread().getName() +":"+ k);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    执行结果:A和B不能同时运行,B和A不能和C同时运行
    synchronized 关键字使用
    --------------------------
    B:1
    B:2
    B:3
    C:1
    C:2
    C:3
    A:1
    A:2
    A:3

第三个案例代码【 synchronized对象锁 】

  • synchronized对象锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    final InnerObject innerObj = new InnerObject();
    System.out.println("synchronized 关键字使用 \n" +"--------------------------");
    Thread t03A = new Thread(new Runnable() {
    @Override
    public void run() {
    outerMethod01(innerObj);
    }
    },"A");
    Thread t03B = new Thread(new Runnable() {
    @Override
    public void run() {
    outerMethod02(innerObj);
    }
    },"B");
    t03A.start();
    t03B.start();

    class InnerObject{
    private void innerMethod01(){
    int i=0;
    while(i++ < 3){
    System.out.println(Thread.currentThread().getName() +":"+ i);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    private void innerMethod02(){
    int j=0;
    while(j++ < 3){
    System.out.println(Thread.currentThread().getName() +":"+ j);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    /**
    * 外部类方法1
    */
    private void outerMethod01(InnerObject innerObj){
    synchronized (innerObj) {
    innerObj.innerMethod01();
    }
    }

    /**
    * 外部类方法2
    */
    private void outerMethod02(InnerObject innerObj){
    innerObj.innerMethod02();
    }

    执行结果:
    synchronized 关键字使用
    --------------------------
    A:1
    B:1
    B:2
    A:2
    B:3
    A:3

结论

  1. synchronized 方法控制对类成员变量的访问:每个类实例对应一把锁,每个 synchronized 方法都必须获得调用该方法的类实例的锁方能执行,否则所属线程阻塞,方法一旦执行,就独占该锁,直到从该方法返回时才将锁释放,此后被阻塞的线程方能获得该锁,重新进入可执行状态。

  2. 这种机制确保了同一时刻对于每一个类实例,其所有声明为 synchronized 的成员函数中至多只有一个处于可执行状态(因为至多只有一个能够获得该类实例对应的锁),从而有效避免了类成员变量的访问冲突(只要所有可能访问类成员变量的方法均被声明为 synchronized)。

  3. synchronized 块是这样一个代码块,其中的代码必须获得对象 syncObject (如前所述,可以是类实例或类)的锁方能执行。由于可以针对任意代码块,且可任意指定上锁的对象,故灵活性较高。

Synchronized问题

作用于方法和静态方法区别

  • 测试代码如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    private void test() {
    final TestSynchronized test1 = new TestSynchronized();
    final TestSynchronized test2 = new TestSynchronized();
    Thread t1 = new Thread(new Runnable() {

    @Override
    public void run() {
    test1.method01("a");
    //test1.method02("a");
    }
    });
    Thread t2 = new Thread(new Runnable() {

    @Override
    public void run() {
    test2.method01("b");
    //test2.method02("a");
    }
    });
    t1.start();
    t2.start();
    }

    private static class TestSynchronized{
    private int num1;
    public synchronized void method01(String arg) {
    try {
    if("a".equals(arg)){
    num1 = 100;
    System.out.println("tag a set number over");
    Thread.sleep(1000);
    }else{
    num1 = 200;
    System.out.println("tag b set number over");
    }
    System.out.println("tag = "+ arg + ";num ="+ num1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    private static int num2;
    public static synchronized void method02(String arg) {
    try {
    if("a".equals(arg)){
    num2 = 100;
    System.out.println("tag a set number over");
    Thread.sleep(1000);
    }else{
    num2 = 200;
    System.out.println("tag b set number over");
    }
    System.out.println("tag = "+ arg + ";num ="+ num2);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    //调用method01方法打印日志【普通方法】:test1,test2是二个不同的对象,获取的是对象的锁,二个线程能够同时访问method01
    tag a set number over
    tag b set number over
    tag = b;num =200
    tag = a;num =100


    //调用method02方法打印日志【static静态方法】获取的是属于类的锁,同时只能有一个线程能够访问
    tag a set number over
    tag = a;num =100
    tag b set number over
    tag = b;num =200
  • 在static方法前加synchronized:静态方法属于类方法,它属于这个类,获取到的锁,是属于类的锁。

  • 在普通方法前加synchronized:非static方法获取到的锁,是属于当前对象的锁。

  • 结论:类锁和对象锁不同,synchronized修饰不加static的方法,锁是加在单个对象上,不同的对象没有竞争关系;修饰加了static的方法,锁是加载类上,这个类所有的对象竞争一把锁。

Synchronize在编译时如何实现锁机制

  • Synchronized进过编译,会在同步块的前后分别形成monitorenter和monitorexit这个两个字节码指令。在执行monitorenter指令时,首先要尝试获取对象锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象锁,把锁的计算器加1,相应的,在执行monitorexit指令时会将锁计算器就减1,当计算器为0时,锁就被释放了。如果获取对象锁失败,那当前线程就要阻塞,直到对象锁被另一个线程释放为止。

反编译Synchronize代码

  • synchronized,是Java中用于解决并发情况下数据同步访问的一个很重要的关键字。当我们想要保证一个共享资源在同一时间只会被一个线程访问到时,我们可以在代码中使用synchronized关键字对类或者对象加锁。

  • 对上面的代码进行反编译,可以得到如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public synchronized void doSth();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
    stack=2, locals=1, args_size=1
    0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
    3: ldc #3 // String Hello World
    5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
    8: return

    public void doSth1();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
    stack=2, locals=3, args_size=1
    0: ldc #5 // class com/hollis/SynchronizedTest
    2: dup
    3: astore_1
    4: monitorenter
    5: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
    8: ldc #3 // String Hello World
    10: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
    13: aload_1
    14: monitorexit
    15: goto 23
    18: astore_2
    19: aload_1
    20: monitorexit
    21: aload_2
    22: athrow
    23: return
  • 通过反编译后代码可以看出:对于同步方法,JVM采用ACC_SYNCHRONIZED标记符来实现同步。 对于同步代码块。

  • JVM采用monitorentermonitorexit两个指令来实现同步。

同步方法和代码块实现原理

  • 方法级的同步是隐式的。同步方法的常量池中会有一个ACC_SYNCHRONIZED标志。
    • 当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如果有设置,则需要先获得监视器锁,然后开始执行方法,方法执行之后再释放监视器锁。这时如果其他线程来请求执行方法,会因为无法获得监视器锁而被阻断住。
    • 值得注意的是,如果在方法执行过程中,发生了异常,并且方法内部并没有处理该异常,那么在异常被抛到方法外面之前监视器锁会被自动释放。
  • 同步代码块使用monitorentermonitorexit两个指令实现。
    • 可以把执行monitorenter指令理解为加锁,执行monitorexit理解为释放锁。
    • 每个对象维护着一个记录着被锁次数的计数器。未被锁定的对象的该计数器为0,当一个线程获得锁(执行monitorenter)后,该计数器自增变为1,当同一个线程再次获得该对象的锁的时候,计数器再次自增。当同一个线程释放锁(执行monitorexit指令)的时候,计数器再自减。当计数器为0的时候。锁将被释放,其他线程便可以获得锁。
  • 无论是ACC_SYNCHRONIZED还是monitorentermonitorexit都是基于Monitor实现的,在Java虚拟机(HotSpot)中,Monitor是基于C++实现的,由ObjectMonitor实现。
    • ObjectMonitor类中提供了几个方法,如enterexitwaitnotifynotifyAll等。sychronized加锁的时候,会调用objectMonitor的enter方法,解锁的时候会调用exit方法。

synchronized与原子性

  • 原子性是指一个操作是不可中断的,要全部执行完成,要不就都不执行。
    • 线程是CPU调度的基本单位。
      • CPU有时间片的概念,会根据不同的调度算法进行线程调度。当一个线程获得时间片之后开始执行,在时间片耗尽之后,就会失去CPU使用权。所以在多线程场景下,由于时间片在线程间轮换,就会发生原子性问题。
  • 在Java中,为了保证原子性,提供了两个高级的字节码指令monitorentermonitorexit
    • 前面中,介绍过,这两个字节码指令,在Java中对应的关键字就是synchronized
    • 通过monitorentermonitorexit指令,可以保证被synchronized修饰的代码在同一时间只能被一个线程访问,在锁未释放之前,无法被其他线程访问到。因此,在Java中可以使用synchronized来保证方法和代码块内的操作是原子性的。
  • 多线程保证了原子性
    • 线程1在执行monitorenter指令的时候,会对Monitor进行加锁,加锁后其他线程无法获得锁,除非线程1主动解锁。即使在执行过程中,由于某种原因,比如CPU时间片用完,线程1放弃了CPU,但是,他并没有进行解锁。而由于synchronized的锁是可重入的,下一个时间片还是只能被他自己获取到,还是会继续执行代码。直到所有代码执行完。这就保证了原子性。

synchronized与可见性

  • 可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
  • Java内存模型规定了所有的变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存中保存了该线程中是用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量的传递均需要自己的工作内存和主存之间进行数据同步进行。所以,就可能出现线程1改了某个变量的值,但是线程2不可见的情况。
  • 前面我们介绍过,被synchronized修饰的代码,在开始执行时会加锁,执行完成后会进行解锁。而为了保证可见性,有一条规则是这样的:对一个变量解锁之前,必须先把此变量同步回主存中。这样解锁后,后续线程就可以访问到被修改后的值。
  • 所以,synchronized关键字锁住的对象,其值是具有可见性的。

synchronized与有序性

  • 有序性即程序执行的顺序按照代码的先后顺序执行。
    • 除了引入了时间片以外,由于处理器优化和指令重排等,CPU还可能对输入代码进行乱序执行,比如load->add->save 有可能被优化成load->save->add 。这就是可能存在有序性问题。
    • 这里需要注意的是,synchronized是无法禁止指令重排和处理器优化的。也就是说,synchronized无法避免上述提到的问题。
  • 那么,为什么还说synchronized也提供了有序性保证呢?
    • 这就要再把有序性的概念扩展一下了。Java程序中天然的有序性可以总结为一句话:如果在本线程内观察,所有操作都是天然有序的。如果在一个线程中观察另一个线程,所有操作都是无序的。
  • 以上这句话也是《深入理解Java虚拟机》中的原句

synchronized与锁优化

  • 前面介绍了synchronized的用法、原理以及对并发编程的作用。是一个很好用的关键字。
  • synchronized其实是借助Monitor实现的,在加锁时会调用objectMonitor的enter方法,解锁的时候会调用exit方法。事实上,只有在JDK1.6之前,synchronized的实现才会直接调用ObjectMonitor的enterexit,这种锁被称之为重量级锁。
  • 所以,在JDK1.6中出现对锁进行了很多的优化,进而出现轻量级锁,偏向锁,锁消除,适应性自旋锁,锁粗化(自旋锁在1.4就有,只不过默认的是关闭的,jdk1.6是默认开启的),这些操作都是为了在线程之间更高效的共享数据 ,解决竞争问题。

AQS(Abstract Queued Synchronizer)

抽象队列同步器。并发包的锁都是基于AQS来实现的,一般我们开发是不直接接触的,它是并发的基础,java并发包底层的API。AQS的过程简述一下,当第一个线程获取锁时,将state状态+1,变成了1。此时当前加锁线程为线程1.然后线程2来获取锁,发现state不等于0,也就是有人占有了锁,此时线程2就到一个队列中排队。这时候线程3,线程N会依次来排队挂起。线程1处理任务完毕,将会唤醒队列中的线程,然后线程就去争取锁,获取到锁的线程就会出队列,重新改变state的值,将当前线程变为自己的。

  • ReentrantLock里面有内部类:NonfairSync、FairSync、Sync,ReentrantLock里面默认构造了NonfairSync()对象,ReentrantLock应该是一个外壳,真正执行功能的对象应该就是NonfairSync了。

  • abstract static class Sync extends AbstractQueuedSynchronizer {}

  • AQS的内部队列应该就是一个双向链表实现的,获取到锁之后就更新state的状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #这是一个指针的头部
    private transient volatile Node head;
    #指针的尾部
    private transient volatile Node tail;
    #状态变量state
    private volatile int state;
    #这个应该似曾相识了,之前Atomic原子系列的CAS,就是基于unsafe来实现的。这里大概也能猜测到了,ReentractLock底层是基于CAS无锁化来实现的了
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    #下面是一些变量的内存指针。
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

State 状态

1
2
3
4
5
private volatile int state;
volatile 保证了 state 的可见性,AQS 同时提供了以下三个原子操作,以保证 state 变量的原子性。
final int getState();
final void setState(int newState);
final boolean compareAndSetState(int expect, int update) ;

等待队列

  • AQS 的等待队列基于一个双向链表实现的,HEAD 节点不关联线程,是一个严格的 FIFO 队列。等待队列中的每个线程都被包装成一个内部类 Node,Node 为一个链表结构, 主要包含 thread + waitStatus + pre + next 四个属性。

  • AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  • 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    static final class Node {
    // 这个值 大于0 代表此线程取消了等待,
    // 也就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;
    //
    Node nextWaiter;
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;
    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /*
    CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
    SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
    CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    0状态:值为0,代表初始化状态
    */
    /** waitStatus value to indicate thread has cancelled */
    // 代表此线程取消了争抢这个锁
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3;
    }

ReentrantLock

  • ReentrantLock是java.util.concurrent包下提供的一套互斥锁,相比Synchronized,ReentrantLock类提供了一些高级功能,主要有以下3项:

  • 1.等待可中断,持有锁的线程长期不释放的时候,正在等待的线程可以选择放弃等待,这相当于Synchronized来说可以避免出现死锁的情况。

  • 2.公平锁,多个线程等待同一个锁时,必须按照申请锁的时间顺序获得锁,Synchronized锁非公平锁,ReentrantLock默认的构造函数是创建的非公平锁,可以通过参数true设为公平锁,但公平锁表现的性能不是很好。

  • 3.锁绑定多个条件,一个ReentrantLock对象可以同时绑定对个对象。

方法解析

  • lock()方法,就是获取锁的操作,当线程1来的时候,没有人持有锁,所以线程1获取锁成功,修改state状态,设置当前线程为线程1;这时候线程2,线程3依次过来那么线程2,线程3会形成一个双向链表(队列),并且会同步挂起。当线程1执行了unlock()方法,其实就是修改state状态,将当前线程置为NULL,然后最重要的一步是唤醒了队列头的线程。这就是为啥能满足先进先出的原因了。此时线程2被唤醒以后,修改state状态,将当前线程=线程2,然后将链表head的线程与整个链表断开,然后把链表中的线程2设置为NULL的Node。整个锁的操作设计的很巧妙,通过一个state状态来控制。

lock()方法

  • 调用lock方法之后,底层调用的是Sync的lock方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //Lock的lock方法
    public void lock() {
    sync.lock();
    }
    //调用的是Sync的lock
    //这是默认的子类NonfairSync的lock()方法的实现。
    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }
  • 首先采用CAS操作判断state字段,如果state=0,就设置为1:表示某个线程第一次获取锁,将state状态从0到1,并将当前占用线程设置为自己:

    1
    2
    3
    4
    protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  • 如果不是第一个,调用acquire(获得)方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    //1. tryAcquire 调用 nonfairTryAcquire
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

    //1.1 nonfairTryAcquire
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 首先获取state的状态
    int c = getState();
    // 如果等于0,说明没有线程获取到锁,
    if (c == 0) {
    // 此时该线程获取到锁,并将自己设为当前线程
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 否则说明有人占有锁,那么再判断下占有锁的线程是否是自己,是的话将state+1.
    // 这是什么意思呢??就是同一个线程可以获取到多次锁,也就是可重入锁。
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    //2. 如果获取锁失败,那么第一个条件就是true了。进入第二个条件:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //addWaiter方法源码:基于链表来操作
    //2.1 acquireQueued加入到队列

    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    //此时线程2会走到这里.
    if (shouldParkAfterFailedAcquire(p, node) &&
    // 让线程2挂起的源码
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    // 挂起线程
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }

unlock方法

  • 当线程执行unlock()方法之后,state就一直减1,直到0为止,然后将当前加锁线程设为NULL。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public void unlock() {
    sync.release(1);
    }

    public final boolean release(int arg) {
    //AQS也是没有实现的,在ReentrantLock里面有具体的实现。
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    //从队列中唤醒
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    // tryRelease的实现:
    protected final boolean tryRelease(int releases) {
    //首先将state的变量值-1
    int c = getState() - releases;
    //不是占有的线程去释放,就报异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }
  • 从队列中唤醒

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    //按照上文的分析,这个s其实就是线程2
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    //当线程2唤醒以后,就从上文挂起的地方活过来(lock的acquireQueued),又开始争抢锁的操作
    LockSupport.unpark(s.thread);
    }
  • 然后将链表的head下移一位,链表的线程2设置为NULL。这样就相当于线程2从队列中弹出了。

公平锁和非公平锁

  • 默认情况下是使用非公平锁来实现的,而且非公平锁的性能高。传入true后创建公平锁。

  • 再次看lock()方法和unlock()方法了。因为前面详细分析过,这部分我们主要查看有哪些不同。

    方法实现的lock()方法中,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //FairSync
    final void lock() {
    acquire(1);
    }

    //非公平锁的实现:
    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

    //非公平锁上来就是直接判断state是否等于0,等于0就尝试获取锁。
    //意味着就算没有进入队列的线程,也可以直接参与争抢锁。
    //类比很多人在排队买东西,有个人直接过来就买了,不排队了,,不公平!!
  • acquire方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }


    // 公平FairSync: tryAcquire
    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    //如果state=0,也就是没有人占有锁的情况下,
    //会先去查看线程是否在队列中,这样就能彻底实现公平了。
    if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

使用

  • ReentrantLock是java.util.concurrent(并发)包下提供的一套互斥锁,相比Synchronized,ReentrantLock类提供了一些高级功能,主要有以下3项:

    • 等待可中断,持有锁的线程长期不释放的时候,正在等待的线程可以选择放弃等待,这相当于Synchronized来说可以避免出现死锁的情况。
    • 公平锁,多个线程等待同一个锁时,必须按照申请锁的时间顺序获得锁,Synchronized锁非公平锁,ReentrantLock默认的构造函数是创建的非公平锁,可以通过参数true设为公平锁,但公平锁表现的性能不是很好。
    • 锁绑定多个条件,一个ReentrantLock对象可以同时绑定对个对象。
  • 使用方法代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private ReentrantLock lock = new ReentrantLock();
    public void run() {
    lock.lock();
    try{
    for(int i=0;i<5;i++){
    System.out.println(Thread.currentThread().getName()+":"+i);
    }
    }finally{
    lock.unlock();
    }
    }
  • 注意问题:为保证锁释放,每一个 lock() 动作,建议都立即对应一都立即对应一个 try-catch-finally

代码案例分析

  • 代码如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private void test2() {
    Runnable t1 = new MyThread();
    new Thread(t1,"t1").start();
    new Thread(t1,"t2").start();
    }

    class MyThread implements Runnable {
    private ReentrantLock lock = new ReentrantLock();
    public void run() {
    lock.lock();
    try{
    for(int i=0;i<5;i++){
    System.out.println(Thread.currentThread().getName()+":"+i);
    }
    }finally{
    lock.unlock();
    }
    }
    }

    //打印值如下所示: 一次只有一个线程进入
    t1:0
    t1:1
    t1:2
    t1:3
    t1:4
    t2:0
    t2:1
    t2:2
    t2:3
    t2:4

什么时候选择用ReentrantLock

  • 适用场景:时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票

  • 在确实需要一些 synchronized所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票。 ReentrantLock 还具有可伸缩性的好处,应当在高度争用的情况下使用它,但是请记住,大多数 synchronized 块几乎从来没有出现过争用,所以可以把高度争用放在一边。我建议用 synchronized 开发,直到确实证明 synchronized 不合适,而不要仅仅是假设如果使用 ReentrantLock “性能会更好”。请记住,这些是供高级用户使用的高级工具。(而且,真正的高级用户喜欢选择能够找到的最简单工具,直到他们认为简单的工具不适用为止。)。一如既往,首先要把事情做好,然后再考虑是不是有必要做得更快。

  • 使用场景代码展示【摘自ThreadPoolExecutor类,这个类中很多地方用到了这个锁。自己可以查看】:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /**
    * Rolls back the worker thread creation.
    * - removes worker from workers, if present
    * - decrements worker count
    * - rechecks for termination, in case the existence of this
    * worker was holding up termination
    */
    private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    if (w != null)
    workers.remove(w);
    decrementWorkerCount();
    tryTerminate();
    } finally {
    mainLock.unlock();
    }
    }

公平锁和非公平锁有何区别

  • 公平性是指在竞争场景中,当公平性为真时,会倾向于将锁赋予等待时间最久的线程。公平性是减少线程“饥饿”(个别线程长期等待锁,但始终无法获取)情况发生的一个办法。

    • 1、公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。
    • 2、非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。
    • 看下面代码案例所示:可以得出结论,公平锁指的是哪个线程先运行,那就可以先得到锁。非公平锁是不管线程是否是先运行,新的线程都有可能抢占已经在排队的线程的锁。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    private void test3() {
    Service service = new Service();
    ThreadClass tcArray[] = new ThreadClass[10];
    for(int i=0;i<10;i++){
    tcArray[i] = new ThreadClass(service);
    tcArray[i].start();
    }
    }

    public class Service {
    ReentrantLock lock = new ReentrantLock(true);
    Service() {
    }

    void getThreadName() {
    System.out.println(Thread.currentThread().getName() + " 已经被锁定");
    }
    }
    public class ThreadClass extends Thread{
    private Service service;
    ThreadClass(Service service) {
    this.service = service;
    }

    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + " 抢到了锁");
    service.lock.lock();
    try {
    service.getThreadName();
    }finally {
    service.lock.unlock();
    }
    }
    }
    //当ReentrantLock设置true,也就是公平锁时
    Thread-0 抢到了锁
    Thread-1 抢到了锁
    Thread-1 已经被锁定
    Thread-0 已经被锁定
    Thread-2 抢到了锁
    Thread-2 已经被锁定
    Thread-3 抢到了锁
    Thread-3 已经被锁定
    Thread-4 抢到了锁
    Thread-4 已经被锁定
    Thread-5 抢到了锁
    Thread-5 已经被锁定
    Thread-6 抢到了锁
    Thread-6 已经被锁定
    Thread-7 抢到了锁
    Thread-7 已经被锁定
    Thread-8 抢到了锁
    Thread-8 已经被锁定
    Thread-9 抢到了锁
    Thread-9 已经被锁定

    //当ReentrantLock设置false,也就是非公平锁时
    Thread-0 抢到了锁
    Thread-2 抢到了锁
    Thread-1 抢到了锁
    Thread-2 已经被锁定
    Thread-0 已经被锁定
    Thread-3 抢到了锁
    Thread-3 已经被锁定
    Thread-1 已经被锁定
    Thread-4 抢到了锁
    Thread-4 已经被锁定
    Thread-5 抢到了锁
    Thread-7 抢到了锁
    Thread-5 已经被锁定
    Thread-7 已经被锁定
    Thread-8 抢到了锁
    Thread-8 已经被锁定
    Thread-9 抢到了锁
    Thread-9 已经被锁定
    Thread-6 抢到了锁
    Thread-6 已经被锁定

ReentrantLock问题

  • ReentrantLock是Lock的实现类,是一个互斥的同步器,在多线程高竞争条件下,ReentrantLock比synchronized有更加优异的性能表现。
  • 1 用法比较
    • Lock使用起来比较灵活,但是必须有释放锁的配合动作
    • Lock必须手动获取与释放锁,而synchronized不需要手动释放和开启锁
    • Lock只适用于代码块锁,而synchronized可用于修饰方法、代码块等
  • 2 特性比较
    • ReentrantLock的优势体现在:
      • 具备尝试非阻塞地获取锁的特性:当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
      • 能被中断地获取锁的特性:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
      • 超时获取锁的特性:在指定的时间范围内获取锁;如果截止时间到了仍然无法获取锁,则返回
  • 3 注意事项
    • 在使用ReentrantLock类的时,一定要注意三点:
      • 在finally中释放锁,目的是保证在获取锁之后,最终能够被释放
      • 不要将获取锁的过程写在try块内,因为如果在获取锁时发生了异常,异常抛出的同时,也会导致锁无故被释放。
      • ReentrantLock提供了一个newCondition的方法,以便用户在同一锁的情况下可以根据不同的情况执行等待或唤醒的动作。

Synchronize和ReentrantLock区别

相似点:

  • 这两种同步方式有很多相似之处,它们都是加锁方式同步,而且都是阻塞式的同步,也就是说当如果一个线程获得了对象锁,进入了同步块,其他访问该同步块的线程都必须阻塞在同步块外面等待,而进行线程阻塞和唤醒的代价是比较高的(操作系统需要在用户态与内核态之间来回切换,代价很高,不过可以通过对锁优化进行改善)。

区别:

API层面

  • 这两种方式最大区别就是对于Synchronized来说,它是java语言的关键字,是原生语法层面的互斥,需要jvm实现。而ReentrantLock它是JDK 1.5之后提供的API层面的互斥锁,需要lock()和unlock()方法配合try/finally语句块来完成。

  • synchronized既可以修饰方法,也可以修饰代码块。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //synchronized修饰一个方法时,这个方法叫同步方法。
    public synchronized void test() {
    //方法体``

    }

    synchronized(Object) {
    //括号中表示需要锁的对象.
    //线程执行的时候会对Object上锁
    }
  • ReentrantLock使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private ReentrantLock lock = new ReentrantLock();
    public void run() {
    lock.lock();
    try{
    for(int i=0;i<5;i++){
    System.out.println(Thread.currentThread().getName()+":"+i);
    }
    }finally{
    lock.unlock();
    }
    }

等待可中断

  • 等待可中断是指当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。可等待特性对处理执行时间非常长的同步快很有帮助。
  • 具体来说,假如业务代码中有两个线程,Thread1 Thread2。假设 Thread1 获取了对象object的锁,Thread2将等待Thread1释放object的锁。
    • 使用synchronized。如果Thread1不释放,Thread2将一直等待,不能被中断。synchronized也可以说是Java提供的原子性内置锁机制。内部锁扮演了互斥锁(mutual exclusion lock ,mutex)的角色,一个线程引用锁的时候,别的线程阻塞等待。
    • 使用ReentrantLock。如果Thread1不释放,Thread2等待了很长时间以后,可以中断等待,转而去做别的事情。

公平锁

  • 公平锁是指多个线程在等待同一个锁时,必须按照申请的时间顺序来依次获得锁;而非公平锁则不能保证这一点。非公平锁在锁被释放时,任何一个等待锁的线程都有机会获得锁。

  • synchronized的锁是非公平锁,ReentrantLock默认情况下也是非公平锁,但可以通过带布尔值的构造函数要求使用公平锁。

    • ReentrantLock 构造器的一个参数是boolean值,它允许您选择想要一个公平(fair)锁,还是一个不公平(unfair)锁。公平锁:使线程按照请求锁的顺序依次获得锁, 但是有成本;不公平锁:则允许讨价还价
    • 那么如何用代码设置公平锁呢?通过构造方法传入参数true.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * Creates an instance of {@code ReentrantLock}.
    * This is equivalent to using {@code ReentrantLock(false)}.
    */
    public ReentrantLock() {
    sync = new NonfairSync();
    }

    /**
    * Creates an instance of {@code ReentrantLock} with the
    * given fairness policy. 给定公平政策。
    *
    * @param fair {@code true} if this lock should use a fair ordering policy
    */
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }

锁定的对象或者方法区别

  • ReentrantLock可以同时绑定多个Condition对象,只需多次调用newCondition方法即可。
  • synchronized中,锁对象的wait()和notify()或notifyAll()方法可以实现一个隐含的条件。但如果要和多于一个的条件关联的时候,就不得不额外添加一个锁。

ReentrantLock和synchronized使用分析

  • ReentrantLock是Lock的实现类,是一个互斥的同步器,在多线程高竞争条件下,ReentrantLock比synchronized有更加优异的性能表现。
  • 1 用法比较
    • Lock使用起来比较灵活,但是必须有释放锁的配合动作
    • Lock必须手动获取与释放锁,而synchronized不需要手动释放和开启锁
    • Lock只适用于代码块锁,而synchronized可用于修饰方法、代码块等
  • 2 特性比较
    • ReentrantLock的优势体现在:
      • 具备尝试非阻塞地获取锁的特性:当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
      • 能被中断地获取锁的特性:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
      • 超时获取锁的特性:在指定的时间范围内获取锁;如果截止时间到了仍然无法获取锁,则返回
  • 3 注意事项
    • 在使用ReentrantLock类的时,一定要注意三点:
      • 在finally中释放锁,目的是保证在获取锁之后,最终能够被释放
      • 不要将获取锁的过程写在try块内,因为如果在获取锁时发生了异常,异常抛出的同时,也会导致锁无故被释放。
      • ReentrantLock提供了一个newCondition的方法,以便用户在同一锁的情况下可以根据不同的情况执行等待或唤醒的动作。

读写锁

  • 它是将读写进行了分离,读线程可以同时多个并行,写请求只有一个线程,读写是互斥的。

例子

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class ReentractReadWriteLockDemo {
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    //获取写锁
    static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    //获取读锁
    static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
    writeLock.lock();
    writeLock.unlock();
    readLock.lock();
    readLock.unlock();

    }
    }

ReentrantReadWriteLock

  • 构造函数默认非公平

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public ReentrantReadWriteLock() {
    this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
    //默认的是非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
    }

    //通过这里可以看出来,所谓的readLock或者是WriteLock底层的实现其实就是sync,ReentrantReadWriteLock就是一个包装的外壳。ReadLock和WriteLock都是ReentrantReadWriteLock都是其内部类。
    protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
    }

写锁获取

  • WriteLock写锁的lock()方法剖析:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public void lock() {
    sync.acquire(1);
    }
    // 这个方法就是AQS类的方法,tryAcquire()方法在AQS没有实现,所以我们应该看看你它的具体实现。
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  • tryAcquire()方法,这个方法其实就是加锁的操作,具体看看它的实现:ReentrantReadWriteLock->Sync。state & 0x0000FFFF(相同为1),为读锁结果为0,写锁结果大于0.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    static final int SHARED_SHIFT   = 16;
    //1左移16位减1=>0000_0000_0000_0000_1111_1111_1111_1111
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //返回读状态的值
    protected final boolean tryAcquire(int acquires) {
    /*
    * Walkthrough:
    * 1. 如果读状态不为0或者写状态不为0并且写线程不是自己,返回false
    * 2. 如果已经超过了可重入的计数值MAX_COUNT,就会返回false
    * 3. 如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁定;同时更新所有者和写锁状态值
    */
    Thread current = Thread.currentThread(); //获取当前线程
    int c = getState(); //获取当前写锁状态值
    int w = exclusiveCount(c); //获取写状态的值
    //当同步状态state值不等于0的时候,如果写状态(state & 0x0000FFFF)等于0的话,读状态是大于0的,表示读锁被获取
    if (c != 0) {
    //说明是加了读锁,但是不是该线程。
    if (w == 0 || current != getExclusiveOwnerThread())
    return false;
    //写锁 之前的锁 +1
    if (w + exclusiveCount(acquires) > MAX_COUNT) //如果已经超过了可重入的计数值MAX_COUNT,就会返回false
    throw new Error("Maximum lock count exceeded");
    // 重入锁:更新状态值
    setState(c + acquires);
    return true;
    }
    //前一个方法表示是公平锁实现还是非公平锁实现的,公平锁还要再判断是否在队列里,非公平锁则不需要。
    if (writerShouldBlock() ||
    !compareAndSetState(c, c + acquires))
    return false;
    setExclusiveOwnerThread(current);
    return true;
    }
  • 线程1第一次来获取锁,线程1重入加锁,线程2来获取锁,但是没有成功,入队列:

写锁释放

  • 写锁的释放

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // WriteLock
    public void unlock() {
    sync.release(1);
    }

    // AQS
    public final boolean release(int arg) {
    // 1. 释放锁
    if (tryRelease(arg)) {
    // 2. 如果独占锁释放"完全",唤醒后继节点
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

    // Sync
    // 释放锁,是线程安全的,因为写锁是独占锁,具有排他性
    // 实现很简单,state 减 1 就是了
    protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
    setExclusiveOwnerThread(null);
    setState(nextc);
    // 如果 exclusiveCount(nextc) == 0,也就是说包括重入的,所有的写锁都释放了,
    // 那么返回 true,这样会进行唤醒后继节点的操作。
    return free;
    }

读锁获取

  • ReadLocklock代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public void lock() {
    sync.acquireShared(1);
    }


    public final void acquireShared(int arg) {
    //可以发现AQS也没有实现tryAcquireShared()方法,那么看子类是如何实现的,也是重点剖析的
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }

  • tryAcquireShared()方法的具体实现:AbstractQueuedSynchronizer-> Sync

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //返回读状态的值
    protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //判断别的线程是否占有了写锁(不等于 0,说明有线程持有写锁,)而且不是当前线程持有写锁,那么当前线程获取读锁失败
    if (exclusiveCount(c) != 0 &&
    getExclusiveOwnerThread() != current)
    return -1;
    //读锁的获取次数,获取高16位,判断是否加了读锁
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
    r < MAX_COUNT &&
    // 下面这行 CAS 是将 state 属性的高 16 位加 1,低 16 位不变,如果成功就代表获取到了读锁
    compareAndSetState(c, c + SHARED_UNIT)) {
    // r == 0 说明此线程是第一个获取读锁的,或者说在它前面获取读锁的都走光光了,它也算是第一个吧
    if (r == 0) {
    // 记录 firstReader 为当前线程,及其持有的读锁数量:1
    firstReader = current;
    firstReaderHoldCount = 1;
    } else if (firstReader == current) {
    // 进来这里,说明是 firstReader 重入获取读锁(这非常简单,count 加 1 结束)
    firstReaderHoldCount++;
    } else {
    // cachedHoldCounter用于缓存最后一个获取读锁的线程
    // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != getThreadId(current))
    cachedHoldCounter = rh = readHolds.get();
    else if (rh.count == 0)
    // cachedHoldCounter 缓存的是当前线程,但是 count 为 0,
    readHolds.set(rh);
    rh.count++;
    }
    // return 大于 0 的数,代表获取到了共享锁
    return 1;
    }
    return fullTryAcquireShared(current);
    }
  • 假设这时候线程3来加读锁,如果有写锁了,获取失败,进入方法:doAcquireShared

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  • 这时候的图形是:

读锁释放

  • 读锁释放的过程还是比较简单的,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。

  • 然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    // ReadLock
    public void unlock() {
    sync.releaseShared(1);
    }
    // Sync
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared(); // 这句代码其实唤醒 获取写锁的线程,往下看就知道了
    return true;
    }
    return false;
    }

    // Sync
    protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
    if (firstReaderHoldCount == 1)
    // 如果等于 1,那么这次解锁后就不再持有锁了,把 firstReader 置为 null,给后来的线程用
    // 为什么不顺便设置 firstReaderHoldCount = 0?因为没必要,其他线程使用的时候自己会设值
    firstReader = null;
    else
    firstReaderHoldCount--;
    } else {
    // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != getThreadId(current))
    rh = readHolds.get();

    int count = rh.count;
    if (count <= 1) {

    // 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
    readHolds.remove();

    if (count <= 0)
    // 就是那种,lock() 一次,unlock() 好几次的逗比
    throw unmatchedUnlockException();
    }
    // count 减 1
    --rh.count;
    }

    for (;;) {
    int c = getState();
    // nextc 是 state 高 16 位减 1 后的值
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
    // 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
    // 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
    return nextc == 0;
    }
    }
  • // ReadLock
    public void unlock() {
        sync.releaseShared(1);
    }
    // Sync
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); // 这句代码其实唤醒 获取写锁的线程,往下看就知道了
            return true;
        }
        return false;
    }
    
    // Sync
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                // 如果等于 1,那么这次解锁后就不再持有锁了,把 firstReader 置为 null,给后来的线程用
                // 为什么不顺便设置 firstReaderHoldCount = 0?因为没必要,其他线程使用的时候自己会设值
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
    
            int count = rh.count;
            if (count <= 1) {
    
                // 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
                readHolds.remove();
    
                if (count <= 0)
                    // 就是那种,lock() 一次,unlock() 好几次的逗比
                    throw unmatchedUnlockException();
            }
            // count 减 1
            --rh.count;
        }
    
        for (;;) {
            int c = getState();
            // nextc 是 state 高 16 位减 1 后的值
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
                // 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
                return nextc == 0;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50

    ### 锁降级

    锁降级指的是写锁降级成为读锁,指持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。

    ## Condition

    - Condition 接口提供了类似 Object 的监视器方法(主要包括 wait()、wait(long timeout)、notify() 以及 notifyAll() 方法),与 Lock 配合可以实现**等待/通知模式**。

    - AQS 内部提供了 Condition 的实现类,每个Condition对象都包含着一个队列(以下称为等待队列),该队列其实就是 AQS 的 FIFO 队列,是 Condition 对象实现等待/通知功能的关键。

    ![](22-Java的锁/a02c702d8b5d950623f336b3920004d4.png)

    - 在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,对应关系如下:

    ![](22-Java的锁/5ec492a00387c265072c88faa3d66a64.png)

    ### 等待

    - 调用 Condition 的 await() 方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await() 方法返回时,当前线程一定获取了Condition 相关联的锁。**当当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理**。

    - 如果从队列(同步队列和等待队列)的角度看 await() 方法,当调用 await() 方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中。

    ![](22-Java的锁/942d5eb53cae716477be20b5ba317e98.png)

    - 代码如下:

    ```java
    public final void await() throws InterruptedException {
    //
    if (Thread.interrupted())
    throw new InterruptedException();
    // 当前线程加入到等待队列
    Node node = addConditionWaiter();
    // 释放同步锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步队列中
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }

唤醒

  • 调用 Condition 的 signal() 方法(调用该方法的前置条件是当前线程必须获取了锁),将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中并使用LockSupport唤醒节点中的线程。
1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

例子

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public class ConditionDemo {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static void main(String[] args) {
    new Thread() {
    @Override
    public void run() {

    lock.lock();
    System.out.println("第一个线程await之前的操作............");

    try {
    condition.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    System.out.println("第一个线程await之后的操作............");
    lock.unlock();
    }
    }.start();

    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    new Thread() {
    @Override
    public void run() {
    lock.lock();
    System.out.println("第二个线程唤醒线程1之前............");
    condition.signal();
    System.out.println("第二个线程唤醒线程1之后............");
    lock.unlock();
    }
    }.start();
    }
    }


    // 运行结果
    第一个线程await之前的操作............
    第二个线程唤醒线程1之前............
    第二个线程唤醒线程1之后............
    第一个线程await之后的操作............
  • demo演示图,当线程1执行完await()方法,线程2获取到锁的时候:

    1. 线程1获取锁成功,修改state为1,输出第一条语句。线程2进入等待队列,阻塞等待
    2. 调用await后,将线程1放到等待队列。释放锁
    3. 线程2可以成功获取到锁。

  • 当线程2执行完signal()操作会怎样呢?

    1. 线程2执行signal后,线程1由阻塞队列的被放到同步队列中
    2. 线程2继续执行,线程1等待获取锁后继续执行

  • 其实可以发现,当线程2执行完signal()方法,其实是将线程1从condition队列放到了阻塞等待队列。这样线程1又可以等待唤醒,来继续获取锁了。

  • 线程1执行await()方法其实是将自己放入到一个condition队列,并将自己挂起。然后线程2执行signal()方法,会从condition队列转到阻塞等待队列。然后线程2释放锁,就会唤醒线程1来执行了。

总结

  • 线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,

  • 而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,

  • 当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。

  • 通过使用condition提供的await和signal/signalAll方法就可以实现这种机制,而这种机制能够解决最经典的问题就是“生产者与消费者问题”

死锁

代码模拟产生死锁

synchronized方式

  • 启动两个线程,设置两个线程监听对象obj1 、obj2。

    • 线程1启动的时候,先获取obj1锁,暂停1秒,然后获取obj2锁。
    • 线程2启动时,先获取obj2,再获取obj1
  • 当线程2启动的时候获取obj2成功,然后去获取obj1的时候,obj1被线程1占用,此时就等待。线程1秒后去获取obj2,此时obj2锁被线程2握着,产生死锁,互相无法获取。

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package top.fulsun.lock;

    public class TestDeathLock {
    static Object obj1 = new Object();
    static Object obj2 = new Object();

    public static void main(String[] args) {
    new Thread(() -> {
    synchronized (obj1) {
    try {
    System.out.println(Thread.currentThread().getName() + ":获取到了Obj1");
    Thread.sleep(1000);
    System.out.println(Thread.currentThread().getName() + ":休息1秒");
    System.out.println(Thread.currentThread().getName() + ":尝试获取obj2");
    synchronized (obj2) {
    System.out.println(Thread.currentThread().getName() + ":获取到了Obj2");
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }


    }

    }, "A").start();

    new Thread(() -> {
    synchronized (obj2) {
    System.out.println(Thread.currentThread().getName() + ":获取到了Obj2");
    System.out.println(Thread.currentThread().getName() + ":尝试获取obj1");
    synchronized (obj1) {
    System.out.println(Thread.currentThread().getName() + ":获取到了Obj1");
    }
    }

    }, "B").start();

    }


    }

    //结果: 线程没有结束
    A:获取到了Obj1
    B:获取到了Obj2
    B:尝试获取obj1
    A:休息1
    A:尝试获取obj2

    // 取消掉sleep(1000)后,会有通过的情况
    // 但是多次触发后,发现最终也会出现死锁问题。
    A:获取到了Obj1
    A:休息1
    A:尝试获取obj2
    A:获取到了Obj2
    B:获取到了Obj2
    B:尝试获取obj1
    B:获取到了Obj1

    Process finished with exit code 0

Lock方式

  • 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package top.fulsun.lock;

    import java.util.concurrent.locks.ReentrantLock;

    public class TestDeathLock {
    static ReentrantLock lock1 = new ReentrantLock();
    static ReentrantLock lock2 = new ReentrantLock();

    public static void main(String[] args) {
    new Thread(() -> {
    lock1.lock();
    try {
    System.out.println(Thread.currentThread().getName() + ":获取到了 lock1");
    System.out.println(Thread.currentThread().getName() + ":休息1秒");
    Thread.sleep(1000);
    System.out.println(Thread.currentThread().getName() + ":尝试获取 lock2");
    lock2.lock();
    try {
    System.out.println(Thread.currentThread().getName() + ":获取到了 lock22");
    } finally {
    lock2.unlock();
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock1.unlock();
    }

    }, "C").start();

    new Thread(() -> {
    lock2.lock();
    System.out.println(Thread.currentThread().getName() + ":获取到了 lock2");
    System.out.println(Thread.currentThread().getName() + ":尝试获取 lock1");
    lock1.lock();
    System.out.println(Thread.currentThread().getName() + ":获取到了 lock1");
    lock1.unlock();
    lock2.unlock();
    }, "D").start();
    }
    }

    // 出现死锁的结果:
    C:获取到了 lock1
    C:休息1
    D:获取到了 lock2
    D:尝试获取 lock1
    C:尝试获取 lock2

死锁发生的场景

  • 死锁不仅仅是在线程之间会发生,存在资源独占的进程之间同样也可能出现死锁。大多是聚焦在多线程场景中的死锁,指两个或多个线程之间,由于互相持有对方需要的锁,而永久处于阻塞的状态。

导致死锁的原因

  • 在申请锁时发生了交叉闭环申请。即线程在获得了锁1并且没有释放的情况下去申请锁2,这时,另一个线程已经获得了锁2,在释放锁2之前又要先获得锁1,因此闭环发生,陷入死锁循环。

死锁的危害

  • 从上面死锁代码案例可以知道,当发生死锁的时候,导致彼此一直处于等待之中,而导致代码无法执行下去。只能重启,后果比较严重!
  • 在死锁时,线程间相互等待资源,而又不释放自身的资源,导致无穷无尽的等待,其结果是系统任务永远无法执行完成。系统发生死锁现象不仅浪费大量的系统资源,甚至导致整个系统崩溃,带来灾难性后果。

死锁问题条件

  • 互斥条件:一个资源每次只能被一个线程使用。
  • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  • 不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺。
  • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

如何预防死锁

  • 死锁发生时的四个必要条件,只要破坏这四个必要条件中的任意一个条件,死锁就不会发生。这就为我们解决死锁问题提供了可能。一般地,解决死锁的方法分为死锁的预防避免检测[定位死锁的位置]与恢复三种(注意:死锁的检测与恢复是一个方法)。
    锁的预防是保证系统不进入死锁状态的一种策略。它的基本思想是要求进程申请资源时遵循某种协议,从而打破产生死锁的四个必要条件中的一个或几个,保证系统不会进入死锁状态。
    • 打破互斥条件。即允许进程同时访问某些资源。但是,有的资源是不允许被同时访问的,像打印机等等,这是由资源本身的属性所决定的。所以,这种办法并无实用价值。
    • 打破不可抢占条件。即允许进程强行从占有者那里夺取某些资源。就是说,当一个进程已占有了某些资源,它又申请新的资源,但不能立即被满足时,它必须释放所占有的全部资源,以后再重新申请。它所释放的资源可以分配给其它进程。这就相当于该进程占有的资源被隐蔽地强占了。这种预防死锁的方法实现起来困难,会降低系统性能。
    • 打破占有且申请条件。可以实行资源预先分配策略。即进程在运行前一次性地向系统申请它所需要的全部资源。如果某个进程所需的全部资源得不到满足,则不分配任何资源,此进程暂不运行。只有当系统能够满足当前进程的全部资源需求时,才一次性地将所申请的资源全部分配给该进程。由于运行的进程已占有了它所需的全部资源,所以不会发生占有资源又申请资源的现象,因此不会发生死锁。但是,这种策略也有如下缺点:
      • 在许多情况下,一个进程在执行之前不可能知道它所需要的全部资源。这是由于进程在执行时是动态的,不可预测的;
      • 资源利用率低。无论所分资源何时用到,一个进程只有在占有所需的全部资源后才能执行。即使有些资源最后才被该进程用到一次,但该进程在生存期间却一直占有它们,造成长期占着不用的状况。这显然是一种极大的资源浪费;
      • 降低了进程的并发性。因为资源有限,又加上存在浪费,能分配到所需全部资源的进程个数就必然少了。
    • 打破循环等待条件,实行资源有序分配策略。采用这种策略,即把资源事先分类编号,按号分配,使进程在申请,占用资源时不会形成环路。所有进程对资源的请求必须严格按资源序号递增的顺序提出。进程占用了小号资源,才能申请大号资源,就不会产生环路,从而预防了死锁。这种策略与前面的策略相比,资源的利用率和系统吞吐量都有很大提高,但是也存在以下缺点:
      • 限制了进程对资源的请求,同时给系统中所有资源合理编号也是件困难事,并增加了系统开销;
      • 为了遵循按编号申请的次序,暂不使用的资源也需要提前申请,从而增加了进程对资源的占用时间。

死锁诊断与修复

如何定位死锁

  • 定位死锁最常用的工具就是利用jstack等工具获取线程栈,然后定位相互之间的依赖关系,进而找到死锁。如果是比较明显的死锁,往往jstack工具就能直接定位,类似JConsole甚至可以在图形界面进行有限的死锁检测。

  • 如果程序运行时发生了死锁,绝大多数情况下都是无法在线解决的,只能重启、修正程序本身问题。所以,代码开发阶段相互审查,或者利用工具进行预防性排查,也是很重要的。

    • 1.首先,可以使用 jps 或者系统的 ps 命令、任务管理器等工具,确定进程 ID。
    • 2.调用 jstack 获取线程栈:jstack your-pid
    • 3.然后看看日志,思考怎么用studio将日志打印出来呢?
  • jstack的部分内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Java stack information for the threads listed above:
    ===================================================
    "B":
    at top.fulsun.lock.TestDeathLock.lambda$main$1(TestDeathLock.java:32)
    - waiting to lock <0x00000000d60f9d40> (a java.lang.Object)
    - locked <0x00000000d60f9d50> (a java.lang.Object)
    at top.fulsun.lock.TestDeathLock$$Lambda$2/1078694789.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)
    "A":
    at top.fulsun.lock.TestDeathLock.lambda$main$0(TestDeathLock.java:15)
    - waiting to lock <0x00000000d60f9d50> (a java.lang.Object)
    - locked <0x00000000d60f9d40> (a java.lang.Object)
    at top.fulsun.lock.TestDeathLock$$Lambda$1/1324119927.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

    Found 1 deadlock.

死锁修复方案

  • 如果在死锁检查时发现了死锁情况,那么就要努力消除死锁,使系统从死锁状态中恢复过来。消除死锁的几种方式:
    • 最简单、最常用的方法就是进行系统的重新启动,不过这种方法代价很大,它意味着在这之前所有的进程已经完成的计算工作都将付之东流,包括参与死锁的那些进程,以及未参与死锁的进程;
    • 撤消进程,剥夺资源。终止参与死锁的进程,收回它们占有的资源,从而解除死锁。这时又分两种情况:一次性撤消参与死锁的全部进程,剥夺全部资源;或者逐步撤消参与死锁的进程,逐步收回死锁进程占有的资源。一般来说,选择逐步撤消的进程时要按照一定的原则进行,目的是撤消那些代价最小的进程,比如按进程的优先级确定进程的代价;考虑进程运行时的代价和与此进程相关的外部作业的代价等因素;
    • 进程回退策略,即让参与死锁的进程回退到没有发生死锁前某一点处,并由此点处继续执行,以求再次执行时不再发生死锁。虽然这是个较理想的办法,但是操作起来系统开销极大,要有堆栈这样的机构记录进程的每一步变化,以便今后的回退,有时这是无法做到的。

死锁类型

  • 在JAVA编程中,有3种典型的死锁类型:
    • 静态的锁顺序死锁,
    • 动态的锁顺序死锁,
    • 协作对象之间发生的死锁。

静态的锁顺序死锁

  • a和b两个方法都需要获得A锁和B锁。

    • 一个线程执行a方法且已经获得了A锁,在等待B锁;另一个线程执行了b方法且已经获得了B锁,在等待A锁。这种状态,就是发生了静态的锁顺序死锁。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    //可能发生静态锁顺序死锁的代码
    class StaticLockOrderDeadLock {
    private final Object lockA = new Object();
    private final Object lockB = new Object();

    public void a() {
    synchronized (lockA) {
    synchronized (lockB) {
    System.out.println("function a");
    }
    }
    }

    public void b() {
    synchronized (lockB) {
    synchronized (lockA) {
    System.out.println("function b");
    }
    }
    }
    }
  • 解决静态的锁顺序死锁的方法就是:所有需要多个锁的线程,都要以相同的顺序来获得锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    //正确的代码
    class StaticLockOrderDeadLock {
    private final Object lockA = new Object();
    private final Object lockB = new Object();

    public void a() {
    synchronized (lockA) {
    synchronized (lockB) {
    System.out.println("function a");
    }
    }
    }

    public void b() {
    synchronized (lockA) {
    synchronized (lockB) {
    System.out.println("function b");
    }
    }
    }
    }

动态的锁顺序死锁

  • 动态的锁顺序死锁是指两个线程调用同一个方法时,传入的参数颠倒造成的死锁。

    • 如下代码,一个线程调用了transferMoney方法并传入参数accountA,accountB;另一个线程调用了transferMoney方法并传入参数accountB,accountA。此时就可能发生在静态的锁顺序死锁中存在的问题,即:第一个线程获得了accountA锁并等待accountB锁,第二个线程获得了accountB锁并等待accountA锁。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //可能发生动态锁顺序死锁的代码
    class DynamicLockOrderDeadLock {
    public void transefMoney(Account fromAccount, Account toAccount, Double amount) {
    synchronized (fromAccount) {
    synchronized (toAccount) {
    //...
    fromAccount.minus(amount);
    toAccount.add(amount);
    //...
    }
    }
    }
    }
  • 动态的锁顺序死锁解决方案如下:使用System.identifyHashCode来定义锁的顺序。确保所有的线程都以相同的顺序获得锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//正确的代码
class DynamicLockOrderDeadLock {
private final Object myLock = new Object();

public void transefMoney(final Account fromAccount, final Account toAccount, final Double amount) {
class Helper {
public void transfer() {
//...
fromAccount.minus(amount);
toAccount.add(amount);
//...
}
}
int fromHash = System.identityHashCode(fromAccount);
int toHash = System.identityHashCode(toAccount);

if (fromHash < toHash) {
synchronized (fromAccount) {
synchronized (toAccount) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAccount) {
synchronized (fromAccount) {
new Helper().transfer();
}
}
} else {
synchronized (myLock) {
synchronized (fromAccount) {
synchronized (toAccount) {
new Helper().transfer();
}
}
}
}

}
}

协作对象之间发生的死锁

  • 有时,死锁并不会那么明显,比如两个相互协作的类之间的死锁

    • 比如下面的代码:一个线程调用了Taxi对象的setLocation方法,另一个线程调用了Dispatcher对象的getImage方法。此时可能会发生,第一个线程持有Taxi对象锁并等待Dispatcher对象锁,另一个线程持有Dispatcher对象锁并等待Taxi对象锁。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    //可能发生死锁
    class Taxi {
    private Point location, destination;
    private final Dispatcher dispatcher;

    public Taxi(Dispatcher dispatcher) {
    this.dispatcher = dispatcher;
    }

    public synchronized Point getLocation() {
    return location;
    }

    public synchronized void setLocation(Point location) {
    this.location = location;
    if (location.equals(destination))
    dispatcher.notifyAvailable(this);//外部调用方法,可能等待Dispatcher对象锁
    }
    }

    class Dispatcher {
    private final Set<Taxi> taxis;
    private final Set<Taxi> availableTaxis;

    public Dispatcher() {
    taxis = new HashSet<Taxi>();
    availableTaxis = new HashSet<Taxi>();
    }

    public synchronized void notifyAvailable(Taxi taxi) {
    availableTaxis.add(taxi);
    }

    public synchronized Image getImage() {
    Image image = new Image();
    for (Taxi t : taxis)
    image.drawMarker(t.getLocation());//外部调用方法,可能等待Taxi对象锁
    return image;
    }
    }
    • 上面的代码中, 我们在持有锁的情况下调用了外部的方法,这是非常危险的(可能发生死锁)。为了避免这种危险的情况发生, 我们使用开放调用。如果调用某个外部方法时不需要持有锁,我们称之为开放调用。
  • 解决协作对象之间发生的死锁:需要使用开放调用,即避免在持有锁的情况下调用外部的方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    //正确的代码
    class Taxi {
    private Point location, destination;
    private final Dispatcher dispatcher;

    public Taxi(Dispatcher dispatcher) {
    this.dispatcher = dispatcher;
    }

    public synchronized Point getLocation() {
    return location;
    }

    public void setLocation(Point location) {
    boolean flag = false;
    synchronized (this) {
    this.location = location;
    flag = location.equals(destination);
    }
    if (flag)
    dispatcher.notifyAvailable(this);//使用开放调用
    }
    }

    class Dispatcher {
    private final Set<Taxi> taxis;
    private final Set<Taxi> availableTaxis;

    public Dispatcher() {
    taxis = new HashSet<Taxi>();
    availableTaxis = new HashSet<Taxi>();
    }

    public synchronized void notifyAvailable(Taxi taxi) {
    availableTaxis.add(taxi);
    }

    public Image getImage() {
    Set<Taxi> copy;
    synchronized (this) {
    copy = new HashSet<Taxi>(taxis);
    }
    Image image = new Image();
    for (Taxi t : copy)
    image.drawMarker(t.getLocation());//使用开放调用
    return image;
    }
    }

总结

  • 综上,是常见的3种死锁的类型。
  • 即:静态的锁顺序死锁,动态的锁顺序死锁,协作对象之间的死锁。在写代码时,要确保线程在获取多个锁时采用一致的顺序。同时,要避免在持有锁的情况下调用外部方法。

volatile关键字

案例

  • 举个简单的例子:在java中,执行下面这个语句:

    1
    i  = 10;
    • 执行线程必须先在自己的工作线程中对变量i所在的缓存行进行赋值操作,然后再写入主存当中。而不是直接将数值10写入主存当中。
    • 比如同时有2个线程执行这段代码,假如初始时i的值为10,那么我们希望两个线程执行完之后i的值变为12。但是事实会是这样吗?
    • 可能存在下面一种情况:初始时,两个线程分别读取i的值存入各自所在的工作内存当中,然后线程1进行加1操作,然后把i的最新值11写入到内存。此时线程2的工作内存当中i的值还是10,进行加1操作之后,i的值为11,然后线程2把i的值写入内存。
    • 最终结果i的值是11,而不是12。这就是著名的缓存一致性问题。通常称这种被多个线程访问的变量为共享变量。

volatile轻量级锁

  • synchronized是阻塞式同步,在线程竞争激烈的情况下会升级为重量级锁。而volatile就可以说是java虚拟机提供的最轻量级的同步机制。

volatile的用途

  • 但它同时不容易被正确理解,也至于在并发编程中很多程序员遇到线程安全的问题就会使用synchronized。Java内存模型告诉我们,各个线程会将共享变量从主内存中拷贝到工作内存,然后执行引擎会基于工作内存中的数据进行操作处理。
  • 线程在工作内存进行操作后何时会写到主内存中?这个时机对普通变量是没有规定的,而针对volatile修饰的变量给java虚拟机特殊的约定,线程对volatile变量的修改会立刻被其他线程所感知,即不会出现数据脏读的现象,从而保证数据的“可见性”。
  • 被volatile修饰的变量能够保证每个线程能够获取该变量的最新值,从而避免出现数据脏读的现象。

volatile实现原理

  • 在生成汇编代码时会在volatile修饰的共享变量进行写操作的时候会多出Lock前缀的指令
  • 这个Lock指令肯定有神奇的地方,那么Lock前缀的指令在多核处理器下会发现什么事情了?主要有这两个方面的影响:
    1. 将当前处理器缓存行的数据写回系统内存;
    2. 这个写回内存的操作会使得其他CPU里缓存了该内存地址的数据无效

多线程下如何获取最新值

  • 为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。
  • 如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。所以,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。
  • 因此,经过分析我们可以得出如下结论:
    1. Lock前缀的指令会引起处理器缓存写回内存;
    2. 一个处理器的缓存回写到内存会导致其他处理器的缓存失效;
    3. 当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值。
  • 这样针对volatile变量通过这样的机制就使得每个线程都能获得该变量的最新值。

happens-before

八大原则:

  • 单线程happen-before原则:在同一个线程中,书写在前面的操作happen-before后面的操作。

    • 把happen-before 理解成“先于什么发生”

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      int a = 3;      //1
      int b = a + 1; //2

      // 这里对b赋值的操作会用到变量a,那么happen-before原则就保证 //2的中的a的值一定是3,而不是0,
      // 因为//1 书写在//2前面, //1对变量a的赋值操作对//2一定可见。
      // 因为//2 中有用到//1中的变量a,再加上java内存模型提供了“单线程happen-before原则”,所以java虚拟机不许可操作系统对//1 //2 操作进行指令重排序,即不可能有//2 在//1之前发生。

      // 但是对于下面的代码:两个语句直接没有依赖关系,所以指令重排序可能发生,即对b的赋值可能先于对a的赋值。
      int a = 3;
      int b = 4;
  • 锁的happen-before原则:同一个锁的unlock操作happen-before此锁的lock操作。

    • 代码例子

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      public class A {
      public int var;

      private static A a = new A();

      private A(){}

      public static A getInstance(){
      return a;
      }

      public synchronized void method1(){
      var = 3;
      }

      public synchronized void method2(){
      int b = var;
      }

      public void method3(){
      synchronized(new A()){ //注意这里和method1 method2 用的可不是同一个锁哦
      var = 4;
      }
      }
      }

      //线程1执行的代码:
      A.getInstance().method1();
      //线程2执行的代码:
      A.getInstance().method2();
      //线程3执行的代码:
      A.getInstance().method3();

      //如果某个时刻执行完“线程1” 马上执行“线程2”,
      // 因为“线程1”执行A类的method1方法后肯定要释放锁,“线程2”在执行A类的method2方法前要先拿到锁,符合“锁的happen-before原则”,那么在“线程2”method2方法中的变量var一定是3,所以变量b的值也一定是3。

      // 但是如果是“线程1”、“线程3”、“线程2”这个顺序,
      // 那么最后“线程2”method2方法中的b值是3,还是4呢?其结果是可能是3,也可能是4。
      // 的确“线程3”在执行完method3方法后的确要unlock,然后“线程2”有个lock,但是这两个线程用的不是同一个锁,
      // 所以JMM这个两个操作之间不符合八大happen-before中的任何一条,
      // 所以JMM不能保证“线程3”对var变量的修改对“线程2”一定可见,虽然“线程3”先于“线程2”发生。
  • volatile的happen-before原则:对一个volatile变量的写操作happen-before对此变量的任意操作(当然也包括写操作了)。

    • 代码如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      volatile int var;
      int b;
      int c;

      // 假设“线程1”执行这段代码,如果某次的执行顺序如下:
      b = 4; //1
      var = 3; //2

      // “线程2”执行这段代码。
      c = var; //3
      c = b; //4

      // 如果执行顺序为: //1 //2 //3 //4。那么有如下推导 ( hd(a,b)表示a happen-before b):
      // 因为有hd(//1,//2) 、hd(//3,//4) (单线程的happen-before原则)
      // 且hd(//2,//3) (volatile的happen-before原则)
      // 所以有 hd(//1,//3),可导出hd(//1,//4) (happen-before原则的传递性)
      // 所以变量c的值最后为4

      // 如果某次的执行顺序如下:
      //1 //3 //2// //4
      // 因为 1 hb 2,2 hb 3,3 hb 4,所以1必然hb 2.
      // 1的运算结果可见于4,则最后b=4;c=4.
  • happen-before的传递性原则:如果A操作 happen-before B操作,B操作happen-before C操作,那么A操作happen-before C操作。

  • 线程启动的happen-before原则:同一个线程的start方法happen-before此线程的其它方法。

  • 线程中断的happen-before原则:对线程interrupt方法的调用happen-before被中断线程的检测到中断发送的代码。

  • 线程终结的happen-before原则:线程中的所有操作都happen-before线程的终止检测。

  • 对象创建的happen-before原则:一个对象的初始化完成先于他的finalize方法调用。

volatile的happens-before关系

并发切入点

  • 经过上面的分析,已经知道了volatile变量可以通过缓存一致性协议保证每个线程都能获得最新值,即满足数据的“可见性”。
  • 并发分析的切入点分为两个核心,三大性质。两大核心:JMM内存模型(主内存和工作内存)以及happens-before;三条性质:原子性,可见性,有序性。

volatile的happens-before

  • 先来看两个核心之一:volatile的happens-before关系。

    • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。下面我们结合具体的代码,我们利用这条规则推导下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    private void test3() {
    Thread thread1 = new Thread(new Runnable() {
    @Override
    public void run() {
    new VolatileExample().writer();
    }
    });
    Thread thread2 = new Thread(new Runnable() {
    @Override
    public void run() {
    new VolatileExample().reader();
    }
    });
    thread1.start();
    thread2.start();
    }


    class VolatileExample {
    private int a = 0;
    private volatile boolean flag = false;

    public void writer() {
    a = 1; //1
    System.out.println("测试volatile数据1--" + a);
    flag = true; //2
    System.out.println("测试volatile数据2--" + flag);
    }

    public void reader() {
    try {
    Thread.sleep(0);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("测试volatile数据3--" + flag);
    if (flag) { //3
    int i = a; //4
    System.out.println("测试volatile数据4--" + i);
    }
    }
    }
  • 打印日志如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //第一种情况
    测试volatile数据3--false
    测试volatile数据1--1
    测试volatile数据2--true

    //第二种情况
    测试volatile数据1--1
    测试volatile数据3--false
    测试volatile数据2--true

    测试volatile数据1--1
    测试volatile数据2--true
    测试volatile数据3--false
  • 上面的实例代码对应的happens-before关系如下图所示:

  • 分析上面代码执行过程

    • 加锁线程A先执行writer方法,然后线程B执行reader.方法图中每一个箭头两个节点就代码一个happens-before关系
      • 黑色的代表根据程序顺序规则推导出来,
      • 红色的是根据volatile变量的写happens-before 于任意后续对volatile变量的读
      • 而蓝色的就是根据传递性规则推导出来的。
    • 这里的2 happen-before 3,同样根据happens-before规则定义:如果A happens-before B,则A的执行结果对B可见,并且A的执行顺序先于B的执行顺序,我们可以知道操作2执行结果对操作3来说是可见的,也就是说当线程A将volatile变量 flag更改为true后线程B就能够迅速感知。

volatile的内存语义

执行代码状态图

  • 还是按照两个核心的分析方式,分析完happens-before关系后我们现在就来进一步分析volatile的内存语义。还是以上面的代码为例,假设线程A先执行writer方法,线程B随后执行reader方法,初始时线程的本地内存中flag和a都是初始状态,下图是线程A执行volatile写后的状态图。

    • 线程A执行volatile写后的内存状态图

  • 当volatile变量写后,线程中本地内存中共享变量就会置为失效的状态,因此线程B再需要读取从主内存中去读取该变量的最新值。下图就展示了线程B读取同一个volatile变量的内存变化示意图。

    • 线程B读volatile后的内存状态图

  • 结果分析

    • 从横向来看,线程A和线程B之间进行了一次通信,线程A在写volatile变量时,实际上就像是给B发送了一个消息告诉线程B你现在的值都是旧的了,然后线程B读这个volatile变量时就像是接收了线程A刚刚发送的消息。既然是旧的了,那线程B该怎么办了?自然而然就只能去主内存去取啦。

volatile的内存语义实现

  • 为了性能优化,JMM在不改变正确语义的前提下,会允许编译器和处理器对指令序列进行重排序,那如果想阻止重排序要怎么办了?答案是可以添加内存屏障。

内存屏障

  • JMM内存屏障分为四类见下图,
  • java编译器会在生成指令系列时在适当的位置会插入内存屏障指令来禁止特定类型的处理器重排序。为了实现volatile的内存语义,JMM会限制特定类型的编译器和处理器重排序,JMM会针对编译器制定volatile重排序规则表:
  • “NO”表示禁止重排序。为了实现volatile内存语义时,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎是不可能的,为此,JMM采取了保守策略:
    • 1.在每个volatile写操作的前面插入一个StoreStore屏障;
    • 2.在每个volatile写操作的后面插入一个StoreLoad屏障;
    • 3.在每个volatile读操作的后面插入一个LoadLoad屏障;
    • 4.在每个volatile读操作的后面插入一个LoadStore屏障。
  • 需要注意的是:volatile写是在前面和后面分别插入内存屏障,而volatile读操作是在后面插入两个内存屏障
    • StoreStore屏障:禁止上面的普通写和下面的volatile写重排序;
    • StoreLoad屏障:防止上面的volatile写与下面可能有的volatile读/写重排序
    • LoadLoad屏障:禁止下面所有的普通读操作和上面的volatile读重排序
    • LoadStore屏障:禁止下面所有的普通写操作和上面的volatile读重排序
  • 下面以两个示意图进行理解,图片摘自相当好的一本书《java并发编程的艺术》。
    • volatile写插入内存屏障示意图
    • volatile读插入内存屏障示意图

案例分析

  • 代码如下所示

    • 注意不同点,现在已经将isOver设置成了volatile变量,这样在main线程中将isOver改为了true后,thread的工作内存该变量值就会失效,从而需要再次从主内存中读取该值,现在能够读出isOver最新值为true从而能够结束在thread里的死循环,从而能够顺利停止掉thread线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static volatile boolean isOver = false;
    private void test4(){
    Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
    while (!isOver) {
    LogUtils.e("测试volatile数据"+isOver);
    }
    }
    });
    thread.start();
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    isOver = true;
    }

volatile的应用场景

  • 使用volatile需要具备哪些条件

    • synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。通常来说,使用volatile必须具备以下2个条件:
      • 1)对变量的写操作不依赖于当前值
      • 2)该变量没有包含在具有其他变量的不变式中
  • 下面列举几个Java中使用volatile的几个场景。

    • ①.状态标记量

      • 根据状态标记,终止线程。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        volatile boolean flag = false;
        //线程1
        while(!flag){
        doSomething();
        }
        //线程2
        public void setFlag() {
        flag = true;
        }
    • ②.单例模式中的double check

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      class Singleton {
      private volatile static Singleton instance = null;

      private Singleton() {

      }

      public static Singleton getInstance() {
      // 对于第二次不进入锁,优化效率
      if (instance == null) {
      synchronized (Singleton.class) {
      // 防止创建多个对象
      if (instance == null)
      instance = new Singleton();
      }
      }
      return instance;
      }
      }
  • 为什么要使用volatile 修饰instance?

    • 主要在于instance = new Singleton()这句,这并非是一个原子操作,事实上在 JVM 中这句话大概做了下面 3 件事情:
    • 1.给 instance 分配内存
    • 2.调用 Singleton 的构造函数来初始化成员变量
    • 3.将instance对象指向分配的内存空间(执行完这步 instance 就为非 null 了)。
    • 但是在 JVM 的即时编译器中存在指令重排序的优化。也就是说上面的第二步和第三步的顺序是不能保证的,最终的执行顺序可能是 1-2-3 也可能是 1-3-2。如果是后者,则在 3 执行完毕、2 未执行之前,被线程二抢占了,这时 instance 已经是非 null 了(但却没有初始化),所以线程二会直接返回 instance,然后使用,然后顺理成章地报错。

volatile深入解读

volatile保证可见性

  • 一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:

    • 1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
    • 2)禁止进行指令重排序。
  • 先看一段代码,假如线程1先执行,线程2后执行:

    1
    2
    3
    4
    5
    6
    7
    8
    //线程1
    boolean stop = false;
    while(!stop){
    doSomething();
    }

    //线程2
    stop = true;
  • 思考上面代码完整吗?

    • 这段代码是很典型的一段代码,很多人在中断线程时可能都会采用这种标记办法。但是事实上,这段代码会完全运行正确么?即一定会将线程中断么?不一定,也许在大多数时候,这个代码能够把线程中断,但是也有可能会导致无法中断线程(虽然这个可能性很小,但是只要一旦发生这种情况就会造成死循环了)。
  • 上面代码为何有可能导致无法中断线程?

    • 下面解释一下这段代码为何有可能导致无法中断线程。在前面已经解释过,每个线程在运行过程中都有自己的工作内存,那么线程1在运行的时候,会将stop变量的值拷贝一份放在自己的工作内存当中。
    • 那么当线程2更改了stop变量的值之后,但是还没来得及写入主存当中,线程2转去做其他事情了,那么线程1由于不知道线程2对stop变量的更改,因此还会一直循环下去。
  • 但是用volatile修饰之后就变得不一样:

    • 第一:使用volatile关键字会强制将修改的值立即写入主存
    • 第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量stop的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);
    • 第三:由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取变量stop的值时会去主存读取
    • 那么在线程2修改stop值时(当然这里包括2个操作,修改线程2工作内存中的值,然后将修改后的值写入内存),会使得线程1的工作内存中缓存变量stop的缓存行无效,然后线程1读取时,发现自己的缓存行无效,它会等待缓存行对应的主存地址被更新之后,然后去对应的主存读取最新的值。那么线程1读取到的就是最新的正确的值。

volatile不能确保原子性

  • 先来看一下下面的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class Nothing {

    private volatile int inc = 0;
    private volatile static int count = 10;

    private void increase() {
    ++inc;
    }

    public static void main(String[] args) {
    int loop = 10;
    Nothing nothing = new Nothing();
    while (loop-- > 0) {
    nothing.operation();
    }
    }

    private void operation() {
    final Nothing test = new Nothing();
    for (int i = 0; i < 10; i++) {
    new Thread(() -> {
    for (int j = 0; j < 1000000; j++) {
    test.increase();
    }
    --count;
    }).start();
    }

    // 保证前面的线程都执行完
    while (count > 0) {

    }
    System.out.println("最后的数据为:" + test.inc);
    }

    }
    • 运行结果为:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    最后的数据为:5919956
    最后的数据为:3637231
    最后的数据为:2144549
    最后的数据为:2403538
    最后的数据为:1762639
    最后的数据为:2878721
    最后的数据为:2658645
    最后的数据为:2534078
    最后的数据为:2031751
    最后的数据为:2924506
  • 大家想一下这段程序的输出结果是多少?

    • 也许有些朋友认为是1000000。但是事实上运行它会发现每次运行结果都不一致,都是一个小于1000,0000的数字。可能有的朋友就会有疑问,不对啊,上面是对变量inc进行自增操作,由于volatile保证了可见性,那么在每个线程中对inc自增完之后,在其他线程中都能看到修改后的值啊,所以有10个线程分别进行了1000000次操作,那么最终inc的值应该是1000000*10=10000000。
  • 这里面就有一个误区

    • volatile关键字能保证可见性没有错,但是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,但是volatile没办法保证对变量的操作的原子性。
    • 在前面已经提到过,自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行,就有可能导致下面这种情况出现:
    • 假如某个时刻变量inc的值为10,线程1对变量进行自增操作,线程1先读取了变量inc的原始值,然后线程1被阻塞了;然后线程2对变量进行自增操作,线程2也去读取变量inc的原始值,由于线程1只是对变量inc进行读取操作,而没有对变量进行修改操作,所以不会导致线程2的工作内存中缓存变量inc的缓存行无效,也不会导致主存中的值刷新,所以线程2会直接去主存读取inc的值,发现inc的值时10,然后进行加1操作,并把11写入工作内存,最后写入主存。
    • 然后线程1接着进行加1操作,由于已经读取了inc的值,注意此时在线程1的工作内存中inc的值仍然为10,所以线程1对inc进行加1操作后inc的值为11,然后将11写入工作内存,最后写入主存。
    • 那么两个线程分别进行了一次自增操作后,inc只增加了1。根源就在这里,自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。解决方案:可以通过synchronized或lock,进行加锁,来保证操作的原子性。也可以通过AtomicInteger。
  • 在java 1.5的java.util.concurrent.atomic包下提供了一些原子操作类

    • 即对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。atomic是利用CAS来实现原子性操作的(Compare And Swap),CAS实际上是利用处理器提供的CMPXCHG指令实现的,而处理器执行CMPXCHG指令是一个原子性操作。

volatile保证有序性

  • 在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。

    • volatile关键字禁止指令重排序有两层意思:
    • 1)当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行
    • 2)在进行指令优化时,不能将在对volatile变量的读操作或者写操作的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。
  • 可能上面说的比较绕,举个简单的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    // x、y为非volatile变量
    // flag为volatile变量

    x = 2; //语句1
    y = 0; //语句2
    flag = true; //语句3
    x = 4; //语句4
    y = -1; //语句5
    • 由于flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。
    • 并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。

乐观锁与悲观锁

  • 我们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不同的thread/process轮流执行,时间片与时间片之间,需要进行cpu切换,也就是会发生进程的切换。切换涉及到清空寄存器,缓存数据。然后重新加载新的thread所需数据。当一个线程被挂起时,加入到阻塞队列,在一定的时间或条件下,在通过notify(),notifyAll()唤醒回来。
  • 在某个资源不可用的时候,就将cpu让出,把当前等待线程切换为阻塞状态。等到资源(比如一个共享数据)可用了,那么就将线程唤醒,让他进入runnable状态等待cpu调度。这就是典型的悲观锁的实现。 独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的情况,认为一个线程修改共享数据的时候其他线程也会修改该数据,因此只在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。
  • 但是,由于在进程挂起和恢复执行过程中存在着很大的开销。当一个线程正在等待锁时,它不能做任何事,所以悲观锁有很大的缺点。举个例子,如果一个线程需要某个资源,但是这个资源的占用时间很短,当线程第一次抢占这个资源时,可能这个资源被占用,如果此时挂起这个线程,可能立刻就发现资源可用,然后又需要花费很长的时间重新抢占锁,时间代价就会非常的高。
  • 所以就有了乐观锁的概念,他的核心思路就是,每次不加锁而是假设修改数据之前其他线程一定不会修改,如果因为修改过产生冲突就失败就重试,直到成功为止。 在上面的例子中,某个线程可以不让出cpu,而是一直while循环,如果失败就重试,直到成功为止。所以,当数据争用不严重时,乐观锁效果更好。比如CAS就是一种乐观锁思想的应用。

CAS简单介绍

  • CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。

    • 执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。
  • 举个CAS操作的应用场景的一个例子

    • 当一个线程需要修改共享变量的值。完成这个操作,先取出共享变量的值赋给A,然后基于A的基础进行计算,得到新值B,完了需要更新共享变量的值了,这个时候就可以调用CAS方法更新变量值了。
  • 在java中可以通过锁和循环CAS的方式来实现原子操作。

    • Java中java.util.concurrent.atomic包相关类就是 CAS的实现
  • atomic包里包括以下类:

    类名 说明
    AtomicBoolean 可以用原子方式更新的 boolean 值。
    AtomicInteger 可以用原子方式更新的 int 值。
    AtomicIntegerArray 可以用原子方式更新其元素的 int 数组。
    AtomicIntegerFieldUpdater<T> 基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
    AtomicLong 可以用原子方式更新的 long 值。
    AtomicLongArray 可以用原子方式更新其元素的 long 数组。
    AtomicLongFieldUpdater<T> 基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
    AtomicMarkableReference<V> AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
    AtomicReference<V> 可以用原子方式更新的对象引用。
    AtomicReferenceArray<E> 可以用原子方式更新其元素的对象引用数组。
    AtomicReferenceFieldUpdater<T,V> 基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
    AtomicStampedReference<V> AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。

CAS操作案例分析

  • 下面我们来已AtomicIneger的源码为例来看看CAS操作:

    1
    2
    3
    4
    5
    6
    7
    8
    public final int getAndAdd(int delta) {
    for (; ; ) {
    int current = get();
    int next = current + delta;
    if (compareAndSet(current, next))
    return current;
    }
    }
  • 这里很显然使用CAS操作(for(; ; )里面),他每次都从内存中读取数据,+1操作,然后两个值进行CAS操作。如果成功则返回,否则失败重试,直到修改成功为止。

  • 上面源码最关键的地方有两个,一个for循环,它代表着一种宁死不屈的精神,不成功誓不罢休。还有就是compareAndSet:

    1
    2
    3
    public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
  • compareAndSet方法内部是调用Java本地方法compareAndSwapInt来实现的,而compareAndSwapInt方法内部又是借助C来调用CPU的底层指令来保证在硬件层面上实现原子操作的。在intel处理器中,CAS是通过调用cmpxchg指令完成的。这就是我们常说的CAS操作(compare and swap)。

CAS存在的问题

  • CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作。
    • 1.ABA问题。
      • 为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
    • 2.循环时间长开销大。
      • 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
    • 3.只能保证一个共享变量的原子操作。
      • 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。