Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
O Apache HBase normalmente é consultado com sua API de baixo nível (scans, gets, e puts) ou com uma sintaxe SQL usando o Apache Phoenix. O Apache também fornece o Apache Spark HBase Connector. O conector é uma alternativa conveniente e eficiente para consultar e modificar dados armazenados pelo HBase.
Pré-requisitos
Dois clusters HDInsight separados implantados na mesma rede virtual. Um HBase e um Spark com pelo menos o Spark 2.1 (HDInsight 3.6) instalado. Para obter mais informações, consulte Criar clusters baseados em Linux no HDInsight usando o portal do Azure.
O esquema de URI para o armazenamento primário de clusters. Esse esquema seria wasb:// para o Armazenamento de Blobs do Azure,
abfs://para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento de Blob, o URI seráwasbs://. Consulte também, transferência segura.
Processo geral
O processo de alto nível para permitir que o cluster do Spark consulte o cluster HBase é o seguinte:
- Prepare alguns dados de amostra no HBase.
- Adquira o arquivo hbase-site.xml da pasta de configuração do cluster HBase (/etc/hbase/conf) e coloque uma cópia do hbase-site.xml na pasta de configuração do Spark 2 (/etc/spark2/conf). (OPCIONAL: use o script fornecido pela equipe do HDInsight para automatizar esse processo)
- Execute
spark-shella referência ao Spark HBase Connector por suas coordenadas Maven napackagesopção. - Defina um catálogo que mapeie o esquema do Spark para o HBase.
- Interaja com os dados do HBase usando as APIs RDD ou DataFrame.
Preparar dados de exemplo no Apache HBase
Nesta etapa, você cria e preenche uma tabela no Apache HBase que pode ser consultada usando o Spark.
Use o
sshcomando para se conectar ao cluster HBase. Edite o comando substituindoHBASECLUSTERpelo nome do cluster HBase e digite o comando:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.netUse o
hbase shellcomando para iniciar o shell interativo do HBase. Digite o seguinte comando em sua conexão SSH:hbase shellUse o
createcomando para criar uma tabela HBase com famílias de duas colunas. Introduza o seguinte comando:create 'Contacts', 'Personal', 'Office'Use o
putcomando para inserir valores em uma coluna especificada em uma linha especificada em uma tabela específica. Introduza o seguinte comando:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'Use o
exitcomando para parar o shell interativo do HBase. Introduza o seguinte comando:exit
Executar scripts para configurar a conexão entre clusters
Para configurar a comunicação entre clusters, siga as etapas para executar dois scripts em seus clusters. Esses scripts automatizarão o processo de cópia de arquivos descrito na seção 'Configurar a comunicação manualmente'.
- O script executado a partir do cluster HBase carregará
hbase-site.xmlas informações de mapeamento IP do HBase para o armazenamento padrão anexado ao cluster do Spark. - O script executado a partir do cluster do Spark configura dois trabalhos cron para executar dois scripts auxiliares periodicamente:
- HBase cron job – faça download de novos
hbase-site.xmlarquivos e mapeamento de IP do HBase da conta de armazenamento padrão do Spark para o nó local - Spark cron job – verifica se ocorreu um dimensionamento do Spark e se o cluster é seguro. Em caso afirmativo, edite
/etc/hostspara incluir o mapeamento IP do HBase armazenado localmente
- HBase cron job – faça download de novos
NOTA: Antes de continuar, certifique-se de que adicionou a conta de armazenamento do cluster Spark ao cluster HBase como conta de armazenamento secundário. Certifique-se de que os scripts estão em ordem, conforme indicado.
Use a Ação de script no cluster do HBase para aplicar as alterações com as seguintes considerações:
Property valor Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.shTipo(s) de nó(s) País/Região Parâmetros -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAMEPersistiu sim -
SECONDARYS_STORAGE_URLé a url do armazenamento padrão do lado do Spark. Exemplo de parâmetro:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
-
Use a Ação de Script no cluster do Spark para aplicar as alterações com as seguintes considerações:
Property valor Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.shTipo(s) de nó(s) Chefe, Trabalhador, Zookeeper Parâmetros -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)Persistiu sim - Você pode especificar com que frequência deseja que esse cluster verifique automaticamente se está atualizado. Padrão:
-s “*/1 * * * *” -h 0(Neste exemplo, o trabalho cron do Spark é executado a cada minuto, enquanto o cron do HBase não é executado) - Como o cron do HBase não está configurado por padrão, você precisa executar esse script novamente ao executar o dimensionamento para o cluster do HBase. Se o cluster do HBase for dimensionado com frequência, você poderá optar por configurar o trabalho cron do HBase automaticamente. Por exemplo:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"configura o script para executar verificações a cada 30 minutos. Isso executará o cronograma cron do HBase periodicamente para automatizar o download de novas informações do HBase na conta de armazenamento comum para o nó local.
- Você pode especificar com que frequência deseja que esse cluster verifique automaticamente se está atualizado. Padrão:
Nota
Esses scripts funcionam apenas em clusters HDI 5.0 e HDI 5.1.
Configurar a comunicação manualmente (Opcional, se o script fornecido na etapa acima falhar)
NOTA: Estas etapas precisam ser executadas sempre que um dos clusters passa por uma atividade de dimensionamento.
Copie o hbase-site.xml do armazenamento local para a raiz do armazenamento padrão do cluster Spark. Edite o comando para refletir sua configuração. Em seguida, da sessão SSH aberta para o cluster HBase, digite o comando:
Valor da sintaxe Novo valor Esquema de URI Modifique para refletir seu armazenamento. A sintaxe é para armazenamento de blob com transferência segura habilitada. SPARK_STORAGE_CONTAINERSubstitua pelo nome do contêiner de armazenamento padrão usado para o cluster do Spark. SPARK_STORAGE_ACCOUNTSubstitua pelo nome da conta de armazenamento padrão usado para o cluster do Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/Em seguida, saia da conexão ssh para o cluster HBase.
exitConecte-se ao nó principal do cluster do Spark usando SSH. Edite o comando substituindo
SPARKCLUSTERpelo nome do cluster do Spark e digite o comando:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.netInsira o comando para copiar
hbase-site.xmldo armazenamento padrão do cluster Spark para a pasta de configuração do Spark 2 no armazenamento local do cluster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Execute o Spark Shell fazendo referência ao Spark HBase Connector
Depois de concluir a etapa anterior, você poderá executar o shell do Spark, fazendo referência à versão apropriada do Spark HBase Connector.
Como exemplo, a tabela a seguir lista duas versões e os comandos correspondentes que a equipe do HDInsight usa atualmente. Você pode usar as mesmas versões para seus clusters se as versões do HBase e do Spark forem as mesmas indicadas na tabela.
Na sessão SSH aberta para o cluster do Spark, digite o seguinte comando para iniciar um shell do Spark:
Versão do Spark HDI HBase versão Versão SHC Comando 2.1 IDH 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/Mantenha essa instância do shell do Spark aberta e continue para Definir um catálogo e uma consulta. Se você não encontrar os jars que correspondem às suas versões no repositório SHC Core, continue lendo.
Para combinações subsequentes das versões Spark e HBase, esses artefatos não são mais publicados no repositório acima. Você pode construir os frascos diretamente da ramificação GitHub spark-hbase-connector . Por exemplo, se você estiver executando com o Spark 2.4 e o HBase 2.1, conclua estas etapas:
Clone o repo:
git clone https://github.com/hortonworks-spark/shcIr para ramo-2.4:
git checkout branch-2.4Compilar a partir da ramificação (cria um arquivo .jar):
mvn clean package -DskipTestsExecute o seguinte comando (certifique-se de alterar o nome .jar que corresponde ao arquivo .jar que você criou):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*Mantenha esta instância do shell do Spark aberta e continue para a próxima seção.
Definir um catálogo e uma consulta
Nesta etapa, você define um objeto de catálogo que mapeia o esquema do Apache Spark para o Apache HBase.
No Spark Shell aberto, insira as seguintes
importinstruções:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._Digite o comando abaixo para definir um catálogo para a tabela Contatos que você criou no HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMarginO código:
- Define um esquema de catálogo para a tabela do HBase chamada
Contacts. - Identifica a chave de linha como
key, e mapeia os nomes de coluna usados no Spark para a família de colunas, o nome da coluna e o tipo de coluna usados no HBase. - Define a chave de linha em detalhes como uma coluna nomeada (
rowkey), que tem uma famíliacfde colunas específica derowkey.
- Define um esquema de catálogo para a tabela do HBase chamada
Insira o comando para definir um método que forneça um DataFrame ao redor de sua
Contactstabela no HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }Crie uma instância do DataFrame:
val df = withCatalog(catalog)Consulte o DataFrame:
df.show()Você verá duas linhas de dados:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+Registre uma tabela temporária para que você possa consultar a tabela do HBase usando o Spark SQL:
df.createTempView("contacts")Emita uma consulta SQL na
contactstabela:spark.sqlContext.sql("select personalName, officeAddress from contacts").showVocê deve ver resultados como estes:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Inserir novos dados
Para inserir um novo registro de contato, defina uma
ContactRecordclasse:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )Crie uma instância de
ContactRecorde coloque-a em uma matriz:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContactSalve a matriz de novos dados no HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()Examine os resultados:
df.show()Deverá ver um resultado como este:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+Feche o shell de faísca digitando o seguinte comando:
:q