Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
- Added `--include-dirty` to review, CI, and revalidation file filters for auditing uncommitted worktree changes, thanks @AsishKumarDalal.
- Fixed Bun package-manager detection to recognize the text `bun.lock` lockfile, thanks @austinm911.
- Fixed review-output schema to tolerate optional `reproduction` and `minimumFixScope` fields and zero-valued evidence line numbers (normalized to `null`), recovering 4 of 28 zod issue patterns observed in run `20260517T190759-3c9e9e` (78 errors over 1000 features) that previously dropped whole-feature output instead of the affected finding.
- Changed `clawpatch review --jobs` and `clawpatch ci --jobs` defaults from a fixed `10` to `floor(cpuCores / 2)` clamped to `[1, 10]`, thanks @coletebou.
- Added `clawpatch review --rate-limit-per-minute <n>` (also `CLAWPATCH_RPM`) to cap how many provider calls may start within any rolling 60s window across jobs, thanks @coletebou.

## 0.3.0 - 2026-05-18

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ Useful flags:
- `--json`
- `--plain`
- `--limit <n>`
- `--jobs <n>`
- `--jobs <n>` (default: half of CPU cores, max 10)
- `--rate-limit-per-minute <n>`
- `--source <heuristic|auto|agent>`
- `--feature <id>`
- `--project <name-or-root>`
Expand Down
3 changes: 2 additions & 1 deletion docs/code-review.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ Current behavior:

- selects pending features unless `--feature` is set
- claims each feature with an atomic lock file plus the feature run lock
- reviews with a bounded worker pool; default `--jobs` is `10`
- reviews with a bounded worker pool; default `--jobs` is half of CPU cores, max 10
- caps provider call starts with `--rate-limit-per-minute <n>` or `CLAWPATCH_RPM`
- emits progress to stderr unless `--quiet` is set
- builds bounded prompt context from owned files, context files, and tests
- includes a prompt context manifest with included files, omitted files, byte
Expand Down
25 changes: 25 additions & 0 deletions src/app.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,31 @@ describe("runProviderReviewWithRetry", () => {
expect(review).toHaveBeenCalledTimes(2);
});

it("acquires the RPM limiter before each retry attempt", async () => {
delete process.env["CLAWPATCH_REVIEW_RETRIES"];
const review = vi
.fn()
.mockRejectedValueOnce(new ClawpatchError("garbled", 8, "malformed-output"))
.mockResolvedValueOnce(emptyReview());
const acquire = vi.fn().mockResolvedValue(undefined);
const provider = fakeProvider(review);
await runProviderReviewWithRetry({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
provider: provider as any,
root: "/tmp",
prompt: "hi",
// eslint-disable-next-line @typescript-eslint/no-explicit-any
options: {} as any,
context: QUIET_CONTEXT,
featureId: "feat_x",
index: 0,
total: 1,
limiter: { acquire },
});
expect(review).toHaveBeenCalledTimes(2);
expect(acquire).toHaveBeenCalledTimes(2);
});

it("does NOT retry on provider-auth", async () => {
delete process.env["CLAWPATCH_REVIEW_RETRIES"];
const err = new ClawpatchError("auth", 4, "provider-auth");
Expand Down
39 changes: 28 additions & 11 deletions src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { appendFile, lstat, readFile, realpath, writeFile } from "node:fs/promises";
import { join, relative, resolve } from "node:path";
import { hostname } from "node:os";
import { cpus, hostname } from "node:os";
import {
changedPathsBetweenSnapshots,
hasSourceDirtyWorktree,
Expand Down Expand Up @@ -74,6 +74,7 @@ import {
reasoningEfforts,
} from "./types.js";
import { validationCommandsForFeature } from "./validation.js";
import { createRpmLimiter, defaultJobs, rpmFromFlag, type RpmLimiter } from "./rpm-limiter.js";

export type AppContext = {
root: string;
Expand Down Expand Up @@ -315,6 +316,9 @@ export async function reviewCommand(
error: unknown;
}> = [];
const jobs = Math.min(reviewJobs(flags), Math.max(features.length, 1));
const limiter = createRpmLimiter(
rpmFromFlag(stringFlag(flags, "rateLimitPerMinute"), process.env["CLAWPATCH_RPM"]),
);
let cursor = 0;
emitProgress(context, "review", "start", {
run: currentRunId,
Expand Down Expand Up @@ -342,6 +346,7 @@ export async function reviewCommand(
total: features.length,
mode,
customPrompt,
limiter,
allowNonPendingFeatureReview: stringFlag(flags, "feature") !== undefined,
});
findingIds.push(...reviewed.findingIds);
Expand Down Expand Up @@ -651,6 +656,7 @@ type ReviewFeatureOptions = {
total: number;
mode: ReviewMode;
customPrompt: string | null;
limiter: RpmLimiter;
allowNonPendingFeatureReview: boolean;
};

Expand All @@ -668,6 +674,7 @@ async function reviewFeature(
total,
mode,
customPrompt,
limiter,
allowNonPendingFeatureReview,
} = options;
const started = Date.now();
Expand Down Expand Up @@ -705,6 +712,7 @@ async function reviewFeature(
featureId: feature.featureId,
index,
total,
limiter,
});
// Layer 1 drops: per-finding schema violations from parseReviewOutput.
const droppedFindings: DroppedFinding[] = [...providerOutput.droppedFindings];
Expand Down Expand Up @@ -817,12 +825,14 @@ async function runProviderReviewWithRetry(args: {
featureId: string;
index: number;
total: number;
limiter?: RpmLimiter;
}): Promise<ProviderReviewOutput> {
const { provider, root, prompt, options, context, featureId, index, total } = args;
const { provider, root, prompt, options, context, featureId, index, total, limiter } = args;
const maxAttempts = 1 + reviewRetries();
let lastError: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
await limiter?.acquire();
return await provider.review(root, prompt, options);
} catch (error: unknown) {
lastError = error;
Expand Down Expand Up @@ -1403,7 +1413,7 @@ function reviewFlagSubset(
flags: Record<string, string | boolean>,
): Record<string, string | boolean> {
const subset = providerFlagSubset(flags);
for (const flag of ["since", "limit", "jobs"] as const) {
for (const flag of ["since", "limit", "jobs", "rateLimitPerMinute"] as const) {
const value = stringFlag(flags, flag);
if (value !== undefined) {
subset[flag] = value;
Expand Down Expand Up @@ -2011,6 +2021,21 @@ async function filterFindingsByOwnedFilesSince(
return filterFindingsByChangedOwnedFiles(findings, features, changed);
}

export function reviewJobs(
flags: Record<string, string | boolean>,
coreCount: number = cpus().length,
): number {
const explicit = stringFlag(flags, "jobs");
if (explicit !== undefined) {
const parsed = Number(explicit);
if (!Number.isFinite(parsed) || parsed < 1) {
return 1;
}
return Math.min(Math.floor(parsed), 32);
}
return defaultJobs(coreCount);
}

async function changedFiles(
root: string,
flags: Record<string, string | boolean>,
Expand All @@ -2034,14 +2059,6 @@ function hasFileFilter(flags: Record<string, string | boolean>): boolean {
return stringFlag(flags, "since") !== undefined || flags["includeDirty"] === true;
}

function reviewJobs(flags: Record<string, string | boolean>): number {
const parsed = Number(stringFlag(flags, "jobs") ?? "10");
if (!Number.isFinite(parsed) || parsed < 1) {
return 1;
}
return Math.min(Math.floor(parsed), 32);
}

function reviewMode(flags: Record<string, string | boolean>): ReviewMode {
const mode = stringFlag(flags, "mode") ?? "default";
if (mode === "default" || mode === "deslopify") {
Expand Down
9 changes: 7 additions & 2 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ const commandFlags = {
"since",
"jobs",
"mode",
"rateLimitPerMinute",
"provider",
"model",
"reasoningEffort",
Expand All @@ -174,6 +175,7 @@ const commandFlags = {
"limit",
"since",
"jobs",
"rateLimitPerMinute",
"provider",
"model",
"reasoningEffort",
Expand Down Expand Up @@ -224,6 +226,7 @@ const valueFlagNames = new Set([
"since",
"jobs",
"mode",
"rate-limit-per-minute",
"source",
"provider",
"model",
Expand Down Expand Up @@ -415,8 +418,9 @@ Flags:
--limit <n>
--since <ref>
--include-dirty
--jobs <n> default: 10
--jobs <n> default: ~half of CPU cores, max 10
--mode <default|deslopify>
--rate-limit-per-minute <n> cap provider calls per 60s window (env: CLAWPATCH_RPM)
--provider <name>
--model <name>
--reasoning-effort <none|minimal|low|medium|high|xhigh>
Expand Down Expand Up @@ -462,7 +466,8 @@ Flags:
--since <ref>
--include-dirty
--limit <n>
--jobs <n> default: 10
--jobs <n> default: ~half of CPU cores, max 10
--rate-limit-per-minute <n> cap provider calls per 60s window (env: CLAWPATCH_RPM)
--provider <name>
--model <name>
--reasoning-effort <none|minimal|low|medium|high|xhigh>
Expand Down
25 changes: 25 additions & 0 deletions src/review-jobs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { describe, expect, it } from "vitest";
import { reviewJobs } from "./app.js";

describe("reviewJobs", () => {
it("defaults to floor(cores / 2) capped at 10 when --jobs is not given", () => {
expect(reviewJobs({}, 4)).toBe(2);
expect(reviewJobs({}, 8)).toBe(4);
expect(reviewJobs({}, 32)).toBe(10);
expect(reviewJobs({}, 1)).toBe(1);
});

it("honors explicit --jobs value", () => {
expect(reviewJobs({ jobs: "7" }, 32)).toBe(7);
expect(reviewJobs({ jobs: "1" }, 32)).toBe(1);
});

it("caps explicit --jobs at 32", () => {
expect(reviewJobs({ jobs: "100" }, 4)).toBe(32);
});

it("treats invalid explicit --jobs as 1", () => {
expect(reviewJobs({ jobs: "abc" }, 8)).toBe(1);
expect(reviewJobs({ jobs: "0" }, 8)).toBe(1);
});
});
116 changes: 116 additions & 0 deletions src/rpm-limiter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { describe, expect, it } from "vitest";
import { createRpmLimiter, defaultJobs, rpmFromFlag } from "./rpm-limiter.js";

type ScheduledTimeout = { handler: () => void; runAt: number };

function makeFakeClock() {
let nowMs = 0;
const pending: ScheduledTimeout[] = [];
return {
clock: {
now: () => nowMs,
setTimeout: (handler: () => void, ms: number) => {
pending.push({ handler, runAt: nowMs + ms });
},
},
advance(ms: number): void {
nowMs += ms;
// Fire any timeouts due at or before the current time, in scheduled order.
while (true) {
const due = pending.findIndex((entry) => entry.runAt <= nowMs);
if (due === -1) {
return;
}
const [entry] = pending.splice(due, 1);
entry?.handler();
}
},
setNow(ms: number): void {
nowMs = ms;
},
pending: () => pending.length,
};
}

describe("defaultJobs", () => {
it("returns floor(cores / 2) capped at 10", () => {
expect(defaultJobs(4)).toBe(2);
expect(defaultJobs(8)).toBe(4);
expect(defaultJobs(32)).toBe(10);
});

it("clamps to a minimum of 1", () => {
expect(defaultJobs(1)).toBe(1);
expect(defaultJobs(0)).toBe(1);
expect(defaultJobs(Number.NaN)).toBe(1);
});
});

describe("rpmFromFlag", () => {
it("prefers explicit flag over env", () => {
expect(rpmFromFlag("30", "60")).toBe(30);
});

it("falls back to env when flag is missing", () => {
expect(rpmFromFlag(undefined, "45")).toBe(45);
});

it("returns undefined when neither is set", () => {
expect(rpmFromFlag(undefined, undefined)).toBeUndefined();
expect(rpmFromFlag("", "")).toBeUndefined();
});

it("returns undefined for invalid values", () => {
expect(rpmFromFlag("abc", undefined)).toBeUndefined();
expect(rpmFromFlag("0", undefined)).toBeUndefined();
expect(rpmFromFlag("-5", undefined)).toBeUndefined();
});
});

describe("createRpmLimiter", () => {
it("is a no-op when limit is undefined", async () => {
const limiter = createRpmLimiter(undefined);
for (let i = 0; i < 100; i += 1) {
await limiter.acquire();
}
});

it("is a no-op when limit is invalid", async () => {
const limiter = createRpmLimiter(0);
await limiter.acquire();
const limiter2 = createRpmLimiter(Number.NaN);
await limiter2.acquire();
});

it("allows up to N starts in a 60s window without delay", async () => {
const fake = makeFakeClock();
const limiter = createRpmLimiter(3, fake.clock);
await limiter.acquire();
await limiter.acquire();
await limiter.acquire();
expect(fake.pending()).toBe(0);
});

it("delays the (N+1)th call until the oldest slot expires", async () => {
const fake = makeFakeClock();
const limiter = createRpmLimiter(2, fake.clock);
await limiter.acquire();
fake.advance(10_000);
await limiter.acquire();

let resolved = false;
const pending = limiter.acquire().then(() => {
resolved = true;
});

// Allow the chain to evaluate `step` and schedule its setTimeout.
await Promise.resolve();
await Promise.resolve();
expect(resolved).toBe(false);

// First slot was at t=0, window is 60s, so the next slot opens at t=60_000.
fake.advance(50_000);
await pending;
expect(resolved).toBe(true);
});
});
Loading