我們知道 JavaScript 代碼是運(yùn)行在單線程上的,換句話說一個(gè) Node.js 進(jìn)程只能運(yùn)行在一個(gè) CPU 上。那么如果用 Node.js 來做 Web Server,就無法享受到多核運(yùn)算的好處。作為企業(yè)級(jí)的解決方案,我們要解決的一個(gè)問題就是:
如何榨干服務(wù)器資源,利用上多核 CPU 的并發(fā)優(yōu)勢(shì)? 而 Node.js 官方提供的解決方案是 Cluster 模塊 ,其中包含一段簡(jiǎn)介:
單個(gè) Node.js 實(shí)例在單線程環(huán)境下運(yùn)行。為了更好地利用多核環(huán)境,用戶有時(shí)希望啟動(dòng)一批 Node.js 進(jìn)程用于加載。集群化模塊使得你很方便地創(chuàng)建子進(jìn)程,以便于在服務(wù)端口之間共享。 Cluster 是什么呢?簡(jiǎn)單的說,
在服務(wù)器上同時(shí)啟動(dòng)多個(gè)進(jìn)程。 每個(gè)進(jìn)程里都跑的是同一份源代碼(好比把以前一個(gè)進(jìn)程的工作分給多個(gè)進(jìn)程去做)。 更神奇的是,這些進(jìn)程可以同時(shí)監(jiān)聽一個(gè)端口(具體原理推薦閱讀 @DavidCai1993 這篇 Cluster 實(shí)現(xiàn)原理 )。 其中:
負(fù)責(zé)啟動(dòng)其他進(jìn)程的叫做 Master 進(jìn)程,他好比是個(gè)『包工頭』,不做具體的工作,只負(fù)責(zé)啟動(dòng)其他進(jìn)程。 其他被啟動(dòng)的叫 Worker 進(jìn)程,顧名思義就是干活的『工人』。它們接收請(qǐng)求,對(duì)外提供服務(wù)。 Worker 進(jìn)程的數(shù)量一般根據(jù)服務(wù)器的 CPU 核數(shù)來定,這樣就可以完美利用多核資源。 const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
框架的多進(jìn)程模型上面的示例是不是很簡(jiǎn)單,但是作為企業(yè)級(jí)的解決方案,要考慮的東西還有很多。
Worker 進(jìn)程異常退出以后該如何處理? 多個(gè) Worker 進(jìn)程之間如何共享資源? 多個(gè) Worker 進(jìn)程之間如何調(diào)度? ... 進(jìn)程守護(hù)健壯性(又叫魯棒性)是企業(yè)級(jí)應(yīng)用必須考慮的問題,除了程序本身代碼質(zhì)量要保證,框架層面也需要提供相應(yīng)的『兜底』機(jī)制保證極端情況下應(yīng)用的可用性。
一般來說,Node.js 進(jìn)程退出可以分為兩類:
未捕獲異常當(dāng)代碼拋出了異常沒有被捕獲到時(shí),進(jìn)程將會(huì)退出,此時(shí) Node.js 提供了 process.on('uncaughtException', handler) 接口來捕獲它,但是當(dāng)一個(gè) Worker 進(jìn)程遇到 未捕獲的異常 時(shí),它已經(jīng)處于一個(gè)不確定狀態(tài),此時(shí)我們應(yīng)該讓這個(gè)進(jìn)程優(yōu)雅退出:
關(guān)閉異常 Worker 進(jìn)程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請(qǐng)求。 Master 立刻 fork 一個(gè)新的 Worker 進(jìn)程,保證在線的『工人』總數(shù)不變。 異常 Worker 等待一段時(shí)間,處理完已經(jīng)接受的請(qǐng)求后退出。 +---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
OOM、系統(tǒng)異常而當(dāng)一個(gè)進(jìn)程出現(xiàn)異常導(dǎo)致 crash 或者 OOM 被系統(tǒng)殺死時(shí),不像未捕獲異常發(fā)生時(shí)我們還有機(jī)會(huì)讓進(jìn)程繼續(xù)執(zhí)行,只能夠讓當(dāng)前進(jìn)程直接退出,Master 立刻 fork 一個(gè)新的 Worker。
在框架里,我們采用 graceful 和 egg-cluster 兩個(gè)模塊配合實(shí)現(xiàn)上面的邏輯。這套方案已在阿里巴巴和螞蟻金服的生產(chǎn)環(huán)境廣泛部署,且經(jīng)受過『雙11』大促的考驗(yàn),所以是相對(duì)穩(wěn)定和靠譜的。
Agent 機(jī)制說到這里,Node.js 多進(jìn)程方案貌似已經(jīng)成型,這也是我們?cè)缙诰€上使用的方案。但后來我們發(fā)現(xiàn)有些工作其實(shí)不需要每個(gè) Worker 都去做,如果都做,一來是浪費(fèi)資源,更重要的是可能會(huì)導(dǎo)致多進(jìn)程間資源訪問沖突。舉個(gè)例子:生產(chǎn)環(huán)境的日志文件我們一般會(huì)按照日期進(jìn)行歸檔,在單進(jìn)程模型下這再簡(jiǎn)單不過了:
每天凌晨 0 點(diǎn),將當(dāng)前日志文件按照日期進(jìn)行重命名銷毀以前的文件句柄,并創(chuàng)建新的日志文件繼續(xù)寫入 試想如果現(xiàn)在是 4 個(gè)進(jìn)程來做同樣的事情,是不是就亂套了。所以,對(duì)于這一類后臺(tái)運(yùn)行的邏輯,我們希望將它們放到一個(gè)單獨(dú)的進(jìn)程上去執(zhí)行,這個(gè)進(jìn)程就叫 Agent Worker,簡(jiǎn)稱 Agent。Agent 好比是 Master 給其他 Worker 請(qǐng)的一個(gè)『秘書』,它不對(duì)外提供服務(wù),只給 App Worker 打工,專門處理一些公共事務(wù)?,F(xiàn)在我們的多進(jìn)程模型就變成下面這個(gè)樣子了
+--------+ +-------+ | Master |<-------->| Agent | +--------+ +-------+ ^ ^ ^ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
那我們框架的啟動(dòng)時(shí)序如下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
Master 啟動(dòng)后先 fork Agent 進(jìn)程 Agent 初始化成功后,通過 IPC 通道通知 Master Master 再 fork 多個(gè) App Worker App Worker 初始化成功,通知 Master 所有的進(jìn)程初始化成功后,Master 通知 Agent 和 Worker 應(yīng)用啟動(dòng)成功 另外,關(guān)于 Agent Worker 還有幾點(diǎn)需要注意的是:
由于 App Worker 依賴于 Agent,所以必須等 Agent 初始化完成后才能 fork App Worker Agent 雖然是 App Worker 的『小秘』,但是業(yè)務(wù)相關(guān)的工作不應(yīng)該放到 Agent 上去做,不然把她累垮了就不好了 由于 Agent 的特殊定位,我們應(yīng)該保證它相對(duì)穩(wěn)定。當(dāng)它發(fā)生未捕獲異常,框架不會(huì)像 App Worker 一樣讓他退出重啟,而是記錄異常日志、報(bào)警等待人工處理 Agent 和普通 App Worker 掛載的 API 不完全一樣,如何識(shí)別差異可查看框架文檔 Agent 的用法你可以在應(yīng)用或插件根目錄下的 agent.js 中實(shí)現(xiàn)你自己的邏輯(和啟動(dòng)自定義 用法類似,只是入口參數(shù)是 agent 對(duì)象)
// agent.js module.exports = agent => { // 在這里寫你的初始化邏輯 // 也可以通過 messenger 對(duì)象發(fā)送消息給 App Worker // 但需要等待 App Worker 啟動(dòng)成功后才能發(fā)送,不然很可能丟失 agent.messenger.on('egg-ready', () => { const data = { ... }; agent.messenger.sendToApp('xxx_action', data); }); };
// app.js module.exports = app => { app.messenger.on('xxx_action', data => { // ... }); };
這個(gè)例子中,agent.js 的代碼會(huì)執(zhí)行在 agent 進(jìn)程上,app.js 的代碼會(huì)執(zhí)行在 Worker 進(jìn)程上,他們通過框架封裝的 messenger 對(duì)象進(jìn)行進(jìn)程間通訊(IPC),后面的章節(jié)會(huì)對(duì)框架的 IPC 做詳細(xì)的講解。
Master VS Agent VS Worker當(dāng)一個(gè)應(yīng)用啟動(dòng)時(shí),會(huì)同時(shí)啟動(dòng)這三類進(jìn)程。
類型 進(jìn)程數(shù)量 作用 穩(wěn)定性 是否運(yùn)行業(yè)務(wù)代碼 Master 1 進(jìn)程管理,進(jìn)程間消息轉(zhuǎn)發(fā) 非常高 否 Agent 1 后臺(tái)運(yùn)行工作(長(zhǎng)連接客戶端) 高 少量 Worker 一般設(shè)置為 CPU 核數(shù) 執(zhí)行業(yè)務(wù)代碼 一般 是
Master在這個(gè)模型下,Master 進(jìn)程承擔(dān)了進(jìn)程管理的工作(類似 pm2 ),不運(yùn)行任何業(yè)務(wù)代碼,我們只需要運(yùn)行起一個(gè) Master 進(jìn)程它就會(huì)幫我們搞定所有的 Worker、Agent 進(jìn)程的初始化以及重啟等工作了。
Master 進(jìn)程的穩(wěn)定性是極高的,線上運(yùn)行時(shí)我們只需要通過 egg-scripts 后臺(tái)運(yùn)行通過 egg.startCluster 啟動(dòng)的 Master 進(jìn)程就可以了,不再需要使用 pm2 等進(jìn)程守護(hù)模塊。
$ egg-scripts start --daemon
Agent在大部分情況下,我們?cè)趯憳I(yè)務(wù)代碼的時(shí)候完全不用考慮 Agent 進(jìn)程的存在,但是當(dāng)我們遇到一些場(chǎng)景,只想讓代碼運(yùn)行在一個(gè)進(jìn)程上的時(shí)候,Agent 進(jìn)程就到了發(fā)揮作用的時(shí)候了。
由于 Agent 只有一個(gè),而且會(huì)負(fù)責(zé)許多維持連接的臟活累活,因此它不能輕易掛掉和重啟,所以 Agent 進(jìn)程在監(jiān)聽到未捕獲異常時(shí)不會(huì)退出,但是會(huì)打印出錯(cuò)誤日志,我們需要對(duì)日志中的未捕獲異常提高警惕。
WorkerWorker 進(jìn)程負(fù)責(zé)處理真正的用戶請(qǐng)求和定時(shí)任務(wù) 的處理。而 Egg 的定時(shí)任務(wù)也提供了只讓一個(gè) Worker 進(jìn)程運(yùn)行的能力,所以能夠通過定時(shí)任務(wù)解決的問題就不要放到 Agent 上執(zhí)行。
Worker 運(yùn)行的是業(yè)務(wù)代碼,相對(duì)會(huì)比 Agent 和 Master 進(jìn)程上運(yùn)行的代碼復(fù)雜度更高,穩(wěn)定性也低一點(diǎn),當(dāng) Worker 進(jìn)程異常退出時(shí),Master 進(jìn)程會(huì)重啟一個(gè) Worker 進(jìn)程。
進(jìn)程間通訊(IPC)雖然每個(gè) Worker 進(jìn)程是相對(duì)獨(dú)立的,但是它們之間始終還是需要通訊的,叫進(jìn)程間通訊(IPC)。下面是 Node.js 官方提供的一段示例代碼
'use strict'; const cluster = require('cluster'); if (cluster.isMaster) { const worker = cluster.fork(); worker.send('hi there'); worker.on('message', msg => { console.log(`msg: ${msg} from worker#${worker.id}`); }); } else if (cluster.isWorker) { process.on('message', (msg) => { process.send(msg); }); }
細(xì)心的你可能已經(jīng)發(fā)現(xiàn) cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之間,Worker 與 Agent 進(jìn)程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉(zhuǎn)發(fā)。
廣播消息: agent => all workers +--------+ +-------+ | Master |<---------| Agent | +--------+ +-------+ / | \ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+ 指定接收方: one worker => another worker +--------+ +-------+ | Master |----------| Agent | +--------+ +-------+ ^ | send to / | worker 2 / | / | / v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
為了方便調(diào)用,我們封裝了一個(gè) messenger 對(duì)象掛在 app / agent 實(shí)例上,提供一系列友好的 API。
發(fā)送app.messenger.broadcast(action, data):發(fā)送給所有的 agent / app 進(jìn)程(包括自己) app.messenger.sendToApp(action, data): 發(fā)送給所有的 app 進(jìn)程在 app 上調(diào)用該方法會(huì)發(fā)送給自己和其他的 app 進(jìn)程在 agent 上調(diào)用該方法會(huì)發(fā)送給所有的 app 進(jìn)程 app.messenger.sendToAgent(action, data): 發(fā)送給 agent 進(jìn)程在 app 上調(diào)用該方法會(huì)發(fā)送給 agent 進(jìn)程在 agent 上調(diào)用該方法會(huì)發(fā)送給 agent 自己 agent.messenger.sendRandom(action, data):app 上沒有該方法(現(xiàn)在 Egg 的實(shí)現(xiàn)是等同于 sentToAgent)agent 會(huì)隨機(jī)發(fā)送消息給一個(gè) app 進(jìn)程(由 master 來控制發(fā)送給誰) app.messenger.sendTo(pid, action, data): 發(fā)送給指定進(jìn)程 // app.js module.exports = app => { // 注意,只有在 egg-ready 事件拿到之后才能發(fā)送消息 app.messenger.once('egg-ready', () => { app.messenger.sendToAgent('agent-event', { foo: 'bar' }); app.messenger.sendToApp('app-event', { foo: 'bar' }); }); }
上面所有 app.messenger 上的方法都可以在 agent.messenger 上使用。
egg-ready上面的示例中提到,需要等 egg-ready 消息之后才能發(fā)送消息。只有在 Master 確認(rèn)所有的 Agent 進(jìn)程和 Worker 進(jìn)程都已經(jīng)成功啟動(dòng)(并 ready)之后,才會(huì)通過 messenger 發(fā)送 egg-ready 消息給所有的 Agent 和 Worker,告知一切準(zhǔn)備就緒,IPC 通道可以開始使用了。
接收在 messenger 上監(jiān)聽對(duì)應(yīng)的 action 事件,就可以收到其他進(jìn)程發(fā)送來的信息了。
app.messenger.on(action, data => { // process data }); app.messenger.once(action, data => { // process data });
agent 上的 messenger 接收消息的用法和 app 上一致。
IPC 實(shí)戰(zhàn)我們通過一個(gè)簡(jiǎn)單的例子來感受一下在框架的多進(jìn)程模型下如何使用 IPC 解決實(shí)際問題。
需求我們有一個(gè)接口需要從遠(yuǎn)程數(shù)據(jù)源中讀取一些數(shù)據(jù),對(duì)外部提供 API,但是這個(gè)數(shù)據(jù)源的數(shù)據(jù)很少變化,因此我們希望將數(shù)據(jù)緩存到內(nèi)存中以提升服務(wù)能力,降低 RT。此時(shí)就需要有一個(gè)更新內(nèi)存緩存的機(jī)制。
定時(shí)從遠(yuǎn)程數(shù)據(jù)源獲取數(shù)據(jù),更新內(nèi)存緩存,為了降低對(duì)數(shù)據(jù)源壓力,更新的間隔時(shí)間會(huì)設(shè)置的比較長(zhǎng)。 遠(yuǎn)程數(shù)據(jù)源提供一個(gè)檢查是否有數(shù)據(jù)更新的接口,我們的服務(wù)可以更頻繁的調(diào)用檢查接口,當(dāng)有數(shù)據(jù)更新時(shí)才去重新拉取數(shù)據(jù)。 遠(yuǎn)程數(shù)據(jù)源通過消息中間件推送數(shù)據(jù)更新的消息,我們的服務(wù)監(jiān)聽消息來更新數(shù)據(jù)。 在實(shí)際項(xiàng)目中,我們可以采用方案一用于兜底,結(jié)合方案三或者方案二的一種用于提升數(shù)據(jù)更新的實(shí)時(shí)性。而在這個(gè)示例中,我們會(huì)通過 IPC + 定時(shí)任務(wù) 來同時(shí)實(shí)現(xiàn)這三種緩存更新方案。
實(shí)現(xiàn)我們將所有的與遠(yuǎn)程數(shù)據(jù)源交互的邏輯封裝在一個(gè) Service 中,并提供 get 方法給 Controller 調(diào)用。
// app/service/source.js let memoryCache = {}; class SourceService extends Service { get(key) { return memoryCache[key]; } async checkUpdate() { // check if remote data source has changed const updated = await mockCheck(); this.ctx.logger.info('check update response %s', updated); return updated; } async update() { // update memory cache from remote memoryCache = await mockFetch(); this.ctx.logger.info('update memory cache from remote: %j', memoryCache); } }
編寫定時(shí)任務(wù),實(shí)現(xiàn)方案一,每 10 分鐘定時(shí)從遠(yuǎn)程數(shù)據(jù)源獲取數(shù)據(jù)更新緩存做兜底。
// app/schedule/force_refresh.js exports.schedule = { interval: '10m', type: 'all', // run in all workers }; exports.task = async ctx => { await ctx.service.source.update(); ctx.app.lastUpdateBy = 'force'; };
再編寫一個(gè)定時(shí)任務(wù)來實(shí)現(xiàn)方案二的檢查邏輯,每 10s 讓一個(gè) worker 調(diào)用檢查接口,當(dāng)發(fā)現(xiàn)數(shù)據(jù)有變化時(shí),通過 messenger 提供的方法通知所有的 Worker。
// app/schedule/pull_refresh.js exports.schedule = { interval: '10s', type: 'worker', // only run in one worker }; exports.task = async ctx => { const needRefresh = await ctx.service.source.checkUpdate(); if (!needRefresh) return; // notify all workers to update memory cache from `file` ctx.app.messenger.sendToApp('refresh', 'pull'); };
在啟動(dòng)自定義文件中監(jiān)聽 refresh 事件,并更新數(shù)據(jù),所有的 Worker 進(jìn)程都能收到這個(gè)消息,并觸發(fā)更新,此時(shí)我們的方案二也已經(jīng)大功告成了。
// app.js module.exports = app => { app.messenger.on('refresh', by => { app.logger.info('start update by %s', by); // create an anonymous context to access service const ctx = app.createAnonymousContext(); ctx.runInBackground(async () => { await ctx.service.source.update(); app.lastUpdateBy = by; }); }); };
現(xiàn)在我們來看看如何實(shí)現(xiàn)第三個(gè)方案。我們需要有一個(gè)消息中間件的客戶端,它會(huì)和服務(wù)端保持長(zhǎng)連接,這一類的長(zhǎng)連接維持比較適合在 Agent 進(jìn)程上做,可以有效降低連接數(shù),減少兩端的消耗。所以我們?cè)?Agent 進(jìn)程上來開啟消息監(jiān)聽。
// agent.js const Subscriber = require('./lib/subscriber'); module.exports = agent => { const subscriber = new Subscriber(); // listen changed event, broadcast to all workers subscriber.on('changed', () => agent.messenger.sendToApp('refresh', 'push')); };
通過合理使用 Agent 進(jìn)程、定時(shí)任務(wù)和 IPC,我們可以輕松搞定類似的需求并降低對(duì)數(shù)據(jù)源的壓力。具體的示例代碼可以查看 examples/ipc 。
更復(fù)雜的場(chǎng)景上面的例子中,我們?cè)?Agent 進(jìn)程上運(yùn)行了一個(gè) subscriber,來監(jiān)聽消息中間件的消息,如果 Worker 進(jìn)程也需要監(jiān)聽一些消息怎么辦?如何通過 Agent 進(jìn)程建立連接再轉(zhuǎn)發(fā)給 Worker 進(jìn)程呢?這些問題可以在多進(jìn)程研發(fā)模式增強(qiáng) 中找到答案。
更多建議: