+ {hasFake200PostStreamFailure && (
+
+
+ {t("logs.details.fake200ForwardedNotice")}
+
+ )}
{onChainItemClick
? t("logs.providerChain.clickItemForDetails")
diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts
index 846dc7e0b..652ee6b7f 100644
--- a/src/app/v1/_lib/proxy/forwarder.ts
+++ b/src/app/v1/_lib/proxy/forwarder.ts
@@ -49,6 +49,7 @@ import {
import { ModelRedirector } from "./model-redirector";
import { ProxyProviderResolver } from "./provider-selector";
import type { ProxySession } from "./session";
+import { setDeferredStreamingFinalization } from "./stream-finalization";
import {
detectThinkingBudgetRectifierTrigger,
rectifyThinkingBudget,
@@ -377,6 +378,40 @@ export class ProxyForwarder {
const contentType = response.headers.get("content-type") || "";
const isSSE = contentType.includes("text/event-stream");
+ // ========== 流式响应:延迟成功判定(避免“假 200”)==========
+ // 背景:上游可能返回 HTTP 200,但 SSE 内容为错误 JSON(如 {"error": "..."})。
+ // 如果在“收到响应头”时就立刻记录 success / 更新 session 绑定:
+ // - 会把会话粘到一个实际不可用的 provider;
+ // - 熔断/故障转移统计被误记为成功;
+ // - 客户端下一次自动重试可能仍复用到同一 provider,导致“假 200”让重试失效。
+ //
+ // 解决:Forwarder 只负责尽快把 Response 返回给下游开始透传,
+ // 把最终成功/失败结算延迟到 ResponseHandler:等 SSE 正常结束后再基于最终 body 补充检查并更新内部状态。
+ if (isSSE) {
+ setDeferredStreamingFinalization(session, {
+ providerId: currentProvider.id,
+ providerName: currentProvider.name,
+ providerPriority: currentProvider.priority || 0,
+ attemptNumber: attemptCount,
+ totalProvidersAttempted,
+ isFirstAttempt: totalProvidersAttempted === 1 && attemptCount === 1,
+ isFailoverSuccess: totalProvidersAttempted > 1,
+ endpointId: activeEndpoint.endpointId,
+ endpointUrl: endpointAudit.endpointUrl,
+ upstreamStatusCode: response.status,
+ });
+
+ logger.info("ProxyForwarder: Streaming response received, deferring finalization", {
+ providerId: currentProvider.id,
+ providerName: currentProvider.name,
+ attemptNumber: attemptCount,
+ totalProvidersAttempted,
+ statusCode: response.status,
+ });
+
+ return response;
+ }
+
if (!isSSE) {
// 非流式响应:检测空响应
const contentLength = response.headers.get("content-length");
diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts
index 473611b3b..2842907a7 100644
--- a/src/app/v1/_lib/proxy/response-handler.ts
+++ b/src/app/v1/_lib/proxy/response-handler.ts
@@ -11,6 +11,7 @@ import { SessionTracker } from "@/lib/session-tracker";
import { calculateRequestCost } from "@/lib/utils/cost-calculation";
import { hasValidPriceData } from "@/lib/utils/price-data";
import { parseSSEData } from "@/lib/utils/sse";
+import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection";
import {
updateMessageRequestCost,
updateMessageRequestDetails,
@@ -23,6 +24,7 @@ import { GeminiAdapter } from "../gemini/adapter";
import type { GeminiResponse } from "../gemini/types";
import { isClientAbortError } from "./errors";
import type { ProxySession } from "./session";
+import { consumeDeferredStreamingFinalization } from "./stream-finalization";
export type UsageMetrics = {
input_tokens?: number;
@@ -59,6 +61,310 @@ function cleanResponseHeaders(headers: Headers): Headers {
return cleaned;
}
+type FinalizeDeferredStreamingResult = {
+ /**
+ * “内部结算用”的状态码。
+ *
+ * 注意:这不会改变客户端实际收到的 HTTP 状态码(SSE 已经开始透传后无法回头改)。
+ * 这里的目的仅是让内部统计/熔断/会话绑定把“假 200”按失败处理。
+ */
+ effectiveStatusCode: number;
+ /**
+ * 内部记录的错误原因(用于写入 DB/监控,帮助定位“假 200”问题)。
+ */
+ errorMessage: string | null;
+ /**
+ * 写入 DB 时用于归因的 providerId(优先使用 deferred meta 的 providerId)。
+ *
+ * 说明:对 SSE 来说,session.provider 可能在后续逻辑里被更新/覆盖;而 deferred meta 代表本次流真正对应的 provider。
+ * 该字段用于保证 DB 的 providerId 与 providerChain/熔断归因一致。
+ */
+ providerIdForPersistence: number | null;
+};
+
+/**
+ * 若本次 SSE 被标记为“延迟结算”,则在流结束后补齐成功/失败的最终判定。
+ *
+ * 触发条件
+ * - Forwarder 收到 Response 且识别为 SSE 时,会在 session 上挂载 DeferredStreamingFinalization 元信息。
+ * - ResponseHandler 在后台读取完整 SSE 内容后,调用本函数:
+ * - 如果内容看起来是上游错误 JSON(假 200),则:
+ * - 计入熔断器失败;
+ * - 不更新 session 智能绑定(避免把会话粘到坏 provider);
+ * - 内部状态码改为 502(只影响统计与后续重试选择,不影响本次客户端响应)。
+ * - 如果流正常结束且未命中错误判定,则按成功结算并更新绑定/熔断/endpoint 成功率。
+ *
+ * @param streamEndedNormally - 必须是 reader 读到 done=true 的“自然结束”;超时/中断等异常结束由其它逻辑处理。
+ * @param clientAborted - 标记是否为客户端主动中断(用于内部状态码映射,避免把中断记为 200 completed)
+ * @param abortReason - 非自然结束时的原因码(用于内部记录/熔断归因;不会影响客户端响应)
+ */
+async function finalizeDeferredStreamingFinalizationIfNeeded(
+ session: ProxySession,
+ allContent: string,
+ upstreamStatusCode: number,
+ streamEndedNormally: boolean,
+ clientAborted: boolean,
+ abortReason?: string
+): Promise {
+ const meta = consumeDeferredStreamingFinalization(session);
+ const provider = session.provider;
+
+ const providerIdForPersistence = meta?.providerId ?? provider?.id ?? null;
+
+ // 仅在“上游 HTTP=200 且流自然结束”时做“假 200”检测:
+ // - 非 200:HTTP 已经表明失败(无需额外启发式)
+ // - 非自然结束:内容可能是部分流/截断,启发式会显著提高误判风险
+ //
+ // 此处返回 `{isError:false}` 仅表示“跳过检测”,最终仍会在下面按中断/超时视为失败结算。
+ const shouldDetectFake200 = streamEndedNormally && upstreamStatusCode === 200;
+ const detected = shouldDetectFake200
+ ? detectUpstreamErrorFromSseOrJsonText(allContent)
+ : ({ isError: false } as const);
+
+ // “内部结算用”的状态码(不会改变客户端实际 HTTP 状态码)。
+ // - 假 200:映射为 502,确保内部统计/熔断/会话绑定把它当作失败。
+ // - 未自然结束:也应映射为失败(避免把中断/部分流误记为 200 completed)。
+ let effectiveStatusCode: number;
+ let errorMessage: string | null;
+ if (detected.isError) {
+ effectiveStatusCode = 502;
+ errorMessage = detected.code;
+ } else if (!streamEndedNormally) {
+ effectiveStatusCode = clientAborted ? 499 : 502;
+ errorMessage = clientAborted ? "CLIENT_ABORTED" : (abortReason ?? "STREAM_ABORTED");
+ } else {
+ effectiveStatusCode = upstreamStatusCode;
+ errorMessage = null;
+ }
+
+ // 未启用延迟结算 / provider 缺失:
+ // - 只返回“内部状态码 + 错误原因”,由调用方写入统计;
+ // - 不在这里更新熔断/绑定(meta 缺失意味着 Forwarder 没有启用延迟结算;provider 缺失意味着无法归因)。
+ if (!meta || !provider) {
+ return { effectiveStatusCode, errorMessage, providerIdForPersistence };
+ }
+
+ // meta 由 Forwarder 在“拿到 upstream Response 的那一刻”记录,代表真正产生本次流的 provider。
+ // 即使 session.provider 在之后被其它逻辑意外修改(极端情况),我们仍以 meta 为准更新:
+ // - provider/endpoint 熔断与统计
+ // - session 智能绑定
+ // 这样能避免把成功/失败记到错误的 provider 上。
+ let providerForChain = provider;
+ if (provider.id !== meta.providerId) {
+ logger.warn("[ResponseHandler] Deferred streaming meta provider mismatch", {
+ sessionId: session.sessionId ?? null,
+ metaProviderId: meta.providerId,
+ currentProviderId: provider.id,
+ canonicalProviderId: meta.providerId,
+ });
+
+ // 尝试用 meta.providerId 找回正确的 Provider 对象,保证 providerChain 的审计数据一致
+ try {
+ const providers = await session.getProvidersSnapshot();
+ const resolved = providers.find((p) => p.id === meta.providerId);
+ if (resolved) {
+ providerForChain = resolved;
+ } else {
+ logger.warn("[ResponseHandler] Deferred streaming meta provider not found in snapshot", {
+ sessionId: session.sessionId ?? null,
+ metaProviderId: meta.providerId,
+ currentProviderId: provider.id,
+ });
+ }
+ } catch (resolveError) {
+ logger.warn("[ResponseHandler] Failed to resolve meta provider from snapshot", {
+ sessionId: session.sessionId ?? null,
+ metaProviderId: meta.providerId,
+ currentProviderId: provider.id,
+ error: resolveError,
+ });
+ }
+ }
+
+ // 未自然结束:不更新 session 绑定(避免把会话粘到不稳定 provider),但要避免把它误记为 200 completed。
+ //
+ // 同时,为了让故障转移/熔断能正确工作:
+ // - 客户端主动中断:不计入熔断器(这通常不是供应商问题)
+ // - 非客户端中断:计入 provider/endpoint 熔断失败(与 timeout 路径保持一致)
+ if (!streamEndedNormally) {
+ if (!clientAborted) {
+ try {
+ // 动态导入:避免 proxy 模块与熔断器模块之间潜在的循环依赖。
+ const { recordFailure } = await import("@/lib/circuit-breaker");
+ await recordFailure(meta.providerId, new Error(errorMessage ?? "STREAM_ABORTED"));
+ } catch (cbError) {
+ logger.warn("[ResponseHandler] Failed to record streaming failure in circuit breaker", {
+ providerId: meta.providerId,
+ sessionId: session.sessionId ?? null,
+ error: cbError,
+ });
+ }
+
+ if (meta.endpointId != null) {
+ try {
+ const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker");
+ await recordEndpointFailure(meta.endpointId, new Error(errorMessage ?? "STREAM_ABORTED"));
+ } catch (endpointError) {
+ logger.warn("[ResponseHandler] Failed to record endpoint failure (stream aborted)", {
+ endpointId: meta.endpointId,
+ providerId: meta.providerId,
+ sessionId: session.sessionId ?? null,
+ error: endpointError,
+ });
+ }
+ }
+ }
+
+ session.addProviderToChain(providerForChain, {
+ endpointId: meta.endpointId,
+ endpointUrl: meta.endpointUrl,
+ reason: "system_error",
+ attemptNumber: meta.attemptNumber,
+ statusCode: effectiveStatusCode,
+ errorMessage: errorMessage ?? undefined,
+ });
+
+ return { effectiveStatusCode, errorMessage, providerIdForPersistence };
+ }
+
+ if (detected.isError) {
+ logger.warn("[ResponseHandler] SSE completed but body indicates error (fake 200)", {
+ providerId: meta.providerId,
+ providerName: meta.providerName,
+ upstreamStatusCode: meta.upstreamStatusCode,
+ effectiveStatusCode,
+ code: detected.code,
+ detail: detected.detail ?? null,
+ });
+
+ // 计入熔断器:让后续请求能正确触发故障转移/熔断
+ try {
+ // 动态导入:避免 proxy 模块与熔断器模块之间潜在的循环依赖。
+ const { recordFailure } = await import("@/lib/circuit-breaker");
+ await recordFailure(meta.providerId, new Error(detected.code));
+ } catch (cbError) {
+ logger.warn("[ResponseHandler] Failed to record fake-200 error in circuit breaker", {
+ providerId: meta.providerId,
+ sessionId: session.sessionId ?? null,
+ error: cbError,
+ });
+ }
+
+ // endpoint 级熔断:与成功路径保持对称,避免“假 200”只影响 provider 而不影响 endpoint 健康度
+ if (meta.endpointId != null) {
+ try {
+ const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker");
+ await recordEndpointFailure(meta.endpointId, new Error(detected.code));
+ } catch (endpointError) {
+ logger.warn("[ResponseHandler] Failed to record endpoint failure (fake 200)", {
+ endpointId: meta.endpointId,
+ providerId: meta.providerId,
+ error: endpointError,
+ });
+ }
+ }
+
+ // 记录到决策链(用于日志展示与 DB 持久化)。
+ // 注意:这里用 effectiveStatusCode(502)而不是 upstreamStatusCode(200),
+ // 以便让内部链路明确显示这是一次失败(否则会被误读为成功)。
+ session.addProviderToChain(providerForChain, {
+ endpointId: meta.endpointId,
+ endpointUrl: meta.endpointUrl,
+ reason: "retry_failed",
+ attemptNumber: meta.attemptNumber,
+ statusCode: effectiveStatusCode,
+ errorMessage: detected.code,
+ });
+
+ return { effectiveStatusCode, errorMessage, providerIdForPersistence };
+ }
+
+ // ========== 真正成功(SSE 完整结束且未命中错误判定)==========
+ if (meta.endpointId != null) {
+ try {
+ const { recordEndpointSuccess } = await import("@/lib/endpoint-circuit-breaker");
+ await recordEndpointSuccess(meta.endpointId);
+ } catch (endpointError) {
+ logger.warn("[ResponseHandler] Failed to record endpoint success (stream)", {
+ endpointId: meta.endpointId,
+ providerId: meta.providerId,
+ error: endpointError,
+ });
+ }
+ }
+
+ try {
+ const { recordSuccess } = await import("@/lib/circuit-breaker");
+ await recordSuccess(meta.providerId);
+ } catch (cbError) {
+ logger.warn("[ResponseHandler] Failed to record streaming success in circuit breaker", {
+ providerId: meta.providerId,
+ error: cbError,
+ });
+ }
+
+ // 成功后绑定 session 到供应商(智能绑定策略)
+ if (session.sessionId) {
+ const result = await SessionManager.updateSessionBindingSmart(
+ session.sessionId,
+ meta.providerId,
+ meta.providerPriority,
+ meta.isFirstAttempt,
+ meta.isFailoverSuccess
+ );
+
+ if (result.updated) {
+ logger.info("[ResponseHandler] Session binding updated (stream finalized)", {
+ sessionId: session.sessionId,
+ providerId: meta.providerId,
+ providerName: meta.providerName,
+ priority: meta.providerPriority,
+ reason: result.reason,
+ details: result.details,
+ attemptNumber: meta.attemptNumber,
+ totalProvidersAttempted: meta.totalProvidersAttempted,
+ });
+ } else {
+ logger.debug("[ResponseHandler] Session binding not updated (stream finalized)", {
+ sessionId: session.sessionId,
+ providerId: meta.providerId,
+ providerName: meta.providerName,
+ priority: meta.providerPriority,
+ reason: result.reason,
+ details: result.details,
+ });
+ }
+
+ // 统一更新两个数据源(确保监控数据一致)
+ void SessionManager.updateSessionProvider(session.sessionId, {
+ providerId: meta.providerId,
+ providerName: meta.providerName,
+ }).catch((err) => {
+ logger.error("[ResponseHandler] Failed to update session provider info (stream)", {
+ error: err,
+ });
+ });
+ }
+
+ session.addProviderToChain(providerForChain, {
+ endpointId: meta.endpointId,
+ endpointUrl: meta.endpointUrl,
+ reason: meta.isFirstAttempt ? "request_success" : "retry_success",
+ attemptNumber: meta.attemptNumber,
+ statusCode: meta.upstreamStatusCode,
+ });
+
+ logger.info("[ResponseHandler] Streaming request finalized as success", {
+ providerId: meta.providerId,
+ providerName: meta.providerName,
+ attemptNumber: meta.attemptNumber,
+ totalProvidersAttempted: meta.totalProvidersAttempted,
+ statusCode: meta.upstreamStatusCode,
+ });
+
+ return { effectiveStatusCode, errorMessage, providerIdForPersistence };
+}
+
export class ProxyResponseHandler {
static async dispatch(session: ProxySession, response: Response): Promise {
let fixedResponse = response;
@@ -215,8 +521,8 @@ export class ProxyResponseHandler {
statusCode: statusCode,
ttfbMs: session.ttfbMs ?? duration,
providerChain: session.getProviderChain(),
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
const tracker = ProxyStatusTracker.getInstance();
@@ -371,8 +677,8 @@ export class ProxyResponseHandler {
cacheCreation1hInputTokens: usageMetrics?.cache_creation_1h_input_tokens,
cacheTtlApplied: usageMetrics?.cache_ttl ?? null,
providerChain: session.getProviderChain(),
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
@@ -573,15 +879,22 @@ export class ProxyResponseHandler {
const reader = responseForStats.body?.getReader();
if (!reader) return;
+ // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
+ // - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
+ // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
const chunks: string[] = [];
const decoder = new TextDecoder();
let isFirstChunk = true;
+ let streamEndedNormally = false;
while (true) {
if (session.clientAbortSignal?.aborted) break;
const { done, value } = await reader.read();
- if (done) break;
+ if (done) {
+ streamEndedNormally = true;
+ break;
+ }
if (value) {
if (isFirstChunk) {
isFirstChunk = false;
@@ -594,6 +907,7 @@ export class ProxyResponseHandler {
const flushed = decoder.decode();
if (flushed) chunks.push(flushed);
const allContent = chunks.join("");
+ const clientAborted = session.clientAbortSignal?.aborted ?? false;
// 存储响应体到 Redis(5分钟过期)
if (session.sessionId) {
@@ -608,7 +922,21 @@ export class ProxyResponseHandler {
// 使用共享的统计处理方法
const duration = Date.now() - session.startTime;
- await finalizeRequestStats(session, allContent, statusCode, duration);
+ const finalized = await finalizeDeferredStreamingFinalizationIfNeeded(
+ session,
+ allContent,
+ statusCode,
+ streamEndedNormally,
+ clientAborted
+ );
+ await finalizeRequestStats(
+ session,
+ allContent,
+ finalized.effectiveStatusCode,
+ duration,
+ finalized.errorMessage ?? undefined,
+ finalized.providerIdForPersistence ?? undefined
+ );
} catch (error) {
if (!isClientAbortError(error as Error)) {
logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error);
@@ -702,6 +1030,9 @@ export class ProxyResponseHandler {
const processingPromise = (async () => {
const reader = internalStream.getReader();
const decoder = new TextDecoder();
+ // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
+ // - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
+ // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
const chunks: string[] = [];
let usageForCost: UsageMetrics | null = null;
let isFirstChunk = true; // ⭐ 标记是否为第一块数据
@@ -779,7 +1110,24 @@ export class ProxyResponseHandler {
return chunks.join("");
};
- const finalizeStream = async (allContent: string): Promise => {
+ const finalizeStream = async (
+ allContent: string,
+ streamEndedNormally: boolean,
+ clientAborted: boolean,
+ abortReason?: string
+ ): Promise => {
+ const finalized = await finalizeDeferredStreamingFinalizationIfNeeded(
+ session,
+ allContent,
+ statusCode,
+ streamEndedNormally,
+ clientAborted,
+ abortReason
+ );
+ const effectiveStatusCode = finalized.effectiveStatusCode;
+ const streamErrorMessage = finalized.errorMessage;
+ const providerIdForPersistence = finalized.providerIdForPersistence;
+
// 存储响应体到 Redis(5分钟过期)
if (session.sessionId) {
void SessionManager.storeSessionResponse(
@@ -839,10 +1187,10 @@ export class ProxyResponseHandler {
await trackCostToRedis(session, usageForCost);
// 更新 session 使用量到 Redis(用于实时监控)
- if (session.sessionId && usageForCost) {
+ if (session.sessionId) {
let costUsdStr: string | undefined;
try {
- if (session.request.model) {
+ if (usageForCost && session.request.model) {
const priceData = await session.getCachedPriceDataByBillingSource();
if (priceData) {
const cost = calculateRequestCost(
@@ -862,22 +1210,30 @@ export class ProxyResponseHandler {
});
}
- void SessionManager.updateSessionUsage(session.sessionId, {
- inputTokens: usageForCost.input_tokens,
- outputTokens: usageForCost.output_tokens,
- cacheCreationInputTokens: usageForCost.cache_creation_input_tokens,
- cacheReadInputTokens: usageForCost.cache_read_input_tokens,
- costUsd: costUsdStr,
- status: statusCode >= 200 && statusCode < 300 ? "completed" : "error",
- statusCode: statusCode,
- }).catch((error: unknown) => {
- logger.error("[ResponseHandler] Failed to update session usage:", error);
- });
+ const payload: SessionUsageUpdate = {
+ status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error",
+ statusCode: effectiveStatusCode,
+ ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}),
+ };
+
+ if (usageForCost) {
+ payload.inputTokens = usageForCost.input_tokens;
+ payload.outputTokens = usageForCost.output_tokens;
+ payload.cacheCreationInputTokens = usageForCost.cache_creation_input_tokens;
+ payload.cacheReadInputTokens = usageForCost.cache_read_input_tokens;
+ payload.costUsd = costUsdStr;
+ }
+
+ void SessionManager.updateSessionUsage(session.sessionId, payload).catch(
+ (error: unknown) => {
+ logger.error("[ResponseHandler] Failed to update session usage:", error);
+ }
+ );
}
// 保存扩展信息(status code, tokens, provider chain)
await updateMessageRequestDetails(messageContext.id, {
- statusCode: statusCode,
+ statusCode: effectiveStatusCode,
inputTokens: usageForCost?.input_tokens,
outputTokens: usageForCost?.output_tokens,
ttfbMs: session.ttfbMs,
@@ -887,13 +1243,15 @@ export class ProxyResponseHandler {
cacheCreation1hInputTokens: usageForCost?.cache_creation_1h_input_tokens,
cacheTtlApplied: usageForCost?.cache_ttl ?? null,
providerChain: session.getProviderChain(),
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}),
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
+ providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
};
try {
+ let streamEndedNormally = false;
while (true) {
// 检查取消信号
if (session.clientAbortSignal?.aborted || abortController.signal.aborted) {
@@ -907,6 +1265,7 @@ export class ProxyResponseHandler {
const { value, done } = await reader.read();
if (done) {
+ streamEndedNormally = true;
break;
}
if (value) {
@@ -945,7 +1304,30 @@ export class ProxyResponseHandler {
// ⭐ 流式读取完成:清除静默期计时器
clearIdleTimer();
const allContent = flushAndJoin();
- await finalizeStream(allContent);
+ const clientAborted = session.clientAbortSignal?.aborted ?? false;
+ try {
+ await finalizeStream(allContent, streamEndedNormally, clientAborted);
+ } catch (finalizeError) {
+ logger.error("ResponseHandler: Failed to finalize stream", {
+ taskId,
+ providerId: provider.id,
+ providerName: provider.name,
+ messageId: messageContext.id,
+ streamEndedNormally,
+ clientAborted,
+ finalizeError,
+ });
+
+ // 回退:避免 finalizeStream 失败导致 request record 未被更新
+ await persistRequestFailure({
+ session,
+ messageContext,
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
+ error: finalizeError,
+ taskId,
+ phase: "stream",
+ });
+ }
} catch (error) {
// 检测 AbortError 的来源:响应超时 vs 静默期超时 vs 客户端/上游中断
const err = error as Error;
@@ -972,32 +1354,30 @@ export class ProxyResponseHandler {
errorName: err.name,
});
- // ⚠️ 计入熔断器(动态导入避免循环依赖)
+ // 注意:无法重试,因为客户端已收到 HTTP 200
+ // 错误已记录,不抛出异常(避免影响后台任务)
+
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
try {
- const { recordFailure } = await import("@/lib/circuit-breaker");
- await recordFailure(provider.id, err);
- logger.debug("ResponseHandler: Response timeout recorded in circuit breaker", {
- providerId: provider.id,
+ const allContent = flushAndJoin();
+ await finalizeStream(allContent, false, false, "STREAM_RESPONSE_TIMEOUT");
+ } catch (finalizeError) {
+ logger.error("ResponseHandler: Failed to finalize response-timeout stream", {
+ taskId,
+ messageId: messageContext.id,
+ finalizeError,
});
- } catch (cbError) {
- logger.warn("ResponseHandler: Failed to record timeout in circuit breaker", {
- providerId: provider.id,
- error: cbError,
+
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
+ await persistRequestFailure({
+ session,
+ messageContext,
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
+ error: err,
+ taskId,
+ phase: "stream",
});
}
-
- // 注意:无法重试,因为客户端已收到 HTTP 200
- // 错误已记录,熔断器已更新,不抛出异常(避免影响后台任务)
-
- // 更新数据库记录(避免 orphan record)
- await persistRequestFailure({
- session,
- messageContext,
- statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
- error: err,
- taskId,
- phase: "stream",
- });
} else if (isIdleTimeout) {
// ⚠️ 静默期超时:计入熔断器并记录错误日志
logger.error("ResponseHandler: Streaming idle timeout", {
@@ -1008,32 +1388,30 @@ export class ProxyResponseHandler {
chunksCollected: chunks.length,
});
- // ⚠️ 计入熔断器(动态导入避免循环依赖)
+ // 注意:无法重试,因为客户端已收到 HTTP 200
+ // 错误已记录,不抛出异常(避免影响后台任务)
+
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
try {
- const { recordFailure } = await import("@/lib/circuit-breaker");
- await recordFailure(provider.id, err);
- logger.debug("ResponseHandler: Streaming idle timeout recorded in circuit breaker", {
- providerId: provider.id,
+ const allContent = flushAndJoin();
+ await finalizeStream(allContent, false, false, "STREAM_IDLE_TIMEOUT");
+ } catch (finalizeError) {
+ logger.error("ResponseHandler: Failed to finalize idle-timeout stream", {
+ taskId,
+ messageId: messageContext.id,
+ finalizeError,
});
- } catch (cbError) {
- logger.warn("ResponseHandler: Failed to record timeout in circuit breaker", {
- providerId: provider.id,
- error: cbError,
+
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
+ await persistRequestFailure({
+ session,
+ messageContext,
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
+ error: err,
+ taskId,
+ phase: "stream",
});
}
-
- // 注意:无法重试,因为客户端已收到 HTTP 200
- // 错误已记录,熔断器已更新,不抛出异常(避免影响后台任务)
-
- // 更新数据库记录(避免 orphan record - 这是导致 185 个孤儿记录的根本原因!)
- await persistRequestFailure({
- session,
- messageContext,
- statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
- error: err,
- taskId,
- phase: "stream",
- });
} else if (!clientAborted) {
// 上游在流式过程中意外中断:视为供应商/网络错误
logger.error("ResponseHandler: Upstream stream aborted unexpectedly", {
@@ -1046,14 +1424,27 @@ export class ProxyResponseHandler {
errorMessage: err.message || "(empty message)",
});
- await persistRequestFailure({
- session,
- messageContext,
- statusCode: 502,
- error: err,
- taskId,
- phase: "stream",
- });
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
+ try {
+ const allContent = flushAndJoin();
+ await finalizeStream(allContent, false, false, "STREAM_UPSTREAM_ABORTED");
+ } catch (finalizeError) {
+ logger.error("ResponseHandler: Failed to finalize upstream-aborted stream", {
+ taskId,
+ messageId: messageContext.id,
+ finalizeError,
+ });
+
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
+ await persistRequestFailure({
+ session,
+ messageContext,
+ statusCode: 502,
+ error: err,
+ taskId,
+ phase: "stream",
+ });
+ }
} else {
// 客户端主动中断:正常日志,不抛出错误
logger.warn("ResponseHandler: Stream reading aborted by client", {
@@ -1070,7 +1461,7 @@ export class ProxyResponseHandler {
});
try {
const allContent = flushAndJoin();
- await finalizeStream(allContent);
+ await finalizeStream(allContent, false, true);
} catch (finalizeError) {
logger.error("ResponseHandler: Failed to finalize aborted stream response", {
taskId,
@@ -1082,15 +1473,27 @@ export class ProxyResponseHandler {
} else {
logger.error("Failed to save SSE content:", error);
- // 更新数据库记录(避免 orphan record)
- await persistRequestFailure({
- session,
- messageContext,
- statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
- error,
- taskId,
- phase: "stream",
- });
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
+ try {
+ const allContent = flushAndJoin();
+ await finalizeStream(allContent, false, clientAborted, "STREAM_PROCESSING_ERROR");
+ } catch (finalizeError) {
+ logger.error("ResponseHandler: Failed to finalize stream after processing error", {
+ taskId,
+ messageId: messageContext.id,
+ finalizeError,
+ });
+
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
+ await persistRequestFailure({
+ session,
+ messageContext,
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
+ error,
+ taskId,
+ phase: "stream",
+ });
+ }
}
} finally {
// 确保资源释放
@@ -1795,18 +2198,25 @@ async function updateRequestCostFromUsage(
/**
* 统一的请求统计处理方法
* 用于消除 Gemini 透传、普通非流式、普通流式之间的重复统计逻辑
+ *
+ * @param statusCode - 内部结算状态码(可能与客户端实际收到的 HTTP 状态不同,例如“假 200”会被映射为 502)
+ * @param errorMessage - 可选的内部错误原因(用于把假 200/解析失败等信息写入 DB 与监控)
*/
export async function finalizeRequestStats(
session: ProxySession,
responseText: string,
statusCode: number,
- duration: number
+ duration: number,
+ errorMessage?: string,
+ providerIdOverride?: number
): Promise {
const { messageContext, provider } = session;
if (!provider || !messageContext) {
return;
}
+ const providerIdForPersistence = providerIdOverride ?? session.provider?.id;
+
// 1. 结束请求状态追踪
ProxyStatusTracker.getInstance().endRequest(messageContext.user.id, messageContext.id);
@@ -1820,10 +2230,11 @@ export async function finalizeRequestStats(
// 即使没有 usageMetrics,也需要更新状态码和 provider chain
await updateMessageRequestDetails(messageContext.id, {
statusCode: statusCode,
+ ...(errorMessage ? { errorMessage } : {}),
ttfbMs: session.ttfbMs ?? duration,
providerChain: session.getProviderChain(),
model: session.getCurrentModel() ?? undefined,
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
return;
@@ -1892,6 +2303,7 @@ export async function finalizeRequestStats(
costUsd: costUsdStr,
status: statusCode >= 200 && statusCode < 300 ? "completed" : "error",
statusCode: statusCode,
+ ...(errorMessage ? { errorMessage } : {}),
}).catch((error: unknown) => {
logger.error("[ResponseHandler] Failed to update session usage:", error);
});
@@ -1909,8 +2321,9 @@ export async function finalizeRequestStats(
cacheCreation1hInputTokens: normalizedUsage.cache_creation_1h_input_tokens,
cacheTtlApplied: normalizedUsage.cache_ttl ?? null,
providerChain: session.getProviderChain(),
+ ...(errorMessage ? { errorMessage } : {}),
model: session.getCurrentModel() ?? undefined,
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
}
@@ -2066,7 +2479,7 @@ async function persistRequestFailure(options: {
ttfbMs: phase === "non-stream" ? (session.ttfbMs ?? duration) : session.ttfbMs,
providerChain: session.getProviderChain(),
model: session.getCurrentModel() ?? undefined,
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
context1mApplied: session.getContext1mApplied(),
});
diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts
new file mode 100644
index 000000000..9a915843e
--- /dev/null
+++ b/src/app/v1/_lib/proxy/stream-finalization.ts
@@ -0,0 +1,55 @@
+import type { ProxySession } from "./session";
+
+/**
+ * 流式响应(SSE)在“收到响应头”时无法确定成功与否:
+ * - 上游可能返回 HTTP 200,但 body 是错误 JSON(假 200)
+ * - 只有在 SSE 结束后才能做最终判定
+ *
+ * 该结构用于 Forwarder → ResponseHandler 之间传递“延迟结算”的必要信息:
+ * - Forwarder:拿到 Response 后尽快开始向客户端透传(降低延迟);但不要立刻记为 success/绑定 session。
+ * - ResponseHandler:在流正常结束后,基于最终响应体做一次补充检查,然后再更新熔断/endpoint/会话绑定。
+ *
+ * 说明:
+ * - 这里选择使用 WeakMap,而不是把字段挂到 session 上:
+ * - 避免污染 ProxySession 对象;
+ * - 更类型安全;
+ * - 元信息生命周期跟随 session 实例,消费后可立即清理。
+ * - 元信息是一次性的:消费后会被清空,避免跨请求污染。
+ */
+export type DeferredStreamingFinalization = {
+ providerId: number;
+ providerName: string;
+ providerPriority: number;
+ attemptNumber: number;
+ totalProvidersAttempted: number;
+ isFirstAttempt: boolean;
+ isFailoverSuccess: boolean;
+ endpointId: number | null;
+ endpointUrl: string;
+ upstreamStatusCode: number;
+};
+
+const deferredMeta = new WeakMap();
+
+export function setDeferredStreamingFinalization(
+ session: ProxySession,
+ meta: DeferredStreamingFinalization
+): void {
+ // Forwarder 在识别到 SSE 时调用:标记该请求需要在流结束后“二次结算”。
+ deferredMeta.set(session, meta);
+}
+
+export function consumeDeferredStreamingFinalization(
+ session: ProxySession
+): DeferredStreamingFinalization | null {
+ // 备注:
+ // - 该函数内部无 await;JS 事件循环保证单次调用不会被并发打断。
+ // - ProxySession 是“每次请求”创建的实例;即使多个后台任务先后调用,
+ // 也只有第一次能拿到 meta,其余调用都会得到 null。
+ const meta = deferredMeta.get(session) ?? null;
+ if (meta) {
+ // 只允许消费一次:避免重复结算(例如多个后台统计任务并行时)。
+ deferredMeta.delete(session);
+ }
+ return meta;
+}
diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts
index 0055d3722..b120fd8c8 100644
--- a/src/lib/config/env.schema.ts
+++ b/src/lib/config/env.schema.ts
@@ -98,6 +98,14 @@ export const EnvSchema = z.object({
// - false (默认):存储请求/响应体但对 message 内容脱敏 [REDACTED]
// - true:原样存储 message 内容(注意隐私和存储空间影响)
STORE_SESSION_MESSAGES: z.string().default("false").transform(booleanTransform),
+ // 会话响应体存储开关
+ // - true (默认):存储响应体(SSE/JSON),用于调试/回放/问题定位(Redis 临时缓存,默认 5 分钟)
+ // - false:不存储响应体(注意:不影响本次请求处理;仅影响后续在 UI/诊断中查看 response body)
+ //
+ // 说明:
+ // - 该开关只影响“写入 Redis 的响应体内容”,不影响内部统计逻辑读取响应体(例如 tokens/费用统计、SSE 结束后的假 200 检测)。
+ // - message 内容是否脱敏仍由 STORE_SESSION_MESSAGES 控制。
+ STORE_SESSION_RESPONSE_BODY: z.string().default("true").transform(booleanTransform),
DEBUG_MODE: z.string().default("false").transform(booleanTransform),
LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),
TZ: z.string().default("Asia/Shanghai"),
diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts
index 276fd6dd1..5af426dd6 100644
--- a/src/lib/session-manager.ts
+++ b/src/lib/session-manager.ts
@@ -1331,7 +1331,11 @@ export class SessionManager {
/**
* 存储 session 响应体(临时存储,5分钟过期)
*
- * 存储策略受 STORE_SESSION_MESSAGES 控制:
+ * 存储行为受 STORE_SESSION_RESPONSE_BODY 控制:
+ * - true (默认):存储响应体到 Redis 临时缓存
+ * - false:不存储(注意:不影响本次请求处理与统计,仅影响后续查看 response body)
+ *
+ * 存储策略(脱敏/原样)受 STORE_SESSION_MESSAGES 控制:
* - true:原样存储响应内容
* - false(默认):对 JSON 响应体中的 message 内容脱敏 [REDACTED]
*
@@ -1344,6 +1348,10 @@ export class SessionManager {
response: string | object,
requestSequence?: number
): Promise {
+ // 允许通过环境变量显式关闭响应体存储(例如隐私/节省 Redis 内存)。
+ // 注意:这里仅关闭“写入 Redis”这一步;调用方仍然可能在内存中读取响应体用于统计或错误检测。
+ if (!getEnvConfig().STORE_SESSION_RESPONSE_BODY) return;
+
const redis = getRedisClient();
if (!redis || redis.status !== "ready") return;
diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts
new file mode 100644
index 000000000..d1facd969
--- /dev/null
+++ b/src/lib/utils/upstream-error-detection.test.ts
@@ -0,0 +1,213 @@
+import { describe, expect, test } from "vitest";
+import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection";
+
+describe("detectUpstreamErrorFromSseOrJsonText", () => {
+ test("空响应体视为错误", () => {
+ expect(detectUpstreamErrorFromSseOrJsonText("")).toEqual({
+ isError: true,
+ code: "FAKE_200_EMPTY_BODY",
+ });
+ });
+
+ test("纯空白响应体视为错误", () => {
+ expect(detectUpstreamErrorFromSseOrJsonText(" \n\t ")).toEqual({
+ isError: true,
+ code: "FAKE_200_EMPTY_BODY",
+ });
+ });
+
+ test("纯 JSON:error 字段非空视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"error":"当前无可用凭证"}');
+ expect(res.isError).toBe(true);
+ });
+
+ test("纯 JSON:error 为对象且 error.message 非空视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText(
+ JSON.stringify({ error: { message: "error: no credentials" } })
+ );
+ expect(res.isError).toBe(true);
+ });
+
+ test.each([
+ '{"error":true}',
+ '{"error":42}',
+ ])("纯 JSON:error 为非字符串类型也应视为错误(%s)", (body) => {
+ const res = detectUpstreamErrorFromSseOrJsonText(body);
+ expect(res.isError).toBe(true);
+ });
+
+ test("JSON 数组输入不视为错误(目前不做解析)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('[{"error":"something"}]');
+ expect(res.isError).toBe(false);
+ });
+
+ test("detail 应对 Bearer token 做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"error":"Bearer abc.def_ghi"}');
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("Bearer [REDACTED]");
+ expect(detail).not.toContain("abc.def_ghi");
+ }
+ });
+
+ test("detail 应对常见 API key 前缀做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"error":"sk-1234567890abcdef123456"}');
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("[REDACTED_KEY]");
+ expect(detail).not.toContain("sk-1234567890abcdef123456");
+ }
+ });
+
+ test("detail 应对 JWT 做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const jwt = "eyJaaaaaaaaaaaaaaa.bbbbbbbbbbbbbbbbbbbb.cccccccccccccccccccc";
+ const res = detectUpstreamErrorFromSseOrJsonText(JSON.stringify({ error: jwt }));
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("[JWT]");
+ expect(detail).not.toContain("eyJaaaaaaaaaaaaaaa");
+ }
+ });
+
+ test("detail 应对 email 做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText(
+ JSON.stringify({ error: "user@example.com is not allowed" })
+ );
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("[EMAIL]");
+ expect(detail).not.toContain("user@example.com");
+ }
+ });
+
+ test("detail 应对通用敏感键值做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText(
+ JSON.stringify({ error: 'token=abc123 secret:xyz password:"p@ss" api_key=key123' })
+ );
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("token:***");
+ expect(detail).toContain("secret:***");
+ expect(detail).toContain("password:***");
+ expect(detail).toContain("api_key:***");
+ expect(detail).not.toContain("abc123");
+ expect(detail).not.toContain("xyz");
+ expect(detail).not.toContain("p@ss");
+ expect(detail).not.toContain("key123");
+ }
+ });
+
+ test("detail 应对常见配置/凭证路径做脱敏(避免泄露到日志/Redis/DB)", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText(
+ JSON.stringify({ error: "failed to read /etc/app/config.yaml" })
+ );
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail).toContain("[PATH]");
+ expect(detail).not.toContain("config.yaml");
+ }
+ });
+
+ test("detail 过长时应截断(避免把大段响应写入日志/DB)", () => {
+ const longText = "a".repeat(250);
+ const res = detectUpstreamErrorFromSseOrJsonText(JSON.stringify({ error: longText }));
+ expect(res.isError).toBe(true);
+ if (res.isError) {
+ const detail = res.detail ?? "";
+ expect(detail.endsWith("…")).toBe(true);
+ expect(detail.length).toBeLessThanOrEqual(201);
+ }
+ });
+
+ test("纯 JSON:error 为空字符串不视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"error":""}');
+ expect(res.isError).toBe(false);
+ });
+
+ test("纯 JSON:message 不包含关键字不视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"message":"all good"}');
+ expect(res.isError).toBe(false);
+ });
+
+ test("纯 JSON:小于 1000 字符且 message 包含 error 字样视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"message":"some error happened"}');
+ expect(res.isError).toBe(true);
+ });
+
+ test("纯 JSON:options.messageKeyword 可覆盖默认关键字判定", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"message":"boom happened"}', {
+ messageKeyword: /boom/i,
+ });
+ expect(res).toEqual({
+ isError: true,
+ code: "FAKE_200_JSON_MESSAGE_KEYWORD_MATCH",
+ detail: "boom happened",
+ });
+ });
+
+ test("纯 JSON:options.maxJsonCharsForMessageCheck 可关闭 message 关键字检测", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText('{"message":"some error happened"}', {
+ maxJsonCharsForMessageCheck: 5,
+ });
+ expect(res.isError).toBe(false);
+ });
+
+ test("纯 JSON:大于等于 1000 字符时不做 message 关键字判定", () => {
+ const longMessage = "a".repeat(1000);
+ const res = detectUpstreamErrorFromSseOrJsonText(
+ JSON.stringify({ message: `${longMessage} error ${longMessage}` })
+ );
+ expect(res.isError).toBe(false);
+ });
+
+ test("纯 JSON:非法 JSON 不抛错且不视为错误", () => {
+ const res = detectUpstreamErrorFromSseOrJsonText("{not-json}");
+ expect(res.isError).toBe(false);
+ });
+
+ test("SSE:data JSON 包含非空 error 字段视为错误", () => {
+ const sse = ["event: message", 'data: {"error":"当前无可用凭证"}', ""].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(true);
+ });
+
+ test("SSE:data JSON error 为对象且 error.message 非空视为错误", () => {
+ const sse = ['data: {"error":{"message":"ERROR: no credentials"}}', ""].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(true);
+ });
+
+ test("SSE:data JSON 小于 1000 字符且 message 包含 error 字样视为错误", () => {
+ const sse = ['data: {"message":"ERROR: no credentials"}', ""].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(true);
+ });
+
+ test("SSE:message 为对象时不应误判为错误", () => {
+ // 类 Anthropic SSE:message 字段通常是对象(不是错误字符串)
+ const sse = [
+ 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant"}}',
+ "",
+ ].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(false);
+ });
+
+ test("SSE:不包含 error/message key 时不解析且不视为错误", () => {
+ const sse = ['data: {"foo":"bar"}', ""].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(false);
+ });
+
+ test("SSE:仅有 [DONE] 不视为错误", () => {
+ const sse = ["data: [DONE]", ""].join("\n");
+ const res = detectUpstreamErrorFromSseOrJsonText(sse);
+ expect(res.isError).toBe(false);
+ });
+});
diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts
new file mode 100644
index 000000000..066f1bc8f
--- /dev/null
+++ b/src/lib/utils/upstream-error-detection.ts
@@ -0,0 +1,249 @@
+import { parseSSEData } from "@/lib/utils/sse";
+
+/**
+ * 上游“假 200”错误检测(仅用于内部统计/熔断/故障转移判定)。
+ *
+ * 背景
+ * - 一些上游供应商在鉴权/配额/风控等错误场景下,会返回 HTTP 200,
+ * 但在 body 里给出错误 JSON(例如:`{"error":"当前无可用凭证"}`)。
+ * - 在流式 SSE 场景中,这类错误可能被包裹在某个 `data: {...}` 事件里。
+ * - CCH 在“已开始向客户端透传 SSE”后,无法再把 HTTP 状态码改成 4xx/5xx,
+ * 也无法阻止错误内容继续被传递到客户端。
+ *
+ * 为什么还要检测
+ * - 我们至少要让 CCH 自己意识到“这次请求实际上是失败的”,从而:
+ * 1) 触发故障转移/供应商熔断的失败统计;
+ * 2) 避免把 session 智能绑定(粘性)更新到一个实际不可用的 provider;
+ * 3) 让客户端下一次自动重试时,有机会切换到其他 provider(避免“假 200”导致重试仍复用同一坏 provider)。
+ *
+ * 设计目标(偏保守)
+ * - 仅基于结构化字段做启发式判断:`error` 与 `message`;
+ * - 不扫描模型生成的正文内容(例如 content/choices),避免把用户/模型自然语言里的 "error" 误判为上游错误;
+ * - message 关键字检测仅对“小体积 JSON”启用,降低误判与性能开销。
+ * - 返回的 `code` 是语言无关的错误码(便于写入 DB/监控/告警);
+ * - 返回的 `detail`(如有)会做脱敏与截断:用于日志排查,但不建议直接作为用户展示文案。
+ */
+export type UpstreamErrorDetectionResult =
+ | { isError: false }
+ | {
+ isError: true;
+ code: string;
+ detail?: string;
+ };
+
+type DetectionOptions = {
+ /**
+ * 仅对小体积 JSON 启用 message 关键字检测,避免误判与无谓开销。
+ *
+ * 说明:这里的“体积”是原始 JSON 文本(或 SSE 单个 data 的 JSON)序列化后的字符数,
+ * 而不是 HTTP 的 Content-Length。
+ */
+ maxJsonCharsForMessageCheck?: number;
+ /**
+ * message 关键字匹配规则(默认 /error/i)。
+ *
+ * 注意:该规则只用于检查 `message` 字段(字符串)。
+ * `error.message` 属于更强信号:只要 `error` 非空(含对象形式),就会直接判定为错误。
+ */
+ messageKeyword?: RegExp;
+};
+
+const DEFAULT_MAX_JSON_CHARS_FOR_MESSAGE_CHECK = 1000;
+const DEFAULT_MESSAGE_KEYWORD = /error/i;
+
+const FAKE_200_CODES = {
+ EMPTY_BODY: "FAKE_200_EMPTY_BODY",
+ JSON_ERROR_NON_EMPTY: "FAKE_200_JSON_ERROR_NON_EMPTY",
+ JSON_ERROR_MESSAGE_NON_EMPTY: "FAKE_200_JSON_ERROR_MESSAGE_NON_EMPTY",
+ JSON_MESSAGE_KEYWORD_MATCH: "FAKE_200_JSON_MESSAGE_KEYWORD_MATCH",
+} as const;
+
+// SSE 快速过滤:仅当文本里“看起来存在 JSON key”时才进入 parseSSEData(避免无谓解析)。
+// 注意:这里必须是 `"key"\s*:` 形式,避免误命中 JSON 字符串内容里的 `\"key\"`。
+const MAY_HAVE_JSON_ERROR_KEY = /"error"\s*:/;
+const MAY_HAVE_JSON_MESSAGE_KEY = /"message"\s*:/;
+
+function isPlainRecord(value: unknown): value is Record {
+ return !!value && typeof value === "object" && !Array.isArray(value);
+}
+
+function hasNonEmptyValue(value: unknown): boolean {
+ // 这里的“非空”是为了判断“error 字段是否有内容”。
+ // - string:trim 后非空
+ // - number:非 0 且非 NaN(避免把默认 0 当作错误)
+ // - boolean:true 视为非空
+ // - array/object:存在元素/键才算非空
+ if (value === null || value === undefined) return false;
+ if (typeof value === "string") return value.trim().length > 0;
+ if (typeof value === "number") return !Number.isNaN(value) && value !== 0;
+ if (typeof value === "boolean") return value;
+ if (Array.isArray(value)) return value.length > 0;
+ if (typeof value === "object") return Object.keys(value as Record).length > 0;
+ return true;
+}
+
+function sanitizeErrorTextForDetail(text: string): string {
+ // 注意:这里的目的不是“完美脱敏”,而是尽量降低上游错误信息中意外夹带敏感内容的风险。
+ // 若后续发现更多敏感模式,可在不改变检测语义的前提下补充。
+ let sanitized = text;
+
+ // Bearer token
+ sanitized = sanitized.replace(/Bearer\s+[A-Za-z0-9._-]+/gi, "Bearer [REDACTED]");
+
+ // Common API key prefixes (OpenAI/Claude/Codex 等)
+ sanitized = sanitized.replace(/\b(?:sk|rk|pk)-[A-Za-z0-9_-]{16,}\b/giu, "[REDACTED_KEY]");
+ sanitized = sanitized.replace(/\bAIza[0-9A-Za-z_-]{16,}\b/g, "[REDACTED_KEY]");
+
+ // JWT(base64url 三段)
+ sanitized = sanitized.replace(
+ /\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g,
+ "[JWT]"
+ );
+
+ // Email
+ sanitized = sanitized.replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, "[EMAIL]");
+
+ // 通用敏感键值(尽量覆盖常见写法)
+ sanitized = sanitized.replace(
+ /\b(password|token|secret|api[_-]?key)\b\s*[:=]\s*['"]?[^'"\s]+['"]?/gi,
+ "$1:***"
+ );
+
+ // 常见配置/凭证路径(避免把文件名/路径泄露到审计字段里)
+ sanitized = sanitized.replace(/\/[\w.-]+\.(?:env|ya?ml|json|conf|ini)/gi, "[PATH]");
+
+ return sanitized;
+}
+
+function truncateForDetail(text: string, maxLen: number = 200): string {
+ const trimmed = sanitizeErrorTextForDetail(text).trim();
+ if (trimmed.length <= maxLen) return trimmed;
+ return `${trimmed.slice(0, maxLen)}…`;
+}
+
+function detectFromJsonObject(
+ obj: Record,
+ rawJsonChars: number,
+ options: Required>
+): UpstreamErrorDetectionResult {
+ // 判定优先级:
+ // 1) `error` 非空:直接判定为错误(强信号)
+ // 2) 小体积 JSON 下,`message` 命中关键字:判定为错误(弱信号,但能覆盖部分“错误只写在 message”场景)
+ const errorValue = obj.error;
+ if (hasNonEmptyValue(errorValue)) {
+ // 优先展示 string 或 error.message,避免把整个对象塞进 detail
+ if (typeof errorValue === "string") {
+ return {
+ isError: true,
+ code: FAKE_200_CODES.JSON_ERROR_NON_EMPTY,
+ detail: truncateForDetail(errorValue),
+ };
+ }
+
+ if (isPlainRecord(errorValue) && typeof errorValue.message === "string") {
+ return {
+ isError: true,
+ code: FAKE_200_CODES.JSON_ERROR_MESSAGE_NON_EMPTY,
+ detail: truncateForDetail(errorValue.message),
+ };
+ }
+
+ return { isError: true, code: FAKE_200_CODES.JSON_ERROR_NON_EMPTY };
+ }
+
+ if (rawJsonChars < options.maxJsonCharsForMessageCheck) {
+ const message = typeof obj.message === "string" ? obj.message : null;
+
+ // 注意:仅检查 message 字段本身,不扫描其它字段。
+ if (message && options.messageKeyword.test(message)) {
+ return {
+ isError: true,
+ code: FAKE_200_CODES.JSON_MESSAGE_KEYWORD_MATCH,
+ detail: truncateForDetail(message),
+ };
+ }
+ }
+
+ return { isError: false };
+}
+
+/**
+ * 用于“流式 SSE 已经结束后”的补充检查:
+ * - 响应体为空:视为错误
+ * - JSON 里包含非空 error 字段:视为错误
+ * - 小于 1000 字符的 JSON:若 message 包含 "error" 字样:视为错误
+ *
+ * 注意与限制:
+ * - 该函数不负责判断 HTTP 状态码;调用方通常只在“上游返回 200 且 SSE 正常结束后”使用它。
+ * - 对 SSE 文本,仅解析 `data:` 事件中的 JSON(通过 parseSSEData)。
+ * - 如果文本不是合法 JSON / SSE,函数会返回 `{isError:false}`(不做过度猜测)。
+ */
+export function detectUpstreamErrorFromSseOrJsonText(
+ text: string,
+ options: DetectionOptions = {}
+): UpstreamErrorDetectionResult {
+ const merged: Required> =
+ {
+ maxJsonCharsForMessageCheck:
+ options.maxJsonCharsForMessageCheck ?? DEFAULT_MAX_JSON_CHARS_FOR_MESSAGE_CHECK,
+ messageKeyword: options.messageKeyword ?? DEFAULT_MESSAGE_KEYWORD,
+ };
+
+ const trimmed = text.trim();
+ if (!trimmed) {
+ return { isError: true, code: FAKE_200_CODES.EMPTY_BODY };
+ }
+
+ // 情况 1:纯 JSON(对象)
+ // 上游可能 Content-Type 设置为 SSE,但实际上返回 JSON;此处只处理对象格式({...}),
+ // 不处理数组([...])以避免误判(数组场景的语义差异较大,后续若确认需要再扩展)。
+ if (trimmed.startsWith("{")) {
+ try {
+ const parsed = JSON.parse(trimmed) as unknown;
+ if (isPlainRecord(parsed)) {
+ return detectFromJsonObject(parsed, trimmed.length, merged);
+ }
+ } catch {
+ // JSON 解析失败:不视为错误,交由上层逻辑处理
+ }
+ return { isError: false };
+ }
+
+ if (trimmed.startsWith("[")) {
+ return { isError: false };
+ }
+
+ // 情况 2:SSE 文本。快速过滤:既无 "error"/"message" key 时跳过解析
+ // 注意:这里要求 key 命中 `"key"\s*:`,尽量避免误命中 JSON 字符串内容里的 `\"error\"`。
+ if (!MAY_HAVE_JSON_ERROR_KEY.test(text) && !MAY_HAVE_JSON_MESSAGE_KEY.test(text)) {
+ return { isError: false };
+ }
+
+ // parseSSEData 会把每个事件的 data 尝试解析成对象;我们只对 object data 做结构化判定。
+ const events = parseSSEData(text);
+ for (const evt of events) {
+ if (!isPlainRecord(evt.data)) continue;
+ // 性能优化:只有在 message 是字符串、且“看起来足够小”时才需要精确计算 JSON 字符数。
+ // 对大多数 SSE 事件(message 为对象、或没有 message),无需 JSON.stringify。
+ let chars = 0;
+ const errorValue = evt.data.error;
+ const messageValue = evt.data.message;
+ if (!hasNonEmptyValue(errorValue) && typeof messageValue === "string") {
+ if (messageValue.length >= merged.maxJsonCharsForMessageCheck) {
+ chars = merged.maxJsonCharsForMessageCheck; // >= 阈值即可跳过 message 关键字判定
+ } else {
+ try {
+ chars = JSON.stringify(evt.data).length;
+ } catch {
+ // stringify 失败时回退为近似值(仍保持“仅小体积 JSON 才做 message 检测”的意图)
+ chars = messageValue.length;
+ }
+ }
+ }
+
+ const res = detectFromJsonObject(evt.data, chars, merged);
+ if (res.isError) return res;
+ }
+
+ return { isError: false };
+}
diff --git a/tests/unit/lib/env-store-session-response-body.test.ts b/tests/unit/lib/env-store-session-response-body.test.ts
new file mode 100644
index 000000000..bcb25c45c
--- /dev/null
+++ b/tests/unit/lib/env-store-session-response-body.test.ts
@@ -0,0 +1,44 @@
+import { afterEach, describe, expect, it } from "vitest";
+import { EnvSchema } from "@/lib/config/env.schema";
+
+describe("EnvSchema - STORE_SESSION_RESPONSE_BODY", () => {
+ const originalEnv = process.env.STORE_SESSION_RESPONSE_BODY;
+
+ afterEach(() => {
+ if (originalEnv === undefined) {
+ delete process.env.STORE_SESSION_RESPONSE_BODY;
+ } else {
+ process.env.STORE_SESSION_RESPONSE_BODY = originalEnv;
+ }
+ });
+
+ it("should default to true when not set", () => {
+ delete process.env.STORE_SESSION_RESPONSE_BODY;
+ const result = EnvSchema.parse(process.env);
+ expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true);
+ });
+
+ it("should parse 'true' as true", () => {
+ process.env.STORE_SESSION_RESPONSE_BODY = "true";
+ const result = EnvSchema.parse(process.env);
+ expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true);
+ });
+
+ it("should parse 'false' as false", () => {
+ process.env.STORE_SESSION_RESPONSE_BODY = "false";
+ const result = EnvSchema.parse(process.env);
+ expect(result.STORE_SESSION_RESPONSE_BODY).toBe(false);
+ });
+
+ it("should parse '0' as false", () => {
+ process.env.STORE_SESSION_RESPONSE_BODY = "0";
+ const result = EnvSchema.parse(process.env);
+ expect(result.STORE_SESSION_RESPONSE_BODY).toBe(false);
+ });
+
+ it("should parse '1' as true", () => {
+ process.env.STORE_SESSION_RESPONSE_BODY = "1";
+ const result = EnvSchema.parse(process.env);
+ expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true);
+ });
+});
diff --git a/tests/unit/lib/session-manager-redaction.test.ts b/tests/unit/lib/session-manager-redaction.test.ts
index ae9b24cef..3cc3c24f3 100644
--- a/tests/unit/lib/session-manager-redaction.test.ts
+++ b/tests/unit/lib/session-manager-redaction.test.ts
@@ -48,9 +48,11 @@ vi.mock("@/lib/redis", () => ({
// Mock config - we'll control STORE_SESSION_MESSAGES dynamically
let mockStoreMessages = false;
+let mockStoreSessionResponseBody = true;
vi.mock("@/lib/config/env.schema", () => ({
getEnvConfig: () => ({
STORE_SESSION_MESSAGES: mockStoreMessages,
+ STORE_SESSION_RESPONSE_BODY: mockStoreSessionResponseBody,
SESSION_TTL: 300,
}),
}));
@@ -62,10 +64,12 @@ describe("SessionManager - Redaction based on STORE_SESSION_MESSAGES", () => {
beforeEach(() => {
vi.clearAllMocks();
mockStoreMessages = false; // default: redact
+ mockStoreSessionResponseBody = true; // default: store response body
});
afterEach(() => {
mockStoreMessages = false;
+ mockStoreSessionResponseBody = true;
});
describe("storeSessionMessages", () => {
@@ -160,6 +164,14 @@ describe("SessionManager - Redaction based on STORE_SESSION_MESSAGES", () => {
});
describe("storeSessionResponse", () => {
+ it("should skip storing response body when STORE_SESSION_RESPONSE_BODY=false", async () => {
+ mockStoreSessionResponseBody = false;
+
+ await SessionManager.storeSessionResponse("sess_disabled", '{"message":"hello"}', 1);
+
+ expect(redisMock.setex).not.toHaveBeenCalled();
+ });
+
it("should store redacted JSON response when STORE_SESSION_MESSAGES=false", async () => {
mockStoreMessages = false;
const responseBody = {