在 Fabric Spark 中配置和管理自动表统计信息

适用于:✅Microsoft Fabric 中的数据工程和数据科学

Microsoft Fabric 中的自动表统计信息通过自动收集详细的表级指标来帮助 Spark 优化查询执行。 这些统计信息包括:

  • 总行计数
  • 每列的null计数
  • 每个列的最小值和最大值
  • 每列独特值计数
  • 平均列长度和最大列长度

默认情况下,为 Fabric 中创建的每个 Delta 表的前 32 列(包括嵌套列)收集这些 扩展统计信息 。 收集的数据有助于 Spark 的基于成本的优化器(CBO)改进查询的规划,以便更好地进行连接、筛选器、聚合和分区修剪。

因此,工作负载可以看到更大的性能提升和减少计算资源的使用,所有这些都不需要手动 ANALYZE TABLE 运行或复杂的配置。

主要优势

本部分总结了自动化表统计信息对工作负荷很重要的原因。

  • 大约 45% 复杂查询的性能更快
  • 在新 Delta 表上自动启用
  • 改进查询规划和降低计算成本
  • 支持不同值计数、最小值/最大值、空值计数和列长度指标
  • 以 Parquet 格式存储以避免增加数据文件体积

工作原理

下面是 Fabric Spark 收集统计信息时幕后发生的情况:

  • 行计数
  • 每列的 Null 计数
  • 每个列的最小值和最大值
  • 每列的唯一值计数
  • 平均列长度和最大列长度

这些指标有助于 Spark 更智能地决定如何执行查询 — 改进联接策略、分区修剪和聚合性能。

如何启用或禁用

了解如何使用 Spark 会话配置或表属性控制自动统计信息收集。

会话配置

可以在会话级别启用或禁用扩展统计信息收集和优化器注入。

启用扩展统计信息收集:

spark.conf.set("spark.microsoft.delta.stats.collect.extended", "true")

禁用扩展统计信息收集:

spark.conf.set("spark.microsoft.delta.stats.collect.extended", "false")

在查询优化器中启用统计信息注入:

spark.conf.set("spark.microsoft.delta.stats.injection.enabled", "true")

禁用统计信息注入:

spark.conf.set("spark.microsoft.delta.stats.injection.enabled", "false")

注释

还必须启用增量日志统计信息收集(spark.databricks.delta.stats.collect默认值:true)。

表属性(覆盖会话配置)

使用表属性可以控制单个表的统计信息收集,替代会话设置。

对表启用:

ALTER TABLE tableName SET TBLPROPERTIES('delta.stats.extended.collect' = true, 'delta.stats.extended.inject' = true)

在表上禁用

ALTER TABLE tableName SET TBLPROPERTIES('delta.stats.extended.collect' = false, 'delta.stats.extended.inject' = false)

在创建时禁用表属性的自动设置:

spark.conf.set("spark.microsoft.delta.stats.collect.extended.property.setAtTableCreation", "false")

如何检查统计信息

可以使用 Spark 的 API 检查收集的表和列统计信息,这对于调试或验证非常有用。

检查行计数和表大小(Scala 示例):

println(spark.read.table("tableName").queryExecution.optimizedPlan.stats)

检查详细列统计信息:

spark.read.table("tableName").queryExecution.optimizedPlan.stats.attributeStats.foreach { case (attrName, colStat) =>
println(s"colName: $attrName distinctCount: ${colStat.distinctCount} min: ${colStat.min} max: ${colStat.max} nullCount: ${colStat.nullCount} avgLen: ${colStat.avgLen} maxLen: ${colStat.maxLen}")
    }

重新计算统计数据

有时统计信息可能会过时或部分,例如架构更改或部分更新之后。 可以使用这些方法重新计算统计信息。

重写表(注意:这会重置历史记录):

spark.read.table("targetTable").write.partitionBy("partCol").mode("overwrite").saveAsTable("targetTable")

建议的方法(Fabric Spark >= 3.2.0.19):

StatisticsStore.recomputeStatisticsWithCompaction(spark, "testTable1")

如果架构发生更改(例如添加/删除列),则需要在重新计算之前删除旧统计信息:

StatisticsStore.removeStatisticsData(spark, "testTable1")
StatisticsStore.recomputeStatisticsWithCompaction(spark, "testTable1")

使用 ANALYZE 命令

此命令 ANALYZE TABLE 提供了一种手动方法,用于收集所有列(类似于开源 Spark)的统计信息。

运行以下命令:

ANALYZE TABLE tableName COMPUTE STATISTICS FOR ALL COLUMNS

启用目录统计信息注入:

spark.conf.set("spark.microsoft.delta.stats.injection.catalog.enabled", "true")

禁用目录统计信息注入:

spark.conf.unset("spark.microsoft.delta.stats.injection.catalog.enabled")

局限性

请务必了解 Fabric 自动化统计信息的当前限制,以便可以相应地进行规划。

  • 仅在写入时收集的统计信息
  • 不会聚合 来自其他引擎 的更新或更改
  • 仅包括前 32 列 (包含嵌套列在内)
  • 删除或更新 可能会使统计信息过时
  • 无需重写表或使用 API 即可重新计算
  • 不可进行嵌套列的统计信息注入
  • 不会导致性能回退 →统计信息偶尔会导致回归
  • ANALYZE TABLE 仅适用于 FOR ALL COLUMNS
  • 列排序或配置更改需要完全重写才能刷新统计信息