4 Steps for a Kafka Rebalance
Apache Kafka is a popular distributed event streaming platform used for data pipelines, streaming analytics, data integration, and mission-critical applications.
After working with Kafka for a while, I repeatedly ran into the need to add new capacity to a cluster and rebalance it afterwards. The issue is amplified in high-scale environments, with lots of Brokers, Topics, and throughput, so it wasn’t a big surprise for me to encounter it again at Wix.
The challenge is to reach the desired balance of partitions between Brokers while also minimizing data movement and throttling the change, in order to limit the effect on a high-throughput running cluster.
If you are here, I assume you already know the key components of Kafka, but here is a refresher:
- A Kafka cluster stores categories called Topics
- A cluster consists of one or more servers called Brokers
- Each Topic maintains a partitioned log
- A Topic may have many partitions, which act as the unit of parallelism
- The replicas of a partition are the list of Brokers that replicate its log
The good news is that this structure enables Kafka to scale pretty well. You may have one cluster with many Topics, Topics with many partitions, and on top of that a replication factor that provides resiliency.
Now while there are NOT many parameters to set up when creating a Topic (number of partitions, replication factor), there are many factors that can affect those choices. Concurrency, throughput, storage, cloud availability zones or on-premises rack placement are some of those factors. We are NOT going to dive into these aspects in this article.
Instead, I would like to address the problem of redistributing Topics:
- In case extra capacity (Brokers) was added to the cluster
- In case of an unbalanced cluster that needs to be gracefully rebalanced
Why can adding more capacity and redistributing the load introduce a problem?
Redistributing a Topic over Brokers involves data movement, because we are changing the partition spread and hence moving a subset of the data from old to new Brokers. This sync requires additional network bandwidth and CPU utilization. How much extra load? That depends on how you do it - which brings us to the core question we’re trying to answer in this article:
How would you rebalance your cluster with minimum effect on producers and consumers?
Resharding partitions randomly over the new set of Brokers means a complete restructure of the partitions per Topic and extensive data movement in the cluster (see option A).
You might get away with it with an idle cluster, but in production, with high throughput, this will simply overload Brokers and cause network saturation.
An alternative is to calculate the minimum changes needed in order to balance the cluster. This will result in much less data movement (see option B) in the cluster.
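To make the difference concrete, here is a toy illustration (hypothetical assignments of mine, written as partition number to replica list) of a single Topic with 3 partitions after Brokers 4 and 5 join a 3-Broker cluster:

```python
# Hypothetical example: a Topic with 3 partitions and replication factor 2,
# originally spread over Brokers 1-3, after Brokers 4 and 5 were added.
original = {0: [1, 2], 1: [2, 3], 2: [3, 1]}

# Option A - random reshard: almost every replica moves, so almost every
# partition has to be fully copied over the network to a different Broker.
option_a = {0: [4, 3], 1: [5, 1], 2: [2, 4]}

# Option B - minimal change: at most one replica per partition is swapped
# onto the new Brokers, so far less data has to move.
option_b = {0: [1, 4], 1: [2, 5], 2: [3, 1]}
```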
My rebalance process
Step I: Creating the original files
So how does it work? First stage comprises two parts:
- Get the current Brokers, Topics and the partitions assigned to each Topic from ZooKeeper
- Generate original assignment files for each Topic
We are generating the original assignment files for two reasons:
- To note the position of the cluster scan in case the process is interrupted - that way we won’t have to start all over.
- We need the original assignment for testing later.
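Here is a minimal sketch of this stage, assuming the standard ZooKeeper layout (/brokers/ids and /brokers/topics/<topic>) and the kazoo client; the ZooKeeper address and the original directory name are placeholders of mine, not necessarily what the real tool uses:

```python
import json
import os
from kazoo.client import KazooClient

zk = KazooClient(hosts="zookeeper:2181")  # placeholder address
zk.start()

os.makedirs("original", exist_ok=True)

# Current Brokers, kept for the calculations in the next steps
brokers = [int(b) for b in zk.get_children("/brokers/ids")]

for topic in zk.get_children("/brokers/topics"):
    data, _ = zk.get("/brokers/topics/%s" % topic)
    # znode content looks like {"version":1,"partitions":{"0":[1,2,3], ...}}
    partitions = json.loads(data)["partitions"]

    # Save the current assignment in the format kafka-reassign-partitions expects
    assignment = {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": int(p), "replicas": replicas}
            for p, replicas in partitions.items()
        ],
    }
    with open("original/%s.json" % topic, "w") as f:
        json.dump(assignment, f, indent=2)

zk.stop()
```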
Step II: Gathering data to calculate the rebalance
The second stage includes running over Topics and creating several structures for later use:
- Topic as reflected in files
- Partitions per Topic
- Partitions per broker
- Replication factors per Topic
Then, we determine the unbalanced Brokers (the Brokers with the lowest partition count). I set the threshold at half of the median partition count, which is far enough below the median to indicate an unbalanced Broker.
This value relates to the total partition count in the cluster and NOT to a specific Topic.
The UNBALANCED_RATIO needs to be set lower in the case of a fine-tuning rebalance (versus the new-Brokers use case).
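A simplified sketch of these structures and of the unbalanced-Broker detection, reading the original assignment files from Step I (variable names are mine, and UNBALANCED_RATIO = 0.5 reflects the half-of-median rule):

```python
import glob
import json
from collections import defaultdict
from statistics import median

UNBALANCED_RATIO = 0.5  # "half of the median partition count"

partitions_per_topic = {}                 # topic -> number of partitions
replication_factor_per_topic = {}         # topic -> replication factor
partitions_per_broker = defaultdict(int)  # broker -> number of partition replicas it holds

for path in glob.glob("original/*.json"):
    with open(path) as f:
        assignment = json.load(f)
    topic = assignment["partitions"][0]["topic"]
    partitions_per_topic[topic] = len(assignment["partitions"])
    replication_factor_per_topic[topic] = len(assignment["partitions"][0]["replicas"])
    for partition in assignment["partitions"]:
        for broker in partition["replicas"]:
            partitions_per_broker[broker] += 1

# A Broker is unbalanced when it holds fewer replicas than
# UNBALANCED_RATIO * the cluster-wide median (NOT a per-Topic value)
median_count = median(partitions_per_broker.values())
unbalanced_brokers = [
    broker for broker, count in partitions_per_broker.items()
    if count < median_count * UNBALANCED_RATIO
]
```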
Step III: Running over the Topics & partitions and creating migration files
The next step is to loop over the Topics while calculating several things:
- Number of partitions to change per Topic
- Brokers to reduce and Brokers to add to the Topic to improve balance
- The Brokers with the highest and lowest partition counts
This will generate new, balanced assignment files with a minimum of Broker changes per partition. The files will be created in a separate directory.
Here’s how I calculated the partitions’ change per Topic:
Basically, we run over a specific Topic, decrement any change we make from replace_count, and move on to the next Topic when replace_count reaches zero:
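The exact formula is not shown here, but one simple way to derive replace_count, building on the structures from the previous sketch, is to give each Topic its proportional share of the replicas the underloaded Brokers are missing (this is my approximation, not necessarily the original calculation):

```python
def replace_count_for_topic(topic_assignment, brokers, unbalanced_brokers):
    """Roughly how many single-replica swaps this Topic should contribute.

    Each Topic hands over its proportional share of replicas to the
    underloaded Brokers; the real calculation may also weigh current load.
    """
    total_replicas = sum(len(p["replicas"]) for p in topic_assignment["partitions"])
    ideal_per_broker = total_replicas / len(brokers)

    # replicas of this Topic already hosted on the underloaded Brokers
    already_there = sum(
        1
        for p in topic_assignment["partitions"]
        for b in p["replicas"]
        if b in unbalanced_brokers
    )
    missing = ideal_per_broker * len(unbalanced_brokers) - already_there
    return max(0, round(missing))
```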
The search-and-replace process scans each partition for the Broker with the highest partition count (to reduce its appearance) and won’t touch partitions that have already been changed:
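A sketch of that loop, under the same assumptions as above (brokers_to_add holds the underloaded Brokers chosen for this Topic, and partitions_per_broker is the cluster-wide replica count per Broker):

```python
def rebalance_topic(topic_assignment, replace_count, partitions_per_broker, brokers_to_add):
    """Swap at most one replica per partition until replace_count reaches zero."""
    changed_partitions = set()

    for partition in topic_assignment["partitions"]:
        if replace_count == 0:
            break  # this Topic has contributed enough changes
        if partition["partition"] in changed_partitions:
            continue  # never touch a partition that was already affected

        # the most loaded Broker in this partition's replica list is the one to reduce
        highest = max(partition["replicas"], key=lambda b: partitions_per_broker[b])

        # pick the least loaded target Broker that is not already a replica
        candidates = [b for b in brokers_to_add if b not in partition["replicas"]]
        if not candidates:
            continue
        target = min(candidates, key=lambda b: partitions_per_broker[b])

        partition["replicas"][partition["replicas"].index(highest)] = target
        partitions_per_broker[highest] -= 1
        partitions_per_broker[target] += 1
        changed_partitions.add(partition["partition"])
        replace_count -= 1
```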
Another important factor to keep track of and balance around is the preferred Broker for each ISR. This Broker is the first in the ISR and by default will act as the leader for the partition. Together with auto.leader.rebalance.enable=true (the default) and leader.imbalance.per.broker.percentage set low (10% by default), it will guarantee balanced leadership across the Brokers in the cluster.
This is important since producing and consuming go through the partition’s leader, so unbalanced leadership in the cluster can saturate the Brokers with a high leadership count.
In order to guarantee leadership balance I keep a dict, self.brokers_leadership_count, that counts leaderships per Broker. Then, when replacing a Broker in a partition, I compare its count with the other Brokers’: if its leadership count is lower, I insert the new Broker as the first item in the replicas so it acts as the preferred Broker; otherwise, I append it to the replicas:
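A simplified sketch of that replacement logic, written as a standalone function rather than a class method (it assumes a replication factor greater than 1):

```python
def replace_broker(replicas, old_broker, new_broker, brokers_leadership_count):
    """Swap old_broker for new_broker while keeping leadership balanced.

    The first Broker in the replica list is the preferred leader. If the
    incoming Broker currently leads fewer partitions than the remaining
    Brokers in this list, it is inserted first and becomes the preferred
    leader; otherwise it is appended as a follower.
    """
    old_leader = replicas[0]
    replicas.remove(old_broker)

    if brokers_leadership_count[new_broker] < min(
        brokers_leadership_count[b] for b in replicas
    ):
        replicas.insert(0, new_broker)  # new preferred leader for this partition
    else:
        replicas.append(new_broker)     # follower only

    # keep the per-Broker leadership counts in sync with whoever is now first
    new_leader = replicas[0]
    if new_leader != old_leader:
        brokers_leadership_count[old_leader] -= 1
        brokers_leadership_count[new_leader] += 1
```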
Eventually, according to my configuration, this process generates two directories:
The first directory includes the original assignment of the Topics, each Topic represented by a JSON file of the same name. The second directory includes the same number of files, with the same names, but each file contains the new, balanced reassignment structure.
As we can see below, we ended up with 2 directories: original and migrated, each containing the same number of files:
The difference is in the content of the assignment files; here’s an example of a diff between 2 partitions of one file:
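The Broker IDs below are illustrative rather than the actual production values, but the shape of the change is the point: at most one replica differs per partition.

```
- {"topic": "some-topic", "partition": 7, "replicas": [21, 9, 14]}
+ {"topic": "some-topic", "partition": 7, "replicas": [38, 9, 14]}
- {"topic": "some-topic", "partition": 8, "replicas": [22, 15, 3]}
+ {"topic": "some-topic", "partition": 8, "replicas": [15, 3, 39]}
```

In partition 7 the overloaded Broker 21 is replaced by Broker 38, which is also inserted first to take over leadership; in partition 8 Broker 22 is replaced by Broker 39, which is appended as a follower.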
Output of the per-Broker partition counts:
Brokers before balance:
{0: 6353, 1: 6576, 2: 6684, 3: 6636, 4: 6689, 5: 6797, 6: 6863, 7: 6968, 8: 7085, 9: 7220, 10: 6893, 11: 6932, 12: 6969, 13: 7037, 14: 6945, 15: 6776, 16: 6653, 17: 6613, 18: 6603, 19: 6621, 20: 7286, 21: 7334, 22: 7341, 23: 7285, 24: 7265, 25: 7231, 26: 7142, 27: 7085, 28: 7019, 29: 6947, 30: 6880, 31: 6893, 32: 5706, 33: 5382, 34: 5446, 35: 5408, 36: 5281, 37: 5089, 38: 4821, 39: 4823}
Brokers after balance:
{0: 6589, 1: 6588, 2: 6587, 3: 6590, 4: 6589, 5: 6590, 6: 6589, 7: 6597, 8: 6591, 9: 6587, 10: 6588, 11: 6587, 12: 6590, 13: 6587, 14: 6587, 15: 6595, 16: 6593, 17: 6596, 18: 6589, 19: 6590, 20: 6594, 21: 6593, 22: 6590, 23: 6589, 24: 6590, 25: 6586, 26: 6590, 27: 6591, 28: 6590, 29: 6589, 30: 6588, 31: 6587, 32: 6588, 33: 6587, 34: 6587, 35: 6589, 36: 6587, 37: 6587, 38: 6588, 39: 6588}
Step IV: Running tests
Now it’s time to validate that we actually produced the desired outcome…
We are testing several things:
- test_changes_in_assignment: Test that there is no more than 1 change in each partition between the original Topic assignment and the migrated one
- test_if_topics_are_balanced: Test that no Broker deviates by more than 15 percent from the median partition count per Broker after the rebalance
- check_brokers_to_reduce: Check that the reduced Brokers are a subset of the total Brokers list
- test_brokers_in_assign_files: Test that there are no replica members that are not in the Brokers list
- test_that_topic_list_is_not_empty: Test that the Topic list retrieved from ZooKeeper is NOT empty
- test_that_brokers_list_is_not_empty: Test that the Broker list retrieved from ZooKeeper is NOT empty
- test_leadership_balance: Test the deviation from the median of each Broker’s ISR leadership count
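As an illustration, here is a pytest-style sketch of the first test; the directory layout follows the earlier sketches and the helper function is mine:

```python
import glob
import json
import os

def load_partitions(path):
    """Return a mapping of partition number to its replica set."""
    with open(path) as f:
        assignment = json.load(f)
    return {p["partition"]: set(p["replicas"]) for p in assignment["partitions"]}

def test_changes_in_assignment():
    """No more than one replica change per partition between original and migrated."""
    for original_path in glob.glob("original/*.json"):
        migrated_path = os.path.join("migrated", os.path.basename(original_path))
        original = load_partitions(original_path)
        migrated = load_partitions(migrated_path)
        for partition, replicas in original.items():
            assert len(replicas - migrated[partition]) <= 1
```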
Running the tests shows us some interesting things with regard to the deviation from the median partition count per Broker:
Here is an example of a rebalance in production using the process above.
A few notes:
- You can choose to tune the aggressiveness of the process according to the situation: slower under traffic, or faster if you have the ability to shift traffic out of the DC (see the throttled invocation example after these notes). In this case we’re running it slowly under traffic, over a couple of days. This specific screenshot is from the middle of the process.
- You can clearly see that the rebalance prioritizes reducing partitions on Brokers with a high partition count.
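For reference, the generated files can be applied one by one with the stock kafka-reassign-partitions tool, where the throttle is what controls how aggressive the move is; the address, paths, throttle value and pacing below are placeholders to be tuned per cluster:

```python
import glob
import subprocess
import time

THROTTLE_BYTES_PER_SEC = "10000000"  # placeholder: tune to what the cluster can absorb

for reassignment_file in sorted(glob.glob("migrated/*.json")):
    subprocess.run(
        [
            "kafka-reassign-partitions.sh",
            "--zookeeper", "zookeeper:2181",               # placeholder address
            "--reassignment-json-file", reassignment_file,
            "--throttle", THROTTLE_BYTES_PER_SEC,
            "--execute",
        ],
        check=True,
    )
    # crude pacing between Topics; a real run should --verify completion
    # (which also removes the throttle) before moving on
    time.sleep(60)
```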
Conclusion
A perfectly balanced, high-throughput cluster is an achievable goal.
It requires careful calculation: retrieving the current state and working out the minimum set of changes needed to reach that balance, combined with proper process management and testing.