A few design-pattern + check-my-understanding questions

I am a new FoundationDB user! So far very impressed :slight_smile:

Apologies in advance for the longer post. Over the last few days I’ve read almost the entire manual, and I have a few questions and gaps in my knowledge:

1. Does FDB draw an important distinction between read-only and write transactions?

In the Go driver, I notice there is a ReadTransact function which only allows read operations inside it. I could not find anything in the main docs mentioning a difference, so I’m assuming it is mainly there for type safety / peace of mind when writing a read-only function.

2. Are writes within a single transaction sequential / blocking?

The docs mention that it is advisable to start all the reads you’ll need and only block on the data once you need it - in Go, this is accomplished with a := tr.Get(A); b := tr.Get(B); a.MustGet().

If later in the same transaction, I call tr.Set(A, x); tr.Set(B, y), does Set block? Or to ask it a different way: is the latency of two sequential Set calls zero, one, or two round-trips from the client to the database? I checked the source but it immediately dives into C.fdb_transaction_set.

3. Does the possibility of unknown transaction results make the FDB atomic features tricky to use?

From the unknown results doc, any transaction that is not idempotent can result in unintended behavior. The banking example in the link includes using a separate, idempotent depositID check/set to prevent an add from running twice.

It seems to me like you’d run into this a lot - suppose you want to set some key, and also log how many times that feature was used each day. A transaction of (in not-quite-correct pseudocode) tr.Set(A, x); tr.Add(tuple.Tuple{year, month, day}, 1) is not idempotent because of the Add, and key A may already have been x before this transaction, so checking for that condition inside the transaction is insufficient.

Should you use a separate, idempotent key / 2PC type design whenever you want to use FDB atomic features, or am I missing something?

4. Are “snapshot reads” only useful if you also intend to write in the same transaction?

This might be an obvious one, and maybe I’m just confused by the name. Am I correct in thinking that a read-only transaction would never conflict with a write transaction, because of MVCC, and that snapshot reads are therefore only useful for avoiding write conflicts with another write transaction?

5. Is it possible to efficiently count keys in a range?

Suppose you want to know how many keys are in a (sparse) range - with the tools at hand, it looks like I’d have to stream the entire set of K/V data to my client. Or I’d have to have thought of wanting this capability before writing any data at all, and transactionally increment / decrement a separate counter key as above…

For bonus points, can I find the approximate “middle” (+/- X) key in a range? That would allow easy concurrency for my next question:

6. What’s a good pattern for reading a large key space?

Suppose you want to perform some large statistical analysis on your entire data set. Inconsistency (the scan not fitting in the 5-second transaction limit) is fine, but it’s not clear to me how best to make this performant.

Maybe do some TCP-congestion-window-style tuning, increasing how much you read (using a limit + the “want all” streaming mode) until a timeout happens, and use that as a heuristic? How about concurrency?

This is partially addressed in the segmented range reads recipe.

7. What’s a good pattern for an online schema upgrade?

Suppose you have a large range of keys that are encoded with protobuf or similar, and you want to add some new, optional field to your struct. Keys are being added / removed during the upgrade. You want to efficiently upgrade many millions of keys.

My off-the-cuff idea: upgrade your software fleet to save the new field when adding or modifying keys. Once the entire fleet is no longer writing the old format, do something similar to the method in the question above, setting the batch “low priority” flag on your transactions. Having iterated through the entire range, you could then iterate it a second time to validate that there are no further changes to be made.

Thank you very much if you got this far. I’m excited to build things! :slight_smile:


“Yes”.

  • Read-only transactions don’t commit anything, so they don’t have to wait for any response when they are done, and they don’t “allocate” any resources/locks on the server while they are alive. They only need to acquire a read version before the first read. Since they don’t write, they cannot conflict.
  • Write-only transactions have to commit (obviously) but don’t need to acquire a read version (because they don’t need one). Since they don’t read, they cannot conflict (but they can cause others to conflict).
  • Read-write transactions need to both acquire a read version for the reads and wait for the commit to succeed at the end. Since they both read and write, they can conflict or cause others to conflict.

Mutations are not blocking: the transaction just buffers the complete set of mutations you performed internally, and then sends the whole list (plus the list of conflict ranges) to the cluster at commit time. So it is “as if” all the writes occurred at the end, while waiting for the commit to complete.

Reads are the exact opposite, since you have to go to one of the storage processes to get the results. That’s why it is usually faster if you can issue all your reads concurrently (and not sequentially).
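
To make that concrete, here is a minimal Go sketch (the keys A and B are hypothetical): both Gets are in flight at the same time, while the Sets return immediately because they only buffer mutations on the client.

import "github.com/apple/foundationdb/bindings/go/src/fdb"

func readThenWrite(db fdb.Database) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		fa := tr.Get(fdb.Key("A")) // both reads start their round-trips now
		fb := tr.Get(fdb.Key("B"))
		a, b := fa.MustGet(), fb.MustGet() // block once, for both results

		// Neither Set touches the network: mutations are buffered client-side
		// and shipped together with the conflict ranges at commit time.
		tr.Set(fdb.Key("A"), b)
		tr.Set(fdb.Key("B"), a)
		return nil, nil
	})
	return err
}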

Mutations are also “sequential” in the sense that they affect the database in the order you issued them: for example, if you first CLEAR(X) and then SET(X, 42), the transaction only sends “SET(X, 42)” to the cluster. On the other hand, if you first SET(X, 42) and then CLEAR(X), the transaction only sends “CLEAR(X)” (i.e., it “merges” operations on the same key).

If you are mutating different keys in the same transaction, then the order does not really matter (unless you insert some reads between them). I believe the list of mutations is sorted lexicographically by the client when the transaction commits, so the cluster doesn’t see the order in which the individual mutations were issued.

Yes and no.

Since fdb relies on optimistic locking, you always have to use retry loops anyway, so you just need to add some extra logic at the start of your transaction handler to check the previous attempt’s error code. If it was “commit result unknown” and you have a way to check whether the change already happened, you can simply abort without committing.

This is a bit tricky to do if your transaction is write-only and only uses atomic increments, if you are using random UUIDs that were generated inside the retry loop (just generate them outside!), or if you use versionstamps.

You may need to create some additional key that can serve as the witness of the operation’s success.

As always with distributed systems, it is easier if your operations are idempotent.
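
Here is a minimal Go sketch of that idea, applied to the Set+Add example from question 3. The “txn-witness” subspace and the key names are hypothetical, cleanup of old witness keys is left out, and the Add operand is a little-endian integer as fdb’s atomic add expects:

import (
	"crypto/rand"
	"encoding/hex"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)

var one = []byte{1, 0, 0, 0, 0, 0, 0, 0} // little-endian 64-bit 1

func setAndCount(db fdb.Database, y, m, d int) error {
	// Generate the witness ID once, OUTSIDE the retry loop.
	buf := make([]byte, 16)
	_, _ = rand.Read(buf)
	txnID := hex.EncodeToString(buf)

	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		witness := fdb.Key(tuple.Tuple{"txn-witness", txnID}.Pack())
		if tr.Get(witness).MustGet() != nil {
			// A previous attempt that ended in "unknown result" did commit.
			return nil, nil
		}
		tr.Set(fdb.Key(tuple.Tuple{"A"}.Pack()), []byte("x"))
		tr.Add(fdb.Key(tuple.Tuple{"stats", y, m, d}.Pack()), one)
		tr.Set(witness, []byte{}) // the witness commits atomically with the rest
		return nil, nil
	})
	return err
}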

Snapshot reads are used if you DON’T want your transaction to conflict if another one changed the value of the key behind your back.

This can be used to reduce the probability of conflicts for global counters or other high-contention algorithms. For example, you can read a range of values, choose one in particular, mutate some keys related to it, and then add a read conflict range on that one key only. It is then as if you never read the other ones: another concurrent transaction may have chosen a different key and so would not conflict with you.
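
A sketch of that pattern in Go, assuming a hypothetical “tasks” subspace: the candidates are read at snapshot isolation, and a conflict range is re-added for only the key we act on.

import (
	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)

func claimOneTask(db fdb.Database) ([]byte, error) {
	v, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		rng, err := fdb.PrefixRange(tuple.Tuple{"tasks"}.Pack())
		if err != nil {
			return nil, err
		}
		// Snapshot read: no read-conflict ranges are recorded for these keys.
		kvs := tr.Snapshot().GetRange(rng, fdb.RangeOptions{Limit: 10}).GetSliceOrPanic()
		if len(kvs) == 0 {
			return nil, nil
		}
		picked := kvs[0] // real code might pick at random to spread contention
		// Conflict only on the one key we actually use: it is "as if" we
		// never read the other nine.
		if err := tr.AddReadConflictKey(picked.Key); err != nil {
			return nil, err
		}
		tr.Clear(picked.Key) // claim it
		return picked.Value, nil
	})
	if err != nil || v == nil {
		return nil, err
	}
	return v.([]byte), nil
}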

You can also use them to intentionally relax the ACID guarantees by only using snapshot reads in the transaction, if you don’t care if the key was changed. But make sure to not shoot yourself in the foot :slight_smile:

I believe snapshot reads are a tiny bit faster, because the client does not need to remember the list of keys you read, nor send them to the cluster when you commit. If it is impossible for the key to be changed by another transaction, then you may use snapshot reads to squeeze a few microseconds here and there.

You may gain a tiny bit of network bandwidth and CPU time (we’re talking maybe a few percent). Not really worth the pain, I would say.

Not at this time: there are multiple ways to “count” keys in a range, but they all consume a lot of resources server-side. Basically, they require the storage server to touch all or most of the leaf pages in the B-tree, which will probably hurt the page cache’s efficiency.

The current recommended way is to have an additional key per “collection” that is used as a counter. This key is updated with atomic increments or decrements whenever you add or remove items from the “collection”. If you don’t forget to update the key in all the possible code paths, the counter will always be consistent with the actual data.
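
A small Go sketch of that, with a hypothetical “users” collection and a sibling “users-count” counter key. Note that the existence check is a read, so it adds conflict potential; that is the price of only counting genuinely new or deleted items.

import (
	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)

var plusOne = []byte{1, 0, 0, 0, 0, 0, 0, 0}                          // LE +1
var minusOne = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} // LE -1

func addUser(db fdb.Database, id string, blob []byte) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		key := fdb.Key(tuple.Tuple{"users", id}.Pack())
		if tr.Get(key).MustGet() == nil { // only count genuinely new items
			tr.Add(fdb.Key(tuple.Tuple{"users-count"}.Pack()), plusOne)
		}
		tr.Set(key, blob)
		return nil, nil
	})
	return err
}

func removeUser(db fdb.Database, id string) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		key := fdb.Key(tuple.Tuple{"users", id}.Pack())
		if tr.Get(key).MustGet() != nil {
			tr.Add(fdb.Key(tuple.Tuple{"users-count"}.Pack()), minusOne)
			tr.Clear(key)
		}
		return nil, nil
	})
	return err
}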

If you want to find some middle point in a range, you can combine key selectors with a “recursive” algorithm, a bit like a binary search. The downside is that you have to issue a single read at each “step” before deciding to go left or right, meaning the total latency will be high…
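
One step of that search could look like the following Go sketch: compute a synthetic midpoint of the two boundary keys (here, naively, over their first 8 bytes) and snap it to a real key with a key selector. This only approximates the “middle by key count” if keys are roughly uniform over the byte space, which is an assumption.

import (
	"encoding/binary"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func approxMiddle(db fdb.Database, begin, end fdb.Key) (fdb.Key, error) {
	toU64 := func(k fdb.Key) uint64 {
		var b [8]byte
		copy(b[:], k) // zero-padded if the key is shorter than 8 bytes
		return binary.BigEndian.Uint64(b[:])
	}
	var mid [8]byte
	lo, hi := toU64(begin), toU64(end)
	binary.BigEndian.PutUint64(mid[:], lo+(hi-lo)/2)

	k, err := db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
		// FirstGreaterOrEqual resolves the synthetic midpoint to an existing key.
		return rtr.GetKey(fdb.FirstGreaterOrEqual(fdb.Key(mid[:]))).Get()
	})
	if err != nil {
		return nil, err
	}
	return k.(fdb.Key), nil
}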

If you know that the range will be somewhat small (in bytes, not in number of keys), then sometimes it is faster to read the whole range and then do the search in memory.

If not, then you need to add another data structure on the side that will allow you to speed up the search.

If your scan only needs a portion of the key’s value… I would say store a copy of only the part you need in another set of keys (a bit like a column-store would do?). Maybe use covering indexes if possible?

If you need to read the entire value to perform your aggregate, then I guess you could use the locality API to split the range into smaller chunks, with boundaries aligned to the individual storage servers. Spawn multiple threads that each work on their own chunks, and merge the results once they are done. This should help parallelize the computation across the maximum number of storage nodes, but it will generate a lot of network traffic.
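
Assuming you already have a list of chunk boundaries (the boundary-keys helper exposed by some bindings, e.g. Python’s fdb.locality.get_boundary_keys, gives storage-aligned ones), a parallel scan could look like this Go sketch, which sums value sizes as a stand-in for a real aggregate. Each chunk must be small enough to scan within the 5-second transaction limit.

import (
	"sync"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func scanChunks(db fdb.Database, chunks []fdb.KeyRange) (int64, error) {
	var mu sync.Mutex
	var total int64
	var firstErr error
	var wg sync.WaitGroup

	for _, c := range chunks {
		wg.Add(1)
		go func(r fdb.KeyRange) {
			defer wg.Done()
			// One independent read-only transaction per chunk.
			v, err := db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
				var n int64
				it := rtr.GetRange(r, fdb.RangeOptions{Mode: fdb.StreamingModeWantAll}).Iterator()
				for it.Advance() {
					n += int64(len(it.MustGet().Value))
				}
				return n, nil
			})
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			total += v.(int64) // merge outside the retry loop to avoid double-counting
		}(c)
	}
	wg.Wait()
	return total, firstErr
}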

Another approach is to have one instance of your app running on the same servers as the fdb storage processes, and use the same locality API to send a message to each instance, asking it to process the “local” chunks. Hopefully each instance will query its local storage process and not have to go across the network (it will use “localhost” instead).

And as always, you can add another data structure that is updated while data is inserted into the database and that can be used to speed up the query (maybe some index that pre-computes partial aggregates?).

It’s a bit difficult because it really depends on the type of data, access patterns and so on.

There is one general-purpose solution (not always the best, but sometimes useful) where you use the Directory Layer to create a new temporary subspace, then convert the data from the “current” subspace into the new temporary subspace in a background process (again, by using the locality API you can spawn multiple workers that work on different chunks).

While this is running, you track in some queue all entities that have been changed while the process was going on.

Once all workers are complete, you drain the queue of recently changed entities, again and again, until it is empty (you may want some “lock” key somewhere to stop all mutations if you are never able to “keep up”).

Then you can use the Directory Layer to atomically swap the “current” and “temporary” subspaces in the same transaction. Any new transaction will then see the updated data, and you can simply “clear range” the old subspace.
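
In the Go binding, the swap itself might look like this sketch (the “app/data” paths are hypothetical; both Moves commit atomically because they share one transaction):

import (
	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
)

func swapSubspaces(db fdb.Database) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		// Step the live subspace aside, then promote the migrated copy.
		if _, err := directory.Move(tr, []string{"app", "data"}, []string{"app", "data-old"}); err != nil {
			return nil, err
		}
		if _, err := directory.Move(tr, []string{"app", "data-new"}, []string{"app", "data"}); err != nil {
			return nil, err
		}
		return nil, nil
	})
	// Once the swap is visible, the old copy can be cleared at leisure, e.g.:
	//   directory.Root().Remove(db, []string{"app", "data-old"})
	return err
}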

So yeah the downside is that you need to “clone” all the existing data, which - depending on the size of your dataset - may or may not be feasible (both in time and in required disk space).

Another solution is to upgrade your entities “just in time”: you keep a schema-version and entity-version field on each entity. When you read an entity with the old schema version, you upgrade it in memory and at the same time hand it to a background thread. That thread can then lazily perform the upgrade “for real” (checking first that the entity was not mutated/updated by another worker in the meantime). When you have to mutate an entity with the old schema version, you simply upgrade it, apply your mutation, and store the updated version in the database. The background thread will either skip this record later, or conflict if it was attempting to upgrade it at the same time.
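
A rough Go sketch of the read path, under the hypothetical convention that every value carries a one-byte schema-version prefix:

import "github.com/apple/foundationdb/bindings/go/src/fdb"

const currentSchema = 2 // hypothetical: values start with a 1-byte version

// upgradeInMemory is a stand-in for re-encoding an old value with the new
// schema; here it only bumps the version byte.
func upgradeInMemory(old []byte) []byte {
	return append([]byte{currentSchema}, old[1:]...)
}

// readEntity upgrades stale values in memory and hands the key to a
// background worker (fed by upgradeQueue) for the durable rewrite.
func readEntity(db fdb.Database, key fdb.Key, upgradeQueue chan<- fdb.Key) ([]byte, error) {
	v, err := db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
		return rtr.Get(key).Get()
	})
	if err != nil {
		return nil, err
	}
	raw, _ := v.([]byte)
	if len(raw) == 0 {
		return nil, nil
	}
	if raw[0] < currentSchema {
		raw = upgradeInMemory(raw)
		select {
		case upgradeQueue <- key: // worker re-checks the version, then rewrites
		default: // queue full; a later reader will enqueue it again
		}
	}
	return raw[1:], nil // payload without the version byte
}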

This is a bit trickier if your schema change also adds indexes or changes existing ones. Again, this can be worked on in the background in a temporary subspace, and then you can “publish” the new index by renaming the temporary subspace to the final name for that index…

Personally, my datasets have always been small enough that I’ve been able to do offline upgrades (put the app in “maintenance mode”, export everything, clear, re-import everything with the upgraded schemas, recompute the indexes, restart the app).

Christophe’s detailed answer covers your questions pretty well, but I just wanted to add a couple of points.

Committed transactions do actually require a read version, and a write-only transaction that hasn’t acquired one will do so implicitly at commit time. This read version won’t result in any conflicts, though, if there aren’t any reads or read conflict ranges.

The answer above covered this pretty well, but to be explicit, the answer to your question is basically yes. Without a write (or setting write conflict ranges), your transaction won’t commit anything and wouldn’t conflict regardless of whether you used snapshot reads or regular ones. As Christophe suggested, there may be a performance benefit to using snapshot reads, but not one we’ve measured as far as I know. See also Use snapshot transactions in our bindings for read-only retry loops. · Issue #718 · apple/foundationdb · GitHub.

Yes, but I would imagine that acquiring the read version then happens “cluster”-side during the commit process, so at least you would need one fewer network round-trip? Also, couldn’t the read version of a transaction that read nothing simply be equal to its commit version?

Out of curiosity, when is the read version acquired, relative to transaction creation and the first get?

It happens automatically when you first read, but you can also explicitly request it (via fdb_transaction_get_read_version), and then it’s cached for all subsequent reads in this transaction. As I understand it, the 5-second “deadline” starts from your first read, not from the creation of the transaction handle itself.
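
In Go, forcing that round-trip up front looks like this small illustration (the key name is hypothetical):

import "github.com/apple/foundationdb/bindings/go/src/fdb"

func explicitReadVersion(db fdb.Database) ([]byte, error) {
	v, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		_ = tr.GetReadVersion().MustGet() // one round-trip to the cluster
		// Every subsequent read in this transaction reuses the cached version,
		// so this Get only pays for the storage-server round-trip.
		return tr.Get(fdb.Key("some-key")).MustGet(), nil
	})
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}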

To make it clearer, here is a sample transaction profiling trace:

note: this comes from a server that was somewhat slow at the time; I got better timings than this on better hardware and after switching the server to Linux. But the general shape is very typical.

Transaction #568 (read/write, 10 operations, '#' = 1.0 ms, started 17:05:06.1642960Z, ended 17:05:06.2194419Z)
┌  oper. ┬─────────────────────────────────────────────────────────┬──── start ──── end ── duration ──┬─ sent  recv ┐
│ 0   G *│ ###########################:                            │ T+ 10.797 ~  32.669 ( 21,872 µs) │    46    12 │ Get (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}, "slot") => <00><00>;<02><D1>TLO<00><05><00><01>
│ 1   c  │ ___________________________`                            │ T+ 32.672 ~  32.674 (      1 µs) │    43       │ Clear (24, 2985, "devices_queues", "PRN0017", @64883287936079-5#1)
│ 2   c  │ ___________________________`                            │ T+ 32.674 ~  32.675 (      1 µs) │    38       │ Clear (24, 2985, "schedules_meta", {55555555-5555-5555-4344-e061000000ab})
│ 3   cr │ ___________________________`                            │ T+ 32.677 ~  32.678 (      1 µs) │    82       │ ClearRange (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}).<00> <= k < (24, 2985, "schedules_status", {55555555-5555-5555-4344-e061000000ab}).<FF>
│ 4   c  │ ___________________________`                            │ T+ 32.679 ~  32.680 (      1 µs) │    60       │ Clear (24, 2985, "schedules_by_ticket", {aaaaaaaa-aaaa-aaaa-e5ff-cc3100000002}, {55555555-5555-5555-4344-e061000000ab})
│ 5   R °│ ___________________________$$                           │ T+ 32.686 ~  34.001 (  1,315 µs) │    62    59 │ GetRange fGE{(24, 2985, "devices_queues", "PRN0017").<00>} <= k < fGE{(24, 2985, "devices_queues", "PRN0017").<FF>}, limit(1) => 1 result(s), has_more
│ 6   G °│ _____________________________x#####;                    │ T+ 34.464 ~  39.201 (  4,737 µs) │    38   205 │ Get (24, 2985, "schedules_meta", {55555555-5555-5555-4344-e061000000ac}) => {"id":"55555555-5555-5555-4344-e061000000ac","usr":"DOMACME\\dupond","tid":"aaaa...017","dt":"2018-05-29T17:05:04.8936595Z","ix":1,"st":0,"act":"SRV006","r":false}
│ 7   a  │ ___________________________________`                    │ T+ 39.641 ~  39.671 (     29 µs) │    34       │ Atomic_Add (24, 2985, "server_commands", "SRV006"), <01 00 00 00>
│ 8   a  │ ___________________________________`                    │ T+ 39.780 ~  39.798 (     18 µs) │   166       │ Atomic_VersionStampedKey (24, 2985, "server_commands", "SRV006", @?#1), <7B 22 63 6D 64 22 3A 22 70 72 69 6E 74 22 2C 22 73 69 64 22 3A 22 35 35 35 35 35 35 35 35 2D 35 35 35 35 2D 35 35 35 35 2D 34 33 34 34 2D 65 30 36 31 30 30 30 30 30 30 61 63 22 2C 22 74 69 64 22 3A 22 61 61 61 61 61 61 61 61 2D 61 61 61 61 2D 61 61 61 61 2D 65 35 66 66 2D 63 63 33 31 30 30 30 30 30 30 32 66 22 2C 22 64 65 76 22 3A 22 50 52 4E 30 30 31 37 22 7D>
│ 9   Co*│ ____________________________________################### │ T+ 39.803 ~  55.105 ( 15,303 µs) │             │ Commit
└────────┴─────────────────────────────────────────────────────────┴──────────────────────────────────┴─────────────┘
> Read 276 bytes and Committed 679 bytes in 55.115 ms and 1 attempt(s)

You can see that the very first Get is “slow” (21.8 ms) compared to the next GetRange (1.5 ms), because that Get is actually a GetReadVersion followed by a Get. If I explicitly called GetReadVersion and then Get, I would see 20 ms + 1.5 ms, because obtaining the read version is more complex than just querying a storage process (especially when the client has already cached the “map” of which shards are located where).

Also, all the Clear, ClearRange and Atomic_Add operations are “instant” (sub-microsecond; the chart is probably rounding up) because they just update some state in memory.

Then, finally, the commit does all the mutation work (sending the mutation and conflict-range lists to the cluster and waiting for the result).

So yeah, it is possible for the first (implicit) GetReadVersion and the Commit together to take the bulk of the transaction time.

Last thing to note: technically this transaction handle lasted 55.1 ms (from creation to end), but since the first read starts at T+10.8 ms, the actual lifetime, as seen by fdb, is about 45 ms.

It does happen client-side, before the commit request is sent, but I believe it uses the causal_read_risky flag, which makes it a little cheaper by avoiding the transaction logs.

I don’t know all of the constraints on the read version off the top of my head, and I’m not in a position to easily look it up right now, but one important thing to note is that getting the read version is where ratekeeper adds latency to control the transaction rate if needed. Skipping the read-version call would bypass the throttling mechanism.

Thank you all for your responses! In particular I found the distinction between read-only, write-only, and read-write queries very helpful.

For #3, it sounds like a write-only, atomic-only transaction could be “fixed” by generating a UUID before the retry loop and just inserting a tuple key (serverProcessUUID, txnUUID). Checking whether this key exists makes the transaction idempotent, and some cleanup process can later delete any throwaway keys that belong to a serverProcessUUID that has been harvested.

For #6, co-locating worker processes with storage is interesting; I did not realize that clients could optimize their access in that way. Calculating partial aggregates online may be an easier approach, though; I will need to think more about machine sizing first.

For #7, the directory-layer approach seems tricky: unless I’m missing something, I think the migrator would still have to empty (part of) the queue after making the move, because the first attempt could conflict and need to retry. Maybe versionstamps in the queue items could help there. I think the “lazy” approach is probably easier to get right; our suggestions are pretty close there.

Last question: what’s that tracing tool? :slight_smile: Is it built into Go / FDB?

That’s what I was calling a “witness” key. Sometimes you can re-use an existing field, like a last-modified or document-version field.

Other times, if the transaction is completing a work item or command that sits in a queue somewhere, and the transaction also deletes that entry from the command queue, you can use that as well…

Maybe this thread could be interesting as well: Implementing atomic DDL for SQL schema

This is a feature of the .NET Binding. I wrote it because I needed it and nothing like that was available in fdb out of the box at the time.

I don’t know how I would have been able to build complex layers without this, so it’s a mystery to me how everyone else is able to do it “in the blind” :slight_smile:

I hope that this either becomes a native feature of the C client library some day, or that the authors of other bindings port it to their language. It’s not very complex to do; the code for this part can be found here, and the ASCII-art goodness can be found here.