Ändern und Speichern von Datenframes
Apache Spark stellt das Dataframe--Objekt als primäre Struktur für das Arbeiten mit Daten bereit. Sie können Datenframes verwenden, um Daten abzufragen und zu transformieren und die Ergebnisse in einem Datensee zu speichern. Um Daten in einen Dataframe zu laden, verwenden Sie die spark.read-Funktion, wobei sie das Dateiformat, den Pfad und optional das Schema der zu lesenden Daten angeben. Der folgende Code lädt beispielsweise Daten aus allen .csv Dateien im Bestellungen Ordner in einen Datenframe namens order_details und zeigt dann die ersten fünf Datensätze an.
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
Transformieren der Datenstruktur
Nachdem Sie die Quelldaten in einen Dataframe geladen haben, können Sie die Methoden und Spark-Funktionen des Dataframe-Objekts verwenden, um sie zu transformieren. Typische Vorgänge in einem Datenframe umfassen:
- Filtern von Zeilen und Spalten
- Umbenennen von Spalten
- Erstellen neuer Spalten, häufig abgeleitet von vorhandenen Spalten
- Ersetzen von Null- oder anderen Werten
Im folgenden Beispiel verwendet der Code die funktion split, um die Werte in der Spalte CustomerName in zwei neue Spalten namens FirstName- und Nachname-zu trennen. Anschließend wird die drop-Methode verwendet, um die ursprüngliche CustomerName- Spalte zu löschen.
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
Sie können die gesamte Leistungsfähigkeit der Spark SQL-Bibliothek verwenden, um die Daten zu transformieren, indem Sie Zeilen filtern, Spalten entfernen, umbenennen und alle anderen erforderlichen Datenänderungen anwenden.
Speichern der transformierten Daten
Nachdem sich Ihr dataFrame in der erforderlichen Struktur befindet, können Sie die Ergebnisse in einem unterstützten Format in Ihrem Data Lake speichern.
Im folgenden Codebeispiel wird der DataFrame in einer Parquet-Datei im Data Lake gespeichert, wobei jede vorhandene Datei mit demselben Namen ersetzt wird.
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
Hinweis
Das Parquet-Format wird in der Regel für Datendateien verwendet, die Sie für die weitere Analyse oder Erfassung in einem Analysespeicher verwenden. Parquet ist ein sehr effizientes Format, das von den meisten umfangreichen Datenanalysesystemen unterstützt wird. Tatsächlich kann Ihre Datentransformation manchmal einfach darin bestehen, Daten aus einem anderen Format (z. B. CSV) in Parquet zu konvertieren!