Understanding "Watches"

I am trying to understand watches by creating a toy service discovery layer in Java. The basic idea is as follows.

I have a Subspace(service, ip). When instances of a service come up, they register themselves by doing a set on the subspace:

t.set(example_service,127.0.0.1)
t.set(example_service,127.0.0.2)
t.set(example_service,127.0.0.2)

Any process that needs to discover how many instances of a particular service are up and running will put a watch on the service and, when the watch is triggered, do a range query to get all the IPs:

t.watch(example_service)
t.getRange(example_service)

I am running into the following issues when trying to implement this in Java.

  1. It seems like I cannot put a watch on the parent subspace (example_service in this case). I have to put the watch on the exact key
    `example_service,127.0.0.1`

  2. In the Java binding, the result of the watch is a Future, and the future completes when the watch is triggered. This means that I only get notified of the first change on that key. In my use case I want to get notified every time a new IP is added under example_service. I am not sure whether it is possible to accomplish that.

  3. Finally, to mimic this example on my laptop I had two different transactions, one starting the watch and the second triggering the watch by updating the key. This doesn't seem to work, as the future from the watch never completes. The only way I got it to work is when the watch is created and the key is updated in the same transaction. How would this work in a real application, where the watch and the update will happen on different machines?

Watches are only supported on individual keys, not on ranges. If you want to track changes to a range, one way is to have a separate key that you modify every time the range gets modified. This key could store a value that’s incremented with atomic adds or that includes a versionstamp. Then you could watch this key to know when the range is updated. There’s a good bit of discussion about this in this thread.
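As a rough illustration of the "separate key incremented with atomic adds" idea: FoundationDB's atomic ADD interprets its operand (and the stored value) as a little-endian integer, so the operand has to be encoded accordingly. The sketch below only shows that encoding; the class and method names are made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical helper (not part of the FoundationDB bindings) for building
// the operand of an atomic ADD on a change-counter key.
final class ChangeCounterOperand {
    // Encode a delta as an 8-byte little-endian integer.
    static byte[] encode(long delta) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(delta)
                .array();
    }

    // Decode an 8-byte little-endian counter value read back from the key.
    static long decode(byte[] value) {
        return ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        byte[] one = encode(1L);
        System.out.println(Arrays.toString(one));
        System.out.println(decode(one));
    }
}
```

With the Java bindings, a writer would then presumably call something like tr.mutate(MutationType.ADD, counterKey, ChangeCounterOperand.encode(1L)) in the same transaction that sets the service's IP key, and readers would watch counterKey instead of the range (counterKey is an assumed name here).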

The contract is a little different than this. Instead of the first change, you’ll be notified when the key has changed, assuming it stays changed. It’s possible the key may have undergone multiple changes by the time you are notified, and it’s also possible in the case of an ABA update (one where the value changes from A to B and then back to A) that you may not be notified at all. If you use atomic add or versionstamps as I described above, though, then you don’t have to worry about the ABA problem.
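To make the ABA point concrete, here is a toy in-memory model (not FoundationDB code, and all names are invented for the example). It treats "the watch would notice a change" as "the value now differs from the value seen when the watch was created", and compares watching the data itself against watching a counter that is bumped on every write.

```java
// Toy model of a watched key plus a change counter; an illustration only.
final class AbaSketch {
    private String value = "A";
    private long counter = 0;

    // Every write updates the value and bumps the counter, the way an
    // atomic add on a separate key would.
    void write(String newValue) {
        value = newValue;
        counter++;
    }

    String value() { return value; }

    long counter() { return counter; }

    public static void main(String[] args) {
        AbaSketch key = new AbaSketch();
        String valueAtWatch = key.value();
        long counterAtWatch = key.counter();

        key.write("B"); // A -> B
        key.write("A"); // B -> A, back where it started

        // The value looks unchanged, so a watch on it may never fire.
        System.out.println("value differs: " + !valueAtWatch.equals(key.value()));
        // The counter only moves forward, so a watch on it cannot miss the writes.
        System.out.println("counter differs: " + (counterAtWatch != key.counter()));
    }
}
```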

In order to be notified about future changes after a watch fires, you’ll want to set a new watch in the same transaction as the one where you read the range of keys being watched.
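The "read the range and set a new watch in one step" loop can be sketched with a small in-memory stand-in. ToyStore, Snapshot, and follow are hypothetical names for this example and do not exist in the FoundationDB bindings; the synchronized block only imitates the atomicity that a real transaction would provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

final class RearmSketch {
    // Result of one "read the range and set a new watch" step.
    static final class Snapshot {
        final List<String> ips;
        final CompletableFuture<Void> watch;

        Snapshot(List<String> ips, CompletableFuture<Void> watch) {
            this.ips = ips;
            this.watch = watch;
        }
    }

    // Hypothetical in-memory stand-in for the database.
    static final class ToyStore {
        private final Set<String> ips = new TreeSet<>();
        private CompletableFuture<Void> pendingWatch;

        // Models a writer: add an IP, then fire the pending one-shot watch, if any.
        void register(String ip) {
            CompletableFuture<Void> toFire;
            synchronized (this) {
                ips.add(ip);
                toFire = pendingWatch;
                pendingWatch = null;
            }
            if (toFire != null) {
                toFire.complete(null);
            }
        }

        // Models the reader's transaction: the read and the new watch happen
        // together, so no write can land between them unnoticed.
        synchronized Snapshot readAndWatch() {
            if (pendingWatch == null) {
                pendingWatch = new CompletableFuture<>();
            }
            return new Snapshot(new ArrayList<>(ips), pendingWatch);
        }
    }

    // Read, record what was seen, and re-arm once the one-shot watch fires.
    static void follow(ToyStore store, List<List<String>> seen) {
        Snapshot snapshot = store.readAndWatch();
        seen.add(snapshot.ips);
        snapshot.watch.thenRun(() -> follow(store, seen));
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        List<List<String>> seen = new ArrayList<>();
        follow(store, seen);
        store.register("127.0.0.1");
        store.register("127.0.0.2");
        System.out.println(seen);
    }
}
```

In real code the body of readAndWatch would presumably be a single transaction that does the range read and calls tr.watch on the watched key, handing the watch future back to the caller.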

The transaction that creates a watch needs to be committed in order for the watch to be set. A watch that isn’t committed can only be triggered by modifications that happen in the same transaction as the watch. Is your watch transaction being committed?

Hello, I would also like to ask a question about watches. I have a problem where a watch never fires; I explained it on Stack Overflow: https://stackoverflow.com/questions/53415446/watches-never-trigger-in-foundationdb. Does anybody have an idea about it? Thanks.

I have the exact same problem as @ArchDev. I assumed that when we call db.runAsync(…) it takes care of committing the transaction; however, the future returned by watch never finishes.

Ok, I see. I just replied to his question on stack overflow, but I can duplicate the answer here as well:

I think what’s going on here is that your transaction creating the watch is never completing. It’s maybe not obvious from the documentation, but runAsync won’t return until the CompletableFuture returned in your function is ready. Because you are returning the watch future and not changing the value until after the transaction, it’s never becoming ready and the transaction never ends.

If you replaced runAsync with run, I think it would work:

CompletableFuture<Void> watchExecuted = db.run(tr -> {
  tr.set(key, new Tuple().add(1).pack());
  return tr.watch(key);
});

If you wanted to use runAsync, then you would need to return your watch future wrapped in another object.

EDIT: or rather, if you want to use runAsync, you could return a CompletableFuture<CompletableFuture<Void>>:

CompletableFuture<CompletableFuture<Void>> watch = db.runAsync(tr -> {
  tr.set(key, new Tuple().add(1).pack());
  return CompletableFuture.completedFuture(tr.watch(key));
});
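The difference between the two return shapes can be reproduced with plain CompletableFuture and no database. toyRunAsync below is a hypothetical stand-in that only mimics the behavior described above (the result is not ready until the future returned by the function is ready); it is not how the bindings are actually implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class RunAsyncSketch {
    // Pretend commit that always succeeds immediately.
    static CompletableFuture<Void> toyCommit() {
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical stand-in for db.runAsync: wait for the function's future,
    // then "commit", then hand back the result. The String plays the transaction.
    static <T> CompletableFuture<T> toyRunAsync(Function<String, CompletableFuture<T>> body) {
        return body.apply("tr")
                .thenCompose(result -> toyCommit().thenApply(ignored -> result));
    }

    public static void main(String[] args) {
        // Stands in for a watch that has not fired yet.
        CompletableFuture<Void> watch = new CompletableFuture<>();

        // Returning the watch itself: the outer future waits on the watch.
        CompletableFuture<Void> direct = toyRunAsync(tr -> watch);

        // Wrapping the watch: the outer future completes, the inner one stays pending.
        CompletableFuture<CompletableFuture<Void>> wrapped =
                toyRunAsync(tr -> CompletableFuture.completedFuture(watch));

        System.out.println("direct done before fire: " + direct.isDone());
        System.out.println("wrapped done before fire: " + wrapped.isDone());

        watch.complete(null); // the "watch" fires
        System.out.println("direct done after fire: " + direct.isDone());
    }
}
```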

If I return CompletableFuture<CompletableFuture<Void>>, I get the exception below when I try to complete the inner future. I guess it sort of makes sense: by returning CompletableFuture<CompletableFuture<Void>> we are forcing db.runAsync(..) to commit the transaction, but the inner future still has a reference to the now-closed transaction?

java.lang.IllegalStateException: Cannot access closed object
	at com.apple.foundationdb.NativeObjectWrapper.getPtr(NativeObjectWrapper.java:85)
	at com.apple.foundationdb.FDBTransaction.getPtr(FDBTransaction.java:554)
	at com.apple.foundationdb.FDBTransaction.watch(FDBTransaction.java:477)

Would you mind posting a snippet of code for your transaction? That might make it easier to see what’s going on.

Is there a typo in the edited code snippet? Should it be runAsync instead of run?

Yeah, I should have used runAsync and also changed the return type. I’ve fixed it now.

So the code below works. My actual code is in Scala, where I transform the future returned from watch into a Scala concurrency primitive, and that is what results in the error I posted. I will dig into that more myself, as it is unlikely to be related to FDB.

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.tuple.Tuple;

import java.util.concurrent.CompletableFuture;

public class WatchTest {

    public static void main(String[] args)  {
        final Database db = FDB.selectAPIVersion(600).open();

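        // Transaction 1: set the key and create a watch on it. Wrapping the watch in an
        // already-completed future lets runAsync commit without waiting for the watch to fire.
        final CompletableFuture<CompletableFuture<Void>> future = db.runAsync(t -> {
            t.set(Tuple.from("key").pack(), Tuple.from("value").pack());
            CompletableFuture<Void> watch = t.watch(Tuple.from("key").pack()).thenAccept(u -> System.out.println("watch fired"));

            return CompletableFuture.completedFuture(watch);
        });

        // Wait for the first transaction to commit so the watch is set before the key changes.
        final CompletableFuture<Void> watch = future.join(); // outer future completes

        // Transaction 2: change the value, which triggers the watch.
        db.run(t -> {
            t.set(Tuple.from("key").pack(), Tuple.from("value2").pack());
            return null;
        });

        watch.join(); // inner future completes once the watch has fired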

    }
}