FoundationDB

Large Range Scans - avoid 5s limit


(gaurav) #1

Hi,

If I have to perform a really large range scan - say, a few tens of millions of rows from a single client - what is the suggested mechanism for avoiding the 5 sec transaction limit?

Some more details

  • Scan required on the order of tens of millions of rows - the actual bytes could run into a few GB.
  • Key distribution is not precisely known in advance.
  • Even if FDB can return this much data to the single client (even using concurrent reads) within 5 seconds, assume that the client itself cannot hold all the returned data in memory and will apply some processing to the incoming stream. Due to the amount of computation required, this processing can overflow 5 seconds.
  • It is acceptable that the client does not see any new data after it starts the read operation.
  • Very likely there will not be any concurrent writes to the range being read while this scan is going on; however, if they do occur, it would be preferable not to abort those concurrent write transactions.

Reading through the docs, it appears that I can do single reads at snapshot isolation level. Can range scans be done at snapshot isolation level as well? Would doing a range scan at snapshot isolation level meet the needs listed above? If not, then is there an alternative way to do this?


thanks,
gaurav


(Sam Pullara) #2

I use an iterator that chains together many limited-size range scans in ReadTransactions. The idea is as follows:

  • Create the iterator with a start key, an end key, and a max number of keys per getRange call
  • In the constructor, asynchronously start the first range scan of up to max keys using readAsync/snapshot
  • In hasNext, get the already-running range and start the next one using a KeySelector that starts at the key after the last key in the range you just materialized
  • Continue until the range returned is empty

It works well as long as you don’t need consistency. If you need it to not see new writes, you will have to put versionstamps or something into your keys so you can skip versions newer than the one you started with.
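Roughly, in Java, the sketch below shows the idea (simplified and not my exact code; the class name and the batch-size handling are illustrative, and it assumes the standard Java bindings):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.concurrent.CompletableFuture;

    import com.apple.foundationdb.Database;
    import com.apple.foundationdb.KeySelector;
    import com.apple.foundationdb.KeyValue;

    // Sketch: chains limited-size snapshot range scans, one read transaction per
    // batch. Because each batch is its own transaction, the scan as a whole is
    // NOT a single consistent snapshot.
    public class ChainedRangeIterator implements Iterator<KeyValue> {
        private final Database db;
        private final KeySelector end;
        private final int batchSize;

        private CompletableFuture<List<KeyValue>> pending; // next batch, already in flight
        private Iterator<KeyValue> current = Collections.emptyIterator();
        private boolean exhausted = false;

        public ChainedRangeIterator(Database db, byte[] begin, byte[] end, int batchSize) {
            this.db = db;
            this.end = KeySelector.firstGreaterOrEqual(end);
            this.batchSize = batchSize;
            // Asynchronously start the first range scan in the constructor.
            this.pending = readBatch(KeySelector.firstGreaterOrEqual(begin));
        }

        // Each batch is a fresh read-only transaction using snapshot reads.
        private CompletableFuture<List<KeyValue>> readBatch(KeySelector begin) {
            return db.readAsync(tr -> tr.snapshot().getRange(begin, end, batchSize).asList());
        }

        @Override
        public boolean hasNext() {
            if (current.hasNext()) return true;
            if (exhausted) return false;
            List<KeyValue> batch = pending.join(); // materialize the in-flight batch
            if (batch.isEmpty()) {
                exhausted = true;
                return false;
            }
            byte[] lastKey = batch.get(batch.size() - 1).getKey();
            if (batch.size() < batchSize) {
                exhausted = true; // short batch: nothing left past lastKey
            } else {
                // Start the next scan at the key after the last one materialized.
                pending = readBatch(KeySelector.firstGreaterThan(lastKey));
            }
            current = batch.iterator();
            return true;
        }

        @Override
        public KeyValue next() {
            if (!hasNext()) throw new NoSuchElementException();
            return current.next();
        }
    }

Starting the next getRange before the current batch is consumed lets the next network round trip overlap with the caller’s processing of the batch it already has.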


(Steve Atherton) #3

I just want to add that if your processing allows parallelism, then you can split the task into multiple non-overlapping ranges using the shard boundaries from get_boundary_keys.
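For example, with the Java bindings, something along these lines (a sketch; shardAlignedSplitPoints is a made-up helper) computes split points whose adjacent pairs [points[i], points[i+1]) can be handed to parallel workers:

    import java.util.ArrayList;
    import java.util.List;

    import com.apple.foundationdb.Database;
    import com.apple.foundationdb.LocalityUtil;
    import com.apple.foundationdb.async.CloseableAsyncIterator;
    import com.apple.foundationdb.tuple.ByteArrayUtil;

    public class RangeSplitter {
        // Sketch: split [begin, end) at shard boundaries so each sub-range can
        // be scanned by a separate worker.
        public static List<byte[]> shardAlignedSplitPoints(Database db, byte[] begin, byte[] end) {
            List<byte[]> points = new ArrayList<>();
            points.add(begin);
            try (CloseableAsyncIterator<byte[]> it = LocalityUtil.getBoundaryKeys(db, begin, end)) {
                while (it.hasNext()) {
                    byte[] boundary = it.next();
                    // Keep only boundaries strictly inside the requested range.
                    if (ByteArrayUtil.compareUnsigned(boundary, begin) > 0
                            && ByteArrayUtil.compareUnsigned(boundary, end) < 0) {
                        points.add(boundary);
                    }
                }
            }
            points.add(end);
            return points;
        }
    }

Note that shard boundaries can shift as data is redistributed, so the split is a best-effort hint for parallelism rather than a stable partition.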


(David Scherer) #4

The Python locality API implementation shows another technique for doing non-isolated range scans (basically, just restart the scan when you hit the 5 second limit).
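In Java, that restart pattern might look roughly like the following sketch (scanRestarting and consume are illustrative names; 1007 is the transaction_too_old error code):

    import java.util.concurrent.CompletionException;
    import java.util.function.Consumer;

    import com.apple.foundationdb.Database;
    import com.apple.foundationdb.FDBException;
    import com.apple.foundationdb.KeySelector;
    import com.apple.foundationdb.KeyValue;
    import com.apple.foundationdb.Transaction;

    public class RestartingScan {
        // Sketch: stream a range and, on transaction_too_old, start a fresh
        // transaction that resumes just past the last key already consumed.
        public static void scanRestarting(Database db, byte[] begin, byte[] end,
                                          Consumer<KeyValue> consume) {
            KeySelector cursor = KeySelector.firstGreaterOrEqual(begin);
            final KeySelector stop = KeySelector.firstGreaterOrEqual(end);
            while (true) {
                try (Transaction tr = db.createTransaction()) {
                    for (KeyValue kv : tr.snapshot().getRange(cursor, stop)) {
                        consume.accept(kv);
                        // Remember progress so a restart resumes after this key.
                        cursor = KeySelector.firstGreaterThan(kv.getKey());
                    }
                    return; // reached the end of the range
                } catch (RuntimeException ex) {
                    Throwable t = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (!(t instanceof FDBException) || ((FDBException) t).getCode() != 1007) {
                        throw ex; // anything other than transaction_too_old
                    }
                    // transaction_too_old: loop around with a new transaction.
                }
            }
        }
    }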


(gaurav) #5

Thanks for all the useful suggestions! I am still just getting started with the data and consistency models here…

One follow-up question: if I perform a range scan on a snapshot() of the database, which, if I understand correctly, disables conflict checking between this transaction and any other concurrent transaction, then why is the 5 sec limit enforced here?

In other words, if I am using a snapshot of the database for reading ranges, I am okay with giving up the consistency guarantee. So why apply the 5 sec limit, which most likely exists to prevent conflicting operations?

I am sure there must be something trivial that I am missing :)


(Will Wilson) #6

Your intuition that this doesn’t seem like it should be an architectural limitation is absolutely correct.

The reason read-only snapshot transactions are also bounded by the 5s limit is mostly due to the implementation of our SSD storage engine. Currently that storage engine is not in fact a multi-versioned store: it stores a single version of the database, at a point more than 5 seconds in the past. MVCC data for more recent versions is kept in storage server memory[1], in a data structure that overlays the data on disk. Periodically, the oldest versions are detached from the in-memory tree and written to disk.

If someday a storage engine were written that was multi-versioned on disk, my sense is it’d be relatively straightforward to support long-running read-only transactions. There’s a discussion thread on new storage engine ideas here: Discussion thread for new storage engine ideas

[1] But note that they are fully durable due to being written to disk on the transaction logs.


(gaurav) #7

Thanks. That explains it perfectly!


(gaurav) #8

[UPDATE: I think that transaction_too_old is a retryable exception by default, and thus, there may not be any need for special handling]

Hi, following the suggestion made by Dave, I am trying to write an async method that reads a large number of rows; if a transaction_too_old error occurs, a new transaction is created and the read continues.

It is the caller's responsibility to keep track of which rows have been fetched; once the caller has all the data, it can return, indicating that all data has been collected.

To do this I copied the method runAsync from the FDBTransaction class and updated the error handler (copied below; the changed region is marked with comments).

  1. The retryable can be called multiple times (in case of retryable errors as well as the transaction_too_old error).
  2. The retryable has to perform any bookkeeping and return without any error on completion.

Could someone please review whether the changes to the handler are correct (and minimal)? I am closing the old transaction object in order to free up resources; is this the correct way to do so? I noticed that the onError method does something more sophisticated (transfer, etc.); is that needed here?

Is there a better/more robust way to achieve the same?


thanks,
gaurav

    public <T> CompletableFuture<T> longMultiReadAsync(
            final Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable,
            Executor e) {
        final AtomicReference<Transaction> trRef = new AtomicReference<>(db.createTransaction(e));
        final AtomicReference<T> returnValue = new AtomicReference<>();
        return AsyncUtil.whileTrue(() -> {
            CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());

            return AsyncUtil.composeHandleAsync(process.thenComposeAsync(returnVal ->
                            trRef.get().commit().thenApply(o -> {
                                returnValue.set(returnVal);
                                return false;
                            }), e),
                    (value, t) -> {  // error handler
                        if (t == null) { return CompletableFuture.completedFuture(value); }
                        if (!(t instanceof RuntimeException)) { throw new CompletionException(t); }
                        // --- begin changes relative to runAsync ---
                        boolean handled = false;
                        if (t instanceof FDBException) {
                            final int errCode = ((FDBException) t).getCode();
                            if (errCode == 1007) { // transaction_too_old: create a new tx and retry
                                final Transaction oldTx = trRef.get();
                                trRef.set(db.createTransaction(e));
                                oldTx.close();
                                handled = true;
                            }
                        }
                        if (handled) {
                            return CompletableFuture.completedFuture(true);
                        }
                        // --- end changes ---
                        return trRef.get().onError(t).thenApply(newTr -> {
                            trRef.set(newTr);
                            return true;
                        });
                    }, e);
        }, e)
        .thenApply(o -> returnValue.get())
        .whenComplete((v, t) -> trRef.get().close());
    }
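
For reference, the caller-side bookkeeping I have in mind looks something like this (a rough sketch; process, beginKey, endKey, and executor are placeholders, I treat beginKey as exclusive, and it assumes AsyncUtil.forEach from the Java bindings):

    // Hypothetical caller: scans (beginKey, endKey), resuming from the cursor
    // whenever longMultiReadAsync re-invokes the retryable with a new transaction.
    public CompletableFuture<Void> scanAll(byte[] beginKey, byte[] endKey, Executor executor) {
        final AtomicReference<byte[]> cursor = new AtomicReference<>(beginKey);
        return longMultiReadAsync(tr ->
                AsyncUtil.forEach(
                        tr.snapshot().getRange(
                                KeySelector.firstGreaterThan(cursor.get()),   // resume past last key seen
                                KeySelector.firstGreaterOrEqual(endKey)),
                        kv -> {
                            process(kv);             // placeholder for per-row processing
                            cursor.set(kv.getKey()); // bookkeeping: record progress as rows stream in
                        },
                        executor),
                executor);
    }

Because the rows are consumed as they stream in and the cursor advances with them, a transaction_too_old failure only costs the keys not yet delivered; the retry does not re-read what was already processed.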