考慮這樣的一個場景,當前你有1000個任務(wù),要讓這1000個任務(wù)每隔幾分鐘觸發(fā)某個操作。要是實現(xiàn)這樣的需求,很多人第一想法就是弄一個定時器。但是1000個任務(wù)就是1000個定時器,一個定時器是一個線程。為了解決這個問題,就出現(xiàn)了時間輪算法。本篇文章將為您講述什么是時間輪算法,以及在Java中怎么用代碼實現(xiàn)時間輪算法。
時間輪
時間輪簡介:時間輪方案將現(xiàn)實生活中的時鐘概念引入到軟件設(shè)計中,主要思路是定義一個時鐘周期(比如時鐘的12小時)和步長(比如時鐘的一秒走一次),當指針每走一步的時候,會獲取當前時鐘刻度上掛載的任務(wù)并執(zhí)行。
核心思想
- 一個環(huán)形數(shù)組存儲時間輪的所有槽(看你的手表),每個槽對應(yīng)當前時間輪的最小精度
- 超過當前時間輪最大表示范圍的會被丟到上層時間輪,上層時間輪的最小精度即為下層時間輪能表達的最大時間(時分秒概念)
- 每個槽對應(yīng)一個環(huán)形鏈表存儲該時間應(yīng)該被執(zhí)行的任務(wù)
- 需要一個線程去驅(qū)動指針運轉(zhuǎn),獲取到期任務(wù)
以下給出java 簡單手寫版本實現(xiàn)
代碼實現(xiàn)
時間輪主數(shù)據(jù)結(jié)構(gòu)
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:31
*/
@Slf4j
public class TimeWheel {
/**
* 一個槽的時間間隔(時間輪最小刻度)
*/
private long tickMs;
/**
* 時間輪大小(槽的個數(shù))
*/
private int wheelSize;
/**
* 一輪的時間跨度
*/
private long interval;
private long currentTime;
/**
* 槽
*/
private TimerTaskList[] buckets;
/**
* 上層時間輪
*/
private volatile TimeWheel overflowWheel;
/**
* 一個timer只有一個delayqueue
*/
private DelayQueue<TimerTaskList> delayQueue;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new TimerTaskList[wheelSize];
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new TimerTaskList();
}
}
public boolean add(TimerTaskEntry entry) {
long expiration = entry.getExpireMs();
if (expiration < tickMs + currentTime) {
//到期了
return false;
} else if (expiration < currentTime + interval) {
//扔進當前時間輪的某個槽里,只有時間大于某個槽,才會放進去
long virtualId = (expiration / tickMs);
int index = (int) (virtualId % wheelSize);
TimerTaskList bucket = buckets[index];
bucket.addTask(entry);
//設(shè)置bucket 過期時間
if (bucket.setExpiration(virtualId * tickMs)) {
//設(shè)好過期時間的bucket需要入隊
delayQueue.offer(bucket);
return true;
}
} else {
//當前輪不能滿足,需要扔到上一輪
TimeWheel timeWheel = getOverflowWheel();
return timeWheel.add(entry);
}
return false;
}
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
}
}
}
return overflowWheel;
}
/**
* 推進指針
*
* @param timestamp
*/
public void advanceLock(long timestamp) {
if (timestamp > currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
this.getOverflowWheel().advanceLock(timestamp);
}
}
}
}
定時器接口
/** * 定時器 * @author apdoer * @version 1.0 * @date 2021/3/22 20:30 */ public interface Timer { /** * 添加一個新任務(wù) * * @param timerTask */ void add(TimerTask timerTask); /** * 推動指針 * * @param timeout */ void advanceClock(long timeout); /** * 等待執(zhí)行的任務(wù) * * @return */ int size(); /** * 關(guān)閉服務(wù),剩下的無法被執(zhí)行 */ void shutdown(); }
定時器實現(xiàn)
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 20:33
*/
@Slf4j
public class SystemTimer implements Timer {
/**
* 底層時間輪
*/
private TimeWheel timeWheel;
/**
* 一個Timer只有一個延時隊列
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 過期任務(wù)執(zhí)行線程
*/
private ExecutorService workerThreadPool;
/**
* 輪詢delayQueue獲取過期任務(wù)線程
*/
private ExecutorService bossThreadPool;
public SystemTimer() {
this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
this.workerThreadPool = Executors.newFixedThreadPool(100);
this.bossThreadPool = Executors.newFixedThreadPool(1);
//20ms推動一次時間輪運轉(zhuǎn)
this.bossThreadPool.submit(() -> {
for (; ; ) {
this.advanceClock(20);
}
});
}
public void addTimerTaskEntry(TimerTaskEntry entry) {
if (!timeWheel.add(entry)) {
//已經(jīng)過期了
TimerTask timerTask = entry.getTimerTask();
log.info("=====任務(wù):{} 已到期,準備執(zhí)行============",timerTask.getDesc());
workerThreadPool.submit(timerTask);
}
}
@Override
public void add(TimerTask timerTask) {
log.info("=======添加任務(wù)開始====task:{}", timerTask.getDesc());
TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
timerTask.setTimerTaskEntry(entry);
addTimerTaskEntry(entry);
}
/**
* 推動指針運轉(zhuǎn)獲取過期任務(wù)
*
* @param timeout 時間間隔
* @return
*/
@Override
public synchronized void advanceClock(long timeout) {
try {
TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (bucket != null) {
//推進時間
timeWheel.advanceLock(bucket.getExpiration());
//執(zhí)行過期任務(wù)(包含降級)
bucket.clear(this::addTimerTaskEntry);
}
} catch (InterruptedException e) {
log.error("advanceClock error");
}
}
@Override
public int size() {
//todo
return 0;
}
@Override
public void shutdown() {
this.bossThreadPool.shutdown();
this.workerThreadPool.shutdown();
this.timeWheel = null;
}
}
存儲任務(wù)的環(huán)形鏈表
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
@Slf4j
class TimerTaskList implements Delayed {
/**
* TimerTaskList 環(huán)形鏈表使用一個虛擬根節(jié)點root
*/
private TimerTaskEntry root = new TimerTaskEntry(null, -1);
{
root.next = root;
root.prev = root;
}
/**
* bucket的過期時間
*/
private AtomicLong expiration = new AtomicLong(-1L);
public long getExpiration() {
return expiration.get();
}
/**
* 設(shè)置bucket的過期時間,設(shè)置成功返回true
*
* @param expirationMs
* @return
*/
boolean setExpiration(long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public boolean addTask(TimerTaskEntry entry) {
boolean done = false;
while (!done) {
//如果TimerTaskEntry已經(jīng)在別的list中就先移除,同步代碼塊外面移除,避免死鎖,一直到成功為止
entry.remove();
synchronized (this) {
if (entry.timedTaskList == null) {
//加到鏈表的末尾
entry.timedTaskList = this;
TimerTaskEntry tail = root.prev;
entry.prev = tail;
entry.next = root;
tail.next = entry;
root.prev = entry;
done = true;
}
}
}
return true;
}
/**
* 從 TimedTaskList 移除指定的 timerTaskEntry
*
* @param entry
*/
public void remove(TimerTaskEntry entry) {
synchronized (this) {
if (entry.getTimedTaskList().equals(this)) {
entry.next.prev = entry.prev;
entry.prev.next = entry.next;
entry.next = null;
entry.prev = null;
entry.timedTaskList = null;
}
}
}
/**
* 移除所有
*/
public synchronized void clear(Consumer<TimerTaskEntry> entry) {
TimerTaskEntry head = root.next;
while (!head.equals(root)) {
remove(head);
entry.accept(head);
head = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
}
return 0;
}
}
存儲任務(wù)的容器entry
/**
* @author apdoer
* @version 1.0
* @date 2021/3/22 19:26
*/
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
private TimerTask timerTask;
private long expireMs;
volatile TimerTaskList timedTaskList;
TimerTaskEntry next;
TimerTaskEntry prev;
public TimerTaskEntry(TimerTask timedTask, long expireMs) {
this.timerTask = timedTask;
this.expireMs = expireMs;
this.next = null;
this.prev = null;
}
void remove() {
TimerTaskList currentList = timedTaskList;
while (currentList != null) {
currentList.remove(this);
currentList = timedTaskList;
}
}
@Override
public int compareTo(TimerTaskEntry o) {
return ((int) (this.expireMs - o.expireMs));
}
}
任務(wù)包裝類(這里也可以將工作任務(wù)以線程變量的方式去傳入)
@Data
@Slf4j
class TimerTask implements Runnable {
/**
* 延時時間
*/
private long delayMs;
/**
* 任務(wù)所在的entry
*/
private TimerTaskEntry timerTaskEntry;
private String desc;
public TimerTask(String desc, long delayMs) {
this.desc = desc;
this.delayMs = delayMs;
this.timerTaskEntry = null;
}
public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
// 如果這個timetask已經(jīng)被一個已存在的TimerTaskEntry持有,先移除一個
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
public TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
@Override
public void run() {
log.info("============={}任務(wù)執(zhí)行", desc);
}
}
以上就是關(guān)于手寫Java實現(xiàn)時間輪算法的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持W3Cschool。