Partager via


Générer une application avec état personnalisé

Important

Cette fonctionnalité est disponible en préversion publique dans Databricks Runtime 16.2 et versions ultérieures.

Vous pouvez créer des applications de diffusion en continu à l’aide d’opérateurs avec état personnalisés pour implémenter des solutions à faible latence et en quasi-temps réel qui utilisent une logique avec état arbitraire. Les opérateurs personnalisés avec état ouvrent de nouveaux cas d'usage opérationnels et des modèles non disponibles avec le traitement structuré traditionnel en streaming.

Remarque

Databricks recommande d’utiliser les fonctionnalités Structured Streaming intégrées pour les opérations avec état prises en charge, telles que les agrégations, la déduplication et les jointures de flux. Consultez Qu’est-ce que la diffusion en continu avec état ?

Databricks recommande d’utiliser transformWithState plutôt que des opérateurs hérités pour les transformations d’état arbitraires. Pour obtenir de la documentation sur les opérateurs hérités flatMapGroupsWithState et mapGroupsWithState, consultez Les opérateurs hérités arbitraires avec état.

Spécifications

L’opérateur transformWithState et les API et classes associées ont les exigences suivantes :

  • Disponible dans Databricks Runtime 16.2 et versions ultérieures.
  • Le calcul doit utiliser le mode d’accès dédié ou sans isolation, sauf que le mode d’accès standard est pris en charge pour Python (transformWithStateInPandas) dans Databricks Runtime 16.3 et versions ultérieures, et pour Scala (transformWithState) dans Databricks Runtime 17.3 et versions ultérieures.
  • Vous devez utiliser le fournisseur de stockage d'état RocksDB. Databricks recommande d’activer RocksDB dans le cadre de la configuration de calcul.

Remarque

Pour activer le fournisseur de magasin d’état RocksDB pour la session active, exécutez les opérations suivantes :

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Qu’est-ce que transformWithState ?

L’opérateur transformWithState applique un processeur avec état personnalisé à une requête Structured Streaming. Vous devez implémenter un processeur avec état personnalisé pour utiliser transformWithState. Structured Streaming inclut des API pour la création de votre processeur avec état à l’aide de Python, Scala ou Java.

Vous utilisez transformWithState pour appliquer une logique personnalisée à une clé de regroupement pour les enregistrements traités de manière incrémentielle avec Structured Streaming. Les éléments suivants décrivent la conception générale :

  • Définissez une ou plusieurs variables d’état.
  • Les informations d’état sont conservées pour chaque clé de regroupement et sont accessibles pour chaque variable d’état en fonction de la logique définie par l’utilisateur.
  • Pour chaque micro lot traité, tous les enregistrements de la clé sont disponibles en tant qu’itérateur.
  • Utilisez des poignées intégrées pour contrôler quand et comment les enregistrements sont émis en fonction des minuteurs et des conditions définies par les utilisateurs.
  • Les valeurs d’état prennent en charge les définitions de durée de vie (TTL) individuelles, ce qui permet une flexibilité dans la gestion de l’expiration de l’état et de la taille de l’état.

Étant donné que transformWithState prend en charge l’évolution du schéma dans le magasin d’états, vous pouvez itérer et mettre à jour vos applications de production sans perdre les informations d’état historiques ou avoir besoin de retraiter les enregistrements, offrant une flexibilité pour le développement et la facilité de maintenance. Consultez l’évolution du schéma dans le store d’états.

Important

PySpark utilise l’opérateur transformWithStateInPandas au lieu de transformWithState. La documentation Azure Databricks utilise transformWithState pour décrire les fonctionnalités des implémentations Python et Scala.

Les implémentations Scala et Python des API associées diffèrent en raison de spécificités du transformWithState langage, mais fournissent les mêmes fonctionnalités. Reportez-vous à des exemples spécifiques au langage et à la documentation de l’API pour votre langage de programmation préféré.

Handles de traitement intégrés

Vous implémentez la logique principale de votre application avec état personnalisé en implémentant des gestionnaires à l’aide de handles intégrés.

  • Les handles fournissent les méthodes permettant d’interagir avec les valeurs d’état et les minuteurs, de traiter les enregistrements entrants et d’émettre des enregistrements.
  • Les gestionnaires définissent votre logique personnalisée basée sur les événements.

Les handles pour chaque type d’état sont implémentés en fonction de la structure de données sous-jacente, mais chacun contient des fonctionnalités permettant d’obtenir, placer, mettre à jour et supprimer des enregistrements.

Les gestionnaires sont implémentés en fonction des événements observés dans les enregistrements d’entrée ou les minuteurs, à l’aide de la sémantique suivante :

  • Définissez un gestionnaire à l’aide de la handleInputRows méthode pour contrôler le traitement des données, l’état est mis à jour et les enregistrements sont émis pour chaque micro lot d’enregistrements traités pour la clé de regroupement. Consultez Gérer les lignes d’entrée.
  • Définissez un gestionnaire en utilisant la méthode handleExpiredTimer pour appliquer des seuils basés sur le temps afin d'exécuter la logique, que des enregistrements supplémentaires soient ou non traités pour la clé de regroupement. Consultez les événements chronométrés du programme.

Le tableau suivant présente une comparaison des comportements fonctionnels pris en charge par ces gestionnaires :

Comportement handleInputRows handleExpiredTimer
Obtenir, placer, mettre à jour ou effacer les valeurs d’état Oui Oui
Créer ou supprimer un minuteur Oui Oui
Émettre des enregistrements Oui Oui
Itérer sur les enregistrements dans le micro batch actuel Oui Non
Logique de déclenchement basée sur le temps écoulé Non Oui

Vous pouvez combiner handleInputRows et handleExpiredTimer implémenter une logique complexe en fonction des besoins.

Par exemple, vous pouvez implémenter une application qui utilise handleInputRows pour mettre à jour les valeurs d’état pour chaque micro-lot et définir un minuteur de 10 secondes à l’avenir. Si aucun enregistrement supplémentaire n’est traité, vous pouvez utiliser handleExpiredTimer pour émettre les valeurs actuelles dans le magasin d’états. Si de nouveaux enregistrements sont traités pour la clé de regroupement, vous pouvez effacer le minuteur existant et définir un nouveau minuteur.

Types d’état personnalisés

Vous pouvez implémenter plusieurs objets d’état dans un seul opérateur avec état. Les noms que vous attribuez à chaque objet d’état persistent dans le magasin d’état, auquel vous pouvez accéder avec le lecteur du magasin d’état. Si votre objet d’état utilise un StructType, vous fournissez des noms pour chaque champ du struct tout en passant le schéma. Ces noms sont également visibles lors de la lecture du stockage d'état. Consultez les informations sur l’état de la diffusion en continu structurée.

Les fonctionnalités fournies par les classes et opérateurs intégrés sont destinées à fournir une flexibilité et une extensibilité, et les choix d’implémentation doivent être informés par la logique complète que votre application doit exécuter. Par exemple, vous pouvez implémenter une logique à peu près identique à l’aide d’un ValueState regroupé par champs user_id et session_id ou d’un MapState regroupé par user_idsession_id est la clé de MapState. Dans ce cas, un MapState pourrait être l’implémentation préférée si la logique doit évaluer les conditions à travers plusieurs session_ids.

Les sections suivantes décrivent les types d’état pris en charge par transformWithState.

ValueState

Pour chaque clé de regroupement, il existe une valeur associée.

Un état de valeur peut inclure des types complexes, tels qu’un struct ou un tuple. Lorsque vous mettez à jour un ValueState, vous implémentez la logique pour remplacer la valeur entière. La durée de vie d’un état de valeur est réinitialisée lorsque la valeur est mise à jour, mais elle n’est pas réinitialisée si une clé source correspondant à ValueState est traitée sans mise à jour du stockage ValueState.

ListState

Pour chaque clé de regroupement, il existe une liste associée.

Un état de liste est une collection de valeurs, chacune pouvant inclure des types complexes. Chaque valeur d’une liste a sa propre durée de vie. Vous pouvez ajouter des éléments à une liste en ajoutant des éléments individuels, en ajoutant une liste d’éléments ou en remplaçant toute la liste par un put. Seule l'opération de mise est considérée comme une mise à jour pour réinitialiser le temps de vie de stockage.

MapState

Pour chaque clé de regroupement, il existe une carte associée. Les cartes sont l’équivalent fonctionnel Apache Spark d’une dictée Python.

Important

Les clés de regroupement décrivent les champs spécifiés dans la clause de la GROUP BY requête Structured Streaming. Les états de mappage contiennent un nombre arbitraire de paires clé-valeur pour une clé de regroupement.

Par exemple, si vous regroupez user_id et souhaitez définir une carte pour chaque session_id, votre clé de regroupement est user_id et la clé de votre carte est session_id.

Un état de mappage est une collection de clés distinctes, chacune associée à une valeur pouvant inclure des types complexes. Chaque paire clé-valeur dans une carte a sa propre durée de vie. Vous pouvez mettre à jour la valeur d’une clé spécifique ou supprimer une clé et sa valeur. Vous pouvez retourner une valeur individuelle à l’aide de sa clé, répertorier toutes les clés, répertorier toutes les valeurs ou renvoyer un itérateur pour travailler avec l’ensemble complet de paires clé-valeur dans la carte.

Initialiser une variable d’état personnalisée

Lorsque vous initialisez votre StatefulProcessor, vous créez une variable locale pour chaque objet d’état qui vous permet d’interagir avec des objets d’état dans votre logique personnalisée. Les variables d’état sont définies et initialisées en substituant la méthode intégrée dans la classe init.

Vous définissez une quantité arbitraire d'objets d'état à l'aide des méthodes getValueState, getListState et getMapState lors de l'initialisation de votre StatefulProcessor.

Chaque objet d’état doit avoir les éléments suivants :

  • Un nom unique
  • Schéma spécifié
    • En Python, le schéma est spécifié explicitement.
    • Dans Scala, passez un schéma d’état Encoder pour spécifier le schéma d’état.

Vous pouvez également fournir une durée de vie (TTL) facultative en millisecondes. Si vous implémentez un état de carte, vous devez fournir une définition de schéma distincte pour les clés de carte et les valeurs.

Remarque

La logique de la façon dont les informations d’état sont interrogées, mises à jour et émises est gérée séparément. Consultez Utiliser vos variables d’état.

Exemple d’application avec état

L’exemple suivant illustre la syntaxe de base pour la définition et l’utilisation d’un processeur avec état personnalisé avec transformWithStatedes exemples de variables d’état pour chaque type pris en charge. Pour plus d’exemples, consultez Exemples d’applications avec état.

Remarque

Python utilise des tuples pour toutes les interactions avec les valeurs d’état. Cela signifie que le code Python doit passer des valeurs à l’aide de tuples lors de l’utilisation d’opérations telles que put et update et s'attendre à gérer des tuples lors de l’utilisation des get.

Par exemple, si le schéma de votre état de valeur n’est qu’un entier unique, vous implémentez du code comme suit :

current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0]  # Extracts the first item in the tuple
new_value = current_value + 1           # Calculate a new value
value_state.update((new_value,))        # Pass the new value formatted as a tuple

Cela est également vrai pour les éléments dans un ListState ou valeurs dans un MapState.

Python

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    value_state_schema = StructType([StructField("count", IntegerType(), True)])
    list_state_schema = StructType([StructField("count", IntegerType(), True)])
    self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
    self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
    # Schema can also be defined using strings and SQL DDL syntax
    self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    count = 0
    for pdf in rows:
      list_state_rows = [(120,), (20,)] # A list of tuples
      self.list_state.put(list_state_rows)
      self.list_state.appendValue((111,))
      self.list_state.appendList(list_state_rows)
      pdf_count = pdf.count()
      count += pdf_count.get("value")
    self.value_state.update((count,)) # Count is passed as a tuple
    iter = self.list_state.get()
    list_state_value = next(iter1)[0]
    value = count
    user_key = ("user_key",)
    if self.map_state.exists():
      if self.map_state.containsKey(user_key):
        value += self.map_state.getValue(user_key)[0]
    self.map_state.updateValue(user_key, (value,)) # Value is a tuple
    yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=SimpleCounterProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
)

Langage de programmation Scala

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaLong, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
        .readStream
        .format("delta")
        .load("$srcDeltaTableDir")
        .as[(String, String)]
        .groupByKey(x => x._1)
        .transformWithState(
            new SimpleCounterProcessor(),
            TimeMode.None(),
            OutputMode.Update(),
        )
        .writeStream...

StatefulProcessorHandle

PySpark inclut la StatefulProcessorHandle classe pour fournir l’accès aux fonctions qui contrôlent la façon dont votre code Python défini par l’utilisateur interagit avec les informations d’état. Vous devez toujours importer et passer la variable StatefulProcessorHandle à handle lors de l’initialisation d’un StatefulProcessor.

La handle variable lie la variable locale dans votre classe Python à la variable d’état.

Remarque

Scala utilise la getHandle méthode.

Spécifier l’état initial

Vous pouvez éventuellement fournir un état initial à utiliser avec le premier micro-lot. Cela peut être utile lors de la migration d’un flux de travail existant vers une nouvelle application personnalisée, de la mise à niveau d’un opérateur avec état pour modifier votre schéma ou votre logique, ou de réparer une défaillance qui ne peut pas être réparée automatiquement et nécessite une intervention manuelle.

Remarque

Utilisez le lecteur du magasin d’états pour interroger les informations d’état à partir d’un point de contrôle existant. Consultez les informations sur l’état de la diffusion en continu structurée.

Si vous convertissez une table Delta existante en application avec état, lisez la table à l’aide spark.read.table("table_name") et transmettez le DataFrame résultant. Vous pouvez optionnellement sélectionner ou modifier des champs pour les adapter à votre nouvelle application stateful.

Vous fournissez un état initial à l’aide d’un DataFrame avec le même schéma de clé de regroupement que les lignes d’entrée.

Remarque

Python utilise handleInitialState pour spécifier l’état initial lors de la définition d’un StatefulProcessor. Scala utilise la classe StatefulProcessorWithInitialStatedistincte .

Utiliser vos variables d’état

Les objets d’état pris en charge fournissent des méthodes pour obtenir l’état, mettre à jour les informations d’état existantes ou effacer l’état actuel. Chaque type d’état pris en charge a une implémentation unique de méthodes qui correspondent à la structure de données implémentée.

Chaque clé de regroupement observée a des informations d’état dédiées.

  • Les enregistrements sont émis en fonction de la logique que vous implémentez et à l’aide du schéma de sortie que vous spécifiez. Consultez Les enregistrements d’émission.
  • Vous pouvez accéder aux valeurs dans le registre d’états à l’aide du statestore lecteur. Ce lecteur dispose de fonctionnalités par lots et n’est pas destiné aux charges de travail à faible latence. Consultez les informations sur l’état de la diffusion en continu structurée.
  • La logique spécifiée en utilisant handleInputRows ne se déclenche que si des enregistrements pour la clé sont présents dans un micro-lot de données. Consultez Gérer les lignes d’entrée.
  • Utiliser handleExpiredTimer pour implémenter une logique basée sur le temps qui ne dépend pas de l’observation des enregistrements pour fonctionner. Consultez les événements chronométrés du programme.

Remarque

Les objets d’état sont isolés en regroupant les clés avec les implications suivantes :

  • Les valeurs d’état ne peuvent pas être affectées par les enregistrements associés à une autre clé de regroupement.
  • Vous ne pouvez pas implémenter la logique qui dépend de la comparaison des valeurs ou de la mise à jour de l’état entre les clés de regroupement.

Vous pouvez comparer des valeurs dans une clé de regroupement. Utilisez un MapState pour implémenter une logique avec une deuxième clé que votre logique personnalisée peut utiliser. Par exemple, regrouper par user_id et utiliser l’adresse MapState IP pour la clé vous permettrait d’implémenter une logique qui suit les sessions utilisateur simultanées.

Considérations avancées pour travailler avec l'état

L’écriture dans une variable d’état déclenche une écriture dans RocksDB. Pour optimiser les performances, Databricks recommande de traiter toutes les valeurs de l’itérateur pour une clé donnée et de valider les mises à jour en écriture unique dans la mesure du possible.

Remarque

Les mises à jour d’état sont tolérantes aux erreurs. Si une tâche se bloque avant la fin du traitement d’un micro-lot, la valeur du dernier micro-lot réussi est utilisée lors de la nouvelle tentative.

Les valeurs d’état n’ont pas de valeurs par défaut intégrées. Si votre logique nécessite la lecture des informations d’état existantes, utilisez la méthode lors de l’implémentation exists de votre logique.

Remarque

MapState les variables ont des fonctionnalités supplémentaires pour rechercher des clés individuelles ou répertorier toutes les clés afin d’implémenter la logique pour l’état Null.

Émettre des enregistrements

La logique définie par l’utilisateur contrôle comment transformWithState émet des enregistrements. Les enregistrements sont émis par clé de regroupement.

Les applications personnalisées avec état ne font aucune hypothèse sur la façon dont les informations d'état sont utilisées lors de la détermination de l'émission de données, et le nombre d'enregistrements retournés pour une condition donnée peut être aucun, un ou plusieurs.

Vous implémentez la logique pour émettre des enregistrements à l’aide de handleInputRows ou de handleExpiredTimer. Consultez Gérer les lignes d’entrée et les événements chronométrés par programme.

Remarque

Vous pouvez implémenter plusieurs valeurs d’état et définir plusieurs conditions pour émettre des enregistrements, mais tous les enregistrements émis doivent utiliser le même schéma.

Python

En Python, vous définissez votre schéma de sortie à l’aide du mot clé lors de l’appel outputStructTypetransformWithStateInPandas.

Vous émettez des enregistrements à l’aide d’un objet DataFrame pandas et yield.

Vous pouvez éventuellement yield utiliser un DataFrame vide. Lorsqu'il est combiné avec le mode de sortie update, l'émission d'un DataFrame vide met à jour les valeurs de la clé de regroupement pour qu'elles soient nulles.

Langage de programmation Scala

Dans Scala, vous émettez des enregistrements à l’aide d’un Iterator objet. Le schéma de la sortie est dérivé des enregistrements émis.

Vous pouvez facultativement produire un élément vide Iterator. En cas de combinaison avec update le mode de sortie, l’émission d’une valeur vide Iterator met à jour les valeurs de la clé de regroupement pour qu’elle soit null.

Gérer les lignes d’entrée

Utilisez la handleInputRows méthode pour définir la logique de la façon dont les enregistrements observés dans votre requête de streaming interagissent avec et mettent à jour les valeurs d’état. Le gestionnaire que vous définissez avec la handleInputRows méthode s’exécute chaque fois que tous les enregistrements sont traités via votre requête Structured Streaming.

Pour la plupart des applications avec état implémentées avec transformWithState, la logique principale est définie à l’aide de handleInputRows.

Pour chaque mise à jour de micro-lot traitée, tous les enregistrements du micro-lot pour une clé de regroupement donnée sont disponibles à l’aide d’un itérateur. La logique définie par l’utilisateur peut interagir avec tous les enregistrements à partir du microbatch et des valeurs actuelles dans le magasin d’états.

Événements chronométrés du programme

Vous pouvez utiliser des minuteurs pour implémenter une logique personnalisée en fonction du temps écoulé à partir d’une condition spécifiée.

Vous travaillez avec des minuteurs en implémentant la méthode handleExpiredTimer.

Dans une clé de regroupement, les minuteurs sont identifiés de manière unique par leur horodatage.

Lorsqu’un minuteur expire, le résultat est déterminé par la logique implémentée dans votre application. Les modèles courants sont les suivants :

  • Émission d’informations stockées dans une variable d’état.
  • Évacuer les informations d’état stockées.
  • Création d’un minuteur.

Les minuteurs expirés se déclenchent même si aucun enregistrement de leur clé associée n’est traité dans une micro-tranche.

Spécifier le modèle de temps

Lorsque vous passez votre StatefulProcessor à transformWithState, vous devez spécifier le modèle temporel. Les options suivantes sont prises en charge :

  • ProcessingTime
  • EventTime
  • NoTime ou TimeMode.None()

La spécification NoTime signifie que les minuteurs ne sont pas pris en charge pour votre processeur.

Valeurs de minuteur intégrées

Databricks recommande de ne pas appeler l’horloge système dans votre application personnalisée avec état, car cela peut entraîner des tentatives de relance peu fiables en cas d’échec de tâches. Utilisez les méthodes de la TimerValues classe lorsque vous devez accéder au temps de traitement ou au filigrane :

TimerValues Descriptif
getCurrentProcessingTimeInMs Retourne l’horodatage de l’heure de traitement du lot actuel en millisecondes depuis l’époque.
getCurrentWatermarkInMs Retourne l’horodatage du filigrane du lot actuel en millisecondes depuis l’époque.

Remarque

Le temps de traitement décrit l’heure à laquelle le micro-lot est traité par Apache Spark. De nombreuses sources de diffusion en continu, telles que Kafka, incluent également le temps de traitement du système.

Les filigranes sur les requêtes de diffusion en continu sont souvent définis par rapport à l’heure de l’événement ou à l’heure de traitement de la source de diffusion en continu. Consultez Appliquer des filigranes pour contrôler les seuils de traitement des données.

Les filigranes et les fenêtres peuvent être utilisés en combinaison avec transformWithState. Vous pouvez implémenter des fonctionnalités similaires dans votre application personnalisée avec état en tirant parti du TTL, des minuteurs et des fonctionnalités MapState ou ListState.

Qu’est-ce que l’heure de vie de l’état (TTL) ?

Les valeurs d’état utilisées par transformWithState prennent chacune en charge une spécification facultative de durée de vie (TTL). Lorsque la durée de vie du TTL expire, la valeur est supprimée du stockage d’états. TTL interagit uniquement avec des valeurs dans le magasin d'état, ce qui signifie que vous pouvez implémenter une logique pour supprimer les informations d'état, mais vous ne pouvez pas déclencher directement la logique lorsque le TTL expulse les valeurs d'état.

Important

Si vous n’implémentez pas le TTL (temps d'expiration), vous devez gérer l'état avec une logique différente pour éviter une croissance infinie de l'état.

Le TTL est appliqué à chaque valeur d'état, avec des règles différentes pour chaque type d'état.

  • Les variables d’état sont limitées au regroupement de clés.
  • Pour ValueState les objets, une seule valeur est stockée par clé de regroupement. Le TTL s'applique à cette valeur.
  • Pour ListState les objets, la liste peut contenir de nombreuses valeurs. La durée de vie (TTL) s’applique à chaque valeur d’une liste, indépendamment.
  • Pour MapState les objets, chaque clé de carte a une valeur d’état associée. La durée de vie s’applique indépendamment à chaque paire clé-valeur dans une carte.

Pour tous les types d’état, le TTL est réinitialisé si les informations d’état sont mises à jour.

Remarque

Bien que le TTL soit limité à des valeurs individuelles dans un ListState, la seule façon de mettre à jour une valeur dans une liste consiste à utiliser la méthode put pour écraser l'ensemble du contenu de la variable ListState.

Quelle est la différence entre les minuteurs et le temps de vie ?

Il existe un certain chevauchement entre les minuteurs et la durée de vie (TTL) pour les variables d’état, mais les minuteurs fournissent un ensemble plus large de fonctionnalités que la durée de vie.

Le TTL élimine les informations d’état qui n’ont pas été mises à jour pendant la période spécifiée par l’utilisateur. Cela permet aux utilisateurs d’empêcher la croissance de l’état non vérifié et de supprimer les entrées d’état obsolètes. Étant donné que les cartes et les listes implémentent le TTL pour chaque valeur, vous pouvez implémenter des fonctions qui prennent uniquement en compte les valeurs d'état qui ont été mises à jour récemment en définissant le TTL.

Les minuteurs vous permettent de définir une logique personnalisée, au-delà de la suppression de l’état, y compris l’émission d’enregistrements. Vous pouvez éventuellement utiliser des minuteurs pour effacer les informations d’état d’une valeur d’état donnée, avec la flexibilité supplémentaire pour émettre des valeurs ou déclencher une autre logique conditionnelle basée sur le minuteur.