Usar o Spark para trabalhar com arquivos de dados
Depois de configurar um notebook e anexá-lo a um cluster, você pode usar o Spark para ler e processar arquivos de dados. O Spark dá suporte a uma ampla gama de formatos, como CSV, JSON, Parquet, ORC, Avro e Delta, e o Databricks fornece conectores internos para acessar arquivos armazenados no workspace, no Azure Data Lake ou no Armazenamento de Blobs ou em outros sistemas externos.
O fluxo de trabalho geralmente segue três etapas:
Leia um arquivo em um DataFrame do Spark usando spark.read com o formato e o caminho corretos. Ao ler formatos de texto bruto, como CSV ou JSON, o Spark pode inferir o esquema (nomes de coluna e tipos de dados), mas isso às vezes é lento ou não confiável. Uma prática melhor na produção é definir o esquema explicitamente para que os dados sejam carregados de forma consistente e eficiente.
Explore e transforme o DataFrame usando operações SQL ou DataFrame (por exemplo, filtrando linhas, selecionando colunas, agregando valores).
Escreva os resultados de volta ao armazenamento em um formato escolhido.
Trabalhar com arquivos no Spark foi projetado para ser consistente em conjuntos de dados pequenos e grandes. O mesmo código usado para testar um pequeno arquivo CSV também funcionará em conjuntos de dados muito maiores, já que o Spark distribui o trabalho em todo o cluster. Isso facilita a expansão da exploração rápida para o processamento de dados mais complexo.
Carregando dados em um dataframe
Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar usando dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta de dados no armazenamento do DBFS (Databricks File System):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Em um notebook Spark, você pode usar o seguinte código PySpark para carregar os dados em um dataframe e exibir as primeiras dez linhas:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
A linha %pyspark no início é chamada mágica e informa ao Spark que a linguagem usada nesta célula é PySpark. Este é o código Scala equivalente do exemplo de dados de produtos:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
O magic %spark é usada para especificar Scala.
Dica
Você também pode selecionar a linguagem que deseja usar para cada célula na interface do Notebook.
Ambos os exemplos mostrados anteriormente produziriam uma saída como esta:
| ID do Produto | ProductName | Categoria | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | Bicicletas de Montanha | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | Bicicletas de Montanha | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | Bicicletas de Montanha | 3399.9900 |
| ... | ... | ... | ... |
Especificando um esquema de dataframe
No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark conseguiu inferir o tipo de dados de cada coluna com base nos dados contidos nela. Você também pode especificar um esquema explícito para os dados, o que é útil quando os nomes de coluna não são incluídos no arquivo de dados, como este exemplo de CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
O seguinte exemplo do PySpark mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Os resultados seriam mais uma vez semelhantes a:
| ID do Produto | ProductName | Categoria | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | Bicicletas de Montanha | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | Bicicletas de Montanha | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | Bicicletas de Montanha | 3399.9900 |
| ... | ... | ... | ... |
Filtrando e agrupando dataframes
Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados contidos nela. Por exemplo, o exemplo de código a seguir usa o select método para recuperar as colunas ProductName e ListPrice do dataframe df que contém dados do produto no exemplo anterior:
pricelist_df = df.select("ProductID", "ListPrice")
Os resultados desse exemplo de código seriam semelhantes a estes:
| ID do Produto | ListPrice |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto dataframe.
Dica
Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser realizada usando a seguinte sintaxe mais curta:
pricelist_df = df["ProductID", "ListPrice"]
Você pode "encadear" métodos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um novo dataframe contendo as colunas ProductName e ListPrice para produtos com uma categoria de Mountain Bikes ou Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Os resultados desse exemplo de código seriam semelhantes a estes:
| ProductName | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 Black, 52 | 539,9900 |
| ... | ... |
Para agrupar e agregar dados, você pode usar o groupby método e as funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos de cada categoria:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Os resultados desse exemplo de código seriam semelhantes a estes:
| Categoria | contagem |
|---|---|
| Fones de ouvido | 3 |
| Wheels | 14 |
| Bicicletas de Montanha | 32 |
| ... | ... |
Observação
Os DataFrames do Spark são declarativos e imutáveis. Cada transformação (como select, filterou groupBy) cria um novo DataFrame que representa o que você deseja, não como ele é executado. Isso torna o código reutilizável, otimizado e livre de efeitos colaterais. Mas nenhuma dessas transformações realmente é executada até que você dispare uma ação (por exemplo, display, collect, ), writemomento em que o Spark executa o plano otimizado completo.
Usando expressões SQL no Spark
A API de Dataframe faz parte de uma biblioteca do Spark chamada Spark SQL, a qual permite que analistas de dados usem expressões SQL para consultar e manipular dados.
Criando objetos de banco de dados no catálogo do Spark
O catálogo do Spark é um metastore para objetos de dados relacionais, como exibições e tabelas. O runtime do Spark pode usar o catálogo para integrar perfeitamente o código escrito em qualquer linguagem com suporte do Spark a expressões SQL que podem ser mais naturais para alguns analistas de dados ou desenvolvedores.
Uma das maneiras mais simples de disponibilizar dados em um dataframe para consulta no catálogo do Spark é criar uma exibição temporária, conforme mostrado no seguinte código de exemplo:
df.createOrReplaceTempView("products")
Uma exibição é temporária, o que significa que ela é excluída automaticamente no final da sessão atual. Você também pode criar tabelas que são mantidas no catálogo para definir um banco de dados que pode ser consultado usando o SPARK SQL.
Observação
Não exploraremos as tabelas de catálogo do Spark a fundo neste módulo, mas aproveitaremos para realçar alguns pontos-chave:
- Você pode criar uma tabela vazia usando o método
spark.catalog.createTable. Tabelas são estruturas de metadados que armazenam dados subjacentes no local de armazenamento associado ao catálogo. Excluir uma tabela também excluirá os dados subjacentes. - Você pode salvar um dataframe como uma tabela usando o método
saveAsTable. - Você pode criar uma tabela externa usando o
spark.catalog.createExternalTablemétodo. As tabelas externas definem os metadados no catálogo, mas obtêm os dados subjacentes de um local de armazenamento externo; normalmente, de uma pasta em um data lake. Excluir uma tabela externa não exclui os dados subjacentes.
Usando a API do Spark SQL para consultar dados
Você pode usar a API do Spark SQL em código escrito em qualquer linguagem para consultar dados no catálogo. Por exemplo, o código PySpark a seguir usa uma consulta SQL para retornar dados da exibição de produtos como um dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Os resultados do exemplo de código seriam semelhantes a estes:
| ProductName | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 Black, 52 | 539,9900 |
| ... | ... |
Usando SQL código
O exemplo anterior demonstrou como usar a API do Spark SQL para inserir expressões SQL no código Spark. Em um notebook, você também pode usar o magic %sql para executar um código SQL que consulta objetos no catálogo, conforme exemplificado abaixo:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
O exemplo de código SQL retorna um conjunto de resultados que é automaticamente exibido no bloco de notas como uma tabela, como a que está abaixo:
| Categoria | ContagemDeProdutos |
|---|---|
| Bib-Shorts | 3 |
| Porta-bicicletas | 1 |
| Suportes de bicicleta | 1 |
| ... | ... |