Fdb-zk: rough cut of Zookeeper API layer


#1

Hello! Thought I’d share something I’ve been hacking on: to get my feet wet with FDB, I made a layer that implements the bones of the Zookeeper 3.4.6 API: https://github.com/pH14/fdb-zk

I can’t say I’d recommend using it yet, but I’m soliciting feedback for next steps / design choices, and of course, contributors are welcome :wink:

The ideal goal is to offer a drop-in replacement for Zookeeper on top of FDB that can support much higher read/write volume and data size than ZK. ZK’s API is relatively simple, so it seemed like a good opportunity to show off FDB’s flexibility, and for me to learn a bit more about each system. I think it’d be valuable if you could spin up FDB, fire up Curator, and voila: scalable lock server. Or further lean into layers, and be able to replace multiple systems, like both Mongo / and that-ZK-cluster-you-need-for-other-system-X, with just FDB.

That said, let the tome commence:

K/V modeling

ZNodes on the DirectoryLayer

In terms of key/value design, a ZK node is largely modeled on top of the DirectoryLayer, which maps quite well to the problem. The DL reduces key size (at the cost of more reads), and already enforces invariants that align with the ZK API – e.g. it won’t let you create a subpath unless the parent directory already exists (no mkdir -p). The fact that the DL’s prefixes are assigned per-path also means that the full keyspace will be used, even for large directory trees. This reduces locality, but in the access patterns I’ve contemplated, is a potential win to distribute load.

ZNode stats

For an individual ZK node, the DirectoryLayer-assigned subspace is then used to store a variety of keys that map to node values. All of the stat entries, such as creation txn id, last-modified txn id, data size, etc each have their own key within the node’s subspace: directoryLayer(zk_node_path):stat_key_name --> stat_key_value. Fetching the full stat is then a simple range scan: directoryLayer(zk_node_path):stat_key*, and individual stat entries can be fetched directly.

ZNode data

The node’s data is broken into 100kB chunks, which are pushed into a keyspace that boils down to directoryLayer(zk_node_path):data:blockN --> byte array. Fortunately, ZK has a pretty low default data-size cap, so we don’t have to do any shenanigans to avoid hitting FDB’s max-data-size-per-transaction limit.

Global Transaction ID

One of the signature aspects of ZK is its global transaction id, which supplies a total ordering to all writes. To replace this, we use FDB’s versionstamp and write this value in every place that uses a ZK global id. While still monotonic (with caveats, more on this below), this differs from ZK which begins at txnid 1 and increments by 1 each time. This unfortunately means it’s not super easy to run fdb-zk against ZK’s tests which looks for +1 incremented ids.

This is enough to get the basic CRUD operations on ZK nodes going, and some things fall into place really nicely. Things like fetching children nodes is pretty trivial on top of the DirectoryLayer, since it’s already tracking children of a given path.

Watches

Implementing ZK’s watches requires a bit more trickery: if someone leaves a on-creation watch on node /locks/some-id, how do we know what path the DirectoryLayer is going to assign for that node? We don’t, so instead we leave a watch on a key: e.g. directoryLayer(watch-subspace) : /locks/some-id : on-create, and when the node is created, the layer sets the value of this key to the current versionstamp, triggering the FDB watch (note: we use a versionstamp so we can guarantee the value is different each time, since rewriting the same value to a key won’t trigger a watch). Watches on child-creation and deletion are also supported.

Client Sessions

Session handling is a bit of a WIP right now. The thought was to borrow a tip from the Record Layer, which does some cool stuff like leaving cursors inside FDB itself, and store all session information + heartbeat timestamps in another subspace. That’s what’s currently done right now, but it needs more TLC and testing. The rough schema is something like: directoryLayer(session_subspace) : client_id --> heartbeat timestamp, where client_id is derived from yet another versionstamp.

Actually running it

For now, the layer exists as a co-hosted process to an application, and the application can continue to use its preferred ZK client pointed to localhost. This allows it to work in a language-agnostic, client-only manner, similar to the design of the Document Layer. If you wanted, the process could also live as a FDB coprocessor, and clients could hit that directly instead.

┌──────────────────────┐     ┌──────────────────────┐
│ ┌──────────────────┐ │     │ ┌──────────────────┐ │
│ │   Application    │ │     │ │   Application    │ │
│ └──────────────────┘ │     │ └──────────────────┘ │
│           │          │     │           │          │
│           │          │     │           │          │
│       ZooKeeper      │     │       ZooKeeper      │
│        protocol      │     │        protocol      │
│           │          │     │           │          │
│           │          │     │           │          │
│           ▼          │     │           ▼          │
│ ┌──────────────────┐ │     │ ┌──────────────────┐ │
│ │  fdb-zk service  │ │     │ │  fdb-zk service  │ │
│ └──────────────────┘ │     │ └──────────────────┘ │
└──────────────────────┘     └──────────────────────┘
            │                            │
         FDB ops                      FDB ops
            │                            │
            ▼                            ▼
┌───────────────────────────────────────────────────┐
│                   FoundationDB                    │
└───────────────────────────────────────────────────┘

The layer sits behind a janky shim of the actual Zookeeper server, which handles all request/response IO and translates calls into FDB operations. At the moment, this is definitely the weakest component, and the thing I’ve spent the least time on. The ZK server isn’t terribly heavyweight, but there’s no reason to run much of anything except the bare essential network IO and an FDB client. The flip side: it meant not having to write ZK protocol network code… yet :sweat_smile:

Overall, I’ve gotten some Curator libs to sometimes work on it, but I’m missing something in the network / session handling code right now that can cause disconnects. I’ve also had issues running actual ZK locally, so I’m clearly doing something foolish ^^

Of course, if you’re aiming to replace ZK, a system whose purpose is simplicity and stability, you’d want a lot of testing.

Open questions (the interesting part!)

Ephemeral nodes

There are some elements of ZK functionality that are hard to emulate in a purely stateless-client environment. A key example of this is ephemeral nodes: given that clients can die at any time, who becomes responsible for cleaning up ephemeral nodes that are supposed to expire at client disconnect?

Spitballing ideas:

  • Clients periodically scan the session subspace and look for related unexpired nodes past their prime and remove them
  • Deploy a dedicated fdb-zk client whose job is for central-server-like operations
  • Deploy a fdb-zk layer coprocess on the FDB nodes themselves that are responsible for central-server ops on their local key subspaces
  • Do everything at read-time – if a client is asked to create a node that already exists, check the sessions table if it should have been expired, if so, create a new node, if not, throw an error.

Viability of versionstamps

Using versionstamps to replace ZK’s transaction id feels nice at first, but given the discussions in VersionStamp uniqueness and monotonicity, I’m not sure they’re truly a viable option. Trusting that the versionstamp will always be increasing is an important aspect of the design. Having a forever-monotonic id would be a really powerful construct.

ACLs

Full confession: despite trying to mimic ZK, I actually know very little about Zookeeper in practice. I’m not familiar with how the ACLs are used, nor how we’d want to store & enforce them.

Watch Guarantees

ZK states this about watches (from https://zookeeper.apache.org/doc/r3.3.3/zookeeperProgrammers.html#ch_zkWatches) :

Watches are ordered with respect to other events, other watches, and asynchronous replies. The ZooKeeper client libraries ensures that everything is dispatched in order.

A client will see a watch event for a znode it is watching before seeing the new data that corresponds to that znode.

The order of watch events from ZooKeeper corresponds to the order of the updates as seen by the ZooKeeper service.

I haven’t thought too much about these guarantees or looked into how they’re implemented, so I’m not sure using FDB’s watches the way fdb-zk currently does maintains the same semantics.

Misc. feedback on using FDB / Java bindings

So far I’m pretty happy at how quickly this has moved along, given that it’s 1x engineer working on it on the side, and a lot of that credit goes to the data model of FDB and the existence of the DirectoryLayer. Of course, there were a few things that came up that caused some confusion. I’m semi-hesitant to include them here since I don’t want to distract from the questions above, but, hey, here goes:

  • Versionstamps need more documentation if they’re to be a first-class feature. The interplay between the commit version, transaction batch order, user bytes, 80-bits vs 96-bits flavors is a lot to unravel. I leaned heavily on some existing forum posts like VersionStamp vs CommittedVersion, and it doesn’t seem too terrible to distill down the learnings there to some concise usage guides / API docs.

  • little-endian-only atomic operations was a surprise. All of the byte serialization was written using Guava, which uses big-endian for integer <–> byte array conversion, so this was a painful realization. Would there be interest in supporting big-endian ADD / MIN / MAX natively as well? That would be enormously helpful for many projects that already live in a big-endian world.

  • Handling versionstamps / commit versions in the Java bindings gets tricky. There have been a few posts on this, but returning a CompletableFuture that only completes after the transaction completes can require a bit of contorting to work around when that value is relevant for other aspects of the transaction / application operation.

  • You can’t cancel a transaction from within fdb.run(tr -> lambda) without throwing an exception, since it’ll try to commit once the function completes and that fails if the transaction has already been cancelled.

  • Unit testing wasn’t super simple. I pushed the whole app into its own directory subspace, so every key is actually prefixed by directoryLayer(fdb-zk). Right now the tests blow away the whole directory and start over each time, but they could ideally reassign the root directory to directoryLayer(fdb-zk-test). An embedded test server would be nice.

Phew! That was a lot. Let me know if you have any thoughts on the design choices so far and ideas for tackling the open questions (and further questions I might be missing!). I am neither ZK nor FDB expert, so I’m sure there’s a lot in my blindspots. If this is interesting to anyone… PRs welcome :smiling_imp:


(Alex Miller) #2

Oooh, fancy. I’ve been curious to hear how something like this would actually go.

It sounds like overall well for the basic data model, but then starts to become difficult when you get to the features which assume that ZK has one leader on one machine. Which is, I suppose, pretty unsurprising.

I think running a janky leader election on top of FDB to nominate one of your layer processes as the one to do the central-server-like operations shouldn’t be too hard. You then have something to do your scan and clear. When I’ve thought through similar ideas before, I haven’t liked anything client driven, because of the unreliability of clients, and pushing the work off to read time means forever increasing amounts of data in the cluster if clients never happen to try and read the right keys.

They… should. Admittedly, versionstamps interacting with DR and backup was overlooked for a while, but fixes to both will be in FDB 6.1. The only case I’m aware of that would cause versionstamps to not be monotonically increasing is if you handroll your own code to copy data between clusters, and there’s not much we can do about that.

This feels like you would need something more like change feeds than watches. I’m not aware of any promises we have on the ordering of watch events being triggered, and the API of watches says that in the case of an A->B->A update chain, we’re allowed to never send a notification at all.

I managed to hand wave out a way to be able to abuse transaction conflicts to be able to tell when data has probably changed as a way to implement change feeds on FDB, but it’s a horrible and unscaleable idea. I’m not aware of any better way to do it, than just implementing change feeds for FDB.


(Ryan Worl) #3

Has this been discussed before in a way that isn’t just the way fdbdr works? An API to directly read TLog data like “read this key prefix forward from version X”, where the client would go to the TLog that owns that range where the prefix falls, and then the TLog filters further out the data for that range.

I’ll move this out into a separate thread if it gets too far off topic. :smiley:


#4

That’s an interesting idea – it seems like it’d have to be coupled with read-repairs, in the case that a client hasn’t yet been elected. Any preferred janky leader election techniques?

I haven’t looked too deeply into the Record Layer – how does it handle a task like generating a new secondary index on existing data? That seems like a related concern.

:+1: good to know

Makes sense, I’ll chew on this one for a bit and read more into what exactly ZK is doing as well. If anyone has more familiarity here, I’m all ears.


(Alex Miller) #5

You can have your clients check leaderKey for an (ID, versionstamp), and consider that process to be the leader if its versionstamp is within 10,000,000 of your read version. Write {ID, versionstamp} to the database if the key is empty or the versionstamp was too old. Heartbeat leaderKey, and also watch it from the leader so that it is informed if it loses leadership. You’ll get re-elections every time FDB has a recovery, and you could have more than one leader for a short time, but that seems fine in your use case.

See Best way to add an index on already-existing data?, which is Alec answering basically that.


(David Scherer) #6

We once prototyped a coordination system (not a ZK emulator, but with similar capabilities) on top of FDB that (as best I can recall) used the following solutions for these problems:

  1. Ephemeral nodes: We kept an index of ephemeral nodes by their expiration time. Scanning the index makes it easy to locate the old nodes that should be cleared, and you can also check at read time if you want expirations to be more precise. This system kept its own global clock cooperatively by having everyone read the clock, wait one second, and write it to clock+1 transactionally; only one such transaction will succeed each second. I think we made the client that incremented the clock a short term “leader” responsible for clearing ephemeral nodes for that second. An alternative would be to use the cluster version, which occasionally jumps forward but is monotonic and usually runs at a predictable rate.

  2. Watches: I think that system had FDB-like watch semantics (watches are asynchronous, reads are consistent). I think the only plausible way to reproduce the exact ZK semantics efficiently in a leaderless system is to keep indices something like:

(“active_watches”, zknode, clientId, watchId) := ()
(“triggered_watches”, clientId, versionStamp) := watchId
(“watch_watch”, clientId) := incrementing_value (or versionStamp, or whatever)

When you are writing to a zknode, you range read and clear its active_watches list and move each one into the appropriate place in the triggered_watches index. You also increment watch_watch for the affected client. Clients can quickly range read (and clear) the triggered_watches index to get an ordered list of events. Clients (fdb) watch the “watch_watch” index to avoid polling, and also check triggered_watches before doing any consistent read.

Dave


#7

Thanks for the added context Dave, that’s very helpful.

The clock-based election make sense – do you still see this pattern as a good solution here? My hesitation with it would be the amount of write traffic that targets the c+1 key if there are a lot of clients (maybe have only a certain % of clients vie for a given time slot). And maybe it doesn’t become a problem – one thing I don’t have good intuition for yet is the relative cost of a read vs write vs read conflict vs write conflict, and when a certain # of each to an individual key becomes problematic in FDB.

The watch schema you outlined looks a lot like what I’ve been sketching out after Alex’s change feed suggestion, so that’s promising :slight_smile: