Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym przewodniku przedstawiono szybki sposób konfigurowania i używania Orleans usługi Streams. Aby dowiedzieć się więcej na temat szczegółów funkcji przesyłania strumieniowego, przeczytaj inne części tej dokumentacji.
Wymagane konfiguracje
W tym przewodniku użyjesz strumienia opartego na pamięci, wykorzystującego komunikaty ziarniste do wysyłania danych do subskrybentów. Dostawca pamięci operacyjnej służy do przechowywania list subskrypcji. Korzystanie z mechanizmów opartych na pamięci w celu przesyłania strumieniowego i przechowywania jest przeznaczone tylko do lokalnego rozwoju i testowania, a nie dla środowisk produkcyjnych.
Na silosie, gdzie silo jest ISiloBuilder, wywołaj AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Na kliencie klastra, gdzie client jest IClientBuilder, wywołaj AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
W tym przewodniku używamy prostego strumienia opartego na komunikatach, wykorzystując komunikaty ziarna do przesyłania danych strumienia do subskrybentów. Użyj dostawcy magazynu w pamięci do przechowywania list subskrypcji; nie jest to mądry wybór dla aplikacji produkcyjnych w rzeczywistym środowisku.
Na silosie, gdzie hostBuilder jest ISiloHostBuilder, wywołaj AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Na kliencie klastra, gdzie clientBuilder jest IClientBuilder, wywołaj AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Uwaga
Domyślnie komunikaty przekazywane przez Prosty Strumień Komunikatów są uznawane za niezmienne i mogą być przekazywane przez odwołanie do innych ziaren. Aby wyłączyć to zachowanie, skonfiguruj dostawcę programu SMS, aby wyłączyć usługę SimpleMessageStreamProviderOptions.OptimizeForImmutableData.
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Możesz tworzyć strumienie, wysyłać dane jako producenci i odbierać dane jako subskrybenci.
Tworzenie zdarzeń
Tworzenie zdarzeń dla strumieni jest stosunkowo łatwe. Najpierw uzyskaj dostęp do dostawcy strumienia zdefiniowanego wcześniej w konfiguracji ("StreamProvider"), a następnie wybierz strumień i wypchnij do niego dane.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Tworzenie zdarzeń dla strumieni jest stosunkowo łatwe. Najpierw uzyskaj dostęp do dostawcy strumienia zdefiniowanego wcześniej w konfiguracji ("SMSProvider"), a następnie wybierz strumień i wypchnij do niego dane.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Jak widać, strumień ma zarówno identyfikator GUID, jak i przestrzeń nazw. Ułatwia to identyfikowanie unikatowych strumieni. Na przykład przestrzeń nazw dla pokoju czatu może mieć wartość "Pokoje", a identyfikator GUID może być identyfikatorem GUID należącym do RoomGrain.
W tym miejscu użyj identyfikatora GUID znanego pokoju rozmów. Używając metody strumienia OnNextAsync, przesyłaj do niego dane. Zróbmy to wewnątrz czasomierza przy użyciu liczb losowych. Możesz również użyć dowolnego innego typu danych dla strumienia.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Subskrybowanie i odbieranie danych przesyłanych strumieniowo
W przypadku odbierania danych można użyć niejawnych i jawnych subskrypcji opisanych bardziej szczegółowo w artykule Jawne i niejawne subskrypcje. W tym przykładzie użyto niejawnych subskrypcji, które są łatwiejsze. Gdy typ ziarna chce niejawnie subskrybować strumień, używa atrybutu [ImplicitStreamSubscription(namespace)].
W twoim przypadku zdefiniuj następujący element ReceiverGrain :
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Za każdym razem, gdy dane są wypychane do strumieni w RANDOMDATA przestrzeni nazw (jak w przykładzie czasomierza), ziarno typu ReceiverGrain o tym samym Guid , co strumień odbiera komunikat. Nawet jeśli obecnie nie istnieją żadne aktywacje ziarna, środowisko uruchomieniowe automatycznie tworzy nowy i wysyła do niego komunikat.
Aby to zadziałało, ukończ proces subskrypcji, ustawiając metodę OnNextAsync odbierania danych. Aby to zrobić, ReceiverGrain powinien wywołać coś takiego jak OnActivateAsync.
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Wszystko jest gotowe! Teraz jedynym wymaganiem jest to, że coś wyzwala tworzenie ziarna producenta. Następnie rejestruje czasomierz i rozpoczyna wysyłanie losowych liczb całkowitych do wszystkich zainteresowanych stron.
Ponownie ten przewodnik pomija wiele szczegółów i zawiera tylko ogólne omówienie. Przeczytaj inne części tego podręcznika i innych zasobów na platformie Rx, aby uzyskać dobrą wiedzę na temat tego, co jest dostępne i jak działa.
Programowanie reaktywne może być zaawansowanym podejściem do rozwiązywania wielu problemów. Na przykład można użyć LINQ w subskrybencie do filtrowania numerów i wykonywania różnych interesujących operacji.