Compartilhar via


Desenvolver código de pipeline com Python

O Lakeflow Spark Declarative Pipelines (SDP) apresenta várias novas construções de código Python para definir exibições materializadas e tabelas de streaming em pipelines. O suporte do Python para o desenvolvimento de pipelines se baseia nas noções básicas do PySpark DataFrame e das APIs de Streaming Estruturado.

Para usuários que não estão familiarizados com Python e DataFrames, o Databricks recomenda usar a interface SQL. Consulte Desenvolver código de pipelines declarativos do Lakeflow Spark com SQL.

Para obter uma referência completa da sintaxe do Python do Lakeflow SDP, consulte a referência de linguagem Python do Lakeflow Spark Declarative Pipelines.

Noções básicas de Python para desenvolvimento de pipeline

O código Python que cria conjuntos de dados pipline deve retornar DataFrames.

Todas as APIs do Python do Lakeflow Spark Declarative Pipelines são implementadas no pyspark.pipelines módulo. Seu código de pipeline implementado com Python deve importar o módulo pipelines explicitamente no início do código fonte em Python. Em nossos exemplos, usamos o comando de importação a seguir e usamos dp em exemplos para fazer referência a pipelines.

from pyspark import pipelines as dp

Observação

O Apache Spark™ inclui pipelines declarativos a partir do Spark 4.1, disponíveis por meio do pyspark.pipelines módulo. O Databricks Runtime estende esses recursos de software livre com APIs adicionais e integrações para uso de produção gerenciada.

O código escrito com o módulo de software livre pipelines é executado sem modificação no Azure Databricks. Os seguintes recursos não fazem parte do Apache Spark:

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)

O pipeline lê e grava o padrão no catálogo e no esquema especificados durante a configuração do pipeline. Confira Definir o catálogo e o esquema de destino.

O código Python específico do pipeline difere de outros tipos de código Python de uma maneira crítica: o código de pipeline do Python não chama diretamente as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados. Em vez disso, o SDP interpreta as funções de decorador do dp módulo em todos os arquivos de código-fonte configurados em um pipeline e cria um grafo de fluxo de dados.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, confira a referência do Python.

Criar uma exibição materializada ou uma tabela de streaming com Python

Use @dp.table para criar uma tabela de streaming com base nos resultados de uma leitura de streaming. Use @dp.materialized_view para criar uma exibição materializada com base nos resultados de uma leitura em lote.

Por padrão, os nomes de exibição materializada e de tabela de streaming são inferidos de nomes de função. O exemplo de código a seguir mostra a sintaxe básica para criar uma exibição materializada e uma tabela de streaming:

Observação

Ambas as funções fazem referência à mesma tabela no samples catálogo e usam a mesma função decoradora. Esses exemplos destacam que a única diferença na sintaxe básica para exibições materializadas e tabelas de streaming é usar spark.read versus spark.readStream.

Nem todas as fontes de dados dão suporte a leituras de streaming. Algumas fontes de dados sempre devem ser processadas com semântica de streaming.

from pyspark import pipelines as dp

@dp.materialized_view()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, você pode especificar o nome da tabela usando o name argumento no @dp.table decorador. O exemplo a seguir demonstra esse padrão para uma exibição materializada e uma tabela de streaming:

from pyspark import pipelines as dp

@dp.materialized_view(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carregar dados do armazenamento de objetos

Os pipelines dão suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.

Observação

Esses exemplos usam dados disponíveis em /databricks-datasets montados automaticamente no seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos de nuvem. Confira O que são volumes do Unity Catalog?.

O Databricks recomenda usar o Carregador Automático e as tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos de nuvem. Confira O que é o Carregador Automático?.

O exemplo a seguir cria uma tabela de streaming de arquivos JSON usando o Carregador Automático:

from pyspark import pipelines as dp

@dp.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

O exemplo a seguir usa a semântica em lote para ler um diretório JSON e criar uma exibição materializada:

from pyspark import pipelines as dp

@dp.materialized_view()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Valide dados com expectativas

Você pode usar expectativas para definir e impor restrições de qualidade de dados. Confira Gerenciar a qualidade dos dados com as expectativas do pipeline.

O código a seguir usa @dp.expect_or_drop para definir uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

from pyspark import pipelines as dp

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consultar exibições materializadas e tabelas de streaming definidas no pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma exibição materializada nomeada customer_orders que une registros dos conjuntos de dados orders e customers, converte o carimbo de data/hora do pedido em uma data e seleciona os campos customer_id, order_number, state e order_date.
  • Uma exibição materializada chamada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.

Observação

Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são gravadas e lidas do catálogo e do esquema padrão configurados para o pipeline.

O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe de esquema LIVE é ignorada silenciosamente. Confira Esquema LIVE (herdado).

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dp.materialized_view()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dp.materialized_view()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dp.materialized_view()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Criar tabelas em um for loop

Você pode usar loops do Python for para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam em apenas alguns parâmetros, resultando em menos código total para manter e menos redundância de código.

O for loop avalia a lógica em ordem sequencial, mas após a conclusão do planejamento para os conjuntos de dados, o pipeline executa a lógica em paralelo.

Importante

Ao usar esse padrão para definir conjuntos de dados, certifique-se de que a lista de valores passados para o for loop seja sempre aditiva. Se um conjunto de dados definido anteriormente em um pipeline for omitido de uma execução de pipeline futura, esse conjunto de dados será descartado automaticamente do esquema de destino.

O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome das exibições materializadas de destino e filtrar os dados de origem. As exibições temporárias são usadas para definir junções das tabelas de origem usadas na construção das exibições materializadas finais.

from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col

@dp.temporary_view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dp.temporary_view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Veja a seguir um exemplo do gráfico de fluxo de dados para esse pipeline:

Um gráfico de fluxo de dados de duas exibições que levam a cinco tabelas regionais.

Solução de problemas: for o loop cria muitas tabelas com os mesmos valores

O modelo de execução lento que os pipelines usam para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dp.materialized_view() for invocada.

O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um for loop. Em ambos os exemplos, cada nome de tabela da tables lista é explicitamente referenciado na função decorada por @dp.materialized_view().

from pyspark import pipelines as dp

# Create a parent function to set local variables

def create_table(table_name):
  @dp.materialized_view(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dp.materialized_view()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized_view(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

O exemplo a seguir não faz referência a valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no for loop:

from pyspark import pipelines as dp

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Excluir permanentemente os registros de uma exibição materializada ou tabela de fluxo

Para excluir permanentemente registros de uma exibição materializada ou tabela de streaming com vetores de exclusão habilitados, como para conformidade com GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a deleção de registros de uma exibição materializada, verifique Excluir permanentemente registros de uma exibição materializada com vetores de deleção habilitados. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.