Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment traiter les messages d’Azure Event Hubs dans un pipeline. Vous ne pouvez pas utiliser le connecteur Structured Streaming Event Hubs, car cette bibliothèque n’est pas disponible dans le cadre du Databricks Runtime, et les Pipelines Déclaratifs Spark Lakeflow ne vous permettent pas d’utiliser des bibliothèques JVM tierces.
Comment un pipeline peut-il se connecter à Azure Event Hubs ?
Azure Event Hubs fournit un point de terminaison compatible avec Apache Kafka que vous pouvez utiliser avec le connecteur Kafka Structured Streaming, disponible dans Databricks Runtime, pour traiter les messages à partir d’Azure Event Hubs. Pour plus d’informations sur la compatibilité d’Azure Event Hubs et Apache Kafka, consultez Utiliser Azure Event Hubs à partir d’applications Apache Kafka.
Les étapes suivantes décrivent la connexion d’un pipeline à une instance Event Hubs existante et l’utilisation d’événements à partir d’une rubrique. Pour effectuer ces étapes, vous avez besoin des valeurs de connexion Event Hubs suivantes :
- Nom de l’espace de noms Event Hubs.
- Nom de l’instance Event Hub dans l’espace de noms Event Hubs.
- Nom et clé de stratégie d’accès partagé pour Event Hubs. Par défaut, une
RootManageSharedAccessKeystratégie est créée pour chaque espace de noms d'Event Hubs. Cette politique a les autorisationsmanage,sendetlisten. Si votre pipeline lit uniquement à partir d'Event Hubs, Databricks recommande de créer une stratégie avec uniquement l’autorisation d’écoute.
Pour plus d’informations sur la chaîne de connexion Event Hubs, consultez Obtenir une chaîne de connexion Event Hubs.
Note
- Azure Event Hubs fournit à la fois des options OAuth 2.0 et de signature d'accès partagé (SAS) pour autoriser l'accès à vos ressources sécurisées. Ces instructions utilisent l’authentification basée sur SAP.
- Si vous obtenez la chaîne de connexion Event Hubs à partir du portail Azure, elle peut ne pas contenir la
EntityPathvaleur. LaEntityPathvaleur est requise uniquement lors de l’utilisation du connecteur Event Hubs Structured Streaming. L’utilisation du connecteur Kafka de streaming structuré nécessite de fournir uniquement le nom du sujet.
Stocker la clé de stratégie dans un secret Azure Databricks
Étant donné que la clé de stratégie est des informations sensibles, Databricks recommande de ne pas coder en dur la valeur dans votre code de pipeline. Utilisez plutôt les secrets Azure Databricks pour stocker et gérer l’accès à la clé.
L’exemple suivant utilise l’interface CLI Databricks pour créer une étendue secrète et stocker la clé dans cette étendue secrète. Dans votre code de pipeline, utilisez la fonction dbutils.secrets.get() avec scope-name et shared-policy-name pour récupérer la valeur clé.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Pour plus d’informations sur les secrets Azure Databricks, consultez Gestion des secrets.
Créer une pipeline et ajouter du code pour traiter des événements
L’exemple suivant lit les événements IoT à partir d’une rubrique, mais vous pouvez adapter l’exemple pour les exigences de votre application. Comme meilleure pratique, Databricks recommande d’utiliser les paramètres de pipeline pour configurer des variables d’application. Votre code de pipeline utilise ensuite la spark.conf.get() fonction pour récupérer des valeurs.
from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dp.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Créer le pipeline
Créez un pipeline avec un fichier source Python et entrez le code ci-dessus.
Le code fait référence aux paramètres configurés. Utilisez la configuration JSON suivante, en remplaçant les valeurs d’Espace réservé par les valeurs appropriées pour votre environnement (consultez la liste JSON ci-après). Vous pouvez définir les paramètres à l’aide de l’interface utilisateur des paramètres ou en modifiant directement le json des paramètres. Pour plus d’informations sur l’utilisation des paramètres de pipeline pour paramétrer votre pipeline, consultez Utiliser des paramètres avec des pipelines.
Ce fichier de paramètres définit également l’emplacement de stockage d’un compte de stockage Azure Data Lake Storage (ADLS). Comme meilleure pratique, ce pipeline n’utilise pas le chemin de stockage DBFS par défaut, mais utilise plutôt un compte de stockage ADLS. Pour plus d’informations sur la configuration de l’authentification pour un compte de stockage ADLS, consultez Accès sécurisé aux informations d’identification de stockage avec des secrets dans un pipeline.
{
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
}
}
Remplacez les espaces réservés suivants :
-
<container-name>avec le nom d’un conteneur de compte de stockage Azure. -
<storage-account-name>avec le nom d’un compte de stockage ADLS. -
<eh-namespace>par le nom de votre espace de noms Event Hubs. -
<eh-policy-name>avec la clé d’étendue secrète pour la clé de stratégie Event Hubs. -
<eventhub>avec le nom de votre instance Event Hubs. -
<secret-scope-name>avec le nom de l’étendue de secret Azure Databricks qui contient la clé de stratégie Event Hubs.