Hi everyone,
I’ve been thinking a lot lately about a FoundationDB use case that’s probably crazy, but since I can’t get it out of my mind, I want to share it here and hear what you think! I apologize for the long post, but I need to give some context first.
Background
I’m working on a data platform targeting end-to-end data science use cases (if you’re interested: https://beneath.dev). Long story short, on the data storage side of things, the idea is to bundle three common types of data systems and add a layer of abstraction on top that handles schema and data sync between them in a (slightly) opinionated way. The three systems are:
- Log replay/streaming (e.g. Kafka)
- Key-indexed range reads (e.g. FoundationDB)
- Columnar OLAP (e.g. Presto on S3)
Writes go to the log first, and we then handle replication/CDC to the other systems. We need to support multi-tenant workloads with tens of millions of “collections” (which map to a “topic” in Kafka, a “key prefix” in FoundationDB, and a “file” in S3), which can scale to tens of TBs each.
The problem
My question is about two related problems:
- Kafka scales pretty poorly to millions of topics and consumer groups
- Syncing millions of topics between Kafka and a KV-store is also a little difficult
Our naive solution
Those problems aren’t the end of the world. We intend to address the first by running multiple Kafka clusters and sharding workloads across them (we’re also considering Apache Pulsar in the future, which seems more scalable, albeit still limited), and the second by first writing everything to one massive topic and then fanning records out from there to the actual topics and the KV-store.
My idea
Instead of using Kafka, I’ve been thinking about a possible implementation based on FoundationDB. There’s not one great reason, but a few good ones:
- We could theoretically scale to an infinite number of topics on one cluster (they’re just a key prefix)
- We get rid of a dependency, leaving only FoundationDB and S3 to manage for storage
- We could transactionally combine log appends and index writes (and get rid of a lot of sync code)
- We get strong consistency for indexed reads (not a strong requirement, but unlocks new use cases)
- We get more flexibility to add some advanced features later on (e.g. efficient filtered log subscribe)
Of course, there’s one good reason not to do it. We’d have to implement a new log streaming engine on FoundationDB! I know that’s previously been suggested on this forum, e.g. in this excellent post by @ryanworl, but I haven’t heard of anyone who has taken it to production, including the networking layer (consumer group management, low-latency push, caching of recent writes, …).
Still, the benefits have tempted me to tackle it with a basic implementation we can optimize over time.
The implementation
This is how I currently imagine implementing the log storage side of things:
- Topics are split into N partitions and each partition gets a UUID
- We have a broker for each partition that batches writes over ~10ms and writes compressed segments containing multiple records
- Segments are written to the key `(partition_id, starting_offset)`
- The broker pre-allocates offsets using atomic increments on a key `("offsets", partition_id)` (it should aim to increment it at most every ~10 seconds)
- It doesn’t transactionally increment the offset on every write, to avoid excessive reads/writes
- If the broker dies, there’ll just be a minor gap in the log’s offsets
- It doesn’t use versionstamps because integer offsets have nice properties (log length estimation, parallel range reads, backwards paging)
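
To make the write path above concrete, here is a minimal sketch in Python against the official `fdb` bindings. Everything beyond the key scheme in the list is my own assumption: the `"segments"` prefix, the block size, the record encoding (plain byte strings packed with the tuple layer), and the function names. Where the list says “atomic increments”, the offset claim below pairs a normal read with an atomic add so that two brokers can never be handed the same block.

```python
import struct
import zlib

import fdb
import fdb.tuple

fdb.api_version(630)  # or whatever API version your client supports
db = fdb.open()

# Assumed key layout, following the scheme above:
#   ("offsets", partition_id)                 -> next unallocated offset (8-byte little-endian)
#   ("segments", partition_id, start_offset)  -> compressed segment (a ~10ms batch of records)

OFFSET_BLOCK = 100_000  # offsets claimed per allocation; size it so the broker only
                        # touches the counter every ~10 seconds


@fdb.transactional
def claim_offset_block(tr, partition_id):
    """Reserve a contiguous block of offsets for this broker.

    The non-snapshot read of the counter conflicts with any concurrent claim,
    so two brokers can never hand out the same offsets; the atomic add bumps
    the counter. If the broker dies before using its whole block, the unused
    tail simply shows up as a gap in the log's offsets.
    """
    key = fdb.tuple.pack(("offsets", partition_id))
    raw = tr[key]
    start = struct.unpack("<q", bytes(raw))[0] if raw.present() else 0
    tr.add(key, struct.pack("<q", OFFSET_BLOCK))
    return start


@fdb.transactional
def append_segment(tr, partition_id, start_offset, records):
    """Write one compressed segment of records (byte strings) at its starting offset."""
    payload = zlib.compress(fdb.tuple.pack(tuple(records)))
    tr[fdb.tuple.pack(("segments", partition_id, start_offset))] = payload


@fdb.transactional
def replay(tr, partition_id, from_offset, limit=100):
    """Range-read segments for one partition starting at from_offset.

    Integer offsets make this (and backwards paging via reverse=True, or log
    length estimation from the counter) a plain range read. A real reader
    would also check the segment just before from_offset, since from_offset
    can land in the middle of a segment.
    """
    begin = fdb.tuple.pack(("segments", partition_id, from_offset))
    end = fdb.tuple.range(("segments", partition_id)).stop
    out = []
    for k, v in tr.get_range(begin, end, limit=limit):
        _, _, start_offset = fdb.tuple.unpack(k)
        out.append((start_offset, list(fdb.tuple.unpack(zlib.decompress(v)))))
    return out
```

A broker would call `claim_offset_block` occasionally, hand out offsets from the claimed block locally, and call `append_segment` once per ~10ms batch. One constraint to keep in mind: a single FoundationDB value is capped at 100 kB, so a compressed segment either has to stay under that or be split across several keys.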
Then additionally, per our index sync use case described above, each write to the log would also transactionally update the separate key-value index, which has a key `(topic_id, record_key)` for every record (where `record_key` can be assumed to have a roughly uniform distribution).
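
Here is how that could be bolted onto the same write, as a sketch that builds on the code above. It assumes each record is a `(record_key, value)` pair of byte strings and a hypothetical index layout of `("index", topic_id, record_key)` (the `"index"` prefix is just my way of keeping it apart from the segment keys).

```python
# The same transaction that appends a segment also upserts one index key per
# record, so the log and the index can never diverge.
@fdb.transactional
def append_segment_and_index(tr, topic_id, partition_id, start_offset, records):
    # 1. Append the compressed segment at (partition_id, start_offset)
    payload = zlib.compress(fdb.tuple.pack(tuple(records)))
    tr[fdb.tuple.pack(("segments", partition_id, start_offset))] = payload

    # 2. Upsert the key-value index entries in the same transaction
    for record_key, value in records:
        tr[fdb.tuple.pack(("index", topic_id, record_key))] = value
```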
My questions
- Is there any way to scale this setup out to roughly the same ingest throughput as a dedicated log storage engine like Kafka? Are there any specific bottlenecks that come to mind?
- Overall, is this mad – or actually a great FoundationDB use case?
I’m looking forward to hearing what you think. And regardless, thank you for the awesome work on FoundationDB!