Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions frameworks/minima/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bin/
obj/
61 changes: 61 additions & 0 deletions frameworks/minima/Connection/Connection.Incremental.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System.Runtime.InteropServices;
using Minima.Utils;
// ReSharper disable SuggestVarOrType_BuiltInTypes

namespace Minima;

/// <summary>
/// Incremental-mode (IOU_PBUF_RING_INC) per-connection buffer-ring state.
/// Each connection owns its own ring + slab; one buffer accumulates this
/// connection's byte stream across many recvs. The reactor (Reactor.Incremental)
/// drives setup/teardown and the refcounted recycle; this partial just holds the
/// state and routes a handler return to the right reactor entry point.
///
/// All of these stay allocated across pool reuse and are freed in Dispose().
/// </summary>
public sealed unsafe partial class Connection
{
internal byte* BufRing; // kernel-shared ring control area
internal byte* BufSlab; // this connection's recv slab
internal ushort Bgid;
internal uint BufRingMask;
internal int BufRingEntries;
internal bool IncrementalMode;

internal int[]? CumOffset; // per-bid: byte offset where the next slice begins
internal int[]? RefCount; // per-bid: outstanding handler refs
internal bool[]? KernelDone; // per-bid: kernel finished appending (no F_BUF_MORE)

internal int Generation => Volatile.Read(ref _generation);

/// <summary>
/// Called by the handler to hand a consumed recv buffer back. Routes by mode:
/// incremental returns carry (fd, gen, bid) for refcounted recycle; the shared
/// path returns the bare bid to the reactor's single buf_ring.
/// </summary>
public void ReturnBuffer(in SpscRecvRing.Item item)
{
if (IncrementalMode)
{
_reactor.EnqueueReturnQIncremental(ClientFd, item.Gen, item.Bid);
}
else
{
_reactor.EnqueueReturnQ(item.Bid);
}
}

private void DisposeIncremental()
{
if (BufRing != null)
{
NativeMemory.AlignedFree(BufRing);
BufRing = null;
}
if (BufSlab != null)
{
NativeMemory.AlignedFree(BufSlab);
BufSlab = null;
}
}
}
166 changes: 166 additions & 0 deletions frameworks/minima/Connection/Connection.Read.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
using System.Threading.Tasks.Sources;
using Minima.Utils;

// ReSharper disable SuggestVarOrType_BuiltInTypes

namespace Minima;

/// <summary>
/// Per-connection state. The handler may run on any thread (e.g. resumed by
/// a thread-pool timer); reactor-only side effects are funnelled through the
/// MPSC queues on `Reactor`. Coordination uses Interlocked.Exchange on the
/// arm flags and a sticky `_pending` to close the lost-wakeup race.
///
/// Lifetime is pool-managed: the reactor pops a Connection on accept (or new
/// one if pool is empty), and pushes it back on teardown after `Clear()`. The
/// `_generation` field is bumped on each `Clear` so stale `ValueTask` tokens
/// from a previous connection life are detectable and return `Closed()`
/// instead of leaking the new tenant's state.
/// </summary>
public sealed unsafe partial class Connection : IValueTaskSource<RecvSnapshot>
{
internal Connection SetFd(int fd)
{
ClientFd = fd;
return this;
}

private ManualResetValueTaskSourceCore<RecvSnapshot> _readSignal = new()
{
RunContinuationsAsynchronously = false,
};
private int _armed;
private int _pending;
private int _closed;

private readonly SpscRecvRing _recv = new(capacityPow2: 16);

public ValueTask<RecvSnapshot> ReadAsync()
{
if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1)
{
Volatile.Write(ref _pending, 0);
return new ValueTask<RecvSnapshot>(
new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}

if (Volatile.Read(ref _closed) != 0)
{
return new ValueTask<RecvSnapshot>(RecvSnapshot.Closed());
}

if (Interlocked.Exchange(ref _armed, 1) == 1)
{
throw new InvalidOperationException("ReadAsync already armed.");
}

// Snapshot the generation as the IVTS token so a future Clear() can
// invalidate this awaiter if the connection gets pool-recycled.
int gen = Volatile.Read(ref _generation);

// Race recovery: re-check between arming and returning the IVTS task.
if (!_recv.IsEmpty() || Volatile.Read(ref _pending) == 1 || Volatile.Read(ref _closed) != 0)
{
Volatile.Write(ref _pending, 0);
Interlocked.Exchange(ref _armed, 0);

return new ValueTask<RecvSnapshot>(
new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}

return new ValueTask<RecvSnapshot>(this, (short)gen);
}

public bool TryGetItem(in RecvSnapshot snap, out SpscRecvRing.Item item)
=> _recv.TryDequeueUntil(snap.Tail, out item);

public void ResetRead() => _readSignal.Reset();

public void Complete(int res, ushort bid, bool hasBuffer, byte* ptr)
{
if (!_recv.TryEnqueue(new SpscRecvRing.Item
{
Ptr = ptr,
Bid = bid,
Len = res,
HasBuffer = hasBuffer,
Gen = (ushort)Volatile.Read(ref _generation)
}))
{
Console.Error.WriteLine("[conn] recv queue overflow.");
if (hasBuffer)
{
_reactor.ReturnBufferDirect(bid);
}
Volatile.Write(ref _closed, 1);
}

if (Interlocked.Exchange(ref _armed, 0) == 1)
{
_readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), Volatile.Read(ref _closed) != 0));
}
else
{
Volatile.Write(ref _pending, 1);
}
}

internal void DrainRecv()
{
// Return any buffer IDs still sitting in the SPSC ring (handler exited
// before draining them, or a recv arrived after _closed was set).
while (_recv.TryDequeue(out SpscRecvRing.Item item))
{
if (item.HasBuffer)
{
_reactor.ReturnBufferDirect(item.Bid);
}
}
}

// =========================================================================
// IValueTaskSource plumbing — token (= snapshot of `_generation` at await
// time) is compared against the current `_generation` to detect stale
// awaiters from before a Clear()/pool reuse. Stale awaiters get a
// sentinel result rather than the new tenant's state.
//
// For the actual IVTS dispatch we pass `_readSignal.Version` /
// `_flushSignal.Version` to the underlying core (not `token`) because the
// core's version is bumped by ResetRead/CompleteFlush mid-life and is
// unrelated to the cross-life generation guard.
// =========================================================================

RecvSnapshot IValueTaskSource<RecvSnapshot>.GetResult(short token)
{
if (token != (short)Volatile.Read(ref _generation))
{
return RecvSnapshot.Closed();
}

return _readSignal.GetResult(_readSignal.Version);
}

ValueTaskSourceStatus IValueTaskSource<RecvSnapshot>.GetStatus(short token)
{
if (token != (short)Volatile.Read(ref _generation))
{
return ValueTaskSourceStatus.Succeeded;
}

return _readSignal.GetStatus(_readSignal.Version);
}

void IValueTaskSource<RecvSnapshot>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (token != (short)Volatile.Read(ref _generation))
{
// Stale — run the continuation now so the awaiter unblocks and
// gets RecvSnapshot.Closed() from GetResult.
continuation(state);

return;
}

_readSignal.OnCompleted(continuation, state, _readSignal.Version, flags);
}
}
Loading