SparkSession - The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is the very first object you have to create while developing Spark SQL applications using the fully-typed Dataset (or untyped Row-based DataFrame) data abstraction.
Note | As of Spark 2.0, SparkSession merges SQLContext and HiveContext into a single object. |
---|---|
You can create a SparkSession instance using the SparkSession.builder method.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
.appName("My Spark Application") // optional and will be autogenerated if not specified
.master("local[*]") // avoid hardcoding the deployment environment
.enableHiveSupport() // self-explanatory, isn't it?
.config("spark.sql.warehouse.dir", "target/spark-warehouse")
.getOrCreate
You then stop the current SparkSession using the stop method.
spark.stop
You can have as many SparkSessions as you want in a single Spark application. A common use case is to keep relational entities separate per SparkSession (see [catalog](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-SparkSession.html#catalog属性)).
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
Internally, SparkSession requires a SparkContext and an optional SharedState (that holds shared state across SparkSession instances).
Method | Description |
---|---|
builder | "Opens" a builder to get or create a SparkSession instance |
version | Returns the current version of Spark |
implicits | Use import spark.implicits._ to import the implicit conversions and create Datasets from (almost arbitrary) Scala objects |
emptyDataset[T] | Creates an empty Dataset[T] |
range | Creates a Dataset[Long] |
sql | Executes a SQL query (and returns a DataFrame) |
udf | Access to user-defined functions (UDFs) |
table | Creates a DataFrame from a table |
catalog | Access to the catalog of the entities of structured queries |
read | Access to DataFrameReader to read a DataFrame from external files and storage systems |
conf | Access to the current runtime configuration |
readStream | Access to DataStreamReader to read streaming datasets |
streams | Access to StreamingQueryManager to manage structured streaming queries |
newSession | Creates a new SparkSession |
stop | Stops the SparkSession |
Tip | Use the spark.sql.warehouse.dir property to change the location of Hive's hive.metastore.warehouse.dir property, i.e. the location of Hive's local/embedded metastore database (using Derby). Refer to SharedState for the details of Spark SQL's support for Apache Hive. See also the official Hive Metastore Administration documentation. |
---|---|
Name | Type | Description |
---|---|---|
sessionState | SessionState | Internally, sessionState clones the optional parent SessionState (if one was given when the SparkSession was created) or creates a new SessionState using BaseSessionStateBuilder as defined by the spark.sql.catalogImplementation property: in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder, hive for org.apache.spark.sql.hive.HiveSessionStateBuilder |
sharedState | SharedState | |
Note | baseRelationToDataFrame acts as a mechanism in SparkSession to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy. |
---|---|
Creating SparkSession Instance
Caution | FIXME |
---|---|
Creating SparkSession Using Builder Pattern — builder Method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip | Read about the Fluent interface design pattern in Wikipedia, the free encyclopedia. |
---|---|
Accessing Version of Spark — version Method
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, i.e. the version property in the spark-version-info.properties properties file on CLASSPATH.
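For example, in spark-shell (the exact version string depends on your Spark build):
scala> spark.version
res0: String = 2.2.0-SNAPSHOT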
Implicit Conversions — implicits Object
implicits object is a helper class with the Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala's "primitive" types, e.g. Int, Double, String, and their products and collections.
Note | Import the implicit conversions using import spark.implicits._: val spark = SparkSession.builder.getOrCreate() import spark.implicits._ |
---|---|
implicits object offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), or case classes or tuples, and Seq.
implicits object also offers conversions from Scala's Symbol or $ to Column.
It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
Note | It is only possible to call the toDF methods on RDD objects of the Int, Long, and String "primitive" types. |
---|---|
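A quick sketch of these conversions in action (all names below are illustrative):
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._

// Seq of tuples (a Product type) to a Dataset or a DataFrame with named columns
val ds = Seq((1, "one"), (2, "two")).toDS
val df = Seq((1, "one"), (2, "two")).toDF("id", "label")

// RDD of a "primitive" type to a single-column DataFrame
val ints = spark.sparkContext.parallelize(0 to 3).toDF

// Scala Symbol and $ converted to Column
df.select('id, $"label").show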
Creating Empty Dataset — emptyDataset Method
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections or RDDs — createDataset Methods
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T] or Java's List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation logical query plan (for the input data collection) or LogicalRDD (for the input RDD[T]).
Tip | You'd be better off using Scala implicits and the toDS method instead (which does the conversion automatically).<br>val spark: SparkSession = ...<br>import spark.implicits._<br><br>scala> val one = Seq(1).toDS<br>one: org.apache.spark.sql.Dataset[Int] = [value: int] |
---|---|
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note | Only unresolved expression encoders are currently supported. |
---|---|
The expression encoder is then used to map the elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
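A quick sketch of the RDD variant (names are illustrative); it needs an encoder for the element type in scope:
val spark: org.apache.spark.sql.SparkSession = ...
import spark.implicits._ // brings the implicit Encoder[Int] into scope

// createDataset over a distributed RDD[Int] gives a LogicalRDD-backed Dataset
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
val ds = spark.createDataset(rdd)
ds.show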
Creating Dataset With Single Long Column — range Methods
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
Note | The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions numPartitions. |
---|---|
Internally, range creates a new Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame Method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and no columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
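A quick check in spark-shell shows the empty schema and the zero row count:
scala> spark.emptyDataFrame.printSchema
root

scala> spark.emptyDataFrame.count
res0: Long = 0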
Creating DataFrames from RDDs with Explicit Schema — createDataFrame Method
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
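A minimal sketch (the column names and values are made up for illustration):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark: org.apache.spark.sql.SparkSession = ...

// Rows must match the schema: (id: Int, name: String)
val rows = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, "two")))
val schema = StructType(
  StructField("id", IntegerType, nullable = false) ::
  StructField("name", StringType, nullable = true) :: Nil)

val df = spark.createDataFrame(rows, schema)
df.show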
Executing SQL Queries (aka SQL Mode) — sql Method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement and creates a DataFrame.
Note | sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.<br>scala> spark.version<br>res0: String = 2.2.0-SNAPSHOT<br><br>scala> :imports<br> 1) import spark.implicits._ (72 terms, 43 are implicit)<br> 2) import spark.sql (1 terms) |
---|---|
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
Note | sql uses SessionState to access the current ParserInterface. |
---|---|
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
Tip | spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).<br>spark-sql> show databases;<br>default<br>Time taken: 0.028 seconds, Fetched 1 row(s) |
---|---|
Accessing UDF Registration Interface — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Creating DataFrame for Table — table Method
table(tableName: String): DataFrame
table creates a DataFrame from records in the tableName table (if it exists).
val df = spark.table("mytable")
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. the data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
Tip | All methods in Catalog return Datasets. |
---|---|
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read Method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
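For instance, a sketch that loads a CSV file (the path and options are illustrative):
// Read a CSV file into a DataFrame; "header" tells the reader to use
// the first line for column names.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/people.csv") // hypothetical path

df.printSchema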
Runtime Configuration — conf Attribute
conf: RuntimeConfig
conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
Caution | FIXME |
---|---|
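A quick sketch of the RuntimeConfig interface (the property names below are standard Spark SQL settings):
val spark: org.apache.spark.sql.SparkSession = ...

// Read and set runtime (per-session) configuration properties
spark.conf.get("spark.sql.shuffle.partitions")      // e.g. "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.getOption("spark.sql.warehouse.dir")     // Option[String]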
readStream Method
readStream: DataStreamReader
readStream returns a new DataStreamReader.
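A minimal sketch of using the DataStreamReader it returns (the input path is illustrative):
val spark: org.apache.spark.sql.SparkSession = ...

// Stream text files dropped into a directory and echo them to the console
val lines = spark.readStream
  .format("text")
  .load("/tmp/streaming-input")

val query = lines.writeStream
  .format("console")
  .start()

query.awaitTermination()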
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
streamingQueryManager Attribute
streamingQueryManager is…
listenerManager Attribute
listenerManager is…
ExecutionListenerManager
ExecutionListenerManager is…
functionRegistry Attribute
functionRegistry is…
experimentalMethods Attribute
experimental: ExperimentalMethods
experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note | experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules. |
---|---|
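A rough sketch of what plugging in an extra strategy looks like (the strategy below is a do-nothing placeholder, not from the original text):
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A placeholder strategy that never matches, so planning falls through
// to the built-in strategies
object NoPlanningStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark: SparkSession = SparkSession.builder.getOrCreate()
spark.experimental.extraStrategies = NoPlanningStrategy :: Nil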
newSession Method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Create DataFrame from BaseRelation — baseRelationToDataFrame Method
baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note | LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan). |
---|---|
Note | baseRelationToDataFrame is used when: DataFrameReader loads data from a data source that supports multiple paths, DataFrameReader loads data from an external table using JDBC, TextInputCSVDataSource creates a base Dataset (of Strings), and TextInputJsonDataSource creates a base Dataset (of Strings). |
---|---|
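A minimal sketch, assuming a hand-rolled BaseRelation with TableScan that exposes a single Long column (everything below is illustrative, not from the original text):
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark: SparkSession = SparkSession.builder.getOrCreate()

// A trivial relation that scans three rows with a single "id" column
val relation = new BaseRelation with TableScan {
  override val sqlContext: SQLContext = spark.sqlContext
  override val schema: StructType =
    StructType(StructField("id", LongType, nullable = false) :: Nil)
  override def buildScan(): RDD[Row] =
    spark.sparkContext.parallelize(0L until 3L).map(Row(_))
}

// Wraps the relation in a LogicalRelation and gives back a DataFrame
val df = spark.baseRelationToDataFrame(relation)
df.show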
Building SessionState — instantiateSessionState Internal Method
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and immediately build a BaseSessionStateBuilder.
instantiateSessionState reports an IllegalArgumentException while constructing a SessionState:
Error while instantiating '[className]'
Note | instantiateSessionState is used exclusively when SparkSession is requested for SessionState (and one is not available yet). |
---|---|