95 changes: 95 additions & 0 deletions engineering/sql-storage-migration.md
@@ -0,0 +1,95 @@
# Server Storage Migration: KV → Native DO SQLite

## Summary

This release migrates the server's Durable Object persistence layer from a hand-rolled checkpoint+journal system on top of Cloudflare's KV-style storage API to native DO SQLite tables.

## What changes for users

**Nothing visible.** This is a server-internal storage change. The sync protocol, client plugin, and WebSocket behavior are unchanged. Clients do not need to update.

## What happens on deploy

1. When a vault's Durable Object wakes for the first time after deploy:
   - It attempts to load from SQL tables (new path)
   - If SQL is empty, it checks for existing KV data (old path)
   - If KV data exists, it migrates: loads the full Y.Doc state from KV, writes a clean snapshot to SQL, records a migration marker
   - Future loads use SQL exclusively

2. Migration is automatic and transparent. No user action required.

3. Old KV data is preserved after migration (rollback safety). It is NOT auto-deleted.
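
The load order described above can be sketched as a small decision function. This is a minimal illustration under stated assumptions, not the server's actual code: `SnapshotStore`, `loadOnFirstWake`, and `memoryStore` are hypothetical names, the stores are synchronous in-memory stand-ins for the real async storage APIs, and the migration marker is reduced to the returned mode.

```typescript
// Hypothetical sketch of the first-wake load order: SQL, then KV, then fresh.
// The mode strings mirror the debug payload in this document; every other
// name here is illustrative.
type LoadMode = "sql" | "kv-migrated" | "fresh";

interface SnapshotStore {
  load(): Uint8Array | null;
  save(state: Uint8Array): void;
}

interface LoadResult {
  mode: LoadMode;
  state: Uint8Array | null;
}

function loadOnFirstWake(sql: SnapshotStore, kv: SnapshotStore): LoadResult {
  // New path: a SQL snapshot wins whenever one exists.
  const fromSql = sql.load();
  if (fromSql !== null) return { mode: "sql", state: fromSql };

  // Old path: migrate KV state into SQL. KV is only read, never deleted.
  const fromKv = kv.load();
  if (fromKv !== null) {
    sql.save(fromKv);
    return { mode: "kv-migrated", state: fromKv };
  }

  // Neither store has data: a brand-new vault.
  return { mode: "fresh", state: null };
}

// In-memory stand-in used only to exercise the sketch.
function memoryStore(initial: Uint8Array | null): SnapshotStore {
  let value: Uint8Array | null = initial;
  return {
    load: () => value,
    save: (state: Uint8Array) => {
      value = state;
    },
  };
}
```

Calling `loadOnFirstWake` twice against the same pair of stores shows the intended behaviour: the first call migrates, the second reads from SQL, and the KV store still holds its data for rollback.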

## Performance impact

- **Cold start:** Faster. Reading a SQL snapshot is a single `SELECT` query instead of batched KV reads + manifest parsing.
- **Saves:** Faster. Journal append is a single `INSERT` instead of a multi-key transactional write.
- **Compaction:** No longer a failure risk. The old system could fail compaction when the journal grew large (transaction size limits). The new system uses `transactionSync` with bounded operations.

## Known limitations

- Journal entries larger than 1.5MB are written as a full checkpoint instead of a journal append (by design, not a failure)
- Admin debug routes (`/debug/compact`, `/debug/cleanup-kv`) require the `YAOS_ENABLE_ADMIN_ROUTES` env var to be set
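
The size-based routing in the first limitation might look like the sketch below. The names `MAX_JOURNAL_ENTRY_BYTES` and `routeSave` are hypothetical, and the threshold assumes "1.5MB" means 1.5 MiB; the real constant and comparison may differ.

```typescript
// Hypothetical sketch: route a save by delta size.
// Assumption: the 1.5MB limit is 1.5 * 1024 * 1024 bytes.
const MAX_JOURNAL_ENTRY_BYTES = 1.5 * 1024 * 1024;

type SaveRoute = "journal-append" | "full-checkpoint";

function routeSave(deltaByteLength: number): SaveRoute {
  // Oversized deltas skip the journal and rewrite the whole checkpoint.
  return deltaByteLength > MAX_JOURNAL_ENTRY_BYTES
    ? "full-checkpoint"
    : "journal-append";
}
```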

## Monitoring

The `/__yaos/debug` endpoint now includes a `storage` section:

```json
{
  "storage": {
    "mode": "sql" | "kv-migrated" | "fresh" | "kv-fallback",
    "migrationStatus": "already_sql" | "migrated" | "not_started" | "failed",
    "migrationAt": "2026-05-28T...",
    "migrationDurationMs": 1234,
    "coldLoadDurationMs": 567,
    "oversizedDeltaCount": 0,
    "migrationMeta": { ... }
  }
}
```
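
For operators consuming this payload from a script, the shape above could be typed roughly as follows. This is a sketch: the field names and enum values come from the example payload, but which fields are optional is an assumption, and `needsAttention` is a hypothetical helper rather than part of the server.

```typescript
// Hypothetical typing of the `storage` section shown above.
type StorageMode = "sql" | "kv-migrated" | "fresh" | "kv-fallback";
type MigrationStatus = "already_sql" | "migrated" | "not_started" | "failed";

interface StorageDebug {
  mode: StorageMode;
  migrationStatus: MigrationStatus;
  // Assumption: timing and metadata fields may be absent on some vaults.
  migrationAt?: string;
  migrationDurationMs?: number;
  coldLoadDurationMs?: number;
  oversizedDeltaCount?: number;
  migrationMeta?: unknown;
}

// Flags the two states this document treats as problems:
// a failed migration or the degraded KV fallback mode.
function needsAttention(storage: StorageDebug): boolean {
  return storage.mode === "kv-fallback" || storage.migrationStatus === "failed";
}
```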

## Rollback plan

If SQL storage shows problems after deploy:

### Option A: Revert deploy (recommended)

1. Deploy the previous server version (before this commit)
2. The old code reads from KV, which still has all the data (we don't delete it)
3. Rooms resume from KV state as if nothing happened

### Option B: KV fallback (automatic)

If SQL tables become corrupt (rare, catastrophic), the server automatically:
1. Detects SQL load failure
2. Falls back to reading from KV (if KV data still exists)
3. Sets `storageMode: "kv-fallback"`
4. Operates in **non-durable degraded relay mode:**
   - No persistence attempts (saves are skipped)
   - No SV echoes sent (clients are not told the server received their state)
   - CRDT relay still functions (connected peers can sync through server memory)
   - Clients retain local truth via IndexedDB
5. Logs `kv-fallback-activated` trace for operator visibility

In this state, the DO relays sync messages between peers but does not persist or acknowledge durable receipt. Clients retain their data locally. The operator must fix the SQL issue or revert the deploy.
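
The degraded behaviour can be summarised as a policy derived from the storage mode. The sketch below is illustrative only: `RelayPolicy` and `policyFor` are hypothetical names, and the real server may gate these behaviours in separate places rather than through one function.

```typescript
// Hypothetical sketch of the non-durable degraded relay mode.
type StorageMode = "sql" | "kv-migrated" | "fresh" | "kv-fallback";

interface RelayPolicy {
  persist: boolean;             // attempt saves to storage
  sendStateVectorEcho: boolean; // tell clients the server holds their state
  relayUpdates: boolean;        // forward CRDT updates between connected peers
}

function policyFor(mode: StorageMode): RelayPolicy {
  const degraded = mode === "kv-fallback";
  return {
    // In fallback mode nothing is persisted and nothing is acknowledged.
    persist: !degraded,
    sendStateVectorEcho: !degraded,
    // Relay through server memory keeps working in every mode.
    relayUpdates: true,
  };
}
```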

### KV data cleanup

Old KV keys are **not** auto-deleted. To clean them up after confirming SQL is stable:

1. Set `YAOS_ENABLE_ADMIN_ROUTES = "true"` in wrangler.toml
2. Deploy
3. Send `POST /vault/:id/debug/cleanup-kv` with the auth header
4. The endpoint verifies SQL has data before deleting KV keys

Only do this after a successful bake period (72+ hours of normal use with no errors).
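
A cleanup script could enforce the bake period before building the request. The sketch below makes several assumptions: `bakePeriodElapsed` and `buildCleanupRequest` are hypothetical helpers, the bake period is measured from the `migrationAt` timestamp, and the `Authorization: Bearer` header is a placeholder because this document does not specify the auth header format.

```typescript
// Hypothetical sketch: gate KV cleanup on the 72-hour bake period.
const BAKE_PERIOD_MS = 72 * 60 * 60 * 1000;

// Assumption: the bake period starts at the `migrationAt` timestamp.
function bakePeriodElapsed(migrationAtIso: string, now: Date): boolean {
  return now.getTime() - Date.parse(migrationAtIso) >= BAKE_PERIOD_MS;
}

interface CleanupRequest {
  method: "POST";
  url: string;
  headers: Record<string, string>;
}

function buildCleanupRequest(baseUrl: string, vaultId: string, token: string): CleanupRequest {
  return {
    method: "POST",
    url: `${baseUrl}/vault/${encodeURIComponent(vaultId)}/debug/cleanup-kv`,
    // Placeholder header: the real auth scheme is not given in this document.
    headers: { Authorization: `Bearer ${token}` },
  };
}
```

The returned object can be passed to whatever HTTP client the operator prefers. The endpoint itself still verifies that SQL has data before deleting anything.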

## Compatibility

| Component | Compatibility |
|---|---|
| Client plugin | No change needed. Any version works. |
| Server deploy | One-way migration. Deploy is the trigger. |
| Rollback to old server | Safe. KV data preserved. |
| Multiple vault DOs | Each migrates independently on first wake. |
152 changes: 79 additions & 73 deletions server/src/chunkedDocStore.ts
@@ -14,8 +14,10 @@ const JOURNAL_META_KEY = "document:journal:meta";
 const JOURNAL_MANIFEST_PREFIX = "document:journal:manifest:";
 const JOURNAL_CHUNK_PREFIX = "document:journal:chunk:";

-const DEFAULT_CHUNK_SIZE_BYTES = 512 * 1024;
+const DEFAULT_CHUNK_SIZE_BYTES = 64 * 1024; // 64KB — safe margin under CF DO per-value limit
 const DEFAULT_MAX_KEYS_PER_OPERATION = 128;
+/** Max keys per put when writing large binary chunks to avoid exceeding per-call size limits. */
+const MAX_CHUNK_KEYS_PER_PUT = 16;

 interface ManifestPointer {
   version: number;
@@ -225,22 +227,26 @@ async function putChunkedPayloadBatched(
   bytes: Uint8Array,
   chunkSizeBytes: number,
   chunkKeyForIndex: (index: number) => string,
-  maxKeysPerOperation: number,
+  _maxKeysPerOperation: number,
 ): Promise<number> {
   const chunkCount = bytes.byteLength === 0
     ? 0
     : Math.ceil(bytes.byteLength / chunkSizeBytes);

   if (chunkCount === 0) return 0;

-  for (let chunkStart = 0; chunkStart < chunkCount; chunkStart += maxKeysPerOperation) {
-    const chunkEnd = Math.min(chunkStart + maxKeysPerOperation, chunkCount);
+  // Use a smaller batch size for binary chunk writes to avoid exceeding
+  // per-call total size limits (e.g., 16 * 64KB = 1MB per put call).
+  const batchSize = MAX_CHUNK_KEYS_PER_PUT;
+  for (let chunkStart = 0; chunkStart < chunkCount; chunkStart += batchSize) {
+    const chunkEnd = Math.min(chunkStart + batchSize, chunkCount);
     const record: Record<string, Uint8Array> = {};
     for (let i = chunkStart; i < chunkEnd; i++) {
       const start = i * chunkSizeBytes;
       const end = Math.min(start + chunkSizeBytes, bytes.byteLength);
-      // subarray avoids eagerly copying chunk bytes into transient arrays.
-      record[chunkKeyForIndex(i)] = bytes.subarray(start, end);
+      // .slice() creates an independent copy — avoids issues with
+      // structured clone of views sharing a large underlying buffer.
+      record[chunkKeyForIndex(i)] = bytes.slice(start, end);
     }
     await target.put(record);
   }
@@ -420,85 +426,85 @@ export class ChunkedDocStore {
     const updateHash = await sha256Hex(updateBytes);
     const stateVectorHash = await sha256Hex(stateVectorBytes);

-    await this.storage.transaction(async (txn) => {
-      const cleanupKeys = new Set<string>();
-      const existingPointerRaw = await txn.get<unknown>(CHECKPOINT_POINTER_KEY);
-      const existingPointer = existingPointerRaw === undefined
-        ? null
-        : isManifestPointer(existingPointerRaw)
-          ? existingPointerRaw
-          : (() => {
-            throw new Error("checkpoint pointer is invalid");
-          })();
-
-      if (existingPointer) {
-        const oldManifestKey = checkpointManifestKey(existingPointer.version);
-        const oldManifestRaw = await txn.get<unknown>(oldManifestKey);
-        if (!isChunkedManifest(oldManifestRaw)) {
-          throw new Error(`checkpoint manifest missing or invalid for version ${existingPointer.version}`);
-        }
-        cleanupKeys.add(oldManifestKey);
+    // ── Pre-compute: determine new version and collect cleanup keys ──────
+    const existingPointerRaw = await this.storage.get<unknown>(CHECKPOINT_POINTER_KEY);
+    const existingPointer = existingPointerRaw === undefined
+      ? null
+      : isManifestPointer(existingPointerRaw)
+        ? existingPointerRaw
+        : (() => { throw new Error("checkpoint pointer is invalid"); })();
+
+    const newVersion = existingPointer ? existingPointer.version + 1 : 1;
+
+    // Collect cleanup keys (old checkpoint + journal entries)
+    const cleanupKeys = new Set<string>();
+    if (existingPointer) {
+      const oldManifestRaw = await this.storage.get<unknown>(checkpointManifestKey(existingPointer.version));
+      if (isChunkedManifest(oldManifestRaw)) {
+        cleanupKeys.add(checkpointManifestKey(existingPointer.version));
         cleanupKeys.add(checkpointStateVectorKey(existingPointer.version));
         for (let i = 0; i < oldManifestRaw.chunkCount; i++) {
           cleanupKeys.add(checkpointChunkKey(oldManifestRaw.version, i));
         }
       }
+    }

-      const journalMeta = await this.readJournalMeta(txn);
-      if (journalMeta.entryCount > 0) {
-        const seqs = expectedJournalSeqs(journalMeta);
-        const manifestKeys = seqs.map((seq) => journalManifestKey(seq));
-        const manifestMap = await getManyBatched<unknown>(txn, manifestKeys, this.maxKeysPerOperation);
-        if (manifestMap.size !== manifestKeys.length) {
-          throw new Error(
-            `journal compact failed: expected ${manifestKeys.length} manifests, found ${manifestMap.size}`,
-          );
-        }
-        for (const seq of seqs) {
-          const key = journalManifestKey(seq);
-          const manifestRaw = manifestMap.get(key);
-          if (!isJournalEntryManifest(manifestRaw) || manifestRaw.seq !== seq) {
-            throw new Error(`journal compact failed: invalid manifest for seq ${seq}`);
-          }
-          cleanupKeys.add(key);
-          for (let i = 0; i < manifestRaw.chunkCount; i++) {
-            cleanupKeys.add(journalChunkKey(seq, i));
-          }
-        }
+    const journalMeta = await this.readJournalMeta(this.storage);
+    if (journalMeta.entryCount > 0) {
+      const seqs = expectedJournalSeqs(journalMeta);
+      for (const seq of seqs) {
+        cleanupKeys.add(journalManifestKey(seq));
+        cleanupKeys.add(journalChunkKey(seq, 0));
       }
+    }

-      const newVersion = existingPointer
-        ? existingPointer.version + 1
-        : 1;
-      const now = new Date().toISOString();
-      const chunkCount = await putChunkedPayloadBatched(
+    // ── Phase 1: Write new checkpoint chunks (non-transactional) ─────────
+    // These keys are not referenced by anything yet — safe to write outside
+    // a transaction. If we crash here, they're orphaned (harmless).
+    const chunkCount = await putChunkedPayloadBatched(
+      this.storage,
+      updateBytes,
+      this.chunkSizeBytes,
+      (i) => checkpointChunkKey(newVersion, i),
+      this.maxKeysPerOperation,
+    );
+
+    // ── Phase 2: Atomic pointer swap (small transaction) ─────────────────
+    // This is the critical section. It atomically:
+    // - Publishes the new checkpoint (manifest + pointer)
+    // - Resets the journal to empty
+    // If this fails, the chunks from Phase 1 are orphaned (harmless).
+    const now = new Date().toISOString();
+    const manifest: CheckpointManifest = {
+      format: CHECKPOINT_FORMAT,
+      version: newVersion,
+      chunkSizeBytes: this.chunkSizeBytes,
+      chunkCount,
+      byteLength: updateBytes.byteLength,
+      sha256: updateHash,
+      stateVectorByteLength: stateVectorBytes.byteLength,
+      stateVectorSha256: stateVectorHash,
+      updatedAt: now,
+    };
+
+    await this.storage.transaction(async (txn) => {
+      await putEntriesBatched(
         txn,
-        updateBytes,
-        this.chunkSizeBytes,
-        (i) => checkpointChunkKey(newVersion, i),
+        [
+          [checkpointManifestKey(newVersion), manifest],
+          [checkpointStateVectorKey(newVersion), stateVectorBytes],
+          [CHECKPOINT_POINTER_KEY, { version: newVersion } satisfies ManifestPointer],
+          [JOURNAL_META_KEY, emptyJournalMeta(now)],
+        ],
         this.maxKeysPerOperation,
       );
-
-      const entries: Array<[string, unknown]> = [];
-      const manifest: CheckpointManifest = {
-        format: CHECKPOINT_FORMAT,
-        version: newVersion,
-        chunkSizeBytes: this.chunkSizeBytes,
-        chunkCount,
-        byteLength: updateBytes.byteLength,
-        sha256: updateHash,
-        stateVectorByteLength: stateVectorBytes.byteLength,
-        stateVectorSha256: stateVectorHash,
-        updatedAt: now,
-      };
-      entries.push([checkpointManifestKey(newVersion), manifest]);
-      entries.push([checkpointStateVectorKey(newVersion), stateVectorBytes]);
-      entries.push([CHECKPOINT_POINTER_KEY, { version: newVersion } satisfies ManifestPointer]);
-      entries.push([JOURNAL_META_KEY, emptyJournalMeta(now)]);
-
-      await putEntriesBatched(txn, entries, this.maxKeysPerOperation);
-      await deleteKeysBatched(txn, Array.from(cleanupKeys), this.maxKeysPerOperation);
     });
+
+    // ── Phase 3: Non-transactional cleanup ───────────────────────────────
+    // Delete orphaned keys. Safe to fail — they're unreachable.
+    if (cleanupKeys.size > 0) {
+      await deleteKeysBatched(this.storage, Array.from(cleanupKeys), this.maxKeysPerOperation);
+    }
   }

   async loadLatest(): Promise<Uint8Array | null> {
11 changes: 9 additions & 2 deletions server/src/index.ts
@@ -18,7 +18,7 @@ import { corsPreflight, html, json, withCors } from "./routes/http";
 import { handleSnapshotRoute } from "./routes/snapshots";
 import { handleSyncSocketRoute, parseSyncPath } from "./routes/syncSocket";
 import { handleTicketRoute } from "./routes/ticket";
-import { fetchVaultDebug, fetchVaultDocument, recordVaultTrace } from "./routes/trace";
+import { fetchVaultDebug, fetchVaultDocument, recordVaultTrace, compactVault, cleanupVaultKv } from "./routes/trace";
 import type { AuthState, AuthStateCached, Env } from "./routes/types";

 const LOG_PREFIX = "[yaos-sync:worker]";
@@ -130,7 +130,10 @@ function isKnownVaultRouteShape(method: string, resource: string, rest: string[]
       return method === "POST" && rest.length === 1 && rest[0] === "ticket";

     case "debug":
-      return method === "GET" && rest.length === 1 && rest[0] === "recent";
+      if (method === "GET" && rest.length === 1 && rest[0] === "recent") return true;
+      if (method === "POST" && rest.length === 1 && rest[0] === "compact") return true;
+      if (method === "POST" && rest.length === 1 && rest[0] === "cleanup-kv") return true;
+      return false;

     case "blobs": {
       if (rest.length !== 1) return false;
@@ -404,6 +407,10 @@ const worker = {
         response = withCors(authFailure);
       } else if (resource === "debug" && req.method === "GET" && rest[0] === "recent") {
         response = withCors(await fetchVaultDebug(env, vaultId));
+      } else if (resource === "debug" && req.method === "POST" && rest[0] === "compact") {
+        response = withCors(await compactVault(env, vaultId));
+      } else if (resource === "debug" && req.method === "POST" && rest[0] === "cleanup-kv") {
+        response = withCors(await cleanupVaultKv(env, vaultId));
       } else if (resource === "auth" && rest[0] === "ticket" && req.method === "POST") {
         response = withCors(await handleTicketRoute(req, authState, vaultId, json, env));
       } else if (resource === "blobs") {