From 750b2fcb8b43d715af4b920b72df3c312f5c88c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Sun, 28 Jun 2026 19:43:22 +0200 Subject: [PATCH 1/7] feat: support resumable proxy uploads --- src/__tests__/daemon-proxy.test.ts | 120 ++++++++ src/__tests__/upload-client.test.ts | 80 ++++++ src/daemon-proxy-upload.ts | 77 +++++ src/daemon-proxy.ts | 23 +- src/daemon/http-errors.ts | 23 ++ src/daemon/http-server.ts | 82 ++---- src/daemon/resumable-upload.ts | 270 ++++++++++++++++++ src/daemon/upload-http.ts | 259 +++++++++++++++++ src/upload-client.ts | 74 ++++- src/utils/cli-command-overrides.ts | 2 +- .../daemon-http-server.test.ts | 92 +++++- website/docs/docs/remote-proxy.md | 2 +- 12 files changed, 1020 insertions(+), 84 deletions(-) create mode 100644 src/daemon-proxy-upload.ts create mode 100644 src/daemon/http-errors.ts create mode 100644 src/daemon/resumable-upload.ts create mode 100644 src/daemon/upload-http.ts diff --git a/src/__tests__/daemon-proxy.test.ts b/src/__tests__/daemon-proxy.test.ts index 85bd88df4..480874d98 100644 --- a/src/__tests__/daemon-proxy.test.ts +++ b/src/__tests__/daemon-proxy.test.ts @@ -1,5 +1,6 @@ import { test } from 'vitest'; import assert from 'node:assert/strict'; +import crypto from 'node:crypto'; import http from 'node:http'; import { createDaemonProxyServer } from '../daemon-proxy.ts'; import { DAEMON_RPC_PROTOCOL_VERSION } from '../daemon/http-health.ts'; @@ -238,3 +239,122 @@ test('daemon proxy streams uploads and artifact downloads with upstream daemon t await closeLoopbackServer(upstream); } }); + +test('daemon proxy forwards resumable upload routes and rewrites direct upload tickets', async (t) => { + if (await skipWhenLoopbackUnavailable(t)) return; + + let directAuth = ''; + let directTokenHeader = ''; + let directContentRange = ''; + let directBody = ''; + let finalizeAuth = ''; + const upstream = http.createServer((req, res) => { + if (req.method === 'GET' && req.url === '/health') { + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true })); + return; + } + if (req.method === 'POST' && req.url === '/upload/preflight') { + res.setHeader('content-type', 'application/json'); + res.end( + JSON.stringify({ + ok: true, + cacheHit: false, + uploadId: 'upload-1', + upload: { + url: 'http://127.0.0.1:65535/upload/direct/upload-1', + headers: { + authorization: 'Bearer daemon-secret', + 'x-agent-device-token': 'daemon-secret', + 'content-type': 'application/octet-stream', + }, + }, + }), + ); + return; + } + if (req.method === 'PUT' && req.url === '/upload/direct/upload-1') { + directAuth = String(req.headers.authorization ?? ''); + directTokenHeader = String(req.headers['x-agent-device-token'] ?? ''); + directContentRange = String(req.headers['content-range'] ?? ''); + req.setEncoding('utf8'); + req.on('data', (chunk) => { + directBody += chunk; + }); + req.on('end', () => { + res.statusCode = 200; + res.end('ok'); + }); + return; + } + if (req.method === 'POST' && req.url === '/upload/finalize') { + finalizeAuth = String(req.headers.authorization ?? ''); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); + return; + } + res.statusCode = 404; + res.end('not found'); + }); + const proxy = createDaemonProxyServer({ + upstreamBaseUrl: `http://127.0.0.1:${await listenOnLoopback(upstream)}`, + upstreamToken: 'daemon-secret', + clientToken: 'proxy-secret', + }); + + try { + const proxyPort = await listenOnLoopback(proxy); + const preflight = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/preflight`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ + sha256: crypto.createHash('sha256').update('resumed').digest('hex'), + fileName: 'demo.apk', + sizeBytes: 7, + artifactType: 'file', + }), + }); + assert.equal(preflight.status, 200); + const preflightBody = (await preflight.json()) as { + upload?: { url?: string; headers?: Record }; + }; + assert.match( + preflightBody.upload?.url ?? '', + new RegExp(`^http://127\\.0\\.0\\.1:${proxyPort}/agent-device/upload/direct/upload-1$`), + ); + assert.equal(preflightBody.upload?.headers?.authorization, 'Bearer proxy-secret'); + assert.equal(preflightBody.upload?.headers?.['x-agent-device-token'], 'proxy-secret'); + + const direct = await fetch(preflightBody.upload?.url ?? '', { + method: 'PUT', + headers: { + ...(preflightBody.upload?.headers ?? {}), + 'content-range': 'bytes 3-6/7', + }, + body: Buffer.from('umed'), + }); + assert.equal(direct.status, 200); + assert.equal(directAuth, 'Bearer daemon-secret'); + assert.equal(directTokenHeader, 'daemon-secret'); + assert.equal(directContentRange, 'bytes 3-6/7'); + assert.equal(directBody, 'umed'); + + const finalize = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/finalize`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ uploadId: 'upload-1' }), + }); + assert.equal(finalize.status, 200); + assert.deepEqual(await finalize.json(), { ok: true, uploadId: 'tracked-upload-1' }); + assert.equal(finalizeAuth, 'Bearer daemon-secret'); + } finally { + await closeLoopbackServer(proxy); + await closeLoopbackServer(upstream); + } +}); diff --git a/src/__tests__/upload-client.test.ts b/src/__tests__/upload-client.test.ts index f6627901a..14152d40c 100644 --- a/src/__tests__/upload-client.test.ts +++ b/src/__tests__/upload-client.test.ts @@ -607,6 +607,86 @@ test('uploadArtifact resumes a direct upload from the server-reported offset', a } }); +test('uploadArtifact re-preflights and resumes after an interrupted direct upload', async () => { + const content = 'direct-upload-interrupted-payload'; + const artifactPath = createTempFile('app.apk', content); + const resumeOffset = 8; + const requests: string[] = []; + let preflightAttempts = 0; + let uploadAttempts = 0; + let resumedUploadBody = ''; + + const server = await startServer(async (req, res) => { + requests.push(`${req.method} ${req.url}`); + if (req.method === 'POST' && req.url === '/upload/preflight') { + preflightAttempts += 1; + await readRequestBody(req); + sendJson(res, { + ok: true, + cacheHit: false, + uploadId: 'resume-after-error-ticket', + upload: { + url: `${server.baseUrl}/resumable-upload-after-error`, + headers: { + 'x-signed-ticket': 'resume-after-error-ticket-header', + }, + }, + }); + return; + } + if (req.method === 'PUT' && req.url === '/resumable-upload-after-error') { + uploadAttempts += 1; + if (uploadAttempts === 1) { + req.destroy(new Error('simulated low-connectivity interruption')); + return; + } + if (uploadAttempts === 2) { + assert.equal(req.headers['content-range'], undefined); + res.statusCode = 308; + res.setHeader('x-upload-offset', String(resumeOffset)); + res.end(); + return; + } + + assert.equal( + req.headers['content-range'], + `bytes ${resumeOffset}-${Buffer.byteLength(content) - 1}/${Buffer.byteLength(content)}`, + ); + resumedUploadBody = (await readRequestBody(req)).toString('utf8'); + res.statusCode = 200; + res.end('ok'); + return; + } + if (req.method === 'POST' && req.url === '/upload/finalize') { + sendJson(res, { ok: true, uploadId: 'upload-resumed-after-error' }); + return; + } + res.statusCode = 404; + res.end('not found'); + }); + + try { + const uploadId = await uploadArtifact({ + localPath: artifactPath, + baseUrl: server.baseUrl, + token: TEST_TOKEN, + }); + assert.equal(uploadId, 'upload-resumed-after-error'); + assert.equal(preflightAttempts, 2); + assert.equal(resumedUploadBody, content.slice(resumeOffset)); + assert.deepEqual(requests, [ + 'POST /upload/preflight', + 'PUT /resumable-upload-after-error', + 'POST /upload/preflight', + 'PUT /resumable-upload-after-error', + 'PUT /resumable-upload-after-error', + 'POST /upload/finalize', + ]); + } finally { + await server.close(); + } +}); + test('uploadArtifact preflights and legacy-uploads compressed app bundle directories', async () => { const tempRoot = createTempDir(); const appPath = path.join(tempRoot, 'Sample.app'); diff --git a/src/daemon-proxy-upload.ts b/src/daemon-proxy-upload.ts new file mode 100644 index 000000000..401db9cbe --- /dev/null +++ b/src/daemon-proxy-upload.ts @@ -0,0 +1,77 @@ +import type { IncomingMessage } from 'node:http'; +import { buildDaemonHttpAuthHeaders } from './daemon/http-contract.ts'; + +export function isSupportedProxyUploadRoute(route: string, method: string | undefined): boolean { + if (route === '/upload') return method === 'POST'; + if (route === '/upload/preflight') return method === 'POST'; + if (route === '/upload/finalize') return method === 'POST'; + if (route.startsWith('/upload/direct/')) return method === 'PUT'; + return false; +} + +export function shouldRewriteUploadProxyResponse(route: string): boolean { + return route === '/upload/preflight'; +} + +export function rewriteUploadPreflightResponse( + body: string, + req: IncomingMessage, + clientToken: string, +): string { + let parsed: unknown; + try { + parsed = JSON.parse(body) as unknown; + } catch { + return body; + } + + if (!parsed || typeof parsed !== 'object') return body; + const record = parsed as { upload?: { url?: unknown; headers?: unknown } }; + if (!record.upload || typeof record.upload.url !== 'string') { + return body; + } + + const rewrittenUrl = rewriteUploadDirectUrl(record.upload.url, req); + if (!rewrittenUrl) return body; + + const headers = + record.upload.headers && typeof record.upload.headers === 'object' + ? { ...(record.upload.headers as Record) } + : {}; + Object.assign(headers, buildDaemonHttpAuthHeaders(clientToken)); + + return JSON.stringify({ + ...(parsed as Record), + upload: { + ...record.upload, + url: rewrittenUrl, + headers, + }, + }); +} + +function rewriteUploadDirectUrl(upstreamUrl: string, req: IncomingMessage): string | null { + let parsed: URL; + try { + parsed = new URL(upstreamUrl); + } catch { + return null; + } + + if (!parsed.pathname.startsWith('/upload/')) { + return null; + } + + const host = typeof req.headers.host === 'string' ? req.headers.host : ''; + if (!host) return null; + + const requestPath = new URL(req.url ?? '/', 'http://127.0.0.1').pathname; + const uploadIndex = requestPath.lastIndexOf('/upload/preflight'); + const uploadPrefix = uploadIndex >= 0 ? requestPath.slice(0, uploadIndex) : ''; + const forwardedProto = req.headers['x-forwarded-proto']; + const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; + const rewritten = new URL(`${proto || 'http'}://${host}`); + rewritten.pathname = `${uploadPrefix}${parsed.pathname}`; + rewritten.search = parsed.search; + return rewritten.toString(); +} diff --git a/src/daemon-proxy.ts b/src/daemon-proxy.ts index f1b419d6c..eb271f32c 100644 --- a/src/daemon-proxy.ts +++ b/src/daemon-proxy.ts @@ -10,6 +10,11 @@ import { buildDaemonHttpUrl, } from './daemon/http-contract.ts'; import { buildDaemonHealthPayload } from './daemon/http-health.ts'; +import { + isSupportedProxyUploadRoute, + rewriteUploadPreflightResponse, + shouldRewriteUploadProxyResponse, +} from './daemon-proxy-upload.ts'; export type DaemonProxyOptions = { upstreamBaseUrl: string; @@ -23,7 +28,14 @@ export type DaemonProxyOptions = { const DEFAULT_MAX_RPC_BODY_BYTES = 1024 * 1024; const DEFAULT_UPSTREAM_TIMEOUT_MS = 5 * 60 * 1000; const DAEMON_PROXY_PREFIX = `${DAEMON_HTTP_BASE_PATH}/`; -const FORWARDED_REQUEST_HEADERS = ['content-type', 'x-artifact-type', 'x-artifact-filename']; +const FORWARDED_REQUEST_HEADERS = [ + 'content-type', + 'content-range', + 'x-artifact-type', + 'x-artifact-filename', + 'x-artifact-hash', + 'x-artifact-hash-algorithm', +]; const FORWARDED_RESPONSE_HEADERS = ['content-type', 'content-disposition', 'x-request-id']; export function createDaemonProxyServer(options: DaemonProxyOptions): http.Server { @@ -115,6 +127,13 @@ async function forwardProxyRequest(params: { res.setHeader('x-request-id', resolveRequestId(req)); } + if (shouldRewriteUploadProxyResponse(route)) { + const text = await response.text(); + res.setHeader('content-type', response.headers.get('content-type') ?? 'application/json'); + res.end(rewriteUploadPreflightResponse(text, req, options.clientToken)); + return; + } + if (!response.body) { res.end(); return; @@ -167,7 +186,7 @@ function resolveProxyRoute(requestUrl: string): string { function isSupportedDaemonRoute(route: string, method: string | undefined): boolean { if (route === '/rpc') return method === 'POST'; - if (route === '/upload') return method === 'POST'; + if (isSupportedProxyUploadRoute(route, method)) return true; if (route.startsWith('/artifacts/')) return method === 'GET'; return false; } diff --git a/src/daemon/http-errors.ts b/src/daemon/http-errors.ts new file mode 100644 index 000000000..ace94f7fa --- /dev/null +++ b/src/daemon/http-errors.ts @@ -0,0 +1,23 @@ +import type http from 'node:http'; +import { normalizeError } from '../utils/errors.ts'; + +export type NormalizedHttpError = ReturnType; + +export function statusCodeForNormalizedError(code: string): number { + switch (code) { + case 'INVALID_ARGS': + return 400; + case 'UNAUTHORIZED': + return 401; + case 'SESSION_NOT_FOUND': + return 404; + default: + return 500; + } +} + +export function sendRestJsonError(res: http.ServerResponse, normalized: NormalizedHttpError): void { + res.statusCode = statusCodeForNormalizedError(normalized.code); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: false, error: normalized.message, code: normalized.code })); +} diff --git a/src/daemon/http-server.ts b/src/daemon/http-server.ts index 0fb3543fb..fabad4a97 100644 --- a/src/daemon/http-server.ts +++ b/src/daemon/http-server.ts @@ -22,12 +22,7 @@ import { import path from 'node:path'; import { pathToFileURL } from 'node:url'; import { sleep } from '../utils/timeouts.ts'; -import { - cleanupDownloadableArtifact, - prepareDownloadableArtifact, - trackUploadedArtifact, -} from './artifact-tracking.ts'; -import { receiveUpload } from './upload.ts'; +import { cleanupDownloadableArtifact, prepareDownloadableArtifact } from './artifact-tracking.ts'; import { type RequestProgressEvent, withRequestProgressSink } from './request-progress.ts'; import { serializeDaemonProgressEnvelope, @@ -35,6 +30,8 @@ import { shouldStreamRequestProgress, } from './request-progress-protocol.ts'; import { buildDaemonHealthPayload } from './http-health.ts'; +import { sendRestJsonError, statusCodeForNormalizedError } from './http-errors.ts'; +import { handleUploadHttpRoute, isUploadHttpRoute } from './upload-http.ts'; type JsonRpcRequest = JsonRpcRequestEnvelope; @@ -145,19 +142,6 @@ function writeRpcResponseEnvelope( res.end(); } -function statusCodeForNormalizedError(code: string): number { - switch (code) { - case 'INVALID_ARGS': - return 400; - case 'UNAUTHORIZED': - return 401; - case 'SESSION_NOT_FOUND': - return 404; - default: - return 500; - } -} - // Map a thrown boundary error to its JSON-RPC error code. Invalid params (malformed // wire input rejected before the request reaches the handler) is JSON-RPC -32602, to // match the explicit `Invalid params` sibling checks below; everything else is the @@ -541,8 +525,20 @@ export async function createDaemonHttpServer(options: { return; } - if (req.method === 'POST' && req.url === '/upload') { - handleUpload(req, res, authHook, token); + if (isUploadHttpRoute(req)) { + void handleUploadHttpRoute({ + req, + res, + token: resolveToken({}, req.headers), + authorize: async (request) => + await authorizeAuxiliaryHttpRequest({ + req: request.req, + res: request.res, + authHook, + expectedToken: token, + daemonRequest: request.daemonRequest, + }), + }); return; } @@ -735,41 +731,6 @@ export async function createDaemonHttpServer(options: { }); } -async function handleUpload( - req: http.IncomingMessage, - res: http.ServerResponse, - authHook: HttpAuthHook | null, - expectedToken?: string, -): Promise { - try { - const auth = await authorizeAuxiliaryHttpRequest({ - req, - res, - authHook, - expectedToken, - daemonRequest: { - command: 'upload', - positionals: [], - }, - }); - if (!auth) return; - - const result = await receiveUpload(req); - const uploadId = trackUploadedArtifact({ - artifactPath: result.artifactPath, - tempDir: result.tempDir, - tenantId: auth.tenantId, - }); - - res.statusCode = 200; - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true, uploadId })); - } catch (error) { - const normalized = normalizeError(error); - sendRestJsonError(res, normalized); - } -} - async function handleArtifactDownload( req: http.IncomingMessage, res: http.ServerResponse, @@ -902,15 +863,6 @@ async function authorizeAuxiliaryHttpRequest(params: { return { tenantId: authResult.tenantId }; } -function sendRestJsonError( - res: http.ServerResponse, - normalized: ReturnType, -): void { - res.statusCode = statusCodeForNormalizedError(normalized.code); - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: false, error: normalized.message, code: normalized.code })); -} - function enforceDaemonToken( requestToken: string, expectedToken: string | undefined, diff --git a/src/daemon/resumable-upload.ts b/src/daemon/resumable-upload.ts new file mode 100644 index 000000000..a3e3a555b --- /dev/null +++ b/src/daemon/resumable-upload.ts @@ -0,0 +1,270 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import path from 'node:path'; +import type { IncomingMessage } from 'node:http'; +import { pipeline } from 'node:stream/promises'; +import { AppError } from '../utils/errors.ts'; +import { extractTarInstallableArtifact } from './artifact-archive.ts'; +import { + createArtifactTempDir, + sanitizeArtifactFilename, + validateArtifactContentLength, +} from './artifact-download.ts'; + +const RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS = 5 * 60 * 1000; +const RESUMABLE_UPLOAD_HASH_ALGORITHM = 'sha256'; +const RESUMABLE_UPLOADS_BY_ID = new Map(); +const RESUMABLE_UPLOADS_BY_KEY = new Map(); + +type UploadArtifactType = 'file' | 'app-bundle'; + +export type BeginResumableUploadOptions = { + baseUrl: string; + tokenHeaders: Record; + sha256: string; + fileName: string; + sizeBytes: number; + artifactType: UploadArtifactType; + platform?: string; + contentType?: string; + tenantId?: string; +}; + +type ResumableUploadEntry = { + id: string; + key: string; + tempDir: string; + payloadPath: string; + fileName: string; + sizeBytes: number; + sha256: string; + artifactType: UploadArtifactType; + platform?: string; + tenantId?: string; + timer: ReturnType; +}; + +export function beginResumableUpload(options: BeginResumableUploadOptions): { + uploadId: string; + cacheHit: false; + upload: { + url: string; + headers: Record; + }; +} { + validateResumableUploadOptions(options); + const key = buildResumableUploadKey(options); + const existingId = RESUMABLE_UPLOADS_BY_KEY.get(key); + const existing = existingId ? RESUMABLE_UPLOADS_BY_ID.get(existingId) : undefined; + const entry = existing ?? createResumableUploadEntry(options, key); + refreshResumableUploadTimer(entry); + + return { + uploadId: entry.id, + cacheHit: false, + upload: { + url: new URL(`upload/direct/${entry.id}`, ensureTrailingSlash(options.baseUrl)).toString(), + headers: { + ...options.tokenHeaders, + 'content-type': options.contentType || 'application/octet-stream', + }, + }, + }; +} + +export async function receiveResumableUploadChunk(params: { + uploadId: string; + req: IncomingMessage; + tenantId?: string; +}): Promise<{ complete: boolean; offset: number }> { + const entry = requireResumableUpload(params.uploadId, params.tenantId); + const currentOffset = currentResumableUploadOffset(entry); + const contentRange = parseContentRange(params.req.headers['content-range'], entry.sizeBytes); + if (contentRange && contentRange.start !== currentOffset) { + return { complete: false, offset: currentOffset }; + } + if (!contentRange && currentOffset > 0) { + return { complete: false, offset: currentOffset }; + } + + validateArtifactContentLength(params.req.headers['content-length']); + await pipeline(params.req, fs.createWriteStream(entry.payloadPath, { flags: 'a' })); + refreshResumableUploadTimer(entry); + + const offset = currentResumableUploadOffset(entry); + return { complete: offset >= entry.sizeBytes, offset }; +} + +export async function finalizeResumableUpload( + uploadId: string, + tenantId?: string, +): Promise<{ artifactPath: string; tempDir: string }> { + const entry = requireResumableUpload(uploadId, tenantId); + const offset = currentResumableUploadOffset(entry); + if (offset !== entry.sizeBytes) { + throw new AppError('INVALID_ARGS', 'Upload is incomplete', { + uploadId, + offset, + sizeBytes: entry.sizeBytes, + }); + } + const actualHash = await computeFileHash(entry.payloadPath); + if (actualHash !== entry.sha256) { + cleanupResumableUpload(entry.id); + throw new AppError('INVALID_ARGS', 'Upload hash mismatch', { + uploadId, + expectedSha256: entry.sha256, + actualSha256: actualHash, + }); + } + + RESUMABLE_UPLOADS_BY_ID.delete(entry.id); + RESUMABLE_UPLOADS_BY_KEY.delete(entry.key); + clearTimeout(entry.timer); + + if (entry.artifactType === 'file') { + const artifactPath = path.join(entry.tempDir, entry.fileName); + fs.renameSync(entry.payloadPath, artifactPath); + return { artifactPath, tempDir: entry.tempDir }; + } + + const artifactPath = await extractTarInstallableArtifact({ + archivePath: entry.payloadPath, + tempDir: entry.tempDir, + platform: entry.platform === 'android' ? 'android' : 'ios', + expectedRootName: entry.fileName, + }); + fs.rmSync(entry.payloadPath, { force: true }); + return { artifactPath, tempDir: entry.tempDir }; +} + +function validateResumableUploadOptions(options: BeginResumableUploadOptions): void { + if (!/^[a-f0-9]{64}$/i.test(options.sha256)) { + throw new AppError('INVALID_ARGS', 'Invalid upload sha256'); + } + if (!Number.isSafeInteger(options.sizeBytes) || options.sizeBytes < 0) { + throw new AppError('INVALID_ARGS', 'Invalid upload sizeBytes'); + } + validateArtifactContentLength(String(options.sizeBytes)); + sanitizeArtifactFilename(options.fileName); + if (options.artifactType !== 'file' && options.artifactType !== 'app-bundle') { + throw new AppError( + 'INVALID_ARGS', + `Invalid artifactType: ${options.artifactType}. Must be "file" or "app-bundle".`, + ); + } +} + +function createResumableUploadEntry( + options: BeginResumableUploadOptions, + key: string, +): ResumableUploadEntry { + const id = crypto.randomUUID(); + const tempDir = createArtifactTempDir('upload'); + const entry: ResumableUploadEntry = { + id, + key, + tempDir, + payloadPath: path.join(tempDir, 'payload'), + fileName: sanitizeArtifactFilename(options.fileName), + sizeBytes: options.sizeBytes, + sha256: options.sha256.toLowerCase(), + artifactType: options.artifactType, + platform: options.platform, + tenantId: options.tenantId, + timer: setTimeout(() => cleanupResumableUpload(id), RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS), + }; + entry.timer.unref(); + RESUMABLE_UPLOADS_BY_ID.set(id, entry); + RESUMABLE_UPLOADS_BY_KEY.set(key, id); + return entry; +} + +function refreshResumableUploadTimer(entry: ResumableUploadEntry): void { + clearTimeout(entry.timer); + entry.timer = setTimeout( + () => cleanupResumableUpload(entry.id), + RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS, + ); + entry.timer.unref(); +} + +function cleanupResumableUpload(uploadId: string): void { + const entry = RESUMABLE_UPLOADS_BY_ID.get(uploadId); + if (!entry) return; + clearTimeout(entry.timer); + RESUMABLE_UPLOADS_BY_ID.delete(uploadId); + RESUMABLE_UPLOADS_BY_KEY.delete(entry.key); + fs.rmSync(entry.tempDir, { recursive: true, force: true }); +} + +function requireResumableUpload( + uploadId: string, + tenantId: string | undefined, +): ResumableUploadEntry { + const entry = RESUMABLE_UPLOADS_BY_ID.get(uploadId); + if (!entry) { + throw new AppError('INVALID_ARGS', `Upload not found: ${uploadId}`); + } + if (entry.tenantId && entry.tenantId !== tenantId) { + throw new AppError('UNAUTHORIZED', 'Upload belongs to a different tenant'); + } + return entry; +} + +function currentResumableUploadOffset(entry: ResumableUploadEntry): number { + if (!fs.existsSync(entry.payloadPath)) return 0; + return Math.min(fs.statSync(entry.payloadPath).size, entry.sizeBytes); +} + +function parseContentRange( + value: string | string[] | undefined, + sizeBytes: number, +): { start: number; end: number } | undefined { + const raw = Array.isArray(value) ? value[0] : value; + if (!raw) return undefined; + const match = raw.match(/^bytes (\d+)-(\d+)\/(\d+)$/); + if (!match) { + throw new AppError('INVALID_ARGS', 'Invalid content-range header'); + } + const start = Number(match[1]); + const end = Number(match[2]); + const size = Number(match[3]); + if ( + !Number.isSafeInteger(start) || + !Number.isSafeInteger(end) || + !Number.isSafeInteger(size) || + start < 0 || + end < start || + size !== sizeBytes + ) { + throw new AppError('INVALID_ARGS', 'Invalid content-range header'); + } + return { start, end }; +} + +function buildResumableUploadKey(options: BeginResumableUploadOptions): string { + return [ + options.tenantId ?? '', + options.sha256.toLowerCase(), + String(options.sizeBytes), + sanitizeArtifactFilename(options.fileName), + options.artifactType, + options.platform ?? '', + ].join('\0'); +} + +function ensureTrailingSlash(value: string): string { + return value.endsWith('/') ? value : `${value}/`; +} + +async function computeFileHash(filePath: string): Promise { + const hash = crypto.createHash(RESUMABLE_UPLOAD_HASH_ALGORITHM); + await pipeline(fs.createReadStream(filePath), async function* (source) { + for await (const chunk of source) { + hash.update(chunk); + yield chunk; + } + }); + return hash.digest('hex'); +} diff --git a/src/daemon/upload-http.ts b/src/daemon/upload-http.ts new file mode 100644 index 000000000..fc6ce2ea1 --- /dev/null +++ b/src/daemon/upload-http.ts @@ -0,0 +1,259 @@ +import http from 'node:http'; +import { AppError, normalizeError } from '../utils/errors.ts'; +import type { DaemonRequest } from './types.ts'; +import { trackUploadedArtifact } from './artifact-tracking.ts'; +import { + beginResumableUpload, + finalizeResumableUpload, + receiveResumableUploadChunk, +} from './resumable-upload.ts'; +import { receiveUpload } from './upload.ts'; +import { sendRestJsonError } from './http-errors.ts'; + +type UploadHttpRoute = 'upload' | 'preflight' | 'direct' | 'finalize'; + +type AuxiliaryHttpAuthorizer = (params: { + req: http.IncomingMessage; + res: http.ServerResponse; + daemonRequest: Pick; +}) => Promise<{ tenantId?: string } | null>; + +export async function handleUploadHttpRoute(params: { + req: http.IncomingMessage; + res: http.ServerResponse; + authorize: AuxiliaryHttpAuthorizer; + token: string; +}): Promise { + const { req, res, authorize, token } = params; + switch (resolveUploadHttpRoute(req)) { + case 'preflight': + await handleUploadPreflight(req, res, authorize, token); + return true; + case 'direct': + await handleResumableUpload(req, res, authorize); + return true; + case 'finalize': + await handleUploadFinalize(req, res, authorize); + return true; + case 'upload': + await handleUpload(req, res, authorize); + return true; + default: + return false; + } +} + +export function isUploadHttpRoute(req: http.IncomingMessage): boolean { + return resolveUploadHttpRoute(req) !== null; +} + +function resolveUploadHttpRoute(req: http.IncomingMessage): UploadHttpRoute | null { + if (req.method === 'POST' && req.url === '/upload/preflight') return 'preflight'; + if (req.method === 'PUT' && req.url?.startsWith('/upload/direct/')) return 'direct'; + if (req.method === 'POST' && req.url === '/upload/finalize') return 'finalize'; + if (req.method === 'POST' && req.url === '/upload') return 'upload'; + return null; +} + +async function handleUpload( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: [], + }, + }); + if (!auth) return; + + const result = await receiveUpload(req); + const uploadId = trackUploadedArtifact({ + artifactPath: result.artifactPath, + tempDir: result.tempDir, + tenantId: auth.tenantId, + }); + + sendJson(res, { ok: true, uploadId }); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleUploadPreflight( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, + token: string, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['preflight'], + }, + }); + if (!auth) return; + + const body = await readRestJsonBody(req, 64 * 1024); + const upload = beginResumableUpload({ + baseUrl: resolveHttpRequestBaseUrl(req), + tokenHeaders: buildUploadTicketAuthHeaders(token), + sha256: readRequiredText(body, 'sha256'), + fileName: readRequiredText(body, 'fileName'), + sizeBytes: readRequiredInteger(body, 'sizeBytes'), + artifactType: readRequiredArtifactType(body), + platform: readOptionalText(body, 'platform'), + contentType: readOptionalText(body, 'contentType'), + tenantId: auth.tenantId, + }); + + sendJson(res, { ok: true, ...upload }); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleResumableUpload( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + const uploadId = req.url?.slice('/upload/direct/'.length).split('?')[0] ?? ''; + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['direct', uploadId], + }, + }); + if (!auth) return; + + const result = await receiveResumableUploadChunk({ uploadId, req, tenantId: auth.tenantId }); + if (result.complete) { + res.statusCode = 200; + res.end('ok'); + return; + } + + res.statusCode = 308; + if (result.offset > 0) { + res.setHeader('range', `bytes=0-${result.offset - 1}`); + } + res.setHeader('x-upload-offset', String(result.offset)); + res.end(); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleUploadFinalize( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['finalize'], + }, + }); + if (!auth) return; + + const body = await readRestJsonBody(req, 64 * 1024); + const result = await finalizeResumableUpload(readRequiredText(body, 'uploadId'), auth.tenantId); + const uploadId = trackUploadedArtifact({ + artifactPath: result.artifactPath, + tempDir: result.tempDir, + tenantId: auth.tenantId, + }); + + sendJson(res, { ok: true, uploadId }); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +function sendJson(res: http.ServerResponse, body: Record): void { + res.statusCode = 200; + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify(body)); +} + +async function readRestJsonBody( + req: http.IncomingMessage, + maxBodyBytes: number, +): Promise> { + const chunks: Buffer[] = []; + let bodyBytes = 0; + for await (const chunk of req) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + bodyBytes += buffer.length; + if (bodyBytes > maxBodyBytes) { + throw new AppError('INVALID_ARGS', 'Request body is too large.'); + } + chunks.push(buffer); + } + + const raw = Buffer.concat(chunks).toString('utf8'); + try { + const parsed = JSON.parse(raw) as unknown; + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + throw new Error('expected object'); + } + return parsed as Record; + } catch (error) { + throw new AppError('INVALID_ARGS', 'Invalid JSON request body', {}, error); + } +} + +function readRequiredText(record: Record, key: string): string { + const value = readOptionalText(record, key)?.trim(); + if (!value) throw new AppError('INVALID_ARGS', `${key} is required`); + return value; +} + +function readOptionalText(record: Record, key: string): string | undefined { + const value = record[key]; + return typeof value === 'string' ? value : undefined; +} + +function readRequiredInteger(record: Record, key: string): number { + const value = record[key]; + if (typeof value !== 'number' || !Number.isSafeInteger(value)) { + throw new AppError('INVALID_ARGS', `${key} must be an integer`); + } + return value; +} + +function readRequiredArtifactType(record: Record): 'file' | 'app-bundle' { + const value = readRequiredText(record, 'artifactType'); + if (value === 'file' || value === 'app-bundle') return value; + throw new AppError('INVALID_ARGS', 'artifactType must be "file" or "app-bundle"'); +} + +function buildUploadTicketAuthHeaders(token: string): Record { + return { + authorization: `Bearer ${token}`, + 'x-agent-device-token': token, + }; +} + +function resolveHttpRequestBaseUrl(req: http.IncomingMessage): string { + const host = typeof req.headers.host === 'string' ? req.headers.host : ''; + if (!host) throw new AppError('INVALID_ARGS', 'Missing host header'); + const forwardedProto = req.headers['x-forwarded-proto']; + const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; + return `${proto || 'http'}://${host}`; +} diff --git a/src/upload-client.ts b/src/upload-client.ts index d5905b5d4..a144def6b 100644 --- a/src/upload-client.ts +++ b/src/upload-client.ts @@ -77,20 +77,18 @@ export async function uploadArtifact(options: UploadArtifactOptions): Promise; +}): Promise { + const uploadOnce = async ( + preflight: Extract, + ): Promise => { + await uploadDirectArtifact(options.artifact.payloadPath, preflight); + return await finalizeDirectUpload({ + normalizedBase: options.normalizedBase, + token: options.token, + uploadId: preflight.uploadId, + }); + }; + + try { + return await uploadOnce(options.preflight); + } catch (error) { + if (!shouldRetryDirectUpload(error)) return undefined; + const retryPreflight = await requestUploadPreflight({ + normalizedBase: options.normalizedBase, + token: options.token, + artifact: options.artifact, + }); + if (retryPreflight?.kind === 'cache-hit') { + return retryPreflight.uploadId; + } + if (retryPreflight?.kind === 'direct-upload') { + try { + return await uploadOnce(retryPreflight); + } catch { + return undefined; + } + } + return undefined; + } +} + +function shouldRetryDirectUpload(error: unknown): boolean { + if (!(error instanceof AppError)) return true; + return ( + error.message === 'Failed to upload artifact with direct upload ticket' || + error.message === 'Direct artifact upload timed out' + ); +} + async function prepareUploadArtifact( localPath: string, requestedPlatform: string | undefined, diff --git a/src/utils/cli-command-overrides.ts b/src/utils/cli-command-overrides.ts index 4db492e5c..2c669ec7a 100644 --- a/src/utils/cli-command-overrides.ts +++ b/src/utils/cli-command-overrides.ts @@ -83,7 +83,7 @@ const SCHEMA_ONLY_CLI_COMMAND_SCHEMAS = { Run this on the host that has access to simulators/devices, expose the printed local proxy URL through a tunnel, then point another machine at the tunnel URL with connect proxy. -The proxy starts or reuses a local HTTP daemon, accepts /health, /rpc, /upload, and /artifacts/*, and also accepts the same routes under /agent-device/*. Health is unauthenticated for reachability probes. Other routes require the generated bearer token printed at startup, or the explicit --daemon-auth-token value when provided. The proxy rewrites authorized client requests to the upstream daemon token instead of exposing the local daemon token. +The proxy starts or reuses a local HTTP daemon, accepts /health, /rpc, /upload and resumable /upload/* routes, and /artifacts/*, and also accepts the same routes under /agent-device/*. Health is unauthenticated for reachability probes. Other routes require the generated bearer token printed at startup, or the explicit --daemon-auth-token value when provided. The proxy rewrites authorized client requests to the upstream daemon token instead of exposing the local daemon token. Use the /agent-device base path when connecting through cloudflared, ngrok, or another shared origin. Treat the bearer token as a secret; anyone with it can control the proxied daemon. This direct proxy flow does not use agent-device auth. diff --git a/test/integration/provider-scenarios/daemon-http-server.test.ts b/test/integration/provider-scenarios/daemon-http-server.test.ts index 881a52d66..b39411e1b 100644 --- a/test/integration/provider-scenarios/daemon-http-server.test.ts +++ b/test/integration/provider-scenarios/daemon-http-server.test.ts @@ -1,11 +1,16 @@ import assert from 'node:assert/strict'; +import crypto from 'node:crypto'; import fs from 'node:fs'; import http from 'node:http'; import os from 'node:os'; import path from 'node:path'; import { test } from 'vitest'; import { AppError } from '../../../src/utils/errors.ts'; -import { trackDownloadableArtifact } from '../../../src/daemon/artifact-tracking.ts'; +import { + cleanupUploadedArtifact, + prepareUploadedArtifact, + trackDownloadableArtifact, +} from '../../../src/daemon/artifact-tracking.ts'; import { DAEMON_RPC_PROTOCOL_VERSION } from '../../../src/daemon/http-health.ts'; import { createDaemonHttpServer } from '../../../src/daemon/http-server.ts'; import { emitRequestProgress } from '../../../src/daemon/request-progress.ts'; @@ -519,6 +524,91 @@ test('Provider-backed integration daemon HTTP server accepts uploads and streams } }); +test('Provider-backed integration daemon HTTP server resumes direct uploads before finalize', async (t) => { + if (await skipWhenLoopbackUnavailable(t, 'daemon HTTP integration coverage')) { + return; + } + + const content = Buffer.from('fake-apk-resumable-content'); + const server = await createDaemonHttpServer({ + token: 'provider-scenario-token', + handleRequest: async (): Promise => ({ ok: true, data: {} }), + }); + let trackedUploadId = ''; + + try { + const port = await listenOnLoopback(server); + const preflight = await fetch(`http://127.0.0.1:${port}/upload/preflight`, { + method: 'POST', + headers: { + authorization: 'Bearer provider-scenario-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ + sha256: crypto.createHash('sha256').update(content).digest('hex'), + fileName: 'demo.apk', + sizeBytes: content.length, + artifactType: 'file', + platform: 'android', + contentType: 'application/octet-stream', + }), + }); + assert.equal(preflight.status, 200); + const preflightBody = (await preflight.json()) as { + uploadId?: string; + upload?: { url?: string; headers?: Record }; + }; + assert.equal(typeof preflightBody.uploadId, 'string'); + assert.equal(typeof preflightBody.upload?.url, 'string'); + if (!preflightBody.upload?.url) throw new Error('missing upload url'); + const uploadUrl = preflightBody.upload.url; + const uploadHeaders = preflightBody.upload.headers ?? {}; + + const firstChunk = await fetch(uploadUrl, { + method: 'PUT', + headers: uploadHeaders, + body: content.subarray(0, 9), + }); + assert.equal(firstChunk.status, 308); + assert.equal(firstChunk.headers.get('x-upload-offset'), '9'); + + const restartFromZero = await fetch(uploadUrl, { + method: 'PUT', + headers: uploadHeaders, + body: content, + }); + assert.equal(restartFromZero.status, 308); + assert.equal(restartFromZero.headers.get('x-upload-offset'), '9'); + + const resumed = await fetch(uploadUrl, { + method: 'PUT', + headers: { + ...uploadHeaders, + 'content-range': `bytes 9-${content.length - 1}/${content.length}`, + }, + body: content.subarray(9), + }); + assert.equal(resumed.status, 200); + + const finalize = await fetch(`http://127.0.0.1:${port}/upload/finalize`, { + method: 'POST', + headers: { + authorization: 'Bearer provider-scenario-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ uploadId: preflightBody.uploadId }), + }); + assert.equal(finalize.status, 200); + const finalizeBody = (await finalize.json()) as { uploadId?: string }; + trackedUploadId = finalizeBody.uploadId ?? ''; + assert.equal(typeof trackedUploadId, 'string'); + assert.deepEqual(fs.readFileSync(prepareUploadedArtifact(trackedUploadId)), content); + } finally { + if (trackedUploadId) cleanupUploadedArtifact(trackedUploadId); + await closeLoopbackServer(server); + } +}); + test('Provider-backed integration daemon HTTP auth hook can scope tenants and reject requests', async (t) => { if (await skipWhenLoopbackUnavailable(t, 'daemon HTTP integration coverage')) { return; diff --git a/website/docs/docs/remote-proxy.md b/website/docs/docs/remote-proxy.md index 11dfa4aa4..cdc222ed0 100644 --- a/website/docs/docs/remote-proxy.md +++ b/website/docs/docs/remote-proxy.md @@ -52,7 +52,7 @@ Do not commit a config file that contains a live `daemonAuthToken`. ## What Is Exposed -The proxy allows only the daemon HTTP contract: `/health`, `/rpc`, `/upload`, and `/artifacts/*`, with the same routes also available under `/agent-device/*`. Health checks are unauthenticated; command, upload, and artifact routes require the bearer token. +The proxy allows only the daemon HTTP contract: `/health`, `/rpc`, `/upload` plus resumable `/upload/*` routes, and `/artifacts/*`, with the same routes also available under `/agent-device/*`. Health checks are unauthenticated; command, upload, and artifact routes require the bearer token. The proxy validates the client token and rewrites authorized upstream requests to the local daemon token. The local daemon still validates its own token, so the daemon token is not exposed to remote clients. From f9a8777d31af5e278b2fa19dfea49e6d78d86419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Sun, 28 Jun 2026 21:25:42 +0200 Subject: [PATCH 2/7] refactor: reduce upload proxy fallow findings --- src/__tests__/daemon-proxy.test.ts | 262 ++++++++++++++++++----------- src/daemon-proxy.ts | 68 +++++--- src/daemon/resumable-upload.ts | 43 +++-- src/daemon/upload-http.ts | 51 +++--- src/utils/node-http.ts | 19 +++ 5 files changed, 276 insertions(+), 167 deletions(-) diff --git a/src/__tests__/daemon-proxy.test.ts b/src/__tests__/daemon-proxy.test.ts index 480874d98..ec9db872d 100644 --- a/src/__tests__/daemon-proxy.test.ts +++ b/src/__tests__/daemon-proxy.test.ts @@ -243,59 +243,8 @@ test('daemon proxy streams uploads and artifact downloads with upstream daemon t test('daemon proxy forwards resumable upload routes and rewrites direct upload tickets', async (t) => { if (await skipWhenLoopbackUnavailable(t)) return; - let directAuth = ''; - let directTokenHeader = ''; - let directContentRange = ''; - let directBody = ''; - let finalizeAuth = ''; - const upstream = http.createServer((req, res) => { - if (req.method === 'GET' && req.url === '/health') { - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true })); - return; - } - if (req.method === 'POST' && req.url === '/upload/preflight') { - res.setHeader('content-type', 'application/json'); - res.end( - JSON.stringify({ - ok: true, - cacheHit: false, - uploadId: 'upload-1', - upload: { - url: 'http://127.0.0.1:65535/upload/direct/upload-1', - headers: { - authorization: 'Bearer daemon-secret', - 'x-agent-device-token': 'daemon-secret', - 'content-type': 'application/octet-stream', - }, - }, - }), - ); - return; - } - if (req.method === 'PUT' && req.url === '/upload/direct/upload-1') { - directAuth = String(req.headers.authorization ?? ''); - directTokenHeader = String(req.headers['x-agent-device-token'] ?? ''); - directContentRange = String(req.headers['content-range'] ?? ''); - req.setEncoding('utf8'); - req.on('data', (chunk) => { - directBody += chunk; - }); - req.on('end', () => { - res.statusCode = 200; - res.end('ok'); - }); - return; - } - if (req.method === 'POST' && req.url === '/upload/finalize') { - finalizeAuth = String(req.headers.authorization ?? ''); - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); - return; - } - res.statusCode = 404; - res.end('not found'); - }); + const capture: ResumableUploadProxyCapture = {}; + const upstream = createResumableUploadProxyUpstream(capture); const proxy = createDaemonProxyServer({ upstreamBaseUrl: `http://127.0.0.1:${await listenOnLoopback(upstream)}`, upstreamToken: 'daemon-secret', @@ -304,57 +253,166 @@ test('daemon proxy forwards resumable upload routes and rewrites direct upload t try { const proxyPort = await listenOnLoopback(proxy); - const preflight = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/preflight`, { - method: 'POST', - headers: { - authorization: 'Bearer proxy-secret', - 'content-type': 'application/json', - }, - body: JSON.stringify({ - sha256: crypto.createHash('sha256').update('resumed').digest('hex'), - fileName: 'demo.apk', - sizeBytes: 7, - artifactType: 'file', - }), - }); - assert.equal(preflight.status, 200); - const preflightBody = (await preflight.json()) as { - upload?: { url?: string; headers?: Record }; - }; - assert.match( - preflightBody.upload?.url ?? '', - new RegExp(`^http://127\\.0\\.0\\.1:${proxyPort}/agent-device/upload/direct/upload-1$`), - ); - assert.equal(preflightBody.upload?.headers?.authorization, 'Bearer proxy-secret'); - assert.equal(preflightBody.upload?.headers?.['x-agent-device-token'], 'proxy-secret'); - - const direct = await fetch(preflightBody.upload?.url ?? '', { - method: 'PUT', - headers: { - ...(preflightBody.upload?.headers ?? {}), - 'content-range': 'bytes 3-6/7', - }, - body: Buffer.from('umed'), - }); - assert.equal(direct.status, 200); - assert.equal(directAuth, 'Bearer daemon-secret'); - assert.equal(directTokenHeader, 'daemon-secret'); - assert.equal(directContentRange, 'bytes 3-6/7'); - assert.equal(directBody, 'umed'); - - const finalize = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/finalize`, { - method: 'POST', - headers: { - authorization: 'Bearer proxy-secret', - 'content-type': 'application/json', - }, - body: JSON.stringify({ uploadId: 'upload-1' }), - }); - assert.equal(finalize.status, 200); - assert.deepEqual(await finalize.json(), { ok: true, uploadId: 'tracked-upload-1' }); - assert.equal(finalizeAuth, 'Bearer daemon-secret'); + const ticket = await requestRewrittenUploadTicket(proxyPort); + await assertDirectUploadUsesDaemonToken(ticket, capture); + await assertFinalizeUsesDaemonToken(proxyPort, capture); } finally { await closeLoopbackServer(proxy); await closeLoopbackServer(upstream); } }); + +type RewrittenUploadTicket = { + url: string; + headers: Record; +}; + +type ResumableUploadProxyCapture = { + directAuth?: string; + directTokenHeader?: string; + directContentRange?: string; + directBody?: string; + finalizeAuth?: string; +}; + +async function requestRewrittenUploadTicket(proxyPort: number): Promise { + const preflight = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/preflight`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ + sha256: crypto.createHash('sha256').update('resumed').digest('hex'), + fileName: 'demo.apk', + sizeBytes: 7, + artifactType: 'file', + }), + }); + assert.equal(preflight.status, 200); + + const body = (await preflight.json()) as { + upload?: { url?: string; headers?: Record }; + }; + const ticket = readUploadTicket(body); + assert.match( + ticket.url, + new RegExp(`^http://127\\.0\\.0\\.1:${proxyPort}/agent-device/upload/direct/upload-1$`), + ); + assert.equal(ticket.headers.authorization, 'Bearer proxy-secret'); + assert.equal(ticket.headers['x-agent-device-token'], 'proxy-secret'); + return ticket; +} + +function readUploadTicket(body: { + upload?: { url?: string; headers?: Record }; +}): RewrittenUploadTicket { + if (!body.upload?.url) throw new Error('missing upload url'); + return { + url: body.upload.url, + headers: body.upload.headers ?? {}, + }; +} + +async function assertDirectUploadUsesDaemonToken( + ticket: RewrittenUploadTicket, + capture: ResumableUploadProxyCapture, +): Promise { + const direct = await fetch(ticket.url, { + method: 'PUT', + headers: { + ...ticket.headers, + 'content-range': 'bytes 3-6/7', + }, + body: Buffer.from('umed'), + }); + assert.equal(direct.status, 200); + assert.equal(capture.directAuth, 'Bearer daemon-secret'); + assert.equal(capture.directTokenHeader, 'daemon-secret'); + assert.equal(capture.directContentRange, 'bytes 3-6/7'); + assert.equal(capture.directBody, 'umed'); +} + +async function assertFinalizeUsesDaemonToken( + proxyPort: number, + capture: ResumableUploadProxyCapture, +): Promise { + const finalize = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/finalize`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ uploadId: 'upload-1' }), + }); + assert.equal(finalize.status, 200); + assert.deepEqual(await finalize.json(), { ok: true, uploadId: 'tracked-upload-1' }); + assert.equal(capture.finalizeAuth, 'Bearer daemon-secret'); +} + +function createResumableUploadProxyUpstream(capture: ResumableUploadProxyCapture): http.Server { + return http.createServer((req, res) => { + const route = `${req.method ?? ''} ${req.url ?? ''}`; + if (route === 'GET /health') return sendResumableProxyHealth(res); + if (route === 'POST /upload/preflight') return sendResumableProxyPreflight(res); + if (route === 'PUT /upload/direct/upload-1') { + return receiveResumableProxyDirectUpload(req, res, capture); + } + if (route === 'POST /upload/finalize') return sendResumableProxyFinalize(req, res, capture); + res.statusCode = 404; + res.end('not found'); + }); +} + +function sendResumableProxyHealth(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true })); +} + +function sendResumableProxyPreflight(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end( + JSON.stringify({ + ok: true, + cacheHit: false, + uploadId: 'upload-1', + upload: { + url: 'http://127.0.0.1:65535/upload/direct/upload-1', + headers: { + authorization: 'Bearer daemon-secret', + 'x-agent-device-token': 'daemon-secret', + 'content-type': 'application/octet-stream', + }, + }, + }), + ); +} + +function receiveResumableProxyDirectUpload( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + capture.directAuth = String(req.headers.authorization ?? ''); + capture.directTokenHeader = String(req.headers['x-agent-device-token'] ?? ''); + capture.directContentRange = String(req.headers['content-range'] ?? ''); + capture.directBody = ''; + req.setEncoding('utf8'); + req.on('data', (chunk) => { + capture.directBody += chunk; + }); + req.on('end', () => { + res.statusCode = 200; + res.end('ok'); + }); +} + +function sendResumableProxyFinalize( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + capture.finalizeAuth = String(req.headers.authorization ?? ''); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); +} diff --git a/src/daemon-proxy.ts b/src/daemon-proxy.ts index eb271f32c..ee5e75f08 100644 --- a/src/daemon-proxy.ts +++ b/src/daemon-proxy.ts @@ -3,6 +3,7 @@ import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { randomUUID } from 'node:crypto'; import { AppError, normalizeError } from './utils/errors.ts'; +import { readNodeHttpRequestBody } from './utils/node-http.ts'; import { timingSafeStringEqual } from './utils/timing-safe-equal.ts'; import { DAEMON_HTTP_BASE_PATH, @@ -66,7 +67,13 @@ async function handleProxyRequest( let rpcBody: string | undefined; if (route === '/rpc') { - rpcBody = (await readBodyBuffer(req, options.maxRpcBodyBytes)).toString('utf8'); + rpcBody = ( + await readNodeHttpRequestBody( + req, + options.maxRpcBodyBytes, + 'Proxy request body is too large.', + ) + ).toString('utf8'); } if (!isAuthorized(req, options.clientToken, rpcBody)) { @@ -118,22 +125,55 @@ async function forwardProxyRequest(params: { ...(body ? { body, duplex: 'half' as const } : {}), }); + await sendProxyResponse({ req, res, route, response, clientToken: options.clientToken }); +} + +async function sendProxyResponse(params: { + req: IncomingMessage; + res: ServerResponse; + route: string; + response: Response; + clientToken: string; +}): Promise { + const { req, res, route, response, clientToken } = params; res.statusCode = response.status; + copyProxyResponseHeaders(response, res); + ensureProxyRequestId(req, res); + + if (shouldRewriteUploadProxyResponse(route)) { + await sendRewrittenUploadPreflightResponse({ req, res, response, clientToken }); + return; + } + + await pipeProxyResponseBody(response, res); +} + +function copyProxyResponseHeaders(response: Response, res: ServerResponse): void { for (const name of FORWARDED_RESPONSE_HEADERS) { const value = response.headers.get(name); if (value) res.setHeader(name, value); } +} + +function ensureProxyRequestId(req: IncomingMessage, res: ServerResponse): void { if (!res.hasHeader('x-request-id')) { res.setHeader('x-request-id', resolveRequestId(req)); } +} - if (shouldRewriteUploadProxyResponse(route)) { - const text = await response.text(); - res.setHeader('content-type', response.headers.get('content-type') ?? 'application/json'); - res.end(rewriteUploadPreflightResponse(text, req, options.clientToken)); - return; - } +async function sendRewrittenUploadPreflightResponse(params: { + req: IncomingMessage; + res: ServerResponse; + response: Response; + clientToken: string; +}): Promise { + const { req, res, response, clientToken } = params; + const text = await response.text(); + res.setHeader('content-type', response.headers.get('content-type') ?? 'application/json'); + res.end(rewriteUploadPreflightResponse(text, req, clientToken)); +} +async function pipeProxyResponseBody(response: Response, res: ServerResponse): Promise { if (!response.body) { res.end(); return; @@ -281,20 +321,6 @@ function resolveRequestId(req: IncomingMessage): string { return randomUUID(); } -async function readBodyBuffer(req: IncomingMessage, maxBodyBytes: number): Promise { - const chunks: Buffer[] = []; - let bodyBytes = 0; - for await (const chunk of req) { - const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); - bodyBytes += buffer.length; - if (bodyBytes > maxBodyBytes) { - throw new AppError('INVALID_ARGS', 'Proxy request body is too large.'); - } - chunks.push(buffer); - } - return Buffer.concat(chunks); -} - function sendUnauthorized(res: ServerResponse, route: string, rpcId: unknown): void { res.statusCode = 401; res.setHeader('content-type', 'application/json'); diff --git a/src/daemon/resumable-upload.ts b/src/daemon/resumable-upload.ts index a3e3a555b..d396bd6e9 100644 --- a/src/daemon/resumable-upload.ts +++ b/src/daemon/resumable-upload.ts @@ -223,24 +223,35 @@ function parseContentRange( ): { start: number; end: number } | undefined { const raw = Array.isArray(value) ? value[0] : value; if (!raw) return undefined; - const match = raw.match(/^bytes (\d+)-(\d+)\/(\d+)$/); - if (!match) { - throw new AppError('INVALID_ARGS', 'Invalid content-range header'); - } - const start = Number(match[1]); - const end = Number(match[2]); - const size = Number(match[3]); - if ( - !Number.isSafeInteger(start) || - !Number.isSafeInteger(end) || - !Number.isSafeInteger(size) || - start < 0 || - end < start || - size !== sizeBytes - ) { + const range = readContentRange(raw); + if (!range || !isValidContentRange(range, sizeBytes)) { throw new AppError('INVALID_ARGS', 'Invalid content-range header'); } - return { start, end }; + return { start: range.start, end: range.end }; +} + +function readContentRange(raw: string): { start: number; end: number; size: number } | null { + const match = raw.match(/^bytes (\d+)-(\d+)\/(\d+)$/); + if (!match) return null; + return { + start: Number(match[1]), + end: Number(match[2]), + size: Number(match[3]), + }; +} + +function isValidContentRange( + range: { start: number; end: number; size: number }, + sizeBytes: number, +): boolean { + return ( + Number.isSafeInteger(range.start) && + Number.isSafeInteger(range.end) && + Number.isSafeInteger(range.size) && + range.start >= 0 && + range.end >= range.start && + range.size === sizeBytes + ); } function buildResumableUploadKey(options: BeginResumableUploadOptions): string { diff --git a/src/daemon/upload-http.ts b/src/daemon/upload-http.ts index fc6ce2ea1..2d9264223 100644 --- a/src/daemon/upload-http.ts +++ b/src/daemon/upload-http.ts @@ -9,6 +9,7 @@ import { } from './resumable-upload.ts'; import { receiveUpload } from './upload.ts'; import { sendRestJsonError } from './http-errors.ts'; +import { readNodeHttpRequestBody } from '../utils/node-http.ts'; type UploadHttpRoute = 'upload' | 'preflight' | 'direct' | 'finalize'; @@ -71,14 +72,7 @@ async function handleUpload( }); if (!auth) return; - const result = await receiveUpload(req); - const uploadId = trackUploadedArtifact({ - artifactPath: result.artifactPath, - tempDir: result.tempDir, - tenantId: auth.tenantId, - }); - - sendJson(res, { ok: true, uploadId }); + sendUploadedArtifactResponse(res, await receiveUpload(req), auth.tenantId); } catch (error) { sendRestJsonError(res, normalizeError(error)); } @@ -172,14 +166,11 @@ async function handleUploadFinalize( if (!auth) return; const body = await readRestJsonBody(req, 64 * 1024); - const result = await finalizeResumableUpload(readRequiredText(body, 'uploadId'), auth.tenantId); - const uploadId = trackUploadedArtifact({ - artifactPath: result.artifactPath, - tempDir: result.tempDir, - tenantId: auth.tenantId, - }); - - sendJson(res, { ok: true, uploadId }); + sendUploadedArtifactResponse( + res, + await finalizeResumableUpload(readRequiredText(body, 'uploadId'), auth.tenantId), + auth.tenantId, + ); } catch (error) { sendRestJsonError(res, normalizeError(error)); } @@ -191,22 +182,26 @@ function sendJson(res: http.ServerResponse, body: Record): void res.end(JSON.stringify(body)); } +function sendUploadedArtifactResponse( + res: http.ServerResponse, + result: { artifactPath: string; tempDir: string }, + tenantId: string | undefined, +): void { + const uploadId = trackUploadedArtifact({ + artifactPath: result.artifactPath, + tempDir: result.tempDir, + tenantId, + }); + sendJson(res, { ok: true, uploadId }); +} + async function readRestJsonBody( req: http.IncomingMessage, maxBodyBytes: number, ): Promise> { - const chunks: Buffer[] = []; - let bodyBytes = 0; - for await (const chunk of req) { - const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); - bodyBytes += buffer.length; - if (bodyBytes > maxBodyBytes) { - throw new AppError('INVALID_ARGS', 'Request body is too large.'); - } - chunks.push(buffer); - } - - const raw = Buffer.concat(chunks).toString('utf8'); + const raw = ( + await readNodeHttpRequestBody(req, maxBodyBytes, 'Request body is too large.') + ).toString('utf8'); try { const parsed = JSON.parse(raw) as unknown; if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { diff --git a/src/utils/node-http.ts b/src/utils/node-http.ts index df0dc6eff..d44500fd5 100644 --- a/src/utils/node-http.ts +++ b/src/utils/node-http.ts @@ -1,4 +1,5 @@ import type { IncomingMessage } from 'node:http'; +import { AppError } from './errors.ts'; export function readNodeHttpResponseBody(res: IncomingMessage): Promise { return new Promise((resolve, reject) => { @@ -11,3 +12,21 @@ export function readNodeHttpResponseBody(res: IncomingMessage): Promise res.on('error', reject); }); } + +export async function readNodeHttpRequestBody( + req: IncomingMessage, + maxBodyBytes: number, + tooLargeMessage: string, +): Promise { + const chunks: Buffer[] = []; + let bodyBytes = 0; + for await (const chunk of req) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + bodyBytes += buffer.length; + if (bodyBytes > maxBodyBytes) { + throw new AppError('INVALID_ARGS', tooLargeMessage); + } + chunks.push(buffer); + } + return Buffer.concat(chunks); +} From ac6f2377fcaf346bd2a1402fa0ca1823ba5d0dca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Mon, 29 Jun 2026 08:11:48 +0200 Subject: [PATCH 3/7] refactor: localize proxy upload routing --- src/daemon-proxy-upload.ts | 77 ---------------------------------- src/daemon-proxy.ts | 84 ++++++++++++++++++++++++++++++++++---- 2 files changed, 77 insertions(+), 84 deletions(-) delete mode 100644 src/daemon-proxy-upload.ts diff --git a/src/daemon-proxy-upload.ts b/src/daemon-proxy-upload.ts deleted file mode 100644 index 401db9cbe..000000000 --- a/src/daemon-proxy-upload.ts +++ /dev/null @@ -1,77 +0,0 @@ -import type { IncomingMessage } from 'node:http'; -import { buildDaemonHttpAuthHeaders } from './daemon/http-contract.ts'; - -export function isSupportedProxyUploadRoute(route: string, method: string | undefined): boolean { - if (route === '/upload') return method === 'POST'; - if (route === '/upload/preflight') return method === 'POST'; - if (route === '/upload/finalize') return method === 'POST'; - if (route.startsWith('/upload/direct/')) return method === 'PUT'; - return false; -} - -export function shouldRewriteUploadProxyResponse(route: string): boolean { - return route === '/upload/preflight'; -} - -export function rewriteUploadPreflightResponse( - body: string, - req: IncomingMessage, - clientToken: string, -): string { - let parsed: unknown; - try { - parsed = JSON.parse(body) as unknown; - } catch { - return body; - } - - if (!parsed || typeof parsed !== 'object') return body; - const record = parsed as { upload?: { url?: unknown; headers?: unknown } }; - if (!record.upload || typeof record.upload.url !== 'string') { - return body; - } - - const rewrittenUrl = rewriteUploadDirectUrl(record.upload.url, req); - if (!rewrittenUrl) return body; - - const headers = - record.upload.headers && typeof record.upload.headers === 'object' - ? { ...(record.upload.headers as Record) } - : {}; - Object.assign(headers, buildDaemonHttpAuthHeaders(clientToken)); - - return JSON.stringify({ - ...(parsed as Record), - upload: { - ...record.upload, - url: rewrittenUrl, - headers, - }, - }); -} - -function rewriteUploadDirectUrl(upstreamUrl: string, req: IncomingMessage): string | null { - let parsed: URL; - try { - parsed = new URL(upstreamUrl); - } catch { - return null; - } - - if (!parsed.pathname.startsWith('/upload/')) { - return null; - } - - const host = typeof req.headers.host === 'string' ? req.headers.host : ''; - if (!host) return null; - - const requestPath = new URL(req.url ?? '/', 'http://127.0.0.1').pathname; - const uploadIndex = requestPath.lastIndexOf('/upload/preflight'); - const uploadPrefix = uploadIndex >= 0 ? requestPath.slice(0, uploadIndex) : ''; - const forwardedProto = req.headers['x-forwarded-proto']; - const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; - const rewritten = new URL(`${proto || 'http'}://${host}`); - rewritten.pathname = `${uploadPrefix}${parsed.pathname}`; - rewritten.search = parsed.search; - return rewritten.toString(); -} diff --git a/src/daemon-proxy.ts b/src/daemon-proxy.ts index ee5e75f08..7713529b9 100644 --- a/src/daemon-proxy.ts +++ b/src/daemon-proxy.ts @@ -11,11 +11,6 @@ import { buildDaemonHttpUrl, } from './daemon/http-contract.ts'; import { buildDaemonHealthPayload } from './daemon/http-health.ts'; -import { - isSupportedProxyUploadRoute, - rewriteUploadPreflightResponse, - shouldRewriteUploadProxyResponse, -} from './daemon-proxy-upload.ts'; export type DaemonProxyOptions = { upstreamBaseUrl: string; @@ -140,7 +135,7 @@ async function sendProxyResponse(params: { copyProxyResponseHeaders(response, res); ensureProxyRequestId(req, res); - if (shouldRewriteUploadProxyResponse(route)) { + if (isUploadPreflightRoute(route)) { await sendRewrittenUploadPreflightResponse({ req, res, response, clientToken }); return; } @@ -173,6 +168,69 @@ async function sendRewrittenUploadPreflightResponse(params: { res.end(rewriteUploadPreflightResponse(text, req, clientToken)); } +function rewriteUploadPreflightResponse( + body: string, + req: IncomingMessage, + clientToken: string, +): string { + let parsed: unknown; + try { + parsed = JSON.parse(body) as unknown; + } catch { + return body; + } + + if (!parsed || typeof parsed !== 'object') return body; + const record = parsed as { upload?: { url?: unknown; headers?: unknown } }; + if (!record.upload || typeof record.upload.url !== 'string') { + return body; + } + + const rewrittenUrl = rewriteUploadDirectUrl(record.upload.url, req); + if (!rewrittenUrl) return body; + + const headers = + record.upload.headers && typeof record.upload.headers === 'object' + ? { ...(record.upload.headers as Record) } + : {}; + Object.assign(headers, buildDaemonHttpAuthHeaders(clientToken)); + + return JSON.stringify({ + ...(parsed as Record), + upload: { + ...record.upload, + url: rewrittenUrl, + headers, + }, + }); +} + +function rewriteUploadDirectUrl(upstreamUrl: string, req: IncomingMessage): string | null { + let parsed: URL; + try { + parsed = new URL(upstreamUrl); + } catch { + return null; + } + + if (!parsed.pathname.startsWith('/upload/')) { + return null; + } + + const host = typeof req.headers.host === 'string' ? req.headers.host : ''; + if (!host) return null; + + const requestPath = new URL(req.url ?? '/', 'http://127.0.0.1').pathname; + const uploadIndex = requestPath.lastIndexOf('/upload/preflight'); + const uploadPrefix = uploadIndex >= 0 ? requestPath.slice(0, uploadIndex) : ''; + const forwardedProto = req.headers['x-forwarded-proto']; + const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; + const rewritten = new URL(`${proto || 'http'}://${host}`); + rewritten.pathname = `${uploadPrefix}${parsed.pathname}`; + rewritten.search = parsed.search; + return rewritten.toString(); +} + async function pipeProxyResponseBody(response: Response, res: ServerResponse): Promise { if (!response.body) { res.end(); @@ -226,11 +284,23 @@ function resolveProxyRoute(requestUrl: string): string { function isSupportedDaemonRoute(route: string, method: string | undefined): boolean { if (route === '/rpc') return method === 'POST'; - if (isSupportedProxyUploadRoute(route, method)) return true; + if (isSupportedUploadRoute(route, method)) return true; if (route.startsWith('/artifacts/')) return method === 'GET'; return false; } +function isSupportedUploadRoute(route: string, method: string | undefined): boolean { + if (route === '/upload') return method === 'POST'; + if (isUploadPreflightRoute(route)) return method === 'POST'; + if (route === '/upload/finalize') return method === 'POST'; + if (route.startsWith('/upload/direct/')) return method === 'PUT'; + return false; +} + +function isUploadPreflightRoute(route: string): boolean { + return route === '/upload/preflight'; +} + function buildUpstreamUrl(upstreamBaseUrl: string, route: string, rawUrl: string): URL { const upstreamUrl = new URL(buildDaemonHttpUrl(upstreamBaseUrl, route)); const rawSearchIndex = rawUrl.indexOf('?'); From c4203f13d10c4cf2ca55ac6915dcd0ed360e1fa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Mon, 29 Jun 2026 08:17:44 +0200 Subject: [PATCH 4/7] test: simplify resumable proxy upload coverage --- src/__tests__/daemon-proxy.test.ts | 133 ++++++++++++++--------------- 1 file changed, 65 insertions(+), 68 deletions(-) diff --git a/src/__tests__/daemon-proxy.test.ts b/src/__tests__/daemon-proxy.test.ts index ec9db872d..dd1d3f52d 100644 --- a/src/__tests__/daemon-proxy.test.ts +++ b/src/__tests__/daemon-proxy.test.ts @@ -268,10 +268,12 @@ type RewrittenUploadTicket = { }; type ResumableUploadProxyCapture = { - directAuth?: string; - directTokenHeader?: string; - directContentRange?: string; - directBody?: string; + direct?: { + auth: string; + token: string; + contentRange: string; + body: string; + }; finalizeAuth?: string; }; @@ -327,10 +329,12 @@ async function assertDirectUploadUsesDaemonToken( body: Buffer.from('umed'), }); assert.equal(direct.status, 200); - assert.equal(capture.directAuth, 'Bearer daemon-secret'); - assert.equal(capture.directTokenHeader, 'daemon-secret'); - assert.equal(capture.directContentRange, 'bytes 3-6/7'); - assert.equal(capture.directBody, 'umed'); + assert.deepEqual(capture.direct, { + auth: 'Bearer daemon-secret', + token: 'daemon-secret', + contentRange: 'bytes 3-6/7', + body: 'umed', + }); } async function assertFinalizeUsesDaemonToken( @@ -351,68 +355,61 @@ async function assertFinalizeUsesDaemonToken( } function createResumableUploadProxyUpstream(capture: ResumableUploadProxyCapture): http.Server { + const routes: Record void> = { + 'GET /health': (_req, res) => { + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true })); + }, + 'POST /upload/preflight': (_req, res) => { + res.setHeader('content-type', 'application/json'); + res.end( + JSON.stringify({ + ok: true, + cacheHit: false, + uploadId: 'upload-1', + upload: { + url: 'http://127.0.0.1:65535/upload/direct/upload-1', + headers: { + authorization: 'Bearer daemon-secret', + 'x-agent-device-token': 'daemon-secret', + 'content-type': 'application/octet-stream', + }, + }, + }), + ); + }, + 'PUT /upload/direct/upload-1': (req, res) => { + const direct = { + auth: String(req.headers.authorization ?? ''), + token: String(req.headers['x-agent-device-token'] ?? ''), + contentRange: String(req.headers['content-range'] ?? ''), + body: '', + }; + capture.direct = direct; + req.setEncoding('utf8'); + req.on('data', (chunk) => { + direct.body += chunk; + }); + req.on('end', () => { + res.statusCode = 200; + res.end('ok'); + }); + }, + 'POST /upload/finalize': (req, res) => { + capture.finalizeAuth = String(req.headers.authorization ?? ''); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); + }, + }; + return http.createServer((req, res) => { const route = `${req.method ?? ''} ${req.url ?? ''}`; - if (route === 'GET /health') return sendResumableProxyHealth(res); - if (route === 'POST /upload/preflight') return sendResumableProxyPreflight(res); - if (route === 'PUT /upload/direct/upload-1') { - return receiveResumableProxyDirectUpload(req, res, capture); + const handler = routes[route]; + if (handler) { + handler(req, res); + } else { + res.statusCode = 404; + res.end('not found'); } - if (route === 'POST /upload/finalize') return sendResumableProxyFinalize(req, res, capture); - res.statusCode = 404; - res.end('not found'); - }); -} - -function sendResumableProxyHealth(res: http.ServerResponse): void { - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true })); -} - -function sendResumableProxyPreflight(res: http.ServerResponse): void { - res.setHeader('content-type', 'application/json'); - res.end( - JSON.stringify({ - ok: true, - cacheHit: false, - uploadId: 'upload-1', - upload: { - url: 'http://127.0.0.1:65535/upload/direct/upload-1', - headers: { - authorization: 'Bearer daemon-secret', - 'x-agent-device-token': 'daemon-secret', - 'content-type': 'application/octet-stream', - }, - }, - }), - ); -} - -function receiveResumableProxyDirectUpload( - req: http.IncomingMessage, - res: http.ServerResponse, - capture: ResumableUploadProxyCapture, -): void { - capture.directAuth = String(req.headers.authorization ?? ''); - capture.directTokenHeader = String(req.headers['x-agent-device-token'] ?? ''); - capture.directContentRange = String(req.headers['content-range'] ?? ''); - capture.directBody = ''; - req.setEncoding('utf8'); - req.on('data', (chunk) => { - capture.directBody += chunk; }); - req.on('end', () => { - res.statusCode = 200; - res.end('ok'); - }); -} - -function sendResumableProxyFinalize( - req: http.IncomingMessage, - res: http.ServerResponse, - capture: ResumableUploadProxyCapture, -): void { - capture.finalizeAuth = String(req.headers.authorization ?? ''); - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); } From 9a6a37d30bdf356c39ad0adda73050ec22bc33a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Mon, 29 Jun 2026 08:36:52 +0200 Subject: [PATCH 5/7] refactor: deepen upload route handling --- src/daemon/http-server.ts | 9 +++++---- src/daemon/upload-http.ts | 34 +++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/daemon/http-server.ts b/src/daemon/http-server.ts index fabad4a97..23198eef3 100644 --- a/src/daemon/http-server.ts +++ b/src/daemon/http-server.ts @@ -31,7 +31,7 @@ import { } from './request-progress-protocol.ts'; import { buildDaemonHealthPayload } from './http-health.ts'; import { sendRestJsonError, statusCodeForNormalizedError } from './http-errors.ts'; -import { handleUploadHttpRoute, isUploadHttpRoute } from './upload-http.ts'; +import { tryHandleUploadHttpRoute } from './upload-http.ts'; type JsonRpcRequest = JsonRpcRequestEnvelope; @@ -525,8 +525,8 @@ export async function createDaemonHttpServer(options: { return; } - if (isUploadHttpRoute(req)) { - void handleUploadHttpRoute({ + if ( + tryHandleUploadHttpRoute({ req, res, token: resolveToken({}, req.headers), @@ -538,7 +538,8 @@ export async function createDaemonHttpServer(options: { expectedToken: token, daemonRequest: request.daemonRequest, }), - }); + }) + ) { return; } diff --git a/src/daemon/upload-http.ts b/src/daemon/upload-http.ts index 2d9264223..cd99d4aba 100644 --- a/src/daemon/upload-http.ts +++ b/src/daemon/upload-http.ts @@ -19,35 +19,43 @@ type AuxiliaryHttpAuthorizer = (params: { daemonRequest: Pick; }) => Promise<{ tenantId?: string } | null>; -export async function handleUploadHttpRoute(params: { +export function tryHandleUploadHttpRoute(params: { req: http.IncomingMessage; res: http.ServerResponse; authorize: AuxiliaryHttpAuthorizer; token: string; -}): Promise { +}): boolean { const { req, res, authorize, token } = params; - switch (resolveUploadHttpRoute(req)) { + const route = resolveUploadHttpRoute(req); + if (route === null) return false; + + void handleUploadHttpRoute(route, req, res, authorize, token); + return true; +} + +async function handleUploadHttpRoute( + route: UploadHttpRoute, + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, + token: string, +): Promise { + switch (route) { case 'preflight': await handleUploadPreflight(req, res, authorize, token); - return true; + return; case 'direct': await handleResumableUpload(req, res, authorize); - return true; + return; case 'finalize': await handleUploadFinalize(req, res, authorize); - return true; + return; case 'upload': await handleUpload(req, res, authorize); - return true; - default: - return false; + return; } } -export function isUploadHttpRoute(req: http.IncomingMessage): boolean { - return resolveUploadHttpRoute(req) !== null; -} - function resolveUploadHttpRoute(req: http.IncomingMessage): UploadHttpRoute | null { if (req.method === 'POST' && req.url === '/upload/preflight') return 'preflight'; if (req.method === 'PUT' && req.url?.startsWith('/upload/direct/')) return 'direct'; From 92bb7f03a7084f4d0f77d25c94ed6589f0fad431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Mon, 29 Jun 2026 08:44:52 +0200 Subject: [PATCH 6/7] refactor: tighten upload http types --- src/daemon/resumable-upload.ts | 6 ---- src/daemon/upload-http.ts | 65 ++++++++++++++++++++++++++-------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/src/daemon/resumable-upload.ts b/src/daemon/resumable-upload.ts index d396bd6e9..9fca100af 100644 --- a/src/daemon/resumable-upload.ts +++ b/src/daemon/resumable-upload.ts @@ -147,12 +147,6 @@ function validateResumableUploadOptions(options: BeginResumableUploadOptions): v } validateArtifactContentLength(String(options.sizeBytes)); sanitizeArtifactFilename(options.fileName); - if (options.artifactType !== 'file' && options.artifactType !== 'app-bundle') { - throw new AppError( - 'INVALID_ARGS', - `Invalid artifactType: ${options.artifactType}. Must be "file" or "app-bundle".`, - ); - } } function createResumableUploadEntry( diff --git a/src/daemon/upload-http.ts b/src/daemon/upload-http.ts index cd99d4aba..29e724ac6 100644 --- a/src/daemon/upload-http.ts +++ b/src/daemon/upload-http.ts @@ -3,6 +3,7 @@ import { AppError, normalizeError } from '../utils/errors.ts'; import type { DaemonRequest } from './types.ts'; import { trackUploadedArtifact } from './artifact-tracking.ts'; import { + type BeginResumableUploadOptions, beginResumableUpload, finalizeResumableUpload, receiveResumableUploadChunk, @@ -11,7 +12,22 @@ import { receiveUpload } from './upload.ts'; import { sendRestJsonError } from './http-errors.ts'; import { readNodeHttpRequestBody } from '../utils/node-http.ts'; -type UploadHttpRoute = 'upload' | 'preflight' | 'direct' | 'finalize'; +const DIRECT_UPLOAD_PATH_PREFIX = '/upload/direct/'; + +type UploadHttpRoute = + | { kind: 'upload' } + | { kind: 'preflight' } + | { kind: 'direct'; uploadId: string } + | { kind: 'finalize' }; + +type UploadPreflightBody = Pick< + BeginResumableUploadOptions, + 'artifactType' | 'contentType' | 'fileName' | 'platform' | 'sha256' | 'sizeBytes' +>; + +type UploadFinalizeBody = { + uploadId: string; +}; type AuxiliaryHttpAuthorizer = (params: { req: http.IncomingMessage; @@ -40,12 +56,12 @@ async function handleUploadHttpRoute( authorize: AuxiliaryHttpAuthorizer, token: string, ): Promise { - switch (route) { + switch (route.kind) { case 'preflight': await handleUploadPreflight(req, res, authorize, token); return; case 'direct': - await handleResumableUpload(req, res, authorize); + await handleResumableUpload(route.uploadId, req, res, authorize); return; case 'finalize': await handleUploadFinalize(req, res, authorize); @@ -57,10 +73,15 @@ async function handleUploadHttpRoute( } function resolveUploadHttpRoute(req: http.IncomingMessage): UploadHttpRoute | null { - if (req.method === 'POST' && req.url === '/upload/preflight') return 'preflight'; - if (req.method === 'PUT' && req.url?.startsWith('/upload/direct/')) return 'direct'; - if (req.method === 'POST' && req.url === '/upload/finalize') return 'finalize'; - if (req.method === 'POST' && req.url === '/upload') return 'upload'; + if (req.method === 'POST' && req.url === '/upload/preflight') return { kind: 'preflight' }; + if (req.method === 'PUT' && req.url?.startsWith(DIRECT_UPLOAD_PATH_PREFIX)) { + return { + kind: 'direct', + uploadId: req.url.slice(DIRECT_UPLOAD_PATH_PREFIX.length).replace(/\?.*$/, ''), + }; + } + if (req.method === 'POST' && req.url === '/upload/finalize') return { kind: 'finalize' }; + if (req.method === 'POST' && req.url === '/upload') return { kind: 'upload' }; return null; } @@ -104,15 +125,11 @@ async function handleUploadPreflight( if (!auth) return; const body = await readRestJsonBody(req, 64 * 1024); + const preflight = readUploadPreflightBody(body); const upload = beginResumableUpload({ baseUrl: resolveHttpRequestBaseUrl(req), tokenHeaders: buildUploadTicketAuthHeaders(token), - sha256: readRequiredText(body, 'sha256'), - fileName: readRequiredText(body, 'fileName'), - sizeBytes: readRequiredInteger(body, 'sizeBytes'), - artifactType: readRequiredArtifactType(body), - platform: readOptionalText(body, 'platform'), - contentType: readOptionalText(body, 'contentType'), + ...preflight, tenantId: auth.tenantId, }); @@ -123,11 +140,11 @@ async function handleUploadPreflight( } async function handleResumableUpload( + uploadId: string, req: http.IncomingMessage, res: http.ServerResponse, authorize: AuxiliaryHttpAuthorizer, ): Promise { - const uploadId = req.url?.slice('/upload/direct/'.length).split('?')[0] ?? ''; try { const auth = await authorize({ req, @@ -174,9 +191,10 @@ async function handleUploadFinalize( if (!auth) return; const body = await readRestJsonBody(req, 64 * 1024); + const finalize = readUploadFinalizeBody(body); sendUploadedArtifactResponse( res, - await finalizeResumableUpload(readRequiredText(body, 'uploadId'), auth.tenantId), + await finalizeResumableUpload(finalize.uploadId, auth.tenantId), auth.tenantId, ); } catch (error) { @@ -221,6 +239,23 @@ async function readRestJsonBody( } } +function readUploadPreflightBody(record: Record): UploadPreflightBody { + return { + sha256: readRequiredText(record, 'sha256'), + fileName: readRequiredText(record, 'fileName'), + sizeBytes: readRequiredInteger(record, 'sizeBytes'), + artifactType: readRequiredArtifactType(record), + platform: readOptionalText(record, 'platform'), + contentType: readOptionalText(record, 'contentType'), + }; +} + +function readUploadFinalizeBody(record: Record): UploadFinalizeBody { + return { + uploadId: readRequiredText(record, 'uploadId'), + }; +} + function readRequiredText(record: Record, key: string): string { const value = readOptionalText(record, key)?.trim(); if (!value) throw new AppError('INVALID_ARGS', `${key} is required`); From f9cde42beab02aab28e00262aab86b526762b3c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Mon, 29 Jun 2026 09:59:47 +0200 Subject: [PATCH 7/7] test: avoid dynamic proxy route dispatch --- src/__tests__/daemon-proxy.test.ts | 125 +++++++++++++++++------------ 1 file changed, 72 insertions(+), 53 deletions(-) diff --git a/src/__tests__/daemon-proxy.test.ts b/src/__tests__/daemon-proxy.test.ts index dd1d3f52d..9df97ee22 100644 --- a/src/__tests__/daemon-proxy.test.ts +++ b/src/__tests__/daemon-proxy.test.ts @@ -355,61 +355,80 @@ async function assertFinalizeUsesDaemonToken( } function createResumableUploadProxyUpstream(capture: ResumableUploadProxyCapture): http.Server { - const routes: Record void> = { - 'GET /health': (_req, res) => { - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true })); - }, - 'POST /upload/preflight': (_req, res) => { - res.setHeader('content-type', 'application/json'); - res.end( - JSON.stringify({ - ok: true, - cacheHit: false, - uploadId: 'upload-1', - upload: { - url: 'http://127.0.0.1:65535/upload/direct/upload-1', - headers: { - authorization: 'Bearer daemon-secret', - 'x-agent-device-token': 'daemon-secret', - 'content-type': 'application/octet-stream', - }, - }, - }), - ); - }, - 'PUT /upload/direct/upload-1': (req, res) => { - const direct = { - auth: String(req.headers.authorization ?? ''), - token: String(req.headers['x-agent-device-token'] ?? ''), - contentRange: String(req.headers['content-range'] ?? ''), - body: '', - }; - capture.direct = direct; - req.setEncoding('utf8'); - req.on('data', (chunk) => { - direct.body += chunk; - }); - req.on('end', () => { - res.statusCode = 200; - res.end('ok'); - }); - }, - 'POST /upload/finalize': (req, res) => { - capture.finalizeAuth = String(req.headers.authorization ?? ''); - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); - }, - }; - return http.createServer((req, res) => { const route = `${req.method ?? ''} ${req.url ?? ''}`; - const handler = routes[route]; - if (handler) { - handler(req, res); - } else { - res.statusCode = 404; - res.end('not found'); + switch (route) { + case 'GET /health': + sendUploadProxyHealth(res); + return; + case 'POST /upload/preflight': + sendUploadProxyPreflight(res); + return; + case 'PUT /upload/direct/upload-1': + captureUploadProxyDirectRequest(req, res, capture); + return; + case 'POST /upload/finalize': + sendUploadProxyFinalize(req, res, capture); + return; + default: + res.statusCode = 404; + res.end('not found'); } }); } + +function sendUploadProxyHealth(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true })); +} + +function sendUploadProxyPreflight(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end( + JSON.stringify({ + ok: true, + cacheHit: false, + uploadId: 'upload-1', + upload: { + url: 'http://127.0.0.1:65535/upload/direct/upload-1', + headers: { + authorization: 'Bearer daemon-secret', + 'x-agent-device-token': 'daemon-secret', + 'content-type': 'application/octet-stream', + }, + }, + }), + ); +} + +function captureUploadProxyDirectRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + const direct = { + auth: String(req.headers.authorization ?? ''), + token: String(req.headers['x-agent-device-token'] ?? ''), + contentRange: String(req.headers['content-range'] ?? ''), + body: '', + }; + capture.direct = direct; + req.setEncoding('utf8'); + req.on('data', (chunk) => { + direct.body += chunk; + }); + req.on('end', () => { + res.statusCode = 200; + res.end('ok'); + }); +} + +function sendUploadProxyFinalize( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + capture.finalizeAuth = String(req.headers.authorization ?? ''); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); +}