แชร์ผ่าน


วิธีสร้างและอัปเดต Spark Job Definition ด้วยรูปแบบ V2 ผ่าน Microsoft Fabric REST API

Spark Job Definition (SJD) เป็นรายการ Fabric ชนิดหนึ่งที่อนุญาตให้ผู้ใช้กําหนดและเรียกใช้งาน Apache Spark ใน Microsoft Fabric Spark Job Definition API v2 ช่วยให้ผู้ใช้สามารถสร้างและอัปเดตรายการ Spark Job Definition ด้วยรูปแบบใหม่ที่เรียกว่า SparkJobDefinitionV2. ประโยชน์หลักของการใช้รูปแบบ v2 คือช่วยให้ผู้ใช้สามารถจัดการไฟล์ปฏิบัติการหลักและไฟล์ไลบรารีอื่น ๆ ได้ด้วยการเรียก API เพียงครั้งเดียวแทนที่จะใช้ API ที่เก็บข้อมูลเพื่ออัปโหลดไฟล์แยกต่างหากไม่จําเป็นต้องใช้โทเค็นที่เก็บข้อมูลอีกต่อไปสําหรับการจัดการไฟล์

ข้อกําหนดเบื้องต้น

Microsoft Fabric REST API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ CRUD ของรายการ Fabric ปลายทางคือhttps://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items

ภาพรวมรูปแบบ Spark Job Definition v2

ในเพย์โหลดของการจัดการรายการ Spark Job Definition ฟิลด์จะใช้ definition เพื่อระบุการตั้งค่าโดยละเอียดของรายการ Spark Job Definition ฟิลด์ประกอบด้วยdefinitionฟิลด์ย่อยสองฟิลด์: format และparts formatฟิลด์ระบุรูปแบบของรายการ Spark Job Definition ซึ่งควรเป็นSparkJobDefinitionV2รูปแบบ v2

ฟิลด์เป็น parts อาร์เรย์ที่มีการตั้งค่าโดยละเอียดของรายการ Spark Job Definition แต่ละรายการใน parts อาร์เรย์แสดงถึงส่วนหนึ่งของการตั้งค่าโดยละเอียด แต่ละส่วนประกอบด้วยเขตข้อมูลย่อยสามช่อง: path, , payloadและpayloadType pathฟิลด์ระบุเส้นทางของชิ้นส่วนpayloadฟิลด์ระบุเนื้อหาของชิ้นส่วนที่เข้ารหัส base64 และpayloadTypeฟิลด์ระบุชนิดของเพย์โหลดซึ่งควรเป็นInlineBase64

สําคัญ

รูปแบบ v2 นี้รองรับเฉพาะคําจํากัดความงาน Spark ที่มีรูปแบบไฟล์ .py หรือ .scala เท่านั้น ไม่รองรับรูปแบบไฟล์ .jar

สร้างรายการ Spark Job Definition ด้วยไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ

ในตัวอย่างต่อไปนี้ เราจะสร้างรายการ Spark Job Definition ซึ่ง:

  1. ชื่อคือ SJDHelloWorld.
  2. ไฟล์คําจํากัดความหลักคือ main.pyซึ่งก็คือการอ่านไฟล์ CSV จากเลคเฮาส์เริ่มต้นและบันทึกเป็นตารางเดลต้ากลับไปที่เลคเฮาส์เดียวกัน
  3. ไฟล์ LIB อื่น ๆ คือ libs.pyซึ่งมีฟังก์ชันยูทิลิตี้เพื่อส่งคืนชื่อของไฟล์ CSV และตารางเดลต้า
  4. Lakehouse เริ่มต้นถูกตั้งค่าเป็น ID สิ่งประดิษฐ์ Lakehouse ที่เฉพาะเจาะจง

ต่อไปนี้เป็นเพย์โหลดโดยละเอียดสําหรับการสร้างรายการ Spark Job Definition

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

ในการถอดรหัสหรือเข้ารหัสการตั้งค่าโดยละเอียด คุณสามารถใช้ฟังก์ชันตัวช่วยต่อไปนี้ใน Python นอกจากนี้ยังมีเครื่องมือออนไลน์อื่น ๆ เช่น https://www.base64decode.org/ ที่สามารถทํางานเดียวกันได้

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

การตอบสนองรหัส HTTP 202 บ่งชี้ว่ารายการ Spark Job Definition ถูกสร้างขึ้นเรียบร้อยแล้ว

รับคําจํากัดความงาน Spark พร้อมส่วนคําจํากัดความภายใต้รูปแบบ v2

ด้วยรูปแบบ v2 ใหม่ เมื่อได้รับรายการ Spark Job Definition พร้อมส่วนคําจํากัดความ เนื้อหาไฟล์ของไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ ทั้งหมดจะรวมอยู่ในเพย์โหลดการตอบสนอง base64 ที่เข้ารหัสภายใต้ parts ฟิลด์ นี่คือตัวอย่างของการรับรายการ Spark Job Definition พร้อมส่วนคําจํากัดความ:

  1. ขั้นแรก ให้ส่งคําขอ POST ไปยังปลายทาง https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. ตรวจสอบให้แน่ใจว่าค่าของพารามิเตอร์คิวรีรูปแบบเป็นSparkJobDefinitionV2
  2. จากนั้นในส่วนหัวของการตอบกลับ ให้ตรวจสอบรหัสสถานะ HTTP รหัส HTTP 202 แสดงว่าคําขอได้รับการยอมรับเรียบร้อยแล้ว คัดลอก x-ms-operation-id ค่าจากส่วนหัวของการตอบกลับ
  3. สุดท้าย ให้ส่งคําขอ GET ไปยังปลายทาง https://api.fabric.microsoft.com/v1/operations/{operationId} ด้วยค่าที่ x-ms-operation-id คัดลอกเพื่อรับผลการดําเนินการ ในเพย์โหลดการตอบสนอง ฟิลด์ประกอบด้วย definition การตั้งค่าโดยละเอียดของรายการ Spark Job Definition รวมถึงไฟล์ข้อกําหนดหลักและไฟล์ lib อื่นๆ ภายใต้ parts ฟิลด์

อัปเดตรายการ Spark Job Definition ด้วยไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ ภายใต้รูปแบบ v2

หากต้องการอัปเดตรายการ Spark Job Definition ที่มีอยู่ด้วยไฟล์ข้อกําหนดหลักและไฟล์ lib อื่นๆ ภายใต้รูปแบบ v2 คุณสามารถใช้โครงสร้างเพย์โหลดที่คล้ายกันเป็นการดําเนินการสร้าง นี่คือตัวอย่างของการอัปเดตรายการ Spark Job Definition ที่สร้างขึ้นในส่วนก่อนหน้า:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

ด้วยเพย์โหลดข้างต้น จะมีการเปลี่ยนแปลงต่อไปนี้กับไฟล์:

  1. ไฟล์ main.py ได้รับการอัปเดตด้วยเนื้อหาใหม่
  2. lib1.py จะถูกลบออกจากรายการ Spark Job Definition นี้ และยังถูกลบออกจากที่เก็บข้อมูล OneLake ด้วย
  3. ไฟล์ lib2.py ใหม่จะถูกเพิ่มลงในรายการ Spark Job Definition นี้ และอัปโหลดไปยังที่เก็บข้อมูล OneLake

หากต้องการอัปเดตรายการ Spark Job Definition ให้ส่งคําขอ POST ไปยังปลายทาง https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} ด้วยเพย์โหลดด้านบน การตอบกลับรหัส HTTP 202 บ่งชี้ว่ารายการ Spark Job Definition ได้รับการอัปเดตเรียบร้อยแล้ว