共用方式為


從變更提要處理器程式庫遷移到 Azure Cosmos DB .NET V3 SDK

本文說明將使用 變更資料處理程式庫 的現有應用程式程式碼遷移到最新版本 .NET SDK(亦稱為 .NET V3 SDK)變更 資料流 功能的必要步驟。

必要的程式碼變更

.NET V3 SDK 有多項重大變更,以下是遷移應用程式的關鍵步驟:

  1. DocumentCollectionInfo實例轉換為Container參考,以用於監控和租賃的容器。
  2. 自訂內容使用WithProcessorOptions的項目應更新為使用WithLeaseConfigurationWithPollInterval來表示間隔,使用WithStartTime來設定起始時間,以及使用WithMaxItems來定義最大項目數量。
  3. 設定 processorNameGetChangeFeedProcessorBuilder,以符合 ChangeFeedProcessorOptions.LeasePrefix 上的設定值,否則就使用 string.Empty
  4. 這些改動不再以 IReadOnlyList<Document> 的形式交付,而是以 IReadOnlyCollection<T> 形式交付,其中 T 是您需要定義的類型,不再有基礎物品類別。
  5. 要處理這些變更,你不再需要 的 IChangeFeedObserver實作,而是需要 定義一個代理。 代理可以是靜態函式,或者如果你需要在執行間維持狀態,也可以建立自己的類別,然後把實例方法當作代理傳遞。

例如,如果原始建置變更資訊處理器的程式碼如下:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

遷移後的程式碼如下:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

對於代表,你可以用靜態方式接收事件。 如果你從IChangeFeedObserverContext獲取資訊,可以遷移使用ChangeFeedProcessorContext

  • ChangeFeedProcessorContext.LeaseToken 可以用來代替 IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers 可以用來代替 IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics 包含關於故障排除請求延遲的詳細資訊
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

健康事件與可觀察性

如果你之前正在使用 IHealthMonitor 或正在利用 IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync請使用 Notifications API

  • IChangeFeedObserver.OpenAsync 可被替換為 WithLeaseAcquireNotification
  • IChangeFeedObserver.CloseAsync 可被替換為 WithLeaseReleaseNotification
  • IHealthMonitor.InspectAsync 可被替換為 WithErrorNotification

州政府與租賃貨櫃

類似於變更導向處理器函式庫,.NET V3 SDK 中的變更導覽功能使用 租約容器 來儲存狀態。 然而,這些模式不同。

SDK V3 變更資訊流處理器會偵測任何舊的函式庫狀態,並在遷移後的應用程式碼首次執行時自動將其遷移到新架構。

你可以安全地用舊程式碼停止應用程式,將程式碼遷移到新版本,啟動遷移後的應用程式,應用程式停止期間發生的任何變更都會被新版本接收並處理。

其他資源

後續步驟

你現在可以在以下文章中更進一步了解變更フィード處理器: