添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
清理未參考的檔案。

您可以從連結至 Azure Databricks 計算資源的筆記本內 ,執行本文中的範例 Python、Scala 和 SQL 程式代碼,例如 叢集 。 您也可以從 Databricks SQL 中與 SQL 倉儲 相關聯的查詢 執行本文中的 SQL 程式代碼。

準備源數據

本教學課程依賴名為People 10 M的數據集。它包含 1000 萬筆虛構記錄,這些記錄保存著有關人們的事實,例如名字和姓氏、出生日期和工資。 本教學課程假設此數據集位於與您的目標 Azure Databricks 工作區相關聯的 Unity 目錄 磁碟 區中。

若要取得本教學課程的People 10 M數據集,請執行下列動作:

  • 移至 Kaggle 中的 [人員 10 M ] 頁面。
  • 按兩下 [ 下載 ] 以將名為 archive.zip 的檔案下載到本機電腦。
  • 從檔案擷取名為 export.csv archive.zip 檔案。 檔案 export.csv 包含本教學課程的數據。
  • 若要將 export.csv 檔案上傳至磁碟區,請執行下列動作:

  • 在提要欄中,按兩下 [目錄 ]。
  • [目錄總管] 中 ,流覽至並開啟您要上傳檔案的 export.csv 磁碟區。
  • 按兩下 [ 上傳至此磁碟區 ]。
  • 拖放,或流覽至本機計算機上的檔案並加以選取 export.csv
  • 按一下 [上傳] 。
  • 在下列程式代碼範例中,將 取代 /Volumes/main/default/my-volume/export.csv 為您目標磁碟區中的檔案路徑 export.csv

    建立數據表

    根據預設,在 Azure Databricks 上建立的所有數據表都會使用 Delta Lake。 Databricks 建議使用 Unity 目錄受控數據表。

    在上述程式代碼範例和下列程式代碼範例中,將數據表名稱 main.default.people_10m 取代為 Unity 目錄中的目標三部分目錄、架構和數據表名稱。

    Delta Lake 是 Azure Databricks 的所有讀取、寫入和數據表建立命令的預設值。

    Python(程式語言)

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
    schema = StructType([
      StructField("id", IntegerType(), True),
      StructField("firstName", StringType(), True),
      StructField("middleName", StringType(), True),
      StructField("lastName", StringType(), True),
      StructField("gender", StringType(), True),
      StructField("birthDate", TimestampType(), True),
      StructField("ssn", StringType(), True),
      StructField("salary", IntegerType(), True)
    df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
    # Create the table if it does not exist. Otherwise, replace the existing table.
    df.writeTo("main.default.people_10m").createOrReplace()
    # If you know the table does not already exist, you can call this instead:
    # df.write.saveAsTable("main.default.people_10m")
    

    程式語言 Scala

    import org.apache.spark.sql.types._
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("firstName", StringType, nullable = true),
      StructField("middleName", StringType, nullable = true),
      StructField("lastName", StringType, nullable = true),
      StructField("gender", StringType, nullable = true),
      StructField("birthDate", TimestampType, nullable = true),
      StructField("ssn", StringType, nullable = true),
      StructField("salary", IntegerType, nullable = true)
    val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
    // Create the table if it does not exist. Otherwise, replace the existing table.
    df.writeTo("main.default.people_10m").createOrReplace()
    // If you know that the table doesn't exist, call this instead:
    // df.saveAsTable("main.default.people_10m")
    
    CREATE OR REPLACE TABLE main.default.people_10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    COPY INTO main.default.people_10m
    FROM '/Volumes/main/default/my-volume/export.csv'
    FILEFORMAT = CSV
    FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
    

    上述作業會建立新的Managed數據表。 如需建立 Delta 資料表時可用選項的相關信息,請參閱 CREATE TABLE

    在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 CREATE TABLE LIKE 建立新的空白 Delta 數據表,以複製來源 Delta 數據表的架構和數據表屬性。 這在將數據表從開發環境升級為生產環境時特別有用,如下列程式代碼範例所示:

    CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
    

    若要建立空的數據表,您也可以在 Delta Lake for DeltaTableBuilderScala 中使用 API。 相較於對等的 DataFrameWriter API,這些 API 可讓您更輕鬆地指定其他資訊,例如數據行批註、數據表屬性和 產生的數據行

    這項功能處於公開預覽狀態

    Python(程式語言)

    DeltaTable.createIfNotExists(spark)
      .tableName("main.default.people_10m")
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .execute()
    

    程式語言 Scala

    DeltaTable.createOrReplace(spark)
      .tableName("main.default.people_10m")
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn(
        DeltaTable.columnBuilder("lastName")
          .dataType("STRING")
          .comment("surname")
          .build())
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .execute()
                  向上插入數據表
    

    若要將一組更新和插入合併到現有的 Delta 數據表中,您可以使用 DeltaTable.merge 方法來 PythonScala,以及 SQL 的 MERGE INTO 語句。 例如,下列範例會從源數據表取得數據,並將它合併至目標 Delta 數據表。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。 這項作業稱為 upsert

    Python(程式語言)

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
    from datetime import date
    schema = StructType([
      StructField("id", IntegerType(), True),
      StructField("firstName", StringType(), True),
      StructField("middleName", StringType(), True),
      StructField("lastName", StringType(), True),
      StructField("gender", StringType(), True),
      StructField("birthDate", DateType(), True),
      StructField("ssn", StringType(), True),
      StructField("salary", IntegerType(), True)
    data = [
      (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
      (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
      (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
      (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
      (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
      (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
    people_10m_updates = spark.createDataFrame(data, schema)
    people_10m_updates.createTempView("people_10m_updates")
    # ...
    from delta.tables import DeltaTable
    deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
    (deltaTable.alias("people_10m")
      .merge(
        people_10m_updates.alias("people_10m_updates"),
        "people_10m.id = people_10m_updates.id")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute()
    

    程式語言 Scala

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import java.sql.Timestamp
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("firstName", StringType, nullable = true),
      StructField("middleName", StringType, nullable = true),
      StructField("lastName", StringType, nullable = true),
      StructField("gender", StringType, nullable = true),
      StructField("birthDate", TimestampType, nullable = true),
      StructField("ssn", StringType, nullable = true),
      StructField("salary", IntegerType, nullable = true)
    val data = Seq(
      Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
      Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
      Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
      Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
      Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
      Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
    val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    people_10m_updates.createOrReplaceTempView("people_10m_updates")
    // ...
    import io.delta.tables.DeltaTable
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.as("people_10m")
      .merge(
        people_10m_updates.as("people_10m_updates"),
        "people_10m.id = people_10m_updates.id"
      .whenMatched()
      .updateAll()
      .whenNotMatched()
      .insertAll()
      .execute()
    
    CREATE OR REPLACE TEMP VIEW people_10m_updates (
      id, firstName, middleName, lastName, gender, birthDate, ssn, salary
    ) AS VALUES
      (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
      (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
      (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
      (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
      (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
      (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
    MERGE INTO people_10m
    USING people_10m_updates
    ON people_10m.id = people_10m_updates.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    

    在 SQL 中,如果您指定 *,這會更新或插入目標數據表中的所有資料行,假設源數據表的數據行與目標數據表相同。 如果目標數據表沒有相同的數據行,查詢會擲回分析錯誤。

    當您執行插入作業時,您必須為資料表中的每個資料行指定值(例如,當現有數據集中沒有相符的數據列時)。 不過,您不需要更新所有值。

    若要查看結果,請查詢數據表。

    Python(程式語言)

    df = spark.read.table("main.default.people_10m")
    df_filtered = df.filter(df["id"] >= 9999998)
    display(df_filtered)
    

    程式語言 Scala

    val df = spark.read.table("main.default.people_10m")
    val df_filtered = df.filter($"id" >= 9999998)
    display(df_filtered)
    
    SELECT * FROM main.default.people_10m WHERE id >= 9999998
                  讀取數據表
    

    您可以依資料表名稱或資料表路徑存取 Delta 資料表中的數據,如下列範例所示:

    Python(程式語言)

    people_df = spark.read.table("main.default.people_10m")
    display(people_df)
    

    程式語言 Scala

    val people_df = spark.read.table("main.default.people_10m")
    display(people_df)
    
    SELECT * FROM main.default.people_10m;
                  寫入數據表
    

    Delta Lake 使用標準語法將數據寫入數據表。

    若要以不可部分完成的方式將新數據新增至現有的 Delta 數據表,請使用附加模式,如下列範例所示:

    Python(程式語言)

    df.write.mode("append").saveAsTable("main.default.people_10m")
    

    程式語言 Scala

    df.write.mode("append").saveAsTable("main.default.people_10m")
    
    INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
    

    若要取代數據表中的所有數據,請使用覆寫模式,如下列範例所示:

    Python(程式語言)

    df.write.mode("overwrite").saveAsTable("main.default.people_10m")
    

    程式語言 Scala

    df.write.mode("overwrite").saveAsTable("main.default.people_10m")
    
    INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
                  更新數據表
    

    您可以更新符合 Delta 數據表中述詞的數據。 例如,在範例people_10m數據表中,若要從 或變更 或 gender 資料行M中的F縮寫,MaleFemale您可以執行下列命令:

    Python(程式語言)

    from delta.tables import *
    from pyspark.sql.functions import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    # Declare the predicate by using a SQL-formatted string.
    deltaTable.update(
      condition = "gender = 'F'",
      set = { "gender": "'Female'" }
    # Declare the predicate by using Spark SQL functions.
    deltaTable.update(
      condition = col('gender') == 'M',
      set = { 'gender': lit('Male') }
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    // Declare the predicate by using a SQL-formatted string.
    deltaTable.updateExpr(
      "gender = 'F'",
      Map("gender" -> "'Female'")
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // Declare the predicate by using Spark SQL functions and implicits.
    deltaTable.update(
      col("gender") === "M",
      Map("gender" -> lit("Male")));
    
    UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
    UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
                  從資料表中刪除
    

    您可以從 Delta 數據表移除符合述詞的數據。 例如,在範例 people_10m 數據表中,若要刪除與資料行中 birthDate 值相對應的所有資料列 1955,您可以執行下列命令:

    Python(程式語言)

    from delta.tables import *
    from pyspark.sql.functions import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    # Declare the predicate by using a SQL-formatted string.
    deltaTable.delete("birthDate < '1955-01-01'")
    # Declare the predicate by using Spark SQL functions.
    deltaTable.delete(col('birthDate') < '1960-01-01')
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    // Declare the predicate by using a SQL-formatted string.
    deltaTable.delete("birthDate < '1955-01-01'")
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // Declare the predicate by using Spark SQL functions and implicits.
    deltaTable.delete(col("birthDate") < "1955-01-01")
    
    DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
    

    刪除會從最新版的 Delta 數據表中移除數據,但在明確清除舊版之前,不會從實體記憶體中移除它。 如需詳細資訊,請參閱 真空

    顯示數據表歷程記錄

    若要檢視數據表的歷程記錄,您可以使用 DeltaTable.historyScala 方法,以及 SQL 中的 DESCRIBE HISTORY 語句,其提供數據表版本、作業、使用者等,以針對每個寫入數據表提供證明資訊。

    Python(程式語言)

    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    display(deltaTable.history())
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    display(deltaTable.history())
    
    DESCRIBE HISTORY main.default.people_10m
                  查詢舊版的資料表 (時程移動)
    

    Delta Lake 時間移動可讓您查詢差異數據表的較舊快照集。

    若要查詢舊版的數據表,請指定數據表的版本或時間戳。 例如,若要從上述歷程記錄查詢第0版或時間戳 2024-05-15T22:43:15.000+00:00Z ,請使用下列專案:

    Python(程式語言)

    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaHistory = deltaTable.history()
    display(deltaHistory.where("version == 0"))
    # Or:
    display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    val deltaHistory = deltaTable.history()
    display(deltaHistory.where("version == 0"))
    // Or:
    display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
    
    SELECT * FROM main.default.people_10m VERSION AS OF 0
    -- Or:
    SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
    

    對於時間戳,只接受日期或時間戳字串,例如 "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15"

    DataFrameReader 選項可讓您從已修正為數據表特定版本或時間戳的 Delta 數據表建立 DataFrame,例如:

    Python(程式語言)

    df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
    # Or:
    df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
    display(df)
    

    程式語言 Scala

    val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
    // Or:
    val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
    display(df)
    
    SELECT * FROM main.default.people_10m VERSION AS OF 0
    -- Or:
    SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
    

    如需詳細資訊,請參閱 使用 Delta Lake 數據表歷程記錄

    優化數據表

    對數據表執行多個變更之後,您可能有許多小型檔案。 若要改善讀取查詢的速度,您可以使用優化作業將小型檔案折疊成較大的檔案:

    Python(程式語言)

    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.optimize().executeCompaction()
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.optimize().executeCompaction()
    
    OPTIMIZE main.default.people_10m
                  依數據行排序 Z 順序
    

    若要進一步改善讀取效能,您可以依 z 順序將同一組檔案中的相關信息共置。 Delta Lake 數據略過演算法會使用此組合來大幅減少需要讀取的數據量。 若為迭置順序數據,您可以指定要依作業以迭置順序排序的數據行。 例如,若要由 gender共置 ,請執行:

    Python(程式語言)

    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.optimize().executeZOrderBy("gender")
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.optimize().executeZOrderBy("gender")
    
    OPTIMIZE main.default.people_10m
    ZORDER BY (gender)
    

    如需執行優化作業時可用的一組完整選項,請參閱 優化數據檔配置

    使用清除快照集 VACUUM

    Delta Lake 提供讀取的快照集隔離,這表示即使其他使用者或作業正在查詢數據表,也能安全地執行優化作業。 不過,您最終應該清除舊的快照集。 您可以執行真空作業來執行這項操作:

    Python(程式語言)

    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.vacuum()
    

    程式語言 Scala

    import io.delta.tables._
    val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
    deltaTable.vacuum()
    
    VACUUM main.default.people_10m
    

    如需有效使用真空作業的詳細資訊,請參閱 使用真空移除未使用的數據檔。