Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ See [docs/pipelines.md](docs/pipelines.md) for the step-author walkthrough.
| `http.auth` | `Credential` sealed hierarchy (`KeyCredential`, `NamedKeyCredential`, `BearerToken`), `BearerTokenProvider`, `AuthScheme`, `AuthMetadata`, RFC 7235 challenge parser, `BasicChallengeHandler`, `DigestChallengeHandler`, `CompositeChallengeHandler`. |
| `http.sse` | `ServerSentEventReader` (WHATWG spec), `ServerSentEvent`, `ServerSentEventListener`, `BufferedSource.readServerSentEvents()`. |
| `http.paging` | `PagedIterable<T>`, `PagedResponse<T>`, `PagingOptions` with `byPage()` and `stream()` accessors. |
| `pagination` | `Paginator<T>` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page<T>` / `SimplePage<T>`. |
| `pagination` | `Paginator<T>` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page<T>` / `SimplePage<T>`. Token-style APIs use `CursorPaginationStrategy` with the query-param name set (e.g. `"page_token"`). |
| `pipeline` | Recovery-aware primitives: `RequestPipeline`, `ResponsePipeline`, `ExecutionPipeline` over a sealed `ResponseOutcome`, with steps (`pipeline.step`, `pipeline.step.retry`) like `RetryStep`, `ResponseRecoveryStep`, `IdempotencyKeyStep`, `ClientIdentityStep`. |
| `serde` | `Serde`, `Serializer`, `Deserializer` abstractions, `Tristate<T>` (absent / null / present), and `SerdeException` (the unchecked failure adapters translate codec errors into). |
| `io` | `Source`, `Sink`, `Buffer`, `BufferedSource`, `BufferedSink`, `IoProvider`, `Io`, `TeeSink`. |
Expand Down
19 changes: 16 additions & 3 deletions sdk-core/api/sdk-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1960,12 +1960,25 @@ public abstract interface class org/dexpace/sdk/core/io/Source : java/io/Closeab
}

public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy {
public fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
public fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lkotlin/jvm/functions/Function1;)V
public fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun parse (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/request/Request;)Lorg/dexpace/sdk/core/pagination/Page;
}

public final class org/dexpace/sdk/core/pagination/CursorResult {
public fun <init> (Ljava/util/List;Ljava/lang/String;)V
public final fun component1 ()Ljava/util/List;
public final fun component2 ()Ljava/lang/String;
public final fun copy (Ljava/util/List;Ljava/lang/String;)Lorg/dexpace/sdk/core/pagination/CursorResult;
public static synthetic fun copy$default (Lorg/dexpace/sdk/core/pagination/CursorResult;Ljava/util/List;Ljava/lang/String;ILjava/lang/Object;)Lorg/dexpace/sdk/core/pagination/CursorResult;
public fun equals (Ljava/lang/Object;)Z
public final fun getItems ()Ljava/util/List;
public final fun getNextCursor ()Ljava/lang/String;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class org/dexpace/sdk/core/pagination/LinkHeaderPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy {
public fun <init> (Lkotlin/jvm/functions/Function1;)V
public fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,45 @@ import org.dexpace.sdk.core.http.response.Response
* - Page-N response: arbitrary body containing both the items and the next cursor.
* - End of stream: response carries a `null` or absent next cursor.
*
* The strategy is **stateless**: it relies on [itemsExtractor] and [cursorExtractor]
* lambdas to pull data out of the response, and on [RequestRebuilder] to mutate the
* initial request's URL with the new cursor query parameter.
* The strategy is **stateless**: it relies on a single [extractor] to pull both the items
* and the next cursor out of the response in one pass — see [CursorResult] — and rewrites the
* initial request's URL, setting the cursor query parameter to the new value, for the next
* page.
*
* ## Single read
*
* A response body is single-use. Because cursor APIs carry the items and the next cursor in
* the same payload, the extractor reads the body **once** and returns both via a
* [CursorResult]. This avoids the double-drain trap of pulling items and cursor with two
* independent `(Response) -> …` lambdas, which forces either a failed second read or a
* per-response cache to work around it.
*
* @param T Element type extracted from the response.
* @property itemsExtractor Reads the list of items from the response. Called once per
* page; must drain the response body synchronously.
* @property cursorExtractor Reads the next cursor from the response, or returns `null`
* if there are no more pages.
* @property extractor Reads the items and next cursor from the response in a single pass.
* Called once per page; must drain the response body synchronously and return both pieces.
* @property cursorQueryParam Query parameter name used to send the cursor (default
* `"cursor"`). Servers vary; common alternatives include `"after"`, `"next"`,
* `"pageCursor"`.
*/
public class CursorPaginationStrategy<T>
@JvmOverloads
constructor(
private val itemsExtractor: (Response) -> List<T>,
private val cursorExtractor: (Response) -> String?,
private val extractor: (Response) -> CursorResult<T>,
private val cursorQueryParam: String = "cursor",
) : PaginationStrategy<T> {
override fun parse(
response: Response,
initialRequest: Request,
): Page<T> {
val items: List<T> = itemsExtractor(response)
val nextCursor: String? = cursorExtractor(response)
val result: CursorResult<T> = extractor(response)
val nextCursor: String? = result.nextCursor
val hasNext: Boolean = !nextCursor.isNullOrEmpty()
val nextRequest: Request? =
if (hasNext) {
RequestRebuilder.withQueryParam(initialRequest, cursorQueryParam, nextCursor)
} else {
null
}
return SimplePage(items = items, hasNext = hasNext, nextRequest = nextRequest)
return SimplePage(items = result.items, hasNext = hasNext, nextRequest = nextRequest)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2026 dexpace and Omar Aljarrah
*
* Licensed under the MIT License. See LICENSE in the project root.
* SPDX-License-Identifier: MIT
*/

package org.dexpace.sdk.core.pagination

import org.dexpace.sdk.core.http.response.Response

/**
* Result of a single-pass cursor extraction: the items on a page together with the cursor
* for the next page, pulled from one read of the [Response].
*
* Cursor APIs return both the items and the next cursor in the same payload, so reading
* them with two separate `(Response) -> …` lambdas means draining the body twice. A response
* body is single-use, so the second drain either fails or — at best — is worked around with
* a per-response cache. `CursorResult` lets a [CursorPaginationStrategy] extractor parse the
* response once and hand back both pieces, keeping the public API honest about the
* single-read constraint.
*
* ## Thread-safety
*
* The [items] list passed at construction is stored by reference, so a caller that retains a
* mutable list can mutate it afterwards and the change will be visible through this result.
* The bundled extractor path always builds a fresh list per page, so values produced by the
* SDK are effectively immutable; hand-built results that share a mutable list should copy it
* (`items.toList()`) before constructing.
*
* @param T Element type carried in [items].
* @property items Items on the page, in server-defined order. Never `null`; may be empty.
* @property nextCursor Opaque cursor to send on the next request, or `null` (or empty) when
* the response signals end-of-stream.
*/
public data class CursorResult<out T>(
public val items: List<T>,
public val nextCursor: String?,
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package org.dexpace.sdk.core.pagination
import org.dexpace.sdk.core.http.request.Method
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.Response
import java.util.IdentityHashMap
import java.util.stream.Collectors
import kotlin.test.BeforeTest
import kotlin.test.Test
Expand All @@ -29,34 +28,18 @@ class CursorPaginationTest {
.build()

/**
* Parses the body format `items=<csv>\ncursor=<next-or-empty>` into a (items, cursor)
* pair. Reads the body exactly once.
* Single-pass extractor for the body format `items=<csv>\ncursor=<next-or-empty>`. Reads
* the body exactly once and returns both the items and the next cursor as a
* [CursorResult], so there is no double-drain of the single-use response body.
*/
private fun parsePayload(resp: Response): Pair<List<String>, String?> {
private val extractor: (Response) -> CursorResult<String> = { resp ->
val body = resp.body!!.source().use { it.readUtf8() }
val itemsLine = body.lineSequence().firstOrNull { it.startsWith("items=") } ?: "items="
val cursorLine = body.lineSequence().firstOrNull { it.startsWith("cursor=") } ?: "cursor="
val itemsRaw = itemsLine.removePrefix("items=")
val cursorRaw = cursorLine.removePrefix("cursor=")
val items = if (itemsRaw.isEmpty()) emptyList() else itemsRaw.split(",")
val cursor: String? = cursorRaw.ifEmpty { null }
return Pair(items, cursor)
}

/**
* Pair of extractors that share a per-Response identity-keyed cache so the
* single-use body is read exactly once per page even though the strategy's contract
* splits items + cursor into two calls.
*/
private fun buildCachedExtractors(): Pair<(Response) -> List<String>, (Response) -> String?> {
val cache: MutableMap<Response, Pair<List<String>, String?>> = IdentityHashMap()
val items: (Response) -> List<String> = { r ->
cache.getOrPut(r) { parsePayload(r) }.first
}
val cursor: (Response) -> String? = { r ->
cache.getOrPut(r) { parsePayload(r) }.second
}
return Pair(items, cursor)
CursorResult(items, cursorRaw.ifEmpty { null })
}

@Test
Expand All @@ -72,8 +55,7 @@ class CursorPaginationTest {
textResponse(req, "items=g,h,i\ncursor=")
}

val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "cursor")
val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "cursor")
val paginator = Paginator(client, initialRequest(), strategy)

val collected: List<String> = paginator.iterateAll().toList()
Expand All @@ -96,8 +78,7 @@ class CursorPaginationTest {
client.on("https://api.example.com/items") { req ->
textResponse(req, "items=only-1,only-2\ncursor=")
}
val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor, "cursor")
val strategy = CursorPaginationStrategy(extractor, "cursor")
val paginator = Paginator(client, initialRequest(), strategy)
assertEquals(listOf("only-1", "only-2"), paginator.iterateAll().toList())
assertEquals(1, client.callCount)
Expand All @@ -123,8 +104,7 @@ class CursorPaginationTest {
.addHeader("Authorization", "Bearer xyz")
.build()

val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor)
val strategy = CursorPaginationStrategy(extractor)
val paginator = Paginator(client, authRequest, strategy)
assertEquals(listOf("a", "b"), paginator.iterateAll().toList())
}
Expand All @@ -139,8 +119,7 @@ class CursorPaginationTest {
textResponse(req, "items=3,4\ncursor=")
}

val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor)
val strategy = CursorPaginationStrategy(extractor)
val paginator = Paginator(client, initialRequest(), strategy)
val streamed: List<String> = paginator.streamAll().collect(Collectors.toList())
assertEquals(listOf("1", "2", "3", "4"), streamed)
Expand All @@ -149,50 +128,25 @@ class CursorPaginationTest {
@Test
fun `cursor with special characters is URL encoded in next request`() {
// Opaque cursors may contain `=` `+` `/` characters (base64) — the rebuilder must
// URL-encode them so the server sees the original value unmangled.
// URL-encode them so the server sees the original value unmangled. A custom query
// param name (e.g. `page_token`) covers token-style APIs that reuse this strategy.
val rawCursor = "a+b/c="
val encoded = "a%2Bb%2Fc%3D"
val client = StubHttpClient()
client.on("https://api.example.com/items") { req ->
textResponse(req, "items=one\ncursor=$rawCursor")
}
client.on("https://api.example.com/items?cursor=$encoded") { req ->
textResponse(req, "items=two\ncursor=")
}

val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor)
val paginator = Paginator(client, initialRequest(), strategy)
assertEquals(listOf("one", "two"), paginator.iterateAll().toList())
assertEquals(
listOf(
"https://api.example.com/items",
"https://api.example.com/items?cursor=$encoded",
),
client.receivedUrls,
)
}

@Test
fun `custom query-param name is used for the next-page cursor`() {
// Token-style APIs (next_page_token, pageToken, …) are served by setting
// cursorQueryParam; the next request must carry the cursor under that name.
val client = StubHttpClient()
client.on("https://api.example.com/items") { req ->
textResponse(req, "items=one\ncursor=tok1")
}
client.on("https://api.example.com/items?page_token=tok1") { req ->
client.on("https://api.example.com/items?page_token=$encoded") { req ->
textResponse(req, "items=two\ncursor=")
}

val (items, cursor) = buildCachedExtractors()
val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "page_token")
val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "page_token")
val paginator = Paginator(client, initialRequest(), strategy)
assertEquals(listOf("one", "two"), paginator.iterateAll().toList())
assertEquals(
listOf(
"https://api.example.com/items",
"https://api.example.com/items?page_token=tok1",
"https://api.example.com/items?page_token=$encoded",
),
client.receivedUrls,
)
Expand Down
Loading
Loading