fix(proxy): harden AgentPool cleanup and stats passthrough #762
Conversation
📝 Walkthrough

Refactors response-stream stats from a single buffer to head/tail sliding windows, rewriting the streaming push and splicing logic and introducing SSE-detection changes and usage-parsing adjustments; adds non-blocking pending-cleanup tracking with timeout-based removal to the agent pool; updates several unit tests' mocks and assertions to match the new behavior.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Summary of Changes

Hello @tesgth032, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request aims to improve system robustness and stability by optimizing the AgentPool cleanup flow, ensuring pending cleanup tasks are handled more gracefully at service shutdown while avoiding potential memory leaks. It also improves the Gemini proxy's stats passthrough: a smarter buffering strategy and proper SSE text detection avoid the memory exhaustion and parsing errors that could occur on very large response bodies, strengthening the system's defenses and the accuracy of its data handling.

Highlights
Changelog
Activity
Code Review
This PR hardens the AgentPool cleanup mechanism and the Gemini stats passthrough logic, significantly improving system stability and robustness. The AgentPool shutdown flow is now more resilient thanks to a best-effort wait with a timeout and a memory-leak defense against dangling promises. Handling of large Gemini response bodies is upgraded to a head+tail window with a truncation marker, a solid memory optimization and defensive design. The unit-test fixes also safeguard code quality. Overall this is a high-quality improvement. I found only one thing to optimize in AgentPool's shutdown method: the loop that closes agents should go back to parallel processing for efficiency and idiomatic style.
src/lib/proxy-agent/agent-pool.ts
Outdated
```diff
 for (const [key, cached] of this.cache.entries()) {
-  closePromises.push(this.closeAgent(cached.agent, key));
+  await this.closeAgent(cached.agent, key);
 }
```
Because `await` is used inside the `for...of` loop, the agents are now closed serially. Although `closeAgent` is designed to return quickly, running these calls in parallel with `Promise.all` is more idiomatic and slightly more efficient. Consider using `Array.from` plus `map` for a concise parallel implementation.
```diff
-for (const [key, cached] of this.cache.entries()) {
-  await this.closeAgent(cached.agent, key);
-}
+await Promise.all(
+  Array.from(this.cache.entries()).map(([key, cached]) =>
+    this.closeAgent(cached.agent, key)
+  )
+);
```
```ts
// Fill head first; once the head cap is exceeded, switch to tail
// (which does not itself mean truncation; only tail overflow counts as truncation)
if (!inTailMode) {
  if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) {
    headChunks.push(text);
    headBufferedBytes += bytes;
    return;
  }

  inTailMode = true;
}
```
Head window drops bytes
In passthrough stats buffering, when headBufferedBytes + bytes would exceed MAX_STATS_HEAD_BYTES, the code switches to tail mode and drops the current chunk entirely from the head, even though part of it could still fit. This creates an unintended gap between head and tail (more truncation than necessary) and can break parsing if important metadata starts near the end of the head window. Consider splitting the chunk (take remaining head capacity, then send the rest to tail) so the head window is truly “first N bytes.”
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1070-1079
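The suggested fix can be sketched as follows. This is a hypothetical standalone version, not code from this repository; `routeChunk`, `pushHead`, and `pushTail` are illustrative names:

```typescript
// Hypothetical sketch of the suggested fix: fill the remaining head capacity
// first, then route only the overflow to the tail, so the head window is
// truly the "first maxHead bytes".
function routeChunk(
  chunk: Uint8Array,
  headBytes: number,
  maxHead: number,
  pushHead: (part: Uint8Array) => void,
  pushTail: (part: Uint8Array) => void
): number {
  const remaining = maxHead - headBytes;
  if (remaining >= chunk.byteLength) {
    pushHead(chunk); // whole chunk still fits in the head
    return headBytes + chunk.byteLength;
  }
  if (remaining > 0) {
    pushHead(chunk.subarray(0, remaining)); // partial fit: split the chunk
  }
  pushTail(chunk.subarray(Math.max(remaining, 0)));
  return maxHead;
}
```

With this shape, a chunk straddling the boundary contributes its first bytes to the head and only the overflow to the tail, leaving no gap.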
Additional Comments (1)
Path: src/lib/proxy-agent/agent-pool.ts · Line 335-343

**Shutdown can hang**
`shutdown()` now does `await this.closeAgent(...)` for each cached entry, but `closeAgent()` is explicitly fire-and-forget and **does not await** the underlying `destroy()/close()` promise. This means `pendingCleanups` won't be populated in time for the subsequent best-effort wait, and `shutdown()` will return immediately without actually waiting for any cleanup. If the intent is to best-effort wait during shutdown, `closeAgent()` needs a mode that returns/awaits the cleanup promise (or `shutdown()` should directly enqueue/track promises without awaiting `closeAgent()`).
Code Review Summary
No significant issues identified in this PR. The changes are well-structured defensive hardening for the AgentPool cleanup lifecycle and the passthrough stats buffering strategy. The fire-and-forget cleanup with pendingCleanups tracking, the head+tail buffer split with SSE-safe truncation markers, and the isSSEText() adoption for more accurate SSE detection are all sound approaches.
PR Size: S
- Lines changed: 230 (174 additions, 56 deletions)
- Files changed: 4
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean
- Type safety - Clean
- Documentation accuracy - Clean
- Test coverage - Adequate
- Code clarity - Good
Notes
AgentPool (agent-pool.ts):
- `pendingCleanups` tracking with a 60s stale-reference timeout and a 2s best-effort shutdown wait is a solid pattern for handling never-settling dispatchers.
- The `trackedPromise` self-removal in `.finally()` correctly leverages closure timing (assigned before the callback executes asynchronously).
Response handler (response-handler.ts):
- The head (1MB) + tail (9MB) buffer split with the `\n\n: [cch_truncated]\n\n` separator correctly prevents cross-event splicing when head and tail are joined, since `parseSSEData` ignores `:` comment lines and empty lines flush events.
- `isSSEText()` replacing `responseText.includes("data:")` eliminates false positives from JSON payloads containing the substring "data:".
- The `hasFirstByteTimeout` guard on idle timer startup correctly ensures at least one timeout is always active.
Tests:
- Probe test mock additions (`getEndpointCircuitStateSync`, `resetEndpointCircuit`) are mechanical fixes for new exports on the `dev` baseline.
- Agent pool test additions validate mock function integrity before asserting on call behavior.
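The separator's safety hinges on SSE treating lines that start with `:` as comments. `parseSSEData` itself is not reproduced in this diff; a minimal sketch of that behavior, using an illustrative `extractDataLines` helper, looks like:

```typescript
// Minimal sketch of SSE data-line extraction. Lines beginning with ":" are
// SSE comments and are skipped, which is why a ": [cch_truncated]" marker at
// the head/tail join cannot be misread as event payload.
function extractDataLines(sse: string): string[] {
  const out: string[] = [];
  for (const line of sse.split("\n")) {
    if (line.startsWith(":")) continue; // comment line, ignored by SSE parsers
    if (line.startsWith("data:")) out.push(line.slice(5).trimStart());
  }
  return out;
}
```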
Automated review by Claude AI
Force-pushed c2240ca to 0e5b662
Addressed two key boundary cases from this PR's AI review:

response-handler.ts head+tail stats buffering: chunks at the head boundary are now split by byte, guaranteeing the head really is the "first 1MB" and preventing a whole chunk from falling straight into the tail and leaving an unnecessary gap in the head.

Verified locally:
```ts
const headText = decoder.decode(headPart, { stream: true });
pushChunk(headText, remainingHeadBytes);

const tailText = decoder.decode(tailPart, { stream: true });
pushChunk(tailText, chunkSize - remainingHeadBytes);
```
Splitting a chunk and calling decode() twice sequentially on the same decoder maintains UTF-8 multi-byte sequence integrity because stream: true preserves incomplete sequences between calls - this is correct. However, pushChunk(headText, remainingHeadBytes) on line 1195 passes the original byte count (remainingHeadBytes), but headText may have fewer characters if the split occurred mid-UTF-8-sequence (decoder will withhold those bytes until the next call). This means headBufferedBytes could become incorrect, potentially under-filling the head window.
Consider tracking the actual decoded byte length or adjusting the accounting to handle incomplete sequences.
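The withholding behavior is easy to demonstrate in isolation; a sketch using only the standard `TextDecoder`/`TextEncoder`:

```typescript
// Splitting "你好" (6 bytes, 3 per character) after byte 4 cuts the second
// character in half. The streaming decoder withholds the lone lead byte and
// emits it together with the next decode call.
const decoder = new TextDecoder("utf-8");
const encoded = new TextEncoder().encode("你好");
const headPart = encoded.subarray(0, 4); // ends mid-character
const tailPart = encoded.subarray(4);

const headText = decoder.decode(headPart, { stream: true }); // only "你"
const tailText = decoder.decode(tailPart, { stream: true }); // withheld byte + rest = "好"

// Re-encoding the decoded text yields the true byte count (3, not the raw 4):
const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
```

This is why accounting by raw slice length drifts: the head really received 3 bytes of text, not 4.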
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1194-1198

```ts
await Promise.race([
  Promise.allSettled(pending).then(() => {}),
  new Promise<void>((resolve) => {
    timeoutId = setTimeout(resolve, WAIT_MS);
    timeoutId.unref();
  }),
]);
```
Race condition: if a cleanup promise settles and removes itself from pendingCleanups (line 421) after line 348 creates the array snapshot but before the Promise.race starts, that promise is still included in pending but no longer tracked. When it later settles during the race, line 421 tries to delete from an already-cleared set (line 363 clears after race completes), causing a harmless but unintended delete operation on an empty set.
Low severity - functionally safe, but the timing window could be tightened by checking this.pendingCleanups.size again after the race or accepting this benign race.
Path: src/lib/proxy-agent/agent-pool.ts · Line 352-358
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 1110-1114: The chunksCollected value currently only counts tail
chunks via Math.max(0, tailChunks.length - tailHead) and therefore omits head
chunks; update the calculation in response-handler.ts (the code assigning
chunksCollected) to include headChunks.length (i.e., headChunks.length +
Math.max(0, tailChunks.length - tailHead)) so chunksCollected reports the total
number of collected chunks, referencing the existing variables headChunks,
tailChunks, and tailHead.
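The arithmetic of the suggested fix, using the variable names from the comment (the values here are purely illustrative):

```typescript
// chunksCollected should count head chunks plus live tail chunks.
// tailHead is assumed to be the index of the first live tail entry.
const headChunks = ["h1", "h2"];
const tailChunks = ["t1", "t2", "t3"];
const tailHead = 1;
const chunksCollected = headChunks.length + Math.max(0, tailChunks.length - tailHead);
```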
🧹 Nitpick comments (1)
src/lib/proxy-agent/agent-pool.ts (1)
403-431: The cleanup-promise tracking and the 60s safety timeout are implemented correctly. A few confirmations:

- Referencing `trackedPromise` from within its own `.finally()` closure is safe: the variable is assigned before the callback runs asynchronously.
- The 60s `setTimeout` + `unref()` backstop ensures no memory leak even if a dispatcher never settles.
- `pendingCleanups.delete` on a missing element is a no-op and does not conflict with `.clear()` in `shutdown()`.

One small style suggestion: `trackedPromise` can be declared with `const` (the closure captures the reference, and the variable is initialized before the async callback runs), which is semantically more precise.

Optional: let → const

```diff
-let trackedPromise: Promise<void>;
-let dropRefTimeoutId: NodeJS.Timeout | null = null;
-
-trackedPromise = cleanupPromise
+let dropRefTimeoutId: NodeJS.Timeout | null = null;
+
+const trackedPromise: Promise<void> = cleanupPromise
```
Updated per CodeRabbit's suggestions:
```ts
const headText = decoder.decode(headPart, { stream: true });
pushChunk(headText, remainingHeadBytes);
```
When splitting at a UTF-8 multi-byte sequence boundary, decoder.decode(headPart, { stream: true }) may withhold incomplete bytes (e.g., 2 bytes of a 3-byte character). The decoder returns text representing fewer bytes than remainingHeadBytes, but pushChunk is told the full remainingHeadBytes count. This causes headBufferedBytes to overcount by 1-3 bytes, triggering premature switch to tail mode and under-filling the head buffer.
Calculate actual decoded byte length:
```diff
 const headText = decoder.decode(headPart, { stream: true });
-pushChunk(headText, remainingHeadBytes);
+const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
+pushChunk(headText, actualHeadBytes);
```
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1194-1195

```ts
const tailText = decoder.decode(tailPart, { stream: true });
pushChunk(tailText, chunkSize - remainingHeadBytes);
```
Same byte accounting issue: chunkSize - remainingHeadBytes represents the tail part's byte count, but if UTF-8 bytes were withheld from the head decode, those bytes are prepended internally by the decoder when decoding tailPart. The decoded tailText will contain more characters than chunkSize - remainingHeadBytes would suggest, leading to inaccurate byte tracking.
```diff
 const tailText = decoder.decode(tailPart, { stream: true });
-pushChunk(tailText, chunkSize - remainingHeadBytes);
+const actualTailBytes = new TextEncoder().encode(tailText).byteLength;
+pushChunk(tailText, actualTailBytes);
```
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1197-1198

```diff
-// Start the idle timer only after the first chunk arrives (avoids overlapping with the first-byte timeout's responsibility)
-if (!isFirstChunk) {
+// If no first-byte timeout is configured, also start the idle timer after done=false as a fallback (covers 0-byte chunks / value=undefined)
+if (!isFirstChunk || !hasFirstByteTimeout) {
```
Idle timer now starts even before first chunk arrives when !hasFirstByteTimeout. This means for streams with 0-byte chunks or value=undefined, the idle timer could fire before any real data arrives, potentially aborting legitimate slow-starting streams. Consider whether the idle timer should truly be a fallback for missing first-byte timeout, or if it should only start after actual data (non-empty value).
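The trade-off above concerns when an idle watchdog first arms. A generic sketch of such a watchdog, with illustrative names rather than this repository's implementation:

```typescript
// Reset on every chunk; fire onIdle if no data arrives within idleMs.
// In Node, unref() keeps the timer from holding the event loop open.
function makeIdleWatchdog(idleMs: number, onIdle: () => void) {
  let timer: ReturnType<typeof setTimeout> | null = null;
  return {
    reset(): void {
      if (timer !== null) clearTimeout(timer);
      timer = setTimeout(onIdle, idleMs);
      (timer as unknown as { unref?: () => void }).unref?.();
    },
    stop(): void {
      if (timer !== null) clearTimeout(timer);
      timer = null;
    },
  };
}
```

Arming `reset()` before the first chunk is exactly the fallback behavior being questioned: it protects against streams that never produce data, at the cost of possibly firing on legitimately slow starts.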
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1209

```diff
 // 1. Standard SSE (event: + data:) - Claude/OpenAI
 // 2. Bare data: format - Gemini
-if (!usageMetrics && responseText.includes("data:")) {
+if (!usageMetrics && isSSEText(responseText)) {
```
Replaced naive responseText.includes("data:") with isSSEText() utility for more accurate SSE format detection. This prevents false positives from JSON bodies containing the string "data:" and correctly identifies SSE by checking line-start patterns.
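The repository's `isSSEText()` is not shown in this diff; a plausible sketch of line-start detection, with a hypothetical `looksLikeSSE` helper, contrasted with a substring search:

```typescript
// Checks SSE field names at line starts instead of searching for a substring,
// so a JSON body that merely contains "data:" does not trigger a false positive.
function looksLikeSSE(text: string): boolean {
  return text.split("\n").some((line) => /^(data|event|id|retry):/.test(line));
}
```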
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 2311

```ts
closePromises.push(this.closeAgent(cached.agent, key));
// Best-effort: wait for some pending cleanups to finish, but never wait forever
// (avoid repeating the "close() waits on in-flight requests" mistake)
if (this.pendingCleanups.size > 0) {
  const pending = Array.from(this.pendingCleanups);
```
Timing window: if a cleanup promise settles between this snapshot (line 348) and when Promise.race starts (line 352), its .finally() handler (line 420) will try to delete from pendingCleanups even though it's already in the pending array. When the race completes and line 363 clears the set, that promise's delete becomes a no-op on an empty set. Benign but worth noting.
Path: src/lib/proxy-agent/agent-pool.ts · Line 348

- AgentPool: close/destroy become fire-and-forget with pending tracking; shutdown waits best-effort
- Gemini passthrough stats: head+tail buffering, truncation flush boundary, idle watchdog fallback
- Usage SSE detection switched to isSSEText, avoiding false positives that interfere with the JSON-repair path
- Strengthen AgentPool unit-test assertions on the undici mock
- Add getEndpointCircuitStateSync/resetEndpointCircuit so the new exports don't break vitest mock access
Force-pushed c91ee3f to a172ad8
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 1178-1182: The idle timer is being started before the first chunk
when hasFirstByteTimeout is false, causing premature termination for
slow-starting streams; update the logic around the reader.read loop where
isFirstChunk, hasFirstByteTimeout and startIdleTimer are used so the idle timer
is only started after the first chunk arrives (i.e., require !isFirstChunk to
start) or, if you want a fallback, introduce a separate firstByteIdleFallbackMs
and start a dedicated first-byte fallback timer before the first chunk that is
wider than streamingIdleTimeoutMs and only begin the regular startIdleTimer()
after the first chunk; change the condition that currently reads (!isFirstChunk
|| !hasFirstByteTimeout) to either (!isFirstChunk) or to use the fallback path
when hasFirstByteTimeout is false and reference isFirstChunk,
hasFirstByteTimeout, startIdleTimer, streamingIdleTimeoutMs and
firstByteTimeoutStreamingMs when making the change.
🧹 Nitpick comments (4)
tests/unit/lib/provider-endpoints/probe.test.ts (1)
52-56: The mock setup is highly repetitive; consider extracting a helper function. The `@/lib/endpoint-circuit-breaker` mock is identical across all 10 test cases (the `logger` and `@/repository` mocks are nearly identical too). A `setupCommonMocks()` helper would let future field additions touch only one place.

Example

```ts
function mockCircuitBreaker() {
  return {
    getEndpointCircuitStateSync: vi.fn(() => "closed"),
    recordEndpointFailure: vi.fn(async () => {}),
    resetEndpointCircuit: vi.fn(async () => {}),
  };
}

// In each test:
vi.doMock("@/lib/endpoint-circuit-breaker", () => mockCircuitBreaker());
```

src/lib/proxy-agent/agent-pool.ts (1)
335-370: `shutdown()` behaves correctly after the refactor, with one small detail worth noting. Because `shutdown()` now calls `closeAgent` + `cache.clear()` directly (bypassing `evictByKey`), `stats.evictedAgents` is not incremented during shutdown. Monitoring that relies on this counter to confirm how many agents were cleaned up during shutdown will see a discrepancy, though in a shutdown scenario this rarely matters in practice.
1157-1175: Splitting a multi-byte UTF-8 character at the head boundary can leave the head slightly short of 1 MB. `value.subarray(0, remainingHeadBytes)` may cut through the middle of a multi-byte UTF-8 character. `decoder.decode(headPart, { stream: true })` buffers the incomplete trailing bytes in the decoder's internal state, and they are prepended to the output of the subsequent `decoder.decode(tailPart, { stream: true })`. As a result:

- `headText` may contain a few bytes less text than expected (at most 3 bytes per split).
- The `bytes` argument in `pushChunk(headText, remainingHeadBytes)` slightly overestimates the actual text byte count.

For stats/billing this deviation is negligible (the head cap is approximate anyway), but exact accounting is possible with a byte-level correction via `TextEncoder`:

Optional fix: exact byte counting

```diff
 const headText = decoder.decode(headPart, { stream: true });
-pushChunk(headText, remainingHeadBytes);
+const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
+pushChunk(headText, actualHeadBytes);
 const tailText = decoder.decode(tailPart, { stream: true });
-pushChunk(tailText, chunkSize - remainingHeadBytes);
+pushChunk(tailText, chunkSize - actualHeadBytes);
```
1437-1440: The non-Gemini stream path still uses an unbounded `chunks` array, without head/tail buffer protection. Gemini passthrough now has a head/tail dual-window buffer to prevent OOM, but the non-Gemini stream path (line 1440, `const chunks: string[] = []`) still accumulates every chunk without limit, so very large responses carry the same memory risk. Consider extracting the head/tail buffering logic into a reusable utility shared by both paths.
```ts
const headText = decoder.decode(headPart, { stream: true });
pushChunk(headText, remainingHeadBytes);

const tailText = decoder.decode(tailPart, { stream: true });
pushChunk(tailText, chunkSize - remainingHeadBytes);
```
UTF-8 byte accounting wrong
In the head/tail split path, pushChunk(headText, remainingHeadBytes) and pushChunk(tailText, chunkSize - remainingHeadBytes) assume the decoder returns text for exactly those byte counts. If headPart ends mid–multi-byte UTF-8 sequence, TextDecoder.decode(headPart, { stream: true }) will buffer 1–3 bytes internally and not emit them until the next decode call. That means headBufferedBytes/tailBufferedBytes can become inaccurate, and you can either (a) switch to tail mode too early (under-filling head) or (b) exceed the intended limits without setting wasTruncated correctly. This is observable whenever a chunk boundary/split hits a multi-byte sequence.
Also appears at src/app/v1/_lib/proxy/response-handler.ts:1165 and :1168.
Path: src/app/v1/_lib/proxy/response-handler.ts · Line 1164-1168
Reviewed each actionable AI-review suggestion and adopted those within this PR's scope of changes:

All Actions checks are currently green and reviewDecision=APPROVED; ready to merge.
Summary

As a follow-up hardening to #759 (AgentPool eviction blocking caused a global requesting stall), this PR does three things:
- Switch `responseText.includes("data:")` to `isSSEText()`, avoiding false positives on JSON bodies containing "data:" and keeping the downstream JSON repair/parse chain clean.
- Add `pendingCleanups` tracking and a best-effort shutdown wait (2s cap + 60s drop-ref), avoiding global blocking while still finishing gracefully and guarding against memory leaks.

Key points

- `firstByteTimeoutStreamingMs` (if configured) serves as the fallback.

Changed files

- src/app/v1/_lib/proxy/response-handler.ts
- src/lib/proxy-agent/agent-pool.ts
- tests/unit/lib/provider-endpoints/probe.test.ts
- tests/unit/lib/proxy-agent/agent-pool.test.ts

Tests

- npm test
- npm run typecheck

Greptile Overview
Greptile Summary
This PR hardens the `AgentPool` cleanup lifecycle and upgrades the Gemini passthrough stats buffering strategy.

- A truncation marker (`: [cch_truncated]`) is inserted at the truncation boundary to prevent cross-event misparsing when head and tail are joined.
- SSE detection uses `isSSEText()` (checks line-start patterns) instead of a naive `responseText.includes("data:")`, preventing false positives from JSON bodies containing "data:".
- `pendingCleanups` tracking: fire-and-forget `destroy()`/`close()` calls are now tracked in a `Set<Promise<void>>` with a 60-second drop-ref timeout, allowing `shutdown()` to do a best-effort 2-second wait for graceful cleanup without ever blocking indefinitely.
- Tests verify that `destroy`/`close` are actually mock functions before asserting call behavior, catching mock setup regressions early.

Confidence Score: 3/5

- Risk concentrates in `src/app/v1/_lib/proxy/response-handler.ts`: specifically the head/tail chunk-splitting logic around lines 1158-1174, where UTF-8 multi-byte sequence boundaries interact with byte accounting.

Important Files Changed

- src/app/v1/_lib/proxy/response-handler.ts: replaces `includes("data:")` with `isSSEText()`. The UTF-8 byte-accounting issue at the head/tail split boundary (already discussed) remains the primary concern.
- src/lib/proxy-agent/agent-pool.ts: `pendingCleanups` tracking for fire-and-forget destroy/close promises, with a 60s drop-ref safety net and 2s best-effort shutdown wait. Well-structured; minor benign race in cleanup tracking already noted.

Flowchart
```mermaid
flowchart TD
  A[Upstream chunk arrives] --> B{value && byteLength > 0?}
  B -->|No| I[Skip to idle timer check]
  B -->|Yes| C{!inTailMode && headBytes < 1MB?}
  C -->|Yes| D{chunkSize > remainingHeadBytes?}
  C -->|No| H[pushChunk to tail]
  D -->|Yes| E[Split: headPart + tailPart]
  D -->|No| F[pushChunk whole to head]
  E --> F2[pushChunk headText to head]
  F2 --> G[pushChunk tailText to tail]
  G --> I
  F --> I
  H --> I
  I --> J{!isFirstChunk?}
  J -->|Yes| K[startIdleTimer]
  J -->|No| L[Continue loop]
  K --> L
  subgraph shutdown[AgentPool Shutdown]
    S1[closeAgent fire-and-forget] --> S2[Add to pendingCleanups]
    S2 --> S3[60s dropRef timeout]
    S3 --> S4{shutdown called?}
    S4 -->|Yes| S5[Promise.race: allSettled vs 2s timeout]
    S5 --> S6[pendingCleanups.clear]
  end
```

Last reviewed commit: fcc1716