diff --git a/extensions/copilot/src/platform/workspaceChunkSearch/node/codeSearch/externalIngestClient.ts b/extensions/copilot/src/platform/workspaceChunkSearch/node/codeSearch/externalIngestClient.ts index 638bb4d7c658d3..0da4e4d2b71628 100644 --- a/extensions/copilot/src/platform/workspaceChunkSearch/node/codeSearch/externalIngestClient.ts +++ b/extensions/copilot/src/platform/workspaceChunkSearch/node/codeSearch/externalIngestClient.ts @@ -10,7 +10,7 @@ import { CancellationToken } from 'vscode-languageserver-protocol'; import { toErrorMessage } from '../../../../util/common/errorMessage'; import { Result } from '../../../../util/common/result'; import { CallTracker } from '../../../../util/common/telemetryCorrelationId'; -import { raceCancellationError } from '../../../../util/vs/base/common/async'; +import { raceCancellationError, timeout } from '../../../../util/vs/base/common/async'; import { encodeBase64, VSBuffer } from '../../../../util/vs/base/common/buffer'; import { CancellationTokenSource } from '../../../../util/vs/base/common/cancellation'; import { CancellationError, isCancellationError } from '../../../../util/vs/base/common/errors'; @@ -331,6 +331,73 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC } } + // Phase 3 + 4: Upload documents, then finalize. + const maxFinalizeAttempts = 3; + // Unique docShas the server accepted across all finalize passes. Using a set (rather + // than summing per-pass counts) prevents over-reporting `updatedFileCount` when the + // same document is re-uploaded after a finalize 412. + const uploadedDocShas = new Set(); + for (let finalizeAttempt = 0; ; finalizeAttempt++) { + const uploadedThisPass = await this.uploadDocuments(authToken, ingestId, mappings, uploadedDocShas, callTracker, token, onProgress); + + // Phase 4: Finalize + onProgress?.(l10n.t('Finalizing index...')); + try { + const resp = await this.makeRequest(authToken, 'POST', '/external/code/ingest/finalize', { + ingest_id: ingestId, + }, {}, callTracker, token); + + this.logService.info('ExternalIngestClient::performIngestion(): Successfully finalized ingest.'); + const requestId = resp.headers.get('x-github-request-id'); + const body = await resp.text(); + this.logService.debug(`requestId: '${requestId}', body: ${body}`); + break; + } catch (err) { + const isPhaseMismatch = err instanceof ExternalIngestRequestError && err.response.status === 412; + + // 412 means the server is not yet in the finalize phase. Re-poll `/batch`, + // re-upload anything still missing, back off and retry. + if (isPhaseMismatch && finalizeAttempt < maxFinalizeAttempts - 1) { + this.logService.warn(`ExternalIngestClient::performIngestion(): Finalize returned 412 (phase mismatch), re-uploading missing documents and retrying (attempt ${finalizeAttempt + 1}/${maxFinalizeAttempts})`); + onProgress?.(l10n.t('Reconciling remaining documents...')); + await timeout(1000, token); + continue; + } + + // Retries are exhausted. If the final pass uploaded nothing - i.e. the + // server reported no missing documents yet still refuses to finalize - + // surface a distinct, more diagnostic error: this is most likely a + // server-side phase issue that no amount of client retrying can resolve. + if (isPhaseMismatch && uploadedThisPass === 0) { + const requestId = err.response.headers.get(githubHeaders.requestId); + throw new Error(`External ingest finalize still reported a phase mismatch (412) after ${maxFinalizeAttempts} attempts even though the server reported no missing documents${requestId ? ` (requestId: ${requestId})` : ''}. This likely indicates a server-side issue.`); + } + + throw err; + } + } + + return Result.ok({ checkpoint: newCheckpoint, totalFileCount: mappings.size, updatedFileCount: uploadedDocShas.size }); + } + + /** + * Uploads every document the server still needs for the given ingest. + * + * Polls `/external/code/ingest/batch` for the set of documents the server is + * missing and uploads them in parallel. A fresh "seen" set is used on every + * call so that re-running this after a finalize phase mismatch re-uploads any + * documents the server is still expecting. Returns the number of documents + * uploaded during this pass. + */ + private async uploadDocuments( + authToken: string, + ingestId: string, + mappings: Map, + uploadedDocShas: Set, + callTracker: CallTracker, + token: CancellationToken, + onProgress?: (message: string) => void, + ): Promise { // Phase 3: Document upload onProgress?.(l10n.t('Uploading documents...')); this.logService.debug('ExternalIngestClient::performIngestion(): Starting document upload...'); @@ -339,6 +406,15 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC const seenDocShas = new Set(); const uploading = new Set>(); let uploaded = 0; + // Documents whose upload request ultimately failed (after the HTTP layer's own + // 5xx/rate-limit retries). These are NOT counted as uploaded and cause the pass + // to fail with a descriptive error rather than silently leaving the server in the + // upload phase (which would otherwise surface later as a confusing finalize 412). + const failedUploads: Array<{ readonly relativePath: string; readonly status?: number; readonly requestId?: string | null }> = []; + // A fatal upload error (e.g. a 404 meaning the ingest itself is gone) aborts the + // whole pass. We record it rather than rejecting the pooled upload promise, because + // an intermediate `Promise.all(uploading)` could otherwise swallow the rejection. + let fatalUploadError: ExternalIngestRequestError | undefined; const uploadStart = performance.now(); const uploadCts = new CancellationTokenSource(token); @@ -348,6 +424,12 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC throw new CancellationError(); } + // A fatal upload error (e.g. the ingest is gone) was seen on a previous + // page - stop fetching more work and drain what is in flight. + if (fatalUploadError) { + break; + } + try { await raceCancellationError(Promise.all(uploading), token); } catch (e) { @@ -382,11 +464,17 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC throw new CancellationError(); } + if (fatalUploadError) { + break; + } + seenDocShas.add(requestedDocSha); const p = (async () => { const fileEntry = mappings.get(requestedDocSha); if (!fileEntry) { - throw new Error(`No mapping for docSha: ${requestedDocSha}`); + this.logService.error(`ExternalIngestClient::performIngestion(): Server requested a docSha with no local mapping: ${requestedDocSha}`); + failedUploads.push({ relativePath: `` }); + return; } try { @@ -409,6 +497,10 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC file_path: typeof content === 'string' ? fileEntry.relativePath : '', doc_id: requestedDocSha, }, { retriesOn500: 3, retriesOnRateLimiting: 10 }, callTracker, uploadCts.token); + + // Only successful uploads are counted. + uploaded += 1; + uploadedDocShas.add(requestedDocSha); } catch (e) { if (isCancellationError(e) || isConflictError(e)) { throw e; @@ -420,20 +512,28 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC this.logService.error(`ExternalIngestClient::performIngestion(): Document upload for ${fileEntry.relativePath} failed with status: '${e.response.status}', requestId: '${requestId}'${responseBody ? `, body: ${responseBody}` : ''}`); - // If the document is not found + // A 404 means the ingest itself is gone server-side; no further + // uploads or a finalize can succeed. Record it as fatal so the pass + // aborts deterministically after draining (rather than relying on a + // rejection that an intermediate `Promise.all` could swallow). if (e.response.status === 404) { - throw new ExternalIngestRequestError(`Ingest not found (404) for document: ${fileEntry?.relativePath}`, e.response); + fatalUploadError ??= new ExternalIngestRequestError(`Ingest not found (404) for document: ${fileEntry.relativePath}`, e.response); + } else { + // Record the failure so the pass surfaces a descriptive error after + // draining, instead of swallowing it and stranding the server. + failedUploads.push({ relativePath: fileEntry.relativePath, status: e.response.status, requestId }); } } else { this.logService.error('ExternalIngestClient::performIngestion(): Error uploading document:', e); + failedUploads.push({ relativePath: fileEntry.relativePath }); } } })(); p.finally(() => { uploading.delete(p); - uploaded += 1; - if (uploaded % 10 === 0) { - const remaining = mappings.size - uploaded; + const processed = uploaded + failedUploads.length; + if (processed % 10 === 0) { + const remaining = mappings.size - processed; onProgress?.(l10n.t('Uploading documents... ({0} remaining)', remaining)); const elapsed = Math.round(performance.now() - uploadStart); const docsPerSecond = Math.round(uploaded / (elapsed / 1000)); @@ -464,18 +564,22 @@ export class ExternalIngestClient extends Disposable implements IExternalIngestC `ExternalIngestClient::performIngestion(): Uploaded ${uploaded} ingestable files in ${Math.round(performance.now() - uploadStart)}ms`, ); - // Phase 4: Finalize - onProgress?.(l10n.t('Finalizing index...')); - const resp = await this.makeRequest(authToken, 'POST', '/external/code/ingest/finalize', { - ingest_id: ingestId, - }, {}, callTracker, token); + // A fatal error (e.g. the ingest is gone) takes precedence and aborts the run. + if (fatalUploadError) { + throw fatalUploadError; + } - this.logService.info('ExternalIngestClient::performIngestion(): Successfully finalized ingest.'); - const requestId = resp.headers.get('x-github-request-id'); - const body = await resp.text(); - this.logService.debug(`requestId: '${requestId}', body: ${body}`); + // Surface a descriptive error if any document failed to upload. Doing this here - + // rather than swallowing the failure - means the run fails with the real cause + // (e.g. a 5xx on a specific document) instead of later masquerading as a finalize + // phase mismatch, and prevents the server from being stranded mid-upload. + if (failedUploads.length > 0) { + const first = failedUploads[0]; + const detail = `${first.relativePath}${first.status !== undefined ? ` (status ${first.status})` : ''}${first.requestId ? `, requestId: ${first.requestId}` : ''}`; + throw new Error(`Failed to upload ${failedUploads.length} document(s) to the external ingest service. First failure: ${detail}.`); + } - return Result.ok({ checkpoint: newCheckpoint, totalFileCount: mappings.size, updatedFileCount: uploaded }); + return uploaded; } async listFilesets(callTracker: CallTracker, token: CancellationToken): Promise { diff --git a/extensions/copilot/src/platform/workspaceChunkSearch/test/node/externalIngest.spec.ts b/extensions/copilot/src/platform/workspaceChunkSearch/test/node/externalIngest.spec.ts index 0ea503e7623058..c403c7596b5156 100644 --- a/extensions/copilot/src/platform/workspaceChunkSearch/test/node/externalIngest.spec.ts +++ b/extensions/copilot/src/platform/workspaceChunkSearch/test/node/externalIngest.spec.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +import ingestUtils = require('@github/blackbird-external-ingest-utils'); import assert from 'assert'; import { afterEach, beforeEach, suite, test, vi } from 'vitest'; import type { FileSystemWatcher } from 'vscode'; @@ -13,9 +14,13 @@ import { CancellationToken } from '../../../../util/vs/base/common/cancellation' import { DisposableStore } from '../../../../util/vs/base/common/lifecycle'; import { ResourceMap } from '../../../../util/vs/base/common/map'; import { URI } from '../../../../util/vs/base/common/uri'; +import { SyncDescriptor } from '../../../../util/vs/platform/instantiation/common/descriptors'; import { IInstantiationService } from '../../../../util/vs/platform/instantiation/common/instantiation'; +import { IAuthenticationService } from '../../../authentication/common/authentication'; +import { StaticGitHubAuthenticationService } from '../../../authentication/common/staticGitHubAuthenticationService'; import { IFileSystemService } from '../../../filesystem/common/fileSystemService'; import { FileType } from '../../../filesystem/common/fileTypes'; +import { GithubRequestOptions, IGithubApiFetcherService } from '../../../github/common/githubApiFetcherService'; import { ISearchService } from '../../../search/common/searchService'; import { createPlatformServices, TestingServiceCollection } from '../../../test/node/services'; import { IWorkspaceService, NullWorkspaceService } from '../../../workspace/common/workspaceService'; @@ -522,3 +527,249 @@ suite('ExternalIngestIndex', () => { assert.strictEqual(mockFs.countReadFileCalls(file3), 1, 'Unchanged file3 should not be re-read'); }); }); + +/** + * Mock GitHub API fetcher that drives the external ingest HTTP protocol so we can + * exercise the real {@link ExternalIngestClient} ingestion flow end-to-end. +*/ +class RecordingGithubApiFetcherService extends mock() implements IGithubApiFetcherService { + readonly requests: Array<{ method: string; path: string }> = []; + private batchPolls = 0; + private finalizeAttempts = 0; + private documentUploads = 0; + private documentFailuresRemaining: number; + + constructor( + private readonly docShaIds: string[], + private readonly finalize412Count: number, + private readonly batchPollsWithDocs = 1, + private readonly documentFailure?: { readonly count: number; readonly status: number; readonly requestId: string }, + ) { + super(); + this.documentFailuresRemaining = documentFailure?.count ?? 0; + } + + get finalizeCallCount(): number { return this.finalizeAttempts; } + get batchPollCount(): number { return this.batchPolls; } + get documentUploadCount(): number { return this.documentUploads; } + + override async makeRequest(options: GithubRequestOptions): Promise { + const path = new URL(options.url).pathname; + this.requests.push({ method: options.method, path }); + + const json = (body: unknown, status = 200, headers: Record = {}) => + new Response(JSON.stringify(body), { status, headers: { 'content-type': 'application/json', ...headers } }); + + switch (path) { + case '/external/code/ingest': + return json({ ingest_id: 'ingest-1', coded_symbol_range: { start: 0, end: 1 } }); + case '/external/code/ingest/coded_symbols': + return json({ next_coded_symbol_range: undefined }); + case '/external/code/ingest/batch': { + const docIds = this.batchPolls < this.batchPollsWithDocs ? this.docShaIds : []; + this.batchPolls++; + return json({ doc_ids: docIds, next_page_token: undefined }); + } + case '/external/code/ingest/document': + this.documentUploads++; + if (this.documentFailure && this.documentFailuresRemaining > 0) { + this.documentFailuresRemaining--; + return json({ message: 'boom' }, this.documentFailure.status, { 'x-github-request-id': this.documentFailure.requestId }); + } + return json({}); + case '/external/code/ingest/finalize': { + this.finalizeAttempts++; + if (this.finalizeAttempts <= this.finalize412Count) { + return json({ message: 'request sent for phase which was not the current phase' }, 412); + } + return json({}); + } + default: + throw new Error(`Unexpected request path: ${path}`); + } + } +} + +suite('ExternalIngestClient finalize retry', () => { + const disposables = new DisposableStore(); + + beforeEach(() => { + vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); + }); + + afterEach(() => { + vi.useRealTimers(); + disposables.clear(); + }); + + /** + * Drives `updateIndex` to completion while flushing the fake finalize-retry backoff + * timers, so the test never waits on real wall-clock time. + */ + function runUpdateIndex(client: ExternalIngestClient, fileSet: ExternalIngestFileSet): Promise> { + const promise = client.updateIndex('fileset-1', fileSet, new CallTracker('externalIngest.spec.ts'), CancellationToken.None, emptyProgressCb); + // Run all (current and subsequently scheduled) timers, flushing microtasks between + // each, so the backoff `timeout(...)` calls resolve and the promise settles. + return vi.runAllTimersAsync().then(() => promise); + } + + function createIngestFile(relativePath: string, content: string): ExternalIngestFile { + const bytes = new TextEncoder().encode(content); + const docSha = ingestUtils.getDocSha(relativePath, new ingestUtils.DocumentContents(bytes)); + return { + uri: URI.file(`/workspace/${relativePath}`), + relativePath, + docSha, + read: async () => bytes, + }; + } + + function createClient(fetcher: IGithubApiFetcherService): ExternalIngestClient { + const testingServiceCollection = disposables.add(createPlatformServices()); + testingServiceCollection.set(IGithubApiFetcherService, fetcher); + // Provide a static GitHub token so `getAuthToken()` resolves without real + // credentials - CI has no GITHUB_PAT/GITHUB_OAUTH_TOKEN set. + testingServiceCollection.define(IAuthenticationService, new SyncDescriptor(StaticGitHubAuthenticationService, [() => 'test-github-token'])); + const accessor = disposables.add(testingServiceCollection.createTestingAccessor()); + const instantiationService = accessor.get(IInstantiationService); + return disposables.add(instantiationService.createInstance(ExternalIngestClient)); + } + + test('retries finalize after a 412 phase mismatch and re-polls for missing documents', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // Server registers the document after the first upload, so the second batch + // poll reports nothing missing and finalize succeeds on the retry. + const fetcher = new RecordingGithubApiFetcherService([docShaId], /* finalize412Count */ 1, /* batchPollsWithDocs */ 1); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isOk(), `updateIndex should succeed, got: ${result.isError() ? result.err.message : ''}`); + assert.strictEqual(fetcher.finalizeCallCount, 2, 'Finalize should be retried once after the 412'); + assert.strictEqual(fetcher.batchPollCount, 2, 'Batch should be re-polled for missing documents after the 412'); + assert.strictEqual(fetcher.documentUploadCount, 1, 'Document should be uploaded once when the server registers it after the first pass'); + }); + + test('re-uploads documents the server still reports as missing after a 412', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // The server still reports the document as missing on the second batch poll, + // so the reconcile loop must re-upload it before finalize can succeed. + const fetcher = new RecordingGithubApiFetcherService([docShaId], /* finalize412Count */ 1, /* batchPollsWithDocs */ 2); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isOk(), `updateIndex should succeed, got: ${result.isError() ? result.err.message : ''}`); + assert.strictEqual(fetcher.finalizeCallCount, 2, 'Finalize should be retried once after the 412'); + assert.strictEqual(fetcher.documentUploadCount, 2, 'The still-missing document should be uploaded again on the retry pass'); + }); + + test('updateIndex fails when finalize keeps returning 412 and documents remain missing', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // The server keeps asking for the document on every batch poll and finalize + // always reports a phase mismatch, so the run fails after exhausting retries. + const fetcher = new RecordingGithubApiFetcherService([docShaId], /* finalize412Count */ 10, /* batchPollsWithDocs */ 10); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isError(), 'updateIndex should fail after exhausting finalize retries'); + assert.strictEqual(fetcher.finalizeCallCount, 3, 'Finalize should be attempted the maximum number of times'); + }); + + test('reports a likely server-side issue when finalize 412s but no documents are missing', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // The document is delivered on the first pass (batch reports nothing missing + // afterwards) yet finalize keeps returning 412. Since the client cannot make + // progress by re-uploading, it should surface a distinct server-side error. + const fetcher = new RecordingGithubApiFetcherService([docShaId], /* finalize412Count */ 10, /* batchPollsWithDocs */ 1); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isError(), 'updateIndex should fail after exhausting finalize retries'); + assert.strictEqual(fetcher.finalizeCallCount, 3, 'Finalize should be attempted the maximum number of times'); + assert.match(result.isError() ? result.err.message : '', /no missing documents/i, 'Error should describe the server-side phase mismatch'); + }); + + test('surfaces the underlying document upload failure instead of a finalize phase error', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // The document upload persistently fails (i.e. it has already exhausted the + // HTTP-layer retry budget). The run must fail with the real document error and + // must never reach finalize - so the user sees the true cause, not a 412. + const fetcher = new RecordingGithubApiFetcherService( + [docShaId], + /* finalize412Count */ 0, + /* batchPollsWithDocs */ 1, + { count: 100, status: 500, requestId: 'doc-req-42' }, + ); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isError(), 'updateIndex should fail when a document upload fails'); + const message = result.isError() ? result.err.message : ''; + assert.match(message, /Failed to upload 1 document/i, 'Error should describe the document upload failure'); + assert.match(message, /status 500/i, 'Error should include the real failure status'); + assert.match(message, /doc-req-42/, 'Error should include the failing requestId for diagnosis'); + assert.strictEqual(fetcher.finalizeCallCount, 0, 'Finalize should never be attempted when documents failed to upload'); + }); + + test('aborts with a descriptive error when a document upload returns 404 (ingest gone)', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + const docShaId = Buffer.from(file.docSha).toString('base64'); + + // A 404 means the ingest itself is gone server-side. The pass must abort + // deterministically with the "ingest not found" error and never reach finalize. + const fetcher = new RecordingGithubApiFetcherService( + [docShaId], + /* finalize412Count */ 0, + /* batchPollsWithDocs */ 1, + { count: 100, status: 404, requestId: 'doc-404' }, + ); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isError(), 'updateIndex should fail when the ingest is gone (404)'); + assert.match(result.isError() ? result.err.message : '', /Ingest not found \(404\)/i, 'Error should describe the missing ingest'); + assert.strictEqual(fetcher.finalizeCallCount, 0, 'Finalize should never be attempted after a 404'); + }); + + test('fails deterministically when the server requests a docSha with no local mapping', async () => { + const file = createIngestFile('src/file1.ts', 'const x = 1;'); + + // The server asks for a docSha the client never advertised (no local mapping). + // The pass must fail deterministically with a descriptive error and never reach + // finalize, rather than swallowing the mismatch and stranding the server. + const unmappedDocShaId = Buffer.from('doc-sha-the-client-never-advertised').toString('base64'); + const fetcher = new RecordingGithubApiFetcherService([unmappedDocShaId], /* finalize412Count */ 0, /* batchPollsWithDocs */ 1); + const client = createClient(fetcher); + + const fileSet: ExternalIngestFileSet = { files: [file], checkpoint: 'checkpoint-1' }; + const result = await runUpdateIndex(client, fileSet); + + assert.ok(result.isError(), 'updateIndex should fail when the server requests an unmapped docSha'); + const message = result.isError() ? result.err.message : ''; + assert.match(message, /Failed to upload 1 document/i, 'Error should describe the upload failure'); + assert.match(message, /unmapped docSha/i, 'Error should identify the unmapped docSha'); + assert.strictEqual(fetcher.documentUploadCount, 0, 'No document upload should be attempted for an unmapped docSha'); + assert.strictEqual(fetcher.finalizeCallCount, 0, 'Finalize should never be attempted for an unmapped docSha'); + }); +});