Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Os aplicativos interagem com fluxos por meio de APIs muito semelhantes às conhecidas extensões reativas (Rx) no .NET. A principal diferença é que as extensões Orleans de fluxo são assíncronas para tornar o processamento mais eficiente na malha distribuída e escalável de computação Orleans.
Fluxo assíncrono
Você começa usando um provedor de fluxo para obter um identificador para um fluxo. Você pode pensar em um provedor de fluxo como uma fábrica de fluxo que permite que os implementadores personalizem o comportamento e a semântica dos fluxos:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Você pode obter uma referência ao fornecedor de streaming, chamando o método Grain.GetStreamProvider quando estiver dentro de um grain ou chamando o método GetStreamProvider na instância do cliente.
Orleans.Streams.IAsyncStream<T> é uma referência lógica, fortemente tipificada para um fluxo virtual, semelhante em espírito a uma Orleans Grain Reference. As chamadas para GetStreamProvider e GetStream são puramente locais. Os argumentos para GetStream são um GUID e uma cadeia de caracteres adicional chamada namespace de fluxo (que pode ser nulo). Juntos, o GUID e a cadeia de caracteres do namespace compõem a identidade do fluxo (semelhante aos argumentos para IGrainFactory.GetGrain). Essa combinação fornece flexibilidade extra na determinação de identidades de fluxo. Assim como o grão 7 pode existir dentro do tipo PlayerGrain e um grão 7 diferente pode existir dentro do tipo ChatRoomGrain, o fluxo 123 pode existir dentro do espaço de nomes PlayerEventsStream, e um fluxo 123 diferente pode existir dentro do espaço de nomes ChatRoomMessagesStream.
Produzir e consumir
IAsyncStream<T> implementa as IAsyncObserver<T> interfaces e IAsyncObservable<T> . Isso permite que seu aplicativo use o fluxo para produzir novos eventos usando IAsyncObserver<T> ou para se inscrever e consumir eventos usando IAsyncObservable<T>o .
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Para produzir eventos no fluxo, seu aplicativo chama:
await stream.OnNextAsync<T>(event)
Para subscrever um fluxo, a sua aplicação chama:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
O argumento para SubscribeAsync pode ser um objeto que implementa a interface IAsyncObserver<T> ou uma combinação de funções lambda para processar eventos entrantes. Mais opções para SubscribeAsync estão disponíveis através da classe AsyncObservableExtensions.
SubscribeAsync Retorna um StreamSubscriptionHandle<T>, um identificador opaco usado para cancelar a assinatura do fluxo (semelhante a uma versão assíncrona do IDisposable).
await subscriptionHandle.UnsubscribeAsync()
É importante notar que a assinatura é para um grão, não para ativação. Uma vez que o código grain se inscreve no stream, essa assinatura ultrapassa a vida útil dessa ativação e permanece durável para sempre até que o código grain (potencialmente em uma ativação diferente) cancele explicitamente a inscrição. Este é o núcleo da abstração de fluxo virtual: não apenas todos os fluxos sempre existem no plano lógico, mas uma assinatura de fluxo também é permanente e continua a existir além da ativação física específica que a criou.
Cardinalidade
Um Orleans fluxo pode ter vários produtores e vários consumidores. Uma mensagem publicada por um produtor é entregue a todos os consumidores inscritos no fluxo antes de a mensagem ser publicada.
Além disso, um consumidor pode assinar o mesmo fluxo várias vezes. Cada vez que inscreve-se, recebe de volta um único StreamSubscriptionHandle<T>. Se um grão (ou cliente) assina X vezes para o mesmo fluxo, ele recebe o mesmo evento X vezes, uma vez para cada assinatura. O consumidor também pode cancelar a assinatura de uma assinatura individual. Você pode encontrar todas as assinaturas atuais ligando para:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Recuperando-se de falhas
Se o produtor de um fluxo morre (ou seu grão é desativado), ele não precisa fazer nada. Da próxima vez que este grão quiser produzir mais eventos, ele pode obter novamente o identificador do fluxo e produzir novos eventos como de costume.
A lógica do consumidor está um pouco mais envolvida. Como mencionado anteriormente, uma vez que um grão de consumidor se inscreve em um fluxo, essa assinatura é válida até que o grão cancele explicitamente a inscrição. Se o consumidor do fluxo morre (ou seu grão é desativado) e um novo evento é gerado no fluxo, o grão do consumidor é reativado automaticamente (assim como qualquer grão regular Orleans é ativado automaticamente quando uma mensagem é enviada a ele). A única coisa que o código de grãos precisa fazer agora é fornecer um IAsyncObserver<T> para processar os dados. O consumidor precisa reanexar a lógica de processamento como parte do OnActivateAsync() método. Para fazer isso, ele pode chamar:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
O consumidor usa o identificador anterior obtido durante a assinatura inicial para "retomar o processamento". Observe que ResumeAsync apenas atualiza uma assinatura existente com a nova instância da lógica de IAsyncObserver e não altera o fato de que esse consumidor já está inscrito nesse fluxo.
Como é que o consumidor obtém os velhos subscriptionHandle? Existem duas opções. O consumidor poderá ter mantido o identificador retornado da operação original SubscribeAsync e pode utilizá-lo agora. Como alternativa, se o consumidor não tiver o identificador, ele pode solicitar todos IAsyncStream<T> os seus identificadores de assinatura ativos ligando para:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
O consumidor pode, então, retomar todos eles ou cancelar a assinatura de alguns, se desejar.
Gorjeta
Se o componente do consumidor implementar a IAsyncObserver<T> interface diretamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), teoricamente, não deveria precisar re-anexar o IAsyncObserver e, portanto, não necessitaria chamar ResumeAsync. O tempo de execução do streaming deve descobrir automaticamente que o grão já implementa IAsyncObserver e invoca esses IAsyncObserver métodos. No entanto, atualmente o tempo de execução de streaming não suporta isso, e o código do grain ainda precisa chamar ResumeAsync explicitamente, mesmo que o grain implemente IAsyncObserver diretamente.
Subscrições explícitas e implícitas
Por padrão, um consumidor de fluxo deve assinar explicitamente o fluxo. Essa assinatura é geralmente acionada por uma mensagem externa que o grão (ou cliente) recebe instruindo-o a subscrever-se. Por exemplo, num serviço de chat, quando um utilizador entra numa sala de chat, o seu grain recebe uma JoinChatGroup mensagem com o nome do chat, fazendo com que o grain do utilizador se inscreva nesse fluxo de conversação.
Além disso, Orleans os fluxos suportam assinaturas implícitas. Neste modelo, o grão não faz uma subscrição explícita. É subscrito automática e implicitamente com base na sua identidade granulada e num ImplicitStreamSubscriptionAttribute. O principal valor das assinaturas implícitas é permitir que a atividade de fluxo acione a ativação de grãos (e, portanto, a assinatura) automaticamente. Por exemplo, usando fluxos de SMS, se um grão quisesse produzir um fluxo e outro grão processá-lo, o produtor precisaria da identidade do grão consumidor e faria uma invocação ao grão para que se subscreva. Só então poderia começar a enviar eventos. Em vez disso, com assinaturas implícitas, o produtor pode simplesmente começar a produzir eventos para um fluxo de dados, e o componente consumidor ativa-se e subscreve automaticamente. Neste caso, o produtor não precisa saber quem está lendo os eventos.
A implementação MyGrainType de grão pode declarar um atributo [ImplicitStreamSubscription("MyStreamNamespace")]. Isso informa ao tempo de execução de streaming que, quando um evento é gerado em um fluxo com GUID de identidade XXX e namespace "MyStreamNamespace", ele deve ser entregue ao grão com identidade XXX do tipo MyGrainType. Ou seja, o tempo de execução mapeia o fluxo <XXX, MyStreamNamespace> para o grão <XXX, MyGrainType>do consumidor.
A presença de ImplicitStreamSubscription faz com que o tempo de execução em streaming inscreva automaticamente esse grão no fluxo e entregue eventos de fluxo para ele. No entanto, o código grain ainda precisa dizer ao tempo de execução como ele deseja que os eventos sejam processados. Essencialmente, ele precisa anexar o IAsyncObserver. Portanto, quando o grão é ativado, o código de grão dentro OnActivateAsync precisa chamar:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Escrevendo a lógica da assinatura
Abaixo estão as diretrizes para escrever a lógica de subscrição para diferentes casos: subscrições explícitas e implícitas, fluxos com possibilidade de retrocesso e sem possibilidade de retrocesso. A principal diferença entre assinaturas explícitas e implícitas é que, para assinaturas implícitas, o grain sempre tem exatamente uma assinatura implícita por namespace de fluxo. Não é possível criar múltiplas assinaturas (sem multiplicidade de assinaturas), não é possível cancelar a assinatura, e a lógica de grão apenas necessita anexar a lógica de processamento. Isso também significa que nunca há necessidade de retomar uma assinatura implícita. Por outro lado, para assinaturas explícitas, você precisa retomar a assinatura; caso contrário, subscrever novamente resulta na subscrição do grão várias vezes.
Subscrições implícitas:
Para assinaturas implícitas, o grão ainda precisa se inscrever para anexar a lógica de processamento. Pode fazer isto no grão de consumidor através da implementação das interfaces IStreamSubscriptionObserver e IAsyncObserver<T>, permitindo que o grão seja ativado separadamente da subscrição. Para se inscrever no fluxo, o grão cria um identificador e chama await handle.ResumeAsync(this) em seu OnSubscribed(...) método.
Para processar mensagens, implemente o método IAsyncObserver<T>.OnNextAsync(...) para receber dados de fluxo e um token de sequência. Como alternativa, o ResumeAsync método pode ter um conjunto de delegados representando os métodos da IAsyncObserver<T> interface: onNextAsync, onErrorAsync, e onCompletedAsync.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Subscrições explícitas:
Para assinaturas explícitas, um grão deve ligar SubscribeAsync para se inscrever no fluxo. Isso cria uma assinatura e anexa a lógica de processamento. A subscrição explícita existe até que o grão cancele a subscrição. Se um grão for desativado e reativado, ele ainda estará explicitamente inscrito, mas não terá lógica de processamento associada. Neste caso, o grão precisa reanexar a lógica de processamento. Para fazer isso, em seu OnActivateAsync, o grão primeiro precisa descobrir as suas assinaturas chamando IAsyncStream<T>.GetAllSubscriptionHandles(). O grão deve executar ResumeAsync em cada controlador que deseja continuar processando ou UnsubscribeAsync em qualquer controlador que já tenha concluído. O componente também pode opcionalmente especificar o StreamSequenceToken como um argumento nas chamadas ResumeAsync, o que faz com que esta assinatura explícita comece a consumir a partir desse token.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Ordem de fluxo e tokens de sequência
A ordem de entrega do evento entre um produtor individual e um consumidor depende do fornecedor de stream.
Com o SMS, o produtor controla explicitamente a ordem dos eventos vistos pelo consumidor, controlando como eles os publicam. Por defeito (se a opção para o SimpleMessageStreamProviderOptions.FireAndForgetDelivery fornecedor de SMS for false) e se o produtor aguardar cada chamada OnNextAsync, os eventos chegam por ordem FIFO. No SMS, o produtor decide como lidar com falhas de entrega indicadas por uma quebra Task retornada pela chamada OnNextAsync.
Os fluxos de Fila do Azure não garantem a ordem FIFO porque as Filas do Azure subjacentes não garantem a ordem em casos de falha (embora garantam a ordem FIFO em execuções sem falhas). Quando um produtor produz um evento em uma Fila do Azure, se a operação de fila falhar, o produtor deverá tentar outra fila e, posteriormente, lidar com possíveis mensagens duplicadas. No lado da entrega, o runtime de streaming retira o evento da fila e tenta entregá-lo para processamento aos consumidores. O tempo de execução exclui o evento da fila somente após o processamento bem-sucedido. Se a entrega ou o processamento falharem, o evento não será excluído da fila e reaparecerá automaticamente mais tarde. O tempo de execução do Streaming tenta entregá-lo novamente, potencialmente quebrando a ordem FIFO. Esse comportamento corresponde à semântica habitual das Filas do Azure.
Ordem definida pelo aplicativo: Para lidar com os problemas de pedido acima, seu aplicativo pode, opcionalmente, especificar seu pedido. Para isso, use um StreamSequenceToken, um objeto opaco IComparable usado para ordenar eventos. Um produtor pode passar um opcional StreamSequenceToken para a OnNextAsync chamada. Este StreamSequenceToken passa para o consumidor e é entregue com o evento. Dessa forma, seu aplicativo pode raciocinar e reconstruir sua ordem independentemente do tempo de execução do streaming.
Correntes rebobináveis
Alguns fluxos só permitem a subscrição a partir do último ponto no tempo, enquanto outros permitem "voltar no tempo". Esse recurso depende da tecnologia de enfileiramento subjacente e do provedor de fluxo específico. Por exemplo, as Filas do Azure só permitem consumir os eventos mais recentemente enfileirados, enquanto os Hubs de Eventos permitem reproduzir eventos de um ponto arbitrário no tempo (até um determinado tempo de expiração). Os fluxos que suportam voltar no tempo são chamados de fluxos rebobináveis.
O consumidor de um fluxo rebobinável pode passar um StreamSequenceToken para a SubscribeAsync chamada. O ambiente de execução fornece eventos a partir daquele ponto StreamSequenceToken. Um token nulo significa que o consumidor deseja receber eventos a partir do mais recente.
A capacidade de retroceder um fluxo é muito útil em cenários de recuperação. Por exemplo, considere um grão que se inscreve em um fluxo e verifica periodicamente seu estado junto com o token de sequência mais recente. Ao se recuperar de uma falha, o grão pode se inscrever novamente no mesmo fluxo a partir do token de sequência de pontos de verificação mais recente, recuperando sem perder nenhum evento gerado desde o último ponto de verificação.
O provedor de Hubs de Eventos é reversível. Você pode encontrar seu código no GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Os fornecedores de SMS (agora Canal de Difusão) e Fila do Azurenão são capazes de retroceder.
Processamento escalonado automaticamente sem estado
Por padrão, Orleans os destinos de Streaming suportam um grande número de fluxos relativamente pequenos, cada um processado por um ou mais grãos com estado. Coletivamente, o processamento de todos os fluxos é fragmentado entre muitos grãos regulares (stateful). O código da aplicação controla esta fragmentação ao atribuir identificadores de fluxo e identificadores de grão e ao inscrever-se explicitamente. O objetivo é o processamento com estado fragmentado.
No entanto, há também um cenário interessante de processamento sem estado que é automaticamente escalado. Nesse cenário, uma aplicação tem um pequeno número de fluxos (ou até mesmo um fluxo grande), e o objetivo é o processamento sem estado. Um exemplo é um fluxo global de eventos em que o processamento envolve a decodificação de cada evento e potencialmente encaminhá-los para outros fluxos para processamento adicional com estado. O processamento em fluxo escalonado sem estado pode ser suportado em Orleans via StatelessWorkerAttribute grains.
Status atual do processamento dimensionado automaticamente sem monitoração de estado: Isso ainda não está implementado. A tentativa de assinar um fluxo a partir de um StatelessWorker grão resulta em comportamento indefinido.
Estamos a considerar apoiar esta opção.
Grãos e Orleans clientes
Orleans Os fluxos funcionam uniformemente entre grãos e Orleans clientes. Isso significa que pode-se usar as mesmas APIs dentro de um "grain" e num Orleans cliente para produzir e consumir eventos. Isso simplifica muito a lógica do aplicativo, tornando redundantes APIs especiais do lado do cliente, como o Grain Observers.
Streaming pub-sub totalmente gerenciado e confiável
Para rastrear assinaturas de streaming, Orleans usa um componente de tempo de execução chamado Streaming Pub-Sub, que serve como um ponto de encontro para consumidores e produtores de streaming. O Pub-sub rastreia todas as assinaturas de streaming, persiste-as e faz a correspondência entre os consumidores de streaming e os produtores de streaming.
Os aplicativos podem escolher onde e como os dados Pub-Sub são armazenados. O próprio componente Pub-Sub é implementado como grãos (chamados PubSubRendezvousGrain), que usam Orleans persistência declarativa.
PubSubRendezvousGrain usa o provedor de armazenamento chamado PubSubStore. Como acontece com qualquer grão, você pode designar uma implementação para um provedor de armazenamento. Para Streaming Pub-Sub, pode-se alterar a implementação no momento da construção do silo PubSubStore usando o construtor de host de silo.
O seguinte configura Pub-Sub para armazenar seu estado em tabelas do Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Dessa forma, os dados Pub-Sub são armazenados de forma durável na Tabela do Azure. Para o desenvolvimento inicial, você também pode usar o armazenamento de memória. Além do Pub-Sub, o Orleans Streaming Runtime entrega eventos de produtores para consumidores, gerencia todos os recursos de tempo de execução alocados para fluxos usados ativamente e coleta recursos de tempo de execução de forma transparente de fluxos não utilizados.
Configuração
Para usar fluxos, precisa ativar os stream providers através do host de silo ou dos construtores de cliente do cluster. Exemplo de configuração do provedor de fluxo:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");