Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions src/__tests__/daemon-proxy.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { test } from 'vitest';
import assert from 'node:assert/strict';
import crypto from 'node:crypto';
import http from 'node:http';
import { createDaemonProxyServer } from '../daemon-proxy.ts';
import { DAEMON_RPC_PROTOCOL_VERSION } from '../daemon/http-health.ts';
Expand Down Expand Up @@ -238,3 +239,180 @@ test('daemon proxy streams uploads and artifact downloads with upstream daemon t
await closeLoopbackServer(upstream);
}
});

test('daemon proxy forwards resumable upload routes and rewrites direct upload tickets', async (t) => {
if (await skipWhenLoopbackUnavailable(t)) return;

const capture: ResumableUploadProxyCapture = {};
const upstream = createResumableUploadProxyUpstream(capture);
const proxy = createDaemonProxyServer({
upstreamBaseUrl: `http://127.0.0.1:${await listenOnLoopback(upstream)}`,
upstreamToken: 'daemon-secret',
clientToken: 'proxy-secret',
});

try {
const proxyPort = await listenOnLoopback(proxy);
const ticket = await requestRewrittenUploadTicket(proxyPort);
await assertDirectUploadUsesDaemonToken(ticket, capture);
await assertFinalizeUsesDaemonToken(proxyPort, capture);
} finally {
await closeLoopbackServer(proxy);
await closeLoopbackServer(upstream);
}
});

type RewrittenUploadTicket = {
url: string;
headers: Record<string, string>;
};

type ResumableUploadProxyCapture = {
directAuth?: string;
directTokenHeader?: string;
directContentRange?: string;
directBody?: string;
finalizeAuth?: string;
};

async function requestRewrittenUploadTicket(proxyPort: number): Promise<RewrittenUploadTicket> {
const preflight = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/preflight`, {
method: 'POST',
headers: {
authorization: 'Bearer proxy-secret',
'content-type': 'application/json',
},
body: JSON.stringify({
sha256: crypto.createHash('sha256').update('resumed').digest('hex'),
fileName: 'demo.apk',
sizeBytes: 7,
artifactType: 'file',
}),
});
assert.equal(preflight.status, 200);

const body = (await preflight.json()) as {
upload?: { url?: string; headers?: Record<string, string> };
};
const ticket = readUploadTicket(body);
assert.match(
ticket.url,
new RegExp(`^http://127\\.0\\.0\\.1:${proxyPort}/agent-device/upload/direct/upload-1$`),
);
assert.equal(ticket.headers.authorization, 'Bearer proxy-secret');
assert.equal(ticket.headers['x-agent-device-token'], 'proxy-secret');
return ticket;
}

function readUploadTicket(body: {
upload?: { url?: string; headers?: Record<string, string> };
}): RewrittenUploadTicket {
if (!body.upload?.url) throw new Error('missing upload url');
return {
url: body.upload.url,
headers: body.upload.headers ?? {},
};
}

async function assertDirectUploadUsesDaemonToken(
ticket: RewrittenUploadTicket,
capture: ResumableUploadProxyCapture,
): Promise<void> {
const direct = await fetch(ticket.url, {
method: 'PUT',
headers: {
...ticket.headers,
'content-range': 'bytes 3-6/7',
},
body: Buffer.from('umed'),
});
assert.equal(direct.status, 200);
assert.equal(capture.directAuth, 'Bearer daemon-secret');
assert.equal(capture.directTokenHeader, 'daemon-secret');
assert.equal(capture.directContentRange, 'bytes 3-6/7');
assert.equal(capture.directBody, 'umed');
}

async function assertFinalizeUsesDaemonToken(
proxyPort: number,
capture: ResumableUploadProxyCapture,
): Promise<void> {
const finalize = await fetch(`http://127.0.0.1:${proxyPort}/agent-device/upload/finalize`, {
method: 'POST',
headers: {
authorization: 'Bearer proxy-secret',
'content-type': 'application/json',
},
body: JSON.stringify({ uploadId: 'upload-1' }),
});
assert.equal(finalize.status, 200);
assert.deepEqual(await finalize.json(), { ok: true, uploadId: 'tracked-upload-1' });
assert.equal(capture.finalizeAuth, 'Bearer daemon-secret');
}

function createResumableUploadProxyUpstream(capture: ResumableUploadProxyCapture): http.Server {
return http.createServer((req, res) => {
const route = `${req.method ?? ''} ${req.url ?? ''}`;
if (route === 'GET /health') return sendResumableProxyHealth(res);
if (route === 'POST /upload/preflight') return sendResumableProxyPreflight(res);
if (route === 'PUT /upload/direct/upload-1') {
return receiveResumableProxyDirectUpload(req, res, capture);
}
if (route === 'POST /upload/finalize') return sendResumableProxyFinalize(req, res, capture);
res.statusCode = 404;
res.end('not found');
});
}

function sendResumableProxyHealth(res: http.ServerResponse): void {
res.setHeader('content-type', 'application/json');
res.end(JSON.stringify({ ok: true }));
}

function sendResumableProxyPreflight(res: http.ServerResponse): void {
res.setHeader('content-type', 'application/json');
res.end(
JSON.stringify({
ok: true,
cacheHit: false,
uploadId: 'upload-1',
upload: {
url: 'http://127.0.0.1:65535/upload/direct/upload-1',
headers: {
authorization: 'Bearer daemon-secret',
'x-agent-device-token': 'daemon-secret',
'content-type': 'application/octet-stream',
},
},
}),
);
}

function receiveResumableProxyDirectUpload(
req: http.IncomingMessage,
res: http.ServerResponse,
capture: ResumableUploadProxyCapture,
): void {
capture.directAuth = String(req.headers.authorization ?? '');
capture.directTokenHeader = String(req.headers['x-agent-device-token'] ?? '');
capture.directContentRange = String(req.headers['content-range'] ?? '');
capture.directBody = '';
req.setEncoding('utf8');
req.on('data', (chunk) => {
capture.directBody += chunk;
});
req.on('end', () => {
res.statusCode = 200;
res.end('ok');
});
}

function sendResumableProxyFinalize(
req: http.IncomingMessage,
res: http.ServerResponse,
capture: ResumableUploadProxyCapture,
): void {
capture.finalizeAuth = String(req.headers.authorization ?? '');
res.setHeader('content-type', 'application/json');
res.end(JSON.stringify({ ok: true, uploadId: 'tracked-upload-1' }));
}
80 changes: 80 additions & 0 deletions src/__tests__/upload-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,86 @@ test('uploadArtifact resumes a direct upload from the server-reported offset', a
}
});

test('uploadArtifact re-preflights and resumes after an interrupted direct upload', async () => {
const content = 'direct-upload-interrupted-payload';
const artifactPath = createTempFile('app.apk', content);
const resumeOffset = 8;
const requests: string[] = [];
let preflightAttempts = 0;
let uploadAttempts = 0;
let resumedUploadBody = '';

const server = await startServer(async (req, res) => {
requests.push(`${req.method} ${req.url}`);
if (req.method === 'POST' && req.url === '/upload/preflight') {
preflightAttempts += 1;
await readRequestBody(req);
sendJson(res, {
ok: true,
cacheHit: false,
uploadId: 'resume-after-error-ticket',
upload: {
url: `${server.baseUrl}/resumable-upload-after-error`,
headers: {
'x-signed-ticket': 'resume-after-error-ticket-header',
},
},
});
return;
}
if (req.method === 'PUT' && req.url === '/resumable-upload-after-error') {
uploadAttempts += 1;
if (uploadAttempts === 1) {
req.destroy(new Error('simulated low-connectivity interruption'));
return;
}
if (uploadAttempts === 2) {
assert.equal(req.headers['content-range'], undefined);
res.statusCode = 308;
res.setHeader('x-upload-offset', String(resumeOffset));
res.end();
return;
}

assert.equal(
req.headers['content-range'],
`bytes ${resumeOffset}-${Buffer.byteLength(content) - 1}/${Buffer.byteLength(content)}`,
);
resumedUploadBody = (await readRequestBody(req)).toString('utf8');
res.statusCode = 200;
res.end('ok');
return;
}
if (req.method === 'POST' && req.url === '/upload/finalize') {
sendJson(res, { ok: true, uploadId: 'upload-resumed-after-error' });
return;
}
res.statusCode = 404;
res.end('not found');
});

try {
const uploadId = await uploadArtifact({
localPath: artifactPath,
baseUrl: server.baseUrl,
token: TEST_TOKEN,
});
assert.equal(uploadId, 'upload-resumed-after-error');
assert.equal(preflightAttempts, 2);
assert.equal(resumedUploadBody, content.slice(resumeOffset));
assert.deepEqual(requests, [
'POST /upload/preflight',
'PUT /resumable-upload-after-error',
'POST /upload/preflight',
'PUT /resumable-upload-after-error',
'PUT /resumable-upload-after-error',
'POST /upload/finalize',
]);
} finally {
await server.close();
}
});

test('uploadArtifact preflights and legacy-uploads compressed app bundle directories', async () => {
const tempRoot = createTempDir();
const appPath = path.join(tempRoot, 'Sample.app');
Expand Down
77 changes: 77 additions & 0 deletions src/daemon-proxy-upload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { IncomingMessage } from 'node:http';
import { buildDaemonHttpAuthHeaders } from './daemon/http-contract.ts';

export function isSupportedProxyUploadRoute(route: string, method: string | undefined): boolean {
if (route === '/upload') return method === 'POST';
if (route === '/upload/preflight') return method === 'POST';
if (route === '/upload/finalize') return method === 'POST';
if (route.startsWith('/upload/direct/')) return method === 'PUT';
return false;
}

export function shouldRewriteUploadProxyResponse(route: string): boolean {
return route === '/upload/preflight';
}

export function rewriteUploadPreflightResponse(
body: string,
req: IncomingMessage,
clientToken: string,
): string {
let parsed: unknown;
try {
parsed = JSON.parse(body) as unknown;
} catch {
return body;
}

if (!parsed || typeof parsed !== 'object') return body;
const record = parsed as { upload?: { url?: unknown; headers?: unknown } };
if (!record.upload || typeof record.upload.url !== 'string') {
return body;
}

const rewrittenUrl = rewriteUploadDirectUrl(record.upload.url, req);
if (!rewrittenUrl) return body;

const headers =
record.upload.headers && typeof record.upload.headers === 'object'
? { ...(record.upload.headers as Record<string, unknown>) }
: {};
Object.assign(headers, buildDaemonHttpAuthHeaders(clientToken));

return JSON.stringify({
...(parsed as Record<string, unknown>),
upload: {
...record.upload,
url: rewrittenUrl,
headers,
},
});
}

function rewriteUploadDirectUrl(upstreamUrl: string, req: IncomingMessage): string | null {
let parsed: URL;
try {
parsed = new URL(upstreamUrl);
} catch {
return null;
}

if (!parsed.pathname.startsWith('/upload/')) {
return null;
}

const host = typeof req.headers.host === 'string' ? req.headers.host : '';
if (!host) return null;

const requestPath = new URL(req.url ?? '/', 'http://127.0.0.1').pathname;
const uploadIndex = requestPath.lastIndexOf('/upload/preflight');
const uploadPrefix = uploadIndex >= 0 ? requestPath.slice(0, uploadIndex) : '';
const forwardedProto = req.headers['x-forwarded-proto'];
const proto = Array.isArray(forwardedProto) ? forwardedProto[0] : forwardedProto;
const rewritten = new URL(`${proto || 'http'}://${host}`);
rewritten.pathname = `${uploadPrefix}${parsed.pathname}`;
rewritten.search = parsed.search;
return rewritten.toString();
}
Loading
Loading