使用 Livy API 提交和执行 Livy 批处理作业

适用于:✅Microsoft Fabric 中的数据工程和数据科学

了解如何使用 Livy API for Fabric 数据工程提交 Spark 批处理作业。 Livy API 目前不支持 Azure 服务主体(SPN)。

先决条件

Livy API 定义用于操作的统一终结点。 按照本文中的示例操作时,请将占位符 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID} 和 {Fabric_LakehouseID} 替换为相应的值。

为 Livy API 批处理配置 Visual Studio Code

  1. 在 Fabric Lakehouse 中选择“Lakehouse 设置”。

    屏幕截图显示湖屋设置。

  2. 导航到“Livy 终结点”部分。

    屏幕截图显示湖屋 Livy 终结点和会话作业连接字符串。

  3. 将批处理作业连接字符串(图像中的第二个红色框)复制到代码。

  4. 导航到 Microsoft Entra 管理中心,并将应用程序(客户端)ID 和目录(租户)ID 复制到代码。

    屏幕截图显示 Microsoft Entra 管理中心中的 Livy API 应用概述。

创建 Spark Batch 代码并上传到 Lakehouse

  1. 在 Visual Studio Code 中创建 .ipynb 笔记本并插入以下代码

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. 在本地保存 Python 文件。 此 Python 代码有效负载包含两个处理湖屋中数据的 Spark 语句,需要上传到湖屋。 您需要 ABFS 路径来在 Visual Studio Code 中的 Livy API 批处理作业中引用负载,以及在 Select SQL 语句中引用 Lakehouse 表名称。

    屏幕截图显示 Python 代码单元格。

  3. 将 Python 负载上传到 Lakehouse 的“文件”部分。 在 Lakehouse 资源管理器中,选择 “文件”。 然后选择“ >获取数据>上传文件”。 通过文件选取器选择文件。

    屏幕截图显示湖屋的“文件”部分中的有效负载。

  4. 在文件位于湖屋的“文件”部分中后,请单击有效负载文件名右侧的三个点并选择“属性”。

    屏幕截图显示湖屋中文件的“属性”中的有效负载 ABFS 路径。

  5. 将此 ABFS 路径复制到步骤 1 中的笔记本单元格。

使用 Microsoft Entra 用户令牌或 Microsoft Entra SPN 令牌对 Livy API Spark 批处理会话进行身份验证

使用 Microsoft Entra SPN 令牌对 Livy API Spark 批处理会话进行身份验证

  1. 在 Visual Studio Code 中创建 .ipynb 笔记本并插入以下代码。

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. 运行笔记本单元后,您应该会看到返回的 Microsoft Entra 令牌。

    显示运行单元格后返回的 Microsoft Entra SPN 令牌的屏幕截图。

使用 Microsoft Entra 用户令牌对 Livy API Spark 会话进行身份验证

  1. 在 Visual Studio Code 中创建 .ipynb 笔记本并插入以下代码。

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Microsoft Fabric API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",        # Read lakehouse metadata
        "https://api.fabric.microsoft.com/Item.ReadWrite.All",        # Read/write fabric items
        "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",   # Access workspace operations
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",    # Access storage from code
        "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
        "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
        "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
        "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
    ]
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. 运行笔记本单元格后,浏览器中应会弹出一个窗口,允许您选择用于登录的身份。

    屏幕截图显示 Microsoft Entra 应用的登录屏幕。

  3. 选择要登录的标识后,需要批准Microsoft Entra 应用注册 API 权限。

    屏幕截图显示 Microsoft Entra 应用 API 权限。

  4. 完成身份验证后,请关闭浏览器窗口。

    屏幕截图显示身份验证完成。

  5. 在 Visual Studio Code 中,应会看到返回的 Microsoft Entra 令牌。

    屏幕截图显示运行单元格并登录后返回的 Microsoft Entra 令牌。

提交 Livy Batch 并监视批处理作业。

  1. 添加另一个笔记本单元格并插入此代码。

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. 运行笔记本单元格,应会看到在创建并运行 Livy 批处理作业时打印了几行。

    屏幕截图显示成功提交 Livy 批处理作业后 Visual Studio Code 中的结果。

  3. 若要查看更改,请导航回 Lakehouse。

与 Fabric 环境集成

默认情况下,此 Livy API 会话针对工作区的默认初学者池运行。 或者,你可以使用 Fabric 环境。在 Microsoft Fabric 中创建、配置并使用环境来自定义由 Livy API 会话用于这些 Spark 作业的 Spark 池。 若要使用 Fabric 环境,请使用此一行更改更新之前的笔记本单元。

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

在监视中心查看作业

可以通过在左侧导航链接中选择“监视”来访问监视中心,以查看各种 Apache Spark 活动。

  1. 批处理作业处于完成状态后,可以通过导航到“监视”来查看会话状态。

    屏幕截图显示监控中心中以前的 Livy API 提交记录。

  2. 选择并打开最新活动名称。

    屏幕截图显示监视中心中的最新 Livy API 活动。

  3. 在此 Livy API 会话案例中,可以看到以前的批处理提交、运行详细信息、Spark 版本和配置。 请注意右上角的已停止状态。

    屏幕截图,其中显示了监视中心的最新 Livy API 活动详细信息。

若要回顾整个过程,您需要一个远程客户端(例如 Visual Studio Code)、Microsoft Entra 应用令牌、Livy API 端点 URL、对 Lakehouse 的身份验证、Lakehouse 中的 Spark 有效负载,以及批处理 Livy API 会话。