Spark SQL — 批量和流式查询大规模结构化数据
像Apache Spark一样,Spark SQL是针对大数据量的分布式内存计算。
Spark SQL与"单纯"的Spark Core的RDD计算模型之间的主要区别是Spark SQL使用了加载,查询和持久化结构化和半结构化数据的框架,可以使用SQL,HiveQL来表示结构化查询;而Spark Core利用的是称为结构化查询DSL的自定义 高可用性 类似SQL的 声明式 类型安全的数据集API。
| Note | 半结构化数据集和结构化数据集是可以分别使用schema隐式或明确描述的记录集合。 |
|---|---|
Spark SQL支持批处理和流模式的结构化查询(后者作为Spark SQL的单独模块称为Structured Streaming)。
| Note | Spark SQL底层会将结构化查询自动编译成对应的RDD操作。 |
|---|---|
无论您用哪种查询语言,所有查询都将成为一个Catalyst表达式的树,同时进一步优化到大型分布式数据集。
从Spark 2.0开始,Spark SQL 实际上是Spark底层内存分布式平台的功能丰富的主要接口。(将Spark Core的RDD隐藏在更高层次的抽象背后)
// 用 case class 定义schema
case class Person(name: String, age: Int)
// 你可以从 CSV 文件读取人员信息
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10)))
// 把 RDD[Person] 转换成 Dataset[Person] 然后进行查询
// 自动推断RDD的schema
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
// 用 Scala Query DSL 查询青少年数据
scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+
// 如果想用SQL
// 1. 将人员的 Dataset 注册为 Catalog 里的临时视图
people.createOrReplaceTempView("people")
// 2. 执行查询
val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19")
scala> teenagers.show
+-----+---+
| name|age|
+-----+---+
|Jacek| 10|
+-----+---+
启用Hive后,Spark开发人员可以使用HiveQL读取和写入现有Apache Hive部署中的数据。
sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')")
// 用 HiveQL 查询
sql("FROM v1").show
scala> sql("desc EXTENDED v1").show(false)
+----------+---------+-------+
|col_name |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|key |int |null |
|value |string |null |
+----------+---------+-------+
跟SQL和NoSQL数据库一样,Spark SQL使用逻辑查询计划优化器,代码生成(通常可能比您自己的自定义手写代码更好)和使用内部二进制行格式的 Tungsten 执行引擎提供性能查询优化。
Spark SQL引入了一个名为 Dataset 的表格数据抽象(以前是 DataFrame )。Dataset数据抽象被设计为使得Spark基础架构上的大量结构化表格数据更加简单快捷。
| Note | 引用Apache Drill的描述,也适用于Spark SQL:用于关系数据库和NoSQL数据库的SQL查询引擎,可直接查询文件中的自描述和半结构化数据,例如JSON或Parquet和 HBase表,而无需在集中式存储中指定元数据定义。 |
|---|---|
以下代码段展示用批处理ETL流程来处理JSON文件,并将其子集保存为CSV。
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
使用 Structured Streaming 功能,上述静态批量查询将变成动态,为连续应用程序
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
spark.readStream
.format("json")
.schema(schema)
.load("input-json")
.select("name", "score")
.where('score > 15)
.writeStream
.format("console")
.start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+
从Spark 2.0开始,Spark SQL的主要数据抽象就是Dataset。它表示一个结构化数据,它是具有已知 schema 的记录。这种结构化数据格式使用压缩柱状格式Dataset实现了紧凑的二进制表示,该格式存储在JVM堆外的托管对象中。它会通过减少内存使用和GC来加快计算速度。
Spark SQL支持谓词下推( predicate pushdown )以优化Dataset查询的性能,并且还可以在运行时生成优化的代码。
Spark SQL附带的不同API可以使用:
- Dataset API(以前称为DataFrame API),具有强类型的类似LINQ的查询DSL,Scala程序员很可能会发现非常有吸引力。
Structured Streaming API (又名流式数据集),用于连续增量执行结构化查询。
非程序员可能通过与Hive直接集成来使用SQL作为查询语言
JDBC / ODBC爱好者可以使用JDBC接口(通过 Thrift JDBC/ODBC Server)并将其工具连接到Spark的分布式查询引擎。
Spark SQL具有使用专门的 DataFrameReader 和 DataFrameWriter对象作为接口访问分布式存储系统(如Cassandra或HDFS(Hive,Parquet,JSON))中的数据
Spark SQL允许您对存放在Hadoop HDFS或兼容Hadoop的文件系统中的大量数据执行类SQL查询。它可以从不同的数据源(文件或表)访问数据。
Spark SQL定义了以下类型的函数:
- 标准函数或用户定义函数 (UDFs),它们将单行中的值作为输入,为每个输入行生成单个返回值。
- 对一组行进行操作并计算每个组的单个返回值的基础聚合函数。
- 窗口聚合函数对一组行进行操作,并为组中的每一行计算单个返回值。
有两种支持的实现类别 - in-memory(默认)和hive - 可以使用spark.sql.catalogImplementation属性设置。
如果您已将csv数据加载到数据帧中,为什么不将其注册为表,并使用Spark SQL来查找max / min或任何其他聚合?SELECT MAX(column_name)FROM dftable_name ...似乎很自然。
您对SQL更加舒适,可能会将该DataFrame注册为表并生成SQL查询(生成一系列最小 - 最大调用的字符串)
您可以从外部数据源分析数据,并让 _schema inferencer_ 推断模式。
// 例 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
.agg(max('j).as("aggOrdering"))
.orderBy(sum('j))
.as[(Int, Int)]
query.collect contains (1, 2) // true
// 例 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+