隊(duì)列是一種有用的設(shè)計(jì)模式,可以幫助你處理一般應(yīng)用規(guī)模和性能的挑戰(zhàn)。一些隊(duì)列可以幫助你處理的問題示例包括:
Nest 提供了@nestjs/bull包,這是Bull包的一個(gè)包裝器,Bull 是一個(gè)流行的、支持良好的、高性能的基于 Nodejs 的消息隊(duì)列系統(tǒng)應(yīng)用。該包將 Bull 隊(duì)列以 Nest 友好的方式添加到你的應(yīng)用中。
Bull 使用Redis持久化工作數(shù)據(jù),因此你需要在你的系統(tǒng)中安裝 Redis。因?yàn)樗腔?Redis 的,你的隊(duì)列結(jié)構(gòu)可以是完全分布式的并且和平臺(tái)無關(guān)。例如,你可以有一些隊(duì)列生產(chǎn)者、消費(fèi)者和監(jiān)聽者,他們運(yùn)行在 Nest 的一個(gè)或多個(gè)節(jié)點(diǎn)上,同時(shí),其他生產(chǎn)者、消費(fèi)者和監(jiān)聽者在其他 Node.js 平臺(tái)或者其他網(wǎng)絡(luò)節(jié)點(diǎn)上。
本章使用@nestjs/bull包,我們同時(shí)推薦閱讀BUll 文檔來獲取更多背景和應(yīng)用細(xì)節(jié)。
要開始使用,我們首先安裝需要的依賴:
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
一旦安裝過程完成,我們可以在根AppModule中導(dǎo)入BullModule。
app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
registerQueue()方法用于實(shí)例化并/或注冊隊(duì)列。隊(duì)列在不同的模塊和進(jìn)程之間共享,在底層則通過同樣的憑據(jù)連接到同樣的 Redis 數(shù)據(jù)庫。每個(gè)隊(duì)列由其name屬性區(qū)分(如下),當(dāng)共享隊(duì)列(跨模塊/進(jìn)程)時(shí),第一個(gè)registerQueue()方法同時(shí)實(shí)例化該隊(duì)列并向模塊注冊它。其他模塊(在相同或者不同進(jìn)程下)則簡單地注冊隊(duì)列。隊(duì)列注冊創(chuàng)建一個(gè)injection token,它可以被用在給定 Nest 模塊中獲取隊(duì)列。
針對每個(gè)隊(duì)列,傳遞一個(gè)包含下列屬性的配置對象:
-name:string- 一個(gè)隊(duì)列名稱,它可以被用作injection token(用于將隊(duì)列注冊到控制器/提供者),也可以作為裝飾器參數(shù)來將消費(fèi)者類和監(jiān)聽者與隊(duì)列聯(lián)系起來。是必須的。 -limiter:RateLimiter-該選項(xiàng)用于確定消息隊(duì)列處理速率,查看RateLimiter獲取更多信息??蛇x的。 -redis:RedisOpts-該選項(xiàng)用于配置 Redis 連接,查看RedisOpts獲取更多信息。可選的。 -prefix: string-隊(duì)列所有鍵的前綴??蛇x的。 -defaultJobOptions: JobOpts-選項(xiàng)用以控制新任務(wù)的默認(rèn)屬性。查看JobOpts獲取更多信息??蛇x的。 -settings: AdvancedSettings-高級(jí)隊(duì)列配置設(shè)置。這些通常不需要改變。查看AdvancedSettings獲取更多信息??蛇x的。
注意,name屬性是必須的。其他選項(xiàng)是可選的,為隊(duì)列行為提供更細(xì)節(jié)的控制。這些會(huì)直接傳遞給 Bull 的Queue構(gòu)造器。在這里閱讀更多選項(xiàng)。當(dāng)在第二個(gè)或者子模塊中注冊一個(gè)隊(duì)列時(shí),最佳時(shí)間是省略配置對象中除name屬性之外的所有選項(xiàng)。這些選項(xiàng)僅應(yīng)該在實(shí)例化隊(duì)列的模塊中確定。
在registerQueue()方法中傳遞多個(gè)逗號(hào)分隔的選項(xiàng)對象來創(chuàng)建多個(gè)隊(duì)列。
由于任務(wù)在 Redis 中是持久化的,每次當(dāng)一個(gè)特定名稱的隊(duì)列被實(shí)例化時(shí)(例如,當(dāng)一個(gè) app 啟動(dòng)/重啟時(shí)),它嘗試處理任何可能在前一個(gè)舊的任務(wù)遺留未完成的session。
每個(gè)隊(duì)里可能有一個(gè)或很多生產(chǎn)者、消費(fèi)者以及監(jiān)聽者。消費(fèi)者從一個(gè)特定命令隊(duì)列中獲取任務(wù):FIFO(默認(rèn),先進(jìn)先出),LIFO(后進(jìn)先出)或者依據(jù)優(yōu)先級(jí)。
控制隊(duì)列處理命令在這里討論。
任務(wù)生產(chǎn)者添加任務(wù)到隊(duì)列中。生產(chǎn)者是典型的應(yīng)用服務(wù)(Nest 提供者)。要添加工作到一個(gè)隊(duì)列,首先注冊隊(duì)列到服務(wù)中:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue()裝飾器由其名稱指定隊(duì)列,像它在registerQueue()方法中提供的那樣(例如,audio)。
現(xiàn)在,通過調(diào)用隊(duì)列的add()方法添加一個(gè)任務(wù),傳遞一個(gè)用戶定義的任務(wù)對象。任務(wù)表現(xiàn)為序列化的JavaScript對象(因?yàn)樗鼈儽淮鎯?chǔ)在 Redis 數(shù)據(jù)庫中)。你傳遞的任務(wù)形式是可選的;用它來在語義上表示你任務(wù)對象:
const job = await this.audioQueue.add({
foo: 'bar',
});
任務(wù)需要獨(dú)一無二的名字。這允許你創(chuàng)建專用的消費(fèi)者,這將僅處理給定名稱的處理任務(wù)。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
當(dāng)使用命名任務(wù)時(shí),你必須為每個(gè)添加到隊(duì)列中的特有名稱創(chuàng)建處理者,否則隊(duì)列會(huì)反饋缺失了給定任務(wù)的處理器。查看這里閱讀更多關(guān)于消費(fèi)命名任務(wù)的信息。
任務(wù)可以包括附加選項(xiàng)。在Quene.add()方法的job參數(shù)之后傳遞選項(xiàng)對象。任務(wù)選項(xiàng)屬性有:
這里是一些帶有任務(wù)選項(xiàng)的自定義任務(wù)示例。
要延遲任務(wù)的開始,使用delay配置屬性:
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 } // 3 seconds delayed
);
要從右端添加任務(wù)到隊(duì)列(以 LIFO(后進(jìn)先出)處理任務(wù)),設(shè)置配置對象的lifo屬性為true。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true }
);
要優(yōu)先一個(gè)任務(wù),使用priority屬性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 }
);
消費(fèi)者是一個(gè)類,定義的方法要么處理添加到隊(duì)列中的任務(wù),要么監(jiān)聽隊(duì)列的事件,或者兩者皆有。使用@Processor()裝飾器來定義消費(fèi)者類,如下:
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
裝飾器的字符串參數(shù)(例如,audio)是和類方法關(guān)聯(lián)的隊(duì)列名稱。
在消費(fèi)者類中,使用@Process()裝飾器來裝飾任務(wù)處理者。
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress);
}
return {};
}
}
裝飾器方法(例如transcode()) 在工作空閑或者隊(duì)列中有消息要處理的時(shí)候被調(diào)用。該處理器方法接受job對象作為其僅有的參數(shù)。處理器方法的返回值被保存在任務(wù)對象中,可以在之后被訪問,例如,在用于完成事件的監(jiān)聽者中。
Job對象有多個(gè)方法,允許你和他們的狀態(tài)交互。例如,上述代碼使用progress()方法來更新工作進(jìn)程。查看這里以了解完整的Job對象 API 參照。
你可以指定一個(gè)任務(wù)處理方法,僅處理指定類型(包含特定name的任務(wù))的任務(wù),這可以通過如下所述的將name傳遞給@Process()裝飾器完成。你在一個(gè)給定消費(fèi)者類中可以有多個(gè)@Process()處理器,以反應(yīng)每個(gè)任務(wù)類型(name),確保每個(gè)name有相應(yīng)的處理者。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
當(dāng)隊(duì)列和/或任務(wù)狀態(tài)改變時(shí),Bull生成一個(gè)有用的事件集合。Nest 提供了一個(gè)裝飾器集合,允許訂閱一系列標(biāo)準(zhǔn)核心事件集合。他們從@nestjs/bull包中導(dǎo)出。
事件監(jiān)聽者必須在一個(gè)消費(fèi)者類中聲明(通過@Processor()裝飾器)。要監(jiān)聽一個(gè)事件,使用如下表格之一的裝飾器來聲明一個(gè)事件處理器。例如,當(dāng)一個(gè)任務(wù)進(jìn)入audio隊(duì)列活躍狀態(tài)時(shí),要監(jiān)聽其發(fā)射的事件,使用下列結(jié)構(gòu):
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
鑒于 BUll 運(yùn)行于分布式(多 node)環(huán)境,它定義了本地事件概念。該概念可以辨識(shí)出一個(gè)由完整的單一進(jìn)程觸發(fā)的事件,或者由不同進(jìn)程共享的隊(duì)列。一個(gè)本地事件是指在本地進(jìn)程中觸發(fā)的一個(gè)隊(duì)列行為或者狀態(tài)變更。換句話說,當(dāng)你的事件生產(chǎn)者和消費(fèi)者是本地單進(jìn)程時(shí),隊(duì)列中所有事件都是本地的。
當(dāng)一個(gè)隊(duì)列在多個(gè)進(jìn)程中共享時(shí),我們可能要遇到全局事件。對一個(gè)由其他進(jìn)程觸發(fā)的事件通知器進(jìn)程的監(jiān)聽者來說,它必須注冊為全局事件。
當(dāng)相應(yīng)事件發(fā)射時(shí)事件處理器被喚醒。該處理器被下表所示的簽名調(diào)用,提供訪問事件相關(guān)的信息。我們討論下面簽名中本地和全局事件處理器。
本地事件監(jiān)聽者 | 全局事件監(jiān)聽者 | 處理器方法簽名/當(dāng)觸發(fā)時(shí) |
---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 當(dāng)錯(cuò)誤發(fā)生時(shí),error 包括觸發(fā)錯(cuò)誤 |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - 一旦工作者空閑就等待執(zhí)行的任務(wù),jobId 包括進(jìn)入此狀態(tài)的 id |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-job 任務(wù)已啟動(dòng) |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-job 任務(wù)被標(biāo)記為延遲。這在時(shí)間循環(huán)崩潰或暫停時(shí)進(jìn)行調(diào)試工作時(shí)是很有效的 |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-job 任務(wù)進(jìn)程被更新為progress 值 |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) job 任務(wù)進(jìn)程成功以result 結(jié)束 |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)job 任務(wù)以err 原因失敗 |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler()隊(duì)列被暫停 |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)隊(duì)列被恢復(fù) |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 舊任務(wù)從隊(duì)列中被清理,job 是一個(gè)清理任務(wù)數(shù)組,type 是要清理的任務(wù)類型 |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler()在隊(duì)列處理完所有等待的任務(wù)(除非有些尚未處理的任務(wù)被延遲)時(shí)發(fā)射出 |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)job 任務(wù)被成功移除 |
當(dāng)監(jiān)聽全局事件時(shí),簽名方法可能和本地有一點(diǎn)不同。特別地,本地版本的任何方法簽名接受job對象的方法簽名而不是全局版本的jobId(number)。要在這種情況下獲取實(shí)際的job對象的引用,使用Queue#getJob方法。這種調(diào)用可能需要等待,因此處理者應(yīng)該被聲明為async,例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
要獲取一個(gè)Queue對象(使用getJob()調(diào)用),你當(dāng)然必須注入它。同時(shí),隊(duì)列必須注冊到你要注入的模塊中。
在特定事件監(jiān)聽器裝飾器之外,你可以使用通用的@OnQueueEvent()裝飾器與BullQueueEvents或者BullQueueGlobalEvents枚舉相結(jié)合。在這里閱讀更多有關(guān)事件的內(nèi)容。
隊(duì)列有一個(gè) API 來實(shí)現(xiàn)管理功能比如暫停、恢復(fù)、檢索不同狀態(tài)的任務(wù)數(shù)量等。你可以在這里找到完整的隊(duì)列 API。直接在Queue對象上調(diào)用這些方法,如下所示的暫停/恢復(fù)示例。
使用pause()方法調(diào)用來暫停隊(duì)列。一個(gè)暫停的隊(duì)列在恢復(fù)前將不會(huì)處理新的任務(wù),但會(huì)繼續(xù)處理完當(dāng)前執(zhí)行的任務(wù)。
await audioQueue.pause();
要恢復(fù)一個(gè)暫停的隊(duì)列,使用resume()方法,如下:
await audioQueue.resume();
你可能需要異步而不是靜態(tài)傳遞隊(duì)列選項(xiàng)。在這種情況下,使用registerQueueAsync()方法,可以提供不同的異步配置方法。
一個(gè)方法是使用工廠函數(shù):
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
我們的工廠函數(shù)方法和其他異步提供者(它可以是async的并可以使用inject來注入)方法相同。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
可選的,你可以使用useClass語法。
BullModule.registerQueueAsync({
name: 'audio',
useClass: BullConfigService,
});
上述結(jié)構(gòu)在BullModule中實(shí)例化BullConfigService,并通過調(diào)用createBullOptions()來用它提供一個(gè)選項(xiàng)對象。注意這意味著BullConfigService要實(shí)現(xiàn)BullOptionsFactory工廠接口,如下:
@Injectable()
class BullConfigService implements BullOptionsFactory {
createBullOptions(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
要阻止在BullModule中創(chuàng)建BullConfigService并使用一個(gè)從其他模塊導(dǎo)入的提供者,可以使用useExisting語法。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useExisting: ConfigService,
});
這個(gè)結(jié)構(gòu)和useClass有一個(gè)根本區(qū)別——BullModule將查找導(dǎo)入的模塊來重用現(xiàn)有的ConfigServie而不是實(shí)例化一個(gè)新的。
一個(gè)可用的示例見這里。
更多建議: