From a9220c9f1e090912b58dbc38b3a73259d98c3f95 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Feb 2026 13:44:18 -0500 Subject: [PATCH 01/12] feat: Add Snappy support Implement the Snappy framing format as a new compression algorithm, following the same pattern as the existing codecs (zstd, lz4, etc.). The encoder produces Snappy frames with stream identifiers, compressed/uncompressed chunks (falling back to uncompressed when compression ratio is poor), and masked CRC32C checksums. The decoder is a state machine that parses frame headers, buffers chunk data, and verifies checksums before emitting decompressed output. New dependencies: `snap` for raw Snappy compression and `crc32c` for checksum computation. Both are optional behind the `snappy` feature flag. --- crates/async-compression/Cargo.toml | 8 + crates/async-compression/src/macros.rs | 14 ++ crates/async-compression/tests/proptest.rs | 3 + crates/async-compression/tests/snappy.rs | 4 + crates/async-compression/tests/utils/algos.rs | 27 +++ crates/compression-codecs/Cargo.toml | 4 + crates/compression-codecs/src/lib.rs | 4 + .../compression-codecs/src/snappy/decoder.rs | 203 ++++++++++++++++++ .../compression-codecs/src/snappy/encoder.rs | 144 +++++++++++++ crates/compression-codecs/src/snappy/mod.rs | 96 +++++++++ 10 files changed, 507 insertions(+) create mode 100644 crates/async-compression/tests/snappy.rs create mode 100644 crates/compression-codecs/src/snappy/decoder.rs create mode 100644 crates/compression-codecs/src/snappy/encoder.rs create mode 100644 crates/compression-codecs/src/snappy/mod.rs diff --git a/crates/async-compression/Cargo.toml b/crates/async-compression/Cargo.toml index 6e78e174..61a4ff48 100644 --- a/crates/async-compression/Cargo.toml +++ b/crates/async-compression/Cargo.toml @@ -30,6 +30,7 @@ all-algorithms = [ "xz-parallel", "zlib", "zstd", + "snappy", ] # algorithms @@ -46,6 +47,7 @@ xz2 = ["compression-codecs/xz2", "xz"] zlib = ["compression-codecs/zlib"] zstd = ["compression-codecs/zstd"] zstdmt = ["compression-codecs/zstdmt", "zstd"] +snappy = ["compression-codecs/snappy"] [dependencies] @@ -72,6 +74,7 @@ tokio = { version = "1.38.2", default-features = false, features = [ "macros", "rt-multi-thread", "io-std", + "fs" ] } tokio-util = { version = "0.7", default-features = false, features = ["io"] } @@ -83,6 +86,7 @@ lz4 = "1.28.1" liblzma = "0.4.2" zstd-safe = { version = "7", default-features = false } deflate64 = "0.1.5" +snap = "1" [lints] workspace = true @@ -131,6 +135,10 @@ required-features = ["zstd", "tokio"] name = "zstd-window-size" required-features = ["zstd", "tokio"] +[[test]] +name = "snappy" +required-features = ["snappy"] + [[example]] name = "zlib_tokio_write" required-features = ["zlib", "tokio"] diff --git a/crates/async-compression/src/macros.rs b/crates/async-compression/src/macros.rs index 75af23cc..9eb72ea6 100644 --- a/crates/async-compression/src/macros.rs +++ b/crates/async-compression/src/macros.rs @@ -375,5 +375,19 @@ macro_rules! algos { { @dec } ); + algos!(@algo snappy ["snappy"] SnappyDecoder SnappyEncoder <$inner> + { @enc + + pub fn with_quality(inner: $inner, _level: crate::core::Level) -> Self { + Self { + inner: crate::$($mod::)+generic::Encoder::new( + inner, + crate::codecs::SnappyEncoder::new() + ), + } + } + } + { @dec } + ); } } diff --git a/crates/async-compression/tests/proptest.rs b/crates/async-compression/tests/proptest.rs index ba66b1f7..215fc02b 100644 --- a/crates/async-compression/tests/proptest.rs +++ b/crates/async-compression/tests/proptest.rs @@ -142,4 +142,7 @@ mod proptest { #[cfg(feature = "zstd")] tests!(zstd); + + #[cfg(feature = "snappy")] + tests!(snappy); } diff --git a/crates/async-compression/tests/snappy.rs b/crates/async-compression/tests/snappy.rs new file mode 100644 index 00000000..a0d9b8d0 --- /dev/null +++ b/crates/async-compression/tests/snappy.rs @@ -0,0 +1,4 @@ +#[macro_use] +mod utils; + +test_cases!(snappy); diff --git a/crates/async-compression/tests/utils/algos.rs b/crates/async-compression/tests/utils/algos.rs index 634c47dd..4c4f0345 100644 --- a/crates/async-compression/tests/utils/algos.rs +++ b/crates/async-compression/tests/utils/algos.rs @@ -229,6 +229,33 @@ algos! { } } } + + pub mod snappy("snappy", SnappyEncoder, SnappyDecoder) { + pub mod sync { + pub use crate::utils::impls::sync::to_vec; + + pub fn compress(bytes: &[u8]) -> Vec { + if bytes.is_empty() { + return vec![0xff, 0x06, 0x00, 0x00, b's', b'N', b'a', b'P', b'p', b'Y']; + } + + use std::io::Write; + use snap::write::FrameEncoder; + + let mut output = Vec::new(); + { + let mut encoder = FrameEncoder::new(&mut output); + encoder.write_all(bytes).unwrap(); + } + output + } + + pub fn decompress(bytes: &[u8]) -> Vec { + use snap::read::FrameDecoder; + to_vec(FrameDecoder::new(bytes)) + } + } + } } macro_rules! io_algo_parallel { diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index bd01d5ba..3638e7ce 100644 --- a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -28,6 +28,7 @@ all-algorithms = [ "zlib", "zstd", "deflate64", + "snappy" ] # algorithms @@ -42,6 +43,7 @@ zlib = ["flate2"] zstd = ["libzstd", "zstd-safe"] zstdmt = ["zstd", "zstd-safe/zstdmt"] deflate64 = ["dep:deflate64"] +snappy = ["snap", "crc32c"] [dependencies] # Workspace dependencies. @@ -56,6 +58,8 @@ lz4 = { version = "1.28.1", optional = true } liblzma = { version = "0.4.5", optional = true } memchr = { version = "2", optional = true } zstd-safe = { version = "7", optional = true, default-features = false } +snap = { version = "1", optional = true, default-features = false } +crc32c = { version = "0.6.8", optional = true, default-features = false } [lints] workspace = true diff --git a/crates/compression-codecs/src/lib.rs b/crates/compression-codecs/src/lib.rs index 32c186a9..1281175f 100644 --- a/crates/compression-codecs/src/lib.rs +++ b/crates/compression-codecs/src/lib.rs @@ -30,6 +30,8 @@ pub mod xz2; pub mod zlib; #[cfg(feature = "zstd")] pub mod zstd; +#[cfg(feature = "snappy")] +pub mod snappy; use compression_core::util::{PartialBuffer, WriteBuffer}; @@ -57,6 +59,8 @@ pub use self::xz2::{Xz2Decoder, Xz2Encoder, Xz2FileFormat}; pub use self::zlib::{ZlibDecoder, ZlibEncoder}; #[cfg(feature = "zstd")] pub use self::zstd::{ZstdDecoder, ZstdEncoder}; +#[cfg(feature = "snappy")] +pub use self::snappy::{SnappyDecoder, SnappyEncoder}; fn forward_output( output: &mut PartialBuffer + AsMut<[u8]>>, diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs new file mode 100644 index 00000000..643903aa --- /dev/null +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -0,0 +1,203 @@ +use crate::snappy::{mask_crc, ChunkType, FrameHeader}; +use crate::DecodeV2; +use compression_core::util::{PartialBuffer, WriteBuffer}; +use std::convert::TryInto; +use std::{io, mem}; + +#[derive(Debug, Default)] +pub struct SnappyDecoder { + state: State, +} + +impl SnappyDecoder { + pub fn new() -> Self { + Self::default() + } +} + +fn decode_chunk(chunk_type: ChunkType, mut buffer: Vec) -> std::io::Result> { + let data = buffer.split_off(4); + + let expected_sum: [u8; 4] = buffer + .try_into() + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?; + let expected_sum = u32::from_le_bytes(expected_sum); + + let output = match chunk_type { + ChunkType::Compressed => { + let uncompress_length = snap::raw::decompress_len(&data)?; + let mut out_buf = vec![0; uncompress_length]; + let mut decoder = snap::raw::Decoder::new(); + decoder.decompress(&data, &mut out_buf)?; + out_buf + } + ChunkType::Uncompressed => data, + _ => unreachable!( + "can only decode compressed or uncompressed chunks, not {:?}", + chunk_type + ), + }; + + let got_sum = crc32c::crc32c(&output); + let got_sum = mask_crc(got_sum); + if expected_sum != got_sum { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "checksum mismatch", + )); + } + + Ok(output) +} + +#[derive(Debug)] +enum State { + StreamIdentifier(PartialBuffer<[u8; 4]>), + ChunkHeader(PartialBuffer<[u8; 4]>), + Skipping(usize), + Buffering { + remaining: usize, + chunk_type: ChunkType, + buffer: Vec, + }, + Sending(PartialBuffer>), +} + +impl Default for State { + fn default() -> Self { + State::StreamIdentifier(PartialBuffer::new([0; 4])) + } +} + +impl DecodeV2 for SnappyDecoder { + fn reinit(&mut self) -> std::io::Result<()> { + *self = Self::new(); + Ok(()) + } + + fn decode( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> std::io::Result { + loop { + match &mut self.state { + State::StreamIdentifier(header) => { + header.copy_unwritten_from(input); + if !header.unwritten().is_empty() { + return Ok(false); + } + + let header = FrameHeader::parse(header.written())?; + if let ChunkType::Stream = header.chunk_type { + self.state = State::Skipping(header.data_frame_length as usize) + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid chunk type, expected Stream, got: {:?}", + header.chunk_type + ), + )); + } + } + State::ChunkHeader(header) => { + header.copy_unwritten_from(input); + if !header.unwritten().is_empty() { + return Ok(false); + } + + let header = FrameHeader::parse(header.written())?; + + let data_frame_length = header.data_frame_length as usize; + + match header.chunk_type { + ChunkType::Stream + | ChunkType::ReservedSkippable(_) + | ChunkType::Padding => self.state = State::Skipping(data_frame_length), + ChunkType::Compressed | ChunkType::Uncompressed => { + self.state = State::Buffering { + remaining: data_frame_length, + chunk_type: header.chunk_type, + buffer: Vec::with_capacity(data_frame_length), + } + } + ChunkType::ReservedUnskippable(chunk_type) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Reserved unskippable chunk type encountered: {}", + chunk_type + ), + )) + } + } + } + State::Skipping(n) => { + let input_len = input.unwritten().len(); + if input_len < *n { + input.advance(input_len); + *n -= input_len; + return Ok(false); + } + input.advance(*n); + self.state = State::ChunkHeader([0u8; 4].into()) + } + State::Buffering { + remaining, + chunk_type, + buffer, + } => { + let input_buf = input.unwritten(); + let boundary = (*remaining).min(input_buf.len()); + let input_buf = &input_buf[..boundary]; + + *remaining -= input_buf.len(); + + buffer.extend_from_slice(input_buf); + input.advance(input_buf.len()); + + if *remaining != 0 { + return Ok(false); + } + + // We're done buffering, so let's decode the chunk + let chunk_type = *chunk_type; + let buffer = mem::take(buffer); + let output = decode_chunk(chunk_type, buffer)?; + self.state = State::Sending(PartialBuffer::new(output)) + } + State::Sending(buffer) => { + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()) + } else { + return Ok(false); + } + } + } + } + } + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result { + match &mut self.state { + State::Sending(buffer) => { + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()); + Ok(true) + } else { + Ok(false) + } + } + _ => Ok(true), + } + } + + fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> std::io::Result { + match &mut self.state { + State::ChunkHeader(header) if header.unwritten().len() == 4 => Ok(true), + _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), + } + } +} diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs new file mode 100644 index 00000000..d65eb78f --- /dev/null +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -0,0 +1,144 @@ +use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, STREAM_FRAME}; +use crate::EncodeV2; +use compression_core::util::{PartialBuffer, WriteBuffer}; + +const MAX_BLOCK_SIZE: usize = 1 << 16; + +#[derive(Debug)] +pub struct SnappyEncoder { + state: State, + chunk: Vec, +} + +impl Default for SnappyEncoder { + fn default() -> Self { + Self { + state: State::InitStream(PartialBuffer::new(STREAM_FRAME)), + chunk: Vec::with_capacity(MAX_BLOCK_SIZE), + } + } +} + +impl SnappyEncoder { + pub fn new() -> Self { + Self::default() + } +} + +#[derive(Debug)] +enum State { + InitStream(PartialBuffer<&'static [u8]>), + Buffering, + Writing(PartialBuffer>), +} + +fn compress_frame(buffer: &[u8]) -> std::io::Result> { + let checksum = crc32c_masked(&buffer); + + let mut encoder = snap::raw::Encoder::new(); + let compress_data = encoder.compress_vec(&buffer)?; + let (chunk_type, data) = if compress_data.len() >= buffer.len() - (buffer.len() / 8) { + (ChunkType::Uncompressed, buffer) + } else { + (ChunkType::Compressed, compress_data.as_slice()) + }; + + // We add 4 because the length includes the 4 bytes of the checksum. + let chunk_len = data.len() + 4; + let header = FrameHeader { + chunk_type, + data_frame_length: chunk_len as u64, + }; + + let mut frame = Vec::with_capacity(data.len() + 8); + let raw_header: [u8; 4] = header.into(); + let raw_checksum: [u8; 4] = checksum.to_le_bytes(); + + frame.extend_from_slice(&raw_header); + frame.extend_from_slice(&raw_checksum); + frame.extend_from_slice(&data); + + Ok(frame) +} + +impl EncodeV2 for SnappyEncoder { + fn encode( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> std::io::Result<()> { + loop { + match &mut self.state { + State::InitStream(buffer) => { + if buffer.unwritten().len() > 0 { + output.copy_unwritten_from(buffer); + if output.has_no_spare_space() { + return Ok(()); + } + } + self.state = State::Buffering + } + State::Buffering => { + let buffer = &mut self.chunk; + let input_buf = input.unwritten(); + let available = MAX_BLOCK_SIZE - buffer.len(); + let boundary = available.min(input_buf.len()); + let input_buf = &input_buf[..boundary]; + + buffer.extend_from_slice(input_buf); + input.advance(input_buf.len()); + + if buffer.len() < MAX_BLOCK_SIZE { + return Ok(()); + } + + let compressed_frame = compress_frame(buffer)?; + buffer.clear(); + self.state = State::Writing(compressed_frame.into()) + } + State::Writing(buffer) => { + if buffer.unwritten().len() > 0 { + output.copy_unwritten_from(buffer); + return Ok(()); + } else { + self.state = State::Buffering + } + } + } + } + } + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result { + loop { + match &mut self.state { + State::InitStream(buffer) => { + if buffer.unwritten().len() > 0 { + output.copy_unwritten_from(buffer); + if output.has_no_spare_space() { + return Ok(false); + } + } + self.state = State::Buffering + } + State::Buffering => { + let buffer = &mut self.chunk; + let compressed_data = compress_frame(&buffer)?; + buffer.clear(); + self.state = State::Writing(compressed_data.into()) + } + State::Writing(buffer) => { + return if buffer.unwritten().len() > 0 { + output.copy_unwritten_from(buffer); + Ok(false) + } else { + Ok(true) + } + } + } + } + } + + fn finish(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result { + self.flush(output) + } +} diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs new file mode 100644 index 00000000..ba19c30d --- /dev/null +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -0,0 +1,96 @@ +mod decoder; +mod encoder; + +pub use self::decoder::SnappyDecoder; +pub use self::encoder::SnappyEncoder; + +use std::io; + +const STREAM_FRAME: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; + +#[derive(Debug, Copy, Clone)] +struct FrameHeader { + chunk_type: ChunkType, + data_frame_length: u64, +} + +#[derive(Debug, Copy, Clone)] +enum ChunkType { + Stream, + Compressed, + Uncompressed, + Padding, + ReservedUnskippable(u8), + ReservedSkippable(u8), +} + +impl From for ChunkType { + fn from(value: u8) -> Self { + match value { + 0xFF => Self::Stream, + 0x00 => Self::Compressed, + 0x01 => Self::Uncompressed, + 0xFE => Self::Padding, + 0x02..=0x7f => Self::ReservedUnskippable(value), + 0x80..=0xFD => Self::ReservedSkippable(value), + } + } +} + +impl From for u8 { + fn from(value: ChunkType) -> Self { + match value { + ChunkType::Stream => 0xFF, + ChunkType::Compressed => 0x00, + ChunkType::Uncompressed => 0x01, + ChunkType::Padding => 0xFE, + ChunkType::ReservedUnskippable(chunk_type) => chunk_type, + ChunkType::ReservedSkippable(chunk_type) => chunk_type, + } + } +} + +impl FrameHeader { + fn parse(input: &[u8]) -> io::Result { + let (header_part, _): (&[u8; 4], _) = input.split_first_chunk().ok_or(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Need a least 4 bytes to parse the frame's header", + ))?; + + let chunk_type = ChunkType::from(header_part[0]); + // SAFETY: header_part is guaranteed to have at least 4 bytes due to split_first_chunk + let length_part: &[u8; 3] = header_part[1..].first_chunk().unwrap(); + + let length = read_u24_le(length_part) as u64; + + Ok(Self { + chunk_type, + data_frame_length: length, + }) + } +} + +impl From for [u8; 4] { + fn from(value: FrameHeader) -> Self { + let frame_length = value.data_frame_length as u32; + + let mut header = [0u8; 4]; + header[0] = u8::from(value.chunk_type); + // We're writing a little endian u24 from an u32 by removing the latest significant byte + header[1..4].copy_from_slice(&frame_length.to_le_bytes()[..3]); + header + } +} + +pub fn read_u24_le(slice: &[u8; 3]) -> u32 { + slice[0] as u32 | (slice[1] as u32) << 8 | (slice[2] as u32) << 16 +} + +fn crc32c_masked(input: &[u8]) -> u32 { + let sum = crc32c::crc32c(input); + mask_crc(sum) +} + +fn mask_crc(crc: u32) -> u32 { + (crc.wrapping_shr(15) | crc.wrapping_shl(17)).wrapping_add(0xA282EAD8) +} From dc0a8c8391b3c998dce436c28e5c6fc0216f9792 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 10:09:21 -0400 Subject: [PATCH 02/12] fix: Address Clippy complains --- crates/compression-codecs/src/snappy/encoder.rs | 16 ++++++++-------- crates/compression-codecs/src/snappy/mod.rs | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs index d65eb78f..40b07508 100644 --- a/crates/compression-codecs/src/snappy/encoder.rs +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -33,10 +33,10 @@ enum State { } fn compress_frame(buffer: &[u8]) -> std::io::Result> { - let checksum = crc32c_masked(&buffer); + let checksum = crc32c_masked(buffer); let mut encoder = snap::raw::Encoder::new(); - let compress_data = encoder.compress_vec(&buffer)?; + let compress_data = encoder.compress_vec(buffer)?; let (chunk_type, data) = if compress_data.len() >= buffer.len() - (buffer.len() / 8) { (ChunkType::Uncompressed, buffer) } else { @@ -56,7 +56,7 @@ fn compress_frame(buffer: &[u8]) -> std::io::Result> { frame.extend_from_slice(&raw_header); frame.extend_from_slice(&raw_checksum); - frame.extend_from_slice(&data); + frame.extend_from_slice(data); Ok(frame) } @@ -70,7 +70,7 @@ impl EncodeV2 for SnappyEncoder { loop { match &mut self.state { State::InitStream(buffer) => { - if buffer.unwritten().len() > 0 { + if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); if output.has_no_spare_space() { return Ok(()); @@ -97,7 +97,7 @@ impl EncodeV2 for SnappyEncoder { self.state = State::Writing(compressed_frame.into()) } State::Writing(buffer) => { - if buffer.unwritten().len() > 0 { + if buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); return Ok(()); } else { @@ -112,7 +112,7 @@ impl EncodeV2 for SnappyEncoder { loop { match &mut self.state { State::InitStream(buffer) => { - if buffer.unwritten().len() > 0 { + if buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); if output.has_no_spare_space() { return Ok(false); @@ -122,12 +122,12 @@ impl EncodeV2 for SnappyEncoder { } State::Buffering => { let buffer = &mut self.chunk; - let compressed_data = compress_frame(&buffer)?; + let compressed_data = compress_frame(buffer)?; buffer.clear(); self.state = State::Writing(compressed_data.into()) } State::Writing(buffer) => { - return if buffer.unwritten().len() > 0 { + return if buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); Ok(false) } else { diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs index ba19c30d..53178cb7 100644 --- a/crates/compression-codecs/src/snappy/mod.rs +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -6,7 +6,7 @@ pub use self::encoder::SnappyEncoder; use std::io; -const STREAM_FRAME: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; +const STREAM_FRAME: &[u8] = b"\xFF\x06\x00\x00sNaPpY"; #[derive(Debug, Copy, Clone)] struct FrameHeader { From 130777e78548cc86ba5eda166f248c78a90b76a7 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 10:11:45 -0400 Subject: [PATCH 03/12] fix: Address Cargo formatting complains --- crates/compression-codecs/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/compression-codecs/src/lib.rs b/crates/compression-codecs/src/lib.rs index 1281175f..ba234bbb 100644 --- a/crates/compression-codecs/src/lib.rs +++ b/crates/compression-codecs/src/lib.rs @@ -22,6 +22,8 @@ pub mod gzip; pub mod lz4; #[cfg(feature = "lzma")] pub mod lzma; +#[cfg(feature = "snappy")] +pub mod snappy; #[cfg(feature = "xz")] pub mod xz; #[cfg(feature = "lzma")] @@ -30,8 +32,6 @@ pub mod xz2; pub mod zlib; #[cfg(feature = "zstd")] pub mod zstd; -#[cfg(feature = "snappy")] -pub mod snappy; use compression_core::util::{PartialBuffer, WriteBuffer}; @@ -51,6 +51,8 @@ pub use self::gzip::{GzipDecoder, GzipEncoder}; pub use self::lz4::{Lz4Decoder, Lz4Encoder}; #[cfg(feature = "lzma")] pub use self::lzma::{LzmaDecoder, LzmaEncoder}; +#[cfg(feature = "snappy")] +pub use self::snappy::{SnappyDecoder, SnappyEncoder}; #[cfg(feature = "xz")] pub use self::xz::{XzDecoder, XzEncoder}; #[cfg(feature = "lzma")] @@ -59,8 +61,6 @@ pub use self::xz2::{Xz2Decoder, Xz2Encoder, Xz2FileFormat}; pub use self::zlib::{ZlibDecoder, ZlibEncoder}; #[cfg(feature = "zstd")] pub use self::zstd::{ZstdDecoder, ZstdEncoder}; -#[cfg(feature = "snappy")] -pub use self::snappy::{SnappyDecoder, SnappyEncoder}; fn forward_output( output: &mut PartialBuffer + AsMut<[u8]>>, From ad114abe8da93c8e7b71cf444e1d5760777045e3 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 11:04:58 -0400 Subject: [PATCH 04/12] Fix regression from previous commit --- crates/compression-codecs/src/snappy/encoder.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs index 40b07508..ce6d2bed 100644 --- a/crates/compression-codecs/src/snappy/encoder.rs +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -97,7 +97,7 @@ impl EncodeV2 for SnappyEncoder { self.state = State::Writing(compressed_frame.into()) } State::Writing(buffer) => { - if buffer.unwritten().is_empty() { + if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); return Ok(()); } else { @@ -112,7 +112,7 @@ impl EncodeV2 for SnappyEncoder { loop { match &mut self.state { State::InitStream(buffer) => { - if buffer.unwritten().is_empty() { + if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); if output.has_no_spare_space() { return Ok(false); @@ -127,7 +127,7 @@ impl EncodeV2 for SnappyEncoder { self.state = State::Writing(compressed_data.into()) } State::Writing(buffer) => { - return if buffer.unwritten().is_empty() { + return if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); Ok(false) } else { From fba07bb371b722885befe73d5872a0eb397fcf98 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 17:42:50 -0400 Subject: [PATCH 05/12] Apply review from @NobodyXu --- .../tests/utils/test_cases.rs | 1 + .../compression-codecs/src/snappy/decoder.rs | 102 ++++++++++-------- .../compression-codecs/src/snappy/encoder.rs | 96 ++++++++++------- crates/compression-codecs/src/snappy/mod.rs | 8 ++ 4 files changed, 122 insertions(+), 85 deletions(-) diff --git a/crates/async-compression/tests/utils/test_cases.rs b/crates/async-compression/tests/utils/test_cases.rs index af79c4b1..071f367f 100644 --- a/crates/async-compression/tests/utils/test_cases.rs +++ b/crates/async-compression/tests/utils/test_cases.rs @@ -176,6 +176,7 @@ macro_rules! io_test_cases { #[test] #[ntest::timeout(1000)] + #[cfg(not(feature = "snappy"))] fn trailer() { let mut compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index 643903aa..34a95eff 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -1,53 +1,67 @@ -use crate::snappy::{mask_crc, ChunkType, FrameHeader}; +use crate::snappy::{mask_crc, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; use crate::DecodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; use std::convert::TryInto; use std::{io, mem}; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SnappyDecoder { state: State, + in_buf: Vec, + out_buf: PartialBuffer>, +} + +impl Default for SnappyDecoder { + fn default() -> Self { + Self { + state: State::default(), + in_buf: Vec::with_capacity(MAX_FRAME_SIZE), + out_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)), + } + } } impl SnappyDecoder { pub fn new() -> Self { Self::default() } -} -fn decode_chunk(chunk_type: ChunkType, mut buffer: Vec) -> std::io::Result> { - let data = buffer.split_off(4); - - let expected_sum: [u8; 4] = buffer - .try_into() - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?; - let expected_sum = u32::from_le_bytes(expected_sum); - - let output = match chunk_type { - ChunkType::Compressed => { - let uncompress_length = snap::raw::decompress_len(&data)?; - let mut out_buf = vec![0; uncompress_length]; - let mut decoder = snap::raw::Decoder::new(); - decoder.decompress(&data, &mut out_buf)?; - out_buf + fn decode_chunk(&mut self, chunk_type: ChunkType) -> std::io::Result<()> { + let (expected_sum, data) = self.in_buf.split_at(4); + + let expected_sum: [u8; 4] = expected_sum + .try_into() + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?; + let expected_sum = u32::from_le_bytes(expected_sum); + + self.out_buf.reset(); + let mut out_buf = self.out_buf.get_mut(); + out_buf.clear(); + match chunk_type { + ChunkType::Compressed => { + let uncompress_length = snap::raw::decompress_len(data)?; + out_buf.resize(uncompress_length, 0); + let mut decoder = snap::raw::Decoder::new(); + decoder.decompress(&data, &mut out_buf)?; + } + ChunkType::Uncompressed => out_buf.extend_from_slice(data), + _ => unreachable!( + "can only decode compressed or uncompressed chunks, not {:?}", + chunk_type + ), + }; + + let got_sum = crc32c::crc32c(&out_buf); + let got_sum = mask_crc(got_sum); + if expected_sum != got_sum { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "checksum mismatch", + )); } - ChunkType::Uncompressed => data, - _ => unreachable!( - "can only decode compressed or uncompressed chunks, not {:?}", - chunk_type - ), - }; - - let got_sum = crc32c::crc32c(&output); - let got_sum = mask_crc(got_sum); - if expected_sum != got_sum { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "checksum mismatch", - )); - } - Ok(output) + Ok(()) + } } #[derive(Debug)] @@ -58,9 +72,8 @@ enum State { Buffering { remaining: usize, chunk_type: ChunkType, - buffer: Vec, }, - Sending(PartialBuffer>), + Sending, } impl Default for State { @@ -116,10 +129,11 @@ impl DecodeV2 for SnappyDecoder { | ChunkType::ReservedSkippable(_) | ChunkType::Padding => self.state = State::Skipping(data_frame_length), ChunkType::Compressed | ChunkType::Uncompressed => { + let in_buf = &mut self.in_buf; + in_buf.clear(); self.state = State::Buffering { remaining: data_frame_length, chunk_type: header.chunk_type, - buffer: Vec::with_capacity(data_frame_length), } } ChunkType::ReservedUnskippable(chunk_type) => { @@ -146,7 +160,6 @@ impl DecodeV2 for SnappyDecoder { State::Buffering { remaining, chunk_type, - buffer, } => { let input_buf = input.unwritten(); let boundary = (*remaining).min(input_buf.len()); @@ -154,7 +167,7 @@ impl DecodeV2 for SnappyDecoder { *remaining -= input_buf.len(); - buffer.extend_from_slice(input_buf); + self.in_buf.extend_from_slice(input_buf); input.advance(input_buf.len()); if *remaining != 0 { @@ -163,11 +176,11 @@ impl DecodeV2 for SnappyDecoder { // We're done buffering, so let's decode the chunk let chunk_type = *chunk_type; - let buffer = mem::take(buffer); - let output = decode_chunk(chunk_type, buffer)?; - self.state = State::Sending(PartialBuffer::new(output)) + self.decode_chunk(chunk_type)?; + self.state = State::Sending } - State::Sending(buffer) => { + State::Sending => { + let buffer = &mut self.out_buf; output.copy_unwritten_from(buffer); if buffer.unwritten().is_empty() { self.state = State::ChunkHeader([0u8; 4].into()) @@ -181,7 +194,8 @@ impl DecodeV2 for SnappyDecoder { fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result { match &mut self.state { - State::Sending(buffer) => { + State::Sending => { + let buffer = &mut self.out_buf; output.copy_unwritten_from(buffer); if buffer.unwritten().is_empty() { self.state = State::ChunkHeader([0u8; 4].into()); diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs index ce6d2bed..f9468d1a 100644 --- a/crates/compression-codecs/src/snappy/encoder.rs +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -1,4 +1,4 @@ -use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, STREAM_FRAME}; +use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_FRAME_SIZE, STREAM_FRAME}; use crate::EncodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; @@ -8,6 +8,7 @@ const MAX_BLOCK_SIZE: usize = 1 << 16; pub struct SnappyEncoder { state: State, chunk: Vec, + out_buf: PartialBuffer>, } impl Default for SnappyEncoder { @@ -15,6 +16,7 @@ impl Default for SnappyEncoder { Self { state: State::InitStream(PartialBuffer::new(STREAM_FRAME)), chunk: Vec::with_capacity(MAX_BLOCK_SIZE), + out_buf: PartialBuffer::new(Vec::with_capacity(MAX_FRAME_SIZE)), } } } @@ -23,42 +25,53 @@ impl SnappyEncoder { pub fn new() -> Self { Self::default() } + + fn compress_frame(&mut self) -> std::io::Result<()> { + let in_buffer = &self.chunk; + let checksum = crc32c_masked(in_buffer); + + self.out_buf.reset(); + let out_buf = self.out_buf.get_mut(); + out_buf.clear(); + let max_compress_size = snap::raw::max_compress_len(in_buffer.len()); + out_buf.resize(max_compress_size + 8, 0); + + let mut encoder = snap::raw::Encoder::new(); + let compress_data = encoder.compress(in_buffer, &mut out_buf[8..])?; + + let chunk_type = if compress_data >= in_buffer.len() - (in_buffer.len() / 8) { + out_buf.clear(); + out_buf.resize(in_buffer.len() + 8, 0); + (&mut out_buf[8..]).copy_from_slice(in_buffer); + + ChunkType::Uncompressed + } else { + out_buf.truncate(compress_data + 8); + ChunkType::Compressed + }; + + // We add 4 because the length includes the 4 bytes of the checksum. + let chunk_len = out_buf.len() - 4; + let header = FrameHeader { + chunk_type, + data_frame_length: chunk_len as u64, + }; + + let raw_header: [u8; 4] = header.into(); + let raw_checksum: [u8; 4] = checksum.to_le_bytes(); + + (&mut out_buf[0..4]).copy_from_slice(&raw_header); + (&mut out_buf[4..8]).copy_from_slice(&raw_checksum); + + Ok(()) + } } #[derive(Debug)] enum State { InitStream(PartialBuffer<&'static [u8]>), Buffering, - Writing(PartialBuffer>), -} - -fn compress_frame(buffer: &[u8]) -> std::io::Result> { - let checksum = crc32c_masked(buffer); - - let mut encoder = snap::raw::Encoder::new(); - let compress_data = encoder.compress_vec(buffer)?; - let (chunk_type, data) = if compress_data.len() >= buffer.len() - (buffer.len() / 8) { - (ChunkType::Uncompressed, buffer) - } else { - (ChunkType::Compressed, compress_data.as_slice()) - }; - - // We add 4 because the length includes the 4 bytes of the checksum. - let chunk_len = data.len() + 4; - let header = FrameHeader { - chunk_type, - data_frame_length: chunk_len as u64, - }; - - let mut frame = Vec::with_capacity(data.len() + 8); - let raw_header: [u8; 4] = header.into(); - let raw_checksum: [u8; 4] = checksum.to_le_bytes(); - - frame.extend_from_slice(&raw_header); - frame.extend_from_slice(&raw_checksum); - frame.extend_from_slice(data); - - Ok(frame) + Writing, } impl EncodeV2 for SnappyEncoder { @@ -92,11 +105,12 @@ impl EncodeV2 for SnappyEncoder { return Ok(()); } - let compressed_frame = compress_frame(buffer)?; - buffer.clear(); - self.state = State::Writing(compressed_frame.into()) + self.compress_frame()?; + self.chunk.clear(); + self.state = State::Writing } - State::Writing(buffer) => { + State::Writing => { + let buffer = &mut self.out_buf; if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); return Ok(()); @@ -121,18 +135,18 @@ impl EncodeV2 for SnappyEncoder { self.state = State::Buffering } State::Buffering => { - let buffer = &mut self.chunk; - let compressed_data = compress_frame(buffer)?; - buffer.clear(); - self.state = State::Writing(compressed_data.into()) + self.compress_frame()?; + self.chunk.clear(); + self.state = State::Writing } - State::Writing(buffer) => { + State::Writing => { + let buffer = &mut self.out_buf; return if !buffer.unwritten().is_empty() { output.copy_unwritten_from(buffer); Ok(false) } else { Ok(true) - } + }; } } } diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs index 53178cb7..85c46d2b 100644 --- a/crates/compression-codecs/src/snappy/mod.rs +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -7,6 +7,14 @@ pub use self::encoder::SnappyEncoder; use std::io; const STREAM_FRAME: &[u8] = b"\xFF\x06\x00\x00sNaPpY"; +const CHUNK_HEADER_SIZE: usize = 4; +const CRC_SIZE: usize = 4; +const MAX_COMPRESSED_SIZE: usize = 76490; + +const MAX_FRAME_SIZE: usize = + STREAM_FRAME.len() + CHUNK_HEADER_SIZE + CRC_SIZE + MAX_COMPRESSED_SIZE; + +const MAX_BLOCK_SIZE: usize = 65536; #[derive(Debug, Copy, Clone)] struct FrameHeader { From 3b20b9c9b033b32151758591fc7ca445d92672ad Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 17:53:04 -0400 Subject: [PATCH 06/12] Apply missing review from @NobodyXu --- crates/compression-codecs/src/snappy/decoder.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index 34a95eff..b46ed5ec 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -1,4 +1,4 @@ -use crate::snappy::{mask_crc, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; +use crate::snappy::{crc32c_masked, mask_crc, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; use crate::DecodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; use std::convert::TryInto; @@ -51,8 +51,7 @@ impl SnappyDecoder { ), }; - let got_sum = crc32c::crc32c(&out_buf); - let got_sum = mask_crc(got_sum); + let got_sum = crc32c_masked(&out_buf); if expected_sum != got_sum { return Err(io::Error::new( io::ErrorKind::InvalidData, From f61f7b4b07dc21b8b489381c46793fae4af76970 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 17:54:18 -0400 Subject: [PATCH 07/12] Remove unused import --- crates/compression-codecs/src/snappy/decoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index b46ed5ec..b521e6a9 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -1,8 +1,8 @@ -use crate::snappy::{crc32c_masked, mask_crc, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; +use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; use crate::DecodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; use std::convert::TryInto; -use std::{io, mem}; +use std::io; #[derive(Debug)] pub struct SnappyDecoder { From 04edb10e7db38caed9993f11f5938624c7f9f448 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Fri, 27 Mar 2026 17:57:11 -0400 Subject: [PATCH 08/12] Address Clippy issues --- crates/compression-codecs/src/snappy/decoder.rs | 6 +++--- crates/compression-codecs/src/snappy/encoder.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index b521e6a9..e4ec6117 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -35,14 +35,14 @@ impl SnappyDecoder { let expected_sum = u32::from_le_bytes(expected_sum); self.out_buf.reset(); - let mut out_buf = self.out_buf.get_mut(); + let out_buf = self.out_buf.get_mut(); out_buf.clear(); match chunk_type { ChunkType::Compressed => { let uncompress_length = snap::raw::decompress_len(data)?; out_buf.resize(uncompress_length, 0); let mut decoder = snap::raw::Decoder::new(); - decoder.decompress(&data, &mut out_buf)?; + decoder.decompress(data, out_buf)?; } ChunkType::Uncompressed => out_buf.extend_from_slice(data), _ => unreachable!( @@ -51,7 +51,7 @@ impl SnappyDecoder { ), }; - let got_sum = crc32c_masked(&out_buf); + let got_sum = crc32c_masked(out_buf); if expected_sum != got_sum { return Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs index f9468d1a..f51efd27 100644 --- a/crates/compression-codecs/src/snappy/encoder.rs +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -42,7 +42,7 @@ impl SnappyEncoder { let chunk_type = if compress_data >= in_buffer.len() - (in_buffer.len() / 8) { out_buf.clear(); out_buf.resize(in_buffer.len() + 8, 0); - (&mut out_buf[8..]).copy_from_slice(in_buffer); + out_buf[8..].copy_from_slice(in_buffer); ChunkType::Uncompressed } else { @@ -60,8 +60,8 @@ impl SnappyEncoder { let raw_header: [u8; 4] = header.into(); let raw_checksum: [u8; 4] = checksum.to_le_bytes(); - (&mut out_buf[0..4]).copy_from_slice(&raw_header); - (&mut out_buf[4..8]).copy_from_slice(&raw_checksum); + out_buf[0..4].copy_from_slice(&raw_header); + out_buf[4..8].copy_from_slice(&raw_checksum); Ok(()) } From cae45894ac7440f5f5f671eda2841aeda46d9047 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Mon, 30 Mar 2026 14:53:55 -0400 Subject: [PATCH 09/12] Stop copy if uncompress in decoder --- .../compression-codecs/src/snappy/decoder.rs | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index e4ec6117..7dc88d49 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -1,13 +1,12 @@ use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; use crate::DecodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; -use std::convert::TryInto; use std::io; #[derive(Debug)] pub struct SnappyDecoder { state: State, - in_buf: Vec, + in_buf: PartialBuffer>, out_buf: PartialBuffer>, } @@ -15,7 +14,7 @@ impl Default for SnappyDecoder { fn default() -> Self { Self { state: State::default(), - in_buf: Vec::with_capacity(MAX_FRAME_SIZE), + in_buf: PartialBuffer::new(Vec::with_capacity(MAX_FRAME_SIZE)), out_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)), } } @@ -27,31 +26,38 @@ impl SnappyDecoder { } fn decode_chunk(&mut self, chunk_type: ChunkType) -> std::io::Result<()> { - let (expected_sum, data) = self.in_buf.split_at(4); + let mut expected_sum: PartialBuffer<[u8; 4]> = PartialBuffer::default(); + expected_sum.copy_unwritten_from(&mut self.in_buf); + let expected_sum = u32::from_le_bytes(expected_sum.into_inner()); - let expected_sum: [u8; 4] = expected_sum - .try_into() - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?; - let expected_sum = u32::from_le_bytes(expected_sum); + let data = self.in_buf.unwritten(); self.out_buf.reset(); let out_buf = self.out_buf.get_mut(); out_buf.clear(); - match chunk_type { + let got_sum = match chunk_type { ChunkType::Compressed => { let uncompress_length = snap::raw::decompress_len(data)?; out_buf.resize(uncompress_length, 0); let mut decoder = snap::raw::Decoder::new(); decoder.decompress(data, out_buf)?; + self.state = State::CompressedCopy; + crc32c_masked(out_buf) + } + ChunkType::Uncompressed => { + // Data is uncompressed, so we just need to reset the partial buffer and advance + // past the header + self.in_buf.reset(); + self.in_buf.advance(4); + self.state = State::UncompressedCopy; + crc32c_masked(self.in_buf.unwritten()) } - ChunkType::Uncompressed => out_buf.extend_from_slice(data), _ => unreachable!( "can only decode compressed or uncompressed chunks, not {:?}", chunk_type ), }; - let got_sum = crc32c_masked(out_buf); if expected_sum != got_sum { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -72,7 +78,8 @@ enum State { remaining: usize, chunk_type: ChunkType, }, - Sending, + UncompressedCopy, + CompressedCopy, } impl Default for State { @@ -129,7 +136,8 @@ impl DecodeV2 for SnappyDecoder { | ChunkType::Padding => self.state = State::Skipping(data_frame_length), ChunkType::Compressed | ChunkType::Uncompressed => { let in_buf = &mut self.in_buf; - in_buf.clear(); + in_buf.get_mut().clear(); + in_buf.reset(); self.state = State::Buffering { remaining: data_frame_length, chunk_type: header.chunk_type, @@ -166,7 +174,7 @@ impl DecodeV2 for SnappyDecoder { *remaining -= input_buf.len(); - self.in_buf.extend_from_slice(input_buf); + self.in_buf.get_mut().extend_from_slice(input_buf); input.advance(input_buf.len()); if *remaining != 0 { @@ -176,9 +184,17 @@ impl DecodeV2 for SnappyDecoder { // We're done buffering, so let's decode the chunk let chunk_type = *chunk_type; self.decode_chunk(chunk_type)?; - self.state = State::Sending } - State::Sending => { + State::UncompressedCopy => { + let buffer = &mut self.in_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()) + } else { + return Ok(false); + } + } + State::CompressedCopy => { let buffer = &mut self.out_buf; output.copy_unwritten_from(buffer); if buffer.unwritten().is_empty() { @@ -193,7 +209,17 @@ impl DecodeV2 for SnappyDecoder { fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result { match &mut self.state { - State::Sending => { + State::UncompressedCopy => { + let buffer = &mut self.in_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()); + Ok(true) + } else { + Ok(false) + } + } + State::CompressedCopy => { let buffer = &mut self.out_buf; output.copy_unwritten_from(buffer); if buffer.unwritten().is_empty() { From 6e96ede6d30624be09b0d76561667acdefc0f72e Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Mon, 30 Mar 2026 16:47:12 -0400 Subject: [PATCH 10/12] Stop copy if uncompress in Encoder --- .../compression-codecs/src/snappy/encoder.rs | 92 ++++++++++++------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs index f51efd27..00ed0c4a 100644 --- a/crates/compression-codecs/src/snappy/encoder.rs +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -7,7 +7,7 @@ const MAX_BLOCK_SIZE: usize = 1 << 16; #[derive(Debug)] pub struct SnappyEncoder { state: State, - chunk: Vec, + in_buf: PartialBuffer>, out_buf: PartialBuffer>, } @@ -15,7 +15,7 @@ impl Default for SnappyEncoder { fn default() -> Self { Self { state: State::InitStream(PartialBuffer::new(STREAM_FRAME)), - chunk: Vec::with_capacity(MAX_BLOCK_SIZE), + in_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)), out_buf: PartialBuffer::new(Vec::with_capacity(MAX_FRAME_SIZE)), } } @@ -27,7 +27,7 @@ impl SnappyEncoder { } fn compress_frame(&mut self) -> std::io::Result<()> { - let in_buffer = &self.chunk; + let in_buffer = &self.in_buf.unwritten(); let checksum = crc32c_masked(in_buffer); self.out_buf.reset(); @@ -39,39 +39,63 @@ impl SnappyEncoder { let mut encoder = snap::raw::Encoder::new(); let compress_data = encoder.compress(in_buffer, &mut out_buf[8..])?; - let chunk_type = if compress_data >= in_buffer.len() - (in_buffer.len() / 8) { - out_buf.clear(); - out_buf.resize(in_buffer.len() + 8, 0); - out_buf[8..].copy_from_slice(in_buffer); - - ChunkType::Uncompressed + let (chunk_type, chunk_len) = if compress_data >= in_buffer.len() - (in_buffer.len() / 8) { + (ChunkType::Uncompressed, in_buffer.len()) } else { - out_buf.truncate(compress_data + 8); - ChunkType::Compressed + out_buf.truncate(compress_data); + (ChunkType::Compressed, out_buf.len()) }; // We add 4 because the length includes the 4 bytes of the checksum. - let chunk_len = out_buf.len() - 4; + let chunk_len = chunk_len + 4; let header = FrameHeader { chunk_type, data_frame_length: chunk_len as u64, }; - let raw_header: [u8; 4] = header.into(); + let mut raw_chunk_header = [0u8; 8]; + let raw_frame_header: [u8; 4] = header.into(); let raw_checksum: [u8; 4] = checksum.to_le_bytes(); - out_buf[0..4].copy_from_slice(&raw_header); - out_buf[4..8].copy_from_slice(&raw_checksum); + raw_chunk_header[0..4].copy_from_slice(&raw_frame_header); + raw_chunk_header[4..8].copy_from_slice(&raw_checksum); + + match chunk_type { + ChunkType::Compressed => self.state = State::CompressCopy(raw_chunk_header.into()), + ChunkType::Uncompressed => self.state = State::UncompressCopy(raw_chunk_header.into()), + _ => unreachable!(), + } Ok(()) } } +fn write( + header: &mut PartialBuffer<[u8; 8]>, + input: &mut PartialBuffer>, + output: &mut WriteBuffer<'_>, +) -> bool { + if !header.unwritten().is_empty() { + output.copy_unwritten_from(header); + if output.has_no_spare_space() { + return false; + } + } + + if !input.unwritten().is_empty() { + output.copy_unwritten_from(input); + false + } else { + true + } +} + #[derive(Debug)] enum State { InitStream(PartialBuffer<&'static [u8]>), Buffering, - Writing, + UncompressCopy(PartialBuffer<[u8; 8]>), + CompressCopy(PartialBuffer<[u8; 8]>), } impl EncodeV2 for SnappyEncoder { @@ -92,7 +116,7 @@ impl EncodeV2 for SnappyEncoder { self.state = State::Buffering } State::Buffering => { - let buffer = &mut self.chunk; + let buffer = self.in_buf.get_mut(); let input_buf = input.unwritten(); let available = MAX_BLOCK_SIZE - buffer.len(); let boundary = available.min(input_buf.len()); @@ -106,17 +130,22 @@ impl EncodeV2 for SnappyEncoder { } self.compress_frame()?; - self.chunk.clear(); - self.state = State::Writing } - State::Writing => { - let buffer = &mut self.out_buf; - if !buffer.unwritten().is_empty() { - output.copy_unwritten_from(buffer); + State::UncompressCopy(header) => { + if !write(header, &mut self.in_buf, output) { + return Ok(()); + } + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.state = State::Buffering + } + State::CompressCopy(header) => { + if !write(header, &mut self.out_buf, output) { return Ok(()); - } else { - self.state = State::Buffering } + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.state = State::Buffering } } } @@ -136,18 +165,11 @@ impl EncodeV2 for SnappyEncoder { } State::Buffering => { self.compress_frame()?; - self.chunk.clear(); - self.state = State::Writing } - State::Writing => { - let buffer = &mut self.out_buf; - return if !buffer.unwritten().is_empty() { - output.copy_unwritten_from(buffer); - Ok(false) - } else { - Ok(true) - }; + State::UncompressCopy(header) => { + return Ok(write(header, &mut self.in_buf, output)) } + State::CompressCopy(header) => return Ok(write(header, &mut self.out_buf, output)), } } } From 9e740a54fa63c2ce45a473b1a160f2c2d75a0073 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Mon, 30 Mar 2026 16:48:31 -0400 Subject: [PATCH 11/12] Apply review from @NobodyXu --- crates/compression-codecs/src/snappy/decoder.rs | 7 +++++-- crates/compression-codecs/src/snappy/mod.rs | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index 7dc88d49..b97fb8d1 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -90,7 +90,10 @@ impl Default for State { impl DecodeV2 for SnappyDecoder { fn reinit(&mut self) -> std::io::Result<()> { - *self = Self::new(); + self.state = State::default(); + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.out_buf.reset(); Ok(()) } @@ -108,7 +111,7 @@ impl DecodeV2 for SnappyDecoder { } let header = FrameHeader::parse(header.written())?; - if let ChunkType::Stream = header.chunk_type { + if matches!(header.chunk_type, ChunkType::Stream) { self.state = State::Skipping(header.data_frame_length as usize) } else { return Err(std::io::Error::new( diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs index 85c46d2b..03ef26b8 100644 --- a/crates/compression-codecs/src/snappy/mod.rs +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -39,7 +39,7 @@ impl From for ChunkType { 0x00 => Self::Compressed, 0x01 => Self::Uncompressed, 0xFE => Self::Padding, - 0x02..=0x7f => Self::ReservedUnskippable(value), + 0x02..=0x7F => Self::ReservedUnskippable(value), 0x80..=0xFD => Self::ReservedSkippable(value), } } @@ -66,7 +66,7 @@ impl FrameHeader { ))?; let chunk_type = ChunkType::from(header_part[0]); - // SAFETY: header_part is guaranteed to have at least 4 bytes due to split_first_chunk + // header_part is guaranteed to have at least 4 bytes due to split_first_chunk let length_part: &[u8; 3] = header_part[1..].first_chunk().unwrap(); let length = read_u24_le(length_part) as u64; @@ -90,7 +90,7 @@ impl From for [u8; 4] { } } -pub fn read_u24_le(slice: &[u8; 3]) -> u32 { +fn read_u24_le(slice: &[u8; 3]) -> u32 { slice[0] as u32 | (slice[1] as u32) << 8 | (slice[2] as u32) << 16 } From a1e2112aca9bbba2399d776400ddb613255e4e10 Mon Sep 17 00:00:00 2001 From: Antoine Rouaze Date: Tue, 31 Mar 2026 11:39:55 -0400 Subject: [PATCH 12/12] Use NonZeroUsize in decoder --- .../compression-codecs/src/snappy/decoder.rs | 49 ++++++++++++++----- crates/compression-codecs/src/snappy/mod.rs | 1 + 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs index b97fb8d1..85cc5b94 100644 --- a/crates/compression-codecs/src/snappy/decoder.rs +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -1,7 +1,10 @@ -use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE}; +use crate::snappy::{ + crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE, STREAM_DATA_FRAME_SIZE, +}; use crate::DecodeV2; use compression_core::util::{PartialBuffer, WriteBuffer}; use std::io; +use std::num::NonZeroUsize; #[derive(Debug)] pub struct SnappyDecoder { @@ -73,9 +76,9 @@ impl SnappyDecoder { enum State { StreamIdentifier(PartialBuffer<[u8; 4]>), ChunkHeader(PartialBuffer<[u8; 4]>), - Skipping(usize), + Skipping(NonZeroUsize), Buffering { - remaining: usize, + remaining: NonZeroUsize, chunk_type: ChunkType, }, UncompressedCopy, @@ -112,7 +115,18 @@ impl DecodeV2 for SnappyDecoder { let header = FrameHeader::parse(header.written())?; if matches!(header.chunk_type, ChunkType::Stream) { - self.state = State::Skipping(header.data_frame_length as usize) + let data_frame_length = header.data_frame_length as usize; + if data_frame_length != STREAM_DATA_FRAME_SIZE { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid stream frame data length, expected {}, got {}", + STREAM_DATA_FRAME_SIZE, data_frame_length + ), + )); + } + // We checked above that the stream data frame length is valid and non-zero + self.state = State::Skipping(NonZeroUsize::new(data_frame_length).unwrap()) } else { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -131,7 +145,12 @@ impl DecodeV2 for SnappyDecoder { let header = FrameHeader::parse(header.written())?; - let data_frame_length = header.data_frame_length as usize; + let Some(data_frame_length) = + NonZeroUsize::new(header.data_frame_length as usize) + else { + self.state = State::ChunkHeader([0u8; 4].into()); + continue; + }; match header.chunk_type { ChunkType::Stream @@ -159,28 +178,34 @@ impl DecodeV2 for SnappyDecoder { } State::Skipping(n) => { let input_len = input.unwritten().len(); - if input_len < *n { + + let n = n.get(); + if input_len < n { input.advance(input_len); - *n -= input_len; - return Ok(false); + if let Some(n) = NonZeroUsize::new(n - input_len) { + self.state = State::Skipping(n); + return Ok(false); + } } - input.advance(*n); + input.advance(n); self.state = State::ChunkHeader([0u8; 4].into()) } State::Buffering { remaining, chunk_type, } => { + let mut rem = remaining.get(); let input_buf = input.unwritten(); - let boundary = (*remaining).min(input_buf.len()); + let boundary = rem.min(input_buf.len()); let input_buf = &input_buf[..boundary]; - *remaining -= input_buf.len(); + rem -= input_buf.len(); self.in_buf.get_mut().extend_from_slice(input_buf); input.advance(input_buf.len()); - if *remaining != 0 { + if let Some(rem) = NonZeroUsize::new(rem) { + *remaining = rem; return Ok(false); } diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs index 03ef26b8..4e63b5f4 100644 --- a/crates/compression-codecs/src/snappy/mod.rs +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -7,6 +7,7 @@ pub use self::encoder::SnappyEncoder; use std::io; const STREAM_FRAME: &[u8] = b"\xFF\x06\x00\x00sNaPpY"; +const STREAM_DATA_FRAME_SIZE: usize = 6; const CHUNK_HEADER_SIZE: usize = 4; const CRC_SIZE: usize = 4; const MAX_COMPRESSED_SIZE: usize = 76490;