ทํางานกับข้อมูลในกรอบข้อมูล Spark
โดยดั้งเดิม Spark ใช้โครงสร้างข้อมูลที่เรียกว่าชุดข้อมูลแบบกระจายที่ยืดหยุ่น (RDD) แต่เดิมเรียกว่าชุดข้อมูลแบบกระจายที่ยืดหยุ่น แต่ในขณะที่คุณ สามารถเขียนโค้ดที่ทํางานกับ RDD ได้โดยตรง โครงสร้างข้อมูลที่ใช้บ่อยที่สุดสําหรับการทํางานกับข้อมูลที่มีโครงสร้างใน Spark คือ กรอบข้อมูลซึ่งเป็นส่วนหนึ่งของไลบรารี Spark SQL Dataframe ใน Spark จะคล้ายกับกรอบข้อมูลที่อยู่ในไลบรารี pandas Pandas Python แต่ได้รับการปรับให้เหมาะสมเพื่อทํางานในสภาพแวดล้อมการประมวลผลแบบกระจายของ Spark
หมายเหตุ
นอกเหนือจาก Dataframe API แล้ว Spark SQL ยังมี ชุดข้อมูล API ที่รองรับใน Java และ Scala อย่างมาก เราจะมุ่งเน้นที่ Dataframe API ในมอดูลนี้
การโหลดข้อมูลลงในดาต้าเฟรม
เรามาสํารวจตัวอย่างสมมติฐานเพื่อดูว่าคุณสามารถใช้ดาต้าเฟรมในการทํางานกับข้อมูลได้อย่างไร สมมติว่า คุณมีข้อมูลต่อไปนี้ในแฟ้มข้อความที่คั่นด้วยเครื่องหมายจุลภาคชื่อ products.csv ในโฟลเดอร์ แฟ้ม/ข้อมูล ใน lakehouse ของคุณ:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
อนุมานเค้าร่าง
ในสมุดบันทึก Spark คุณสามารถใช้รหัส PySpark ต่อไปนี้เพื่อโหลดข้อมูลไฟล์ลงใน dataframe และแสดง 10 แถวแรก:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
เส้น %%pyspark ที่จุดเริ่มต้นเรียกว่า เวทมนตร์และบอก Spark ว่าภาษาที่ใช้ในเซลล์นี้คือ PySpark คุณสามารถเลือกภาษาที่คุณต้องการใช้เป็นค่าเริ่มต้นในแถบเครื่องมือของอินเทอร์เฟซสมุดบันทึกจากนั้นใช้เวทมนตร์เพื่อแทนที่ตัวเลือกสําหรับเซลล์ที่ระบุ ตัวอย่างเช่น นี่คือรหัสสเกลาที่เทียบเท่าสําหรับตัวอย่างข้อมูลผลิตภัณฑ์:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
%%spark เวทมนตร์ถูกใช้เพื่อระบุสเกลา
ทั้งสองตัวอย่างโค้ดเหล่านี้จะสร้างผลลัพธ์ดังนี้:
| ProductID | ชื่อผลิตภัณฑ์ | ประเภท | ราคาตั้ง |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | จักรยานเสือภูเขา | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | จักรยานเสือภูเขา | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | จักรยานเสือภูเขา | 3399.9900 |
| ... | ... | ... | ... |
การระบุ Schema ที่ชัดเจน
ในตัวอย่างก่อนหน้านี้ แถวแรกของไฟล์ CSV ประกอบด้วยชื่อคอลัมน์ และ Spark สามารถอนุมานชนิดข้อมูลของแต่ละคอลัมน์จากข้อมูลที่มีได้ คุณยังสามารถระบุ Schema ที่ชัดเจนสําหรับข้อมูลซึ่งจะเป็นประโยชน์เมื่อไม่มีชื่อคอลัมน์รวมอยู่ในไฟล์ข้อมูล เช่นตัวอย่าง CSV นี้:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
ตัวอย่าง PySpark ต่อไปนี้แสดงวิธีการระบุ Schema สําหรับ dataframe ที่จะโหลดจากไฟล์ที่มีชื่อ product-data.csv ในรูปแบบนี้:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
ผลลัพธ์จะคล้ายกับ:
| ProductID | ชื่อผลิตภัณฑ์ | ประเภท | ราคาตั้ง |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | จักรยานเสือภูเขา | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | จักรยานเสือภูเขา | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | จักรยานเสือภูเขา | 3399.9900 |
| ... | ... | ... | ... |
เคล็ดลับ
การระบุเค้าร่างที่ชัดเจนยังช่วยปรับปรุงประสิทธิภาพการทํางานด้วย!
การกรองและการจัดกลุ่มกรอบข้อมูล
คุณสามารถใช้วิธีการของคลาส Dataframe เพื่อกรอง เรียงลําดับ จัดกลุ่ม และจัดการข้อมูลที่มีอยู่ได้ ตัวอย่างเช่น ตัวอย่างรหัสต่อไปนี้ใช้วิธีการ เลือก เพื่อดึงคอลัมน์ ProductID และ ListPrice จากดาต้าเฟรม df ที่ประกอบด้วยข้อมูลผลิตภัณฑ์ในตัวอย่างก่อนหน้านี้:
pricelist_df = df.select("ProductID", "ListPrice")
ผลลัพธ์จากตัวอย่างโค้ดนี้จะมีลักษณะดังนี้:
| ProductID | ราคาตั้ง |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
เช่นเดียวกับวิธีการจัดการข้อมูลส่วนใหญ่ เลือก ส่งกลับออบเจ็กต์ dataframe ใหม่
เคล็ดลับ
การเลือกชุดย่อยของคอลัมน์จาก dataframe เป็นการดําเนินการทั่วไปซึ่งสามารถทําได้โดยใช้ไวยากรณ์ที่สั้นกว่าต่อไปนี้:
pricelist_df = df["ProductID", "ListPrice"]
คุณสามารถ "ลูกโซ่" วิธีการรวมกันเพื่อดําเนินการจัดการชุดที่ส่งผลให้มีการแปลง dataframe ตัวอย่างเช่น โค้ดตัวอย่างนี้จะเชื่อมโยง เลือก และ โดยที่ วิธีการสร้างกรอบข้อมูลใหม่ที่มี ชื่อผลิตภัณฑ์ และคอลัมน์ ListPrice สําหรับผลิตภัณฑ์ที่มีประเภทของ จักรยานเสือภูเขา หรือ จักรยานเสือหมาด:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
ผลลัพธ์จากตัวอย่างโค้ดนี้จะมีลักษณะดังนี้:
| ชื่อผลิตภัณฑ์ | ประเภท | ราคาตั้ง |
|---|---|---|
| Mountain-100 Silver, 38 | จักรยานเสือภูเขา | 3399.9900 |
| Road-750 สีดํา, 52 | จักรยานเสือหมอบ | 539.9900 |
| ... | ... | ... |
หากต้องการจัดกลุ่มและรวมข้อมูล คุณสามารถใช้ groupBy เมธอดและฟังก์ชันการรวมได้ ตัวอย่างเช่น รหัส PySpark ต่อไปนี้จะนับจํานวนผลิตภัณฑ์สําหรับแต่ละหมวดหมู่:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
ผลลัพธ์จากตัวอย่างโค้ดนี้จะมีลักษณะดังนี้:
| ประเภท | นับ |
|---|---|
| ชุด หู ฟัง | 3 |
| ล้อ | 14 |
| จักรยานเสือภูเขา | 32 |
| ... | ... |
การบันทึกกรอบข้อมูล
คุณมักจะต้องการใช้ Spark เพื่อแปลงข้อมูลดิบและบันทึกผลลัพธ์สําหรับการวิเคราะห์เพิ่มเติมหรือการประมวลผลปลายทาง ตัวอย่างโค้ดต่อไปนี้จะบันทึก dataFrame ลงในไฟล์ parquet parquet ใน data lake แทนที่ไฟล์ใด ๆ ที่มีอยู่ที่มีชื่อเดียวกัน
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
หมายเหตุ
โดยทั่วไปแล้วรูปแบบ Parquet เป็นที่ต้องการสําหรับไฟล์ข้อมูลที่คุณจะใช้สําหรับการวิเคราะห์หรือการนําเข้าไปยังที่เก็บการวิเคราะห์เพิ่มเติม Parquet เป็นรูปแบบที่มีประสิทธิภาพมากที่ได้รับการสนับสนุนโดยระบบการวิเคราะห์ข้อมูลขนาดใหญ่ส่วนใหญ่ อันที่จริงแล้ว ในบางครั้งความต้องการในการแปลงข้อมูลของคุณอาจเป็นเพียงแค่การแปลงข้อมูลจากรูปแบบอื่น (เช่น CSV) เป็น Parquet!
การแบ่งพาร์ติชันแฟ้มเอาต์พุต
การแบ่งพาร์ติชันเป็นเทคนิคการปรับให้เหมาะสมที่ช่วยให้ Spark เพิ่มประสิทธิภาพสูงสุดในโหนดผู้ปฏิบัติงาน การเพิ่มประสิทธิภาพการทํางานเพิ่มขึ้นสามารถทําได้เมื่อกรองข้อมูลในคิวรีโดยการกําจัด IO ของดิสก์ที่ไม่จําเป็นออก
เมื่อต้องการบันทึก dataframe เป็นชุดไฟล์ที่แบ่งพาร์ติชัน ให้ใช้ partitionBy วิธีการเมื่อเขียนข้อมูล ตัวอย่างต่อไปนี้จะบันทึกกรอบข้อมูล bikes_df (ซึ่งมีข้อมูลผลิตภัณฑ์สําหรับ จักรยานเสือภูเขา และประเภทของ จักรยานเสือห ทาง) และแบ่งพาร์ติชันข้อมูลตามประเภท:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
ชื่อโฟลเดอร์ที่สร้างขึ้นเมื่อมีการแบ่งพาร์ติชัน dataframe ประกอบด้วยชื่อคอลัมน์และค่าที่เป็นพาร์ติชันในรูปแบบ column=value ดังนั้นตัวอย่างโค้ดจะสร้างโฟลเดอร์ที่ชื่อ bike_data ที่ประกอบด้วยโฟลเดอร์ย่อยต่อไปนี้:
- Category=จักรยานเสือภูเขา
- หมวดหมู่=จักรยานเสือหมอบ
โฟลเดอร์ย่อยแต่ละโฟลเดอร์ประกอบด้วยไฟล์ parquet อย่างน้อยหนึ่งไฟล์ที่มีข้อมูลผลิตภัณฑ์สําหรับหมวดหมู่ที่เหมาะสม
หมายเหตุ
คุณสามารถแบ่งพาร์ติชันข้อมูลตามหลายคอลัมน์ ซึ่งส่งผลให้มีลําดับชั้นของโฟลเดอร์สําหรับแต่ละคีย์การแบ่งพาร์ติชัน ตัวอย่างเช่น คุณอาจแบ่งพาร์ติชันข้อมูลคําสั่งขายตามปีและเดือน เพื่อให้ลําดับชั้นโฟลเดอร์มีโฟลเดอร์สําหรับค่าแต่ละปี ซึ่งจะประกอบด้วยโฟลเดอร์ย่อยสําหรับแต่ละค่าเดือนตามลําดับ
โหลดข้อมูลที่แบ่งพาร์ติชัน
เมื่ออ่านข้อมูลที่มีการแบ่งพาร์ติชันลงใน dataframe คุณสามารถโหลดข้อมูลจากโฟลเดอร์ใด ๆ ภายในลําดับชั้นโดยการระบุค่าที่ชัดเจนหรืออักขระตัวแทนสําหรับเขตข้อมูลที่มีการแบ่งพาร์ติชัน ตัวอย่างต่อไปนี้โหลดข้อมูลสําหรับผลิตภัณฑ์ในประเภท จักรยานเสือหทาง :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
หมายเหตุ
คอลัมน์การแบ่งพาร์ติชันที่ระบุในเส้นทางของไฟล์จะถูกละเว้นใน dataframe ผลลัพธ์ ผลลัพธ์ที่เกิดจากคิวรีตัวอย่างจะไม่รวมคอลัมน์หมวดหมู่ - หมวดหมู่สําหรับแถวทั้งหมดจะเป็นจักรยานเสือหลบถนน