Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ jobs:

- name: Pack terraform
run: dotnet pack terraform/terraform.csproj --configuration Release --no-build --output ./artifacts

- name: Pack Twinflow
run: dotnet pack Twinflow/Twinflow.csproj --configuration Release --no-build --output ./artifacts

- name: Publish to NuGet
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
Expand Down
15 changes: 15 additions & 0 deletions KestrelShrike.Demo/KestrelShrike.Demo.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<!-- Demo / benchmark host: a minimal ASP.NET Core app (Web SDK) that references the KestrelShrike project. -->

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<!-- Runtime knobs for the demo: server GC and tiered profile-guided optimization are switched on explicitly. -->
<ServerGarbageCollection>true</ServerGarbageCollection>
<TieredPGO>true</TieredPGO>
</PropertyGroup>

<ItemGroup>
<!-- The library under test is consumed as a project reference, not as a NuGet package. -->
<ProjectReference Include="..\KestrelShrike\KestrelShrike.csproj" />
</ItemGroup>

</Project>
23 changes: 23 additions & 0 deletions KestrelShrike.Demo/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Logging;
using KestrelShrike;

// Demo host: a single "/" endpoint served by Kestrel on port 8080, optionally on the Shrike transport.
var builder = WebApplication.CreateBuilder(args);

// Benchmark setting: only warnings and above are logged, which silences per-request log lines.
builder.Logging.SetMinimumLevel(LogLevel.Warning);

builder.WebHost.UseKestrel(options => options.ListenAnyIP(8080));

// Transport switch: SHRIKE=0 keeps Kestrel's default Socket transport (the baseline);
// any other value, or no value at all, plugs in the epoll-based Shrike transport.
string? shrikeSwitch = Environment.GetEnvironmentVariable("SHRIKE");
if (shrikeSwitch is not "0")
{
    builder.WebHost.UseShrike(shrike => shrike.ReactorCount = Math.Max(1, 4));
}

var app = builder.Build();

// The only endpoint: a fixed plain-text greeting.
app.MapGet("/", () => "Hello from Shrike + Kestrel\n");

app.Run();
167 changes: 167 additions & 0 deletions KestrelShrike/EpollConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
namespace KestrelShrike;

/// <summary>
/// One TCP connection bridged to Kestrel through two BCL Pipes:
/// - Input: the reactor drains recv into Input.Writer; Kestrel reads Input.Reader.
/// - Output: Kestrel writes Output.Writer; the per-connection pump reads Output.Reader
/// and sends — DIRECTLY from the thread-pool thread, because an epoll
/// socket's send() is thread-safe. No reactor handoff (unlike io_uring's
/// single-issuer ring). The reactor is only involved on EAGAIN (arm
/// EPOLLOUT, signal the pump when writable).
/// Lifetime: 2-ref count (reactor/recv side + pump side); the fd closes when both end.
/// </summary>
internal sealed class EpollConnection
{
    /// <summary>File descriptor of the accepted socket.</summary>
    public readonly int Fd;

    /// <summary>Descriptor of the epoll instance this socket is registered with (used for epoll_ctl).</summary>
    public readonly int Ep;

    private readonly EpollReactor _reactor;

    /// <summary>Inbound bytes: written by <see cref="OnReadable"/>, read through <c>Input.Reader</c>.</summary>
    public readonly Pipe Input;

    /// <summary>Outbound bytes: read by <see cref="RunOutputPump"/> from <c>Output.Reader</c>.</summary>
    public readonly Pipe Output;

    // Non-null only while the pump is parked waiting for an EPOLLOUT notification.
    private TaskCompletionSource<bool>? _writable;

    // Two owners: the reactor/recv side and the output pump. The fd is closed when both have let go.
    private int _refs = 2;

    // 0 = open, 1 = closed; flipped exactly once by MarkClosed.
    private int _closed;

    private const int RecvChunk = 16 * 1024;

    /// <summary>Creates the connection and its two pipes.</summary>
    /// <param name="fd">Socket file descriptor.</param>
    /// <param name="ep">epoll descriptor the socket is registered with.</param>
    /// <param name="reactor">Reactor that is told to forget this connection once the last reference is dropped.</param>
    public EpollConnection(int fd, int ep, EpollReactor reactor)
    {
        Fd = fd;
        Ep = ep;
        _reactor = reactor;

        // Zero pause/resume thresholds: FlushAsync never blocks the writer, so the pipes apply no back-pressure.
        var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0, useSynchronizationContext: false);
        Input = new Pipe(pipeOptions);
        Output = new Pipe(pipeOptions);
    }

    /// <summary>True once <see cref="MarkClosed"/> has run.</summary>
    public bool IsClosed => Volatile.Read(ref _closed) != 0;

    /// <summary>
    /// Drains the socket into <c>Input.Writer</c> until recv would block (called for EPOLLIN).
    /// </summary>
    /// <returns>
    /// <c>true</c> while the connection is still usable; <c>false</c> when it is already closed,
    /// the peer closed it (recv returned 0) or recv failed with a hard error.
    /// </returns>
    public unsafe bool OnReadable()
    {
        if (IsClosed) return false;

        bool wroteAny = false;
        bool alive = true;
        for (;;)
        {
            Span<byte> target = Input.Writer.GetSpan(RecvChunk);
            long received;
            fixed (byte* p = target) received = recv(Fd, p, (ulong)target.Length, 0);

            if (received > 0)
            {
                Input.Writer.Advance((int)received);
                wroteAny = true;
                continue;
            }

            if (received == 0)
            {
                alive = false; // peer closed
                break;
            }

            int errno = Marshal.GetLastPInvokeError();
            if (errno == EINTR) continue;              // interrupted: retry
            if (errno is EAGAIN or EWOULDBLOCK) break; // socket drained
            alive = false;                             // hard error
            break;
        }

        // Bytes received before a close/error are still published to the reader.
        if (wroteAny) _ = Input.Writer.FlushAsync();
        return alive;
    }

    /// <summary>
    /// Output pump: reads <c>Output.Reader</c> and sends every segment on the socket until the writer
    /// completes, the read is canceled, or a send fails. On EAGAIN it waits for the writable signal.
    /// Always completes the reader and drops the pump's reference when it ends.
    /// </summary>
    public async Task RunOutputPump()
    {
        PipeReader reader = Output.Reader;
        try
        {
            while (true)
            {
                ReadResult read = await reader.ReadAsync().ConfigureAwait(false);
                if (read.IsCanceled) break;

                ReadOnlySequence<byte> pending = read.Buffer;
                bool failed = false;

                foreach (ReadOnlyMemory<byte> segment in pending)
                {
                    int offset = 0;
                    while (offset < segment.Length)
                    {
                        int sent = TrySend(segment.Span.Slice(offset), out bool wouldBlock, out bool closed);
                        if (closed) { failed = true; break; }
                        if (sent > 0) { offset += sent; continue; }
                        if (wouldBlock && !await WaitWritableAsync().ConfigureAwait(false)) { failed = true; break; }
                        // EINTR (nothing sent, not wouldBlock, not closed): fall through and retry.
                    }
                    if (failed) break;
                }

                // The whole buffer is consumed even after a failure; the connection is finished in that case.
                reader.AdvanceTo(pending.End);
                if (failed || read.IsCompleted) break;
            }
        }
        catch { /* connection died mid-send; deliberate best effort */ }
        finally
        {
            try { reader.Complete(); } catch { }
            DecRef();
        }
        // NOTE(review): nothing visible here shuts down or closes the socket when the writer completes while
        // the reactor still holds its reference — confirm that server-initiated closes are handled elsewhere.
    }

    /// <summary>One non-blocking send attempt.</summary>
    /// <param name="data">Bytes to send; an empty span sends nothing.</param>
    /// <param name="wouldBlock">Set when the socket cannot take data right now (EAGAIN/EWOULDBLOCK, or a 0 return).</param>
    /// <param name="closed">Set on any other error except EINTR.</param>
    /// <returns>Bytes sent, or 0 when nothing was sent (see the out flags; both false means EINTR).</returns>
    private unsafe int TrySend(ReadOnlySpan<byte> data, out bool wouldBlock, out bool closed)
    {
        wouldBlock = false;
        closed = false;
        if (data.IsEmpty) return 0;

        long n;
        fixed (byte* p = data) n = send(Fd, p, data.Length, MSG_NOSIGNAL);
        if (n > 0) return (int)n;

        // A zero-byte result for non-empty data is handled like EAGAIN.
        int errno = (n == 0) ? EAGAIN : Marshal.GetLastPInvokeError();
        if (errno is EAGAIN or EWOULDBLOCK)
        {
            wouldBlock = true;
            return 0;
        }
        if (errno == EINTR) return 0;

        closed = true;
        return 0;
    }

    /// <summary>
    /// EAGAIN path: registers interest in EPOLLOUT and returns a task completed by
    /// <see cref="SignalWritable"/> (true) or by <see cref="MarkClosed"/> (false).
    /// </summary>
    /// <returns>
    /// A task yielding <c>true</c> when the socket became writable, or <c>false</c> when the connection is
    /// closed or the EPOLLOUT registration failed.
    /// </returns>
    private Task<bool> WaitWritableAsync()
    {
        if (IsClosed) return Task.FromResult(false);

        var waiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Publish the waiter BEFORE arming, so an EPOLLOUT that fires immediately finds it.
        Volatile.Write(ref _writable, waiter);

        if (!TryArm(writable: true))
        {
            // epoll_ctl failed: no EPOLLOUT will ever be delivered, so waiting would park the pump forever.
            Interlocked.CompareExchange(ref _writable, null, waiter);
            waiter.TrySetResult(false);
            return waiter.Task;
        }

        if (IsClosed) waiter.TrySetResult(false); // raced with close
        return waiter.Task;
    }

    /// <summary>Called when EPOLLOUT fired: drops EPOLLOUT interest again and wakes a waiting pump, if any.</summary>
    public void SignalWritable()
    {
        TaskCompletionSource<bool>? waiter = Interlocked.Exchange(ref _writable, null);
        if (waiter is null) return;

        // Re-arm without EPOLLOUT first, then wake the pump, so a later re-arm by the pump is not overwritten.
        // A failure here is ignored (best effort): the socket merely keeps its EPOLLOUT interest.
        TryArm(writable: false);
        waiter.TrySetResult(true);
    }

    /// <summary>
    /// Marks the connection closed (only the first call has any effect): completes <c>Input.Writer</c>
    /// and releases a pump that is waiting for writability with a <c>false</c> result.
    /// </summary>
    public void MarkClosed()
    {
        if (Interlocked.Exchange(ref _closed, 1) == 1) return;
        try { Input.Writer.Complete(); } catch { }
        Interlocked.Exchange(ref _writable, null)?.TrySetResult(false); // unblock the pump
    }

    /// <summary>Drops one reference; the call that reaches zero unregisters the connection from the reactor and closes the fd.</summary>
    public void DecRef()
    {
        if (Interlocked.Decrement(ref _refs) != 0) return;
        _reactor.Remove(this);
        close(Fd);
    }

    /// <summary>
    /// Rewrites this socket's epoll registration (EPOLL_CTL_MOD, edge-triggered), with or without EPOLLOUT.
    /// </summary>
    /// <param name="writable"><c>true</c> to include EPOLLOUT in the interest set.</param>
    /// <returns><c>true</c> when epoll_ctl reported success (returned 0).</returns>
    private unsafe bool TryArm(bool writable)
    {
        byte* ev = stackalloc byte[EvSize];
        if (writable)
        {
            WriteEpollEvent(ev, (uint)(EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP) | EPOLLET, Fd);
        }
        else
        {
            WriteEpollEvent(ev, (uint)(EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP) | EPOLLET, Fd);
        }
        return epoll_ctl(Ep, EPOLL_CTL_MOD, Fd, (IntPtr)ev) == 0;
    }
}
36 changes: 36 additions & 0 deletions KestrelShrike/EpollEngine.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.Threading.Channels;

namespace KestrelShrike;

/// <summary>Owns N epoll reactors (each with its own SO_REUSEPORT listener) and funnels accepted connections to Kestrel.</summary>
internal sealed class EpollEngine
{
    private readonly EpollReactor[] _reactors;

    // Accepted connections from every reactor; multiple writers and readers are allowed.
    private readonly Channel<EpollConnection> _accepted = Channel.CreateUnbounded<EpollConnection>(
        new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });

    /// <summary>Builds <paramref name="reactorCount"/> reactors that all publish accepted connections into one channel.</summary>
    /// <param name="port">TCP port handed to every reactor.</param>
    /// <param name="reactorCount">Number of reactors to create.</param>
    /// <param name="backlog">Listen backlog handed to every reactor.</param>
    /// <param name="maxEvents">Per-reactor event batch size handed to every reactor.</param>
    public EpollEngine(ushort port, int reactorCount, int backlog, int maxEvents)
    {
        _reactors = new EpollReactor[reactorCount];

        int id = 0;
        while (id < reactorCount)
        {
            _reactors[id] = new EpollReactor(id, port, backlog, maxEvents) { OnAccept = Enqueue };
            id++;
        }
    }

    /// <summary>Starts one named background thread per reactor, each running that reactor's loop.</summary>
    public void Start()
    {
        for (int idx = 0; idx < _reactors.Length; idx++)
        {
            EpollReactor reactor = _reactors[idx];
            var worker = new Thread(reactor.Run)
            {
                IsBackground = true,
                Name = $"shrike-k-r{idx}",
            };
            worker.Start();
        }
    }

    /// <summary>Waits for the next accepted connection.</summary>
    /// <param name="ct">Cancels the wait.</param>
    public ValueTask<EpollConnection> AcceptAsync(CancellationToken ct)
    {
        return _accepted.Reader.ReadAsync(ct);
    }

    /// <summary>Completes the accept channel, then asks every reactor to stop.</summary>
    public void Stop()
    {
        _accepted.Writer.TryComplete();
        for (int i = 0; i < _reactors.Length; i++)
        {
            _reactors[i].Stop();
        }
    }

    // Accept callback given to each reactor. The TryWrite result is ignored.
    // NOTE(review): a connection accepted after Stop() completed the channel is dropped here — confirm that is acceptable.
    private void Enqueue(EpollConnection connection)
    {
        _accepted.Writer.TryWrite(connection);
    }
}
144 changes: 144 additions & 0 deletions KestrelShrike/EpollReactor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
namespace KestrelShrike;

/// <summary>
/// One reactor = one thread + one epoll instance + its own SO_REUSEPORT listener
/// (Minima/Shrike topology — kernel balances accepts, no acceptor thread). It is
/// purely a readiness driver: EPOLLIN → drain recv into the connection's input
/// Pipe (Kestrel reads it); EPOLLOUT → wake a pump that hit EAGAIN; error/hup →
/// close. Response sends happen on the pump's thread, not here.
/// </summary>
internal sealed unsafe class EpollReactor
{
    // Upper bound, in milliseconds, for a single epoll_wait call. A finite value lets the loop
    // notice Stop() even when no socket event ever arrives.
    private const int StopPollTimeoutMs = 250;

    /// <summary>Reactor index; used in the startup log line.</summary>
    public readonly int Id;

    private readonly ushort _port;
    private readonly int _backlog;
    private readonly int _maxEvents;

    private int _ep;
    private int _listenFd;

    // fd -> connection. A connection is taken out by the first Close; Remove covers the case
    // where it is still present when its last reference is dropped.
    private readonly ConcurrentDictionary<int, EpollConnection> _conns = new();
    private volatile bool _running = true;

    /// <summary>Invoked on the reactor thread for every accepted connection, before its output pump starts.</summary>
    internal Action<EpollConnection>? OnAccept;

    /// <summary>Stores the configuration; nothing is opened until <see cref="Run"/>.</summary>
    /// <param name="id">Reactor index.</param>
    /// <param name="port">TCP port to listen on.</param>
    /// <param name="backlog">Backlog passed to listen().</param>
    /// <param name="maxEvents">Maximum number of events fetched per epoll_wait call.</param>
    public EpollReactor(int id, ushort port, int backlog, int maxEvents)
    {
        Id = id;
        _port = port;
        _backlog = backlog;
        _maxEvents = maxEvents;
    }

    /// <summary>Asks the loop to exit; it is observed within <see cref="StopPollTimeoutMs"/> milliseconds.</summary>
    public void Stop() => _running = false;

    /// <summary>Forgets <paramref name="conn"/> if this exact instance is still registered under its fd.</summary>
    internal void Remove(EpollConnection conn) =>
        _conns.TryRemove(new KeyValuePair<int, EpollConnection>(conn.Fd, conn));

    /// <summary>
    /// Reactor loop: creates the epoll instance and the listener, then dispatches readiness events until
    /// <see cref="Stop"/> is requested or epoll_wait fails. Releases the listener, the epoll descriptor,
    /// the event buffer and the reactor's reference on every remaining connection on the way out,
    /// including when setup throws.
    /// </summary>
    /// <exception cref="Exception">epoll_create1, the listener setup, or registering the listener failed.</exception>
    public void Run()
    {
        _ep = epoll_create1(EPOLL_CLOEXEC);
        if (_ep < 0) throw new Exception("epoll_create1 failed");

        byte* lev = stackalloc byte[EvSize];
        IntPtr eventsBuf = IntPtr.Zero;
        _listenFd = -1;
        try
        {
            _listenFd = OpenReusePortListener(_port, _backlog);

            // The listener is registered without EPOLLET (level-triggered).
            WriteEpollEvent(lev, (uint)(EPOLLIN | EPOLLERR | EPOLLHUP), _listenFd);
            if (epoll_ctl(_ep, EPOLL_CTL_ADD, _listenFd, (IntPtr)lev) != 0)
                throw new Exception("epoll_ctl ADD listen failed");

            eventsBuf = Marshal.AllocHGlobal(EvSize * _maxEvents);
            Console.WriteLine($"[shrike-k r{Id}] listening on 0.0.0.0:{_port}");

            while (_running)
            {
                // Finite timeout: a return value of 0 simply re-checks _running.
                int n = epoll_wait(_ep, eventsBuf, _maxEvents, StopPollTimeoutMs);
                if (n < 0)
                {
                    if (Marshal.GetLastPInvokeError() == EINTR) continue;
                    break;
                }

                for (int i = 0; i < n; i++)
                {
                    ReadEpollEvent((byte*)eventsBuf + i * EvSize, out uint evs, out int fd);

                    if (fd == _listenFd) { AcceptLoop(); continue; }

                    // Unknown fd, or a connection the reactor has already closed: nothing to do.
                    if (!_conns.TryGetValue(fd, out var conn)) continue;

                    // Drain readable data BEFORE acting on hang-up flags, so bytes that arrived together
                    // with the peer's FIN are not thrown away.
                    if ((evs & (uint)EPOLLIN) != 0)
                    {
                        if (!conn.OnReadable()) { Close(conn); continue; }
                    }

                    if ((evs & (uint)(EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0)
                    {
                        Close(conn);
                        continue;
                    }

                    if ((evs & (uint)EPOLLOUT) != 0)
                    {
                        conn.SignalWritable();
                    }
                }
            }
        }
        finally
        {
            // Once the epoll instance is gone no event can close these connections any more,
            // so release the reactor's reference on each of them now.
            foreach (EpollConnection conn in _conns.Values) Close(conn);

            if (eventsBuf != IntPtr.Zero) Marshal.FreeHGlobal(eventsBuf);
            if (_listenFd >= 0) close(_listenFd);
            close(_ep);
        }
    }

    /// <summary>
    /// Accepts until accept4 stops returning sockets. Each socket gets TCP_NODELAY, an edge-triggered epoll
    /// registration and an <see cref="EpollConnection"/>, which is published through <see cref="OnAccept"/>
    /// before its output pump is started.
    /// </summary>
    private void AcceptLoop()
    {
        // Allocated once: stackalloc inside the loop would grow the stack with every accepted socket.
        byte* ev = stackalloc byte[EvSize];

        for (;;)
        {
            int cfd = accept4(_listenFd, IntPtr.Zero, IntPtr.Zero, SOCK_NONBLOCK | SOCK_CLOEXEC);
            if (cfd < 0)
            {
                int err = Marshal.GetLastPInvokeError();
                if (err == EINTR) continue;
                break; // EAGAIN/EWOULDBLOCK (drained) or transient error
            }

            int one = 1;
            setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, ref one, sizeof(int));

            WriteEpollEvent(ev, (uint)(EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP) | EPOLLET, cfd);
            if (epoll_ctl(_ep, EPOLL_CTL_ADD, cfd, (IntPtr)ev) != 0)
            {
                // An unregistered socket would never produce events; drop it instead of handing out a dead connection.
                close(cfd);
                continue;
            }

            var c = new EpollConnection(cfd, _ep, this);
            _conns[cfd] = c;
            OnAccept?.Invoke(c);
            _ = c.RunOutputPump();
        }
    }

    /// <summary>
    /// Reactor-side close. Only the first call per connection has an effect: the connection leaves the
    /// table, is marked closed, and the reactor's reference is dropped. Later events for the same fd
    /// (possible until the pump finishes and the fd is closed) can therefore not drop a second reference.
    /// </summary>
    private void Close(EpollConnection conn)
    {
        if (!_conns.TryRemove(new KeyValuePair<int, EpollConnection>(conn.Fd, conn))) return;

        conn.MarkClosed(); // complete Input.Writer (reactor is its sole writer)
        conn.DecRef();     // reactor/recv side done
    }

    /// <summary>
    /// Opens a non-blocking IPv4 TCP listener on 0.0.0.0:<paramref name="port"/> with SO_REUSEADDR and SO_REUSEPORT set.
    /// </summary>
    /// <param name="port">Port in host byte order.</param>
    /// <param name="backlog">Backlog passed to listen().</param>
    /// <returns>The listening socket descriptor.</returns>
    /// <exception cref="Exception">socket, bind or listen failed; the descriptor is closed before rethrowing.</exception>
    private static int OpenReusePortListener(ushort port, int backlog)
    {
        int fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
        if (fd < 0) throw new Exception($"socket failed errno={Marshal.GetLastPInvokeError()}");

        try
        {
            int one = 1;
            setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, ref one, sizeof(int));
            setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, ref one, sizeof(int));

            int fl = fcntl(fd, F_GETFL, 0);
            if (fl >= 0) fcntl(fd, F_SETFL, fl | O_NONBLOCK);

            var addr = new sockaddr_in
            {
                sin_family = (ushort)AF_INET,
                sin_port = Htons(port),
                sin_addr = new in_addr { s_addr = 0 }, // 0.0.0.0
                sin_zero = new byte[8]
            };
            if (bind(fd, ref addr, (uint)Marshal.SizeOf<sockaddr_in>()) != 0)
                throw new Exception($"bind failed errno={Marshal.GetLastPInvokeError()}");
            if (listen(fd, backlog) != 0)
                throw new Exception($"listen failed errno={Marshal.GetLastPInvokeError()}");
            return fd;
        }
        catch
        {
            close(fd); // do not leak the descriptor when setup fails
            throw;
        }
    }
}
Loading
Loading