diff --git a/frameworks/minima/.dockerignore b/frameworks/minima/.dockerignore
new file mode 100644
index 00000000..cd42ee34
--- /dev/null
+++ b/frameworks/minima/.dockerignore
@@ -0,0 +1,2 @@
+bin/
+obj/
diff --git a/frameworks/minima/Connection/Connection.Incremental.cs b/frameworks/minima/Connection/Connection.Incremental.cs
new file mode 100644
index 00000000..bb3dad76
--- /dev/null
+++ b/frameworks/minima/Connection/Connection.Incremental.cs
@@ -0,0 +1,61 @@
+using System.Runtime.InteropServices;
+using Minima.Utils;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// Incremental-mode (IOU_PBUF_RING_INC) per-connection buffer-ring state.
+/// Each connection owns its own ring + slab; one buffer accumulates this
+/// connection's byte stream across many recvs. The reactor (Reactor.Incremental)
+/// drives setup/teardown and the refcounted recycle; this partial just holds the
+/// state and routes a handler return to the right reactor entry point.
+///
+/// All of these stay allocated across pool reuse and are freed in Dispose().
+///
+public sealed unsafe partial class Connection
+{
+ internal byte* BufRing; // kernel-shared ring control area
+ internal byte* BufSlab; // this connection's recv slab
+ internal ushort Bgid;
+ internal uint BufRingMask;
+ internal int BufRingEntries;
+ internal bool IncrementalMode;
+
+ internal int[]? CumOffset; // per-bid: byte offset where the next slice begins
+ internal int[]? RefCount; // per-bid: outstanding handler refs
+ internal bool[]? KernelDone; // per-bid: kernel finished appending (no F_BUF_MORE)
+
+ internal int Generation => Volatile.Read(ref _generation);
+
+ ///
+ /// Called by the handler to hand a consumed recv buffer back. Routes by mode:
+ /// incremental returns carry (fd, gen, bid) for refcounted recycle; the shared
+ /// path returns the bare bid to the reactor's single buf_ring.
+ ///
+ public void ReturnBuffer(in SpscRecvRing.Item item)
+ {
+ if (IncrementalMode)
+ {
+ _reactor.EnqueueReturnQIncremental(ClientFd, item.Gen, item.Bid);
+ }
+ else
+ {
+ _reactor.EnqueueReturnQ(item.Bid);
+ }
+ }
+
+ private void DisposeIncremental()
+ {
+ if (BufRing != null)
+ {
+ NativeMemory.AlignedFree(BufRing);
+ BufRing = null;
+ }
+ if (BufSlab != null)
+ {
+ NativeMemory.AlignedFree(BufSlab);
+ BufSlab = null;
+ }
+ }
+}
diff --git a/frameworks/minima/Connection/Connection.Read.cs b/frameworks/minima/Connection/Connection.Read.cs
new file mode 100644
index 00000000..f803895a
--- /dev/null
+++ b/frameworks/minima/Connection/Connection.Read.cs
@@ -0,0 +1,166 @@
+using System.Threading.Tasks.Sources;
+using Minima.Utils;
+
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// Per-connection state. The handler may run on any thread (e.g. resumed by
+/// a thread-pool timer); reactor-only side effects are funnelled through the
+/// MPSC queues on `Reactor`. Coordination uses Interlocked.Exchange on the
+/// arm flags and a sticky `_pending` to close the lost-wakeup race.
+///
+/// Lifetime is pool-managed: the reactor pops a Connection on accept (or new
+/// one if pool is empty), and pushes it back on teardown after `Clear()`. The
+/// `_generation` field is bumped on each `Clear` so stale `ValueTask` tokens
+/// from a previous connection life are detectable and return `Closed()`
+/// instead of leaking the new tenant's state.
+///
+public sealed unsafe partial class Connection : IValueTaskSource
+{
+ internal Connection SetFd(int fd)
+ {
+ ClientFd = fd;
+ return this;
+ }
+
+ private ManualResetValueTaskSourceCore _readSignal = new()
+ {
+ RunContinuationsAsynchronously = false,
+ };
+ private int _armed;
+ private int _pending;
+ private int _closed;
+
+ private readonly SpscRecvRing _recv = new(capacityPow2: 16);
+
+ public ValueTask ReadAsync()
+ {
+ if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1)
+ {
+ Volatile.Write(ref _pending, 0);
+ return new ValueTask(
+ new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
+ }
+
+ if (Volatile.Read(ref _closed) != 0)
+ {
+ return new ValueTask(RecvSnapshot.Closed());
+ }
+
+ if (Interlocked.Exchange(ref _armed, 1) == 1)
+ {
+ throw new InvalidOperationException("ReadAsync already armed.");
+ }
+
+ // Snapshot the generation as the IVTS token so a future Clear() can
+ // invalidate this awaiter if the connection gets pool-recycled.
+ int gen = Volatile.Read(ref _generation);
+
+ // Race recovery: re-check between arming and returning the IVTS task.
+ if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1 || Volatile.Read(ref _closed) != 0)
+ {
+ Volatile.Write(ref _pending, 0);
+ Interlocked.Exchange(ref _armed, 0);
+
+ return new ValueTask(
+ new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
+ }
+
+ return new ValueTask(this, (short)gen);
+ }
+
+ public bool TryGetItem(in RecvSnapshot snap, out SpscRecvRing.Item item)
+ => _recv.TryDequeueUntil(snap.Tail, out item);
+
+ public void ResetRead() => _readSignal.Reset();
+
+ public void Complete(int res, ushort bid, bool hasBuffer, byte* ptr)
+ {
+ if (!_recv.TryEnqueue(new SpscRecvRing.Item
+ {
+ Ptr = ptr,
+ Bid = bid,
+ Len = res,
+ HasBuffer = hasBuffer,
+ Gen = (ushort)Volatile.Read(ref _generation)
+ }))
+ {
+ Console.Error.WriteLine("[conn] recv queue overflow.");
+ if (hasBuffer)
+ {
+ _reactor.ReturnBufferDirect(bid);
+ }
+ Volatile.Write(ref _closed, 1);
+ }
+
+ if (Interlocked.Exchange(ref _armed, 0) == 1)
+ {
+ _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
+ }
+ else
+ {
+ Volatile.Write(ref _pending, 1);
+ }
+ }
+
+ internal void DrainRecv()
+ {
+ // Return any buffer IDs still sitting in the SPSC ring (handler exited
+ // before draining them, or a recv arrived after _closed was set).
+ while (_recv.TryDequeue(out SpscRecvRing.Item item))
+ {
+ if (item.HasBuffer)
+ {
+ _reactor.ReturnBufferDirect(item.Bid);
+ }
+ }
+ }
+
+ // =========================================================================
+ // IValueTaskSource plumbing — token (= snapshot of `_generation` at await
+ // time) is compared against the current `_generation` to detect stale
+ // awaiters from before a Clear()/pool reuse. Stale awaiters get a
+ // sentinel result rather than the new tenant's state.
+ //
+ // For the actual IVTS dispatch we pass `_readSignal.Version` /
+ // `_flushSignal.Version` to the underlying core (not `token`) because the
+ // core's version is bumped by ResetRead/CompleteFlush mid-life and is
+ // unrelated to the cross-life generation guard.
+ // =========================================================================
+
+ RecvSnapshot IValueTaskSource.GetResult(short token)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ return RecvSnapshot.Closed();
+ }
+
+ return _readSignal.GetResult(_readSignal.Version);
+ }
+
+ ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ return ValueTaskSourceStatus.Succeeded;
+ }
+
+ return _readSignal.GetStatus(_readSignal.Version);
+ }
+
+ void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ // Stale — run the continuation now so the awaiter unblocks and
+ // gets RecvSnapshot.Closed() from GetResult.
+ continuation(state);
+
+ return;
+ }
+
+ _readSignal.OnCompleted(continuation, state, _readSignal.Version, flags);
+ }
+}
diff --git a/frameworks/minima/Connection/Connection.Write.cs b/frameworks/minima/Connection/Connection.Write.cs
new file mode 100644
index 00000000..2f4c0ce9
--- /dev/null
+++ b/frameworks/minima/Connection/Connection.Write.cs
@@ -0,0 +1,185 @@
+using System.Buffers;
+using System.Threading.Tasks.Sources;
+using Minima.Utils;
+
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+public sealed unsafe partial class Connection : IValueTaskSource, IBufferWriter
+{
+ private readonly int _writeSlabSize;
+ internal byte* WriteBuffer;
+ internal int WriteHead;
+ internal int WriteTail;
+ internal int WriteInFlight;
+
+ private readonly UnmanagedMemoryManager _manager;
+
+ private ManualResetValueTaskSourceCore _flushSignal = new()
+ {
+ RunContinuationsAsynchronously = false,
+ };
+ private int _flushArmed;
+ private int _flushInProgress;
+
+ // IBufferWrite
+#region IBufferWrite
+
+ public Memory GetMemory(int sizeHint = 0)
+ {
+ if (Volatile.Read(ref _flushInProgress) != 0)
+ {
+ throw new InvalidOperationException("Cannot write while flush is in progress.");
+ }
+
+ int remaining = _writeSlabSize - WriteTail;
+ if (sizeHint > remaining)
+ {
+ throw new InvalidOperationException("Buffer too small.");
+ }
+
+ return _manager.Memory.Slice(WriteTail, remaining);
+ }
+
+ public Span GetSpan(int sizeHint = 0)
+ {
+ if (Volatile.Read(ref _flushInProgress) != 0)
+ {
+ throw new InvalidOperationException("Cannot write while flush is in progress.");
+ }
+
+ if (WriteTail + sizeHint > _writeSlabSize)
+ {
+ throw new InvalidOperationException("Write buffer too small.");
+ }
+
+ return new Span(WriteBuffer + WriteTail, _writeSlabSize - WriteTail);
+ }
+
+ public void Advance(int count)
+ {
+ if (Volatile.Read(ref _flushInProgress) != 0)
+ {
+ throw new InvalidOperationException("Cannot write while flush is in progress.");
+ }
+
+ WriteTail += count;
+ }
+
+#endregion
+
+ // Write to the inner buffer
+ public void Write(ReadOnlySpan source)
+ {
+ if (Volatile.Read(ref _flushInProgress) != 0)
+ {
+ throw new InvalidOperationException("Cannot write while flush is in progress.");
+ }
+
+ int len = source.Length;
+ if (WriteTail + len > _writeSlabSize)
+ {
+ throw new InvalidOperationException("Write buffer too small.");
+ }
+
+ source.CopyTo(new Span(WriteBuffer + WriteTail, len));
+ WriteTail += len;
+ }
+
+ // Flush inner buffer data to the kernel
+ public ValueTask FlushAsync()
+ {
+ // Connection already torn down (reactor saw EOF/error → MarkClosed): don't flush
+ // a removed connection — the handoff would reach a reactor that no longer knows
+ // this fd and the awaiter would hang. Return completed so the handler unwinds to
+ // its next ReadAsync, sees IsClosed, and exits.
+ if (Volatile.Read(ref _closed) == 1)
+ {
+ return default;
+ }
+
+ if (Interlocked.Exchange(ref _flushInProgress, 1) == 1)
+ {
+ throw new InvalidOperationException("FlushAsync already in progress.");
+ }
+
+ int target = WriteTail;
+ if (target == 0)
+ {
+ Volatile.Write(ref _flushInProgress, 0);
+
+ return default;
+ }
+
+ if (Interlocked.Exchange(ref _flushArmed, 1) == 1)
+ {
+ throw new InvalidOperationException("FlushAsync already armed.");
+ }
+
+ _flushSignal.Reset();
+ WriteInFlight = target;
+
+ int gen = Volatile.Read(ref _generation);
+
+ _reactor.EnqueueFlush(ClientFd);
+
+ // Race recovery (mirrors ReadAsync): if close raced in after the guard above,
+ // self-complete so we don't hang waiting on a send the reactor will never make.
+ if (Volatile.Read(ref _closed) == 1 && Interlocked.Exchange(ref _flushArmed, 0) == 1)
+ {
+ Volatile.Write(ref _flushInProgress, 0);
+ _flushSignal.SetResult(true);
+ }
+
+ return new ValueTask(this, (short)gen);
+ }
+
+ // Signal the FlushAsync was completed, called by the reactor's dispatcher send branch
+ internal void CompleteFlush()
+ {
+ WriteHead = 0;
+ WriteTail = 0;
+ WriteInFlight = 0;
+ Volatile.Write(ref _flushInProgress, 0);
+ Interlocked.Exchange(ref _flushArmed, 0);
+
+ _flushSignal.SetResult(true);
+ }
+
+ // IValueTaskSource
+#region IValueTaskSource
+
+ void IValueTaskSource.GetResult(short token)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ return;
+ }
+
+ _flushSignal.GetResult(_flushSignal.Version);
+ }
+
+ ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ return ValueTaskSourceStatus.Succeeded;
+ }
+
+ return _flushSignal.GetStatus(_flushSignal.Version);
+ }
+
+ void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
+ {
+ if (token != (short)Volatile.Read(ref _generation))
+ {
+ continuation(state);
+
+ return;
+ }
+ _flushSignal.OnCompleted(continuation, state, _flushSignal.Version, flags);
+ }
+
+#endregion
+}
\ No newline at end of file
diff --git a/frameworks/minima/Connection/Connection.cs b/frameworks/minima/Connection/Connection.cs
new file mode 100644
index 00000000..b6b3a064
--- /dev/null
+++ b/frameworks/minima/Connection/Connection.cs
@@ -0,0 +1,108 @@
+using System.Runtime.InteropServices;
+using Minima.Utils;
+
+namespace Minima;
+
+public sealed unsafe partial class Connection
+{
+ private readonly Reactor _reactor;
+
+ public int ClientFd { get; private set; }
+
+ // Bumped on Clear(); the low 16 bits are used as the IVTS token so stale
+ // awaiters can be detected after pool reuse.
+ private int _generation;
+
+ // Refcount: the connection has two owners — the reactor (recv side) and the
+ // handler (which may run off-reactor). Init to 2 on accept; each owner DecRef's
+ // when done; teardown (Recycle) runs only at refs==0, so a connection is never
+ // recycled or pool-reused while a handler is still in flight on another thread.
+ private int _refs;
+
+ public Connection(Reactor reactor, int fd, int writeSlabSize = 1024 * 16)
+ {
+ _reactor = reactor;
+ ClientFd = fd;
+ _writeSlabSize = writeSlabSize;
+ WriteBuffer = (byte*)NativeMemory.AlignedAlloc((nuint)writeSlabSize, 64);
+
+ _manager = new UnmanagedMemoryManager(WriteBuffer, writeSlabSize);
+ }
+
+ // =========================================================================
+ // Pool lifecycle — invoked from Reactor.Dispatch's recv/send error paths.
+ // Reactor-thread only.
+ //
+ // teardown: MarkClosed() → wake awaiters with closed=1
+ // DrainRecv() → return any in-flight buf_ring items
+ // close(fd)
+ // Clear() → reset state, bump _generation
+ // push to pool, OR Dispose() if pool is full
+ // =========================================================================
+
+ public void MarkClosed()
+ {
+ Volatile.Write(ref _closed, 1);
+
+ if (Interlocked.Exchange(ref _armed, 0) == 1)
+ {
+ _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), isClosed: true));
+ }
+ else
+ {
+ Volatile.Write(ref _pending, 1);
+ }
+
+ if (Interlocked.Exchange(ref _flushArmed, 0) == 1)
+ {
+ Volatile.Write(ref _flushInProgress, 0);
+ _flushSignal.SetResult(true);
+ }
+ }
+
+ // Init to 2 (reactor + handler) at accept.
+ internal void InitRefs() => Volatile.Write(ref _refs, 2);
+
+ // Release one owner's ref. Whoever drives it to 0 hands the connection to the
+ // reactor for teardown (close + Clear + pool) — never recycled before both done.
+ internal void DecRef()
+ {
+ if (Interlocked.Decrement(ref _refs) == 0)
+ {
+ _reactor.EnqueueRecycle(this);
+ }
+ }
+
+ internal void Clear()
+ {
+ // Bump generation first — readers of IVTS plumbing observe this via
+ // Volatile.Read and stale tokens get RecvSnapshot.Closed() / no-op.
+ Interlocked.Increment(ref _generation);
+
+ Volatile.Write(ref _armed, 0);
+ Volatile.Write(ref _pending, 0);
+ Volatile.Write(ref _closed, 0);
+ Volatile.Write(ref _flushArmed, 0);
+ Volatile.Write(ref _flushInProgress, 0);
+
+ WriteHead = 0;
+ WriteTail = 0;
+ WriteInFlight = 0;
+
+ _readSignal.Reset();
+ _flushSignal.Reset();
+
+ _recv.Reset(); // discard any leftover SPSC items
+ IncrementalMode = false; // per-conn ring (if any) was torn down before Clear
+ }
+
+ public void Dispose()
+ {
+ if (WriteBuffer != null)
+ {
+ NativeMemory.AlignedFree(WriteBuffer);
+ WriteBuffer = null;
+ }
+ DisposeIncremental();
+ }
+}
\ No newline at end of file
diff --git a/frameworks/minima/Connection/ConnectionDualPipe.cs b/frameworks/minima/Connection/ConnectionDualPipe.cs
new file mode 100644
index 00000000..7b40e742
--- /dev/null
+++ b/frameworks/minima/Connection/ConnectionDualPipe.cs
@@ -0,0 +1,16 @@
+using System.IO.Pipelines;
+
+namespace Minima;
+
+public sealed class ConnectionDualPipe : IDuplexPipe
+{
+ public PipeReader Input { get; }
+ public PipeWriter Output { get; }
+
+ public ConnectionDualPipe(Connection connection)
+ {
+ ArgumentNullException.ThrowIfNull(connection);
+ Input = new ConnectionPipeReader(connection);
+ Output = new ConnectionPipeWriter(connection);
+ }
+}
\ No newline at end of file
diff --git a/frameworks/minima/Connection/ConnectionPipeReader.cs b/frameworks/minima/Connection/ConnectionPipeReader.cs
new file mode 100644
index 00000000..14d9ca6c
--- /dev/null
+++ b/frameworks/minima/Connection/ConnectionPipeReader.cs
@@ -0,0 +1,181 @@
+using System.Buffers;
+using System.IO.Pipelines;
+using Minima.Utils;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// Adapts Minima's raw read API (ReadAsync + TryGetItem
+/// + ReturnBuffer) to a standard . Recv buffers are
+/// exposed zero-copy as a ReadOnlySequence<byte> (one segment per buffer)
+/// and held until AdvanceTo consumes them, at which point fully-consumed buffers
+/// are returned to the reactor.
+///
+/// Convenience/compat layer for PipeReader consumers — the raw ReadAsync/
+/// TryGetItem path stays the faster one (this adds held-buffer + sequence
+/// bookkeeping per read).
+///
+public sealed class ConnectionPipeReader : PipeReader
+{
+ private readonly Connection _conn;
+ private readonly List _held = new(16);
+ private ReadOnlySequence _lastSequence;
+
+ private bool _completed;
+ private bool _cancelRequested;
+ private bool _connectionClosed;
+
+ private readonly struct Held
+ {
+ public readonly ReadOnlyMemory Memory;
+ public readonly SpscRecvRing.Item Item;
+
+ public Held(ReadOnlyMemory memory, SpscRecvRing.Item item)
+ {
+ Memory = memory;
+ Item = item;
+ }
+
+ public Held WithMemory(ReadOnlyMemory memory) => new(memory, Item);
+ }
+
+ public ConnectionPipeReader(Connection connection)
+ {
+ _conn = connection ?? throw new ArgumentNullException(nameof(connection));
+ }
+
+ public override async ValueTask ReadAsync(CancellationToken cancellationToken = default)
+ {
+ ThrowIfCompleted();
+
+ if (_cancelRequested)
+ {
+ _cancelRequested = false;
+ return new ReadResult(BuildSequence(), isCanceled: true, isCompleted: _connectionClosed);
+ }
+
+ // Anything still held from a previous read that wasn't fully consumed.
+ if (_held.Count > 0)
+ return new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed);
+
+ if (_connectionClosed)
+ return new ReadResult(default, isCanceled: false, isCompleted: true);
+
+ RecvSnapshot snap = await _conn.ReadAsync();
+
+ while (_conn.TryGetItem(snap, out SpscRecvRing.Item item))
+ {
+ if (item.HasBuffer)
+ _held.Add(new Held(item.AsMemoryManager().Memory, item));
+ }
+
+ _conn.ResetRead();
+
+ if (snap.IsClosed)
+ _connectionClosed = true;
+
+ if (_cancelRequested)
+ {
+ _cancelRequested = false;
+ return new ReadResult(BuildSequence(), isCanceled: true, isCompleted: _connectionClosed);
+ }
+
+ return new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed);
+ }
+
+ public override bool TryRead(out ReadResult result)
+ {
+ ThrowIfCompleted();
+
+ if (_held.Count > 0)
+ {
+ result = new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed);
+ return true;
+ }
+
+ if (_connectionClosed)
+ {
+ result = new ReadResult(default, isCanceled: false, isCompleted: true);
+ return true;
+ }
+
+ result = default;
+ return false;
+ }
+
+ public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);
+
+ public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+ {
+ if (_held.Count == 0)
+ return;
+
+ long consumedBytes = _lastSequence.Slice(0, consumed).Length;
+
+ while (_held.Count > 0 && consumedBytes > 0)
+ {
+ Held seg = _held[0];
+ int available = seg.Memory.Length;
+
+ if (consumedBytes >= available)
+ {
+ // Whole buffer consumed — return it to the reactor.
+ _conn.ReturnBuffer(seg.Item);
+ _held.RemoveAt(0);
+ consumedBytes -= available;
+ }
+ else
+ {
+ // Partial — keep the unconsumed tail of this buffer.
+ _held[0] = seg.WithMemory(seg.Memory[(int)consumedBytes..]);
+ consumedBytes = 0;
+ }
+ }
+ }
+
+ public override void CancelPendingRead() => _cancelRequested = true;
+
+ public override void Complete(Exception? exception = null)
+ {
+ if (_completed)
+ return;
+
+ _completed = true;
+
+ for (int i = 0; i < _held.Count; i++)
+ _conn.ReturnBuffer(_held[i].Item);
+
+ _held.Clear();
+ }
+
+ private ReadOnlySequence BuildSequence()
+ {
+ if (_held.Count == 0)
+ {
+ _lastSequence = default;
+ return _lastSequence;
+ }
+
+ if (_held.Count == 1)
+ {
+ _lastSequence = new ReadOnlySequence(_held[0].Memory);
+ return _lastSequence;
+ }
+
+ var head = new RingSegment(_held[0].Memory, _held[0].Item.Bid);
+ RingSegment tail = head;
+
+ for (int i = 1; i < _held.Count; i++)
+ tail = tail.Append(_held[i].Memory, _held[i].Item.Bid);
+
+ _lastSequence = new ReadOnlySequence(head, 0, tail, tail.Memory.Length);
+ return _lastSequence;
+ }
+
+ private void ThrowIfCompleted()
+ {
+ if (_completed)
+ throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
+ }
+}
diff --git a/frameworks/minima/Connection/ConnectionPipeWriter.cs b/frameworks/minima/Connection/ConnectionPipeWriter.cs
new file mode 100644
index 00000000..56be337d
--- /dev/null
+++ b/frameworks/minima/Connection/ConnectionPipeWriter.cs
@@ -0,0 +1,63 @@
+using System.IO.Pipelines;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// Adapts Minima's write API (GetMemory/GetSpan/Advance/
+/// FlushAsync) to a standard , so PipeWriter-based code
+/// can write responses through the connection's per-connection slab.
+/// A thin wrapper — all the work lives in Connection.
+///
+public sealed class ConnectionPipeWriter : PipeWriter
+{
+ private readonly Connection _conn;
+ private bool _completed;
+ private bool _cancelRequested;
+ private long _unflushed;
+
+ public ConnectionPipeWriter(Connection connection)
+ {
+ _conn = connection ?? throw new ArgumentNullException(nameof(connection));
+ }
+
+ public override bool CanGetUnflushedBytes => true;
+ public override long UnflushedBytes => _unflushed;
+
+ public override Memory GetMemory(int sizeHint = 0) => _conn.GetMemory(sizeHint);
+
+ public override Span GetSpan(int sizeHint = 0) => _conn.GetSpan(sizeHint);
+
+ public override void Advance(int bytes)
+ {
+ _unflushed += bytes;
+ _conn.Advance(bytes);
+ }
+
+ public override ValueTask FlushAsync(CancellationToken cancellationToken = default)
+ {
+ if (_cancelRequested)
+ {
+ _cancelRequested = false;
+ return new ValueTask(new FlushResult(isCanceled: true, isCompleted: _completed));
+ }
+
+ _unflushed = 0;
+ ValueTask inner = _conn.FlushAsync();
+
+ if (inner.IsCompletedSuccessfully)
+ return new ValueTask(new FlushResult(isCanceled: false, isCompleted: _completed));
+
+ return AwaitFlush(inner);
+ }
+
+ private async ValueTask AwaitFlush(ValueTask inner)
+ {
+ await inner;
+ return new FlushResult(isCanceled: false, isCompleted: _completed);
+ }
+
+ public override void CancelPendingFlush() => _cancelRequested = true;
+
+ public override void Complete(Exception? exception = null) => _completed = true;
+}
diff --git a/frameworks/minima/Connection/RecvSnapshot.cs b/frameworks/minima/Connection/RecvSnapshot.cs
new file mode 100644
index 00000000..015afc4a
--- /dev/null
+++ b/frameworks/minima/Connection/RecvSnapshot.cs
@@ -0,0 +1,15 @@
+namespace Minima;
+
+public readonly struct RecvSnapshot
+{
+ public readonly long Tail;
+ public readonly bool IsClosed;
+
+ public RecvSnapshot(long tail, bool isClosed)
+ {
+ Tail = tail;
+ IsClosed = isClosed;
+ }
+
+ public static RecvSnapshot Closed() => new(0, isClosed: true);
+}
\ No newline at end of file
diff --git a/frameworks/minima/Dockerfile b/frameworks/minima/Dockerfile
new file mode 100644
index 00000000..49948ab0
--- /dev/null
+++ b/frameworks/minima/Dockerfile
@@ -0,0 +1,15 @@
+FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build
+WORKDIR /source
+COPY minima.csproj ./
+RUN dotnet restore
+COPY . .
+RUN dotnet publish -c Release --no-self-contained -o /app/out
+
+# Minima drives io_uring through direct libc syscalls (no liburing). The bench
+# harness runs containers with --security-opt seccomp=unconfined (required for
+# io_uring_setup/enter); engine="io_uring" makes validate.sh enable it too.
+FROM mcr.microsoft.com/dotnet/runtime:10.0
+WORKDIR /app
+COPY --from=build /app/out ./
+EXPOSE 8080
+ENTRYPOINT ["dotnet", "minima.dll"]
diff --git a/frameworks/minima/Program.cs b/frameworks/minima/Program.cs
new file mode 100644
index 00000000..403b3c88
--- /dev/null
+++ b/frameworks/minima/Program.cs
@@ -0,0 +1,559 @@
+using System.Buffers.Text;
+using System.Text;
+using System.Text.Json;
+using Minima.Utils;
+
+namespace Minima;
+
+///
+/// minima — the Minima io_uring engine serving the H1-isolated profiles
+/// (baseline, pipelined, limited-conn). Minima's engine is vendored unchanged
+/// (per-reactor SO_REUSEPORT + multishot accept, multishot recv into a provided
+/// buffer ring, RCA=false inline continuations + the reactor-thread short-circuit
+/// in Enqueue*). Only the request handler is ours: a hand-rolled HTTP/1.1 parser
+/// on Minima's raw recv/send API. No HTTP framework.
+///
+/// Endpoints:
+/// GET/POST /baseline11?a=&b= -> text/plain "a + b (+ body)"
+/// GET /pipeline -> text/plain "ok"
+/// GET /json/{count}?m=N -> application/json, per-item total = price*quantity*N
+///
+internal static class Program
+{
+ private static int Main()
+ {
+ int reactors = Environment.ProcessorCount;
+ if (int.TryParse(Environment.GetEnvironmentVariable("MINIMA_REACTORS"), out int r) && r > 0)
+ reactors = r;
+
+ ushort port = 8080;
+ if (ushort.TryParse(Environment.GetEnvironmentVariable("MINIMA_PORT"), out ushort p) && p > 0)
+ port = p;
+
+ var config = new ServerConfig
+ {
+ Port = port,
+ ReactorCount = reactors,
+ UsePipe = false,
+ Incremental = false,
+ RecvBufferSize = 16 * 1024,
+ BufferRingEntries = 1024,
+ };
+
+ Console.WriteLine($"[minima] {config.ReactorCount} reactors on :{config.Port} " +
+ $"(incremental={config.Incremental}) — hand-rolled HTTP/1.1");
+
+ var dsPath = Environment.GetEnvironmentVariable("MINIMA_DATASET") ?? "/data/dataset.json";
+ var dataset = Dataset.Load(dsPath);
+ Console.WriteLine($"[minima] loaded {dataset.Count} dataset items from {dsPath}");
+
+ Handler.Init(config, dataset);
+
+ var threads = new Thread[config.ReactorCount];
+ for (int i = 0; i < config.ReactorCount; i++)
+ {
+ var reactor = new Reactor(i, config);
+ threads[i] = new Thread(reactor.Run) { Name = $"reactor-{i}", IsBackground = false };
+ threads[i].Start();
+ }
+ foreach (var t in threads) t.Join();
+ return 0;
+ }
+}
+
+internal static class Handler
+{
+ private static int _slab = 16 * 1024;
+ private static Dataset _ds = Dataset.Empty;
+
+ public static void Init(ServerConfig config, Dataset ds)
+ {
+ _slab = config.WriteSlabSize;
+ _ds = ds;
+ }
+
+ public static async Task HandleAsync(Reactor reactor, Connection conn)
+ {
+ var s = new HttpSession(_ds);
+ try
+ {
+ while (true)
+ {
+ RecvSnapshot snap = await conn.ReadAsync();
+ while (conn.TryGetItem(snap, out SpscRecvRing.Item item))
+ {
+ if (item.HasBuffer)
+ {
+ s.Feed(item.AsSpan());
+ conn.ReturnBuffer(in item);
+ }
+ }
+
+ int sent = 0;
+ while (sent < s.OutLen)
+ {
+ int chunk = Math.Min(s.OutLen - sent, _slab);
+ conn.Write(s.Out.AsSpan(sent, chunk));
+ await conn.FlushAsync();
+ sent += chunk;
+ }
+ s.OutLen = 0;
+
+ if (snap.IsClosed || s.WantClose)
+ return;
+
+ conn.ResetRead();
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"[r{reactor.Id}] http handler crash fd={conn.ClientFd}: {ex}");
+ }
+ finally
+ {
+ conn.DecRef();
+ }
+ }
+
+ public static Task HandlePipeAsync(Reactor reactor, Connection conn) => HandleAsync(reactor, conn);
+}
+
+///
+/// Hand-rolled HTTP/1.1: accumulates inbound bytes, parses complete requests
+/// (request line, headers, Content-Length + chunked bodies, keep-alive,
+/// pipelining, fragmented reads), and appends responses to .
+///
+internal sealed class HttpSession
+{
+ private readonly Dataset _ds;
+ private byte[] _carry = new byte[2048];
+ private int _carryLen;
+
+ public byte[] Out = new byte[4096];
+ public int OutLen;
+ public bool WantClose;
+
+ public HttpSession(Dataset ds) => _ds = ds;
+
+ public void Feed(ReadOnlySpan data)
+ {
+ AppendCarry(data);
+ int pos = 0;
+ while (TryOne(_carry.AsSpan(pos, _carryLen - pos), out int consumed, out bool close))
+ {
+ pos += consumed;
+ if (close) { WantClose = true; break; }
+ }
+ if (pos > 0)
+ {
+ int rem = _carryLen - pos;
+ if (rem > 0) Array.Copy(_carry, pos, _carry, 0, rem);
+ _carryLen = rem;
+ }
+ }
+
+ /// Parse one request from buf; append its response to Out. Returns false if
+ /// the request isn't fully buffered yet.
+ private bool TryOne(ReadOnlySpan buf, out int consumed, out bool close)
+ {
+ consumed = 0;
+ close = false;
+
+ int he = buf.IndexOf("\r\n\r\n"u8);
+ if (he < 0) return false;
+ ReadOnlySpan head = buf[..he];
+
+ int rlEnd = head.IndexOf("\r\n"u8);
+ if (rlEnd < 0) rlEnd = head.Length;
+ ReadOnlySpan reqLine = head[..rlEnd];
+
+ ReadOnlySpan target = default;
+ int sp1 = reqLine.IndexOf((byte)' ');
+ if (sp1 >= 0)
+ {
+ ReadOnlySpan rest = reqLine[(sp1 + 1)..];
+ int sp2 = rest.IndexOf((byte)' ');
+ target = sp2 >= 0 ? rest[..sp2] : rest;
+ }
+
+ int contentLength = -1;
+ bool chunked = false;
+ ReadOnlySpan hdrs = head[Math.Min(rlEnd + 2, head.Length)..];
+ while (hdrs.Length > 0)
+ {
+ int nl = hdrs.IndexOf("\r\n"u8);
+ ReadOnlySpan line = nl >= 0 ? hdrs[..nl] : hdrs;
+ int colon = line.IndexOf((byte)':');
+ if (colon >= 0)
+ {
+ ReadOnlySpan name = line[..colon];
+ ReadOnlySpan val = Trim(line[(colon + 1)..]);
+ if (CiEq(name, "content-length"u8))
+ {
+ if (Utf8Parser.TryParse(val, out int cl, out _)) contentLength = cl;
+ }
+ else if (CiEq(name, "transfer-encoding"u8) && CiContains(val, "chunked"u8))
+ {
+ chunked = true;
+ }
+ else if (CiEq(name, "connection"u8) && CiEq(val, "close"u8))
+ {
+ close = true;
+ }
+ }
+ if (nl < 0) break;
+ hdrs = hdrs[(nl + 2)..];
+ }
+
+ int bodyStart = he + 4;
+ long bodyInt;
+ int total;
+ if (chunked)
+ {
+ if (!DecodeChunked(buf[bodyStart..], out bodyInt, out int used)) return false;
+ total = bodyStart + used;
+ }
+ else if (contentLength > 0)
+ {
+ if (buf.Length < bodyStart + contentLength) return false;
+ bodyInt = ParseLoose(buf.Slice(bodyStart, contentLength));
+ total = bodyStart + contentLength;
+ }
+ else
+ {
+ bodyInt = 0;
+ total = bodyStart;
+ }
+
+ Respond(target, bodyInt, close);
+ consumed = total;
+ return true;
+ }
+
+ private void Respond(ReadOnlySpan target, long bodyInt, bool close)
+ {
+ int q = target.IndexOf((byte)'?');
+ ReadOnlySpan path = q >= 0 ? target[..q] : target;
+ ReadOnlySpan query = q >= 0 ? target[(q + 1)..] : default;
+
+ if (path.SequenceEqual("/pipeline"u8))
+ {
+ WriteResp("ok"u8, close);
+ }
+ else if (path.StartsWith("/json/"u8))
+ {
+ ReadOnlySpan tail = path[6..];
+ if (Utf8Parser.TryParse(tail, out int count, out int used) && used == tail.Length
+ && count >= 1 && count <= _ds.Count)
+ {
+ JsonResp(count, ParseM(query), close);
+ }
+ else
+ {
+ Write404(close);
+ }
+ }
+ else
+ {
+ long sum = SumAB(query) + bodyInt;
+ Span num = stackalloc byte[24];
+ Utf8Formatter.TryFormat(sum, num, out int n);
+ WriteResp(num[..n], close);
+ }
+ }
+
+ private void WriteResp(ReadOnlySpan body, bool close)
+ {
+ AppendOut("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: "u8);
+ Span num = stackalloc byte[16];
+ Utf8Formatter.TryFormat(body.Length, num, out int n);
+ AppendOut(num[..n]);
+ AppendOut(close ? "\r\nConnection: close\r\n\r\n"u8 : "\r\n\r\n"u8);
+ AppendOut(body);
+ }
+
+ private void JsonResp(int count, long m, bool close)
+ {
+ AppendOut("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: "u8);
+ int clOff = OutLen;
+ AppendOut("000000\r\n"u8); // 6-digit zero-padded Content-Length placeholder
+ if (close) AppendOut("Connection: close\r\n"u8);
+ AppendOut("\r\n"u8);
+ int bodyStart = OutLen;
+
+ // Serialize from the parsed model on every request — no precomputed
+ // fragments. Synchronous on the reactor thread (a few microseconds of
+ // CPU): offloading to the thread pool would add a hop and resume the
+ // handler off-reactor, defeating the RCA short-circuit on Flush.
+ AppendOut("{\"items\":["u8);
+ for (int i = 0; i < count; i++)
+ {
+ if (i > 0) AppendOut(","u8);
+ ref readonly Item it = ref _ds.Items[i];
+ AppendOut("{\"id\":"u8);
+ AppendLong(it.Id);
+ AppendOut(",\"name\":\""u8);
+ AppendOut(it.Name);
+ AppendOut("\",\"category\":\""u8);
+ AppendOut(it.Category);
+ AppendOut("\",\"price\":"u8);
+ AppendLong(it.Price);
+ AppendOut(",\"quantity\":"u8);
+ AppendLong(it.Quantity);
+ AppendOut(it.Active ? ",\"active\":true,\"tags\":["u8 : ",\"active\":false,\"tags\":["u8);
+ for (int t = 0; t < it.Tags.Length; t++)
+ {
+ if (t > 0) AppendOut(","u8);
+ AppendOut("\""u8);
+ AppendOut(it.Tags[t]);
+ AppendOut("\""u8);
+ }
+ AppendOut("],\"rating\":{\"score\":"u8);
+ AppendLong(it.Score);
+ AppendOut(",\"count\":"u8);
+ AppendLong(it.RatingCount);
+ AppendOut("},\"total\":"u8);
+ AppendLong(it.Price * it.Quantity * m);
+ AppendOut("}"u8);
+ }
+ AppendOut("],\"count\":"u8);
+ AppendLong(count);
+ AppendOut("}"u8);
+
+ // Backfill the 6-digit zero-padded Content-Length now the body length is known.
+ int v = OutLen - bodyStart;
+ for (int d = clOff + 5; d >= clOff; d--) { Out[d] = (byte)('0' + v % 10); v /= 10; }
+ }
+
+ private void Write404(bool close)
+ {
+ AppendOut("HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n"u8);
+ if (close) AppendOut("Connection: close\r\n"u8);
+ AppendOut("\r\nNot Found"u8);
+ }
+
+ private void AppendLong(long v)
+ {
+ Span num = stackalloc byte[20];
+ Utf8Formatter.TryFormat(v, num, out int n);
+ AppendOut(num[..n]);
+ }
+
+ private static long ParseM(ReadOnlySpan query)
+ {
+ while (query.Length > 0)
+ {
+ int amp = query.IndexOf((byte)'&');
+ ReadOnlySpan kv = amp >= 0 ? query[..amp] : query;
+ if (kv.Length >= 2 && kv[0] == (byte)'m' && kv[1] == (byte)'=')
+ {
+ Utf8Parser.TryParse(kv[2..], out long m, out _);
+ return m;
+ }
+ if (amp < 0) break;
+ query = query[(amp + 1)..];
+ }
+ return 1;
+ }
+
+ private static long SumAB(ReadOnlySpan query)
+ {
+ long a = 0, b = 0;
+ while (query.Length > 0)
+ {
+ int amp = query.IndexOf((byte)'&');
+ ReadOnlySpan kv = amp >= 0 ? query[..amp] : query;
+ int eq = kv.IndexOf((byte)'=');
+ if (eq >= 0)
+ {
+ ReadOnlySpan k = kv[..eq];
+ ReadOnlySpan v = kv[(eq + 1)..];
+ if (k.SequenceEqual("a"u8)) a = ParseLoose(v);
+ else if (k.SequenceEqual("b"u8)) b = ParseLoose(v);
+ }
+ if (amp < 0) break;
+ query = query[(amp + 1)..];
+ }
+ return a + b;
+ }
+
+ /// Decode a chunked body into an integer. Returns false if the terminating
+ /// 0-chunk isn't fully buffered. Bodies in these profiles are tiny.
+ private static bool DecodeChunked(ReadOnlySpan buf, out long bodyInt, out int used)
+ {
+ bodyInt = 0;
+ used = 0;
+ Span body = stackalloc byte[256];
+ int blen = 0;
+ int pos = 0;
+ while (true)
+ {
+ int nl = buf[pos..].IndexOf("\r\n"u8);
+ if (nl < 0) return false;
+ if (!ParseHex(buf.Slice(pos, nl), out int size)) return false;
+ pos += nl + 2;
+ if (size == 0)
+ {
+ int end = buf[pos..].IndexOf("\r\n"u8); // final CRLF (no trailers)
+ if (end < 0) return false;
+ used = pos + end + 2;
+ bodyInt = ParseLoose(body[..blen]);
+ return true;
+ }
+ if (buf.Length < pos + size + 2) return false;
+ if (blen + size <= body.Length)
+ {
+ buf.Slice(pos, size).CopyTo(body[blen..]);
+ blen += size;
+ }
+ pos += size;
+ if (!buf.Slice(pos, 2).SequenceEqual("\r\n"u8)) return false;
+ pos += 2;
+ }
+ }
+
+ // ── byte helpers ─────────────────────────────────────────────────────────
+ private void AppendCarry(ReadOnlySpan d)
+ {
+ if (_carry.Length < _carryLen + d.Length)
+ Array.Resize(ref _carry, Math.Max(_carryLen + d.Length, _carry.Length * 2));
+ d.CopyTo(_carry.AsSpan(_carryLen));
+ _carryLen += d.Length;
+ }
+
+ private void AppendOut(ReadOnlySpan d)
+ {
+ if (Out.Length < OutLen + d.Length)
+ Array.Resize(ref Out, Math.Max(OutLen + d.Length, Out.Length * 2));
+ d.CopyTo(Out.AsSpan(OutLen));
+ OutLen += d.Length;
+ }
+
+ private static ReadOnlySpan Trim(ReadOnlySpan b)
+ {
+ int s = 0, e = b.Length;
+ while (s < e && (b[s] == (byte)' ' || b[s] == (byte)'\t')) s++;
+ while (e > s && (b[e - 1] == (byte)' ' || b[e - 1] == (byte)'\t')) e--;
+ return b[s..e];
+ }
+
+ private static bool CiEq(ReadOnlySpan a, ReadOnlySpan b)
+ {
+ if (a.Length != b.Length) return false;
+ for (int i = 0; i < a.Length; i++)
+ if (Lower(a[i]) != Lower(b[i])) return false;
+ return true;
+ }
+
+ private static bool CiContains(ReadOnlySpan h, ReadOnlySpan n)
+ {
+ if (n.Length == 0 || h.Length < n.Length) return false;
+ for (int i = 0; i + n.Length <= h.Length; i++)
+ if (CiEq(h.Slice(i, n.Length), n)) return true;
+ return false;
+ }
+
+ private static byte Lower(byte c) => (byte)(c >= 'A' && c <= 'Z' ? c + 32 : c);
+
+ private static long ParseLoose(ReadOnlySpan s)
+ {
+ int i = 0;
+ while (i < s.Length && (s[i] == ' ' || s[i] == '\t' || s[i] == '\r' || s[i] == '\n')) i++;
+ bool neg = false;
+ if (i < s.Length && s[i] == '-') { neg = true; i++; }
+ long n = 0;
+ while (i < s.Length && s[i] >= '0' && s[i] <= '9') { n = n * 10 + (s[i] - '0'); i++; }
+ return neg ? -n : n;
+ }
+
+ private static bool ParseHex(ReadOnlySpan b, out int val)
+ {
+ val = 0;
+ bool any = false;
+ foreach (byte c in b)
+ {
+ int d;
+ if (c >= '0' && c <= '9') d = c - '0';
+ else if (c >= 'a' && c <= 'f') d = c - 'a' + 10;
+ else if (c >= 'A' && c <= 'F') d = c - 'A' + 10;
+ else if (c == ';' || c == ' ') break;
+ else return any;
+ val = val * 16 + d;
+ any = true;
+ }
+ return any;
+ }
+}
+
+///
+/// A dataset item parsed into its model fields (string values stored as UTF-8).
+/// The json handler serializes these field-by-field on every request.
+///
+internal readonly struct Item
+{
+ public readonly long Id, Price, Quantity, Score, RatingCount;
+ public readonly bool Active;
+ public readonly byte[] Name, Category;
+ public readonly byte[][] Tags;
+
+ public Item(long id, byte[] name, byte[] category, long price, long quantity,
+ bool active, byte[][] tags, long score, long ratingCount)
+ {
+ Id = id; Name = name; Category = category; Price = price; Quantity = quantity;
+ Active = active; Tags = tags; Score = score; RatingCount = ratingCount;
+ }
+}
+
+///
+/// Dataset for the json profile — items parsed into model fields at startup so
+/// the handler serializes the full JSON from the model on every request (no
+/// precomputed / cached response fragments). Read-only after load, shared across
+/// reactor threads. String values are clean ASCII in the bench dataset, so the
+/// handler emits them without escaping.
+///
+internal sealed class Dataset
+{
+ public readonly Item[] Items;
+ public int Count => Items.Length;
+
+ public static readonly Dataset Empty = new(Array.Empty- ());
+
+ private Dataset(Item[] items) { Items = items; }
+
+ public static Dataset Load(string path)
+ {
+ try
+ {
+ using var doc = JsonDocument.Parse(File.ReadAllBytes(path));
+ JsonElement root = doc.RootElement;
+ int n = root.GetArrayLength();
+ var items = new Item[n];
+ int i = 0;
+ foreach (JsonElement e in root.EnumerateArray())
+ {
+ JsonElement rating = e.GetProperty("rating");
+ JsonElement tagsEl = e.GetProperty("tags");
+ var tags = new byte[tagsEl.GetArrayLength()][];
+ int t = 0;
+ foreach (JsonElement tag in tagsEl.EnumerateArray())
+ tags[t++] = Encoding.UTF8.GetBytes(tag.GetString() ?? "");
+ items[i++] = new Item(
+ e.GetProperty("id").GetInt64(),
+ Encoding.UTF8.GetBytes(e.GetProperty("name").GetString() ?? ""),
+ Encoding.UTF8.GetBytes(e.GetProperty("category").GetString() ?? ""),
+ e.GetProperty("price").GetInt64(),
+ e.GetProperty("quantity").GetInt64(),
+ e.GetProperty("active").GetBoolean(),
+ tags,
+ rating.GetProperty("score").GetInt64(),
+ rating.GetProperty("count").GetInt64());
+ }
+ return new Dataset(items);
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"[minima] dataset load failed ({path}): {ex.Message}");
+ return Empty;
+ }
+ }
+}
diff --git a/frameworks/minima/README.md b/frameworks/minima/README.md
new file mode 100644
index 00000000..e2f10a56
--- /dev/null
+++ b/frameworks/minima/README.md
@@ -0,0 +1,32 @@
+# minima
+
+The **Minima** io_uring engine serving the H1-isolated profiles (`baseline`,
+`pipelined`, `limited-conn`, `json`). Minima is a from-scratch C# multi-reactor
+io_uring server; this entry vendors the engine unchanged and adds a hand-rolled
+HTTP/1.1 handler (no HTTP framework).
+
+## Engine (vendored as-is)
+- **Per-reactor SO_REUSEPORT + multishot accept** — each reactor thread owns its
+ own listener and ring; the kernel shards connections (no central acceptor).
+- **Multishot recv into a provided buffer ring**.
+- **RCA=false inline continuations** — handler resumes inline on the reactor
+ thread, and `Enqueue{Return,Flush,Recycle}` short-circuit straight to the ring
+ (no MPSC queue / eventfd wake) since the caller is the reactor thread.
+
+## Handler (`Program.cs`)
+Hand-rolled HTTP/1.1: request line + headers, `Content-Length` and chunked
+bodies, keep-alive, pipelining (responses batched per drain), and fragmented-read
+reassembly.
+
+| Endpoint | Response |
+|---|---|
+| `GET/POST /baseline11?a=&b=` | `text/plain` — `a + b` (+ POST body as an integer) |
+| `GET /pipeline` | `text/plain` — `ok` |
+| `GET /json/{count}?m=N` | `application/json` — `{items:[…],count}`, each item with `total = price*quantity*N` |
+
+For `json`, each item's static JSON is precomputed from the mounted
+`/data/dataset.json` at startup; a request only appends the dynamic `total`.
+
+io_uring needs `seccomp=unconfined` (harness-provided; `engine: "io_uring"` makes
+validate.sh enable it). `MINIMA_PORT` / `MINIMA_REACTORS` / `MINIMA_DATASET`
+override for local runs.
diff --git a/frameworks/minima/Reactor/Reactor.Incremental.cs b/frameworks/minima/Reactor/Reactor.Incremental.cs
new file mode 100644
index 00000000..13b13518
--- /dev/null
+++ b/frameworks/minima/Reactor/Reactor.Incremental.cs
@@ -0,0 +1,306 @@
+using System.Runtime.InteropServices;
+using Minima.Utils;
+using static Minima.Native;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// Incremental-buffer (IOU_PBUF_RING_INC) path. Each connection gets its own
+/// buffer ring: one buffer accumulates that connection's byte stream across many
+/// recvs, so buffers are recycled only when the kernel is done appending AND the
+/// handler has returned every slice it was handed. Selected per reactor by the
+/// `_incremental` flag; the shared-ring path in Reactor.cs is untouched.
+///
+public sealed unsafe partial class Reactor
+{
+ private Stack? _freeGids;
+ private Mpsc? _returnQInc;
+
+ private void InitIncremental()
+ {
+ // Per-connection rings; no shared ring. GID 1 reserved; per-conn GIDs 2..MaxConnections+1.
+ _freeGids = new Stack(MaxConnections);
+ for (int g = MaxConnections + 1; g >= 2; g--)
+ _freeGids.Push((ushort)g);
+
+ _returnQInc = new Mpsc(1 << 16);
+ }
+
+ private ushort AllocGid() => _freeGids!.Pop();
+ private void FreeGid(ushort gid) => _freeGids!.Push(gid);
+
+ // =========================================================================
+ // Per-connection ring lifecycle
+ // =========================================================================
+
+ private void SetupConnectionBufRing(Connection conn)
+ {
+ ushort gid = AllocGid();
+ int entries = ConnBufRingEntries;
+
+ // Ring control area + slab + tracking arrays are allocated once and
+ // reused across pool lives; only the kernel registration is per-life.
+ if (conn.BufRing == null)
+ conn.BufRing = (byte*)NativeMemory.AlignedAlloc((nuint)entries * 16, 4096);
+ NativeMemory.Clear(conn.BufRing, (nuint)entries * 16);
+
+ if (conn.BufSlab == null)
+ conn.BufSlab = (byte*)NativeMemory.AlignedAlloc((nuint)entries * (nuint)IncRecvBufferSize, 64);
+
+ conn.CumOffset ??= new int[entries];
+ conn.RefCount ??= new int[entries];
+ conn.KernelDone ??= new bool[entries];
+ Array.Clear(conn.CumOffset, 0, entries);
+ Array.Clear(conn.RefCount, 0, entries);
+ Array.Clear(conn.KernelDone, 0, entries);
+
+ var reg = new io_uring_buf_reg
+ {
+ ring_addr = (ulong)conn.BufRing,
+ ring_entries = (uint)entries,
+ bgid = gid,
+ flags = IOU_PBUF_RING_INC,
+ };
+ int ret = io_uring_register(Ring.Fd, IORING_REGISTER_PBUF_RING, ®, 1);
+ if (ret < 0)
+ throw new InvalidOperationException($"register pbuf_ring (inc) failed: ret={ret} gid={gid}");
+
+ conn.Bgid = gid;
+ conn.BufRingEntries = entries;
+ conn.BufRingMask = (uint)(entries - 1);
+ conn.IncrementalMode = true;
+
+ for (ushort bid = 0; bid < entries; bid++)
+ {
+ byte* slot = conn.BufRing + (uint)bid * 16;
+ *(ulong*)(slot + 0) = (ulong)(conn.BufSlab + bid * (nuint)IncRecvBufferSize);
+ *(uint*)(slot + 8) = IncRecvBufferSize;
+ *(ushort*)(slot + 12) = bid;
+ }
+ Volatile.Write(ref *(ushort*)(conn.BufRing + 14), (ushort)entries);
+ }
+
+ private void TeardownConnectionBufRing(Connection conn)
+ {
+ if (conn.IncrementalMode)
+ {
+ var reg = new io_uring_buf_reg { bgid = conn.Bgid };
+ io_uring_register(Ring.Fd, IORING_UNREGISTER_PBUF_RING, ®, 1);
+ FreeGid(conn.Bgid);
+ }
+ // BufRing / BufSlab / arrays stay allocated for pool reuse.
+ }
+
+ // Re-add a fully-consumed buffer to its connection's ring (reactor-thread only).
+ private void ReturnConnectionBuffer(Connection conn, ushort bid)
+ {
+ conn.CumOffset![bid] = 0;
+ conn.RefCount![bid] = 0;
+ conn.KernelDone![bid] = false;
+
+ ushort tail = Volatile.Read(ref *(ushort*)(conn.BufRing + 14));
+ byte* slot = conn.BufRing + (tail & conn.BufRingMask) * 16;
+ *(ulong*)(slot + 0) = (ulong)(conn.BufSlab + bid * (nuint)IncRecvBufferSize);
+ *(uint*)(slot + 8) = IncRecvBufferSize;
+ *(ushort*)(slot + 12) = bid;
+ Volatile.Write(ref *(ushort*)(conn.BufRing + 14), (ushort)(tail + 1));
+ }
+
+ // =========================================================================
+ // Refcounted return path (handler → reactor), carrying (fd, gen, bid)
+ // =========================================================================
+
+ // (fd, gen, bid) packed into one ulong for the incremental return queue:
+ // fd in the high 32 bits, gen in the next 16, bid in the low 16.
+ private static ulong PackReturn(int fd, ushort gen, ushort bid)
+ => ((ulong)(uint)fd << 32) | ((ulong)gen << 16) | bid;
+
+ private static void UnpackReturn(ulong packed, out int fd, out ushort gen, out ushort bid)
+ {
+ fd = (int)(packed >> 32);
+ gen = (ushort)((packed >> 16) & 0xFFFF);
+ bid = (ushort)(packed & 0xFFFF);
+ }
+
+ public void EnqueueReturnQIncremental(int fd, ushort gen, ushort bid)
+ {
+ // Fast path: caller is the reactor thread (handler resumed inline).
+ if (Environment.CurrentManagedThreadId == _reactorThreadId)
+ {
+ ApplyReturnIncremental(fd, gen, bid);
+ return;
+ }
+ ulong packed = PackReturn(fd, gen, bid);
+ SpinWait sw = default;
+ while (!_returnQInc!.TryEnqueue(packed))
+ sw.SpinOnce();
+ WakeFdWrite();
+ }
+
+ private void DrainReturnQIncremental()
+ {
+ while (_returnQInc!.TryDequeue(out ulong packed))
+ {
+ UnpackReturn(packed, out int fd, out ushort gen, out ushort bid);
+ ApplyReturnIncremental(fd, gen, bid);
+ }
+ }
+
+ private void ApplyReturnIncremental(int fd, ushort gen, ushort bid)
+ {
+ if (!Connections.TryGetValue(fd, out var conn) || !conn.IncrementalMode)
+ {
+ return; // fd gone / ring already torn down
+ }
+ if ((ushort)conn.Generation != gen)
+ {
+ return; // stale return from a previous life (fd reused)
+ }
+
+ conn.RefCount![bid]--;
+ if (conn.RefCount[bid] <= 0 && conn.KernelDone![bid])
+ {
+ ReturnConnectionBuffer(conn, bid);
+ }
+ }
+
+ // =========================================================================
+ // Incremental reactor loop
+ // =========================================================================
+
+ private void LoopIncremental()
+ {
+ while (true)
+ {
+ DrainReturnQIncremental();
+ DrainFlushQ();
+ DrainRecycleQ();
+
+ int rc = Ring.SubmitAndWait(1);
+ if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY)
+ {
+ Console.Error.WriteLine($"[r{Id}] io_uring_enter failed: {rc}");
+
+ break;
+ }
+
+ uint ready = Ring.CqReady();
+ for (uint i = 0; i < ready; i++)
+ {
+ DispatchIncremental(in Ring.CqeAt(i));
+ }
+ Ring.CqAdvance(ready);
+ }
+ }
+
+ private void DispatchIncremental(in IoUringCqe cqe)
+ {
+ ulong kind = cqe.user_data & 0xffffffff_00000000UL;
+ int fd = (int)(cqe.user_data & 0xffffffffUL);
+ bool more = (cqe.flags & IORING_CQE_F_MORE) != 0;
+
+ if (kind == KindWake)
+ {
+ ulong drain;
+ read(_wakeFd, &drain, 8);
+ if (!more)
+ {
+ ArmWakePoll();
+ }
+ return;
+ }
+
+ if (kind == KindAccept)
+ {
+ if (cqe.res >= 0)
+ {
+ int clientFd = cqe.res;
+ SetNoDelay(clientFd);
+ Connection conn = _pool.TryPop(out var pooled)
+ ? pooled.SetFd(clientFd)
+ : new Connection(this, clientFd, _config.WriteSlabSize);
+ Connections[clientFd] = conn;
+ conn.InitRefs();
+ SetupConnectionBufRing(conn);
+ SubmitRecvMultishot(clientFd, conn.Bgid);
+
+ _ = _config.UsePipe
+ ? Handler.HandlePipeAsync(this, conn)
+ : Handler.HandleAsync(this, conn);
+ }
+ else
+ {
+ Console.Error.WriteLine($"[r{Id}] accept error: {cqe.res}");
+ }
+ if (!more)
+ {
+ SubmitAcceptMultishot();
+ }
+ }
+ else if (kind == KindRecv)
+ {
+ bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0;
+ bool bufMore = (cqe.flags & IORING_CQE_F_BUF_MORE) != 0;
+ ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0;
+
+ if (cqe.res <= 0)
+ {
+ // Peer EOF / recv error — the whole per-conn ring is freed in Recycle.
+ if (Connections.Remove(fd, out var dyingConn))
+ {
+ dyingConn.MarkClosed();
+ dyingConn.DecRef();
+ }
+
+ return;
+ }
+
+ if (!Connections.TryGetValue(fd, out var conn))
+ {
+ return; // straggler for a connection whose ring is already gone
+ }
+
+ // Data lands at the buffer's running offset; the kernel keeps
+ // appending to this bid until the buffer is full (F_BUF_MORE clear).
+ byte* ptr = conn.BufSlab + (nuint)bid * (nuint)IncRecvBufferSize + (nuint)conn.CumOffset![bid];
+ conn.CumOffset[bid] += cqe.res;
+ conn.RefCount![bid]++;
+ if (!bufMore || !more)
+ {
+ conn.KernelDone![bid] = true;
+ }
+
+ conn.Complete(cqe.res, bid, hasBuffer: true, ptr);
+
+ if (!more)
+ {
+ SubmitRecvMultishot(fd, conn.Bgid);
+ }
+ }
+ else if (kind == KindSend)
+ {
+ if (!Connections.TryGetValue(fd, out var conn))
+ {
+ return;
+ }
+ if (cqe.res <= 0)
+ {
+ Connections.Remove(fd);
+ conn.MarkClosed();
+ conn.DecRef();
+
+ return;
+ }
+ conn.WriteHead += cqe.res;
+ if (conn.WriteHead < conn.WriteInFlight)
+ {
+ SubmitSend(fd, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead));
+
+ return;
+ }
+
+ conn.CompleteFlush();
+ }
+ }
+}
diff --git a/frameworks/minima/Reactor/Reactor.cs b/frameworks/minima/Reactor/Reactor.cs
new file mode 100644
index 00000000..764bccb6
--- /dev/null
+++ b/frameworks/minima/Reactor/Reactor.cs
@@ -0,0 +1,564 @@
+using System.Collections.Concurrent;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using Minima.Utils;
+using static Minima.Native;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima;
+
+///
+/// One reactor = one thread + one io_uring + one listening socket (SO_REUSEPORT)
+/// + one connection map. The reactor thread is the sole writer of the SQ ring,
+/// the kernel-shared buf_ring, and the connection map. Handlers may run on any
+/// thread (e.g. resumed by a thread-pool timer after `await Task.Delay(1)`);
+/// they reach the reactor only through two MPSC queues (`_returnQ`, `_flushQ`)
+/// woken by an `eventfd` registered as a multishot poll in the ring.
+///
+public sealed unsafe partial class Reactor
+{
+ public readonly int Id;
+ public Ring Ring = null!; // created on the reactor's own thread (DEFER_TASKRUN requires same-thread setup+enter)
+ public readonly Dictionary Connections = new();
+
+ private int _listenFd;
+ private readonly ServerConfig _config;
+ private readonly ushort _port;
+ private readonly uint _ringEntries;
+ private readonly bool _incremental;
+ private readonly uint RecvBufferSize;
+
+ // CQE user_data layout: kind tag in the high 32 bits, fd in the low 32.
+ private const ulong KindAccept = 1UL << 32;
+ private const ulong KindRecv = 2UL << 32;
+ private const ulong KindSend = 3UL << 32;
+ private const ulong KindWake = 4UL << 32; // eventfd-based cross-thread wake
+
+ // Provided-buffer ring (one per reactor, shared by all its connections).
+ private const ushort BgId = 1;
+ private readonly uint BufferRingEntries; // power of two
+ private byte* _bufRing; // io_uring_buf_ring (kernel-shared)
+ private byte* _bufSlab; // contiguous slab of recv buffers
+ private uint _bufRingMask;
+ private ushort _bufRingTail;
+
+ // Cross-thread wake mechanism: handlers running off-reactor enqueue work
+ // into these MPSC queues and `eventfd_write` _wakeFd; a multishot poll on
+ // _wakeFd registered with the ring delivers a CQE that wakes the reactor.
+ // When the caller is already the reactor thread (the common case — handler
+ // resumed inline from an IVTS SetResult), the Enqueue* methods bypass
+ // the queue and call the direct op, avoiding 2 syscalls per request.
+ private int _wakeFd;
+ private int _reactorThreadId;
+ private readonly Mpsc _returnQ = new(1 << 14); // 16384 slots
+ private readonly Mpsc _flushQ = new(1 << 12); // 4096 slots
+
+ // Teardown handoff: when a connection's refcount hits 0 off-reactor (handler exited
+ // on the thread pool), the recycle must run on the reactor (it touches the buf_ring
+ // and the reactor-only pool). Connection is a ref type, so this is a ConcurrentQueue
+ // rather than the unmanaged Mpsc.
+ private readonly ConcurrentQueue _recycleQ = new();
+
+ // Connection pool. Reactor-thread-only — accept and teardown both run on
+ // this reactor, so a plain Stack is sufficient (no MPMC primitive
+ // needed). PoolMax caps the slab footprint per reactor:
+ // PoolMax × WriteSlabSize × ReactorCount = total reserved native memory.
+ private readonly int PoolMax;
+ private readonly Stack _pool;
+
+ // Incremental-mode (IOU_PBUF_RING_INC) sizing. Each connection gets its own
+ // ring, so reserved native memory is bounded by:
+ // PoolMax × ConnBufRingEntries × IncRecvBufferSize × ReactorCount.
+ // Keep entries small — the point of incremental is that one buffer holds
+ // many reads, so you need few of them per connection.
+ private readonly int MaxConnections; // GID cap (one bgid per active connection)
+ private readonly int ConnBufRingEntries; // buffers per connection ring
+ private readonly uint IncRecvBufferSize; // bytes per buffer (filled incrementally)
+
+ // Transient io_uring_enter errnos (Linux): interrupted, would-block, busy.
+ private const int EINTR = 4;
+ private const int EAGAIN = 11;
+ private const int EBUSY = 16;
+
+ public Reactor(int id, ServerConfig config)
+ {
+ Id = id;
+ _config = config;
+ _port = config.Port;
+ _ringEntries = config.RingEntries;
+ _incremental = config.Incremental;
+ RecvBufferSize = (uint)config.RecvBufferSize;
+ BufferRingEntries = (uint)config.BufferRingEntries;
+ PoolMax = config.PoolMax;
+ MaxConnections = config.MaxConnections;
+ ConnBufRingEntries = config.ConnBufRingEntries;
+ IncRecvBufferSize = (uint)config.IncRecvBufferSize;
+ _pool = new Stack(config.PoolMax);
+ }
+
+ // =========================================================================
+ // Buffer ring
+ // =========================================================================
+
+ private void InitBufferRing()
+ {
+ nuint ringBytes = (nuint)BufferRingEntries * 16;
+ _bufRing = (byte*)NativeMemory.AlignedAlloc(ringBytes, 4096);
+ NativeMemory.Clear(_bufRing, ringBytes);
+
+ nuint slabBytes = BufferRingEntries * (nuint)RecvBufferSize;
+ _bufSlab = (byte*)NativeMemory.AlignedAlloc(slabBytes, 64);
+
+ _bufRingMask = BufferRingEntries - 1;
+
+ var reg = new io_uring_buf_reg {
+ ring_addr = (ulong)_bufRing,
+ ring_entries = BufferRingEntries,
+ bgid = BgId,
+ };
+
+ int ret = io_uring_register(Ring.Fd, IORING_REGISTER_PBUF_RING, ®, 1);
+ if (ret < 0)
+ {
+ int err = Marshal.GetLastPInvokeError();
+
+ throw new InvalidOperationException($"register pbuf_ring failed: ret={ret} errno={err}");
+ }
+
+ // Populate every slot once. Slot 0 overlaps with the ring's tail field
+ // at offset 14, but we only write addr/len/bid (offsets 0..13) so tail
+ // stays at zero until we set it explicitly.
+ for (ushort bid = 0; bid < BufferRingEntries; bid++) {
+ byte* slot = _bufRing + (uint)bid * 16;
+ *(ulong*)(slot + 0) = (ulong)(_bufSlab + bid * (nuint)RecvBufferSize);
+ *(uint*)(slot + 8) = RecvBufferSize;
+ *(ushort*)(slot + 12) = bid;
+ }
+ _bufRingTail = (ushort)BufferRingEntries;
+
+ Volatile.Write(ref *(ushort*)(_bufRing + 14), _bufRingTail);
+ }
+
+ // Reactor-thread-only: writes the kernel-shared buf_ring tail directly.
+ // Off-reactor callers must use EnqueueReturnQ instead.
+ internal void ReturnBufferDirect(ushort bid)
+ {
+ byte* slot = _bufRing + (_bufRingTail & _bufRingMask) * 16;
+ *(ulong*)(slot + 0) = (ulong)(_bufSlab + bid * (nuint)RecvBufferSize);
+ *(uint*)(slot + 8) = RecvBufferSize;
+ *(ushort*)(slot + 12) = bid;
+ _bufRingTail++;
+
+ Volatile.Write(ref *(ushort*)(_bufRing + 14), _bufRingTail);
+ }
+
+ // =========================================================================
+ // Cross-thread entry points (safe to call from any thread)
+ // =========================================================================
+
+ public void EnqueueReturnQ(ushort bid)
+ {
+ // Fast path: caller is the reactor thread (handler running inline from
+ // an IVTS SetResult). Go straight to the buf_ring — no queue, no syscall.
+ if (Environment.CurrentManagedThreadId == _reactorThreadId)
+ {
+ ReturnBufferDirect(bid);
+ return;
+ }
+ SpinWait sw = default;
+ while (!_returnQ.TryEnqueue(bid))
+ {
+ sw.SpinOnce();
+ }
+ //WakeFdWrite();
+ }
+
+ internal void EnqueueFlush(int fd)
+ {
+ // Fast path: caller is the reactor thread; write the SQE directly.
+ if (Environment.CurrentManagedThreadId == _reactorThreadId)
+ {
+ if (Connections.TryGetValue(fd, out var conn))
+ {
+ SubmitSend(fd, conn.WriteBuffer, (uint)conn.WriteInFlight);
+ }
+ return;
+ }
+ SpinWait sw = default;
+ while (!_flushQ.TryEnqueue(fd))
+ {
+ sw.SpinOnce();
+ }
+ WakeFdWrite();
+ }
+
+ // Called by Connection.DecRef when the refcount hits 0. Teardown must run on the
+ // reactor (buf_ring + pool are reactor-owned), so off-reactor callers hand off.
+ internal void EnqueueRecycle(Connection conn)
+ {
+ if (Environment.CurrentManagedThreadId == _reactorThreadId)
+ {
+ Recycle(conn, conn.ClientFd);
+ return;
+ }
+ _recycleQ.Enqueue(conn);
+ WakeFdWrite();
+ }
+
+ private void WakeFdWrite()
+ {
+ ulong v = 1;
+ // 8-byte write to eventfd increments its counter; the kernel marks the
+ // fd readable, which fires our registered multishot poll's next CQE.
+ write(_wakeFd, &v, 8);
+ }
+
+ private void DrainReturnQ()
+ {
+ while (_returnQ.TryDequeue(out ushort bid))
+ {
+ ReturnBufferDirect(bid);
+ }
+ }
+
+ private void DrainFlushQ()
+ {
+ while (_flushQ.TryDequeue(out int fd))
+ {
+ if (!Connections.TryGetValue(fd, out var conn))
+ {
+ continue;
+ }
+ // Connection state was set by FlushAsync; the Enqueue/Dequeue pair
+ // establishes the happens-before so WriteInFlight is visible here.
+ SubmitSend(fd, conn.WriteBuffer, (uint)conn.WriteInFlight);
+ }
+ }
+
+ private void DrainRecycleQ()
+ {
+ while (_recycleQ.TryDequeue(out Connection? conn))
+ {
+ Recycle(conn, conn.ClientFd);
+ }
+ }
+
+ private void ArmWakePoll()
+ {
+ IoUringSqe* sqe = GetSqeOrFlush();
+ Unsafe.InitBlockUnaligned(sqe, 0, 64);
+ sqe->opcode = IORING_OP_POLL_ADD;
+ sqe->fd = _wakeFd;
+ sqe->op_flags = POLLIN; // poll32_events lives at this offset
+ sqe->len = IORING_POLL_ADD_MULTI; // multishot — stays armed across CQEs
+ sqe->user_data = KindWake | (uint)_wakeFd;
+ }
+
+ // =========================================================================
+ // Main loop
+ // =========================================================================
+
+ public void Run()
+ {
+ _reactorThreadId = Environment.CurrentManagedThreadId;
+
+ Ring = Ring.Create(_ringEntries);
+ _listenFd = OpenReusePortListener(_port);
+
+ if (_incremental)
+ {
+ InitIncremental();
+ }
+ else
+ {
+ InitBufferRing();
+ }
+
+ _wakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+ if (_wakeFd < 0)
+ {
+ throw new InvalidOperationException("eventfd failed");
+ }
+
+ Console.WriteLine($"[r{Id}] listening on 0.0.0.0:{_port} (incremental={_incremental})");
+ SubmitAcceptMultishot();
+ ArmWakePoll();
+
+ if (_incremental)
+ {
+ LoopIncremental();
+ }
+ else
+ {
+ LoopShared();
+ }
+
+ close(_listenFd);
+ close(_wakeFd);
+ Ring.Dispose();
+ }
+
+ private void LoopShared()
+ {
+ while (true)
+ {
+ // Drain MPSC queues from off-reactor handlers. Cheap when empty.
+ DrainReturnQ();
+ DrainFlushQ();
+ DrainRecycleQ();
+
+ int rc = Ring.SubmitAndWait(1);
+ if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY)
+ {
+ Console.Error.WriteLine($"[r{Id}] io_uring_enter failed: {rc}");
+ break;
+ }
+
+ uint ready = Ring.CqReady();
+ for (uint i = 0; i < ready; i++)
+ {
+ Dispatch(in Ring.CqeAt(i));
+ }
+ Ring.CqAdvance(ready);
+ }
+ }
+
+ private void Dispatch(in IoUringCqe cqe)
+ {
+ ulong kind = cqe.user_data & 0xffffffff_00000000UL;
+ int fd = (int)(cqe.user_data & 0xffffffffUL);
+ bool more = (cqe.flags & IORING_CQE_F_MORE) != 0;
+
+ if (kind == KindWake)
+ {
+ // Drain the eventfd counter so the next write re-triggers POLLIN
+ // (multishot poll is edge-triggered on the user_space side).
+ ulong drain;
+ read(_wakeFd, &drain, 8);
+ // The actual queue drains happen at the top of the next loop
+ // iteration — nothing else to do here.
+ if (!more)
+ {
+ ArmWakePoll();
+ }
+ return;
+ }
+
+ if (kind == KindAccept)
+ {
+ if (cqe.res >= 0)
+ {
+ int clientFd = cqe.res;
+ SetNoDelay(clientFd);
+ Connection conn = _pool.TryPop(out var pooled)
+ ? pooled.SetFd(clientFd)
+ : new Connection(this, clientFd, _config.WriteSlabSize);
+ Connections[clientFd] = conn;
+ conn.InitRefs();
+ SubmitRecvMultishot(clientFd);
+
+ _ = _config.UsePipe
+ ? Handler.HandlePipeAsync(this, conn)
+ : Handler.HandleAsync(this, conn);
+ }
+ else
+ {
+ Console.Error.WriteLine($"[r{Id}] accept error: {cqe.res}");
+ }
+ // Multishot accept stays armed; only re-arm if the kernel terminated it.
+ if (!more)
+ {
+ SubmitAcceptMultishot();
+ }
+ }
+ else if (kind == KindRecv)
+ {
+ bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0;
+ ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0;
+
+ if (cqe.res <= 0)
+ {
+ // Peer EOF or recv error — reactor owns teardown.
+ if (hasBuf)
+ {
+ ReturnBufferDirect(bid);
+ }
+ if (Connections.Remove(fd, out var dyingConn))
+ {
+ dyingConn.MarkClosed(); // signal the handler to exit
+ dyingConn.DecRef(); // release the reactor's ref; teardown at refs==0
+ }
+ return;
+ }
+
+ if (!Connections.TryGetValue(fd, out var conn))
+ {
+ // Straggler buffer for an already-closed connection.
+ if (hasBuf)
+ {
+ ReturnBufferDirect(bid);
+ }
+ return;
+ }
+
+ byte* ptr = hasBuf ? _bufSlab + (nuint)bid * (nuint)RecvBufferSize : null;
+ conn.Complete(cqe.res, bid, hasBuf, ptr);
+
+ if (!more)
+ {
+ SubmitRecvMultishot(fd);
+ }
+ }
+ else if (kind == KindSend)
+ {
+ if (!Connections.TryGetValue(fd, out var conn))
+ {
+ return;
+ }
+ if (cqe.res <= 0)
+ {
+ // Send error — release the reactor's ref; teardown when the handler exits too.
+ Connections.Remove(fd);
+ conn.MarkClosed();
+ conn.DecRef();
+ return;
+ }
+ conn.WriteHead += cqe.res;
+ if (conn.WriteHead < conn.WriteInFlight)
+ {
+ // Partial send: resubmit the remainder.
+ SubmitSend(fd, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead));
+ return;
+ }
+ // Full target ack'd — resets buffer state and signals the awaiter.
+ conn.CompleteFlush();
+ }
+ }
+
+ // =========================================================================
+ // SQE producers (reactor-thread-only — Connection.FlushAsync hands off via
+ // EnqueueFlush, which DrainFlushQ turns into SubmitSend on this thread)
+ // =========================================================================
+
+ private IoUringSqe* GetSqeOrFlush()
+ {
+ IoUringSqe* sqe = Ring.GetSqe();
+ if (sqe != null)
+ {
+ return sqe;
+ }
+
+ Ring.SubmitAndWait(0);
+ sqe = Ring.GetSqe();
+
+ if (sqe == null)
+ {
+ throw new InvalidOperationException("SQ full after flush");
+ }
+
+ return sqe;
+ }
+
+ private void SubmitAcceptMultishot()
+ {
+ IoUringSqe* sqe = GetSqeOrFlush();
+ Unsafe.InitBlockUnaligned(sqe, 0, 64);
+ sqe->opcode = IORING_OP_ACCEPT;
+ sqe->ioprio = IORING_ACCEPT_MULTISHOT;
+ sqe->fd = _listenFd;
+ sqe->user_data = KindAccept | (uint)_listenFd;
+ }
+
+ private void SubmitRecvMultishot(int fd) => SubmitRecvMultishot(fd, BgId);
+
+ private void SubmitRecvMultishot(int fd, ushort bgid)
+ {
+ IoUringSqe* sqe = GetSqeOrFlush();
+ Unsafe.InitBlockUnaligned(sqe, 0, 64);
+ sqe->opcode = IORING_OP_RECV;
+ sqe->flags = IOSQE_BUFFER_SELECT;
+ sqe->ioprio = IORING_RECV_MULTISHOT;
+ sqe->fd = fd;
+ sqe->buf_index = bgid; // buffer-group id (shared BgId, or per-conn in incremental)
+ sqe->user_data = KindRecv | (uint)fd;
+ }
+
+ private void SubmitSend(int fd, byte* buf, uint len)
+ {
+ IoUringSqe* sqe = GetSqeOrFlush();
+ Unsafe.InitBlockUnaligned(sqe, 0, 64);
+ sqe->opcode = IORING_OP_SEND;
+ sqe->fd = fd;
+ sqe->addr = (ulong)buf;
+ sqe->len = len;
+ sqe->user_data = KindSend | (uint)fd;
+ }
+
+ private void Recycle(Connection conn, int fd)
+ {
+ // Wake awaiters, drain in-flight buffers, close the fd, reset state,
+ // and either push the Connection back to the pool or free its native
+ // WriteBuffer if the pool is full.
+ conn.MarkClosed();
+ if (_incremental)
+ {
+ // The per-connection ring is freed wholesale; no per-buffer return.
+ // Clear() empties the SPSC ring (leftover slices discarded).
+ TeardownConnectionBufRing(conn);
+ }
+ else
+ {
+ conn.DrainRecv(); // return leftover buffers to the shared ring
+ }
+ close(fd);
+ conn.Clear();
+
+ if (_pool.Count < PoolMax)
+ {
+ _pool.Push(conn);
+ }
+ else
+ {
+ conn.Dispose();
+ }
+ }
+
+ // Disable Nagle on an accepted connection. Must be set per-accepted-socket,
+ // not on the listener — TCP_NODELAY doesn't reliably inherit across accept,
+ // which is why zerg/terraform/rtr all set it on the client fd, not the listener.
+ private static void SetNoDelay(int fd)
+ {
+ int one = 1;
+ setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int));
+ }
+
+ private static int OpenReusePortListener(ushort port)
+ {
+ int fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd < 0)
+ {
+ throw new InvalidOperationException($"socket failed: {fd}");
+ }
+
+ int one = 1;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
+ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int));
+
+ sockaddr_in addr = default;
+ addr.sin_family = AF_INET;
+ addr.sin_port = Htons(port);
+ addr.sin_addr.s_addr = 0; // 0.0.0.0
+
+ if (bind(fd, &addr, (uint)sizeof(sockaddr_in)) < 0)
+ {
+ throw new InvalidOperationException("bind failed");
+ }
+
+ if (listen(fd, 128) < 0)
+ {
+ throw new InvalidOperationException("listen failed");
+ }
+
+ return fd;
+ }
+}
diff --git a/frameworks/minima/ServerConfig.cs b/frameworks/minima/ServerConfig.cs
new file mode 100644
index 00000000..255d50c1
--- /dev/null
+++ b/frameworks/minima/ServerConfig.cs
@@ -0,0 +1,39 @@
+namespace Minima;
+
+///
+/// All server tunables in one place — replaces the consts that used to be
+/// scattered across Program.cs and Reactor.cs. Defaults match the previous
+/// hardcoded values; override via object initializer in Main, e.g.:
+/// new ServerConfig { Port = 9000, ReactorCount = 8, Incremental = true }.
+///
+public sealed record ServerConfig
+{
+ // Server-level.
+ public ushort Port { get; init; } = 8080;
+ public int ReactorCount { get; init; } = 12;
+
+ // Handler style: false = raw ReadAsync/TryGetItem loop; true = PipeReader/PipeWriter.
+ public bool UsePipe { get; init; } = false;
+
+ // Static file served by the handler via Magpie (io_uring file read). If the path
+ // doesn't exist a sample file is written there at startup.
+ public string FilePath { get; init; } = "/tmp/minima-magpie-sample.html";
+
+ // io_uring SQ/CQ depth.
+ public uint RingEntries { get; init; } = 8192;
+
+ // Shared buffer ring (used when Incremental == false).
+ public int RecvBufferSize { get; init; } = 32 * 1024;
+ public int BufferRingEntries { get; init; } = 4096;
+
+ // Per-connection write slab + connection pool cap.
+ public int WriteSlabSize { get; init; } = 16 * 1024;
+ public int PoolMax { get; init; } = 1024;
+
+ // Incremental mode (IOU_PBUF_RING_INC) — per-connection rings.
+ // reserved native memory ≈ PoolMax × ConnBufRingEntries × IncRecvBufferSize × ReactorCount.
+ public bool Incremental { get; init; } = false;
+ public int MaxConnections { get; init; } = 4096; // GID cap (one bgid per active connection)
+ public int ConnBufRingEntries { get; init; } = 16; // buffers per connection ring
+ public int IncRecvBufferSize { get; init; } = 4096; // bytes per buffer (filled incrementally)
+}
diff --git a/frameworks/minima/Utils/Mpsc.cs b/frameworks/minima/Utils/Mpsc.cs
new file mode 100644
index 00000000..78e445af
--- /dev/null
+++ b/frameworks/minima/Utils/Mpsc.cs
@@ -0,0 +1,115 @@
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima.Utils;
+
+///
+/// Bounded lock-free multi-producer / single-consumer queue.
+///
+/// Dmitry Vyukov's bounded MPMC algorithm, specialised to one consumer.
+/// Power-of-two capacity, zero-allocation after construction. Producers claim a
+/// slot via CAS on the enqueue position (a failed TryEnqueue on a full queue
+/// leaves the position untouched — no burned tickets); the single consumer
+/// advances the dequeue position with a plain write. Each slot carries a
+/// sequence number that coordinates ownership between producers and consumer.
+///
+/// One generic queue serves every reactor handoff: Mpsc<ushort> for buffer
+/// returns, Mpsc<int> for flush fds, Mpsc<ulong> for packed incremental
+/// returns. T is unmanaged so each Cell is a blittable value type with no GC refs.
+///
+internal sealed class Mpsc where T : unmanaged
+{
+ private struct Cell
+ {
+ public long Sequence;
+ public T Value;
+ }
+
+ private readonly Cell[] _buffer;
+ private readonly int _mask;
+
+ // PaddedLong is a top-level struct (not nested here) because the CLR forbids
+ // explicit layout on a type nested inside a generic.
+ private PaddedLong _enqueuePos;
+ private PaddedLong _dequeuePos;
+
+ public Mpsc(int capacityPow2)
+ {
+ if (capacityPow2 < 2 || (capacityPow2 & (capacityPow2 - 1)) != 0)
+ throw new ArgumentException("Capacity must be a power of two >= 2.", nameof(capacityPow2));
+
+ _buffer = new Cell[capacityPow2];
+ _mask = capacityPow2 - 1;
+
+ for (int i = 0; i < capacityPow2; i++)
+ _buffer[i].Sequence = i;
+ }
+
+ /// Multi-producer safe. Returns false if the queue is full.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryEnqueue(T item)
+ {
+ Cell[] buffer = _buffer;
+ int mask = _mask;
+
+ while (true)
+ {
+ long pos = Volatile.Read(ref _enqueuePos.Value);
+ ref Cell cell = ref buffer[(int)pos & mask];
+
+ long seq = Volatile.Read(ref cell.Sequence);
+ long dif = seq - pos;
+
+ if (dif == 0)
+ {
+ if (Interlocked.CompareExchange(ref _enqueuePos.Value, pos + 1, pos) == pos)
+ {
+ cell.Value = item;
+ Volatile.Write(ref cell.Sequence, pos + 1);
+ return true;
+ }
+ continue; // lost the race; reload and retry
+ }
+
+ if (dif < 0)
+ return false; // slot not yet consumed → full
+ }
+ }
+
+ /// Single-consumer only. Returns false if empty.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryDequeue(out T item)
+ {
+ Cell[] buffer = _buffer;
+ int mask = _mask;
+
+ long pos = _dequeuePos.Value; // single consumer: plain read
+ ref Cell cell = ref buffer[(int)pos & mask];
+
+ long seq = Volatile.Read(ref cell.Sequence);
+ long dif = seq - (pos + 1);
+
+ if (dif == 0)
+ {
+ item = cell.Value;
+ _dequeuePos.Value = pos + 1; // single consumer: plain write
+ Volatile.Write(ref cell.Sequence, pos + mask + 1); // free slot for producers
+ return true;
+ }
+
+ item = default;
+ return false;
+ }
+}
+
+///
+/// A single long padded to a 64-byte cache line so the producer and consumer
+/// positions never share a line (no false sharing). Top-level and non-generic
+/// so it can legally use explicit layout.
+///
+[StructLayout(LayoutKind.Explicit, Size = 64)]
+internal struct PaddedLong
+{
+ [FieldOffset(0)] public long Value;
+}
diff --git a/frameworks/minima/Utils/RingSegment.cs b/frameworks/minima/Utils/RingSegment.cs
new file mode 100644
index 00000000..273d76e9
--- /dev/null
+++ b/frameworks/minima/Utils/RingSegment.cs
@@ -0,0 +1,31 @@
+using System.Buffers;
+
+namespace Minima.Utils;
+
+///
+/// One segment of a multi-buffer ReadOnlySequence<byte> built by the
+/// ConnectionPipeReader when a single read spans more than one recv buffer.
+/// BufferId is carried for debugging; buffer return is driven off the held
+/// item list, not the segments.
+///
+public sealed class RingSegment : ReadOnlySequenceSegment
+{
+ public ushort BufferId { get; }
+
+ public RingSegment(ReadOnlyMemory memory, ushort bufferId)
+ {
+ Memory = memory;
+ BufferId = bufferId;
+ }
+
+ public RingSegment Append(ReadOnlyMemory memory, ushort bufferId)
+ {
+ var next = new RingSegment(memory, bufferId)
+ {
+ RunningIndex = RunningIndex + Memory.Length
+ };
+
+ Next = next;
+ return next;
+ }
+}
diff --git a/frameworks/minima/Utils/SpscRecvRing.cs b/frameworks/minima/Utils/SpscRecvRing.cs
new file mode 100644
index 00000000..b26642f9
--- /dev/null
+++ b/frameworks/minima/Utils/SpscRecvRing.cs
@@ -0,0 +1,105 @@
+using System.Runtime.CompilerServices;
+
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+
+namespace Minima.Utils;
+
+public sealed unsafe class SpscRecvRing
+{
+ public struct Item
+ {
+ public byte* Ptr;
+ public ushort Bid;
+ public int Len;
+ public bool HasBuffer;
+ public ushort Gen; // connection generation when enqueued (incremental return guard)
+
+ public ReadOnlySpan AsSpan() => new(Ptr, Len);
+
+ public UnmanagedMemoryManager AsMemoryManager() => new(Ptr, Len, Bid);
+ }
+
+ private readonly Item[] _items;
+ private readonly int _mask;
+ private long _tail;
+ private long _head;
+
+ public SpscRecvRing(int capacityPow2)
+ {
+ if (capacityPow2 <= 0 || (capacityPow2 & (capacityPow2 - 1)) != 0)
+ {
+ throw new ArgumentException("capacity must be a power of two", nameof(capacityPow2));
+ }
+
+ _items = new Item[capacityPow2];
+ _mask = capacityPow2 - 1;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryEnqueue(in Item item)
+ {
+ long head = Volatile.Read(ref _head);
+ long tail = _tail;
+
+ if ((ulong)(tail - head) >= (ulong)_items.Length)
+ {
+ return false;
+ }
+
+ _items[(int)(tail & _mask)] = item;
+ Volatile.Write(ref _tail, tail + 1);
+
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryDequeue(out Item item)
+ {
+ long head = _head;
+ long tail = Volatile.Read(ref _tail);
+
+ if (head >= tail)
+ {
+ item = default;
+ return false;
+ }
+
+ item = _items[(int)(head & _mask)];
+ Volatile.Write(ref _head, head + 1);
+
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public long SnapshotTail() => Volatile.Read(ref _tail);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryDequeueUntil(long tailSnapshot, out Item item)
+ {
+ long head = _head;
+
+ if (head >= tailSnapshot)
+ {
+ item = default;
+ return false;
+ }
+
+ item = _items[(int)(head & _mask)];
+ Volatile.Write(ref _head, head + 1);
+
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool IsEmpty() => Volatile.Read(ref _head) >= Volatile.Read(ref _tail);
+
+ // Reactor-thread-only, called during connection teardown (Clear) when no
+ // handler is consuming. Discards any leftover items so the recycled
+ // connection starts empty.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void Reset()
+ {
+ _head = 0;
+ _tail = 0;
+ }
+}
diff --git a/frameworks/minima/Utils/UnmanagedMemoryManager.cs b/frameworks/minima/Utils/UnmanagedMemoryManager.cs
new file mode 100644
index 00000000..994202c0
--- /dev/null
+++ b/frameworks/minima/Utils/UnmanagedMemoryManager.cs
@@ -0,0 +1,32 @@
+using System.Buffers;
+
+namespace Minima.Utils;
+
+public sealed unsafe class UnmanagedMemoryManager : MemoryManager
+{
+ private readonly byte* _ptr;
+ private readonly int _length;
+
+ public ushort BufferId { get; }
+
+ public UnmanagedMemoryManager(byte* ptr, int length)
+ {
+ _ptr = ptr;
+ _length = length;
+ }
+
+ public UnmanagedMemoryManager(byte* ptr, int length, ushort bufferId)
+ {
+ _ptr = ptr;
+ _length = length;
+ BufferId = bufferId;
+ }
+
+ public override Span GetSpan() => new(_ptr, _length);
+
+ public override MemoryHandle Pin(int elementIndex = 0) => new(_ptr + elementIndex);
+
+ public override void Unpin() { }
+
+ protected override void Dispose(bool disposing) { }
+}
diff --git a/frameworks/minima/io_uring/Native.cs b/frameworks/minima/io_uring/Native.cs
new file mode 100644
index 00000000..61d98018
--- /dev/null
+++ b/frameworks/minima/io_uring/Native.cs
@@ -0,0 +1,162 @@
+using System.Runtime.InteropServices;
+
+namespace Minima;
+
+///
+/// All native interop in one file: io_uring syscalls, libc socket calls,
+/// the kernel struct layouts they expect, and the constants needed to
+/// drive a minimal io_uring loop.
+///
+public static unsafe class Native {
+ private const long SYS_IO_URING_SETUP = 425;
+ private const long SYS_IO_URING_ENTER = 426;
+ private const long SYS_IO_URING_REGISTER = 427;
+
+ public const byte IORING_OP_POLL_ADD = 6;
+ public const byte IORING_OP_ACCEPT = 13;
+ public const byte IORING_OP_SEND = 26;
+ public const byte IORING_OP_RECV = 27;
+ public const uint IORING_ENTER_GETEVENTS = 1u << 0;
+ public const long IORING_OFF_SQ_RING = 0;
+ public const long IORING_OFF_SQES = 0x10000000;
+
+ // Multishot / buffer-ring goodies.
+ public const ushort IORING_ACCEPT_MULTISHOT = 1 << 0;
+ public const ushort IORING_RECV_MULTISHOT = 1 << 1;
+ public const byte IOSQE_BUFFER_SELECT = 1 << 5;
+ public const uint IORING_CQE_F_BUFFER = 1u << 0;
+ public const uint IORING_CQE_F_MORE = 1u << 1;
+ public const int IORING_CQE_BUFFER_SHIFT = 16;
+ public const uint IORING_REGISTER_PBUF_RING = 22;
+ public const uint IORING_UNREGISTER_PBUF_RING = 23;
+ public const uint IORING_POLL_ADD_MULTI = 1u << 0;
+
+ // Incremental provided-buffer consumption (kernel 6.12+). IOU_PBUF_RING_INC
+ // is set in io_uring_buf_reg.flags at registration; IORING_CQE_F_BUF_MORE is
+ // set on recv CQEs while the kernel will keep appending to the same buffer.
+ public const ushort IOU_PBUF_RING_INC = 2;
+ public const uint IORING_CQE_F_BUF_MORE = 1u << 4;
+
+ // eventfd flags + poll mask (used for the cross-thread wake mechanism).
+ public const int EFD_CLOEXEC = 0x80000;
+ public const int EFD_NONBLOCK = 0x800;
+ public const uint POLLIN = 0x0001;
+
+ // Setup flags. SINGLE_ISSUER tells the kernel only one thread will submit
+ // to this ring (skips locking on the SQ). DEFER_TASKRUN defers completion
+ // processing until io_uring_enter(GETEVENTS), which lets the kernel batch
+ // work and avoids interrupting the reactor with task_work mid-flight.
+ public const uint IORING_SETUP_SINGLE_ISSUER = 1u << 12;
+ public const uint IORING_SETUP_DEFER_TASKRUN = 1u << 13;
+
+ public const int PROT_READ = 1;
+ public const int PROT_WRITE = 2;
+ public const int MAP_SHARED = 1;
+ public const int MAP_POPULATE = 0x8000;
+
+ public const int AF_INET = 2;
+ public const int SOCK_STREAM = 1;
+ public const int SOL_SOCKET = 1;
+ public const int SO_REUSEADDR = 2;
+ public const int SO_REUSEPORT = 15;
+ public const int IPPROTO_TCP = 6;
+ public const int TCP_NODELAY = 1;
+
+ [DllImport("libc", EntryPoint = "syscall")]
+ private static extern long syscall3(long nr, uint a1, IoUringParams* a2);
+
+ [DllImport("libc", EntryPoint = "syscall")]
+ private static extern long syscall6(long nr, uint a1, uint a2, uint a3, uint a4, void* a5, nuint a6);
+
+ [DllImport("libc", EntryPoint = "syscall", SetLastError = true)]
+ private static extern long syscall4(long nr, uint a1, uint a2, void* a3, uint a4);
+
+ public static int io_uring_setup(uint entries, IoUringParams* p) =>
+ (int)syscall3(SYS_IO_URING_SETUP, entries, p);
+
+ public static int io_uring_enter(int fd, uint toSubmit, uint minComplete, uint flags) =>
+ (int)syscall6(SYS_IO_URING_ENTER, (uint)fd, toSubmit, minComplete, flags, null, 0);
+
+ public static int io_uring_register(int fd, uint opcode, void* arg, uint nrArgs) =>
+ (int)syscall4(SYS_IO_URING_REGISTER, (uint)fd, opcode, arg, nrArgs);
+
+ [DllImport("libc")] public static extern void* mmap(void* addr, nuint length, int prot, int flags, int fd, long offset);
+ [DllImport("libc")] public static extern int munmap(void* addr, nuint length);
+ [DllImport("libc")] public static extern int close(int fd);
+ [DllImport("libc")] public static extern int socket(int domain, int type, int proto);
+ [DllImport("libc")] public static extern int bind(int fd, sockaddr_in* addr, uint len);
+ [DllImport("libc")] public static extern int listen(int fd, int backlog);
+ [DllImport("libc")] public static extern int setsockopt(int fd, int level, int optname, void* optval, uint optlen);
+ [DllImport("libc")] public static extern int eventfd(uint initval, int flags);
+ [DllImport("libc")] public static extern long write(int fd, void* buf, nuint count);
+ [DllImport("libc")] public static extern long read(int fd, void* buf, nuint count);
+
+ public static ushort Htons(ushort x) => (ushort)((x << 8) | (x >> 8));
+
+ // Kernel struct layouts (must match include/uapi/linux/io_uring.h)
+ [StructLayout(LayoutKind.Sequential)]
+ public struct SqRingOffsets {
+ public uint head, tail, ring_mask, ring_entries, flags, dropped, array, resv1;
+ public ulong resv2;
+ }
+
+ [StructLayout(LayoutKind.Sequential)]
+ public struct CqRingOffsets {
+ public uint head, tail, ring_mask, ring_entries, overflow, cqes, flags, resv1;
+ public ulong resv2;
+ }
+
+ [StructLayout(LayoutKind.Sequential)]
+ public struct IoUringParams {
+ public uint sq_entries, cq_entries, flags, sq_thread_cpu, sq_thread_idle;
+ public uint features, wq_fd, resv0, resv1, resv2;
+ public SqRingOffsets sq_off;
+ public CqRingOffsets cq_off;
+ }
+
+ [StructLayout(LayoutKind.Explicit, Size = 64)]
+ public struct IoUringSqe {
+ [FieldOffset(0)] public byte opcode;
+ [FieldOffset(1)] public byte flags;
+ [FieldOffset(2)] public ushort ioprio;
+ [FieldOffset(4)] public int fd;
+ [FieldOffset(8)] public ulong off;
+ [FieldOffset(16)] public ulong addr;
+ [FieldOffset(24)] public uint len;
+ [FieldOffset(28)] public uint op_flags;
+ [FieldOffset(32)] public ulong user_data;
+ [FieldOffset(40)] public ushort buf_index;
+ [FieldOffset(42)] public ushort personality;
+ [FieldOffset(44)] public int splice_fd_in;
+ [FieldOffset(48)] public ulong addr3;
+ [FieldOffset(56)] public ulong __pad2;
+ }
+
+ [StructLayout(LayoutKind.Sequential)]
+ public struct IoUringCqe {
+ public ulong user_data;
+ public int res;
+ public uint flags;
+ }
+
+ // Argument struct for IORING_REGISTER_PBUF_RING.
+ [StructLayout(LayoutKind.Sequential)]
+ public struct io_uring_buf_reg {
+ public ulong ring_addr;
+ public uint ring_entries;
+ public ushort bgid;
+ public ushort flags;
+ public ulong resv1, resv2, resv3;
+ }
+
+ [StructLayout(LayoutKind.Sequential)]
+ public struct in_addr { public uint s_addr; }
+
+ [StructLayout(LayoutKind.Sequential)]
+ public unsafe struct sockaddr_in {
+ public ushort sin_family;
+ public ushort sin_port;
+ public in_addr sin_addr;
+ public fixed byte sin_zero[8];
+ }
+}
diff --git a/frameworks/minima/io_uring/Ring.cs b/frameworks/minima/io_uring/Ring.cs
new file mode 100644
index 00000000..c40040fd
--- /dev/null
+++ b/frameworks/minima/io_uring/Ring.cs
@@ -0,0 +1,179 @@
+using System.Runtime.CompilerServices;
+using static Minima.Native;
+
+// ReSharper disable SuggestVarOrType_BuiltInTypes
+// ReSharper disable SuggestVarOrType_Elsewhere
+#pragma warning disable CA1806
+
+namespace Minima;
+
+public sealed unsafe class Ring : IDisposable
+{
+ private int _fd;
+
+ public int Fd => _fd;
+
+ private uint* _sqHead;
+ private uint* _sqTail;
+ private uint* _sqArray;
+ private uint _sqMask;
+ private uint _sqEntries;
+ private IoUringSqe* _sqes;
+
+ private uint* _cqHead;
+ private uint* _cqTail;
+ private IoUringCqe* _cqes;
+ private uint _cqMask;
+
+ private uint _sqeTail;
+
+ private byte* _ringPtr;
+ private nuint _ringSize;
+ private byte* _sqePtr;
+ private nuint _sqeSize;
+
+ public static Ring Create(uint entries)
+ {
+ IoUringParams ioUringParams = default;
+ ioUringParams.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;
+ int fd = io_uring_setup(entries, &ioUringParams);
+ if (fd < 0)
+ {
+ throw new InvalidOperationException($"io_uring_setup failed: {fd}");
+ }
+
+ var ring = new Ring
+ {
+ _fd = fd,
+ _sqEntries = ioUringParams.sq_entries
+ };
+
+ nuint sqRingBytes = ioUringParams.sq_off.array + ioUringParams.sq_entries * sizeof(uint);
+ nuint cqRingBytes = ioUringParams.cq_off.cqes + ioUringParams.cq_entries * (nuint)sizeof(IoUringCqe);
+ nuint ringBytes = sqRingBytes > cqRingBytes ? sqRingBytes : cqRingBytes;
+
+ void* ringMem = mmap(null, ringBytes, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
+ if (ringMem == (void*)-1)
+ {
+ close(fd);
+
+ throw new InvalidOperationException("mmap(SQ_RING) failed");
+ }
+ ring._ringPtr = (byte*)ringMem;
+ ring._ringSize = ringBytes;
+
+ nuint sqeBytes = ioUringParams.sq_entries * (nuint)sizeof(IoUringSqe);
+ void* sqeMem = mmap(null, sqeBytes, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
+ if (sqeMem == (void*)-1)
+ {
+ munmap(ringMem, ringBytes);
+ close(fd);
+
+ throw new InvalidOperationException("mmap(SQES) failed");
+ }
+ ring._sqes = (IoUringSqe*)sqeMem;
+ ring._sqePtr = (byte*)sqeMem;
+ ring._sqeSize = sqeBytes;
+
+ byte* ringPointer = (byte*)ringMem;
+ ring._sqHead = (uint*)(ringPointer + ioUringParams.sq_off.head);
+ ring._sqTail = (uint*)(ringPointer + ioUringParams.sq_off.tail);
+ ring._sqArray = (uint*)(ringPointer + ioUringParams.sq_off.array);
+ ring._sqMask = *(uint*)(ringPointer + ioUringParams.sq_off.ring_mask);
+
+ ring._cqHead = (uint*)(ringPointer + ioUringParams.cq_off.head);
+ ring._cqTail = (uint*)(ringPointer + ioUringParams.cq_off.tail);
+ ring._cqes = (IoUringCqe*)(ringPointer + ioUringParams.cq_off.cqes);
+ ring._cqMask = *(uint*)(ringPointer + ioUringParams.cq_off.ring_mask);
+
+ return ring;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IoUringSqe* GetSqe()
+ {
+ uint head = Volatile.Read(ref *_sqHead);
+
+ if (_sqeTail - head >= _sqEntries)
+ {
+ return null;
+ }
+
+ uint slot = _sqeTail & _sqMask;
+ _sqArray[slot] = slot;
+ _sqeTail++;
+
+ return &_sqes[slot];
+ }
+
+ public int SubmitAndWait(uint waitFor)
+ {
+ uint published = *_sqTail;
+ uint toSubmit = _sqeTail - published;
+
+ if (toSubmit > 0)
+ {
+ Volatile.Write(ref *_sqTail, _sqeTail);
+ }
+
+ if (toSubmit == 0 && waitFor == 0) return 0;
+
+ uint flags = waitFor > 0 ? IORING_ENTER_GETEVENTS : 0;
+
+ return io_uring_enter(_fd, toSubmit, waitFor, flags);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryGetCqe(out IoUringCqe cqe)
+ {
+ uint head = *_cqHead;
+ uint tail = Volatile.Read(ref *_cqTail);
+
+ if (head == tail)
+ {
+ cqe = default;
+
+ return false;
+ }
+
+ cqe = _cqes[head & _cqMask];
+
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void CqeSeen() => Volatile.Write(ref *_cqHead, *_cqHead + 1);
+
+ // Batched CQ drain (liburing io_uring_for_each_cqe + io_uring_cq_advance):
+ // read the kernel-written tail once (acquire), process the whole batch,
+ // then publish the consumed head once (release) instead of once per CQE.
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public uint CqReady() => Volatile.Read(ref *_cqTail) - *_cqHead;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public ref readonly IoUringCqe CqeAt(uint i) => ref _cqes[(*_cqHead + i) & _cqMask];
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void CqAdvance(uint n) => Volatile.Write(ref *_cqHead, *_cqHead + n);
+
+ public void Dispose()
+ {
+ if (_ringPtr != null)
+ {
+ munmap(_ringPtr, _ringSize); _ringPtr = null;
+ }
+
+ if (_sqePtr != null)
+ {
+ munmap(_sqePtr, _sqeSize); _sqePtr = null;
+ }
+
+ if (_fd > 0)
+ {
+ close(_fd); _fd = 0;
+ }
+ }
+}
+
+#pragma warning restore CA1806
diff --git a/frameworks/minima/meta.json b/frameworks/minima/meta.json
new file mode 100644
index 00000000..5c856615
--- /dev/null
+++ b/frameworks/minima/meta.json
@@ -0,0 +1,16 @@
+{
+ "display_name": "minima",
+ "language": "C#",
+ "type": "engine",
+ "engine": "io_uring",
+ "description": "Minima — a from-scratch C# multi-reactor io_uring HTTP/1.1 server. Per-reactor SO_REUSEPORT + multishot accept, multishot recv into a provided buffer ring, inline (RCA=false) continuations with a reactor-thread short-circuit on buffer return/flush/recycle. The engine is vendored unchanged; the request handler (request line, headers, Content-Length + chunked bodies, keep-alive, pipelining, fragmented reads) is hand-written on its raw recv/send API. No HTTP framework.",
+ "repo": "",
+ "enabled": true,
+ "tests": [
+ "baseline",
+ "pipelined",
+ "limited-conn",
+ "json"
+ ],
+ "maintainers": []
+}
diff --git a/frameworks/minima/minima.csproj b/frameworks/minima/minima.csproj
new file mode 100644
index 00000000..5a16eff8
--- /dev/null
+++ b/frameworks/minima/minima.csproj
@@ -0,0 +1,15 @@
+
+
+
+ Exe
+ net10.0
+ enable
+ enable
+ true
+ Minima
+ minima
+ true
+ true
+
+
+