diff --git a/src/skeleton/spatial_skeleton_manager.spec.ts b/src/skeleton/spatial_skeleton_manager.spec.ts index d47b7f092..64b7b4f54 100644 --- a/src/skeleton/spatial_skeleton_manager.spec.ts +++ b/src/skeleton/spatial_skeleton_manager.spec.ts @@ -898,4 +898,105 @@ describe("skeleton/spatial_skeleton_manager", () => { expect(state.getCachedSegmentNodes(1)).toBeUndefined(); expect(state.getCachedNode(1)).toBeUndefined(); }); + + function makeLimiterTestLayer(itemLimit?: number) { + const resolvers: Array< + ( + value: Array<{ + nodeId: number; + parentNodeId?: number; + position: Float32Array; + segmentId: number; + isTrueEnd: boolean; + }>, + ) => void + > = []; + const getSkeleton = vi.fn( + (_segmentId: number, options?: { signal?: AbortSignal }) => + new Promise< + Array<{ + nodeId: number; + parentNodeId?: number; + position: Float32Array; + segmentId: number; + isTrueEnd: boolean; + }> + >((resolve, reject) => { + resolvers.push(resolve); + options?.signal?.addEventListener( + "abort", + () => reject(options.signal?.reason), + { once: true }, + ); + }), + ); + const skeletonLayer = { + source: { + readonly: false, + listSkeletons: async () => [], + getSkeleton, + fetchNodes: async () => [], + getSpatialIndexMetadata: async () => null, + }, + ...(itemLimit === undefined + ? {} + : { + chunkManager: { + chunkQueueManager: { + capacities: { download: { itemLimit: { value: itemLimit } } }, + }, + }, + }), + } as any; + return { skeletonLayer, getSkeleton, resolvers }; + } + + it("caps concurrent full segment fetches at the download item limit", async () => { + const state = new SpatialSkeletonState(); + const { skeletonLayer, getSkeleton, resolvers } = makeLimiterTestLayer(2); + + const pending = [11, 12, 13, 14].map((segmentId) => + state.getFullSegmentNodes(skeletonLayer, segmentId), + ); + + expect(getSkeleton).toHaveBeenCalledTimes(2); + resolvers[0]([]); + await pending[0]; + expect(getSkeleton).toHaveBeenCalledTimes(3); + resolvers[1]([]); + resolvers[2]([]); + await Promise.all([pending[1], pending[2]]); + expect(getSkeleton).toHaveBeenCalledTimes(4); + resolvers[3]([]); + await pending[3]; + }); + + it("caps concurrent full segment fetches when no chunk manager is available", () => { + const state = new SpatialSkeletonState(); + const { skeletonLayer, getSkeleton } = makeLimiterTestLayer(); + + for (let segmentId = 1; segmentId <= 10; ++segmentId) { + void state + .getFullSegmentNodes(skeletonLayer, segmentId) + .catch(() => undefined); + } + + expect(getSkeleton).toHaveBeenCalledTimes(8); + }); + + it("never starts a queued full segment fetch that is evicted first", async () => { + const state = new SpatialSkeletonState(); + const { skeletonLayer, getSkeleton, resolvers } = makeLimiterTestLayer(1); + + const first = state.getFullSegmentNodes(skeletonLayer, 11); + const queued = state.getFullSegmentNodes(skeletonLayer, 12); + expect(getSkeleton).toHaveBeenCalledTimes(1); + + expect(state.evictInactiveSegmentNodes([11])).toBe(false); + await expect(queued).rejects.toMatchObject({ name: "AbortError" }); + + resolvers[0]([]); + await first; + expect(getSkeleton).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/skeleton/spatial_skeleton_manager.ts b/src/skeleton/spatial_skeleton_manager.ts index f0d4565e4..d8d1c6538 100644 --- a/src/skeleton/spatial_skeleton_manager.ts +++ b/src/skeleton/spatial_skeleton_manager.ts @@ -32,6 +32,7 @@ import { SpatialSkeletonCommandHistory } from "#src/skeleton/command_history.js" import type { SpatiallyIndexedSkeletonLayer } from "#src/skeleton/frontend.js"; import { WatchableValue } from "#src/trackable_value.js"; import { RefCounted } from "#src/util/disposable.js"; +import { PromiseConcurrencyLimiter } from "#src/util/promise_concurrency_limiter.js"; interface SpatialSkeletonSourceAccess { source: unknown; @@ -228,6 +229,12 @@ function cloneSpatiallyIndexedSkeletonNode( }; } +/** + * Full-segment skeleton fetches bypass the chunk queue manager, so they are + * capped separately at min(this, the concurrentDownloads viewer setting). + */ +const MAX_CONCURRENT_FULL_SEGMENT_NODE_FETCHES = 8; + export class SpatialSkeletonState extends RefCounted { readonly commandHistory = this.registerDisposer( new SpatialSkeletonCommandHistory(), @@ -254,6 +261,18 @@ export class SpatialSkeletonState extends RefCounted { abortController: AbortController; } >(); + private fullSegmentNodeFetchLimitLayer: + | SpatiallyIndexedSkeletonLayer + | undefined; + private fullSegmentNodeFetchLimiter = new PromiseConcurrencyLimiter(() => { + const itemLimit = + this.fullSegmentNodeFetchLimitLayer?.chunkManager?.chunkQueueManager + ?.capacities?.download?.itemLimit?.value; + return Math.min( + MAX_CONCURRENT_FULL_SEGMENT_NODE_FETCHES, + itemLimit ?? Number.POSITIVE_INFINITY, + ); + }); private cachedNodesById = new Map(); setNodeRadius(nodeId: number, radius: number) { @@ -859,38 +878,44 @@ export class SpatialSkeletonState extends RefCounted { const pendingFetch: { promise?: Promise; } = {}; - const fetchPromise = (async () => { - const fetchedNodes = await skeletonSource.getSkeleton(segmentId, { - signal: abortController.signal, - }); - const normalizedNodes: SpatiallyIndexedSkeletonNode[] = []; - for (const fetchedNode of fetchedNodes) { - const mappedNode = normalizeSpatiallyIndexedSkeletonNode( - fetchedNode, - segmentId, - ); - if (mappedNode === undefined) continue; - normalizedNodes.push(mappedNode); - } - normalizedNodes.sort((a, b) => a.nodeId - b.nodeId); - if ( - this.fullSkeletonCacheGeneration === fetchVersion && - pendingFetch.promise !== undefined && - this.pendingFullSegmentNodeFetches.get(segmentId)?.promise === + this.fullSegmentNodeFetchLimitLayer = skeletonLayer; + const fetchPromise = this.fullSegmentNodeFetchLimiter + .run( + async () => { + const fetchedNodes = await skeletonSource.getSkeleton(segmentId, { + signal: abortController.signal, + }); + const normalizedNodes: SpatiallyIndexedSkeletonNode[] = []; + for (const fetchedNode of fetchedNodes) { + const mappedNode = normalizeSpatiallyIndexedSkeletonNode( + fetchedNode, + segmentId, + ); + if (mappedNode === undefined) continue; + normalizedNodes.push(mappedNode); + } + normalizedNodes.sort((a, b) => a.nodeId - b.nodeId); + if ( + this.fullSkeletonCacheGeneration === fetchVersion && + pendingFetch.promise !== undefined && + this.pendingFullSegmentNodeFetches.get(segmentId)?.promise === + pendingFetch.promise + ) { + this.replaceCachedSegmentNodes(segmentId, normalizedNodes); + this.markNodeDataChanged({ invalidateFullSkeletonCache: false }); + } + return normalizedNodes; + }, + { signal: abortController.signal }, + ) + .finally(() => { + if ( + this.pendingFullSegmentNodeFetches.get(segmentId)?.promise === pendingFetch.promise - ) { - this.replaceCachedSegmentNodes(segmentId, normalizedNodes); - this.markNodeDataChanged({ invalidateFullSkeletonCache: false }); - } - return normalizedNodes; - })().finally(() => { - if ( - this.pendingFullSegmentNodeFetches.get(segmentId)?.promise === - pendingFetch.promise - ) { - this.pendingFullSegmentNodeFetches.delete(segmentId); - } - }); + ) { + this.pendingFullSegmentNodeFetches.delete(segmentId); + } + }); pendingFetch.promise = fetchPromise; this.pendingFullSegmentNodeFetches.set(segmentId, { promise: fetchPromise, diff --git a/src/util/promise_concurrency_limiter.spec.ts b/src/util/promise_concurrency_limiter.spec.ts new file mode 100644 index 000000000..05eaf2123 --- /dev/null +++ b/src/util/promise_concurrency_limiter.spec.ts @@ -0,0 +1,162 @@ +/** + * @license + * Copyright 2026 Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { describe, expect, test } from "vitest"; +import { PromiseConcurrencyLimiter } from "#src/util/promise_concurrency_limiter.js"; + +function makeDeferredTask() { + let resolve!: () => void; + let reject!: (reason: unknown) => void; + let started = false; + const promise = new Promise((promiseResolve, promiseReject) => { + resolve = promiseResolve; + reject = promiseReject; + }); + return { + task: () => { + started = true; + return promise; + }, + get started() { + return started; + }, + resolve, + reject, + }; +} + +describe("PromiseConcurrencyLimiter", () => { + test("starts tasks synchronously while under the limit", () => { + const limiter = new PromiseConcurrencyLimiter(() => 2); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + void limiter.run(a.task); + void limiter.run(b.task); + expect(a.started).toBe(true); + expect(b.started).toBe(true); + expect(limiter.pendingCount).toBe(0); + }); + + test("queues tasks beyond the limit and dequeues in FIFO order", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + const c = makeDeferredTask(); + const aPromise = limiter.run(a.task); + void limiter.run(b.task); + void limiter.run(c.task); + expect(a.started).toBe(true); + expect(b.started).toBe(false); + expect(c.started).toBe(false); + expect(limiter.pendingCount).toBe(2); + a.resolve(); + await aPromise; + expect(b.started).toBe(true); + expect(c.started).toBe(false); + expect(limiter.pendingCount).toBe(1); + }); + + test("a rejected task releases its slot", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + const aPromise = limiter.run(a.task); + void limiter.run(b.task); + a.reject(new Error("failure")); + await expect(aPromise).rejects.toThrowError("failure"); + expect(b.started).toBe(true); + }); + + test("a synchronously throwing task releases its slot", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const b = makeDeferredTask(); + const aPromise = limiter.run(() => { + throw new Error("sync failure"); + }); + void limiter.run(b.task); + await expect(aPromise).rejects.toThrowError("sync failure"); + expect(b.started).toBe(true); + }); + + test("re-reads the limit at each dispatch", async () => { + let limit = 1; + const limiter = new PromiseConcurrencyLimiter(() => limit); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + const c = makeDeferredTask(); + const aPromise = limiter.run(a.task); + void limiter.run(b.task); + void limiter.run(c.task); + expect(b.started).toBe(false); + limit = 2; + a.resolve(); + await aPromise; + expect(b.started).toBe(true); + expect(c.started).toBe(true); + }); + + test("treats a limit below one as one", () => { + const limiter = new PromiseConcurrencyLimiter(() => 0); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + void limiter.run(a.task); + void limiter.run(b.task); + expect(a.started).toBe(true); + expect(b.started).toBe(false); + }); + + test("rejects immediately for an already aborted signal", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const controller = new AbortController(); + controller.abort(new Error("already aborted")); + const a = makeDeferredTask(); + await expect( + limiter.run(a.task, { signal: controller.signal }), + ).rejects.toThrowError("already aborted"); + expect(a.started).toBe(false); + }); + + test("aborting a queued task rejects it without consuming a slot", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + const c = makeDeferredTask(); + const controller = new AbortController(); + const aPromise = limiter.run(a.task); + const bPromise = limiter.run(b.task, { signal: controller.signal }); + void limiter.run(c.task); + controller.abort(new Error("queued abort")); + await expect(bPromise).rejects.toThrowError("queued abort"); + expect(b.started).toBe(false); + expect(limiter.pendingCount).toBe(1); + a.resolve(); + await aPromise; + expect(c.started).toBe(true); + }); + + test("aborting after a task starts does not affect the limiter", async () => { + const limiter = new PromiseConcurrencyLimiter(() => 1); + const a = makeDeferredTask(); + const b = makeDeferredTask(); + const controller = new AbortController(); + const aPromise = limiter.run(a.task, { signal: controller.signal }); + void limiter.run(b.task); + a.resolve(); + await aPromise; + controller.abort(); + expect(b.started).toBe(true); + }); +}); diff --git a/src/util/promise_concurrency_limiter.ts b/src/util/promise_concurrency_limiter.ts new file mode 100644 index 000000000..d09f05e76 --- /dev/null +++ b/src/util/promise_concurrency_limiter.ts @@ -0,0 +1,104 @@ +/** + * @license + * Copyright 2026 Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +interface QueuedTask { + start: () => void; + reject: (reason: unknown) => void; + signal: AbortSignal | undefined; + abortListener: (() => void) | undefined; +} + +/** + * Limits the number of concurrently running promise-returning tasks. + * + * Tasks submitted while under the limit start synchronously; the rest queue + * in FIFO order and start as running tasks settle. The limit is re-read at + * each dispatch, so it may change dynamically. + */ +export class PromiseConcurrencyLimiter { + private runningCount = 0; + private queue: QueuedTask[] = []; + + constructor(private getLimit: () => number) {} + + get pendingCount() { + return this.queue.length; + } + + run( + task: () => Promise, + options: { signal?: AbortSignal } = {}, + ): Promise { + const { signal } = options; + if (signal?.aborted) { + return Promise.reject(signal.reason); + } + if (this.runningCount < Math.max(1, this.getLimit())) { + return this.start(task); + } + return new Promise((resolve, reject) => { + const entry: QueuedTask = { + start: () => { + this.start(task).then(resolve, reject); + }, + reject, + signal, + abortListener: undefined, + }; + if (signal !== undefined) { + const abortListener = () => { + const index = this.queue.indexOf(entry); + if (index !== -1) { + this.queue.splice(index, 1); + } + reject(signal.reason); + }; + entry.abortListener = abortListener; + signal.addEventListener("abort", abortListener, { once: true }); + } + this.queue.push(entry); + }); + } + + private start(task: () => Promise): Promise { + ++this.runningCount; + let promise: Promise; + try { + promise = task(); + } catch (error) { + promise = Promise.reject(error); + } + promise.then( + () => this.releaseSlot(), + () => this.releaseSlot(), + ); + return promise; + } + + private releaseSlot() { + --this.runningCount; + while ( + this.queue.length > 0 && + this.runningCount < Math.max(1, this.getLimit()) + ) { + const entry = this.queue.shift()!; + if (entry.abortListener !== undefined) { + entry.signal!.removeEventListener("abort", entry.abortListener); + } + entry.start(); + } + } +}