添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

本文包含 Python 使用者定義函式 (UDF) 範例。 它示範如何註冊 UDF、如何叫用 UDF,並提供 Spark SQL 中子運算式評估順序的注意事項。

在 Databricks Runtime 14.0 和更新版本中,您可以使用 Python 使用者定義數據表函式 (UDF) 來註冊傳回整個關聯而非純量值的函式。 請參閱 Python 使用者定義資料表函式 (UDF)

在 Databricks Runtime 12.2 LTS 和以下版本中,使用標準存取模式的 Unity 目錄計算不支援 Python UDF 和 Pandas UDF。 Databricks Runtime 13.3 LTS 和更新版本支援純量 Python UDF 和 Pandas UDF,適用於所有存取模式。

在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 SQL 語法向 Unity 目錄註冊純量 Python UDF。 請參閱 Unity 目錄中 使用者定義函式 (UDF)

將函式註冊為 UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

您可以選擇性地設定 UDF 的傳回類型。 預設傳回型態為 StringType

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

在 Spark SQL 中呼叫 UDF

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
               使用 UDF 搭配 DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

或者,您可以使用註釋語法來宣告相同的 UDF:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

計算順序與空值檢查

Spark SQL(包括 SQL 和 DataFrame 和數據集 API)不保證子表達式的評估順序。 特別是,運算子或函式的輸入不一定以左至右或任何其他固定順序進行評估。 例如,邏輯 ANDOR 運算式沒有由左至右的「短路」語法。

因此,依賴布林運算式評估的副作用或順序,以及 WHEREHAVING 子句的順序是危險的做法,因為這類運算式和子句可以在查詢最佳化和規劃期間重新排序。 具體來說,如果 UDF 依賴 SQL 中的短路語意進行空值檢查,則無法保證在調用 UDF 之前會進行空值檢查。 例如,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

WHERE 子句不保證在過濾掉 Null 值之後會叫用 strlen UDF。

若要執行適當的 Null 檢查,建議您執行下列其中一項:

  • 將 UDF 本身設計為具備 Null 感知能力,並在 UDF 本身內執行 Null 檢查。
  • 使用 IFCASE WHEN 運算式執行 Null 檢查,並在條件分支中叫用 UDF
  • spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
    spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
    spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok
    

    純量 Python UDF 中的服務認證

    純量 Python UDF 可以使用 Unity 目錄服務認證,安全地存取外部雲端服務。 這適用於將雲端式令牌化、加密或秘密管理等作業直接整合到數據轉換中。

    純量 Python UDF 的服務認證僅支援 SQL 倉儲和一般計算。

    若要建立服務認證,請參閱 建立服務認證

    若要存取服務認證,請使用 databricks.service_credentials.getServiceCredentialsProvider() UDF邏輯中的公用程式,以適當的認證初始化雲端 SDK。 所有程式代碼都必須封裝在UDF主體中。

    def use_service_credential(): from azure.mgmt.web import WebSiteManagementClient # Assuming there is a service credential named 'testcred' set up in Unity Catalog web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred')) # Use web_client to perform operations

    服務憑證權限

    UDF 的建立者必須具有 Unity 目錄服務認證的 ACCESS 許可權。

    在 No-PE 範圍中執行的UDF,也稱為專用叢集,需要服務認證的MANAGE許可權。

    在純量 Python UDF 中使用時,Databricks 會自動使用來自計算環境變數的預設服務認證。 此行為可讓您安全地參考外部服務,而不需在 UDF 程式代碼中明確管理認證別名。 請參閱 指定計算資源的預設服務認證

    默認認證支援僅適用於標準和專用存取模式叢集。 它無法在 DBSQL 中使用。

    您必須安裝azure-identity套件才能使用DefaultAzureCredential提供者。 若要安裝套件,請參閱 筆記本範圍的 Python 連結庫計算範圍連結庫

    def use_service_credential(): from azure.identity import DefaultAzureCredential from azure.mgmt.web import WebSiteManagementClient # DefaultAzureCredential is automatically using the default service credential for the compute web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id) # Use web_client to perform operations

    獲取任務執行上下文

    使用 TaskContext PySpark API 來獲取上下文信息,如使用者身份、叢集標籤、Spark 工作 ID 等。 請參閱在 UDF 中取得任務內容

    下列限制適用於 PySpark UDF:

    檔案存取限制: 在 Databricks Runtime 14.2 和以下版本上,共用叢集上的 PySpark UDF 無法存取 Git 資料夾、工作區檔案或 Unity 目錄磁碟區。

    廣播變數: 標準存取模式叢集和無伺服器計算上的 PySpark UDF 不支持廣播變數。

    服務認證: 服務認證僅適用於 Batch Unity 目錄 Python UDF 和純量 Python UDF。 標準 Unity 目錄 Python UDF 不支持它們。

    服務認證:無伺服器或專用計算不支援服務認證。

    無伺服器記憶體限制:無伺服器計算上的 PySpark UDF 每個 PySpark UDF 的記憶體限制為 1GB。 超過此限制會導致UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS 類型的錯誤。 標準存取模式的記憶體限制:標準存取模式上的 PySpark UDF 會根據所選實例類型的可用記憶體,具有記憶體限制。 超過可用的記憶體會導致類型為 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT 的錯誤。