添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

本文介绍如何在 Azure Databricks 中使用 Apache Spark Scala 数据帧 API 加载和转换数据。

另请参阅 Apache Spark Scala API 参考

什么是数据帧?

数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

什么是 Spark 数据集?

Apache Spark 数据集 API 提供了一个类型安全、面向对象的编程接口。 DataFrame 是非类型化的 Dataset [Row] 的别名。

Azure Databricks 文档使用大多数技术参考和指南中的术语“DataFrame”,因为此语言包含在 Python、Scala 和 R 中。请参阅 Scala 数据集聚合器示例笔记本

使用 Scala 创建数据帧

大多数 Apache Spark 查询返回一个数据帧。 这包括从表中读取、从文件中加载数据,以及转换数据的操作。

还可以从类列表创建数据帧,如以下示例中所示:

case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, "Teo"), new Employee(3, "Fang")).toDF

将表读取到数据帧中

默认情况下,Azure Databricks 对所有表使用 Delta Lake。 可以轻松将表加载到数据帧,如以下示例中所示:

spark.read.table("<catalog-name>.<schema-name>.<table-name>")

将数据从文件加载到数据帧中

可以从许多受支持的文件格式加载数据。 以下示例使用 /databricks-datasets 目录(可从大多数工作区访问)中提供的数据集。 请参阅示例数据集

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")

向数据帧分配转换步骤

大多数 Spark 转换的结果会返回一个数据帧。 可将这些结果分配回到数据帧变量,类似于在其他系统中使用 CTE、临时视图或数据帧。

使用联接和联合来组合数据帧

数据帧使用标准 SQL 语义执行联接操作。 联接根据提供的匹配条件和联接类型返回两个数据帧的组合结果。 以下示例是一个内部联接(默认联接):

val joined_df = df1.join(df2, joinType="inner", usingColumn="id")

可以使用联合操作将一个数据帧的行添加到另一个数据帧,如以下示例中所示:

val unioned_df = df1.union(df2)

筛选数据帧中的行

可以使用 .filter().where() 筛选数据帧中的行。 性能或语法没有差别,如以下示例中所示:

val filtered_df = df.filter("id > 1")
val filtered_df = df.where("id > 1")

使用筛选来选择要在数据帧中返回或修改的行子集。

从数据帧中选择列

可以通过将一个或多个列名传递给 .select() 来选择列,如以下示例中所示:

val select_df = df.select("id", "name")

可以组合 select 和 filter 查询来限制返回的行和列。

subset_df = df.filter("id > 1").select("name")

查看数据帧

若要以表格格式查看此数据,可以使用 Azure Databricks display() 命令,如以下示例中所示:

display(df)

Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。

Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。

可以使用 .printSchema() 方法输出架构,如以下示例中所示:

df.printSchema()

将数据帧保存到表中

默认情况下,Azure Databricks 对所有表使用 Delta Lake。 可使用以下语法将数据帧的内容保存到表中:

df.write.saveAsTable("<table-name>")

将数据帧写入文件集合

大多数 Spark 应用程序旨在处理大型数据集并以分布方式工作,而 Spark 写出文件的目录而不是单个文件。 许多数据系统已配置为读取这些文件目录。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。

以下示例保存 JSON 文件的目录:

df.write.format("json").save("/tmp/json_data")

在 Spark 中运行 SQL 查询

Spark 数据帧提供了许多选项用于将 SQL 与 Scala 相组合。

使用 selectExpr() 方法可将每一列指定为 SQL 查询,如以下示例中所示:

display(df.selectExpr("id", "upper(name) as big_name"))

可以从 pyspark.sql.functions 导入 expr() 函数,以便在要指定列的任何位置使用 SQL 语法,如以下示例中所示:

import org.apache.spark.sql.functions.expr
display(df.select('id, expr("lower(name) as little_name")))

还可以使用 spark.sql() 在 Scala 内核中运行任意 SQL 查询,如以下示例中所示:

val query_df = spark.sql("SELECT * FROM <table-name>")

由于逻辑在 Scala 内核中执行,并且所有 SQL 查询将作为字符串传递,因此你可以使用 Scala 格式设置来参数化 SQL 查询,如以下示例中所示:

val table_name = "my_table"
val query_df = spark.sql(s"SELECT * FROM $table_name")

Scala 数据集聚合器示例笔记本

以下笔记本演示了如何使用数据集聚合器。

数据集聚合器笔记本

获取笔记本