Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
La prise en charge des hooks d'événement est en version préliminaire publique.
Vous pouvez utiliser des hooks d’événements pour ajouter des fonctions de rappel Python personnalisées qui s’exécutent lorsque les événements sont conservés dans le journal des événements d’un pipeline. Vous pouvez utiliser des hooks d’événement pour implémenter des solutions personnalisées de surveillance et d’alerte. Par exemple, vous pouvez utiliser des hooks d'événement pour envoyer des e-mails ou écrire dans un journal lorsque des événements spécifiques se produisent ou pour intégrer des solutions tierces pour surveiller les événements de pipeline.
Définissez un hook d’événement avec une fonction Python qui accepte un seul argument, où l’argument est un dictionnaire représentant un événement. Incluez ensuite les hooks d’événements dans le cadre du code source d’un pipeline. Tous les hooks d’événements définis dans un pipeline tenteront de traiter tous les événements générés pendant chaque mise à jour de pipeline. Si votre pipeline est composé de plusieurs fichiers de code source, tous les hooks d’événements définis sont appliqués à l’ensemble du pipeline. Bien que les hooks d'événement soient inclus dans le code source de votre pipeline, ils ne sont pas inclus dans le graphique du pipeline.
Vous pouvez utiliser des hooks d’événements avec des pipelines qui publient sur le metastore Hive ou Unity Catalog.
Note
- Python est le seul langage pris en charge pour définir des hooks d'événements. Pour définir des fonctions Python personnalisées qui traitent des événements dans un pipeline implémenté à l’aide de l’interface SQL, ajoutez les fonctions personnalisées dans un fichier source Python distinct qui s’exécute dans le cadre du pipeline. Les fonctions Python sont appliquées à l’ensemble du pipeline lors de l’exécution du pipeline.
- Les hooks d’événements sont déclenchés uniquement pour les événements où maturity_level est
STABLE. - Les hooks d'événements sont exécutés de manière asynchrone à partir des mises à jour du pipeline, mais de manière synchrone avec d'autres hooks d'événements. Cela signifie qu'un seul hook d'événement s'exécute à la fois, et que les autres hooks d'événement attendent de s'exécuter jusqu'à ce que le hook d'événement en cours d'exécution se termine. Si un hook d'événement s'exécute indéfiniment, il bloque tous les autres hooks d'événement.
- Lakeflow Spark Declarative Pipelines (SDP) tente d’exécuter chaque hook d’événement sur chaque événement émis lors d’une mise à jour de pipeline. Pour s'assurer que les crochets d’événements en retard ont le temps de traiter tous les événements mis en file d’attente, SDP attend une période fixe non configurable avant de mettre fin à l'opération de calcul qui exécute le pipeline. Cependant, il n'est pas garanti que tous les hooks soient déclenchés sur tous les événements avant la fin du calcul.
Surveiller le traitement des hooks d'événements
Utilisez le hook_progress type d’événement dans le journal des événements de pipeline pour surveiller l’état des hooks d’événements d’une mise à jour. Pour éviter les dépendances circulaires, les hooks d'événements ne sont pas déclenchés pour les événements hook_progress.
Définir un hook d'événement
Pour définir un hook d'événement, utilisez l'élément décoratif on_event_hook :
@dp.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures décrit le nombre maximum de fois consécutives où un hook d'événement peut échouer avant d'être désactivé. L'échec d'un hook d’événement est défini à chaque fois que le hook d’événement lève une exception. Si un crochet d'événement est désactivé, il ne traite pas les nouveaux événements tant que le pipeline n'est pas redémarré.
max_allowable_consecutive_failures doit être un entier supérieur ou égal à 0 ou None. La valeur None (affectée par défaut) signifie qu’il n’existe aucune limite au nombre d’échecs consécutifs autorisés pour le hook d’événement et que le hook d’événement n’est jamais désactivé.
Les échecs des hooks d’événements et la désactivation des hooks d’événements peuvent être surveillés dans le journal des événements en tant qu’événements hook_progress.
La fonction de hook d'événement doit être une fonction Python qui accepte exactement un paramètre, une représentation de dictionnaire de l'événement qui a déclenché ce hook d'événement. Toute valeur de retour de la fonction de hook d'événement est ignorée.
Exemple : Sélectionner des événements spécifiques pour le traitement
L'exemple suivant illustre un hook d'événement qui sélectionne des événements spécifiques à traiter. Plus précisément, cet exemple attend que les événements STOPPING du pipeline soient reçus, puis génère un message dans les journaux du pilote stdout.
@dp.on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Exemple : envoyer tous les événements à un canal Slack
L'exemple suivant implémente un hook, d'événement qui envoie tous les événements reçus à un canal Slack à l'aide de l'API Slack.
Cet exemple utilise un secret Databricks pour stocker en toute sécurité un jeton requis pour l'authentification auprès de l'API Slack.
from pyspark import pipelines as dp
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Exemple : configurer un hook d'événement pour le désactiver après quatre échecs consécutifs
L'exemple suivant montre comment configurer un crochet d'événement qui est désactivé s'il échoue quatre fois consécutivement.
from pyspark import pipelines as dp
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@dp.on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Exemple : Pipeline avec un hook d’événement
L'exemple suivant montre l'ajout d'un hook d'événement au code source d'un pipeline. Il s'agit d'un exemple simple mais complet d'utilisation de hooks d'événement avec un pipeline.
from pyspark import pipelines as dp
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@dp.table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})