Spark Streaming減少批數(shù)據(jù)的執(zhí)行時間

2018-11-26 16:37 更新

Spark Streaming減少批數(shù)據(jù)的執(zhí)行時間

在Spark中有幾個優(yōu)化可以減少批處理的時間。這些可以在優(yōu)化指南中作了討論。這節(jié)重點討論幾個重要的。

數(shù)據(jù)接收的并行水平

通過網(wǎng)絡(luò)(如kafka,flume,socket等)接收數(shù)據(jù)需要這些數(shù)據(jù)反序列化并被保存到Spark中。如果數(shù)據(jù)接收成為系統(tǒng)的瓶頸,就要考慮并行地接收數(shù)據(jù)。注意,每個輸入DStream創(chuàng)建一個receiver(運行在worker機器上)接收單個數(shù)據(jù)流。創(chuàng)建多個輸入DStream并配置它們可以從源中接收不同分區(qū)的數(shù)據(jù)流,從而實現(xiàn)多數(shù)據(jù)流接收。例如,接收兩個topic數(shù)據(jù)的單個輸入DStream可以被切分為兩個kafka輸入流,每個接收一個topic。這將在兩個worker上運行兩個receiver,因此允許數(shù)據(jù)并行接收,提高整體的吞吐量。多個DStream可以被合并生成單個DStream,這樣運用在單個輸入DStream的transformation操作可以運用在合并的DStream上。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另外一個需要考慮的參數(shù)是receiver的阻塞時間。對于大部分的receiver,在存入Spark內(nèi)存之前,接收的數(shù)據(jù)都被合并成了一個大數(shù)據(jù)塊。每批數(shù)據(jù)中塊的個數(shù)決定了任務(wù)的個數(shù)。這些任務(wù)是用類似map的transformation操作接收的數(shù)據(jù)。阻塞間隔由配置參數(shù)spark.streaming.blockInterval決定,默認的值是200毫秒。

多輸入流或者多receiver的可選的方法是明確地重新分配輸入數(shù)據(jù)流(利用inputStream.repartition(<number of partitions>)),在進一步操作之前,通過集群的機器數(shù)分配接收的批數(shù)據(jù)。

數(shù)據(jù)處理的并行水平

如果運行在計算stage上的并發(fā)任務(wù)數(shù)不足夠大,就不會充分利用集群的資源。例如,對于分布式reduce操作如reduceByKeyreduceByKeyAndWindow,默認的并發(fā)任務(wù)數(shù)通過配置屬性來確定(configuration.html#spark-properties)spark.default.parallelism。你可以通過參數(shù)(PairDStreamFunctions (api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))傳遞并行度,或者設(shè)置參數(shù)spark.default.parallelism修改默認值。

數(shù)據(jù)序列化

數(shù)據(jù)序列化的總開銷是平常大的,特別是當sub-second級的批數(shù)據(jù)被接收時。下面有兩個相關(guān)點:

  • Spark中RDD數(shù)據(jù)的序列化。關(guān)于數(shù)據(jù)序列化請參照Spark優(yōu)化指南。注意,與Spark不同的是,默認的RDD會被持久化為序列化的字節(jié)數(shù)組,以減少與垃圾回收相關(guān)的暫停。
  • 輸入數(shù)據(jù)的序列化。從外部獲取數(shù)據(jù)存到Spark中,獲取的byte數(shù)據(jù)需要從byte反序列化,然后再按照Spark的序列化格式重新序列化到Spark中。因此,輸入數(shù)據(jù)的反序列化花費可能是一個瓶頸。

任務(wù)的啟動開支

每秒鐘啟動的任務(wù)數(shù)是非常大的(50或者更多)。發(fā)送任務(wù)到slave的花費明顯,這使請求很難獲得亞秒(sub-second)級別的反應。通過下面的改變可以減小開支

  • 任務(wù)序列化。運行kyro序列化任何可以減小任務(wù)的大小,從而減小任務(wù)發(fā)送到slave的時間。
  • 執(zhí)行模式。在Standalone模式下或者粗粒度的Mesos模式下運行Spark可以在比細粒度Mesos模式下運行Spark獲得更短的任務(wù)啟動時間??梢栽?a href="#">在Mesos下運行Spark中獲取更多信息。

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號