Partager via


Charger et traiter des données de manière incrémentielle avec des flux de pipelines déclaratifs Spark Lakeflow

Les données sont traitées dans des pipelines à travers des flux. Chaque flux se compose d’une requête et, généralement, d’une cible. Le processus de flux traite la requête, soit en tant que lot, soit de manière incrémentielle comme un flux de données vers la cible. Un flux réside au sein d'un pipeline dans Lakeflow Spark Declarative Pipelines.

En règle générale, les flux sont définis automatiquement lorsque vous créez une requête dans un pipeline qui met à jour une cible, mais vous pouvez également définir explicitement des flux supplémentaires pour un traitement plus complexe, comme l’ajout à une cible unique à partir de plusieurs sources.

Mises à jour

Un flux de travail est exécuté chaque fois que le pipeline qui le définit est mis à jour. Le flux crée ou met à jour des tables avec les données les plus récentes disponibles. Selon le type de flux et l’état des modifications apportées aux données, la mise à jour peut effectuer une actualisation incrémentielle, qui traite uniquement les nouveaux enregistrements ou effectue une actualisation complète, qui retraite tous les enregistrements de la source de données.

Créer un flux par défaut

Lorsque vous créez un pipeline, vous définissez généralement une table ou une vue avec la requête qui la prend en charge. Par exemple, dans cette requête SQL, vous créez une table de diffusion en continu appelée customers_silver en lisant la table appelée customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Vous pouvez également créer la même table de diffusion en continu en Python. En Python, vous utilisez des pipelines en créant une fonction de requête qui retourne un DataFrame, avec des décorateurs pour ajouter des fonctionnalités des pipelines déclaratifs Lakeflow Spark.

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

Dans cet exemple, vous avez créé une table de streaming. Vous pouvez également créer des vues matérialisées avec une syntaxe similaire dans SQL et Python. Pour plus d’informations, consultez les tables de diffusion en continu et les vues matérialisées.

Cet exemple crée un flux par défaut avec la table de diffusion en continu. Le flux par défaut d’une table de diffusion en continu est un flux d’ajout , qui ajoute de nouvelles lignes avec chaque déclencheur. Il s’agit de la façon la plus courante d’utiliser des pipelines : créez un flux et la cible en une seule étape. Vous pouvez utiliser ce style pour ingérer des données ou transformer des données.

Les flux d’ajout prennent également en charge le traitement qui nécessite la lecture de données à partir de plusieurs sources de diffusion en continu pour mettre à jour une cible unique. Par exemple, vous pouvez utiliser la fonctionnalité d’ajout de flux lorsque vous disposez d’une table et d’un flux de streaming existants et que vous souhaitez ajouter une nouvelle source de diffusion en continu qui écrit dans cette table de diffusion en continu existante.

Utilisation de plusieurs flux pour écrire dans une seule cible

Dans l’exemple précédent, vous avez créé un flux et une table de diffusion en continu en une seule étape. Vous pouvez également créer des flux pour une table créée précédemment. Dans cet exemple, vous pouvez voir la création d’une table et le flux associé à celui-ci dans des étapes distinctes. Ce code a des résultats identiques à la création d’un flux par défaut, notamment en utilisant le même nom pour la table de diffusion en continu et le flux.

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

La création d’un flux indépendamment de la cible signifie que vous pouvez également créer plusieurs flux qui ajoutent des données à la même cible.

Utilisez le @dp.append_flow décorateur dans l’interface Python ou la CREATE FLOW...INSERT INTO clause de l’interface SQL pour créer un nouveau flux, par exemple pour cibler une table de flux à partir de plusieurs sources de flux. Utilisez le flux d’ajout pour traiter des tâches telles que les suivantes :

  • Ajoutez des sources de diffusion en continu qui ajoutent des données à une table de diffusion en continu existante sans nécessiter une actualisation complète. Par exemple, vous pouvez avoir une table combinant des données régionales de chaque région dans laquelle vous travaillez. À mesure que de nouvelles régions sont déployées, vous pouvez ajouter les nouvelles données de région à la table sans effectuer une actualisation complète. Pour obtenir un exemple d’ajout de sources de diffusion en continu à une table de streaming existante, consultez Exemple : Écrire dans une table de diffusion en continu à partir de plusieurs rubriques Kafka.
  • Mettez à jour une table de diffusion en continu en ajoutant des données historiques manquantes (remplissage). Vous pouvez utiliser la INSERT INTO ONCE syntaxe pour créer un ajout de remplissage historique qui s’exécute une seule fois. Par exemple, vous disposez d’une table de diffusion en continu existante qui est alimentée par un topic Apache Kafka. Vous disposez également de données historiques stockées dans une table que vous avez besoin d’insérer exactement une fois dans la table de diffusion en continu, et vous ne pouvez pas diffuser les données, car votre traitement inclut l’exécution d’une agrégation complexe avant d’insérer les données. Pour obtenir un exemple de rétro-remplissage, consultez Rétro-remplissage des données historiques avec des pipelines.
  • Combinez les données de plusieurs sources et écrivez-les dans une seule table de flux au lieu d'utiliser la clause UNION dans une requête. L’utilisation du traitement par flux d’ajout au lieu de UNION vous permet de mettre à jour la table cible de manière incrémentielle sans exécuter une mise à jour d’actualisation complète. Pour obtenir un exemple d’union effectuée de cette façon, consultez Exemple : Utiliser le traitement de flux d’ajout au lieu de UNION.

La cible pour la sortie des enregistrements par le traitement de flux d’ajout peut être une table existante ou une nouvelle table. Pour les requêtes Python, utilisez la fonction create_streaming_table() pour créer une table cible.

L’exemple suivant ajoute deux flux pour la même cible, créant une union des deux tables sources :

Python

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Important

  • Si vous devez définir des contraintes de qualité des données avec des attentes, définissez les attentes sur la table cible dans le cadre de la create_streaming_table() fonction ou sur une définition de table existante. Vous ne pouvez pas définir les attentes dans la @append_flow définition.
  • Les flux sont identifiés par un nom de flux et ce nom est utilisé pour identifier les points de contrôle de diffusion en continu. L’utilisation du nom de flux pour identifier le point de contrôle signifie ce qui suit :
    • Si un flux existant dans un pipeline est renommé, le point de contrôle n’est pas reporté et le flux renommé est effectivement un flux entièrement nouveau.
    • Vous ne pouvez pas réutiliser un nom de flux dans un pipeline, car le point de contrôle existant ne correspond pas à la nouvelle définition de flux.

Types de flux

Les flux par défaut pour les tables de streaming et les vues matérialisées sont des flux d’ajout. Vous pouvez également créer des flux pour lire à partir de sources de capture de données modifiées. Le tableau suivant décrit les différents types de flux.

Type de flux Descriptif
Append Les flux d’ajout sont le type de flux le plus courant, où de nouveaux enregistrements dans la source sont écrits dans la cible avec chaque mise à jour. Ils correspondent au mode d’ajout en streaming structuré. Vous pouvez ajouter l’indicateur ONCE , indiquant une requête par lot dont les données doivent être insérées dans la cible une seule fois, sauf si la cible est entièrement actualisée. Un nombre quelconque de flux d’ajout peut écrire dans une cible particulière.
Les flux par défaut (créés avec la table de diffusion en continu cible ou la vue matérialisée) ont le même nom que la cible. Les autres cibles n’ont pas de flux par défaut.
Auto CDC (précédemment appliquer les modifications) Un processus Auto CDC ingère une requête contenant des données de capture de modifications des données (CDC). Les flux CDC automatiques peuvent uniquement cibler des tables de diffusion en continu, et la source doit être une source de diffusion en continu (même dans le cas des flux ONCE). Plusieurs flux CDC automatiques peuvent cibler une table de streaming unique. Une table de diffusion en continu qui agit comme cible pour un flux de capture de données modifiées automatique ne peut être ciblée que par d’autres flux de capture de données modifiées automatiques.
Pour plus d’informations sur les données CDC, consultez les APIs CDC AUTO : Simplifiez la capture de données modifiées avec des pipelines.

Informations supplémentaires

Pour plus d’informations sur les flux et leur utilisation, consultez les rubriques suivantes :