One idea I’ve been noodling on a bit recently is how to run continuous partition assignment on top of FDB. For example: you have some resource (a message queue topic, like in Kafka) with a bunch of partitions, and a bunch of consumers, and you want to assign each consumer a roughly even number of partitions to work on.
The approach I’ve seen most often in the wild is to use something like ZooKeeper to track the clients via ephemeral nodes, and then either use a dedicated leader instance or run a leader election among the clients to pick one that will look at the active participants and assign out the partitions. Another approach I’ve sometimes seen is the feeding frenzy, where each client just tries to claim locks on a certain number of partitions.
Depending on the circumstances, these and other approaches can be fine, but it got me thinking about how you might do this on FDB.
Couple of general requirements:
- The partition assignment is on-going, and should tolerate the addition and removal of clients over time
- We should minimize the time during which two clients might consume the same partition simultaneously, but we won’t try to eliminate it entirely
We’ll need a few definitions:
- Resource definition key: `resource_key --> # of partitions`. Clients are assumed to know which resource(s) they’re interested in consuming in advance. We can stuff more metadata into the value here, but to start, merely knowing the number of partitions in the resource is fine.
- Tracker for the current assignment round: `resource_key : current --> {assignment_versionstamp}`. This gets rewritten each time we need to reassign partitions. It is watched by all active participants.
- Current assignment subspace: `resource_key : assignment_{assignment_versionstamp} : ...`. This is used by the clients to mark their participation in the current assignment round.
- Reassignment Settle Time (RST): how long clients wait to check who is participating in an assignment round. This can be measured in ticks of the versionstamp (we’re relying on some relation between ticks and actual time: Keyspace partitions & performance).
- Minimum Reassignment Interval (MRI): the minimum amount of time to wait between reassignments, to avoid continuous reassignment. Also measured in ticks of the versionstamp.
- Heartbeat Interval (HBI): a mandatory interval to trigger reassignments, to handle failed nodes. Also measured in ticks of the versionstamp.
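To make this a bit more concrete, here’s roughly how the layout might look with the Python bindings and the tuple layer. This is just a sketch, not part of the design: the subspace name, the tuple-packed partition count, and the tick constants are all made up, and the tick-to-seconds conversion assumes the usual ~1,000,000 versions/second rate.

```python
import fdb

fdb.api_version(710)
db = fdb.open()

# One subspace per resource; 'my_topic' is a placeholder name.
resource = fdb.Subspace(('partition_assign', 'my_topic'))

# resource_key --> # of partitions
RESOURCE_KEY = resource.pack()
# resource_key : current --> {assignment_versionstamp}
CURRENT_KEY = resource.pack(('current',))

def assignment_subspace(assignment_vs):
    # resource_key : assignment_{assignment_versionstamp} : ...
    return resource.subspace(('assignment', assignment_vs))

@fdb.transactional
def create_resource(tr, n_partitions):
    # Example encoding: store the partition count as a tuple-packed int.
    tr[RESOURCE_KEY] = fdb.tuple.pack((n_partitions,))

# Intervals measured in ticks of the versionstamp. Commit versions advance at
# roughly 1,000,000 per second, so these are only approximate wall-clock times.
RST_TICKS = 5 * 1_000_000     # Reassignment Settle Time, ~5s (arbitrary)
MRI_TICKS = 30 * 1_000_000    # Minimum Reassignment Interval, ~30s
HBI_TICKS = 120 * 1_000_000   # Heartbeat Interval, ~2min
```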
When a node wants to join (there’s a rough code sketch of this path after the list):
- Read `resource_key : current` to find the current assignment round versionstamp.
- If the current versionstamp is <= RST-distance away, there is an active reassignment happening, and the client should try to join it:
  - Write `resource_key : assignment_{assignment_versionstamp} : {client_versionstamp}` to mark participation in the current assignment.
  - If `{client_versionstamp}` is <= `{assignment_versionstamp} + RST`, we made it in!
  - Otherwise, the client missed out on this assignment and can wait for `{assignment_versionstamp} + MRI` to pass and retry.
- If the current versionstamp is > MRI-distance away, the client needs to trigger a new assignment:
  - Set `resource_key : current` to the latest versionstamp. This triggers a watch on all clients, notifying them that they need to participate in a reassignment.
  - Write `resource_key : assignment_{new_assignment_versionstamp} : {versionstamp}` to mark participation in the reassignment round.
  - Set a watch on `resource_key : current`.
  - Clear the `resource_key : assignment_*` subspace.
  - If this whole transaction fails, some other client beat it to triggering a new reassignment, and it can just retry to join the newly active round.
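Here’s a rough sketch of that join path, building on the layout above. A few things here are my own assumptions rather than part of the design: `commit_version` just pulls the 8-byte commit version out of a stored versionstamp so it can be compared against the read version, each client writes a self-chosen `client_id` as the value so it can find its own entry later, and because the tuple layer only allows one incomplete versionstamp per key, the client that triggers a new round registers its own participation in a follow-up transaction (e.g. when its own watch fires) rather than in the same transaction.

```python
import struct

def commit_version(vs_bytes):
    # The first 8 bytes of a versionstamp are the big-endian commit version ("ticks").
    return struct.unpack('>Q', bytes(vs_bytes[:8]))[0]

@fdb.transactional
def try_join(tr, client_id):
    now = tr.get_read_version().wait()
    raw = tr[CURRENT_KEY]

    if raw.present():
        current_vs = bytes(raw[:10])
        age = now - commit_version(current_vs)
        if age <= RST_TICKS:
            # An active round is still settling: register as a participant. The
            # incomplete Versionstamp is filled in with this transaction's
            # versionstamp at commit time.
            key = resource.pack_with_versionstamp(
                ('assignment', current_vs, fdb.tuple.Versionstamp()))
            tr.set_versionstamped_key(key, client_id)
            return 'joined'
        if age <= MRI_TICKS:
            # Too late to join this round, too early to trigger a new one.
            return 'wait'

    # No round yet, or the current one is more than MRI old: trigger a new one.
    # Point `current` at this transaction's versionstamp (the trailing 4 bytes are
    # the little-endian offset that SET_VERSIONSTAMPED_VALUE expects), which fires
    # every participant's watch, and clear out the old assignment rounds.
    tr.set_versionstamped_value(CURRENT_KEY, b'\x00' * 10 + struct.pack('<I', 0))
    r = resource.range(('assignment',))
    tr.clear_range(r.start, r.stop)
    # If this conflicts, another client beat us to it and the retry will land in
    # one of the branches above instead.
    return 'triggered'
```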
When a node’s watch is triggered:
- Set another watch on `resource_key : current`.
- Read `resource_key : current` to find the latest `{assignment_versionstamp}`.
- Write `resource_key : assignment_{assignment_versionstamp} : {versionstamp}` to mark participation in the latest round.
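A sketch of the watch handling, with the same made-up helpers. One FDB-specific wrinkle: a watch future returned from a transactional function only becomes valid once the transaction commits, so the pattern is to arm the watch and read `current` in one transaction, act on the value, and then block on the watch outside of it.

```python
@fdb.transactional
def arm_watch_and_read(tr):
    # Re-arm the watch and read the latest round's versionstamp in one transaction.
    watch_future = tr.watch(CURRENT_KEY)
    raw = tr[CURRENT_KEY]
    current_vs = bytes(raw[:10]) if raw.present() else None
    return watch_future, current_vs

@fdb.transactional
def register(tr, current_vs, client_id):
    # Mark participation in the latest round; the key gets this transaction's
    # versionstamp, which also determines our position for round-robin later.
    key = resource.pack_with_versionstamp(
        ('assignment', current_vs, fdb.tuple.Versionstamp()))
    tr.set_versionstamped_key(key, client_id)

def watch_loop(db, client_id):
    while True:
        watch_future, current_vs = arm_watch_and_read(db)
        if current_vs is not None:
            register(db, current_vs, client_id)
            # ...then wait out RST and compute our partitions (next section).
        watch_future.wait()   # blocks until `current` changes again
```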
For each participating node:
- Wait for `assignment_versionstamp + RST` to pass and then read the whole `resource_key : assignment_{assignment_versionstamp}` subspace. Each node that wrote a client versionstamp in `[assignment_versionstamp, assignment_versionstamp + RST]` is participating. They can then use some agreed-upon strategy to divvy up the partitions. One simple possibility is round-robin, where the ordering of each client is determined by the position of their versionstamp in the subspace (if you wrote the lowest versionstamp, you’re participant 1, etc.). Each client knows how many partitions there are, so they can each calculate which partitions they can consume.
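A sketch of that settle-and-divvy step. More assumptions on top of the design here: the partition count is tuple-packed under `resource_key`, each participant wrote its `client_id` as its value (so it can find its own position in the ordering), and one versionstamp tick is treated as roughly a microsecond for the sleep.

```python
import time

@fdb.transactional
def read_round(tr, current_vs):
    n_partitions = fdb.tuple.unpack(tr[RESOURCE_KEY])[0]
    sub = assignment_subspace(current_vs)
    cutoff = commit_version(current_vs) + RST_TICKS
    participants = []
    r = sub.range()
    for kv in tr.get_range(r.start, r.stop):
        client_vs = sub.unpack(kv.key)[0]      # an fdb.tuple.Versionstamp
        # Only count clients that registered within the settle window.
        if commit_version(client_vs.tr_version) <= cutoff:
            participants.append(kv.value)      # the client_id each one wrote
    return n_partitions, participants

def my_partitions(db, current_vs, client_id):
    # Let the round settle before reading who showed up (~1 tick per microsecond).
    time.sleep(RST_TICKS / 1_000_000)
    n_partitions, participants = read_round(db, current_vs)
    # Round-robin by versionstamp order: lowest versionstamp is participant 0, etc.
    # (A client that registered after the cutoff won't appear here and should
    # just wait for the next round.)
    me = participants.index(client_id)
    return [p for p in range(n_partitions) if p % len(participants) == me]
```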
To remove failed nodes:
- Clients will trigger a reassignment round at `{assignment_versionstamp} + HBI`, if none comes sooner.
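To fold the heartbeat in, the block on the watch can be bounded: poll the watch future and, once roughly HBI has elapsed with no reassignment, force one. Another sketch, reusing `try_join` from above (HBI > MRI, so the call lands in the trigger-a-new-round branch). Note the deadline here is approximated from when we start waiting rather than from the assignment versionstamp itself.

```python
def wait_for_change_or_heartbeat(db, watch_future, client_id):
    # Block on the watch, but give up and force a reassignment after ~HBI.
    deadline = time.time() + HBI_TICKS / 1_000_000
    while not watch_future.is_ready():
        if time.time() >= deadline:
            watch_future.cancel()
            try_join(db, client_id)   # no round came sooner: trigger one
            return
        time.sleep(1)
```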
Possible extensions:
- In some cases, partition reassignment is expensive and we want to minimize the amount of shuffling that occurs each reassignment round. Clients could write their currently-consumed partitions as values when registering themselves in `resource_key : assignment_{assignment_versionstamp}`. If there are no new participants (each client had work from a previous round), the clients can skip reassigning partitions. If there are new participants, the assignment strategy could be updated to move partitions to the new entrants with the fewest possible moves (a sketch of this is after the list).
- The # of partitions + intervals + assignment strategy could all be stuffed into the `resource_key` value, which all participants then watch. This would allow for updating these values, as each change would force a new assignment round with the new parameters.
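For the first extension, the “fewest possible moves” strategy can be computed deterministically by every client from the same snapshot. A sketch in plain Python, where `owned` maps each participating `client_id` to the partition set it reported from the previous round (new clients report an empty set):

```python
def rebalance(n_partitions, owned):
    """owned: {client_id: set of partitions the client had last round}."""
    clients = sorted(owned)                  # same deterministic order on every client
    base, extra = divmod(n_partitions, len(clients))
    target = {c: base + (1 if i < extra else 0) for i, c in enumerate(clients)}

    # Each client keeps as much of its previous work as its target allows.
    claimed, new = set(), {}
    for c in clients:
        keep = sorted(p for p in owned[c] if p < n_partitions and p not in claimed)
        new[c] = set(keep[:target[c]])
        claimed |= new[c]

    # Partitions nobody kept (including those of departed clients) top up the rest.
    pool = [p for p in range(n_partitions) if p not in claimed]
    for c in clients:
        while len(new[c]) < target[c]:
            new[c].add(pool.pop())
    return new
```

Existing clients keep what they had unless their targets shrank, and new entrants only pull from the pool, so the amount of shuffling stays small.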
I think this ought to work. Am I overcomplicating things?
Also, I originally wrote this to lead up to some questions about watches, but then I just discovered “Performance characteristics of using Watches for Distributed Task Scheduling”.