Partager via


Modèle d’extraction de flux de modification dans Azure Cosmos DB

Utilisez le modèle d’extraction de flux de modification pour consommer le flux de modification Azure Cosmos DB à votre rythme. Comme avec le processeur de flux de modification, vous pouvez utiliser le modèle pull du flux de modification pour paralléliser le traitement des modifications entre plusieurs consommateurs de flux de modification.

Comparaison avec le processeur de flux de modification

De nombreux scénarios permettent de traiter le flux de modification à l’aide du processeur de flux de modification ou du modèle de tirage du flux de modification. Les jetons de continuation du modèle d’extraction et le conteneur de baux du processeur de flux de modification font office de signets pour le dernier élément (ou lot d’éléments) traité dans le flux de modification.

Toutefois, vous ne pouvez pas convertir les jetons de continuation en bail (ou inversement).

Note

Dans la plupart des cas, lorsque vous devez lire à partir du flux de modification, l’option la plus simple consiste à utiliser le processeur de flux de modification.

Envisagez plutôt d’utiliser le modèle de tirage dans les scénarios suivants :

  • Pour lire les modifications d’une clé de partition spécifique
  • Pour contrôler le rythme auquel votre client reçoit des modifications pour le traitement
  • Pour effectuer une lecture unique des données existantes dans le flux de modification (par exemple, pour effectuer une migration de données)

Voici quelques différences clés entre le processeur de flux de modification et le modèle d’extraction de flux de modification :

Caractéristique Processeur de flux de modification Modèle d’extraction de flux de modification
Suivi du point actuel dans le traitement du flux de modification Bail (stocké dans un conteneur Azure Cosmos DB) Jeton de continuation (stocké en mémoire ou rendu persistant manuellement)
Possibilité de relire les modifications passées Oui, avec le modèle d’envoi Oui, avec le modèle d’extraction
Sondage sur les modifications futures Vérifie automatiquement les modifications en fonction de la valeur WithPollInterval spécifiée par l’utilisateur Manuel
Comportement en l’absence de nouvelles modifications Attend automatiquement la valeur de WithPollInterval, puis relance la vérification Vous devez vérifier l’état et revérifier manuellement
Traitement des modifications d’un conteneur entier Oui, avec parallélisation automatique sur plusieurs threads et machines qui consomment depuis le même conteneur Oui avec parallélisation manuelle à l’aide de FeedRange
Traitement des modifications à partir d’une unique clé de partition Non prise en charge Oui

Note

Contrairement à la lecture à l'aide du processeur de flux de modification, lorsque vous utilisez le modèle de récupération, vous devez explicitement gérer les cas où il n'y a pas de nouvelles modifications. Cela est indiqué par un HTTP 304 NotModified. Une réponse de flux de modification retournant 0 documents avec un code d’état HTTP 200 OK ne signifie pas nécessairement que vous avez atteint la fin du flux de modification ; vous devez donc continuer à interroger.

Utilisation du modèle d’extraction

Pour traiter le flux de modification avec le modèle pull, créez une instance du FeedIterator. Lorsque vous créez un FeedIterator, vous devez définir une valeur ChangeFeedStartFrom obligatoire constituée de la position de départ pour la lecture des modifications et de la valeur FeedRange à utiliser. FeedRange offre une plage de valeurs de clé de partition qui spécifie les éléments pouvant être lus à partir du flux de modification en utilisant le FeedIterator en question. Vous devez également spécifier une valeur requise ChangeFeedMode pour le mode dans lequel vous souhaitez traiter les modifications : dernière version ou toutes les versions et suppressions. Utilisez ChangeFeedMode.LatestVersion ou ChangeFeedMode.AllVersionsAndDeletes pour indiquer le mode à utiliser pour lire le flux de modification. Quand vous utilisez le mode « Toutes les versions et suppressions », vous devez sélectionner une valeur de début de flux de modification égale à Now() ou correspondant à un jeton de continuation spécifique.

Vous pouvez, si vous le souhaitez, spécifier ChangeFeedRequestOptions pour définir un PageSizeHint. Quand cette propriété est définie, elle détermine le nombre maximal d’éléments reçus par page. Si des opérations dans la collection suivie sont effectuées par le biais de procédures stockées, l’étendue de la transaction est conservée lors de la lecture d’éléments du flux de modifications. Ainsi, le nombre d’éléments reçus peut être supérieur à la valeur spécifiée, de sorte à renvoyer les éléments modifiés par une même transaction dans un même lot atomique.

Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie les objets d’entité, en l’occurrence un objet User :

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Conseil / Astuce

Pour les versions antérieures à 3.34.0, le mode de version le plus récent peut être utilisé en définissant ChangeFeedMode.Incremental. Les Incremental et LatestVersion se réfèrent au dernier mode de version du flux de modification, et les applications qui utilisent l’un ou l’autre mode présentent le même comportement.

Le mode Toutes les versions et suppressions est en préversion et peut être utilisé avec les préversions >= 3.32.0-preview du kit SDK .NET. Voici un exemple pour obtenir un FeedIterator en mode Toutes les versions et suppressions qui retourne des objets User :

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Note

En mode « Dernière version », vous recevez les objets qui représentent l’élément modifié avec des métadonnées supplémentaires. Le mode « Toutes les versions et suppressions » renvoie un modèle de données différent.

Vous pouvez obtenir l’exemple complet pour le mode dernière version ou le mode toutes les versions et suppressions.

Consommation du flux de modification avec des flux

Avec les deux modes de flux de modification, le FeedIterator offre deux options. En plus des exemples qui retournent des objets d’entité, vous pouvez également obtenir la réponse avec la prise en charge de Stream. Les flux vous permettent de lire des données sans avoir à les désérialiser au préalable, ce qui permet d'économiser les ressources des clients.

Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie un Stream :

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consommation des modifications d’un conteneur entier

Si vous ne définissez pas de paramètre FeedRange pour un FeedIterator, vous pouvez traiter le flux de modification d’un conteneur entier à votre rythme. Voici un exemple qui permet de lire toutes les modifications à partir de l’heure actuelle en utilisant le mode« Dernière version » :

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Comme le flux de modification constitue effectivement une liste d’éléments infinie qui englobe toutes les écritures et mises à jour ultérieures, la valeur de HasMoreResults est toujours true. Lorsque vous essayez de lire le flux de modification et qu’aucune nouvelle modification n’est disponible, vous recevez une réponse avec l’état NotModified. Cela est différent de la réception d'une réponse sans modifications et d'état OK. Il est possible d’obtenir des réponses de flux de modification vides pendant que d’autres modifications sont disponibles et que vous devez continuer l’interrogation jusqu’à la réception de NotModified. Dans l'exemple précédent, NotModified est géré en attendant cinq secondes avant de vérifier à nouveau les modifications.

Consommation des modifications d’une clé de partition

Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée. Vous pouvez obtenir un FeedIterator pour une clé de partition donnée et traiter les modifications comme vous le feriez pour un conteneur entier.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Utilisation de FeedRange pour la parallélisation

Dans le processeur de flux de modification, le travail est automatiquement réparti sur plusieurs consommateurs. Dans le modèle d’extraction de flux de modification, vous pouvez utiliser FeedRange pour paralléliser le traitement du flux de modification. FeedRange représente une plage de valeurs de clé de partition.

Voici un exemple qui décrit comment obtenir une liste de plages pour votre conteneur :

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Lorsque vous obtenez la liste des valeurs FeedRange de votre conteneur, vous obtenez un FeedRange par partition physique.

Vous pouvez utiliser un FeedRange pour créer un FeedIterator afin de paralléliser le traitement du flux de modification sur plusieurs machines ou threads. Contrairement à l’exemple précédent qui illustrait comment obtenir un FeedIterator pour l’ensemble du conteneur ou pour une seule clé de partition, vous pouvez utiliser FeedRanges pour obtenir plusieurs FeedIterators qui peuvent traiter le flux de modification en parallèle.

Si vous souhaitez utiliser FeedRanges, vous devez disposer d’un processus d’orchestrateur qui obtient FeedRanges et les distribue à ces machines. Cette distribution peut être :

  • Utilisation de FeedRange.ToJsonString et distribution de cette valeur de chaîne. Les consommateurs peuvent utiliser cette valeur avec FeedRange.FromJsonString.
  • Transmission de la référence d’objet FeedRange si la distribution est en cours.

Voici un exemple qui décrit comment utiliser deux machines hypothétiques distinctes qui lisent en parallèle pour lire à partir du début du flux de modification du conteneur :

Machine 1 :

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Machine 2 :

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Enregistrement des jetons de continuation

Vous pouvez enregistrer la position de votre FeedIterator en obtenant le jeton de continuation. Un jeton de continuation est une valeur de chaîne qui garde une trace des dernières modifications traitées par votre FeedIterator et permet à FeedIterator de reprendre à ce point plus tard. Le jeton de continuation, s’il est spécifié, est prioritaire sur l’heure de début et commence à partir des valeurs de début. Le code suivant lit le flux de modification depuis la création du conteneur. Une fois que vous n’avez plus de modifications disponibles, il conserve un jeton de continuation afin que la consommation de flux de modification puisse être reprise ultérieurement.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

En mode« Dernière version », le jeton de continuation FeedIterator n’expire jamais tant que le conteneur Azure Cosmos DB existe. En mode « Toutes les versions et suppressions », le jeton de continuation FeedIterator est valide tant que les modifications ont lieu pendant la période de rétention des sauvegardes continues.

Étapes suivantes