初始化StreamingContext

2018-02-24 15:57 更新

初始化StreamingContext

為了初始化Spark Streaming程序,一個StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。一個StreamingContext對象可以用SparkConf對象創(chuàng)建。

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName表示你的應用程序顯示在集群UI上的名字,master是一個Spark、Mesos、YARN集群URL或者一個特殊字符串“l(fā)ocal[*]”,它表示程序用本地模式運行。當程序運行在集群中時,你并不希望在程序中硬編碼master,而是希望用spark-submit啟動應用程序,并從spark-submit中得到master的值。對于本地測試或者單元測試,你可以傳遞“l(fā)ocal”字符串在同一個進程內運行Spark Streaming。需要注意的是,它在內部創(chuàng)建了一個SparkContext對象,你可以通過ssc.sparkContext訪問這個SparkContext對象。

批時間片需要根據(jù)你的程序的潛在需求以及集群的可用資源來設定,你可以在性能調優(yōu)那一節(jié)獲取詳細的信息。

可以利用已經(jīng)存在的SparkContext對象創(chuàng)建StreamingContext對象。

import org.apache.spark.streaming._
val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

當一個上下文(context)定義之后,你必須按照以下幾步進行操作

  • 定義輸入源;
  • 準備好流計算指令;
  • 利用streamingContext.start()方法接收和處理數(shù)據(jù);
  • 處理過程將一直持續(xù),直到streamingContext.stop()方法被調用。

幾點需要注意的地方:

  • 一旦一個context已經(jīng)啟動,就不能有新的流算子建立或者是添加到context中。
  • 一旦一個context已經(jīng)停止,它就不能再重新啟動
  • 在JVM中,同一時間只能有一個StreamingContext處于活躍狀態(tài)
  • 在StreamingContext上調用stop()方法,也會關閉SparkContext對象。如果只想僅關閉StreamingContext對象,設置stop()的可選參數(shù)為false
  • 一個SparkContext對象可以重復利用去創(chuàng)建多個StreamingContext對象,前提條件是前面的StreamingContext在后面StreamingContext創(chuàng)建之前關閉(不關閉SparkContext)。
以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號