本文包含 Python 使用者定義函式 (UDF) 範例。 它示範如何註冊 UDF、如何叫用 UDF,並提供 Spark SQL 中子運算式評估順序的注意事項。
在 Databricks Runtime 14.0 和更新版本中,您可以使用 Python 使用者定義數據表函式 (UDF) 來註冊傳回整個關聯而非純量值的函式。 請參閱
Python 使用者定義資料表函式 (UDF)
。
在 Databricks Runtime 12.2 LTS 和以下版本中,使用標準存取模式的 Unity 目錄計算不支援 Python UDF 和 Pandas UDF。 Databricks Runtime 13.3 LTS 和更新版本支援純量 Python UDF 和 Pandas UDF,適用於所有存取模式。
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 SQL 語法向 Unity 目錄註冊純量 Python UDF。 請參閱 Unity 目錄中
使用者定義函式 (UDF)
。
將函式註冊為 UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
您可以選擇性地設定 UDF 的傳回類型。 預設傳回型態為 StringType。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
在 Spark SQL 中呼叫 UDF
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
使用 UDF 搭配 DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
或者,您可以使用註釋語法來宣告相同的 UDF:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
計算順序與空值檢查
Spark SQL(包括 SQL 和 DataFrame 和數據集 API)不保證子表達式的評估順序。 特別是,運算子或函式的輸入不一定以左至右或任何其他固定順序進行評估。 例如,邏輯 AND 和 OR 運算式沒有由左至右的「短路」語法。
因此,依賴布林運算式評估的副作用或順序,以及 WHERE 和 HAVING 子句的順序是危險的做法,因為這類運算式和子句可以在查詢最佳化和規劃期間重新排序。 具體來說,如果 UDF 依賴 SQL 中的短路語意進行空值檢查,則無法保證在調用 UDF 之前會進行空值檢查。 例如,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
此 WHERE 子句不保證在過濾掉 Null 值之後會叫用 strlen UDF。
若要執行適當的 Null 檢查,建議您執行下列其中一項:
將 UDF 本身設計為具備 Null 感知能力,並在 UDF 本身內執行 Null 檢查。
使用 IF 或 CASE WHEN 運算式執行 Null 檢查,並在條件分支中叫用 UDF
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
純量 Python UDF 中的服務認證
純量 Python UDF 可以使用 Unity 目錄服務認證,安全地存取外部雲端服務。 這適用於將雲端式令牌化、加密或秘密管理等作業直接整合到數據轉換中。
純量 Python UDF 的服務認證僅支援 SQL 倉儲和一般計算。
若要建立服務認證,請參閱 建立服務認證。
若要存取服務認證,請使用 databricks.service_credentials.getServiceCredentialsProvider() UDF邏輯中的公用程式,以適當的認證初始化雲端 SDK。 所有程式代碼都必須封裝在UDF主體中。
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
服務憑證權限
UDF 的建立者必須具有 Unity 目錄服務認證的 ACCESS 許可權。
在 No-PE 範圍中執行的UDF,也稱為專用叢集,需要服務認證的MANAGE許可權。
在純量 Python UDF 中使用時,Databricks 會自動使用來自計算環境變數的預設服務認證。 此行為可讓您安全地參考外部服務,而不需在 UDF 程式代碼中明確管理認證別名。 請參閱 指定計算資源的預設服務認證
默認認證支援僅適用於標準和專用存取模式叢集。 它無法在 DBSQL 中使用。
您必須安裝azure-identity套件才能使用DefaultAzureCredential提供者。 若要安裝套件,請參閱 筆記本範圍的 Python 連結庫 或 計算範圍連結庫。
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
獲取任務執行上下文
使用 TaskContext PySpark API 來獲取上下文信息,如使用者身份、叢集標籤、Spark 工作 ID 等。 請參閱在 UDF 中取得任務內容。
下列限制適用於 PySpark UDF:
檔案存取限制: 在 Databricks Runtime 14.2 和以下版本上,共用叢集上的 PySpark UDF 無法存取 Git 資料夾、工作區檔案或 Unity 目錄磁碟區。
廣播變數: 標準存取模式叢集和無伺服器計算上的 PySpark UDF 不支持廣播變數。
服務認證: 服務認證僅適用於 Batch Unity 目錄 Python UDF 和純量 Python UDF。 標準 Unity 目錄 Python UDF 不支持它們。
服務認證:無伺服器或專用計算不支援服務認證。
無伺服器記憶體限制:無伺服器計算上的 PySpark UDF 每個 PySpark UDF 的記憶體限制為 1GB。 超過此限制會導致UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS 類型的錯誤。
標準存取模式的記憶體限制:標準存取模式上的 PySpark UDF 會根據所選實例類型的可用記憶體,具有記憶體限制。 超過可用的記憶體會導致類型為 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT 的錯誤。