Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo fornece exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Fluxo Estruturado no Azure Databricks. Você pode usar o Streaming Estruturado para cargas de trabalho de processamento incrementais e quase em tempo real.
O Streaming Estruturado é uma das várias tecnologias que alimentam tabelas de streaming no Lakeflow Spark Declarative Pipelines. O Databricks recomenda o uso de Pipelines Declarativas do Lakeflow Spark para todas as novas cargas de trabalho de ETL, ingestão e streaming estruturado. Consulte Pipelines Declarativos do Lakeflow Spark.
Observação
Embora o Lakeflow Spark Declarative Pipelines forneça uma sintaxe ligeiramente modificada para declarar tabelas de streaming, a sintaxe geral para configurar leituras e transformações de streaming se aplica a todos os casos de uso de streaming no Azure Databricks. O Lakeflow Spark Declarative Pipelines também simplifica o streaming gerenciando informações de estado, metadados e várias configurações.
Use o Auto Loader para ler dados de streaming do armazenamento de objetos
O exemplo a seguir demonstra como carregar dados JSON com o Auto Loader, que usa cloudFiles para denotar o formato e as opções. A opção schemaLocation habilita a inferência e a evolução do esquema. Cole o seguinte código em uma célula de notebook do Databricks e execute a célula para criar um DataFrame de streaming chamado raw_df:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Como ocorre com outras operações de leitura no Azure Databricks, configurar um fluxo de leitura na verdade não carrega dados. Você precisa acionar uma ação nos dados antes que o fluxo comece.
Observação
Chamar display() em um DataFrame de streaming inicia uma tarefa de streaming. Para a maioria dos casos de uso do Streaming Estruturado, a ação que dispara um fluxo deve gravar dados em um coletor. Confira Considerações de produção para o Streaming estruturado.
Executar um fluxo de transformação
Structured Streaming oferece suporte à maioria das transformações disponíveis no Azure Databricks e no Spark SQL. Você pode até carregar modelos do MLflow como UDFs e fazer previsões do fluxo como uma transformação.
O exemplo de código a seguir executa uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando funções SQL do Spark:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
O resultado transformed_df contém instruções de consulta para carregar e transformar cada registro à medida que chega à fonte de dados.
Observação
O Fluxo Estruturado trata as fontes de dados como conjuntos de dados não vinculados ou infinitos. Dessa forma, não há suporte para algumas transformações nas cargas de trabalho do Streaming Estruturado, porque elas exigiriam a classificação de um número infinito de itens.
A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Confira Aplicar marcas d’água para controlar os limites do processamento de dados.
Executar uma gravação em lote incremental no Delta Lake
O exemplo a seguir faz uma gravação no Delta Lake usando um caminho de arquivo especificado e um ponto de verificação.
Importante
Lembre-se sempre de especificar um local de ponto de verificação exclusivo para cada gravador de streaming que você configurar. O ponto de verificação fornece a identidade exclusiva para o seu fluxo, acompanhando todos os registros processados e as informações de estado associadas à consulta de streaming.
A configuração availableNow do gatilho instrui o Fluxo Estruturado a processar todos os registros não processados anteriormente do conjunto de dados de origem e a seguir desligar, para que você possa executar com segurança o código seguinte sem se preocupar em deixar um fluxo em execução:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Nesse exemplo, nenhum registro novo chega à nossa fonte de dados, então repetir a execução desse código não ingere novos registros.
Aviso
A execução do Streaming Estruturado pode impedir que o encerramento automático desligue os recursos de computação. Para evitar custos inesperados, certifique-se de finalizar as consultas de streaming.
Ler dados do Delta Lake, transformá-los e gravá-los no Delta Lake
O Delta Lake tem amplo suporte para o uso do Streaming Estruturado, tanto como uma origem quanto como um coletor. Veja Leituras e gravações do streaming de tabela Delta.
O exemplo a seguir mostra um exemplo de sintaxe para carregar incrementalmente todos os novos registros de uma tabela Delta, fazer sua junção com um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Você precisa ter as permissões adequadas configuradas para ler as tabelas de origem e gravar em tabelas de destino e no local de ponto de verificação especificado. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para os coletores e as fontes de dados.
Observação
O Lakeflow Spark Declarative Pipelines fornece uma sintaxe totalmente declarativa para criar pipelines do Delta Lake e gerencia propriedades como gatilhos e pontos de verificação automaticamente. Consulte Pipelines Declarativos do Lakeflow Spark.
Ler dados do Kafka, realizar transformação e gravar no Kafka
O Apache Kafka e outros barramentos de mensagens proporcionam algumas das menores latências disponíveis para grandes conjuntos de dados. Você pode usar o Azure Databricks para aplicar transformações aos dados ingeridos do Kafka e, a seguir, gravar os dados no Kafka novamente.
Observação
Gravar dados no armazenamento de objetos na nuvem adiciona uma sobrecarga de latência extra. Se você deseja armazenar dados de um barramento de mensagens no Delta Lake, mas precisa ter a menor latência possível para fluxos de cargas de trabalho, o Databricks recomenda configurar fluxos de trabalho separados para ingerir dados no lakehouse e aplicar transformações quase em tempo real a coletores do barramento de mensagens downstream.
O exemplo de código a seguir demonstra um padrão simples para enriquecer dados do Kafka ao fazer sua junção com os dados de uma tabela Delta e, a seguir, os gravando novamente no Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Você precisa ter permissões adequadas configuradas para o acesso ao seu serviço do Kafka. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para os coletores e as fontes de dados. Confira Processamento de fluxos com o Apache Kafka e o Azure Databricks.