Freigeben über


Codebeispiele für Databricks Connect für Python

Hinweis

Dieser Artikel behandelt Databricks Connect für Databricks Runtime Version 13.3 LTS und höher.

In diesem Artikel finden Sie Codebeispiele, in denen Databricks Connect für Python verwendet wird. Mit Databricks Connect können Sie beliebte IDEs, Notebookserver und benutzerdefinierte Anwendungen mit Azure Databricks-Clustern verbinden. Weitere Informationen finden Sie unter Was ist Databricks Connect?. Die Scala-Version dieses Artikels finden Sie unter Codebeispiele für Databricks Connect für Scala.

Bevor Sie beginnen, Databricks Connect zu verwenden, müssen Sie den Databricks Connect-Client einrichten.

In den folgenden Beispielen wird davon ausgegangen, dass Sie die Standardauthentifizierung für databricks Connect-Clientsetup verwenden.

Beispiel: Lesen einer Tabelle

In diesem einfachen Codebeispiel wird die angegebene Tabelle abgefragt und anschließend werden die ersten 5 Zeilen dieser Tabelle gezeigt.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Beispiel: Erstellen eines DataFrames

Das folgende Codebeispiel:

  1. Erstellt einen In-Memory-DataFrame.
  2. Erstellt eine Tabelle mit dem Namen zzz_demo_temps_table innerhalb des default-Schemas. Wenn die Tabelle mit diesem Namen bereits vorhanden ist, wird die Tabelle zuerst gelöscht. Um ein anderes Schema oder eine andere Tabelle zu verwenden, passen Sie die Aufrufe auf spark.sql, temps.write.saveAsTable oder beides an.
  3. Speichert den Inhalt des DataFrames in der Tabelle.
  4. Führt eine SELECT Abfrage für den Inhalt der Tabelle aus.
  5. Zeigt das Ergebnis der Abfrage an.
  6. Löscht die Tabelle.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Beispiel: Verwenden von DatabricksSesssion oder SparkSession

Im folgenden Beispiel wird beschrieben, wie Code geschrieben wird, der zwischen Databricks Connect für Databricks Runtime 13.3 LTS und höher portabel ist. In Umgebungen, in denen die DatabricksSession Klasse nicht verfügbar ist, wird stattdessen die SparkSession Klasse verwendet, um die angegebene Tabelle abzufragen und die ersten 5 Zeilen zurückzugeben. In diesem Beispiel wird die SPARK_REMOTE Umgebungsvariable für die Authentifizierung verwendet.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)

Weitere Ressourcen

Databricks bietet zusätzliche Beispielanwendungen, die zeigen, wie Databricks Connect im Databricks Connect-GitHub-Repository verwendet wird, einschließlich der folgenden: