适用于: ![]()
![]()
完全刷新引入管道会清除目标表的数据和状态,然后重新处理数据源中的所有记录。 可以完全刷新管道中的所有表,也可以选择要刷新的表。
| 接口 | 说明书 |
|---|---|
| Lakehouse 用户界面 (UI) | 手动触发管道更新 |
| 管道 API |
POST /api/2.0/pipelines/{pipeline_id}/updates |
| Databricks 命令行界面 (CLI) | databricks 流水线 启动更新 |
重要
引入管道更新可能会在Initializing阶段或Resetting tables阶段失败。 Lakeflow Connect 会自动多次重试管道。 如果中断自动重试或它们最终彻底失败,请使用与之前相同的表刷新选择手动启动新的管道更新。 否则,目标表最终可能会因为只有部分数据而处于不一致的状态。 如果手动重试也失败,请创建支持票证。
完全刷新行为 (CDC)
适用于:
”的数据库连接器
当您触发表的完全刷新时,Databricks 会对过程进行优化,以减少停机时间并维护数据可用性。
- 快照请求:请求完全刷新时,引入网关会立即开始创建源表的新快照。 目标流式处理表将被排除在刷新选择之外,直到快照完成为止。
- 持续可用性:在快照过程中,目标流表保留现有数据,并始终可用于查询。 快照正在进行时,不会对表应用任何更新、追加或删除。
- 原子刷新:快照完成后,Databricks 会在单个更新中自动执行完整刷新。 此更新应用自请求快照以来累积的所有快照数据和任何 CDC 记录。
例如,如果表在更新 15 结束时有 50 条记录,并且你在 update 16 中请求完全刷新:
- 引入网关在更新 16 时开始创建快照。
- 该表继续显示原始 50 条记录,直到快照完成。
- 快照完成时(在更新 16 或更高版本中,这取决于源表的大小),将在一个原子操作中自动应用完整刷新。
此方法可显著减少完全刷新操作期间的停机时间,并帮助防止 PENDING_RESET 和超时错误。
为数据库连接器配置完全刷新行为
适用于:![]()
![]()
了解如何在 Lakeflow Connect 中使用数据库连接器(如 SQL Server)为受管引入流水线配置完全刷新行为。 可以计划何时发生完全刷新快照,并启用自动完全刷新,以便从不支持的架构更改中恢复。
完全刷新窗口
可以使用完整刷新窗口功能来安排完全刷新的快照操作的执行时间。 当你请求完全刷新或系统自动触发完全刷新时,快照将在配置的窗口中的下一个可用时间启动。 下表显示了计划的工作原理:
| 请求时间 | 窗口 | 快照开始 | 注释 |
|---|---|---|---|
| 星期一 2025-10-20 10:00:00 UTC | 开始时间:20,天:星期二,时区:UTC | 星期二 2025-10-21 20:00:00 UTC | 快照被推迟到下一个可用窗口日 |
| 星期一 2025-10-20 09:30:00 UTC | 开始时间:9,天:星期一,时区:UTC | 星期一 2025-10-20 09:30:00 UTC | 同一天,请求时间在时间窗口内 |
| 星期一 2025-10-20 10:00:00 UTC | 开始时间:9,天:星期一,时区:UTC | 星期一 2025-10-27 09:00:00 UTC | 请求的时间超出了窗口,因此被推迟到下周 |
配置参数
在管道规范中 ingestion_definition 配置完整刷新窗口:
| 参数 | 类型 | Description | 必选 |
|---|---|---|---|
start_hour |
整数 | 窗口(0-23)的开始时间(24 小时)。 | 是的 |
days_of_week |
Array | 窗口处于活动状态的天数。 有效值:MONDAY、、TUESDAY、WEDNESDAYTHURSDAY、FRIDAY、SATURDAY。 SUNDAY 如果未指定,则使用所有日期。 |
否 |
time_zone_id |
String | 窗口的时区 ID。 请参阅设置会话时区以获取受支持时区 ID 的信息。 如果未指定,则默认为 UTC。 | 否 |
示例:配置完整刷新窗口
以下示例演示如何向管道定义添加完整刷新窗口。
Databricks 资产捆绑包
resources:
pipelines:
gateway:
name: <gateway-name>
gateway_definition:
connection_id: <connection-id>
gateway_storage_catalog: <destination-catalog>
gateway_storage_schema: <destination-schema>
gateway_storage_name: <destination-schema>
target: <destination-schema>
catalog: <destination-catalog>
pipeline_sqlserver:
name: <pipeline-name>
catalog: <destination-catalog>
schema: <destination-schema>
ingestion_definition:
ingestion_gateway_id: <gateway-id>
objects:
- table:
source_schema: <source-schema>
source_table: <source-table>
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
full_refresh_window:
start_hour: 20
days_of_week:
- MONDAY
- TUESDAY
time_zone_id: 'America/Los_Angeles'
Databricks 笔记本
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "<gateway-name>",
"catalog": "<destination-catalog>",
"target": "<destination-schema>",
"gateway_definition": {
"connection_id": "<connection-id>",
"gateway_storage_catalog": "<destination-catalog>",
"gateway_storage_schema": "<destination-schema>",
"gateway_storage_name": "<destination-schema>"
}
}
ingestion_pipeline_spec = {
"pipeline_type": "MANAGED_INGESTION",
"name": "<pipeline-name>",
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"ingestion_gateway_id": "<gateway-pipeline-id>",
"source_type": "SQLSERVER",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>"
}
}
],
"full_refresh_window": {
"start_hour": 20,
"days_of_week": ["MONDAY", "TUESDAY"],
"time_zone_id": "America/Los_Angeles"
}
}
}
自动完全刷新策略
为了帮助在不进行手动干预的情况下保持数据一致性,自动完全刷新策略允许在管道遇到不支持的 DDL作时自动触发完全刷新:
- 表清空
- 不兼容的架构更改(例如数据类型更改)
- 列重命名
- 添加带有默认值的列
如果未启用自动完全刷新,则必须在这些作发生时手动触发完全刷新。
配置参数
在管道规范中,您可以在管道级别或表级别配置自动全量刷新:
| 参数 | 类型 | Description | 违约 |
|---|---|---|---|
enabled |
布尔 | 是否启用自动完全刷新。 | false |
min_interval_hours |
整数 | 完全刷新之间的最小等待间隔(以小时为单位)。 系统在启动新的自动完全刷新之前等待自上次快照以来的此间隔。 | 24 |
可以在多个级别配置自动完全刷新:
- 管道级别:输入
ingestion_definition.table_configuration.auto_full_refresh_policy - 表级别:In
ingestion_definition.objects[].table.table_configuration.auto_full_refresh_policy
表级配置替代管道级配置。
示例:在管道级别配置自动完全刷新
以下示例演示如何为管道中的所有表启用自动全量刷新。
Databricks 资产捆绑包
resources:
pipelines:
gateway:
name: <gateway-name>
gateway_definition:
connection_id: <connection-id>
gateway_storage_catalog: <destination-catalog>
gateway_storage_schema: <destination-schema>
gateway_storage_name: <destination-schema>
target: <destination-schema>
catalog: <destination-catalog>
pipeline_sqlserver:
name: <pipeline-name>
catalog: <destination-catalog>
schema: <destination-schema>
ingestion_definition:
ingestion_gateway_id: <gateway-id>
objects:
- table:
source_schema: <source-schema>
source_table: <source-table>
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
table_configuration:
auto_full_refresh_policy:
enabled: true
min_interval_hours: 24
Databricks 笔记本
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "<gateway-name>",
"catalog": "<destination-catalog>",
"target": "<destination-schema>",
"gateway_definition": {
"connection_id": "<connection-id>",
"gateway_storage_catalog": "<destination-catalog>",
"gateway_storage_schema": "<destination-schema>",
"gateway_storage_name": "<destination-schema>"
}
}
ingestion_pipeline_spec = {
"pipeline_type": "MANAGED_INGESTION",
"name": "<pipeline-name>",
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"ingestion_gateway_id": "<gateway-pipeline-id>",
"source_type": "SQLSERVER",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>"
}
}
],
"table_configuration": {
"auto_full_refresh_policy": {
"enabled": True,
"min_interval_hours": 24
}
}
}
}
示例:为每个表配置自动完全刷新
以下示例展示了如何在管道级别启用自动完全刷新,但在特定表中禁用此功能。
Databricks 资产捆绑包
resources:
pipelines:
gateway:
name: <gateway-name>
gateway_definition:
connection_id: <connection-id>
gateway_storage_catalog: <destination-catalog>
gateway_storage_schema: <destination-schema>
gateway_storage_name: <destination-schema>
target: <destination-schema>
catalog: <destination-catalog>
pipeline_sqlserver:
name: <pipeline-name>
catalog: <destination-catalog>
schema: <destination-schema>
ingestion_definition:
ingestion_gateway_id: <gateway-id>
objects:
- table:
source_schema: <source-schema>
source_table: table_1
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
- table:
source_schema: <source-schema>
source_table: table_2
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
table_configuration:
auto_full_refresh_policy:
enabled: false
min_interval_hours: 24
table_configuration:
auto_full_refresh_policy:
enabled: true
min_interval_hours: 24
Databricks 笔记本
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "<gateway-name>",
"catalog": "<destination-catalog>",
"target": "<destination-schema>",
"gateway_definition": {
"connection_id": "<connection-id>",
"gateway_storage_catalog": "<destination-catalog>",
"gateway_storage_schema": "<destination-schema>",
"gateway_storage_name": "<destination-schema>"
}
}
ingestion_pipeline_spec = {
"pipeline_type": "MANAGED_INGESTION",
"name": "<pipeline-name>",
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"ingestion_gateway_id": "<gateway-pipeline-id>",
"source_type": "SQLSERVER",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "table_1",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>"
}
},
{
"table": {
"source_schema": "<source-schema>",
"source_table": "table_2",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"auto_full_refresh_policy": {
"enabled": False,
"min_interval_hours": 24
}
}
}
}
],
"table_configuration": {
"auto_full_refresh_policy": {
"enabled": True,
"min_interval_hours": 24
}
}
}
}
在此示例中, table_1 使用管道级策略(已启用),而 table_2 用表级配置替代它(已禁用)。