From 854669443e40cac51f779be6235f5d77a54ee835 Mon Sep 17 00:00:00 2001 From: Thales R Date: Thu, 24 Apr 2025 19:51:13 +0200 Subject: [PATCH 01/20] Replace SDK File struct with StorageEntry for web --- sdks/rust/Cargo.toml | 8 +- sdks/rust/src/credentials.rs | 311 +++++++++++++++++++++-------------- 2 files changed, 194 insertions(+), 125 deletions(-) diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index e1915f98d76..8de5f4c6112 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -28,15 +28,21 @@ bytes.workspace = true flate2.workspace = true futures.workspace = true futures-channel.workspace = true -home.workspace = true http.workspace = true log.workspace = true once_cell.workspace = true prometheus.workspace = true rand.workspace = true + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +home.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen = "0.2.100" +web-sys = { version = "0.3.77", features = [ "Window", "Storage" ] } + [dev-dependencies] # for quickstart-chat and cursive-chat examples hex.workspace = true diff --git a/sdks/rust/src/credentials.rs b/sdks/rust/src/credentials.rs index bdef761048c..497cd21c628 100644 --- a/sdks/rust/src/credentials.rs +++ b/sdks/rust/src/credentials.rs @@ -8,144 +8,207 @@ //! } //! ``` -use home::home_dir; -use spacetimedb_lib::{bsatn, de::Deserialize, ser::Serialize}; -use std::path::PathBuf; -use thiserror::Error; - -const CREDENTIALS_DIR: &str = ".spacetimedb_client_credentials"; - -#[derive(Error, Debug)] -pub enum CredentialFileError { - #[error("Failed to determine user home directory as root for credentials storage")] - DetermineHomeDir, - #[error("Error creating credential storage directory {path}")] - CreateDir { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error serializing credentials for storage in file")] - Serialize { - #[source] - source: bsatn::EncodeError, - }, - #[error("Error writing BSATN-serialized credentials to file {path}")] - Write { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error reading BSATN-serialized credentials from file {path}")] - Read { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error deserializing credentials from bytes stored in file {path}")] - Deserialize { - path: PathBuf, - #[source] - source: bsatn::DecodeError, - }, -} +#[cfg(not(target_arch = "wasm32"))] +mod native_mod { + use home::home_dir; + use spacetimedb_lib::{bsatn, de::Deserialize, ser::Serialize}; + use std::path::PathBuf; + use thiserror::Error; -/// A file on disk which stores, or can store, a JWT for authenticating with SpacetimeDB. -/// -/// The file does not necessarily exist or store credentials. -/// If the credentials have been stored previously, they can be accessed with [`File::load`]. -/// New credentials can be saved to disk with [`File::save`]. -pub struct File { - filename: String, -} + const CREDENTIALS_DIR: &str = ".spacetimedb_client_credentials"; -#[derive(Serialize, Deserialize)] -struct Credentials { - token: String, -} + #[derive(Error, Debug)] + pub enum CredentialFileError { + #[error("Failed to determine user home directory as root for credentials storage")] + DetermineHomeDir, + #[error("Error creating credential storage directory {path}")] + CreateDir { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error serializing credentials for storage in file")] + Serialize { + #[source] + source: bsatn::EncodeError, + }, + #[error("Error writing BSATN-serialized credentials to file {path}")] + Write { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error reading BSATN-serialized credentials from file {path}")] + Read { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error deserializing credentials from bytes stored in file {path}")] + Deserialize { + path: PathBuf, + #[source] + source: bsatn::DecodeError, + }, + } -impl File { - /// Get a handle on a file which stores a SpacetimeDB [`Identity`] and its private access token. + /// A file on disk which stores, or can store, a JWT for authenticating with SpacetimeDB. /// - /// This method does not create the file or check that it exists. - /// - /// Distinct applications running as the same user on the same machine - /// may share [`Identity`]/token pairs by supplying the same `key`. - /// Users who desire distinct credentials for their application - /// should supply a unique `key` per application. - /// - /// No additional namespacing is provided to tie the stored token - /// to a particular SpacetimeDB instance or cluster. - /// Users who intend to connect to multiple instances or clusters - /// should use a distinct `key` per cluster. - pub fn new(key: impl Into) -> Self { - Self { filename: key.into() } + /// The file does not necessarily exist or store credentials. + /// If the credentials have been stored previously, they can be accessed with [`File::load`]. + /// New credentials can be saved to disk with [`File::save`]. + pub struct File { + filename: String, } - fn determine_home_dir() -> Result { - home_dir().ok_or(CredentialFileError::DetermineHomeDir) + #[derive(Serialize, Deserialize)] + struct Credentials { + token: String, } - fn ensure_credentials_dir() -> Result<(), CredentialFileError> { - let mut path = Self::determine_home_dir()?; - path.push(CREDENTIALS_DIR); + impl File { + /// Get a handle on a file which stores a SpacetimeDB [`Identity`] and its private access token. + /// + /// This method does not create the file or check that it exists. + /// + /// Distinct applications running as the same user on the same machine + /// may share [`Identity`]/token pairs by supplying the same `key`. + /// Users who desire distinct credentials for their application + /// should supply a unique `key` per application. + /// + /// No additional namespacing is provided to tie the stored token + /// to a particular SpacetimeDB instance or cluster. + /// Users who intend to connect to multiple instances or clusters + /// should use a distinct `key` per cluster. + pub fn new(key: impl Into) -> Self { + Self { filename: key.into() } + } + + fn determine_home_dir() -> Result { + home_dir().ok_or(CredentialFileError::DetermineHomeDir) + } + + fn ensure_credentials_dir() -> Result<(), CredentialFileError> { + let mut path = Self::determine_home_dir()?; + path.push(CREDENTIALS_DIR); + + std::fs::create_dir_all(&path).map_err(|source| CredentialFileError::CreateDir { path, source }) + } + + fn path(&self) -> Result { + let mut path = Self::determine_home_dir()?; + path.push(CREDENTIALS_DIR); + path.push(&self.filename); + Ok(path) + } + + /// Store the provided `token` to disk in the file referred to by `self`. + /// + /// Future calls to [`Self::load`] on a `File` with the same key can retrieve the token. + /// + /// Expected usage is to call this from a [`super::DbConnectionBuilder::on_connect`] callback. + /// + /// ```ignore + /// DbConnection::builder() + /// .on_connect(|_ctx, _identity, token| { + /// credentials::File::new("my_app").save(token).unwrap(); + /// }) + /// ``` + pub fn save(self, token: impl Into) -> Result<(), CredentialFileError> { + Self::ensure_credentials_dir()?; + + let creds = bsatn::to_vec(&Credentials { token: token.into() }) + .map_err(|source| CredentialFileError::Serialize { source })?; + let path = self.path()?; + std::fs::write(&path, creds).map_err(|source| CredentialFileError::Write { path, source })?; + Ok(()) + } + + /// Load a saved token from disk in the file referred to by `self`, + /// if they have previously been stored by [`Self::save`]. + /// + /// Returns `Err` if I/O fails, + /// `None` if credentials have not previously been stored, + /// or `Some` if credentials are successfully loaded from disk. + /// After unwrapping the `Result`, the returned `Option` can be passed to + /// [`super::DbConnectionBuilder::with_token`]. + /// + /// ```ignore + /// DbConnection::builder() + /// .with_token(credentials::File::new("my_app").load().unwrap()) + /// ``` + pub fn load(self) -> Result, CredentialFileError> { + let path = self.path()?; + + let bytes = match std::fs::read(&path) { + Ok(bytes) => bytes, + Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => return Ok(None), + Err(source) => return Err(CredentialFileError::Read { path, source }), + }; - std::fs::create_dir_all(&path).map_err(|source| CredentialFileError::CreateDir { path, source }) + let creds = bsatn::from_slice::(&bytes) + .map_err(|source| CredentialFileError::Deserialize { path, source })?; + Ok(Some(creds.token)) + } } +} + +#[cfg(target_arch = "wasm32")] +mod web_mod { + use thiserror::Error; + + #[derive(Error, Debug)] + pub enum CredentialStorageError { + #[error("Could not access localStorage")] + LocalStorageAccess, + + #[error("Exception while interacting with localStorage: {0:?}")] + LocalStorageJsError(wasm_bindgen::JsValue), - fn path(&self) -> Result { - let mut path = Self::determine_home_dir()?; - path.push(CREDENTIALS_DIR); - path.push(&self.filename); - Ok(path) + #[error("Window object is not available in this context")] + WindowObjectAccess, } - /// Store the provided `token` to disk in the file referred to by `self`. - /// - /// Future calls to [`Self::load`] on a `File` with the same key can retrieve the token. - /// - /// Expected usage is to call this from a [`super::DbConnectionBuilder::on_connect`] callback. - /// - /// ```ignore - /// DbConnection::builder() - /// .on_connect(|_ctx, _identity, token| { - /// credentials::File::new("my_app").save(token).unwrap(); - /// }) - /// ``` - pub fn save(self, token: impl Into) -> Result<(), CredentialFileError> { - Self::ensure_credentials_dir()?; - - let creds = bsatn::to_vec(&Credentials { token: token.into() }) - .map_err(|source| CredentialFileError::Serialize { source })?; - let path = self.path()?; - std::fs::write(&path, creds).map_err(|source| CredentialFileError::Write { path, source })?; - Ok(()) + /// TODO: Give it an option for 'Local', 'Session', 'Cookie'? + pub struct StorageEntry { + key: String, } - /// Load a saved token from disk in the file referred to by `self`, - /// if they have previously been stored by [`Self::save`]. - /// - /// Returns `Err` if I/O fails, - /// `None` if credentials have not previously been stored, - /// or `Some` if credentials are successfully loaded from disk. - /// After unwrapping the `Result`, the returned `Option` can be passed to - /// [`super::DbConnectionBuilder::with_token`]. - /// - /// ```ignore - /// DbConnection::builder() - /// .with_token(credentials::File::new("my_app").load().unwrap()) - /// ``` - pub fn load(self) -> Result, CredentialFileError> { - let path = self.path()?; - - let bytes = match std::fs::read(&path) { - Ok(bytes) => bytes, - Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => return Ok(None), - Err(source) => return Err(CredentialFileError::Read { path, source }), - }; - - let creds = bsatn::from_slice::(&bytes) - .map_err(|source| CredentialFileError::Deserialize { path, source })?; - Ok(Some(creds.token)) + impl StorageEntry { + pub fn new(key: impl Into) -> Self { + Self { key: key.into() } + } + + pub fn save(&self, token: impl Into) -> Result<(), CredentialStorageError> { + let local_storage = web_sys::window() + .ok_or(CredentialStorageError::WindowObjectAccess)? + .local_storage() + .map_err(CredentialStorageError::LocalStorageJsError)? + .ok_or(CredentialStorageError::LocalStorageAccess)?; + local_storage + .set_item(&self.key, &token.into()) + .map_err(CredentialStorageError::LocalStorageJsError)?; + Ok(()) + } + + pub fn load(&self) -> Result, CredentialStorageError> { + let local_storage = web_sys::window() + .ok_or(CredentialStorageError::WindowObjectAccess)? + .local_storage() + .map_err(CredentialStorageError::LocalStorageJsError)? + .ok_or(CredentialStorageError::LocalStorageAccess)?; + + match local_storage.get_item(&self.key) { + Ok(Some(token)) => Ok(Some(token)), + Ok(None) => Ok(None), + Err(err) => Err(CredentialStorageError::LocalStorageJsError(err)), + } + } } } + +#[cfg(not(target_arch = "wasm32"))] +pub use native_mod::*; + +#[cfg(target_arch = "wasm32")] +pub use web_mod::*; From 27abe950f32c1c53af9210cf5cb4c578e070d5d7 Mon Sep 17 00:00:00 2001 From: Thales R Date: Thu, 24 Apr 2025 20:13:16 +0200 Subject: [PATCH 02/20] Enable web build for the SDK crate - `DbConnectionBuilder::build` becomes async without tokio's block_in_place. Still need to add `web` feature flag. --- Cargo.lock | 52 ++++++++++++ sdks/rust/Cargo.toml | 9 +++ sdks/rust/src/db_connection.rs | 142 ++++++++++++++++++++++++++++++++- sdks/rust/src/websocket.rs | 126 ++++++++++++++++++++++++++++- 4 files changed, 321 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3172c86728..ee71666801a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2546,6 +2546,32 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "gloo-console" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a17868f56b4a24f677b17c8cb69958385102fa879418052d60b50bc1727e261" +dependencies = [ + "gloo-utils", + "js-sys", + "serde", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "gloo-utils" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5555354113b18c547c1d3a98fbf7fb32a9ff4f6fa112ce823a21641a0ba3aa" +dependencies = [ + "js-sys", + "serde", + "serde_json", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "gzip-header" version = "1.0.0" @@ -6809,6 +6835,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ + "web-time", "zeroize", ] @@ -8344,6 +8371,8 @@ dependencies = [ "flate2", "futures", "futures-channel", + "getrandom 0.3.2", + "gloo-console", "hex", "home", "http 1.3.1", @@ -8362,6 +8391,11 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-tungstenite", + "tokio-tungstenite-wasm", + "tungstenite", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", ] [[package]] @@ -9391,6 +9425,24 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-tungstenite-wasm" +version = "0.5.0" +source = "git+https://github.com/thlsrms/tokio-tungstenite-wasm?rev=c788b7cfc30f576c#c788b7cfc30f576c207344c2907932b5317ca5e0" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.3.1", + "httparse", + "js-sys", + "thiserror 2.0.12", + "tokio", + "tokio-tungstenite", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 8de5f4c6112..2337f3f39fe 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -40,7 +40,16 @@ tokio.workspace = true tokio-tungstenite.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.3.2", features = ["wasm_js"]} +gloo-console = "0.3.0" +rustls-pki-types = { version = "1.11.0", features = ["web"] } +tokio-tungstenite-wasm = { git = "https://github.com/thlsrms/tokio-tungstenite-wasm", rev = "c788b7cfc30f576c" } +tokio = { version = "1.37", default-features = false, features = [ + "rt", "macros", "sync" +] } +tungstenite = { version = "0.26.2", features = ["rustls"] } wasm-bindgen = "0.2.100" +wasm-bindgen-futures = "0.4.45" web-sys = { version = "0.3.77", features = [ "Window", "Storage" ] } [dev-dependencies] diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 3aff9776977..a0e34e9db1f 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -37,11 +37,13 @@ use http::Uri; use spacetimedb_client_api_messages::websocket::{self as ws, common::QuerySetId}; use spacetimedb_lib::{bsatn, ser::Serialize, ConnectionId, Identity, Timestamp}; use spacetimedb_sats::Deserialize; -use std::sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}; -use tokio::{ - runtime::{self, Runtime}, - sync::Mutex as TokioMutex, +use std::{ + collections::HashMap, + sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}, }; +use tokio::runtime::{self, Runtime}; +#[cfg(not(target_arch = "wasm32"))] +use tokio::sync::Mutex as TokioMutex; pub(crate) type SharedCell = Arc>; @@ -64,8 +66,12 @@ pub struct DbContextImpl { /// Receiver channel for WebSocket messages, /// which are pre-parsed in the background by [`parse_loop`]. + #[cfg(not(target_arch = "wasm32"))] recv: Arc>>>, + #[cfg(target_arch = "wasm32")] + recv: SharedCell>>, + /// Channel into which operations which apparently mutate SDK state, /// e.g. registering callbacks, push [`PendingMutation`] messages, /// rather than immediately locking the connection and applying their change, @@ -74,8 +80,12 @@ pub struct DbContextImpl { /// Receive end of `pending_mutations_send`, /// from which [Self::apply_pending_mutations] and friends read mutations. + #[cfg(not(target_arch = "wasm32"))] pending_mutations_recv: Arc>>>, + #[cfg(target_arch = "wasm32")] + pending_mutations_recv: SharedCell>>, + /// This connection's `Identity`. /// /// May be `None` if we connected anonymously @@ -309,9 +319,16 @@ impl DbContextImpl { /// Apply all queued [`PendingMutation`]s. fn apply_pending_mutations(&self) -> crate::Result<()> { + #[cfg(not(target_arch = "wasm32"))] while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.blocking_lock().try_next() { self.apply_mutation(pending_mutation)?; } + + #[cfg(target_arch = "wasm32")] + while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.lock().unwrap().try_next() { + self.apply_mutation(pending_mutation)?; + } + Ok(()) } @@ -498,6 +515,7 @@ impl DbContextImpl { /// If no WebSocket messages are in the queue, immediately return `false`. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(target_arch = "wasm32"))] pub fn advance_one_message(&self) -> crate::Result { // Apply any pending mutations before processing a WS message, // so that pending callbacks don't get skipped. @@ -531,14 +549,45 @@ impl DbContextImpl { res } + #[cfg(target_arch = "wasm32")] + pub fn advance_one_message(&self) -> crate::Result { + self.apply_pending_mutations()?; + // Synchronously try to pull one server message + let res = { + let mut chan = self.recv.lock().unwrap(); + match chan.try_next() { + Ok(None) => { + // Shouldn’t happen on unbounded, treat as disconnect + let ctx = self.make_event_ctx(None); + self.invoke_disconnected(&ctx); + Err(crate::Error::Disconnected) + } + Err(_) => Ok(false), + Ok(Some(msg)) => self.process_message(msg).map(|_| true), + } + }; + + // send any pending outgoing mutations now that we've done a read + self.apply_pending_mutations()?; + + res + } + async fn get_message(&self) -> Message { // Holding these locks across the below await can only cause a deadlock if // there are multiple parallel callers of `advance_one_message` or its siblings. // We call this out as an incorrect and unsupported thing to do. #![allow(clippy::await_holding_lock)] + #[cfg(not(target_arch = "wasm32"))] let mut pending_mutations = self.pending_mutations_recv.lock().await; + #[cfg(target_arch = "wasm32")] + let mut pending_mutations = self.pending_mutations_recv.lock().unwrap(); + + #[cfg(not(target_arch = "wasm32"))] let mut recv = self.recv.lock().await; + #[cfg(target_arch = "wasm32")] + let mut recv = self.recv.lock().unwrap(); // Always process pending mutations before WS messages, if they're available, // so that newly registered callbacks run on messages. @@ -841,13 +890,21 @@ You must explicitly advance the connection by calling any one of: Which of these methods you should call depends on the specific needs of your application, but you must call one of them, or else the connection will never progress. "] + #[cfg(not(target_arch = "wasm32"))] pub fn build(self) -> crate::Result { let imp = self.build_impl()?; Ok(::new(imp)) } + #[cfg(target_arch = "wasm32")] + pub async fn build(self) -> crate::Result { + let imp = self.build_impl().await?; + Ok(::new(imp)) + } + /// Open a WebSocket connection, build an empty client cache, &c, /// to construct a [`DbContextImpl`]. + #[cfg(not(target_arch = "wasm32"))] fn build_impl(self) -> crate::Result> { let (runtime, handle) = enter_or_create_runtime()?; let db_callbacks = DbCallbacks::default(); @@ -905,6 +962,64 @@ but you must call one of them, or else the connection will never progress. Ok(ctx_imp) } + #[cfg(target_arch = "wasm32")] + pub async fn build_impl(self) -> crate::Result> { + let (runtime, handle) = enter_or_create_runtime()?; + let db_callbacks = DbCallbacks::default(); + let reducer_callbacks = ReducerCallbacks::default(); + + let connection_id_override = get_connection_id_override(); + let ws_connection = WsConnection::connect( + self.uri.unwrap(), + self.module_name.as_ref().unwrap(), + self.token.as_deref(), + connection_id_override, + self.params, + ) + .await + .map_err(|source| crate::Error::FailedToConnect { + source: InternalError::new("Failed to initiate WebSocket connection").with_cause(source), + })?; + + let (raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(); + let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv, &handle); + + let inner = Arc::new(StdMutex::new(DbContextImplInner { + runtime, + + db_callbacks, + reducer_callbacks, + subscriptions: SubscriptionManager::default(), + + on_connect: self.on_connect, + on_connect_error: self.on_connect_error, + on_disconnect: self.on_disconnect, + call_reducer_flags: <_>::default(), + })); + + let mut cache = ClientCache::default(); + M::register_tables(&mut cache); + let cache = Arc::new(StdMutex::new(cache)); + let send_chan = Arc::new(StdMutex::new(Some(raw_msg_send))); + + let (pending_mutations_send, pending_mutations_recv) = mpsc::unbounded(); + let ctx_imp = DbContextImpl { + runtime: handle, + inner, + send_chan, + cache, + #[cfg(target_arch = "wasm32")] + recv: Arc::new(StdMutex::new(parsed_recv_chan)), + pending_mutations_send, + #[cfg(target_arch = "wasm32")] + pending_mutations_recv: Arc::new(StdMutex::new(pending_mutations_recv)), + identity: Arc::new(StdMutex::new(None)), + connection_id: Arc::new(StdMutex::new(connection_id_override)), + }; + + Ok(ctx_imp) + } + /// Set the URI of the SpacetimeDB host which is running the remote database. /// /// The URI must have either no scheme or one of the schemes `http`, `https`, `ws` or `wss`. @@ -930,6 +1045,7 @@ but you must call one of them, or else the connection will never progress. /// If the passed token is invalid or rejected by the host, /// the connection will fail asynchrnonously. // FIXME: currently this causes `disconnect` to be called rather than `on_connect_error`. + #[cfg(not(target_arch = "wasm32"))] pub fn with_token(mut self, token: Option>) -> Self { self.token = token.map(|token| token.into()); self @@ -1026,12 +1142,19 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle)> { match runtime::Handle::try_current() { Err(e) if e.is_missing_context() => { + #[cfg(not(target_arch = "wasm32"))] let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(1) .thread_name("spacetimedb-background-connection") .build() .map_err(|source| InternalError::new("Failed to create Tokio runtime").with_cause(source))?; + #[cfg(target_arch = "wasm32")] + let rt = tokio::runtime::Builder::new_current_thread() + .worker_threads(1) + .thread_name("spacetimedb-background-connection") + .build() + .map_err(|source| InternalError::new("Failed to create Tokio runtime").with_cause(source))?; let handle = rt.handle().clone(); Ok((Some(rt), handle)) @@ -1072,6 +1195,7 @@ enum ParsedMessage { }, } +#[cfg(not(target_arch = "wasm32"))] fn spawn_parse_loop( raw_message_recv: mpsc::UnboundedReceiver, handle: &runtime::Handle, @@ -1081,6 +1205,16 @@ fn spawn_parse_loop( (handle, parsed_message_recv) } +#[cfg(target_arch = "wasm32")] +fn spawn_parse_loop( + raw_message_recv: mpsc::UnboundedReceiver>, + _handle: &runtime::Handle, +) -> mpsc::UnboundedReceiver> { + let (parsed_message_send, parsed_message_recv) = mpsc::unbounded(); + wasm_bindgen_futures::spawn_local(parse_loop(raw_message_recv, parsed_message_send)); + parsed_message_recv +} + /// A loop which reads raw WS messages from `recv`, parses them into domain types, /// and pushes the [`ParsedMessage`]s into `send`. async fn parse_loop( diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 5668ecb3aed..5b7576a9540 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -2,26 +2,32 @@ //! //! This module is internal, and may incompatibly change without warning. +#[cfg(not(target_arch = "wasm32"))] use std::mem; use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] use std::time::Duration; use bytes::Bytes; -use futures::{SinkExt, StreamExt as _, TryStreamExt}; +#[cfg(not(target_arch = "wasm32"))] +use futures::TryStreamExt; +use futures::{SinkExt, StreamExt as _}; use futures_channel::mpsc; use http::uri::{InvalidUri, Scheme, Uri}; use spacetimedb_client_api_messages::websocket as ws; use spacetimedb_lib::{bsatn, ConnectionId}; use thiserror::Error; -use tokio::task::JoinHandle; -use tokio::time::Instant; -use tokio::{net::TcpStream, runtime}; +#[cfg(not(target_arch = "wasm32"))] +use tokio::{net::TcpStream, runtime, task::JoinHandle, time::Instant}; +#[cfg(not(target_arch = "wasm32"))] use tokio_tungstenite::{ connect_async_with_config, tungstenite::client::IntoClientRequest, tungstenite::protocol::{Message as WebSocketMessage, WebSocketConfig}, MaybeTlsStream, WebSocketStream, }; +#[cfg(target_arch = "wasm32")] +use tokio_tungstenite_wasm::{Message as WebSocketMessage, WebSocketStream}; use crate::compression::decompress_server_message; use crate::metrics::CLIENT_METRICS; @@ -52,6 +58,7 @@ pub enum WsError { #[error(transparent)] UriError(#[from] UriError), + #[cfg(not(target_arch = "wasm32"))] #[error("Error in WebSocket connection with {uri}: {source}")] Tungstenite { uri: Uri, @@ -60,6 +67,15 @@ pub enum WsError { source: Arc, }, + #[cfg(target_arch = "wasm32")] + #[error("Error in WebSocket connection with {uri}: {source}")] + Tungstenite { + uri: Uri, + #[source] + // `Arc` is required for `Self: Clone`, as `tungstenite::Error: !Clone`. + source: Arc, + }, + #[error("Received empty raw message, but valid messages always start with a one-byte compression flag")] EmptyMessage, @@ -83,7 +99,10 @@ pub enum WsError { pub(crate) struct WsConnection { db_name: Box, + #[cfg(not(target_arch = "wasm32"))] sock: WebSocketStream>, + #[cfg(target_arch = "wasm32")] + sock: WebSocketStream, } fn parse_scheme(scheme: Option) -> Result { @@ -172,6 +191,7 @@ fn make_uri(host: Uri, db_name: &str, connection_id: Option, param // rather than having Tungstenite manage its own connections. Should this library do // the same? +#[cfg(not(target_arch = "wasm32"))] fn make_request( host: Uri, db_name: &str, @@ -189,6 +209,7 @@ fn make_request( Ok(req) } +#[cfg(not(target_arch = "wasm32"))] fn request_insert_protocol_header(req: &mut http::Request<()>) { req.headers_mut().insert( http::header::SEC_WEBSOCKET_PROTOCOL, @@ -196,6 +217,7 @@ fn request_insert_protocol_header(req: &mut http::Request<()>) { ); } +#[cfg(not(target_arch = "wasm32"))] fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) { if let Some(token) = token { let auth = ["Bearer ", token].concat().try_into().unwrap(); @@ -206,6 +228,7 @@ fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) /// If `res` evaluates to `Err(e)`, log a warning in the form `"{}: {:?}", $cause, e`. /// /// Could be trivially written as a function, but macro-ifying it preserves the source location of the log. +#[cfg(not(target_arch = "wasm32"))] macro_rules! maybe_log_error { ($cause:expr, $res:expr) => { if let Err(e) = $res { @@ -215,6 +238,7 @@ macro_rules! maybe_log_error { } impl WsConnection { + #[cfg(not(target_arch = "wasm32"))] pub(crate) async fn connect( host: Uri, db_name: &str, @@ -246,6 +270,28 @@ impl WsConnection { }) } + #[cfg(target_arch = "wasm32")] + pub(crate) async fn connect( + host: Uri, + db_name: &str, + _token: Option<&str>, + connection_id: Option, + params: WsParams, + ) -> Result { + let uri = make_uri(host, db_name, connection_id, params)?; + let sock = tokio_tungstenite_wasm::connect_with_protocols(&uri.to_string(), &[BIN_PROTOCOL]) + .await + .map_err(|source| WsError::Tungstenite { + uri, + source: Arc::new(source), + })?; + + Ok(WsConnection { + db_name: db_name.into(), + sock, + }) + } + pub(crate) fn parse_response(bytes: &[u8]) -> Result { let bytes = &*decompress_server_message(bytes)?; bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source }) @@ -255,6 +301,7 @@ impl WsConnection { WebSocketMessage::Binary(bsatn::to_vec(&msg).unwrap().into()) } + #[cfg(not(target_arch = "wasm32"))] async fn message_loop( mut self, incoming_messages: mpsc::UnboundedSender, @@ -392,6 +439,7 @@ impl WsConnection { } } + #[cfg(not(target_arch = "wasm32"))] pub(crate) fn spawn_message_loop( self, runtime: &runtime::Handle, @@ -405,4 +453,74 @@ impl WsConnection { let handle = runtime.spawn(self.message_loop(incoming_send, outgoing_recv)); (handle, incoming_recv, outgoing_send) } + + #[cfg(target_arch = "wasm32")] + pub(crate) fn spawn_message_loop( + self, + ) -> ( + mpsc::UnboundedReceiver>, + mpsc::UnboundedSender>, + ) { + + let websocket_received = CLIENT_METRICS.websocket_received.with_label_values(&self.db_name); + let websocket_received_msg_size = CLIENT_METRICS + .websocket_received_msg_size + .with_label_values(&self.db_name); + let record_metrics = move |msg_size: usize| { + websocket_received.inc(); + websocket_received_msg_size.observe(msg_size as f64); + }; + + let (outgoing_tx, outgoing_rx) = mpsc::unbounded::>(); + let (incoming_tx, incoming_rx) = mpsc::unbounded::>(); + + let ws = self.sock; + + wasm_bindgen_futures::spawn_local(async move { + // fuse both streams so `select!` knows when one side is done + let mut ws_stream = ws.fuse(); + let mut outgoing = outgoing_rx.fuse(); + + loop { + futures::select! { + // 1) inbound WS frames + frame = ws_stream.next() => match frame { + Some(Ok(WebSocketMessage::Binary(bytes))) => { + record_metrics(bytes.len()); + // parse + forward into `incoming_tx` + if let Ok(msg) = Self::parse_response(&bytes) { + match incoming_tx.unbounded_send(msg) { + Ok(_) => {}, + Err(_) => {} + } + } + } + Some(Err(e)) => { + gloo_console::warn!("WS Error: ", format!("{:?}",e)); + break; + } + None => { + gloo_console::warn!("WS Closed"); + break; + } + _ => {} + }, + + // 2) outbound messages + client_msg = outgoing.next() => if let Some(client_msg) = client_msg { + let raw = Self::encode_message(client_msg); + if let Err(e) = ws_stream.send(raw).await { + gloo_console::warn!("WS Send error: ", format!("{:?}",e)); + break; + } + } else { + // channel closed, so we're done sending + break; + }, + } + } + }); + + (incoming_rx, outgoing_tx) + } } From e1d88e61a19ded15751e17fd39b638f7e3f75abb Mon Sep 17 00:00:00 2001 From: Thales R Date: Fri, 25 Apr 2025 18:38:16 +0200 Subject: [PATCH 03/20] Enable run_threaded on the web with spawn_local --- crates/codegen/src/rust.rs | 6 ++++++ .../tests/snapshots/codegen__codegen_rust.snap | 6 ++++++ sdks/rust/src/db_connection.rs | 15 +++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index f4e3ec1996c..f128a89a1a2 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1771,10 +1771,16 @@ impl DbConnection {{ }} /// Spawn a thread which processes WebSocket messages as they are received. + #[cfg(not(target_arch = \"wasm32\"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> {{ self.imp.run_threaded() }} + #[cfg(target_arch = \"wasm32\")] + pub fn run_threaded(&self) {{ + self.imp.run_threaded() + }} + /// Run an `async` loop which processes WebSocket messages when polled. pub async fn run_async(&self) -> __sdk::Result<()> {{ self.imp.run_async().await diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index 53ea2c7b113..9f81022ae37 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1516,10 +1516,16 @@ impl DbConnection { } /// Spawn a thread which processes WebSocket messages as they are received. + #[cfg(not(target_arch = "wasm32"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { self.imp.run_threaded() } + #[cfg(target_arch = "wasm32")] + pub fn run_threaded(&self) { + self.imp.run_threaded() + } + /// Run an `async` loop which processes WebSocket messages when polled. pub async fn run_async(&self) -> __sdk::Result<()> { self.imp.run_async().await diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index a0e34e9db1f..4d92ab9f203 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -645,6 +645,7 @@ impl DbContextImpl { /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(target_arch = "wasm32"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { let this = self.clone(); std::thread::spawn(move || loop { @@ -656,6 +657,20 @@ impl DbContextImpl { }) } + #[cfg(target_arch = "wasm32")] + pub fn run_threaded(&self) { + let this = self.clone(); + wasm_bindgen_futures::spawn_local(async move { + loop { + match this.advance_one_message_async().await { + Ok(()) => (), + Err(e) if error_is_normal_disconnect(&e) => return, + Err(e) => panic!("{e:?}"), + } + } + }) + } + /// An async task which does [`Self::advance_one_message_async`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. From 8ae11101ab744ba8573bd1eb7ddb2f6a82a1e24b Mon Sep 17 00:00:00 2001 From: Thales R Date: Thu, 1 May 2025 22:34:00 +0200 Subject: [PATCH 04/20] Improve Rust SDK websocket message handling on wasm32 --- sdks/rust/src/websocket.rs | 65 +++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 5b7576a9540..2cb85157634 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -474,47 +474,76 @@ impl WsConnection { let (outgoing_tx, outgoing_rx) = mpsc::unbounded::>(); let (incoming_tx, incoming_rx) = mpsc::unbounded::>(); - let ws = self.sock; + let (mut ws_writer, ws_reader) = self.sock.split(); wasm_bindgen_futures::spawn_local(async move { - // fuse both streams so `select!` knows when one side is done - let mut ws_stream = ws.fuse(); + let mut incoming = ws_reader.fuse(); let mut outgoing = outgoing_rx.fuse(); loop { futures::select! { // 1) inbound WS frames - frame = ws_stream.next() => match frame { + inbound = incoming.next() => match inbound { + Some(Err(tokio_tungstenite_wasm::Error::ConnectionClosed)) | None => { + gloo_console::log!("Connection closed"); + break; + }, + Some(Ok(WebSocketMessage::Binary(bytes))) => { record_metrics(bytes.len()); // parse + forward into `incoming_tx` - if let Ok(msg) = Self::parse_response(&bytes) { - match incoming_tx.unbounded_send(msg) { - Ok(_) => {}, - Err(_) => {} - } + match Self::parse_response(&bytes) { + Ok(msg) => if let Err(_e) = incoming_tx.unbounded_send(msg) { + gloo_console::warn!("Incoming receiver dropped."); + break; + }, + Err(e) => { + gloo_console::warn!( + "Error decoding WebSocketMessage::Binay payload: ", + format!("{:?}", e) + ); + }, } - } - Some(Err(e)) => { - gloo_console::warn!("WS Error: ", format!("{:?}",e)); + }, + + Some(Ok(WebSocketMessage::Ping(payload))) + | Some(Ok(WebSocketMessage::Pong(payload))) => { + record_metrics(payload.len()); + }, + + Some(Ok(WebSocketMessage::Close(r))) => { + let reason: String = if let Some(r) = r { + format!("{}:{:?}", r, r.code) + } else {String::default()}; + gloo_console::warn!("Connection Closed.", reason); + let _ = ws_writer.close().await; break; - } - None => { - gloo_console::warn!("WS Closed"); + }, + + Some(Err(e)) => { + gloo_console::warn!( + "Error reading message from read WebSocket stream: ", + format!("{:?}",e) + ); break; + }, + + Some(Ok(other)) => { + record_metrics(other.len()); + gloo_console::warn!("Unexpected WebSocket message: ", format!("{:?}",other)); } - _ => {} }, // 2) outbound messages - client_msg = outgoing.next() => if let Some(client_msg) = client_msg { + outbound = outgoing.next() => if let Some(client_msg) = outbound { let raw = Self::encode_message(client_msg); - if let Err(e) = ws_stream.send(raw).await { + if let Err(e) = ws_writer.send(raw).await { gloo_console::warn!("WS Send error: ", format!("{:?}",e)); break; } } else { // channel closed, so we're done sending + let _ = ws_writer.close().await; break; }, } From 832b03d100c4e3849007f3c51fdbe998a4fe0440 Mon Sep 17 00:00:00 2001 From: Thales R Date: Fri, 2 May 2025 16:46:54 +0200 Subject: [PATCH 05/20] Add cookie API and re-export gloo storage wrappers for the web SDK --- Cargo.lock | 16 +++ sdks/rust/Cargo.toml | 3 +- sdks/rust/src/credentials.rs | 188 ++++++++++++++++++++++++++++------- 3 files changed, 168 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee71666801a..62262a818f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2559,6 +2559,21 @@ dependencies = [ "web-sys", ] +[[package]] +name = "gloo-storage" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc8031e8c92758af912f9bc08fbbadd3c6f3cfcbf6b64cdf3d6a81f0139277a" +dependencies = [ + "gloo-utils", + "js-sys", + "serde", + "serde_json", + "thiserror 1.0.69", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "gloo-utils" version = "0.2.0" @@ -8373,6 +8388,7 @@ dependencies = [ "futures-channel", "getrandom 0.3.2", "gloo-console", + "gloo-storage", "hex", "home", "http 1.3.1", diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 2337f3f39fe..10fbfa7f18f 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -42,6 +42,7 @@ tokio-tungstenite.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.3.2", features = ["wasm_js"]} gloo-console = "0.3.0" +gloo-storage = "0.3.0" rustls-pki-types = { version = "1.11.0", features = ["web"] } tokio-tungstenite-wasm = { git = "https://github.com/thlsrms/tokio-tungstenite-wasm", rev = "c788b7cfc30f576c" } tokio = { version = "1.37", default-features = false, features = [ @@ -50,7 +51,7 @@ tokio = { version = "1.37", default-features = false, features = [ tungstenite = { version = "0.26.2", features = ["rustls"] } wasm-bindgen = "0.2.100" wasm-bindgen-futures = "0.4.45" -web-sys = { version = "0.3.77", features = [ "Window", "Storage" ] } +web-sys = { version = "0.3.77", features = [ "Document", "HtmlDocument", "Window", "Storage" ] } [dev-dependencies] # for quickstart-chat and cursive-chat examples diff --git a/sdks/rust/src/credentials.rs b/sdks/rust/src/credentials.rs index 497cd21c628..d9b322d8507 100644 --- a/sdks/rust/src/credentials.rs +++ b/sdks/rust/src/credentials.rs @@ -155,55 +155,167 @@ mod native_mod { #[cfg(target_arch = "wasm32")] mod web_mod { - use thiserror::Error; + pub use gloo_storage::{LocalStorage, SessionStorage, Storage}; - #[derive(Error, Debug)] - pub enum CredentialStorageError { - #[error("Could not access localStorage")] - LocalStorageAccess, + pub mod cookies { + use thiserror::Error; + use wasm_bindgen::{JsCast, JsValue}; + use web_sys::{window, Document, HtmlDocument}; - #[error("Exception while interacting with localStorage: {0:?}")] - LocalStorageJsError(wasm_bindgen::JsValue), + #[derive(Error, Debug)] + pub enum CookieError { + #[error("Window Object not valid in this context")] + NoWindow, + #[error("No `document` available on `window` object")] + NoDocument, + #[error("`document` is not an HtmlDocument")] + NoHtmlDocument, + #[error("web_sys error: {0:?}")] + WebSys(JsValue), + } - #[error("Window object is not available in this context")] - WindowObjectAccess, - } + impl From for CookieError { + fn from(err: JsValue) -> Self { + CookieError::WebSys(err) + } + } - /// TODO: Give it an option for 'Local', 'Session', 'Cookie'? - pub struct StorageEntry { - key: String, - } + /// A builder for contructing and setting cookies. + pub struct Cookie { + name: String, + value: String, + path: Option, + domain: Option, + max_age: Option, + secure: bool, + same_site: Option, + } - impl StorageEntry { - pub fn new(key: impl Into) -> Self { - Self { key: key.into() } + impl Cookie { + /// Creates a new cookie builder with the specified name and value. + pub fn new(name: impl Into, value: impl Into) -> Self { + Self { + name: name.into(), + value: value.into(), + path: None, + domain: None, + max_age: None, + secure: false, + same_site: None, + } + } + + /// Gets the value of a cookie by name. + pub fn get(name: &str) -> Result, CookieError> { + let doc = get_html_document()?; + let all = doc.cookie().map_err(CookieError::from)?; + for cookie in all.split(';') { + let cookie = cookie.trim(); + if let Some((k, v)) = cookie.split_once('=') { + if k == name { + return Ok(Some(v.to_string())); + } + } + } + + Ok(None) + } + + /// Sets the `Path` attribute (e.g., "/"). + pub fn path(mut self, path: impl Into) -> Self { + self.path = Some(path.into()); + self + } + + /// Sets the `Domain` attribute (e.g., "example.com"). + pub fn domain(mut self, domain: impl Into) -> Self { + self.domain = Some(domain.into()); + self + } + + /// Sets the `Max-Age` attribute in seconds. + pub fn max_age(mut self, seconds: i32) -> Self { + self.max_age = Some(seconds); + self + } + + /// Toggles the `Secure` flag. + /// The default is `false`. + pub fn secure(mut self, enabled: bool) -> Self { + self.secure = enabled; + self + } + + /// Sets the `SameSite` attribute (`Strict`, `Lax`, or `None`). + pub fn same_site(mut self, same_site: SameSite) -> Self { + self.same_site = Some(same_site); + self + } + + pub fn set(self) -> Result<(), CookieError> { + let doc = get_html_document()?; + let mut parts = vec![format!("{}={}", self.name, self.value)]; + + if let Some(path) = self.path { + parts.push(format!("Path={}", path)); + } + if let Some(domain) = self.domain { + parts.push(format!("Domain={}", domain)); + } + if let Some(age) = self.max_age { + parts.push(format!("Max-Age={}", age)); + } + if self.secure { + parts.push("Secure".into()); + } + if let Some(same) = self.same_site { + parts.push(format!("SameSite={}", same.to_string())); + } + + let cookie_str = parts.join("; "); + doc.set_cookie(&cookie_str).map_err(CookieError::from) + } + + /// Deletes the cookie by setting its value to empty and `Max-Age=0`. + pub fn delete(self) -> Result<(), CookieError> { + self.value("").max_age(0).set() + } + + /// Helper to override value for delete + fn value(mut self, value: impl Into) -> Self { + self.value = value.into(); + self + } } - pub fn save(&self, token: impl Into) -> Result<(), CredentialStorageError> { - let local_storage = web_sys::window() - .ok_or(CredentialStorageError::WindowObjectAccess)? - .local_storage() - .map_err(CredentialStorageError::LocalStorageJsError)? - .ok_or(CredentialStorageError::LocalStorageAccess)?; - local_storage - .set_item(&self.key, &token.into()) - .map_err(CredentialStorageError::LocalStorageJsError)?; - Ok(()) + /// Controls the `SameSite` attribute for cookies. + pub enum SameSite { + Strict, + Lax, + None, } - pub fn load(&self) -> Result, CredentialStorageError> { - let local_storage = web_sys::window() - .ok_or(CredentialStorageError::WindowObjectAccess)? - .local_storage() - .map_err(CredentialStorageError::LocalStorageJsError)? - .ok_or(CredentialStorageError::LocalStorageAccess)?; - - match local_storage.get_item(&self.key) { - Ok(Some(token)) => Ok(Some(token)), - Ok(None) => Ok(None), - Err(err) => Err(CredentialStorageError::LocalStorageJsError(err)), + impl ToString for SameSite { + fn to_string(&self) -> String { + match self { + SameSite::Strict => "Strict".into(), + SameSite::Lax => "Lax".into(), + SameSite::None => "None".into(), + } } } + + fn get_document() -> Result { + window() + .ok_or(CookieError::NoWindow)? + .document() + .ok_or(CookieError::NoDocument) + } + + fn get_html_document() -> Result { + let doc = get_document()?; + doc.dyn_into::().map_err(|_| CookieError::NoHtmlDocument) + } } } From a79a798fc4b2ee7c81480e262a686a068a74ce4b Mon Sep 17 00:00:00 2001 From: Thales R Date: Mon, 5 May 2025 14:09:51 +0200 Subject: [PATCH 06/20] Remove some redundancies from wasm32 rust sdk --- Cargo.lock | 5 ++-- sdks/rust/Cargo.toml | 6 ++-- sdks/rust/src/db_connection.rs | 55 ++++++++++------------------------ sdks/rust/src/websocket.rs | 11 +++---- 4 files changed, 26 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62262a818f6..84e67ae8953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9443,8 +9443,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite-wasm" -version = "0.5.0" -source = "git+https://github.com/thlsrms/tokio-tungstenite-wasm?rev=c788b7cfc30f576c#c788b7cfc30f576c207344c2907932b5317ca5e0" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02567f5f341725fb3e452c1f55dd4e5b0f2a685355c3b10babf0fe8e137d176e" dependencies = [ "bytes", "futures-channel", diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 10fbfa7f18f..8b6d4896ed6 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -44,14 +44,14 @@ getrandom = { version = "0.3.2", features = ["wasm_js"]} gloo-console = "0.3.0" gloo-storage = "0.3.0" rustls-pki-types = { version = "1.11.0", features = ["web"] } -tokio-tungstenite-wasm = { git = "https://github.com/thlsrms/tokio-tungstenite-wasm", rev = "c788b7cfc30f576c" } tokio = { version = "1.37", default-features = false, features = [ - "rt", "macros", "sync" + "rt", "macros", "sync", "io-util" ] } +tokio-tungstenite-wasm = "0.6.0" tungstenite = { version = "0.26.2", features = ["rustls"] } wasm-bindgen = "0.2.100" wasm-bindgen-futures = "0.4.45" -web-sys = { version = "0.3.77", features = [ "Document", "HtmlDocument", "Window", "Storage" ] } +web-sys = { version = "0.3.77", features = [ "Document", "HtmlDocument", "Window" ] } [dev-dependencies] # for quickstart-chat and cursive-chat examples diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 4d92ab9f203..20432f6a783 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -68,7 +68,6 @@ pub struct DbContextImpl { /// which are pre-parsed in the background by [`parse_loop`]. #[cfg(not(target_arch = "wasm32"))] recv: Arc>>>, - #[cfg(target_arch = "wasm32")] recv: SharedCell>>, @@ -82,7 +81,6 @@ pub struct DbContextImpl { /// from which [Self::apply_pending_mutations] and friends read mutations. #[cfg(not(target_arch = "wasm32"))] pending_mutations_recv: Arc>>>, - #[cfg(target_arch = "wasm32")] pending_mutations_recv: SharedCell>>, @@ -515,7 +513,6 @@ impl DbContextImpl { /// If no WebSocket messages are in the queue, immediately return `false`. /// /// Called by the autogenerated `DbConnection` method of the same name. - #[cfg(not(target_arch = "wasm32"))] pub fn advance_one_message(&self) -> crate::Result { // Apply any pending mutations before processing a WS message, // so that pending callbacks don't get skipped. @@ -532,34 +529,18 @@ impl DbContextImpl { // returns `Err(_)`. Similar behavior as `Iterator::next` and // `Stream::poll_next`. No comment on whether this is a good mental // model or not. - let res = match self.recv.blocking_lock().try_next() { - Ok(None) => { - let disconnect_ctx = self.make_event_ctx(None); - self.invoke_disconnected(&disconnect_ctx); - Err(crate::Error::Disconnected) - } - Err(_) => Ok(false), - Ok(Some(msg)) => self.process_message(msg).map(|_| true), - }; - // Also apply any new pending messages afterwards, - // so that outgoing WS messages get sent as soon as possible. - self.apply_pending_mutations()?; + let res = { + #[cfg(not(target_arch = "wasm32"))] + let mut recv = self.recv.blocking_lock(); - res - } + #[cfg(target_arch = "wasm32")] + let mut recv = self.recv.lock().unwrap(); - #[cfg(target_arch = "wasm32")] - pub fn advance_one_message(&self) -> crate::Result { - self.apply_pending_mutations()?; - // Synchronously try to pull one server message - let res = { - let mut chan = self.recv.lock().unwrap(); - match chan.try_next() { + match recv.try_next() { Ok(None) => { - // Shouldn’t happen on unbounded, treat as disconnect - let ctx = self.make_event_ctx(None); - self.invoke_disconnected(&ctx); + let disconnect_ctx = self.make_event_ctx(None); + self.invoke_disconnected(&disconnect_ctx); Err(crate::Error::Disconnected) } Err(_) => Ok(false), @@ -567,7 +548,8 @@ impl DbContextImpl { } }; - // send any pending outgoing mutations now that we've done a read + // Also apply any new pending messages afterwards, + // so that outgoing WS messages get sent as soon as possible. self.apply_pending_mutations()?; res @@ -997,7 +979,7 @@ but you must call one of them, or else the connection will never progress. })?; let (raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(); - let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv, &handle); + let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv); let inner = Arc::new(StdMutex::new(DbContextImplInner { runtime, @@ -1023,10 +1005,8 @@ but you must call one of them, or else the connection will never progress. inner, send_chan, cache, - #[cfg(target_arch = "wasm32")] recv: Arc::new(StdMutex::new(parsed_recv_chan)), pending_mutations_send, - #[cfg(target_arch = "wasm32")] pending_mutations_recv: Arc::new(StdMutex::new(pending_mutations_recv)), identity: Arc::new(StdMutex::new(None)), connection_id: Arc::new(StdMutex::new(connection_id_override)), @@ -1158,14 +1138,12 @@ fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle) match runtime::Handle::try_current() { Err(e) if e.is_missing_context() => { #[cfg(not(target_arch = "wasm32"))] - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(1) - .thread_name("spacetimedb-background-connection") - .build() - .map_err(|source| InternalError::new("Failed to create Tokio runtime").with_cause(source))?; + let mut rt = tokio::runtime::Builder::new_multi_thread(); #[cfg(target_arch = "wasm32")] - let rt = tokio::runtime::Builder::new_current_thread() + let mut rt = tokio::runtime::Builder::new_current_thread(); + + let rt = rt + .enable_all() .worker_threads(1) .thread_name("spacetimedb-background-connection") .build() @@ -1223,7 +1201,6 @@ fn spawn_parse_loop( #[cfg(target_arch = "wasm32")] fn spawn_parse_loop( raw_message_recv: mpsc::UnboundedReceiver>, - _handle: &runtime::Handle, ) -> mpsc::UnboundedReceiver> { let (parsed_message_send, parsed_message_recv) = mpsc::unbounded(); wasm_bindgen_futures::spawn_local(parse_loop(raw_message_recv, parsed_message_send)); diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 2cb85157634..6da5a5fcf94 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -506,11 +506,6 @@ impl WsConnection { } }, - Some(Ok(WebSocketMessage::Ping(payload))) - | Some(Ok(WebSocketMessage::Pong(payload))) => { - record_metrics(payload.len()); - }, - Some(Ok(WebSocketMessage::Close(r))) => { let reason: String = if let Some(r) = r { format!("{}:{:?}", r, r.code) @@ -538,12 +533,14 @@ impl WsConnection { outbound = outgoing.next() => if let Some(client_msg) = outbound { let raw = Self::encode_message(client_msg); if let Err(e) = ws_writer.send(raw).await { - gloo_console::warn!("WS Send error: ", format!("{:?}",e)); + gloo_console::warn!("Error sending outgoing message:", format!("{:?}",e)); break; } } else { // channel closed, so we're done sending - let _ = ws_writer.close().await; + if let Err(e) = ws_writer.close().await { + gloo_console::warn!("Error sending close frame:", format!("{:?}", e)); + } break; }, } From c03cdce7c3c49bcffc9d0728509680844d76756d Mon Sep 17 00:00:00 2001 From: Thales R Date: Sun, 11 May 2025 12:18:57 +0200 Subject: [PATCH 07/20] Add `web` feature flag to the rust sdk --- Cargo.lock | 40 ++++++++++++++++++++++++----- sdks/rust/Cargo.toml | 31 ++++++++++++++++------- sdks/rust/src/credentials.rs | 8 +++--- sdks/rust/src/db_connection.rs | 46 +++++++++++++++++----------------- sdks/rust/src/websocket.rs | 36 +++++++++++++------------- 5 files changed, 101 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84e67ae8953..9623661203d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7686,7 +7686,7 @@ dependencies = [ "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.27.0", "toml 0.8.23", "toml_edit 0.22.27", "tracing", @@ -7745,7 +7745,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.27.0", "toml 0.8.23", "tower-http 0.5.2", "tower-layer", @@ -8406,9 +8406,8 @@ dependencies = [ "spacetimedb-testing", "thiserror 1.0.69", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.27.0", "tokio-tungstenite-wasm", - "tungstenite", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -9427,6 +9426,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.26.2", +] + [[package]] name = "tokio-tungstenite" version = "0.27.0" @@ -9438,7 +9449,7 @@ dependencies = [ "native-tls", "tokio", "tokio-native-tls", - "tungstenite", + "tungstenite 0.27.0", ] [[package]] @@ -9455,7 +9466,7 @@ dependencies = [ "js-sys", "thiserror 2.0.12", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.26.2", "wasm-bindgen", "web-sys", ] @@ -9831,6 +9842,23 @@ dependencies = [ "termcolor", ] +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.9.1", + "sha1", + "thiserror 2.0.12", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.27.0" diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 8b6d4896ed6..bfd8fc7ff20 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -9,7 +9,18 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +default = [] allow_loopback_http_for_tests = ["spacetimedb-testing/allow_loopback_http_for_tests"] +web = [ + "dep:getrandom", + "dep:gloo-console", + "dep:gloo-storage", + "dep:rustls-pki-types", + "dep:tokio-tungstenite-wasm", + "dep:wasm-bindgen", + "dep:wasm-bindgen-futures", + "dep:web-sys", +] [dependencies] spacetimedb-data-structures.workspace = true @@ -34,24 +45,26 @@ once_cell.workspace = true prometheus.workspace = true rand.workspace = true +getrandom = { version = "0.3.2", features = ["wasm_js"], optional = true } +gloo-console = { version = "0.3.0", optional = true } +gloo-storage = { version = "0.3.0", optional = true } +rustls-pki-types = { version = "1.12.0", features = ["web"], optional = true } +tokio-tungstenite-wasm = { version = "0.6.0", optional = true } +wasm-bindgen = { version = "0.2.100", optional = true } +wasm-bindgen-futures = { version = "0.4.45", optional = true } +web-sys = { version = "0.3.77", features = [ + "Document", "HtmlDocument", "Window" +], optional = true} + [target.'cfg(not(target_arch = "wasm32"))'.dependencies] home.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] -getrandom = { version = "0.3.2", features = ["wasm_js"]} -gloo-console = "0.3.0" -gloo-storage = "0.3.0" -rustls-pki-types = { version = "1.11.0", features = ["web"] } tokio = { version = "1.37", default-features = false, features = [ "rt", "macros", "sync", "io-util" ] } -tokio-tungstenite-wasm = "0.6.0" -tungstenite = { version = "0.26.2", features = ["rustls"] } -wasm-bindgen = "0.2.100" -wasm-bindgen-futures = "0.4.45" -web-sys = { version = "0.3.77", features = [ "Document", "HtmlDocument", "Window" ] } [dev-dependencies] # for quickstart-chat and cursive-chat examples diff --git a/sdks/rust/src/credentials.rs b/sdks/rust/src/credentials.rs index d9b322d8507..4f5c774b591 100644 --- a/sdks/rust/src/credentials.rs +++ b/sdks/rust/src/credentials.rs @@ -8,7 +8,7 @@ //! } //! ``` -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] mod native_mod { use home::home_dir; use spacetimedb_lib::{bsatn, de::Deserialize, ser::Serialize}; @@ -153,7 +153,7 @@ mod native_mod { } } -#[cfg(target_arch = "wasm32")] +#[cfg(feature = "web")] mod web_mod { pub use gloo_storage::{LocalStorage, SessionStorage, Storage}; @@ -319,8 +319,8 @@ mod web_mod { } } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] pub use native_mod::*; -#[cfg(target_arch = "wasm32")] +#[cfg(feature = "web")] pub use web_mod::*; diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 20432f6a783..e15c6322fb8 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -42,7 +42,7 @@ use std::{ sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}, }; use tokio::runtime::{self, Runtime}; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use tokio::sync::Mutex as TokioMutex; pub(crate) type SharedCell = Arc>; @@ -66,9 +66,9 @@ pub struct DbContextImpl { /// Receiver channel for WebSocket messages, /// which are pre-parsed in the background by [`parse_loop`]. - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] recv: Arc>>>, - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] recv: SharedCell>>, /// Channel into which operations which apparently mutate SDK state, @@ -79,9 +79,9 @@ pub struct DbContextImpl { /// Receive end of `pending_mutations_send`, /// from which [Self::apply_pending_mutations] and friends read mutations. - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] pending_mutations_recv: Arc>>>, - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pending_mutations_recv: SharedCell>>, /// This connection's `Identity`. @@ -317,12 +317,12 @@ impl DbContextImpl { /// Apply all queued [`PendingMutation`]s. fn apply_pending_mutations(&self) -> crate::Result<()> { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.blocking_lock().try_next() { self.apply_mutation(pending_mutation)?; } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.lock().unwrap().try_next() { self.apply_mutation(pending_mutation)?; } @@ -531,10 +531,10 @@ impl DbContextImpl { // model or not. let res = { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] let mut recv = self.recv.blocking_lock(); - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] let mut recv = self.recv.lock().unwrap(); match recv.try_next() { @@ -561,14 +561,14 @@ impl DbContextImpl { // We call this out as an incorrect and unsupported thing to do. #![allow(clippy::await_holding_lock)] - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] let mut pending_mutations = self.pending_mutations_recv.lock().await; - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] let mut pending_mutations = self.pending_mutations_recv.lock().unwrap(); - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] let mut recv = self.recv.lock().await; - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] let mut recv = self.recv.lock().unwrap(); // Always process pending mutations before WS messages, if they're available, @@ -627,7 +627,7 @@ impl DbContextImpl { /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { let this = self.clone(); std::thread::spawn(move || loop { @@ -639,7 +639,7 @@ impl DbContextImpl { }) } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pub fn run_threaded(&self) { let this = self.clone(); wasm_bindgen_futures::spawn_local(async move { @@ -887,13 +887,13 @@ You must explicitly advance the connection by calling any one of: Which of these methods you should call depends on the specific needs of your application, but you must call one of them, or else the connection will never progress. "] - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] pub fn build(self) -> crate::Result { let imp = self.build_impl()?; Ok(::new(imp)) } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pub async fn build(self) -> crate::Result { let imp = self.build_impl().await?; Ok(::new(imp)) @@ -901,7 +901,7 @@ but you must call one of them, or else the connection will never progress. /// Open a WebSocket connection, build an empty client cache, &c, /// to construct a [`DbContextImpl`]. - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] fn build_impl(self) -> crate::Result> { let (runtime, handle) = enter_or_create_runtime()?; let db_callbacks = DbCallbacks::default(); @@ -959,7 +959,7 @@ but you must call one of them, or else the connection will never progress. Ok(ctx_imp) } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pub async fn build_impl(self) -> crate::Result> { let (runtime, handle) = enter_or_create_runtime()?; let db_callbacks = DbCallbacks::default(); @@ -1137,9 +1137,9 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle)> { match runtime::Handle::try_current() { Err(e) if e.is_missing_context() => { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] let mut rt = tokio::runtime::Builder::new_multi_thread(); - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] let mut rt = tokio::runtime::Builder::new_current_thread(); let rt = rt @@ -1188,7 +1188,7 @@ enum ParsedMessage { }, } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] fn spawn_parse_loop( raw_message_recv: mpsc::UnboundedReceiver, handle: &runtime::Handle, @@ -1198,7 +1198,7 @@ fn spawn_parse_loop( (handle, parsed_message_recv) } -#[cfg(target_arch = "wasm32")] +#[cfg(feature = "web")] fn spawn_parse_loop( raw_message_recv: mpsc::UnboundedReceiver>, ) -> mpsc::UnboundedReceiver> { diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 6da5a5fcf94..6ca33727276 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -2,14 +2,14 @@ //! //! This module is internal, and may incompatibly change without warning. -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use std::mem; use std::sync::Arc; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use std::time::Duration; use bytes::Bytes; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use futures::TryStreamExt; use futures::{SinkExt, StreamExt as _}; use futures_channel::mpsc; @@ -17,16 +17,16 @@ use http::uri::{InvalidUri, Scheme, Uri}; use spacetimedb_client_api_messages::websocket as ws; use spacetimedb_lib::{bsatn, ConnectionId}; use thiserror::Error; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use tokio::{net::TcpStream, runtime, task::JoinHandle, time::Instant}; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] use tokio_tungstenite::{ connect_async_with_config, tungstenite::client::IntoClientRequest, tungstenite::protocol::{Message as WebSocketMessage, WebSocketConfig}, MaybeTlsStream, WebSocketStream, }; -#[cfg(target_arch = "wasm32")] +#[cfg(feature = "web")] use tokio_tungstenite_wasm::{Message as WebSocketMessage, WebSocketStream}; use crate::compression::decompress_server_message; @@ -58,7 +58,7 @@ pub enum WsError { #[error(transparent)] UriError(#[from] UriError), - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] #[error("Error in WebSocket connection with {uri}: {source}")] Tungstenite { uri: Uri, @@ -67,7 +67,7 @@ pub enum WsError { source: Arc, }, - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] #[error("Error in WebSocket connection with {uri}: {source}")] Tungstenite { uri: Uri, @@ -99,9 +99,9 @@ pub enum WsError { pub(crate) struct WsConnection { db_name: Box, - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] sock: WebSocketStream>, - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] sock: WebSocketStream, } @@ -191,7 +191,7 @@ fn make_uri(host: Uri, db_name: &str, connection_id: Option, param // rather than having Tungstenite manage its own connections. Should this library do // the same? -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] fn make_request( host: Uri, db_name: &str, @@ -209,7 +209,7 @@ fn make_request( Ok(req) } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] fn request_insert_protocol_header(req: &mut http::Request<()>) { req.headers_mut().insert( http::header::SEC_WEBSOCKET_PROTOCOL, @@ -217,7 +217,7 @@ fn request_insert_protocol_header(req: &mut http::Request<()>) { ); } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) { if let Some(token) = token { let auth = ["Bearer ", token].concat().try_into().unwrap(); @@ -238,7 +238,7 @@ macro_rules! maybe_log_error { } impl WsConnection { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] pub(crate) async fn connect( host: Uri, db_name: &str, @@ -270,7 +270,7 @@ impl WsConnection { }) } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pub(crate) async fn connect( host: Uri, db_name: &str, @@ -301,7 +301,7 @@ impl WsConnection { WebSocketMessage::Binary(bsatn::to_vec(&msg).unwrap().into()) } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] async fn message_loop( mut self, incoming_messages: mpsc::UnboundedSender, @@ -439,7 +439,7 @@ impl WsConnection { } } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(feature = "web"))] pub(crate) fn spawn_message_loop( self, runtime: &runtime::Handle, @@ -454,7 +454,7 @@ impl WsConnection { (handle, incoming_recv, outgoing_send) } - #[cfg(target_arch = "wasm32")] + #[cfg(feature = "web")] pub(crate) fn spawn_message_loop( self, ) -> ( From 506d78fbd854e900ba2336967935bfad3730b91b Mon Sep 17 00:00:00 2001 From: Thales R Date: Wed, 7 May 2025 18:33:14 +0200 Subject: [PATCH 08/20] Add token verification for the wasm sdk websocket connection --- Cargo.lock | 23 +++++++++ sdks/rust/Cargo.toml | 4 ++ sdks/rust/src/db_connection.rs | 1 - sdks/rust/src/websocket.rs | 90 ++++++++++++++++++++++++++++++++-- 4 files changed, 114 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9623661203d..5fc3fe756c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2559,6 +2559,27 @@ dependencies = [ "web-sys", ] +[[package]] +name = "gloo-net" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06f627b1a58ca3d42b45d6104bf1e1a03799df472df00988b6ba21accc10580" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "gloo-utils", + "http 1.3.1", + "js-sys", + "pin-project", + "serde", + "serde_json", + "thiserror 1.0.69", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "gloo-storage" version = "0.3.0" @@ -8388,10 +8409,12 @@ dependencies = [ "futures-channel", "getrandom 0.3.2", "gloo-console", + "gloo-net", "gloo-storage", "hex", "home", "http 1.3.1", + "js-sys", "log", "once_cell", "prometheus", diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index bfd8fc7ff20..a7c4b0b926f 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -14,7 +14,9 @@ allow_loopback_http_for_tests = ["spacetimedb-testing/allow_loopback_http_for_te web = [ "dep:getrandom", "dep:gloo-console", + "dep:gloo-net", "dep:gloo-storage", + "dep:js-sys", "dep:rustls-pki-types", "dep:tokio-tungstenite-wasm", "dep:wasm-bindgen", @@ -47,7 +49,9 @@ rand.workspace = true getrandom = { version = "0.3.2", features = ["wasm_js"], optional = true } gloo-console = { version = "0.3.0", optional = true } +gloo-net = { version = "0.6.0", optional = true } gloo-storage = { version = "0.3.0", optional = true } +js-sys = { version = "0.3", optional = true } rustls-pki-types = { version = "1.12.0", features = ["web"], optional = true } tokio-tungstenite-wasm = { version = "0.6.0", optional = true } wasm-bindgen = { version = "0.2.100", optional = true } diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index e15c6322fb8..ba240fd71bc 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -1040,7 +1040,6 @@ but you must call one of them, or else the connection will never progress. /// If the passed token is invalid or rejected by the host, /// the connection will fail asynchrnonously. // FIXME: currently this causes `disconnect` to be called rather than `on_connect_error`. - #[cfg(not(target_arch = "wasm32"))] pub fn with_token(mut self, token: Option>) -> Self { self.token = token.map(|token| token.into()); self diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 6ca33727276..82821949532 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -95,6 +95,10 @@ pub enum WsError { #[error("Unrecognized compression scheme: {scheme:#x}")] UnknownCompressionScheme { scheme: u8 }, + + #[cfg(feature = "web")] + #[error("Token verification error: {0}")] + TokenVerification(String), } pub(crate) struct WsConnection { @@ -130,7 +134,29 @@ pub(crate) struct WsParams { pub confirmed: Option, } +#[cfg(not(feature = "web"))] fn make_uri(host: Uri, db_name: &str, connection_id: Option, params: WsParams) -> Result { + make_uri_impl(host, db_name, connection_id, params, None) +} + +#[cfg(feature = "web")] +fn make_uri( + host: Uri, + db_name: &str, + connection_id: Option, + params: WsParams, + token: Option<&str>, +) -> Result { + make_uri_impl(host, db_name, connection_id, params, token) +} + +fn make_uri_impl( + host: Uri, + db_name: &str, + connection_id: Option, + params: WsParams, + token: Option<&str>, +) -> Result { let mut parts = host.into_parts(); let scheme = parse_scheme(parts.scheme.take())?; parts.scheme = Some(scheme); @@ -174,6 +200,11 @@ fn make_uri(host: Uri, db_name: &str, connection_id: Option, param path.push_str(if confirmed { "true" } else { "false" }); } + // Specify the `token` param if needed + if let Some(token) = token { + path.push_str(&format!("&token={token}")); + } + parts.path_and_query = Some(path.parse().map_err(|source: InvalidUri| UriError::InvalidUri { source: Arc::new(source), })?); @@ -225,10 +256,57 @@ fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) } } +#[cfg(feature = "web")] +async fn fetch_ws_token(host: &Uri, auth_token: &str) -> Result { + use gloo_net::http::{Method, RequestBuilder}; + use js_sys::{Reflect, JSON}; + use wasm_bindgen::{JsCast, JsValue}; + + let url = format!("{}v1/identity/websocket-token", host); + + // helpers to convert gloo_net::Error or JsValue into WsError::TokenVerification + let gloo_to_ws_err = |e: gloo_net::Error| match e { + gloo_net::Error::JsError(js_err) => WsError::TokenVerification(js_err.message.into()), + gloo_net::Error::SerdeError(e) => WsError::TokenVerification(e.to_string()), + gloo_net::Error::GlooError(msg) => WsError::TokenVerification(msg), + }; + let js_to_ws_err = |e: JsValue| { + if let Some(err) = e.dyn_ref::() { + WsError::TokenVerification(err.message().into()) + } else if let Some(s) = e.as_string() { + WsError::TokenVerification(s) + } else { + WsError::TokenVerification(format!("{:?}", e)) + } + }; + + let res = RequestBuilder::new(&url) + .method(Method::POST) + .header("Authorization", &format!("Bearer {auth_token}")) + .send() + .await + .map_err(gloo_to_ws_err)?; + + if !res.ok() { + return Err(WsError::TokenVerification(format!( + "HTTP error: {} {}", + res.status(), + res.status_text() + ))); + } + + let body = res.text().await.map_err(gloo_to_ws_err)?; + let json = JSON::parse(&body).map_err(js_to_ws_err)?; + let token_js = Reflect::get(&json, &JsValue::from_str("token")).map_err(js_to_ws_err)?; + token_js + .as_string() + .ok_or_else(|| WsError::TokenVerification("`token` parsing failed".into())) +} + /// If `res` evaluates to `Err(e)`, log a warning in the form `"{}: {:?}", $cause, e`. /// /// Could be trivially written as a function, but macro-ifying it preserves the source location of the log. -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(feature = "web"))] macro_rules! maybe_log_error { ($cause:expr, $res:expr) => { if let Err(e) = $res { @@ -274,11 +352,17 @@ impl WsConnection { pub(crate) async fn connect( host: Uri, db_name: &str, - _token: Option<&str>, + token: Option<&str>, connection_id: Option, params: WsParams, ) -> Result { - let uri = make_uri(host, db_name, connection_id, params)?; + let token = if let Some(auth_token) = token { + Some(fetch_ws_token(&host, auth_token).await?) + } else { + None + }; + + let uri = make_uri(host, db_name, connection_id, params, token.as_deref())?; let sock = tokio_tungstenite_wasm::connect_with_protocols(&uri.to_string(), &[BIN_PROTOCOL]) .await .map_err(|source| WsError::Tungstenite { From b126cae642c58def31956f805198b3610b810c54 Mon Sep 17 00:00:00 2001 From: Thales R Date: Wed, 7 May 2025 18:41:00 +0200 Subject: [PATCH 09/20] Rename misleading `run_threaded` to `run_background` on wasm32 Renamed the `run_threaded` method on `wasm32` to better reflect its behavior of spawning a background task. The generated `DbConnection` methods `run_threaded`, `run_background`, and `advance_one_message_blocking` now include runtime panics with a clear error feedback when called on unsupported targets. --- crates/codegen/src/rust.rs | 45 ++++++++++++++++--- .../snapshots/codegen__codegen_rust.snap | 45 ++++++++++++++++--- sdks/rust/src/db_connection.rs | 7 ++- 3 files changed, 83 insertions(+), 14 deletions(-) diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index f128a89a1a2..20082146501 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1646,6 +1646,7 @@ impl __sdk::InModule for RemoteTables {{ /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. +/// - [`DbConnection::run_background`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1746,8 +1747,19 @@ impl DbConnection {{ /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> {{ - self.imp.advance_one_message_blocking() + #[cfg(target_arch = \"wasm32\")] + {{ + panic!(\"`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \\ + prefer using `advance_one_message` or `advance_one_message_async` instead\"); + }} + #[cfg(not(target_arch = \"wasm32\"))] + {{ + self.imp.advance_one_message_blocking() + }} }} /// Process one WebSocket message, `await`ing until one is received. @@ -1771,14 +1783,35 @@ impl DbConnection {{ }} /// Spawn a thread which processes WebSocket messages as they are received. - #[cfg(not(target_arch = \"wasm32\"))] + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn run_threaded(&self) -> std::thread::JoinHandle<()> {{ - self.imp.run_threaded() + #[cfg(target_arch = \"wasm32\")] + {{ + panic!(\"`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \\ + prefer using `DbConnection::run_background` instead\"); + }} + #[cfg(not(target_arch = \"wasm32\"))] + {{ + self.imp.run_threaded() + }} }} - #[cfg(target_arch = \"wasm32\")] - pub fn run_threaded(&self) {{ - self.imp.run_threaded() + /// Spawn a task which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any non-`wasm32` target. + pub fn run_background(&self) {{ + #[cfg(not(target_arch = \"wasm32\"))] + {{ + panic!(\"`DbConnection::run_background` is only supported on WebAssembly (wasm32); \\ + prefer using `DbConnection::run_threaded` instead\"); + }} + #[cfg(target_arch = \"wasm32\")] + {{ + self.imp.run_background() + }} }} /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index 9f81022ae37..f6a9771d33b 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1391,6 +1391,7 @@ impl __sdk::InModule for RemoteTables { /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. +/// - [`DbConnection::run_background`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1491,8 +1492,19 @@ impl DbConnection { /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> { - self.imp.advance_one_message_blocking() + #[cfg(target_arch = "wasm32")] + { + panic!("`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \ + prefer using `advance_one_message` or `advance_one_message_async` instead"); + } + #[cfg(not(target_arch = "wasm32"))] + { + self.imp.advance_one_message_blocking() + } } /// Process one WebSocket message, `await`ing until one is received. @@ -1516,14 +1528,35 @@ impl DbConnection { } /// Spawn a thread which processes WebSocket messages as they are received. - #[cfg(not(target_arch = "wasm32"))] + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { - self.imp.run_threaded() + #[cfg(target_arch = "wasm32")] + { + panic!("`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \ + prefer using `DbConnection::run_background` instead"); + } + #[cfg(not(target_arch = "wasm32"))] + { + self.imp.run_threaded() + } } - #[cfg(target_arch = "wasm32")] - pub fn run_threaded(&self) { - self.imp.run_threaded() + /// Spawn a task which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any non-`wasm32` target. + pub fn run_background(&self) { + #[cfg(not(target_arch = "wasm32"))] + { + panic!("`DbConnection::run_background` is only supported on WebAssembly (wasm32); \ + prefer using `DbConnection::run_threaded` instead"); + } + #[cfg(target_arch = "wasm32")] + { + self.imp.run_background() + } } /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index ba240fd71bc..ff949f26533 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -627,7 +627,6 @@ impl DbContextImpl { /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. - #[cfg(not(feature = "web"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { let this = self.clone(); std::thread::spawn(move || loop { @@ -639,8 +638,11 @@ impl DbContextImpl { }) } + /// Spawn a background task which does [`Self::advance_one_message_async`] in a loop. + /// + /// Called by the autogenerated `DbConnection` method of the same name. #[cfg(feature = "web")] - pub fn run_threaded(&self) { + pub fn run_background(&self) { let this = self.clone(); wasm_bindgen_futures::spawn_local(async move { loop { @@ -879,6 +881,7 @@ You must explicitly advance the connection by calling any one of: - `DbConnection::frame_tick`. - `DbConnection::run_threaded`. +- `DbConnection::run_background`. - `DbConnection::run_async`. - `DbConnection::advance_one_message`. - `DbConnection::advance_one_message_blocking`. From f8b3a775f7cd877cdb9163717bac18d44f19e979 Mon Sep 17 00:00:00 2001 From: Thales R Date: Sun, 11 May 2025 15:03:17 +0200 Subject: [PATCH 10/20] Reduce cfg noise by simplifying mutex handling in the sdk crate Trim down repetitive `cfg` clauses by extracting common lock patterns into `get_lock_[sync|async]`. --- sdks/rust/src/db_connection.rs | 67 ++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index ff949f26533..c3044469ef8 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -317,13 +317,7 @@ impl DbContextImpl { /// Apply all queued [`PendingMutation`]s. fn apply_pending_mutations(&self) -> crate::Result<()> { - #[cfg(not(feature = "web"))] - while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.blocking_lock().try_next() { - self.apply_mutation(pending_mutation)?; - } - - #[cfg(feature = "web")] - while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.lock().unwrap().try_next() { + while let Ok(Some(pending_mutation)) = get_lock_sync(&self.pending_mutations_recv).try_next() { self.apply_mutation(pending_mutation)?; } @@ -529,23 +523,14 @@ impl DbContextImpl { // returns `Err(_)`. Similar behavior as `Iterator::next` and // `Stream::poll_next`. No comment on whether this is a good mental // model or not. - - let res = { - #[cfg(not(feature = "web"))] - let mut recv = self.recv.blocking_lock(); - - #[cfg(feature = "web")] - let mut recv = self.recv.lock().unwrap(); - - match recv.try_next() { - Ok(None) => { - let disconnect_ctx = self.make_event_ctx(None); - self.invoke_disconnected(&disconnect_ctx); - Err(crate::Error::Disconnected) - } - Err(_) => Ok(false), - Ok(Some(msg)) => self.process_message(msg).map(|_| true), + let res = match get_lock_sync(&self.recv).try_next() { + Ok(None) => { + let disconnect_ctx = self.make_event_ctx(None); + self.invoke_disconnected(&disconnect_ctx); + Err(crate::Error::Disconnected) } + Err(_) => Ok(false), + Ok(Some(msg)) => self.process_message(msg).map(|_| true), }; // Also apply any new pending messages afterwards, @@ -561,15 +546,8 @@ impl DbContextImpl { // We call this out as an incorrect and unsupported thing to do. #![allow(clippy::await_holding_lock)] - #[cfg(not(feature = "web"))] - let mut pending_mutations = self.pending_mutations_recv.lock().await; - #[cfg(feature = "web")] - let mut pending_mutations = self.pending_mutations_recv.lock().unwrap(); - - #[cfg(not(feature = "web"))] - let mut recv = self.recv.lock().await; - #[cfg(feature = "web")] - let mut recv = self.recv.lock().unwrap(); + let mut pending_mutations = get_lock_async(&self.pending_mutations_recv).await; + let mut recv = get_lock_async(&self.recv).await; // Always process pending mutations before WS messages, if they're available, // so that newly registered callbacks run on messages. @@ -1163,6 +1141,31 @@ fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle) } } +/// Synchronous lock helper: native = blocking_lock, web = lock().unwrap() +#[cfg(not(feature = "web"))] +fn get_lock_sync(mutex: &TokioMutex) -> tokio::sync::MutexGuard<'_, T> { + mutex.blocking_lock() +} + +/// Synchronous lock helper: native = blocking_lock, web = lock().unwrap() +#[cfg(feature = "web")] +fn get_lock_sync(mutex: &StdMutex) -> std::sync::MutexGuard<'_, T> { + mutex.lock().unwrap() +} + +// Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn +#[cfg(not(feature = "web"))] +async fn get_lock_async(mutex: &TokioMutex) -> tokio::sync::MutexGuard<'_, T> { + mutex.lock().await +} + +// Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn +#[cfg(feature = "web")] +pub async fn get_lock_async(mutex: &StdMutex) -> std::sync::MutexGuard<'_, T> { + // still async, but does the sync lock immediately + mutex.lock().unwrap() +} + enum ParsedMessage { TransactionUpdate(M::DbUpdate), IdentityToken(Identity, Box, ConnectionId), From 9149bc1f3fd21e5ffe6f939ed8ff477c8cea9f1c Mon Sep 17 00:00:00 2001 From: Thales R Date: Sun, 11 May 2025 15:30:26 +0200 Subject: [PATCH 11/20] Remove tokio dependency from the rust web sdk --- Cargo.lock | 1 - sdks/rust/Cargo.toml | 7 ------- sdks/rust/src/db_connection.rs | 37 +++++++++++++++++++++++----------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5fc3fe756c6..cba8c1b3b03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6871,7 +6871,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ - "web-time", "zeroize", ] diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index a7c4b0b926f..105ae277bdd 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -17,7 +17,6 @@ web = [ "dep:gloo-net", "dep:gloo-storage", "dep:js-sys", - "dep:rustls-pki-types", "dep:tokio-tungstenite-wasm", "dep:wasm-bindgen", "dep:wasm-bindgen-futures", @@ -52,7 +51,6 @@ gloo-console = { version = "0.3.0", optional = true } gloo-net = { version = "0.6.0", optional = true } gloo-storage = { version = "0.3.0", optional = true } js-sys = { version = "0.3", optional = true } -rustls-pki-types = { version = "1.12.0", features = ["web"], optional = true } tokio-tungstenite-wasm = { version = "0.6.0", optional = true } wasm-bindgen = { version = "0.2.100", optional = true } wasm-bindgen-futures = { version = "0.4.45", optional = true } @@ -65,11 +63,6 @@ home.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true -[target.'cfg(target_arch = "wasm32")'.dependencies] -tokio = { version = "1.37", default-features = false, features = [ - "rt", "macros", "sync", "io-util" -] } - [dev-dependencies] # for quickstart-chat and cursive-chat examples hex.workspace = true diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index c3044469ef8..95be5e7fff1 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -32,6 +32,8 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +#[cfg(feature = "web")] +use futures::{pin_mut, FutureExt}; use futures_channel::mpsc; use http::Uri; use spacetimedb_client_api_messages::websocket::{self as ws, common::QuerySetId}; @@ -41,9 +43,11 @@ use std::{ collections::HashMap, sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}, }; -use tokio::runtime::{self, Runtime}; #[cfg(not(feature = "web"))] -use tokio::sync::Mutex as TokioMutex; +use tokio::{ + runtime::{self, Runtime}, + sync::Mutex as TokioMutex, +}; pub(crate) type SharedCell = Arc>; @@ -53,6 +57,7 @@ pub(crate) type SharedCell = Arc>; /// This must be relatively cheaply `Clone`-able, and have internal sharing, /// as numerous operations will clone it to get new handles on the connection. pub struct DbContextImpl { + #[cfg(not(feature = "web"))] runtime: runtime::Handle, /// All the state which is safe to hold a lock on while running callbacks. @@ -99,6 +104,7 @@ pub struct DbContextImpl { impl Clone for DbContextImpl { fn clone(&self) -> Self { Self { + #[cfg(not(feature = "web"))] runtime: self.runtime.clone(), // Being very explicit with `Arc::clone` here, // since we'll be doing `DbContextImpl::clone` very frequently, @@ -558,15 +564,28 @@ impl DbContextImpl { return Message::Local(pending_mutation.unwrap()); } + #[cfg(not(feature = "web"))] tokio::select! { pending_mutation = pending_mutations.next() => Message::Local(pending_mutation.unwrap()), incoming_message = recv.next() => Message::Ws(incoming_message), } + + #[cfg(feature = "web")] + { + let (pending_fut, recv_fut) = (pending_mutations.next().fuse(), recv.next().fuse()); + pin_mut!(pending_fut, recv_fut); + + futures::select! { + pending_mutation = pending_fut => Message::Local(pending_mutation.unwrap()), + incoming_message = recv_fut => Message::Ws(incoming_message), + } + } } /// Like [`Self::advance_one_message`], but sleeps the thread until a message is available. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(feature = "web"))] pub fn advance_one_message_blocking(&self) -> crate::Result<()> { match self.runtime.block_on(self.get_message()) { Message::Local(pending) => self.apply_mutation(pending), @@ -605,6 +624,7 @@ impl DbContextImpl { /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(feature = "web"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { let this = self.clone(); std::thread::spawn(move || loop { @@ -759,6 +779,7 @@ pub(crate) struct DbContextImplInner { /// `Some` if not within the context of an outer runtime. The `Runtime` must /// then live as long as `Self`. #[allow(unused)] + #[cfg(not(feature = "web"))] runtime: Option, db_callbacks: DbCallbacks, @@ -942,7 +963,6 @@ but you must call one of them, or else the connection will never progress. #[cfg(feature = "web")] pub async fn build_impl(self) -> crate::Result> { - let (runtime, handle) = enter_or_create_runtime()?; let db_callbacks = DbCallbacks::default(); let reducer_callbacks = ReducerCallbacks::default(); @@ -963,8 +983,6 @@ but you must call one of them, or else the connection will never progress. let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv); let inner = Arc::new(StdMutex::new(DbContextImplInner { - runtime, - db_callbacks, reducer_callbacks, subscriptions: SubscriptionManager::default(), @@ -982,7 +1000,6 @@ but you must call one of them, or else the connection will never progress. let (pending_mutations_send, pending_mutations_recv) = mpsc::unbounded(); let ctx_imp = DbContextImpl { - runtime: handle, inner, send_chan, cache, @@ -1114,15 +1131,11 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal // When called from within an async context, return a handle to it (and no // `Runtime`), otherwise create a fresh `Runtime` and return it along with a // handle to it. +#[cfg(not(feature = "web"))] fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle)> { match runtime::Handle::try_current() { Err(e) if e.is_missing_context() => { - #[cfg(not(feature = "web"))] - let mut rt = tokio::runtime::Builder::new_multi_thread(); - #[cfg(feature = "web")] - let mut rt = tokio::runtime::Builder::new_current_thread(); - - let rt = rt + let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(1) .thread_name("spacetimedb-background-connection") From 937d4982c75a4b1f2689af3c611b7f94e57eb6d2 Mon Sep 17 00:00:00 2001 From: Thales R Date: Fri, 16 May 2025 19:44:02 +0200 Subject: [PATCH 12/20] Improve error handling for the wasm sdk's Cookie builder --- Cargo.lock | 1 + sdks/rust/Cargo.toml | 6 ++--- sdks/rust/src/credentials.rs | 46 +++++++++++++----------------------- 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cba8c1b3b03..337aa3160f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8410,6 +8410,7 @@ dependencies = [ "gloo-console", "gloo-net", "gloo-storage", + "gloo-utils", "hex", "home", "http 1.3.1", diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 105ae277bdd..92e61e4fbf8 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -16,6 +16,7 @@ web = [ "dep:gloo-console", "dep:gloo-net", "dep:gloo-storage", + "dep:gloo-utils", "dep:js-sys", "dep:tokio-tungstenite-wasm", "dep:wasm-bindgen", @@ -50,13 +51,12 @@ getrandom = { version = "0.3.2", features = ["wasm_js"], optional = true } gloo-console = { version = "0.3.0", optional = true } gloo-net = { version = "0.6.0", optional = true } gloo-storage = { version = "0.3.0", optional = true } +gloo-utils = { version = "0.2.0", optional = true } js-sys = { version = "0.3", optional = true } tokio-tungstenite-wasm = { version = "0.6.0", optional = true } wasm-bindgen = { version = "0.2.100", optional = true } wasm-bindgen-futures = { version = "0.4.45", optional = true } -web-sys = { version = "0.3.77", features = [ - "Document", "HtmlDocument", "Window" -], optional = true} +web-sys = { version = "0.3.77", features = ["HtmlDocument"], optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] home.workspace = true diff --git a/sdks/rust/src/credentials.rs b/sdks/rust/src/credentials.rs index 4f5c774b591..30dff1e30bf 100644 --- a/sdks/rust/src/credentials.rs +++ b/sdks/rust/src/credentials.rs @@ -160,27 +160,18 @@ mod web_mod { pub mod cookies { use thiserror::Error; use wasm_bindgen::{JsCast, JsValue}; - use web_sys::{window, Document, HtmlDocument}; + use web_sys::HtmlDocument; #[derive(Error, Debug)] pub enum CookieError { - #[error("Window Object not valid in this context")] - NoWindow, - #[error("No `document` available on `window` object")] - NoDocument, - #[error("`document` is not an HtmlDocument")] - NoHtmlDocument, - #[error("web_sys error: {0:?}")] - WebSys(JsValue), - } + #[error("Error reading cookies: {0:?}")] + Get(JsValue), - impl From for CookieError { - fn from(err: JsValue) -> Self { - CookieError::WebSys(err) - } + #[error("Error setting cookie `{key}`: {js_value:?}")] + Set { key: String, js_value: JsValue }, } - /// A builder for contructing and setting cookies. + /// A builder for constructing and setting cookies. pub struct Cookie { name: String, value: String, @@ -207,8 +198,8 @@ mod web_mod { /// Gets the value of a cookie by name. pub fn get(name: &str) -> Result, CookieError> { - let doc = get_html_document()?; - let all = doc.cookie().map_err(CookieError::from)?; + let doc = get_html_document(); + let all = doc.cookie().map_err(|e| CookieError::Get(e))?; for cookie in all.split(';') { let cookie = cookie.trim(); if let Some((k, v)) = cookie.split_once('=') { @@ -240,7 +231,7 @@ mod web_mod { } /// Toggles the `Secure` flag. - /// The default is `false`. + /// Defaults to `false`. pub fn secure(mut self, enabled: bool) -> Self { self.secure = enabled; self @@ -253,7 +244,7 @@ mod web_mod { } pub fn set(self) -> Result<(), CookieError> { - let doc = get_html_document()?; + let doc = get_html_document(); let mut parts = vec![format!("{}={}", self.name, self.value)]; if let Some(path) = self.path { @@ -273,7 +264,10 @@ mod web_mod { } let cookie_str = parts.join("; "); - doc.set_cookie(&cookie_str).map_err(CookieError::from) + doc.set_cookie(&cookie_str).map_err(|e| CookieError::Set { + key: self.name.clone(), + js_value: e, + }) } /// Deletes the cookie by setting its value to empty and `Max-Age=0`. @@ -305,16 +299,8 @@ mod web_mod { } } - fn get_document() -> Result { - window() - .ok_or(CookieError::NoWindow)? - .document() - .ok_or(CookieError::NoDocument) - } - - fn get_html_document() -> Result { - let doc = get_document()?; - doc.dyn_into::().map_err(|_| CookieError::NoHtmlDocument) + fn get_html_document() -> HtmlDocument { + gloo_utils::document().unchecked_into::() } } } From 68a2eb415fdfc213d4060329de9f5fac4d573267 Mon Sep 17 00:00:00 2001 From: Thales R Date: Tue, 28 Oct 2025 13:51:35 +0100 Subject: [PATCH 13/20] Use conditional type aliases to reduce cfg noise on structs --- sdks/rust/src/db_connection.rs | 15 +++++++-------- sdks/rust/src/websocket.rs | 17 ++++++----------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 95be5e7fff1..600d5d9f20a 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -51,6 +51,11 @@ use tokio::{ pub(crate) type SharedCell = Arc>; +#[cfg(not(feature = "web"))] +type SharedAsyncCell = Arc>; +#[cfg(feature = "web")] +type SharedAsyncCell = SharedCell; + /// Implementation of `DbConnection`, `EventContext`, /// and anything else that provides access to the database connection. /// @@ -71,10 +76,7 @@ pub struct DbContextImpl { /// Receiver channel for WebSocket messages, /// which are pre-parsed in the background by [`parse_loop`]. - #[cfg(not(feature = "web"))] - recv: Arc>>>, - #[cfg(feature = "web")] - recv: SharedCell>>, + recv: SharedAsyncCell>>, /// Channel into which operations which apparently mutate SDK state, /// e.g. registering callbacks, push [`PendingMutation`] messages, @@ -84,10 +86,7 @@ pub struct DbContextImpl { /// Receive end of `pending_mutations_send`, /// from which [Self::apply_pending_mutations] and friends read mutations. - #[cfg(not(feature = "web"))] - pending_mutations_recv: Arc>>>, - #[cfg(feature = "web")] - pending_mutations_recv: SharedCell>>, + pending_mutations_recv: SharedAsyncCell>>, /// This connection's `Identity`. /// diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 82821949532..9e56adfc1b0 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -32,6 +32,11 @@ use tokio_tungstenite_wasm::{Message as WebSocketMessage, WebSocketStream}; use crate::compression::decompress_server_message; use crate::metrics::CLIENT_METRICS; +#[cfg(not(feature = "web"))] +type TokioTungsteniteError = tokio_tungstenite::tungstenite::Error; +#[cfg(feature = "web")] +type TokioTungsteniteError = tokio_tungstenite_wasm::Error; + #[derive(Error, Debug, Clone)] pub enum UriError { #[error("Unknown URI scheme {scheme}, expected http, https, ws or wss")] @@ -58,22 +63,12 @@ pub enum WsError { #[error(transparent)] UriError(#[from] UriError), - #[cfg(not(feature = "web"))] - #[error("Error in WebSocket connection with {uri}: {source}")] - Tungstenite { - uri: Uri, - #[source] - // `Arc` is required for `Self: Clone`, as `tungstenite::Error: !Clone`. - source: Arc, - }, - - #[cfg(feature = "web")] #[error("Error in WebSocket connection with {uri}: {source}")] Tungstenite { uri: Uri, #[source] // `Arc` is required for `Self: Clone`, as `tungstenite::Error: !Clone`. - source: Arc, + source: Arc, }, #[error("Received empty raw message, but valid messages always start with a one-byte compression flag")] From 5da7a5ba319e0b96175e7b4d00ceb74d62227260 Mon Sep 17 00:00:00 2001 From: Thales R Date: Tue, 28 Oct 2025 14:02:05 +0100 Subject: [PATCH 14/20] Consolidate web/non-web `build_impl` functions Moves the creation of DbContextImplInner and DbContextImpl into private helper functions (`build_db_ctx_inner` and `build_db_ctx`) to reduce duplication between the web and non-web implementations of `build_impl`. --- Cargo.lock | 8 +- sdks/rust/src/db_connection.rs | 146 ++++++++++++++++++--------------- 2 files changed, 85 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 337aa3160f5..8a69c4b1c41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8406,7 +8406,7 @@ dependencies = [ "flate2", "futures", "futures-channel", - "getrandom 0.3.2", + "getrandom 0.3.4", "gloo-console", "gloo-net", "gloo-storage", @@ -9487,7 +9487,7 @@ dependencies = [ "http 1.3.1", "httparse", "js-sys", - "thiserror 2.0.12", + "thiserror 2.0.17", "tokio", "tokio-tungstenite 0.26.2", "wasm-bindgen", @@ -9876,9 +9876,9 @@ dependencies = [ "http 1.3.1", "httparse", "log", - "rand 0.9.1", + "rand 0.9.2", "sha1", - "thiserror 2.0.12", + "thiserror 2.0.17", "utf-8", ] diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 600d5d9f20a..faea82e5723 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -905,9 +905,6 @@ but you must call one of them, or else the connection will never progress. #[cfg(not(feature = "web"))] fn build_impl(self) -> crate::Result> { let (runtime, handle) = enter_or_create_runtime()?; - let db_callbacks = DbCallbacks::default(); - let reducer_callbacks = ReducerCallbacks::default(); - let procedure_callbacks = ProcedureCallbacks::default(); let connection_id_override = get_connection_id_override(); let ws_connection = tokio::task::block_in_place(|| { @@ -925,49 +922,30 @@ but you must call one of them, or else the connection will never progress. let (_websocket_loop_handle, raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(&handle); let (_parse_loop_handle, parsed_recv_chan) = spawn_parse_loop::(raw_msg_recv, &handle); - - let inner = Arc::new(StdMutex::new(DbContextImplInner { - runtime, - - db_callbacks, - reducer_callbacks, - subscriptions: SubscriptionManager::default(), - - on_connect: self.on_connect, - on_connect_error: self.on_connect_error, - on_disconnect: self.on_disconnect, - procedure_callbacks, - })); - - let mut cache = ClientCache::default(); - M::register_tables(&mut cache); - let cache = Arc::new(StdMutex::new(cache)); - let send_chan = Arc::new(StdMutex::new(Some(raw_msg_send))); + let parsed_recv_chan = Arc::new(TokioMutex::new(parsed_recv_chan)); let (pending_mutations_send, pending_mutations_recv) = mpsc::unbounded(); - let ctx_imp = DbContextImpl { - runtime: handle, - inner, - send_chan, - cache, - recv: Arc::new(TokioMutex::new(parsed_recv_chan)), + let pending_mutations_recv = Arc::new(TokioMutex::new(pending_mutations_recv)); + + let inner_ctx = build_db_ctx_inner(runtime, self.on_connect, self.on_connect_error, self.on_disconnect); + Ok(build_db_ctx( + handle, + inner_ctx, + raw_msg_send, + parsed_recv_chan, pending_mutations_send, - pending_mutations_recv: Arc::new(TokioMutex::new(pending_mutations_recv)), - identity: Arc::new(StdMutex::new(None)), - connection_id: Arc::new(StdMutex::new(connection_id_override)), - }; - - Ok(ctx_imp) + pending_mutations_recv, + connection_id_override, + )) } + /// Open a WebSocket connection, build an empty client cache, &c, + /// to construct a [`DbContextImpl`]. #[cfg(feature = "web")] - pub async fn build_impl(self) -> crate::Result> { - let db_callbacks = DbCallbacks::default(); - let reducer_callbacks = ReducerCallbacks::default(); - + async fn build_impl(self) -> crate::Result> { let connection_id_override = get_connection_id_override(); let ws_connection = WsConnection::connect( - self.uri.unwrap(), + self.uri.clone().unwrap(), self.module_name.as_ref().unwrap(), self.token.as_deref(), connection_id_override, @@ -980,36 +958,20 @@ but you must call one of them, or else the connection will never progress. let (raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(); let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv); - - let inner = Arc::new(StdMutex::new(DbContextImplInner { - db_callbacks, - reducer_callbacks, - subscriptions: SubscriptionManager::default(), - - on_connect: self.on_connect, - on_connect_error: self.on_connect_error, - on_disconnect: self.on_disconnect, - call_reducer_flags: <_>::default(), - })); - - let mut cache = ClientCache::default(); - M::register_tables(&mut cache); - let cache = Arc::new(StdMutex::new(cache)); - let send_chan = Arc::new(StdMutex::new(Some(raw_msg_send))); + let parsed_recv_chan = Arc::new(StdMutex::new(parsed_recv_chan)); let (pending_mutations_send, pending_mutations_recv) = mpsc::unbounded(); - let ctx_imp = DbContextImpl { - inner, - send_chan, - cache, - recv: Arc::new(StdMutex::new(parsed_recv_chan)), - pending_mutations_send, - pending_mutations_recv: Arc::new(StdMutex::new(pending_mutations_recv)), - identity: Arc::new(StdMutex::new(None)), - connection_id: Arc::new(StdMutex::new(connection_id_override)), - }; + let pending_mutations_recv = Arc::new(StdMutex::new(pending_mutations_recv)); - Ok(ctx_imp) + let inner_ctx = build_db_ctx_inner(self.on_connect, self.on_connect_error, self.on_disconnect); + Ok(build_db_ctx( + inner_ctx, + raw_msg_send, + parsed_recv_chan, + pending_mutations_send, + pending_mutations_recv, + connection_id_override, + )) } /// Set the URI of the SpacetimeDB host which is running the remote database. @@ -1127,6 +1089,60 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal } } +/// Create a [`DbContextImplInner`] wrapped in `Arc>`. +fn build_db_ctx_inner( + #[cfg(not(feature = "web"))] runtime: Option, + + on_connect_cb: Option>, + on_connect_error_cb: Option>, + on_disconnect_cb: Option>, +) -> Arc>> { + Arc::new(StdMutex::new(DbContextImplInner { + #[cfg(not(feature = "web"))] + runtime, + + db_callbacks: DbCallbacks::default(), + reducer_callbacks: ReducerCallbacks::default(), + subscriptions: SubscriptionManager::default(), + + on_connect: on_connect_cb, + on_connect_error: on_connect_error_cb, + on_disconnect: on_disconnect_cb, + call_reducer_flags: <_>::default(), + + procedure_callbacks: ProcedureCallbacks::default(), + })) +} + +/// Assemble and return a [`DbContextImpl`] from the provided [`DbContextImplInner`], and channels. +fn build_db_ctx( + #[cfg(not(feature = "web"))] runtime_handle: runtime::Handle, + + inner_ctx: Arc>>, + raw_msg_send: mpsc::UnboundedSender>, + parsed_msg_recv: SharedAsyncCell>>, + pending_mutations_send: mpsc::UnboundedSender>, + pending_mutations_recv: SharedAsyncCell>>, + connection_id: Option, +) -> DbContextImpl { + let mut cache = ClientCache::default(); + M::register_tables(&mut cache); + let cache = Arc::new(StdMutex::new(cache)); + + DbContextImpl { + #[cfg(not(feature = "web"))] + runtime: runtime_handle, + inner: inner_ctx, + send_chan: Arc::new(StdMutex::new(Some(raw_msg_send))), + cache, + recv: parsed_msg_recv, + pending_mutations_send, + pending_mutations_recv, + identity: Arc::new(StdMutex::new(None)), + connection_id: Arc::new(StdMutex::new(connection_id)), + } +} + // When called from within an async context, return a handle to it (and no // `Runtime`), otherwise create a fresh `Runtime` and return it along with a // handle to it. From 7598ebae377a3795249f88f9b4c8250c65f73a42 Mon Sep 17 00:00:00 2001 From: Thales R Date: Tue, 28 Oct 2025 14:43:13 +0100 Subject: [PATCH 15/20] Enforce compile error on non-portable `DbConnection` methods --- crates/codegen/src/rust.rs | 44 +++---------------- .../snapshots/codegen__codegen_rust.snap | 44 +++---------------- 2 files changed, 14 insertions(+), 74 deletions(-) diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index 20082146501..54e24b8bb1f 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1747,19 +1747,9 @@ impl DbConnection {{ /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. - /// - /// # Panics - /// At runtime if called on any `wasm32` target. + #[cfg(not(target_arch = \"wasm32\"))] pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> {{ - #[cfg(target_arch = \"wasm32\")] - {{ - panic!(\"`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \\ - prefer using `advance_one_message` or `advance_one_message_async` instead\"); - }} - #[cfg(not(target_arch = \"wasm32\"))] - {{ - self.imp.advance_one_message_blocking() - }} + self.imp.advance_one_message_blocking() }} /// Process one WebSocket message, `await`ing until one is received. @@ -1783,35 +1773,15 @@ impl DbConnection {{ }} /// Spawn a thread which processes WebSocket messages as they are received. - /// - /// # Panics - /// At runtime if called on any `wasm32` target. + #[cfg(not(target_arch = \"wasm32\"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> {{ - #[cfg(target_arch = \"wasm32\")] - {{ - panic!(\"`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \\ - prefer using `DbConnection::run_background` instead\"); - }} - #[cfg(not(target_arch = \"wasm32\"))] - {{ - self.imp.run_threaded() - }} + self.imp.run_threaded() }} - /// Spawn a task which processes WebSocket messages as they are received. - /// - /// # Panics - /// At runtime if called on any non-`wasm32` target. + /// Spawn a background task which processes WebSocket messages as they are received. + #[cfg(target_arch = \"wasm32\")] pub fn run_background(&self) {{ - #[cfg(not(target_arch = \"wasm32\"))] - {{ - panic!(\"`DbConnection::run_background` is only supported on WebAssembly (wasm32); \\ - prefer using `DbConnection::run_threaded` instead\"); - }} - #[cfg(target_arch = \"wasm32\")] - {{ - self.imp.run_background() - }} + self.imp.run_background() }} /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index f6a9771d33b..a52afa7b525 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1492,19 +1492,9 @@ impl DbConnection { /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. - /// - /// # Panics - /// At runtime if called on any `wasm32` target. + #[cfg(not(target_arch = "wasm32"))] pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> { - #[cfg(target_arch = "wasm32")] - { - panic!("`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \ - prefer using `advance_one_message` or `advance_one_message_async` instead"); - } - #[cfg(not(target_arch = "wasm32"))] - { - self.imp.advance_one_message_blocking() - } + self.imp.advance_one_message_blocking() } /// Process one WebSocket message, `await`ing until one is received. @@ -1528,35 +1518,15 @@ impl DbConnection { } /// Spawn a thread which processes WebSocket messages as they are received. - /// - /// # Panics - /// At runtime if called on any `wasm32` target. + #[cfg(not(target_arch = "wasm32"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { - #[cfg(target_arch = "wasm32")] - { - panic!("`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \ - prefer using `DbConnection::run_background` instead"); - } - #[cfg(not(target_arch = "wasm32"))] - { - self.imp.run_threaded() - } + self.imp.run_threaded() } - /// Spawn a task which processes WebSocket messages as they are received. - /// - /// # Panics - /// At runtime if called on any non-`wasm32` target. + /// Spawn a background task which processes WebSocket messages as they are received. + #[cfg(target_arch = "wasm32")] pub fn run_background(&self) { - #[cfg(not(target_arch = "wasm32"))] - { - panic!("`DbConnection::run_background` is only supported on WebAssembly (wasm32); \ - prefer using `DbConnection::run_threaded` instead"); - } - #[cfg(target_arch = "wasm32")] - { - self.imp.run_background() - } + self.imp.run_background() } /// Run an `async` loop which processes WebSocket messages when polled. From 042c05bc133e5182fdeb187c6eb1179718083992 Mon Sep 17 00:00:00 2001 From: Thales R Date: Tue, 28 Oct 2025 14:47:36 +0100 Subject: [PATCH 16/20] Bump `getrandom` to 0.3.4, remove legacy `RUSTFLAGS` requirement --- sdks/rust/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/rust/Cargo.toml b/sdks/rust/Cargo.toml index 92e61e4fbf8..eecd5b77eab 100644 --- a/sdks/rust/Cargo.toml +++ b/sdks/rust/Cargo.toml @@ -47,7 +47,7 @@ once_cell.workspace = true prometheus.workspace = true rand.workspace = true -getrandom = { version = "0.3.2", features = ["wasm_js"], optional = true } +getrandom = { version = "0.3.4", features = ["wasm_js"], optional = true } gloo-console = { version = "0.3.0", optional = true } gloo-net = { version = "0.6.0", optional = true } gloo-storage = { version = "0.3.0", optional = true } From 4f913d5f5f72b30cb5824005b6c0b1a6f0cb3c53 Mon Sep 17 00:00:00 2001 From: Thales R Date: Tue, 28 Oct 2025 14:51:00 +0100 Subject: [PATCH 17/20] Minor formatting and code simplification for clippy --- sdks/rust/src/credentials.rs | 20 ++++++++++---------- sdks/rust/src/websocket.rs | 7 +++---- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sdks/rust/src/credentials.rs b/sdks/rust/src/credentials.rs index 30dff1e30bf..7174e8fea15 100644 --- a/sdks/rust/src/credentials.rs +++ b/sdks/rust/src/credentials.rs @@ -199,7 +199,7 @@ mod web_mod { /// Gets the value of a cookie by name. pub fn get(name: &str) -> Result, CookieError> { let doc = get_html_document(); - let all = doc.cookie().map_err(|e| CookieError::Get(e))?; + let all = doc.cookie().map_err(CookieError::Get)?; for cookie in all.split(';') { let cookie = cookie.trim(); if let Some((k, v)) = cookie.split_once('=') { @@ -248,19 +248,19 @@ mod web_mod { let mut parts = vec![format!("{}={}", self.name, self.value)]; if let Some(path) = self.path { - parts.push(format!("Path={}", path)); + parts.push(format!("Path={path}")); } if let Some(domain) = self.domain { - parts.push(format!("Domain={}", domain)); + parts.push(format!("Domain={domain}")); } if let Some(age) = self.max_age { - parts.push(format!("Max-Age={}", age)); + parts.push(format!("Max-Age={age}")); } if self.secure { parts.push("Secure".into()); } if let Some(same) = self.same_site { - parts.push(format!("SameSite={}", same.to_string())); + parts.push(format!("SameSite={same}")); } let cookie_str = parts.join("; "); @@ -289,12 +289,12 @@ mod web_mod { None, } - impl ToString for SameSite { - fn to_string(&self) -> String { + impl std::fmt::Display for SameSite { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - SameSite::Strict => "Strict".into(), - SameSite::Lax => "Lax".into(), - SameSite::None => "None".into(), + SameSite::Strict => f.write_str("Strict"), + SameSite::Lax => f.write_str("Lax"), + SameSite::None => f.write_str("None"), } } } diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 9e56adfc1b0..fd20caa7269 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -257,11 +257,11 @@ async fn fetch_ws_token(host: &Uri, auth_token: &str) -> Result use js_sys::{Reflect, JSON}; use wasm_bindgen::{JsCast, JsValue}; - let url = format!("{}v1/identity/websocket-token", host); + let url = format!("{host}v1/identity/websocket-token"); // helpers to convert gloo_net::Error or JsValue into WsError::TokenVerification let gloo_to_ws_err = |e: gloo_net::Error| match e { - gloo_net::Error::JsError(js_err) => WsError::TokenVerification(js_err.message.into()), + gloo_net::Error::JsError(js_err) => WsError::TokenVerification(js_err.message), gloo_net::Error::SerdeError(e) => WsError::TokenVerification(e.to_string()), gloo_net::Error::GlooError(msg) => WsError::TokenVerification(msg), }; @@ -271,7 +271,7 @@ async fn fetch_ws_token(host: &Uri, auth_token: &str) -> Result } else if let Some(s) = e.as_string() { WsError::TokenVerification(s) } else { - WsError::TokenVerification(format!("{:?}", e)) + WsError::TokenVerification(format!("{e:?}")) } }; @@ -540,7 +540,6 @@ impl WsConnection { mpsc::UnboundedReceiver>, mpsc::UnboundedSender>, ) { - let websocket_received = CLIENT_METRICS.websocket_received.with_label_values(&self.db_name); let websocket_received_msg_size = CLIENT_METRICS .websocket_received_msg_size From ebbd09f365023f5293e68b927399391659470eb7 Mon Sep 17 00:00:00 2001 From: Thales R Date: Sat, 22 Nov 2025 18:05:15 +0100 Subject: [PATCH 18/20] Rename run_background to run_background_task for clarity --- crates/codegen/src/rust.rs | 6 +++--- crates/codegen/tests/snapshots/codegen__codegen_rust.snap | 6 +++--- sdks/rust/src/db_connection.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index 54e24b8bb1f..1d8b5bd7d3e 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1646,7 +1646,7 @@ impl __sdk::InModule for RemoteTables {{ /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. -/// - [`DbConnection::run_background`]. +/// - [`DbConnection::run_background_task`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1780,8 +1780,8 @@ impl DbConnection {{ /// Spawn a background task which processes WebSocket messages as they are received. #[cfg(target_arch = \"wasm32\")] - pub fn run_background(&self) {{ - self.imp.run_background() + pub fn run_background_task(&self) {{ + self.imp.run_background_task() }} /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index a52afa7b525..cc455c154cb 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1391,7 +1391,7 @@ impl __sdk::InModule for RemoteTables { /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. -/// - [`DbConnection::run_background`]. +/// - [`DbConnection::run_background_task`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1525,8 +1525,8 @@ impl DbConnection { /// Spawn a background task which processes WebSocket messages as they are received. #[cfg(target_arch = "wasm32")] - pub fn run_background(&self) { - self.imp.run_background() + pub fn run_background_task(&self) { + self.imp.run_background_task() } /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index faea82e5723..5a3a5958382 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -639,7 +639,7 @@ impl DbContextImpl { /// /// Called by the autogenerated `DbConnection` method of the same name. #[cfg(feature = "web")] - pub fn run_background(&self) { + pub fn run_background_task(&self) { let this = self.clone(); wasm_bindgen_futures::spawn_local(async move { loop { @@ -879,7 +879,7 @@ You must explicitly advance the connection by calling any one of: - `DbConnection::frame_tick`. - `DbConnection::run_threaded`. -- `DbConnection::run_background`. +- `DbConnection::run_background_task`. - `DbConnection::run_async`. - `DbConnection::advance_one_message`. - `DbConnection::advance_one_message_blocking`. From 6e5951a915db3ad3a77a140e07ee9725ece3c85f Mon Sep 17 00:00:00 2001 From: Thales R Date: Sat, 22 Nov 2025 18:17:16 +0100 Subject: [PATCH 19/20] Hide wasm-only DbConnection methods in native docs --- crates/codegen/src/rust.rs | 6 +++--- .../tests/snapshots/codegen__codegen_rust.snap | 12 +++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index 1d8b5bd7d3e..4fb408eaab2 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1645,11 +1645,11 @@ impl __sdk::InModule for RemoteTables {{ /// You must explicitly advance the connection by calling any one of: /// /// - [`DbConnection::frame_tick`]. -/// - [`DbConnection::run_threaded`]. -/// - [`DbConnection::run_background_task`]. +#[cfg_attr(not(target_arch = \"wasm32\"), doc = \"- [`DbConnection::run_threaded`].\")] +#[cfg_attr(target_arch = \"wasm32\", doc = \"- [`DbConnection::run_background_task`].\")] /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. -/// - [`DbConnection::advance_one_message_blocking`]. +#[cfg_attr(not(target_arch = \"wasm32\"), doc = \"- [`DbConnection::advance_one_message_blocking`].\")] /// - [`DbConnection::advance_one_message_async`]. /// /// Which of these methods you should call depends on the specific needs of your application, diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index cc455c154cb..f5dd60cec69 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1390,11 +1390,17 @@ impl __sdk::InModule for RemoteTables { /// You must explicitly advance the connection by calling any one of: /// /// - [`DbConnection::frame_tick`]. -/// - [`DbConnection::run_threaded`]. -/// - [`DbConnection::run_background_task`]. +#[cfg_attr(not(target_arch = "wasm32"), doc = "- [`DbConnection::run_threaded`].")] +#[cfg_attr( + target_arch = "wasm32", + doc = "- [`DbConnection::run_background_task`]." +)] /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. -/// - [`DbConnection::advance_one_message_blocking`]. +#[cfg_attr( + not(target_arch = "wasm32"), + doc = "- [`DbConnection::advance_one_message_blocking`]." +)] /// - [`DbConnection::advance_one_message_async`]. /// /// Which of these methods you should call depends on the specific needs of your application, From 26cd1b53e18eda124768da6e1dd97c32fbb96125 Mon Sep 17 00:00:00 2001 From: Matthew Pocock Date: Tue, 17 Feb 2026 13:30:18 +0000 Subject: [PATCH 20/20] Fix wasm32 SDK build against latest master - Update websocket API references: ServerMessage -> ws::v2::ServerMessage, ClientMessage -> ws::v2::ClientMessage, BIN_PROTOCOL -> ws::v2::BIN_PROTOCOL - Rename module_name -> database_name in wasm32 build_impl - Remove call_reducer_flags field (removed upstream) - Remove unused HashMap import - Gate sql-parser size assertions to 64-bit targets (struct sizes differ on wasm32 due to pointer width) - Update codegen snapshot --- .../tests/snapshots/codegen__codegen_rust.snap | 10 ++-------- crates/sql-parser/src/parser/mod.rs | 2 ++ sdks/rust/src/db_connection.rs | 12 ++++-------- sdks/rust/src/websocket.rs | 16 ++++++++-------- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index f5dd60cec69..985622fb758 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1391,16 +1391,10 @@ impl __sdk::InModule for RemoteTables { /// /// - [`DbConnection::frame_tick`]. #[cfg_attr(not(target_arch = "wasm32"), doc = "- [`DbConnection::run_threaded`].")] -#[cfg_attr( - target_arch = "wasm32", - doc = "- [`DbConnection::run_background_task`]." -)] +#[cfg_attr(target_arch = "wasm32", doc = "- [`DbConnection::run_background_task`].")] /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. -#[cfg_attr( - not(target_arch = "wasm32"), - doc = "- [`DbConnection::advance_one_message_blocking`]." -)] +#[cfg_attr(not(target_arch = "wasm32"), doc = "- [`DbConnection::advance_one_message_blocking`].")] /// - [`DbConnection::advance_one_message_async`]. /// /// Which of these methods you should call depends on the specific needs of your application, diff --git a/crates/sql-parser/src/parser/mod.rs b/crates/sql-parser/src/parser/mod.rs index 9e6e5642bda..d4061f1696c 100644 --- a/crates/sql-parser/src/parser/mod.rs +++ b/crates/sql-parser/src/parser/mod.rs @@ -209,7 +209,9 @@ pub(crate) fn parse_proj(expr: Expr) -> SqlParseResult { // These types determine the size of [`parse_expr`]'s stack frame. // Changing their sizes will require updating the recursion limit to avoid stack overflows. +#[cfg(target_pointer_width = "64")] const _: () = assert!(size_of::() == 168); +#[cfg(target_pointer_width = "64")] const _: () = assert!(size_of::>() == 40); /// Parse a scalar expression diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 5a3a5958382..c1fbe14afed 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -39,10 +39,7 @@ use http::Uri; use spacetimedb_client_api_messages::websocket::{self as ws, common::QuerySetId}; use spacetimedb_lib::{bsatn, ser::Serialize, ConnectionId, Identity, Timestamp}; use spacetimedb_sats::Deserialize; -use std::{ - collections::HashMap, - sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}, -}; +use std::sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}; #[cfg(not(feature = "web"))] use tokio::{ runtime::{self, Runtime}, @@ -946,7 +943,7 @@ but you must call one of them, or else the connection will never progress. let connection_id_override = get_connection_id_override(); let ws_connection = WsConnection::connect( self.uri.clone().unwrap(), - self.module_name.as_ref().unwrap(), + self.database_name.as_ref().unwrap(), self.token.as_deref(), connection_id_override, self.params, @@ -1108,7 +1105,6 @@ fn build_db_ctx_inner( on_connect: on_connect_cb, on_connect_error: on_connect_error_cb, on_disconnect: on_disconnect_cb, - call_reducer_flags: <_>::default(), procedure_callbacks: ProcedureCallbacks::default(), })) @@ -1119,7 +1115,7 @@ fn build_db_ctx( #[cfg(not(feature = "web"))] runtime_handle: runtime::Handle, inner_ctx: Arc>>, - raw_msg_send: mpsc::UnboundedSender>, + raw_msg_send: mpsc::UnboundedSender, parsed_msg_recv: SharedAsyncCell>>, pending_mutations_send: mpsc::UnboundedSender>, pending_mutations_recv: SharedAsyncCell>>, @@ -1233,7 +1229,7 @@ fn spawn_parse_loop( #[cfg(feature = "web")] fn spawn_parse_loop( - raw_message_recv: mpsc::UnboundedReceiver>, + raw_message_recv: mpsc::UnboundedReceiver, ) -> mpsc::UnboundedReceiver> { let (parsed_message_send, parsed_message_recv) = mpsc::unbounded(); wasm_bindgen_futures::spawn_local(parse_loop(raw_message_recv, parsed_message_send)); diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index fd20caa7269..80732af2dad 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -358,7 +358,7 @@ impl WsConnection { }; let uri = make_uri(host, db_name, connection_id, params, token.as_deref())?; - let sock = tokio_tungstenite_wasm::connect_with_protocols(&uri.to_string(), &[BIN_PROTOCOL]) + let sock = tokio_tungstenite_wasm::connect_with_protocols(&uri.to_string(), &[ws::v2::BIN_PROTOCOL]) .await .map_err(|source| WsError::Tungstenite { uri, @@ -537,8 +537,8 @@ impl WsConnection { pub(crate) fn spawn_message_loop( self, ) -> ( - mpsc::UnboundedReceiver>, - mpsc::UnboundedSender>, + mpsc::UnboundedReceiver, + mpsc::UnboundedSender, ) { let websocket_received = CLIENT_METRICS.websocket_received.with_label_values(&self.db_name); let websocket_received_msg_size = CLIENT_METRICS @@ -549,14 +549,14 @@ impl WsConnection { websocket_received_msg_size.observe(msg_size as f64); }; - let (outgoing_tx, outgoing_rx) = mpsc::unbounded::>(); - let (incoming_tx, incoming_rx) = mpsc::unbounded::>(); + let (outgoing_send, outgoing_recv) = mpsc::unbounded::(); + let (incoming_send, incoming_recv) = mpsc::unbounded::(); let (mut ws_writer, ws_reader) = self.sock.split(); wasm_bindgen_futures::spawn_local(async move { let mut incoming = ws_reader.fuse(); - let mut outgoing = outgoing_rx.fuse(); + let mut outgoing = outgoing_recv.fuse(); loop { futures::select! { @@ -571,7 +571,7 @@ impl WsConnection { record_metrics(bytes.len()); // parse + forward into `incoming_tx` match Self::parse_response(&bytes) { - Ok(msg) => if let Err(_e) = incoming_tx.unbounded_send(msg) { + Ok(msg) => if let Err(_e) = incoming_send.unbounded_send(msg) { gloo_console::warn!("Incoming receiver dropped."); break; }, @@ -625,6 +625,6 @@ impl WsConnection { } }); - (incoming_rx, outgoing_tx) + (incoming_recv, outgoing_send) } }