I have a question about how FoundationDB checks conflicts between concurrent read and write transactions.
In this example there are two transactions: a read transaction, T1, that scans a key range, and a write transaction, T2, that modifies keys within that range. T2 starts and commits while T1 is still pending (not yet committed).
I expected FDB to provide linearizability: once the write transaction T2 commits, the read transaction T1 should be detected as conflicting and aborted by the Resolver, because T1 read a key range that covers the keys T2 inserted or updated. However, when I ran an experiment with these two concurrent transactions, T1 read only the old keys (that is, the keys as they were before T2 started) and then committed without FDB raising any error or exception.
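For context on this observation: an FDB transaction performs all of its reads at a single read version, so it sees the database as of that version. The sketch below is a hypothetical in-memory toy model of such multi-version reads; `ToyVersionedStore` and `SnapshotReadDemo` are illustrative names, not FDB APIs, and this is not how FDB's storage servers are implemented. It only shows why a scan pinned to T1's read version does not see keys that T2 commits later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of multi-version (snapshot) reads; NOT FDB's storage engine.
// Deletes are not modeled.
class ToyVersionedStore {
    // key -> (commit version -> value); keys are kept in lexicographic order
    private final TreeMap<String, TreeMap<Long, String>> data = new TreeMap<>();

    // Record a write that committed at commitVersion.
    void set(String key, String value, long commitVersion) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(commitVersion, value);
    }

    // Return the keys in [begin, end) visible to a reader at readVersion,
    // i.e. keys with at least one write whose commit version is <= readVersion.
    List<String> scan(String begin, String end, long readVersion) {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, String>> e :
                data.subMap(begin, true, end, false).entrySet()) {
            if (e.getValue().floorEntry(readVersion) != null) {
                visible.add(e.getKey());
            }
        }
        return visible;
    }
}

class SnapshotReadDemo {
    public static void main(String[] args) {
        ToyVersionedStore store = new ToyVersionedStore();
        store.set("key0", "val0", 10);        // initial load commits at version 10
        store.set("key1", "val1", 10);
        long t1ReadVersion = 20;               // T1 obtains its read version
        store.set("key0abc", "val0abc", 30);  // T2 commits at version 30
        // T1's scan stays pinned to version 20, so key0abc is invisible to it
        System.out.println("T1 sees: " + store.scan("\u0000", "\u00ff", t1ReadVersion));
        // prints "T1 sees: [key0, key1]"
        // A new reader at version 40 also sees T2's key
        System.out.println("New reader sees: " + store.scan("\u0000", "\u00ff", 40));
        // prints "New reader sees: [key0, key0abc, key1]"
    }
}
```

In this model, whether T1 reads before or after T2 commits makes no difference to what T1 sees; only its read version matters.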
Am I missing something? My understanding of transaction conflicts is this: if T1 reads some key ranges at read version V1, and T2 commits updates to overlapping key ranges at a higher commit version V2 before T1 commits, then T1 should be aborted.
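The rule as stated can be written down as a small toy model. `ToyKeyRange`, `ToyConflictChecker` and `ConflictRuleDemo` are hypothetical names, and this is not FDB's Resolver implementation; it only encodes the expectation above: a transaction that read some ranges at read version V1 conflicts if any write committed at a version V2 > V1 overlaps one of those ranges. Ranges are half-open `[begin, end)`, and a single key `k` is represented as `[k, k + "\0")`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of the conflict rule stated above; NOT FDB's Resolver.
final class ToyKeyRange {
    final String begin;
    final String end;

    ToyKeyRange(String begin, String end) {
        this.begin = begin;
        this.end = end;
    }

    // Two half-open ranges overlap iff each one begins before the other ends.
    boolean overlaps(ToyKeyRange other) {
        return begin.compareTo(other.end) < 0 && other.begin.compareTo(end) < 0;
    }
}

final class ToyConflictChecker {
    // One entry per write range of an already-committed transaction.
    private static final class CommittedWrite {
        final long commitVersion;
        final ToyKeyRange range;

        CommittedWrite(long commitVersion, ToyKeyRange range) {
            this.commitVersion = commitVersion;
            this.range = range;
        }
    }

    private final List<CommittedWrite> history = new ArrayList<>();

    void recordCommit(long commitVersion, List<ToyKeyRange> writeRanges) {
        for (ToyKeyRange r : writeRanges) {
            history.add(new CommittedWrite(commitVersion, r));
        }
    }

    // Conflict if some write committed after readVersion overlaps a range that was read.
    boolean conflicts(long readVersion, List<ToyKeyRange> readRanges) {
        for (CommittedWrite w : history) {
            if (w.commitVersion <= readVersion) {
                continue; // already visible at the read version, so no conflict
            }
            for (ToyKeyRange r : readRanges) {
                if (r.overlaps(w.range)) {
                    return true;
                }
            }
        }
        return false;
    }
}

class ConflictRuleDemo {
    public static void main(String[] args) {
        ToyConflictChecker checker = new ToyConflictChecker();
        long v1 = 20; // T1's read version
        // T2 writes key0abc and commits at V2 = 30 > V1
        checker.recordCommit(30, Arrays.asList(new ToyKeyRange("key0abc", "key0abc\u0000")));
        ToyKeyRange t1Scan = new ToyKeyRange("\u0000", "\u00ff");
        System.out.println("T1 conflicts: " + checker.conflicts(v1, Arrays.asList(t1Scan)));
        // prints "T1 conflicts: true"
    }
}
```

Under this rule, T1 in the timeline below would be aborted, which is the outcome I expected but did not observe.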
The timeline below shows when T1 and T2 start and commit, and which key ranges they touch:
```
T1                                  T2
start
scan the first few keys
  of range [0x00, 0xff]
                                    start
                                    add new keys within range [0x00, 0xff]
                                    commit
scan the remaining keys
commit
```
Below is the full source code of my sample program. The FDB server version is 6.2.27, and the FDB Java client version is 6.22.
```java
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

public class SampleConcurrentFDBTx {
    private static final int NUM_KEYS_TO_INSERT = 1000;
    private static final int NUM_KEYS_TO_READ_BEFORE_WRITE = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FDB fdb = FDB.selectAPIVersion(620);
        Database db = fdb.open("/usr/local/etc/foundationdb/fdb.cluster");

        // Clear the database
        clear(db);

        // Prepare the test by inserting keys key0..key999
        Transaction tx = db.createTransaction();
        for (int i = 0; i < NUM_KEYS_TO_INSERT; i++) {
            byte[] key = ("key" + i).getBytes(StandardCharsets.UTF_8);
            byte[] val = ("val" + i).getBytes(StandardCharsets.UTF_8);
            tx.set(key, val);
        }
        tx.commit().get();

        // Start the read transaction T1 and read the first few keys
        // in the range [0x00, 0xff) (the end key is exclusive)
        Transaction readTx = db.createTransaction();
        System.out.println("Read tx starts at " + System.currentTimeMillis() +
                " with read version " + readTx.getReadVersion().get());
        // firstGreaterOrEqual resolves to the first key >= the given bytes
        KeySelector begin = KeySelector.firstGreaterOrEqual(new byte[] { (byte) 0x00 });
        KeySelector end = KeySelector.firstGreaterOrEqual(new byte[] { (byte) 0xff });
        // A limit of 0 means "no limit"; the iterator fetches batches lazily
        Iterator<KeyValue> iter = readTx.getRange(begin, end, 0, false, StreamingMode.SMALL).iterator();
        for (int i = 0; i < NUM_KEYS_TO_READ_BEFORE_WRITE; ++i) {
            KeyValue keyValue = iter.next();
            System.out.println(String.format("Key: %s, value: %s",
                    new String(keyValue.getKey(), StandardCharsets.UTF_8),
                    new String(keyValue.getValue(), StandardCharsets.UTF_8)));
        }
        System.out.println("Read tx completed reading some keys.");

        // Now start the write transaction T2, add new key-value pairs
        // (key0abc..key999abc, all inside T1's scan range), and commit
        Transaction writeTx = db.createTransaction();
        System.out.println("Write tx starts at " + System.currentTimeMillis() +
                " with read version " + writeTx.getReadVersion().get());
        for (int i = 0; i < NUM_KEYS_TO_INSERT; i++) {
            byte[] key = ("key" + i + "abc").getBytes(StandardCharsets.UTF_8);
            byte[] val = ("val" + i + "abc").getBytes(StandardCharsets.UTF_8);
            writeTx.set(key, val);
        }
        writeTx.commit().get();
        System.out.println("Write tx committed at " + System.currentTimeMillis());

        // Back to the read transaction T1: read the remaining key-value pairs and commit
        System.out.println("Read tx resumes reading keys at " + System.currentTimeMillis());
        while (iter.hasNext()) {
            KeyValue keyValue = iter.next();
            System.out.println(String.format("Key: %s, value: %s",
                    new String(keyValue.getKey(), StandardCharsets.UTF_8),
                    new String(keyValue.getValue(), StandardCharsets.UTF_8)));
        }
        readTx.commit().get();
        System.out.println("Read tx committed at " + System.currentTimeMillis());

        // Uncomment the code below to re-scan and verify that the write transaction did commit
        // System.out.println("Re-scan the database");
        // readTx = db.createTransaction();
        // iter = readTx.getRange(begin, end, 0, false, StreamingMode.SMALL).iterator();
        // while (iter.hasNext()) {
        //     KeyValue keyValue = iter.next();
        //     System.out.println(String.format("Key: %s, value: %s",
        //             new String(keyValue.getKey(), StandardCharsets.UTF_8),
        //             new String(keyValue.getValue(), StandardCharsets.UTF_8)));
        // }
        // readTx.commit().get();
    }

    private static void clear(Database db) throws ExecutionException, InterruptedException {
        byte[] begin = new byte[] { (byte) 0x00 };
        byte[] end = new byte[] { (byte) 0xFF };
        Transaction tx = db.createTransaction();
        // Clear every key in [0x00, 0xFF)
        tx.clear(begin, end);
        tx.commit().get();
    }
}
```