다음을 통해 공유


Livy API를 사용하여 Livy 일괄 처리 작업 제출 및 실행

적용 대상:✅ Microsoft Fabric의 데이터 엔지니어링 및 데이터 과학

패브릭 데이터 엔지니어링용 Livy API를 사용하여 Spark 일괄 처리 작업을 제출하는 방법을 알아봅니다. Livy API는 현재 AZURE SPN(서비스 주체)을 지원하지 않습니다.

필수 조건

  • Lakehouse와 함께하는 Fabric 프리미엄 또는 평가판 용량

  • 원격 클라이언트, 예를 들어 Jupyter Notebook, PySpark 및 Python용 Microsoft 인증 라이브러리(MSAL)가 포함된 Visual Studio Code와 같은 클라이언트입니다.

  • 패브릭 Rest API에 액세스하려면 Microsoft Entra 앱 토큰이 필요합니다. Microsoft ID 플랫폼에 애플리케이션 등록

  • 이 예제에서는 NYC Taxi & 리무진 커미션의 green_tripdata_2022_08 parquet 파일을 이용하여 레이크하우스에 로드된 일부 데이터를 설명합니다.

Livy API는 작업에 대한 통합 엔드포인트를 정의합니다. 이 문서의 예제를 따를 때 자리 표시자 {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID}을(를) 적절한 값으로 바꿉니다.

Livy API Batch에 대한 Visual Studio Code 구성

  1. 패브릭 레이크하우스에서 레이크하우스 설정을 선택합니다.

    Lakehouse 설정을 보여 주는 스크린샷

  2. Livy 엔드포인트 섹션으로 이동합니다.

    Lakehouse Livy 엔드포인트 및 세션 작업 연결 문자열 보여 주는 스크린샷

  3. Batch 작업 연결 문자열(이미지의 두 번째 빨간색 상자)를 코드에 복사합니다.

  4. Microsoft Entra 관리 센터로 이동하여 애플리케이션(클라이언트) ID와 디렉터리(테넌트) ID를 모두 코드에 복사합니다.

    Microsoft Entra 관리 센터의 Livy API 앱 개요를 보여 주는 스크린샷

Spark Batch 코드 만들기 및 Lakehouse에 업로드

  1. .ipynb Visual Studio Code에서 Notebook을 만들고 다음 코드를 삽입합니다.

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Python 파일을 로컬로 저장합니다. 이 Python 코드 페이로드는 Lakehouse의 데이터에 대해 작동하고 Lakehouse에 업로드해야 하는 두 개의 Spark 문을 포함합니다. 페이로드의 ABFS 경로는 Visual Studio Code에서 Livy API 일괄 처리 작업을 참조하고, Select SQL 문의 Lakehouse 테이블 이름을 지정할 때 필요합니다.

    Python 페이로드 셀을 보여 주는 스크린샷

  3. Lakehouse의 파일 섹션에 Python 페이로드를 업로드합니다. Lakehouse 탐색기에서 파일을 선택합니다. 그런 다음 >> 가져오기를 선택합니다. 파일 선택기를 통해 파일을 선택합니다.

    Lakehouse의 파일 섹션에 페이로드를 보여 주는 스크린샷

  4. 파일이 Lakehouse의 파일 섹션에 있으면 페이로드 파일 이름 오른쪽에 있는 세 개의 점을 클릭하고 속성을 선택합니다.

    Lakehouse의 파일 속성에서 페이로드 ABFS 경로를 보여 주는 스크린샷.

  5. 1단계에서 Notebook 셀에 이 ABFS 경로를 복사합니다.

Microsoft Entra 사용자 토큰 또는 Microsoft Entra SPN 토큰을 사용하여 Livy API Spark 일괄 처리 세션 인증

Microsoft Entra SPN 토큰을 사용하여 Livy API Spark 일괄 처리 세션 인증

  1. Visual Studio Code에서 .ipynb 노트북을 만들고 다음 코드를 삽입합니다.

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. Notebook 셀을 실행하면 Microsoft Entra 토큰이 반환됩니다.

    셀을 실행한 후 반환된 Microsoft Entra SPN 토큰을 보여 주는 스크린샷

Microsoft Entra 사용자 토큰을 사용하여 Livy API Spark 세션 인증

  1. Visual Studio Code에서 .ipynb 노트북을 만들고 다음 코드를 삽입합니다.

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Microsoft Fabric API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",        # Read lakehouse metadata
        "https://api.fabric.microsoft.com/Item.ReadWrite.All",        # Read/write fabric items
        "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",   # Access workspace operations
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",    # Access storage from code
        "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
        "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
        "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
        "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
    ]
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. Notebook 셀을 실행하면 로그인할 ID를 선택할 수 있는 팝업이 브라우저에 표시됩니다.

    Microsoft Entra 앱에 대한 로그온 화면을 보여 주는 스크린샷

  3. 로그인할 ID를 선택한 후에는 Microsoft Entra 앱 등록 API 권한을 승인해야 합니다.

    Microsoft Entra 앱 API 권한을 보여 주는 스크린샷

  4. 인증을 완료한 후 브라우저 창을 닫습니다.

    인증 완료를 보여 주는 스크린샷

  5. Visual Studio Code에서 Microsoft Entra 토큰이 반환된 것을 볼 수 있습니다.

    셀을 실행하고 로그인한 후 반환된 Microsoft Entra 토큰을 보여 주는 스크린샷

Livy Batch를 제출하고 일괄 처리 작업을 모니터링합니다.

  1. 다른 Notebook 셀을 추가하고 이 코드를 삽입합니다.

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. Notebook 셀을 실행하면 Livy Batch 작업이 만들어지고 실행될 때 여러 줄이 인쇄됩니다.

    Livy Batch 작업이 성공적으로 제출된 후 Visual Studio Code의 결과를 보여 주는 스크린샷

  3. 변경 내용을 보려면 레이크하우스로 다시 이동합니다.

패브릭 환경과 통합

기본적으로 이 Livy API 세션은 작업 영역에 대한 기본 시작 풀에 대해 실행됩니다. 또는 패브릭 환경 만들기, 구성 및 Microsoft Fabric의 환경을 사용하여 Livy API 세션에서 이러한 Spark 작업에 사용하는 Spark 풀을 사용자 지정할 수 있습니다. 패브릭 환경을 사용하려면 이전 notebook 셀을 한 줄 바꾸어 업데이트하세요.

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

모니터링 허브에서 작업 보기

왼쪽 탐색 링크에서 모니터를 선택하여 모니터링 허브에 액세스하여 다양한 Apache Spark 활동을 볼 수 있습니다.

  1. 일괄 처리 작업이 완료되면 모니터로 이동하여 세션 상태를 볼 수 있습니다.

    모니터링 허브의 이전 Livy API 제출을 보여 주는 스크린샷

  2. 가장 최근 활동 이름을 선택하고 엽니다.

    모니터링 허브의 최신 Livy API 활동을 보여 주는 스크린샷

  3. 이 Livy API 세션 사례에서는 이전 일괄 처리 제출, 실행 세부 정보, Spark 버전 및 구성을 볼 수 있습니다. 오른쪽 위에 중지된 상태를 확인합니다.

    모니터링 허브의 최신 Livy API 활동 세부 정보를 보여 주는 스크린샷

전체 프로세스를 요약하려면 Visual Studio Code, Microsoft Entra 앱 토큰, Livy API 엔드포인트 URL, Lakehouse에 대한 인증, Lakehouse의 Spark 페이로드 및 마지막으로 일괄 처리 Livy API 세션과 같은 원격 클라이언트가 필요합니다.