Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/synddb-client",
"crates/synddb-chain-monitor",
"crates/synddb-shared",
"crates/synddb-wal-sequencer",
"tests/e2e/runner",
"tests/e2e-gcs/runner",
]
Expand Down Expand Up @@ -66,6 +67,7 @@ base64 = "0.22"
zstd = "0.13"
sha2 = "0.10"
sha3 = "0.10"
zerocopy = "0.8.31"

# CBOR/COSE serialization
ciborium = "0.2"
Expand Down
1 change: 1 addition & 0 deletions crates/synddb-shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
pub mod parse;
pub mod runtime;
pub mod types;
pub mod utils;
61 changes: 61 additions & 0 deletions crates/synddb-shared/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use alloy::primitives::keccak256;
use std::{
fs::{self},
ops::Deref,
panic,
path::{Path, PathBuf},
thread,
time::{SystemTime, UNIX_EPOCH},
};

/// RAII handle to a temporary directory: the directory (and everything in it)
/// is removed recursively when this value is dropped.
#[derive(Debug)]
pub struct TmpDir(PathBuf);

impl Deref for TmpDir {
    type Target = Path;

    /// Borrow the wrapped directory as a `&Path`, so `TmpDir` can be used
    /// anywhere a path is expected (e.g. `dir.join("file")`).
    fn deref(&self) -> &Self::Target {
        self.0.as_path()
    }
}

impl Drop for TmpDir {
    fn drop(&mut self) {
        // Best-effort cleanup: errors (e.g. the directory was already removed,
        // or a file inside is still open on Windows) are deliberately ignored
        // so a failed cleanup never panics during unwinding.
        let _ = fs::remove_dir_all(&self.0);
    }
}

impl From<&TmpDir> for PathBuf {
    /// Copy the underlying path into an owned `PathBuf`.
    fn from(tmp: &TmpDir) -> Self {
        tmp.0.to_path_buf()
    }
}

/// Returns a unique temporary path for tests.
///
/// The path is constructed by:
/// 1. Getting the caller's source location (file and line)
/// 2. Appending the current timestamp in nanoseconds, process ID, and thread ID
/// 3. Hashing the combined string
/// 4. Creating a path in the system temp directory (or `dir`, when given) with
///    format `"{prefix}_{hash}"`
///
/// This ensures unique paths for concurrent tests by including both the test location,
/// process ID, and thread ID for debugging.
// `#[track_caller]` is required for `Location::caller()` to report the *call site*;
// without it every caller would get the same location (the line inside this function),
// defeating the per-test uniqueness documented above.
#[track_caller]
pub fn tmp_dir(prefix: &str, dir: Option<PathBuf>) -> TmpDir {
    let location = panic::Location::caller();
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_nanos();
    let thread_id = thread::current().id();
    let process_id = std::process::id();

    // Hash the discriminating inputs so the directory name stays short and
    // filesystem-safe regardless of how long the source path is.
    let input = format!("{location}:{timestamp}:{process_id}:{thread_id:?}");
    let hash = keccak256(input.as_bytes());
    let hash_hex = alloy::hex::encode(hash);

    let dir = dir
        .unwrap_or_else(std::env::temp_dir)
        .join(format!("{prefix}_{hash_hex}"));
    fs::create_dir_all(&dir).expect("failed to create temporary test directory");
    TmpDir(dir)
}
28 changes: 28 additions & 0 deletions crates/synddb-wal-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "synddb-wal-sequencer"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
repository.workspace = true

[[bin]]
name = "synddb-wal-sequencer"
path = "src/main.rs"

[lints]
workspace = true

[dependencies]
clap = { workspace = true, features = ["env"] }
humantime = { workspace = true }
rusqlite = { workspace = true, features = ["bundled", "session", "hooks"] }
serde = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
zerocopy = { workspace = true, features = ["derive"] }
synddb-shared = { path = "../synddb-shared/" }

[dev-dependencies]
tracing-test = "0.2"
3 changes: 3 additions & 0 deletions crates/synddb-wal-sequencer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod storage_layer;
pub mod wal_monitor;
pub mod wal_reader;
185 changes: 185 additions & 0 deletions crates/synddb-wal-sequencer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
use std::{path::PathBuf, thread, time::Duration};

use clap::Parser;
use synddb_wal_sequencer::{
storage_layer::{watch_and_sync_to_storage, StorageLayer},
wal_monitor::monitor_wal,
};
use tracing::info;

// clap `value_parser` adapter: parses human-readable durations such as
// "1s" or "500ms" via `humantime` (a named fn is needed for the attribute).
fn parse_duration(s: &str) -> Result<Duration, humantime::DurationError> {
    humantime::parse_duration(s)
}

// CLI / environment configuration for the WAL sequencer.
// NOTE: plain `//` comments are used on undocumented fields on purpose —
// `///` doc comments would become clap `--help` text and change CLI output.
#[derive(Debug, Clone, Parser)]
#[command(name = "synddb-wal-sequencer")]
#[command(about = "WAL sequencer for SQLite replication")]
pub struct Config {
    // path to the SQLite database file being monitored
    #[arg(long, env = "DB_PATH")]
    pub db_path: PathBuf,

    // local directory where checkpointed WAL segments are staged before upload
    #[arg(long, env = "WAL_BACKUPS_DIR")]
    pub wal_backups_dir: PathBuf,

    #[allow(clippy::doc_markdown)]
    /// the storage layer to be used. examples:
    /// - filesystem:/path/to/dir
    #[arg(long, env = "STORAGE_LAYER")]
    pub storage_layer: StorageLayer,

    /// interval between WAL checkpoints (e.g., "1s", "500ms")
    #[arg(long, env = "CHECKPOINT_INTERVAL", default_value = "1s", value_parser = parse_duration)]
    pub checkpoint_interval: Duration,

    /// interval between storage sync checks (e.g., "1s", "500ms")
    #[arg(long, env = "STORAGE_SYNC_INTERVAL", default_value = "1s", value_parser = parse_duration)]
    pub storage_sync_interval: Duration,
}

fn main() {
    tracing_subscriber::fmt::init();
    // Intentionally leak the parsed config to obtain a `&'static Config` that
    // the worker threads in `start` can borrow for the process lifetime.
    let config = Box::leak(Box::new(Config::parse()));
    start(config);
}

/// Spawns the WAL monitor and the storage-sync service, each on its own named
/// thread, then blocks until both exit (they normally run forever).
///
/// `config` must be `'static` because both threads borrow it for the whole
/// process lifetime (see the `Box::leak` in `main`).
fn start(config: &'static Config) {
    let wal_monitor_handle = thread::Builder::new()
        .name("wal_monitor".into())
        .spawn(move || {
            info!("starting WAL monitor");
            monitor_wal(
                &config.db_path,
                &config.wal_backups_dir,
                config.checkpoint_interval,
            );
        })
        .expect("failed to spawn wal_monitor thread");

    let storage_handle = thread::Builder::new()
        .name("storage_service".into())
        // `move` for consistency with the monitor thread above; `config` is
        // `&'static`, so moving the reference is free.
        .spawn(move || {
            info!("starting Storage service");
            watch_and_sync_to_storage(
                &config.wal_backups_dir,
                &config.storage_layer,
                config.storage_sync_interval,
            );
        })
        .expect("failed to spawn storage_service thread");

    wal_monitor_handle.join().expect("monitor thread panicked");
    // message previously said "uploader", but the thread is named storage_service
    storage_handle.join().expect("storage thread panicked");
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, path::Path, thread, time::Duration};

    use rusqlite::Connection;
    use synddb_shared::utils::tmp_dir;
    use synddb_wal_sequencer::{
        storage_layer::StorageLayer,
        wal_monitor::{JOURNAL_MODE, WAL, WAL_AUTOCHECKPOINT},
    };
    use tracing_test::traced_test;

    use crate::Config;

    /// Replays a single backed-up WAL segment into `db_path` by copying it
    /// next to the database and forcing a TRUNCATE checkpoint.
    fn apply_wal<P: AsRef<Path>>(wal_path: P, db_path: P) {
        let db_path = db_path.as_ref();
        let wal_dest = db_path.with_extension("db-wal");

        fs::copy(&wal_path, &wal_dest).expect("failed to copy WAL file");

        let conn = Connection::open(db_path).expect("failed to open db");
        conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")
            .expect("failed to checkpoint");
    }

    /// End-to-end: write rows into a WAL-mode database while the sequencer
    /// runs, then rebuild an identical database from the captured WAL files.
    #[test]
    #[traced_test]
    fn basic_functionality() {
        let db_dir = tmp_dir("db", None);
        let wal_backups_dir = tmp_dir("db_backup", None);
        let storage_dir = tmp_dir("storage", None);
        let new_db_dir = tmp_dir("new_db", None);

        let db_path = db_dir.join("test.db");

        // start monitor and storage sync in background threads; the config is
        // leaked because `start` requires `&'static Config`
        let config = Box::leak(Box::new(Config {
            db_path: db_path.clone(),
            wal_backups_dir: (&wal_backups_dir).into(),
            storage_layer: StorageLayer::Filesystem((&storage_dir).into()),
            checkpoint_interval: Duration::from_secs(1),
            storage_sync_interval: Duration::from_secs(1),
        }));
        let _handle = thread::spawn(move || start(config));

        thread::sleep(Duration::from_millis(20));

        // simulate an application writing to the DB; autocheckpoint is
        // disabled so the sequencer (not SQLite) drives checkpointing
        {
            let conn = Connection::open(&db_path).unwrap();
            conn.pragma_update(None, WAL_AUTOCHECKPOINT, 0).unwrap();

            conn.execute(
                "CREATE TABLE syndicate (id INTEGER PRIMARY KEY, value TEXT)",
                [],
            )
            .unwrap();

            for i in 0..10 {
                conn.execute(
                    "INSERT INTO syndicate (value) VALUES (?1)",
                    [format!("value_{i}")],
                )
                .unwrap();
            }
        }

        // poll storage until at least one WAL file lands (~5s timeout);
        // `break files` yields the non-empty listing so no throwaway initial
        // value is needed (the old `let mut v = vec![]` was never read)
        let mut attempts = 0u64;
        let mut wal_files: Vec<_> = loop {
            let files: Vec<_> = fs::read_dir(&*storage_dir)
                .unwrap()
                .filter_map(|e| e.ok())
                .map(|e| e.path())
                .collect();
            if !files.is_empty() {
                break files;
            }
            attempts += 1;
            assert!(attempts <= 50, "timeout waiting for WAL files in storage");
            thread::sleep(Duration::from_millis(100));
        };

        // sort so segments are replayed in the order they were produced
        wal_files.sort();

        // apply WAL files to new database
        let new_db_path = new_db_dir.join("test.db");
        let new_conn = Connection::open(&new_db_path).unwrap();
        new_conn.pragma_update(None, JOURNAL_MODE, WAL).unwrap();
        for wal_file in &wal_files {
            apply_wal(wal_file, &new_db_path);
        }

        // verify data consistency
        let mut stmt = new_conn
            .prepare("SELECT id, value FROM syndicate ORDER BY id")
            .unwrap();
        let rows: Vec<(i64, String)> = stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();

        assert_eq!(rows.len(), 10);
        for (i, (id, value)) in rows.iter().enumerate() {
            assert_eq!(*id, (i + 1) as i64);
            assert_eq!(*value, format!("value_{i}"));
        }
    }
}
Loading