Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule wymieniono wszystkie interfejsy API obsługiwane przez łącznik magazynu Hive. Wszystkie przedstawione poniżej przykłady są uruchamiane przy użyciu sesji spark-shell oraz Hive Warehouse Connector.
Jak utworzyć sesję łącznika świata danych Hive:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Warunek wstępny
Wykonaj kroki konfiguracji łącznika Hive Warehouse.
Obsługiwane interfejsy API
Ustaw bazę danych:
hive.setDatabase("<database-name>")Wyświetl listę wszystkich baz danych:
hive.showDatabases()Wyświetlanie listy wszystkich tabel w bieżącej bazie danych
hive.showTables()Opisywanie tabeli
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")Usuwanie bazy danych
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)Usuwanie tabeli w bieżącej bazie danych
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)Tworzenie bazy danych
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)Tworzenie tabeli w bieżącej bazie danych
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")Konstruktor dla tabeli create-table obsługuje tylko poniższe operacje:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")// Creates the table createTableBuilder.create()Uwaga
Ten interfejs API tworzy tabelę sformatowaną w formacie ORC w domyślnej lokalizacji. W przypadku innych funkcji/opcji lub tworzenia tabeli przy użyciu zapytań hive użyj interfejsu
executeUpdateAPI.Odczytywanie tabeli
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")Wykonywanie poleceń DDL na serwerze HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean valueWykonywanie zapytania Hive i ładowanie wyniku w zestawie danych
Wykonywanie zapytania za pośrednictwem usług LLAP. [Zalecane]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")Wykonywanie zapytania za pośrednictwem serwera HiveServer2 za pośrednictwem JDBC.
Ustaw
spark.datasource.hive.warehouse.smartExecutionnafalsew konfiguracjach Spark przed rozpoczęciem sesji, aby użyć tego interfejsu APIhive.execute("<hive-query>")
Zamknij sesję łącznika magazynu Hive
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()Wykonywanie zapytania scalania programu Hive
Ten interfejs API tworzy zapytanie łączenia dla Hive w formacie podanym poniżej.
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge queryKonstruktor obsługuje następujące operacje:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")mergeBuilder.using("<source-expression/table>", "<sourceAlias>")mergeBuilder.on("<onExpr>")mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")mergeBuilder.whenMatchedThenDelete("<deleteExpr>")mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");// Executes the merge query mergeBuilder.merge()Zapisz zestaw danych do tabeli Hive w trybie wsadowym
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()Nazwa_tabeli powinna mieć postać
<db>.<table>lub<table>. Jeśli nie podano nazwy bazy danych, tabela zostanie przeszukana/utworzona w bieżącej bazie danychTypy SaveMode to:
Dołączanie: dołącza zestaw danych do danej tabeli
Zastąp: zastępuje dane w danej tabeli za pomocą zestawu danych
Ignoruj: Pomija zapis, jeśli tabela już istnieje, nie zgłasza błędu.
ErrorIfExists: zgłasza błąd, jeśli tabela już istnieje
Zapisywanie zestawu danych w tabeli Hive przy użyciu technologii HiveStreaming
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()Uwaga
Zapis strumienia zawsze dołącza dane.
Pisanie strumienia Spark do tabeli Hive
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()