Large Range Scans - avoid 5s limit

Hi,

If I have to perform a really large range scan - assume a few tens of millions of rows from a single client - what would be the suggested mechanism to avoid running into the 5 sec transaction limit?

Some more details

  • Scan required for on the order of tens of millions of rows - the actual bytes could run into a few GB.
  • Key distribution is not precisely known in advance.
  • Even if FDB can return this much data to the single client within 5 seconds, assume that the client itself cannot hold all the returned data in memory and will apply some processing to the incoming stream. Due to the amount of computation required, this processing can overflow 5 seconds.
  • It is acceptable that client does not see any new data after it started the read operation.
  • Very likely there will not be any concurrent writes to the range being read while this range scan is going on; however, if they do occur, it would be preferable not to abort those concurrent write transactions.

Reading through the docs, it appears that I can do single reads at snapshot isolation level. Can range scans be done at snapshot isolation level as well? Would doing a range scan at snapshot isolation level meet the needs listed above? If not, then is there an alternative way to do this?


thanks,
gaurav


I use an iterator that chains together lots of limited size range scans in ReadTransactions. The idea is as follows:

  • Create the iterator with a start and end and a max number of keys per getRange call
  • In the constructor, async start the first range scan of up to max keys using readAsync/snapshot
  • In hasNext, get the result of the already-running range and start the next one using a KeySelector that starts at the key after the last key in the range you just materialized
  • Continue until the range returned is empty

It works well as long as you don’t need consistency. If you need it to not see new writes you will have to put versionstamps or something into your key so you can skip newer versions than the one you started with.
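
For illustration, here is a minimal sketch of that pattern in Java (not from the original post): it assumes a Database db, fetches one bounded batch per read-only transaction at snapshot isolation, and resumes from the last key returned. The class name, batching, and consumer are hypothetical.

import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;

import java.util.List;
import java.util.function.Consumer;

public class ChunkedRangeScan {
    // Scan [begin, end) in batches of `batchSize` keys, each batch in its own
    // read-only transaction, so no single transaction approaches the 5 s limit.
    // Note: each batch reads at a newer version, so the scan as a whole is not
    // isolated (matching the caveat above).
    public static void scan(Database db, byte[] begin, byte[] end,
                            int batchSize, Consumer<KeyValue> consumer) {
        byte[] cursor = begin;      // last key handed to the consumer (or `begin` initially)
        boolean first = true;
        while (true) {
            final byte[] startKey = cursor;
            final boolean includeStart = first;
            List<KeyValue> batch = db.read(tr ->
                    tr.snapshot().getRange(
                            includeStart ? KeySelector.firstGreaterOrEqual(startKey)
                                         : KeySelector.firstGreaterThan(startKey),
                            KeySelector.firstGreaterOrEqual(end),
                            batchSize).asList().join());
            if (batch.isEmpty()) {
                break;              // the range is exhausted
            }
            batch.forEach(consumer);
            cursor = batch.get(batch.size() - 1).getKey();
            first = false;
        }
    }
}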


I just want to add that if your processing allows parallelism, then you can split the task into multiple non-overlapping ranges using the shard boundaries from get_boundary_keys.
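
As a rough illustration (the class name and structure are mine, not from the post above), getting those shard-aligned sub-ranges in Java via LocalityUtil.getBoundaryKeys could look like this; each returned [begin, end) pair can then be scanned by a separate worker:

import com.apple.foundationdb.Database;
import com.apple.foundationdb.LocalityUtil;
import com.apple.foundationdb.async.CloseableAsyncIterator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardSplitter {
    // Split [begin, end) into non-overlapping sub-ranges along shard boundaries,
    // so each sub-range can be scanned independently (and in parallel).
    public static List<byte[][]> splitByShards(Database db, byte[] begin, byte[] end) {
        List<byte[][]> ranges = new ArrayList<>();
        byte[] prev = begin;
        try (CloseableAsyncIterator<byte[]> boundaries =
                     LocalityUtil.getBoundaryKeys(db, begin, end)) {
            while (boundaries.onHasNext().join()) {
                byte[] boundary = boundaries.next();
                if (!Arrays.equals(prev, boundary)) {
                    ranges.add(new byte[][] { prev, boundary });
                    prev = boundary;
                }
            }
        }
        ranges.add(new byte[][] { prev, end });   // tail range up to the requested end
        return ranges;
    }
}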


The Python locality API implementation shows another technique for doing non-isolated range scans (basically, just restart the scan when hitting the 5 second limit).


Thanks for all the useful suggestions! I am still just getting started with the data and consistency models here…

One follow-up question: if I perform a range scan on a snapshot() of the database, which, if I understand correctly, disables conflict checking against any other concurrent transaction, then why should the 5 sec limit be enforced here?

In other words, if I am using a snapshot of the database for reading ranges, I am okay with letting go of the consistency guarantee. So why apply the 5 sec limit, which most likely is present to prevent conflicting operations?

I am sure there must be something trivial that I am missing.

Your intuition that this doesn’t seem like it should be an architectural limitation is absolutely correct.

The reason read-only snapshot transactions are also bounded by the 5s limit is mostly due to the implementation of our SSD storage engine. Currently that storage engine is not in fact a multi-versioned store. It stores a single version of the database at a point more than 5 seconds in the past. MVCC of more recent versions is kept in storage server memory[1] in a data structure that overlays the data on disk. Periodically, the oldest versions are detached from the in-memory tree and written to disk.

If someday a storage engine were written that was multi-versioned on disk, my sense is it’d be relatively straightforward to support long-running read-only transactions. There’s a discussion thread on new storage engine ideas here: Discussion thread for new storage engine ideas

[1] But note that they are fully durable due to being written to disk on the transaction logs.


Thanks. That explains it perfectly!

[UPDATE: I think that transaction_too_old is a retryable exception by default, and thus, there may not be any need for special handling]

Hi, following the suggestion made by Dave, I am trying to write an async method that reads a large number of rows; in case there is a transaction_too_old error, a new transaction is created and the read continues.

It is the responsibility of the caller to do the bookkeeping of which rows have already been fetched; once the caller has all the data, it can return, indicating that all data has been collected.

To do this I copied the method runAsync from FDBTransaction class and updated the handler (copied below).

  1. The retryable can be called multiple times (in case of retriable errors as well as in case of the transaction_too_old error).
  2. retryable has to perform any bookkeeping and return without any error on completion.

Could someone please review whether the changes to the handler are correct (and minimal)? I am closing the old transaction object in order to free up resources; is this the correct way to do so? I noticed that the onError method does something more sophisticated (transfer, etc.); is that needed here?

Is there a better/more robust way to achieve the same?


thanks,
gaurav

public <T> CompletableFuture<T> longMultiReadAsync(
        final Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable,
        Executor e) {
    final AtomicReference<Transaction> trRef = new AtomicReference<>(db.createTransaction(e));
    final AtomicReference<T> returnValue = new AtomicReference<>();
    return AsyncUtil.whileTrue(() -> {
        CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());

        return AsyncUtil.composeHandleAsync(process.thenComposeAsync(returnVal ->
                        trRef.get().commit().thenApply(o -> {
                            returnValue.set(returnVal);
                            return false;
                        }), e),
                (value, t) -> {  // error handler
                    if (t == null) { return CompletableFuture.completedFuture(value); }
                    if (!(t instanceof RuntimeException)) { throw new CompletionException(t); }
                    boolean handled = false;
                    if (t instanceof FDBException) {
                        final int errCode = ((FDBException) t).getCode();
                        if (errCode == 1007) { // transaction_too_old: create a new transaction and continue
                            final Transaction oldTx = trRef.get();
                            trRef.set(db.createTransaction(e));
                            oldTx.close();
                            handled = true;
                        }
                    }
                    if (handled) {
                        return CompletableFuture.completedFuture(true);
                    } else {
                        return trRef.get().onError(t).thenApply(newTr -> {
                            trRef.set(newTr);
                            return true;
                        });
                    }
                }, e);
    }, e)
    .thenApply(o -> returnValue.get())
    .whenComplete((v, t) -> trRef.get().close());
}
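
For context, a hypothetical caller might look like the following; start, end, and executor are assumed to exist, and the seenKey bookkeeping is only illustrative of the "caller keeps track of what has been fetched" idea described above:

// Hypothetical caller: it remembers the last key it has consumed (seenKey),
// so a retried invocation resumes after that key instead of restarting.
final AtomicReference<byte[]> seenKey = new AtomicReference<>(null);

CompletableFuture<Void> allRead = longMultiReadAsync(tr ->
        AsyncUtil.forEach(
                tr.getRange(
                        seenKey.get() == null
                                ? KeySelector.firstGreaterOrEqual(start)
                                : KeySelector.firstGreaterThan(seenKey.get()),
                        KeySelector.firstGreaterOrEqual(end)),
                kv -> {
                    seenKey.set(kv.getKey());
                    // process kv here...
                }),
        executor);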

My use case is very similar to yours. Did the above code work?

Here is the almost-complete code snippet that I currently use for this purpose. It should be straightforward to extend/update it:

// input byte[] start, byte[] end, boolean reverse
final AtomicReference<byte[]> seen = new AtomicReference<>(null);

fdb.readAsync(tx -> {
    // DO: Set timeout, num retries on the transaction object (tx)
    byte[] en = end;
    byte[] st = start;

    if (reverse) {
        en = seen.get() != null ? seen.get() : end;
    } else {
        // Note: nextKey is merged in master branch and not present on 6.2; copy it from github
        st = seen.get() != null ? ByteArrayUtil.nextKey(seen.get()) : start; 
    }

    return AsyncUtil.forEach(
            tx.getRange(st, en, ROW_LIMIT_UNLIMITED, reverse),
            kv -> {
                seen.set(kv.getKey());
                // DO: Consume the KV here...
            });
}, executor).join();

@gaurav
How do I scale the above async iterator to fetch data under multiple transactions? In one transaction only about 10^7 bytes can be read in 5 secs. Is there a way to read in multiple transactions? I want to read around 100 MB of data.

Are snapshot reads a possible strategy? Does FoundationDB support other kinds of read-only transactions?

The above code snippet, along with the readAsync() method, will handle what you are asking for:

  • readAsync() will retry the transaction function (in a new transaction every time) when it exceeds 5s limit.
  • the snippet above will resume each retry by providing a start key that is just after the last key fetched.

readAsync will not split the reads across multiple transactions.

Runs a function in this context that takes a read-only transaction. Depending on the type of context, this may execute the supplied function multiple times if an error is encountered. This call is non-blocking – control flow will return immediately with a CompletableFuture that will be set when the process is complete.

The above strategy will fail if my data is beyond the 10 MB limit. It will also fail if the range read request genuinely takes > 5 sec, even if you retry it multiple times.

The quoted sentence is the key: the function will execute multiple times on errors (retryable errors).
transaction_too_old is a retryable error.

That readAsync method is not explicitly splitting the work across multiple transactions - it is simply retrying the logic; the snippet above (with the seen variable) uses this fact to split the work across multiple transactions.

Could you give the above snippet a try in your test/local setup and then let me know if it does not work.

Independently, the 10 MB limit only applies at commit() time; it is not applicable to reading data. One can read as much data as you like within the 5 s limit of a transaction.

One thing to watch out for when using the transaction_too_old error like this to scan a large range is that the transaction will insert a backoff delay before it retries, and I think by default this will reach up to 1 second.

There are various ways to avoid this behavior. You could start a new transaction after some amount of time before 5 seconds to avoid the error, and you could even simultaneously start a new transaction to get a read version before switching (so as to hide the GRV latency).
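
To make that concrete, here is a rough sketch in Java of switching transactions proactively; the 4-second budget, method name, and batching are illustrative assumptions, not something prescribed above:

import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.Transaction;

import java.util.List;
import java.util.function.Consumer;

public class ProactiveSwitchScan {
    // Scan [begin, end), switching to a fresh transaction after ~4 seconds so we
    // never hit transaction_too_old (and never pay its retry back-off). The next
    // transaction is created and its read version requested before the switch,
    // hiding the GRV latency behind the batch we are still processing.
    static void scan(Database db, byte[] begin, byte[] end,
                     int batchSize, Consumer<KeyValue> consumer) {
        final long budgetMillis = 4_000;       // switch well before the 5 s limit
        Transaction tr = db.createTransaction();
        long started = System.currentTimeMillis();
        byte[] cursor = null;                  // last key handed to the consumer
        try {
            while (true) {
                KeySelector from = (cursor == null)
                        ? KeySelector.firstGreaterOrEqual(begin)
                        : KeySelector.firstGreaterThan(cursor);
                List<KeyValue> batch = tr.snapshot()
                        .getRange(from, KeySelector.firstGreaterOrEqual(end), batchSize)
                        .asList().join();
                if (batch.isEmpty()) {
                    break;                     // the range is exhausted
                }
                if (System.currentTimeMillis() - started > budgetMillis) {
                    // Start the replacement transaction and kick off its GRV now,
                    // then keep processing the batch we already have in hand.
                    Transaction next = db.createTransaction();
                    next.getReadVersion();     // fire-and-forget; hides GRV latency
                    tr.close();
                    tr = next;
                    started = System.currentTimeMillis();
                }
                for (KeyValue kv : batch) {
                    consumer.accept(kv);
                    cursor = kv.getKey();
                }
            }
        } finally {
            tr.close();
        }
    }
}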

You can also manually write a retry loop and specifically check for the transaction_too_old error. If you get that error, then instead of calling onError, you can create a new transaction (or reset it, if available in your language). A quick python example to demonstrate this:

tr = db.create_transaction()
while True:
    try:
        pass  # do reading here; break out of the loop when done
    except fdb.FDBError as e:
        if e.code == 1007:  # transaction_too_old. May also want to check whether progress was made and use the backoff if not
            tr = db.create_transaction()
        else:
            tr.on_error(e).wait()

This is a very common issue, I suppose. Would it make sense to support transaction options to exclude certain types of error codes from the back-off delay?

I’ve tried to implement large scans in parallel with LocalityUtil.getBoundaryKeys in Java, but have run into some trouble. See more at Scanning a large range with Locality API hangs

Hi.

It seems ByteArrayUtil does not have a nextKey method.

This is probably what you are after: https://github.com/apple/foundationdb/blob/61fcce08cc9a894130c913ba8ca89963e93e6acc/bindings/java/src/main/com/apple/foundationdb/tuple/ByteArrayUtil.java#L364

Thanks, gaurav.

ByteArrayUtil.keyAfter was added in FDB 7.0, but I’m still using 6.2, where it does not exist.

So I’ve copied it from 7.0 into my code, and it seems to work.

Another question: I use forEachAsync instead of forEach. Does anything guarantee the ordering of these seen.set calls? In other words, does seen always hold the biggest key seen so far?