From cbe077a2736888b37f257539dbe90b829d396403 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:17:46 +0000 Subject: [PATCH 1/8] Initial plan From eb6b8a67ee5c9afc9c8878f361a6340e6c111ade Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:27:38 +0000 Subject: [PATCH 2/8] Add io_uring UDP ring path Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/main.zig | 115 ++++++++++++++--- src/net/io_uring.zig | 286 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 385 insertions(+), 16 deletions(-) diff --git a/src/main.zig b/src/main.zig index c82074e..9fd0884 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3209,38 +3209,52 @@ fn userspaceEventLoop( posix.setsockopt(udp_sock.fd, posix.SOL.SOCKET, SO_BUSY_POLL, std.mem.asBytes(&busy_poll_us)) catch {}; var gro_rx = BatchUdp.GROReceiver{}; + var udp_ring: lib.net.IoUring.UdpRing = undefined; + const use_udp_ring = blk: { + udp_ring.init(udp_sock.fd) catch break :blk false; + break :blk true; + }; + defer if (use_udp_ring) udp_ring.deinit(); + if (use_udp_ring) { + const sqpoll_msg = if (udp_ring.sqpoll) "SQPOLL" else "submit"; + writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, registered buffers)\n", .{sqpoll_msg}) catch {}; + } else { + writeFormatted(stdout, " io_uring UDP: unavailable, using poll+recvmsg path\n", .{}) catch {}; + } + const MAX_DECRYPTED = 64; var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined; var decrypt_lens: [MAX_DECRYPTED]usize = undefined; var decrypt_slots: [MAX_DECRYPTED]usize = undefined; while (swim.running.load(.acquire)) { - var fds = [_]posix.pollfd{ - .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, - }; - _ = posix.poll(&fds, 50) catch continue; - - if (fds[0].revents & posix.POLL.IN != 0) { + if (use_udp_ring) { + var cqes: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; + const n_cqes = udp_ring.copyCompletions(&cqes, 0) catch 0; var n_decrypted: usize = 0; - // Drain loop: process all pending GRO batches before returning to poll() - while (true) { - const total_bytes = gro_rx.recvGRO(udp_sock.fd); - if (total_bytes == 0) break; // EAGAIN — socket drained + for (cqes[0..n_cqes]) |cqe| { + udp_ring.noteSendCompletion(cqe); + const recv = udp_ring.recvCompletion(cqe) orelse { + if (lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| { + udp_ring.resubmitRecv(slot, udp_sock.fd) catch {}; + } + continue; + }; - const sender = gro_rx.getSender(); - const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; + const total_bytes = recv.data.len; + const seg_size: usize = if (recv.segment_size > 0) recv.segment_size else total_bytes; var offset: usize = 0; while (offset < total_bytes) { const remaining = total_bytes - offset; const pkt_len = @min(seg_size, remaining); - const pkt = gro_rx.buf[offset..][0..pkt_len]; + const pkt = recv.data[offset..][0..pkt_len]; processIncomingPacket( pkt, - sender.addr, - sender.port, + recv.sender_addr, + recv.sender_port, wg_dev, swim, udp_sock, @@ -3255,6 +3269,8 @@ fn userspaceEventLoop( offset += pkt_len; } + udp_ring.resubmitRecv(recv.slot, udp_sock.fd) catch {}; + // Flush decrypted packets to TUN if buffer is filling up if (n_decrypted >= MAX_DECRYPTED - 44) { if (n_decrypted > 0) { @@ -3270,6 +3286,8 @@ fn userspaceEventLoop( } } + if (n_cqes == 0) sleepNs(std.time.ns_per_ms); + // Final flush of remaining decrypted packets if (n_decrypted > 0) { if (tun_dev.vnet_hdr) { @@ -3280,6 +3298,73 @@ fn userspaceEventLoop( } } } + } else { + var fds = [_]posix.pollfd{ + .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, + }; + _ = posix.poll(&fds, 50) catch continue; + + if (fds[0].revents & posix.POLL.IN != 0) { + var n_decrypted: usize = 0; + + // Drain loop: process all pending GRO batches before returning to poll() + while (true) { + const total_bytes = gro_rx.recvGRO(udp_sock.fd); + if (total_bytes == 0) break; // EAGAIN — socket drained + + const sender = gro_rx.getSender(); + const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; + + var offset: usize = 0; + while (offset < total_bytes) { + const remaining = total_bytes - offset; + const pkt_len = @min(seg_size, remaining); + const pkt = gro_rx.buf[offset..][0..pkt_len]; + + processIncomingPacket( + pkt, + sender.addr, + sender.port, + wg_dev, + swim, + udp_sock, + stdout, + &decrypt_storage, + &decrypt_lens, + &decrypt_slots, + &n_decrypted, + service_filter, + ); + + offset += pkt_len; + } + + // Flush decrypted packets to TUN if buffer is filling up + if (n_decrypted >= MAX_DECRYPTED - 44) { + if (n_decrypted > 0) { + if (tun_dev.vnet_hdr) { + writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); + } else { + for (0..n_decrypted) |d| { + writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); + } + } + n_decrypted = 0; + } + } + } + + // Final flush of remaining decrypted packets + if (n_decrypted > 0) { + if (tun_dev.vnet_hdr) { + writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); + } else { + for (0..n_decrypted) |d| { + writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); + } + } + } + } } swim.tickTimersOnly(); diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index a7090de..7e1af59 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -4,6 +4,29 @@ const linux = std.os.linux; const IoUring = linux.IoUring; const Offload = @import("offload.zig"); +const UDP_RECV_USER_DATA: u64 = 0x1000_0000_0000_0000; +const UDP_SEND_USER_DATA: u64 = 0x2000_0000_0000_0000; +const UDP_SLOT_MASK: u64 = 0x0000_0000_0000_ffff; + +const UDP_GRO = 104; +const SOL_UDP = 17; +const O_NONBLOCK_FLAG: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK"); + +const Cmsghdr = extern struct { + len: usize, + level: i32, + type: i32, +}; + +fn initRing(entries: u16, use_sqpoll: bool) !IoUring { + if (use_sqpoll) { + return IoUring.init(entries, linux.IORING_SETUP_SQPOLL) catch { + return try IoUring.init(entries, 0); + }; + } + return try IoUring.init(entries, 0); +} + /// TUN reader backed by io_uring. /// Pre-submits N read SQEs and resubmits on each completion. /// The caller processes completed reads identically to the poll()+read() path. @@ -25,7 +48,7 @@ pub const TunRingReader = struct { /// Initialize the io_uring and pre-submit read SQEs for the TUN fd. pub fn init(tun_fd: posix.fd_t) !TunRingReader { var self = TunRingReader{ - .ring = try IoUring.init(RING_DEPTH, 0), + .ring = try initRing(RING_DEPTH, true), .slots = undefined, }; @@ -80,6 +103,249 @@ pub const TunRingReader = struct { } }; +/// UDP receive/send event loop backed by io_uring. +/// Keeps receive SQEs in flight and uses registered fixed buffers plus SQPOLL +/// when the kernel/container permits it. UDP payload buffers are stable for the +/// lifetime of the ring, so callers can process completions without a copy. +pub const UdpRing = struct { + pub const RECV_DEPTH: u16 = 32; + pub const SEND_DEPTH: u16 = 32; + pub const RING_DEPTH: u16 = 128; + pub const RECV_BUF_SIZE: usize = 65536; + pub const SEND_BUF_SIZE: usize = 2048; + + pub const RecvCompletion = struct { + data: []const u8, + sender_addr: [4]u8, + sender_port: u16, + segment_size: u16, + slot: u16, + }; + + const RecvSlot = struct { + buf: [RECV_BUF_SIZE]u8 = undefined, + cmsg_buf: [128]u8 align(@alignOf(Cmsghdr)) = undefined, + addr: posix.sockaddr.in = undefined, + iov: posix.iovec = undefined, + msg: linux.msghdr = undefined, + + fn setup(self: *RecvSlot) void { + self.addr = std.mem.zeroes(posix.sockaddr.in); + self.iov = .{ .base = &self.buf, .len = self.buf.len }; + @memset(&self.cmsg_buf, 0); + self.msg = .{ + .msg_name = @ptrCast(&self.addr), + .msg_namelen = @sizeOf(posix.sockaddr.in), + .msg_iov = @ptrCast(&self.iov), + .msg_iovlen = 1, + .msg_control = @ptrCast(&self.cmsg_buf), + .msg_controllen = self.cmsg_buf.len, + .msg_flags = 0, + }; + } + + fn resetForRecv(self: *RecvSlot) void { + self.addr = std.mem.zeroes(posix.sockaddr.in); + self.iov.len = self.buf.len; + @memset(&self.cmsg_buf, 0); + self.msg.msg_namelen = @sizeOf(posix.sockaddr.in); + self.msg.msg_controllen = self.cmsg_buf.len; + self.msg.msg_flags = 0; + } + + fn segmentSize(self: *const RecvSlot) u16 { + if (self.msg.msg_controllen == 0) return 0; + var offset: usize = 0; + while (offset + @sizeOf(Cmsghdr) <= self.msg.msg_controllen) { + const cmsg: *const Cmsghdr = @ptrCast(@alignCast(&self.cmsg_buf[offset])); + if (cmsg.len < @sizeOf(Cmsghdr)) break; + if (cmsg.level == SOL_UDP and cmsg.type == UDP_GRO) { + const data_offset = offset + @sizeOf(Cmsghdr); + if (data_offset + 2 <= self.msg.msg_controllen) { + return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], .little); + } + break; + } + const aligned_len = (cmsg.len + @alignOf(Cmsghdr) - 1) & ~(@as(usize, @alignOf(Cmsghdr)) - 1); + offset += aligned_len; + } + return 0; + } + }; + + const SendSlot = struct { + buf: [SEND_BUF_SIZE]u8 = undefined, + addr: posix.sockaddr.in = undefined, + iov: posix.iovec_const = undefined, + msg: linux.msghdr_const = undefined, + busy: bool = false, + + fn setup(self: *SendSlot) void { + self.addr = std.mem.zeroes(posix.sockaddr.in); + self.iov = .{ .base = &self.buf, .len = 0 }; + self.msg = .{ + .msg_name = @ptrCast(&self.addr), + .msg_namelen = @sizeOf(posix.sockaddr.in), + .msg_iov = @ptrCast(&self.iov), + .msg_iovlen = 1, + .msg_control = null, + .msg_controllen = 0, + .msg_flags = 0, + }; + self.busy = false; + } + }; + + ring: IoUring, + recv_slots: [RECV_DEPTH]RecvSlot, + send_slots: [SEND_DEPTH]SendSlot, + registered_iovecs: [RECV_DEPTH + SEND_DEPTH]posix.iovec, + fd: posix.fd_t, + original_status_flags: ?usize, + registered_buffers: bool, + sqpoll: bool, + + pub fn init(self: *UdpRing, fd: posix.fd_t) !void { + self.* = UdpRing{ + .ring = try initRing(RING_DEPTH, true), + .recv_slots = undefined, + .send_slots = undefined, + .registered_iovecs = undefined, + .fd = fd, + .original_status_flags = null, + .registered_buffers = false, + .sqpoll = false, + }; + self.sqpoll = (self.ring.flags & linux.IORING_SETUP_SQPOLL) != 0; + errdefer { + self.restoreFdFlags(); + self.ring.deinit(); + } + + const flags = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); + switch (posix.errno(flags)) { + .SUCCESS => { + self.original_status_flags = flags; + if ((flags & O_NONBLOCK_FLAG) != 0) { + _ = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG); + } + }, + else => {}, + } + + for (0..RECV_DEPTH) |i| { + self.recv_slots[i].setup(); + self.registered_iovecs[i] = .{ + .base = &self.recv_slots[i].buf, + .len = self.recv_slots[i].buf.len, + }; + } + for (0..SEND_DEPTH) |i| { + self.send_slots[i].setup(); + self.registered_iovecs[RECV_DEPTH + i] = .{ + .base = &self.send_slots[i].buf, + .len = self.send_slots[i].buf.len, + }; + } + + if (self.ring.register_buffers(&self.registered_iovecs)) |_| { + self.registered_buffers = true; + } else |_| {} + + for (0..RECV_DEPTH) |i| { + try self.submitRecv(@intCast(i), fd); + } + _ = try self.ring.submit(); + } + + fn submitRecv(self: *UdpRing, slot: u16, fd: posix.fd_t) !void { + self.recv_slots[slot].resetForRecv(); + _ = try self.ring.recvmsg(UDP_RECV_USER_DATA | slot, fd, &self.recv_slots[slot].msg, 0); + } + + pub fn resubmitRecv(self: *UdpRing, slot: u16, fd: posix.fd_t) !void { + try self.submitRecv(slot, fd); + _ = try self.ring.submit(); + } + + pub fn copyCompletions( + self: *UdpRing, + cqes: []linux.io_uring_cqe, + wait_nr: u32, + ) !u32 { + return try self.ring.copy_cqes(cqes, wait_nr); + } + + pub fn recvSlotFromUserData(user_data: u64) ?u16 { + if ((user_data & UDP_RECV_USER_DATA) == 0) return null; + const slot: u16 = @intCast(user_data & UDP_SLOT_MASK); + if (slot >= RECV_DEPTH) return null; + return slot; + } + + pub fn recvCompletion(self: *UdpRing, cqe: linux.io_uring_cqe) ?RecvCompletion { + const slot = recvSlotFromUserData(cqe.user_data) orelse return null; + if (cqe.res <= 0) return null; + const len: usize = @intCast(cqe.res); + const recv_slot = &self.recv_slots[slot]; + return .{ + .data = recv_slot.buf[0..len], + .sender_addr = @bitCast(recv_slot.addr.addr), + .sender_port = std.mem.bigToNative(u16, recv_slot.addr.port), + .segment_size = recv_slot.segmentSize(), + .slot = slot, + }; + } + + pub fn noteSendCompletion(self: *UdpRing, cqe: linux.io_uring_cqe) void { + if ((cqe.user_data & UDP_SEND_USER_DATA) == 0) return; + const slot: u16 = @intCast(cqe.user_data & UDP_SLOT_MASK); + if (slot < SEND_DEPTH) self.send_slots[slot].busy = false; + } + + /// Queue a UDP datagram through IORING_OP_SENDMSG. The payload is copied + /// into a registered ring-owned slot so the caller's buffer may go out of + /// scope immediately after this function returns. + pub fn sendTo(self: *UdpRing, fd: posix.fd_t, data: []const u8, dest_addr: [4]u8, dest_port: u16) !bool { + if (data.len > SEND_BUF_SIZE) return error.DatagramTooLarge; + + for (0..SEND_DEPTH) |i| { + if (self.send_slots[i].busy) continue; + var slot = &self.send_slots[i]; + @memcpy(slot.buf[0..data.len], data); + slot.addr = .{ + .family = linux.AF.INET, + .port = std.mem.nativeToBig(u16, dest_port), + .addr = @bitCast(dest_addr), + .zero = .{0} ** 8, + }; + slot.iov = .{ .base = &slot.buf, .len = data.len }; + slot.msg.msg_namelen = @sizeOf(posix.sockaddr.in); + slot.busy = true; + _ = try self.ring.sendmsg(UDP_SEND_USER_DATA | @as(u64, @intCast(i)), fd, &slot.msg, 0); + _ = try self.ring.submit(); + return true; + } + + return false; + } + + pub fn deinit(self: *UdpRing) void { + if (self.registered_buffers) { + self.ring.unregister_buffers() catch {}; + } + self.restoreFdFlags(); + self.ring.deinit(); + } + + fn restoreFdFlags(self: *UdpRing) void { + if (self.original_status_flags) |flags| { + _ = posix.system.fcntl(self.fd, posix.F.SETFL, flags); + self.original_status_flags = null; + } + } +}; + /// Check if io_uring is available AND compatible with TUN devices. /// Currently disabled: TUN character devices (tun_chr_read_iter) do not /// reliably support io_uring IORING_OP_READ — reads fail silently or @@ -90,3 +356,21 @@ pub fn isAvailable() bool { // on bare metal (outside LXC containers). return false; } + +test "UdpRing parses UDP_GRO cmsg segment size" { + var slot = UdpRing.RecvSlot{}; + slot.setup(); + + const cmsg_len = @sizeOf(Cmsghdr) + @sizeOf(u16); + const cmsg_space = (cmsg_len + @alignOf(Cmsghdr) - 1) & ~(@as(usize, @alignOf(Cmsghdr)) - 1); + const hdr: *Cmsghdr = @ptrCast(@alignCast(&slot.cmsg_buf)); + hdr.* = .{ + .len = cmsg_len, + .level = SOL_UDP, + .type = UDP_GRO, + }; + std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, .little); + slot.msg.msg_controllen = cmsg_space; + + try std.testing.expectEqual(@as(u16, 1440), slot.segmentSize()); +} From 6229a52e4997a58aeea8418ee3b0f00bdca01066 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:30:11 +0000 Subject: [PATCH 3/8] Address io_uring review feedback Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/main.zig | 8 ++++---- src/net/io_uring.zig | 13 +++++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/main.zig b/src/main.zig index 9fd0884..476479b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3226,14 +3226,14 @@ fn userspaceEventLoop( var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined; var decrypt_lens: [MAX_DECRYPTED]usize = undefined; var decrypt_slots: [MAX_DECRYPTED]usize = undefined; + var udp_ring_cqes: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; while (swim.running.load(.acquire)) { if (use_udp_ring) { - var cqes: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; - const n_cqes = udp_ring.copyCompletions(&cqes, 0) catch 0; + const n_cqes = udp_ring.copyCompletions(&udp_ring_cqes, 0) catch 0; var n_decrypted: usize = 0; - for (cqes[0..n_cqes]) |cqe| { + for (udp_ring_cqes[0..n_cqes]) |cqe| { udp_ring.noteSendCompletion(cqe); const recv = udp_ring.recvCompletion(cqe) orelse { if (lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| { @@ -3286,7 +3286,7 @@ fn userspaceEventLoop( } } - if (n_cqes == 0) sleepNs(std.time.ns_per_ms); + if (n_cqes == 0) sleepNs(100 * std.time.ns_per_us); // Final flush of remaining decrypted packets if (n_decrypted > 0) { diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index 7e1af59..b823556 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const posix = std.posix; const linux = std.os.linux; const IoUring = linux.IoUring; @@ -162,7 +163,7 @@ pub const UdpRing = struct { if (cmsg.level == SOL_UDP and cmsg.type == UDP_GRO) { const data_offset = offset + @sizeOf(Cmsghdr); if (data_offset + 2 <= self.msg.msg_controllen) { - return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], .little); + return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], builtin.cpu.arch.endian()); } break; } @@ -227,7 +228,11 @@ pub const UdpRing = struct { .SUCCESS => { self.original_status_flags = flags; if ((flags & O_NONBLOCK_FLAG) != 0) { - _ = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG); + const set_rc = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG); + switch (posix.errno(set_rc)) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } } }, else => {}, @@ -307,7 +312,7 @@ pub const UdpRing = struct { /// into a registered ring-owned slot so the caller's buffer may go out of /// scope immediately after this function returns. pub fn sendTo(self: *UdpRing, fd: posix.fd_t, data: []const u8, dest_addr: [4]u8, dest_port: u16) !bool { - if (data.len > SEND_BUF_SIZE) return error.DatagramTooLarge; + if (data.len > SEND_BUF_SIZE) return error.TooBig; for (0..SEND_DEPTH) |i| { if (self.send_slots[i].busy) continue; @@ -369,7 +374,7 @@ test "UdpRing parses UDP_GRO cmsg segment size" { .level = SOL_UDP, .type = UDP_GRO, }; - std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, .little); + std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, builtin.cpu.arch.endian()); slot.msg.msg_controllen = cmsg_space; try std.testing.expectEqual(@as(u16, 1440), slot.segmentSize()); From e204c64fb73ef811dd7a592c78b7c716426b7bc2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:32:00 +0000 Subject: [PATCH 4/8] Clarify io_uring runtime behavior Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/main.zig | 5 ++++- src/net/io_uring.zig | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main.zig b/src/main.zig index 476479b..67cf19c 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3217,7 +3217,8 @@ fn userspaceEventLoop( defer if (use_udp_ring) udp_ring.deinit(); if (use_udp_ring) { const sqpoll_msg = if (udp_ring.sqpoll) "SQPOLL" else "submit"; - writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, registered buffers)\n", .{sqpoll_msg}) catch {}; + const buffer_msg = if (udp_ring.registered_buffers) "registered buffers" else "unregistered buffers"; + writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {}; } else { writeFormatted(stdout, " io_uring UDP: unavailable, using poll+recvmsg path\n", .{}) catch {}; } @@ -3286,6 +3287,8 @@ fn userspaceEventLoop( } } + // Short backoff avoids spinning while keeping control-plane latency below the + // 50ms poll fallback timeout when the SQPOLL thread has no completions ready. if (n_cqes == 0) sleepNs(100 * std.time.ns_per_us); // Final flush of remaining decrypted packets diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index b823556..3861a97 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -11,6 +11,7 @@ const UDP_SLOT_MASK: u64 = 0x0000_0000_0000_ffff; const UDP_GRO = 104; const SOL_UDP = 17; +// posix.O is a packed flag struct on this Zig version; compute the raw bit for fcntl(F_SETFL). const O_NONBLOCK_FLAG: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK"); const Cmsghdr = extern struct { @@ -111,6 +112,7 @@ pub const TunRingReader = struct { pub const UdpRing = struct { pub const RECV_DEPTH: u16 = 32; pub const SEND_DEPTH: u16 = 32; + // Keep spare SQEs beyond recv+send slots for completions/resubmits during bursts. pub const RING_DEPTH: u16 = 128; pub const RECV_BUF_SIZE: usize = 65536; pub const SEND_BUF_SIZE: usize = 2048; @@ -228,6 +230,8 @@ pub const UdpRing = struct { .SUCCESS => { self.original_status_flags = flags; if ((flags & O_NONBLOCK_FLAG) != 0) { + // Blocking fds let io_uring arm async socket receive instead of completing + // immediately with EAGAIN on every pre-submitted recvmsg SQE. const set_rc = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG); switch (posix.errno(set_rc)) { .SUCCESS => {}, @@ -255,7 +259,10 @@ pub const UdpRing = struct { if (self.ring.register_buffers(&self.registered_iovecs)) |_| { self.registered_buffers = true; - } else |_| {} + } else |_| { + // Registration can fail in containers with low memlock limits; the ring still works, + // just without fixed/pinned buffers. + } for (0..RECV_DEPTH) |i| { try self.submitRecv(@intCast(i), fd); From 8b45c7b5ccb64d5e3ada63142b674fb0ec9abd53 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:33:55 +0000 Subject: [PATCH 5/8] Polish io_uring UDP review items Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/main.zig | 3 ++- src/net/io_uring.zig | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main.zig b/src/main.zig index 67cf19c..cbb3138 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3228,6 +3228,7 @@ fn userspaceEventLoop( var decrypt_lens: [MAX_DECRYPTED]usize = undefined; var decrypt_slots: [MAX_DECRYPTED]usize = undefined; var udp_ring_cqes: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; + const UDP_RING_IDLE_BACKOFF_NS = 100 * std.time.ns_per_us; while (swim.running.load(.acquire)) { if (use_udp_ring) { @@ -3289,7 +3290,7 @@ fn userspaceEventLoop( // Short backoff avoids spinning while keeping control-plane latency below the // 50ms poll fallback timeout when the SQPOLL thread has no completions ready. - if (n_cqes == 0) sleepNs(100 * std.time.ns_per_us); + if (n_cqes == 0) sleepNs(UDP_RING_IDLE_BACKOFF_NS); // Final flush of remaining decrypted packets if (n_decrypted > 0) { diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index 3861a97..a39782a 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -21,12 +21,11 @@ const Cmsghdr = extern struct { }; fn initRing(entries: u16, use_sqpoll: bool) !IoUring { - if (use_sqpoll) { - return IoUring.init(entries, linux.IORING_SETUP_SQPOLL) catch { - return try IoUring.init(entries, 0); - }; - } - return try IoUring.init(entries, 0); + const flags: u32 = if (use_sqpoll) linux.IORING_SETUP_SQPOLL else 0; + return IoUring.init(entries, flags) catch |err| { + if (!use_sqpoll) return err; + return try IoUring.init(entries, 0); + }; } /// TUN reader backed by io_uring. @@ -219,6 +218,8 @@ pub const UdpRing = struct { .registered_buffers = false, .sqpoll = false, }; + // initRing may fall back when SQPOLL is denied by permissions/seccomp; + // the kernel-returned flags are the source of truth for runtime logging. self.sqpoll = (self.ring.flags & linux.IORING_SETUP_SQPOLL) != 0; errdefer { self.restoreFdFlags(); @@ -319,7 +320,7 @@ pub const UdpRing = struct { /// into a registered ring-owned slot so the caller's buffer may go out of /// scope immediately after this function returns. pub fn sendTo(self: *UdpRing, fd: posix.fd_t, data: []const u8, dest_addr: [4]u8, dest_port: u16) !bool { - if (data.len > SEND_BUF_SIZE) return error.TooBig; + if (data.len > SEND_BUF_SIZE) return error.ExceedsSendBufferSize; for (0..SEND_DEPTH) |i| { if (self.send_slots[i].busy) continue; From 263e5f043b11ae6b991923140ecc992a26ce0bc8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:35:32 +0000 Subject: [PATCH 6/8] Document io_uring completion handling Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/main.zig | 7 ++++--- src/net/io_uring.zig | 4 +++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main.zig b/src/main.zig index cbb3138..413bed9 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3227,15 +3227,15 @@ fn userspaceEventLoop( var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined; var decrypt_lens: [MAX_DECRYPTED]usize = undefined; var decrypt_slots: [MAX_DECRYPTED]usize = undefined; - var udp_ring_cqes: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; + var cqe_buffer: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; const UDP_RING_IDLE_BACKOFF_NS = 100 * std.time.ns_per_us; while (swim.running.load(.acquire)) { if (use_udp_ring) { - const n_cqes = udp_ring.copyCompletions(&udp_ring_cqes, 0) catch 0; + const n_cqes = udp_ring.copyCompletions(&cqe_buffer, 0) catch 0; var n_decrypted: usize = 0; - for (udp_ring_cqes[0..n_cqes]) |cqe| { + for (cqe_buffer[0..n_cqes]) |cqe| { udp_ring.noteSendCompletion(cqe); const recv = udp_ring.recvCompletion(cqe) orelse { if (lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| { @@ -3290,6 +3290,7 @@ fn userspaceEventLoop( // Short backoff avoids spinning while keeping control-plane latency below the // 50ms poll fallback timeout when the SQPOLL thread has no completions ready. + // The fallback branch blocks in poll(), so it does not need this extra backoff. if (n_cqes == 0) sleepNs(UDP_RING_IDLE_BACKOFF_NS); // Final flush of remaining decrypted packets diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index a39782a..145034b 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -107,7 +107,8 @@ pub const TunRingReader = struct { /// UDP receive/send event loop backed by io_uring. /// Keeps receive SQEs in flight and uses registered fixed buffers plus SQPOLL /// when the kernel/container permits it. UDP payload buffers are stable for the -/// lifetime of the ring, so callers can process completions without a copy. +/// lifetime of the ring (until deinit), so callers can process completions +/// without a copy. pub const UdpRing = struct { pub const RECV_DEPTH: u16 = 32; pub const SEND_DEPTH: u16 = 32; @@ -319,6 +320,7 @@ pub const UdpRing = struct { /// Queue a UDP datagram through IORING_OP_SENDMSG. The payload is copied /// into a registered ring-owned slot so the caller's buffer may go out of /// scope immediately after this function returns. + /// Returns error.ExceedsSendBufferSize when `data.len` is greater than SEND_BUF_SIZE. pub fn sendTo(self: *UdpRing, fd: posix.fd_t, data: []const u8, dest_addr: [4]u8, dest_port: u16) !bool { if (data.len > SEND_BUF_SIZE) return error.ExceedsSendBufferSize; From 969d5c07615c8bcef514b8c01c70dc968a3ba352 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 08:37:39 +0000 Subject: [PATCH 7/8] Handle UDP ring fd flag errors Agent-Logs-Url: https://github.com/igorls/meshguard/sessions/71f00ba9-16d4-42e0-82d3-49200c7d7dcc Co-authored-by: igorls <4753812+igorls@users.noreply.github.com> --- src/net/io_uring.zig | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index 145034b..f122a11 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -11,6 +11,8 @@ const UDP_SLOT_MASK: u64 = 0x0000_0000_0000_ffff; const UDP_GRO = 104; const SOL_UDP = 17; +// UDP_GRO cmsg payloads are emitted by the Linux kernel in native byte order. +const NATIVE_ENDIAN = builtin.cpu.arch.endian(); // posix.O is a packed flag struct on this Zig version; compute the raw bit for fcntl(F_SETFL). const O_NONBLOCK_FLAG: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK"); @@ -165,7 +167,7 @@ pub const UdpRing = struct { if (cmsg.level == SOL_UDP and cmsg.type == UDP_GRO) { const data_offset = offset + @sizeOf(Cmsghdr); if (data_offset + 2 <= self.msg.msg_controllen) { - return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], builtin.cpu.arch.endian()); + return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], NATIVE_ENDIAN); } break; } @@ -233,7 +235,8 @@ pub const UdpRing = struct { self.original_status_flags = flags; if ((flags & O_NONBLOCK_FLAG) != 0) { // Blocking fds let io_uring arm async socket receive instead of completing - // immediately with EAGAIN on every pre-submitted recvmsg SQE. + // immediately with EAGAIN on every pre-submitted recvmsg SQE. The saved + // original flags are restored by restoreFdFlags() during cleanup. const set_rc = posix.system.fcntl(fd, posix.F.SETFL, flags & ~O_NONBLOCK_FLAG); switch (posix.errno(set_rc)) { .SUCCESS => {}, @@ -241,7 +244,7 @@ pub const UdpRing = struct { } } }, - else => {}, + else => |err| return posix.unexpectedErrno(err), } for (0..RECV_DEPTH) |i| { @@ -384,7 +387,7 @@ test "UdpRing parses UDP_GRO cmsg segment size" { .level = SOL_UDP, .type = UDP_GRO, }; - std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, builtin.cpu.arch.endian()); + std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, NATIVE_ENDIAN); slot.msg.msg_controllen = cmsg_space; try std.testing.expectEqual(@as(u16, 1440), slot.segmentSize()); From 6779ef6b3be5e6a65b35edca73d988da474d7f64 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Tue, 28 Apr 2026 06:59:12 -0300 Subject: [PATCH 8/8] =?UTF-8?q?fix:=20io=5Furing=20build=20errors=20?= =?UTF-8?q?=E2=80=94=20msghdr=20field=20names=20and=20fcntl=20return=20typ?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename msghdr/msghdr_const fields to match Zig 0.16.0 std (msg_name→name, msg_namelen→namelen, etc.) - Cast fcntl return value via @intCast to handle c_int→usize mismatch when libc is linked --- src/net/io_uring.zig | 49 ++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/net/io_uring.zig b/src/net/io_uring.zig index f122a11..14fbcba 100644 --- a/src/net/io_uring.zig +++ b/src/net/io_uring.zig @@ -139,13 +139,13 @@ pub const UdpRing = struct { self.iov = .{ .base = &self.buf, .len = self.buf.len }; @memset(&self.cmsg_buf, 0); self.msg = .{ - .msg_name = @ptrCast(&self.addr), - .msg_namelen = @sizeOf(posix.sockaddr.in), - .msg_iov = @ptrCast(&self.iov), - .msg_iovlen = 1, - .msg_control = @ptrCast(&self.cmsg_buf), - .msg_controllen = self.cmsg_buf.len, - .msg_flags = 0, + .name = @ptrCast(&self.addr), + .namelen = @sizeOf(posix.sockaddr.in), + .iov = @ptrCast(&self.iov), + .iovlen = 1, + .control = @ptrCast(&self.cmsg_buf), + .controllen = self.cmsg_buf.len, + .flags = 0, }; } @@ -153,20 +153,20 @@ pub const UdpRing = struct { self.addr = std.mem.zeroes(posix.sockaddr.in); self.iov.len = self.buf.len; @memset(&self.cmsg_buf, 0); - self.msg.msg_namelen = @sizeOf(posix.sockaddr.in); - self.msg.msg_controllen = self.cmsg_buf.len; - self.msg.msg_flags = 0; + self.msg.namelen = @sizeOf(posix.sockaddr.in); + self.msg.controllen = self.cmsg_buf.len; + self.msg.flags = 0; } fn segmentSize(self: *const RecvSlot) u16 { - if (self.msg.msg_controllen == 0) return 0; + if (self.msg.controllen == 0) return 0; var offset: usize = 0; - while (offset + @sizeOf(Cmsghdr) <= self.msg.msg_controllen) { + while (offset + @sizeOf(Cmsghdr) <= self.msg.controllen) { const cmsg: *const Cmsghdr = @ptrCast(@alignCast(&self.cmsg_buf[offset])); if (cmsg.len < @sizeOf(Cmsghdr)) break; if (cmsg.level == SOL_UDP and cmsg.type == UDP_GRO) { const data_offset = offset + @sizeOf(Cmsghdr); - if (data_offset + 2 <= self.msg.msg_controllen) { + if (data_offset + 2 <= self.msg.controllen) { return std.mem.readInt(u16, self.cmsg_buf[data_offset..][0..2], NATIVE_ENDIAN); } break; @@ -189,13 +189,13 @@ pub const UdpRing = struct { self.addr = std.mem.zeroes(posix.sockaddr.in); self.iov = .{ .base = &self.buf, .len = 0 }; self.msg = .{ - .msg_name = @ptrCast(&self.addr), - .msg_namelen = @sizeOf(posix.sockaddr.in), - .msg_iov = @ptrCast(&self.iov), - .msg_iovlen = 1, - .msg_control = null, - .msg_controllen = 0, - .msg_flags = 0, + .name = @ptrCast(&self.addr), + .namelen = @sizeOf(posix.sockaddr.in), + .iov = @ptrCast(&self.iov), + .iovlen = 1, + .control = null, + .controllen = 0, + .flags = 0, }; self.busy = false; } @@ -229,9 +229,10 @@ pub const UdpRing = struct { self.ring.deinit(); } - const flags = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); - switch (posix.errno(flags)) { + const raw_flags = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); + switch (posix.errno(raw_flags)) { .SUCCESS => { + const flags: usize = @intCast(raw_flags); self.original_status_flags = flags; if ((flags & O_NONBLOCK_FLAG) != 0) { // Blocking fds let io_uring arm async socket receive instead of completing @@ -338,7 +339,7 @@ pub const UdpRing = struct { .zero = .{0} ** 8, }; slot.iov = .{ .base = &slot.buf, .len = data.len }; - slot.msg.msg_namelen = @sizeOf(posix.sockaddr.in); + slot.msg.namelen = @sizeOf(posix.sockaddr.in); slot.busy = true; _ = try self.ring.sendmsg(UDP_SEND_USER_DATA | @as(u64, @intCast(i)), fd, &slot.msg, 0); _ = try self.ring.submit(); @@ -388,7 +389,7 @@ test "UdpRing parses UDP_GRO cmsg segment size" { .type = UDP_GRO, }; std.mem.writeInt(u16, slot.cmsg_buf[@sizeOf(Cmsghdr)..][0..2], 1440, NATIVE_ENDIAN); - slot.msg.msg_controllen = cmsg_space; + slot.msg.controllen = cmsg_space; try std.testing.expectEqual(@as(u16, 1440), slot.segmentSize()); }