Freigeben über


Orleans Schnellstart für Streaming

Dieses Handbuch zeigt Ihnen eine schnelle Möglichkeit zum Einrichten und Verwenden von Orleans Streams. Weitere Informationen zu den Details zu Streamingfeatures finden Sie in anderen Teilen dieser Dokumentation.

Erforderliche Konfigurationen

In diesem Leitfaden verwenden Sie einen speicherbasierten Stream, der Grain-Messaging verwendet, um Stream-Daten an Abonnenten zu senden. Sie verwenden den In-Memory-Speicheranbieter, um Listen von Abonnements zu speichern. Die Verwendung speicherbasierter Mechanismen für Streaming und Speicher ist nur für lokale Entwicklung und Tests vorgesehen, nicht für Produktionsumgebungen.

Auf dem Silo, wo silo ein ISiloBuilder ist, rufen Sie AddMemoryStreams an:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

Auf dem Clusterclient, wo client ein IClientBuilder ist, führen Sie AddMemoryStreams aus.

client.AddMemoryStreams("StreamProvider");

Verwenden Sie in diesem Leitfaden einen einfachen nachrichtenbasierten Datenstrom, der Grain-Messaging nutzt, um Daten an Abonnenten zu senden. Verwenden Sie den Speicheranbieter im Arbeitsspeicher, um Listen von Abonnements zu speichern; Dies ist keine kluge Wahl für echte Produktionsanwendungen.

Auf dem Silo, wo hostBuilder ein ISiloHostBuilder ist, rufen Sie AddSimpleMessageStreamProvider an:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

Auf dem Clusterclient, wo clientBuilder ein IClientBuilder ist, führen Sie AddSimpleMessageStreamProvider aus.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Hinweis

Nachrichten, die über den Einfachen Nachrichtenstream übermittelt werden, gelten standardmäßig als unveränderlich und können als Referenz an andere Grains übergeben werden. Um dieses Verhalten zu deaktivieren, konfigurieren Sie den SMS-Anbieter, um es zu deaktivieren SimpleMessageStreamProviderOptions.OptimizeForImmutableData.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Sie können Datenströme erstellen, Daten als Produzenten senden und Daten als Abonnenten empfangen.

Produzieren von Ereignissen

Es ist relativ einfach, Ereignisse für Datenströme zu erzeugen. Rufen Sie zunächst den Zugriff auf den in der Konfiguration definierten Datenstromanbieter ab ("StreamProvider"), und wählen Sie dann einen Datenstrom aus, und übertragen Sie daten an ihn.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Es ist relativ einfach, Ereignisse für Datenströme zu erzeugen. Rufen Sie zunächst den Zugriff auf den in der Konfiguration definierten Datenstromanbieter ab ("SMSProvider"), und wählen Sie dann einen Datenstrom aus, und übertragen Sie daten an ihn.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Wie Sie sehen können, verfügt der Datenstrom über eine GUID und einen Namespace. Dies erleichtert das Identifizieren eindeutiger Datenströme. Beispielsweise könnte der Namespace für einen Chatraum "Räume" sein, und die GUID könnte die GUID des Besitzers RoomGrain sein.

Verwenden Sie hier die GUID eines bekannten Chatrooms. Mithilfe der OnNextAsync-Methode des Datenstroms übergeben Sie Daten in den Datenstrom. Lassen Sie uns dies innerhalb eines Zeitgebers mithilfe von Zufallszahlen tun. Sie können auch jeden anderen Datentyp für den Datenstrom verwenden.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Abonnieren und Empfangen von Streamingdaten

Für den Empfang von Daten können Sie implizite und explizite Abonnements verwenden, die in expliziten und impliziten Abonnements ausführlicher beschrieben werden. In diesem Beispiel werden implizite Abonnements verwendet, die einfacher sind. Wenn ein Korntyp implizit einen Datenstrom abonnieren möchte, wird das Attribut [ImplicitStreamSubscription(namespace)]verwendet.

Definieren Sie für Ihren Fall folgendes ReceiverGrain :

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Immer wenn Daten an Datenströme im RANDOMDATA Namespace übertragen werden (wie im Timerbeispiel), erhält ein Grain des Typs ReceiverGrain mit demselben Guid wie dem Datenstrom die Nachricht. Selbst wenn aktuell keine Aktivierungen des Grains vorhanden sind, erstellt die Laufzeitumgebung automatisch eine neue und sendet die Nachricht an diese.

Damit dies funktioniert, schließen Sie den Abonnementprozess ab, indem Sie die Methode für den OnNextAsync Empfang von Daten festlegen. Dazu sollte ReceiverGrain etwas wie folgt in seinem OnActivateAsync aufrufen:

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Sie sind fertig! Jetzt ist die einzige Anforderung, dass etwas die Erzeugung des Erzeugerkorns auslöst. Anschließend registriert er den Timer und beginnt mit dem Senden von ganzzahligen Zufallszahlen an alle interessierten Parteien.

Auch hier werden viele Details übersprungen und nur eine allgemeine Übersicht bereitgestellt. Lesen Sie andere Teile dieses Handbuchs und anderer Ressourcen auf Rx, um ein gutes Verständnis dafür zu erhalten, was verfügbar ist und wie es funktioniert.

Die reaktive Programmierung kann ein leistungsstarker Ansatz sein, um viele Probleme zu lösen. Beispielsweise können Sie LINQ in einem Subscriber-Objekt verwenden, um Zahlen zu filtern und verschiedene interessante Operationen auszuführen.

Siehe auch

Orleans Stream-Programmierungs-APIs