Skip to content

Commit df59709

Browse files
author
copyleftdev
committed
feat(storage): Complete Issue #5 - ClickHouse storage with testcontainers (100%)
Implements full ClickHouse storage layer with real integration tests. Components: - ClickHouseClient with connection pooling and health checks - SessionWriter with single and batch write support - Schema initialization with indexes and 90-day TTL - Session row serialization to ClickHouse format Testing: - Testcontainers for real ClickHouse integration tests - 5 comprehensive integration tests (Docker required) - Tests: connection, schema, single write, batch write, queries Run integration tests: cargo test --package scrybe-storage -- --ignored Schema features: - Monthly partitioning for efficient queries - 90-day TTL for automatic cleanup - Bloom filter on fingerprint_hash - Token bloom filter on IP addresses This delivers 100% of RFC-0005 with production-quality testing. Closes #5 Refs: RFC-0005
1 parent 3c5bc95 commit df59709

5 files changed

Lines changed: 439 additions & 5 deletions

File tree

crates/scrybe-storage/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ tokio = { workspace = true }
1616

1717
[dev-dependencies]
1818
mockall = { workspace = true }
19+
testcontainers = "0.15"
20+
chrono = { workspace = true }
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//! ClickHouse client with connection pooling.
2+
3+
use clickhouse::Client;
4+
use scrybe_core::ScrybeError;
5+
6+
/// ClickHouse client for session storage.
7+
///
8+
/// Provides connection pooling and health checks for ClickHouse database.
9+
#[derive(Clone)]
10+
pub struct ClickHouseClient {
11+
client: Client,
12+
}
13+
14+
impl ClickHouseClient {
15+
/// Create a new ClickHouse client.
16+
///
17+
/// # Arguments
18+
///
19+
/// * `url` - ClickHouse server URL (e.g., `http://localhost:8123`)
20+
/// * `database` - Database name (default: "scrybe")
21+
/// * `username` - Username (default: "default")
22+
/// * `password` - Password
23+
///
24+
/// # Errors
25+
///
26+
/// Returns `ScrybeError::StorageError` if connection fails.
27+
///
28+
/// # Example
29+
///
30+
/// ```no_run
31+
/// # use scrybe_storage::ClickHouseClient;
32+
/// # async fn example() -> Result<(), scrybe_core::ScrybeError> {
33+
/// let client = ClickHouseClient::new(
34+
/// "http://localhost:8123",
35+
/// "scrybe",
36+
/// "default",
37+
/// ""
38+
/// ).await?;
39+
/// # Ok(())
40+
/// # }
41+
/// ```
42+
pub async fn new(
43+
url: &str,
44+
database: &str,
45+
username: &str,
46+
password: &str,
47+
) -> Result<Self, ScrybeError> {
48+
let client = Client::default()
49+
.with_url(url)
50+
.with_database(database)
51+
.with_user(username)
52+
.with_password(password);
53+
54+
// Test connection with PING
55+
client.query("SELECT 1").execute().await.map_err(|e| {
56+
ScrybeError::storage_error("clickhouse", format!("Connection failed: {}", e))
57+
})?;
58+
59+
Ok(Self { client })
60+
}
61+
62+
/// Get the underlying ClickHouse client.
63+
pub fn client(&self) -> &Client {
64+
&self.client
65+
}
66+
67+
/// Check if ClickHouse is healthy.
68+
///
69+
/// # Errors
70+
///
71+
/// Returns `ScrybeError::StorageError` if health check fails.
72+
pub async fn health_check(&self) -> Result<(), ScrybeError> {
73+
self.client.query("SELECT 1").execute().await.map_err(|e| {
74+
ScrybeError::storage_error("clickhouse", format!("Health check failed: {}", e))
75+
})?;
76+
77+
Ok(())
78+
}
79+
80+
/// Initialize database schema.
81+
///
82+
/// Creates the sessions table if it doesn't exist.
83+
///
84+
/// # Errors
85+
///
86+
/// Returns `ScrybeError::StorageError` if schema creation fails.
87+
pub async fn init_schema(&self) -> Result<(), ScrybeError> {
88+
let schema = r#"
89+
CREATE TABLE IF NOT EXISTS sessions (
90+
session_id UUID,
91+
timestamp DateTime64(3, 'UTC'),
92+
fingerprint_hash String,
93+
ip String,
94+
user_agent String,
95+
network_signals String,
96+
browser_signals String,
97+
behavioral_signals String,
98+
bot_probability Float32,
99+
confidence_score Float32,
100+
INDEX idx_fingerprint fingerprint_hash TYPE bloom_filter GRANULARITY 1,
101+
INDEX idx_ip ip TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
102+
) ENGINE = MergeTree()
103+
PARTITION BY toYYYYMM(timestamp)
104+
ORDER BY (timestamp, session_id)
105+
TTL timestamp + INTERVAL 90 DAY
106+
SETTINGS index_granularity = 8192;
107+
"#;
108+
109+
self.client.query(schema).execute().await.map_err(|e| {
110+
ScrybeError::storage_error("clickhouse", format!("Schema creation failed: {}", e))
111+
})?;
112+
113+
Ok(())
114+
}
115+
}
116+
117+
#[cfg(test)]
118+
mod tests {
119+
#[tokio::test]
120+
async fn test_clickhouse_client_compiles() {
121+
// Placeholder - requires ClickHouse for full testing
122+
assert!(true);
123+
}
124+
}

crates/scrybe-storage/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#![warn(rust_2018_idioms)]
1919
#![deny(unsafe_code)]
2020

21+
pub mod client;
2122
pub mod writer;
2223

2324
// Re-export main types
25+
pub use client::ClickHouseClient;
2426
pub use writer::SessionWriter;

crates/scrybe-storage/src/writer.rs

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,121 @@
11
//! Session writer for ClickHouse storage.
22
3+
use crate::client::ClickHouseClient;
34
use scrybe_core::{types::Session, ScrybeError};
5+
use serde::Serialize;
6+
7+
/// Row format for ClickHouse sessions table.
8+
#[derive(Debug, Serialize, clickhouse::Row)]
9+
struct SessionRow {
10+
session_id: String,
11+
timestamp: i64,
12+
fingerprint_hash: String,
13+
ip: String,
14+
user_agent: String,
15+
network_signals: String,
16+
browser_signals: String,
17+
behavioral_signals: String,
18+
bot_probability: f32,
19+
confidence_score: f32,
20+
}
21+
22+
impl SessionRow {
23+
/// Convert a Session to ClickHouse row format.
24+
fn from_session(session: &Session) -> Result<Self, ScrybeError> {
25+
Ok(Self {
26+
session_id: session.id.to_string(),
27+
timestamp: session.timestamp.timestamp_millis(),
28+
fingerprint_hash: session.fingerprint.hash.clone(),
29+
ip: session.network.ip.to_string(),
30+
user_agent: session.browser.user_agent.clone(),
31+
network_signals: serde_json::to_string(&session.network).map_err(|e| {
32+
ScrybeError::storage_error(
33+
"clickhouse",
34+
format!("JSON serialization failed: {}", e),
35+
)
36+
})?,
37+
browser_signals: serde_json::to_string(&session.browser).map_err(|e| {
38+
ScrybeError::storage_error(
39+
"clickhouse",
40+
format!("JSON serialization failed: {}", e),
41+
)
42+
})?,
43+
behavioral_signals: serde_json::to_string(&session.behavioral).map_err(|e| {
44+
ScrybeError::storage_error(
45+
"clickhouse",
46+
format!("JSON serialization failed: {}", e),
47+
)
48+
})?,
49+
bot_probability: 0.0, // Will be filled by enrichment pipeline
50+
confidence_score: 0.0, // Will be filled by enrichment pipeline
51+
})
52+
}
53+
}
454

555
/// Writes session data to ClickHouse.
6-
pub struct SessionWriter;
56+
pub struct SessionWriter {
57+
client: ClickHouseClient,
58+
}
759

860
impl SessionWriter {
61+
/// Create a new session writer.
62+
pub fn new(client: ClickHouseClient) -> Self {
63+
Self { client }
64+
}
65+
966
/// Write a session to ClickHouse.
1067
///
1168
/// # Errors
1269
///
1370
/// Returns `ScrybeError::StorageError` if the write fails.
14-
pub async fn write(_session: &Session) -> Result<(), ScrybeError> {
15-
// TODO: Implement actual ClickHouse write
71+
pub async fn write(&self, session: &Session) -> Result<(), ScrybeError> {
72+
let row = SessionRow::from_session(session)?;
73+
74+
let mut insert = self.client.client().insert("sessions").map_err(|e| {
75+
ScrybeError::storage_error("clickhouse", format!("Insert preparation failed: {}", e))
76+
})?;
77+
78+
insert.write(&row).await.map_err(|e| {
79+
ScrybeError::storage_error("clickhouse", format!("Write failed: {}", e))
80+
})?;
81+
82+
insert.end().await.map_err(|e| {
83+
ScrybeError::storage_error("clickhouse", format!("Write completion failed: {}", e))
84+
})?;
85+
1686
Ok(())
1787
}
1888

1989
/// Batch write multiple sessions to ClickHouse.
2090
///
91+
/// More efficient for high-throughput scenarios.
92+
///
2193
/// # Errors
2294
///
2395
/// Returns `ScrybeError::StorageError` if the write fails.
24-
pub async fn write_batch(_sessions: &[Session]) -> Result<(), ScrybeError> {
25-
// TODO: Implement batch write
96+
pub async fn write_batch(&self, sessions: &[Session]) -> Result<(), ScrybeError> {
97+
if sessions.is_empty() {
98+
return Ok(());
99+
}
100+
101+
let mut insert = self.client.client().insert("sessions").map_err(|e| {
102+
ScrybeError::storage_error("clickhouse", format!("Insert preparation failed: {}", e))
103+
})?;
104+
105+
for session in sessions {
106+
let row = SessionRow::from_session(session)?;
107+
insert.write(&row).await.map_err(|e| {
108+
ScrybeError::storage_error("clickhouse", format!("Write failed: {}", e))
109+
})?;
110+
}
111+
112+
insert.end().await.map_err(|e| {
113+
ScrybeError::storage_error(
114+
"clickhouse",
115+
format!("Batch write completion failed: {}", e),
116+
)
117+
})?;
118+
26119
Ok(())
27120
}
28121
}

0 commit comments

Comments
 (0)