Partager via


Comparer Spark Connect à Spark Classic

Spark Connect est un protocole basé sur gRPC dans Apache Spark qui spécifie comment une application cliente peut communiquer avec un serveur Spark distant. Il permet l’exécution à distance des charges de travail Spark à l’aide de l’API DataFrame.

Spark Connect est utilisé dans les éléments suivants :

  • Notebooks Scala avec Databricks Runtime version 13.3 et ultérieures, sur le calcul standard
  • Notebooks Python avec Databricks Runtime version 14.3 et ultérieures, sur le calcul standard
  • Informatique sans serveur
  • Databricks Connect

Bien que Spark Connect et Spark Classic utilisent l’exécution différée pour les transformations, il existe des différences importantes à connaître pour éviter les problèmes inattendus de comportement et de performances lors de la migration du code existant de Spark Classic vers Spark Connect ou lors de l’écriture de code qui doit fonctionner avec les deux.

Lazy vs hâtif

La principale différence entre Spark Connect et Spark Classic est que Spark Connect reporte l’analyse et la résolution de noms au temps d’exécution, comme résumé dans le tableau suivant.

Aspect Spark Classic Spark Connect
Exécution d’une requête Paresseux Paresseux
Analyse du schéma Impatient Paresseux
Accès au schéma Local Déclencheurs RPC
Vues temporaires Plan intégré Recherche de noms
Sérialisation UDF Lors de la création Lors de l’exécution

Exécution d’une requête

Spark Classic et Spark Connect suivent le même modèle d’exécution différé pour l’exécution des requêtes.

Dans Spark Classic, les transformations DataFrame (telles que filter et limit) sont différées. Cela signifie qu’ils ne sont pas exécutés immédiatement, mais qu’ils sont enregistrés dans un plan logique. Le calcul réel est déclenché uniquement lorsqu’une action (par exemple show(), , collect()) est appelée.

Spark Connect suit un modèle similaire d'évaluation paresseuse. Les transformations sont construites côté client et envoyées en tant que plans proto non résolus au serveur. Le serveur effectue ensuite l’analyse et l’exécution nécessaires lorsqu’une action est appelée.

Aspect Spark Classic Spark Connect
Transformations : df.filter(...), , df.select(...)df.limit(...) Exécution différée Exécution différée
Requêtes SQL : spark.sql("select …") Exécution différée Exécution différée
Actions : df.collect(), df.show() Exécution immédiate Exécution immédiate
Commandes SQL : spark.sql("insert …"), spark.sql("create …") Exécution immédiate Exécution immédiate

Analyse du schéma

Spark Classic effectue l’analyse de schéma avec impatience pendant la phase de construction du plan logique. Lorsque vous définissez des transformations, Spark analyse immédiatement le schéma du DataFrame pour vous assurer que toutes les colonnes et types de données référencés sont valides. Par exemple, l’exécution de spark.sql("select 1 as a, 2 as b").filter("c > 1") générera immédiatement une erreur, indiquant que la colonne c est introuvable.

Spark Connect construit à la place des plans proto non résolus pendant la transformation. Lors de l’accès à un schéma ou de l’exécution d’une action, le client envoie les plans non résolus au serveur via RPC (appel de procédure distante). Le serveur effectue ensuite l’analyse et l’exécution. Cette conception reporte l’analyse du schéma. Par exemple, spark.sql("select 1 as a, 2 as b").filter("c > 1") ne lève aucune erreur, car le plan non résolu est côté client uniquement, mais sur df.columns ou df.show() une erreur est levée, car le plan non résolu est envoyé au serveur à des fins d’analyse.

Contrairement à l’exécution des requêtes, Spark Classic et Spark Connect diffèrent en cas d’analyse de schéma.

Aspect Spark Classic Spark Connect
Transformations : df.filter(...), , df.select(...)df.limit(...) Impatient Paresseux
Accès au schéma : df.columns, df.schema, df.isStreaming Impatient Impatient
Déclenche une requête RPC d’analyse, contrairement à Spark Classic
Actions : df.collect(), df.show() Impatient Impatient
État de session dépendant : fonctions définies par l'utilisateur (UDF), vues temporaires, configurations Impatient Paresseux
Évalué pendant l’exécution

Meilleures pratiques

La différence entre l’analyse différée et évaluée sans délai implique qu’il existe certaines bonnes pratiques à suivre pour éviter les comportements inattendus et les problèmes de performances, en particulier ceux causés par la réécriture de noms d’affichage temporaires, la capture de variables externes dans les fonctions définies par l’utilisateur (UDF), la détection différée des erreurs, et l’accès excessif aux schémas sur de nouveaux DataFrames.

Créer des noms d’affichage temporaires uniques

Dans Spark Connect, le DataFrame stocke uniquement une référence à la vue temporaire par nom. Par conséquent, si l’affichage temporaire est remplacé ultérieurement, les données dans le DataFrame changent également, car elles recherchent l’affichage par nom au moment de l’exécution.

Ce comportement diffère de Spark Classic, où le plan logique de la vue temporaire est intégré dans le plan du dataframe au moment de la création. Tout remplacement ultérieur de la vue temporaire n’affecte pas la trame de données d’origine.

Pour atténuer la différence, créez toujours des noms d’affichage temporaires uniques. Par exemple, incluez un UUID dans le nom de la vue. Cela évite d’affecter les DataFrames existants qui font référence à une vue temporaire précédemment inscrite.

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Encapsulation des définitions UDF

Dans Spark Connect, les fonctions définies par l’utilisateur Python sont évaluées paresseusement. Leur sérialisation et leur inscription sont différées jusqu’au moment de l’exécution. Dans l’exemple suivant, l’UDF n’est sérialisée et chargée sur le cluster Spark pour l’exécution que lorsqu’elle show() est appelée.

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Ce comportement diffère de Spark Classic, où les Fonctions Définies par l'Utilisateur (UDF) sont créées de manière immédiate. Dans Spark Classic, la valeur de x au moment de la création de la fonction définie par l'utilisateur est enregistrée. Par conséquent, les modifications suivantes de x n’affectent pas la fonction UDF déjà créée.

Si vous devez modifier la valeur des variables externes dont dépend une fonction UDF, utilisez une fabrique de fonctions (fermeture avec liaison anticipée) pour capturer correctement les valeurs des variables. Plus précisément, encapsulez la création de la fonction UDF dans une fonction d’assistance pour capturer la valeur d’une variable dépendante.

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

En encapsulant la définition UDF à l’intérieur d’une autre fonction (make_udf), nous créons une nouvelle étendue dans laquelle la valeur x actuelle est passée en tant qu’argument. Cela garantit que chaque fonction UDF générée possède sa propre copie du champ, liée au moment où l’UDF est créée.

Déclencher une analyse impatiente pour la détection d’erreurs

La gestion des erreurs suivante est utile dans Spark Classic, car elle effectue une analyse impatiente, ce qui permet aux exceptions d’être levées rapidement. Toutefois, dans Spark Connect, ce code n’entraîne aucun problème, car il construit uniquement un plan proto non résolu local sans déclencher d’analyse.

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

Si votre code s’appuie sur l’exception d’analyse et que vous souhaitez l’intercepter, vous pouvez déclencher une analyse impatiente, par exemple avec df.columns, df.schemaou df.collect().

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Éviter trop de requêtes d’analyse fréquentes

Les performances peuvent être améliorées si vous évitez un grand nombre de requêtes d’analyse en évitant l’utilisation excessive des appels déclenchant une analyse anticipée (par exemple, df.columns, df.schema).

Si vous ne pouvez pas éviter cela et que vous devez fréquemment vérifier les colonnes de nouvelles trames de données, conservez un ensemble pour suivre les noms de colonnes afin d’éviter les demandes d’analyse.

Python

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Un autre cas similaire consiste à créer un grand nombre de DataFrames intermédiaires inutiles et à les analyser. Au lieu de cela, obtenez des StructType informations de champ directement à partir du schéma du DataFrame au lieu de créer des DataFrames intermédiaires.

Python

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)