SparkSession - The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is the very first object you have to create while developing Spark SQL applications using the fully-typed Dataset (or untyped Row-based DataFrame) data abstractions.
| Note | As of Spark 2.0, SparkSession has merged SQLContext and HiveContext into a single object. |
|---|---|
You can create a SparkSession instance using the SparkSession.builder method.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
.appName("My Spark Application") // optional and will be autogenerated if not specified
.master("local[*]") // avoid hardcoding the deployment environment
.enableHiveSupport() // self-explanatory, isn't it?
.config("spark.sql.warehouse.dir", "target/spark-warehouse")
.getOrCreate
You can then stop the current SparkSession using the stop method.
spark.stop
You can have as many SparkSessions as you want in a single Spark application. A common use case is to keep relational entities separate per SparkSession (see the [catalog](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-SparkSession.html#catalog) attribute).
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, SparkSession requires a SparkContext and an optional SharedState (which holds the state shared across SparkSession instances).
| Method | Description |
|---|---|
| builder | "Opens" a builder to get or create a SparkSession instance |
| version | Returns the current version of Spark |
| implicits | Use import spark.implicits._ to import the implicit conversions and create Datasets from (almost arbitrary) Scala objects |
| emptyDataset[T] | Creates an empty Dataset[T] |
| range | Creates a Dataset[Long] |
| sql | Executes a SQL query (and returns a DataFrame) |
| udf | Access to user-defined functions (UDFs) |
| table | Creates a DataFrame from a table |
| catalog | Access to the catalog of the entities of structured queries |
| read | Access to DataFrameReader to read a DataFrame from external files and storage systems |
| conf | Access to the current runtime configuration |
| readStream | Access to DataStreamReader to read streaming datasets |
| streams | Access to StreamingQueryManager to manage structured streaming queries |
| newSession | Creates a new SparkSession |
| stop | Stops the SparkSession |
| Tip | Use the spark.sql.warehouse.dir property to change the location of Hive's hive.metastore.warehouse.dir property, i.e. the location of Hive's local/embedded metastore database (Derby). Read SharedState for the details of Spark SQL's support for Apache Hive. See also the official Hive Metastore Administration documentation. |
|---|---|
| Name | Type | Description |
|---|---|---|
| sessionState | SessionState | Internally, sessionState clones the optional parent SessionState (if one was given when the SparkSession was created) or creates a new SessionState using BaseSessionStateBuilder as defined by the spark.sql.catalogImplementation property: in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder, hive for org.apache.spark.sql.hive.HiveSessionStateBuilder |
| sharedState | SharedState | |
| Note | baseRelationToDataFrame acts as a mechanism that SparkSession uses to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy. |
|---|---|
Creating SparkSession Instance
| Caution | FIXME |
|---|---|
Creating SparkSession Using Builder Pattern — builder Method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession instance using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
| Tip | Read about the Fluent interface design pattern in Wikipedia, the free encyclopedia. |
|---|---|
Accessing Version of Spark — version Method
version: String
version returns the version of Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties properties file on CLASSPATH.
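For example, in spark-shell (the exact value depends on the Spark build you run; the string below is only an illustration):
scala> spark.version
res0: String = 2.2.0-SNAPSHOT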
Implicit Conversions — implicits Object
The implicits object is a helper class with Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala's "primitive" types, e.g. Int, Double, String, and their products and collections.
| Note | Import the implicit conversions using import spark.implicits._: val spark = SparkSession.builder.getOrCreate(); import spark.implicits._ |
|---|---|
The implicits object offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), or from case classes, tuples and Seq.
The implicits object also offers conversions from Scala's Symbol or $ to Column.
It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame, and direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
| Note | It is only possible to call toDF methods on RDD objects of the "primitive" types Int, Long, and String. |
|---|---|
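A short sketch of these conversions in action; the Person case class and the sample values are made up here purely for illustration:
val spark: SparkSession = ...
import spark.implicits._

// a Seq of a "primitive" type to a Dataset
val ints = Seq(1, 2, 3).toDS

// a Seq of a Product type (a case class) to a DataFrame
case class Person(id: Long, name: String)
val people = Seq(Person(0, "alice"), Person(1, "bob")).toDF

// Symbol and $ conversions to Column
people.select('name).show
people.select($"id").show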
Creating Empty Dataset — emptyDataset Method
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections or RDDs — createDataset Methods
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java's List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation logical query plan (for the input data collection) or a LogicalRDD (for the input RDD[T]).
| Tip | You would be better off using Scala implicits and the toDS method instead (that does this conversion automatically).<br/>val spark: SparkSession = ...<br/>import spark.implicits._<br/><br/>scala> val one = Seq(1).toDS<br/>one: org.apache.spark.sql.Dataset[Int] = [value: int] |
|---|---|
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
| Note | Only unresolved expression encoders are currently supported. |
|---|---|
The expression encoder is then used to map the elements of the input Seq[T] into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
Creating Dataset With Single Long Column — range Methods
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset[Long] of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
| Note | The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions numPartitions. |
|---|---|
Internally, range creates a new Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame Method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and no columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
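A quick sanity check in spark-shell; as you would expect from an empty schema and no rows, the output is along these lines:
scala> spark.emptyDataFrame.printSchema
root

scala> spark.emptyDataFrame.count
res0: Long = 0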
Creating DataFrame from RDD with Explicit Schema — createDataFrame Method
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
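A minimal sketch of the method; the column names and sample rows are invented for illustration only:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// an explicit schema with two columns (names are illustrative)
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))

// rows that match the schema above
val rowRDD: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(0L, "zero"), Row(1L, "one")))

val df = spark.createDataFrame(rowRDD, schema)
df.show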
Executing SQL Queries (aka SQL Mode) — sql Method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement and creates a DataFrame.
| Note | sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment. scala> spark.version res0: String = 2.2.0-SNAPSHOT scala> :imports 1) import spark.implicits._ (72 terms, 43 are implicit) 2) import spark.sql (1 terms) |
|---|---|
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
| Note | sql uses SessionState to access the current ParserInterface. |
|---|---|
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
| Tip | spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them). spark-sql> show databases; default Time taken: 0.028 seconds, Fetched 1 row(s) |
|---|---|
Accessing UDF Registration Interface — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Creating DataFrame for Table — table Method
table(tableName: String): DataFrame
table creates a DataFrame from records in the tableName table (if exists).
val df = spark.table("mytable")
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
| Tip | All methods in Catalog return Datasets. |
|---|---|
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read Method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
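From there you would typically pick a format and load the data. A hedged sketch (the format options and the path are placeholders, not part of the original example):
// load a CSV file with a header row (path is hypothetical)
val df = spark.read
  .option("header", "true")
  .csv("/path/to/data.csv")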
Runtime Configuration — conf Attribute
conf: RuntimeConfig
conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
| Caution | FIXME |
|---|---|
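A minimal sketch of working with the runtime configuration through conf (the option and value below are just examples):
val spark: SparkSession = ...

// read a runtime option
spark.conf.get("spark.sql.shuffle.partitions")

// set a runtime option (value is illustrative)
spark.conf.set("spark.sql.shuffle.partitions", "8")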
readStream Method
readStream: DataStreamReader
readStream returns a new DataStreamReader.
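A short sketch of how readStream is typically used; the socket source, host and port are placeholder choices, not part of the original text:
// read lines from a socket as a streaming Dataset (host/port are placeholders)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load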
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
streamingQueryManager Attribute
streamingQueryManager is…
listenerManager Attribute
listenerManager is…
ExecutionListenerManager
ExecutionListenerManager is…
functionRegistry Attribute
functionRegistry is…
experimentalMethods Attribute
experimental: ExperimentalMethods
experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
| Note | experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules. |
|---|---|
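A hedged sketch of how a session could register an extra optimization rule; the no-op rule below is made up purely to show where custom rules plug in:
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a do-nothing rule, for illustration only
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(NoopRule)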
newSession Method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Create DataFrame from BaseRelation — baseRelationToDataFrame Method
baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
| Note | LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan). |
|---|---|
| Note | baseRelationToDataFrame is used when: DataFrameReader loads data from a data source that supports multiple paths, DataFrameReader loads data from an external table using JDBC, TextInputCSVDataSource creates a base Dataset (of Strings), and TextInputJsonDataSource creates a base Dataset (of Strings). |
|---|---|
Building SessionState — instantiateSessionState Internal Method
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and immediately build a BaseSessionStateBuilder.
instantiateSessionState reports an IllegalArgumentException while constructing a SessionState:
Error while instantiating '[className]'
| Note | instantiateSessionState is used exclusively when SparkSession is requested for SessionState (and one is not available yet). |
|---|---|