Transformieren von Daten mit SQL
Die SparkSQL-Bibliothek, die die Datenframestruktur bereitstellt, ermöglicht ihnen auch die Verwendung von SQL als Möglichkeit zum Arbeiten mit Daten. Mit diesem Ansatz können Sie Daten in Datenframes abfragen und transformieren, indem Sie SQL-Abfragen verwenden und die Ergebnisse als Tabellen speichern.
Hinweis
Tabellen sind Metadatenabstraktionen über Dateien. Die Daten werden nicht in einer relationalen Tabelle gespeichert, aber die Tabelle stellt eine relationale Ebene über Dateien im Datensee bereit.
Definieren von Tabellen und Ansichten
Tabellendefinitionen in Spark werden im Metastoregespeichert, einer Metadatenebene, die relationale Abstraktionen über Dateien kapselt. Externe Tabellen sind relationale Tabellen im Metastore, die auf Dateien in einem von Ihnen angegebenen Data Lake-Speicherort verweisen. Sie können auf diese Daten zugreifen, indem Sie die Tabelle abfragen oder die Dateien direkt aus dem Datensee lesen.
Hinweis
Externe Tabellen sind „lose an die zugrunde liegenden Dateien gebunden“, und durch das Löschen der Tabelle werden die Dateien nicht gelöscht. Auf diese Weise können Sie Spark verwenden, um die schweren Transformationen zu erledigen und dann die Daten im See zu speichern. Danach können Sie die Tabelle ablegen und nachgelagerte Prozesse können auf diese optimierten Strukturen zugreifen. Sie können auch verwalteten Tabellen definieren, für die die zugrunde liegenden Datendateien an einem intern verwalteten Speicherort gespeichert werden, der dem Metastore zugeordnet ist. Verwaltete Tabellen sind "eng gebunden" an die Dateien und das Ablegen einer verwalteten Tabelle löscht die zugehörigen Dateien.
Im folgenden Codebeispiel wird ein Datenframe (aus CSV-Dateien geladen) als externer Tabellenname sales_ordersgespeichert. Die Dateien werden im Ordner /sales_orders_table im Data Lake gespeichert.
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
Verwenden von SQL zum Abfragen und Transformieren der Daten
Nach dem Definieren einer Tabelle können Sie SQL verwenden, um die Daten abzufragen und zu transformieren. Der folgende Code erstellt zwei neue abgeleitete Spalten namens Year und Month und erstellt dann eine neue Tabelle transformed_orders mit den neuen abgeleiteten Spalten hinzugefügt.
# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')
Die Datendateien für die neue Tabelle werden in einer Hierarchie von Ordnern mit dem Format Year=*NNN* / Month=*N*gespeichert, wobei jeder Ordner eine Parkettdatei für die entsprechenden Bestellungen nach Jahr und Monat enthält.
Metaspeicher abfragen
Da diese neue Tabelle im Metastore erstellt wurde, können Sie sql verwenden, um sie direkt mit dem %%sql Magischen Schlüssel in der ersten Zeile abzufragen, um anzugeben, dass die SQL-Syntax wie im folgenden Skript dargestellt verwendet wird:
%%sql
SELECT * FROM transformed_orders
WHERE Year = 2021
AND Month = 1
Löschen von Tabellen
Wenn Sie mit externen Tabellen arbeiten, können Sie den Befehl DROP verwenden, um die Tabellendefinitionen aus dem Metastore zu löschen, ohne dass sich dies auf die Dateien im Datensee auswirkt. Mit diesem Ansatz können Sie den Metastore bereinigen, nachdem Sie SQL zum Transformieren der Daten verwendet haben, während die transformierten Datendateien für nachgelagerte Datenanalyse- und Aufnahmeprozesse verfügbar sind.
%%sql
DROP TABLE transformed_orders;
DROP TABLE sales_orders;