Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion backend/internal/cli/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func TestStopDoesNotShutdownUnverifiedReusedPID(t *testing.T) {
func TestStopUsesShutdownEndpoint(t *testing.T) {
cfg := setConfigEnv(t)
shutdownCalled := make(chan struct{}, 1)
var shutdownSeen atomic.Bool
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/healthz":
Expand All @@ -240,6 +241,7 @@ func TestStopUsesShutdownEndpoint(t *testing.T) {
if err := runfile.Remove(cfg.runFile); err != nil {
t.Fatal(err)
}
shutdownSeen.Store(true)
shutdownCalled <- struct{}{}
_, _ = fmt.Fprintf(w, `{"status":"shutting_down","service":%q,"pid":%d}`, daemonmeta.ServiceName, os.Getpid())
default:
Expand All @@ -253,7 +255,12 @@ func TestStopUsesShutdownEndpoint(t *testing.T) {
}

out, _, err := executeCLI(t, Deps{
ProcessAlive: func(pid int) bool { return pid == os.Getpid() },
ProcessAlive: func(pid int) bool {
if pid != os.Getpid() {
return false
}
return !shutdownSeen.Load()
},
}, "stop", "--json")
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/cli/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newSpawnCommand(ctx *commandContext) *cobra.Command {
})
f.StringVar(&opts.project, "project", "", "Project id to spawn the session in (required)")
f.StringVar(&opts.harness, "harness", "", "Agent harness / --agent: claude-code, codex, aider, opencode, grok, droid, amp, agy, crush, cursor, qwen, copilot, goose, auggie, continue, devin, cline, kimi, kiro, kilocode, vibe, pi, autohand (default: the daemon's AO_AGENT)")
f.StringVar(&opts.branch, "branch", "", "Branch for the session worktree (default: ao/<session-id>)")
f.StringVar(&opts.branch, "branch", "", "Branch for the session worktree (default: ao/<session-id>/root)")
f.StringVar(&opts.prompt, "prompt", "", "Initial prompt for the agent")
f.StringVar(&opts.issue, "issue", "", "Issue id to associate with the session")
f.StringVar(&opts.claimPR, "claim-pr", "", "Immediately claim an existing PR for the spawned session")
Expand Down
15 changes: 11 additions & 4 deletions backend/internal/cli/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,28 @@ func (c *commandContext) waitForStopped(ctx context.Context, pid int, runFilePat
return daemonStatus{}, err
}
alive := c.deps.ProcessAlive(pid)
if info == nil {
return daemonStatus{State: stateStopped, RunFile: runFilePath, DataDir: dataDir}, nil
}
if !alive {
// Only remove the run-file if it still belongs to the process we
// stopped. A concurrent `ao start` may have already written a new
// run-file for a different daemon; removing that would corrupt its
// handshake and make a live daemon look stopped.
if info.PID == pid {
if info != nil && info.PID == pid {
if err := runfile.Remove(runFilePath); err != nil {
return daemonStatus{}, err
}
}
return daemonStatus{State: stateStopped, RunFile: runFilePath, DataDir: dataDir}, nil
}
if info == nil {
// The daemon removes running.json before the process necessarily exits.
// Keep waiting so Windows releases inherited handles such as daemon.log
// before tests or callers clean up the data directory.
if !c.deps.Now().Before(deadline) {
return daemonStatus{}, fmt.Errorf("daemon pid %d removed run-file but did not exit within %s", pid, timeout)
}
c.deps.Sleep(100 * time.Millisecond)
continue
}
if !c.deps.Now().Before(deadline) {
return daemonStatus{}, fmt.Errorf("daemon pid %d did not stop within %s", pid, timeout)
}
Expand Down
37 changes: 37 additions & 0 deletions backend/internal/cli/stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,40 @@ func TestWaitForStoppedRemovesOwnRunFile(t *testing.T) {
t.Fatalf("own run-file should have been removed, got %#v", info)
}
}

func TestWaitForStoppedWaitsAfterRunFileRemovedUntilProcessExits(t *testing.T) {
dir := t.TempDir()
runFile := filepath.Join(dir, "running.json")

const stoppedPID = 1111
now := time.Unix(200, 0).UTC()
aliveChecks := 0
sleeps := 0
c := &commandContext{deps: Deps{
ProcessAlive: func(int) bool {
aliveChecks++
return aliveChecks < 3
},
Now: func() time.Time {
return now
},
Sleep: func(d time.Duration) {
sleeps++
now = now.Add(d)
},
}.withDefaults()}

st, err := c.waitForStopped(context.Background(), stoppedPID, runFile, dir, time.Second)
if err != nil {
t.Fatal(err)
}
if st.State != stateStopped {
t.Fatalf("state = %q, want stopped", st.State)
}
if sleeps == 0 {
t.Fatal("waitForStopped returned before waiting for process exit")
}
if aliveChecks < 3 {
t.Fatalf("process checks = %d, want at least 3", aliveChecks)
}
}
29 changes: 21 additions & 8 deletions backend/internal/observe/scm/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,27 +605,40 @@ func (o *Observer) discoverNewPRs(ctx context.Context, sessionRepos []sessionRep
}

// matchSession picks the session that owns sourceBranch. A session owns the
// branch when it is an exact match or a stacked descendant ("branch/..."). When
// several session branches are prefixes of the same source branch the longest
// (most specific) one wins, so a child session claims its own stacked PRs rather
// than the ancestor session.
// branch when it is an exact match or a stacked descendant ("branch/..."). The
// default worker branch is a leaf named "<namespace>/root"; for that shape the
// session also owns sibling branches under "<namespace>/..." so Git can create
// child PR branches without colliding with the root ref. When several session
// branches are prefixes of the same source branch the longest (most specific)
// one wins, so a child session claims its own stacked PRs rather than the
// ancestor session.
func matchSession(candidates []sessionRepo, sourceBranch string) (sessionRepo, bool) {
var best sessionRepo
bestLen := -1
for _, sr := range candidates {
if sr.branch == "" {
continue
}
if sr.branch == sourceBranch || strings.HasPrefix(sourceBranch, sr.branch+"/") {
if len(sr.branch) > bestLen {
best = sr
bestLen = len(sr.branch)
for _, prefix := range sessionBranchPrefixes(sr.branch) {
if prefix == sourceBranch || strings.HasPrefix(sourceBranch, prefix+"/") {
if len(prefix) > bestLen {
best = sr
bestLen = len(prefix)
}
}
}
}
return best, bestLen >= 0
}

func sessionBranchPrefixes(branch string) []string {
prefixes := []string{branch}
if namespace, ok := strings.CutSuffix(branch, "/root"); ok && namespace != "" {
prefixes = append(prefixes, namespace)
}
return prefixes
}

func (o *Observer) selectRefreshCandidates(ctx context.Context, subjects map[string]*subject, guards map[string]repoGuardState, markRepoFailed func(ports.SCMRepo)) refreshSelection {
selection := refreshSelection{
subjectsByPR: map[string]*subject{},
Expand Down
32 changes: 32 additions & 0 deletions backend/internal/observe/scm/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,38 @@ func TestPoll_DiscoversStackedChildByBranchPrefix(t *testing.T) {
}
}

func TestPoll_DiscoversSiblingUnderRootSessionNamespace(t *testing.T) {
store := testStoreWithSession()
store.sessions[0].Metadata.Branch = "ao/p-1/root"
prObs := testObs(1)
prObs.PR.SourceBranch = "ao/p-1/fix"
prObs.PR.TargetBranch = "main"
provider := &fakeProvider{
repoGuards: map[string]ports.SCMGuardResult{prKey(testRepo, 0): {ETag: "v2"}},
openPRs: map[string][]ports.SCMPRObservation{prKey(testRepo, 0): {
{URL: "https://github.com/o/r/pull/1", Number: 1, SourceBranch: "ao/p-1/fix", HeadRepo: "o/r", TargetBranch: "main", HeadSHA: "sha1"},
}},
observations: map[string]ports.SCMObservation{prKey(testRepo, 1): prObs},
}
lc := &fakeLifecycle{}
obs := newTestObserver(store, provider, lc, time.Unix(1, 0).UTC())
if err := obs.Poll(context.Background()); err != nil {
t.Fatal(err)
}
if len(store.writes) == 0 {
t.Fatal("expected discovered PR write")
}
if got := store.writes[0].pr.SourceBranch; got != "ao/p-1/fix" {
t.Fatalf("source branch = %q, want ao/p-1/fix", got)
}
if got := store.writes[0].pr.SessionID; got != "p-1" {
t.Fatalf("session id = %q, want p-1", got)
}
if len(lc.observed) != 1 {
t.Fatalf("lifecycle observations = %d, want 1", len(lc.observed))
}
}

// A PR whose head branch matches a session branch but lives in a fork (its head
// repo differs from the project repo) must not be auto-attributed: its commits
// are not the session's work. It is neither fetched nor persisted.
Expand Down
20 changes: 11 additions & 9 deletions backend/internal/session_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,10 @@ func defaultSessionBranch(id domain.SessionID, kind domain.SessionKind, prefix s
return "ao/" + prefix + "-orchestrator"
}
// A fresh, unique branch per worker session: gitworktree can't add a worktree
// on a branch already checked out elsewhere (e.g. main), so default to one
// derived from the assigned session id.
return "ao/" + string(id)
// on a branch already checked out elsewhere (e.g. main). Put the root work
// branch under a session namespace so sibling PR branches such as
// ao/<session>/<topic> remain valid Git refs.
return "ao/" + string(id) + "/root"
}

func buildPrompt(cfg ports.SpawnConfig) string {
Expand Down Expand Up @@ -699,16 +700,17 @@ Only ping the orchestrator for true blockers, cross-session coordination, or dec

// workerMultiPRPrompt explains the branch convention AO uses to attribute pull
// requests to this session. A worker may open several PRs in one session: AO
// tracks every open PR whose source branch is the session's own branch or a
// descendant of it. Stacking a PR on top of another therefore only requires
// branching off with a `<session-branch>/<topic>` name; PRs on unrelated
// branches are attributed to whichever session owns their branch prefix.
// tracks every open PR whose source branch is the session's own branch or lives
// in the same session namespace. Stacking a PR on top of another therefore only
// requires branching off with a `<session-namespace>/<topic>` name; PRs on
// unrelated branches are attributed to whichever session owns their namespace.
func workerMultiPRPrompt() string {
return `## Pull requests for this session

You can open more than one pull request from this session. AO attributes a PR to you when its source branch is your session's working branch or a branch descended from it (a "/"-separated child like ` + "`your-branch/topic`" + `).
You can open more than one pull request from this session. AO attributes a PR to you when its source branch is your session's working branch or another branch in the same session namespace.

- For independent PRs, create each source branch as a child of your session branch (` + "`your-branch/<topic>`" + `) so it stays in this session's namespace, then open the PR targeting your base branch as usual. The PR can target the base branch; only the source branch needs to stay under your session branch for AO to track it.
- If your current branch ends in ` + "`/root`" + `, create independent PR branches as siblings under the same namespace, for example ` + "`<namespace>/<topic>`" + ` from ` + "`<namespace>/root`" + `. Do not create ` + "`<namespace>/root/<topic>`" + `.
- Otherwise, create each source branch as a child of your session branch (` + "`your-branch/<topic>`" + `) so it stays in this session's namespace, then open the PR targeting your base branch as usual. The PR can target the base branch; only the source branch needs to stay under your session namespace for AO to track it.
- To stack a PR on top of another (so it merges after its parent), create the child branch from the parent branch and name it ` + "`<parent-branch>/<topic>`" + `, then target the parent branch in the PR. AO recognizes the stack from the branch relationship and will only nudge you to resolve conflicts on the bottom-most PR.

Keep branch names within your session's branch namespace so AO can track every PR you open.`
Expand Down
7 changes: 4 additions & 3 deletions backend/internal/session_manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,10 @@ func TestSpawn_DefaultsBranchFromSessionID(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// An empty SpawnConfig.Branch defaults to a unique per-session branch.
if got := st.sessions[s.ID].Metadata.Branch; got != "ao/mer-1" {
t.Fatalf("default branch = %q, want ao/mer-1", got)
// An empty SpawnConfig.Branch defaults to a unique per-session root branch
// under a namespace that can also hold sibling PR branches.
if got := st.sessions[s.ID].Metadata.Branch; got != "ao/mer-1/root" {
t.Fatalf("default branch = %q, want ao/mer-1/root", got)
}
}

Expand Down
Loading