Multiple questions about Indexes, functions and watches to implement etcd-layer

Thanks for your patience as we were mulling over this post! We’re certainly glad you’re finding the Record Layer fun to explore!

Functions

Okay, there were a couple of different questions regarding the various function classes. If I’m honest, the distinctions are a little subtle (and I needed to look them up again), but the understanding I have is:

  1. Aggregate functions and aggregate index functions are both operations where you want to take all values from a specific field across multiple records and “aggregate” them together. For example, the “sum” aggregate function represents the sum of a certain field aggregated across all records. Something like SELECT sum(some_field) FROM some_record_type WHERE some_predicate() in SQL-ish.
  2. Aggregate index functions are like regular aggregate functions, but they use an index to answer. For many aggregate functions, using an index is just an optimization, though there are certain aggregates that actually require an index. For example, if you didn’t have a “sum” index, you could still answer a “sum” query by iterating over all records and summing the value. But the “max_ever” function (which is an aggregate index function) cannot be answered without a “max_ever” index: if you insert a record with a higher value of the indexed field, that updates the “max_ever” value, but if you then remove the record, you should still get the same “max_ever” value even though it can no longer be recomputed from the records alone. (The “count_updates” index, which counts how many times a thing is updated, is similar, and is probably miscategorized in FunctionNames.)
  3. Record functions are functions that produce a single value based on the value of a field for a single record. That function might require looking at an index, and its value may depend on what other data are being stored, but it’s not “aggregating” values across multiple records; it computes the value for a single record. So, for example, the “rank” function is a record function because each individual record will have its own rank (based on a given field).
  4. Then an IndexRecordFunction is a record function whose value is determined by looking at an index, whereas the StoreRecordFunction doesn’t look at an index (just the record store). For example, the rank function will typically be implemented with an IndexRecordFunction as it needs to figure out the rank of a record based on an appropriate “rank” index, but a “version” function (getting the FDBRecordVersion associated with a given record) would typically use a StoreRecordFunction as it doesn’t require using a separate index (the versions are stored right with the records themselves).
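
To make those last two concrete, here’s a rough sketch of evaluating a rank as an IndexRecordFunction. The “score” field and “score_rank_index” name are made up for illustration, and I’m going from memory on the exact signatures, so treat this as a sketch rather than gospel:

IndexRecordFunction<Long> rankFunction = new IndexRecordFunction<>(
   FunctionNames.RANK,                          // the record function's name
   Key.Expressions.field("score").ungrouped(),  // the operand (hypothetical "score" field)
   "score_rank_index"                           // hypothetical rank index over that field
);
// Computes the rank of one record by consulting the rank index.
CompletableFuture<Long> rank = recordStore.evaluateRecordFunction(rankFunction, record);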

So, I hope that helps, though it’s definitely a somewhat confusing hierarchy, so if you have more follow-up questions, I’d be happy to try and clarify.

Indexes

The basic flow you lay out for index maintenance is essentially correct. Within saveRecord, we essentially iterate over the indexes defined on that record type and then call update on each index maintainer implementation. Then the commit will transactionally update the records and all the indexes.
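
In code, that flow is roughly the following (a sketch; fdb, recordStoreBuilder, and record are assumed to be set up elsewhere):

try (FDBRecordContext context = fdb.openContext()) {
    FDBRecordStore store = recordStoreBuilder.setContext(context).createOrOpen();
    // Each relevant index maintainer's update is invoked here.
    store.saveRecord(record);
    // The record and all of its index entries commit atomically.
    context.commit();
}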

Believe it or not, these questions are actually related.

So, when an index is about to be updated, we first load it, based on its type, from a registry mapping index type to index maintainer implementation (or, more precisely, to an index maintainer factory). See: https://github.com/FoundationDB/fdb-record-layer/blob/1804da48ccf3dcec301767e27743ea492f394f74/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java#L583

The registry knows what maintainers exist by reading the list of available maintainers from a service loader. See: https://github.com/FoundationDB/fdb-record-layer/blob/1804da48ccf3dcec301767e27743ea492f394f74/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexMaintainerRegistryImpl.java#L58 (The maintainer registry is configurable, so you could pick your own favorite implementation instead of using the default one, though there may be places where we call IndexMaintainerRegistryImpl.instance() other than just to populate the default, so here there be dragons.)

The service loader knows which classes exist by looking in a file included in the Jar’s resources. Entries can be added to that file by using the AutoService class annotation. See the ValueIndexMaintainerFactory, which lists the types of indexes it provides maintainers for as just the IndexType.VALUE type and it marks itself with @AutoService(IndexMaintainerFactory.class). See also the AtomicMutationIndexMaintainerFactory, which provides an implementation for something that provides multiple types.

So, the steps to creating a custom index type are something like:

  1. Create an implementation of the IndexMaintainer class (which probably should be an interface instead of an abstract class), possibly one that extends the StandardIndexMaintainer. Note that our track record with keeping the API there stable from the point of view of an implementor is a little bit weaker than our track record with keeping it stable from the point of view of a consumer. So, for example, we may add methods that new adopters will need to add their own implementations of.
  2. Create an implementation of the IndexMaintainerFactory that knows how to produce IndexMaintainers of the new type you’d like to add. Add the type to the list it returns from getIndexTypes, and mark the class as @AutoService(IndexMaintainerFactory.class).
  3. There’s no step 3! (Or, more seriously, you should be able to now create indexes with the type you defined.)
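
As a rough sketch of step 2 (all of the “My…” names are hypothetical, and I’m going off the interface as it stands today):

@AutoService(IndexMaintainerFactory.class)
public class MyIndexMaintainerFactory implements IndexMaintainerFactory {
    @Override
    public Iterable<String> getIndexTypes() {
        // The new index type's name, as it will appear in index definitions.
        return Collections.singletonList("my_index");
    }

    @Override
    public IndexValidator getIndexValidator(Index index) {
        return new IndexValidator(index); // basic validation; customize as needed
    }

    @Override
    public IndexMaintainer getIndexMaintainer(IndexMaintainerState state) {
        return new MyIndexMaintainer(state); // your IndexMaintainer implementation
    }
}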

Note that it’s kind of on you, the index maintainer author, to make sure you don’t accidentally change the index in some backwards-incompatible way, storage-wise. So, for example, if your index involves serializing some data, you have to make sure that the serialization is “stable” and won’t break when you upgrade software (or you’re okay with not being able to read old data, etc.).

For custom functions, I’m not sure if you are referring to the “record functions” and “aggregate functions” discussed above, or FunctionKeyExpressions. In many instances, it may actually be preferable to produce a new FunctionKeyExpression instead of a new index type. For example, if your index is really about extracting information in a new way (like, say, a “descending” sort index, where field values are sorted in descending rather than ascending order), you may just want a custom function (in the descending sort case, one that flips the bytes of the extracted field so that it sorts “the wrong way”). If your index is about storing data in a new way (e.g., maintaining a whole new data structure so that you can do exotic things that can’t be done with an existing index), then you might want a new index type. For example, in the world of “weird indexes you might want to have on text”, we have (1) a “text index”, for full body search, that stores data on disk in a data structure designed to use less space than a naïve approach, and (2) a “collation” FunctionKeyExpression, which allows the user to specify a locale to produce an index entry with appropriate case folding for the text (for use with a standard value index). (As an aside, it’s possible that if we were doing things again, text indexes would take a special FunctionKeyExpression from “string” to “map<string, list>” instead of essentially putting both things in the index, but also, maybe alea iacta est on that one…)

In any case, defining a custom FunctionKeyExpression is very similar to the way a custom index is defined:

  1. Create a new implementation of the FunctionKeyExpression class. See, e.g., GeophileSpatialFunctionKeyExpression
  2. Create an implementation of the FunctionKeyExpression.Factory interface, and make sure to mark it as @AutoService(FunctionKeyExpression.Factory.class) as well as include a builder for the key expression in the list of builders it returns. See, e.g., GeophileSpatialFunctionKeyExpressionFactory
  3. The function can now be referenced by name with something like Key.Expressions.function(functionName), and it can be queried (if the function implements QueryableKeyExpression), though that bit of the API is somewhat newer and may have some deficiencies in the planner.
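
A sketch of steps 2 and 3 together (the “my_function” name and MyFunctionKeyExpression class are, again, hypothetical):

@AutoService(FunctionKeyExpression.Factory.class)
public class MyFunctionKeyExpressionFactory implements FunctionKeyExpression.Factory {
    @Override
    public List<FunctionKeyExpression.Builder> getBuilders() {
        // Register a builder for each function name this factory provides.
        return Collections.singletonList(
            new FunctionKeyExpression.BiFunctionBuilder("my_function",
                MyFunctionKeyExpression::new));
    }
}

// Elsewhere, the function can then be referenced by name:
KeyExpression expr = Key.Expressions.function("my_function",
    Key.Expressions.field("some_field"));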

Defining a custom IndexAggregateFunction is, I think, easier. It’s been a while since I’ve done it, so I could be wrong, but I think you can just start using a new name for the function, and as long as you implement canEvaluateAggregateFunction and evaluateAggregateFunction on your index maintainer, it should “just work”. Likewise for record functions, though there it’s canEvaluateRecordFunction and evaluateRecordFunction.
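
So, on your maintainer, something like this (with a made-up function name):

@Override
public boolean canEvaluateAggregateFunction(IndexAggregateFunction function) {
    // Claim only the aggregate function name this index knows how to answer;
    // evaluateAggregateFunction then does the actual work.
    return "my_aggregate".equals(function.getName());
}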

Right. I think you’ve figured most of this out, based on the response, but perhaps just to summarize: with an aggregate index, you can provide “grouping” and “grouped” (or “aggregated”) columns. So, in your example, you’ve produced a MAX_EVER index with one grouping column (the key) and one grouped column (the version). So, in SQL, the equivalent query is something like:

SELECT max_ever(KeyValue.version GROUPED BY KeyValue.key) FROM KeyValue WHERE KeyValue.key = key

For some key. The “evaluate aggregate function” syntax is essentially a translation of that, where:

  1. The first argument says the list of types to evaluate this query over, like FROM KeyValue
  2. The function argument is essentially serving the purpose of the max_ever(KeyValue.version GROUPED BY KeyValue.key) part of the query, where the function name (the first argument) specifies the max_ever part and the key expression (the second argument) specifies KeyValue.version GROUPED BY KeyValue.key. I think you don’t actually need the index name in the IndexAggregateFunction creation, though adding the index name essentially provides a hint to the Record Store to choose the right index.
  3. The third argument is doing the same thing as the WHERE KeyValue.key = key part of the query, though it’s somewhat subtle as to how. The API expects essentially the values of the grouping keys in the order they are declared by the query’s “grouped by” expression, and then it matches them up, one by one. So, in this case, with one grouping column, it matches the first grouping value to the grouping column itself.
  4. For completeness, the fourth argument, setting the IsolationLevel, just sets whether the transaction should be aborted if (1) this transaction is a read/write transaction and (2) any of the data read in the index changes. If “yes”, then this should be set to SERIALIZABLE and if “no”, then SNAPSHOT. The SQL statement here doesn’t really have an opinion on that.
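
Putting that together for the grouped query above (the index and field names come from your example, so treat them as assumptions; someKey is the key you’re looking up):

IndexAggregateFunction function = new IndexAggregateFunction(
   FunctionNames.MAX_EVER,
   Key.Expressions.field("version").groupBy(Key.Expressions.field("key")),
   INDEX_VERSION_PER_KEY.getName()
);
Tuple result = recordStore.evaluateAggregateFunction(
   Collections.singletonList("KeyValue"),
   function,
   Key.Evaluated.scalar(someKey), // the value for the single grouping column
   IsolationLevel.SERIALIZABLE
).join();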

So, to answer your question about whether the index could be used to retrieve the max version across all keys, I believe the answer is “yes”, though you would need to translate a slightly different statement, namely:

SELECT max_ever(KeyValue.version) FROM KeyValue

Note the lack of grouping keys. So something like:

IndexAggregateFunction function = new IndexAggregateFunction(
   FunctionNames.MAX_EVER,
   Key.Expressions.field("version").ungrouped(), // no grouping columns this time
   INDEX_VERSION_PER_KEY.getName()
);
recordStore.evaluateAggregateFunction(
   Collections.singletonList("KeyValue"), // the record types to evaluate over
   function,
   Key.Evaluated.EMPTY,                   // no grouping values to supply
   IsolationLevel.SERIALIZABLE
);

That should produce the max ever across all records, though note that this will require scanning the entire index; for an index with one entry per (etcd) key, that might be too many entries to scan within the duration of a single transaction (five seconds). In that case, you may need to define a second aggregate index without the grouping column.
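
Defining that second index might look something like the following when building your meta-data (the names here are illustrative):

metaDataBuilder.addIndex("KeyValue", new Index(
    "max_version_ever",
    Key.Expressions.field("version").ungrouped(), // no grouping column
    IndexTypes.MAX_EVER));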

Hmm, I’m not sure I totally understand all of the facets of this question, but you might be able to do some of these things with additional grouping columns on your universal index. I’m not, for example, totally sure what a “directory” is in this context, but if it’s expressible as a field on a record (or a FunctionKeyExpression of a field on a record), you could add it as an additional grouping column to your universal index. Note that you’ll need to make sure that the field is defined on all types, but if that’s possible, it can be done. You can also do the same with “multi-type” indexes, which are defined, as the name implies, on more than one type, though not necessarily all of them (like a universal index is).
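
For instance, if “directory” were a field defined on all of your record types, a grouped universal index might look like (hypothetical names again):

metaDataBuilder.addUniversalIndex(new Index(
    "max_version_by_directory",
    Key.Expressions.field("version").groupBy(Key.Expressions.field("directory")),
    IndexTypes.MAX_EVER));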

Versionstamps

Yeah, at the moment, versionstamps within primary keys are not supported. There are some details as to why in Issue #100, but the tl;dr is that it would require some special case logic in the places where we write records, though maybe it’s worth doing. If you have more specific questions about this, I’d be happy to try and provide more details.

Watches

Unfortunately, the answer to both of these questions is that the Record Layer doesn’t support watching (on its own), so you’d need to either use the FDB APIs directly or we’d need to think about how we’d want watches exposed. One thing that’s come up in the past when things like this have been discussed is that there are a few different facets to what a “watch” API would look like:

  1. For example, should you be able to watch records for changes? And would the record be required to already exist? The reason this is important is that if you don’t have versionstamp tracking enabled, there isn’t actually any FDB key that is guaranteed to be touched when a record is written (due to some details of the storage format). So, it would either be required that you can only watch records that already exist, or it would be required that you have versionstamp tracking enabled.
  2. Should you be able to watch changes in index keys? In the case of an etcd layer, for example, you might want to watch the “max_version_by_key” index for changes. And if that’s the goal, what should the API look like?
  3. The FDB watch API has problems with A → B → A updates being lost. (See Understanding "Watches"). Should the Record Layer try and “protect” you from such an update by, for example, only allowing you to watch versionstamped keys, or something, which are guaranteed to be monotonically increasing?

So, there would need to be some thought put into exactly what we’d want the API to look like. Not an insurmountable amount of thought, mind you, but some thought.

This is getting a little bit into the territory of “peeling back abstractions”, but perhaps it is necessary given that the Record Layer doesn’t let you specify a way to watch keys, etc. But it would be something like recordStore.indexSubspace(index).pack(indexKey) to produce the key that an aggregate index is using, which you could then watch. Again, not sure if that’s recommended, per se, but it’s possible.
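
Concretely, that might look like the following, dropping down to the raw FDB watch API (with indexKey being the Tuple of grouping values for the entry you care about):

byte[] watchKey = recordStore.indexSubspace(index).pack(indexKey);
// The future completes when the underlying FDB key changes.
CompletableFuture<Void> watch = recordStore.getContext().ensureActive().watch(watchKey);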

Yeah, I think that sounds like roughly the approach you’d need to take, given that FDB only maintains a finite amount of MVCC history, if you wanted to implement the full etcd watch API with events. Then you’d also need to play some tricks to correctly implement “range watches”, which aren’t really supported by the FDB API and would probably require some engineering to make work.

Some Closing Thoughts

I hope that helps! There are probably weird edge cases you’ll run into with a project as complicated as this, especially when adapting APIs built with one set of assumptions about the underlying storage implementation (e.g., a single Raft group with a single master writer) to a totally different backend. Let us know if you have additional questions, and good luck with the project!
