Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Remarque
Cet article présente Databricks Connect pour Databricks Runtime 13.3 LTS et les versions ultérieures.
Cet article fournit des exemples de code qui utilisent Databricks Connect pour Python. Databricks Connect vous permet de connecter des environnements de développement intégré (IDE) populaires, des serveurs notebook et des applications personnalisées aux clusters Azure Databricks. Reportez-vous à Qu'est-ce que Databricks Connect ?. Pour obtenir la version Scala de cet article, reportez-vous à Exemples de code pour Databricks Connect pour Scala.
Avant de commencer à utiliser Databricks Connect, vous devez configurer le client Databricks Connect.
Les exemples suivants supposent que vous utilisez l’authentification par défaut pour la configuration du client Databricks Connect.
Exemple : Lire une table
Cet exemple de code simple interroge la table spécifiée, puis affiche les 5 premières lignes de la table spécifiées.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Exemple : Créer un DataFrame
L’exemple de code ci-dessous :
- Crée un DataFrame en mémoire.
- Crée une table portant le nom
zzz_demo_temps_tabledans le schémadefault. Si la table portant ce nom existe déjà, la table est d’abord supprimée. Pour utiliser un autre schéma ou table, ajustez les appels surspark.sql,temps.write.saveAsTableou les deux. - Enregistre le contenu du DataFrame dans la table.
- Exécute une
SELECTrequête sur le contenu de la table. - Affiche le résultat de la requête.
- Supprime la table.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Exemple : Utiliser DatabricksSession ou SparkSession
L’exemple suivant explique comment écrire du code portable entre Databricks Connect pour Databricks Runtime 13.3 LTS et versions ultérieures dans les environnements où la DatabricksSession classe n’est pas disponible, auquel cas elle utilise la classe à la place pour interroger la SparkSession table spécifiée et retourner les 5 premières lignes. Cet exemple utilise la variable d’environnement SPARK_REMOTE pour l’authentification.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
Ressources supplémentaires
Databricks fournit des exemples d’applications supplémentaires qui montrent comment utiliser Databricks Connect dans le référentiel GitHub Databricks Connect, notamment :