From 42fce738246f724d719fa4b79a4661b552be70dd Mon Sep 17 00:00:00 2001 From: typebasedio Date: Tue, 21 Apr 2026 17:05:15 -0400 Subject: [PATCH] feat(v0.4): add heartbeat goroutine that updates lease timestamp --- internal/session/heartbeat.go | 80 ++++++++++++++++++++++++++++++ internal/session/heartbeat_test.go | 74 +++++++++++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 internal/session/heartbeat.go create mode 100644 internal/session/heartbeat_test.go diff --git a/internal/session/heartbeat.go b/internal/session/heartbeat.go new file mode 100644 index 0000000..a231d5d --- /dev/null +++ b/internal/session/heartbeat.go @@ -0,0 +1,80 @@ +package session + +import ( + "fmt" + "os" + "sync" + "time" +) + +// HeartbeatInterval is the default cadence for updating the lease's +// last_heartbeat_at timestamp. +const HeartbeatInterval = 30 * time.Second + +// StaleLeaseAfter is the age at which a lease's heartbeat is considered dead. +const StaleLeaseAfter = 60 * time.Second + +// Heartbeat is a background goroutine that periodically rewrites the lease +// file at leasePath with an updated LastHeartbeatAt. Stop blocks until the +// goroutine has exited. +type Heartbeat struct { + stop chan struct{} + done chan struct{} + stopOnce sync.Once +} + +// StartHeartbeat launches a heartbeat goroutine using the default logger +// (writes to stderr on update failure). Heartbeat write failures are +// best-effort (spec: "log warning but do not interrupt the session"). +func StartHeartbeat(leasePath string, interval time.Duration) *Heartbeat { + return StartHeartbeatWith(leasePath, interval, func(err error) { + fmt.Fprintf(os.Stderr, "[ctask] Warning: heartbeat write failed: %v\n", err) + }) +} + +// StartHeartbeatWith is the test-injectable form of StartHeartbeat: on each +// update failure, onErr is invoked instead of writing to stderr. +func StartHeartbeatWith(leasePath string, interval time.Duration, onErr func(error)) *Heartbeat { + h := &Heartbeat{ + stop: make(chan struct{}), + done: make(chan struct{}), + } + go h.run(leasePath, interval, onErr) + return h +} + +func (h *Heartbeat) run(leasePath string, interval time.Duration, onErr func(error)) { + defer close(h.done) + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-h.stop: + return + case <-t.C: + if err := updateHeartbeat(leasePath); err != nil { + onErr(err) + } + } + } +} + +// Stop signals the heartbeat to exit and waits for the goroutine to finish. +// Safe to call more than once. +func (h *Heartbeat) Stop() { + h.stopOnce.Do(func() { + close(h.stop) + }) + <-h.done +} + +// updateHeartbeat reads the lease file, overwrites LastHeartbeatAt with the +// current UTC timestamp, and writes the lease back. +func updateHeartbeat(leasePath string) error { + l, err := ReadLease(leasePath) + if err != nil { + return err + } + l.LastHeartbeatAt = time.Now().UTC().Truncate(time.Second) + return WriteLease(leasePath, l) +} diff --git a/internal/session/heartbeat_test.go b/internal/session/heartbeat_test.go new file mode 100644 index 0000000..8559252 --- /dev/null +++ b/internal/session/heartbeat_test.go @@ -0,0 +1,74 @@ +package session + +import ( + "path/filepath" + "sync/atomic" + "testing" + "time" +) + +func TestHeartbeatUpdatesFile(t *testing.T) { + dir := t.TempDir() + leasePath := filepath.Join(dir, "session.json") + + // Use an obviously old initial heartbeat so the seconds-truncated update + // will be detectably newer regardless of when tick fires within a second. + old := time.Now().Add(-1 * time.Hour).UTC().Truncate(time.Second) + lease := NewLease(old, "claude", "local") + if err := WriteLease(leasePath, lease); err != nil { + t.Fatalf("WriteLease: %v", err) + } + + hb := StartHeartbeat(leasePath, 30*time.Millisecond) + defer hb.Stop() + + time.Sleep(90 * time.Millisecond) + + got, err := ReadLease(leasePath) + if err != nil { + t.Fatalf("ReadLease: %v", err) + } + if !got.LastHeartbeatAt.After(old) { + t.Errorf("LastHeartbeatAt should be after initial time (%v), got %v", old, got.LastHeartbeatAt) + } +} + +func TestHeartbeatStopHaltsUpdates(t *testing.T) { + dir := t.TempDir() + leasePath := filepath.Join(dir, "session.json") + + now := time.Now().UTC().Truncate(time.Second) + lease := NewLease(now, "claude", "local") + WriteLease(leasePath, lease) + + hb := StartHeartbeat(leasePath, 20*time.Millisecond) + time.Sleep(60 * time.Millisecond) + hb.Stop() + + afterStop, _ := ReadLease(leasePath) + lastSeen := afterStop.LastHeartbeatAt + + time.Sleep(100 * time.Millisecond) + again, _ := ReadLease(leasePath) + + if !again.LastHeartbeatAt.Equal(lastSeen) { + t.Errorf("heartbeat continued after Stop: before=%v after=%v", lastSeen, again.LastHeartbeatAt) + } +} + +func TestHeartbeatMissingFileLogsAndContinues(t *testing.T) { + dir := t.TempDir() + leasePath := filepath.Join(dir, "session.json") + + var writeFailures atomic.Int32 + hb := StartHeartbeatWith(leasePath, 20*time.Millisecond, func(err error) { + writeFailures.Add(1) + }) + defer hb.Stop() + + time.Sleep(80 * time.Millisecond) + + if writeFailures.Load() == 0 { + t.Error("expected at least one heartbeat failure on missing file") + } +}