共用方式為


使用 Livy API 提交和執行 Livy 批次作業

適用於:✅Microsoft Fabric 中的資料工程和資料科學

瞭解如何使用適用於網狀架構數據工程的 Livy API 提交 Spark 批次作業。 Livy API 目前不支援 Azure 服務主體 (SPN)。

必要條件

Livy API 會定義作業的統一端點。 遵循本文範例時,請將佔位符 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID} 和 {Fabric_LakehouseID} 替換為您合適的值。

設定 Visual Studio Code 以支援 Livy API 批次處理

  1. 在 Fabric Lakehouse 中選取 [Lakehouse 設定 ]。

    顯示 Lakehouse 設定的螢幕快照。

  2. 移至 Livy 端點 區段。

    顯示 Lakehouse Livy 端點和會話工作作業連接字串的螢幕快照。

  3. 將 Batch 作業 連接字串 (映像中的第二個紅色方塊) 複製到您的程式代碼。

  4. 流覽至 Microsoft Entra 系統管理中心 ,並將應用程式 (用戶端) 識別碼和目錄 (租使用者) 識別碼複製到您的程式碼。

    顯示 Microsoft Entra 系統管理中心中 Livy API 應用程式概觀的螢幕快照。

建立 Spark 批次程式碼並上傳至您的 Lakehouse

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. 將 Python 檔案儲存在本機。 Python 程式碼載荷包含兩個 Spark 語句,可在 Lakehouse 中處理資料,需要上傳至您的 Lakehouse。 您需要使用 ABFS 路徑來在 Visual Studio Code 的 Livy API 批次作業中引用,並在 Select SQL 陳述式中指定 Lakehouse 資料表名稱。

    顯示 Python 承載儲存格的螢幕快照。

  3. 將 Python 貨載上傳至 Lakehouse 的檔案部分。 在 Lakehouse 瀏覽器中,選取[檔案]。 然後選取 [ >取得資料>] [上傳檔案]。 透過檔案選擇器選取檔案。

    螢幕快照顯示 Lakehouse 檔案區段中的承載。

  4. 當檔案已位於 Lakehouse 的 [檔案] 區段後,按一下您的檔案名稱右邊的三個點,然後選取 [屬性]。

    顯示 Lakehouse 中檔案屬性中承載 ABFS 路徑的螢幕快照。

  5. 將此 ABFS 路徑複製到步驟 1 中的 Notebook 單元格。

使用 Microsoft Entra 使用者權杖或 Microsoft Entra SPN 權杖來驗證 Livy API Spark 批次工作階段

使用 Microsoft Entra SPN 權杖來驗證 Livy API Spark 批次工作程序

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼。

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. 執行筆記本儲存格,您應該會看到返回的 Microsoft Entra 憑證。

    螢幕擷取畫面顯示執行儲存格之後傳回的 Microsoft Entra SPN 權杖。

使用 Microsoft Entra 使用者權杖驗證 Livy API Spark 會話

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼。

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Microsoft Fabric API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",        # Read lakehouse metadata
        "https://api.fabric.microsoft.com/Item.ReadWrite.All",        # Read/write fabric items
        "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",   # Access workspace operations
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",    # Access storage from code
        "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
        "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
        "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
        "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
    ]
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. 執行筆記本數據格,瀏覽器中應該會出現彈出視窗,讓您選擇要用來登入的身分識別。

    顯示登入畫面至 Microsoft Entra 應用程式的螢幕快照。

  3. 選擇要登入時使用的身分識別之後,您必須核准 Microsoft Entra 應用程式註冊 API 許可權。

    顯示Microsoft Entra 應用程式 API 許可權的螢幕快照。

  4. 完成驗證之後關閉瀏覽器視窗。

    顯示驗證完成的螢幕快照。

  5. 在 Visual Studio Code 中,您應該會看到傳回 Microsoft Entra 令牌。

    此螢幕快照顯示執行儲存格和登入之後傳回Microsoft Entra 令牌。

提交 Livy 批次並監視批次作業。

  1. 新增另一個筆記本數據格,並插入此程序代碼。

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. 執行筆記本單元格,您應該會看到幾行列印出來,表示 Livy Batch 作業已建立並執行。

    顯示成功提交 Livy Batch 作業之後 Visual Studio Code 中結果的螢幕快照。

  3. 若要查看變更,請流覽回您的 Lakehouse。

與網狀架構環境整合

根據預設,此 Livy API 會話會針對工作區的預設起始池執行。 或者,您可以使用 Fabric 環境 建立、設定及使用 Microsoft Fabric 中的環境,以自訂 Livy API 會話所使用的 Spark 作業的 Spark 集區。 若要使用您的 Fabric 環境,請使用此一行變更來更新先前的筆記本儲存格。

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

在監控集中控制台檢視您的職務

您可以選取左側導覽連結中的 [監視],來存取監視中樞以檢視各種 Apache Spark 活動。

  1. 當批次作業完成狀態時,您可以流覽至 [監視] 來檢視會話狀態。

    顯示監視中樞中先前 Livy API 提交的螢幕快照。

  2. 選取並開啟最新的活動名稱。

    顯示監視中樞內最新 Livy API 活動的螢幕快照。

  3. 在此 Livy API 工作階段案例中,您可以看到先前的批次提交、執行詳細數據、Spark 版本和設定。 請注意右上方的已停止狀態。

    顯示監視中樞內最新 Livy API 活動詳細數據的螢幕快照。

若要回顧整個程式,您需要遠端用戶端,例如 Visual Studio Code、Microsoft Entra 應用程式令牌、Livy API 端點 URL、對 Lakehouse 的 Spark 驗證、Lakehouse 中的 Spark 承載,以及最後一個批次 Livy API 會話。