How withColumn Can Degrade the Performance of a Spark Job

Using Spark's withColumn may seem harmless, but if misused, it can slow down your job significantly! The surprising part? You might not notice it until you dig into Spark's Logical Plans.
By the end of this newsletter edition, you will be able to identify and fix such issues in your Spark jobs and write more performant code from now on.
Don't believe me? Let's dive right into it.
Problem and How to Spot It
Suppose you need to add hundreds of columns populated with some constant value, or cast hundreds of columns to a specific data type.
A typical, readable, and extensible approach is to define all such columns in a list and run a simple for loop:
Using withColumn with a for loop
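As a rough sketch, the pattern looks something like this in PySpark (the DataFrame and column list here are hypothetical, just to illustrate the shape of the loop):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame and a hypothetical list of columns to add
base_df = spark.range(10)
cols_to_add = [f"col_{i}" for i in range(100)]

# One withColumn call per column -- the pattern under discussion
df = base_df
for c in cols_to_add:
    df = df.withColumn(c, F.lit(0).cast("int"))
```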
Looks clean and simple, right? Let's look at the Physical Plan of this query.
Physical Plan generated for the execution
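For reference, the physical plan of the df built above can be printed with explain(); by default it shows only the selected physical plan:

```python
# Default explain() prints only the selected physical plan
df.explain()
```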
Everything looks fine and as expected. But wait, let's also take a look at the Logical Plans.
Logical Plan computed by Spark, showing multiple Project nodes
We can see multiple Project nodes (one for each column added using withColumn).
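To see the logical plans as well, ask explain() for the extended output; in the parsed and analyzed logical plans, each withColumn shows up as its own Project node:

```python
# Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
# In the parsed/analyzed plans there is one Project node per withColumn call.
df.explain(mode="extended")  # equivalent to df.explain(True)
```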

Logical Plan when multiple withColumn calls are used
Okay, there are multiple Project nodes, so what? They don't appear in the Physical Plan, and Spark will ultimately execute the selected Physical Plan. So there shouldn't be any performance regression because of this during execution.

Well, the performance degradation happens before it even reaches the Physical Plan.
Cause of Performance Degradation
Each time withColumn is used to add a column to the DataFrame, Spark's Catalyst optimizer re-evaluates the whole plan. With hundreds of calls, this adds up fast and strains performance.
This issue is not obvious because it doesn't show up in the Spark UI. A job that should take only 5 minutes to complete can end up taking 5x longer because of multiple withColumn calls.

A deeper dive into the internals comes at the end. First, let's examine the solutions that avoid this degradation.
Solutions
Solution #1: Using .withColumns() for Spark >= 3.3
Starting from Spark 3.3, the withColumns() transformation is available. It takes a dictionary mapping column names (string) to Column expressions.
Using .withColumns, available in Spark >= 3.3
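Continuing the hypothetical base_df and cols_to_add from the earlier sketch, the loop collapses into a single call:

```python
# Build the name -> Column mapping once and add everything in one withColumns call
new_cols = {c: F.lit(0).cast("int") for c in cols_to_add}
df_with_columns = base_df.withColumns(new_cols)
```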
Analyzed Logical Plan with only a single Project node
Only a single Project node is present in the Logical Plan.
Solution #2: Using .select() with an alias
Another way to achieve the same result is via .select with alias() or via .selectExpr().
Using .select to add columns
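A sketch of the same idea with .select(), reusing the hypothetical base_df and cols_to_add: existing columns are kept via "*" and the new ones are appended with alias():

```python
# Single select: existing columns plus the new ones, each given an alias
df_select = base_df.select(
    "*",
    *[F.lit(0).cast("int").alias(c) for c in cols_to_add],
)
```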
Analyzed Logical Plan with only a single Project node
Using selectExpr to add columns
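And the equivalent with .selectExpr(), where each new column is written as a SQL expression string:

```python
# Single selectExpr: each new column expressed as a SQL snippet
df_select_expr = base_df.selectExpr(
    "*",
    *[f"cast(0 as int) as {c}" for c in cols_to_add],
)
```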
Analyzed Logical Plan with only a single Project node
Only a single Project node appears in the Logical Plans in both cases.
FAQs
Every time I explain this, there are some follow-up questions that engineers ask:
Does this only happen when .withColumn is used in a for loop?
No. The same issue occurs when we chain multiple .withColumn calls outside a loop as well. We can look at the Logical Plan again to verify it.
Using withColumn outside a for loop
Multiple Project nodes
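For example, a chain like this sketch (hypothetical column names, same base_df as before) produces the same pattern even without a loop:

```python
# Chained withColumn calls -- still one Project node per call in the analyzed plan
df_chained = (
    base_df.withColumn("a", F.lit(1))
           .withColumn("b", F.lit(2))
           .withColumn("c", F.lit(3))
)
df_chained.explain(mode="extended")
```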
Should we not use .withColumn at all then?
If the number of columns being added is fairly low, we can use it; it won't make much of a difference.
But if you are writing code that you expect to be extended as requirements grow, I would recommend using .withColumns or one of the other two options.
How many .withColumn calls are too many and start causing degradation?
There is no specific number, but if you have hundreds of withColumn calls with transformation logic, chances are your Spark job can do much better.
How can we spot this in the Spark UI, then, if this is the issue?
The issue isn't easily visible in the Spark UI. The starting point is to compare the application Uptime with the time taken by the jobs in the Spark UI Jobs tab.
If all the jobs finish quickly but the total Uptime is significantly higher, multiple withColumn calls are a likely cause.
If you have read this far, you are equipped with all the tools and tricks to identify and rectify this issue.
Got any questions or suggestions? Put them in the comments.
If you are interested in what happens internally to degrade the performance, let's dive deeper into it.
Diving a bit deeper
To reach a Physical Plan for execution, the code goes through multiple Spark Catalyst stages: Parsing → Unresolved Logical Plan → Resolved Logical Plan → Optimized Logical Plan.
A Logical Plan is internally represented as a tree created after parsing the SQL. At a very high level, each of these stages has a set of analyzer/optimizer rules that run in batches on top of this generated tree.
Every time a .withColumn is used, the whole set of rules related to it runs again.
Want to see how many rules run each time and how much time this takes?
Spark's Catalyst optimizer uses an internal class called RuleExecutor. Its dumpTimeSpent() method can provide all such details.
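RuleExecutor lives in the JVM, so from PySpark one way to reach it is through the py4j gateway. This is an internal, unsupported access path (and resetMetrics() is likewise an internal helper), so treat the sketch below, which reuses the earlier spark session and base_df, as a debugging aid only:

```python
# Internal Catalyst metrics: number of rule runs and time spent per rule
rule_executor = spark._jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor

rule_executor.resetMetrics()                    # clear the counters first
base_df.withColumn("x", F.lit(1)).explain()     # trigger analysis and planning
print(rule_executor.dumpTimeSpent())            # dump per-rule run counts and timings
```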
When multiple .withColumn calls are used
Using the RuleExecutor class to check the timing and rules run
Total number of runs and time taken
When .withColumns is used
Using the RuleExecutor class to check the timing and rules run
Total number of runs and time taken
If you compare the number of runs and the time taken in both cases, there is a whopping ~96% difference in time.
The absolute time doesn't look like much in this case because we are only doing casting, but in real projects these withColumn calls include complex transformation or calculation logic, which adds even more analyzer rules that need to run.
Lesson Learned: Although withColumn is a lazy transformation, each use has an analysis cost.
That's it for this one! 🚀