Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Isso contém notebooks e códigos de exemplo para padrões comuns para trabalhar com o Streaming Estruturado no Azure Databricks.
Introdução ao streaming estruturado
Se você for novato no Fluxo Estruturado, confira Executar sua primeira carga de trabalho de Fluxo Estruturado.
Gravar no Cassandra como um coletor para o Streaming Estruturado no Python
O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalonável e altamente disponível.
O Streaming Estruturado funciona com o Cassandra pelo Conector do Cassandra do Spark. Esse conector dá suporte a APIs RDD e DataFrame e tem suporte nativo para gravar dados de streaming. Importante Você deve usar a versão correspondente do spark-cassandra-connector-assembly.
O exemplo a seguir está conectado a um ou mais hosts em um cluster de banco de dados do Cassandra. Ele também especifica as configurações de conexão, como o local do ponto de verificação e os nomes específicos do keyspace e da tabela:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Escrever para o Azure Synapse Analytics usando foreachBatch() em Python
streamingDF.writeStream.foreachBatch() permite reutilizar os gravadores de dados em lotes existentes para gravar a saída de uma consulta de streaming no Azure Synapse Analytics. Confira a documentação do foreachBatch para obter detalhes.
Para executar este exemplo, você precisa do conector do Azure Synapse Analytics. Para obter detalhes sobre o Azure Synapse Analytics, consulte Dados de consulta no Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Junções de fluxo a fluxo
Esses dois notebooks mostram como usar junções de fluxo a fluxo no Python e Scala.