Feature Request: Predicate pushdown

I did a bit of thinking on the way back from FDB Summit about how to implement predicate pushdown at the storage process layer.

Today the only option is to check the layout of the keyspace and send requests outside of FDB to a layer process running on the same machine. This limits you to only the data stored on that machine, which is useful for some applications but probably not all. It also requires every layer to implement that router and correctly handle failures that are already handled by FDB.

My idea for implementation is to extend transaction_get_range with the ability to send arbitrary predicate data and an arbitrary filter process name as part of the API call:

FDBFuture* fdb_transaction_get_range_predicate(
   FDBTransaction* transaction,
   uint8_t const* begin_key_name,
   int begin_key_name_length,
   fdb_bool_t begin_or_equal,
   int begin_offset,
   uint8_t const* end_key_name,
   int end_key_name_length,
   fdb_bool_t end_or_equal,
   int end_offset,
   uint8_t const** predicate_filter_names,
   uint8_t const** predicates,
   int limit,
   int target_bytes,
   FDBStreamingMode mode,
   int iteration,
   fdb_bool_t snapshot,
   fdb_bool_t reverse);

In foundationdb.conf, you would specify predicate filter processes under a section named for the filter, something like this:

[predicate_filter.FILTER_NAME]
command = /usr/bin/...
port = 7654

Where FILTER_NAME is the value used in uint8_t const** predicate_filter_names. This would allow multiple different filter processes to live on the same machine at different ports.

Some protocol would be established to send each KV pair to the filter process, along with the data from the corresponding uint8_t const** predicates index. It would parse the predicate in whatever way it wants to for the layer’s goal (regex, comparing numbers, etc.) and return true or false to indicate whether the data should be filtered from the storage server response.
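To make the contract concrete, here is a minimal sketch of that per-pair filter callback, in Python for brevity. The function name, the keep/drop convention, and the substring-style predicate are all assumptions for illustration, not part of the proposal:

```python
# Hypothetical sketch of the per-KV filter contract; the wire protocol is
# elided, and the predicate here is just a byte substring to look for.

def handle_kv(key: bytes, value: bytes, predicate: bytes) -> bool:
    """Return True to keep the pair in the storage server response,
    False to drop it. The predicate bytes are opaque to FDB; only the
    filter process gives them meaning."""
    return predicate in value

# The storage server would invoke the filter once per candidate pair:
pairs = [(b"k1", b"age=35"), (b"k2", b"age=12")]
kept = [(k, v) for k, v in pairs if handle_kv(k, v, b"age=3")]
# kept contains only (b"k1", b"age=35")
```

A real protocol would batch pairs per request rather than making one round trip per key-value pair.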

A new client library and set of bindings could be developed to make this part easier on developers. From there, they would just implement the parser for their layer’s predicate payload and execute the logic to filter based on that payload.

How this interacts with r/w conflict ranges

Conflict checking would remain the same; this is just a performance optimization. Any data not returned to the client is still assumed to have been read during the transaction.

Downsides to this approach

  • Requires work from layer developers
  • Copying data around internally more than before
  • More syscalls moving data around than if it were in-process

Benefits to this approach

  • Prevents CPU-hungry filters from blocking the Flow thread
  • Allows arbitrary filters to be implemented
  • Allows developers to use any language

Did I miss anything critical that would prevent this from moving forward? Has there been any discussion in the past on how best to implement this?


I don’t think there’s been a solid discussion of what the requirements are of predicate pushdown support, and I’d be interested in seeing the requirements and desires of various potential layers’ uses of a predicate pushdown API before considering how to implement one.

Specifically, if you and anyone else interested in the feature could please detail:

  • What data model you have, and what predicate/computation you want to be able to push down to FDB?
  • Why does this predicate/computation need to be pushed down to FDB, and can’t as effectively or easily be a part of a layer?

I’m concerned that there’s probably a number of issues that complicate creating a layer that’s very efficient and tightly integrated to FDB, and I’d like to make sure we understand specifically what slice of that would be best solved by predicate pushdown (and thus define our requirements), and what problems could be better improved by some other piece of work.

We can use the document layer as an example. To implement a full collection scan and filter on an unindexed attribute, the layer would need to move all the data from each storage server across the network. You can also imagine a query planner deciding a full collection scan is cheaper than using a secondary index because the filter is not selective enough to outweigh the cost of all the PK lookups.

If you use the boundary keys API to target requests above FDB to your colocated layer process, you will reduce bandwidth used for some subset of the data within the range of the collection, but unless the collection fits entirely on that shard, you will still transfer the rest of the collection to your layer process. You could also implement a router on top of your layer to distribute the query across colocated layer - storage server pairs and set their read versions to be the same to scatter-gather the query at the layer level.

Even if bandwidth were free and unlimited, external layer processes would need to implement the parallel processing themselves to filter the full collection in a reasonable amount of time as data volume increases.

If the predicate could be pushed down into the storage layer in some way as an optional part of the get_range API, you would get parallel filtering to the degree of the number of servers involved in your query. Because this is outside of FDB, it could be implemented in any language and not block the Flow thread.

I’m not suggesting building any data-model-specific features into the storage processes or any other part of FDB. Rather, you would register processes in foundationdb.conf that are colocated with the storage servers and expose a port. These processes would not be a part of the cluster and would not be reachable externally. The storage processes would send these colocated layer processes the KV pairs and the predicate data from the API. This is an arbitrary payload, used by the colocated layer process performing the filter, which contains the predicate in whatever format the layer expects.

An example for the document layer could be something like {"age" : [">", 30]} to represent returning only documents with the age field > 30. This data has no meaning to the storage server and is passed as-is to the colocated layer process performing the filtering.
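For illustration, here is a sketch of how such a colocated filter process might interpret that payload. The JSON encoding and the operator table are assumptions; the payload format is entirely up to the layer:

```python
import json
import operator

# Hypothetical predicate evaluation for a document-layer filter process.
# OPS maps the operator strings in the payload to Python comparisons.
OPS = {">": operator.gt, "<": operator.lt, "==": operator.eq}

def matches(doc_bytes: bytes, predicate_bytes: bytes) -> bool:
    """Return True if the document satisfies every clause in the predicate."""
    doc = json.loads(doc_bytes)
    predicate = json.loads(predicate_bytes)
    return all(
        field in doc and OPS[op](doc[field], rhs)
        for field, (op, rhs) in predicate.items()
    )

pred = b'{"age": [">", 30]}'
matches(b'{"age": 42}', pred)   # True
matches(b'{"age": 25}', pred)   # False
```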

These processes would be represented by a string name in foundationdb.conf, and you include that string in the client API call to tell the storage server which port to contact for filtering.

A goal when I was thinking about this solution was to avoid enforcing any specific data model and to minimize the CPU required in the storage processes.

I realize this may add the complication of needing to key the client transaction’s cache not just by the key range read, but also by the predicate filters somehow.

To answer your questions directly:

What data model you have, and what predicate/computation you want to be able to push down to FDB?

The data model is records with attributes that can be filtered based entirely upon the contents of the record. Keys are also conceptually part of the record. For example, a collection of customer records being filtered by age > 30. If this record is not already indexed by age, a full scan is required. You can imagine more complicated predicates, like regular expressions or math, which would require more specific indexes than are practical.

Why does this predicate/computation need to be pushed down to FDB, and can’t as effectively or easily be a part of a layer?

  • Reducing network bandwidth between external layers and storage servers for large range reads
  • Automatically parallelizing large range reads among multiple storage servers
  • Existing solutions require each layer to implement a router to send queries above FDB to a colocated storage process
  • Allows effectively more work to be done within the 5s transaction time limit by adding CPU parallelism to the existing IO parallelism.

As an aside (feel free to ignore :stuck_out_tongue:), I think this model could be expanded to allow the colocated process to modify the data before it is returned, which would also be useful. A motivating example would be performing an aggregation. Being able to push the aggregation into the context of a transaction, done in parallel across all the storage servers holding the data, would improve externally observed latency and reduce bandwidth usage for large scans.

To extend the above example, reading the range customers/\x00 to customers/\xff to calculate the average of the age field within the values could be pushed down to a colocated layer process. The keys returned from reading that range with the aggregate pushed down might look like this:

customers/\x00 => {"age": 20, "count": 500}
customers/\x01 => {"age": 42, "count": 1000}
customers/\x02 => {"age": 28, "count": 1200}

Where ...\x00, \x01, \x02 etc. are the boundaries between shards. The client making the get_range API call would know to combine the results into the final aggregate.
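The client-side combine step could be a simple weighted average over the per-shard partial results. A sketch, under the assumption that each shard reports its partial mean and the row count it covered:

```python
# Combine per-shard partial averages into the final aggregate.
# Mirrors the example shard results above; the "age"/"count" field
# names are just the ones used in that example.
shard_results = [
    {"age": 20, "count": 500},   # customers/\x00
    {"age": 42, "count": 1000},  # customers/\x01
    {"age": 28, "count": 1200},  # customers/\x02
]

total = sum(r["count"] for r in shard_results)
overall_avg = sum(r["age"] * r["count"] for r in shard_results) / total
# 85600 / 2700, roughly 31.7
```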

(see above about needing to key the transaction cache by the predicates and not just the key range anymore)

Would it be unfair to summarize your desires for predicate pushdown as making this router easier to implement or unnecessary? Are there improvements to the Locality API, or framework that the FDB client could provide to implement scatter/gather, that we could do instead to help make writing this router significantly easier?

For example, if we offered a function

// Returns the list of IPs that a get_range(begin, end) would send requests to,
// suffixed with the supplied port.
SplitScan( Key begin, Key end, int port ) -> [ ip:port of layer process ]

or maybe something like

// Given a function that sends a request to a layer process for a computation
// done between `shardBegin` and `shardEnd`, return the list of responses.
// Note:  the function may be called more than len(result) times, due to client
// shard location caches being incorrect. 
ScatterGather( Key begin, Key end,
    Function CustomRequestSender( Key shardBegin, Key shardEnd ) -> R) -> [R]
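For illustration, a rough Python sketch of what such a ScatterGather helper might look like. Here get_boundary_keys is a stand-in for shard-boundary discovery (the FDB bindings expose similar information through the locality API), and send_request plays the role of CustomRequestSender in the pseudocode above:

```python
# Sketch of the proposed ScatterGather helper: split [begin, end) at shard
# boundaries and issue one request per sub-range, gathering results in order.
def scatter_gather(begin, end, send_request, get_boundary_keys):
    splits = [begin]
    splits += [b for b in get_boundary_keys(begin, end) if begin < b < end]
    splits.append(end)
    return [send_request(lo, hi) for lo, hi in zip(splits, splits[1:])]

# Example with a fake single shard boundary at b"m"; the request function
# here just echoes the sub-range it was asked to cover.
fake_boundaries = lambda begin, end: [b"m"]
result = scatter_gather(b"a", b"z", lambda lo, hi: (lo, hi), fake_boundaries)
# result: [(b"a", b"m"), (b"m", b"z")]
```

A production version would also need the retry behavior noted above, since cached shard locations can be stale.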

If you had the tools available to be able to write a precise router, such that every scan that any layer process would do would be assured to be a local scan, would you still have a reason to want predicate pushdown support?

Yes, an easier way of implementing that router would be more than half of it in my opinion. I think some documentation on how to scatter-gather the query from there once you have that list would also be useful for the main documentation site (which I suppose I’ve volunteered to write :sweat_smile:).

I just want to add that if a filter aims to accept or reject groups of consecutive key-value pairs where each group constitutes a logical “record” or “document” then the boundary keys that could be used between independent filter instances are not necessarily also FDB shard boundaries.

In the scatter-gather model using colocated layer instances, this is easy to work around because you can adjust the given begin and end keys of each query to ensure they are at the beginning and end of a logical record, respectively. The colocated layer will still query mostly local data but possibly also a small amount of remote data.
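That boundary adjustment can be sketched as follows, assuming a hypothetical record_id/field key scheme where the record prefix is everything before the first "/":

```python
# Snap a shard boundary back to the first key of the logical record that
# contains it, so each filter instance sees whole records. The key scheme
# here is an assumption for illustration.
def record_start(key: bytes) -> bytes:
    return key.split(b"/", 1)[0] + b"/"

# A boundary that falls mid-record moves back to the record's first key,
# at the cost of reading a small amount of remote data near the boundary.
adjusted_begin = record_start(b"cust42/address")
# adjusted_begin == b"cust42/"
```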

This is not so easy to address in the model where the storage server is calling some external filter on the result set composed of only local kv pairs.


I hadn’t considered that aspect. That would complicate things a bit and is probably enough reason by itself to not implement it the way I described.


Hi All,

I’m part of the CouchDB team, and I am investigating how we would implement our Mango query engine on top of FDB.

@alexmiller asked us to give an overview of our use case for predicate pushdown. To be honest, @ryanworl pretty much nailed our use case in his post above detailing the document layer, but I’ll give a quick summary from CouchDB’s perspective.

CouchDB Mango is a query language based on MongoDB’s selector syntax. We recently implemented a predicate pushdown for Mango. Previously, Mango chose an index for a query, along with a start key and end key into that index, and each shard would stream every document in that range to the coordinating node. The coordinating node would then do an in-memory filter to see if each document received from a shard matched the complete selector, and if so return it to the user. We recently changed this so that, before a shard sends a document to the coordinating node, it also does a full selector match, so only documents that fully match the selector are sent to the coordinating node. This led to a really nice decrease in internal node network traffic. It also opens up the possibility of adding aggregations to Mango: the idea there being that we can do the aggregations at the shard level and then the coordinating node does the final aggregation; again, Ryan gave a decent summary of that in his post above.