Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions AspBaseline/AspBaseline.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>false</ConcurrentGarbageCollection>
<TieredPGO>true</TieredPGO>
</PropertyGroup>

</Project>
39 changes: 39 additions & 0 deletions AspBaseline/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;

var builder = WebApplication.CreateBuilder(args);
builder.Logging.SetMinimumLevel(LogLevel.Warning);
builder.WebHost.ConfigureKestrel(o => o.ListenAnyIP(8080)); // default Kestrel socket transport

var app = builder.Build();

// Same knob + same object as Minima's handler: serialize a WORK_ITEMS-element object to
// JSON per request and discard it (the work stands in for a serializing endpoint).
// 0 / unset = no work (plain "ok"). The handler already runs on the thread pool here —
// no Task.Run needed — which is exactly Kestrel's model.
int workItems = 1000;
Payload largeObject = BuildPayload(Math.Max(workItems, 1));

app.MapGet("/", () =>
{
if (workItems > 0)
{
_ = JsonSerializer.SerializeToUtf8Bytes(largeObject);
}
return "ok";
});

app.Run();

static Payload BuildPayload(int count)
{
var items = new Item[count];
for (int i = 0; i < count; i++)
{
items[i] = new Item(i, $"item-{i}", i * 1.5, (i & 1) == 0, $"category-{i % 8}");
}
return new Payload(DateTime.UtcNow.ToString("O"), count, items);
}

internal sealed record Item(int Id, string Name, double Value, bool Active, string Category);
internal sealed record Payload(string Generated, int Count, Item[] Items);
26 changes: 26 additions & 0 deletions Examples/ZeroAlloc/Basic/Rings_as_ReadOnlySpan.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
using System.Text.Json;
using zerg;
using zerg.core;

namespace Examples.ZeroAlloc.Basic;

internal sealed class Rings_as_ReadOnlySpan
{
internal sealed record Item(int Id, string Name, double Value, bool Active, string Category);
internal sealed record Payload(string Generated, int Count, Item[] Items);

// Real async-work knob: serialize an in-memory object of WORK_ITEMS elements to JSON
// on the THREAD POOL (via Task.Run) per request. 0 / unset = disabled (pure inline
// reactor path). Genuine CPU + allocation, not a busy-spin.
private static readonly int WorkItems = 50;

private static readonly Payload LargeObject = BuildPayload(Math.Max(WorkItems, 1));

private static Payload BuildPayload(int count)
{
var items = new Item[count];
for (int i = 0; i < count; i++)
{
items[i] = new Item(i, $"item-{i}", i * 1.5, (i & 1) == 0, $"category-{i % 8}");
}
return new Payload(DateTime.UtcNow.ToString("O"), count, items);
}

internal static async Task HandleConnectionAsync(Connection connection)
{
while (true)
Expand All @@ -19,6 +40,11 @@ internal static async Task HandleConnectionAsync(Connection connection)
var sequence = rings.ToReadOnlySequence();

// Process received data...
if (WorkItems > 0)
{
//_ = await Task.Run(static () => JsonSerializer.SerializeToUtf8Bytes(LargeObject));
JsonSerializer.SerializeToUtf8Bytes(LargeObject);
}

// Return rings to the kernel
foreach (var ring in rings)
Expand Down
17 changes: 17 additions & 0 deletions Minima/Connection/Connection.Write.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public void Write(ReadOnlySpan<byte> source)
// Flush inner buffer data to the kernel
public ValueTask FlushAsync()
{
// Connection already torn down (reactor saw EOF/error → MarkClosed): don't flush
// a removed connection — the handoff would reach a reactor that no longer knows
// this fd and the awaiter would hang. Return completed so the handler unwinds to
// its next ReadAsync, sees IsClosed, and exits.
if (Volatile.Read(ref _closed) == 1)
{
return default;
}

if (Interlocked.Exchange(ref _flushInProgress, 1) == 1)
{
throw new InvalidOperationException("FlushAsync already in progress.");
Expand All @@ -115,6 +124,14 @@ public ValueTask FlushAsync()

_reactor.EnqueueFlush(ClientFd);

// Race recovery (mirrors ReadAsync): if close raced in after the guard above,
// self-complete so we don't hang waiting on a send the reactor will never make.
if (Volatile.Read(ref _closed) == 1 && Interlocked.Exchange(ref _flushArmed, 0) == 1)
{
Volatile.Write(ref _flushInProgress, 0);
_flushSignal.SetResult(true);
}

return new ValueTask(this, (short)gen);
}

Expand Down
23 changes: 21 additions & 2 deletions Minima/Connection/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ public sealed unsafe partial class Connection
// Bumped on Clear(); the low 16 bits are used as the IVTS token so stale
// awaiters can be detected after pool reuse.
private int _generation;


// Refcount: the connection has two owners — the reactor (recv side) and the
// handler (which may run off-reactor). Init to 2 on accept; each owner DecRef's
// when done; teardown (Recycle) runs only at refs==0, so a connection is never
// recycled or pool-reused while a handler is still in flight on another thread.
private int _refs;

public Connection(Reactor reactor, int fd, int writeSlabSize = 1024 * 16)
{
_reactor = reactor;
Expand Down Expand Up @@ -53,7 +59,20 @@ public void MarkClosed()
_flushSignal.SetResult(true);
}
}


// Init to 2 (reactor + handler) at accept.
internal void InitRefs() => Volatile.Write(ref _refs, 2);

// Release one owner's ref. Whoever drives it to 0 hands the connection to the
// reactor for teardown (close + Clear + pool) — never recycled before both done.
internal void DecRef()
{
if (Interlocked.Decrement(ref _refs) == 0)
{
_reactor.EnqueueRecycle(this);
}
}

internal void Clear()
{
// Bump generation first — readers of IVTS plumbing observe this via
Expand Down
37 changes: 37 additions & 0 deletions Minima/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;
using Minima.Utils;

namespace Minima;
Expand All @@ -22,6 +23,7 @@ private static int Main()
var config = new ServerConfig()
{
UsePipe = false,
ReactorCount = 24
};

Console.WriteLine($"[Minima] starting {config.ReactorCount} reactors on port {config.Port} (incremental={config.Incremental})");
Expand Down Expand Up @@ -50,6 +52,23 @@ private static int Main()

internal static class Handler
{
// Real async-work knob: serialize an in-memory object of WORK_ITEMS elements to JSON
// on the THREAD POOL (via Task.Run) per request. 0 / unset = disabled (pure inline
// reactor path). Genuine CPU + allocation, not a busy-spin.
private static readonly int WorkItems = 50;

private static readonly Payload LargeObject = BuildPayload(Math.Max(WorkItems, 1));

private static Payload BuildPayload(int count)
{
var items = new Item[count];
for (int i = 0; i < count; i++)
{
items[i] = new Item(i, $"item-{i}", i * 1.5, (i & 1) == 0, $"category-{i % 8}");
}
return new Payload(DateTime.UtcNow.ToString("O"), count, items);
}

public static async Task HandleAsync(Reactor reactor, Connection conn)
{
try
Expand All @@ -73,6 +92,16 @@ public static async Task HandleAsync(Reactor reactor, Connection conn)
}
}

// Real async work: serialize a large object to JSON on the THREAD POOL.
// The handler resumes OFF-REACTOR, so the FlushAsync below pays the eventfd
// handoff the pure-inline path avoids — and the serialization is genuine
// CPU + GC pressure on the pool, not a busy-spin.
if (WorkItems > 0)
{
//_ = await Task.Run(static () => JsonSerializer.SerializeToUtf8Bytes(LargeObject));
JsonSerializer.SerializeToUtf8Bytes(LargeObject);
}

// One response per recv burst — accumulate in the connection's
// per-connection write slab, then submit and await ack.
conn.Write(Program.Response);
Expand All @@ -94,6 +123,10 @@ public static async Task HandleAsync(Reactor reactor, Connection conn)
// Reactor will clean the connection up via the recv-error path
// (or SPSC overflow) on the next CQE for this fd.
}
finally
{
conn.DecRef(); // release the handler's ref; teardown runs once the reactor releases too
}
}

// PipeReader/PipeWriter variant — same behavior, driven through the BCL
Expand Down Expand Up @@ -134,6 +167,10 @@ public static async Task HandlePipeAsync(Reactor reactor, Connection conn)
{
reader.Complete();
writer.Complete();
conn.DecRef();
}
}
}

internal sealed record Item(int Id, string Name, double Value, bool Active, string Category);
internal sealed record Payload(string Generated, int Count, Item[] Items);
12 changes: 8 additions & 4 deletions Minima/Reactor/Reactor.Incremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private void LoopIncremental()
{
DrainReturnQIncremental();
DrainFlushQ();
DrainRecycleQ();

int rc = Ring.SubmitAndWait(1);
if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY)
Expand Down Expand Up @@ -220,6 +221,7 @@ private void DispatchIncremental(in IoUringCqe cqe)
? pooled.SetFd(clientFd)
: new Connection(this, clientFd, _config.WriteSlabSize);
Connections[clientFd] = conn;
conn.InitRefs();
SetupConnectionBufRing(conn);
SubmitRecvMultishot(clientFd, conn.Bgid);

Expand Down Expand Up @@ -247,9 +249,10 @@ private void DispatchIncremental(in IoUringCqe cqe)
// Peer EOF / recv error — the whole per-conn ring is freed in Recycle.
if (Connections.Remove(fd, out var dyingConn))
{
Recycle(dyingConn, fd);
dyingConn.MarkClosed();
dyingConn.DecRef();
}

return;
}

Expand Down Expand Up @@ -284,8 +287,9 @@ private void DispatchIncremental(in IoUringCqe cqe)
if (cqe.res <= 0)
{
Connections.Remove(fd);
Recycle(conn, fd);

conn.MarkClosed();
conn.DecRef();

return;
}
conn.WriteHead += cqe.res;
Expand Down
38 changes: 35 additions & 3 deletions Minima/Reactor/Reactor.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Minima.Utils;
Expand Down Expand Up @@ -52,6 +53,12 @@ public sealed unsafe partial class Reactor
private readonly Mpsc<ushort> _returnQ = new(1 << 14); // 16384 slots
private readonly Mpsc<int> _flushQ = new(1 << 12); // 4096 slots

// Teardown handoff: when a connection's refcount hits 0 off-reactor (handler exited
// on the thread pool), the recycle must run on the reactor (it touches the buf_ring
// and the reactor-only pool). Connection is a ref type, so this is a ConcurrentQueue
// rather than the unmanaged Mpsc<T>.
private readonly ConcurrentQueue<Connection> _recycleQ = new();

// Connection pool. Reactor-thread-only — accept and teardown both run on
// this reactor, so a plain Stack<T> is sufficient (no MPMC primitive
// needed). PoolMax caps the slab footprint per reactor:
Expand Down Expand Up @@ -215,6 +222,27 @@ private void DrainFlushQ()
}
}

// Called by Connection.DecRef when the refcount hits 0. Teardown must run on the
// reactor (buf_ring + pool are reactor-owned), so off-reactor callers hand off.
internal void EnqueueRecycle(Connection conn)
{
if (Environment.CurrentManagedThreadId == _reactorThreadId)
{
Recycle(conn, conn.ClientFd);
return;
}
_recycleQ.Enqueue(conn);
WakeFdWrite();
}

private void DrainRecycleQ()
{
while (_recycleQ.TryDequeue(out Connection? conn))
{
Recycle(conn, conn.ClientFd);
}
}

private void ArmWakePoll()
{
IoUringSqe* sqe = GetSqeOrFlush();
Expand Down Expand Up @@ -277,6 +305,7 @@ private void LoopShared()
// Drain MPSC queues from off-reactor handlers. Cheap when empty.
DrainReturnQ();
DrainFlushQ();
DrainRecycleQ();

int rc = Ring.SubmitAndWait(1);
if (rc < 0 && rc != -EINTR && rc != -EAGAIN && rc != -EBUSY)
Expand Down Expand Up @@ -325,6 +354,7 @@ private void Dispatch(in IoUringCqe cqe)
? pooled.SetFd(clientFd)
: new Connection(this, clientFd, _config.WriteSlabSize);
Connections[clientFd] = conn;
conn.InitRefs();
SubmitRecvMultishot(clientFd);

_ = _config.UsePipe
Expand Down Expand Up @@ -355,7 +385,8 @@ private void Dispatch(in IoUringCqe cqe)
}
if (Connections.Remove(fd, out var dyingConn))
{
Recycle(dyingConn, fd);
dyingConn.MarkClosed(); // signal the handler to exit
dyingConn.DecRef(); // release the reactor's ref; teardown at refs==0
}
return;
}
Expand Down Expand Up @@ -386,9 +417,10 @@ private void Dispatch(in IoUringCqe cqe)
}
if (cqe.res <= 0)
{
// Send error — reactor owns teardown.
// Send error — release the reactor's ref; teardown when the handler exits too.
Connections.Remove(fd);
Recycle(conn, fd);
conn.MarkClosed();
conn.DecRef();
return;
}
conn.WriteHead += cqe.res;
Expand Down
Loading
Loading