W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
在Spark中有幾個優(yōu)化可以減少批處理的時間。這些可以在優(yōu)化指南中作了討論。這節(jié)重點討論幾個重要的。
通過網(wǎng)絡(luò)(如kafka,flume,socket等)接收數(shù)據(jù)需要這些數(shù)據(jù)反序列化并被保存到Spark中。如果數(shù)據(jù)接收成為系統(tǒng)的瓶頸,就要考慮并行地接收數(shù)據(jù)。注意,每個輸入DStream創(chuàng)建一個receiver
(運行在worker機器上)接收單個數(shù)據(jù)流。創(chuàng)建多個輸入DStream并配置它們可以從源中接收不同分區(qū)的數(shù)據(jù)流,從而實現(xiàn)多數(shù)據(jù)流接收。例如,接收兩個topic數(shù)據(jù)的單個輸入DStream可以被切分為兩個kafka輸入流,每個接收一個topic。這將在兩個worker上運行兩個receiver
,因此允許數(shù)據(jù)并行接收,提高整體的吞吐量。多個DStream可以被合并生成單個DStream,這樣運用在單個輸入DStream的transformation操作可以運用在合并的DStream上。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另外一個需要考慮的參數(shù)是receiver
的阻塞時間。對于大部分的receiver
,在存入Spark內(nèi)存之前,接收的數(shù)據(jù)都被合并成了一個大數(shù)據(jù)塊。每批數(shù)據(jù)中塊的個數(shù)決定了任務(wù)的個數(shù)。這些任務(wù)是用類似map的transformation操作接收的數(shù)據(jù)。阻塞間隔由配置參數(shù)spark.streaming.blockInterval
決定,默認的值是200毫秒。
多輸入流或者多receiver
的可選的方法是明確地重新分配輸入數(shù)據(jù)流(利用inputStream.repartition(<number of partitions>)
),在進一步操作之前,通過集群的機器數(shù)分配接收的批數(shù)據(jù)。
如果運行在計算stage上的并發(fā)任務(wù)數(shù)不足夠大,就不會充分利用集群的資源。例如,對于分布式reduce操作如reduceByKey
和reduceByKeyAndWindow
,默認的并發(fā)任務(wù)數(shù)通過配置屬性來確定(configuration.html#spark-properties)spark.default.parallelism
。你可以通過參數(shù)(PairDStreamFunctions
(api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))傳遞并行度,或者設(shè)置參數(shù)spark.default.parallelism
修改默認值。
數(shù)據(jù)序列化的總開銷是平常大的,特別是當sub-second級的批數(shù)據(jù)被接收時。下面有兩個相關(guān)點:
每秒鐘啟動的任務(wù)數(shù)是非常大的(50或者更多)。發(fā)送任務(wù)到slave的花費明顯,這使請求很難獲得亞秒(sub-second)級別的反應。通過下面的改變可以減小開支
These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: