다음을 통해 공유


Data Warehouse에서 Lakehouse로 데이터 증분 로드

이 자습서에서는 데이터 웨어하우스에서 Lakehouse로 새 데이터 또는 변경된 데이터만 복사하는 방법을 알아봅니다. 이 방법은 증분 로드라고 하며, 매번 모든 항목을 복사하지 않고 데이터를 up-to-date로 유지하려는 경우에 유용합니다.

솔루션의 개략적인 디자인은 다음과 같습니다.

증분 로드 데이터 로직을 보여주는 다이어그램.

  1. 워터마크 열을 선택합니다. 원본 테이블에서 새 레코드 또는 변경된 레코드를 추적하는 데 도움이 되는 열을 하나 선택합니다. 이 열에는 일반적으로 행을 추가하거나 업데이트할 때 증가하는 값(예: 타임스탬프 또는 ID)이 포함됩니다. 이 열에서 가장 높은 값을 "워터마크"로 사용하여 중단한 위치를 확인합니다.

  2. 마지막 워터마크 값을 저장하도록 테이블을 설정합니다.

  3. 다음을 수행하는 파이프라인을 빌드합니다.

    파이프라인에는 다음 작업이 포함됩니다.

    • 두 가지 조회 작업입니다. 처음 항목은 마지막 워터마크 값(우리가 마지막으로 중지한 지점)을 가져옵니다. 두 번째 것은 새 워터마크 값을 가져옵니다(이번에 중지할 위치입니다). 두 값 모두 복사 작업으로 전달됩니다.
    • 워터마크 열 값이 이전 워터마크와 새 워터마크 사이에 있는 행을 찾는 복사 작업입니다. 그런 다음 데이터 웨어하우스에서 Lakehouse로 이 데이터를 새 파일로 복사합니다.
    • 다음 파이프라인 실행에서 시작할 위치를 알 수 있도록 새 워터마크 값을 저장하는 저장 프로시저 작업입니다.

필수 조건

  • 데이터 웨어하우스. 데이터 웨어하우스를 원본 데이터 저장소로 사용합니다. 없는 경우 지침은 데이터 웨어하우스 만들기를 확인하세요.
  • Lakehouse. Lakehouse를 대상 데이터 저장소로 사용합니다. 없는 경우 지침은 Lakehouse 만들기를 참조하세요.
    • IncrementalCopy라는 폴더를 만들어 복사한 데이터를 저장합니다.

원본 준비

증분 복사 파이프라인을 구성하기 전에 데이터 웨어하우스에 필요한 테이블 및 저장 프로시저를 설정해 보겠습니다.

1. Data Warehouse에 데이터 원본 테이블 만들기

Data Warehouse에서 다음 SQL 명령을 실행하여 원본 테이블로 data_source_table 테이블을 만듭니다. 증분 복사본에 대한 샘플 데이터로 사용합니다.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

원본 테이블의 데이터는 다음과 같습니다.

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

이 자습서에서는 LastModifytime 을 워터마크 열로 사용합니다.

2. Data Warehouse에 마지막 워터마크 값을 저장할 또 다른 테이블 만들기

  1. Data Warehouse에서 다음 SQL 명령을 실행하여 마지막 워터마크 값을 저장할 워터마크테이블이라는 테이블을 만듭니다.

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 원본 테이블 이름으로 마지막 워터마크의 기본값을 설정합니다. 이 자습서에서는 테이블 이름이 data_source_table 기본값 1/1/2010 12:00:00 AM을 .로 설정합니다.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. watermarktable의 데이터를 확인합니다.

    Select * from watermarktable
    

    출력:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Data Warehouse에 저장 프로시저 만들기

다음 명령을 실행하여 Data Warehouse에 저장 프로시저를 만듭니다. 이 저장 프로시저는 각 파이프라인 실행 후 마지막 워터마크 값을 업데이트합니다.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

증분 복사를 위한 파이프라인 구성

1단계: 파이프라인 만들기

  1. Power BI로 이동합니다.

  2. 화면 왼쪽 아래에서 Power BI 아이콘을 선택한 다음 패브릭을 선택합니다.

  3. 내 작업 영역을 선택하여 패브릭 작업 영역을 엽니다.

  4. + 새 항목을 선택한 다음 파이프라인을 선택한 다음 파이프라인 이름을 입력하여 새 파이프라인을 만듭니다.

    새로 만든 작업 영역의 새 파이프라인 단추를 보여 주는 스크린샷

    새 파이프라인을 만드는 이름을 보여주는 스크린샷.

2단계: 마지막 워터마크에 대한 조회 작업 추가

이 단계에서는 조회 작업을 만들어 마지막 워터마크 값을 가져옵니다. 앞에서 설정한 기본값 1/1/2010 12:00:00 AM 을 가져옵니다.

  1. 파이프라인 작업을 선택하고 드롭다운 목록에서 조회를 선택합니다.

  2. 일반 탭에서 이 작업의 이름을 LookupOldWaterMarkActivity로 바꿉니다.

  3. 설정 탭에서 다음을 구성합니다.

    • 연결: 웨어하우스 아래에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택합니다.
    • 쿼리 사용: 테이블을 선택합니다.
    • 테이블: dbo.watermarktable을 선택합니다.
    • 첫 번째 행만 해당: 선택되었습니다.

    이전 워터마크를 조회하는 스크린샷.

3단계: 새 워터마크에 대한 조회 작업 추가

이 단계에서는 새 워터마크 값을 가져오는 조회 작업을 만듭니다. 쿼리를 사용하여 원본 데이터 테이블에서 새 워터마크를 가져옵니다. data_source_table에서 LastModifytime 열의 가장 높은 값을 가져옵니다.

  1. 위쪽 막대의 작업 탭에서 조회를 선택하여 두 번째 조회 작업을 추가합니다.

  2. 일반 탭에서 이 작업의 이름을 LookupNewWaterMarkActivity로 바꿉니다.

  3. 설정 탭에서 다음을 구성합니다.

    • 연결: 웨어하우스 에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택하거나 패브릭 항목 연결에서 데이터 웨어하우스를 선택합니다.

    • 쿼리 사용: 쿼리를 선택합니다.

    • 쿼리: 다음 쿼리를 입력하여 마지막으로 수정한 최대 시간을 새 워터마크로 선택합니다.

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • 첫 번째 행만 해당: 선택되었습니다.

    새 워터마크를 조회하는 스크린샷.

4단계: 증분 데이터를 복사하는 복사 작업 추가

이 단계에서는 데이터 웨어하우스에서 Lakehouse로 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하는 복사 작업을 추가합니다.

  1. 위쪽 표시줄에서 활동을 선택하고 데이터 복사 ->캔버스에 추가를 선택하여 복사 작업을 가져옵니다.

  2. 일반 탭에서 이 작업의 이름을 IncrementalCopyActivity로 바꿉니다.

  3. 조회 활동에 연결된 '성공 시' 라고 표시된 녹색 단추를 복사 작업으로 드래그하여 두 조회 활동을 복사 작업에 연결합니다. 복사 활동의 테두리 색이 녹색으로 변경되면 마우스 단추를 놓습니다.

    조회 및 복사 작업 연결을 보여 주는 스크린샷.

  4. 원본 탭에서 다음을 구성합니다.

    • 연결: 웨어하우스 에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택하거나 패브릭 항목 연결에서 데이터 웨어하우스를 선택합니다.

    • 창고: 창고를 선택합니다.

    • 쿼리 사용: 쿼리를 선택합니다.

    • 쿼리: 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하려면 다음 쿼리를 입력합니다.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    데이터 원본 구성을 보여 주는 스크린샷.

  5. 대상 탭에서 다음을 구성합니다.

    • 연결: Lakehouse 에서 모두 찾아보기를 선택하고 목록에서 레이크하우스를 선택하거나 패브릭 항목 연결에서 레이크하우스를 선택합니다.
    • Lakehouse: Lakehouse를 선택하세요.
    • 루트 폴더: 파일을 선택합니다.
    • 파일 경로: 복사한 데이터를 저장할 폴더를 선택합니다. 폴더를 선택하려면 찾아보기를 선택하세요. 파일 이름의 경우 동적 콘텐츠 추가를 열고 @CONCAT('Incremental-', pipeline().RunId, '.txt')을(를) 열린 창에 입력하여 Lakehouse에서 복사한 데이터 파일의 파일 이름을 만듭니다.
    • 파일 형식: 데이터의 형식 유형을 선택합니다.

    복사 대상 구성을 보여주는 스크린샷.

5단계: 저장 프로시저 작업 추가

이 단계에서는 저장 프로시저 작업을 추가하여 다음 파이프라인 실행에 대한 마지막 워터마크 값을 업데이트합니다.

  1. 상단 표시줄에서 활동을 선택하고 저장 프로시저를 선택하여 저장 프로시저 작업을 추가합니다.

  2. 일반 탭에서 이 작업의 이름을 StoredProceduretoWriteWatermarkActivity로 바꿉니다.

  3. 복사 작업의 녹색(성공 시) 출력을 저장 프로시저 작업에 연결합니다.

  4. 설정 탭에서 다음을 구성합니다.

    • Data Warehouse: Data Warehouse를 선택합니다.

    • 저장 프로시저 이름: 데이터 웨어하우스에서 만든 저장 프로시저를 선택합니다. [dbo].[ usp_write_watermark].

    • 저장 프로시저 매개 변수를 확장합니다. 저장 프로시저 매개 변수에 대한 값을 설정하려면 가져오기를 선택하고 매개 변수에 대해 다음 값을 입력합니다.

      이름 타입
      최종 수정 시간 날짜시간 @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      테이블 이름 문자열 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    저장 프로시저 활동 구성을 보여 주는 스크린샷.

6단계: 파이프라인 실행 및 결과 모니터링

위쪽 막대의 탭에서 실행을 선택합니다. 그런 다음 저장 및 실행을 선택합니다. 파이프라인 실행이 시작되고 출력 탭에서 모니터링할 수 있습니다.

파이프라인 실행 결과를 보여 주는 스크린샷.

Lakehouse로 이동하면 데이터 파일이 선택한 폴더 아래에 있음을 알 수 있습니다. 파일을 선택하여 복사한 데이터를 미리 볼 수 있습니다.

첫 번째 파이프라인 실행에 대한 레이크하우스 데이터를 보여주는 스크린샷.

첫 번째 파이프라인 실행에 대한 Lakehouse 데이터 미리 보기를 보여 주는 스크린샷.

더 많은 데이터를 추가하여 증분 복사 결과 확인하기

첫 번째 파이프라인 실행을 완료한 후 데이터 웨어하우스 원본 테이블에 더 많은 데이터를 추가하여 이 파이프라인이 증분 데이터를 복사할 수 있는지 알아보겠습니다.

1단계: 원본에 더 많은 데이터 추가

다음 쿼리를 실행하여 Data Warehouse에 새 데이터를 삽입합니다.

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table용 업데이트된 데이터는 다음과 같습니다.

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

2단계: 다른 파이프라인 실행 트리거 및 결과 모니터링

파이프라인 페이지로 돌아갑니다. 위쪽 막대에서 탭에서 실행을 다시 선택합니다. 파이프라인 실행이 시작되고 출력에서 모니터링할 수 있습니다.

Lakehouse로 이동하면 새로 복사한 데이터 파일이 선택한 폴더 아래에 있음을 알 수 있습니다. 파일을 선택하여 복사한 데이터를 미리 볼 수 있습니다. 증분 데이터가 이 파일에 표시됩니다.

두 번째 파이프라인 실행에 대한 Lakehouse 데이터를 보여 주는 스크린샷.

두 번째 파이프라인 실행에 대한 Lakehouse 데이터 미리 보기를 보여 주는 스크린샷.

다음으로, Azure Blob Storage에서 Lakehouse로 복사하는 방법에 대해 자세히 알아봅니다.