Node.js Cluster模塊

2021-09-15 16:32 更新

概述

基本用法

Node.js默認(rèn)單進(jìn)程運(yùn)行,對于多核CPU的計(jì)算機(jī)來說,這樣做效率很低,因?yàn)橹挥幸粋€核在運(yùn)行,其他核都在閑置。cluster模塊就是為了解決這個問題而提出的。

cluster模塊允許設(shè)立一個主進(jìn)程和若干個worker進(jìn)程,由主進(jìn)程監(jiān)控和協(xié)調(diào)worker進(jìn)程的運(yùn)行。worker之間采用進(jìn)程建通信交換消息,cluster模塊內(nèi)置一個負(fù)載均衡器,采用Round-robin算法協(xié)調(diào)各個worker進(jìn)程之間的負(fù)載。運(yùn)行時,所有新建立的鏈接都由主進(jìn)程完成,然后主進(jìn)程再把TCP連接分配給指定的worker進(jìn)程。

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster){
  for (var i = 0, n = os.cpus().length; i < n; i += 1){
    cluster.fork();
  }
} else {
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

上面代碼先判斷當(dāng)前進(jìn)程是否為主進(jìn)程(cluster.isMaster),如果是的,就按照CPU的核數(shù),新建若干個worker進(jìn)程;如果不是,說明當(dāng)前進(jìn)程是worker進(jìn)程,則在該進(jìn)程啟動一個服務(wù)器程序。

cluster.worker對象

cluster.worker指向當(dāng)前worker進(jìn)程對象,主進(jìn)程沒有這個值。

它有如下屬性。

(1)worker.id

work.id返回當(dāng)前worker的獨(dú)一無二的進(jìn)程編號。這個編號也是cluster.workers中指向當(dāng)前進(jìn)程的索引值。

(2)worker.process

所有的worker進(jìn)程都是用child_process.fork()生成的。child_process.fork()返回的對象,就被保存在worker.process之中。通過這個屬性,可以獲取worker所在的進(jìn)程對象。

(3)worker.send()

該方法用于在主進(jìn)程中,向子進(jìn)程發(fā)送信息。

if (cluster.isMaster) {
  var worker = cluster.fork();
  worker.send('hi there');
} else if (cluster.isWorker) {
  process.on('message', function(msg) {
    process.send(msg);
  });
}

上面代碼的作用是,worker進(jìn)程對主進(jìn)程發(fā)出的每個消息,都做回聲。

在worker進(jìn)程中調(diào)用這個方法,等同于process.send(message)。

cluster.workers對象

該對象只有主進(jìn)程才有,包含了所有worker進(jìn)程。每個成員的鍵值就是一個worker進(jìn)程,鍵名就是該worker進(jìn)程的worker.id屬性。

function eachWorker(callback) {
  for (var id in cluster.workers) {
    callback(cluster.workers[id]);
  }
}
eachWorker(function(worker) {
  worker.send('big announcement to all workers');
});

上面代碼用來遍歷所有worker進(jìn)程。

當(dāng)前socket的data事件,也可以用id屬性識別worker進(jìn)程。

socket.on('data', function(id) {
  var worker = cluster.workers[id];
});

屬性與方法

isMaster,isWorker

isMaster屬性返回一個布爾值,表示當(dāng)前進(jìn)程是否為主進(jìn)程。這個屬性由process.env.NODE_UNIQUE_ID決定,如果process.env.NODE_UNIQUE_ID為未定義,就表示該進(jìn)程是主進(jìn)程。

isWorker屬性返回一個布爾值,表示當(dāng)前進(jìn)程是否為work進(jìn)程。它與isMaster屬性的值正好相反。

fork()

fork方法用于新建一個worker進(jìn)程,上下文都復(fù)制主進(jìn)程。只有主進(jìn)程才能調(diào)用這個方法。

該方法返回一個worker對象。

kill()

kill方法用于終止worker進(jìn)程。它可以接受一個參數(shù),表示系統(tǒng)信號。

如果當(dāng)前是主進(jìn)程,就會終止與worker.process的聯(lián)絡(luò),然后將系統(tǒng)信號法發(fā)向worker進(jìn)程。如果當(dāng)前是worker進(jìn)程,就會終止與主進(jìn)程的通信,然后退出,返回0。

在以前的版本中,該方法也叫做 worker.destroy() 。

listening事件

worker進(jìn)程調(diào)用listen方面以后,“l(fā)istening”就傳向該進(jìn)程的服務(wù)器,然后傳向主進(jìn)程。

該事件的回調(diào)函數(shù)接受兩個參數(shù),一個是當(dāng)前worker對象,另一個是地址對象,包含網(wǎng)址、端口、地址類型(IPv4、IPv6、Unix socket、UDP)等信息。這對于那些服務(wù)多個網(wǎng)址的Node應(yīng)用程序非常有用。

cluster.on('listening', function(worker, address) {
  console.log("A worker is now connected to " + address.address + ":" + address.port);
});

實(shí)例:不中斷地重啟Node服務(wù)

重啟服務(wù)需要關(guān)閉后再啟動,利用cluster模塊,可以做到先啟動一個worker進(jìn)程,再把原有的所有work進(jìn)程關(guān)閉。這樣就能實(shí)現(xiàn)不中斷地重啟Node服務(wù)。

下面是主進(jìn)程的代碼master.js。

var cluster = require('cluster');

console.log('started master with ' + process.pid);

// 新建一個worker進(jìn)程
cluster.fork();

process.on('SIGHUP', function () {
  console.log('Reloading...');
  var new_worker = cluster.fork();
  new_worker.once('listening', function () {
    // 關(guān)閉所有其他worker進(jìn)程
    for(var id in cluster.workers) {
      if (id === new_worker.id.toString()) continue;
      cluster.workers[id].kill('SIGTERM');
    }
  });
});

上面代碼中,主進(jìn)程監(jiān)聽SIGHUP事件,如果發(fā)生該事件就關(guān)閉其他所有worker進(jìn)程。之所以是SIGHUP事件,是因?yàn)閚ginx服務(wù)器監(jiān)聽到這個信號,會創(chuàng)造一個新的worker進(jìn)程,重新加載配置文件。另外,關(guān)閉worker進(jìn)程時,主進(jìn)程發(fā)送SIGTERM信號,這是因?yàn)镹ode允許多個worker進(jìn)程監(jiān)聽同一個端口。

下面是worker進(jìn)程的代碼server.js。

var cluster = require('cluster');

if (cluster.isMaster) {
  require('./master');
  return;
}

var express = require('express');
var http = require('http');
var app = express();

app.get('/', function (req, res) {
  res.send('ha fsdgfds gfds gfd!');
});

http.createServer(app).listen(8080, function () {
  console.log('http://localhost:8080');
});

使用時代碼如下。

$ node server.js
started master with 10538
http://localhost:8080

然后,向主進(jìn)程連續(xù)發(fā)出兩次SIGHUP信號。

$ kill -SIGHUP 10538
$ kill -SIGHUP 10538

主進(jìn)程會連續(xù)兩次新建一個worker進(jìn)程,然后關(guān)閉所有其他worker進(jìn)程,顯示如下。

Reloading...
http://localhost:8080
Reloading...
http://localhost:8080

最后,向主進(jìn)程發(fā)出SIGTERM信號,關(guān)閉主進(jìn)程。

$ kill 10538

PM2模塊

PM2模塊是cluster模塊的一個包裝層。它的作用是盡量將cluster模塊抽象掉,讓用戶像使用單進(jìn)程一樣,部署多進(jìn)程N(yùn)ode應(yīng)用。

// app.js
var http = require('http');

http.createServer(function(req, res) {
  res.writeHead(200);
  res.end("hello world");
}).listen(8080);

上面代碼是標(biāo)準(zhǔn)的Node架設(shè)Web服務(wù)器的方式,然后用PM2從命令行啟動這段代碼。

$ pm2 start app.js -i 4

上面代碼的i參數(shù)告訴PM2,這段代碼應(yīng)該在cluster_mode啟動,且新建worker進(jìn)程的數(shù)量是4個。如果i參數(shù)的值是0,那么當(dāng)前機(jī)器有幾個CPU內(nèi)核,PM2就會啟動幾個worker進(jìn)程。

如果一個worker進(jìn)程由于某種原因掛掉了,會立刻重啟該worker進(jìn)程。

# 重啟所有worker進(jìn)程
$ pm2 reload all

每個worker進(jìn)程都有一個id,可以用下面的命令查看單個worker進(jìn)程的詳情。

$ pm2 show <worker id>

正確情況下,PM2采用fork模式新建worker進(jìn)程,即主進(jìn)程fork自身,產(chǎn)生一個worker進(jìn)程。pm2 reload命令則會用spawn方式啟動,即一個接一個啟動worker進(jìn)程,一個新的worker啟動成功,再殺死一個舊的worker進(jìn)程。采用這種方式,重新部署新版本時,服務(wù)器就不會中斷服務(wù)。

$ pm2 reload <腳本文件名>

關(guān)閉worker進(jìn)程的時候,可以部署下面的代碼,讓worker進(jìn)程監(jiān)聽shutdown消息。一旦收到這個消息,進(jìn)行完畢收尾清理工作再關(guān)閉。

process.on('message', function(msg) {
  if (msg === 'shutdown') {
    close_all_connections();
    delete_logs();
    server.close();
    process.exit(0);
  }
});

參考鏈接

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號