Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
A Definição de Trabalho Spark (SJD) é um tipo de item Fabric que permite aos utilizadores definir e executar trabalhos Apache Spark no Microsoft Fabric. A API de Definição de Trabalhos Spark v2 permite aos utilizadores criar e atualizar itens de Definição de Trabalhos Spark com um novo formato chamado SparkJobDefinitionV2. O principal benefício de usar o formato v2 é que permite aos utilizadores gerir o ficheiro executável principal e outros ficheiros da biblioteca com uma única chamada API; em vez de usar a API de armazenamento para carregar ficheiros separadamente, não é necessário mais token de armazenamento para gerir ficheiros.
Pré-requisitos
- É necessário um token Microsoft Entra para aceder à API REST do Fabric. Recomenda-se a biblioteca MSAL (Microsoft Authentication Library) para obter o token. Para mais informações, veja Suporte a fluxo de autenticação em MSAL.
A API REST do Microsoft Fabric define um endpoint unificado para operações CRUD dos itens do Fabric. O ponto de extremidade é https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.
Visão geral do formato da Definição de Trabalho Spark v2
Na gestão de um item de Definição de Trabalho Spark, o campo definition é utilizado para especificar a configuração detalhada do item de Definição de Trabalho Spark. O definition campo contém dois subcampos: format e parts. O format campo especifica o formato do item de Definição de Trabalho Spark, que deve ser SparkJobDefinitionV2 para o formato v2.
O parts campo é um array que contém a configuração detalhada do item de Definição de Trabalho Spark. Cada elemento do parts array representa uma parte da configuração detalhada. Cada parte contém três subcampos: path, payload, e payloadType. O path campo especifica o caminho da peça, o payload campo especifica o conteúdo da peça codificada em base64, e o payloadType campo especifica o tipo da carga útil, que deve ser InlineBase64.
Importante
Este formato v2 suporta apenas Definições de Trabalho Spark com formatos de ficheiro .py ou .scala. O formato de ficheiro .jar não é suportado.
Crie um item de definição de trabalho Spark com o arquivo principal de definição e outros ficheiros de biblioteca
No exemplo seguinte, vamos criar uma Definição de Tarefa Spark que:
- O nome é
SJDHelloWorld. - O ficheiro principal de definição é
main.py, que consiste em ler um ficheiro CSV do seu Lakehouse predefinido e guardar como tabela Delta de volta no mesmo Lakehouse. - Outro ficheiro lib é
libs.py, que tem uma função utilitária para devolver o nome do ficheiro CSV e da tabela Delta. - A Lakehouse por defeito está definida para um ID de artefacto específico da Lakehouse.
Segue-se a carga útil detalhada para criar o item de Definição do Trabalho Spark.
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Para decodificar ou codificar a configuração detalhada, pode usar as seguintes funções auxiliares em Python. Existem também outras ferramentas online, como https://www.base64decode.org/, que podem desempenhar a mesma função.
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Uma resposta ao código HTTP 202 indica que o item de Definição de Trabalho Spark foi criado com sucesso.
Obtenha Definição de Trabalho Spark com partes de definição no formato v2
Com o novo formato v2, ao obter um item de Definição de Tarefa Spark com partes de definição, o conteúdo do ficheiro principal de definição e outros ficheiros lib são todos incluídos na carga útil de resposta, codificados em base64 sob o campo parts. Aqui está um exemplo de como obter um item de Definição de Trabalho Spark com componentes da definição:
- Primeiro, faça um pedido POST ao endpoint
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Certifique-se de que o valor do parâmetro de consulta de formato éSparkJobDefinitionV2. - Depois, nos cabeçalhos de resposta, verifica o código de estado HTTP. Um código HTTP 202 indica que o pedido foi aceite com sucesso. Copie o valor
x-ms-operation-iddos cabeçalhos de resposta. - Finalmente, faça um pedido GET ao endpoint
https://api.fabric.microsoft.com/v1/operations/{operationId}com o valor copiadox-ms-operation-idpara obter o resultado da operação. No payload de resposta, o campodefinitioncontém a configuração detalhada do item de Definição de Tarefa Spark, incluindo o ficheiro principal de definição e outros ficheiros de biblioteca sob o campoparts.
Atualize o item de Definição do Trabalho Spark com o ficheiro principal de definição e outros ficheiros de lib sob o formato v2
Para atualizar um item existente de Definição de Job Spark com o ficheiro principal de definição e outros ficheiros lib no formato v2, pode usar uma estrutura de payload semelhante à operação de criação. Aqui está um exemplo de atualização do item de Definição de Trabalho Spark criado na secção anterior:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Com a carga útil acima, são feitas as seguintes alterações aos ficheiros:
- O ficheiro main.py é atualizado com novo conteúdo.
- A lib1.py é eliminada deste item de Definição de Trabalho Spark e também removida do armazenamento do OneLake.
- Um novo ficheiro lib2.py é adicionado a este item de Definição de Trabalho Spark e carregado para o armazenamento do OneLake.
Para atualizar o item de Definição de Trabalho Spark, faça um pedido POST ao endpoint https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} com a carga útil acima. Uma resposta HTTP code 202 indica que o item de Definição de Trabalho Spark foi atualizado com sucesso.