Compartir a través de


API de streaming de Orleans

Las aplicaciones interactúan con flujos a través de API muy similares a las conocidas extensiones reactivas (Rx) en .NET. La principal diferencia es que las Orleans extensiones de flujo son asincrónicas para que el procesamiento sea más eficaz en la Orleansestructura de computación distribuida y escalable.

Flujo asincrónico

Para empezar, use un proveedor de flujos para obtener un identificador de un flujo. Puede considerar un proveedor de flujos como un generador de flujos que permite a los implementadores personalizar el comportamiento y la semántica de los flujos:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Puede obtener una referencia al proveedor de flujos llamando al método Grain.GetStreamProvider cuando se encuentra dentro de un grain o llamando al método GetStreamProvider en la instancia del cliente.

Orleans.Streams.IAsyncStream<T> es un identificador lógico, fuertemente tipado para un flujo virtual, similar en espíritu a una Orleans Referencia de Grano. Las llamadas a GetStreamProvider y GetStream son puramente locales. Los argumentos para GetStream son un GUID y una cadena adicional denominada espacio de nombres de flujo (que puede ser nulo). Juntos, el GUID y la cadena de espacio de nombres componen la identidad de la secuencia (similar a los argumentos de IGrainFactory.GetGrain). Esta combinación proporciona flexibilidad adicional para determinar las identidades de flujo. Al igual que el grano 7 puede existir dentro del tipo PlayerGrain y otro grano 7 diferente puede existir dentro del tipo ChatRoomGrain, el flujo 123 puede existir dentro del espacio de nombres PlayerEventsStream y otro flujo 123 diferente puede existir dentro del espacio de nombres ChatRoomMessagesStream.

Producción y consumo

IAsyncStream<T> implementa las interfaces IAsyncObserver<T> y IAsyncObservable<T>. Esto permite que la aplicación use la secuencia para generar nuevos eventos mediante IAsyncObserver<T> o para suscribirse a y consumir eventos mediante IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Para generar eventos en el flujo, la aplicación llama a:

await stream.OnNextAsync<T>(event)

Para suscribirse a una secuencia, su aplicación llama a:

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

El argumento para SubscribeAsync puede ser un objeto que implementa la IAsyncObserver<T> interfaz o una combinación de funciones lambda para procesar eventos entrantes. Hay más opciones disponibles para SubscribeAsync a través de la clase AsyncObservableExtensions. SubscribeAsync devuelve un StreamSubscriptionHandle<T>, un identificador opaco usado para cancelar la suscripción de la secuencia (similar a una versión asincrónica de IDisposable).

await subscriptionHandle.UnsubscribeAsync()

Es importante tener en cuenta que la suscripción es para un grano, no para la activación. Una vez que el código de grano se suscribe a la secuencia, esta suscripción supera la vida de esta activación y permanece duradera para siempre hasta que el código de grano (potencialmente en una activación diferente) cancela explícitamente la suscripción. Este es el núcleo de la abstracción de flujo virtual: no solo existen lógicamente todos los flujos, sino que una suscripción de flujo también es duradera y vive más allá de la activación física concreta que la creó.

Multiplicidad

Una Orleans secuencia puede tener varios productores y varios consumidores. Un mensaje publicado por un productor se entrega a todos los consumidores suscritos a la secuencia antes de que se publique el mensaje.

Además, un consumidor puede suscribirse a la misma secuencia varias veces. Cada vez que se suscribe, recibe nuevamente un único StreamSubscriptionHandle<T>. Si un grano (o cliente) se suscribe X veces al mismo flujo, recibe el mismo evento X veces, una vez por cada suscripción. El consumidor también puede cancelar la suscripción individual. Puede encontrar todas las suscripciones actuales llamando a:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Recuperación de errores

Si el productor de un flujo muere (o su grano está desactivado), no necesita hacer nada. La próxima vez que este grano quiera generar más eventos, puede obtener el identificador de secuencia de nuevo y producir nuevos eventos como de costumbre.

La lógica del consumidor está ligeramente más implicada. Como se mencionó antes, una vez que un componente consumidor se suscribe a un flujo, esta suscripción es válida hasta que el componente cancela explícitamente la suscripción. Si el consumidor de la secuencia muere (o su grano está desactivado) y se genera un nuevo evento en la secuencia, el grano del consumidor se reactiva automáticamente (al igual que cualquier grano normal Orleans se activa automáticamente cuando se envía un mensaje a ella). Lo único que el código de grano debe hacer ahora es proporcionar un IAsyncObserver<T> para procesar los datos. El consumidor debe volver a adjuntar la lógica de procesamiento como parte del método OnActivateAsync(). Para ello, puede llamar a:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

El consumidor usa el identificador anterior obtenido durante la suscripción inicial para "reanudar el procesamiento". Tenga en cuenta que ResumeAsync simplemente actualiza una suscripción existente con la nueva instancia de IAsyncObserver lógica y no cambia el hecho de que este consumidor ya está suscrito a esta secuencia.

¿Cómo obtiene el consumidor el subscriptionHandle antiguo? Hay dos opciones. El usuario puede haber mantenido el identificador devuelto por la operación original SubscribeAsync, y puede usarlo ahora. Como alternativa, si el consumidor no tiene el identificador, puede pedirle a IAsyncStream<T> todos los identificadores de sus suscripciones activas llamando a:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Después, el consumidor puede reanudar todos ellos o cancelar la suscripción de algunos si lo desea.

Sugerencia

Si el grano del consumidor implementa la IAsyncObserver<T> interfaz directamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), en teoría no debería tener que adjuntar de nuevo el IAsyncObserver y, por tanto, no tendría que llamar a ResumeAsync. El tiempo de ejecución de streaming debería averiguar automáticamente que el grano ya implementa IAsyncObserver e invoca esos IAsyncObserver métodos. Sin embargo, el tiempo de ejecución de streaming actualmente no admite esto y el código del grain todavía debe llamar a ResumeAsync explícitamente, incluso si el grain implementa IAsyncObserver directamente.

Suscripciones explícitas e implícitas

De forma predeterminada, un consumidor de flujo debe suscribirse explícitamente al flujo. Normalmente, una suscripción se desencadena mediante un mensaje externo que recibe el grano (o el cliente) que le indica que se suscriba. Por ejemplo, en un servicio de chat, cuando un usuario se une a una sala de chat, su componente recibe un JoinChatGroup mensaje con el nombre del chat, lo que provoca que el componente del usuario se suscriba a este flujo de chat.

Además, Orleans las secuencias admiten suscripciones implícitas. En este modelo, el grano no se suscribe explícitamente. Se suscribe de forma automática e implícita en función de su identidad de granulado y de un ImplicitStreamSubscriptionAttribute. El valor principal de las suscripciones implícitas es permitir que la actividad de flujo de datos desencadene la activación del proceso (y, por tanto, la suscripción) automáticamente. Por ejemplo, mediante secuencias SMS, si un grano quisiera generar una secuencia y otro grano procesarla, el productor necesitaría la identidad del grano consumidor y realizar una llamada entre granos indicándole que se suscriba. Solo entonces podría empezar a enviar eventos. En su lugar, con las suscripciones implícitas, el productor puede empezar a generar eventos en un flujo, y el componente del consumidor se activa y suscribe automáticamente. En este caso, el productor no necesita saber quién lee los eventos.

MyGrainType de implementación de grano puede declarar un atributo [ImplicitStreamSubscription("MyStreamNamespace")]. Esto le indica al entorno de ejecución de streaming que cuando se genera un evento en una secuencia con el GUID de identidad XXX y el espacio de nombres "MyStreamNamespace", debe entregarse al grano con la identidad XXX de tipo MyGrainType. Es decir, el runtime asigna el flujo <XXX, MyStreamNamespace> al grano del consumidor <XXX, MyGrainType>.

La presencia de ImplicitStreamSubscription hace que el tiempo de ejecución de streaming suscriba automáticamente este grano al flujo y entregue los eventos del flujo a él. Sin embargo, el código de grano todavía debe indicar al tiempo de ejecución cómo quiere que se procesen los eventos. Básicamente, debe adjuntar IAsyncObserver. Por lo tanto, cuando el grano se active, el código del grano dentro de OnActivateAsync debe llamar a:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Escritura de lógica de suscripción

A continuación se muestran instrucciones para escribir lógica de suscripción en varios casos: suscripciones explícitas e implícitas, flujos rebobinables y no rebobinables. La principal diferencia entre las suscripciones explícitas e implícitas es que, para las suscripciones implícitas, el grano siempre tiene exactamente una suscripción implícita por espacio de nombres de secuencia. No hay forma de crear varias suscripciones (sin multiplicidad de suscripción), ni forma de cancelar la suscripción, y la lógica de grano solo necesita adjuntar la lógica de procesamiento. Esto también significa que nunca es necesario reanudar una suscripción implícita. Por otro lado, para las suscripciones explícitas, debe reanudar la suscripción; De lo contrario, la suscripción de nuevo da como resultado que el grano se suscriba varias veces.

Suscripciones implícitas:

En el caso de las suscripciones implícitas, el intervalo de agregación aún debe suscribirse para adjuntar la lógica de procesamiento. Puede hacer esto en el grain del consumidor mediante la implementación de las interfaces IStreamSubscriptionObserver y IAsyncObserver<T>, lo que permite que el grain se active de manera independiente de la suscripción. Para suscribirse a la secuencia, el intervalo de agregación crea un identificador y llama a await handle.ResumeAsync(this) en su método OnSubscribed(...).

Para procesar mensajes, implemente el IAsyncObserver<T>.OnNextAsync(...) método para recibir datos de flujo y un token de secuencia. Como alternativa, el ResumeAsync método puede tomar un conjunto de delegados que representan los métodos de la IAsyncObserver<T> interfaz: onNextAsync, onErrorAsyncy onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Suscripciones explícitas:

En el caso de las suscripciones explícitas, un grando debe llamar a SubscribeAsync para suscribirse al flujo. Esto crea una suscripción y adjunta la lógica de procesamiento. La suscripción explícita existe hasta que el usuario se da de baja. Si un grano desactiva y vuelve a activarse, todavía está suscrito explícitamente, pero no se adjunta ninguna lógica de procesamiento. En este caso, el grano debe volver a adjuntar la lógica de procesamiento. Para ello, en su OnActivateAsync, el grano primero debe consultar sus suscripciones llamando a IAsyncStream<T>.GetAllSubscriptionHandles(). El proceso debe ejecutar ResumeAsync en cada identificador con el que desee continuar procesando, o UnsubscribeAsync en cualquier identificador con el que haya terminado. El grano también puede especificar opcionalmente el StreamSequenceToken como argumento en las llamadas ResumeAsync, haciendo que esta suscripción explícita empiece a consumir desde ese token.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    }
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Orden y tokens de flujo

El orden de entrega de eventos entre un productor individual y el consumidor depende del proveedor de transmisiones.

Con SMS, el productor controla explícitamente el orden de los eventos vistos por el consumidor controlando cómo los publica. De forma predeterminada (si la opción SimpleMessageStreamProviderOptions.FireAndForgetDelivery del proveedor de SMS es false) y si el productor espera cada llamada OnNextAsync, los eventos llegan en orden FIFO. En SMS, el productor decide cómo manejar los errores de entrega indicados por un Task fallido devuelto por la llamada OnNextAsync.

Los flujos de cola de Azure no garantizan el orden FIFO porque las colas subyacentes de Azure no garantizan el orden en los casos de error (aunque garantizan el orden FIFO en ejecuciones sin errores). Cuando un productor genera un evento en una cola de Azure, si se produce un error en la operación de cola, el productor debe intentar otra cola y, posteriormente, tratar con posibles mensajes duplicados. En el lado de entrega, el Orleans tiempo de ejecución de streaming quita el evento e intenta entregarlo para su procesamiento a los consumidores. El tiempo de ejecución elimina el evento de la cola solo tras el procesamiento correcto. Si se produce un error en la entrega o el procesamiento, el evento no se elimina de la cola y vuelve a aparecer automáticamente más adelante. El tiempo de ejecución de Streaming intenta entregarlo de nuevo, potencialmente rompiendo el orden FIFO. Este comportamiento coincide con la semántica normal de las colas de Azure.

Orden definido por la aplicación: para controlar los problemas de ordenación anteriores, la aplicación puede especificar opcionalmente su ordenación. Consigue esto mediante un StreamSequenceTokenobjeto , un objeto opaco IComparable que se usa para ordenar eventos. Un productor puede pasar un StreamSequenceToken opcional a la llamada OnNextAsync. Esto StreamSequenceToken se pasa al consumidor y es entregado con el evento. De este modo, la aplicación puede razonar y reconstruir su orden independientemente del entorno de ejecución de streaming.

Flujos rebobinables

Algunas secuencias solo permiten suscribirse a partir del último momento en el tiempo, mientras que otras permiten "retroceder en el tiempo". Esta funcionalidad depende de la tecnología de puesta en cola subyacente y del proveedor de flujos específico. Por ejemplo, Azure Queues solo permite consumir los eventos en cola más recientes, mientras que Event Hubs permite reproducir eventos desde cualquier punto arbitrario en el tiempo (hasta su fecha de caducidad). Las secuencias que admiten el retroceso en el tiempo se denominan secuencias rebobinables.

El consumidor de un flujo rebobinable puede pasar un StreamSequenceToken a la llamada SubscribeAsync. El tiempo de ejecución entrega eventos a partir de ese StreamSequenceToken. Un token null significa que el consumidor quiere recibir eventos a partir del más reciente.

La capacidad de rebobinar un flujo es muy útil en escenarios de recuperación. Por ejemplo, considere un grano que se suscribe a una secuencia y controla periódicamente su estado junto con el token de secuencia más reciente. Al recuperarse de un error, el grano puede volver a suscribirse a la misma secuencia desde el último token de secuencia de punto de control, recuperando sin perder ningún evento generado desde el último punto de control.

El proveedor de Centros de Eventos es retrocedible. Puede encontrar su código en GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. Los proveedores de SMS (ahora Canal de difusión) y Cola de Azureno son retrocedibles.

Procesamiento de escalado horizontal sin estado automáticamente

De forma predeterminada, los Orleans destinos de streaming admiten un gran número de flujos relativamente pequeños, cada uno procesado por uno o varios granos con estado. Colectivamente, el procesamiento de todas las secuencias se particiona entre muchos granos regulares (con estado). El código de la aplicación controla este particionamiento mediante la asignación de identificadores de flujo e identificadores de grano, y mediante la suscripción explícita. El objetivo es el procesamiento con estado particionado.

Sin embargo, también hay un escenario interesante de procesamiento sin estado escalado automáticamente. En este escenario, una aplicación tiene un pequeño número de secuencias (o incluso una secuencia grande) y el objetivo es el procesamiento sin estado. Un ejemplo es una secuencia global de eventos en los que el procesamiento implica descodificar cada evento y reenviarlo a otros flujos para un procesamiento con estado adicional. El procesamiento de flujos de escalado horizontal sin estado puede ser compatible en Orleans mediante StatelessWorkerAttribute granos.

Estado actual del procesamiento sin estado escalable automáticamente: Esto aún no se ha implementado. Intentar suscribirse a una transmisión desde un grano da como resultado un comportamiento StatelessWorker indefinido. Estamos considerando admitir esta opción.

Granos y clientes de Orleans

Los flujos de Orleans funcionan uniformemente entre granos y clientes de Orleans. Esto significa que puede usar las mismas API dentro de un grain y en un Orleans cliente para producir y consumir eventos. Esto simplifica considerablemente la lógica de la aplicación, lo que hace que las API especiales del lado cliente, como los observadores de granos sean redundantes.

Pub-sub de flujo totalmente administrado y confiable

Para realizar un seguimiento de las suscripciones de streaming, Orleans usa un componente en tiempo de ejecución denominado Streaming Pub-Sub, que actúa como punto de encuentro para los consumidores y productores de flujos. Pub-sub realiza un seguimiento de todas las suscripciones de flujos, las conserva y empareja a los consumidores de flujos con los productores de flujos.

Las aplicaciones pueden elegir dónde y cómo se almacenan los datos Pub-Sub. El propio componente de Pub-Sub se implementa como granos (denominados PubSubRendezvousGrain), que usan persistencia declarativa de Orleans. PubSubRendezvousGrain usa el proveedor de almacenamiento denominado PubSubStore. Al igual que sucede con cualquier grano, puede designar una implementación para un proveedor de almacenamiento. Para Streaming Pub-Sub, puede cambiar la implementación de PubSubStore durante el tiempo de construcción del silo mediante el generador de hosts de silo.

A continuación se configura Pub-Sub para almacenar su estado en tablas de Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

De este modo, los datos Pub-Sub se almacenan de forma permanente en Azure Table. Para el desarrollo inicial, también se puede usar el almacenamiento de memoria. Además de Pub-Sub, el runtime de Orleans Streaming Runtime entrega eventos de productores a consumidores, administra todos los recursos en tiempo de ejecución asignados a flujos usados activamente y recopila de forma transparente los recursos de runtime de secuencias no utilizadas.

Configuración

Para utilizar flujos, debe habilitar los proveedores de flujos a través del host de silo o los constructores de cliente de clúster. Configuración del proveedor de flujos de ejemplo:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Consulte también

Proveedores de secuencias de Orleans