diff --git a/.changeset/kb-readiness.md b/.changeset/kb-readiness.md new file mode 100644 index 00000000..86e5c32c --- /dev/null +++ b/.changeset/kb-readiness.md @@ -0,0 +1,15 @@ +--- +"@aws-blocks/bb-knowledge-base": minor +"@aws-blocks/blocks": minor +--- + +Add `isSynced()` / `waitUntilSynced()` ingestion-sync API to KnowledgeBase. + +Bedrock ingestion runs asynchronously after deploy, so during the initial pre-sync window `retrieve()` returns an empty array even for queries that would later match — making "empty" ambiguous between "not yet synced with your latest data" and "synced, no match". The new methods resolve that ambiguity (mirroring Bedrock's own "Sync" / "sync with your latest data" terminology): + +- `isSynced(): Promise` — `true` once the data source's most recent ingestion job is `COMPLETE`; `false` while it is not yet synced with your latest data. This reports data *freshness*, not availability — `retrieve()` is always callable and serves the prior synced snapshot during a re-ingestion. Both local-folder and imported `s3://` sources register a BB-managed data source, so both are tracked (the "no managed data source → synced" shortcut applies only to deployments predating this API, which have no data source id injected). Throws a typed `IngestionFailedException` (including `failureReasons`) if the latest job failed. +- `waitUntilSynced(options?: { timeoutMs?: number; pollIntervalMs?: number; maxConsecutiveTransientErrors?: number; signal?: AbortSignal }): Promise` — polls until synced (defaults: `timeoutMs` 300000, `pollIntervalMs` 5000, `maxConsecutiveTransientErrors` 3), throwing a typed `KnowledgeBaseTimeoutException` on timeout or propagating `IngestionFailedException` on a failed job. Up to `maxConsecutiveTransientErrors` *consecutive* transient control-plane errors are tolerated (the counter resets on a clean poll); terminal errors short-circuit immediately. Transient covers both throttling / transient network failures **and** a *not-yet-visible* knowledge base — during the post-deploy window the control plane can briefly return `ResourceNotFoundException` (the freshly-created KB/data source hasn't propagated yet), which is ridden out rather than treated as terminal; a *missing-KB config* error (`KB_ID` unset) stays terminal. The poll interval carries ±20% jitter (only the delay between polls varies, never the poll count or the deadline) so many KBs don't poll in lockstep. Pass an optional `signal` (`AbortSignal`) to cancel the wait — checked before each poll and during the inter-poll delay — which rejects with the signal's abort reason (default: a `DOMException` named `'AbortError'`). + +Purely additive — `retrieve()` and all existing signatures are unchanged. The local mock reports synced immediately (no async ingestion window in local dev). + +The umbrella `@aws-blocks/blocks` package now also re-exports the new `WaitUntilSyncedOptions` type (alongside the existing `KnowledgeBase` re-exports) from both its runtime and CDK entry points, so consumers importing from `@aws-blocks/blocks` can reference it directly. diff --git a/.changeset/kb-teardown.md b/.changeset/kb-teardown.md new file mode 100644 index 00000000..7fe5b506 --- /dev/null +++ b/.changeset/kb-teardown.md @@ -0,0 +1,9 @@ +--- +"@aws-blocks/bb-knowledge-base": patch +--- + +fix(bb-knowledge-base): apply the data bucket's removal policy to the S3 Vectors resources on teardown + +On a `removalPolicy: 'destroy'` (or sandbox) teardown, the data `s3.Bucket` was force-deleted and auto-emptied, but the S3 Vectors store — the `CfnVectorBucket` + `CfnIndex` L1 resources — relied solely on its default CloudFormation `DeletionPolicy` and leaked. Those resources now mirror the data bucket: `DeletionPolicy: Delete` (via `applyRemovalPolicy(RemovalPolicy.DESTROY)`) when `destroy` is requested, and `RemovalPolicy.RETAIN` otherwise, so the vector bucket and index are dropped alongside the data bucket on a clean teardown. + +Purely additive — no exported types, signatures, or error constants changed. diff --git a/package-lock.json b/package-lock.json index 859a3d97..313e8924 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21884,6 +21884,27 @@ "node": ">=20.0.0" } }, + "node_modules/@aws-sdk/client-bedrock-agent": { + "version": "3.1075.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/client-bedrock-agent/-/client-bedrock-agent-3.1075.0.tgz", + "integrity": "sha512-CLYI9l9ub2FkNtsJNThQXoG/HJewgPCnhWI0QS9uLZOPEZKt7FoBNSOREmx9IEfI/goWDPC70sEZK/hjlmSyyg==", + "license": "Apache-2.0", + "dependencies": { + "@aws-crypto/sha256-browser": "5.2.0", + "@aws-crypto/sha256-js": "5.2.0", + "@aws-sdk/core": "^3.974.23", + "@aws-sdk/credential-provider-node": "^3.972.58", + "@aws-sdk/types": "^3.973.13", + "@smithy/core": "^3.24.6", + "@smithy/fetch-http-handler": "^5.4.6", + "@smithy/node-http-handler": "^4.7.6", + "@smithy/types": "^4.14.3", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@aws-sdk/client-bedrock-agent-runtime": { "version": "3.1046.0", "resolved": "https://registry.npmjs.org/@aws-sdk/client-bedrock-agent-runtime/-/client-bedrock-agent-runtime-3.1046.0.tgz", @@ -55694,6 +55715,7 @@ "dependencies": { "@aws-blocks/bb-logger": "^0.1.2", "@aws-blocks/core": "^0.1.2", + "@aws-sdk/client-bedrock-agent": "^3.0.0", "@aws-sdk/client-bedrock-agent-runtime": "^3.0.0" }, "devDependencies": { diff --git a/packages/bb-knowledge-base/API.md b/packages/bb-knowledge-base/API.md index a86e204a..80405b75 100644 --- a/packages/bb-knowledge-base/API.md +++ b/packages/bb-knowledge-base/API.md @@ -24,9 +24,11 @@ export class KnowledgeBase extends Scope { constructor(scope: ScopeParent, id: string, _options: KnowledgeBaseOptions); // (undocumented) readonly bbName = "KnowledgeBase"; + isSynced(): Promise; // @internal protected log: ChildLogger; retrieve(query: string, options?: RetrieveOptions): Promise; + waitUntilSynced(options?: WaitUntilSyncedOptions): Promise; } // @public @@ -37,6 +39,8 @@ export const KnowledgeBaseErrors: { readonly InvalidFilter: "InvalidFilterException"; readonly ValidationError: "KnowledgeBaseValidationError"; readonly BrowserNotSupported: "BrowserNotSupportedException"; + readonly IngestionFailed: "IngestionFailedException"; + readonly Timeout: "KnowledgeBaseTimeoutException"; }; // @public @@ -71,6 +75,14 @@ export interface RetrieveResult { // @public export type SourceConfig = string; +// @public +export interface WaitUntilSyncedOptions { + maxConsecutiveTransientErrors?: number; + pollIntervalMs?: number; + signal?: AbortSignal; + timeoutMs?: number; +} + // (No @packageDocumentation comment for this package) ``` diff --git a/packages/bb-knowledge-base/DESIGN.md b/packages/bb-knowledge-base/DESIGN.md index f44b5655..53dd5d44 100644 --- a/packages/bb-knowledge-base/DESIGN.md +++ b/packages/bb-knowledge-base/DESIGN.md @@ -20,6 +20,12 @@ Design document for KnowledgeBase. For usage, see [README.md](./README.md). **Rationale:** Ingestion can take minutes to hours depending on corpus size. Blocking `cdk deploy` until ingestion completes would make iterative development painful. Fire-and-forget means the deploy finishes quickly and ingestion happens in the background. The trade-off is that the knowledge base may return stale or empty results for a brief window after deploy. This is acceptable because the alternative (using a CDK `Provider` with `isComplete` polling) adds significant complexity and Lambda cold-start cost for a one-time operation. +**Resolution of the warm-up window:** The `isSynced()` / `waitUntilSynced()` sync API (see [README.md](./README.md#sync)) closes the gap left by fire-and-forget ingestion. Rather than blocking the deploy, callers poll the data source's ingestion-job status at runtime (`ListIngestionJobs` / `GetIngestionJob`) and gate on completion — keeping deploys fast while giving application code a reliable "is the KB synced with my latest data yet?" signal. `COMPLETE` → synced, `FAILED` → throws `IngestionFailedException`, anything else (or no jobs yet) → not synced. This tracks *freshness*, not availability: `retrieve()` is always callable and serves the prior snapshot during a re-ingestion (it returns empty only during the initial pre-sync window, before the first ingestion completes). So `isSynced() === false` means "not yet synced with your latest data," never "unavailable." + +**Embedding-propagation lag after `COMPLETE`:** `isSynced() === true` means the ingestion *job* reached `COMPLETE`. Per the Bedrock docs, for non-Aurora vector stores — and this BB uses S3 Vectors — newly-written embeddings can take a few minutes after `COMPLETE` before they are fully queryable. So `isSynced()` signals "the ingestion job finished," with a possible short embedding-propagation lag before the freshest chunks surface in `retrieve()` results. + +**Source coverage (folder and imported `s3://`):** Both a local-folder source and an imported `s3://` URI create a BB-managed `CfnDataSource` and register its `DATA_SOURCE_ID` unconditionally, so the sync API tracks the ingestion job for either source type — `isSynced()` / `waitUntilSynced()` reflect that data source's most recent ingestion job in both cases. (For an `s3://` source the construct skips the `BucketDeployment` step, since the documents are expected to already be in the bucket, but it still creates the data source and fires the ingestion job — so sync is tracked the same way.) The only case with nothing to track is a deployment that predates this sync API: such a handler has no `DATA_SOURCE_ID` injected, so `isSynced()` returns `true` immediately (treating "no managed data source" as synced). Re-deploying injects the id and enables sync tracking. + ### D-KB-3: Semantic chunking as default strategy **Decision:** Default chunking strategy is `'semantic'` (breakpoint-based topic detection), not fixed-size. @@ -56,13 +62,45 @@ Design document for KnowledgeBase. For usage, see [README.md](./README.md). **Rationale:** KnowledgeBase requires Bedrock API access (AWS runtime) or filesystem reads (mock). Neither is available in the browser. Throwing at construction — not at `retrieve()` time — gives developers an immediate, clear error message guiding them to use server actions, API routes, or Lambda handlers. This follows the same pattern as other server-only Building Blocks. +The method stubs are consistent with construction: `retrieve()`, `isSynced()`, and `waitUntilSynced()` **all** throw `BrowserNotSupportedException` as well (none silently no-ops or returns a fake "synced"). So the browser layer's sync contract matches `retrieve()` — completing the picture across all three layers: mock is always synced (`isSynced()` returns `true`, `waitUntilSynced()` resolves immediately; see the table below), AWS polls the real ingestion job, and browser throws for the data *and* sync methods alike. + +### D-KB-9: Raw `s3.Bucket` for the data bucket (not the `FileBucket` Building Block) + +**Decision:** Provision the data bucket with a raw `aws-cdk-lib/aws-s3` `s3.Bucket` rather than the `FileBucket` Building Block, even though `FileBucket` exists for "an app needs an S3 bucket" use cases. + +**Rationale:** Bedrock ingestion assumes an IAM role that must **read** the data bucket, and the Knowledge Base / Data Source wiring needs low-level bucket primitives that `FileBucket` intentionally does not expose: + +- **`bucketArn`** — fed verbatim into `CfnDataSource.s3Configuration.bucketArn`. +- **`grantRead(role)`** — grants the Bedrock service-principal role read access with the exact resource scoping CDK generates. +- **`enforceSSL: true`** — required posture for the bucket policy. +- **`PhysicalName.GENERATE_IF_NEEDED`** — a CDK-generated name so the bucket can be referenced cross-construct (and, for an imported `s3://` source, swapped for `Bucket.fromBucketName`) without the caller having to name it. + +`FileBucket` is a higher-level, app-facing abstraction (presigned uploads, client access patterns) and does not surface these primitives. Reaching for the raw L2 here keeps the Bedrock IAM grant precise and avoids bending `FileBucket` into an infrastructure role it was not designed for. + +### D-KB-10: S3 Vectors resources mirror the data bucket's removal policy + +**Decision:** Apply a removal policy to the S3 Vectors L1 resources (`s3vectors.CfnVectorBucket` + `s3vectors.CfnIndex`), computed from the **same** `destroy` signal that drives the data bucket. + +**Rationale:** Unlike the L2 `s3.Bucket` — which defaults to `RETAIN` and supports `autoDeleteObjects` — these L1 resources rely solely on their CloudFormation `DeletionPolicy`, whose default is `Delete`. Left unmanaged they are inconsistent with the data bucket on teardown. So: + +- `removalPolicy: 'destroy'` (or sandbox mode with no explicit policy) → `RemovalPolicy.DESTROY` → `DeletionPolicy: Delete`. The vector bucket + index are dropped alongside the (auto-emptied) data bucket. +- otherwise → `RemovalPolicy.RETAIN` → `DeletionPolicy: Retain`, matching the data bucket's `RETAIN`-by-default posture so customer data is never silently destroyed. + +`applyRemovalPolicy()` sets both `DeletionPolicy` and `UpdateReplacePolicy`. There is no `autoDeleteObjects` equivalent for S3 Vectors, but a vector bucket deleted by CloudFormation is removed with its contents, so no manual emptying step is needed for the vector store. + +### D-KB-11: Teardown caveat — the stack-level `RemovalPolicies` aspect cannot auto-empty the data bucket + +**Decision:** For a clean teardown, pass `removalPolicy: 'destroy'` to the KnowledgeBase (or run in sandbox mode) rather than relying solely on a stack-level `RemovalPolicies.of(stack).destroy()` aspect. + +**Rationale:** The stack-level aspect flips every resource's `DeletionPolicy` to `Delete`, **but it cannot enable `autoDeleteObjects`** on a bucket — `autoDeleteObjects` is a constructor behavior (it provisions a custom resource + Lambda that empties the bucket on delete), not a CloudFormation attribute an aspect can toggle after the fact. Consequence: if you rely solely on the stack-level aspect and do **not** pass `removalPolicy: 'destroy'` to the KnowledgeBase, the data bucket's `DeletionPolicy` becomes `Delete` but it still has objects in it, so CloudFormation's `DELETE` fails with `BucketNotEmpty` and the teardown stalls. Passing `removalPolicy: 'destroy'` (or running in sandbox mode) pairs `RemovalPolicy.DESTROY` with `autoDeleteObjects` on the data bucket and `DeletionPolicy: Delete` on the S3 Vectors resources (see D-KB-10), so the bucket is emptied and every resource is removed without manual intervention. + ## Infrastructure (CDK) Creates the following resources: 1. **S3 Data Bucket** — Stores source documents. Created new for local folder sources; imported via `Bucket.fromBucketName` for `s3://` URI sources. Block public access enabled, SSE-S3 encryption. Removal policy defaults to CDK's default (`RETAIN`) — the bucket and its documents are preserved on `cdk destroy` — unless `removalPolicy: 'destroy'` is set or the stack is in sandbox mode (`sandboxMode` context), in which case it becomes `DESTROY` with `autoDeleteObjects` enabled. -2. **S3 Vectors VectorBucket + Index** — Serverless vector store for embeddings. Index configured with `float32` data type, cosine distance metric, and configurable dimensions (default 1024). `AMAZON_BEDROCK_TEXT` and `AMAZON_BEDROCK_METADATA` marked as non-filterable metadata keys. +2. **S3 Vectors VectorBucket + Index** — Serverless vector store for embeddings. Index configured with `float32` data type, cosine distance metric, and configurable dimensions (default 1024). `AMAZON_BEDROCK_TEXT` and `AMAZON_BEDROCK_METADATA` marked as non-filterable metadata keys. On teardown these two L1 resources now mirror the data bucket's removal policy — `DeletionPolicy: Delete` when `removalPolicy: 'destroy'` (or sandbox mode), `Retain` otherwise — so they are dropped alongside the data bucket instead of being left behind (see D-KB-10). 3. **IAM Role** — Assumed by `bedrock.amazonaws.com` (scoped via `aws:SourceAccount`). Grants: S3 read on data bucket, S3 Vectors CRUD on vector bucket/index, `bedrock:InvokeModel` on Titan V2 (both inference profile and foundation model ARNs). @@ -74,8 +112,8 @@ Creates the following resources: 7. **AwsCustomResource (StartIngestionJob)** — Fires `bedrock:StartIngestionJob` on Create/Update. Ingestion runs asynchronously. Depends on both the data source and bucket deployment (when present) so documents are in S3 before ingestion starts. -**Environment variables injected:** `BLOCKS_{FULLID}_KB_ID` -**IAM grants to handler:** `bedrock:Retrieve` on the knowledge base ARN +**Handler config** (registered via `registerConfig`, surfaced to the runtime as env vars): `BLOCKS_{FULLID}_KB_ID`, `BLOCKS_{FULLID}_DATA_SOURCE_ID` (the data source id drives the `isSynced()` / `waitUntilSynced()` sync checks) +**IAM grants to handler:** `bedrock:Retrieve`, `bedrock:GetIngestionJob`, `bedrock:ListIngestionJobs` on the knowledge base ARN (the ingestion-job actions back the sync checks; the data source and its ingestion jobs are sub-resources of the KB ARN) ## Mock Implementation @@ -98,3 +136,4 @@ Creates the following resources: | No ingestion pipeline | Documents are indexed synchronously on first `retrieve()` | No mitigation — the mock doesn't need async ingestion. First call may be slower due to indexing | | No IAM enforcement | Permission errors only surface in AWS | No mitigation — IAM is handled by CDK grants automatically | | Immediate consistency | New documents appear instantly vs async ingestion in AWS | No mitigation — eventual consistency in AWS is inherent to the Bedrock ingestion pipeline | +| Unconditional mock sync | `isSynced()` always returns `true` (and `waitUntilSynced()` resolves immediately) — even for an `s3://` source that `retrieve()` rejects with `InvalidSourceConfigException`. Local sync state is therefore NOT a proxy for a working local `retrieve()` on `s3://` sources — the inverse of the production contract, where `isSynced() === true` implies `retrieve()` is queryable | No mitigation — local has no async ingestion to wait on, so sync is a no-op. `s3://` sources require AWS infrastructure; validate them in sandbox/production where sync state genuinely reflects queryability | diff --git a/packages/bb-knowledge-base/README.md b/packages/bb-knowledge-base/README.md index 139e910e..9f102d46 100644 --- a/packages/bb-knowledge-base/README.md +++ b/packages/bb-knowledge-base/README.md @@ -38,6 +38,8 @@ const kb = new KnowledgeBase(scope, id, options) | Method | Returns | Description | |--------|---------|-------------| | `retrieve(query, options?)` | `Promise` | Search for relevant document chunks. Returns results ranked by relevance score. | +| `isSynced()` | `Promise` | Whether the KB is synced with your latest data. `true` once the latest ingestion job is `COMPLETE` (or there is no BB-managed data source to track). Reports data *freshness*, not availability — `retrieve()` is always callable and serves the prior snapshot while a re-ingestion is in flight. Throws `IngestionFailed` if the latest job failed. | +| `waitUntilSynced(options?)` | `Promise` | Poll `isSynced()` until the KB is synced with your latest data or the timeout elapses. Throws `Timeout` if it does not sync in time. Accepts an optional `AbortSignal` to cancel the wait. | ### Options @@ -103,6 +105,30 @@ chunking: { strategy: 'fixed', chunkSize: 500, chunkOverlap: 10 } | `source` | `string` | Source document path or URL. | | `metadata` | `Record` | Document metadata. Includes auto-populated `folder` from subfolders. | +### Sync + +Bedrock ingestion runs asynchronously after deploy, so immediately after `cdk deploy` the knowledge base is not yet synced with your latest data — during that initial pre-sync window `retrieve()` returns an empty array even for queries that will later match. (Once at least one ingestion job has completed, `retrieve()` always serves the most recent synced snapshot, even while a later re-ingestion is in flight.) Use `isSynced()` / `waitUntilSynced()` to gate on ingestion completion: + +```typescript +// Block until the KB is synced with your latest data (e.g. right after deploy), then query +await kb.waitUntilSynced({ timeoutMs: 600_000 }); +const results = await kb.retrieve('getting started'); + +// Or check without blocking +if (await kb.isSynced()) { + const results = await kb.retrieve('getting started'); +} + +// Cancel the wait with an AbortSignal (e.g. an overall request deadline) +await kb.waitUntilSynced({ signal: AbortSignal.timeout(120_000) }); +``` + +`waitUntilSynced(options?)` accepts `timeoutMs` (default `300_000`), `pollIntervalMs` (default `5_000`, clamped to a 1ms minimum), `maxConsecutiveTransientErrors` (default `3`, minimum `0`), and an optional `signal` (`AbortSignal`). The poll interval carries a small amount of random jitter (±20%) so that many knowledge bases polling after a shared deploy don't fall into lockstep — the jitter only varies the delay *between* polls and never pushes a sleep past `timeoutMs`. + +`maxConsecutiveTransientErrors` is the number of *consecutive* transient control-plane errors tolerated before giving up; the counter resets on any clean poll. Two conditions are treated as transient and ridden out: throttling / transient network failures, **and** a *not-yet-visible* knowledge base — in the post-deploy window the control plane can briefly return `ResourceNotFoundException` (the freshly-created KB or data source hasn't propagated yet), which `waitUntilSynced()` absorbs rather than giving up on. Terminal errors always short-circuit immediately regardless of the limit: a `FAILED` ingestion job, and a *missing-KB config* error (the `KB_ID` env var is unset — distinct from the transient not-yet-visible case). When `signal` is provided, the wait is cancelled promptly (checked before each poll and during the inter-poll delay), rejecting with the signal's abort reason (by default a `DOMException` named `'AbortError'`). + +Both local-folder and imported `s3://` sources register a BB-managed data source, so sync state reflects that data source's ingestion job in either case. (A deployment predating this sync API has no data source id injected, so `isSynced()` returns `true` immediately — there is nothing to track.) This pre-feature deployment is the **only** case where `isSynced()` returns `true` without consulting an actual ingestion job — re-deploying injects `DATA_SOURCE_ID` and restores real tracking, so a freshly deployed KB always reflects a real job status (don't mistake the "nothing to track" shortcut for "ingestion confirmed complete" when gating live traffic). In local development the mock is always synced. Note that a local `isSynced()` of `true` does **not** imply `retrieve()` works for an `s3://` source — the mock rejects `s3://` with `InvalidSourceConfigException` (the inverse of the production contract), so validate `s3://` sources in sandbox/production where sync state genuinely reflects queryability. + ## Metadata Filtering Filter results by document metadata. All conditions use AND semantics: @@ -147,6 +173,8 @@ try { |---|---|---| | `KnowledgeBaseErrors.RetrievalFailed` | `RetrievalFailedException` | Bedrock retrieval call failed | | `KnowledgeBaseErrors.NotReady` | `KnowledgeBaseNotReadyException` | KB not deployed or env vars missing | +| `KnowledgeBaseErrors.IngestionFailed` | `IngestionFailedException` | The most recent ingestion job failed (message includes `failureReasons`) — thrown by `isSynced()` / `waitUntilSynced()` | +| `KnowledgeBaseErrors.Timeout` | `KnowledgeBaseTimeoutException` | `waitUntilSynced()` exceeded its timeout before ingestion completed | | `KnowledgeBaseErrors.InvalidSource` | `InvalidSourceConfigException` | Source folder not found or invalid config | | `KnowledgeBaseErrors.InvalidFilter` | `InvalidFilterException` | Invalid filter keys in Bedrock query | | `KnowledgeBaseErrors.ValidationError` | `KnowledgeBaseValidationError` | Empty or invalid query | @@ -154,7 +182,7 @@ try { ## Deploy Behavior -`cdk deploy` automatically triggers document ingestion (fire-and-forget). Ingestion runs asynchronously after the deploy completes. Check the AWS console to monitor ingestion progress. +`cdk deploy` automatically triggers document ingestion (fire-and-forget). Ingestion runs asynchronously after the deploy completes. Check the AWS console to monitor ingestion progress, or call [`isSynced()` / `waitUntilSynced()`](#sync) from your code to gate queries on ingestion completion. ## Scaling & Cost (AWS) diff --git a/packages/bb-knowledge-base/package.json b/packages/bb-knowledge-base/package.json index be9b3bbf..f55ea5e9 100644 --- a/packages/bb-knowledge-base/package.json +++ b/packages/bb-knowledge-base/package.json @@ -31,6 +31,7 @@ "dependencies": { "@aws-blocks/bb-logger": "^0.1.2", "@aws-blocks/core": "^0.1.2", + "@aws-sdk/client-bedrock-agent": "^3.0.0", "@aws-sdk/client-bedrock-agent-runtime": "^3.0.0" }, "devDependencies": { diff --git a/packages/bb-knowledge-base/src/errors.ts b/packages/bb-knowledge-base/src/errors.ts index 441de13a..7859dfb6 100644 --- a/packages/bb-knowledge-base/src/errors.ts +++ b/packages/bb-knowledge-base/src/errors.ts @@ -28,4 +28,8 @@ export const KnowledgeBaseErrors = { ValidationError: 'KnowledgeBaseValidationError', /** KnowledgeBase is server-side only and cannot be used in browser contexts. */ BrowserNotSupported: 'BrowserNotSupportedException', + /** The data source's most recent Bedrock ingestion job failed. The error message includes the reported `failureReasons`. */ + IngestionFailed: 'IngestionFailedException', + /** `waitUntilSynced()` exceeded its timeout before the knowledge base finished ingesting. */ + Timeout: 'KnowledgeBaseTimeoutException', } as const; diff --git a/packages/bb-knowledge-base/src/index.aws.test.ts b/packages/bb-knowledge-base/src/index.aws.test.ts index 39310a0b..63a76640 100644 --- a/packages/bb-knowledge-base/src/index.aws.test.ts +++ b/packages/bb-knowledge-base/src/index.aws.test.ts @@ -4,6 +4,7 @@ import { test, describe, mock, afterEach } from 'node:test'; import assert from 'node:assert'; import { BedrockAgentRuntimeClient } from '@aws-sdk/client-bedrock-agent-runtime'; +import { BedrockAgentClient } from '@aws-sdk/client-bedrock-agent'; import { KnowledgeBaseErrors, KnowledgeBase } from './index.aws.js'; // ── SDK mock helpers ─────────────────────────────────────────────────────── @@ -12,6 +13,11 @@ function mockRuntimeSend(fn: (cmd: unknown) => unknown) { return mock.method(BedrockAgentRuntimeClient.prototype, 'send', fn); } +// Control-plane client used by isSynced()/waitUntilSynced(). +function mockAgentSend(fn: (cmd: { constructor: { name: string }; input: any }) => unknown) { + return mock.method(BedrockAgentClient.prototype, 'send', fn as (cmd: unknown) => unknown); +} + afterEach(() => { try { mock.restoreAll(); } catch {} }); @@ -24,6 +30,23 @@ function setKbEnv(scopeId: string, instanceId: string, kbId = 'kb-test-123') { }; } +// Sets KB_ID and (unless dataSourceId is null) DATA_SOURCE_ID, mirroring the +// two config values the CDK layer registers. Used by the sync tests. +function setSyncEnv( + scopeId: string, + instanceId: string, + opts: { kbId?: string; dataSourceId?: string | null } = {}, +) { + const { kbId = 'kb-test-123', dataSourceId = 'ds-test-123' } = opts; + const prefix = `BLOCKS_${scopeId}_${instanceId}`.toUpperCase().replace(/[^A-Z0-9]/g, '_'); + process.env[`${prefix}_KB_ID`] = kbId; + if (dataSourceId !== null) process.env[`${prefix}_DATA_SOURCE_ID`] = dataSourceId; + return () => { + delete process.env[`${prefix}_KB_ID`]; + delete process.env[`${prefix}_DATA_SOURCE_ID`]; + }; +} + // ── Constructor validation ───────────────────────────────────────────────── describe('KnowledgeBase constructor validation', () => { @@ -520,3 +543,621 @@ describe('error classification — other SDK exceptions', () => { } }); }); + +// ── Sync — isSynced() ──────────────────────────────────────────────────────── +// +// Ingestion runs asynchronously after deploy, so isSynced() inspects the data +// source's most recent ingestion job: COMPLETE → synced, FAILED → throws, +// anything else (or no jobs / no data source) → not-synced (or synced when there +// is nothing to track). + +describe('isSynced', () => { + test('returns true when the latest ingestion job is COMPLETE', async () => { + const cleanup = setSyncEnv('TEST', 'RDY1'); + mockAgentSend((cmd) => { + assert.strictEqual(cmd.constructor.name, 'ListIngestionJobsCommand'); + return { ingestionJobSummaries: [{ ingestionJobId: 'job-1', status: 'COMPLETE' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy1', { source: './knowledge' }); + assert.strictEqual(await kb.isSynced(), true); + } finally { + cleanup(); + } + }); + + test('returns false when the latest ingestion job is IN_PROGRESS', async () => { + const cleanup = setSyncEnv('TEST', 'RDY2'); + mockAgentSend(() => ({ ingestionJobSummaries: [{ ingestionJobId: 'job-1', status: 'IN_PROGRESS' }] })); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy2', { source: './knowledge' }); + assert.strictEqual(await kb.isSynced(), false); + } finally { + cleanup(); + } + }); + + test('returns false when no ingestion jobs exist yet (empty list)', async () => { + const cleanup = setSyncEnv('TEST', 'RDY3'); + mockAgentSend(() => ({ ingestionJobSummaries: [] })); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy3', { source: './knowledge' }); + assert.strictEqual(await kb.isSynced(), false); + } finally { + cleanup(); + } + }); + + test('returns false when ingestionJobSummaries is undefined', async () => { + const cleanup = setSyncEnv('TEST', 'RDY3B'); + mockAgentSend(() => ({})); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy3b', { source: './knowledge' }); + assert.strictEqual(await kb.isSynced(), false); + } finally { + cleanup(); + } + }); + + test('returns true (without calling the control plane) when no data source id is configured', async () => { + // A deployment that predates the sync API: KB_ID present, but no + // DATA_SOURCE_ID was injected, so there is no ingestion job to track. (The + // CDK layer now always registers a DATA_SOURCE_ID for both folder and + // imported s3:// sources — see DESIGN.md, the "Source coverage (folder and + // imported s3://)" note — so this is purely the pre-feature case, not a + // source-type distinction.) + const cleanup = setSyncEnv('TEST', 'RDY4', { dataSourceId: null }); + let sendCalled = false; + mockAgentSend(() => { + sendCalled = true; + return { ingestionJobSummaries: [] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy4', { source: './knowledge' }); + assert.strictEqual(await kb.isSynced(), true); + assert.strictEqual(sendCalled, false, 'should not query the control plane when there is no data source to track'); + } finally { + cleanup(); + } + }); + + test('throws NotReady when KB_ID env var is not set', async () => { + const prefix = 'BLOCKS_TEST_RDY5'; + const orig = process.env[`${prefix}_KB_ID`]; + delete process.env[`${prefix}_KB_ID`]; + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy5', { source: './knowledge' }); + await assert.rejects( + () => kb.isSynced(), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.NotReady); + return true; + }, + ); + } finally { + if (orig !== undefined) process.env[`${prefix}_KB_ID`] = orig; + } + }); + + test('throws IngestionFailed (with failureReasons) when the latest job FAILED', async () => { + const cleanup = setSyncEnv('TEST', 'RDY6'); + mockAgentSend((cmd) => { + if (cmd.constructor.name === 'ListIngestionJobsCommand') { + return { ingestionJobSummaries: [{ ingestionJobId: 'job-x', status: 'FAILED' }] }; + } + // GetIngestionJobCommand → failure detail + return { ingestionJob: { status: 'FAILED', failureReasons: ['boom one', 'boom two'] } }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy6', { source: './knowledge' }); + await assert.rejects( + () => kb.isSynced(), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.IngestionFailed); + assert.ok(err.message.includes('boom one'), 'message should include failure reasons'); + assert.ok(err.message.includes('boom two')); + return true; + }, + ); + } finally { + cleanup(); + } + }); + + test('queries ListIngestionJobs with the configured ids, sorted by STARTED_AT desc, maxResults 1', async () => { + const cleanup = setSyncEnv('TEST', 'RDY7', { kbId: 'kb-aaa', dataSourceId: 'ds-bbb' }); + let captured: any; + mockAgentSend((cmd) => { + captured = cmd.input; + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'COMPLETE' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy7', { source: './knowledge' }); + await kb.isSynced(); + assert.strictEqual(captured.knowledgeBaseId, 'kb-aaa'); + assert.strictEqual(captured.dataSourceId, 'ds-bbb'); + assert.strictEqual(captured.maxResults, 1); + assert.strictEqual(captured.sortBy.attribute, 'STARTED_AT'); + assert.strictEqual(captured.sortBy.order, 'DESCENDING'); + } finally { + cleanup(); + } + }); + + test('maps control-plane ResourceNotFoundException to NotReady', async () => { + const cleanup = setSyncEnv('TEST', 'RDY8'); + const err = new Error('No knowledge base with ID kb-test-123 exists'); + err.name = 'ResourceNotFoundException'; + mockAgentSend(() => { throw err; }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'rdy8', { source: './knowledge' }); + await assert.rejects( + () => kb.isSynced(), + (e: Error) => { + assert.strictEqual(e.name, KnowledgeBaseErrors.NotReady); + return true; + }, + ); + } finally { + cleanup(); + } + }); +}); + +// ── Sync — waitUntilSynced() ───────────────────────────────────────────────── + +describe('waitUntilSynced', () => { + test('resolves immediately when ingestion is already COMPLETE', async () => { + const cleanup = setSyncEnv('TEST', 'WUR1'); + mockAgentSend(() => ({ ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'COMPLETE' }] })); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur1', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 1000, pollIntervalMs: 10 }); + } finally { + cleanup(); + } + }); + + test('polls until the ingestion job becomes COMPLETE', async () => { + const cleanup = setSyncEnv('TEST', 'WUR2'); + let calls = 0; + mockAgentSend(() => { + calls += 1; + const status = calls < 3 ? 'IN_PROGRESS' : 'COMPLETE'; + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur2', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 5 }); + assert.ok(calls >= 3, `expected at least 3 polls before COMPLETE, got ${calls}`); + } finally { + cleanup(); + } + }); + + test('throws IngestionFailed (with failureReasons) when ingestion FAILED', async () => { + const cleanup = setSyncEnv('TEST', 'WUR3'); + mockAgentSend((cmd) => { + if (cmd.constructor.name === 'ListIngestionJobsCommand') { + return { ingestionJobSummaries: [{ ingestionJobId: 'job-fail', status: 'FAILED' }] }; + } + return { ingestionJob: { status: 'FAILED', failureReasons: ['S3 access denied'] } }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur3', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 1000, pollIntervalMs: 10 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.IngestionFailed); + assert.ok(err.message.includes('S3 access denied'), 'should surface failure reasons'); + return true; + }, + ); + } finally { + cleanup(); + } + }); + + test('throws Timeout when the job never completes within the budget', async () => { + const cleanup = setSyncEnv('TEST', 'WUR4'); + mockAgentSend(() => ({ ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'IN_PROGRESS' }] })); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur4', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 30, pollIntervalMs: 5 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.Timeout); + assert.ok(err.message.includes('30ms'), 'timeout message should include the budget'); + // Every poll was a clean IN_PROGRESS (no transient errors), so the + // message stays the plain form — no transient detail appended. + assert.ok( + !err.message.includes('last transient error'), + 'a clean (non-transient) timeout must not claim a transient error', + ); + return true; + }, + ); + } finally { + cleanup(); + } + }); + + test('Timeout message surfaces the last transient error when the budget runs out mid-streak', async () => { + const cleanup = setSyncEnv('TEST', 'WUR16'); + // Every poll throws a transient (throttling → RetrievalFailed) error, but the + // tolerance is high enough that the deadline — not the transient budget — ends + // the wait. The Timeout message must then surface the last transient error so a + // caller can't mistake it for a healthy KB that merely never finished ingesting. + mockAgentSend(() => { + const e = new Error('Rate exceeded'); + e.name = 'ThrottlingException'; // → RetrievalFailed (transient) on every poll + throw e; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur16', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 30, pollIntervalMs: 5, maxConsecutiveTransientErrors: 1000 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.Timeout); + assert.ok(err.message.includes('30ms'), 'timeout message should include the budget'); + assert.ok( + err.message.includes('last transient error'), + 'timeout message should flag that the final polls were failing transiently', + ); + assert.ok( + err.message.includes('Rate exceeded'), + 'timeout message should include the underlying transient detail', + ); + return true; + }, + ); + } finally { + cleanup(); + } + }); + + test('Timeout message stays plain when an early transient blip was cleared by later clean polls', async () => { + const cleanup = setSyncEnv('TEST', 'WUR17'); + // Inverse of the mid-streak case (WUR16): the transient blip happens on the + // FIRST poll, but every later poll is a clean IN_PROGRESS, so the streak (and + // the remembered error) reset well before the deadline. The Timeout that + // eventually fires must read as a plain "still ingesting" timeout — the stale + // transient from the already-cleared streak must never be folded into it. + let calls = 0; + mockAgentSend(() => { + calls += 1; + if (calls === 1) { + const e = new Error('Rate exceeded'); + e.name = 'ThrottlingException'; // → RetrievalFailed (transient) on the first poll only + throw e; + } + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'IN_PROGRESS' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur17', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 40, pollIntervalMs: 5 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.Timeout); + assert.ok(calls >= 2, `expected clean polls after the initial blip, got ${calls} call(s)`); + assert.ok( + !err.message.includes('last transient error'), + 'a transient blip cleared by later clean polls must not leak into the timeout message', + ); + assert.ok( + !err.message.includes('Rate exceeded'), + 'the stale transient detail from the cleared streak must not appear', + ); + return true; + }, + ); + } finally { + cleanup(); + } + }); + + test('resolves immediately when no data source id is configured', async () => { + // Pre-sync-API deployment: no DATA_SOURCE_ID injected, so there is + // nothing to poll. (Not a source-type distinction — the CDK layer registers + // DATA_SOURCE_ID for folder and imported s3:// sources alike; see DESIGN.md, + // the "Source coverage (folder and imported s3://)" note.) + const cleanup = setSyncEnv('TEST', 'WUR5', { dataSourceId: null }); + let sendCalled = false; + mockAgentSend(() => { + sendCalled = true; + return { ingestionJobSummaries: [] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur5', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 30, pollIntervalMs: 5 }); + assert.strictEqual(sendCalled, false, 'should not poll the control plane when there is nothing to track'); + } finally { + cleanup(); + } + }); + + test('tolerates a transient control-plane error, then resolves once COMPLETE', async () => { + const cleanup = setSyncEnv('TEST', 'WUR6'); + let calls = 0; + mockAgentSend(() => { + calls += 1; + if (calls === 1) { + // Unrecognized SDK error → mapSdkError classifies it as RetrievalFailed (transient). + const e = new Error('Rate exceeded'); + e.name = 'ThrottlingException'; + throw e; + } + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'COMPLETE' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur6', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1 }); + assert.ok(calls >= 2, `expected a retry after the transient blip, got ${calls} call(s)`); + } finally { + cleanup(); + } + }); + + test('throws once consecutive transient errors exceed the tolerance', async () => { + const cleanup = setSyncEnv('TEST', 'WUR7'); + let calls = 0; + mockAgentSend(() => { + calls += 1; + const e = new Error('Rate exceeded'); + e.name = 'ThrottlingException'; // → RetrievalFailed (transient) on every poll + throw e; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur7', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 2 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.RetrievalFailed); + return true; + }, + ); + // tolerance 2 → polls 1 & 2 absorbed, poll 3 exceeds the limit and rethrows. + assert.strictEqual(calls, 3, `expected 3 polls (2 tolerated + 1 over the limit), got ${calls}`); + } finally { + cleanup(); + } + }); + + test('short-circuits immediately on IngestionFailed (never retried as transient)', async () => { + const cleanup = setSyncEnv('TEST', 'WUR8'); + let listCalls = 0; + mockAgentSend((cmd) => { + if (cmd.constructor.name === 'ListIngestionJobsCommand') { + listCalls += 1; + return { ingestionJobSummaries: [{ ingestionJobId: 'job-fail', status: 'FAILED' }] }; + } + return { ingestionJob: { status: 'FAILED', failureReasons: ['boom'] } }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur8', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 5 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.IngestionFailed); + return true; + }, + ); + assert.strictEqual(listCalls, 1, 'a FAILED job is terminal — it must short-circuit, not poll again'); + } finally { + cleanup(); + } + }); + + test('short-circuits immediately on NotReady (unset KB_ID is not retried forever)', async () => { + const prefix = 'BLOCKS_TEST_WUR9'; + const orig = process.env[`${prefix}_KB_ID`]; + delete process.env[`${prefix}_KB_ID`]; + let sendCalled = false; + mockAgentSend(() => { + sendCalled = true; + return { ingestionJobSummaries: [] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur9', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 5 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.NotReady); + return true; + }, + ); + assert.strictEqual(sendCalled, false, 'a missing-KB config error fails before any poll and must not be retried'); + } finally { + if (orig !== undefined) process.env[`${prefix}_KB_ID`] = orig; + } + }); + + test('resets the transient-error counter after a clean poll', async () => { + const cleanup = setSyncEnv('TEST', 'WUR10'); + // transient → clean (IN_PROGRESS) → transient → COMPLETE. With tolerance 1 this + // only succeeds if the counter resets after the clean poll — otherwise the second + // transient error would be the 2nd consecutive failure and exceed the limit. + const seq = ['throw', 'IN_PROGRESS', 'throw', 'COMPLETE']; + let i = 0; + mockAgentSend(() => { + const step = seq[i++] ?? 'COMPLETE'; + if (step === 'throw') { + const e = new Error('Rate exceeded'); + e.name = 'ThrottlingException'; + throw e; + } + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: step }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur10', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 1 }); + assert.strictEqual(i, 4, 'should consume the full transient/clean/transient/complete sequence'); + } finally { + cleanup(); + } + }); + + // Cause-based transient classification: a control-plane ResourceNotFoundException + // (the KB/data source not yet visible in the post-deploy window) maps to NotReady + // WITH a `cause`, and is tolerated as transient — whereas an unset-KB_ID NotReady + // (thrown directly by ensureKbId, no `cause`) stays terminal. + + test('tolerates a transient control-plane ResourceNotFoundException (KB not yet visible), then resolves once COMPLETE', async () => { + const cleanup = setSyncEnv('TEST', 'WUR11'); + let calls = 0; + mockAgentSend(() => { + calls += 1; + if (calls === 1) { + // Post-deploy window: the freshly-created KB/data source has not + // propagated yet, so the control plane 404s. mapSdkError maps this to + // NotReady with cause.name === 'ResourceNotFoundException' → transient. + const e = new Error('No knowledge base with ID kb-test-123 exists'); + e.name = 'ResourceNotFoundException'; + throw e; + } + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'COMPLETE' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur11', { source: './knowledge' }); + await kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1 }); + assert.ok(calls >= 2, `expected a retry after the not-yet-visible blip, got ${calls} call(s)`); + } finally { + cleanup(); + } + }); + + test('throws once consecutive control-plane ResourceNotFound errors exceed the tolerance', async () => { + const cleanup = setSyncEnv('TEST', 'WUR12'); + let calls = 0; + mockAgentSend(() => { + calls += 1; + const e = new Error('No knowledge base with ID kb-test-123 exists'); + e.name = 'ResourceNotFoundException'; // → NotReady (transient, carries cause) on every poll + throw e; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur12', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 2 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.NotReady); + assert.strictEqual( + (err.cause as Error | undefined)?.name, + 'ResourceNotFoundException', + 'the rethrown NotReady should still carry the originating SDK error as cause', + ); + return true; + }, + ); + // tolerance 2 → polls 1 & 2 absorbed, poll 3 exceeds the limit and rethrows. + assert.strictEqual(calls, 3, `expected 3 polls (2 tolerated + 1 over the limit), got ${calls}`); + } finally { + cleanup(); + } + }); + + test('does NOT retry an unset-KB_ID NotReady (config error has no cause → terminal)', async () => { + const prefix = 'BLOCKS_TEST_WUR13'; + const orig = process.env[`${prefix}_KB_ID`]; + delete process.env[`${prefix}_KB_ID`]; + let sendCalled = false; + mockAgentSend(() => { + sendCalled = true; + return { ingestionJobSummaries: [] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur13', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 1, maxConsecutiveTransientErrors: 5 }), + (err: Error) => { + assert.strictEqual(err.name, KnowledgeBaseErrors.NotReady); + // The cause-based classification hinges on this: ensureKbId() throws + // NotReady directly, so there is no `cause` (unlike a not-yet-visible + // ResourceNotFoundException) — which keeps the config error terminal. + assert.strictEqual(err.cause, undefined, 'a config NotReady must carry no cause'); + return true; + }, + ); + assert.strictEqual(sendCalled, false, 'a missing-KB config error fails before any poll and must not be retried'); + } finally { + if (orig !== undefined) process.env[`${prefix}_KB_ID`] = orig; + } + }); + + // Cancellation via AbortSignal — checked before each poll and during the inter-poll sleep. + + test('rejects immediately when the signal is already aborted (no polling)', async () => { + const cleanup = setSyncEnv('TEST', 'WUR14'); + let sendCalled = false; + mockAgentSend(() => { + sendCalled = true; + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'IN_PROGRESS' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur14', { source: './knowledge' }); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 5000, pollIntervalMs: 5, signal: AbortSignal.abort() }), + (err: Error) => { + assert.strictEqual(err.name, 'AbortError', 'default abort reason is a DOMException named AbortError'); + return true; + }, + ); + assert.strictEqual(sendCalled, false, 'an already-aborted signal must reject before any poll'); + } finally { + cleanup(); + } + }); + + test('aborts during the inter-poll delay and rejects with the supplied abort reason', async () => { + const cleanup = setSyncEnv('TEST', 'WUR15'); + const controller = new AbortController(); + let calls = 0; + mockAgentSend(() => { + calls += 1; + // Always "not synced yet" so the wait reaches the inter-poll sleep, where + // the abort fired below interrupts it. + return { ingestionJobSummaries: [{ ingestionJobId: 'j', status: 'IN_PROGRESS' }] }; + }); + + try { + const kb = new KnowledgeBase({ id: 'test' }, 'wur15', { source: './knowledge' }); + const reason = new Error('caller cancelled'); + setTimeout(() => controller.abort(reason), 20).unref?.(); + await assert.rejects( + () => kb.waitUntilSynced({ timeoutMs: 60_000, pollIntervalMs: 50, signal: controller.signal }), + (err: Error) => { + assert.strictEqual(err, reason, 'should reject with the exact reason passed to abort()'); + return true; + }, + ); + assert.ok(calls >= 1, 'should have polled at least once before being aborted'); + } finally { + cleanup(); + } + }); +}); diff --git a/packages/bb-knowledge-base/src/index.aws.ts b/packages/bb-knowledge-base/src/index.aws.ts index 341bc32c..521b996a 100644 --- a/packages/bb-knowledge-base/src/index.aws.ts +++ b/packages/bb-knowledge-base/src/index.aws.ts @@ -7,9 +7,21 @@ import { type RetrievalFilter, type KnowledgeBaseRetrievalResult, } from '@aws-sdk/client-bedrock-agent-runtime'; +import { + BedrockAgentClient, + ListIngestionJobsCommand, + GetIngestionJobCommand, + type IngestionJobSummary, +} from '@aws-sdk/client-bedrock-agent'; import { Scope, registerSdkIdentifiers, getSdkIdentifiers } from '@aws-blocks/core'; import type { ScopeParent } from '@aws-blocks/core'; -import type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter } from './types.js'; +import type { + KnowledgeBaseOptions, + RetrieveOptions, + RetrieveResult, + MetadataFilter, + WaitUntilSyncedOptions, +} from './types.js'; import { KnowledgeBaseErrors } from './errors.js'; import { BB_NAME, BB_VERSION } from './version.js'; import { Logger } from '@aws-blocks/bb-logger'; @@ -23,6 +35,7 @@ export type { RetrieveOptions, RetrieveResult, MetadataFilter, + WaitUntilSyncedOptions, } from './types.js'; export { KnowledgeBaseErrors } from './errors.js'; @@ -43,6 +56,44 @@ function blocksError(name: string, message: string): Error { return err; } +/** + * Resolve after `ms` milliseconds. Used to space out the sync polls in + * `waitUntilSynced()`. If an {@link AbortSignal} is supplied and fires (or is + * already aborted), the returned promise rejects promptly with the signal's + * abort reason instead of waiting out the full delay. + */ +function sleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason); + return; + } + let onAbort: (() => void) | undefined; + const timer = setTimeout(() => { + if (signal && onAbort) signal.removeEventListener('abort', onAbort); + resolve(); + }, ms); + if (signal) { + onAbort = () => { + clearTimeout(timer); + reject(signal.reason); + }; + signal.addEventListener('abort', onAbort, { once: true }); + } + }); +} + +/** + * Apply ±20% random jitter to a poll interval so that many knowledge bases + * polling after a shared deploy do not synchronize into lockstep. Only the + * delay *between* polls varies — never the number of polls — and the caller + * still clamps the result to the remaining time budget. + */ +function jitterInterval(ms: number): number { + const factor = 1 + (Math.random() * 2 - 1) * 0.2; // 0.8–1.2 + return Math.max(1, Math.round(ms * factor)); +} + // Match only messages that clearly indicate a metadata filter issue. // Default unknown ValidationExceptions to ValidationError — false negatives // (filter error → generic) are less harmful than false positives (content @@ -95,6 +146,39 @@ function mapSdkError(err: unknown): Error { return mapped; } +/** + * Whether a mapped sync-poll error is a *transient* control-plane failure worth + * a bounded retry in {@link KnowledgeBase.waitUntilSynced}, rather than a terminal + * one that should short-circuit the wait. + * + * Two cases are transient: + * - `RetrievalFailedException` — the bucket {@link mapSdkError} uses for network + * errors, throttling, and other unrecognized SDK failures. + * - A *not-yet-visible* knowledge base. During the post-deploy window the control + * plane can briefly return `ResourceNotFoundException` (the KB or data source + * isn't visible yet); {@link mapSdkError} maps that to `KnowledgeBaseNotReadyException` + * **with the original SDK error attached as the non-enumerable `cause`**. Detect + * it via `cause.name === 'ResourceNotFoundException'` and ride it out — that is + * the entire purpose of `waitUntilSynced()`. + * + * Everything else is terminal and short-circuits immediately: the `NotReady` + * raised for an unset `KB_ID` config is thrown directly by `ensureKbId()` (so it + * carries **no** `cause`, which is exactly how we tell it apart from the transient + * not-yet-visible case above); `IngestionFailedException` (the job failed); and + * the validation errors all map to other names. + */ +function isTransientControlPlaneError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + if (err.name === KnowledgeBaseErrors.RetrievalFailed) return true; + // A control-plane ResourceNotFoundException is mapped to NotReady WITH the SDK + // error attached as `cause`; the unset-KB_ID NotReady is thrown directly and has + // none. Only the former — a KB not yet visible post-deploy — is transient. + return ( + err.name === KnowledgeBaseErrors.NotReady && + (err.cause as Error | undefined)?.name === 'ResourceNotFoundException' + ); +} + // ── Filter builder ───────────────────────────────────────────────────────── function buildFilter(filter?: MetadataFilter): RetrievalFilter | undefined { @@ -138,11 +222,16 @@ function buildFilter(filter?: MetadataFilter): RetrievalFilter | undefined { * * **Environment variables (injected by CDK):** * - `BLOCKS_{FULLID}_KB_ID` — Bedrock Knowledge Base ID + * - `BLOCKS_{FULLID}_DATA_SOURCE_ID` — Bedrock data source ID (used by `isSynced()` / `waitUntilSynced()`) */ export class KnowledgeBase extends Scope { readonly bbName = BB_NAME; private readonly fullIdCached: string; private readonly runtimeClient: BedrockAgentRuntimeClient; + // Control-plane client for ingestion-job status (sync checks). Created + // lazily on first sync call via getAgentClient() so instances that only + // ever retrieve() (or never check sync state) don't allocate it. + private agentClient?: BedrockAgentClient; /** @internal Logger for internal operations. Defaults to error-level when not provided. */ protected log: ChildLogger; @@ -157,7 +246,8 @@ export class KnowledgeBase extends Scope { customUserAgent: this.buildUserAgentChain(), }); const kbId = process.env[envKey(this.fullIdCached, 'KB_ID')] ?? ''; - registerSdkIdentifiers(this.fullId, { kbId }); + const dataSourceId = process.env[envKey(this.fullIdCached, 'DATA_SOURCE_ID')] ?? ''; + registerSdkIdentifiers(this.fullId, { kbId, dataSourceId }); } private ensureKbId(): string { @@ -170,6 +260,22 @@ export class KnowledgeBase extends Scope { ); } + /** + * Resolve the configured Bedrock data source id, or `undefined` when none was + * registered. Named `get*` rather than `ensure*` because — unlike + * {@link KnowledgeBase.ensureKbId}, which throws when the id is missing — a + * missing data source id is a valid state that this simply reports as + * `undefined`. Both folder and imported `s3://` sources register a BB-managed + * data source id at deploy time, so this normally returns a value for either + * source type. It is `undefined` only for deployments that predate the sync + * API (no `DATA_SOURCE_ID` injected) — in which case there is no ingestion job + * to track and callers treat the KB as synced. + */ + private getDataSourceId(): string | undefined { + const dataSourceId = getSdkIdentifiers(this).dataSourceId; + return dataSourceId ? dataSourceId : undefined; + } + /** * Retrieve relevant document chunks for a natural language query. * @@ -227,6 +333,296 @@ export class KnowledgeBase extends Scope { throw mapped; } } + + /** + * Report whether the knowledge base is **synced with your latest data** — + * i.e. its most recent Bedrock ingestion job has reached `COMPLETE`. Mirrors + * the "Sync" state Bedrock surfaces in the console. + * + * Bedrock ingestion runs asynchronously after deploy (it is triggered + * fire-and-forget), so on a first deploy `retrieve()` returns an empty array + * during the initial pre-sync window even for queries that will later match. + * Use `isSynced()` to distinguish "not synced with your latest data yet" + * (`false`) from "synced, genuinely no match" (`true` alongside an empty + * `retrieve()` result). + * + * **Freshness, not availability.** This reports freshness, not reachability. + * Once the first ingestion has completed, `retrieve()` stays queryable + * throughout any subsequent re-ingestion — Bedrock keeps serving the prior + * snapshot while it re-indexes, it does not go dark. So `false` during a + * re-sync means "your newest documents aren't indexed yet", **not** "the KB + * is unavailable"; a caller that gates every `retrieve()` on `isSynced()` + * would back off unnecessarily on each document-update cycle even though the + * previous snapshot is fully queryable. + * + * Resolution strategy: lists the data source's ingestion jobs (most recent + * first) and inspects the latest job's status — `COMPLETE` → synced, + * `FAILED` → throws, anything else (`STARTING` / `IN_PROGRESS`, or no jobs + * yet) → not synced. Both folder and imported `s3://` sources register a + * BB-managed data source id, so both are tracked here; the "no data source + * id configured → reported synced" shortcut applies only to deployments that + * predate this API (no `DATA_SOURCE_ID` injected — nothing to track). + * + * **Embedding-propagation lag.** `COMPLETE` reflects that the ingestion job + * finished. For non-Aurora vector stores — this Building Block uses S3 + * Vectors — AWS notes embeddings can take a few more minutes to become + * queryable after the job completes, so `isSynced() === true` means the job + * completed, with a possible short propagation lag before the newest chunks + * surface in `retrieve()`. + * + * @returns `true` when the latest ingestion job is `COMPLETE` (or there is + * no managed data source to track); `false` while ingestion is pending. + * @throws {IngestionFailedException} If the most recent ingestion job failed (message includes `failureReasons`). + * @throws {KnowledgeBaseNotReadyException | KnowledgeBaseValidationError | InvalidFilterException | RetrievalFailedException} + * For mapped Bedrock control-plane errors. Two distinct conditions map to `NotReady`: + * the `KB_ID` env var being unset (a *config* error thrown directly, so it carries no + * `cause`), and a control-plane `ResourceNotFoundException` (a *not-yet-visible* KB, + * mapped with the SDK error as `cause`). {@link waitUntilSynced} relies on that + * distinction: it rides out the not-yet-visible case as transient but treats the + * unset-`KB_ID` config error as terminal. A control-plane `ValidationException` maps to + * `KnowledgeBaseValidationError` (or `InvalidFilterException`); any other SDK error + * (network, auth, throttling) maps to `RetrievalFailedException`. This is the same + * mapping {@link mapSdkError} applies to `retrieve()`. + * + * @example + * ```typescript + * if (await kb.isSynced()) { + * const results = await kb.retrieve('how do I reset my password'); + * } + * ``` + */ + async isSynced(): Promise { + const knowledgeBaseId = this.ensureKbId(); + const dataSourceId = this.getDataSourceId(); + // No BB-managed ingestion to track → nothing to wait for. + if (!dataSourceId) return true; + + const job = await this.fetchLatestIngestionJob(knowledgeBaseId, dataSourceId); + // No ingestion job recorded yet → ingestion has not started; not synced yet. + if (!job) return false; + + if (job.status === 'COMPLETE') return true; + if (job.status === 'FAILED') { + const reasons = await this.fetchFailureReasons(knowledgeBaseId, dataSourceId, job.ingestionJobId); + throw blocksError( + KnowledgeBaseErrors.IngestionFailed, + `Knowledge base ingestion failed.${reasons.length ? ` Reasons: ${reasons.join('; ')}` : ''}`, + ); + } + // STARTING | IN_PROGRESS | STOPPING | STOPPED → not synced yet. + return false; + } + + /** + * Wait until the knowledge base is **synced with your latest data** (its most + * recent ingestion job reaches `COMPLETE`), polling the ingestion-job status + * until synced or until the timeout elapses. + * + * Polls {@link isSynced} every `pollIntervalMs` until it returns `true` + * (resolves) or the `timeoutMs` budget is exhausted (throws). If the most + * recent ingestion job has `FAILED`, the underlying `IngestionFailedException` + * propagates immediately rather than waiting out the timeout. + * + * Because this method exists for the noisy post-deploy window, it tolerates a + * bounded run of *transient* control-plane errors rather than aborting on the + * first blip: up to `maxConsecutiveTransientErrors` consecutive transient + * failures are absorbed and retried, and any clean poll resets that counter. + * Two kinds of error are transient — `RetrievalFailedException` (throttling / + * transient network failures) and a *not-yet-visible* KB, where the control + * plane briefly returns `ResourceNotFoundException` while the freshly-deployed + * KB or data source has not propagated yet (mapped to `KnowledgeBaseNotReadyException`). + * Riding out that window is the whole point of this method. + * + * Terminal errors still short-circuit immediately — a `FAILED` job + * (`IngestionFailedException`) and a *config* missing-KB error (`KB_ID` unset, + * which `ensureKbId()` throws directly with no `cause`, distinct from the + * transient not-yet-visible case above) are never retried. + * + * The delay between polls carries ±20% jitter so that many knowledge bases + * polling after a shared deploy do not synchronize; jitter only varies the + * sleep duration, never the number of polls, and is clamped to the deadline. + * Pass `options.signal` to cancel the wait — it is checked before each poll + * and during the inter-poll delay, rejecting promptly with the signal's abort + * reason (default: a `DOMException` named `'AbortError'`). + * + * @param {WaitUntilSyncedOptions} options - Optional polling parameters. + * `timeoutMs` (default 300000) bounds the total wait; `pollIntervalMs` + * (default 5000, clamped to a minimum of 1ms, ±20% jitter) spaces out the + * polls; `maxConsecutiveTransientErrors` (default 3, minimum 0) bounds how + * many consecutive transient control-plane errors are tolerated before + * giving up; `signal` (optional `AbortSignal`) cancels the wait. + * @throws {KnowledgeBaseTimeoutException} If the KB does not sync within `timeoutMs`. + * @throws {IngestionFailedException} If the most recent ingestion job failed (message includes `failureReasons`). + * @throws {KnowledgeBaseNotReadyException | KnowledgeBaseValidationError | InvalidFilterException | RetrievalFailedException} + * Propagated from {@link isSynced} for mapped Bedrock control-plane errors — see its docs + * for the full mapping (`ResourceNotFoundException`/unset `KB_ID` → `NotReady`, + * `ValidationException` → `KnowledgeBaseValidationError`/`InvalidFilterException`, + * other SDK errors → `RetrievalFailedException`). Transient errors (a `RetrievalFailedException`, + * or a `NotReady` caused by a not-yet-visible `ResourceNotFoundException`) are retried up + * to `maxConsecutiveTransientErrors` times before being rethrown. + * @throws The `signal`'s abort reason (default `DOMException` `'AbortError'`) if `options.signal` fires. + * + * @example + * ```typescript + * // Block until the KB is queryable (e.g. right after deploy) + * await kb.waitUntilSynced({ timeoutMs: 600_000 }); + * const results = await kb.retrieve('getting started'); + * + * // With cancellation (e.g. an overall request deadline) + * await kb.waitUntilSynced({ signal: AbortSignal.timeout(120_000) }); + * ``` + */ + async waitUntilSynced(options?: WaitUntilSyncedOptions): Promise { + const timeoutMs = Math.max(options?.timeoutMs ?? 300_000, 0); + const pollIntervalMs = Math.max(options?.pollIntervalMs ?? 5_000, 1); + const maxConsecutiveTransientErrors = Math.max(options?.maxConsecutiveTransientErrors ?? 3, 0); + const signal = options?.signal; + const deadline = Date.now() + timeoutMs; + + let consecutiveTransientErrors = 0; + let lastTransient: Error | undefined; + for (;;) { + // Cancellation: bail out before doing any work on each iteration. An + // already-aborted signal throws here on the very first pass (no poll). + signal?.throwIfAborted(); + try { + // isSynced() resolves true (synced) / false (not synced yet), throws + // IngestionFailedException on a FAILED job, NotReady when the KB is + // not deployed (or briefly not-yet-visible), or RetrievalFailedException + // for transient blips. + if (await this.isSynced()) return; + // A clean poll clears any transient-error streak — reset the remembered + // error alongside the counter so a later Timeout can only ever fold in a + // transient from the streak still in flight at the deadline, never a stale + // one from an earlier streak that clean polls already rode out. + consecutiveTransientErrors = 0; + lastTransient = undefined; + } catch (err) { + // Terminal errors (FAILED job, missing-KB config, validation) short-circuit. + // isTransientControlPlaneError() only returns true after its own + // `instanceof Error` guard, so pairing it with one here narrows `err` to + // `Error` for the rest of the catch — no per-use `as Error` casts needed. + if (!(err instanceof Error) || !isTransientControlPlaneError(err)) throw err; + // Transient control-plane blip: absorb a bounded run, then give up. + consecutiveTransientErrors += 1; + lastTransient = err; + if (consecutiveTransientErrors > maxConsecutiveTransientErrors) { + // Distinct from a Timeout on a healthy-but-still-ingesting KB: log that + // the transient tolerance was exhausted before rethrowing, so "gave up + // after N consecutive control-plane errors" is greppable in CloudWatch + // and not mistaken for a KB that simply never finished syncing. + this.log.warn( + `waitUntilSynced: giving up after ${consecutiveTransientErrors} consecutive transient ` + + `control-plane error(s) — tolerance (${maxConsecutiveTransientErrors}) exhausted: ${err.message}`, + ); + throw err; + } + this.log.warn( + `waitUntilSynced: tolerating transient control-plane error ` + + `(${consecutiveTransientErrors}/${maxConsecutiveTransientErrors}), retrying: ${err.message}`, + ); + } + if (Date.now() >= deadline) { + // If the budget ran out while we were still absorbing transient + // control-plane errors, fold the most recent one into the message. + // Otherwise a timeout reads like a healthy KB that just never finished + // ingesting, hiding that the final polls were actually failing transiently. + const base = `Knowledge base did not sync within ${timeoutMs}ms`; + throw blocksError( + KnowledgeBaseErrors.Timeout, + consecutiveTransientErrors > 0 && lastTransient + ? `${base} (last transient error: ${lastTransient.message})` + : `${base}.`, + ); + } + // Jitter the interval to avoid lockstep, but never sleep past the + // deadline; the sleep is abortable via the signal. + await sleep(Math.min(jitterInterval(pollIntervalMs), Math.max(deadline - Date.now(), 0)), signal); + } + } + + /** + * Lazily construct (and memoize) the Bedrock control-plane client used for + * ingestion-job status during sync checks. Built on first use rather + * than in the constructor so instances that only ever call {@link retrieve} + * — or never check sync state at all — don't allocate a client they won't use. + * Subsequent calls return the cached instance. + */ + private getAgentClient(): BedrockAgentClient { + if (!this.agentClient) { + this.agentClient = new BedrockAgentClient({ + maxAttempts: 3, + retryMode: 'adaptive', + customUserAgent: this.buildUserAgentChain(), + }); + } + return this.agentClient; + } + + /** + * List the data source's ingestion jobs (most recent first) and return the + * latest summary, or `undefined` when none exist yet. SDK errors are mapped + * to Blocks error constants via {@link mapSdkError}. + */ + private async fetchLatestIngestionJob( + knowledgeBaseId: string, + dataSourceId: string, + ): Promise { + try { + const response = await this.getAgentClient().send( + new ListIngestionJobsCommand({ + knowledgeBaseId, + dataSourceId, + sortBy: { attribute: 'STARTED_AT', order: 'DESCENDING' }, + maxResults: 1, + }), + ); + return response.ingestionJobSummaries?.[0]; + } catch (err) { + const mapped = mapSdkError(err); + // Logged at debug, not error: this path fires for the transient control-plane + // blips (throttling → RetrievalFailed, a not-yet-visible KB → NotReady) that + // waitUntilSynced() is designed to absorb and retry during the post-deploy + // warm-up window — emitting them at error produced spurious CloudWatch ERROR + // entries during expected behavior. waitUntilSynced() owns the operator signal + // (its own warn at the retry/give-up sites); a direct isSynced() caller receives + // the thrown mapped error and owns how to surface it. + this.log.debug(mapped.message); + throw mapped; + } + } + + /** + * Fetch the `failureReasons` for a failed ingestion job. Best-effort: the + * `ListIngestionJobs` summary omits failure reasons, so this issues a + * `GetIngestionJob` for the detail. Returns an empty array if the id is + * missing or the lookup fails — the caller still reports the failure. + */ + private async fetchFailureReasons( + knowledgeBaseId: string, + dataSourceId: string, + ingestionJobId: string | undefined, + ): Promise { + if (!ingestionJobId) return []; + try { + const response = await this.getAgentClient().send( + new GetIngestionJobCommand({ knowledgeBaseId, dataSourceId, ingestionJobId }), + ); + const reasons = response.ingestionJob?.failureReasons ?? []; + if (reasons.length === 0) { + // A FAILED job with no reported reasons is unusual — surface a hint so + // the otherwise reason-less IngestionFailedException is easier to diagnose. + this.log.warn( + `Ingestion job ${ingestionJobId} is FAILED but reported no failureReasons; ` + + `the surfaced error will not include a cause.`, + ); + } + return reasons; + } catch (err) { + this.log.error(mapSdkError(err).message); + return []; + } + } } // ── Result mapping ───────────────────────────────────────────────────────── diff --git a/packages/bb-knowledge-base/src/index.browser.ts b/packages/bb-knowledge-base/src/index.browser.ts index b4de0772..f66cb8e0 100644 --- a/packages/bb-knowledge-base/src/index.browser.ts +++ b/packages/bb-knowledge-base/src/index.browser.ts @@ -2,14 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 import type { ScopeParent } from '@aws-blocks/core'; -import type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult } from './types.js'; +import type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, WaitUntilSyncedOptions } from './types.js'; import { KnowledgeBaseErrors } from './errors.js'; export type { KnowledgeBaseOptions, SourceConfig, ChunkingConfig, ChunkingStrategy, RetrieveOptions, RetrieveResult, - MetadataFilter, + MetadataFilter, WaitUntilSyncedOptions, } from './types.js'; export { KnowledgeBaseErrors } from './errors.js'; @@ -40,4 +40,12 @@ export class KnowledgeBase { async retrieve(_query: string, _options?: RetrieveOptions): Promise { throw browserError(); } + + async isSynced(): Promise { + throw browserError(); + } + + async waitUntilSynced(_options?: WaitUntilSyncedOptions): Promise { + throw browserError(); + } } diff --git a/packages/bb-knowledge-base/src/index.cdk.test.ts b/packages/bb-knowledge-base/src/index.cdk.test.ts new file mode 100644 index 00000000..b0a12e41 --- /dev/null +++ b/packages/bb-knowledge-base/src/index.cdk.test.ts @@ -0,0 +1,281 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/** + * CDK-side tests for KnowledgeBase. + * + * Teardown: the data `s3.Bucket` paired `RemovalPolicy.DESTROY` with + * `autoDeleteObjects` on a `destroy`/sandbox teardown, but the S3 Vectors L1 + * resources (`CfnVectorBucket` + `CfnIndex`) relied solely on their default + * CloudFormation `DeletionPolicy` and leaked. Those tests pin the fix: the + * vector resources now mirror the data bucket's removal policy. + * + * Ingestion sync: the handler role must be able to read ingestion-job + * status (`bedrock:GetIngestionJob` / `bedrock:ListIngestionJobs`) — scoped to + * the KB ARN like the existing `bedrock:Retrieve` grant — and the + * `DATA_SOURCE_ID` config the runtime sync checks rely on must be + * registered and surface in the synthesized template. + * + * Synth guards: the runtime methods (`retrieve` / `isSynced` / `waitUntilSynced`) + * are stubbed on the CDK construct so an accidental synth-time call throws an + * actionable error instead of a cryptic `TypeError: not a function`. + */ +import { test } from 'node:test'; +import assert from 'node:assert'; +import { fileURLToPath } from 'node:url'; +import { dirname, resolve } from 'node:path'; +import * as cdk from 'aws-cdk-lib'; +import type { Construct } from 'constructs'; +import { Template, Match } from 'aws-cdk-lib/assertions'; +import * as s3vectors from 'aws-cdk-lib/aws-s3vectors'; +import { Scope, DEFAULT_NODE_RUNTIME, finalizeConfigRegistry } from '@aws-blocks/core/cdk'; +import { KnowledgeBase } from './index.cdk.js'; + +// Real local-folder source so BucketDeployment + sidecar generation synth. +const FIXTURES = resolve(dirname(fileURLToPath(import.meta.url)), '..', 'test-fixtures', 'knowledge'); + +// Pull CFN type names off the L1 classes so the assertions don't drift if AWS +// renames the underlying resource types. +const VECTOR_BUCKET_TYPE = s3vectors.CfnVectorBucket.CFN_RESOURCE_TYPE_NAME; +const VECTOR_INDEX_TYPE = s3vectors.CfnIndex.CFN_RESOURCE_TYPE_NAME; + +// Minimal BlocksStack-shaped parent — KnowledgeBase calls +// `this.handler.addToRolePolicy(...)` and `cdk.Stack.of(this)`, both of which +// resolve through CURRENT_BLOCKS_STACK (mirrors the production BlocksStack). +class StubBlocksStack extends cdk.Stack { + public readonly handler: cdk.aws_lambda.Function; + public readonly id: string; + constructor(scope: Construct, id: string) { + super(scope, id); + this.id = id; + (globalThis as any).CURRENT_BLOCKS_STACK = this; + this.handler = new cdk.aws_lambda.Function(this, 'StubHandler', { + runtime: DEFAULT_NODE_RUNTIME, + handler: 'index.handler', + code: cdk.aws_lambda.Code.fromInline('exports.handler = async () => {};'), + }); + } +} + +function buildStack(options: { removalPolicy?: 'destroy' | 'retain'; sandbox?: boolean; source?: string } = {}): { + stack: StubBlocksStack; + kb: KnowledgeBase; +} { + const app = new cdk.App(options.sandbox ? { context: { sandboxMode: 'true' } } : undefined); + // S3 bucket names must be lowercase; the data bucket derives its name from + // the scope chain, so keep ids lowercase. + const stack = new StubBlocksStack(app, 'teststack'); + const parent = new Scope('app'); + const kb = new KnowledgeBase(parent, 'docs', { + source: options.source ?? FIXTURES, + ...(options.removalPolicy ? { removalPolicy: options.removalPolicy } : {}), + }); + return { stack, kb }; +} + +function synth(options: { removalPolicy?: 'destroy' | 'retain'; sandbox?: boolean } = {}): Template { + return Template.fromStack(buildStack(options).stack); +} + +test("CDK: removalPolicy 'destroy' makes the data bucket + vector store deletable and adds auto-delete", () => { + const template = synth({ removalPolicy: 'destroy' }); + + // Data bucket: force-deletable and auto-empties on teardown. + template.hasResource('AWS::S3::Bucket', { DeletionPolicy: 'Delete' }); + template.resourceCountIs('Custom::S3AutoDeleteObjects', 1); + + // S3 Vectors resources mirror the data bucket — dropped on teardown. + template.hasResource(VECTOR_BUCKET_TYPE, { DeletionPolicy: 'Delete' }); + template.hasResource(VECTOR_INDEX_TYPE, { DeletionPolicy: 'Delete' }); +}); + +test("CDK: removalPolicy 'retain' keeps the data bucket + vector store and omits auto-delete", () => { + const template = synth({ removalPolicy: 'retain' }); + + template.hasResource('AWS::S3::Bucket', { DeletionPolicy: 'Retain' }); + template.resourceCountIs('Custom::S3AutoDeleteObjects', 0); + + template.hasResource(VECTOR_BUCKET_TYPE, { DeletionPolicy: 'Retain' }); + template.hasResource(VECTOR_INDEX_TYPE, { DeletionPolicy: 'Retain' }); +}); + +test('CDK: sandboxMode context defaults the data bucket + vector store to destroy', () => { + const template = synth({ sandbox: true }); + + template.hasResource('AWS::S3::Bucket', { DeletionPolicy: 'Delete' }); + template.resourceCountIs('Custom::S3AutoDeleteObjects', 1); + + template.hasResource(VECTOR_BUCKET_TYPE, { DeletionPolicy: 'Delete' }); + template.hasResource(VECTOR_INDEX_TYPE, { DeletionPolicy: 'Delete' }); +}); + +test('CDK: handler role can read ingestion-job status (GetIngestionJob/ListIngestionJobs), scoped to the KB ARN like bedrock:Retrieve', () => { + const template = synth(); + + // isSynced()/waitUntilSynced() poll ingestion-job status — the handler role + // needs both actions, granted as Allow. + template.hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: Match.objectLike({ + Statement: Match.arrayWith([ + Match.objectLike({ + Action: ['bedrock:GetIngestionJob', 'bedrock:ListIngestionJobs'], + Effect: 'Allow', + }), + ]), + }), + }); + + // ...and that grant is scoped to the SAME knowledge-base ARN as the existing + // bedrock:Retrieve grant (not a wildcard) — ingestion jobs are sub-resources + // of the KB ARN. + const statements = Object.values(template.findResources('AWS::IAM::Policy')).flatMap( + (policy) => policy.Properties.PolicyDocument.Statement as Array>, + ); + const retrieveStmt = statements.find((s) => s.Action === 'bedrock:Retrieve'); + const ingestionStmt = statements.find( + (s) => Array.isArray(s.Action) && (s.Action as string[]).includes('bedrock:GetIngestionJob'), + ); + assert.ok(retrieveStmt, 'bedrock:Retrieve grant is present'); + assert.ok(ingestionStmt, 'ingestion-status grant is present'); + assert.deepStrictEqual( + ingestionStmt.Resource, + retrieveStmt.Resource, + 'ingestion-status grant is scoped to the same KB ARN as bedrock:Retrieve', + ); +}); + +test('CDK: registers the DATA_SOURCE_ID config (wired to the data source) and surfaces it in the synthesized template', () => { + const { stack } = buildStack(); + + // registerConfig records BLOCKS_{FULLID}_DATA_SOURCE_ID on the stack's config + // registry, bound to the Bedrock data source's id — the runtime sync + // checks read it back at cold start. (Mirrors bb-app-setting's CDK test.) + const registry = (stack as any)[Symbol.for('BLOCKS_CONFIG_REGISTRY')] as + | { entries: Map } + | undefined; + assert.ok(registry, 'config registry exists on the stack'); + + const dataSourceKey = [...registry.entries.keys()].find((k) => k.endsWith('_DATA_SOURCE_ID')); + assert.ok(dataSourceKey, 'a *_DATA_SOURCE_ID config key is registered'); + assert.match(dataSourceKey, /^BLOCKS_.+_DATA_SOURCE_ID$/); + + const resolvedValue = stack.resolve(registry.entries.get(dataSourceKey)) as { + 'Fn::GetAtt'?: [string, string]; + }; + assert.ok(resolvedValue['Fn::GetAtt'], 'config value is a CDK token (Fn::GetAtt)'); + assert.strictEqual( + resolvedValue['Fn::GetAtt'][1], + 'DataSourceId', + 'config value is wired to the data source id', + ); + + // finalizeConfigRegistry serializes the registry into blocks-config.json via a + // BucketDeployment; the rendered config blob in the synthesized template + // carries the DATA_SOURCE_ID key bound to the data source's DataSourceId, and + // the handler is wired to read it from S3. (Mirrors bb-auth-cognito's CDK test.) + finalizeConfigRegistry(stack, stack.handler); + const template = Template.fromStack(stack); + + const configBlob = JSON.stringify( + Object.values(template.findResources('Custom::CDKBucketDeployment')), + ); + assert.match(configBlob, /BLOCKS_[A-Z0-9_]+_DATA_SOURCE_ID/); + assert.ok( + configBlob.includes('DataSourceId'), + 'config blob binds the DATA_SOURCE_ID key to the data source id', + ); + + template.hasResourceProperties('AWS::Lambda::Function', { + Environment: Match.objectLike({ + Variables: Match.objectLike({ + BLOCKS_CONFIG_BUCKET: Match.anyValue(), + BLOCKS_CONFIG_KEY: 'blocks-config.json', + }), + }), + }); +}); + +// ── S3 URI (imported bucket) source ───────────────────────────────────────── +// An imported s3:// source skips the documents BucketDeployment (the objects +// already live in the bucket) but still provisions a BB-managed CfnDataSource +// and fires the ingestion job — so the runtime sync grants and DATA_SOURCE_ID +// wiring must be present exactly as they are for a local-folder source (see +// DESIGN.md, "Source coverage (folder and imported s3://)"). +const S3_SOURCE = 's3://my-docs-bucket'; + +test('CDK (s3:// source): handler still gets bedrock:Retrieve + ingestion-status grants scoped to the KB ARN', () => { + const { stack } = buildStack({ source: S3_SOURCE }); + const template = Template.fromStack(stack); + + // Imported bucket → no documents BucketDeployment (proves the s3:// branch is + // taken, not the folder path; finalizeConfigRegistry isn't called here). + template.resourceCountIs('Custom::CDKBucketDeployment', 0); + + // Same ingestion-status grant as a folder source: both actions, granted Allow. + template.hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: Match.objectLike({ + Statement: Match.arrayWith([ + Match.objectLike({ + Action: ['bedrock:GetIngestionJob', 'bedrock:ListIngestionJobs'], + Effect: 'Allow', + }), + ]), + }), + }); + + // ...scoped to the SAME knowledge-base ARN as the existing bedrock:Retrieve + // grant (not a wildcard) — ingestion jobs are sub-resources of the KB ARN. + const statements = Object.values(template.findResources('AWS::IAM::Policy')).flatMap( + (policy) => policy.Properties.PolicyDocument.Statement as Array>, + ); + const retrieveStmt = statements.find((s) => s.Action === 'bedrock:Retrieve'); + const ingestionStmt = statements.find( + (s) => Array.isArray(s.Action) && (s.Action as string[]).includes('bedrock:GetIngestionJob'), + ); + assert.ok(retrieveStmt, 'bedrock:Retrieve grant is present for an s3:// source'); + assert.ok(ingestionStmt, 'ingestion-status grant is present for an s3:// source'); + assert.deepStrictEqual( + ingestionStmt.Resource, + retrieveStmt.Resource, + 'ingestion-status grant is scoped to the same KB ARN as bedrock:Retrieve', + ); +}); + +test('CDK (s3:// source): DATA_SOURCE_ID config is wired to the data source id (same as a folder source)', () => { + const { stack } = buildStack({ source: S3_SOURCE }); + + // Even though the bucket is imported, the construct still registers + // BLOCKS_{FULLID}_DATA_SOURCE_ID bound to the Bedrock data source's id, so the + // runtime isSynced()/waitUntilSynced() checks track the imported source's + // ingestion job exactly as they do for a local folder. + const registry = (stack as any)[Symbol.for('BLOCKS_CONFIG_REGISTRY')] as + | { entries: Map } + | undefined; + assert.ok(registry, 'config registry exists on the stack'); + + const dataSourceKey = [...registry.entries.keys()].find((k) => k.endsWith('_DATA_SOURCE_ID')); + assert.ok(dataSourceKey, 'a *_DATA_SOURCE_ID config key is registered for an s3:// source'); + assert.match(dataSourceKey, /^BLOCKS_.+_DATA_SOURCE_ID$/); + + const resolvedValue = stack.resolve(registry.entries.get(dataSourceKey)) as { + 'Fn::GetAtt'?: [string, string]; + }; + assert.ok(resolvedValue['Fn::GetAtt'], 'config value is a CDK token (Fn::GetAtt)'); + assert.strictEqual( + resolvedValue['Fn::GetAtt'][1], + 'DataSourceId', + 'config value is wired to the data source id even when the source is an S3 URI', + ); +}); + +test('CDK: calling a runtime method throws an actionable synth-time error (not a cryptic TypeError)', () => { + const { kb } = buildStack(); + const construct = kb as unknown as Record unknown>; + for (const method of ['retrieve', 'isSynced', 'waitUntilSynced']) { + assert.throws( + () => construct[method]('x'), + /cannot be called during CDK synth/, + `${method}() should throw the actionable synth-time error`, + ); + } +}); diff --git a/packages/bb-knowledge-base/src/index.cdk.ts b/packages/bb-knowledge-base/src/index.cdk.ts index 765cb26b..9d9b409c 100644 --- a/packages/bb-knowledge-base/src/index.cdk.ts +++ b/packages/bb-knowledge-base/src/index.cdk.ts @@ -8,7 +8,7 @@ import * as bedrock from 'aws-cdk-lib/aws-bedrock'; import * as s3vectors from 'aws-cdk-lib/aws-s3vectors'; import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment'; import * as cr from 'aws-cdk-lib/custom-resources'; -import { Scope, registerConfig } from '@aws-blocks/core/cdk'; +import { Scope, registerConfig, synthGuard } from '@aws-blocks/core/cdk'; import type { ScopeParent } from '@aws-blocks/core'; import type { KnowledgeBaseOptions, ChunkingConfig } from './types.js'; import * as path from 'node:path'; @@ -19,7 +19,7 @@ export type { KnowledgeBaseOptions, SourceConfig, ChunkingConfig, ChunkingStrategy, RetrieveOptions, RetrieveResult, - MetadataFilter, + MetadataFilter, WaitUntilSyncedOptions, } from './types.js'; export { KnowledgeBaseErrors } from './errors.js'; @@ -176,9 +176,11 @@ function generateMetadataSidecars(sourceDir: string): string | undefined { * * **Environment variables injected into the handler:** * - `BLOCKS_{FULLID}_KB_ID` — Bedrock Knowledge Base ID (used by the AWS runtime) + * - `BLOCKS_{FULLID}_DATA_SOURCE_ID` — Bedrock data source ID (used by `isSynced()` / `waitUntilSynced()`) * * **IAM grants to the handler:** * - `bedrock:Retrieve` — query the knowledge base at runtime + * - `bedrock:GetIngestionJob`, `bedrock:ListIngestionJobs` — poll ingestion-job sync status * * @param scope - Parent scope. * @param id - Unique identifier within the scope. @@ -192,6 +194,15 @@ export class KnowledgeBase extends Scope { // ── 1. S3 Data Bucket ────────────────────────────────────────────── + // In sandbox mode, default to DESTROY + autoDeleteObjects so a teardown + // can fully clean up without manual bucket emptying. An explicit + // `removalPolicy` from the customer always takes precedence. Computed + // up-front because it also drives the S3 Vectors resources' deletion + // policy (section 2) — keeping the data bucket and the vector store in + // sync on teardown. + const isSandbox = cdk.Stack.of(this).node.tryGetContext('sandboxMode') === 'true'; + const destroy = options.removalPolicy === 'destroy' || (isSandbox && options.removalPolicy === undefined); + let dataBucket: s3.IBucket; let inclusionPrefixes: string[] | undefined; @@ -219,11 +230,6 @@ export class KnowledgeBase extends Scope { inclusionPrefixes = [prefix.endsWith('/') ? prefix : prefix + '/']; } } else { - // In sandbox mode, default to DESTROY + autoDeleteObjects so - // `cdk destroy` can fully clean up without manual bucket emptying. - // Explicit `removalPolicy` from the customer takes precedence. - const isSandbox = cdk.Stack.of(this).node.tryGetContext('sandboxMode') === 'true'; - const destroy = options.removalPolicy === 'destroy' || (isSandbox && options.removalPolicy === undefined); dataBucket = new s3.Bucket(this, 'Data', { bucketName: cdk.PhysicalName.GENERATE_IF_NEEDED, blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL, @@ -256,6 +262,19 @@ export class KnowledgeBase extends Scope { }, }); + // Mirror the data bucket's teardown behavior on the S3 Vectors L1 + // resources. Unlike the L2 s3.Bucket — which defaults to RETAIN and can + // auto-empty via autoDeleteObjects — these CfnVectorBucket/CfnIndex + // resources rely solely on their CloudFormation DeletionPolicy (default: + // Delete). Without an explicit policy they'd be inconsistent with the + // data bucket on teardown. applyRemovalPolicy sets both DeletionPolicy + // and UpdateReplacePolicy. RETAIN by default (parity with the data + // bucket); DELETE only when a `removalPolicy:'destroy'` / sandbox + // teardown is requested, so the vector store is dropped alongside it. + const vectorRemovalPolicy = destroy ? cdk.RemovalPolicy.DESTROY : cdk.RemovalPolicy.RETAIN; + vectorBucket.applyRemovalPolicy(vectorRemovalPolicy); + vectorIndex.applyRemovalPolicy(vectorRemovalPolicy); + // ── 3. IAM Role for Bedrock ──────────────────────────────────────── // Scoped to this account via aws:SourceAccount to prevent confused-deputy. // Ideally we'd also add aws:SourceArn scoped to the KB ARN, but that @@ -430,23 +449,44 @@ export class KnowledgeBase extends Scope { startIngestion.node.addDependency(deployment); } - // ── 8. Handler env vars ─────────────────────────────────────────── - // The AWS runtime reads these to locate the Bedrock resources. + // ── 8. Handler config (read by the AWS runtime) ─────────────────── + // Registered via registerConfig (not addEnvironment) so the runtime can + // locate the Bedrock resources. KB_ID drives retrieve(); DATA_SOURCE_ID + // drives the isSynced()/waitUntilSynced() ingestion-sync checks. registerConfig(this, envKey(this.fullId, 'KB_ID'), knowledgeBase.attrKnowledgeBaseId); + registerConfig(this, envKey(this.fullId, 'DATA_SOURCE_ID'), dataSource.attrDataSourceId); // ── 9. Handler IAM grants ───────────────────────────────────────── + const knowledgeBaseArn = cdk.Stack.of(this).formatArn({ + service: 'bedrock', + resource: 'knowledge-base', + resourceName: knowledgeBase.attrKnowledgeBaseId, + arnFormat: cdk.ArnFormat.SLASH_RESOURCE_NAME, + }); + this.handler.addToRolePolicy(new iam.PolicyStatement({ actions: ['bedrock:Retrieve'], - resources: [ - cdk.Stack.of(this).formatArn({ - service: 'bedrock', - resource: 'knowledge-base', - resourceName: knowledgeBase.attrKnowledgeBaseId, - arnFormat: cdk.ArnFormat.SLASH_RESOURCE_NAME, - }), - ], + resources: [knowledgeBaseArn], + })); + + // Ingestion-job status for isSynced()/waitUntilSynced(). These actions are + // authorized at the knowledge-base resource level (the data source and + // ingestion jobs are sub-resources of the KB ARN). + this.handler.addToRolePolicy(new iam.PolicyStatement({ + actions: ['bedrock:GetIngestionJob', 'bedrock:ListIngestionJobs'], + resources: [knowledgeBaseArn], })); } + + // ── Runtime methods are not available during CDK synth ──────────────── + // Under `--conditions=cdk` a KnowledgeBase resolves to this construct, which + // only provisions infrastructure. The data/sync methods (retrieve/ + // isSynced/waitUntilSynced) live in the runtime build. Calling them at module + // top-level (which runs during synth) would otherwise fail with a cryptic + // `X is not a function`; these stubs turn that into an actionable message. + retrieve(..._args: unknown[]): never { return synthGuard('KnowledgeBase', 'retrieve'); } + isSynced(..._args: unknown[]): never { return synthGuard('KnowledgeBase', 'isSynced'); } + waitUntilSynced(..._args: unknown[]): never { return synthGuard('KnowledgeBase', 'waitUntilSynced'); } } diff --git a/packages/bb-knowledge-base/src/index.mock.test.ts b/packages/bb-knowledge-base/src/index.mock.test.ts index 27d11d5b..b51cee26 100644 --- a/packages/bb-knowledge-base/src/index.mock.test.ts +++ b/packages/bb-knowledge-base/src/index.mock.test.ts @@ -1032,5 +1032,33 @@ describe('unicode / multilingual retrieval', () => { }); }); +// ── Sync (local dev: no async ingestion window) ───────────────────────────── +// +// The local corpus loads synchronously on first retrieve(), so there is no +// asynchronous ingestion window — isSynced() is always true and +// waitUntilSynced() resolves immediately (options are ignored). + +describe('sync', () => { + test('isSynced() resolves true immediately', async () => { + const kb = new KnowledgeBase({ id: 'test' }, 'synced', { source: 'test-knowledge-tmp' }); + assert.strictEqual(await kb.isSynced(), true); + }); + + test('waitUntilSynced() resolves immediately', async () => { + const kb = new KnowledgeBase({ id: 'test' }, 'waitsynced', { source: 'test-knowledge-tmp' }); + await kb.waitUntilSynced(); + }); + + test('isSynced() is true even for an S3 URI source (no local ingestion window)', async () => { + const kb = new KnowledgeBase({ id: 'test' }, 'synceds3', { source: 's3://my-docs-bucket' }); + assert.strictEqual(await kb.isSynced(), true); + }); + + test('waitUntilSynced() ignores options and resolves immediately', async () => { + const kb = new KnowledgeBase({ id: 'test' }, 'waitopts', { source: 'test-knowledge-tmp' }); + await kb.waitUntilSynced({ timeoutMs: 1, pollIntervalMs: 1 }); + }); +}); + // ── Cleanup after all tests ──────────────────────────────────────────────── test('cleanup', () => { cleanup(); }); diff --git a/packages/bb-knowledge-base/src/index.mock.ts b/packages/bb-knowledge-base/src/index.mock.ts index d873ca9c..44ebeb70 100644 --- a/packages/bb-knowledge-base/src/index.mock.ts +++ b/packages/bb-knowledge-base/src/index.mock.ts @@ -8,7 +8,7 @@ import { existsSync, readFileSync, writeFileSync, mkdirSync, readdirSync, statSy import { join, relative, dirname, extname, resolve, sep } from 'node:path'; import { createHash } from 'node:crypto'; import { buildIndex, search, type TfIdfIndex } from './tfidf.js'; -import type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, ChunkingStrategy } from './types.js'; +import type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, ChunkingStrategy, WaitUntilSyncedOptions } from './types.js'; import { KnowledgeBaseErrors } from './errors.js'; import { Logger } from '@aws-blocks/bb-logger'; import type { ChildLogger } from '@aws-blocks/bb-logger'; @@ -22,6 +22,7 @@ export type { RetrieveOptions, RetrieveResult, MetadataFilter, + WaitUntilSyncedOptions, } from './types.js'; export { KnowledgeBaseErrors } from './errors.js'; @@ -251,6 +252,34 @@ export class KnowledgeBase extends Scope { return results; } + /** + * Report whether the knowledge base is synced with your latest data. + * + * Local development has no asynchronous ingestion window — the corpus is read + * and indexed synchronously on the first `retrieve()` — so it is always in + * sync and this resolves `true`. (In production the AWS runtime polls the + * Bedrock ingestion-job status, which reports `false` until the latest + * ingestion job reaches `COMPLETE`.) + * + * @returns Always `true` in local development. + */ + async isSynced(): Promise { + return true; + } + + /** + * Resolve once the knowledge base is synced with your latest data. + * + * Local development has no asynchronous ingestion window (see {@link isSynced}), + * so this resolves immediately. The options are accepted for API parity + * with the AWS runtime and are otherwise ignored locally. + * + * @param {WaitUntilSyncedOptions} _options - Accepted for API parity; ignored in local development. + */ + async waitUntilSynced(_options?: WaitUntilSyncedOptions): Promise { + // No-op: the local corpus loads synchronously, so there is nothing to wait for. + } + // ── Lazy loading ────────────────────────────────────────────────────── private ensureLoaded(): Promise { diff --git a/packages/bb-knowledge-base/src/types.ts b/packages/bb-knowledge-base/src/types.ts index 6068bcdb..3ed3e017 100644 --- a/packages/bb-knowledge-base/src/types.ts +++ b/packages/bb-knowledge-base/src/types.ts @@ -153,3 +153,66 @@ export interface RetrieveResult { /** Document metadata key-value pairs. Includes auto-populated `folder` key derived from subfolder structure. */ metadata: Record; } + +// ── Sync Options ─────────────────────────────────────────────────────────── + +/** + * Options for the `waitUntilSynced()` method. + * + * @example + * ```typescript + * // Wait up to 10 minutes, polling every 10 seconds + * await kb.waitUntilSynced({ timeoutMs: 600_000, pollIntervalMs: 10_000 }); + * ``` + */ +export interface WaitUntilSyncedOptions { + /** + * Maximum time to wait for ingestion to complete, in milliseconds. Default: 300000 (5 minutes). + * + * `timeoutMs: 0` is a one-shot check, not a no-poll fast-fail: the deadline is + * evaluated *after* the first `isSynced()` poll, so exactly one poll always + * runs (and can resolve the wait) before a `KnowledgeBaseTimeoutException` is + * thrown. No inter-poll sleep occurs in this case either — the sleep is + * `min(jitter, deadline − now)`, and the remaining-budget clamp drives it to 0 + * when the budget is already spent. Clamped to a minimum of 0. + */ + timeoutMs?: number; + /** + * Delay between sync polls, in milliseconds. Clamped to a minimum of 1ms. + * A small amount of random jitter (±20%) is applied to each delay so that many + * knowledge bases polling after a shared deploy do not fall into lockstep — the + * jitter only varies the wait *between* polls (never the number of polls) and is + * still clamped so a sleep never overruns `timeoutMs`. Default: 5000 (5 seconds). + */ + pollIntervalMs?: number; + /** + * Maximum number of *consecutive* transient control-plane errors to tolerate + * before giving up, instead of aborting the wait on the first blip. Two kinds + * of error are treated as transient during a sync poll: + * - `RetrievalFailedException` — the catch-all for network failures, throttling, + * and other unrecognized SDK errors. + * - A *not-yet-visible* knowledge base: in the post-deploy window the control + * plane can briefly return `ResourceNotFoundException` (the KB or data source + * isn't visible yet), which surfaces as `KnowledgeBaseNotReadyException`. Only + * this control-plane variant is transient — riding it out is the whole point + * of `waitUntilSynced()`. + * + * Each clean poll (ingestion still in progress, or synced) resets the counter, + * so only an unbroken run of failures counts toward the limit. + * + * Terminal errors always short-circuit immediately regardless of this value: + * `IngestionFailedException` (the job failed), a *config* `KnowledgeBaseNotReadyException` + * (the `KB_ID` env var is unset — distinct from the transient not-yet-visible case + * above), and validation errors. Set to `0` to fail fast on the first transient + * error. Clamped to a minimum of 0. Default: 3. + */ + maxConsecutiveTransientErrors?: number; + /** + * Optional {@link AbortSignal} to cancel the wait. When the signal is aborted, + * `waitUntilSynced()` rejects promptly — checked before each poll and during the + * inter-poll delay — with the signal's abort reason (by default a `DOMException` + * named `'AbortError'`, or whatever value was passed to `AbortController.abort(reason)`). + * An already-aborted signal rejects immediately, before any polling. + */ + signal?: AbortSignal; +} diff --git a/packages/blocks/API.md b/packages/blocks/API.md index 4dfc7453..62785807 100644 --- a/packages/blocks/API.md +++ b/packages/blocks/API.md @@ -163,6 +163,7 @@ import { Transaction } from '@aws-blocks/bb-data'; import { TransactionOptions } from '@aws-blocks/bb-distributed-data'; import { UpdateAttributeOutcome } from '@aws-blocks/bb-auth-cognito'; import { UserAttribute } from '@aws-blocks/bb-auth-cognito'; +import { WaitUntilSyncedOptions } from '@aws-blocks/bb-knowledge-base'; export { Agent } @@ -571,6 +572,8 @@ export { UpdateAttributeOutcome } export { UserAttribute } +export { WaitUntilSyncedOptions } + export * from "@aws-blocks/core"; diff --git a/packages/blocks/src/index.cdk.ts b/packages/blocks/src/index.cdk.ts index 771cef0c..b18940aa 100644 --- a/packages/blocks/src/index.cdk.ts +++ b/packages/blocks/src/index.cdk.ts @@ -53,7 +53,7 @@ export type { FileBucketOptions, PutOptions as FBPutOptions, GetUrlOptions, PutU export { AppSetting, AppSettingErrors } from '@aws-blocks/bb-app-setting'; export type { AppSettingOptions } from '@aws-blocks/bb-app-setting'; export { KnowledgeBase, KnowledgeBaseErrors } from '@aws-blocks/bb-knowledge-base'; -export type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, SourceConfig, ChunkingConfig, ChunkingStrategy } from '@aws-blocks/bb-knowledge-base'; +export type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, SourceConfig, ChunkingConfig, ChunkingStrategy, WaitUntilSyncedOptions } from '@aws-blocks/bb-knowledge-base'; export { Tracer } from '@aws-blocks/bb-tracer'; export type { TracerOptions, Segment, AnnotationValue } from '@aws-blocks/bb-tracer'; export { Logger, LoggingErrors } from '@aws-blocks/bb-logger'; diff --git a/packages/blocks/src/index.ts b/packages/blocks/src/index.ts index 28a5727e..48ad1215 100644 --- a/packages/blocks/src/index.ts +++ b/packages/blocks/src/index.ts @@ -286,7 +286,7 @@ export type { AppSettingOptions } from '@aws-blocks/bb-app-setting'; * Full docs: `README.md` in the package directory above. */ export { KnowledgeBase, KnowledgeBaseErrors } from '@aws-blocks/bb-knowledge-base'; -export type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, SourceConfig, ChunkingConfig, ChunkingStrategy } from '@aws-blocks/bb-knowledge-base'; +export type { KnowledgeBaseOptions, RetrieveOptions, RetrieveResult, MetadataFilter, SourceConfig, ChunkingConfig, ChunkingStrategy, WaitUntilSyncedOptions } from '@aws-blocks/bb-knowledge-base'; /** * **Distributed tracing backed by AWS X-Ray.** diff --git a/test-apps/comprehensive/aws-blocks/index.ts b/test-apps/comprehensive/aws-blocks/index.ts index 966d0857..01c5d909 100644 --- a/test-apps/comprehensive/aws-blocks/index.ts +++ b/test-apps/comprehensive/aws-blocks/index.ts @@ -13,7 +13,7 @@ import { DistributedTableErrors } from '@aws-blocks/bb-distributed-table'; import { isBlocksError } from '@aws-blocks/core'; import { AsyncJob } from '@aws-blocks/bb-async-job'; import { AppSetting } from '@aws-blocks/bb-app-setting'; -import type { RetrieveOptions } from '@aws-blocks/bb-knowledge-base'; +import type { RetrieveOptions, WaitUntilSyncedOptions } from '@aws-blocks/bb-knowledge-base'; import { Tracer } from '@aws-blocks/bb-tracer'; import { Logger } from '@aws-blocks/bb-logger'; import { createKyselyAdapter, DatabaseErrors } from '@aws-blocks/bb-data'; @@ -1863,6 +1863,18 @@ export const api = new ApiNamespace(scope, 'api', (context) => ({ return await kb.retrieve(query, options); }, + // Ingestion sync — Bedrock ingests asynchronously after deploy, so e2e + // tests gate retrieval on these instead of polling retrieve() for results. + // The local mock reports synced immediately. + async kbSynced() { + return await kb.isSynced(); + }, + + async kbWaitUntilSynced(options?: WaitUntilSyncedOptions) { + await kb.waitUntilSynced(options); + return { success: true }; + }, + // ------------------------------------------------------------------------ // EmailClient Tests // ------------------------------------------------------------------------ diff --git a/test-apps/comprehensive/test/knowledge-base.test.ts b/test-apps/comprehensive/test/knowledge-base.test.ts index 1aea12af..aea1a062 100644 --- a/test-apps/comprehensive/test/knowledge-base.test.ts +++ b/test-apps/comprehensive/test/knowledge-base.test.ts @@ -3,44 +3,48 @@ import { describe, test, before } from 'node:test'; import assert from 'node:assert'; -import { setTimeout } from 'node:timers/promises'; import { isBlocksError } from '@aws-blocks/core'; import type { api as apiType } from 'aws-blocks'; const ValidationError = 'KnowledgeBaseValidationError'; +const Timeout = 'KnowledgeBaseTimeoutException'; const ENV = process.env.BLOCKS_TEST_ENV || 'local'; const isLocal = ENV === 'local'; /** - * Poll kbRetrieve until at least one result is returned, indicating that - * the knowledge-base ingestion job has completed. In local mode the mock - * returns results immediately; in sandbox/production Bedrock ingestion is - * async and may take a couple of minutes after deploy. + * Gate retrieval tests on knowledge-base ingestion sync. + * + * Bedrock ingests asynchronously after deploy, so during the initial pre-sync + * window we wait for the KB to sync with our latest data before probing + * `kbRetrieve`. We delegate to the wired `waitUntilSynced()` endpoint (exposed + * here as `kbWaitUntilSynced`) rather than hand-rolling a poll loop over + * `isSynced()` (`kbSynced`): `waitUntilSynced()` owns the deadline AND rides out + * transient control-plane blips — throttling, a `RetrievalFailed`, or a brief + * not-yet-visible `ResourceNotFoundException` during the post-deploy poll — that + * a per-poll `isSynced()` would otherwise surface as a hard suite failure. + * + * A *thrown* error here is therefore a real failure (a failed ingestion job + * surfaced as `IngestionFailedException`, a `KnowledgeBaseValidationError`, the + * sync timeout, or anything unexpected) and is surfaced immediately rather than + * masked as an in-progress sync. + * + * In local mode the mock resolves immediately, so this returns on the first poll. */ -async function waitForIngestion( +async function gateOnSync( api: typeof apiType, - query: string, - { timeoutMs = 180_000, intervalMs = 10_000 } = {}, + { timeoutMs = 180_000, pollIntervalMs = 10_000 } = {}, ): Promise { const start = Date.now(); - const deadline = start + timeoutMs; - let attempt = 0; - while (Date.now() < deadline) { - attempt++; - const elapsed = Math.round((Date.now() - start) / 1000); - try { - const results = await api.kbRetrieve(query); - console.log(`[KB ingestion poll #${attempt}] ${results.length} results (${elapsed}s elapsed)`); - if (results.length > 0) return; - } catch (err: any) { - console.log(`[KB ingestion poll #${attempt}] error: ${err.name || err.message} (${elapsed}s elapsed)`); - // ValidationError = real bug, throw immediately - if (isBlocksError(err, ValidationError)) throw err; - // NotReady, RetrievalFailed, etc. = KB not ready yet, keep polling - } - await setTimeout(intervalMs); + console.log('⏳ Waiting for KB to sync with latest data (ingesting if needed)…'); + try { + await api.kbWaitUntilSynced({ timeoutMs, pollIntervalMs }); + } catch (err: any) { + // Real failure (failed ingestion / validation / timeout / unexpected) — surface it. + console.error(`❌ KB sync check failed: ${err.name || err.message}`); + throw err; } - throw new Error(`KB ingestion did not complete within ${timeoutMs / 1000}s`); + const elapsed = Math.round((Date.now() - start) / 1000); + console.log(`✅ KB synced (ingestion complete) — ${elapsed}s elapsed`); } export function knowledgeBaseTests(getApi: () => typeof apiType) { @@ -72,12 +76,46 @@ export function knowledgeBaseTests(getApi: () => typeof apiType) { }); }); + // --- Sync: cover the wired waitUntilSynced() endpoint end-to-end --- + // The retrieval suites gate via gateOnSync() → kbWaitUntilSynced(); this + // exercises that same waitUntilSynced() polling path directly. (kbSynced / + // isSynced() is wired but not exercised by this suite — it's covered by unit + // tests.) Locally the mock resolves on the first poll; on AWS we give it the + // same budget as gateOnSync so a still-ingesting KB is waited out rather + // than surfaced as a failure. + describe('waitUntilSynced', () => { + test('resolves once the KB is synced', async (t) => { + const api = getApi(); + try { + const result = await api.kbWaitUntilSynced( + isLocal + ? { timeoutMs: 5_000, pollIntervalMs: 50 } + : { timeoutMs: 180_000, pollIntervalMs: 10_000 }, + ); + assert.deepStrictEqual(result, { success: true }); + } catch (err: unknown) { + // Sync tolerance: unlike the retrieval suites, this standalone test is NOT + // behind the gateOnSync() gate, so on AWS a slow-but-healthy KB whose + // ingestion outruns the 180s budget makes kbWaitUntilSynced throw a Timeout. + // That's the post-deploy sync window, not a defect — soft-skip it here. A + // genuine IngestionFailed (or any other error) still fails the test. The local + // mock resolves on the first poll, so this branch never trips and the success + // assertion above always runs. + if (isBlocksError(err, Timeout)) { + t.skip(`KB still syncing — ingestion exceeded budget: ${(err as Error).message}`); + return; + } + throw err; + } + }); + }); + // --- Retrieval tests: wait for ingestion before running --- describe('retrieve', () => { before(async () => { const api = getApi(); - await waitForIngestion(api, 'getting started'); + await gateOnSync(api); }); test('returns results for a matching query', async () => { @@ -134,7 +172,7 @@ export function knowledgeBaseTests(getApi: () => typeof apiType) { before(async () => { const api = getApi(); - await waitForIngestion(api, 'getting started'); + await gateOnSync(api); }); test('maxResults limits results', async () => { @@ -172,7 +210,7 @@ export function knowledgeBaseTests(getApi: () => typeof apiType) { before(async () => { const api = getApi(); - await waitForIngestion(api, 'deployment'); + await gateOnSync(api); }); test('customer metadata category is present on tutorial doc', async () => {