Partilhar via


Orleans Início rápido de streaming

Este guia mostra uma maneira rápida de configurar e usar Orleans o Streams. Para saber mais sobre os detalhes dos recursos de streaming, leia outras partes desta documentação.

Configurações necessárias

Neste guia, você usa um fluxo baseado em memória que usa mensagens de grão para enviar dados de fluxo para assinantes. Use o provedor de armazenamento na memória para armazenar listas de assinaturas. O uso de mecanismos baseados em memória para streaming e armazenamento destina-se apenas ao desenvolvimento e teste locais, não a ambientes de produção.

No silo, silo é um ISiloBuilder, chame AddMemoryStreams.

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

No cliente de cluster, onde client é um IClientBuilder, chame AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

Neste guia, use um fluxo simples baseado em mensagens que usa mensagens de grão para enviar dados de fluxo para assinantes. Use o provedor de armazenamento na memória para armazenar listas de assinaturas; Esta não é uma escolha sábia para aplicações de produção reais.

No silo, hostBuilder é um ISiloHostBuilder, chame AddSimpleMessageStreamProvider.

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

No cliente de cluster, onde clientBuilder é um IClientBuilder, chame AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Observação

Por padrão, as mensagens passadas pelo Fluxo de Mensagens Simples são consideradas imutáveis e podem ser passadas por referência a outros grãos. Para desativar esse comportamento, configure o provedor de SMS para desativar SimpleMessageStreamProviderOptions.OptimizeForImmutableData.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Você pode criar fluxos, enviar dados usando-os como produtores e receber dados como assinantes.

Produzir eventos

É relativamente fácil gerar eventos para fluxos de dados. Primeiro, obtenha acesso ao provedor de fluxo definido na configuração anteriormente ("StreamProvider"), em seguida, escolha um fluxo e envie dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

É relativamente fácil gerar eventos para fluxos de dados. Primeiro, obtenha acesso ao provedor de fluxo definido na configuração anteriormente ("SMSProvider"), em seguida, escolha um fluxo e envie dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como você pode ver, o fluxo tem um GUID e um namespace. Isso facilita a identificação de fluxos exclusivos. Por exemplo, o namespace para uma sala de chat pode ser "Rooms" e o GUID pode ser o GUID do proprietário RoomGrain.

Aqui, use o GUID de uma sala de chat conhecida. Usando o OnNextAsync método do fluxo, envie dados para ele. Vamos fazer isso dentro de um temporizador usando números aleatórios. Você também pode usar qualquer outro tipo de dados para o fluxo.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Subscrever e receber dados de streaming

Para receber dados, você pode usar assinaturas implícitas e explícitas, descritas com mais detalhes em Assinaturas explícitas e implícitas. Este exemplo usa assinaturas implícitas, que são mais fáceis. Quando um tipo de grão deseja se inscrever implicitamente em um fluxo, ele usa o atributo [ImplicitStreamSubscription(namespace)].

Para o seu caso, defina um ReceiverGrain como este:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Sempre que são enviados dados por push para fluxos no namespace (como no exemplo do temporizador), um grain do tipo RANDOMDATA com o mesmo ReceiverGrain que o fluxo recebe a mensagem. Mesmo que nenhuma ativação do grão exista atualmente, o tempo de execução cria automaticamente um novo e envia a mensagem para ele.

Para que isso funcione, conclua o processo de assinatura definindo o OnNextAsync método para receber dados. Para fazer isso, o ReceiverGrain deve chamar algo assim em seu OnActivateAsync:

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Está tudo pronto! Agora, a única exigência é que algo provoque a criação do grão produtor. Em seguida, ele registra o temporizador e começa a enviar inteiros aleatórios para todas as partes interessadas.

Mais uma vez, este guia ignora muitos detalhes e fornece apenas uma visão geral de alto nível. Leia outras partes deste manual e outros recursos no Rx para obter uma boa compreensão do que está disponível e como funciona.

A programação reativa pode ser uma abordagem poderosa para resolver muitos problemas. Por exemplo, você pode usar o LINQ no assinante para filtrar números e executar várias operações interessantes.

Ver também

Orleans APIs de programação de fluxos