原始方式

  • 最原始的方式,当我们要并行的或者异步的执行一个任务的时候,我们会直接使用启动一个线程的方式,如下面所示:

    1
    2
    3
    4
    5
    6
    7
    new Thread(new Runnable() {

    @Override
    public void run() {
    // TODO 这里放你要执行的方法
    }
    }).start();
  • 但是像上面或者类似这种每次来都是用 new 一个 Thread 出来的方式存在着很多的弊端,如下面:

    • 每次 new Thread 新建对象性能差;
    • 线程缺乏统一的管理,可以无限制新建线程,相互之间竞争,还可能占用过多系统资源导致死机或者 OOM(Out of Memory);
    • 缺乏更多的功能,如定时执行、定期执行、线程中断等。

线程池

  • 为了解决上面的问题,Jdk1.5 之后加入了 java.util.concurrent 包,这个包中主要介绍 java 中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。
  • 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。
  • 用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

为什么要用线程池

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。
  • 可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约 1MB 内存,线程开的越多,消耗的内存也就越大,最后死机)。

Executor 框架

  • Executor:是一个基本接口,提供方法 execute(Runnable command),该方法接收一个 Runable 实例,将提交任务的过程与执行任务的过程进行了解耦。严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。

  • ExecutorService:是一个比 Executor 使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法

  • AbstractExecutorService:ExecutorService 执行方法的默认实现

  • ScheduledExecutorService:一个可定时调度任务的接口

  • ThreadPoolExecutor:线程池,可以通过调用 Executors 以下静态工厂方法来创建线程池并返回一个 ExecutorService 对象:

  • ScheduledThreadPoolExecutor:ScheduledExecutorService 的实现,一个可定时调度任务的线程池

ExecutorService 的方法

submit 方法

1
2
3
4
5
6
7
//提交一个 Runnable 的任务,该任务没有返回结果,虽然返回一个 Future 对象,但是 Future 的 get 是 null。
Future<?> submit(Runnable task)
//提交一个 Runnable 的任务,该任务可以有返回值,通过传入的 result 对象返回结果。
<T> Future<T> submit(Runnable task, T result)
//提交一个 Callable 的任务,该任务可以有返回值,因为 Callable 与 Runnable 不同,Callable 自身就可以返回结果
<T> Future<T> submit(Callable<T> task)

invokeAny 方法

  • 该方法会阻塞。批量提交任务并执行所有 callable,直到获得一个已经成功执行的任务的结果

  • 如果一个任务已经完成,剩余的 Callable 任务将被取消即使它在运行也会被取消。

  • 异常处理:

    • 如果 Callable 集合中只有部分 Callable 异常,则即使是将该异常抛出,在其调用的地方也是无法捕获异常的
    • 因为该 Callable 异常了,则会调用 Callable 集合中的下一个 Callable。
    • 如果 Callable 集合中的 Callable 全部异常,则可以在其调用的地方捕获异常的。
    1
    2
    3
    4
    5
    6
    // tasks:Callable 类型的集合
    // timeout:超时时间
    // unit:时间单位
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

invokeAll 方法

  • 该方法会阻塞。批量提交任务并获得他们的 future,Task 列表与 Future 列表一一对应

  • 异常处理

    • 当线程池中线程发送发生异常时,直接抛出,可以在其调用的方法捕获异常,
    • 但是只有在调用 Future 中的 get 才能捕获异常,否则则一样捕获不到。
    1
    2
    3
    4
    5
    // tasks:Callable 类型的集合
    // timeout:超时时间
    // unit:时间单位
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

awaitTermination 方法

  • 是阻塞方法。等待一定时间后如果有任务未结束则强行关闭

    1
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    1
    5、isShutdown 方法
    判断该 Executor 是否关闭,调用 shutdown 后会变为 true,即使 Executor 还没有关闭。

shutdown 方法

  • 是非阻塞方法。不再接受新的任务,等待所有任务执行完毕后关闭 Excutor

    1
    void shutdown();

shutdownNow 方法

  • 它会对正在执行的线程进行 interrupt,也有可能存在 interrupt 不成功的现象,如该线程中没有 sleep、wait、join 等即没有抛出中断异常的方法,则可能会出现中断失败

  • 将空闲线程取消

  • 会返回未执行的任务列表

    1
    List<Runnable> shutdownNow()

isTerminated 方法

  • 是非阻塞方法。判断是否 Executor 是否已终结,调用 shutdown 后该方法并不会立即返回 true,只有等到所有任务都结束,Executor 真正结束时才返回 true

    1
    boolean isTerminated()

ScheduledExecutorService

  • ScheduledExecutorService 是基于 ExecutorService 的功能实现的延迟和周期执行任务的功能。每个任务以及每个任务的每个周期都会提交到线程池中由线程去执行,所以任务在不同周期内执行它的线程可能是不同的。
  • ScheduledExecutorService 接口的默认实现类是 ScheduledThreadPoolExecutor。在周期执行的任务中,如果任务执行时间大于周期时间,则会以任务时间优先,等任务执行完毕后才会进入下一次周期

捕获异常

uncaughtExceptionHandler

  • 弊端:捕获异常时无法得知实体任务中的属性。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class ExecutorServiceExample {

    public static void main(String[] args) {
    //创建一个 core 和最大线程数都为 5,空闲时间为 0 的线程池,并且指定一个我们自己的 ThreadFactory
    ExecutorService executorService = Executors.newFixedThreadPool(5,new ThreadFactory());
    IntStream.rangeClosed(1,5).boxed().forEach(i->{
    executorService.execute(()->{
    System.out.println(1/0);
    });
    });
    }
    private static class ThreadFactory implements java.util.concurrent.ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    //对线程中的异常进行捕获,但这种情况并不能得知线程任务中的属性
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
    System.out.println(t.getName()+": 发生了异常");
    e.printStackTrace();
    }
    });
    return t;
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    Thread-0: 发生了异常
    Thread-4: 发生了异常
    Thread-3: 发生了异常
    Thread-2: 发生了异常
    Thread-1: 发生了异常
    java.lang.ArithmeticException: / by zero
    at pers.fulsun.Test.lambda$main$0(Test.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    java.lang.ArithmeticException: / by zero
    at pers.fulsun.Test.lambda$main$0(Test.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    java.lang.ArithmeticException: / by zero
    at pers.fulsun.Test.lambda$main$0(Test.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    java.lang.ArithmeticException: / by zero
    at pers.fulsun.Test.lambda$main$0(Test.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    java.lang.ArithmeticException: / by zero
    at pers.fulsun.Test.lambda$main$0(Test.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Runnable 接口类处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ExecutorServiceExample {

public static void main(String[] args) {
//创建一个 core 和最大线程数都为 5 的线程池,且该线程池的空闲时间为 0
ExecutorService executorService = Executors.newFixedThreadPool(5);
IntStream.rangeClosed(1,5).boxed().forEach(i->{
executorService.execute(new MyTask(i) {
@Override
protected void doError(Throwable cause) {
System.out.println(Thread.currentThread().getName()+": 发生了异常, 编号: "+super.no);
cause.printStackTrace();
}
});
});
}
private abstract static class MyTask implements Runnable{
private int no;

public MyTask(int no) {
this.no = no;
}

@Override
public void run() {
try {
//模拟做事情
TimeUnit.SECONDS.sleep(1);
System.out.println("编号:"+no+" 准备开始工作");
//模拟做到一半时部分出现了异常
if(no%3==0){
int num = 1/0;
}
System.out.println("编号:"+no+" 完成工作");
} catch (Exception e) {
doError(e);
}
}
//交由外部实现
protected abstract void doError(Throwable cause);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
编号:4 准备开始工作
编号:4 完成工作
编号:5 准备开始工作
编号:5 完成工作
编号:1 准备开始工作
编号:1 完成工作
编号:3 准备开始工作
编号:2 准备开始工作
编号:2 完成工作
pool-1-thread-3: 发生了异常, 编号: 3
java.lang.ArithmeticException: / by zero
at pers.fulsun.Test$MyTask.run(Test.java:45)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Executors 静态工厂

  • 我们不可能通过创建 ThreadPoolExecutor 的实例来配置线程池,而 Java 中为我们提供了更加便捷的方法,就是 Executors 静态工厂。在 Executors 类里面提供了一些静态工厂,生成一些常用的线程池。

  • 严格意义上讲 Executors 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService。下面我们就逐个讲解 Executors 中创建不同线程池的方法。

    newSingleThreadExecutor 单线程的线程池 这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
    newFixedThreadPool 固定大小的线程池 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
    newCachedThreadPool 可缓存的线程池 如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
    newScheduledThreadPool 大小无限的线程池 此线程池支持定时以及周期性执行任务的需求。

newCachedThreadPool

  • 特点: 创建一个可缓存线程池,线程池初始的线程数为 0,使用 SynchronousQueue 队列存储任务,这个队列只能存储一个任务,该线程池的最大线程数为 Integer 的最大值。因为队列中只能存储一个任务,所以当队列处于饱和状态时,该线程池就会创建新的线程。空闲的线程会在 60 秒后关闭,因为线程的 coreThread 为 0,当空闲线程销毁为 0 的时候该线程池就会关闭。

  • 大概是如下几个特点:

    • 初始化线程大小为 0
    • 使用的是 SynchronousQueue 队列,只能存储一个任务
    • 最大线程数为 Integer 的最大值
    • 60 秒后将空闲线程关闭后,当线程池内线程为 0 时会自动关闭线程池
  • 适用场景

    • 只适合短暂的任务,如果是非常耗时的任务不建议使用,因为每次提交一个任务都会开启一个线程
    • 用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
  • 演示案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import java.util.concurrent.*;
    import java.util.stream.IntStream;

    public class ExecutorsExample {

    public static void main(String[] args) throws InterruptedException {
    useCachedThreadPool();
    }

    private static void useCachedThreadPool() throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    //查看线程池中的线程数
    System.out.println("poolSize: "+((ThreadPoolExecutor)executorService).getPoolSize() );
    IntStream.rangeClosed(1,10).forEach(i->executorService.execute(()->{
    try {
    //休眠 10 秒
    TimeUnit.SECONDS.sleep(10);
    System.out.println(Thread.currentThread().getName()+" ["+i+"]");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }));
    //休眠 100 毫秒,确保任务全部提交进去
    TimeUnit.MILLISECONDS.sleep(100);
    //查看线程池中的线程数
    System.out.println("poolSize: "+((ThreadPoolExecutor)executorService).getPoolSize() );
    }
    }

  • 运行结果:60 秒后当线程减为 0 时,线程池关闭,程序关闭。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    poolSize: 0
    poolSize: 10
    pool-1-thread-5 [5]
    pool-1-thread-6 [6]
    pool-1-thread-4 [4]
    pool-1-thread-9 [9]
    pool-1-thread-10 [10]
    pool-1-thread-8 [8]
    pool-1-thread-7 [7]
    pool-1-thread-1 [1]
    pool-1-thread-3 [3]
    pool-1-thread-2 [2]
    Process finished with exit code 0

newFixedThreadPool

  • 特点: 该线程池的线程数在创建时就被固定了,不可以扩大。线程池中的每个线程都是处于活动状态,该线程池会一直存在,直到调用 shutdown。该线程池试用的是 LinkedBlockingQueue 队列,最大任务数为 Integer 的最大值。该线程池的空闲时间为 0,因为不可以扩大线程数,所以也就不存在回收线程。

  • 大概是如下几个特点:

    • 初始化时该线程池中的线程数就定了,是固定的,不可以扩大。
    • 使用的是 LinkedBlockingQueue 队列,最大数为 Integer 最大值
    • 该线程池的空闲线程空闲时间为 0,因为不可以扩大线程数,所以也就不存在回收线程
    • 当线程处于空闲,且任务队列为空时,线程池也不会关闭
  • 适用场景

    • 主要适用于固定大小的线程池,因为它是无界的阻塞队列,那么线程池中的线程不会扩大,适用与可以预测线程数的场景中
    • 或者服务器的负载很高,需要对线程数量进行严格控制的场景中
  • 演示案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class ExecutorsExample {

    public static void main(String[] args) throws InterruptedException {
    useFixedThreadPool();
    }

    private static void useFixedThreadPool() throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    //查看线程池中的线程数,默认情况下,只有当线程池接受到任务时才开始创建线程,所以这边输出0
    System.out.println("poolSize: "+((ThreadPoolExecutor)executorService).getPoolSize() );
    IntStream.rangeClosed(1,10).forEach(i->executorService.execute(()->{
    try {
    //休眠10秒
    TimeUnit.SECONDS.sleep(10);
    System.out.println(Thread.currentThread().getName()+" ["+i+"]");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }));
    //休眠100毫秒,确保任务全部提交进去
    TimeUnit.MILLISECONDS.sleep(100);
    //查看线程池中的线程数
    System.out.println("poolSize: "+((ThreadPoolExecutor)executorService).getPoolSize() );
    }
    }
  • 运行结果:只能有 5 个线程执行任务,后续的任务只能等待前面的任务执行完,该线程池不会自动关闭。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    poolSize: 0
    poolSize: 5
    pool-1-thread-5 [5]
    pool-1-thread-2 [2]
    pool-1-thread-3 [3]
    pool-1-thread-4 [4]
    pool-1-thread-1 [1]
    pool-1-thread-2 [7]
    pool-1-thread-4 [9]
    pool-1-thread-3 [8]
    pool-1-thread-5 [6]
    pool-1-thread-1 [10]

newSingleThreadExecutor

  • 特点:该线程池只会有一个线程处于活动状态,所以任务只能一个一个被执行。其实内部还是使用的 newFixedThreadPool,只不过现在了只有一个线程,与 newFixedThreadPool(1)不同是不能重新配置加入线程,因为它返回的是一个代理的包装过的 ExecutorService。该线程池能够保证任务的执行顺序,先提交的先执行。如果在执行期间线程出现异常,那么会创建一个新的线程替换。SingleThreadExecutor 与单独 new 出来的线程区别在于 new 出来的 Thread 在任务结束之后也就会销毁,而且也不可以 submit 提交任务到队列。

  • 总结有如下几点:

    • 其实内部还是使用的 FixedThreadPool,只不过限制了只有 1 个线程而已
    • 该线程池里的任务只能被一个一个执行
    • newSingleThreadExecutor 返回的是一个经过代理的 ExecutorService,不能转换为 ThreadPoolExecutor,这也就意味着它只有一些 ExecutorService 的基本方法
    • 能够保证任务的执行顺序
    • 当线程出现异常时会重新创建一个线程替换 SingleThreadExecutor
    • 与单独 new 出来的 Thread 区别在于,单独 new 出来的 Thread 任务结束之后线程也就会随着结束,而且不可以 submit 提交任务到队列
  • 适用场景

    • 适用于需要保证任务被按顺序执行,并且在任何时候都不会出现多个线程的情况。
  • 演示案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ExecutorsExample {

    public static void main(String[] args) throws InterruptedException {
    useSingleThreadPool();
    }

    private static void useSingleThreadPool() throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    //因为不能转换为 ThreadPoolExecutor,所以也就没有 getPoolSize 等一些方法
    //System.out.println("poolSize: "+((ThreadPoolExecutor)executorService).getPoolSize() );
    IntStream.rangeClosed(1,10).forEach(i->executorService.execute(()->{
    try {
    //休眠 10 秒
    TimeUnit.SECONDS.sleep(10);
    System.out.println(Thread.currentThread().getName()+" ["+i+"]");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }));
    }
    }
  • 运行结果:按顺序一个一个执行,并且在执行所有任务后该线程池不会自动关闭。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    pool-1-thread-1 [1]
    pool-1-thread-1 [2]
    pool-1-thread-1 [3]
    pool-1-thread-1 [4]
    pool-1-thread-1 [5]
    pool-1-thread-1 [6]
    pool-1-thread-1 [7]
    pool-1-thread-1 [8]
    pool-1-thread-1 [9]
    pool-1-thread-1 [10]

newWorkStealingPool

  • 特点: 该方法会根据你的 CPU 核数来创建线程个数,也可指定线程数, newWorkStealingPool 其内部使用的是 ForkJoinPool,在任务全部执行完之后该线程池会自动关闭

  • 适用场景

    • 创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 数量的线程来并行执行,适用于大耗时的操作,可以并行来执行
  • 演示案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public static void main(String[] args) throws InterruptedException {
    //我的 CPU 核数是 4,所以创建了 4 个线程
    ExecutorService executorService = Executors.newWorkStealingPool();

    //生成 10 个 callable 任务
    List<Callable<String>> callableList = IntStream.rangeClosed(1, 10).boxed().map(i -> (Callable<String>) () -> {
    System.out.println("Thread " + Thread.currentThread().getName());
    TimeUnit.SECONDS.sleep(2);
    return "Task" + i;
    }).collect(Collectors.toList());

    //执行 list 集合中所有的 callable 并进行阻塞
    List<Future<String>> futureList = executorService.invokeAll(callableList);
    //获取 callable 执行完的返回值
    for (Future<String> future:futureList) {
    try {
    //进行阻塞
    String result = future.get();
    System.out.println(result);
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    }
  • 运行结果:执行完成后会自动关闭线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    Thread ForkJoinPool-1-worker-3
    Thread ForkJoinPool-1-worker-5
    Thread ForkJoinPool-1-worker-7
    Thread ForkJoinPool-1-worker-1
    Thread ForkJoinPool-1-worker-5
    Thread ForkJoinPool-1-worker-3
    Thread ForkJoinPool-1-worker-7
    Thread ForkJoinPool-1-worker-1
    Thread ForkJoinPool-1-worker-7
    Thread ForkJoinPool-1-worker-3
    Task1
    Task2
    Task3
    Task4
    Task5
    Task6
    Task7
    Task8
    Task9
    Task10

    Process finished with exit code 0

ThreadPoolExecutor

  • 由于 ThreadPoolExecutor 是继承了 AbstractExecutorService 抽象类,而 AbstractExecutorService 抽象类又实现了 ExecutorService 接口,所以有很多方法都是来自 ExecutorService 接口\

    图 2 ThreadPoolExecutor 运行流程

构造方法

  • 4 种构造方法

    1
    2
    3
    4
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • 构造方法中参数详解

    • corePoolSize
      线程池的基本大小,线程池最小就有这么多线程。线程池刚创建时并不会立即就创建 corePoolSize 个线程,而是在等待有任务过来才会创建线程,而且是一个一个创建的,当所有任务执行完后 core 的线程也不会被回收。

    • maximumPoolSize
      线程池中最大的线程数,当 workQueue 队列中的任务放不下的时候就会创建新线程,直至达到 maximumPoolSize 的值。无界的任务队列这个参数就没用了

    • keepAliveTime
      线程最大空闲时间,如果在空闲时间内,任务队列没有饱和的话就会销毁除基本线程之外的线程。

    • unit
      keepAliveTime 的时间单位。可选的单位有 Days、HOURS、MINUTES、MILLISECONDS、MICROSECONDS、NANOSECONDS。

    • workQueue
      用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

      • ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列,按 FIFO 原则进行排序
      • LinkedBlockingQueue: 一个基于链表结构的阻塞队列,吞吐量高于 ArrayBlockingQueue。静态工厂方法 Excutors.newFixedThreadPool()使用了这个队列
      • SynchronousQueue: 一个不存储元素的阻塞队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。如果线程获取的元素时该队列不存在任何元素,则该线程会被阻塞,直至有元素插入。吞吐量高于 LinkedBlockingQueue,静态工厂方法 Excutors.newCachedThreadPool()使用了这个队列
      • PriorityBlockingQueue: 一个具有优先级的无限阻塞队列。
    • threadFactory
      线程创建工厂,通常可以用来给线程命名、查看创建线程数、给线程设置是否是后台运行、设置线程优先级等等

    • handler
      拒绝策略。当队列中的元素放满并且线程池中的线程达到最大数量时,此时线程池处于饱和状态。此时就需要做出相应的策略应对,有如下四个选项:

      • AbortPolicy:默认策略,抛出异常
      • CallerRunsPolicy:使用调用者所在线程来运行该任务,不抛出异常
      • DiscardOldestPolicy:丢弃队列里最近的一个任务,然后再添加到队列中,不抛出异常
      • DiscardPolicy:直接忽略提交的任务

其他基本方法

execute 方法

  • 非阻塞方法。提交 Runnable 任务,没有返回值。

    1
    void execute(Runnable command)

getCorePoolSize 方法

  • 获取当前线程池基本线程的大小

    1
    int getCorePoolSize()

getMaximumPoolSize 方法

  • 获取当前线程池允许的最大线程数

    1
    int getMaximumPoolSize()

getQueue 方法

  • 获取任务队列,返回的是可修改的队列,我们可以直接添加任务。

  • 如果线程池内有空闲线程则会执行直接添加的任务,如果线程池是刚创建的 core 线程还没有创建则不会执行,因为直接添加的任务,线程池是收不到创建线程的信号的。

    1
    BlockingQueue<Runnable> getQueue()

getPoolSize 方法

  • 获取线程池当前的线程数

    1
    int getPoolSize()

getActiveCount 方法

  • 获取线程池当前处于活跃状态的线程数

    1
    int getActiveCount()

isTerminating 方法

  • 判断 Executor 是否正在关闭

    1
    boolean isTerminating()

allowCoreThreadTimeOut 方法

  • 设置允许将 core 线程在空闲的时候关闭,如果是用 newFixedThreadPool 创建的线程池,需要搭配 setKeepAliveTime 方法这种允许空闲的时间,因为 newFixedThreadPool 中的默认空闲时间为 0。注意当 core 的线程数关闭为 0 时,线程池会关闭。

    1
    2
    // value:是否开启
    void allowCoreThreadTimeOut(boolean value)

setKeepAliveTime 方法

  • 设置允许线程的空闲时间

    1
    2
    3
    time:时间
    unit:时间单位
    void setKeepAliveTime(long time, TimeUnit unit)

remove 方法

  • 移除队列中还未执行的任务

    1
    2
    // task:需要移除的任务
    boolean remove(Runnable task)

prestartCoreThread 方法

  • 创建线程池后立即预创建一个 Core 线程,当有任务提交时会再创建一个新线程留作备用,直到线程数到达 core 线程数。返回是否创建成功。

    1
    boolean prestartCoreThread()

prestartAllCoreThreads 方法

  • 创建线程池后立即预创建所有 Core 线程,返回创建的线程数

    1
    int prestartAllCoreThreads()

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ThreadPoolExecutorExample {

public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue(1), r -> {
return new Thread(r);
}, new ThreadPoolExecutor.AbortPolicy());

threadPoolExecutor.submit(() -> sleepSeconds(10));
threadPoolExecutor.submit(() -> sleepSeconds(10));
threadPoolExecutor.submit(() -> sleepSeconds(10));

int corePoolSize = -1;
int maxPoolSize = -1;
int activeCount = -1;
int poolSize = -1;
int queueSize = -1;
while (true) {
int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
int currentMaxPoolSize = threadPoolExecutor.getMaximumPoolSize();
int currentActiveCount = threadPoolExecutor.getActiveCount();
int currentPoolSize = threadPoolExecutor.getPoolSize();
int currentQueueSize = threadPoolExecutor.getQueue().size();
if (corePoolSize != currentCorePoolSize || maxPoolSize != currentMaxPoolSize
|| activeCount != currentActiveCount || queueSize != currentQueueSize || poolSize != currentPoolSize) {
System.out.println("CorePoolSize: " + currentCorePoolSize);
System.out.println("MaximumPoolSize: " + currentMaxPoolSize);
System.out.println("ActiveCount: " + currentActiveCount);
System.out.println("PoolSize: " + currentPoolSize);
System.out.println("BlockingQueueSize: " + currentQueueSize);
corePoolSize = currentCorePoolSize;
maxPoolSize = currentMaxPoolSize;
activeCount = currentActiveCount;
queueSize = currentQueueSize;
poolSize = currentPoolSize;
System.out.println("==============================================");
TimeUnit.MILLISECONDS.sleep(100);
}
}
}

private static void sleepSeconds(int seconds) {
try {
System.out.println("** " + Thread.currentThread().getName() + " **");
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
** Thread-0 **
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 2
PoolSize: 2
BlockingQueueSize: 1
==============================================
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 1
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 1
BlockingQueueSize: 0
==============================================
————————————————
版权声明:本文为CSDN博主「Brycen Liu」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/liuyu973971883/article/details/108003581

线程池关闭

  • 当我们的任务全部并行执行完关闭线程池后,如何串行执行我们后续代码。

    • shutdown 不是阻塞的,并行任务还没执行完我们的串行代码就被执行了。
    • 可以使用 awaitTermination 结合 shutdown 的方法,因为 awaitTermination 是阻塞的。
    • shutdown() 关闭任务,awaitTermination 关闭执行器(executor)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class ThreadPoolExecutorExample2 {
    public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
    new ArrayBlockingQueue(10), r -> {
    return new Thread(r);
    }, new ThreadPoolExecutor.AbortPolicy());

    //提交 20 个任务
    IntStream.rangeClosed(1, 20).forEach(i->{
    threadPoolExecutor.submit(() -> task(5,String.valueOf(i)));
    });

    threadPoolExecutor.shutdown();
    //阻塞,等待线程池关闭
    threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
    //下面可以进行串行执行代码
    System.out.println("========all work over========");

    }

    private static void task(int seconds,String no) {
    try {
    System.out.println(Thread.currentThread().getName()+" ["+no+"] start work");
    TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    }

线程池关闭 2

  • shutdownNow 方法会立即关闭线程池,但是也会存在关闭不了的情况,因为 shutdownNow 中其实是对正在执行任务的线程进行 interrupt 打断,如果该任务中没有抛出 interrupt 异常的方法,则该线程就不会被打断,也就结束不了

    • 如 BufferedReader 的 readLine 方法,同步 IO 中的阻塞方法没有对中断响应做任何处理。
      因此,调用 shutdown 之后,只会把工作线程的中断标志置为 true ,并不会产生实际的效果。
    • 同样的还有内置锁获取时的阻塞,也会产生这样的问题。
  • 这样的场景如:网络请求一个数据,这个数据非常庞大,耗时非常久,那么 interrupt 就不会使该线程中断,也就不会立即结束。

  • 错误演示: 虽然最后会显示未执行的任务数,但是线程池并未关闭。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class ThreadPoolExecutorExample3 {
    public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
    new ArrayBlockingQueue(10), r -> {
    return new Thread(r);
    }, new ThreadPoolExecutor.AbortPolicy());

    // 提交 20 个任务
    IntStream.rangeClosed(1, 20).forEach(i -> {
    threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));
    });

    List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();
    // 下面可以进行串行执行代码
    System.out.println("未执行的任务数:" + noRunTask.size());
    }

    private static void task(int seconds, String no) {
    System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");
    // 模拟网络请求
    while (true) {
    }
    }
    }
  • 正确演示

    • 利用 ThreadFactory,将线程池创建的线程都设置为守护线程,这样当主线程挂掉之后,这样线程也就会跟着结束了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class ThreadPoolExecutorExample3 {
    public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
    new ArrayBlockingQueue(10), r -> {
    Thread t = new Thread(r);
    //设置为守护线程
    t.setDaemon(true);
    return t;
    }, new ThreadPoolExecutor.AbortPolicy());

    // 提交 20 个任务
    IntStream.rangeClosed(1, 20).forEach(i -> {
    threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));
    });

    List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();
    // 下面可以进行串行执行代码
    System.out.println("未执行的任务数:" + noRunTask.size());
    }

    private static void task(int seconds, String no) {
    System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");
    // 模拟网络请求
    while (true) {
    }
    }
    }

关闭线程池中的 core 线程

  • 线程池中的 core 线程是基本线程,默认是不会关闭的,只有当任务队列任务饱和时创建的额外线程才会在规定空闲时间之后关

  • 那么我们如何让 core 线程也在规定空闲时间后关闭呢,下面看演示。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class ThreadPoolExecutorExample2 {

    public static void main(String[] args) throws InterruptedException {
    //创建一个 core 和最大线程数都为 5 的线程池,且该线程池的空闲时间为 0
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    //因为 newFixedThreadPool 默认创建的线程池的空闲时间为 0,所以这边我们要设置空闲时间
    threadPoolExecutor.setKeepAliveTime(5,TimeUnit.SECONDS);
    //允许关闭 core 空闲线程
    threadPoolExecutor.allowCoreThreadTimeOut(true);

    //在没有接受到任务前,线程池内线程数为 0
    System.out.println(threadPoolExecutor.getPoolSize());
    threadPoolExecutor.execute(() -> sleepSeconds(1));
    threadPoolExecutor.execute(() -> sleepSeconds(1));
    //休眠一下,确保任务被提交进入线程池
    TimeUnit.MILLISECONDS.sleep(500);
    //此时因为提交了两个任务,所以线程池启动了两个线程
    System.out.println(threadPoolExecutor.getPoolSize());
    //休眠一下,查看线程池中剩余线程数
    TimeUnit.SECONDS.sleep(7);
    System.out.println(threadPoolExecutor.getPoolSize());
    }

    private static void sleepSeconds(int seconds) {
    try {
    System.out.println("** " + Thread.currentThread().getName() + " **");
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 运行结果: core 线程数减为 0 时,线程池自动关闭

    1
    2
    3
    4
    5
    6
    7
    0
    ** Thread-0 **
    ** Thread-1 **
    2
    0

    Process finished with exit code 0

提前创建线程池中的线程

  • 在默认情况下,线程池刚创建的时候线程数是 0,只有在接受到任务的时候才会去创建 core 线程,
  • 那么下面将演示如何让线程池提前创建 core 线程。

prestartCoreThread

  • prestartCoreThread 该方法会使线程池刚创建的时候就会预创建一个线程留作备用,当有一个任务提交后会随即再创建一个线程去备用,直至线程数达到 core 的线程数为止。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class ThreadPoolExecutorExample3 {

    public static void main(String[] args) throws InterruptedException {
    //利用 Executors 工厂创建一个 core 线程和最大线程数都为 5 且空闲时间为 0 的线程池
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

    //提前创建一个线程,如果有任务提交,那么会再创建一个线程,保证总有一个 core 线程是空闲的,最大为 core 线程数,达到 core 线程数后就不会在预创建了。
    threadPoolExecutor.prestartCoreThread();
    //这边输出 1,因为我们预创建了一个线程
    System.out.println(threadPoolExecutor.getPoolSize());
    //提交一个任务
    threadPoolExecutor.execute(() -> sleepSeconds(1));
    TimeUnit.MILLISECONDS.sleep(500);
    //此时 core 线程池中的线程数为 2,因为我们总是预创建了一个线程
    System.out.println(threadPoolExecutor.getPoolSize());
    }

    private static void sleepSeconds(int seconds) {
    try {
    System.out.println("** " + Thread.currentThread().getName() + " **");
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 运行结果:

    1
    2
    3
    1
    ** pool-1-thread-2 **
    2

prestartAllCoreThreads

  • 预创建所有 core 线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class ThreadPoolExecutorExample4 {

    public static void main(String[] args) throws InterruptedException {
    //利用 Executors 工厂创建一个 core 线程和最大线程数都为 5 且空闲时间为 0 的线程池
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    //提前创建所有 core 线程
    threadPoolExecutor.prestartAllCoreThreads();
    //这边输出 5,因为我们预创建了所有 core 线程
    System.out.println(threadPoolExecutor.getPoolSize());
    }
    }
  • 运行结果: 5

ScheduledThreadPoolExecutor

  • 因为 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 类,所以有很多方法都是来自 ThreadPoolExecutor 类的,这里就不做解释了

构造方法

  • 构造一个 Schedule 线程池,最大线程数为 Integer 的最大值,线程的空闲时间为 0,队列采用的是 DelayedWorkQueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * corePoolSize:线程池核心线程数
    * threadFactory:线程工厂
    * handler:任务拒绝策略
    * /
    ScheduledThreadPoolExecutor(int corePoolSize)
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
    ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)

schedule 方法

  • 延时执行 runnable 或者 callable 任务。执行 runnable 任务时是没有结果返回的,那为什么还会返回 ScheduledFuture,因为我们可以通过 Future 做一些取消任务等操作。

    1
    2
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

scheduleAtFixedRate 方法

  • 固定周期性执行任务,当任务的执行时长大于周期,那么下一个周期任务将在上一个执行完毕之后马上执行。

    1
    2
    3
    4
    5
    6
    7
    /**
    * command:runnable 任务
    * initialDelay:任务首次执行前的延迟时间
    * period:周期时间
    * unit:时间单位
    * /
    ScheduledFuture <?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

scheduleWithFixedDelay 方法

  • 固定延时执行任务,也是周期性任务,和 scheduleAtFixedRate 不同的是:scheduleAtFixedRate 当任务执行时间小于周期时间时,此时周期时间到了的时候会进入下一周期,如果任务执行时间大于周期时间时,任务结束后会立即进入下一周期;

  • 而 scheduleWithFixedDelay 是无论你任务时间是否超过,都将会在你任务执行完毕后延迟固定秒数,才会进入下一周期。

    1
    2
    3
    4
    5
    6
    7
    /**
    * command:runnable 任务
    * initialDelay:任务首次执行前的延迟时间
    * delay:延时时间
    * unit:时间单位
    * /
    ScheduledFuture <?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

setContinueExistingPeriodicTasksAfterShutdownPolicy

  • 默认为 false。在线程池执行 shutdown 方法后是否继续执行 scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法提交的任务

    1
    void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)

setExecuteExistingDelayedTasksAfterShutdownPolicy

  • 默认为 true,在线程池执行 shutdown 方法后,需要等待当前正在等待的任务的和正在运行的任务被执行完,然后进程被销毁。

  • 为 false 时,表示放弃等待的任务,正在运行的任务一旦完成,则进程被销毁。

    1
    void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)

schedule 练习

  • schedule 的使用外,还包含了如何取消任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class ScheduledExecutorServiceExample {
    public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
    //2 秒后执行 runnable 任务
    scheduledThreadPoolExecutor.schedule(() -> {
    System.out.println("This is runable1 task");
    }, 2, TimeUnit.SECONDS);

    //提交一个 2 秒后才执行的 runnable 任务
    //既然 runnable 无法返回结果, 为什么还要有 Future 呢, 因为我们可以通过 Future 进行取消任务等操作
    ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.schedule(() -> {
    System.out.println("This is runable2 task");
    }, 2, TimeUnit.SECONDS);
    //取消任务
    runnableFuture.cancel(true);

    //休眠 3 秒, 确保上面的任务都被执行完
    mySleep(3);
    System.out.println("========================");
    }
    private static void mySleep(int seconds){
    try {
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    This is runable1 task
    ========================

scheduleAtFixedRate 练习

  • 周期性执行某个任务,执行到一定之间后取消任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class ScheduledExecutorServiceExample2 {
    public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
    //提交延迟 1 秒执行, 周期为 2 秒的 runnable 任务, 虽然 runnable 没有返回结果, 但是可以通过 runnable 取消任务
    ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
    System.out.println("This is runable task running "+Thread.currentThread().getName());
    }, 1,2, TimeUnit.SECONDS);

    //休眠 8 秒
    mySleep(8);
    //取消该循坏任务
    runnableFuture.cancel(true);
    }
    private static void mySleep(int seconds){
    try {
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 可以看出每个周期执行的任务并不是同一个线程,周期时间到的时候只是将任务扔到线程池的任务队列中由空闲线程获取它的执行权。

    1
    2
    3
    4
    This is runable task running pool-1-thread-1
    This is runable task running pool-1-thread-1
    This is runable task running pool-1-thread-1
    This is runable task running pool-1-thread-2

scheduleAtFixedRate 练习 2

  • 超时的周期性任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class ScheduledExecutorServiceExample3 {
    public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
    AtomicLong atomicLong = new AtomicLong(0L);
    //提交初始延迟 1 秒执行, 固定周期为 2 秒的 runnable 任务
    ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
    //记录当前时间
    Long current = System.currentTimeMillis();
    //判断是否为第一次运行
    if (atomicLong.get()==0){
    atomicLong.set(current);
    System.out.printf("first running [%d]\n",atomicLong.get());
    }else{
    //记录与上次的间隔时间
    System.out.printf("running time:[%d]\n",current-atomicLong.get());
    }
    //将当前时间保存
    atomicLong.set(current);
    //模拟超过固定周期时间
    mySleep(5);
    }, 1,2, TimeUnit.SECONDS);

    }
    private static void mySleep(int seconds){
    try {
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 运行结果: 可以看出,超出周期时间时,任务完成后立即就进入了下一周期

    1
    2
    3
    4
    5
    first running [1597659726690]
    running time:[5042]
    running time:[5001]
    running time:[5000]
    running time:[5001]

scheduleWithFixedDelay 练习

  • scheduleWithFixedDelay 练习

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class ScheduledExecutorServiceExample4 {
    public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
    AtomicLong atomicLong = new AtomicLong(0L);
    //提交初始延迟 1 秒执行, 延迟为 2 秒的 runnable 任务
    ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
    //记录当前时间
    Long current = System.currentTimeMillis();
    //判断是否为第一次运行
    if (atomicLong.get()==0){
    atomicLong.set(current);
    System.out.printf("first running [%d]\n",atomicLong.get());
    }else{
    //记录与上次的间隔时间
    System.out.printf("running time:[%d]\n",current-atomicLong.get());
    }
    //将当前时间保存
    atomicLong.set(current);
    //模拟超过固定周期时间
    mySleep(5);
    }, 1,2, TimeUnit.SECONDS);

    }
    private static void mySleep(int seconds){
    try {
    TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 运行结果: 可以看出来,无论你的任务执行多久,在任务执行完毕之后都会延迟一定时间才进入下一周期。

    1
    2
    3
    4
    5
    6
    first running [1597659862349]
    running time:[7047]
    running time:[7002]
    running time:[7023]
    running time:[7002]
    running time:[7003]

setContinueExistingPeriodicTasksAfterShutdownPolicy 练习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ScheduledExecutorServiceExample5 {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
//提交固定周期任务
ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("This is runable task running "+Thread.currentThread().getName());
}, 1,2, TimeUnit.SECONDS);
//默认情况关闭线程池后是不允许继续执行固定周期任务的,所有输出 false
System.out.println(scheduledThreadPoolExecutor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
//设置为 true
scheduledThreadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
//休眠 1200 毫秒,确保任务被执行
mySleep(1200);
//关闭线程池
scheduledThreadPoolExecutor.shutdown();
//休眠 2000 毫秒后查看线程池状态
mySleep(2000);
//线程池的状态
System.out.println("isShutdown:"+scheduledThreadPoolExecutor.isShutdown());
System.out.println("isTerminating:"+scheduledThreadPoolExecutor.isTerminating());
System.out.println("isTerminated:"+scheduledThreadPoolExecutor.isTerminated());
}

private static void mySleep(int milliSeconds){
try {
TimeUnit.MILLISECONDS.sleep(milliSeconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
  • 运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    false
    This is runable task running pool-1-thread-1
    This is runable task running pool-1-thread-1
    isShutdown:true
    isTerminating:true
    isTerminated:false
    This is runable task running pool-1-thread-1
    This is runable task running pool-1-thread-1
    ...

SpringBoot 异步操作

线程池配置

  • 线程池配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    /**
    * 线程池配置
    *
    * @author fulsun
    * @date 8/11/2021
    */
    @Configuration
    public class ThreadPoolConfig {

    // 核心线程池大小
    private int corePoolSize = 50;

    // 最大可创建的线程数
    private int maxPoolSize = 200;

    // 队列最大长度
    private int queueCapacity = 1000;

    // 线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 300;

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(maxPoolSize);
    executor.setCorePoolSize(corePoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setKeepAliveSeconds(keepAliveSeconds);
    // 线程池对拒绝任务(无线程可用)的处理策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
    }

    /**
    * 执行周期性或定时任务
    */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService() {
    return new ScheduledThreadPoolExecutor(corePoolSize,
    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true)
    .build()) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ThreadsUtils.printException(r, t);
    }
    };
    }
    }

异步任务管理器

  • ScheduledExecutorService 的用法主要有三个:

    • 延时任务

      1
      2
      3
      public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
      public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    • 循环任务,按照上一次任务的 发起时间 计算下一次任务的开始时间

      • 上一个任务的开始时间 + 延迟时间 = 下一个任务的开始时间
      1
      2
      3
      4
      5
      6
      7
      8
      command—要执行的任务
      initialDelay -延迟第一次执行的时间
      Period—连续执行之间的时间间隔
      unit - initialDelay和period参数的时间单位
      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
      long initialDelay,
      long period,
      TimeUnit unit);
    • 循环任务,以上一次任务的 结束时间 计算下一次任务的开始时间

      • 上一次任务的结束时间 + 延迟时间 = 下一次任务的开始时间
      1
      2
      3
      4
      5
      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
      long initialDelay,
      long delay,
      TimeUnit unit);

  • 使用单例模式创建管理器,操作 ScheduledThreadPoolExecutor,提供任务执行接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class AsyncManager {

    /**
    * 操作延迟 10 毫秒
    */
    private final int OPERATE_DELAY_TIME = 10;

    /**
    * 异步操作任务调度线程池
    */
    private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");

    /**
    * 单例模式
    */
    private AsyncManager() {
    }

    private static AsyncManager me = new AsyncManager();

    public static AsyncManager me() {
    return me;
    }

    /**
    * 执行任务
    *
    * @param task 任务
    */
    public void execute(TimerTask task) {
    executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }

    /**
    * 停止任务线程池
    */
    public void shutdown() {
    Threads.shutdownAndAwaitTermination(executor);
    }
    }

线程工具类

  • 线程相关工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package pers.fulsun.common.utils;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 线程相关工具类.
*
* @author fulsun
* @date 8/12/2021
*/
public class ThreadsUtils {

private static final Logger logger = LoggerFactory.getLogger(ThreadsUtils.class);

/**
* 打印线程异常信息
*/
public static void printException(Runnable r, Throwable t) {
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
logger.error(t.getMessage(), t);
}
}

/**
* 停止线程池
* 先使用 shutdown, 停止接收新任务并尝试完成所有已存在任务.
* 如果超时, 则调用 shutdownNow, 取消在 workQueue 中 Pending 的任务, 并中断所有阻塞函数.
* 如果仍人超時,則強制退出.
* 另对在 shutdown 时线程本身被调用中断做了处理.
*/
public static void shutdownAndAwaitTermination(ScheduledExecutorService pool) {
if (pool != null && !pool.isShutdown()) {
pool.shutdown();
try {
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
pool.shutdownNow();
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
logger.info("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

测试异步调用

  • 提供测试接口,执行二个异步操作(耗时共 5s)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @GetMapping("/test/aysc")
    public static void ayscTest() {
    long start = System.currentTimeMillis();
    System.out.println("AsyncManagerTest.execute()..." + Thread.currentThread().getName());
    AsyncManager.me().execute(new TimerTask() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + "start");
    try {
    Thread.sleep(2000);
    System.out.println(Thread.currentThread().getName() + "end");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    AsyncManager.me().execute(new TimerTask() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + "start");
    try {
    Thread.sleep(3000);
    System.out.println(Thread.currentThread().getName() + "end");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });

    long end = System.currentTimeMillis();
    System.out.println("总共耗时: " + (end - start));
    }
  • 测试结果

    1
    2
    3
    4
    5
    6
    AsyncManagerTest.execute()...http-nio-8090-exec-22
    总共耗时: 2ms
    schedule-pool-3start
    schedule-pool-4start
    schedule-pool-3end
    schedule-pool-4end

使用@Async

线程池相关配置

  • application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    spring:
    task:
    execution:
    pool:
    # 最大线程数
    max-size: 16
    # 核心线程数
    core-size: 16
    # 存活时间
    keep-alive: 10s
    # 队列大小
    queue-capacity: 100
    # 是否允许核心线程超时
    allow-core-thread-timeout: true
    # 线程名称前缀
    thread-name-prefix: async-task-

@EnableAsync 注解

  • 需要在 Application 上添加 @EnableAsync 注解,开启异步任务。如果是选择在代码里面写 config,则需要在 config 文件上添加@EnableAsync 注解。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @EnableAsync
    @SpringBootApplication
    public class ThreadpoolApplication {

    public static void main(String[] args) {
    SpringApplication.run(ThreadpoolApplication.class, args);
    }

    }

AsyncTask

  • 编写一个异步任务处理类,在需要开启异步的方法上面添加@Async

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    @Slf4j
    public class AsyncTask {
    @Async
    public void asyncRun() throws InterruptedException {
    Thread.sleep(10);
    log.info(Thread.currentThread().getName()+":处理完成");
    }
    }

AsyncService

  • 编写一个调用异步方法的 service

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Service
    @Slf4j
    public class AsyncService {
    @Autowired
    private AsyncTask asyncTask;

    public void asyncSimpleExample() {
    try {
    log.info("service start");
    asyncTask.asyncRun();
    log.info("service end");
    }catch (InterruptedException e){
    e.printStackTrace();
    }
    }

    }
  • 编写一个 Controller 去调用 AsyncService

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Controller
    @RequestMapping("/")
    public class AsyncController {
    @Autowired
    private AsyncService asyncService;
    @PostMapping("/asyncSimpleExample")
    @ResponseBody
    public void asyncSimpleExample(){
    asyncService.asyncSimpleExample();
    }
    }

@Async 和@EnableAsync 的原理