Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se explica cómo procesar mensajes de Azure Event Hubs en una canalización. No puede usar el conector de Event Hubs de Structured Streaming porque esta biblioteca no está disponible como parte de Databricks Runtime y las canalizaciones declarativas de Spark de Lakeflow no permiten utilizar bibliotecas JVM de terceros.
¿Cómo se puede conectar una canalización a Azure Event Hubs?
Azure Event Hubs proporciona un punto de conexión compatible con Apache Kafka que puede usar con el conector de Kafka Structured Streaming, disponible en Databricks Runtime, para procesar mensajes desde Azure Event Hubs. Para más información sobre la compatibilidad de Azure Event Hubs y Apache Kafka, consulte Uso de Azure Event Hubs desde aplicaciones de Apache Kafka.
En los pasos siguientes se describe cómo conectar una canalización a una instancia de Event Hubs existente y consumir eventos de un tema. Para completar estos pasos, necesita los siguientes valores de conexión de Event Hubs:
- Nombre del espacio de nombres de Event Hubs.
- Nombre de la instancia del centro de eventos en el espacio de nombres de Event Hubs.
- Un nombre de directiva de acceso compartido y una clave de directiva para Event Hubs. De forma predeterminada, se crea una
RootManageSharedAccessKeydirectiva para cada espacio de nombres de Event Hubs. Esta política tienemanage,sendylistenpermisos. Si la canalización solo lee de Event Hubs, Databricks recomienda crear una nueva directiva con permiso para escuchar únicamente.
Para obtener más información sobre la cadena de conexión de Event Hubs, consulte "Obtención de una cadena de conexión de Event Hubs".
Nota:
- Azure Event Hubs proporciona opciones tanto de OAuth 2.0 como de firma de acceso compartido (SAS) para autorizar el acceso a tus recursos seguros. Estas instrucciones usan la autenticación basada en SAS.
- Si obtiene la cadena de conexión de Hubs de Eventos desde el portal de Azure, es posible que no contenga el valor
EntityPath. ElEntityPathvalor solo es necesario cuando se usa el conector de Event Hubs de Structured Streaming. El uso del conector de Kafka de Structured Streaming requiere proporcionar solo el nombre del tema.
Guardar la clave de política en un secreto de Azure Databricks
Dado que la clave de directiva es información confidencial, Databricks recomienda no codificar de forma rígida el valor en el código de canalización. En su lugar, use secretos de Azure Databricks para almacenar y administrar el acceso a la clave.
En el ejemplo siguiente se usa la CLI de Databricks para crear un ámbito secreto y almacenar la clave en ese ámbito secreto. En el código de canalización, use la función dbutils.secrets.get() con la scope-name y la shared-policy-name para recuperar el valor de clave.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Para más información sobre los secretos de Azure Databricks, consulte Administración de secretos.
Cree una canalización y añada código para consumir eventos
En el ejemplo siguiente se leen eventos de IoT de un tópico, pero puede adaptar el ejemplo para los requisitos de su aplicación. Como procedimiento recomendado, Databricks recomienda usar la configuración de canalización para configurar las variables de aplicación. A continuación, el código de canalización usa la spark.conf.get() función para recuperar valores.
from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dp.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Creación de la canalización
Cree una nueva tubería utilizando un archivo fuente de Python e introduzca el código mencionado anteriormente.
El código hace referencia a parámetros configurados. Use la siguiente configuración JSON, reemplazando los valores de marcador de posición por los valores adecuados para su entorno (consulte la lista, siguiendo el código JSON). Puede establecer los parámetros mediante la interfaz de usuario de configuración o editando el JSON de configuración directamente. Para más información sobre el uso de la configuración de canalización para parametrizar la canalización, consulte Uso de parámetros con canalizaciones.
Este archivo de configuración también define la ubicación de almacenamiento de una cuenta de almacenamiento de Azure Data Lake Storage (ADLS). Como procedimiento recomendado, esta canalización no usa la ruta de acceso de almacenamiento de DBFS predeterminada, sino que usa una cuenta de almacenamiento de ADLS. Para obtener más información sobre cómo configurar la autenticación para una cuenta de almacenamiento de ADLS, consulte Acceso seguro a las credenciales de almacenamiento con secretos en una canalización.
{
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
}
}
Reemplace los siguientes marcadores de posición:
-
<container-name>con el nombre de un contenedor de cuentas de almacenamiento de Azure. -
<storage-account-name>con el nombre de una cuenta de almacenamiento de ADLS. -
<eh-namespace>con el nombre del espacio de nombres de Event Hubs. -
<eh-policy-name>con la clave de alcance secreta para la clave de política de Event Hubs. -
<eventhub>con el nombre de la instancia de Event Hubs. -
<secret-scope-name>con el nombre del espacio secreto de Azure Databricks que contiene la clave de política de Event Hubs.