diff --git a/frameworks/minima/.dockerignore b/frameworks/minima/.dockerignore new file mode 100644 index 00000000..cd42ee34 --- /dev/null +++ b/frameworks/minima/.dockerignore @@ -0,0 +1,2 @@ +bin/ +obj/ diff --git a/frameworks/minima/Connection/Connection.Incremental.cs b/frameworks/minima/Connection/Connection.Incremental.cs new file mode 100644 index 00000000..bb3dad76 --- /dev/null +++ b/frameworks/minima/Connection/Connection.Incremental.cs @@ -0,0 +1,61 @@ +using System.Runtime.InteropServices; +using Minima.Utils; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// Incremental-mode (IOU_PBUF_RING_INC) per-connection buffer-ring state. +/// Each connection owns its own ring + slab; one buffer accumulates this +/// connection's byte stream across many recvs. The reactor (Reactor.Incremental) +/// drives setup/teardown and the refcounted recycle; this partial just holds the +/// state and routes a handler return to the right reactor entry point. +/// +/// All of these stay allocated across pool reuse and are freed in Dispose(). +/// +public sealed unsafe partial class Connection +{ + internal byte* BufRing; // kernel-shared ring control area + internal byte* BufSlab; // this connection's recv slab + internal ushort Bgid; + internal uint BufRingMask; + internal int BufRingEntries; + internal bool IncrementalMode; + + internal int[]? CumOffset; // per-bid: byte offset where the next slice begins + internal int[]? RefCount; // per-bid: outstanding handler refs + internal bool[]? KernelDone; // per-bid: kernel finished appending (no F_BUF_MORE) + + internal int Generation => Volatile.Read(ref _generation); + + /// + /// Called by the handler to hand a consumed recv buffer back. Routes by mode: + /// incremental returns carry (fd, gen, bid) for refcounted recycle; the shared + /// path returns the bare bid to the reactor's single buf_ring. + /// + public void ReturnBuffer(in SpscRecvRing.Item item) + { + if (IncrementalMode) + { + _reactor.EnqueueReturnQIncremental(ClientFd, item.Gen, item.Bid); + } + else + { + _reactor.EnqueueReturnQ(item.Bid); + } + } + + private void DisposeIncremental() + { + if (BufRing != null) + { + NativeMemory.AlignedFree(BufRing); + BufRing = null; + } + if (BufSlab != null) + { + NativeMemory.AlignedFree(BufSlab); + BufSlab = null; + } + } +} diff --git a/frameworks/minima/Connection/Connection.Read.cs b/frameworks/minima/Connection/Connection.Read.cs new file mode 100644 index 00000000..f803895a --- /dev/null +++ b/frameworks/minima/Connection/Connection.Read.cs @@ -0,0 +1,166 @@ +using System.Threading.Tasks.Sources; +using Minima.Utils; + +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// Per-connection state. The handler may run on any thread (e.g. resumed by +/// a thread-pool timer); reactor-only side effects are funnelled through the +/// MPSC queues on `Reactor`. Coordination uses Interlocked.Exchange on the +/// arm flags and a sticky `_pending` to close the lost-wakeup race. +/// +/// Lifetime is pool-managed: the reactor pops a Connection on accept (or new +/// one if pool is empty), and pushes it back on teardown after `Clear()`. The +/// `_generation` field is bumped on each `Clear` so stale `ValueTask` tokens +/// from a previous connection life are detectable and return `Closed()` +/// instead of leaking the new tenant's state. +/// +public sealed unsafe partial class Connection : IValueTaskSource +{ + internal Connection SetFd(int fd) + { + ClientFd = fd; + return this; + } + + private ManualResetValueTaskSourceCore _readSignal = new() + { + RunContinuationsAsynchronously = false, + }; + private int _armed; + private int _pending; + private int _closed; + + private readonly SpscRecvRing _recv = new(capacityPow2: 16); + + public ValueTask ReadAsync() + { + if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1) + { + Volatile.Write(ref _pending, 0); + return new ValueTask( + new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0)); + } + + if (Volatile.Read(ref _closed) != 0) + { + return new ValueTask(RecvSnapshot.Closed()); + } + + if (Interlocked.Exchange(ref _armed, 1) == 1) + { + throw new InvalidOperationException("ReadAsync already armed."); + } + + // Snapshot the generation as the IVTS token so a future Clear() can + // invalidate this awaiter if the connection gets pool-recycled. + int gen = Volatile.Read(ref _generation); + + // Race recovery: re-check between arming and returning the IVTS task. + if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1 || Volatile.Read(ref _closed) != 0) + { + Volatile.Write(ref _pending, 0); + Interlocked.Exchange(ref _armed, 0); + + return new ValueTask( + new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0)); + } + + return new ValueTask(this, (short)gen); + } + + public bool TryGetItem(in RecvSnapshot snap, out SpscRecvRing.Item item) + => _recv.TryDequeueUntil(snap.Tail, out item); + + public void ResetRead() => _readSignal.Reset(); + + public void Complete(int res, ushort bid, bool hasBuffer, byte* ptr) + { + if (!_recv.TryEnqueue(new SpscRecvRing.Item + { + Ptr = ptr, + Bid = bid, + Len = res, + HasBuffer = hasBuffer, + Gen = (ushort)Volatile.Read(ref _generation) + })) + { + Console.Error.WriteLine("[conn] recv queue overflow."); + if (hasBuffer) + { + _reactor.ReturnBufferDirect(bid); + } + Volatile.Write(ref _closed, 1); + } + + if (Interlocked.Exchange(ref _armed, 0) == 1) + { + _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0)); + } + else + { + Volatile.Write(ref _pending, 1); + } + } + + internal void DrainRecv() + { + // Return any buffer IDs still sitting in the SPSC ring (handler exited + // before draining them, or a recv arrived after _closed was set). + while (_recv.TryDequeue(out SpscRecvRing.Item item)) + { + if (item.HasBuffer) + { + _reactor.ReturnBufferDirect(item.Bid); + } + } + } + + // ========================================================================= + // IValueTaskSource plumbing — token (= snapshot of `_generation` at await + // time) is compared against the current `_generation` to detect stale + // awaiters from before a Clear()/pool reuse. Stale awaiters get a + // sentinel result rather than the new tenant's state. + // + // For the actual IVTS dispatch we pass `_readSignal.Version` / + // `_flushSignal.Version` to the underlying core (not `token`) because the + // core's version is bumped by ResetRead/CompleteFlush mid-life and is + // unrelated to the cross-life generation guard. + // ========================================================================= + + RecvSnapshot IValueTaskSource.GetResult(short token) + { + if (token != (short)Volatile.Read(ref _generation)) + { + return RecvSnapshot.Closed(); + } + + return _readSignal.GetResult(_readSignal.Version); + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + if (token != (short)Volatile.Read(ref _generation)) + { + return ValueTaskSourceStatus.Succeeded; + } + + return _readSignal.GetStatus(_readSignal.Version); + } + + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (token != (short)Volatile.Read(ref _generation)) + { + // Stale — run the continuation now so the awaiter unblocks and + // gets RecvSnapshot.Closed() from GetResult. + continuation(state); + + return; + } + + _readSignal.OnCompleted(continuation, state, _readSignal.Version, flags); + } +} diff --git a/frameworks/minima/Connection/Connection.Write.cs b/frameworks/minima/Connection/Connection.Write.cs new file mode 100644 index 00000000..2f4c0ce9 --- /dev/null +++ b/frameworks/minima/Connection/Connection.Write.cs @@ -0,0 +1,185 @@ +using System.Buffers; +using System.Threading.Tasks.Sources; +using Minima.Utils; + +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +public sealed unsafe partial class Connection : IValueTaskSource, IBufferWriter +{ + private readonly int _writeSlabSize; + internal byte* WriteBuffer; + internal int WriteHead; + internal int WriteTail; + internal int WriteInFlight; + + private readonly UnmanagedMemoryManager _manager; + + private ManualResetValueTaskSourceCore _flushSignal = new() + { + RunContinuationsAsynchronously = false, + }; + private int _flushArmed; + private int _flushInProgress; + + // IBufferWrite +#region IBufferWrite + + public Memory GetMemory(int sizeHint = 0) + { + if (Volatile.Read(ref _flushInProgress) != 0) + { + throw new InvalidOperationException("Cannot write while flush is in progress."); + } + + int remaining = _writeSlabSize - WriteTail; + if (sizeHint > remaining) + { + throw new InvalidOperationException("Buffer too small."); + } + + return _manager.Memory.Slice(WriteTail, remaining); + } + + public Span GetSpan(int sizeHint = 0) + { + if (Volatile.Read(ref _flushInProgress) != 0) + { + throw new InvalidOperationException("Cannot write while flush is in progress."); + } + + if (WriteTail + sizeHint > _writeSlabSize) + { + throw new InvalidOperationException("Write buffer too small."); + } + + return new Span(WriteBuffer + WriteTail, _writeSlabSize - WriteTail); + } + + public void Advance(int count) + { + if (Volatile.Read(ref _flushInProgress) != 0) + { + throw new InvalidOperationException("Cannot write while flush is in progress."); + } + + WriteTail += count; + } + +#endregion + + // Write to the inner buffer + public void Write(ReadOnlySpan source) + { + if (Volatile.Read(ref _flushInProgress) != 0) + { + throw new InvalidOperationException("Cannot write while flush is in progress."); + } + + int len = source.Length; + if (WriteTail + len > _writeSlabSize) + { + throw new InvalidOperationException("Write buffer too small."); + } + + source.CopyTo(new Span(WriteBuffer + WriteTail, len)); + WriteTail += len; + } + + // Flush inner buffer data to the kernel + public ValueTask FlushAsync() + { + // Connection already torn down (reactor saw EOF/error → MarkClosed): don't flush + // a removed connection — the handoff would reach a reactor that no longer knows + // this fd and the awaiter would hang. Return completed so the handler unwinds to + // its next ReadAsync, sees IsClosed, and exits. + if (Volatile.Read(ref _closed) == 1) + { + return default; + } + + if (Interlocked.Exchange(ref _flushInProgress, 1) == 1) + { + throw new InvalidOperationException("FlushAsync already in progress."); + } + + int target = WriteTail; + if (target == 0) + { + Volatile.Write(ref _flushInProgress, 0); + + return default; + } + + if (Interlocked.Exchange(ref _flushArmed, 1) == 1) + { + throw new InvalidOperationException("FlushAsync already armed."); + } + + _flushSignal.Reset(); + WriteInFlight = target; + + int gen = Volatile.Read(ref _generation); + + _reactor.EnqueueFlush(ClientFd); + + // Race recovery (mirrors ReadAsync): if close raced in after the guard above, + // self-complete so we don't hang waiting on a send the reactor will never make. + if (Volatile.Read(ref _closed) == 1 && Interlocked.Exchange(ref _flushArmed, 0) == 1) + { + Volatile.Write(ref _flushInProgress, 0); + _flushSignal.SetResult(true); + } + + return new ValueTask(this, (short)gen); + } + + // Signal the FlushAsync was completed, called by the reactor's dispatcher send branch + internal void CompleteFlush() + { + WriteHead = 0; + WriteTail = 0; + WriteInFlight = 0; + Volatile.Write(ref _flushInProgress, 0); + Interlocked.Exchange(ref _flushArmed, 0); + + _flushSignal.SetResult(true); + } + + // IValueTaskSource +#region IValueTaskSource + + void IValueTaskSource.GetResult(short token) + { + if (token != (short)Volatile.Read(ref _generation)) + { + return; + } + + _flushSignal.GetResult(_flushSignal.Version); + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + if (token != (short)Volatile.Read(ref _generation)) + { + return ValueTaskSourceStatus.Succeeded; + } + + return _flushSignal.GetStatus(_flushSignal.Version); + } + + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (token != (short)Volatile.Read(ref _generation)) + { + continuation(state); + + return; + } + _flushSignal.OnCompleted(continuation, state, _flushSignal.Version, flags); + } + +#endregion +} \ No newline at end of file diff --git a/frameworks/minima/Connection/Connection.cs b/frameworks/minima/Connection/Connection.cs new file mode 100644 index 00000000..b6b3a064 --- /dev/null +++ b/frameworks/minima/Connection/Connection.cs @@ -0,0 +1,108 @@ +using System.Runtime.InteropServices; +using Minima.Utils; + +namespace Minima; + +public sealed unsafe partial class Connection +{ + private readonly Reactor _reactor; + + public int ClientFd { get; private set; } + + // Bumped on Clear(); the low 16 bits are used as the IVTS token so stale + // awaiters can be detected after pool reuse. + private int _generation; + + // Refcount: the connection has two owners — the reactor (recv side) and the + // handler (which may run off-reactor). Init to 2 on accept; each owner DecRef's + // when done; teardown (Recycle) runs only at refs==0, so a connection is never + // recycled or pool-reused while a handler is still in flight on another thread. + private int _refs; + + public Connection(Reactor reactor, int fd, int writeSlabSize = 1024 * 16) + { + _reactor = reactor; + ClientFd = fd; + _writeSlabSize = writeSlabSize; + WriteBuffer = (byte*)NativeMemory.AlignedAlloc((nuint)writeSlabSize, 64); + + _manager = new UnmanagedMemoryManager(WriteBuffer, writeSlabSize); + } + + // ========================================================================= + // Pool lifecycle — invoked from Reactor.Dispatch's recv/send error paths. + // Reactor-thread only. + // + // teardown: MarkClosed() → wake awaiters with closed=1 + // DrainRecv() → return any in-flight buf_ring items + // close(fd) + // Clear() → reset state, bump _generation + // push to pool, OR Dispose() if pool is full + // ========================================================================= + + public void MarkClosed() + { + Volatile.Write(ref _closed, 1); + + if (Interlocked.Exchange(ref _armed, 0) == 1) + { + _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), isClosed: true)); + } + else + { + Volatile.Write(ref _pending, 1); + } + + if (Interlocked.Exchange(ref _flushArmed, 0) == 1) + { + Volatile.Write(ref _flushInProgress, 0); + _flushSignal.SetResult(true); + } + } + + // Init to 2 (reactor + handler) at accept. + internal void InitRefs() => Volatile.Write(ref _refs, 2); + + // Release one owner's ref. Whoever drives it to 0 hands the connection to the + // reactor for teardown (close + Clear + pool) — never recycled before both done. + internal void DecRef() + { + if (Interlocked.Decrement(ref _refs) == 0) + { + _reactor.EnqueueRecycle(this); + } + } + + internal void Clear() + { + // Bump generation first — readers of IVTS plumbing observe this via + // Volatile.Read and stale tokens get RecvSnapshot.Closed() / no-op. + Interlocked.Increment(ref _generation); + + Volatile.Write(ref _armed, 0); + Volatile.Write(ref _pending, 0); + Volatile.Write(ref _closed, 0); + Volatile.Write(ref _flushArmed, 0); + Volatile.Write(ref _flushInProgress, 0); + + WriteHead = 0; + WriteTail = 0; + WriteInFlight = 0; + + _readSignal.Reset(); + _flushSignal.Reset(); + + _recv.Reset(); // discard any leftover SPSC items + IncrementalMode = false; // per-conn ring (if any) was torn down before Clear + } + + public void Dispose() + { + if (WriteBuffer != null) + { + NativeMemory.AlignedFree(WriteBuffer); + WriteBuffer = null; + } + DisposeIncremental(); + } +} \ No newline at end of file diff --git a/frameworks/minima/Connection/ConnectionDualPipe.cs b/frameworks/minima/Connection/ConnectionDualPipe.cs new file mode 100644 index 00000000..7b40e742 --- /dev/null +++ b/frameworks/minima/Connection/ConnectionDualPipe.cs @@ -0,0 +1,16 @@ +using System.IO.Pipelines; + +namespace Minima; + +public sealed class ConnectionDualPipe : IDuplexPipe +{ + public PipeReader Input { get; } + public PipeWriter Output { get; } + + public ConnectionDualPipe(Connection connection) + { + ArgumentNullException.ThrowIfNull(connection); + Input = new ConnectionPipeReader(connection); + Output = new ConnectionPipeWriter(connection); + } +} \ No newline at end of file diff --git a/frameworks/minima/Connection/ConnectionPipeReader.cs b/frameworks/minima/Connection/ConnectionPipeReader.cs new file mode 100644 index 00000000..14d9ca6c --- /dev/null +++ b/frameworks/minima/Connection/ConnectionPipeReader.cs @@ -0,0 +1,181 @@ +using System.Buffers; +using System.IO.Pipelines; +using Minima.Utils; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// Adapts Minima's raw read API (ReadAsync + TryGetItem +/// + ReturnBuffer) to a standard . Recv buffers are +/// exposed zero-copy as a ReadOnlySequence<byte> (one segment per buffer) +/// and held until AdvanceTo consumes them, at which point fully-consumed buffers +/// are returned to the reactor. +/// +/// Convenience/compat layer for PipeReader consumers — the raw ReadAsync/ +/// TryGetItem path stays the faster one (this adds held-buffer + sequence +/// bookkeeping per read). +/// +public sealed class ConnectionPipeReader : PipeReader +{ + private readonly Connection _conn; + private readonly List _held = new(16); + private ReadOnlySequence _lastSequence; + + private bool _completed; + private bool _cancelRequested; + private bool _connectionClosed; + + private readonly struct Held + { + public readonly ReadOnlyMemory Memory; + public readonly SpscRecvRing.Item Item; + + public Held(ReadOnlyMemory memory, SpscRecvRing.Item item) + { + Memory = memory; + Item = item; + } + + public Held WithMemory(ReadOnlyMemory memory) => new(memory, Item); + } + + public ConnectionPipeReader(Connection connection) + { + _conn = connection ?? throw new ArgumentNullException(nameof(connection)); + } + + public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + ThrowIfCompleted(); + + if (_cancelRequested) + { + _cancelRequested = false; + return new ReadResult(BuildSequence(), isCanceled: true, isCompleted: _connectionClosed); + } + + // Anything still held from a previous read that wasn't fully consumed. + if (_held.Count > 0) + return new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed); + + if (_connectionClosed) + return new ReadResult(default, isCanceled: false, isCompleted: true); + + RecvSnapshot snap = await _conn.ReadAsync(); + + while (_conn.TryGetItem(snap, out SpscRecvRing.Item item)) + { + if (item.HasBuffer) + _held.Add(new Held(item.AsMemoryManager().Memory, item)); + } + + _conn.ResetRead(); + + if (snap.IsClosed) + _connectionClosed = true; + + if (_cancelRequested) + { + _cancelRequested = false; + return new ReadResult(BuildSequence(), isCanceled: true, isCompleted: _connectionClosed); + } + + return new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed); + } + + public override bool TryRead(out ReadResult result) + { + ThrowIfCompleted(); + + if (_held.Count > 0) + { + result = new ReadResult(BuildSequence(), isCanceled: false, isCompleted: _connectionClosed); + return true; + } + + if (_connectionClosed) + { + result = new ReadResult(default, isCanceled: false, isCompleted: true); + return true; + } + + result = default; + return false; + } + + public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed); + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + if (_held.Count == 0) + return; + + long consumedBytes = _lastSequence.Slice(0, consumed).Length; + + while (_held.Count > 0 && consumedBytes > 0) + { + Held seg = _held[0]; + int available = seg.Memory.Length; + + if (consumedBytes >= available) + { + // Whole buffer consumed — return it to the reactor. + _conn.ReturnBuffer(seg.Item); + _held.RemoveAt(0); + consumedBytes -= available; + } + else + { + // Partial — keep the unconsumed tail of this buffer. + _held[0] = seg.WithMemory(seg.Memory[(int)consumedBytes..]); + consumedBytes = 0; + } + } + } + + public override void CancelPendingRead() => _cancelRequested = true; + + public override void Complete(Exception? exception = null) + { + if (_completed) + return; + + _completed = true; + + for (int i = 0; i < _held.Count; i++) + _conn.ReturnBuffer(_held[i].Item); + + _held.Clear(); + } + + private ReadOnlySequence BuildSequence() + { + if (_held.Count == 0) + { + _lastSequence = default; + return _lastSequence; + } + + if (_held.Count == 1) + { + _lastSequence = new ReadOnlySequence(_held[0].Memory); + return _lastSequence; + } + + var head = new RingSegment(_held[0].Memory, _held[0].Item.Bid); + RingSegment tail = head; + + for (int i = 1; i < _held.Count; i++) + tail = tail.Append(_held[i].Memory, _held[i].Item.Bid); + + _lastSequence = new ReadOnlySequence(head, 0, tail, tail.Memory.Length); + return _lastSequence; + } + + private void ThrowIfCompleted() + { + if (_completed) + throw new InvalidOperationException("Reading is not allowed after the reader was completed."); + } +} diff --git a/frameworks/minima/Connection/ConnectionPipeWriter.cs b/frameworks/minima/Connection/ConnectionPipeWriter.cs new file mode 100644 index 00000000..56be337d --- /dev/null +++ b/frameworks/minima/Connection/ConnectionPipeWriter.cs @@ -0,0 +1,63 @@ +using System.IO.Pipelines; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// Adapts Minima's write API (GetMemory/GetSpan/Advance/ +/// FlushAsync) to a standard , so PipeWriter-based code +/// can write responses through the connection's per-connection slab. +/// A thin wrapper — all the work lives in Connection. +/// +public sealed class ConnectionPipeWriter : PipeWriter +{ + private readonly Connection _conn; + private bool _completed; + private bool _cancelRequested; + private long _unflushed; + + public ConnectionPipeWriter(Connection connection) + { + _conn = connection ?? throw new ArgumentNullException(nameof(connection)); + } + + public override bool CanGetUnflushedBytes => true; + public override long UnflushedBytes => _unflushed; + + public override Memory GetMemory(int sizeHint = 0) => _conn.GetMemory(sizeHint); + + public override Span GetSpan(int sizeHint = 0) => _conn.GetSpan(sizeHint); + + public override void Advance(int bytes) + { + _unflushed += bytes; + _conn.Advance(bytes); + } + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + if (_cancelRequested) + { + _cancelRequested = false; + return new ValueTask(new FlushResult(isCanceled: true, isCompleted: _completed)); + } + + _unflushed = 0; + ValueTask inner = _conn.FlushAsync(); + + if (inner.IsCompletedSuccessfully) + return new ValueTask(new FlushResult(isCanceled: false, isCompleted: _completed)); + + return AwaitFlush(inner); + } + + private async ValueTask AwaitFlush(ValueTask inner) + { + await inner; + return new FlushResult(isCanceled: false, isCompleted: _completed); + } + + public override void CancelPendingFlush() => _cancelRequested = true; + + public override void Complete(Exception? exception = null) => _completed = true; +} diff --git a/frameworks/minima/Connection/RecvSnapshot.cs b/frameworks/minima/Connection/RecvSnapshot.cs new file mode 100644 index 00000000..015afc4a --- /dev/null +++ b/frameworks/minima/Connection/RecvSnapshot.cs @@ -0,0 +1,15 @@ +namespace Minima; + +public readonly struct RecvSnapshot +{ + public readonly long Tail; + public readonly bool IsClosed; + + public RecvSnapshot(long tail, bool isClosed) + { + Tail = tail; + IsClosed = isClosed; + } + + public static RecvSnapshot Closed() => new(0, isClosed: true); +} \ No newline at end of file diff --git a/frameworks/minima/Dockerfile b/frameworks/minima/Dockerfile new file mode 100644 index 00000000..49948ab0 --- /dev/null +++ b/frameworks/minima/Dockerfile @@ -0,0 +1,15 @@ +FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build +WORKDIR /source +COPY minima.csproj ./ +RUN dotnet restore +COPY . . +RUN dotnet publish -c Release --no-self-contained -o /app/out + +# Minima drives io_uring through direct libc syscalls (no liburing). The bench +# harness runs containers with --security-opt seccomp=unconfined (required for +# io_uring_setup/enter); engine="io_uring" makes validate.sh enable it too. +FROM mcr.microsoft.com/dotnet/runtime:10.0 +WORKDIR /app +COPY --from=build /app/out ./ +EXPOSE 8080 +ENTRYPOINT ["dotnet", "minima.dll"] diff --git a/frameworks/minima/Program.cs b/frameworks/minima/Program.cs new file mode 100644 index 00000000..403b3c88 --- /dev/null +++ b/frameworks/minima/Program.cs @@ -0,0 +1,559 @@ +using System.Buffers.Text; +using System.Text; +using System.Text.Json; +using Minima.Utils; + +namespace Minima; + +/// +/// minima — the Minima io_uring engine serving the H1-isolated profiles +/// (baseline, pipelined, limited-conn). Minima's engine is vendored unchanged +/// (per-reactor SO_REUSEPORT + multishot accept, multishot recv into a provided +/// buffer ring, RCA=false inline continuations + the reactor-thread short-circuit +/// in Enqueue*). Only the request handler is ours: a hand-rolled HTTP/1.1 parser +/// on Minima's raw recv/send API. No HTTP framework. +/// +/// Endpoints: +/// GET/POST /baseline11?a=&b= -> text/plain "a + b (+ body)" +/// GET /pipeline -> text/plain "ok" +/// GET /json/{count}?m=N -> application/json, per-item total = price*quantity*N +/// +internal static class Program +{ + private static int Main() + { + int reactors = Environment.ProcessorCount; + if (int.TryParse(Environment.GetEnvironmentVariable("MINIMA_REACTORS"), out int r) && r > 0) + reactors = r; + + ushort port = 8080; + if (ushort.TryParse(Environment.GetEnvironmentVariable("MINIMA_PORT"), out ushort p) && p > 0) + port = p; + + var config = new ServerConfig + { + Port = port, + ReactorCount = reactors, + UsePipe = false, + Incremental = false, + RecvBufferSize = 16 * 1024, + BufferRingEntries = 1024, + }; + + Console.WriteLine($"[minima] {config.ReactorCount} reactors on :{config.Port} " + + $"(incremental={config.Incremental}) — hand-rolled HTTP/1.1"); + + var dsPath = Environment.GetEnvironmentVariable("MINIMA_DATASET") ?? "/data/dataset.json"; + var dataset = Dataset.Load(dsPath); + Console.WriteLine($"[minima] loaded {dataset.Count} dataset items from {dsPath}"); + + Handler.Init(config, dataset); + + var threads = new Thread[config.ReactorCount]; + for (int i = 0; i < config.ReactorCount; i++) + { + var reactor = new Reactor(i, config); + threads[i] = new Thread(reactor.Run) { Name = $"reactor-{i}", IsBackground = false }; + threads[i].Start(); + } + foreach (var t in threads) t.Join(); + return 0; + } +} + +internal static class Handler +{ + private static int _slab = 16 * 1024; + private static Dataset _ds = Dataset.Empty; + + public static void Init(ServerConfig config, Dataset ds) + { + _slab = config.WriteSlabSize; + _ds = ds; + } + + public static async Task HandleAsync(Reactor reactor, Connection conn) + { + var s = new HttpSession(_ds); + try + { + while (true) + { + RecvSnapshot snap = await conn.ReadAsync(); + while (conn.TryGetItem(snap, out SpscRecvRing.Item item)) + { + if (item.HasBuffer) + { + s.Feed(item.AsSpan()); + conn.ReturnBuffer(in item); + } + } + + int sent = 0; + while (sent < s.OutLen) + { + int chunk = Math.Min(s.OutLen - sent, _slab); + conn.Write(s.Out.AsSpan(sent, chunk)); + await conn.FlushAsync(); + sent += chunk; + } + s.OutLen = 0; + + if (snap.IsClosed || s.WantClose) + return; + + conn.ResetRead(); + } + } + catch (Exception ex) + { + Console.Error.WriteLine($"[r{reactor.Id}] http handler crash fd={conn.ClientFd}: {ex}"); + } + finally + { + conn.DecRef(); + } + } + + public static Task HandlePipeAsync(Reactor reactor, Connection conn) => HandleAsync(reactor, conn); +} + +/// +/// Hand-rolled HTTP/1.1: accumulates inbound bytes, parses complete requests +/// (request line, headers, Content-Length + chunked bodies, keep-alive, +/// pipelining, fragmented reads), and appends responses to . +/// +internal sealed class HttpSession +{ + private readonly Dataset _ds; + private byte[] _carry = new byte[2048]; + private int _carryLen; + + public byte[] Out = new byte[4096]; + public int OutLen; + public bool WantClose; + + public HttpSession(Dataset ds) => _ds = ds; + + public void Feed(ReadOnlySpan data) + { + AppendCarry(data); + int pos = 0; + while (TryOne(_carry.AsSpan(pos, _carryLen - pos), out int consumed, out bool close)) + { + pos += consumed; + if (close) { WantClose = true; break; } + } + if (pos > 0) + { + int rem = _carryLen - pos; + if (rem > 0) Array.Copy(_carry, pos, _carry, 0, rem); + _carryLen = rem; + } + } + + /// Parse one request from buf; append its response to Out. Returns false if + /// the request isn't fully buffered yet. + private bool TryOne(ReadOnlySpan buf, out int consumed, out bool close) + { + consumed = 0; + close = false; + + int he = buf.IndexOf("\r\n\r\n"u8); + if (he < 0) return false; + ReadOnlySpan head = buf[..he]; + + int rlEnd = head.IndexOf("\r\n"u8); + if (rlEnd < 0) rlEnd = head.Length; + ReadOnlySpan reqLine = head[..rlEnd]; + + ReadOnlySpan target = default; + int sp1 = reqLine.IndexOf((byte)' '); + if (sp1 >= 0) + { + ReadOnlySpan rest = reqLine[(sp1 + 1)..]; + int sp2 = rest.IndexOf((byte)' '); + target = sp2 >= 0 ? rest[..sp2] : rest; + } + + int contentLength = -1; + bool chunked = false; + ReadOnlySpan hdrs = head[Math.Min(rlEnd + 2, head.Length)..]; + while (hdrs.Length > 0) + { + int nl = hdrs.IndexOf("\r\n"u8); + ReadOnlySpan line = nl >= 0 ? hdrs[..nl] : hdrs; + int colon = line.IndexOf((byte)':'); + if (colon >= 0) + { + ReadOnlySpan name = line[..colon]; + ReadOnlySpan val = Trim(line[(colon + 1)..]); + if (CiEq(name, "content-length"u8)) + { + if (Utf8Parser.TryParse(val, out int cl, out _)) contentLength = cl; + } + else if (CiEq(name, "transfer-encoding"u8) && CiContains(val, "chunked"u8)) + { + chunked = true; + } + else if (CiEq(name, "connection"u8) && CiEq(val, "close"u8)) + { + close = true; + } + } + if (nl < 0) break; + hdrs = hdrs[(nl + 2)..]; + } + + int bodyStart = he + 4; + long bodyInt; + int total; + if (chunked) + { + if (!DecodeChunked(buf[bodyStart..], out bodyInt, out int used)) return false; + total = bodyStart + used; + } + else if (contentLength > 0) + { + if (buf.Length < bodyStart + contentLength) return false; + bodyInt = ParseLoose(buf.Slice(bodyStart, contentLength)); + total = bodyStart + contentLength; + } + else + { + bodyInt = 0; + total = bodyStart; + } + + Respond(target, bodyInt, close); + consumed = total; + return true; + } + + private void Respond(ReadOnlySpan target, long bodyInt, bool close) + { + int q = target.IndexOf((byte)'?'); + ReadOnlySpan path = q >= 0 ? target[..q] : target; + ReadOnlySpan query = q >= 0 ? target[(q + 1)..] : default; + + if (path.SequenceEqual("/pipeline"u8)) + { + WriteResp("ok"u8, close); + } + else if (path.StartsWith("/json/"u8)) + { + ReadOnlySpan tail = path[6..]; + if (Utf8Parser.TryParse(tail, out int count, out int used) && used == tail.Length + && count >= 1 && count <= _ds.Count) + { + JsonResp(count, ParseM(query), close); + } + else + { + Write404(close); + } + } + else + { + long sum = SumAB(query) + bodyInt; + Span num = stackalloc byte[24]; + Utf8Formatter.TryFormat(sum, num, out int n); + WriteResp(num[..n], close); + } + } + + private void WriteResp(ReadOnlySpan body, bool close) + { + AppendOut("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: "u8); + Span num = stackalloc byte[16]; + Utf8Formatter.TryFormat(body.Length, num, out int n); + AppendOut(num[..n]); + AppendOut(close ? "\r\nConnection: close\r\n\r\n"u8 : "\r\n\r\n"u8); + AppendOut(body); + } + + private void JsonResp(int count, long m, bool close) + { + AppendOut("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: "u8); + int clOff = OutLen; + AppendOut("000000\r\n"u8); // 6-digit zero-padded Content-Length placeholder + if (close) AppendOut("Connection: close\r\n"u8); + AppendOut("\r\n"u8); + int bodyStart = OutLen; + + // Serialize from the parsed model on every request — no precomputed + // fragments. Synchronous on the reactor thread (a few microseconds of + // CPU): offloading to the thread pool would add a hop and resume the + // handler off-reactor, defeating the RCA short-circuit on Flush. + AppendOut("{\"items\":["u8); + for (int i = 0; i < count; i++) + { + if (i > 0) AppendOut(","u8); + ref readonly Item it = ref _ds.Items[i]; + AppendOut("{\"id\":"u8); + AppendLong(it.Id); + AppendOut(",\"name\":\""u8); + AppendOut(it.Name); + AppendOut("\",\"category\":\""u8); + AppendOut(it.Category); + AppendOut("\",\"price\":"u8); + AppendLong(it.Price); + AppendOut(",\"quantity\":"u8); + AppendLong(it.Quantity); + AppendOut(it.Active ? ",\"active\":true,\"tags\":["u8 : ",\"active\":false,\"tags\":["u8); + for (int t = 0; t < it.Tags.Length; t++) + { + if (t > 0) AppendOut(","u8); + AppendOut("\""u8); + AppendOut(it.Tags[t]); + AppendOut("\""u8); + } + AppendOut("],\"rating\":{\"score\":"u8); + AppendLong(it.Score); + AppendOut(",\"count\":"u8); + AppendLong(it.RatingCount); + AppendOut("},\"total\":"u8); + AppendLong(it.Price * it.Quantity * m); + AppendOut("}"u8); + } + AppendOut("],\"count\":"u8); + AppendLong(count); + AppendOut("}"u8); + + // Backfill the 6-digit zero-padded Content-Length now the body length is known. + int v = OutLen - bodyStart; + for (int d = clOff + 5; d >= clOff; d--) { Out[d] = (byte)('0' + v % 10); v /= 10; } + } + + private void Write404(bool close) + { + AppendOut("HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n"u8); + if (close) AppendOut("Connection: close\r\n"u8); + AppendOut("\r\nNot Found"u8); + } + + private void AppendLong(long v) + { + Span num = stackalloc byte[20]; + Utf8Formatter.TryFormat(v, num, out int n); + AppendOut(num[..n]); + } + + private static long ParseM(ReadOnlySpan query) + { + while (query.Length > 0) + { + int amp = query.IndexOf((byte)'&'); + ReadOnlySpan kv = amp >= 0 ? query[..amp] : query; + if (kv.Length >= 2 && kv[0] == (byte)'m' && kv[1] == (byte)'=') + { + Utf8Parser.TryParse(kv[2..], out long m, out _); + return m; + } + if (amp < 0) break; + query = query[(amp + 1)..]; + } + return 1; + } + + private static long SumAB(ReadOnlySpan query) + { + long a = 0, b = 0; + while (query.Length > 0) + { + int amp = query.IndexOf((byte)'&'); + ReadOnlySpan kv = amp >= 0 ? query[..amp] : query; + int eq = kv.IndexOf((byte)'='); + if (eq >= 0) + { + ReadOnlySpan k = kv[..eq]; + ReadOnlySpan v = kv[(eq + 1)..]; + if (k.SequenceEqual("a"u8)) a = ParseLoose(v); + else if (k.SequenceEqual("b"u8)) b = ParseLoose(v); + } + if (amp < 0) break; + query = query[(amp + 1)..]; + } + return a + b; + } + + /// Decode a chunked body into an integer. Returns false if the terminating + /// 0-chunk isn't fully buffered. Bodies in these profiles are tiny. + private static bool DecodeChunked(ReadOnlySpan buf, out long bodyInt, out int used) + { + bodyInt = 0; + used = 0; + Span body = stackalloc byte[256]; + int blen = 0; + int pos = 0; + while (true) + { + int nl = buf[pos..].IndexOf("\r\n"u8); + if (nl < 0) return false; + if (!ParseHex(buf.Slice(pos, nl), out int size)) return false; + pos += nl + 2; + if (size == 0) + { + int end = buf[pos..].IndexOf("\r\n"u8); // final CRLF (no trailers) + if (end < 0) return false; + used = pos + end + 2; + bodyInt = ParseLoose(body[..blen]); + return true; + } + if (buf.Length < pos + size + 2) return false; + if (blen + size <= body.Length) + { + buf.Slice(pos, size).CopyTo(body[blen..]); + blen += size; + } + pos += size; + if (!buf.Slice(pos, 2).SequenceEqual("\r\n"u8)) return false; + pos += 2; + } + } + + // ── byte helpers ───────────────────────────────────────────────────────── + private void AppendCarry(ReadOnlySpan d) + { + if (_carry.Length < _carryLen + d.Length) + Array.Resize(ref _carry, Math.Max(_carryLen + d.Length, _carry.Length * 2)); + d.CopyTo(_carry.AsSpan(_carryLen)); + _carryLen += d.Length; + } + + private void AppendOut(ReadOnlySpan d) + { + if (Out.Length < OutLen + d.Length) + Array.Resize(ref Out, Math.Max(OutLen + d.Length, Out.Length * 2)); + d.CopyTo(Out.AsSpan(OutLen)); + OutLen += d.Length; + } + + private static ReadOnlySpan Trim(ReadOnlySpan b) + { + int s = 0, e = b.Length; + while (s < e && (b[s] == (byte)' ' || b[s] == (byte)'\t')) s++; + while (e > s && (b[e - 1] == (byte)' ' || b[e - 1] == (byte)'\t')) e--; + return b[s..e]; + } + + private static bool CiEq(ReadOnlySpan a, ReadOnlySpan b) + { + if (a.Length != b.Length) return false; + for (int i = 0; i < a.Length; i++) + if (Lower(a[i]) != Lower(b[i])) return false; + return true; + } + + private static bool CiContains(ReadOnlySpan h, ReadOnlySpan n) + { + if (n.Length == 0 || h.Length < n.Length) return false; + for (int i = 0; i + n.Length <= h.Length; i++) + if (CiEq(h.Slice(i, n.Length), n)) return true; + return false; + } + + private static byte Lower(byte c) => (byte)(c >= 'A' && c <= 'Z' ? c + 32 : c); + + private static long ParseLoose(ReadOnlySpan s) + { + int i = 0; + while (i < s.Length && (s[i] == ' ' || s[i] == '\t' || s[i] == '\r' || s[i] == '\n')) i++; + bool neg = false; + if (i < s.Length && s[i] == '-') { neg = true; i++; } + long n = 0; + while (i < s.Length && s[i] >= '0' && s[i] <= '9') { n = n * 10 + (s[i] - '0'); i++; } + return neg ? -n : n; + } + + private static bool ParseHex(ReadOnlySpan b, out int val) + { + val = 0; + bool any = false; + foreach (byte c in b) + { + int d; + if (c >= '0' && c <= '9') d = c - '0'; + else if (c >= 'a' && c <= 'f') d = c - 'a' + 10; + else if (c >= 'A' && c <= 'F') d = c - 'A' + 10; + else if (c == ';' || c == ' ') break; + else return any; + val = val * 16 + d; + any = true; + } + return any; + } +} + +/// +/// A dataset item parsed into its model fields (string values stored as UTF-8). +/// The json handler serializes these field-by-field on every request. +/// +internal readonly struct Item +{ + public readonly long Id, Price, Quantity, Score, RatingCount; + public readonly bool Active; + public readonly byte[] Name, Category; + public readonly byte[][] Tags; + + public Item(long id, byte[] name, byte[] category, long price, long quantity, + bool active, byte[][] tags, long score, long ratingCount) + { + Id = id; Name = name; Category = category; Price = price; Quantity = quantity; + Active = active; Tags = tags; Score = score; RatingCount = ratingCount; + } +} + +/// +/// Dataset for the json profile — items parsed into model fields at startup so +/// the handler serializes the full JSON from the model on every request (no +/// precomputed / cached response fragments). Read-only after load, shared across +/// reactor threads. String values are clean ASCII in the bench dataset, so the +/// handler emits them without escaping. +/// +internal sealed class Dataset +{ + public readonly Item[] Items; + public int Count => Items.Length; + + public static readonly Dataset Empty = new(Array.Empty()); + + private Dataset(Item[] items) { Items = items; } + + public static Dataset Load(string path) + { + try + { + using var doc = JsonDocument.Parse(File.ReadAllBytes(path)); + JsonElement root = doc.RootElement; + int n = root.GetArrayLength(); + var items = new Item[n]; + int i = 0; + foreach (JsonElement e in root.EnumerateArray()) + { + JsonElement rating = e.GetProperty("rating"); + JsonElement tagsEl = e.GetProperty("tags"); + var tags = new byte[tagsEl.GetArrayLength()][]; + int t = 0; + foreach (JsonElement tag in tagsEl.EnumerateArray()) + tags[t++] = Encoding.UTF8.GetBytes(tag.GetString() ?? ""); + items[i++] = new Item( + e.GetProperty("id").GetInt64(), + Encoding.UTF8.GetBytes(e.GetProperty("name").GetString() ?? ""), + Encoding.UTF8.GetBytes(e.GetProperty("category").GetString() ?? ""), + e.GetProperty("price").GetInt64(), + e.GetProperty("quantity").GetInt64(), + e.GetProperty("active").GetBoolean(), + tags, + rating.GetProperty("score").GetInt64(), + rating.GetProperty("count").GetInt64()); + } + return new Dataset(items); + } + catch (Exception ex) + { + Console.Error.WriteLine($"[minima] dataset load failed ({path}): {ex.Message}"); + return Empty; + } + } +} diff --git a/frameworks/minima/README.md b/frameworks/minima/README.md new file mode 100644 index 00000000..e2f10a56 --- /dev/null +++ b/frameworks/minima/README.md @@ -0,0 +1,32 @@ +# minima + +The **Minima** io_uring engine serving the H1-isolated profiles (`baseline`, +`pipelined`, `limited-conn`, `json`). Minima is a from-scratch C# multi-reactor +io_uring server; this entry vendors the engine unchanged and adds a hand-rolled +HTTP/1.1 handler (no HTTP framework). + +## Engine (vendored as-is) +- **Per-reactor SO_REUSEPORT + multishot accept** — each reactor thread owns its + own listener and ring; the kernel shards connections (no central acceptor). +- **Multishot recv into a provided buffer ring**. +- **RCA=false inline continuations** — handler resumes inline on the reactor + thread, and `Enqueue{Return,Flush,Recycle}` short-circuit straight to the ring + (no MPSC queue / eventfd wake) since the caller is the reactor thread. + +## Handler (`Program.cs`) +Hand-rolled HTTP/1.1: request line + headers, `Content-Length` and chunked +bodies, keep-alive, pipelining (responses batched per drain), and fragmented-read +reassembly. + +| Endpoint | Response | +|---|---| +| `GET/POST /baseline11?a=&b=` | `text/plain` — `a + b` (+ POST body as an integer) | +| `GET /pipeline` | `text/plain` — `ok` | +| `GET /json/{count}?m=N` | `application/json` — `{items:[…],count}`, each item with `total = price*quantity*N` | + +For `json`, each item's static JSON is precomputed from the mounted +`/data/dataset.json` at startup; a request only appends the dynamic `total`. + +io_uring needs `seccomp=unconfined` (harness-provided; `engine: "io_uring"` makes +validate.sh enable it). `MINIMA_PORT` / `MINIMA_REACTORS` / `MINIMA_DATASET` +override for local runs. diff --git a/frameworks/minima/Reactor/Reactor.Incremental.cs b/frameworks/minima/Reactor/Reactor.Incremental.cs new file mode 100644 index 00000000..13b13518 --- /dev/null +++ b/frameworks/minima/Reactor/Reactor.Incremental.cs @@ -0,0 +1,306 @@ +using System.Runtime.InteropServices; +using Minima.Utils; +using static Minima.Native; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// Incremental-buffer (IOU_PBUF_RING_INC) path. Each connection gets its own +/// buffer ring: one buffer accumulates that connection's byte stream across many +/// recvs, so buffers are recycled only when the kernel is done appending AND the +/// handler has returned every slice it was handed. Selected per reactor by the +/// `_incremental` flag; the shared-ring path in Reactor.cs is untouched. +/// +public sealed unsafe partial class Reactor +{ + private Stack? _freeGids; + private Mpsc? _returnQInc; + + private void InitIncremental() + { + // Per-connection rings; no shared ring. GID 1 reserved; per-conn GIDs 2..MaxConnections+1. + _freeGids = new Stack(MaxConnections); + for (int g = MaxConnections + 1; g >= 2; g--) + _freeGids.Push((ushort)g); + + _returnQInc = new Mpsc(1 << 16); + } + + private ushort AllocGid() => _freeGids!.Pop(); + private void FreeGid(ushort gid) => _freeGids!.Push(gid); + + // ========================================================================= + // Per-connection ring lifecycle + // ========================================================================= + + private void SetupConnectionBufRing(Connection conn) + { + ushort gid = AllocGid(); + int entries = ConnBufRingEntries; + + // Ring control area + slab + tracking arrays are allocated once and + // reused across pool lives; only the kernel registration is per-life. + if (conn.BufRing == null) + conn.BufRing = (byte*)NativeMemory.AlignedAlloc((nuint)entries * 16, 4096); + NativeMemory.Clear(conn.BufRing, (nuint)entries * 16); + + if (conn.BufSlab == null) + conn.BufSlab = (byte*)NativeMemory.AlignedAlloc((nuint)entries * (nuint)IncRecvBufferSize, 64); + + conn.CumOffset ??= new int[entries]; + conn.RefCount ??= new int[entries]; + conn.KernelDone ??= new bool[entries]; + Array.Clear(conn.CumOffset, 0, entries); + Array.Clear(conn.RefCount, 0, entries); + Array.Clear(conn.KernelDone, 0, entries); + + var reg = new io_uring_buf_reg + { + ring_addr = (ulong)conn.BufRing, + ring_entries = (uint)entries, + bgid = gid, + flags = IOU_PBUF_RING_INC, + }; + int ret = io_uring_register(Ring.Fd, IORING_REGISTER_PBUF_RING, ®, 1); + if (ret < 0) + throw new InvalidOperationException($"register pbuf_ring (inc) failed: ret={ret} gid={gid}"); + + conn.Bgid = gid; + conn.BufRingEntries = entries; + conn.BufRingMask = (uint)(entries - 1); + conn.IncrementalMode = true; + + for (ushort bid = 0; bid < entries; bid++) + { + byte* slot = conn.BufRing + (uint)bid * 16; + *(ulong*)(slot + 0) = (ulong)(conn.BufSlab + bid * (nuint)IncRecvBufferSize); + *(uint*)(slot + 8) = IncRecvBufferSize; + *(ushort*)(slot + 12) = bid; + } + Volatile.Write(ref *(ushort*)(conn.BufRing + 14), (ushort)entries); + } + + private void TeardownConnectionBufRing(Connection conn) + { + if (conn.IncrementalMode) + { + var reg = new io_uring_buf_reg { bgid = conn.Bgid }; + io_uring_register(Ring.Fd, IORING_UNREGISTER_PBUF_RING, ®, 1); + FreeGid(conn.Bgid); + } + // BufRing / BufSlab / arrays stay allocated for pool reuse. + } + + // Re-add a fully-consumed buffer to its connection's ring (reactor-thread only). + private void ReturnConnectionBuffer(Connection conn, ushort bid) + { + conn.CumOffset![bid] = 0; + conn.RefCount![bid] = 0; + conn.KernelDone![bid] = false; + + ushort tail = Volatile.Read(ref *(ushort*)(conn.BufRing + 14)); + byte* slot = conn.BufRing + (tail & conn.BufRingMask) * 16; + *(ulong*)(slot + 0) = (ulong)(conn.BufSlab + bid * (nuint)IncRecvBufferSize); + *(uint*)(slot + 8) = IncRecvBufferSize; + *(ushort*)(slot + 12) = bid; + Volatile.Write(ref *(ushort*)(conn.BufRing + 14), (ushort)(tail + 1)); + } + + // ========================================================================= + // Refcounted return path (handler → reactor), carrying (fd, gen, bid) + // ========================================================================= + + // (fd, gen, bid) packed into one ulong for the incremental return queue: + // fd in the high 32 bits, gen in the next 16, bid in the low 16. + private static ulong PackReturn(int fd, ushort gen, ushort bid) + => ((ulong)(uint)fd << 32) | ((ulong)gen << 16) | bid; + + private static void UnpackReturn(ulong packed, out int fd, out ushort gen, out ushort bid) + { + fd = (int)(packed >> 32); + gen = (ushort)((packed >> 16) & 0xFFFF); + bid = (ushort)(packed & 0xFFFF); + } + + public void EnqueueReturnQIncremental(int fd, ushort gen, ushort bid) + { + // Fast path: caller is the reactor thread (handler resumed inline). + if (Environment.CurrentManagedThreadId == _reactorThreadId) + { + ApplyReturnIncremental(fd, gen, bid); + return; + } + ulong packed = PackReturn(fd, gen, bid); + SpinWait sw = default; + while (!_returnQInc!.TryEnqueue(packed)) + sw.SpinOnce(); + WakeFdWrite(); + } + + private void DrainReturnQIncremental() + { + while (_returnQInc!.TryDequeue(out ulong packed)) + { + UnpackReturn(packed, out int fd, out ushort gen, out ushort bid); + ApplyReturnIncremental(fd, gen, bid); + } + } + + private void ApplyReturnIncremental(int fd, ushort gen, ushort bid) + { + if (!Connections.TryGetValue(fd, out var conn) || !conn.IncrementalMode) + { + return; // fd gone / ring already torn down + } + if ((ushort)conn.Generation != gen) + { + return; // stale return from a previous life (fd reused) + } + + conn.RefCount![bid]--; + if (conn.RefCount[bid] <= 0 && conn.KernelDone![bid]) + { + ReturnConnectionBuffer(conn, bid); + } + } + + // ========================================================================= + // Incremental reactor loop + // ========================================================================= + + private void LoopIncremental() + { + while (true) + { + DrainReturnQIncremental(); + DrainFlushQ(); + DrainRecycleQ(); + + int rc = Ring.SubmitAndWait(1); + if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY) + { + Console.Error.WriteLine($"[r{Id}] io_uring_enter failed: {rc}"); + + break; + } + + uint ready = Ring.CqReady(); + for (uint i = 0; i < ready; i++) + { + DispatchIncremental(in Ring.CqeAt(i)); + } + Ring.CqAdvance(ready); + } + } + + private void DispatchIncremental(in IoUringCqe cqe) + { + ulong kind = cqe.user_data & 0xffffffff_00000000UL; + int fd = (int)(cqe.user_data & 0xffffffffUL); + bool more = (cqe.flags & IORING_CQE_F_MORE) != 0; + + if (kind == KindWake) + { + ulong drain; + read(_wakeFd, &drain, 8); + if (!more) + { + ArmWakePoll(); + } + return; + } + + if (kind == KindAccept) + { + if (cqe.res >= 0) + { + int clientFd = cqe.res; + SetNoDelay(clientFd); + Connection conn = _pool.TryPop(out var pooled) + ? pooled.SetFd(clientFd) + : new Connection(this, clientFd, _config.WriteSlabSize); + Connections[clientFd] = conn; + conn.InitRefs(); + SetupConnectionBufRing(conn); + SubmitRecvMultishot(clientFd, conn.Bgid); + + _ = _config.UsePipe + ? Handler.HandlePipeAsync(this, conn) + : Handler.HandleAsync(this, conn); + } + else + { + Console.Error.WriteLine($"[r{Id}] accept error: {cqe.res}"); + } + if (!more) + { + SubmitAcceptMultishot(); + } + } + else if (kind == KindRecv) + { + bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0; + bool bufMore = (cqe.flags & IORING_CQE_F_BUF_MORE) != 0; + ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0; + + if (cqe.res <= 0) + { + // Peer EOF / recv error — the whole per-conn ring is freed in Recycle. + if (Connections.Remove(fd, out var dyingConn)) + { + dyingConn.MarkClosed(); + dyingConn.DecRef(); + } + + return; + } + + if (!Connections.TryGetValue(fd, out var conn)) + { + return; // straggler for a connection whose ring is already gone + } + + // Data lands at the buffer's running offset; the kernel keeps + // appending to this bid until the buffer is full (F_BUF_MORE clear). + byte* ptr = conn.BufSlab + (nuint)bid * (nuint)IncRecvBufferSize + (nuint)conn.CumOffset![bid]; + conn.CumOffset[bid] += cqe.res; + conn.RefCount![bid]++; + if (!bufMore || !more) + { + conn.KernelDone![bid] = true; + } + + conn.Complete(cqe.res, bid, hasBuffer: true, ptr); + + if (!more) + { + SubmitRecvMultishot(fd, conn.Bgid); + } + } + else if (kind == KindSend) + { + if (!Connections.TryGetValue(fd, out var conn)) + { + return; + } + if (cqe.res <= 0) + { + Connections.Remove(fd); + conn.MarkClosed(); + conn.DecRef(); + + return; + } + conn.WriteHead += cqe.res; + if (conn.WriteHead < conn.WriteInFlight) + { + SubmitSend(fd, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead)); + + return; + } + + conn.CompleteFlush(); + } + } +} diff --git a/frameworks/minima/Reactor/Reactor.cs b/frameworks/minima/Reactor/Reactor.cs new file mode 100644 index 00000000..764bccb6 --- /dev/null +++ b/frameworks/minima/Reactor/Reactor.cs @@ -0,0 +1,564 @@ +using System.Collections.Concurrent; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using Minima.Utils; +using static Minima.Native; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima; + +/// +/// One reactor = one thread + one io_uring + one listening socket (SO_REUSEPORT) +/// + one connection map. The reactor thread is the sole writer of the SQ ring, +/// the kernel-shared buf_ring, and the connection map. Handlers may run on any +/// thread (e.g. resumed by a thread-pool timer after `await Task.Delay(1)`); +/// they reach the reactor only through two MPSC queues (`_returnQ`, `_flushQ`) +/// woken by an `eventfd` registered as a multishot poll in the ring. +/// +public sealed unsafe partial class Reactor +{ + public readonly int Id; + public Ring Ring = null!; // created on the reactor's own thread (DEFER_TASKRUN requires same-thread setup+enter) + public readonly Dictionary Connections = new(); + + private int _listenFd; + private readonly ServerConfig _config; + private readonly ushort _port; + private readonly uint _ringEntries; + private readonly bool _incremental; + private readonly uint RecvBufferSize; + + // CQE user_data layout: kind tag in the high 32 bits, fd in the low 32. + private const ulong KindAccept = 1UL << 32; + private const ulong KindRecv = 2UL << 32; + private const ulong KindSend = 3UL << 32; + private const ulong KindWake = 4UL << 32; // eventfd-based cross-thread wake + + // Provided-buffer ring (one per reactor, shared by all its connections). + private const ushort BgId = 1; + private readonly uint BufferRingEntries; // power of two + private byte* _bufRing; // io_uring_buf_ring (kernel-shared) + private byte* _bufSlab; // contiguous slab of recv buffers + private uint _bufRingMask; + private ushort _bufRingTail; + + // Cross-thread wake mechanism: handlers running off-reactor enqueue work + // into these MPSC queues and `eventfd_write` _wakeFd; a multishot poll on + // _wakeFd registered with the ring delivers a CQE that wakes the reactor. + // When the caller is already the reactor thread (the common case — handler + // resumed inline from an IVTS SetResult), the Enqueue* methods bypass + // the queue and call the direct op, avoiding 2 syscalls per request. + private int _wakeFd; + private int _reactorThreadId; + private readonly Mpsc _returnQ = new(1 << 14); // 16384 slots + private readonly Mpsc _flushQ = new(1 << 12); // 4096 slots + + // Teardown handoff: when a connection's refcount hits 0 off-reactor (handler exited + // on the thread pool), the recycle must run on the reactor (it touches the buf_ring + // and the reactor-only pool). Connection is a ref type, so this is a ConcurrentQueue + // rather than the unmanaged Mpsc. + private readonly ConcurrentQueue _recycleQ = new(); + + // Connection pool. Reactor-thread-only — accept and teardown both run on + // this reactor, so a plain Stack is sufficient (no MPMC primitive + // needed). PoolMax caps the slab footprint per reactor: + // PoolMax × WriteSlabSize × ReactorCount = total reserved native memory. + private readonly int PoolMax; + private readonly Stack _pool; + + // Incremental-mode (IOU_PBUF_RING_INC) sizing. Each connection gets its own + // ring, so reserved native memory is bounded by: + // PoolMax × ConnBufRingEntries × IncRecvBufferSize × ReactorCount. + // Keep entries small — the point of incremental is that one buffer holds + // many reads, so you need few of them per connection. + private readonly int MaxConnections; // GID cap (one bgid per active connection) + private readonly int ConnBufRingEntries; // buffers per connection ring + private readonly uint IncRecvBufferSize; // bytes per buffer (filled incrementally) + + // Transient io_uring_enter errnos (Linux): interrupted, would-block, busy. + private const int EINTR = 4; + private const int EAGAIN = 11; + private const int EBUSY = 16; + + public Reactor(int id, ServerConfig config) + { + Id = id; + _config = config; + _port = config.Port; + _ringEntries = config.RingEntries; + _incremental = config.Incremental; + RecvBufferSize = (uint)config.RecvBufferSize; + BufferRingEntries = (uint)config.BufferRingEntries; + PoolMax = config.PoolMax; + MaxConnections = config.MaxConnections; + ConnBufRingEntries = config.ConnBufRingEntries; + IncRecvBufferSize = (uint)config.IncRecvBufferSize; + _pool = new Stack(config.PoolMax); + } + + // ========================================================================= + // Buffer ring + // ========================================================================= + + private void InitBufferRing() + { + nuint ringBytes = (nuint)BufferRingEntries * 16; + _bufRing = (byte*)NativeMemory.AlignedAlloc(ringBytes, 4096); + NativeMemory.Clear(_bufRing, ringBytes); + + nuint slabBytes = BufferRingEntries * (nuint)RecvBufferSize; + _bufSlab = (byte*)NativeMemory.AlignedAlloc(slabBytes, 64); + + _bufRingMask = BufferRingEntries - 1; + + var reg = new io_uring_buf_reg { + ring_addr = (ulong)_bufRing, + ring_entries = BufferRingEntries, + bgid = BgId, + }; + + int ret = io_uring_register(Ring.Fd, IORING_REGISTER_PBUF_RING, ®, 1); + if (ret < 0) + { + int err = Marshal.GetLastPInvokeError(); + + throw new InvalidOperationException($"register pbuf_ring failed: ret={ret} errno={err}"); + } + + // Populate every slot once. Slot 0 overlaps with the ring's tail field + // at offset 14, but we only write addr/len/bid (offsets 0..13) so tail + // stays at zero until we set it explicitly. + for (ushort bid = 0; bid < BufferRingEntries; bid++) { + byte* slot = _bufRing + (uint)bid * 16; + *(ulong*)(slot + 0) = (ulong)(_bufSlab + bid * (nuint)RecvBufferSize); + *(uint*)(slot + 8) = RecvBufferSize; + *(ushort*)(slot + 12) = bid; + } + _bufRingTail = (ushort)BufferRingEntries; + + Volatile.Write(ref *(ushort*)(_bufRing + 14), _bufRingTail); + } + + // Reactor-thread-only: writes the kernel-shared buf_ring tail directly. + // Off-reactor callers must use EnqueueReturnQ instead. + internal void ReturnBufferDirect(ushort bid) + { + byte* slot = _bufRing + (_bufRingTail & _bufRingMask) * 16; + *(ulong*)(slot + 0) = (ulong)(_bufSlab + bid * (nuint)RecvBufferSize); + *(uint*)(slot + 8) = RecvBufferSize; + *(ushort*)(slot + 12) = bid; + _bufRingTail++; + + Volatile.Write(ref *(ushort*)(_bufRing + 14), _bufRingTail); + } + + // ========================================================================= + // Cross-thread entry points (safe to call from any thread) + // ========================================================================= + + public void EnqueueReturnQ(ushort bid) + { + // Fast path: caller is the reactor thread (handler running inline from + // an IVTS SetResult). Go straight to the buf_ring — no queue, no syscall. + if (Environment.CurrentManagedThreadId == _reactorThreadId) + { + ReturnBufferDirect(bid); + return; + } + SpinWait sw = default; + while (!_returnQ.TryEnqueue(bid)) + { + sw.SpinOnce(); + } + //WakeFdWrite(); + } + + internal void EnqueueFlush(int fd) + { + // Fast path: caller is the reactor thread; write the SQE directly. + if (Environment.CurrentManagedThreadId == _reactorThreadId) + { + if (Connections.TryGetValue(fd, out var conn)) + { + SubmitSend(fd, conn.WriteBuffer, (uint)conn.WriteInFlight); + } + return; + } + SpinWait sw = default; + while (!_flushQ.TryEnqueue(fd)) + { + sw.SpinOnce(); + } + WakeFdWrite(); + } + + // Called by Connection.DecRef when the refcount hits 0. Teardown must run on the + // reactor (buf_ring + pool are reactor-owned), so off-reactor callers hand off. + internal void EnqueueRecycle(Connection conn) + { + if (Environment.CurrentManagedThreadId == _reactorThreadId) + { + Recycle(conn, conn.ClientFd); + return; + } + _recycleQ.Enqueue(conn); + WakeFdWrite(); + } + + private void WakeFdWrite() + { + ulong v = 1; + // 8-byte write to eventfd increments its counter; the kernel marks the + // fd readable, which fires our registered multishot poll's next CQE. + write(_wakeFd, &v, 8); + } + + private void DrainReturnQ() + { + while (_returnQ.TryDequeue(out ushort bid)) + { + ReturnBufferDirect(bid); + } + } + + private void DrainFlushQ() + { + while (_flushQ.TryDequeue(out int fd)) + { + if (!Connections.TryGetValue(fd, out var conn)) + { + continue; + } + // Connection state was set by FlushAsync; the Enqueue/Dequeue pair + // establishes the happens-before so WriteInFlight is visible here. + SubmitSend(fd, conn.WriteBuffer, (uint)conn.WriteInFlight); + } + } + + private void DrainRecycleQ() + { + while (_recycleQ.TryDequeue(out Connection? conn)) + { + Recycle(conn, conn.ClientFd); + } + } + + private void ArmWakePoll() + { + IoUringSqe* sqe = GetSqeOrFlush(); + Unsafe.InitBlockUnaligned(sqe, 0, 64); + sqe->opcode = IORING_OP_POLL_ADD; + sqe->fd = _wakeFd; + sqe->op_flags = POLLIN; // poll32_events lives at this offset + sqe->len = IORING_POLL_ADD_MULTI; // multishot — stays armed across CQEs + sqe->user_data = KindWake | (uint)_wakeFd; + } + + // ========================================================================= + // Main loop + // ========================================================================= + + public void Run() + { + _reactorThreadId = Environment.CurrentManagedThreadId; + + Ring = Ring.Create(_ringEntries); + _listenFd = OpenReusePortListener(_port); + + if (_incremental) + { + InitIncremental(); + } + else + { + InitBufferRing(); + } + + _wakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (_wakeFd < 0) + { + throw new InvalidOperationException("eventfd failed"); + } + + Console.WriteLine($"[r{Id}] listening on 0.0.0.0:{_port} (incremental={_incremental})"); + SubmitAcceptMultishot(); + ArmWakePoll(); + + if (_incremental) + { + LoopIncremental(); + } + else + { + LoopShared(); + } + + close(_listenFd); + close(_wakeFd); + Ring.Dispose(); + } + + private void LoopShared() + { + while (true) + { + // Drain MPSC queues from off-reactor handlers. Cheap when empty. + DrainReturnQ(); + DrainFlushQ(); + DrainRecycleQ(); + + int rc = Ring.SubmitAndWait(1); + if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY) + { + Console.Error.WriteLine($"[r{Id}] io_uring_enter failed: {rc}"); + break; + } + + uint ready = Ring.CqReady(); + for (uint i = 0; i < ready; i++) + { + Dispatch(in Ring.CqeAt(i)); + } + Ring.CqAdvance(ready); + } + } + + private void Dispatch(in IoUringCqe cqe) + { + ulong kind = cqe.user_data & 0xffffffff_00000000UL; + int fd = (int)(cqe.user_data & 0xffffffffUL); + bool more = (cqe.flags & IORING_CQE_F_MORE) != 0; + + if (kind == KindWake) + { + // Drain the eventfd counter so the next write re-triggers POLLIN + // (multishot poll is edge-triggered on the user_space side). + ulong drain; + read(_wakeFd, &drain, 8); + // The actual queue drains happen at the top of the next loop + // iteration — nothing else to do here. + if (!more) + { + ArmWakePoll(); + } + return; + } + + if (kind == KindAccept) + { + if (cqe.res >= 0) + { + int clientFd = cqe.res; + SetNoDelay(clientFd); + Connection conn = _pool.TryPop(out var pooled) + ? pooled.SetFd(clientFd) + : new Connection(this, clientFd, _config.WriteSlabSize); + Connections[clientFd] = conn; + conn.InitRefs(); + SubmitRecvMultishot(clientFd); + + _ = _config.UsePipe + ? Handler.HandlePipeAsync(this, conn) + : Handler.HandleAsync(this, conn); + } + else + { + Console.Error.WriteLine($"[r{Id}] accept error: {cqe.res}"); + } + // Multishot accept stays armed; only re-arm if the kernel terminated it. + if (!more) + { + SubmitAcceptMultishot(); + } + } + else if (kind == KindRecv) + { + bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0; + ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0; + + if (cqe.res <= 0) + { + // Peer EOF or recv error — reactor owns teardown. + if (hasBuf) + { + ReturnBufferDirect(bid); + } + if (Connections.Remove(fd, out var dyingConn)) + { + dyingConn.MarkClosed(); // signal the handler to exit + dyingConn.DecRef(); // release the reactor's ref; teardown at refs==0 + } + return; + } + + if (!Connections.TryGetValue(fd, out var conn)) + { + // Straggler buffer for an already-closed connection. + if (hasBuf) + { + ReturnBufferDirect(bid); + } + return; + } + + byte* ptr = hasBuf ? _bufSlab + (nuint)bid * (nuint)RecvBufferSize : null; + conn.Complete(cqe.res, bid, hasBuf, ptr); + + if (!more) + { + SubmitRecvMultishot(fd); + } + } + else if (kind == KindSend) + { + if (!Connections.TryGetValue(fd, out var conn)) + { + return; + } + if (cqe.res <= 0) + { + // Send error — release the reactor's ref; teardown when the handler exits too. + Connections.Remove(fd); + conn.MarkClosed(); + conn.DecRef(); + return; + } + conn.WriteHead += cqe.res; + if (conn.WriteHead < conn.WriteInFlight) + { + // Partial send: resubmit the remainder. + SubmitSend(fd, conn.WriteBuffer + conn.WriteHead, (uint)(conn.WriteInFlight - conn.WriteHead)); + return; + } + // Full target ack'd — resets buffer state and signals the awaiter. + conn.CompleteFlush(); + } + } + + // ========================================================================= + // SQE producers (reactor-thread-only — Connection.FlushAsync hands off via + // EnqueueFlush, which DrainFlushQ turns into SubmitSend on this thread) + // ========================================================================= + + private IoUringSqe* GetSqeOrFlush() + { + IoUringSqe* sqe = Ring.GetSqe(); + if (sqe != null) + { + return sqe; + } + + Ring.SubmitAndWait(0); + sqe = Ring.GetSqe(); + + if (sqe == null) + { + throw new InvalidOperationException("SQ full after flush"); + } + + return sqe; + } + + private void SubmitAcceptMultishot() + { + IoUringSqe* sqe = GetSqeOrFlush(); + Unsafe.InitBlockUnaligned(sqe, 0, 64); + sqe->opcode = IORING_OP_ACCEPT; + sqe->ioprio = IORING_ACCEPT_MULTISHOT; + sqe->fd = _listenFd; + sqe->user_data = KindAccept | (uint)_listenFd; + } + + private void SubmitRecvMultishot(int fd) => SubmitRecvMultishot(fd, BgId); + + private void SubmitRecvMultishot(int fd, ushort bgid) + { + IoUringSqe* sqe = GetSqeOrFlush(); + Unsafe.InitBlockUnaligned(sqe, 0, 64); + sqe->opcode = IORING_OP_RECV; + sqe->flags = IOSQE_BUFFER_SELECT; + sqe->ioprio = IORING_RECV_MULTISHOT; + sqe->fd = fd; + sqe->buf_index = bgid; // buffer-group id (shared BgId, or per-conn in incremental) + sqe->user_data = KindRecv | (uint)fd; + } + + private void SubmitSend(int fd, byte* buf, uint len) + { + IoUringSqe* sqe = GetSqeOrFlush(); + Unsafe.InitBlockUnaligned(sqe, 0, 64); + sqe->opcode = IORING_OP_SEND; + sqe->fd = fd; + sqe->addr = (ulong)buf; + sqe->len = len; + sqe->user_data = KindSend | (uint)fd; + } + + private void Recycle(Connection conn, int fd) + { + // Wake awaiters, drain in-flight buffers, close the fd, reset state, + // and either push the Connection back to the pool or free its native + // WriteBuffer if the pool is full. + conn.MarkClosed(); + if (_incremental) + { + // The per-connection ring is freed wholesale; no per-buffer return. + // Clear() empties the SPSC ring (leftover slices discarded). + TeardownConnectionBufRing(conn); + } + else + { + conn.DrainRecv(); // return leftover buffers to the shared ring + } + close(fd); + conn.Clear(); + + if (_pool.Count < PoolMax) + { + _pool.Push(conn); + } + else + { + conn.Dispose(); + } + } + + // Disable Nagle on an accepted connection. Must be set per-accepted-socket, + // not on the listener — TCP_NODELAY doesn't reliably inherit across accept, + // which is why zerg/terraform/rtr all set it on the client fd, not the listener. + private static void SetNoDelay(int fd) + { + int one = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int)); + } + + private static int OpenReusePortListener(ushort port) + { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) + { + throw new InvalidOperationException($"socket failed: {fd}"); + } + + int one = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)); + + sockaddr_in addr = default; + addr.sin_family = AF_INET; + addr.sin_port = Htons(port); + addr.sin_addr.s_addr = 0; // 0.0.0.0 + + if (bind(fd, &addr, (uint)sizeof(sockaddr_in)) < 0) + { + throw new InvalidOperationException("bind failed"); + } + + if (listen(fd, 128) < 0) + { + throw new InvalidOperationException("listen failed"); + } + + return fd; + } +} diff --git a/frameworks/minima/ServerConfig.cs b/frameworks/minima/ServerConfig.cs new file mode 100644 index 00000000..255d50c1 --- /dev/null +++ b/frameworks/minima/ServerConfig.cs @@ -0,0 +1,39 @@ +namespace Minima; + +/// +/// All server tunables in one place — replaces the consts that used to be +/// scattered across Program.cs and Reactor.cs. Defaults match the previous +/// hardcoded values; override via object initializer in Main, e.g.: +/// new ServerConfig { Port = 9000, ReactorCount = 8, Incremental = true }. +/// +public sealed record ServerConfig +{ + // Server-level. + public ushort Port { get; init; } = 8080; + public int ReactorCount { get; init; } = 12; + + // Handler style: false = raw ReadAsync/TryGetItem loop; true = PipeReader/PipeWriter. + public bool UsePipe { get; init; } = false; + + // Static file served by the handler via Magpie (io_uring file read). If the path + // doesn't exist a sample file is written there at startup. + public string FilePath { get; init; } = "/tmp/minima-magpie-sample.html"; + + // io_uring SQ/CQ depth. + public uint RingEntries { get; init; } = 8192; + + // Shared buffer ring (used when Incremental == false). + public int RecvBufferSize { get; init; } = 32 * 1024; + public int BufferRingEntries { get; init; } = 4096; + + // Per-connection write slab + connection pool cap. + public int WriteSlabSize { get; init; } = 16 * 1024; + public int PoolMax { get; init; } = 1024; + + // Incremental mode (IOU_PBUF_RING_INC) — per-connection rings. + // reserved native memory ≈ PoolMax × ConnBufRingEntries × IncRecvBufferSize × ReactorCount. + public bool Incremental { get; init; } = false; + public int MaxConnections { get; init; } = 4096; // GID cap (one bgid per active connection) + public int ConnBufRingEntries { get; init; } = 16; // buffers per connection ring + public int IncRecvBufferSize { get; init; } = 4096; // bytes per buffer (filled incrementally) +} diff --git a/frameworks/minima/Utils/Mpsc.cs b/frameworks/minima/Utils/Mpsc.cs new file mode 100644 index 00000000..78e445af --- /dev/null +++ b/frameworks/minima/Utils/Mpsc.cs @@ -0,0 +1,115 @@ +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima.Utils; + +/// +/// Bounded lock-free multi-producer / single-consumer queue. +/// +/// Dmitry Vyukov's bounded MPMC algorithm, specialised to one consumer. +/// Power-of-two capacity, zero-allocation after construction. Producers claim a +/// slot via CAS on the enqueue position (a failed TryEnqueue on a full queue +/// leaves the position untouched — no burned tickets); the single consumer +/// advances the dequeue position with a plain write. Each slot carries a +/// sequence number that coordinates ownership between producers and consumer. +/// +/// One generic queue serves every reactor handoff: Mpsc<ushort> for buffer +/// returns, Mpsc<int> for flush fds, Mpsc<ulong> for packed incremental +/// returns. T is unmanaged so each Cell is a blittable value type with no GC refs. +/// +internal sealed class Mpsc where T : unmanaged +{ + private struct Cell + { + public long Sequence; + public T Value; + } + + private readonly Cell[] _buffer; + private readonly int _mask; + + // PaddedLong is a top-level struct (not nested here) because the CLR forbids + // explicit layout on a type nested inside a generic. + private PaddedLong _enqueuePos; + private PaddedLong _dequeuePos; + + public Mpsc(int capacityPow2) + { + if (capacityPow2 < 2 || (capacityPow2 & (capacityPow2 - 1)) != 0) + throw new ArgumentException("Capacity must be a power of two >= 2.", nameof(capacityPow2)); + + _buffer = new Cell[capacityPow2]; + _mask = capacityPow2 - 1; + + for (int i = 0; i < capacityPow2; i++) + _buffer[i].Sequence = i; + } + + /// Multi-producer safe. Returns false if the queue is full. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryEnqueue(T item) + { + Cell[] buffer = _buffer; + int mask = _mask; + + while (true) + { + long pos = Volatile.Read(ref _enqueuePos.Value); + ref Cell cell = ref buffer[(int)pos & mask]; + + long seq = Volatile.Read(ref cell.Sequence); + long dif = seq - pos; + + if (dif == 0) + { + if (Interlocked.CompareExchange(ref _enqueuePos.Value, pos + 1, pos) == pos) + { + cell.Value = item; + Volatile.Write(ref cell.Sequence, pos + 1); + return true; + } + continue; // lost the race; reload and retry + } + + if (dif < 0) + return false; // slot not yet consumed → full + } + } + + /// Single-consumer only. Returns false if empty. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryDequeue(out T item) + { + Cell[] buffer = _buffer; + int mask = _mask; + + long pos = _dequeuePos.Value; // single consumer: plain read + ref Cell cell = ref buffer[(int)pos & mask]; + + long seq = Volatile.Read(ref cell.Sequence); + long dif = seq - (pos + 1); + + if (dif == 0) + { + item = cell.Value; + _dequeuePos.Value = pos + 1; // single consumer: plain write + Volatile.Write(ref cell.Sequence, pos + mask + 1); // free slot for producers + return true; + } + + item = default; + return false; + } +} + +/// +/// A single long padded to a 64-byte cache line so the producer and consumer +/// positions never share a line (no false sharing). Top-level and non-generic +/// so it can legally use explicit layout. +/// +[StructLayout(LayoutKind.Explicit, Size = 64)] +internal struct PaddedLong +{ + [FieldOffset(0)] public long Value; +} diff --git a/frameworks/minima/Utils/RingSegment.cs b/frameworks/minima/Utils/RingSegment.cs new file mode 100644 index 00000000..273d76e9 --- /dev/null +++ b/frameworks/minima/Utils/RingSegment.cs @@ -0,0 +1,31 @@ +using System.Buffers; + +namespace Minima.Utils; + +/// +/// One segment of a multi-buffer ReadOnlySequence<byte> built by the +/// ConnectionPipeReader when a single read spans more than one recv buffer. +/// BufferId is carried for debugging; buffer return is driven off the held +/// item list, not the segments. +/// +public sealed class RingSegment : ReadOnlySequenceSegment +{ + public ushort BufferId { get; } + + public RingSegment(ReadOnlyMemory memory, ushort bufferId) + { + Memory = memory; + BufferId = bufferId; + } + + public RingSegment Append(ReadOnlyMemory memory, ushort bufferId) + { + var next = new RingSegment(memory, bufferId) + { + RunningIndex = RunningIndex + Memory.Length + }; + + Next = next; + return next; + } +} diff --git a/frameworks/minima/Utils/SpscRecvRing.cs b/frameworks/minima/Utils/SpscRecvRing.cs new file mode 100644 index 00000000..b26642f9 --- /dev/null +++ b/frameworks/minima/Utils/SpscRecvRing.cs @@ -0,0 +1,105 @@ +using System.Runtime.CompilerServices; + +// ReSharper disable SuggestVarOrType_BuiltInTypes + +namespace Minima.Utils; + +public sealed unsafe class SpscRecvRing +{ + public struct Item + { + public byte* Ptr; + public ushort Bid; + public int Len; + public bool HasBuffer; + public ushort Gen; // connection generation when enqueued (incremental return guard) + + public ReadOnlySpan AsSpan() => new(Ptr, Len); + + public UnmanagedMemoryManager AsMemoryManager() => new(Ptr, Len, Bid); + } + + private readonly Item[] _items; + private readonly int _mask; + private long _tail; + private long _head; + + public SpscRecvRing(int capacityPow2) + { + if (capacityPow2 <= 0 || (capacityPow2 & (capacityPow2 - 1)) != 0) + { + throw new ArgumentException("capacity must be a power of two", nameof(capacityPow2)); + } + + _items = new Item[capacityPow2]; + _mask = capacityPow2 - 1; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryEnqueue(in Item item) + { + long head = Volatile.Read(ref _head); + long tail = _tail; + + if ((ulong)(tail - head) >= (ulong)_items.Length) + { + return false; + } + + _items[(int)(tail & _mask)] = item; + Volatile.Write(ref _tail, tail + 1); + + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryDequeue(out Item item) + { + long head = _head; + long tail = Volatile.Read(ref _tail); + + if (head >= tail) + { + item = default; + return false; + } + + item = _items[(int)(head & _mask)]; + Volatile.Write(ref _head, head + 1); + + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public long SnapshotTail() => Volatile.Read(ref _tail); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryDequeueUntil(long tailSnapshot, out Item item) + { + long head = _head; + + if (head >= tailSnapshot) + { + item = default; + return false; + } + + item = _items[(int)(head & _mask)]; + Volatile.Write(ref _head, head + 1); + + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool IsEmpty() => Volatile.Read(ref _head) >= Volatile.Read(ref _tail); + + // Reactor-thread-only, called during connection teardown (Clear) when no + // handler is consuming. Discards any leftover items so the recycled + // connection starts empty. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Reset() + { + _head = 0; + _tail = 0; + } +} diff --git a/frameworks/minima/Utils/UnmanagedMemoryManager.cs b/frameworks/minima/Utils/UnmanagedMemoryManager.cs new file mode 100644 index 00000000..994202c0 --- /dev/null +++ b/frameworks/minima/Utils/UnmanagedMemoryManager.cs @@ -0,0 +1,32 @@ +using System.Buffers; + +namespace Minima.Utils; + +public sealed unsafe class UnmanagedMemoryManager : MemoryManager +{ + private readonly byte* _ptr; + private readonly int _length; + + public ushort BufferId { get; } + + public UnmanagedMemoryManager(byte* ptr, int length) + { + _ptr = ptr; + _length = length; + } + + public UnmanagedMemoryManager(byte* ptr, int length, ushort bufferId) + { + _ptr = ptr; + _length = length; + BufferId = bufferId; + } + + public override Span GetSpan() => new(_ptr, _length); + + public override MemoryHandle Pin(int elementIndex = 0) => new(_ptr + elementIndex); + + public override void Unpin() { } + + protected override void Dispose(bool disposing) { } +} diff --git a/frameworks/minima/io_uring/Native.cs b/frameworks/minima/io_uring/Native.cs new file mode 100644 index 00000000..61d98018 --- /dev/null +++ b/frameworks/minima/io_uring/Native.cs @@ -0,0 +1,162 @@ +using System.Runtime.InteropServices; + +namespace Minima; + +/// +/// All native interop in one file: io_uring syscalls, libc socket calls, +/// the kernel struct layouts they expect, and the constants needed to +/// drive a minimal io_uring loop. +/// +public static unsafe class Native { + private const long SYS_IO_URING_SETUP = 425; + private const long SYS_IO_URING_ENTER = 426; + private const long SYS_IO_URING_REGISTER = 427; + + public const byte IORING_OP_POLL_ADD = 6; + public const byte IORING_OP_ACCEPT = 13; + public const byte IORING_OP_SEND = 26; + public const byte IORING_OP_RECV = 27; + public const uint IORING_ENTER_GETEVENTS = 1u << 0; + public const long IORING_OFF_SQ_RING = 0; + public const long IORING_OFF_SQES = 0x10000000; + + // Multishot / buffer-ring goodies. + public const ushort IORING_ACCEPT_MULTISHOT = 1 << 0; + public const ushort IORING_RECV_MULTISHOT = 1 << 1; + public const byte IOSQE_BUFFER_SELECT = 1 << 5; + public const uint IORING_CQE_F_BUFFER = 1u << 0; + public const uint IORING_CQE_F_MORE = 1u << 1; + public const int IORING_CQE_BUFFER_SHIFT = 16; + public const uint IORING_REGISTER_PBUF_RING = 22; + public const uint IORING_UNREGISTER_PBUF_RING = 23; + public const uint IORING_POLL_ADD_MULTI = 1u << 0; + + // Incremental provided-buffer consumption (kernel 6.12+). IOU_PBUF_RING_INC + // is set in io_uring_buf_reg.flags at registration; IORING_CQE_F_BUF_MORE is + // set on recv CQEs while the kernel will keep appending to the same buffer. + public const ushort IOU_PBUF_RING_INC = 2; + public const uint IORING_CQE_F_BUF_MORE = 1u << 4; + + // eventfd flags + poll mask (used for the cross-thread wake mechanism). + public const int EFD_CLOEXEC = 0x80000; + public const int EFD_NONBLOCK = 0x800; + public const uint POLLIN = 0x0001; + + // Setup flags. SINGLE_ISSUER tells the kernel only one thread will submit + // to this ring (skips locking on the SQ). DEFER_TASKRUN defers completion + // processing until io_uring_enter(GETEVENTS), which lets the kernel batch + // work and avoids interrupting the reactor with task_work mid-flight. + public const uint IORING_SETUP_SINGLE_ISSUER = 1u << 12; + public const uint IORING_SETUP_DEFER_TASKRUN = 1u << 13; + + public const int PROT_READ = 1; + public const int PROT_WRITE = 2; + public const int MAP_SHARED = 1; + public const int MAP_POPULATE = 0x8000; + + public const int AF_INET = 2; + public const int SOCK_STREAM = 1; + public const int SOL_SOCKET = 1; + public const int SO_REUSEADDR = 2; + public const int SO_REUSEPORT = 15; + public const int IPPROTO_TCP = 6; + public const int TCP_NODELAY = 1; + + [DllImport("libc", EntryPoint = "syscall")] + private static extern long syscall3(long nr, uint a1, IoUringParams* a2); + + [DllImport("libc", EntryPoint = "syscall")] + private static extern long syscall6(long nr, uint a1, uint a2, uint a3, uint a4, void* a5, nuint a6); + + [DllImport("libc", EntryPoint = "syscall", SetLastError = true)] + private static extern long syscall4(long nr, uint a1, uint a2, void* a3, uint a4); + + public static int io_uring_setup(uint entries, IoUringParams* p) => + (int)syscall3(SYS_IO_URING_SETUP, entries, p); + + public static int io_uring_enter(int fd, uint toSubmit, uint minComplete, uint flags) => + (int)syscall6(SYS_IO_URING_ENTER, (uint)fd, toSubmit, minComplete, flags, null, 0); + + public static int io_uring_register(int fd, uint opcode, void* arg, uint nrArgs) => + (int)syscall4(SYS_IO_URING_REGISTER, (uint)fd, opcode, arg, nrArgs); + + [DllImport("libc")] public static extern void* mmap(void* addr, nuint length, int prot, int flags, int fd, long offset); + [DllImport("libc")] public static extern int munmap(void* addr, nuint length); + [DllImport("libc")] public static extern int close(int fd); + [DllImport("libc")] public static extern int socket(int domain, int type, int proto); + [DllImport("libc")] public static extern int bind(int fd, sockaddr_in* addr, uint len); + [DllImport("libc")] public static extern int listen(int fd, int backlog); + [DllImport("libc")] public static extern int setsockopt(int fd, int level, int optname, void* optval, uint optlen); + [DllImport("libc")] public static extern int eventfd(uint initval, int flags); + [DllImport("libc")] public static extern long write(int fd, void* buf, nuint count); + [DllImport("libc")] public static extern long read(int fd, void* buf, nuint count); + + public static ushort Htons(ushort x) => (ushort)((x << 8) | (x >> 8)); + + // Kernel struct layouts (must match include/uapi/linux/io_uring.h) + [StructLayout(LayoutKind.Sequential)] + public struct SqRingOffsets { + public uint head, tail, ring_mask, ring_entries, flags, dropped, array, resv1; + public ulong resv2; + } + + [StructLayout(LayoutKind.Sequential)] + public struct CqRingOffsets { + public uint head, tail, ring_mask, ring_entries, overflow, cqes, flags, resv1; + public ulong resv2; + } + + [StructLayout(LayoutKind.Sequential)] + public struct IoUringParams { + public uint sq_entries, cq_entries, flags, sq_thread_cpu, sq_thread_idle; + public uint features, wq_fd, resv0, resv1, resv2; + public SqRingOffsets sq_off; + public CqRingOffsets cq_off; + } + + [StructLayout(LayoutKind.Explicit, Size = 64)] + public struct IoUringSqe { + [FieldOffset(0)] public byte opcode; + [FieldOffset(1)] public byte flags; + [FieldOffset(2)] public ushort ioprio; + [FieldOffset(4)] public int fd; + [FieldOffset(8)] public ulong off; + [FieldOffset(16)] public ulong addr; + [FieldOffset(24)] public uint len; + [FieldOffset(28)] public uint op_flags; + [FieldOffset(32)] public ulong user_data; + [FieldOffset(40)] public ushort buf_index; + [FieldOffset(42)] public ushort personality; + [FieldOffset(44)] public int splice_fd_in; + [FieldOffset(48)] public ulong addr3; + [FieldOffset(56)] public ulong __pad2; + } + + [StructLayout(LayoutKind.Sequential)] + public struct IoUringCqe { + public ulong user_data; + public int res; + public uint flags; + } + + // Argument struct for IORING_REGISTER_PBUF_RING. + [StructLayout(LayoutKind.Sequential)] + public struct io_uring_buf_reg { + public ulong ring_addr; + public uint ring_entries; + public ushort bgid; + public ushort flags; + public ulong resv1, resv2, resv3; + } + + [StructLayout(LayoutKind.Sequential)] + public struct in_addr { public uint s_addr; } + + [StructLayout(LayoutKind.Sequential)] + public unsafe struct sockaddr_in { + public ushort sin_family; + public ushort sin_port; + public in_addr sin_addr; + public fixed byte sin_zero[8]; + } +} diff --git a/frameworks/minima/io_uring/Ring.cs b/frameworks/minima/io_uring/Ring.cs new file mode 100644 index 00000000..c40040fd --- /dev/null +++ b/frameworks/minima/io_uring/Ring.cs @@ -0,0 +1,179 @@ +using System.Runtime.CompilerServices; +using static Minima.Native; + +// ReSharper disable SuggestVarOrType_BuiltInTypes +// ReSharper disable SuggestVarOrType_Elsewhere +#pragma warning disable CA1806 + +namespace Minima; + +public sealed unsafe class Ring : IDisposable +{ + private int _fd; + + public int Fd => _fd; + + private uint* _sqHead; + private uint* _sqTail; + private uint* _sqArray; + private uint _sqMask; + private uint _sqEntries; + private IoUringSqe* _sqes; + + private uint* _cqHead; + private uint* _cqTail; + private IoUringCqe* _cqes; + private uint _cqMask; + + private uint _sqeTail; + + private byte* _ringPtr; + private nuint _ringSize; + private byte* _sqePtr; + private nuint _sqeSize; + + public static Ring Create(uint entries) + { + IoUringParams ioUringParams = default; + ioUringParams.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN; + int fd = io_uring_setup(entries, &ioUringParams); + if (fd < 0) + { + throw new InvalidOperationException($"io_uring_setup failed: {fd}"); + } + + var ring = new Ring + { + _fd = fd, + _sqEntries = ioUringParams.sq_entries + }; + + nuint sqRingBytes = ioUringParams.sq_off.array + ioUringParams.sq_entries * sizeof(uint); + nuint cqRingBytes = ioUringParams.cq_off.cqes + ioUringParams.cq_entries * (nuint)sizeof(IoUringCqe); + nuint ringBytes = sqRingBytes > cqRingBytes ? sqRingBytes : cqRingBytes; + + void* ringMem = mmap(null, ringBytes, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); + if (ringMem == (void*)-1) + { + close(fd); + + throw new InvalidOperationException("mmap(SQ_RING) failed"); + } + ring._ringPtr = (byte*)ringMem; + ring._ringSize = ringBytes; + + nuint sqeBytes = ioUringParams.sq_entries * (nuint)sizeof(IoUringSqe); + void* sqeMem = mmap(null, sqeBytes, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); + if (sqeMem == (void*)-1) + { + munmap(ringMem, ringBytes); + close(fd); + + throw new InvalidOperationException("mmap(SQES) failed"); + } + ring._sqes = (IoUringSqe*)sqeMem; + ring._sqePtr = (byte*)sqeMem; + ring._sqeSize = sqeBytes; + + byte* ringPointer = (byte*)ringMem; + ring._sqHead = (uint*)(ringPointer + ioUringParams.sq_off.head); + ring._sqTail = (uint*)(ringPointer + ioUringParams.sq_off.tail); + ring._sqArray = (uint*)(ringPointer + ioUringParams.sq_off.array); + ring._sqMask = *(uint*)(ringPointer + ioUringParams.sq_off.ring_mask); + + ring._cqHead = (uint*)(ringPointer + ioUringParams.cq_off.head); + ring._cqTail = (uint*)(ringPointer + ioUringParams.cq_off.tail); + ring._cqes = (IoUringCqe*)(ringPointer + ioUringParams.cq_off.cqes); + ring._cqMask = *(uint*)(ringPointer + ioUringParams.cq_off.ring_mask); + + return ring; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IoUringSqe* GetSqe() + { + uint head = Volatile.Read(ref *_sqHead); + + if (_sqeTail - head >= _sqEntries) + { + return null; + } + + uint slot = _sqeTail & _sqMask; + _sqArray[slot] = slot; + _sqeTail++; + + return &_sqes[slot]; + } + + public int SubmitAndWait(uint waitFor) + { + uint published = *_sqTail; + uint toSubmit = _sqeTail - published; + + if (toSubmit > 0) + { + Volatile.Write(ref *_sqTail, _sqeTail); + } + + if (toSubmit == 0 && waitFor == 0) return 0; + + uint flags = waitFor > 0 ? IORING_ENTER_GETEVENTS : 0; + + return io_uring_enter(_fd, toSubmit, waitFor, flags); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryGetCqe(out IoUringCqe cqe) + { + uint head = *_cqHead; + uint tail = Volatile.Read(ref *_cqTail); + + if (head == tail) + { + cqe = default; + + return false; + } + + cqe = _cqes[head & _cqMask]; + + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void CqeSeen() => Volatile.Write(ref *_cqHead, *_cqHead + 1); + + // Batched CQ drain (liburing io_uring_for_each_cqe + io_uring_cq_advance): + // read the kernel-written tail once (acquire), process the whole batch, + // then publish the consumed head once (release) instead of once per CQE. + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public uint CqReady() => Volatile.Read(ref *_cqTail) - *_cqHead; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ref readonly IoUringCqe CqeAt(uint i) => ref _cqes[(*_cqHead + i) & _cqMask]; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void CqAdvance(uint n) => Volatile.Write(ref *_cqHead, *_cqHead + n); + + public void Dispose() + { + if (_ringPtr != null) + { + munmap(_ringPtr, _ringSize); _ringPtr = null; + } + + if (_sqePtr != null) + { + munmap(_sqePtr, _sqeSize); _sqePtr = null; + } + + if (_fd > 0) + { + close(_fd); _fd = 0; + } + } +} + +#pragma warning restore CA1806 diff --git a/frameworks/minima/meta.json b/frameworks/minima/meta.json new file mode 100644 index 00000000..5c856615 --- /dev/null +++ b/frameworks/minima/meta.json @@ -0,0 +1,16 @@ +{ + "display_name": "minima", + "language": "C#", + "type": "engine", + "engine": "io_uring", + "description": "Minima — a from-scratch C# multi-reactor io_uring HTTP/1.1 server. Per-reactor SO_REUSEPORT + multishot accept, multishot recv into a provided buffer ring, inline (RCA=false) continuations with a reactor-thread short-circuit on buffer return/flush/recycle. The engine is vendored unchanged; the request handler (request line, headers, Content-Length + chunked bodies, keep-alive, pipelining, fragmented reads) is hand-written on its raw recv/send API. No HTTP framework.", + "repo": "", + "enabled": true, + "tests": [ + "baseline", + "pipelined", + "limited-conn", + "json" + ], + "maintainers": [] +} diff --git a/frameworks/minima/minima.csproj b/frameworks/minima/minima.csproj new file mode 100644 index 00000000..5a16eff8 --- /dev/null +++ b/frameworks/minima/minima.csproj @@ -0,0 +1,15 @@ + + + + Exe + net10.0 + enable + enable + true + Minima + minima + true + true + + +