81 lines
2.2 KiB
Go
81 lines
2.2 KiB
Go
package session
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// HeartbeatInterval is the default cadence for updating the lease's
|
|
// last_heartbeat_at timestamp.
|
|
const HeartbeatInterval = 30 * time.Second
|
|
|
|
// StaleLeaseAfter is the age at which a lease's heartbeat is considered dead.
|
|
const StaleLeaseAfter = 60 * time.Second
|
|
|
|
// Heartbeat is a background goroutine that periodically rewrites the lease
|
|
// file at leasePath with an updated LastHeartbeatAt. Stop blocks until the
|
|
// goroutine has exited.
|
|
type Heartbeat struct {
|
|
stop chan struct{}
|
|
done chan struct{}
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
// StartHeartbeat launches a heartbeat goroutine using the default logger
|
|
// (writes to stderr on update failure). Heartbeat write failures are
|
|
// best-effort (spec: "log warning but do not interrupt the session").
|
|
func StartHeartbeat(leasePath string, interval time.Duration) *Heartbeat {
|
|
return StartHeartbeatWith(leasePath, interval, func(err error) {
|
|
fmt.Fprintf(os.Stderr, "[ctask] Warning: heartbeat write failed: %v\n", err)
|
|
})
|
|
}
|
|
|
|
// StartHeartbeatWith is the test-injectable form of StartHeartbeat: on each
|
|
// update failure, onErr is invoked instead of writing to stderr.
|
|
func StartHeartbeatWith(leasePath string, interval time.Duration, onErr func(error)) *Heartbeat {
|
|
h := &Heartbeat{
|
|
stop: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
go h.run(leasePath, interval, onErr)
|
|
return h
|
|
}
|
|
|
|
func (h *Heartbeat) run(leasePath string, interval time.Duration, onErr func(error)) {
|
|
defer close(h.done)
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-h.stop:
|
|
return
|
|
case <-t.C:
|
|
if err := updateHeartbeat(leasePath); err != nil {
|
|
onErr(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop signals the heartbeat to exit and waits for the goroutine to finish.
|
|
// Safe to call more than once.
|
|
func (h *Heartbeat) Stop() {
|
|
h.stopOnce.Do(func() {
|
|
close(h.stop)
|
|
})
|
|
<-h.done
|
|
}
|
|
|
|
// updateHeartbeat reads the lease file, overwrites LastHeartbeatAt with the
|
|
// current UTC timestamp, and writes the lease back.
|
|
func updateHeartbeat(leasePath string) error {
|
|
l, err := ReadLease(leasePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
l.LastHeartbeatAt = time.Now().UTC().Truncate(time.Second)
|
|
return WriteLease(leasePath, l)
|
|
}
|