Compartir a través de


Utilice el estimador de flujo de cambios

En este artículo se describe cómo puede supervisar el progreso de las instancias del procesador de flujo de cambios mientras leen el flujo de cambios.

¿Por qué es importante supervisar el progreso?

El procesador de alimentador de cambios actúa como un puntero que avanza a través del alimentador de cambios y entrega los cambios a una implementación delegada.

La implementación del procesador de flujo de cambios puede procesar los cambios a un ritmo específico en función de sus recursos disponibles, como la CPU, la memoria, la red, etc.

Si esta velocidad es más lenta que la velocidad a la que se producen los cambios en el contenedor de Azure Cosmos DB, el procesador comienza a retardar.

La identificación de este escenario ayuda a comprender si necesitamos escalar la implementación del procesador de fuente de cambios.

Implementar el estimador de fuente de cambios

Como modelo de empuje para notificaciones automáticas

Al igual que el procesador de fuente de cambios, el estimador de fuente de cambios puede operar como modelo de inserción. El estimador mide la diferencia entre el último elemento procesado (definido por el estado del contenedor de arrendamientos) y el último cambio en el contenedor, y envía este valor a un delegado. El intervalo en el que se toma la medida también se puede personalizar con un valor predeterminado de 5 segundos.

Por ejemplo, si el procesador de cambio de datos utiliza el modo de versión más reciente y está definido de esta forma:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

La manera correcta de inicializar un estimador para medir ese procesador estaría usando GetChangeFeedEstimatorBuilder de la siguiente manera:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Donde tanto el procesador como el estimador comparten el mismo leaseContainer y el mismo nombre.

Los otros dos parámetros son el delegado, que recibe un número que representa cuántos cambios están pendientes de ser leídos por el procesador y el intervalo de tiempo en el que desea que se tome esta medida.

Un ejemplo de delegado que recibe la estimación es:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Puede enviar esta estimación a la solución de supervisión y usarla para comprender cómo se comporta el progreso con el tiempo.

Como estimación detallada a petición

A diferencia del modelo push, hay una alternativa que le permite obtener la estimación bajo demanda. Este modelo también proporciona información más detallada:

  • Retraso estimado por concesión.
  • La instancia propietaria y el procesamiento de cada concesión, por lo que puede identificar si hay un problema en una instancia.

Si el procesador de flujo de cambios se define de la siguiente manera:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Puede crear el estimador con la misma configuración de concesión:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Y siempre que quiera, con la frecuencia que necesite, puede obtener la estimación detallada:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Cada ChangeFeedProcessorState contiene la información de arrendamiento y retraso, y también cuál es la instancia actual que lo posee.

Implementación del estimador

No es necesario implementar el estimador de fuente de cambios como parte del procesador de fuente de cambios ni formar parte del mismo proyecto. Se recomienda implementar el estimador en una instancia independiente de los procesadores. Una sola instancia de estimador puede realizar un seguimiento del progreso de todos los arrendamientos e instancias en la implementación del procesador de feed de cambios.

Cada estimación consume unidades de solicitud de contenedores supervisados y arrendados. Una frecuencia de 1 minuto entre es un buen punto de partida, cuanto menor sea la frecuencia, mayor será el consumo de las unidades de solicitud.

Modos de fuente de cambios admitidos

El estimador de fuente de cambios se puede usar tanto para el modo de versión más reciente como para todas las versiones y el modo de eliminaciones. En ambos modos, no se garantiza que la estimación proporcionada sea un recuento exacto de cambios pendientes en el proceso.

Recursos adicionales

Pasos siguientes

Ahora puede aprender más sobre el procesador de fuente de cambios en el artículo siguiente: