Egg 多進(jìn)程模型和進(jìn)程間通訊

2020-02-06 14:11 更新

我們知道 JavaScript 代碼是運(yùn)行在單線程上的,換句話說一個(gè) Node.js 進(jìn)程只能運(yùn)行在一個(gè) CPU 上。那么如果用 Node.js 來做 Web Server,就無法享受到多核運(yùn)算的好處。作為企業(yè)級(jí)的解決方案,我們要解決的一個(gè)問題就是:

如何榨干服務(wù)器資源,利用上多核 CPU 的并發(fā)優(yōu)勢(shì)?

而 Node.js 官方提供的解決方案是 Cluster 模塊,其中包含一段簡(jiǎn)介:

單個(gè) Node.js 實(shí)例在單線程環(huán)境下運(yùn)行。為了更好地利用多核環(huán)境,用戶有時(shí)希望啟動(dòng)一批 Node.js 進(jìn)程用于加載。集群化模塊使得你很方便地創(chuàng)建子進(jìn)程,以便于在服務(wù)端口之間共享。

Cluster 是什么呢?

簡(jiǎn)單的說,

  • 在服務(wù)器上同時(shí)啟動(dòng)多個(gè)進(jìn)程。
  • 每個(gè)進(jìn)程里都跑的是同一份源代碼(好比把以前一個(gè)進(jìn)程的工作分給多個(gè)進(jìn)程去做)。
  • 更神奇的是,這些進(jìn)程可以同時(shí)監(jiān)聽一個(gè)端口(具體原理推薦閱讀 @DavidCai1993 這篇 Cluster 實(shí)現(xiàn)原理)。

其中:

  • 負(fù)責(zé)啟動(dòng)其他進(jìn)程的叫做 Master 進(jìn)程,他好比是個(gè)『包工頭』,不做具體的工作,只負(fù)責(zé)啟動(dòng)其他進(jìn)程。
  • 其他被啟動(dòng)的叫 Worker 進(jìn)程,顧名思義就是干活的『工人』。它們接收請(qǐng)求,對(duì)外提供服務(wù)。
  • Worker 進(jìn)程的數(shù)量一般根據(jù)服務(wù)器的 CPU 核數(shù)來定,這樣就可以完美利用多核資源。
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}

框架的多進(jìn)程模型

上面的示例是不是很簡(jiǎn)單,但是作為企業(yè)級(jí)的解決方案,要考慮的東西還有很多。

  • Worker 進(jìn)程異常退出以后該如何處理?
  • 多個(gè) Worker 進(jìn)程之間如何共享資源?
  • 多個(gè) Worker 進(jìn)程之間如何調(diào)度?
  • ...

進(jìn)程守護(hù)

健壯性(又叫魯棒性)是企業(yè)級(jí)應(yīng)用必須考慮的問題,除了程序本身代碼質(zhì)量要保證,框架層面也需要提供相應(yīng)的『兜底』機(jī)制保證極端情況下應(yīng)用的可用性。

一般來說,Node.js 進(jìn)程退出可以分為兩類:

未捕獲異常

當(dāng)代碼拋出了異常沒有被捕獲到時(shí),進(jìn)程將會(huì)退出,此時(shí) Node.js 提供了 process.on('uncaughtException', handler) 接口來捕獲它,但是當(dāng)一個(gè) Worker 進(jìn)程遇到 未捕獲的異常 時(shí),它已經(jīng)處于一個(gè)不確定狀態(tài),此時(shí)我們應(yīng)該讓這個(gè)進(jìn)程優(yōu)雅退出:

  1. 關(guān)閉異常 Worker 進(jìn)程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請(qǐng)求。
  2. Master 立刻 fork 一個(gè)新的 Worker 進(jìn)程,保證在線的『工人』總數(shù)不變。
  3. 異常 Worker 等待一段時(shí)間,處理完已經(jīng)接受的請(qǐng)求后退出。
+---------+                 +---------+
| Worker | | Master |
+---------+ +----+----+
| uncaughtException |
+------------+ |
| | | +---------+
| <----------+ | | Worker |
| | +----+----+
| disconnect | fork a new worker |
+-------------------------> + ---------------------> |
| wait... | |
| exit | |
+-------------------------> | |
| | |
die | |
| |
| |

OOM、系統(tǒng)異常

而當(dāng)一個(gè)進(jìn)程出現(xiàn)異常導(dǎo)致 crash 或者 OOM 被系統(tǒng)殺死時(shí),不像未捕獲異常發(fā)生時(shí)我們還有機(jī)會(huì)讓進(jìn)程繼續(xù)執(zhí)行,只能夠讓當(dāng)前進(jìn)程直接退出,Master 立刻 fork 一個(gè)新的 Worker。

在框架里,我們采用 graceful 和 egg-cluster 兩個(gè)模塊配合實(shí)現(xiàn)上面的邏輯。這套方案已在阿里巴巴和螞蟻金服的生產(chǎn)環(huán)境廣泛部署,且經(jīng)受過『雙11』大促的考驗(yàn),所以是相對(duì)穩(wěn)定和靠譜的。

Agent 機(jī)制

說到這里,Node.js 多進(jìn)程方案貌似已經(jīng)成型,這也是我們?cè)缙诰€上使用的方案。但后來我們發(fā)現(xiàn)有些工作其實(shí)不需要每個(gè) Worker 都去做,如果都做,一來是浪費(fèi)資源,更重要的是可能會(huì)導(dǎo)致多進(jìn)程間資源訪問沖突。舉個(gè)例子:生產(chǎn)環(huán)境的日志文件我們一般會(huì)按照日期進(jìn)行歸檔,在單進(jìn)程模型下這再簡(jiǎn)單不過了:

每天凌晨 0 點(diǎn),將當(dāng)前日志文件按照日期進(jìn)行重命名銷毀以前的文件句柄,并創(chuàng)建新的日志文件繼續(xù)寫入

試想如果現(xiàn)在是 4 個(gè)進(jìn)程來做同樣的事情,是不是就亂套了。所以,對(duì)于這一類后臺(tái)運(yùn)行的邏輯,我們希望將它們放到一個(gè)單獨(dú)的進(jìn)程上去執(zhí)行,這個(gè)進(jìn)程就叫 Agent Worker,簡(jiǎn)稱 Agent。Agent 好比是 Master 給其他 Worker 請(qǐng)的一個(gè)『秘書』,它不對(duì)外提供服務(wù),只給 App Worker 打工,專門處理一些公共事務(wù)?,F(xiàn)在我們的多進(jìn)程模型就變成下面這個(gè)樣子了

                +--------+          +-------+
| Master |<-------->| Agent |
+--------+ +-------+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+

那我們框架的啟動(dòng)時(shí)序如下:

+---------+           +---------+          +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
  1. Master 啟動(dòng)后先 fork Agent 進(jìn)程
  2. Agent 初始化成功后,通過 IPC 通道通知 Master
  3. Master 再 fork 多個(gè) App Worker
  4. App Worker 初始化成功,通知 Master
  5. 所有的進(jìn)程初始化成功后,Master 通知 Agent 和 Worker 應(yīng)用啟動(dòng)成功

另外,關(guān)于 Agent Worker 還有幾點(diǎn)需要注意的是:

  1. 由于 App Worker 依賴于 Agent,所以必須等 Agent 初始化完成后才能 fork App Worker
  2. Agent 雖然是 App Worker 的『小秘』,但是業(yè)務(wù)相關(guān)的工作不應(yīng)該放到 Agent 上去做,不然把她累垮了就不好了
  3. 由于 Agent 的特殊定位,我們應(yīng)該保證它相對(duì)穩(wěn)定。當(dāng)它發(fā)生未捕獲異常,框架不會(huì)像 App Worker 一樣讓他退出重啟,而是記錄異常日志、報(bào)警等待人工處理
  4. Agent 和普通 App Worker 掛載的 API 不完全一樣,如何識(shí)別差異可查看框架文檔

Agent 的用法

你可以在應(yīng)用或插件根目錄下的 agent.js 中實(shí)現(xiàn)你自己的邏輯(和啟動(dòng)自定義 用法類似,只是入口參數(shù)是 agent 對(duì)象)

// agent.js
module.exports = agent => {
// 在這里寫你的初始化邏輯

// 也可以通過 messenger 對(duì)象發(fā)送消息給 App Worker
// 但需要等待 App Worker 啟動(dòng)成功后才能發(fā)送,不然很可能丟失
agent.messenger.on('egg-ready', () => {
const data = { ... };
agent.messenger.sendToApp('xxx_action', data);
});
};
// app.js
module.exports = app => {
app.messenger.on('xxx_action', data => {
// ...
});
};

這個(gè)例子中,agent.js 的代碼會(huì)執(zhí)行在 agent 進(jìn)程上,app.js 的代碼會(huì)執(zhí)行在 Worker 進(jìn)程上,他們通過框架封裝的 messenger 對(duì)象進(jìn)行進(jìn)程間通訊(IPC),后面的章節(jié)會(huì)對(duì)框架的 IPC 做詳細(xì)的講解。

Master VS Agent VS Worker

當(dāng)一個(gè)應(yīng)用啟動(dòng)時(shí),會(huì)同時(shí)啟動(dòng)這三類進(jìn)程。

類型進(jìn)程數(shù)量作用穩(wěn)定性是否運(yùn)行業(yè)務(wù)代碼
Master1進(jìn)程管理,進(jìn)程間消息轉(zhuǎn)發(fā)非常高
Agent1后臺(tái)運(yùn)行工作(長(zhǎng)連接客戶端)少量
Worker一般設(shè)置為 CPU 核數(shù)執(zhí)行業(yè)務(wù)代碼一般

Master

在這個(gè)模型下,Master 進(jìn)程承擔(dān)了進(jìn)程管理的工作(類似 pm2),不運(yùn)行任何業(yè)務(wù)代碼,我們只需要運(yùn)行起一個(gè) Master 進(jìn)程它就會(huì)幫我們搞定所有的 Worker、Agent 進(jìn)程的初始化以及重啟等工作了。

Master 進(jìn)程的穩(wěn)定性是極高的,線上運(yùn)行時(shí)我們只需要通過 egg-scripts 后臺(tái)運(yùn)行通過 egg.startCluster 啟動(dòng)的 Master 進(jìn)程就可以了,不再需要使用 pm2 等進(jìn)程守護(hù)模塊。

$ egg-scripts start --daemon

Agent

在大部分情況下,我們?cè)趯憳I(yè)務(wù)代碼的時(shí)候完全不用考慮 Agent 進(jìn)程的存在,但是當(dāng)我們遇到一些場(chǎng)景,只想讓代碼運(yùn)行在一個(gè)進(jìn)程上的時(shí)候,Agent 進(jìn)程就到了發(fā)揮作用的時(shí)候了。

由于 Agent 只有一個(gè),而且會(huì)負(fù)責(zé)許多維持連接的臟活累活,因此它不能輕易掛掉和重啟,所以 Agent 進(jìn)程在監(jiān)聽到未捕獲異常時(shí)不會(huì)退出,但是會(huì)打印出錯(cuò)誤日志,我們需要對(duì)日志中的未捕獲異常提高警惕。

Worker

Worker 進(jìn)程負(fù)責(zé)處理真正的用戶請(qǐng)求和定時(shí)任務(wù)的處理。而 Egg 的定時(shí)任務(wù)也提供了只讓一個(gè) Worker 進(jìn)程運(yùn)行的能力,所以能夠通過定時(shí)任務(wù)解決的問題就不要放到 Agent 上執(zhí)行。

Worker 運(yùn)行的是業(yè)務(wù)代碼,相對(duì)會(huì)比 Agent 和 Master 進(jìn)程上運(yùn)行的代碼復(fù)雜度更高,穩(wěn)定性也低一點(diǎn),當(dāng) Worker 進(jìn)程異常退出時(shí),Master 進(jìn)程會(huì)重啟一個(gè) Worker 進(jìn)程。

進(jìn)程間通訊(IPC)

雖然每個(gè) Worker 進(jìn)程是相對(duì)獨(dú)立的,但是它們之間始終還是需要通訊的,叫進(jìn)程間通訊(IPC)。下面是 Node.js 官方提供的一段示例代碼

'use strict';
const cluster = require('cluster');

if (cluster.isMaster) {
const worker = cluster.fork();
worker.send('hi there');
worker.on('message', msg => {
console.log(`msg: ${msg} from worker#${worker.id}`);
});
} else if (cluster.isWorker) {
process.on('message', (msg) => {
process.send(msg);
});
}

細(xì)心的你可能已經(jīng)發(fā)現(xiàn) cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之間,Worker 與 Agent 進(jìn)程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉(zhuǎn)發(fā)。

廣播消息: agent => all workers
+--------+ +-------+
| Master |<---------| Agent |
+--------+ +-------+
/ | \
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+

指定接收方: one worker => another worker
+--------+ +-------+
| Master |----------| Agent |
+--------+ +-------+
^ |
send to / |
worker 2 / |
/ |
/ v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+

為了方便調(diào)用,我們封裝了一個(gè) messenger 對(duì)象掛在 app / agent 實(shí)例上,提供一系列友好的 API。

發(fā)送

  • app.messenger.broadcast(action, data):發(fā)送給所有的 agent / app 進(jìn)程(包括自己)
  • app.messenger.sendToApp(action, data): 發(fā)送給所有的 app 進(jìn)程在 app 上調(diào)用該方法會(huì)發(fā)送給自己和其他的 app 進(jìn)程在 agent 上調(diào)用該方法會(huì)發(fā)送給所有的 app 進(jìn)程
  • app.messenger.sendToAgent(action, data): 發(fā)送給 agent 進(jìn)程在 app 上調(diào)用該方法會(huì)發(fā)送給 agent 進(jìn)程在 agent 上調(diào)用該方法會(huì)發(fā)送給 agent 自己
  • agent.messenger.sendRandom(action, data):app 上沒有該方法(現(xiàn)在 Egg 的實(shí)現(xiàn)是等同于 sentToAgent)agent 會(huì)隨機(jī)發(fā)送消息給一個(gè) app 進(jìn)程(由 master 來控制發(fā)送給誰)
  • app.messenger.sendTo(pid, action, data): 發(fā)送給指定進(jìn)程
// app.js
module.exports = app => {
// 注意,只有在 egg-ready 事件拿到之后才能發(fā)送消息
app.messenger.once('egg-ready', () => {
app.messenger.sendToAgent('agent-event', { foo: 'bar' });
app.messenger.sendToApp('app-event', { foo: 'bar' });
});
}

上面所有 app.messenger 上的方法都可以在 agent.messenger 上使用。

egg-ready

上面的示例中提到,需要等 egg-ready 消息之后才能發(fā)送消息。只有在 Master 確認(rèn)所有的 Agent 進(jìn)程和 Worker 進(jìn)程都已經(jīng)成功啟動(dòng)(并 ready)之后,才會(huì)通過 messenger 發(fā)送 egg-ready 消息給所有的 Agent 和 Worker,告知一切準(zhǔn)備就緒,IPC 通道可以開始使用了。

接收

在 messenger 上監(jiān)聽對(duì)應(yīng)的 action 事件,就可以收到其他進(jìn)程發(fā)送來的信息了。

app.messenger.on(action, data => {
// process data
});
app.messenger.once(action, data => {
// process data
});

agent 上的 messenger 接收消息的用法和 app 上一致。

IPC 實(shí)戰(zhàn)

我們通過一個(gè)簡(jiǎn)單的例子來感受一下在框架的多進(jìn)程模型下如何使用 IPC 解決實(shí)際問題。

需求

我們有一個(gè)接口需要從遠(yuǎn)程數(shù)據(jù)源中讀取一些數(shù)據(jù),對(duì)外部提供 API,但是這個(gè)數(shù)據(jù)源的數(shù)據(jù)很少變化,因此我們希望將數(shù)據(jù)緩存到內(nèi)存中以提升服務(wù)能力,降低 RT。此時(shí)就需要有一個(gè)更新內(nèi)存緩存的機(jī)制。

  1. 定時(shí)從遠(yuǎn)程數(shù)據(jù)源獲取數(shù)據(jù),更新內(nèi)存緩存,為了降低對(duì)數(shù)據(jù)源壓力,更新的間隔時(shí)間會(huì)設(shè)置的比較長(zhǎng)。
  2. 遠(yuǎn)程數(shù)據(jù)源提供一個(gè)檢查是否有數(shù)據(jù)更新的接口,我們的服務(wù)可以更頻繁的調(diào)用檢查接口,當(dāng)有數(shù)據(jù)更新時(shí)才去重新拉取數(shù)據(jù)。
  3. 遠(yuǎn)程數(shù)據(jù)源通過消息中間件推送數(shù)據(jù)更新的消息,我們的服務(wù)監(jiān)聽消息來更新數(shù)據(jù)。

在實(shí)際項(xiàng)目中,我們可以采用方案一用于兜底,結(jié)合方案三或者方案二的一種用于提升數(shù)據(jù)更新的實(shí)時(shí)性。而在這個(gè)示例中,我們會(huì)通過 IPC + 定時(shí)任務(wù)來同時(shí)實(shí)現(xiàn)這三種緩存更新方案。

實(shí)現(xiàn)

我們將所有的與遠(yuǎn)程數(shù)據(jù)源交互的邏輯封裝在一個(gè) Service 中,并提供 get 方法給 Controller 調(diào)用。

// app/service/source.js
let memoryCache = {};

class SourceService extends Service {
get(key) {
return memoryCache[key];
}

async checkUpdate() {
// check if remote data source has changed
const updated = await mockCheck();
this.ctx.logger.info('check update response %s', updated);
return updated;
}

async update() {
// update memory cache from remote
memoryCache = await mockFetch();
this.ctx.logger.info('update memory cache from remote: %j', memoryCache);
}
}

編寫定時(shí)任務(wù),實(shí)現(xiàn)方案一,每 10 分鐘定時(shí)從遠(yuǎn)程數(shù)據(jù)源獲取數(shù)據(jù)更新緩存做兜底。

// app/schedule/force_refresh.js
exports.schedule = {
interval: '10m',
type: 'all', // run in all workers
};

exports.task = async ctx => {
await ctx.service.source.update();
ctx.app.lastUpdateBy = 'force';
};

再編寫一個(gè)定時(shí)任務(wù)來實(shí)現(xiàn)方案二的檢查邏輯,每 10s 讓一個(gè) worker 調(diào)用檢查接口,當(dāng)發(fā)現(xiàn)數(shù)據(jù)有變化時(shí),通過 messenger 提供的方法通知所有的 Worker。

// app/schedule/pull_refresh.js
exports.schedule = {
interval: '10s',
type: 'worker', // only run in one worker
};

exports.task = async ctx => {
const needRefresh = await ctx.service.source.checkUpdate();
if (!needRefresh) return;

// notify all workers to update memory cache from `file`
ctx.app.messenger.sendToApp('refresh', 'pull');
};

在啟動(dòng)自定義文件中監(jiān)聽 refresh 事件,并更新數(shù)據(jù),所有的 Worker 進(jìn)程都能收到這個(gè)消息,并觸發(fā)更新,此時(shí)我們的方案二也已經(jīng)大功告成了。

// app.js
module.exports = app => {
app.messenger.on('refresh', by => {
app.logger.info('start update by %s', by);
// create an anonymous context to access service
const ctx = app.createAnonymousContext();
ctx.runInBackground(async () => {
await ctx.service.source.update();
app.lastUpdateBy = by;
});
});
};

現(xiàn)在我們來看看如何實(shí)現(xiàn)第三個(gè)方案。我們需要有一個(gè)消息中間件的客戶端,它會(huì)和服務(wù)端保持長(zhǎng)連接,這一類的長(zhǎng)連接維持比較適合在 Agent 進(jìn)程上做,可以有效降低連接數(shù),減少兩端的消耗。所以我們?cè)?Agent 進(jìn)程上來開啟消息監(jiān)聽。

// agent.js

const Subscriber = require('./lib/subscriber');

module.exports = agent => {
const subscriber = new Subscriber();
// listen changed event, broadcast to all workers
subscriber.on('changed', () => agent.messenger.sendToApp('refresh', 'push'));
};

通過合理使用 Agent 進(jìn)程、定時(shí)任務(wù)和 IPC,我們可以輕松搞定類似的需求并降低對(duì)數(shù)據(jù)源的壓力。具體的示例代碼可以查看 examples/ipc

更復(fù)雜的場(chǎng)景

上面的例子中,我們?cè)?Agent 進(jìn)程上運(yùn)行了一個(gè) subscriber,來監(jiān)聽消息中間件的消息,如果 Worker 進(jìn)程也需要監(jiān)聽一些消息怎么辦?如何通過 Agent 進(jìn)程建立連接再轉(zhuǎn)發(fā)給 Worker 進(jìn)程呢?這些問題可以在多進(jìn)程研發(fā)模式增強(qiáng)中找到答案。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)