Shard marker for log-like data structures

I was just at the FoundationDB Summit yesterday, and one of the talks discussed how to efficiently write log-like data structures. The basic idea is that you need to introduce a shard key before a versionstamp on write and then coalesce reads across all shards. So you might end up with keys that look like:

/{shard}/key/{versionstamp}

Where {shard} is drawn from a power-of-2 range so that you can later split and combine shards. For example, if your number of shards were 4, then you would have a series of writes that looks like:

/0/key/{versionstamp}
/1/key/{versionstamp}
/2/key/{versionstamp}
/3/key/{versionstamp}
/0/key/{versionstamp}
/1/key/{versionstamp}
/2/key/{versionstamp}
/3/key/{versionstamp}

And then to read back the complete log you would need to scan 4 prefix ranges:

/0/key
/1/key
/2/key
/3/key

And then interleave them based on the version stamp discovered in the key during the scan. The client also needs to maintain some statistics about their access patterns and the size of the cluster they are running on in order to pick an optimal number of shards. Wouldn’t it be great if this were something that FDB could handle itself? Basically a new placeholder in the key that is the “shard key” that isn’t managed by the client. What do y’all think about this?
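
For concreteness, here's a minimal sketch of the write side using the Python bindings. The "log" prefix, the shard count, and the random shard choice are just placeholder assumptions for illustration, not an existing API:

```python
import os

import fdb
import fdb.tuple

fdb.api_version(620)
db = fdb.open()

NUM_SHARDS = 4  # power-of-2 shard count, as described above


@fdb.transactional
def append(tr, value):
    # Pick one of the shards; a real client might round-robin or hash instead.
    shard = os.urandom(1)[0] % NUM_SHARDS
    # /{shard}/key/{versionstamp}: the incomplete versionstamp is filled in
    # by the cluster at commit time, so concurrent appenders never collide.
    key = fdb.tuple.pack_with_versionstamp(("log", shard, fdb.tuple.Versionstamp()))
    tr.set_versionstamped_key(key, value)


append(db, b"hello")
```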

Append-only workloads are currently an amplified problem because data distribution always splits shards in the middle, which is rather suboptimal for that access pattern. If it instead had a bit of heuristics to detect when writes to a shard are always increasing (or decreasing), it could split shards by setting the split point to be the highest (or lowest) current key in the shard. Then future appends would go into a brand new shard, created on some other server. I'd sort of prefer to see if FoundationDB could just handle this better rather than bake workarounds into the client API.
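
Purely as an illustration of that heuristic (this is not how data distribution actually picks split points today), the idea would be something like:

```python
def choose_split_point(sampled_keys, recent_write_keys):
    """Hypothetical split-point heuristic for an append-mostly shard."""
    increasing = all(a <= b for a, b in zip(recent_write_keys, recent_write_keys[1:]))
    decreasing = all(a >= b for a, b in zip(recent_write_keys, recent_write_keys[1:]))
    if increasing:
        # Split at the current highest key: future appends land in a brand
        # new (empty) shard that can be placed on another server.
        return max(sampled_keys)
    if decreasing:
        return min(sampled_keys)
    # Otherwise fall back to the current behaviour: split in the middle.
    ordered = sorted(sampled_keys)
    return ordered[len(ordered) // 2]
```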

That said, I've never actually seen a database implement this to know exactly how much of the problem it would solve, and client changes are obviously easier, solve problems now, and have a faster feedback loop.

I’m not sure why moving the hot shard between servers would make much of a difference — I suppose it means that we don’t have to move the data immediately but you are still loading a single server with the workload.

Hi Sam, which talk are you referring to? I presented something on the same issue in one of mine - are you referring to that or something different?

A few comments on your questions:

  • simple sharding has the obvious problem that you have to read all the data and then throw away what you don't need; you can't 'tail' the log efficiently.
  • moving the hot shard to a new server will probably not change things, but if you switch the storage server quickly enough (not necessarily with every write), then each server can absorb short bursts and not get saturated.

(@ryanworl’s talk, see the slides on CDC, https://static.sched.com/hosted_files/foundationdbsummit2019/63/Ryan%20Worl%20FDB%20Summit%202019.pdf)

This is for cases where you will always be reading or at least scanning the entire log. For instance, you might build a scalable queue on FDB, a transaction log for another system, or, in Ryan's case, a change data capture log. The pattern described here is the current best practice for building this kind of data structure on FoundationDB. The proposal here is to add native support for it that can be more efficient than doing it in a layer.

Thanks. Please also take a look at our approach.

It has worked quite well for us to get efficient change feeds with very little hot-spotting and the option to avoid a full scan (where such a thing is needed).


The underlying assumption I am making, which I don't think you or Alex are making, is that a single team cannot keep up with my write load. I might need 8, 16, 32 or more processes just for the writes. I think that Alex's suggestion above would adequately handle your case though.

@gaurav I’m not sure I understand your point that simple sharding breaks the ability to tail the log. Can’t I just tail the N shards and merge them as needed to get the totally-ordered list?

I was in the other track during your talk. I think I get the gist of the bucketing algorithm though. Do you worry about the ability to consume a single stream? The simple sharding enables an easy approach to parallelism on the consumer side (at the expense of total ordering, of course).


Hi Adam, yes, I am assuming total ordering to be a hard constraint for the design in my approach. If that is not a constraint, then it would probably be a better choice to shard/partition the change feed (something analogous to Kafka partitions of a topic).

Scanning and merging the shards would be doable, but could be more expensive/involved at read time. One would either read small batches of records from every shard (if they care about not reading too much data or not retaining a lot of data in memory) and repeat this until they have fetched enough, or, if they can afford to keep all new records from all shards in memory (since the last checkpoint), do a client-side merge of the n sorted streams (and possibly throw away anything extra if there is a limit parameter they need to support).
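
As a sketch of that client-side merge, again using the Python bindings and the same assumed "log" key layout as in the earlier example: read each shard's prefix range and merge the n sorted streams on the versionstamp, which is the last tuple element and therefore the final 12 bytes of each packed key.

```python
import heapq

import fdb
import fdb.tuple

fdb.api_version(620)
db = fdb.open()

NUM_SHARDS = 4


@fdb.transactional
def read_log(tr):
    # One prefix range read per shard: /0/key .. /3/key.
    streams = []
    for shard in range(NUM_SHARDS):
        r = fdb.tuple.range(("log", shard))
        streams.append(tr.get_range(r.start, r.stop))
    # The packed versionstamp is the key suffix, so comparing the last
    # 12 bytes of each key reproduces commit order across all shards.
    merged = heapq.merge(*streams, key=lambda kv: kv.key[-12:])
    return [kv.value for kv in merged]
```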

In our case, we wanted to get a strongly ordered, sorted stream (the ability to consume it was not a concern in this case), as cheaply as possible at read time (we have many clients doing this operation every 20-100 ms to keep some other state in sync).


If FDB can split shards more intelligently, thereby switching the SS more quickly, it should definitely help. The only thing I am not sure about is whether the shard split would happen sooner, or whether SS queue saturation would happen first. I'm assuming that shards will be about 64-128 MB in size, but the SS queue corresponding to one might be much bigger (due to the memory overhead of transactions in the queue).

I went to do some back-of-the-envelope math, and forgot how abysmal SQLite is at write throughput. With SQLite doing 5 MB/s and a cluster accepting a conservative 50 MB/s, by the time a storage server falls ~4s behind, it would take another ~35s to catch up, even if we split perfectly and immediately. (Which I think also answers @gaurav's question above.) So even with my proposed change, gaurav's solution would still be advantageous.
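
In case it helps, the rough arithmetic I'm doing there (the rates are the assumed ones above, not measurements):

```python
ingest = 50.0  # MB/s arriving at the hot storage server
drain = 5.0    # MB/s SQLite can actually persist
lag = 4.0      # seconds behind before the split takes effect

backlog = (ingest - drain) * lag  # ~180 MB queued on the server
catch_up = backlog / drain        # ~36 s to drain it, even after a perfect split
print(backlog, catch_up)
```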

It's sounding like Redwood's ability to do sequential inserts will be significantly better, which would then change a good amount of how this could work. I'm still sort of assuming that workloads wouldn't literally be 100% appends to a single queue, but I suppose Ryan's CDC example would have been 50% appends to one large queue.

There are still things that one could consider doing in FDB core that would make this better. What if we could assign more than one team of servers to a range of keys, do random assignment between them on write, and have clients know to read from all of them and merge the results? This gives us something similar to a shard key, but in configuration rather than in the key. Is configuration better? Or we could allow teams of a larger size to be assigned to a queue range of keys, in which we assign keys to servers by hash. Maybe I should just go search Google Scholar for research on optimizing sequential writes in ordered databases instead of throwing out random theories on the forums.

But if, in the end, the best thing to do is actually to provide clients with a special sharding API, then that's clearly better than making everyone rewrite a good sharding solution themselves.


@alexmiller I like the automatic distribution between shards that you describe as it reminds me a lot of the consistent caching solution coming soon and would Just Work™. DD could even sort and combine them later once they are no longer hot.

I hacked together a quick sequential write test, writing to Redwood and SQLite directly using the IKeyValueStore interface, running on my MacBook.

Using 64-bit (8-byte) big-endian integer keys, starting at 0 and counting up, and 100-byte values, Redwood can write 12.2 GB of KV pairs at 48.5 MB/s, while SQLite does 31.65 MB/s.

Of course a StorageServer does a lot of other things besides using an IKeyValueStore, but it looks like we should see at least a 50% speedup in this workload based on Redwood's current state, which is not yet fully optimized in a bunch of ways.
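
For reference, the shape of that workload is easy to reproduce; something like the following generates the same keys and values (illustrative only; the actual test drove the storage engines through the C++ IKeyValueStore interface):

```python
import struct


def sequential_kv_pairs(n, value_size=100):
    # 64-bit big-endian keys counting up from 0, with 100-byte values.
    value = b"x" * value_size
    for i in range(n):
        yield struct.pack(">Q", i), value
```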


Disclaimer: my knowledge is getting rusty and there could be a dumb mistake below

I think the ideal solution for sequential write throughput is to get shard “splits” to happen in the write path so that any given write just goes to one storage team and doesn’t move, and each sequential 100MB or so of writes goes to a different team. Of course at extreme scale that 100MB would come in over an arbitrarily tiny interval, but as long as the write-to-read latency impact of receiving and queuing the 100MB on a given server is bearable, that is fine. (And of course you could use even finer sharding, treating the new data as temporarily hot, at the throughput cost of more data movement to coalesce larger shards as they cool)

Getting this to actually happen will be tricky! You want to do each split (as Alex says) right after the last write, so that ~no data has to move, but you also have to generate it instantly on the proxy, in sequence with the other mutations, so that the very next write can go to the new shard. (And not to the old team while any preexisting data is being moved, which I can't remember whether it is possible now, but it could be.) So you would have to have all the information needed to decide and carry out the change on the proxy, and the data distribution system would have to tolerate something else making changes.

But it would probably work really well if you could work out the details. Reads would still be able to go just one place, and writes would round robin with a reasonable and adjustable granularity.

(Making the storage engine decently fast at sequential writes would also be a good idea!)

(And to finally post back an old thought I had…)

In what I had described, I meant “random assignment” as “choose one of the two teams at random when inserting”, and specifically I did not mean “use a hash function over the key to determine which storage sub-team is responsible for the key”. While this would work well for set() operations, it fails to handle atomic ops. In the case of an atomic add, there’s no way to send it to one of the two teams jointly covering a key range, and then reconcile the correct sum later – FoundationDB doesn’t have the schema information that would be required to know which byte ranges to add together when reading from both teams. This would need to work as a static hash mapping of key to storage sub-team, so that all atomic adds always go to the same team of servers. If predicate pushdown is ever to be implemented in a way that allows a predicate to be evaluated over a range of key/values, then that would spell doom for either way of doing this sub-team sharding.

While hash-based sharding over sub-teams would still help improve write bandwidth for log-like data structures, it doesn’t provide the infinite single-key write throughput that “write to any storage sub-team and read from all of them” would achieve.
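
To make the distinction concrete, a static key-to-sub-team mapping would look roughly like this (illustrative only; sub-teams are not something FDB has today):

```python
import hashlib


def sub_team_for(key: bytes, num_sub_teams: int) -> int:
    # Every operation on a given key, including atomic adds, always routes to
    # the same sub-team, so there is never a partial sum to reconcile.
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_sub_teams
```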

And regardless, “auto-detect append-mostly workloads and split shards at the end instead of the middle” is still on the list of features I’ve always wanted in a database, but never had.

Writing logs to sequential files with large blocks (>128 KB) instead of SQLite improves bandwidth ~10x on a typical SSD.

For example, a typical NVMe SSD has 1000-2000 MB/s write speed when the writing is sequential with 1 MB blocks, and only 100-200 MB/s when the writing is random with 4 KB blocks.
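
A trivial sketch of what that means in practice (just buffered appends to a plain file; the path and block size here are arbitrary):

```python
def append_records(path, records, block_size=1024 * 1024):
    # Buffered appends so the SSD sees ~1 MB sequential writes rather than
    # small random pages.
    with open(path, "ab", buffering=block_size) as f:
        for record in records:
            f.write(record)
```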