Async擴展

2018-10-10 11:29 更新

Async本身是一個非常有用的構造,它將通過其合作多任務基礎設施提供可能的節(jié)省時間。然而,我們知道將有一組常用的功能,其中Async最有用:數(shù)據(jù)庫訪問和緩存,Web資源訪問和流。

MySQL的

Async MySQL擴展類似于 mysqliHHVM附帶的擴展。此擴展將主要用于Async創(chuàng)建連接和查詢MySQL數(shù)據(jù)庫。

完整的API將包含所有可以通過Async訪問MySQL中的類和方法; 我們將在這里介紹一些更常見的情況。

連接到MySQL數(shù)據(jù)庫的主要類別是 AsyncMysqlConnectionPool其主要方法是異Async connect()。

查詢數(shù)據(jù)庫的主要類別是 AsyncMysqlConnection使用兩種主要的查詢方法,query()以及queryf()兩種Async。還有一個功能來確保要執(zhí)行的查詢被安全地調用escapeString()。

從查詢中檢索結果的主類是一個被稱為抽象類AsyncMysqlResult,它本身有兩個被稱為的具體子類AsyncMysqlQueryResult 和 AsyncMysqlErrorResult。這些類的主要方法是,vectorRows()并且mapRows()都是非同步的。

<?hh
class AsyncMysqlConnectionPool {
  public function __construct(array $pool_options): void;
  public static async function connect(
    string $host,
    int $port,
    string $dbname,
    string $user,
    string $password,
    int $timeout_micros = -1,
    string $extra_key = ""): Awaitable<AsyncMysqlConnection>;
  // More methods in this class, of course
}

class AsyncMysqlConnection {
  public function query(string $query, int $timeout_micros)
    : Awaitable<AsyncMysqlResult>;
  public function queryf(string $pattern, ..$args)
    : Awaitable<AsyncMysqlResult>;
  public function escapeString(string $data): string;
  // More methods in this class, of course
}

class AsyncMysqlQueryResult extends AsyncMysqlResult {
  public function vectorRows(): Vector; // vector of Vectors
  public function mapRows(): Vector; // vector of Maps
  // return db column values as Hack types instead of
  // string representations of those types.
  public function vectorRowsTyped(): Vector;
  public function mapRowsTyped(): Vector;
  // More methods in this class, of course
}

class AsyncMysqlErrorResult extends AsyncMysqlResult {
  public function failureType(): string;
  public function mysql_errno(): int;
  public function mysql_error(): string;
  // More methods in this class, of course
}

以下是一個簡單的示例,顯示如何從具有此擴展名的數(shù)據(jù)庫獲取用戶名。

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQL;

use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  // Get a connection pool with default options
  $pool = new \AsyncMysqlConnectionPool(array());
  // Change credentials to something that works in order to test this code
  return await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
}

async function fetch_user_name(\AsyncMysqlConnection $conn,
                               int $user_id) : Awaitable<string> {
  // Your table and column may differ, of course
  $result = await $conn->queryf(
    'SELECT name from test_table WHERE userID = %d',
    $user_id
  );
  // There shouldn't be more than one row returned for one user id
  invariant($result->numRows() === 1, 'one row exactly');
  // A vector of vector objects holding the string values of each column
  // in the query
  $vector = $result->vectorRows();
  return $vector[0][0]; // We had one column in our query
}

async function get_user_info(\AsyncMysqlConnection $conn,
                             string $user): Awaitable<Vector<Map>> {
  $result = await $conn->queryf(
    'SELECT * from test_table WHERE name = %s',
    $conn->escapeString($user)
  );
  // A vector of map objects holding the string values of each column
  // in the query, and the keys being the column names
  $map = $result->mapRows();
  return $map;
}

async function async_mysql_tutorial(): Awaitable<void> {
  $conn = await get_connection();
  if ($conn !== null) {
    $result = await fetch_user_name($conn, 2);
    var_dump($result);
    $info = await get_user_info($conn, 'Fred Emmott');
    var_dump($info instanceof Vector);
    var_dump($info[0] instanceof Map);
  }
}

\HH\Asio\join(async_mysql_tutorial());

Output

string(11) "Fred Emmott"
bool(true)
bool(true)

連接池

Async MySQL擴展不支持復用 - 每個并發(fā)查詢需要自己的連接。但是,擴展支持連接池。

Async MySQL擴展提供了一種池連接對象的機制,因此您不需要在每次進行查詢時創(chuàng)建新的連接。the class是AsyncMysqlConnectionPool 一個可以這樣創(chuàng)建:

?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQLConnectionPool;

use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

function get_pool(): \AsyncMysqlConnectionPool {
  return new \AsyncMysqlConnectionPool(
    array('pool_connection_limit' => 100)
  ); // See API for more pool options
}

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  $pool = get_pool();
  $conn = await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
  return $conn;
}

async function run(): Awaitable<void> {
  $conn = await get_connection();
  var_dump($conn);
}

\HH\Asio\join(run());

Output

object(AsyncMysqlConnection)#6 (0) {
}

這是強烈建議您使用連接池為您的MySQL連接; 如果由于某種原因你真的需要一個,單個異步客戶端,有一個AsyncMysqlClient提供一個connect()方法的類。

MCRouter

MCRouter是一個memcached協(xié)議路由庫。為了幫助您的memcached memcached部署,它提供了連接池,基于前綴的路由等功能。

Async MCRouter擴展基本上是作為HHVM一部分的Memcached擴展的Async但是子集的版本。主要課程是MCRouter。有兩種方法可以創(chuàng)建MCRouter對象的實例。該createSimple()采取其中Memcached是正在運行的服務器地址的矢量。更多的配置__construct()允許更多的選項調整。得到一個對象后,您可以使用Async的核心memcached協(xié)議的方法,如版本add(),get()del()。

class MCRouter {
  public function __construct(array<stirng, mixed> $options, string $pid = '');
  public static function createSimple(ConstVector<string> $servers): MCRouter;
  public async function add(string $key, string $value, int $flags = 0,
                            int $expiration = 0): Awaitable<void>;
  public async function get(string $key): Awaitable<string>;
  public async function del(string $key): Awaitable<void>;
  // More methods exist, of course
}

以下是一個簡單的示例,顯示如何從memcached獲取用戶名:

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MCRouter;

require __DIR__ . "/../../../../vendor/hh_autoload.php"; // For wrap()

function get_mcrouter_object(): \MCRouter {
  $servers = Vector { getenv('HHVM_TEST_MCROUTER') };
  $mc = \MCRouter::createSimple($servers);
  return $mc;
}

async function add_user_name(
  \MCRouter $mcr,
  int $id,
  string $value): Awaitable<void> {
  $key = 'name:' . $id;
  await $mcr->set($key, $value);
}

async function get_user_name(\MCRouter $mcr, int $user_id): Awaitable<string> {
  $key = 'name:' . $user_id;
  try {
    $res = await \HH\Asio\wrap($mcr->get($key));
    if ($res->isSucceeded()) {
      return $res->getResult();
    }
    return "";
  } catch (\MCRouterException $ex) {
    echo $ex->getKey() . PHP_EOL . $ex->getOp();
    return "";
  }
}

async function run(): Awaitable<void> {
  $mcr = get_mcrouter_object();
  await add_user_name($mcr, 1, 'Joel');
  $name = await get_user_name($mcr, 1);
  var_dump($name); // Should print "Joel"
}

\HH\Asio\join(run());

Output

string(4) "Joel"

如果使用此協(xié)議時出現(xiàn)問題,則可能會拋出兩個可能的異常。MCRouterException當核心選項發(fā)生錯誤時,如刪除密鑰一樣被拋出。MCRouterOptionException當您提供不可解析的選項列表時發(fā)生。

卷曲

Hack目前為cURL提供了兩個Async功能。

curl_multi_await

cURL為URL提供了一個數(shù)據(jù)傳輸庫。異步cURL擴展提供了兩個功能,其中一個是圍繞另一個的包裝。curl_multi_await()是HHVM的異步版本curl_multi_select()。它等待直到cURL句柄上有活動,并且完成后,您可以使用curl_multi_exec()處理結果,就像在non-async情況下一樣。

async function curl_multi_await(resource $mh,
                                float $timeout = 1.0): Awaitable<int>;

curl_exec

HH\Asio\curl_exec()是一個包裝紙curl_multi_await()。它很容易使用,因為您不必擔心資源創(chuàng)建,因為您只需傳遞字符串URL即可。

namespace HH\Asio {
  async function curl_exec(mixed $urlOrHandle): Awaitable<string>;
}

以下是使用lambda來獲取URL內(nèi)容向量的示例,以減少使用完全關閉語法的代碼冗長度。

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\Curl;

function get_urls(): Vector<string> {
  return Vector {
    "http://example.com",
    "http://example.net",
    "http://example.org",
  };
}

async function get_combined_contents(Vector $urls): Awaitable<Vector<string>> {
  // Use lambda shorthand syntax here instead of full closure syntax
  $handles = $urls->mapWithKey(($idx, $url) ==> \HH\Asio\curl_exec($url));
  $contents = await \HH\Asio\v($handles);
  echo $contents->count();
  return $contents;
}

\HH\Asio\join(get_combined_contents(get_urls()));

Output

3

Streams

Async stream擴展有一個功能, stream_await(),其功能類似于HHVM stream_select()。它等待流進入狀態(tài)(例如STREAM_AWAIT_READY),但沒有多路復用功能stream_select()。您可以使用HH \ Asio \ v()等待多個流句柄,但是所有組合等待的結果將不會完成,直到所有底層流完成。

async function stream_await(resource $fp, int $events,
                            float $timeout = 0.0): Awaitable<int>;

此示例顯示如何使用 stream_await() 寫資源。

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\Stream;

function get_resources(): array<resource> {
  $r1 = fopen('php://stdout', 'w');
  $r2 = fopen('php://stdout', 'w');
  $r3 = fopen('php://stdout', 'w');

  return array($r1, $r2, $r3);
}

async function write_all(array<resource> $resources): Awaitable<void> {
  // UNSAFE : the typechecker isn't aware of stream_await until 3.12 :(
  $write_single_resource = async function(resource $r) {
    $status = await stream_await($r, STREAM_AWAIT_WRITE, 1.0);
    if ($status === STREAM_AWAIT_READY) {
      fwrite($r, str_shuffle('ABCDEF') . PHP_EOL);
    }
  };
  // You will get 3 shuffled strings, each on a separate line.
  await \HH\Asio\v(array_map($write_single_resource, $resources));
}

\HH\Asio\join(write_all(get_resources()));

Output

DCFAEB
BAFCDE
ACBDEF
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號