From 0cbc3d1c1085522ea54775631b46bc6cc6950ed3 Mon Sep 17 00:00:00 2001 From: Jeremy Hiatt Date: Sun, 28 Dec 2025 15:21:44 -0800 Subject: [PATCH 1/4] DO NOT MERGE use patches This is only for convenience in developing with cross-repo deps. --- Cargo.lock | 12 ++++-------- Cargo.toml | 6 ++++++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4242195b..6711c4a3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3748,8 +3748,6 @@ dependencies = [ [[package]] name = "web-transport-proto" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b5400535d6dd4c07dc86e83651a838fd513de7f5011d4e4eafa239fa4d0ded4" dependencies = [ "bytes", "http", @@ -3761,8 +3759,6 @@ dependencies = [ [[package]] name = "web-transport-quinn" version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91815d3170c715230c94b5107a71ccf81646513e548ee1408c3ce285d021d6ca" dependencies = [ "bytes", "futures", @@ -3781,8 +3777,6 @@ dependencies = [ [[package]] name = "web-transport-trait" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f4bafa8c6ff708042f67ef8031ca0f342822fd785b70f36a4b2c014760fc442" dependencies = [ "bytes", ] @@ -3790,8 +3784,6 @@ dependencies = [ [[package]] name = "web-transport-ws" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7690a94b7f9e843f6b07be25a03bdec6356b909840b52961c249c0bd75df6564" dependencies = [ "bytes", "futures", @@ -4261,3 +4253,7 @@ dependencies = [ "quote", "syn", ] + +[[patch.unused]] +name = "web-transport-any" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5568d6f94..d5ea0e6b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,12 @@ web-transport-quinn = "0.10" web-transport-trait = "0.3" web-transport-ws = "0.2" +[patch.crates-io] +web-transport-any = { path = "../web-transport/web-transport-any" } +web-transport-quinn = { path = "../web-transport/web-transport-quinn" } +web-transport-trait = { path = "../web-transport/web-transport-trait" } +web-transport-ws = { path = "../web-transport/web-transport-ws" } + [profile.dev] panic = "abort" From 8ef80af95d0a254733e20859137b28344e0a4349 Mon Sep 17 00:00:00 2001 From: Jeremy Hiatt Date: Tue, 16 Dec 2025 17:59:08 -0800 Subject: [PATCH 2/4] moq-native: Factor out helper function We'll get a cleaner diff when we add another connection method to the struct in a future commit. --- rs/moq-native/src/client.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 2eb36d5e2..8f917230e 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -138,7 +138,11 @@ impl Client { Ok(Self { quic, tls, transport }) } - pub async fn connect(&self, mut url: Url) -> anyhow::Result { + pub async fn connect(&self, url: Url) -> anyhow::Result { + self.connect_quic(url).await + } + + async fn connect_quic(&self, mut url: Url) -> anyhow::Result { let mut config = self.tls.clone(); let host = url.host().context("invalid DNS name")?.to_string(); From cd2d37ce61d4d28e0e5f619ebc1b856ec9b6a5ae Mon Sep 17 00:00:00 2001 From: Jeremy Hiatt Date: Sun, 30 Nov 2025 15:08:42 -0800 Subject: [PATCH 3/4] moq-native: Add fallback to WebSocket If desired, use Client::connect_with_fallback() to use a WebSocket-based transport when the QUIC connection fails. The logic mirrors the TypeScript implementation by affording the QUIC codepath a small (configurable) headstart. --- Cargo.lock | 21 ++++- Cargo.toml | 1 + rs/moq-native/Cargo.toml | 8 ++ rs/moq-native/src/client.rs | 169 +++++++++++++++++++++++++++++++++++- rs/moq-native/src/lib.rs | 1 + 5 files changed, 195 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6711c4a3c..2aa3ea481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1785,7 +1785,9 @@ dependencies = [ "console-subscriber", "futures", "hex", + "humantime", "moq-lite", + "once_cell", "parking_lot", "quinn", "rcgen", @@ -1798,10 +1800,15 @@ dependencies = [ "serde_with", "time", "tokio", + "tokio-tungstenite 0.24.0", + "tokio-util", "tracing", "tracing-subscriber", "url", + "web-transport-any", "web-transport-quinn", + "web-transport-trait", + "web-transport-ws", ] [[package]] @@ -3745,6 +3752,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-transport-any" +version = "0.1.0" +dependencies = [ + "bytes", + "web-transport-quinn", + "web-transport-trait", + "web-transport-ws", +] + [[package]] name = "web-transport-proto" version = "0.3.0" @@ -4253,7 +4270,3 @@ dependencies = [ "quote", "syn", ] - -[[patch.unused]] -name = "web-transport-any" -version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d5ea0e6b0..6688d654e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ moq-token = { version = "0.5", path = "rs/moq-token" } serde = { version = "1", features = ["derive"] } tokio = "1.48" web-async = { version = "0.1.1", features = ["tracing"] } +web-transport-any = "0.1" web-transport-quinn = "0.10" web-transport-trait = "0.3" web-transport-ws = "0.2" diff --git a/rs/moq-native/Cargo.toml b/rs/moq-native/Cargo.toml index 39d940272..9f76961b8 100644 --- a/rs/moq-native/Cargo.toml +++ b/rs/moq-native/Cargo.toml @@ -19,10 +19,13 @@ tokio-console = ["dep:console-subscriber"] [dependencies] anyhow = { version = "1", features = ["backtrace"] } +bytes = "1" clap = { version = "4", features = ["derive", "env"] } console-subscriber = { version = "0.5", optional = true } futures = "0.3" hex = "0.4" +humantime = "2.3" +once_cell = "1.20" moq-lite = { workspace = true } parking_lot = { version = "0.12", features = ["deadlock_detection"] } @@ -41,10 +44,15 @@ serde = { version = "1", features = ["derive"] } serde_with = "3" time = "0.3" tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = "0.24" +tokio-util = "0.7" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2" +web-transport-any = { workspace = true } web-transport-quinn = { workspace = true } +web-transport-trait = { workspace = true } +web-transport-ws = { workspace = true } [dev-dependencies] anyhow = "1" diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 8f917230e..470e3aa85 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -1,10 +1,38 @@ use crate::crypto; use anyhow::Context; +use once_cell::sync::Lazy; use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::RootCertStore; +use std::collections::HashSet; use std::path::PathBuf; +use std::sync::Mutex; use std::{fs, io, net, sync::Arc, time}; use url::Url; +use web_transport_any::WebTransportSessionAny; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct ServerCacheKey(String, u16); + +impl std::fmt::Display for ServerCacheKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.0, self.1) + } +} + +// Track servers (hostname:port) where WebSocket won the race, so we won't give QUIC a headstart next time +// Keyed by "hostname:port" string (e.g., "relay.example.com:443") +static WEBSOCKET_WON: Lazy>> = Lazy::new(|| Mutex::new(HashSet::new())); + +// Helper function to extract hostname:port key from URL +fn server_key(url: &Url) -> Option { + let host = url.host_str()?; + let port = url.port().unwrap_or_else(|| match url.scheme() { + "https" | "wss" | "moql" | "moqt" => 443, + "http" | "ws" => 80, + _ => 443, + }); + Some(ServerCacheKey(host.to_string(), port)) +} #[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] #[serde(default, deny_unknown_fields)] @@ -30,6 +58,41 @@ pub struct ClientTls { pub disable_verify: Option, } +#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct ClientWebSocket { + /// Delay in milliseconds before attempting WebSocket fallback (default: 200) + /// If WebSocket won the previous race for a given server, this will be 0. + #[arg( + id = "websocket-delay", + long = "websocket-delay", + env = "MOQ_CLIENT_WEBSOCKET_DELAY", + default_value = "200ms" + )] + #[serde(deserialize_with = "deserialize_humantime")] + #[serde(serialize_with = "serialize_humantime")] + #[serde(skip_serializing_if = "Option::is_none")] + pub delay: Option, +} + +fn deserialize_humantime<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let buf = ::deserialize(deserializer)?; + + buf.parse::() + .map_err(serde::de::Error::custom) + .map(Some) +} + +fn serialize_humantime(duration: &Option, serializer: S) -> Result +where + S: serde::Serializer, +{ + ::serialize(&duration.unwrap_or_default().to_string(), serializer) +} + #[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)] #[serde(deny_unknown_fields, default)] pub struct ClientConfig { @@ -45,6 +108,10 @@ pub struct ClientConfig { #[command(flatten)] #[serde(default)] pub tls: ClientTls, + + #[command(flatten)] + #[serde(default)] + pub websocket: ClientWebSocket, } impl Default for ClientConfig { @@ -52,6 +119,7 @@ impl Default for ClientConfig { Self { bind: "[::]:0".parse().unwrap(), tls: ClientTls::default(), + websocket: ClientWebSocket::default(), } } } @@ -67,6 +135,7 @@ pub struct Client { pub quic: quinn::Endpoint, pub tls: rustls::ClientConfig, pub transport: Arc, + pub websocket_delay: Option, } impl Client { @@ -135,13 +204,73 @@ impl Client { let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).context("failed to create QUIC endpoint")?; - Ok(Self { quic, tls, transport }) + Ok(Self { + quic, + tls, + transport, + websocket_delay: config.websocket.delay.map(Into::into), + }) } pub async fn connect(&self, url: Url) -> anyhow::Result { self.connect_quic(url).await } + pub async fn connect_with_fallback(&self, url: Url) -> anyhow::Result { + // Capture QUIC error so it can be used if both transports fail + let mut quic_error: Option = None; + + // Create futures for both possible protocols + let quic_url = url.clone(); + let quic_handle = async { + match self.connect_quic(quic_url).await { + Ok(session) => Some(session), + Err(err) => { + quic_error = Some(err); + None + } + } + }; + + let ws_handle = async { + let cache_key = server_key(&url); + + // Apply a small penalty to WebSocket to improve odds for QUIC to connect first, + // unless we've already had to fall back to WebSockets for this server. + let websocket_penalty = match &cache_key { + Some(key) if !WEBSOCKET_WON.lock().unwrap().contains(key) => self.websocket_delay, + _ => None, + }; + + if let Some(delay) = websocket_penalty { + tokio::time::sleep(delay).await; + tracing::debug!(url = %url, delay_ms = %delay.as_millis(), "QUIC not yet connected, attempting WebSocket fallback"); + } + + match self.connect_websocket(url).await { + Ok(session) => { + if let Some(cache_key) = cache_key { + tracing::warn!(server = %cache_key, "using WebSocket fallback"); + WEBSOCKET_WON.lock().unwrap().insert(cache_key); + } + Some(session) + } + Err(err) => { + tracing::debug!(%err, "WebSocket connection failed"); + None + } + } + }; + + // Race the connection futures + tokio::select! { + Some(quic_session) = quic_handle => Ok(quic_session.into()), + Some(ws_session) = ws_handle => Ok(ws_session.into()), + // If both attempts fail, return the QUIC error (if available) + else => Err(quic_error.unwrap_or_else(|| anyhow::Error::msg("unknown error"))), + } + } + async fn connect_quic(&self, mut url: Url) -> anyhow::Result { let mut config = self.tls.clone(); @@ -207,6 +336,44 @@ impl Client { Ok(session) } + + async fn connect_websocket(&self, mut url: Url) -> anyhow::Result { + // Convert URL scheme: http:// -> ws://, https:// -> wss:// + let ws_url = match url.scheme() { + "http" => { + url.set_scheme("ws").expect("failed to set scheme"); + url + } + "https" | "moql" | "moqt" => { + url.set_scheme("wss").expect("failed to set scheme"); + url + } + "ws" | "wss" => url, + _ => anyhow::bail!("unsupported URL scheme for WebSocket: {}", url.scheme()), + }; + + tracing::debug!(url = %ws_url, "connecting via WebSocket"); + + // Connect using tokio-tungstenite + let (ws_stream, _response) = tokio_tungstenite::connect_async_with_config( + ws_url.as_str(), + Some(tokio_tungstenite::tungstenite::protocol::WebSocketConfig { + max_message_size: Some(64 << 20), // 64 MB + max_frame_size: Some(16 << 20), // 16 MB + accept_unmasked_frames: false, + ..Default::default() + }), + false, // disable_nagle + ) + .await + .context("failed to connect WebSocket")?; + + // Wrap WebSocket in WebTransport compatibility layer + // Similar to what the relay does: web_transport_ws::Session::new(socket, true) + let session = web_transport_ws::Session::new(ws_stream, false); + + Ok(session) + } } #[derive(Debug)] diff --git a/rs/moq-native/src/lib.rs b/rs/moq-native/src/lib.rs index 0a1b9703a..af3f3ea65 100644 --- a/rs/moq-native/src/lib.rs +++ b/rs/moq-native/src/lib.rs @@ -10,4 +10,5 @@ pub use server::*; // Re-export these crates. pub use moq_lite; pub use rustls; +pub use web_transport_any; pub use web_transport_quinn; From 8e3414d17398b74067af870f7e8ecfdd3aad7d4e Mon Sep 17 00:00:00 2001 From: Jeremy Hiatt Date: Sun, 28 Dec 2025 17:06:13 -0800 Subject: [PATCH 4/4] moq-clock: Update example to use WebSocket fallback --- rs/moq-clock/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index e548f0145..686365c4c 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!(url = ?config.url, "connecting to server"); - let session = client.connect(config.url).await?; + let session = client.connect_with_fallback(config.url).await?; let track = Track { name: config.track,