From 8e954d919077093c7d25fd9c8dcc14f1ffaf0629 Mon Sep 17 00:00:00 2001
From: OmarAlJarrah
Date: Wed, 17 Jun 2026 05:08:28 +0300
Subject: [PATCH] refactor: share an instrumentation emitter and harden body capture

The sync and async instrumentation steps duplicated ~200 lines of
emit/redact/preview/metrics logic. Extract a shared InstrumentationEmitters
that owns the event shape, header redaction, body wrapping, and metric
instruments; both steps now delegate to it and differ only in how they
thread the downstream call (synchronous process vs. a CompletableFuture
continuation).

While consolidating, fix two body-capture problems the shared path now
handles once for both steps:

- Stop the sync step from stalling time-to-first-byte. The async step
  already skipped body capture for unknown-length (contentLength() < 0)
  response bodies because the bounded drain runs on the completion thread
  and would block on a slow producer; the sync step had no such guard and
  drained eagerly on the caller's thread, so an SSE / long-poll /
  chunked-trickle endpoint logged at BODY_AND_HEADERS did not return the
  response until the preview cap filled or the producer hit EOF. The shared
  wrapResponseForLogging applies the contentLength() < 0 skip to both
  steps, so unknown-length bodies stream to the caller unwrapped with no
  preview.
- Make the true body size observable. The *.body.size fields are derived
  from the capped preview snapshot, so they saturate at bodyPreviewMaxBytes
  and a dashboard cannot tell an 8 KiB body from an 8 GB one. The events
  now also carry *.body.actual_size (the full length, emitted when known)
  and *.body.preview_truncated (true when the preview is only a prefix).
  Existing field names and metric/event names are unchanged.

Docs and the HttpInstrumentationOptions KDoc are updated to describe the
new fields and the symmetric streaming-body skip.

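For reviewers, a minimal sketch of the field semantics described above. It is
illustrative only: `SketchBody` and `bodyFields` are hypothetical names, not
SDK types, and truncation is approximated here by comparing the declared
length with the preview size, whereas the change itself derives it from the
capture state.

```kotlin
// Hypothetical stand-in for a response body; NOT the SDK's type.
// declaredLength mirrors contentLength(): -1 means unknown-length / streaming.
class SketchBody(val bytes: ByteArray, val declaredLength: Long)

// Returns the body-related event fields, or an empty map when capture is skipped.
fun bodyFields(body: SketchBody, previewMaxBytes: Int): Map<String, Any> {
    // Unknown-length bodies are never drained, so no body.* fields are produced.
    if (body.declaredLength < 0L) return emptyMap()
    // The preview is a prefix bounded by the cap.
    val previewSize = minOf(body.bytes.size, previewMaxBytes)
    return mapOf(
        // Size of the captured preview; saturates at the cap.
        "response.body.size" to previewSize.toLong(),
        // Full length, emitted only because it is known here.
        "response.body.actual_size" to body.declaredLength,
        // Assumption for this sketch: truncated when the declared length exceeds the preview.
        "response.body.preview_truncated" to (body.declaredLength > previewSize),
    )
}

fun main() {
    val cap = 8
    println(bodyFields(SketchBody(ByteArray(4), 4L), cap))   // body within the cap
    println(bodyFields(SketchBody(ByteArray(32), 32L), cap)) // body over the cap
    println(bodyFields(SketchBody(ByteArray(32), -1L), cap)) // streaming body, capture skipped
}
```

The empty map for the streaming case corresponds to the "no preview" behaviour:
absence of the body fields is expected there, not an error.
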
Closes #26
Closes #107
Closes #108
---
 docs/http-body-logging-and-concurrency.md     |  45 ++-
 .../steps/DefaultAsyncInstrumentationStep.kt  | 305 ++------------
 .../steps/DefaultInstrumentationStep.kt       | 273 +------------
 .../steps/HttpInstrumentationOptions.kt       |  22 +-
 .../pipeline/steps/InstrumentationEmitters.kt | 372 ++++++++++++++++++
 .../http/response/LoggableResponseBody.kt     |   9 +
 .../steps/AsyncInstrumentationStepTest.kt     |  36 ++
 .../pipeline/steps/InstrumentationStepTest.kt | 145 ++++++-
 8 files changed, 642 insertions(+), 565 deletions(-)
 create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationEmitters.kt

diff --git a/docs/http-body-logging-and-concurrency.md b/docs/http-body-logging-and-concurrency.md
index 1212a4f2..946ad015 100644
--- a/docs/http-body-logging-and-concurrency.md
+++ b/docs/http-body-logging-and-concurrency.md
@@ -258,27 +258,34 @@ above). The preview you see in the log is a prefix; the consumer still reads eve
 **2. The logged size fields measure different things.** The step emits two size-related fields on
 the `http.response` event, and they are not the same number for an over-cap body:

-| Field | Source | What it reports |
-|---------------------------|----------------------------------------------|---------------------------------------------------------------------------------|
-| `response.body.size` | `loggableBody.snapshot(bodyPreviewMaxBytes)` | Size of the **captured preview** — bounded by `bodyPreviewMaxBytes` |
-| `response.body.preview` | the same captured bytes, decoded as UTF-8 | The preview text (a prefix for an over-cap body) |
-| `response.content.length` | `response.body.contentLength()` | The body's **true** length when the origin declared one (`Content-Length`); `-1` for unknown-length / streaming bodies |
+| Field | Source | What it reports |
+|----------------------------------|----------------------------------------------|---------------------------------------------------------------------------------|
+| `response.body.size` | `loggableBody.snapshot(bodyPreviewMaxBytes)` | Size of the **captured preview** — bounded by `bodyPreviewMaxBytes` |
+| `response.body.actual_size` | `loggableBody.contentLength()` | The body's **true** size, emitted only when known (`>= 0`); omitted for unknown-length bodies |
+| `response.body.preview_truncated`| derived from the capture (`isFullyCaptured`) | `true` when the preview is only a prefix of a larger body, `false` when the whole body fit the cap |
+| `response.body.preview` | the same captured bytes, charset-aware | The preview text (a prefix for an over-cap body) |
+| `response.content.length` | `response.body.contentLength()` | The body's **true** length when the origin declared one (`Content-Length`); `-1` for unknown-length / streaming bodies |

 So `response.body.size` is the *captured/preview* size, **not** necessarily the full body size.
-When a body exceeds the cap, `response.body.size` saturates near `bodyPreviewMaxBytes` while
-`response.content.length` still shows the real length. Read `content.length` (not
-`body.size`) when you need the full size, and treat `body.preview` as a prefix that may be
-truncated. The two agree only when the whole body fit within the cap — exactly the case where
-`contentLength()` itself returns the captured size (see **`contentLength()`** above).
-
-**Streaming / unknown-length bodies (async path).** `DefaultAsyncInstrumentationStep` skips the
-capture entirely when `contentLength() < 0`, because the bounded drain would run on the
-future-completion thread and a slow producer could stall it. Such bodies stream to the consumer
-unwrapped, so they carry **no** `response.body.size` / `response.body.preview` fields at all —
-absence of those fields is expected for chunked or streaming responses, not a logging bug. The
-synchronous `DefaultInstrumentationStep` drains known-length and unknown-length bodies alike (it
-runs on the caller's thread), but the size-vs-preview distinction above applies to it just the
-same.
+When a body exceeds the cap, `response.body.size` saturates near `bodyPreviewMaxBytes` — but
+`response.body.actual_size` carries the true length and `response.body.preview_truncated` is
+`true`, so a dashboard keyed on body size no longer flatlines at the cap. Read `actual_size` (or
+`content.length`) when you need the full size, and treat `body.preview` as a prefix that may be
+truncated. `response.body.size` and `response.body.actual_size` agree only when the whole body
+fit within the cap — exactly the case where `contentLength()` itself returns the captured size
+(see **`contentLength()`** above). The request body carries the matching
+`request.body.actual_size` / `request.body.preview_truncated` fields, derived from the request
+body's declared `contentLength()`.
+
+**Streaming / unknown-length bodies.** Both `DefaultInstrumentationStep` and
+`DefaultAsyncInstrumentationStep` skip the capture entirely when `contentLength() < 0`. The
+bounded drain runs on whichever thread completes the call — the caller's thread in the sync step,
+the future-completion thread in the async step — and draining an unknown-length body would block
+that thread on a slow producer (SSE, long-poll, chunked trickle), stalling time-to-first-byte.
+Such bodies stream to the consumer unwrapped, so they carry **no** `response.body.size` /
+`response.body.actual_size` / `response.body.preview` fields at all — absence of those fields is
+expected for chunked or streaming responses, not a logging bug. `response.content.length` is
+still emitted (as `-1`).
### Reading a Snapshot diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt index 99d02131..da66186e 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt @@ -7,24 +7,16 @@ package org.dexpace.sdk.core.http.pipeline.steps -import org.dexpace.sdk.core.http.common.Headers -import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.AsyncHttpStep import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext import org.dexpace.sdk.core.http.pipeline.Stage import org.dexpace.sdk.core.http.request.LoggableRequestBody import org.dexpace.sdk.core.http.request.Request -import org.dexpace.sdk.core.http.response.LoggableResponseBody import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger -import org.dexpace.sdk.core.instrumentation.LoggingEvent import org.dexpace.sdk.core.instrumentation.MdcSnapshot import org.dexpace.sdk.core.instrumentation.Span -import org.dexpace.sdk.core.instrumentation.UrlRedactor import org.dexpace.sdk.core.instrumentation.makeCurrentWithLoggingContext -import org.dexpace.sdk.core.instrumentation.metrics.DoubleHistogram -import org.dexpace.sdk.core.instrumentation.metrics.LongCounter -import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.core.util.Clock import org.dexpace.sdk.core.util.Futures import java.util.concurrent.CompletableFuture @@ -33,8 +25,9 @@ import java.util.concurrent.CompletionException /** * Async counterpart of [DefaultInstrumentationStep]. Emits the same `http.request` / * `http.response` / failure events, drives the same span and metric lifecycle, and is - * configured with the same [HttpInstrumentationOptions]. 
The only structural difference - * is that [processAsync] returns a [CompletableFuture] composed via `.handle` so the + * configured with the same [HttpInstrumentationOptions]. The emit / redact / preview / metrics + * logic is shared with the sync step via [InstrumentationEmitters]; the only structural + * difference is that [processAsync] returns a [CompletableFuture] composed via `.handle` so the * response event fires on the completion thread. * * ## Span-thread caveat @@ -49,11 +42,11 @@ import java.util.concurrent.CompletionException * * ## Body capture / failure semantics * - * Mostly identical to the sync step — see its KDoc. One difference: the response-body drain - * runs on the future-completion thread here, so an **unknown-length** (streaming) response - * body (`contentLength() < 0`) is left unwrapped — draining it could block the completion - * thread on a slow/idle producer. Known-length bodies are wrapped and bounded to the preview - * size as in the sync step. + * Identical to the sync step — see its KDoc and [InstrumentationEmitters]. As there, an + * **unknown-length** (streaming) response body (`contentLength() < 0`) is left unwrapped: + * the response-body drain runs on the future-completion thread here, and draining a slow / idle + * producer would stall that thread. Known-length bodies are wrapped and bounded to the preview + * size. * * ## Thread-safety * @@ -69,40 +62,21 @@ public class DefaultAsyncInstrumentationStep ) : AsyncHttpStep { override val stage: Stage = Stage.LOGGING - // Lazily constructed so a step instance never installed in a pipeline doesn't pay - // the counter/histogram registration cost. `PUBLICATION` mode avoids the synchronized - // block that the default `SYNCHRONIZED` mode uses for first-read coordination — that - // monitor would pin a virtual-thread carrier during init (see CLAUDE.md "ReentrantLock - // over synchronized"). 
With OTel meter implementations being idempotent on register-by- - // name, racing initializers cost at most one redundant registration that is discarded. - private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) { - options.meter.counter( - name = "http.client.request.count", - description = "Total HTTP requests sent through this pipeline.", - unit = "{request}", - ) - } - private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) { - options.meter.histogram( - name = "http.client.request.duration", - description = "End-to-end duration of the downstream pipeline per request.", - unit = "ms", - ) - } + private val emitters = InstrumentationEmitters(options, clock, logger) override fun processAsync( request: Request, next: AsyncPipelineNext, ): CompletableFuture { - val redactedUrl = safeRedact(request) + val redactedUrl = emitters.safeRedact(request) val span = options.tracer.startSpan( name = "http ${request.method.name}", - attributes = spanAttributes(request, redactedUrl), + attributes = emitters.spanAttributes(request, redactedUrl), ) val startNanos = clock.monotonic() - val (outgoing, wrappedRequestBody) = buildOutgoingRequest(request) + val (outgoing, wrappedRequestBody) = emitters.wrapRequestBody(request) // Synchronous portion is wrapped in the MDC-aware scope. The scope closes BEFORE // the future's continuation runs — so the response/failure events emitted in @@ -114,15 +88,15 @@ public class DefaultAsyncInstrumentationStep span.makeCurrentWithLoggingContext().use { // Capture after the scope has pushed trace.id / span.id so the snapshot carries them. mdc = MdcSnapshot.capture() - emitRequestEvent(outgoing, redactedUrl) + emitters.emitRequestEvent(outgoing, redactedUrl) try { next.processAsync(outgoing) } catch (e: Throwable) { // Synchronous throw from the next step (e.g. argument validation). // Normalise to a failed future so callers get the uniform async contract. 
- val elapsedMs = elapsedMillis(startNanos) - emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody) - recordMetrics( + val elapsedMs = emitters.elapsedMillis(startNanos) + emitters.emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody) + emitters.recordMetrics( request, statusCode = -1, elapsedMs, @@ -149,30 +123,6 @@ public class DefaultAsyncInstrumentationStep } } - /** - * Wraps the request body in a [LoggableRequestBody] when body capture is enabled. - * Returns the (possibly rewritten) outgoing request and the wrapper (or null when - * capture is disabled or the original request has no body). - */ - private fun buildOutgoingRequest(request: Request): Pair { - val requestBody = request.body - val wrappedRequestBody = - if (shouldCaptureBody() && requestBody != null) { - // Cap the request-side tap so a multi-GB upload only mirrors a bounded preview - // into memory while still streaming the full payload to the transport. - LoggableRequestBody.bounded(requestBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) - } else { - null - } - val outgoing = - if (wrappedRequestBody != null) { - request.newBuilder().body(wrappedRequestBody).build() - } else { - request - } - return outgoing to wrappedRequestBody - } - /** * Handles completion of the downstream future (either success or failure). * Emits the appropriate instrumentation events, records metrics, ends the span, @@ -196,13 +146,13 @@ public class DefaultAsyncInstrumentationStep redactedUrl: String, wrappedRequestBody: LoggableRequestBody?, ): Response { - val elapsedMs = elapsedMillis(startNanos) + val elapsedMs = emitters.elapsedMillis(startNanos) if (err != null) { // CompletableFuture wraps with CompletionException; unwrap so events see the original. 
val cause = Futures.unwrap(err) mdc.withMdc { - emitFailureEvent(outgoing, redactedUrl, cause, elapsedMs, wrappedRequestBody) - recordMetrics( + emitters.emitFailureEvent(outgoing, redactedUrl, cause, elapsedMs, wrappedRequestBody) + emitters.recordMetrics( request, statusCode = -1, elapsedMs, @@ -211,7 +161,7 @@ public class DefaultAsyncInstrumentationStep span.end(cause) } // Re-throw via the future graph — handle's return becomes the new value/completion. - // B4: The CompletionException wrap is correct per CompletableFuture.join() semantics: + // The CompletionException wrap is correct per CompletableFuture.join() semantics: // join() rethrows a stored CompletionException as-is and the unwrap chain via // Futures.unwrap exposes the original cause. RuntimeException / Error are rethrown // directly since join() would not re-wrap them. Callers should use Futures.unwrap() @@ -223,221 +173,12 @@ public class DefaultAsyncInstrumentationStep } } else { return mdc.withMdc { - val wrapped = wrapResponseForLogging(raw!!) - emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody) - recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null) + val wrapped = emitters.wrapResponseForLogging(raw!!) + emitters.emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody) + emitters.recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null) span.end() wrapped } } } - - // ===== Helpers duplicated from DefaultInstrumentationStep ===== - // Duplication is intentional for this commit. 
- // TODO(omar 2026-08-01): extract InstrumentationEmitters shared helper used by both DefaultInstrumentationStep and DefaultAsyncInstrumentationStep - - private fun shouldCaptureBody(): Boolean = options.logLevel == HttpLogLevel.BODY_AND_HEADERS - - private fun wrapResponseForLogging(response: Response): Response { - val responseBody = response.body - if (!shouldCaptureBody() || responseBody == null) return response - // The bounded drain below runs on the future-completion thread. For an unknown-length - // (streaming) body the read could block on a slow/idle producer and stall the - // completion thread, so we skip body capture entirely for contentLength() < 0 — the - // body streams to the caller unwrapped with no preview. Known-length bodies are safe - // to drain up to the bounded preview size. - if (responseBody.contentLength() < 0L) return response - // Bound the in-memory capture to the preview size. The full body still streams to the - // caller via the wrapper's live tail; only a preview prefix is buffered. - val wrapped = LoggableResponseBody.bounded(responseBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) - // Force drain so we have bytes to log. Done here (not in the event emit) so a logging - // failure can't mask the drain error from the caller — they still get the wrapped body - // with the cached exception surfaced on source(). - try { - wrapped.snapshot(options.bodyPreviewMaxBytes) - } catch (t: Throwable) { - // Drain itself records the exception on the wrapper; capture for emit, but don't - // let it propagate — the caller will see it on next source() call. 
- logger.atWarning() - .event("http.instrumentation.response_drain_failed") - .field("error.type", t.javaClass.simpleName ?: "Throwable") - .cause(t) - .log() - } - return response.newBuilder().body(wrapped).build() - } - - private fun emitRequestEvent( - request: Request, - redactedUrl: String, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atInfo() - .event("http.request") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("request.content.length", request.body?.contentLength() ?: -1L) - appendHeadersFields(ev, request.headers, prefix = "http.request.header.") - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_request_failed", "request_event", t) - } - } - - private fun emitResponseEvent( - request: Request, - response: Response, - redactedUrl: String, - elapsedMs: Double, - requestBody: LoggableRequestBody?, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atInfo() - .event("http.response") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("http.response.status_code", response.status.code.toLong()) - .field("http.response.duration_ms", elapsedMs) - .field("response.content.length", response.body?.contentLength() ?: -1L) - appendHeadersFields(ev, response.headers, prefix = "http.response.header.") - if (shouldCaptureBody()) { - if (requestBody != null) { - val preview = requestBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("request.body.size", preview.size.toLong()) - .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType())) - } - val responseBody = response.body - if (responseBody is LoggableResponseBody) { - val preview = responseBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("response.body.size", preview.size.toLong()) - .field("response.body.preview", BodyPreview.render(preview, responseBody.mediaType())) - 
responseBody.captureException?.let { - ev.field("response.body.drain_error", it.javaClass.simpleName ?: "Throwable") - } - } - } - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_response_failed", "response_event", t) - } - } - - private fun emitFailureEvent( - request: Request, - redactedUrl: String, - cause: Throwable, - elapsedMs: Double, - requestBody: LoggableRequestBody?, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atWarning() - .event("http.response") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("error.type", cause.javaClass.simpleName ?: "Throwable") - .field("http.response.duration_ms", elapsedMs) - .cause(cause) - if (shouldCaptureBody() && requestBody != null) { - val preview = requestBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("request.body.size", preview.size.toLong()) - .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType())) - } - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_failure_failed", "failure_event", t) - } - } - - private fun appendHeadersFields( - ev: LoggingEvent, - headers: Headers, - prefix: String, - ) { - // Iterate the headers actually present rather than the allow-list — the allow-list is - // usually larger than the headers on any one request. 
- for ((nameLower, values) in headers.entries()) { - val typed = HttpHeaderName.fromString(nameLower) - when { - options.allowedHeaderNames.contains(typed) -> - ev.field(prefix + nameLower, joinHeaderValues(values)) - options.isRedactedHeaderNamesLoggingEnabled -> - ev.field(prefix + nameLower, "REDACTED") - // else: silently omit - } - } - } - - private fun joinHeaderValues(values: List): String = - if (values.size == 1) values[0] else values.joinToString(", ") - - private fun safeRedact(request: Request): String = - try { - UrlRedactor.redact(request.url, options.allowedQueryParamNames) - } catch (t: Throwable) { - "[malformed url]" - } - - private fun spanAttributes( - request: Request, - redactedUrl: String, - ): Map = - mapOf( - "http.request.method" to request.method.name, - "url.full" to redactedUrl, - ) - - private fun recordMetrics( - request: Request, - statusCode: Int, - elapsedMs: Double, - errorType: String?, - ) { - val attrs: Map = - if (errorType != null) { - mapOf( - "http.request.method" to request.method.name, - "error.type" to errorType, - ) - } else { - mapOf( - "http.request.method" to request.method.name, - "http.response.status_code" to statusCode, - ) - } - requestCounter.add(1L, attrs) - latencyHistogram.record(elapsedMs, attrs) - } - - private fun elapsedMillis(startNanos: Long): Double = (clock.monotonic() - startNanos) / NANOS_PER_MILLI_DOUBLE - - private fun emitInstrumentationError( - event: String, - phase: String, - t: Throwable, - ) { - // Best-effort secondary log. If this also throws, swallow — we're inside the logging - // path already, and a thrown exception would corrupt the outer caller's flow. - try { - logger.atWarning() - .event(event) - .field("error.phase", phase) - .field("error.type", t.javaClass.simpleName ?: "Throwable") - .cause(t) - .log() - } catch (_: Throwable) { - // Intentionally swallowed — logging is best-effort. 
- } - } - - private companion object { - // Nanoseconds in one millisecond, expressed as Double so the division returns - // millisecond fractions (e.g. 1.234 ms) for high-resolution latency histograms. - private const val NANOS_PER_MILLI_DOUBLE = 1_000_000.0 - } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultInstrumentationStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultInstrumentationStep.kt index 2b7e84b1..fa284f5c 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultInstrumentationStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultInstrumentationStep.kt @@ -7,21 +7,13 @@ package org.dexpace.sdk.core.http.pipeline.steps -import org.dexpace.sdk.core.http.common.Headers -import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.PipelineNext import org.dexpace.sdk.core.http.request.LoggableRequestBody import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.LoggableResponseBody import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger -import org.dexpace.sdk.core.instrumentation.LoggingEvent -import org.dexpace.sdk.core.instrumentation.Span -import org.dexpace.sdk.core.instrumentation.UrlRedactor import org.dexpace.sdk.core.instrumentation.makeCurrentWithLoggingContext -import org.dexpace.sdk.core.instrumentation.metrics.DoubleHistogram -import org.dexpace.sdk.core.instrumentation.metrics.LongCounter -import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.core.util.Clock import java.io.IOException @@ -32,6 +24,9 @@ import java.io.IOException * [org.dexpace.sdk.core.instrumentation.Tracer], and records request count + latency on the * configured [org.dexpace.sdk.core.instrumentation.metrics.Meter]. 
* + * The emit / redact / preview / metrics logic is shared with [DefaultAsyncInstrumentationStep] via + * [InstrumentationEmitters]; this step owns only the synchronous control flow. + * * ## Body capture semantics * * - **Request body**: wrapped before send; bytes are captured during the transport's `writeTo` @@ -43,12 +38,14 @@ import java.io.IOException * step calls `snapshot(bodyPreviewMaxBytes)` to force the bounded drain and log a preview. * A body larger than the cap still streams in full to the caller (the wrapper replays the * captured prefix then continues from the live tail); a body within the cap stays fully - * repeatable. + * repeatable. An **unknown-length** (streaming) body (`contentLength() < 0`) is left unwrapped + * — draining it could block the caller's thread on a slow / idle producer (SSE, long-poll, + * chunked trickle), so it streams to the caller with no body preview. * * ## Failure handling * * If the downstream pipeline throws, this step emits an `http.response` event with the - * exception class / message attached, ends the span via [Span.end] (throwable variant), and + * exception class / message attached, ends the span via its throwable-variant `end`, and * rethrows. Logging itself never fails the request — exceptions inside the logging path are * caught and re-emitted as an `http.instrumentation.error` warning. * @@ -69,71 +66,38 @@ public class DefaultInstrumentationStep private val clock: Clock = Clock.SYSTEM, internal val logger: ClientLogger = ClientLogger(DefaultInstrumentationStep::class), ) : InstrumentationStep() { - // Lazily constructed so a step instance never installed in a pipeline doesn't pay - // the counter/histogram registration cost. `PUBLICATION` mode avoids the synchronized - // block that the default `SYNCHRONIZED` mode uses for first-read coordination — that - // monitor would pin a virtual-thread carrier during init (see CLAUDE.md "ReentrantLock - // over synchronized"). 
With OTel meter implementations being idempotent on register-by- - // name, racing initializers cost at most one redundant registration that is discarded. - private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) { - options.meter.counter( - name = "http.client.request.count", - description = "Total HTTP requests sent through this pipeline.", - unit = "{request}", - ) - } - private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) { - options.meter.histogram( - name = "http.client.request.duration", - description = "End-to-end duration of the downstream pipeline per request.", - unit = "ms", - ) - } + private val emitters = InstrumentationEmitters(options, clock, logger) @Throws(IOException::class) override fun process( request: Request, next: PipelineNext, ): Response { - val redactedUrl = safeRedact(request) + val redactedUrl = emitters.safeRedact(request) val span = options.tracer.startSpan( name = "http ${request.method.name}", - attributes = spanAttributes(request, redactedUrl), + attributes = emitters.spanAttributes(request, redactedUrl), ) val startNanos = clock.monotonic() - val requestBody = request.body - val wrappedRequestBody = - if (shouldCaptureBody() && requestBody != null) { - // Cap the request-side tap so a multi-GB upload only mirrors a bounded preview - // into memory while still streaming the full payload to the transport. 
- LoggableRequestBody.bounded(requestBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) - } else { - null - } - val outgoing = - if (wrappedRequestBody != null) { - request.newBuilder().body(wrappedRequestBody).build() - } else { - request - } + val (outgoing, wrappedRequestBody) = emitters.wrapRequestBody(request) val response: Response = span.makeCurrentWithLoggingContext().use { - emitRequestEvent(outgoing, redactedUrl) + emitters.emitRequestEvent(outgoing, redactedUrl) try { val raw = next.process(outgoing) - val wrapped = wrapResponseForLogging(raw) - val elapsedMs = elapsedMillis(startNanos) - emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody) - recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null) + val wrapped = emitters.wrapResponseForLogging(raw) + val elapsedMs = emitters.elapsedMillis(startNanos) + emitters.emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody) + emitters.recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null) span.end() wrapped } catch (t: Throwable) { - val elapsedMs = elapsedMillis(startNanos) - emitFailureEvent(outgoing, redactedUrl, t, elapsedMs, wrappedRequestBody) - recordMetrics( + val elapsedMs = emitters.elapsedMillis(startNanos) + emitters.emitFailureEvent(outgoing, redactedUrl, t, elapsedMs, wrappedRequestBody) + emitters.recordMetrics( request, statusCode = -1, elapsedMs, @@ -145,203 +109,4 @@ public class DefaultInstrumentationStep } return response } - - private fun shouldCaptureBody(): Boolean = options.logLevel == HttpLogLevel.BODY_AND_HEADERS - - private fun wrapResponseForLogging(response: Response): Response { - val responseBody = response.body - if (!shouldCaptureBody() || responseBody == null) return response - // Bound the in-memory capture to the preview size. The full body still streams to the - // caller via the wrapper's live tail; only a preview prefix is buffered. 
- val wrapped = LoggableResponseBody.bounded(responseBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) - // Force drain so we have bytes to log. Done here (not in the event emit) so a logging - // failure can't mask the drain error from the caller — they still get the wrapped body - // with the cached exception surfaced on source(). - try { - wrapped.snapshot(options.bodyPreviewMaxBytes) - } catch (t: Throwable) { - // Drain itself records the exception on the wrapper; capture for emit, but don't - // let it propagate — the caller will see it on next source() call. - logger.atWarning() - .event("http.instrumentation.response_drain_failed") - .field("error.type", t.javaClass.simpleName ?: "Throwable") - .cause(t) - .log() - } - return response.newBuilder().body(wrapped).build() - } - - private fun emitRequestEvent( - request: Request, - redactedUrl: String, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atInfo() - .event("http.request") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("request.content.length", request.body?.contentLength() ?: -1L) - appendHeadersFields(ev, request.headers, prefix = "http.request.header.") - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_request_failed", "request_event", t) - } - } - - private fun emitResponseEvent( - request: Request, - response: Response, - redactedUrl: String, - elapsedMs: Double, - requestBody: LoggableRequestBody?, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atInfo() - .event("http.response") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("http.response.status_code", response.status.code.toLong()) - .field("http.response.duration_ms", elapsedMs) - .field("response.content.length", response.body?.contentLength() ?: -1L) - appendHeadersFields(ev, response.headers, prefix = 
"http.response.header.") - if (shouldCaptureBody()) { - if (requestBody != null) { - val preview = requestBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("request.body.size", preview.size.toLong()) - .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType())) - } - val responseBody = response.body - if (responseBody is LoggableResponseBody) { - val preview = responseBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("response.body.size", preview.size.toLong()) - .field("response.body.preview", BodyPreview.render(preview, responseBody.mediaType())) - responseBody.captureException?.let { - ev.field("response.body.drain_error", it.javaClass.simpleName ?: "Throwable") - } - } - } - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_response_failed", "response_event", t) - } - } - - private fun emitFailureEvent( - request: Request, - redactedUrl: String, - cause: Throwable, - elapsedMs: Double, - requestBody: LoggableRequestBody?, - ) { - if (options.logLevel == HttpLogLevel.NONE) return - try { - val ev = - logger.atWarning() - .event("http.response") - .field("http.request.method", request.method.name) - .field("url.full", redactedUrl) - .field("error.type", cause.javaClass.simpleName ?: "Throwable") - .field("http.response.duration_ms", elapsedMs) - .cause(cause) - if (shouldCaptureBody() && requestBody != null) { - val preview = requestBody.snapshot(options.bodyPreviewMaxBytes) - ev.field("request.body.size", preview.size.toLong()) - .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType())) - } - ev.log() - } catch (t: Throwable) { - emitInstrumentationError("http.instrumentation.emit_failure_failed", "failure_event", t) - } - } - - private fun appendHeadersFields( - ev: LoggingEvent, - headers: Headers, - prefix: String, - ) { - // Iterate the headers actually present rather than the allow-list — the allow-list is - // usually larger than the headers on any one 
request. - for ((nameLower, values) in headers.entries()) { - val typed = HttpHeaderName.fromString(nameLower) - when { - options.allowedHeaderNames.contains(typed) -> - ev.field(prefix + nameLower, joinHeaderValues(values)) - options.isRedactedHeaderNamesLoggingEnabled -> - ev.field(prefix + nameLower, "REDACTED") - // else: silently omit - } - } - } - - private fun joinHeaderValues(values: List): String = - if (values.size == 1) values[0] else values.joinToString(", ") - - private fun safeRedact(request: Request): String = - try { - UrlRedactor.redact(request.url, options.allowedQueryParamNames) - } catch (t: Throwable) { - "[malformed url]" - } - - private fun spanAttributes( - request: Request, - redactedUrl: String, - ): Map = - mapOf( - "http.request.method" to request.method.name, - "url.full" to redactedUrl, - ) - - private fun recordMetrics( - request: Request, - statusCode: Int, - elapsedMs: Double, - errorType: String?, - ) { - val attrs: Map = - if (errorType != null) { - mapOf( - "http.request.method" to request.method.name, - "error.type" to errorType, - ) - } else { - mapOf( - "http.request.method" to request.method.name, - "http.response.status_code" to statusCode, - ) - } - requestCounter.add(1L, attrs) - latencyHistogram.record(elapsedMs, attrs) - } - - private fun elapsedMillis(startNanos: Long): Double = (clock.monotonic() - startNanos) / NANOS_PER_MILLI_DOUBLE - - private fun emitInstrumentationError( - event: String, - phase: String, - t: Throwable, - ) { - // Best-effort secondary log. If this also throws, swallow — we're inside the logging - // path already, and a thrown exception would corrupt the outer caller's flow. - try { - logger.atWarning() - .event(event) - .field("error.phase", phase) - .field("error.type", t.javaClass.simpleName ?: "Throwable") - .cause(t) - .log() - } catch (_: Throwable) { - // Intentionally swallowed — logging is best-effort. 
- } - } - - private companion object { - // Nanoseconds in one millisecond, expressed as Double so the division returns - // millisecond fractions (e.g. 1.234 ms) for high-resolution latency histograms. - private const val NANOS_PER_MILLI_DOUBLE = 1_000_000.0 - } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpInstrumentationOptions.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpInstrumentationOptions.kt index 6e7b7ee7..224fb16f 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpInstrumentationOptions.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpInstrumentationOptions.kt @@ -33,11 +33,12 @@ import org.dexpace.sdk.core.instrumentation.metrics.NoopMeter * - **Response body**: at most [bodyPreviewMaxBytes] bytes are buffered. A body within the cap * is fully captured and stays repeatable; a larger body still streams in full to the caller * (the wrapper replays the captured prefix then continues from the live tail) while only the - * preview occupies the heap. In the **sync** step the bounded drain happens eagerly inside - * the step. In the **async** step the bounded drain runs on the future-completion thread, so - * it is **skipped for unknown-length (streaming) bodies** (`contentLength() < 0`) — those - * stream to the caller unwrapped with no body preview, so a slow/idle producer never blocks - * the completion thread. + * preview occupies the heap. Both steps **skip body capture for unknown-length (streaming) + * bodies** (`contentLength() < 0`): those stream to the caller unwrapped with no body preview. + * The bounded drain runs on whichever thread completes the call — the caller's thread in the + * sync step, the future-completion thread in the async step — and draining an unknown-length + * body could block that thread on a slow/idle producer (SSE, long-poll, chunked trickle), + * stalling time-to-first-byte. 
Known-length bodies keep the bounded preview. * - **Request body**: the request-side tap is likewise capped at [bodyPreviewMaxBytes], so a * large (e.g. multi-GB file) upload mirrors only a bounded preview into memory while the full * payload streams zero-copy to the transport. @@ -51,10 +52,13 @@ import org.dexpace.sdk.core.instrumentation.metrics.NoopMeter * Because the capture is a bounded preview, the logged `response.body.size` / * `response.body.preview` fields describe the **captured preview**, not necessarily the full * body: for a body larger than [bodyPreviewMaxBytes] the consumer still receives every byte - * while those fields reflect only the preview prefix. The separate `response.content.length` - * field carries the body's true length when the origin declared one. See - * `docs/http-body-logging-and-concurrency.md` ("Logged body size vs. the body the consumer - * receives"). + * while those fields reflect only the preview prefix. To make the true size observable, the + * event also carries `response.body.actual_size` (the full body length, emitted when known) and + * `response.body.preview_truncated` (`true` when the preview is only a prefix). The request body + * carries the matching `request.body.actual_size` / `request.body.preview_truncated` fields. The + * separate `response.content.length` field still carries the body's true length when the origin + * declared one. See `docs/http-body-logging-and-concurrency.md` ("Logged body size vs. the body + * the consumer receives"). * * @property bodyPreviewMaxBytes Upper bound, in bytes, on the in-memory body capture under * [HttpLogLevel.BODY_AND_HEADERS]. 
Bounds the preview, not the body the consumer sees; the diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationEmitters.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationEmitters.kt new file mode 100644 index 00000000..289316d6 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationEmitters.kt @@ -0,0 +1,372 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.common.Headers +import org.dexpace.sdk.core.http.common.HttpHeaderName +import org.dexpace.sdk.core.http.request.LoggableRequestBody +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.LoggableResponseBody +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.instrumentation.LoggingEvent +import org.dexpace.sdk.core.instrumentation.UrlRedactor +import org.dexpace.sdk.core.instrumentation.metrics.DoubleHistogram +import org.dexpace.sdk.core.instrumentation.metrics.LongCounter +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.core.util.Clock + +/** + * Shared emit / redact / preview / metrics logic for the sync ([DefaultInstrumentationStep]) and + * async ([DefaultAsyncInstrumentationStep]) instrumentation steps. Both steps own the same + * request/response/failure event shape, header redaction, body wrapping, and metric instruments; + * this class is the single home for that logic so the two steps differ only in how they thread the + * downstream call (synchronous `process` vs. a `CompletableFuture` continuation). 
+ * + * ## Body-size fields + * + * For each captured body the emitted event carries three size-related fields: + * + * - `*.body.size` — the size of the **captured preview** (`min(actualSize, bodyPreviewMaxBytes)`). + * Preserved for backwards compatibility with existing dashboards. + * - `*.body.actual_size` — the body's **true** size when it is known (the request body's declared + * `contentLength()`, or the response body's full length); omitted when the size is unknown + * (a streaming / chunked body with `contentLength() < 0`). + * - `*.body.preview_truncated` — `true` when the preview is only a prefix of a larger body, so a + * consumer can tell an 8 KiB body from an 8 GB one without reading the bytes. + * + * ## Unknown-length response bodies + * + * [wrapResponseForLogging] skips body capture entirely for an unknown-length + * (`contentLength() < 0`) response body. The bounded drain would otherwise block the draining + * thread — the caller's thread in the sync step, the future-completion thread in the async step — + * on a slow / idle producer (SSE, long-poll, chunked trickle), stalling time-to-first-byte. Such a + * body streams to the caller unwrapped, so the `http.response` event still carries headers, status, + * and `response.content.length = -1`, but no body preview. + * + * ## Best-effort logging + * + * Every emit method catches its own failures and re-emits them as an + * `http.instrumentation.*` warning — a logging failure never fails the request. + * + * Stateless after construction (the meter instruments are reused). Safe to share across concurrent + * requests. + */ +internal class InstrumentationEmitters( + private val options: HttpInstrumentationOptions, + private val clock: Clock, + private val logger: ClientLogger, +) { + // Lazily constructed so a step instance never installed in a pipeline doesn't pay + // the counter/histogram registration cost. 
`PUBLICATION` mode avoids the synchronized + // block that the default `SYNCHRONIZED` mode uses for first-read coordination — that + // monitor would pin a virtual-thread carrier during init (see CLAUDE.md "ReentrantLock + // over synchronized"). With OTel meter implementations being idempotent on register-by- + // name, racing initializers cost at most one redundant registration that is discarded. + private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) { + options.meter.counter( + name = "http.client.request.count", + description = "Total HTTP requests sent through this pipeline.", + unit = "{request}", + ) + } + private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) { + options.meter.histogram( + name = "http.client.request.duration", + description = "End-to-end duration of the downstream pipeline per request.", + unit = "ms", + ) + } + + fun shouldCaptureBody(): Boolean = options.logLevel == HttpLogLevel.BODY_AND_HEADERS + + /** + * Wraps [request]'s body in a bounded [LoggableRequestBody] when body capture is enabled, + * returning the (possibly rewritten) outgoing request and the wrapper (or `null` when capture + * is disabled or the request has no body). The tap is capped at `bodyPreviewMaxBytes` so a + * multi-GB upload mirrors only a bounded preview into memory while the full payload still + * streams to the transport. + */ + fun wrapRequestBody(request: Request): Pair<Request, LoggableRequestBody?> { + val requestBody = request.body + val wrapped = + if (shouldCaptureBody() && requestBody != null) { + LoggableRequestBody.bounded(requestBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) + } else { + null + } + val outgoing = + if (wrapped != null) { + request.newBuilder().body(wrapped).build() + } else { + request + } + return outgoing to wrapped + } + + /** + * Wraps [response]'s body in a bounded [LoggableResponseBody] and forces the bounded drain so a + * preview is available to log.
An unknown-length (`contentLength() < 0`) body is left unwrapped + * — draining it could block the draining thread on a slow producer (see the class KDoc), so + * such a body streams to the caller untouched with no preview. Known-length bodies keep the + * bounded preview. + */ + fun wrapResponseForLogging(response: Response): Response { + val responseBody = response.body + if (!shouldCaptureBody() || responseBody == null) return response + // Skip capture for an unknown-length (streaming) body: the bounded drain runs on the + // draining thread (caller thread in sync, completion thread in async) and would block on a + // slow / idle producer, stalling time-to-first-byte. The body streams to the caller + // unwrapped with no preview; the event still carries response.content.length = -1. + if (responseBody.contentLength() < 0L) return response + // Bound the in-memory capture to the preview size. The full body still streams to the + // caller via the wrapper's live tail; only a preview prefix is buffered. + val wrapped = LoggableResponseBody.bounded(responseBody, Io.provider, options.bodyPreviewMaxBytes.toLong()) + // Force drain so we have bytes to log. Done here (not in the event emit) so a logging + // failure can't mask the drain error from the caller — they still get the wrapped body + // with the cached exception surfaced on source(). + try { + wrapped.snapshot(options.bodyPreviewMaxBytes) + } catch (t: Throwable) { + // Drain itself records the exception on the wrapper; capture for emit, but don't + // let it propagate — the caller will see it on next source() call. 
+ logger.atWarning() + .event("http.instrumentation.response_drain_failed") + .field("error.type", t.javaClass.simpleName ?: "Throwable") + .cause(t) + .log() + } + return response.newBuilder().body(wrapped).build() + } + + fun emitRequestEvent( + request: Request, + redactedUrl: String, + ) { + if (options.logLevel == HttpLogLevel.NONE) return + try { + val ev = + logger.atInfo() + .event("http.request") + .field("http.request.method", request.method.name) + .field("url.full", redactedUrl) + .field("request.content.length", request.body?.contentLength() ?: -1L) + appendHeadersFields(ev, request.headers, prefix = "http.request.header.") + ev.log() + } catch (t: Throwable) { + emitInstrumentationError("http.instrumentation.emit_request_failed", "request_event", t) + } + } + + fun emitResponseEvent( + request: Request, + response: Response, + redactedUrl: String, + elapsedMs: Double, + requestBody: LoggableRequestBody?, + ) { + if (options.logLevel == HttpLogLevel.NONE) return + try { + val ev = + logger.atInfo() + .event("http.response") + .field("http.request.method", request.method.name) + .field("url.full", redactedUrl) + .field("http.response.status_code", response.status.code.toLong()) + .field("http.response.duration_ms", elapsedMs) + .field("response.content.length", response.body?.contentLength() ?: -1L) + appendHeadersFields(ev, response.headers, prefix = "http.response.header.") + if (shouldCaptureBody()) { + if (requestBody != null) { + appendRequestBodyFields(ev, requestBody) + } + val responseBody = response.body + if (responseBody is LoggableResponseBody) { + appendResponseBodyFields(ev, responseBody) + } + } + ev.log() + } catch (t: Throwable) { + emitInstrumentationError("http.instrumentation.emit_response_failed", "response_event", t) + } + } + + fun emitFailureEvent( + request: Request, + redactedUrl: String, + cause: Throwable, + elapsedMs: Double, + requestBody: LoggableRequestBody?, + ) { + if (options.logLevel == HttpLogLevel.NONE) return + try 
{ + val ev = + logger.atWarning() + .event("http.response") + .field("http.request.method", request.method.name) + .field("url.full", redactedUrl) + .field("error.type", cause.javaClass.simpleName ?: "Throwable") + .field("http.response.duration_ms", elapsedMs) + .cause(cause) + if (shouldCaptureBody() && requestBody != null) { + appendRequestBodyFields(ev, requestBody) + } + ev.log() + } catch (t: Throwable) { + emitInstrumentationError("http.instrumentation.emit_failure_failed", "failure_event", t) + } + } + + /** + * Appends the request-body size / preview fields. `request.body.size` is the captured-preview + * size; `request.body.actual_size` carries the body's declared length when known (`>= 0`); and + * `request.body.preview_truncated` flags a preview that is only a prefix of a larger body. + */ + private fun appendRequestBodyFields( + ev: LoggingEvent, + requestBody: LoggableRequestBody, + ) { + val preview = requestBody.snapshot(options.bodyPreviewMaxBytes) + ev.field("request.body.size", preview.size.toLong()) + .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType())) + val actualSize = requestBody.contentLength() + if (actualSize >= 0L) { + ev.field("request.body.actual_size", actualSize) + } + ev.field("request.body.preview_truncated", isPreviewTruncated(actualSize, preview.size)) + } + + /** + * Appends the response-body size / preview fields. `response.body.size` is the captured-preview + * size; `response.body.actual_size` carries the full body length when it is known; and + * `response.body.preview_truncated` flags a preview that is only a prefix of a larger body. The + * truncation state comes from [LoggableResponseBody.isFullyCaptured], so it is derived from the + * existing capture without re-reading the body. 
+ */ + private fun appendResponseBodyFields( + ev: LoggingEvent, + responseBody: LoggableResponseBody, + ) { + val preview = responseBody.snapshot(options.bodyPreviewMaxBytes) + ev.field("response.body.size", preview.size.toLong()) + .field("response.body.preview", BodyPreview.render(preview, responseBody.mediaType())) + val actualSize = responseBody.contentLength() + if (actualSize >= 0L) { + ev.field("response.body.actual_size", actualSize) + } + // The wrapper fully captured the body only when it fit within the cap; otherwise the + // preview is a prefix of a larger body and is therefore truncated. + ev.field("response.body.preview_truncated", !responseBody.isFullyCaptured) + responseBody.captureException?.let { + ev.field("response.body.drain_error", it.javaClass.simpleName ?: "Throwable") + } + } + + /** + * Decides whether a captured request-body preview is a truncated prefix. When the body's + * declared length is known, the preview is truncated iff that length exceeds the preview cap. + * When the length is unknown (`< 0`), fall back to whether the preview filled the cap — a + * shorter preview means the whole body was captured. + */ + private fun isPreviewTruncated( + actualSize: Long, + previewSize: Int, + ): Boolean = + if (actualSize >= 0L) { + actualSize > options.bodyPreviewMaxBytes.toLong() + } else { + previewSize >= options.bodyPreviewMaxBytes + } + + private fun appendHeadersFields( + ev: LoggingEvent, + headers: Headers, + prefix: String, + ) { + // Iterate the headers actually present rather than the allow-list — the allow-list is + // usually larger than the headers on any one request. 
+ for ((nameLower, values) in headers.entries()) { + val typed = HttpHeaderName.fromString(nameLower) + when { + options.allowedHeaderNames.contains(typed) -> + ev.field(prefix + nameLower, joinHeaderValues(values)) + options.isRedactedHeaderNamesLoggingEnabled -> + ev.field(prefix + nameLower, "REDACTED") + // else: silently omit + } + } + } + + private fun joinHeaderValues(values: List): String = + if (values.size == 1) values[0] else values.joinToString(", ") + + fun safeRedact(request: Request): String = + try { + UrlRedactor.redact(request.url, options.allowedQueryParamNames) + } catch (t: Throwable) { + "[malformed url]" + } + + fun spanAttributes( + request: Request, + redactedUrl: String, + ): Map = + mapOf( + "http.request.method" to request.method.name, + "url.full" to redactedUrl, + ) + + fun recordMetrics( + request: Request, + statusCode: Int, + elapsedMs: Double, + errorType: String?, + ) { + val attrs: Map = + if (errorType != null) { + mapOf( + "http.request.method" to request.method.name, + "error.type" to errorType, + ) + } else { + mapOf( + "http.request.method" to request.method.name, + "http.response.status_code" to statusCode, + ) + } + requestCounter.add(1L, attrs) + latencyHistogram.record(elapsedMs, attrs) + } + + fun elapsedMillis(startNanos: Long): Double = (clock.monotonic() - startNanos) / NANOS_PER_MILLI_DOUBLE + + private fun emitInstrumentationError( + event: String, + phase: String, + t: Throwable, + ) { + // Best-effort secondary log. If this also throws, swallow — we're inside the logging + // path already, and a thrown exception would corrupt the outer caller's flow. + try { + logger.atWarning() + .event(event) + .field("error.phase", phase) + .field("error.type", t.javaClass.simpleName ?: "Throwable") + .cause(t) + .log() + } catch (_: Throwable) { + // Intentionally swallowed — logging is best-effort. 
+ } + } + + private companion object { + // Nanoseconds in one millisecond, expressed as Double so the division returns + // millisecond fractions (e.g. 1.234 ms) for high-resolution latency histograms. + private const val NANOS_PER_MILLI_DOUBLE = 1_000_000.0 + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt index 027d8ec1..621b1ac4 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt @@ -183,6 +183,15 @@ public class LoggableResponseBody */ public val captureException: Throwable? get() = drainError + /** + * Internal seam for instrumentation: true once the whole body fit within the capture cap + * and was drained in full. False before any drain runs, or once the cap was hit with bytes + * still pending (the over-cap path). Instrumentation uses this to decide whether the logged + * preview is the complete body or a truncated prefix, without re-reading the body. 
+ */ + @get:JvmSynthetic + internal val isFullyCaptured: Boolean get() = fullyCaptured + @Throws(IOException::class) override fun close() { lock.withLock { diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncInstrumentationStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncInstrumentationStepTest.kt index a17709da..e3de2f2a 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncInstrumentationStepTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncInstrumentationStepTest.kt @@ -202,6 +202,42 @@ class AsyncInstrumentationStepTest { response.close() } + @Test + fun `over-cap response body reports true actual_size and marks the preview truncated`() { + val fakeSlf4j = FakeSlf4jLogger("test.async.instrumentation.size") + val clientLogger = ClientLogger.forTesting(fakeSlf4j) + val payload = "z".repeat(100) + val fakeAsync = + AsyncHttpClient { request -> + CompletableFuture.completedFuture(okResponseWithBody(request, 200, payload)) + } + val pipeline = + AsyncHttpPipelineBuilder(fakeAsync) + .append( + DefaultAsyncInstrumentationStep( + options = + HttpInstrumentationOptions( + logLevel = HttpLogLevel.BODY_AND_HEADERS, + bodyPreviewMaxBytes = 10, + ), + logger = clientLogger, + ), + ) + .build() + + val response = pipeline.sendAsync(getRequest("https://api.example.com/data")).join() + response.close() + + val event = + fakeSlf4j.records + .last { rec -> rec.keyValues.any { it.key == "event" && it.value == "http.response" } } + .keyValues + .associate { it.key to it.value } + assertEquals(10L, event["response.body.size"], "body.size is the capped preview size") + assertEquals(100L, event["response.body.actual_size"], "actual_size is the true body length") + assertEquals(true, event["response.body.preview_truncated"]) + } + @Test fun `unknown-length response body is NOT wrapped in the async step`() { // The async drain runs on the completion 
thread; an unknown-length (streaming) body diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationStepTest.kt index 82eca054..3048c8c3 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationStepTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/InstrumentationStepTest.kt @@ -480,7 +480,9 @@ class InstrumentationStepTest { object : ResponseBody() { override fun mediaType(): MediaType? = MediaType.parse("text/plain") - override fun contentLength(): Long = -1 + // Known length so the sync step still wraps and drains it (an unknown-length body + // is now skipped — see the streaming TTFB test below). + override fun contentLength(): Long = 16 override fun source(): BufferedSource = throw IOException("simulated drain failure") @@ -902,8 +904,149 @@ class InstrumentationStepTest { assertEquals(1, meter.counters.single().records.size) } + @Test + fun `unknown-length response body is NOT wrapped in the sync step`() { + // An unknown-length (streaming) body (contentLength() < 0) must be left unwrapped: the + // bounded drain would otherwise run on the caller's thread and block on a slow / idle + // producer (SSE, long-poll, chunked trickle), stalling time-to-first-byte. The body + // streams to the caller untouched with no preview. + val streamingBody = + object : ResponseBody() { + override fun mediaType(): MediaType? 
= MediaType.parse("text/plain") + + override fun contentLength(): Long = -1L + + override fun source(): BufferedSource = Io.provider.source("streamed".toByteArray()) + + override fun close() { /* no-op */ } + } + val client = + object : org.dexpace.sdk.core.client.HttpClient { + override fun execute(request: Request): Response = + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .headers(Headers.Builder().build()) + .body(streamingBody) + .build() + } + val pipeline = + HttpPipelineBuilder(client) + .append( + DefaultInstrumentationStep(HttpInstrumentationOptions(logLevel = HttpLogLevel.BODY_AND_HEADERS)), + ) + .build() + + val response = pipeline.send(getRequest("https://api.example.com/stream")) + val body = response.body ?: fail("expected non-null body") + assertFalse( + body is LoggableResponseBody, + "unknown-length body must NOT be wrapped (no eager drain on the caller thread)", + ) + // The original streaming body is passed through untouched. + assertEquals("streamed", body.source().readUtf8()) + response.close() + } + + @Test + fun `over-cap response body reports true actual_size and marks the preview truncated`() { + val fakeSlf4j = FakeSlf4jLogger("test.instrumentation") + val clientLogger = ClientLogger.forTesting(fakeSlf4j) + // A 100-byte known-length body with a 10-byte preview cap: the preview is a prefix and + // body.size saturates at the cap, but actual_size must carry the true length and + // preview_truncated must be true. 
+ val payload = "x".repeat(100) + val fake = FakeHttpClient().enqueue { status(200).body(payload, MediaType.parse("text/plain")) } + val pipeline = + HttpPipelineBuilder(fake) + .append( + DefaultInstrumentationStep( + HttpInstrumentationOptions( + logLevel = HttpLogLevel.BODY_AND_HEADERS, + bodyPreviewMaxBytes = 10, + ), + FixedClock(), + clientLogger, + ), + ) + .build() + + val response = pipeline.send(getRequest("https://api.example.com/x")) + response.close() + + val event = lastResponseEvent(fakeSlf4j) + assertEquals(10L, event["response.body.size"], "body.size is the capped preview size") + assertEquals(100L, event["response.body.actual_size"], "actual_size is the true body length") + assertEquals(true, event["response.body.preview_truncated"]) + } + + @Test + fun `within-cap response body reports actual_size equal to size and is not truncated`() { + val fakeSlf4j = FakeSlf4jLogger("test.instrumentation") + val clientLogger = ClientLogger.forTesting(fakeSlf4j) + val fake = FakeHttpClient().enqueue { status(200).body("payload", MediaType.parse("text/plain")) } + val pipeline = + HttpPipelineBuilder(fake) + .append( + DefaultInstrumentationStep( + HttpInstrumentationOptions(logLevel = HttpLogLevel.BODY_AND_HEADERS), + FixedClock(), + clientLogger, + ), + ) + .build() + + val response = pipeline.send(getRequest("https://api.example.com/x")) + response.close() + + val event = lastResponseEvent(fakeSlf4j) + assertEquals(7L, event["response.body.size"]) + assertEquals(7L, event["response.body.actual_size"]) + assertEquals(false, event["response.body.preview_truncated"]) + } + + @Test + fun `over-cap request body reports true actual_size and marks the preview truncated`() { + val fakeSlf4j = FakeSlf4jLogger("test.instrumentation") + val clientLogger = ClientLogger.forTesting(fakeSlf4j) + val payload = "y".repeat(100) + val fake = FakeHttpClient().enqueue { status(200).body("ok", MediaType.parse("text/plain")) } + val draining = DrainingClient(fake) + val pipeline = 
+ HttpPipelineBuilder(draining) + .append( + DefaultInstrumentationStep( + HttpInstrumentationOptions( + logLevel = HttpLogLevel.BODY_AND_HEADERS, + bodyPreviewMaxBytes = 10, + ), + FixedClock(), + clientLogger, + ), + ) + .build() + + val response = pipeline.send(postRequest("https://api.example.com/echo", payload)) + response.close() + + val event = lastResponseEvent(fakeSlf4j) + assertEquals(10L, event["request.body.size"], "body.size is the capped preview size") + assertEquals(100L, event["request.body.actual_size"], "actual_size is the declared body length") + assertEquals(true, event["request.body.preview_truncated"]) + } + // -- Helpers -------------------------------------------------------------------------------- + /** Returns the key/value fields of the most recent `http.response` event. */ + private fun lastResponseEvent(slf4j: FakeSlf4jLogger): Map { + val record = + slf4j.records.last { rec -> + rec.keyValues.any { it.key == "event" && it.value == "http.response" } + } + return record.keyValues.associate { it.key to it.value } + } + /** Extracts the `response.body.preview` field value from the latest `http.response` event. */ private fun responsePreview(slf4j: FakeSlf4jLogger): String = previewField(slf4j, "response.body.preview")
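Reviewer note: the size assertions in these tests are easier to follow with the relationship between the three fields written out. The sketch below is a standalone model, not SDK code. `BodySizeFields` and `bodySizeFields` are hypothetical names, and the truncation rule mirrors the request-side `isPreviewTruncated` shown in this patch (the response side reads `isFullyCaptured` from the wrapper instead). The cap of 8192 is only an illustrative value.

```kotlin
// Hypothetical stand-in for the three size-related event fields.
data class BodySizeFields(
    val size: Long,               // *.body.size: size of the captured preview
    val actualSize: Long?,        // *.body.actual_size: omitted (null) when the length is unknown
    val previewTruncated: Boolean, // *.body.preview_truncated
)

// Models the request-side rule: with a known length, the preview is truncated
// iff that length exceeds the cap; with an unknown length (< 0), fall back to
// whether the preview filled the cap.
fun bodySizeFields(declaredLength: Long, previewSize: Int, previewCap: Int): BodySizeFields {
    val truncated =
        if (declaredLength >= 0L) {
            declaredLength > previewCap.toLong()
        } else {
            previewSize >= previewCap
        }
    return BodySizeFields(
        size = previewSize.toLong(),
        actualSize = declaredLength.takeIf { it >= 0L },
        previewTruncated = truncated,
    )
}

fun main() {
    // Over-cap body: 100 bytes declared, 10-byte cap (the case the tests assert).
    println(bodySizeFields(declaredLength = 100L, previewSize = 10, previewCap = 10))
    // Within-cap body: the 7-byte "payload" body under an illustrative 8192-byte cap.
    println(bodySizeFields(declaredLength = 7L, previewSize = 7, previewCap = 8192))
}
```

Under this model, `size` saturates at the cap while `actualSize` and `previewTruncated` carry the information a dashboard needs to tell a small body from a very large one.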