Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die Microsoft Fabric-REST-API stellt einen Dienstendpunkt für CRUD-Vorgänge von Fabric-Elementen bereit. In diesem Tutorial wird ein End-to-End-Szenario zum Erstellen und Aktualisieren eines Spark-Auftragsdefinitionsartefakts erläutert. Es sind drei allgemeine Schritte auszuführen:
- Erstellen Sie ein Spark Job Definition-Element mit einem anfänglichen Zustand.
- Laden Sie die Hauptdefinitionsdatei und andere Lib-Dateien hoch.
- Aktualisieren Sie das Spark Job Definition-Element mit der OneLake-URL der Hauptdefinitionsdatei und anderen Lib-Dateien.
Voraussetzungen
- Für den Zugriff auf die Fabric-REST-API ist ein Microsoft Entra-Token erforderlich. Die MSAL-Bibliothek wird zum Abrufen des Token empfohlen. Weitere Informationen finden Sie unter Unterstützung des Authentifizierungsflusses in MSAL.
- Für den Zugriff auf die OneLake-API ist ein Speichertoken erforderlich. Weitere Informationen finden Sie unter MSAL für Python.
Erstellen eines Spark-Auftragsdefinitionselements mit dem Anfangszustand
Die Microsoft Fabric-REST-API definiert einen einheitlichen Endpunkt für CRUD-Vorgänge von Fabric-Elementen. Der Endpunkt ist https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.
Die Elementdetails werden im Anforderungstext angegeben. Hier ist ein Beispiel für den Anforderungstext zum Erstellen eines Spark-Auftragsdefinitionselements:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
In diesem Beispiel heißt SJDHelloWorlddas Spark Job Definition-Element . Das payload Feld ist der base64-codierte Inhalt des detaillierten Setups. Nach der Decodierung lautet der Inhalt:
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
Hier sind zwei Hilfsfunktionen zum Codieren und Decodieren des detaillierten Setups:
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Hier sehen Sie das Codeschnipsel zum Erstellen eines Spark-Auftragsdefinitionselements:
import requests
bearerToken = "<REDACTED>" # Replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "<REDACTED>"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
Hochladen der Hauptdefinitionsdatei und anderer Bibliotheksdateien
Zum Hochladen der Datei in OneLake-API ist ein Speichertoken erforderlich. Hier ist eine Hilfsfunktion zum Abrufen des Speichertokens:
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"<REDACTED>", # This field should be the client ID
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
Jetzt wurde ein Spark Job Definition-Element erstellt. Damit sie ausgeführt werden kann, müssen wir die Hauptdefinitionsdatei und die erforderlichen Eigenschaften einrichten. Der Endpunkt zum Hochladen der Datei für dieses Spark-Auftragsdefinitionselement lautet https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Die gleiche "workspaceId" aus dem vorherigen Schritt sollte verwendet werden. Der Wert von "sjdartifactid" wurde im Antworttext des vorherigen Schritts gefunden. Hier sehen Sie das Codeschnipsel zum Einrichten der Hauptdefinitionsdatei:
import requests
# Three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid" # Replace the ID of workspace and artifact with the right one
mainExecutableFile = "main.py" # The name of the main executable file
mainSubFolder = "Main" # The sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # The URL for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # The storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in OneLake.")
# With the previous step, the main executable file is created in OneLake. Now we need to append the content of the main executable file
appendPosition = 0
appendAction = "append"
### Main File Append.
mainExecutableFileSizeInBytes = 83 # The size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}"
mainFileContents = "<REDACTED>" # The content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83 # The size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type": "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully accepted main file '{mainExecutableFile}' append data.")
# With the previous step, the content of the main executable file is appended to the file in OneLake. Now we need to flush the file
flushAction = "flush"
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully flushed main file '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
Verwenden Sie das gleiche Verfahren, um die anderen Bibliotheksdateien bei Bedarf hochzuladen.
Aktualisieren des Spark-Auftragsdefinitionselements mit der OneLake-URL der Hauptdefinitionsdatei und anderen Bibliotheksdateien
Bisher haben wir ein Spark Job Definition-Element mit einem anfänglichen Zustand erstellt und die Hauptdefinitionsdatei und andere Lib-Dateien hochgeladen. Der letzte Schritt besteht darin, das Spark Job Definition-Element zu aktualisieren, um die URL-Eigenschaften der Hauptdefinitionsdatei und anderer Lib-Dateien festzulegen. Der Endpunkt zum Aktualisieren des Spark-Auftragsdefinitionselements lautet https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. Es sollten die gleichen Werte für „workspaceId“ und „sjdartifactid“ wie in den vorherigen Schritten verwendet werden. Hier sehen Sie das Codeschnipsel zum Aktualisieren des Spark-Auftragsdefinitionselements:
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # The workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # The workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = '<REDACTED>' # Replace this with the real default lakehouse ID
updateRequestBodyJson = {
"executableFile": mainAbfssPath,
"defaultLakehouseArtifactId": defaultLakehouseId,
"mainClass": "",
"additionalLakehouseIds": [],
"retryPolicy": None,
"commandLineArguments": "",
"additionalLibraryUris": [libsAbfssPath],
"language": "Python",
"environmentArtifactId": None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "<REDACTED>" # Replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
Zusammenfassend lässt sich feststellen: Es sind sowohl die Fabric-REST-API als auch die OneLake-API erforderlich, um ein Spark-Auftragsdefinitionselement zu erstellen und zu aktualisieren. Die Fabric-REST-API wird verwendet, um das Spark Job Definition-Element zu erstellen und zu aktualisieren. Die OneLake-API wird verwendet, um die Hauptdefinitionsdatei und andere Lib-Dateien hochzuladen. Die Hauptdefinitionsdatei und andere Lib-Dateien werden zuerst in OneLake hochgeladen. Anschließend werden die URL-Eigenschaften der Hauptdefinitionsdatei und anderer Lib-Dateien im Spark Job Definition-Element festgelegt.