Java: concurrency questions

I’m working on Clojure bindings for FoundationDB, which means I’m using the Java API. Some things are unclear to me even after looking at the source code and reading the documentation, so I thought I’d ask and make sure:

  • Do I understand correctly that if I call forEach on the AsyncIterable, I can expect to get a callback whenever an item becomes available, without tying up the FDB Java API?
  • Is there a better way to achieve the same thing (get a callback for each item becoming available)?
  • What if my callback function blocks because of backpressure? Is that something that the FDB API expects?
  • Calling open on an FDB instance will return a Database object. Is this object intended to be shared by all threads within the application? Will the underlying networking code “do the right thing”?
  • Assuming I do not perform my own blocking operations in transactions, do I need to worry about thread pool size and the potential for deadlocks from thread pool exhaustion if multiple long-running operations are launched?

Hm, I think it depends on what you mean. We don’t override the forEach method, so it’s exactly equivalent to:

for (T item : asyncIterable) {
    func.accept(item);
}

So it will block and eat up a thread doing so, but the callback will, in fact, be invoked as results become available. That thread is the caller thread, rather than, say, the FDB native thread that is actually serving requests, so it won’t tie up everything, but it will eat up resources.

There’s an asynchronous version that executes its callbacks on an executor you provide, if you want that behavior instead: AsyncUtil (FoundationDB Java Client API)

I think the forEachAsync method in AsyncUtil (linked above) is what you want.

If your function ends up blocking, then it will block the execution thread of whatever executor is running it. The better way to handle that is to return a future to wait on instead of blocking. We implemented something like that using a scheduled executor in the Record Layer: https://github.com/FoundationDB/fdb-record-layer/blob/152d3c5b16da550edc511c8d29cff131df9d88f7/fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java#L802
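
To make that concrete, here is a stripped-down sketch of the scheduled-executor idea (the Record Layer helper linked above does more, so treat this as the pattern rather than its actual code): instead of sleeping on a thread, hand back a future that a scheduler completes later and chain the rest of the work onto it.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Delays {
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    // Returns a future that completes after the given delay without parking a
    // thread for the duration; callers chain their work onto it instead of sleeping.
    public static CompletableFuture<Void> delayedFuture(long delay, TimeUnit unit) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        SCHEDULER.schedule(() -> result.complete(null), delay, unit);
        return result;
    }
}

A consumer that needs to slow down could then return something like delayedFuture(10, TimeUnit.MILLISECONDS).thenRun(...) rather than calling Thread.sleep.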

Yep, the Database object can be shared by multiple threads (and, indeed, this is expected behavior). The underlying code executes everything on a single-threaded executor (more or less), so all the complicated stuff won’t care about how many threads you have calling it.
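
In practice that just means opening the Database once and handing the same instance to every thread. A minimal sketch (the API version number is only a placeholder for whichever client you are on):

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;

public class FdbHolder {
    // Select the API version once per process (620 is just an example) and
    // share the single Database instance across all threads in the application.
    private static final Database DB = FDB.selectAPIVersion(620).open();

    public static Database db() {
        return DB;
    }
}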

Hm. If everything is, in fact, non-blocking, then in general, you shouldn’t need to worry too much about thread pool exhaustion except that (as stated above), there is one (and only one) network thread at the native level, so that thread can get clogged up. I don’t know if I’d say that you “never” need to worry about backing up, say, a thread pool that executes callbacks from async work, but if you don’t have any blocking operations, I wouldn’t be too concerned about it. But be forewarned that it can be easy to accidentally introduce blocking code and tie up threads in your thread pool.

Hmm. I am looking at forEach here: https://github.com/apple/foundationdb/blob/ff78d27575bc793f9d69d6a217b076ebd074a4df/bindings/java/src/main/com/apple/foundationdb/async/AsyncUtil.java#L95 which would appear to call forEachRemaining with a DEFAULT_EXECUTOR https://github.com/apple/foundationdb/blob/ff78d27575bc793f9d69d6a217b076ebd074a4df/bindings/java/src/main/com/apple/foundationdb/async/AsyncUtil.java#L131

I am not sure I fully understand the behavior of forEachRemaining, but at a first glance it appears to be what I need: it calls consumer.accept() for every item. I just don’t know what will happen if the consumer blocks because the downstream isn’t ready for more data.

Let me ask a more general question, which will perhaps explain where I’m coming from. Let’s imagine I’m doing a monster getRange() query. As the results start flowing in (I’m assuming they aren’t all buffered in memory, but are being streamed), my code is consuming them, but can’t keep up. Even if there are buffers, they are not infinite, so there needs to be a backpressure mechanism. How do I consume these results, providing backpressure if needed, but without tying up critical FDB threads? Ideally I’d just block in consumer.accept(), but I’d have to know that this is expected.

I was also hoping for a mechanism where FDB controls what happens, so that I don’t need to manage a separate thread pool with threads that block waiting for data (so, promises don’t help much in this case).

But be forewarned that it can be easy to accidentally introduce blocking code and tie up threads in your thread pool

Yes, I’ve just had this happen in a large and complex application (not using FDB) and the resulting deadlocks are incredibly difficult to debug. That’s why I’m being extra careful this time.

There is also a variant that lets you specify your own executor.

forEachRemaining is a non-blocking function (at least with respect to reading from the iterator) that calls your Consumer on every item remaining in an AsyncIterator. When called from forEach, you’ll get a fresh iterator from your AsyncIterable and will therefore consume every item.

This will block whatever thread the executor is using to run this work. It won’t block the FDB network thread (where all of the FDB client-side work happens) unless you are using some executor that is running everything on the network thread, which I wouldn’t recommend.
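
For concreteness, here is a minimal sketch of that setup, assuming the overload of AsyncUtil.forEach that takes an explicit Executor (the variant mentioned above); handle() is a placeholder for your own, possibly blocking, per-item processing:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.async.AsyncUtil;

public class RangeConsumer {
    // A pool you own: if handle() blocks, it ties up one of these threads,
    // not the FDB network thread.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    public static CompletableFuture<Void> consume(Database db, Range range) {
        return db.runAsync(tr ->
                AsyncUtil.forEach(tr.getRange(range), RangeConsumer::handle, POOL));
    }

    private static void handle(KeyValue kv) {
        // placeholder for your own (possibly blocking) per-item processing
    }
}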

When reading ranges, we fetch the results in batches, only starting a new batch once we’ve exhausted a prior batch. If you slow down your consumption of the iterator, then it will slow down the rate at which results are requested from the cluster. It isn’t necessary that you slow things down by blocking, though. As Alec mentioned, you could instead return a future from your function that processes each result and avoid tying up thread resources. I don’t think we have a variant of forEach to do this exactly, but you could write one using whileTrue if desired.
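
As an illustration of that last suggestion, here is a rough sketch of what such a variant could look like (forEachFuture is a made-up name, and this assumes the Supplier-based whileTrue overload on AsyncUtil): the handler returns a future, and the next item is only requested once that future completes, so a slow consumer applies backpressure without blocking any thread.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;

public class ForEachWithFutures {
    // Hypothetical forEach variant: the next item is only requested after the
    // future returned by the handler has completed.
    public static <T> CompletableFuture<Void> forEachFuture(
            AsyncIterable<T> iterable, Function<? super T, CompletableFuture<?>> handler) {
        AsyncIterator<T> it = iterable.iterator();
        return AsyncUtil.whileTrue(() -> it.onHasNext().thenCompose(hasNext -> {
            if (!hasNext) {
                return CompletableFuture.completedFuture(false);
            }
            return handler.apply(it.next()).thenApply(ignore -> true);
        }));
    }
}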

If you aren’t concerned about blocking threads in your executor, then blocking in the consumption of items should be safe to do from the perspective of the FDB network thread (again with the caveat that you aren’t using an executor that runs things on the FDB network thread).

In either case (either with blocking or using futures), you’ll need to avoid waiting too long if you want to be able to read a range transactionally. After the first read, a transaction will only be able to continue reading for 5 seconds, and since the results are fetched in batches you may find that later batches fail if it takes too long to get to them. In fact, even if you don’t block at all you’ll still be limited in how large a range you can read in one transaction.

If you thought that the processing of data was going to be slow enough to be a problem with respect to this time limit, you could force the reading of all the data at once using the asList function.
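
For example, a minimal sketch (readAll and the prefix argument are made up for illustration): read everything inside the transaction with asList, then process the returned list outside the time limit. The join inside the retryable is fine from a threading perspective here, since db.run is the blocking convenience API anyway.

import java.util.List;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.tuple.Tuple;

public class ReadWholeRange {
    // Pull the entire range into memory while the transaction is open, so the
    // 5-second limit only covers the read itself; the returned list can then be
    // processed at whatever pace the consumer likes.
    public static List<KeyValue> readAll(Database db, String prefix) {
        return db.run(tr ->
                tr.getRange(Range.startsWith(Tuple.from(prefix).pack())).asList().join());
    }
}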

After digging deeper into Java async programming, I think I need to do something different and access the iterator myself. I plan to write something similar to forEachRemaining, but not in the form of a loop. The difference would be that it would call the subsequent iterator.onHasNext() only after the (Clojure) put! to my outgoing channel has succeeded (from my put! callback). I’d use a recursion pattern similar to the one shown on this page: Go Block Best Practices · clojure/core.async Wiki · GitHub.

From what I understand, this should not block any threads, because it would use the async facilities both on the Java and on Clojure side, and it would provide backpressure. It would also not create any extra thread pools or buffers, which I’m really trying to avoid.
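
In Java terms, that pattern might look roughly like the sketch below, with put standing in for core.async’s put! callback; everything here is illustrative, since the real implementation would live on the Clojure side.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncIterator;

public class ChannelPump {
    // Rough analogue of the recursion described above. 'put' returns a future that
    // completes with true once the channel has accepted the item, or false if the
    // channel was closed. The next onHasNext() is only requested after the put has
    // succeeded, which is what provides the backpressure.
    public static <T> void pump(AsyncIterator<T> it,
                                Function<? super T, CompletableFuture<Boolean>> put,
                                CompletableFuture<Void> done) {
        it.onHasNext().thenAccept(hasNext -> {
            if (!hasNext) {
                done.complete(null);           // range exhausted
            } else {
                put.apply(it.next()).thenAccept(accepted -> {
                    if (accepted) {
                        pump(it, put, done);   // continue only after downstream took the item
                    } else {
                        it.cancel();           // downstream closed: stop reading
                        done.complete(null);
                    }
                });
            }
        });
        // Error handling is omitted; real code should also propagate exceptional
        // completion of onHasNext()/put into 'done', and be aware that this direct
        // recursion can nest deeply if the futures complete synchronously.
    }
}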

I do realize there is a time limit on transactions. That’s fine. I’m writing a library here, and I’d like to get things correct, regardless of what the application actually does. I’m not actually concerned about multi-second reads, but mostly about an incorrect implementation where some requests throttle others.

By the way, your (that’s a collective team “your”) answers in these forums are fantastic. I don’t know how you find the time or the energy to do this, but the resulting value is incredible. Thank you!

Hello, @alloc. There is no forEachAsync method in the AsyncUtil class.


Right, yeah, sorry for the confusion. I meant the forEach method on AsyncUtil.