swoole 的異步任務(wù)task系統(tǒng)可以很方便的為我們?cè)陂_發(fā)的過程中調(diào)用異步任務(wù)的執(zhí)行,而無需等待。
task模塊用來做一些異步的慢速任務(wù),比如webim中發(fā)廣播,發(fā)送郵件,異步訂單處理、異步支付處理等。
node.js 假如有10萬個(gè)連接,要發(fā)廣播時(shí),那會(huì)循環(huán)10萬次,這時(shí)候程序不能做任何事情,不能接受新的連接,也不能收包發(fā)包。
而swoole不同,丟給task進(jìn)程之后,worker進(jìn)程可以繼續(xù)處理新的數(shù)據(jù)請(qǐng)求。任務(wù)完成后會(huì)異步地通知worker進(jìn)程告訴它此任務(wù)已經(jīng)完成。
當(dāng)然task模塊的作用還不僅如此,實(shí)現(xiàn)PHP的數(shù)據(jù)庫(kù)連接池,異步隊(duì)列等,還需要進(jìn)一步挖掘。
$serv = new Swoole\Server("127.0.0.1", 9502);
$serv->set(array('task_worker_num' => 4));
$serv->on('Receive', function($serv, $fd, $from_id, $data) {
$task_id = $serv->task("Async");
echo "Dispath AsyncTask: id=$task_id\n";
});
$serv->on('Task', function ($serv, $task_id, $from_id, $data) {
echo "New AsyncTask[id=$task_id]".PHP_EOL;
$serv->finish("$data -> OK");
});
$serv->on('Finish', function ($serv, $task_id, $data) {
echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});
$serv->start();
投遞一個(gè)異步任務(wù)到task_worker池中。此函數(shù)是非阻塞的,執(zhí)行完畢會(huì)立即返回。worker進(jìn)程可以繼續(xù)處理新的請(qǐng)求。
int swoole_server::task(mixed $data, int $dst_worker_id = -1)
$task_id = $serv->task("some data");
//swoole-1.8.6或更高版本
$serv->task("taskcallback", -1, function (swoole_server $serv, $task_id, $data) {
echo "Task Callback: ";
var_dump($task_id, $data);
});
0 - (serv->task_worker_num -1)
$task_id
,表示此任務(wù)的ID。如果有finish回應(yīng),onFinish
回調(diào)中會(huì)攜帶$task_id
參數(shù)onFinish
函數(shù),如果任務(wù)設(shè)置了回調(diào)函數(shù),Task返回結(jié)果時(shí)會(huì)直接執(zhí)行制定的回調(diào)函數(shù),不再執(zhí)行Server的onFinish
回調(diào)$dst_worker_id在1.6.11+后可用,默認(rèn)為隨機(jī)投遞
$task_id是從0-42億的整數(shù),在當(dāng)前進(jìn)程內(nèi)是唯一的
task方法不能在task進(jìn)程/用戶自定義進(jìn)程中調(diào)用
此功能用于將慢速的任務(wù)異步地去執(zhí)行,比如一個(gè)聊天室服務(wù)器,可以用它來進(jìn)行發(fā)送廣播。當(dāng)任務(wù)完成時(shí),在task進(jìn)程中調(diào)用$serv->finish("finish")
告訴worker進(jìn)程此任務(wù)已完成。當(dāng)然swoole_server->finish
是可選的。
task底層使用Unix Socket管道通信,是全內(nèi)存的,沒有IO消耗。單進(jìn)程讀寫性能可達(dá)100萬/s,不同的進(jìn)程使用不同的管道通信,可以最大化利用多核。
AsyncTask功能在1.6.4版本增加,默認(rèn)不啟動(dòng)task功能,需要在手工設(shè)置task_worker_num來啟動(dòng)此功能
task_worker的數(shù)量在swoole_server::set參數(shù)中調(diào)整,如task_worker_num => 64,表示啟動(dòng)64個(gè)進(jìn)程來接收異步任務(wù)
swoole_server->task/taskwait/finish 3個(gè)方法當(dāng)傳入的$data
數(shù)據(jù)超過8K時(shí)會(huì)啟用臨時(shí)文件來保存。當(dāng)臨時(shí)文件內(nèi)容超過server->package_max_length
時(shí)底層會(huì)拋出一個(gè)警告。
WARN: task package is too big.
server->package_max_length 默認(rèn)為2M
函數(shù)原型:
string $result = swoole_server->taskwait(mixed $task_data, float $timeout = 0.5, int $dst_worker_id = -1);
taskwait與task方法作用相同,用于投遞一個(gè)異步的任務(wù)到task進(jìn)程池去執(zhí)行。與task不同的是taskwait是阻塞等待的,直到任務(wù)完成或者超時(shí)返回。
$result為任務(wù)執(zhí)行的結(jié)果,由$serv->finish函數(shù)發(fā)出。如果此任務(wù)超時(shí),這里會(huì)返回false。
taskwait是阻塞接口,如果你的Server是全異步的請(qǐng)使用swoole_server::task和swoole_server::finish,不要使用taskwait
第3個(gè)參數(shù)可以制定要給投遞給哪個(gè)task進(jìn)程,傳入ID即可,范圍是0 - serv->task_worker_num
$dst_worker_id在1.6.11+后可用,默認(rèn)為隨機(jī)投遞
taskwait方法不能在task進(jìn)程中調(diào)用
在task_worker進(jìn)程內(nèi)被調(diào)用。worker進(jìn)程可以使用swoole_server_task函數(shù)向task_worker進(jìn)程投遞新的任務(wù)。當(dāng)前的Task進(jìn)程在調(diào)用onTask
回調(diào)函數(shù)時(shí)會(huì)將進(jìn)程狀態(tài)切換為忙碌,這時(shí)將不再接收新的Task,當(dāng)onTask
函數(shù)返回時(shí)會(huì)將進(jìn)程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。
function onTask((swoole_server $serv, int $task_id, int $src_worker_id, string $data));
1.7.2以上的版本,在onTask函數(shù)中 return字符串,表示將此內(nèi)容返回給worker進(jìn)程。worker進(jìn)程中會(huì)觸發(fā)onFinish函數(shù),表示投遞的task已完成。
1.7.2以前的版本,需要調(diào)用swoole_server->finish()
函數(shù)將結(jié)果返回給worker進(jìn)程
更多建議: