Timer定時器、心跳檢測及Task進(jìn)階實例:mysql連接池

2018-02-24 16:13 更新

環(huán)境說明: 系統(tǒng):Ubuntu14.04 (安裝教程包括CentOS6.5)
PHP版本:PHP-5.5.10
swoole版本:1.7.7-stable

1.Timer定時器

在實際應(yīng)用中,往往會遇到需要每隔一段時間重復(fù)做一件事,比如心跳檢測、訂閱消息、數(shù)據(jù)庫備份等工作。通常,我們會借助PHP的time()以及相關(guān)函數(shù)自己實現(xiàn)一個定時器,或者使用crontab工具來實現(xiàn)。但是,自定義的定時器容易出錯,而使用crontab則需要編寫額外的腳本文件,無論是遷移還是調(diào)試都比較麻煩。
因此,Swoole提供了一個內(nèi)置的Timer定時器功能,通過函數(shù)addtimer即可在Swoole中添加一個定時器,該定時器會在建立之后,按照預(yù)先設(shè)定好的時間間隔,每到對應(yīng)的時間就會調(diào)用一次回調(diào)函數(shù)onTimer通知Server。
簡單示例如下:

    $this->serv->on('Timer', array($this, 'onTimer'));

    public function onWorkerStart( $serv , $worker_id) {
        // 在Worker進(jìn)程開啟時綁定定時器
        // 只有當(dāng)worker_id為0時才添加定時器,避免重復(fù)添加
        if( $worker_id == 0 ) {
            $serv->addtimer(500);
            $serv->addtimer(1000);
            $serv->addtimer(1500);
        }
    }

    public function onTimer($serv, $interval) {
        switch( $interval ) {
            case 500: { // 
                echo "Do Thing A at interval 500\n";
                break;
            }
            case 1000:{
                echo "Do Thing B at interval 1000\n";
                break;
            }
            case 1500:{
                echo "Do Thing C at interval 1500\n";
                break;
            }
        }
    }

可以看到,在onWorkerStart回調(diào)函數(shù)中,通過addtimer添加了三個定時器,時間間隔分別為500、1000、1500。而在onTimer回調(diào)中,正好通過間隔的不同來區(qū)分不同的定時器回調(diào),從而執(zhí)行不同的操作。
需要注意的是,在上述示例中,當(dāng)1000ms的定時器被觸發(fā)時,500ms的定時器同樣會被觸發(fā),但是不能保證會在1000ms定時器前觸發(fā)還是后觸發(fā),因此需要注意,定時器中的操作不能依賴其他定時器的執(zhí)行結(jié)果。

點此查看完整示例

(PS:在Swoole-1.7.7版本,新提供了一個after函數(shù), 這個功能的用法會在以后的教程中給出。)

2.心跳檢測

上文提到過,使用Timer定時器功能可以實現(xiàn)發(fā)送心跳包的功能。事實上,Swoole已經(jīng)內(nèi)置了心跳檢測功能,能自動close掉長時間沒有數(shù)據(jù)來往的連接。而開啟心跳檢測功能,只需要設(shè)置heartbeat_check_intervalheartbeat_idle_time即可。如下:

$this->serv->set(
    array(
        'heartbeat_check_interval' => 60,
        'heartbeat_idle_time' => 600,
    )
);

其中heartbeat_idle_time的默認(rèn)值是heartbeat_check_interval的兩倍。 在設(shè)置這兩個選項后,swoole會在內(nèi)部啟動一個線程,每隔heartbeat_check_interval秒后遍歷一次全部連接,檢查最近一次發(fā)送數(shù)據(jù)的時間和當(dāng)前時間的差,如果這個差值大于heartbeat_idle_time,則會強(qiáng)制關(guān)閉這個連接,并通過回調(diào)onClose通知Server進(jìn)程。?點此查看完整示例?小技巧: 結(jié)合之前的Timer功能,如果我們想維持連接,就設(shè)置一個略小于如果這個差值大于heartbeat_idle_time的定時器,在定時器內(nèi)向所有連接發(fā)送一個心跳包。如果收到心跳回應(yīng),則判斷連接正常,如果沒有收到,則關(guān)閉這個連接或者再次嘗試發(fā)送。

3.Task進(jìn)階:MySQL連接池

上一章中我簡單講解了如何開啟和使用Task功能。這一節(jié),我將提供一個Task的高級用法。

在PHP中,訪問MySQL數(shù)據(jù)庫往往是性能提升的瓶頸。而MySQL連接池我想大家都不陌生,這是一個很好的提升數(shù)據(jù)庫訪問性能的方式。傳統(tǒng)的MySQL連接池,是預(yù)先申請一定數(shù)量的連接,每一個新的請求都會占用其中一個連接,請求結(jié)束后再將連接放回池中,如果所有連接都被占用,新來的連接則會進(jìn)入等待狀態(tài)。
知道了MySQL連接池的實現(xiàn)原理,那我們來看如何使用Swoole實現(xiàn)一個連接池。
首先,Swoole允許開啟一定量的Task Worker進(jìn)程,我們可以讓每個進(jìn)程都擁有一個MySQL連接,并保持這個連接,這樣,我們就創(chuàng)建了一個連接池。
其次,設(shè)置swoole的dispatch_mode為搶占模式(主進(jìn)程會根據(jù)Worker的忙閑狀態(tài)選擇投遞,只會投遞給處于閑置狀態(tài)的Worker)。這樣,每個task都會被投遞給閑置的Task Worker。這樣,我們保證了每個新的task都會被閑置的Task Worker處理,如果全部Task Worker都被占用,則會進(jìn)入等待隊列。

下面直接上關(guān)鍵代碼:

public function onWorkerStart( $serv , $worker_id) {
    echo "onWorkerStart\n";
    // 判定是否為Task Worker進(jìn)程
    if( $worker_id >= $serv->setting['worker_num'] ) {
        $this->pdo = new PDO(
            "mysql:host=localhost;port=3306;dbname=Test", 
            "root", 
            "123456", 
            array(
                PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                PDO::ATTR_PERSISTENT => true
            )
        );
    }
}

首先,在每個Task Worker進(jìn)程中,創(chuàng)建一個MySQL連接。這里我選用了PDO擴(kuò)展。

public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
    $sql = array(
        'sql'=>'select * from Test where pid > ?',
        'param' => array(
            0
        ),
        'fd' => $fd
    );
    $serv->task( json_encode($sql) );
}

其次,在需要的時候,通過task函數(shù)投遞一個任務(wù)(也就是發(fā)起一次SQL請求)

public function onTask($serv,$task_id,$from_id, $data) {
    $sql = json_decode( $data , true );

    $statement = $this->pdo->prepare($sql['sql']);
    $statement->execute($sql['param']);     

    $result = $statement->fetchAll(PDO::FETCH_ASSOC);
    $serv->send( $sql['fd'],json_encode($result));
    return true;
}

最后,在onTask回調(diào)中,根據(jù)請求過來的SQL語句以及相應(yīng)的參數(shù),發(fā)起一次MySQL請求,并將獲取到的結(jié)果通過send發(fā)送給客戶端(或者通過return返回給Worker進(jìn)程)。而且,這樣的一次MySQL請求還不會阻塞Worker進(jìn)程,Worker進(jìn)程可以繼續(xù)處理其他的邏輯。

可以看到,簡單十幾行代碼,就實現(xiàn)了一個高效的異步MySQL連接池。
通過測試,單個客戶端一共發(fā)起1W次select請求,共耗時9s;
1W次insert請求,共耗時21s。
(客戶端會在每次收到前一個請求的結(jié)果后才會發(fā)起下一次請求,而不是并發(fā))。

點此查看完整服務(wù)端代碼
點此查看完整客戶端代碼

4.Task實戰(zhàn):yii中應(yīng)用task

在YII框架中結(jié)合了swoole 的task 做了異步處理。 本例中 主要用到 1、protected/commands/ServerCommand.php 用來做server。 2、protected/event/下的文件 這里是在異步中的具體實現(xiàn)。

客戶端調(diào)用參照 TestController

<?php
class TestController extends Controller{
    public function actionTT(){
        $message['uid'] = 2;
        $message['email'] = '83212019@qq.com';
        $message['title'] = '接口報警郵件';
        $message['contents'] = "'EmailEvent'接口請求過程出錯! 錯誤信息如下:err_no:'00000' err_msg:'測試隊列' 請求參數(shù)為:'[]'";
        $message['type'] = 2;

        $data['param'] = $message;
        $data['class'] = 'Email';
        $client = new EventClient();
        $data = $client->send($data);
    }
}
?>

有個task表是用來記錄異步任務(wù)的。如果失敗重試3次。sql在protected/data/sql.sql里。
點此查看完整客戶端代碼

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號