적용 대상:
Azure Data Factory
Azure Synapse Analytics
팁
기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!
중요합니다
2026년 1월 1일부터는 ADF의 워크플로우 오케스트레이션 매니저를 사용해 새로운 에어플로우 인스턴스를 생성할 수 없습니다. 2025년 12월 31일 이전에 모든 워크플로우 오케스트레이션 매니저(Azure Data Factory의 Apache Airflow) 워크로드를 Microsoft Fabric의 Apache Airflow 작업 으로 이전할 것을 권장합니다.
자세한 내용 또는 Microsoft Fabric의 Apache Airflow로 마이그레이션하는 동안 지원을 보려면 Microsoft 지원에 문의하세요.
Data Factory 파이프라인은 확장 가능하고 안정적인 데이터 통합/데이터 흐름을 제공하는 100개 이상의 데이터 원본 커넥터를 제공합니다. Apache Airflow DAG에서 기존 데이터 팩터리 파이프라인을 실행하려는 시나리오가 있습니다. 이 자습서에서는 방법을 보여줍니다.
필수 조건
- Azure 구독. Azure 구독이 아직 없는 경우 시작하기 전에 Azure 체험 계정을 만듭니다.
- Azure 스토리지 계정. 스토리지 계정이 없는 경우 Azure Storage 계정 만들기를 참조하세요. 스토리지 계정이 선택한 네트워크에서만 액세스를 허용하는지 확인합니다.
- Azure Data Factory 파이프라인. 아직 없는 경우 자습서에 따라 새 데이터 팩터리 파이프라인을 만들거나 시작 및 첫 번째 데이터 팩터리 파이프라인 사용해 보기에서 만들 수도 있습니다.
- 서비스 주체 설정. 워크플로 오케스트레이션 관리자 환경과 파이프라인이 동일한 데이터 팩터리에 있는 경우에도 새 서비스 주체를 만들거나 기존 서비스 주체를 사용하고 파이프라인(예: 기존 파이프라인이 있는 데이터 팩터리의 기여자 역할)을 실행할 수 있는 권한을 부여해야 합니다. 서비스 주체의 클라이언트 ID 및 클라이언트 암호(API 키)를 가져와야 합니다.
단계
아래 내용을 사용하여 새 Python 파일 adf.py를 만듭니다.
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor워크플로 오케스트레이션 관리자 UI 관리자 - 연결 -> '+' ->> '연결 유형'을 'Azure Data Factory'로 선택한 다음, client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name 및 pipeline_name 입력해야 합니다.
DAGS라는 폴더 내에서 Blob Storage에 adf.py 파일을 업로드합니다.
DAGS 폴더를 워크플로 오케스트레이션 관리자 환경으로 가져옵니다. 계정이 없는 경우 새 계정을 만듭니다.