Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { CancellationToken } from 'vscode-languageserver-protocol';
import { toErrorMessage } from '../../../../util/common/errorMessage';
import { Result } from '../../../../util/common/result';
import { CallTracker } from '../../../../util/common/telemetryCorrelationId';
import { raceCancellationError } from '../../../../util/vs/base/common/async';
import { raceCancellationError, timeout } from '../../../../util/vs/base/common/async';
import { encodeBase64, VSBuffer } from '../../../../util/vs/base/common/buffer';
import { CancellationTokenSource } from '../../../../util/vs/base/common/cancellation';
import { CancellationError, isCancellationError } from '../../../../util/vs/base/common/errors';
Expand Down Expand Up @@ -331,6 +331,73 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
}
}

// Phase 3 + 4: Upload documents, then finalize.
const maxFinalizeAttempts = 3;
// Unique docShas the server accepted across all finalize passes. Using a set (rather
// than summing per-pass counts) prevents over-reporting `updatedFileCount` when the
// same document is re-uploaded after a finalize 412.
const uploadedDocShas = new Set<string>();
for (let finalizeAttempt = 0; ; finalizeAttempt++) {
const uploadedThisPass = await this.uploadDocuments(authToken, ingestId, mappings, uploadedDocShas, callTracker, token, onProgress);

// Phase 4: Finalize
onProgress?.(l10n.t('Finalizing index...'));
try {
const resp = await this.makeRequest(authToken, 'POST', '/external/code/ingest/finalize', {
ingest_id: ingestId,
}, {}, callTracker, token);

this.logService.info('ExternalIngestClient::performIngestion(): Successfully finalized ingest.');
const requestId = resp.headers.get('x-github-request-id');
const body = await resp.text();
this.logService.debug(`requestId: '${requestId}', body: ${body}`);
break;
} catch (err) {
const isPhaseMismatch = err instanceof ExternalIngestRequestError && err.response.status === 412;

// 412 means the server is not yet in the finalize phase. Re-poll `/batch`,
// re-upload anything still missing, back off and retry.
if (isPhaseMismatch && finalizeAttempt < maxFinalizeAttempts - 1) {
this.logService.warn(`ExternalIngestClient::performIngestion(): Finalize returned 412 (phase mismatch), re-uploading missing documents and retrying (attempt ${finalizeAttempt + 1}/${maxFinalizeAttempts})`);
onProgress?.(l10n.t('Reconciling remaining documents...'));
await timeout(1000, token);
continue;
}

// Retries are exhausted. If the final pass uploaded nothing - i.e. the
// server reported no missing documents yet still refuses to finalize -
// surface a distinct, more diagnostic error: this is most likely a
// server-side phase issue that no amount of client retrying can resolve.
if (isPhaseMismatch && uploadedThisPass === 0) {
const requestId = err.response.headers.get(githubHeaders.requestId);
throw new Error(`External ingest finalize still reported a phase mismatch (412) after ${maxFinalizeAttempts} attempts even though the server reported no missing documents${requestId ? ` (requestId: ${requestId})` : ''}. This likely indicates a server-side issue.`);
}

throw err;
}
}

return Result.ok({ checkpoint: newCheckpoint, totalFileCount: mappings.size, updatedFileCount: uploadedDocShas.size });
}

/**
* Uploads every document the server still needs for the given ingest.
*
* Polls `/external/code/ingest/batch` for the set of documents the server is
* missing and uploads them in parallel. A fresh "seen" set is used on every
* call so that re-running this after a finalize phase mismatch re-uploads any
* documents the server is still expecting. Returns the number of documents
* uploaded during this pass.
*/
private async uploadDocuments(
authToken: string,
ingestId: string,
mappings: Map<string, ExternalIngestFile>,
uploadedDocShas: Set<string>,
callTracker: CallTracker,
token: CancellationToken,
onProgress?: (message: string) => void,
): Promise<number> {
// Phase 3: Document upload
onProgress?.(l10n.t('Uploading documents...'));
this.logService.debug('ExternalIngestClient::performIngestion(): Starting document upload...');
Expand All @@ -339,6 +406,15 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
const seenDocShas = new Set<string>();
const uploading = new Set<Promise<void>>();
let uploaded = 0;
// Documents whose upload request ultimately failed (after the HTTP layer's own
// 5xx/rate-limit retries). These are NOT counted as uploaded and cause the pass
// to fail with a descriptive error rather than silently leaving the server in the
// upload phase (which would otherwise surface later as a confusing finalize 412).
const failedUploads: Array<{ readonly relativePath: string; readonly status?: number; readonly requestId?: string | null }> = [];
// A fatal upload error (e.g. a 404 meaning the ingest itself is gone) aborts the
// whole pass. We record it rather than rejecting the pooled upload promise, because
// an intermediate `Promise.all(uploading)` could otherwise swallow the rejection.
let fatalUploadError: ExternalIngestRequestError | undefined;
const uploadStart = performance.now();

const uploadCts = new CancellationTokenSource(token);
Expand All @@ -348,6 +424,12 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
throw new CancellationError();
}

// A fatal upload error (e.g. the ingest is gone) was seen on a previous
// page - stop fetching more work and drain what is in flight.
if (fatalUploadError) {
break;
}

try {
await raceCancellationError(Promise.all(uploading), token);
} catch (e) {
Expand Down Expand Up @@ -382,11 +464,17 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
throw new CancellationError();
}

if (fatalUploadError) {
break;
}

seenDocShas.add(requestedDocSha);
const p = (async () => {
const fileEntry = mappings.get(requestedDocSha);
if (!fileEntry) {
throw new Error(`No mapping for docSha: ${requestedDocSha}`);
this.logService.error(`ExternalIngestClient::performIngestion(): Server requested a docSha with no local mapping: ${requestedDocSha}`);
failedUploads.push({ relativePath: `<unmapped docSha ${requestedDocSha}>` });
return;
}

try {
Expand All @@ -409,6 +497,10 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
file_path: typeof content === 'string' ? fileEntry.relativePath : '',
doc_id: requestedDocSha,
}, { retriesOn500: 3, retriesOnRateLimiting: 10 }, callTracker, uploadCts.token);

// Only successful uploads are counted.
uploaded += 1;
uploadedDocShas.add(requestedDocSha);
} catch (e) {
if (isCancellationError(e) || isConflictError(e)) {
throw e;
Expand All @@ -420,20 +512,28 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC

this.logService.error(`ExternalIngestClient::performIngestion(): Document upload for ${fileEntry.relativePath} failed with status: '${e.response.status}', requestId: '${requestId}'${responseBody ? `, body: ${responseBody}` : ''}`);

// If the document is not found
// A 404 means the ingest itself is gone server-side; no further
// uploads or a finalize can succeed. Record it as fatal so the pass
// aborts deterministically after draining (rather than relying on a
// rejection that an intermediate `Promise.all` could swallow).
if (e.response.status === 404) {
throw new ExternalIngestRequestError(`Ingest not found (404) for document: ${fileEntry?.relativePath}`, e.response);
fatalUploadError ??= new ExternalIngestRequestError(`Ingest not found (404) for document: ${fileEntry.relativePath}`, e.response);
} else {
// Record the failure so the pass surfaces a descriptive error after
// draining, instead of swallowing it and stranding the server.
failedUploads.push({ relativePath: fileEntry.relativePath, status: e.response.status, requestId });
}
} else {
this.logService.error('ExternalIngestClient::performIngestion(): Error uploading document:', e);
failedUploads.push({ relativePath: fileEntry.relativePath });
}
}
})();
p.finally(() => {
uploading.delete(p);
uploaded += 1;
if (uploaded % 10 === 0) {
const remaining = mappings.size - uploaded;
const processed = uploaded + failedUploads.length;
if (processed % 10 === 0) {
const remaining = mappings.size - processed;
onProgress?.(l10n.t('Uploading documents... ({0} remaining)', remaining));
const elapsed = Math.round(performance.now() - uploadStart);
const docsPerSecond = Math.round(uploaded / (elapsed / 1000));
Expand Down Expand Up @@ -464,18 +564,22 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC
`ExternalIngestClient::performIngestion(): Uploaded ${uploaded} ingestable files in ${Math.round(performance.now() - uploadStart)}ms`,
);

// Phase 4: Finalize
onProgress?.(l10n.t('Finalizing index...'));
const resp = await this.makeRequest(authToken, 'POST', '/external/code/ingest/finalize', {
ingest_id: ingestId,
}, {}, callTracker, token);
// A fatal error (e.g. the ingest is gone) takes precedence and aborts the run.
if (fatalUploadError) {
throw fatalUploadError;
}

this.logService.info('ExternalIngestClient::performIngestion(): Successfully finalized ingest.');
const requestId = resp.headers.get('x-github-request-id');
const body = await resp.text();
this.logService.debug(`requestId: '${requestId}', body: ${body}`);
// Surface a descriptive error if any document failed to upload. Doing this here -
// rather than swallowing the failure - means the run fails with the real cause
// (e.g. a 5xx on a specific document) instead of later masquerading as a finalize
// phase mismatch, and prevents the server from being stranded mid-upload.
if (failedUploads.length > 0) {
const first = failedUploads[0];
const detail = `${first.relativePath}${first.status !== undefined ? ` (status ${first.status})` : ''}${first.requestId ? `, requestId: ${first.requestId}` : ''}`;
throw new Error(`Failed to upload ${failedUploads.length} document(s) to the external ingest service. First failure: ${detail}.`);
Comment thread
bhavyaus marked this conversation as resolved.
}

return Result.ok({ checkpoint: newCheckpoint, totalFileCount: mappings.size, updatedFileCount: uploaded });
return uploaded;
}

async listFilesets(callTracker: CallTracker, token: CancellationToken): Promise<string[]> {
Expand Down
Loading
Loading