Compartilhar via


Sincronizar dados de tabelas do Catálogo do Unity com uma instância de banco de dados

Importante

Esse recurso está em Visualização Pública nas seguintes regiões: westus, , westus2, eastus, eastus2, centralus, southcentralus, , northeurope, westeurope, , australiaeast, , brazilsouth, , canadacentral, , centralindia, , southeastasiauksouth.

Esta página descreve como criar e gerenciar uma tabela sincronizada. Uma tabela sincronizada é uma tabela Postgres somente leitura do Catálogo do Unity que sincroniza automaticamente os dados de uma tabela do Catálogo do Unity para a instância do banco de dados Lakebase. A sincronização de uma tabela do Catálogo do Unity com o Postgres permite consultas de leitura de baixa latência e dá suporte a junções de tempo de consulta com outras tabelas do Postgres.

A sincronização é tratada pelo Lakeflow Spark Declarative Pipelines. Um pipeline gerenciado atualiza continuamente a tabela Postgres com alterações da tabela de origem. Após a criação, as tabelas sincronizadas podem ser consultadas diretamente usando as ferramentas do Postgres.

As principais características das tabelas sincronizadas são as seguintes:

  • Somente leitura no Postgres para manter a integridade dos dados com a origem
  • Sincronizado automaticamente usando pipelines declarativos do Spark do Lakeflow gerenciado
  • Consultável por meio de interfaces PostgreSQL padrão
  • Gerenciado por meio do Catálogo do Unity para gerenciamento de governança e ciclo de vida

Antes de começar

  • Você tem uma tabela do Catálogo do Unity em qualquer catálogo.
  • Você tem permissões CAN USE na instância do banco de dados.

Criar uma tabela sincronizada

interface do usuário

Para sincronizar uma tabela do Catálogo do Unity no Postgres, faça o seguinte:

  1. Clique em Catálogo na barra lateral do workspace.

  2. Localize e selecione a tabela catálogo do Unity na qual você deseja criar uma tabela sincronizada.

  3. Clique em Criar>Tabela Sincronizada.

  4. Selecione seu catálogo, esquema e insira um nome de tabela para a nova tabela sincronizada.

    • Tabelas sincronizadas também podem ser criadas em catálogos Standard, com alguma configuração adicional. Selecione seu catálogo Standard, um esquema e insira um nome de tabela para a tabela sincronizada recém-criada.
  5. Selecione uma instância de banco de dados e insira o nome do banco de dados Postgres no qual criar a tabela sincronizada. O campo de banco de dados postgres usa como padrão o catálogo de destino selecionado no momento. Se um banco de dados Postgres não existir com esse nome, o Azure Databricks criará um novo.

  6. Selecione uma chave primária. Uma chave primária é necessária , pois permite acesso eficiente a linhas para leituras, atualizações e exclusões.

    Importante

    As colunas na chave primária não são anuláveis na tabela sincronizada. Portanto, linhas com nulos em colunas de chave primária são excluídas da sincronização.

  7. Se duas linhas tiverem a mesma chave primária na tabela de origem, selecione uma Chave timeseries para configurar a eliminação de duplicação. Quando uma chave timeseries é especificada, as tabelas sincronizadas contêm apenas as linhas com o valor de chave de timeseries mais recente para cada chave primária.

  8. Selecione o modo de sincronização: Instantânea, Disparada e Contínua. Para obter mais informações sobre cada modo de sincronização, consulte os modos de sincronização explicados.

  9. Escolha se deseja criar essa tabela sincronizada de um pipeline novo ou existente.

    • Se estiver criando um novo pipeline e usando um catálogo gerenciado, escolha o local de armazenamento para a tabela de preparo. Se estiver usando um catálogo padrão, a tabela de preparo será armazenada automaticamente no catálogo.
    • Se estiver usando um pipeline existente, verifique se o novo modo de sincronização corresponde ao modo de pipeline.
  10. (Opcional) Selecione uma política de orçamento sem servidor. Para criar uma política de orçamento sem servidor, consulte o uso de atributos com políticas de orçamento sem servidor. Isso permite atribuir o uso de cobrança a políticas de uso específicas.

    • Para tabelas sincronizadas, a entidade faturável é o pipeline subjacente do Lakeflow Spark Declarative Pipelines. Para modificar a política de orçamento, modifique o objeto de pipeline subjacente. Consulte Configurar um pipeline sem servidor.
  11. Depois que o status da tabela sincronizada estiver Online, faça login na instância do banco de dados e faça a consulta da nova tabela. Consulte sua tabela usando o editor do SQL, ferramentas externas ou notebooks.

SDK do Python

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy

# Initialize the Workspace client
w = WorkspaceClient()

# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="database_catalog.schema.synced_table",  # Full three-part name
        spec=SyncedTableSpec(
            source_table_full_name="source_catalog.source_schema.source_table",
            primary_key_columns=["id"],  # Primary key columns
            scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,  # SNAPSHOT, TRIGGERED, or CONTINUOUS
            # Optional: timeseries_key="timestamp"  # For deduplication
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="storage_catalog",
                storage_schema="storage_schema"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")

# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="standard_catalog.schema.synced_table",  # Full three-part name
        database_instance_name="my-database-instance",  # Required for standard catalogs
        logical_database_name="postgres_database",  # Required for standard catalogs
        spec=SyncedTableSpec(
            source_table_full_name="source_catalog.source_schema.source_table",
            primary_key_columns=["id"],
            scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
            create_database_objects_if_missing=True,  # Create database/schema if needed
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="storage_catalog",
                storage_schema="storage_schema"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")

# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")

CLI

# Create a synced table in a database catalog
databricks database create-synced-database-table  \
  --json '{
    "spec": {
      "name": "database_catalog.schema.synced_table",
      "source_table_full_name": "source_catalog.source_schema.source_table",
      "primary_key_columns": ["id"],
      "scheduling_policy": "TRIGGERED"
    },
    "new_pipeline_spec": {
      "storage_catalog": "storage_catalog",
      "storage_schema": "storage_schema"
    }
  }'

# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
  --database-instance-name "my-database-instance" \
  --logical-database-name "databricks_postgres" \
  --json '{
    "name": "standard_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "source_catalog.source_schema.source_table",
      "primary_key_columns": ["id"],
      "scheduling_policy": "CONTINUOUS",
      "create_database_objects_if_missing": true
    }
  }'

# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"

encurvar

Crie uma tabela sincronizada em um catálogo de banco de dados.

export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

Crie uma tabela sincronizada em um catálogo padrão do Unity Catalog.

export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "database_instance_name": "$DATABASE_INSTANCE",
  "logical_database_name": "$POSTGRES_DATABASE",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

Verifique o status de uma tabela sincronizada.

export SYNCEDTABLE='pg_db.silver.sbtest1_online'

curl --request GET \
  "https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
  --header "Authorization: Bearer dapi..."

Modos de sincronização explicados

Uma tabela sincronizada pode ser criada com um dos seguintes modos de sincronização, que determinam como os dados são sincronizados da origem para a tabela sincronizada no Postgres:

Modo de sincronização Description Performance
Instantâneo O pipeline é executado uma vez para tirar um instantâneo da tabela de origem e copiá-la para a tabela sincronizada. As execuções subsequentes do pipeline copiam todos os dados de origem para o destino e substituem os dados originais no destino de forma atômica. O pipeline pode ser disparado manualmente, por meio de uma API ou por meio de um cronograma. Esse modo é 10 vezes mais eficiente do que os modos de sincronização disparados ou contínuos porque recria dados do zero. Se você estiver modificando mais de 10% da tabela de origem, considere usar esse modo.
Disparado O pipeline é executado uma vez para tirar um instantâneo da tabela de origem e copiá-la para a tabela sincronizada. Ao contrário do modo de sincronização de instantâneo, quando a tabela sincronizada é atualizada, somente as alterações desde a última execução do pipeline são recuperadas e aplicadas à tabela sincronizada. A atualização incremental pode ser disparada manualmente, por meio de uma API ou em um agendamento. Esse modo é um bom compromisso entre latência e custo, pois é executado sob demanda e aplica apenas as alterações desde a última execução. Para minimizar o atraso, execute esse pipeline imediatamente após atualizar a tabela de origem. Se você executar isso com mais frequência do que a cada 5 minutos, pode ser mais caro do que o modo Contínuo devido ao custo de iniciar e parar o pipeline a cada vez.
Contínuo O pipeline é executado uma vez para tirar um instantâneo da tabela de origem e copiá-lo para a tabela sincronizada e, em seguida, o pipeline é executado continuamente. As alterações subsequentes na tabela de origem são aplicadas incrementalmente à tabela sincronizada em tempo real. Não é necessária nenhuma atualização manual. Esse modo tem o menor atraso, mas custo mais alto, pois está em execução contínua.

Observação

Para dar suporte ao modo de sincronização Disparado ou Contínuo, a tabela de origem deve ter a habilitação Alterar feed de dados. Determinadas fontes (como Exibições) não oferecem suporte ao feed de dados de alterações, portanto, elas só podem ser sincronizadas no modo Instantâneo.

Operações com suporte

O Databricks recomenda executar apenas as seguintes operações no Postgres para tabelas sincronizadas para evitar substituições acidentais ou inconsistências de dados:

  • Consultas somente leitura
  • Criando índices
  • Soltando a tabela (para liberar espaço depois de remover a tabela sincronizada do Catálogo do Unity)

Embora você seja capaz de modificar essa tabela de outras maneiras, ela interfere no pipeline de sincronização.

Excluir uma tabela sincronizada

Para excluir uma tabela sincronizada, exclua-a do Catálogo do Unity e solte a tabela na instância do banco de dados. Excluir a tabela sincronizada do Catálogo do Unity desregissa a tabela e interrompe as atualizações de dados. No entanto, a tabela permanece no banco de dados Postgres subjacente. Para liberar espaço em sua instância de banco de dados, conecte-se à instância e use o DROP TABLE comando.

interface do usuário

  1. Clique em Catálogo na barra lateral do workspace.
  2. Localize e selecione a tabela sincronizada que você deseja excluir.
  3. Clique no ícone do menu Kebab.>Excluir.
  4. Conecte-se à instância com psql, o editor SQL ou a partir de um notebook.
  5. Exclua a tabela usando o PostgreSQL.
    DROP TABLE synced_table_database.synced_table_schema.synced_table
    

SDK do Python

from databricks.sdk import WorkspaceClient

# Initialize the Workspace client
w = WorkspaceClient()

# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

CLI

# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

encurvar

# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME

# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;

Propriedade e permissões

Se você criar um novo banco de dados, esquema ou tabela do Postgres, a propriedade do Postgres será definida da seguinte maneira:

  • A propriedade será atribuída ao usuário que cria o banco de dados, o esquema ou a tabela, se o logon do Azure Databricks existir como uma função no Postgres. Para adicionar uma função de identidade do Azure Databricks no Postgres, consulte Gerenciar funções do Postgres.
  • Caso contrário, a propriedade é atribuída ao proprietário do objeto pai no Postgres (normalmente o databricks_superuser).

Gerenciar o acesso à tabela sincronizada

Depois que uma tabela sincronizada é criada, databricks_superuser pode READ uma tabela sincronizada do Postgres. O databricks_superuser possui pg_read_all_data que permite que essa função leia de todas as tabelas. Ele também tem o pg_write_all_data privilégio, que permite que essa função escreva em todas as tabelas. Isso significa que um databricks_superuser também pode gravar em uma tabela sincronizada no Postgres. O Lakebase dá suporte a esse comportamento de gravação caso você precise fazer alterações urgentes em sua tabela de destino. No entanto, o Databricks recomenda fazer correções na tabela de origem.

  • O databricks_superuser também pode conceder esses privilégios a outros usuários:

    GRANT USAGE ON SCHEMA synced_table_schema TO user;
    
    GRANT SELECT ON synced_table_name TO user;
    
  • O databricks_superuser pode revogar esses privilégios:

    REVOKE USAGE ON SCHEMA synced_table_schema FROM user;
    
    REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
    

Gerenciar operações de tabela sincronizadas

O databricks_superuser pode gerenciar quais usuários estão autorizados a executar operações específicas em uma tabela sincronizada. As operações com suporte para tabelas sincronizadas são:

  • CREATE INDEX
  • ALTER INDEX
  • DROP INDEX
  • DROP TABLE

Todas as outras operações DDL são negadas para tabelas sincronizadas.

Para conceder esses privilégios a usuários adicionais, primeiro databricks_superuser deve criar uma extensão em databricks_auth:

CREATE EXTENSION IF NOT EXISTS databricks_auth;

Em seguida, o databricks_superuser pode adicionar um usuário para gerenciar uma tabela sincronizada:

SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');

databricks_superuser pode remover um usuário do gerenciamento de uma tabela sincronizada.

SELECT databricks_synced_table_remove_manager('[table]', '[user]');

O databricks_superuser pode ver todos os gerentes:

SELECT * FROM databricks_synced_table_managers;

Mapeamento de tipo de dados

Esta tabela de mapeamento de tipo define como cada tipo de dados na tabela de catálogo do Unity de origem é mapeado para a tabela de sincronização de destino no Postgres:

Tipo de coluna de origem Tipo de coluna Postgres
BIGINT BIGINT
BINÁRIO BYTEA
BOOLEANO BOOLEAN
DATA DATE
DECIMAL(p,s) NUMÉRICO
DOBRAR Dupla precisão
FLUTUAR REAL
INT INTEGER
INTERVAL intervalQualifier INTERVALO
SMALLINT SMALLINT
CORDA TEXTO
TIMESTAMP CARIMBO DE DATA/HORA COM FUSO HORÁRIO
TIMESTAMP_NTZ CARIMBO DE DATA/HORA SEM FUSO HORÁRIO
TINYINT SMALLINT
GEOGRAPHY(srid) SEM SUPORTE
GEOMETRY(srid) SEM SUPORTE
ElementType ARRAY <> JSONB
MAP < keyType,valueType > JSONB
STRUCT < [fieldName: fieldType [NOT NULL][COMMENT str][, ...]] > JSONB
VARIANTE JSONB
OBJETO SEM SUPORTE

Observação

  • O mapeamento para tipos ARRAY, MAP e STRUCT foi alterado em maio de 2025. As tabelas de sincronização criadas antes disso continuam a mapear esses tipos para JSON.
  • O mapeamento de TIMESTAMP foi alterado em agosto de 2025. As tabelas de sincronização criadas antes disso continuam a mapeá-la para TIMESTAMP SEM FUSO HORÁRIO.

Manipular caracteres inválidos

Determinados caracteres, como o byte nulo (0x00), são permitidos em STRING, ARRAYou MAPSTRUCT colunas em tabelas Delta, mas não têm suporte em Postgres TEXT ou JSONB colunas. Como resultado, a sincronização desses dados de Delta para Postgres pode levar a falhas de inserção com erros:

org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
  • O primeiro erro ocorre quando um byte nulo aparece em uma coluna de cadeia de caracteres de nível superior, que é mapeada diretamente para Postgres TEXT.
  • O segundo erro ocorre quando um byte nulo aparece em uma cadeia de caracteres aninhada dentro de um tipo complexo (STRUCTou ARRAYMAP), que o Azure Databricks serializa como JSONB. Durante a serialização, todas as cadeias de caracteres são convertidas em Postgres TEXT, onde \u0000 não é permitido.

Como resolver:

Você pode resolver esse problema de uma das seguintes maneiras:

  • Higienizar campos de string

    Remova ou substitua caracteres sem suporte de todos os campos de cadeia de caracteres, incluindo aqueles dentro de tipos complexos, antes de sincronizar com o Postgres.

    Para remover bytes nulos de uma coluna de nível superior STRING, use a função REPLACE:

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
    
  • Converter para binário (apenas para colunas STRING)

    Se for necessário preservar o conteúdo bruto de bytes, converta as colunas afetadas STRING em BINARY.

Limitações e considerações

Tabelas de origem com suporte

Dependendo do modo de sincronização de uma tabela sincronizada, há suporte para diferentes tipos de tabelas de origem:

  • Para o modo instantâneo, a tabela de origem deve dar suporte SELECT *. Exemplos incluem tabelas Delta, tabelas iceberg, exibições, exibições materializadas e outros tipos semelhantes.

  • Para modos de sincronização disparados ou contínuos, a tabela de origem também deve ter o feed de dados de alteração habilitado.

Limitações de nomenclatura e identificador

  • Caracteres permitidos: Os nomes de banco de dados, esquema e tabela do Postgres para tabelas sincronizadas podem conter apenas caracteres alfanuméricos e sublinhados ([A-Za-z0-9_]+). Não há suporte para hífens (-) e outros caracteres especiais.
  • Identificadores de coluna e tabela: Evite usar letras maiúsculas ou caracteres especiais em nomes de coluna ou tabela na tabela de catálogo do Unity de origem. Se mantido, você precisará citar esses identificadores ao referenciá-los no Postgres.

Desempenho e sincronização

  • Velocidade de sincronização: A sincronização de dados em tabelas sincronizadas pode ser mais lenta do que gravar dados diretamente na instância do banco de dados com um cliente PostgreSQL nativo devido ao processamento adicional. O modo de sincronização contínua atualiza os dados da tabela gerenciada do Catálogo do Unity para a tabela sincronizada em um intervalo mínimo de 15 segundos.
  • Uso da conexão: Cada sincronização de tabela pode usar até 16 conexões com a instância do banco de dados, que contam para o limite de conexão da instância.
  • Idempotência da API: As APIs de tabela sincronizadas são idempotentes e podem precisar ser repetidas, caso ocorram erros, para garantir operações oportunas.
  • Alterações de esquema: Para tabelas sincronizadas no modo Triggered ou Continuous, somente alterações de esquema aditivas (por exemplo, a adição de uma nova coluna) das tabelas do Catálogo do Unity são refletidas na tabela sincronizada.
  • Chaves duplicadas: Se duas linhas tiverem a mesma chave primária na tabela de origem, o pipeline de sincronização falhará, a menos que você configure a eliminação de duplicação usando uma chave timeseries. No entanto, usar uma Chave de Série Temporal vem com uma penalidade de desempenho.
  • Taxa de atualização: O pipeline de sincronização dá suporte a escritas contínuas de aproximadamente 1.200 linhas por segundo por Unidade de Capacidade e escritas em massa de até 15.000 linhas por segundo por Unidade de Capacidade.

Capacidade e limites

  • Limites de tabela:
    • Limite de 20 tabelas sincronizadas por tabela de origem.
    • Cada sincronização de tabela pode usar até 16 conexões de banco de dados.
  • Limites de tamanho e atualização completa:
    • Se você atualizar totalmente uma tabela sincronizada, a versão antiga no Postgres não será excluída até que a nova tabela seja sincronizada. Ambas as versões contam temporariamente para o limite de tamanho do banco de dados lógico durante a atualização.
    • Tabelas sincronizadas individuais não têm um limite, mas o limite total de tamanho de dados lógicos em todas as tabelas na instância é de 2 TB. No entanto, se você precisar de atualizações em vez de recriação de tabela completa, o Databricks recomenda não exceder 1 TB.
    • Se o tamanho de formato de linha não compactado da tabela catálogo do Unity exceder o limite de tamanho da instância do banco de dados (2 TB), a sincronização falhará. Você deve excluir a tabela sincronizada no Postgres antes de fazer mais alterações na instância.

Integração de catálogo

  • Duplicação de catálogo: A criação de uma tabela sincronizada em um catálogo padrão direcionado a um banco de dados Postgres, que também está registrado como um catálogo de banco de dados separado, faz com que a tabela sincronizada apareça no Unity Catalog sob ambos os catálogos, padrão e de banco de dados.