diff --git a/README.md b/README.md index 536306d7..57e1fe7f 100644 --- a/README.md +++ b/README.md @@ -256,7 +256,7 @@ See [docs/pipelines.md](docs/pipelines.md) for the step-author walkthrough. | `http.auth` | `Credential` sealed hierarchy (`KeyCredential`, `NamedKeyCredential`, `BearerToken`), `BearerTokenProvider`, `AuthScheme`, `AuthMetadata`, RFC 7235 challenge parser, `BasicChallengeHandler`, `DigestChallengeHandler`, `CompositeChallengeHandler`. | | `http.sse` | `ServerSentEventReader` (WHATWG spec), `ServerSentEvent`, `ServerSentEventListener`, `BufferedSource.readServerSentEvents()`. | | `http.paging` | `PagedIterable`, `PagedResponse`, `PagingOptions` with `byPage()` and `stream()` accessors. | -| `pagination` | `Paginator` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page` / `SimplePage`. | +| `pagination` | `Paginator` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page` / `SimplePage`. Token-style APIs use `CursorPaginationStrategy` with `cursorQueryParam` set to the API's token parameter name (e.g. `"page_token"`). | | `pipeline` | Recovery-aware primitives: `RequestPipeline`, `ResponsePipeline`, `ExecutionPipeline` over a sealed `ResponseOutcome`, with steps (`pipeline.step`, `pipeline.step.retry`) like `RetryStep`, `ResponseRecoveryStep`, `IdempotencyKeyStep`, `ClientIdentityStep`. | | `serde` | `Serde`, `Serializer`, `Deserializer` abstractions, `Tristate` (absent / null / present), and `SerdeException` (the unchecked failure adapters translate codec errors into). | | `io` | `Source`, `Sink`, `Buffer`, `BufferedSource`, `BufferedSink`, `IoProvider`, `Io`, `TeeSink`. 
| diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 9c5cb560..b4ec3744 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1960,12 +1960,25 @@ public abstract interface class org/dexpace/sdk/core/io/Source : java/io/Closeab } public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy { - public fun (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V - public fun (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V - public synthetic fun (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lkotlin/jvm/functions/Function1;)V + public fun (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V + public synthetic fun (Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun parse (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/request/Request;)Lorg/dexpace/sdk/core/pagination/Page; } +public final class org/dexpace/sdk/core/pagination/CursorResult { + public fun (Ljava/util/List;Ljava/lang/String;)V + public final fun component1 ()Ljava/util/List; + public final fun component2 ()Ljava/lang/String; + public final fun copy (Ljava/util/List;Ljava/lang/String;)Lorg/dexpace/sdk/core/pagination/CursorResult; + public static synthetic fun copy$default (Lorg/dexpace/sdk/core/pagination/CursorResult;Ljava/util/List;Ljava/lang/String;ILjava/lang/Object;)Lorg/dexpace/sdk/core/pagination/CursorResult; + public fun equals (Ljava/lang/Object;)Z + public final fun getItems ()Ljava/util/List; + public final fun getNextCursor ()Ljava/lang/String; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class org/dexpace/sdk/core/pagination/LinkHeaderPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy { 
public fun (Lkotlin/jvm/functions/Function1;)V public fun (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationStrategy.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationStrategy.kt index 2d5bc9eb..9e5f2314 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationStrategy.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationStrategy.kt @@ -21,15 +21,22 @@ import org.dexpace.sdk.core.http.response.Response * - Page-N response: arbitrary body containing both the items and the next cursor. * - End of stream: response carries a `null` or absent next cursor. * - * The strategy is **stateless**: it relies on [itemsExtractor] and [cursorExtractor] - * lambdas to pull data out of the response, and on [RequestRebuilder] to mutate the - * initial request's URL with the new cursor query parameter. + * The strategy is **stateless**: it relies on a single [extractor] to pull both the items + * and the next cursor out of the response in one pass — see [CursorResult] — and rewrites the + * initial request's URL, setting the cursor query parameter to the new value, for the next + * page. + * + * ## Single read + * + * A response body is single-use. Because cursor APIs carry the items and the next cursor in + * the same payload, the extractor reads the body **once** and returns both via a + * [CursorResult]. This avoids the double-drain trap of pulling items and cursor with two + * independent `(Response) -> …` lambdas, which forces either a failed second read or a + * per-response cache to work around it. * * @param T Element type extracted from the response. - * @property itemsExtractor Reads the list of items from the response. Called once per - * page; must drain the response body synchronously. 
- * @property cursorExtractor Reads the next cursor from the response, or returns `null` - * if there are no more pages. + * @property extractor Reads the items and next cursor from the response in a single pass. + * Called once per page; must drain the response body synchronously and return both pieces. * @property cursorQueryParam Query parameter name used to send the cursor (default * `"cursor"`). Servers vary; common alternatives include `"after"`, `"next"`, * `"pageCursor"`. @@ -37,16 +44,15 @@ import org.dexpace.sdk.core.http.response.Response public class CursorPaginationStrategy @JvmOverloads constructor( - private val itemsExtractor: (Response) -> List, - private val cursorExtractor: (Response) -> String?, + private val extractor: (Response) -> CursorResult, private val cursorQueryParam: String = "cursor", ) : PaginationStrategy { override fun parse( response: Response, initialRequest: Request, ): Page { - val items: List = itemsExtractor(response) - val nextCursor: String? = cursorExtractor(response) + val result: CursorResult = extractor(response) + val nextCursor: String? = result.nextCursor val hasNext: Boolean = !nextCursor.isNullOrEmpty() val nextRequest: Request? = if (hasNext) { @@ -54,6 +60,6 @@ public class CursorPaginationStrategy } else { null } - return SimplePage(items = items, hasNext = hasNext, nextRequest = nextRequest) + return SimplePage(items = result.items, hasNext = hasNext, nextRequest = nextRequest) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorResult.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorResult.kt new file mode 100644 index 00000000..2a459311 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorResult.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. 
+ * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.http.response.Response + +/** + * Result of a single-pass cursor extraction: the items on a page together with the cursor + * for the next page, pulled from one read of the [Response]. + * + * Cursor APIs return both the items and the next cursor in the same payload, so reading + * them with two separate `(Response) -> …` lambdas means draining the body twice. A response + * body is single-use, so the second drain either fails or — at best — is worked around with + * a per-response cache. `CursorResult` lets a [CursorPaginationStrategy] extractor parse the + * response once and hand back both pieces, keeping the public API honest about the + * single-read constraint. + * + * ## Thread-safety + * + * The [items] list passed at construction is stored by reference, so a caller that retains a + * mutable list can mutate it afterwards and the change will be visible through this result. + * The bundled extractor path always builds a fresh list per page, so values produced by the + * SDK are effectively immutable; hand-built results that share a mutable list should copy it + * (`items.toList()`) before constructing. + * + * @param T Element type carried in [items]. + * @property items Items on the page, in server-defined order. Never `null`; may be empty. + * @property nextCursor Opaque cursor to send on the next request, or `null` (or empty) when + * the response signals end-of-stream. 
+ */ +public data class CursorResult( + public val items: List, + public val nextCursor: String?, +) diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationTest.kt index 0222da02..3a4a8c8d 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationTest.kt @@ -10,7 +10,6 @@ package org.dexpace.sdk.core.pagination import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response -import java.util.IdentityHashMap import java.util.stream.Collectors import kotlin.test.BeforeTest import kotlin.test.Test @@ -29,34 +28,18 @@ class CursorPaginationTest { .build() /** - * Parses the body format `items=\ncursor=` into a (items, cursor) - * pair. Reads the body exactly once. + * Single-pass extractor for the body format `items=\ncursor=`. Reads + * the body exactly once and returns both the items and the next cursor as a + * [CursorResult], so there is no double-drain of the single-use response body. */ - private fun parsePayload(resp: Response): Pair, String?> { + private val extractor: (Response) -> CursorResult = { resp -> val body = resp.body!!.source().use { it.readUtf8() } val itemsLine = body.lineSequence().firstOrNull { it.startsWith("items=") } ?: "items=" val cursorLine = body.lineSequence().firstOrNull { it.startsWith("cursor=") } ?: "cursor=" val itemsRaw = itemsLine.removePrefix("items=") val cursorRaw = cursorLine.removePrefix("cursor=") val items = if (itemsRaw.isEmpty()) emptyList() else itemsRaw.split(",") - val cursor: String? 
= cursorRaw.ifEmpty { null } - return Pair(items, cursor) - } - - /** - * Pair of extractors that share a per-Response identity-keyed cache so the - * single-use body is read exactly once per page even though the strategy's contract - * splits items + cursor into two calls. - */ - private fun buildCachedExtractors(): Pair<(Response) -> List, (Response) -> String?> { - val cache: MutableMap, String?>> = IdentityHashMap() - val items: (Response) -> List = { r -> - cache.getOrPut(r) { parsePayload(r) }.first - } - val cursor: (Response) -> String? = { r -> - cache.getOrPut(r) { parsePayload(r) }.second - } - return Pair(items, cursor) + CursorResult(items, cursorRaw.ifEmpty { null }) } @Test @@ -72,8 +55,7 @@ class CursorPaginationTest { textResponse(req, "items=g,h,i\ncursor=") } - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "cursor") + val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "cursor") val paginator = Paginator(client, initialRequest(), strategy) val collected: List = paginator.iterateAll().toList() @@ -96,8 +78,7 @@ class CursorPaginationTest { client.on("https://api.example.com/items") { req -> textResponse(req, "items=only-1,only-2\ncursor=") } - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor, "cursor") + val strategy = CursorPaginationStrategy(extractor, "cursor") val paginator = Paginator(client, initialRequest(), strategy) assertEquals(listOf("only-1", "only-2"), paginator.iterateAll().toList()) assertEquals(1, client.callCount) @@ -123,8 +104,7 @@ class CursorPaginationTest { .addHeader("Authorization", "Bearer xyz") .build() - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor) + val strategy = CursorPaginationStrategy(extractor) val paginator = Paginator(client, authRequest, strategy) assertEquals(listOf("a", "b"), paginator.iterateAll().toList()) 
} @@ -139,8 +119,7 @@ class CursorPaginationTest { textResponse(req, "items=3,4\ncursor=") } - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor) + val strategy = CursorPaginationStrategy(extractor) val paginator = Paginator(client, initialRequest(), strategy) val streamed: List = paginator.streamAll().collect(Collectors.toList()) assertEquals(listOf("1", "2", "3", "4"), streamed) @@ -149,50 +128,25 @@ class CursorPaginationTest { @Test fun `cursor with special characters is URL encoded in next request`() { // Opaque cursors may contain `=` `+` `/` characters (base64) — the rebuilder must - // URL-encode them so the server sees the original value unmangled. + // URL-encode them so the server sees the original value unmangled. A custom query + // param name (e.g. `page_token`) covers token-style APIs that reuse this strategy. val rawCursor = "a+b/c=" val encoded = "a%2Bb%2Fc%3D" val client = StubHttpClient() client.on("https://api.example.com/items") { req -> textResponse(req, "items=one\ncursor=$rawCursor") } - client.on("https://api.example.com/items?cursor=$encoded") { req -> - textResponse(req, "items=two\ncursor=") - } - - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor) - val paginator = Paginator(client, initialRequest(), strategy) - assertEquals(listOf("one", "two"), paginator.iterateAll().toList()) - assertEquals( - listOf( - "https://api.example.com/items", - "https://api.example.com/items?cursor=$encoded", - ), - client.receivedUrls, - ) - } - - @Test - fun `custom query-param name is used for the next-page cursor`() { - // Token-style APIs (next_page_token, pageToken, …) are served by setting - // cursorQueryParam; the next request must carry the cursor under that name. 
- val client = StubHttpClient() - client.on("https://api.example.com/items") { req -> - textResponse(req, "items=one\ncursor=tok1") - } - client.on("https://api.example.com/items?page_token=tok1") { req -> + client.on("https://api.example.com/items?page_token=$encoded") { req -> textResponse(req, "items=two\ncursor=") } - val (items, cursor) = buildCachedExtractors() - val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "page_token") + val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "page_token") val paginator = Paginator(client, initialRequest(), strategy) assertEquals(listOf("one", "two"), paginator.iterateAll().toList()) assertEquals( listOf( "https://api.example.com/items", - "https://api.example.com/items?page_token=tok1", + "https://api.example.com/items?page_token=$encoded", ), client.receivedUrls, ) diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorSingleReadTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorSingleReadTest.kt new file mode 100644 index 00000000..20201ef1 --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorSingleReadTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull + +/** + * Exercises the single-read extractor on [CursorPaginationStrategy]: items and the next + * cursor are pulled from one pass over the response, so a single-use body is read exactly + * once per page. 
+ */ +class CursorSingleReadTest { + @BeforeTest + fun setup() { + installIoProvider() + } + + private fun initialRequest(): Request = + Request.builder() + .url("https://api.example.com/items") + .method(Method.GET) + .build() + + /** + * Parses the body format `items=\ncursor=` into a [CursorResult]. + * Reads the body exactly once and reports each read through [reads]. + */ + private fun singleReadExtractor(reads: AtomicInteger): (Response) -> CursorResult = + { resp -> + reads.incrementAndGet() + val body = resp.body!!.source().use { it.readUtf8() } + val itemsLine = body.lineSequence().firstOrNull { it.startsWith("items=") } ?: "items=" + val cursorLine = body.lineSequence().firstOrNull { it.startsWith("cursor=") } ?: "cursor=" + val itemsRaw = itemsLine.removePrefix("items=") + val cursorRaw = cursorLine.removePrefix("cursor=") + val items = if (itemsRaw.isEmpty()) emptyList() else itemsRaw.split(",") + CursorResult(items, cursorRaw.ifEmpty { null }) + } + + @Test + fun `single read extractor reads the body once per page`() { + val client = StubHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse(req, "items=a,b,c\ncursor=abc") + } + client.on("https://api.example.com/items?cursor=abc") { req -> + textResponse(req, "items=d,e,f\ncursor=") + } + + val reads = AtomicInteger(0) + val strategy = CursorPaginationStrategy(singleReadExtractor(reads)) + val paginator = Paginator(client, initialRequest(), strategy) + + val collected: List = paginator.iterateAll().toList() + + assertEquals(listOf("a", "b", "c", "d", "e", "f"), collected) + // Two pages fetched, two HTTP calls, and exactly one body read per page — no double drain. + assertEquals(2, client.callCount) + assertEquals(2, reads.get()) + } + + @Test + fun `single read extractor parses each response exactly once`() { + // A response whose body cannot be read twice: the second source() call throws. This + // makes a double-drain a hard failure rather than a silently-cached read. 
+ val parses = AtomicInteger(0) + val client = StubHttpClient() + client.on("https://api.example.com/items") { req -> + singleUseResponse(req, "items=x,y\ncursor=") + } + + val parsing: (Response) -> CursorResult = { resp -> + parses.incrementAndGet() + val body = resp.body!!.source().use { it.readUtf8() } + val itemsRaw = body.substringAfter("items=").substringBefore('\n') + CursorResult(itemsRaw.split(","), null) + } + val strategy = CursorPaginationStrategy(parsing) + val paginator = Paginator(client, initialRequest(), strategy) + + assertEquals(listOf("x", "y"), paginator.iterateAll().toList()) + assertEquals(1, parses.get()) + assertEquals(1, client.callCount) + } + + @Test + fun `CursorResult carries items and next cursor`() { + val present = CursorResult(listOf("one", "two"), "next-cursor") + assertEquals(listOf("one", "two"), present.items) + assertEquals("next-cursor", present.nextCursor) + + val end = CursorResult(listOf("last"), null) + assertEquals(listOf("last"), end.items) + assertNull(end.nextCursor) + } + + @Test + fun `single read extractor honours a custom cursor query param`() { + val client = StubHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse(req, "items=one\ncursor=tok") + } + client.on("https://api.example.com/items?page_token=tok") { req -> + textResponse(req, "items=two\ncursor=") + } + + val reads = AtomicInteger(0) + val strategy = + CursorPaginationStrategy(singleReadExtractor(reads), cursorQueryParam = "page_token") + val paginator = Paginator(client, initialRequest(), strategy) + + assertEquals(listOf("one", "two"), paginator.iterateAll().toList()) + assertEquals( + listOf( + "https://api.example.com/items", + "https://api.example.com/items?page_token=tok", + ), + client.receivedUrls, + ) + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/PaginationTestSupport.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/PaginationTestSupport.kt index 01d727d2..9657ffa8 100644 
--- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/PaginationTestSupport.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/PaginationTestSupport.kt @@ -52,6 +52,44 @@ private fun stringResponseBody(content: String): ResponseBody { } } +/** + * Convenience: build a 200 OK response whose body may be drained exactly once. A second + * `source()` call throws, so any double-read of the body is a hard failure rather than a + * silently-served cache — used to prove a single-pass extractor really reads once. + */ +internal fun singleUseResponse( + request: Request, + body: String, +): Response = + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .headers(Headers.Builder().build()) + .body(singleUseResponseBody(body)) + .build() + +private fun singleUseResponseBody(content: String): ResponseBody { + val bytes = content.toByteArray(Charsets.UTF_8) + return object : ResponseBody() { + private var consumed = false + + override fun mediaType(): org.dexpace.sdk.core.http.common.MediaType? = null + + override fun contentLength(): Long = bytes.size.toLong() + + override fun source(): org.dexpace.sdk.core.io.BufferedSource { + check(!consumed) { "Response body already consumed: source() called twice." } + consumed = true + return Io.provider.source(bytes) + } + + override fun close() { + consumed = true + } + } +} + /** Install the Okio IoProvider so test response bodies are readable. */ internal fun installIoProvider() { Io.installProvider(OkioIoProvider)