Get timestamp from fdb server?

Is there any way, inside a transaction, to get a timestamp from the fdbserver?

Context: I’m implementing a simple job queue in fdb. I want a way to know when a job has “stalled” (i.e. some worker claimed a job, but 30 seconds passed, and the worker did not finish the job). To do this, I need somewhat accurate timestamps. Instead of setting up NTP on all clients that might talk to fdb, is there a way to grab a timestamp from the fdbserver itself?

Thanks!

FoundationDB does not provide a timestamp by default. What I’ve done in the past is implement a “time” service which updates a key once per second. This solution only provides relative time: you need to compare two time values to get an elapsed duration.

In Go, it looks something like this:

func GetTime(db fdb.ReadTransactor) (time.Duration, error) {
	dur, err := db.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
		val := tr.Snapshot().Get(clockKey).MustGet()
		if val == nil {
			return time.Duration(0), errors.New("missing key")
		}
		dur := time.Duration(binary.LittleEndian.Uint64(val))
		return dur * clockPeriod, nil
	})
	if err != nil {
		return 0, err
	}
	return dur.(time.Duration), nil
}

func KeepTime(ctx context.Context, db fdb.Transactor) error {
	timer := time.NewTimer(0)
	if !timer.Stop() {
		<-timer.C
	}

	for {
		_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
			// Reset the timer to ensure this transaction waits the entire
			// period before committing, even if it's auto-retried.
			timer.Reset(c.cfg.Period)

			// Because this is a write-only transaction, we need to manually
			// acquire a read version to facilitate conflicts between multiple
			// time keepers, preventing the clock from ticking multiple times
			// per period. Under high load, FDB will delay this operation,
			// potentially slowing the clock's effective period.
			tr.GetReadVersion().MustGet()

			// Because this is a write-only transaction, we need to manually
			// set a read conflict on the clock's key.
			err := tr.AddReadConflictKey(c.key)
			if err != nil {
				return nil, err
			}

			// Increment the value of the clock's key. If the key
			// is empty, this operation will set the key to '0'
			// before incrementing.
			tr.Add(c.key, littleEndianOne[:])

			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-timer.C:
				return nil, nil
			}
		})
		if err != nil {
			if err, ok := err.(fdb.Error); ok {
				// If the transaction timed-out, just log and ignore
				// this error. When multiple clocks are running, a
				// single clock will usually continue to "win" the
				// race to update the clock value, meaning the other
				// clock's transactions will always time out.
				if err.Code == 1031 {
					log.Warn().Err(err).Msg("ignoring time-out error")
					continue
				}
			}
			return err
		}
	}
}
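
To tie this back to the original stalled-job question: here’s a rough, untested sketch of how a janitor process might use GetTime to detect stalls. Here claimedAtKey is a hypothetical helper returning the key under which a worker copied the raw clock counter when it claimed the job, and stallTimeout is a placeholder:

```
// Sketch only. A worker copies the raw clock counter into claimedAtKey(jobID)
// when it claims a job; a janitor can later compare that tick against the
// current clock. claimedAtKey and stallTimeout are not part of the code above.
const stallTimeout = 30 * time.Second

func isStalled(db fdb.Database, jobID string) (bool, error) {
	now, err := GetTime(db)
	if err != nil {
		return false, err
	}
	claimed, err := db.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
		val := tr.Get(claimedAtKey(jobID)).MustGet()
		if val == nil {
			return nil, errors.New("job was never claimed")
		}
		return time.Duration(binary.LittleEndian.Uint64(val)) * clockPeriod, nil
	})
	if err != nil {
		return false, err
	}
	return now-claimed.(time.Duration) > stallTimeout, nil
}
```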

Is this a correct understanding of your solution?

  1. One server elects itself as “time manager”
  2. Every second, updates some field in fdb.

Then the issue seems to be:

  1. if there are 0 “time managers”, the diff doesn’t advance
  2. if there are N > 1 “time managers”, the diff advances by N every second

This whole notion of having exactly one server be the “time manager”: I’m not sure I like this additional “complexity”.

No election. Multiple servers compete to be the clock. FDB rejects transactions from all but one by nature of the above implementation.

Welcome to distributed systems. :slight_smile: I’m being a bit much, but this is a common failure case. The same applies to a DB providing timestamps going offline.

See my reply above.

Read the comments in the implementation. They spell these details out. Note that I’m copy-pasting old code, so be sure to do some testing yourself. By my understanding, the clock will tick AT MOST once per second. If the DB is heavily loaded then the ticks may be a bit slower. This has been good enough for the applications I’ve worked on.


Maybe it’ll be helpful for me to outline a timing diagram. Let’s assume 3 clients: Clock A, Clock B, & Clock C:

t1 : Clock A starts tx
t2 : Clock B starts tx
t3 : Clock C starts tx
t4 : Clock A commits, increments the tick, and starts new tx
t5 : Clock B tries to commit but fails because of read conflict, starts new tx
t6 : Clock C tries to commit but fails because of read conflict, starts new tx
-- : Clock A dies
t7 : Clock B commits, increments the tick, and starts new tx
t8 : Clock C tries to commit but fails because of read conflict, starts new tx

Because of the read conflict, only one clock can increment the tick per time period.

I need to understand this in detail (in order to port to Elixir). So I’m going to have to ask some dumb questions.

  1. The purpose of line 1 (in the comment-deleted version) is to match up with line 6, to ensure that the transaction takes at least c.cfg.Period of time? Thus ensuring that in any given c.cfg.Period, at most one client can increment.
  2. Why do we need this? I want to delete this. Intuitively to me, L3 does the read, L4 does the write.
  3. Why do we do (1) a fake read and (2) an atomic add at all? Why is this better than (1) an actual read and (2) writing old_value + 1?
  4. What can cause L5? If we got through the L3 fake read and L4 write, what can kill our transaction?
  5. Supposing something can kill the transaction at L5, could it kill the transaction for ALL clients? I.e. if we have N clients concurrently trying to increment, could whatever kills one of them kill ALL of them, ensuring nothing increments the counter?

Thanks!


Very cool. I’ve been meaning to write an FDB app using Elixir at some point. A very nice language.

I seem to have somehow edited my previous response instead of writing a new one. Reposting here.

I need to understand this in detail (in order to port to Elixir). So I’m going to have to ask some dumb questions.

  1. The purpose of line 1 (in the comment-deleted version) is to match up with line 6, to ensure that the transaction takes at least c.cfg.Period of time? Thus ensuring that in any given c.cfg.Period, at most one client can increment.
  2. Why do we need this? I want to delete this. Intuitively to me, L3 does the read, L4 does the write.
  3. Why do we do (1) a fake read and (2) an atomic add at all? Why is this better than (1) an actual read and (2) writing old_value + 1?
  4. What can cause L5? If we got through the L3 fake read and L4 write, what can kill our transaction?
  5. Supposing something can kill the transaction at L5, could it kill the transaction for ALL clients? I.e. if we have N clients concurrently trying to increment, could whatever kills one of them kill ALL of them, ensuring nothing increments the counter?

Thanks!

Instead of doing a 1s long transaction, why not do:

```
get cur value of counter

sleep 1s

transaction {

if value of counter hasn’t changed; increment

}
```

I reset the timer at the start of the transaction. This ensures the transaction will last at least 1s. The Go bindings automatically retry transactions that hit transient errors, so I want to make sure I reset the timer every time a new attempt of the transaction starts.

This transaction contains no reads. When a transaction does not perform reads, the client never receives a “read version”, which is the database version representing the state of the DB when the transaction started. To manually add a read conflict on the next line, I first need to manually obtain a read version. If I were actually doing a read, both of these operations would be done for me automatically.

It’s better because less data is sent over the wire; it’s an optimization. Reading the value and adding one would also work.

This is a Go thing. In Go, you pass cancellation tokens via the context.Context type. This branch is taken if another part of the code cancels the context, and it causes this function to exit. Otherwise, this function will run forever, which may be OK. In Go, I always provide a way to cancel infinite loops, just in case I need to do so for some reason.

No, the context is local to the process. This is only used if one thread in the application wants to cancel this thread.
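
For example (just a sketch, assuming db is an open fdb.Database and KeepTime is the function above), a caller could run the clock in the background and stop it during shutdown:

```
// Sketch only: run the clock in the background and return a stop function
// that the caller invokes during shutdown.
func startClock(db fdb.Database) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		if err := KeepTime(ctx, db); err != nil && err != context.Canceled {
			log.Error().Err(err).Msg("clock stopped unexpectedly")
		}
	}()
	return cancel
}
```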


This would work as well.
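
A rough sketch of that approach, reusing clockKey and littleEndianOne from the code above (untested, and it assumes the usual imports):

```
// Read the counter, sleep for one period, then increment only if no other
// clock advanced the counter in the meantime. The read inside the write
// transaction creates the conflict that serializes competing clocks.
func tickOnce(ctx context.Context, db fdb.Database, period time.Duration) error {
	before, err := db.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
		return tr.Get(clockKey).MustGet(), nil
	})
	if err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(period):
	}

	_, err = db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		cur := tr.Get(clockKey).MustGet()
		if !bytes.Equal(cur, before.([]byte)) {
			return nil, nil // someone else already ticked this period
		}
		tr.Add(clockKey, littleEndianOne[:])
		return nil, nil
	})
	return err
}
```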

I think I finally understand all the subtleties involved now, and the trade offs of the various approaches.

Thank you for your time & patience in explaining everything to me.

Cheers!


Is there a way to mark this as the solution? I can’t seem to find a button to do it in my UI.

No worries. I enjoy talking about this stuff so it’s really not an issue. Let us know how your solution turns out! I’d like to see more Elixir on these forums.

I don’t know if this is a feature of these forums. :man_shrugging:

Maybe this should be a separate question, but posting here since it’s mostly directed at @janderland :

What are the “external api” and the “semantics” of your job queue?

Things I have in mind are (all stored in fdb)

  1. todo: List, in_progress: List
  2. init_command: Map<JobId, Job_Command>
  3. status: Map<JobId, [InProgress || Done]>

op enqueue:
push to $init_command, push to $todo

op take_job:
take from $todo; push to $in_progress
get job details from init_command

op mark_done:
delete from $in_progress
update $status

… maybe some ops for killing/restarting “zombie jobs”
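
For concreteness, a rough sketch of these ops using the Go bindings and the tuple layer (the key layout and names are just illustrative, not a real implementation):

```
// Illustrative key layout for the queue above, using the subspace/tuple layers.
var (
	todoSub       = subspace.Sub("queue", "todo")        // (jobID) -> ""
	inProgressSub = subspace.Sub("queue", "in_progress") // (jobID) -> claim tick
	commandSub    = subspace.Sub("queue", "command")     // (jobID) -> command
	statusSub     = subspace.Sub("queue", "status")      // (jobID) -> "in_progress" | "done"
)

// enqueue: push to $init_command, push to $todo.
func enqueue(db fdb.Transactor, jobID, command string) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Set(commandSub.Pack(tuple.Tuple{jobID}), []byte(command))
		tr.Set(todoSub.Pack(tuple.Tuple{jobID}), []byte{})
		return nil, nil
	})
	return err
}

// take_job: move one job from $todo to $in_progress, recording the clock tick
// at which it was claimed so stalls can be detected later.
func takeJob(db fdb.Transactor, claimTick []byte) (string, error) {
	id, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		kvs := tr.GetRange(todoSub, fdb.RangeOptions{Limit: 1}).GetSliceOrPanic()
		if len(kvs) == 0 {
			return "", nil // nothing to do
		}
		t, err := todoSub.Unpack(kvs[0].Key)
		if err != nil {
			return nil, err
		}
		jobID := t[0].(string)

		tr.Clear(kvs[0].Key)
		tr.Set(inProgressSub.Pack(tuple.Tuple{jobID}), claimTick)
		tr.Set(statusSub.Pack(tuple.Tuple{jobID}), []byte("in_progress"))
		return jobID, nil
	})
	if err != nil {
		return "", err
	}
	return id.(string), nil
}

// mark_done: delete from $in_progress, update $status.
func markDone(db fdb.Transactor, jobID string) error {
	_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Clear(inProgressSub.Pack(tuple.Tuple{jobID}))
		tr.Set(statusSub.Pack(tuple.Tuple{jobID}), []byte("done"))
		return nil, nil
	})
	return err
}
```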

Anyways, curious what “api” and “semantics” you ended up picking for your job queue.

Thanks!

I work on a system which runs several hundred worker nodes. Each worker is assigned a shard. We run double replicas, so each shard has two nodes assigned to it. Requests come in and must be processed by ALL shards before they are completed. In FDB we store the following things:

  • Worker heartbeats: the workers register themselves using a unique name and begin to heartbeat to let the system know they are alive.
  • Shard assignment: a singleton service called the “monitor” assigns workers to the various shards and stores their assignment in this subspace. The workers monitor it and download the shard that they are assigned.
  • Reduction space: this subspace is used to “reduce” the results from all workers into a single response. When a request comes in, we create a new entry in this space. The request entry includes a list of all the shards. As the workers complete their task, they delete their shard from the list and store the results here. Once the shard list for a particular job is empty, we know that job is completed.
  • If a request doesn’t complete after X ticks then it’s marked as failed.

After creating an entry in the “reduction space” for a request, we send the request onto a message queue which the workers read from. Each pair of replicas listens to a single “consumer” on the queue, meaning a single request will only be read by one of the replicas. We use NATS.io for our queue.
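
Not our actual code, but conceptually the reduction-space bookkeeping looks something like this (one key per pending shard rather than a single list value; all names here are made up):

```
// Conceptual sketch: each request gets one key per pending shard plus a result
// prefix. A worker clears its pending key and writes its result in the same
// transaction; when no pending keys remain, the request is complete.
var (
	pendingSub = subspace.Sub("reduce", "pending") // (requestID, shard) -> ""
	resultSub  = subspace.Sub("reduce", "result")  // (requestID, shard) -> payload
)

func completeShard(db fdb.Transactor, requestID, shard string, result []byte) (done bool, err error) {
	d, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Clear(pendingSub.Pack(tuple.Tuple{requestID, shard}))
		tr.Set(resultSub.Pack(tuple.Tuple{requestID, shard}), result)

		// Read-your-writes means this range read does not see the key we
		// just cleared, so an empty result means every shard has reported in.
		remaining := tr.GetRange(
			pendingSub.Sub(requestID),
			fdb.RangeOptions{Limit: 1},
		).GetSliceOrPanic()
		return len(remaining) == 0, nil
	})
	if err != nil {
		return false, err
	}
	return d.(bool), nil
}
```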

I’m curious. Why aren’t you using fdb for this too? My current philosophy of fdb is:

  1. store blobs in s3
  2. do realtime stuff over ws/webrtc
  3. for everything else, if kv size fits, and it needs to survive machine crashes, store it in fdb

You could use FDB for your queue. We experimented with that at one point. But we assumed NATS would perform better for data transport, since that’s what it’s designed to do.