Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 105 additions & 15 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3209,38 +3209,54 @@ fn userspaceEventLoop(
posix.setsockopt(udp_sock.fd, posix.SOL.SOCKET, SO_BUSY_POLL, std.mem.asBytes(&busy_poll_us)) catch {};

var gro_rx = BatchUdp.GROReceiver{};
var udp_ring: lib.net.IoUring.UdpRing = undefined;
const use_udp_ring = blk: {
udp_ring.init(udp_sock.fd) catch break :blk false;
break :blk true;
};
defer if (use_udp_ring) udp_ring.deinit();
if (use_udp_ring) {
const sqpoll_msg = if (udp_ring.sqpoll) "SQPOLL" else "submit";
const buffer_msg = if (udp_ring.registered_buffers) "registered buffers" else "unregistered buffers";
writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {};
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startup log says "recvmsg/sendmsg ring active", but the control-plane still sends via udp_sock.sendTo(...) (regular sendto) rather than UdpRing.sendTo, so "sendmsg" isn't actually active here. Consider adjusting the message (e.g., "recvmsg ring active") or switching control-plane sends to the ring if intended.

Suggested change
writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {};
writeFormatted(stdout, " io_uring UDP: recvmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {};

Copilot uses AI. Check for mistakes.
} else {
writeFormatted(stdout, " io_uring UDP: unavailable, using poll+recvmsg path\n", .{}) catch {};
}

const MAX_DECRYPTED = 64;
var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined;
var decrypt_lens: [MAX_DECRYPTED]usize = undefined;
var decrypt_slots: [MAX_DECRYPTED]usize = undefined;
var cqe_buffer: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined;
const UDP_RING_IDLE_BACKOFF_NS = 100 * std.time.ns_per_us;

while (swim.running.load(.acquire)) {
var fds = [_]posix.pollfd{
.{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 },
};
_ = posix.poll(&fds, 50) catch continue;

if (fds[0].revents & posix.POLL.IN != 0) {
if (use_udp_ring) {
const n_cqes = udp_ring.copyCompletions(&cqe_buffer, 0) catch 0;
var n_decrypted: usize = 0;
Comment on lines +3234 to 3236
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udp_ring.copyCompletions(... ) catch 0 silently suppresses io_uring errors and treats them as "no CQEs", which can leave the loop spinning/backing off with no visibility and no fallback. Consider handling the error explicitly (log once and disable use_udp_ring / fall back to poll()), so failures don't get masked.

Copilot uses AI. Check for mistakes.

// Drain loop: process all pending GRO batches before returning to poll()
while (true) {
const total_bytes = gro_rx.recvGRO(udp_sock.fd);
if (total_bytes == 0) break; // EAGAIN — socket drained
for (cqe_buffer[0..n_cqes]) |cqe| {
udp_ring.noteSendCompletion(cqe);
const recv = udp_ring.recvCompletion(cqe) orelse {
if (lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| {
udp_ring.resubmitRecv(slot, udp_sock.fd) catch {};
}
continue;
};

const sender = gro_rx.getSender();
const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes;
const total_bytes = recv.data.len;
const seg_size: usize = if (recv.segment_size > 0) recv.segment_size else total_bytes;

var offset: usize = 0;
while (offset < total_bytes) {
const remaining = total_bytes - offset;
const pkt_len = @min(seg_size, remaining);
const pkt = gro_rx.buf[offset..][0..pkt_len];
const pkt = recv.data[offset..][0..pkt_len];

processIncomingPacket(
pkt,
sender.addr,
sender.port,
recv.sender_addr,
recv.sender_port,
wg_dev,
swim,
udp_sock,
Expand All @@ -3255,6 +3271,8 @@ fn userspaceEventLoop(
offset += pkt_len;
}

udp_ring.resubmitRecv(recv.slot, udp_sock.fd) catch {};

// Flush decrypted packets to TUN if buffer is filling up
if (n_decrypted >= MAX_DECRYPTED - 44) {
if (n_decrypted > 0) {
Expand All @@ -3270,6 +3288,11 @@ fn userspaceEventLoop(
}
}

// Short backoff avoids spinning while keeping control-plane latency below the
// 50ms poll fallback timeout when the SQPOLL thread has no completions ready.
// The fallback branch blocks in poll(), so it does not need this extra backoff.
if (n_cqes == 0) sleepNs(UDP_RING_IDLE_BACKOFF_NS);

// Final flush of remaining decrypted packets
if (n_decrypted > 0) {
if (tun_dev.vnet_hdr) {
Expand All @@ -3280,6 +3303,73 @@ fn userspaceEventLoop(
}
}
}
} else {
var fds = [_]posix.pollfd{
.{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 },
};
_ = posix.poll(&fds, 50) catch continue;

if (fds[0].revents & posix.POLL.IN != 0) {
var n_decrypted: usize = 0;

// Drain loop: process all pending GRO batches before returning to poll()
while (true) {
const total_bytes = gro_rx.recvGRO(udp_sock.fd);
if (total_bytes == 0) break; // EAGAIN — socket drained

const sender = gro_rx.getSender();
const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes;

var offset: usize = 0;
while (offset < total_bytes) {
const remaining = total_bytes - offset;
const pkt_len = @min(seg_size, remaining);
const pkt = gro_rx.buf[offset..][0..pkt_len];

processIncomingPacket(
pkt,
sender.addr,
sender.port,
wg_dev,
swim,
udp_sock,
stdout,
&decrypt_storage,
&decrypt_lens,
&decrypt_slots,
&n_decrypted,
service_filter,
);

offset += pkt_len;
}

// Flush decrypted packets to TUN if buffer is filling up
if (n_decrypted >= MAX_DECRYPTED - 44) {
if (n_decrypted > 0) {
if (tun_dev.vnet_hdr) {
writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted);
} else {
for (0..n_decrypted) |d| {
writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]);
}
}
n_decrypted = 0;
}
}
}

// Final flush of remaining decrypted packets
if (n_decrypted > 0) {
if (tun_dev.vnet_hdr) {
writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted);
} else {
for (0..n_decrypted) |d| {
writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]);
}
}
}
}
}

swim.tickTimersOnly();
Expand Down
Loading
Loading