Transaction.Watch Examples?

I am trying to determine how to use the Transaction.Watch capabilities of FoundationDB. Here is my thought on a use case.

I have been using the Postgres NOTIFY functionality to push events directly from the database layer to any number of subscribing stateless Go services. The libpq driver exposes these notifications as a Go channel which receives all events while the connection is open. This allows any of the Go services to trigger Postgres NOTIFY events, which are then published to all of the Go listeners.

In my mental model I had imagined that something similar could be replicated with FoundationDB’s watch capability by watching a specific key which had the value of the latest updated key (both updated in a single transaction):

Update value:

[queuename][timestamp] = [payload]

Update Watch key:

[watches][queuename] = [queuename][timestamp]

Then, once the FutureNil resolves, run a transaction to get the value from [watches][queuename], use it to fetch the [payload] from [queuename][timestamp], and reset the watch on [watches][queuename] for future changes. The payload would then be pushed into a buffered Go channel. Unless I could somehow use the transaction that updates the value to also do this work (which may be possible using fdb.Transactor), I feel that events may be missed. I have not been able to find any examples (even pseudocode) of how to practically use the Watch capability.
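Roughly, this is the kind of Go I had in mind (untested; the key names, payload, and channel setup are just placeholders I made up):

```go
package main

import (
	"log"
	"time"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)

type watchResult struct {
	payload []byte
	watch   fdb.FutureNil
}

func main() {
	fdb.MustAPIVersion(600) // use whichever API version matches your client/cluster
	db := fdb.MustOpenDefault()
	events := make(chan []byte, 64) // buffered channel, consumed elsewhere

	watchKey := fdb.Key(tuple.Tuple{"watches", "queuename"}.Pack())

	// Producer: write the payload and bump the watch key in one transaction.
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		itemKey := tuple.Tuple{"queuename", time.Now().UnixNano()}.Pack()
		tr.Set(fdb.Key(itemKey), []byte("payload"))
		tr.Set(watchKey, itemKey)
		return nil, nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Listener: read the latest item and arm a watch in one transaction, then
	// block on the FutureNil outside of it and go around again.
	for {
		ret, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
			var payload []byte
			if latest := tr.Get(watchKey).MustGet(); len(latest) > 0 {
				payload = tr.Get(fdb.Key(latest)).MustGet()
			}
			// The watch is registered at commit and stays valid afterwards.
			return watchResult{payload: payload, watch: tr.Watch(watchKey)}, nil
		})
		if err != nil {
			log.Println(err)
			continue
		}
		res := ret.(watchResult)
		if res.payload != nil {
			events <- res.payload // push outside the retry loop, so retries don't re-send
		}
		res.watch.MustGet() // blocks until [watches][queuename] changes
	}
}
```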

Am I on the right track?

Your intuition is correct that watches cannot be used to provide a stream of all mutations to a key, as there could be some mutations that slip through undetected. You can see some details for how a watch works here, but the basic idea is that it is guaranteed to fire at some point after the value of the watched key no longer matches the value seen by the transaction that set the watch. It won’t tell you what the new value is or how many times it changed, and it won’t remain in effect after it has fired, meaning you’ll have to create a new watch if you want to be notified of further changes.

I’ll outline another approach here which could be used to accomplish what you’ve described. The idea is that you’d maintain some sort of per-queue counter that gets updated every time an item is added to a queue, and then set watches on those counters. Each listener would start up and process all events currently in the queue (or ignore them, whatever you want to do), remembering the last item it processed. In the same transaction, you would also set a watch on the queue counter, noting its current value.

When the watch is triggered (successfully or with an error), you would check whether the value of the counter has increased, process any new events in the queue, and set a new watch on the counter, again in the same transaction.

Thus, your listeners would be notified every time the queue has changed from the state they last remembered, and then they would be responsible for picking up where they left off. Your producers would need to update the counters each time an item was added to the queue, probably using the atomic ADD mutation.
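For the producer side, I’d imagine something roughly like this (untested; the queue/counter key names are made up, `db` and `payload` are assumed to exist, and the imports — fdb, fdb/tuple, encoding/binary, time — are omitted):

```go
// One queue insertion: append the item and bump the per-queue counter with an
// atomic ADD in the same transaction, so concurrent producers don't conflict
// on the counter key (ADD operands are little-endian integers).
one := make([]byte, 8)
binary.LittleEndian.PutUint64(one, 1)

_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
	itemKey := tuple.Tuple{"queue", "jobs", time.Now().UnixNano()}.Pack()
	tr.Set(fdb.Key(itemKey), payload)
	tr.Add(fdb.Key(tuple.Tuple{"counter", "jobs"}.Pack()), one)
	return nil, nil
})
```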

There are a few things to note about the above scheme:

  1. If there is a high rate of mutations to a queue counter, the counter may need to be sharded over multiple keys so that a single key isn’t getting hammered.

  2. You may need some mechanism to clean up events once they have been processed by all listeners.

  3. You need to be careful about how you do your reads to avoid unnecessary conflicts. With the above scheme, you’ll probably want reads of the counter keys to be snapshot reads so that simultaneous queue insertions won’t interfere with listeners. You’ll still know about simultaneous updates because the watches you set will fire if the value of the counter doesn’t match what was read in the transaction. You’ll similarly want to avoid setting conflict ranges past the last item in the queue that you process, which can be done by reading the queue with a snapshot range read and then manually applying conflict ranges afterwards (either one for each item, or a single range that spans the items you processed). As long as your queue model guarantees that new items are inserted at the end, this should be safe. However, if you have multiple producers for a queue using timestamps for ordering, you’ll need to be certain that items aren’t inserted out of order due to clock sync issues. Otherwise, you may need to extend your conflict ranges beyond the end, perhaps based on some fixed length of time you’re willing to allow items to come in late. There’s a rough sketch of this below.
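To make point 3 concrete, here’s a rough, untested fragment of the listener transaction. It assumes an already-open `db`, and `counterKey`, `queuePrefix`, and `lastSeenKey` are hypothetical variables: the counter key, the packed prefix of the queue, and the key of the last item this listener processed (on the first pass that could just be the queue prefix itself).

```go
ret, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
	// Snapshot read of the counter: simultaneous ADDs from producers won't
	// conflict with this listener, and the watch below still fires if the
	// counter changes after this read.
	counter := tr.Snapshot().Get(counterKey).MustGet()
	_ = counter // compare with the previously observed value if you like

	// Snapshot range read of everything after the last processed item, so no
	// conflict range is placed on the open end of the queue.
	begin := fdb.Key(append(append([]byte{}, lastSeenKey...), 0x00))
	end, _ := fdb.Strinc(queuePrefix) // errors only if the prefix is all 0xFF
	kvs := tr.Snapshot().GetRange(
		fdb.KeyRange{Begin: begin, End: fdb.Key(end)},
		fdb.RangeOptions{},
	).GetSliceOrPanic()

	for _, kv := range kvs {
		// process kv.Value, remember kv.Key as the new lastSeenKey
		_ = kv
	}

	// Manually cover only the items actually processed with a single read
	// conflict range (the end is exclusive, so extend just past the last key).
	if len(kvs) > 0 {
		last := kvs[len(kvs)-1].Key
		_ = tr.AddReadConflictRange(fdb.KeyRange{
			Begin: begin,
			End:   fdb.Key(append(append([]byte{}, last...), 0x00)),
		})
	}

	// Arm the next watch in the same transaction.
	return tr.Watch(counterKey), nil
})
if err == nil {
	ret.(fdb.FutureNil).MustGet() // wait outside the transaction, then loop
}
```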

If you haven’t already, you might want to check out the section on watches in the Developer Guide, which has some amount of code. It watches multiple keys rather than using a key to detect a change in a queue, but it’s about the same idea. That appears to be our only real example of watches in the docs, so maybe that could be improved.

You definitely should be able to use fdb.Transactor to set a watch in the same transaction as the one where you read the new data from the queue. That will close up the gaps in which you could miss updates if you had separate transactions for the two.
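Something shaped like this, say, where `counterKey` is just a stand-in; because the helper takes an fdb.Transactor, it can run in its own transaction against the Database or be folded into a larger one:

```go
// readNewAndWatch reads whatever new queue items it needs and arms a watch on
// the counter within the same transaction. Since it takes an fdb.Transactor,
// it works against a Database (its own transaction) or inside another one.
func readNewAndWatch(t fdb.Transactor, counterKey fdb.Key) (fdb.FutureNil, error) {
	ret, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) {
		// ... read new items from the queue here ...
		return tr.Watch(counterKey), nil
	})
	if err != nil {
		return nil, err
	}
	return ret.(fdb.FutureNil), nil
}
```

The caller blocks on the returned future and then calls the helper again, so the watch is always armed in the same transaction that read the data.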

As @ajbeamon points out, you have to be careful when doing queue stuff that you aren’t accidentally appending things in the wrong order (or make sure that you can handle the out-of-order-ness). You might want to look at the versionstamp API to use transaction commit ordering instead of time to get around some of those issues, though versionstamps don’t play well with the Tuple layer in go. There are other issues related to queues due to the non-idempotency of queue appends, but maybe that’s a good question for another time.
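If I remember the layout correctly, appending with a versionstamped key looks roughly like this (untested; `db` and `payload` are placeholders, and the trailing 4-byte little-endian offset is the format expected at API version 520 and later):

```go
// Append an item ordered by commit version instead of wall-clock time. The key
// passed to SetVersionstampedKey contains a 10-byte placeholder that the
// cluster fills in at commit time, followed by a 4-byte little-endian offset
// telling it where the placeholder starts.
prefix := tuple.Tuple{"queue", "jobs"}.Pack()

key := make([]byte, 0, len(prefix)+10+4)
key = append(key, prefix...)
key = append(key, make([]byte, 10)...) // versionstamp placeholder
off := make([]byte, 4)
binary.LittleEndian.PutUint32(off, uint32(len(prefix)))
key = append(key, off...)

_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
	tr.SetVersionstampedKey(fdb.Key(key), payload)
	return nil, nil
})
```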

One other note about watches is that you can only have a limited number of outstanding watches at a time (10,000 by default). So if the number n of watchers is small, that shouldn’t be a big deal, but if it’s large, you could hit this limit.
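If you do ever need more, I believe the cap can be raised with the corresponding database option, along these lines (a sketch; check the generated option name in your binding):

```go
// Raise the per-client cap on outstanding watches from the 10,000 default.
if err := db.Options().SetMaxWatches(100000); err != nil {
	// handle the error
}
```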

Thanks both of you.

I think that building a queue system with an offset per Go service feels safe: if there is a race and a service does miss a read between receiving the watch event and establishing the new watch, the data is still available and can be picked up on the next event. Periodic log compaction seems like a good idea too.

I’ll have a bit of a play and see what can be done.

I find the developer guide example a little weird as well. It gets at the idea that you use a watch to replace the sleep in a polling loop, which would work but be inefficient and slow. But I feel like the classic simple case is that you have a transaction in this loop, watch the key within the transaction, and wait on it outside, and that’s not really the pattern in the example.

The default limit of 10,000 watches seems quite small (for example, if you are planning to keep a watch for each online user). Is this limit caused by some performance limitation, or is it OK to increase it in the configuration?