Changes feed without hot keys

How can one implement a changes feed in FDB (watching for updates to a given range)?

Unfortunately, FoundationDB doesn’t support setting watches on ranges (see the thread “Changefeeds (watching and getting updates on ranges of keys)”).

One possible implementation is to maintain a number_of_records key and update it using atomic add. We can then set up a watch on that key so other clients get a notification when the database is updated. The client would use other means to find out what was updated.
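A minimal sketch of the counter idea, assuming the Python binding's `tr.add(key, param)` atomic operation and `tr.watch(key)`. FDB's atomic add treats values as little-endian integers, so the delta is packed with `struct`. The key name and helper names are hypothetical, and `tr` is assumed to be a transaction supplied by the caller.

```python
import struct

COUNTER_KEY = b"my_range/number_of_records"  # hypothetical key name


def encode_delta(n):
    """Pack n as the 8-byte little-endian integer that atomic add operates on."""
    return struct.pack("<q", n)


def decode_counter(raw):
    """Decode raw counter bytes; a missing key (None) counts as zero."""
    return 0 if raw is None else struct.unpack("<q", raw)[0]


def record_insert(tr, key, value):
    """Writer side: store the record and bump the counter without reading it."""
    tr[key] = value
    tr.add(COUNTER_KEY, encode_delta(1))


def watch_counter(tr):
    """Reader side: ask for a watch on the counter key (assumes tr.watch exists)."""
    return tr.watch(COUNTER_KEY)
```

Because the writer never reads the counter, the bump itself adds no read conflict; the reader still has to find out what changed by other means.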

One concern with this implementation is that the number_of_records key would cause transaction conflicts, since every change in the range would modify a single key.

We could replace the single key with multiple keys and atomically update one of them (chosen randomly). However, this approach doesn’t seem to be compatible with watches, since:

  • we don’t know how many keys would represent the counter
  • we would have to set a watch on every key used in the counter

The following idea is a bit crazy, that’s why I am asking for help to validate if this is even possible.

Let’s assume we form a chain of changes to the range by creating keys named “{RANGE_PREFIX} / chain / {prev_vsn}”. The value of each key would be new_vsn(), where vsn is an FDB versionstamp. We would create these keys only when we add new entries to the range. Here is what a changes feed transaction might look like:

  1. we want to get changes since the given since_vsn argument
  2. we assign last_seen = since_vsn
  3. we start a new transaction and pass last_seen to it
  4. we iterate over the “{RANGE_PREFIX} / chain /” range starting from last_seen using a range operation
    • we use a “limit” parameter to control transaction size
  5. we finish the current transaction
  6. we repeat steps 3-5 until we reach the end of the range
  7. we start a new transaction and get watch_key as
    watch_key = txn["{RANGE_PREFIX} / chain / {last_seen}".format(…)]
  8. we set up a watch for watch_key (note that the key doesn’t exist yet)
  9. we finish the current transaction
  10. we wait for the watch to fire (another client would create the “{RANGE_PREFIX} / chain / {last_seen}” key in the transaction which adds a key into the range)
  11. we cancel the watch and repeat all steps
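The batching in steps 3-6 can be sketched without a cluster. The snippet below is only an illustration: it models the ordered key range as a sorted Python list of (vsn, value) pairs, where a real client would issue a limited range read inside each transaction. The function names are hypothetical.

```python
import bisect


def read_batch(entries, last_seen, limit):
    """Return up to `limit` (vsn, value) pairs with vsn > last_seen.

    `entries` is a list of (vsn, value) tuples sorted by vsn; it stands in
    for the ordered key range that a limited range read would scan.
    """
    keys = [vsn for vsn, _ in entries]
    start = bisect.bisect_right(keys, last_seen)
    return entries[start:start + limit]


def drain(entries, since_vsn, limit):
    """Repeat limited reads (one per simulated transaction) until caught up."""
    last_seen = since_vsn
    changes = []
    while True:
        batch = read_batch(entries, last_seen, limit)
        if not batch:
            # Nothing newer than last_seen: this is where the watch would be set.
            return changes, last_seen
        changes.extend(batch)
        last_seen = batch[-1][0]
```

`drain` returns both the collected changes and the final last_seen, which is the position the reader would resume from after the watch fires.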

The questions are:

  • is it possible to set a watch on a non-existent key?
  • how expensive is it to cancel the watch and create a new one over and over?
  • is there a better way of getting notifications about changes in the range?

What you’re effectively asking here is “how can I observe all changes to the database in the order they happened” and “how can I be notified promptly of these changes.”

To answer the first bit of your question, using the atomic add operation does not generate conflicts so long as you do not also read that key in the same transaction. You can use a snapshot read to know what it was at the point your transaction started and it will not generate conflicts.

Now the reading in order part. If you need to know what changes occurred, write them into a log structure using versionstamped operations, which you can search for on the forums. If you only care that “something” changed, you can just write the key that was changed and not include any data, or you can choose to include some data. Up to you. This will scale to the capacity of a single storage node’s write throughput because all writes will hit one node. You can artificially create “shards” of this log, and you can still know the total order of operations, but reading requires reading all the shards to reconstruct the order.
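To illustrate reconstructing the total order from a sharded log, here is a small sketch. It assumes each shard has already been read into a list of (versionstamp, payload) pairs sorted by versionstamp, and that versionstamps compare correctly as byte strings. The short byte strings in the usage example are stand-ins, not real 10-byte versionstamps.

```python
import heapq


def merge_shards(shards):
    """Merge per-shard logs into one list ordered by versionstamp.

    Each shard is an iterable of (versionstamp, payload) pairs that is
    already sorted by versionstamp, as an ordered range read would return.
    """
    return list(heapq.merge(*shards, key=lambda entry: entry[0]))


# Usage with stand-in versionstamps:
shard_a = [(b"\x00\x01", "put k1"), (b"\x00\x04", "put k4")]
shard_b = [(b"\x00\x02", "put k2"), (b"\x00\x03", "put k3")]
ordered = merge_shards([shard_a, shard_b])
```

The reader has to consult every shard before it can be sure of the order, which is the cost mentioned above.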

This is the first problem you’ll run into. If you keep your universe of what you’re “watching” small enough (say, one user’s data and a few operations a second at peak), this is fine. Watching literally the entire database, or a very large portion, is not scalable.

Now on to the “promptly being notified” bit. This is what watches are for: they optimize a polling loop. So if you watch a counter key and poll every second as a fallback, my understanding is that in most circumstances the watch will fire immediately, and you can fall back to polling if it breaks for whatever reason.
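A rough sketch of the “watch plus polling fallback” loop. To keep it runnable without a cluster, a `threading.Event` stands in for the watch future and `poll` is a hypothetical callable that checks whether anything changed. Both are assumptions for illustration, not the FDB API.

```python
import threading


def wait_for_change(watch_fired, poll, interval, max_polls):
    """Block until the watch signal fires or a poll detects a change.

    watch_fired: threading.Event standing in for a watch future.
    poll: zero-argument callable returning True when a change is visible.
    interval: seconds to wait on the watch before each fallback poll.
    Returns "watch", "poll", or "timeout".
    """
    for _ in range(max_polls):
        if watch_fired.wait(timeout=interval):
            return "watch"
        if poll():
            return "poll"
    return "timeout"
```

In the common case the first `wait` returns immediately because the watch fired; the poll only matters when the watch was lost for some reason.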

I hope this helps. I understand this type of feature is desirable, but the idealized implementation where you can watch anything for changes and expect immediate updates is not scalable, which is why products like Firebase have a limit on connections and throughput. They are just running all operations through a single node. Other systems have other mechanisms, but they typically don’t have a total order over writes, which is important if you actually want to know every single thing that happened and when.

What you’re effectively asking here is “how can I observe all changes to the database in the order they happened” and “how can I be notified promptly of these changes.”

There is a third part of the question, which is how to do both of the above without introducing hot keys in the transactions that update the database.

To answer the first bit of your question, using the atomic add operation does not generate conflicts so long as you do not also read that key in the same transaction.

As I mentioned, the concern with this approach is that the key used for atomic updates will be a hot key, updated by every write transaction that adds a new key to the range. This means the performance of the whole database would be limited by the performance of a single node.

Could you explain how you would push items to the changefeed? I believe that operation would get serialized, so only one writer could make progress at a time.

{chain, v0} -> v1
{chain, v1} -> v2
{chain, v2} -> v3

If two processes read the current version v3 and try to write, would one of them conflict? Note that using a versionstamp in the value would not prevent conflicts on the key, AFAIK.

@ananthakumaran I think the use of the word “chain” might be misleading here … the items in the feed don’t typically need to be linked together but merely need to produce a total ordering. Using versionstamps in the key is sufficient for that purpose.

@iilyak you lost me on step 7. Are you trying to guess at the value that a new versionstamp will take? That won’t work; the versionstamps are incrementing constantly regardless of whether any specific transaction is actually associated to a given stamp.

I don’t know how hot a key that is only being mutated by atomic operations would have to get before it becomes a bottleneck; that’s a really good question. If it did become a problem I think one could probably come up with a scheme to distribute a small, known set of keys across the subspace and watch all of those.
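A sketch of that “small, known set of keys” scheme, assuming the Python binding's `tr.add` and `tr.watch`. The shard count, key layout, and function names are hypothetical; what matters is that every client agrees on the same fixed set of keys.

```python
import random
import struct

N_SHARDS = 8  # hypothetical; fixed and known to every client


def shard_keys(prefix):
    """All counter keys for a subspace; readers set one watch per key."""
    return [prefix + b"/counter/" + bytes([i]) for i in range(N_SHARDS)]


def bump_random_shard(tr, prefix, rng=random):
    """Writer side: atomically add 1 to one randomly chosen shard key."""
    key = rng.choice(shard_keys(prefix))
    tr.add(key, struct.pack("<q", 1))
    return key


def watch_all_shards(tr, prefix):
    """Reader side: request one watch per shard key."""
    return [tr.watch(key) for key in shard_keys(prefix)]
```

Writers spread their updates over N_SHARDS keys, at the price of the reader holding N_SHARDS watches per subspace.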

Or maybe you just turn off updates to that key altogether and return true anytime someone asks “did this subspace get updated”? :laughing:

@ananthakumaran
Clients would have to provide the current version of the document they want to update. The writer transaction would look something like the following pseudocode. I agree with @kocolosk that the name “chain” is not the best one; naming is hard.

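import random

import fdb

fdb.api_version(620)  # assumption: use the API version matching your client

@fdb.transactional
def commit(txn, range_prefix, previous_version, document):
    watch_key = fdb.tuple.pack((range_prefix, 'chain', previous_version))
    # A read conflict (not a write conflict) is what makes this transaction
    # fail if a concurrent transaction writes watch_key first; the set at
    # the end already registers the write conflict.
    txn.add_read_conflict_key(watch_key)

    # do something application specific here
    # it would add keys into the range we want to watch
    # random is used for illustration purposes
    key = fdb.tuple.pack((range_prefix, 'docs', random.random()))
    txn[key] = document

    # then we update the by_vsn index; the incomplete Versionstamp() is
    # filled in by the database at commit time
    key_stamp = fdb.tuple.pack_with_versionstamp(
        (range_prefix, 'by_vsn', fdb.tuple.Versionstamp()))
    txn.set_versionstamped_key(key_stamp, key)

    # finally we create the key we use in the watch (the one we added to the conflict range)
    txn[watch_key] = key  # value doesn't really matter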

The transaction will fail if a concurrent client creates the watch_key. This design gives us:

  • traversal of the keys in the range in insertion order with the help of the by_vsn index
  • aborting of transactions which try to modify the same watch_key (we can call it a lock)
  • a key to set a watch on
  • keys in (range_prefix, 'chain', *) that are ordered by insertion (because previous_version is the versionstamp of the previous insertion into the range)
  • the option to use clear_range() to clear the chain after the watch has fired (if we decide to)

By changing the order of the elements in the watch_key to fdb.tuple.pack(('chain', previous_version, range_prefix)), we can ensure that every transaction is routed to a different range.

@kocolosk

you lost me on step 7. Are you trying to guess at the value that a new versionstamp will take? That won’t work; the versionstamps are incrementing constantly regardless of whether any specific transaction is actually associated to a given stamp.

No, we are not guessing. We use previous_version as part of the key. Very good question… Maybe we can get the previous_version at the beginning of the transaction (by using snapshot reads). It will always be the last key in the (range_prefix, 'chain', 0x00) range. But in this case we wouldn’t be able to use fdb.tuple.pack(('chain', previous_version, range_prefix)) to move the key from server to server.

I’ll think some more.