diff --git a/.gitignore b/.gitignore index f137669..5c6d378 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ *~ .DS_Store config.toml +test_config.toml +cache/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a02830..601e46f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.1.2] - 2026-02-10 + +### Added +- **Optional nginx-like disk cache**: Persistent disk-based cache for media files + - New `disk_cache_enabled` configuration option (default: false) + - New `disk_cache_path` configuration option to specify cache directory (default: ./cache) + - New `disk_cache_max_size` configuration option to limit disk space usage (default: 1GB) + - Automatic LRU eviction when disk cache size limit is reached + - TTL-based expiration using file metadata + - Atomic file writes to prevent cache corruption + - Fallback chain: memory cache → disk cache → upstream fetch + - Cache statistics in `/metrics` endpoint now include disk cache metrics + +### Changed +- Bumped version from 0.1.1 to 0.1.2 + ## [0.1.1] - 2026-02-09 ### Fixed @@ -29,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Initial implementation -[Unreleased]: https://github.com/BlockG-ws/akkoproxy/compare/v0.1.1...HEAD +[Unreleased]: https://github.com/BlockG-ws/akkoproxy/compare/v0.1.2...HEAD +[0.1.2]: https://github.com/BlockG-ws/akkoproxy/compare/v0.1.1...v0.1.2 [0.1.1]: https://github.com/BlockG-ws/akkoproxy/compare/v0.1.0...v0.1.1 [0.1.0]: https://github.com/BlockG-ws/akkoproxy/releases/tag/v0.1.0 diff --git a/Cargo.lock b/Cargo.lock index 06c0412..705e001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,13 +19,14 @@ dependencies = [ [[package]] name = "akkoproxy" -version = "0.1.1" +version = "0.1.2" dependencies = [ "anyhow", "axum", "bytes", "clap", "futures", + "hex", "http-body-util", "hyper", "hyper-util", @@ -35,6 +36,9 @@ dependencies = [ "moka", "reqwest", "serde", + "serde_json", + "sha2", + "tempfile", "thiserror 1.0.69", "tokio", "toml 0.8.23", @@ -43,6 +47,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "walkdir", ] [[package]] @@ -354,6 +359,15 @@ dependencies = [ "core2", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "brotli" version = "8.0.2" @@ -581,6 +595,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -630,6 +653,26 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -673,6 +716,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "event-listener" version = "5.4.1" @@ -709,6 +762,12 @@ dependencies = [ "zune-inflate", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fax" version = "0.2.6" @@ -858,6 +917,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -953,6 +1022,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.4.0" @@ -1381,6 +1456,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "litemap" version = "0.8.1" @@ -2150,6 +2231,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.35" @@ -2197,6 +2291,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scan_fmt" version = "0.2.6" @@ -2294,6 +2397,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2461,6 +2575,19 @@ version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" +[[package]] +name = "tempfile" +version = "3.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -2819,6 +2946,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicode-ident" version = "1.0.22" @@ -2889,6 +3022,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3006,6 +3155,15 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 0f5be70..30a7073 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akkoproxy" -version = "0.1.1" +version = "0.1.2" edition = "2021" authors = ["Akkoproxy Contributors"] description = "A fast caching and optimization media proxy for Akkoma/Pleroma" @@ -25,6 +25,7 @@ libavif = { version = "0.12", optional = true } # Configuration serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" toml = "0.8" # Caching @@ -41,9 +42,15 @@ http-body-util = "0.1" url = "2.5" clap = { version = "4.5", features = ["derive"] } ipnetwork = "0.20" +sha2 = "0.10" +hex = "0.4" +walkdir = "2.4" [profile.release] opt-level = 3 lto = "thin" codegen-units = 1 strip = true + +[dev-dependencies] +tempfile = "3.8" diff --git a/README.md b/README.md index 6d8c9f6..4a0bd66 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ A fast caching and optimization media proxy for Akkoma/Pleroma, built in Rust. ## Features - **Caching Reverse Proxy**: Caches media and proxy requests to reduce load on upstream servers + - In-memory LRU cache for fast access + - Optional nginx-like disk cache for persistence across restarts - **Secure X-Forwarded Headers**: Opt-in forwarding of `X-Forwarded-Proto`, `X-Forwarded-For`, and `X-Forwarded-Host` headers with trusted proxy validation, ensuring compatibility with Akkoma's `force_ssl` configuration - **Header Preservation**: Preserves all upstream headers by default, including redirects (302) with Location headers - **Image Format Conversion**: Automatically converts images to modern formats (AVIF, WebP) based on client `Accept` headers @@ -176,11 +178,26 @@ This setup allows Cloudflare to cache different formats separately based on the ```toml [cache] -max_capacity = 10000 # Maximum number of cached items -ttl = 3600 # Cache TTL in seconds (1 hour) -max_item_size = 10485760 # Maximum cacheable item size (10MB) +max_capacity = 10000 # Maximum number of cached items in memory +ttl = 3600 # Cache TTL in seconds (1 hour) +max_item_size = 10485760 # Maximum cacheable item size (10MB) + +# Optional nginx-like disk cache for persistence across restarts +disk_cache_enabled = false # Enable disk-based cache (default: false) +disk_cache_path = "./cache" # Path to disk cache directory (default: ./cache) +disk_cache_max_size = 1073741824 # Maximum disk cache size in bytes (default: 1GB) ``` +**Disk Cache Features:** +- **Persistence**: Cached media survives server restarts +- **Larger capacity**: Cache more data than what fits in RAM +- **Nginx-like behavior**: Similar to nginx's `proxy_cache` functionality +- **Automatic cleanup**: LRU eviction when max size is reached +- **Atomic writes**: Prevents cache corruption from incomplete writes +- **TTL support**: Respects the configured TTL for expiration + +**Note**: The disk cache stores only the media content and content-type. Upstream headers are preserved only in the memory cache. When a disk cache hit is promoted to memory, it will not include the original upstream headers. + ### Image Processing Configuration ```toml diff --git a/config.example.toml b/config.example.toml index b36d92f..6416d8c 100644 --- a/config.example.toml +++ b/config.example.toml @@ -52,6 +52,16 @@ ttl = 3600 # Maximum size of a cached item in bytes (default: 10485760, 10MB) max_item_size = 10485760 +# Enable disk-based cache (default: false) +# When enabled, cached media will be stored on disk for persistence across restarts +disk_cache_enabled = false + +# Path to disk cache directory (default: ./cache) +disk_cache_path = "./cache" + +# Maximum disk cache size in bytes (default: 1073741824, 1GB) +disk_cache_max_size = 1073741824 + [image] # Enable AVIF conversion (default: true) enable_avif = true diff --git a/src/cache.rs b/src/cache.rs index 97ce869..eb5049b 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -3,6 +3,9 @@ use bytes::Bytes; use moka::future::Cache; use std::sync::Arc; use std::time::Duration; +use tracing::warn; + +use crate::disk_cache::DiskCache; /// Cache key for storing responses #[derive(Debug, Clone, Hash, Eq, PartialEq)] @@ -29,28 +32,119 @@ pub struct CachedResponse { #[derive(Clone)] pub struct ResponseCache { cache: Cache>, + disk_cache: Option, + max_item_size: u64, } impl ResponseCache { /// Create a new response cache - pub fn new(max_capacity: u64, ttl: Duration, _max_item_size: u64) -> Self { + pub fn new(max_capacity: u64, ttl: Duration, max_item_size: u64) -> Self { + let cache = Cache::builder() + .max_capacity(max_capacity) + .time_to_live(ttl) + .initial_capacity(100) + .weigher(|_key, value: &Arc| -> u32 { + // Use the size of the cached data as the weight + // Cap at u32::MAX to avoid overflow + value.data.len().min(u32::MAX as usize) as u32 + }) + .build(); + + Self { + cache, + disk_cache: None, + max_item_size, + } + } + + /// Create a new response cache with disk cache support + pub fn new_with_disk_cache( + max_capacity: u64, + ttl: Duration, + max_item_size: u64, + disk_cache: DiskCache + ) -> Self { let cache = Cache::builder() .max_capacity(max_capacity) .time_to_live(ttl) .initial_capacity(100) + .weigher(|_key, value: &Arc| -> u32 { + // Use the size of the cached data as the weight + // Cap at u32::MAX to avoid overflow + value.data.len().min(u32::MAX as usize) as u32 + }) .build(); - Self { cache } + Self { + cache, + disk_cache: Some(disk_cache), + max_item_size, + } } /// Get a cached response pub async fn get(&self, key: &CacheKey) -> Option> { - self.cache.get(key).await + // First, check memory cache + if let Some(cached) = self.cache.get(key).await { + return Some(cached); + } + + // If not in memory and disk cache is enabled, check disk + if let Some(disk_cache) = &self.disk_cache { + if let Some((data, content_type)) = disk_cache.get(key).await { + // Check if the item size exceeds the max_item_size limit + let item_size = data.len() as u64; + if item_size > self.max_item_size { + warn!( + "Disk cache item size ({} bytes) exceeds max_item_size ({} bytes), not promoting to memory", + item_size, self.max_item_size + ); + // Still return the data from disk, just don't promote to memory + let response = CachedResponse { + data, + content_type, + upstream_headers: None, + }; + return Some(Arc::new(response)); + } + + // Found in disk cache, promote to memory cache + let response = CachedResponse { + data, + content_type, + upstream_headers: None, // Note: disk cache doesn't store headers + }; + let arc_response = Arc::new(response); + self.cache.insert(key.clone(), arc_response.clone()).await; + return Some(arc_response); + } + } + + None } /// Store a response in the cache pub async fn put(&self, key: CacheKey, response: CachedResponse) { - self.cache.insert(key, Arc::new(response)).await; + // Check if the item size exceeds the max_item_size limit + let item_size = response.data.len() as u64; + if item_size > self.max_item_size { + warn!( + "Item size ({} bytes) exceeds max_item_size ({} bytes), skipping cache", + item_size, self.max_item_size + ); + return; + } + + // Store in memory cache + self.cache.insert(key.clone(), Arc::new(response.clone())).await; + + // If disk cache is enabled, store there too + if let Some(disk_cache) = &self.disk_cache { + // Store to disk cache and log any errors + if let Err(e) = disk_cache.put(&key, response.data, response.content_type).await { + warn!("Failed to write to disk cache: {}", e); + } + } } /// Get cache statistics @@ -58,6 +152,8 @@ impl ResponseCache { CacheStats { entry_count: self.cache.entry_count(), weighted_size: self.cache.weighted_size(), + disk_cache_enabled: self.disk_cache.is_some(), + disk_stats: self.disk_cache.as_ref().map(|dc| dc.stats()), } } } @@ -67,6 +163,8 @@ impl ResponseCache { pub struct CacheStats { pub entry_count: u64, pub weighted_size: u64, + pub disk_cache_enabled: bool, + pub disk_stats: Option, } #[cfg(test)] diff --git a/src/config.rs b/src/config.rs index 3b46c2b..92eb300 100644 --- a/src/config.rs +++ b/src/config.rs @@ -79,6 +79,18 @@ pub struct CacheConfig { /// Maximum size of a cached item in bytes #[serde(default = "default_max_item_size")] pub max_item_size: u64, + + /// Enable disk-based cache (default: false) + #[serde(default)] + pub disk_cache_enabled: bool, + + /// Path to disk cache directory (default: ./cache) + #[serde(default = "default_disk_cache_path")] + pub disk_cache_path: String, + + /// Maximum disk cache size in bytes (default: 1GB) + #[serde(default = "default_disk_cache_max_size")] + pub disk_cache_max_size: u64, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -139,6 +151,14 @@ fn default_max_dimension() -> u32 { 4096 } +fn default_disk_cache_path() -> String { + "./cache".to_string() +} + +fn default_disk_cache_max_size() -> u64 { + 1024 * 1024 * 1024 // 1GB +} + impl Default for ServerConfig { fn default() -> Self { Self { @@ -158,6 +178,9 @@ impl Default for CacheConfig { max_capacity: default_max_capacity(), ttl: default_ttl(), max_item_size: default_max_item_size(), + disk_cache_enabled: false, + disk_cache_path: default_disk_cache_path(), + disk_cache_max_size: default_disk_cache_max_size(), } } } diff --git a/src/disk_cache.rs b/src/disk_cache.rs new file mode 100644 index 0000000..12dbe85 --- /dev/null +++ b/src/disk_cache.rs @@ -0,0 +1,535 @@ +//! Disk-based cache for media files +//! +//! This module provides a persistent, nginx-like disk cache for media content. +//! The cache stores files using SHA-256 hashed keys and supports automatic +//! TTL-based expiration and LRU eviction. +//! +//! # Features +//! +//! - **Persistent storage**: Cached media survives server restarts +//! - **Automatic cleanup**: LRU eviction when disk space limit is reached +//! - **TTL support**: Respects configured TTL using file creation times +//! - **LRU eviction**: Tracks last access time separately from creation time +//! - **Atomic writes**: Uses temporary files to prevent corruption +//! - **Subdirectory structure**: Organizes files to avoid too many in one directory +//! +//! # Limitations +//! +//! The disk cache stores only: +//! - Media content (bytes) +//! - Content-Type header +//! - Timestamps (created_at, last_access) +//! +//! Other upstream headers are NOT stored in the disk cache. They are only +//! preserved in the memory cache. + +use anyhow::{Context, Result}; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, warn}; +use walkdir::WalkDir; + +use crate::cache::CacheKey; + +/// Metadata stored alongside cached content +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CacheMetadata { + content_type: String, + created_at: u64, + last_access: u64, +} + +/// Disk-based cache for media files +#[derive(Clone)] +pub struct DiskCache { + cache_dir: PathBuf, + max_size: u64, + ttl: u64, +} + +impl DiskCache { + /// Create a new disk cache + pub fn new>(cache_dir: P, max_size: u64, ttl: u64) -> Result { + let cache_dir = cache_dir.as_ref().to_path_buf(); + + // Create cache directory if it doesn't exist + if !cache_dir.exists() { + fs::create_dir_all(&cache_dir) + .with_context(|| format!("Failed to create cache directory: {:?}", cache_dir))?; + } + + Ok(Self { + cache_dir, + max_size, + ttl, + }) + } + + /// Generate a cache file path from a cache key + fn get_cache_path(&self, key: &CacheKey) -> PathBuf { + // Create a unique hash for the cache key + let cache_key_str = format!("{}:{}", key.path, key.format); + let mut hasher = Sha256::new(); + hasher.update(cache_key_str.as_bytes()); + let hash = hasher.finalize(); + let hash_str = hex::encode(hash); + + // Use first 2 chars for subdirectory to avoid too many files in one dir + let subdir = &hash_str[0..2]; + let filename = &hash_str[2..]; + + self.cache_dir.join(subdir).join(filename) + } + + /// Get a cached item from disk + pub async fn get(&self, key: &CacheKey) -> Option<(Bytes, String)> { + let cache_path = self.get_cache_path(key); + let metadata_path = cache_path.with_extension("meta"); + let ttl = self.ttl; + + // Use spawn_blocking for disk I/O + let result = tokio::task::spawn_blocking(move || { + // Check if both cache file and metadata exist + if !cache_path.exists() || !metadata_path.exists() { + return None; + } + + // Read and parse metadata + let metadata_str = match fs::read_to_string(&metadata_path) { + Ok(m) => m, + Err(e) => { + warn!("Failed to read metadata file {:?}: {}", metadata_path, e); + return None; + } + }; + + let mut metadata: CacheMetadata = match serde_json::from_str(&metadata_str) { + Ok(m) => m, + Err(e) => { + warn!("Failed to parse metadata file {:?}: {}", metadata_path, e); + return None; + } + }; + + // Check TTL based on created_at + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let age = now.saturating_sub(metadata.created_at); + + // Read the cached data + let data = match fs::read(&cache_path) { + Ok(data) => Bytes::from(data), + Err(e) => { + warn!("Failed to read cache file {:?}: {}", cache_path, e); + return None; + } + }; + + // Update last_access timestamp + metadata.last_access = now; + if let Ok(metadata_json) = serde_json::to_string(&metadata) { + let _ = fs::write(&metadata_path, metadata_json); + } + + debug!("Cache hit from disk: {:?}", cache_path); + Some((data, metadata.content_type, age, ttl)) + }) + .await + .ok() + .flatten(); + + // Check TTL after reading + if let Some((data, content_type, age, ttl)) = result { + if age > ttl { + // Cache entry expired, schedule removal + let cache_path = self.get_cache_path(key); + let metadata_path = cache_path.with_extension("meta"); + tokio::task::spawn_blocking(move || { + debug!("Cache entry expired, removing: {:?}", cache_path); + let _ = fs::remove_file(&cache_path); + let _ = fs::remove_file(&metadata_path); + }); + return None; + } + Some((data, content_type)) + } else { + None + } + } + + /// Store an item in the disk cache + pub async fn put(&self, key: &CacheKey, data: Bytes, content_type: String) -> Result<()> { + let cache_path = self.get_cache_path(key); + let metadata_path = cache_path.with_extension("meta"); + + // Use spawn_blocking for disk I/O + tokio::task::spawn_blocking(move || { + // Create subdirectory if it doesn't exist + if let Some(parent) = cache_path.parent() { + if !parent.exists() { + fs::create_dir_all(parent) + .with_context(|| format!("Failed to create cache subdirectory: {:?}", parent))?; + } + } + + // Create metadata with timestamps + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let metadata = CacheMetadata { + content_type, + created_at: now, + last_access: now, + }; + + // Write data to a temporary file first (atomic write) + // Use proper .tmp extension + let temp_path = { + let file_name = cache_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("data"); + cache_path + .parent() + .unwrap_or_else(|| Path::new("")) + .join(format!("{}.tmp", file_name)) + }; + fs::write(&temp_path, &data) + .with_context(|| format!("Failed to write cache file: {:?}", temp_path))?; + + // Write metadata to a temporary file + let temp_metadata_path = { + let file_name = metadata_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("meta"); + metadata_path + .parent() + .unwrap_or_else(|| Path::new("")) + .join(format!("{}.tmp", file_name)) + }; + let metadata_json = serde_json::to_string(&metadata) + .with_context(|| "Failed to serialize metadata")?; + fs::write(&temp_metadata_path, metadata_json) + .with_context(|| format!("Failed to write metadata file: {:?}", temp_metadata_path))?; + + // Rename to final location (atomic operation) + fs::rename(&temp_path, &cache_path) + .with_context(|| format!("Failed to rename cache file: {:?}", cache_path))?; + fs::rename(&temp_metadata_path, &metadata_path) + .with_context(|| format!("Failed to rename metadata file: {:?}", metadata_path))?; + + debug!("Cache written to disk: {:?}", cache_path); + Ok::<(), anyhow::Error>(()) + }) + .await + .with_context(|| "Spawn blocking task failed")??; + + // Check and enforce cache size limit + self.enforce_size_limit().await; + + Ok(()) + } + + /// Enforce the cache size limit by removing oldest files + async fn enforce_size_limit(&self) { + let cache_dir = self.cache_dir.clone(); + let max_size = self.max_size; + + tokio::task::spawn_blocking(move || { + let total_size = Self::calculate_total_size(&cache_dir); + + if total_size <= max_size { + return; + } + + debug!( + "Cache size ({} bytes) exceeds limit ({} bytes), cleaning up...", + total_size, max_size + ); + + // Collect all cache files with their last access times + let mut files: Vec<(PathBuf, PathBuf, u64)> = Vec::new(); + + for entry in WalkDir::new(&cache_dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + // Only include actual cache data files (not .meta or .tmp) + e.path().extension().is_none() + }) + { + let data_path = entry.path().to_path_buf(); + let meta_path = data_path.with_extension("meta"); + + // Read metadata to get last_access timestamp + if let Ok(meta_str) = fs::read_to_string(&meta_path) { + if let Ok(metadata) = serde_json::from_str::(&meta_str) { + files.push((data_path, meta_path, metadata.last_access)); + } + } + } + + // Sort by last access time (oldest first) - this is true LRU + files.sort_by_key(|(_, _, last_access)| *last_access); + + // Remove files until we're under the limit + let mut current_size = total_size; + for (data_path, meta_path, _) in files { + if current_size <= max_size { + break; + } + + // Calculate size of both data and metadata files + let mut pair_size = 0u64; + if let Ok(metadata) = fs::metadata(&data_path) { + pair_size += metadata.len(); + } + if let Ok(metadata) = fs::metadata(&meta_path) { + pair_size += metadata.len(); + } + + // Remove both cache file and its metadata + if let Err(e) = fs::remove_file(&data_path) { + error!("Failed to remove cache file {:?}: {}", data_path, e); + } else { + debug!("Removed cache file: {:?}", data_path); + } + + if let Err(e) = fs::remove_file(&meta_path) { + error!("Failed to remove metadata file {:?}: {}", meta_path, e); + } + + current_size = current_size.saturating_sub(pair_size); + } + + debug!("Cache cleanup complete. New size: {} bytes", current_size); + }) + .await + .ok(); + } + + /// Calculate the total size of the cache directory + fn calculate_total_size(cache_dir: &Path) -> u64 { + WalkDir::new(cache_dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + // Only count cache files and their metadata, not temp files + let ext = e.path().extension().and_then(|s| s.to_str()); + ext.is_none() || ext == Some("meta") + }) + .filter_map(|e| e.metadata().ok()) + .map(|m| m.len()) + .sum() + } + + /// Get the total size of the cache directory + fn get_total_size(&self) -> u64 { + Self::calculate_total_size(&self.cache_dir) + } + + /// Get cache statistics + pub fn stats(&self) -> DiskCacheStats { + let mut file_count = 0u64; + let total_size = self.get_total_size(); + + for _entry in WalkDir::new(&self.cache_dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + // Only count actual cache data files + e.path().extension().is_none() + }) + { + file_count += 1; + } + + DiskCacheStats { + file_count, + total_size, + max_size: self.max_size, + } + } +} + +/// Disk cache statistics +#[derive(Debug, Clone)] +pub struct DiskCacheStats { + pub file_count: u64, + pub total_size: u64, + pub max_size: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_disk_cache_put_and_get() { + let temp_dir = TempDir::new().unwrap(); + let cache = DiskCache::new(temp_dir.path(), 1024 * 1024, 3600).unwrap(); + + let key = CacheKey::new("/media/test.jpg".to_string(), "avif".to_string()); + let data = Bytes::from("test data"); + let content_type = "image/avif".to_string(); + + cache.put(&key, data.clone(), content_type.clone()).await.unwrap(); + + let result = cache.get(&key).await; + assert!(result.is_some()); + let (cached_data, cached_content_type) = result.unwrap(); + assert_eq!(cached_data, data); + assert_eq!(cached_content_type, content_type); + } + + #[tokio::test] + async fn test_disk_cache_miss() { + let temp_dir = TempDir::new().unwrap(); + let cache = DiskCache::new(temp_dir.path(), 1024 * 1024, 3600).unwrap(); + + let key = CacheKey::new("/media/nonexistent.jpg".to_string(), "webp".to_string()); + let result = cache.get(&key).await; + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_disk_cache_ttl() { + let temp_dir = TempDir::new().unwrap(); + // Create cache with 1 second TTL + let cache = DiskCache::new(temp_dir.path(), 1024 * 1024, 1).unwrap(); + + let key = CacheKey::new("/media/test.jpg".to_string(), "avif".to_string()); + let data = Bytes::from("test data"); + let content_type = "image/avif".to_string(); + + cache.put(&key, data.clone(), content_type.clone()).await.unwrap(); + + // Should be in cache immediately + let result = cache.get(&key).await; + assert!(result.is_some()); + + // Wait for TTL to expire + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Should be gone after TTL + let result = cache.get(&key).await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_disk_cache_eviction() { + let temp_dir = TempDir::new().unwrap(); + // Create cache with small max size (2KB to allow for metadata overhead) + let cache = DiskCache::new(temp_dir.path(), 2048, 3600).unwrap(); + + // Add multiple entries that exceed the limit + let data = Bytes::from(vec![0u8; 512]); // 512 bytes each + + let key1 = CacheKey::new("/media/test1.jpg".to_string(), "avif".to_string()); + cache.put(&key1, data.clone(), "image/avif".to_string()).await.unwrap(); + + // Small delay to ensure different timestamps + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let key2 = CacheKey::new("/media/test2.jpg".to_string(), "avif".to_string()); + cache.put(&key2, data.clone(), "image/avif".to_string()).await.unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let key3 = CacheKey::new("/media/test3.jpg".to_string(), "avif".to_string()); + cache.put(&key3, data.clone(), "image/avif".to_string()).await.unwrap(); + + // Wait for cleanup to complete + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // At least one of the older entries should have been evicted + let stats = cache.stats(); + assert!(stats.total_size <= cache.max_size, + "Cache size {} should be <= max size {}", stats.total_size, cache.max_size); + + // The newest entry (key3) should still be there + assert!(cache.get(&key3).await.is_some(), "Most recent entry should be retained"); + } + + #[tokio::test] + async fn test_disk_cache_ignores_temp_files() { + let temp_dir = TempDir::new().unwrap(); + let cache = DiskCache::new(temp_dir.path(), 1024 * 1024, 3600).unwrap(); + + // Add a real cache entry + let key = CacheKey::new("/media/test.jpg".to_string(), "avif".to_string()); + let data = Bytes::from("test data"); + cache.put(&key, data.clone(), "image/avif".to_string()).await.unwrap(); + + // Manually create a temp file that should be ignored + let cache_path = cache.get_cache_path(&key); + if let Some(parent) = cache_path.parent() { + let temp_file = parent.join("orphaned_file.tmp"); + fs::write(&temp_file, "orphaned data").unwrap(); + } + + // Stats should only count the real cache entry, not the temp file + let stats = cache.stats(); + assert_eq!(stats.file_count, 1, "Should only count actual cache files"); + + // Total size should not include the temp file + // (it should be close to the size of our test data + metadata) + assert!(stats.total_size < 1024, "Size should be small without temp file"); + } + + #[tokio::test] + async fn test_disk_cache_lru_access_tracking() { + let temp_dir = TempDir::new().unwrap(); + let cache = DiskCache::new(temp_dir.path(), 2048, 3600).unwrap(); + + let data = Bytes::from(vec![0u8; 512]); + + // Add three entries + let key1 = CacheKey::new("/media/test1.jpg".to_string(), "avif".to_string()); + cache.put(&key1, data.clone(), "image/avif".to_string()).await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let key2 = CacheKey::new("/media/test2.jpg".to_string(), "avif".to_string()); + cache.put(&key2, data.clone(), "image/avif".to_string()).await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let key3 = CacheKey::new("/media/test3.jpg".to_string(), "avif".to_string()); + cache.put(&key3, data.clone(), "image/avif".to_string()).await.unwrap(); + + // Access key1 to update its last_access time + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = cache.get(&key1).await; + + // Add one more entry to trigger eviction + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let key4 = CacheKey::new("/media/test4.jpg".to_string(), "avif".to_string()); + cache.put(&key4, data.clone(), "image/avif".to_string()).await.unwrap(); + + // Wait for cleanup + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // key2 should be evicted (least recently accessed, not counting the initial write) + // key1 should still be there (was accessed more recently) + let key1_exists = cache.get(&key1).await.is_some(); + let key2_exists = cache.get(&key2).await.is_some(); + + assert!(key1_exists || !key2_exists, + "LRU should prefer recently accessed items: key1={}, key2={}", + key1_exists, key2_exists); + } +} diff --git a/src/main.rs b/src/main.rs index 00fc975..342e9d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod cache; mod config; +mod disk_cache; mod image; mod proxy; diff --git a/src/proxy.rs b/src/proxy.rs index ef9e95e..f6717e3 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -149,11 +149,41 @@ impl AppState { config.server.bind, config.upstream.url ); - let cache = ResponseCache::new( - config.cache.max_capacity, - Duration::from_secs(config.cache.ttl), - config.cache.max_item_size, - ); + // Initialize cache with optional disk cache support + let cache = if config.cache.disk_cache_enabled { + match crate::disk_cache::DiskCache::new( + &config.cache.disk_cache_path, + config.cache.disk_cache_max_size, + config.cache.ttl, + ) { + Ok(disk_cache) => { + info!( + "Disk cache enabled: path={}, max_size={} bytes", + config.cache.disk_cache_path, config.cache.disk_cache_max_size + ); + ResponseCache::new_with_disk_cache( + config.cache.max_capacity, + Duration::from_secs(config.cache.ttl), + config.cache.max_item_size, + disk_cache, + ) + } + Err(e) => { + warn!("Failed to initialize disk cache: {}. Using memory-only cache.", e); + ResponseCache::new( + config.cache.max_capacity, + Duration::from_secs(config.cache.ttl), + config.cache.max_item_size, + ) + } + } + } else { + ResponseCache::new( + config.cache.max_capacity, + Duration::from_secs(config.cache.ttl), + config.cache.max_item_size, + ) + }; debug!( "Cache initialized: max_capacity={}, ttl={}s, max_item_size={} bytes", config.cache.max_capacity, config.cache.ttl, config.cache.max_item_size @@ -590,11 +620,22 @@ pub async fn health_handler() -> impl IntoResponse { /// Metrics handler pub async fn metrics_handler(State(state): State) -> impl IntoResponse { let stats = state.cache.stats(); - let body = format!( - "# Cache Statistics\ncache_entries {}\ncache_size_bytes {}\n", - stats.entry_count, stats.weighted_size + let mut body = format!( + "# Cache Statistics\ncache_entries {}\ncache_size_bytes {}\ndisk_cache_enabled {}\n", + stats.entry_count, + stats.weighted_size, + if stats.disk_cache_enabled { 1 } else { 0 } ); + if let Some(disk_stats) = stats.disk_stats { + body.push_str(&format!( + "disk_cache_files {}\ndisk_cache_size_bytes {}\ndisk_cache_max_size_bytes {}\n", + disk_stats.file_count, + disk_stats.total_size, + disk_stats.max_size + )); + } + ( StatusCode::OK, [(header::CONTENT_TYPE, "text/plain; version=0.0.4")],