Compartilhar via


Detalhes da implementação de fluxos Orleans

Esta seção fornece uma visão geral de alto nível da implementação do fluxo do Orleans. Ela descreve conceitos e detalhes que não são visíveis no nível do aplicativo. Se você planeja usar apenas fluxos, não precisa ler esta seção.

Terminologia:

Nós nos referimos pela palavra "fila" a qualquer tecnologia de armazenamento durável que possa ingerir eventos de fluxo e permita efetuar pull de eventos ou forneça um mecanismo baseado em push para consumir eventos. Normalmente, para fornecer escalabilidade, essas tecnologias fornecem filas fragmentadas/particionadas. Por exemplo, as Filas do Azure permitem criar várias filas e os Hubs de Eventos têm vários hubs.

Fluxos persistentes

Todos os Orleans provedores de fluxo persistente compartilham uma implementação PersistentStreamProvidercomum. Esses provedores de fluxo genéricos precisam ser configurados com um IQueueAdapterFactory específico da tecnologia.

Por exemplo, para fins de teste, temos adaptadores de fila que geram seus dados de teste em vez de ler os dados de uma fila. O código a seguir mostra como configuramos um provedor de fluxo persistente para usar nosso adaptador de fila personalizado (gerador). Ele faz isso configurando o provedor de fluxo persistente com uma função de fábrica usada para criar o adaptador.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quando um produtor de fluxo gera um novo item de fluxo e chama stream.OnNext(), o Orleans runtime de streaming invoca o método apropriado no IQueueAdapter do provedor de fluxo, que enfileira o item diretamente na fila apropriada.

Como efetuar pull de agentes

No centro do provedor de fluxo persistente estão os agentes de pull. Os agentes de pull efetuam pull de eventos de um conjunto de filas duráveis e os entregam ao código do aplicativo na granularidade que os consomem. É possível considerar os agentes de pull como um "microsserviço" distribuído, um componente distribuído particionado, altamente disponível e elástico. Os agentes de pull são executados dentro dos mesmos silos que hospedam granularidades do aplicativo e são totalmente gerenciados pelo Streaming Runtime Orleans.

StreamQueueMapper e StreamQueueBalancer

Os agentes de pull são parametrizados com IStreamQueueMapper e IStreamQueueBalancer. O IStreamQueueMapper fornece uma lista de todas as filas e é responsável por mapear fluxos para filas. Dessa forma, o lado produtor do provedor de fluxo persistente sabe a fila em que a mensagem deve ser colocada.

O IStreamQueueBalancer expressa como as filas são equilibradas através dos silos e agentes Orleans. A meta é atribuir filas aos agentes de maneira equilibrada, para evitar gargalos e dar suporte à elasticidade. Quando um novo silo é adicionado ao cluster, as Orleans filas são automaticamente rebalanceadas entre todos os silos antigos e novos. O StreamQueueBalancer permite personalizar esse processo. Orleans possui diversos StreamQueueBalancers integrados, para dar suporte a diferentes cenários de balanceamento (números grandes e pequenos de filas) e diferentes ambientes (Azure, local, estático).

Usando o exemplo de gerador de teste acima, o código abaixo mostra como é possível configurar o mapeador de fila e o balanceador de fila.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

O código acima configura o GeneratorAdapterFactory para usar um mapeador de filas com oito filas, e distribui as filas pelo cluster usando o DynamicClusterConfigDeploymentBalancer.

Como efetuar pull do protocolo

Cada silo executa um conjunto de agentes de pull, e cada agente efetua pull de uma fila. Os próprios agentes de pull são implementados por um componente de runtime interno, chamado SystemTarget. Os SystemTargets são essencialmente a granularidade de runtime, estão sujeitos à simultaneidade de thread único, podem usar mensagens de granularidade regular e são tão leves quanto à granularidade. Em contraste com a granularidade, os SystemTargets não são virtuais: eles são criados explicitamente (pelo runtime) e não são transparentes de localização. Ao implementar agentes pull como SystemTargets, o Streaming Runtime Orleans pode contar com recursos internos Orleans e ser dimensionado para um número muito grande de filas, já que criar um novo agente de pull é tão barato quanto criar uma nova granularidade.

Cada agente de pull executa um temporizador periódico que é retirado da fila pela invocação do método IQueueAdapterReceiver.GetQueueMessagesAsync. As mensagens retornadas são colocadas na estrutura de dados interna por agente chamada IQueueCache. Cada mensagem é inspecionada para descobrir seu fluxo de destino. O agente usa a publicação/assinatura para descobrir a lista de consumidores de fluxo que assinaram esse fluxo. Depois que a lista de consumidores é recuperada, o agente a armazena localmente (no cache da publicação/assinatura) para que não precise consultar a publicação/assinatura em cada mensagem. O agente também assina a publicação/assinatura para receber a notificação de todos os novos consumidores que assinam esse fluxo. Esse handshake entre o agente e a publicação/assinatura garante uma semântica de assinatura de streaming forte: depois que o consumidor tiver se inscrito no fluxo, ele verá todos os eventos que foram gerados após a assinatura dele. Além disso, o uso de StreamSequenceToken permite que ele se inscreva no passado.

Cache de fila

O IQueueCache é uma estrutura interna de dados por agente que permite separar novos eventos da fila e entregá-los aos consumidores. Ele também permite desassociar a entrega para fluxos diferentes e consumidores diferentes.

Imagine uma situação em que um fluxo tenha três consumidores de fluxo e um deles seja lento. Se você não tomar cuidado, esse consumidor lento poderá afetar o progresso do agente, diminuindo o consumo de outros consumidores desse fluxo e até retardando a remoção da fila e a entrega de eventos para outros fluxos. Para evitar isso e permitir o paralelismo máximo no agente, usamos IQueueCache.

O IQueueCache armazena em buffer os eventos de fluxo e fornece uma forma para o agente entregar eventos a cada consumidor em um ritmo próprio. A entrega por consumidor é implementada pelo componente interno chamado IQueueCacheCursor, que rastreia o progresso de cada consumidor. Dessa forma, cada consumidor recebe eventos em seu próprio ritmo: consumidores rápidos recebem eventos tão rapidamente quanto são retirados da fila, enquanto os consumidores lentos os recebem mais tarde. Depois que a mensagem for entregue a todos os consumidores, ela poderá ser excluída do cache.

Contrapressão

A contrapressão no Streaming Runtime Orleans se aplica em dois locais: trazer eventos de fluxo da fila para o agente e entregar os eventos do agente para os consumidores de fluxo.

Este último é fornecido pelo mecanismo de entrega de mensagens interno Orleans . Cada evento de fluxo é entregue do agente aos consumidores por meio do sistema de mensagens de granularidade padrão Orleans , um de cada vez. Ou seja, os agentes enviam um evento (ou um lote de eventos de tamanho limitado) para cada consumidor de fluxo e aguardam essa chamada. O próximo evento não começará a ser entregue até que a Tarefa do evento anterior tenha sido resolvida ou interrompida. Dessa forma, naturalmente limitamos a taxa de entrega por consumidor a uma mensagem de cada vez.

Ao trazer eventos de fluxo da fila para o agente, o Streaming Orleans fornece um novo mecanismo especial de contrapressão. Como o agente separa a remoção dos eventos da fila e a entrega deles aos consumidores, um consumidor lento individual pode ficar para trás, tanto que o IQueueCache será preenchido. Para evitar o IQueueCache crescimento indefinidamente, limitamos seu tamanho (o limite de tamanho é configurável). No entanto, o agente nunca joga fora os eventos não entregues.

Em vez disso, quando o cache começa a ser preenchido, os agentes reduzem a taxa de remoção de eventos da fila. Dessa forma, podemos "sobreviver" nos períodos de entrega lentos ajustando a taxa na qual consumimos da fila ("backpressure") e voltar às taxas de consumo rápidas mais tarde. Para detectar os vales de "entrega lenta", o IQueueCache usa uma estrutura de dados interna de buckets de cache que acompanha o progresso da entrega de eventos aos consumidores de fluxo individuais. Isso resulta em um sistema muito responsivo e autoajustado.