Bulk load in Record Layer is slow

We have a custom use case for which we implemented our own layer in Java. But we concluded that it was more or less the same as the Record Layer, so we are now evaluating the Record Layer.

The first hurdle we have hit is that bulk migration (terabytes) of data via the Record Layer is really slow.
Here are the details of the calls that are taking a lot of time:

In our own implementation, we initialized a table without any indexes, migrated the data fully, and then created indexes later (if required).
In our implementation, populating a table without any indexes performed only set() operations and no get() operations. We relied on the fact that our (custom) saveRecord() implementation would overwrite the previous key (if present).
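To illustrate the contrast, here is a minimal sketch of that set()-only save using the plain FDB Java binding (class name, key encoding, and API version are placeholders):

```java
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;

public class BlindWriteExample {
    // A set()-only save: the previous value under the key, if any, is simply overwritten.
    static void saveRecordBlind(Database db, byte[] key, byte[] value) {
        db.run(tr -> {
            tr.set(key, value);   // no get() of the old record
            return null;
        });
    }

    public static void main(String[] args) {
        try (Database db = FDB.selectAPIVersion(630).open()) {
            saveRecordBlind(db, "migrated-key".getBytes(), "migrated-value".getBytes());
        }
    }
}
```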

But in the Record Layer, the saveRecord() API performs a get() to fetch the old record and then a set() for the new one. Because of these extra operations, we are now seeing slowness in the migration and sometimes a “transaction too old” error. In my (short) experience, a transaction with only set() operations is very fast and reliable.

Looking at the saveRecord() API of the Record Layer, even when the existenceCheck flag is set to NONE, the old record is fetched for two reasons:

(1) to get the split information of the old record so that it can be cleared effectively before inserting the new record;

(2) to clear the index entries corresponding to the old record.

For #1, if metaData.isSplitLongRecords() == false, we can safely pass previousSizeInfo as null to the saveWithSplit() API.

For #2, if we don’t have any secondary indexes on the record store (which we get from the metadata cache), we don’t need to call updateSecondaryIndexes() at all.

Is this understanding correct?
If yes, does this mean that we can avoid fetching the old record in the case when (see the sketch after this list):

  • existenceCheck == NONE
  • metaData.isSplitLongRecords() == false
  • no secondary indexes?
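To make the question concrete, here is a hypothetical guard for skipping the read. canSkipLoadingOldRecord is an invented helper, not existing Record Layer code, and whether the record type has secondary indexes is passed in as a plain boolean so no particular metadata accessor is assumed:

```java
// Hypothetical sketch only; nothing here exists in the Record Layer today.
static boolean canSkipLoadingOldRecord(RecordExistenceCheck existenceCheck,
                                       RecordMetaData metaData,
                                       boolean hasSecondaryIndexes) {
    return existenceCheck == RecordExistenceCheck.NONE  // caller does not need the old record
            && !metaData.isSplitLongRecords()           // reason #1: no split cleanup required
            && !hasSecondaryIndexes;                    // reason #2: no index entries to clear
}
```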

Are there a lot of other get() calls in this workflow? If there are many other get() calls we do anyway, the above optimizations might not be very useful.

Some more info:
In our custom implementation, we implemented the record-splitting logic in a [ (key, 1) -> Split1 ] format. So to clean up a key, we did clear(range(key)), which is still better than a get() operation (based on my short experience). Right now, I am not sure about the splitting logic in the Record Layer that requires the old record to be fetched in order to clear it.
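For reference, that kind of blind cleanup as a single range clear in the FDB Java binding looks roughly like this (a Tuple-encoded key prefix is assumed):

```java
import com.apple.foundationdb.Range;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.tuple.Tuple;

// Clears every split fragment stored under (key, 1), (key, 2), ... in one operation,
// without first reading the old record.
static void clearAllSplits(Transaction tr, Tuple keyPrefix) {
    tr.clear(Range.startsWith(keyPrefix.pack()));
}
```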

There is also bookkeeping associated with changes in the size of existing records along the same #1 path. So, you would need to somehow declare that you are not interested in that instrumentation. Maybe just the lack of a store-timer.

Splitting per se does not require the old record. It is managing the two ways in which versions can be stored. So, some combination of declaring that you never used the old version format and/or that you don’t use versions at all could turn that off.

Do you actually call the synchronous saveRecord one at a time? Because, as a rule, using the async version and having a dozen to a hundred outstanding would let the reads happen while you were encoding other records. This might still not be good enough for your needs.
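A rough sketch of that pipelining with saveRecordAsync() (the batch boundaries and store setup are omitted, and the method name saveBatchAsync is made up):

```java
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord;
import com.google.protobuf.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Issue the saves for one batch concurrently so the old-record reads overlap with
// encoding the remaining records, then wait for all of them before committing.
static CompletableFuture<Void> saveBatchAsync(FDBRecordStore store, List<Message> records) {
    List<CompletableFuture<FDBStoredRecord<Message>>> pending = new ArrayList<>();
    for (Message record : records) {
        pending.add(store.saveRecordAsync(record));
    }
    return AsyncUtil.whenAll(pending);
}
```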

I don’t see any fundamental objections to making it possible to skip the read under suitable circumstances. I would be interested to discuss whether this should just be an optimization based on the existing semantics when it can be proven that the old record is not needed. Or a new mode where the caller asserts that there is no old record (and corruption occurs if there is), which would also work if you happened to have secondary indexes.

No, we use the async version with batching of records based on size (in MBs). We never got the “transaction too old” error in our implementation, but we are getting it now with the Record Layer.

It might be that we need to change our bulk-migration strategy for the Record Layer, e.g., keeping track of time as well. If so, please suggest alternatives.
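One possible shape for that, as a sketch: fill each transaction until either an approximate byte budget or a time budget is reached (fillOneTransaction and both thresholds are placeholders, not an existing Record Layer utility):

```java
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.google.protobuf.Message;

import java.util.Iterator;

// Hypothetical sketch: save records into one transaction until a byte or time budget is
// hit, then return so the caller can commit and continue with a fresh store/transaction.
// Budgets are placeholders chosen to stay well under FDB's 5-second transaction limit.
static void fillOneTransaction(FDBRecordStore store, Iterator<Message> records) {
    final long maxBytes = 1_000_000;   // ~1 MB of serialized records (placeholder)
    final long maxMillis = 3_000;      // ~3 seconds of wall-clock time (placeholder)
    long startMillis = System.currentTimeMillis();
    long bytesWritten = 0;
    while (records.hasNext()
            && bytesWritten < maxBytes
            && System.currentTimeMillis() - startMillis < maxMillis) {
        Message record = records.next();
        store.saveRecord(record);      // or pipeline with saveRecordAsync() as above
        bytesWritten += record.getSerializedSize();
    }
}
```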

I am also interested in it. So the question is, if I have a store where:

  • metaData.isSplitLongRecords() == false OR store does not use versions
  • no secondary indexes
  • no metric collection like store size

Can saveRecord(existenceCheck == NONE) avoid the get(oldRecord) call?

Also, does the Record Layer support enabling/building metrics like “store size” at a later stage?

It might be useful for the Record Layer to have a more explicit bulk-loading API, to avoid an overly delicate dance of determining whether the store is in a state where the insert could be optimized. For some cases the API can help mitigate problems, such as by disabling any defined indexes; however, if we wanted to allow bulk loading of split records (or with versioning enabled), it would be on the honor system that the client of the API ensures there are no duplicate records being loaded.
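Purely as a strawman for what such an API could look like (every name below is invented; nothing like this exists in the Record Layer today):

```java
import com.google.protobuf.Message;
import java.util.concurrent.CompletableFuture;

// Strawman only: a hypothetical bulk-loading entry point.
public interface BulkLoader {
    // Optionally skip maintenance of defined indexes during the load; they would be
    // rebuilt afterwards (e.g., with the OnlineIndexer).
    BulkLoader disableIndexMaintenance();

    // The caller promises the input contains no duplicate primary keys; with split
    // records or versioning enabled this promise is on the honor system.
    CompletableFuture<Void> loadAll(Iterable<? extends Message> records);
}
```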

As a note: with regard to avoiding transaction_too_old and other potential issues, you can take a look at the OnlineIndexer logic. There is an open issue (https://github.com/FoundationDB/fdb-record-layer/issues/720) in which the plan is to encapsulate the logic used in the OnlineIndexer into a generic runner that can automatically manage your transaction work to maximize throughput. In particular, it would be nice to have it use the ability to determine the current transaction size (https://github.com/apple/foundationdb/issues/1682) to inform when to commit (this, along with the usual culprits of transaction time and conflict rates).
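For example, such a runner could consult the transaction’s approximate size before deciding to commit; assuming the Java binding’s Transaction.getApproximateSize() is available in the client version in use, that check might look like:

```java
import com.apple.foundationdb.Transaction;
import java.util.concurrent.CompletableFuture;

// Sketch: ask FDB roughly how large the current transaction has grown and use that,
// together with elapsed time and retries, to decide when to commit and start a new one.
static CompletableFuture<Boolean> shouldCommit(Transaction tr, long maxTransactionBytes) {
    return tr.getApproximateSize().thenApply(size -> size >= maxTransactionBytes);
}
```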
