Compartilhar via


Padrões de Streaming Estruturado no Azure Databricks

Isso contém notebooks e códigos de exemplo para padrões comuns para trabalhar com o Streaming Estruturado no Azure Databricks.

Introdução ao streaming estruturado

Se você for novato no Fluxo Estruturado, confira Executar sua primeira carga de trabalho de Fluxo Estruturado.

Gravar no Cassandra como um coletor para o Streaming Estruturado no Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalonável e altamente disponível.

O Streaming Estruturado funciona com o Cassandra pelo Conector do Cassandra do Spark. Esse conector dá suporte a APIs RDD e DataFrame e tem suporte nativo para gravar dados de streaming. Importante Você deve usar a versão correspondente do spark-cassandra-connector-assembly.

O exemplo a seguir está conectado a um ou mais hosts em um cluster de banco de dados do Cassandra. Ele também especifica as configurações de conexão, como o local do ponto de verificação e os nomes específicos do keyspace e da tabela:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Escrever para o Azure Synapse Analytics usando foreachBatch() em Python

streamingDF.writeStream.foreachBatch() permite reutilizar os gravadores de dados em lotes existentes para gravar a saída de uma consulta de streaming no Azure Synapse Analytics. Confira a documentação do foreachBatch para obter detalhes.

Para executar este exemplo, você precisa do conector do Azure Synapse Analytics. Para obter detalhes sobre o Azure Synapse Analytics, consulte Dados de consulta no Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Junções de fluxo a fluxo

Esses dois notebooks mostram como usar junções de fluxo a fluxo no Python e Scala.

Junções de fluxo a fluxo no notebook Python

Obter laptop

Junções de fluxo a fluxo no notebook Scala

Obter laptop