Spark Shell

2018-02-24 15:57 更新

使用 Spark Shell

基礎(chǔ)

Spark 的 shell 作為一個(gè)強(qiáng)大的交互式數(shù)據(jù)分析工具,提供了一個(gè)簡(jiǎn)單的方式來(lái)學(xué)習(xí) API。它可以使用 Scala(在 Java 虛擬機(jī)上運(yùn)行現(xiàn)有的 Java 庫(kù)的一個(gè)很好方式) 或 Python。在 Spark 目錄里使用下面的方式開(kāi)始運(yùn)行:

./bin/spark-shell

Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的彈性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)創(chuàng)建,也可以從其他的 RDDs 轉(zhuǎn)換。讓我們?cè)?Spark 源代碼目錄從 README 文本文件中創(chuàng)建一個(gè)新的 RDD。

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDD 的 actions 從 RDD 中返回值,transformations 可以轉(zhuǎn)換成一個(gè)新 RDD 并返回它的引用。讓我們開(kāi)始使用幾個(gè)操作:

scala> textFile.count() // RDD 的數(shù)據(jù)條數(shù)
res0: Long = 126

scala> textFile.first() // RDD 的第一行數(shù)據(jù)
res1: String = # Apache Spark

現(xiàn)在讓我們使用一個(gè) transformation,我們將使用 filter 在這個(gè)文件里返回一個(gè)包含子數(shù)據(jù)集的新 RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我們可以把 actions 和 transformations 鏈接在一起:

scala> textFile.filter(line => line.contains("Spark")).count() // 有多少行包括 "Spark"?
res3: Long = 15

更多 RDD 操作

RDD actions 和 transformations 能被用在更多的復(fù)雜計(jì)算中。比方說(shuō),我們想要找到一行中最多的單詞數(shù)量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先將行映射成一個(gè)整型數(shù)值產(chǎn)生一個(gè)新 RDD。 在這個(gè)新的 RDD 上調(diào)用 reduce 找到行中最大的個(gè)數(shù)。 mapreduce 的參數(shù)是 Scala 的函數(shù)串(閉包),并且可以使用任何語(yǔ)言特性或者 Scala/Java 類(lèi)庫(kù)。例如,我們可以很方便地調(diào)用其他的函數(shù)聲明。 我們使用 Math.max() 函數(shù)讓代碼更容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoop 流行的一個(gè)通用的數(shù)據(jù)流模式是 MapReduce。Spark 能很容易地實(shí)現(xiàn) MapReduce:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

這里,我們結(jié)合 [flatMap](), [map]() 和 [reduceByKey]() 來(lái)計(jì)算文件里每個(gè)單詞出現(xiàn)的數(shù)量,它的結(jié)果是包含一組(String, Int) 鍵值對(duì)的 RDD。我們可以使用 [collect] 操作在我們的 shell 中收集單詞的數(shù)量:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark 支持把數(shù)據(jù)集拉到集群內(nèi)的內(nèi)存緩存中。當(dāng)要重復(fù)訪(fǎng)問(wèn)時(shí)這是非常有用的,例如當(dāng)我們?cè)谝粋€(gè)小的熱(hot)數(shù)據(jù)集中查詢(xún),或者運(yùn)行一個(gè)像網(wǎng)頁(yè)搜索排序這樣的重復(fù)算法。作為一個(gè)簡(jiǎn)單的例子,讓我們把 linesWithSpark 數(shù)據(jù)集標(biāo)記在緩存中:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

緩存 100 行的文本文件來(lái)研究 Spark 這看起來(lái)很傻。真正讓人感興趣的部分是我們可以在非常大型的數(shù)據(jù)集中使用同樣的函數(shù),甚至在 10 個(gè)或者 100 個(gè)節(jié)點(diǎn)中交叉計(jì)算。你同樣可以使用 bin/spark-shell 連接到一個(gè) cluster 來(lái)替換掉編程指南中的方法進(jìn)行交互操作。

以上內(nèi)容是否對(duì)您有幫助:
在線(xiàn)筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)