此功能在 Databricks Runtime 15.4 LTS 及更高版本中处于公共预览阶段。
启用了类型扩展的表允许将列的数据类型更改为更宽泛的类型,而无需重写基础数据文件。 你可以手动更改列类型,也可以使用架构演变来改进列类型。
Databricks Runtime 15.4 LTS 及更高版本中支持类型扩展。 启用了加宽类型的表只能在 Databricks Runtime 15.4 LTS 及更高版本中读取。
类型扩展需要 Delta Lake。 所有 Unity Catalog 托管表都默认使用 Delta Lake。
支持的类型更改
可以根据以下规则扩展类型:
支持的更广泛类型
若要避免意外将整数值提升为十进制值,必须
手动提交
类型更改,将
byte
、
short
、
int
或
long
变为
decimal
或
double
。 将整数类型提升为
decimal
或
double
时,如果任何下游引入将此值写回整数列,Spark 将默认截断值的小数部分。
将任何数值类型更改为
decimal
时,总精度必须等于或大于起始精度。 如果还增加规模,总体精度也必须相应增加。
byte
、
short
和
int
类型的最小目标为
decimal(10,0)
。
long
的最低目标是
decimal(20,0)
。
如果要将两个小数位数添加到具有
decimal(10,1)
的字段,则最小目标为
decimal(12,3)
。
顶层列和嵌套在结构、映射和数组中的字段支持类型更改。
启用类型扩展
你可以通过将
delta.enableTypeWidening
表属性设置为
true
,在现有表上启用类型扩展:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
你还可以在创建表期间启用类型扩展:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
启用类型扩展时,它将设置表功能 typeWidening
,这将升级读取器和编写器协议。 必须使用 Databricks Runtime 15.4 或更高版本才能与已启用类型扩大的表进行交互。 如果外部客户端也与表交互,请验证它们是否支持此表功能。 请参阅 Delta Lake 功能兼容性和协议。
手动应用类型更改
使用 ALTER COLUMN
命令手动更改类型:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
此操作可在不重写基础数据文件的情况下更新表架构。
使用自动架构演变扩展类型
架构演变适用于类型扩展以更新目标表中的数据类型以匹配传入数据的类型。
如果未启用类型扩大,架构演变始终会尝试向下转换数据以匹配目标表中的列类型。 如果不希望自动扩展目标表中的数据类型,请在启用架构演变的情况下在运行工作负荷之前禁用类型扩展。
若要在引入期间使用架构演变来扩大列的数据类型,必须满足以下条件:
写入命令在启用自动架构演变的情况下运行。
目标表已启用类型扩展。
源列类型比目标列类型宽泛。
类型扩展支持类型更改。
类型更改不是byte
、short
、int
或long
变为decimal
或double
中的一种。 这些类型更改只能手动 ALTER TABLE 应用,以避免意外将整数提升为小数。
不满足所有这些条件的类型不匹配遵循正常的架构强制规则。 请参阅架构强制。
禁用类型扩展表功能
通过将属性设置为 false
,可以阻止启用的表上发生意外类型扩展:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
此设置可防止将来对表进行类型更改,但不会删除类型扩展表功能或撤消已更改的类型。
如果需要完全删除类型扩展表功能,可以使用 DROP FEATURE
命令,如以下示例所示:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
使用 Databricks Runtime 15.4 LTS 启用了类型扩大的数据库表,需要转而删除功能 typeWidening-preview
。
当删除类型扩展时,所有不符合当前表架构的数据文件都会被重写。 请参阅删除 Delta Lake 表功能并降级表协议。
从 Delta 表进行流式传输
在 Databricks Runtime 16.3 及更高版本中,支持结构化流处理的类型扩展。
从 Delta 表流式传输时,类型更改被视为非累加架构更改,类似于重命名或删除具有 列映射的列。
可以提供一个用于架构跟踪的位置,以便在应用类型更改后实现对 Delta Lake 表的流式处理。
针对数据源的每个流式读取都必须指定自己的 schemaTrackingLocation
。 指定的schemaTrackingLocation
必须包含在为目标表checkpointLocation
流式写入指定的目录中。
对于合并来自多个源 Delta 表的数据的流式处理工作负荷,必须在每个源表中指定唯一目录 checkpointLocation
。
选项 schemaTrackingLocation
用于指定架构跟踪的路径,如以下代码示例所示:
Python语言
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
Scala(编程语言)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
提供架构跟踪位置后,只要检测到类型更改,流就会演变其跟踪的架构,然后停止。 此时,可以采取任何必要作来处理类型更改,例如在下游表上启用类型扩大或更新流式处理查询。
若要恢复处理,请设置 Spark 配置 spark.databricks.delta.streaming.allowSourceColumnTypeChange
或 DataFrame 读取器选项 allowSourceColumnTypeChange
:
Python语言
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
Scala(编程语言)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
流停止时,检查点 ID <checkpoint_id>
和 Delta Lake 源表版本 <delta_source_table_version>
会显示在错误消息中。
Delta共享
Databricks Runtime 16.1及更高版本支持Type Widening在Delta Sharing中的应用。
在 Databricks 的 Delta Sharing 中,支持共享启用了类型扩展的 Delta Lake 表。 提供程序和收件人必须位于 Databricks Runtime 16.1 或更高版本上。
要使用 Delta Sharing 从启用了类型扩展的 Delta Lake 表中读取变更数据馈送,必须将响应格式设置为 delta
:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
不支持在类型更改过程中读取更改数据流。 您必须将操作拆分为两个单独的读取,一个在包含类型更改的表版本结束,另一个从包含类型更改的表版本开始。
Apache Iceberg 兼容性
Apache Iceberg 不支持类型扩大涵盖的所有类型更改,请参阅 Iceberg 架构演变。 具体而言,Azure Databricks 不支持以下类型更改:
byte
、short
、int
、long
到 decimal
或 double
小数位数增加
date
至 timestampNTZ
在 Delta Lake 表上启用 UniForm 与 Iceberg 兼容性 时,应用其中一种类型更改会导致错误。
如果将其中一个不受支持的类型更改应用到 Delta Lake 表,则对表启用 Uniform 与 Iceberg 兼容性 会导致错误。
若要解决此错误,必须 删除类型扩大表功能。
不支持从具有类型更改的 Delta Lake 表流式传输时使用 SQL 提供架构跟踪位置。
不支持使用启用类型扩展的 Delta Sharing 与非 Databricks 消费者共享表。