Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions KestrelMinima.Demo/KestrelMinima.Demo.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<!-- Demo web project: built with the ASP.NET Core web SDK and referencing the KestrelMinima library project. -->
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<!-- GC configuration: server GC on, concurrent (background) GC off. -->
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>false</ConcurrentGarbageCollection>
<!-- Opt in to dynamic profile-guided optimization in the tiered JIT. -->
<TieredPGO>true</TieredPGO>
</PropertyGroup>

<ItemGroup>
<!-- The library that provides the UseKestrelMinima extension used by Program.cs. -->
<ProjectReference Include="../KestrelMinima/KestrelMinima.csproj" />
</ItemGroup>

</Project>
12 changes: 12 additions & 0 deletions KestrelMinima.Demo/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using KestrelMinima;

// Demo host: a minimal "Hello World!" endpoint served through KestrelMinima.
const int ReactorThreads = 8;
const int HttpPort = 8080;

WebApplicationBuilder hostBuilder = WebApplication.CreateSlimBuilder(args);

// Only warnings and above are logged.
hostBuilder.Logging.SetMinimumLevel(LogLevel.Warning);

// Configure KestrelMinima's reactor count, then bind Kestrel to every interface on the HTTP port.
hostBuilder.WebHost
    .UseKestrelMinima(minimaOptions => minimaOptions.ReactorCount = ReactorThreads)
    .ConfigureKestrel(kestrelOptions => kestrelOptions.ListenAnyIP(HttpPort));

WebApplication site = hostBuilder.Build();
site.MapGet("/", () => "Hello World!");
site.Run();
33 changes: 33 additions & 0 deletions KestrelMinima/Connection/Connection.InputPipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.IO.Pipelines;

namespace KestrelMinima;

/// <summary>
/// Input side used when Kestrel consumes the connection. Received bytes are
/// copied into a BCL <see cref="Pipe"/> and Kestrel reads from
/// <c>InputPipe.Reader</c>, so the hand-rolled read IVTS (which cannot cope with
/// Kestrel's concurrent off-reactor access) is not involved. Output goes through
/// the write slab and a fire-and-forget FlushAsync (no IVTS).
/// </summary>
public sealed unsafe partial class Connection
{
    /// <summary>Pipe carrying received bytes to the reader; <c>null</c> until <see cref="InitInputPipe"/> runs.</summary>
    internal Pipe? InputPipe;

    /// <summary>
    /// Creates a fresh input pipe. Both writer thresholds are zero and no
    /// synchronization context is captured.
    /// </summary>
    internal void InitInputPipe()
    {
        var pipeOptions = new PipeOptions(
            pauseWriterThreshold: 0,
            resumeWriterThreshold: 0,
            useSynchronizationContext: false);

        InputPipe = new Pipe(pipeOptions);
    }

    /// <summary>Reactor-thread: copy recv bytes into the pipe and publish them to the reader.</summary>
    /// <param name="ptr">Start of the received bytes.</param>
    /// <param name="len">Number of bytes to copy from <paramref name="ptr"/>.</param>
    internal void FeedInput(byte* ptr, int len)
    {
        PipeWriter writer = InputPipe!.Writer;

        Span<byte> target = writer.GetSpan(len);
        ReadOnlySpan<byte> received = new(ptr, len);
        received.CopyTo(target);

        writer.Advance(len);

        // No backpressure is configured, so the flush is expected to complete
        // synchronously; its result is intentionally discarded.
        _ = writer.FlushAsync();
    }

    /// <summary>Reactor-thread: signal EOF (optionally with an error) to Kestrel's reader.</summary>
    /// <param name="error">Optional exception handed to the writer's completion.</param>
    internal void CompleteInput(Exception? error = null)
    {
        if (InputPipe is { } pipe)
        {
            pipe.Writer.Complete(error);
        }
    }
}
163 changes: 163 additions & 0 deletions KestrelMinima/Connection/Connection.Read.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
using System.Threading.Tasks.Sources;
using KestrelMinima.Utils;

// ReSharper disable SuggestVarOrType_BuiltInTypes

namespace KestrelMinima;

/// <summary>
/// Per-connection state. The handler may run on any thread (e.g. resumed by
/// a thread-pool timer); reactor-only side effects are funnelled through the
/// MPSC queues on `Reactor`. Coordination uses Interlocked.Exchange on the
/// arm flag (`_armed`) and a sticky `_pending` flag to close the lost-wakeup race.
///
/// Lifetime is pool-managed: the reactor pops a Connection on accept (or creates
/// a new one if the pool is empty), and pushes it back on teardown after `Clear()`.
/// The `_generation` field is bumped on each `Clear` so stale `ValueTask` tokens
/// from a previous connection life are detectable and return `Closed()`
/// instead of leaking the new tenant's state.
/// </summary>
public sealed unsafe partial class Connection : IValueTaskSource<RecvSnapshot>
{
/// <summary>Stores <paramref name="fd"/> as <c>ClientFd</c> and returns this instance for chaining.</summary>
internal Connection SetFd(int fd)
{
ClientFd = fd;
return this;
}

// Completion source behind the ValueTask handed out by ReadAsync when it has to wait.
private ManualResetValueTaskSourceCore<RecvSnapshot> _readSignal;
// 1 while a ReadAsync awaiter is armed; claimed and released with Interlocked.Exchange.
private int _armed;
// Sticky "something happened while nobody was armed" flag; set by Complete, consumed by ReadAsync.
private int _pending;
// Non-zero once the connection is considered closed (set here on recv-queue overflow).
private int _closed;

// Ring of received items between Complete (producer) and the handler (consumer).
// NOTE(review): `capacityPow2: 16` may be the capacity itself or an exponent — confirm against SpscRecvRing.
private readonly SpscRecvRing _recv = new(capacityPow2: 16);

/// <summary>
/// Returns a snapshot of the recv ring's tail, either synchronously (data or a
/// pending signal is already present, or the connection is closed) or as an
/// awaitable backed by this instance that <see cref="Complete"/> resolves later.
/// </summary>
/// <returns>A <see cref="RecvSnapshot"/> carrying the ring tail and the closed flag.</returns>
/// <exception cref="InvalidOperationException">A previous ReadAsync is still armed.</exception>
public ValueTask<RecvSnapshot> ReadAsync()
{
// Fast path: items are queued, or a completion arrived while no awaiter was armed.
if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1)
{
Volatile.Write(ref _pending, 0);
return new ValueTask<RecvSnapshot>(
new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}

// Nothing queued and already closed: report the closed sentinel.
if (Volatile.Read(ref _closed) != 0)
{
return new ValueTask<RecvSnapshot>(RecvSnapshot.Closed());
}

// Claim the single awaiter slot; a second concurrent ReadAsync is a caller bug.
if (Interlocked.Exchange(ref _armed, 1) == 1)
{
throw new InvalidOperationException("ReadAsync already armed.");
}

// Snapshot the generation as the IVTS token so a future Clear() can
// invalidate this awaiter if the connection gets pool-recycled.
int gen = Volatile.Read(ref _generation);

// Race recovery: re-check between arming and returning the IVTS task.
// NOTE(review): if Complete already cleared `_armed` in this window it also
// calls `_readSignal.SetResult`, and this path returns without consuming that
// result — confirm callers always ResetRead() before the next armed await.
if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1 || Volatile.Read(ref _closed) != 0)
{
Volatile.Write(ref _pending, 0);
Interlocked.Exchange(ref _armed, 0);

return new ValueTask<RecvSnapshot>(
new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}

// Only the low 16 bits of the generation fit in the token.
return new ValueTask<RecvSnapshot>(this, (short)gen);
}

/// <summary>
/// Dequeues the next received item bounded by the tail recorded in <paramref name="snap"/>.
/// </summary>
/// <returns>The result of <c>TryDequeueUntil</c> on the recv ring.</returns>
public bool TryGetItem(in RecvSnapshot snap, out SpscRecvRing.Item item)
=> _recv.TryDequeueUntil(snap.Tail, out item);

/// <summary>Resets the read completion source so it can be awaited again.</summary>
public void ResetRead() => _readSignal.Reset();

/// <summary>
/// Publishes one recv completion: enqueues it on the ring, then either resolves
/// the armed awaiter or records a pending signal for the next <see cref="ReadAsync"/>.
/// </summary>
/// <param name="res">Stored as the item's <c>Len</c>.</param>
/// <param name="bid">Buffer id stored on the item; returned to the reactor if the ring is full.</param>
/// <param name="hasBuffer">Whether <paramref name="bid"/> refers to a buffer that must be returned.</param>
/// <param name="ptr">Pointer stored as the item's <c>Ptr</c>.</param>
public void Complete(int res, ushort bid, bool hasBuffer, byte* ptr)
{
if (!_recv.TryEnqueue(new SpscRecvRing.Item
{
Ptr = ptr,
Bid = bid,
Len = res,
HasBuffer = hasBuffer,
Gen = (ushort)Volatile.Read(ref _generation)
}))
{
// Ring full: the item is dropped, its buffer goes straight back to the
// reactor, and the connection is flagged closed.
Console.Error.WriteLine("[conn] recv queue overflow.");
if (hasBuffer)
{
_reactor.ReturnBufferDirect(bid);
}
Volatile.Write(ref _closed, 1);
}

// Wake the awaiter if one is armed; otherwise leave a sticky signal behind.
if (Interlocked.Exchange(ref _armed, 0) == 1)
{
_readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}
else
{
Volatile.Write(ref _pending, 1);
}
}

/// <summary>Empties the recv ring, returning every buffer-backed item's buffer id to the reactor.</summary>
internal void DrainRecv()
{
// Return any buffer IDs still sitting in the SPSC ring (handler exited
// before draining them, or a recv arrived after _closed was set).
while (_recv.TryDequeue(out SpscRecvRing.Item item))
{
if (item.HasBuffer)
{
_reactor.ReturnBufferDirect(item.Bid);
}
}
}

// =========================================================================
// IValueTaskSource plumbing — token (= snapshot of `_generation` at await
// time) is compared against the current `_generation` to detect stale
// awaiters from before a Clear()/pool reuse. Stale awaiters get a
// sentinel result rather than the new tenant's state.
//
// For the actual IVTS dispatch we pass `_readSignal.Version` to the
// underlying core (not `token`) because the core's version is changed by
// `_readSignal.Reset()` (see ResetRead) mid-life and is unrelated to the
// cross-life generation guard.
// =========================================================================

/// <summary>Returns the awaited snapshot, or the closed sentinel when <paramref name="token"/> is stale.</summary>
RecvSnapshot IValueTaskSource<RecvSnapshot>.GetResult(short token)
{
if (token != (short)Volatile.Read(ref _generation))
{
return RecvSnapshot.Closed();
}

return _readSignal.GetResult(_readSignal.Version);
}

/// <summary>Reports the core's status; a stale <paramref name="token"/> is reported as already succeeded.</summary>
ValueTaskSourceStatus IValueTaskSource<RecvSnapshot>.GetStatus(short token)
{
if (token != (short)Volatile.Read(ref _generation))
{
return ValueTaskSourceStatus.Succeeded;
}

return _readSignal.GetStatus(_readSignal.Version);
}

/// <summary>Registers the continuation with the core, or runs it inline when <paramref name="token"/> is stale.</summary>
void IValueTaskSource<RecvSnapshot>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (token != (short)Volatile.Read(ref _generation))
{
// Stale — run the continuation now so the awaiter unblocks and
// gets RecvSnapshot.Closed() from GetResult.
continuation(state);

return;
}

_readSignal.OnCompleted(continuation, state, _readSignal.Version, flags);
}
}
102 changes: 102 additions & 0 deletions KestrelMinima/Connection/Connection.Write.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System.Buffers;
using KestrelMinima.Utils;

// ReSharper disable SuggestVarOrType_BuiltInTypes

namespace KestrelMinima;

/// <summary>
/// Fire-and-forget write path: FlushAsync hands the slab to the reactor (an io_uring
/// send SQE + eventfd wake) and returns synchronously. No IValueTaskSource, no
/// awaiter scheduling, no continuation hop. Safe for HTTP/1.1 plaintext because the
/// client cannot send the next request until it receives the previous response —
/// which in turn cannot happen until the kernel finishes our send (and the reactor
/// has processed the resulting send CQE, which is what resets WriteHead/WriteTail).
/// So by the time Kestrel produces the next response into this slab, the previous
/// send is fully ack'd and the slab is free for reuse.
/// </summary>
/// <remarks>
/// The slab has a fixed size and never grows. Every buffer request, advance and
/// copy is checked against the space left after <c>WriteTail</c>, using
/// subtraction so that very large sizes cannot wrap the comparison.
/// </remarks>
public sealed unsafe partial class Connection : IBufferWriter<byte>
{
    private readonly int _writeSlabSize;
    internal byte* WriteBuffer;

    // WriteHead — bytes ack'd by the kernel (reactor thread mutates).
    // WriteSubmitted — bytes queued to the kernel via SubmitSend (reactor thread mutates).
    // WriteTail — bytes produced by Kestrel into the slab (Kestrel thread mutates).
    internal int WriteHead;
    internal int WriteSubmitted;
    internal int WriteTail;

    private readonly UnmanagedMemoryManager _manager;

    #region IBufferWriter<byte>

    /// <summary>Returns the unused remainder of the slab as writable memory.</summary>
    /// <param name="sizeHint">Minimum number of bytes required; 0 means "any non-empty buffer".</param>
    /// <returns>Memory starting at <c>WriteTail</c> and covering the rest of the slab.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="sizeHint"/> is negative.</exception>
    /// <exception cref="InvalidOperationException">
    /// The slab is full or has fewer than <paramref name="sizeHint"/> bytes left.
    /// </exception>
    public Memory<byte> GetMemory(int sizeHint = 0)
    {
        if (sizeHint < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(sizeHint), sizeHint, "sizeHint must be non-negative.");
        }

        int remaining = _writeSlabSize - WriteTail;

        // IBufferWriter<T> must never hand back an empty buffer, so a full slab
        // is an error even when the caller asked for "any size" (sizeHint == 0).
        if (remaining <= 0 || sizeHint > remaining)
        {
            throw new InvalidOperationException(
                $"GetMemory: sizeHint={sizeHint} > remaining={remaining} (slab={_writeSlabSize}, WriteTail={WriteTail}, WriteSubmitted={WriteSubmitted}, WriteHead={WriteHead}, closed={Volatile.Read(ref _closed)})");
        }

        return _manager.Memory.Slice(WriteTail, remaining);
    }

    /// <summary>Returns the unused remainder of the slab as a writable span.</summary>
    /// <param name="sizeHint">Minimum number of bytes required; 0 means "any non-empty buffer".</param>
    /// <returns>A span starting at <c>WriteTail</c> and covering the rest of the slab.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="sizeHint"/> is negative.</exception>
    /// <exception cref="InvalidOperationException">
    /// The slab is full or has fewer than <paramref name="sizeHint"/> bytes left.
    /// </exception>
    public Span<byte> GetSpan(int sizeHint = 0)
    {
        if (sizeHint < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(sizeHint), sizeHint, "sizeHint must be non-negative.");
        }

        int remaining = _writeSlabSize - WriteTail;

        // Compare against the remaining space rather than computing
        // WriteTail + sizeHint, which can overflow for very large hints.
        if (remaining <= 0 || sizeHint > remaining)
        {
            throw new InvalidOperationException(
                $"GetSpan: sizeHint={sizeHint}, WriteTail={WriteTail}, slab={_writeSlabSize}, WriteSubmitted={WriteSubmitted}, WriteHead={WriteHead}, closed={Volatile.Read(ref _closed)}");
        }

        return new Span<byte>(WriteBuffer + WriteTail, remaining);
    }

    /// <summary>Marks <paramref name="count"/> bytes after <c>WriteTail</c> as written.</summary>
    /// <param name="count">Number of bytes produced into the buffer obtained from GetMemory/GetSpan.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
    /// <exception cref="InvalidOperationException">Advancing would move <c>WriteTail</c> past the end of the slab.</exception>
    public void Advance(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count), count, "count must be non-negative.");
        }

        if (count > _writeSlabSize - WriteTail)
        {
            throw new InvalidOperationException(
                $"Advance: count={count} exceeds remaining slab space (slab={_writeSlabSize}, WriteTail={WriteTail}).");
        }

        WriteTail += count;
    }

    #endregion

    /// <summary>Copies <paramref name="source"/> into the slab at <c>WriteTail</c> and advances the tail.</summary>
    /// <param name="source">Bytes to append.</param>
    /// <exception cref="InvalidOperationException">The slab does not have room for <paramref name="source"/>.</exception>
    public void Write(ReadOnlySpan<byte> source)
    {
        int len = source.Length;

        // Subtraction form: WriteTail + len could wrap negative for a huge
        // source and slip past the check, letting the copy run off the slab.
        if (len > _writeSlabSize - WriteTail)
        {
            throw new InvalidOperationException("Write buffer too small.");
        }

        source.CopyTo(new Span<byte>(WriteBuffer + WriteTail, len));
        WriteTail += len;
    }

    // Fire-and-forget: hand the fd to the reactor and return. The reactor reads
    // [WriteSubmitted, WriteTail) on drain and submits an SQE. Multi-flush within
    // one response is handled naturally — the MPSC may have the fd queued multiple
    // times, but the second drain finds end <= begin and no-ops.
    /// <summary>Queues this connection's fd for sending unless it is closed or nothing has been written.</summary>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public ValueTask FlushAsync()
    {
        // Treat any non-zero value as closed, matching the other checks in this class.
        if (Volatile.Read(ref _closed) != 0)
        {
            return default;
        }

        if (WriteTail == 0)
        {
            return default;
        }

        _reactor.EnqueueFlush(ClientFd);

        return default;
    }

    // Reactor-thread: all submitted bytes ack'd AND no new bytes pending — reset.
    /// <summary>Rewinds all three write cursors to the start of the slab.</summary>
    internal void CompleteFlush()
    {
        WriteHead = 0;
        WriteSubmitted = 0;
        WriteTail = 0;
    }
}
68 changes: 68 additions & 0 deletions KestrelMinima/Connection/Connection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Runtime.InteropServices;
using KestrelMinima.Utils;

namespace KestrelMinima;

/// <summary>
/// Core part of the connection: owns the reactor reference, the client fd, the
/// generation counter used for pool reuse, and the natively allocated write slab.
/// Implements <see cref="IDisposable"/> so the slab can be released with
/// <c>using</c> or by disposal-aware owners.
/// </summary>
public sealed unsafe partial class Connection : IDisposable
{
    private readonly Reactor _reactor;

    /// <summary>File descriptor of the client socket this connection currently serves.</summary>
    public int ClientFd { get; private set; }

    // Bumped on Clear(); the low 16 bits are used as the read IVTS token so
    // stale awaiters can be detected after pool reuse. (The Kestrel path never
    // touches the read IVTS, but it's reused by MarkClosed's `_readSignal`
    // SetResult — harmless when nobody awaits.)
    private int _generation;

    /// <summary>Creates a connection and allocates its 64-byte-aligned native write slab.</summary>
    /// <param name="reactor">Reactor that owns this connection.</param>
    /// <param name="fd">Client socket file descriptor.</param>
    /// <param name="writeSlabSize">Size of the write slab in bytes; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="writeSlabSize"/> is zero or negative.</exception>
    public Connection(Reactor reactor, int fd, int writeSlabSize = 256 * 1024)
    {
        // Reject bad sizes up front: a negative int cast to nuint would turn
        // into an enormous allocation request.
        if (writeSlabSize <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(writeSlabSize), writeSlabSize, "Write slab size must be positive.");
        }

        _reactor = reactor;
        ClientFd = fd;
        _writeSlabSize = writeSlabSize;
        WriteBuffer = (byte*)NativeMemory.AlignedAlloc((nuint)writeSlabSize, 64);

        _manager = new UnmanagedMemoryManager(WriteBuffer, writeSlabSize);
    }

    // Reactor-thread only — called from Recycle in the reactor's recv/send error paths.
    /// <summary>
    /// Flags the connection closed, then either resolves the armed read awaiter
    /// with a closed snapshot or records a pending signal for the next read.
    /// </summary>
    public void MarkClosed()
    {
        Volatile.Write(ref _closed, 1);

        if (Interlocked.Exchange(ref _armed, 0) == 1)
        {
            _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), isClosed: true));
        }
        else
        {
            Volatile.Write(ref _pending, 1);
        }
    }

    /// <summary>
    /// Resets the per-life state: bumps the generation (invalidating outstanding
    /// read tokens), clears the coordination flags and write cursors, and resets
    /// the read signal and recv ring.
    /// </summary>
    internal void Clear()
    {
        Interlocked.Increment(ref _generation);

        Volatile.Write(ref _armed, 0);
        Volatile.Write(ref _pending, 0);
        Volatile.Write(ref _closed, 0);

        WriteHead = 0;
        WriteSubmitted = 0;
        WriteTail = 0;

        _readSignal.Reset();

        _recv.Reset(); // discard any leftover SPSC items
    }

    /// <summary>Frees the native write slab. Calling it again is a no-op.</summary>
    public void Dispose()
    {
        if (WriteBuffer != null)
        {
            NativeMemory.AlignedFree(WriteBuffer);
            WriteBuffer = null;
        }
    }
}
Loading
Loading