NestJS 隊(duì)列

2023-09-08 17:41 更新

隊(duì)列是一種有用的設(shè)計(jì)模式,可以幫助你處理一般應(yīng)用規(guī)模和性能的挑戰(zhàn)。一些隊(duì)列可以幫助你處理的問題示例包括:

  • 平滑輸出峰值。例如,如果用戶可以在任何時(shí)間創(chuàng)建資源敏感型任務(wù),你可以將其添加到一個(gè)消息隊(duì)列中而不是同步執(zhí)行。然后你可以通過工作者進(jìn)程從隊(duì)列中以一個(gè)可控的方式取出進(jìn)程。在應(yīng)用規(guī)模增大時(shí),你可以輕松添加新的隊(duì)列消費(fèi)者來提高后端任務(wù)處理能力。
  • 將可能阻塞Node.js事件循環(huán)的整體任務(wù)打碎。例如,如果一個(gè)用戶請求是 CPU 敏感型工作,例如音頻轉(zhuǎn)碼,你可以將其委托給其他進(jìn)程,從而保證用戶接口進(jìn)程保持響應(yīng)。
  • 在不同的服務(wù)間提供一個(gè)可信的通訊通道。例如,你可以將任務(wù)(工作)加入一個(gè)進(jìn)程或服務(wù),并由另一個(gè)進(jìn)程或服務(wù)來消費(fèi)他們。你可以在由其他任何進(jìn)程或服務(wù)執(zhí)行的工作完成、錯(cuò)誤或者其他狀態(tài)變化時(shí)得到通知(通過監(jiān)聽狀態(tài)事件)。當(dāng)隊(duì)列生產(chǎn)者或者消費(fèi)者失敗時(shí),他們的狀態(tài)會(huì)被保留,任務(wù)將在 node 重啟后自動(dòng)重啟。

Nest 提供了@nestjs/bull包,這是Bull包的一個(gè)包裝器,Bull 是一個(gè)流行的、支持良好的、高性能的基于 Nodejs 的消息隊(duì)列系統(tǒng)應(yīng)用。該包將 Bull 隊(duì)列以 Nest 友好的方式添加到你的應(yīng)用中。

Bull 使用Redis持久化工作數(shù)據(jù),因此你需要在你的系統(tǒng)中安裝 Redis。因?yàn)樗腔?Redis 的,你的隊(duì)列結(jié)構(gòu)可以是完全分布式的并且和平臺(tái)無關(guān)。例如,你可以有一些隊(duì)列生產(chǎn)者、消費(fèi)者監(jiān)聽者,他們運(yùn)行在 Nest 的一個(gè)或多個(gè)節(jié)點(diǎn)上,同時(shí),其他生產(chǎn)者、消費(fèi)者和監(jiān)聽者在其他 Node.js 平臺(tái)或者其他網(wǎng)絡(luò)節(jié)點(diǎn)上。

本章使用@nestjs/bull包,我們同時(shí)推薦閱讀BUll 文檔來獲取更多背景和應(yīng)用細(xì)節(jié)。

安裝

要開始使用,我們首先安裝需要的依賴:

$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull

一旦安裝過程完成,我們可以在根AppModule中導(dǎo)入BullModule。

app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

registerQueue()方法用于實(shí)例化并/或注冊隊(duì)列。隊(duì)列在不同的模塊和進(jìn)程之間共享,在底層則通過同樣的憑據(jù)連接到同樣的 Redis 數(shù)據(jù)庫。每個(gè)隊(duì)列由其name屬性區(qū)分(如下),當(dāng)共享隊(duì)列(跨模塊/進(jìn)程)時(shí),第一個(gè)registerQueue()方法同時(shí)實(shí)例化該隊(duì)列并向模塊注冊它。其他模塊(在相同或者不同進(jìn)程下)則簡單地注冊隊(duì)列。隊(duì)列注冊創(chuàng)建一個(gè)injection token,它可以被用在給定 Nest 模塊中獲取隊(duì)列。

針對每個(gè)隊(duì)列,傳遞一個(gè)包含下列屬性的配置對象:

-name:string- 一個(gè)隊(duì)列名稱,它可以被用作injection token(用于將隊(duì)列注冊到控制器/提供者),也可以作為裝飾器參數(shù)來將消費(fèi)者類和監(jiān)聽者與隊(duì)列聯(lián)系起來。是必須的。 -limiter:RateLimiter-該選項(xiàng)用于確定消息隊(duì)列處理速率,查看RateLimiter獲取更多信息??蛇x的。 -redis:RedisOpts-該選項(xiàng)用于配置 Redis 連接,查看RedisOpts獲取更多信息。可選的。 -prefix: string-隊(duì)列所有鍵的前綴??蛇x的。 -defaultJobOptions: JobOpts-選項(xiàng)用以控制新任務(wù)的默認(rèn)屬性。查看JobOpts獲取更多信息??蛇x的。 -settings: AdvancedSettings-高級(jí)隊(duì)列配置設(shè)置。這些通常不需要改變。查看AdvancedSettings獲取更多信息??蛇x的。

注意,name屬性是必須的。其他選項(xiàng)是可選的,為隊(duì)列行為提供更細(xì)節(jié)的控制。這些會(huì)直接傳遞給 Bull 的Queue構(gòu)造器。在這里閱讀更多選項(xiàng)。當(dāng)在第二個(gè)或者子模塊中注冊一個(gè)隊(duì)列時(shí),最佳時(shí)間是省略配置對象中除name屬性之外的所有選項(xiàng)。這些選項(xiàng)僅應(yīng)該在實(shí)例化隊(duì)列的模塊中確定。

在registerQueue()方法中傳遞多個(gè)逗號(hào)分隔的選項(xiàng)對象來創(chuàng)建多個(gè)隊(duì)列。

由于任務(wù)在 Redis 中是持久化的,每次當(dāng)一個(gè)特定名稱的隊(duì)列被實(shí)例化時(shí)(例如,當(dāng)一個(gè) app 啟動(dòng)/重啟時(shí)),它嘗試處理任何可能在前一個(gè)舊的任務(wù)遺留未完成的session。

每個(gè)隊(duì)里可能有一個(gè)或很多生產(chǎn)者、消費(fèi)者以及監(jiān)聽者。消費(fèi)者從一個(gè)特定命令隊(duì)列中獲取任務(wù):FIFO(默認(rèn),先進(jìn)先出),LIFO(后進(jìn)先出)或者依據(jù)優(yōu)先級(jí)。

控制隊(duì)列處理命令在這里討論。

生產(chǎn)者

任務(wù)生產(chǎn)者添加任務(wù)到隊(duì)列中。生產(chǎn)者是典型的應(yīng)用服務(wù)(Nest 提供者)。要添加工作到一個(gè)隊(duì)列,首先注冊隊(duì)列到服務(wù)中:

import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}

@InjectQueue()裝飾器由其名稱指定隊(duì)列,像它在registerQueue()方法中提供的那樣(例如,audio)。

現(xiàn)在,通過調(diào)用隊(duì)列的add()方法添加一個(gè)任務(wù),傳遞一個(gè)用戶定義的任務(wù)對象。任務(wù)表現(xiàn)為序列化的JavaScript對象(因?yàn)樗鼈儽淮鎯?chǔ)在 Redis 數(shù)據(jù)庫中)。你傳遞的任務(wù)形式是可選的;用它來在語義上表示你任務(wù)對象:

const job = await this.audioQueue.add({
  foo: 'bar',
});

命名的任務(wù)

任務(wù)需要獨(dú)一無二的名字。這允許你創(chuàng)建專用的消費(fèi)者,這將僅處理給定名稱的處理任務(wù)。

const job = await this.audioQueue.add('transcode', {
  foo: 'bar',
});

當(dāng)使用命名任務(wù)時(shí),你必須為每個(gè)添加到隊(duì)列中的特有名稱創(chuàng)建處理者,否則隊(duì)列會(huì)反饋缺失了給定任務(wù)的處理器。查看這里閱讀更多關(guān)于消費(fèi)命名任務(wù)的信息。

任務(wù)選項(xiàng)

任務(wù)可以包括附加選項(xiàng)。在Quene.add()方法的job參數(shù)之后傳遞選項(xiàng)對象。任務(wù)選項(xiàng)屬性有:

  • priority: number-選項(xiàng)優(yōu)先級(jí)值。范圍從 1(最高優(yōu)先)到 MAX_INT(最低優(yōu)先)。注意使用屬性對性能有輕微影響,因此要小心使用。
  • delay: number- 任務(wù)執(zhí)行前等待的時(shí)間(毫秒)。注意,為了精確延時(shí),服務(wù)端和客戶端時(shí)鐘應(yīng)該同步。
  • attempts: number-任務(wù)結(jié)束前總的嘗試次數(shù)。
  • repeat: RepeatOpts-按照定時(shí)設(shè)置重復(fù)任務(wù)記錄,查看RepeatOpts。
  • backoff: number | BackoffOpts- 如果任務(wù)失敗,自動(dòng)重試閃避設(shè)置,查看BackoffOpts。
  • lifo: boolean-如果為true,從隊(duì)列右端添加任務(wù)以替代從左邊添加(默認(rèn)為 false)。
  • timeout: number-任務(wù)超時(shí)失敗的毫秒數(shù)。
  • jobId: number | string- 覆蓋任務(wù) ID-默認(rèn)地,任務(wù) ID 是唯一的整數(shù),但你可以使用該參數(shù)覆蓋它。如果你使用這個(gè)選項(xiàng),你需要保證jobId是唯一的。如果你嘗試添加一個(gè)包含已有 id 的任務(wù),它不會(huì)被添加。
  • removeOnComplete: boolean | number-如果為true,當(dāng)任務(wù)完成時(shí)移除任務(wù)。一個(gè)數(shù)字用來指定要保存的任務(wù)數(shù)。默認(rèn)行為是將完成的工作保存在已完成的設(shè)置中。
  • removeOnFail: boolean | number-如果為true,當(dāng)所有嘗試失敗時(shí)移除任務(wù)。一個(gè)數(shù)字用來指定要保存的任務(wù)數(shù)。默認(rèn)行為是將失敗的任務(wù)保存在已失敗的設(shè)置中。
  • stackTraceLimit: number-限制在stacktrace中保存的堆棧跟蹤線。

這里是一些帶有任務(wù)選項(xiàng)的自定義任務(wù)示例。

要延遲任務(wù)的開始,使用delay配置屬性:

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { delay: 3000 } // 3 seconds delayed
);

要從右端添加任務(wù)到隊(duì)列(以 LIFO(后進(jìn)先出)處理任務(wù)),設(shè)置配置對象的lifo屬性為true。

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { lifo: true }
);

要優(yōu)先一個(gè)任務(wù),使用priority屬性。

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { priority: 2 }
);

消費(fèi)者

消費(fèi)者是一個(gè)類,定義的方法要么處理添加到隊(duì)列中的任務(wù),要么監(jiān)聽隊(duì)列的事件,或者兩者皆有。使用@Processor()裝飾器來定義消費(fèi)者類,如下:

import { Processor } from '@nestjs/bull';

@Processor('audio')
export class AudioConsumer {}

裝飾器的字符串參數(shù)(例如,audio)是和類方法關(guān)聯(lián)的隊(duì)列名稱。

在消費(fèi)者類中,使用@Process()裝飾器來裝飾任務(wù)處理者。

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  @Process()
  async transcode(job: Job<unknown>) {
    let progress = 0;
    for (i = 0; i < 100; i++) {
      await doSomething(job.data);
      progress += 10;
      job.progress(progress);
    }
    return {};
  }
}

裝飾器方法(例如transcode()) 在工作空閑或者隊(duì)列中有消息要處理的時(shí)候被調(diào)用。該處理器方法接受job對象作為其僅有的參數(shù)。處理器方法的返回值被保存在任務(wù)對象中,可以在之后被訪問,例如,在用于完成事件的監(jiān)聽者中。

Job對象有多個(gè)方法,允許你和他們的狀態(tài)交互。例如,上述代碼使用progress()方法來更新工作進(jìn)程。查看這里以了解完整的Job對象 API 參照。

你可以指定一個(gè)任務(wù)處理方法,僅處理指定類型(包含特定name的任務(wù))的任務(wù),這可以通過如下所述的將name傳遞給@Process()裝飾器完成。你在一個(gè)給定消費(fèi)者類中可以有多個(gè)@Process()處理器,以反應(yīng)每個(gè)任務(wù)類型(name),確保每個(gè)name有相應(yīng)的處理者。

@Process('transcode')
async transcode(job: Job<unknown>) { ... }

事件監(jiān)聽者

當(dāng)隊(duì)列和/或任務(wù)狀態(tài)改變時(shí),Bull生成一個(gè)有用的事件集合。Nest 提供了一個(gè)裝飾器集合,允許訂閱一系列標(biāo)準(zhǔn)核心事件集合。他們從@nestjs/bull包中導(dǎo)出。

事件監(jiān)聽者必須在一個(gè)消費(fèi)者類中聲明(通過@Processor()裝飾器)。要監(jiān)聽一個(gè)事件,使用如下表格之一的裝飾器來聲明一個(gè)事件處理器。例如,當(dāng)一個(gè)任務(wù)進(jìn)入audio隊(duì)列活躍狀態(tài)時(shí),要監(jiān)聽其發(fā)射的事件,使用下列結(jié)構(gòu):

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

鑒于 BUll 運(yùn)行于分布式(多 node)環(huán)境,它定義了本地事件概念。該概念可以辨識(shí)出一個(gè)由完整的單一進(jìn)程觸發(fā)的事件,或者由不同進(jìn)程共享的隊(duì)列。一個(gè)本地事件是指在本地進(jìn)程中觸發(fā)的一個(gè)隊(duì)列行為或者狀態(tài)變更。換句話說,當(dāng)你的事件生產(chǎn)者和消費(fèi)者是本地單進(jìn)程時(shí),隊(duì)列中所有事件都是本地的。

當(dāng)一個(gè)隊(duì)列在多個(gè)進(jìn)程中共享時(shí),我們可能要遇到全局事件。對一個(gè)由其他進(jìn)程觸發(fā)的事件通知器進(jìn)程的監(jiān)聽者來說,它必須注冊為全局事件。

當(dāng)相應(yīng)事件發(fā)射時(shí)事件處理器被喚醒。該處理器被下表所示的簽名調(diào)用,提供訪問事件相關(guān)的信息。我們討論下面簽名中本地和全局事件處理器。

本地事件監(jiān)聽者全局事件監(jiān)聽者處理器方法簽名/當(dāng)觸發(fā)時(shí)
@OnQueueError()@OnGlobalQueueError()handler(error: Error) - 當(dāng)錯(cuò)誤發(fā)生時(shí),error包括觸發(fā)錯(cuò)誤
@OnQueueWaiting()@OnGlobalQueueWaiting()handler(jobId: number | string) - 一旦工作者空閑就等待執(zhí)行的任務(wù),jobId包括進(jìn)入此狀態(tài)的 id
@OnQueueActive()@OnGlobalQueueActive()handler(job: Job)-job任務(wù)已啟動(dòng)
@OnQueueStalled()@OnGlobalQueueStalled()handler(job: Job)-job任務(wù)被標(biāo)記為延遲。這在時(shí)間循環(huán)崩潰或暫停時(shí)進(jìn)行調(diào)試工作時(shí)是很有效的
@OnQueueProgress()@OnGlobalQueueProgress()handler(job: Job, progress: number)-job任務(wù)進(jìn)程被更新為progress
@OnQueueCompleted()@OnGlobalQueueCompleted()handler(job: Job, result: any) job任務(wù)進(jìn)程成功以result結(jié)束
@OnQueueFailed()@OnGlobalQueueFailed()handler(job: Job, err: Error)job任務(wù)以err原因失敗
@OnQueuePaused()@OnGlobalQueuePaused()handler()隊(duì)列被暫停
@OnQueueResumed()@OnGlobalQueueResumed()handler(job: Job)隊(duì)列被恢復(fù)
@OnQueueCleaned()@OnGlobalQueueCleaned()handler(jobs: Job[], type: string) 舊任務(wù)從隊(duì)列中被清理,job是一個(gè)清理任務(wù)數(shù)組,type是要清理的任務(wù)類型
@OnQueueDrained()@OnGlobalQueueDrained()handler()在隊(duì)列處理完所有等待的任務(wù)(除非有些尚未處理的任務(wù)被延遲)時(shí)發(fā)射出
@OnQueueRemoved()@OnGlobalQueueRemoved()handler(job: Job)job任務(wù)被成功移除

當(dāng)監(jiān)聽全局事件時(shí),簽名方法可能和本地有一點(diǎn)不同。特別地,本地版本的任何方法簽名接受job對象的方法簽名而不是全局版本的jobId(number)。要在這種情況下獲取實(shí)際的job對象的引用,使用Queue#getJob方法。這種調(diào)用可能需要等待,因此處理者應(yīng)該被聲明為async,例如:

@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
  const job = await this.immediateQueue.getJob(jobId);
  console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}

要獲取一個(gè)Queue對象(使用getJob()調(diào)用),你當(dāng)然必須注入它。同時(shí),隊(duì)列必須注冊到你要注入的模塊中。

在特定事件監(jiān)聽器裝飾器之外,你可以使用通用的@OnQueueEvent()裝飾器與BullQueueEvents或者BullQueueGlobalEvents枚舉相結(jié)合。在這里閱讀更多有關(guān)事件的內(nèi)容。

隊(duì)列管理

隊(duì)列有一個(gè) API 來實(shí)現(xiàn)管理功能比如暫停、恢復(fù)、檢索不同狀態(tài)的任務(wù)數(shù)量等。你可以在這里找到完整的隊(duì)列 API。直接在Queue對象上調(diào)用這些方法,如下所示的暫停/恢復(fù)示例。

使用pause()方法調(diào)用來暫停隊(duì)列。一個(gè)暫停的隊(duì)列在恢復(fù)前將不會(huì)處理新的任務(wù),但會(huì)繼續(xù)處理完當(dāng)前執(zhí)行的任務(wù)。

await audioQueue.pause();

要恢復(fù)一個(gè)暫停的隊(duì)列,使用resume()方法,如下:

await audioQueue.resume();

異步配置

你可能需要異步而不是靜態(tài)傳遞隊(duì)列選項(xiàng)。在這種情況下,使用registerQueueAsync()方法,可以提供不同的異步配置方法。

一個(gè)方法是使用工廠函數(shù):

BullModule.registerQueueAsync({
  name: 'audio',
  useFactory: () => ({
    redis: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

我們的工廠函數(shù)方法和其他異步提供者(它可以是async的并可以使用inject來注入)方法相同。

BullModule.registerQueueAsync({
  name: 'audio',
  imports: [ConfigModule],
  useFactory: async (configService: ConfigService) => ({
    redis: {
      host: configService.get('QUEUE_HOST'),
      port: +configService.get('QUEUE_PORT'),
    },
  }),
  inject: [ConfigService],
});

可選的,你可以使用useClass語法。

BullModule.registerQueueAsync({
  name: 'audio',
  useClass: BullConfigService,
});

上述結(jié)構(gòu)在BullModule中實(shí)例化BullConfigService,并通過調(diào)用createBullOptions()來用它提供一個(gè)選項(xiàng)對象。注意這意味著BullConfigService要實(shí)現(xiàn)BullOptionsFactory工廠接口,如下:

@Injectable()
class BullConfigService implements BullOptionsFactory {
  createBullOptions(): BullModuleOptions {
    return {
      redis: {
        host: 'localhost',
        port: 6379,
      },
    };
  }
}

要阻止在BullModule中創(chuàng)建BullConfigService并使用一個(gè)從其他模塊導(dǎo)入的提供者,可以使用useExisting語法。

BullModule.registerQueueAsync({
  name: 'audio',
  imports: [ConfigModule],
  useExisting: ConfigService,
});

這個(gè)結(jié)構(gòu)和useClass有一個(gè)根本區(qū)別——BullModule將查找導(dǎo)入的模塊來重用現(xiàn)有的ConfigServie而不是實(shí)例化一個(gè)新的。

示例

一個(gè)可用的示例見這里。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)