From 08fb5bb1c3fd1398c94cc5844779c2cbd8637baf Mon Sep 17 00:00:00 2001
From: typebasedio
Date: Fri, 8 May 2026 13:56:46 -0400
Subject: [PATCH] feat(v0.5.3): AdoptExistingPersistentSession with race guard, UpdatedAt bump, attach-error propagation
---
 internal/session/adopt.go      | 206 ++++++++++++++++++++++++++++++
 internal/session/adopt_test.go | 224 +++++++++++++++++++++++++++++++++
 2 files changed, 430 insertions(+)
 create mode 100644 internal/session/adopt.go
 create mode 100644 internal/session/adopt_test.go

diff --git a/internal/session/adopt.go b/internal/session/adopt.go
new file mode 100644
index 0000000..3e35f3e
--- /dev/null
+++ b/internal/session/adopt.go
@@ -0,0 +1,206 @@
+package session
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/warrenronsiek/ctask/internal/lockfile"
+	"github.com/warrenronsiek/ctask/internal/shell"
+	"github.com/warrenronsiek/ctask/internal/workspace"
+)
+
+// adoptAttacher and adoptPoll are test seams that *wrap* shell primitives
+// rather than hand-rolling tmux invocations. Production calls go through
+// shell.AttachSession / shell.PollSessionEnd. Tests override these
+// variables; do not run such tests with t.Parallel().
+var (
+	adoptAttacher = shell.AttachSession
+	adoptPoll     = shell.PollSessionEnd
+)
+
+// AdoptExistingPersistentSession is the adopted-reattach path. It is
+// invoked when a tmux session for the workspace exists but the lease is
+// missing, stale, or fresh-but-from-another-host (the cmd-layer dispatcher
+// has already prompted for confirmation in the fresh_remote case before
+// calling here — see cmd/persistent.go::confirmFreshRemoteAdoption).
+//
+// Eight-step flow (see v0.5.3-spec.md §3 path C):
+//  1. Print one diagnostic line announcing the adoption.
+//  2. Acquire write lock; under lock, re-read lease state. If now
+//     fresh-local (concurrent adopter raced ahead), release the lock and
+//     fall through to AttachExisting — the race winner owns the lease.
+//     Otherwise, remove the orphaned lease, write a new lease for this
+//     process, and bump task.yaml.UpdatedAt.
+//  3. Capture a fresh start manifest. Without it, finalize has no
+//     reliable diff baseline, so step 8 is skipped entirely.
+//  4. Start the heartbeat goroutine.
+//  5. shell.AttachSession (returns nil on clean exit, error on non-zero).
+//  6. Polling loop until tmux reports session gone — runs ONLY after a
+//     successful attach.
+//  7. Stop the heartbeat goroutine.
+//  8. finalize with SessionOwnership="adopted" and AdoptedFromOrphanAt set.
+//
+// On attach failure (step 5 returns error), steps 6-8 are skipped and the
+// error is returned — the user sees the underlying tmux failure.
+func AdoptExistingPersistentSession(tmuxPath, sessionName, wsDir string, opts LaunchOpts) error {
+	fmt.Fprintln(os.Stderr,
+		"[ctask] adopting orphaned persistent session (previous owner exited without finalizing)")
+
+	startTime := time.Now().UTC().Truncate(time.Second)
+	adoptedAt := startTime
+	leasePath := LeasePath(wsDir)
+
+	var raced bool
+	skipped, lockErr := lockfile.WithLock(
+		ctaskWriteLockPath(wsDir),
+		sessionWriteLockTimeout, sessionWriteLockStaleAfter,
+		func() error {
+			// Re-check under lock. If a concurrent adopter beat us to it,
+			// fall through to passive reattach. No lease writes, no
+			// task.yaml.UpdatedAt bump on this branch.
+			if InspectLease(wsDir) == LeaseStateFreshLocal {
+				raced = true
+				return nil
+			}
+			if _, rmErr := CleanupStaleLease(leasePath, StaleLeaseAfter); rmErr != nil {
+				fmt.Fprintf(os.Stderr,
+					"[ctask] warning: failed to remove orphaned lease: %v\n", rmErr)
+			}
+			lease := NewLease(startTime, opts.Agent, opts.Mode)
+			if err := WriteLease(leasePath, lease); err != nil {
+				return fmt.Errorf("writing lease: %w", err)
+			}
+			// Adoption-only: bump task.yaml.UpdatedAt. Passive reattach
+			// (and the race fall-through above) leave it untouched.
+			metaPath := filepath.Join(wsDir, "task.yaml")
+			if meta, err := workspace.ReadMeta(metaPath); err == nil && meta != nil {
+				meta.UpdatedAt = startTime
+				if err := workspace.WriteMeta(metaPath, meta); err != nil {
+					fmt.Fprintf(os.Stderr,
+						"[ctask] warning: failed to update task.yaml on adoption: %v\n", err)
+				}
+			}
+			return nil
+		},
+	)
+	if skipped {
+		fmt.Fprintf(os.Stderr,
+			"[ctask] Warning: could not acquire metadata lock; falling through to passive reattach\n")
+		return AttachExisting(tmuxPath, sessionName)
+	}
+	if lockErr != nil {
+		return fmt.Errorf("adoption lease write: %w", lockErr)
+	}
+	if raced {
+		return AttachExisting(tmuxPath, sessionName)
+	}
+
+	// Step 3: fresh start manifest — load-bearing for finalize's diff baseline.
+	startManifest, err := CaptureManifest(wsDir)
+	if err != nil {
+		fmt.Fprintf(os.Stderr,
+			"[ctask] warning: failed to capture start manifest during adoption: %v; finalize diff will be skipped\n", err)
+	} else {
+		mPath := manifestStartPath(wsDir)
+		if _, lockErr := lockfile.WithLock(
+			ctaskWriteLockPath(wsDir),
+			sessionWriteLockTimeout, sessionWriteLockStaleAfter,
+			func() error { return WriteManifest(mPath, startManifest) },
+		); lockErr != nil {
+			fmt.Fprintf(os.Stderr,
+				"[ctask] warning: failed to write start manifest: %v\n", lockErr)
+			startManifest = nil
+		}
+	}
+
+	// Step 4: heartbeat.
+	hb := StartHeartbeat(leasePath, HeartbeatInterval)
+
+	// Step 5: attach. Non-zero exit is a real failure; surface it
+	// and skip steps 6-8 (polling and finalize). Stop the heartbeat first
+	// so we don't leak the goroutine.
+	if attachErr := adoptAttacher(tmuxPath, sessionName); attachErr != nil {
+		hb.Stop()
+		return attachErr
+	}
+
+	// Step 6: poll until session ends.
+	adoptPoll(tmuxPath, sessionName, shell.PollInterval)
+
+	// Step 7: stop heartbeat.
+	hb.Stop()
+
+	// Step 8: finalize with adoption fields.
+	endTime := time.Now().UTC().Truncate(time.Second)
+	if startManifest != nil {
+		if err := finalizeAdopted(opts, wsDir, startManifest, startTime, endTime, adoptedAt); err != nil {
+			fmt.Fprintf(os.Stderr, "[ctask] warning: adoption finalize failed: %v\n", err)
+		}
+	}
+	return nil
+}
+
+// finalizeAdopted is the adoption-specific finalize path. Mirrors
+// session.Run's finalize() but stamps SessionOwnership = "adopted" and
+// AdoptedFromOrphanAt onto the summary.
+func finalizeAdopted(opts LaunchOpts, wsDir string, startManifest *Manifest, startTime, endTime, adoptedAt time.Time) error {
+	endManifest, err := CaptureManifest(wsDir)
+	if err != nil {
+		return fmt.Errorf("capturing end manifest: %w", err)
+	}
+	diff := DiffManifests(startManifest, endManifest)
+
+	agent := opts.Agent
+	if opts.Shell {
+		agent = "shell"
+	}
+	info := &SessionInfo{
+		Agent:     agent,
+		Mode:      opts.Mode,
+		StartTime: startTime,
+		EndTime:   endTime,
+		Diff:      diff,
+	}
+
+	sessionID := NewSessionID(currentHostname(), os.Getpid(), startTime)
+	if l, err := ReadLease(LeasePath(wsDir)); err == nil && l != nil {
+		sessionID = l.SessionID
+	}
+
+	summary := SummarizeFromDiff(
+		sessionID, currentHostname(), agent, opts.Mode,
+		startTime, endTime, diff, endManifest,
+	)
+	summary.EndReason = "tmux_session_ended"
+	summary.DetectedVia = "polling"
+	summary.SessionOwnership = "adopted"
+	summary.AdoptedFromOrphanAt = &adoptedAt
+
+	skipped, lockErr := lockfile.WithLock(
+		ctaskWriteLockPath(wsDir),
+		sessionWriteLockTimeout, sessionWriteLockStaleAfter,
+		func() error {
+			if err := AppendSessionLog(wsDir, info); err != nil {
+				fmt.Fprintf(os.Stderr, "[ctask] warning: append session log failed: %v\n", err)
+			}
+			if err := WriteSummary(SummaryPath(wsDir), summary); err != nil {
+				return fmt.Errorf("write summary: %w", err)
+			}
+			if rmErr := os.Remove(LeasePath(wsDir)); rmErr != nil && !os.IsNotExist(rmErr) {
+				fmt.Fprintf(os.Stderr, "[ctask] warning: could not remove lease: %v\n", rmErr)
+			}
+			if rmErr := os.Remove(manifestStartPath(wsDir)); rmErr != nil && !os.IsNotExist(rmErr) {
+				fmt.Fprintf(os.Stderr, "[ctask] warning: could not remove start manifest: %v\n", rmErr)
+			}
+			return nil
+		},
+	)
+	if skipped {
+		fmt.Fprintf(os.Stderr,
+			"[ctask] Warning: could not acquire metadata lock at adoption finalize; skipping summary write\n")
+		return nil
+	}
+	return lockErr
+}
diff --git a/internal/session/adopt_test.go b/internal/session/adopt_test.go
new file mode 100644
index 0000000..f8e67af
--- /dev/null
+++ b/internal/session/adopt_test.go
@@ -0,0 +1,224 @@
+package session
+
+import (
+	"errors"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/warrenronsiek/ctask/internal/workspace"
+)
+
+// adoptionFixture wires a temp workspace with a stale lease, a task.yaml
+// with a known initial UpdatedAt, and the test seams (adoptAttacher,
+// adoptPoll) overridden to no-ops. Tests using this fixture must not run
+// in parallel — the seams are package globals.
+type adoptionFixture struct {
+	wsDir         string
+	staleLease    *Lease
+	initialUpdate time.Time
+	attachCalls   int
+}
+
+func newAdoptionFixture(t *testing.T) *adoptionFixture {
+	t.Helper()
+	ws := t.TempDir()
+
+	// task.yaml with a known UpdatedAt so we can assert the adoption bump.
+	initial := time.Now().UTC().Add(-2 * time.Hour).Truncate(time.Second)
+	meta := &workspace.TaskMeta{
+		ID: "demo-id", Slug: "demo", Title: "demo",
+		CreatedAt: initial, UpdatedAt: initial,
+		Status: "active", Category: "projects", Type: "project",
+		Mode: "local", Agent: "claude",
+	}
+	if err := workspace.WriteMeta(filepath.Join(ws, "task.yaml"), meta); err != nil {
+		t.Fatalf("WriteMeta: %v", err)
+	}
+
+	stale := &Lease{
+		SessionID:       "host-old-1-20260101000000",
+		PID:             99999,
+		Hostname:        currentHostname(),
+		Username:        "u",
+		Agent:           "claude",
+		Mode:            "local",
+		StartedAt:       time.Now().UTC().Add(-1 * time.Hour),
+		LastHeartbeatAt: time.Now().UTC().Add(-1 * time.Hour),
+		Terminal:        "test",
+	}
+	if err := WriteLease(LeasePath(ws), stale); err != nil {
+		t.Fatalf("WriteLease: %v", err)
+	}
+
+	return &adoptionFixture{wsDir: ws, staleLease: stale, initialUpdate: initial}
+}
+
+// stubSeams installs no-op replacements for adoptAttacher, adoptPoll, and
+// the `attacher` used by AttachExisting (which is on the race fall-through
+// path). The originals are restored automatically via t.Cleanup.
+func (fx *adoptionFixture) stubSeams(t *testing.T) {
+	t.Helper()
+	origA := adoptAttacher
+	adoptAttacher = func(_, _ string) error { fx.attachCalls++; return nil }
+	origP := adoptPoll
+	adoptPoll = func(_, _ string, _ time.Duration) {}
+	// AttachExisting's seam — race fall-through routes through it.
+	origE := attacher
+	attacher = func(_, _ string) error { return nil }
+	t.Cleanup(func() {
+		adoptAttacher = origA
+		adoptPoll = origP
+		attacher = origE
+	})
+}
+
+func defaultAdoptionOpts(wsDir string) LaunchOpts {
+	return LaunchOpts{
+		WsDir:       wsDir,
+		Agent:       "claude",
+		Mode:        "local",
+		Slug:        "demo",
+		Category:    "projects",
+		SessionMode: "persistent",
+		SessionName: "ctask-projects-demo-abc123",
+		TmuxPath:    "/usr/bin/tmux",
+	}
+}
+
+func TestAdoptionReplacesLeaseAndCapturesStartManifest(t *testing.T) {
+	fx := newAdoptionFixture(t)
+	fx.stubSeams(t)
+
+	opts := defaultAdoptionOpts(fx.wsDir)
+	if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
+		t.Fatalf("Adopt: %v", err)
+	}
+	if fx.attachCalls != 1 {
+		t.Errorf("expected attach call, got %d", fx.attachCalls)
+	}
+
+	got, err := ReadLease(LeasePath(fx.wsDir))
+	if err == nil && got != nil && got.SessionID == fx.staleLease.SessionID {
+		t.Error("lease still has stale SessionID — adoption did not replace it")
+	}
+	// finalize removes the lease at end-of-session; either nil-or-different is acceptable.
+
+	mfPath := filepath.Join(fx.wsDir, ".ctask", "manifest-start.json")
+	if _, err := os.Stat(mfPath); err == nil {
+		t.Errorf("expected start manifest removed by finalize at %s", mfPath)
+	}
+
+	summary, err := ReadSummary(SummaryPath(fx.wsDir))
+	if err != nil {
+		t.Fatalf("ReadSummary: %v", err)
+	}
+	if summary == nil {
+		t.Fatal("summary not written")
+	}
+	if summary.SessionOwnership != "adopted" {
+		t.Errorf("SessionOwnership: got %q, want %q", summary.SessionOwnership, "adopted")
+	}
+	if summary.AdoptedFromOrphanAt == nil {
+		t.Error("AdoptedFromOrphanAt must be set on adopted reattach")
+	}
+	if summary.EndReason != "tmux_session_ended" {
+		t.Errorf("EndReason: got %q", summary.EndReason)
+	}
+	if summary.DetectedVia != "polling" {
+		t.Errorf("DetectedVia: got %q", summary.DetectedVia)
+	}
+}
+
+// Successful adoption must bump task.yaml.UpdatedAt.
+func TestAdoptionBumpsUpdatedAtOnSuccess(t *testing.T) {
+	fx := newAdoptionFixture(t)
+	fx.stubSeams(t)
+
+	opts := defaultAdoptionOpts(fx.wsDir)
+	if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
+		t.Fatalf("Adopt: %v", err)
+	}
+	meta, err := workspace.ReadMeta(filepath.Join(fx.wsDir, "task.yaml"))
+	if err != nil {
+		t.Fatalf("ReadMeta: %v", err)
+	}
+	if !meta.UpdatedAt.After(fx.initialUpdate) {
+		t.Errorf("UpdatedAt must be bumped on adoption: got %v, initial %v",
+			meta.UpdatedAt, fx.initialUpdate)
+	}
+}
+
+// Race-guard fall-through to passive reattach must NOT bump UpdatedAt.
+func TestAdoptionRaceGuardFallsThroughAndDoesNotBumpUpdatedAt(t *testing.T) {
+	fx := newAdoptionFixture(t)
+	fx.stubSeams(t)
+
+	// Simulate concurrent adopter winning by writing a fresh local lease.
+	freshLease := &Lease{
+		SessionID: "race-winner-1-x", PID: os.Getpid(), Hostname: currentHostname(),
+		Username: "u", Agent: "claude", Mode: "local",
+		StartedAt: time.Now().UTC(), LastHeartbeatAt: time.Now().UTC(),
+		Terminal: "test",
+	}
+	if err := WriteLease(LeasePath(fx.wsDir), freshLease); err != nil {
+		t.Fatalf("WriteLease: %v", err)
+	}
+
+	opts := defaultAdoptionOpts(fx.wsDir)
+	if err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts); err != nil {
+		t.Fatalf("Adopt (race fall-through): %v", err)
+	}
+
+	// Race winner's lease must be intact.
+	got, err := ReadLease(LeasePath(fx.wsDir))
+	if err != nil {
+		t.Fatalf("ReadLease: %v", err)
+	}
+	if got.SessionID != "race-winner-1-x" {
+		t.Errorf("race-winner lease overwritten: got SessionID %q", got.SessionID)
+	}
+
+	// No summary on fall-through — adoption did not run.
+	if summary, _ := ReadSummary(SummaryPath(fx.wsDir)); summary != nil {
+		t.Error("fall-through to passive must not write a summary")
+	}
+
+	// UpdatedAt must NOT be bumped on the fall-through path.
+	meta, err := workspace.ReadMeta(filepath.Join(fx.wsDir, "task.yaml"))
+	if err != nil {
+		t.Fatalf("ReadMeta: %v", err)
+	}
+	if !meta.UpdatedAt.Equal(fx.initialUpdate) {
+		t.Errorf("UpdatedAt must NOT be bumped on race fall-through: got %v, initial %v",
+			meta.UpdatedAt, fx.initialUpdate)
+	}
+}
+
+// A non-zero attach error from the seam propagates as an error.
+// Adoption must not silently swallow attach failures.
+func TestAdoptionPropagatesAttachError(t *testing.T) {
+	fx := newAdoptionFixture(t)
+
+	wantErr := errors.New("tmux attach-session exited 5")
+	origA := adoptAttacher
+	adoptAttacher = func(_, _ string) error { return wantErr }
+	origP := adoptPoll
+	adoptPoll = func(_, _ string, _ time.Duration) {
+		t.Fatal("polling must not run after attach failure")
+	}
+	t.Cleanup(func() {
+		adoptAttacher = origA
+		adoptPoll = origP
+	})
+
+	opts := defaultAdoptionOpts(fx.wsDir)
+	err := AdoptExistingPersistentSession(opts.TmuxPath, opts.SessionName, fx.wsDir, opts)
+	if err == nil {
+		t.Fatal("expected error when attach fails")
+	}
+	if !errors.Is(err, wantErr) {
+		t.Errorf("expected wrapped attach error, got %v", err)
+	}
+}