Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article liste toutes les API prises en charge par Hive Warehouse Connector. Tous les exemples présentés ci-dessous sont exécutés à l’aide de l’interpréteur de commandes Spark et d’une session Hive Warehouse Connector.
Comment créer une session Hive Warehouse Connector :
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Configuration requise
Suivez les étapes de Configuration de Hive Warehouse Connector.
API prises en charge
Définir la base de données :
hive.setDatabase("<database-name>")Lister toutes les bases de données :
hive.showDatabases()Lister toutes les tables de la base de données active
hive.showTables()Décrire une table
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")Déposer une base de données
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)Supprimer une table de la base de données active
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)Création d'une base de données
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)Créer une table dans la base de données active
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")Le générateur pour la création de table prend en charge uniquement les opérations ci-dessous :
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")// Creates the table createTableBuilder.create()Notes
Cette API crée une table au format ORC à l’emplacement par défaut. Pour d’autres fonctionnalités/options ou pour créer une table à l’aide de requêtes Hive, utilisez l’API
executeUpdate.Lire une table
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")Exécuter des commandes DDL sur HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean valueExécuter une requête Hive et charger le résultat dans un jeu de données
Exécution d’une requête par le biais de démons LLAP. [Recommandé]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")Exécution d’une requête par le biais de HiveServer2 via JDBC.
Définir
spark.datasource.hive.warehouse.smartExecutionsurfalsedans les configurations Spark avant de démarrer la session Spark pour utiliser cette APIhive.execute("<hive-query>")
Fermer une session Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()Exécuter une requête de fusion Hive
Cette API crée une requête de fusion Hive au format ci-dessous
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge queryLe générateur prend en charge les opérations suivantes :
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")mergeBuilder.using("<source-expression/table>", "<sourceAlias>")mergeBuilder.on("<onExpr>")mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")mergeBuilder.whenMatchedThenDelete("<deleteExpr>")mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");// Executes the merge query mergeBuilder.merge()Écrire un jeu de données dans la table Hive par lot
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()TableName être au format
<db>.<table>ou<table>. Si aucun nom de base de données n’est fourni, la table est explorée/créée dans la base de données activeLes types SaveMode (Mode d’enregistrement) sont les suivants :
Append : ajoute le jeu de données à la table spécifiée
Overwrite : remplace les données de la table spécifiée par le jeu de données
Ignore : ignore l’écriture si la table existe déjà ; aucune erreur n’est générée
ErrorIfExists : génère une erreur si la table existe déjà
Écrire un jeu de données dans une table Hive à l’aide de HiveStreaming
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()Notes
Les écritures en flux ajoutent toujours des données.
Écriture d’un flux Spark dans une table Hive
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()