Keyspace partitions & performance

I’m trying to better understand the FDB performance model relative to something like HBase / BigTable. With an HBase table, a lot of attention has to be paid to generating even / relevant region splits at table creation + hashing the row keys for good distribution + potentially salting heavily hit rowkey ranges, all in order to avoid hotspotting individual region servers. Do those same types of locality considerations carry over to FDB?

Some questions that come to mind:

  • if there were a single key that’s inundated with write requests, what kind of trouble could that cause overall? Does it degrade performance of the replicas that own that key, or could it push them over entirely? What does it mean for the rest of the cluster?

  • same question as ^ for but a read request deluge – suggests some small degree of load balancing, but how do the nodes degrade if the replicas aren’t sufficient?

  • if an application generates sequential keys but never needs range reads, is it worthwhile to hash the keys preemptively for even distribution? e.g. rather than storing (subspace, sequential_key), instead store (subspace, hash(sequential_key)) when the application only needs multi-gets.

  • when FDB is shuffling data to rebalance, how does it choose which keys to shuffle? Does it do it on a per-key/subspace/some-other-boundary basis? suggests some level of continuous movement, but it’s not clear how those decisions are made.

  • suppose there’s a write-intensive application that’s writes heavily to particular subspaces at a time. If / when does it become worthwhile to split those writes across multiple subspaces?

I’m sure I’ll have more questions as I continue to dig in… very excited to see FDB alive and well again :sunny:

1 Like

We automatically split shards that are receiving too much write bandwidth. We cannot split a single key into multiple shards, but the storage servers responsible for that hot key will give their other shard responsibility to different servers to balance the load.

Read load balancing happens on the client. In triple replication every key is held by three different storage servers. The client keeps track of the latency for each of those servers and sends the request to the least loaded server (the actual algorithm is slightly different than what I just described, but accomplishes the same result). If one key is really hot, the storage servers that responsible for that key will get heavily loaded. Clients will know those servers are heavily loaded, so are read requests that has the option to avoid those servers will go somewhere else. In practice this does an amazing job of keeping all the servers utilized.

There are definitely use cases where you may want to hash keys preemptively. In fact, our backup and DR work by storing a copy of the mutation log into the system keyspace. The key is the sequentially increasing version of the database, so to avoid overloading one team of storage servers with half the write bandwidth of the cluster we hash the version and put that in the key like you described. In our case we still want to do range reads, so we actually hash version/1e6, but the concept is the same.

FDB attempts to divide the database in shards of roughly equal size (both in terms of bytes and write bandwidth). If it notices that one ranges of keys has too much data, it will split it into smaller ranges, and each range will get assigned to a new set of storage servers. For each range, the algorithm starts with 4 random candidate teams of storage servers, and then assigns the range to the one that has the least amount of data. This algorithm converges to an optimal distribution while avoiding the herding effects of always attempting to move data to the least loaded servers.

Our data movement algorithm does not react instantly to changes in the workload. If all of your writes are directed at a new subspace every 30 seconds you will run into performance problems, and you should consider splitting the writes. If the writes are changing subspaces every hour it is probably okay.


Ah thank you etschannen, that’s enormously helpful!

Few follow-ups…

This is really cool :sunglasses: Which metrics are used client-side to determine how loaded a storage server is? What happens if this still isn’t enough and one storage server is overloaded? Is there any active load shedding, or could it fail over?

Is there an ideal shard size that FDB targets / how is the total number of shards determined? On each recalculation cycle, do all identified splits get redistributed / is there a limit to how much can shuffle at once?

Hmm how is an optimal distribution being defined here? If write bandwidth is factored into shard size it seems like that could vary significantly over time.

OOC why divide by 1e6?

By prefixing hash(version/1e6) to the key instead of hash(version), all versions in a range of 1 million would be mapped next to each other. So, this scheme spreads chunks of 1 million versions across shards but still every million versions are range readable. Actual key stored on database would be something like this hash(version/1e6),version.

1 Like

The actual algorithm the client uses is to attempt to keep an equal number of read requests outstanding to each storage server. In addition the client is allowed to send a read to multiple storage servers simultaneously 5% of the time. Basically the latency of the read requests themselves tells up which servers are too loaded. Here is a link to the source if you want to look at the details:

If a storage server is still overloaded read latencies will continue to grow longer and longer. The storage server itself prioritizes answering read requests over getting more writes from the tlogs, so it will start to fall behind. This will be noticed by ratekeeper and we will slow down the rate at which we are giving out read versions to clients which will give the storage server room to catch back up.

For large databases we target shards between 125MB and 500MB. We ramp up to that size based on the amount of data in the database, so that small databases still do some level of sharding. We are monitoring the sizes of each shards individually, and storage servers notify the data distribution algorithm once a shard size changes by more than 5% from the previous estimate. The storage servers use sampling to keep estimates of how much data is in each shard. We are asynchronously notified of these size changes, so there is no recalculation cycle, each shard is split or merged as we get new size estimates. We do limit the number of relocations that can happen simultaneously, basically there is a queue of split and merge requests and we process them depending on how busy the source shard is. There are a lot of details here, status reports how much data is queued for movement and how much data is currently in flight, which is one of the important things to monitor in an FoundationDB cluster.

Write bandwidth is only taken into consideration for when to split shards, it is not accounted for when chosen a new destination team. This does mean if you have a workload that is re-writing existing keys it is possible to get unlucky and have multiple small but high bandwidth shards assigned to the same team.

The database attempts to advance versions by 1 million versions per second, so this algorithm changes the prefix every second.

1 Like

Aha, thank you. I think I first parsed it as hash(version) / 1e6 which was confusing me.

Also good context! If it isn’t already, this might be worth documenting.

Thanks for the detailed summary of the rebalancing, super helpful information.

The million versions a second thing is controlled by the VERSIONS_PER_SECOND knob, specified in Knobs.cpp.

Because it is a “knob”, you can actually change it by passing different flags to fdbserver processes. For example, you could pass --knob_versions_per_second=100 to fdbserver on startup, and then it will only advance the commit version by 100 every second instead of 1,000,000. We regularly do stuff like this in simulation to test whether we really can change knobs.

I also wanted to mention this because it is part of the internals of the database, so relying on its behavior as a kind of fuzzy clock (for example) could be dangerous, and it could theoretically change in a future release without a lot of warning (because it is outside the contract of the database). The only guarantee we do provide about versions is that it will monotonically increase with time, i.e., that after we have committed a version v, all transactions that begin after the commit time of v will be assigned a read version greater than or equal to v. This guarantees what we call causality and what many people call linearizability.

changing knobs can break the database!

Setting the MAX_VERSIONS_IN_FLIGHT to be less than MAX_READ_TRANSACTION_LIFE_VERSIONS will break consistency.

Changing VERSIONS_PER_SECOND without adjusting CORE_VERSIONSPERSECOND to the same value will break backup and DR.

Changing it without adjusting MAX_READ_TRANSACTION_LIFE_VERSIONS and MAX_WRITE_TRANSACTION_LIFE_VERSIONS will cause the resolver or storage server to run out of memory respectively. Even if you got all of that right, a value of 100 is too low, and under load the master will be forced to give out more than 100 versions per second, making the 5 second transaction lifetime much less. When we buggify knobs in simulation we make sure to respect these hidden relationships between knobs.

Alloc was not actually recommending you try this, just letting you know it was possible. I would not recommend changing knobs on production databases unless you really know the full implications of the change.

Oh, yeah.

In any case, yeah, this wasn’t a serious suggestion for a production use case. I just did it on a local fdbserver and watched the commit version go up way slower when repeatedly setting “foo” to “bar”, which is about as comfortable as I’d be playing with knobs without really thinking about what I was doing.