How to troubleshoot throughput performance degradation?

My setup is 5 * i3.8xlarge instances running a triple-redundancy FoundationDB cluster, with 32 fdbserver processes on each node. In total there are 10 logs, 3 resolvers, and 10 proxies. We have 3 * c5.2x instances as API servers. My client program’s traffic is roughly PUT:GET ~= 40:1, plus some expiration logic; keys are 50 bytes and values are 7 KB.

During performance testing, the cluster initially supported > 30K/s PUTs for hours: all queue sizes stayed low the whole time and there was no durability lag. But later the throughput dropped below 6K/s for unknown reasons. After the degradation, even with < 6K/s of traffic, both the log queue size and the storage queue size reach their thresholds within a few minutes. Even if I pause the client program and let the cluster catch up, the queues fill up again within a few minutes of resuming. Meanwhile, none of the log/storage processes are maxing out CPU.


I think there might be useful info in those .xml trace logs, but I’m clueless about what kind of <Event> to look at.

Also, out of > 100 storage processes, there are 2 storage processes with durability_lag > 500s, while for the others it’s < 100s (also bad, I know). How do I get the workload to distribute evenly across all storage processes?

My guess is that the background data movement among storage servers causes the throughput degradation.

It is likely that the 2 storage processes with high durability_lag hold too much data and are busy moving data to other storage processes. You can check the trace events "BgDDMountainChopper" and "BgDDValleyFiller" for evidence:

TraceEvent(priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
    .detail("SourceBytes", sourceBytes)
    .detail("DestBytes", destBytes)
    .detail("ShardBytes", metrics.bytes)
    .detail("SourceTeam", sourceTeam->getDesc())
    .detail("DestTeam", destTeam->getDesc());

The problem can be caused by the client’s write pattern. For example, if a client writes data in increasing key order, it keeps putting pressure on one set (i.e., team) of storage servers.

To avoid this, the client should write data to different key spaces concurrently.
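As a rough sketch of what "write to different key spaces concurrently" can look like on the client side (the data/ prefix, the 2-byte bucket, and the hash choice here are all made up for illustration), a client can scatter otherwise-sequential keys across the key space like this:

    import fdb, hashlib

    fdb.api_version(610)  # match this to your installed client/server version
    db = fdb.open()

    @fdb.transactional
    def put(tr, key, value):
        # Derive a stable 2-byte bucket from the key so writes land on many
        # shards (and therefore many storage teams) instead of always hitting
        # the shard at the tail of a single increasing range.
        bucket = hashlib.sha256(key).digest()[:2]
        tr[b'data/' + bucket + b'/' + key] = value

    put(db, b'event-000001', b'payload')

The trade-off is that ordered range scans over the original keys are lost; reads have to recompute the same bucket to find a key.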


Hi @mengxu, thanks for your reply. I looked at the trace events for that lagging process; it doesn’t have any "BgDDMountainChopper" or "BgDDValleyFiller" events at all. If I filter on Severity="20", these are all the event types present: FetchPast, N2_ReadError, N2_ReadProbeError, ShardServerFutureVersion1000x, SlowSSLoopx100, StorageServerUpdateLag.

To provide more context, we implemented the expiry logic mainly based on this approach, but with 4 different kinds of auxiliary keys. During performance testing, even though all keys generated on the client side are totally random, on the API side we add 4 static prefixes. Could that be the reason the workload tends to fall on a few storage processes, because all keys share only a few common prefixes? (Is there a trace event to see which (key, value) gets routed to which process?)

iostat output (we combine 4 physical SSDs with RAID 0 and mount the array as md0):

ubuntu@dev-ray-trb-fdb087.us-e:/dev$ iostat --human
Linux 4.15.0-1021-aws (dev-ray-trb-fdb087) 	06/11/19 	_x86_64_	(32 CPU)

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          24.9%    0.0%   14.6%    0.1%    0.0%   60.3%

Device             tps    kB_read/s    kB_wrtn/s    kB_read    kB_wrtn
loop0             0.02         0.0k         0.0k      10.5M       0.0k
loop1             0.01         0.0k         0.0k       3.5M       0.0k
loop2             0.03         0.0k         0.0k      12.3M       0.0k
loop3             0.01         0.0k         0.0k       3.7M       0.0k
xvda              0.57         1.2k        12.4k     457.9M       4.8G
nvme0n1       33641.35        54.4M       161.2M      21.0T      62.3T
nvme1n1       33486.13        54.2M       156.9M      21.0T      60.7T
nvme2n1       33335.21        54.1M       150.0M      20.9T      58.0T
nvme3n1       33238.43        54.0M       150.0M      20.9T      58.0T
md0           210527.52       216.7M       619.0M      83.8T     239.4T

If the above is true, do you have other suggestions for implementing expiry logic in FoundationDB?

From the information you’ve provided so far, it may be the case that you are saturating the disks on your storage servers. Do you have any external evidence that could support or contradict that idea?

Based on the numbers you’ve provided (30K/s PUTs of size 7050 bytes, triple redundancy), you would be writing something like 211 MB/s logical and at least 633 MB/s physical. It sounds like you have all 32 processes on a host sharing the same striped volume, in which case there may be some inefficiencies being introduced there. For example, we recommend that the logs do not share disks with the storage servers, as they have rather different write patterns with the logs fsyncing frequently. Also, with only 20 disks, the 211 MB/s logical write rate seems higher than I would have initially thought to be sustainable.
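For reference, here is roughly how those numbers fall out of the workload described above (a back-of-the-envelope sketch only; it ignores the transaction log writes, B-tree write amplification, and any RAID overhead):

    puts_per_sec = 30_000
    record_bytes = 50 + 7_000        # ~50-byte key + ~7 KB value
    replication  = 3

    logical_mb_s  = puts_per_sec * record_bytes / 1e6   # ~211 MB/s into the cluster
    physical_mb_s = logical_mb_s * replication          # ~634 MB/s onto storage disks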

Besides trying to rearrange things a little more efficiently (e.g. by separating the logs, as described above), I think the only real recourse if you are disk bound is to either reduce your write rate or add more disks.

As a side note, you mentioned that you were able to achieve a high rate for a while but eventually slowed down. We’ve seen similar behavior from SSDs before where the rates may decrease after a long time running or as they get fuller. How full are your disks now?

Another side note – it looks like the i3.8xlarge instances have 32 vCPUs, or 16 physical CPUs. Although it sounds like you aren’t currently CPU bound, we do also recommend that each process in a cluster gets a physical core (or at least something relatively close to that). If you try to run 1 process per logical core, you may find as the cluster gets a little busier CPU-wise (say around 50% on average) that processes start getting starved, and the stability of the cluster can be greatly affected. Depending on the severity of the starvation, this is a situation that may not be handled particularly gracefully by the cluster.


StorageServerUpdateLag indicates that the storage server is in e-brake, as shown in the comment above the trace event in the source code:

// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
// This is often referred to as the storage server e-brake (emergency brake)

Since you are running on EBS, is it possible that your VM runs out of IOPS?

@markus.pilman may have more insights on this topic.

It does look a bit like disk saturation to me. Or in general it looks like the storage nodes are unable to write to disk quickly enough (could also be a priority issue - reads are blocking writes - which is something we see quite often and which is why we are working on the local ratekeeper).

But I don’t think @rayjcwu is running on EBS. I don’t know what the max throughput of the ephemeral disks on these machines is (but the i3s have very, very fast disks).


You definitely need more than 4 prefixes. With only 4 prefixes and triple replication, only (4 prefixes x 3 replicas =) 12 storage processes (about ~10% of the processes in your case) are being written to at any given time. This means one will fall behind, the shard gets split because it is too hot, the next one falls behind, and so on. You also don’t get a high enough queue depth to saturate the devices.

I would also try out the memory engine if you are going to be rapidly deleting data after you receive it.

I’d like to make sure we’re all on the same page about what a prefix is. Currently we are running tests with keys of the form kXX.... and sYY.... where X is a random ASCII letter and Y is a random byte. Is this only two prefixes? How many bytes does each storage process use for sharding?

Thanks for the reply. We currently create a RAID 0 array over all the SSDs and run FoundationDB on it. We are in the process of changing the setup so that each log/storage process runs on its own disk and its own physical CPU.

the 211 MB/s logical write rate seems higher than I would have initially thought to be sustainable.

I’m actually not sure how to estimate the disks requirement to sustain our traffic.

I tried something similar to this approach. In single redundancy mode, with 1 log and 19 storage processes, the log process has a durable_bytes.hz of around 75 MB/s. With 1 storage and 19 log processes, the storage process has a durable_bytes.hz of around 15 MB/s. (That post measured mutations, but I don’t know how to use that number.) So in our case, the ratio of log:storage processes should be about 1:5.

To sustain 633 MB/s of physical traffic, I’ll need around 10 log processes and 50 storage processes, each with their own disk, which means a cluster of 60 disks for triple redundancy mode.

Does the above estimation look correct?
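Spelling that estimate out (taking the single-process durable rates measured above at face value and ignoring any non-linear scaling):

    physical_mb_s      = 633   # replicated write rate from earlier in the thread
    log_durable_mb_s   = 75    # measured durable_bytes.hz for one log process
    store_durable_mb_s = 15    # measured durable_bytes.hz for one storage process

    logs_needed    = physical_mb_s / log_durable_mb_s    # ~8.4 -> round up to 10
    storage_needed = physical_mb_s / store_durable_mb_s  # ~42  -> round up to 50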

However, if I use dd on a single disk with nothing else running, the write rate can be 300+ MB/s (depending on the block size). Does that mean I am able to run more than one log/storage process per disk?

StorageServerUpdateLag indicates that storage server is in e-brake,

Thank you. Searching the GitHub repo, that is indeed the only e-brake event.

If the end of your key is a monotonically increasing value, and the beginning of your key only has 4 different values, your writes are all going to 4 shards. Over time those shards will be split into more shards as they grow in size and write bandwidth, but the old shards will never receive any more writes, so you’re back at only 4 shards being written to.

It doesn’t matter if the prefix is 1 byte or 100 bytes.

You should definitely run more than one storage process per disk. See my explanation here: Storage queue limiting performance when initially loading data

The “end” of the key is not a monotonically increasing value; they are random apart from the 1-byte prefix. I’m not sure how much of the key is the “beginning”. Let me detail the exact format of the primary keys.

  1. ‘k’ key: 'k' + key (N random bytes in [a-zA-Z])
  2. ‘s’ key: 's' + 2 random bytes (chosen by hash of key) + 4 bytes of a uint32 representing a timestamp + key
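For concreteness, here is a sketch of how we assemble those two keys (the hash function and the big-endian packing are incidental choices for the example; only the overall layout matters):

    import hashlib, struct

    def make_keys(user_key: bytes, expiry_ts: int):
        # 'k' key: 'k' + the caller's key (N random [a-zA-Z] bytes in our tests)
        k_key = b'k' + user_key
        # 's' key: 's' + 2 bytes derived from a hash of the key
        #          + 4-byte uint32 timestamp + the key itself
        bucket = hashlib.sha256(user_key).digest()[:2]
        s_key = b's' + bucket + struct.pack('>I', expiry_ts) + user_key
        return k_key, s_key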

If you have two bytes before the timestamp, and those two bytes are sufficiently distributed, that does rule out the number of prefixes being the problem. I had imagined a key like this:

events/1
events/2
events/3

etc.

Someone actually did my recommended benchmarking :open_mouth:

If you are not using enterprise-grade SSDs, I think you’d actually be much better off not raid 0’ing your SSDs, and just running one (or more) fdbservers per disk, rather than many fdbservers for one raid0-ed disk. When FDB calls fsync on a raid0 drive, it’d have to fsync on all of the SSDs, so you’d potentially be having every drive do O(fdbservers) fsyncs, versus O(fdbservers/disks) fsyncs if you don’t raid them. More fsyncs typically means less drive lifetime. The enterprise-grade caveat is that the better classes of SSDs just include a large enough capacitor that they can flush out all the data if power is lost, so that they can turn fsync into a no-op, and thus it doesn’t matter anyway.

I later re-read and saw you’re running on AWS, so the disk lifetime will be of less concern to you, but I’m still not sure that you’ll get better behavior with raid-ing. Either way, I’ll leave the above in case anyone ever reads this with physical disks in mind.

Be careful about sharing disks between transaction logs, because they care just as much about bandwidth as they do fsync latency, and having transaction logs compete for fsyncs is much more dangerous for overall write throughput than storage servers competing for fsyncs.

Your math seems about right, except that the scaling probably isn’t linear. I’ve gone and dusted off my benchmarking results database, and I had run write bandwidth tests with single replication and got:

1 proxy 1 tlog = 50MB/s
3 proxy 3 tlog = 120MB/s
5 proxy 5 tlog = 170MB/s
7 proxy 7 tlog = 200MB/s

For a workload that I don’t remember, but was probably 100B values.

Numbers irrelevant, but it comes out to a quadratic regression of $-2.5x^2 + 45x + 7.5$ being a pretty good fit. Your baseline is 1.5x better than mine, which… hrm. ~10 processes hits the peak of the quadratic at around 300 MB/s. So… it actually turns out I don’t have advice for you here, because I don’t have data lying around past what you can do with ~12 transaction class processes.
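For anyone who wants to reproduce that fit, a quick sketch (numpy's least-squares fit recovers those coefficients from the four points above; the raw curve peaks at roughly 210 MB/s around 9 logs, and scaling by the 1.5x baseline ratio is presumably where the ~300 MB/s figure comes from):

    import numpy as np

    logs = np.array([1, 3, 5, 7])
    mb_s = np.array([50, 120, 170, 200])

    a, b, c = np.polyfit(logs, mb_s, 2)       # ~(-2.5, 45, 7.5)
    peak_x = -b / (2 * a)                     # ~9 log processes
    peak_y = np.polyval([a, b, c], peak_x)    # ~210 MB/s; x1.5 baseline -> ~315 MB/s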

My guess is that you’ll probably fall somewhere in 10-15 logs to make it work, an equal number of proxies, and then a few more to give yourself a bit of headroom. I am concerned though that you’re pushing up against the limits of what FDB can do for write throughput right now, so if your use case is going to grow over time, you might be boxing yourself in here. FDB scales reads much better than writes, so a 40:1 ratio favoring writes is going in the wrong way for us.

FWIW, FDB6.2 will make a couple things default that resulted in a ~25% write bandwidth improvement for my tests. You can get them now if you fdbcli> configure log_version:=3 log_spill:=2. But a 25% change isn’t really going to make or break your use case here…

My guess is that you’ll probably fall somewhere in 10-15 logs to make it work

I reran the load testing with 10 * i3.16xlarge instances (80 disks total), varying from 10 log and 70 storage processes to 15 log and 65 storage processes. On each disk there is only one log or storage process, plus an extra stateless process. In every configuration I run the same number of proxy processes as log processes, and about half that number of resolver processes.

All configurations have similar results. The highest throughput I could reach is around 13K/s, and there are always 2 or 3 storage queues much higher than the others. Usually one storage queue is around 900 MB and one is at 1.5 GB, while the log queues are all at the 1.5 GB threshold. Even if I add more disks, reaching 13 log and 107 storage processes, the behavior is the same. From @mengxu’s post, I think the bottleneck is the storage processes hitting the e-brake.


@ryanworl’s response makes me wonder if the issue comes from our data modeling. We model our key and value with expiry by introducing 4 kinds of auxiliary keys. The majority of them are ‘s’ and ‘k’ keys, since each of our logical keys has these two FoundationDB keys:

‘k’ key: 'k' + key (N random bytes in [a-zA-Z])
‘s’ key: 's' + 2 random bytes (chosen by hash of key) 
                + 4 bytes of a uint32 representing a timestamp + key

Is it because most of the FoundationDB keys start with ‘k’ and ‘s’, so most of the traffic falls on 2 storage processes?

I am concerned though that you’re pushing up against the limits of what FDB can do for write throughput right now

Is this write throughput limit documented somewhere? Our traffic is around the 20K/s level and we expect it to grow over time.

Nope. The key space will be split into shards (key ranges) almost evenly based on the shards’ data size. Eventually, data distribution will try to make sure each storage server team has a similar amount of data, but there is a delay. During peak workload, data distribution does not have enough time and computational resources to move shards; that’s why a hot spot cannot be quickly mitigated.

One thing I find interesting/weird is that under triple replication, when a hot spot happens on a shard, it typically makes 3 storage servers very busy (i.e., high storage queues). In your figure, only 2 storage servers are busy, which makes me curious what happened to the 3rd storage server for that shard.

Oh yeah, it’s also quite common for 3 queues to be very busy, but usually the third one is less than 900 MB. It also reminds me that when there are a few huge storage queues, I sometimes see an alert message similar to “system is behind due to IP:port is behind” in fdbcli --exec status, and sometimes I don’t.

I can pause my client and the cluster catches up on the huge storage queues in less than 1 minute. But when I resume my client, the queues grow back to the threshold within a few minutes. Does that mean the traffic I’m generating is simply too high? Is there another way to raise the storage processes’ priority on moving data?

One way to see the relative load on each of the storage servers is to look at the quantity of writes and reads to each. If you are generating your graphs using the status json output, this can be done by getting cluster.processes.<id>.roles.input_bytes (related to bytes written), cluster.processes.<id>.roles.total_queries (number of reads), and cluster.processes.<id>.roles.bytes_queried (bytes read). There are other metrics there too that could provide more information on other dimensions (such as number of mutations, keys read, etc.).

If you are generating your graphs from the trace logs rather than status, you can also get this information from the StorageMetrics event. In that case, you’d get the data from the BytesInput, QueryQueue, and BytesQueried fields, respectively.

If you see that one of these metrics is notably higher on the processes where your queues get larger, it could give you an idea where to focus your efforts. If none of these metrics look any different on the processes where your queues get large, that would be interesting too.
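If it helps, here is a small sketch of pulling those per-storage-role numbers out of status json with the Python bindings (field names as described above; the exact JSON layout can vary between FDB versions):

    import fdb, json

    fdb.api_version(610)
    db = fdb.open()

    status = json.loads(db[b'\xff\xff/status/json'])
    for pid, proc in status['cluster']['processes'].items():
        for role in proc.get('roles', []):
            if role.get('role') != 'storage':
                continue
            print(pid,
                  'input_bytes/s:',   role['input_bytes']['hz'],
                  'queries/s:',       role['total_queries']['hz'],
                  'bytes_queried/s:', role['bytes_queried']['hz'])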


The traffic to the 3-server team is too high. The traffic to the entire cluster may not be, if other storage servers are underused. To make sure all storage servers are used, the data model needs to spread the writes across different key spaces (or shards).

As to the data distribution (DD) question, the current DD moves data based on data size instead of data-access frequency. I doubt DD will help in this situation.

Since you can pause your client and the cluster catches up on the huge storage queues in less than 1 minute, that is further evidence that the storage servers simply cannot keep up with the write throughput directed at them.

I do also find it very suspicious that your first graph had a large number of storage servers all struggling, but your second one had only two.

I don’t see anyone mentioning it here, but if you’re starting from an empty database, then FDB is going to have to compete with your workload to try and split your data sufficiently so that the writes are spread across the servers.

Our own write bandwidth tests pre-load a large volume of data to create enough shards to distribute writes well, wait for data distribution to finish, and then run the actual write bandwidth test.

KrzysFR posted an example of this with good stats from his tooling before.
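A minimal sketch of that kind of pre-load, assuming uniformly random keys under a throwaway prefix are enough to force plenty of shard splits (the sizes and counts are arbitrary):

    import fdb, os

    fdb.api_version(610)
    db = fdb.open()

    @fdb.transactional
    def load_batch(tr, n=100):
        # Uniformly random keys so the data (and later the writes) end up
        # spread over many shards rather than a handful of hot ranges.
        for _ in range(n):
            tr[b'preload/' + os.urandom(16)] = os.urandom(7000)

    for _ in range(10_000):
        load_batch(db)
    # Then wait for data distribution to settle (e.g. until status json reports
    # cluster.data.state.name as "healthy") before starting the real write test.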
