Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Kanały emisji to specjalny typ mechanizmu emisji, który może służyć do wysyłania komunikatów do wszystkich subskrybentów. W przeciwieństwie do dostawców przesyłania strumieniowego kanały emisji nie są trwałe i nie przechowują komunikatów i nie zastępują trwałych strumieni. W przypadku kanałów emisji ziarna są niejawnie subskrybowane do kanału emisji i odbierać komunikaty emisji od producenta. Spowoduje to oddzielenie nadawcy i odbiorcy komunikatu, co jest przydatne w scenariuszach, w których nadawca i odbiorca nie są znane z wyprzedzeniem.
Aby użyć kanału emisji, należy skonfigurować Orleans strumienie, a następnie włączyć emisję w kanale przy użyciu konfiguracji silosu AddBroadcastChannel .
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Przykładowy scenariusz
Rozważmy scenariusz, w którym masz ziarno, które musi otrzymywać aktualizacje cen akcji od dostawcy cen akcji. Dostawca cen akcji to usługa w tle, która publikuje aktualizacje cen akcji w kanale emisji. Ziarna są niejawnie subskrybowane do kanału emisji i otrzymują zaktualizowane ceny akcji. Na poniższym diagramie przedstawiono scenariusz:
Na powyższym diagramie:
- Silos publikuje aktualizacje cen akcji do kanału emisji.
- Ziarno subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
- Klient korzysta z aktualizacji cen akcji z ziarna zapasów.
Kanał emisji rozdziela producenta i konsumenta aktualizacji cen akcji. Producent publikuje aktualizacje cen akcji w kanale emisji, a konsument subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
Definiowanie ziarna konsumenta
Aby korzystać z komunikatów kanału emisji, ziarno musi zaimplementować IOnBroadcastChannelSubscribed interfejs. Implementacja IBroadcastChannelSubscription.Attach będzie używać metody w celu dołączenia do kanału emisji. Metoda Attach przyjmuje parametr typu ogólnego dla typu komunikatu, który ma zostać odebrany. W poniższym przykładzie przedstawiono ziarno, które subskrybuje kanał emisji typu Stock:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
Powyższy kod:
- Ziarno
LiveStockGrainimplementujeIOnBroadcastChannelSubscribedinterfejs. - Metoda jest wywoływana
OnSubscribed, gdy ziarno subskrybuje kanał emisji. - Parametr
subscriptionsłuży do wywoływaniaAttachmetody w celu dołączenia do kanału emisji.- Metoda
OnStockUpdatedjest przekazywana jakoAttachwywołanie zwrotne uruchamiane po odebraniu komunikatuStock. - Metoda
OnErrorjest przekazywana jakoAttachwywołanie zwrotne uruchamiane po wystąpieniu błędu.
- Metoda
To przykładowe ziarno będzie zawierać najnowsze ceny akcji opublikowane w kanale emisji. Każdy klient, który prosi o to ziarno dla najnowszej ceny akcji, otrzyma najnowszą cenę z kanału emisji.
Publikowanie komunikatów w kanale emisji
Aby opublikować komunikaty w kanale emisji, musisz uzyskać odwołanie do kanału emisji. W tym celu należy pobrać element IBroadcastChannelProvider z obiektu IClusterClient. Za pomocą dostawcy możesz wywołać metodę IBroadcastChannelProvider.GetChannelWriter , aby uzyskać wystąpienie IBroadcastChannelWriter<T>klasy . Składnik zapisywania służy do publikowania komunikatów w kanale emisji. W poniższym przykładzie pokazano, jak publikować komunikaty w kanale emisji:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
Powyższy kod:
- Klasa
StockWorkerto usługa w tle, która publikuje komunikaty w kanale emisji. - Konstruktor przyjmuje
IStockClientparametr i IClusterClient jako parametry. - Z wystąpienia klienta klastra GetBroadcastChannelProvider metoda jest używana do pobierania dostawcy kanału emisji.
-
IStockClientUżywając klasyStockWorker, pobiera najnowszą cenę akcji dla symbolu akcji. - Co 15 sekund
StockWorkerklasa publikujeStockkomunikat w kanale emisji.
Publikowanie komunikatów w kanale emisji jest oddzielone od ziarna konsumenta. Ziarno konsumenta subskrybuje kanał emisji i odbiera komunikaty z kanału emisji. Producent mieszka w silosie i jest odpowiedzialny za publikowanie wiadomości w kanale emisji i nie wie nic o konsumpcji ziarna.