Compartilhar via


APIs de streaming do Orleans

Os aplicativos interagem com fluxos por meio de APIs muito semelhantes às conhecidas Rx (Extensões Reativas) no .NET. A principal diferença é que Orleans as extensões de fluxo são assíncronas para tornar o processamento mais eficiente na Orleansmalha de computação distribuída e escalável.

Fluxo assíncrono

Você começa usando um provedor de fluxo para obter um identificador para um fluxo. Você pode pensar em um provedor de fluxo como uma fábrica de fluxo que permite aos implementadores personalizar o comportamento e a semântica de fluxos:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Você pode obter uma referência ao provedor de fluxo chamando o método Grain.GetStreamProvider quando estiver dentro de um grain ou chamando o método GetStreamProvider na instância do cliente.

Orleans.Streams.IAsyncStream<T> é um identificador lógico e fortemente tipado para um fluxo virtual, semelhante em espírito a uma Orleans Referência de Grãos. Chamadas para GetStreamProvider e GetStream são puramente locais. Os argumentos para GetStream são um GUID e uma string adicional chamada espaço de nomes de fluxo (que pode ser nulo). Juntos, o GUID e a cadeia de caracteres de namespace compõem a identidade do fluxo (semelhante aos argumentos para IGrainFactory.GetGrain). Essa combinação fornece flexibilidade extra na determinação de identidades de fluxo. Assim como grão 7 pode existir dentro do PlayerGrain tipo e um grão 7 diferente pode existir dentro do ChatRoomGrain tipo, Stream 123 pode existir dentro do PlayerEventsStream namespace, e um stream 123 diferente pode existir dentro do ChatRoomMessagesStream namespace.

Produção e consumo

O IAsyncStream<T> implementa as interfaces IAsyncObserver<T> e IAsyncObservable<T>. Isso permite que seu aplicativo use o fluxo para produzir novos eventos usando IAsyncObserver<T> ou para assinar e consumir eventos usando IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Para produzir eventos no fluxo, seu aplicativo chama:

await stream.OnNextAsync<T>(event)

Para assinar um fluxo, seu aplicativo chama:

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

O argumento de SubscribeAsync pode ser um objeto que implementa a interface IAsyncObserver<T> ou uma combinação de funções lambda para processar eventos de entrada. Mais opções para SubscribeAsync estão disponíveis por meio da classe AsyncObservableExtensions. SubscribeAsync retorna um StreamSubscriptionHandle<T>, um identificador opaco usado para desinscrever-se do fluxo (semelhante a uma versão assíncrona de IDisposable).

await subscriptionHandle.UnsubscribeAsync()

É importante observar que a assinatura é para um grão, não para ativação. Depois que o código do grão assina o fluxo, essa assinatura ultrapassa a duração dessa ativação e permanece durável até que o código do grão (potencialmente em uma ativação diferente) cancele explicitamente a assinatura. Esse é o núcleo da abstração do fluxo virtual: não só todos os fluxos sempre existem logicamente, mas uma assinatura de fluxo também é durável e está além da ativação física específica que a criou.

Multiplicidade

Um Orleans fluxo pode ter vários produtores e vários consumidores. Uma mensagem publicada por um produtor é entregue a todos os consumidores inscritos no fluxo antes da publicação da mensagem.

Além disso, um consumidor pode assinar o mesmo fluxo várias vezes. Cada vez que ele se inscreve, ele recebe de volta um exclusivo StreamSubscriptionHandle<T>. Se um grão (ou cliente) assinar X vezes ao mesmo fluxo, ele receberá o mesmo evento X vezes, uma vez por assinatura. O consumidor também pode cancelar uma assinatura individual. Você pode encontrar todas as assinaturas atuais chamando:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Recuperando de falhas

Se o produtor de um fluxo morrer (ou seu grão for desativado), ele não precisará fazer nada. Na próxima vez que esse grão quiser produzir mais eventos, ele poderá obter o manipulador de fluxo novamente e produzir novos eventos como de costume.

A lógica do consumidor está um pouco mais envolvida. Conforme mencionado anteriormente, uma vez que um grão consumidor assina um fluxo, essa assinatura é válida até que o grão cancele a inscrição explicitamente. Se o consumidor do fluxo morrer (ou seu grão for desativado) e um novo evento for gerado no fluxo, o grão do consumidor reativará automaticamente (assim como qualquer grão regular Orleans é ativado automaticamente quando uma mensagem é enviada para ele). A única coisa que o código de grãos precisa fazer agora é fornecer um IAsyncObserver<T> para processar os dados. O consumidor precisa anexar novamente a lógica de processamento como parte do método OnActivateAsync(). Para fazer isso, ele pode chamar:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

O consumidor usa o identificador anterior obtido durante a assinatura inicial para "retomar o processamento". Observe que ResumeAsync apenas atualiza uma assinatura existente com a nova instância da IAsyncObserver lógica e não altera o fato de que esse consumidor já está inscrito nesse fluxo.

Como o consumidor obtém o antigo subscriptionHandle? Há duas opções. O consumidor pode ter persistido o identificador retornado da operação original SubscribeAsync e pode usá-lo agora. Alternativamente, se o consumidor não possuir o identificador, poderá pedi-lo ao IAsyncStream<T> para obter todos os identificadores de assinatura ativos, chamando:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Em seguida, o consumidor pode retomar todos eles ou cancelar a assinatura de alguns, se desejado.

Dica

Se o consumidor implementar a IAsyncObserver<T> interface diretamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), teoricamente não será necessário anexar IAsyncObserver novamente, portanto, não precisaria chamar ResumeAsync. O runtime de streaming deve descobrir automaticamente que o grão já implementa IAsyncObserver e invocar esses métodos IAsyncObserver. No entanto, o runtime de streaming atualmente não dá suporte a isso, e o código do grain ainda precisa chamar ResumeAsync explicitamente, mesmo que o grain implemente IAsyncObserver diretamente.

Assinaturas explícitas e implícitas

Por padrão, um consumidor de fluxo deve assinar explicitamente o fluxo. Essa assinatura geralmente é disparada por uma mensagem externa que o grão (ou cliente) recebe instruindo-a a assinar. Por exemplo, em um serviço de chat, quando um usuário entra em uma sala de chat, seu grão recebe uma JoinChatGroup mensagem com o nome do chat, fazendo com que o usuário assine esse fluxo de chat.

Além disso, Orleans os fluxos dão suporte a assinaturas implícitas. Nesse modelo, o grão não adere explicitamente. É inscrito automaticamente e implicitamente com base em sua identidade de grão e um ImplicitStreamSubscriptionAttribute. O principal valor das assinaturas implícitas é permitir que a atividade de fluxo dispare a ativação de grãos (e, portanto, a assinatura) automaticamente. Por exemplo, usando fluxos de SMS, se um grão quisesse produzir um fluxo e outro grão processá-lo, o produtor precisaria da identidade do grão do consumidor e faria uma chamada ao grão para que ele se inscreva. Só então ele poderia começar a enviar eventos. Em vez disso, com assinaturas implícitas, o produtor pode simplesmente começar a produzir eventos para um stream, e o componente consumidor ativa e se inscreve automaticamente. Nesse caso, o produtor não precisa saber quem está lendo os eventos.

A implementação de granularidade MyGrainType pode declarar um atributo [ImplicitStreamSubscription("MyStreamNamespace")]. Isso informa ao runtime de streaming que, quando um evento é gerado em um fluxo com GUID de identidade XXX e namespace "MyStreamNamespace", ele deve ser entregue ao grão com a identidade XXX do tipo MyGrainType. Isso significa que os mapas de runtime transmitem <XXX, MyStreamNamespace> para o grão do consumidor <XXX, MyGrainType>.

A presença de ImplicitStreamSubscription faz com que o runtime de streaming assine automaticamente essa instância ao streaming e entregue eventos de streaming a ela. No entanto, o código de granulação ainda precisa informar ao runtime como ele deseja que os eventos sejam processados. Essencialmente, ele precisa anexar o IAsyncObserver. Portanto, quando o grão é ativado, o código de grão dentro OnActivateAsync precisa chamar:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Elaboração da lógica de assinatura

Abaixo estão as diretrizes para escrever a lógica de assinatura para vários casos: assinaturas explícitas e implícitas, fluxos rebobináveis e não rebobináveis. A principal diferença entre assinaturas explícitas e implícitas é que, para assinaturas implícitas, o grain sempre tem exatamente uma assinatura implícita por namespace de fluxo. Não há como criar várias assinaturas (sem multiplicidade de assinatura), nenhuma maneira de cancelar a assinatura e a lógica de granulação só precisa anexar a lógica de processamento. Isso também significa que nunca é necessário retomar uma assinatura implícita. Por outro lado, para assinaturas explícitas, você precisa reiniciar a assinatura; caso contrário, assinar novamente resulta no componente sendo inscrito várias vezes.

Assinaturas implícitas:

Para assinaturas implícitas, a granularidade ainda precisa se inscrever para anexar a lógica de processamento. Você pode fazer isso no grão do consumidor implementando as interfaces IStreamSubscriptionObserver e IAsyncObserver<T>, permitindo que o grão seja ativado separadamente da assinatura. Para assinar o fluxo, a granularidade cria um identificador e chama await handle.ResumeAsync(this) em seu método OnSubscribed(...).

Para processar mensagens, implemente o IAsyncObserver<T>.OnNextAsync(...) método para receber dados de fluxo e um token de sequência. Como alternativa, o ResumeAsync método pode usar um conjunto de delegados que representam os métodos da IAsyncObserver<T> interface: onNextAsync, onErrorAsynce onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Assinaturas explícitas:

Para assinaturas explícitas, um grão deve chamar SubscribeAsync para assinar o fluxo. Isso cria uma assinatura e anexa a lógica de processamento. A assinatura explícita permanece até que o grão cancele a assinatura. Se um grão se desativa e se reativa, ele ainda estará explicitamente inscrito, mas nenhuma lógica de processamento estará associada. Nesse caso, o grão precisa reanexar a lógica de processamento. Para fazer isso, em seu OnActivateAsync, o grão primeiro precisa descobrir suas assinaturas chamando IAsyncStream<T>.GetAllSubscriptionHandles(). O grão deve executar ResumeAsync em cada identificador com o qual deseja continuar o processamento ou UnsubscribeAsync em qualquer identificador com o qual ele não precisa mais. O grão também pode, opcionalmente, especificar o StreamSequenceToken como um argumento nas chamadas de ResumeAsync, fazendo com que essa assinatura explícita comece a consumir desse token.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    }
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Ordem de fluxo e tokens de sequência

A ordem de entrega de eventos entre um produtor individual e um consumidor depende do provedor de streaming.

Com o SMS, o produtor controla explicitamente a ordem dos eventos vistos pelo consumidor controlando como eles os publicam. Por padrão (se a opção SimpleMessageStreamProviderOptions.FireAndForgetDelivery para o provedor de SMS for false) e caso o produtor espere por cada chamada OnNextAsync, os eventos chegarão na ordem FIFO. Em SMS, o produtor decide como lidar com falhas de entrega indicadas por um erro Task retornado pela chamada OnNextAsync.

Os fluxos de Fila do Azure não garantem a ordem FIFO porque as Filas do Azure subjacentes não asseguram a ordem em casos de falha, embora garantam a ordem FIFO quando não há falhas nas execuções. Quando um produtor produz um evento em uma Fila do Azure, se a operação de fila falhar, o produtor deverá tentar outra fila e, posteriormente, lidar com possíveis mensagens duplicadas. No lado da entrega, o Orleans runtime de streaming desativa o evento e tenta entregá-lo para processamento aos consumidores. O runtime exclui o evento da fila somente após o processamento bem-sucedido. Se a entrega ou o processamento falhar, o evento não será excluído da fila e reaparecerá automaticamente mais tarde. O runtime de streaming tenta entregá-lo novamente, potencialmente quebrando a ordem FIFO. Esse comportamento corresponde à semântica normal das Filas do Azure.

Ordem definida pelo aplicativo: para lidar com os problemas de ordenação acima, o aplicativo pode, opcionalmente, especificar sua ordenação. Obtenha isso usando um StreamSequenceTokenobjeto opaco IComparable usado para ordenar eventos. Um produtor pode transmitir um StreamSequenceToken opcional para a chamada OnNextAsync. Isso StreamSequenceToken é repassado ao consumidor e entregue juntamente com o evento. Dessa forma, seu aplicativo pode raciocinar e reconstruir sua ordem independentemente do runtime de streaming.

Fluxos rebobináveis

Alguns fluxos só permitem a assinatura começando do ponto mais recente no tempo, enquanto outros permitem "voltar no tempo". Essa funcionalidade depende da tecnologia de fila subjacente e do provedor de fluxo específico. Por exemplo, as Filas do Azure só permitem consumir os eventos enfileirados mais recentes, enquanto os Hubs de Eventos permitem reexecutar eventos a partir de um ponto arbitrário no tempo (até algum tempo de expiração). Os fluxos com suporte para voltar no tempo são chamados de fluxos rebobináveis.

.O consumidor de um fluxo rebobinável pode transmitir um StreamSequenceToken para a chamada SubscribeAsync. O runtime fornece eventos a partir disso StreamSequenceToken. Um token nulo significa que o consumidor deseja receber eventos a partir do mais recente.

A capacidade de rebobinar um fluxo é muito útil em cenários de recuperação. Por exemplo, considere um grão que assina um fluxo e verifica periodicamente seu estado junto com o token de sequência mais recente. Ao se recuperar de uma falha, o componente pode se reinscrever no mesmo fluxo com base no token de sequência do ponto de verificação mais recente, recuperando-se sem perder nenhum evento gerado a partir do último ponto de verificação.

O provedor de Hubs de Eventos é retrocedível. É possível encontrar o código dele no GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. O SMS (agora Canal de Transmissão) e o provedor de filas do Azurenão são rebobináveis.

Processamento sem estado expandido automaticamente

Por padrão, Orleans o Streaming tem como destino dar suporte a um grande número de fluxos relativamente pequenos, cada um processado por um ou mais grãos com estado. Coletivamente, o processamento de todos os fluxos é fragmentado entre muitos grãos regulares (com estado). O código do aplicativo controla essa fragmentação atribuindo IDs de fluxo e IDs de grãos e assinando explicitamente. O objetivo é o processamento fragmentado com estado.

No entanto, também há um cenário interessante de processamento sem estado com escalonamento automático. Nesse cenário, um aplicativo tem um pequeno número de fluxos (ou até mesmo um fluxo grande) e a meta é o processamento sem estado. Um exemplo é um fluxo global de eventos em que o processamento envolve a decodificação de cada evento e potencialmente encaminhá-lo para outros fluxos para processamento com estado adicional. O processamento de fluxo escalonado sem estado pode ser suportado em Orleans via StatelessWorkerAttribute grains.

Status atual do processamento sem estado, dimensionado automaticamente: Isso ainda não foi implementado. A tentativa de inscrever-se em um fluxo de um StatelessWorker grain resulta em um comportamento indefinido. Estamos considerando dar suporte a essa opção.

Granularidades e clientes do Orleans

Os streams do Orleans funcionam uniformemente em granularidades e em clientes do Orleans. Isso significa que você pode usar as mesmas APIs dentro de um grão e em um Orleans cliente para produzir e consumir eventos. Isso simplifica muito a lógica do aplicativo, tornando as APIs especiais do lado do cliente, como o Grain Observers, redundantes.

Pub-Sub de streaming totalmente gerenciado e confiável

Para acompanhar assinaturas de fluxo, Orleans usa um componente de runtime chamado Streaming Pub-Sub, que serve como um ponto de encontro para consumidores e produtores de fluxo. O pub-sub gerencia todas as assinaturas de fluxo, mantém-nas e faz a correspondência entre consumidores de fluxo e produtores de fluxo.

Os aplicativos podem escolher onde e como os dados do Pub-Sub são armazenados. O próprio componente Pub-Sub é implementado na forma de granularidades (chamadas PubSubRendezvousGrain), que usam a persistência declarativa do Orleans. O PubSubRendezvousGrain usa o provedor de armazenamento PubSubStore. Como acontece com qualquer grão, é possível designar uma implementação para um provedor de armazenamento. Para o Streaming Pub-Sub, você pode alterar a implementação do PubSubStore no momento da construção do silo usando o construtor de host do silo:

A seguir, configura Pub-Sub para armazenar seu estado em tabelas do Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

Dessa forma, os dados Pub-Sub são armazenados de forma durável na Tabela do Azure. Para o desenvolvimento inicial, também é possível usar o armazenamento de memória. Além do Pub-Sub, o Orleans Streaming Runtime entrega eventos de produtores a consumidores, gerencia todos os recursos de runtime alocados para streams usados ativamente e faz a coleta de lixo transparente dos recursos de runtime de streams não utilizados.

Configuração

Para usar fluxos, você precisa habilitar provedores de fluxo por meio do host de silo ou dos construtores de clientes de cluster. Configuração de exemplo do provedor de fluxo:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Confira também

Provedores de Streaming do Orleans