Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette section fournit une vue d’ensemble générale de l’implémentation de Orleans Stream. Il décrit les concepts et les détails qui ne sont pas visibles au niveau de l’application. Si vous envisagez uniquement d’utiliser des flux, vous n’avez pas besoin de lire cette section.
Terminologie :
Avec le terme « file d’attente », nous faisons référence à toute technologie de stockage durable capable d’ingérer des événements de flux et qui permet de tirer (pull) des événements ou qui fournit un mécanisme de type push pour consommer des événements. Généralement, pour fournir une extensibilité, ces technologies offrent des files d’attente shardées/partitionnées. Par exemple, les files d’attente Azure vous permettent de créer plusieurs files d’attente et Event Hubs ont plusieurs hubs.
Flux persistants
Tous les Orleans fournisseurs de flux persistants partagent une implémentation PersistentStreamProvidercommune. Ces fournisseurs de flux génériques doivent être configurés avec une technologie spécifique IQueueAdapterFactory.
Par exemple, à des fins de test, nous avons des adaptateurs de file d’attente qui génèrent leurs données de test plutôt que de lire les données à partir d’une file d’attente. Le code ci-dessous montre comment configurer un fournisseur de flux persistant pour utiliser notre adaptateur de file d’attente personnalisé (générateur). Il le fait en configurant le fournisseur de flux persistant avec une fonction de fabrique utilisée pour créer l’adaptateur.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Lorsqu’un producteur de flux génère un nouvel élément de flux et appelle stream.OnNext(), le streaming runtime Orleans invoque la méthode appropriée sur le fournisseur de flux IQueueAdapter qui enfile l’élément directement dans la file adéquate.
Agents de tirage
Les agents de tirage figurent au cœur du fournisseur de flux persistant. Les agents de tirage tirent (pull) des événements à partir d’un ensemble de files d’attente durables et les remettent au code d’application en grains qu’il consomme. Les agents de tirage peuvent être vus comme un « micro-service » distribué, un composant distribué, partitionné, hautement disponible et élastique. Les agents d’extraction s’exécutent dans les mêmes silos que ceux qui hébergent les grains d’application et sont entièrement gérés par le Runtime de diffusion Orleans.
StreamQueueMapper et StreamQueueBalancer
Les agents de traction sont paramétrés avec IStreamQueueMapper et IStreamQueueBalancer. IStreamQueueMapper fournit une liste de toutes les files d’attente et est responsable également du mappage des flux aux files d’attente. De cette façon, le côté producteur du fournisseur de flux persistant sait dans quelle file d’attente placer le message.
L’IStreamQueueBalancer exprime comment les files d'attente sont équilibrées entre les silos Orleans et les agents. L’objectif est d’affecter des files d’attente à des agents de manière équilibrée, afin d’éviter les goulots d’étranglement et de favoriser l’élasticité. Lorsqu’un nouveau silo est ajouté au Orleans cluster, les files d’attente sont rééquilibrées automatiquement sur les anciens et nouveaux silos. StreamQueueBalancer permet de personnaliser ce processus. Orleans a plusieurs streamQueueBalancers intégrés pour prendre en charge différents scénarios d’équilibrage (grand et petit nombre de files d’attente) et différents environnements (Azure, local, statique).
À l’aide de l’exemple de générateur de test ci-dessus, le code ci-dessous montre comment configurer le mappeur de file d’attente et l’équilibreur de file d’attente.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Le code ci-dessus configure le GeneratorAdapterFactory pour utiliser un mappeur de file d’attente avec huit files d’attente et équilibre les files d’attente sur le cluster à l’aide du DynamicClusterConfigDeploymentBalancer.
Protocole de tirage (pull)
Chaque silo exécute un ensemble d’agents de tirage et chaque agent effectue le tirage à partir d’une file d’attente. Les agents d’extraction eux-mêmes sont implémentés par un composant runtime interne, appelé SystemTarget. Les composants SystemTarget sont essentiellement des grains de runtime, sont soumis à une concurrence à thread unique, peuvent utiliser la messagerie de grain standard et sont aussi légers que les grains. Contrairement aux grains, les composants SystemTarget ne sont pas virtuels : ils sont explicitement créés (par le runtime) et ne sont pas transparents pour l’emplacement. En mettant en œuvre des agents d'extraction comme SystemTargets, le runtime de streaming Orleans peut s'appuyer sur les fonctionnalités Orleans intégrées et être mis à l'échelle vers un très grand nombre de files d'attente, car la création d'un nouvel agent d'extraction est aussi peu coûteuse que celle d'un nouveau grain.
Chaque agent de tirage exécute un minuteur périodique qui effectue un tirage à partir de la file d’attente en appelant la méthode IQueueAdapterReceiver.GetQueueMessagesAsync. Les messages retournés sont placés dans la structure de données interne par agent appelée IQueueCache. Chaque message est inspecté pour connaître son flux de destination. L’agent utilise le Pub-Sub pour obtenir la liste des utilisateurs du flux qui se sont abonnés à ce flux. Une fois la liste des consommateurs récupérée, l’agent la stocke localement (dans son cache pub-sub) pour éviter de consulter Pub-Sub à chaque message. L’agent s’abonne également au pub-sub pour recevoir la notification de tous les nouveaux consommateurs qui s’abonnent à ce flux. Cette négociation entre l’agent et pub-sub garantit une sémantique d’abonnement de streaming forte : une fois que le consommateur s’est abonné au flux, il verra tous les événements qui ont été générés après son abonnement. En outre, l’utilisation StreamSequenceToken lui permet de s’abonner dans le passé.
Cache de file d’attente
IQueueCache est une structure de données par agent interne qui permet de dissocier le retrait des nouveaux événements de la file d’attente et de les remettre aux consommateurs. Il permet également de découpler la livraison à différents flux et différents consommateurs.
Imaginez une situation où un flux a 3 consommateurs de flux et où l’un d’eux est lent. Si des précautions ne sont pas prises, la lenteur de ce consommateur peut affecter la progression de l’agent, ralentir la consommation d’autres consommateurs de ce flux, et même ralentir le retrait de file d’attente et la remise des événements pour d’autres flux. Pour éviter cela et autoriser le parallélisme maximal dans l’agent, nous utilisons IQueueCache.
IQueueCache place en mémoire tampon les événements de flux et permet à l’agent de remettre les événements à chaque consommateur à son rythme propre. La livraison par consommateur est implémentée par le composant interne appelé IQueueCacheCursor, qui suit la progression par consommateur. De cette façon, chaque consommateur reçoit des événements à son propre rythme : les consommateurs rapides reçoivent des événements aussi rapidement qu’ils sont supprimés de la file d’attente, tandis que les consommateurs lents les reçoivent plus tard. Une fois le message remis à tous les consommateurs, il peut être supprimé du cache.
Contre-pression
La rétropression dans le système d'exécution Orleans de streaming s'applique dans deux cas de figure : transférer les événements de flux de la file d'attente à l'agent et transmettre les événements de l'agent aux consommateurs de flux.
Ce dernier est fourni par le mécanisme de remise de messages intégré Orleans . Chaque événement de flux est remis de l’agent aux consommateurs par le biais de la messagerie grain standard Orleans, un à la fois. Autrement dit, les agents envoient un événement (ou un lot limité d’événements) à chaque consommateur de flux et attendent cet appel. L’événement suivant ne démarre pas tant que la tâche de l’événement précédent n’a pas été résolue ou interrompue. De cette façon, nous limitons naturellement le taux de livraison par consommateur à un message à la fois.
Lorsque vous transférez des événements de flux de la file d’attente à l’agent, le streaming Orleans fournit un nouveau mécanisme spécial de rétropression. Comme l’agent dissocie le retrait des événements de la file d’attente et leur remise aux consommateurs, un consommateur lent peut à lui seul prendre un retard tel, que le cache IQueueCache sera rempli. Pour éviter IQueueCache de croître indéfiniment, nous limitons sa taille (la limite de taille est configurable). Toutefois, l’agent ne rejette jamais des événements non remis.
Au lieu de cela, quand le cache commence à se remplir, les agents ralentissent le taux de retrait des événements de la file d’attente. De cette façon, nous pouvons traverser les périodes de livraison lentes en ajustant le taux auquel nous consommons à partir de la file d’attente (en exerçant une « backpressure ») et revenir à des taux de consommation rapides par la suite. Pour détecter les creux de « remise lente », IQueueCache utilise une structure de données interne faite de compartiments de cache, qui suit la progression de la remise d’événements aux consommateurs de flux individuels. Cela entraîne un système très réactif et autoajustable.