了解 Direct Lake 查询性能

除了语义模型设计和查询复杂性外,Direct Lake 性能还特别依赖于优化良好的 Delta 表,以实现高效且快速的列加载(转码)和最佳查询执行。 请确保应用 V 顺序优化。 此外,请保持 Parquet 文件的数量较少,使用大型行组,并努力尽量减少数据更新对 Delta 日志的影响。 这些常见的最佳实践有助于确保在冷、半暖、暖和热状态的 Direct Lake 模式下快速执行查询。

本文介绍 Direct Lake 性能如何取决于 Delta 表运行状况和高效的数据更新。 了解这些依赖项至关重要。 你了解 Parquet 文件中的数据布局对于查询性能非常重要,就像良好的语义模型设计和优化数据分析表达式 (DAX) 度量值一样重要。

需要了解的内容

本文假设你已熟悉以下概念:

  • OneLake 中的 Delta 表:有关 Delta 表的详细信息,请参阅 Microsoft Fabric 基础知识文档
  • Parquet 文件、行组和 Delta 日志:Parquet 文件格式(包括如何将数据组织到行组和列区块中,以及如何处理元数据)在 Parquet 文件格式文档中介绍。 另请参阅 Delta 事务日志协议文档
  • Delta 表优化和 V 顺序:请参阅 Fabric Lakehouse 文档,了解 Delta 表维护,例如 Delta Lake 表优化和 V 顺序
  • 建模和转码:刷新 Direct Lake 语义模型(建模)和按需列数据加载(转码)是重要的概念,在 Direct Lake 概述中介绍。
  • 公式和存储引擎:执行 DAX 查询时,公式引擎将生成查询计划,并从存储引擎检索必要的数据和初始聚合。 本文中讨论的优化侧重于存储引擎。 若要了解有关公式引擎和存储引擎的详细信息,请浏览 Analysis Services 开发人员文档
  • VertiPaq 和 VertiScan:在导入模式和 Direct Lake 模式下,存储引擎使用其 VertiPaq 引擎来维护内存中列存储。 VertiScan 使公式引擎能够与 VertiPaq 交互。 有关详细信息,请参阅 Analysis Services 开发人员文档
  • 字典编码:Parquet 文件和 VertiPaq 都使用字典编码,这是应用于各种数据类型的各个列(如 int、long、date 和 char)的数据压缩技术。 它的工作原理是使用运行长度编码(RLE)/Bit-Packing 混合编码将每个唯一列值存储为整数。 VertiPaq 始终使用字典编码,但 Parquet 在某些情况下可能会切换到纯编码或增量编码,如 Parquet File-Format 文档中的编码中所述,这将要求 Direct Lake 重新编码数据,对转码性能产生相应影响。
  • 列块和列段:请参阅 Parquet 和 VertiPaq 存储列数据的方式,以便高效检索数据。 表中的每一列划分为可以独立处理和压缩的较小区块。 VertiPaq 称这些区块为段。 可以使用 DISCOVER_STORAGE_TABLE_COLUMN_SEGMENTS架构行集 来检索有关 Direct Lake 语义模型中的列段的信息。
  • Python 和 Jupyter Notebook:Jupyter Notebook 提供了用于编写和运行 Python 代码的交互式环境。 如果想要遵循本章后面的代码片段,则 Python 的基础知识非常有用。 有关详细信息,请参阅 Python 语言参考。 有关如何在 Microsoft Fabric 中使用笔记本的信息,请参阅 如何使用笔记本 - Microsoft Fabric

影响 Direct Lake 查询性能的内容

本部分总结了影响 Direct Lake 性能的主要因素。 后续部分提供了更详细的说明:

  • V 顺序压缩:压缩的有效性可能会影响查询性能,因为更好的压缩会导致数据加载更快,查询处理效率更高。 数据加载速度很快,因为流式处理压缩数据可以提高转码效率。 查询性能也是最佳的,因为 V 顺序压缩使 VertiScan 能够直接在压缩数据的基础上计算结果,从而跳过解压缩步骤。
  • 数据类型:对列使用适当的数据类型可以提高压缩和性能。 例如,尽可能使用整数数据类型而不是字符串,并避免将整数存储为字符串。
  • 段大小和计数:VertiPaq 将列数据存储在段中。 大量较小的段可能会对查询性能产生负面影响。 对于大型表,Direct Lake 首选大型段大小,例如 100 万到 1600 万行。
  • 列基数:高基数列(具有许多唯一值的列)可能会降低性能。 尽可能地减少基数将有所帮助。
  • 索引和聚合:具有较低基数的列受益于字典编码,这可以通过减少需要扫描的数据量来加快查询速度。
  • DirectQuery 回退:回退操作可能会导致查询性能降低,因为数据现在必须从 Fabric 数据源中的 SQL 分析终结点提取。 此外,即使 Direct Lake 不需要回退,回退机制仍然依赖混合查询计划来同时支持 DirectQuery 和 VertiScan,这可能会导致性能上的一些折中。 如果可能,请禁用 DirectQuery 回退以避免混合查询计划。
  • 内存驻留程度:Direct Lake 语义模型可以处于冷、半暖、暖或热状态,性能从冷到热逐步提高。 快速从冷向暖转换是良好的 Direct Lake 性能的关键。
    • :VertiPaq 存储为空。 必须从 Delta 表加载应答 DAX 查询所需的所有数据。
    • Semiwarm:Direct Lake 仅在数据框架涉及已删除的行组时删除相应的列段。 这意味着必须仅加载更新或新添加的数据。 Direct Lake 语义模型在内存压力下,当必须卸载段和联接索引时,也可能因内存压力而进入半温状态。
    • 温暖状态:用以回答 DAX 查询的列数据已完全加载到内存中。
    • :列数据已完全加载到内存中,VertiScan 缓存已填充,并且 DAX 查询已命中缓存。
  • 内存压力:Direct Lake 必须将响应 DAX 查询所需的所有列数据加载到内存中,这可能会耗尽可用的内存资源。 内存不足时,Direct Lake 必须卸载以前加载的列数据,然后 Direct Lake 可能需要再次重新加载该数据才能进行后续 DAX 查询。 适当调整 Direct Lake 语义模型的大小有助于避免频繁重新加载。

内存驻留和查询性能

Direct Lake 在暖或热状态中表现最佳,而冷状态会导致性能降低。 Direct Lake 通过使用增量框架,尽量避免回退到冷数据状态。

启动

初始语义模型加载后,尚未将列数据驻留在内存中。 Direct Lake 很冷。 当客户端以冷状态将 DAX 查询提交到 Direct Lake 语义模型时,Direct Lake 必须执行以下主要任务,以便处理和回答 DAX 查询:

  • VertiPaq 字典转码。 Direct Lake 必须合并每个列区块的本地 Parquet 字典,才能为该列创建全局 VertiPaq 字典。 此 合并 作会影响查询响应时间。

  • 将 Parquet 列区块加载到列段。 在大多数情况下,当双方都可以使用 RLE/Bit-Packing 混合编码时,会直接将 Parquet 数据 ID 映射到 VertiPaq ID。 如果 Parquet 字典使用纯编码,VertiPaq 必须将值转换为 RLE/Bit-Packing 混合编码,这需要更长的时间。

    • Direct Lake 性能在 V 排序的 Parquet 文件中是最佳的,因为 V 排序提高了 RLE 压缩的质量。 Direct Lake 可以比压缩程度较低的数据更快加载紧密打包的 V 排序数据。
  • 生成联接索引。 如果 DAX 查询访问来自多个表的列,Direct Lake 必须根据表关系生成联接索引,以便 VertiScan 能够正确联接表。 若要生成联接索引,Direct Lake 必须加载参与关系的键列的字典和主键列的列段(即表关系中“一”端的列)。

  • 应用增量删除向量。 如果源 Delta 表使用删除向量,Direct Lake 必须加载这些删除向量,以确保从查询处理中排除已删除的数据。

    注意

    可以通过先发送processClear,然后发送processFull XMLA 命令到模型,以引发冷状态。 该 ProcessClear 命令删除所有数据以及与指定 Delta 表版本的关联。 ProcessFull XMLA 命令执行构建框架,将模型绑定到最新的可用 Delta 提交版本。

增量构帧

在框架期间,Direct Lake 分析每个 Delta 表的 Delta 日志,仅当基础数据发生更改时,才会删除加载的列段和联接索引。 保留字典以避免不必要的转码,新值只是添加到现有字典中。 这种增量框架方法可降低重载负担,并有利于冷查询性能。

可以使用封装架构行集的 INFO.STORAGETABLECOLUMNSEGMENTS() DAX 函数来分析增量框架的有效性。 按照以下步骤确保有意义的结果:

  1. 查询 Direct Lake 语义模型,以确保其处于温态或热态。
  2. 更新要调查的 Delta 表并刷新 Direct Lake 语义模型以执行框架。
  3. 运行 DAX 查询以使用 INFO.STORAGETABLECOLUMNSEGMENTS() 函数检索列段信息,如以下屏幕截图所示。 屏幕截图使用小型示例表进行说明。 每个列只有一个段。 段不驻留在内存中。 这表示真正的冷状态。 如果模型在框架处理前已预热,这意味着 Delta 表是使用破坏性数据加载模式更新的,因此无法使用增量式框架处理。 本文稍后将介绍 Delta 表更新模式。

屏幕截图显示了在 Direct Lake 语义模型中使用 INFO.STORAGETABLECOLUMNSEGMENTS 的 DAX 查询结果,突出显示列段驻留情况。

注意

当 Delta 表未收到更新时,对于已驻留在内存中的列,无需重新加载。 使用无损更新模式时,由于增量框架本质上使得 Direct Lake 能够就地更新现有内存中数据的大部分,因而查询在处理后显示的性能影响要小得多。

完整内存驻留

加载字典、列段和联接索引后,Direct Lake 将达到暖状态,查询性能与导入模式相同。 在这两种模式下,列段的数量和大小在优化查询性能方面起着重要作用。

Delta表差异

Parquet 文件按列而不是行整理数据。 Direct Lake 还按列组织数据。 这种协调有助于实现无缝集成,但仍存在一些重要差异,特别是涉及行组和字典。

行组与列段

Parquet 文件中的行组由列区块组成,每个区块包含特定列的数据。 另一方面,语义模型中的列段还包含列数据的区块。

Delta 表的总行组计数与相应语义模型表的每一列的段计数之间存在直接关系。 例如,如果所有当前 Parquet 文件中的 Delta 表总共有三个行组,则相应的语义模型表每个列有三个段,如下图所示。 换句话说,如果 Delta 表具有大量小型行组,则相应的语义模型表也将具有大量小型列段。 这会对查询性能产生负面影响。

显示 Direct Lake 中 Delta 表行组与语义模型列段之间的关系的关系图。

注意

由于 Direct Lake 首选大型列段,因此源 Delta 表的行组应该尽可能大。

本地字典与全局字典

Delta 表的总行组计数也直接影响字典转码性能,因为 Parquet 文件使用本地字典,而 Direct Lake 语义模型对每列使用全局字典,如下图所示。 行组数越多,Direct Lake 需要合并的本地字典数也越多,以创建全局字典,因此转码完成所需的时间会更长。

图表展示将本地 Parquet 字典合并到全局字典用于 Direct Lake 语义模型的过程。

Delta 表更新模式

用于将数据引入 Delta 表的方法可以极大地影响增量框架效率。 例如,在将数据加载到现有表中时使用 Overwrite 选项会擦除每次加载的 Delta 日志。 这意味着 Direct Lake 不能使用增量框架,并且必须重新加载所有数据、字典和联接索引。 这种破坏性更新模式会对查询性能产生负面影响。

显示 Direct Lake 中 Delta 表的数据引入和更新模式的关系图。

本部分介绍 Delta 表更新模式,这些模式使 Direct Lake 能够使用增量框架、保留 VertiPaq 列存储元素(如字典、列段和联接索引)以最大程度地提高转码效率并提高冷查询性能。

不分区的批处理

此更新模式按计划间隔(例如每周或每月)以大型批次收集和处理数据。 新数据到达时,旧数据通常以滚动或滑动窗口的方式删除,以保持表大小处于控制状态。 但是,如果数据分布在大多数 Parquet 文件中,则删除旧数据可能是一项挑战。 例如,删除 30 天内的一天可能会影响 Parquet 文件的 95 个%,而不是 5 个%。 在这种情况下,即使是相对较小的删除操作,Direct Lake 也必须重新加载 95% 的数据。 相同的问题也适用于现有行的更新,因为更新是合并删除和追加的。 可以使用 Delta Analyzer 分析 删除更新 作的效果,如本文后面部分所述。

使用分区进行批处理

Delta 表分区有助于减少删除操作的影响,因为表根据分区列中的唯一值划分为存储于文件夹中的较小的 Parquet 文件。 常用的分区列包括日期、区域或其他维度类别。 在之前的例子中,在 30 天中移除一天时,按日期分区的 Delta 表会将删除操作限制为仅针对该日期分区内的 Parquet 文件。 但是,请务必注意,广泛的分区可能会导致 Parquet 文件和行组的数量大幅增加,从而导致 Direct Lake 语义模型中的列段过多增加,从而对查询性能产生负面影响。 选择低基数分区列对于查询性能至关重要。 最佳做法是,列应不超过 100-200 个不同值。

增量加载

通过增量加载,更新过程只会将新数据插入 Delta 表中,而不会影响现有的 Parquet 文件和行组。 没有删除。 Direct Lake 可以增量加载新数据,而无需放弃和重新加载现有的 VertiPaq 列存储元素。 此方法适用于 Direct Lake 增量框架。 不需要增量表分区。

流处理

在数据到达时,几乎实时处理数据可能会导致小型 Parquet 文件和行组激增,从而对 Direct Lake 性能产生负面影响。 与增量加载模式一样,不需要对 Delta 表进行分区。 但是,频繁的表维护对于确保 Parquet 文件和行组的数量保留在 Direct Lake 概述文章中指定的防护栏限制内至关重要。 换句话说,不要忘记定期运行 Spark Optimize,例如每日甚至更频繁。 下一部分将再次介绍 Spark 优化。

注意

实际实时分析最好使用 Eventstreams、KQL 数据库和 Eventhouse 来实现。 有关指南,请参阅 Microsoft Fabric 中的Real-Time Intelligence 文档

Delta表维护

关键维护任务包括清扫和优化 Delta 表。 若要自动执行维护操作,可以使用 Lakehouse API,如 使用 Microsoft Fabric REST API 管理 Lakehouse 文档中所述。

吸 尘

清理会删除在当前 Delta 提交版本中不再包含且早于设定保留阈值的 Parquet 文件。 删除这些 Parquet 文件不会影响 Direct Lake 性能,因为 Direct Lake 仅加载当前提交版本中的 Parquet 文件。 如果使用默认值每天运行 VACUUM,则保留过去七天的 Delta 提交版本以供时间旅行。

重要

由于封装 Direct Lake 语义模型引用特定的 Delta 提交版本,因此必须确保 Delta 表保留该版本,直至您再次刷新(更新)模型以将其切换到当前版本。 否则,当 Direct Lake 语义模型尝试访问不再存在的 Parquet 文件时,用户会遇到查询错误。

Spark 优化

增量表优化将多个小型 Parquet 文件合并为较少的大型文件。 由于这可能会影响 Direct Lake 的性能,因此建议将优化频率降至较低,例如在周末或月底进行优化。 如果小型 Parquet 文件因高频小更新快速累积,为确保 Delta 表保持在限制范围内,应更频繁地进行优化。

分区有助于最大程度地减少对增量帧的优化影响,因为分区有效地并置了数据。 例如,基于低基数的 date_key 列对大型 Delta 表进行分区,会将每周的维护工作限制为最多 7 个分区。 Delta 表将保留大多数现有的 Parquet 文件。 Direct Lake 只需要重新加载七天的数据。

分析 Delta 表更新

使用 Delta 分析器或类似工具来研究 Delta 表更新如何影响 Parquet 文件和行组。 Delta Analyzer 允许跟踪 Parquet 文件、行组、列区块和列的演变,以响应 追加更新删除 操作。 Delta Analyzer 作为 独立的 Jupyter Notebook 提供。 它在 语义链接实验室库中也可用。 以下部分使用Semantic-Link-Labs。 这个库可以通过%pip install semantic-link-labs命令轻松安装在笔记本中。

行组大小

Direct Lake 语义模型的理想行组大小介于 100 万到 1600 万行之间,但如果数据可压缩,Fabric 可能会对大型表使用更大的行组大小。 通常,不建议更改默认行组大小。 最理想的是让 Fabric 负责管理 Delta 表的布局。 但仔细检查也是个好主意。

以下 Python 代码可以用作在 Fabric 笔记本连接的 Lakehouse 中分析 Delta 表的行组大小和其他详细信息的起点。 下表显示了包含 10 亿行的示例表的输出。

import sempy_labs as labs
from IPython.display import HTML
from IPython.display import clear_output

table_name = "<Provide your table name>"

# Load the Delta table and run Delta Analyzer
df = spark.read.format("delta").load(f"Tables/{table_name}")
da_results = labs.delta_analyzer(table_name)

# Display the table summary in an HTML table.
clear_output()

df1 = da_results['Summary'].iloc[0]

html_table = "<table border='1'><tr><th>Column Name</th><th>{table_name}</th></tr>"
for column in da_results['Summary'].columns:
    html_table += f"<tr><td>{column}</td><td>{df1[column]}</td></tr>"
html_table += "</table>"

display(HTML(html_table))

输出

参数 价值
表名称 sales_1
行计数 1000000000
行组 24
Parquet 文件 8
每行组的最大行数 51210000
每行组的最小行数 22580000
每行组平均行数 41666666.666666664
已启用 VOrder 真 实
总大小 7700808430
时间戳 2025-03-24 03:01:02.794979

Delta 分析器摘要显示大约 4000 万行的平均行组大小。 这大于建议的最大行组大小 1600 万行。 幸运的是,较大的行组大小不会对 Direct Lake 造成重大问题。 较大的行组可简化连续分段作业,并且存储引擎中的开销最小。 相反,小行组(明显低于 100 万行)可能会导致性能问题。

在前面的示例中,更重要的是 Fabric 将行组分布在 8 个 Parquet 文件中。 这与 Fabric 容量上的核心数量保持一致,从而支持高效的并行读取操作。 同样重要的是,单个行组大小不会偏离平均值太远。 大幅变化可能会导致不均衡的 VertiScan 负载,从而降低查询性能。

滚动窗口更新

出于说明目的,以下 Python 代码示例模拟滚动窗口更新。 该代码从示例 Delta 表中删除具有最早 DateID 的行。 然后,它会更新这些行的 DateID,并将其重新插入到示例表中,作为最近的行。

from pyspark.sql.functions import lit

table_name = "<Provide your table name>"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")

# Get the rows of the oldest DateID.
rows_df = table_df[table_df["DateID"] == 20200101]
rows_df = spark.createDataFrame(rows_df.rdd, schema=rows_df.schema)

# Delete these rows from the table
table_df.createOrReplaceTempView(f"{table_name}_view")
spark.sql(f"DELETE From {table_name}_view WHERE DateID = '20200101'")

# Update the DateID and append the rows as new data
rows_df = rows_df.withColumn("DateID", lit(20250101))
rows_df.write.format("delta").mode("append").save(f"Tables/{table_name}")

get_delta_table_history 语义链接实验室库中的函数有助于分析此滚动窗口更新的效果。 请参阅以下 Python 代码示例。 另请参阅代码片段后面的输出表。

import sempy_labs as labs
from IPython.display import HTML
from IPython.display import clear_output

table_name = "<Provide your table name>"
da_results = labs.get_delta_table_history(table_name)

# Create a single HTML table for specified columns
html_table = "<table border='1'>"
# Add data rows for specified columns
for index, row in da_results.iterrows():
    for column in ['Version', 'Operation', 'Operation Parameters', 'Operation Metrics']:
        if column == 'Version':
            html_table += f"<tr><td><b>Version</b></td><td><b>{row[column]}</b></td></tr>"
        else:
            html_table += f"<tr><td>{column}</td><td>{row[column]}</td></tr>"
html_table += "</table>"

# Display the HTML table
display(HTML(html_table))

输出

版本 DESCRIPTION 价值
2 运算 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4103076'}
1 运算 删除
操作参数 {'predicate': '[“(DateID#3910 = 20200101)”]'}
操作指标 {'numRemovedFiles': '8', 'numRemovedBytes': '7700855198', 'numCopiedRows': '999451335', 'numDeletionVectorsAdded': '0', 'numDeletionVectorsRemoved': '0', 'numAddedChangeFiles': '0', 'executionTimeMs': '123446', 'numDeletionVectorsUpdated': '0', 'numDeletedRows': '548665', 'scanTimeMs': '4820', 'numAddedFiles': '18', 'numAddedBytes': '7696900084', 'rewriteTimeMs': '198625'}
0 运算 写入
操作参数 {'mode': 'Overwrite', 'partitionBy': '[]'}
运营指标 {'numFiles': '8', 'numOutputRows': '100000000000', 'numOutputBytes': '7700892169'}

上面的 Delta 分析器历史记录显示,此 Delta 表现在具有以下三个版本:

  • 版本 0:这是原始版本,包含 8 个 Parquet 文件和 24 行组,如上一部分所述。
  • 版本 1:此版本反映 删除 操作。 尽管从包含五年销售记录的示例表中只删除了一天的数据(DateID = '20200101'),但所有 8 个 Parquet 文件都受到了影响。 在操作指标中,numRemovedFiles 为 8 个 [Parquet 文件],而 numAddedFiles 为 18 个 [Parquet 文件]。 这意味着删除操作将原始的 8 个 Parquet 文件替换为 18 个新的 Parquet 文件。
  • 版本 3:操作指标显示了向 Delta 表添加了一 个新的包含 548,665 行的 Parquet 文件。

滚动窗口更新后,最新的 Delta 提交版本包括 19 个 Parquet 文件和 21 行组,其大小范围为 50 万到 5000 万行。 548,665 行的滚动窗口更新影响了 10 亿行的整个 Delta 表。 它替换了所有 Parquet 文件和行组。 在这种情况下,增量框架无法提高冷性能,行组大小的变化不太可能有利于暖性能。

Delta表更新

以下 Python 代码以与上一部分所述的基本相同方式更新 Delta 表。 在表面上,更新函数仅更改与给定 DateID 匹配的现有行的 DateID 值。 但是,Parquet 文件是不可变的,无法修改。 在后台,更新操作会删除现有的 Parquet 文件并添加新的 Parquet 文件。 结果和效果与滚动窗口更新的结果和效果相同。

from pyspark.sql.functions import col, when
from delta.tables import DeltaTable

# Load the Delta table
table_name = "<Provide your table name>"
delta_table = DeltaTable.forPath(spark, f"Tables/{table_name}")

# Define the condition and the column to update
condition = col("DateID") == 20200101
column_name = "DateID"
new_value = 20250101

# Update the DateID column based on the condition
delta_table.update(
    condition,
    {column_name: when(condition, new_value).otherwise(col(column_name))}
)

分区滚动窗口更新

分区有助于减少表更新的影响。 使用日期键可能很诱人,但快速基数检查可能会显示这不是最佳选择。 例如,到目前为止讨论的示例表包含过去五年的销售额交易,相当于大约 1800 个不同的日期值。 这种基数太高了。 分区列应少于 200 个不同的值。

column_name = 'DateID'
table_name = "<Provide your table name>"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")

distinct_count = table_df.select(column_name).distinct().count()
print(f"The '{column_name}' column has {distinct_count} distinct values.")

if distinct_count <= 200:
    print(f"The '{column_name}' column is a good partitioning candidate.")
    table_df.write.format("delta").partitionBy(column_name).save(f"Tables/{table_name}_by_date_id")
    print(f"Table '{table_name}_by_date_id' partitioned and saved successfully.")
else:   
    print(f"The cardinality of the '{column_name}' column is possibly too high.")

输出

The 'DateID' column has 1825 distinct values.
The cardinality of the 'DateID' column is possibly too high.

如果没有合适的分区列,可以通过减少现有列的基数来人为创建它。 以下 Python 代码通过删除 DateID 的最后两位数字来添加 Month 列。 这将生成 60 个不同的值。 然后,示例代码保存按 Month 列分区的 Delta 表。

from pyspark.sql.functions import col, expr

column_name = 'DateID'
table_name = "sales_1"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")

partition_column = 'Month'
partitioned_table = f"{table_name}_by_month"
table_df = table_df.withColumn(partition_column, expr(f"int({column_name} / 100)"))

distinct_count = table_df.select(partition_column).distinct().count()
print(f"The '{partition_column}' column has {distinct_count} distinct values.")

if distinct_count <= 200:
    print(f"The '{partition_column}' column is a good partitioning candidate.")
    table_df.write.format("delta").partitionBy(partition_column).save(f"Tables/{partitioned_table}")
    print(f"Table '{partitioned_table}' partitioned and saved successfully.")
else:   
    print(f"The cardinality of the '{partition_column}' column is possibly too high.")

输出

The 'Month' column has 60 distinct values.
The 'Month' column is a good partitioning candidate.
Table 'sales_1_by_month' partitioned and saved successfully.

Delta 分析器摘要现在显示 Delta 表布局与 Direct Lake 完全一致。 平均行组大小约为 1600 万行,行组大小的平均绝对偏差,因此段大小小于 100 万行。

参数 价值
表名称 按月销售_1
行计数 1000000000
行组 六十
Parquet 文件 六十
每行组的最大行数 16997436
每行组的最小行数 15339311
每行组平均行数 16666666.666666666
已启用 VOrder 真 实
总大小 7447946016
时间戳 2025-03-24 03:01:02.794979

在对分区示例表进行滑动窗口更新后,Delta Analyzer 历史记录显示仅影响一个 Parquet 文件。 请参阅以下输出表。 版本 2 共有 16,445,655 行数据,从旧 Parquet 文件复制到一个新的替换 Parquet 文件中,而版本 3 增加了一个包含 548,665 行的新 Parquet 文件。 总体而言,Direct Lake 只需要重新加载大约 1700 万行,与没有分区时需要重新加载 10 亿行相比,这是一个显著的改进。

版本 DESCRIPTION 价值
2 运算 写入
操作参数 {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4103076'}
1 运算 删除
操作参数 {'predicate': '[“(DateID#3910 = 20200101)”]'}
运营指标 {'numRemovedFiles': '1', 'numRemovedBytes': '126464179', 'numCopiedRows': '16445655', 'numDeletionVectorsAdded': '0', 'numDeletionVectorsRemoved': '0', 'numAddedChangeFiles': '0', 'executionTimeMs': '19065', 'numDeletionVectorsUpdated': '0', 'numDeletedRows': '548665', 'scanTimeMs': '1926', 'numAddedFiles': '1', 'numAddedBytes': '121275513', 'rewriteTimeMs': '17138'}
0 运算 写入
操作参数 {'mode': 'Overwrite', 'partitionBy': '[“Month”]'}
运营指标 {'numFiles': '60', 'numOutputRows': '100000000000', 'numOutputBytes': '7447681467'}

仅追加模式由Spark优化

仅追加模式不会影响现有的 Parquet 文件。 它们与 Direct Lake 增量帧配合良好。 但是,不要忘记优化 Delta 表以合并 Parquet 文件和行组,如本文前面所述。 小型和频繁的追加可以快速累积文件,并且可能会扭曲行组大小的统一性。

以下输出显示了与已分区表相比的非分区表的 Delta 分析器历史记录。 历史包括七个追加操作和一个随后的 优化 操作。

版本 DESCRIPTION 默认布局中的值 分区布局中的值
8 运算 优化 优化
操作参数 {'predicate': '[]', 'auto': 'false', 'clusterBy': '[]', 'vorder': 'true', 'zOrderBy': '[]'} {'predicate': '[“('Month >= 202501)”]“, 'auto': 'false', 'clusterBy': '[]', 'vorder': 'true', 'zOrderBy': '[]'}
运营指标 {'numRemovedFiles': '8', 'numRemovedBytes': '991234561', 'p25FileSize': '990694179', 'numDeletionVectorsRemoved': '0', 'minFileSize': '990694179', 'numAddedFiles': '1', 'maxFileSize': '990694179', 'p75FileSize': '990694179', 'p50FileSize': '990694179', 'numAddedBytes': '990694179'} {'numRemovedFiles': '7', 'numRemovedBytes': '28658548', 'p25FileSize': '28308495', 'numDeletionVectorsRemoved': '0', 'minFileSize': '28308495', 'numAddedFiles': '1', 'maxFileSize': '28308495', 'p75FileSize': '28308495', 'p50FileSize': '28308495', 'numAddedBytes': '28308495'}
7 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': '追加', 'partitionBy': '[“月份”]'}
运营指标 {'numFiles': '1', 'numOutputRows': '547453', 'numOutputBytes': '4091802'} {'numFiles': '1', 'numOutputRows': '547453', 'numOutputBytes': '4091802'}
6 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548176', 'numOutputBytes': '4095497'} {'numFiles': '1', 'numOutputRows': '548176', 'numOutputBytes': '4095497'}
5 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '547952', 'numOutputBytes': '4090107'} {'numFiles': '1', 'numOutputRows': '547952', 'numOutputBytes': '4093015'}
4 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548631', 'numOutputBytes': '4093134'} {'numFiles': '1', 'numOutputRows': '548631', 'numOutputBytes': '4094376'}
3 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548671', 'numOutputBytes': '4101221'} {'numFiles': '1', 'numOutputRows': '548671', 'numOutputBytes': '4101221'}
2 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '546530', 'numOutputBytes': '4081589'} {'numFiles': '1', 'numOutputRows': '546530', 'numOutputBytes': '4081589'}
1 运算 写入 写入
操作参数 {'mode': 'Append', 'partitionBy': '[]'} {'mode': 'Append', 'partitionBy': '["Month"]'}
运营指标 {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4101048'} {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4101048'}
0 运算 写入 写入
操作参数 {'mode': 'Overwrite', 'partitionBy': '[]'} {'mode': 'Overwrite', 'partitionBy': '[“Month”]'}
运营指标 {'numFiles': '8', 'numOutputRows': '100000000000', 'numOutputBytes': '7700855198'} {'numFiles': '60', 'numOutputRows': '100000000000', 'numOutputBytes': '7447681467'}

回顾版本 8 的操作指标,值得指出的是,非分区表的优化操作合并了 8 个 Parquet 文件,涉及大约 1 GB 的数据,而已分区表的优化操作合并了 7 个 Parquet 文件,仅涉及大约 25 MB 的数据。 因此,Direct Lake 在分区表上的性能会更好。

注意事项和限制

优化 Direct Lake 性能的注意事项和限制如下所示:

  • 避免大型 Delta 表上的破坏性更新模式,以在 Direct Lake 中保留增量框架。
  • 不需要针对增量帧处理优化小型Delta表。
  • 目标为行组大小介于 100 万到 1600 万行之间,以在 Direct Lake 中创建列段,行数为 100 万至 1600 万行。 Direct Lake 更喜欢大型列分段。
  • 避免使用高基数的分区列,因为在 Direct Lake 转码中,与较大而少的 Parquet 文件和行组相比,使用许多较小的 Parquet 文件和行组的效率会更低。
  • 由于对计算和内存资源的需求不可预见的,语义模型可能会重新加载到处于冷状态的另一个 Fabric 群集节点上。
  • Direct Lake 不使用 delta\Parquet 统计信息来跳过行组\文件,以优化数据加载。