清理未參考的檔案。
您可以從連結至 Azure Databricks 計算資源的筆記本內
,執行本文中的範例 Python、Scala 和 SQL 程式代碼,例如
叢集
。 您也可以從 Databricks SQL
中與
SQL 倉儲
相關聯的查詢
中
執行本文中的 SQL 程式代碼。
準備源數據
本教學課程依賴名為People 10 M的數據集。它包含 1000 萬筆虛構記錄,這些記錄保存著有關人們的事實,例如名字和姓氏、出生日期和工資。 本教學課程假設此數據集位於與您的目標 Azure Databricks 工作區相關聯的 Unity 目錄
磁碟
區中。
若要取得本教學課程的People 10 M數據集,請執行下列動作:
移至
Kaggle 中的 [人員 10 M
] 頁面。
按兩下 [
下載
] 以將名為
archive.zip
的檔案下載到本機電腦。
從檔案擷取名為
export.csv
的
archive.zip
檔案。 檔案
export.csv
包含本教學課程的數據。
若要將
export.csv
檔案上傳至磁碟區,請執行下列動作:
在提要欄中,按兩下
[目錄
]。
在
[目錄總管] 中
,流覽至並開啟您要上傳檔案的
export.csv
磁碟區。
按兩下 [
上傳至此磁碟區
]。
拖放,或流覽至本機計算機上的檔案並加以選取
export.csv
。
按一下 [上傳] 。
在下列程式代碼範例中,將 取代
/Volumes/main/default/my-volume/export.csv
為您目標磁碟區中的檔案路徑
export.csv
。
建立數據表
根據預設,在 Azure Databricks 上建立的所有數據表都會使用 Delta Lake。 Databricks 建議使用 Unity 目錄受控數據表。
在上述程式代碼範例和下列程式代碼範例中,將數據表名稱
main.default.people_10m
取代為 Unity 目錄中的目標三部分目錄、架構和數據表名稱。
Delta Lake 是 Azure Databricks 的所有讀取、寫入和數據表建立命令的預設值。
Python(程式語言)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.write.saveAsTable("main.default.people_10m")
程式語言 Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
上述作業會建立新的Managed數據表。 如需建立 Delta 資料表時可用選項的相關信息,請參閱 CREATE TABLE。
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 CREATE TABLE LIKE 建立新的空白 Delta 數據表,以複製來源 Delta 數據表的架構和數據表屬性。 這在將數據表從開發環境升級為生產環境時特別有用,如下列程式代碼範例所示:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
若要建立空的數據表,您也可以在 Delta Lake for DeltaTableBuilder 和 Scala 中使用 API。 相較於對等的 DataFrameWriter API,這些 API 可讓您更輕鬆地指定其他資訊,例如數據行批註、數據表屬性和 產生的數據行。
這項功能處於公開預覽狀態。
Python(程式語言)
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
程式語言 Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
向上插入數據表
若要將一組更新和插入合併到現有的 Delta 數據表中,您可以使用 DeltaTable.merge 方法來 Python 和 Scala,以及 SQL 的 MERGE INTO 語句。 例如,下列範例會從源數據表取得數據,並將它合併至目標 Delta 數據表。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。 這項作業稱為 upsert。
Python(程式語言)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
程式語言 Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
在 SQL 中,如果您指定 *,這會更新或插入目標數據表中的所有資料行,假設源數據表的數據行與目標數據表相同。 如果目標數據表沒有相同的數據行,查詢會擲回分析錯誤。
當您執行插入作業時,您必須為資料表中的每個資料行指定值(例如,當現有數據集中沒有相符的數據列時)。 不過,您不需要更新所有值。
若要查看結果,請查詢數據表。
Python(程式語言)
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
程式語言 Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SELECT * FROM main.default.people_10m WHERE id >= 9999998
讀取數據表
您可以依資料表名稱或資料表路徑存取 Delta 資料表中的數據,如下列範例所示:
Python(程式語言)
people_df = spark.read.table("main.default.people_10m")
display(people_df)
程式語言 Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SELECT * FROM main.default.people_10m;
寫入數據表
Delta Lake 使用標準語法將數據寫入數據表。
若要以不可部分完成的方式將新數據新增至現有的 Delta 數據表,請使用附加模式,如下列範例所示:
Python(程式語言)
df.write.mode("append").saveAsTable("main.default.people_10m")
程式語言 Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
若要取代數據表中的所有數據,請使用覆寫模式,如下列範例所示:
Python(程式語言)
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
程式語言 Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
更新數據表
您可以更新符合 Delta 數據表中述詞的數據。 例如,在範例people_10m數據表中,若要從 或變更 或 gender 資料行M中的F縮寫,MaleFemale您可以執行下列命令:
Python(程式語言)
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
從資料表中刪除
您可以從 Delta 數據表移除符合述詞的數據。 例如,在範例 people_10m 數據表中,若要刪除與資料行中 birthDate 值相對應的所有資料列 1955,您可以執行下列命令:
Python(程式語言)
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
刪除會從最新版的 Delta 數據表中移除數據,但在明確清除舊版之前,不會從實體記憶體中移除它。 如需詳細資訊,請參閱 真空 。
顯示數據表歷程記錄
若要檢視數據表的歷程記錄,您可以使用 DeltaTable.history 和 Scala的 方法,以及 SQL 中的 DESCRIBE HISTORY 語句,其提供數據表版本、作業、使用者等,以針對每個寫入數據表提供證明資訊。
Python(程式語言)
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
DESCRIBE HISTORY main.default.people_10m
查詢舊版的資料表 (時程移動)
Delta Lake 時間移動可讓您查詢差異數據表的較舊快照集。
若要查詢舊版的數據表,請指定數據表的版本或時間戳。 例如,若要從上述歷程記錄查詢第0版或時間戳 2024-05-15T22:43:15.000+00:00Z ,請使用下列專案:
Python(程式語言)
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
對於時間戳,只接受日期或時間戳字串,例如 "2024-05-15T22:43:15.000+00:00" 或 "2024-05-15 22:43:15"。
DataFrameReader 選項可讓您從已修正為數據表特定版本或時間戳的 Delta 數據表建立 DataFrame,例如:
Python(程式語言)
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
程式語言 Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
如需詳細資訊,請參閱 使用 Delta Lake 數據表歷程記錄。
優化數據表
對數據表執行多個變更之後,您可能有許多小型檔案。 若要改善讀取查詢的速度,您可以使用優化作業將小型檔案折疊成較大的檔案:
Python(程式語言)
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
OPTIMIZE main.default.people_10m
依數據行排序 Z 順序
若要進一步改善讀取效能,您可以依 z 順序將同一組檔案中的相關信息共置。 Delta Lake 數據略過演算法會使用此組合來大幅減少需要讀取的數據量。 若為迭置順序數據,您可以指定要依作業以迭置順序排序的數據行。 例如,若要由 gender共置 ,請執行:
Python(程式語言)
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
如需執行優化作業時可用的一組完整選項,請參閱 優化數據檔配置。
使用清除快照集 VACUUM
Delta Lake 提供讀取的快照集隔離,這表示即使其他使用者或作業正在查詢數據表,也能安全地執行優化作業。 不過,您最終應該清除舊的快照集。 您可以執行真空作業來執行這項操作:
Python(程式語言)
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
程式語言 Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
VACUUM main.default.people_10m
如需有效使用真空作業的詳細資訊,請參閱 使用真空移除未使用的數據檔。