Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo mostra operações baseadas em faísca suportadas pelo Hive Warehouse Connector (HWC). Todos os exemplos mostrados serão executados através do shell do Apache Spark.
Pré-requisito
Conclua os passos de configuração do Hive Warehouse Connector.
Começar
Para iniciar uma sessão de spark-shell, execute as seguintes etapas:
Use o comando ssh para se conectar ao cluster do Apache Spark. Edite o comando substituindo CLUSTERNAME pelo nome do cluster e digite o comando:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netA partir da sua sessão ssh, execute o seguinte comando para verificar a versão
hive-warehouse-connector-assembly:ls /usr/hdp/current/hive_warehouse_connectorEdite o código com a versão
hive-warehouse-connector-assemblyidentificada acima. Em seguida, execute o comando para iniciar o shell do Spark:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=falseDepois de iniciar o spark-shell, uma instância do Hive Warehouse Connector pode ser iniciada usando os seguintes comandos:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Criando DataFrames do Spark usando consultas do Hive
Os resultados de todas as consultas usando a biblioteca HWC são retornados como um DataFrame. Os exemplos a seguir demonstram como criar uma consulta de hive básica.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Os resultados da consulta são Spark DataFrames, que podem ser usados com bibliotecas Spark como MLIB e SparkSQL.
Exportando DataFrames do Spark para tabelas do Hive
O Spark não suporta nativamente a gravação nas tabelas ACID gerenciadas do Hive. No entanto, usando HWC, você pode gravar qualquer DataFrame em uma tabela Hive. Você pode ver essa funcionalidade em ação no exemplo a seguir:
Crie uma tabela chamada
sampletable_coloradoe especifique suas colunas usando o seguinte comando:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()Filtre a tabela
hivesampletableonde a colunastateé igual aColorado. Esta consulta do Hive retorna um Spark DataFrame e o resultado é salvo na tabelasampletable_coloradodo Hive usando a funçãowrite.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()Veja os resultados com o seguinte comando:
hive.table("sampletable_colorado").show()
Gravações de streaming estruturadas
Usando o Hive Warehouse Connector, você pode usar o streaming do Spark para gravar dados em tabelas do Hive.
Importante
Não há suporte para gravações de streaming estruturadas em clusters Spark 4.0 habilitados para ESP.
Siga as etapas para ingerir dados de um fluxo do Spark na porta localhost 9999 em uma tabela Hive via. Conector de armazém do Hive.
A partir do shell aberto do Spark, inicie um fluxo de faísca com o seguinte comando:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()Gere dados para o fluxo do Spark que você criou, executando as seguintes etapas:
- Abra uma segunda sessão SSH no mesmo cluster do Spark.
- No prompt de comando, digite
nc -lk 9999. Este comando usa onetcatutilitário para enviar dados da linha de comando para a porta especificada.
Retorne à primeira sessão SSH e crie uma nova tabela do Hive para armazenar os dados de streaming. No spark-shell, digite o seguinte comando:
hive.createTable("stream_table").column("value","string").create()Em seguida, escreva os dados de streaming na tabela recém-criada usando o seguinte comando:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()Importante
As
metastoreUriopções edatabasedevem ser definidas manualmente devido a um problema conhecido no Apache Spark. Para obter mais informações sobre esse problema, consulte SPARK-25460.Retorne à segunda sessão SSH e insira os seguintes valores:
foo HiveSpark barVolte para a primeira sessão SSH e anote a breve atividade. Use o seguinte comando para exibir os dados:
hive.table("stream_table").show()
Use Ctrl + C para parar netcat na segunda sessão SSH. Use :q para sair do spark-shell na primeira sessão SSH.