定时任务

  • 规定在什么时间做什么事情或执行什么命令操作(相当于会做事的闹钟)

  • 定时任务的应用场景十分广泛,如视频网站的定时发放成长值,程序定时清理临时附件,定时生成日志报表,数据库定时同步等

  • jdk自带库中,两种技术可以实现定时任务,一种是Timer,一种是ScheduledThreadPoolExecutor

    • Timer是单线程,如果开启多个线程服务,将会出现竞争,一旦出现异常,线程停止,定时任务停止;
    • ScheduledThreadPoolExecutor基于线程池实现,多线程,且自动调整线程数,线程出错并不会影响整体定时任务执行。
    • Timer兼容性更高,jdk1.3后可使用
    • ScheduledThreadPoolExecutor在jdk1.5后方可使用

Timer

  • Timer是一个线程,控制执行TimerTask所需要执行的内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class Timer {
    /**
    * The timer task queue. This data structure is shared with the timer
    * thread. The timer produces tasks, via its various schedule calls,
    * and the timer thread consumes, executing timer tasks as appropriate,
    * and removing them from the queue when they're obsolete.
    */
    private final TaskQueue queue = new TaskQueue();
    /**
    * The timer thread.
    */
    private final TimerThread thread = new TimerThread(queue);
    }

延时执行

1
2
3
4
5
6
//其中的delay是延时时间,表示多少毫秒后执行一次task
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}

指定时间节点执行

1
2
3
4
//到达指定时间time的时候执行一次task
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}

延时周期执行

1
2
3
4
5
6
7
8
//经过delay毫秒后按每period毫秒执行一次的周期执行task
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}

指定时间节点后周期执行

1
2
3
4
5
6
//到达指定时间firstTime之后按照每period毫秒执行一次的周期执行task
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}

TimerTask

  • TimerTask是一个实现了Runable接口的类,所以能够放到线程去执行

    1
    2
    3
    4
    5
    6
    public abstract class TimerTask implements Runnable {
    /**
    * This object is used to control access to the TimerTask internals.
    */
    final Object lock = new Object();
    }

示例

  • 代码结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class TimerTest {
    public static void main(String[] args) {
    Timer timer = new Timer();
    Task task = new Task();
    // 当前时间开始,每秒定时执行
    timer.schedule(task,new Date(),1000);
    }

    }
    class Task extends TimerTask {
    @Override
    public void run() {
    System.out.println(new Date()+": This is my job...");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 通过结果可以看出,下一个任务是在上一个结束后开始的

    1
    2
    3
    4
    Mon Sep 20 12:09:52 CST 2021:  This is my job...
    Mon Sep 20 12:09:54 CST 2021: This is my job...
    Mon Sep 20 12:09:56 CST 2021: This is my job...
    Mon Sep 20 12:09:58 CST 2021: This is my job...
  • 弊端:Timer是单线程的,一旦定时任务中某一过程时刻抛出异常,将会导致整体线程停止,定时任务停止。

ThreadPoolExecutor

ScheduledThreadPoolExecutor

  • 继承了ThreadPoolExecutor,是一个基于线程池的调度器
  • 通过实现ScheduledExecutorService接口方法去实现任务调度,主要方法如下

延时执行

1
2
3
4
5
6
7
8
9
10
11
12
//command是待执行的线程,delay表示延时时长,unit代表时间单位
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}

延时周期执行

  • 下一个任务的时间是在上个任务完成后执行,至少间隔period

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //command是待执行的线程,initialDelay表示延时时长,period代表执行间隔时长,unit代表时间单位
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (period <= 0)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }

每段延时间隔执行

  • 下一个任务的时间是在上个任务完成后加上间隔时间(delay)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //command是待执行的线程,initialDelay表示延时时长,delay代表每次执行线程前的延时时长,unit代表时间单位
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (delay <= 0)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }

示例

  • 示例代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class JavaScheduledThreadPoolExecutor {
    public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
    System.out.println(new Date());
    //延时1秒后开始执行,每3秒执行一次
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    System.out.println(new Date()+": This is my job...");
    }
    }, 1, 3, TimeUnit.SECONDS);
    }
    }
  • 结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Mon Sep 20 12:20:55 CST 2021
    # 延时1秒后开始执行
    Mon Sep 20 12:20:56 CST 2021: This is my job...
    # 每3秒(间隔周期)执行一次,这里超过了period,任务用了4s
    Mon Sep 20 12:21:00 CST 2021: This is my job...
    Mon Sep 20 12:21:04 CST 2021: This is my job...
    Mon Sep 20 12:21:08 CST 2021: This is my job...
    Mon Sep 20 12:21:12 CST 2021: This is my job...
    Mon Sep 20 12:21:16 CST 2021: This is my job...

Spring定时任务

Scheduled注解

  • Spring原生定时任务主要依靠@Scheduled注解实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Repeatable(Schedules.class)
    public @interface Scheduled {
    String CRON_DISABLED = "-";
    String cron() default ""; //类似于corn表达式,可以指定定时任务执行的延迟及周期规则

    String zone() default ""; //指明解析cron表达式的时区。
    long fixedDelay() default -1; //在最后一次调用结束和下一次调用开始之间以固定周期(以毫秒为单位)执行带注解的方法。(要等待上次任务完成后)
    String fixedDelayString() default ""; //同上面作用一样,只是String类型
    long fixedRate() default -1; //在调用之间以固定的周期(以毫秒为单位)执行带注解的方法。(不需要等待上次任务完成)

    String fixedRateString() default ""; //同上面作用一样,只是String类型
    long initialDelay() default -1; //第一次执行fixedRate()或fixedDelay()任务之前延迟的毫秒数 。
    String initialDelayString() default ""; //同上面作用一样,只是String类型
    }

静态定时任务示例

  • 编写一个定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Slf4j
    @Component
    @EnableScheduling
    public class TestJob {
    //每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ?")
    public void run() {
    log.info("Current time is :: " + Calendar.getInstance().getTime());
    }
    }
  • 启动项目,执行结果:

    1
    2
    3
    4
    5
    2021-09-20 17:50:10.002  INFO 5416 --- [   scheduling-1] pers.fulsun.Test.TestJob                 : Current time is :: Mon Sep 20 17:50:10 CST 2021
    2021-09-20 17:50:20.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:20 CST 2021
    2021-09-20 17:50:30.003 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:30 CST 2021
    2021-09-20 17:50:40.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:40 CST 2021
    2021-09-20 17:50:50.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:50 CST 2021

@Scheduled原理

  • 项目启动,扫描带有注解@Scheduled的所有方法信息,由ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法实现功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
    bean instanceof ScheduledExecutorService) {
    // Ignore AOP infrastructure such as scoped proxies.
    return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
    AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
    // 获取含有@Scheduled注解的方法
    Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
    method, Scheduled.class, Schedules.class);
    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
    });
    if (annotatedMethods.isEmpty()) {
    this.nonAnnotatedClasses.add(targetClass);
    if (logger.isTraceEnabled()) {
    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
    }
    }
    else {
    // Non-empty set of methods
    annotatedMethods.forEach((method, scheduledMethods) ->
    // 调用processScheduled方法将定时任务方法存放到任务队列中
    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
    if (logger.isTraceEnabled()) {
    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
    "': " + annotatedMethods);
    }
    }
    }
    return bean;
    }

  • processScheduled方法处理@Scheduled注解后面的参数,并将其添加到任务列表中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
    // 创建任务线程
    Runnable runnable = createRunnable(bean, method);
    boolean processedSchedule = false;
    String errorMessage =
    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

    Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

    // 解析任务执行初始延迟
    long initialDelay = scheduled.initialDelay();
    String initialDelayString = scheduled.initialDelayString();
    if (StringUtils.hasText(initialDelayString)) {
    Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
    if (this.embeddedValueResolver != null) {
    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
    }
    if (StringUtils.hasLength(initialDelayString)) {
    try {
    initialDelay = parseDelayAsLong(initialDelayString);
    }
    catch (RuntimeException ex) {
    throw new IllegalArgumentException(
    "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
    }
    }
    }

    // 解析cron表达式
    String cron = scheduled.cron();
    if (StringUtils.hasText(cron)) {
    String zone = scheduled.zone();
    if (this.embeddedValueResolver != null) {
    cron = this.embeddedValueResolver.resolveStringValue(cron);
    zone = this.embeddedValueResolver.resolveStringValue(zone);
    }
    if (StringUtils.hasLength(cron)) {
    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
    processedSchedule = true;
    if (!Scheduled.CRON_DISABLED.equals(cron)) {
    TimeZone timeZone;
    if (StringUtils.hasText(zone)) {
    timeZone = StringUtils.parseTimeZoneString(zone);
    }
    else {
    timeZone = TimeZone.getDefault();
    }
    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
    }
    }
    }

    // At this point we don't need to differentiate between initial delay set or not anymore
    if (initialDelay < 0) {
    initialDelay = 0;
    }

    // 解析fixedDelay参数
    long fixedDelay = scheduled.fixedDelay();
    if (fixedDelay >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    // 存放任务到任务队列中
    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
    }
    String fixedDelayString = scheduled.fixedDelayString();
    if (StringUtils.hasText(fixedDelayString)) {
    if (this.embeddedValueResolver != null) {
    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
    }
    if (StringUtils.hasLength(fixedDelayString)) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    try {
    fixedDelay = parseDelayAsLong(fixedDelayString);
    }
    catch (RuntimeException ex) {
    throw new IllegalArgumentException(
    "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
    }
    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
    }
    }

    // 解析fixedRate参数
    long fixedRate = scheduled.fixedRate();
    if (fixedRate >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
    }
    String fixedRateString = scheduled.fixedRateString();
    if (StringUtils.hasText(fixedRateString)) {
    if (this.embeddedValueResolver != null) {
    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
    }
    if (StringUtils.hasLength(fixedRateString)) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    try {
    fixedRate = parseDelayAsLong(fixedRateString);
    }
    catch (RuntimeException ex) {
    throw new IllegalArgumentException(
    "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
    }
    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
    }
    }

    // Check whether we had any attribute set
    Assert.isTrue(processedSchedule, errorMessage);

    // Finally register the scheduled tasks
    // 并发控制将任务队列存入注册任务列表
    synchronized (this.scheduledTasks) {
    Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
    regTasks.addAll(tasks);
    }
    }
    catch (IllegalArgumentException ex) {
    throw new IllegalStateException(
    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
    }

  • 将任务解析并添加到任务队列后,交由ScheduledTaskRegistrar类的scheduleTasks方法添加(注册)定时任务到环境中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    protected void scheduleTasks() {
    if (this.taskScheduler == null) {
    //获取ScheduledExecutorService对象,实际上都是使用ScheduledThreadPoolExecutor执行定时任务调度
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
    for (TriggerTask task : this.triggerTasks) {
    addScheduledTask(scheduleTriggerTask(task));
    }
    }
    if (this.cronTasks != null) {
    for (CronTask task : this.cronTasks) {
    addScheduledTask(scheduleCronTask(task));
    }
    }
    if (this.fixedRateTasks != null) {
    for (IntervalTask task : this.fixedRateTasks) {
    addScheduledTask(scheduleFixedRateTask(task));
    }
    }
    if (this.fixedDelayTasks != null) {
    for (IntervalTask task : this.fixedDelayTasks) {
    addScheduledTask(scheduleFixedDelayTask(task));
    }
    }
    }
    private void addScheduledTask(@Nullable ScheduledTask task) {
    if (task != null) {
    this.scheduledTasks.add(task);
    }
    }
  • 调用scheduleCronTask初始化定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
    scheduledTask = new ScheduledTask(task);
    newTask = true;
    }
    if (this.taskScheduler != null) {
    scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
    addCronTask(task);
    this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
    }
  • 在scheduleTasks中,会对线程池进行初始化,线程池的核心线程数量为1

    1
    2
    3
    4
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1));
    }
  • 由上述源码可以看出,Spring原生定时任务的大概步骤如下:

    1. 扫描带@Scheduled注解的类和方法
    2. 将定时任务解析完成后加入任务队列
    3. 将定时任务注册到当前运行环境,等待执行
    4. 且@Scheduled的底层调度实现是ScheduledThreadPoolExecutor

Spring定时任务动态启停

  • 模拟上述过程,将扫描注解改为从数据库获取数据,后续的任务队列和任务注册都可以不变,可以实现数据库定时任务配置。

问题点

  • 如何动态启停和更改定时任务执行规则?

    • 数据库中可以存储定时任务的类名,方法名,是否启用及执行规律(使用cron表达式),每次项目启动时都可以做到动态读取数据库记录添加定时任务。
  • 可是如果动态更改定时任务配置,而又不用重启生效,如何改变当前运行的任务线程?

    • 动态移除和重新添加进程即可,即操作ScheduledTaskRegistrar注册器实现定时任务的动态添加和移除。
  • 步骤

    1. sys_schedule存储定时任务配置信息
    2. 程序启动后从数据库中获取配置信息List<SysSchedule>, 组合成线程任务 SchedulingRunnable (实现Runnable接口)
    3. SchedulingRunnable 中通过类名反射执行方法
    4. 通过 SchedulingRunnable 和Cron表达式构建 cronTask
    5. 通过 TaskScheduler.schedule()注册到环境中,schedule方法返回的是ScheduledFuter,Future接口中有 cancle ,isCancelled ,isDone方法
    6. 使用ScheuleTask类封装 ScheduledFuter
    7. 通过注册类保留定时任务信息,使用Map<Runable,ScheuleTask>, Runable定义了方法名和类名,ScheuleTask中定义了定时任务的状态信息。通过对futer的操作完成动态启动停止
      • 添加 map.put
      • 修改:先删除旧任务 futer.cancle,然后更新map中task的value值
      • 删除:先删除旧任务 futer.cancle,后移除map中的key

实现

数据库表

  • 创建数据库表格 sys_schedule,用于存储定时任务配置信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    -- ----------------------------
    -- Table structure for sys_schedule 定时任务表格
    -- ----------------------------
    DROP TABLE IF EXISTS `sys_schedule`;
    CREATE TABLE `sys_schedule` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
    `sche_bean_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '定时任务指定类注册名',
    `sche_method` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '定时任务类执行方法,默认使用run()方法',
    `sche_cron` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '定时任务执行规则cron表达式',
    `sche_desc` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '定时任务描述',
    `sche_status` int(255) NOT NULL COMMENT '定时任务状态',
    `del_flag` int(255) NOT NULL COMMENT '是否删除',
    `create_by` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人',
    `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
    `update_by` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '最后更新人',
    `update_time` timestamp NULL DEFAULT NULL COMMENT '最后更新时间',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

新建system-job模块

  • 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    <dependencies>
    <!--作为web项目存在-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--eureka 客户端-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <!--实时健康监控-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!--配置客户端-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-config-client</artifactId>
    </dependency>

    <dependency>
    <groupId>pers.fulsun</groupId>
    <artifactId>common-data</artifactId>
    </dependency>

    <dependency>
    <groupId>pers.fulsun</groupId>
    <artifactId>common-swagger</artifactId>
    </dependency>
    </dependencies>

    <!-- 添加spring-boot的maven插件,不能少,打jar包时得用 -->
    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>
  • 添加配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #springboot 只认application.properties和application.yml
    server:
    port: 8888 #服务端口
    spring:
    profiles:
    active: dev #当前生效环境
    application:
    name: springcloud-job #指定应用的唯一标识/服务名
    # 配置中心
    cloud:
    config:
    name: ${spring.application.name},datasource,redis #指定工程于config server中的应用名
    profile: ${spring.profiles.active} #指定工程于config server中的生效环境
    uri: http://localhost:8080 #指定配置中心的注册路径
  • 生成SysSchedule类及相关操作类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Data
    @EqualsAndHashCode(callSuper = true)
    @Accessors(chain = true)
    @TableName("sys_schedule")
    @ApiModel(value="SysSchedule对象", description="定时任务操作类")
    public class SysSchedule extends BaseEntity {
    private static final long serialVersionUID = 1L;
    @ApiModelProperty(value = "主键id")
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @ApiModelProperty(value = "定时任务类全路径")
    @TableField("sche_bean_name")
    private String scheBeanName;

    @ApiModelProperty(value = "定时任务类执行方法,默认使用run()方法")
    @TableField("sche_method")
    private String scheMethod;

    @ApiModelProperty(value = "定时任务执行规则cron表达式")
    @TableField("sche_cron")
    private String scheCron;

    @ApiModelProperty(value = "定时任务描述")
    @TableField("sche_desc")
    private String scheDesc;

    @ApiModelProperty(value = "定时任务状态")
    @TableField("sche_status")
    private Integer scheStatus;

    @ApiModelProperty(value = "是否删除")
    @TableField(value = "del_flag",fill = FieldFill.INSERT)
    @TableLogic
    private Integer delFlag;
    }
  • 启动类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @EnableScheduling //开启定时任务
    @EnableEurekaClient
    @SpringBootApplication
    public class SpringCloudJobApplication {
    public static void main(String[] args) {
    SpringApplication.run(SpringCloudJobApplication.class, args);
    }
    }

自定义线程类

  • 自定义线程类SchedulingRunnable ,用于反射执行数据库中配置的类及定时方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    package pers.fulsun.demo.springcloud.helper;

    import io.netty.util.internal.ObjectUtil;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.ObjectUtils;
    import org.springframework.util.ReflectionUtils;
    import pers.fulsun.demo.springcloud.utils.SpringUtil;

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.util.Objects;

    @Slf4j
    @AllArgsConstructor
    @Getter
    public class SchedulingRunnable implements Runnable {
    /**
    * 定时任务指定Bean类名
    */
    private String scheBeanName;
    /**
    * 定时任务指定Bean的方法名
    */
    private String scheMethod;

    @Override
    public void run() {
    long startTime = System.currentTimeMillis();
    log.info("执行对应的bean方法:{}.{}(),当前时间:{}", scheBeanName, scheMethod, startTime);
    //利用反射生成对应的bean实例并执行指定方法,指定bean可能涉及数据库或者Spring体系的内容,所以统一从spring容器中获取
    Object target = SpringUtil.getBeanByName(this.getScheBeanName());
    Method method = null;
    //执行实例判空
    if (ObjectUtils.isNotEmpty(target)) {
    try {
    //获取指定方法
    method = target.getClass().getDeclaredMethod(scheMethod);
    } catch (NoSuchMethodException e) {
    if (log.isErrorEnabled()) {
    log.error(e.getMessage());
    }
    }
    if (ObjectUtils.isNotEmpty(method)) {
    //使该方法可访问可执行
    ReflectionUtils.makeAccessible(method);
    try {
    method.invoke(target, null);
    } catch (IllegalAccessException e) {
    } catch (InvocationTargetException e) {
    e.printStackTrace();
    }
    }
    if (log.isDebugEnabled()) {
    log.debug("方法执行完毕,耗时:{}", System.currentTimeMillis() - startTime);
    }
    }
    }

    /**
    * 重写equals方法,否则移除定时任务将出现无法移除的情况,因为无法匹配
    *
    * @param o
    * @return
    */
    @Override
    public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    SchedulingRunnable that = (SchedulingRunnable) o;
    return scheBeanName.equals(that.scheBeanName) &&
    scheMethod.equals(that.getScheMethod());
    }

    @Override
    public int hashCode() {
    return Objects.hash(scheBeanName, scheMethod);
    }
    }

自定义ScheduledTask,

  • 去除原生ScheduledTask构造函数的task参数依赖,仅实现原生的cancel方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public final class ScheduledTask {
    static ScheduledFuture<?> future;
    /**
    * 定时任务取消
    */
    public void cancel(){
    ScheduledFuture<?> future = this.future;
    if(ObjectUtil.isNotNull(future)){
    future.cancel(true);
    }
    }
    }

自定义定时任务注册类

  • 代替上述的ScheduledTaskRegistrar

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @Slf4j
    @Component
    public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public void addCronTask(Runnable task, String cronExpression) {
    addCronTask(new CronTask(task, cronExpression));
    }
    public void addCronTask(CronTask cronTask) {
    if(log.isDebugEnabled()){
    log.debug("注册新定时任务");
    }
    if (cronTask != null) {
    Runnable task = cronTask.getRunnable();
    if (this.scheduledTasks.containsKey(task)) {
    removeCronTask(task);
    }
    this.scheduledTasks.put(task, scheduleCronTask(cronTask));
    }
    }
    public void removeCronTask(Runnable task) {
    if(log.isDebugEnabled()){
    log.debug("移除定时任务");
    }
    boolean b = this.scheduledTasks.containsKey(task);
    ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
    if (scheduledTask != null)
    scheduledTask.cancel();
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
    if(log.isDebugEnabled()){
    log.debug("构建定时任务");
    }
    ScheduledTask scheduledTask = new ScheduledTask();
    scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
    return scheduledTask;
    }
    @Override
    public void destroy() {
    for (ScheduledTask task : this.scheduledTasks.values()) {
    task.cancel();
    }
    this.scheduledTasks.clear();
    }
    }

配置系统定时任务线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Configuration
public class ScheduleConfig {
/**
* 定时任务线程池对象
* @return
*/
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
//定时任务执行线程池核心线程数
threadPoolTaskScheduler.setPoolSize(6);
//允许移除任务
threadPoolTaskScheduler.setRemoveOnCancelPolicy(true);
//线程池前缀
threadPoolTaskScheduler.setThreadNamePrefix("TestThread-");

return threadPoolTaskScheduler;
}

}

编写自动执行类

  • springboot项目启动自动执行sql语句查询数据库中的有效定时任务信息并将定时任务注册到任务列表

  • CommandLineRunner和ApplicationRunner的作用是相同的。不同之处在于CommandLineRunner接口的run()方法接收String数组作为参数,即是最原始的参数,没有做任何处理;而ApplicationRunner接口的run()方法接收ApplicationArguments对象作为参数,是对原始参数做了进一步的封装。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Slf4j
    @Component
    public class InitApplication implements ApplicationRunner {
    @Autowired
    private SysScheduleService sysScheduleService;
    @Autowired
    private CronTaskRegistrar cronTaskRegistrar;
    @Override
    public void run(ApplicationArguments args) throws Exception {
    //从数据库获取所有定时任务记录列表
    List<SysSchedule> list = sysScheduleService.list();
    for(SysSchedule sysSchedule: list){
    //如果定时任务是有效状态,则将定时任务注册到任务列表
    if(sysSchedule.getScheStatus() == CommonConst.VALID){
    log.debug("注册定时任务:{}", sysSchedule);
    SchedulingRunnable schedulingRunnable = new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod());
    cronTaskRegistrar.addCronTask(schedulingRunnable, sysSchedule.getScheCron());
    }
    }
    }
    }

编写定时类与方法

  • 用于被配置为定时任务类和定时任务方法

    1
    2
    3
    4
    5
    6
    7
    8
    @Slf4j
    @Component
    public class MySchedule1 {
    public void test() {
    // 此处仅用于测试,正式使用时方法可执行实际业务,如定时删除临时附件等
    log.info("测试定时");
    }
    }

编写定时任务控制器

  • 编写SysScheduleController,暴露接口供前端调用动态配置定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    @RestController
    @AllArgsConstructor
    @RequestMapping("/schedule")
    public class SysScheduleController {
    /**
    * 定时任务操作类service
    */
    private SysScheduleService sysScheduleService;
    /**
    * 定时任务注册类
    */
    private CronTaskRegistrar cronTaskRegistrar;

    /**
    * 新增定时任务
    *
    * @param sysSchedule
    * @return
    */
    @PostMapping
    public String addJob(SysSchedule sysSchedule) {
    if (ObjectUtils.isNull(sysSchedule) || ObjectUtils.isNull(sysSchedule.getScheBeanName()) || ObjectUtils.isNull(sysSchedule.getScheMethod())) {
    return "请正确填写完整信息";
    }
    //判空
    if (ObjectUtils.isNotNull(sysSchedule.getScheBeanName())) {
    Object bean = SpringUtil.getBeanByName(sysSchedule.getScheBeanName());
    if (ObjectUtils.isNull(bean)) {
    return "查无此类,请核实";
    }
    try {
    bean.getClass().getDeclaredMethod(sysSchedule.getScheMethod());
    } catch (NoSuchMethodException e) {
    return "查无此方法,请核实";
    }
    }
    //验证cron表达式合法性
    if (ObjectUtils.isNull(sysSchedule.getScheCron()) || (!isValidExpression(sysSchedule.getScheCron()))) {
    return "请输入正确cron表达式";
    }
    //判重
    if (ObjectUtils.isNotNull(this.sysScheduleService.findByBeanNameAndMethod(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod()))) {
    return "该定时任务已存在,请核实";
    }
    this.sysScheduleService.save(sysSchedule);
    //如果定时任务是激活状态
    if (sysSchedule.getScheStatus() == Constants.VALID) {
    SchedulingRunnable schedulingRunnable = new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod());
    cronTaskRegistrar.addCronTask(schedulingRunnable, sysSchedule.getScheCron());
    }
    return "OK";
    }

    /**
    * 验证cron表达式是否正确
    *
    * @param corn
    * @return
    */
    public boolean isValidExpression(String corn) {
    //使用spring内置工具CronSequenceGenerator进行校验
    return CronSequenceGenerator.isValidExpression(corn);
    }

    }

测试

  • 新增

  • 结果

    1
    2
    3
    4
    5
    6
    7
    8
    2021-09-21 14:12:36.523  INFO 9352 --- [nio-8888-exec-5] p.f.d.s.helper.CronTaskRegistrar         : 注册新定时任务
    2021-09-21 14:12:36.523 INFO 9352 --- [nio-8888-exec-5] p.f.d.s.helper.CronTaskRegistrar : 构建定时任务
    2021-09-21 14:12:40.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 执行对应的bean方法:mySchedule1.test(),当前时间:1632204760003
    2021-09-21 14:12:40.005 INFO 9352 --- [ TestThread-1] p.f.demo.springcloud.test.MySchedule1 : 测试定时
    2021-09-21 14:12:40.005 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 方法执行完毕,耗时:2
    2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 执行对应的bean方法:mySchedule1.test(),当前时间:1632204765003
    2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.demo.springcloud.test.MySchedule1 : 测试定时
    2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 方法执行完毕,耗时:0

完整控制器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package pers.fulsun.demo.springcloud.controller;


import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.extension.api.R;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.web.bind.annotation.*;
import pers.fulsun.demo.springcloud.Constants;
import pers.fulsun.demo.springcloud.entity.SysSchedule;
import pers.fulsun.demo.springcloud.helper.CronTaskRegistrar;
import pers.fulsun.demo.springcloud.helper.SchedulingRunnable;
import pers.fulsun.demo.springcloud.service.SysScheduleService;
import pers.fulsun.demo.springcloud.utils.SpringUtil;

/**
* <p>
* 前端控制器
* </p>
*
* @author fulsun
* @since 2021-09-21
*/
@RestController
@AllArgsConstructor
@RequestMapping("/schedule")
public class SysScheduleController {
/**
* 定时任务操作类service
*/
private SysScheduleService sysScheduleService;
/**
* 定时任务注册类
*/
private CronTaskRegistrar cronTaskRegistrar;

/**
* 新增定时任务
*
* @param sysSchedule
* @return
*/
@PostMapping
public String addJob(SysSchedule sysSchedule) {
if (ObjectUtils.isNull(sysSchedule) || ObjectUtils.isNull(sysSchedule.getScheBeanName()) || ObjectUtils.isNull(sysSchedule.getScheMethod())) {
return "请正确填写完整信息";
}
//判空
if (ObjectUtils.isNotNull(sysSchedule.getScheBeanName())) {
Object bean = SpringUtil.getBeanByName(sysSchedule.getScheBeanName());
if (ObjectUtils.isNull(bean)) {
return "查无此类,请核实";
}
try {
bean.getClass().getDeclaredMethod(sysSchedule.getScheMethod());
} catch (NoSuchMethodException e) {
return "查无此方法,请核实";
}
}
//验证cron表达式合法性
if (ObjectUtils.isNull(sysSchedule.getScheCron()) || (!isValidExpression(sysSchedule.getScheCron()))) {
return "请输入正确cron表达式";
}
//判重
if (ObjectUtils.isNotNull(this.sysScheduleService.findByBeanNameAndMethod(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod()))) {
return "该定时任务已存在,请核实";
}
this.sysScheduleService.save(sysSchedule);
//如果定时任务是激活状态
if (sysSchedule.getScheStatus() == Constants.VALID) {
SchedulingRunnable schedulingRunnable = new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod());
cronTaskRegistrar.addCronTask(schedulingRunnable, sysSchedule.getScheCron());
}
return "OK";
}

/**
* 验证cron表达式是否正确
*
* @param corn
* @return
*/
public boolean isValidExpression(String corn) {
//使用spring内置工具CronSequenceGenerator进行校验
return CronSequenceGenerator.isValidExpression(corn);
}

/**
* 更新定时任务操作类
*
* @param sysSchedule
* @return
*/
@PutMapping
public String updateJob(SysSchedule sysSchedule) {
if (ObjectUtils.isNull(sysSchedule) || ObjectUtils.isNull(sysSchedule.getScheBeanName()) || ObjectUtils.isNull(sysSchedule.getScheMethod())) {
return "请正确填写完整信息";
}
if (ObjectUtils.isNull(sysSchedule.getId())) {
return "请选择要修改的定时任务";
}
if (ObjectUtils.isNotNull(sysSchedule.getScheBeanName())) {
Object bean = SpringUtil.getBeanByName(sysSchedule.getScheBeanName());
if (ObjectUtils.isNull(bean)) {
return "查无此类,请核实";
}
try {
bean.getClass().getDeclaredMethod(sysSchedule.getScheMethod());
} catch (NoSuchMethodException e) {
return "查无此方法,请核实";
}

//验证cron表达式合法性
if (ObjectUtils.isNull(sysSchedule.getScheCron()) || (!isValidExpression(sysSchedule.getScheCron()))) {
return "请输入正确cron表达式";
}

SysSchedule oldSysSchedule = this.sysScheduleService.getById(sysSchedule.getId());
this.sysScheduleService.updateById(sysSchedule);
//停止之前的定时任务
SchedulingRunnable schedulingRunnable = new SchedulingRunnable(oldSysSchedule.getScheBeanName(), oldSysSchedule.getScheMethod());
this.cronTaskRegistrar.removeCronTask(schedulingRunnable);
//判断是否重新启动
if (sysSchedule.getScheStatus() == Constants.VALID) {
cronTaskRegistrar.addCronTask(new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod()), sysSchedule.getScheCron());
}
}
return "OK";
}

/**
* 删除定时任务
* @param id
* @return
*/
@DeleteMapping("/{id}")
public String removeJob(@PathVariable Integer id){
//从数据库获取数据
SysSchedule schedule = this.sysScheduleService.getById(id);
if(ObjectUtils.isNull(schedule)){
return "查无此数据,请核实";
}
//停止定时任务
SchedulingRunnable schedulingRunnable = new SchedulingRunnable(schedule.getScheBeanName(), schedule.getScheMethod());
this.cronTaskRegistrar.removeCronTask(schedulingRunnable);
//最后删除数据库数据
this.sysScheduleService.removeById(id);
return "OK";
}
/**
* 获取定时任务
* @param id
* @return
*/
@GetMapping("/{id}")
public SysSchedule getJob(@PathVariable Integer id){
return this.sysScheduleService.getById(id);
}
/**
* 获取定时任务列表
* @param page
* @param sysSchedule
* @return
*/
@GetMapping("/page")
public IPage<SysSchedule> getPage(IPage<SysSchedule> page, SysSchedule sysSchedule){
return this.sysScheduleService.page(page,new QueryWrapper<>(sysSchedule));
}
}