Spark提供三個位置用來配置系統(tǒng):
conf/spark-env.sh
腳本設(shè)置每臺機(jī)器的設(shè)置。例如IP地址Spark屬性控制大部分的應(yīng)用程序設(shè)置,并且為每個應(yīng)用程序分別配置它。這些屬性可以直接在SparkConf上配置,然后傳遞給SparkContext
。SparkConf
允許你配置一些通用的屬性(如master URL、應(yīng)用程序明)以及通過set()
方法設(shè)置的任意鍵值對。例如,我們可以用如下方式創(chuàng)建一個擁有兩個線程的應(yīng)用程序。注意,我們用local[2]
運(yùn)行,這意味著兩個線程-表示最小的并行度,它可以幫助我們檢測當(dāng)在分布式環(huán)境下運(yùn)行的時才出現(xiàn)的錯誤。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
注意,我們在本地模式中擁有超過1個線程。和Spark Streaming的情況一樣,我們可能需要一個線程防止任何形式的饑餓問題。
在一些情況下,你可能想在SparkConf
中避免硬編碼確定的配置。例如,你想用不同的master或者不同的內(nèi)存數(shù)運(yùn)行相同的應(yīng)用程序。Spark允許你簡單地創(chuàng)建一個空conf。
val sc = new SparkContext(new SparkConf())
然后你在運(yùn)行時提供值。
./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell和spark-submit
工具支持兩種方式動態(tài)加載配置。第一種方式是命令行選項(xiàng),例如--master
,如上面shell顯示的那樣。spark-submit
可以接受任何Spark屬性,用--conf
標(biāo)記表示。但是那些參與Spark應(yīng)用程序啟動的屬性要用特定的標(biāo)記表示。運(yùn)行./bin/spark-submit --help
將會顯示選項(xiàng)的整個列表。
bin/spark-submit
也會從conf/spark-defaults.conf
中讀取配置選項(xiàng),這個配置文件中,每一行都包含一對以空格分開的鍵和值。例如:
spark.master spark://5.6.7.8:7077
spark.executor.memory 512m
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
任何標(biāo)簽(flags)指定的值或者在配置文件中的值將會傳遞給應(yīng)用程序,并且通過SparkConf
合并這些值。在SparkConf
上設(shè)置的屬性具有最高的優(yōu)先級,其次是傳遞給spark-submit
或者spark-shell
的屬性值,最后是spark-defaults.conf
文件中的屬性值。
在http://<driver>:4040
上的應(yīng)用程序web UI在“Environment”標(biāo)簽中列出了所有的Spark屬性。這對你確保設(shè)置的屬性的正確性是很有用的。注意,只有通過spark-defaults.conf, SparkConf以及命令行直接指定的值才會顯示。對于其它的配置屬性,你可以認(rèn)為程序用到了默認(rèn)的值。
控制內(nèi)部設(shè)置的大部分屬性都有合理的默認(rèn)值,一些最通用的選項(xiàng)設(shè)置如下:
Property Name | Default | Meaning |
---|---|---|
spark.app.name | (none) | 你的應(yīng)用程序的名字。這將在UI和日志數(shù)據(jù)中出現(xiàn) |
spark.master | (none) | 集群管理器連接的地方 |
spark.executor.memory | 512m | 每個executor進(jìn)程使用的內(nèi)存數(shù)。和JVM內(nèi)存串擁有相同的格式(如512m,2g) |
spark.driver.memory | 512m | driver進(jìn)程使用的內(nèi)存數(shù) |
spark.driver.maxResultSize | 1g | 每個Spark action(如collect)所有分區(qū)的序列化結(jié)果的總大小限制。設(shè)置的值應(yīng)該不小于1m,0代表沒有限制。如果總大小超過這個限制,工作將會終止。大的限制值可能導(dǎo)致driver出現(xiàn)內(nèi)存溢出錯誤(依賴于spark.driver.memory和JVM中對象的內(nèi)存消耗)。設(shè)置合理的限制,可以避免出現(xiàn)內(nèi)存溢出錯誤。 |
spark.serializer | org.apache.spark.serializer.JavaSerializer | 序列化對象使用的類。默認(rèn)的java序列化類可以序列化任何可序列化的java對象但是它很慢。所有我們建議用org.apache.spark.serializer.KryoSerializer |
spark.kryo.classesToRegister | (none) | 如果你用Kryo序列化,給定的用逗號分隔的自定義類名列表表示要注冊的類 |
spark.kryo.registrator | (none) | 如果你用Kryo序列化,設(shè)置這個類去注冊你的自定義類。如果你需要用自定義的方式注冊你的類,那么這個屬性是有用的。否則spark.kryo.classesToRegister 會更簡單。它應(yīng)該設(shè)置一個繼承自KryoRegistrator的類 |
spark.local.dir | /tmp | Spark中暫存空間的使用目錄。在Spark1.0以及更高的版本中,這個屬性被SPARK_LOCAL_DIRS(Standalone, Mesos)和LOCAL_DIRS(YARN)環(huán)境變量覆蓋。 |
spark.logConf | false | 當(dāng)SparkContext啟動時,將有效的SparkConf記錄為INFO。 |
Property Name | Default | Meaning |
---|---|---|
spark.executor.extraJavaOptions | (none) | 傳遞給executors的JVM選項(xiàng)字符串。例如GC設(shè)置或者其它日志設(shè)置。注意,在這個選項(xiàng)中設(shè)置Spark屬性或者堆大小是不合法的。Spark屬性需要用SparkConf對象或者spark-submit 腳本用到的spark-defaults.conf 文件設(shè)置。堆內(nèi)存可以通過spark.executor.memory 設(shè)置 |
spark.executor.extraClassPath | (none) | 附加到executors的classpath的額外的classpath實(shí)體。這個設(shè)置存在的主要目的是Spark與舊版本的向后兼容問題。用戶一般不用設(shè)置這個選項(xiàng) |
spark.executor.extraLibraryPath | (none) | 指定啟動executor的JVM時用到的庫路徑 |
spark.executor.logs.rolling.strategy | (none) | 設(shè)置executor日志的滾動(rolling)策略。默認(rèn)情況下沒有開啟??梢耘渲脼?code>time(基于時間的滾動)和size (基于大小的滾動)。對于time ,用spark.executor.logs.rolling.time.interval 設(shè)置滾動間隔;對于size ,用spark.executor.logs.rolling.size.maxBytes 設(shè)置最大的滾動大小 |
spark.executor.logs.rolling.time.interval | daily | executor日志滾動的時間間隔。默認(rèn)情況下沒有開啟。合法的值是daily , hourly , minutely 以及任意的秒。 |
spark.executor.logs.rolling.size.maxBytes | (none) | executor日志的最大滾動大小。默認(rèn)情況下沒有開啟。值設(shè)置為字節(jié) |
spark.executor.logs.rolling.maxRetainedFiles | (none) | 設(shè)置被系統(tǒng)保留的最近滾動日志文件的數(shù)量。更老的日志文件將被刪除。默認(rèn)沒有開啟。 |
spark.files.userClassPathFirst | false | (實(shí)驗(yàn)性)當(dāng)在Executors中加載類時,是否用戶添加的jar比Spark自己的jar優(yōu)先級高。這個屬性可以降低Spark依賴和用戶依賴的沖突。它現(xiàn)在還是一個實(shí)驗(yàn)性的特征。 |
spark.python.worker.memory | 512m | 在聚合期間,每個python worker進(jìn)程使用的內(nèi)存數(shù)。在聚合期間,如果內(nèi)存超過了這個限制,它將會將數(shù)據(jù)塞進(jìn)磁盤中 |
spark.python.profile | false | 在Python worker中開啟profiling。通過sc.show_profiles() 展示分析結(jié)果。或者在driver退出前展示分析結(jié)果??梢酝ㄟ^sc.dump_profiles(path) 將結(jié)果dump到磁盤中。如果一些分析結(jié)果已經(jīng)手動展示,那么在driver退出前,它們再不會自動展示 |
spark.python.profile.dump | (none) | driver退出前保存分析結(jié)果的dump文件的目錄。每個RDD都會分別dump一個文件??梢酝ㄟ^ptats.Stats() 加載這些文件。如果指定了這個屬性,分析結(jié)果不會自動展示 |
spark.python.worker.reuse | true | 是否重用python worker。如果是,它將使用固定數(shù)量的Python workers,而不需要為每個任務(wù)fork()一個Python進(jìn)程。如果有一個非常大的廣播,這個設(shè)置將非常有用。因?yàn)椋瑥V播不需要為每個任務(wù)從JVM到Python worker傳遞一次 |
spark.executorEnv.[EnvironmentVariableName] | (none) | 通過EnvironmentVariableName 添加指定的環(huán)境變量到executor進(jìn)程。用戶可以指定多個EnvironmentVariableName ,設(shè)置多個環(huán)境變量 |
spark.mesos.executor.home | driver side SPARK_HOME | 設(shè)置安裝在Mesos的executor上的Spark的目錄。默認(rèn)情況下,executors將使用driver的Spark本地(home)目錄,這個目錄對它們不可見。注意,如果沒有通過spark.executor.uri 指定Spark的二進(jìn)制包,這個設(shè)置才起作用 |
spark.mesos.executor.memoryOverhead | executor memory * 0.07, 最小384m | 這個值是spark.executor.memory 的補(bǔ)充。它用來計算mesos任務(wù)的總內(nèi)存。另外,有一個7%的硬編碼設(shè)置。最后的值將選擇spark.mesos.executor.memoryOverhead 或者spark.executor.memory 的7%二者之間的大者 |
Property Name | Default | Meaning |
---|---|---|
spark.shuffle.consolidateFiles | false | 如果設(shè)置為"true",在shuffle期間,合并的中間文件將會被創(chuàng)建。創(chuàng)建更少的文件可以提供文件系統(tǒng)的shuffle的效率。這些shuffle都伴隨著大量遞歸任務(wù)。當(dāng)用ext4和dfs文件系統(tǒng)時,推薦設(shè)置為"true"。在ext3中,因?yàn)槲募到y(tǒng)的限制,這個選項(xiàng)可能機(jī)器(大于8核)降低效率 |
spark.shuffle.spill | true | 如果設(shè)置為"true",通過將多出的數(shù)據(jù)寫入磁盤來限制內(nèi)存數(shù)。通過spark.shuffle.memoryFraction 來指定spilling的閾值 |
spark.shuffle.spill.compress | true | 在shuffle時,是否將spilling的數(shù)據(jù)壓縮。壓縮算法通過spark.io.compression.codec 指定。 |
spark.shuffle.memoryFraction | 0.2 | 如果spark.shuffle.spill 為“true”,shuffle中聚合和合并組操作使用的java堆內(nèi)存占總內(nèi)存的比重。在任何時候,shuffles使用的所有內(nèi)存內(nèi)maps的集合大小都受這個限制的約束。超過這個限制,spilling數(shù)據(jù)將會保存到磁盤上。如果spilling太過頻繁,考慮增大這個值 |
spark.shuffle.compress | true | 是否壓縮map操作的輸出文件。一般情況下,這是一個好的選擇。 |
spark.shuffle.file.buffer.kb | 32 | 每個shuffle文件輸出流內(nèi)存內(nèi)緩存的大小,單位是kb。這個緩存減少了創(chuàng)建只中間shuffle文件中磁盤搜索和系統(tǒng)訪問的數(shù)量 |
spark.reducer.maxMbInFlight | 48 | 從遞歸任務(wù)中同時獲取的map輸出數(shù)據(jù)的最大大小(mb)。因?yàn)槊恳粋€輸出都需要我們創(chuàng)建一個緩存用來接收,這個設(shè)置代表每個任務(wù)固定的內(nèi)存上限,所以除非你有更大的內(nèi)存,將其設(shè)置小一點(diǎn) |
spark.shuffle.manager | sort | 它的實(shí)現(xiàn)用于shuffle數(shù)據(jù)。有兩種可用的實(shí)現(xiàn):sort 和hash 。基于sort的shuffle有更高的內(nèi)存使用率 |
spark.shuffle.sort.bypassMergeThreshold | 200 | (Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions |
spark.shuffle.blockTransferService | netty | 實(shí)現(xiàn)用來在executor直接傳遞shuffle和緩存塊。有兩種可用的實(shí)現(xiàn):netty 和nio ?;趎etty的塊傳遞在具有相同的效率情況下更簡單 |
Property Name | Default | Meaning |
---|---|---|
spark.ui.port | 4040 | 你的應(yīng)用程序dashboard的端口。顯示內(nèi)存和工作量數(shù)據(jù) |
spark.ui.retainedStages | 1000 | 在垃圾回收之前,Spark UI和狀態(tài)API記住的stage數(shù) |
spark.ui.retainedJobs | 1000 | 在垃圾回收之前,Spark UI和狀態(tài)API記住的job數(shù) |
spark.ui.killEnabled | true | 運(yùn)行在web UI中殺死stage和相應(yīng)的job |
spark.eventLog.enabled | false | 是否記錄Spark的事件日志。這在應(yīng)用程序完成后,重新構(gòu)造web UI是有用的 |
spark.eventLog.compress | false | 是否壓縮事件日志。需要spark.eventLog.enabled 為true |
spark.eventLog.dir | file:///tmp/spark-events | Spark事件日志記錄的基本目錄。在這個基本目錄下,Spark為每個應(yīng)用程序創(chuàng)建一個子目錄。各個應(yīng)用程序記錄日志到直到的目錄。用戶可能想設(shè)置這為統(tǒng)一的地點(diǎn),像HDFS一樣,所以歷史文件可以通過歷史服務(wù)器讀取 |
Property Name | Default | Meaning |
---|---|---|
spark.broadcast.compress | true | 在發(fā)送廣播變量之前是否壓縮它 |
spark.rdd.compress | true | 是否壓縮序列化的RDD分區(qū)。在花費(fèi)一些額外的CPU時間的同時節(jié)省大量的空間 |
spark.io.compression.codec | snappy | 壓縮諸如RDD分區(qū)、廣播變量、shuffle輸出等內(nèi)部數(shù)據(jù)的編碼解碼器。默認(rèn)情況下,Spark提供了三種選擇:lz4, lzf和snappy。你也可以用完整的類名來制定。org.apache.spark.io.LZ4CompressionCodec ,org.apache.spark.io.LZFCompressionCodec ,org.apache.spark.io.SnappyCompressionCodec |
spark.io.compression.snappy.block.size | 32768 | Snappy壓縮中用到的塊大小。降低這個塊的大小也會降低shuffle內(nèi)存使用率 |
spark.io.compression.lz4.block.size | 32768 | LZ4壓縮中用到的塊大小。降低這個塊的大小也會降低shuffle內(nèi)存使用率 |
spark.closure.serializer | org.apache.spark.serializer.JavaSerializer | 閉包用到的序列化類。目前只支持java序列化器 |
spark.serializer.objectStreamReset | 100 | 當(dāng)用org.apache.spark.serializer.JavaSerializer 序列化時,序列化器通過緩存對象防止寫多余的數(shù)據(jù),然而這會造成這些對象的垃圾回收停止。通過請求'reset',你從序列化器中flush這些信息并允許收集老的數(shù)據(jù)。為了關(guān)閉這個周期性的reset,你可以將值設(shè)為-1。默認(rèn)情況下,每一百個對象reset一次 |
spark.kryo.referenceTracking | true | 當(dāng)用Kryo序列化時,跟蹤是否引用同一對象。如果你的對象圖有環(huán),這是必須的設(shè)置。如果他們包含相同對象的多個副本,這個設(shè)置對效率是有用的。如果你知道不在這兩個場景,那么可以禁用它以提高效率 |
spark.kryo.registrationRequired | false | 是否需要注冊為Kyro可用。如果設(shè)置為true,然后如果一個沒有注冊的類序列化,Kyro會拋出異常。如果設(shè)置為false,Kryo將會同時寫每個對象和其非注冊類名。寫類名可能造成顯著地性能瓶頸。 |
spark.kryoserializer.buffer.mb | 0.064 | Kyro序列化緩存的大小。這樣worker上的每個核都有一個緩存。如果有需要,緩存會漲到spark.kryoserializer.buffer.max.mb 設(shè)置的值那么大。 |
spark.kryoserializer.buffer.max.mb | 64 | Kryo序列化緩存允許的最大值。這個值必須大于你嘗試序列化的對象 |
Property Name | Default | Meaning |
---|---|---|
spark.driver.host | (local hostname) | driver監(jiān)聽的主機(jī)名或者IP地址。這用于和executors以及獨(dú)立的master通信 |
spark.driver.port | (random) | driver監(jiān)聽的接口。這用于和executors以及獨(dú)立的master通信 |
spark.fileserver.port | (random) | driver的文件服務(wù)器監(jiān)聽的端口 |
spark.broadcast.port | (random) | driver的HTTP廣播服務(wù)器監(jiān)聽的端口 |
spark.replClassServer.port | (random) | driver的HTTP類服務(wù)器監(jiān)聽的端口 |
spark.blockManager.port | (random) | 塊管理器監(jiān)聽的端口。這些同時存在于driver和executors |
spark.executor.port | (random) | executor監(jiān)聽的端口。用于與driver通信 |
spark.port.maxRetries | 16 | 當(dāng)綁定到一個端口,在放棄前重試的最大次數(shù) |
spark.akka.frameSize | 10 | 在"control plane"通信中允許的最大消息大小。如果你的任務(wù)需要發(fā)送大的結(jié)果到driver中,調(diào)大這個值 |
spark.akka.threads | 4 | 通信的actor線程數(shù)。當(dāng)driver有很多CPU核時,調(diào)大它是有用的 |
spark.akka.timeout | 100 | Spark節(jié)點(diǎn)之間的通信超時。單位是s |
spark.akka.heartbeat.pauses | 6000 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart beat pause in seconds for akka. This can be used to control sensitivity to gc pauses. Tune this in combination of spark.akka.heartbeat.interval and spark.akka.failure-detector.threshold if you need to. |
spark.akka.failure-detector.threshold | 300.0 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). This maps to akka's akka.remote.transport-failure-detector.threshold . Tune this in combination of spark.akka.heartbeat.pauses and spark.akka.heartbeat.interval if you need to. |
spark.akka.heartbeat.interval | 1000 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of spark.akka.heartbeat.pauses and spark.akka.failure-detector.threshold if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real Spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. |
Property Name | Default | Meaning |
---|---|---|
spark.authenticate | false | 是否Spark驗(yàn)證其內(nèi)部連接。如果不是運(yùn)行在YARN上,請看spark.authenticate.secret |
spark.authenticate.secret | None | 設(shè)置Spark兩個組件之間的密匙驗(yàn)證。如果不是運(yùn)行在YARN上,但是需要驗(yàn)證,這個選項(xiàng)必須設(shè)置 |
spark.core.connection.auth.wait.timeout | 30 | 連接時等待驗(yàn)證的實(shí)際。單位為秒 |
spark.core.connection.ack.wait.timeout | 60 | 連接等待回答的時間。單位為秒。為了避免不希望的超時,你可以設(shè)置更大的值 |
spark.ui.filters | None | 應(yīng)用到Spark web UI的用于過濾類名的逗號分隔的列表。過濾器必須是標(biāo)準(zhǔn)的javax servlet Filter。通過設(shè)置java系統(tǒng)屬性也可以指定每個過濾器的參數(shù)。spark.<class name of filter>.params='param1=value1,param2=value2' 。例如-Dspark.ui.filters=com.test.filter1 、-Dspark.com.test.filter1.params='param1=foo,param2=testing' |
spark.acls.enable | false | 是否開啟Spark acls。如果開啟了,它檢查用戶是否有權(quán)限去查看或修改job。 Note this requires the user to be known, so if the user comes across as null no checks are done。UI利用使用過濾器驗(yàn)證和設(shè)置用戶 |
spark.ui.view.acls | empty | 逗號分隔的用戶列表,列表中的用戶有查看(view)Spark web UI的權(quán)限。默認(rèn)情況下,只有啟動Spark job的用戶有查看權(quán)限 |
spark.modify.acls | empty | 逗號分隔的用戶列表,列表中的用戶有修改Spark job的權(quán)限。默認(rèn)情況下,只有啟動Spark job的用戶有修改權(quán)限 |
spark.admin.acls | empty | 逗號分隔的用戶或者管理員列表,列表中的用戶或管理員有查看和修改所有Spark job的權(quán)限。如果你運(yùn)行在一個共享集群,有一組管理員或開發(fā)者幫助debug,這個選項(xiàng)有用 |
Property Name | Default | Meaning |
---|---|---|
spark.streaming.blockInterval | 200 | 在這個時間間隔(ms)內(nèi),通過Spark Streaming receivers接收的數(shù)據(jù)在保存到Spark之前,chunk為數(shù)據(jù)塊。推薦的最小值為50ms |
spark.streaming.receiver.maxRate | infinite | 每秒鐘每個receiver將接收的數(shù)據(jù)的最大記錄數(shù)。有效的情況下,每個流將消耗至少這個數(shù)目的記錄。設(shè)置這個配置為0或者-1將會不作限制 |
spark.streaming.receiver.writeAheadLogs.enable | false | Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures |
spark.streaming.unpersist | true | 強(qiáng)制通過Spark Streaming生成并持久化的RDD自動從Spark內(nèi)存中非持久化。通過Spark Streaming接收的原始輸入數(shù)據(jù)也將清除。設(shè)置這個屬性為false允許流應(yīng)用程序訪問原始數(shù)據(jù)和持久化RDD,因?yàn)樗鼈儧]有被自動清除。但是它會造成更高的內(nèi)存花費(fèi) |
通過環(huán)境變量配置確定的Spark設(shè)置。環(huán)境變量從Spark安裝目錄下的conf/spark-env.sh
腳本讀?。ɑ蛘遷indows的conf/spark-env.cmd
)。在獨(dú)立的或者M(jìn)esos模式下,這個文件可以給機(jī)器確定的信息,如主機(jī)名。當(dāng)運(yùn)行本地應(yīng)用程序或者提交腳本時,它也起作用。
注意,當(dāng)Spark安裝時,conf/spark-env.sh
默認(rèn)是不存在的。你可以復(fù)制conf/spark-env.sh.template
創(chuàng)建它。
可以在spark-env.sh
中設(shè)置如下變量:
Environment Variable | Meaning |
---|---|
JAVA_HOME | java安裝的路徑 |
PYSPARK_PYTHON | PySpark用到的Python二進(jìn)制執(zhí)行文件路徑 |
SPARK_LOCAL_IP | 機(jī)器綁定的IP地址 |
SPARK_PUBLIC_DNS | 你Spark應(yīng)用程序通知給其他機(jī)器的主機(jī)名 |
除了以上這些,Spark standalone cluster scripts也可以設(shè)置一些選項(xiàng)。例如每臺機(jī)器使用的核數(shù)以及最大內(nèi)存。
因?yàn)?code>spark-env.sh是shell腳本,其中的一些可以以編程方式設(shè)置。例如,你可以通過特定的網(wǎng)絡(luò)接口計算SPARK_LOCAL_IP
。
Spark用log4j logging。你可以通過在conf目錄下添加log4j.properties
文件來配置。一種方法是復(fù)制log4j.properties.template
文件。
更多建議: