다음을 통해 공유


대량 실행기 라이브러리에서 Azure Cosmos DB .NET V3 SDK의 대량 지원으로 마이그레이션

적용 대상: NoSQL

이 문서에서는 .NET 대량 실행기 라이브러리를 사용하는 기존 애플리케이션의 코드를 최신 버전의 .NET SDK에서 대량 지원 기능으로 마이그레이션하는 데 필요한 단계를 설명합니다.

대량 지원 활성화

CosmosClient 구성을 통해 인스턴스에서 대량 지원을 사용합니다.

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

각 운영에 대한 작업 만들기

.NET SDK의 대량 지원은 작업 병렬 라이브러리와 동시에 발생하는 그룹화 작업을 활용하여 작동합니다.

SDK에는 문서 또는 작업 목록을 입력 매개 변수로 사용하는 단일 메서드가 없지만 대량으로 실행하려는 각 작업에 대해 작업을 만든 다음 완료될 때까지 기다리기만 하면 됩니다.

예를 들어 초기 입력이 각 항목에 다음 스키마가 있는 항목 목록인 경우:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

대량 가져오기(BulkExecutor.BulkImportAsync 사용과 유사)를 수행하려면 CreateItemAsync에 대한 동시 호출이 있어야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

대량 업데이트(BulkExecutor.BulkUpdateAsync 사용과 유사)를 수행하려면 항목 값을 업데이트한 후 ReplaceItemAsync 메서드에 대한 동시 호출이 있어야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

그리고 대량 삭제(BulkExecutor.BulkDeleteAsync 사용과 유사)를 수행하려면 각 항목의 DeleteItemAsync 및 파티션 키를 사용하여 id에 대한 동시 호출을 수행해야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

작업 결과 상태 캡처

이전 코드 예에서는 동시 작업 목록을 만들고 각 작업에서 CaptureOperationResponse 메서드를 호출했습니다. 이 메서드는 오류를 캡처하고 요청 단위 사용량을 추적하여 BulkExecutor와 유사한 응답 스키마를 유지할 수 있게 해주는 확장입니다.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

여기서 OperationResponse는 다음과 같이 선언됩니다.

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

작업을 동시에 실행

전체 작업 목록의 범위를 추적하기 위해 다음 도우미 클래스를 사용합니다.

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync 메서드는 모든 작업이 완료될 때까지 기다리며 다음과 같이 사용할 수 있습니다.

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

통계 수집

이전 코드는 모든 작업이 완료될 때까지 기다렸다가 필요한 통계를 계산합니다. 이러한 통계는 대량 실행기 라이브러리의 BulkImportResponse 통계와 유사합니다.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse에는 다음이 포함됩니다.

  1. 대량 지원을 통해 작업 목록을 처리하는 데 걸린 총 시간입니다.
  2. 성공한 작업의 수입니다.
  3. 총 요청 단위 소비량입니다.
  4. 오류가 있는 경우 예외가 포함된 튜플 목록과 로깅 및 식별을 위한 관련 항목이 표시됩니다.

재시도 구성

대량 실행기 라이브러리에는 라이브러리에 제어를 위임하기 위해 RetryOptionsMaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequests으로 설정하는 것을 언급한 0이 있었습니다.

.NET SDK의 대량 지원에는 숨겨진 동작이 없습니다. CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests를 통해 직접 재시도 옵션을 구성할 수 있습니다.

참고

프로비저닝된 요청 단위가 데이터 양에 따라 예상보다 훨씬 낮은 경우 높은 값으로 설정하는 것을 고려할 수 있습니다. 대량 작업은 시간이 더 오래 걸리지만 재시도 횟수가 높기 때문에 완전히 성공할 확률이 더 높습니다.

성능 개선 사항

.NET SDK를 사용한 다른 작업과 마찬가지로 스트리밍 API를 사용하면 성능이 향상되고 불필요한 직렬화가 방지됩니다.

스트리밍 API 사용은 사용하는 데이터의 특성이 바이트 스트리밍(예: 파일 스트리밍)의 특성과 일치하는 경우에만 가능합니다. 이러한 경우 CreateItemStreamAsync, ReplaceItemStreamAsync 또는 DeleteItemStreamAsync 메서드를 사용하고 ResponseMessage(ItemResponse 대신)를 사용하면 달성할 수 있는 처리량이 증가합니다.

다음 단계