本文說明如何監控 變更串流 處理機在讀取變更資料時的進度。
為什麼監控進度很重要?
變更導向處理器作為指標,在 你的變更導向 中前進,並將變更交付給代理實作。
你的變更導向處理器部署可以根據其可用資源如 CPU、記憶體、網路等,以特定速率處理變更。
如果此速率比您在 Azure Cosmos DB 容器中發生變更的速率慢,您的處理器就會開始落後。
識別此情境有助於了解是否需要擴大變更導向處理工具的部署。
實作變更回饋估計器
作為自動通知的即時推送模型
與 變更饋源處理器類似,變更饋源估計器可作為推送模型運作。 估計器會衡量最後處理的項目(由租約容器狀態定義)與容器最新變更之間的差值,並將此值推送給代理。 測量的間隔也可以自訂,預設值為5秒。
舉例來說,如果你的變更進路處理器使用最新版本模式,定義如下:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
初始化估計量以測量該處理器的正確方法是如下 GetChangeFeedEstimatorBuilder :
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
同時處理器和估計器共享相同的 leaseContainer 及名稱。
另外兩個參數是委派(delegation),其接收代表處理器待讀取的變更數量的數值,以及您希望進行測量的時間間隔。
一個接收估計值的代表範例如下:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
你可以將這份估算傳送給你的監測解決方案,並用它來了解進度隨時間的變化趨勢。
依需求進行詳細估算
與推送模型不同,有一種替代方案可以讓你隨時取得估算。 此模型亦提供更詳細的資訊:
- 每份租約的估計延遲。
- 擁有並處理每個租約的實例,這樣你才能判斷實例是否有問題。
如果你的變更饋送處理器定義如下:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
你可以用相同的租約配置建立估算器:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
而且無論何時需要,頻率如何,都能取得詳細的估算:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
每個 ChangeFeedProcessorState 都包含租約和延遲資訊,以及當前擁有這些資訊的實例。
估計器部署
變更來源估算器不必部署在你的變更來源處理器中,也不必屬於同一專案。 我們建議將估計器部署在獨立於處理器的實例上。 單一估算器實例可以追蹤您變更回饋處理器部署中所有的租約和實例的進度。
支援的變更饋源模式
變更饋源估計器可用於 最新版本模式 以及 所有版本與刪除模式。 在這兩種模式下,所提供的估算都無法保證是處理中未完成變更的精確數量。
其他資源
- Azure Cosmos DB SDK
- GitHub 上的使用範例(.NET 最新版本)
- GitHub 上的使用範例(.NET 所有版本與刪除)
- GitHub(Java)上的使用範例
- GitHub 上有更多範例
後續步驟
你現在可以在以下文章中進一步了解變換飼料處理機: