-
Notifications
You must be signed in to change notification settings - Fork 0
Add io_uring UDP event loop path #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cbe077a
eb6b8a6
6229a52
e204c64
8b45c7b
263e5f0
969d5c0
6779ef6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3209,38 +3209,54 @@ fn userspaceEventLoop( | |
| posix.setsockopt(udp_sock.fd, posix.SOL.SOCKET, SO_BUSY_POLL, std.mem.asBytes(&busy_poll_us)) catch {}; | ||
|
|
||
| var gro_rx = BatchUdp.GROReceiver{}; | ||
| var udp_ring: lib.net.IoUring.UdpRing = undefined; | ||
| const use_udp_ring = blk: { | ||
| udp_ring.init(udp_sock.fd) catch break :blk false; | ||
| break :blk true; | ||
| }; | ||
| defer if (use_udp_ring) udp_ring.deinit(); | ||
| if (use_udp_ring) { | ||
| const sqpoll_msg = if (udp_ring.sqpoll) "SQPOLL" else "submit"; | ||
| const buffer_msg = if (udp_ring.registered_buffers) "registered buffers" else "unregistered buffers"; | ||
| writeFormatted(stdout, " io_uring UDP: recvmsg/sendmsg ring active ({s}, {s})\n", .{ sqpoll_msg, buffer_msg }) catch {}; | ||
| } else { | ||
| writeFormatted(stdout, " io_uring UDP: unavailable, using poll+recvmsg path\n", .{}) catch {}; | ||
| } | ||
|
|
||
| const MAX_DECRYPTED = 64; | ||
| var decrypt_storage: [MAX_DECRYPTED][1500]u8 = undefined; | ||
| var decrypt_lens: [MAX_DECRYPTED]usize = undefined; | ||
| var decrypt_slots: [MAX_DECRYPTED]usize = undefined; | ||
| var cqe_buffer: [lib.net.IoUring.UdpRing.RECV_DEPTH + lib.net.IoUring.UdpRing.SEND_DEPTH]std.os.linux.io_uring_cqe = undefined; | ||
| const UDP_RING_IDLE_BACKOFF_NS = 100 * std.time.ns_per_us; | ||
|
|
||
| while (swim.running.load(.acquire)) { | ||
| var fds = [_]posix.pollfd{ | ||
| .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, | ||
| }; | ||
| _ = posix.poll(&fds, 50) catch continue; | ||
|
|
||
| if (fds[0].revents & posix.POLL.IN != 0) { | ||
| if (use_udp_ring) { | ||
| const n_cqes = udp_ring.copyCompletions(&cqe_buffer, 0) catch 0; | ||
| var n_decrypted: usize = 0; | ||
|
Comment on lines
+3234
to
3236
|
||
|
|
||
| // Drain loop: process all pending GRO batches before returning to poll() | ||
| while (true) { | ||
| const total_bytes = gro_rx.recvGRO(udp_sock.fd); | ||
| if (total_bytes == 0) break; // EAGAIN — socket drained | ||
| for (cqe_buffer[0..n_cqes]) |cqe| { | ||
| udp_ring.noteSendCompletion(cqe); | ||
| const recv = udp_ring.recvCompletion(cqe) orelse { | ||
| if (lib.net.IoUring.UdpRing.recvSlotFromUserData(cqe.user_data)) |slot| { | ||
| udp_ring.resubmitRecv(slot, udp_sock.fd) catch {}; | ||
| } | ||
| continue; | ||
| }; | ||
|
|
||
| const sender = gro_rx.getSender(); | ||
| const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; | ||
| const total_bytes = recv.data.len; | ||
| const seg_size: usize = if (recv.segment_size > 0) recv.segment_size else total_bytes; | ||
|
|
||
| var offset: usize = 0; | ||
| while (offset < total_bytes) { | ||
| const remaining = total_bytes - offset; | ||
| const pkt_len = @min(seg_size, remaining); | ||
| const pkt = gro_rx.buf[offset..][0..pkt_len]; | ||
| const pkt = recv.data[offset..][0..pkt_len]; | ||
|
|
||
| processIncomingPacket( | ||
| pkt, | ||
| sender.addr, | ||
| sender.port, | ||
| recv.sender_addr, | ||
| recv.sender_port, | ||
| wg_dev, | ||
| swim, | ||
| udp_sock, | ||
|
|
@@ -3255,6 +3271,8 @@ fn userspaceEventLoop( | |
| offset += pkt_len; | ||
| } | ||
|
|
||
| udp_ring.resubmitRecv(recv.slot, udp_sock.fd) catch {}; | ||
|
|
||
| // Flush decrypted packets to TUN if buffer is filling up | ||
| if (n_decrypted >= MAX_DECRYPTED - 44) { | ||
| if (n_decrypted > 0) { | ||
|
|
@@ -3270,6 +3288,11 @@ fn userspaceEventLoop( | |
| } | ||
| } | ||
|
|
||
| // Short backoff avoids spinning while keeping control-plane latency below the | ||
| // 50ms poll fallback timeout when the SQPOLL thread has no completions ready. | ||
| // The fallback branch blocks in poll(), so it does not need this extra backoff. | ||
| if (n_cqes == 0) sleepNs(UDP_RING_IDLE_BACKOFF_NS); | ||
|
|
||
| // Final flush of remaining decrypted packets | ||
| if (n_decrypted > 0) { | ||
| if (tun_dev.vnet_hdr) { | ||
|
|
@@ -3280,6 +3303,73 @@ fn userspaceEventLoop( | |
| } | ||
| } | ||
| } | ||
| } else { | ||
| var fds = [_]posix.pollfd{ | ||
| .{ .fd = udp_sock.fd, .events = posix.POLL.IN, .revents = 0 }, | ||
| }; | ||
| _ = posix.poll(&fds, 50) catch continue; | ||
|
|
||
| if (fds[0].revents & posix.POLL.IN != 0) { | ||
| var n_decrypted: usize = 0; | ||
|
|
||
| // Drain loop: process all pending GRO batches before returning to poll() | ||
| while (true) { | ||
| const total_bytes = gro_rx.recvGRO(udp_sock.fd); | ||
| if (total_bytes == 0) break; // EAGAIN — socket drained | ||
|
|
||
| const sender = gro_rx.getSender(); | ||
| const seg_size: usize = if (gro_rx.segment_size > 0) gro_rx.segment_size else total_bytes; | ||
|
|
||
| var offset: usize = 0; | ||
| while (offset < total_bytes) { | ||
| const remaining = total_bytes - offset; | ||
| const pkt_len = @min(seg_size, remaining); | ||
| const pkt = gro_rx.buf[offset..][0..pkt_len]; | ||
|
|
||
| processIncomingPacket( | ||
| pkt, | ||
| sender.addr, | ||
| sender.port, | ||
| wg_dev, | ||
| swim, | ||
| udp_sock, | ||
| stdout, | ||
| &decrypt_storage, | ||
| &decrypt_lens, | ||
| &decrypt_slots, | ||
| &n_decrypted, | ||
| service_filter, | ||
| ); | ||
|
|
||
| offset += pkt_len; | ||
| } | ||
|
|
||
| // Flush decrypted packets to TUN if buffer is filling up | ||
| if (n_decrypted >= MAX_DECRYPTED - 44) { | ||
| if (n_decrypted > 0) { | ||
| if (tun_dev.vnet_hdr) { | ||
| writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); | ||
| } else { | ||
| for (0..n_decrypted) |d| { | ||
| writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); | ||
| } | ||
| } | ||
| n_decrypted = 0; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Final flush of remaining decrypted packets | ||
| if (n_decrypted > 0) { | ||
| if (tun_dev.vnet_hdr) { | ||
| writeCoalescedToTun(&tun_dev, &decrypt_storage, &decrypt_lens, n_decrypted); | ||
| } else { | ||
| for (0..n_decrypted) |d| { | ||
| writeFdNoErr(tun_dev.fd, decrypt_storage[d][0..decrypt_lens[d]]); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| swim.tickTimersOnly(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The startup log says "recvmsg/sendmsg ring active", but the control-plane still sends via
`udp_sock.sendTo(...)` (regular `sendto`) rather than `UdpRing.sendTo`, so "sendmsg" isn't actually active here. Consider adjusting the message (e.g., "recvmsg ring active") or switching control-plane sends to the ring if intended.