如何通过 Microsoft Fabric REST API 创建和更新 V2 格式的 Spark 作业定义

Spark 作业定义(SJD)是一种 Fabric 项类型,允许用户在 Microsoft Fabric 中定义和运行 Apache Spark 作业。 Spark 作业定义 API v2 允许用户使用名为 SparkJobDefinitionV2 的新格式创建和更新 Spark 作业定义项。 使用 v2 格式的主要好处是,它允许用户使用一个 API 调用管理主可执行文件和其他库文件,而不是使用存储 API 单独上传文件,无需再使用存储令牌来管理文件。

先决条件

  • 访问 Fabric REST API 需要Microsoft Entra 令牌。 建议使用 MSAL(Microsoft身份验证库)库来获取令牌。 有关详细信息,请参阅 MSAL 中的身份验证流支持

Microsoft Fabric REST API 定义了用于 Fabric 项 CRUD 操作的统一终结点。 终结点为 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items

Spark 作业定义 v2 格式概述

在管理 Spark 作业定义项的有效负载中, definition 该字段用于指定 Spark 作业定义项的详细设置。 该 definition 字段包含两个子字段: formatparts。 该 format 字段指定 Spark 作业定义项的格式,该项应 SparkJobDefinitionV2 为 v2 格式。

parts 字段是一个数组,其中包含 Spark 作业定义项的详细设置。 数组中的每个 parts 项都表示详细设置的一部分。 每个部分包含三个子字段: pathpayloadpayloadType。 该 path 字段指定部件的路径, payload 字段指定 base64 编码的部件的内容,字段 payloadType 指定有效负载的类型,应为 InlineBase64

重要

此 v2 格式仅支持具有.py或 .scala 文件格式的 Spark 作业定义。 不支持.jar文件格式。

使用主定义文件和其他 lib 文件创建 Spark 作业定义项

在以下示例中,我们将创建一个 Spark 作业定义项,该项如下:

  1. 名称为 SJDHelloWorld.
  2. 主定义文件是 main.py:从默认 Lakehouse 读取 CSV 文件,并将其另存为 Delta 表,并将其保存回同一 Lakehouse。
  3. 其他 lib 文件是 libs.py,它具有一个实用工具函数,用于返回 CSV 文件和 Delta 表的名称。
  4. 默认的 Lakehouse 已指定为特定的 Lakehouse 工件 ID。

下面是用于创建 Spark 作业定义项的详细有效负载。

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

若要解码或编码详细的设置,可以在 Python 中使用以下帮助程序函数。 还有其他联机工具,例如 https://www.base64decode.org/ 可以执行相同的作业。

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

HTTP 代码 202 响应指示已成功创建 Spark 作业定义项。

获取 Spark 作业定义及其在 v2 格式下的定义部分

使用新的 v2 格式,获取包含定义部件的 Spark 作业定义项时,主定义文件的文件内容和其他 lib 文件都包含在响应有效负载中,base64 编码在 parts 字段下。 下面是获取包含定义部分的 Spark 作业定义项的示例:

  1. 首先,向终结点 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2发出 POST 请求。 确保格式查询参数的值为 SparkJobDefinitionV2.
  2. 然后,在响应标头中,检查 HTTP 状态代码。 HTTP 代码 202 指示请求已成功接受。 从响应标头中复制x-ms-operation-id值。
  3. 最后,使用复制https://api.fabric.microsoft.com/v1/operations/{operationId}的值向终结点x-ms-operation-id发出 GET 请求以获取作结果。 在响应有效负载中,definition 字段包含 Spark 作业定义项的详细设置,包括主定义文件以及 parts 下的其他库文件。

使用 v2 格式的主定义文件和其他库文件更新 Spark 作业定义项

若要以 v2 格式使用主定义文件和其他库文件更新现有 Spark 作业定义项,可以使用与创建操作类似的负载结构。 下面是更新在上一部分创建的 Spark 作业定义项的示例:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

当使用上述数据包时,文件将进行以下更改:

  1. main.py 文件使用新内容进行更新。
  2. 从此 Spark 作业定义项中删除了 lib1.py,并且也从 OneLake 存储中删除。
  3. 新的 lib2.py 文件将添加到此 Spark 作业定义项,并上传到 OneLake 存储。

若要更新 Spark 作业定义项,请向具有上述有效负载的终结点 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} 发出 POST 请求。 HTTP 代码 202 响应指示已成功更新 Spark 作业定义项。