APPLIES TO: ✅ Data Engineering and Data Science in Microsoft Fabric
Learn how to submit Spark batch jobs using the Livy API for Fabric Data Engineering. You can authenticate with either a Microsoft Entra user token or a Microsoft Entra service principal (SPN) token, as shown later in this article.
Prerequisites
A remote client such as Visual Studio Code with Jupyter Notebooks, PySpark, and the Microsoft Authentication Library (MSAL) for Python.
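As a minimal setup sketch (assuming the standard PyPI package names for these libraries), the client-side dependencies can be installed with pip:

```bash
pip install msal pyspark requests jupyter
```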
A Microsoft Entra app token is required to access the Fabric REST API. Register an application with the Microsoft identity platform.
Some data in your lakehouse. This example uses green_tripdata_2022_08, a parquet file from the NYC Taxi & Limousine Commission, loaded into the lakehouse.
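If you still need to turn the uploaded parquet file into a lakehouse table that the batch payload can query, a minimal sketch run from a Fabric notebook might look like this (the file name and Files location are assumptions; adjust them to match your upload):

```python
# Hypothetical one-time load: convert the uploaded parquet file in Files
# into a Delta table; `spark` is predefined in Fabric notebooks.
df = spark.read.parquet("Files/green_tripdata_2022_08.parquet")  # assumed upload path
df.write.mode("overwrite").format("delta").saveAsTable("green_tripdata_2022")
```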
The Livy API defines a unified endpoint for operations. Replace the placeholders {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, and {Fabric_LakehouseID} with your own values as you follow the examples in this article.
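For orientation, the batch endpoint used later in this article follows this pattern, with the same placeholders:

```python
# Livy API batch endpoint pattern used in the cells below
livy_base_url = (
    "https://api.fabric.microsoft.com/v1"
    "/workspaces/{Fabric_WorkspaceID}"
    "/lakehouses/{Fabric_LakehouseID}"
    "/livyApi/versions/2023-12-01/batches"
)
```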
Configure Visual Studio Code for Livy API batch jobs
In your Fabric Lakehouse, select **Lakehouse settings**.
Go to the **Livy endpoint** section.
Copy the **Batch job** connection string (the second red box in the image) into your code.
Go to the Microsoft Entra admin center and copy both the Application (client) ID and the Directory (tenant) ID into your code.
Create a Spark payload and upload it to your Lakehouse
Create an `.ipynb` notebook in Visual Studio Code and insert the following code:

```python
import sys
import os

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import col

if __name__ == "__main__":

    # Spark session builder
    spark_session = (SparkSession
        .builder
        .appName("batch_demo")
        .getOrCreate())

    spark_context = spark_session.sparkContext
    spark_context.setLogLevel("DEBUG")

    # Read the target table name passed in through Spark configuration
    tableName = spark_context.getConf().get("spark.targetTable")
    if tableName is not None:
        print("tableName: " + str(tableName))
    else:
        print("tableName is None")

    # Keep only trips with a positive total and derive a transaction year column
    df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 WHERE total_amount > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn(
        "transaction_year", col("lpep_pickup_datetime").substr(1, 4))

    # Write the cleaned data to a Delta table in the lakehouse
    deltaTablePath = f"Tables/{tableName}CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
```

Save the Python file locally. This Python code payload contains two Spark statements that work on data in a Lakehouse, and it needs to be uploaded to your Lakehouse. You'll need the ABFS path of the payload to reference in your Livy API batch job in Visual Studio Code, and your Lakehouse table name in the SELECT SQL statement.
Upload the Python payload to the **Files** section of your Lakehouse. In the Lakehouse explorer, select **Files**, select **Get data** > **Upload files**, and then pick the file in the file picker.
Once the file is in the **Files** section of your Lakehouse, select the three dots to the right of your payload filename and select **Properties**.
Copy this ABFS path into your notebook cell from step 1.
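The copied value is a OneLake ABFS URI. As a purely illustrative sketch of its general shape (the names here are hypothetical; always copy the real value from the **Properties** pane):

```python
# Illustrative shape of a OneLake ABFS path to the uploaded payload
abfs_path = "abfss://{Fabric_WorkspaceID}@onelake.dfs.fabric.microsoft.com/{Fabric_LakehouseID}/Files/payload.py"
```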
Authenticate the Livy API Spark batch session with a Microsoft Entra user token or a Microsoft Entra SPN token
Use a Microsoft Entra SPN token to authenticate the Livy API Spark batch session
Create an `.ipynb` notebook in Visual Studio Code and insert the following code:

```python
import sys
from msal import ConfidentialClientApplication

# Configuration - Replace with your actual values
tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
client_id = "Entra_ClientID"  # Service Principal Application ID

# Certificate paths - Update these paths to your certificate files
certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"  # Public certificate file
private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"  # Private key file
certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT"  # Certificate thumbprint

# OAuth settings
audience = "https://analysis.windows.net/powerbi/api/.default"
authority = f"https://login.windows.net/{tenant_id}"

def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
    """
    Get an app-only access token for a Service Principal using the OAuth 2.0 client credentials flow.
    This function uses certificate-based authentication, which is more secure than client secrets.

    Args:
        client_id (str): The Service Principal's client ID
        audience (str): The audience for the token (resource scope)
        authority (str): The OAuth authority URL
        certificate_path (str): Path to the certificate file (.pem format)
        private_key_path (str): Path to the private key file (.pem format)
        certificate_thumbprint (str): Certificate thumbprint (optional but recommended)

    Returns:
        str: The access token for API authentication

    Raises:
        Exception: If token acquisition fails
    """
    try:
        # Read the certificate from the PEM file
        with open(certificate_path, "r", encoding="utf-8") as f:
            certificate_pem = f.read()

        # Read the private key from the PEM file
        with open(private_key_path, "r", encoding="utf-8") as f:
            private_key_pem = f.read()

        # Create the confidential client application
        app = ConfidentialClientApplication(
            client_id=client_id,
            authority=authority,
            client_credential={
                "private_key": private_key_pem,
                "thumbprint": certificate_thumbprint,
                "certificate": certificate_pem
            }
        )

        # Acquire a token using the client credentials flow
        token_response = app.acquire_token_for_client(scopes=[audience])

        if "access_token" in token_response:
            print("Successfully acquired access token")
            return token_response["access_token"]
        else:
            raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")

    except FileNotFoundError as e:
        print(f"Certificate file not found: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error retrieving token: {e}", file=sys.stderr)
        sys.exit(1)

# Get the access token
token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
```

Run the notebook cell. You should see the Microsoft Entra token returned.
Use a Microsoft Entra user token to authenticate the Livy API Spark session
Create an `.ipynb` notebook in Visual Studio Code and insert the following code:

```python
from msal import PublicClientApplication
import requests
import time

# Configuration - Replace with your actual values
tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)

# Required scopes for Microsoft Fabric API access
scopes = [
    "https://api.fabric.microsoft.com/Lakehouse.Execute.All",            # Execute operations in lakehouses
    "https://api.fabric.microsoft.com/Lakehouse.Read.All",               # Read lakehouse metadata
    "https://api.fabric.microsoft.com/Item.ReadWrite.All",               # Read/write fabric items
    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",          # Access workspace operations
    "https://api.fabric.microsoft.com/Code.AccessStorage.All",           # Access storage from code
    "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
    "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
    "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
]

def get_access_token(tenant_id, client_id, scopes):
    """
    Get an access token using interactive authentication.
    This method opens a browser window for user authentication.

    Args:
        tenant_id (str): The Microsoft Entra tenant ID
        client_id (str): The application client ID
        scopes (list): List of required permission scopes

    Returns:
        str: The access token, or None if authentication fails
    """
    app = PublicClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}"
    )

    print("Opening browser for interactive authentication...")
    token_response = app.acquire_token_interactive(scopes=scopes)

    if "access_token" in token_response:
        print("Successfully authenticated")
        return token_response["access_token"]
    else:
        print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
        return None

# Acquire the token using interactive authentication
token = get_access_token(tenant_id, client_id, scopes)
print("Access token acquired via interactive login")
```

Run the notebook cell. A browser pop-up should appear, letting you choose the identity to sign in with.
After you choose the identity to sign in with, you're asked to approve the Microsoft Entra app registration API permissions.
Close the browser window after you complete authentication.
In Visual Studio Code, you should see the Microsoft Entra token returned.
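Optionally, you can sanity-check the token before continuing by calling a simple Fabric REST endpoint such as List Workspaces; a 200 response suggests the token is usable. A minimal sketch, assuming `token` from the previous cell:

```python
import requests

# Quick token sanity check against the Fabric REST API (List Workspaces)
response = requests.get(
    "https://api.fabric.microsoft.com/v1/workspaces",
    headers={"Authorization": f"Bearer {token}"},
)
print(response.status_code)  # 200 indicates the token was accepted
```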
Submit a Livy batch job and monitor it
Add another notebook cell and insert this code:
```python
# Submit the payload as a Livy batch job
import requests
import time
import json

api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs

# Fabric resource IDs - Replace with your workspace and lakehouse IDs
workspace_id = "Fabric_WorkspaceID"
lakehouse_id = "Fabric_LakehouseID"

# Construct the Livy Batch API URL
# URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"

# Set up authentication headers
headers = {"Authorization": f"Bearer {token}"}

print(f"Livy Batch API URL: {livy_base_url}")

new_table_name = "TABLE_NAME"  # Name for the new table

# Configure the batch job
print("Configuring batch job parameters...")

# Batch job configuration - Modify these values for your use case
payload_data = {
    # Job name - will appear in the Fabric UI
    "name": f"livy_batch_demo_{new_table_name}",
    # Path to your Python file in the lakehouse
    "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    # Optional: Spark configuration parameters
    "conf": {
        "spark.targetTable": new_table_name,  # Custom configuration for your application
    },
}

print("Batch Job Configuration:")
print(json.dumps(payload_data, indent=2))

try:
    # Submit the batch job
    print("\nSubmitting batch job...")
    post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)

    if post_batch.status_code == 202:
        batch_info = post_batch.json()
        print("Livy batch job submitted successfully!")
        print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")

        # Extract the batch ID for monitoring
        batch_id = batch_info['id']
        livy_batch_get_url = f"{livy_base_url}/{batch_id}"
        print(f"\nBatch Job ID: {batch_id}")
        print(f"Monitoring URL: {livy_batch_get_url}")
    else:
        print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
        print(f"Response: {post_batch.text}")

except requests.exceptions.RequestException as e:
    print(f"Network error occurred: {e}")
except json.JSONDecodeError as e:
    print(f"JSON decode error: {e}")
    print(f"Response text: {post_batch.text}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

Run the notebook cell. You should see several lines printed indicating that the Livy batch job was created and is running.
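The cell above prints a monitoring URL but doesn't poll it. If you'd rather watch the job from the notebook, here's a minimal polling sketch that reuses `livy_batch_get_url`, `headers`, and the imports from that cell (the terminal state names assume standard Livy batch states):

```python
# Poll the batch job until it reaches a terminal Livy state
terminal_states = {"success", "dead", "killed"}  # assumed standard Livy terminal states

while True:
    get_batch = requests.get(livy_batch_get_url, headers=headers)
    get_batch.raise_for_status()
    state = get_batch.json().get("state")
    print(f"Batch state: {state}")
    if state in terminal_states:
        break
    time.sleep(10)  # wait before checking again
```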
To see the changes, navigate back to your Lakehouse.
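One way to verify the result is to query the new table from a Fabric notebook attached to the lakehouse; a sketch, assuming the same target table name that was passed as `spark.targetTable`:

```python
# Hypothetical verification query; the table name mirrors the payload's
# Tables/{tableName}CleanedTransactions output path.
new_table_name = "TABLE_NAME"  # same value used in the batch submission
spark.sql(f"SELECT COUNT(*) FROM {new_table_name}CleanedTransactions").show()
```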
Integrate with Fabric environments
By default, this Livy API session runs against the workspace's default starter pool. Alternatively, you can use Fabric environments (see Create, configure, and use an environment in Microsoft Fabric) to customize the Spark pool that the Livy API session uses for these Spark jobs. To use your Fabric environment, update the previous notebook cell with this one-line change:
```python
payload_data = {
    "name": f"livy_batch_demo_{new_table_name}",
    "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    "conf": {
        "spark.targetTable": new_table_name,
        # The one-line change: run the job in a Fabric environment.
        # Replace EnvironmentID with your environment ID; remove this line
        # to use starter pools instead of an environment.
        "spark.fabric.environmentDetails": "{\"id\": \"EnvironmentID\"}"
    }
}
```
View your jobs in the Monitoring hub
You can access the Monitoring hub to view various Apache Spark activities by selecting **Monitor** in the left-hand navigation links.
When the batch job reaches a completed state, you can view the session status by navigating to **Monitor**.
Select and open the most recent activity name.
In this Livy API session case, you can see your previous batch submission, run details, Spark version, and configuration. Notice the stopped status in the top right.
To recap the whole process, you need a remote client such as Visual Studio Code, a Microsoft Entra app token, the Livy API endpoint URL, Spark authentication against your Lakehouse, a Spark payload in your Lakehouse, and finally a batch Livy API session.