Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Para gerenciar efetivamente os dados mantidos no estado, use marcas d'água ao executar o processamento de fluxo com estado em Lakeflow Spark Declarative Pipelines, incluindo agregações, joins e deduplicação. Este artigo descreve como usar marcas d'água em suas consultas de pipeline e inclui exemplos das operações recomendadas.
Observação
Para garantir que as consultas que executam agregações sejam processadas incrementalmente e não totalmente recomputadas com cada atualização, você deve usar marcas d'água.
O que é uma marca d'água?
No processamento de fluxo, uma marca d'água é um recurso do Apache Spark que pode definir um limite baseado em tempo para processar dados ao executar operações com estado, como agregações. Os dados que chegam são processados até que o limite seja atingido, momento em que a janela de tempo definida pelo limite é fechada. As marcas d'água podem ser usadas para evitar problemas durante o processamento de consulta, principalmente ao processar conjuntos de dados maiores ou processamento de longa execução. Esses problemas podem incluir alta latência na produção de resultados e até mesmo erros de OOM (memória insuficiente) devido à quantidade de dados mantidos no estado durante o processamento. Como os dados de streaming são inerentemente não ordenados, as marcas d'água também dão suporte ao cálculo correto de operações como agregações de janela temporal.
Para saber mais sobre como usar marcas d'água no processamento de fluxo, consulte Marca d'água no Streaming Estruturado do Apache Spark e aplique marcas d'água para controlar os limites de processamento de dados.
Como você define uma marca d'água?
Você define uma marca d'água especificando um campo de carimbo de data/hora e um valor que representa o limite de tempo de espera para a chegada de dados que chegam atrasados. Os dados serão considerados atrasados se chegarem após o limite de tempo definido. Por exemplo, se o limite for definido como 10 minutos, os registros que chegam após o limite de 10 minutos poderão ser descartados.
Como os registros que chegam após o limite definido podem ser descartados, a seleção de um limite que atenda aos requisitos de latência versus correção é importante. Escolher um limite menor resulta em registros sendo emitidos mais cedo, mas também significa que registros atrasados são mais propensos a serem descartados. Um limite maior significa uma espera mais longa, mas possivelmente mais integridade dos dados. Devido ao tamanho de estado maior, um limite maior também pode exigir recursos de computação adicionais. Como o valor limite depende de seus dados e requisitos de processamento, testar e monitorar seu processamento é importante para determinar um limite ideal.
Você usa a função withWatermark() no Python para definir uma marca d'água. Em SQL, use a cláusula WATERMARK para definir uma marca d'água.
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Usar marcadores de água com junções de fluxo a fluxo
Para junções de fluxo a fluxo, você deve definir uma marca d'água em ambos os lados da junção e uma condição de intervalo de tempo. Como cada fonte de junção tem uma exibição incompleta dos dados, a cláusula de intervalo de tempo é necessária para informar ao mecanismo de streaming quando não é possível fazer mais correspondências. A cláusula de intervalo de tempo deve usar os mesmos campos usados para a definição das watermarks.
Como pode haver momentos em que cada fluxo requer limites diferentes para marcas d'água, os fluxos não precisam ter os mesmos limites. Para evitar perda de dados, o mecanismo de streaming mantém um watermark global com base no fluxo mais lento.
O exemplo a seguir une um fluxo de impressões de anúncios e um fluxo de cliques do usuário em anúncios. Neste exemplo, um clique deve ocorrer dentro de 3 minutos da impressão. Após o intervalo de tempo de 3 minutos, as linhas da tabela de estado que não puderem mais ser correspondidas serão descartadas.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Executar agregações em janelas com marcas d'água
Uma operação com estado comum em dados de streaming é uma agregação em janelas. As agregações em janelas são semelhantes às agregações agrupadas, exceto que os valores agregados são retornados para o conjunto de linhas que fazem parte da janela definida.
Uma janela pode ser definida como um determinado comprimento e uma operação de agregação pode ser executada em todas as linhas que fazem parte dessa janela. O Spark Streaming dá suporte a três tipos de janelas:
- Janelas em cascata (fixas): uma série de intervalos de tempo de tamanho fixo, não sobrepostos e contíguos. Um registro de entrada pertence a apenas uma única janela.
- Janelas deslizantes: semelhantes às janelas em cascata, as janelas deslizantes são de tamanho fixo, mas as janelas podem se sobrepor e um registro pode cair em várias janelas.
Quando os dados chegam após o final da janela mais o comprimento da marca d'água, nenhum dado novo é aceito para a janela, o resultado da agregação é emitido e o estado da janela é descartado.
O exemplo a seguir calcula uma soma de impressões a cada 5 minutos usando uma janela fixa. Neste exemplo, a cláusula select usa o alias impressions_windowe, em seguida, a própria janela é definida como parte da GROUP BY cláusula. A janela deve ser baseada na mesma coluna de marcador temporal que a marca d'água, a coluna clickTimestamp neste exemplo.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Um exemplo semelhante em Python para calcular o lucro em janelas fixas de horário:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Eliminação de duplicação de registros de streaming
O Streaming Estruturado tem garantias de processamento exatamente uma vez, mas não elimina automaticamente registros duplicados de fontes de dados. Por exemplo, como muitas filas de mensagens possuem garantias de pelo menos uma vez, espera-se registros duplicados ao ler de uma dessas filas de mensagens. Você pode usar a dropDuplicatesWithinWatermark() função para des duplicar registros em qualquer campo especificado, removendo duplicatas de um fluxo mesmo se alguns campos forem diferentes (como hora do evento ou hora de chegada). Para usar a função dropDuplicatesWithinWatermark(), você deve especificar uma marca d'água. Todos os dados duplicados que chegam dentro do intervalo de tempo definido pelo watermark são descartados.
Os dados ordenados são importantes porque os dados fora de ordem fazem com que o valor do marcador de progresso avance incorretamente. Em seguida, quando os dados mais antigos chegam, eles são considerados atrasados e descartados. Use a opção withEventTimeOrder para processar o instantâneo inicial em ordem com base no carimbo de data/hora especificado na marca d'água. A withEventTimeOrder opção pode ser declarada no código que define o conjunto de dados ou nas configurações de pipeline usando spark.databricks.delta.withEventTimeOrder.enabled. Por exemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Observação
A withEventTimeOrder opção tem suporte apenas com Python.
No exemplo a seguir, os dados são processados ordenados pelo clickTimestamp, e os registros que chegam com um intervalo de até 5 segundos entre si, contendo colunas userId e clickAdId duplicadas, são descartados.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Otimizar a configuração de pipeline para processamento com estado
Para ajudar a evitar problemas de produção e latência excessiva, o Databricks recomenda habilitar o gerenciamento de estados baseado em RocksDB para o seu processamento de fluxo com estado, especialmente se o processamento exigir o armazenamento de uma grande quantidade de estado intermediário.
Os pipelines sem servidor gerenciam automaticamente as configurações do repositório de estado.
Você pode habilitar o gerenciamento de estado baseado no RocksDB definindo a seguinte configuração antes de implantar um pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para saber mais sobre o repositório de estado do RocksDB, incluindo recomendações de configuração para o RocksDB, consulte Configurar o repositório de estado do RocksDB no Azure Databricks.