完全刷新目标数据表

适用于标记为“是”的 SaaS 连接器标记为“是”的数据库连接器

完全刷新引入管道会清除目标表的数据和状态,然后重新处理数据源中的所有记录。 可以完全刷新管道中的所有表,也可以选择要刷新的表。

接口 说明书
Lakehouse 用户界面 (UI) 手动触发管道更新
管道 API POST /api/2.0/pipelines/{pipeline_id}/updates
Databricks 命令行界面 (CLI) databricks 流水线 启动更新

重要

引入管道更新可能会在Initializing阶段或Resetting tables阶段失败。 Lakeflow Connect 会自动多次重试管道。 如果中断自动重试或它们最终彻底失败,请使用与之前相同的表刷新选择手动启动新的管道更新。 否则,目标表最终可能会因为只有部分数据而处于不一致的状态。 如果手动重试也失败,请创建支持票证。

完全刷新行为 (CDC)

适用于检查标记为“是 ”的数据库连接器

当您触发表的完全刷新时,Databricks 会对过程进行优化,以减少停机时间并维护数据可用性。

  1. 快照请求:请求完全刷新时,引入网关会立即开始创建源表的新快照。 目标流式处理表将被排除在刷新选择之外,直到快照完成为止。
  2. 持续可用性:在快照过程中,目标流表保留现有数据,并始终可用于查询。 快照正在进行时,不会对表应用任何更新、追加或删除。
  3. 原子刷新:快照完成后,Databricks 会在单个更新中自动执行完整刷新。 此更新应用自请求快照以来累积的所有快照数据和任何 CDC 记录。

例如,如果表在更新 15 结束时有 50 条记录,并且你在 update 16 中请求完全刷新:

  1. 引入网关在更新 16 时开始创建快照。
  2. 该表继续显示原始 50 条记录,直到快照完成。
  3. 快照完成时(在更新 16 或更高版本中,这取决于源表的大小),将在一个原子操作中自动应用完整刷新。

此方法可显著减少完全刷新操作期间的停机时间,并帮助防止 PENDING_RESET 和超时错误。

为数据库连接器配置完全刷新行为

适用于:勾选为“是”的数据库连接器勾选为“是”的基于 API 的管道创作

了解如何在 Lakeflow Connect 中使用数据库连接器(如 SQL Server)为受管引入流水线配置完全刷新行为。 可以计划何时发生完全刷新快照,并启用自动完全刷新,以便从不支持的架构更改中恢复。

完全刷新窗口

可以使用完整刷新窗口功能来安排完全刷新的快照操作的执行时间。 当你请求完全刷新或系统自动触发完全刷新时,快照将在配置的窗口中的下一个可用时间启动。 下表显示了计划的工作原理:

请求时间 窗口 快照开始 注释
星期一 2025-10-20 10:00:00 UTC 开始时间:20,天:星期二,时区:UTC 星期二 2025-10-21 20:00:00 UTC 快照被推迟到下一个可用窗口日
星期一 2025-10-20 09:30:00 UTC 开始时间:9,天:星期一,时区:UTC 星期一 2025-10-20 09:30:00 UTC 同一天,请求时间在时间窗口内
星期一 2025-10-20 10:00:00 UTC 开始时间:9,天:星期一,时区:UTC 星期一 2025-10-27 09:00:00 UTC 请求的时间超出了窗口,因此被推迟到下周

配置参数

在管道规范中 ingestion_definition 配置完整刷新窗口:

参数 类型 Description 必选
start_hour 整数 窗口(0-23)的开始时间(24 小时)。 是的
days_of_week Array 窗口处于活动状态的天数。 有效值:MONDAY、、TUESDAYWEDNESDAYTHURSDAYFRIDAYSATURDAYSUNDAY 如果未指定,则使用所有日期。
time_zone_id String 窗口的时区 ID。 请参阅设置会话时区以获取受支持时区 ID 的信息。 如果未指定,则默认为 UTC。

示例:配置完整刷新窗口

以下示例演示如何向管道定义添加完整刷新窗口。

Databricks 资产捆绑包
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
        full_refresh_window:
          start_hour: 20
          days_of_week:
            - MONDAY
            - TUESDAY
          time_zone_id: 'America/Los_Angeles'
Databricks 笔记本
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      }
    ],
    "full_refresh_window": {
      "start_hour": 20,
      "days_of_week": ["MONDAY", "TUESDAY"],
      "time_zone_id": "America/Los_Angeles"
    }
  }
}

自动完全刷新策略

为了帮助在不进行手动干预的情况下保持数据一致性,自动完全刷新策略允许在管道遇到不支持的 DDL作时自动触发完全刷新:

  • 表清空
  • 不兼容的架构更改(例如数据类型更改)
  • 列重命名
  • 添加带有默认值的列

如果未启用自动完全刷新,则必须在这些作发生时手动触发完全刷新。

配置参数

在管道规范中,您可以在管道级别或表级别配置自动全量刷新:

参数 类型 Description 违约
enabled 布尔 是否启用自动完全刷新。 false
min_interval_hours 整数 完全刷新之间的最小等待间隔(以小时为单位)。 系统在启动新的自动完全刷新之前等待自上次快照以来的此间隔。 24

可以在多个级别配置自动完全刷新:

  • 管道级别:输入ingestion_definition.table_configuration.auto_full_refresh_policy
  • 表级别:In ingestion_definition.objects[].table.table_configuration.auto_full_refresh_policy

表级配置替代管道级配置。

示例:在管道级别配置自动完全刷新

以下示例演示如何为管道中的所有表启用自动全量刷新。

Databricks 资产捆绑包
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
        table_configuration:
          auto_full_refresh_policy:
            enabled: true
            min_interval_hours: 24
Databricks 笔记本
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      }
    ],
    "table_configuration": {
      "auto_full_refresh_policy": {
        "enabled": True,
        "min_interval_hours": 24
      }
    }
  }
}

示例:为每个表配置自动完全刷新

以下示例展示了如何在管道级别启用自动完全刷新,但在特定表中禁用此功能。

Databricks 资产捆绑包
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: table_1
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
          - table:
              source_schema: <source-schema>
              source_table: table_2
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
              table_configuration:
                auto_full_refresh_policy:
                  enabled: false
                  min_interval_hours: 24
        table_configuration:
          auto_full_refresh_policy:
            enabled: true
            min_interval_hours: 24
Databricks 笔记本
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "table_1",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "table_2",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>",
          "table_configuration": {
            "auto_full_refresh_policy": {
              "enabled": False,
              "min_interval_hours": 24
            }
          }
        }
      }
    ],
    "table_configuration": {
      "auto_full_refresh_policy": {
        "enabled": True,
        "min_interval_hours": 24
      }
    }
  }
}

在此示例中, table_1 使用管道级策略(已启用),而 table_2 用表级配置替代它(已禁用)。