diff --git a/src/main.zig b/src/main.zig index c82074e..413bed9 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3209,38 +3209,54 @@ fn userspaceEventLoop( posix.setsockopt(udp_sock.fd, posix.SOL.SOCKET, SO_BUSY_POLL, std.mem.asBytes(&busy_poll_us)) catch {}; var gro_rx = BatchUdp.GROReceiver{}; + var udp_ring: lib.net.IoUring.UdpRing = undefined; + const use_udp_ring = blk: { + udp_ring.init(udp_sock.fd) catch break :blk false; + break :blk true; + }; + defer if (use_udp_ring) udp_ring.deinit(); + if (use_udp_ring) { + const sqpoll_msg = if (udp_ring.sqpoll) "SQPOLL" else "submit"; + const buffer_msg = if (udp_ring.registered_buffers) "registered buffers" else "unregistered buffers"; + writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {}; + } else { + writeFormatted(stdout, " io_uring UDP: unavailable, using poll+recvmsg path\n", .{}) catch {}; + } + const MAX_DECRYPTED = 64; var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined; var decrypt_lens: [MAX_DECRYPTED]usize = undefined; var decrypt_slots: [MAX_DECRYPTED]usize = undefined; + var cqe_buffer: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; + const UDP_RING_IDLE_BACKOFF_NS = 100 * std.time.ns_per_us; while (swim.running.load(.acquire)) { - var fds = [_]posix.pollfd{ - .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, - }; - _ = posix.poll(&fds, 50) catch continue; - - if (fds[0].revents & posix.POLL.IN != 0) { + if (use_udp_ring) { + const n_cqes = udp_ring.copyCompletions(&cqe_buffer, 0) catch 0; var n_decrypted: usize = 0; - // Drain loop: process all pending GRO batches before returning to poll() - while (true) { - const total_bytes = gro_rx.recvGRO(udp_sock.fd); - if (total_bytes == 0) break; // EAGAIN — socket drained + for (cqe_buffer[0..n_cqes]) |cqe| { + udp_ring.noteSendCompletion(cqe); + const recv = udp_ring.recvCompletion(cqe) orelse { + if 
(lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| { + udp_ring.resubmitRecv(slot, udp_sock.fd) catch {}; + } + continue; + }; - const sender = gro_rx.getSender(); - const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; + const total_bytes = recv.data.len; + const seg_size: usize = if (recv.segment_size > 0) recv.segment_size else total_bytes; var offset: usize = 0; while (offset < total_bytes) { const remaining = total_bytes - offset; const pkt_len = @min(seg_size, remaining); - const pkt = gro_rx.buf[offset..][0..pkt_len]; + const pkt = recv.data[offset..][0..pkt_len]; processIncomingPacket( pkt, - sender.addr, - sender.port, + recv.sender_addr, + recv.sender_port, wg_dev, swim, udp_sock, @@ -3255,6 +3271,8 @@ fn userspaceEventLoop( offset += pkt_len; } + udp_ring.resubmitRecv(recv.slot, udp_sock.fd) catch {}; + // Flush decrypted packets to TUN if buffer is filling up if (n_decrypted >= MAX_DECRYPTED - 44) { if (n_decrypted > 0) { @@ -3270,6 +3288,11 @@ fn userspaceEventLoop( } } + // Short backoff avoids spinning while keeping control-plane latency below the + // 50ms poll fallback timeout when the SQPOLL thread has no completions ready. + // The fallback branch blocks in poll(), so it does not need this extra backoff. 
+ if (n_cqes == 0) sleepNs(UDP_RING_IDLE_BACKOFF_NS); + // Final flush of remaining decrypted packets if (n_decrypted > 0) { if (tun_dev.vnet_hdr) { @@ -3280,6 +3303,73 @@ fn userspaceEventLoop( } } } + } else { + var fds = [_]posix.pollfd{ + .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, + }; + _ = posix.poll(&fds, 50) catch continue; + + if (fds[0].revents & posix.POLL.IN != 0) { + var n_decrypted: usize = 0; + + // Drain loop: process all pending GRO batches before returning to poll() + while (true) { + const total_bytes = gro_rx.recvGRO(udp_sock.fd); + if (total_bytes == 0) break; // EAGAIN — socket drained + + const sender = gro_rx.getSender(); + const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; + + var offset: usize = 0; + while (offset < total_bytes) { + const remaining = total_bytes - offset; + const pkt_len = @min(seg_size, remaining); + const pkt = gro_rx.buf[offset..][0..pkt_len]; + + processIncomingPacket( + pkt, + sender.addr, + sender.port, + wg_dev, + swim, + udp_sock, + stdout, + &decrypt_storage, + &decrypt_lens, + &decrypt_slots, + &n_decrypted, + service_filter, + ); + + offset += pkt_len; + } + + // Flush decrypted packets to TUN if buffer is filling up + if (n_decrypted >= MAX_DECRYPTED - 44) { + if (n_decrypted > 0) { + if (tun_dev.vnet_hdr) { + writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); + } else { + for (0..n_decrypted) |d| { + writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); + } + } + n_decrypted = 0; + } + } + } + + // Final flush of remaining decrypted packets + if (n_decrypted > 0) { + if (tun_dev.vnet_hdr) { + writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); + } else { + for (0..n_decrypted) |d| { + writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); + } + } + } + } } swim.tickTimersOnly(); diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index a7090de..14fbcba 100644 --- 
a/src/net/io_uring.zig
+++ b/src/net/io_uring.zig
@@ -1,9 +1,42 @@
 const std = @import("std");
+const builtin = @import("builtin");
 const posix = std.posix;
 const linux = std.os.linux;
 const IoUring = linux.IoUring;
 const Offload = @import("offload.zig");
 
+// CQE user_data layout: bit 60 tags a receive, bit 61 tags a send, and the
+// low 16 bits carry the slot index.
+const UDP_RECV_USER_DATA: u64 = 0x1000_0000_0000_0000;
+const UDP_SEND_USER_DATA: u64 = 0x2000_0000_0000_0000;
+const UDP_SLOT_MASK: u64 = 0x0000_0000_0000_ffff;
+
+// cmsg type/level pair that RecvSlot.segmentSize() looks for in the control buffer.
+const UDP_GRO = 104;
+const SOL_UDP = 17;
+// UDP_GRO cmsg payloads are emitted by the Linux kernel in native byte order.
+const NATIVE_ENDIAN = builtin.cpu.arch.endian();
+// posix.O is a packed flag struct on this Zig version; compute the raw bit for fcntl(F_SETFL).
+const O_NONBLOCK_FLAG: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
+
+/// Header that prefixes each control message inside a recvmsg control buffer.
+const Cmsghdr = extern struct {
+    len: usize,
+    level: i32,
+    type: i32,
+};
+
+/// Create an io_uring with `entries` SQEs, requesting SQPOLL when `use_sqpoll` is set.
+/// If the SQPOLL attempt fails for any reason it is retried once without SQPOLL;
+/// a failure of the non-SQPOLL attempt is returned to the caller.
+fn initRing(entries: u16, use_sqpoll: bool) !IoUring {
+    const flags: u32 = if (use_sqpoll) linux.IORING_SETUP_SQPOLL else 0;
+    return IoUring.init(entries, flags) catch |err| {
+        if (!use_sqpoll) return err;
+        return try IoUring.init(entries, 0);
+    };
+}
+
 /// TUN reader backed by io_uring.
 /// Pre-submits N read SQEs and resubmits on each completion.
 /// The caller processes completed reads identically to the poll()+read() path.
@@ -25,7 +58,7 @@ pub const TunRingReader = struct {
     /// Initialize the io_uring and pre-submit read SQEs for the TUN fd.
     pub fn init(tun_fd: posix.fd_t) !TunRingReader {
         var self = TunRingReader{
-            .ring = try IoUring.init(RING_DEPTH, 0),
+            .ring = try initRing(RING_DEPTH, true),
             .slots = undefined,
         };
 
@@ -80,6 +113,306 @@ pub const TunRingReader = struct {
     }
 };
 
+/// UDP receive/send event loop backed by io_uring.
+/// Keeps receive SQEs in flight and uses registered fixed buffers plus SQPOLL
+/// when the kernel/container permits it. UDP payload buffers are stable for the
+/// lifetime of the ring (until deinit), so callers can process completions
+/// without a copy.
+///
+/// All slot buffers are embedded in the struct (32 x 64 KiB receive plus
+/// 32 x 2 KiB send) and every slot's msghdr/iovec points back into it, so a
+/// UdpRing must be initialized in place via `init` and never moved or copied
+/// afterwards.
+pub const UdpRing = struct {
+    pub const RECV_DEPTH: u16 = 32;
+    pub const SEND_DEPTH: u16 = 32;
+    // Keep spare SQEs beyond recv+send slots for completions/resubmits during bursts.
+    pub const RING_DEPTH: u16 = 128;
+    pub const RECV_BUF_SIZE: usize = 65536;
+    pub const SEND_BUF_SIZE: usize = 2048;
+
+    /// One completed receive. `data` aliases the slot's buffer, so it is only
+    /// valid until that slot is handed back through `resubmitRecv`.
+    pub const RecvCompletion = struct {
+        data: []const u8,
+        sender_addr: [4]u8,
+        sender_port: u16,
+        segment_size: u16,
+        slot: u16,
+    };
+
+    const RecvSlot = struct {
+        buf: [RECV_BUF_SIZE]u8 = undefined,
+        cmsg_buf: [128]u8 align(@alignOf(Cmsghdr)) = undefined,
+        addr: posix.sockaddr.in = undefined,
+        iov: posix.iovec = undefined,
+        msg: linux.msghdr = undefined,
+
+        /// Point the msghdr/iovec at this slot's own buffers.
+        fn setup(self: *RecvSlot) void {
+            self.addr = std.mem.zeroes(posix.sockaddr.in);
+            self.iov = .{ .base = &self.buf, .len = self.buf.len };
+            @memset(&self.cmsg_buf, 0);
+            self.msg = .{
+                .name = @ptrCast(&self.addr),
+                .namelen = @sizeOf(posix.sockaddr.in),
+                .iov = @ptrCast(&self.iov),
+                .iovlen = 1,
+                .control = @ptrCast(&self.cmsg_buf),
+                .controllen = self.cmsg_buf.len,
+                .flags = 0,
+            };
+        }
+
+        /// Reset the length/flag fields to their full-buffer values and clear stale address/cmsg bytes.
+        fn resetForRecv(self: *RecvSlot) void {
+            self.addr = std.mem.zeroes(posix.sockaddr.in);
+            self.iov.len = self.buf.len;
+            @memset(&self.cmsg_buf, 0);
+            self.msg.namelen = @sizeOf(posix.sockaddr.in);
+            self.msg.controllen = self.cmsg_buf.len;
+            self.msg.flags = 0;
+        }
+
+        /// Return the UDP_GRO segment size found in the control buffer, or 0 if
+        /// there is none (or the control data is malformed).
+        fn segmentSize(self: *const RecvSlot) u16 {
+            // Clamp to the buffer we own so an unexpected controllen cannot index past cmsg_buf.
+            const control_len: usize = @min(self.msg.controllen, self.cmsg_buf.len);
+            var offset: usize = 0;
+            while (offset + @sizeOf(Cmsghdr) <= control_len) {
+                const cmsg: *const Cmsghdr = @ptrCast(@alignCast(&self.cmsg_buf[offset]));
+                // Reject headers shorter than themselves or running past the control data.
+                if (cmsg.len < @sizeOf(Cmsghdr) or cmsg.len > control_len - offset) break;
+                if (cmsg.level == SOL_UDP and cmsg.type == UDP_GRO) {
+                    const payload = self.cmsg_buf[offset + @sizeOf(Cmsghdr) .. offset + cmsg.len];
+                    // NOTE(review): current Linux appears to emit the size as a 4-byte int;
+                    // reading it at full width keeps big-endian hosts correct. A 2-byte
+                    // payload is still accepted.
+                    if (payload.len >= @sizeOf(u32)) {
+                        const wide = std.mem.readInt(u32, payload[0..4], NATIVE_ENDIAN);
+                        return std.math.cast(u16, wide) orelse 0;
+                    }
+                    if (payload.len >= @sizeOf(u16)) {
+                        return std.mem.readInt(u16, payload[0..2], NATIVE_ENDIAN);
+                    }
+                    break;
+                }
+                const aligned_len = (cmsg.len + @alignOf(Cmsghdr) - 1) & ~(@as(usize, @alignOf(Cmsghdr)) - 1);
+                offset += aligned_len;
+            }
+            return 0;
+        }
+    };
+
+    const SendSlot = struct {
+        buf: [SEND_BUF_SIZE]u8 = undefined,
+        addr: posix.sockaddr.in = undefined,
+        iov: posix.iovec_const = undefined,
+        msg: linux.msghdr_const = undefined,
+        // Set by sendTo(), cleared by noteSendCompletion() when the matching CQE is seen.
+        busy: bool = false,
+
+        /// Point the msghdr/iovec at this slot's own buffers and mark the slot free.
+        fn setup(self: *SendSlot) void {
+            self.addr = std.mem.zeroes(posix.sockaddr.in);
+            self.iov = .{ .base = &self.buf, .len = 0 };
+            self.msg = .{
+                .name = @ptrCast(&self.addr),
+                .namelen = @sizeOf(posix.sockaddr.in),
+                .iov = @ptrCast(&self.iov),
+                .iovlen = 1,
+                .control = null,
+                .controllen = 0,
+                .flags = 0,
+            };
+            self.busy = false;
+        }
+    };
+
+    ring: IoUring,
+    recv_slots: [RECV_DEPTH]RecvSlot,
+    send_slots: [SEND_DEPTH]SendSlot,
+    registered_iovecs: [RECV_DEPTH + SEND_DEPTH]posix.iovec,
+    fd: posix.fd_t,
+    original_status_flags: ?usize,
+    registered_buffers: bool,
+    sqpoll: bool,
+
+    /// Initialize the ring in place for `fd` and pre-submit RECV_DEPTH recvmsg SQEs.
+    /// Clears O_NONBLOCK on `fd` while the ring is alive (restored by `deinit`, or
+    /// here on failure). A buffer-registration failure is tolerated; any other
+    /// failure is returned after the fd flags are restored and the ring is torn down.
+    pub fn init(self: *UdpRing, fd: posix.fd_t) !void {
+        self.* = UdpRing{
+            .ring = try initRing(RING_DEPTH, true),
+            .recv_slots = undefined,
+            .send_slots = undefined,
+            .registered_iovecs = undefined,
+            .fd = fd,
+            .original_status_flags = null,
+            .registered_buffers = false,
+            .sqpoll = false,
+        };
+        // initRing may fall back when SQPOLL is denied by permissions/seccomp;
+        // the kernel-returned flags are the source of truth for runtime logging.
+        self.sqpoll = (self.ring.flags & linux.IORING_SETUP_SQPOLL) != 0;
+        errdefer {
+            self.restoreFdFlags();
+            self.ring.deinit();
+        }
+
+        const raw_flags = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
+        switch (posix.errno(raw_flags)) {
+            .SUCCESS => {
+                const flags: usize = @intCast(raw_flags);
+                self.original_status_flags = flags;
+                if ((flags & O_NONBLOCK_FLAG) != 0) {
+                    // Blocking fds let io_uring arm async socket receive instead of completing
+                    // immediately with EAGAIN on every pre-submitted recvmsg SQE. The saved
+                    // original flags are restored by restoreFdFlags() during cleanup.
+                    const set_rc = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG);
+                    switch (posix.errno(set_rc)) {
+                        .SUCCESS => {},
+                        else => |err| return posix.unexpectedErrno(err),
+                    }
+                }
+            },
+            else => |err| return posix.unexpectedErrno(err),
+        }
+
+        for (0..RECV_DEPTH) |i| {
+            self.recv_slots[i].setup();
+            self.registered_iovecs[i] = .{
+                .base = &self.recv_slots[i].buf,
+                .len = self.recv_slots[i].buf.len,
+            };
+        }
+        for (0..SEND_DEPTH) |i| {
+            self.send_slots[i].setup();
+            self.registered_iovecs[RECV_DEPTH + i] = .{
+                .base = &self.send_slots[i].buf,
+                .len = self.send_slots[i].buf.len,
+            };
+        }
+
+        if (self.ring.register_buffers(&self.registered_iovecs)) |_| {
+            self.registered_buffers = true;
+        } else |_| {
+            // Registration can fail in containers with low memlock limits; the ring still works,
+            // just without fixed/pinned buffers.
+        }
+
+        for (0..RECV_DEPTH) |i| {
+            try self.submitRecv(@intCast(i), fd);
+        }
+        _ = try self.ring.submit();
+    }
+
+    /// Reset receive `slot` and queue (but do not submit) a recvmsg SQE for it.
+    fn submitRecv(self: *UdpRing, slot: u16, fd: posix.fd_t) !void {
+        self.recv_slots[slot].resetForRecv();
+        _ = try self.ring.recvmsg(UDP_RECV_USER_DATA | slot, fd, &self.recv_slots[slot].msg, 0);
+    }
+
+    /// Re-arm receive `slot` on `fd` and submit it immediately. The slot's buffer
+    /// may be overwritten afterwards, so finish with its previous data first.
+    pub fn resubmitRecv(self: *UdpRing, slot: u16, fd: posix.fd_t) !void {
+        try self.submitRecv(slot, fd);
+        _ = try self.ring.submit();
+    }
+
+    /// Forward to IoUring.copy_cqes with the given `wait_nr`; the result is the
+    /// number of entries filled in `cqes`.
+    pub fn copyCompletions(
+        self: *UdpRing,
+        cqes: []linux.io_uring_cqe,
+        wait_nr: u32,
+    ) !u32 {
+        return try self.ring.copy_cqes(cqes, wait_nr);
+    }
+
+    /// Decode the receive slot from CQE user_data; null if the recv tag is absent or the index is out of range.
+    pub fn recvSlotFromUserData(user_data: u64) ?u16 {
+        if ((user_data & UDP_RECV_USER_DATA) == 0) return null;
+        const slot: u16 = @intCast(user_data & UDP_SLOT_MASK);
+        if (slot >= RECV_DEPTH) return null;
+        return slot;
+    }
+
+    /// Build a RecvCompletion for a receive CQE. Returns null for non-receive CQEs
+    /// and for receives with `res <= 0`; the slot is not re-armed here.
+    pub fn recvCompletion(self: *UdpRing, cqe: linux.io_uring_cqe) ?RecvCompletion {
+        const slot = recvSlotFromUserData(cqe.user_data) orelse return null;
+        if (cqe.res <= 0) return null;
+        const len: usize = @intCast(cqe.res);
+        const recv_slot = &self.recv_slots[slot];
+        return .{
+            .data = recv_slot.buf[0..len],
+            .sender_addr = @bitCast(recv_slot.addr.addr),
+            .sender_port = std.mem.bigToNative(u16, recv_slot.addr.port),
+            .segment_size = recv_slot.segmentSize(),
+            .slot = slot,
+        };
+    }
+
+    /// Release the send slot named by a send CQE; CQEs without the send tag are ignored.
+    pub fn noteSendCompletion(self: *UdpRing, cqe: linux.io_uring_cqe) void {
+        if ((cqe.user_data & UDP_SEND_USER_DATA) == 0) return;
+        const slot: u16 = @intCast(cqe.user_data & UDP_SLOT_MASK);
+        if (slot < SEND_DEPTH) self.send_slots[slot].busy = false;
+    }
+
+    /// Queue a UDP datagram through IORING_OP_SENDMSG. The payload is copied
+    /// into a registered ring-owned slot so the caller's buffer may go out of
+    /// scope immediately after this function returns.
+    /// Returns error.ExceedsSendBufferSize when `data.len` is greater than SEND_BUF_SIZE.
+    /// Returns true once the datagram is queued, or false when all SEND_DEPTH
+    /// slots are still in flight.
+    pub fn sendTo(self: *UdpRing, fd: posix.fd_t, data: []const u8, dest_addr: [4]u8, dest_port: u16) !bool {
+        if (data.len > SEND_BUF_SIZE) return error.ExceedsSendBufferSize;
+
+        for (0..SEND_DEPTH) |i| {
+            if (self.send_slots[i].busy) continue;
+            const slot = &self.send_slots[i];
+            @memcpy(slot.buf[0..data.len], data);
+            slot.addr = .{
+                .family = linux.AF.INET,
+                .port = std.mem.nativeToBig(u16, dest_port),
+                .addr = @bitCast(dest_addr),
+                .zero = .{0} ** 8,
+            };
+            slot.iov = .{ .base = &slot.buf, .len = data.len };
+            slot.msg.namelen = @sizeOf(posix.sockaddr.in);
+            _ = try self.ring.sendmsg(UDP_SEND_USER_DATA | @as(u64, @intCast(i)), fd, &slot.msg, 0);
+            // Claim the slot only once an SQE references it: if sendmsg() failed, no SQE
+            // was queued and no completion would ever arrive to release the slot again.
+            slot.busy = true;
+            _ = try self.ring.submit();
+            return true;
+        }
+
+        return false;
+    }
+
+    /// Unregister buffers (if registered), restore the fd's status flags, and tear down the ring.
+    pub fn deinit(self: *UdpRing) void {
+        if (self.registered_buffers) {
+            self.ring.unregister_buffers() catch {};
+        }
+        self.restoreFdFlags();
+        self.ring.deinit();
+    }
+
+    /// Put back the status flags saved by `init`, at most once; the fcntl result is ignored.
+    fn restoreFdFlags(self: *UdpRing) void {
+        if (self.original_status_flags) |flags| {
+            _ = posix.system.fcntl(self.fd, posix.F.SETFL, flags);
+            self.original_status_flags = null;
+        }
+    }
+};
+
 /// Check if io_uring is available AND compatible with TUN devices.
 /// Currently disabled: TUN character devices (tun_chr_read_iter) do not
 /// reliably support io_uring IORING_OP_READ — reads fail silently or
@@ -90,3 +423,21 @@ pub fn isAvailable() bool {
     // on bare metal (outside LXC containers).
     return false;
 }
+
+test "UdpRing parses UDP_GRO cmsg segment size" {
+    var slot = UdpRing.RecvSlot{};
+    slot.setup();
+
+    const cmsg_len = @sizeOf(Cmsghdr) + @sizeOf(u16);
+    const cmsg_space = (cmsg_len + @alignOf(Cmsghdr) - 1) & ~(@as(usize, @alignOf(Cmsghdr)) - 1);
+    const hdr: *Cmsghdr = @ptrCast(@alignCast(&slot.cmsg_buf));
+    hdr.* = .{
+        .len = cmsg_len,
+        .level = SOL_UDP,
+        .type = UDP_GRO,
+    };
+    std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, NATIVE_ENDIAN);
+    slot.msg.controllen = cmsg_space;
+
+    try std.testing.expectEqual(@as(u16, 1440), slot.segmentSize());
+}