다음을 통해 공유


워크플로 오케스트레이션 매니저를 사용하여 기존 파이프라인 실행

적용 대상: Azure Data Factory Azure Synapse Analytics

기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!

중요합니다

2026년 1월 1일부터는 ADF의 워크플로우 오케스트레이션 매니저를 사용해 새로운 에어플로우 인스턴스를 생성할 수 없습니다. 2025년 12월 31일 이전에 모든 워크플로우 오케스트레이션 매니저(Azure Data Factory의 Apache Airflow) 워크로드를 Microsoft Fabric의 Apache Airflow 작업 으로 이전할 것을 권장합니다.

자세한 내용 또는 Microsoft Fabric의 Apache Airflow로 마이그레이션하는 동안 지원을 보려면 Microsoft 지원에 문의하세요.

Data Factory 파이프라인은 확장 가능하고 안정적인 데이터 통합/데이터 흐름을 제공하는 100개 이상의 데이터 원본 커넥터를 제공합니다. Apache Airflow DAG에서 기존 데이터 팩터리 파이프라인을 실행하려는 시나리오가 있습니다. 이 자습서에서는 방법을 보여줍니다.

필수 조건

단계

  1. 아래 내용을 사용하여 새 Python 파일 adf.py를 만듭니다.

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    워크플로 오케스트레이션 관리자 UI 관리자 - 연결 -> '+' ->> '연결 유형'을 'Azure Data Factory'로 선택한 다음, client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_namepipeline_name 입력해야 합니다.

  2. DAGS라는 폴더 내에서 Blob Storage에 adf.py 파일을 업로드합니다.

  3. DAGS 폴더를 워크플로 오케스트레이션 관리자 환경으로 가져옵니다. 계정이 없는 경우 새 계정을 만듭니다.

    Airflow 섹션이 선택된 데이터 팩터리 관리 탭을 보여 주는 스크린샷