Partilhar via


Como criar e atualizar uma Definição de Trabalho Spark com formato V2 através da API REST do Microsoft Fabric

A Definição de Trabalho Spark (SJD) é um tipo de item Fabric que permite aos utilizadores definir e executar trabalhos Apache Spark no Microsoft Fabric. A API de Definição de Trabalhos Spark v2 permite aos utilizadores criar e atualizar itens de Definição de Trabalhos Spark com um novo formato chamado SparkJobDefinitionV2. O principal benefício de usar o formato v2 é que permite aos utilizadores gerir o ficheiro executável principal e outros ficheiros da biblioteca com uma única chamada API; em vez de usar a API de armazenamento para carregar ficheiros separadamente, não é necessário mais token de armazenamento para gerir ficheiros.

Pré-requisitos

  • É necessário um token Microsoft Entra para aceder à API REST do Fabric. Recomenda-se a biblioteca MSAL (Microsoft Authentication Library) para obter o token. Para mais informações, veja Suporte a fluxo de autenticação em MSAL.

A API REST do Microsoft Fabric define um endpoint unificado para operações CRUD dos itens do Fabric. O ponto de extremidade é https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Visão geral do formato da Definição de Trabalho Spark v2

Na gestão de um item de Definição de Trabalho Spark, o campo definition é utilizado para especificar a configuração detalhada do item de Definição de Trabalho Spark. O definition campo contém dois subcampos: format e parts. O format campo especifica o formato do item de Definição de Trabalho Spark, que deve ser SparkJobDefinitionV2 para o formato v2.

O parts campo é um array que contém a configuração detalhada do item de Definição de Trabalho Spark. Cada elemento do parts array representa uma parte da configuração detalhada. Cada parte contém três subcampos: path, payload, e payloadType. O path campo especifica o caminho da peça, o payload campo especifica o conteúdo da peça codificada em base64, e o payloadType campo especifica o tipo da carga útil, que deve ser InlineBase64.

Importante

Este formato v2 suporta apenas Definições de Trabalho Spark com formatos de ficheiro .py ou .scala. O formato de ficheiro .jar não é suportado.

Crie um item de definição de trabalho Spark com o arquivo principal de definição e outros ficheiros de biblioteca

No exemplo seguinte, vamos criar uma Definição de Tarefa Spark que:

  1. O nome é SJDHelloWorld.
  2. O ficheiro principal de definição é main.py, que consiste em ler um ficheiro CSV do seu Lakehouse predefinido e guardar como tabela Delta de volta no mesmo Lakehouse.
  3. Outro ficheiro lib é libs.py, que tem uma função utilitária para devolver o nome do ficheiro CSV e da tabela Delta.
  4. A Lakehouse por defeito está definida para um ID de artefacto específico da Lakehouse.

Segue-se a carga útil detalhada para criar o item de Definição do Trabalho Spark.

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

Para decodificar ou codificar a configuração detalhada, pode usar as seguintes funções auxiliares em Python. Existem também outras ferramentas online, como https://www.base64decode.org/, que podem desempenhar a mesma função.

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Uma resposta ao código HTTP 202 indica que o item de Definição de Trabalho Spark foi criado com sucesso.

Obtenha Definição de Trabalho Spark com partes de definição no formato v2

Com o novo formato v2, ao obter um item de Definição de Tarefa Spark com partes de definição, o conteúdo do ficheiro principal de definição e outros ficheiros lib são todos incluídos na carga útil de resposta, codificados em base64 sob o campo parts. Aqui está um exemplo de como obter um item de Definição de Trabalho Spark com componentes da definição:

  1. Primeiro, faça um pedido POST ao endpoint https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Certifique-se de que o valor do parâmetro de consulta de formato é SparkJobDefinitionV2.
  2. Depois, nos cabeçalhos de resposta, verifica o código de estado HTTP. Um código HTTP 202 indica que o pedido foi aceite com sucesso. Copie o valor x-ms-operation-id dos cabeçalhos de resposta.
  3. Finalmente, faça um pedido GET ao endpoint https://api.fabric.microsoft.com/v1/operations/{operationId} com o valor copiado x-ms-operation-id para obter o resultado da operação. No payload de resposta, o campo definition contém a configuração detalhada do item de Definição de Tarefa Spark, incluindo o ficheiro principal de definição e outros ficheiros de biblioteca sob o campo parts.

Atualize o item de Definição do Trabalho Spark com o ficheiro principal de definição e outros ficheiros de lib sob o formato v2

Para atualizar um item existente de Definição de Job Spark com o ficheiro principal de definição e outros ficheiros lib no formato v2, pode usar uma estrutura de payload semelhante à operação de criação. Aqui está um exemplo de atualização do item de Definição de Trabalho Spark criado na secção anterior:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

Com a carga útil acima, são feitas as seguintes alterações aos ficheiros:

  1. O ficheiro main.py é atualizado com novo conteúdo.
  2. A lib1.py é eliminada deste item de Definição de Trabalho Spark e também removida do armazenamento do OneLake.
  3. Um novo ficheiro lib2.py é adicionado a este item de Definição de Trabalho Spark e carregado para o armazenamento do OneLake.

Para atualizar o item de Definição de Trabalho Spark, faça um pedido POST ao endpoint https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} com a carga útil acima. Uma resposta HTTP code 202 indica que o item de Definição de Trabalho Spark foi atualizado com sucesso.