Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel werden Spark-basierte Vorgänge erläutert, die vom Hive Warehouse Connector (HWC) unterstützt werden. Alle gezeigten Beispiele werden über die Apache Spark-Shell ausgeführt.
Voraussetzungen
Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.
Erste Schritte
Führen Sie die folgenden Schritte aus, um eine spark-shell-Sitzung zu starten:
Verwenden Sie den Befehl ssh, um eine Verbindung mit Ihrem Apache Spark-Cluster herzustellen. Bearbeiten Sie den Befehl, indem Sie CLUSTERNAME durch den Namen Ihres Clusters ersetzen, und geben Sie den Befehl dann ein:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netFühren Sie in Ihrer SSH-Sitzung den folgenden Befehl aus, und notieren Sie sich die Version von
hive-warehouse-connector-assembly:ls /usr/hdp/current/hive_warehouse_connectorBearbeiten Sie den Code, indem Sie die oben ermittelte Version für
hive-warehouse-connector-assemblyverwenden. Führen Sie anschließend den Befehl aus, um die Spark-Shell zu starten:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=falseNachdem die Spark-Shell gestartet wurde, kann mit den folgenden Befehlen eine Hive Warehouse Connector-Instanz gestartet werden:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Erstellen von Spark-Datenrahmen mithilfe von Hive-Abfragen
Die Ergebnisse aller Abfragen mit Verwendung der HWC-Bibliothek werden als Datenrahmen zurückgegeben. In den folgenden Beispielen wird veranschaulicht, wie Sie eine einfache Hive-Abfrage erstellen.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Bei den Ergebnissen der Abfrage handelt es sich um Spark-Datenrahmen, die mit Spark-Bibliotheken wie MLIB und SparkSQL verwendet werden können.
Schreiben von Spark-Datenrahmen in Hive-Tabellen
Spark bietet keine native Unterstützung für das Schreiben in verwaltete ACID-Tabellen von Hive. Mithilfe von HWC können Sie jedoch alle Datenrahmen in eine Hive-Tabelle schreiben. Diese Funktionalität wird im folgenden Beispiel veranschaulicht:
Erstellen Sie eine Tabelle mit dem Namen
sampletable_colorado, und geben Sie Spalten dafür an, indem Sie den folgenden Befehl verwenden:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()Filtern Sie die Tabelle
hivesampletable, in der die Spaltestateden EintragColoradoenthält. Diese Hive-Abfrage gibt einen Spark-Datenrahmen zurück, und das Ergebnis wird mit der Funktionwritein der Hive-Tabellesampletable_coloradogespeichert.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()Zeigen Sie die Ergebnisse mit dem folgenden Befehl an:
hive.table("sampletable_colorado").show()
Schreibvorgänge per strukturiertem Stream
Mit Hive Warehouse Connector können Sie das Spark-Streaming nutzen, um Daten in Hive-Tabellen zu schreiben.
Wichtig
Strukturierte Streamingschreibvorgänge werden in ESP-fähigen Spark 4.0-Clustern nicht unterstützt.
Mithilfe der Schritte können Sie Daten aus einem Spark-Stream vom Localhost-Port 9999 abrufen und als Hive-Tabelle formatieren. (über den Hive Warehouse Connector).
Starten Sie in der geöffneten Spark-Shell mit dem folgenden Befehl einen Spark-Stream:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()Generieren Sie Daten für den von Ihnen erstellten Spark-Stream, indem Sie die folgenden Schritte ausführen:
- Richten Sie im selben Spark-Cluster eine zweite SSH-Sitzung ein.
- Geben Sie an der Eingabeaufforderung
nc -lk 9999ein: Bei diesem Befehl wird das Hilfsprogrammnetcatverwendet, um Daten über die Befehlszeile an den angegebenen Port zu senden.
Kehren Sie zur ersten SSH-Sitzung zurück, und erstellen Sie für die Streamingdaten eine neue Hive-Tabelle. Geben Sie in spark-shell den folgenden Befehl ein:
hive.createTable("stream_table").column("value","string").create()Schreiben Sie anschließend die Streamingdaten mit dem folgenden Befehl in die neu erstellte Tabelle:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()Wichtig
Aufgrund eines bekannten Problems in Apache Spark müssen die Optionen
metastoreUriunddatabasederzeit manuell festgelegt werden. Weitere Informationen zu diesem Problem finden Sie unter SPARK-25460.Kehren Sie zur zweiten SSH-Sitzung zurück, und geben Sie die folgenden Werte ein:
foo HiveSpark barKehren Sie zur ersten SSH-Sitzung zurück, und beachten Sie die kurze Aktivität. Verwenden Sie zum Anzeigen der Daten den folgenden Befehl:
hive.table("stream_table").show()
Drücken Sie STRG+C, um netcat in der zweiten SSH-Sitzung zu beenden. Verwenden Sie :q, um spark-shell in der ersten SSH-Sitzung zu beenden.