FoundationDB

Performance characteristics of using Watches for Distributed Task Scheduling


(Christophe Chevalier) #1

I’m currently designing a new layer that needs to coordinate work items and distribute them among a set of worker servers, and I plan to use Watches for the notifications between producers and workers.

Unfortunately, the design requires a lot of workers (thousands) and even more unique “queues” (hundreds of thousands). Each unique queue can have work items queued, and each item must be processed by a specific worker.

Note: I’ve already looked at the code in the client and storage node actors to see how Watches are handled, but I’m not sure yet about the performance characteristics.

Example Scenario

Let’s say I want to create the next YouTube.

I would have thousands of file servers that host large video or image files uploaded by many users. Each user has a queue of files to convert/transcode on demand, and work must be coordinated among all the file servers (a process like ffmpeg will read the file from local disk).

  • Each user can only have one file transcoded at the same time in the cluster (the next file in the queue)
  • Files are physically located on a specific server’s hard disk, so only that server can transcode the file
  • Servers have lots of cores, so they can transcode multiple files at the same time
  • Files for a single user can be stored on multiple servers.

I have a set of subspaces that store the description of each work item, and also a set of queues with “work orders” stored (using VersionStamps as ids to keep ordering).

My plan is to have each server watch its own dedicated key. When a producer schedules a new work item in a queue, and that queue was empty, it can touch the corresponding server’s key to wake it up.
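As a sketch, this wake-up scheme can be modeled in plain Python (an in-memory dict stands in for the database; in the real layer each call would be the mutations of one FDB transaction, and all names here are illustrative, not from a real layer):

```python
# Illustrative in-memory model of the scheduling scheme described above.
# In the real layer, each step would be mutations inside one FDB transaction.

class Scheduler:
    def __init__(self):
        self.queues = {}       # user_id -> list of (file_id, server_id)
        self.server_keys = {}  # server_id -> counter (the watched key)

    def schedule(self, user_id, file_id, server_id):
        queue = self.queues.setdefault(user_id, [])
        was_empty = not queue
        queue.append((file_id, server_id))
        if was_empty:
            # the queue was empty, so the new item is immediately runnable:
            # "touch" the owning server's key to fire its watch
            self.server_keys[server_id] = self.server_keys.get(server_id, 0) + 1
        return was_empty

s = Scheduler()
assert s.schedule("UserA", "UserA_1.mp4", "SRV1") is True   # wakes SRV1
assert s.schedule("UserA", "UserA_2.mp4", "SRV2") is False  # queued behind item 1
assert s.server_keys == {"SRV1": 1}
```

Note that the second item does not wake SRV2 yet: per the rules above, it only becomes runnable once SRV1 finishes the first one.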

[image]

In this example:

  • User A has three files: UserA_1.mp4, UserA_2.mp4, UserA_3.mp4 that are stored on several servers (1 and 2)
  • Server 1 can start working on UserA_1.mp4 and UserK_3.mp4 concurrently. It cannot work on UserB_7.mp4 yet because User K has other files before in the queue.
  • Server 2 can start working on UserB_1.mp4, but must wait for Server 1 to finish before working on UserA_2.mp4 and then UserA_3.mp4.

When a server finishes working on a file, it can remove the corresponding entry from the queue, and check for the next entry.

  • If there is none, it can stop.
  • If there is another work item in the queue,
    • either the next file is located on the same server, so it can continue locally
    • or the next file is located on another server, and the current worker must wake up the other worker so that it can pick up the work.
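The completion logic above can be sketched the same way (plain dicts stand in for the FDB subspaces; names and return values are hypothetical):

```python
# When a worker finishes an item, it pops the head of the user's queue and
# either stops, continues locally, or wakes the server that owns the next file.

def complete(queues, server_keys, user_id, my_server):
    queue = queues[user_id]
    queue.pop(0)                           # remove the finished head item
    if not queue:
        return "stop"                      # queue drained, nothing to do
    _, next_server = queue[0]
    if next_server == my_server:
        return "continue-locally"          # next file is on this server
    # next file is on another server: wake it via its watched key
    server_keys[next_server] = server_keys.get(next_server, 0) + 1
    return "handed-off"

queues = {"UserA": [("UserA_1.mp4", "SRV1"), ("UserA_2.mp4", "SRV2")]}
server_keys = {}
assert complete(queues, server_keys, "UserA", "SRV1") == "handed-off"
assert server_keys == {"SRV2": 1}
assert complete(queues, server_keys, "UserA", "SRV2") == "stop"
```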

I also need to support cancellation of work items (user deletes a file, a file is flagged and must be removed, the account is suspended/deleted, etc…)

  • for work items that are in second position or later in the queue, simply remove them.
  • for the work item that is in first position in the queue, we need to send a command to the server to stop working on it

Notification of servers

Each server will watch a key that is used to notify it that it must work on something immediately.

The keys will probably be (..., "servers", SERVER_ID) = #### and I will use an atomic add operation (so the value is opaque).

Question #1: I will have thousands of such keys, which will be stored in the same subspace, so probably stored on the same storage node (or maybe two, if the subspace spans two shards). Will this create too much load on that particular fdb storage node that will have tons of watches?

Question #2: In what circumstances can a watch become “dead”? If a server has a callback on a Watch, but for any reason the watch dies (during a cluster event) or the client does not receive the message when the watch triggers, is it possible for the system to deadlock? Do I need an additional polling system that checks things every X minutes, and rely on watches only to shorten the latency?

To tell a server that it must work on a file, I cannot simply set its key to a work item id, because watches are async, and by the time the server wakes up and checks the key, I could have overwritten it with another work item id.

I need a queue of notifications for each server, containing commands like “work on this file” or “stop working on this file”. With a Versionstamped key, this looks trivial to do.

(..., "server_workqueue", SERVER_ID) = ####  // <-- watched key
(..., "server_workqueue", SERVER_ID, <Versionstamp>) = 'Please work on file UserX_Y.mp4'
(..., "server_workqueue", SERVER_ID, <Versionstamp>) = 'Stop working on file UserX_Y.mp4 !!!'
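An in-memory model of this command queue (an integer counter stands in for FDB’s commit versionstamps, which the real layer would use so commands sort in commit order; all key names are illustrative):

```python
# In-memory model of the per-server command queue: commands are appended under
# monotonically increasing "versionstamps" so they sort in commit order, and
# the watched head key is bumped atomically on every push.
import itertools

_versionstamp = itertools.count()  # stand-in for FDB's commit versionstamp

def push_command(store, server_id, command):
    head = ("server_workqueue", server_id)
    store[head] = store.get(head, 0) + 1          # atomic add on the watched key
    store[("server_workqueue", server_id, next(_versionstamp))] = command

def pop_commands(store, server_id, limit=10):
    # range read over the command keys (length-3 tuples), in versionstamp order;
    # the real layer would clear each consumed key in the same transaction
    keys = sorted(k for k in store
                  if len(k) == 3 and k[:2] == ("server_workqueue", server_id))
    return [store.pop(k) for k in keys[:limit]]

store = {}
push_command(store, "SRV001", "work on UserX_1.mp4")
push_command(store, "SRV001", "stop working on UserX_1.mp4")
assert pop_commands(store, "SRV001") == ["work on UserX_1.mp4",
                                         "stop working on UserX_1.mp4"]
assert pop_commands(store, "SRV001") == []
```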

My fear is that if a server crashes or freezes for whatever reason, I could end up sending many messages to its queue with nobody to consume them, and leak a lot of keys in there (until the server restarts or is declared dead and the queue is purged).

Also, how do I handle the case where a server is frozen, I push a command “start working on file 1”, more commands for files 2 to N, and then queue a command “abort working on file 1”? When the server unfreezes, it will see the first command and start working on file 1, even though there is a stop order later in the queue.

How can I construct a command queue that allows cancelling commands, without requiring the consumer to read the entire queue before processing commands?

Failure Detection

I also need to be able to handle failure of a worker. In my case, I don’t need to handle retrying a work item, since the file was on the same server that crashed and won’t be available anyway. I just need to skip over to the next item in the queue that is on a different server.

I’m not sure how I can achieve this without having another set of servers that monitor the health of the workers. I would like to be able to do this in a distributed fashion, with a set of servers that periodically “check” the state of the other servers, and can detect deadlock/failures and intervene.

My idea is that each server would periodically update a “keep alive” key, as well as store somewhere the list of work items that it is working on, like (..., TIMESTAMP, SERVER_ID) = "". Every polling_interval / 2, the server would clear this key and create a new one. Now, “checkers” would get_range(begin: (..., 0), end: (..., NOW - polling_interval)), and all results would be servers that have died.

(..., "server_watchdog", <SECONDS_SINCE_EPOCH>, SERVER_ID) = ''
(..., "server_tasks", <SERVER_ID>, <FILE_ID>) = 'started at TS, currently at X%'

I think that even if multiple servers perform the same check, they would not stomp on each other (at worst, they would conflict).
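A sketch of the checker logic under these assumptions (pure Python, with a dict standing in for the watchdog subspace; a real checker would of course do this inside a transaction, and a real server would use actual clock time):

```python
# Each server rewrites its keep-alive key under the current timestamp every
# polling_interval / 2; a checker range-reads everything with a timestamp
# strictly before (now - polling_interval) to find dead servers.

def heartbeat(watchdog, server_id, now):
    # clear the previous key and create a fresh one under the new timestamp
    for key in [k for k in watchdog if k[1] == server_id]:
        del watchdog[key]
    watchdog[(now, server_id)] = b""

def find_dead(watchdog, now, polling_interval):
    # equivalent of get_range((..., 0), (..., now - polling_interval))
    return sorted(srv for (ts, srv) in watchdog if ts < now - polling_interval)

watchdog = {}
heartbeat(watchdog, "SRV1", now=100)
heartbeat(watchdog, "SRV2", now=100)
heartbeat(watchdog, "SRV1", now=110)   # SRV1 keeps renewing, SRV2 goes silent
assert find_dead(watchdog, now=120, polling_interval=15) == ["SRV2"]
```

Because a server refreshes its key twice per polling_interval, any key older than a full polling_interval really does indicate a server that stopped heartbeating.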

Question #3: is this the best approach, or are there any simpler solutions?

Performance of Watches

Watches are created on the client, and there is a max number of them per database handle. Right now, I only need one key per server, so I don’t think I’ll have issues there.

But watches are also handled by the storage actors, which have an AsyncMap<Key> that contains all the keys that are watched somewhere. Looking at the code, each Set or ClearRange mutation must check this map for a watch (and trigger it if found).

Since all my watched server keys will be in the same subspace, it is highly probable that they will be in the same shard (or maybe across two shards), so handled by the same storage node.

Question #4: Will this negatively impact that storage node if it has a lot of keys in the AsyncMap? The current implementation uses std::map, but I don’t know what its algorithmic complexity looks like. Does this scale to thousands / tens of thousands of watches? The map is queried for every key mutated in the shards handled by the storage node, so even if a lookup is O(1), it would become O(N) work for inserting N keys into the db…

Looking at the code, I’m not sure what happens when using double/triple redundancy. If a watch is set on a key that is replicated on 3 storage nodes, do the 3 nodes all check the same watch? Does only one of them check all the watches? Or are the watches load balanced across the 3 storage nodes?

What happens when a storage node dies/restarts? Or if the shard is moved somewhere else?


(Alex Miller) #2

I went and read the watch code, and thankfully there isn’t too much of it.

Watches get load balanced in the same way that reads do. If a storage server has too many pending watches, then it will cancel the watch and indicate to the client that it should poll for changes instead of having an outstanding request against the server. This is controlled by the MAX_STORAGE_SERVER_WATCH_BYTES knob.

If an fdbserver dies, you’ll end up getting an error back from the future that is your watch, and you’ll need to queue up another watch, which will then be against a (hopefully live) different fdbserver. If packets are dropped from the server to the client, the connection will break, which will show up as a failure in a similar way.

Either a timestamp per item claimed by a worker or a timestamp per worker with the items it has claimed seem like the two ways you could possibly store “liveness” in FoundationDB. If you see too much contention on servers doing the same check, you could use the same tricks that the high contention fdb queue uses.

I mean, you’re correct that checking for watches is done inline with loading mutations into the versioned tree we serve reads from. I would hope that the shedding of watches done by MAX_STORAGE_SERVER_WATCH_BYTES is sufficient to prevent you from completely grinding a storage server into the ground via watches, but it wouldn’t surprise me if there exist some pathological cases here. :confused:

I think my other answers have covered most of the remaining questions, but in short:
Yes, watches are load balanced across copies of the data, and it only goes to one of them.
Most failure cases you’re concerned about will show up as an error in the watch future, after which you’ll need to initiate another watch.


(Christophe Chevalier) #3

Thanks!

So the way I see it:

  • if an error occurs somewhere, the client should see an error, but it can happen after some timeout expires (socket timeout, watchdog timer that cancels a list of pending futures, etc…). This could be an issue if the number of messages that can be published within this timeout is large enough to fill up the receiver’s inbox… Thankfully in my case I don’t anticipate a lot of events per second per queue, but this is something to keep in mind

  • there are a lot of different error codes that can occur related to watches, and it could get a bit tricky writing the correct code for a watch loop that handles all the cases (false positive triggers, retrying, etc…)

I am in a position where the thread that wakes up when a watch fires can quickly determine that it was a false positive and nothing was changed, so this is a bit simpler to implement. But again this means that you need at least some experience with the behavior of watches to write correct code…

Hopefully someone has already been through all the corner cases and could enlighten us :slight_smile:


(Rinat Abdullin) #4

It also appears that the upcoming FoundationDB release will contain improvements applicable to watches:

Performance: Load balancing temporarily avoids communicating with storage servers that have fallen behind.

and

Fixes: Watches registered on a lagging storage server would take a long time to trigger.


(Christophe Chevalier) #5

I have a working prototype, and the performance looks correct for now, on a small dataset.

Mini benchmark

The test creates a backlog of 5,000 work items, scheduled on 500 unique queues (10 items each), spread across 10 workers.

A few remarks:

  • the 10 workers are running inside the same process, so they share the same client network thread
  • the cluster runs locally, with 4 processes (configured ssd single)
  • each work item is immediately completed; the goal is to measure the overhead of the scheduling.

This is the worst-case scenario, where work items for each queue are randomly distributed among the 10 workers, so the next work item for each queue will almost always be on a different server (so a command must be sent).

The red line represents the initial version, which opens the Directory Layer on every transaction; each worker processes commands sequentially.

For the blue line, workers process commands for different queues concurrently, but serialize items per queue.

The green line uses cached Directory Subspace instances, in order to reduce the number of reads at the start of each transaction. Execution time is almost twice as fast as the red curve.

All three cases are linear, but we can clearly see the impact of not caching the directory subspace prefixes.

Unfortunately, the code does not yet handle timeouts and detection of dead servers, so I expect the runtime to go up again…

Sample transaction logs:

Once a watch is triggered, a callback will pop the next command from the server’s queue:

┌  oper. ┬─────────────────────────────────────┬──── start ──── end ── duration ──┬─ sent  recv ┐
│ 0   R °│ ###############$                    │ T+  0.019 ~   8.061 (  8,042 µs) │    61   164 │ GetRange fGT{(24, 2985, "server_commands", "SRV001")} <= k < fGE{(24, 2985, "server_commands", "SRV001").<FF>}, limit(1), Exact => 1 result(s), has_more
│ 1   c  │ _______________`                    │ T+  8.066 ~   8.074 (      7 µs) │    43       │ Clear (24, 2985, "server_commands", "SRV001", @64883287936080-1#1)
│ 2   Co°│ _______________.################### │ T+  8.077 ~  17.801 (  9,724 µs) │             │ Commit
└────────┴─────────────────────────────────────┴──────────────────────────────────┴─────────────┘
> Read 164 bytes and Committed 115 bytes in 17.831 ms and 1 attempt(s)

The work item will be scheduled on some thread, which will read the work item and the original data in parallel:

Transaction #609 (read/write, 3 operations, '#' = 0.5 ms, started 17:05:06.2094163Z, ended 17:05:06.2274629Z)
┌  oper. ┬─────────────────────────────────────┬──── start ──── end ── duration ──┬─ sent  recv ┐
│ 0   G *│ ##################################$ │ T+  0.005 ~  17.515 ( 17,510 µs) │    38   205 │ Get (24, 2985, "schedules_meta", {55555555-5555-5555-4344-e061000001b9}) => {"id":"55555555-5555-5555-4344-e061000001b9","usr":"DOMACME\\dupond","tid":"aaaa...044","dt":"2018-05-29T17:05:04.9483179Z","ix":0,"st":0,"act":"SRV006","r":false}
│:0   G *│ ##################################$ │ T+  0.012 ~  17.506 ( 17,494 µs) │    36   560 │ Get (24, 2985, "tickets_meta", {aaaaaaaa-aaaa-aaaa-e5ff-cc3100000003}) => {"id":"aaaaaaaa-aaaa-aaaa-e5ff-cc3100000003","usr":"DOMACME\\dupond","q":"VIRTUA...e"},"ft":0,"a":"SRV006","loc":"wd://srv006/Queues/VIRTUALQ/144/TheKey_7E9B6DBB"}
│ 2   Co │ __________________________________. │ T+ 17.540 ~  17.594 (     53 µs) │             │ Commit
└────────┴─────────────────────────────────────┴──────────────────────────────────┴─────────────┘
> Read 765 bytes in 17.598 ms and 1 attempt(s)

Then, when the work item is complete, a single transaction will:

  • check that the work item is still scheduled
  • remove the item and its entry in the queue
  • check the next item in the queue
  • if there is one, post a message to the corresponding server
Transaction #568 (read/write, 10 operations, '#' = 1.0 ms, started 17:05:06.1642960Z, ended 17:05:06.2194419Z)
┌  oper. ┬─────────────────────────────────────────────────────────┬──── start ──── end ── duration ──┬─ sent  recv ┐
│ 0   G *│ ###########################:                            │ T+ 10.797 ~  32.669 ( 21,872 µs) │    46    12 │ Get (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}, "slot") => <00><00>;<02><D1>TLO<00><05><00><01>
│ 1   c  │ ___________________________`                            │ T+ 32.672 ~  32.674 (      1 µs) │    43       │ Clear (24, 2985, "devices_queues", "PRN0017", @64883287936079-5#1)
│ 2   c  │ ___________________________`                            │ T+ 32.674 ~  32.675 (      1 µs) │    38       │ Clear (24, 2985, "schedules_meta", {55555555-5555-5555-4344-e061000000ab})
│ 3   cr │ ___________________________`                            │ T+ 32.677 ~  32.678 (      1 µs) │    82       │ ClearRange (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}).<00> <= k < (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}).<FF>
│ 4   c  │ ___________________________`                            │ T+ 32.679 ~  32.680 (      1 µs) │    60       │ Clear (24, 2985, "schedules_by_ticket", {aaaaaaaa-aaaa-aaaa-e5ff-cc3100000002}, {55555555-5555-5555-4344-e061000000ab})
│ 5   R °│ ___________________________$$                           │ T+ 32.686 ~  34.001 (  1,315 µs) │    62    59 │ GetRange fGE{(24, 2985, "devices_queues", "PRN0017").<00>} <= k < fGE{(24, 2985, "devices_queues", "PRN0017").<FF>}, limit(1) => 1 result(s), has_more
│ 6   G °│ _____________________________x#####;                    │ T+ 34.464 ~  39.201 (  4,737 µs) │    38   205 │ Get (24, 2985, "schedules_meta", {55555555-5555-5555-4344-e061000000ac}) => {"id":"55555555-5555-5555-4344-e061000000ac","usr":"DOMACME\\dupond","tid":"aaaa...017","dt":"2018-05-29T17:05:04.8936595Z","ix":1,"st":0,"act":"SRV006","r":false}
│ 7   a  │ ___________________________________`                    │ T+ 39.641 ~  39.671 (     29 µs) │    34       │ Atomic_Add (24, 2985, "server_commands", "SRV006"), <01 00 00 00>
│ 8   a  │ ___________________________________`                    │ T+ 39.780 ~  39.798 (     18 µs) │   166       │ Atomic_VersionStampedKey (24, 2985, "server_commands", "SRV006", @?#1), <7B 22 63 6D 64 22 3A 22 70 72 69 6E 74 22 2C 22 73 69 64 22 3A 22 35 35 35 35 35 35 35 35 2D 35 35 35 35 2D 35 35 35 35 2D 34 33 34 34 2D 65 30 36 31 30 30 30 30 30 30 61 63 22 2C 22 74 69 64 22 3A 22 61 61 61 61 61 61 61 61 2D 61 61 61 61 2D 61 61 61 61 2D 65 35 66 66 2D 63 63 33 31 30 30 30 30 30 30 32 66 22 2C 22 64 65 76 22 3A 22 50 52 4E 30 30 31 37 22 7D>
│ 9   Co*│ ____________________________________################### │ T+ 39.803 ~  55.105 ( 15,303 µs) │             │ Commit
└────────┴─────────────────────────────────────────────────────────┴──────────────────────────────────┴─────────────┘
> Read 276 bytes and Committed 679 bytes in 55.115 ms and 1 attempt(s)

There are some optimizations possible (in some cases I read more data than needed), and also the keys are currently very long (subspaces use a descriptive string to help debugging).


(Christophe Chevalier) #6

Looks like I forgot to release the handbrake all along!

Reading batches of commands from the queue (instead of only the next one) yields more than a 3x improvement (and more than 7x compared to the initial version), with 5,000 items scheduled in 3.6 seconds.


Time required to drain 5000 work items

I’m currently limiting myself to reading the next 10 commands from a server’s queue:

Transaction #541 (read/write, 12 operations, '#' = 0.5 ms, started 20:19:09.0370983Z, ended 20:19:09.0581559Z)
┌  oper. ┬───────────────────────────────────────────┬──── start ──── end ── duration ──┬─ sent  recv ┐
│ 0   R °│ ################$                         │ T+  0.007 ~   8.535 (  8,527 µs) │    61  1640 │ GetRange fGT{(24, 3028, "server_commands", "SRV002")} <= k < fGE{(24, 3028, "server_commands", "SRV002").<FF>}, limit(10), Exact => 10 result(s), has_more
│ 1   c  │ ______________________`                   │ T+ 11.578 ~  11.594 (     16 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894930977916-0#1)
│ 2   c  │ ______________________`                   │ T+ 11.594 ~  11.603 (      8 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931041440-0#1)
│ 3   c  │ ______________________:;                  │ T+ 11.603 ~  11.838 (    235 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931130481-1#1)
│ 4   c  │ _______________________`                  │ T+ 11.839 ~  11.840 (      2 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931187803-4#1)
│ 5   c  │ _______________________`                  │ T+ 11.840 ~  11.841 (      1 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931188681-0#1)
│ 6   c  │ _______________________`                  │ T+ 11.841 ~  11.842 (      1 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931206271-1#1)
│ 7   c  │ _______________________`                  │ T+ 11.842 ~  11.842 (      1 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931215875-2#1)
│ 8   c  │ _______________________`                  │ T+ 11.842 ~  11.843 (      0 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931259458-2#1)
│ 9   c  │ _______________________`                  │ T+ 11.843 ~  11.843 (      0 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931273832-0#1)
│ 10  c  │ _______________________`                  │ T+ 11.843 ~  11.844 (      0 µs) │    43       │ Clear (24, 3028, "server_commands", "SRV002", @64894931306928-4#1)
│ 11  Co°│ _______________________X################# │ T+ 11.847 ~  20.833 (  8,987 µs) │             │ Commit
└────────┴───────────────────────────────────────────┴──────────────────────────────────┴─────────────┘
> Read 1,640 bytes and Committed 1,150 bytes in 20.837 ms and 1 attempt(s)

This is a bit scary, because when the transaction commits, I’m holding 10 messages in a stack on some thread that could get killed for any reason, dropping all of them at once without any trace in the db :confused: … but the perf improvement is worth it!


(Christophe Chevalier) #7

EDIT: this was caused by something else!

When increasing the number of workers (each with their own message queue), I started noticing a lot of conflicts on the transaction that pop messages from the queues.

This is unexpected, because there is only one producer and one consumer per queue, so there is no reason for conflicts.

After some time, I realized that the conflicts are caused by insertions into neighboring queues by other transactions, which create a sort of “false conflict”.
A message queue looks like:

  • (..., 'Q', SRVID) = <counter>
  • (..., 'Q', SRVID, <VS+0>) = {message}
  • (..., 'Q', SRVID, <VS+1>) = {message}

The consumer watches the counter key, then does a GetRange(…) on the messages, followed by a clear for each message consumed.
The producer appends a message using a Versionstamped key, and atomically increments the counter key.

The conflicts are created because the GetRange(…) “leaks” into the counter key of the next queue:

The transaction that pops from queue A has a read conflict range that includes the counter key of queue B. If another transaction pushes a message to queue B, it will increment the counter for B. The conflict resolution algorithm does not care that the result of the GetRange would be unchanged if the value of the excluded last key changed, and will conflict the transaction anyway.

Changing the GetRange to a snapshot read fixes the issue for me (it no longer creates conflicts), but only because I have a single consumer per queue. If I had multiple consumers per queue, I’m not sure how I could pop from the queue without conflicts.

After getting rid of the conflicts, I was able to go from ~1,500 to ~1,666 work items processed per second (a ~10% improvement).


(Christophe Chevalier) #8

A look at the time taken to process an item, given its initial position in the queue.

Test:

  • Randomly assign all work items across all the queues
  • Plot the time taken to process an item, given its initial position in its queue
  • Compare the times by varying the number of concurrent queues (5, 10, 20)

Average delay between items:

  • 5 queues: 29 ms
  • 10 queues: 35 ms
  • 20 queues: 60 ms

It seems that above 10-15 concurrent queues, the latency per item starts to grow non-linearly. I’m guessing I’m reaching the limits of my test setup (simulating all the workers in a single process).


(Christophe Chevalier) #9

Another test, which attempts to simulate 1,000 producers that schedule jobs at random intervals (with random pauses from 250 ms to 5 s), spread over 1,000 random queues on 10 servers, running for 30 seconds. Each producer has a small probability (~1%) of switching queue or server during the test.

The test runs locally, so does not include network latency!

The latency measured is “end to end”, meaning: each work item has a “Created” and “Processed” timestamp.

The delay includes all of this:

  • creating a work item with a “Created” timestamp (BEGIN)
  • serializing the work item
  • inserting it into the database
  • posting a message to the remote server’s queue
  • if the queue is empty, updating the watched key of that server
  • the watch on the remote server triggering
  • the worker thread reading a batch of commands from its command queue
  • the worker reading the original work item payload
  • the worker stamping the work item with a “Processed” timestamp (END)
  • notifying the next server in the queue by touching its watched key


Legend: X is the time elapsed since the start of the test, Y is the latency of a work item completed at that time.

There is an initial spike at the start (which may be caused by cold caches, JITing, etc…), but then it seems to hover around 100 ms, with typical latencies between 50 ms and 150 ms. After the initial spike, there are some outliers at ~300 ms.


(A.J. Beamon) #10

If (Q, B) is really the key resolved to by your key selector for the end of the range, then it shouldn’t be part of the conflict range (conflict ranges are exclusive of the end key). If your selector happens to resolve to a key after (Q, B), though, then maybe it’s possible an update to (Q, B) could conflict.

I would actually expect, though, that the conflict range for your query would end at (Q, A).\xFF regardless of what the end selector resolves to. This is because the answer to the query doesn’t depend on anything after (or including) that key. If your conflict range is larger than that, it may indicate a bug.

If you are having conflicts of that nature, then a workaround could be to use snapshot reads and then manually set the conflict range to end at an appropriate key. It might also be a good test to see if the conflict ranges are actually larger than necessary or if there is a different conflict involved here.
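To make the “leak” concrete, here is a toy model of the overlap check (this only mimics interval containment with tuple keys; it is not the real resolver’s algorithm, and the versionstamp 7 marking the last key read is purely hypothetical):

```python
# Toy model of a read conflict range: keys are tuples, a range is [begin, end),
# and a write conflicts with a read when the written key falls inside the range.

def overlaps(read_begin, read_end, write_key):
    return read_begin <= write_key < read_end

counter_b = ("Q", "B")                             # bumped by every push to queue B

# If popping from queue A produces a conflict range ending exactly at (Q, B),
# B's counter key is excluded (range ends are exclusive):
assert overlaps(("Q", "A", 0), ("Q", "B"), counter_b) is False

# ...but if the end selector resolves even one key further, B's counter is
# inside the range, and a push to queue B conflicts the pop from queue A:
assert overlaps(("Q", "A", 0), ("Q", "B", 0), counter_b) is True

# the workaround: snapshot-read, then manually end the conflict range just
# after the last key actually read (lastKeyRead + \x00), so neighbor queues
# can never fall inside it:
tight_end = ("Q", "A", 7) + ("\x00",)              # 7 = hypothetical last versionstamp read
assert overlaps(("Q", "A", 0), tight_end, counter_b) is False
```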


(Christophe Chevalier) #11

I used a snapshot read, and the conflicts went away.

One thing to note: I do the range read with limit = 10 and streaming mode Exact. Could this have an impact? The first iteration had limit = 1, and conflicts started appearing when I switched to limit = 10. My hypothesis is that with limit = 1, the end of the range was the next command in the same queue, while with limit = 10 I could overflow past the end of the queue into the next one…


(Christophe Chevalier) #12

Also, this is another example where being able to extract the actual list of mutations, and the read and write ranges, sent by the client to the cluster could help us :slight_smile:
My transaction logs only show what key selectors I used at the binding level, but I have no idea how the range was split and transformed into a conflict range by the native actor.


(A.J. Beamon) #13

Right, but that isn’t enough to conclude whether the conflict was from something outside of the range being read (which I claimed isn’t necessary and could possibly indicate a bug) or from something inside the range. If you manually set the conflict ranges to the range you are reading, we can see whether the conflicts are present or not and make that conclusion.


(Christophe Chevalier) #14

Ok, so I added a read conflict range from (Q, ID).\x00 to (Q, ID).\xFF and the conflicts are back. One observation: it is always only a single retry.

Without knowing exactly what caused the conflict, I don’t have any easy way to track this down, since there are thousands of transactions flying around.


(A.J. Beamon) #15

Yeah, as you know we don’t have much tooling support for identifying what keys are conflicting, though this is certainly something we’re interested in.

Your last test tells us that there are conflicts happening from someone writing to this same queue, and your observation about the limits probably indicates that it’s not happening at the beginning of the queue. Is it possible that someone is appending to the queue simultaneously with your read, and if your range read takes in the whole queue, it gets a conflict? Possibly you could try setting the end of your conflict range to lastKeyRead+\x00 to see if there continue to be conflicts (it’s possible this could be the correct conflict range for you in general, depending on your situation).


(Christophe Chevalier) #16

:sweat_smile:

Since I have two types of queues, I mixed them up. This is about the server command queues: since the version where I made things concurrent, a server can now work on multiple work queues in parallel, so it can receive commands for items from multiple queues at the same time, from multiple remote servers.

So what probably happens is that someone appends another command to this server’s queue!

oops :slight_smile:

You are right, it would be really helpful to have a way to pinpoint the cause of conflicts. The dream feature would be an error message like “your transaction was probably conflicted by that other transaction over there, on this key (or range of keys)”. This would save us a lot of time! Even with a fully detailed log of transactions, I was not able to quickly pinpoint my error.


(Christophe Chevalier) #17

I’m trying to establish the list of error codes that a Watch can throw that mean “you need to retry”, versus “this is a real error”.

See below for the pseudo-code.

Questions:

  1. What are the error codes that are safe to retry for a Watch?

  2. Also, I think that I should probably wait a random amount of time before retrying (with exponential backoff?) to prevent hammering the database…

  3. Can I use fdb_transaction_on_error() for this case also? It seems that it is tailored for general errors that happen inside a transaction or during commit. But here this is happening outside of the transaction, so maybe we need a different list of codes?

  4. Is there any easy way to make watches fail, so I can test that my code handles all cases properly?

EDIT: looking at the code (watchValue(..) in NativeAPI.actor.cpp), it looks like the implementation already handles some error codes and does wait between errors:

  • both error_code_wrong_shard_server and error_code_all_alternatives_failed wait 10 ms before retrying
  • error_code_timed_out is also retried after 10 ms
  • error_code_watch_cancelled is retried after 1 second

Any other code waits 10 ms (same delay as timed_out) and is thrown back to the caller.

Looking at this, I’m not sure I need to bother catching these codes, because they seem to already be handled.

I don’t know what I should do if I see too_many_watches, though; I don’t really want to have another code path that defaults to polling :confused:


My algorithm looks like this:

  • loop until we get messages (or are cancelled by the caller)
    • start a transaction to pop the next messages, or if there are none, set up a watch
    • if we got messages, return them
    • await the watch for the next messages
    • try again
async Task<Message[]> GetNextMessages(string queue, CancellationToken ct)
{
  while (!ct.IsCancellationRequested)
  {
    // get the next messages, or set up a watch if there are none
    (var items, var watch) = await db.ReadWriteAsync(async tr =>
    {
      var items = await PopNextMessagesFromQueue(tr, queue, ....);

      if (items == null)
      { // nothing yet: set up the watch
        return (null, tr.Watch(....));
      }
      // we have some items!
      return (items, null);
    }, ct);

    if (items != null) return items;

    // wait for the next commands
    try
    {
      await watch;
    }
    catch (FDBException x)
    { // the watch failed!

      if (!RetryableWatchErrors.Contains(x.Code)) // set of RETRYABLE WATCH ERRORS
      { // this is a non-recoverable error, abort!
        throw;
      }
      // this is a transient error, try again
      //TODO: should we wait a bit here?
    }
  }
  // the caller cancelled the request
  throw new OperationCanceledException(ct);
}

(A.J. Beamon) #18

There are two sets of errors that need to be considered when dealing with watches – those that come before the watch’s transaction is committed and those that come after.

Most retryable errors that occur before the transaction is committed will also affect the commit, and so if you are running a retry loop the transaction will be retried anyway. Looking at the code, I’m not sure this is the case for the future_version error, though.

The other error that you have to watch out for pre-commit is the too_many_watches error. I’m not actually sure there’s a good way to check whether this (or really any other error) is being hit prior to committing, so you’ll have to fall back to checking for errors after commit.

For errors that occur after the transaction is committed, I mostly just treat them as if the watch fired. In other words, I read the value at my watched key, check whether it differs from my previously recorded value, and set a new watch. You may want to do something special in the too_many_watches case, although really the purpose of that error is to encourage you to stay away from that limit. If necessary, the number of allowable watches per client can be changed via a database option, though that may have an effect on performance and resource usage. The limit also cannot be set higher than 1 million. Outside of too_many_watches, I don’t think you should expect any errors here that would indicate that you shouldn’t set up a new watch. It is always possible that some sort of bug could result in weird errors coming through, though (e.g. internal_error or broken_promise, etc).

I don’t think the expectation is that you need to implement a backoff mechanism to use watches. It’s perhaps a little weird that the future_version error does not lead to a delay before retrying the transaction, but the default delay there is somewhat small (10 ms), and you will end up spending some chunk of that time committing the transaction and getting a new read version to try again.


(Christophe Chevalier) #19

Yeah, you are probably right; the algorithm is designed that way anyway. I think I’ll add a small 100 ms - 500 ms delay between retries, so that it would fall back to polling if the error is too_many_watches or similar, without needing a different code path.


(Geo) #20

Keep up the good work Chris. You and I are apparently after the same thing – EDA and FoundationDB.

Get those Actors lean and scalable, and point me towards any open source repos you’ve got.