From d7bd4c6aca43dfe05f8d860e7f257d5c589e2d13 Mon Sep 17 00:00:00 2001 From: Diogo Martins Date: Mon, 8 Jun 2026 00:43:07 +0100 Subject: [PATCH] [C#] Add shrike-minima - epoll, SPSC recv ring (Minima model) A C# epoll engine with an IVTS-backed, RCA=true async handler, fixed the Minima way: an SPSC recv ring decouples the worker from the handler. The worker recv's into pooled buffers and enqueues (ptr, len) on a per-connection single-producer / single-consumer ring; the handler resumes on the thread pool, dequeues the chunks into its own parse buffer, and returns the buffers. Worker and handler never share a buffer (no driver/handler race), and recv pipelines with parse. The epoll analogue of Minima's SpscRecvRing. Sibling of shrike-tokio (#832), which fixes the same race the Tokio way (the handler does its own recv). Serves baseline, pipelined, limited-conn; hand-rolled HTTP/1.1; Connection: close sends a FIN. Validated 14/14 (every TCP-fragmentation case), 0/100 fragmented POST, 8000 keep-alive with zero drops. 
--- frameworks/shrike-minima/.dockerignore | 3 + frameworks/shrike-minima/.gitignore | 2 + frameworks/shrike-minima/ABI/Native.cs | 297 +++++++++++++++++ .../ABI/ProcessorArchDependant.cs | 149 +++++++++ frameworks/shrike-minima/Dockerfile | 12 + frameworks/shrike-minima/Engine/Connection.cs | 254 ++++++++++++++ frameworks/shrike-minima/Engine/RecvPipe.cs | 62 ++++ .../Engine/ShrikeEngine.Builder.cs | 174 ++++++++++ .../Engine/ShrikeEngine.Runner.cs | 37 +++ .../Engine/ShrikeEngine.Worker.cs | 176 ++++++++++ .../shrike-minima/Engine/ShrikeEngine.cs | 127 +++++++ frameworks/shrike-minima/Engine/Worker.cs | 84 +++++ .../HttpProtocol/H1/BinaryH1HeaderData.cs | 14 + .../HttpProtocol/H1/CachedH1Data.cs | 38 +++ .../HttpProtocol/H1/H1HeaderData.cs | 26 ++ .../HttpProtocol/H1/HeaderParsing.cs | 269 +++++++++++++++ .../HttpProtocol/H1/StringCache.cs | 113 +++++++ .../H1/StringCacheMemoryVariant.cs | 72 ++++ frameworks/shrike-minima/Program.cs | 312 ++++++++++++++++++ frameworks/shrike-minima/README.md | 38 +++ .../SerializableObjects/JsonMessage.cs | 14 + .../shrike-minima/Utilities/DateHelper.cs | 64 ++++ .../shrike-minima/Utilities/HashUtils.cs | 93 ++++++ .../Utilities/PinnedByteSequence.cs | 39 +++ .../Utilities/PooledDictionary.cs | 285 ++++++++++++++++ .../Utilities/UnmanagedMemoryManager.cs | 83 +++++ .../Writers/FixedBufferWriter.cs | 129 ++++++++ .../shrike-minima/Writers/ISpanWriter.cs | 20 ++ .../Writers/IUnmanagedBufferWriter.cs | 40 +++ frameworks/shrike-minima/_usings.cs | 15 + frameworks/shrike-minima/meta.json | 15 + frameworks/shrike-minima/shrike-minima.csproj | 20 ++ 32 files changed, 3076 insertions(+) create mode 100644 frameworks/shrike-minima/.dockerignore create mode 100644 frameworks/shrike-minima/.gitignore create mode 100644 frameworks/shrike-minima/ABI/Native.cs create mode 100644 frameworks/shrike-minima/ABI/ProcessorArchDependant.cs create mode 100644 frameworks/shrike-minima/Dockerfile create mode 100644 
frameworks/shrike-minima/Engine/Connection.cs create mode 100644 frameworks/shrike-minima/Engine/RecvPipe.cs create mode 100644 frameworks/shrike-minima/Engine/ShrikeEngine.Builder.cs create mode 100644 frameworks/shrike-minima/Engine/ShrikeEngine.Runner.cs create mode 100644 frameworks/shrike-minima/Engine/ShrikeEngine.Worker.cs create mode 100644 frameworks/shrike-minima/Engine/ShrikeEngine.cs create mode 100644 frameworks/shrike-minima/Engine/Worker.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/BinaryH1HeaderData.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/CachedH1Data.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/H1HeaderData.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/HeaderParsing.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/StringCache.cs create mode 100644 frameworks/shrike-minima/HttpProtocol/H1/StringCacheMemoryVariant.cs create mode 100644 frameworks/shrike-minima/Program.cs create mode 100644 frameworks/shrike-minima/README.md create mode 100644 frameworks/shrike-minima/SerializableObjects/JsonMessage.cs create mode 100644 frameworks/shrike-minima/Utilities/DateHelper.cs create mode 100644 frameworks/shrike-minima/Utilities/HashUtils.cs create mode 100644 frameworks/shrike-minima/Utilities/PinnedByteSequence.cs create mode 100644 frameworks/shrike-minima/Utilities/PooledDictionary.cs create mode 100644 frameworks/shrike-minima/Utilities/UnmanagedMemoryManager.cs create mode 100644 frameworks/shrike-minima/Writers/FixedBufferWriter.cs create mode 100644 frameworks/shrike-minima/Writers/ISpanWriter.cs create mode 100644 frameworks/shrike-minima/Writers/IUnmanagedBufferWriter.cs create mode 100644 frameworks/shrike-minima/_usings.cs create mode 100644 frameworks/shrike-minima/meta.json create mode 100644 frameworks/shrike-minima/shrike-minima.csproj diff --git a/frameworks/shrike-minima/.dockerignore b/frameworks/shrike-minima/.dockerignore new file mode 100644 index 
000000000..6a46dd93c --- /dev/null +++ b/frameworks/shrike-minima/.dockerignore @@ -0,0 +1,3 @@ +bin/ +obj/ +.git diff --git a/frameworks/shrike-minima/.gitignore b/frameworks/shrike-minima/.gitignore new file mode 100644 index 000000000..cd42ee34e --- /dev/null +++ b/frameworks/shrike-minima/.gitignore @@ -0,0 +1,2 @@ +bin/ +obj/ diff --git a/frameworks/shrike-minima/ABI/Native.cs b/frameworks/shrike-minima/ABI/Native.cs new file mode 100644 index 000000000..92944f9e0 --- /dev/null +++ b/frameworks/shrike-minima/ABI/Native.cs @@ -0,0 +1,297 @@ +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +// (var is avoided intentionally in this project so that concrete types are visible at call sites.) +// ReSharper disable always StackAllocInsideLoop +// ReSharper disable always ClassCannotBeInstantiated +#pragma warning disable CA2014 + +namespace Shrike; + +/// +/// Linux interop surface for a high-performance, epoll-driven TCP server. +/// +/// Design goals: +/// - **Minimal marshaling overhead**: prefer blittable types (e.g., pointers, ints). +/// - **Explicit error handling**: all functions are marked . +/// Use Marshal.GetLastPInvokeError() immediately after a failure to read errno. +/// - **Unsafe-friendly**: exposes pointer overloads for zero-copy recv/send. +/// +/// Platform notes: +/// - Constants can differ across libc/architectures/kernels. The values here target +/// mainstream Linux/glibc on x86_64. If you target other distros/architectures, verify +/// these values against system headers (bits/socket.h, fcntl.h, sys/epoll.h, sys/eventfd.h). +/// - Network byte order: ports must be big-endian (use htons); addresses must be set appropriately. +/// - SIGPIPE: either ignore SIGPIPE process-wide or pass to send. +/// +internal static unsafe class Native +{ + // ========================= + // P/Invoke + // ========================= + + /// + /// Create a socket. 
Typically domain=AF_INET, type=SOCK_STREAM, protocol=IPPROTO_TCP. + /// Returns a file descriptor (>= 0) on success, or -1 on error (check errno). + /// + [DllImport("libc", SetLastError = true)] internal static extern int socket(int domain, int type, int protocol); + + /// + /// Bind a socket to an address/port. Use sockaddr_in for IPv4. + /// Returns 0 on success, -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int bind(int sockfd, ref sockaddr_in addr, uint addrlen); + + /// + /// Mark a bound socket as passive (accept incoming connections). + /// backlog is the kernel queue length hint. + /// Returns 0 on success, -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int listen(int sockfd, int backlog); + + /// + /// Accept a new connection. flags can include SOCK_NONBLOCK and + /// SOCK_CLOEXEC to atomically configure the accepted FD. Returns new client FD or -1 on error. + /// Use Marshal.GetLastPInvokeError() to check for EAGAIN / EWOULDBLOCK in edge-triggered loops. + /// + [DllImport("libc", SetLastError = true)] internal static extern int accept4(int sockfd, IntPtr addr, IntPtr addrlen, int flags); + + /// + /// Set a socket option (int value). Common options: SO_REUSEADDR, SO_REUSEPORT, TCP_NODELAY, etc. + /// Returns 0 on success, -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int setsockopt(int sockfd, int level, int optname, ref int optval, uint optlen); + + /// + /// Set SO_LINGER using the Linger struct. + /// Returns 0 on success, -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int setsockopt(int sockfd, int level, int optname, ref Linger optval, uint optlen); + + /// + /// File control. Typical usage: get/set O_NONBLOCK on a socket. + /// Returns result per command, or -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int fcntl(int fd, int cmd, int arg); + + /// + /// Close a file descriptor (socket or epoll/eventfd). Returns 0 on success, -1 on error. 
+ /// + [DllImport("libc", SetLastError = true)] internal static extern int close(int fd); + + /// + /// Read from a file descriptor into unmanaged memory. + /// For sockets, prefer . + /// Returns bytes read (>=0) or -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern long read(int fd, IntPtr buf, ulong count); + + /// + /// Write to a file descriptor from unmanaged memory. + /// For sockets, prefer . + /// Returns bytes written (>=0) or -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern long write(int fd, IntPtr buf, ulong count); + + /// + /// Receive from a socket into unmanaged memory. Returns bytes received (>=0), 0 on orderly shutdown, or -1 on error. + /// Set flags to 0 for normal reads. + /// + [DllImport("libc", SetLastError = true)] internal static extern long recv(int sockfd, IntPtr buf, ulong len, int flags); + + /// + /// Receive from a socket into a raw pointer. Equivalent to the IntPtr overload, but avoids extra pinning overhead when you already have a pointer. + /// + [DllImport("libc", SetLastError = true)] internal static extern long recv(int sockfd, byte* buf, ulong len, int flags); + + /// + /// Send to a socket from unmanaged memory. Returns bytes sent (>=0) or -1 on error. + /// Consider passing in flags to avoid SIGPIPE on closed peers. + /// + [DllImport("libc", SetLastError = true)] internal static extern long send(int sockfd, IntPtr buf, ulong len, int flags); + + /// + /// Send to a socket from a raw pointer (long length). + /// + [DllImport("libc", SetLastError = true)] internal static extern long send(int sockfd, byte* buf, long len, int flags); + + /// + /// Send to a socket from a raw void* and nuint length. + /// This signature maps closely to the native prototype and can reduce marshaling overhead in hot paths. 
+ /// + [DllImport("libc", SetLastError = true)] public static extern nint send(int sockfd, void* buf, nuint len, int flags); + + /// + /// Create an epoll instance. Returns an epoll file descriptor (>=0) or -1 on error. + /// Use to set close-on-exec at creation time. + /// + [DllImport("libc", SetLastError = true)] internal static extern int epoll_create1(int flags); + + /// + /// Control the epoll interest list (add/mod/del). The ev points to an epoll_event struct in unmanaged memory. + /// Returns 0 on success, -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int epoll_ctl(int epfd, int op, int fd, IntPtr ev); + + /// + /// Wait for events. events points to a contiguous array of epoll_event (maxevents elements). + /// Returns number of events (>=0) or -1 on error. Use timeout < 0 to block indefinitely. + /// + [DllImport("libc", SetLastError = true)] internal static extern int epoll_wait(int epfd, IntPtr events, int maxevents, int timeout); + + /// + /// Create an eventfd (userspace semaphore/notification). Great for waking worker threads from another thread. + /// Returns fd (>=0) or -1 on error. + /// + [DllImport("libc", SetLastError = true)] internal static extern int eventfd(uint initval, int flags); + + [DllImport("libc", SetLastError = true)] internal static extern int sched_setaffinity(int pid, IntPtr cpusetsize, ref ulong mask); + + [DllImport("libc", SetLastError = true)] internal static extern int sched_setaffinity(int pid, IntPtr cpusetsize, ref cpu_set_t mask); + + [DllImport("libc")] internal static extern int gettid(); // Linux thread id + + // ========================= + // Struct definitions + // ========================= + + /// + /// IPv4 address (network byte order). + /// + [StructLayout(LayoutKind.Sequential)] + internal struct in_addr + { + /// + /// Address in network byte order (big-endian). 0 == INADDR_ANY. + /// + public uint s_addr; + } + + /// + /// IPv4 socket address. 
Must be passed with addrlen = (uint)sizeof(sockaddr_in). + /// + [StructLayout(LayoutKind.Sequential)] + internal struct sockaddr_in + { + /// Address family (AF_INET). + public ushort sin_family; + + /// Port in network byte order (use htons). + public ushort sin_port; + + /// IPv4 address (use INADDR_ANY or a specific address in network byte order). + public in_addr sin_addr; + + /// + /// Padding to match native layout (8 bytes). Must be present for correct size. + /// It need not be initialized for normal usage; the kernel ignores it. + /// + [MarshalAs(UnmanagedType.ByValArray, SizeConst = 8)] + public byte[] sin_zero; + } + + /// + /// linger option for SO_LINGER. + /// If l_onoff != 0, close() will block up to l_linger seconds to flush pending data. + /// Be careful: enabling linger can cause unexpected blocking on close. + /// + [StructLayout(LayoutKind.Sequential)] + internal struct Linger + { + public int l_onoff; + public int l_linger; + } + + + // ========================= + // Constants + // ========================= + // Socket families/types/protocols + internal const int AF_INET = 2; + internal const int SOCK_STREAM = 1; + internal const int IPPROTO_TCP = 6; + + // setsockopt levels / names + internal const int SOL_SOCKET = 1; + internal const int SO_REUSEADDR = 2; + internal const int SO_REUSEPORT = 15; + internal const int SO_LINGER = 13; + /// + /// TCP_NODELAY (disable Nagle). Linux defines this at level IPPROTO_TCP with optname=1. + /// (Kept here as constant=1; use level=IPPROTO_TCP when calling setsockopt.) + /// + internal const int TCP_NODELAY = 1; + + // fcntl / file status flags + internal const int O_NONBLOCK = 0x800; // Verify per-arch. 
+ internal const int F_GETFL = 3; + internal const int F_SETFL = 4; + + // epoll events + internal const int EPOLLIN = 0x001; + internal const int EPOLLOUT = 0x004; + internal const int EPOLLERR = 0x008; + internal const int EPOLLHUP = 0x010; + internal const int EPOLLRDHUP = 0x2000; + internal const uint EPOLLET = 0x80000000; + internal const uint EPOLLONESHOT = 0x40000000; + + // epoll_ctl ops + internal const int EPOLL_CTL_ADD = 1; + internal const int EPOLL_CTL_DEL = 2; + internal const int EPOLL_CTL_MOD = 3; + + // CLOEXEC / NONBLOCK flags (creation-time) + /// Close-on-exec for epoll_create1/eventfd. (Verify on your target kernel/arch.) + internal const int EPOLL_CLOEXEC = 0x80000; + + /// + /// SOCK_CLOEXEC has the same value as O_CLOEXEC: 0x80000 (octal 02000000) on x86/x86_64 and ARM/ARM64 Linux. + /// A few other architectures define it differently; validate this constant on your target platform if you pass it to socket() or accept4(). + /// + internal const int SOCK_CLOEXEC = 0x80000; + + /// Creation-time nonblocking for socket/accept4. + internal const int SOCK_NONBLOCK = 0x800; + + // eventfd flags + internal const int EFD_NONBLOCK = 0x800; + internal const int EFD_CLOEXEC = 0x80000; + + // send/recv flags + /// + /// Suppress SIGPIPE on send. Alternatively, ignore SIGPIPE process-wide. 
+ /// + internal const int MSG_NOSIGNAL = 0x4000; + + // Common errno values we branch on in tight loops + internal const int EINTR = 4; + internal const int EAGAIN = 11; + internal const int EWOULDBLOCK = 11; + internal const int EPIPE = 32; + internal const int ECONNABORTED = 103; + internal const int ECONNRESET = 104; + + public static void PinCurrentThreadToCpu(int cpuIndex) + { + if (cpuIndex < 0 || cpuIndex >= Environment.ProcessorCount) + throw new ArgumentOutOfRangeException(nameof(cpuIndex)); + + unsafe + { + var set = new cpu_set_t(); + int word = cpuIndex / 64; + int bit = cpuIndex % 64; + set.Bits[word] = 1UL << bit; + + int tid = gettid(); + int ret = sched_setaffinity(tid, (IntPtr)sizeof(cpu_set_t), ref set); + if (ret != 0) + throw new InvalidOperationException($"sched_setaffinity failed with errno {Marshal.GetLastPInvokeError()}"); + } + } +} + +internal unsafe struct cpu_set_t +{ + public fixed ulong Bits[16]; // 1024 bits (enough for up to 1024 CPUs) +} \ No newline at end of file diff --git a/frameworks/shrike-minima/ABI/ProcessorArchDependant.cs b/frameworks/shrike-minima/ABI/ProcessorArchDependant.cs new file mode 100644 index 000000000..c8a712b2a --- /dev/null +++ b/frameworks/shrike-minima/ABI/ProcessorArchDependant.cs @@ -0,0 +1,149 @@ +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +// (var is avoided intentionally in this project so that concrete types are visible at call sites.) +// ReSharper disable always StackAllocInsideLoop +// ReSharper disable always ClassCannotBeInstantiated +#pragma warning disable CA2014 + +namespace Shrike; + +/// +/// Provides architecture-dependent helpers for low-level socket and epoll interop. +/// +/// +/// Linux’s struct epoll_event has different binary layouts depending on CPU architecture +/// (notably 12 bytes on x86/x64 and 16 bytes on most other architectures like ARM/ARM64). 
+/// This class exposes constants and helpers to correctly read and write those structures +/// at runtime based on the process architecture. +/// +/// +/// +/// These methods are used to serialize and deserialize epoll_event structures directly +/// into unmanaged buffers when interfacing with epoll_wait, epoll_ctl, and related syscalls. +/// +/// +internal static unsafe class ProcessorArchDependant +{ + // ============================================================================================= + // Architecture-dependent configuration + // ============================================================================================= + + /// + /// Indicates whether the current platform uses a packed epoll_event layout (12 bytes). + /// + /// On x86 and x64 (little-endian), the epoll_event structure is packed to 12 bytes. + /// On ARM, ARM64, and others, it uses natural 8-byte alignment, resulting in 16 bytes. + /// + /// + internal static readonly bool Packed = + RuntimeInformation.ProcessArchitecture == Architecture.X64 || + RuntimeInformation.ProcessArchitecture == Architecture.X86; + + /// + /// The size (in bytes) of an epoll_event structure for the current runtime architecture. + /// + /// Typically 12 bytes for packed x86/x64 layouts and 16 for natural alignment layouts. + /// + /// + internal static readonly int EvSize = Packed ? 12 : 16; + + // ============================================================================================= + // Struct read/write helpers + // ============================================================================================= + + /// + /// Writes a Linux epoll_event structure into a preallocated unmanaged memory region. + /// + /// Destination pointer to write the structure into. + /// Bitmask of epoll events (e.g. EPOLLIN, EPOLLOUT, EPOLLRDHUP, etc.). + /// The file descriptor associated with the event. 
+ /// + /// + /// Layouts by architecture: + /// + /// Packed (x86/x64): events @ 0 (4 bytes), data @ 4 (8 bytes) + /// Natural (ARM/others): events @ 0 (4 bytes), padding 4, data @ 8 (8 bytes) + /// + /// + /// Only the lower 32 bits of are stored in the data field. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static void WriteEpollEvent(void* dest, uint events, int fd) + { + if (Packed) + { + // events @0 (4 bytes), data @4 (8 bytes) + *(uint*)dest = events; + *(ulong*)((byte*)dest + 4) = (uint)fd; // store fd in low 32 bits + } + else + { + // events @0 (4 bytes), pad 4, data @8 (8 bytes) + *(uint*)dest = events; + *(ulong*)((byte*)dest + 8) = (uint)fd; + } + } + + /// + /// Reads a Linux epoll_event structure from unmanaged memory and extracts its fields. + /// + /// Pointer to the source buffer containing the epoll_event structure. + /// Outputs the event flags (EPOLLIN, EPOLLOUT, etc.). + /// Outputs the associated file descriptor. + /// + /// Reads using the correct layout depending on the flag. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static void ReadEpollEvent(void* src, out uint events, out int fd) + { + if (Packed) + { + events = *(uint*)src; + fd = (int)*(uint*)((byte*)src + 4); + } + else + { + events = *(uint*)src; + fd = (int)*(uint*)((byte*)src + 8); + } + } + + // Variations, TODO: Test performance required + [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)] + internal static void WriteEpollEvent2(void* dest, uint events, int fd) + { + // Write events (always aligned 4B store) + *(uint*)dest = events; + // Compute data offset (packed: +4, natural: +8) + var data = (byte*)dest + (Packed ? 4 : 8); + // Store only low 32 bits of fd and zero the high 32 bits. + // Using two 4B stores avoids an unaligned 8B write in the packed layout. 
+ *(uint*)data = (uint)fd; // low 32 + *(uint*)(data + 4) = 0; // high 32 + } + + [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)] + internal static void ReadEpollEvent2(void* src, out uint events, out int fd) + { + events = *(uint*)src; + var data = (byte*)src + (Packed ? 4 : 8); + // We only ever wrote the low 32 bits; read exactly those. + fd = (int)*(uint*)data; + } + + // ============================================================================================= + // Networking helpers + // ============================================================================================= + + /// + /// Converts a 16-bit unsigned integer from host byte order to network byte order (big-endian). + /// + /// The value to convert. + /// The converted value in network byte order. + /// + /// Equivalent to the native htons() function from the BSD sockets API. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ushort Htons(ushort x) => + BitConverter.IsLittleEndian ? BinaryPrimitives.ReverseEndianness(x) : x; +} \ No newline at end of file diff --git a/frameworks/shrike-minima/Dockerfile b/frameworks/shrike-minima/Dockerfile new file mode 100644 index 000000000..cda98c974 --- /dev/null +++ b/frameworks/shrike-minima/Dockerfile @@ -0,0 +1,12 @@ +FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build +WORKDIR /src +COPY shrike-minima.csproj ./ +RUN dotnet restore +COPY . . 
+RUN dotnet publish -c Release -o /app/out --no-restore + +FROM mcr.microsoft.com/dotnet/runtime:10.0 +WORKDIR /app +COPY --from=build /app/out ./ +EXPOSE 8080 +ENTRYPOINT ["dotnet", "shrike-minima.dll"] diff --git a/frameworks/shrike-minima/Engine/Connection.cs b/frameworks/shrike-minima/Engine/Connection.cs new file mode 100644 index 000000000..0aea25e64 --- /dev/null +++ b/frameworks/shrike-minima/Engine/Connection.cs @@ -0,0 +1,254 @@ +using System.Threading.Tasks.Sources; + +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +#pragma warning disable CA2014 + +namespace Shrike; + +/// +/// Per-connection state with Minima-style IVTS on the read and flush paths. +/// +/// The epoll worker is the DRIVER: on EPOLLIN it recv's into pooled buffers, enqueues them +/// on RecvRing, then calls SignalReadable; on EPOLLOUT +/// it continues TryFlush and, when drained, calls CompleteFlush. +/// The per-connection handler loop awaits ReadAsync / FlushAsync; +/// because RunContinuationsAsynchronously = true, those continuations are not run inline +/// on the worker thread — the handler resumes off the worker (on the thread pool), so the +/// two sides exchange recv data only through the SPSC RecvRing, Minima-style. +/// +[SkipLocalsInit] +public sealed unsafe class Connection : IValueTaskSource, IValueTaskSource, IDisposable +{ + public enum FlushResult { Complete, Incomplete, Close } + + // ---- recv window: valid bytes in [Head .. 
Tail) ---- + public int Head, Tail; + public readonly byte* ReceiveBuffer; + private readonly int _inSlabSize; + + // ---- send buffer ---- + public readonly FixedBufferWriter WriteBuffer; + + // ---- per-request parsed header (no allocations) ---- + public BinaryH1HeaderData BinaryH1HeaderData { get; set; } + public H1HeaderData H1HeaderData { get; set; } = null!; + + // ---- epoll wiring (set when the connection is bound to a live fd) ---- + public int Fd; + public int Ep; + + // ---- read IVTS (result = isClosed) ---- + private ManualResetValueTaskSourceCore _readSignal = new() { RunContinuationsAsynchronously = true }; + private int _armed; + private int _pending; + private int _closed; + + // ---- flush IVTS ---- + private ManualResetValueTaskSourceCore _flushSignal = new() { RunContinuationsAsynchronously = true }; + private int _flushArmed; + + /// SPSC ring carrying recv chunks from the worker to the handler. + public readonly SpscRing RecvRing = new(1024); + + /// Capacity of the handler-owned parse buffer (ReceiveBuffer). + public int InCapacity => _inSlabSize; + + public Connection(int maxConnections, int inSlabSize, int outSlabSize) + { + _inSlabSize = inSlabSize; + ReceiveBuffer = (byte*)NativeMemory.AlignedAlloc((nuint)inSlabSize, 64); + WriteBuffer = new FixedBufferWriter((byte*)NativeMemory.AlignedAlloc((nuint)outSlabSize, 64), outSlabSize); + } + + /// + /// Parse the next complete HTTP request from the recv window into + /// and advance past it. Returns false when no + /// more complete requests remain, compacting any trailing partial to the front + /// so the next recv appends after it. Call in a loop after . 
+ /// + public bool TryReadRequest() + { + int idx = 0; + ReadOnlySpan headerSpan = FindCrlfCrlf(ReceiveBuffer, Head, Tail, ref idx); + if (idx < 0) + { + Compact(); + return false; + } + BinaryH1HeaderData = ExtractBinaryH1HeaderData(headerSpan); + Head = idx + 4; // advance past CRLFCRLF + return true; + } + + public void Compact() + { + if (Head == 0) return; // nothing consumed — partial already at the front, keep it + if (Head < Tail) + { + int length = Tail - Head; + Buffer.MemoryCopy(ReceiveBuffer + Head, ReceiveBuffer, _inSlabSize, length); + Head = 0; + Tail = length; + } + else + { + Head = Tail = 0; // fully consumed + } + } + + /// Bind to a fresh fd taken from the pool and reset all per-connection state. + public void Reset(int fd, int ep) + { + Fd = fd; + Ep = ep; + Head = Tail = 0; + WriteBuffer.Reset(); + while (RecvRing.TryDequeue(out byte* leftover, out _)) RecvPool.Return(leftover); // drain stale chunks + Volatile.Write(ref _armed, 0); + Volatile.Write(ref _pending, 0); + Volatile.Write(ref _closed, 0); + Volatile.Write(ref _flushArmed, 0); + _readSignal.Reset(); + _flushSignal.Reset(); + } + + public void Clear() => H1HeaderData?.Clear(); + + public bool IsClosed => Volatile.Read(ref _closed) != 0; + + // ============================ READ ============================ + + public ValueTask ReadAsync() + { + if (Volatile.Read(ref _pending) == 1) + { + Volatile.Write(ref _pending, 0); + return new ValueTask(Volatile.Read(ref _closed) != 0); + } + if (Volatile.Read(ref _closed) != 0) + return new ValueTask(true); + + _readSignal.Reset(); + Volatile.Write(ref _armed, 1); + + // Lost-wakeup guard: data/close may have raced in just before we armed. 
+ if (Volatile.Read(ref _pending) == 1 || Volatile.Read(ref _closed) != 0) + { + Volatile.Write(ref _pending, 0); + Volatile.Write(ref _armed, 0); + return new ValueTask(Volatile.Read(ref _closed) != 0); + } + return new ValueTask(this, _readSignal.Version); + } + + /// Worker thread: new recv data (or a close) is ready — wake the handler's ReadAsync. + /// In this engine the worker does the recv (into pooled buffers enqueued on RecvRing); the handler only drains the ring. + public void SignalReadable() + { + if (Interlocked.Exchange(ref _armed, 0) == 1) + _readSignal.SetResult(Volatile.Read(ref _closed) != 0); + else + Volatile.Write(ref _pending, 1); + } + + // ============================ FLUSH ============================ + + public ValueTask FlushAsync() + { + FlushResult r = TryFlush(); + if (r == FlushResult.Complete) + return ValueTask.CompletedTask; + if (r == FlushResult.Close) + { + MarkClosed(); + return ValueTask.CompletedTask; // Serve observes IsClosed and exits + } + + // Partial / EAGAIN — wait for the worker to drain EPOLLOUT. + _flushSignal.Reset(); + Volatile.Write(ref _flushArmed, 1); + ArmEpollOut(); + return new ValueTask(this, _flushSignal.Version); + } + + /// Non-blocking send of everything staged in WriteBuffer. + public FlushResult TryFlush() + { + while (true) + { + long remaining = WriteBuffer.Tail - WriteBuffer.Head; + if (remaining == 0) { WriteBuffer.Reset(); return FlushResult.Complete; } + + byte* head = WriteBuffer.Ptr + WriteBuffer.Head; + long n = send(Fd, head, remaining, MSG_NOSIGNAL); + if (n > 0) + { + if (n == remaining) { WriteBuffer.Reset(); return FlushResult.Complete; } + WriteBuffer.Head += (int)n; + continue; + } + + int err = (n == 0) ? EAGAIN : Marshal.GetLastPInvokeError(); + if (err is EAGAIN or EWOULDBLOCK) return FlushResult.Incomplete; + return FlushResult.Close; + } + } + + /// Worker thread: EPOLLOUT fully drained — wake the handler's FlushAsync. 
+ public void CompleteFlush() + { + if (Interlocked.Exchange(ref _flushArmed, 0) == 1) + _flushSignal.SetResult(true); + } + + // ============================ CLOSE ============================ + + public void MarkClosed() + { + Volatile.Write(ref _closed, 1); + if (Interlocked.Exchange(ref _armed, 0) == 1) + _readSignal.SetResult(true); + else + Volatile.Write(ref _pending, 1); + + if (Interlocked.Exchange(ref _flushArmed, 0) == 1) + _flushSignal.SetResult(true); + } + + // ========================= epoll arming ========================= + + private void ArmEpollOut() + { + byte* ev = stackalloc byte[EvSize]; + WriteEpollEvent(ev, EPOLLOUT | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET, Fd); + epoll_ctl(Ep, EPOLL_CTL_MOD, Fd, (IntPtr)ev); + } + + public void ArmEpollIn() + { + byte* ev = stackalloc byte[EvSize]; + WriteEpollEvent(ev, EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET, Fd); + epoll_ctl(Ep, EPOLL_CTL_MOD, Fd, (IntPtr)ev); + } + + // ===================== IValueTaskSource (read) ===================== + bool IValueTaskSource.GetResult(short token) => _readSignal.GetResult(token); + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _readSignal.GetStatus(token); + void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + => _readSignal.OnCompleted(continuation, state, token, flags); + + // ======================= IValueTaskSource (flush) ======================= + void IValueTaskSource.GetResult(short token) => _flushSignal.GetResult(token); + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _flushSignal.GetStatus(token); + void IValueTaskSource.OnCompleted(Action continuation, object? 
state, short token, ValueTaskSourceOnCompletedFlags flags) + => _flushSignal.OnCompleted(continuation, state, token, flags); + + public void Dispose() + { + if (ReceiveBuffer != null) + NativeMemory.AlignedFree(ReceiveBuffer); + WriteBuffer.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/frameworks/shrike-minima/Engine/RecvPipe.cs b/frameworks/shrike-minima/Engine/RecvPipe.cs new file mode 100644 index 000000000..4c97fb5be --- /dev/null +++ b/frameworks/shrike-minima/Engine/RecvPipe.cs @@ -0,0 +1,62 @@ +namespace Shrike; + +/// +/// Per-connection single-producer / single-consumer ring of received chunks +/// (Minima-style recv handoff). The worker (producer) recv's into pooled buffers +/// and enqueues (ptr, len); the handler (consumer) dequeues them, copies the bytes +/// into its own parse buffer, and returns the buffer to the pool. Producer and +/// consumer touch disjoint ends (tail / head) with release/acquire ordering, so the +/// recv buffer is never shared between the two threads — no driver/handler race. +/// +public sealed unsafe class SpscRing +{ + private struct Slot { public byte* Ptr; public int Len; } + + private readonly Slot[] _slots; + private readonly int _mask; + private int _head; // consumer (handler) only writes this + private int _tail; // producer (worker) only writes this + + public SpscRing(int capacityPow2) + { + _slots = new Slot[capacityPow2]; + _mask = capacityPow2 - 1; + } + + /// Producer (worker thread). Returns false if the ring is full. + public bool TryEnqueue(byte* ptr, int len) + { + int tail = _tail; + if (tail - Volatile.Read(ref _head) >= _slots.Length) return false; + _slots[tail & _mask] = new Slot { Ptr = ptr, Len = len }; + Volatile.Write(ref _tail, tail + 1); // release: publish slot before tail + return true; + } + + /// Consumer (handler thread). Returns false if empty. 
+    public bool TryDequeue(out byte* ptr, out int len)
+    {
+        int head = _head;
+        if (head == Volatile.Read(ref _tail)) { ptr = null; len = 0; return false; }   // acquire
+        Slot s = _slots[head & _mask];
+        ptr = s.Ptr;
+        len = s.Len;
+        Volatile.Write(ref _head, head + 1);   // hand the slot back to the producer
+        return true;
+    }
+}
+
+/// <summary>
+/// Global pool of fixed-size native recv buffers. The worker takes one to recv into;
+/// the handler returns it after copying the bytes into its parse buffer. Thread-safe.
+/// </summary>
+internal static unsafe class RecvPool
+{
+    public const int BufSize = 16 * 1024;
+    private static readonly System.Collections.Concurrent.ConcurrentQueue<IntPtr> Free = new();
+
+    /// <summary>Returns a recycled buffer when one is queued, otherwise allocates a new one of <see cref="BufSize"/> bytes.</summary>
+    public static byte* Take()
+        => Free.TryDequeue(out IntPtr p) ? (byte*)p : (byte*)NativeMemory.Alloc(BufSize);
+
+    /// <summary>Queues <paramref name="buf"/> for reuse; buffers are never freed by this class.</summary>
+    public static void Return(byte* buf) => Free.Enqueue((IntPtr)buf);
+}
diff --git a/frameworks/shrike-minima/Engine/ShrikeEngine.Builder.cs b/frameworks/shrike-minima/Engine/ShrikeEngine.Builder.cs
new file mode 100644
index 000000000..b2d997ca1
--- /dev/null
+++ b/frameworks/shrike-minima/Engine/ShrikeEngine.Builder.cs
@@ -0,0 +1,174 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+public sealed partial class ShrikeEngine
+{
+    /// <summary>
+    /// Factory for configuring and constructing an <see cref="ShrikeEngine"/>.
+    /// Usage:
+    /// <code>
+    /// var engine = ShrikeEngine.CreateBuilder()
+    ///     .SetPort(8080)
+    ///     .SetBacklog(16384)
+    ///     .SetMaxNumberConnectionsPerWorker(512)
+    ///     .SetSlabSizes(16 * 1024, 16 * 1024)
+    ///     .SetMaxEventsPerWake(512)
+    ///     .SetMaxStackSizePerThread(1024 * 1024)
+    ///     .SetNWorkersSolver(() => Environment.ProcessorCount / 2)
+    ///     .InjectHandler(async conn => { /* read / write / flush loop */ })
+    ///     .Build();
+    /// </code>
+    /// </summary>
+    public static ShrikeBuilder CreateBuilder() => new ShrikeBuilder();
+
+    // ===== Engine-wide configuration (static for now; set via builder) =====
+    private static int _port = 8080;
+    private static int _backlog = 16384;
+    private static int _maxNumberConnectionsPerWorker = 512;
+    private static int _inSlabSize = 16 * 1024;
+    private static int _outSlabSize = 16 * 1024;
+    private static int _maxEventsPerWake = 512;
+    private static int _maxStackSizePerThread = 1024 * 1024;
+    private static int _nWorkers;
+
+    private static Func<int>? _calculateNumberWorkers;
+
+    // Default per-connection handler loop (overridden via builder). Demonstrates the
+    // Minima-style model: await ReadAsync, parse + write, await FlushAsync.
+    private static Func<Connection, Task> _sHandler = DefaultHandler;
+
+    /// <summary>
+    /// Fallback handler used until one is injected: answers every parsed request with a fixed
+    /// plain-text 200 response and flushes once per batch of requests.
+    /// </summary>
+    private static async Task DefaultHandler(Connection conn)
+    {
+        while (true)
+        {
+            if (await conn.ReadAsync()) return;   // wait for data; true => peer closed
+            bool wrote = false;
+            while (conn.TryReadRequest())
+            {
+                // Content-Length: 28 is the byte length of the body literal below.
+                conn.WriteBuffer.WriteUnmanaged("HTTP/1.1 200 OK\r\n"u8 +
+                                                "Server: S\r\n"u8 +
+                                                "Content-Type: text/plain\r\n"u8 +
+                                                "Content-Length: 28\r\n\r\n"u8 +
+                                                "Request handler was not set!"u8);
+                conn.Clear();
+                wrote = true;
+            }
+            if (wrote) await conn.FlushAsync();
+        }
+    }
+
+    private ShrikeEngine() { }
+
+    /// <summary>
+    /// Fluent builder for <see cref="ShrikeEngine"/>. The setters write the engine's static
+    /// configuration fields and Build() resolves the worker count; listen sockets are opened
+    /// later, one per <see cref="Worker"/>.</summary>
+    ///
+    public sealed class ShrikeBuilder
+    {
+        private readonly ShrikeEngine _engine;
+        public ShrikeBuilder() => _engine = new ShrikeEngine();
+
+        /// <summary>Set the TCP listening port (default: 8080).</summary>
+        /// <exception cref="ArgumentOutOfRangeException">The port is outside 0..65535.</exception>
+        public ShrikeBuilder SetPort(int port)
+        {
+            // The listener narrows the port to ushort, so an out-of-range value would silently
+            // bind a different port; reject it here instead.
+            if (port is < 0 or > 65535)
+                throw new ArgumentOutOfRangeException(nameof(port), port, "Port must be in the range 0..65535.");
+            _port = port;
+            return this;
+        }
+
+        /// <summary>
+        /// Set the listen backlog (default: 16384). This is a hint to the kernel for
+        /// the maximum pending connection queue length.
+        /// </summary>
+        public ShrikeBuilder SetBacklog(int backlog)
+        {
+            _backlog = backlog;
+            return this;
+        }
+
+        /// <summary>
+        /// Max concurrent connections each worker is allowed to track (default: 512).
+        /// Used for per-worker capacity planning and slab sizing.
+        /// </summary>
+        public ShrikeBuilder SetMaxNumberConnectionsPerWorker(int maxNumberConnectionsPerWorker)
+        {
+            _maxNumberConnectionsPerWorker = maxNumberConnectionsPerWorker;
+            return this;
+        }
+
+        /// <summary>
+        /// Configure input/output slab sizes used by connections (defaults: 16 KiB each).
+        /// </summary>
+        public ShrikeBuilder SetSlabSizes(int inSlabSize, int outSlabSize)
+        {
+            _inSlabSize = inSlabSize;
+            _outSlabSize = outSlabSize;
+            return this;
+        }
+
+        /// <summary>
+        /// Operational batch size for epoll wait loops—how many events to pull per wake (default: 512).
+        /// Increase if workers often see saturated event bursts.
+        /// </summary>
+        /// <exception cref="ArgumentOutOfRangeException">The value is not positive.</exception>
+        public ShrikeBuilder SetMaxEventsPerWake(int maxEventsPerWake)
+        {
+            // epoll_wait requires maxevents > 0, and the worker sizes its event buffer from this value.
+            if (maxEventsPerWake <= 0)
+                throw new ArgumentOutOfRangeException(nameof(maxEventsPerWake), maxEventsPerWake,
+                    "Max events per wake must be greater than zero.");
+            _maxEventsPerWake = maxEventsPerWake;
+            return this;
+        }
+
+        /// <summary>
+        /// Max stack size per worker thread (default: 1 MiB). Useful when using large stackallocs
+        /// in hot loops; keep conservative to avoid stack overflows under deep recursion or bursts.
+        /// </summary>
+        public ShrikeBuilder SetMaxStackSizePerThread(int maxStackSizePerThread)
+        {
+            _maxStackSizePerThread = maxStackSizePerThread;
+            return this;
+        }
+
+        /// <summary>
+        /// Provide a delegate that decides the worker count at build time.
+        /// If not provided, defaults to Environment.ProcessorCount / 2 (never fewer than one worker).
+        /// </summary>
+        public ShrikeBuilder SetNWorkersSolver(Func<int>? solver)
+        {
+            _calculateNumberWorkers = solver;
+            return this;
+        }
+
+        /// <summary>
+        /// Inject the per-connection handler loop. It owns the request lifecycle:
+        /// <c>await conn.ReadAsync()</c> → <c>while (conn.TryReadRequest()) { write }</c> → <c>await conn.FlushAsync()</c>.
+        /// </summary>
+        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
+        public ShrikeBuilder InjectHandler(Func<Connection, Task> handler)
+        {
+            // A null handler would only fail later, on the first accepted connection.
+            ArgumentNullException.ThrowIfNull(handler);
+            _sHandler = handler;
+            return this;
+        }
+
+        /// <summary>
+        /// Finalize configuration:
+        ///  - Logs arch and epoll_event packing
+        ///  - Resolves worker count via solver or default heuristic (clamped to at least 1)
+        /// No sockets are opened here.
+        /// </summary>
+        public ShrikeEngine Build()
+        {
+            Console.WriteLine(
+                $"Arch={RuntimeInformation.ProcessArchitecture}, Packed={(Packed ? 12 : 16)}-byte epoll_event");
+
+            // Decide worker count (solver wins if supplied).
+            int requested = _calculateNumberWorkers is null
+                ? Environment.ProcessorCount / 2
+                : _calculateNumberWorkers();
+
+            // ProcessorCount / 2 is 0 on a single-core host and a solver may return <= 0, but Run()
+            // always starts worker 0 on the calling thread, so at least one worker is required.
+            _nWorkers = Math.Max(1, requested);
+
+            return _engine;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Engine/ShrikeEngine.Runner.cs b/frameworks/shrike-minima/Engine/ShrikeEngine.Runner.cs
new file mode 100644
index 000000000..ce0c477e1
--- /dev/null
+++ b/frameworks/shrike-minima/Engine/ShrikeEngine.Runner.cs
@@ -0,0 +1,37 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+public sealed partial class ShrikeEngine
+{
+    /// <summary>
+    /// Boots the engine. Each worker owns its own epoll instance and its own
+    /// SO_REUSEPORT listen socket — the kernel load-balances accepts across them,
+    /// so there is no acceptor thread (this mirrors Minima's per-reactor model).
+    /// Worker 0 runs on the calling thread (blocks for the process lifetime); the
+    /// rest run on background threads.</summary>
+    ///
+    public void Run()
+    {
+        // Worker 0 always runs on the calling thread, so at least one worker must exist even
+        // when the configured count resolved to zero (e.g. ProcessorCount / 2 on a single core).
+        int workerCount = Math.Max(1, _nWorkers);
+
+        var workers = new Worker[workerCount];
+        for (int i = 0; i < workerCount; i++)
+            workers[i] = new Worker(i, _maxEventsPerWake, _port, _backlog);
+
+        Console.WriteLine($"Shrike listening on 0.0.0.0:{_port} with {workerCount} worker(s) (SO_REUSEPORT, no acceptor)");
+
+        for (int i = 1; i < workerCount; i++)
+        {
+            int iCap = i;   // copy: the lambda must not capture the loop variable
+            var t = new Thread(() => WorkerLoop(workers[iCap]), _maxStackSizePerThread)
+            {
+                IsBackground = true,
+                Name = $"worker-{iCap}"
+            };
+            t.Start();
+        }
+
+        WorkerLoop(workers[0]);   // run one worker on the calling thread; blocks
+    }
+}
diff --git a/frameworks/shrike-minima/Engine/ShrikeEngine.Worker.cs b/frameworks/shrike-minima/Engine/ShrikeEngine.Worker.cs
new file mode 100644
index 000000000..fb8719f93
--- /dev/null
+++ b/frameworks/shrike-minima/Engine/ShrikeEngine.Worker.cs
@@ -0,0 +1,176 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// ReSharper disable always StackAllocInsideLoop
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+public sealed partial class ShrikeEngine
+{
+    // Pooled Connection instances (reused across fds to avoid per-connection allocation).
+    private static readonly ObjectPool<Connection> ConnectionPool =
+        new DefaultObjectPool<Connection>(new ConnectionPoolPolicy(), 1024 * 32);
+
+    private sealed class ConnectionPoolPolicy : PooledObjectPolicy<Connection>
+    {
+        public override Connection Create() => new(_maxNumberConnectionsPerWorker, _inSlabSize, _outSlabSize);
+        public override bool Return(Connection connection) { connection.Clear(); return true; }
+    }
+
+    /// <summary>
+    /// Worker event loop. It is purely a DRIVER for each connection's IVTS — the
+    /// read/parse/write/flush loop lives in the injected handler (see <see cref="RunHandler"/>),
+    /// which awaits <see cref="Connection.ReadAsync"/>/<see cref="Connection.FlushAsync"/>:
+    ///   - EPOLLIN  → drain recv into pooled buffers → enqueue on the SPSC ring → SignalReadable
+    ///   - EPOLLOUT → continue the partial send → on drain CompleteFlush
+    ///   - error/hup/peer-close → MarkClosed → recycle
+    /// NOTE(review): the commit description says the signal sources run continuations
+    /// asynchronously (RCA=true), i.e. the handler resumes on the thread pool rather than inline
+    /// on this worker thread, so the earlier "inline, no cross-thread races" wording no longer
+    /// applies. Confirm against Connection.
+    /// </summary>
+    private static unsafe void WorkerLoop(Worker W)
+    {
+        var connections = new Dictionary<int, Connection>(capacity: _maxNumberConnectionsPerWorker);
+
+        // Scratch epoll_event for EPOLL_CTL_ADD, allocated ONCE. stackalloc memory is only released
+        // when the method returns and this loop never returns, so allocating it per accepted socket
+        // would grow the stack by EvSize bytes per connection until the thread overflows.
+        byte* ev = stackalloc byte[EvSize];
+
+        for (;;)
+        {
+            int n = epoll_wait(W.Ep, W.EventsBuf, W.MaxEvents, -1);
+            if (n < 0)
+            {
+                int waitErr = Marshal.GetLastPInvokeError();
+                if (waitErr == EINTR) continue;
+                throw new InvalidOperationException($"epoll_wait worker failed errno={waitErr}");
+            }
+
+            for (int i = 0; i < n; i++)
+            {
+                ReadEpollEvent((byte*)W.EventsBuf + i * EvSize, out uint evs, out int fd);
+
+                // 1) Our own SO_REUSEPORT listener is readable — drain accepts.
+                if (fd == W.ListenFd)
+                {
+                    for (;;)
+                    {
+                        int cfd = accept4(W.ListenFd, IntPtr.Zero, IntPtr.Zero, SOCK_NONBLOCK | SOCK_CLOEXEC);
+                        if (cfd >= 0)
+                        {
+                            int one = 1;
+                            setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, ref one, sizeof(int));
+
+                            WriteEpollEvent(ev, EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET, cfd);
+                            if (epoll_ctl(W.Ep, EPOLL_CTL_ADD, cfd, (IntPtr)ev) != 0)
+                            {
+                                // Without a registration the socket would never be serviced again;
+                                // drop it instead of leaking the descriptor.
+                                CloseQuiet(cfd);
+                                continue;
+                            }
+
+                            Connection c = ConnectionPool.Get();
+                            c.Reset(cfd, W.Ep);
+                            connections[cfd] = c;
+                            _ = RunHandler(c);   // suspends on its first ReadAsync
+                            continue;
+                        }
+                        int err = Marshal.GetLastPInvokeError();
+                        if (err == EINTR) continue;
+                        break;   // EAGAIN/EWOULDBLOCK (drained) or transient error
+                    }
+                    continue;
+                }
+
+                if (!connections.TryGetValue(fd, out var conn)) { CloseQuiet(fd); continue; }
+
+                // 2) Error / hangup / remote half-close.
+                // NOTE(review): CloseConn returns the Connection to the pool right after MarkClosed.
+                // If the handler really resumes on the thread pool it may still be touching the
+                // connection at that point - verify the recycle is safe.
+                if ((evs & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0)
+                {
+                    conn.MarkClosed();   // releases the handler so it can exit
+                    CloseConn(fd, connections);
+                    continue;
+                }
+
+                // 3) Read-ready. Minima-style: the worker recv's into POOLED buffers and
+                //    hands them to the handler via the per-connection SPSC ring. The worker
+                //    and handler never share a buffer, so there is no driver/handler race.
+                if ((evs & EPOLLIN) != 0)
+                {
+                    bool closed = false;
+                    while (true)
+                    {
+                        byte* buf = RecvPool.Take();
+                        long got = recv(fd, buf, (ulong)RecvPool.BufSize, 0);
+                        if (got > 0)
+                        {
+                            if (!conn.RecvRing.TryEnqueue(buf, (int)got))
+                            {
+                                // Ring full: these bytes have already left the socket and cannot be
+                                // handed over. Dropping them silently would corrupt the request
+                                // stream (and the edge-triggered fd would not report the remaining
+                                // data again), so fail the connection instead.
+                                RecvPool.Return(buf);
+                                closed = true;
+                                break;
+                            }
+                            continue;
+                        }
+                        RecvPool.Return(buf);
+                        if (got == 0) { closed = true; break; }   // peer closed
+                        int err = Marshal.GetLastPInvokeError();
+                        if (err is EAGAIN or EWOULDBLOCK) break;  // drained
+                        if (err == EINTR) continue;
+                        closed = true; break;                     // hard error
+                    }
+                    if (closed)
+                    {
+                        conn.MarkClosed();
+                        CloseConn(fd, connections);
+                        continue;
+                    }
+                    conn.SignalReadable();
+                    if (conn.IsClosed) CloseConn(fd, connections);
+                    continue;
+                }
+
+                // 4) Write-ready (a previous flush was partial).
+                if ((evs & EPOLLOUT) != 0)
+                {
+                    Connection.FlushResult r = conn.TryFlush();
+                    if (r == Connection.FlushResult.Complete)
+                    {
+                        conn.ArmEpollIn();
+                        conn.CompleteFlush();   // release the handler: it loops back to ReadAsync
+                        if (conn.IsClosed) CloseConn(fd, connections);
+                    }
+                    else if (r == Connection.FlushResult.Close)
+                    {
+                        conn.MarkClosed();
+                        CloseConn(fd, connections);
+                    }
+                    // Incomplete: stay armed for EPOLLOUT.
+                }
+            }
+        }
+    }
+
+    /// <summary>Runs the injected per-connection handler; guarantees the connection is closed when it ends.</summary>
+    private static async Task RunHandler(Connection conn)
+    {
+        try { await _sHandler(conn); }
+        catch { /* handler faulted */ }
+        finally { conn.MarkClosed(); }   // idempotent; the worker recycles once it observes IsClosed
+    }
+
+    /// <summary>Drain recv into the connection buffer (edge-triggered: read until EAGAIN). False = peer closed / hard error.</summary>
+    /// <remarks>
+    /// NOTE(review): no caller is visible in the ShrikeEngine parts of this patch section (the
+    /// worker loop hands data over through the SPSC ring instead) - looks like a leftover;
+    /// confirm before removing.
+    /// </remarks>
+    private static unsafe bool RecvDrain(Connection conn, int fd)
+    {
+        while (true)
+        {
+            int avail = _inSlabSize - conn.Tail;
+            if (avail == 0) return true;   // buffer full — handler must drain it (large-request limit, as in Unhinged)
+
+            long got = recv(fd, conn.ReceiveBuffer + conn.Tail, (ulong)avail, 0);
+            if (got > 0) { conn.Tail += (int)got; continue; }
+            if (got == 0) return false;    // peer closed
+
+            int err = Marshal.GetLastPInvokeError();
+            if (err is EAGAIN or EWOULDBLOCK) return true;   // drained
+            if (err == EINTR) continue;
+            return false;                  // ECONNRESET / EPIPE / unexpected
+        }
+    }
+
+    // ===== Close helpers =====
+
+    /// <summary>
+    /// Removes <paramref name="fd"/> from the worker's map; when it was tracked, returns its
+    /// Connection to the pool and closes the descriptor. Untracked fds are left untouched.
+    /// </summary>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    private static void CloseConn(int fd, Dictionary<int, Connection> map)
+    {
+        if (map.Remove(fd, out var c))
+        {
+            ConnectionPool.Return(c);
+            CloseQuiet(fd);
+        }
+    }
+
+    /// <summary>Closes <paramref name="fd"/>, ignoring any failure.</summary>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    private static void CloseQuiet(int fd) { try { close(fd); } catch { /* best effort */ } }
+}
diff --git a/frameworks/shrike-minima/Engine/ShrikeEngine.cs b/frameworks/shrike-minima/Engine/ShrikeEngine.cs
new file mode 100644
index 000000000..2588dae1e
--- /dev/null
+++ b/frameworks/shrike-minima/Engine/ShrikeEngine.cs
@@ -0,0 +1,127 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+#pragma warning disable CA2014
+
+
+namespace Shrike;
+
+/// <summary>
+/// <b>ShrikeEngine</b> — a minimal, high-performance HTTP engine core built around
+/// Linux <c>epoll</c>, written in C# with unsafe paths and pooling to minimize allocations
+/// and syscalls. This class owns process-wide configuration, worker creation, and the
+/// per-worker accept / recv / flush event loop.
+/// </summary>
+/// <remarks>
+/// <para><b>Design goals</b></para>
+/// <list type="bullet">
+/// <item>Throughput first: epoll-driven, non-blocking I/O; accepts are drained in a loop on every wake.</item>
+/// <item>Predictable latency: per-worker ownership of fds (each worker has its own epoll instance and listener).</item>
+/// <item>Low GC pressure: pooled native recv buffers; pooled <c>Connection</c> instances.</item>
+/// <item>Simple mental model: N identical workers, no acceptor thread, no fd handoff.</item>
+/// </list>
+///
+/// <para><b>High-level architecture</b></para>
+/// <list type="bullet">
+/// <item>
+/// <b>Workers</b> (<c>_nWorkers</c>; worker 0 runs on the thread that calls <c>Run()</c>, the rest on background threads):
+/// each owns an epoll instance, its own <c>SO_REUSEPORT</c> listening socket, and an fd→Connection map.
+/// </item>
+/// <item>Listener readable: drain <c>accept4()</c> in a loop, set <c>TCP_NODELAY</c>, register the socket for
+/// <c>EPOLLIN|EPOLLRDHUP|EPOLLERR|EPOLLHUP|EPOLLET</c>, and start the per-connection handler.</item>
+/// <item><c>EPOLLIN</c>: recv into pooled buffers, enqueue (ptr, len) on the connection's SPSC ring, then signal the handler.</item>
+/// <item><c>EPOLLOUT</c>: continue flushing a partial response; on drain, re-arm <c>EPOLLIN</c> and complete the flush wait.</item>
+/// <item>On error/hangup/peer close, the worker marks the connection closed, closes the fd and returns the <c>Connection</c> to the pool.</item>
+/// <item><b>Handler</b>: the injected loop (<c>_sHandler</c>) awaits <c>ReadAsync</c>, parses with <c>TryReadRequest</c>,
+/// writes into <c>WriteBuffer</c> and awaits <c>FlushAsync</c>.</item>
+/// </list>
+///
+/// <para><b>Core data structures</b></para>
+/// <list type="bullet">
+/// <item><c>Connection</c>: pooled object containing receive/write slabs (unsafe pointers), head/tail indices, and lightweight per-request state (e.g., hashed route).</item>
+/// <item><c>SpscRing</c> / <c>RecvPool</c>: per-connection ring of received chunks backed by a global pool of fixed-size native buffers.</item>
+/// <item><c>ObjectPool&lt;Connection&gt;</c>: reduces allocation churn under load; return path can reset buffers.</item>
+/// <item>Header parsing helpers: naive but vectorized <c>IndexOf</c>(CRLF/CRLFCRLF), and a route hash via <c>Fnv1a32</c>.</item>
+/// </list>
+///
+/// <para><b>I/O flow (hot path)</b></para>
+/// <list type="number">
+/// <item><c>recv</c> into a pooled buffer → enqueue on the ring → the handler copies the bytes into its own parse buffer.</item>
+/// <item>Find <c>\r\n\r\n</c>; on full header → the handler writes the response into <c>WriteBuffer</c>.</item>
+/// <item>Flush; if partial → wait for <c>EPOLLOUT</c>; if drained → arm <c>EPOLLIN</c>.</item>
+/// </list>
+///
+/// <para><b>Threading &amp; synchronization</b></para>
+/// <list type="bullet">
+/// <item>Workers are OS threads with configurable stack size (defaults 1 MiB); the kernel load-balances accepts across the per-worker listeners.</item>
+/// <item>The recv handoff between a worker and its handler goes through the single-producer / single-consumer ring (release/acquire on tail/head).</item>
+/// </list>
+///
+/// <para><b>Configuration knobs</b> (set via <c>ShrikeBuilder</c>)</para>
+/// <list type="bullet">
+/// <item><c>_port</c>, <c>_backlog</c>: listener endpoint and queue size.</item>
+/// <item><c>_maxNumberConnectionsPerWorker</c>: map capacity &amp; planning per worker.</item>
+/// <item><c>_inSlabSize</c>/<c>_outSlabSize</c>: receive/write slab sizes (defaults 16 KiB).</item>
+/// <item><c>_maxEventsPerWake</c>: epoll batch size per wait.</item>
+/// <item><c>_maxStackSizePerThread</c>: worker thread stack (useful for stackalloc heavy code paths).</item>
+/// <item><c>_calculateNumberWorkers</c>: custom worker count heuristic; defaults to <c>ProcessorCount / 2</c>.</item>
+/// <item><c>_sHandler</c>: application handler loop that writes to <c>WriteBuffer</c>.</item>
+/// </list>
+///
+/// <para><b>Socket &amp; epoll setup</b></para>
+/// <list type="bullet">
+/// <item>Listener (one per worker): <c>socket(AF_INET, SOCK_STREAM|CLOEXEC, IPPROTO_TCP)</c>, <c>SO_REUSEADDR</c>, <c>SO_REUSEPORT</c>, <c>O_NONBLOCK</c>, <c>bind(0.0.0.0:port)</c>, <c>listen(backlog)</c>.</item>
+/// <item>Listener registration: level-triggered <c>EPOLLIN|EPOLLERR|EPOLLHUP</c> on the worker's own epoll instance.</item>
+/// <item>Accepted sockets: <c>TCP_NODELAY</c>, non-blocking and close-on-exec (via <c>accept4</c> flags).</item>
+/// </list>
+///
+/// <para><b>Error handling</b></para>
+/// <list type="bullet">
+/// <item><c>EINTR</c> is retried; <c>EAGAIN</c>/<c>EWOULDBLOCK</c> end the current drain loop.</item>
+/// <item>A zero-length <c>recv</c> or any other recv error closes the connection.</item>
+/// </list>
+///
+/// <para><b>Performance notes</b></para>
+/// <list type="bullet">
+/// <item>Batching: accept loop drains in one epoll tick; send loop attempts to drain buffer before arming <c>EPOLLOUT</c>.</item>
+/// <item>Pooling: <c>Connection</c> reuse avoids per-request allocations; slabs are fixed-size for cache locality.</item>
+/// <item>Vectorized searches: <c>Span&lt;&gt;.IndexOf</c> on constants like CRLFCRLF leverages SIMD on modern runtimes.</item>
+/// </list>
+///
+/// <para><b>Safety &amp; invariants</b></para>
+/// <list type="bullet">
+/// <item>All fds registered in a worker's epoll must be serviced only by that worker.</item>
+/// <item>For <c>ReadOnlySpan</c> over <c>byte*</c>, callers guarantee the [Head..Tail) window is valid and pinned for the call duration.</item>
+/// <item><c>WriteBuffer</c> and <c>ReceiveBuffer</c> head/tail invariants are maintained by <c>Connection</c> methods and parsing routines.</item>
+/// </list>
+///
+/// <para><b>Extensibility</b></para>
+/// <list type="bullet">
+/// <item>Custom routing/dispatch: swap <c>_sHandler</c>; keep response writes contiguous for fewer syscalls.</item>
+/// <item>Back-pressure: introduce per-fd send windowing or global egress caps if needed.</item>
+/// <item>HTTP parsing: current header scan is naive; can be upgraded with SIMD or a finite-state parser.</item>
+/// </list>
+///
+/// <para><b>Limitations / TODO</b></para>
+/// <list type="bullet">
+/// <item>No dynamic slab growth yet; large headers/bodies can overflow the configured receive slab.</item>
+/// <item>Request pipelining supported at header-level; body handling and chunked decoding not implemented here.</item>
+/// <item>Minimal logging/telemetry; production builds should integrate structured, sampling-friendly logs.</item>
+/// </list>
+/// </remarks>
+///
+///
+public sealed partial class ShrikeEngine { }
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Engine/Worker.cs b/frameworks/shrike-minima/Engine/Worker.cs
new file mode 100644
index 000000000..24027e790
--- /dev/null
+++ b/frameworks/shrike-minima/Engine/Worker.cs
@@ -0,0 +1,84 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// ReSharper disable always StackAllocInsideLoop
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+/// <summary>
+/// One worker = one thread + one epoll instance + its OWN SO_REUSEPORT listen
+/// socket. Like Minima's reactor: the kernel load-balances accepts across the N
+/// per-worker listeners, so there's no acceptor thread, no fd handoff, and no
+/// eventfd. Each worker accepts and serves its own connections end to end.
+/// </summary>
+[SkipLocalsInit]
+internal sealed unsafe class Worker : IDisposable
+{
+    internal readonly int Index;
+
+    // This worker's own epoll instance (client fds + the listen socket).
+    internal readonly int Ep;
+
+    // This worker's own SO_REUSEPORT listening socket.
+    internal readonly int ListenFd;
+
+    // Unmanaged buffer for epoll_wait() results (avoids per-call allocation).
+    internal readonly IntPtr EventsBuf;
+
+    // Max events per epoll_wait() batch.
+    internal readonly int MaxEvents;
+
+    // Creates the epoll instance, opens and registers this worker's listener, and allocates the
+    // epoll_wait result buffer. Throws when an OS call fails.
+    // NOTE(review): resources acquired before a failing step are not released on that path.
+    internal Worker(int idx, int maxEvents, int port, int backlog)
+    {
+        Index = idx;
+        MaxEvents = maxEvents;
+
+        Ep = epoll_create1(EPOLL_CLOEXEC);
+        if (Ep < 0)
+            throw new Exception("epoll_create1 failed");
+
+        ListenFd = OpenReusePortListener(port, backlog);
+
+        // Register our own listen socket (level-triggered: drain accepts each wake).
+        byte* ev = stackalloc byte[EvSize];
+        WriteEpollEvent(ev, EPOLLIN | EPOLLERR | EPOLLHUP, ListenFd);
+        if (epoll_ctl(Ep, EPOLL_CTL_ADD, ListenFd, (IntPtr)ev) != 0)
+            throw new Exception("epoll_ctl ADD listen failed");
+
+        EventsBuf = Marshal.AllocHGlobal(EvSize * MaxEvents);
+    }
+
+    /// <summary>One non-blocking SO_REUSEPORT listener per worker; the kernel balances accepts across them.</summary>
+    /// <returns>The listening socket descriptor.</returns>
+    /// <exception cref="Exception">socket, bind or listen failed; the socket is closed before the exception propagates.</exception>
+    private static int OpenReusePortListener(int port, int backlog)
+    {
+        int fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
+        if (fd < 0) throw new Exception($"socket failed errno={Marshal.GetLastPInvokeError()}");
+
+        try
+        {
+            int one = 1;
+            setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, ref one, sizeof(int));
+            setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, ref one, sizeof(int));
+
+            int fl = fcntl(fd, F_GETFL, 0);
+            if (fl >= 0) fcntl(fd, F_SETFL, fl | O_NONBLOCK);
+
+            var addr = new sockaddr_in
+            {
+                sin_family = (ushort)AF_INET,
+                sin_port = Htons((ushort)port),
+                sin_addr = new in_addr { s_addr = 0 },   // 0.0.0.0
+                sin_zero = new byte[8]
+            };
+            if (bind(fd, ref addr, (uint)Marshal.SizeOf<sockaddr_in>()) != 0)
+                throw new Exception($"bind failed errno={Marshal.GetLastPInvokeError()}");
+            if (listen(fd, backlog) != 0)
+                throw new Exception($"listen failed errno={Marshal.GetLastPInvokeError()}");
+            return fd;
+        }
+        catch
+        {
+            // errno is already captured in the exception message; don't leak the descriptor.
+            close(fd);
+            throw;
+        }
+    }
+
+    public void Dispose()
+    {
+        try { if (Ep >= 0) close(Ep); } catch { /* ignore */ }
+        try { if (ListenFd >= 0) close(ListenFd); } catch { /* ignore */ }
+        if (EventsBuf != IntPtr.Zero) Marshal.FreeHGlobal(EventsBuf);
+    }
+}
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/BinaryH1HeaderData.cs b/frameworks/shrike-minima/HttpProtocol/H1/BinaryH1HeaderData.cs
new file mode 100644
index 000000000..4def92192
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/BinaryH1HeaderData.cs
@@ -0,0 +1,14 @@
+namespace Shrike;
+
+/// <summary>
+/// Undecoded view of a request head: each member is a byte sequence over the caller's buffer
+/// (see <c>HeaderParsing.ExtractBinaryH1HeaderData</c>).
+/// </summary>
+public struct BinaryH1HeaderData
+{
+    public PinnedByteSequence HttpMethod;
+
+    public PinnedByteSequence Route;
+
+    public PinnedByteSequence QueryParameters;
+
+    public bool HasQueryParameters => QueryParameters.Length > 0;
+
+    public PinnedByteSequence Headers;
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/CachedH1Data.cs b/frameworks/shrike-minima/HttpProtocol/H1/CachedH1Data.cs
new file mode 100644
index 000000000..16eeb8eb1
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/CachedH1Data.cs
@@ -0,0 +1,38 @@
+namespace Shrike;
+
+/// <summary>Process-wide string caches used while decoding request heads.</summary>
+internal static class CachedH1Data
+{
+    internal static readonly StringCache CachedRoutes
+        = new(null, 64);
+
+    internal static readonly StringCache CachedQueryKeys
+        = new(null, 64);
+
+    internal static readonly StringCache CachedHttpMethods
+        = new([
+            "GET",
+            "POST",
+            "PUT",
+            "DELETE",
+            "PATCH",
+            "HEAD",
+            "OPTIONS",
+            "TRACE"],
+            8);
+
+    internal static readonly StringCache CachedHeaderKeys
+        = new([
+            "Host",
+            "User-Agent",
+            "Cookie",
+            "Accept",
+            "Accept-Language",
+            "Connection"],
+            64);
+
+    internal static readonly StringCache CachedHeaderValues
+        = new([
+            "keep-alive",
+            "server"],
+            64);
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/H1HeaderData.cs b/frameworks/shrike-minima/HttpProtocol/H1/H1HeaderData.cs
new file mode 100644
index 000000000..5467cbcc0
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/H1HeaderData.cs
@@ -0,0 +1,26 @@
+namespace Shrike;
+
+/// <summary>Decoded request head: method, route, query parameters and headers as strings.</summary>
+public class H1HeaderData
+{
+    public string Route { get; internal set; } = null!;
+    public string HttpMethod { get; internal set; } = null!;
+    public PooledDictionary<string, string> QueryParameters { get; }
+    public PooledDictionary<string, string> Headers { get; }
+
+    public H1HeaderData()
+    {
+        QueryParameters = new PooledDictionary<string, string>(
+            capacity: 8,
+            comparer: StringComparer.OrdinalIgnoreCase);
+
+        Headers = new PooledDictionary<string, string>(
+            capacity: 8,
+            comparer: StringComparer.OrdinalIgnoreCase);
+    }
+
+    /// <summary>Empties both dictionaries; Route and HttpMethod are left as they are.</summary>
+    public void Clear()
+    {
+        Headers?.Clear();
+        QueryParameters?.Clear();
+    }
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/HeaderParsing.cs b/frameworks/shrike-minima/HttpProtocol/H1/HeaderParsing.cs
new file mode 100644
index 000000000..f0097b906
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/HeaderParsing.cs
@@ -0,0 +1,269 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+
+using System.Text;
+
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+// Very naive
+internal static unsafe class HeaderParsing
+{
+    /// <summary>
+    /// Finds the first occurrence of CRLFCRLF in a managed byte buffer slice [head..tail).
+    /// Returns the absolute index (relative to buf) of the '\r' in the sequence, or -1 if not found.
+    /// </summary>
+    /// <remarks>
+    /// Fast path: relies on <c>Span.IndexOf</c> which is vectorized on modern runtimes.
+    /// The caller must ensure 0 ≤ head ≤ tail ≤ buf.Length.
+    /// </remarks>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    internal static int FindCrlfCrlf(byte[] buf, int head, int tail)
+    {
+        int idx = buf.AsSpan(head, tail - head).IndexOf(CrlfCrlf);
+        return idx >= 0 ? head + idx : -1;
+    }
+
+    /// <summary>
+    /// Finds CRLFCRLF within an unmanaged region addressed by <paramref name="buf"/> in [head..tail).
+    /// Returns the absolute index (relative to the same coordinate system as head/tail) of the '\r' or -1.
+    /// </summary>
+    /// <remarks>
+    /// The caller must guarantee that the memory range [buf+head, buf+tail) is valid and readable.
+    /// </remarks>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    internal static int FindCrlfCrlf(byte* buf, int head, int tail)
+    {
+        // Construct a Span view over the raw memory.
+        // The caller must guarantee that (tail - head) bytes are valid and readable.
+        var span = new ReadOnlySpan<byte>(buf + head, tail - head);
+
+        int idx = span.IndexOf(CrlfCrlf);
+        return idx >= 0 ? head + idx : -1;
+    }
+
+    /// <summary>
+    /// Same as the unmanaged overload, but also returns a <see cref="ReadOnlySpan{T}"/> view over [head..tail).
+    /// Useful to avoid reconstructing the span twice (for parsing the request-line after the sentinel is found).
+    /// </summary>
+    /// <param name="buf">Base pointer to the buffer.</param>
+    /// <param name="head">Start offset (inclusive).</param>
+    /// <param name="tail">End offset (exclusive).</param>
+    /// <param name="idx">
+    /// Out: absolute index of the '\r' starting the CRLFCRLF sentinel, or -1 if not found.
+    /// </param>
+    /// <returns>A span over the provided range [head..tail).</returns>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    internal static ReadOnlySpan<byte> FindCrlfCrlf(byte* buf, int head, int tail, ref int idx)
+    {
+        // Construct a Span view over the raw memory.
+        // The caller must guarantee that (tail - head) bytes are valid and readable.
+        var span = new ReadOnlySpan<byte>(buf + head, tail - head);
+
+        idx = span.IndexOf(CrlfCrlf);
+        if (idx >= 0)
+            idx += head;   // translate from span-relative to buffer-relative
+        else
+            idx = -1;
+
+        return span;
+    }
+
+    /// <summary>
+    /// Extracts and hashes the HTTP request target from the request line (e.g., "GET /path?x=1 HTTP/1.1").
+    /// Expects <paramref name="headerSpan"/> to begin at the start of the request line and contain at least
+    /// the first CRLF. Throws <see cref="InvalidOperationException"/> if the request-line is malformed.
+    /// </summary>
+    /// <remarks>
+    /// Parsing steps:
+    ///  1) Find the first CRLF to isolate the request line.
+    ///  2) Find first and second spaces: METHOD SP REQUEST-TARGET SP HTTP-VERSION.
+    ///  3) Slice the REQUEST-TARGET and hash it with FNV-1a 32-bit.
+    ///
+    /// Notes:
+    ///  - The hash is over the raw byte sequence of the target (no decoding / normalization).
+    ///  - Query string is preserved in the slice; callers can change hashing if they want to ignore it.</remarks>
+    ///
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    internal static uint ExtractRoute(ReadOnlySpan<byte> headerSpan)
+    {
+        var lineEnd = headerSpan.IndexOf(Crlf);
+        if (lineEnd == -1)   // no CRLF: slicing with -1 would throw ArgumentOutOfRangeException instead
+            throw new InvalidOperationException("Invalid request line");
+        var firstHeader = headerSpan[..lineEnd];
+
+        var firstSpace = firstHeader.IndexOf(Space);
+        if (firstSpace == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        var secondSpaceRelative = firstHeader[(firstSpace + 1)..].IndexOf(Space);
+        if (secondSpaceRelative == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        var secondSpace = firstSpace + secondSpaceRelative + 1;
+
+        // REQUEST-TARGET slice: may include path + query (e.g., "/foo?bar=baz")
+        var url = firstHeader[(firstSpace + 1)..secondSpace];
+
+        return Fnv1a32(url);
+    }
+
+    /// <summary>
+    /// Splits a request head into undecoded byte sequences: method, route, query string (without
+    /// the '?') and the header block that follows the request line.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">The request line is malformed.</exception>
+    internal static BinaryH1HeaderData ExtractBinaryH1HeaderData(ReadOnlySpan<byte> headerSpan)
+    {
+        var headerData = new BinaryH1HeaderData();
+
+        var lineEnd = headerSpan.IndexOf(Crlf);
+        if (lineEnd == -1)
+            throw new InvalidOperationException("Invalid request line");
+        var firstHeader = headerSpan[..lineEnd];
+
+        var firstSpace = firstHeader.IndexOf(Space);
+        if (firstSpace == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        headerData.HttpMethod = new PinnedByteSequence(firstHeader[..firstSpace]);
+
+        var secondSpaceRelative = firstHeader[(firstSpace + 1)..].IndexOf(Space);
+        if (secondSpaceRelative == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        var secondSpace = firstSpace + secondSpaceRelative + 1;
+
+        // REQUEST-TARGET slice: may include path + query (e.g., "/foo?bar=baz")
+        var url = firstHeader[(firstSpace + 1)..secondSpace];
+
+        var queryParamSeparator = url.IndexOf(Question);
+
+        if (queryParamSeparator == -1)
+        {
+            headerData.Route = new PinnedByteSequence(url);
+        }
+        else
+        {
+            headerData.Route = new PinnedByteSequence(url[..queryParamSeparator]);
+            headerData.QueryParameters = new PinnedByteSequence(url[(queryParamSeparator + 1)..]);
+        }
+
+        // Get the rest of the headers
+
+        headerData.Headers = new PinnedByteSequence(headerSpan[(lineEnd + 2)..]);
+
+        return headerData;
+    }
+
+    /// <summary>
+    /// Decodes a request head into strings: method, route, query parameters and header fields.
+    /// </summary>
+    /// <remarks>
+    /// Header fields are parsed for every request (not only when a query string is present), routes
+    /// are interned in the route cache, and a missing CRLF or a value without a space after the
+    /// colon no longer produces an out-of-range slice.
+    /// </remarks>
+    /// <exception cref="InvalidOperationException">The request line is malformed.</exception>
+    internal static H1HeaderData ExtractH1HeaderData(ReadOnlySpan<byte> headerSpan)
+    {
+        var headerData = new H1HeaderData();
+
+        int lineEnd = headerSpan.IndexOf(Crlf);
+        if (lineEnd == -1)
+            throw new InvalidOperationException("Invalid request line");
+        ReadOnlySpan<byte> requestLine = headerSpan[..lineEnd];
+
+        int firstSpace = requestLine.IndexOf(Space);
+        if (firstSpace == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        // TryGetOrAdd always produces a value (cached or freshly decoded); the returned bool only
+        // says whether it ended up in the cache, so the string is used either way.
+        CachedH1Data.CachedHttpMethods.TryGetOrAdd(requestLine[..firstSpace], out string httpMethod);
+        headerData.HttpMethod = httpMethod;
+
+        int secondSpaceRelative = requestLine[(firstSpace + 1)..].IndexOf(Space);
+        if (secondSpaceRelative == -1)
+            throw new InvalidOperationException("Invalid request line");
+
+        int secondSpace = firstSpace + secondSpaceRelative + 1;
+
+        // REQUEST-TARGET slice: may include path + query (e.g., "/foo?bar=baz")
+        ReadOnlySpan<byte> url = requestLine[(firstSpace + 1)..secondSpace];
+
+        int queryParamSeparator = url.IndexOf(Question);
+        ReadOnlySpan<byte> routeBytes = queryParamSeparator == -1 ? url : url[..queryParamSeparator];
+
+        // Routes belong in the route cache, not in the (8-entry) HTTP-method cache.
+        CachedH1Data.CachedRoutes.TryGetOrAdd(routeBytes, out string route);
+        headerData.Route = route;
+
+        if (queryParamSeparator != -1)
+            ParseQueryParameters(url[(queryParamSeparator + 1)..], headerData);
+
+        ParseHeaderLines(headerSpan[(lineEnd + 2)..], headerData);
+
+        return headerData;
+    }
+
+    /// <summary>Adds every <c>key=value</c> pair of an '&amp;'-separated query string to <paramref name="headerData"/>.</summary>
+    private static void ParseQueryParameters(ReadOnlySpan<byte> query, H1HeaderData headerData)
+    {
+        int current = 0;
+        while (current < query.Length)
+        {
+            int separator = query[current..].IndexOf(QuerySeparator);   // (byte)'&'
+            ReadOnlySpan<byte> pair;
+
+            if (separator == -1)
+            {
+                pair = query[current..];
+                current = query.Length;
+            }
+            else
+            {
+                pair = query.Slice(current, separator);
+                current += separator + 1;
+            }
+
+            int equalsIndex = pair.IndexOf(Equal);
+            if (equalsIndex == -1)
+                continue;   // key-only parameter: skip it but keep parsing the remaining pairs
+
+            string? key = CachedH1Data.CachedQueryKeys.GetOrAdd(pair[..equalsIndex]);
+            if (key is null)
+                continue;
+
+            headerData.QueryParameters.TryAdd(key, Encoding.UTF8.GetString(pair[(equalsIndex + 1)..]));
+        }
+    }
+
+    /// <summary>
+    /// Parses <c>Name: value</c> lines up to the blank line that ends the header block (or the end
+    /// of <paramref name="lines"/>). Lines without a name or a colon are skipped.
+    /// </summary>
+    private static void ParseHeaderLines(ReadOnlySpan<byte> lines, H1HeaderData headerData)
+    {
+        while (!lines.IsEmpty)
+        {
+            int lineLength = lines.IndexOf(Crlf);
+            if (lineLength == 0)
+                break;   // blank line: all headers read
+
+            ReadOnlySpan<byte> header;
+            if (lineLength < 0)
+            {
+                // No further CRLF: treat the remainder as the last header line.
+                header = lines;
+                lines = default;
+            }
+            else
+            {
+                header = lines[..lineLength];
+                lines = lines[(lineLength + 2)..];
+            }
+
+            int colonIndex = header.IndexOf(Colon);
+            if (colonIndex <= 0)
+                continue;   // malformed header
+
+            // Optional whitespace around the value is trimmed rather than assuming exactly ": ".
+            ReadOnlySpan<byte> headerKey = header[..colonIndex];
+            ReadOnlySpan<byte> headerValue = header[(colonIndex + 1)..].Trim(" \t"u8);
+
+            string? key = CachedH1Data.CachedHeaderKeys.GetOrAdd(headerKey);
+            string? value = CachedH1Data.CachedHeaderValues.GetOrAdd(headerValue);
+            if (key is null || value is null)
+                continue;
+
+            headerData.Headers.TryAdd(key, value);
+        }
+    }
+
+    // ===== Common tokens (kept as ReadOnlySpan<byte> for zero-allocation literals) =====
+
+    private static ReadOnlySpan<byte> Crlf => "\r\n"u8;
+    private static ReadOnlySpan<byte> CrlfCrlf => "\r\n\r\n"u8;
+
+    // ASCII byte codes (documented for clarity)
+    private const byte Space = 0x20;          // ' '
+    private const byte Question = 0x3F;       // '?'
+    private const byte QuerySeparator = 0x26; // '&'
+    private const byte Equal = 0x3D;          // '='
+    private const byte Colon = 0x3A;          // ':'
+    private const byte SemiColon = 0x3B;      // ';'
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/StringCache.cs b/frameworks/shrike-minima/HttpProtocol/H1/StringCache.cs
new file mode 100644
index 000000000..e7e7c1a17
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/StringCache.cs
@@ -0,0 +1,113 @@
+using System.Text;
+
+namespace Shrike;
+
+/// <summary>
+/// Maps UTF-8 byte sequences to interned strings. Keys are private native copies of the bytes.
+/// Every dictionary access happens under <c>_gate</c>, so the cache can be shared across threads.
+/// </summary>
+/// <remarks>
+/// NOTE(review): entries are never evicted, so caching attacker-controlled input (routes, header
+/// values) grows native and managed memory without bound - consider a size cap.
+/// </remarks>
+internal class StringCache
+{
+    private readonly Dictionary<PinnedByteSequence, string> _map;
+
+    private readonly Lock _gate = new();
+
+    public StringCache(List<string>? preCacheableStrings, int capacity = 256)
+    {
+        _map = new Dictionary<PinnedByteSequence, string>(capacity, PinnedByteSequenceComparer.Instance);
+
+        if (preCacheableStrings is null)
+        {
+            return;
+        }
+
+        foreach (var preCacheableString in preCacheableStrings)
+        {
+            Add(preCacheableString);
+        }
+    }
+
+    /// <summary>Returns the cached string for <paramref name="bytes"/>, decoding and caching it on first use.</summary>
+    public string? GetOrAdd(ReadOnlySpan<byte> bytes)
+    {
+        TryGetOrAdd(bytes, out string value);
+        return value;
+    }
+
+    /// <summary>
+    /// Looks up <paramref name="bytes"/>; on a miss the bytes are decoded as UTF-8 and cached.
+    /// </summary>
+    /// <returns>Always true: <paramref name="value"/> is either the cached or the newly cached string.</returns>
+    public bool TryGetOrAdd(ReadOnlySpan<byte> bytes, out string value)
+    {
+        var seq = new PinnedByteSequence(bytes);
+
+        // Dictionary is not safe for reads that race with writes, so the lookup takes the same
+        // lock as the insert. Doing both under one lock also means the native key copy is only
+        // allocated when it is actually stored (no leak on a lost race).
+        lock (_gate)
+        {
+            if (_map.TryGetValue(seq, out string? cached))
+            {
+                value = cached;
+                return true;
+            }
+
+            value = Encoding.UTF8.GetString(bytes);
+            _map.Add(AllocateSequence(seq), value);
+            return true;
+        }
+    }
+
+    /// <summary>Copies <paramref name="sequence"/> into a native allocation owned by the cache.</summary>
+    private unsafe PinnedByteSequence AllocateSequence(PinnedByteSequence sequence)
+    {
+        // Allocate pinned unmanaged slab
+        var ptr = (byte*)NativeMemory.AlignedAlloc((nuint)sequence.Length, 64);
+
+        Buffer.MemoryCopy(
+            sequence.Ptr,
+            ptr,
+            sequence.Length,
+            sequence.Length);
+
+        return new PinnedByteSequence(ptr, sequence.Length);
+    }
+
+    /// <summary>Pre-caches <paramref name="item"/> under its UTF-8 encoding.</summary>
+    private unsafe void Add(string item)
+    {
+        byte[] bytes = Encoding.UTF8.GetBytes(item);
+
+        // The key must not point into this temporary managed array (it can move or be collected
+        // once we return), so pin it only long enough to take a native copy.
+        // NOTE(review): assumes PinnedByteSequence stores a raw pointer and does not pin - confirm.
+        PinnedByteSequence key;
+        fixed (byte* p = bytes)
+        {
+            key = AllocateSequence(new PinnedByteSequence(p, bytes.Length));
+        }
+
+        lock (_gate)
+        {
+            if (!_map.TryAdd(key, item))
+                NativeMemory.AlignedFree(key.Ptr);   // duplicate entry: release the unused copy
+        }
+    }
+
+    private sealed class PinnedByteSequenceComparer : IEqualityComparer<PinnedByteSequence>
+    {
+        public static readonly PinnedByteSequenceComparer Instance = new();
+
+        public bool Equals(PinnedByteSequence x, PinnedByteSequence y)
+        {
+            return x.AsSpan().SequenceEqual(y.AsSpan());
+        }
+
+        public int GetHashCode(PinnedByteSequence mem)
+        {
+            var span = mem.AsSpan();
+            var h = new HashCode();
+            h.AddBytes(span);
+            return h.ToHashCode();
+        }
+    }
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/HttpProtocol/H1/StringCacheMemoryVariant.cs
b/frameworks/shrike-minima/HttpProtocol/H1/StringCacheMemoryVariant.cs
new file mode 100644
index 000000000..87a99efa4
--- /dev/null
+++ b/frameworks/shrike-minima/HttpProtocol/H1/StringCacheMemoryVariant.cs
@@ -0,0 +1,72 @@
+using System.Text;
+
+namespace Shrike;
+
+/// <summary>
+/// Interns UTF-8 byte sequences as <see cref="string"/> instances, keyed by
+/// <see cref="ReadOnlyMemory{T}"/> content.
+/// </summary>
+/// <remarks>
+/// Keys are compared by content. The cache stores its own copy of each key, so the
+/// caller may reuse or overwrite the buffer it passed in. Inserts are serialised by a lock.
+/// NOTE(review): lookups read the dictionary without taking that lock while another
+/// thread may be inserting — confirm this is acceptable or move the read under the lock.
+/// </remarks>
+internal class StringCacheMemoryVariant
+{
+    private readonly Dictionary<ReadOnlyMemory<byte>, string> _map;
+
+    private readonly Lock _gate = new();
+
+    /// <summary>Creates the cache and optionally seeds it.</summary>
+    /// <param name="preCacheableStrings">Strings to intern up front; may be <c>null</c>.</param>
+    /// <param name="capacity">Initial dictionary capacity.</param>
+    public StringCacheMemoryVariant(List<string>? preCacheableStrings, int capacity = 256)
+    {
+        _map = new Dictionary<ReadOnlyMemory<byte>, string>(capacity, ReadOnlyMemoryComparer.Instance);
+
+        if (preCacheableStrings is null)
+        {
+            return;
+        }
+
+        foreach (var preCacheableString in preCacheableStrings)
+        {
+            Add(preCacheableString);
+        }
+    }
+
+    /// <summary>
+    /// Looks up the interned string for <paramref name="bytes"/>, adding it on first sight.
+    /// </summary>
+    /// <returns>
+    /// Always <c>true</c>; <paramref name="value"/> is the cached instance, including
+    /// when another thread inserted the same key first.
+    /// </returns>
+    public bool TryGetOrAdd(ReadOnlyMemory<byte> bytes, out string value)
+    {
+        ref var item = ref CollectionsMarshal.GetValueRefOrNullRef(_map, bytes);
+        if (!Unsafe.IsNullRef(ref item))
+        {
+            value = item;
+            return true;
+        }
+
+        // Did not find a value, add it. The key is copied first: storing the caller's
+        // memory would let a later write to that buffer change the key in place while
+        // it still sits in the bucket chosen by its old hash.
+        var candidate = Encoding.UTF8.GetString(bytes.Span);
+        ReadOnlyMemory<byte> ownedKey = bytes.ToArray();
+
+        lock (_gate)
+        {
+            if (!_map.TryAdd(ownedKey, candidate))
+            {
+                // Lost the race: hand back the instance that is actually cached.
+                candidate = _map[ownedKey];
+            }
+        }
+
+        value = candidate;
+        return true;
+    }
+
+    /// <summary>Seeds the cache with <paramref name="item"/>.</summary>
+    private void Add(string item)
+    {
+        lock (_gate)
+        {
+            var bytes = Encoding.UTF8.GetBytes(item);
+            _map.TryAdd(bytes, item);
+        }
+    }
+
+    /// <summary>Compares and hashes keys by content rather than by reference.</summary>
+    private sealed class ReadOnlyMemoryComparer : IEqualityComparer<ReadOnlyMemory<byte>>
+    {
+        public static readonly ReadOnlyMemoryComparer Instance = new();
+
+        public bool Equals(ReadOnlyMemory<byte> x, ReadOnlyMemory<byte> y)
+            => x.Span.SequenceEqual(y.Span);
+
+        public int GetHashCode(ReadOnlyMemory<byte> mem)
+        {
+            var span = mem.Span;
+            var h = new HashCode();
+            h.AddBytes(span);
+            return h.ToHashCode();
+        }
+    }
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Program.cs b/frameworks/shrike-minima/Program.cs
new file mode 100644
index 000000000..8e55291a8
--- /dev/null
+++ b/frameworks/shrike-minima/Program.cs
@@ -0,0 +1,312 @@
+using System.Buffers.Text;
+using Shrike;
+
+// ReSharper
disable SuggestVarOrType_BuiltInTypes
+
+/// <summary>
+/// shrike-minima — an epoll engine with an IVTS-backed, RCA=true async handler loop,
+/// using Minima's SPSC recv handoff: the worker thread recv's into POOLED buffers and
+/// enqueues them on a per-connection SPSC ring; the per-connection handler resumes on
+/// the THREAD POOL, dequeues the chunks, copies them into its own parse buffer, and
+/// returns the buffers to the pool. The worker and handler never share a buffer, so
+/// there is no driver/handler race — and recv can pipeline with parse (the worker keeps
+/// pumping while the handler works). FlushAsync sends directly, so the off-worker
+/// handler needs no handoff. A fully async-work-ready architecture.
+///
+/// Serves the H1-isolated profiles: baseline, pipelined, limited-conn.
+///   GET/POST /baseline11?a=N&amp;b=N  -> text/plain  a + b (+ body)
+///   GET      /pipeline              -> text/plain  ok
+/// </summary>
+internal static class Program
+{
+    [System.Runtime.InteropServices.DllImport("libc", SetLastError = true)]
+    private static extern int shutdown(int fd, int how);
+    private const int SHUT_WR = 1;
+
+    /// <summary>
+    /// Reads the optional <c>SHRIKE_PORT</c> / <c>SHRIKE_WORKERS</c> overrides, then
+    /// builds and runs the engine with <see cref="HandleAsync"/> as the handler.
+    /// </summary>
+    private static void Main()
+    {
+        int port = 8080;
+        if (int.TryParse(Environment.GetEnvironmentVariable("SHRIKE_PORT"), out int p) && p > 0)
+            port = p;
+
+        int workers = Math.Max(1, Environment.ProcessorCount / 2);
+        if (int.TryParse(Environment.GetEnvironmentVariable("SHRIKE_WORKERS"), out int w) && w > 0)
+            workers = w;
+
+        ShrikeEngine.CreateBuilder()
+            .SetPort(port)
+            .SetBacklog(16384)
+            .SetMaxEventsPerWake(512)
+            .SetMaxNumberConnectionsPerWorker(8192)
+            .SetSlabSizes(64 * 1024, 32 * 1024)   // parse buffer holds a 16K recv chunk + partial
+            .SetNWorkersSolver(() => workers)
+            .InjectHandler(HandleAsync)
+            .Build()
+            .Run();
+    }
+
+    /// <summary>
+    /// Per-connection handler. RCA=true → each await resumes on the thread pool.
+    /// ReadAsync waits for a recv signal; the handler then drains the SPSC ring (copying
+    /// chunks into its own parse buffer) and parses. Only this thread touches the buffer.
+    /// </summary>
+    private static async Task HandleAsync(Connection conn)
+    {
+        while (true)
+        {
+            if (await conn.ReadAsync())              // wait for a recv signal (true => peer closed)
+                return;
+
+            bool wrote = DrainRing(conn, out bool close);
+
+            if (wrote)
+                await conn.FlushAsync();
+
+            if (close)
+            {
+                // Connection: close — half-close to send a FIN now (the worker
+                // recycles the fd on the peer's EPOLLRDHUP). The handler thread
+                // can issue this directly (epoll, thread-safe), like the send().
+                shutdown(conn.Fd, SHUT_WR);
+                return;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Dequeue every recv chunk the worker handed over, copy it into the connection's
+    /// own parse buffer (returning the pooled buffer), and parse complete requests after
+    /// each chunk. Only the handler thread touches the parse buffer.
+    /// </summary>
+    /// <param name="conn">Connection whose ring and parse buffer are drained.</param>
+    /// <param name="close">Set when the connection must be closed after flushing.</param>
+    /// <returns><c>true</c> if at least one response was written.</returns>
+    private static unsafe bool DrainRing(Connection conn, out bool close)
+    {
+        close = false;
+        bool wrote = false;
+        int cap = conn.InCapacity;
+        while (conn.RecvRing.TryDequeue(out byte* ptr, out int len))
+        {
+            if (conn.Tail + len > cap) { RecvPool.Return(ptr); close = true; break; }   // oversized request
+            Buffer.MemoryCopy(ptr, conn.ReceiveBuffer + conn.Tail, cap - conn.Tail, len);
+            conn.Tail += len;
+            RecvPool.Return(ptr);
+            wrote |= DrainRequests(conn, out close);   // parse + compact after each chunk
+            if (close) break;
+        }
+        while (conn.RecvRing.TryDequeue(out byte* p, out _)) RecvPool.Return(p);   // return any leftovers
+        return wrote;
+    }
+
+    /// <summary>Parse every complete request in the recv window, write each response into the write buffer.</summary>
+    /// <param name="conn">Connection whose parse window [Head, Tail) is consumed.</param>
+    /// <param name="close">Set on a parse error, a full write buffer or <c>Connection: close</c>.</param>
+    /// <returns><c>true</c> if at least one response was written.</returns>
+    private static unsafe bool DrainRequests(Connection conn, out bool close)
+    {
+        close = false;
+        bool wrote = false;
+        int pos = conn.Head;
+        while (pos < conn.Tail)
+        {
+            var buf = new ReadOnlySpan<byte>(conn.ReceiveBuffer + pos, conn.Tail - pos);
+            int consumed = ParseOne(buf, conn.WriteBuffer, out bool reqClose);
+            if (consumed == 0) break;                 // incomplete request — wait for more
+            if (consumed < 0) { close = true; break; }
+            pos += consumed;
+            wrote = true;
+            if (reqClose) { close = true; break; }
+        }
+        conn.Head = pos;
+        conn.Compact();                               // reclaim consumed bytes, slide partial to front
+        return wrote;
+    }
+
+    /// <summary>
+    /// Parse one request from <paramref name="buf"/>, write its response into <paramref name="wb"/>.
+    /// </summary>
+    /// <param name="buf">Unparsed bytes, starting at a request line.</param>
+    /// <param name="wb">Write buffer the response is appended to.</param>
+    /// <param name="close">Set when the request carried <c>Connection: close</c>.</param>
+    /// <returns>Bytes consumed, 0 if incomplete, -1 on error/no room.</returns>
+    private static int ParseOne(ReadOnlySpan<byte> buf, FixedBufferWriter wb, out bool close)
+    {
+        close = false;
+        int he = buf.IndexOf("\r\n\r\n"u8);
+        if (he < 0) return 0;
+        ReadOnlySpan<byte> head = buf[..he];
+
+        int rlEnd = head.IndexOf("\r\n"u8);
+        if (rlEnd < 0) rlEnd = head.Length;
+        ReadOnlySpan<byte> reqLine = head[..rlEnd];
+
+        ReadOnlySpan<byte> target = default;
+        int sp1 = reqLine.IndexOf((byte)' ');
+        if (sp1 >= 0)
+        {
+            ReadOnlySpan<byte> rest = reqLine[(sp1 + 1)..];
+            int sp2 = rest.IndexOf((byte)' ');
+            target = sp2 >= 0 ? rest[..sp2] : rest;
+        }
+
+        int contentLength = -1;
+        bool chunked = false;
+        bool reqClose = false;
+        ReadOnlySpan<byte> hdrs = head[Math.Min(rlEnd + 2, head.Length)..];
+        while (hdrs.Length > 0)
+        {
+            int nl = hdrs.IndexOf("\r\n"u8);
+            ReadOnlySpan<byte> line = nl >= 0 ? hdrs[..nl] : hdrs;
+            int colon = line.IndexOf((byte)':');
+            if (colon >= 0)
+            {
+                ReadOnlySpan<byte> name = line[..colon];
+                ReadOnlySpan<byte> val = Trim(line[(colon + 1)..]);
+                if (CiEq(name, "content-length"u8)) { if (Utf8Parser.TryParse(val, out int cl, out _)) contentLength = cl; }
+                else if (CiEq(name, "transfer-encoding"u8) && CiContains(val, "chunked"u8)) chunked = true;
+                else if (CiEq(name, "connection"u8) && CiEq(val, "close"u8)) reqClose = true;
+            }
+            if (nl < 0) break;
+            hdrs = hdrs[(nl + 2)..];
+        }
+
+        int bodyStart = he + 4;
+        long bodyInt;
+        int total;
+        if (chunked)
+        {
+            if (!DecodeChunked(buf[bodyStart..], out bodyInt, out int used)) return 0;
+            total = bodyStart + used;
+        }
+        else if (contentLength > 0)
+        {
+            // Compare by subtraction: bodyStart + contentLength wraps negative for a
+            // Content-Length near int.MaxValue, which would let the check pass and
+            // make the Slice below throw.
+            if (buf.Length - bodyStart < contentLength) return 0;
+            bodyInt = ParseLoose(buf.Slice(bodyStart, contentLength));
+            total = bodyStart + contentLength;
+        }
+        else { bodyInt = 0; total = bodyStart; }
+
+        // GetSpan throws when fewer than 256 bytes are free; report that as "no room"
+        // so the caller closes the connection instead of faulting the handler.
+        Span<byte> w;
+        try { w = wb.GetSpan(256); }
+        catch (InvalidOperationException) { return -1; }
+
+        int pos = 0;
+        if (!Respond(w, ref pos, target, bodyInt, reqClose)) return -1;
+        wb.Advance(pos);
+        close = reqClose;
+        return total;
+    }
+
+    /// <summary>
+    /// Writes the response for <paramref name="target"/>: <c>ok</c> for <c>/pipeline</c>,
+    /// otherwise the sum of query parameters <c>a</c> and <c>b</c> plus <paramref name="bodyInt"/>.
+    /// </summary>
+    /// <returns><c>false</c> if <paramref name="w"/> is too small for the response.</returns>
+    private static bool Respond(Span<byte> w, ref int pos, ReadOnlySpan<byte> target, long bodyInt, bool close)
+    {
+        int q = target.IndexOf((byte)'?');
+        ReadOnlySpan<byte> path = q >= 0 ? target[..q] : target;
+        ReadOnlySpan<byte> query = q >= 0 ? target[(q + 1)..] : default;
+
+        if (path.SequenceEqual("/pipeline"u8))
+            return WriteText(w, ref pos, "ok"u8, close);
+
+        long sum = SumAB(query) + bodyInt;
+        Span<byte> num = stackalloc byte[24];
+        Utf8Formatter.TryFormat(sum, num, out int n);
+        return WriteText(w, ref pos, num[..n], close);
+    }
+
+    /// <summary>Writes a 200 text/plain response with <paramref name="body"/> at <paramref name="pos"/>.</summary>
+    /// <returns><c>false</c> if fewer than <c>body.Length + 96</c> bytes remain in <paramref name="w"/>.</returns>
+    private static bool WriteText(Span<byte> w, ref int pos, ReadOnlySpan<byte> body, bool close)
+    {
+        if (w.Length - pos < body.Length + 96) return false;
+        Wr(w, ref pos, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: "u8);
+        WrInt(w, ref pos, body.Length);
+        Wr(w, ref pos, close ? "\r\nConnection: close\r\n\r\n"u8 : "\r\n\r\n"u8);
+        Wr(w, ref pos, body);
+        return true;
+    }
+
+    /// <summary>Copies <paramref name="src"/> to <paramref name="w"/> at <paramref name="pos"/> and advances it.</summary>
+    private static void Wr(Span<byte> w, ref int pos, ReadOnlySpan<byte> src) { src.CopyTo(w[pos..]); pos += src.Length; }
+
+    /// <summary>Formats <paramref name="v"/> as decimal text at <paramref name="pos"/> and advances it.</summary>
+    private static void WrInt(Span<byte> w, ref int pos, int v) { Utf8Formatter.TryFormat(v, w[pos..], out int n); pos += n; }
+
+    /// <summary>Returns <c>a + b</c> from an <c>a=..&amp;b=..</c> query string; a missing key counts as 0.</summary>
+    private static long SumAB(ReadOnlySpan<byte> query)
+    {
+        long a = 0, b = 0;
+        while (query.Length > 0)
+        {
+            int amp = query.IndexOf((byte)'&');
+            ReadOnlySpan<byte> kv = amp >= 0 ? query[..amp] : query;
+            int eq = kv.IndexOf((byte)'=');
+            if (eq >= 0)
+            {
+                ReadOnlySpan<byte> k = kv[..eq];
+                if (k.SequenceEqual("a"u8)) a = ParseLoose(kv[(eq + 1)..]);
+                else if (k.SequenceEqual("b"u8)) b = ParseLoose(kv[(eq + 1)..]);
+            }
+            if (amp < 0) break;
+            query = query[(amp + 1)..];
+        }
+        return a + b;
+    }
+
+    /// <summary>
+    /// Decodes a chunked body from <paramref name="buf"/> and parses its first 256 bytes as an integer.
+    /// </summary>
+    /// <param name="buf">Bytes following the header block.</param>
+    /// <param name="bodyInt">Integer value of the decoded body.</param>
+    /// <param name="used">Bytes of <paramref name="buf"/> consumed, including the terminating chunk.</param>
+    /// <returns><c>false</c> if the body is incomplete or malformed.</returns>
+    private static bool DecodeChunked(ReadOnlySpan<byte> buf, out long bodyInt, out int used)
+    {
+        bodyInt = 0; used = 0;
+        Span<byte> body = stackalloc byte[256];
+        int blen = 0, pos = 0;
+        while (true)
+        {
+            int nl = buf[pos..].IndexOf("\r\n"u8);
+            if (nl < 0) return false;
+            if (!ParseHex(buf.Slice(pos, nl), out int size)) return false;
+            pos += nl + 2;
+            if (size == 0)
+            {
+                int end = buf[pos..].IndexOf("\r\n"u8);
+                if (end < 0) return false;
+                used = pos + end + 2;
+                bodyInt = ParseLoose(body[..blen]);
+                return true;
+            }
+            // Chunk data plus its CRLF must be fully buffered. Written as a
+            // subtraction so a large declared size cannot wrap the sum.
+            if (size > buf.Length - pos - 2) return false;
+            if (blen + size <= body.Length) { buf.Slice(pos, size).CopyTo(body[blen..]); blen += size; }
+            pos += size;
+            if (!buf.Slice(pos, 2).SequenceEqual("\r\n"u8)) return false;
+            pos += 2;
+        }
+    }
+
+    /// <summary>Strips leading and trailing spaces and tabs.</summary>
+    private static ReadOnlySpan<byte> Trim(ReadOnlySpan<byte> b)
+    {
+        int s = 0, e = b.Length;
+        while (s < e && (b[s] == (byte)' ' || b[s] == (byte)'\t')) s++;
+        while (e > s && (b[e - 1] == (byte)' ' || b[e - 1] == (byte)'\t')) e--;
+        return b[s..e];
+    }
+
+    /// <summary>ASCII case-insensitive equality.</summary>
+    private static bool CiEq(ReadOnlySpan<byte> a, ReadOnlySpan<byte> b)
+    {
+        if (a.Length != b.Length) return false;
+        for (int i = 0; i < a.Length; i++) if (Low(a[i]) != Low(b[i])) return false;
+        return true;
+    }
+
+    /// <summary>ASCII case-insensitive substring test; an empty needle never matches.</summary>
+    private static bool CiContains(ReadOnlySpan<byte> h, ReadOnlySpan<byte> n)
+    {
+        if (n.Length == 0 || h.Length < n.Length) return false;
+        for (int i = 0; i + n.Length <= h.Length; i++) if (CiEq(h.Slice(i, n.Length), n)) return true;
+        return false;
+    }
+
+    /// <summary>Lower-cases an ASCII letter; every other byte is returned unchanged.</summary>
+    private static byte Low(byte c) => (byte)(c >= 'A' && c <= 'Z' ? c + 32 : c);
+
+    /// <summary>
+    /// Parses an optionally negative decimal integer, skipping leading whitespace and
+    /// stopping at the first non-digit. Returns 0 when no digits are present.
+    /// </summary>
+    private static long ParseLoose(ReadOnlySpan<byte> s)
+    {
+        int i = 0;
+        while (i < s.Length && (s[i] == ' ' || s[i] == '\t' || s[i] == '\r' || s[i] == '\n')) i++;
+        bool neg = false;
+        if (i < s.Length && s[i] == '-') { neg = true; i++; }
+        long n = 0;
+        while (i < s.Length && s[i] >= '0' && s[i] <= '9') { n = n * 10 + (s[i] - '0'); i++; }
+        return neg ? -n : n;
+    }
+
+    /// <summary>
+    /// Parses a hexadecimal chunk size, stopping at <c>;</c> or a space (chunk extensions).
+    /// </summary>
+    /// <returns>
+    /// <c>false</c> if no hex digit was read or the value does not fit in an <see cref="int"/>.
+    /// </returns>
+    private static bool ParseHex(ReadOnlySpan<byte> b, out int val)
+    {
+        val = 0; bool any = false;
+        foreach (byte c in b)
+        {
+            int d;
+            if (c >= '0' && c <= '9') d = c - '0';
+            else if (c >= 'a' && c <= 'f') d = c - 'a' + 10;
+            else if (c >= 'A' && c <= 'F') d = c - 'A' + 10;
+            else if (c == ';' || c == ' ') break;
+            else return any;
+            if (val > (int.MaxValue - d) / 16) return false;   // next digit would overflow
+            val = val * 16 + d; any = true;
+        }
+        return any;
+    }
+}
diff --git a/frameworks/shrike-minima/README.md b/frameworks/shrike-minima/README.md
new file mode 100644
index 000000000..fe8fbfe6c
--- /dev/null
+++ b/frameworks/shrike-minima/README.md
@@ -0,0 +1,38 @@
+# shrike-minima
+
+An **epoll** engine with an IVTS-backed, **`RunContinuationsAsynchronously = true`**
+async handler loop, fixed the **Minima way**: an **SPSC recv ring** decouples the
+worker from the handler.
+
+## The model (why there is no race)
+
+- The **worker** recv's into **pooled buffers** and **enqueues** `(ptr, len)` onto a
+  per-connection **single-producer / single-consumer ring** (`SpscRing`), then signals.
+- The **handler** resumes on the **thread pool**, **dequeues** the chunks, **copies**
+  each into its own parse buffer, **returns** the buffer to the pool, and parses.
+ +The worker and handler touch **disjoint ends** of the ring (tail / head, release/acquire), +and each recv buffer is owned by one side at a time — so the recv buffer is **never +shared**, and there's no driver/handler data race. Unlike the Tokio-style sibling +(`shrike-tokio`, where the handler does its own `recv`), here **recv pipelines with +parse**: the worker keeps pumping recv into the ring while the handler is still parsing. +The cost is one extra copy (chunk → parse buffer) plus the pool/ring bookkeeping. + +This is the epoll analogue of Minima's `SpscRecvRing` (io_uring provided buffers there; +pooled `recv()` buffers here). + +## Handler (`Program.cs`) + +Hand-rolled HTTP/1.1 over the parse buffer: request line, `Content-Length` and chunked +bodies, keep-alive, pipelining, fragmented-read reassembly. `Connection: close` sends a +FIN via `shutdown(SHUT_WR)`. + +| Endpoint | Response | +|---|---| +| `GET/POST /baseline11?a=&b=` | `text/plain` — `a + b` (+ POST body) | +| `GET /pipeline` | `text/plain` — `ok` | + +## Tests + +`baseline`, `pipelined`, `limited-conn`. epoll (not io_uring). `SHRIKE_PORT` / +`SHRIKE_WORKERS` override for local runs. diff --git a/frameworks/shrike-minima/SerializableObjects/JsonMessage.cs b/frameworks/shrike-minima/SerializableObjects/JsonMessage.cs new file mode 100644 index 000000000..a68d3fecc --- /dev/null +++ b/frameworks/shrike-minima/SerializableObjects/JsonMessage.cs @@ -0,0 +1,14 @@ +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +// (var is avoided intentionally in this project so that concrete types are visible at call sites.) 
+// ReSharper disable always StackAllocInsideLoop +// ReSharper disable always ClassCannotBeInstantiated +#pragma warning disable CA2014 + +namespace Shrike; + +public struct JsonMessage { public string Message { get; set; } } + +[JsonSourceGenerationOptions(GenerationMode = JsonSourceGenerationMode.Serialization | JsonSourceGenerationMode.Metadata)] +[JsonSerializable(typeof(JsonMessage))] +public partial class JsonContext : JsonSerializerContext { } \ No newline at end of file diff --git a/frameworks/shrike-minima/Utilities/DateHelper.cs b/frameworks/shrike-minima/Utilities/DateHelper.cs new file mode 100644 index 000000000..920b2866a --- /dev/null +++ b/frameworks/shrike-minima/Utilities/DateHelper.cs @@ -0,0 +1,64 @@ +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +// (var is avoided intentionally in this project so that concrete types are visible at call sites.) +// ReSharper disable always StackAllocInsideLoop +// ReSharper disable always ClassCannotBeInstantiated + +#pragma warning disable CA2014 + +namespace Shrike; + +// Stolen from asp net core platform benchmark :D +// TODO: Try a different approach + +/// +/// Manages the generation of the date header value. 
+///
+public static class DateHelper
+{
+    private const int PrefixLength = 6; // "Date: ".Length
+    private const int DateTimeRLength = 29; // Wed, 14 Mar 2018 14:20:00 GMT
+    private const int SuffixLength = 2; // crlf
+    private const int SuffixIndex = DateTimeRLength + PrefixLength;
+
+    // Dedicated monitor. The two buffers below are swapped on every update, so
+    // locking on either of them would not give every caller the same lock object.
+    private static readonly Lock SGate = new();
+
+    // Refreshes the header once per second.
+    private static readonly Timer STimer = new((s) => {
+        SetDateValues(DateTimeOffset.UtcNow);
+    }, null, 1000, 1000);
+
+    // Double buffer: the next value is formatted into the scratch array, which is
+    // then swapped with the master array that HeaderBytes exposes.
+    private static byte[] _sHeaderBytesMaster = new byte[PrefixLength + DateTimeRLength + 2 * SuffixLength];
+    private static byte[] _sHeaderBytesScratch = new byte[PrefixLength + DateTimeRLength + 2 * SuffixLength];
+
+    static DateHelper()
+    {
+        var utf8 = "Date: "u8;
+
+        // Both buffers get the constant prefix and the trailing CRLF CRLF once;
+        // later updates only rewrite the 29 date bytes in between.
+        utf8.CopyTo(_sHeaderBytesMaster);
+        utf8.CopyTo(_sHeaderBytesScratch);
+        _sHeaderBytesMaster[SuffixIndex] = (byte)'\r';
+        _sHeaderBytesMaster[SuffixIndex + 1] = (byte)'\n';
+        _sHeaderBytesMaster[SuffixIndex + 2] = (byte)'\r';
+        _sHeaderBytesMaster[SuffixIndex + 3] = (byte)'\n';
+        _sHeaderBytesScratch[SuffixIndex] = (byte)'\r';
+        _sHeaderBytesScratch[SuffixIndex + 1] = (byte)'\n';
+        _sHeaderBytesScratch[SuffixIndex + 2] = (byte)'\r';
+        _sHeaderBytesScratch[SuffixIndex + 3] = (byte)'\n';
+
+        SetDateValues(DateTimeOffset.UtcNow);
+        SyncDateTimer();
+    }
+
+    /// <summary>Restarts the one-second refresh period from now.</summary>
+    private static void SyncDateTimer() => STimer.Change(1000, 1000);
+
+    /// <summary>
+    /// The current <c>Date: ...</c> header line followed by CRLF CRLF, as UTF-8 bytes.
+    /// </summary>
+    public static ReadOnlySpan<byte> HeaderBytes => _sHeaderBytesMaster;
+
+    /// <summary>
+    /// Formats <paramref name="value"/> in RFC 1123 ('R') form into the scratch buffer
+    /// and publishes it by swapping scratch and master.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">The date could not be formatted.</exception>
+    private static void SetDateValues(DateTimeOffset value)
+    {
+        lock (SGate)
+        {
+            if (!Utf8Formatter.TryFormat(value, _sHeaderBytesScratch.AsSpan(PrefixLength), out var written, 'R'))
+                throw new InvalidOperationException("date time format failed");
+
+            //Debug.Assert(written == dateTimeRLength);
+            (_sHeaderBytesScratch, _sHeaderBytesMaster) = (_sHeaderBytesMaster, _sHeaderBytesScratch);
+        }
+    }
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Utilities/HashUtils.cs b/frameworks/shrike-minima/Utilities/HashUtils.cs
new file mode 100644
index 000000000..dd83e1397
--- /dev/null
+++
b/frameworks/shrike-minima/Utilities/HashUtils.cs @@ -0,0 +1,93 @@ +// ReSharper disable always CheckNamespace +// ReSharper disable always SuggestVarOrType_BuiltInTypes +// (var is avoided intentionally in this project so that concrete types are visible at call sites.) +// ReSharper disable always StackAllocInsideLoop +// ReSharper disable always ClassCannotBeInstantiated + +#pragma warning disable CA2014 + +namespace Shrike; + +/// +/// Provides extremely lightweight hashing utilities optimized for short, hot-path inputs +/// (e.g. HTTP routes, header names, or method tokens). +/// +internal static class HashUtils +{ + /// + /// Computes a 32-bit FNV-1a (Fowler–Noll–Vo) hash of the given byte span. + /// + /// + /// FNV-1a is a simple, fast, non-cryptographic hash function designed for small inputs + /// and stable distribution. + /// + /// Formula: + /// + /// h = 2166136261 + /// for each byte b in data: + /// h = (h XOR b) * 16777619 + /// + /// + /// Characteristics: + /// • 32-bit unsigned integer output + /// • Deterministic and endian-independent + /// • Good avalanche behavior for short ASCII inputs + /// • Not suitable for security-critical use (collision-prone vs. modern hashes) + /// + /// Input data to hash (typically a small UTF-8 slice). + /// 32-bit unsigned FNV-1a hash of . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static uint Fnv1a32(ReadOnlySpan data) + { + const uint offset = 2166136261u; + const uint prime = 16777619u; + uint h = offset; + + for (int i = 0; i < data.Length; i++) + { + h ^= data[i]; + h *= prime; + } + + return h; + } + + /// + /// Computes an 8-bit FNV-1a hash of the given byte span. + /// + /// + /// Derived from the 32-bit version but truncated to 8 bits (mod 256). + /// This variant is extremely small and fast — ideal for quick indexing or hashing + /// into small tables, but collisions are frequent due to the 1-byte range. 
+ /// + /// Formula: + /// + /// h = 0xA3 + /// for each byte b in data: + /// h = (h XOR b) * 0x9B + /// return h + /// + /// + /// Characteristics: + /// • Output range: 0–255 + /// • Very lightweight; no heap allocations + /// • Not stable for large datasets (collisions expected) + /// + /// Input data to hash. + /// 8-bit FNV-1a hash value. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static byte Fnv1a8(ReadOnlySpan data) + { + const byte offset = 0xA3; // 163 decimal + const byte prime = 0x9B; // 155 decimal + byte h = offset; + + for (int i = 0; i < data.Length; i++) + { + h ^= data[i]; + unchecked { h = (byte)(h * prime); } + } + + return h; + } +} \ No newline at end of file diff --git a/frameworks/shrike-minima/Utilities/PinnedByteSequence.cs b/frameworks/shrike-minima/Utilities/PinnedByteSequence.cs new file mode 100644 index 000000000..551ec0482 --- /dev/null +++ b/frameworks/shrike-minima/Utilities/PinnedByteSequence.cs @@ -0,0 +1,39 @@ +namespace Shrike; + +public readonly unsafe struct PinnedByteSequence : IEquatable +{ + private readonly byte* _ptr { get; } + + public readonly byte* Ptr => _ptr; + + public int Length { get; } + + public PinnedByteSequence(ReadOnlySpan span) + { + _ptr = (byte*)Unsafe.AsPointer(ref MemoryMarshal.GetReference(span)); + Length = span.Length; + } + + public PinnedByteSequence(byte* ptr, int length) + { + _ptr = ptr; + Length = length; + } + + public unsafe ReadOnlySpan AsSpan() => new(_ptr, Length); + + public bool Equals(PinnedByteSequence other) + { + return _ptr == other._ptr && Length == other.Length; + } + + public override bool Equals(object? 
obj) + { + return obj is PinnedByteSequence other && Equals(other); + } + + public override int GetHashCode() + { + return HashCode.Combine(unchecked((int)(long)_ptr), Length); + } +} \ No newline at end of file diff --git a/frameworks/shrike-minima/Utilities/PooledDictionary.cs b/frameworks/shrike-minima/Utilities/PooledDictionary.cs new file mode 100644 index 000000000..fc32b089b --- /dev/null +++ b/frameworks/shrike-minima/Utilities/PooledDictionary.cs @@ -0,0 +1,285 @@ +using System.Collections; + +namespace Shrike; + +/// +/// Represents a high-performance, pooled dictionary that minimizes allocations by renting internal arrays from . +/// This structure is optimized for small, short-lived dictionaries such as HTTP headers or per-request state. +/// +/// The type of keys in the dictionary. Must implement . +/// The type of values in the dictionary. +public class PooledDictionary : IDictionary, IReadOnlyDictionary, IEnumerator> where TKey : IEquatable +{ + private static readonly ArrayPool> Pool = ArrayPool>.Shared; + + private short _enumerator = -1; + private ushort _index; + + private KeyValuePair[]? 
_entries; + + private readonly IEqualityComparer _comparer; + + #region Get-/Setters + + private KeyValuePair[] Entries => _entries ??= Pool.Rent(Capacity); + + private bool HasEntries => _entries is not null; + + public virtual TValue this[TKey key] + { + get + { + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + if (_comparer.Equals(Entries[i].Key, key)) + { + return Entries[i].Value; + } + } + } + + throw new KeyNotFoundException(); + } + set + { + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + if (_comparer.Equals(Entries[i].Key, key)) + { + Entries[i] = new KeyValuePair(key, value); + return; + } + } + } + + Add(key, value); + } + } + + public ICollection Keys + { + get + { + var result = new List(_index); + + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + result.Add(Entries[i].Key); + } + } + + return result; + } + } + + public ICollection Values + { + get + { + var result = new List(_index); + + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + result.Add(Entries[i].Value); + } + } + + return result; + } + } + + public int Count => _index; + + public bool IsReadOnly => false; + + IEnumerable IReadOnlyDictionary.Keys => Keys; + + IEnumerable IReadOnlyDictionary.Values => Values; + + public KeyValuePair Current => Entries[_enumerator]; + + object IEnumerator.Current => Entries[_enumerator]; + + public int Capacity { get; private set; } + + #endregion + + #region Initialization + + public PooledDictionary() : this(4, EqualityComparer.Default) + { + + } + + public PooledDictionary(int capacity, IEqualityComparer comparer) + { + Capacity = capacity; + + _comparer = comparer; + } + + #endregion + + #region Functionality + + public virtual void Add(TKey key, TValue value) + { + CheckResize(); + Entries[_index++] = new KeyValuePair(key, value); + } + + public virtual void Add(KeyValuePair item) + { + CheckResize(); + Entries[_index++] = item; + } + + public void Clear() + { + _index = 0; + } + + public bool 
Contains(KeyValuePair item) + { + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + if (_comparer.Equals(Entries[i].Key, item.Key)) + { + return true; + } + } + } + + return false; + } + + public bool ContainsKey(TKey key) + { + if (HasEntries) + { + for (var i = 0; i < _index; i++) + { + if (_comparer.Equals(Entries[i].Key, key)) + { + return true; + } + } + } + + return false; + } + + public void CopyTo(KeyValuePair[] array, int arrayIndex) + { + throw new NotSupportedException(); + } + + public IEnumerator> GetEnumerator() + { + _enumerator = -1; + return this; + } + + public bool Remove(TKey key) => throw new NotSupportedException(); + + public bool Remove(KeyValuePair item) => throw new NotSupportedException(); + + public bool TryGetValue(TKey key, out TValue value) + { + if (ContainsKey(key)) + { + value = this[key]; + return true; + } + +#pragma warning disable CS8653, CS8601 + value = default; +#pragma warning restore + + return false; + } + + IEnumerator IEnumerable.GetEnumerator() => this; + + public bool MoveNext() + { + _enumerator++; + return _enumerator < _index; + } + + public void Reset() + { + _enumerator = -1; + } + + private void CheckResize() + { + if (_index >= Entries.Length) + { + var oldEntries = Entries; + + try + { + if (oldEntries.Length > Capacity) + { + Capacity = oldEntries.Length * 2; + } + else + { + Capacity *= 2; + } + + _entries = Pool.Rent(Capacity); + + for (var i = 0; i < _index; i++) + { + Entries[i] = oldEntries[i]; + } + } + finally + { + Pool.Return(oldEntries); + } + } + } + + #endregion + + #region IDisposable Support + + private bool _disposed; + + private void Dispose(bool disposing) + { + if (_disposed) + return; + + if (disposing) + { + if (HasEntries) + { + Pool.Return(Entries); + } + } + + _disposed = true; + } + + public void Dispose() + { + Dispose(true); + } + + #endregion + +} \ No newline at end of file diff --git a/frameworks/shrike-minima/Utilities/UnmanagedMemoryManager.cs 
b/frameworks/shrike-minima/Utilities/UnmanagedMemoryManager.cs
new file mode 100644
index 000000000..0bb447df3
--- /dev/null
+++ b/frameworks/shrike-minima/Utilities/UnmanagedMemoryManager.cs
@@ -0,0 +1,83 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+/// <summary>
+/// Provides a <see cref="Memory{T}"/> and <see cref="Span{T}"/> abstraction
+/// over a block of unmanaged memory, without taking ownership of that memory.
+///
+/// This class allows interop scenarios where a <see cref="byte"/> pointer
+/// is obtained externally (for example, via <c>malloc</c>, native buffers,
+/// or <c>stackalloc</c>) and must be safely exposed as <see cref="Memory{T}"/>
+/// to .NET APIs that expect it.
+///
+/// <para>
+/// Important: This class does not allocate or free the unmanaged memory.
+/// The caller is fully responsible for ensuring that the pointer remains valid
+/// for the lifetime of the <see cref="UnmanagedMemoryManager"/> instance.
+/// </para>
+/// </summary>
+public sealed unsafe class UnmanagedMemoryManager : MemoryManager<byte>
+{
+    private readonly byte* _ptr;
+    private readonly int _length;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="UnmanagedMemoryManager"/> class
+    /// over an existing unmanaged memory block.
+    /// </summary>
+    /// <param name="ptr">A pointer to the start of the unmanaged memory block.</param>
+    /// <param name="length">The length of the memory block, in bytes.</param>
+    /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="length"/> is negative.</exception>
+    /// <exception cref="ArgumentNullException">Thrown if <paramref name="ptr"/> is <see langword="null"/>.</exception>
+    public UnmanagedMemoryManager(byte* ptr, int length)
+    {
+        if (ptr == null)
+            throw new ArgumentNullException(nameof(ptr));
+
+        if (length < 0)
+            throw new ArgumentOutOfRangeException(nameof(length));
+
+        _ptr = ptr;
+        _length = length;
+    }
+
+    /// <summary>
+    /// Returns a <see cref="Span{T}"/> representing the unmanaged memory.
+    /// </summary>
+    /// <returns>
+    /// A <see cref="Span{T}"/> starting at the unmanaged memory address
+    /// and covering <c>_length</c> bytes.
+    /// </returns>
+    public override Span<byte> GetSpan() => new(_ptr, _length);
+
+    /// <summary>
+    /// Pins the unmanaged memory and returns a <see cref="MemoryHandle"/> to it.
+    /// Since this memory is already unmanaged, pinning is a no-op.
+    /// </summary>
+    /// <param name="elementIndex">An optional offset, in bytes, from the start of the buffer.</param>
+    /// <returns>A <see cref="MemoryHandle"/> pointing directly at <c>_ptr + elementIndex</c>.</returns>
+    /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="elementIndex"/> is outside <c>[0, _length]</c>.</exception>
+    public override MemoryHandle Pin(int elementIndex = 0) => (uint)elementIndex <= (uint)_length
+        ? new MemoryHandle(_ptr + elementIndex) // elementIndex == _length is allowed: it is the address of an empty trailing slice.
+        : throw new ArgumentOutOfRangeException(nameof(elementIndex));
+
+    /// <summary>
+    /// Unpins the memory. This is a no-op because unmanaged memory cannot be moved by the GC.
+    /// </summary>
+    public override void Unpin() { }
+
+    /// <summary>
+    /// Releases resources used by this <see cref="UnmanagedMemoryManager"/>.
+    /// Since this class does not own the unmanaged memory, this method does nothing.
+    /// </summary>
+    /// <param name="disposing">
+    /// <see langword="true"/> if called from <see cref="IDisposable.Dispose"/>; otherwise <see langword="false"/>.
+    /// </param>
+    protected override void Dispose(bool disposing) { }
+}
diff --git a/frameworks/shrike-minima/Writers/FixedBufferWriter.cs b/frameworks/shrike-minima/Writers/FixedBufferWriter.cs
new file mode 100644
index 000000000..dd833977c
--- /dev/null
+++ b/frameworks/shrike-minima/Writers/FixedBufferWriter.cs
@@ -0,0 +1,129 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+
+using System.Text;
+
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+/// <summary>
+/// A high-performance, *unmanaged* buffer writer designed for scenarios where
+/// zero allocations and deterministic memory layout are critical.
+///
+/// This class provides a writable view over a fixed memory region (provided as
+/// a raw <see cref="byte"/> pointer). It does not own or allocate memory itself
+/// — unless the caller provides it one that was manually allocated, in which
+/// case <see cref="Dispose"/> can free it if desired.
+///
+/// The typical use case is for writing binary or HTTP data directly into a
+/// pre-allocated unmanaged buffer (e.g., a native slab per connection) without
+/// heap allocations or GC involvement.
+/// </summary>
+[SkipLocalsInit]
+public unsafe class FixedBufferWriter : IUnmanagedBufferWriter<byte>, IBufferWriter<byte>, IDisposable
+{
+    private readonly int _capacity;
+    private readonly UnmanagedMemoryManager _manager;
+    public int Head;
+    public int Tail;
+
+    public byte* Ptr { get; private set; }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public FixedBufferWriter(byte* ptr, int capacity)
+    {
+        Ptr = ptr;
+        _capacity = capacity;
+        Head = 0;
+        Tail = 0;
+
+        _manager = new UnmanagedMemoryManager(ptr, capacity); // Throws on a null ptr or a negative capacity.
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public void Reset()
+    {
+        Head = 0;
+        Tail = 0;
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public void Advance(int count)
+    {
+        // Not range-checked here: if Tail ends up past capacity, the next GetSpan/GetMemory/Write call throws.
+        Tail += count;
+    }
+
+    public Memory<byte> GetMemory(int sizeHint = 0)
+    {
+        int remaining = _capacity - Tail;
+        if (sizeHint > remaining)
+        {
+            throw new InvalidOperationException("Buffer too small.");
+        }
+
+        return _manager.Memory.Slice(Tail, remaining);
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public byte* GetPointer() => Ptr;
+
+    public Span<byte> GetSpan(int sizeHint = 0)
+    {
+        if (sizeHint > _capacity - Tail) // Subtraction form: Tail + sizeHint could overflow int and slip past the check.
+        {
+            throw new InvalidOperationException("Buffer too small.");
+        }
+
+        return new Span<byte>(Ptr + Tail, _capacity - Tail);
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public void WriteUnmanaged(ReadOnlySpan<byte> source)
+    {
+        int len = source.Length;
+        if (len > _capacity - Tail) // Subtraction form: Tail + len could overflow int and slip past the check.
+        {
+            throw new InvalidOperationException("Buffer too small.");
+        }
+
+        fixed (byte* src = source)
+        {
+            Buffer.MemoryCopy(src, Ptr + Tail, _capacity - Tail, len);
+        }
+
+        Tail += len;
+    }
+
+    public void WriteUnmanaged(string source)
+    {
+        Span<byte> span = new Span<byte>(Ptr + Tail, _capacity - Tail);
+        int bytesWritten = Encoding.UTF8.GetBytes(source, span); // Throws ArgumentException if the UTF-8 bytes do not fit.
+        Tail += bytesWritten;
+    }
+
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public void Write(ReadOnlySpan<byte> source)
+    {
+        int len = source.Length;
+        if (len > _capacity - Tail) // Subtraction form: Tail + len could overflow int and slip past the check.
+        {
+            throw new InvalidOperationException("Buffer too small.");
+        }
+
+        source.CopyTo(new Span<byte>(Ptr + Tail, _capacity - Tail));
+        Tail += len;
+    }
+
+    public void Dispose()
+    {
+        // Clear Ptr before freeing so that a repeated Dispose() can never free the same block twice.
+        byte* block = Ptr;
+        Ptr = null;
+        NativeMemory.AlignedFree(block); // Documented no-op when block is null.
+    }
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Writers/ISpanWriter.cs b/frameworks/shrike-minima/Writers/ISpanWriter.cs
new file mode 100644
index 000000000..6c70fb861
--- /dev/null
+++ b/frameworks/shrike-minima/Writers/ISpanWriter.cs
@@ -0,0 +1,20 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+internal interface ISpanWriter
+{
+    /// <summary>Notifies the <see cref="ISpanWriter"/> that <paramref name="count"/> data items were written to the output <see cref="Span{T}"/> or <see cref="Memory{T}"/>.</summary>
+    /// <param name="count">The number of data items written to the <see cref="Span{T}"/>.</param>
+    void Advance(int count);
+
+    /// <summary>Returns a <see cref="Span{T}"/> to write to that is at least the requested size (specified by <paramref name="sizeHint"/>).</summary>
+    /// <param name="sizeHint">The minimum length of the returned <see cref="Span{T}"/>. If 0, a non-empty buffer is returned.</param>
+    /// <returns>A <see cref="Span{T}"/> of at least the size <paramref name="sizeHint"/>. If <paramref name="sizeHint"/> is 0, returns a non-empty buffer.</returns>
+    Span<byte> GetSpan(int sizeHint = 0);
+}
\ No newline at end of file
diff --git a/frameworks/shrike-minima/Writers/IUnmanagedBufferWriter.cs b/frameworks/shrike-minima/Writers/IUnmanagedBufferWriter.cs
new file mode 100644
index 000000000..b8e2187e2
--- /dev/null
+++ b/frameworks/shrike-minima/Writers/IUnmanagedBufferWriter.cs
@@ -0,0 +1,40 @@
+// ReSharper disable always CheckNamespace
+// ReSharper disable always SuggestVarOrType_BuiltInTypes
+// (var is avoided intentionally in this project so that concrete types are visible at call sites.)
+// ReSharper disable always StackAllocInsideLoop
+// ReSharper disable always ClassCannotBeInstantiated
+#pragma warning disable CA2014
+
+namespace Shrike;
+
+/// <summary>
+/// Minimal contract for writing to a caller-provided unmanaged, contiguous buffer.
+/// Intended for high-performance I/O/serialization without GC pinning.
+/// </summary>
+/// <typeparam name="T">Unmanaged element type (e.g., byte).</typeparam>
+internal unsafe interface IUnmanagedBufferWriter<T> where T : unmanaged
+{
+    /// <summary>
+    /// Advance the logical write cursor by <paramref name="count"/> elements
+    /// after data was written directly into the buffer.
+    /// </summary>
+    void Advance(int count);
+
+    /// <summary>
+    /// Base pointer to the buffer. Valid only while the writer is alive.
+    /// Callers that write via this pointer must also call <see cref="Advance"/>.
+    /// </summary>
+    T* GetPointer();
+
+    /// <summary>
+    /// Copy <paramref name="source"/> into the buffer and advance the cursor.
+    /// Throw if it would exceed capacity.
+    /// </summary>
+    void Write(ReadOnlySpan<T> source);
+
+    /// <summary>
+    /// Like <see cref="Write"/>, but allows an implementation
+    /// to use an unsafe/fast path (e.g., MemoryCopy). Must still enforce bounds.
+    /// </summary>
+    void WriteUnmanaged(ReadOnlySpan<T> source);
+}
diff --git a/frameworks/shrike-minima/_usings.cs b/frameworks/shrike-minima/_usings.cs
new file mode 100644
index 000000000..62ad342ff
--- /dev/null
+++ b/frameworks/shrike-minima/_usings.cs
@@ -0,0 +1,15 @@
+global using System;
+global using System.Buffers;
+global using System.Buffers.Binary;
+global using System.Buffers.Text;
+global using System.Collections.Concurrent;
+global using System.Runtime.CompilerServices;
+global using System.Runtime.InteropServices;
+global using System.Text.Json.Serialization;
+
+global using Microsoft.Extensions.ObjectPool;
+
+global using static Shrike.ProcessorArchDependant;
+global using static Shrike.Native;
+global using static Shrike.HeaderParsing;
+global using static Shrike.HashUtils;
\ No newline at end of file
diff --git a/frameworks/shrike-minima/meta.json b/frameworks/shrike-minima/meta.json
new file mode 100644
index 000000000..2fa5d03dc
--- /dev/null
+++ b/frameworks/shrike-minima/meta.json
@@ -0,0 +1,15 @@
+{
+  "display_name": "shrike-minima",
+  "language": "C#",
+  "type": "engine",
+  "engine": "epoll",
+  "description": "An epoll engine with an IVTS-backed, RunContinuationsAsynchronously=true async handler loop, using Minima's SPSC recv handoff: the worker recv's into pooled buffers and enqueues them on a per-connection single-producer/single-consumer ring; the handler resumes on the thread pool, dequeues the chunks into its own parse buffer, and returns the buffers. Worker and handler never share a buffer (no driver/handler race), and recv pipelines with parse. FlushAsync does a thread-safe send() directly, so an off-worker handler responds without a handoff.
Per-worker SO_REUSEPORT, pooled connections, native slab buffers, hand-rolled HTTP/1.1.", + "repo": "", + "enabled": true, + "tests": [ + "baseline", + "pipelined", + "limited-conn" + ], + "maintainers": [] +} diff --git a/frameworks/shrike-minima/shrike-minima.csproj b/frameworks/shrike-minima/shrike-minima.csproj new file mode 100644 index 000000000..5c8e301a7 --- /dev/null +++ b/frameworks/shrike-minima/shrike-minima.csproj @@ -0,0 +1,20 @@ + + + + Exe + net10.0 + enable + enable + true + Shrike + shrike-minima + true + true + true + + + + + + +