将数据从 Unity 目录表同步到数据库实例

重要

此功能在以下区域中为公共预览版westuswestus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiasoutheastasiauksouth

本页介绍如何创建和管理同步表。 同步表是一个 Unity 目录只读 Postgres 表,它自动将数据从 Unity 目录表同步到 Lakebase 数据库实例。 将 Unity 目录表同步到 Postgres 可实现低延迟读取查询,并支持与其他 Postgres 表的查询时间联接。

同步由 Lakeflow Spark 声明性管道处理。 托管管道使用源表的更改持续更新 Postgres 表。 创建后,可以使用 Postgres 工具直接查询同步表。

同步表的主要特征如下:

  • 在 Postgres 中设置为只读,以维护与源数据的一致性和完整性。
  • 使用托管的 Lakeflow Spark 声明式流水线自动同步
  • 可通过标准 PostgreSQL 接口进行查询
  • 通过 Unity 目录进行管理,以便进行治理和生命周期管理

在您开始之前

  • 在任何目录中都有一个 Unity Catalog 表。
  • 你对数据库实例具有 CAN USE 权限。

创建同步表

UI

要将 Unity 目录表同步到 Postgres,请执行以下作:

  1. 在工作区边栏中单击 “目录 ”。

  2. 查找并选择要在其中创建同步表的 Unity 目录表。

  3. 单击“ 创建>同步表”。

  4. 选择目录、架构,然后输入新同步表的表名称。

    • 还可以在 标准目录中创建同步表,并配置一些其他配置。 选择标准目录、架构,并输入新创建的同步表的表名称。
  5. 选择数据库实例并输入要在其中创建同步表的 Postgres 数据库的名称。 Postgres 数据库字段默认为当前选定的目标目录。 如果 Postgres 数据库在此名称下不存在,Azure Databricks 将创建一个新数据库。

  6. 选择 主键。 主键 是必需的 ,因为它支持对行进行有效访问,以便读取、更新和删除。

    重要

    主键中的列在同步表中不允许为 null。 因此,包含 null 值的主键列中的行将从同步中排除

  7. 如果源表中的两行具有相同的主键,请选择 Timeseries 密钥 以配置去重。 指定时间序列键时,同步表仅包含每个主键具有最新时间序列键值的行。

  8. 快照触发连续选择同步模式。 有关每个同步模式的详细信息,请参阅 介绍的同步模式

  9. 选择是否要从新管道或现有管道创建此同步表。

    • 如果创建新管道并使用托管目录,请选择临时表的存储位置。 如果使用标准目录,临时表将自动存储在目录中。
    • 如果使用现有管道,请检查新的同步模式是否与管道模式匹配。
  10. (可选)选择 无服务器预算策略。 若要创建无服务器预算策略,请参阅 无服务器预算策略的属性使用情况。 这使你可以将计费使用情况归咎于特定的使用策略。

    • 对于同步表,可计费实体是基础 Lakeflow Spark 声明性管道管道。 若要修改预算策略,请修改基础管道对象。 请参阅 配置无服务器管道
  11. 同步表状态“联机”后,登录到数据库实例并查询新创建的表。 使用 SQL 编辑器外部工具笔记本查询表。

Python SDK

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy

# Initialize the Workspace client
w = WorkspaceClient()

# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="database_catalog.schema.synced_table",  # Full three-part name
        spec=SyncedTableSpec(
            source_table_full_name="source_catalog.source_schema.source_table",
            primary_key_columns=["id"],  # Primary key columns
            scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,  # SNAPSHOT, TRIGGERED, or CONTINUOUS
            # Optional: timeseries_key="timestamp"  # For deduplication
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="storage_catalog",
                storage_schema="storage_schema"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")

# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="standard_catalog.schema.synced_table",  # Full three-part name
        database_instance_name="my-database-instance",  # Required for standard catalogs
        logical_database_name="postgres_database",  # Required for standard catalogs
        spec=SyncedTableSpec(
            source_table_full_name="source_catalog.source_schema.source_table",
            primary_key_columns=["id"],
            scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
            create_database_objects_if_missing=True,  # Create database/schema if needed
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="storage_catalog",
                storage_schema="storage_schema"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")

# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")

CLI

# Create a synced table in a database catalog
databricks database create-synced-database-table  \
  --json '{
    "spec": {
      "name": "database_catalog.schema.synced_table",
      "source_table_full_name": "source_catalog.source_schema.source_table",
      "primary_key_columns": ["id"],
      "scheduling_policy": "TRIGGERED"
    },
    "new_pipeline_spec": {
      "storage_catalog": "storage_catalog",
      "storage_schema": "storage_schema"
    }
  }'

# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
  --database-instance-name "my-database-instance" \
  --logical-database-name "databricks_postgres" \
  --json '{
    "name": "standard_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "source_catalog.source_schema.source_table",
      "primary_key_columns": ["id"],
      "scheduling_policy": "CONTINUOUS",
      "create_database_objects_if_missing": true
    }
  }'

# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"

curl

在数据库目录中创建同步表。

export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

在标准 Unity 目录目录中创建同步表。

export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "database_instance_name": "$DATABASE_INSTANCE",
  "logical_database_name": "$POSTGRES_DATABASE",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

检查同步表的状态。

export SYNCEDTABLE='pg_db.silver.sbtest1_online'

curl --request GET \
  "https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
  --header "Authorization: Bearer dapi..."

同步模式说明

可以使用以下同步模式之一创建同步表,从而确定如何将数据从源同步到 Postgres 中的同步表:

同步模式 Description Performance
快照 管道运行一次,以获取源表的快照并将其复制到同步表。 后续管道运行将整个源数据复制到目标,并将其以原子方式替换。 可以手动、通过 API 或按计划触发管道。 此模式比触发模式或连续同步模式高效 10 倍,因为它从头开始重新创建数据。 如果你要修改源表超过10%,请考虑使用此模式。
触发 管道运行一次,以获取源表的快照并将其复制到同步表。 与快照同步模式不同,当同步表被刷新时,仅检索自上次管道执行以来的更改,并应用到已经同步的表上。 可以通过 API 或计划手动触发增量刷新。 此模式在延迟和成本之间是一个很好的妥协,因为它按需运行,并且仅应用自上次运行以来的更改。 若要最大程度地减少延迟,请在更新源表后立即运行此管道。 如果运行此频率高于每 5 分钟一次,则由于每次启动和停止管道的成本,它可能比连续模式更昂贵。
连续 管道运行一次,以拍摄源表的快照,并将其复制到同步表,然后管道持续运行。 对源表的后续更改会以增量方式实时应用于同步表。 无需手动刷新。 此模式的滞后时间最低,但成本较高,因为它持续运行。

注释

若要支持触发连续同步模式,源表必须启用更改数据馈送。 某些源(如视图)不支持更改数据馈送,因此只能在快照模式下同步它们。

支持的操作

Databricks 建议仅在 Postgres 中为同步表执行以下操作:以避免意外覆盖或数据不一致。

  • 只读查询
  • 创建索引
  • 删除表(从 Unity Catalog 中删除此同步表以释放空间)

尽管可以通过其他方式修改此表,但它会干扰同步管道。

删除同步表

若要删除同步的表,必须将其从 Unity 目录中删除,然后删除数据库实例中的表。 从 Unity 目录删除同步表会取消注册表,并停止任何数据刷新。 但是,该表仍保留在基础 Postgres 数据库中。 若要释放数据库实例中的空间,请连接到实例并使用 DROP TABLE 命令。

UI

  1. 在工作区边栏中单击 “目录 ”。
  2. 找到并选择要删除的同步表。
  3. 单击 “Kebab”菜单图标。>删除
  4. 使用 psql、SQL 编辑器或笔记本连接到实例。
  5. 使用 PostgreSQL 删除表。
    DROP TABLE synced_table_database.synced_table_schema.synced_table
    

Python SDK

from databricks.sdk import WorkspaceClient

# Initialize the Workspace client
w = WorkspaceClient()

# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

CLI

# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

curl

# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

所有权和权限

如果创建新的 Postgres 数据库、架构或表,Postgres 所有权设置如下:

  • 如果其 Azure Databricks 登录名作为 Postgres 中的角色存在,则所有权将分配给创建数据库、架构或表的用户。 若要在 Postgres 中添加 Azure Databricks 标识角色,请参阅 “管理 Postgres 角色”。
  • 否则,所有权将分配给 Postgres 中父对象的所有者(通常为 databricks_superuser)。

管理同步表访问

创建同步表后,databricks_superuser 可以从 Postgres READ 同步表。 该 databricks_superuser 具有 pg_read_all_data,允许此角色从所有表中读取。 它还具有 pg_write_all_data 权限,允许此角色写入所有表。 这意味着 databricks_superuser 也可以写入 Postgres 中的同步表。 如果需要在目标表中进行紧急更改,Lakebase 支持此写入行为。 但是,Databricks 建议改为在源表中进行修复。

  • databricks_superuser 还可以向其他用户授予这些权限。

    GRANT USAGE ON SCHEMA synced_table_schema TO user;
    
    GRANT SELECT ON synced_table_name TO user;
    
  • databricks_superuser 可以撤销这些特权:

    REVOKE USAGE ON SCHEMA synced_table_schema FROM user;
    
    REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
    

管理同步表操作

databricks_superuser 可以管理哪些用户有权对同步表执行特定操作。 同步表支持的操作包括:

  • CREATE INDEX
  • ALTER INDEX
  • DROP INDEX
  • DROP TABLE

已同步表的所有其他 DDL 操作都被拒绝。

若要向其他用户授予这些权限, databricks_superuser 必须先创建 databricks_auth以下扩展:

CREATE EXTENSION IF NOT EXISTS databricks_auth;

然后, databricks_superuser 可以添加用户来管理同步表:

SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');

databricks_superuser 可以将用户从同步表的管理中删除。

SELECT databricks_synced_table_remove_manager('[table]', '[user]');

databricks_superuser 可以查看所有管理员:

SELECT * FROM databricks_synced_table_managers;

数据类型映射

此类型映射表定义源 Unity 目录表中每个 数据类型 如何映射到 Postgres 中的目标同步表:

源列类型 Postgres 列类型
BIGINT BIGINT
二元的 BYTEA
布尔 BOOLEAN
日期 DATE
DECIMAL(p,s) NUMERIC
双精度
real
INT INTEGER
INTERVAL intervalQualifier 间隔
SMALLINT SMALLINT
字符串 文本
时间戳 时间戳与时区
TIMESTAMP_NTZ 不含时区的时间戳
TINYINT SMALLINT
GEOGRAPHY(srid) 不支持
GEOMETRY(srid) 不支持
ARRAY < elementType > JSONB
MAP < keyType,valueType > JSONB
STRUCT < [fieldName: fieldType [NOT NULL][COMMENT str][,...]] > JSONB
变体 JSONB
对象 不支持

注释

  • ARRAY、MAP 和 STRUCT 类型的映射于 2025 年 5 月更改。 同步之前创建的表继续将这些类型映射到 JSON。
  • TIMESTAMP 的映射于 2025 年 8 月更改。 同步之前创建的表继续将其映射到 TIMESTAMP WITHOUT TIME ZONE。

处理无效字符

某些字符(如 null 字节(0x00))允许在 STRINGARRAYMAPSTRUCT 列的 Delta 表中使用,但在 Postgres TEXTJSONB 列中不支持。 因此,将此类数据从 Delta 同步到 Postgres 可能会导致插入失败并出现错误:

org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
  • 第一个错误发生在顶级字符串列中出现 null 字节时,该列直接映射到 Postgres TEXT
  • 当 null 字节出现在复杂类型(STRUCTARRAYMAP)中的字符串内时,Azure Databricks 会将其序列化为 JSONB,这时会发生第二个错误。 在序列化期间,所有字符串都会被转换为 Postgres TEXT 格式,其中 \u0000 是不允许的。

解决方法:

可通过以下方式之一解决此问题:

  • 清理字符串字段

    在同步到 Postgres 之前,从所有字符串字段中删除或替换不受支持的字符,包括复杂类型中的字符。

    若要从顶级 STRING 列中删除 null 字节,请使用函数 REPLACE

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
    
  • 转换为二进制格式(仅适用于 STRING 列)

    如果需要保留原始字节内容,请将受影响的 STRING 列转换为 BINARY

限制和注意事项

支持的源表

根据同步表的同步模式,支持不同类型的源表:

  • 对于快照模式,源表必须支持 SELECT *。 示例包括 Delta 表、Iceberg 表、视图、具体化视图和其他类似类型。

  • 对于触发或连续同步模式,源表 还必须 启用 更改数据馈送

命名和标识符限制

  • 允许的字符: 同步表的 Postgres 数据库、架构和表名称只能包含字母数字字符和下划线([A-Za-z0-9_]+)。 不支持连字符(-)和其他特殊字符。
  • 列和表标识符: 避免在源 Unity 目录表中的列名或表名中使用大写字母或特殊字符。 如果保留这些标识符,则在 Postgres 中引用它们时需要加以引用。

性能与同步

  • 同步速度: 将数据同步到同步表的速度可能会比使用本机 PostgreSQL 客户端直接将数据写入数据库实例的速度更慢,这是由于额外的处理。 连续同步模式将数据从 Unity 目录托管表刷新到同步表,最短间隔为 15 秒。
  • 连接用法: 每个表同步最多可以使用 16 个到数据库实例的连接,这计入实例的连接限制。
  • API 幂等性: 同步表 API 是幂等的,如果发生错误,为了确保操作的及时性,可能需要重试。
  • 架构更改:对于处于Triggered模式或Continuous模式的同步表,只有来自 Unity Catalog 表的累加架构更改(例如添加新列)才会反映在同步表上。
  • 重复键: 如果两行在源表中具有相同的主键,则同步管道会失败,除非使用 Timeseries 密钥配置重复数据删除。 但是,使用 Timeseries 密钥会产生性能损失。
  • 更新速率: 同步管道支持每秒约 1,200 行的连续写入,每个容量单位(CU)每秒最多 15,000 行进行批量写入。

容量和限制

  • 表限制:
    • 每个源表的同步表数限制为 20 个。
    • 每个表同步最多可以使用 16 个数据库连接。
  • 大小限制和完全刷新:
    • 如果完全刷新同步的表,则在同步新表之前,不会删除 Postgres 中的旧版本。 在刷新期间,这两个版本都会临时计入逻辑数据库大小限制。
    • 单个同步表没有限制,但实例中所有表的总逻辑数据大小限制为 2 TB。 但是,如果您需要刷新而非完整表重建,Databricks 建议不超过 1 TB。
    • 如果未压缩的 Unity 目录表的行格式大小超过数据库实例大小限制(2 TB),同步将失败。 必须先在 Postgres 中删除同步表,然后才能继续向实例写入数据。

目录集成

  • 目录重复: 在目标为 Postgres 数据库的标准目录中创建同步表,该数据库也注册为单独的数据库目录会导致同步表显示在标准目录和数据库目录下的 Unity 目录中。