Multiple questions about Indexes, functions and watches to implement etcd-layer

Hi everyone! After playing with the FDB Operator, I decided to play a bit with the record-layer by developing an ETCD layer. My goal is to see how far I can go using the record-layer without using a fdb-client directly. This is mostly a toy project for now, but I’m having a lot of fun :slight_smile:

Here’s what I did for now:

  • etcd protobuf was imported and exposed with Vert.x,
  • Record-layer is used. As etcd is also using protobuf, we are directly storing the KeyValue message,
  • Integrations test using a real FDB spawned with testcontainers and official Java etcd client,
  • Tests are backported from jetcd test cases
  • Supported operations:
    • put,
    • get,
    • scan,
    • delete,
    • compact,
  • ETCD MVCC simulated using FDB’s read version

I successfully implemented basic crud operations thanks to the recordStore. Now, before moving to watches and leases, I want to understand correctly how Indexes works and how I can use them, appart from optimize queries with filtering.

Indexes and functions

By reading the Extending doc, I understood the fact that an index is only a subspace in-sync with a record-store. There seems to be multiples IndexTypes which are related to Functions; their goal seems to be the maintainer of the index’s subspace.

There seems to be different kind of functions, and I have some questions about them:

  • What is the difference between:

    • aggregate func and aggregate index func?
    • RecordFunction, StoreRecordFunction and IndexRecordFunction? I have difficulties to represent the difference by reading the related part on the Extending doc.
  • What is the workflow of a IndexMaintainer? Is it something like:

    1. call to saveRecords
    2. foreach index in indexes:
      • apply keyExpression to the record to produce a list of tuples for each index entry
      • use IndexMaintainer on the list of Tuples to updates Index
    3. store record
  • Where can I find the code for the moment where IndexTypes.VALUE or IndexTypes.MAX_EVER Indexes are used? I couldn’t find it and I would like to see under the hood how it is working for a simple case like the secondary index pattern or keeping the ever seen max long.

Query an Index

I runned into some difficulties to directly query an Index. I tried to use the universal Index globalCount declared in the example.Main class but it is always returning 0 (code and test are available here). I do have some questions about this:

  • Why do we need to specify a KeyExpression in the context of an universal Index? It seems to me that I don’t know how to create a keyExpression that would be matching for example all records for a specific KeySpace/Directory.
IndexAggregateFunction function = new IndexAggregateFunction(
        FunctionNames.COUNT, COUNT_INDEX.getRootExpression(), COUNT_INDEX.getName());

return recordStore.evaluateAggregateFunction(
        Collections.singletonList("KeyValue"),
        function, Key.Evaluated.fromTuple(new Tuple()), IsolationLevel.SERIALIZABLE)
  • for the above code, why do I need to Evaluate again against keys? I thought I already did defined them inside the Index. Is is the original key?

  • Why do I need to provide an AggregateFunction? From my point-of-view, As the index is maintained, I just need to get within the index keyspace?

Watch

My next big work will soon be able Watch and Lease support. I did not went deep yet on thinking how I should implement this, so for now I only have questions.

As an index is an in-sync keyspace aside the recordStore, is it the right way to use it for storing other info, a bit like the MAX_EVER info?
Could we theoretically be able to reimplement all the needed things to have watches in the similar fashion than @pH14’s work around the ZK layer?

I watched the talk, and my first thoughts were mixed. On the other hand, I can easily create some records that will be used to create for example the Watch Event Log, but I’m not certain about certains things:

  • watch over record-layer,
  • versionstamp as part of the primary-key not supported in record-layer,
  • the approach between Index and Record.Maybe we can represent the Watch Event log as an Index? Or maybe as a regular other recordStore?

I would love to have some insights/tips/infos from more experienced record-layer users than myself, hence the number of questions included in this post :stuck_out_tongue:

Thanks a lot for reading my pretty long post!

4 Likes

I continued to play with indexes, and answer some questions by myself :slight_smile:
I will try to explain what I learned :stuck_out_tongue:

Indexes

Indexes are a subspace in-sync with a record-store. It is used to optimize queries (setFilters) but you can also view them as an alternative POV of your recordstore. For example, You can setup an Index that will count every record that you have:

  // Keep a global track of the number of records stored
  protected static final Index COUNT_INDEX = new Index(
    "globalRecordCount", // name
    new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), // a key expression
    IndexTypes.COUNT); // an index-type
// and after

// add a global index that will count all records and updates
metadataBuilder.addUniversalIndex(COUNT_INDEX);

Here, for every record, we will construct a Index called globalRecordCount and will populate with keys and values according to the key-expression and index-type. In this example, if you scan the index using this:

        recordStoreProvider.apply(context).scanIndex(
          COUNT_INDEX,
          IndexScanType.BY_GROUP,
          TupleRange.ALL,
          null, // continuation,
          ScanProperties.FORWARD_SCAN
        ).asList().join().stream().forEach(
          indexEntry -> log.trace("found an indexEntry for stats: key:'{}', value: '{}'", indexEntry.getKey(), indexEntry.getValue())
        );

You willl have in your log as requested a empty key and the count of records:

EtcdRecordStore - found an indexEntry for stats: key:'()', value: '(2)'

Let’s make another example. In my case (ETCD) each record is represented by a key and a version. Version are starting at 1 and are incrementing. One inserting a new record, I need to retrieve the latest version and do a +1 before inserting one. Before I was scanning old records for the key and filtering to get the last version. I can now use an Index that will hold this for me.

As I have multiple versions per key, I will aggregate my record by keeping the greatest value of Version. I could have went with a Index.Count, but I wanted to play with Max :slight_smile:

  // keep track of the version per key with an index
  protected static final Index INDEX_VERSION_PER_KEY = new Index(
    "index-version-per-key",
    Key.Expressions.field("version").groupBy(Key.Expressions.field("key")),
    IndexTypes.MAX_EVER_LONG);

Almost the same thing, except that I can represent the aggregation performed on my record before populate my index: here, I’m basically saying what the key and value will look like in the index:

  • rowkey will be my key field on protobuf, with version aggregated
  • value will be the max-ever long value seen

Let’s scan it

        recordStoreProvider.apply(context).scanIndex(
          INDEX_VERSION_PER_KEY,
          IndexScanType.BY_GROUP,
          TupleRange.ALL,
          null, // continuation,
          ScanProperties.FORWARD_SCAN
        ).asList().join().stream().forEach(
          indexEntry -> log.trace("found an indexEntry: key:'{}', value: '{}'", indexEntry.getKey(), indexEntry.getValue())
        );

and after inserting some values:

found an indexEntry: key:'(b"sample_key")', value: '(1)'
found an indexEntry: key:'(b"sample_key2")', value: '(2)'

So we have a keyspace, with key as a rowkey and with a tuple of the max ever seen value of version field. Let’s retrieve the value for a specific index.


      // retrieve version using an index
      IndexAggregateFunction function = new IndexAggregateFunction(
        FunctionNames.MAX_EVER, INDEX_VERSION_PER_KEY.getRootExpression(), INDEX_VERSION_PER_KEY.getName());

      Tuple maxResult = recordStoreProvider.apply(context)
        .evaluateAggregateFunction(
          Collections.singletonList("KeyValue"), function,
          Key.Evaluated.concatenate(record.getKey().toByteArray()), IsolationLevel.SERIALIZABLE)
        .join();

The IndexAggregateFunction is a function that can be applied on a aggregated index, such as MAX. We can choose on which key we will evaluate, here the original key.

One thing I did not understand is why I needed to say again the key during evaluateAggregateFunction. The answer is that I can choose which key I want to aggregate. Do I want to retrieve only one value? Or maybe I want to retrieve the max version across all keys? It is simple as replacing Key.Evaluated.concatenate(record.getKey().toByteArray()), with Key.Evaluated.empty, which will match all keys from the index.

Function MAX, count and so on are using FDB atomic operations.

All modifications to an index are done within the same transaction as inserting the record, so you are indeed in-sync.

Questions

Now I have others questions :stuck_out_tongue::

  • I still have no clues what RecordFunction , StoreRecordFunction and IndexRecordFunction are,

  • You seems to be able to load your own functions and indexes, but I did not yet found how,

  • does watching is allowed through the record-layer? Or do I need to use the FDB client,

  • As we can design the key of an index to be pretty static thanks to aggregation, we would imagine put a watch on a key of a designed aggregated index. For that I need the full rowkey of a value, how can I compute/retrieve it? My guess is that I can retrieve it with the Index KeySpace.

  • for the globalCount, can I create an universal record count that will “know” the context like:

    • the directory with the current context,
    • the record name?
      I would love to have something like (“application”, “etcd”, “tenant”, “tenantA”, “record”, “myRecord”) => (myCount)

Thanks again for open-sourcing the record-layer, I must admit that I’m having a lot of fun lately! I really love the API and the possibilities offered by it :+1:

Nice work!

Stay safe in those trouble times,
Pierre

1 Like

Thanks for your patience as we were mulling over this post! We’re certainly glad you’re finding the Record Layer fun to explore!

Functions

Okay, there were a couple of different questions regarding the various function classes. If I’m honest, the distinctions are a little subtle (and I needed to look them again), but the understanding I have is:

  1. Aggregate functions and aggregate index functions are both operations where you want to take all values from a specific field across multiple records and “aggregate” them together. For example, the “sum” aggregate function represents the sum of a certain field aggregated across all records. Something like SELECT sum(some_field) FROM some_record_type WHERE some_predicate() in SQL-ish.
  2. Aggregate index functions are like regular aggregate functions, but they use an index to answer. For many aggregate functions, using an index is just an optimization, though there are certain aggregates that actually require an index. For example, if you didn’t have a “sum” index, you could still answer a “sum” query by iterating over all records and summing the value. But, the “max_ever” function (which is an aggregate index function) cannot be answered without having a “max_ever” index because if you insert a record with a higher value of the indexed field, then that updates the “max_ever” value, but then if you remove it, then you should still have the same “max_ever” value even though it can no longer be recomputed from just the records. (The “count_updates” index, which counts how many times a thing is updated, is similar, and is probably miscategorized in FunctionNames.)
  3. Record functions are functions that produce a single value based on the value of a field for a single record. That function might require looking at an index and it’s value may depend on what other data are being stored, but it’s not “aggregating” the values across multiple records; it computes the value for a single record. So, for example, the “rank” function is a record function because each individual record will have its own rank (based on a given field).
  4. Then an IndexRecordFunction is a record function whose value is determined by looking at an index, whereas the StoreRecordFunction doesn’t look at an index (just the record store). For example, the rank function will typically be implemented with an IndexRecordFunction as it needs to figure out the rank of a record based on an appropriate “rank” index, but a “version” function (getting the FDBRecordVersion associated with a given record) would typically use a StoreRecordFunction as it doesn’t require using a separate index (the versions are stored right with the records themselves).

So, I hope that helps, though it’s definitely a somewhat confusing hierarchy, so if you have more follow up questions, I’d be happy to try and clarify.

Indexes

The basic flow you lay out for index maintenance is essentially correct. Within saveRecord, we essentially iterate over the indexes defined on that index type and then call update on the index maintainer implementation. Then the commit will transactionally update the records and all the indexes.

Believe it or not, these questions are actually related.

So, when an index is about to be updated, we first load it, based on its type, from a registry mapping index type to index maintainer implementation (or, actually, index maintainer . See: https://github.com/FoundationDB/fdb-record-layer/blob/1804da48ccf3dcec301767e27743ea492f394f74/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java#L583

The registry knows what maintainers exist by reading from a service loader the list of available maintainers. See: https://github.com/FoundationDB/fdb-record-layer/blob/1804da48ccf3dcec301767e27743ea492f394f74/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexMaintainerRegistryImpl.java#L58 (Though the maintainer registry is configurable, so you could pick your own favorite implementation instead of using the default one, though there may be places where we call IndexMaintainerRegistryImpl.instance() other than just to populate the default, so here there be dragons.)

The service loader knows which classes exist by looking in a file included in the Jar’s resources. Entries can be added to that file by using the AutoService class annotation. See the ValueIndexMaintainerFactory, which lists the types of indexes it provides maintainers for as just the IndexType.VALUE type and it marks itself with @AutoService(IndexMaintainerFactory.class). See also the AtomicMutationIndexMaintainerFactory, which provides an implementation for something that provides multiple types.

So, the steps to creating a custom index type are something like:

  1. Create an implementation of the IndexMaintainer class (which probably should be an interface instead of an abstract class), possibly one that extends the StandardIndexMaintainer. Note that our track record with keeping the API there stable from the point of view of an implementor is a little bit weaker than our track record with keeping it stable from the point of view of a consumer. So, for example, we may add methods that new adopters will need to add their own implementations of.
  2. Create an implementation of the IndexMaintainerFactory that knows how to produce IndexMaintainers of the new type you’d like to add. Add the type to list it returns in getTypes, and mark the class as @AutoService(IndexMaintainerFactory.class).
  3. There’s no step 3! (Or, more seriously, you should be able to now create indexes with the type you defined.)

Note that it’s kind of on you, the index maintainer, to make sure you don’t accidentally change the index in some backwards incompatible way, storage wise. So, for example, if your index involves serializing some data, you have to make sure that the serialization is “stable” and won’t break when you upgrade software (or are okay with not being able to read old data, etc.).

For custom functions, I’m not sure if you are referring to the “record functions” and “aggregate functions” discussed above, or FunctionKeyExpressions. In many instances, it may actually be preferable to produce a new FunctionKeyExpression instead of a new index type. For example, if your index is really about extracting information in a new way (like, for example, an “descending” sort index, where field values are sorted in descending rather than ascending order), you may just want a custom function (in the descending sort case, one that flips the bytes of the extracted field so that it sorts “the wrong way”). If your index is about storing data in a new way (e.g., maintaining a whole new data structure so that you can do exotic things that can’t be done with an existing index), then you might have a new container. For example, in the world of “weird indexes you might want to have on text”, we have (1) a “text index”, for full body search, that stores data on disk in a data structure that is designed to use less space than a naïve approach, and (2) a “collation” FunctionKeyExpression, which allows the user to specify a locale to produce an index entry with appropriate case folding for the text they produce (for use with a standard value index). (As an aside, it’s possible if we were doing things again that text indexes would take a special FunctionKeyExpression from “string” to “map<string, list>” instead of essentially putting both things in the index, but also, maybe alea iacta est on that one…)

In any case, defining a custom FunctionKeyExpression is very similar to the way a custom index is defined:

  1. Create a new implementation of the FunctionKeyExpression class. See, e.g., GeophileSpatialFunctionKeyExpression
  2. Create an implementation of the FunctionKeyExpression.Factory interface, and make sure to mark it as @AutoService(FunctionKeyExpression.Factory.class) as well as include a builder for the key expression in the list of builders it returns. See, e.g., GeophileSpatialFunctionKeyExpressionFactory
  3. The function can now be referenced by name with something like Key.Expressions.function(functionName), and it can be queried (if the function implements QueryableKeyExpression), though that bit of the API is somewhat newer and may have some deficiencies in the planner.

Defining a custom IndexAggregateFunction I think is easier. It’s been a while since I’ve done it, so I could be wrong, but I think you can just start using a new name for the function, and then as long as you implement canEvaluateAggregateFunction and evaluateAggregateFunction on your index maintainer, then it should “just work”. Likewise for record functions, though it’s canEvaluateRecordFunction and canEvaluateRecordFunction.

Right. I think you’ve figured most of this out, based on the response, but perhaps just to summarize: with an aggregate index, you can provided “grouping” and “grouped” (or “aggregated”) columns. So, in your example, you’ve produced a MAX_EVER index with one grouping column (the key) and one grouped column (the version). So, in SQL, the equivalent query is something like:

SELECT max_ever(KeyValue.version GROUPED BY KeyValue.key) FROM KeyValue WHERE KeyValue.key = key

For some key. The “evaluate aggregate function” syntax is essentially a translation of that, where:

  1. The first argument says the list of types to evaluate this query over, like FROM KeyValue
  2. The function argument is essentially serving the purpose of the max_ever(KeyValue.version GROUPED BY keyValue.key) part of the query, where the function name (the first argument) specifies the max_ever part and the key expression (the second argument) specifies KeyValue.version GROUPED BY KeyValue.key. I think you don’t actually need the index name in in the IndexAggregateFunction creation, though adding the index name essentially provides a hint to the Record Store to choose the right index.
  3. The third argument is doing the some thing as the WHERE KeyValue.key = key part of the query, though it’s somewhat subtle as to how. The API expects essentially the values of the grouping keys in the order they are declared by the query’s “grouped by” expression, and then it rendezvous them, one by one. So, in this case, with one grouping column, it matches the first grouping value to the grouping column itself.
  4. For completeness, the fourth argument, setting the IsolationLevel, just sets whether the transaction should be aborted if (1) this transaction is a read/write transaction and (2) any of the data read in the index changes. If “yes”, then this should be set to SERIALIZABLE and if “no”, then SNAPSHOT. The SQL statement here doesn’t really have an opinion on that.

So, to answer your question about whether the index could be used to retrieve the max version across all keys, I believe the answer is “yes”, though you would need to translate a slightly different statement, namely:

SELECT max_ever(KeyValue.version) FROM KeyValue

Note the lack of grouping keys. So something like:

IndexAggregateFunction function = new IndexAggregateFunction(
   FunctionNames.MAX_EVER,
   Key.Expressions.field("version").ungrouped(),
   INDEX_VERSION_PER_KEY.getName()
);
recordStore.evaluateAggregateFunction(
   Collections.singletonList("KeyValue"),
   function,
   Key.Evaluated.EMPTY,
   IsolationLevel.SERIALIZABLE
);

That should produce the max ever across all records, though note that this will require scanning the entire index, which for an index that is one per (etcd) key, that might be too many scans to fit in the timespan of a single transaction (five seconds). In that case, you may need to defined a second aggregate index without the grouping column.

Hmm, I’m not sure I totally understand all of the facets of this question, but you might be able to some of these things with additional grouping columns on your universal index. I’m not, for example, totally sure what a “directory” is in this context, but if it’s expressible as a field on a record (or a FunctionKeyExpression of a field on record), you could add it as an additional grouping column to your universal index. Note that you’ll need to make sure that the field is defined on all types, but if that’s possible, it can be done. You can also do the same with “multi-type” indexes, which are defined, as the name implies, on more than one type, though not necessarily all of them (like a universal index is).

Versionstamps

Yeah, at the moment, versionstamps within primary keys are not supported. There are some details as to why in Issue #100, but the tl;dr is that it would require some special case logic in the places where we write records, though maybe it’s worth doing. If you have more specific questions about this, I’d be happy to try and provide more details.

Watches

Unfortunately, the answer to both of this questions is that the Record Layer doesn’t supporting watching (on its own), so you’d need to either use the FDB APIs directly or we’d need to think about how we’d want watches exposed. One thing that’s come up in the past when things like this have been discussed is that there are a few different facets to what a “watch” API would look like:

  1. For example, should you be able to watch records for changes? And would the record be required to already exist? The reason this is important is that if you don’t have versionstamp tracking enabled, there isn’t actually any FDB key that is guaranteed to be touched when a record is written (due to some details of the storage format). So, it would either be required that you can only watch records that already exist, or it would be required that you have versionstamp tracking enabled.
  2. Should you be able to watch changes in index keys? In the case of an etcd layer, for example, you might want to watch the “max_version_by_key” index for changes. And if that’s the goal, what should the API look like?
  3. The FDB watch API has problems with A -> B -> A updates being lost. (See Understanding "Watches"). Should the Record Layer try and “protect” you from such an update by, for example, only allowing you to watch versionstamped keys, or something, which are guaranteed to be monotonically increasing?

So, there would need to be some thought put into exactly what we’d want the API to look like. Not an insurmountable amount of thought, mind you, but some thought.

This is getting a little bit into the territory of “peeling back abstractions”, but perhaps it is necessary given that the Record Layer doesn’t let you specify a way to watch keys, etc. But it would be something like recordStore.indexSubspace(index).pack(indexKey) to produce the key that an aggregate index is using, which you could then watch. Again, not sure if that’s recommended, per se, but it’s possible.

Yeah, I think that sounds like about the approach you’d need to take, given that FDB only maintains a finite amount of MVCC history, if you wanted to implement the full etcd watch API with events. Then you’d also need to play some tricks to correctly implement “range watches”, which aren’t really supported by the FDB API, and probably require some engineering around to make work.

Some Closing Thoughts

I hope that helps! There are probably either weird edge cases you’ll run into with a project as complicated as this, especially as adapting APIs built with one set of assumptions about the underlying storage implementation (e.g., single-Raft group with a single master writer) when applying it to a totally different backend. I hope some of this is helpful, and let us know if you have additional questions, and good luck with the project!

3 Likes

Hi Alec!
What an awesome response :astonished: THANK YOU for taking so much time to answer me :smile:

It sure does!

It makes total sense, thanks for all the links :slight_smile:

I actually got that one on my one last week, but it is super nice to be aligned with your point-of-view.

The issue is opened since 2018, is there other people asking for this? Might worth the shot :yum:

Sorry, poor choice of words, I was mainly talking about KeySpaces, where I can use the DirectoryLayerDirectory. I was wondering if an index could have insights about the current hierarchy used for the data keyspace, but I guess not as we are kind of chrooted in the KeySpace :tongue: I played with another piece of code where I’m created a dedicated MetadataStore with its own KeySpace scoped by a tenant for example, so I have a better understanding of what is possible for now.

Does versionstamp tracking has downsides?

I found the feature pretty cool!

That could be a log with versionstamp as key, and serialized record as value. Not cheap I admit, that could be an option for a record.

As you say, “range watches”, aren’t supported by the FDB API, but that may worth the shot to add them to the record-layer. I really like the idea of being notified when the “max_version_by_key” index has changed.

Maybe that could be a first step to add watches in record-layer, knowing that we can generate an value-type index. Then we could wire a log to provide range watches, what do you think?

Again, thanks a lot for all the insights you gave me. It is a fun project indeed, it gave me a good understanding of what the record-layer looks like and the possibilities. Now I’m kind of interested in contributing directly to the record-layer, especially the watch part, depending on how the discussion will go :rocket:

Ah, I really should be better at responding more promptly to things…

Hm, I think there are probably a few different people vaguely asking about having this (or wishing this existed). The main difficultly would be trying to make sure that all of the places that perform a set do the “right thing” and translate the set to a versionstamp operation, I think.

Mainly just that it uses some extra space to store the versions (per record, roughly one extra key with a 13 byte value) and therefore also a bit of extra I/O. So, nothing too bad, but not zero, per se.

Yeah, something like that. I think the exact nature of this log has interesting requirements/scalability questions. For example:

  1. If it needs to support arbitrary range watches, is it okay if it needs to scan and filter out data from “unused” ranges? For example, you could imagine keeping a history of all changes to everything, and then just throwing out changes not in your watched range. But is that too slow (especially if you’re watching a small range). And could this use pattern lead to hot spots? For example, if all updates end up writing to the tail of this log, then will the storage servers for those log keys be overwhelmed with too many writes?
  2. An alternative design would, for example, maintain one queue on disk for each range watch request that contains only the changes needed to satisfy that request. But then if there are ranges with many watchers, does that produce a lot of write amplification? And how do you know who’s watching at write time?

There may be an interesting hybrid or something that allows for most range watches or all range watches (with some ranges being more efficient to watch than others).

Awesome! Best of luck exploring!

Thanks Alec for the answers!

I think I will start my contributions by enabling versions within the primary keys, and then I will upsert any GitHub issues related to watches. Is that OK for you?