Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta sección se proporciona información general de alto nivel sobre la implementación de Orleans Stream. Describe los conceptos y detalles que no están visibles en el nivel de aplicación. Si solo tiene previsto usar secuencias, no tiene que leer esta sección.
Terminología:
Con el término "cola" nos referimos a cualquier tecnología de almacenamiento duradero que puede ingerir eventos de secuencia y permite extraer eventos o proporcionar un mecanismo basado en inserción para consumir eventos. Normalmente, para proporcionar escalabilidad, esas tecnologías ofrecen colas fragmentadas o particionadas. Por ejemplo, Las colas de Azure permiten crear varias colas y Event Hubs tiene varios centros.
Secuencias persistentes
Todos los Orleans proveedores de flujos persistentes comparten una implementación PersistentStreamProvidercomún. Estos proveedores de secuencias genéricos deben configurarse con un generador IQueueAdapterFactory específico de la tecnología.
Por ejemplo, para fines de prueba, disponemos de adaptadores de cola que generan sus propios datos de prueba en lugar de leerlos de una cola. El código siguiente muestra cómo configuramos un proveedor de flujos persistente para usar nuestro adaptador de cola personalizado (generador). Para ello, configura el proveedor de flujos persistente con una función de fábrica que se usa para crear el adaptador.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Cuando un productor de secuencias genera un nuevo elemento de secuencia y llama a stream.OnNext(), el entorno de ejecución de streaming de Orleans invoca el método adecuado en el IQueueAdapter de ese proveedor de secuencias que pone en cola directamente el elemento en la cola adecuada.
Agentes de extracción
En el centro del proveedor de secuencias persistentes se encuentran los agentes de extracción. Los agentes de extracción extraen eventos de un conjunto de colas duraderas y los entrega al código de la aplicación en granos que los consume. Los agentes de extracción podrían considerarse un "microservicio" distribuido, es decir, un componente distribuido elástico con particiones y alta disponibilidad. Los agentes de extracción se ejecutan dentro de los mismos silos que albergan los granos de aplicación y están totalmente gestionados por el entorno de ejecución de streaming Orleans.
StreamQueueMapper y StreamQueueBalancer
Los agentes de extracción se parametrizan con IStreamQueueMapper y IStreamQueueBalancer. IStreamQueueMapper proporciona una lista de todas las colas y también es responsable de asignar secuencias a colas. De este modo, el lado productor del proveedor de secuencias persistentes sabe en qué cola debe poner el mensaje.
El IStreamQueueBalancer expresa la forma en que las colas se equilibran entre los silos y los agentes de Orleans. El objetivo es asignar colas a los agentes de forma equilibrada para evitar cuellos de botella y admitir la elasticidad. Cuando se agrega un nuevo silo al clúster Orleans, las colas se reequilibran automáticamente en los silos antiguos y nuevos. StreamQueueBalancer permite personalizar ese proceso. Orleans tiene varios StreamQueueBalancers integrados, para admitir diferentes escenarios de equilibrio (gran y pequeño número de colas) y entornos diferentes (Azure, local, estático).
Siguiendo con el ejemplo anterior del generador de prueba, en el código siguiente se muestra cómo se puede configurar el asignador de colas y el equilibrador de colas.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
El código anterior configura GeneratorAdapterFactory para usar un asignador de colas con ocho colas y equilibra las colas en todo el clúster mediante DynamicClusterConfigDeploymentBalancer.
Protocolo de extracción
Cada silo ejecuta un conjunto de agentes de extracción, cada uno de los cuales realiza la extracción de una cola. Los propios agentes de extracción se implementan mediante un componente del runtime interno, denominado SystemTarget. Los componentes SystemTarget son esencialmente granos del runtime, están sujetos a simultaneidad uniproceso, pueden usar la mensajería de grano normal y son tan ligeros como los granos. A diferencia de los granos, los SystemTargets no son virtuales: se crean explícitamente (por el entorno de ejecución) y no tienen transparencia de ubicación. Al implementar agentes de extracción como SystemTargets, el entorno de ejecución de streaming de Orleans puede basarse en características de Orleans integradas y puede escalar a un gran número de colas, ya que crear un nuevo agente de extracción es tan barato como crear un nuevo intervalo de agregación.
Cada agente de extracción ejecuta un temporizador periódico que extrae de la cola mediante la invocación del método IQueueAdapterReceiver.GetQueueMessagesAsync. Los mensajes devueltos se colocan en la estructura de datos interna por agente denominada IQueueCache. Cada mensaje se inspecciona para averiguar su flujo de destino. El agente usa Pub-Sub para averiguar la lista de consumidores de secuencias que se suscribieron a esta secuencia. Una vez recuperada la lista de consumidores, el agente la almacena localmente (en su caché pub-sub), por lo que no es necesario consultar con Pub-Sub en cada mensaje. El agente también se suscribe a la publicación/suscripción para recibir notificaciones de los nuevos consumidores que se suscriben a esa secuencia. Este protocolo de enlace entre el agente y Pub-Sub garantiza una semántica de suscripción de streaming segura. Es decir, una vez que el consumidor se haya suscrito a la secuencia, verá todos los eventos que se generaron después de que se suscribiera. Además, el uso de StreamSequenceToken permite suscribirse en el pasado.
Caché de colas
IQueueCache es una estructura de datos interna por agente que permite desacoplar la eliminación de nuevos eventos de la cola y entregarlos a los consumidores. También permite desacoplar la entrega a diferentes flujos y distintos consumidores.
Imagina una situación en la que un flujo tiene 3 consumidores y uno de ellos es lento. Si no se tiene cuidado, este consumidor lento puede afectar al progreso del agente, ralentizar el consumo de otros consumidores de esa secuencia e incluso ralentizar la eliminación de la cola y la entrega de eventos para otras secuencias. Para evitar eso y permitir el paralelismo máximo en el agente, usamos IQueueCache.
IQueueCache almacena en búfer eventos de secuencia y proporciona una manera de que el agente entregue eventos a cada consumidor a su propio ritmo. La entrega por consumidor se implementa mediante el componente interno denominado IQueueCacheCursor, que realiza un seguimiento del progreso por consumidor. De este modo, cada consumidor recibe eventos a su propio ritmo: los consumidores rápidos reciben eventos tan rápido como se quitan de la cola, mientras que los consumidores lentos los reciben más adelante. Una vez que el mensaje se entrega a todos los consumidores, se puede eliminar de la memoria caché.
Contrapresión
La contrapresión en el tiempo de ejecución del streaming de Orleans se implementa en dos lugares: trasladar eventos de flujo de la cola al agente y entregar los eventos del agente a los consumidores de la secuencia.
Este último lo proporciona el mecanismo integrado Orleans de entrega de mensajes. Cada evento de secuencia se entrega desde el agente a los consumidores a través de la mensajería específica Orleans, uno a la vez. Es decir, los agentes envían un evento (o un lote de eventos de tamaño limitado) a cada consumidor de secuencias y esperan esta llamada. El siguiente evento no comenzará a entregarse hasta que la tarea del evento anterior se resolvió o se interrumpió. De este modo, limitamos naturalmente la tasa de entrega por consumidor a un mensaje a la vez.
Al traer eventos de secuencia de la cola al agente, el streaming de Orleans proporciona un nuevo mecanismo especial de presión de retorno. Dado que el agente desacopla la eliminación de eventos de la cola y su entrega a los consumidores, un único consumidor lento puede quedar tan retrasado que IQueueCache se llene. Para evitar IQueueCache que crezca indefinidamente, limitamos su tamaño (el límite de tamaño es configurable). Aun así, el agente nunca descarta eventos no entregados.
En su lugar, cuando la memoria caché empieza a llenarse, los agentes ralentizan la velocidad de eliminación de eventos de la cola. De este modo, podemos "superar" los períodos de entrega lentos ajustando la velocidad a la que consumimos desde la cola ("contrapresión") y volver a tasas de consumo rápidas más adelante. Para detectar los valles de entrega lenta, IQueueCache usa una estructura de datos interna de cubos de caché que realiza un seguimiento del progreso de la entrega de eventos a consumidores de flujos individuales. Esto da como resultado un sistema muy dinámico y autoajuste.