在前面的多進程模型章節(jié)中,我們詳細講述了框架的多進程模型,其中適合使用 Agent 進程的有一類常見的場景:一些中間件客戶端需要和服務器建立長連接,理論上一臺服務器最好只建立一個長連接,但多進程模型會導致 n 倍(n = Worker 進程數(shù))連接被創(chuàng)建。
+--------+ +--------+ | Client | | Client | ... n +--------+ +--------+ | \ / | | \ / | n * m 個鏈接 | / \ | | / \ | +--------+ +--------+ | Server | | Server | ... m +--------+ +--------+
|
為了盡可能的復用長連接(因為它們對于服務端來說是非常寶貴的資源),我們會把它放到 Agent 進程里維護,然后通過 messenger 將數(shù)據(jù)傳遞給各個 Worker。這種做法是可行的,但是往往需要寫大量代碼去封裝接口和實現(xiàn)數(shù)據(jù)的傳遞,非常麻煩。
另外,通過 messenger 傳遞數(shù)據(jù)效率是比較低的,因為它會通過 Master 來做中轉;萬一 IPC 通道出現(xiàn)問題還可能將 Master 進程搞掛。
那么有沒有更好的方法呢?答案是肯定的,我們提供一種新的模式來降低這類客戶端封裝的復雜度。通過建立 Agent 和 Worker 的 socket 直連跳過 Master 的中轉。Agent 作為對外的門面維持多個 Worker 進程的共享連接。
核心思想
- 受到 Leader/Follower 模式的啟發(fā)。
- 客戶端會被區(qū)分為兩種角色:Leader: 負責和遠程服務端維持連接,對于同一類的客戶端只有一個 Leader。Follower: 會將具體的操作委托給 Leader,常見的是訂閱模型(讓 Leader 和遠程服務端交互,并等待其返回)。
- 如何確定誰是 Leader,誰是 Follower 呢?有兩種模式:自由競爭模式:客戶端啟動的時候通過本地端口的爭奪來確定 Leader。例如:大家都嘗試監(jiān)聽 7777 端口,最后只會有一個實例搶占到,那它就變成 Leader,其余的都是 Follower。強制指定模式:框架指定某一個 Leader,其余的就是 Follower。
- 框架里面我們采用的是強制指定模式,Leader 只能在 Agent 里面創(chuàng)建,這也符合我們對 Agent 的定位
- 框架啟動的時候 Master 會隨機選擇一個可用的端口作為 Cluster Client 監(jiān)聽的通訊端口,并將它通過參數(shù)傳遞給 Agent 和 App Worker。
- Leader 和 Follower 之間通過 socket 直連(通過通訊端口),不再需要 Master 中轉。
新的模式下,客戶端的通信方式如下:
+-------+ | start | +---+---+ | +--------+---------+ __| port competition |__ win / +------------------+ \ lose / \ +---------------+ tcp conn +-------------------+ | Leader(Agent) |<---------------->| Follower(Worker1) | +---------------+ +-------------------+ | \ tcp conn | \ +--------+ +-------------------+ | Client | | Follower(Worker2) | +--------+ +-------------------+
|
客戶端接口類型抽象
我們將客戶端接口抽象為下面兩大類,這也是對客戶端接口的一個規(guī)范,對于符合規(guī)范的客戶端,我們可以自動將其包裝為 Leader/Follower 模式。
- 訂閱、發(fā)布類(subscribe / publish):subscribe(info, listener) 接口包含兩個參數(shù),第一個是訂閱的信息,第二個是訂閱的回調函數(shù)。publish(info) 接口包含一個參數(shù),就是訂閱的信息。
- 調用類 (invoke),支持 callback, promise 和 generator function 三種風格的接口,但是推薦使用 generator function。
客戶端示例
const Base = require('sdk-base');
class Client extends Base { constructor(options) { super(options); // 在初始化成功以后記得 ready this.ready(true); }
/** * 訂閱 * * @param {Object} info - 訂閱的信息(一個 JSON 對象,注意盡量不要包含 Function, Buffer, Date 這類屬性) * @param {Function} listener - 監(jiān)聽的回調函數(shù),接收一個參數(shù)就是監(jiān)聽到的結果對象 */ subscribe(info, listener) { // ... }
/** * 發(fā)布 * * @param {Object} info - 發(fā)布的信息,和上面 subscribe 的 info 類似 */ publish(info) { // ... }
/** * 獲取數(shù)據(jù) (invoke) * * @param {String} id - id * @return {Object} result */ async getData(id) { // ... } }
|
異常處理
- Leader 如果“死掉”會觸發(fā)新一輪的端口爭奪,爭奪到端口的那個實例被推選為新的 Leader。
- 為保證 Leader 和 Follower 之間的通道健康,需要引入定時心跳檢查機制,如果 Follower 在固定時間內沒有發(fā)送心跳包,那么 Leader 會將 Follower 主動斷開,從而觸發(fā) Follower 的重新初始化。
協(xié)議和調用時序
Leader 和 Follower 通過下面的協(xié)議進行數(shù)據(jù)交換:
0 1 2 4 12 +-------+-------+---------------+---------------------------------------------------------------+ |version|req/res| reserved | request id | +-------------------------------+-------------------------------+-------------------------------+ | timeout | connection object length | application object length | +-------------------------------+---------------------------------------------------------------+ | conn object (JSON format) ... | app object | +-----------------------------------------------------------+ | | ... | +-----------------------------------------------------------------------------------------------+
|
- 在通訊端口上 Leader 啟動一個 Local Server,所有的 Leader/Follower 通訊都經(jīng)過 Local Server。
- Follower 連接上 Local Server 后,首先發(fā)送一個 register channel 的 packet(引入 channel 的概念是為了區(qū)別不同類型的客戶端)。
- Local Server 會將 Follower 分配給指定的 Leader(根據(jù)客戶端類型進行配對)。
- Follower 向 Leader 發(fā)送訂閱、發(fā)布請求。
- Leader 在訂閱數(shù)據(jù)變更時通過 subscribe result packet 通知 Follower。
- Follower 向 Leader 發(fā)送調用請求,Leader 收到后執(zhí)行相應操作后返回結果。
+----------+ +---------------+ +---------+ | Follower | | Local Server | | Leader | +----------+ +---------------+ +---------+ | register channel | assign to | + -----------------------> | --------------------> | | | | | subscribe | + ------------------------------------------------> | | publish | + ------------------------------------------------> | | | | subscribe result | | <------------------------------------------------ + | | | invoke | + ------------------------------------------------> | | invoke result | | <------------------------------------------------ + | |
|
具體的使用方法
下面我用一個簡單的例子,介紹在框架里面如何讓一個客戶端支持 Leader/Follower 模式:
- 第一步,我們的客戶端最好是符合上面提到過的接口約定,例如:
// registry_client.js const URL = require('url'); const Base = require('sdk-base');
class RegistryClient extends Base { constructor(options) { super({ // 指定異步啟動的方法 initMethod: 'init', }); this._options = options; this._registered = new Map(); }
/** * 啟動邏輯 */ async init() { this.ready(true); }
/** * 獲取配置 * @param {String} dataId - the dataId * @return {Object} 配置 */ async getConfig(dataId) { return this._registered.get(dataId); }
/** * 訂閱 * @param {Object} reg * - {String} dataId - the dataId * @param {Function} listener - the listener */ subscribe(reg, listener) { const key = reg.dataId; this.on(key, listener);
const data = this._registered.get(key); if (data) { process.nextTick(() => listener(data)); } }
/** * 發(fā)布 * @param {Object} reg * - {String} dataId - the dataId * - {String} publishData - the publish data */ publish(reg) { const key = reg.dataId; let changed = false;
if (this._registered.has(key)) { const arr = this._registered.get(key); if (arr.indexOf(reg.publishData) === -1) { changed = true; arr.push(reg.publishData); } } else { changed = true; this._registered.set(key, [reg.publishData]); } if (changed) { this.emit(key, this._registered.get(key).map(url => URL.parse(url, true))); } } }
module.exports = RegistryClient;
|
- 第二步,使用 agent.cluster 接口對 RegistryClient 進行封裝:
// agent.js const RegistryClient = require('registry_client');
module.exports = agent => { // 對 RegistryClient 進行封裝和實例化 agent.registryClient = agent.cluster(RegistryClient) // create 方法的參數(shù)就是 RegistryClient 構造函數(shù)的參數(shù) .create({});
agent.beforeStart(async () => { await agent.registryClient.ready(); agent.coreLogger.info('registry client is ready'); }); };
|
- 第三步,使用 app.cluster 接口對 RegistryClient 進行封裝:
// app.js const RegistryClient = require('registry_client');
module.exports = app => { app.registryClient = app.cluster(RegistryClient).create({}); app.beforeStart(async () => { await app.registryClient.ready(); app.coreLogger.info('registry client is ready');
// 調用 subscribe 進行訂閱 app.registryClient.subscribe({ dataId: 'demo.DemoService', }, val => { // ... });
// 調用 publish 發(fā)布數(shù)據(jù) app.registryClient.publish({ dataId: 'demo.DemoService', publishData: 'xxx', });
// 調用 getConfig 接口 const res = await app.registryClient.getConfig('demo.DemoService'); console.log(res); }); };
|
是不是很簡單?
當然,如果你的客戶端不是那么『標準』,那你可能需要用到其他一些 API,比如,你的訂閱函數(shù)不叫 subscribe 而是叫 sub:
class MockClient extends Base { constructor(options) { super({ initMethod: 'init', }); this._options = options; this._registered = new Map(); }
async init() { this.ready(true); }
sub(info, listener) { const key = reg.dataId; this.on(key, listener);
const data = this._registered.get(key); if (data) { process.nextTick(() => listener(data)); } }
... }
|
你需要通過 delegate(API代理)手動設置此委托:
// agent.js module.exports = agent => { agent.mockClient = agent.cluster(MockClient) // 將 sub 代理到 subscribe 邏輯上 .delegate('sub', 'subscribe') .create();
agent.beforeStart(async () => { await agent.mockClient.ready(); }); };
|
// app.js module.exports = app => { app.mockClient = app.cluster(MockClient) // 將 sub 代理到 subscribe 邏輯上 .delegate('sub', 'subscribe') .create();
app.beforeStart(async () => { await app.mockClient.ready();
app.sub({ id: 'test-id' }, val => { // put your code here }); }); };
|
我們已經(jīng)理解,通過 cluster-client 可以讓我們在不理解多進程模型的情況下開發(fā)『純粹』的 RegistryClient,只負責和服務端進行交互,然后使用 cluster-client 進行簡單的封裝就可以得到一個支持多進程模型的 ClusterClient。這里的 RegistryClient 實際上是一個專門負責和遠程服務通信進行數(shù)據(jù)通信的 DataClient。
大家可能已經(jīng)發(fā)現(xiàn),ClusterClient 同時帶來了一些約束,如果想在各進程暴露同樣的方法,那么 RegistryClient 上只能支持 sub/pub 模式以及異步的 API 調用。因為在多進程模型中所有的交互都必須經(jīng)過 socket 通信,勢必帶來了這一約束。
假設我們要實現(xiàn)一個同步的 get 方法,訂閱過的數(shù)據(jù)直接放入內存,使用 get 方法時直接返回。要怎么實現(xiàn)呢?而真實情況可能比這更復雜。
在這里,我們引入一個 APIClient 的最佳實踐。對于有讀取緩存數(shù)據(jù)等同步 API 需求的模塊,在 RegistryClient 基礎上再封裝一個 APIClient 來實現(xiàn)這些與遠程服務端交互無關的 API,暴露給用戶使用到的是這個 APIClient 的實例。
在 APIClient 內部實現(xiàn)上:
- 異步數(shù)據(jù)獲取,通過調用基于 ClusterClient 的 RegistryClient 的 API 實現(xiàn)。
- 同步調用等與服務端無關的接口在 APIClient 上實現(xiàn)。由于 ClusterClient 的 API 已經(jīng)抹平了多進程差異,所以在開發(fā) APIClient 調用到 RegistryClient 時也無需關心多進程模型。
例如在模塊的 APIClient 中增加帶緩存的 get 同步方法:
// some-client/index.js const cluster = require('cluster-client'); const RegistryClient = require('./registry_client');
class APIClient extends Base { constructor(options) { super(options);
// options.cluster 用于給 Egg 的插件傳遞 app.cluster 進來 this._client = (options.cluster || cluster)(RegistryClient).create(options); this._client.ready(() => this.ready(true));
this._cache = {};
// subMap: // { // foo: reg1, // bar: reg2, // } const subMap = options.subMap;
for (const key in subMap) { this.subscribe(subMap[key], value => { this._cache[key] = value; }); } }
subscribe(reg, listener) { this._client.subscribe(reg, listener); }
publish(reg) { this._client.publish(reg); }
get(key) { return this._cache[key]; } }
// 最終模塊向外暴露這個 APIClient module.exports = APIClient;
|
那么我們就可以這么使用該模塊:
// app.js || agent.js const APIClient = require('some-client'); // 上面那個模塊 module.exports = app => { const config = app.config.apiClient; app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster }); app.beforeStart(async () => { await app.apiClient.ready(); }); };
// config.${env}.js exports.apiClient = { subMap: { foo: { id: '', }, // bar... } };
|
為了方便你封裝 APIClient,在 cluster-client 模塊中提供了一個 APIClientBase 基類,那么上面的 APIClient 可以改寫為:
const APIClientBase = require('cluster-client').APIClientBase; const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase { // 返回原始的客戶端類 get DataClient() { return RegistryClient; }
// 用于設置 cluster-client 相關參數(shù),等同于 cluster 方法的第二個參數(shù) get clusterOptions() { return { responseTimeout: 120 * 1000, }; }
subscribe(reg, listener) { this._client.subscribe(reg, listener); }
publish(reg) { this._client.publish(reg); }
get(key) { return this._cache[key]; } }
|
總結一下:
+------------------------------------------------+ | APIClient | | +----------------------------------------| | | ClusterClient | | | +---------------------------------| | | | RegistryClient | +------------------------------------------------+
|
- RegistryClient - 負責和遠端服務通訊,實現(xiàn)數(shù)據(jù)的存取,只支持異步 API,不關心多進程模型。
- ClusterClient - 通過 cluster-client 模塊進行簡單 wrap 得到的 client 實例,負責自動抹平多進程模型的差異。
- APIClient - 內部調用 ClusterClient 做數(shù)據(jù)同步,無需關心多進程模型,用戶最終使用的模塊。API 都通過此處暴露,支持同步和異步。
有興趣的同學可以看一下增強多進程研發(fā)模式 討論過程。
在框架里面 cluster-client 相關的配置項
/** * @property {Number} responseTimeout - response timeout, default is 60000 * @property {Transcode} [transcode] * - {Function} encode - custom serialize method * - {Function} decode - custom deserialize method */ config.clusterClient = { responseTimeout: 60000, };
|
配置項 | 類型 | 默認值 | 描述 |
---|
responseTimeout | number | 60000 (一分鐘) | 全局的進程間通訊的超時時長,不能設置的太短,因為代理的接口本身也有超時設置 |
transcode | function | N/A | 進程間通訊的序列化方式,默認采用 serialize-json(建議不要自行設置) |
上面是全局的配置方式。如果,你想對一個客戶端單獨做設置
- 可以通過 app/agent.cluster(ClientClass, options) 的第二個參數(shù) options 進行覆蓋
app.registryClient = app.cluster(RegistryClient, { responseTimeout: 120 * 1000, // 這里傳入的是和 cluster-client 相關的參數(shù) }).create({ // 這里傳入的是 RegistryClient 需要的參數(shù) });
|
- 也可以通過覆蓋 APIClientBase 的 clusterOptions 這個 getter 屬性
const APIClientBase = require('cluster-client').APIClientBase; const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase { get DataClient() { return RegistryClient; }
get clusterOptions() { return { responseTimeout: 120 * 1000, }; }
// ... }
module.exports = APIClient; |
更多建議: