Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que você entenda os conceitos fundamentais do Apache Spark e esteja executando comandos em um bloco de anotações do Azure Databricks conectado à computação. Você cria DataFrames usando dados de exemplo, executa transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e os salva em uma tabela ou arquivo.
Carregar dados
Alguns exemplos neste artigo usam dados de exemplo fornecidos pelo Databricks para demonstrar o uso de DataFrames para carregar, transformar e salvar dados. Se você quiser usar seus próprios dados que ainda não estão no Databricks, você pode carregá-los primeiro e criar um DataFrame a partir dele. Consulte Criar ou modificar uma tabela usando o upload de arquivos e Carregar arquivos para um volume do Catálogo Unity.
Sobre os dados de exemplo do Databricks
Databricks fornece dados de exemplo no samples catálogo e no /databricks-datasets diretório.
- Para acessar os dados de exemplo no
samplescatálogo, use o formatosamples.<schema-name>.<table-name>. Este artigo usa tabelas nosamples.tpchesquema, que contém dados de uma empresa fictícia. Acustomertabela contém informações sobre os clientes eorderscontém informações sobre os pedidos feitos por esses clientes. - Use
dbutils.fs.lspara explorar dados no/databricks-datasets. Use o Spark SQL ou DataFrames para consultar dados nesse local usando caminhos de arquivo. Para saber mais sobre dados de exemplo fornecidos pelo Databricks, consulte Conjuntos de dados de exemplo.
Importar tipos de dados
Muitas operações do PySpark exigem que você use funções SQL ou interaja com tipos nativos do Spark. Importe diretamente apenas as funções e os tipos de que você precisa ou, para evitar substituir as funções internas do Python, importe esses módulos usando um alias comum.
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F
Para uma lista abrangente de tipos de dados, consulte Tipos de Dados PySpark.
Para uma lista abrangente das funções SQL do PySpark, consulte Funções PySpark.
Criar um DataFrame
Há várias maneiras de criar um DataFrame. Normalmente, você define um DataFrame em relação a uma fonte de dados, como uma tabela ou coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display, para acionar as transformações a serem executadas. O display método produz DataFrames.
Criar um DataFrame com valores especificados
Para criar um DataFrame com valores especificados, use o createDataFrame método, onde as linhas são expressas como uma lista de tuplas:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Observe na saída que os tipos de dados de colunas de df_children são automaticamente inferidos. Como alternativa, você pode especificar os tipos adicionando um esquema. Os esquemas são definidos usando o StructType que é composto por que especificam o nome, o tipo de StructFields dados e um sinalizador booleano indicando se eles contêm um valor nulo ou não. Você deve importar tipos de dados do pyspark.sql.types.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Criar um DataFrame a partir de uma tabela no Unity Catalog
Para criar um DataFrame a partir de uma tabela no Unity Catalog, use o table método que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>. Clique em Catálogo na barra de navegação da esquerda para usar o Explorador de Catálogos e navegar até à sua tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no bloco de anotações.
O exemplo a seguir carrega a tabela samples.tpch.customer, mas você pode, alternativamente, fornecer o caminho para sua própria tabela.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Criar um DataFrame a partir de um arquivo carregado
Para criar um DataFrame a partir de um arquivo que você carregou para volumes do Catálogo Unity, use a read propriedade. Esse método retorna um DataFrameReader, que você pode usar para ler o formato apropriado. Clique na opção de catálogo na pequena barra lateral à esquerda e use o navegador de catálogo para localizar seu arquivo. Selecione-o e clique em Copiar caminho do arquivo de volume.
O exemplo abaixo lê a partir de um *.csv arquivo, mas DataFrameReader suporta o upload de arquivos em muitos outros formatos. Ver métodos DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Para obter mais informações sobre os volumes do Catálogo Unity, consulte O que são volumes do Catálogo Unity?.
Criar um DataFrame a partir de uma resposta JSON
Para criar um DataFrame a partir de uma carga útil de resposta JSON retornada por uma API REST, use o pacote Python requests para consultar e analisar a resposta. Você deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de aplicativos de medicamentos da Food and Drug Administration dos Estados Unidos.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados no Databricks, consulte Modelar dados semiestruturados.
Selecionar um campo ou objeto JSON
Para selecionar um campo ou objeto específico do JSON convertido, use a [] notação. Por exemplo, para selecionar o products campo que é uma matriz de produtos:
display(df_drugs.select(df_drugs["products"]))
Você também pode encadear chamadas de método para atravessar vários campos. Por exemplo, para produzir o nome da marca do primeiro produto em uma aplicação de medicamento:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Criar um DataFrame a partir de um arquivo
Para demonstrar a criação de um DataFrame a partir de um arquivo, este exemplo carrega dados CSV no /databricks-datasets diretório.
Para navegar até aos conjuntos de dados de exemplo, pode usar os comandos do sistema de ficheiros Databricks Utilties . O exemplo a seguir usa dbutils para listar os conjuntos de dados disponíveis em /databricks-datasets:
display(dbutils.fs.ls('/databricks-datasets'))
Como alternativa, você pode usar %fs para acessar os comandos do sistema de arquivos da CLI do Databricks, conforme mostrado no exemplo a seguir:
%fs ls '/databricks-datasets'
Para criar um DataFrame a partir de um arquivo ou diretório de arquivos, especifique o load caminho no método:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transforme dados com DataFrames
DataFrames facilitam a transformação de dados usando métodos internos para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no pyspark.sql.functions pacote. Veja Funções SQL do Databricks PySpark.
Operações de coluna
O Spark fornece muitas operações básicas de coluna:
Tip
Para gerar a saída de todas as colunas em um DataFrame, use columns, por exemplo df_customer.columns, .
Selecionar colunas
Você pode selecionar colunas específicas usando select e col. A col função está no pyspark.sql.functions submódulo.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Você também pode se referir a uma coluna usando expr uma expressão definida como uma cadeia de caracteres:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Você também pode usar selectExpr, que aceita expressões SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Para selecionar explicitamente uma coluna de um DataFrame específico, você pode usar o [] operador ou o . operador. (O . operador não pode ser usado para selecionar colunas que comecem com um número inteiro ou que contenham um espaço ou caractere especial.) Isso pode ser especialmente útil quando você está unindo DataFrames onde algumas colunas têm o mesmo nome.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Criar colunas
Para criar uma nova coluna, use o withColumn método. O exemplo a seguir cria uma nova coluna que contém um valor booleano com base em se o saldo c_acctbal da conta do cliente excede 1000:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Renomear colunas
Para renomear uma coluna, use o withColumnRenamed método, que aceita os nomes de coluna novos e existentes:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
O alias método é especialmente útil quando você deseja renomear suas colunas como parte de agregações:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Tipos de coluna de transmissão
Em alguns casos, você pode querer alterar o tipo de dados para uma ou mais das colunas em seu DataFrame. Para fazer isso, use o cast método para converter entre tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um inteiro para o tipo de cadeia de caracteres, usando o col método para fazer referência a uma coluna:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Remover colunas
Para remover colunas, você pode omitir colunas durante uma seleção ou select(*) except pode usar o drop método:
df_customer_flag_renamed.drop("balance_flag_renamed")
Você também pode soltar várias colunas de uma só vez:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operações de linha
O Spark fornece muitas operações básicas de linha:
- Filtrar linhas
- Remover linhas duplicadas
- Manipular valores nulos
- Acrescentar linhas
- Ordenar linhas
- Filtrar linhas
Filtrar linhas
Para filtrar linhas, use o filter método ou where em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o col método ou uma expressão que é avaliada como uma coluna.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Para filtrar em várias condições, use operadores lógicos. Por exemplo, & e | permitir que você e ANDOR condições, respectivamente. O exemplo a seguir filtra linhas em que o c_nationkey é igual a 20 e c_acctbal é maior que 1000.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Remover linhas duplicadas
Para eliminar a duplicação de linhas, use distinct, que retorna apenas as linhas exclusivas.
df_unique = df_customer.distinct()
Manipular valores nulos
Para manipular valores nulos, solte linhas que contenham valores nulos usando o na.drop método. Esse método permite especificar se você deseja soltar linhas contendo any valores nulos ou all valores nulos.
Para descartar quaisquer valores nulos, use um dos exemplos a seguir.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Se, em vez disso, você quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:
df_customer_no_nulls = df_customer.na.drop("all")
Você pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Para preencher os valores em falta, utilize o fill método. Você pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo, os saldos de conta que têm um valor nulo para o saldo c_acctbal da conta são preenchidos com 0.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Para substituir cadeias de caracteres por outros valores, use o replace método. No exemplo abaixo, todas as cadeias de endereços vazias são substituídas pela palavra UNKNOWN:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Acrescentar linhas
Para acrescentar linhas, você precisa usar o union método para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer criado anteriormente e df_filtered_customer são combinados, que retorna um DataFrame com três clientes:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Note
Você também pode combinar DataFrames gravando-os em uma tabela e, em seguida, anexando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados para uma tabela de destino pode reduzir drasticamente a latência e os custos de computação à medida que os dados crescem em tamanho. Consulte Conectores padrão no Lakeflow Connect.
Ordenar linhas
Important
A classificação pode ser cara em escala, e se você armazenar dados classificados e recarregar os dados com o Spark, o pedido não é garantido. Certifique-se de que você é intencional em seu uso de classificação.
Para classificar linhas por uma ou mais colunas, use o sort método ou orderBy . Por padrão, esses métodos classificam em ordem crescente:
df_customer.orderBy(col("c_acctbal"))
Para filtrar em ordem decrescente, use desc:
df_customer.sort(col("c_custkey").desc())
O exemplo a seguir mostra como classificar em duas colunas:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o limit método. O exemplo a seguir exibe apenas os 10 principais resultados:
display(df_sorted.limit(10))
Junte-se a DataFrames
Para unir dois ou mais DataFrames, use o join método. Você pode especificar como gostaria que os DataFrames fossem unidos nos how parâmetros (o tipo de junção) e on (em quais colunas basear a junção). Os tipos comuns de junção incluem:
-
inner: Este é o padrão de tipo de junção, que retorna um DataFrame que mantém apenas as linhas onde há uma correspondência para oonparâmetro em todos os DataFrames. -
left: Isso mantém todas as linhas do primeiro DataFrame especificado e apenas as linhas do segundo DataFrame especificado que têm uma correspondência com o primeiro. -
outer: Uma junção externa mantém todas as linhas de ambos os DataFrames, independentemente da correspondência.
Para obter informações detalhadas sobre associações, consulte Trabalhar com associações no Azure Databricks. Para uma lista de joins suportados no PySpark, veja DataFrame joins.
O exemplo a seguir retorna um único DataFrame onde cada linha do DataFrame é unida orders com a linha correspondente do customers DataFrame. Uma junção interna é usada, pois a expectativa é que cada pedido corresponda exatamente a um cliente.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Para unir em várias condições, use operadores booleanos como & e | para especificar AND e OR, respectivamente. O exemplo a seguir adiciona uma condição adicional, filtrando apenas para as linhas que têm o_totalprice maior que 500,000:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Dados agregados
Para agregar dados em um DataFrame, semelhante a um GROUP BY em SQL, use o groupBy método para especificar colunas para agrupar e o agg método para especificar agregações. Importar agregações comuns, incluindo avg, sum, max, e min de pyspark.sql.functions. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Algumas agregações são ações, o que significa que desencadeiam cálculos. Neste caso, você não precisa usar outras ações para produzir resultados.
Para contar linhas em um DataFrame, use o count método:
df_customer.count()
Encadeamento de chamadas
Os métodos que transformam DataFrames retornam DataFrames, e o Spark não age em transformações até que as ações sejam chamadas. Esta avaliação preguiçosa significa que pode encadear vários métodos para maior conveniência e legibilidade. O exemplo a seguir mostra como encadear a filtragem, a agregação e a ordenação:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualize seu DataFrame
Para visualizar um DataFrame num caderno, clique no + sinal ao lado da tabela no canto superior esquerdo do DataFrame e selecione Visualização para adicionar um ou mais gráficos com base no seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em blocos de anotações Databricks e editor SQL.
display(df_order)
Para executar visualizações adicionais, o Databricks recomenda o uso da API pandas para Spark. O .pandas_api() permite que você converta para a API pandas correspondente para um Spark DataFrame. Para obter mais informações, consulte API Pandas no Spark.
Guardar os dados
Depois de transformar seus dados, você pode salvá-los usando os DataFrameWriter métodos. Uma lista completa destes métodos pode ser encontrada no DataFrameWriter. As seções a seguir mostram como salvar seu DataFrame como uma tabela e como uma coleção de arquivos de dados.
Salve seu DataFrame como uma tabela
Para salvar seu DataFrame como uma tabela no Unity Catalog, use o write.saveAsTable método e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Escreva seu DataFrame como CSV
Para gravar seu DataFrame no *.csv formato, use o write.csv método, especificando o formato e as opções. Por padrão, se existirem dados no caminho especificado, a operação de gravação falhará. Você pode especificar um dos seguintes modos para executar uma ação diferente:
-
overwritesubstitui todos os dados existentes no caminho de destino pelo conteúdo do DataFrame. -
appendacrescenta conteúdo do DataFrame aos dados no caminho de destino. -
ignoreFalha silenciosamente na gravação se existirem dados no caminho de destino.
O exemplo a seguir demonstra a substituição de dados com conteúdo DataFrame como arquivos CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Próximos passos
Para aproveitar mais recursos do Spark no Databricks, consulte: