Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Na engenharia de dados, o backfilling refere-se ao processo de processamento retroativo de dados históricos por meio de um pipeline de dados que foi projetado para processar dados correntes ou de streaming.
Normalmente, esse é um fluxo separado que envia dados para suas tabelas existentes. A ilustração a seguir mostra um fluxo de backfill enviando dados históricos para as tabelas bronze em seu pipeline.
Alguns cenários que podem exigir uma reposição de dados:
- Processe dados históricos de um sistema herdado para treinar um modelo de ML (machine learning) ou criar um painel de análise de tendências históricas.
- Reprocesse um subconjunto de dados devido a um problema de qualidade de dados com fontes de dados upstream.
- Seus requisitos de negócios foram alterados e você precisa fazer o backup de dados para um período de tempo diferente que não foi coberto pelo pipeline inicial.
- Sua lógica de negócios mudou e você precisa reprocessar dados históricos e atuais.
Há suporte para um backfill no Lakeflow Spark Declarative Pipelines com um fluxo de acréscimo especializado que usa a opção ONCE . Consulte append_flow ou CREATE FLOW (pipelines) para obter mais informações sobre a opção ONCE .
Considerações ao fazer backup de dados históricos em uma tabela de streaming
- Normalmente, acrescente os dados à tabela bronze de streaming. As camadas downstream de prata e ouro coletarão os novos dados da camada de bronze.
- Verifique se o pipeline pode lidar com dados duplicados normalmente caso os mesmos dados sejam acrescentados várias vezes.
- Verifique se o esquema de dados históricos é compatível com o esquema de dados atual.
- Considere o tamanho do volume de dados e o SLA de tempo de processamento necessário e configure adequadamente os tamanhos do cluster e do lote.
Exemplo: Adicionando um backfill a um pipeline existente
Neste exemplo, digamos que você tenha um pipeline que ingere dados brutos de registro de eventos de uma fonte de armazenamento em nuvem, a partir de 01 de janeiro de 2025. Posteriormente, você percebe que deseja preencher retroativamente os dados históricos dos três anos anteriores para casos de uso de relatórios e análises posteriormente. Todos os dados estão em um local, particionados por ano, mês e dia, no formato JSON.
Pipeline inicial
Aqui está o código de pipeline inicial que ingere incrementalmente os dados brutos de registro de eventos do armazenamento em nuvem.
Python
from pyspark import pipelines as dp
source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"
# create a streaming table and the default flow to ingest streaming events
@dp.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
)
SQL
-- create a streaming table and the default flow to ingest streaming events
CREATE OR REFRESH STREAMING LIVE TABLE registration_events_raw AS
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025'; -- safeguard to not process data before begin_year
Aqui, usamos a opção modifiedAfter Carregador Automático para garantir que não estamos processando todos os dados do caminho de armazenamento em nuvem. O processamento incremental é cortado nesse limite.
Dica
Outras fontes de dados, como Kafka, Kinesis e Hubs de Eventos do Azure, têm opções de leitor equivalentes para obter o mesmo comportamento.
Dados de preenchimento retrospectivo dos três anos anteriores
Agora você deseja adicionar um ou mais fluxos para fazer backup de dados anteriores. Neste exemplo, execute as seguintes etapas:
- Utilize o
append oncefluxo. Isso executa um backfill único, sem continuar executando após esse backfill. O código permanece em seu pipeline, e, se o pipeline for totalmente atualizado, o preenchimento de dados será executado novamente. - Crie três fluxos de backfill, um para cada ano (nesse caso, os dados são divididos por ano no caminho). Para Python, parametrizamos a criação dos fluxos, mas no SQL repetimos o código três vezes, uma vez para cada fluxo.
Se você estiver trabalhando em seu próprio projeto e não estiver usando computação sem servidor, talvez queira atualizar o número máximo de trabalhadores para o pipeline. Aumentar o número máximo de trabalhadores garante que você tenha os recursos para processar os dados históricos enquanto continua a processar os dados de streaming atuais dentro do SLA previsto.
Dica
Se você usar a computação sem servidor com dimensionamento automático aprimorado (o padrão), o cluster aumentará automaticamente quando a carga aumentar.
Python
from pyspark import pipelines as dp
source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"
# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
backfill_path = f"{source_root_path}/year={year}/*/*"
@dp.append_flow(
target="registration_events_raw",
once=True,
name=f"flow_registration_events_raw_backfill_{year}",
comment=f"Backfill {year} Raw registration events")
def backfill():
return (
spark
.read
.format("json")
.option("inferSchema", "true")
.load(backfill_path)
)
# create the streaming table
dp.create_streaming_table(name="registration_events_raw", comment="Raw registration events")
# append the original incremental, streaming flow
@dp.append_flow(
target="registration_events_raw",
name="flow_registration_events_raw_incremental",
comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}")
)
# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
setup_backfill_flow(year) # call the previously defined append_flow for each year
SQL
-- create the streaming table
CREATE OR REFRESH STREAMING TABLE registration_events_raw;
-- append the original incremental, streaming flow
CREATE FLOW
registration_events_raw_incremental
AS INSERT INTO
registration_events_raw BY NAME
SELECT * FROM STREAM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025';
-- one time backfill 2024
CREATE FLOW
registration_events_raw_backfill_2024
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2024/*/*",
format => "json",
inferColumnTypes => true
);
-- one time backfill 2023
CREATE FLOW
registration_events_raw_backfill_2023
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2023/*/*",
format => "json",
inferColumnTypes => true
);
-- one time backfill 2022
CREATE FLOW
registration_events_raw_backfill_2022
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2022/*/*",
format => "json",
inferColumnTypes => true
);
Essa implementação realça vários padrões importantes.
Separação de interesses
- O processamento incremental é independente das operações de backfill.
- Cada fluxo tem suas próprias configurações e configurações de otimização.
- Há uma clara distinção entre operações incrementais e de backfill.
Execução controlada
- Usar a opção
ONCEgarante que cada backfill seja executado exatamente uma vez. - O fluxo de backfill permanece no grafo de pipeline, mas fica ocioso após sua conclusão. Ele está pronto para uso na atualização completa, automaticamente.
- Na definição de pipeline, há uma clara trilha de auditoria das operações de backfill.
Otimização de processamento
- Você pode dividir o backfill grande em vários backfills menores para um processamento mais rápido ou para melhor controle do processamento.
- O dimensionamento automático aprimorado ajusta dinamicamente o tamanho do cluster com base na carga atual do cluster.
Evolução do esquema
- Usar
schemaEvolutionMode="addNewColumns"lida com as alterações de esquema de forma eficiente. - Você tem inferência de esquema consistente entre dados históricos e atuais.
- Há uma manipulação segura de novas colunas em dados mais recentes.