What do you monitor?

These queues exist on the storage and log processes, respectively, and they more or less represent work that’s outstanding in the system. The system has limits on the queue sizes enforced by ratekeeper, which is a process that artificially introduces latency at transaction start (i.e. get read version) to limit the transaction rate and protect the cluster from being overrun.

I’ll try to give you an idea of how the queues work before I describe why you might want to keep an eye on them. When data is committed, it eventually makes its way to the transaction logs to be made durable. This data is also stored in memory on the log processes at this time (to support quick fetching by the storage servers), so we consider it part of the queue and track how much memory it’s using.

Meanwhile, the storage servers are asynchronously fetching the most recently committed data from the logs. When a storage server gets a set of mutations, it also holds them in memory, and these mutations get tracked in the storage server’s queue. It should be noted that the data structures holding this memory on the logs and storage servers are different, which means that the tracked size of the same mutations will differ between the log queues and the storage queues.

Without going into too much detail, as part of our MVCC implementation, storage servers are able to serve reads up to 5+ seconds old. The way this works is that the oldest available version is stored on disk and subsequent versions are held in a multi-version memory structure. This means that each mutation will reside in memory for 5 seconds, and during this time it is considered part of the storage queue. At the point when the data is made durable, it is removed from the memory data structure and consequently no longer tracked in the storage queue.

Once the storage server has made mutations durable, it notifies the logs that it’s done so. When all storage servers responsible for a set of mutations have reported this, the logs can safely discard that data and remove it from their queue.

What you expect to see in normal circumstances is that the log and storage queues will hold about 5 seconds worth of mutations (actually, it’s a little more in practice, something like 7-8s). If something isn’t able to keep up for some reason, then you may see the log and/or storage queues start to grow beyond that. One such reason you may see this is if a storage server fails. In that scenario, the storage server is unable to fetch data from the logs, and the logs end up holding onto that data until it is replicated elsewhere or the storage server comes back and gets the data.

In this scenario, you could imagine that this takes a while and the queue on the log could grow quite large. To protect against this, we have a process on the log called spilling. When the spilling conditions are met (currently, this is when the log queue >= 1.5GB), the data is written to a separate file on disk and removed from the memory structures. When this occurs, we also consider it to be removed from the queue.

Another case where queues might grow is if a process is being saturated. If a process reaches 100% CPU, say, it may be unable to keep up with the incoming work and its queues may grow. When the queues reach a certain threshold (currently 900 MB on storage, 1.6GB on logs), ratekeeper will slowly start to throttle transactions. If the queue continues to grow, the throttling becomes more aggressive, attempting to achieve certain limits (1GB on storage, 2GB on logs). It may be worth noting, though, that ratekeeper will let 1 fault domain’s worth of storage servers fall arbitrarily far behind without throttling, in which case the storage server will eventually stop trying to fetch new data when it reaches what we call the e-brake (1.5GB).

The reason you may want to monitor the queues is that it can provide you early warning that your workload is going to saturate the cluster. You may also be able to see that you are approaching the queue limits, even if they aren’t currently growing.

However, alerting based on the queues should be done with care. There are different reasons that queues can grow large, and you may not want to treat all of them the same. As illustrated above, a large queue could mean:

  1. The cluster isn’t keeping up with the workload
  2. The cluster is keeping up with the workload, but 5 seconds worth of mutations represents a sizable portion of the queue
  3. The cluster has had a failure somewhere and the queues are large while it heals

I tend to think of situation #1 as more serious than the other two (i.e. I may want to know immediately if #1 is happening, eventually if #2 is happening, and possibly not at all if #3 is happening).

Based on the current behavior of the system, I probably wouldn’t recommend aggressive alerting on the log queue size at anything less than the spill threshold (you could alert at 1.6GB, perhaps) because otherwise you will get alerted any time #3 happens. Alerting on storage queue sizes is less prone to that and probably more reasonable (I currently do it), though it’s not immune.

To keep an eye on #2, I currently don’t use queue sizes directly. Instead, I look at cluster.processes.<id>.roles.input_bytes.hz for the storage and log processes in status json, which measures the rate at which data is coming into the queue. (durable_bytes, by the way, measures data leaving the queue, and you can get the queue size for a process by computing input_bytes.counter - durable_bytes.counter.) From that input rate, I can estimate how big the queues should be.
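
For example, here’s a rough sketch of doing that computation from status json (not production code; it assumes fdbcli is on your PATH and pointed at the right cluster file, and it uses the throttling thresholds mentioned above):

import json
import subprocess

# Soft limits at which ratekeeper starts throttling, per the numbers above.
STORAGE_SOFT_LIMIT = 900e6   # bytes
LOG_SOFT_LIMIT = 1.6e9       # bytes

status = json.loads(subprocess.check_output(["fdbcli", "--exec", "status json"]))

for pid, proc in status["cluster"]["processes"].items():
    for role in proc.get("roles", []):
        if role.get("role") not in ("storage", "log"):
            continue
        # Queue size = bytes that entered the queue minus bytes made durable.
        queue_bytes = role["input_bytes"]["counter"] - role["durable_bytes"]["counter"]
        input_rate = role["input_bytes"]["hz"]   # bytes/sec entering the queue
        expected_bytes = input_rate * 7          # ~5s of MVCC data, 7-8s in practice
        limit = STORAGE_SOFT_LIMIT if role["role"] == "storage" else LOG_SOFT_LIMIT
        print(f"{pid} {role['role']}: queue={queue_bytes / 1e6:.0f}MB, "
              f"expected~{expected_bytes / 1e6:.0f}MB, throttle threshold={limit / 1e6:.0f}MB")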

For #1, in addition to watching the storage queue, you can also keep an eye on storage lag (in 5.2 it’s at cluster.processes.<id>.roles.data_version_lag for storage roles, in 6.0 you can use cluster.processes.<id>.roles.data_lag.seconds). This measures how far behind a storage server is from the logs, and if this gets and stays much larger than 5 seconds (or 5,000,000 versions for 5.2), then that could also indicate something’s going on. In practice, I actually do something a little more complicated than this: I track whether the worst lag gets very large (hundreds of seconds), and whether the [replication_factor]th worst lag, after subtracting failed processes, gets to around 15s or so (i.e. the 3rd worst for triple replication, but the 2nd worst if a process has failed).
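
A sketch of that kind of check (the thresholds and the way failed processes are detected here are illustrative assumptions, not exact values I’d insist on):

import json
import subprocess

REPLICATION_FACTOR = 3           # e.g. triple replication
WORST_LAG_ALERT_SECONDS = 300    # "hundreds of seconds"
NTH_WORST_ALERT_SECONDS = 15

status = json.loads(subprocess.check_output(["fdbcli", "--exec", "status json"]))

lags = []
failed = 0
for proc in status["cluster"]["processes"].values():
    for role in proc.get("roles", []):
        if role.get("role") == "storage":
            lag = role.get("data_lag", {}).get("seconds")  # 6.0-style field
            if lag is None:
                failed += 1      # rough heuristic: missing metric = failed/unreachable SS
            else:
                lags.append(lag)

lags.sort(reverse=True)          # lags[0] is the worst lag
n = max(REPLICATION_FACTOR - failed, 1)
if lags and lags[0] > WORST_LAG_ALERT_SECONDS:
    print(f"ALERT: worst storage lag is {lags[0]:.0f}s")
if len(lags) >= n and lags[n - 1] > NTH_WORST_ALERT_SECONDS:
    print(f"ALERT: {n}th-worst storage lag is {lags[n - 1]:.0f}s")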


Thank you for the excellent explanation. This will be very valuable in the fdb documentation.

A few general follow-up questions:

  • If all the mutations sent to storage servers are eventually persisted to disk anyway, why wait for 5 seconds before persisting them? I understand that they are required to be held in memory to support MVCC, but they could be held in memory independently of storing them on disk as they come in. Would this help in maintaining smaller queues (due to faster acknowledgement of mutations)? Are mutations held in memory for some other reason - like optimizing disk writes (e.g. collapsing of multiple versions/batching)?

  • Are all mutations written to disk without any coalescing of multiple versions for the same key? For example, in a hypothetical situation, if there are 1000s of mutations for the same key within a second, would it be useful to just wait a bit longer than 5s and write only the latest version from the batch of mutations that are older than 5 seconds?

Because all of a client’s reads in FoundationDB are versioned, requests come in to the storage server that are of the form “get me key x at version v”. We want to be able to serve versions for five seconds after they get committed so that any transaction that gets a fresh version can continue reading for five seconds before they start getting transaction_too_old errors.

The storage engine’s interface, however, only allows us to keep a single version on disk. (In other words, the storage engines are not themselves multi-version.) So we keep: (1) an in-memory data structure of all new mutations for the last five seconds, and (2) an on-disk data structure of the one older version. Then if we get a request for key x at version v, if v is older than our on-disk version, we know we can’t determine that value, so we return transaction_too_old. If it is exactly equal to the on-disk version, we go there. If it is newer than the version on disk but no newer than the newest version we’ve received and applied to our in-memory data structure, then we query the in-memory data structure at that version, and if we don’t find anything, then we go to the on-disk data structure. And if the version is newer than the newest version we’ve applied to our in-memory data structure, then we return future_version.
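
In rough pseudocode (an illustrative sketch only, not the actual storage server implementation):

TRANSACTION_TOO_OLD = "transaction_too_old"
FUTURE_VERSION = "future_version"

def read(key, v, disk_version, newest_mem_version, mem, disk):
    """mem: {key: [(version, value), ...]} sorted by version, covering ~5s of
    mutations; disk: {key: value} at exactly disk_version."""
    if v < disk_version:
        return TRANSACTION_TOO_OLD            # older than the single on-disk version
    if v == disk_version:
        return disk.get(key)                  # exactly the on-disk version
    if v <= newest_mem_version:
        # newest in-memory mutation for this key at a version <= v, if any
        hits = [val for (ver, val) in mem.get(key, []) if ver <= v]
        return hits[-1] if hits else disk.get(key)
    return FUTURE_VERSION                     # newer than anything we've applied

# Example: disk holds version 100; memory holds mutations up to version 150.
disk = {"x": "old"}
mem = {"x": [(120, "mid"), (140, "new")]}
print(read("x", 130, 100, 150, mem, disk))    # -> "mid"
print(read("x", 99, 100, 150, mem, disk))     # -> "transaction_too_old"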

Hopefully, it’s clear that if this is the read path, there isn’t a way of both (1) applying versions to disk earlier than 5 seconds after commit and (2) still allowing reads for at least 5 seconds.

We could change this if one or both of two changes were made. The first is that it could be fixed if we had a multi-version storage engine that persisted multiple versions to disk. (If there is a way to save versions, you could also imagine this allowing for “infinitely” long read transactions that read at a consistent version. This could be useful for certain analytical workloads.) But this is pretty complicated and would require a new version of the storage engine’s interface and plumbing that through to everything else.

Another approach would be to modify our storage servers to keep: (1) an in-memory “to-do” data structure of as-yet-unapplied mutations, (2) an on-disk data structure at some version, not necessarily 5 seconds in the past, and (3) an in-memory “old data” data structure holding the old on-disk values that recent commits displaced, with each value removed once the commit that pushed it out is 5 seconds old. (In other words, adding an “out-queue” in addition to an “in-queue”.) Then the read path is:

  1. For requests that are newer than the on-disk data structure’s version, query the “in” queue at that version and the on-disk data structure, and take the “in” queue value if present; otherwise, take the on-disk value.
  2. For requests that are older than the on-disk data structure’s version, query the “out” queue and the on-disk data structure. Take the “out” queue value if present; otherwise, take the on-disk value.

You have to be a little careful here. For example, your “out” queue has to include tombstones (or—the opposite of tombstones?) to signify that there were empty ranges or empty values that were then populated by later commits.
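
To make that concrete, here’s a toy sketch of the hypothetical in/out-queue read path (purely illustrative; the transaction_too_old/future_version bounds checks are omitted):

# in_q : {key: [(version, value), ...]}, sorted by version; mutations newer
#        than the on-disk version that haven't been applied to disk yet.
# out_q: {key: [(displaced_at_version, old_value), ...]}, sorted by version;
#        pre-images pushed off disk by newer writes in the last ~5 seconds.
#        TOMBSTONE means the key did not exist before that write.
TOMBSTONE = object()

def read(key, v, disk_version, in_q, out_q, disk):
    if v >= disk_version:
        # Newer than what's on disk: take the newest "in" queue mutation at or
        # below v, otherwise fall through to the on-disk value.
        hits = [val for (ver, val) in in_q.get(key, []) if ver <= v]
        return _value(hits[-1]) if hits else disk.get(key)
    # Older than what's on disk: take the pre-image that was still current at v.
    for displaced_at, old_value in out_q.get(key, []):
        if displaced_at > v:
            return _value(old_value)
    return disk.get(key)          # the key hasn't been rewritten since version v

def _value(val):
    return None if val is TOMBSTONE else val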

Note that the out-queue is, in some sense, not recoverable. In particular, if a storage server crashes and comes back, it can reconstitute its “in” queue by reading back a stream of mutations from the transaction logs. But the “out” queue can’t be rebuilt that way. The old values it would need to correctly populate that queue might not exist anywhere in the database. Now, this might actually not be that bad. In particular, it would mean that if a single storage server dies, some client requests will get transaction_too_old from that storage server, and then they’d probably want to try again from another member of that storage server’s team (i.e., another server that has the same data), and then they’d get the result they need. If there was something like a bounce of all storage servers, then you might have many clients who start getting transaction_too_old errors. In that case, the clients would then need to restart their transactions, but that’s not the end of the world, particularly given how rare failures of that kind are (or should be).

The question is then if it’s worth it. You’d get to decrease the storage server queue size…kind of. You’d probably end up keeping around about the same amount of stuff in memory on storage servers, but it would be split between the “in” and “out” queues. In theory, you could say that in high throughput situations, you’d want to remove stuff from the “out” queue more aggressively (saying that keeping up with the new is more important than keeping around the old…maybe). In return, you could pop stuff from the log queue more aggressively, which might be good for some workloads. (Spilling the log queue to disk is also fairly expensive, so maybe keeping the log queues down should be considered more important.)

One other thing I’ve omitted from this analysis: to hide latency, we will actually send possibly uncommitted mutations to the storage servers so that once the user hears that a commit has happened, that commit appears to be already available from the storage servers. We’d probably want to hold off on moving any such version from the “in” queue to on-disk until such time as we know that it’s been committed for sure, i.e., that all of the transaction logs have it. This is to avoid needing to “roll back” those transactions later.

Not to my knowledge. In particular, if there are different values for this hot key at different versions, then you need to keep around version history for all of those mutations or you won’t return the “right” answer. If you also want to keep up writing values to disk, then I don’t think you can quite batch them like that.

However, we do batch transactions together so that there is more than one transaction per commit batch. I believe that those could be batched together, so if there are 10,000 mutations per second on a key and 500 batches per second, then you’ll have roughly 20 mutations per batch that can be coalesced together (which I think we do, but I’m not certain).


Thank you, Alec.

A few more queries to help me better understand your explanations:
(these are not actual proposals for improvements, but merely hypothetical questions to better understand the inner working details and design rationales)

Then if we get a request for key x at version v, if v is older than our on-disk version, we know we can’t determine that value, so we return transaction_too_old. If it is exactly equal to the on-disk version, we go there. If it is newer than the version on disk but no newer than the newest version we’ve received and applied to our in-memory data structure, then we query the in-memory data structure at that version, and if we don’t find anything, then we go to the on-disk data structure. And if the version is newer than the newest version we’ve applied to our in-memory data structure, then we return future_version.

Consider following approach:

(for conciseness, I am using <, >, <=, >= operators and assuming that older version numbers are smaller than newer version numbers)

At write time, the in-memory data structure keeps mutations for the last 5 seconds, and, in addition, the disk storage is updated to the latest mutation version as mutations are received by the storage server (disk storage is not assumed to be MVCC; it just overwrites the same key with the newer versioned value, as is the behavior presently).

A get request arrives for x-at-v

  • if v < min(disk_version, oldest_mem_version) => transaction_too_old
  • else if v >= disk_version && v < oldest_mem_version => serve disk version
  • else if v <= newest_mem_version => serve from memory data structure with the appropriate version based on v’s position in the version chain.
  • else => future_version

Is this approach equivalent to what you described as the current read path? There could be a few corner cases in the above logic, but I am not certain what the fundamental reason is to push mutations to disk only once they are older than the 5s limit.

In particular, if there are different values for this hot key at different versions, then you need to keep around version history for all of those mutations or you won’t return the “right” answer. If you also want to keep up writing values to disk, then I don’t think you can quite batch them like that.

What I was suggesting is this:

  • keep all versions up to 5s in memory (to be able to return the “right” version);
  • keep, say, 1s more of versions in memory and use these to answer get requests just as we use the 5s of versions;
  • take all the versions from the 6th second (the 1 extra second that we kept), and store to disk only the newest version per key from that set of mutations.

My reasoning was that, if there are multiple versions for the same key in this 1s batch of mutations that we are moving out of memory, and because the disk store contains only a single version per key, could the number of updates to the disk version be reduced by writing only the latest version from the expired batch of mutations (without changing any higher-level contracts or guarantees)?

I realize that there may be something more fundamental that I’m missing, but these conversations are very helpful for us in getting a deeper understanding of the workings of FDB :slight_smile:

With respect to this discussion of (monitoring) queue sizes, it’s important to realize that FoundationDB’s ability to queue writes is nearly always good for performance. It means that, in mixed read/write workloads, when a short burst of requests comes in faster than the database can sustain in the steady state, often FoundationDB can respond without much latency impact (by prioritizing reads and deferring writes). The reason you want to monitor the queues is because by definition no system can exceed its maximum steady state throughput forever, and a growing queue indicates that you are trying to do this. By monitoring and reacting to growing queues, you have a little time to try to correct an operational situation in which most databases would already have fallen over.

At write time, the in-memory data structure keeps mutations for the last 5 seconds, and, in addition, the disk storage is updated to the latest mutation version as mutations are received by the storage server (disk storage is not assumed to be MVCC; it just overwrites the same key with the newer versioned value, as is the behavior presently).

This won’t work. Consider the simplest situation in which MVCC is needed: a single write key:=new_value comes in at version 1000. You immediately commit this write to disk (overwriting whatever was there before) and also keep track of the write in memory. Then a read comes in at version 999. You need to respond with the value that key had at version 999 (before the write), but you don’t have this information anywhere (you overwrote it on disk, and it was not part of the write you have logged in memory). Thus, as Alec already explained, you would need to keep an additional “undo log” of data that has been overwritten by recent writes (not just the writes themselves). Because FoundationDB supports range clears, this log might be arbitrarily larger than the writes themselves. There are a number of other reasons that an undo log is probably not the best design (though with suitable engineering I think it would work).

take all the versions from the 6th second (the 1 extra second that we kept), and store to disk only the newest version per key from that set of mutations.

This on the other hand is a potentially reasonable idea, and an optimization that I have wanted to give a try at some point. In particular, we already have a data structure that can reasonably efficiently scan (a superset of) all the differences between the data stored on disk and a given version, and so we could try to update the storage based on that (filtering out changes whose versions indicate they are already committed) instead of replaying the mutation log in order. In addition to “coalescing” repeated writes better than the storage engine might be able to manage on its own, it might improve performance to do a block of writes in key order instead of in version order.
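
As a toy illustration of the coalescing part (not the actual storage server logic): given the batch of mutations that has aged out of the MVCC window, only the newest value per key needs to be written, and the writes can be issued in key order:

def coalesce(expired_mutations):
    """expired_mutations: iterable of (version, key, value) tuples that have
    aged out of the MVCC window; returns [(key, value)] sorted by key, keeping
    only the newest version per key."""
    newest = {}
    for version, key, value in expired_mutations:
        prev = newest.get(key)
        if prev is None or version > prev[0]:
            newest[key] = (version, value)
    return sorted((key, value) for key, (version, value) in newest.items())

batch = [(101, "b", "1"), (102, "a", "x"), (103, "b", "2"), (104, "b", "3")]
print(coalesce(batch))   # -> [('a', 'x'), ('b', '3')]: one disk write per key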

However, an attempt to implement this has to be careful about performance edge cases - it’s not enough to get good performance in the average case; it’s also important not to have any situation in which the amount of work created by a workload is O(N^2). The recently reported (performance) bug in AsyncFileCached is a good example of this sort of mistake, which performance testing can’t reliably find.


Thanks, this is very helpful!


Hi AJ,

I have a few more questions as I dig deeper into storage server stats:

"data_lag" : {
    "seconds" : 0.0011839999999999999,
    "versions" : 1184
},
"data_version" : 74600473696,

"durability_lag" : {
    "seconds" : 5.4489599999999996,
    "versions" : 5448963
},
"durable_version" : 74595024733,

Could you please explain the semantics of above counters?

  • data_lag.seconds: I believe this represents how far the SS’s latest version is behind the TLog committed versions. How is this calculated? Is it based on the difference between the TLog and SS latest versions divided by 1e6 (the rate of version generation)? Or is it computed by taking the difference between the wall clock times of the TLog and SS latest version commits?

  • data_lag.versions: ?

  • durability_lag.seconds: Does this represent the number of seconds worth of transactions kept in MVCC tree of SS?

  • durability_lag.versions: ?

  • data_version: Is this the latest version number that has been fetched by SS from TLog?

  • durable_version: Is this the latest version number that has been committed to disk by SS?

In one of the lightning talks on hotspot detection, by Ashish from Snowflake, he mentioned the term NonDurableVersions. Is this calculated as (data_version - durable_version)?

He also mentioned that on a read-hotspot SS, NonDurableVersions would be high but the storage queue (bytes) would be small. Could you please explain a bit more about why that is?

–thanks, gaurav

Regarding data_lag.seconds: it compares the versions of the storage server and the transaction log and then divides by the rate at which versions advance (1 million per second unless you’ve changed something).

data_lag.versions is the same as above but without converting to seconds. Technically this is a more accurate interpretation because the version advances by a lot during a recovery, in which case the conversion to seconds isn’t really correct. On the other hand, I’ve found it’s been more natural for people to think of this lag in terms of time rather than versions, and so I included both. It is of course possible to do the conversion yourself, but given that the version rate isn’t strictly fixed (it can be changed via knob), I thought it seemed useful to do the conversion using the actual knob value.
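
In code form, assuming the default rate of 1,000,000 versions per second:

VERSIONS_PER_SECOND = 1_000_000   # the default; configurable via knob

def lag_seconds(log_version, storage_data_version):
    return (log_version - storage_data_version) / VERSIONS_PER_SECOND

# Matches the example status above: a lag of 1184 versions is ~0.001184s.
print(lag_seconds(74600473696 + 1184, 74600473696))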

For durability_lag.seconds: yeah, this measures the difference between data that’s just arriving at the storage server and data that has been committed to the storage engine. Typically this should hover somewhere just above 5 seconds, but in some cases when the storage server is struggling to keep up you’ll see this lag grow. This may also be reflected in the queue sizes of the storage server and transaction logs, because their sizes include all the data in memory that hasn’t been made durable to the storage disks.

durability_lag.versions is analogous to the data_lag case: it’s the same lag expressed in versions rather than seconds.

Your interpretations of data_version and durable_version are correct.

As for NonDurableVersions, that was my interpretation as well (data_version - durable_version), although it’s possible they chose some other way to calculate it.

As for why NonDurableVersions would be high while the storage queue stays small: it won’t always behave this way with a read hotspot, but if you do see this occur then a read hotspot is a likely explanation (it could also be a result of misbehaving hardware, etc.). The reason for this behavior is that if a storage server is being saturated with reads, the job to make data durable to disk may get starved. When that happens, you’ll see the durable version start to lag, so you’ll see non-durable versions increase.

All of the data not being made durable still resides in the storage queue (and some transaction log queues) while this happens. If the amount of data being written is relatively small, the effect of this will not be significant. If you are doing a lot of writes, however, then the queue sizes can quickly grow to the point that ratekeeper starts throttling transactions.

In the case that you are instead saturating a process with writes, you should see those writes building up in the queue.
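
A rough sketch of spotting that pattern from status json (the thresholds here are arbitrary examples, not recommendations):

import json
import subprocess

status = json.loads(subprocess.check_output(["fdbcli", "--exec", "status json"]))

for pid, proc in status["cluster"]["processes"].items():
    for role in proc.get("roles", []):
        if role.get("role") != "storage":
            continue
        non_durable_versions = role["data_version"] - role["durable_version"]
        queue_bytes = role["input_bytes"]["counter"] - role["durable_bytes"]["counter"]
        if non_durable_versions > 20_000_000 and queue_bytes < 100e6:
            # ~20s of versions not yet durable, but only a small queue:
            # possibly a read hotspot (or misbehaving hardware) on this SS.
            print(f"{pid}: {non_durable_versions} non-durable versions, "
                  f"queue only {queue_bytes / 1e6:.0f}MB")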

I’ll also mention that I believe Ashish’s talk was based on an older version of FoundationDB (3.0), and some of the details may differ from the current versions. I don’t remember well enough what’s changed between then and now to say for certain.

Thanks for the great explanations! This is very, very helpful.

While this suggestion has worked for me in most instances, I am not sure whether these counters retain their values when the tLog process/role restarts during recovery, or during a spill. What happens if the tLog had a large in-memory queue (of mutations pending to be fetched by the SS) and the tLog got restarted, or if the queue is spilled to disk; will the counters input_bytes.counter and durable_bytes.counter be reset to 0?

I have another related question: What does cluster.qos.worst_queue_bytes_log_server indicate? Is it the max value for input_bytes.counter - durable_bytes.counter across all tLog roles? Does it consider on-disk tLog queues that might not have been fully applied on SS?

I have a status.json where, for all tLogs (two tLogs total), the individual input_bytes.counter and durable_bytes.counter are quite small, but cluster.qos.worst_queue_bytes_log_server is much larger. I am unable to find metrics in the tLog status that add up to this value. (Note that there is a large data_lag on one of the storage servers.)

Full Status JSON

"qos" : 
{
    "limiting_queue_bytes_storage_server" : 957969,
    "limiting_version_lag_storage_server" : 19525633954,
    "performance_limited_by" : {
        "description" : "The database is not being saturated by the workload.",
        "name" : "workload",
        "reason_id" : 2
    },
    "released_transactions_per_second" : 2513,
    "transactions_per_second_limit" : 4900440,
    "worst_queue_bytes_log_server" : 1258772141,
    "worst_queue_bytes_storage_server" : 1500788142,
    "worst_version_lag_storage_server" : 19525633954
}

tLog1

{
    "data_version" : 116847077523,
    "durable_bytes" : {
        "counter" : 352223,
        "hz" : 7673.1400000000003,
        "roughness" : 7724.7200000000003
    },
    "id" : "daa603da6907a8c9",
    "input_bytes" : {
        "counter" : 3762067,
        "hz" : 37439.400000000001,
        "roughness" : 49909.300000000003
    },
    "kvstore_available_bytes" : 611044757504,
    "kvstore_free_bytes" : 609033433088,
    "kvstore_total_bytes" : 914706128896,
    "kvstore_used_bytes" : 3083722752,
    "queue_disk_available_bytes" : 609033433088,
    "queue_disk_free_bytes" : 609033433088,
    "queue_disk_total_bytes" : 914706128896,
    "queue_disk_used_bytes" : 6104289280,
    "role" : "log"
}

tLog 2

{
    "data_version" : 116841999348,
    "durable_bytes" : {
        "counter" : 313854,
        "hz" : 31109.900000000001,
        "roughness" : 44553.699999999997
    },
    "id" : "95458676b4e9e98d",
    "input_bytes" : {
        "counter" : 3574854,
        "hz" : 131469,
        "roughness" : 30715.400000000001
    },
    "kvstore_available_bytes" : 636995022848,
    "kvstore_free_bytes" : 636995022848,
    "kvstore_total_bytes" : 914706128896,
    "kvstore_used_bytes" : 86264,
    "queue_disk_available_bytes" : 636995022848,
    "queue_disk_free_bytes" : 636995022848,
    "queue_disk_total_bytes" : 914706128896,
    "queue_disk_used_bytes" : 1334104064,
    "role" : "log"
}

Storage Server with large Data Lag

{
    "bytes_queried" : {
        "counter" : 0,
        "hz" : 0,
        "roughness" : 0
    },
    "data_lag" : {
        "seconds" : 19522.5,
        "versions" : 19522523338
    },
    "data_version" : 97319554185,
    "durability_lag" : {
        "seconds" : 547.94399999999996,
        "versions" : 547943553
    },
    "durable_bytes" : {
        "counter" : 3471849518,
        "hz" : 0,
        "roughness" : 0
    },
    "durable_version" : 96771610632,
    "finished_queries" : {
        "counter" : 2249560,
        "hz" : 35.999699999999997,
        "roughness" : 10.26
    },
    "id" : "27e2fe761833b0ac",
    "input_bytes" : {
        "counter" : 4972637660,
        "hz" : 0,
        "roughness" : 0
    },
    "keys_queried" : {
        "counter" : 0,
        "hz" : 0,
        "roughness" : 0
    },
    "kvstore_available_bytes" : 636994076672,
    "kvstore_free_bytes" : 636994076672,
    "kvstore_total_bytes" : 914706128896,
    "kvstore_used_bytes" : 35833831424,
    "mutation_bytes" : {
        "counter" : 357157824,
        "hz" : 0,
        "roughness" : 0
    },
    "mutations" : {
        "counter" : 4138520,
        "hz" : 0,
        "roughness" : 0
    },
    "query_queue_max" : 21,
    "role" : "storage",
    "stored_bytes" : 30084768842,
    "total_queries" : {
        "counter" : 2249560,
        "hz" : 35.999699999999997,
        "roughness" : 10.26
    }
}

My guess is that what you’re seeing is probably the result of the processes in status reporting only the current generation of logs, while the worst queue bytes also accounts for old generations of logs. Old generations exist for some time after a recovery in order for the data on them to be copied, and it may be that in your case the old logs have large queues.

I think it’s a problem with status that it omits this information about the old logs, so there should probably be some change there to make status better.

As to your other questions, the queue represents data that is being stored in memory on the log (it’s also stored on disk). Restarting the process loses the data in memory, so the input and durable bytes reset. I think recovering the data from disk can populate the queue again, though.

Spilling results in data being erased from memory, so it increases durable bytes and reduces the queue size.

I’m not certain what happens to the counters on the old generation of logs during a recovery, but as mentioned above you don’t actually get these counters in status currently.
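
For what it’s worth, the per-process computation being discussed looks roughly like this (a sketch; as noted, it only covers the current generation of logs, so it can read much lower than cluster.qos.worst_queue_bytes_log_server right after a recovery):

import json
import subprocess

status = json.loads(subprocess.check_output(["fdbcli", "--exec", "status json"]))

log_queues = {}
for pid, proc in status["cluster"]["processes"].items():
    for role in proc.get("roles", []):
        if role.get("role") == "log":
            # In-memory queue of this log: bytes in minus bytes made durable.
            log_queues[pid] = (role["input_bytes"]["counter"]
                               - role["durable_bytes"]["counter"])

print("per-log queue bytes:", log_queues)
print("worst (current generation only):", max(log_queues.values(), default=0))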

Thank you for clarifying that!

I have a similar question, but about the json output. What is the difference between “available” and “healthy” here?

 fdbcli --exec 'status json' | jq .client.database_status
{
  "available": true,
  "healthy": false
}

cluster.data.state shows this:

{
  "description": "Repartitioning",
  "healthy": true,
  "min_replicas_remaining": 2,
  "name": "healthy_repartitioning"
}

What is the version of the storage server? Is it the last version of TLog mutations applied to the storage server?

An old transcription of the dashboard and alerts to text by @ajbeamon was rediscovered, and converted to a documentation page. It has been posted at [doc] Add "Monitored Metrics" page to the docs. by sfc-gh-almiller · Pull Request #6594 · apple/foundationdb · GitHub, which also contains a screenshot of the rendered page for easy viewing until the page goes live as part of a future release.


Thanks for all the information listed above :pray:

We are currently parsing fdbcli to generate metrics in order to properly benchmark our first cluster, but I was wondering if the events contain metrics not available through fdbcli? My only insights are from wavefront-fdb-tailer or observability_splunk_dashboard.

Which events should I be watching closely, besides the ones with severity over 40?
