Usar o Spark para trabalhar com arquivos de dados
Depois de configurar um bloco de anotações e anexá-lo a um cluster, você pode usar o Spark para ler e processar arquivos de dados. O Spark oferece suporte a uma ampla variedade de formatos, como CSV, JSON, Parquet, ORC, Avro e Delta, e o Databricks fornece conectores internos para acessar arquivos armazenados no espaço de trabalho, no Azure Data Lake ou no Armazenamento de Blobs ou em outros sistemas externos.
O fluxo de trabalho geralmente segue três etapas:
Leia um arquivo em um Spark DataFrame usando spark.read com o formato e o caminho corretos. Ao ler formatos de texto bruto, como CSV ou JSON, o Spark pode inferir o esquema (nomes de colunas e tipos de dados), mas isso às vezes é lento ou não confiável. Uma prática melhor na produção é definir o esquema explicitamente para que os dados sejam carregados de forma consistente e eficiente.
Explore e transforme o DataFrame usando operações SQL ou DataFrame (por exemplo, filtrando linhas, selecionando colunas, agregando valores).
Escreva os resultados de volta para o armazenamento em um formato escolhido.
O trabalho com arquivos no Spark foi projetado para ser consistente em conjuntos de dados pequenos e grandes. O mesmo código usado para testar um pequeno arquivo CSV também funcionará em conjuntos de dados muito maiores, já que o Spark distribui o trabalho pelo cluster. Isso facilita a expansão da exploração rápida para o processamento de dados mais complexo.
Carregando dados em um dataframe
Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar com dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta de dados no armazenamento do sistema de arquivos Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Em um bloco de anotações do Spark, você pode usar o seguinte código PySpark para carregar os dados em um dataframe e exibir as primeiras 10 linhas:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
A %pyspark linha no início é chamada de magia, e diz a Spark que a linguagem usada nesta célula é PySpark. Aqui está o código Scala equivalente para o exemplo de dados de produtos:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
A magia %spark é usada para especificar Scala.
Gorjeta
Você também pode selecionar o idioma que deseja usar para cada célula na interface do Bloco de Anotações.
Ambos os exemplos mostrados anteriormente produziriam resultados como este:
| ID do Produto | Nome do Produto | Categoria | PreçoListado |
|---|---|---|---|
| 771 | Montanha 100 Prateado, 38 | Bicicletas de Montanha | 3399.9900 |
| 772 | Montanha 100 Prateado, 42 | Bicicletas de Montanha | 3399.9900 |
| 773 | Montanha 100 Prateado, 44 | Bicicletas de Montanha | 3399.9900 |
| ... | ... | ... | ... |
Especificando um esquema de dataframe
No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark era capaz de inferir o tipo de dados de cada coluna a partir dos dados que ela contém. Você também pode especificar um esquema explícito para os dados, que é útil quando os nomes das colunas não são incluídos no arquivo de dados, como este exemplo CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
O exemplo PySpark a seguir mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Os resultados seriam, mais uma vez, semelhantes a:
| ID do Produto | Nome do Produto | Categoria | PreçoListado |
|---|---|---|---|
| 771 | Montanha 100 Prateado, 38 | Bicicletas de Montanha | 3399.9900 |
| 772 | Montanha 100 Prateado, 42 | Bicicletas de Montanha | 3399.9900 |
| 773 | Montanha 100 Prateado, 44 | Bicicletas de Montanha | 3399.9900 |
| ... | ... | ... | ... |
Filtragem e agrupamento de dataframes
Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados que ela contém. Por exemplo, o exemplo de código a seguir usa o select método para recuperar as colunas ProductName e ListPrice do dataframe df que contém dados do produto no exemplo anterior:
pricelist_df = df.select("ProductID", "ListPrice")
Os resultados deste exemplo de código seriam mais ou menos assim:
| ID do Produto | PreçoListado |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto dataframe.
Gorjeta
Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser obtida usando a seguinte sintaxe mais curta:
pricelist_df = df["ProductID", "ListPrice"]
Você pode "encadear" métodos juntos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este exemplo de código encadeia os select métodos e where para criar um novo dataframe contendo as colunas ProductName e ListPrice para produtos com uma categoria de Mountain Bikes ou Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Os resultados deste exemplo de código seriam mais ou menos assim:
| Nome do Produto | PreçoListado |
|---|---|
| Montanha 100 Prateado, 38 | 3399.9900 |
| Road-750 Preta, 52 | 539.9900 |
| ... | ... |
Para agrupar e agregar dados, você pode usar o método groupby e funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos para cada categoria:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Os resultados deste exemplo de código seriam mais ou menos assim:
| Categoria | contagem |
|---|---|
| Auriculares | 3 |
| Pneus | 14 |
| Bicicletas de Montanha | 32 |
| ... | ... |
Nota
Os Spark DataFrames são declarativos e imutáveis. Cada transformação (como select, filterou groupBy) cria um novo DataFrame que representa o que você deseja, não como ele é executado. Isso torna o código reutilizável, otimizado e livre de efeitos colaterais. Mas nenhuma dessas transformações realmente é executada até que você acione uma ação (por exemplo, display, , collectwrite), momento em que o Spark executa o plano otimizado completo.
Usando expressões SQL no Spark
A API Dataframe faz parte de uma biblioteca do Spark chamada Spark SQL, que permite que os analistas de dados usem expressões SQL para consultar e manipular dados.
Criando objetos de banco de dados no catálogo do Spark
O catálogo do Spark é um metastore para objetos de dados relacionais, como exibições e tabelas. O tempo de execução do Spark pode usar o catálogo para integrar perfeitamente o código escrito em qualquer linguagem suportada pelo Spark com expressões SQL que podem ser mais naturais para alguns analistas de dados ou desenvolvedores.
Uma das maneiras mais simples de disponibilizar dados em um dataframe para consulta no catálogo do Spark é criar uma exibição temporária, conforme mostrado no exemplo de código a seguir:
df.createOrReplaceTempView("products")
Uma vista é temporária, o que significa que é automaticamente eliminada no final da sessão atual. Você também pode criar tabelas que são persistentes no catálogo para definir um banco de dados que pode ser consultado usando o Spark SQL.
Nota
Não exploraremos as tabelas de catálogo do Spark em profundidade neste módulo, mas vale a pena reservar um tempo para destacar alguns pontos-chave:
- Você pode criar uma tabela vazia usando o
spark.catalog.createTablemétodo. As tabelas são estruturas de metadados que armazenam seus dados subjacentes no local de armazenamento associado ao catálogo. A exclusão de uma tabela também exclui seus dados subjacentes. - Você pode salvar um dataframe como uma tabela usando seu
saveAsTablemétodo. - Você pode criar uma tabela externa usando o
spark.catalog.createExternalTablemétodo. As tabelas externas definem metadados no catálogo, mas obtêm seus dados subjacentes de um local de armazenamento externo; normalmente uma pasta em um data lake. A exclusão de uma tabela externa não exclui os dados subjacentes.
Usando a API SQL do Spark para consultar dados
Você pode usar a API SQL do Spark em código escrito em qualquer idioma para consultar dados no catálogo. Por exemplo, o código PySpark a seguir usa uma consulta SQL para retornar dados da exibição de produtos como um dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Os resultados do exemplo de código seriam semelhantes à tabela a seguir:
| Nome do Produto | PreçoListado |
|---|---|
| Montanha 100 Prateado, 38 | 3399.9900 |
| Road-750 Preta, 52 | 539.9900 |
| ... | ... |
Usando código SQL
O exemplo anterior demonstrou como usar a API SQL do Spark para incorporar expressões SQL no código do Spark. Em um bloco de anotações, você também pode usar a mágica para executar o %sql código SQL que consulta objetos no catálogo, da seguinte forma:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
O exemplo de código SQL retorna um conjunto de resultados que é exibido automaticamente no bloco de anotações como uma tabela, como a abaixo:
| Categoria | Contagem de produtos |
|---|---|
| Bib-Calções | 3 |
| Suportes para bicicletas | 1 |
| Suportes para Bicicletas | 1 |
| ... | ... |