Partager via


Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires

Cet article explique comment utiliser foreachBatch avec Structured Streaming pour écrire le résultat d’une requête en continu vers des sources de données qui ne disposent pas d’un récepteur de flux existant.

Le modèle de code streamingDF.writeStream.foreachBatch(...) vous permet d’appliquer des fonctions par lot aux données de sortie de chaque micro-lot de la requête en continu. Les fonctions utilisées avec foreachBatch acceptent deux paramètres :

  • Un DataFrame contenant les données de sortie d’un micro-lot.
  • L’ID unique du micro-lot.

Vous devez utiliser foreachBatch pour les opérations de fusion Delta Lake dans Structured Streaming. Veuillez consulter la section Fusion à partir de requêtes en continu avec foreachBatch.

Appliquer d’autres opérations DataFrame

De nombreuses opérations DataFrame et DataSet ne sont pas prises en charge dans les DataFrames de streaming, Spark ne prend pas en charge la génération de plans incrémentiels dans ces cas. En utilisant foreachBatch(), vous pouvez appliquer certaines de ces opérations sur chaque sortie de micro-batch. Par exemple, vous pouvez utiliser foreachBatch() et l’opération SQL MERGE INTO pour écrire le résultat d’agrégations en continu dans une table Delta en mode mise à jour. Pour plus d’informations, consultez MERGE INTO.

Importante

  • foreachBatch() ne fournit que des garanties d'écriture au moins une fois. Mais vous pouvez utiliser le batchId fourni à la fonction pour dédupliquer la sortie et obtenir une garantie une seule fois. Dans les deux cas, vous devrez définir vous-même la sémantique de bout en bout.
  • foreachBatch() ne fonctionne pas avec le mode de traitement continu car il repose fondamentalement sur l'exécution par micro-batch d'une requête de streaming. Si vous écrivez des données en mode continu, utilisez foreach() à la place.
  • Lorsque vous utilisez foreachBatch avec un opérateur avec état, il est important de consommer entièrement chaque lot avant la fin du traitement. Veuillez consulter la section Consommer entièrement chaque DataFrame de lot

Un dataframe vide peut être appelé avec foreachBatch() et le code utilisateur doit être résilient pour permettre un fonctionnement approprié. En voici un exemple :

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Changements de comportement pour foreachBatch dans Databricks Runtime 14.0

Dans Databricks Runtime 14.0 et versions ultérieures sur le calcul configuré avec le mode d’accès standard, les modifications de comportement suivantes s’appliquent :

  • Les commandes print() écrivent la sortie dans les journaux du driver.
  • Vous ne pouvez pas accéder au sous-module dbutils.widgets à l’intérieur de la fonction.
  • Tous les fichiers, modules ou objets référencés dans la fonction doivent être sérialisables et disponibles sur Spark.

Réutiliser les sources de données de lot existantes

Grâce à foreachBatch(), vous pouvez utiliser les writers de données batch existants pour des récepteurs de données qui ne sont peut-être pas pris en charge par Structured Streaming. Voici quelques exemples :

De nombreuses autres sources de données par lots peuvent être utilisées à partir de foreachBatch(). Consultez Se connecter aux sources de données et aux services externes.

Écrire dans plusieurs emplacements

Si vous devez écrire le résultat d’une requête en continu vers plusieurs emplacements, Databricks recommande d’utiliser plusieurs writers Structured Streaming pour une meilleure parallélisation et un meilleur débit.

L’utilisation de foreachBatch pour écrire vers plusieurs récepteurs sérialise l’exécution des écritures en continu, ce qui peut augmenter la latence de chaque micro-lot.

Si vous utilisez foreachBatch pour écrire dans plusieurs tables Delta, veuillez consulter la section Écritures idempotentes dans les tables dans foreachBatch.

Consommer entièrement chaque DataFrame de lot

Lorsque vous utilisez des opérateurs d'état (par exemple, en utilisant dropDuplicatesWithinWatermark), chaque itération de lot doit consommer l’intégralité du DataFrame ou redémarrer la requête. Si vous n’utilisez pas l’intégralité du DataFrame, la requête de streaming échoue avec le lot suivant.

Cela peut se produire dans plusieurs cas. Les exemples suivants montrent comment corriger les requêtes qui ne consomment pas correctement un DataFrame.

Utilisation intentionnelle d’un sous-ensemble du lot

Si vous vous souciez uniquement d’un sous-ensemble du lot, vous pouvez avoir du code tel que le suivant.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Dans ce cas, le batch_df.show(2) seul gère les deux premiers éléments du lot, ce qui est attendu, mais s’il y a plus d’éléments, ils doivent être consommés. Le code suivant utilise le DataFrame complet.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Ici, la do_nothing fonction ignore silencieusement le reste du DataFrame.

Gestion d’une erreur dans un lot

Une erreur peut se produire lors de l’exécution d’un foreachBatch processus. Vous pouvez avoir du code tel que le suivant (dans ce cas, l’exemple déclenche intentionnellement une erreur pour afficher le problème).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

En gérant (et en ignorant silencieusement) l’erreur, le reste du lot peut ne pas être consommé. Il existe deux options pour gérer cette situation.

Vous pouvez d’abord relancer l’erreur pour qu’elle soit transmise à votre couche d’orchestration afin de relancer le lot. Cela peut résoudre l’erreur, s’il s’agit d’un problème temporaire ou le déclencher pour que votre équipe des opérations tente de corriger manuellement. Pour ce faire, modifiez le partial_func code pour qu’il ressemble à ceci :

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

La deuxième option, si vous souhaitez intercepter l’exception et ignorer le reste du lot, consiste à remplacer le code par celui-ci.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Ce code utilise la do_nothing fonction pour ignorer silencieusement le reste du lot.