Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
A API REST do Microsoft Fabric fornece um endpoint de serviço para operações CRUD de itens do Fabric. Neste tutorial, percorremos um cenário de ponta a ponta de como criar e atualizar um artefato de definição de trabalho do Spark. Trata-se de três etapas de alto nível:
- Crie um item de Definição de Tarefa Spark com algum estado inicial.
- Carregar o ficheiro principal de definição e outros ficheiros de biblioteca.
- Atualize o item de Definição de Trabalho Spark com a URL OneLake do ficheiro principal de definição e outros ficheiros de bibliotecas.
Pré-requisitos
- É necessário um token Microsoft Entra para aceder à API REST do Fabric. A biblioteca MSAL é recomendada para obter o token. Para obter mais informações, consulte Suporte ao fluxo de autenticação no MSAL.
- Um token de armazenamento é necessário para acessar a API do OneLake. Para obter mais informações, consulte MSAL para Python.
Criar um item de Definição de Trabalho do Spark com o estado inicial
A API REST do Microsoft Fabric define um endpoint unificado para operações CRUD dos itens do Fabric. O ponto final é https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.
Os detalhes do item são especificados no corpo da requisição. Aqui está um exemplo do corpo da solicitação para criar um item de definição de trabalho do Spark:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Neste exemplo, o item de Definição de Trabalho Spark é denominado SJDHelloWorld. O payload campo é o conteúdo codificado base64 da configuração detalhada. Após a decodificação, o conteúdo é:
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
Aqui estão duas funções auxiliares para codificar e decodificar a configuração detalhada:
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Aqui está o trecho de código para criar um item de definição de trabalho do Spark:
import requests
bearerToken = "<REDACTED>" # Replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "<REDACTED>"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
Carregue o arquivo de definição principal e outros arquivos lib
Um token de armazenamento é necessário para carregar o arquivo no OneLake. Aqui está uma função auxiliar para obter o token de armazenamento:
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"<REDACTED>", # This field should be the client ID
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
Agora temos um item de Definição de Trabalho Spark criado. Para o tornar executável, precisamos de configurar o ficheiro principal de definição e as propriedades necessárias. O ponto de extremidade para carregar o arquivo para este item SJD é https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Deve ser usado o mesmo "workspaceId" do passo anterior. O valor de "sjdartifactid" pode ser encontrado no corpo de resposta do passo anterior. Aqui está o trecho de código para configurar o arquivo de definição principal:
import requests
# Three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid" # Replace the ID of workspace and artifact with the right one
mainExecutableFile = "main.py" # The name of the main executable file
mainSubFolder = "Main" # The sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # The URL for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # The storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in OneLake.")
# With the previous step, the main executable file is created in OneLake. Now we need to append the content of the main executable file
appendPosition = 0
appendAction = "append"
### Main File Append.
mainExecutableFileSizeInBytes = 83 # The size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}"
mainFileContents = "<REDACTED>" # The content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83 # The size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type": "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully accepted main file '{mainExecutableFile}' append data.")
# With the previous step, the content of the main executable file is appended to the file in OneLake. Now we need to flush the file
flushAction = "flush"
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully flushed main file '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
Siga o mesmo processo para carregar os outros arquivos lib, se necessário.
Atualize o item Definição de trabalho do Spark com a URL do OneLake do arquivo de definição principal e outros arquivos lib
Até agora, criámos um item de Definição de Job Spark com um estado inicial específico e carregámos o ficheiro principal de definição e outros ficheiros de biblioteca. O último passo é atualizar o item de Definição de Job do Spark para definir as propriedades URL do ficheiro principal de definição e de outros ficheiros de lib. O ponto de extremidade para atualizar o item Definição de Trabalho do Spark é https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. O mesmo "workspaceId" e "sjdartifactid" das etapas anteriores devem ser usados. Aqui está o trecho de código para atualizar o item Definição de trabalho do Spark:
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # The workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # The workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = '<REDACTED>' # Replace this with the real default lakehouse ID
updateRequestBodyJson = {
"executableFile": mainAbfssPath,
"defaultLakehouseArtifactId": defaultLakehouseId,
"mainClass": "",
"additionalLakehouseIds": [],
"retryPolicy": None,
"commandLineArguments": "",
"additionalLibraryUris": [libsAbfssPath],
"language": "Python",
"environmentArtifactId": None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "<REDACTED>" # Replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
Para recapitular todo o processo, a API REST da malha e a API do OneLake são necessárias para criar e atualizar um item de definição de trabalho do Spark. A API REST do Fabric é usada para criar e atualizar o item de Definição de Trabalho do Spark. A API OneLake é usada para carregar o ficheiro principal de definição e outros ficheiros de lib. O arquivo de definição principal e outros arquivos lib são carregados para OneLake primeiro. Em seguida, as propriedades de URL do arquivo de definição principal e outros arquivos lib são definidas no item Definição de trabalho do Spark.