I’m currently designing a new layer that needs to coordinate work items and distribute them among a set of worker servers, and I plan to use Watches for the notifications between producers and workers.
Unfortunately, the design requires a lot of workers (thousands) and even more unique “queues” (hundreds of thousands). Each unique queue can have work items queued, and each item must be processed by a specific worker.
Note: I’ve already looked at the code in the client and storage node actors to see how Watches are handled, but I’m not sure yet about the performance characteristics.
Example Scenario
Let’s say I want to create the next YouTube.
I would have thousands of file servers that hold large video or image files uploaded by many users. Each user has a queue of files to convert/transcode on demand, and work must be coordinated among all the file servers (a process like ffmpeg will read the file from local disk).
- Each user can only have one file being transcoded at a time in the cluster (the next file in the queue)
- Files are physically located on a specific server’s hard disk, so only that server can transcode the file
- Servers have lots of cores, so they can transcode multiple files at the same time
- Files for a single user can be stored on multiple servers.
I have a set of subspaces that store the description of each work item, as well as a set of queues holding “work orders” (using Versionstamps as ids to preserve ordering).
My plan is to have each server watch its own key. When a producer schedules a new work item in a queue, and that queue was previously empty, it can touch the corresponding server’s key to wake it up.
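To make the plan concrete, here is a minimal sketch of the producer side using the Python binding. The `queues` and `servers` subspaces, the key layout, and the value encodings are assumptions for illustration, not an existing layer:

```python
import struct
import fdb

fdb.api_version(630)
db = fdb.open()

# Hypothetical layout:
#   (queues, USER_ID, <Versionstamp>) = FILE_ID
#   (servers, SERVER_ID) = counter   <-- the key each server watches
queues = fdb.Subspace(('queues',))
servers = fdb.Subspace(('servers',))

ONE = struct.pack('<q', 1)  # little-endian operand for the atomic ADD

@fdb.transactional
def schedule(tr, user_id, file_id, server_id):
    user_queue = queues[user_id]
    r = user_queue.range()
    # Is the user's queue currently empty?
    was_empty = not list(tr.get_range(r.start, r.stop, limit=1))
    # Append the work order under a versionstamped key to preserve ordering.
    key = user_queue.pack_with_versionstamp((fdb.tuple.Versionstamp(),))
    tr.set_versionstamped_key(key, fdb.tuple.pack((file_id,)))
    if was_empty:
        # The queue was empty: wake up the server that owns the file.
        tr.add(servers[server_id].key(), ONE)
```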
In this example:
- User A has three files: `UserA_1.mp4`, `UserA_2.mp4`, `UserA_3.mp4` that are stored on several servers (1 and 2).
- Server 1 can start working on `UserA_1.mp4` and `UserK_3.mp4` concurrently. It cannot work on `UserB_7.mp4` yet because User B has other files before it in the queue.
- Server 2 can start working on `UserB_1.mp4`, but must wait for Server 1 to finish before working on `UserA_2.mp4` and then `UserA_3.mp4`.
When a server finishes working on a file, it can remove the corresponding entry from the queue and check for the next entry (see the sketch after this list).
- If there is none, it can stop.
- If there is another work item in the queue:
  - either the next file is located on the same server, so it can continue locally,
  - or the next file is located on another server, and the current worker must wake up the other worker so that it can pick up the work.
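Under the same assumptions as the producer sketch above, the completion step could look like this; `locate_server` is a hypothetical helper that maps a file to the server holding it:

```python
@fdb.transactional
def finish_and_hand_off(tr, user_id, done_key, my_server_id):
    user_queue = queues[user_id]
    tr.clear(done_key)  # remove the completed work order from the queue
    r = user_queue.range()
    nxt = list(tr.get_range(r.start, r.stop, limit=1))
    if not nxt:
        return None  # the queue is empty, this worker can stop
    next_file = fdb.tuple.unpack(nxt[0].value)[0]
    owner = locate_server(tr, next_file)  # hypothetical lookup
    if owner == my_server_id:
        return next_file  # the next file is local: keep working on it
    tr.add(servers[owner].key(), ONE)  # wake up the other worker
    return None
```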
I also need to support cancellation of work items (user deletes a file, a file is flagged and must be removed, the account is suspended/deleted, etc…)
- for work items that are in second position or later in the queue, simply remove them.
- for work items that are in first position in the queue, I need to send a command to the server to stop working on them.
Notification of Servers
Each server will watch a key that is used to notify it that it must work on something immediately.
Probably the keys will be `(..., "servers", SERVER_ID) = ####`, and I will use an atomic add operation (so the value is opaque).
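On the consuming side, a sketch of the wait loop could look like this (still with the hypothetical `servers` subspace, and with `drain_pending_work` standing in for whatever processing the server does). The watch is armed before draining pending work, so a notification that arrives in between is not lost:

```python
@fdb.transactional
def arm_watch(tr, server_id):
    # The value is only ever modified with atomic adds, so it is treated
    # as an opaque "something changed" marker and never interpreted.
    return tr.watch(servers[server_id].key())

def notification_loop(db, server_id):
    while True:
        watch = arm_watch(db, server_id)   # the watch fires after commit
        drain_pending_work(db, server_id)  # hypothetical: process the queues
        watch.wait()                       # block until the key is touched
```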
Question #1: I will have thousands of such keys, all in the same subspace, so they will probably be stored on the same storage node (or maybe two if the subspace spans two shards). Will this create too much load on that particular fdb storage node, which will have tons of watches?
Question #2: In what circumstances can a watch become “dead”? If a server has a callback on a Watch, but for any reason it dies (during a cluster event) or the client does not receive the message when the watch triggers, is it possible for the whole thing to deadlock? Do I need an additional polling system that checks things every X minutes, relying on watches only to shorten the latency?
To tell a server that it must work on a file, I cannot simply set its key to a work item id, because watches are async, and by the time the server wakes up and checks the key, I could have overwritten it with another work item id.
I need a queue of notifications for each server, containing commands like “work on this file” and “stop working on this file”. With Versionstamped keys, this looks trivial to do.
(..., "server_workqueue", SERVER_ID) = #### // <-- watched key
(..., "server_workqueue", SERVER_ID, <Versionstamp>) = 'Please work on file UserX_Y.mp4`
(..., "server_workqueue", SERVER_ID, <Versionstamp>) = 'Stop working on file UserX_Y.mp4 !!!`
My fear is that if the server crashes or freezes for whatever reason, I could keep sending messages to the queue with nobody to consume them, and leak a lot of keys in there (until the server restarts or is declared dead and the queue is purged).
Also, how do I handle the case where a server is frozen, I push a command “start working on file 1”, more commands for files 2 to N, and then queue a command “abort working on file 1”? When the server unfreezes, it will see the first command and start working on file 1, even though there is a stop order later in the queue.
How can I construct a command queue that allows cancelling commands, without requiring the consumer to read the entire queue before processing commands?
Failure Detection
I also need to be able to handle failure of a worker. In my case, I don’t need to handle retrying a work item, since the file was on the same server that crashed and won’t be available anyway. I just need to skip over to the next item in the queue that is on a different server.
I’m not sure how I can achieve this without having another set of servers that monitor the health of the workers. I would like to be able to do this in a distributed fashion, with a set of servers that periodically “check” the state of the other servers, and can detect deadlock/failures and intervene.
My idea is that each server would periodically update a “keep alive” key, as well as store somewhere the list of work items that it is working on, like `(..., TIMESTAMP, SERVER_ID) = ''`. Every `polling_interval / 2`, the server would clear this key and create a new one. Then “checkers” would `get_range(begin: (..., 0), end: (..., NOW))`, and all results would be servers that have died.
(..., "server_watchdog", <SECONDS_SINCE_EPOCH>, SERVER_ID) = ''
(..., "server_tasks", <SERVER_ID>, <FILE_ID>) = 'started at TS, currently at X%'
I think that even if multiple servers do the same check, they would not stomp on each other (at worst, they would conflict).
Question #3: is this the best approach, or are there any simpler solutions?
Performance of Watches
Watches are created on the client, and there is a max number of them per database handle. Right now, I only need one key per server, so I don’t think I’ll have issues there.
But watches are also handled by the storage actors, which use an `AsyncMap<Key>` that contains all the keys that are watched somewhere. Looking at the code, each Set or ClearRange mutation must check this map for a watch (and trigger it if found).
Since all my watched server keys will be in the same subspace, it is highly probable that they will be in the same shard (or maybe across two shards), and so handled by the same storage node.
Question #4: Will this negatively impact that storage node if it has a lot of keys in the AsyncMap? The current implementation uses `std::map`, but I don’t know what its algorithmic complexity looks like. Does this scale to thousands / tens of thousands of watches? The map is queried for every key mutated in the shards handled by the storage node, so even if each lookup is O(1), it would add up to O(N) work when inserting N keys in the db…
Looking at the code, I’m not sure what happens when using double/triple redundancy. If a watch is set on a key that is replicated on 3 storage nodes, do all 3 nodes check the same watch? Does only one of them check all the watches? Or are the watches load-balanced across the 3 storage nodes?
What happens when a storage node dies/restarts? Or if the shard is moved somewhere else?