Spark Delta-lake merge slowness due to skewed partitions
Note
This issue pertains to the open-source versions of Spark and Delta Lake. The Databricks version has an entirely different execution plan that is largely immune to partition-skew issues in the merge operation. Spark 3.3 and Delta Lake 2.2 are used here.
Merge Operation
This is a Delta Lake operation that merges one data set into a delta table. The syntax looks as follows; delta_table1 is partitioned and its partitions are uneven.
import io.delta.tables.DeltaTable

val delta_table1 = DeltaTable.forPath("path to delta table 1") // Partitioned
val table2 = ..... a dataframe // partitioned or not doesn't matter here
delta_table1.merge(table2, "join condition .... filter conditions")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
The Delta Lake API converts this call into a series of operations, shown in the physical plan below.
== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   AQEShuffleRead (17)
   +- ShuffleQueryStage (16), Statistics(sizeInBytes=1161.3 GiB, rowCount=1.34E+8)
      +- Exchange (15)
         +- SerializeFromObject (14)
            +- MapPartitions (13)
               +- DeserializeToObject (12)
                  +- SortMergeJoin FullOuter (11)
                     :- Sort (5)
                     :  +- ShuffleQueryStage (4), Statistics(sizeInBytes=23.9 GiB, rowCount=1.45E+6)
                     :     +- Exchange (3)
                     :        +- Project (2)
                     :           +- Scan ExistingRDD mergeMaterializedSource (1)
                     +- Sort (10)
                        +- ShuffleQueryStage (9), Statistics(sizeInBytes=1161.7 GiB, rowCount=1.34E+8)
                           +- Exchange (8)
                              +- Project (7)
                                 +- Scan parquet (6)
.... <other parts of the plan are removed> ....
(11) SortMergeJoin
Left keys [2]: [<join key1>, <join key2>]
Right keys [2]: [<join key1>, <join key2>]
Join condition: <any filter condition>
(15) Exchange
Input [99]: [<all the columns in delta_table1>]
Arguments: hashpartitioning(<partition column as defined for delta table1>#6447L, 105), REPARTITION_BY_COL, [id=#1309]
.... <other parts of the plan are removed> ....
This includes two main steps:
- A full outer join between the two tables: SortMergeJoin (11)
- Rebuilding delta_table1 using its defined partitioning scheme: Exchange (15)
Rebuild the partitioned delta table
This is the last step of the merge operation. After the two data sets have been merged, the result is repartitioned according to the partitioning strategy defined for the delta table being merged into. This involves a shuffle of data across partitions, but that is not the slowest part. The slowest part is that it uses only one task per partition. If delta_table1 has only 10 partitions, then only 10 tasks run in parallel even though more than 10 CPUs are assigned to the job. When those partitions are heavily skewed, the tasks for the large partitions take much longer than the others, so the full capacity of the cluster is never used.
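To make the effect concrete, here is a minimal sketch of what Exchange (15) does (merged, part_col, and the 105 shuffle partitions are hypothetical stand-ins matching the plan above): every row with a given partition-column value hashes to the same shuffle partition, so only as many tasks carry data as there are distinct values.

// Hypothetical illustration of the Exchange (15) behaviour.
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Mirrors hashpartitioning(<partition column>, 105) from the plan above.
val rebuilt = merged.repartition(105, col("part_col"))

// Count how many rows land in each shuffle partition (= each task).
rebuilt
  .groupBy(spark_partition_id().as("task_id"))
  .count()
  .show()
// With 10 distinct part_col values, at most 10 of the 105 tasks receive
// any rows; the rest stay empty no matter how many CPUs are available.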
As depicted below, not all CPUs are equally loaded, and no more CPUs than there are partitions can be used. The example below takes 182 minutes to finish and can use at most 6 CPUs.
Solution
More and even partitions
The solution is to create more partitions, so that more CPUs can work in parallel, and to use a partition key that reduces the size skew. Sometimes it is not possible to make the partitions even, but we can always create more of them.
The following partitioning scheme brings the job time down to 45 minutes, about 4x faster.
Partition strategy
Ideally the partitions should be aligned with the business use case and the read pattern, so that data is naturally filtered while reading.
However, for the specific purpose of improving merge performance, you can choose any partitioning strategy as long as it produces more, and more even, partitions. You can use a hash mechanism to divide the data evenly, as sketched below.
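A minimal sketch of such a scheme (the column names id and bucket and the bucket count 64 are illustrative, not taken from the original job):

// Derive a bucket column by hashing a well-distributed key, so the table
// splits into many, roughly even partitions.
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val bucketed = table2.withColumn("bucket", pmod(hash(col("id")), lit(64)))

bucketed.write
  .format("delta")
  .partitionBy("bucket") // 64 roughly even partitions instead of a few skewed ones
  .save("path to delta table 1")

If the source side can compute the same bucket value, including bucket in the merge's join condition preserves partition pruning during the join.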
Small file problem
Partitions that are already small will become even smaller. This can't be avoided entirely, but it can be managed with Spark 3's adaptive query execution parameters under "spark.sql.adaptive.coalescePartitions.*". The Spark documentation on adaptive query execution will help in choosing these parameters.
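As an illustration, a starting point might look like this (the values are examples, not recommendations):

// Let AQE coalesce small shuffle partitions after the merge.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Target size for coalesced partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
// Never coalesce partitions below this size.
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "16m")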
What didn’t help
Adaptive query execution mechanism
Spark 3's adaptive query execution mechanism tries to solve the large-partition problem with the parameters spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled and spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor. These parameters do help the join by breaking large partitions into smaller ones to increase parallelism. Unfortunately, however, they don't help with rebuilding a partitioned delta table.
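For reference, these are the settings in question (the factor shown is Spark's default):

// Split skewed partitions in AQE rebalance shuffles; this helps the join
// stage but not the final partition-by-column exchange.
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
// A split piece smaller than this factor times advisoryPartitionSizeInBytes
// gets merged with its neighbour.
spark.conf.set("spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor", "0.2")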
spark.databricks.delta.merge.repartitionBeforeWrite.enabled
Enabling this parameter also didn't help, although the documentation suggests it might.
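For completeness, this is how it can be enabled:

// Repartition merge output by the table's partition columns before writing.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")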
Databricks magic
Interestingly, Databricks generates an entirely different plan that doesn't suffer from this slowness. It uses a semi-join rather than a full outer join and skips the rebuild of the delta table entirely. I hope this is released to open-source Delta Lake and Spark sooner rather than later, but until then you can use the trick above.