이 자습서에서는 데이터 웨어하우스에서 Lakehouse로 새 데이터 또는 변경된 데이터만 복사하는 방법을 알아봅니다. 이 방법은 증분 로드라고 하며, 매번 모든 항목을 복사하지 않고 데이터를 up-to-date로 유지하려는 경우에 유용합니다.
솔루션의 개략적인 디자인은 다음과 같습니다.
워터마크 열을 선택합니다. 원본 테이블에서 새 레코드 또는 변경된 레코드를 추적하는 데 도움이 되는 열을 하나 선택합니다. 이 열에는 일반적으로 행을 추가하거나 업데이트할 때 증가하는 값(예: 타임스탬프 또는 ID)이 포함됩니다. 이 열에서 가장 높은 값을 "워터마크"로 사용하여 중단한 위치를 확인합니다.
마지막 워터마크 값을 저장하도록 테이블을 설정합니다.
다음을 수행하는 파이프라인을 빌드합니다.
파이프라인에는 다음 작업이 포함됩니다.
- 두 가지 조회 작업입니다. 처음 항목은 마지막 워터마크 값(우리가 마지막으로 중지한 지점)을 가져옵니다. 두 번째 것은 새 워터마크 값을 가져옵니다(이번에 중지할 위치입니다). 두 값 모두 복사 작업으로 전달됩니다.
- 워터마크 열 값이 이전 워터마크와 새 워터마크 사이에 있는 행을 찾는 복사 작업입니다. 그런 다음 데이터 웨어하우스에서 Lakehouse로 이 데이터를 새 파일로 복사합니다.
- 다음 파이프라인 실행에서 시작할 위치를 알 수 있도록 새 워터마크 값을 저장하는 저장 프로시저 작업입니다.
필수 조건
- 데이터 웨어하우스. 데이터 웨어하우스를 원본 데이터 저장소로 사용합니다. 없는 경우 지침은 데이터 웨어하우스 만들기를 확인하세요.
-
Lakehouse. Lakehouse를 대상 데이터 저장소로 사용합니다. 없는 경우 지침은 Lakehouse 만들기를 참조하세요.
- IncrementalCopy라는 폴더를 만들어 복사한 데이터를 저장합니다.
원본 준비
증분 복사 파이프라인을 구성하기 전에 데이터 웨어하우스에 필요한 테이블 및 저장 프로시저를 설정해 보겠습니다.
1. Data Warehouse에 데이터 원본 테이블 만들기
Data Warehouse에서 다음 SQL 명령을 실행하여 원본 테이블로 data_source_table 테이블을 만듭니다. 증분 복사본에 대한 샘플 데이터로 사용합니다.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
원본 테이블의 데이터는 다음과 같습니다.
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
이 자습서에서는 LastModifytime 을 워터마크 열로 사용합니다.
2. Data Warehouse에 마지막 워터마크 값을 저장할 또 다른 테이블 만들기
Data Warehouse에서 다음 SQL 명령을 실행하여 마지막 워터마크 값을 저장할 워터마크테이블이라는 테이블을 만듭니다.
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );원본 테이블 이름으로 마지막 워터마크의 기본값을 설정합니다. 이 자습서에서는 테이블 이름이 data_source_table 기본값
1/1/2010 12:00:00 AM을 .로 설정합니다.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')watermarktable의 데이터를 확인합니다.
Select * from watermarktable출력:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Data Warehouse에 저장 프로시저 만들기
다음 명령을 실행하여 Data Warehouse에 저장 프로시저를 만듭니다. 이 저장 프로시저는 각 파이프라인 실행 후 마지막 워터마크 값을 업데이트합니다.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
증분 복사를 위한 파이프라인 구성
1단계: 파이프라인 만들기
Power BI로 이동합니다.
화면 왼쪽 아래에서 Power BI 아이콘을 선택한 다음 패브릭을 선택합니다.
내 작업 영역을 선택하여 패브릭 작업 영역을 엽니다.
+ 새 항목을 선택한 다음 파이프라인을 선택한 다음 파이프라인 이름을 입력하여 새 파이프라인을 만듭니다.
2단계: 마지막 워터마크에 대한 조회 작업 추가
이 단계에서는 조회 작업을 만들어 마지막 워터마크 값을 가져옵니다. 앞에서 설정한 기본값 1/1/2010 12:00:00 AM 을 가져옵니다.
파이프라인 작업을 선택하고 드롭다운 목록에서 조회를 선택합니다.
일반 탭에서 이 작업의 이름을 LookupOldWaterMarkActivity로 바꿉니다.
설정 탭에서 다음을 구성합니다.
- 연결: 웨어하우스 아래에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택합니다.
- 쿼리 사용: 테이블을 선택합니다.
- 테이블: dbo.watermarktable을 선택합니다.
- 첫 번째 행만 해당: 선택되었습니다.
3단계: 새 워터마크에 대한 조회 작업 추가
이 단계에서는 새 워터마크 값을 가져오는 조회 작업을 만듭니다. 쿼리를 사용하여 원본 데이터 테이블에서 새 워터마크를 가져옵니다. data_source_table에서 LastModifytime 열의 가장 높은 값을 가져옵니다.
위쪽 막대의 작업 탭에서 조회를 선택하여 두 번째 조회 작업을 추가합니다.
일반 탭에서 이 작업의 이름을 LookupNewWaterMarkActivity로 바꿉니다.
설정 탭에서 다음을 구성합니다.
연결: 웨어하우스 에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택하거나 패브릭 항목 연결에서 데이터 웨어하우스를 선택합니다.
쿼리 사용: 쿼리를 선택합니다.
쿼리: 다음 쿼리를 입력하여 마지막으로 수정한 최대 시간을 새 워터마크로 선택합니다.
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table첫 번째 행만 해당: 선택되었습니다.
4단계: 증분 데이터를 복사하는 복사 작업 추가
이 단계에서는 데이터 웨어하우스에서 Lakehouse로 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하는 복사 작업을 추가합니다.
위쪽 표시줄에서 활동을 선택하고 데이터 복사 ->캔버스에 추가를 선택하여 복사 작업을 가져옵니다.
일반 탭에서 이 작업의 이름을 IncrementalCopyActivity로 바꿉니다.
조회 활동에 연결된 '성공 시' 라고 표시된 녹색 단추를 복사 작업으로 드래그하여 두 조회 활동을 복사 작업에 연결합니다. 복사 활동의 테두리 색이 녹색으로 변경되면 마우스 단추를 놓습니다.
원본 탭에서 다음을 구성합니다.
연결: 웨어하우스 에서 모두 찾아보기를 선택하고 목록에서 데이터 웨어하우스를 선택하거나 패브릭 항목 연결에서 데이터 웨어하우스를 선택합니다.
창고: 창고를 선택합니다.
쿼리 사용: 쿼리를 선택합니다.
쿼리: 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하려면 다음 쿼리를 입력합니다.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
대상 탭에서 다음을 구성합니다.
- 연결: Lakehouse 에서 모두 찾아보기를 선택하고 목록에서 레이크하우스를 선택하거나 패브릭 항목 연결에서 레이크하우스를 선택합니다.
- Lakehouse: Lakehouse를 선택하세요.
- 루트 폴더: 파일을 선택합니다.
-
파일 경로: 복사한 데이터를 저장할 폴더를 선택합니다. 폴더를 선택하려면 찾아보기를 선택하세요. 파일 이름의 경우 동적 콘텐츠 추가를 열고
@CONCAT('Incremental-', pipeline().RunId, '.txt')을(를) 열린 창에 입력하여 Lakehouse에서 복사한 데이터 파일의 파일 이름을 만듭니다. - 파일 형식: 데이터의 형식 유형을 선택합니다.
5단계: 저장 프로시저 작업 추가
이 단계에서는 저장 프로시저 작업을 추가하여 다음 파이프라인 실행에 대한 마지막 워터마크 값을 업데이트합니다.
상단 표시줄에서 활동을 선택하고 저장 프로시저를 선택하여 저장 프로시저 작업을 추가합니다.
일반 탭에서 이 작업의 이름을 StoredProceduretoWriteWatermarkActivity로 바꿉니다.
복사 작업의 녹색(성공 시) 출력을 저장 프로시저 작업에 연결합니다.
설정 탭에서 다음을 구성합니다.
Data Warehouse: Data Warehouse를 선택합니다.
저장 프로시저 이름: 데이터 웨어하우스에서 만든 저장 프로시저를 선택합니다. [dbo].[ usp_write_watermark].
저장 프로시저 매개 변수를 확장합니다. 저장 프로시저 매개 변수에 대한 값을 설정하려면 가져오기를 선택하고 매개 변수에 대해 다음 값을 입력합니다.
이름 타입 값 최종 수정 시간 날짜시간 @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} 테이블 이름 문자열 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
6단계: 파이프라인 실행 및 결과 모니터링
위쪽 막대의 홈 탭에서 실행을 선택합니다. 그런 다음 저장 및 실행을 선택합니다. 파이프라인 실행이 시작되고 출력 탭에서 모니터링할 수 있습니다.
Lakehouse로 이동하면 데이터 파일이 선택한 폴더 아래에 있음을 알 수 있습니다. 파일을 선택하여 복사한 데이터를 미리 볼 수 있습니다.
더 많은 데이터를 추가하여 증분 복사 결과 확인하기
첫 번째 파이프라인 실행을 완료한 후 데이터 웨어하우스 원본 테이블에 더 많은 데이터를 추가하여 이 파이프라인이 증분 데이터를 복사할 수 있는지 알아보겠습니다.
1단계: 원본에 더 많은 데이터 추가
다음 쿼리를 실행하여 Data Warehouse에 새 데이터를 삽입합니다.
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
data_source_table용 업데이트된 데이터는 다음과 같습니다.
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
2단계: 다른 파이프라인 실행 트리거 및 결과 모니터링
파이프라인 페이지로 돌아갑니다. 위쪽 막대에서 홈 탭에서 실행을 다시 선택합니다. 파이프라인 실행이 시작되고 출력에서 모니터링할 수 있습니다.
Lakehouse로 이동하면 새로 복사한 데이터 파일이 선택한 폴더 아래에 있음을 알 수 있습니다. 파일을 선택하여 복사한 데이터를 미리 볼 수 있습니다. 증분 데이터가 이 파일에 표시됩니다.
관련 콘텐츠
다음으로, Azure Blob Storage에서 Lakehouse로 복사하는 방법에 대해 자세히 알아봅니다.