Skip to content

Commit f25f54e

Browse files
committed
Shard session forwarding workers and raise default to 16
1 parent 39071c8 commit f25f54e

2 files changed

Lines changed: 160 additions & 72 deletions

File tree

crates/bonded-server/src/main.rs

Lines changed: 157 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use tokio::sync::mpsc;
3636
use tokio_rustls::TlsAcceptor;
3737
use tracing::{error, info, warn, Level};
3838

39+
const FORWARD_WORKER_SHARDS: usize = 16;
40+
3941
#[derive(Debug, Parser)]
4042
#[command(name = "bonded-server")]
4143
struct Args {
@@ -272,6 +274,15 @@ async fn run_websocket_server(
272274
let (udp_tx, mut udp_rx) = mpsc::unbounded_channel();
273275
let udp_sessions =
274276
UdpSessionManager::new(handle.session_id, udp_tx, udp_tracker.clone());
277+
let (forward_shards, mut forward_responses) = spawn_forward_workers(
278+
handle.session_id,
279+
upstream_tcp_target.clone(),
280+
tcp_flows.clone(),
281+
udp_sessions.clone(),
282+
tcp_tracker.clone(),
283+
icmp_tracker.clone(),
284+
"websocket",
285+
);
275286

276287
loop {
277288
tokio::select! {
@@ -291,6 +302,28 @@ async fn run_websocket_server(
291302
break;
292303
}
293304
}
305+
maybe_forwarded_frame = forward_responses.recv() => {
306+
let Some(forwarded_frame) = maybe_forwarded_frame else {
307+
warn!(
308+
peer = %peer,
309+
public_key = %public_key,
310+
session_id = handle.session_id,
311+
"websocket forward response queue closed"
312+
);
313+
break;
314+
};
315+
316+
if let Err(err) = transport.send(forwarded_frame).await {
317+
warn!(
318+
peer = %peer,
319+
public_key = %public_key,
320+
session_id = handle.session_id,
321+
error = %err,
322+
"failed to return websocket forwarded frame"
323+
);
324+
break;
325+
}
326+
}
294327
recv_result = transport.recv() => {
295328
match recv_result {
296329
Ok(frame) => {
@@ -332,45 +365,15 @@ async fn run_websocket_server(
332365
continue;
333366
}
334367

335-
let response = match forward_frame(
336-
frame,
337-
if upstream_tcp_target.is_empty() {
338-
None
339-
} else {
340-
Some(upstream_tcp_target.as_str())
341-
},
342-
&tcp_flows,
343-
&udp_sessions,
344-
&tcp_tracker,
345-
&icmp_tracker,
346-
handle.session_id,
347-
)
348-
.await
349-
{
350-
Ok(value) => value,
351-
Err(err) => {
352-
warn!(
353-
peer = %peer,
354-
public_key = %public_key,
355-
session_id = handle.session_id,
356-
error = %err,
357-
"failed to forward websocket session frame"
358-
);
359-
continue;
360-
}
361-
};
362-
363-
let Some(response) = response else {
364-
continue;
365-
};
366-
367-
if let Err(err) = transport.send(response).await {
368+
let shard_idx = shard_index_for_connection(frame.header.connection_id);
369+
if let Err(err) = forward_shards[shard_idx].send(frame) {
368370
warn!(
369371
peer = %peer,
370372
public_key = %public_key,
371373
session_id = handle.session_id,
372-
error = %err,
373-
"failed to return websocket forwarded frame"
374+
error = ?err,
375+
shard_idx,
376+
"failed to enqueue websocket frame for forwarding"
374377
);
375378
break;
376379
}
@@ -389,6 +392,7 @@ async fn run_websocket_server(
389392
}
390393
}
391394
}
395+
drop(forward_shards);
392396

393397
sessions.unregister_client(&public_key);
394398
udp_tracker.clear_session(handle.session_id);
@@ -504,6 +508,15 @@ async fn run_server(
504508
let (udp_tx, mut udp_rx) = mpsc::unbounded_channel();
505509
let udp_sessions =
506510
UdpSessionManager::new(handle.session_id, udp_tx, udp_tracker.clone());
511+
let (forward_shards, mut forward_responses) = spawn_forward_workers(
512+
handle.session_id,
513+
upstream_tcp_target.clone(),
514+
tcp_flows.clone(),
515+
udp_sessions.clone(),
516+
tcp_tracker.clone(),
517+
icmp_tracker.clone(),
518+
"naive-tcp",
519+
);
507520
loop {
508521
tokio::select! {
509522
maybe_udp_frame = udp_rx.recv() => {
@@ -522,6 +535,28 @@ async fn run_server(
522535
break;
523536
}
524537
}
538+
maybe_forwarded_frame = forward_responses.recv() => {
539+
let Some(forwarded_frame) = maybe_forwarded_frame else {
540+
warn!(
541+
peer = %peer,
542+
public_key = %public_key,
543+
session_id = handle.session_id,
544+
"forward response queue closed"
545+
);
546+
break;
547+
};
548+
549+
if let Err(err) = transport.send(forwarded_frame).await {
550+
warn!(
551+
peer = %peer,
552+
public_key = %public_key,
553+
session_id = handle.session_id,
554+
error = %err,
555+
"failed to return forwarded frame"
556+
);
557+
break;
558+
}
559+
}
525560
recv_result = transport.recv() => {
526561
match recv_result {
527562
Ok(frame) => {
@@ -563,45 +598,15 @@ async fn run_server(
563598
continue;
564599
}
565600

566-
let response = match forward_frame(
567-
frame,
568-
if upstream_tcp_target.is_empty() {
569-
None
570-
} else {
571-
Some(upstream_tcp_target.as_str())
572-
},
573-
&tcp_flows,
574-
&udp_sessions,
575-
&tcp_tracker,
576-
&icmp_tracker,
577-
handle.session_id,
578-
)
579-
.await
580-
{
581-
Ok(value) => value,
582-
Err(err) => {
583-
warn!(
584-
peer = %peer,
585-
public_key = %public_key,
586-
session_id = handle.session_id,
587-
error = %err,
588-
"failed to forward session frame"
589-
);
590-
continue;
591-
}
592-
};
593-
594-
let Some(response) = response else {
595-
continue;
596-
};
597-
598-
if let Err(err) = transport.send(response).await {
601+
let shard_idx = shard_index_for_connection(frame.header.connection_id);
602+
if let Err(err) = forward_shards[shard_idx].send(frame) {
599603
warn!(
600604
peer = %peer,
601605
public_key = %public_key,
602606
session_id = handle.session_id,
603-
error = %err,
604-
"failed to return forwarded frame"
607+
error = ?err,
608+
shard_idx,
609+
"failed to enqueue frame for forwarding"
605610
);
606611
break;
607612
}
@@ -620,6 +625,7 @@ async fn run_server(
620625
}
621626
}
622627
}
628+
drop(forward_shards);
623629

624630
sessions.unregister_client(&public_key);
625631
udp_tracker.clear_session(handle.session_id);
@@ -634,6 +640,87 @@ async fn run_server(
634640
}
635641
}
636642

643+
fn shard_index_for_connection(connection_id: u32) -> usize {
644+
(connection_id as usize) % FORWARD_WORKER_SHARDS
645+
}
646+
647+
fn spawn_forward_workers(
648+
session_id: u64,
649+
upstream_tcp_target: String,
650+
tcp_flows: TcpFlowTable,
651+
udp_sessions: UdpSessionManager,
652+
tcp_tracker: TcpSessionTracker,
653+
icmp_tracker: IcmpSessionTracker,
654+
transport_label: &'static str,
655+
) -> (
656+
Vec<mpsc::UnboundedSender<SessionFrame>>,
657+
mpsc::UnboundedReceiver<SessionFrame>,
658+
) {
659+
let (response_tx, response_rx) = mpsc::unbounded_channel();
660+
let upstream = if upstream_tcp_target.trim().is_empty() {
661+
None
662+
} else {
663+
Some(upstream_tcp_target)
664+
};
665+
666+
let mut shard_senders = Vec::with_capacity(FORWARD_WORKER_SHARDS);
667+
for shard_idx in 0..FORWARD_WORKER_SHARDS {
668+
let (tx, mut rx) = mpsc::unbounded_channel::<SessionFrame>();
669+
shard_senders.push(tx);
670+
671+
let response_tx = response_tx.clone();
672+
let tcp_flows = tcp_flows.clone();
673+
let udp_sessions = udp_sessions.clone();
674+
let tcp_tracker = tcp_tracker.clone();
675+
let icmp_tracker = icmp_tracker.clone();
676+
let upstream = upstream.clone();
677+
678+
tokio::spawn(async move {
679+
while let Some(frame) = rx.recv().await {
680+
let response = match forward_frame(
681+
frame,
682+
upstream.as_deref(),
683+
&tcp_flows,
684+
&udp_sessions,
685+
&tcp_tracker,
686+
&icmp_tracker,
687+
session_id,
688+
)
689+
.await
690+
{
691+
Ok(value) => value,
692+
Err(err) => {
693+
warn!(
694+
session_id,
695+
shard_idx,
696+
transport = transport_label,
697+
error = %err,
698+
"failed to forward session frame"
699+
);
700+
continue;
701+
}
702+
};
703+
704+
let Some(response) = response else {
705+
continue;
706+
};
707+
708+
if response_tx.send(response).is_err() {
709+
warn!(
710+
session_id,
711+
shard_idx,
712+
transport = transport_label,
713+
"forward response receiver dropped"
714+
);
715+
break;
716+
}
717+
}
718+
});
719+
}
720+
721+
(shard_senders, response_rx)
722+
}
723+
637724
fn apply_env_overrides<F>(cfg: &mut ServerConfig, mut read_env: F)
638725
where
639726
F: FnMut(&str) -> Option<String>,

docs/design/implementation-plan.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Implementation Plan — Server, Linux Client, Android Client
22

33
**Status:** In Progress
4-
**Last Updated:** 2026-04-06 (session 25)
4+
**Last Updated:** 2026-04-08 (session 26)
55

66
This is a living document. Update the status column and notes as work progresses.
77

@@ -254,7 +254,7 @@ Build the server binary on top of `bonded-core`.
254254
| 2.1 | Server config loading (env vars + config file) | completed | Server loads TOML via `BONDED_CONFIG`/`--config`, falls back to defaults on read failure, applies env overrides for bind/public/health/log/key paths, and now deserializes partial `server.toml` files by filling missing options from defaults |
255255
| 2.2 | Authorized keys file — load, watch for changes, reload | completed | Added server authorized key store loading from TOML plus `notify` watcher callbacks; hardened watcher to ignore non-mutating access events and debounce rapid bursts to avoid self-triggered tight reload loops; server startup pre-creates missing state files/directories so operators only need to provide `server.toml` |
256256
| 2.3 | Accept NaiveTCP connections, perform auth handshake | completed | Added NaiveTCP listener accept loop and line-delimited JSON challenge-signature handshake with authorized-key enforcement |
257-
| 2.4 | Server-side session management (multiple concurrent clients) | completed | Added concurrent session registry keyed by authenticated client key with unique server session IDs and per-connection frame receive loop lifecycle |
257+
| 2.4 | Server-side session management (multiple concurrent clients) | completed | Added concurrent session registry keyed by authenticated client key with unique server session IDs and per-connection frame receive loop lifecycle; improved per-session runtime by offloading frame forwarding into sharded worker queues so slow flow forwarding no longer blocks the transport receive loop |
258258
| 2.5 | IP packet forwarding — read from session, write to internet (TUN or raw socket) | completed | Added user-space internet egress for IPv4+UDP and IPv4 ICMP echo frames: UDP payloads are relayed via `UdpSocket`; ICMP echo requests are relayed via IPv4 ICMP datagram sockets (`socket2`) with echo-id/sequence matching; retains optional upstream TCP relay fallback for non-IP payloads |
259259
| 2.6 | Return traffic — read from internet, write back to correct client session | completed | Added checksum-correct IPv4 response synthesis for UDP and ICMP echo reply traffic, and wired `forward_frame` to return `None` on per-protocol timeout/no-response so tunneled packets are not spuriously echoed |
260260
| 2.7 | Invite token creation (on admin request / startup) | completed | Added startup invite-token bootstrap that reuses existing usable token or creates/persists a new single-use token |
@@ -440,6 +440,7 @@ Decisions made during implementation that aren't in the requirements docs.
440440
| Server frame forwarder now handles IPv4 ICMP echo request/reply in addition to UDP | 2026-04-03 | Uses Linux-compatible IPv4 ICMP datagram sockets through `socket2`, matches echo identifier/sequence, and synthesizes IPv4 ICMP reply packets with recomputed checksums for client return path |
441441
| UDP forwarding now uses per-client-session long-lived flow sockets with 4-minute idle expiry | 2026-04-06 | Each UDP 4-tuple creates/reuses a connected ephemeral socket; server pushes all remote datagrams back to client asynchronously until no client packet is seen for 4 minutes |
442442
| Server exposes a lightweight status HTML endpoint on a dedicated bind (`status_bind`) | 2026-04-06 | Page auto-refreshes and reports authenticated sessions plus active UDP/TCP flow tables and recent ICMP outcomes to aid runtime diagnostics during tunnel bring-up |
443+
| Per-session frame forwarding now runs through sharded async workers and a response queue (16 shards, selected by connection ID) | 2026-04-08 | Reduces head-of-line blocking, where one slow forward operation (for example, an upstream TCP timeout) could previously stall unrelated flows in the same session; transport writes remain serialized in the main session loop
443444

444445
---
445446

0 commit comments

Comments
 (0)