我們知道 JavaScript 代碼是運行在單線程上的,換句話說一個 Node.js 進程只能運行在一個 CPU 上。那么如果用 Node.js 來做 Web Server,就無法享受到多核運算的好處。作為企業(yè)級的解決方案,我們要解決的一個問題就是:
如何榨干服務(wù)器資源,利用上多核 CPU 的并發(fā)優(yōu)勢? 而 Node.js 官方提供的解決方案是 Cluster 模塊 ,其中包含一段簡介:
單個 Node.js 實例在單線程環(huán)境下運行。為了更好地利用多核環(huán)境,用戶有時希望啟動一批 Node.js 進程用于加載。集群化模塊使得你很方便地創(chuàng)建子進程,以便于在服務(wù)端口之間共享。 Cluster 是什么呢?簡單的說,
在服務(wù)器上同時啟動多個進程。 每個進程里都跑的是同一份源代碼(好比把以前一個進程的工作分給多個進程去做)。 更神奇的是,這些進程可以同時監(jiān)聽一個端口(具體原理推薦閱讀 @DavidCai1993 這篇 Cluster 實現(xiàn)原理 )。 其中:
負責(zé)啟動其他進程的叫做 Master 進程,他好比是個『包工頭』,不做具體的工作,只負責(zé)啟動其他進程。 其他被啟動的叫 Worker 進程,顧名思義就是干活的『工人』。它們接收請求,對外提供服務(wù)。 Worker 進程的數(shù)量一般根據(jù)服務(wù)器的 CPU 核數(shù)來定,這樣就可以完美利用多核資源。 const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
框架的多進程模型上面的示例是不是很簡單,但是作為企業(yè)級的解決方案,要考慮的東西還有很多。
Worker 進程異常退出以后該如何處理? 多個 Worker 進程之間如何共享資源? 多個 Worker 進程之間如何調(diào)度? ... 進程守護健壯性(又叫魯棒性)是企業(yè)級應(yīng)用必須考慮的問題,除了程序本身代碼質(zhì)量要保證,框架層面也需要提供相應(yīng)的『兜底』機制保證極端情況下應(yīng)用的可用性。
一般來說,Node.js 進程退出可以分為兩類:
未捕獲異常當代碼拋出了異常沒有被捕獲到時,進程將會退出,此時 Node.js 提供了 process.on('uncaughtException', handler) 接口來捕獲它,但是當一個 Worker 進程遇到 未捕獲的異常 時,它已經(jīng)處于一個不確定狀態(tài),此時我們應(yīng)該讓這個進程優(yōu)雅退出:
關(guān)閉異常 Worker 進程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請求。 Master 立刻 fork 一個新的 Worker 進程,保證在線的『工人』總數(shù)不變。 異常 Worker 等待一段時間,處理完已經(jīng)接受的請求后退出。 +---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
OOM、系統(tǒng)異常而當一個進程出現(xiàn)異常導(dǎo)致 crash 或者 OOM 被系統(tǒng)殺死時,不像未捕獲異常發(fā)生時我們還有機會讓進程繼續(xù)執(zhí)行,只能夠讓當前進程直接退出,Master 立刻 fork 一個新的 Worker。
在框架里,我們采用 graceful 和 egg-cluster 兩個模塊配合實現(xiàn)上面的邏輯。這套方案已在阿里巴巴和螞蟻金服的生產(chǎn)環(huán)境廣泛部署,且經(jīng)受過『雙11』大促的考驗,所以是相對穩(wěn)定和靠譜的。
Agent 機制說到這里,Node.js 多進程方案貌似已經(jīng)成型,這也是我們早期線上使用的方案。但后來我們發(fā)現(xiàn)有些工作其實不需要每個 Worker 都去做,如果都做,一來是浪費資源,更重要的是可能會導(dǎo)致多進程間資源訪問沖突。舉個例子:生產(chǎn)環(huán)境的日志文件我們一般會按照日期進行歸檔,在單進程模型下這再簡單不過了:
每天凌晨 0 點,將當前日志文件按照日期進行重命名銷毀以前的文件句柄,并創(chuàng)建新的日志文件繼續(xù)寫入 試想如果現(xiàn)在是 4 個進程來做同樣的事情,是不是就亂套了。所以,對于這一類后臺運行的邏輯,我們希望將它們放到一個單獨的進程上去執(zhí)行,這個進程就叫 Agent Worker,簡稱 Agent。Agent 好比是 Master 給其他 Worker 請的一個『秘書』,它不對外提供服務(wù),只給 App Worker 打工,專門處理一些公共事務(wù)?,F(xiàn)在我們的多進程模型就變成下面這個樣子了
+--------+ +-------+ | Master |<-------->| Agent | +--------+ +-------+ ^ ^ ^ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
那我們框架的啟動時序如下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
Master 啟動后先 fork Agent 進程 Agent 初始化成功后,通過 IPC 通道通知 Master Master 再 fork 多個 App Worker App Worker 初始化成功,通知 Master 所有的進程初始化成功后,Master 通知 Agent 和 Worker 應(yīng)用啟動成功 另外,關(guān)于 Agent Worker 還有幾點需要注意的是:
由于 App Worker 依賴于 Agent,所以必須等 Agent 初始化完成后才能 fork App Worker Agent 雖然是 App Worker 的『小秘』,但是業(yè)務(wù)相關(guān)的工作不應(yīng)該放到 Agent 上去做,不然把她累垮了就不好了 由于 Agent 的特殊定位,我們應(yīng)該保證它相對穩(wěn)定。當它發(fā)生未捕獲異常,框架不會像 App Worker 一樣讓他退出重啟,而是記錄異常日志、報警等待人工處理 Agent 和普通 App Worker 掛載的 API 不完全一樣,如何識別差異可查看框架文檔 Agent 的用法你可以在應(yīng)用或插件根目錄下的 agent.js 中實現(xiàn)你自己的邏輯(和啟動自定義 用法類似,只是入口參數(shù)是 agent 對象)
// agent.js module.exports = agent => { // 在這里寫你的初始化邏輯 // 也可以通過 messenger 對象發(fā)送消息給 App Worker // 但需要等待 App Worker 啟動成功后才能發(fā)送,不然很可能丟失 agent.messenger.on('egg-ready', () => { const data = { ... }; agent.messenger.sendToApp('xxx_action', data); }); };
// app.js module.exports = app => { app.messenger.on('xxx_action', data => { // ... }); };
這個例子中,agent.js 的代碼會執(zhí)行在 agent 進程上,app.js 的代碼會執(zhí)行在 Worker 進程上,他們通過框架封裝的 messenger 對象進行進程間通訊(IPC),后面的章節(jié)會對框架的 IPC 做詳細的講解。
Master VS Agent VS Worker當一個應(yīng)用啟動時,會同時啟動這三類進程。
類型 進程數(shù)量 作用 穩(wěn)定性 是否運行業(yè)務(wù)代碼 Master 1 進程管理,進程間消息轉(zhuǎn)發(fā) 非常高 否 Agent 1 后臺運行工作(長連接客戶端) 高 少量 Worker 一般設(shè)置為 CPU 核數(shù) 執(zhí)行業(yè)務(wù)代碼 一般 是
Master在這個模型下,Master 進程承擔(dān)了進程管理的工作(類似 pm2 ),不運行任何業(yè)務(wù)代碼,我們只需要運行起一個 Master 進程它就會幫我們搞定所有的 Worker、Agent 進程的初始化以及重啟等工作了。
Master 進程的穩(wěn)定性是極高的,線上運行時我們只需要通過 egg-scripts 后臺運行通過 egg.startCluster 啟動的 Master 進程就可以了,不再需要使用 pm2 等進程守護模塊。
$ egg-scripts start --daemon
Agent在大部分情況下,我們在寫業(yè)務(wù)代碼的時候完全不用考慮 Agent 進程的存在,但是當我們遇到一些場景,只想讓代碼運行在一個進程上的時候,Agent 進程就到了發(fā)揮作用的時候了。
由于 Agent 只有一個,而且會負責(zé)許多維持連接的臟活累活,因此它不能輕易掛掉和重啟,所以 Agent 進程在監(jiān)聽到未捕獲異常時不會退出,但是會打印出錯誤日志,我們需要對日志中的未捕獲異常提高警惕。
WorkerWorker 進程負責(zé)處理真正的用戶請求和定時任務(wù) 的處理。而 Egg 的定時任務(wù)也提供了只讓一個 Worker 進程運行的能力,所以能夠通過定時任務(wù)解決的問題就不要放到 Agent 上執(zhí)行。
Worker 運行的是業(yè)務(wù)代碼,相對會比 Agent 和 Master 進程上運行的代碼復(fù)雜度更高,穩(wěn)定性也低一點,當 Worker 進程異常退出時,Master 進程會重啟一個 Worker 進程。
進程間通訊(IPC)雖然每個 Worker 進程是相對獨立的,但是它們之間始終還是需要通訊的,叫進程間通訊(IPC)。下面是 Node.js 官方提供的一段示例代碼
'use strict'; const cluster = require('cluster'); if (cluster.isMaster) { const worker = cluster.fork(); worker.send('hi there'); worker.on('message', msg => { console.log(`msg: ${msg} from worker#${worker.id}`); }); } else if (cluster.isWorker) { process.on('message', (msg) => { process.send(msg); }); }
細心的你可能已經(jīng)發(fā)現(xiàn) cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉(zhuǎn)發(fā)。
廣播消息: agent => all workers +--------+ +-------+ | Master |<---------| Agent | +--------+ +-------+ / | \ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+ 指定接收方: one worker => another worker +--------+ +-------+ | Master |----------| Agent | +--------+ +-------+ ^ | send to / | worker 2 / | / | / v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
為了方便調(diào)用,我們封裝了一個 messenger 對象掛在 app / agent 實例上,提供一系列友好的 API。
發(fā)送app.messenger.broadcast(action, data):發(fā)送給所有的 agent / app 進程(包括自己) app.messenger.sendToApp(action, data): 發(fā)送給所有的 app 進程在 app 上調(diào)用該方法會發(fā)送給自己和其他的 app 進程在 agent 上調(diào)用該方法會發(fā)送給所有的 app 進程 app.messenger.sendToAgent(action, data): 發(fā)送給 agent 進程在 app 上調(diào)用該方法會發(fā)送給 agent 進程在 agent 上調(diào)用該方法會發(fā)送給 agent 自己 agent.messenger.sendRandom(action, data):app 上沒有該方法(現(xiàn)在 Egg 的實現(xiàn)是等同于 sentToAgent)agent 會隨機發(fā)送消息給一個 app 進程(由 master 來控制發(fā)送給誰) app.messenger.sendTo(pid, action, data): 發(fā)送給指定進程 // app.js module.exports = app => { // 注意,只有在 egg-ready 事件拿到之后才能發(fā)送消息 app.messenger.once('egg-ready', () => { app.messenger.sendToAgent('agent-event', { foo: 'bar' }); app.messenger.sendToApp('app-event', { foo: 'bar' }); }); }
上面所有 app.messenger 上的方法都可以在 agent.messenger 上使用。
egg-ready上面的示例中提到,需要等 egg-ready 消息之后才能發(fā)送消息。只有在 Master 確認所有的 Agent 進程和 Worker 進程都已經(jīng)成功啟動(并 ready)之后,才會通過 messenger 發(fā)送 egg-ready 消息給所有的 Agent 和 Worker,告知一切準備就緒,IPC 通道可以開始使用了。
接收在 messenger 上監(jiān)聽對應(yīng)的 action 事件,就可以收到其他進程發(fā)送來的信息了。
app.messenger.on(action, data => { // process data }); app.messenger.once(action, data => { // process data });
agent 上的 messenger 接收消息的用法和 app 上一致。
IPC 實戰(zhàn)我們通過一個簡單的例子來感受一下在框架的多進程模型下如何使用 IPC 解決實際問題。
需求我們有一個接口需要從遠程數(shù)據(jù)源中讀取一些數(shù)據(jù),對外部提供 API,但是這個數(shù)據(jù)源的數(shù)據(jù)很少變化,因此我們希望將數(shù)據(jù)緩存到內(nèi)存中以提升服務(wù)能力,降低 RT。此時就需要有一個更新內(nèi)存緩存的機制。
定時從遠程數(shù)據(jù)源獲取數(shù)據(jù),更新內(nèi)存緩存,為了降低對數(shù)據(jù)源壓力,更新的間隔時間會設(shè)置的比較長。 遠程數(shù)據(jù)源提供一個檢查是否有數(shù)據(jù)更新的接口,我們的服務(wù)可以更頻繁的調(diào)用檢查接口,當有數(shù)據(jù)更新時才去重新拉取數(shù)據(jù)。 遠程數(shù)據(jù)源通過消息中間件推送數(shù)據(jù)更新的消息,我們的服務(wù)監(jiān)聽消息來更新數(shù)據(jù)。 在實際項目中,我們可以采用方案一用于兜底,結(jié)合方案三或者方案二的一種用于提升數(shù)據(jù)更新的實時性。而在這個示例中,我們會通過 IPC + 定時任務(wù) 來同時實現(xiàn)這三種緩存更新方案。
實現(xiàn)我們將所有的與遠程數(shù)據(jù)源交互的邏輯封裝在一個 Service 中,并提供 get 方法給 Controller 調(diào)用。
// app/service/source.js let memoryCache = {}; class SourceService extends Service { get(key) { return memoryCache[key]; } async checkUpdate() { // check if remote data source has changed const updated = await mockCheck(); this.ctx.logger.info('check update response %s', updated); return updated; } async update() { // update memory cache from remote memoryCache = await mockFetch(); this.ctx.logger.info('update memory cache from remote: %j', memoryCache); } }
編寫定時任務(wù),實現(xiàn)方案一,每 10 分鐘定時從遠程數(shù)據(jù)源獲取數(shù)據(jù)更新緩存做兜底。
// app/schedule/force_refresh.js exports.schedule = { interval: '10m', type: 'all', // run in all workers }; exports.task = async ctx => { await ctx.service.source.update(); ctx.app.lastUpdateBy = 'force'; };
再編寫一個定時任務(wù)來實現(xiàn)方案二的檢查邏輯,每 10s 讓一個 worker 調(diào)用檢查接口,當發(fā)現(xiàn)數(shù)據(jù)有變化時,通過 messenger 提供的方法通知所有的 Worker。
// app/schedule/pull_refresh.js exports.schedule = { interval: '10s', type: 'worker', // only run in one worker }; exports.task = async ctx => { const needRefresh = await ctx.service.source.checkUpdate(); if (!needRefresh) return; // notify all workers to update memory cache from `file` ctx.app.messenger.sendToApp('refresh', 'pull'); };
在啟動自定義文件中監(jiān)聽 refresh 事件,并更新數(shù)據(jù),所有的 Worker 進程都能收到這個消息,并觸發(fā)更新,此時我們的方案二也已經(jīng)大功告成了。
// app.js module.exports = app => { app.messenger.on('refresh', by => { app.logger.info('start update by %s', by); // create an anonymous context to access service const ctx = app.createAnonymousContext(); ctx.runInBackground(async () => { await ctx.service.source.update(); app.lastUpdateBy = by; }); }); };
現(xiàn)在我們來看看如何實現(xiàn)第三個方案。我們需要有一個消息中間件的客戶端,它會和服務(wù)端保持長連接,這一類的長連接維持比較適合在 Agent 進程上做,可以有效降低連接數(shù),減少兩端的消耗。所以我們在 Agent 進程上來開啟消息監(jiān)聽。
// agent.js const Subscriber = require('./lib/subscriber'); module.exports = agent => { const subscriber = new Subscriber(); // listen changed event, broadcast to all workers subscriber.on('changed', () => agent.messenger.sendToApp('refresh', 'push')); };
通過合理使用 Agent 進程、定時任務(wù)和 IPC,我們可以輕松搞定類似的需求并降低對數(shù)據(jù)源的壓力。具體的示例代碼可以查看 examples/ipc 。
更復(fù)雜的場景上面的例子中,我們在 Agent 進程上運行了一個 subscriber,來監(jiān)聽消息中間件的消息,如果 Worker 進程也需要監(jiān)聽一些消息怎么辦?如何通過 Agent 進程建立連接再轉(zhuǎn)發(fā)給 Worker 進程呢?這些問題可以在多進程研發(fā)模式增強 中找到答案。
更多建議: