次の方法で共有


Microsoft Fabric REST API を使用して Spark ジョブ定義を作成および更新する方法

Microsoft Fabric REST API は、Fabric 項目の CRUD 操作用のサービス エンドポイントを提供します。 このチュートリアルでは、Spark ジョブ定義成果物を作成および更新する方法のエンド ツー エンドのシナリオの手順について説明します。 3 つの大まかなステップが含まれます。

  1. 何らかの初期状態の Spark ジョブ定義項目を作成します。
  2. メイン定義ファイルとその他の lib ファイルをアップロードします。
  3. Spark ジョブ定義項目を、メイン定義ファイルとその他の lib ファイルの OneLake URL で更新します。

前提条件

  • Fabric REST API にアクセスするには、Microsoft Entra トークンが必要です。 トークンを取得するには、MSAL ライブラリをお勧めします。 詳細については「MSAL での認証フローのサポート」を参照してください。
  • OneLake API にアクセスするには、ストレージ トークンが必要です。 詳細については、Python 用 MSAL に関するページを参照してください。

初期状態で Spark ジョブ定義項目を作成する

Microsoft Fabric REST API は、Fabric 項目の CRUD 操作用の統合エンドポイントを定義します。 エンドポイントが https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items です。

アイテムの詳細は、要求本文内で指定されます。 Spark ジョブ定義項目を作成するための要求本文の例を次に示します。

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": "<REDACTED>",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

この例では、Spark ジョブ定義項目の名前は SJDHelloWorld です。 payload フィールドは、詳細なセットアップの base64 でエンコードされたコンテンツです。 デコード後、コンテンツは次のようになります。

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

詳細設定のエンコードとデコードのために、次の 2 つのヘルパー関数があります。

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Spark ジョブ定義項目を作成するコード スニペットを次に示します。

import requests

bearerToken = "<REDACTED>"  # Replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "<REDACTED>"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

メイン定義ファイルと他の lib ファイルをアップロードする

OneLake にファイルをアップロードするには、ストレージ トークンが必要です。 ストレージ トークンを取得するヘルパー関数を次に示します。

import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "<REDACTED>",  # This field should be the client ID 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

これで、Spark ジョブ定義項目が作成されました。 実行可能にするには、メイン定義ファイルと必要なプロパティを設定する必要があります。 この SJD 項目のファイルをアップロードするためのエンドポイントは https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid} です。 前の手順と同じ "workspaceId" を使用する必要があります。 "sjdartifactid" の値は、前の手順の応答本文にあります。 メイン定義ファイルを設定するコード スニペットを次に示します。

import requests

# Three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"  # Replace the ID of workspace and artifact with the right one
mainExecutableFile = "main.py"  # The name of the main executable file
mainSubFolder = "Main"  # The sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file"  # The URL for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",  # The storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in OneLake.")

# With the previous step, the main executable file is created in OneLake. Now we need to append the content of the main executable file

appendPosition = 0
appendAction = "append"

### Main File Append.
mainExecutableFileSizeInBytes = 83  # The size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}"
mainFileContents = "<REDACTED>"  # The content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83  # The size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type": "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully accepted main file '{mainExecutableFile}' append data.")

# With the previous step, the content of the main executable file is appended to the file in OneLake. Now we need to flush the file

flushAction = "flush"

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully flushed main file '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

必要な場合は、同じプロセスで他の lib ファイルをアップロードします。

メイン定義ファイルと他の lib ファイルの OneLake URL を使用して Spark ジョブ定義項目を更新する

これまでは、何らかの初期状態の Spark ジョブ定義項目を作成し、メイン定義ファイルとその他の lib ファイルをアップロードしてきました。 最後の手順では、Spark ジョブ定義項目を更新して、メイン定義ファイルとその他の lib ファイルの URL プロパティを設定します。 Spark ジョブ定義項目を更新するためのエンドポイントは https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} です。 前のステップと同じ "workspaceId" と "sjdartifactid" を使う必要があります。 Spark ジョブ定義項目を更新するコード スニペットを次に示します。

mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}"  # The workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # The workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = '<REDACTED>'  # Replace this with the real default lakehouse ID

updateRequestBodyJson = {
    "executableFile": mainAbfssPath,
    "defaultLakehouseArtifactId": defaultLakehouseId,
    "mainClass": "",
    "additionalLakehouseIds": [],
    "retryPolicy": None,
    "commandLineArguments": "",
    "additionalLibraryUris": [libsAbfssPath],
    "language": "Python",
    "environmentArtifactId": None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "<REDACTED>"  # Replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

プロセス全体をまとめると、Spark ジョブ定義項目を作成および更新するには、Fabric REST API と OneLake API の両方が必要です。 Fabric REST API は、Spark ジョブ定義項目の作成と更新に使用されます。 OneLake API は、メイン定義ファイルとその他の lib ファイルをアップロードするために使用されます。 メイン定義ファイルと他の lib ファイルを最初に OneLake にアップロードします。 次に、メイン定義ファイルと他の lib ファイルの URL プロパティを Spark ジョブ定義項目に設定します。