Log abstraction on FoundationDB

What is the most efficient way to model a log abstraction like Kafka or Amazon Kinesis on top of FoundationDB?

So far, what I’ve come up with is to create N keys to be used as atomic counters that track the current tail of the log, where N is the number of parallel shards (partitions in Kafka). Within a transaction, atomically increment the counter for the shard you’re writing to, then write a key of the form “LOG_NAME/SHARD_ID/OFFSET”, where OFFSET is the value from the counter. The value at that key is the value written at that position in the log.
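
To make that concrete, here is roughly what I have in mind with the Python bindings (a sketch only; the subspace names and the struct encoding are just illustrative, and since the writer needs the offset back, the increment is shown as a plain read-modify-write instead of an atomic add):

    import struct
    import fdb

    fdb.api_version(620)
    db = fdb.open()

    counters = fdb.Subspace(("log_counters",))  # one counter key per (log, shard)
    entries = fdb.Subspace(("log_entries",))    # ("log_entries", log, shard, offset) = value

    @fdb.transactional
    def append(tr, log, shard, value):
        counter_key = counters.pack((log, shard))
        current = tr[counter_key]
        # Reading the counter to learn the offset means two writers on the same
        # shard will conflict and one of them will retry; the counter is the
        # per-shard serialization point.
        offset = struct.unpack("<q", bytes(current))[0] if current.present() else 0
        tr[counter_key] = struct.pack("<q", offset + 1)
        tr[entries.pack((log, shard, offset))] = value
        return offset

    append(db, "my_log", 0, b"hello")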

Reads of this structure are pretty obvious so I won’t bother outlining them.

Is there a more efficient way to do this using a part of the FoundationDB API I haven’t tried yet? I saw something about versionstamps which could establish ordering, but I’m not sure how to actually use those.

Additionally, that key structure would potentially produce hot keys if they all share a prefix. Is that a problem in practice such that putting the Shard ID first in the key would help? The counter keys would be updated pretty frequently as well, which is the point of sharding them in the first place.

Typically you would keep ID generator keys in a separate namespace that is related to the dataset they are tracking, but is not necessarily a parent in the key space of the things being mapped. Then use an atomic add to increment the ID generator during the insert transaction.

You could use the version stamps on the ID generator keys, but you would still have to “touch them” during the transaction.

Does this help?

There may be a way to do this implicitly, but I’m not aware of it and I’m three years out of date with the current codebase

So for instance, have a key, [TimeSeriesID, “IDGenerator”] that generates identifiers for a particular sharded time series, then store the actual time series data under another tuple

[TimeSeriesID, GeneratedTimeSeriesID, … your data]

The TimeSeriesIDs are your ShardIDs, if you want to look at it that way.

Maybe an FDB person can say if you can control the policy of how a specific key space is actually sharded physically rather than logically. I dunno if that is possible.

Versionstamps are pretty much exactly for this. You don’t need a separate “generator” key. set_versionstamped_key() will automatically fill in the desired part of your key with a monotonically increasing value derived from the version number at which your transaction commits. It is up to you to order the writes within each transaction.
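
A rough sketch with the Python bindings (assuming API version 520 or later so the tuple layer can encode an incomplete versionstamp; the subspace name is illustrative):

    import fdb

    fdb.api_version(620)
    db = fdb.open()

    log = ("foobar",)  # tuple prefix for this log/shard

    @fdb.transactional
    def append(tr, value):
        # The incomplete versionstamp is filled in at commit time with the
        # database commit version (plus batch order), so keys sort in commit
        # order. To order several appends within one transaction, give each a
        # distinct user_version, e.g. fdb.tuple.Versionstamp(user_version=n).
        key = fdb.tuple.pack_with_versionstamp(log + (fdb.tuple.Versionstamp(),))
        tr.set_versionstamped_key(key, value)

    append(db, b"first entry")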

set_versionstamped_value is for building secondary indexes of your log.
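
For example (again just a sketch, with illustrative names, and assuming API 520+ where pack_with_versionstamp also produces a valid operand for the value side), an index from an application-level message ID to the versionstamped log key could be written in the same transaction:

    index = ("foobar_index",)

    @fdb.transactional
    def append_indexed(tr, message_id, value):
        stamp = fdb.tuple.Versionstamp()
        packed_key = fdb.tuple.pack_with_versionstamp(("foobar", stamp))
        tr.set_versionstamped_key(packed_key, value)
        # The value below is filled with the same commit versionstamp, so the
        # index entry ends up holding the exact log key written above.
        tr.set_versionstamped_value(fdb.tuple.pack(index + (message_id,)), packed_key)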

Yes, the assumption here is the data in question already has a defined order (and can be written as such) and it simply needs to be made available in a more convenient form for use in a different system or for long term storage.

Thanks for the guidance, this sounds pretty trivial now!

Perfect, that’s good to know and super useful

This changed my design for the read side a bit.

Using the Python tuple API I get keys that look like this:

\x01foobar\x003\x00\x00\x00\x0b<\xf1\xc4\xa0\x00\x00\x00\x00

In order to read from start to finish within this foobar stream, I can use the get_range API to read the earliest key within that prefix pretty easily with something like \x01foobar\x00 as the starting key, \x01foobar\x01 as the ending key, and a limit of 1. Technically that would be hidden from me because I’m using the Python tuple API, but I just want to understand what I’m actually doing here.

Reading past the first key I’m not as sure about. Should I just take the first key I read and keep reading until \x01foobar\x01 with a suitable limit as my batch size? Or is there a smarter way to go about finding the “end of stream” than just adding 1 onto the end of the key?

Yep, that’s the right idea (you can read until you reach \x01foobar\x01). The Subspace class abstracts some of that away from you (see: https://apple.github.io/foundationdb/api-python.html#subspaces), so you can do something like s = fdb.Subspace(("foobar",)) ; tr.get_range(last_key or s.range().start, s.range().stop, limit=batch_size). Under the covers, it’s scanning from \x01foobar\x00\x00 to \x01foobar\x00\xff, that is, everything that is “strictly within” the subspace (strictly greater than \x01foobar\x00 and less than \x01foobar\x00\xff). You could alternatively scan through \x01foobar\x01 and it would work the same way.
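
Put together, a batched reader over that subspace might look roughly like this (a sketch; read_batch, the batch size, and the resume logic are illustrative):

    import fdb

    fdb.api_version(620)
    db = fdb.open()

    s = fdb.Subspace(("foobar",))

    @fdb.transactional
    def read_batch(tr, after_key, batch_size):
        # Resume strictly after the last key already consumed, or from the
        # start of the subspace on the first call.
        begin = fdb.KeySelector.first_greater_than(after_key) if after_key else s.range().start
        return list(tr.get_range(begin, s.range().stop, limit=batch_size))

    last_key = None
    while True:
        batch = read_batch(db, last_key, 100)
        if not batch:
            break  # reached the current end of the stream
        for kv in batch:
            pass   # process kv.value here
        last_key = batch[-1].key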

We just finished implementing a Log Layer that compacts large blocks of the log after a while (to benefit from a better compression ratio), and this is definitely non-trivial to do! (It took 3 full days of work, from scratch.)

Our Log Layer supports efficient insertion of small batches of logs into an “incomplete” arena, and when there is a sufficient quantity of logs, it compacts a large batch of logs into a single chunk that goes into a “complete” arena, taking advantage of a better compression ratio. Also, “complete” chunks are immutable and can be cached for as long as readers want (while incomplete chunks should not be cached).

So something like:

  • (..., 'counter') = (total_log_count, incomplete_log_count)
  • (..., 'complete', ID0000) = compressed[chunk[1000 logs]]
  • (..., 'complete', ID1000) = compressed[chunk[1000 logs]]
  • (..., 'complete', ....) // <-- new compressed chunks are appended here
  • (..., 'incomplete', ID2000) = chunk[2 logs]
  • (..., 'incomplete', ID2002) = chunk[10 logs]
  • (..., 'incomplete', ID2012) = chunk[1 logs]
  • (..., 'incomplete', ID2013) = chunk[5 logs]
  • (....,'incomplete', ....) // <-- new logs are appended here

Of course, ‘counter’, ‘complete’, ‘incomplete’, etc. are not stored as strings but as small integers, to reduce key size!

The counter key is used to decide whether adding X more logs to the incomplete arena would go over the self-imposed limit of 1000 logs, in which case all the partial chunks are compacted into a single compressed chunk (or more than one!). It will also be used by subscribers to create watches and be notified when something happens on this log.
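
A very simplified sketch of that writer logic in Python (ignoring the value-size and transaction-size budgets discussed below; the subspace layout, struct encoding and zlib compression are just illustrative):

    import struct
    import zlib
    import fdb

    fdb.api_version(620)
    db = fdb.open()

    log = fdb.Subspace(("my_log",))
    counter_key = log.pack((0,))   # 'counter', stored as the small integer 0
    complete = log[1]              # 'complete' arena
    incomplete = log[2]            # 'incomplete' arena

    CHUNK_SIZE = 1000  # self-imposed limit before compacting

    @fdb.transactional
    def append(tr, batch):
        v = tr[counter_key]
        total, pending = struct.unpack("<qq", bytes(v)) if v.present() else (0, 0)

        # New logs always land in the 'incomplete' arena first, keyed by the
        # ID of the first log in this batch.
        tr[incomplete.pack((total,))] = fdb.tuple.pack(tuple(batch))
        total += len(batch)
        pending += len(batch)

        if pending >= CHUNK_SIZE:
            # Compact: read back the incomplete chunks, concatenate and compress
            # them into one 'complete' chunk keyed by its first log ID, then
            # clear the incomplete arena, all within this same transaction.
            logs = []
            for kv in tr[incomplete.range()]:
                logs.extend(fdb.tuple.unpack(kv.value))
            tr[complete.pack((total - pending,))] = zlib.compress(fdb.tuple.pack(tuple(logs)))
            tr.clear_range_startswith(incomplete.key())
            pending = 0

        tr[counter_key] = struct.pack("<qq", total, pending)

    append(db, [b"log line 1", b"log line 2"])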

Readers can use the fact that compressed chunks have the same ID as the first log they contain, so if they want to read logs #1234 to #1304 (which are compressed), GetRange(Last_Less_Or_Equal(ID1234), FirstGreaterThan(ID1304)) will return only the single chunk ID1000, which contains all the rows they want.
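
With the Python bindings, that read looks something like this (a sketch, reusing the complete subspace and db from the sketch above):

    @fdb.transactional
    def read_chunks(tr, first_log, last_log):
        # Chunk keys carry the ID of their first log, so begin at the last
        # chunk key <= first_log; the end selector resolves to the first chunk
        # starting after last_log, which get_range then excludes.
        return list(tr.get_range(
            fdb.KeySelector.last_less_or_equal(complete.pack((first_log,))),
            fdb.KeySelector.first_greater_than(complete.pack((last_log,)))))

    chunks = read_chunks(db, 1234, 1304)  # returns just the chunk keyed ID1000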

On paper, this looks simple to implement, but the writer in particular has to deal with several constraints:

  • Maximum Value Size of 100,000 bytes, so a compressed[chunk[...]] cannot take more than ~100 KB, which is very difficult to target when compressing
    • the trick of splitting the chunks into multiple keys with a ,0, ,1 suffix is not possible here, because it breaks the invariant of 1 chunk = 1 key that is exploited to do efficient range reads of logs (using key selectors would not be possible if we don’t know how many ‘parts’ a chunk is split into, and it would require additional reads, or reading a lot more data than needed)
  • Maximum Transaction Size of 10,000,000 bytes, which means that bulk insertion of existing log files is also an issue (need to reset the transaction every N batches, which are unpredictable in size)
    • the issue is: you do some work to pack a chunk, check the final size, and discover that it would exceed the per-transaction budget, so you throw everything away, roll back the state to before, commit, reset, etc.
  • We chose to have the writer responsible for adding new logs and compacting the data in a single transaction, so that we don’t need any other background thread (that could fail, slow down, etc.).

The reader is less complex:

  • spin two concurrent GetRange(…) calls on both subspaces with Last_Less_Or_Equal(ID) and merge both results into a single list of results.

In the end, this is doable, and it works well, BUT it is somewhat complicated, with two nested loops:

  • an outer loop that spins new transactions and does “up to X MB of writes” per transaction to stay under the 10 MB budget
  • an inner loop that has to decide how many logs can fit into a chunk that, after compression, will not exceed 100 KB
  • inside the inner loop, some complex code that must decide whether or not to pack, taking into account leftovers from previous incomplete appends, etc.

I was initially planning to use versionstamps for the IDs, but since the writer has to decide whether or not to pack, it needs to know how many incomplete logs are already there; and since it needs to read at least one key anyway, it can also read the counter and create nicer-looking sequential integers.

I could have done a non-deterministic writer that only attempts to pack with some probability (1 in 10?), because I don’t care if I have a little bit more than 1000 unpacked logs. BUT there may be a subtle issue with this:

  • if the writer takes the non-deterministic path of packing the logs, and there’s some kind of failure (a bug in the code, the more complex query, timeouts, etc.), it will retry
  • it will then take the non-packing code path, which will not fail.

After a while, all writers will end their retry chains with a non-packing result, and I will end up with a huge ‘incomplete’ arena that is less than optimal (especially if the rest of the code assumes it will only ever hold ~1000 logs and uses some O(N) algorithm on it).

I had a similar idea for compaction of messages, except there would be a third layer that expels the compressed data into a file written to an object store, to keep the data size small on the relatively expensive SSDs running the database. The database only exists to perform the sequencing of messages and to provide temporary storage until the majority of messages are written to object storage, on the order of minutes or seconds. Further compactions can happen at the object storage layer. Basically like a distributed LSM tree. This way you can operate batch-mode systems that work solely with the object store (which can read at X00 MB/s without issue) and keep them and the ultra-low-latency streaming from impacting each other. I think Pravega is a system that operates similarly.

For my compaction idea, each stream would have a single process elected to perform the compactions, and it would always start from zero and read forward. Versionstamps work fine with this because the consumer API would not allow reads between X and Y, just “read N messages starting at X”. Efficient reads are easy, as you already outlined: just choose the greatest key that exists at or before your desired key, and your desired offset is somewhere in that compressed data.

Because the compression is all performed by a single writer per stream, it can easily decide to recompress a chunk of data if it exceeds 100 KB, including a smaller number of messages until it fits. It could even use some basic statistics on the compression ratio of the data in the stream to estimate how many messages should fit, to avoid doing extra work as the stream grows.
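
Something like this simple estimate could drive that (a pure sketch; the 100 KB target, the safety margin, and the shrink factor are illustrative, and per-message framing inside a chunk is omitted):

    import zlib

    TARGET = 100_000  # stay under the FDB value size limit
    MARGIN = 0.9      # leave headroom since the estimate is only approximate

    def estimate_batch_size(messages, observed_ratio):
        # Guess how many messages should compress to under the target size,
        # based on the compression ratio seen on previous chunks.
        avg_raw = sum(len(m) for m in messages) / len(messages)
        return max(1, int((TARGET * MARGIN) / (avg_raw * observed_ratio)))

    def compress_chunk(messages, observed_ratio):
        # Start from the estimate, then shrink the batch until it actually fits.
        n = min(len(messages), estimate_batch_size(messages, observed_ratio))
        while n > 1:
            blob = zlib.compress(b"".join(messages[:n]))
            if len(blob) <= TARGET:
                return messages[:n], blob
            n = int(n * 0.8)  # too big: retry with fewer messages
        return messages[:1], zlib.compress(messages[0])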