feat(v0.5.3): AdoptExistingPersistentSession with race guard, UpdatedAt bump, attach-error propagation

This commit is contained in:
2026-05-08 13:56:46 -04:00
parent a1309b596e
commit 08fb5bb1c3
2 changed files with 430 additions and 0 deletions
+206
View File
@@ -0,0 +1,206 @@
package session
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/warrenronsiek/ctask/internal/lockfile"
"github.com/warrenronsiek/ctask/internal/shell"
"github.com/warrenronsiek/ctask/internal/workspace"
)
// adoptAttacher and adoptPoll are test seams that *wrap* shell primitives
// rather than hand-rolling tmux invocations. Production calls go through
// shell.AttachSession / shell.PollSessionEnd. Tests override these
// variables; do not run such tests with t.Parallel().
var (
adoptAttacher = shell.AttachSession
adoptPoll = shell.PollSessionEnd
)
// AdoptExistingPersistentSession is the adopted-reattach path. It is
// invoked when a tmux session for the workspace exists but the lease is
// missing, stale, or fresh-but-from-another-host (the cmd-layer dispatcher
// has already prompted for confirmation in the fresh_remote case before
// calling here — see cmd/persistent.go::confirmFreshRemoteAdoption).
//
// Eight-step flow (see v0.5.3-spec.md §3 path C):
// 1. Print one diagnostic line announcing the adoption.
// 2. Acquire write lock; under lock, re-read lease state. If now
// fresh-local (concurrent adopter raced ahead), release the lock and
// fall through to AttachExisting — the race winner owns the lease.
// Otherwise, remove the orphaned lease, write a new lease for this
// process, and bump task.yaml.UpdatedAt.
// 3. Capture a fresh start manifest. Without it, finalize has no
// reliable diff baseline.
// 4. Start the heartbeat goroutine.
// 5. shell.AttachSession (returns nil on clean exit, error on non-zero).
// 6. Polling loop until tmux reports session gone — runs ONLY after a
// successful attach.
// 7. Stop the heartbeat goroutine.
// 8. finalize with SessionOwnership="adopted" and AdoptedFromOrphanAt set.
//
// On attach failure (step 5 returns error), steps 6-8 are skipped and the
// error is returned — the user sees the underlying tmux failure.
func AdoptExistingPersistentSession(tmuxPath, sessionName, wsDir string, opts LaunchOpts) error {
fmt.Fprintln(os.Stderr,
"[ctask] adopting orphaned persistent session (previous owner exited without finalizing)")
startTime := time.Now().UTC().Truncate(time.Second)
adoptedAt := startTime
leasePath := LeasePath(wsDir)
var raced bool
skipped, lockErr := lockfile.WithLock(
ctaskWriteLockPath(wsDir),
sessionWriteLockTimeout, sessionWriteLockStaleAfter,
func() error {
// Re-check under lock. If a concurrent adopter beat us to it,
// fall through to passive reattach. No lease writes, no
// task.yaml.UpdatedAt bump on this branch.
if InspectLease(wsDir) == LeaseStateFreshLocal {
raced = true
return nil
}
if _, rmErr := CleanupStaleLease(leasePath, StaleLeaseAfter); rmErr != nil {
fmt.Fprintf(os.Stderr,
"[ctask] warning: failed to remove orphaned lease: %v\n", rmErr)
}
lease := NewLease(startTime, opts.Agent, opts.Mode)
if err := WriteLease(leasePath, lease); err != nil {
return fmt.Errorf("writing lease: %w", err)
}
// Adoption-only: bump task.yaml.UpdatedAt. Passive reattach
// (and the race fall-through above) leave it untouched.
metaPath := filepath.Join(wsDir, "task.yaml")
if meta, err := workspace.ReadMeta(metaPath); err == nil && meta != nil {
meta.UpdatedAt = startTime
if err := workspace.WriteMeta(metaPath, meta); err != nil {
fmt.Fprintf(os.Stderr,
"[ctask] warning: failed to update task.yaml on adoption: %v\n", err)
}
}
return nil
},
)
if skipped {
fmt.Fprintf(os.Stderr,
"[ctask] Warning: could not acquire metadata lock; falling through to passive reattach\n")
return AttachExisting(tmuxPath, sessionName)
}
if lockErr != nil {
return fmt.Errorf("adoption lease write: %w", lockErr)
}
if raced {
return AttachExisting(tmuxPath, sessionName)
}
// Step 3: fresh start manifest — load-bearing for finalize's diff baseline.
startManifest, err := CaptureManifest(wsDir)
if err != nil {
fmt.Fprintf(os.Stderr,
"[ctask] warning: failed to capture start manifest during adoption: %v; finalize diff will be skipped\n", err)
} else {
mPath := manifestStartPath(wsDir)
if _, lockErr := lockfile.WithLock(
ctaskWriteLockPath(wsDir),
sessionWriteLockTimeout, sessionWriteLockStaleAfter,
func() error { return WriteManifest(mPath, startManifest) },
); lockErr != nil {
fmt.Fprintf(os.Stderr,
"[ctask] warning: failed to write start manifest: %v\n", lockErr)
startManifest = nil
}
}
// Step 4: heartbeat.
hb := StartHeartbeat(leasePath, HeartbeatInterval)
// Step 5: attach. Non-zero exit is a real failure; surface it
// and skip steps 6-8 (polling and finalize). Stop the heartbeat first
// so we don't leak the goroutine.
if attachErr := adoptAttacher(tmuxPath, sessionName); attachErr != nil {
hb.Stop()
return attachErr
}
// Step 6: poll until session ends.
adoptPoll(tmuxPath, sessionName, shell.PollInterval)
// Step 7: stop heartbeat.
hb.Stop()
// Step 8: finalize with adoption fields.
endTime := time.Now().UTC().Truncate(time.Second)
if startManifest != nil {
if err := finalizeAdopted(opts, wsDir, startManifest, startTime, endTime, adoptedAt); err != nil {
fmt.Fprintf(os.Stderr, "[ctask] warning: adoption finalize failed: %v\n", err)
}
}
return nil
}
// finalizeAdopted is the adoption-specific finalize path. Mirrors
// session.Run's finalize() but stamps SessionOwnership = "adopted" and
// AdoptedFromOrphanAt onto the summary.
func finalizeAdopted(opts LaunchOpts, wsDir string, startManifest *Manifest, startTime, endTime, adoptedAt time.Time) error {
endManifest, err := CaptureManifest(wsDir)
if err != nil {
return fmt.Errorf("capturing end manifest: %w", err)
}
diff := DiffManifests(startManifest, endManifest)
agent := opts.Agent
if opts.Shell {
agent = "shell"
}
info := &SessionInfo{
Agent: agent,
Mode: opts.Mode,
StartTime: startTime,
EndTime: endTime,
Diff: diff,
}
sessionID := NewSessionID(currentHostname(), os.Getpid(), startTime)
if l, err := ReadLease(LeasePath(wsDir)); err == nil && l != nil {
sessionID = l.SessionID
}
summary := SummarizeFromDiff(
sessionID, currentHostname(), agent, opts.Mode,
startTime, endTime, diff, endManifest,
)
summary.EndReason = "tmux_session_ended"
summary.DetectedVia = "polling"
summary.SessionOwnership = "adopted"
summary.AdoptedFromOrphanAt = &adoptedAt
skipped, lockErr := lockfile.WithLock(
ctaskWriteLockPath(wsDir),
sessionWriteLockTimeout, sessionWriteLockStaleAfter,
func() error {
if err := AppendSessionLog(wsDir, info); err != nil {
fmt.Fprintf(os.Stderr, "[ctask] warning: append session log failed: %v\n", err)
}
if err := WriteSummary(SummaryPath(wsDir), summary); err != nil {
return fmt.Errorf("write summary: %w", err)
}
if rmErr := os.Remove(LeasePath(wsDir)); rmErr != nil && !os.IsNotExist(rmErr) {
fmt.Fprintf(os.Stderr, "[ctask] warning: could not remove lease: %v\n", rmErr)
}
if rmErr := os.Remove(manifestStartPath(wsDir)); rmErr != nil && !os.IsNotExist(rmErr) {
fmt.Fprintf(os.Stderr, "[ctask] warning: could not remove start manifest: %v\n", rmErr)
}
return nil
},
)
if skipped {
fmt.Fprintf(os.Stderr,
"[ctask] Warning: could not acquire metadata lock at adoption finalize; skipping summary write\n")
return nil
}
return lockErr
}
+224
View File
@@ -0,0 +1,224 @@
package session
import (
"errors"
"os"
"path/filepath"
"testing"
"time"
"github.com/warrenronsiek/ctask/internal/workspace"
)
// adoptionFixture wires a temp workspace with a stale lease, a task.yaml
// with a known initial UpdatedAt, and the test seams (adoptAttacher,
// adoptPoll) overridden to no-ops. Tests using this fixture must not run
// in parallel — the seams are package globals.
type adoptionFixture struct {
wsDir string
staleLease *Lease
initialUpdate time.Time
attachCalls int
}
func newAdoptionFixture(t *testing.T) *adoptionFixture {
t.Helper()
ws := t.TempDir()
// task.yaml with a known UpdatedAt so we can assert the adoption bump.
initial := time.Now().UTC().Add(-2 * time.Hour).Truncate(time.Second)
meta := &workspace.TaskMeta{
ID: "demo-id", Slug: "demo", Title: "demo",
CreatedAt: initial, UpdatedAt: initial,
Status: "active", Category: "projects", Type: "project",
Mode: "local", Agent: "claude",
}
if err := workspace.WriteMeta(filepath.Join(ws, "task.yaml"), meta); err != nil {
t.Fatalf("WriteMeta: %v", err)
}
stale := &Lease{
SessionID: "host-old-1-20260101000000",
PID: 99999,
Hostname: currentHostname(),
Username: "u",
Agent: "claude",
Mode: "local",
StartedAt: time.Now().UTC().Add(-1 * time.Hour),
LastHeartbeatAt: time.Now().UTC().Add(-1 * time.Hour),
Terminal: "test",
}
if err := WriteLease(LeasePath(ws), stale); err != nil {
t.Fatalf("WriteLease: %v", err)
}
return &adoptionFixture{wsDir: ws, staleLease: stale, initialUpdate: initial}
}
// stubSeams installs no-op replacements for adoptAttacher, adoptPoll, and
// the `attacher` used by AttachExisting (which is on the race fall-through
// path). Tests must call t.Cleanup(restore).
func (fx *adoptionFixture) stubSeams(t *testing.T) {
t.Helper()
origA := adoptAttacher
adoptAttacher = func(_, _ string) error { fx.attachCalls++; return nil }
origP := adoptPoll
adoptPoll = func(_, _ string, _ time.Duration) {}
// AttachExisting's seam — race fall-through routes through it.
origE := attacher
attacher = func(_, _ string) error { return nil }
t.Cleanup(func() {
adoptAttacher = origA
adoptPoll = origP
attacher = origE
})
}
func defaultAdoptionOpts(wsDir string) LaunchOpts {
return LaunchOpts{
WsDir: wsDir,
Agent: "claude",
Mode: "local",
Slug: "demo",
Category: "projects",
SessionMode: "persistent",
SessionName: "ctask-projects-demo-abc123",
TmuxPath: "/usr/bin/tmux",
}
}
func TestAdoptionReplacesLeaseAndCapturesStartManifest(t *testing.T) {
fx := newAdoptionFixture(t)
fx.stubSeams(t)
opts := defaultAdoptionOpts(fx.wsDir)
if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
t.Fatalf("Adopt: %v", err)
}
if fx.attachCalls != 1 {
t.Errorf("expected attach call, got %d", fx.attachCalls)
}
got, err := ReadLease(LeasePath(fx.wsDir))
if err == nil && got != nil && got.SessionID == fx.staleLease.SessionID {
t.Error("lease still has stale SessionID — adoption did not replace it")
}
// finalize removes the lease at end-of-session; either nil-or-different is acceptable.
mfPath := filepath.Join(fx.wsDir, ".ctask", "manifest-start.json")
if _, err := os.Stat(mfPath); err == nil {
t.Errorf("expected start manifest removed by finalize at %s", mfPath)
}
summary, err := ReadSummary(SummaryPath(fx.wsDir))
if err != nil {
t.Fatalf("ReadSummary: %v", err)
}
if summary == nil {
t.Fatal("summary not written")
}
if summary.SessionOwnership != "adopted" {
t.Errorf("SessionOwnership: got %q, want %q", summary.SessionOwnership, "adopted")
}
if summary.AdoptedFromOrphanAt == nil {
t.Error("AdoptedFromOrphanAt must be set on adopted reattach")
}
if summary.EndReason != "tmux_session_ended" {
t.Errorf("EndReason: got %q", summary.EndReason)
}
if summary.DetectedVia != "polling" {
t.Errorf("DetectedVia: got %q", summary.DetectedVia)
}
}
// Successful adoption must bump task.yaml.UpdatedAt.
func TestAdoptionBumpsUpdatedAtOnSuccess(t *testing.T) {
fx := newAdoptionFixture(t)
fx.stubSeams(t)
opts := defaultAdoptionOpts(fx.wsDir)
if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
t.Fatalf("Adopt: %v", err)
}
meta, err := workspace.ReadMeta(filepath.Join(fx.wsDir, "task.yaml"))
if err != nil {
t.Fatalf("ReadMeta: %v", err)
}
if !meta.UpdatedAt.After(fx.initialUpdate) {
t.Errorf("UpdatedAt must be bumped on adoption: got %v, initial %v",
meta.UpdatedAt, fx.initialUpdate)
}
}
// Race-guard fall-through to passive reattach must NOT bump UpdatedAt.
func TestAdoptionRaceGuardFallsThroughAndDoesNotBumpUpdatedAt(t *testing.T) {
fx := newAdoptionFixture(t)
fx.stubSeams(t)
// Simulate concurrent adopter winning by writing a fresh local lease.
freshLease := &Lease{
SessionID: "race-winner-1-x", PID: os.Getpid(), Hostname: currentHostname(),
Username: "u", Agent: "claude", Mode: "local",
StartedAt: time.Now().UTC(), LastHeartbeatAt: time.Now().UTC(),
Terminal: "test",
}
if err := WriteLease(LeasePath(fx.wsDir), freshLease); err != nil {
t.Fatalf("WriteLease: %v", err)
}
opts := defaultAdoptionOpts(fx.wsDir)
if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
t.Fatalf("Adopt (race fall-through): %v", err)
}
// Race winner's lease must be intact.
got, err := ReadLease(LeasePath(fx.wsDir))
if err != nil {
t.Fatalf("ReadLease: %v", err)
}
if got.SessionID != "race-winner-1-x" {
t.Errorf("race-winner lease overwritten: got SessionID %q", got.SessionID)
}
// No summary on fall-through — adoption did not run.
if summary, _ := ReadSummary(SummaryPath(fx.wsDir)); summary != nil {
t.Error("fall-through to passive must not write a summary")
}
// UpdatedAt must NOT be bumped on the fall-through path.
meta, err := workspace.ReadMeta(filepath.Join(fx.wsDir, "task.yaml"))
if err != nil {
t.Fatalf("ReadMeta: %v", err)
}
if !meta.UpdatedAt.Equal(fx.initialUpdate) {
t.Errorf("UpdatedAt must NOT be bumped on race fall-through: got %v, initial %v",
meta.UpdatedAt, fx.initialUpdate)
}
}
// A non-zero attach error from the seam propagates as an error.
// Adoption must not silently swallow attach failures.
func TestAdoptionPropagatesAttachError(t *testing.T) {
fx := newAdoptionFixture(t)
wantErr := errors.New("tmux attach-session exited 5")
origA := adoptAttacher
adoptAttacher = func(_, _ string) error { return wantErr }
origP := adoptPoll
adoptPoll = func(_, _ string, _ time.Duration) {
t.Fatal("polling must not run after attach failure")
}
t.Cleanup(func() {
adoptAttacher = origA
adoptPoll = origP
})
opts := defaultAdoptionOpts(fx.wsDir)
err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts)
if err == nil {
t.Fatal("expected error when attach fails")
}
if !errors.Is(err, wantErr) {
t.Errorf("expected wrapped attach error, got %v", err)
}
}