Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker-compose.user.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ services:
- phantom_data:/app/data
- phantom_public:/app/public
- phantom_repos:/app/repos
# Claude Code credentials (persists `claude login` across restarts so
# subscription users do not have to re-authenticate on every upgrade).
- phantom_claude:/home/phantom/.claude
# Docker socket lets the agent create sibling containers on the host.
# This is required for code execution and development tasks.
- /var/run/docker.sock:/var/run/docker.sock
Expand Down Expand Up @@ -114,6 +117,7 @@ volumes:
phantom_data:
phantom_public:
phantom_repos:
phantom_claude:
qdrant_data:
ollama_data:

Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ services:
- phantom_data:/app/data
- phantom_public:/app/public
- phantom_repos:/app/repos
# Claude Code credentials (persists `claude login` across restarts so
# subscription users do not have to re-authenticate on every upgrade).
- phantom_claude:/home/phantom/.claude
# Docker socket for sibling container creation
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
Expand Down Expand Up @@ -105,6 +108,7 @@ volumes:
phantom_data:
phantom_public:
phantom_repos:
phantom_claude:
qdrant_data:
ollama_data:

Expand Down
16 changes: 15 additions & 1 deletion src/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ export class AgentRuntime {
return this.config;
}

/**
 * Report whether an execution is already in flight for the given
 * channel/conversation pair. The scheduler consults this before calling
 * handleMessage, so a still-running job is never re-entered (the Phase 2.5
 * C2 "braces" layer). Callers outside the scheduler are covered by the
 * "belt" layer instead: the Error-shaped result handleMessage returns
 * when the session is busy.
 */
isSessionBusy(channelId: string, conversationId: string): boolean {
  const sessionKey = `${channelId}:${conversationId}`;
  return this.activeSessions.has(sessionKey);
}

async handleMessage(
channelId: string,
conversationId: string,
Expand All @@ -80,8 +90,12 @@ export class AgentRuntime {
const startTime = Date.now();

if (this.activeSessions.has(sessionKey)) {
// Belt layer for C2: return a loud, parseable Error so direct callers
// (router, trigger, secret save) stop treating the bounce as success.
// The scheduler adds its own braces layer via isSessionBusy.
console.warn(`[runtime] Session busy, bouncing concurrent message: ${sessionKey}`);
return {
text: "I'm still working on your previous message. Please wait.",
text: "Error: session busy (previous execution still running)",
sessionId: "",
cost: emptyCost(),
durationMs: 0,
Expand Down
9 changes: 9 additions & 0 deletions src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AuthMiddleware } from "../mcp/auth.ts";
import { loadMcpConfig } from "../mcp/config.ts";
import type { PhantomMcpServer } from "../mcp/server.ts";
import type { MemoryHealth } from "../memory/types.ts";
import type { SchedulerHealthSummary } from "../scheduler/health.ts";
import { handleUiRequest } from "../ui/serve.ts";

const VERSION = "0.18.2";
Expand All @@ -17,6 +18,7 @@ type RoleInfoProvider = () => { id: string; name: string } | null;
type OnboardingStatusProvider = () => string;
type WebhookHandler = (req: Request) => Promise<Response>;
type PeerHealthProvider = () => Record<string, { healthy: boolean; latencyMs: number; error?: string }>;
type SchedulerHealthProvider = () => SchedulerHealthSummary | null;
type TriggerDeps = {
runtime: AgentRuntime;
slackChannel?: SlackChannel;
Expand All @@ -31,6 +33,7 @@ let roleInfoProvider: RoleInfoProvider | null = null;
let onboardingStatusProvider: OnboardingStatusProvider | null = null;
let webhookHandler: WebhookHandler | null = null;
let peerHealthProvider: PeerHealthProvider | null = null;
let schedulerHealthProvider: SchedulerHealthProvider | null = null;
let triggerDeps: TriggerDeps | null = null;

export function setMemoryHealthProvider(provider: MemoryHealthProvider): void {
Expand Down Expand Up @@ -65,6 +68,10 @@ export function setPeerHealthProvider(provider: PeerHealthProvider): void {
peerHealthProvider = provider;
}

/**
 * Register the callback the health endpoint uses to include scheduler
 * status in its response. The provider may return null (e.g. before the
 * scheduler is constructed); in that case the health payload simply
 * omits the `scheduler` field.
 */
export function setSchedulerHealthProvider(provider: SchedulerHealthProvider): void {
  schedulerHealthProvider = provider;
}

export function setTriggerDeps(deps: TriggerDeps): void {
triggerDeps = deps;
}
Expand Down Expand Up @@ -97,6 +104,7 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp

const onboardingStatus = onboardingStatusProvider ? onboardingStatusProvider() : null;
const peers = peerHealthProvider ? peerHealthProvider() : null;
const scheduler = schedulerHealthProvider ? schedulerHealthProvider() : null;

return Response.json({
status,
Expand All @@ -112,6 +120,7 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp
},
...(onboardingStatus ? { onboarding: onboardingStatus } : {}),
...(peers && Object.keys(peers).length > 0 ? { peers } : {}),
...(scheduler ? { scheduler } : {}),
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/db/__tests__/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("runMigrations", () => {
runMigrations(db);

const migrationCount = db.query("SELECT COUNT(*) as count FROM _migrations").get() as { count: number };
expect(migrationCount.count).toBe(9);
expect(migrationCount.count).toBe(10);
});

test("tracks applied migration indices", () => {
Expand All @@ -47,6 +47,6 @@ describe("runMigrations", () => {
.all()
.map((r) => (r as { index_num: number }).index_num);

expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8]);
expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
});
});
6 changes: 6 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ export const MIGRATIONS: string[] = [
expires_at TEXT NOT NULL,
completed_at TEXT
)`,

// Phase 2.5 scheduler hardening: record whether the last delivery attempt
// actually made it to Slack. null = never delivered, "delivered" = sent,
// "dropped:<reason>" = skipped at the delivery branch, "error:<reason>" =
// Slack threw during send. Existing rows keep null on migration.
"ALTER TABLE scheduled_jobs ADD COLUMN last_delivery_status TEXT",
];
12 changes: 9 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
setOnboardingStatusProvider,
setPeerHealthProvider,
setRoleInfoProvider,
setSchedulerHealthProvider,
setTriggerDeps,
setWebhookHandler,
startServer,
Expand Down Expand Up @@ -177,6 +178,7 @@ async function main(): Promise<void> {

// Wire scheduler into the agent (Slack channel set later after channel init)
scheduler = new Scheduler({ db, runtime });
setSchedulerHealthProvider(() => scheduler?.getHealthSummary() ?? null);

// Pass factories (not singletons) so each query() gets fresh MCP server instances.
// The underlying registries (DynamicToolRegistry, Scheduler) are singletons.
Expand Down Expand Up @@ -593,9 +595,13 @@ async function main(): Promise<void> {

await router.connectAll();

// Wire Slack into scheduler and /trigger now that channels are connected
if (scheduler && slackChannel && channelsConfig?.slack?.owner_user_id) {
scheduler.setSlackChannel(slackChannel, channelsConfig.slack.owner_user_id);
// Wire Slack into scheduler and /trigger now that channels are connected.
// The owner_user_id gate was removed in Phase 2.5 (C3): channel-id and
// user-id delivery targets do not need the owner; only target="owner"
// does, and the scheduler's delivery path records a loud "dropped" status
// in that specific case instead of silently no-oping every job.
if (scheduler && slackChannel) {
scheduler.setSlackChannel(slackChannel, channelsConfig?.slack?.owner_user_id ?? null);
}
if (scheduler) {
await scheduler.start();
Expand Down
Loading
Loading