Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die asynchrone Fortschrittsverfolgung ermöglicht es strukturierten Streaming-Pipelines, den Fortschritt asynchron und parallel zur tatsächlichen Datenverarbeitung in einem Mikro-Batch zu überwachen, wodurch die Latenz reduziert wird, die mit der Aufrechterhaltung von offsetLog und commitLog verbunden ist.
Hinweis
Die asynchrone Statusnachverfolgung funktioniert nicht mit Trigger.once- oder Trigger.availableNow-Auslösern. Wenn Sie versuchen, dieses Feature mit diesen Triggern zu aktivieren, tritt ein Abfragefehler auf.
Wie funktioniert die asynchrone Statusverfolgung, um die Latenz zu reduzieren?
Strukturiertes Streaming basiert auf dem Beibehalten und Verwalten von Offsets als Statusindikatoren für die Abfrageverarbeitung. Der Offsetverwaltungsvorgang wirkt sich direkt auf die Verarbeitungslatenz aus, da keine Datenverarbeitung erfolgen kann, bis diese Vorgänge abgeschlossen sind. Die asynchrone Statusnachverfolgung ermöglicht es strukturierten Streaming-Pipelines, den Fortschritt zu überwachen, ohne dass diese Offsetverwaltungsvorgänge betroffen sind.
Wann sollten Sie die Prüfpunkthäufigkeit konfigurieren?
Benutzer können die Häufigkeit konfigurieren, mit der der Fortschritt überprüft wird. Die Standardeinstellungen für die Prüfpunkthäufigkeit bieten einen guten Durchsatz für die meisten Abfragen. Die Konfiguration der Häufigkeit ist hilfreich für Szenarien, in denen Offsetverwaltungsvorgänge mit einer höheren Rate auftreten, als sie verarbeitet werden können, wodurch ein immer größerer Backlog von Offsetverwaltungsvorgängen entsteht. Um diesen wachsenden Backlog zu vermeiden, wird die Datenverarbeitung blockiert oder verlangsamt, wobei das Verarbeitungsverhalten im Wesentlichen zurückgesetzt wird, um die Vorteile der asynchronen Fortschrittsnachverfolgung zu beseitigen.
Hinweis
Die Fehlerwiederherstellungszeit erhöht sich mit der Erhöhung der Prüfpunktintervallzeit. Im Falle eines Ausfalls muss eine Pipeline alle Daten vor dem vorherigen erfolgreichen Prüfpunkt erneut verarbeiten. Benutzer können diesen Kompromiss zwischen geringerer Latenz während der normalen Verarbeitung und der Wiederherstellungszeit im Falle eines Ausfalls berücksichtigen.
Welche Konfigurationen sind mit der asynchronen Statusverfolgung verknüpft?
| Option | Wert | Standard | Description |
|---|---|---|---|
| asyncProgressTrackingEnabled | TRUE/FALSE | Falsch | Aktivieren oder Deaktivieren der asynchronen Statusnachverfolgung |
| asyncProgressTrackingCheckpointIntervalMs | Millisekunden | 1.000 | das Intervall, in dem Offset- und Abschluss-Commits ausgeführt werden |
Wie können Benutzer die asynchrone Statusnachverfolgung aktivieren?
Benutzer können Code verwenden, der dem folgenden Code ähnelt, um dieses Feature zu aktivieren:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Deaktivieren der asynchronen Statusnachverfolgung
Wenn die asynchrone Fortschrittsverfolgung aktiviert ist, sichert das Framework den Fortschritt nicht für jeden Batch. Um dies zu beheben, verarbeiten Sie vor dem Deaktivieren der asynchronen Statusverfolgung mindestens zwei Mikrobatches mit den folgenden Einstellungen:
.option("asyncProgressTrackingEnabled", "true").option("asyncProgressTrackingCheckpointIntervalMs", 0)
Beenden Sie die Abfrage, nachdem mindestens zwei Mikrobatches die Verarbeitung abgeschlossen haben. Jetzt können Sie die asynchrone Statusnachverfolgung sicher deaktivieren und die Abfrage neu starten.
Wenn Sie die asynchrone Statusnachverfolgung deaktiviert haben, ohne diesen Schritt abzuschließen, tritt möglicherweise der folgende Fehler auf:
java.lang.IllegalStateException: batch x doesn't exist
In den Treiberprotokollen wird möglicherweise der folgende Fehler angezeigt:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Im Anschluss an die Anweisungen in diesem Abschnitt zum Deaktivieren der asynchronen Statusnachverfolgung können Sie diese Fehler beheben und Ihre Streaming-Workload reparieren.
Einschränkungen bei der asynchronen Statusnachverfolgung
Für diese Funktion gelten folgende Einschränkungen:
- Die asynchrone Statusnachverfolgung wird nur in zustandslosen Pipelines unterstützt, wenn Kafka als Sink verwendet wird.
- Eine genau einmalige End-to-End-Verarbeitung ist mit asynchroner Fortschrittsverfolgung nicht garantiert, da Offsetbereiche für Batch bei einem Fehler geändert werden können. Einige Senken, wie z. B. Kafka, bieten nie genau einmal Garantien.