本篇文章將為大家?guī)黻P(guān)于Springboot項(xiàng)目中如何使用多線程并發(fā)的技術(shù)實(shí)現(xiàn)定時任務(wù)的內(nèi)容,下面包括了詳細(xì)的代碼實(shí)現(xiàn)過程,有興趣的小伙伴可以學(xué)習(xí)參考一下。
一、實(shí)現(xiàn)
1、啟動類
在啟動類添加注解@EnableScheduling開啟,不然不起用做。
2、新建任務(wù)類
添加注解@Component注冊到spring的容器中。
package com.example.demo.task;
import com.example.demo.entity.MyTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
/**
* @path:com.example.demo.task.ScheduledTask.java
* @className:ScheduledTask.java
* @description:定時任務(wù)
* @author:tanyp
* @dateTime:2020/7/23 21:37
* @editNote:
*/
@Slf4j
@Component
public class ScheduledTask implements SchedulingConfigurer {
private volatile ScheduledTaskRegistrar registrar;
private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();
/**
* 默認(rèn)啟動10個線程
*/
private static final Integer DEFAULT_THREAD_POOL = 10;
@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
registrar.setScheduler(Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL));
this.registrar = registrar;
}
@PreDestroy
public void destroy() {
this.registrar.destroy();
}
/**
* @methodName:refreshTask
* @description:初始化任務(wù)
* 1、從數(shù)據(jù)庫獲取執(zhí)行任務(wù)的集合【TxTask】
* 2、通過調(diào)用 【refresh】 方法刷新任務(wù)列表
* 3、每次數(shù)據(jù)庫中的任務(wù)發(fā)生變化后重新執(zhí)行【1、2】
* @author:tanyp
* @dateTime:2020/7/23 21:37
* @Params: [tasks]
* @Return: void
* @editNote:
*/
public void refreshTask(List<MyTask> tasks) {
// 刪除已經(jīng)取消任務(wù)
scheduledFutures.keySet().forEach(key -> {
if (Objects.isNull(tasks) || tasks.size() == 0) {
scheduledFutures.get(key).cancel(false);
scheduledFutures.remove(key);
cronTasks.remove(key);
return;
}
tasks.forEach(task -> {
if (!Objects.equals(key, task.getTaskId())) {
scheduledFutures.get(key).cancel(false);
scheduledFutures.remove(key);
cronTasks.remove(key);
return;
}
});
});
// 添加新任務(wù)、更改執(zhí)行規(guī)則任務(wù)
tasks.forEach(txTask -> {
String expression = txTask.getExpression();
// 任務(wù)表達(dá)式為空則跳過
if (StringUtils.isEmpty(expression)) {
return;
}
// 任務(wù)已存在并且表達(dá)式未發(fā)生變化則跳過
if (scheduledFutures.containsKey(txTask.getTaskId()) && cronTasks.get(txTask.getTaskId()).getExpression().equals(expression)) {
return;
}
// 任務(wù)執(zhí)行時間發(fā)生了變化,則刪除該任務(wù)
if (scheduledFutures.containsKey(txTask.getTaskId())) {
scheduledFutures.get(txTask.getTaskId()).cancel(false);
scheduledFutures.remove(txTask.getTaskId());
cronTasks.remove(txTask.getTaskId());
}
CronTask task = new CronTask(new Runnable() {
@Override
public void run() {
// 執(zhí)行業(yè)務(wù)邏輯
try {
log.info("執(zhí)行單個任務(wù),任務(wù)ID【{}】執(zhí)行規(guī)則【{}】", txTask.getTaskId(), txTask.getExpression());
System.out.println("==========================執(zhí)行任務(wù)=============================");
} catch (Exception e) {
log.error("執(zhí)行發(fā)送消息任務(wù)異常,異常信息:{}", e);
}
}
}, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(txTask.getTaskId(), task);
scheduledFutures.put(txTask.getTaskId(), future);
});
}
}
3、創(chuàng)建自啟動任務(wù)類
package com.example.demo.task;
import com.example.demo.task.ScheduledTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @path:com.example.demo.task.MyApplicationRunner.java
* @className:ScheduledTask.java
* @description:自啟動
* @author:tanyp
* @dateTime:2020/7/23 21:37
* @editNote:
*/
@Slf4j
@Component
public class MyApplicationRunner implements ApplicationRunner {
@Autowired
private ScheduledTask scheduledTask;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("================項(xiàng)目啟動初始化定時任務(wù)====開始===========");
/**
* 初始化三個任務(wù):
* 1、10秒執(zhí)行一次
* 2、15秒執(zhí)行一次
* 3、20秒執(zhí)行一次
*/
List<MyTask> tasks = Arrays.asList(
MyTask.builder().taskId("10001").expression("*/10 * * * * ?").build(),
MyTask.builder().taskId("10002").expression("*/15 * * * * ?").build(),
MyTask.builder().taskId("10003").expression("*/20 * * * * ?").build()
);
scheduledTask.refreshTask(tasks);
log.info("================項(xiàng)目啟動初始化定時任務(wù)====完成==========");
}
}
4、實(shí)體
package com.example.demo.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @path:com.example.demo.entity.MyTask.java
* @className:MyTask.java
* @description:任務(wù)實(shí)體
* @author:tanyp
* @dateTime:2020/7/23 21:41
* @editNote:
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MyTask {
/**
* 任務(wù)id
*/
private String taskId;
/**
* 任務(wù)執(zhí)行規(guī)則時間
*/
private String expression;
}
二、測試
初始化三個任務(wù),分別為:
10秒執(zhí)行一次(*/10 * * * * ?)
15秒執(zhí)行一次(*/15 * * * * ?)
20秒執(zhí)行一次(*/20 * * * * ?)
測試效果:
可以看到初始化的三個任務(wù)都在執(zhí)行,并且是不用的線程在執(zhí)行。
三、動態(tài)使用方式
1、啟動方式有兩種:
- 啟動項(xiàng)目后,手動調(diào)用ScheduledTask.refreshTask(List tasks),并初始化任務(wù)列表;
- 使用我測試中的方式,配置項(xiàng)目啟動完成后自動調(diào)用初始任務(wù)的方法,并初始化任務(wù)列表。
2、數(shù)據(jù)初始化
只需要給 List集合賦值并調(diào)用refreshTask()方法即可:
- 根據(jù)業(yè)務(wù)需求修改MyTask實(shí)體類;
- 這里的初始化數(shù)據(jù)可以從數(shù)據(jù)庫讀取數(shù)據(jù)賦值給集合;
例如:從mysql讀取任務(wù)配置表的數(shù)據(jù),調(diào)用refreshTask()方法。
3、如何動態(tài)?
- 修改:修改某一項(xiàng)正在執(zhí)行的任務(wù)規(guī)則;
- 添加:添加一項(xiàng)新的任務(wù);
- 刪除:停止某一項(xiàng)正在執(zhí)行的任務(wù)。
例如:我們有一張任務(wù)配置表,此時進(jìn)行分別新增一條或多條數(shù)據(jù)、刪除一條或多條數(shù)據(jù)、改一條數(shù)據(jù),只需要完成以上任何一項(xiàng)操作后,重新調(diào)用一下refreshTask()方法即可。
怎么重新調(diào)用 refreshTask()方法:可以另外啟一個任務(wù)實(shí)時監(jiān)控任務(wù)表的數(shù)據(jù)變化。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持W3Cschool。