适用于:✅Microsoft Fabric 中的数据工程和数据科学
Microsoft Fabric 中的自动表统计信息通过自动收集详细的表级指标来帮助 Spark 优化查询执行。 这些统计信息包括:
- 总行计数
- 每列的null计数
- 每个列的最小值和最大值
- 每列独特值计数
- 平均列长度和最大列长度
默认情况下,为 Fabric 中创建的每个 Delta 表的前 32 列(包括嵌套列)收集这些 扩展统计信息 。 收集的数据有助于 Spark 的基于成本的优化器(CBO)改进查询的规划,以便更好地进行连接、筛选器、聚合和分区修剪。
因此,工作负载可以看到更大的性能提升和减少计算资源的使用,所有这些都不需要手动 ANALYZE TABLE 运行或复杂的配置。
主要优势
本部分总结了自动化表统计信息对工作负荷很重要的原因。
- 大约 45% 复杂查询的性能更快
- 在新 Delta 表上自动启用
- 改进查询规划和降低计算成本
- 支持不同值计数、最小值/最大值、空值计数和列长度指标
- 以 Parquet 格式存储以避免增加数据文件体积
工作原理
下面是 Fabric Spark 收集统计信息时幕后发生的情况:
- 行计数
- 每列的 Null 计数
- 每个列的最小值和最大值
- 每列的唯一值计数
- 平均列长度和最大列长度
这些指标有助于 Spark 更智能地决定如何执行查询 — 改进联接策略、分区修剪和聚合性能。
如何启用或禁用
了解如何使用 Spark 会话配置或表属性控制自动统计信息收集。
会话配置
可以在会话级别启用或禁用扩展统计信息收集和优化器注入。
启用扩展统计信息收集:
spark.conf.set("spark.microsoft.delta.stats.collect.extended", "true")
禁用扩展统计信息收集:
spark.conf.set("spark.microsoft.delta.stats.collect.extended", "false")
在查询优化器中启用统计信息注入:
spark.conf.set("spark.microsoft.delta.stats.injection.enabled", "true")
禁用统计信息注入:
spark.conf.set("spark.microsoft.delta.stats.injection.enabled", "false")
注释
还必须启用增量日志统计信息收集(spark.databricks.delta.stats.collect默认值:true)。
表属性(覆盖会话配置)
使用表属性可以控制单个表的统计信息收集,替代会话设置。
对表启用:
ALTER TABLE tableName SET TBLPROPERTIES('delta.stats.extended.collect' = true, 'delta.stats.extended.inject' = true)
在表上禁用
ALTER TABLE tableName SET TBLPROPERTIES('delta.stats.extended.collect' = false, 'delta.stats.extended.inject' = false)
在创建时禁用表属性的自动设置:
spark.conf.set("spark.microsoft.delta.stats.collect.extended.property.setAtTableCreation", "false")
如何检查统计信息
可以使用 Spark 的 API 检查收集的表和列统计信息,这对于调试或验证非常有用。
检查行计数和表大小(Scala 示例):
println(spark.read.table("tableName").queryExecution.optimizedPlan.stats)
检查详细列统计信息:
spark.read.table("tableName").queryExecution.optimizedPlan.stats.attributeStats.foreach { case (attrName, colStat) =>
println(s"colName: $attrName distinctCount: ${colStat.distinctCount} min: ${colStat.min} max: ${colStat.max} nullCount: ${colStat.nullCount} avgLen: ${colStat.avgLen} maxLen: ${colStat.maxLen}")
}
重新计算统计数据
有时统计信息可能会过时或部分,例如架构更改或部分更新之后。 可以使用这些方法重新计算统计信息。
重写表(注意:这会重置历史记录):
spark.read.table("targetTable").write.partitionBy("partCol").mode("overwrite").saveAsTable("targetTable")
建议的方法(Fabric Spark >= 3.2.0.19):
StatisticsStore.recomputeStatisticsWithCompaction(spark, "testTable1")
如果架构发生更改(例如添加/删除列),则需要在重新计算之前删除旧统计信息:
StatisticsStore.removeStatisticsData(spark, "testTable1")
StatisticsStore.recomputeStatisticsWithCompaction(spark, "testTable1")
使用 ANALYZE 命令
此命令 ANALYZE TABLE 提供了一种手动方法,用于收集所有列(类似于开源 Spark)的统计信息。
运行以下命令:
ANALYZE TABLE tableName COMPUTE STATISTICS FOR ALL COLUMNS
启用目录统计信息注入:
spark.conf.set("spark.microsoft.delta.stats.injection.catalog.enabled", "true")
禁用目录统计信息注入:
spark.conf.unset("spark.microsoft.delta.stats.injection.catalog.enabled")
局限性
请务必了解 Fabric 自动化统计信息的当前限制,以便可以相应地进行规划。
- 仅在写入时收集的统计信息
- 不会聚合 来自其他引擎 的更新或更改
- 仅包括前 32 列 (包含嵌套列在内)
- 删除或更新 可能会使统计信息过时
- 无需重写表或使用 API 即可重新计算
- 不可进行嵌套列的统计信息注入
- 不会导致性能回退 →统计信息偶尔会导致回归
-
ANALYZE TABLE仅适用于FOR ALL COLUMNS - 列排序或配置更改需要完全重写才能刷新统计信息