Compartir a través de


Orleans Inicio rápido de streaming

En esta guía se muestra una manera rápida de configurar y usar Orleans Streams. Para más información sobre los detalles de las características de streaming, lea otras partes de esta documentación.

Configuraciones necesarias

En esta guía, se utiliza un flujo basado en memoria que emplea mensajería por granos para enviar datos del flujo a los suscriptores. Use el proveedor de almacenamiento en memoria para almacenar listas de suscripciones. El uso de mecanismos basados en memoria para el streaming y el almacenamiento solo está pensado para el desarrollo y las pruebas locales, no para entornos de producción.

En el silo, donde silo es un ISiloBuilder, llame a AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

En el cliente del clúster, donde client es un IClientBuilder, llame a AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

En esta guía, use una secuencia sencilla basada en mensajes que emplea mensajería granular para enviar datos de flujo a los suscriptores. Use el proveedor de almacenamiento en memoria para almacenar listas de suscripciones; esta no es una opción inteligente para las aplicaciones de producción reales.

En el silo, donde hostBuilder es un ISiloHostBuilder, llame a AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

En el cliente del clúster, donde clientBuilder es un IClientBuilder, llame a AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Nota:

De forma predeterminada, los mensajes pasados por la secuencia de mensajes simples se consideran inmutables y pueden pasarse por referencia a otros granos. Para desactivar este comportamiento, configure el proveedor de SMS para desactivar SimpleMessageStreamProviderOptions.OptimizeForImmutableData.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Puede crear flujos, enviar datos con ellos como productores y recibir datos como suscriptores.

Generar eventos

Es relativamente fácil generar eventos para secuencias. En primer lugar, obtenga acceso al proveedor de flujo definido anteriormente en la configuración ("StreamProvider"). Luego, elija un flujo e inserte datos en él.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Es relativamente fácil generar eventos para secuencias. En primer lugar, obtenga acceso al proveedor de flujo definido anteriormente en la configuración ("SMSProvider"). Luego, elija un flujo e inserte datos en él.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como puede ver, la secuencia tiene un GUID y un espacio de nombres. Esto facilita la identificación de flujos únicos. Por ejemplo, el espacio de nombres para una sala de chat podría ser "Rooms" y el GUID podría ser el GUID del propietario RoomGrain.

Aquí, utilice el GUID de una sala de chat conocida. Usando el método OnNextAsync de la secuencia, inserte datos en ella. Vamos a hacerlo dentro de un temporizador mediante números aleatorios. También puede usar cualquier otro tipo de datos para el flujo.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Suscribirse y recibir datos de streaming

Para recibir datos, puede usar suscripciones implícitas y explícitas, que se describen con más detalle en Suscripciones explícitas e implícitas. En este ejemplo se usan suscripciones implícitas, que son más fáciles. Cuando un tipo de grano quiere suscribirse implícitamente a una secuencia, usa el atributo [ImplicitStreamSubscription(namespace)].

Para tu caso, define un ReceiverGrain de la siguiente manera:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Cada vez que los datos se transmiten a secuencias en el RANDOMDATA espacio de nombres (como en el ejemplo del temporizador), un elemento de tipo ReceiverGrain con el mismo Guid que el de la secuencia recibe el mensaje. Incluso si actualmente no existen activaciones del grano, el tiempo de ejecución crea automáticamente uno nuevo y envía el mensaje a él.

Para que esto funcione, complete el proceso de suscripción estableciendo el OnNextAsync método para recibir datos. Para ello, ReceiverGrain debe llamar a algo parecido a esto en su OnActivateAsync:

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

¡Ya está a punto! Ahora, el único requisito es que algo desencadena la creación del grano del productor. A continuación, registra el temporizador y comienza a enviar enteros aleatorios a todas las partes interesadas.

De nuevo, esta guía omite muchos detalles y solo proporciona información general de alto nivel. Lea otras partes de este manual y otros recursos en Rx para obtener una buena comprensión de lo que está disponible y cómo funciona.

La programación reactiva puede ser un enfoque eficaz para resolver muchos problemas. Por ejemplo, podría usar LINQ en el suscriptor para filtrar números y realizar diversas operaciones interesantes.

Consulte también

Orleans API de programación de secuencias