feat(v0.4): add heartbeat goroutine that updates lease timestamp
This commit is contained in:
@@ -0,0 +1,80 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Timing parameters for lease heartbeats.
const (
	// HeartbeatInterval is the default period between rewrites of the lease's
	// last_heartbeat_at timestamp.
	HeartbeatInterval = 30 * time.Second

	// StaleLeaseAfter is the heartbeat age at which a lease is considered
	// dead. It is twice HeartbeatInterval.
	StaleLeaseAfter = 60 * time.Second
)
|
||||
|
||||
// Heartbeat is the handle for a background goroutine that periodically
// rewrites a lease file with a fresh LastHeartbeatAt. Call Stop to shut the
// goroutine down; Stop returns only after the goroutine has exited.
type Heartbeat struct {
	// stopOnce ensures the stop channel is closed at most once, so Stop can
	// be called repeatedly.
	stopOnce sync.Once

	// stop is closed by Stop to ask the goroutine to exit.
	stop chan struct{}

	// done is closed by the goroutine as it exits.
	done chan struct{}
}
|
||||
|
||||
// StartHeartbeat launches a heartbeat goroutine using the default logger
|
||||
// (writes to stderr on update failure). Heartbeat write failures are
|
||||
// best-effort (spec: "log warning but do not interrupt the session").
|
||||
func StartHeartbeat(leasePath string, interval time.Duration) *Heartbeat {
|
||||
return StartHeartbeatWith(leasePath, interval, func(err error) {
|
||||
fmt.Fprintf(os.Stderr, "[ctask] Warning: heartbeat write failed: %v\n", err)
|
||||
})
|
||||
}
|
||||
|
||||
// StartHeartbeatWith is the test-injectable form of StartHeartbeat: on each
|
||||
// update failure, onErr is invoked instead of writing to stderr.
|
||||
func StartHeartbeatWith(leasePath string, interval time.Duration, onErr func(error)) *Heartbeat {
|
||||
h := &Heartbeat{
|
||||
stop: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go h.run(leasePath, interval, onErr)
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *Heartbeat) run(leasePath string, interval time.Duration, onErr func(error)) {
|
||||
defer close(h.done)
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
if err := updateHeartbeat(leasePath); err != nil {
|
||||
onErr(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop signals the heartbeat to exit and waits for the goroutine to finish.
|
||||
// Safe to call more than once.
|
||||
func (h *Heartbeat) Stop() {
|
||||
h.stopOnce.Do(func() {
|
||||
close(h.stop)
|
||||
})
|
||||
<-h.done
|
||||
}
|
||||
|
||||
// updateHeartbeat reads the lease file, overwrites LastHeartbeatAt with the
|
||||
// current UTC timestamp, and writes the lease back.
|
||||
func updateHeartbeat(leasePath string) error {
|
||||
l, err := ReadLease(leasePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.LastHeartbeatAt = time.Now().UTC().Truncate(time.Second)
|
||||
return WriteLease(leasePath, l)
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHeartbeatUpdatesFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
leasePath := filepath.Join(dir, "session.json")
|
||||
|
||||
// Use an obviously old initial heartbeat so the seconds-truncated update
|
||||
// will be detectably newer regardless of when tick fires within a second.
|
||||
old := time.Now().Add(-1 * time.Hour).UTC().Truncate(time.Second)
|
||||
lease := NewLease(old, "claude", "local")
|
||||
if err := WriteLease(leasePath, lease); err != nil {
|
||||
t.Fatalf("WriteLease: %v", err)
|
||||
}
|
||||
|
||||
hb := StartHeartbeat(leasePath, 30*time.Millisecond)
|
||||
defer hb.Stop()
|
||||
|
||||
time.Sleep(90 * time.Millisecond)
|
||||
|
||||
got, err := ReadLease(leasePath)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadLease: %v", err)
|
||||
}
|
||||
if !got.LastHeartbeatAt.After(old) {
|
||||
t.Errorf("LastHeartbeatAt should be after initial time (%v), got %v", old, got.LastHeartbeatAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeatStopHaltsUpdates(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
leasePath := filepath.Join(dir, "session.json")
|
||||
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
lease := NewLease(now, "claude", "local")
|
||||
WriteLease(leasePath, lease)
|
||||
|
||||
hb := StartHeartbeat(leasePath, 20*time.Millisecond)
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
hb.Stop()
|
||||
|
||||
afterStop, _ := ReadLease(leasePath)
|
||||
lastSeen := afterStop.LastHeartbeatAt
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
again, _ := ReadLease(leasePath)
|
||||
|
||||
if !again.LastHeartbeatAt.Equal(lastSeen) {
|
||||
t.Errorf("heartbeat continued after Stop: before=%v after=%v", lastSeen, again.LastHeartbeatAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeatMissingFileLogsAndContinues(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
leasePath := filepath.Join(dir, "session.json")
|
||||
|
||||
var writeFailures atomic.Int32
|
||||
hb := StartHeartbeatWith(leasePath, 20*time.Millisecond, func(err error) {
|
||||
writeFailures.Add(1)
|
||||
})
|
||||
defer hb.Stop()
|
||||
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if writeFailures.Load() == 0 {
|
||||
t.Error("expected at least one heartbeat failure on missing file")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user