Sketching out a partition assignment strategy

One idea I’ve been noodling on recently is how to run continuous partition assignment on top of FDB. For example: you have some resource (a message-queue topic, as in Kafka) with a bunch of partitions, and a bunch of consumers, and you want to assign each consumer a roughly even number of partitions to work on.

The approach I’ve seen most often in the wild is to use ZooKeeper or similar to track the clients via ephemeral nodes, and then either use a dedicated leader instance or run a leader election among the clients to pick one that will look at the active participants and assign the partitions. Another approach I’ve sometimes seen is the feeding frenzy, where each client simply tries to claim locks on a certain number of partitions.

Depending on the circumstances, these and other approaches can be fine, but it got me thinking about how you might do this on FDB.

A couple of general requirements:

  • The partition assignment is ongoing, and should tolerate the addition and removal of clients over time
  • We should minimize the time during which two clients may consume the same partition simultaneously, but we don’t need to eliminate it entirely

We’ll need a few definitions:

  • Resource definition key: resource_key --> # of partitions. Clients are assumed to know which resource(s) they’re interested in consuming in advance. We can stuff more metadata into the value here, but to start merely knowing the number of partitions in the resource is fine.

  • Tracker for the current assignment round: resource_key : current --> {assignment_versionstamp}. This gets rewritten each time we need to reassign partitions. It is watched by all active participants.

  • Current assignment subspace: resource_key : assignment_{assignment_versionstamp} :. This is used by the clients to mark their participation in the current assignment round.

  • Reassignment Settle Time (RST). How long clients wait to check who is participating in an assignment round. This can be measured in ticks of the versionstamp (we’re relying on some relation between ticks and actual time: Keyspace partitions & performance)

  • Minimum Reassignment Interval (MRI). The minimum amount of time to wait between reassignments, to avoid continuous reassignment. Also measured in ticks of the versionstamp

  • Heartbeat Interval (HBI). Mandatory interval to trigger reassignments to handle failed nodes. Also measured in ticks of the versionstamp.
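To make the layout concrete, here is a sketch of the keyspace above using plain Python tuples as stand-ins for FDB tuple-layer keys. The function names are purely illustrative, not a real API:

```python
# Stand-ins for the FDB tuple-layer keys described above. In real code these
# tuples would be packed with fdb.tuple.pack(); here they stay plain tuples.

def resource_def_key(resource_key):
    # resource_key -> number of partitions (stored in the value)
    return (resource_key,)

def current_round_key(resource_key):
    # resource_key : current -> {assignment_versionstamp}; watched by all clients
    return (resource_key, "current")

def participation_key(resource_key, assignment_versionstamp, client_versionstamp):
    # resource_key : assignment_{assignment_versionstamp} : {client_versionstamp}
    return (resource_key, "assignment", assignment_versionstamp, client_versionstamp)
```

Because the client versionstamp is the last tuple element, a range read over the assignment subspace returns participants in versionstamp order for free.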

When a node wants to join:

  • Read resource_key : current to find the current assignment round versionstamp.
    • If the current versionstamp is <= RST ticks old, there is an active reassignment happening, and the client should try to join it.

      • Write resource_key : assignment_{assignment_versionstamp} : {client_versionstamp} to mark participation in current assignment
        • If {client_versionstamp} is <= {assignment_versionstamp} + RST, we made it in!
        • Otherwise, the client missed out on this assignment and can wait for {assignment_versionstamp} + MRI to pass and retry.
    • If the current versionstamp is > MRI ticks old, the client needs to trigger a new assignment. (If it falls between RST and MRI, the round has settled but it’s too soon to reassign, so the client waits for {assignment_versionstamp} + MRI to pass and then triggers one.)

      • Set resource_key : current to the latest versionstamp, which triggers the watch on all current clients, notifying them that they need to participate in a reassignment.
      • Write resource_key : assignment_{new_assignment_versionstamp} : {versionstamp} to mark participation in reassignment round.
      • Set a watch on resource_key : current
      • Clear resource_key : assignment_* subspace
      • If this whole transaction fails, some other client beat it to triggering the new reassignment, and it can simply retry and join the newly active round.
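The join decision above can be sketched as pure logic, treating versionstamps as integer “ticks” (a simplifying assumption; real versionstamps are 80-bit values, and `join_action` is just an illustrative name):

```python
# Decide what a joining client should do, given the versionstamp of the
# current round (current_vs), the database's latest versionstamp (now_vs),
# and the RST/MRI intervals, all measured in ticks.

def join_action(current_vs, now_vs, rst, mri):
    age = now_vs - current_vs
    if age <= rst:
        return "join-current-round"  # round still settling: write our participation key
    if age > mri:
        return "trigger-new-round"   # stale round: rewrite resource_key : current
    return "wait-for-mri"            # settled, but too soon to reassign: wait and retry
```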

When a node’s watch is triggered:

  • Set another watch on resource_key : current
  • Read resource_key : current to find the latest {assignment_versionstamp}
  • Write resource_key : assignment_{assignment_versionstamp} : {versionstamp} to mark participation in latest round.

For each participating node:

  • Wait for assignment_versionstamp + RST to pass and then read the whole resource_key : assignment_{assignment_versionstamp} subspace. Each node that wrote a client versionstamp between [assignment_versionstamp, assignment_versionstamp + RST] is participating. They can then use some agreed-upon strategy to divvy up the partitions. One simple possibility is round-robin, where the ordering of each client is determined by the position of their versionstamp in the subspace (if you wrote the lowest versionstamp, you’re participant 1, etc). Each client knows how many partitions there are, so they can each calculate which partitions they can consume.
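The round-robin strategy can be sketched like this (with `my_partitions` as an illustrative name). Every participant runs the same computation over the same subspace read, so the results agree without any coordination:

```python
# Each participant sorts the client versionstamps read from the assignment
# subspace, finds its own position, and takes every k-th partition, where k
# is the number of participants.

def my_partitions(client_versionstamps, my_versionstamp, num_partitions):
    order = sorted(client_versionstamps)
    my_index = order.index(my_versionstamp)  # lowest versionstamp -> participant 0
    k = len(order)
    return [p for p in range(num_partitions) if p % k == my_index]
```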

To remove failed nodes:

  • Clients will trigger a reassignment round at {assignment_versionstamp} + HBI, if none comes sooner

Possible extensions:

  1. In some cases, partition reassignment is expensive and we want to minimize the amount of shuffling that occurs each reassignment round. Clients could write their currently-consumed partitions as values when registering themselves in resource_key : assignment_{assignment_versionstamp}. If there are no new participants (each client had work from a previous round), the clients can skip reassigning partitions. If there are new participants, the assignment strategy could be updated to move partitions to the new entrants with the fewest possible moves.

  2. The # of partitions + intervals + assignment strategy could all be stuffed into the resource_key value, which all participants then watch. This would allow for updating these values, as each change would force a new assignment round with the new parameters.

I think this ought to work. Am I overcomplicating things? :slight_smile:

Also I originally wrote this to lead up to some questions about watches, but then I just discovered Performance characteristics of using Watches for Distributed Task Scheduling :open_mouth:

An alternative I have tried (which admittedly would not allow for very flexible assignment algorithms, sticky being the most obvious casualty) is to make a key per resource, then store a versionstamp in the value along with the ID of the node that owns it.

Each client does a snapshot range read over the keys it’s interested in acquiring locks on, then picks a few at random to acquire. I think you also need to add back read-conflict ranges for the specific keys you acquired, but I don’t remember exactly what I did. Then you use SET_VERSIONSTAMPED_VALUE to write the new version plus your own client ID into the keys you’ve chosen.
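Here’s a rough sketch of that claim step, simulated against an in-memory dict rather than FDB. The `stale_before` cutoff is my own assumed liveness rule (a key whose version is older than the cutoff is treated as claimable); in real code the read would be a snapshot range read, the write would use SET_VERSIONSTAMPED_VALUE, and you’d add read-conflict ranges for just the claimed keys:

```python
import random

def claim_partitions(store, client_id, want, stale_before):
    # store: partition -> (version, owner). A partition is claimable when its
    # version is older than stale_before (its owner is presumed dead or gone).
    claimable = [p for p, (version, _) in store.items() if version < stale_before]
    chosen = random.sample(claimable, min(want, len(claimable)))
    for p in chosen:
        # Stand-in for SET_VERSIONSTAMPED_VALUE: a fresh version plus our ID.
        store[p] = (stale_before, client_id)
    return chosen
```

If two clients race for the same key, the conflict range on the claimed keys makes one transaction abort and retry against the new snapshot, so each lock ends up with exactly one owner.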

I don’t think you’re overcomplicating it for the scenario where you want more control than just random over the assignment algorithm. My use case was fine with random.