Springboot定时任务开发
定时任务
规定在什么时间做什么事情或执行什么命令操作(相当于会做事的闹钟)
定时任务的应用场景十分广泛,如视频网站的定时发放成长值,程序定时清理临时附件,定时生成日志报表,数据库定时同步等
jdk自带库中,两种技术可以实现定时任务,一种是Timer,一种是ScheduledThreadPoolExecutor
- Timer是单线程,如果开启多个线程服务,将会出现竞争,一旦出现异常,线程停止,定时任务停止;
- ScheduledThreadPoolExecutor基于线程池实现,多线程,且自动调整线程数,线程出错并不会影响整体定时任务执行。
- Timer兼容性更高,jdk1.3后可使用
- ScheduledThreadPoolExecutor在jdk1.5后方可使用
Timer
Timer是一个线程,控制执行TimerTask所需要执行的内容
1
2
3
4
5
6
7
8
9
10
11
12
13public class Timer {
/**
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as appropriate,
* and removing them from the queue when they're obsolete.
*/
private final TaskQueue queue = new TaskQueue();
/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
}
延时执行
1 | //其中的delay是延时时间,表示多少毫秒后执行一次task |
指定时间节点执行
1 | //到达指定时间time的时候执行一次task |
延时周期执行
1 | //经过delay毫秒后按每period毫秒执行一次的周期执行task |
指定时间节点后周期执行
1 | //到达指定时间firstTime之后按照每period毫秒执行一次的周期执行task |
TimerTask
TimerTask是一个实现了Runable接口的类,所以能够放到线程去执行
1
2
3
4
5
6public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();
}
示例
代码结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class TimerTest {
public static void main(String[] args) {
Timer timer = new Timer();
Task task = new Task();
// 当前时间开始,每秒定时执行
timer.schedule(task,new Date(),1000);
}
}
class Task extends TimerTask {
public void run() {
System.out.println(new Date()+": This is my job...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}通过结果可以看出,下一个任务是在上一个结束后开始的
1
2
3
4Mon Sep 20 12:09:52 CST 2021: This is my job...
Mon Sep 20 12:09:54 CST 2021: This is my job...
Mon Sep 20 12:09:56 CST 2021: This is my job...
Mon Sep 20 12:09:58 CST 2021: This is my job...弊端:Timer是单线程的,一旦定时任务中某一过程时刻抛出异常,将会导致整体线程停止,定时任务停止。
ThreadPoolExecutor
ScheduledThreadPoolExecutor
- 继承了ThreadPoolExecutor,是一个基于线程池的调度器
- 通过实现ScheduledExecutorService接口方法去实现任务调度,主要方法如下
延时执行
1 | //command是待执行的线程,delay表示延时时长,unit代表时间单位 |
延时周期执行
下一个任务的时间是在上个任务完成后执行,至少间隔period
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19//command是待执行的线程,initialDelay表示延时时长,period代表执行间隔时长,unit代表时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
每段延时间隔执行
下一个任务的时间是在上个任务完成后加上间隔时间(delay)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19//command是待执行的线程,initialDelay表示延时时长,delay代表每次执行线程前的延时时长,unit代表时间单位
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
示例
示例代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13public class JavaScheduledThreadPoolExecutor {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
System.out.println(new Date());
//延时1秒后开始执行,每3秒执行一次
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(new Date()+": This is my job...");
}
}, 1, 3, TimeUnit.SECONDS);
}
}结果
1
2
3
4
5
6
7
8
9Mon Sep 20 12:20:55 CST 2021
# 延时1秒后开始执行
Mon Sep 20 12:20:56 CST 2021: This is my job...
# 每3秒(间隔周期)执行一次,这里超过了period,任务用了4s
Mon Sep 20 12:21:00 CST 2021: This is my job...
Mon Sep 20 12:21:04 CST 2021: This is my job...
Mon Sep 20 12:21:08 CST 2021: This is my job...
Mon Sep 20 12:21:12 CST 2021: This is my job...
Mon Sep 20 12:21:16 CST 2021: This is my job...
Spring定时任务
Scheduled注解
Spring原生定时任务主要依靠@Scheduled注解实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Scheduled {
String CRON_DISABLED = "-";
String cron() default ""; //类似于corn表达式,可以指定定时任务执行的延迟及周期规则
String zone() default ""; //指明解析cron表达式的时区。
long fixedDelay() default -1; //在最后一次调用结束和下一次调用开始之间以固定周期(以毫秒为单位)执行带注解的方法。(要等待上次任务完成后)
String fixedDelayString() default ""; //同上面作用一样,只是String类型
long fixedRate() default -1; //在调用之间以固定的周期(以毫秒为单位)执行带注解的方法。(不需要等待上次任务完成)
String fixedRateString() default ""; //同上面作用一样,只是String类型
long initialDelay() default -1; //第一次执行fixedRate()或fixedDelay()任务之前延迟的毫秒数 。
String initialDelayString() default ""; //同上面作用一样,只是String类型
}
静态定时任务示例
编写一个定时任务
1
2
3
4
5
6
7
8
9
10
public class TestJob {
//每10秒执行一次
public void run() {
log.info("Current time is :: " + Calendar.getInstance().getTime());
}
}启动项目,执行结果:
1
2
3
4
52021-09-20 17:50:10.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:10 CST 2021
2021-09-20 17:50:20.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:20 CST 2021
2021-09-20 17:50:30.003 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:30 CST 2021
2021-09-20 17:50:40.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:40 CST 2021
2021-09-20 17:50:50.002 INFO 5416 --- [ scheduling-1] pers.fulsun.Test.TestJob : Current time is :: Mon Sep 20 17:50:50 CST 2021
@Scheduled原理
项目启动,扫描带有注解@Scheduled的所有方法信息,由ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法实现功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 获取含有@Scheduled注解的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
// 调用processScheduled方法将定时任务方法存放到任务队列中
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}processScheduled方法处理@Scheduled注解后面的参数,并将其添加到任务列表中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
// 创建任务线程
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// 解析任务执行初始延迟
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// 解析cron表达式
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// 解析fixedDelay参数
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
// 存放任务到任务队列中
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 解析fixedRate参数
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
// 并发控制将任务队列存入注册任务列表
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}将任务解析并添加到任务队列后,交由ScheduledTaskRegistrar类的scheduleTasks方法添加(注册)定时任务到环境中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32protected void scheduleTasks() {
if (this.taskScheduler == null) {
//获取ScheduledExecutorService对象,实际上都是使用ScheduledThreadPoolExecutor执行定时任务调度
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
private void addScheduledTask( { ScheduledTask task)
if (task != null) {
this.scheduledTasks.add(task);
}
}调用scheduleCronTask初始化定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}在scheduleTasks中,会对线程池进行初始化,线程池的核心线程数量为1
1
2
3
4public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}由上述源码可以看出,Spring原生定时任务的大概步骤如下:
- 扫描带@Scheduled注解的类和方法
- 将定时任务解析完成后加入任务队列
- 将定时任务注册到当前运行环境,等待执行
- 且@Scheduled的底层调度实现是ScheduledThreadPoolExecutor
Spring定时任务动态启停
- 模拟上述过程,将扫描注解改为从数据库获取数据,后续的任务队列和任务注册都可以不变,可以实现数据库定时任务配置。
问题点
如何动态启停和更改定时任务执行规则?
- 数据库中可以存储定时任务的类名,方法名,是否启用及执行规律(使用cron表达式),每次项目启动时都可以做到动态读取数据库记录添加定时任务。
可是如果动态更改定时任务配置,而又不用重启生效,如何改变当前运行的任务线程?
- 动态移除和重新添加进程即可,即操作ScheduledTaskRegistrar注册器实现定时任务的动态添加和移除。
步骤
- sys_schedule存储定时任务配置信息
- 程序启动后从数据库中获取配置信息
List<SysSchedule>
, 组合成线程任务 SchedulingRunnable (实现Runnable接口) - SchedulingRunnable 中通过类名反射执行方法
- 通过 SchedulingRunnable 和Cron表达式构建 cronTask
- 通过 TaskScheduler.schedule()注册到环境中,schedule方法返回的是ScheduledFuter,Future接口中有 cancle ,isCancelled ,isDone方法
- 使用ScheuleTask类封装 ScheduledFuter
- 通过注册类保留定时任务信息,使用
Map<Runable,ScheuleTask>
, Runable定义了方法名和类名,ScheuleTask中定义了定时任务的状态信息。通过对futer的操作完成动态启动停止- 添加 map.put
- 修改:先删除旧任务 futer.cancle,然后更新map中task的value值
- 删除:先删除旧任务 futer.cancle,后移除map中的key
实现
数据库表
创建数据库表格 sys_schedule,用于存储定时任务配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18-- ----------------------------
-- Table structure for sys_schedule 定时任务表格
-- ----------------------------
DROP TABLE IF EXISTS `sys_schedule`;
CREATE TABLE `sys_schedule` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`sche_bean_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '定时任务指定类注册名',
`sche_method` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '定时任务类执行方法,默认使用run()方法',
`sche_cron` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '定时任务执行规则cron表达式',
`sche_desc` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '定时任务描述',
`sche_status` int(255) NOT NULL COMMENT '定时任务状态',
`del_flag` int(255) NOT NULL COMMENT '是否删除',
`create_by` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '最后更新人',
`update_time` timestamp NULL DEFAULT NULL COMMENT '最后更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
新建system-job模块
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45<dependencies>
<!--作为web项目存在-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--eureka 客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--实时健康监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--配置客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>pers.fulsun</groupId>
<artifactId>common-data</artifactId>
</dependency>
<dependency>
<groupId>pers.fulsun</groupId>
<artifactId>common-swagger</artifactId>
</dependency>
</dependencies>
<!-- 添加spring-boot的maven插件,不能少,打jar包时得用 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>添加配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14#springboot 只认application.properties和application.yml
server:
port: 8888 #服务端口
spring:
profiles:
active: dev #当前生效环境
application:
name: springcloud-job #指定应用的唯一标识/服务名
# 配置中心
cloud:
config:
name: ${spring.application.name},datasource,redis #指定工程于config server中的应用名
profile: ${spring.profiles.active} #指定工程于config server中的生效环境
uri: http://localhost:8080 #指定配置中心的注册路径生成SysSchedule类及相关操作类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SysSchedule extends BaseEntity {
private static final long serialVersionUID = 1L;
private Integer id;
private String scheBeanName;
private String scheMethod;
private String scheCron;
private String scheDesc;
private Integer scheStatus;
private Integer delFlag;
}启动类
1
2
3
4
5
6
7
8
9//开启定时任务
public class SpringCloudJobApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudJobApplication.class, args);
}
}
自定义线程类
自定义线程类SchedulingRunnable ,用于反射执行数据库中配置的类及定时方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80package pers.fulsun.demo.springcloud.helper;
import io.netty.util.internal.ObjectUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import pers.fulsun.demo.springcloud.utils.SpringUtil;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
public class SchedulingRunnable implements Runnable {
/**
* 定时任务指定Bean类名
*/
private String scheBeanName;
/**
* 定时任务指定Bean的方法名
*/
private String scheMethod;
public void run() {
long startTime = System.currentTimeMillis();
log.info("执行对应的bean方法:{}.{}(),当前时间:{}", scheBeanName, scheMethod, startTime);
//利用反射生成对应的bean实例并执行指定方法,指定bean可能涉及数据库或者Spring体系的内容,所以统一从spring容器中获取
Object target = SpringUtil.getBeanByName(this.getScheBeanName());
Method method = null;
//执行实例判空
if (ObjectUtils.isNotEmpty(target)) {
try {
//获取指定方法
method = target.getClass().getDeclaredMethod(scheMethod);
} catch (NoSuchMethodException e) {
if (log.isErrorEnabled()) {
log.error(e.getMessage());
}
}
if (ObjectUtils.isNotEmpty(method)) {
//使该方法可访问可执行
ReflectionUtils.makeAccessible(method);
try {
method.invoke(target, null);
} catch (IllegalAccessException e) {
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
if (log.isDebugEnabled()) {
log.debug("方法执行完毕,耗时:{}", System.currentTimeMillis() - startTime);
}
}
}
/**
* 重写equals方法,否则移除定时任务将出现无法移除的情况,因为无法匹配
*
* @param o
* @return
*/
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchedulingRunnable that = (SchedulingRunnable) o;
return scheBeanName.equals(that.scheBeanName) &&
scheMethod.equals(that.getScheMethod());
}
public int hashCode() {
return Objects.hash(scheBeanName, scheMethod);
}
}
自定义ScheduledTask,
去除原生ScheduledTask构造函数的task参数依赖,仅实现原生的cancel方法
1
2
3
4
5
6
7
8
9
10
11
12public final class ScheduledTask {
static ScheduledFuture<?> future;
/**
* 定时任务取消
*/
public void cancel(){
ScheduledFuture<?> future = this.future;
if(ObjectUtil.isNotNull(future)){
future.cancel(true);
}
}
}
自定义定时任务注册类
代替上述的ScheduledTaskRegistrar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
private TaskScheduler taskScheduler;
public void addCronTask(Runnable task, String cronExpression) {
addCronTask(new CronTask(task, cronExpression));
}
public void addCronTask(CronTask cronTask) {
if(log.isDebugEnabled()){
log.debug("注册新定时任务");
}
if (cronTask != null) {
Runnable task = cronTask.getRunnable();
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(task);
}
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
}
public void removeCronTask(Runnable task) {
if(log.isDebugEnabled()){
log.debug("移除定时任务");
}
boolean b = this.scheduledTasks.containsKey(task);
ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
if (scheduledTask != null)
scheduledTask.cancel();
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
if(log.isDebugEnabled()){
log.debug("构建定时任务");
}
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
public void destroy() {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}
配置系统定时任务线程池
1 |
|
编写自动执行类
springboot项目启动自动执行sql语句查询数据库中的有效定时任务信息并将定时任务注册到任务列表
CommandLineRunner和ApplicationRunner的作用是相同的。不同之处在于CommandLineRunner接口的run()方法接收String数组作为参数,即是最原始的参数,没有做任何处理;而ApplicationRunner接口的run()方法接收ApplicationArguments对象作为参数,是对原始参数做了进一步的封装。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class InitApplication implements ApplicationRunner {
private SysScheduleService sysScheduleService;
private CronTaskRegistrar cronTaskRegistrar;
public void run(ApplicationArguments args) throws Exception {
//从数据库获取所有定时任务记录列表
List<SysSchedule> list = sysScheduleService.list();
for(SysSchedule sysSchedule: list){
//如果定时任务是有效状态,则将定时任务注册到任务列表
if(sysSchedule.getScheStatus() == CommonConst.VALID){
log.debug("注册定时任务:{}", sysSchedule);
SchedulingRunnable schedulingRunnable = new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod());
cronTaskRegistrar.addCronTask(schedulingRunnable, sysSchedule.getScheCron());
}
}
}
}
编写定时类与方法
用于被配置为定时任务类和定时任务方法
1
2
3
4
5
6
7
8
public class MySchedule1 {
public void test() {
// 此处仅用于测试,正式使用时方法可执行实际业务,如定时删除临时附件等
log.info("测试定时");
}
}
编写定时任务控制器
编写SysScheduleController,暴露接口供前端调用动态配置定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class SysScheduleController {
/**
* 定时任务操作类service
*/
private SysScheduleService sysScheduleService;
/**
* 定时任务注册类
*/
private CronTaskRegistrar cronTaskRegistrar;
/**
* 新增定时任务
*
* @param sysSchedule
* @return
*/
public String addJob(SysSchedule sysSchedule) {
if (ObjectUtils.isNull(sysSchedule) || ObjectUtils.isNull(sysSchedule.getScheBeanName()) || ObjectUtils.isNull(sysSchedule.getScheMethod())) {
return "请正确填写完整信息";
}
//判空
if (ObjectUtils.isNotNull(sysSchedule.getScheBeanName())) {
Object bean = SpringUtil.getBeanByName(sysSchedule.getScheBeanName());
if (ObjectUtils.isNull(bean)) {
return "查无此类,请核实";
}
try {
bean.getClass().getDeclaredMethod(sysSchedule.getScheMethod());
} catch (NoSuchMethodException e) {
return "查无此方法,请核实";
}
}
//验证cron表达式合法性
if (ObjectUtils.isNull(sysSchedule.getScheCron()) || (!isValidExpression(sysSchedule.getScheCron()))) {
return "请输入正确cron表达式";
}
//判重
if (ObjectUtils.isNotNull(this.sysScheduleService.findByBeanNameAndMethod(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod()))) {
return "该定时任务已存在,请核实";
}
this.sysScheduleService.save(sysSchedule);
//如果定时任务是激活状态
if (sysSchedule.getScheStatus() == Constants.VALID) {
SchedulingRunnable schedulingRunnable = new SchedulingRunnable(sysSchedule.getScheBeanName(), sysSchedule.getScheMethod());
cronTaskRegistrar.addCronTask(schedulingRunnable, sysSchedule.getScheCron());
}
return "OK";
}
/**
* 验证cron表达式是否正确
*
* @param corn
* @return
*/
public boolean isValidExpression(String corn) {
//使用spring内置工具CronSequenceGenerator进行校验
return CronSequenceGenerator.isValidExpression(corn);
}
}
测试
新增
结果
1
2
3
4
5
6
7
82021-09-21 14:12:36.523 INFO 9352 --- [nio-8888-exec-5] p.f.d.s.helper.CronTaskRegistrar : 注册新定时任务
2021-09-21 14:12:36.523 INFO 9352 --- [nio-8888-exec-5] p.f.d.s.helper.CronTaskRegistrar : 构建定时任务
2021-09-21 14:12:40.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 执行对应的bean方法:mySchedule1.test(),当前时间:1632204760003
2021-09-21 14:12:40.005 INFO 9352 --- [ TestThread-1] p.f.demo.springcloud.test.MySchedule1 : 测试定时
2021-09-21 14:12:40.005 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 方法执行完毕,耗时:2
2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 执行对应的bean方法:mySchedule1.test(),当前时间:1632204765003
2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.demo.springcloud.test.MySchedule1 : 测试定时
2021-09-21 14:12:45.003 INFO 9352 --- [ TestThread-1] p.f.d.s.helper.SchedulingRunnable : 方法执行完毕,耗时:0
完整控制器代码
1 | package pers.fulsun.demo.springcloud.controller; |