Udostępnij przez


Kanały emisji w programie Orleans

Kanały emisji to specjalny typ mechanizmu emisji, który może służyć do wysyłania komunikatów do wszystkich subskrybentów. W przeciwieństwie do dostawców przesyłania strumieniowego kanały emisji nie są trwałe i nie przechowują komunikatów i nie zastępują trwałych strumieni. W przypadku kanałów emisji ziarna są niejawnie subskrybowane do kanału emisji i odbierać komunikaty emisji od producenta. Spowoduje to oddzielenie nadawcy i odbiorcy komunikatu, co jest przydatne w scenariuszach, w których nadawca i odbiorca nie są znane z wyprzedzeniem.

Aby użyć kanału emisji, należy skonfigurować Orleans strumienie, a następnie włączyć emisję w kanale przy użyciu konfiguracji silosu AddBroadcastChannel .

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Przykładowy scenariusz

Rozważmy scenariusz, w którym masz ziarno, które musi otrzymywać aktualizacje cen akcji od dostawcy cen akcji. Dostawca cen akcji to usługa w tle, która publikuje aktualizacje cen akcji w kanale emisji. Ziarna są niejawnie subskrybowane do kanału emisji i otrzymują zaktualizowane ceny akcji. Na poniższym diagramie przedstawiono scenariusz:

Diagram cen zapasów przedstawiający silos, ziarna zapasów i zużywającego klienta w prostej architekturze kanału emisji.

Na powyższym diagramie:

  • Silos publikuje aktualizacje cen akcji do kanału emisji.
  • Ziarno subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
  • Klient korzysta z aktualizacji cen akcji z ziarna zapasów.

Kanał emisji rozdziela producenta i konsumenta aktualizacji cen akcji. Producent publikuje aktualizacje cen akcji w kanale emisji, a konsument subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.

Definiowanie ziarna konsumenta

Aby korzystać z komunikatów kanału emisji, ziarno musi zaimplementować IOnBroadcastChannelSubscribed interfejs. Implementacja IBroadcastChannelSubscription.Attach będzie używać metody w celu dołączenia do kanału emisji. Metoda Attach przyjmuje parametr typu ogólnego dla typu komunikatu, który ma zostać odebrany. W poniższym przykładzie przedstawiono ziarno, które subskrybuje kanał emisji typu Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

Powyższy kod:

  • Ziarno LiveStockGrain implementuje IOnBroadcastChannelSubscribed interfejs.
  • Metoda jest wywoływana OnSubscribed , gdy ziarno subskrybuje kanał emisji.
  • Parametr subscription służy do wywoływania Attach metody w celu dołączenia do kanału emisji.
    • Metoda OnStockUpdated jest przekazywana jako Attach wywołanie zwrotne uruchamiane po odebraniu komunikatu Stock .
    • Metoda OnError jest przekazywana jako Attach wywołanie zwrotne uruchamiane po wystąpieniu błędu.

To przykładowe ziarno będzie zawierać najnowsze ceny akcji opublikowane w kanale emisji. Każdy klient, który prosi o to ziarno dla najnowszej ceny akcji, otrzyma najnowszą cenę z kanału emisji.

Publikowanie komunikatów w kanale emisji

Aby opublikować komunikaty w kanale emisji, musisz uzyskać odwołanie do kanału emisji. W tym celu należy pobrać element IBroadcastChannelProvider z obiektu IClusterClient. Za pomocą dostawcy możesz wywołać metodę IBroadcastChannelProvider.GetChannelWriter , aby uzyskać wystąpienie IBroadcastChannelWriter<T>klasy . Składnik zapisywania służy do publikowania komunikatów w kanale emisji. W poniższym przykładzie pokazano, jak publikować komunikaty w kanale emisji:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

Powyższy kod:

  • Klasa StockWorker to usługa w tle, która publikuje komunikaty w kanale emisji.
  • Konstruktor przyjmuje IStockClient parametr i IClusterClient jako parametry.
  • Z wystąpienia klienta klastra GetBroadcastChannelProvider metoda jest używana do pobierania dostawcy kanału emisji.
  • IStockClientUżywając klasy StockWorker , pobiera najnowszą cenę akcji dla symbolu akcji.
  • Co 15 sekund StockWorker klasa publikuje Stock komunikat w kanale emisji.

Publikowanie komunikatów w kanale emisji jest oddzielone od ziarna konsumenta. Ziarno konsumenta subskrybuje kanał emisji i odbiera komunikaty z kanału emisji. Producent mieszka w silosie i jest odpowiedzialny za publikowanie wiadomości w kanale emisji i nie wie nic o konsumpcji ziarna.

Zobacz też