Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
El conector de PostgreSQL para Lakeflow Connect está en versión preliminar pública. Póngase en contacto con el equipo de su cuenta de Databricks para inscribirse en la Vista previa pública.
En esta página se describe cómo ingerir datos de PostgreSQL y cargarlos en Azure Databricks mediante Lakeflow Connect. El conector postgreSQL admite AWS RDS PostgreSQL, Aurora PostgreSQL, Amazon EC2, Azure Database for PostgreSQL, máquinas virtuales de Azure, GCP Cloud SQL for PostgreSQL y bases de datos postgreSQL locales mediante Azure ExpressRoute, AWS Direct Connect o redes VPN.
Antes de empezar
Para crear una puerta de enlace de ingesta y una canalización de ingesta, debe cumplir los siguientes requisitos:
El área de trabajo está habilitada para Unity Catalog.
El proceso sin servidor está habilitado para el área de trabajo. Consulte Requisitos de proceso sin servidor.
Si tiene previsto crear una conexión: tiene
CREATE CONNECTIONprivilegios en el metastore.Si el conector admite la creación de canalizaciones basadas en la interfaz de usuario, puede crear la conexión y la canalización al mismo tiempo completando los pasos de esta página. Sin embargo, si usa la creación de canalizaciones basadas en API, debe crear la conexión en el Explorador de catálogos antes de completar los pasos de esta página. Consulte Conexión a orígenes de ingesta administrados.
Si planea utilizar una conexión existente: Tiene privilegios
USE CONNECTIONoALL PRIVILEGESen la conexión.Tiene
USE CATALOGprivilegios en el catálogo de destino.Tiene privilegios
USE SCHEMA,CREATE TABLEyCREATE VOLUMEen un esquema existente o privilegiosCREATE SCHEMAen el catálogo de destino.
Tiene acceso a una instancia principal de PostgreSQL. La replicación lógica solo se admite en instancias principales y no en réplicas de lectura.
Permisos sin restricciones para crear clústeres o una directiva personalizada (solo API). Una directiva personalizada para la puerta de enlace debe cumplir los siguientes requisitos:
Familia: Proceso de trabajos
Invalidaciones de familia de directivas:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }Databricks recomienda especificar los nodos de trabajo más pequeños posibles para las puertas de enlace de ingesta porque no afectan al rendimiento de la puerta de enlace. La siguiente directiva de proceso permite a Azure Databricks escalar la puerta de enlace de ingesta para satisfacer las necesidades de la carga de trabajo. El requisito mínimo es de 8 núcleos para habilitar la extracción de datos eficaz y eficaz de la base de datos de origen.
{ "driver_node_type_id": { "type": "fixed", "value": "Standard_E64d_v4" }, "node_type_id": { "type": "fixed", "value": "Standard_F4s" } }Para obtener más información sobre las directivas de clúster, consulte Seleccionar una directiva de procesamiento.
Para ingerir desde PostgreSQL, también debe completar la configuración de origen.
Opción 1: Interfaz de usuario de Azure Databricks
Nota:
La compatibilidad con la interfaz de usuario para PostgreSQL estará disponible próximamente. Por ahora, use el cuaderno o el flujo de trabajo de la CLI descritos en la opción 2.
Los usuarios administradores pueden crear una conexión y una canalización al mismo tiempo en la interfaz de usuario de Azure Databricks. Esta es la manera más sencilla de crear canalizaciones de ingesta administradas.
En la barra lateral del área de trabajo de Azure Databricks, haga clic en Ingesta de datos.
En la página Agregar datos , en Conectores de Databricks, haga clic en PostgreSQL.
Se abre el asistente para la ingesta.
En la página Puerta de enlace de ingesta del asistente, introduzca un nombre único para la puerta de enlace.
Seleccione un catálogo y un esquema para almacenar provisionalmente los datos de ingesta y, a continuación, haga clic en Siguiente.
En la página Canalización de ingesta , escriba un nombre único para la canalización.
En Catálogo de destino, seleccione un catálogo para almacenar los datos ingeridos.
Seleccione la conexión de Unity Catalog que almacena las credenciales necesarias para acceder a los datos de origen.
Si no hay conexiones existentes al origen, haga clic en Crear conexión y escriba los detalles de autenticación que obtuvo de la opción Configurar PostgreSQL para la ingesta en Azure Databricks. Debe tener privilegios
CREATE CONNECTIONen el metastore.Haga clic en Crear canalización y continúe.
En la página Origen , seleccione las tablas que se van a ingerir.
Opcionalmente, cambie la configuración de seguimiento del historial predeterminada. Para obtener más información, consulte Habilitación del seguimiento del historial (tipo SCD 2).
Haga clic en Siguiente.
En la página Destino , seleccione el catálogo de Unity Catalog y el esquema en los que escribir.
Si no desea usar un esquema existente, haga clic en Crear esquema. Debe tener privilegios
USE CATALOGyCREATE SCHEMAen el catálogo primario.Haga clic en Guardar y continuar.
En la página Configuración de base de datos , escriba el nombre del espacio de replicación y el nombre de la publicación de cada base de datos desde la que desea ingerir. Estos se crearon durante la configuración de PostgreSQL para la ingesta en Azure Databricks.
Haga clic en Siguiente.
(Opcional) En la página Configuración , haga clic en Crear programación. Establezca la frecuencia para actualizar las tablas de destino.
(Opcional) Establezca las notificaciones por correo electrónico para que la operación de canalización se complete correctamente o no.
Haga clic en Guardar y ejecutar canalización.
Opción 2: Otras interfaces
Antes de realizar la ingesta mediante los paquetes de activos de Databricks, las API de Databricks, los SDK de Databricks o la CLI de Databricks, debe tener acceso a una conexión existente de Unity Catalog. Para obtener instrucciones, consulte Conexión a orígenes de ingesta administrados.
Crear el catálogo de staging y el esquema
El catálogo y el esquema de almacenamiento provisional pueden ser los mismos que el catálogo y el esquema de destino. El catálogo de ensayo no puede ser un catálogo externo.
Interfaz de línea de comandos (CLI)
export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "POSTGRESQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"database": "'"$DB_DATABASE"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Crea la puerta de enlace y la canalización de ingesta
La puerta de enlace de ingesta extrae la instantánea y cambia los datos de la base de datos de origen y los almacena en el volumen de almacenamiento provisional de Unity Catalog. Debe usar la puerta de enlace como una canalización continua. Es fundamental para PostgreSQL evitar el sobredimensionamiento del Write-Ahead Log (WAL) y asegurarse de que las ranuras de replicación no acumulen cambios sin consumir.
La canalización de ingesta aplica la instantánea y cambia los datos del volumen de almacenamiento provisional en tablas de streaming de destino.
Conjuntos de recursos de Databricks
En esta pestaña se describe cómo implementar una canalización de ingesta mediante conjuntos de recursos de Databricks. Las agrupaciones pueden contener definiciones de YAML de trabajos y tareas, se administran mediante la CLI de Databricks y se pueden compartir y ejecutar en diferentes áreas de trabajo de destino (como desarrollo, almacenamiento provisional y producción). Para obtener más información, consulte Conjuntos de recursos de Databricks.
Cree una nueva agrupación mediante la CLI de Databricks:
databricks bundle initAgregue dos nuevos archivos de recursos al lote:
- Un archivo de definición de canalización (
resources/postgresql_pipeline.yml). - Un archivo de flujo de trabajo que controla la frecuencia de ingesta de datos (
resources/postgresql_job.yml).
A continuación se muestra un archivo
resources/postgresql_pipeline.ymlde ejemplo:variables: # Common variables used multiple places in the DAB definition. gateway_name: default: postgresql-gateway dest_catalog: default: main dest_schema: default: ingest-destination-schema resources: pipelines: gateway: name: ${var.gateway_name} gateway_definition: connection_name: <postgresql-connection> gateway_storage_catalog: main gateway_storage_schema: ${var.dest_schema} gateway_storage_name: ${var.gateway_name} target: ${var.dest_schema} catalog: ${var.dest_catalog} pipeline_postgresql: name: postgresql-ingestion-pipeline ingestion_definition: ingestion_gateway_id: ${resources.pipelines.gateway.id} source_type: POSTGRESQL objects: # Modify this with your tables! - table: # Ingest the table public.orders to dest_catalog.dest_schema.orders. source_catalog: your_database source_schema: public source_table: orders destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - schema: # Ingest all tables in the public schema to dest_catalog.dest_schema. The destination # table name will be the same as it is on the source. source_catalog: your_database source_schema: public destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} source_configurations: - catalog: source_catalog: your_database postgres: slot_config: slot_name: db_slot publication_name: db_pub target: ${var.dest_schema} catalog: ${var.dest_catalog}A continuación se muestra un archivo
resources/postgresql_job.ymlde ejemplo:resources: jobs: postgresql_dab_job: name: postgresql_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_postgresql.id}- Un archivo de definición de canalización (
Implemente la canalización mediante la CLI de Databricks:
databricks bundle deploy
Notebook
Actualice la celda Configuration del cuaderno siguiente con la conexión de origen, el catálogo de destino, el esquema de destino y las tablas que se van a importar desde el origen.
Creación de la puerta de enlace y la canalización de ingesta
Interfaz de línea de comandos (CLI)
Para crear la puerta de enlace:
gateway_json=$(cat <<EOF
{
"name": "$GATEWAY_PIPELINE_NAME",
"gateway_definition": {
"connection_name": "$CONNECTION_NAME",
"gateway_storage_catalog": "$STAGING_CATALOG",
"gateway_storage_schema": "$STAGING_SCHEMA",
"gateway_storage_name": "$GATEWAY_PIPELINE_NAME"
}
}
EOF
)
output=$(databricks pipelines create --json "$gateway_json")
echo $output
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para crear la canalización de ingesta:
pipeline_json=$(cat <<EOF
{
"name": "$INGESTION_PIPELINE_NAME",
"ingestion_definition": {
"ingestion_gateway_id": "$GATEWAY_PIPELINE_ID",
"source_type": "POSTGRESQL",
"objects": [
{
# Modify this with your tables!
"table": {
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
"source_catalog": "your_database",
"source_schema": "public",
"source_table": "table",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA",
"destination_table": "<YOUR_DATABRICKS_TABLE>"
}
},
{
"schema": {
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
"source_catalog": "your_database",
"source_schema": "public",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA"
}
}
],
"source_configurations": [
{
"catalog": {
"source_catalog": "your_database",
"postgres": {
"slot_config": {
"slot_name": "db_slot", # Slot created during source setup
"publication_name": "db_pub" # Publication created during source setup
}
}
}
}
]
}
}
EOF
)
databricks pipelines create --json "$pipeline_json"
Requiere la CLI de Databricks v0.276.0 o posterior.
Inicio, programación y establecimiento de alertas en la canalización
Para obtener información sobre cómo iniciar, programar y establecer alertas en la canalización, consulte Tareas comunes de mantenimiento de canalización.
Comprobación de la ingesta de datos correcta
La vista en lista en la página de detalles de la canalización muestra el número de registros procesados conforme se incorporan los datos. Estos números se actualizan automáticamente.
Las columnas Upserted records y Deleted records no se muestran de forma predeterminada. Puede habilitarlas haciendo clic en el botón de configuración de columnas
y seleccionándolas.