Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Definición de trabajo de Spark (SJD) es un tipo de elemento de Fabric que permite a los usuarios definir y ejecutar trabajos de Apache Spark en Microsoft Fabric. La API de definición de trabajos de Spark v2 permite a los usuarios crear y actualizar elementos de definición de trabajo de Spark con un nuevo formato denominado SparkJobDefinitionV2. La principal ventaja de usar el formato v2 es que permite a los usuarios administrar el archivo ejecutable principal y otros archivos de biblioteca con una sola llamada API, en lugar de usar la API de almacenamiento para cargar archivos por separado, no se necesita más token de almacenamiento para administrar archivos.
Prerrequisitos
- Se requiere un token de Microsoft Entra para acceder a la API REST de Fabric. Se recomienda la biblioteca MSAL (Biblioteca de autenticación de Microsoft) para obtener el token. Para obtener más información, consulte Compatibilidad con el flujo de autenticación en MSAL.
La API rest de Microsoft Fabric define un punto de conexión unificado para las operaciones CRUD de los elementos de Fabric. El extremo es https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.
Introducción al formato de definición de trabajo de Spark v2
En la carga útil al administrar un Spark Job Definition item, el campo definition se usa para especificar la configuración detallada del Spark Job Definition item. El definition campo contiene dos subcampos: format y parts. El format campo especifica el formato del elemento Definición de trabajo de Spark, que debe ser SparkJobDefinitionV2 para el formato v2.
El parts campo es una matriz que contiene la configuración detallada del elemento Definición de trabajo de Spark. Cada elemento de la parts matriz representa una parte de la configuración detallada. Cada parte contiene tres subcampos: path, payloady payloadType. El path campo especifica la ruta de acceso de la parte, el payload campo especifica el contenido de la parte codificada en base64 y el payloadType campo especifica el tipo de la carga, que debe ser InlineBase64.
Importante
Este formato v2 solo admite definiciones de trabajos de Spark con formatos de archivo de .py o .scala. No se admite el formato de archivo .jar.
Creación de un elemento de definición de trabajo de Spark con el archivo de definición principal y otros archivos lib
En el siguiente ejemplo, crearemos un elemento de definición de un trabajo de Spark que:
- El nombre es
SJDHelloWorld. - El archivo de definición principal es
main.py, el cual lee un archivo CSV desde su instancia predeterminada de Lakehouse y lo guarda como una tabla Delta en la misma instancia. - Otro archivo lib es
libs.py, que tiene una función de utilidad para devolver el nombre del archivo CSV y la tabla Delta. - El valor predeterminado de Lakehouse se establece en un identificador de artefacto de Lakehouse específico.
A continuación se muestra la carga detallada para crear el elemento Definición de Tarea de Spark.
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Para descodificar o codificar la configuración detallada, puede usar las siguientes funciones auxiliares en Python. También hay otras herramientas en línea, como https://www.base64decode.org/ que pueden realizar el mismo trabajo.
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Una respuesta de código HTTP 202 indica que el elemento Definición de trabajo de Spark se creó correctamente.
Obtención de la definición de trabajos de Spark con elementos de definición en formato v2
Con el nuevo formato v2, al obtener un elemento de definición de trabajo de Spark con componentes de definición, el contenido del archivo de definición principal y otros archivos de biblioteca se incluyen en la carga de respuesta, codificados como base64 bajo la etiqueta parts. Este es un ejemplo de cómo obtener un elemento de definición de trabajo de Spark con elementos de definición:
- En primer lugar, realice una solicitud POST al punto de conexión
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Asegúrese de que el valor del parámetro de consulta de formato esSparkJobDefinitionV2. - A continuación, en los encabezados de respuesta, compruebe el código de estado HTTP. Un código HTTP 202 indica que la solicitud se aceptó correctamente. Copie el
x-ms-operation-idvalor de los encabezados de respuesta. - Por último, realice una solicitud GET al punto de conexión
https://api.fabric.microsoft.com/v1/operations/{operationId}con el valor copiadox-ms-operation-idpara obtener el resultado de la operación. En la carga de respuesta, el campodefinitioncontiene la configuración detallada del elemento Definición de Tarea de Spark, incluido el archivo de definición principal y otros archivos de biblioteca en el campoparts.
Actualice el elemento Definición de trabajo de Spark con el archivo de definición principal y otros archivos lib con formato v2.
Para actualizar un elemento de definición de tarea de Spark existente con el archivo principal de definición y otros archivos lib en el formato v2, puede usar una estructura de carga útil similar a la de la operación de creación. Este es un ejemplo de actualización del elemento Definición de trabajo de Spark creado en la sección anterior:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Con la carga anterior, se realizan los siguientes cambios en los archivos:
- El archivo main.py se actualiza con nuevo contenido.
- El lib1.py se elimina de este elemento de definición de trabajo de Spark y también se quita del almacenamiento OneLake.
- Se agrega un nuevo archivo lib2.py a este elemento definición de trabajo de Spark y se carga en el almacenamiento de OneLake.
Para actualizar el elemento Spark Job Definition, realice una solicitud POST al endpoint https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} con la carga útil anterior. Una respuesta de código HTTP 202 indica que el elemento Definición de trabajo de Spark se actualizó correctamente.