หมายเหตุ
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลอง ลงชื่อเข้าใช้หรือเปลี่ยนไดเรกทอรีได้
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลองเปลี่ยนไดเรกทอรีได้
Spark Job Definition (SJD) เป็นรายการ Fabric ชนิดหนึ่งที่อนุญาตให้ผู้ใช้กําหนดและเรียกใช้งาน Apache Spark ใน Microsoft Fabric Spark Job Definition API v2 ช่วยให้ผู้ใช้สามารถสร้างและอัปเดตรายการ Spark Job Definition ด้วยรูปแบบใหม่ที่เรียกว่า SparkJobDefinitionV2. ประโยชน์หลักของการใช้รูปแบบ v2 คือช่วยให้ผู้ใช้สามารถจัดการไฟล์ปฏิบัติการหลักและไฟล์ไลบรารีอื่น ๆ ได้ด้วยการเรียก API เพียงครั้งเดียวแทนที่จะใช้ API ที่เก็บข้อมูลเพื่ออัปโหลดไฟล์แยกต่างหากไม่จําเป็นต้องใช้โทเค็นที่เก็บข้อมูลอีกต่อไปสําหรับการจัดการไฟล์
ข้อกําหนดเบื้องต้น
- จําเป็นต้องใช้โทเค็น Microsoft Entra เพื่อเข้าถึง Fabric REST API แนะนําให้ใช้ไลบรารี MSAL (Microsoft Authentication Library) เพื่อรับโทเค็น สําหรับข้อมูลเพิ่มเติม โปรดดู การสนับสนุนโฟลว์การรับรองความถูกต้องใน MSAL
Microsoft Fabric REST API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ CRUD ของรายการ Fabric ปลายทางคือhttps://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
ภาพรวมรูปแบบ Spark Job Definition v2
ในเพย์โหลดของการจัดการรายการ Spark Job Definition ฟิลด์จะใช้ definition เพื่อระบุการตั้งค่าโดยละเอียดของรายการ Spark Job Definition ฟิลด์ประกอบด้วยdefinitionฟิลด์ย่อยสองฟิลด์: format และparts
formatฟิลด์ระบุรูปแบบของรายการ Spark Job Definition ซึ่งควรเป็นSparkJobDefinitionV2รูปแบบ v2
ฟิลด์เป็น parts อาร์เรย์ที่มีการตั้งค่าโดยละเอียดของรายการ Spark Job Definition แต่ละรายการใน parts อาร์เรย์แสดงถึงส่วนหนึ่งของการตั้งค่าโดยละเอียด แต่ละส่วนประกอบด้วยเขตข้อมูลย่อยสามช่อง: path, , payloadและpayloadType
pathฟิลด์ระบุเส้นทางของชิ้นส่วนpayloadฟิลด์ระบุเนื้อหาของชิ้นส่วนที่เข้ารหัส base64 และpayloadTypeฟิลด์ระบุชนิดของเพย์โหลดซึ่งควรเป็นInlineBase64
สําคัญ
รูปแบบ v2 นี้รองรับเฉพาะคําจํากัดความงาน Spark ที่มีรูปแบบไฟล์ .py หรือ .scala เท่านั้น ไม่รองรับรูปแบบไฟล์ .jar
สร้างรายการ Spark Job Definition ด้วยไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ
ในตัวอย่างต่อไปนี้ เราจะสร้างรายการ Spark Job Definition ซึ่ง:
- ชื่อคือ
SJDHelloWorld. - ไฟล์คําจํากัดความหลักคือ
main.pyซึ่งก็คือการอ่านไฟล์ CSV จากเลคเฮาส์เริ่มต้นและบันทึกเป็นตารางเดลต้ากลับไปที่เลคเฮาส์เดียวกัน - ไฟล์ LIB อื่น ๆ คือ
libs.pyซึ่งมีฟังก์ชันยูทิลิตี้เพื่อส่งคืนชื่อของไฟล์ CSV และตารางเดลต้า - Lakehouse เริ่มต้นถูกตั้งค่าเป็น ID สิ่งประดิษฐ์ Lakehouse ที่เฉพาะเจาะจง
ต่อไปนี้เป็นเพย์โหลดโดยละเอียดสําหรับการสร้างรายการ Spark Job Definition
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
ในการถอดรหัสหรือเข้ารหัสการตั้งค่าโดยละเอียด คุณสามารถใช้ฟังก์ชันตัวช่วยต่อไปนี้ใน Python นอกจากนี้ยังมีเครื่องมือออนไลน์อื่น ๆ เช่น https://www.base64decode.org/ ที่สามารถทํางานเดียวกันได้
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
การตอบสนองรหัส HTTP 202 บ่งชี้ว่ารายการ Spark Job Definition ถูกสร้างขึ้นเรียบร้อยแล้ว
รับคําจํากัดความงาน Spark พร้อมส่วนคําจํากัดความภายใต้รูปแบบ v2
ด้วยรูปแบบ v2 ใหม่ เมื่อได้รับรายการ Spark Job Definition พร้อมส่วนคําจํากัดความ เนื้อหาไฟล์ของไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ ทั้งหมดจะรวมอยู่ในเพย์โหลดการตอบสนอง base64 ที่เข้ารหัสภายใต้ parts ฟิลด์ นี่คือตัวอย่างของการรับรายการ Spark Job Definition พร้อมส่วนคําจํากัดความ:
- ขั้นแรก ให้ส่งคําขอ POST ไปยังปลายทาง
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. ตรวจสอบให้แน่ใจว่าค่าของพารามิเตอร์คิวรีรูปแบบเป็นSparkJobDefinitionV2 - จากนั้นในส่วนหัวของการตอบกลับ ให้ตรวจสอบรหัสสถานะ HTTP รหัส HTTP 202 แสดงว่าคําขอได้รับการยอมรับเรียบร้อยแล้ว คัดลอก
x-ms-operation-idค่าจากส่วนหัวของการตอบกลับ - สุดท้าย ให้ส่งคําขอ GET ไปยังปลายทาง
https://api.fabric.microsoft.com/v1/operations/{operationId}ด้วยค่าที่x-ms-operation-idคัดลอกเพื่อรับผลการดําเนินการ ในเพย์โหลดการตอบสนอง ฟิลด์ประกอบด้วยdefinitionการตั้งค่าโดยละเอียดของรายการ Spark Job Definition รวมถึงไฟล์ข้อกําหนดหลักและไฟล์ lib อื่นๆ ภายใต้partsฟิลด์
อัปเดตรายการ Spark Job Definition ด้วยไฟล์คําจํากัดความหลักและไฟล์ lib อื่นๆ ภายใต้รูปแบบ v2
หากต้องการอัปเดตรายการ Spark Job Definition ที่มีอยู่ด้วยไฟล์ข้อกําหนดหลักและไฟล์ lib อื่นๆ ภายใต้รูปแบบ v2 คุณสามารถใช้โครงสร้างเพย์โหลดที่คล้ายกันเป็นการดําเนินการสร้าง นี่คือตัวอย่างของการอัปเดตรายการ Spark Job Definition ที่สร้างขึ้นในส่วนก่อนหน้า:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
ด้วยเพย์โหลดข้างต้น จะมีการเปลี่ยนแปลงต่อไปนี้กับไฟล์:
- ไฟล์ main.py ได้รับการอัปเดตด้วยเนื้อหาใหม่
- lib1.py จะถูกลบออกจากรายการ Spark Job Definition นี้ และยังถูกลบออกจากที่เก็บข้อมูล OneLake ด้วย
- ไฟล์ lib2.py ใหม่จะถูกเพิ่มลงในรายการ Spark Job Definition นี้ และอัปโหลดไปยังที่เก็บข้อมูล OneLake
หากต้องการอัปเดตรายการ Spark Job Definition ให้ส่งคําขอ POST ไปยังปลายทาง https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} ด้วยเพย์โหลดด้านบน การตอบกลับรหัส HTTP 202 บ่งชี้ว่ารายการ Spark Job Definition ได้รับการอัปเดตเรียบร้อยแล้ว