Async本身是一個非常有用的構造,它將通過其合作多任務基礎設施提供可能的節(jié)省時間。然而,我們知道將有一組常用的功能,其中Async最有用:數(shù)據(jù)庫訪問和緩存,Web資源訪問和流。
Async MySQL擴展類似于 mysqliHHVM附帶的擴展。此擴展將主要用于Async創(chuàng)建連接和查詢MySQL數(shù)據(jù)庫。
在完整的API將包含所有可以通過Async訪問MySQL中的類和方法; 我們將在這里介紹一些更常見的情況。
連接到MySQL數(shù)據(jù)庫的主要類別是 AsyncMysqlConnectionPool其主要方法是異Async connect()。
查詢數(shù)據(jù)庫的主要類別是 AsyncMysqlConnection使用兩種主要的查詢方法,query()以及queryf()兩種Async。還有一個功能來確保要執(zhí)行的查詢被安全地調用escapeString()。
從查詢中檢索結果的主類是一個被稱為抽象類AsyncMysqlResult,它本身有兩個被稱為的具體子類AsyncMysqlQueryResult 和 AsyncMysqlErrorResult。這些類的主要方法是,vectorRows()并且mapRows()都是非同步的。
<?hh
class AsyncMysqlConnectionPool {
public function __construct(array $pool_options): void;
public static async function connect(
string $host,
int $port,
string $dbname,
string $user,
string $password,
int $timeout_micros = -1,
string $extra_key = ""): Awaitable<AsyncMysqlConnection>;
// More methods in this class, of course
}
class AsyncMysqlConnection {
public function query(string $query, int $timeout_micros)
: Awaitable<AsyncMysqlResult>;
public function queryf(string $pattern, ..$args)
: Awaitable<AsyncMysqlResult>;
public function escapeString(string $data): string;
// More methods in this class, of course
}
class AsyncMysqlQueryResult extends AsyncMysqlResult {
public function vectorRows(): Vector; // vector of Vectors
public function mapRows(): Vector; // vector of Maps
// return db column values as Hack types instead of
// string representations of those types.
public function vectorRowsTyped(): Vector;
public function mapRowsTyped(): Vector;
// More methods in this class, of course
}
class AsyncMysqlErrorResult extends AsyncMysqlResult {
public function failureType(): string;
public function mysql_errno(): int;
public function mysql_error(): string;
// More methods in this class, of course
}
以下是一個簡單的示例,顯示如何從具有此擴展名的數(shù)據(jù)庫獲取用戶名。
<?hh
namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQL;
use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;
async function get_connection(): Awaitable<\AsyncMysqlConnection> {
// Get a connection pool with default options
$pool = new \AsyncMysqlConnectionPool(array());
// Change credentials to something that works in order to test this code
return await $pool->connect(
CI::$host,
CI::$port,
CI::$db,
CI::$user,
CI::$passwd,
);
}
async function fetch_user_name(\AsyncMysqlConnection $conn,
int $user_id) : Awaitable<string> {
// Your table and column may differ, of course
$result = await $conn->queryf(
'SELECT name from test_table WHERE userID = %d',
$user_id
);
// There shouldn't be more than one row returned for one user id
invariant($result->numRows() === 1, 'one row exactly');
// A vector of vector objects holding the string values of each column
// in the query
$vector = $result->vectorRows();
return $vector[0][0]; // We had one column in our query
}
async function get_user_info(\AsyncMysqlConnection $conn,
string $user): Awaitable<Vector<Map>> {
$result = await $conn->queryf(
'SELECT * from test_table WHERE name = %s',
$conn->escapeString($user)
);
// A vector of map objects holding the string values of each column
// in the query, and the keys being the column names
$map = $result->mapRows();
return $map;
}
async function async_mysql_tutorial(): Awaitable<void> {
$conn = await get_connection();
if ($conn !== null) {
$result = await fetch_user_name($conn, 2);
var_dump($result);
$info = await get_user_info($conn, 'Fred Emmott');
var_dump($info instanceof Vector);
var_dump($info[0] instanceof Map);
}
}
\HH\Asio\join(async_mysql_tutorial());
Output
string(11) "Fred Emmott"
bool(true)
bool(true)
Async MySQL擴展不支持復用 - 每個并發(fā)查詢需要自己的連接。但是,擴展支持連接池。
Async MySQL擴展提供了一種池連接對象的機制,因此您不需要在每次進行查詢時創(chuàng)建新的連接。the class是AsyncMysqlConnectionPool 一個可以這樣創(chuàng)建:
?hh
namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQLConnectionPool;
use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;
function get_pool(): \AsyncMysqlConnectionPool {
return new \AsyncMysqlConnectionPool(
array('pool_connection_limit' => 100)
); // See API for more pool options
}
async function get_connection(): Awaitable<\AsyncMysqlConnection> {
$pool = get_pool();
$conn = await $pool->connect(
CI::$host,
CI::$port,
CI::$db,
CI::$user,
CI::$passwd,
);
return $conn;
}
async function run(): Awaitable<void> {
$conn = await get_connection();
var_dump($conn);
}
\HH\Asio\join(run());
Output
object(AsyncMysqlConnection)#6 (0) {
}
這是強烈建議您使用連接池為您的MySQL連接; 如果由于某種原因你真的需要一個,單個異步客戶端,有一個AsyncMysqlClient
提供一個connect()
方法的類。
MCRouter是一個memcached協(xié)議路由庫。為了幫助您的memcached memcached部署,它提供了連接池,基于前綴的路由等功能。
Async MCRouter擴展基本上是作為HHVM一部分的Memcached擴展的Async但是子集的版本。主要課程是MCRouter。有兩種方法可以創(chuàng)建MCRouter對象的實例。該createSimple()采取其中Memcached是正在運行的服務器地址的矢量。更多的配置__construct()允許更多的選項調整。得到一個對象后,您可以使用Async的核心memcached協(xié)議的方法,如版本add(),get()和del()。
class MCRouter {
public function __construct(array<stirng, mixed> $options, string $pid = '');
public static function createSimple(ConstVector<string> $servers): MCRouter;
public async function add(string $key, string $value, int $flags = 0,
int $expiration = 0): Awaitable<void>;
public async function get(string $key): Awaitable<string>;
public async function del(string $key): Awaitable<void>;
// More methods exist, of course
}
以下是一個簡單的示例,顯示如何從memcached獲取用戶名:
<?hh
namespace Hack\UserDocumentation\Async\Extensions\Examples\MCRouter;
require __DIR__ . "/../../../../vendor/hh_autoload.php"; // For wrap()
function get_mcrouter_object(): \MCRouter {
$servers = Vector { getenv('HHVM_TEST_MCROUTER') };
$mc = \MCRouter::createSimple($servers);
return $mc;
}
async function add_user_name(
\MCRouter $mcr,
int $id,
string $value): Awaitable<void> {
$key = 'name:' . $id;
await $mcr->set($key, $value);
}
async function get_user_name(\MCRouter $mcr, int $user_id): Awaitable<string> {
$key = 'name:' . $user_id;
try {
$res = await \HH\Asio\wrap($mcr->get($key));
if ($res->isSucceeded()) {
return $res->getResult();
}
return "";
} catch (\MCRouterException $ex) {
echo $ex->getKey() . PHP_EOL . $ex->getOp();
return "";
}
}
async function run(): Awaitable<void> {
$mcr = get_mcrouter_object();
await add_user_name($mcr, 1, 'Joel');
$name = await get_user_name($mcr, 1);
var_dump($name); // Should print "Joel"
}
\HH\Asio\join(run());
Output
string(4) "Joel"
如果使用此協(xié)議時出現(xiàn)問題,則可能會拋出兩個可能的異常。MCRouterException
當核心選項發(fā)生錯誤時,如刪除密鑰一樣被拋出。MCRouterOptionException
當您提供不可解析的選項列表時發(fā)生。
Hack目前為cURL提供了兩個Async功能。
cURL為URL提供了一個數(shù)據(jù)傳輸庫。異步cURL擴展提供了兩個功能,其中一個是圍繞另一個的包裝。curl_multi_await()是HHVM的異步版本curl_multi_select()。它等待直到cURL句柄上有活動,并且完成后,您可以使用curl_multi_exec()處理結果,就像在non-async情況下一樣。
async function curl_multi_await(resource $mh,
float $timeout = 1.0): Awaitable<int>;
HH\Asio\curl_exec()是一個包裝紙curl_multi_await()。它很容易使用,因為您不必擔心資源創(chuàng)建,因為您只需傳遞字符串URL即可。
namespace HH\Asio {
async function curl_exec(mixed $urlOrHandle): Awaitable<string>;
}
以下是使用lambda來獲取URL內(nèi)容向量的示例,以減少使用完全關閉語法的代碼冗長度。
<?hh
namespace Hack\UserDocumentation\Async\Extensions\Examples\Curl;
function get_urls(): Vector<string> {
return Vector {
"http://example.com",
"http://example.net",
"http://example.org",
};
}
async function get_combined_contents(Vector $urls): Awaitable<Vector<string>> {
// Use lambda shorthand syntax here instead of full closure syntax
$handles = $urls->mapWithKey(($idx, $url) ==> \HH\Asio\curl_exec($url));
$contents = await \HH\Asio\v($handles);
echo $contents->count();
return $contents;
}
\HH\Asio\join(get_combined_contents(get_urls()));
Output
3
Async stream擴展有一個功能, stream_await()
,其功能類似于HHVM stream_select()
。它等待流進入狀態(tài)(例如STREAM_AWAIT_READY
),但沒有多路復用功能stream_select()
。您可以使用HH \ Asio \ v()等待多個流句柄,但是所有組合等待的結果將不會完成,直到所有底層流完成。
async function stream_await(resource $fp, int $events,
float $timeout = 0.0): Awaitable<int>;
此示例顯示如何使用 stream_await() 寫資源。
<?hh
namespace Hack\UserDocumentation\Async\Extensions\Examples\Stream;
function get_resources(): array<resource> {
$r1 = fopen('php://stdout', 'w');
$r2 = fopen('php://stdout', 'w');
$r3 = fopen('php://stdout', 'w');
return array($r1, $r2, $r3);
}
async function write_all(array<resource> $resources): Awaitable<void> {
// UNSAFE : the typechecker isn't aware of stream_await until 3.12 :(
$write_single_resource = async function(resource $r) {
$status = await stream_await($r, STREAM_AWAIT_WRITE, 1.0);
if ($status === STREAM_AWAIT_READY) {
fwrite($r, str_shuffle('ABCDEF') . PHP_EOL);
}
};
// You will get 3 shuffled strings, each on a separate line.
await \HH\Asio\v(array_map($write_single_resource, $resources));
}
\HH\Asio\join(write_all(get_resources()));
Output
DCFAEB
BAFCDE
ACBDEF
更多建議: