Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les applications interagissent avec des flux via des API très similaires aux extensions réactives connues (Rx) dans .NET. La principale différence est que les Orleans extensions de flux sont asynchrones pour rendre le traitement plus efficace dans Orleansl’infrastructure de calcul distribuée et évolutive.
Flux asynchrone
Vous commencez par utiliser un fournisseur de flux pour obtenir un handle sur un flux. Vous pouvez considérer un fournisseur de flux comme une fabrique de flux qui permet aux implémenteurs de personnaliser le comportement et la sémantique des flux :
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Vous pouvez obtenir une référence au fournisseur de flux en appelant la Grain.GetStreamProvider méthode à l’intérieur d’un grain ou en appelant la GetStreamProvider méthode sur l’instance cliente.
Orleans.Streams.IAsyncStream<T> est un handle logique, fortement typé à un flux virtuel, similaire dans l’esprit à une Orleans référence de grain. Les appels à GetStreamProvider et GetStream sont purement locaux. Les arguments de GetStream sont un GUID et une chaîne supplémentaire appelée espace de noms de flux (qui peut être nulle). Ensemble, le GUID et la chaîne d’espace de noms comprennent l’identité de flux (similaire aux arguments pour IGrainFactory.GetGrain). Cette combinaison offre une flexibilité supplémentaire pour déterminer les identités de flux. Tout comme le grain 7 peut exister dans le PlayerGrain type et un autre grain 7 peut exister dans le ChatRoomGrain type, Stream 123 peut exister dans l’espace PlayerEventsStream de noms, et un autre flux 123 peut exister dans l’espace ChatRoomMessagesStream de noms.
Production et consommation
IAsyncStream<T> implémente les interfaces IAsyncObserver<T> et IAsyncObservable<T>. Cela permet à votre application d’utiliser le flux pour produire de nouveaux événements à l’aide IAsyncObserver<T> ou pour s’abonner et consommer des événements à l’aide IAsyncObservable<T>de .
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Pour produire des événements dans le flux, votre application appelle :
await stream.OnNextAsync<T>(event)
Pour vous abonner à un flux, votre application appelle :
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
L'argument à SubscribeAsync peut être soit un objet implémentant l'interface IAsyncObserver<T>, soit une combinaison de fonctions lambda pour gérer les événements entrants. D’autres options SubscribeAsync sont disponibles via la AsyncObservableExtensions classe.
SubscribeAsync retourne un StreamSubscriptionHandle<T>handle opaque utilisé pour se désabonner du flux (similaire à une version asynchrone de IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Il est important de noter que l’abonnement concerne un grain, pas pour une activation. Une fois que le code grain s’abonne au flux, cet abonnement dépasse la durée de cette activation et reste durable à jamais jusqu’à ce que le code grain (potentiellement dans une autre activation) se désabonne explicitement. Il s’agit du cœur de l’abstraction de flux virtuel : non seulement tous les flux existent toujours logiquement, mais un abonnement de flux est également durable et vit au-delà de l’activation physique particulière qui l’a créée.
Multiplicité
Un Orleans flux peut avoir plusieurs producteurs et plusieurs consommateurs. Un message publié par un producteur est remis à tous les consommateurs abonnés au flux avant la publication du message.
En outre, un consommateur peut s’abonner au même flux plusieurs fois. Chaque fois qu'il s'abonne, il obtient un StreamSubscriptionHandle<T> unique. Si un grain (ou client) s’abonne à X fois au même flux, il reçoit le même événement X fois, une fois pour chaque abonnement. Le consommateur peut également se désabonner d’un abonnement individuel. Vous trouverez tous les abonnements actuels en appelant :
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Récupération après échec
Si le producteur d’un flux meurt (ou si son grain est désactivé), il n’a rien à faire. La prochaine fois que ce grain veut produire davantage d’événements, il peut obtenir à nouveau le gestionnaire de flux et produire de nouveaux événements comme d’habitude.
La logique du consommateur est légèrement plus impliquée. Comme mentionné précédemment, une fois qu’un grain consommateur s’abonne à un flux, cet abonnement est valide jusqu’à ce que le grain se désabonne explicitement. Si le consommateur du flux meurt (ou si son grain est désactivé) et qu’un nouvel événement est généré sur le flux, le grain du consommateur se réactive automatiquement (tout comme tout grain normal Orleans s’active automatiquement lorsqu’un message lui est envoyé). La seule chose que le code grain doit faire maintenant est de fournir un IAsyncObserver<T> pour traiter les données. Le consommateur doit rattacher la logique de traitement dans le cadre de la méthode OnActivateAsync(). Pour ce faire, il peut appeler :
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Le consommateur utilise le handle précédent obtenu pendant l’abonnement initial pour « reprendre le traitement ». Notez que ResumeAsync simplement met à jour un abonnement existant avec la nouvelle instance de IAsyncObserver logique et ne modifie pas le fait que ce consommateur est déjà abonné à ce flux.
Comment le consommateur obtient-il l’ancien subscriptionHandle? Il existe deux options. Le consommateur a peut-être conservé le handle retourné à partir de l’opération d’origine SubscribeAsync et peut l’utiliser maintenant. Sinon, si le consommateur n’a pas l'identifiant, il peut demander à IAsyncStream<T> de lui fournir tous ses identifiants d’abonnement actifs en appelant :
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Le consommateur peut ensuite reprendre l’ensemble d’entre eux ou se désabonner de certains si vous le souhaitez.
Conseil
Si le grain du consommateur implémente directement l'interface IAsyncObserver<T> (public class MyGrain<T> : Grain, IAsyncObserver<T>), il ne devrait théoriquement pas avoir besoin de réattacher IAsyncObserver et donc n'aurait pas besoin d'appeler ResumeAsync. Le runtime de streaming doit déterminer automatiquement que le grain implémente déjà IAsyncObserver et appeler ces méthodes IAsyncObserver. Toutefois, le runtime de diffusion en continu ne prend actuellement pas en charge cela, et le code grain doit toujours appeler ResumeAsyncexplicitement, même si le grain implémente IAsyncObserver directement.
Abonnements explicites et implicites
Par défaut, un consommateur de flux doit s’abonner explicitement au flux. Cet abonnement est généralement déclenché par un message externe que le grain (ou le client) reçoit lui demandant de s’abonner. Par exemple, dans un service de conversation, lorsqu’un utilisateur rejoint une salle de conversation, son grain reçoit un JoinChatGroup message avec le nom de la conversation, ce qui entraîne l’abonnement de l’utilisateur à ce flux de conversation.
En outre, Orleans les flux prennent en charge les abonnements implicites. Dans ce modèle, le grain ne souscrit pas explicitement. Elle s'abonne automatiquement et implicitement en fonction de son identité de grain et d’un ImplicitStreamSubscriptionAttribute. La principale valeur des abonnements implicites réside dans leur capacité à permettre à l'activité de flux de déclencher automatiquement l'activation des grains, et par conséquent, celle des abonnements. Par exemple, à l'aide de flux SMS, si un grain voulait produire un flux et qu'un autre grain voulait le traiter, le producteur aurait besoin de l'identité du grain consommateur et devrait effectuer un appel au grain pour lui demander de s'abonner. Il peut alors commencer à envoyer des événements. Au lieu de cela, avec des abonnements implicites, le producteur peut simplement commencer à produire des événements dans un flux de données, et le grain du consommateur s’active et s’abonne automatiquement. Dans ce cas, le producteur n’a pas besoin de savoir qui lit les événements.
L’implémentation de grain MyGrainType peut déclarer un attribut [ImplicitStreamSubscription("MyStreamNamespace")]. Cela indique au runtime de streaming que lorsqu’un événement se produit sur un flux avec le GUID d’identité XXX et l’espace de noms "MyStreamNamespace", il doit être remis au grain avec l’identité XXX de type MyGrainType. Autrement dit, le runtime mappe le flux <XXX, MyStreamNamespace> au grain de consommateur <XXX, MyGrainType>.
La présence de ImplicitStreamSubscription dans le runtime entraîne l'abonnement automatique de ce grain au flux et la restitution des événements de flux. Toutefois, le code de grain doit toujours indiquer au runtime comment il souhaite traiter les événements. Essentiellement, il doit attacher IAsyncObserver. Par conséquent, lorsque le grain s’active, le code de grain à l’intérieur OnActivateAsync doit appeler :
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Écriture de la logique d’abonnement
Vous trouverez ci-dessous des instructions pour écrire une logique d’abonnement dans différents cas : abonnements explicites et implicites, flux réwindables et non réwindables. La principale différence entre les abonnements explicites et implicites est que pour les abonnements implicites, le grain a toujours exactement un abonnement implicite par espace de noms de flux. Il n’existe aucun moyen de créer plusieurs abonnements (aucune multiplicité des abonnements), aucun moyen de se désabonner, et la logique de grain n’a besoin que d’attacher la logique de traitement. Cela signifie également qu’il n’est jamais nécessaire de reprendre un abonnement implicite. En revanche, pour les abonnements explicites, vous devez reprendre l’abonnement ; sinon, l’abonnement entraîne à nouveau l’abonnement au grain plusieurs fois.
Abonnements implicites :
Pour les abonnements implicites, le grain doit toujours s’abonner pour attacher la logique de traitement. Vous pouvez le faire dans le grain consommateur en implémentant les interfaces IStreamSubscriptionObserver et IAsyncObserver<T>, autorisant ainsi le grain à s’activer séparément du processus de souscription. Pour s’abonner au flux, le grain crée un handle et appelle await handle.ResumeAsync(this) dans sa méthode OnSubscribed(...).
Pour traiter les messages, implémentez la IAsyncObserver<T>.OnNextAsync(...) méthode pour recevoir des données de flux et un jeton de séquence.
ResumeAsync La méthode peut également prendre un ensemble de délégués représentant les méthodes de l’interface IAsyncObserver<T> : onNextAsync, onErrorAsyncet onCompletedAsync.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Abonnements explicites :
Pour les abonnements explicites, un grain doit appeler SubscribeAsync pour s’abonner au flux. Cela crée un abonnement et y associe la logique de traitement. L’abonnement explicite existe jusqu’à ce que le grain se désabonne. Si un grain se désactive et se réactive, il est toujours abonné explicitement, mais aucune logique de traitement n'est associée. Dans ce cas, le grain doit rattacher la logique de traitement. Pour ce faire, le OnActivateAsync doit d'abord connaître ses abonnements en appelant IAsyncStream<T>.GetAllSubscriptionHandles(). Le grain doit exécuter ResumeAsync sur chaque référence pour laquelle il souhaite poursuivre le traitement ou UnsubscribeAsync sur toute référence avec laquelle il a terminé. Le grain peut également spécifier le StreamSequenceToken comme argument lors des appels de ResumeAsync, ce qui entraîne le début de la consommation de cet abonnement explicite à partir de ce jeton.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Ordre des flux et jetons de séquence
L’ordre de livraison des événements entre un producteur individuel et un consommateur dépend du fournisseur de flux.
Avec SMS, le producteur contrôle explicitement l’ordre des événements vus par le consommateur en contrôlant la façon dont ils les publient. Par défaut (si l’option SimpleMessageStreamProviderOptions.FireAndForgetDelivery du fournisseur SMS est false) et si le producteur attend chaque OnNextAsync appel, les événements arrivent dans l’ordre FIFO. Dans SMS, le producteur décide comment gérer les échecs de livraison indiqués par un échec Task retourné par l'appel OnNextAsync.
Les flux de files d’attente Azure ne garantissent pas l’ordre FIFO, car les files d’attente Azure sous-jacentes ne garantissent pas l’ordre dans les cas d’échec (bien qu’ils garantissent l’ordre FIFO dans les exécutions sans échec). Lorsqu’un producteur produit un événement dans une file d’attente Azure, si l’opération de file d’attente échoue, le producteur doit tenter une autre file d’attente et traiter ultérieurement les messages en double potentiels. Côté livraison, l'environnement d'exécution de streaming défile l'événement et tente de le livrer pour traitement aux consommateurs. Le runtime supprime l’événement de la file d’attente uniquement lors du traitement réussi. Si la remise ou le traitement échoue, l’événement n’est pas supprimé de la file d’attente et réapparaît automatiquement ultérieurement. Le runtime de streaming essaie de le redélivrer, ce qui pourrait casser l'ordre FIFO. Ce comportement correspond à la sémantique normale des files d’attente Azure.
Ordre défini par l’application : pour gérer les problèmes de classement ci-dessus, votre application peut éventuellement spécifier son classement. Effectuez cette opération à l’aide d’un StreamSequenceTokenobjet opaque IComparable utilisé pour classer les événements. Un producteur peut transmettre un StreamSequenceToken facultatif à l’appel OnNextAsync. Cela StreamSequenceToken est transmis au consommateur et est fourni avec l’événement. De cette façon, votre application peut raisonner et reconstruire son ordre indépendamment du runtime de diffusion en continu.
Flux rembobinables
Certains flux autorisent uniquement l’abonnement à partir du dernier point dans le temps, tandis que d’autres autorisent « revenir en temps ». Cette fonctionnalité dépend de la technologie de mise en file d’attente sous-jacente et du fournisseur de flux spécifique. Par exemple, les files d'attente Azure permettent uniquement de consommer les événements les plus récents mis en file d'attente, tandis qu'Event Hubs permet de relire les événements à partir d'un point arbitraire dans le temps (jusqu'à expiration). Les flux prenant en charge le retour dans le temps sont appelés flux rebouclables.
Le consommateur d’un flux rembobinable peut transmettre un StreamSequenceToken à l’appel SubscribeAsync. Le runtime transmet les événements à partir de ce StreamSequenceToken. Un jeton null signifie que le consommateur souhaite recevoir les événements à partir du plus récent.
La possibilité de rembobiner un flux est très utile dans les scénarios de récupération. Par exemple, considérez un grain qui s’abonne à un flux et contrôle régulièrement son état avec le dernier jeton de séquence. Lors de la récupération d’une défaillance, le grain peut s’abonner à nouveau au même flux à partir du dernier jeton de séquence enregistré, lui permettant de récupérer sans perdre les événements générés depuis le dernier point de contrôle.
Le fournisseur Event Hubs est réinitialisable. Vous trouverez son code sur GitHub : Orleans/Azure/Orleans.Streaming.EventHubs. Les fournisseurs SMS (désormais canal de diffusion) et File d’attente Azurene sont pas réinscriptibles.
Traitement sans état avec scale-out automatique
Par défaut, Orleans les cibles de streaming prennent en charge un grand nombre de flux relativement petits, chacun étant traité par un ou plusieurs grains avec état. Collectivement, le traitement de tous les flux est partitionné parmi de nombreux grains réguliers (avec état). Votre code d'application contrôle ce sharding en affectant des ID de flux et des ID de grain et en souscrivant explicitement. L’objectif est un traitement avec état partitionné.
Toutefois, il existe également un scénario intéressant de traitement sans état mis à l’échelle automatiquement. Dans ce scénario, une application a un petit nombre de flux (ou même un flux volumineux) et l’objectif est un traitement sans état. Un exemple est un flux global d’événements où le traitement implique le décodage de chaque événement et le transfert potentiellement vers d’autres flux pour un traitement avec état supplémentaire. Le traitement de flux avec montée en charge sans état peut être pris en charge dans Orleans via des grains grâce à StatelessWorkerAttribute.
État actuel du traitement automatiquement mis à l’échelle sans état : Cela n’est pas encore implémenté. La tentative d’abonnement à un flux à partir d’un StatelessWorker grain entraîne un comportement non défini.
Nous envisageons de prendre en charge cette option.
Grains et clients Orleans
Orleans envoie du travail en streaming uniformément entre les grains et les clients Orleans. Cela signifie que vous pouvez utiliser les mêmes API à l’intérieur d’un grain et dans un Orleans client pour produire et consommer des événements. Cela simplifie considérablement la logique de l’application, ce qui rend les API spéciales côté client comme les Grain Observers redondantes.
Streaming Pub-Sub entièrement managé et fiable
Pour suivre les abonnements de flux, Orleans utilise un composant runtime appelé Streaming Pub-Sub, qui sert de point de rendez-vous pour les consommateurs et les producteurs de flux. Pub-sub assure le suivi de tous les abonnements de flux, les conserve et met en relation les consommateurs de flux avec les producteurs de flux.
Les applications peuvent choisir où et comment stocker les données Pub-Sub. Le composant Pub-Sub lui-même est implémenté en tant que grains (appelés PubSubRendezvousGrain), qui utilisent la persistance déclarative d’Orleans.
PubSubRendezvousGrain utilise le fournisseur de stockage nommé PubSubStore. Comme avec n’importe quel grain, vous pouvez désigner une implémentation pour un fournisseur de stockage. Pour Streaming Pub-Sub, vous pouvez modifier l’implémentation du PubSubStore au moment de la construction du silo en utilisant le générateur d’hôte de silo.
Le code suivant configure Pub-Sub pour stocker son état dans les tables Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
De cette façon, les données Pub-Sub sont stockées durablement dans Azure Table. Pour le développement initial, vous pouvez également utiliser un stockage en mémoire. Outre Pub-Sub, le runtime de streaming Orleans remet les événements des producteurs aux consommateurs, gère toutes les ressources d’exécution allouées aux flux activement utilisés, et effectue de manière transparente le garbage collection des ressources d’exécution à partir des flux inutilisés.
Paramétrage
Pour utiliser des flux, vous devez activer les fournisseurs de flux via l’hôte de silo ou les générateurs de clients de cluster. Exemple de configuration d’un fournisseur de flux :
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");