Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Dieser Abschnitt enthält eine allgemeine Übersicht über die Orleans Stream-Implementierung. Es beschreibt Konzepte und Details, die auf Anwendungsebene nicht sichtbar sind. Wenn Sie nur Datenströme verwenden möchten, müssen Sie diesen Abschnitt nicht lesen.
Terminologie:
Wir verweisen mit dem Wort „Warteschlange“ auf jede dauerhafte Speichertechnologie, die Streamereignisse erfassen kann und entweder das Pullen von Ereignissen ermöglicht oder einen pushbasierten Mechanismus zum Nutzen von Ereignissen bereitstellt. In der Regel stellen diese Technologien zur Gewährleistung der Skalierbarkeit aufgeteilte/partitionierte Warteschlangen bereit. Mit Azure-Warteschlangen können Sie beispielsweise mehrere Warteschlangen erstellen, und Event Hubs verfügen über mehrere Hubs.
Persistente Ströme
Alle Orleans anbieter für beständigen Datenstrom teilen eine gemeinsame Implementierung PersistentStreamProvider. Diese generischen Streamanbieter müssen mit einer technologiespezifischen IQueueAdapterFactory konfiguriert werden.
Beispielsweise verfügen wir zu Testzwecken über Warteschlangenadapter, die ihre Testdaten generieren, anstatt die Daten aus einer Warteschlange zu lesen. Der folgende Code zeigt, wie wir einen dauerhaften Stream-Provider so konfigurieren, dass er unseren benutzerdefinierten Warteschlangen-Adapter (Generator) verwendet. Hierzu konfigurieren Sie den Anbieter für beständigen Datenstrom mit einer Factoryfunktion, die zum Erstellen des Adapters verwendet wird.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Wenn ein Stream-Producer ein neues Stream-Element generiert und stream.OnNext() aufruft, ruft die Orleans Streaming Runtime die entsprechende Methode auf dem IQueueAdapter dieses Stream-Anbieters auf, der das Element direkt in die entsprechende Warteschlange einreiht.
Pull-Agenten
Im Mittelpunkt des persistenten Streamanbieters stehen die Pull-Agenten. Pull-Agenten pullen Ereignisse aus einer Reihe dauerhafter Warteschlangen und übermitteln sie an den Anwendungscode in Grains, die sie nutzen. Man kann sich die Pull-Agenten als einen verteilten „Mikroservice“ vorstellen – eine partitionierte, hochverfügbare und elastische verteilte Komponente. Die Pulling-Agenten werden in denselben Silos ausgeführt, in denen auch die Anwendungs-Grains gehostet werden, und werden vollständig von der Orleans Streaming Runtime verwaltet.
StreamQueueMapper und StreamQueueBalancer
Pull-Agenten werden mit IStreamQueueMapper und IStreamQueueBalancer parametrisiert. Die IStreamQueueMapper stellt eine Liste aller Warteschlangen bereit und ist auch für die Zuordnung von Streams zu Warteschlangen verantwortlich. Auf diese Weise weiß die Producerseite des persistenten Streamanbieters, in welche Warteschlange die Nachricht eingereiht werden soll.
Das IStreamQueueBalancer drückt aus, wie die Warteschlangen über Orleans Silos und Agenten hinweg ausgeglichen werden. Ziel ist es, Agenten Warteschlangen auf ausgewogene Weise zuzuweisen, um Engpässe zu vermeiden und die Flexibilität zu unterstützen. Wenn dem Orleans Cluster ein neues Silo hinzugefügt wird, werden Warteschlangen automatisch über die alten und neuen Silos neu ausgeglichen. Dies StreamQueueBalancer ermöglicht das Anpassen dieses Prozesses. Orleans verfügt über mehrere integrierte StreamQueueBalancers, um verschiedene Ausgleichsszenarien (große und kleine Anzahl von Warteschlangen) und verschiedene Umgebungen (Azure, lokal, statisch) zu unterstützen.
Im obigen Beispiel für den Testgenerator zeigt der folgende Code, wie man den Warteschlangen-Mapper und den Warteschlangenausgleicher konfigurieren kann.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Mit dem obigen Code wird die GeneratorAdapterFactory so konfiguriert, dass sie Warteschlangenzuordnungsprogramm mit acht Warteschlangen verwendet und die Warteschlangen über den Cluster mithilfe von DynamicClusterConfigDeploymentBalancer ausgeglichen werden.
Pull-Protokoll
Jedes Silo führt eine Reihe von Pull-Agenten aus, wobei jeder Agent aus einer Warteschlange pullt. Pulling Agents selbst werden von einer internen Laufzeitkomponente implementiert, die als SystemTarget bezeichnet wird. SystemTargets sind im Wesentlichen Runtime-Grains, sie unterliegen Singlethread-Parallelität, können reguläres Grain-Messaging verwenden und sind so einfach wie Grains. Im Gegensatz zu Grains sind Systemziele nicht virtuell: Sie werden explizit während der Laufzeit erstellt und sind nicht ortsunabhängig. Durch die Implementierung von Pulling-Agents als SystemTargets kann die Orleans Streaming-Runtime auf integrierte Orleans Features zurückgreifen und auf eine sehr große Anzahl von Warteschlangen skalieren, da das Erstellen eines neuen Pulling-Agenten genauso kostengünstig ist wie das Erstellen eines neuen Grain.
Jeder Pull-Agent führt einen regelmäßigen Timer aus, der durch Aufrufen der IQueueAdapterReceiver.GetQueueMessagesAsync-Methode aus der Warteschlange pullt. Die zurückgegebenen Nachrichten werden in die interne Datenstruktur eingefügt, die pro Agenten IQueueCache genannt wird. Jede Nachricht wird überprüft, um den Zieldatenstrom zu ermitteln. Der Agent verwendet die Pub-Sub, um die Liste der Stream-Consumer zu ermitteln, die diesen Stream abonniert haben. Sobald die Verbraucherliste abgerufen wurde, speichert der Agent sie lokal (im Pub-Sub-Cache), damit er nicht bei jeder Nachricht mit Pub-Sub in Verbindung treten muss. Der Agent abonniert außerdem die Pub-Sub, um Benachrichtigungen über neue Verbraucher zu erhalten, die diesen Stream abonnieren. Dieser Handshake zwischen dem Agent und dem Pub-Sub garantiert eine starke Streamingabonnementsemantik: Sobald der Verbraucher den Stream abonniert hat, werden alle Ereignisse angezeigt, die nach dem Abonnieren generiert wurden. Darüber hinaus ermöglicht die Verwendung von StreamSequenceToken ein Abonnieren in der Vergangenheit.
Warteschlangencache
IQueueCache ist eine interne Datenstruktur pro Agent, die die Möglichkeit bietet, neue Ereignisse von der Warteschlange zu entkoppeln und sie an die Consumer zu liefern. Es ermöglicht auch die Entkoppelung der Übermittlung an verschiedene Datenströme und verschiedene Verbraucher.
Stellen Sie sich eine Situation vor, in der ein Stream 3 Stream-Verbraucher hat und einer davon langsam ist. Wenn dies nicht der Fall ist, kann sich dieser langsame Consumer auf den Fortschritt des Agents auswirken, den Verbrauch anderer Consumer dieses Datenstroms verlangsamen und sogar das Dequeuing und die Übermittlung von Ereignissen für andere Streams verlangsamen. Um dies zu verhindern und maximale Parallelität im Agenten zuzulassen, verwenden wir IQueueCache.
IQueueCache puffert Stream-Ereignisse und bietet dem Agenten die Möglichkeit, jedem Consumer Ereignisse in seinem eigenen Tempo zu liefern. Die Übermittlung pro Consumer wird durch die interne Komponente namens IQueueCacheCursor implementiert, die den Fortschritt pro Consumer nachverfolgt. Auf diese Weise empfängt jeder Verbraucher Ereignisse in seinem eigenen Tempo: Schnelle Verbraucher erhalten Ereignisse so schnell wie sie aus der Warteschlange entfernt werden, und langsame Verbraucher erhalten sie später. Sobald die Nachricht an alle Verbraucher übermittelt wurde, kann sie aus dem Cache gelöscht werden.
Rückdruck
Der Gegendruck in der Orleans Streaming Runtime kommt an zwei Stellen zum Tragen: Bringen von Stream-Ereignissen aus der Warteschlange zum Agenten und Liefern der Ereignisse vom Agenten an die Stream-Consumer.
Letzteres wird vom integrierten Orleans Nachrichtenübermittlungsmechanismus bereitgestellt. Jedes Stream-Ereignis wird vom Agenten über das standardmäßige Orleans-Grain-Messaging an die Consumer geliefert, eines nach dem anderen. Das heißt, die Agents senden ein Ereignis (oder eine begrenzte Anzahl von Ereignissen) an jeden Stream-Consumer und warten diesen Anruf. Das nächste Ereignis wird erst übermittelt, wenn die Aufgabe für das vorherige Ereignis aufgelöst oder unterbrochen wurde. Auf diese Weise beschränken wir die Übermittlungsrate pro Verbraucher auf jeweils eine Nachricht.
Bei der Übermittlung von Stream-Ereignissen aus der Warteschlange an den Agenten bietet Orleans-Streaming einen neuen speziellen Backpressure-Mechanismus. Da der Agent das Entfernen von Ereignissen aus der Warteschlange entkoppelt und sie an Verbraucher sendet, kann ein einzelner langsamer Consumer so stark zurückfallen, dass sich der IQueueCache füllt. Um zu verhindern, dass IQueueCache unbegrenzt wächst, begrenzen wir seine Größe (die Größenbegrenzung ist konfigurierbar). Der Agent entsorgt jedoch nie nicht zugestellte Ereignisse.
Wenn sich der Cache zu füllen beginnt, verlangsamen die Agents stattdessen die Geschwindigkeit, Ereignisse aus der Warteschlange zu entfernen. Auf diese Weise können wir die langsamen Lieferzeiten „aussitzen“, indem wir die Rate anpassen, mit der wir aus der Warteschlange verbrauchen („Rückstau“) und später wieder zu schnellen Verbrauchsraten übergehen. Um die Täler der "langsamen Zustellung" zu erkennen, benutzt die IQueueCache eine interne Datenstruktur von Cache-Buckets, die den Fortschritt der Zustellung von Ereignissen an einzelne Stream-Consumer nachverfolgt. Dies führt zu einem sehr reaktionsfähigen und selbst anpassenden System.