DStream中的轉(zhuǎn)換

2018-02-24 15:57 更新

DStream中的轉(zhuǎn)換(transformation)

和RDD類似,transformation允許從輸入DStream來的數(shù)據(jù)被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:

Transformation Meaning
map(func) 利用函數(shù)func處理原DStream的每個元素,返回一個新的DStream
flatMap(func) 與map相似,但是每個輸入項可用被映射為0個或者多個輸出項
filter(func) 返回一個新的DStream,它僅僅包含源DStream中滿足函數(shù)func的項
repartition(numPartitions) 通過創(chuàng)建更多或者更少的partition改變這個DStream的并行級別(level of parallelism)
union(otherStream) 返回一個新的DStream,它包含源DStream和otherStream的聯(lián)合元素
count() 通過計算源DStream中每個RDD的元素數(shù)量,返回一個包含單元素(single-element)RDDs的新DStream
reduce(func) 利用函數(shù)func聚集源DStream中每個RDD的元素,返回一個包含單元素(single-element)RDDs的新DStream。函數(shù)應(yīng)該是相關(guān)聯(lián)的,以使計算可以并行化
countByValue() 這個算子應(yīng)用于元素類型為K的DStream上,返回一個(K,long)對的新DStream,每個鍵的值是在原DStream的每個RDD中的頻率。
reduceByKey(func, [numTasks]) 當(dāng)在一個由(K,V)對組成的DStream上調(diào)用這個算子,返回一個新的由(K,V)對組成的DStream,每一個key的值均由給定的reduce函數(shù)聚集起來。注意:在默認(rèn)情況下,這個算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks參數(shù)設(shè)置不同的任務(wù)數(shù)
join(otherStream, [numTasks]) 當(dāng)應(yīng)用于兩個DStream(一個包含(K,V)對,一個包含(K,W)對),返回一個包含(K, (V, W))對的新DStream
cogroup(otherStream, [numTasks]) 當(dāng)應(yīng)用于兩個DStream(一個包含(K,V)對,一個包含(K,W)對),返回一個包含(K, Seq[V], Seq[W])的元組
transform(func) 通過對源DStream的每個RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個新的DStream。這個可以在DStream中的任何RDD操作中使用
updateStateByKey(func) 利用給定的函數(shù)更新DStream的狀態(tài),返回一個新"state"的DStream。

最后兩個transformation算子需要重點介紹一下:

UpdateStateByKey操作

updateStateByKey操作允許不斷用新信息更新它的同時保持任意狀態(tài)。你需要通過兩步來使用它

  • 定義狀態(tài)-狀態(tài)可以是任何的數(shù)據(jù)類型
  • 定義狀態(tài)更新函數(shù)-怎樣利用更新前的狀態(tài)和從輸入流里面獲取的新值更新狀態(tài)

讓我們舉個例子說明。在例子中,你想保持一個文本數(shù)據(jù)流中每個單詞的運行次數(shù),運行次數(shù)用一個state表示,它的類型是整數(shù)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

這個函數(shù)被用到了DStream包含的單詞上

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函數(shù)將會被每個單詞調(diào)用,newValues擁有一系列的1(從 (詞, 1)對而來),runningCount擁有之前的次數(shù)。要看完整的代碼,見例子

Transform操作

transform操作(以及它的變化形式如transformWith)允許在DStream運行任何RDD-to-RDD函數(shù)。它能夠被用來應(yīng)用任何沒在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,連接數(shù)據(jù)流中的每個批(batch)和另外一個數(shù)據(jù)集的功能并沒有在DStream API中提供,然而你可以簡單的利用transform方法做到。如果你想通過連接帶有預(yù)先計算的垃圾郵件信息的輸入數(shù)據(jù)流來清理實時數(shù)據(jù),然后過了它們,你可以按如下方法來做:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

事實上,你也可以在transform方法中用機器學(xué)習(xí)圖計算算法

窗口(window)操作

Spark Streaming也支持窗口計算,它允許你在一個滑動窗口數(shù)據(jù)上應(yīng)用transformation算子。下圖闡明了這個滑動窗口。

滑動窗口

如上圖顯示,窗口在源DStream上滑動,合并和操作落入窗內(nèi)的源RDDs,產(chǎn)生窗口化的DStream的RDDs。在這個具體的例子中,程序在三個時間單元的數(shù)據(jù)上進行窗口操作,并且每兩個時間單元滑動一次。這說明,任何一個窗口操作都需要指定兩個參數(shù):

  • 窗口長度:窗口的持續(xù)時間
  • 滑動的時間間隔:窗口操作執(zhí)行的時間間隔

這兩個參數(shù)必須是源DStream的批時間間隔的倍數(shù)。

下面舉例說明窗口操作。例如,你想擴展前面的例子用來計算過去30秒的詞頻,間隔時間是10秒。為了達到這個目的,我們必須在過去30秒的pairs DStream上應(yīng)用reduceByKey操作。用方法reduceByKeyAndWindow實現(xiàn)。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個參數(shù):窗口長度和滑動的時間間隔

Transformation Meaning
window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)計算一個新的DStream
countByWindow(windowLength, slideInterval) 返回流中元素的一個滑動窗口數(shù)
reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流。利用函數(shù)func聚集滑動時間間隔的流的元素創(chuàng)建這個單元素流。函數(shù)必須是相關(guān)聯(lián)的以使計算能夠正確的并行計算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應(yīng)用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值均由給定的reduce函數(shù)聚集起來。注意:在默認(rèn)情況下,這個算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks參數(shù)設(shè)置不同的任務(wù)數(shù)
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 應(yīng)用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值都是它們在滑動窗口中出現(xiàn)的頻率。
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號