diff --git a/src/__tests__/daemon-proxy.test.ts b/src/__tests__/daemon-proxy.test.ts index 85bd88df4..9df97ee22 100644 --- a/src/__tests__/daemon-proxy.test.ts +++ b/src/__tests__/daemon-proxy.test.ts @@ -1,5 +1,6 @@ import { test } from 'vitest'; import assert from 'node:assert/strict'; +import crypto from 'node:crypto'; import http from 'node:http'; import { createDaemonProxyServer } from '../daemon-proxy.ts'; import { DAEMON_RPC_PROTOCOL_VERSION } from '../daemon/http-health.ts'; @@ -238,3 +239,196 @@ test('daemon proxy streams uploads and artifact downloads with upstream daemon t await closeLoopbackServer(upstream); } }); + +test('daemon proxy forwards resumable upload routes and rewrites direct upload tickets', async (t) => { + if (await skipWhenLoopbackUnavailable(t)) return; + + const capture: ResumableUploadProxyCapture = {}; + const upstream = createResumableUploadProxyUpstream(capture); + const proxy = createDaemonProxyServer({ + upstreamBaseUrl: `http://127.0.0.1:${await listenOnLoopback(upstream)}`, + upstreamToken: 'daemon-secret', + clientToken: 'proxy-secret', + }); + + try { + const proxyPort = await listenOnLoopback(proxy); + const ticket = await requestRewrittenUploadTicket(proxyPort); + await assertDirectUploadUsesDaemonToken(ticket, capture); + await assertFinalizeUsesDaemonToken(proxyPort, capture); + } finally { + await closeLoopbackServer(proxy); + await closeLoopbackServer(upstream); + } +}); + +type RewrittenUploadTicket = { + url: string; + headers: Record; +}; + +type ResumableUploadProxyCapture = { + direct?: { + auth: string; + token: string; + contentRange: string; + body: string; + }; + finalizeAuth?: string; +}; + +async function requestRewrittenUploadTicket(proxyPort: number): Promise { + const preflight = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/preflight`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ + sha256: crypto.createHash('sha256').update('resumed').digest('hex'), + fileName: 'demo.apk', + sizeBytes: 7, + artifactType: 'file', + }), + }); + assert.equal(preflight.status, 200); + + const body = (await preflight.json()) as { + upload?: { url?: string; headers?: Record }; + }; + const ticket = readUploadTicket(body); + assert.match( + ticket.url, + new RegExp(`^http://127\\.0\\.0\\.1:${proxyPort}/agent-device/upload/direct/upload-1$`), + ); + assert.equal(ticket.headers.authorization, 'Bearer proxy-secret'); + assert.equal(ticket.headers['x-agent-device-token'], 'proxy-secret'); + return ticket; +} + +function readUploadTicket(body: { + upload?: { url?: string; headers?: Record }; +}): RewrittenUploadTicket { + if (!body.upload?.url) throw new Error('missing upload url'); + return { + url: body.upload.url, + headers: body.upload.headers ?? {}, + }; +} + +async function assertDirectUploadUsesDaemonToken( + ticket: RewrittenUploadTicket, + capture: ResumableUploadProxyCapture, +): Promise { + const direct = await fetch(ticket.url, { + method: 'PUT', + headers: { + ...ticket.headers, + 'content-range': 'bytes 3-6/7', + }, + body: Buffer.from('umed'), + }); + assert.equal(direct.status, 200); + assert.deepEqual(capture.direct, { + auth: 'Bearer daemon-secret', + token: 'daemon-secret', + contentRange: 'bytes 3-6/7', + body: 'umed', + }); +} + +async function assertFinalizeUsesDaemonToken( + proxyPort: number, + capture: ResumableUploadProxyCapture, +): Promise { + const finalize = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/finalize`, { + method: 'POST', + headers: { + authorization: 'Bearer proxy-secret', + 'content-type': 'application/json', + }, + body: JSON.stringify({ uploadId: 'upload-1' }), + }); + assert.equal(finalize.status, 200); + assert.deepEqual(await finalize.json(), { ok: true, uploadId: 'tracked-upload-1' }); + assert.equal(capture.finalizeAuth, 'Bearer daemon-secret'); +} + +function createResumableUploadProxyUpstream(capture: ResumableUploadProxyCapture): http.Server { + return http.createServer((req, res) => { + const route = `${req.method ?? ''} ${req.url ?? ''}`; + switch (route) { + case 'GET /health': + sendUploadProxyHealth(res); + return; + case 'POST /upload/preflight': + sendUploadProxyPreflight(res); + return; + case 'PUT /upload/direct/upload-1': + captureUploadProxyDirectRequest(req, res, capture); + return; + case 'POST /upload/finalize': + sendUploadProxyFinalize(req, res, capture); + return; + default: + res.statusCode = 404; + res.end('not found'); + } + }); +} + +function sendUploadProxyHealth(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true })); +} + +function sendUploadProxyPreflight(res: http.ServerResponse): void { + res.setHeader('content-type', 'application/json'); + res.end( + JSON.stringify({ + ok: true, + cacheHit: false, + uploadId: 'upload-1', + upload: { + url: 'http://127.0.0.1:65535/upload/direct/upload-1', + headers: { + authorization: 'Bearer daemon-secret', + 'x-agent-device-token': 'daemon-secret', + 'content-type': 'application/octet-stream', + }, + }, + }), + ); +} + +function captureUploadProxyDirectRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + const direct = { + auth: String(req.headers.authorization ?? ''), + token: String(req.headers['x-agent-device-token'] ?? ''), + contentRange: String(req.headers['content-range'] ?? ''), + body: '', + }; + capture.direct = direct; + req.setEncoding('utf8'); + req.on('data', (chunk) => { + direct.body += chunk; + }); + req.on('end', () => { + res.statusCode = 200; + res.end('ok'); + }); +} + +function sendUploadProxyFinalize( + req: http.IncomingMessage, + res: http.ServerResponse, + capture: ResumableUploadProxyCapture, +): void { + capture.finalizeAuth = String(req.headers.authorization ?? ''); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' })); +} diff --git a/src/__tests__/upload-client.test.ts b/src/__tests__/upload-client.test.ts index f6627901a..14152d40c 100644 --- a/src/__tests__/upload-client.test.ts +++ b/src/__tests__/upload-client.test.ts @@ -607,6 +607,86 @@ test('uploadArtifact resumes a direct upload from the server-reported offset', a } }); +test('uploadArtifact re-preflights and resumes after an interrupted direct upload', async () => { + const content = 'direct-upload-interrupted-payload'; + const artifactPath = createTempFile('app.apk', content); + const resumeOffset = 8; + const requests: string[] = []; + let preflightAttempts = 0; + let uploadAttempts = 0; + let resumedUploadBody = ''; + + const server = await startServer(async (req, res) => { + requests.push(`${req.method} ${req.url}`); + if (req.method === 'POST' && req.url === '/upload/preflight') { + preflightAttempts += 1; + await readRequestBody(req); + sendJson(res, { + ok: true, + cacheHit: false, + uploadId: 'resume-after-error-ticket', + upload: { + url: `${server.baseUrl}/resumable-upload-after-error`, + headers: { + 'x-signed-ticket': 'resume-after-error-ticket-header', + }, + }, + }); + return; + } + if (req.method === 'PUT' && req.url === '/resumable-upload-after-error') { + uploadAttempts += 1; + if (uploadAttempts === 1) { + req.destroy(new Error('simulated low-connectivity interruption')); + return; + } + if (uploadAttempts === 2) { + assert.equal(req.headers['content-range'], undefined); + res.statusCode = 308; + res.setHeader('x-upload-offset', String(resumeOffset)); + res.end(); + return; + } + + assert.equal( + req.headers['content-range'], + `bytes ${resumeOffset}-${Buffer.byteLength(content) - 1}/${Buffer.byteLength(content)}`, + ); + resumedUploadBody = (await readRequestBody(req)).toString('utf8'); + res.statusCode = 200; + res.end('ok'); + return; + } + if (req.method === 'POST' && req.url === '/upload/finalize') { + sendJson(res, { ok: true, uploadId: 'upload-resumed-after-error' }); + return; + } + res.statusCode = 404; + res.end('not found'); + }); + + try { + const uploadId = await uploadArtifact({ + localPath: artifactPath, + baseUrl: server.baseUrl, + token: TEST_TOKEN, + }); + assert.equal(uploadId, 'upload-resumed-after-error'); + assert.equal(preflightAttempts, 2); + assert.equal(resumedUploadBody, content.slice(resumeOffset)); + assert.deepEqual(requests, [ + 'POST /upload/preflight', + 'PUT /resumable-upload-after-error', + 'POST /upload/preflight', + 'PUT /resumable-upload-after-error', + 'PUT /resumable-upload-after-error', + 'POST /upload/finalize', + ]); + } finally { + await server.close(); + } +}); + test('uploadArtifact preflights and legacy-uploads compressed app bundle directories', async () => { const tempRoot = createTempDir(); const appPath = path.join(tempRoot, 'Sample.app'); diff --git a/src/daemon-proxy.ts b/src/daemon-proxy.ts index f1b419d6c..7713529b9 100644 --- a/src/daemon-proxy.ts +++ b/src/daemon-proxy.ts @@ -3,6 +3,7 @@ import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { randomUUID } from 'node:crypto'; import { AppError, normalizeError } from './utils/errors.ts'; +import { readNodeHttpRequestBody } from './utils/node-http.ts'; import { timingSafeStringEqual } from './utils/timing-safe-equal.ts'; import { DAEMON_HTTP_BASE_PATH, @@ -23,7 +24,14 @@ export type DaemonProxyOptions = { const DEFAULT_MAX_RPC_BODY_BYTES = 1024 * 1024; const DEFAULT_UPSTREAM_TIMEOUT_MS = 5 * 60 * 1000; const DAEMON_PROXY_PREFIX = `${DAEMON_HTTP_BASE_PATH}/`; -const FORWARDED_REQUEST_HEADERS = ['content-type', 'x-artifact-type', 'x-artifact-filename']; +const FORWARDED_REQUEST_HEADERS = [ + 'content-type', + 'content-range', + 'x-artifact-type', + 'x-artifact-filename', + 'x-artifact-hash', + 'x-artifact-hash-algorithm', +]; const FORWARDED_RESPONSE_HEADERS = ['content-type', 'content-disposition', 'x-request-id']; export function createDaemonProxyServer(options: DaemonProxyOptions): http.Server { @@ -54,7 +62,13 @@ async function handleProxyRequest( let rpcBody: string | undefined; if (route === '/rpc') { - rpcBody = (await readBodyBuffer(req, options.maxRpcBodyBytes)).toString('utf8'); + rpcBody = ( + await readNodeHttpRequestBody( + req, + options.maxRpcBodyBytes, + 'Proxy request body is too large.', + ) + ).toString('utf8'); } if (!isAuthorized(req, options.clientToken, rpcBody)) { @@ -106,15 +120,118 @@ async function forwardProxyRequest(params: { ...(body ? { body, duplex: 'half' as const } : {}), }); + await sendProxyResponse({ req, res, route, response, clientToken: options.clientToken }); +} + +async function sendProxyResponse(params: { + req: IncomingMessage; + res: ServerResponse; + route: string; + response: Response; + clientToken: string; +}): Promise { + const { req, res, route, response, clientToken } = params; res.statusCode = response.status; + copyProxyResponseHeaders(response, res); + ensureProxyRequestId(req, res); + + if (isUploadPreflightRoute(route)) { + await sendRewrittenUploadPreflightResponse({ req, res, response, clientToken }); + return; + } + + await pipeProxyResponseBody(response, res); +} + +function copyProxyResponseHeaders(response: Response, res: ServerResponse): void { for (const name of FORWARDED_RESPONSE_HEADERS) { const value = response.headers.get(name); if (value) res.setHeader(name, value); } +} + +function ensureProxyRequestId(req: IncomingMessage, res: ServerResponse): void { if (!res.hasHeader('x-request-id')) { res.setHeader('x-request-id', resolveRequestId(req)); } +} + +async function sendRewrittenUploadPreflightResponse(params: { + req: IncomingMessage; + res: ServerResponse; + response: Response; + clientToken: string; +}): Promise { + const { req, res, response, clientToken } = params; + const text = await response.text(); + res.setHeader('content-type', response.headers.get('content-type') ?? 'application/json'); + res.end(rewriteUploadPreflightResponse(text, req, clientToken)); +} +function rewriteUploadPreflightResponse( + body: string, + req: IncomingMessage, + clientToken: string, +): string { + let parsed: unknown; + try { + parsed = JSON.parse(body) as unknown; + } catch { + return body; + } + + if (!parsed || typeof parsed !== 'object') return body; + const record = parsed as { upload?: { url?: unknown; headers?: unknown } }; + if (!record.upload || typeof record.upload.url !== 'string') { + return body; + } + + const rewrittenUrl = rewriteUploadDirectUrl(record.upload.url, req); + if (!rewrittenUrl) return body; + + const headers = + record.upload.headers && typeof record.upload.headers === 'object' + ? { ...(record.upload.headers as Record) } + : {}; + Object.assign(headers, buildDaemonHttpAuthHeaders(clientToken)); + + return JSON.stringify({ + ...(parsed as Record), + upload: { + ...record.upload, + url: rewrittenUrl, + headers, + }, + }); +} + +function rewriteUploadDirectUrl(upstreamUrl: string, req: IncomingMessage): string | null { + let parsed: URL; + try { + parsed = new URL(upstreamUrl); + } catch { + return null; + } + + if (!parsed.pathname.startsWith('/upload/')) { + return null; + } + + const host = typeof req.headers.host === 'string' ? req.headers.host : ''; + if (!host) return null; + + const requestPath = new URL(req.url ?? '/', 'http://127.0.0.1').pathname; + const uploadIndex = requestPath.lastIndexOf('/upload/preflight'); + const uploadPrefix = uploadIndex >= 0 ? requestPath.slice(0, uploadIndex) : ''; + const forwardedProto = req.headers['x-forwarded-proto']; + const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; + const rewritten = new URL(`${proto || 'http'}://${host}`); + rewritten.pathname = `${uploadPrefix}${parsed.pathname}`; + rewritten.search = parsed.search; + return rewritten.toString(); +} + +async function pipeProxyResponseBody(response: Response, res: ServerResponse): Promise { if (!response.body) { res.end(); return; @@ -167,11 +284,23 @@ function resolveProxyRoute(requestUrl: string): string { function isSupportedDaemonRoute(route: string, method: string | undefined): boolean { if (route === '/rpc') return method === 'POST'; - if (route === '/upload') return method === 'POST'; + if (isSupportedUploadRoute(route, method)) return true; if (route.startsWith('/artifacts/')) return method === 'GET'; return false; } +function isSupportedUploadRoute(route: string, method: string | undefined): boolean { + if (route === '/upload') return method === 'POST'; + if (isUploadPreflightRoute(route)) return method === 'POST'; + if (route === '/upload/finalize') return method === 'POST'; + if (route.startsWith('/upload/direct/')) return method === 'PUT'; + return false; +} + +function isUploadPreflightRoute(route: string): boolean { + return route === '/upload/preflight'; +} + function buildUpstreamUrl(upstreamBaseUrl: string, route: string, rawUrl: string): URL { const upstreamUrl = new URL(buildDaemonHttpUrl(upstreamBaseUrl, route)); const rawSearchIndex = rawUrl.indexOf('?'); @@ -262,20 +391,6 @@ function resolveRequestId(req: IncomingMessage): string { return randomUUID(); } -async function readBodyBuffer(req: IncomingMessage, maxBodyBytes: number): Promise { - const chunks: Buffer[] = []; - let bodyBytes = 0; - for await (const chunk of req) { - const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); - bodyBytes += buffer.length; - if (bodyBytes > maxBodyBytes) { - throw new AppError('INVALID_ARGS', 'Proxy request body is too large.'); - } - chunks.push(buffer); - } - return Buffer.concat(chunks); -} - function sendUnauthorized(res: ServerResponse, route: string, rpcId: unknown): void { res.statusCode = 401; res.setHeader('content-type', 'application/json'); diff --git a/src/daemon/http-errors.ts b/src/daemon/http-errors.ts new file mode 100644 index 000000000..ace94f7fa --- /dev/null +++ b/src/daemon/http-errors.ts @@ -0,0 +1,23 @@ +import type http from 'node:http'; +import { normalizeError } from '../utils/errors.ts'; + +export type NormalizedHttpError = ReturnType; + +export function statusCodeForNormalizedError(code: string): number { + switch (code) { + case 'INVALID_ARGS': + return 400; + case 'UNAUTHORIZED': + return 401; + case 'SESSION_NOT_FOUND': + return 404; + default: + return 500; + } +} + +export function sendRestJsonError(res: http.ServerResponse, normalized: NormalizedHttpError): void { + res.statusCode = statusCodeForNormalizedError(normalized.code); + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify({ ok: false, error: normalized.message, code: normalized.code })); +} diff --git a/src/daemon/http-server.ts b/src/daemon/http-server.ts index 0fb3543fb..23198eef3 100644 --- a/src/daemon/http-server.ts +++ b/src/daemon/http-server.ts @@ -22,12 +22,7 @@ import { import path from 'node:path'; import { pathToFileURL } from 'node:url'; import { sleep } from '../utils/timeouts.ts'; -import { - cleanupDownloadableArtifact, - prepareDownloadableArtifact, - trackUploadedArtifact, -} from './artifact-tracking.ts'; -import { receiveUpload } from './upload.ts'; +import { cleanupDownloadableArtifact, prepareDownloadableArtifact } from './artifact-tracking.ts'; import { type RequestProgressEvent, withRequestProgressSink } from './request-progress.ts'; import { serializeDaemonProgressEnvelope, @@ -35,6 +30,8 @@ import { shouldStreamRequestProgress, } from './request-progress-protocol.ts'; import { buildDaemonHealthPayload } from './http-health.ts'; +import { sendRestJsonError, statusCodeForNormalizedError } from './http-errors.ts'; +import { tryHandleUploadHttpRoute } from './upload-http.ts'; type JsonRpcRequest = JsonRpcRequestEnvelope; @@ -145,19 +142,6 @@ function writeRpcResponseEnvelope( res.end(); } -function statusCodeForNormalizedError(code: string): number { - switch (code) { - case 'INVALID_ARGS': - return 400; - case 'UNAUTHORIZED': - return 401; - case 'SESSION_NOT_FOUND': - return 404; - default: - return 500; - } -} - // Map a thrown boundary error to its JSON-RPC error code. Invalid params (malformed // wire input rejected before the request reaches the handler) is JSON-RPC -32602, to // match the explicit `Invalid params` sibling checks below; everything else is the @@ -541,8 +525,21 @@ export async function createDaemonHttpServer(options: { return; } - if (req.method === 'POST' && req.url === '/upload') { - handleUpload(req, res, authHook, token); + if ( + tryHandleUploadHttpRoute({ + req, + res, + token: resolveToken({}, req.headers), + authorize: async (request) => + await authorizeAuxiliaryHttpRequest({ + req: request.req, + res: request.res, + authHook, + expectedToken: token, + daemonRequest: request.daemonRequest, + }), + }) + ) { return; } @@ -735,41 +732,6 @@ export async function createDaemonHttpServer(options: { }); } -async function handleUpload( - req: http.IncomingMessage, - res: http.ServerResponse, - authHook: HttpAuthHook | null, - expectedToken?: string, -): Promise { - try { - const auth = await authorizeAuxiliaryHttpRequest({ - req, - res, - authHook, - expectedToken, - daemonRequest: { - command: 'upload', - positionals: [], - }, - }); - if (!auth) return; - - const result = await receiveUpload(req); - const uploadId = trackUploadedArtifact({ - artifactPath: result.artifactPath, - tempDir: result.tempDir, - tenantId: auth.tenantId, - }); - - res.statusCode = 200; - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: true, uploadId })); - } catch (error) { - const normalized = normalizeError(error); - sendRestJsonError(res, normalized); - } -} - async function handleArtifactDownload( req: http.IncomingMessage, res: http.ServerResponse, @@ -902,15 +864,6 @@ async function authorizeAuxiliaryHttpRequest(params: { return { tenantId: authResult.tenantId }; } -function sendRestJsonError( - res: http.ServerResponse, - normalized: ReturnType, -): void { - res.statusCode = statusCodeForNormalizedError(normalized.code); - res.setHeader('content-type', 'application/json'); - res.end(JSON.stringify({ ok: false, error: normalized.message, code: normalized.code })); -} - function enforceDaemonToken( requestToken: string, expectedToken: string | undefined, diff --git a/src/daemon/resumable-upload.ts b/src/daemon/resumable-upload.ts new file mode 100644 index 000000000..9fca100af --- /dev/null +++ b/src/daemon/resumable-upload.ts @@ -0,0 +1,275 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import path from 'node:path'; +import type { IncomingMessage } from 'node:http'; +import { pipeline } from 'node:stream/promises'; +import { AppError } from '../utils/errors.ts'; +import { extractTarInstallableArtifact } from './artifact-archive.ts'; +import { + createArtifactTempDir, + sanitizeArtifactFilename, + validateArtifactContentLength, +} from './artifact-download.ts'; + +const RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS = 5 * 60 * 1000; +const RESUMABLE_UPLOAD_HASH_ALGORITHM = 'sha256'; +const RESUMABLE_UPLOADS_BY_ID = new Map(); +const RESUMABLE_UPLOADS_BY_KEY = new Map(); + +type UploadArtifactType = 'file' | 'app-bundle'; + +export type BeginResumableUploadOptions = { + baseUrl: string; + tokenHeaders: Record; + sha256: string; + fileName: string; + sizeBytes: number; + artifactType: UploadArtifactType; + platform?: string; + contentType?: string; + tenantId?: string; +}; + +type ResumableUploadEntry = { + id: string; + key: string; + tempDir: string; + payloadPath: string; + fileName: string; + sizeBytes: number; + sha256: string; + artifactType: UploadArtifactType; + platform?: string; + tenantId?: string; + timer: ReturnType; +}; + +export function beginResumableUpload(options: BeginResumableUploadOptions): { + uploadId: string; + cacheHit: false; + upload: { + url: string; + headers: Record; + }; +} { + validateResumableUploadOptions(options); + const key = buildResumableUploadKey(options); + const existingId = RESUMABLE_UPLOADS_BY_KEY.get(key); + const existing = existingId ? RESUMABLE_UPLOADS_BY_ID.get(existingId) : undefined; + const entry = existing ?? createResumableUploadEntry(options, key); + refreshResumableUploadTimer(entry); + + return { + uploadId: entry.id, + cacheHit: false, + upload: { + url: new URL(`upload/direct/${entry.id}`, ensureTrailingSlash(options.baseUrl)).toString(), + headers: { + ...options.tokenHeaders, + 'content-type': options.contentType || 'application/octet-stream', + }, + }, + }; +} + +export async function receiveResumableUploadChunk(params: { + uploadId: string; + req: IncomingMessage; + tenantId?: string; +}): Promise<{ complete: boolean; offset: number }> { + const entry = requireResumableUpload(params.uploadId, params.tenantId); + const currentOffset = currentResumableUploadOffset(entry); + const contentRange = parseContentRange(params.req.headers['content-range'], entry.sizeBytes); + if (contentRange && contentRange.start !== currentOffset) { + return { complete: false, offset: currentOffset }; + } + if (!contentRange && currentOffset > 0) { + return { complete: false, offset: currentOffset }; + } + + validateArtifactContentLength(params.req.headers['content-length']); + await pipeline(params.req, fs.createWriteStream(entry.payloadPath, { flags: 'a' })); + refreshResumableUploadTimer(entry); + + const offset = currentResumableUploadOffset(entry); + return { complete: offset >= entry.sizeBytes, offset }; +} + +export async function finalizeResumableUpload( + uploadId: string, + tenantId?: string, +): Promise<{ artifactPath: string; tempDir: string }> { + const entry = requireResumableUpload(uploadId, tenantId); + const offset = currentResumableUploadOffset(entry); + if (offset !== entry.sizeBytes) { + throw new AppError('INVALID_ARGS', 'Upload is incomplete', { + uploadId, + offset, + sizeBytes: entry.sizeBytes, + }); + } + const actualHash = await computeFileHash(entry.payloadPath); + if (actualHash !== entry.sha256) { + cleanupResumableUpload(entry.id); + throw new AppError('INVALID_ARGS', 'Upload hash mismatch', { + uploadId, + expectedSha256: entry.sha256, + actualSha256: actualHash, + }); + } + + RESUMABLE_UPLOADS_BY_ID.delete(entry.id); + RESUMABLE_UPLOADS_BY_KEY.delete(entry.key); + clearTimeout(entry.timer); + + if (entry.artifactType === 'file') { + const artifactPath = path.join(entry.tempDir, entry.fileName); + fs.renameSync(entry.payloadPath, artifactPath); + return { artifactPath, tempDir: entry.tempDir }; + } + + const artifactPath = await extractTarInstallableArtifact({ + archivePath: entry.payloadPath, + tempDir: entry.tempDir, + platform: entry.platform === 'android' ? 'android' : 'ios', + expectedRootName: entry.fileName, + }); + fs.rmSync(entry.payloadPath, { force: true }); + return { artifactPath, tempDir: entry.tempDir }; +} + +function validateResumableUploadOptions(options: BeginResumableUploadOptions): void { + if (!/^[a-f0-9]{64}$/i.test(options.sha256)) { + throw new AppError('INVALID_ARGS', 'Invalid upload sha256'); + } + if (!Number.isSafeInteger(options.sizeBytes) || options.sizeBytes < 0) { + throw new AppError('INVALID_ARGS', 'Invalid upload sizeBytes'); + } + validateArtifactContentLength(String(options.sizeBytes)); + sanitizeArtifactFilename(options.fileName); +} + +function createResumableUploadEntry( + options: BeginResumableUploadOptions, + key: string, +): ResumableUploadEntry { + const id = crypto.randomUUID(); + const tempDir = createArtifactTempDir('upload'); + const entry: ResumableUploadEntry = { + id, + key, + tempDir, + payloadPath: path.join(tempDir, 'payload'), + fileName: sanitizeArtifactFilename(options.fileName), + sizeBytes: options.sizeBytes, + sha256: options.sha256.toLowerCase(), + artifactType: options.artifactType, + platform: options.platform, + tenantId: options.tenantId, + timer: setTimeout(() => cleanupResumableUpload(id), RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS), + }; + entry.timer.unref(); + RESUMABLE_UPLOADS_BY_ID.set(id, entry); + RESUMABLE_UPLOADS_BY_KEY.set(key, id); + return entry; +} + +function refreshResumableUploadTimer(entry: ResumableUploadEntry): void { + clearTimeout(entry.timer); + entry.timer = setTimeout( + () => cleanupResumableUpload(entry.id), + RESUMABLE_UPLOAD_CLEANUP_TIMEOUT_MS, + ); + entry.timer.unref(); +} + +function cleanupResumableUpload(uploadId: string): void { + const entry = RESUMABLE_UPLOADS_BY_ID.get(uploadId); + if (!entry) return; + clearTimeout(entry.timer); + RESUMABLE_UPLOADS_BY_ID.delete(uploadId); + RESUMABLE_UPLOADS_BY_KEY.delete(entry.key); + fs.rmSync(entry.tempDir, { recursive: true, force: true }); +} + +function requireResumableUpload( + uploadId: string, + tenantId: string | undefined, +): ResumableUploadEntry { + const entry = RESUMABLE_UPLOADS_BY_ID.get(uploadId); + if (!entry) { + throw new AppError('INVALID_ARGS', `Upload not found: ${uploadId}`); + } + if (entry.tenantId && entry.tenantId !== tenantId) { + throw new AppError('UNAUTHORIZED', 'Upload belongs to a different tenant'); + } + return entry; +} + +function currentResumableUploadOffset(entry: ResumableUploadEntry): number { + if (!fs.existsSync(entry.payloadPath)) return 0; + return Math.min(fs.statSync(entry.payloadPath).size, entry.sizeBytes); +} + +function parseContentRange( + value: string | string[] | undefined, + sizeBytes: number, +): { start: number; end: number } | undefined { + const raw = Array.isArray(value) ? value[0] : value; + if (!raw) return undefined; + const range = readContentRange(raw); + if (!range || !isValidContentRange(range, sizeBytes)) { + throw new AppError('INVALID_ARGS', 'Invalid content-range header'); + } + return { start: range.start, end: range.end }; +} + +function readContentRange(raw: string): { start: number; end: number; size: number } | null { + const match = raw.match(/^bytes (\d+)-(\d+)\/(\d+)$/); + if (!match) return null; + return { + start: Number(match[1]), + end: Number(match[2]), + size: Number(match[3]), + }; +} + +function isValidContentRange( + range: { start: number; end: number; size: number }, + sizeBytes: number, +): boolean { + return ( + Number.isSafeInteger(range.start) && + Number.isSafeInteger(range.end) && + Number.isSafeInteger(range.size) && + range.start >= 0 && + range.end >= range.start && + range.size === sizeBytes + ); +} + +function buildResumableUploadKey(options: BeginResumableUploadOptions): string { + return [ + options.tenantId ?? '', + options.sha256.toLowerCase(), + String(options.sizeBytes), + sanitizeArtifactFilename(options.fileName), + options.artifactType, + options.platform ?? '', + ].join('\0'); +} + +function ensureTrailingSlash(value: string): string { + return value.endsWith('/') ? value : `${value}/`; +} + +async function computeFileHash(filePath: string): Promise { + const hash = crypto.createHash(RESUMABLE_UPLOAD_HASH_ALGORITHM); + await pipeline(fs.createReadStream(filePath), async function* (source) { + for await (const chunk of source) { + hash.update(chunk); + yield chunk; + } + }); + return hash.digest('hex'); +} diff --git a/src/daemon/upload-http.ts b/src/daemon/upload-http.ts new file mode 100644 index 000000000..29e724ac6 --- /dev/null +++ b/src/daemon/upload-http.ts @@ -0,0 +1,297 @@ +import http from 'node:http'; +import { AppError, normalizeError } from '../utils/errors.ts'; +import type { DaemonRequest } from './types.ts'; +import { trackUploadedArtifact } from './artifact-tracking.ts'; +import { + type BeginResumableUploadOptions, + beginResumableUpload, + finalizeResumableUpload, + receiveResumableUploadChunk, +} from './resumable-upload.ts'; +import { receiveUpload } from './upload.ts'; +import { sendRestJsonError } from './http-errors.ts'; +import { readNodeHttpRequestBody } from '../utils/node-http.ts'; + +const DIRECT_UPLOAD_PATH_PREFIX = '/upload/direct/'; + +type UploadHttpRoute = + | { kind: 'upload' } + | { kind: 'preflight' } + | { kind: 'direct'; uploadId: string } + | { kind: 'finalize' }; + +type UploadPreflightBody = Pick< + BeginResumableUploadOptions, + 'artifactType' | 'contentType' | 'fileName' | 'platform' | 'sha256' | 'sizeBytes' +>; + +type UploadFinalizeBody = { + uploadId: string; +}; + +type AuxiliaryHttpAuthorizer = (params: { + req: http.IncomingMessage; + res: http.ServerResponse; + daemonRequest: Pick; +}) => Promise<{ tenantId?: string } | null>; + +export function tryHandleUploadHttpRoute(params: { + req: http.IncomingMessage; + res: http.ServerResponse; + authorize: AuxiliaryHttpAuthorizer; + token: string; +}): boolean { + const { req, res, authorize, token } = params; + const route = resolveUploadHttpRoute(req); + if (route === null) return false; + + void handleUploadHttpRoute(route, req, res, authorize, token); + return true; +} + +async function handleUploadHttpRoute( + route: UploadHttpRoute, + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, + token: string, +): Promise { + switch (route.kind) { + case 'preflight': + await handleUploadPreflight(req, res, authorize, token); + return; + case 'direct': + await handleResumableUpload(route.uploadId, req, res, authorize); + return; + case 'finalize': + await handleUploadFinalize(req, res, authorize); + return; + case 'upload': + await handleUpload(req, res, authorize); + return; + } +} + +function resolveUploadHttpRoute(req: http.IncomingMessage): UploadHttpRoute | null { + if (req.method === 'POST' && req.url === '/upload/preflight') return { kind: 'preflight' }; + if (req.method === 'PUT' && req.url?.startsWith(DIRECT_UPLOAD_PATH_PREFIX)) { + return { + kind: 'direct', + uploadId: req.url.slice(DIRECT_UPLOAD_PATH_PREFIX.length).replace(/\?.*$/, ''), + }; + } + if (req.method === 'POST' && req.url === '/upload/finalize') return { kind: 'finalize' }; + if (req.method === 'POST' && req.url === '/upload') return { kind: 'upload' }; + return null; +} + +async function handleUpload( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: [], + }, + }); + if (!auth) return; + + sendUploadedArtifactResponse(res, await receiveUpload(req), auth.tenantId); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleUploadPreflight( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, + token: string, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['preflight'], + }, + }); + if (!auth) return; + + const body = await readRestJsonBody(req, 64 * 1024); + const preflight = readUploadPreflightBody(body); + const upload = beginResumableUpload({ + baseUrl: resolveHttpRequestBaseUrl(req), + tokenHeaders: buildUploadTicketAuthHeaders(token), + ...preflight, + tenantId: auth.tenantId, + }); + + sendJson(res, { ok: true, ...upload }); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleResumableUpload( + uploadId: string, + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['direct', uploadId], + }, + }); + if (!auth) return; + + const result = await receiveResumableUploadChunk({ uploadId, req, tenantId: auth.tenantId }); + if (result.complete) { + res.statusCode = 200; + res.end('ok'); + return; + } + + res.statusCode = 308; + if (result.offset > 0) { + res.setHeader('range', `bytes=0-${result.offset - 1}`); + } + res.setHeader('x-upload-offset', String(result.offset)); + res.end(); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +async function handleUploadFinalize( + req: http.IncomingMessage, + res: http.ServerResponse, + authorize: AuxiliaryHttpAuthorizer, +): Promise { + try { + const auth = await authorize({ + req, + res, + daemonRequest: { + command: 'upload', + positionals: ['finalize'], + }, + }); + if (!auth) return; + + const body = await readRestJsonBody(req, 64 * 1024); + const finalize = readUploadFinalizeBody(body); + sendUploadedArtifactResponse( + res, + await finalizeResumableUpload(finalize.uploadId, auth.tenantId), + auth.tenantId, + ); + } catch (error) { + sendRestJsonError(res, normalizeError(error)); + } +} + +function sendJson(res: http.ServerResponse, body: Record): void { + res.statusCode = 200; + res.setHeader('content-type', 'application/json'); + res.end(JSON.stringify(body)); +} + +function sendUploadedArtifactResponse( + res: http.ServerResponse, + result: { artifactPath: string; tempDir: string }, + tenantId: string | undefined, +): void { + const uploadId = trackUploadedArtifact({ + artifactPath: result.artifactPath, + tempDir: result.tempDir, + tenantId, + }); + sendJson(res, { ok: true, uploadId }); +} + +async function readRestJsonBody( + req: http.IncomingMessage, + maxBodyBytes: number, +): Promise> { + const raw = ( + await readNodeHttpRequestBody(req, maxBodyBytes, 'Request body is too large.') + ).toString('utf8'); + try { + const parsed = JSON.parse(raw) as unknown; + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + throw new Error('expected object'); + } + return parsed as Record; + } catch (error) { + throw new AppError('INVALID_ARGS', 'Invalid JSON request body', {}, error); + } +} + +function readUploadPreflightBody(record: Record): UploadPreflightBody { + return { + sha256: readRequiredText(record, 'sha256'), + fileName: readRequiredText(record, 'fileName'), + sizeBytes: readRequiredInteger(record, 'sizeBytes'), + artifactType: readRequiredArtifactType(record), + platform: readOptionalText(record, 'platform'), + contentType: readOptionalText(record, 'contentType'), + }; +} + +function readUploadFinalizeBody(record: Record): UploadFinalizeBody { + return { + uploadId: readRequiredText(record, 'uploadId'), + }; +} + +function readRequiredText(record: Record, key: string): string { + const value = readOptionalText(record, key)?.trim(); + if (!value) throw new AppError('INVALID_ARGS', `${key} is required`); + return value; +} + +function readOptionalText(record: Record, key: string): string | undefined { + const value = record[key]; + return typeof value === 'string' ? value : undefined; +} + +function readRequiredInteger(record: Record, key: string): number { + const value = record[key]; + if (typeof value !== 'number' || !Number.isSafeInteger(value)) { + throw new AppError('INVALID_ARGS', `${key} must be an integer`); + } + return value; +} + +function readRequiredArtifactType(record: Record): 'file' | 'app-bundle' { + const value = readRequiredText(record, 'artifactType'); + if (value === 'file' || value === 'app-bundle') return value; + throw new AppError('INVALID_ARGS', 'artifactType must be "file" or "app-bundle"'); +} + +function buildUploadTicketAuthHeaders(token: string): Record { + return { + authorization: `Bearer ${token}`, + 'x-agent-device-token': token, + }; +} + +function resolveHttpRequestBaseUrl(req: http.IncomingMessage): string { + const host = typeof req.headers.host === 'string' ? req.headers.host : ''; + if (!host) throw new AppError('INVALID_ARGS', 'Missing host header'); + const forwardedProto = req.headers['x-forwarded-proto']; + const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto; + return `${proto || 'http'}://${host}`; +} diff --git a/src/upload-client.ts b/src/upload-client.ts index d5905b5d4..a144def6b 100644 --- a/src/upload-client.ts +++ b/src/upload-client.ts @@ -77,20 +77,18 @@ export async function uploadArtifact(options: UploadArtifactOptions): Promise; +}): Promise { + const uploadOnce = async ( + preflight: Extract, + ): Promise => { + await uploadDirectArtifact(options.artifact.payloadPath, preflight); + return await finalizeDirectUpload({ + normalizedBase: options.normalizedBase, + token: options.token, + uploadId: preflight.uploadId, + }); + }; + + try { + return await uploadOnce(options.preflight); + } catch (error) { + if (!shouldRetryDirectUpload(error)) return undefined; + const retryPreflight = await requestUploadPreflight({ + normalizedBase: options.normalizedBase, + token: options.token, + artifact: options.artifact, + }); + if (retryPreflight?.kind === 'cache-hit') { + return retryPreflight.uploadId; + } + if (retryPreflight?.kind === 'direct-upload') { + try { + return await uploadOnce(retryPreflight); + } catch { + return undefined; + } + } + return undefined; + } +} + +function shouldRetryDirectUpload(error: unknown): boolean { + if (!(error instanceof AppError)) return true; + return ( + error.message === 'Failed to upload artifact with direct upload ticket' || + error.message === 'Direct artifact upload timed out' + ); +} + async function prepareUploadArtifact( localPath: string, requestedPlatform: string | undefined, diff --git a/src/utils/cli-command-overrides.ts b/src/utils/cli-command-overrides.ts index 4db492e5c..2c669ec7a 100644 --- a/src/utils/cli-command-overrides.ts +++ b/src/utils/cli-command-overrides.ts @@ -83,7 +83,7 @@ const SCHEMA_ONLY_CLI_COMMAND_SCHEMAS = { Run this on the host that has access to simulators/devices, expose the printed local proxy URL through a tunnel, then point another machine at the tunnel URL with connect proxy. -The proxy starts or reuses a local HTTP daemon, accepts /health, /rpc, /upload, and /artifacts/*, and also accepts the same routes under /agent-device/*. Health is unauthenticated for reachability probes. Other routes require the generated bearer token printed at startup, or the explicit --daemon-auth-token value when provided. The proxy rewrites authorized client requests to the upstream daemon token instead of exposing the local daemon token. +The proxy starts or reuses a local HTTP daemon, accepts /health, /rpc, /upload and resumable /upload/* routes, and /artifacts/*, and also accepts the same routes under /agent-device/*. Health is unauthenticated for reachability probes. Other routes require the generated bearer token printed at startup, or the explicit --daemon-auth-token value when provided. The proxy rewrites authorized client requests to the upstream daemon token instead of exposing the local daemon token. Use the /agent-device base path when connecting through cloudflared, ngrok, or another shared origin. Treat the bearer token as a secret; anyone with it can control the proxied daemon. This direct proxy flow does not use agent-device auth. diff --git a/src/utils/node-http.ts b/src/utils/node-http.ts index df0dc6eff..d44500fd5 100644 --- a/src/utils/node-http.ts +++ b/src/utils/node-http.ts @@ -1,4 +1,5 @@ import type { IncomingMessage } from 'node:http'; +import { AppError } from './errors.ts'; export function readNodeHttpResponseBody(res: IncomingMessage): Promise { return new Promise((resolve, reject) => { @@ -11,3 +12,21 @@ export function readNodeHttpResponseBody(res: IncomingMessage): Promise res.on('error', reject); }); } + +export async function readNodeHttpRequestBody( + req: IncomingMessage, + maxBodyBytes: number, + tooLargeMessage: string, +): Promise { + const chunks: Buffer[] = []; + let bodyBytes = 0; + for await (const chunk of req) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + bodyBytes += buffer.length; + if (bodyBytes > maxBodyBytes) { + throw new AppError('INVALID_ARGS', tooLargeMessage); + } + chunks.push(buffer); + } + return Buffer.concat(chunks); +} diff --git a/test/integration/provider-scenarios/daemon-http-server.test.ts b/test/integration/provider-scenarios/daemon-http-server.test.ts index 881a52d66..b39411e1b 100644 --- a/test/integration/provider-scenarios/daemon-http-server.test.ts +++ b/test/integration/provider-scenarios/daemon-http-server.test.ts @@ -1,11 +1,16 @@ import assert from 'node:assert/strict'; +import crypto from 'node:crypto'; import fs from 'node:fs'; import http from 'node:http'; import os from 'node:os'; import path from 'node:path'; import { test } from 'vitest'; import { AppError } from '../../../src/utils/errors.ts'; -import { trackDownloadableArtifact } from '../../../src/daemon/artifact-tracking.ts'; +import { + cleanupUploadedArtifact, + prepareUploadedArtifact, + trackDownloadableArtifact, +} from '../../../src/daemon/artifact-tracking.ts'; import { DAEMON_RPC_PROTOCOL_VERSION } from '../../../src/daemon/http-health.ts'; import { createDaemonHttpServer } from '../../../src/daemon/http-server.ts'; import { emitRequestProgress } from '../../../src/daemon/request-progress.ts'; @@ -519,6 +524,91 @@ test('Provider-backed integration daemon HTTP server accepts uploads and streams } }); +test('Provider-backed integration daemon HTTP server resumes direct uploads before finalize', async (t) => { + if (await skipWhenLoopbackUnavailable(t, 'daemon HTTP integration coverage')) { + return; + } + + const content = Buffer.from('fake-apk-resumable-content'); + const server = await createDaemonHttpServer({ + token: 'provider-scenario-token', + handleRequest: async (): Promise => ({ ok: true, data: {} }), + }); + let trackedUploadId = ''; + + try { + const port = await listenOnLoopback(server); + const preflight = await fetch(`http://127.0.0.1:${port}/upload/preflight`, { + method: 'POST', + headers: { + authorization: 'Bearer provider-scenario-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ + sha256: crypto.createHash('sha256').update(content).digest('hex'), + fileName: 'demo.apk', + sizeBytes: content.length, + artifactType: 'file', + platform: 'android', + contentType: 'application/octet-stream', + }), + }); + assert.equal(preflight.status, 200); + const preflightBody = (await preflight.json()) as { + uploadId?: string; + upload?: { url?: string; headers?: Record }; + }; + assert.equal(typeof preflightBody.uploadId, 'string'); + assert.equal(typeof preflightBody.upload?.url, 'string'); + if (!preflightBody.upload?.url) throw new Error('missing upload url'); + const uploadUrl = preflightBody.upload.url; + const uploadHeaders = preflightBody.upload.headers ?? {}; + + const firstChunk = await fetch(uploadUrl, { + method: 'PUT', + headers: uploadHeaders, + body: content.subarray(0, 9), + }); + assert.equal(firstChunk.status, 308); + assert.equal(firstChunk.headers.get('x-upload-offset'), '9'); + + const restartFromZero = await fetch(uploadUrl, { + method: 'PUT', + headers: uploadHeaders, + body: content, + }); + assert.equal(restartFromZero.status, 308); + assert.equal(restartFromZero.headers.get('x-upload-offset'), '9'); + + const resumed = await fetch(uploadUrl, { + method: 'PUT', + headers: { + ...uploadHeaders, + 'content-range': `bytes 9-${content.length - 1}/${content.length}`, + }, + body: content.subarray(9), + }); + assert.equal(resumed.status, 200); + + const finalize = await fetch(`http://127.0.0.1:${port}/upload/finalize`, { + method: 'POST', + headers: { + authorization: 'Bearer provider-scenario-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ uploadId: preflightBody.uploadId }), + }); + assert.equal(finalize.status, 200); + const finalizeBody = (await finalize.json()) as { uploadId?: string }; + trackedUploadId = finalizeBody.uploadId ?? ''; + assert.equal(typeof trackedUploadId, 'string'); + assert.deepEqual(fs.readFileSync(prepareUploadedArtifact(trackedUploadId)), content); + } finally { + if (trackedUploadId) cleanupUploadedArtifact(trackedUploadId); + await closeLoopbackServer(server); + } +}); + test('Provider-backed integration daemon HTTP auth hook can scope tenants and reject requests', async (t) => { if (await skipWhenLoopbackUnavailable(t, 'daemon HTTP integration coverage')) { return; diff --git a/website/docs/docs/remote-proxy.md b/website/docs/docs/remote-proxy.md index 11dfa4aa4..cdc222ed0 100644 --- a/website/docs/docs/remote-proxy.md +++ b/website/docs/docs/remote-proxy.md @@ -52,7 +52,7 @@ Do not commit a config file that contains a live `daemonAuthToken`. ## What Is Exposed -The proxy allows only the daemon HTTP contract: `/health`, `/rpc`, `/upload`, and `/artifacts/*`, with the same routes also available under `/agent-device/*`. Health checks are unauthenticated; command, upload, and artifact routes require the bearer token. +The proxy allows only the daemon HTTP contract: `/health`, `/rpc`, `/upload` plus resumable `/upload/*` routes, and `/artifacts/*`, with the same routes also available under `/agent-device/*`. Health checks are unauthenticated; command, upload, and artifact routes require the bearer token. The proxy validates the client token and rewrites authorized upstream requests to the local daemon token. The local daemon still validates its own token, so the daemon token is not exposed to remote clients.