我們要使用 Scala 和先前介紹的 Finagle 框架構(gòu)建一個(gè)簡單的分布式搜索引擎。
從廣義上講,我們的設(shè)計(jì)目標(biāo)包括 抽象 (abstraction:在不知道其內(nèi)部的所有細(xì)節(jié)的前提下,利用該系統(tǒng)功能的能力)、 模塊化 (modularity:把系統(tǒng)分解為小而簡單的片段,從而更容易被理解和/或被更換的能力)和 擴(kuò)展性 (scalability:用簡單直接的方法給系統(tǒng)擴(kuò)容的能力)。
我們要描述的系統(tǒng)有三個(gè)部分: (1) 客戶端 發(fā)出請求,(2) 服務(wù)端 接收請求并應(yīng)答,和(3) 傳送 機(jī)制來這些通信包裝起來。通常情況下,客戶端和服務(wù)器位于不同的機(jī)器上,通過網(wǎng)絡(luò)上的一個(gè)特定的端口進(jìn)行通信,但在這個(gè)例子中,它們將運(yùn)行在同一臺機(jī)器上(而且仍然使用端口進(jìn)行通信) 。在我們的例子中,客戶端和服務(wù)器將用 Scala 編寫,傳送協(xié)議將使用 Thrift 處理。本教程的主要目的是展示一個(gè)簡單的具有良好可擴(kuò)展性的服務(wù)器和客戶端。
首先,使用 scala-bootstrapper 創(chuàng)建一個(gè)骨架項(xiàng)目( “ Searchbird ” )。這將創(chuàng)建一個(gè)簡單的基于 Finagle 和 key-value 內(nèi)存存儲的 Scala 服務(wù)。我們將擴(kuò)展這個(gè)工程以支持搜索值,并進(jìn)而支持多進(jìn)程多個(gè)內(nèi)存存儲的搜索。
$ mkdir searchbird ; cd searchbird
$ scala-bootstrapper searchbird
writing build.sbt
writing config/development.scala
writing config/production.scala
writing config/staging.scala
writing config/test.scala
writing console
writing Gemfile
writing project/plugins.sbt
writing README.md
writing sbt
writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala
writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
writing src/main/scala/com/twitter/searchbird/Main.scala
writing src/main/thrift/searchbird.thrift
writing src/scripts/searchbird.sh
writing src/scripts/config.sh
writing src/scripts/devel.sh
writing src/scripts/server.sh
writing src/scripts/service.sh
writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala
writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala
writing TUTORIAL.md
首先,來看下 scala-bootstrapper 為我們創(chuàng)建的默認(rèn)項(xiàng)目。這是一個(gè)模板。雖然最終將替換它的大部分內(nèi)容,不過作為支架它還是很方便的。它定義了一個(gè)簡單(但完整)的 key-value 存儲,并包含了配置、thrift 接口、統(tǒng)計(jì)輸出和日志記錄。
在我們看代碼之前,先運(yùn)行一個(gè)客戶端和服務(wù)器,看看它是如何工作的。這里是我們構(gòu)建的:
這里是我們的服務(wù)輸出的接口。由于 Searchbird 服務(wù)是一個(gè) Thrift 服務(wù)(和我們大部分服務(wù)一樣),因而其外部接口使用 Thrift IDL(“接口描述語言”)定義。
src/main/thrift/searchbird.thrift
service SearchbirdService {
string get(1: string key) throws(1: SearchbirdException ex)
void put(1: string key, 2: string value)
}
這是非常直觀的:我們的服務(wù) SearchbirdService 輸出兩個(gè) RPC 方法 get 和 put 。他們組成了一個(gè)到 key-value 存儲的簡單接口。
現(xiàn)在,讓我們運(yùn)行默認(rèn)的服務(wù),啟動(dòng)客戶端連接到這個(gè)服務(wù),并通過這個(gè)接口來探索他們。打開兩個(gè)窗口,一個(gè)用于服務(wù)器,一個(gè)用于客戶端。
在第一個(gè)窗口中,用交互模式啟動(dòng) SBT(在命令行中運(yùn)行 ./sbt
[1]),然后構(gòu)建和運(yùn)行項(xiàng)目內(nèi) SBT。這會(huì)運(yùn)行 Main.scala 定義的 主 進(jìn)程。
$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala
配置文件 (development.scala) 實(shí)例化一個(gè)新的服務(wù),并監(jiān)聽 9999 端口??蛻舳丝梢赃B接到 9999 端口使用此服務(wù)。
現(xiàn)在,我們將使用 控制臺 shell腳本初始化和運(yùn)行一個(gè)客戶端實(shí)例,即 SearchbirdConsoleClient 實(shí)例 (SearchbirdConsoleClient.scala) 。在另一個(gè)窗口中運(yùn)行此腳本:
$ ./console 127.0.0.1 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.
finagle-client>
客戶端對象 client 現(xiàn)在連接到本地計(jì)算機(jī)上的 9999 端口,并可以跟服務(wù)交互了。接下來我們發(fā)送一些請求:
scala> client.put("marius", "Marius Eriksen")
res0: ...
scala> client.put("stevej", "Steve Jenson")
res1: ...
scala> client.get("marius")
res2: com.twitter.util.Future[String] = ...
scala> client.get("marius").get()
res3: String = Marius Eriksen
(第二個(gè) get() 調(diào)用解析 client.get() 返回的 Future 類型值,阻塞直到該值準(zhǔn)備好。)
該服務(wù)器還輸出運(yùn)行統(tǒng)計(jì)(配置文件中指定這些信息在 9900 端口)。這不僅方便對各個(gè)服務(wù)器進(jìn)行檢查,也利于聚集全局的服務(wù)統(tǒng)計(jì)(以機(jī)器可讀的 JSON 接口)。打開第三個(gè)窗口來查看這些統(tǒng)計(jì):
$ curl localhost:9900/stats.txt
counters:
Searchbird/connects: 1
Searchbird/received_bytes: 264
Searchbird/requests: 3
Searchbird/sent_bytes: 128
Searchbird/success: 3
jvm_gc_ConcurrentMarkSweep_cycles: 1
jvm_gc_ConcurrentMarkSweep_msec: 15
jvm_gc_ParNew_cycles: 24
jvm_gc_ParNew_msec: 191
jvm_gc_cycles: 25
jvm_gc_msec: 206
gauges:
Searchbird/connections: 1
Searchbird/pending: 0
jvm_fd_count: 135
jvm_fd_limit: 10240
jvm_heap_committed: 85000192
jvm_heap_max: 530186240
jvm_heap_used: 54778640
jvm_nonheap_committed: 89657344
jvm_nonheap_max: 136314880
jvm_nonheap_used: 66238144
jvm_num_cpus: 4
jvm_post_gc_CMS_Old_Gen_used: 36490088
jvm_post_gc_CMS_Perm_Gen_used: 54718880
jvm_post_gc_Par_Eden_Space_used: 0
jvm_post_gc_Par_Survivor_Space_used: 1315280
jvm_post_gc_used: 92524248
jvm_start_time: 1345072684280
jvm_thread_count: 16
jvm_thread_daemon_count: 7
jvm_thread_peak_count: 16
jvm_uptime: 1671792
labels:
metrics:
Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393)
Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)
除了我們自己的服務(wù)統(tǒng)計(jì)信息以外,還有一些通用的 JVM 統(tǒng)計(jì)。
現(xiàn)在,讓我們來看看配置、服務(wù)器和客戶端的實(shí)現(xiàn)代碼。
…/config/SearchbirdServiceConfig.scala
配置是一個(gè) Scala 的特質(zhì),有一個(gè)方法 apply: RuntimeEnvironment => T
來創(chuàng)建一些 T 。在這個(gè)意義上,配置是“工廠” 。在運(yùn)行時(shí),配置文件(通過使用Scala編譯器庫)被取值為一個(gè)腳本,并產(chǎn)生一個(gè)配置對象。 RuntimeEnvironment 是一個(gè)提供各種運(yùn)行參數(shù)(命令行標(biāo)志, JVM 版本,編譯時(shí)間戳等)查詢的一個(gè)對象。
SearchbirdServiceConfig 類就是這樣一個(gè)配置類。它使用其默認(rèn)值一起指定配置參數(shù)。 (Finagle 支持一個(gè)通用的跟蹤系統(tǒng),我們在本教程將不會(huì)介紹: Zipkin 一個(gè)集合/聚合軌跡的 分布式跟蹤系統(tǒng)。)
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var tracerFactory: Tracer.Factory = NullTracer.factory
def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}
在我們的例子中,我們要?jiǎng)?chuàng)建一個(gè) SearchbirdService.ThriftServer。這是由 thrift 代碼生成器生成的服務(wù)器類型[2]。
…/Main.scala
在 SBT 控制臺中鍵入“run”調(diào)用 main ,這將配置和初始化服務(wù)器。它讀取配置(在 development.scala 中指定,并會(huì)作為參數(shù)傳給“run”),創(chuàng)建 SearchbirdService.ThriftServer ,并啟動(dòng)它。 RuntimeEnvironment.loadRuntimeConfig 執(zhí)行配置賦值,并把自身作為一個(gè)參數(shù)來調(diào)用 apply [3]。
object Main {
private val log = Logger.get(getClass)
def main(args: Array[String]) {
val runtime = RuntimeEnvironment(this, args)
val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer]
try {
log.info("Starting SearchbirdService")
server.start()
} catch {
case e: Exception =>
log.error(e, "Failed starting SearchbirdService, exiting")
ServiceTracker.shutdown()
System.exit(1)
}
}
}
…/SearchbirdServiceImpl.scala
這是實(shí)質(zhì)的服務(wù):我們用自己的實(shí)現(xiàn)擴(kuò)展 SearchbirdService.ThriftServer ?;貞浺幌?thrift 為我們生成的 SearchbirdService.ThriftServer 。它為每一個(gè) thrift 方法生成一個(gè) Scala 方法。到目前為止,在我們的例子中生成的接口是:
trait SearchbirdService {
def put(key: String, value: String): Future[Void]
def get(key: String): Future[String]
}
返回值是 Future[Value] 而不是直接返回值,可以推遲它們的計(jì)算(finagle 的文檔有 Future 更多的細(xì)節(jié))。對本教程的目的來說,你唯一需要知道的有關(guān) Future 的知識點(diǎn)是,可以通過 get() 獲取其值。
scala-bootstrapper 默認(rèn)實(shí)現(xiàn)的 key-value 存儲很簡單:它提供了一個(gè)通過 get 和 put 訪問的 數(shù)據(jù)庫 數(shù)據(jù)結(jié)構(gòu)。
class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer {
val serverName = "Searchbird"
val thriftPort = config.thriftPort
override val tracerFactory = config.tracerFactory
val database = new mutable.HashMap[String, String]()
def get(key: String) = {
database.get(key) match {
case None =>
log.debug("get %s: miss", key)
Future.exception(SearchbirdException("No such key"))
case Some(value) =>
log.debug("get %s: hit", key)
Future(value)
}
}
def put(key: String, value: String) = {
log.debug("put %s", key)
database(key) = value
Future.Unit
}
def shutdown() = {
super.shutdown(0.seconds)
}
}
其結(jié)果是構(gòu)建在 Scala HashMap 上的一個(gè)簡單 thrift 接口。
現(xiàn)在,我們將擴(kuò)展現(xiàn)有的例子,來創(chuàng)建一個(gè)簡單的搜索引擎。然后,我們將進(jìn)一步擴(kuò)展它成為由多個(gè)分片組成的 分布式 搜索引擎,使我們能夠適應(yīng)比單臺機(jī)器內(nèi)存更大的語料庫。
為了簡單起見,我們將最小化擴(kuò)展目前的 thrift 服務(wù),以支持搜索操作。使用模型是用 put 把文件加入搜索引擎,其中每個(gè)文件包含了一系列的記號(詞),那么我們就可以輸入一串記號,然后搜索會(huì)返回包含這個(gè)串中所有記號的所有文件。該體系結(jié)構(gòu)是與前面的例子相同,但增加了一個(gè)新的 @search@
調(diào)用。
要實(shí)現(xiàn)這樣一個(gè)搜索引擎需要修改以下兩個(gè)文件:
src/main/thrift/searchbird.thrift
service SearchbirdService {
string get(1: string key) throws(1: SearchbirdException ex)
void put(1: string key, 2: string value)
list<string> search(1: string query)
}
我們增加了一個(gè) search 方法來搜索當(dāng)前哈希表,返回其值與查詢匹配的鍵列表。實(shí)現(xiàn)也很簡單直觀:
…/SearchbirdServiceImpl.scala
大部分修改都在這個(gè)文件中。
現(xiàn)在的 數(shù)據(jù)庫 HashMap 保存一個(gè)正向索引來持有到文檔的鍵映射。我們重命名它為 forward 并增加一個(gè) 倒排(reverse) 索引(映射記號到所有包含該記號的文件)。所以在 SearchbirdServiceImpl.scala 中,更換 database 定義:
val forward = new mutable.HashMap[String, String]
with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
with mutable.SynchronizedMap[String, Set[String]]
在 get 調(diào)用中,使用 forward 替換 數(shù)據(jù)庫 即可,在其他方面 get 保持不變(僅執(zhí)行正向查找)。不過 put 還需要改變:我們還需要為文件中的每個(gè)令牌填充反向索引,把文件的鍵附加到令牌關(guān)聯(lián)的列表中。用下面的代碼替換 put 調(diào)用。給定一個(gè)特定的搜索令牌,我們現(xiàn)在可以使用反向映射來查找文件。
def put(key: String, value: String) = {
log.debug("put %s", key)
forward(key) = value
// serialize updaters
synchronized {
value.split(" ").toSet foreach { token =>
val current = reverse.getOrElse(token, Set())
reverse(token) = current + key
}
}
Future.Unit
}
需要注意的是(即使 HashMap 是線程安全的)同時(shí)只能有一個(gè)線程可以更新倒排索引,以確保對映射條目的 讀-修改-寫 是一個(gè)原子操作。 (這段代碼過于保守;在進(jìn)行 檢索-修改-寫 操作時(shí),它鎖定了整個(gè)映射,而不是鎖定單個(gè)條目。)。另外還要注意使用 Set 作為數(shù)據(jù)結(jié)構(gòu);這可以確保即使一個(gè)文件中兩次出現(xiàn)同樣的符號,它也只會(huì)被 foreach 循環(huán)處理一次。
這個(gè)實(shí)現(xiàn)仍然有一個(gè)問題,作為留給讀者的一個(gè)練習(xí):當(dāng)我們用一個(gè)新文檔覆蓋的一個(gè)鍵的時(shí)候,我們誒有刪除任何倒排索引中引用的舊文件。
現(xiàn)在進(jìn)入搜索引擎的核心:新的 search 方法。他應(yīng)該解析查詢,尋找匹配的文檔,然后對這些列表做相交操作。這將產(chǎn)生包含所有查詢中的標(biāo)記的文件列表。在 Scala 中可以很直接地表達(dá);添加這段代碼到
SearchbirdServiceImpl
類中:
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
intersected.toList
}
在這段短短的代碼中有幾件事情是值得關(guān)注的。在構(gòu)建命中列表時(shí),如果鍵( token )沒有被發(fā)現(xiàn), getOrElse 會(huì)返回其第二個(gè)參數(shù)(在這種情況下,一個(gè)空 Set )。我們使用 left-reduce 執(zhí)行實(shí)際的相交操作。特別是當(dāng) reduceLeftOption 發(fā)現(xiàn) hits 為空時(shí)將不會(huì)繼續(xù)嘗試執(zhí)行 reduce 操作。這使我們能夠提供一個(gè)默認(rèn)值,而不是拋出一個(gè)異常。其實(shí)這相當(dāng)于:
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
if (hits.isEmpty)
Nil
else
hits reduceLeft { _ & _ } toList
}
使用哪種方式大多是個(gè)人喜好的問題,雖然函數(shù)式風(fēng)格往往會(huì)避開帶有合理默認(rèn)值的條件語句。
現(xiàn)在,我們可以嘗試在控制臺中實(shí)驗(yàn)我們新的實(shí)現(xiàn)。重啟服務(wù)器:
$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala
然后再從 searchbird 目錄,啟動(dòng)客戶端:
$ ./console 127.0.0.1 9999
...
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.
finagle-client>
粘貼以下說明到控制臺:
client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
client.put("collections", " lists maps functional combinators (map foreach filter zip")
client.put("pattern", " more functions! partialfunctions more pattern")
client.put("type", " basic types and type polymorphism type inference variance bounds")
client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
client.put("simple", " all about sbt the standard scala build")
client.put("more", " tour of the scala collections")
client.put("testing", " write tests with specs a bdd testing framework for")
client.put("concurrency", " runnable callable threads futures twitter")
client.put("java", " java interop using scala from")
client.put("searchbird", " building a distributed search engine using")
現(xiàn)在,我們可以執(zhí)行一些搜索,返回包含搜索詞的文件的鍵。
> client.search("functions").get()
res12: Seq[String] = ArrayBuffer(basics)
> client.search("java").get()
res13: Seq[String] = ArrayBuffer(java)
> client.search("java scala").get()
res14: Seq[String] = ArrayBuffer(java)
> client.search("functional").get()
res15: Seq[String] = ArrayBuffer(collections)
> client.search("sbt").get()
res16: Seq[String] = ArrayBuffer(simple)
> client.search("types").get()
res17: Seq[String] = ArrayBuffer(type, advanced)
回想一下,如果調(diào)用返回一個(gè) Future ,我們必須使用一個(gè)阻塞的 get() 來獲取其中包含的值。我們可以使用 Future.collect 命令來創(chuàng)建多個(gè)并發(fā)請求,并等待所有請求成功返回:
> import com.twitter.util.Future
...
> Future.collect(Seq(
client.search("types"),
client.search("sbt"),
client.search("functional")
)).get()
res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))
單臺機(jī)器上一個(gè)簡單的內(nèi)存搜索引擎將無法搜索超過內(nèi)存大小的語料庫?,F(xiàn)在,我們要大膽改進(jìn),用一個(gè)簡單的分片計(jì)劃來構(gòu)建分布式節(jié)點(diǎn)。下面是框圖:
為了幫助我們的工作,我們會(huì)先介紹另一個(gè)抽象索引來解耦 SearchbirdService 對索引實(shí)現(xiàn)的依賴。這是一個(gè)直觀的重構(gòu)。我們首先添加一個(gè)索引文件到構(gòu)建 (創(chuàng)建文件 searchbird/src/main/scala/com/twitter/searchbird/Index.scala
):
…/Index.scala
package com.twitter.searchbird
import scala.collection.mutable
import com.twitter.util._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec
trait Index {
def get(key: String): Future[String]
def put(key: String, value: String): Future[Unit]
def search(key: String): Future[List[String]]
}
class ResidentIndex extends Index {
val log = Logger.get(getClass)
val forward = new mutable.HashMap[String, String]
with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
with mutable.SynchronizedMap[String, Set[String]]
def get(key: String) = {
forward.get(key) match {
case None =>
log.debug("get %s: miss", key)
Future.exception(SearchbirdException("No such key"))
case Some(value) =>
log.debug("get %s: hit", key)
Future(value)
}
}
def put(key: String, value: String) = {
log.debug("put %s", key)
forward(key) = value
// admit only one updater.
synchronized {
(Set() ++ value.split(" ")) foreach { token =>
val current = reverse.get(token) getOrElse Set()
reverse(token) = current + key
}
}
Future.Unit
}
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
intersected.toList
}
}
現(xiàn)在,我們把 thrift 服務(wù)轉(zhuǎn)換成一個(gè)簡單的調(diào)度機(jī)制:為每一個(gè)索引實(shí)例提供一個(gè) thrift 接口。這是一個(gè)強(qiáng)大的抽象,因?yàn)樗蛛x了索引實(shí)現(xiàn)和服務(wù)實(shí)現(xiàn)。服務(wù)不再知道索引的任何細(xì)節(jié);索引可以是本地的或遠(yuǎn)程的,甚至可能是許多索引的組合,但服務(wù)并不關(guān)心,索引實(shí)現(xiàn)可能會(huì)更改但是不用修改服務(wù)。
將 SearchbirdServiceImpl 類定義更換為以下(簡單得多)的代碼(其中不再包含索引實(shí)現(xiàn)細(xì)節(jié))。注意初始化服務(wù)器現(xiàn)在需要第二個(gè)參數(shù) Index 。
…/SearchbirdServiceImpl.scala
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer {
val serverName = "Searchbird"
val thriftPort = config.thriftPort
def get(key: String) = index.get(key)
def put(key: String, value: String) =
index.put(key, value) flatMap { _ => Future.Unit }
def search(query: String) = index.search(query)
def shutdown() = {
super.shutdown(0.seconds)
}
}
…/config/SearchbirdServiceConfig.scala
相應(yīng)地更新 SearchbirdServiceConfig 的 apply 調(diào)用:
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var tracerFactory: Tracer.Factory = NullTracer.factory
def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}
我們將建立一個(gè)簡單的分布式系統(tǒng),一個(gè)主節(jié)點(diǎn)組織查詢其子節(jié)點(diǎn)。為了實(shí)現(xiàn)這一目標(biāo),我們將需要兩個(gè)新的 Index 類型。一個(gè)代表遠(yuǎn)程索引,另一種是其他多個(gè) Index 實(shí)例的組合索引。這樣我們的服務(wù)就可以實(shí)例化多個(gè)遠(yuǎn)程索引的復(fù)合索引來構(gòu)建分布式索引。請注意這兩個(gè) Index 類型具有相同的接口,所以服務(wù)器不需要知道它們所連接的索引是遠(yuǎn)程的還是復(fù)合的。
…/Index.scala
在 Index.scala 中定義了 CompositeIndex :
class CompositeIndex(indices: Seq[Index]) extends Index {
require(!indices.isEmpty)
def get(key: String) = {
val queries = indices.map { idx =>
idx.get(key) map { r => Some(r) } handle { case e => None }
}
Future.collect(queries) flatMap { results =>
results.find { _.isDefined } map { _.get } match {
case Some(v) => Future.value(v)
case None => Future.exception(SearchbirdException("No such key"))
}
}
}
def put(key: String, value: String) =
Future.exception(SearchbirdException("put() not supported by CompositeIndex"))
def search(query: String) = {
val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } }
Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
}
}
組合索引構(gòu)建在一組相關(guān) Index 實(shí)例的基礎(chǔ)上。注意它并不關(guān)心這些實(shí)例實(shí)際上是如何實(shí)現(xiàn)的。這種組合類型在構(gòu)建不同查詢機(jī)制的時(shí)候具有極大的靈活性。我們沒有定義拆分機(jī)制,所以復(fù)合索引不支持 put 操作。這些請求被直接交由子節(jié)點(diǎn)處理。 get 的實(shí)現(xiàn)是查詢所有子節(jié)點(diǎn),并提取第一個(gè)成功的結(jié)果。如果沒有成功結(jié)果的話,則拋出一個(gè)異常。注意因?yàn)闆]有結(jié)果是通過拋出一個(gè)異常表示的,所以我們 處理Future ,是將任何異常轉(zhuǎn)換成 None 。在實(shí)際系統(tǒng)中,我們很可能會(huì)為遺漏值填入適當(dāng)?shù)腻e(cuò)誤碼,而不是使用異常。異常在構(gòu)建原型時(shí)是方便和適宜的,但不能很好地組合。為了把真正的例外和遺漏值區(qū)分開,必須要檢查異常本身。相反,把這種區(qū)別直接嵌入在返回值的類型中是更好的風(fēng)格。
search 像以前一樣工作。和提取第一個(gè)結(jié)果不同,我們把它們組合起來,通過使用 Set 確保其唯一性。
RemoteIndex 提供了到遠(yuǎn)程服務(wù)器的一個(gè) Index 接口。
class RemoteIndex(hosts: String) extends Index {
val transport = ClientBuilder()
.name("remoteIndex")
.hosts(hosts)
.codec(ThriftClientFramedCodec())
.hostConnectionLimit(1)
.timeout(500.milliseconds)
.build()
val client = new SearchbirdService.FinagledClient(transport)
def get(key: String) = client.get(key)
def put(key: String, value: String) = client.put(key, value) map { _ => () }
def search(query: String) = client.search(query) map { _.toList }
}
這樣就使用一些合理的默認(rèn)值,調(diào)用代理,稍微調(diào)整類型,就構(gòu)造出一個(gè) finagle thrift 客戶端。
現(xiàn)在我們擁有了需要的所有功能。我們需要調(diào)整配置,以便能夠調(diào)用一個(gè)給定的節(jié)點(diǎn),不管是主節(jié)點(diǎn)亦或是數(shù)據(jù)分片節(jié)點(diǎn)。為了做到這一點(diǎn),我們將通過創(chuàng)建一個(gè)新的配置項(xiàng)來在系統(tǒng)中枚舉分片。我們還需要添加 Index 參數(shù)到我們的 SearchbirdServiceImpl 實(shí)例。然后,我們將使用命令行參數(shù)(還記得 Config 是如何做到的嗎)在這兩種模式中啟動(dòng)服務(wù)器。
…/config/SearchbirdServiceConfig.scala
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var shards: Seq[String] = Seq()
def apply(runtime: RuntimeEnvironment) = {
val index = runtime.arguments.get("shard") match {
case Some(arg) =>
val which = arg.toInt
if (which >= shards.size || which < 0)
throw new Exception("invalid shard number %d".format(which))
// override with the shard port
val Array(_, port) = shards(which).split(":")
thriftPort = port.toInt
new ResidentIndex
case None =>
require(!shards.isEmpty)
val remotes = shards map { new RemoteIndex(_) }
new CompositeIndex(remotes)
}
new SearchbirdServiceImpl(this, index)
}
}
現(xiàn)在,我們將調(diào)整配置:添加“分片”初始化到 SearchbirdServiceConfig 的初始化中(我們可以通過端口 9000 訪問分片 0,9001 訪問分片 1,依次類推)。
config/development.scala
new SearchbirdServiceConfig {
// Add your own config here
shards = Seq(
"localhost:9000",
"localhost:9001",
"localhost:9002"
)
...
注釋掉 admin.httpPort 的設(shè)置(我們不希望在同一臺機(jī)器上運(yùn)行多個(gè)服務(wù),而不注釋的話這些服務(wù)都會(huì)試圖打開相同的端口):
// admin.httpPort = 9900
現(xiàn)在,如果我們不帶任何參數(shù)調(diào)用我們的服務(wù)器程序,它會(huì)啟動(dòng)一個(gè)主節(jié)點(diǎn)來和所有分片通信。如果我們指定一個(gè)分片參數(shù),它會(huì)在指定端口啟動(dòng)一個(gè)分片服務(wù)器。
讓我們試試吧!我們將啟動(dòng) 3 個(gè)服務(wù):2 個(gè)分片和 1 個(gè)主節(jié)點(diǎn)。首先編譯改動(dòng):
$ ./sbt
> compile
...
> exit
然后啟動(dòng)三個(gè)服務(wù):
$ ./sbt 'run -f config/development.scala -D shard=0'
$ ./sbt 'run -f config/development.scala -D shard=1'
$ ./sbt 'run -f config/development.scala'
您可以在 3 個(gè)不同的窗口中分別運(yùn)行,或在同一窗口開始依次逐個(gè)運(yùn)行,等待其啟動(dòng)后,只用 ctrl-z 懸掛這個(gè)命令,并使用 bg 將它放在后臺執(zhí)行。
然后,我們將通過控制臺與它們進(jìn)行互動(dòng)。首先,讓我們填充一些數(shù)據(jù)在兩個(gè)分片節(jié)點(diǎn)。從 searchbird 目錄運(yùn)行:
$ ./console localhost 9000
...
> client.put("fromShardA", "a value from SHARD_A")
> client.put("hello", "world")
$ ./console localhost 9001
...
> client.put("fromShardB", "a value from SHARD_B")
> client.put("hello", "world again")
一旦完成就可以退出這些控制臺會(huì)話。現(xiàn)在通過主節(jié)點(diǎn)查詢我們的數(shù)據(jù)庫(9999 端口):
$ ./console localhost 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999
'client' is bound to your thrift client.
finagle-client> client.get("hello").get()
res0: String = world
finagle-client> client.get("fromShardC").get()
SearchbirdException(No such key)
...
finagle-client> client.get("fromShardA").get()
res2: String = a value from SHARD_A
finagle-client> client.search("hello").get()
res3: Seq[String] = ArrayBuffer()
finagle-client> client.search("world").get()
res4: Seq[String] = ArrayBuffer(hello)
finagle-client> client.search("value").get()
res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)
這個(gè)設(shè)計(jì)有多個(gè)數(shù)據(jù)抽象,允許更加模塊化和可擴(kuò)展的實(shí)現(xiàn):
這個(gè)實(shí)現(xiàn)的可能改進(jìn)將包括:
[1]本地 ./sbt
腳本只是保證該 SBT 版本和我們知道的所有庫是一致的。
[2] 在 target/gen-scala/com/twitter/searchbird/SearchbirdService.scala
。
[3] 更多信息見 Ostrich’s README 。
更多建議: