Compartilhar via


Tutorial: Criar um pipeline etl com pipelines declarativos do Lakeflow Spark

Este tutorial explica como criar e implantar um pipeline de ETL (extrair, transformar e carregar) para orquestração de dados usando Pipelines Declarativos do Lakeflow Spark e Carregador Automático. Um pipeline de ETL implementa as etapas para ler dados de sistemas de origem, transformar esses dados com base em requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, você usará pipelines e Auto Loader para:

  • Importar dados brutos de origem para uma tabela de destino.
  • Transforme os dados brutos de origem e escreva os dados transformados em duas visões materializadas de destino.
  • Consulte os dados transformados.
  • Automatize o pipeline de ETL com um trabalho do Databricks.

Para obter mais informações sobre pipelines e Auto Loader, consulte Pipelines Declarativos do Spark do Lakeflow e O que é Auto Loader?

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Sobre o conjunto de dados

O conjunto de dados usado neste exemplo é um subconjunto do Conjunto de dados de milhões de músicas, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de amostra incluídos no workspace do Azure Databricks.

Etapa 1: Criar um pipeline

Primeiro, crie um pipeline definindo os conjuntos de dados em arquivos ( chamados de código-fonte) usando a sintaxe do pipeline. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários arquivos específicos ao idioma no pipeline. Para saber mais, confira Pipelines Declarativos do Lakeflow Spark

Este tutorial usa a computação sem servidor e o Catálogo do Unity. Para todas as opções de configuração que não são especificadas, use as configurações padrão. Se a computação sem servidor não estiver habilitada ou tiver suporte em seu workspace, você poderá concluir o tutorial como escrito usando as configurações de computação padrão.

Para criar um novo pipeline, siga estas etapas:

  1. No workspace, clique no ícone PlusNovo na barra lateral, então selecione Pipeline ETL.
  2. Dê um nome exclusivo ao pipeline.
  3. Logo abaixo do nome, selecione o catálogo e o esquema padrão para os dados gerados. Você pode especificar outros destinos em suas transformações, mas este tutorial usa esses padrões. Você deve ter permissões para o catálogo e o esquema que você cria. Confira os Requisitos
  4. Para este tutorial, selecione Iniciar com um arquivo vazio.
  5. No caminho da pasta, especifique um local para seus arquivos de origem ou aceite o padrão (sua pasta de usuário).
  6. Escolha Python ou SQL como a linguagem do seu primeiro arquivo fonte (um pipeline pode combinar linguagens, mas cada arquivo deve estar em um único idioma).
  7. Clique em Selecionar.

O editor de pipeline é exibido para o novo pipeline. Um arquivo de origem vazio para seu idioma é criado, pronto para sua primeira transformação.

Etapa 2: Desenvolver sua lógica de pipeline

Nesta etapa, você usará o Editor do Lakeflow Pipelines para desenvolver e validar o código-fonte do pipeline interativamente.

O código usa o Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem. Para saber mais, confira o que é o Carregador Automático?

Um arquivo de código-fonte em branco é criado e configurado automaticamente para o pipeline. O arquivo é criado na pasta de transformações do pipeline. Por padrão, todos os arquivos *.py e *.sql na pasta de transformações fazem parte da origem do pipeline.

  1. Copie e cole o código a seguir no arquivo de origem. Use o idioma selecionado para o arquivo na Etapa 1.

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Essa fonte inclui código para três consultas. Você também pode colocar essas consultas em arquivos separados, para organizar os arquivos e codificar da maneira que preferir.

  2. Clique no ícone Reproduzir.Execute o arquivo ou execute o pipeline para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem em seu pipeline, eles são funcionalmente equivalentes.

Quando a atualização for concluída, o editor será atualizado com informações sobre seu pipeline.

  • O grafo de pipeline (DAG), na barra lateral à direita do código, mostra três tabelas, songs_raw, songs_prepared e top_artists_by_year.
  • Um resumo da atualização é mostrado na parte superior do navegador de ativos de pipeline.
  • Os detalhes das tabelas geradas são mostrados no painel inferior e você pode procurar dados das tabelas selecionando uma.

Isso inclui os dados brutos e limpos, bem como algumas análises simples para encontrar os principais artistas por ano. Na próxima etapa, você criará consultas ad hoc para análise adicional em um arquivo separado em seu pipeline.

Etapa 3: Explorar os conjuntos de dados criados pelo pipeline

Nesta etapa, você executa consultas ad-hoc nos dados processados no pipeline de ETL para analisar os dados das músicas no Editor de SQL do Databricks. Essas consultas usam os registros preparados criados na etapa anterior.

Primeiro, execute uma consulta que encontra os artistas que mais lançaram músicas a cada ano desde 1990.

  1. Na barra lateral do navegador de ativos de pipeline, clique no ícone Plus.Adicione então Exploração.

  2. Insira um Nome e selecione SQL para o arquivo de exploração. Um bloco de anotações SQL é criado em uma nova explorations pasta. Os arquivos na explorations pasta não são executados como parte de uma atualização de pipeline por padrão. O bloco de anotações SQL tem células que podem ser executadas em conjunto ou separadamente.

  3. Para criar uma tabela de artistas que lançam a maioria das músicas em cada ano após 1990, insira o código a seguir no novo arquivo SQL (se houver código de exemplo no arquivo, substitua-o). Como esse notebook não faz parte do pipeline, ele não usa o catálogo e o esquema padrão. Substitua o <catalog>.<schema> com o catálogo e o esquema que você usou como padrão para o pipeline.

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Clique no ícone Reproduzir ou pressione Shift + Enter para executar esta consulta.

Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.

  1. Adicione o código a seguir à próxima célula no mesmo arquivo. Novamente, substitua o <catalog>.<schema> pelo catálogo e pelo esquema que você usou como padrão para o pipeline.

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Clique no ícone Reproduzir ou pressione Shift + Enter para executar esta consulta.

Etapa 4: Criar um trabalho para executar o pipeline

Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão, processamento e análise de dados usando um trabalho do Databricks executado em um agendamento.

  1. Na parte superior do editor, escolha o botão Agendar .
  2. Se a caixa de diálogo Agendas for exibida, escolha Adicionar agendamento.
  3. Isso abre a caixa de diálogo Nova agenda, na qual você pode criar uma tarefa para executar o pipeline em um horário programado.
  4. Opcionalmente, dê um nome ao trabalho.
  5. Por padrão, o agendamento é definido para ser executado uma vez por dia. Você pode aceitar esse padrão ou então definir sua própria agenda. Escolher Avançado oferece a opção de definir uma hora específica em que o trabalho será executado. Selecionar Mais opções permite criar notificações quando o trabalho é executado.
  6. Selecione Criar para aplicar as alterações e criar o trabalho.

Agora, a tarefa será executada diariamente para manter seu pipeline atualizado. Você pode escolher Agendar novamente para exibir a lista de agendas. Você pode gerenciar agendas para o pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover agendas.

Clicar no nome da agenda (ou trabalho) leva você para a página do trabalho na lista Trabalhos > pipelines . A partir daí, você pode exibir detalhes sobre execuções de trabalho, incluindo o histórico de execuções ou executar o trabalho imediatamente com o botão Executar agora .

Consulte Monitoramento e observabilidade para trabalhos do Lakeflow para obter mais informações sobre execuções de trabalhos.

Saiba mais