Apache Spark Partitioning
How to get a handle on those out-of-control partition counts
A big consideration in designing any Spark job is ensuring that the data it uses, from the start of the job to the end, is partitioned correctly. Why does that matter? At the end of the day it all boils down to performance. See what happens below when data is not partitioned correctly.
Since the data is not partitioned correctly, there are now over a million tasks (one task for every partition) to complete before the Spark job can finish. That means over a million times that Spark must load something into an executor and serialize its results. All of that overhead usually causes a job to take longer than it should, or even not finish at all. On top of that, by making our own Spark job take longer than it should, we may be blocking the progress of other people’s jobs.
So what can be done to prevent or fix the partitioning problem shown above? While many things can be done about it, as mentioned here, there are usually two main options: partitioning the data correctly before a Spark job starts, and using repartition when necessary during one. I will cover both solutions, but first we need to understand the environment our Spark job is working in.
Before attempting to improve the partitioning scheme of the data used by a Spark job, two metrics about the job’s working environment should be collected:
- How many executors will be used?
- How many CPUs will each executor have?
While there are other metrics, these are the two I find most important to consider when improving a Spark job’s partitioning scheme. They matter because they give a ballpark estimate of how many partitions (tasks) should be worked on at any given time while a Spark job is running. Even when using dynamic allocation of executors, as people running on YARN may be, it is a good idea to settle on an acceptable number of partitions to be worked on at any given time; otherwise a Spark job’s performance may degrade for no reason.
The current tuning guide estimates how many partitions should be worked on at any given time at 2–3x per available executor CPU; however, as mentioned here, it can be as many as 3–4x. So what does this mean when setting up a Spark job? Let’s take an example. If a Spark job’s working environment has 16 executors with 5 CPUs each (which is optimal), the job should target around 240–320 partitions being worked on concurrently. This translates to around 240–320 tasks in the Spark UI, as seen below.
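The arithmetic above can be captured in a few lines. This is a minimal sketch; `target_partitions` is a hypothetical helper name, and the 3–4x multiplier is the upper range mentioned above, a tunable assumption rather than a hard rule.

```python
def target_partitions(executors, cpus_per_executor,
                      multiplier_low=3, multiplier_high=4):
    """Ballpark range for how many partitions (tasks) should be
    in flight at once, given the cluster shape."""
    cores = executors * cpus_per_executor
    return cores * multiplier_low, cores * multiplier_high

# The example cluster from the text: 16 executors x 5 CPUs each.
low, high = target_partitions(16, 5)
print(low, high)  # 240 320
```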
Now, if you’re wondering why not use fewer partitions than the recommended amount: doing so incurs a performance penalty similar to using too many partitions. With fewer partitions than recommended, a Spark job under-utilizes the computing power available to it and runs slower than it could.
One thing people forget when solving partitioning problems is that they can be prevented before a Spark job even runs, by ensuring that the data it uses is partitioned correctly to begin with. In fact, even using repartition can be avoided if this is done. So how many partitions should the data used by a Spark job be partitioned into? It boils down to the following three questions.
- How many executors will be used for a Spark job?
- How many CPUs will each executor have?
- Approximately how large is each data partition?
Notice how questions 1 and 2 are exactly the same questions asked earlier? That’s because the data used by a Spark job should have a partition count equal to the number of tasks (partitions) being worked on at any given time during the job. This ensures, unless partitions are removed or generated during the course of the job, that the optimal number of partitions is being worked on.
So where does question 3 come into play? It overrides the earlier rule of thumb: each partition of the data used should be around 128 MB in size, as seen below. This means that even if the partition count of the data matches the earlier rule of thumb, fewer or more partitions may need to be used.
Why aim for partitions of around 128 MB even though that may result in fewer or more partitions than the recommended amount? Besides the physical 2 GB limit on partition sizes, there is no requirement for each partition of data to be a specific size. However, it has been shown empirically that partitions sized around 128 MB allow a Spark job to achieve optimal parallelism: very large partitions prevent shuffles from taking a minimal amount of time (whole partitions are moved during shuffles), while very small partitions waste extra computing time setting up and saving the results of each partition (processing one large partition takes less time than processing multiple smaller ones).
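The 128 MB sizing rule translates directly into a partition-count estimate. A minimal sketch follows; `partitions_for_size` is a hypothetical helper, and the 40 GB dataset is an example value, not from the text.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # the ~128 MB rule of thumb

def partitions_for_size(total_bytes, target=TARGET_PARTITION_BYTES):
    """How many partitions a dataset needs so each one is ~128 MB."""
    return max(1, math.ceil(total_bytes / target))

# e.g. a hypothetical 40 GB dataset
print(partitions_for_size(40 * 1024**3))  # 320
```

Note how a 40 GB dataset lands at 320 partitions, inside the 240–320 range from the earlier cluster example; when the two rules disagree, the text says the 128 MB sizing wins.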
Say we’ve optimized the partitioning of the data used by a Spark job before it runs, but the job reports that intermediate stages are using data partitioned into 10–20x the optimal amount. What now? That’s when coalesce and repartition come into play: two methods that allow repartitioning data to a particular partition count. Both methods have small quirks, so I will demonstrate both using the following example.
Coalesce repartitions data down to a requested partition count by merging partitions that are localized on the same executor (node), which enables it to be faster than repartition. An example can be seen below, where colleges is coalesced down to 2 partitions. Colleges that were in the same partition before coalesce was used, such as UC Berkeley and University of Texas, are still in the same partition afterwards.
The biggest concern with using coalesce is that it does not shuffle or rebalance the data being repartitioned across the requested partition count. This behavior has the unfortunate side effect of occasionally generating very large partitions, leading to OOM errors or other strange issues that may not have occurred in the Spark job before coalesce was used, and which might be avoided entirely by using repartition. In addition, partitions cannot be added through coalesce (it makes little sense to create more partitions when a partition’s data must come from a single executor (node)).
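To make the merge-without-shuffle behavior concrete, here is a pure-Python sketch of what coalesce does conceptually. This is not Spark code or Spark’s actual merge strategy; it only models the key property that existing partitions are concatenated wholesale, never split or rebalanced, so rows that shared a partition before still share one afterwards.

```python
def coalesce_sketch(partitions, num_partitions):
    """Toy model of coalesce: assign each existing partition wholesale
    to a target bucket; no rows are shuffled between partitions."""
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        merged[i % num_partitions].extend(part)
    return merged

colleges = [["UC Berkeley", "University of Texas"],
            ["MIT"], ["Stanford"], ["CMU"]]
for part in coalesce_sketch(colleges, 2):
    print(part)
# UC Berkeley and University of Texas stay in the same partition.
```

The downside the text describes also falls out of this model: if one input partition is huge, the bucket it lands in is at least that huge, since nothing is ever split.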
Repartition, unlike coalesce, repartitions data by redistributing it equally across a requested partition count. An example can be seen below, where colleges is repartitioned down to 2 partitions. Unlike in the coalesce example, a college does not necessarily stick with the colleges it was partitioned with beforehand, such as UC Berkeley and University of Texas, because repartition ignores any previous partitioning scheme in order to balance data across the requested partition count.
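For contrast, here is the same kind of pure-Python sketch for repartition. Again, this is a toy model rather than Spark internals: all rows are pooled (the “shuffle”) and dealt out round-robin, which is why previously co-located rows can end up in different partitions.

```python
def repartition_sketch(partitions, num_partitions):
    """Toy model of repartition: pool every row (a full 'shuffle'),
    then deal rows out round-robin, ignoring prior partitioning."""
    rows = [row for part in partitions for row in part]
    out = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        out[i % num_partitions].append(row)
    return out

colleges = [["UC Berkeley", "University of Texas"],
            ["MIT"], ["Stanford"], ["CMU"]]
for part in repartition_sketch(colleges, 2):
    print(part)
# UC Berkeley and University of Texas may now be in different partitions.
```

Note the trade-off this models: partition sizes end up balanced (within one row of each other), but every row had to move, which is the shuffle cost discussed next.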
Repartition has none of the problems coalesce has in resizing the number of partitions a Spark job is currently working with. So why not always use repartition? The only reason not to always prefer repartition over coalesce is that it will, with 99% certainty, shuffle data to ensure it is balanced across the requested partition count. Outside of that, there is nothing major to be mindful of when using repartition; however, the shuffle can be very expensive when repartitioning very large amounts of data, meaning repartition should only be used when the cost of a full shuffle is justified.
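The guidance above can be condensed into a small decision helper. This is my own hypothetical summary of the trade-off, not an official Spark rule: coalesce can only reduce the partition count and never rebalances, so repartition is needed whenever the count must grow or the data must be evened out.

```python
def choose_method(current_partitions, target_partitions, need_balance=False):
    """Rule-of-thumb choice between coalesce and repartition.
    coalesce: cheaper, but can only merge (reduce) partitions and
    never rebalances rows. repartition: full shuffle, any count."""
    if target_partitions > current_partitions or need_balance:
        return "repartition"
    return "coalesce"

print(choose_method(1000, 100))                     # coalesce
print(choose_method(100, 1000))                     # repartition
print(choose_method(1000, 100, need_balance=True))  # repartition
```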
It’s unfortunate that the Spark team does not put as much emphasis on partitioning as it should, because partitioning can make or break a Spark job. What ends up happening is that most people just use the default partitioning settings, causing Spark jobs to be partitioned incorrectly and making some jobs appear impossible to complete when in reality they could finish quite quickly.
Please leave feedback so I can improve this post, and share it so it can help others. I also have another post about checkpointing that is well worth reading if you are dealing with Spark performance problems. Thank you for taking the time to read this post!