Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Aplikacje współdziałają ze strumieniami za pośrednictwem interfejsów API bardzo podobnych do dobrze znanych rozszerzeń reaktywnych (Rx) na platformie .NET. Główną różnicą jest to, że Orleans rozszerzenia strumienia są asynchroniczne w celu zwiększenia wydajności przetwarzania w Orleansrozproszonej i skalowalnej sieci szkieletowej obliczeniowej.
Strumień asynchroniczny
Zacznij od użycia dostawcy strumienia , aby uzyskać dojście do strumienia. Dostawcę strumienia można traktować jako fabrykę strumieni, która umożliwia implementatorom dostosowywanie zachowania strumieni i semantyki:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Odwołanie do dostawcy strumienia można uzyskać, wywołując metodę Grain.GetStreamProvider wewnątrz ziarna lub wywołując metodę GetStreamProvider w instancji klienta.
Orleans.Streams.IAsyncStream<T> jest logicznym, silnie typizowanym uchwytem do strumienia wirtualnego, podobnie jak odniesienie do ziaren w Orleans. Wywołania do GetStreamProvider i GetStream są wyłącznie lokalne. Argumenty dla GetStream są identyfikatorem GUID i dodatkowym ciągiem nazywanym przestrzenią nazw strumienia (który może być pusty). Razem identyfikator GUID i ciąg przestrzeni nazw tworzą tożsamość strumienia (podobnie jak argumenty dla elementu IGrainFactory.GetGrain). Ta kombinacja zapewnia dodatkową elastyczność podczas określania tożsamości strumienia. Podobnie jak ziarno 7 może istnieć w ramach typu PlayerGrain, a inne ziarno 7 w ramach typu ChatRoomGrain, tak samo strumień 123 może istnieć w przestrzeni nazw PlayerEventsStream, a inny strumień 123 w przestrzeni nazw ChatRoomMessagesStream.
Tworzenie i używanie
IAsyncStream<T> implementuje zarówno interfejsy , jak IAsyncObserver<T> i IAsyncObservable<T> . Dzięki temu aplikacja może używać strumienia do tworzenia nowych zdarzeń przy użyciu IAsyncObserver<T> lub subskrybowania i konsumowania zdarzeń przy użyciu IAsyncObservable<T>.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Aby wygenerować zdarzenia w strumieniu, aplikacja wywołuje:
await stream.OnNextAsync<T>(event)
Aby zasubskrybować strumień, aplikacja wywołuje:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Argumentem do SubscribeAsync może być obiekt implementujący interfejs IAsyncObserver<T> lub kombinacja funkcji lambda do przetwarzania zdarzeń przychodzących. Więcej opcji dla SubscribeAsync dostępnych jest za pośrednictwem klasy AsyncObservableExtensions.
SubscribeAsync zwraca StreamSubscriptionHandle<T>, nieprzezroczysty uchwyt używany do anulowania subskrypcji strumienia (podobnie jak asynchroniczna wersja IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Należy pamiętać, że subskrypcja dotyczy ziarna, a nie aktywacji. Gdy kod ziarna subskrybuje strumień, ta subskrypcja przekracza czas trwania tej aktywacji i pozostaje trwała na zawsze, dopóki kod ziarna (potencjalnie w innej aktywacji) jawnie odsubskrybuje strumień. Jest to rdzeń abstrakcji strumienia wirtualnego: nie tylko wszystkie strumienie istnieją zawsze logicznie, ale subskrypcja strumienia jest również trwała i przetrwa nawet po zakończeniu konkretnej aktywacji fizycznej, która ją utworzyła.
Kardynalność
Strumień Orleans może mieć wielu producentów i wielu odbiorców. Wiadomość opublikowana przez producenta jest dostarczana wszystkim konsumentom zasubskrybowanym strumieniem przed opublikowaniem wiadomości.
Ponadto użytkownik może wielokrotnie subskrybować ten sam strumień. Za każdym razem, gdy subskrybuje, jest zwracany unikatowy element StreamSubscriptionHandle<T>. Jeśli ziarno (lub klient) subskrybuje X razy ten sam strumień, otrzymuje to samo zdarzenie X razy, po jednym razie dla każdej subskrypcji. Użytkownik może również anulować subskrypcję indywidualną. Można znaleźć wszystkie bieżące subskrypcje, dzwoniąc:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Odzyskiwanie po awariach
Jeśli producent strumienia umiera (lub jego ziarno jest dezaktywowane), nie musi nic robić. Następnym razem, gdy to ziarno będzie chciało wygenerować więcej zdarzeń, może ponownie uzyskać uchwyt strumienia i utworzyć nowe zdarzenia jak zwykle.
Logika konsumentów jest nieco bardziej złożona. Jak wspomniano wcześniej, gdy konsumenckie ziarno subskrybuje strumień, subskrypcja pozostaje ważna, aż do momentu, gdy ziarno nie anuluje jej jawnie. Jeśli konsument strumienia umiera (lub jego ziarno jest dezaktywowane) i nowe zdarzenie jest generowane na strumieniu, ziarno konsumenta automatycznie reaguje (podobnie jak każde zwykłe Orleans ziarno automatycznie aktywuje się po wysłaniu do niego komunikatu). Jedyną rzeczą, jaką musi teraz wykonać kod ziarna, jest dostarczenie elementu IAsyncObserver<T> do przetwarzania danych. Użytkownik musi ponownie dołączyć logikę przetwarzania w ramach OnActivateAsync() metody . Aby to zrobić, może użyć:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Użytkownik używa poprzedniego dojścia uzyskanego podczas początkowej subskrypcji do "wznowienia przetwarzania". Należy pamiętać, że ResumeAsync tylko aktualizuje istniejącą subskrypcję przy użyciu nowego wystąpienia IAsyncObserver logiki i nie zmienia faktu, że ten użytkownik jest już subskrybowany do tego strumienia.
Jak konsument otrzymuje stary subscriptionHandle? Dostępne są dwie opcje. Konsument mógł utrwalić uchwyt zwrócony z oryginalnej SubscribeAsync operacji i może go teraz użyć. Alternatywnie, jeśli użytkownik nie ma identyfikatora, może poprosić IAsyncStream<T> o wszystkie aktywne identyfikatory subskrypcji, wywołując:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Użytkownik może następnie wznowić wszystkie z nich lub anulować subskrypcję niektórych w razie potrzeby.
Napiwek
Jeśli ziarno konsumenta bezpośrednio implementuje IAsyncObserver<T> interfejs (public class MyGrain<T> : Grain, IAsyncObserver<T>), teoretycznie nie powinno wymagać ponownego połączenia z IAsyncObserver, a tym samym wywołanie ResumeAsync nie będzie konieczne. Środowisko uruchomieniowe przesyłania strumieniowego powinno automatycznie ustalić, że ziarno już implementuje IAsyncObserver i wywołuje te IAsyncObserver metody. Jednak środowisko uruchomieniowe przesyłania strumieniowego obecnie tego nie obsługuje, a kod ziarna nadal musi jawnie wywołać ResumeAsync, nawet jeśli ziarno bezpośrednio implementuje IAsyncObserver.
Jawne i niejawne subskrypcje
Domyślnie odbiorca strumienia musi jawnie subskrybować strumień. Ta subskrypcja jest zwykle wyzwalana przez zewnętrzną wiadomość odbieraną przez ziarno (lub klienta) z poleceniem subskrypcji. Na przykład w usłudze czatu, gdy użytkownik dołącza do pokoju rozmów, jego ziarno otrzymuje JoinChatGroup wiadomość z nazwą czatu, co powoduje, że ziarno użytkownika subskrybuje ten strumień czatu.
Orleans Ponadto strumienie obsługują niejawne subskrypcje. W tym modelu ziarno nie subskrybuje jawnie. Jest to subskrybowane automatycznie i niejawnie na podstawie tożsamości ziarna i ImplicitStreamSubscriptionAttribute. Główną wartością niejawnych subskrypcji jest umożliwienie automatycznego wyzwalania aktywności strumienia, co prowadzi do aktywacji ziarna (a tym samym subskrypcji). Na przykład, używając strumieni SMS, jeśli jedno ziarno chciało utworzyć strumień, a inne ziarno go przetwarzać, producent potrzebowałby tożsamości ziarna konsumenta i musiałby skontaktować się z nim, aby poinformować o subskrypcji. Dopiero wtedy zaczął wysyłać zdarzenia. Zamiast tego, w przypadku niejawnych subskrypcji, producent może po prostu rozpocząć produkcję zdarzeń do strumienia, a moduł konsumenta automatycznie się aktywuje i subskrybuje. W takim przypadku producent nie musi wiedzieć, kto czyta wydarzenia.
Implementacja MyGrainType ziarna może zadeklarować atrybut [ImplicitStreamSubscription("MyStreamNamespace")]. Informuje to środowisko uruchomieniowe przesyłania strumieniowego, że po wygenerowaniu zdarzenia w strumieniu z identyfikatorem GUID XXX i przestrzeni nazw "MyStreamNamespace", powinno ono zostać dostarczone do ziarna o tożsamości XXX typu MyGrainType. Oznacza to, że środowisko uruchomieniowe mapuje strumień <XXX, MyStreamNamespace> na ziarno <XXX, MyGrainType>konsumenta .
Obecność ImplicitStreamSubscription powoduje, że środowisko uruchomieniowe przesyłania strumieniowego automatycznie subskrybuje ten obiekt do strumienia i dostarcza do niego zdarzenia ze strumienia. Jednak kod ziarna nadal musi poinformować środowisko uruchomieniowe, jak chce, aby zdarzenia były przetwarzane. Zasadniczo musi dołączyć element IAsyncObserver. W związku z tym po aktywowaniu ziarna kod ziarna wewnątrz OnActivateAsync musi wywołać:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Pisanie logiki subskrypcji
Poniżej przedstawiono wskazówki dotyczące pisania logiki subskrypcji w różnych przypadkach: jawne i niejawne subskrypcje, strumienie z możliwością przewijania i bez możliwości przewijania. Główną różnicą między jawnymi i niejawnymi subskrypcjami jest to, że w przypadku niejawnych subskrypcji ziarno zawsze ma dokładnie jedną niejawną subskrypcję na przestrzeń nazw strumienia. Nie ma możliwości utworzenia wielu subskrypcji (brak wielości subskrypcji), nie ma możliwości anulowania subskrypcji, a logika ziarna musi tylko dołączyć logikę przetwarzania. Oznacza to również, że nigdy nie trzeba wznowić niejawnej subskrypcji. Z drugiej strony w przypadku jawnych subskrypcji należy wznowić subskrypcję; w przeciwnym razie ponowne subskrybowanie powoduje wielokrotne subskrybowanie ziarna.
Niejawne subskrypcje:
W przypadku niejawnych subskrypcji ziarno nadal musi subskrybować, aby dołączyć logikę przetwarzania. Można to zrobić w ziarnie konsumenta, implementując interfejsy IStreamSubscriptionObserver i IAsyncObserver<T>, co pozwala na aktywację ziarna niezależnie od subskrypcji. Aby zasubskrybować strumień, ziarno tworzy uchwyt i wywołuje await handle.ResumeAsync(this) metodę OnSubscribed(...) .
Aby przetwarzać komunikaty, zaimplementuj metodę IAsyncObserver<T>.OnNextAsync(...) do odbierania danych strumieniowych oraz tokenu sekwencji. Alternatywnie ResumeAsync metoda może przyjąć zestaw delegatów reprezentujących metody interfejsu IAsyncObserver<T> : onNextAsync, onErrorAsynci onCompletedAsync.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Jawne subskrypcje:
W przypadku jawnych subskrypcji ziarno musi wywoływać SubscribeAsync , aby subskrybować strumień. Spowoduje to utworzenie subskrypcji i dołączenie logiki przetwarzania. Subskrypcja istnieje do momentu, gdy ziarno ją anuluje. Jeśli ziarno zostaje dezaktywowane i ponownie aktywowane, nadal jest wyraźnie subskrybowane, ale żadna logika przetwarzania nie jest powiązana. W takim przypadku ziarno musi ponownie dołączyć logikę przetwarzania. Aby to zrobić, w swoim OnActivateAsync ziarno najpierw musi znaleźć swoje subskrypcje, wywołując IAsyncStream<T>.GetAllSubscriptionHandles(). Ziarno musi wykonać ResumeAsync na każdym uchwycie, który chce kontynuować przetwarzanie lub UnsubscribeAsync na jakichkolwiek uchwytach, z którymi zakończyło pracę. Ziarno może również opcjonalnie określić StreamSequenceToken jako argument wywołań ResumeAsync , powodując, że ta jawna subskrypcja zacznie korzystać z tego tokenu.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Przesyłanie strumieniowe tokenów kolejności i sekwencji
Kolejność dostarczania zdarzeń pomiędzy indywidualnym producentem a konsumentem zależy od dostawcy strumienia.
Za pomocą programu SMS producent jawnie kontroluje kolejność zdarzeń widocznych przez konsumenta, kontrolując sposób ich publikowania. Domyślnie (jeśli opcja dostawcy SMS to SimpleMessageStreamProviderOptions.FireAndForgetDelivery) i jeśli producent czeka na każde wywołanie false, zdarzenia docierają w kolejności FIFO. W wiadomości SMS producent decyduje, jak obsługiwać błędy dostarczania wskazywane przez przerwane Task, które zostało zwrócone przez wywołanie OnNextAsync.
Strumienie kolejek platformy Azure nie zapewniają kolejności FIFO, ponieważ bazowe kolejki Azure nie zapewniają kolejności w sytuacjach awaryjnych (chociaż w przypadku braku awarii zapewniają kolejność FIFO). Gdy producent wstawia zdarzenie do kolejki Azure, jeśli operacja z kolejką się nie powiedzie, producent musi spróbować ponownie w innej kolejce, a później poradzić sobie z potencjalnie zduplikowanymi komunikatami. Po stronie Orleans dostarczania środowisko uruchomieniowe przesyłania strumieniowego usuwa zdarzenie i próbuje dostarczyć je do przetwarzania użytkownikom. Środowisko uruchomieniowe usuwa zdarzenie z kolejki tylko po pomyślnym przetworzeniu. Jeśli dostarczanie lub przetwarzanie zakończy się niepowodzeniem, zdarzenie nie zostanie usunięte z kolejki i zostanie automatycznie ponownie wyświetlone później. Środowisko uruchomieniowe przesyłania strumieniowego próbuje dostarczyć go ponownie, co może zakłócić kolejność FIFO. To zachowanie jest zgodne z normalną semantyką kolejek Azure.
Kolejność zdefiniowana przez aplikację: aby obsłużyć powyższe problemy z kolejnością, aplikacja może opcjonalnie określić jego kolejność. Wykonaj to przy użyciu StreamSequenceToken, nieprzezroczystego obiektu IComparable używanego do zamawiania zdarzeń. Producent może przekazać opcjonalny element StreamSequenceToken do wywołania OnNextAsync . To StreamSequenceToken jest przekazywane użytkownikowi i dostarczane wraz ze zdarzeniem. Dzięki temu aplikacja może wnioskować i rekonstruować jego kolejność niezależnie od systemu przesyłania strumieniowego.
Strumienie z możliwością przewijania
Niektóre strumienie zezwalają tylko na subskrybowanie od ostatniego punktu w czasie, podczas gdy inne zezwalają na "powrót w czasie". Ta funkcja zależy od podstawowej technologii kolejkowania i konkretnego dostawcy strumienia. Na przykład usługa Azure Queues zezwala tylko na odbieranie najnowszych zdarzeń w kolejce, podczas gdy usługa Event Hubs umożliwia ponowne odtwarzanie zdarzeń od dowolnego punktu w czasie (do momentu ich wygaśnięcia). Strumienie obsługujące powrót w czasie są nazywane strumieniami z możliwością przewijania.
Użytkownik strumienia z możliwością przewijania może przekazać StreamSequenceToken element do wywołania SubscribeAsync . Środowisko uruchomieniowe dostarcza zdarzenia od momentu StreamSequenceToken. Token o wartości null oznacza, że odbiorca chce odbierać zdarzenia rozpoczynające się od najnowszej.
Możliwość przewijania strumienia jest bardzo przydatna w scenariuszach odzyskiwania. Rozważmy na przykład proces, który subskrybuje strumień i okresowo zapisuje punkty kontrolne swojego stanu wraz z najnowszym tokenem sekwencji. Podczas odzyskiwania po awarii ziarno może ponownie subskrybować ten sam strumień z najnowszego tokenu sekwencji punktów kontrolnych, odzyskując bez utraty zdarzeń wygenerowanych od ostatniego punktu kontrolnego.
Dostawca usługi Event Hubs można przewijać. Kod można znaleźć w witrynie GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Dostawcy wiadomości SMS (teraz kanał emisji) i usługi Azure Queuenie mogą się przewijać.
Przetwarzanie bezstanowe automatycznie skalowane w poziomie
Domyślnie Orleans obiekty docelowe przesyłania strumieniowego obsługują dużą liczbę stosunkowo małych strumieni, z których każdy jest przetwarzany przez co najmniej jedno ziarno stanowe. Łącznie przetwarzanie wszystkich strumieni jest rozdzielone na wiele fragmentów wśród wielu ziaren zwykłych (stanowych). Kod aplikacji steruje tym fragmentowaniem poprzez przypisywanie identyfikatorów strumienia i ziarna oraz poprzez jawne subskrybowanie. Celem jest przetwarzanie stanowe podzielone na fragmenty.
Jednak istnieje również ciekawy scenariusz automatycznego skalowania bezstanowego przetwarzania. W tym scenariuszu aplikacja ma niewielką liczbę strumieni (a nawet jednego dużego strumienia), a celem jest przetwarzanie bezstanowe. Przykładem jest globalny strumień zdarzeń, w których przetwarzanie obejmuje dekodowanie każdego zdarzenia i potencjalnie przekazywanie go do innych strumieni w celu dalszego przetwarzania stanowego. Bezstanowe przetwarzanie strumieni skalowane poziomo może być obsługiwane w Orleans za pomocą ziaren z StatelessWorkerAttribute.
Bieżący stan przetwarzania bezstanowego z automatycznym skalowaniem: Nie zostało to jeszcze zaimplementowane. Próba subskrybowania strumienia z ziarna StatelessWorker powoduje nieokreślone zachowanie.
Rozważamy wspieranie tej opcji.
Ziarna i Orleans klienci
Orleans strumienie działają równomiernie między ziarnami i Orleans klientami. Oznacza to, że można używać tych samych API wewnątrz ziarna i w Orleans kliencie do tworzenia i przetwarzania zdarzeń. Znacznie upraszcza to logikę aplikacji, sprawiając, że specjalne interfejsy API po stronie klienta, takie jak Grain Observers, stają się zbędne.
W pełni zarządzane i niezawodne przesyłanie strumieniowe pub-sub
Aby śledzić subskrypcje strumienia, Orleans korzysta z komponentu środowiska wykonawczego o nazwie Streaming Pub-Sub, który służy jako punkt zbiorczy dla odbiorców i dostawców strumieni. Pub-sub śledzi wszystkie subskrypcje strumienia, utrwala je i dopasowuje odbiorców strumienia do producentów strumieni.
Aplikacje mogą wybierać lokalizację i sposób przechowywania danych pub-sub. Sam składnik Pub-Sub jest implementowany jako ziarna (o nazwie PubSubRendezvousGrain), które używają Orleans trwałości deklaratywnej.
PubSubRendezvousGrain używa dostawcy magazynu o nazwie PubSubStore. Podobnie jak w przypadku dowolnego ziarna, można wyznaczyć implementację dla dostawcy magazynu. Dla Streaming Pub-Sub, można zmienić implementację PubSubStore podczas budowy silosu, używając konstruktora hosta silosu.
Poniżej przedstawiono konfigurację Pub-Sub przechowywania stanu w tabelach platformy Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Dzięki temu dane Pub-Sub są trwale przechowywane w tabeli platformy Azure. Na potrzeby początkowego programowania można również użyć magazynu pamięci. Oprócz pub-sub środowisko Orleans uruchomieniowe przesyłania strumieniowego dostarcza zdarzenia od producentów do konsumentów, zarządza wszystkimi zasobami środowiska uruchomieniowego przydzielonymi do aktywnie używanych strumieni i przezroczystie zbiera zasoby środowiska uruchomieniowego z nieużywanych strumieni.
Konfigurowanie
Aby używać strumieni, należy włączyć dostawców strumieni za pośrednictwem konstruktorów hosta silosu lub klienta klastra. Przykładowa konfiguracja dostawcy strumienia:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");