Scanning a large range with Locality API hangs

Hello!

I need to scan a huge key range in FDB. Because that much data cannot be processed within the 5-second transaction limit, I split the source key range into smaller pieces with LocalityUtil.getBoundaryKeys and then process the pieces. The database is not modified during the scan, so I do not worry about consistency.

When I write code like this (in Java):

final CloseableAsyncIterator<byte[]> boundaryKeysIter = LocalityUtil.getBoundaryKeys(db, keyFrom, keyTo);
byte[] lastKey = keyFrom;

while (boundaryKeysIter.hasNext()) {
  final byte[] nextKey = boundaryKeysIter.next();
  // do something long with the range lastKey .. nextKey
  lastKey = nextKey;
}
// do something long with the range lastKey .. keyTo

The problem is that boundaryKeysIter.hasNext() hangs after about 1500 iterations.

If I change my loop to iterate over AsyncUtil.collectRemaining(boundaryKeysIter).get() instead of iterating boundaryKeysIter directly, the loop runs without hanging, but this solution will not work when the collection of boundary keys does not fit in memory.

It seems boundaryKeysIter hangs after some period of time. When I do something long during the iteration, some transaction timeout occurs. But I’d prefer to get an exception, so that I can reinitialise boundaryKeysIter from lastKey, rather than a hang.

What is the right way to iterate through a large key range?

I’m not sure about the boundaryKeysIter behavior, but scanning large amounts of data is quite a common pattern. You will find a lot of discussion about it in the forum. For example, see this for one of the approaches:

Thank you for the fast answer.

I read the topic Large Range Scans - avoid 5s limit
and I saw this snippet. Its problem is that it processes each key sequentially.

I’d like to iterate over subranges (not individual keys) sequentially and start processing each subrange in parallel, with a limit on the number of ranges processed simultaneously. So sometimes I have to wait until some ranges have finished, which takes time and makes hasNext() on the getBoundaryKeys iterator hang.
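The throttling pattern described above can be sketched in plain Java as follows. This is only an illustration under assumptions: BoundedRangeRunner and processRange are hypothetical names, the boundaries are passed in as an ordinary Iterator<byte[]> standing in for the getBoundaryKeys iterator, and the sketch does nothing about the hang itself. It only shows how a Semaphore can cap the number of subranges in flight.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of the FoundationDB bindings.
final class BoundedRangeRunner {
    // Walks the boundaries in order and hands each subrange [lastKey, nextKey)
    // to a worker thread, never running more than maxInFlight subranges at once.
    static void run(byte[] keyFrom, byte[] keyTo, Iterator<byte[]> boundaries,
                    int maxInFlight, BiConsumer<byte[], byte[]> processRange) {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        Semaphore permits = new Semaphore(maxInFlight);
        try {
            byte[] lastKey = keyFrom;
            while (boundaries.hasNext()) {
                byte[] nextKey = boundaries.next();
                submit(pool, permits, lastKey, nextKey, processRange);
                lastKey = nextKey;
            }
            submit(pool, permits, lastKey, keyTo, processRange); // tail range
            // Taking every permit back means all submitted ranges have finished.
            permits.acquireUninterruptibly(maxInFlight);
        } finally {
            pool.shutdown();
        }
    }

    private static void submit(ExecutorService pool, Semaphore permits,
                               byte[] begin, byte[] end,
                               BiConsumer<byte[], byte[]> processRange) {
        permits.acquireUninterruptibly(); // blocks while maxInFlight ranges are running
        pool.execute(() -> {
            try {
                processRange.accept(begin, end);
            } finally {
                permits.release();
            }
        });
    }
}
```

Note that the iterating thread still blocks in submit() while all permits are taken, so the boundary iterator is still left idle during that wait.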

I’m trying to implement something similar using getBoundaryKeys(). Here is the code snippet:

        List<StaticBuffer> keys = new ArrayList<>();
        try (CloseableAsyncIterator<byte[]> it = LocalityUtil.getBoundaryKeys(manager.db, db.range().begin, db.range().end)) {
        	it.forEachRemaining(key -> keys.add(getBuffer(db.unpack(key).getBytes(0))));
        }

I’m encountering an error in the above code. Here is the error:

java.lang.IllegalArgumentException: No terminator found for bytes starting at 1
        at com.apple.foundationdb.tuple.TupleUtil$DecodeState.findNullTerminator(TupleUtil.java:98)
        at com.apple.foundationdb.tuple.TupleUtil.decode(TupleUtil.java:438)
        at com.apple.foundationdb.tuple.TupleUtil.unpack(TupleUtil.java:676)
        at com.apple.foundationdb.tuple.Tuple.fromBytes(Tuple.java:526)
        at com.apple.foundationdb.subspace.Subspace.unpack(Subspace.java:231)
        at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBKeyValueStore.lambda$getBoundaryKeys$1(FoundationDBKeyValueStore.java:283)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBKeyValueStore.getBoundaryKeys(FoundationDBKeyValueStore.java:283)

Any idea about this?

Are you able to log the key that it’s failing to decode, and if so would you be comfortable pasting it here? Also, can you confirm that the key begins with the prefix of your db subspace?

I’m unable to do that. It throws the error for the first key it tries to read.

I’m trying to read a large amount of data from FDB and have been trying a few things. I’m new to FDB and keep running into issues. My main concern is the following error:

Caused by: org.janusgraph.diskstorage.PermanentBackendException: Permanent failure in storage backend
        at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBKeyValueStore.getSlice(FoundationDBKeyValueStore.java:171)
        at org.janusgraph.diskstorage.keycolumnvalue.keyvalue.OrderedKeyValueStoreAdapter.getKeys(OrderedKeyValueStoreAdapter.java:116)
        at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.getKeys(KCVSProxy.java:56)
        at org.janusgraph.diskstorage.BackendTransaction$4.call(BackendTransaction.java:384)
        at org.janusgraph.diskstorage.BackendTransaction$4.call(BackendTransaction.java:381)
        at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:68)
        at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:54)
        ... 10 more
Caused by: java.util.concurrent.CompletionException: com.apple.foundationdb.FDBException: Transaction is too old to perform reads or be committed
        at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
        at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
        at com.apple.foundationdb.RangeQuery$AsyncRangeIterator.hasNext(RangeQuery.java:250)
        at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBKeyValueStore.getSlice(FoundationDBKeyValueStore.java:160)
        ... 16 more
Caused by: com.apple.foundationdb.FDBException: Transaction is too old to perform reads or be committed

It’s throwing it before you even get into the loop, or it’s throwing it the first time in the loop? In other words, could you have something like this?

    List<StaticBuffer> keys = new ArrayList<>();
    try (CloseableAsyncIterator<byte[]> it = LocalityUtil.getBoundaryKeys(manager.db, db.range().begin, db.range().end)) {
    	// Log key here
    	it.forEachRemaining(key -> keys.add(getBuffer(db.unpack(key).getBytes(0))));
    }

I modified the code a little to check a key

        try (CloseableAsyncIterator<byte[]> it = LocalityUtil.getBoundaryKeys(manager.db, db.range().begin, db.range().end)) {
        	// Log key here
        	if(it.hasNext()) {
        		byte[] bytes = it.next();
        		String s = Base64.getEncoder().encodeToString(bytes);
        		System.out.println("LOG: "+bytes+" Length: "+bytes.length+" String: "+s);
        	}
        	it.forEachRemaining(key -> keys.add(getBuffer(db.unpack(key).getBytes(0))));
        }

This is what the if branch logs:

LOG: [B@5b970f7 Length: 14 String: FAEA/wD/AP8A/wD/J2I=

It then enters the forEachRemaining loop and fails at

getBuffer(db.unpack(key).getBytes(0))

Below is getBuffer()

    private static StaticBuffer getBuffer(byte[] entry) {
        return new StaticArrayBuffer(entry);
    }

I was able to execute getBoundaryKeys(). I had to make some changes since StaticBuffer wasn’t working:

    public List<byte[]> getBoundaryKeys(byte[] startkey, byte[] endkey) {
        List<byte[]> list = new ArrayList<>();
        try (CloseableAsyncIterator<byte[]> it = LocalityUtil.getBoundaryKeys(manager.db, startkey, endkey)) {
            CompletableFuture<List<byte[]>> collection = AsyncUtil.collectRemaining(it);
            list = collection.join();
        }
        return list;
    }

Calling it like this

final byte[] foundKey = db.pack(keyStart.as(ENTRY_FACTORY));
final byte[] endKey = db.pack(keyEnd.as(ENTRY_FACTORY));
List<byte[]> list = new ArrayList<>();
list = getBoundaryKeys(foundKey, endKey);
int i = 0;
for (byte[] key : list) {
    System.out.println(i++ + ": " + ByteArrayUtil.printable(key) + " " + list.size());
}

When I Base64-decode the key you logged, I get \x14\x01\x00\xff\x00\xff\x00\xff\x00\xff\x00\xff'b. The byte at index 1 (\x01) is the type code that starts a byte string, and the string needs to be terminated by a \x00 byte. Every \x00 in this key is immediately followed by \xff, though, which is the escape sequence for a literal null byte inside the string. In other words, this looks like the encoding of the tuple (0, "\x00\x00\x00\x00\x00'b"), except that the byte string is unterminated.

That explains the error message No terminator found for bytes starting at 1.
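To make the escaping rule concrete, here is a minimal sketch that Base64-decodes the key logged earlier in this thread and looks for an unescaped \x00 after the byte-string type code. TupleTerminatorCheck and findTerminator are hypothetical names written for this illustration. They are not the tuple layer's own code, and the sketch handles only the byte-string case described above.

```java
import java.util.Base64;

// Hypothetical helper, not part of the FoundationDB bindings.
final class TupleTerminatorCheck {
    // Returns the index of the unescaped 0x00 that ends the byte string whose
    // 0x01 type code sits at `start`, or -1 if the string is never terminated.
    static int findTerminator(byte[] packed, int start) {
        int i = start + 1; // skip the type code
        while (i < packed.length) {
            if (packed[i] == 0x00) {
                boolean escaped = i + 1 < packed.length && (packed[i + 1] & 0xff) == 0xff;
                if (!escaped) {
                    return i;
                }
                i += 2; // 0x00 0xff encodes a literal null byte
            } else {
                i++;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] key = Base64.getDecoder().decode("FAEA/wD/AP8A/wD/J2I=");
        // Every 0x00 in this key is followed by 0xff, so no terminator is found.
        System.out.println(findTerminator(key, 1)); // prints -1
    }
}
```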

Your second version, which works, prints the key with ByteArrayUtil.printable(key) rather than unpacking it as a tuple. I suspect that is why it doesn’t fail.

Thinking about this more, I believe the reason this doesn’t work when you try to decode the tuples is that boundary keys don’t necessarily have to be keys in the database. For efficiency reasons, the keys chosen as the boundary for shards are often a truncated version of actual keys (e.g. if we want to introduce a split between consecutive keys abc and def, the key d will suffice as a split point), and the result of that truncation could be something that is no longer a valid encoded tuple.
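As an illustration of the abc/def example, the sketch below picks the shortest prefix of the upper key that still sorts strictly after the lower key. This is an assumption made purely for illustration: SplitPointSketch is a hypothetical class, and nothing in this thread says FoundationDB chooses its shard boundaries this way. It only shows how a truncated key can separate two real keys without being one of them.

```java
import java.util.Arrays;

// Hypothetical helper, not FoundationDB's actual shard-splitting logic.
final class SplitPointSketch {
    // Unsigned lexicographic comparison, the order in which FDB sorts keys.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Shortest prefix of `upper` that sorts strictly after `lower`.
    // Assumes lower < upper.
    static byte[] shortestSplit(byte[] lower, byte[] upper) {
        for (int len = 1; len <= upper.length; len++) {
            byte[] candidate = Arrays.copyOf(upper, len);
            if (compareUnsigned(lower, candidate) < 0) {
                return candidate;
            }
        }
        return upper;
    }
}
```

For the keys abc and def this returns d. For abc and abd it has to return the whole of abd, because neither a nor ab sorts after abc.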

I’m not sure this fact is documented anywhere, which we should probably change (see GitHub issue).

Thanks for the explanation, A.J.

I’m a little confused about how to proceed after getting the boundary keys.
So I got a List of boundary keys of type byte[]:

List<byte[]> boundaryKeys

Now this has a size of 470 for my use case. How do I proceed and get a List of all KeyValueEntry from this boundaryKeys list?

List<KeyValueEntry> result

Also, could you look at the following link too?

Thanks for the help!!

Are you asking to convert the boundary keys entries into KeyValueEntry objects? I take it this is the JanusGraph object, which I’m not really familiar with, but the boundary keys are not key-value pairs and this may not be the best representation for them. Instead, the boundary keys list contains just keys, and a type that represents that may be more appropriate (a key is just a byte string, so your type would probably need to be something that wraps a byte string).

What specifically are you trying to do with the boundary keys once you have them?
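For reference, a type that wraps a byte string, as suggested above, might look like the following minimal sketch. BoundaryKey is a hypothetical name and is not a JanusGraph or FoundationDB class. The comparison uses unsigned byte order so that instances sort the same way the keys do in the database.

```java
import java.util.Arrays;

// Hypothetical wrapper type for a boundary key (just a byte string).
final class BoundaryKey implements Comparable<BoundaryKey> {
    private final byte[] bytes;

    BoundaryKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy so the key stays immutable
    }

    byte[] toBytes() {
        return bytes.clone();
    }

    @Override
    public int compareTo(BoundaryKey other) {
        int n = Math.min(bytes.length, other.bytes.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(bytes[i] & 0xff, other.bytes[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(bytes.length, other.bytes.length);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BoundaryKey && Arrays.equals(bytes, ((BoundaryKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}
```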

I iterate through the boundary keys list to get all the KeyValueEntry objects.
Basically, I use the first key as the start key and the second key as the end key to get an Iterable range, and then iterate through this range to get the KeyValueEntry objects.

            Iterable<KeyValue> range = tx.getRangeAsync(startKey, endKey, limit);
            return Iterators.filter(
                Iterators.transform(range.iterator(), keyValue -> {
                	StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0));
                    if (selector == null || selector.include(key)) {
                        return new KeyValueEntry(key, getBuffer(keyValue.getValue()));
                    } else {
                        return null;
                    }
                }),
                keyValue -> keyValue != null);

This process is slow, though. It takes a lot of time when there are many boundary keys.