Modificar e salvar dataframes
O Apache Spark fornece o objeto dataframe como a estrutura primária para trabalhar com dados. Você pode usar dataframes para consultar e transformar dados e persistir os resultados em um data lake. Para carregar dados em um dataframe, use a função spark.read , especificando o formato do arquivo, o caminho e, opcionalmente, o esquema dos dados a serem lidos. Por exemplo, o código a seguir carrega dados de todos os arquivos .csv na pasta pedidos em um dataframe chamado order_details e, em seguida, exibe os cinco primeiros registros.
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
Transformar a estrutura de dados
Depois de carregar os dados de origem em um dataframe, você pode usar os métodos do objeto dataframe e as funções do Spark para transformá-los. As operações típicas em um dataframe incluem:
- Filtrando linhas e colunas
- Renomeando colunas
- Criando novas colunas, muitas vezes derivadas das existentes
- Substituindo valores nulos ou outros
No exemplo a seguir, o código usa a split função para separar os valores na coluna CustomerName em duas novas colunas chamadas FirstName e LastName. Em seguida, ele usa o drop método para excluir a coluna CustomerName original.
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
Você pode usar todo o poder da biblioteca SQL do Spark para transformar os dados filtrando linhas, derivando, removendo, renomeando colunas e quaisquer outras modificações de dados necessárias.
Salvar os dados transformados
Depois que o dataFrame estiver na estrutura necessária, você poderá salvar os resultados em um formato com suporte no data lake.
O exemplo de código a seguir salva o dataFrame em um arquivo parquet no data lake, substituindo qualquer arquivo existente com o mesmo nome.
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
Observação
Normalmente, o formato Parquet é preferencial para os arquivos de dados que você usará para análise ou ingestão posterior em um repositório analítico. O Parquet é um formato muito eficiente que é compatível com a maioria dos sistemas de análise de dados em grande escala. Na verdade, às vezes, seu requisito de transformação de dados pode ser apenas converter dados de outro formato (como CSV) em Parquet.