Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
O Lakeflow Spark Declarative Pipelines (SDP) apresenta várias novas palavras-chave e funções do SQL para definir exibições materializadas e tabelas de streaming em pipelines. O suporte SQL para o desenvolvimento de pipelines baseia-se nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de Streaming Estruturado.
Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python oferece suporte a testes e operações mais extensos que são difíceis de implementar com SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.
Para obter uma referência completa da sintaxe sql do pipeline, consulte a referência de linguagem SQL do Pipeline.
Noções básicas de SQL para desenvolvimento de pipeline
O código SQL que cria conjuntos de dados de pipeline usa a CREATE OR REFRESH sintaxe para definir exibições materializadas e tabelas de streaming em relação aos resultados da consulta.
A STREAM palavra-chave indica se a fonte de dados referenciada em uma SELECT cláusula deve ser lida com semântica de streaming.
As leituras e gravações padrão são feitas no catálogo e no esquema especificados durante a configuração do pipeline. Confira Definir o catálogo e o esquema de destino.
O código-fonte do pipeline difere criticamente dos scripts SQL: o SDP avalia todas as definições de conjunto de dados em todos os arquivos de código-fonte configurados em um pipeline e cria um grafo de fluxo de dados antes que as consultas sejam executadas. A ordem das consultas que aparecem nos arquivos de origem define a ordem de avaliação de código, mas não a ordem de execução da consulta.
Criar uma exibição materializada com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma exibição materializada com SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Criar uma tabela de streaming com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de streaming com SQL. Ao ler uma fonte para uma tabela de streaming, a palavra-chave STREAM indica que será usada a semântica de streaming para a origem. Não use a STREAM palavra-chave ao criar uma exibição materializada:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Observação
Use a palavra-chave STREAM para usar a semântica de streaming para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmações de alterações, você pode usar Python e a opção SkipChangeCommits para lidar com erros.
Carregar dados do armazenamento de objetos
Os pipelines dão suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.
Observação
Esses exemplos usam dados disponíveis em /databricks-datasets montados automaticamente no seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos de nuvem. Confira O que são volumes do Unity Catalog?.
O Databricks recomenda usar o Carregador Automático e as tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos de nuvem. Confira O que é o Carregador Automático?.
O SQL usa a read_files função para invocar a funcionalidade do Carregador Automático. Você também deve usar a palavra-chave STREAM para configurar uma leitura de streaming com read_files.
A seguir é descrita a sintaxe para read_files no SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
As opções do Carregador Automático são pares de chave e valor. Para obter detalhes sobre formatos e opções com suporte, consulte Opções.
O exemplo a seguir cria uma tabela de streaming de arquivos JSON usando o Carregador Automático:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
A read_files função também dá suporte à semântica em lote para criar exibições materializadas. O exemplo a seguir usa a semântica em lote para ler um diretório JSON e criar uma exibição materializada:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Valide dados com expectativas
Você pode usar expectativas para definir e impor restrições de qualidade de dados. Confira Gerenciar a qualidade dos dados com as expectativas do pipeline.
O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Consultar exibições materializadas e tabelas de streaming definidas no pipeline
O exemplo a seguir define quatro conjuntos de dados:
- Uma tabela de streaming chamada
ordersque carrega dados JSON. - Uma exibição materializada chamada
customersque carrega dados CSV. - Uma exibição materializada nomeada
customer_ordersque une registros dos conjuntos de dadosordersecustomers, converte o carimbo de data/hora do pedido em uma data e seleciona os camposcustomer_id,order_number,stateeorder_date. - Uma exibição materializada chamada
daily_orders_by_stateque agrega a contagem diária de pedidos para cada estado.
Observação
Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são gravadas e lidas do catálogo e do esquema padrão configurados para o pipeline.
O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe de esquema LIVE é ignorada silenciosamente. Confira Esquema LIVE (herdado).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Definir uma tabela privada
Você pode usar a PRIVATE cláusula ao criar uma exibição materializada ou uma tabela de streaming. Ao criar uma tabela privada, você cria a tabela, mas não cria os metadados para a tabela. A PRIVATE cláusula instrui o SDP a criar uma tabela disponível para o pipeline, mas que não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela privada persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.
Tabelas privadas podem ter o mesmo nome que tabelas no catálogo. Se você especificar um nome não qualificado para uma tabela dentro de um pipeline, se houver uma tabela privada e uma tabela de catálogo com esse nome, a tabela privada será usada.
Tabelas privadas foram referenciadas anteriormente como tabelas temporárias.
Excluir permanentemente os registros de uma exibição materializada ou tabela de fluxo
Para excluir permanentemente registros de uma tabela de streaming com vetores de exclusão habilitados, como para conformidade com GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.
As exibições materializadas sempre refletem os dados nas tabelas subjacentes quando são atualizadas. Para excluir dados em uma exibição Materializada, exclua os dados da origem e atualize a exibição materializada.
Parametrizar valores usados ao declarar tabelas ou exibições com SQL
Use SET para especificar um valor de configuração em uma consulta que declare uma tabela ou exibição, incluindo configurações do Spark. Qualquer tabela ou visualização que você definir em um arquivo de origem depois que a SET declaração tiver acesso ao valor definido. Todas as configurações do Spark especificadas usando a instrução SET são usadas ao executar a consulta Spark para qualquer tabela ou exibição após a instrução SET. Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de cadeia de caracteres ${}. O seguinte exemplo define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.
Limitações
Não há suporte para a cláusula PIVOT. A operação pivot no Spark requer o carregamento adiantado de dados de entrada para calcular o esquema da saída. Não há suporte para essa funcionalidade em pipelines.
Observação
A sintaxe CREATE OR REFRESH LIVE TABLE para criar uma exibição materializada foi preterida. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW.