Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/internal/httpd/controllers/reviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func writeReviewError(w http.ResponseWriter, r *http.Request, err error) {
envelope.WriteAPIError(w, r, http.StatusUnprocessableEntity, "unprocessable", "REVIEW_INVALID", err.Error(), nil)
case errors.Is(err, reviewsvc.ErrNotFound):
envelope.WriteAPIError(w, r, http.StatusNotFound, "not_found", "REVIEW_NOT_FOUND", err.Error(), nil)
case errors.Is(err, reviewsvc.ErrAgentBinaryNotFound):
envelope.WriteAPIError(w, r, http.StatusUnprocessableEntity, "unprocessable", "REVIEWER_BINARY_NOT_FOUND", err.Error(), nil)
default:
envelope.WriteAPIError(w, r, http.StatusInternalServerError, "internal", "REVIEW_OPERATION_FAILED", "Review operation failed", nil)
}
Expand Down
61 changes: 61 additions & 0 deletions backend/internal/httpd/controllers/reviews_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package controllers_test

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/httpd"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
reviewcore "github.com/aoagents/agent-orchestrator/backend/internal/review"
reviewsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/review"
)

// fakeReviewService is a test double for the reviews service used by the
// controller tests in this file. Its methods return fixed values; only Trigger
// can be made to fail.
type fakeReviewService struct {
	// triggerErr, when non-nil, is returned verbatim by Trigger.
	triggerErr error
}

// Trigger ignores its arguments. It returns the configured triggerErr with a
// zero TriggerResult when one is set; otherwise it reports a newly created run
// with the fixed ID "run-1".
func (f *fakeReviewService) Trigger(context.Context, domain.SessionID) (reviewcore.TriggerResult, error) {
	if failure := f.triggerErr; failure != nil {
		return reviewcore.TriggerResult{}, failure
	}

	created := reviewcore.TriggerResult{
		Run:     domain.ReviewRun{ID: "run-1"},
		Created: true,
	}
	return created, nil
}

// Submit is a no-op stub: it ignores every argument and always returns a zero
// ReviewRun with a nil error.
func (f *fakeReviewService) Submit(context.Context, domain.SessionID, string, domain.ReviewVerdict, string) (domain.ReviewRun, error) {
	var empty domain.ReviewRun
	return empty, nil
}

// List is a no-op stub: it ignores its arguments and always returns a zero
// SessionReviews with a nil error.
func (f *fakeReviewService) List(context.Context, domain.SessionID) (reviewcore.SessionReviews, error) {
	var empty reviewcore.SessionReviews
	return empty, nil
}

// newReviewTestServer starts an httptest server whose router is built with svc
// as the only API dependency (empty config, nil third argument, empty control
// deps). Log output is discarded, and the server is closed automatically when
// the test finishes.
func newReviewTestServer(t *testing.T, svc reviewsvc.Manager) *httptest.Server {
	t.Helper()

	// Route logs to io.Discard so test output stays quiet.
	quiet := slog.New(slog.NewTextHandler(io.Discard, nil))
	router := httpd.NewRouterWithControl(
		config.Config{},
		quiet,
		nil,
		httpd.APIDeps{Reviews: svc},
		httpd.ControlDeps{},
	)

	server := httptest.NewServer(router)
	t.Cleanup(server.Close)
	return server
}

// TestReviewsTrigger_MissingReviewerBinaryReturns422WithCause checks that a
// Trigger error wrapping ports.ErrAgentBinaryNotFound is answered with HTTP 422
// and the REVIEWER_BINARY_NOT_FOUND code, and that the response message still
// names the binary and the underlying sentinel text.
func TestReviewsTrigger_MissingReviewerBinaryReturns422WithCause(t *testing.T) {
	// Wrap the sentinel the way a failed reviewer launch would report it.
	cause := fmt.Errorf("launch reviewer: reviewer command: claude: %w", ports.ErrAgentBinaryNotFound)
	srv := newReviewTestServer(t, &fakeReviewService{triggerErr: cause})

	body, status, headers := doRequest(t, srv, "POST", "/api/v1/sessions/mer-1/reviews/trigger", "")
	assertJSON(t, headers)
	assertErrorCode(t, body, status, http.StatusUnprocessableEntity, "REVIEWER_BINARY_NOT_FOUND")

	var got errorBody
	mustJSON(t, body, &got)

	// The message must carry both the binary name and the sentinel's text.
	mentionsCause := strings.Contains(got.Message, "claude") &&
		strings.Contains(got.Message, ports.ErrAgentBinaryNotFound.Error())
	if !mentionsCause {
		t.Fatalf("message = %q, want reviewer binary cause", got.Message)
	}
}
76 changes: 43 additions & 33 deletions backend/internal/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ type SessionReviews struct {
}

// Trigger starts (or reuses) a review of a worker's PR at its current head:
// - if a run already exists for this commit, it is returned unchanged;
// - if a non-failed run already exists for this commit, it is returned unchanged;
// - otherwise, if a live reviewer pane exists, it is messaged to review the
// new commit; if not, a fresh reviewer is spawned;
// - only after the reviewer is launched is the run recorded.
// - the run is recorded before launch so startup failures leave a visible
// failed pass instead of an empty gap.
func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (TriggerResult, error) {
if workerID == "" {
return TriggerResult{}, fmt.Errorf("%w: worker session id is required", ErrInvalid)
Expand Down Expand Up @@ -183,10 +184,11 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
return TriggerResult{}, err
}

// Idempotency: a pass already exists for this commit — return it as-is.
// Idempotency: return a non-failed pass as-is. Failed passes stay visible
// but can be retried after the user fixes the underlying issue.
if existing, ok, err := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); err != nil {
return TriggerResult{}, err
} else if ok {
} else if ok && existing.Status != domain.ReviewRunFailed {
return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil
}

Expand All @@ -206,24 +208,57 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
TargetSHA: targetSHA,
}

review, err = e.upsertReview(ctx, worker, harness, pr.URL, review.ReviewerHandleID, now)
if err != nil {
return TriggerResult{}, err
}
run := domain.ReviewRun{
ID: runID,
ReviewID: review.ID,
SessionID: workerID,
Harness: harness,
PRURL: pr.URL,
TargetSHA: targetSHA,
Status: domain.ReviewRunRunning,
Verdict: domain.VerdictNone,
CreatedAt: now,
}
if err := e.store.InsertReviewRun(ctx, run); err != nil {
if errors.Is(err, domain.ErrDuplicateReviewRun) {
if existing, ok, getErr := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); getErr != nil {
return TriggerResult{}, getErr
} else if ok {
return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil
}
}
return TriggerResult{}, err
}

failRun := func(err error) error {
if _, updateErr := e.store.UpdateReviewRunResult(ctx, run.ID, domain.ReviewRunFailed, domain.VerdictNone, err.Error()); updateErr != nil {
return updateErr
}
return err
}

// Reuse a live reviewer pane if there is one; otherwise spawn a fresh one.
handleID := ""
if hasReview && review.ReviewerHandleID != "" {
alive, err := e.launcher.Alive(ctx, review.ReviewerHandleID)
if err != nil {
return TriggerResult{}, err
return TriggerResult{}, failRun(err)
}
if alive {
if err := e.launcher.Notify(ctx, review.ReviewerHandleID, spec); err != nil {
return TriggerResult{}, fmt.Errorf("notify reviewer: %w", err)
return TriggerResult{}, failRun(fmt.Errorf("notify reviewer: %w", err))
}
handleID = review.ReviewerHandleID
}
}
if handleID == "" {
h, err := e.launcher.Spawn(ctx, spec)
if err != nil {
return TriggerResult{}, fmt.Errorf("launch reviewer: %w", err)
return TriggerResult{}, failRun(fmt.Errorf("launch reviewer: %w", err))
}
handleID = h
}
Expand All @@ -233,32 +268,7 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
if err != nil {
return TriggerResult{}, err
}
run := domain.ReviewRun{
ID: runID,
ReviewID: review.ID,
SessionID: workerID,
Harness: harness,
PRURL: pr.URL,
TargetSHA: targetSHA,
Status: domain.ReviewRunRunning,
Verdict: domain.VerdictNone,
CreatedAt: now,
}
if err := e.store.InsertReviewRun(ctx, run); err != nil {
// The per-worker lock serialises in-process triggers, but the unique
// index (migration 0013) can still reject a run a concurrent daemon (or
// a pre-lock restart) recorded for this commit. The reviewer is already
// launched, so don't surface a raw error: re-read the recorded run and
// return it as the existing, not-newly-created pass.
if errors.Is(err, domain.ErrDuplicateReviewRun) {
if existing, ok, getErr := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); getErr != nil {
return TriggerResult{}, getErr
} else if ok {
return TriggerResult{Run: existing, ReviewerHandleID: handleID, Created: false}, nil
}
}
return TriggerResult{}, err
}
run.ReviewID = review.ID
return TriggerResult{Run: run, ReviewerHandleID: handleID, Created: true}, nil
}

Expand Down
50 changes: 40 additions & 10 deletions backend/internal/review/review_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package review
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// --- fakes ---
Expand Down Expand Up @@ -217,8 +220,8 @@ func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) {
}

func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) {
// The idempotency check passes (no run yet), the reviewer launches, but the
// insert loses to a concurrent writer the unique index already accepted.
// The idempotency check passes (no run yet), but the insert loses to a
// concurrent writer the unique index already accepted.
store := &fakeStore{insertErr: domain.ErrDuplicateReviewRun}
launcher := &fakeLauncher{handle: "review-mer-1"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)
Expand All @@ -233,8 +236,8 @@ func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) {
if res.Run.TargetSHA != "sha1" || res.Run.ID != "winner-id-1" {
t.Fatalf("expected the recorded winner run, got %+v", res.Run)
}
if launcher.spawnCount != 1 {
t.Fatalf("reviewer should still have launched once: %+v", launcher)
if launcher.spawnCount != 0 {
t.Fatalf("reviewer should not launch after unique conflict: %+v", launcher)
}
}

Expand Down Expand Up @@ -300,16 +303,43 @@ func TestTriggerSpawnsWhenReviewerDead(t *testing.T) {
}
}

func TestTriggerLaunchFailureRecordsNothing(t *testing.T) {
func TestTriggerLaunchFailureRecordsFailedRun(t *testing.T) {
store := &fakeStore{}
launcher := &fakeLauncher{spawnErr: errors.New("boom")}
launcher := &fakeLauncher{spawnErr: fmt.Errorf("claude: %w", ports.ErrAgentBinaryNotFound)}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

if _, err := eng.Trigger(context.Background(), "mer-1"); err == nil {
t.Fatal("want launch error")
if _, err := eng.Trigger(context.Background(), "mer-1"); !errors.Is(err, ports.ErrAgentBinaryNotFound) {
t.Fatalf("err = %v, want ports.ErrAgentBinaryNotFound", err)
}
if store.review == nil || len(store.runs) != 1 {
t.Fatalf("expected persisted failed review/run: review=%+v runs=%+v", store.review, store.runs)
}
run := store.runs[0]
if run.Status != domain.ReviewRunFailed || run.Verdict != domain.VerdictNone {
t.Fatalf("run = %+v, want failed with no verdict", run)
}
if !strings.Contains(run.Body, "claude") || !strings.Contains(run.Body, ports.ErrAgentBinaryNotFound.Error()) {
t.Fatalf("run body = %q, want launch cause", run.Body)
}
}

func TestTriggerRetriesAfterFailedRunForSameCommit(t *testing.T) {
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-failed", ReviewID: "rev-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunFailed}},
}
launcher := &fakeLauncher{handle: "review-mer-1"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

res, err := eng.Trigger(context.Background(), "mer-1")
if err != nil {
t.Fatalf("Trigger: %v", err)
}
if !res.Created || res.Run.ID == "run-failed" {
t.Fatalf("expected retry to create a new run, got %+v", res)
}
if len(store.runs) != 0 || store.review != nil {
t.Fatalf("nothing should be persisted on launch failure: review=%+v runs=%+v", store.review, store.runs)
if len(store.runs) != 2 || !launcher.spawned {
t.Fatalf("expected new launch/run after failed pass: launched=%v runs=%+v", launcher.spawned, store.runs)
}
}

Expand Down
6 changes: 4 additions & 2 deletions backend/internal/service/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"context"

"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
reviewcore "github.com/aoagents/agent-orchestrator/backend/internal/review"
)

// ErrInvalid, ErrNotFound, and ErrAgentBinaryNotFound re-export the engine and
// ports sentinels so the HTTP controller can map service failures to 422/404
// through this package alone.
var (
ErrInvalid = reviewcore.ErrInvalid
ErrNotFound = reviewcore.ErrNotFound
ErrInvalid = reviewcore.ErrInvalid
ErrNotFound = reviewcore.ErrNotFound
ErrAgentBinaryNotFound = ports.ErrAgentBinaryNotFound
)

// Manager is the reviews surface the HTTP controller depends on.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Failed review runs are durable diagnostics, not idempotency winners. Exclude
-- them from the session/SHA unique index so a user can install a missing
-- reviewer harness and retry the same commit while keeping the failed attempt
-- visible in history.

-- +goose Up
-- +goose StatementBegin
DROP INDEX idx_review_run_session_sha;
-- +goose StatementEnd

-- Recreate the index under the same name, now skipping rows whose status is
-- 'failed' in addition to rows with an empty target_sha.
-- +goose StatementBegin
CREATE UNIQUE INDEX idx_review_run_session_sha
ON review_run (session_id, target_sha)
WHERE target_sha != '' AND status != 'failed';
-- +goose StatementEnd

-- +goose Down
-- Rolling back restores an index that also covers failed runs, so at most one
-- row per (session_id, target_sha) may remain before it is recreated. The
-- DELETE below keeps, for each pair, the row ranked first by status
-- ('complete', then 'running', then anything else), breaking ties by newest
-- created_at and then highest rowid, and removes every other row for that
-- pair. Rows with an empty target_sha are never deleted.
-- +goose StatementBegin
DROP INDEX idx_review_run_session_sha;
-- +goose StatementEnd

-- +goose StatementBegin
DELETE FROM review_run
WHERE target_sha != ''
AND rowid NOT IN (
SELECT rowid FROM (
SELECT rowid,
ROW_NUMBER() OVER (
PARTITION BY session_id, target_sha
ORDER BY CASE status WHEN 'complete' THEN 0 WHEN 'running' THEN 1 ELSE 2 END,
created_at DESC,
rowid DESC
) AS rn
FROM review_run
WHERE target_sha != ''
)
WHERE rn = 1
);
-- +goose StatementEnd

-- +goose StatementBegin
CREATE UNIQUE INDEX idx_review_run_session_sha
ON review_run (session_id, target_sha) WHERE target_sha != '';
-- +goose StatementEnd
9 changes: 9 additions & 0 deletions backend/internal/storage/sqlite/store/review_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ func TestInsertReviewRunDuplicateSHAMapsToSentinel(t *testing.T) {
t.Fatalf("duplicate insert err = %v, want ErrDuplicateReviewRun", err)
}

if ok, err := s.UpdateReviewRunResult(ctx, "run-1", domain.ReviewRunFailed, domain.VerdictNone, "claude: not found"); err != nil {
t.Fatalf("mark failed: %v", err)
} else if !ok {
t.Fatal("mark failed: got ok=false")
}
if err := s.InsertReviewRun(ctx, dup); err != nil {
t.Fatalf("retry after failed insert: %v", err)
}

// An empty target_sha is excluded from the index, so two are allowed.
for _, id := range []string{"run-empty-1", "run-empty-2"} {
r := run
Expand Down
Loading