Mit dem Pullmodell für den Änderungsfeed können Sie den Azure Cosmos DB-Änderungsfeed in Ihrem eigenen Tempo verwenden. Ähnlich wie beim Änderungsfeedprozessor können Sie das Pullmodell für den Änderungsfeed verwenden, um die Verarbeitung von Änderungen über mehrere Änderungsfeedconsumer zu parallelisieren.
Es ist jedoch nicht möglich, Fortsetzungstoken in eine Lease oder umgekehrt zu konvertieren.
In folgenden Szenarien sollten Sie die Verwendung des Pullmodells in Erwägung ziehen:
Hier sind einige wesentliche Unterschiede zwischen dem Änderungsfeedprozessor und dem Änderungsfeed-Pullmodell:
Zum Verarbeiten des Änderungsfeeds mithilfe des Pullmodells erstellen Sie eine FeedIterator-Instanz. Wenn Sie erstmalig einen FeedIterator erstellen, müssen Sie einen erforderlichen ChangeFeedStartFrom-Wert angeben, der sowohl die Anfangsposition für das Lesen von Änderungen als auch den gewünschten Wert umfasst, den Sie für FeedRange nutzen möchten. Der FeedRange ist ein Bereich von Partitionsschlüsselwerten und gibt die Elemente an, die mithilfe dieses speziellen FeedIterator aus dem Änderungsfeed gelesen werden können. Sie müssen auch einen erforderlichen ChangeFeedMode-Wert für den Modus angeben, in dem Sie Änderungen verarbeiten möchten: Neueste Version oder Alle Versionen und Löschvorgänge. Verwenden Sie entweder ChangeFeedMode.LatestVersion oder ChangeFeedMode.AllVersionsAndDeletes, um anzugeben, welchen Modus Sie zum Lesen des Änderungsfeeds verwenden möchten. Wenn Sie den Modus „Alle Versionen und Löschvorgänge“ verwenden, müssen Sie als Startwert für den Änderungsfeed entweder Now() oder ein bestimmtes Fortsetzungstoken auswählen.
Optional können Sie ChangeFeedRequestOptions zum Festlegen eines PageSizeHint angeben. Diese Eigenschaft legt die Höchstzahl von Elementen fest, die pro Seite empfangen werden können. Wenn Vorgänge in der überwachten Sammlung über gespeicherte Prozeduren ausgeführt werden, wird der Transaktionsbereich beim Lesen von Elementen aus dem Änderungsfeed beibehalten. Dadurch kann die Anzahl der empfangenen Elemente unter Umständen höher als der angegebene Wert sein, sodass die von derselben Transaktion geänderten Elemente als Teil eines atomischen Batches zurückgegeben werden.
Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator im Modus „Neueste Version“, der Entitätsobjekte zurückgibt (in diesem Fall ein User-Objekt):
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tipp
Für frühere 3.34.0-Versionen kann der neueste Versionsmodus aktiviert werden, indem ChangeFeedMode.Incremental festgelegt wird. Sowohl Incremental als auch LatestVersion beziehen sich auf den neuesten Versionsmodus des Änderungsfeeds, und Anwendungen, die einen der beiden Modi verwenden, sehen dasselbe Verhalten.
Der Modus „Alle Versionen und Löschvorgänge“ befindet sich in der Vorschauphase und kann mit .NET SDK-Vorschauversionen >= 3.32.0-preview verwendet werden. Ein Beispiel für das Abrufen von FeedIterator im Modus „Alle Versionen und Löschvorgänge“, der User-Objekte zurückgibt:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Verwenden des Änderungsfeeds über Streams
FeedIteratorfür beide Änderungsfeedmodi gibt es zwei Optionen. Zusätzlich zu den Beispielen, die Entitätsobjekte zurückgeben, können Sie die Antwort auch mit Stream-Unterstützung abrufen. Datenströme ermöglichen Ihnen, Daten zu lesen, ohne sie zuvor deserialisieren zu müssen. Auf diese Weise sparen Sie Clientressourcen.
Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator im Modus „Neueste Version“, der einen Stream zurückgibt:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Verwenden der Änderungen für einen vollständigen Container
Wenn Sie keinen ParameterFeedRange für einen FeedIterator angeben, können Sie den Änderungsfeed eines vollständigen Containers in Ihrem eigenen Tempo verarbeiten. Im folgenden Beispiel wird mit dem Lesen aller Änderungen ab dem aktuellen Zeitpunkt im Modus „Neueste Version“ begonnen:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Da es sich beim Änderungsfeed im Grunde um eine unbegrenzte Liste von Elementen handelt, die alle zukünftigen Schreib- und Updatevorgänge enthält, ist der Wert von HasMoreResults immer true. Wenn Sie versuchen, den Änderungsfeed zu lesen, und keine neuen Änderungen vorliegen, erhalten Sie eine Antwort mit dem Status NotModified. Dies unterscheidet sich von dem Empfangen einer Antwort ohne Änderungen und OK Status. Es ist möglich, leere Antworten im Änderungsfeed zu erhalten, während weitere Änderungen verfügbar sind. Sie sollten weiter abfragen, bis der Empfang von NotModified erfolgt. Im vorherigen Beispiel wird NotModified behandelt, indem fünf Sekunden gewartet werden, bevor erneut auf Änderungen überprüft wird.
Verwenden der Änderungen für einen Partitionsschlüssel
In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten. Sie können FeedIterator für einen bestimmten Partitionsschlüssel abrufen und die Änderungen auf die gleiche Weise wie für einen vollständigen Container verarbeiten.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Verwenden von FeedRange für die Parallelisierung
Beim Änderungsfeedprozessor wird die Arbeit automatisch auf mehrere Consumer verteilt. Beim Pullmodell für den Änderungsfeed können Sie FeedRange verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren. Ein FeedRange stellt einen Bereich von Partitionsschlüsselwerten dar.
Das folgende Beispiel zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Wenn Sie eine Liste von FeedRange-Werten für Ihren Container abrufen, erhalten Sie einen FeedRange pro physische Partition.
Durch Verwenden eines FeedRange können Sie dann einen FeedIterator erstellen, um die Verarbeitung des Änderungsfeeds parallel auf mehrere Computer oder Threads zu verteilen. Im Gegensatz zum vorherigen Beispiel, in dem gezeigt wurde, wie Sie einen FeedIterator für den gesamten Container oder einen einzelnen Partitionsschlüssel abrufen, können Sie mithilfe von FeedRanges mehrere FeedIterators abrufen, über die der Änderungsfeed parallel verarbeitet werden kann.
Wenn Sie FeedRanges verwenden möchten, benötigen Sie einen Orchestratorprozess, der sie abruft und auf diese Computer verteilt. Diese Verteilung kann wie folgt sein:
- Verwenden von
FeedRange.ToJsonString und Verteilen dieses Zeichenfolgenwerts. Die Consumer können diesen Wert mit FeedRange.FromJsonString verwenden.
- Wenn die Verteilung bereits bearbeitet wird: Übergeben eines Verweises auf das
FeedRange-Objekt
Das folgende Beispiel zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:
Computer 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Computer 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Speichern von Fortsetzungstoken
Sie können die Position Ihres FeedIterator speichern, indem Sie das Fortsetzungstoken abrufen. Ein Fortsetzungstoken ist ein Zeichenfolgenwert, der die letzten verarbeiteten Änderungen Ihres FeedIterators nachverfolgt und dem FeedIterator eine spätere Fortsetzung an dieser Stelle ermöglicht. Das Fortsetzungstoken (sofern angegeben) hat Vorrang vor den Werten für die Startzeit und das Beginnen am Anfang. Der folgende Code liest den Änderungsfeed seit der Erstellung des Containers. Wenn keine weiteren Änderungen mehr verfügbar sind, dauert ein Fortsetzungstoken fort, sodass der Änderungsfeed später weiter verarbeitet werden kann.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Wenn Sie den Modus der neuesten Version verwenden, läuft das FeedIterator Fortsetzungstoken nie ab, solange der Azure Cosmos DB-Container noch vorhanden ist. Wenn Sie alle Versionen und den Löschmodus verwenden, ist das FeedIterator Fortsetzungstoken gültig, solange die Änderungen im Aufbewahrungsfenster für fortlaufende Sicherungen vorgenommen wurden.
Zum Verarbeiten des Änderungsfeeds mithilfe des Pullmodells erstellen Sie eine Iterator<FeedResponse<JsonNode>> responseIterator-Instanz. Beim Erstellen CosmosChangeFeedRequestOptionsmüssen Sie angeben, wo sie mit dem Lesen des Änderungsfeeds beginnen und den Parameter übergeben, den FeedRange Sie verwenden möchten.
FeedRange ist ein Bereich von Partitionsschlüsselwerten, der die Elemente angibt, die aus dem Änderungsfeed gelesen werden können.
Wenn Sie den Änderungsfeed im Modus „Alle Versionen und Löschvorgänge“ lesen möchten, müssen Sie beim Erstellen von allVersionsAndDeletes() auch CosmosChangeFeedRequestOptions angeben. Im Modus „Alle Versionen und Löschvorgänge“ wird die Verarbeitung des Änderungsfeeds von Anfang an oder ab einem bestimmten Zeitpunkt nicht unterstützt. Sie müssen Änderungen entweder ab dem jetzigen Zeitpunkt oder ab einem Fortsetzungstoken verarbeiten. Der Modus „Alle Versionen und Löschvorgänge“ befindet sich in der Vorschauphase und ist in Java SDK-Version >= 4.42.0 verfügbar.
Verwenden der Änderungen für einen vollständigen Container
Wenn Sie FeedRange.forFullRange() angeben, können Sie den gesamten Änderungsfeed eines Containers im gewünschten Tempo verarbeiten. Optional können Sie einen Wert in byPage() angeben. Diese Eigenschaft legt die Höchstzahl von Elementen fest, die pro Seite empfangen werden können.
Hier sehen Sie ein Beispiel für das Abrufen eines responseIterator im Modus „Neueste Version“:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Hier ist ein Beispiel für das Abrufen eines responseIterator in allen Versionen und Löschmodus:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Dann können wir durch die Ergebnisse iterieren. Da es sich beim Änderungsfeed im Grunde um eine unbegrenzte Liste von Elementen handelt, die alle zukünftigen Schreib- und Updatevorgänge enthält, ist der Wert von responseIterator.hasNext() immer true. Hier ist ein Beispiel im Modus „Neueste Version“, bei dem alle Änderungen von Anfang an gelesen werden. Jede Iteration behält ein Fortsetzungstoken bei, nachdem alle Ereignisse verarbeitet wurden. Es wird vom letzten verarbeiteten Punkt im Änderungsfeed abgerufen und mithilfe createForProcessingFromContinuationvon verarbeitet:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Verwenden der Änderungen eines Partitionsschlüssels
In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten. Sie können die Änderungen für einen bestimmten Partitionsschlüssel auf dieselbe Weise verarbeiten wie für einen gesamten Container. Hier ist ein Beispiel, das den Modus der neuesten Version verwendet:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Verwenden von FeedRange für die Parallelisierung
Beim Änderungsfeedprozessor wird die Arbeit automatisch auf mehrere Consumer verteilt. Beim Pullmodell für den Änderungsfeed können Sie FeedRange verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren. Ein FeedRange stellt einen Bereich von Partitionsschlüsselwerten dar.
Hier ist ein Beispiel, bei dem der Modus „Neueste Version“ verwendet wird. Es zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Wenn Sie eine Liste von FeedRanges für Ihren Container abrufen, erhalten Sie einen FeedRange pro physischer Partition.
Mithilfe von FeedRange können Sie die Verarbeitung des Änderungsfeeds auf mehreren Computern oder in mehreren Threads parallelisieren. Im Gegensatz zum vorherigen Beispiel, in dem gezeigt wurde, wie Sie Änderungen für den gesamten Container oder einen einzelnen Partitionsschlüssel verarbeiten können, können Sie FeedRanges verwenden, um den Änderungsfeed parallel zu verarbeiten.
Wenn Sie FeedRanges verwenden möchten, benötigen Sie einen Orchestratorprozess, der sie abruft und auf diese Computer verteilt. Diese Verteilung kann wie folgt sein:
- Verwenden von
FeedRange.toString() und Verteilen dieses Zeichenfolgenwerts.
- Wenn die Verteilung bereits bearbeitet wird: Übergeben eines Verweises auf das
FeedRange-Objekt
Hier ist ein Beispiel, das den Modus der neuesten Version verwendet. Es zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:
Computer 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Computer 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Um den Änderungsfeed mithilfe des Pullmodells zu verarbeiten, erstellen Sie eine Instanz von responseIterator mit dem Typ ItemPaged[Dict[str, Any]].
Wenn Sie die Änderungsfeed-API aufrufen, müssen Sie angeben, wo Sie mit dem Lesen des Änderungsfeeds beginnen, und den feed_range-Parameter übergeben, den Sie verwenden möchten.
feed_range ist ein Bereich von Partitionsschlüsselwerten, der die Elemente angibt, die aus dem Änderungsfeed gelesen werden können.
Sie können auch den Parameter für den Änderungsfeedmodus angebenmode, in dem Sie Änderungen verarbeiten möchten: LatestVersion oder AllVersionsAndDeletes. Der Standardwert ist LatestVersion.
Verwenden Sie entweder LatestVersion oder AllVersionsAndDeletes, um anzugeben, welchen Modus Sie zum Lesen des Änderungsfeeds verwenden möchten.
Wenn Sie den AllVersionsAndDeletes-Modus verwenden, können Sie mit der Verarbeitung von Änderungen entweder ab jetzt oder ab einem continuation-Token beginnen.
Das Lesen des Änderungsfeeds vom Anfang oder von einem bestimmten Zeitpunkt aus unter Verwendung von start_time wird nicht unterstützt.
Verwenden der Änderungen für einen vollständigen Container
Wenn Sie keinen Parameter angeben feed_range , können Sie den Änderungsfeed eines gesamten Containers in Ihrem eigenen Tempo verarbeiten.
Das folgende Beispiel zeigt, wie Sie responseIterator im LatestVersion-Modus ab Beginning abrufen. Da es sich bei LatestVersion um einen Standardmodus handelt, muss der mode Parameter nicht übergeben werden.
responseIterator = container.query_items_change_feed(start_time="Beginning")
Hier ist ein Beispiel dafür, wie man responseIterator im AllVersionsAndDeletes-Modus von Now abruft. Da Now ein Standardwert des start_time-Parameters ist, muss er nicht übergeben werden.
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Dann können wir durch die Ergebnisse iterieren. Da der Änderungsfeed effektiv eine unendliche Liste von Elementen ist, die alle zukünftigen Schreibvorgänge und -updates umfasst, responseIterator kann endlos durchlaufen werden.
Hier ist ein Beispiel im Modus „Neueste Version“, bei dem alle Änderungen von Anfang an gelesen werden.
Jede Iteration druckt Änderungsfeeds für Dokumente.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Verwenden der Änderungen eines Partitionsschlüssels
In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten.
Sie können die Änderungen auf die gleiche Weise verarbeiten wie für einen gesamten Container mit dem partition_key Parameter.
Hier ist ein Beispiel, das den Modus verwendet LatestVersion :
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Verwenden von FeedRange für die Parallelisierung
Beim Pullmodell für den Änderungsfeed können Sie feed_range verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren.
Ein feed_range stellt einen Bereich von Partitionsschlüsselwerten dar.
Das folgende Beispiel zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen.
list der Befehl wandelt iterator in eine Liste um:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Wenn Sie eine Liste von feed_range-Werten für Ihren Container abrufen, erhalten Sie einen feed_range pro physische Partition.
Durch Verwenden eines feed_range können Sie einen Iterator erstellen, um die Verarbeitung des Änderungsfeeds parallel auf mehrere Computer oder Threads zu verteilen.
Im Gegensatz zum vorherigen Beispiel, das gezeigt hat, wie sie einen responseIterator für den gesamten Container oder einen einzelnen Partitionsschlüssel abrufen, können Sie mehrere feed_range Iteratoren abrufen, die den Änderungsfeed parallel verarbeiten können.
Das folgende Beispiel zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:
Computer 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Computer 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Speichern von Fortsetzungstoken
Sie können die Position Ihres Iterators speichern, indem Sie das Fortsetzungstoken abrufen.
Ein Fortsetzungstoken ist ein Zeichenfolgenwert, der die responseIterator letzten verarbeiteten Änderungen nachverfolgt und es dem Iterator ermöglicht, zu diesem Zeitpunkt später fortzusetzen.
Das Fortsetzungstoken (sofern angegeben) hat Vorrang vor den Werten für die Startzeit und das Beginnen am Anfang.
Der folgende Code liest den Änderungsfeed seit der Erstellung des Containers.
Wenn keine weiteren Änderungen mehr verfügbar sind, dauert ein Fortsetzungstoken fort, sodass der Änderungsfeed später weiter verarbeitet werden kann.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Hinweis
Da das continuation-Token den zuvor verwendeten mode-Parameter enthält, wird der continuation-Parameter ignoriert, wenn mode verwendet wurde, und stattdessen wird der mode aus dem continuation-Token verwendet.
Hier ist ein Beispiel, das zeigt, wie Sie mithilfe eines continuation Tokens aus dem Änderungsfeed des Containers lesen:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Zum Verarbeiten des Änderungsfeeds mithilfe des Pullmodells erstellen Sie eine ChangeFeedPullModelIterator-Instanz. Wenn Sie ChangeFeedPullModelIterator zunächst erstellen, müssen Sie einen erforderlichen changeFeedStartFrom-Wert innerhalb des ChangeFeedIteratorOptions-Bereichs angeben, der sowohl aus der Startposition zum Lesen von Änderungen als auch aus der Ressource (einem Partitionsschlüssel oder einem FeedRange) besteht, für die Änderungen abgerufen werden sollen.
Hinweis
Wenn kein changeFeedStartFrom Wert festgelegt wurde, wird der Änderungsfeed für einen gesamten Container ab Now() abgerufen.
Derzeit wird nur die neueste Version vom JavaScript SDK unterstützt und standardmäßig ausgewählt.
Sie können optional maxItemCount in ChangeFeedIteratorOptions verwenden, um die maximale Anzahl von Elementen festzulegen, die pro Seite empfangen werden kann.
Hier sehen Sie ein Beispiel für das Abrufen eines Iterators im Modus „Neueste Version“, der Entitätsobjekte zurückgibt:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Verwenden der Änderungen für einen vollständigen Container
Wenn Sie keinen FeedRange- oder PartitionKey-Parameter in ChangeFeedStartFrom angeben, können Sie den Änderungsfeed eines vollständigen Containers in Ihrem eigenen Tempo verarbeiten. Im folgenden Beispiel wird das Lesen aller Änderungen ab dem aktuellen Zeitpunkt begonnen:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Da es sich beim Änderungsfeed im Grunde um eine unbegrenzte Liste von Elementen handelt, die alle zukünftigen Schreib- und Updatevorgänge enthält, ist der Wert von hasMoreResults immer true. Wenn Sie versuchen, den Änderungsfeed zu lesen, und keine neuen Änderungen vorliegen, erhalten Sie eine Antwort mit dem Status NotModified. Dies unterscheidet sich von dem Empfangen einer Antwort ohne Änderungen und OK Status. Es ist möglich, leere Antworten im Änderungsfeed zu erhalten, während weitere Änderungen verfügbar sind. Sie sollten weiter abfragen, bis der Empfang von NotModified erfolgt. Im vorherigen Beispiel wird NotModified behandelt, indem fünf Sekunden gewartet werden, bevor erneut auf Änderungen überprüft wird.
Verwenden der Änderungen für einen Partitionsschlüssel
In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten. Sie können den Iterator für einen bestimmten Partitionsschlüssel abrufen und die Änderungen auf die gleiche Weise wie für einen vollständigen Container verarbeiten.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Verwenden von FeedRange für die Parallelisierung
Beim Pullmodell für den Änderungsfeed können Sie FeedRange verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren. Ein FeedRange stellt einen Bereich von Partitionsschlüsselwerten dar.
Das folgende Beispiel zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen:
const ranges = await container.getFeedRanges();
Wenn Sie eine Liste von FeedRange-Werten für Ihren Container abrufen, erhalten Sie einen FeedRange pro physische Partition.
Durch Verwenden eines FeedRange können Sie einen Iterator erstellen, um die Verarbeitung des Änderungsfeeds parallel auf mehrere Computer oder Threads zu verteilen. Im Gegensatz zum vorherigen Beispiel, das gezeigt hat, wie sie einen Änderungsfeed-Iterator für den gesamten Container oder einen einzelnen Partitionsschlüssel abrufen können, können Sie FeedRanges verwenden, um mehrere Iteratoren abzurufen, die den Änderungsfeed parallel verarbeiten können.
Das folgende Beispiel zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:
Computer 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Computer 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Speichern von Fortsetzungstoken
Sie können die Position Ihres Iterators speichern, indem Sie ein Fortsetzungstoken abrufen. Ein Fortsetzungstoken ist ein Zeichenfolgenwert, der die zuletzt verarbeiteten Änderungen Ihres Änderungsfeeds nachverfolgt und dem Iterator zu diesem Zeitpunkt später die Fortsetzung ermöglicht. Das Fortsetzungstoken (sofern angegeben) hat Vorrang vor den Werten für die Startzeit und das Beginnen am Anfang. Der folgende Code liest den Änderungsfeed seit der Erstellung des Containers. Wenn keine weiteren Änderungen mehr verfügbar sind, dauert ein Fortsetzungstoken fort, sodass der Änderungsfeed später weiter verarbeitet werden kann.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Das Fortsetzungstoken läuft nie ab, solange der Azure Cosmos DB-Container noch vorhanden ist.
Verwenden von AsyncIterator
Sie können den Änderungsfeed mithilfe von JavaScript AsyncIterator abrufen. Hier ist ein Beispiel für AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}