Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<ItemGroup>
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.5" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageVersion Include="xunit" Version="2.9.2" />
Expand Down
1 change: 1 addition & 0 deletions src/Dexpace.Sdk.Core/Dexpace.Sdk.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="all" />
</ItemGroup>

Expand Down
66 changes: 66 additions & 0 deletions src/Dexpace.Sdk.Core/Http/Response/Response.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Errors;
using Dexpace.Sdk.Core.Http.Common;

namespace Dexpace.Sdk.Core.Http.Response;
Expand Down Expand Up @@ -48,6 +49,71 @@ public Response(
/// <summary>Shorthand for <c>Status.IsSuccess</c>.</summary>
public bool IsSuccess => Status.IsSuccess;

/// <summary>
/// Throws <see cref="HttpResponseException"/> if the status is not in the 2xx success range.
/// </summary>
/// <remarks>
/// When the status is an error, the response body is drained up to
/// <see cref="MaxBufferedErrorBytes"/> into an in-memory buffer and attached to the
/// thrown exception so that <see cref="HttpResponseException.GetErrorAsync{T}"/> can read
/// it. The cap guards against oversized error pages consuming unbounded memory.
/// </remarks>
/// <param name="cancellationToken">A token that can cancel the body-drain operation.</param>
/// <returns>A <see cref="ValueTask"/> that completes when the check has been performed.</returns>
/// <exception cref="HttpResponseException">
/// The response status is not in the 2xx range. The exception carries a buffered copy of
/// the error body (up to <see cref="MaxBufferedErrorBytes"/> bytes).
/// </exception>
public async ValueTask EnsureSuccessAsync(CancellationToken cancellationToken = default)
{
if (IsSuccess)
{
return;
}

// Drain and cap the body so the caller can read it from the exception.
var rawBytes = await DrainCappedAsync(Body, MaxBufferedErrorBytes, cancellationToken)
.ConfigureAwait(false);

var bufferedBody = ResponseBody.FromBytes(rawBytes, Body.ContentType);
var bufferedResponse = new Response(Status, Headers, bufferedBody, Protocol);
throw new HttpResponseException(bufferedResponse);
}

/// <summary>
/// The maximum number of bytes buffered from an error response body by
/// <see cref="EnsureSuccessAsync"/>. Larger bodies are silently truncated to this limit.
/// </summary>
public const int MaxBufferedErrorBytes = 1024 * 1024; // 1 MiB

private static async Task<byte[]> DrainCappedAsync(
ResponseBody body,
int maxBytes,
CancellationToken cancellationToken)
{
await using var stream = await body.OpenReadAsync(cancellationToken).ConfigureAwait(false);
using var buffer = new MemoryStream();

var remaining = maxBytes;
var chunk = new byte[Math.Min(81920, maxBytes)];

while (remaining > 0)
{
var toRead = Math.Min(chunk.Length, remaining);
var read = await stream.ReadAsync(chunk.AsMemory(0, toRead), cancellationToken)
.ConfigureAwait(false);
if (read == 0)
{
break;
}

buffer.Write(chunk, 0, read);
remaining -= read;
}

return buffer.ToArray();
}

/// <inheritdoc/>
public void Dispose()
{
Expand Down
67 changes: 67 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/DexpacePipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Client;
using Dexpace.Sdk.Core.Pipeline.Policies;
using Microsoft.Extensions.Logging;

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// Factory for the default Dexpace SDK HTTP pipeline.
/// </summary>
/// <remarks>
/// <see cref="CreateDefault"/> assembles the standard policy set in the correct stage order:
/// <see cref="OperationPolicy"/> → <see cref="RedirectPolicy"/> → <see cref="IdempotencyPolicy"/>
/// → <see cref="ClientIdentityPolicy"/> → <see cref="RetryPolicy"/> → <see cref="SetDatePolicy"/>
/// → optional auth policy → <see cref="InstrumentationPolicy"/> → transport.
/// Because <see cref="PipelineBuilder"/> sorts by <see cref="HttpPipelinePolicy.Stage"/> at build
/// time, the insertion order of the <c>Add</c> calls here does not affect the final ordering.
/// </remarks>
public static class DexpacePipeline
{
/// <summary>
/// Builds the default HTTP pipeline with all standard policies wired in the correct stage order.
/// </summary>
/// <param name="transport">
/// The transport that executes HTTP requests. Called after all policies have run.
/// </param>
/// <param name="authPolicy">
/// An optional authentication policy inserted at <see cref="PipelineStage.Auth"/>. When
/// <see langword="null"/> no auth policy is added.
/// </param>
/// <param name="logger">
/// An optional logger forwarded to <see cref="InstrumentationPolicy"/>. When
/// <see langword="null"/> the policy falls back to <c>NullLogger.Instance</c>.
/// </param>
/// <param name="timeProvider">
/// An optional <see cref="TimeProvider"/> forwarded to <see cref="RetryPolicy"/> and
/// <see cref="SetDatePolicy"/>. Defaults to <see cref="TimeProvider.System"/> when
/// <see langword="null"/>.
/// </param>
/// <returns>A fully assembled <see cref="HttpPipeline"/> ready for use.</returns>
public static HttpPipeline CreateDefault(
IAsyncHttpClient transport,
HttpPipelinePolicy? authPolicy = null,
ILogger? logger = null,
TimeProvider? timeProvider = null)
{
ArgumentNullException.ThrowIfNull(transport);

var builder = new PipelineBuilder()
.Add(new OperationPolicy())
.Add(new RedirectPolicy())
.Add(new IdempotencyPolicy())
.Add(new ClientIdentityPolicy())
.Add(new RetryPolicy(timeProvider))
.Add(new SetDatePolicy(timeProvider))
.Add(new InstrumentationPolicy(logger));

if (authPolicy is not null)
{
builder.Add(authPolicy);
}

return builder.Build(transport);
}
}
84 changes: 84 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/HttpPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Client;
using Dexpace.Sdk.Core.Configuration;
using Dexpace.Sdk.Core.Errors;
using Dexpace.Sdk.Core.Http.Request;
using Dexpace.Sdk.Core.Http.Response;

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// The entry point for sending an HTTP request through the configured policy chain.
/// </summary>
/// <remarks>
/// <para>
/// Instances are created exclusively by <see cref="PipelineBuilder.Build"/>. The pipeline is
/// immutable after construction: the sorted policy array and transport are captured at build time.
/// </para>
/// <para>
/// <b>Sync bridge.</b> <see cref="Send"/> blocks the calling thread by driving the async chain
/// synchronously via <c>GetAwaiter().GetResult()</c>. Callers on a thread pool should prefer
/// <see cref="SendAsync"/> to avoid thread starvation.
/// </para>
/// </remarks>
public sealed class HttpPipeline
{
private readonly HttpPipelinePolicy[] _policies;
private readonly IAsyncHttpClient _transport;

internal HttpPipeline(HttpPipelinePolicy[] policies, IAsyncHttpClient transport)
{
_policies = policies;
_transport = transport;
}

/// <summary>
/// Asynchronously sends <paramref name="request"/> through the pipeline and returns the
/// response produced by the terminal transport.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="options">Client options that apply to this call.</param>
/// <param name="cancellationToken">An optional token to cancel the call.</param>
/// <returns>
/// A <see cref="ValueTask{TResult}"/> that completes with the <see cref="Response"/> once
/// the pipeline chain has finished.
/// </returns>
/// <exception cref="PipelineAbortedException">
/// No policy or the transport produced a <see cref="Response"/> by the time the chain
/// completed (i.e. the pipeline was short-circuited without setting a response).
/// </exception>
public async ValueTask<Response> SendAsync(
Request request,
DexpaceClientOptions options,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(options);

var context = new PipelineContext(request, options, cancellationToken);
await new PipelineRunner(_policies, 0, _transport).RunAsync(context).ConfigureAwait(false);

return context.Response
?? throw new PipelineAbortedException(
"The pipeline completed without producing a response.");
}

/// <summary>
/// Synchronously sends <paramref name="request"/> through the pipeline and returns the
/// response. Blocks the calling thread until the async chain completes.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="options">Client options that apply to this call.</param>
/// <param name="cancellationToken">An optional token to cancel the call.</param>
/// <returns>The <see cref="Response"/> produced by the pipeline.</returns>
/// <exception cref="PipelineAbortedException">
/// The pipeline completed without producing a response.
/// </exception>
public Response Send(
Request request,
DexpaceClientOptions options,
CancellationToken cancellationToken = default) =>
SendAsync(request, options, cancellationToken).AsTask().GetAwaiter().GetResult();
}
46 changes: 46 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/HttpPipelinePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// Base class for every policy in the HTTP pipeline.
/// </summary>
/// <remarks>
/// <para>
/// A policy participates in the request/response lifecycle by implementing
/// <see cref="ProcessAsync"/>. Before calling <c>next.RunAsync</c>, a policy may mutate
/// <see cref="PipelineContext.Request"/>; after the call returns, it may inspect or replace
/// <see cref="PipelineContext.Response"/>.
/// </para>
/// <para>
/// <b>Re-entrancy.</b> <see cref="PipelineRunner"/> is a value type, so a policy may call
/// <c>next.RunAsync</c> more than once — this is how redirect and retry policies work.
/// </para>
/// <para>
/// <b>Async-only in v1.</b> There is no synchronous <c>Process</c> override on this base class.
/// The sync entry point on the pipeline drives the async chain via a blocking
/// bridge; concrete policy subclasses are only required to implement the async path.
/// </para>
/// </remarks>
public abstract class HttpPipelinePolicy
{
/// <summary>
/// The stage at which this policy is inserted in the pipeline.
/// </summary>
public abstract PipelineStage Stage { get; }

/// <summary>
/// Asynchronously participates in processing the request/response.
/// </summary>
/// <param name="context">
/// The mutable context carrying the current <see cref="PipelineContext.Request"/>,
/// <see cref="PipelineContext.Response"/>, and ancillary state for this call.
/// </param>
/// <param name="continuation">
/// The continuation that runs the remaining policies and eventually invokes the transport.
/// Call this to forward the request; omit the call to short-circuit the chain.
/// </param>
/// <returns>A <see cref="ValueTask"/> that completes when the policy has finished.</returns>
public abstract ValueTask ProcessAsync(PipelineContext context, PipelineRunner continuation);
}
Loading
Loading