本文介绍如何在 Azure Databricks 中使用 Apache Spark Scala 数据帧 API 加载和转换数据。
另请参阅
Apache Spark Scala API 参考
。
什么是数据帧?
数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。
Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
什么是 Spark 数据集?
Apache Spark
数据集 API
提供了一个类型安全、面向对象的编程接口。
DataFrame
是非类型化的
Dataset [Row]
的别名。
Azure Databricks 文档使用大多数技术参考和指南中的术语“DataFrame”,因为此语言包含在 Python、Scala 和 R 中。请参阅
Scala 数据集聚合器示例笔记本
。
使用 Scala 创建数据帧
大多数 Apache Spark 查询返回一个数据帧。 这包括从表中读取、从文件中加载数据,以及转换数据的操作。
还可以从类列表创建数据帧,如以下示例中所示:
case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, "Teo"), new Employee(3, "Fang")).toDF
将表读取到数据帧中
默认情况下,Azure Databricks 对所有表使用 Delta Lake。 可以轻松将表加载到数据帧,如以下示例中所示:
spark.read.table("<catalog-name>.<schema-name>.<table-name>")
将数据从文件加载到数据帧中
可以从许多受支持的文件格式加载数据。 以下示例使用 /databricks-datasets
目录(可从大多数工作区访问)中提供的数据集。 请参阅示例数据集。
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
大多数 Spark 转换的结果会返回一个数据帧。 可将这些结果分配回到数据帧变量,类似于在其他系统中使用 CTE、临时视图或数据帧。
使用联接和联合来组合数据帧
数据帧使用标准 SQL 语义执行联接操作。 联接根据提供的匹配条件和联接类型返回两个数据帧的组合结果。 以下示例是一个内部联接(默认联接):
val joined_df = df1.join(df2, joinType="inner", usingColumn="id")
可以使用联合操作将一个数据帧的行添加到另一个数据帧,如以下示例中所示:
val unioned_df = df1.union(df2)
筛选数据帧中的行
可以使用 .filter()
或 .where()
筛选数据帧中的行。 性能或语法没有差别,如以下示例中所示:
val filtered_df = df.filter("id > 1")
val filtered_df = df.where("id > 1")
使用筛选来选择要在数据帧中返回或修改的行子集。
从数据帧中选择列
可以通过将一个或多个列名传递给 .select()
来选择列,如以下示例中所示:
val select_df = df.select("id", "name")
可以组合 select 和 filter 查询来限制返回的行和列。
subset_df = df.filter("id > 1").select("name")
查看数据帧
若要以表格格式查看此数据,可以使用 Azure Databricks display()
命令,如以下示例中所示:
display(df)
输出数据架构
Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。
Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。
可以使用 .printSchema()
方法输出架构,如以下示例中所示:
df.printSchema()
将数据帧保存到表中
默认情况下,Azure Databricks 对所有表使用 Delta Lake。 可使用以下语法将数据帧的内容保存到表中:
df.write.saveAsTable("<table-name>")
将数据帧写入文件集合
大多数 Spark 应用程序旨在处理大型数据集并以分布方式工作,而 Spark 写出文件的目录而不是单个文件。 许多数据系统已配置为读取这些文件目录。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。
以下示例保存 JSON 文件的目录:
df.write.format("json").save("/tmp/json_data")
在 Spark 中运行 SQL 查询
Spark 数据帧提供了许多选项用于将 SQL 与 Scala 相组合。
使用 selectExpr()
方法可将每一列指定为 SQL 查询,如以下示例中所示:
display(df.selectExpr("id", "upper(name) as big_name"))
可以从 pyspark.sql.functions
导入 expr()
函数,以便在要指定列的任何位置使用 SQL 语法,如以下示例中所示:
import org.apache.spark.sql.functions.expr
display(df.select('id, expr("lower(name) as little_name")))
还可以使用 spark.sql()
在 Scala 内核中运行任意 SQL 查询,如以下示例中所示:
val query_df = spark.sql("SELECT * FROM <table-name>")
由于逻辑在 Scala 内核中执行,并且所有 SQL 查询将作为字符串传递,因此你可以使用 Scala 格式设置来参数化 SQL 查询,如以下示例中所示:
val table_name = "my_table"
val query_df = spark.sql(s"SELECT * FROM $table_name")
Scala 数据集聚合器示例笔记本
以下笔记本演示了如何使用数据集聚合器。
数据集聚合器笔记本
获取笔记本