Spark supports two methods for converting existing RDDs into SchemaRDDs. The first method uses reflection to infer the schema of an RDD that contains a specific type of object. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating SchemaRDDs is through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct SchemaRDDs when the columns and their types are not known until runtime.
The Scala interface of Spark SQL supports automatically converting an RDD that contains case classes into a SchemaRDD. The case class defines the schema of the table.
The names of the case class's parameters are read via reflection and become the names of the columns. Case classes can be nested or contain complex types such as Sequences or Arrays. The RDD can be implicitly converted to a SchemaRDD and then registered as a table, which can be used in subsequent SQL statements.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
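The parsing and filtering logic above can be sketched without a Spark cluster, using plain Scala collections in place of RDDs. The in-memory lines below are an assumption standing in for the contents of examples/src/main/resources/people.txt; the filter mirrors the WHERE clause of the SQL query:

```scala
// Simulate the map(_.split(",")).map(...) pipeline on plain Scala collections.
case class Person(name: String, age: Int)

object ParseDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the lines of people.txt.
    val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
    // Split each line on the comma and build a Person, trimming the age field.
    val people = lines.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    // Same predicate as "WHERE age >= 13 AND age <= 19".
    val teenagers = people
      .filter(p => p.age >= 13 && p.age <= 19)
      .map(p => "Name: " + p.name)
    teenagers.foreach(println)
  }
}
```

With the sample lines above, only the last record satisfies the predicate, so the loop prints a single name.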
When case classes cannot be defined ahead of time (for example, when the structure of records is encoded in a string, or when a text dataset will be parsed and fields will be projected differently for different users), a SchemaRDD can be created programmatically in three steps:
1. Create an RDD of Rows from the original RDD.
2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
3. Apply the schema to the RDD of Rows via the applySchema method.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)
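The schema-from-string step (step 2 above) reduces to plain string handling: each whitespace-separated token in schemaString becomes one nullable string column. A minimal sketch without Spark, where a (name, type, nullable) tuple is a hypothetical stand-in for Spark's StructField:

```scala
object SchemaDemo {
  def main(args: Array[String]): Unit = {
    // The schema is encoded in a string, as in the example above.
    val schemaString = "name age"
    // Each token becomes one nullable column; the tuple stands in for
    // StructField(fieldName, StringType, true).
    val fields = schemaString.split(" ").map(fieldName => (fieldName, "StringType", true))
    fields.foreach { case (name, tpe, nullable) =>
      println(s"$name: $tpe (nullable = $nullable)")
    }
  }
}
```

This makes it clear why the approach works for schemas unknown until runtime: the field list is just data, so it can come from a config file, a header row, or user input.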