Spark Connect is a gRPC-based protocol within Apache Spark that specifies how a client application can communicate with a remote Spark Server. It allows remote execution of Spark workloads using the DataFrame API.
Spark Connect is used in the following:
- Scala notebooks with Databricks Runtime version 13.3 and above, on standard compute
- Python notebooks with Databricks Runtime version 14.3 and above, on standard compute
- Serverless compute
- Databricks Connect
Both Spark Connect and Spark Classic use lazy execution for transformations, but there are important differences to understand. Knowing them helps you avoid unexpected behavior and performance issues when migrating existing code from Spark Classic to Spark Connect, or when writing code that must work with both.
Lazy vs eager
The key difference between Spark Connect and Spark Classic is that Spark Connect defers analysis and name resolution to execution time, as summarized in the following table.
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Query execution | Lazy | Lazy |
| Schema analysis | Eager | Lazy |
| Schema access | Local | Triggers an RPC and caches the schema on first access |
| Temporary views | Plan embedded | Name lookup |
| UDF serialization | At creation | At execution |
Query execution
Both Spark Classic and Spark Connect follow the same lazy execution model for query execution.
In Spark Classic, DataFrame transformations (such as filter and limit) are lazy. This means they are not executed immediately, but are encoded in a logical plan. The actual computation is triggered only by an action (such as show() or collect()).
Spark Connect follows a similar lazy evaluation model. Transformations are constructed on the client side and sent as unresolved plans to the server. The server then performs the necessary analysis and execution when an action is called.
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Transformations: df.filter(...), df.select(...), df.limit(...) | Lazy execution | Lazy execution |
| SQL queries: spark.sql("select …") | Lazy execution | Lazy execution |
| Actions: df.collect(), df.show() | Eager execution | Eager execution |
| SQL commands: spark.sql("insert …"), spark.sql("create …") | Eager execution | Eager execution |
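As a minimal sketch (assuming an interactive spark session, as in a notebook), the transformations below only build a logical plan in both modes; nothing runs until the action at the end:
Python
df = spark.range(1000).filter("id % 2 = 0").limit(10)  # Transformations: no job is triggered yet
df.show()  # Action: triggers execution in both Spark Classic and Spark Connect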
Schema analysis
Spark Classic performs analysis eagerly during logical plan construction. This analysis phase converts the unresolved plan into a fully resolved logical plan and verifies that the operation can be executed by Spark. One of the key benefits of performing this work eagerly is that users receive immediate feedback when a mistake is made. For example, executing spark.sql("select 1 as a, 2 as b").filter("c > 1") will throw an error eagerly, indicating the column c cannot be found.
Spark Connect differs from Spark Classic: the client constructs unresolved plans during transformations and defers their analysis. Any operation that requires a resolved plan, such as accessing a schema, explaining the plan, persisting a DataFrame, or executing an action, causes the client to send the unresolved plan to the server over RPC. The server then performs full analysis to produce the resolved logical plan and carries out the operation. For example, spark.sql("select 1 as a, 2 as b").filter("c > 1") does not throw an error immediately because the unresolved plan exists only on the client, but calling df.columns or df.show() throws an error because the plan is then sent to the server for analysis.
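The following sketch (again assuming an interactive spark session) shows where the error surfaces in Spark Connect; the commented lines mark the calls that would trigger analysis and raise the exception:
Python
df = spark.sql("select 1 as a, 2 as b").filter("c > 1")  # No error yet: only a client-side unresolved plan exists
# df.columns  # Sends the plan to the server for analysis and fails: column c cannot be resolved
# df.show()   # Would fail at this point for the same reason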
Unlike query execution, Spark Classic and Spark Connect differ on when schema analysis occurs.
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Transformations: df.filter(...), df.select(...), df.limit(...) | Eager | Lazy |
| Schema access: df.columns, df.schema, df.isStreaming | Eager | Eager. Triggers an analysis RPC request, unlike Spark Classic |
| Actions: df.collect(), df.show() | Eager | Eager |
| Dependent session state of DataFrames: UDFs, temporary views, configs | Eager | Lazy. Evaluated during the plan execution of the DataFrame |
| Dependent session state of temporary views: UDFs, other temporary views, configs | Eager | Eager. The analysis is triggered eagerly when creating the temporary view |
Best practices
Because Spark Connect analyzes lazily, follow these best practices to avoid unexpected behavior and performance issues, specifically issues caused by overwriting temporary view names, capturing external variables in UDFs, delayed error detection, and excessive schema access on newly created DataFrames.
Create unique temporary view names
In Spark Connect, the DataFrame stores only a reference to the temporary view by name. As a result, if the temp view is later replaced, the data in the DataFrame will also change because it looks up the view by name at execution time.
This behavior differs from Spark Classic, where the logical plan of the temp view is embedded into the DataFrame's plan at the time of creation. Any subsequent replacement of the temp view does not affect the original DataFrame.
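For illustration, the following sketch (the view name people_view is only an example) shows the Spark Connect behavior described above; in Spark Classic, the first DataFrame would keep returning 10 rows:
Python
spark.range(10).createOrReplaceTempView("people_view")
df = spark.table("people_view")
assert len(df.collect()) == 10

spark.range(100).createOrReplaceTempView("people_view")  # Replaces the view
assert len(df.collect()) == 100  # Spark Connect: df resolves the view by name at execution time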
To mitigate the difference, always create unique temporary view names. For example, include a UUID in the view name. This avoids affecting any existing DataFrames that reference a previously registered temp view.
Python
import uuid
def create_temp_view_and_create_dataframe(x):
    temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
    spark.range(x).createOrReplaceTempView(temp_view_name)
    return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
Scala
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Wrap UDF definitions
It is generally considered bad practice for UDFs to depend on mutable external variables, as this introduces implicit dependencies, can lead to non-deterministic behavior, and reduces composability. However, if you do have such a pattern, be aware of the following gotcha:
In Spark Connect, Python UDFs are lazy. Their serialization and registration are deferred until execution time. In the following example, the UDF is only serialized and uploaded to the Spark cluster for execution when show() is called.
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
    return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
This behavior differs from Spark Classic, where UDFs are eagerly created. In Spark Classic, the value of x at the time of UDF creation is captured, so subsequent changes to x do not affect the already-created UDF.
If you need to modify the value of external variables that a UDF depends on, use a function factory (closure with early binding) to correctly capture variable values. Specifically, wrap the UDF creation in a helper function to capture the value of a dependent variable.
Python
from pyspark.sql.functions import udf
def make_udf(value):
    def foo():
        return value
    return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
Scala
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
By wrapping the UDF definition inside another function (make_udf), we create a new scope where the current value of x is passed in as an argument. This ensures each generated UDF has its own copy of the value, bound at the time the UDF is created.
Trigger eager analysis for error detection
The following error handling is useful in Spark Classic because it performs eager analysis, which allows exceptions to be thrown promptly. In Spark Connect, however, this code raises no exception, because it only constructs a local unresolved plan without triggering any analysis.
from pyspark.sql.functions import col, when

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
    df = df.select("name", "age")
    df = df.withColumn(
        "age_group",
        when(col("age") < 18, "minor").otherwise("adult"))
    df = df.filter(col("age_with_typo") > 6)  # Referencing a non-existent column does not throw an analysis exception in Spark Connect
except Exception as e:
    print(f"Error: {repr(e)}")
If your code relies on the analysis exception and you want to catch it, you can trigger eager analysis, for example with df.columns, df.schema, or df.collect().
Python
try:
    df = ...
    df.columns  # This will trigger eager analysis
except Exception as e:
    print(f"Error: {repr(e)}")
Scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Avoid too many eager analysis requests
Performance can be improved if you avoid analysis requests on large numbers of DataFrames.
Creating new DataFrames step by step and accessing their schema on each iteration
When you create large numbers of new DataFrames, avoid excessive usage of calls triggering eager analysis on them (such as df.columns, df.schema). You can access the schema of the same DataFrame multiple times, but triggering analysis on many newly created DataFrames will impact performance.
For example, when iteratively adding columns to a DataFrame inside a loop and checking whether each column already exists before adding it, calling df.columns on each newly created DataFrame triggers an analysis request on every iteration. To avoid this, maintain a set to track column names instead of repeatedly accessing the DataFrame's schema.
Python
from pyspark.sql import functions as F

df = spark.range(10)
columns = set(df.columns)  # Maintain the set of column names
for i in range(200):
    new_column_name = str(i)
    # if new_column_name not in df.columns:  # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
    if new_column_name not in columns:  # Check the set without triggering analysis
        df = df.withColumn(new_column_name, F.col("id") + i)
        columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  // if (!df.columns.contains(newColumnName)) { // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
  if (!columns.contains(newColumnName)) { // Check the set without triggering analysis
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()
Avoid accessing schemas for a large number of intermediate DataFrames
A similar case is creating, and then analyzing, a large number of unnecessary intermediate DataFrames. In the following example, to extract the field names of each struct-type column, obtain the StructType field information directly from the DataFrame's schema instead of creating an intermediate DataFrame per column.
Python
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
    # column_schema.name: df.select(column_schema.name + ".*").columns  # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
    column_schema.name: [f.name for f in column_schema.dataType.fields]  # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType
val df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    // field.name -> df.select(field.name + ".*").columns // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name) // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
  }
  .toMap
println(structColumnFields)