Scalable time series database / Improving write throughput

I am currently in the process of building a time series DB. The main goal of the system is to store time series data about machine metrics (CPU load, etc.). My current model looks like this, for second resolution:

  key:   resolution_per_second / machine_id / (metric, year, month, day, hour, minute, second)
  value: float64

There are similar directories for the other resolutions (minute, hour, day), e.g. for minute resolution:

  key:   resolution_per_minute / machine_id / (metric, year, month, day, hour, minute)
  value: [sum (float64), count (int)]

In the examples above, slashes represent directories and parentheses represent tuples.

Every 5 seconds a machine produces about 400 metrics that need to be written to the time series DB. Each time a metric is written, there is a check to update the aggregated values for each resolution. It works as follows: when a new key-value with per-second resolution is received, the per-minute tuple is updated, and the lower resolutions work the same way. As a result, the current hour tuple is updated once a minute and the current day tuple is updated once an hour.
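For concreteness, here is a simplified sketch of the write path for a single sample using the Python bindings (the directory paths, machine id, API version, and function name are illustrative; the real implementation differs in the details):

import fdb

fdb.api_version(620)
db = fdb.open()

# One directory per (resolution, machine); opened once at startup and reused.
per_second = fdb.directory.create_or_open(db, (u'resolution_per_second', u'machine_42'))
per_minute = fdb.directory.create_or_open(db, (u'resolution_per_minute', u'machine_42'))

@fdb.transactional
def record_sample(tr, metric, ts, value):
    # Raw per-second data point: key (metric, y, m, d, h, min, s) -> float64.
    tr[per_second.pack((metric, ts.year, ts.month, ts.day,
                        ts.hour, ts.minute, ts.second))] = fdb.tuple.pack((value,))

    # Per-minute aggregate: key (metric, y, m, d, h, min) -> (sum, count).
    agg_key = per_minute.pack((metric, ts.year, ts.month, ts.day,
                               ts.hour, ts.minute))
    agg = tr[agg_key]
    total, count = fdb.tuple.unpack(agg) if agg.present() else (0.0, 0)
    tr[agg_key] = fdb.tuple.pack((total + value, count + 1))

The hour and day aggregates are updated in the same read-modify-write style, just less often.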
Unfortunately this model / implementation is not working well. Even though read speeds are quite fast, the write performance is not so good, which results in frequent timeouts. We use around 16 processes to handle approximately 15 machines, and essentially each request / transaction is a bundle of 400 metrics for a specific machine. As you can probably tell, we use directories in order to avoid large keys and to minimize conflicts. We even tried splitting those requests into smaller ones, but we still get timeouts. In addition, the FDB cluster doesn't seem to be bothered by the load, with mostly less than 20% I/O load.
Do you have any suggestions on how we could improve performance?
Thank you for your time

Can you describe how you are using the directory layer?

Are you using atomic ops?

This sounds more like a request for help in optimizing your cluster rather than your data model. Can you please provide a full description of how you’re running your cluster, in what environment or on what shape of machines, and how you’ve configured FDB on it?

@SteavedHams
We don't use atomic ops at this time, but they would undoubtedly help with the aggregations. Apart from that, everything else is quite standard: simple writes of key-values and range reads (which are quite fast).

@alexmiller
The FDB cluster is deployed on a GCE Kubernetes cluster with the help of the FDB operator. The FDB processes run on local SSDs. Here is a summary:

Configuration:
  Redundancy mode        - double
  Storage engine         - ssd-2
  Coordinators           - 3
  Desired Proxies        - 3
  Desired Resolvers      - 1
  Desired Logs           - 3

Cluster:
  FoundationDB processes - 7
  Zones                  - 7
  Machines               - 7

and

databaseConfiguration:
  log_routers: -1
  logs: 3
  proxies: 3
  redundancy_mode: double
  remote_logs: -1
  resolvers: 1
  storage_engine: ssd-2
  usable_regions: 1
generations:
  reconciled: 4
health:
  available: true
  fullReplication: true
  healthy: true
processCounts:
  log: 4
  storage: 3

Also, here is the status of the cluster while writing data and, at the same time, getting a few timeouts (transaction timeout: 2s with no retries):

Workload:
  Read rate              - 1078 Hz
  Write rate             - 651 Hz
  Transactions started   - 25 Hz
  Transactions committed - 4 Hz
  Conflict rate          - 0 Hz

Backup and DR:
  Running backups        - 0
  Running DRs            - 0

Process performance details:
  10.112.14.112:4501     ( 11% cpu; 17% machine; 0.003 Gbps;  7% disk IO; 2.7 GB / 11.4 GB RAM  )
  10.112.15.97:4501      (  2% cpu; 45% machine; 0.001 Gbps;  9% disk IO; 0.5 GB / 10.7 GB RAM  )
  10.112.16.97:4501      (  9% cpu; 23% machine; 0.002 Gbps;  6% disk IO; 0.7 GB / 11.6 GB RAM  )
  10.112.17.80:4501      (  2% cpu; 22% machine; 0.001 Gbps;  9% disk IO; 0.5 GB / 11.1 GB RAM  )
  10.112.18.89:4501      (  1% cpu; 49% machine; 0.003 Gbps;  0% disk IO; 0.5 GB / 10.3 GB RAM  )
  10.112.20.78:4501      (  3% cpu; 25% machine; 0.001 Gbps; 10% disk IO; 0.5 GB / 10.3 GB RAM  )
  10.112.23.111:4501     ( 11% cpu; 19% machine; 0.004 Gbps;  4% disk IO; 2.7 GB / 11.0 GB RAM  )

Coordination servers:
  10.112.14.112:4501  (reachable)
  10.112.16.97:4501  (reachable)
  10.112.23.111:4501  (reachable)

Client time: 03/30/20 14:28:06

@alexmiller I’m pretty sure this is a data model or implementation issue.

So we have 400 metrics written per transaction, where each one writes a raw data point and then updates 3 summations for 3 different resolutions, every 5 seconds. Without atomic ops, that's 1,200 keys read and 1,600 keys written per transaction. With 15 logging hosts doing this, that's 18,000 reads and 24,000 writes every 5 seconds, which is a very low rate that should not require any cluster tuning at all to achieve.

@v-pap Looking at your Workload statistics, I'm going to guess your transactions are written in a very serial way, so your 1,200 reads often take more than 2 seconds. Also, the reason I asked about the directory layer is that every unique folder read during a transaction adds additional read latency. Note that “/a/b/c/d” involves 4 unique folders, not one.
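As a hedged illustration of that folder cost (the path and names here are made up): opening a nested path inside every transaction costs a lookup per level, whereas opening it once up front and reusing the resulting subspace costs nothing per transaction.

import fdb

fdb.api_version(620)
db = fdb.open()

# Opened once at startup: the directory lookups happen here, not per transaction.
metrics_dir = fdb.directory.create_or_open(db, (u'a', u'b', u'c', u'd'))

@fdb.transactional
def write_one(tr, key_tuple, value):
    # No directory reads here: metrics_dir already carries its key prefix.
    # Calling fdb.directory.create_or_open(tr, (u'a', u'b', u'c', u'd')) inside
    # this function instead would add lookups for each of the 4 path levels.
    tr[metrics_dir.pack(key_tuple)] = fdb.tuple.pack((value,))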

First, I would like to thank you for your comment, but I think I need to clarify some things.

Each machine starts to scrape metrics from a random offset; for example, one machine starts at 16:30:14 and another at 16:30:16, so the load is mostly evenly distributed.

In addition, the aggregation is not updated every 5 seconds for every resolution. The minute aggregation is indeed updated every 5 seconds, but the other resolutions are updated in a rolling fashion. For example, the hour aggregation is updated each time a minute has passed, and the day aggregation is updated each time an hour has passed. This is achieved by checking whether the current key is present in FDB. If it isn't, this is the first time we get data for this (e.g.) minute, so we can assume that the previous minute has been filled adequately, and it is then used to update the hour aggregation.

Also, I should have written that the metrics can be up to 400, not that they are exactly 400; they are usually between 200 and 300.

As far as the directories are concerned, I don't think they are part of the problem, since we cache them each time we need to open them, and we are talking about a maximum depth of 3 directories.
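To make the rolling check concrete, it works roughly like this simplified sketch (directory names, the machine id, and the helper name are illustrative; ts is a datetime.datetime):

import datetime

import fdb

fdb.api_version(620)
db = fdb.open()

per_minute = fdb.directory.create_or_open(db, (u'resolution_per_minute', u'machine_42'))
per_hour = fdb.directory.create_or_open(db, (u'resolution_per_hour', u'machine_42'))

@fdb.transactional
def maybe_rollup_hour(tr, metric, ts):
    # If the current minute key already exists, this is not the first sample
    # of the minute, so there is nothing to roll up yet.
    cur_key = per_minute.pack((metric, ts.year, ts.month, ts.day,
                               ts.hour, ts.minute))
    if tr[cur_key].present():
        return

    # First sample of a new minute: fold the previous minute's (sum, count)
    # into the hour aggregate.
    prev = ts - datetime.timedelta(minutes=1)
    prev_agg = tr[per_minute.pack((metric, prev.year, prev.month, prev.day,
                                   prev.hour, prev.minute))]
    if not prev_agg.present():
        return
    prev_sum, prev_count = fdb.tuple.unpack(prev_agg)

    hour_key = per_hour.pack((metric, prev.year, prev.month, prev.day, prev.hour))
    hour_agg = tr[hour_key]
    h_sum, h_count = fdb.tuple.unpack(hour_agg) if hour_agg.present() else (0.0, 0)
    tr[hour_key] = fdb.tuple.pack((h_sum + prev_sum, h_count + prev_count))

The day rollup works the same way, keyed on the hour boundary.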

I would say these perf stats are more representative of the workload I describe here.

Workload:
  Read rate              - 2436 Hz
  Write rate             - 1386 Hz
  Transactions started   - 11 Hz
  Transactions committed - 3 Hz
  Conflict rate          - 0 Hz

Backup and DR:
  Running backups        - 0
  Running DRs            - 0

And are you doing these reads serially or in parallel per metric? Read rate gives the number of point or range reads per second, but range reads could be far more expensive. What is the ping RTT between your client and one of the storage servers?

Overall, it seems like with your current implementation it's largely the 2s transaction timeout that's hurting you. Both Steve and I are guessing that you're doing your reads serially, so you either need to change your code so it can handle each metric in parallel (and issue the reads to FDB in parallel), or lengthen your transaction timeout to give your code more time to run.
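If you go the timeout route, the knobs look roughly like this in the Python bindings (5000 ms and a retry limit of 3 are just example values):

import fdb

fdb.api_version(620)
db = fdb.open()

# Per transaction:
tr = db.create_transaction()
tr.options.set_timeout(5000)      # milliseconds
tr.options.set_retry_limit(3)

# Or as database-wide defaults for every transaction created from db:
db.options.set_transaction_timeout(5000)
db.options.set_transaction_retry_limit(3)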

As your utilization and 0% conflict rate numbers indicate, this is neither a cluster saturation problem nor a transaction conflict problem.

It's clear that ping is not the problem:

PING 10.112.15.97 (10.112.15.97) 56(84) bytes of data.
64 bytes from 10.112.15.97: icmp_seq=1 ttl=62 time=2.20 ms
64 bytes from 10.112.15.97: icmp_seq=2 ttl=62 time=0.148 ms
64 bytes from 10.112.15.97: icmp_seq=3 ttl=62 time=0.184 ms
64 bytes from 10.112.15.97: icmp_seq=4 ttl=62 time=0.181 ms
64 bytes from 10.112.15.97: icmp_seq=5 ttl=62 time=0.186 ms

If I understand correctly, one main issue with my implementation may be that I write the metrics for each machine serially within one transaction (and the same applies to the reads I need for the aggregations). Is that correct? Since this implementation is written in Python, which framework or library would you suggest for parallelizing this process?

The python fdb library supports asynchronous reads. See
https://apple.github.io/foundationdb/api-python.html#future-objects
and
https://apple.github.io/foundationdb/developer-guide.html#developer-guide-programming-with-futures
for more info and an example.

Writes in an fdb transaction do not involve talking to the cluster; they are buffered in the client and submitted all at once when you call commit(). Serial reads are almost certainly why your transactions are taking so long.
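A rough illustration of the difference, with made-up keys (the same idea applies to your aggregation reads):

import fdb

fdb.api_version(620)
db = fdb.open()

@fdb.transactional
def read_serially(tr, keys):
    results = []
    for k in keys:
        v = tr[k]
        # Using the value here blocks on a network round trip before the
        # next read is even issued.
        results.append(fdb.tuple.unpack(v) if v.present() else None)
    return results

@fdb.transactional
def read_in_parallel(tr, keys):
    # Issue every read first; the client pipelines them to the cluster...
    futures = [tr[k] for k in keys]
    # ...then block on the results only after all requests are in flight.
    return [fdb.tuple.unpack(f) if f.present() else None for f in futures]

With a couple hundred metrics and several reads each, the serial version pays hundreds of sequential round trips per transaction, which is how you end up past a 2s timeout.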

Note that you can avoid the reads needed for aggregation entirely if you use atomic ops, and you could have your current-hour and current-day metrics updated live while the hour or day is still in progress, without having to do a range scan or accumulate history on the client or whatever you are currently doing. You just need to split your aggregation key into two keys, one for the sum and one for the count, and use the atomic add operation to update them. Since you are storing floats, however, you may need to normalize to a scaled int64 representation, which effectively means fixed precision, in case that is a concern. Doing a single read of the current data point being logged, to see if it already exists, is still a good idea, as it will prevent a data point from being added to the aggregations more than once in the event of a commit_unknown_result error from the cluster.
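A hedged sketch of that split-key scheme (the directory path, key names, and fixed-point scale are illustrative):

import struct

import fdb

fdb.api_version(620)
db = fdb.open()

per_minute = fdb.directory.create_or_open(db, (u'resolution_per_minute', u'machine_42'))

SCALE = 1000  # fixed-point factor: store float sums as scaled int64

def le64(n):
    # Atomic ADD operands are 8-byte little-endian integers.
    return struct.pack('<q', n)

@fdb.transactional
def add_sample(tr, metric, ts, value):
    minute = (metric, ts.year, ts.month, ts.day, ts.hour, ts.minute)
    # Two keys per aggregate, updated blindly with atomic adds: no prior read,
    # so no extra round trip for the aggregation.
    tr.add(per_minute.pack(minute + (u'sum',)), le64(int(round(value * SCALE))))
    tr.add(per_minute.pack(minute + (u'count',)), le64(1))

A reader then unpacks both keys with struct.unpack('<q', ...) and divides the sum by SCALE. The same two-key pattern works for the hour and day aggregates, which can then stay live while the hour or day is still in progress.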
