Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/core/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ class BufferPool {
/**
 * @brief Return a decode buffer to the vector pool for reuse.
 *
 * Oversized buffers (capacity > 2x the default vector size) are always
 * discarded rather than pooled, so a single large outlier image cannot
 * pin its allocation in the pool forever. The discard is counted in
 * stats_.oversized_releases unconditionally — in the original code the
 * counter was only bumped when the pool had room, silently under-reporting
 * oversized releases that arrived while the pool was full.
 *
 * Thread-safe: pool and stats are mutated under vec_mutex_.
 *
 * @param vec Buffer being released; either moved into the pool or
 *            deallocated when this function returns.
 */
void release_vector(std::vector<uint8_t>&& vec) {
std::lock_guard<std::mutex> lock(vec_mutex_);
// Discard oversized buffers first to prevent memory bloat from outlier
// images; checking before the occupancy test keeps the stat accurate
// even when the pool is already at capacity.
if (vec.capacity() > default_vector_size_ * 2) {
stats_.oversized_releases++;
return; // Let the vector deallocate
}
if (vec_pool_.size() < max_buffers_per_bucket_ * 4) { // Higher limit for vectors
vec_pool_.push_back(std::move(vec));
}
}
Expand Down
25 changes: 22 additions & 3 deletions src/pipeline/pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ class TarWorker {
std::shared_ptr<std::vector<uint8_t>> remote_tar_data = nullptr,
size_t distributed_start_idx = 0,
size_t distributed_end_idx = 0,
cache::TieredCache* cache = nullptr)
cache::TieredCache* cache = nullptr,
std::atomic<size_t>* samples_skipped = nullptr)
: config_(config),
worker_id_(worker_id),
buffer_pool_(buffer_pool),
Expand All @@ -336,7 +337,8 @@ class TarWorker {
samples_processed_(0),
distributed_start_idx_(distributed_start_idx),
distributed_end_idx_(distributed_end_idx),
cache_(cache) {
cache_(cache),
samples_skipped_(samples_skipped) {

// Per-worker TAR reader
// Check if using remote TAR data (already fetched)
Expand Down Expand Up @@ -487,6 +489,7 @@ class TarWorker {
throw std::runtime_error(
"GPU decode failed for: " + entry.name);
}
if (samples_skipped_) samples_skipped_->fetch_add(1, std::memory_order_relaxed);
continue; // Skip corrupted (config allows)
}
} else
Expand All @@ -506,6 +509,7 @@ class TarWorker {
if (!config_.skip_corrupted) {
throw; // Re-throw if not skipping
}
if (samples_skipped_) samples_skipped_->fetch_add(1, std::memory_order_relaxed);
continue; // Skip corrupted (config allows)
}
usample.image_data = std::move(sample.decoded_rgb);
Expand Down Expand Up @@ -567,6 +571,9 @@ class TarWorker {
// Caching (NEW in v2.0.0)
cache::TieredCache* cache_;

// Skip counter (shared with pipeline)
std::atomic<size_t>* samples_skipped_ = nullptr;

// Shuffle support (NEW in v2.8.0)
size_t epoch_ = 0;
};
Expand Down Expand Up @@ -687,7 +694,8 @@ class UnifiedPipeline {
tar_workers_.push_back(std::make_unique<TarWorker>(
config_, i, buffer_pool_.get(), remote_tar_data,
distributed_start_idx_, distributed_end_idx_,
tiered_cache_.get()
tiered_cache_.get(),
&samples_skipped_
));
tar_workers_.back()->start();
}
Expand Down Expand Up @@ -1173,6 +1181,7 @@ class UnifiedPipeline {
std::atomic<bool> running_;
std::atomic<size_t> samples_processed_;
std::atomic<size_t> batches_produced_;
std::atomic<size_t> samples_skipped_{0}; // Corrupted/failed samples skipped

// Smart Batching (NEW in v1.5.1, auto-detection in v2.3.0)
std::unique_ptr<pipeline::SmartBatcher<UnifiedSample>> smart_batcher_;
Expand Down Expand Up @@ -1205,6 +1214,16 @@ class UnifiedPipeline {
return tiered_cache_ != nullptr;
}

/**
 * @brief Number of samples dropped because they failed to decode.
 *
 * Grows only when skip_corrupted=true lets a decode failure be skipped
 * instead of thrown. Monitor this value to catch silent data loss and
 * gauge dataset quality.
 */
size_t skipped_samples() const {
const size_t skipped = samples_skipped_.load(std::memory_order_relaxed);
return skipped;
}

/**
* @brief Check if smart batching is active (NEW in v2.3.0)
*
Expand Down
26 changes: 26 additions & 0 deletions tests/test_buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,32 @@ TEST_F(BufferPoolTest, TotalPooledCount) {
EXPECT_EQ(pool_->pooled_count(), 2); // Total
}

// Oversized vectors must be dropped on release instead of being pooled.
TEST_F(BufferPoolTest, OversizedVectorDiscard) {
// default_vector_size is 256*256*3 = 196608; the discard threshold is
// 2x that, i.e. 393216.
{
auto big = pool_->acquire_vector();
big->resize(500000); // ~500KB — well past the 2*196608 threshold
} // released here; should be discarded, not returned to the pool
EXPECT_EQ(pool_->vector_pooled_count(), 0u);

{
auto small = pool_->acquire_vector();
small->resize(1000); // comfortably under the threshold
} // released here; should be pooled normally
EXPECT_EQ(pool_->vector_pooled_count(), 1u);

// The discard above must also show up in the pool statistics.
const auto stats = pool_->stats();
EXPECT_GE(stats.oversized_releases, 1u);
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down