Swoole Task實(shí)例

2022-07-12 11:26 更新

Swoole Task異步任務(wù)介紹

swoole 的異步任務(wù)task系統(tǒng)可以很方便的為我們?cè)陂_發(fā)的過程中調(diào)用異步任務(wù)的執(zhí)行,而無需等待。

常見使用場(chǎng)景:

task模塊用來做一些異步的慢速任務(wù),比如webim中發(fā)廣播,發(fā)送郵件,異步訂單處理、異步支付處理等。

  • task進(jìn)程必須是同步阻塞的
  • task進(jìn)程支持定時(shí)器

node.js 假如有10萬個(gè)連接,要發(fā)廣播時(shí),那會(huì)循環(huán)10萬次,這時(shí)候程序不能做任何事情,不能接受新的連接,也不能收包發(fā)包。

而swoole不同,丟給task進(jìn)程之后,worker進(jìn)程可以繼續(xù)處理新的數(shù)據(jù)請(qǐng)求。任務(wù)完成后會(huì)異步地通知worker進(jìn)程告訴它此任務(wù)已經(jīng)完成。

當(dāng)然task模塊的作用還不僅如此,實(shí)現(xiàn)PHP的數(shù)據(jù)庫(kù)連接池,異步隊(duì)列等,還需要進(jìn)一步挖掘。

簡(jiǎn)單實(shí)例:

$serv = new Swoole\Server("127.0.0.1", 9502);
$serv->set(array('task_worker_num' => 4));
$serv->on('Receive', function($serv, $fd, $from_id, $data) {
    $task_id = $serv->task("Async");
    echo "Dispath AsyncTask: id=$task_id\n";
});
$serv->on('Task', function ($serv, $task_id, $from_id, $data) {
    echo "New AsyncTask[id=$task_id]".PHP_EOL;
    $serv->finish("$data -> OK");
});
$serv->on('Finish', function ($serv, $task_id, $data) {
    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});
$serv->start();

swoole_server task異步任務(wù)介紹

投遞一個(gè)異步任務(wù)到task_worker池中。此函數(shù)是非阻塞的,執(zhí)行完畢會(huì)立即返回。worker進(jìn)程可以繼續(xù)處理新的請(qǐng)求。

int swoole_server::task(mixed $data, int $dst_worker_id = -1) 
$task_id = $serv->task("some data");
//swoole-1.8.6或更高版本
$serv->task("taskcallback", -1, function (swoole_server $serv, $task_id, $data) {
    echo "Task Callback: ";
    var_dump($task_id, $data);
});
  • $data要投遞的任務(wù)數(shù)據(jù),可以為除資源類型之外的任意PHP變量
  • $dst_worker_id可以制定要給投遞給哪個(gè)task進(jìn)程,傳入ID即可,范圍是0 - (serv->task_worker_num -1)
  • 調(diào)用成功,返回值為整數(shù)$task_id,表示此任務(wù)的ID。如果有finish回應(yīng),onFinish回調(diào)中會(huì)攜帶$task_id參數(shù)
  • 調(diào)用失敗,返回值為false
  • 未指定目標(biāo)Task進(jìn)程,調(diào)用task方法會(huì)判斷Task進(jìn)程的忙閑狀態(tài),底層只會(huì)向處于空閑狀態(tài)的Task進(jìn)程投遞任務(wù)
  • 1.8.6版本增加了第三個(gè)參數(shù),可以直接設(shè)置onFinish函數(shù),如果任務(wù)設(shè)置了回調(diào)函數(shù),Task返回結(jié)果時(shí)會(huì)直接執(zhí)行制定的回調(diào)函數(shù),不再執(zhí)行Server的onFinish回調(diào)

$dst_worker_id在1.6.11+后可用,默認(rèn)為隨機(jī)投遞
$task_id是從0-42億的整數(shù),在當(dāng)前進(jìn)程內(nèi)是唯一的
task方法不能在task進(jìn)程/用戶自定義進(jìn)程中調(diào)用

此功能用于將慢速的任務(wù)異步地去執(zhí)行,比如一個(gè)聊天室服務(wù)器,可以用它來進(jìn)行發(fā)送廣播。當(dāng)任務(wù)完成時(shí),在task進(jìn)程中調(diào)用$serv->finish("finish")告訴worker進(jìn)程此任務(wù)已完成。當(dāng)然swoole_server->finish是可選的。

task底層使用Unix Socket管道通信,是全內(nèi)存的,沒有IO消耗。單進(jìn)程讀寫性能可達(dá)100萬/s,不同的進(jìn)程使用不同的管道通信,可以最大化利用多核。

AsyncTask功能在1.6.4版本增加,默認(rèn)不啟動(dòng)task功能,需要在手工設(shè)置task_worker_num來啟動(dòng)此功能
task_worker的數(shù)量在swoole_server::set參數(shù)中調(diào)整,如task_worker_num => 64,表示啟動(dòng)64個(gè)進(jìn)程來接收異步任務(wù)

配置參數(shù)

swoole_server->task/taskwait/finish 3個(gè)方法當(dāng)傳入的$data數(shù)據(jù)超過8K時(shí)會(huì)啟用臨時(shí)文件來保存。當(dāng)臨時(shí)文件內(nèi)容超過server->package_max_length 時(shí)底層會(huì)拋出一個(gè)警告。

WARN: task package is too big.

server->package_max_length 默認(rèn)為2M

注意事項(xiàng)

  • 使用swoole_server_task必須為Server設(shè)置onTask和onFinish回調(diào),否則swoole_server->start會(huì)失敗
  • task操作的次數(shù)必須小于onTask處理速度,如果投遞容量超過處理能力,task會(huì)塞滿緩存區(qū),導(dǎo)致worker進(jìn)程發(fā)生阻塞。worker進(jìn)程將無法接收新的請(qǐng)求
  • 使用addProcess添加的用戶進(jìn)程中無法使用task投遞任務(wù),請(qǐng)使用sendMessage接口與工作進(jìn)程通信

Swoole taskwait

函數(shù)原型:

string $result = swoole_server->taskwait(mixed $task_data, float $timeout = 0.5, int $dst_worker_id = -1);

taskwait與task方法作用相同,用于投遞一個(gè)異步的任務(wù)到task進(jìn)程池去執(zhí)行。與task不同的是taskwait是阻塞等待的,直到任務(wù)完成或者超時(shí)返回。

$result為任務(wù)執(zhí)行的結(jié)果,由$serv->finish函數(shù)發(fā)出。如果此任務(wù)超時(shí),這里會(huì)返回false。

taskwait是阻塞接口,如果你的Server是全異步的請(qǐng)使用swoole_server::task和swoole_server::finish,不要使用taskwait
第3個(gè)參數(shù)可以制定要給投遞給哪個(gè)task進(jìn)程,傳入ID即可,范圍是0 - serv->task_worker_num
$dst_worker_id在1.6.11+后可用,默認(rèn)為隨機(jī)投遞
taskwait方法不能在task進(jìn)程中調(diào)用

onTask

在task_worker進(jìn)程內(nèi)被調(diào)用。worker進(jìn)程可以使用swoole_server_task函數(shù)向task_worker進(jìn)程投遞新的任務(wù)。當(dāng)前的Task進(jìn)程在調(diào)用onTask回調(diào)函數(shù)時(shí)會(huì)將進(jìn)程狀態(tài)切換為忙碌,這時(shí)將不再接收新的Task,當(dāng)onTask函數(shù)返回時(shí)會(huì)將進(jìn)程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。

function onTask((swoole_server $serv, int $task_id, int $src_worker_id, string $data));
  • $task_id是任務(wù)ID,由swoole擴(kuò)展內(nèi)自動(dòng)生成,用于區(qū)分不同的任務(wù)。$task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進(jìn)程投遞的任務(wù)ID可能會(huì)有相同
  • $src_worker_id來自于哪個(gè)worker進(jìn)程
  • $data 是任務(wù)的內(nèi)容

返回執(zhí)行結(jié)果到worker進(jìn)程

1.7.2以上的版本,在onTask函數(shù)中 return字符串,表示將此內(nèi)容返回給worker進(jìn)程。worker進(jìn)程中會(huì)觸發(fā)onFinish函數(shù),表示投遞的task已完成。

  • return的變量可以是任意非null的PHP變量

1.7.2以前的版本,需要調(diào)用swoole_server->finish()函數(shù)將結(jié)果返回給worker進(jìn)程


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)