SparkSession - The Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is the very first object you have to create while developing Spark SQL applications using the fully-typed Dataset (or untyped, Row-based DataFrame) data abstractions.

Note As of Spark 2.0, SparkSession merges SQLContext and HiveContext into a single object.

You use the SparkSession.builder method to create an instance of SparkSession:

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // avoid hardcoding the deployment environment
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .getOrCreate

And stop the current SparkSession using the stop method:

spark.stop

You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate per SparkSession (see the [catalog](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-SparkSession.html#catalog) attribute); a short sketch follows the listing below.

scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
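
To illustrate the separation, here is a minimal sketch (the view name is illustrative) showing that a temporary view registered in one SparkSession is not visible from another:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("sessions-demo").getOrCreate()
import spark.implicits._

Seq("a", "b").toDF("value").createOrReplaceTempView("strs")

val anotherSession = spark.newSession()
spark.catalog.listTables.show          // lists the strs temporary view
anotherSession.catalog.listTables.show // does not -- temporary views are per-session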

Internally, SparkSession requires a SparkContext and an optional SharedState (that holds the state shared across SparkSession instances).

| Method | Description |
| --- | --- |
| builder | "Opens" a builder to get or create a SparkSession instance |
| version | Returns the current version of Spark |
| implicits | The implicit conversions you import with import spark.implicits._ to create Datasets from (almost arbitrary) Scala objects |
| emptyDataset[T] | Creates an empty Dataset[T] |
| range | Creates a Dataset[Long] |
| sql | Executes a SQL query (and returns a DataFrame) |
| udf | Access to user-defined functions (UDFs) |
| table | Creates a DataFrame from a table |
| catalog | Access to the catalog of structured-query entities |
| read | Access to DataFrameReader to read a DataFrame from external files and storage systems |
| conf | Access to the current runtime configuration |
| readStream | Access to DataStreamReader to read streaming datasets |
| streams | Access to StreamingQueryManager to manage structured streaming queries |
| newSession | Creates a new SparkSession |
| stop | Stops the SparkSession |
Tip Use the spark.sql.warehouse.dir property to change the location of Hive's hive.metastore.warehouse.dir property, i.e. the location of Hive's local/embedded metastore database (Derby).
Refer to SharedState for the details of Spark SQL's support for Apache Hive.
See also the official Hive Metastore Administration documentation.
| Name | Type | Description |
| --- | --- | --- |
| sessionState | SessionState | Internally, sessionState clones the optional parent SessionState (if given when the SparkSession was created) or creates a new SessionState using the BaseSessionStateBuilder defined by the spark.sql.catalogImplementation property: in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder, hive for org.apache.spark.sql.hive.HiveSessionStateBuilder |
| sharedState | SharedState | |
Note baseRelationToDataFrame acts as a mechanism in SparkSession to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy.

Creating SparkSession Instance

Caution FIXME

Creating SparkSession Using Builder Pattern — builder Method

builder(): Builder

builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip Read about the Fluent interface design pattern on Wikipedia, the free encyclopedia.

Accessing Version of Spark — version Method

version: String

version returns the version of Apache Spark in use.

Internally, version uses the spark.SPARK_VERSION value, i.e. the version property in the spark-version-info.properties properties file on the CLASSPATH.
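
A minimal sketch of accessing the version (the same string is also available through SparkContext):

val spark: SparkSession = ...
println(spark.version)              // e.g. 2.2.0-SNAPSHOT
println(spark.sparkContext.version) // same value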

Implicit Conversions — implicits Object

The implicits object is a helper class with Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala's "primitive" types, e.g. Int, Double, String, as well as their products and collections.

Note Import the implicit conversions with import spark.implicits._:
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

The implicits object offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), or from case classes, tuples, and Seq.

The implicits object also offers conversions from Scala's Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame, and direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.

Note It is only possible to call toDF methods on RDD objects of the "primitive" types Int, Long, and String.
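
A minimal sketch of the conversions described above (assuming a local SparkSession; the column names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("implicits-demo").getOrCreate()
import spark.implicits._

// Seq of a Product type (a tuple) to DataFrame
val df = Seq((1, "one"), (2, "two")).toDF("id", "name")

// Seq of a "primitive" type to Dataset
val ds = Seq(1, 2, 3).toDS

// Scala Symbol and $ to Column
df.select('id, $"name").show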

Creating Empty Dataset — emptyDataset Method

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical query plan.

Creating Dataset from Local Collections or RDDs — createDataset Methods

createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]

createDataset is an experimental API to create a Dataset from a local collection of objects, i.e. Seq[T] or Java's List[T], or from a distributed RDD[T].

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates a LocalRelation logical query plan (for the input data collection) or a LogicalRDD (for the input RDD[T]).

Tip You would rather not use createDataset explicitly and instead leave the conversion to Scala implicits and the toDS method (which does it automatically).

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).

Note Only unresolved expression encoders are currently supported.

The expression encoder is then used to map the elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
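
A minimal sketch of both createDataset variants (assuming a local SparkSession; import spark.implicits._ brings the required encoders into scope):

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("createDataset-demo").getOrCreate()
import spark.implicits._ // implicit encoders

val fromSeq: Dataset[String] = spark.createDataset(Seq("a", "b", "c"))                     // LocalRelation plan
val fromRDD: Dataset[Int]    = spark.createDataset(spark.sparkContext.parallelize(1 to 3)) // LogicalRDD plan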

Creating Dataset With Single Long Column — range Methods

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The family of range methods creates a Dataset of Long numbers.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
Note The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions numPartitions.

Internally, range creates a new Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.
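
To see the effect of the Note above, a minimal sketch (assuming a local SparkSession) comparing the default number of partitions with an explicit numPartitions:

val spark: SparkSession = ...
println(spark.range(0, 10).rdd.getNumPartitions)       // SparkContext.defaultParallelism
println(spark.range(0, 10, 1, 2).rdd.getNumPartitions) // 2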

Creating Empty DataFrame — emptyDataFrame Method

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame (with no rows and no columns).

It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
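
A minimal sketch of what emptyDataFrame gives you:

val empty = spark.emptyDataFrame
println(empty.schema) // StructType()
println(empty.count)  // 0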

Creating DataFrame from RDD with Explicit Schema — createDataFrame Method

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
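
A minimal sketch (assuming a local SparkSession; the schema and values are illustrative):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder.master("local[*]").appName("createDataFrame-demo").getOrCreate()

val rowRDD = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, "two")))
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

val df = spark.createDataFrame(rowRDD, schema)
df.show()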

Executing SQL Queries (aka SQL Mode) — sql Method

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement and creates a DataFrame.

Note sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.

scala> spark.version
res0: String = 2.2.0-SNAPSHOT

scala> :imports
 1) import spark.implicits._ (72 terms, 43 are implicit)
 2) import spark.sql         (1 terms)
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.

Note sql uses SessionState to access the current ParserInterface.

sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.

Tip spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).

spark-sql> show databases;
default
Time taken: 0.028 seconds, Fetched 1 row(s)

Accessing UDF Registration Interface — udf Attribute

udf: UDFRegistration

The udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is simply an alias for SessionState.udfRegistration.

Creating DataFrame for Table — table Method

table(tableName: String): DataFrame

table creates a DataFrame from the records in the tableName table (if it exists).

val df = spark.table("mytable")
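
The table has to exist in the catalog first; a minimal sketch (the table name matches the example above) that registers it as a managed table before reading it back:

spark.range(5).write.saveAsTable("mytable")
spark.table("mytable").show()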

Accessing Metastore — catalog Attribute

catalog: Catalog

The catalog attribute is a (lazy) interface to the current metastore, i.e. the data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).

Tip All methods in Catalog return Datasets.

scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+

Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
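
A few more Catalog methods, as a minimal sketch (assuming the tables from the listing above exist):

println(spark.catalog.currentDatabase)                   // default
spark.catalog.listDatabases.show
println(spark.catalog.tableExists("my_permanent_table")) // true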

Accessing DataFrameReader — read Method

read: DataFrameReader

The read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
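
For example, a minimal sketch (the path and options are illustrative) of loading a CSV file into a DataFrame:

val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people.csv") // hypothetical path
people.printSchema()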

Runtime Configuration — conf Attribute

conf: RuntimeConfig

conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.

Caution FIXME
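
A minimal sketch of reading and changing the runtime configuration:

spark.conf.set("spark.sql.shuffle.partitions", "8")
println(spark.conf.get("spark.sql.shuffle.partitions"))  // 8
println(spark.conf.getOption("spark.sql.warehouse.dir")) // Some(...)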

readStream Method

readStream: DataStreamReader

readStream returns a new DataStreamReader.
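
A minimal sketch (assuming the built-in rate streaming source, available since Spark 2.2):

val rates = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load
rates.printSchema() // timestamp and value columns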

streams Attribute

streams: StreamingQueryManager

The streams attribute gives access to StreamingQueryManager (through SessionState).

val spark: SparkSession = ...
spark.streams.active.foreach(println)

streamingQueryManager Attribute

streamingQueryManager is…

listenerManager Attribute

listenerManager is…

ExecutionListenerManager

ExecutionListenerManager is…

functionRegistry Attribute

functionRegistry is…

experimentalMethods Attribute

experimental: ExperimentalMethods

experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.

Note experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
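
A minimal sketch (the strategy is a deliberately do-nothing placeholder) of registering an extra planning strategy through experimental:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// never matches any plan, so query planning is left unchanged
object NoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = spark.experimental.extraStrategies :+ NoopStrategy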

newSession Method

newSession(): SparkSession

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a

Stopping SparkSession — stop Method

stop(): Unit

stop stops the SparkSession, i.e. stops the underlying SparkContext.

Creating DataFrame from BaseRelation — baseRelationToDataFrame Method

baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside a LogicalRelation.

Note LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Note baseRelationToDataFrame is used when:
- DataFrameReader loads data from a data source that supports multiple paths
- DataFrameReader loads data from an external table using JDBC
- TextInputCSVDataSource creates a base Dataset (of Strings)
- TextInputJsonDataSource creates a base Dataset (of Strings)

Building SessionState — instantiateSessionState Internal Method

instantiateSessionState(className: String, sparkSession: SparkSession): SessionState

instantiateSessionState finds the className that is then used to create and immediately build a BaseSessionStateBuilder.

instantiateSessionState reports an IllegalArgumentException while constructing a SessionState:

Error while instantiating '[className]'
Note instantiateSessionState is used exclusively when SparkSession is requested for SessionState (and one is not available yet).
