Utilisez le modèle d’extraction de flux de modification pour consommer le flux de modification Azure Cosmos DB à votre rythme. Comme avec le processeur de flux de modification, vous pouvez utiliser le modèle pull du flux de modification pour paralléliser le traitement des modifications entre plusieurs consommateurs de flux de modification.
Toutefois, vous ne pouvez pas convertir les jetons de continuation en bail (ou inversement).
Voici quelques différences clés entre le processeur de flux de modification et le modèle d’extraction de flux de modification :
Pour traiter le flux de modification avec le modèle pull, créez une instance du FeedIterator. Lorsque vous créez un FeedIterator, vous devez définir une valeur ChangeFeedStartFrom obligatoire constituée de la position de départ pour la lecture des modifications et de la valeur FeedRange à utiliser.
FeedRange offre une plage de valeurs de clé de partition qui spécifie les éléments pouvant être lus à partir du flux de modification en utilisant le FeedIterator en question. Vous devez également spécifier une valeur requise ChangeFeedMode pour le mode dans lequel vous souhaitez traiter les modifications : dernière version ou toutes les versions et suppressions. Utilisez ChangeFeedMode.LatestVersion ou ChangeFeedMode.AllVersionsAndDeletes pour indiquer le mode à utiliser pour lire le flux de modification. Quand vous utilisez le mode « Toutes les versions et suppressions », vous devez sélectionner une valeur de début de flux de modification égale à Now() ou correspondant à un jeton de continuation spécifique.
Vous pouvez, si vous le souhaitez, spécifier ChangeFeedRequestOptions pour définir un PageSizeHint. Quand cette propriété est définie, elle détermine le nombre maximal d’éléments reçus par page. Si des opérations dans la collection suivie sont effectuées par le biais de procédures stockées, l’étendue de la transaction est conservée lors de la lecture d’éléments du flux de modifications. Ainsi, le nombre d’éléments reçus peut être supérieur à la valeur spécifiée, de sorte à renvoyer les éléments modifiés par une même transaction dans un même lot atomique.
Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie les objets d’entité, en l’occurrence un objet User :
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Conseil / Astuce
Pour les versions antérieures à 3.34.0, le mode de version le plus récent peut être utilisé en définissant ChangeFeedMode.Incremental. Les Incremental et LatestVersion se réfèrent au dernier mode de version du flux de modification, et les applications qui utilisent l’un ou l’autre mode présentent le même comportement.
Le mode Toutes les versions et suppressions est en préversion et peut être utilisé avec les préversions >= 3.32.0-preview du kit SDK .NET. Voici un exemple pour obtenir un FeedIterator en mode Toutes les versions et suppressions qui retourne des objets User :
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Consommation du flux de modification avec des flux
Avec les deux modes de flux de modification, le FeedIterator offre deux options. En plus des exemples qui retournent des objets d’entité, vous pouvez également obtenir la réponse avec la prise en charge de Stream. Les flux vous permettent de lire des données sans avoir à les désérialiser au préalable, ce qui permet d'économiser les ressources des clients.
Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie un Stream :
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Consommation des modifications d’un conteneur entier
Si vous ne définissez pas de paramètre FeedRange pour un FeedIterator, vous pouvez traiter le flux de modification d’un conteneur entier à votre rythme. Voici un exemple qui permet de lire toutes les modifications à partir de l’heure actuelle en utilisant le mode« Dernière version » :
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Comme le flux de modification constitue effectivement une liste d’éléments infinie qui englobe toutes les écritures et mises à jour ultérieures, la valeur de HasMoreResults est toujours true. Lorsque vous essayez de lire le flux de modification et qu’aucune nouvelle modification n’est disponible, vous recevez une réponse avec l’état NotModified. Cela est différent de la réception d'une réponse sans modifications et d'état OK. Il est possible d’obtenir des réponses de flux de modification vides pendant que d’autres modifications sont disponibles et que vous devez continuer l’interrogation jusqu’à la réception de NotModified. Dans l'exemple précédent, NotModified est géré en attendant cinq secondes avant de vérifier à nouveau les modifications.
Consommation des modifications d’une clé de partition
Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée. Vous pouvez obtenir un FeedIterator pour une clé de partition donnée et traiter les modifications comme vous le feriez pour un conteneur entier.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Utilisation de FeedRange pour la parallélisation
Dans le processeur de flux de modification, le travail est automatiquement réparti sur plusieurs consommateurs. Dans le modèle d’extraction de flux de modification, vous pouvez utiliser FeedRange pour paralléliser le traitement du flux de modification.
FeedRange représente une plage de valeurs de clé de partition.
Voici un exemple qui décrit comment obtenir une liste de plages pour votre conteneur :
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Lorsque vous obtenez la liste des valeurs FeedRange de votre conteneur, vous obtenez un FeedRange par partition physique.
Vous pouvez utiliser un FeedRange pour créer un FeedIterator afin de paralléliser le traitement du flux de modification sur plusieurs machines ou threads. Contrairement à l’exemple précédent qui illustrait comment obtenir un FeedIterator pour l’ensemble du conteneur ou pour une seule clé de partition, vous pouvez utiliser FeedRanges pour obtenir plusieurs FeedIterators qui peuvent traiter le flux de modification en parallèle.
Si vous souhaitez utiliser FeedRanges, vous devez disposer d’un processus d’orchestrateur qui obtient FeedRanges et les distribue à ces machines. Cette distribution peut être :
- Utilisation de
FeedRange.ToJsonString et distribution de cette valeur de chaîne. Les consommateurs peuvent utiliser cette valeur avec FeedRange.FromJsonString.
- Transmission de la référence d’objet
FeedRange si la distribution est en cours.
Voici un exemple qui décrit comment utiliser deux machines hypothétiques distinctes qui lisent en parallèle pour lire à partir du début du flux de modification du conteneur :
Machine 1 :
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Machine 2 :
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Enregistrement des jetons de continuation
Vous pouvez enregistrer la position de votre FeedIterator en obtenant le jeton de continuation. Un jeton de continuation est une valeur de chaîne qui garde une trace des dernières modifications traitées par votre FeedIterator et permet à FeedIterator de reprendre à ce point plus tard. Le jeton de continuation, s’il est spécifié, est prioritaire sur l’heure de début et commence à partir des valeurs de début. Le code suivant lit le flux de modification depuis la création du conteneur. Une fois que vous n’avez plus de modifications disponibles, il conserve un jeton de continuation afin que la consommation de flux de modification puisse être reprise ultérieurement.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
En mode« Dernière version », le jeton de continuation FeedIterator n’expire jamais tant que le conteneur Azure Cosmos DB existe. En mode « Toutes les versions et suppressions », le jeton de continuation FeedIterator est valide tant que les modifications ont lieu pendant la période de rétention des sauvegardes continues.
Pour traiter le flux de modification avec le modèle pull, créez une instance du Iterator<FeedResponse<JsonNode>> responseIterator. Lorsque vous créez CosmosChangeFeedRequestOptions, vous devez indiquer où commencer à lire le flux de modification et transmettre le paramètre FeedRange à utiliser.
FeedRange est une plage de valeurs de clés de partition qui spécifie les éléments qui peuvent être lus à partir du flux de modification.
Pour lire le flux de modification en mode « Toutes les versions et suppressions », vous devez également définir allVersionsAndDeletes() lorsque vous créez CosmosChangeFeedRequestOptions. Le mode Toutes les versions et suppressions ne prend pas en charge le traitement du flux de modification depuis le début ou à partir d’un point dans le temps. Vous devez traiter les modifications à partir de maintenant ou à partir d’un jeton de continuation. Toutes les versions et le mode de suppression est en préversion et est disponible dans la version du Kit de développement logiciel (SDK) Java >= 4.42.0.
Consommation des modifications d’un conteneur entier
Si vous définissez FeedRange.forFullRange(), vous pouvez traiter le flux de modification d’un conteneur entier à votre rythme. Vous pouvez spécifier une valeur dans byPage(). Quand cette propriété est définie, elle détermine le nombre maximal d’éléments reçus par page.
Voici un exemple qui décrit comment obtenir une valeur responseIterator en mode « Dernière version » :
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Voici un exemple qui décrit comment obtenir un responseIterator en mode « Toutes les versions et suppressions » :
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Nous pouvons ensuite itérer sur les résultats. Comme le flux de modification constitue effectivement une liste d’éléments infinie qui englobe toutes les écritures et mises à jour ultérieures, la valeur de responseIterator.hasNext() est toujours true. Voici un exemple en mode « Dernière version », qui lit toutes les modifications depuis le début. Chaque itération conserve un jeton de continuation après avoir traité tous les événements. Il est récupéré du dernier point traité dans le flux de modification et est géré à l’aide de createForProcessingFromContinuation :
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Consommation des modifications d’une clé de partition
Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée. Vous pouvez traiter les modifications d’une clé de partition donnée comme vous le feriez pour un conteneur entier. Voici un exemple avec le mode « Dernière version » :
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Utilisation de FeedRange pour la parallélisation
Dans le processeur de flux de modification, le travail est automatiquement réparti sur plusieurs consommateurs. Dans le modèle d’extraction de flux de modification, vous pouvez utiliser FeedRange pour paralléliser le traitement du flux de modification.
FeedRange représente une plage de valeurs de clé de partition.
Voici un exemple avec le mode « Dernière version » qui décrit comment obtenir une liste de plages pour votre conteneur :
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Lorsque vous obtenez la liste des FeedRanges pour votre conteneur, vous récupérez un FeedRange par partition physique.
Vous pour utiliser un FeedRange pour paralléliser le traitement du flux de modification sur plusieurs machines ou threads. Contrairement à l’exemple précédent qui illustrait comment traiter les modifications pour l’ensemble du conteneur ou pour une seule clé de partition, vous pouvez utiliser FeedRanges pour traiter le flux de modification en parallèle.
Si vous souhaitez utiliser FeedRanges, vous devez disposer d’un processus d’orchestrateur qui obtient FeedRanges et les distribue à ces machines. Cette distribution peut être :
- Utilisation de
FeedRange.toString() et distribution de cette valeur de chaîne.
- Transmission de la référence d’objet
FeedRange si la distribution est en cours.
Voici un exemple avec le mode « Dernière version ». Il indique comment lire à partir du début du flux de modification du conteneur en utilisant deux machines hypothétiques distinctes qui lisent en parallèle.
Machine 1 :
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Machine 2 :
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Pour traiter le flux de modification à l’aide du modèle collecteur, créez une instance de responseIterator avec le type ItemPaged[Dict[str, Any]].
Lorsque vous appelez l’API de flux de modification, vous devez spécifier où commencer à lire le flux de modification et passer le feed_range paramètre à utiliser.
feed_range est une plage de valeurs de clés de partition qui spécifie les éléments qui peuvent être lus à partir du flux de modification.
Vous pouvez également spécifier un mode paramètre pour le mode de flux de modification dans lequel vous souhaitez traiter les modifications : LatestVersion ou AllVersionsAndDeletes. La valeur par défaut est LatestVersion.
Utilisez LatestVersion ou AllVersionsAndDeletes pour indiquer le mode à utiliser pour lire le flux de modification.
Lorsque vous utilisez le mode AllVersionsAndDeletes, vous pouvez commencer à traiter les modifications à partir de maintenant ou à partir d’un jeton continuation.
La lecture du flux de modification à partir du début ou d’un point dans le temps en utilisant start_time n’est pas prise en charge.
Consommation des modifications d’un conteneur entier
Si vous ne fournissez pas de feed_range paramètre, vous pouvez traiter le flux de modification d’un conteneur entier à votre propre rythme.
Voici un exemple pour obtenir responseIterator en mode LatestVersion à partir de Beginning. Étant donné qu’il s’agit d’un mode par défaut, le paramètre LatestVersion n’a pas besoin d’être passé.
responseIterator = container.query_items_change_feed(start_time="Beginning")
Voici un exemple de la façon d’obtenir responseIterator en AllVersionsAndDeletes mode à partir de Now: Now étant une valeur par défaut de start_time paramètre, il n’est pas nécessaire de passer :
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Nous pouvons ensuite itérer sur les résultats. Étant donné que le flux de modification est effectivement une liste infinie d’éléments qui englobe toutes les futures écritures et mises à jour, responseIterator peut effectuer une boucle infinie.
Voici un exemple en mode « Dernière version », qui lit toutes les modifications depuis le début.
Chaque itération imprime les flux de modification pour les documents.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Consommation des modifications d’une clé de partition
Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée.
Vous pouvez traiter les modifications de la même façon que pour un conteneur entier avec le partition_key paramètre.
Voici un exemple qui utilise le mode LatestVersion :
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Utilisation de FeedRange pour la parallélisation
Dans le modèle d’extraction de flux de modification, vous pouvez utiliser feed_range pour paralléliser le traitement du flux de modification.
feed_range représente une plage de valeurs de clé de partition.
Voici un exemple qui montre comment obtenir une liste de plages pour votre conteneur.
list la commande convertit l’itérateur en liste :
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Lorsque vous obtenez la liste des valeurs feed_range de votre conteneur, vous obtenez un feed_range par partition physique.
Vous pouvez utiliser un feed_range pour créer un itérateur afin de paralléliser le traitement du flux de modification sur plusieurs machines ou threads.
Contrairement à l’exemple précédent qui a montré comment obtenir un responseIterator conteneur entier ou une clé de partition unique, vous pouvez utiliser feed_range pour obtenir plusieurs itérateurs, qui peuvent traiter le flux de modification en parallèle.
Voici un exemple qui décrit comment utiliser deux machines hypothétiques distinctes qui lisent en parallèle pour lire à partir du début du flux de modification du conteneur :
Machine 1 :
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Machine 2 :
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Enregistrement des jetons de continuation
Vous pouvez enregistrer la position de votre itérateur en obtenant le jeton de continuation.
Un jeton de continuation est une valeur de chaîne qui effectue le suivi de vos responseIterator dernières modifications traitées et permet à l’itérateur de reprendre à ce stade ultérieurement.
Le jeton de continuation, s’il est spécifié, est prioritaire sur l’heure de début et commence à partir des valeurs de début.
Le code suivant lit le flux de modification depuis la création du conteneur.
Une fois que vous n’avez plus de modifications disponibles, il conserve un jeton de continuation afin que la consommation de flux de modification puisse être reprise ultérieurement.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Note
Étant donné que le jeton continuation contient le paramètre mode précédemment utilisé, si continuation a été utilisé, le paramètre mode est ignoré et utilise le mode du jeton continuation à la place.
Voici un exemple qui montre comment lire le flux de modification du conteneur à l’aide d’un continuation jeton :
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Pour traiter le flux de modification avec le modèle pull, créez une instance du ChangeFeedPullModelIterator. Lorsque vous créez ChangeFeedPullModelIteratorinitialement, vous devez spécifier une valeur requise changeFeedStartFrom à l’intérieur du ChangeFeedIteratorOptions, qui se compose à la fois de la position de départ pour la lecture des modifications et de la ressource (une clé de partition ou un FeedRange) pour laquelle les modifications doivent être extraites.
Note
Si aucune valeur n’est changeFeedStartFrom spécifiée, le flux de modification est extrait pour un conteneur entier à partir de Now().
Actuellement, seule la dernière version est prise en charge par le Kit de développement logiciel (SDK) JavaScript et est sélectionnée par défaut.
Vous pouvez éventuellement utiliser maxItemCount dans ChangeFeedIteratorOptions pour définir le nombre maximal d’éléments reçus par page.
Voici un exemple qui décrit comment obtenir un itérateur en mode de dernière version qui renvoie les objets d’entité :
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Consommation des modifications d’un conteneur entier
Si vous ne définissez pas de paramètre FeedRange ou PartitionKey dans ChangeFeedStartFrom, vous pouvez traiter le flux de modification d’un conteneur entier à votre rythme. Voici un exemple qui démarre la lecture de toutes les modifications à partir de l’heure actuelle :
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Comme le flux de modification constitue effectivement une liste d’éléments infinie qui englobe toutes les écritures et mises à jour ultérieures, la valeur de hasMoreResults est toujours true. Lorsque vous essayez de lire le flux de modification et qu’aucune nouvelle modification n’est disponible, vous recevez une réponse avec l’état NotModified. Cela est différent de la réception d'une réponse sans modifications et d'état OK. Il est possible d’obtenir des réponses de flux de modification vides pendant que d’autres modifications sont disponibles et que vous devez continuer l’interrogation jusqu’à la réception de NotModified. Dans l'exemple précédent, NotModified est géré en attendant cinq secondes avant de vérifier à nouveau les modifications.
Consommation des modifications d’une clé de partition
Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée. Vous pouvez obtenir un itérateur pour une clé de partition spécifique, et traiter les modifications de la même façon que vous le faites pour un conteneur entier.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Utilisation de FeedRange pour la parallélisation
Dans le modèle d’extraction de flux de modification, vous pouvez utiliser FeedRange pour paralléliser le traitement du flux de modification.
FeedRange représente une plage de valeurs de clé de partition.
Voici un exemple qui décrit comment obtenir une liste de plages pour votre conteneur :
const ranges = await container.getFeedRanges();
Lorsque vous obtenez la liste des valeurs FeedRange de votre conteneur, vous obtenez un FeedRange par partition physique.
Vous pouvez utiliser un FeedRange pour créer un itérateur afin de paralléliser le traitement du flux de modification sur plusieurs machines ou threads. Contrairement à l’exemple précédent qui a montré comment obtenir un itérateur de flux de modification pour l’ensemble du conteneur ou une clé de partition unique, vous pouvez utiliser FeedRanges pour obtenir plusieurs itérateurs, qui peuvent traiter le flux de modification en parallèle.
Voici un exemple qui décrit comment utiliser deux machines hypothétiques distinctes qui lisent en parallèle pour lire à partir du début du flux de modification du conteneur :
Machine 1 :
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Machine 2 :
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Enregistrement des jetons de continuation
Vous pouvez enregistrer la position de votre itérateur en obtenant un jeton de continuation. Un jeton de continuation est une valeur de chaîne qui suit les modifications traitées en dernier par votre itérateur de flux de changements et permet à l'itérateur de reprendre plus tard. Le jeton de continuation, s’il est spécifié, est prioritaire sur l’heure de début et commence à partir des valeurs de début. Le code suivant lit le flux de modification depuis la création du conteneur. Une fois que vous n’avez plus de modifications disponibles, il conserve un jeton de continuation afin que la consommation de flux de modification puisse être reprise ultérieurement.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Le jeton de continuation n’expire jamais tant que le conteneur Azure Cosmos DB existe toujours.
Utiliser AsyncIterator
Vous pouvez utiliser JavaScript AsyncIterator pour extraire le flux de modification. Voici un exemple de AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}