Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/skeleton/spatial_skeleton_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -898,4 +898,105 @@ describe("skeleton/spatial_skeleton_manager", () => {
expect(state.getCachedSegmentNodes(1)).toBeUndefined();
expect(state.getCachedNode(1)).toBeUndefined();
});

function makeLimiterTestLayer(itemLimit?: number) {
const resolvers: Array<
(
value: Array<{
nodeId: number;
parentNodeId?: number;
position: Float32Array;
segmentId: number;
isTrueEnd: boolean;
}>,
) => void
> = [];
const getSkeleton = vi.fn(
(_segmentId: number, options?: { signal?: AbortSignal }) =>
new Promise<
Array<{
nodeId: number;
parentNodeId?: number;
position: Float32Array;
segmentId: number;
isTrueEnd: boolean;
}>
>((resolve, reject) => {
resolvers.push(resolve);
options?.signal?.addEventListener(
"abort",
() => reject(options.signal?.reason),
{ once: true },
);
}),
);
const skeletonLayer = {
source: {
readonly: false,
listSkeletons: async () => [],
getSkeleton,
fetchNodes: async () => [],
getSpatialIndexMetadata: async () => null,
},
...(itemLimit === undefined
? {}
: {
chunkManager: {
chunkQueueManager: {
capacities: { download: { itemLimit: { value: itemLimit } } },
},
},
}),
} as any;
return { skeletonLayer, getSkeleton, resolvers };
}

it("caps concurrent full segment fetches at the download item limit", async () => {
const state = new SpatialSkeletonState();
const { skeletonLayer, getSkeleton, resolvers } = makeLimiterTestLayer(2);

const pending = [11, 12, 13, 14].map((segmentId) =>
state.getFullSegmentNodes(skeletonLayer, segmentId),
);

expect(getSkeleton).toHaveBeenCalledTimes(2);
resolvers[0]([]);
await pending[0];
expect(getSkeleton).toHaveBeenCalledTimes(3);
resolvers[1]([]);
resolvers[2]([]);
await Promise.all([pending[1], pending[2]]);
expect(getSkeleton).toHaveBeenCalledTimes(4);
resolvers[3]([]);
await pending[3];
});

it("caps concurrent full segment fetches when no chunk manager is available", () => {
const state = new SpatialSkeletonState();
const { skeletonLayer, getSkeleton } = makeLimiterTestLayer();

for (let segmentId = 1; segmentId <= 10; ++segmentId) {
void state
.getFullSegmentNodes(skeletonLayer, segmentId)
.catch(() => undefined);
}

expect(getSkeleton).toHaveBeenCalledTimes(8);
});

it("never starts a queued full segment fetch that is evicted first", async () => {
const state = new SpatialSkeletonState();
const { skeletonLayer, getSkeleton, resolvers } = makeLimiterTestLayer(1);

const first = state.getFullSegmentNodes(skeletonLayer, 11);
const queued = state.getFullSegmentNodes(skeletonLayer, 12);
expect(getSkeleton).toHaveBeenCalledTimes(1);

expect(state.evictInactiveSegmentNodes([11])).toBe(false);
await expect(queued).rejects.toMatchObject({ name: "AbortError" });

resolvers[0]([]);
await first;
expect(getSkeleton).toHaveBeenCalledTimes(1);
});
});
87 changes: 56 additions & 31 deletions src/skeleton/spatial_skeleton_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { SpatialSkeletonCommandHistory } from "#src/skeleton/command_history.js"
import type { SpatiallyIndexedSkeletonLayer } from "#src/skeleton/frontend.js";
import { WatchableValue } from "#src/trackable_value.js";
import { RefCounted } from "#src/util/disposable.js";
import { PromiseConcurrencyLimiter } from "#src/util/promise_concurrency_limiter.js";

interface SpatialSkeletonSourceAccess {
source: unknown;
Expand Down Expand Up @@ -228,6 +229,12 @@ function cloneSpatiallyIndexedSkeletonNode(
};
}

/**
* Full-segment skeleton fetches bypass the chunk queue manager, so they are
* capped separately at min(this, the concurrentDownloads viewer setting).
*/
const MAX_CONCURRENT_FULL_SEGMENT_NODE_FETCHES = 8;

export class SpatialSkeletonState extends RefCounted {
readonly commandHistory = this.registerDisposer(
new SpatialSkeletonCommandHistory(),
Expand All @@ -254,6 +261,18 @@ export class SpatialSkeletonState extends RefCounted {
abortController: AbortController;
}
>();
private fullSegmentNodeFetchLimitLayer:
| SpatiallyIndexedSkeletonLayer
| undefined;
private fullSegmentNodeFetchLimiter = new PromiseConcurrencyLimiter(() => {
const itemLimit =
this.fullSegmentNodeFetchLimitLayer?.chunkManager?.chunkQueueManager
?.capacities?.download?.itemLimit?.value;
return Math.min(
MAX_CONCURRENT_FULL_SEGMENT_NODE_FETCHES,
itemLimit ?? Number.POSITIVE_INFINITY,
);
});
private cachedNodesById = new Map<number, SpatiallyIndexedSkeletonNode>();

setNodeRadius(nodeId: number, radius: number) {
Expand Down Expand Up @@ -859,38 +878,44 @@ export class SpatialSkeletonState extends RefCounted {
const pendingFetch: {
promise?: Promise<SpatiallyIndexedSkeletonNode[]>;
} = {};
const fetchPromise = (async () => {
const fetchedNodes = await skeletonSource.getSkeleton(segmentId, {
signal: abortController.signal,
});
const normalizedNodes: SpatiallyIndexedSkeletonNode[] = [];
for (const fetchedNode of fetchedNodes) {
const mappedNode = normalizeSpatiallyIndexedSkeletonNode(
fetchedNode,
segmentId,
);
if (mappedNode === undefined) continue;
normalizedNodes.push(mappedNode);
}
normalizedNodes.sort((a, b) => a.nodeId - b.nodeId);
if (
this.fullSkeletonCacheGeneration === fetchVersion &&
pendingFetch.promise !== undefined &&
this.pendingFullSegmentNodeFetches.get(segmentId)?.promise ===
this.fullSegmentNodeFetchLimitLayer = skeletonLayer;
const fetchPromise = this.fullSegmentNodeFetchLimiter
.run(
async () => {
const fetchedNodes = await skeletonSource.getSkeleton(segmentId, {
signal: abortController.signal,
});
const normalizedNodes: SpatiallyIndexedSkeletonNode[] = [];
for (const fetchedNode of fetchedNodes) {
const mappedNode = normalizeSpatiallyIndexedSkeletonNode(
fetchedNode,
segmentId,
);
if (mappedNode === undefined) continue;
normalizedNodes.push(mappedNode);
}
normalizedNodes.sort((a, b) => a.nodeId - b.nodeId);
if (
this.fullSkeletonCacheGeneration === fetchVersion &&
pendingFetch.promise !== undefined &&
this.pendingFullSegmentNodeFetches.get(segmentId)?.promise ===
pendingFetch.promise
) {
this.replaceCachedSegmentNodes(segmentId, normalizedNodes);
this.markNodeDataChanged({ invalidateFullSkeletonCache: false });
}
return normalizedNodes;
},
{ signal: abortController.signal },
)
.finally(() => {
if (
this.pendingFullSegmentNodeFetches.get(segmentId)?.promise ===
pendingFetch.promise
) {
this.replaceCachedSegmentNodes(segmentId, normalizedNodes);
this.markNodeDataChanged({ invalidateFullSkeletonCache: false });
}
return normalizedNodes;
})().finally(() => {
if (
this.pendingFullSegmentNodeFetches.get(segmentId)?.promise ===
pendingFetch.promise
) {
this.pendingFullSegmentNodeFetches.delete(segmentId);
}
});
) {
this.pendingFullSegmentNodeFetches.delete(segmentId);
}
});
pendingFetch.promise = fetchPromise;
this.pendingFullSegmentNodeFetches.set(segmentId, {
promise: fetchPromise,
Expand Down
162 changes: 162 additions & 0 deletions src/util/promise_concurrency_limiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/**
* @license
* Copyright 2026 Google Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { describe, expect, test } from "vitest";
import { PromiseConcurrencyLimiter } from "#src/util/promise_concurrency_limiter.js";

function makeDeferredTask() {
let resolve!: () => void;
let reject!: (reason: unknown) => void;
let started = false;
const promise = new Promise<void>((promiseResolve, promiseReject) => {
resolve = promiseResolve;
reject = promiseReject;
});
return {
task: () => {
started = true;
return promise;
},
get started() {
return started;
},
resolve,
reject,
};
}

describe("PromiseConcurrencyLimiter", () => {
test("starts tasks synchronously while under the limit", () => {
const limiter = new PromiseConcurrencyLimiter(() => 2);
const a = makeDeferredTask();
const b = makeDeferredTask();
void limiter.run(a.task);
void limiter.run(b.task);
expect(a.started).toBe(true);
expect(b.started).toBe(true);
expect(limiter.pendingCount).toBe(0);
});

test("queues tasks beyond the limit and dequeues in FIFO order", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const a = makeDeferredTask();
const b = makeDeferredTask();
const c = makeDeferredTask();
const aPromise = limiter.run(a.task);
void limiter.run(b.task);
void limiter.run(c.task);
expect(a.started).toBe(true);
expect(b.started).toBe(false);
expect(c.started).toBe(false);
expect(limiter.pendingCount).toBe(2);
a.resolve();
await aPromise;
expect(b.started).toBe(true);
expect(c.started).toBe(false);
expect(limiter.pendingCount).toBe(1);
});

test("a rejected task releases its slot", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const a = makeDeferredTask();
const b = makeDeferredTask();
const aPromise = limiter.run(a.task);
void limiter.run(b.task);
a.reject(new Error("failure"));
await expect(aPromise).rejects.toThrowError("failure");
expect(b.started).toBe(true);
});

test("a synchronously throwing task releases its slot", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const b = makeDeferredTask();
const aPromise = limiter.run(() => {
throw new Error("sync failure");
});
void limiter.run(b.task);
await expect(aPromise).rejects.toThrowError("sync failure");
expect(b.started).toBe(true);
});

test("re-reads the limit at each dispatch", async () => {
let limit = 1;
const limiter = new PromiseConcurrencyLimiter(() => limit);
const a = makeDeferredTask();
const b = makeDeferredTask();
const c = makeDeferredTask();
const aPromise = limiter.run(a.task);
void limiter.run(b.task);
void limiter.run(c.task);
expect(b.started).toBe(false);
limit = 2;
a.resolve();
await aPromise;
expect(b.started).toBe(true);
expect(c.started).toBe(true);
});

test("treats a limit below one as one", () => {
const limiter = new PromiseConcurrencyLimiter(() => 0);
const a = makeDeferredTask();
const b = makeDeferredTask();
void limiter.run(a.task);
void limiter.run(b.task);
expect(a.started).toBe(true);
expect(b.started).toBe(false);
});

test("rejects immediately for an already aborted signal", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const controller = new AbortController();
controller.abort(new Error("already aborted"));
const a = makeDeferredTask();
await expect(
limiter.run(a.task, { signal: controller.signal }),
).rejects.toThrowError("already aborted");
expect(a.started).toBe(false);
});

test("aborting a queued task rejects it without consuming a slot", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const a = makeDeferredTask();
const b = makeDeferredTask();
const c = makeDeferredTask();
const controller = new AbortController();
const aPromise = limiter.run(a.task);
const bPromise = limiter.run(b.task, { signal: controller.signal });
void limiter.run(c.task);
controller.abort(new Error("queued abort"));
await expect(bPromise).rejects.toThrowError("queued abort");
expect(b.started).toBe(false);
expect(limiter.pendingCount).toBe(1);
a.resolve();
await aPromise;
expect(c.started).toBe(true);
});

test("aborting after a task starts does not affect the limiter", async () => {
const limiter = new PromiseConcurrencyLimiter(() => 1);
const a = makeDeferredTask();
const b = makeDeferredTask();
const controller = new AbortController();
const aPromise = limiter.run(a.task, { signal: controller.signal });
void limiter.run(b.task);
a.resolve();
await aPromise;
controller.abort();
expect(b.started).toBe(true);
});
});
Loading