Building scalable log streaming on FoundationDB

Hi everyone,

I’ve been thinking a lot lately about a FoundationDB use case that’s probably crazy, but since I can’t shake it out of my mind, I want to share it here to hear what you think! I apologize for the long post, but I need to give some context first :slight_smile:

Background

I’m working on a data platform targeting end-to-end data science use cases (if you’re interested: https://beneath.dev). Long story short, on the data storage side of things, the idea is to bundle three common types of data systems, and add a layer of abstraction on top that handles schema and data sync between them in a (slightly) opinionated way. The three systems are:

  • Log replay/streaming (e.g. Kafka)
  • Key-indexed range reads (e.g. FoundationDB)
  • Columnar OLAP (e.g. Presto on S3)

Writes go to the log first, and we then handle replication/CDC to the other systems. We need to support multi-tenant workloads with tens of millions of “collections” (which map to a “topic” in Kafka, a “key prefix” in FoundationDB, and a “file” in S3), which can scale to tens of TBs each.

The problem

My question is about two related problems:

  1. Kafka scales pretty poorly to millions of topics and consumer groups
  2. Syncing millions of topics between Kafka and a KV-store is also a little difficult

Our naive solution

Those problems aren’t the end of the world. We intend to address (1.) by running multiple Kafka clusters and sharding workloads (we’re also considering Apache Pulsar in the future, which seems more scalable, albeit still limited), and we tackle (2.) by first writing everything to one massive topic, and then fanning out records from there to the actual topics and the KV-store.

My idea

Instead of using Kafka, I’ve been thinking about a possible implementation based on FoundationDB. There’s not one great reason, but a few good ones:

  • We could theoretically scale to an infinite number of topics on one cluster (they’re just a key prefix)
  • We get rid of a dependency, leaving only FoundationDB and S3 to manage for storage
  • We could transactionally combine log appends and index writes (and get rid of a lot of sync code)
  • We get strong consistency for indexed reads (not a strong requirement, but unlocks new use cases)
  • We get more flexibility to add some advanced features later on (e.g. efficient filtered log subscribe)

Of course, there’s one good reason not to do it. We’d have to implement a new log streaming engine on FoundationDB! I know that’s previously been suggested on this forum, e.g. this excellent post by @ryanworl, but I haven’t heard of anyone who did it in production including the networking layer (consumer group management, low-latency push, caching of recent writes, …).

Still, the benefits have made me tempted to tackle it with a basic implementation we can optimize over time.

The implementation

This is how I currently imagine implementing the log storage side of things:

  • Topics are split into N partitions and each partition gets a UUID
  • We have a broker for each partition that batches writes over ~10ms and writes compressed segments containing multiple records
  • Segments are written to (partition_id, starting_offset)
  • The broker pre-allocates offsets using atomic increments on a key ("offsets", partition_id) (it should aim to increment at most every 10 seconds; see the sketch after this list)
    • It doesn’t transactionally increment offset on every write to avoid excessive reads/writes
    • If the broker dies, there’ll just be a minor gap in the log’s offsets
    • It doesn’t use versionstamps because integer offsets have nice properties (log length estimation, parallel range reads, backwards paging)
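
To make the offset pre-allocation concrete, here’s a rough sketch using the FoundationDB Python bindings. The subspace layout, the reserve_offset_block helper and the block size of 1000 are just illustrative assumptions on my part, not a finished design:

```python
import struct
import fdb

fdb.api_version(630)
db = fdb.open()

# ("offsets", partition_id) -> next unallocated offset, stored as a little-endian int64
offsets = fdb.Subspace(("offsets",))

@fdb.transactional
def reserve_offset_block(tr, partition_id, block_size=1000):
    """Reserve a block of offsets for this broker in one small transaction.

    The broker hands these offsets out locally and only comes back here when
    the block runs out (aiming for at most one increment every ~10 seconds).
    If the broker dies, the unused tail of the block just becomes a small gap
    in the partition's offsets, which the design tolerates.
    """
    key = offsets.pack((partition_id,))
    current = tr[key]
    start = struct.unpack("<q", bytes(current))[0] if current.present() else 0
    tr.add(key, struct.pack("<q", block_size))  # atomic increment of the high-water mark
    return start

# e.g. start = reserve_offset_block(db, partition_id)
```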

Then additionally, per our index sync use case described above, each write to the log would also transactionally update the separate key-value index, which has a key (topic_id, record_key) for every record (where record_key can be assumed to have a roughly uniform distribution).
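
As a similarly hedged sketch (continuing from the snippet above), a single broker commit could write the compressed segment at (partition_id, starting_offset) and the per-record index entries at (topic_id, record_key) in one transaction. The zlib/JSON encoding, the shape of records, and what the index value holds (here simply the latest serialized record for the key) are all placeholders:

```python
import json
import zlib

import fdb  # fdb.api_version(630) / fdb.open() as in the sketch above

# ("log", partition_id, starting_offset) -> compressed batch of records
log = fdb.Subspace(("log",))
# ("index", topic_id, record_key) -> latest record for that key
index = fdb.Subspace(("index",))

@fdb.transactional
def append_segment(tr, topic_id, partition_id, starting_offset, records):
    """Write one ~10ms batch as a single compressed segment and update the index.

    `records` is assumed to be a list of dicts with a "key" field.
    Note: a single FDB value is limited to 100KB, so a large batch would
    have to be split across multiple segment keys.
    """
    segment = zlib.compress(json.dumps(records).encode())
    tr[log.pack((partition_id, starting_offset))] = segment

    for record in records:
        # Here the index value is the latest serialized record for the key;
        # storing a pointer into the log instead is an alternative design.
        tr[index.pack((topic_id, record["key"]))] = json.dumps(record).encode()
```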

My questions

  1. Is there any way we can scale out to achieve on the order of the same ingest throughput as a dedicated log storage engine (like Kafka) with this setup? Are there any specific bottlenecks that come to mind?
  2. Overall, is this mad – or actually a great FoundationDB use case?

I’m looking forward to hearing what you think. And regardless, thank you for the awesome work with FoundationDB!

2 Likes

I think the proposed approach should scale well. There are no hotspots that I can detect in it, and writes are well spread out across partitions. Additionally, as you are batching the writes, it limits the iops. Caching the counter probably makes the transactions write-only, thereby eliminating conflicts and also saving network round trips to get a version stamp.

1 Like

Thanks for your feedback, @gaurav :slight_smile: Sounds promising! I’ve got a couple more questions:

  1. You say not using versionstamps in the keys saves a round trip – did I understand that correctly? I was actually under the impression that FoundationDB could populate versionstamps in keys on write without a roundtrip.

  2. I know it can be hard to say without benchmarking, but do you have a gut feeling for the overhead of using FoundationDB for this kind of partitioned append-only workloads (assuming optimized batching of writes) versus a dedicated replicated log like Kafka or Pulsar (BookKeeper)? I’m assuming FoundationDB’s transaction coordination and indexing adds overhead, but do you reckon it would be on the order of 2x, 5x or 10x lower throughput?

I’m afraid that with a small number of nodes FDB gives lower performance than Kafka.

FDB is optimised mostly for random KV access, while Kafka is optimised for sequential access. Sequential access is usually much faster than random access (~30-100 times).

Of course, scalability can compensate for the lower per-node performance by adding more nodes to the cluster.

Hey @osamarin, yeah, there’s no doubt the performance is going to be lower, but do you still think it’s going to be that severe given:

  1. Our multi-tenancy use case, where we’re expecting hundreds to thousands of separate “logs” (key prefixes) per node, and not just a handful (as e.g. Kafka is optimized for)
  2. Reads on a specific “log” (key prefix) will be range reads (which I would expect that FoundationDB translates to roughly sequential disk access, but I would love to learn if that’s not true)

Our multi-tenancy use case, where we’re expecting hundreds to thousands of separate “logs” (key prefixes) per node, and not just a handful (as e.g. Kafka is optimized for)

Yes. Having millions of topics with little data in each is a pattern where FDB wins.
But if you have only a few topics with a huge amount of data in each, then Kafka wins.

Reads on a specific “log” (key prefix) will be range reads

  1. It depends on your data structure. You can read the index (topic_id, record_key) by range, but you still need to read the contents by record_key.

  2. Write/insert performance may also be important.

I would expect that FoundationDB translates to roughly sequential disk access

It is true, but FDB always reads in 4K blocks and does not batch them. Reading in 1MB chunks would be significantly faster.

1 Like

FDB clients fetch the read version in a transaction on the first read operation. If a transaction has no reads (or read conflicts, to be precise), then the read version will not be fetched.

One more aspect that should be considered with FDB is cleanup of older data. Deletes translate into roughly the same amount of work and IO as adding new data. Moreover, the data being cleaned up will most likely be fetched from disk in 4K blocks prior to being deleted. This also leads to resizing and rebalancing of shards.

In the past we have run into a lot of performance issues when data cleanup kicks in, which took a fair amount of effort to resolve.

With Kafka, it is a matter of file truncation.

But if you are talking about 100k+ topics, each potentially with multiple partitions, then operationally Kafka might be much more difficult to maintain. Btw, does Kafka automatically rebalance partitions across brokers on failures? When I last worked with it, I recall that rebalancing was a manual step.

1 Like

Thanks for all the input!

On the point about needing a second read to fetch record contents: I believe this would be addressed by storing compressed log segments directly at (partition_id, starting_offset), so there wouldn’t be a secondary lookup.
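
For illustration, with that layout a log replay is a single range read over the partition’s prefix. A rough sketch, reusing the log subspace and encoding assumed in my earlier snippets:

```python
@fdb.transactional
def read_partition(tr, partition_id, from_offset, limit=100):
    """Replay up to `limit` segments of one partition starting at from_offset.

    One contiguous range read per call, which the storage servers should be
    able to serve largely sequentially.
    NB: if from_offset falls inside a segment, a real implementation would
    also fetch the preceding segment (e.g. with a reverse get_range of 1).
    """
    begin = log.pack((partition_id, from_offset))
    end = log.range((partition_id,)).stop
    out = []
    for key, value in tr.get_range(begin, end, limit=limit):
        _, starting_offset = log.unpack(key)
        out.append((starting_offset, json.loads(zlib.decompress(value).decode())))
    return out
```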

This is a very good point and could be a showstopper. I’ll investigate more.

Got it, thanks for clarifying that!

Interesting. Clearing data is important as we would not be storing infinite log history in FoundationDB. But they would be range clears of (partition_id, from_offset) to (partition_id, to_offset), spanning many MBs to a few GBs.
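
Concretely, I imagine something like this hypothetical helper on top of the log subspace from my sketches above:

```python
@fdb.transactional
def trim_partition(tr, partition_id, up_to_offset):
    """Retention: clear all segments of a partition below up_to_offset."""
    begin = log.range((partition_id,)).start
    end = log.pack((partition_id, up_to_offset))
    tr.clear_range(begin, end)
```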

The documentation here states that range clears are “ultra-efficient”. But if I understand you correctly, even range clears will consume significant resources after the fact to actually release the disk space?

On broker failure, Kafka will failover to a replica. But other than that, you’re right that rebalancing is a manual step.

I think I might have understated how unrealistic it seems to use Kafka for that many topics – hence this discussion! Pulsar is more suitable because brokers are stateless, but it’s a heck of a dependency.

Range clears incur no additional client-side latency. I wouldn’t call them efficient, though. They incur a debt that will be cleaned up asynchronously. There is a tipping point at which you won’t be able to clean pages fast enough, and your disk usage will grow faster than you can clean pages.

3 Likes