Partager via


Comment créer et mettre à jour une définition de travail Spark au format V2 via l’API REST Microsoft Fabric

Spark Job Definition (SJD) est un type d’élément Fabric qui permet aux utilisateurs de définir et d’exécuter des travaux Apache Spark dans Microsoft Fabric. L’API Spark Job Definition v2 permet aux utilisateurs de créer et de mettre à jour des éléments Spark Job Definition avec un nouveau format appelé SparkJobDefinitionV2. L’avantage principal de l’utilisation du format v2 est qu’il permet aux utilisateurs de gérer le fichier exécutable principal et d’autres fichiers de bibliothèque avec un seul appel d’API, au lieu d’utiliser l’API de stockage pour charger des fichiers séparément, aucun jeton de stockage n’est nécessaire pour gérer les fichiers.

Prerequisites

L’API REST Microsoft Fabric définit un point de terminaison unifié pour les opérations CRUD des éléments Fabric. Le point de terminaison a la valeur https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Aperçu du format de définition de tâche Spark v2

Dans la charge utile de gestion d’un élément Spark Job Definition, le definition champ est utilisé pour spécifier la configuration détaillée de l’élément Spark Job Definition. Le definition champ contient deux sous-champs : format et parts. Le format champ spécifie le format de l’élément Spark Job Definition, qui doit correspondre SparkJobDefinitionV2 au format v2.

Le parts champ est un tableau qui contient la configuration détaillée de l’élément Spark Job Definition. Chaque élément du parts tableau représente une partie de la configuration détaillée. Chaque partie contient trois sous-champs : path, payloadet payloadType. Le path champ spécifie le chemin d’accès de la partie, le payload champ spécifie le contenu de la partie encodé en base64, et le payloadType champ spécifie le type de la charge utile, qui doit être InlineBase64.

Important

Ce format v2 prend uniquement en charge les définitions de travaux Spark avec des formats de fichier de .py ou .scala. Le format de fichier .jar n’est pas pris en charge.

Créer un élément Spark Job Definition avec le fichier de définition principal et d’autres fichiers lib

Dans l’exemple suivant, nous allons créer un élément Spark Job Definition qui :

  1. Le nom est SJDHelloWorld.
  2. Le fichier de définition principal est main.py, qui consiste à lire un fichier CSV à partir de son lakehouse par défaut et à enregistrer sous la forme d’une table Delta dans le même Lakehouse.
  3. Un autre fichier lib est libs.py, qui a une fonction utilitaire pour renvoyer le nom du fichier CSV et la table Delta.
  4. La valeur par défaut de Lakehouse est définie sur un ID d’artefact Lakehouse spécifique.

Voici la charge utile détaillée pour la création de l’élément Spark Job Definition.

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

Pour décoder ou encoder la configuration détaillée, vous pouvez utiliser les fonctions d’assistance suivantes dans Python. Il existe également d’autres outils en ligne tels que https://www.base64decode.org/ ceux qui peuvent effectuer le même travail.

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Une réponse http code 202 indique que l’élément Spark Job Definition a été créé avec succès.

Obtenir la définition du Job Spark comprenant des éléments de la définition sous le format v2

Avec le nouveau format v2, lors de l’obtention d’un élément Spark Job Definition avec des parties de définition, le contenu du fichier de définition principal et d’autres fichiers lib sont tous inclus dans la charge utile de réponse, encodé en base64 sous le parts champ. Voici un exemple d’obtention d’un élément Spark Job Definition avec des parties de définition :

  1. Tout d’abord, effectuez une requête POST au point de terminaison https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Vérifiez que la valeur du paramètre de requête de format est SparkJobDefinitionV2.
  2. Ensuite, dans les en-têtes de réponse, vérifiez le code d’état HTTP. Un code HTTP 202 indique que la requête a été acceptée avec succès. Copiez la valeur x-ms-operation-id des en-têtes de réponse.
  3. Enfin, effectuez une requête GET au point de terminaison https://api.fabric.microsoft.com/v1/operations/{operationId} avec la valeur copiée x-ms-operation-id pour obtenir le résultat de l’opération. Dans la charge utile de réponse, le definition champ contient la configuration détaillée de l’élément Spark Job Definition, y compris le fichier de définition principal et d’autres fichiers lib sous le parts champ.

Mettre à jour l’élément Spark Job Definition avec le fichier de définition principal et d’autres fichiers lib au format v2

Pour mettre à jour un élément Spark Job Definition existant avec le fichier de définition principal et d’autres fichiers lib au format v2, vous pouvez utiliser une structure de charge utile similaire à l’opération de création. Voici un exemple de mise à jour de l’élément Spark Job Definition créé dans la section précédente :

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

Avec la charge utile ci-dessus, les modifications suivantes sont apportées aux fichiers :

  1. Le fichier main.py est mis à jour avec le nouveau contenu.
  2. Le lib1.py est supprimé de cet élément de définition de travail Spark et également supprimé du stockage OneLake.
  3. Un nouveau fichier lib2.py est ajouté à cet élément Spark Job Definition et chargé dans le stockage OneLake.

Pour mettre à jour l’élément Spark Job Definition, effectuez une requête POST au point de terminaison https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} avec la charge utile ci-dessus. Une réponse http code 202 indique que l’élément Spark Job Definition a été mis à jour avec succès.