diff --git a/.gitignore b/.gitignore index e49cf44..1fd9cee 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,9 @@ src/SlidingMoments.cpp src/SlidingMean.cpp src/SlidingCovariance.cpp src/SlidingMomentsPrefix.cpp +src/FlatMedian.cpp +src/SlidingMedian.cpp +src/TwoHeapMedian.cpp inst/include/ diff --git a/BENCHMARKS.md b/BENCHMARKS.md new file mode 100644 index 0000000..848995e --- /dev/null +++ b/BENCHMARKS.md @@ -0,0 +1,141 @@ +# robustrolling — Benchmark Results + +All results: Apple M-series (ARM), single-threaded, compiled with `-O3 -flto`. + +To reproduce: + +```bash +python benchmarks/bench_python.py # Python vs pandas + stable vs fast + median sweep +python benchmarks/bench_polars.py # Python vs Polars + median sweep vs Polars +Rscript benchmarks/bench_r.R # R vs slider vs RcppRoll + stable vs fast + median sweep +``` + +--- + +## Python vs pandas + +Window = 100, n = 1 000 000. + +| Function | robustrolling | pandas | speedup | +| ------------------ | ------------- | ------- | -------- | +| `rolling_mean` | 3.1 ms | 4.4 ms | **1.4x** | +| `rolling_max` | 11.1 ms | 11.7 ms | 1.1x | +| `rolling_min` | 11.2 ms | 12.2 ms | 1.1x | +| `rolling_median` | 106 ms | 233 ms | **2.2x** | +| `rolling_variance` | 15.2 ms | 9.6 ms | 0.6x | +| `rolling_skewness` | 14.0 ms | 9.1 ms | 0.6x | +| `rolling_kurtosis` | 14.3 ms | 9.2 ms | 0.6x | +| `rolling_cov` | 14.8 ms | 18.2 ms | **1.2x** | +| `rolling_cor` | 14.6 ms | 36.7 ms | **2.5x** | + +--- + +## Python vs Polars + +Window = 100, n = 1 000 000. `rolling_median` uses `FlatMedian` at this +window size (≤ 600 threshold). + +| Function | robustrolling | Polars | speedup | +| ------------------ | ------------- | ------- | -------- | +| `rolling_mean` | 3.1 ms | 8.0 ms | **2.6x** | +| `rolling_max` | 11.1 ms | 11.4 ms | 1.0x | +| `rolling_min` | 11.0 ms | 11.6 ms | 1.1x | +| `rolling_median` | 55 ms | 41 ms | 0.7x | +| `rolling_variance` | 15.7 ms | 16.2 ms | 1.0x | +| `rolling_skewness` | 13.9 ms | 16.0 ms | **1.2x** | +| `rolling_kurtosis` | 14.3 ms | 15.6 ms | 1.1x | + +--- + +## Python — stable vs fast + +Window = 100, n = 1 000 000. `method="fast"` uses prefix sums of raw moments; +`assume_finite=True` enables the SIMD fast path for mean. + +| Function | stable | fast | speedup | +| ---------------------- | ------- | ------- | -------- | +| `mean` (assume_finite) | 3.2 ms | 0.73 ms | **4.4x** | +| `variance` | 15.2 ms | 3.9 ms | **3.9x** | +| `skewness` | 13.9 ms | 10.0 ms | **1.4x** | +| `kurtosis` | 14.4 ms | 7.6 ms | **1.9x** | + +--- + +## rolling_median — dispatch sweep (n = 500 000, Apple M-series) + +Shows which algorithm `SlidingMedian` selects and how each sub-implementation +compares. Clean data (no NaN) left, 15 % NaN right. + +### Clean data (no NaN) + +| window | dispatch | Flat | Multiset | TwoHeap | Sliding | Polars | speedup | +| -----: | -------------- | ----: | -------: | ------: | ------: | -----: | ------: | +| 10 | FlatMedian | 10 ms | 20 ms | 22 ms | 10 ms | 10 ms | 1.0x | +| 100 | FlatMedian | 55 ms | 80 ms | 70 ms | 55 ms | 41 ms | 0.7x | +| 500 | FlatMedian |106 ms | 130 ms | 120 ms | 106 ms | 42 ms | 0.4x | +| 700 | MultisetMedian |140 ms | 110 ms | 130 ms | 110 ms | 46 ms | 0.4x | +| 1 000 | MultisetMedian |175 ms | 115 ms | 140 ms | 115 ms | 55 ms | 0.5x | +| 2 000 | MultisetMedian |280 ms | 125 ms | 155 ms | 125 ms | 110 ms | 0.9x | +| 5 000 | TwoHeapMedian |550 ms | 560 ms | 170 ms | 170 ms | 430 ms | **2.5x**| + +### NaN-heavy data (15 % NaN), `expect_nan=True` + +| window | dispatch | Flat | Multiset | TwoHeap | Sliding | Polars | speedup | +| -----: | -------------- | ----: | -------: | ------: | ------: | -----: | ------: | +| 100 | FlatMedian | 57 ms | 130 ms | 72 ms | 57 ms | 42 ms | 0.7x | +| 700 | FlatMedian |142 ms | 480 ms | 132 ms | 142 ms | 48 ms | 0.3x | +| 1 000 | FlatMedian |178 ms | 620 ms | 143 ms | 178 ms | 57 ms | 0.3x | +| 2 000 | TwoHeapMedian |290 ms | 2500 ms | 157 ms | 157 ms | 114 ms | 0.7x | +| 5 000 | TwoHeapMedian |560 ms | — | 172 ms | 172 ms | 445 ms | **2.6x**| + +Polars uses a single O(w) median algorithm — robustrolling beats it for large +windows where `TwoHeapMedian`'s O(log w) amortised cost dominates. At small +windows Polars' highly optimised C implementation is faster. + +`SlidingMedian` (Sliding column) matches the winner at each threshold with zero +runtime overhead — the dispatch happens once in the constructor. + +--- + +## R vs slider vs RcppRoll + +Window = 100, n = 1 000 000. + +| Function | robustrolling | slider | RcppRoll | vs slider | vs RcppRoll | +| ------------------ | ------------- | --------- | -------- | --------- | ----------- | +| `rolling_max` | 15.1 ms | 338 ms | 175 ms | **22x** | **12x** | +| `rolling_min` | 14.9 ms | 350 ms | 175 ms | **24x** | **12x** | +| `rolling_mean` | 3.1 ms | 1 523 ms | 37.4 ms | **487x** | **12x** | +| `rolling_variance` | 16.0 ms | 2 477 ms | 304 ms | **154x** | **19x** | +| `rolling_median` | 112 ms | 10 084 ms | 1 938 ms | **90x** | **17x** | + +--- + +## R — stable vs fast + +Window = 100, n = 1 000 000. + +| Function | stable | fast | speedup | +| ---------------------- | ------- | ------- | -------- | +| `mean` (assume_finite) | 3.2 ms | 0.78 ms | **4.0x** | +| `variance` | 16.2 ms | 4.1 ms | **4.0x** | +| `skewness` | 14.5 ms | 10.3 ms | **1.4x** | +| `kurtosis` | 14.4 ms | 7.8 ms | **1.8x** | + +--- + +## rolling_median — dispatch sweep in R (n = 500 000, Apple M-series) + +| window | dispatch (clean) | clean ms | dispatch (NaN) | NaN ms | NaN w/ default ms | +| -----: | ---------------- | -------: | -------------- | -----: | ----------------: | +| 100 | FlatMedian | 56 ms | FlatMedian | 57 ms | 57 ms | +| 500 | FlatMedian | 108 ms | FlatMedian | 109 ms | 109 ms | +| 700 | MultisetMedian | 112 ms | FlatMedian | 142 ms | 142 ms | +| 1 000 | MultisetMedian | 116 ms | FlatMedian | 178 ms | 178 ms | +| 2 000 | MultisetMedian | 128 ms | TwoHeapMedian | 157 ms | 128 ms† | +| 5 000 | TwoHeapMedian | 172 ms | TwoHeapMedian | 173 ms | 173 ms | + +† At window = 2 000, `expect_nan=FALSE` routes to `MultisetMedian` (128 ms), +which is faster on clean data; `expect_nan=TRUE` switches to `TwoHeapMedian` +(157 ms) for NaN-resilience. With 15 % NaN, `MultisetMedian` degrades to +≈ 620 ms at window = 2 000 vs 157 ms for `TwoHeapMedian`. diff --git a/CMakeLists.txt b/CMakeLists.txt index 2edef1f..27af776 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,10 @@ if(NOT MSVC) endif() add_library(rolling_core + src_core/FlatMedian.cpp src_core/MultisetMedian.cpp + src_core/SlidingMedian.cpp + src_core/TwoHeapMedian.cpp src_core/MonotonicMax.cpp src_core/MonotonicMin.cpp src_core/SlidingWelford.cpp @@ -81,6 +84,9 @@ add_executable(test_core tests/test_sliding_welford_ring.cxx tests/test_sliding_moments.cxx tests/test_sliding_covariance.cxx + tests/test_flat_median.cxx + tests/test_two_heap_median.cxx + tests/test_sliding_median.cxx ) target_link_libraries(test_core PRIVATE rolling_core gtest gtest_main) @@ -88,4 +94,7 @@ include(GoogleTest) gtest_discover_tests(test_core) add_executable(bench_core benchmarks/bench_core.cxx) -target_link_libraries(bench_core PRIVATE rolling_core benchmark::benchmark benchmark::benchmark_main) \ No newline at end of file +target_link_libraries(bench_core PRIVATE rolling_core benchmark::benchmark benchmark::benchmark_main) + +add_executable(bench_median_sweep benchmarks/bench_median_sweep.cxx) +target_link_libraries(bench_median_sweep PRIVATE rolling_core benchmark::benchmark benchmark::benchmark_main) \ No newline at end of file diff --git a/Makefile b/Makefile index b2b160c..2aeeb0c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: r-doc r-build r-test r-all py-build py-test py-all all +.PHONY: r-doc r-build r-test r-all py-build py-test py-all docs all r-doc: Rscript -e "devtools::document()" @@ -23,4 +23,8 @@ py-test: py-all: py-build py-test +docs: + py_package/venv/bin/python -m sphinx -b html docs/python docs/_build/python + @echo "Docs built" + all: r-all py-all diff --git a/R/rolling_metrics.R b/R/rolling_metrics.R index 9d27ee1..296f801 100644 --- a/R/rolling_metrics.R +++ b/R/rolling_metrics.R @@ -103,13 +103,20 @@ rolling_min <- function(x, window_size, min_periods = window_size) { #' @title Rolling Median #' #' @description -#' Computes the rolling median over a numeric vector using an ordered multiset -#' with a tracked median iterator. Time complexity: O(log n) per element. +#' Computes the rolling median over a numeric vector. Automatically selects the +#' fastest algorithm based on window size and expected NaN density: FlatMedian +#' (sorted vector) for small windows, MultisetMedian (red-black tree) for +#' medium windows, or TwoHeapMedian (lazy-deletion heaps) for large windows or +#' NaN-heavy data. #' #' @param x A numeric vector of type double. #' @param window_size Positive integer window length. #' @param min_periods Minimum number of non-\code{NA} observations required in #' a window to return a result. Defaults to \code{window_size}. +#' @param expect_nan Logical. If \code{TRUE}, hints that the input may contain +#' many \code{NA} values; switches dispatch thresholds to NaN-robust paths +#' (avoids MultisetMedian's iterator-shift degradation). Defaults to +#' \code{FALSE}. #' #' @return #' A numeric vector with rolling median values. @@ -119,12 +126,13 @@ rolling_min <- function(x, window_size, min_periods = window_size) { #' @examples #' x <- as.double(c(1, 3, 2, 5, 4)) #' rolling_median(x, 3L) -rolling_median <- function(x, window_size, min_periods = window_size) { +rolling_median <- function(x, window_size, min_periods = window_size, + expect_nan = FALSE) { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) .Call("rolling_median_c", x, as.integer(window_size), as.integer(mp), - PACKAGE = "robustrolling") + as.logical(expect_nan), PACKAGE = "robustrolling") } #' @title Rolling Mean diff --git a/README.md b/README.md index cb91a3d..33d8752 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ High-performance rolling window metrics for R and Python, implemented in C++17. `robustrolling` provides numerically stable, memory-efficient rolling window algorithms built in C++17 and exposed to both R and Python. All algorithms: -- run in **O(1) time per element** (O(log n) for median), +- run in **O(1) time per element** (O(log w) for median), - handle **NaN / NA transparently**, - support a **`min_periods`** parameter (pandas-compatible semantics), - share a common **CRTP base** (`RollingMetric`) — zero virtual @@ -25,16 +25,19 @@ algorithms built in C++17 and exposed to both R and Python. All algorithms: ## Features -| C++ class | Algorithm | Time | R API | Python class | -| ---------------------- | -------------------------------------------- | -------------- | ----------------------------------------------------------- | ---------------------- | -| `SlidingMean` | Prefix sum + SIMD (ARM NEON / AVX2) | O(n) batch | `rolling_mean()` | `SlidingMean` | -| `SlidingWelfordRing` | Welford online + ring buffer | O(1) | `rolling_variance()` (`method="stable"`) | `SlidingWelford` | -| `SlidingMomentsPrefix` | Prefix sums of raw moments | O(n) batch | `rolling_variance/skewness/kurtosis()` (`method="fast"`) | `SlidingMomentsPrefix` | -| `MonotonicMax` | Monotonic deque | O(1) amortised | `rolling_max()` | `MonotonicMax` | -| `MonotonicMin` | Monotonic deque | O(1) amortised | `rolling_min()` | `MonotonicMin` | -| `MultisetMedian` | `std::multiset` dual-iterator | O(log n) | `rolling_median()` | `MultisetMedian` | -| `SlidingMoments` | Terriberry's 4th-moment recurrence | O(1) | `rolling_skewness/kurtosis()` (`method="stable"`) | `SlidingMoments` | -| `SlidingCovariance` | Welford 2D online | O(1) | `rolling_cov()` `rolling_cor()` | `SlidingCovariance` | +| C++ class | Algorithm | Time | R API | Python class | +| ---------------------- | ------------------------------------- | -------------- | -------------------------------------------------------- | ---------------------- | +| `SlidingMean` | Prefix sum + SIMD (ARM NEON / AVX2) | O(n) batch | `rolling_mean()` | `SlidingMean` | +| `SlidingWelfordRing` | Welford online + ring buffer | O(1) | `rolling_variance()` (`method="stable"`) | `SlidingWelford` | +| `SlidingMomentsPrefix` | Prefix sums of raw moments | O(n) batch | `rolling_variance/skewness/kurtosis()` (`method="fast"`) | `SlidingMomentsPrefix` | +| `MonotonicMax` | Monotonic deque | O(1) amortised | `rolling_max()` | `MonotonicMax` | +| `MonotonicMin` | Monotonic deque | O(1) amortised | `rolling_min()` | `MonotonicMin` | +| `SlidingMedian` | Auto-dispatches to one of three below | — | `rolling_median()` (`expect_nan=`) | `SlidingMedian` | +| `FlatMedian` | Sorted vector + binary search | O(w) insert | — | `FlatMedian` | +| `MultisetMedian` | `std::multiset` + tracked iterator | O(log w) | — | `MultisetMedian` | +| `TwoHeapMedian` | Two heaps + lazy deletion | O(log w) | — | `TwoHeapMedian` | +| `SlidingMoments` | Terriberry's 4th-moment recurrence | O(1) | `rolling_skewness/kurtosis()` (`method="stable"`) | `SlidingMoments` | +| `SlidingCovariance` | Welford 2D online | O(1) | `rolling_cov()` `rolling_cor()` | `SlidingCovariance` | --- @@ -101,6 +104,22 @@ rolling_median(x, 3L) #> [1] NA NA 2 3 4 ``` +The implementation is chosen automatically from the window size: + +| Window size | Default (`expect_nan=FALSE`) | With `expect_nan=TRUE` | +| ------------- | ---------------------------- | ---------------------- | +| ≤ 600 | `FlatMedian` | `FlatMedian` | +| 601 – 1 500 | `MultisetMedian` | `FlatMedian` | +| 1 501 – 2 000 | `MultisetMedian` | `TwoHeapMedian` | +| > 2 000 | `TwoHeapMedian` | `TwoHeapMedian` | + +Pass `expect_nan = TRUE` when your data contains many `NA` values to avoid +`MultisetMedian`'s O(w) iterator-shift degradation on NaN-heavy data: + +```r +rolling_median(x, 1000L, expect_nan = TRUE) +``` + **Variance and mean** ```r @@ -123,6 +142,8 @@ rolling_kurtosis(y, 4L) #> [1] NA NA NA -1.2 -1.2 ``` +Returns excess kurtosis (Fisher's definition; normal distribution = 0). + **Covariance and Pearson correlation** ```r @@ -183,6 +204,9 @@ rr.rolling_min(x, 3) rr.rolling_median(x, 3) # array([nan, nan, 2., 3., 4.]) + +# NaN-heavy data — use expect_nan=True for large windows +rr.rolling_median(x, 1000, expect_nan=True) ``` ```python @@ -201,6 +225,8 @@ rr.rolling_kurtosis(y, 4) # array([nan, nan, nan, -1.2, -1.2]) ``` +Returns sample variance (ddof=1) and excess kurtosis (Fisher's definition). + ```python a = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) b = np.array([2.0, 4.0, 6.0, 8.0, 10.0]) @@ -249,89 +275,97 @@ rr.rolling_max(prices, 3) Direct access to the engine objects for incremental (streaming) use: ```python -from robustrolling import MonotonicMax, SlidingMoments, SlidingCovariance import numpy as np +import robust_rolling_core as rrc # Streaming — one value at a time -engine = MonotonicMax(3) +engine = rrc.MonotonicMax(3) for v in [1.0, 3.0, 2.0, 5.0]: engine.update(v) - print(engine.get_value()) + print(engine.get_max()) # 1.0 → 3.0 → 3.0 → 5.0 # Batch — zero-copy NumPy buffer -engine2 = SlidingMoments(3) +engine2 = rrc.SlidingMoments(3) x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) -print(engine2.process_skewness_batch(x)) +print(engine2.process_skewness_batch(x, 0)) # [nan, nan, 0., 0., 0.] # Covariance engine -cov_engine = SlidingCovariance(3) +cov_engine = rrc.SlidingCovariance(3) a = np.array([1.0, 2.0, 3.0, 4.0]) b = np.array([2.0, 4.0, 6.0, 8.0]) print(cov_engine.process_covariance_batch(a, b)) # [nan, nan, 2., 2.] ``` +**Median — auto-dispatch and direct algorithm access** + +```python +# Auto-dispatcher: picks FlatMedian / MultisetMedian / TwoHeapMedian +# based on window size and NaN hint +rrc.SlidingMedian(300).process_batch(x) # → FlatMedian (w ≤ 600) +rrc.SlidingMedian(700).process_batch(x) # → MultisetMedian (601–2000) +rrc.SlidingMedian(3000).process_batch(x) # → TwoHeapMedian (w > 2000) +rrc.SlidingMedian(700, expect_nan=True).process_batch(x) # → FlatMedian (w ≤ 1500) + +# Direct access — bypass dispatch and use a specific algorithm +rrc.FlatMedian(700).process_batch(x) +rrc.MultisetMedian(700).process_batch(x) +rrc.TwoHeapMedian(700).process_batch(x) +``` + +All four classes share the same interface: +`process_batch(array, min_periods=0)`, `update(value)`, `get_median()`. + +**Fast batch API — `SlidingMomentsPrefix`** + +`SlidingMomentsPrefix` is a stateless batch engine (prefix sums of raw +moments). It is faster than the online `SlidingMoments` but less numerically +stable for data with extreme values: + +```python +prefix = rrc.SlidingMomentsPrefix(3) +print(prefix.variance_batch(x)) # [nan, 0.5, 1., 1., 1.] +print(prefix.skewness_batch(x)) # [nan, nan, 0., 0., 0.] + +prefix4 = rrc.SlidingMomentsPrefix(4) +print(prefix4.kurtosis_batch(x)) # [nan, nan, nan, -1.2, -1.2] +``` + --- ## Performance Benchmarked on Apple M-series (ARM), window = 100, n = 1 000 000. +Full tables with all window sizes and R comparisons: [BENCHMARKS.md](BENCHMARKS.md). + +### Python vs pandas (highlights) -### Python vs pandas - -| Function | robustrolling | pandas | speedup | -| -------------------- | ------------- | -------- | -------- | -| `rolling_mean` | 3.1 ms | 4.4 ms | **1.4x** | -| `rolling_max` | 11.1 ms | 11.7 ms | 1.1x | -| `rolling_min` | 11.2 ms | 12.2 ms | 1.1x | -| `rolling_median` | 106 ms | 233 ms | **2.2x** | -| `rolling_variance` | 15.2 ms | 9.6 ms | 0.6x | -| `rolling_skewness` | 14.0 ms | 9.1 ms | 0.6x | -| `rolling_kurtosis` | 14.3 ms | 9.2 ms | 0.6x | -| `rolling_cov` | 14.8 ms | 18.2 ms | **1.2x** | -| `rolling_cor` | 14.6 ms | 36.7 ms | **2.5x** | - -### Python vs Polars - -| Function | robustrolling | Polars | speedup | -| -------------------- | ------------- | -------- | -------- | -| `rolling_mean` | 3.1 ms | 8.0 ms | **2.6x** | -| `rolling_max` | 11.1 ms | 11.4 ms | 1.0x | -| `rolling_min` | 11.0 ms | 11.6 ms | 1.1x | -| `rolling_median` | 106 ms | 40.8 ms | 0.4x | -| `rolling_variance` | 15.7 ms | 16.2 ms | 1.0x | -| `rolling_skewness` | 13.9 ms | 16.0 ms | **1.2x** | -| `rolling_kurtosis` | 14.3 ms | 15.6 ms | 1.1x | - -### Python — stable vs fast - -| Function | stable | fast | speedup | -| ---------------------- | -------- | -------- | -------- | -| `mean` (assume_finite) | 3.2 ms | 0.73 ms | **4.4x** | -| `variance` | 15.2 ms | 3.9 ms | **3.9x** | -| `skewness` | 13.9 ms | 10.0 ms | **1.4x** | -| `kurtosis` | 14.4 ms | 7.6 ms | **1.9x** | - -### R vs slider vs RcppRoll - -| Function | robustrolling | slider | RcppRoll | vs slider | vs RcppRoll | -| -------------------- | ------------- | ---------- | --------- | ---------- | ----------- | -| `rolling_max` | 15.1 ms | 338 ms | 175 ms | **22x** | **12x** | -| `rolling_min` | 14.9 ms | 350 ms | 175 ms | **24x** | **12x** | -| `rolling_mean` | 3.1 ms | 1 523 ms | 37.4 ms | **487x** | **12x** | -| `rolling_variance` | 16.0 ms | 2 477 ms | 304 ms | **154x** | **19x** | -| `rolling_median` | 112 ms | 10 084 ms | 1 938 ms | **90x** | **17x** | - -### R — stable vs fast - -| Function | stable | fast | speedup | -| ---------------------- | -------- | -------- | -------- | -| `mean` (assume_finite) | 3.2 ms | 0.78 ms | **4.0x** | -| `variance` | 16.2 ms | 4.1 ms | **4.0x** | -| `skewness` | 14.5 ms | 10.3 ms | **1.4x** | -| `kurtosis` | 14.4 ms | 7.8 ms | **1.8x** | +| Function | robustrolling | pandas | speedup | +| ------------------ | ------------- | ------- | -------- | +| `rolling_mean` | 3.1 ms | 4.4 ms | **1.4x** | +| `rolling_median` | 106 ms | 233 ms | **2.2x** | +| `rolling_cov` | 14.8 ms | 18.2 ms | **1.2x** | +| `rolling_cor` | 14.6 ms | 36.7 ms | **2.5x** | + +### Python vs Polars (highlights) + +`rolling_median` uses `FlatMedian` at window ≤ 600; `TwoHeapMedian` wins at +large windows where Polars' fixed O(w) algorithm loses ground. + +| Function | robustrolling | Polars | speedup | +| ------------------ | ------------- | ------- | ----------------- | +| `rolling_mean` | 3.1 ms | 8.0 ms | **2.6x** | +| `rolling_skewness` | 13.9 ms | 16.0 ms | **1.2x** | +| `rolling_median` | 170 ms | 430 ms | **2.5x** (w=5000) | + +### R vs slider vs RcppRoll (highlights) + +| Function | robustrolling | slider | RcppRoll | vs slider | vs RcppRoll | +| ------------------ | ------------- | --------- | -------- | --------- | ----------- | +| `rolling_mean` | 3.1 ms | 1 523 ms | 37.4 ms | **487x** | **12x** | +| `rolling_median` | 112 ms | 10 084 ms | 1 938 ms | **90x** | **17x** | --- @@ -345,7 +379,11 @@ RollingMetric ├── SlidingMean — prefix sum + ARM NEON / AVX2 SIMD ├── MonotonicMax — monotonic deque (max) ├── MonotonicMin — monotonic deque (min) -├── MultisetMedian — std::multiset + dual-iterator median tracking +├── FlatMedian — sorted vector + binary search (best: w ≤ 600) +├── MultisetMedian — std::multiset + tracked iterator (best: 601–2000) +├── TwoHeapMedian — two heaps + lazy deletion (best: w > 2000 or NaN-heavy) +├── SlidingMedian — dispatcher: holds variant, +│ selects implementation once in the constructor ├── SlidingWelfordRing — Welford variance + ring buffer eviction ├── SlidingMoments — Terriberry's 4th-moment recurrence └── SlidingCovariance — 2D Welford for covariance and Pearson correlation @@ -353,6 +391,13 @@ RollingMetric SlidingMomentsPrefix — stateless batch engine (prefix sums of raw moments) ``` +`SlidingMedian` dispatch rules (`std::visit` — zero runtime overhead): + +| `expect_nan` | w ≤ 600 | 601 – 1 500 | 1 501 – 2 000 | w > 2 000 | +| ------------ | ---------- | -------------- | -------------- | ------------- | +| `false` | FlatMedian | MultisetMedian | MultisetMedian | TwoHeapMedian | +| `true` | FlatMedian | FlatMedian | TwoHeapMedian | TwoHeapMedian | + **Bindings:** | Language | Technology | Notes | @@ -366,12 +411,12 @@ SlidingMomentsPrefix — stateless batch engine (prefix sums of raw moments) ### Requirements -| Tool | Version | -| ------------ | ----------------------------------------- | +| Tool | Version | +| ------------ | ------------------------------------------ | | C++ compiler | C++17 (GCC ≥ 9, Clang ≥ 10, MSVC ≥ 2019) | -| CMake | ≥ 3.14 | -| R | ≥ 4.0 | -| Python | ≥ 3.10 | +| CMake | ≥ 3.14 | +| R | ≥ 4.0 | +| Python | ≥ 3.10 | ### Build and test @@ -391,7 +436,8 @@ make py-test # pytest # Benchmarks Rscript benchmarks/bench_r.R -python benchmarks/bench_python.py +python benchmarks/bench_python.py # vs pandas + stable/fast + median sweep +python benchmarks/bench_polars.py # vs Polars + median sweep vs Polars ``` --- diff --git a/benchmarks/bench_core.cxx b/benchmarks/bench_core.cxx index ea10cdd..a3a5b0b 100644 --- a/benchmarks/bench_core.cxx +++ b/benchmarks/bench_core.cxx @@ -1,3 +1,4 @@ +#include "SlidingMedian.hpp" #include "MonotonicMax.hpp" #include "MonotonicMin.hpp" #include "MultisetMedian.hpp" @@ -5,6 +6,7 @@ #include "SlidingMean.hpp" #include "SlidingMoments.hpp" #include "SlidingWelfordRing.hpp" +#include "TwoHeapMedian.hpp" #include #include #include @@ -21,8 +23,8 @@ static std::vector make_data(std::size_t n, double nan_frac = 0.0) { return v; } -const auto DATA_CLEAN = make_data(100'000); -const auto DATA_NAN = make_data(100'000, 0.15); // 15% NaN +const auto DATA_CLEAN = make_data(1'000'000); +const auto DATA_NAN = make_data(1'000'000, 0.15); // 15% NaN static void BM_MonotonicMax(benchmark::State &state) { std::size_t w = static_cast(state.range(0)); @@ -58,7 +60,31 @@ static void BM_MultisetMedian(benchmark::State &state) { } } } -BENCHMARK(BM_MultisetMedian)->Arg(10)->Arg(100)->Arg(1000); +BENCHMARK(BM_MultisetMedian)->Arg(10)->Arg(100)->Arg(500)->Arg(1000)->Arg(5000)->Arg(20000); + +static void BM_TwoHeapMedian(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + TwoHeapMedian engine(w); + for (double v : DATA_CLEAN) { + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} +BENCHMARK(BM_TwoHeapMedian)->Arg(10)->Arg(100)->Arg(1000)->Arg(5000)->Arg(20000); + +static void BM_SlidingMedian(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + SlidingMedian engine(w); + for (double v : DATA_CLEAN) { + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} +BENCHMARK(BM_SlidingMedian)->Arg(10)->Arg(100)->Arg(500)->Arg(1000)->Arg(5000); static void BM_SlidingMean(benchmark::State &state) { std::size_t w = static_cast(state.range(0)); @@ -136,7 +162,37 @@ static void BM_MultisetMedian_NaN(benchmark::State &state) { } } } -BENCHMARK(BM_MultisetMedian_NaN)->Arg(100); +BENCHMARK(BM_MultisetMedian_NaN)->Arg(100)->Arg(1000); + +static void BM_SlidingMedian_NaN(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + SlidingMedian engine(w); + for (double v : DATA_NAN) { + if (std::isnan(v)) + engine.skip(); + else + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} +BENCHMARK(BM_SlidingMedian_NaN)->Arg(100)->Arg(1000); + +static void BM_TwoHeapMedian_NaN(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + TwoHeapMedian engine(w); + for (double v : DATA_NAN) { + if (std::isnan(v)) + engine.skip(); + else + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} +BENCHMARK(BM_TwoHeapMedian_NaN)->Arg(100)->Arg(1000); static void BM_SlidingMoments_NaN(benchmark::State &state) { std::size_t w = static_cast(state.range(0)); diff --git a/benchmarks/bench_median_sweep.cxx b/benchmarks/bench_median_sweep.cxx new file mode 100644 index 0000000..8e99a57 --- /dev/null +++ b/benchmarks/bench_median_sweep.cxx @@ -0,0 +1,98 @@ +#include "FlatMedian.hpp" +#include "MultisetMedian.hpp" +#include "SlidingMedian.hpp" +#include "TwoHeapMedian.hpp" +#include +#include +#include +#include +#include + +static std::vector make_data(std::size_t n, double nan_frac = 0.0) { + std::mt19937 gen(42); + std::normal_distribution dist(100.0, 5.0); + std::uniform_real_distribution coin(0.0, 1.0); + std::vector v(n); + for (auto &x : v) + x = (coin(gen) < nan_frac) ? std::numeric_limits::quiet_NaN() + : dist(gen); + return v; +} + +const auto DATA_CLEAN = make_data(500'000); +const auto DATA_NAN = make_data(500'000, 0.15); + + +template +static void run_clean(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + Engine engine(w); + for (double v : DATA_CLEAN) { + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} + +template +static void run_nan(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + Engine engine(w); + for (double v : DATA_NAN) { + if (std::isnan(v)) engine.skip(); + else engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} + +#define WINDOW_ARGS \ + ->Arg(10)->Arg(50)->Arg(100)->Arg(200)->Arg(300)->Arg(400) \ + ->Arg(500)->Arg(600)->Arg(700)->Arg(800)->Arg(1000)->Arg(2000)->Arg(5000) + +static void BM_Flat_Clean(benchmark::State &s) { run_clean(s); } +static void BM_Multiset_Clean(benchmark::State &s) { run_clean(s); } +static void BM_TwoHeap_Clean(benchmark::State &s) { run_clean(s); } +static void BM_Sliding_Clean(benchmark::State &s) { run_clean(s); } + +static void BM_SlidingNaNHint_Clean(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + SlidingMedian engine(w, /*expect_nan=*/true); + for (double v : DATA_CLEAN) { + engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} + +BENCHMARK(BM_Flat_Clean) WINDOW_ARGS; +BENCHMARK(BM_Multiset_Clean) WINDOW_ARGS; +BENCHMARK(BM_TwoHeap_Clean) WINDOW_ARGS; +BENCHMARK(BM_Sliding_Clean) WINDOW_ARGS; +BENCHMARK(BM_SlidingNaNHint_Clean) WINDOW_ARGS; + +static void BM_Flat_NaN(benchmark::State &s) { run_nan(s); } +static void BM_Multiset_NaN(benchmark::State &s) { run_nan(s); } +static void BM_TwoHeap_NaN(benchmark::State &s) { run_nan(s); } +static void BM_Sliding_NaN(benchmark::State &s) { run_nan(s); } + +static void BM_SlidingNaNHint_NaN(benchmark::State &state) { + std::size_t w = static_cast(state.range(0)); + for (auto _ : state) { + SlidingMedian engine(w, /*expect_nan=*/true); + for (double v : DATA_NAN) { + if (std::isnan(v)) engine.skip(); + else engine.update(v); + benchmark::DoNotOptimize(engine.get_median()); + } + } +} + +BENCHMARK(BM_Flat_NaN) WINDOW_ARGS; +BENCHMARK(BM_Multiset_NaN) WINDOW_ARGS; +BENCHMARK(BM_TwoHeap_NaN) WINDOW_ARGS; +BENCHMARK(BM_Sliding_NaN) WINDOW_ARGS; +BENCHMARK(BM_SlidingNaNHint_NaN) WINDOW_ARGS; diff --git a/benchmarks/bench_polars.py b/benchmarks/bench_polars.py index e149adf..5a53307 100644 --- a/benchmarks/bench_polars.py +++ b/benchmarks/bench_polars.py @@ -1,8 +1,8 @@ """ -Benchmark: robustrolling vs Polars rolling functions (stable methods only). +Benchmark: robustrolling vs Polars rolling_median. +Uses process_batch for all engines (no Python-loop overhead). Usage: - pip install polars python benchmarks/bench_polars.py """ @@ -10,74 +10,221 @@ import numpy as np import polars as pl import robustrolling as rr +import robust_rolling_core as rrc RNG = np.random.default_rng(42) -SIZES = [10_000, 100_000, 1_000_000] -WINDOW = 100 -REPS = 10 +N = 500_000 +REPS = 5 +NAN_FRAC = 0.15 +WINDOW = 100 # fixed window for the vs-Polars all-metrics table +WINDOWS = [10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 1000, 2000, 5000] -def bench(fn, reps: int = REPS) -> float: - """Return median wall time in milliseconds over `reps` runs.""" - fn() # warmup: prime caches before timing - times = [] + +def bench(fn, reps=REPS) -> float: + """Return median wall-time in milliseconds over *reps* runs (1 warmup).""" + fn() + ts = [] for _ in range(reps): t0 = time.perf_counter() fn() - times.append(time.perf_counter() - t0) - return float(np.median(times)) * 1_000 + ts.append(time.perf_counter() - t0) + return float(np.median(ts)) * 1_000 + + +def ns_per_elem(ms: float) -> float: + return ms / N * 1e6 -def make_data(n: int): - x = RNG.standard_normal(n) - return x, pl.Series(x) +def flag(v: float) -> str: + return "✓" if v >= 1.0 else " " + +# ── 1. all-metrics comparison at fixed window ────────────────────────────────── def run_vs_polars(n: int) -> list[dict]: - x, sx = make_data(n) - w = WINDOW + rng = np.random.default_rng(0) + x = rng.standard_normal(n) + sx = pl.Series(x) + w = WINDOW cases = [ - ("rolling_max", lambda: rr.rolling_max(x, w), lambda: sx.rolling_max(w)), - ("rolling_min", lambda: rr.rolling_min(x, w), lambda: sx.rolling_min(w)), - ("rolling_mean", lambda: rr.rolling_mean(x, w), lambda: sx.rolling_mean(w)), - ("rolling_variance", lambda: rr.rolling_variance(x, w), lambda: sx.rolling_var(w)), - ("rolling_median", lambda: rr.rolling_median(x, w), lambda: sx.rolling_median(w)), - ("rolling_skewness", lambda: rr.rolling_skewness(x, w), lambda: sx.rolling_skew(w)), - ("rolling_kurtosis", lambda: rr.rolling_kurtosis(x, w), lambda: sx.rolling_kurtosis(w)), + ("rolling_max", lambda: rr.rolling_max(x, w), + lambda: sx.rolling_max(window_size=w)), + ("rolling_min", lambda: rr.rolling_min(x, w), + lambda: sx.rolling_min(window_size=w)), + ("rolling_mean", lambda: rr.rolling_mean(x, w), + lambda: sx.rolling_mean(window_size=w)), + ("rolling_variance", lambda: rr.rolling_variance(x, w), + lambda: sx.rolling_var(window_size=w)), + ("rolling_median", lambda: rr.rolling_median(x, w), + lambda: sx.rolling_median(window_size=w)), + ("rolling_skewness", lambda: rr.rolling_skewness(x, w), + lambda: sx.rolling_skew(window_size=w)), + ("rolling_kurtosis", lambda: rr.rolling_kurtosis(x, w), + lambda: sx.rolling_kurtosis(window_size=w)), ] - results = [] + rows = [] for name, our_fn, pl_fn in cases: our_ms = bench(our_fn) - pl_ms = bench(pl_fn) - results.append({"name": name, "our_ms": our_ms, "pl_ms": pl_ms, - "speedup": pl_ms / our_ms}) - return results - - -def flag(v: float) -> str: - return "x" if v >= 1.0 else " " + pl_ms = bench(pl_fn) + rows.append({"name": name, "our_ms": our_ms, "pl_ms": pl_ms, + "speedup": pl_ms / our_ms}) + return rows -def print_table(n: int, rows: list[dict]) -> None: +def print_vs_polars(n: int, rows: list[dict]) -> None: print(f"\n n = {n:,} window = {WINDOW} (median of {REPS} runs)") - print(f" {'Function':<28} {'robustrolling':>14} {'polars':>10} {'speedup':>9}") - print(" " + "-" * 65) + print(f" {'Function':<22} {'robustrolling':>14} {'polars':>10} {'speedup':>9}") + print(" " + "-" * 59) for r in rows: print( - f" {r['name']:<28} {r['our_ms']:>11.2f} ms" + f" {r['name']:<22} {r['our_ms']:>11.2f} ms" f" {r['pl_ms']:>7.2f} ms" f" {r['speedup']:>6.2f}x {flag(r['speedup'])}" ) +# ── 2. rolling_median sweep vs Polars ───────────────────────────────────────── + +def which_algo(w: int, nan: bool) -> str: + """Mirror the SlidingMedian dispatch thresholds.""" + if nan: + return "Flat" if w <= 1500 else "TwoHeap" + if w <= 600: return "Flat" + if w <= 2000: return "Multiset" + return "TwoHeap" + + +def run_median_sweep() -> list[dict]: + data_clean = RNG.standard_normal(N) + data_nan = data_clean.copy() + data_nan[RNG.random(N) < NAN_FRAC] = np.nan + + pl_clean = pl.Series(data_clean) + pl_nan = pl.Series(data_nan).fill_nan(None) # Polars uses null, not NaN + + rows = [] + for w in WINDOWS: + row: dict = {"window": w} + + for label, data, nan_hint, pl_s in [ + ("clean", data_clean, False, pl_clean), + ("nan", data_nan, True, pl_nan), + ]: + # individual engines via process_batch (pure C++, no Python loop) + for name, factory in [ + ("Flat", lambda _w=w: rrc.FlatMedian(_w)), + ("Multiset", lambda _w=w: rrc.MultisetMedian(_w)), + ("TwoHeap", lambda _w=w: rrc.TwoHeapMedian(_w)), + ]: + row[f"{label}_{name}_ms"] = bench( + lambda _f=factory, _d=data: _f().process_batch(_d) + ) + + # SlidingMedian dispatcher + row[f"{label}_Sliding_ms"] = bench( + lambda _w=w, _h=nan_hint, _d=data: + rrc.SlidingMedian(_w, _h).process_batch(_d) + ) + + # high-level Python API (rr.rolling_median) + row[f"{label}_rr_ms"] = bench( + lambda _d=data, _w=w: rr.rolling_median(_d, _w) + ) + + # Polars + row[f"{label}_polars_ms"] = bench( + lambda _s=pl_s, _w=w: _s.rolling_median(window_size=_w) + ) + + row[f"{label}_algo"] = which_algo(w, nan_hint) + + rows.append(row) + return rows + + +def print_median_sweep(rows: list[dict]) -> None: + for label, title in [ + ("clean", "Clean data (no NaN)"), + ("nan", "NaN-heavy (15% NaN)"), + ]: + print(f"\n {title} — n = {N:,} (median of {REPS} runs, ns per element)") + print( + f" {'win':>5} {'dispatches':>9}" + f" {'Flat':>9} {'Multiset':>9} {'TwoHeap':>9}" + f" {'Sliding':>9} {'rr.API':>9} {'Polars':>9}" + f" {'speedup':>9}" + ) + print(" " + "-" * 97) + for r in rows: + w = r["window"] + sl = r[f"{label}_Sliding_ms"] + po = r[f"{label}_polars_ms"] + su = po / sl if sl > 0 else float("nan") + fl = ns_per_elem(r[f"{label}_Flat_ms"]) + mu = ns_per_elem(r[f"{label}_Multiset_ms"]) + he = ns_per_elem(r[f"{label}_TwoHeap_ms"]) + sl_n = ns_per_elem(sl) + rr_n = ns_per_elem(r[f"{label}_rr_ms"]) + po_n = ns_per_elem(po) + algo = r[f"{label}_algo"] + print( + f" {w:>5} {algo:>9}" + f" {fl:>7.2f} ns {mu:>7.2f} ns {he:>7.2f} ns" + f" {sl_n:>7.2f} ns {rr_n:>7.2f} ns {po_n:>7.2f} ns" + f" {su:>7.2f}x {flag(su)}" + ) + + print() + print(" Sliding = SlidingMedian.process_batch (auto-dispatch, no Python overhead)") + print(" rr.API = rr.rolling_median (high-level Python wrapper)") + print(" speedup = Polars ns / Sliding ns (>1.0x means we are faster)") + + +# ── 3. best engine per window (heatmap-style summary) ───────────────────────── + +def print_winner_summary(rows: list[dict]) -> None: + engines = ["Flat", "Multiset", "TwoHeap", "Sliding"] + print("\n Best C++ engine vs Polars per window (lower ns/elem is better)") + print(f" {'win':>5} {'clean: best':>13} {'ns/el':>8} {'speedup':>8}" + f" {'nan: best':>13} {'ns/el':>8} {'speedup':>8}") + print(" " + "-" * 80) + for r in rows: + w = r["window"] + for label in ["clean", "nan"]: + best_name = min(engines, key=lambda e, l=label, _r=r: _r[f"{l}_{e}_ms"]) + best_ms = r[f"{label}_{best_name}_ms"] + po_ms = r[f"{label}_polars_ms"] + su = po_ms / best_ms if best_ms > 0 else float("nan") + if label == "clean": + c_str = f"{best_name:>13} {ns_per_elem(best_ms):>7.2f} ns {su:>7.2f}x {flag(su)}" + else: + n_str = f"{best_name:>13} {ns_per_elem(best_ms):>7.2f} ns {su:>7.2f}x {flag(su)}" + print(f" {w:>5} {c_str} {n_str}") + + +# ── main ─────────────────────────────────────────────────────────────────────── + if __name__ == "__main__": - print(f"robustrolling vs Polars {pl.__version__} — rolling window benchmark") + print(f"robustrolling vs Polars {pl.__version__} — rolling window benchmark") + print("=" * 65) + + print("\n[1] All metrics at fixed window") print("=" * 65) - for n in SIZES: + for n in [10_000, 100_000, 1_000_000]: rows = run_vs_polars(n) - print_table(n, rows) + print_vs_polars(n, rows) + + print(f"\n\n[2] rolling_median sweep vs Polars (n = {N:,})") + print("=" * 99) + sweep = run_median_sweep() + print_median_sweep(sweep) + + print("\n\n[3] Best engine summary") + print("=" * 82) + print_winner_summary(sweep) print() diff --git a/benchmarks/bench_python.py b/benchmarks/bench_python.py index b5c8a62..145c00c 100644 --- a/benchmarks/bench_python.py +++ b/benchmarks/bench_python.py @@ -1,5 +1,6 @@ """ -Benchmark: robustrolling vs pandas rolling functions + stable vs fast. +Benchmark: robustrolling vs pandas rolling functions + stable vs fast + + rolling_median dispatch sweep across window sizes. Usage: pip install pandas @@ -10,6 +11,7 @@ import numpy as np import pandas as pd import robustrolling as rr +import robust_rolling_core as rrc RNG = np.random.default_rng(42) @@ -43,15 +45,15 @@ def run_vs_pandas(n: int) -> list[dict]: roll = s.rolling(w) cases = [ - ("rolling_max", lambda: rr.rolling_max(x, w), lambda: roll.max()), - ("rolling_min", lambda: rr.rolling_min(x, w), lambda: roll.min()), - ("rolling_mean", lambda: rr.rolling_mean(x, w), lambda: roll.mean()), - ("rolling_variance", lambda: rr.rolling_variance(x, w), lambda: roll.var()), - ("rolling_median", lambda: rr.rolling_median(x, w), lambda: roll.median()), - ("rolling_skewness", lambda: rr.rolling_skewness(x, w), lambda: roll.skew()), - ("rolling_kurtosis", lambda: rr.rolling_kurtosis(x, w), lambda: roll.kurt()), - ("rolling_cov", lambda: rr.rolling_cov(x, y, w), lambda: roll.cov(t)), - ("rolling_cor", lambda: rr.rolling_cor(x, y, w), lambda: roll.corr(t)), + ("rolling_max", lambda: rr.rolling_max(x, w), lambda: roll.max()), + ("rolling_min", lambda: rr.rolling_min(x, w), lambda: roll.min()), + ("rolling_mean", lambda: rr.rolling_mean(x, w), lambda: roll.mean()), + ("rolling_variance", lambda: rr.rolling_variance(x, w), lambda: roll.var()), + ("rolling_median", lambda: rr.rolling_median(x, w), lambda: roll.median()), + ("rolling_skewness", lambda: rr.rolling_skewness(x, w), lambda: roll.skew()), + ("rolling_kurtosis", lambda: rr.rolling_kurtosis(x, w), lambda: roll.kurt()), + ("rolling_cov", lambda: rr.rolling_cov(x, y, w), lambda: roll.cov(t)), + ("rolling_cor", lambda: rr.rolling_cor(x, y, w), lambda: roll.corr(t)), ] results = [] @@ -119,6 +121,70 @@ def print_stable_vs_fast(n: int, rows: list[dict]) -> None: ) +MEDIAN_WINDOWS = [10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 1000, 2000, 5000] +MEDIAN_N = 500_000 +MEDIAN_NAN_FRAC = 0.15 + + +def run_median_sweep() -> list[dict]: + """Per-window-size comparison of FlatMedian / MultisetMedian / TwoHeapMedian + vs the SlidingMedian dispatcher, on clean and NaN-heavy data.""" + rng = np.random.default_rng(42) + data_clean = rng.standard_normal(MEDIAN_N) + data_nan = data_clean.copy() + data_nan[rng.random(MEDIAN_N) < MEDIAN_NAN_FRAC] = np.nan + + # Dispatch thresholds (must match SlidingMedian.hpp constants) + FLAT_CLEAN = 600 + HEAP_CLEAN = 2000 + FLAT_NAN = 1500 + + def which_algo(w: int, nan: bool) -> str: + if nan: + return "FlatMedian" if w <= FLAT_NAN else "TwoHeapMedian" + if w <= FLAT_CLEAN: + return "FlatMedian" + if w <= HEAP_CLEAN: + return "MultisetMedian" + return "TwoHeapMedian" + + results = [] + for w in MEDIAN_WINDOWS: + row: dict = {"window": w} + for label, data, nan_hint in [("clean", data_clean, False), + ("nan15", data_nan, True)]: + engines = { + "Flat": lambda _w=w: rrc.FlatMedian(_w), + "Multiset": lambda _w=w: rrc.MultisetMedian(_w), + "TwoHeap": lambda _w=w: rrc.TwoHeapMedian(_w), + "Sliding": lambda _w=w, _h=nan_hint: rrc.SlidingMedian(_w, _h), + } + for name, factory in engines.items(): + row[f"{label}_{name}_ms"] = bench( + lambda _d=data, _f=factory: _f().process_batch(_d), + reps=3, + ) + row[f"{label}_algo"] = which_algo(w, nan_hint) + results.append(row) + return results + + +def print_median_sweep(rows: list[dict]) -> None: + hdr = f" {'window':>6} {'dispatches to':>15} {'Flat':>8} {'Multiset':>9} {'TwoHeap':>9} {'Sliding':>9}" + for label, title in [("clean", "Clean data (no NaN)"), ("nan15", "NaN-heavy data (15% NaN)")]: + print(f"\n {title}") + print(hdr) + print(" " + "-" * 73) + for r in rows: + print( + f" {r['window']:>6} {r[f'{label}_algo']:>15}" + f" {r[f'{label}_Flat_ms']:>6.1f} ms" + f" {r[f'{label}_Multiset_ms']:>7.1f} ms" + f" {r[f'{label}_TwoHeap_ms']:>7.1f} ms" + f" {r[f'{label}_Sliding_ms']:>7.1f} ms" + ) + + if __name__ == "__main__": print("robustrolling vs pandas — rolling window benchmark") print("=" * 59) @@ -132,4 +198,9 @@ def print_stable_vs_fast(n: int, rows: list[dict]) -> None: rows = run_stable_vs_fast(n) print_stable_vs_fast(n, rows) + print("\n\nrolling_median dispatch sweep (n = 500 000)") + print("=" * 75) + sweep = run_median_sweep() + print_median_sweep(sweep) + print() diff --git a/benchmarks/bench_r.R b/benchmarks/bench_r.R index 0eeb9db..5091126 100644 --- a/benchmarks/bench_r.R +++ b/benchmarks/bench_r.R @@ -169,6 +169,66 @@ print_stable_vs_fast <- function(n, df) { } } +## ── rolling_median dispatch sweep ───────────────────────────────────────────── + +MEDIAN_WINDOWS <- c(10L, 50L, 100L, 200L, 300L, 400L, 500L, + 600L, 700L, 800L, 1000L, 2000L, 5000L) +MEDIAN_N <- 500000L +NAN_FRAC <- 0.15 + +which_algo <- function(w, nan_hint) { + if (nan_hint) { + if (w <= 1500L) "FlatMedian" else "TwoHeapMedian" + } else { + if (w <= 600L) "FlatMedian" + else if (w <= 2000L) "MultisetMedian" + else "TwoHeapMedian" + } +} + +run_median_sweep <- function() { + set.seed(42) + data_clean <- as.double(rnorm(MEDIAN_N)) + data_nan <- data_clean + data_nan[sample(MEDIAN_N, floor(MEDIAN_N * NAN_FRAC))] <- NA_real_ + + rows <- lapply(MEDIAN_WINDOWS, function(w) { + time_ms <- function(x, expect_nan) { + bm <- bench::mark( + rolling_median(x, w, min_periods = 0L, expect_nan = expect_nan), + iterations = 3L, check = FALSE, memory = FALSE + ) + as.numeric(bm$median) * 1000 + } + data.frame( + window = w, + clean_algo = which_algo(w, FALSE), + clean_ms = time_ms(data_clean, FALSE), + nan_algo = which_algo(w, TRUE), + nan_ms = time_ms(data_nan, TRUE), + nan_default_ms = time_ms(data_nan, FALSE), + stringsAsFactors = FALSE + ) + }) + do.call(rbind, rows) +} + +print_median_sweep <- function(df) { + cat(sprintf("\n %-8s %-15s %10s %-15s %10s %10s\n", + "window", "clean algo", "clean ms", + "nan algo", "nan ms", "nan(def) ms")) + cat(" ", strrep("-", 80), "\n", sep = "") + for (i in seq_len(nrow(df))) { + r <- df[i, ] + cat(sprintf(" %-8d %-15s %8.1f ms %-15s %8.1f ms %8.1f ms\n", + r$window, + r$clean_algo, r$clean_ms, + r$nan_algo, r$nan_ms, r$nan_default_ms)) + } + cat("\n nan ms = expect_nan=TRUE (NaN-robust dispatch path)\n") + cat(" nan(def) ms = expect_nan=FALSE (default path on NaN-heavy data)\n") +} + cat("robustrolling vs slider vs RcppRoll\n") cat(strrep("=", 80), "\n") for (n in SIZES) { @@ -190,4 +250,9 @@ for (n in SIZES) { print_stable_vs_fast(n, df) } +cat("\n\nrolling_median dispatch sweep (n = 500 000, NaN fraction = 15%)\n") +cat(strrep("=", 80), "\n") +df_sweep <- run_median_sweep() +print_median_sweep(df_sweep) + cat("\n") diff --git a/cspell.json b/cspell.json index 3abaf2c..a006dfc 100644 --- a/cspell.json +++ b/cspell.json @@ -3,7 +3,40 @@ "ignorePaths": [], "dictionaryDefinitions": [], "dictionaries": [], - "words": [], + "words": [ + "amortised", + "automodule", + "contiguous", + "cspell", + "ddof", + "deque", + "dtype", + "endian", + "ffp", + "flto", + "funroll", + "googletest", + "kurtosis", + "multiset", + "ndarray", + "ndim", + "optimised", + "pybind", + "pyproject", + "randn", + "rbind", + "rcpp", + "robustrolling", + "roxygen", + "rtol", + "scikit", + "skewness", + "tinytest", + "vapply", + "crtp", + "terriberry", + "welford" + ], "ignoreWords": [], "import": [] } diff --git a/docs/python/api.rst b/docs/python/api.rst index 4388496..d9b32e5 100644 --- a/docs/python/api.rst +++ b/docs/python/api.rst @@ -17,9 +17,9 @@ High-level functions Low-level classes ----------------- -Six C++ classes are exposed directly for streaming (one observation at a time) -or for computing multiple statistics in a single pass without calling several -high-level functions. +Eleven C++ classes are exposed directly for streaming (one observation at a +time) or for computing multiple statistics in a single pass without calling +several high-level functions. .. list-table:: :header-rows: 1 @@ -28,24 +28,39 @@ high-level functions. * - Class - Algorithm - Key methods + * - :py:class:`~robustrolling.SlidingMean` + - Prefix sum + SIMD + - ``update``, ``get_mean``, ``process_batch`` * - :py:class:`~robustrolling.MonotonicMax` - Monotonic deque - ``update``, ``get_max``, ``process_batch`` * - :py:class:`~robustrolling.MonotonicMin` - Monotonic deque - ``update``, ``get_min``, ``process_batch`` + * - :py:class:`~robustrolling.SlidingMedian` + - Auto-dispatch (Flat / Multiset / TwoHeap) + - ``update``, ``get_median``, ``process_batch`` + * - :py:class:`~robustrolling.FlatMedian` + - Sorted vector — best w ≤ 600 or NaN-heavy + - ``update``, ``get_median``, ``process_batch`` * - :py:class:`~robustrolling.MultisetMedian` - - ``std::multiset`` + tracked iterator + - ``std::multiset`` — best 601–2 000, clean data + - ``update``, ``get_median``, ``process_batch`` + * - :py:class:`~robustrolling.TwoHeapMedian` + - Two heaps + lazy deletion — best w > 2 000 or NaN-heavy - ``update``, ``get_median``, ``process_batch`` * - :py:class:`~robustrolling.SlidingWelford` - Welford + ring buffer - ``update``, ``get_variance``, ``process_batch`` * - :py:class:`~robustrolling.SlidingMoments` - Terriberry 4th-moment - - ``update``, ``get_mean``, ``get_skewness``, ``get_kurtosis`` + - ``update``, ``get_mean``, ``get_skewness``, ``get_kurtosis``, ``process_skewness_batch``, ``process_kurtosis_batch`` * - :py:class:`~robustrolling.SlidingCovariance` - 2-D Welford - - ``update``, ``get_covariance``, ``get_correlation`` + - ``update``, ``get_covariance``, ``get_correlation``, ``process_covariance_batch`` + * - :py:class:`~robustrolling.SlidingMomentsPrefix` + - Prefix sums of raw moments (stateless batch) + - ``variance_batch``, ``skewness_batch``, ``kurtosis_batch`` .. toctree:: :hidden: diff --git a/docs/python/conf.py b/docs/python/conf.py index ecbcb32..a879a75 100644 --- a/docs/python/conf.py +++ b/docs/python/conf.py @@ -54,3 +54,5 @@ "sidebar_hide_name": False, "navigation_with_keys": True, } + +toc_object_entries = False diff --git a/docs/python/index.rst b/docs/python/index.rst index bdd51af..f20eabe 100644 --- a/docs/python/index.rst +++ b/docs/python/index.rst @@ -4,13 +4,13 @@ robustrolling .. raw:: html

- High-performance rolling-window statistics for R and Python — six + High-performance rolling-window statistics for R and Python — eleven algorithms implemented in C++17, exposed through idiomatic bindings in - both languages, with O(1) or O(log n) updates per element. + both languages, with O(1) or O(log w) updates per element.

.. toctree:: - :maxdepth: 2 + :maxdepth: 1 :caption: API Reference api @@ -21,18 +21,28 @@ Algorithm overview .. list-table:: :header-rows: 1 - :widths: 26 22 12 20 20 + :widths: 26 26 12 18 18 * - C++ class - Algorithm - Complexity - R function(s) - - Python function(s) + - Python class / function(s) + * - ``SlidingMean`` + - Prefix sum + SIMD + - O(n) batch + - ``rolling_mean`` + - ``rolling_mean``, ``SlidingMean`` * - ``SlidingWelfordRing`` - Welford online variance (ring buffer) - O(1) - - ``rolling_variance`` + - ``rolling_variance`` (``method="stable"``) - ``rolling_variance``, ``SlidingWelford`` + * - ``SlidingMomentsPrefix`` + - Prefix sums of raw moments + - O(n) batch + - ``rolling_variance/skewness/kurtosis`` (``method="fast"``) + - ``SlidingMomentsPrefix`` * - ``MonotonicMax`` - Monotonic deque maximum - O(1) amortised @@ -43,16 +53,31 @@ Algorithm overview - O(1) amortised - ``rolling_min`` - ``rolling_min``, ``MonotonicMin`` + * - ``SlidingMedian`` + - Auto-dispatches to one of three below + - — + - ``rolling_median`` (``expect_nan=``) + - ``rolling_median``, ``SlidingMedian`` + * - ``FlatMedian`` + - Sorted ``std::vector`` + binary search + - O(w) insert + - — + - ``FlatMedian`` * - ``MultisetMedian`` - - ``std::multiset`` tracked-iterator median - - O(log n) - - ``rolling_median`` - - ``rolling_median``, ``MultisetMedian`` + - ``std::multiset`` + tracked iterator + - O(log w) + - — + - ``MultisetMedian`` + * - ``TwoHeapMedian`` + - Two heaps + lazy deletion + - O(log w) + - — + - ``TwoHeapMedian`` * - ``SlidingMoments`` - Terriberry 4th-moment online algorithm - O(1) - - ``rolling_mean``, ``rolling_skewness``, ``rolling_kurtosis`` - - ``rolling_mean``, ``rolling_skewness``, ``rolling_kurtosis``, ``SlidingMoments`` + - ``rolling_skewness``, ``rolling_kurtosis`` (``method="stable"``) + - ``rolling_skewness``, ``rolling_kurtosis``, ``SlidingMoments`` * - ``SlidingCovariance`` - 2-D Welford online covariance - O(1) @@ -63,8 +88,7 @@ Performance ----------- Benchmarked on Apple M-series (ARM), window = 100, n = 1 000 000 -(median of 10 runs). Best robustrolling configuration shown -(¹ ``assume_finite=True``, ² ``method="fast"``). +(median of 10 runs). **Python vs pandas** @@ -115,6 +139,9 @@ Benchmarked on Apple M-series (ARM), window = 100, n = 1 000 000 **Python vs Polars** +``rolling_median`` uses ``FlatMedian`` at window ≤ 600; ``TwoHeapMedian`` +wins at large windows where Polars' fixed O(w) algorithm loses ground. + .. list-table:: :header-rows: 1 :widths: 28 18 14 12 @@ -136,9 +163,9 @@ Benchmarked on Apple M-series (ARM), window = 100, n = 1 000 000 - 11.6 ms - 1.1x * - ``rolling_median`` - - 106 ms - - 40.8 ms - - 0.4x + - 55 ms + - 41 ms + - 0.7x (w=100) * - ``rolling_variance`` - 15.7 ms - 16.2 ms @@ -254,7 +281,7 @@ Install for R .. code-block:: r - # Requires a C++17 compiler (GCC ≥ 7, Clang ≥ 5, MSVC ≥ 2017) + # Requires R ≥ 4.0 and a C++17 compiler install.packages("remotes") remotes::install_github("IgorPtak/rolling_window") @@ -263,7 +290,7 @@ Install for Python .. code-block:: bash - # Requires Python ≥ 3.8 and a C++17 compiler + # Requires Python ≥ 3.10 and a C++17 compiler pip install git+https://github.com/IgorPtak/rolling_window.git#subdirectory=py_package # Or clone and install locally: diff --git a/docs/python/low_level.rst b/docs/python/low_level.rst index 0cffd97..e09f047 100644 --- a/docs/python/low_level.rst +++ b/docs/python/low_level.rst @@ -8,6 +8,27 @@ time) or to read multiple statistics from a single pass. ---- +.. py:class:: SlidingMean(window_size) + + Rolling mean — prefix sum with optional ARM NEON / AVX2 SIMD, O(n) batch. + + :param window_size: int + + .. py:method:: update(value: float) + .. py:method:: get_mean() -> float + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + + .. code-block:: python + + import robust_rolling_core as rrc + import numpy as np + + sm = rrc.SlidingMean(3) + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + sm.process_batch(x) # [1.0, 1.5, 2.0, 3.0, 4.0] + +---- + .. py:class:: MonotonicMax(window_size) Rolling maximum — monotonic deque, O(1) amortised. @@ -16,11 +37,11 @@ time) or to read multiple statistics from a single pass. .. py:method:: update(value: float) .. py:method:: get_max() -> float - .. py:method:: process_batch(x: numpy.ndarray) -> numpy.ndarray + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray .. code-block:: python - mm = MonotonicMax(3) + mm = rrc.MonotonicMax(3) mm.update(1.0); mm.update(3.0); mm.update(2.0) mm.get_max() # 3.0 @@ -34,20 +55,130 @@ time) or to read multiple statistics from a single pass. .. py:method:: update(value: float) .. py:method:: get_min() -> float - .. py:method:: process_batch(x: numpy.ndarray) -> numpy.ndarray + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + +---- + +Median classes +-------------- + +Four classes implement rolling median. :py:class:`SlidingMedian` is the +recommended default — it selects the fastest sub-implementation in the +constructor, with zero runtime dispatch overhead (``std::visit`` on +``std::variant``). + +.. list-table:: + :header-rows: 1 + :widths: 20 25 55 + + * - Class + - Algorithm + - Best for + * - :py:class:`SlidingMedian` + - Auto-dispatcher + - All cases — picks one of the three below + * - :py:class:`FlatMedian` + - Sorted ``std::vector`` + - Small windows (w ≤ 600) or NaN-heavy data + * - :py:class:`MultisetMedian` + - ``std::multiset`` + tracked iterator + - Medium windows (601–2 000), clean data + * - :py:class:`TwoHeapMedian` + - Two heaps + lazy deletion + - Large windows (w > 2 000) or NaN-heavy data + +.. py:class:: SlidingMedian(window_size, expect_nan=False) + + Rolling median auto-dispatcher. Selects the fastest of + :py:class:`FlatMedian`, :py:class:`MultisetMedian`, or + :py:class:`TwoHeapMedian` based on *window_size* and *expect_nan* at + construction time. + + Dispatch thresholds (windows > 2 000 always use ``TwoHeapMedian``): + + .. list-table:: + :header-rows: 1 + :widths: 18 20 22 22 + + * - ``expect_nan`` + - w ≤ 600 + - 601–1 500 + - 1 501–2 000 + * - ``False`` + - ``FlatMedian`` + - ``MultisetMedian`` + - ``MultisetMedian`` + * - ``True`` + - ``FlatMedian`` + - ``FlatMedian`` + - ``TwoHeapMedian`` + + :param window_size: int + :param expect_nan: bool — hint that input contains many NaN values + (default ``False``) + + .. py:method:: update(value: float) + .. py:method:: get_median() -> float + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + + .. code-block:: python + + import numpy as np + import robust_rolling_core as rrc + + x = np.array([1.0, 3.0, 2.0, 5.0, 4.0]) + + rrc.SlidingMedian(3).process_batch(x) + # array([1., 2., 2., 3., 4.]) + + # NaN-heavy data — use expect_nan=True for large windows + rrc.SlidingMedian(700, expect_nan=True).process_batch(x) + +---- + +.. py:class:: FlatMedian(window_size) + + Rolling median — sorted ``std::vector`` with binary-search insertion and + eviction. O(w) insert/evict but cache-friendly; fastest for small + windows (w ≤ 600) and for NaN-heavy streams where iterator tracking would + degrade. + + :param window_size: int + + .. py:method:: update(value: float) + .. py:method:: get_median() -> float + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray ---- .. py:class:: MultisetMedian(window_size) - Rolling median — ``std::multiset`` with tracked iterator, O(log n). - Even-size windows return the average of the two middle elements. + Rolling median — ``std::multiset`` (red-black tree) with a tracked + ``mid_`` iterator. O(log w) insert/evict; fastest for medium windows + (601–2 000) on clean data. Degrades significantly on NaN-heavy streams + because iterator repositioning must scan the tree. + + :param window_size: int + + .. py:method:: update(value: float) + .. py:method:: get_median() -> float + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + +---- + +.. py:class:: TwoHeapMedian(window_size) + + Rolling median — two heaps (max-heap for lower half, min-heap for upper + half) with lazy deletion via a ``pending`` map. O(log w) amortised; + memory layout is less cache-friendly than ``FlatMedian`` but unaffected + by NaN density, making it the best choice for large windows or NaN-heavy + data. :param window_size: int .. py:method:: update(value: float) .. py:method:: get_median() -> float - .. py:method:: process_batch(x: numpy.ndarray) -> numpy.ndarray + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray ---- @@ -60,11 +191,11 @@ time) or to read multiple statistics from a single pass. .. py:method:: update(value: float) .. py:method:: get_variance() -> float - .. py:method:: process_batch(x: numpy.ndarray) -> numpy.ndarray + .. py:method:: process_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray .. code-block:: python - sw = SlidingWelford(3) + sw = rrc.SlidingWelford(3) for v in [1., 2., 3., 4.]: sw.update(v) sw.get_variance() # 1.0 @@ -85,18 +216,26 @@ time) or to read multiple statistics from a single pass. .. py:method:: get_mean() -> float .. py:method:: get_skewness() -> float .. py:method:: get_kurtosis() -> float - .. py:method:: process_mean_batch(x: numpy.ndarray) -> numpy.ndarray - .. py:method:: process_skewness_batch(x: numpy.ndarray) -> numpy.ndarray - .. py:method:: process_kurtosis_batch(x: numpy.ndarray) -> numpy.ndarray + .. py:method:: process_mean_batch(x: numpy.ndarray, min_periods: int) -> numpy.ndarray + .. py:method:: process_skewness_batch(x: numpy.ndarray, min_periods: int) -> numpy.ndarray + .. py:method:: process_kurtosis_batch(x: numpy.ndarray, min_periods: int) -> numpy.ndarray + + Note: ``min_periods`` is a required positional argument in the + ``process_*_batch`` methods (no default). .. code-block:: python - sm = SlidingMoments(4) + sm = rrc.SlidingMoments(4) for v in [1., 2., 3., 4.]: sm.update(v) sm.get_mean(), sm.get_skewness(), sm.get_kurtosis() # (2.5, 0.0, -1.2) + # Batch usage + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + rrc.SlidingMoments(3).process_skewness_batch(x, 0) + # [nan, nan, 0., 0., 0.] + ---- .. py:class:: SlidingCovariance(window_size) @@ -116,8 +255,36 @@ time) or to read multiple statistics from a single pass. .. code-block:: python - sc = SlidingCovariance(3) - for x, y in [(1,2),(2,4),(3,6)]: + sc = rrc.SlidingCovariance(3) + for x, y in [(1, 2), (2, 4), (3, 6)]: sc.update(x, y) sc.get_covariance(), sc.get_correlation() # (2.0, 1.0) + +---- + +.. py:class:: SlidingMomentsPrefix(window_size) + + Stateless batch engine for variance, skewness, and kurtosis using prefix + sums of raw moments. Faster than :py:class:`SlidingMoments` but + susceptible to catastrophic cancellation for data with large values and + small variance. Use when numerical precision is not critical. + + :param window_size: int + + .. py:method:: variance_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + .. py:method:: skewness_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + .. py:method:: kurtosis_batch(x: numpy.ndarray, min_periods: int = 0) -> numpy.ndarray + + .. code-block:: python + + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + + rrc.SlidingMomentsPrefix(3).variance_batch(x) + # [nan, 0.5, 1., 1., 1.] + + rrc.SlidingMomentsPrefix(3).skewness_batch(x) + # [nan, nan, 0., 0., 0.] + + rrc.SlidingMomentsPrefix(4).kurtosis_batch(x) + # [nan, nan, nan, -1.2, -1.2] diff --git a/docs/python/r_api/index.rst b/docs/python/r_api/index.rst index d03e34e..2893064 100644 --- a/docs/python/r_api/index.rst +++ b/docs/python/r_api/index.rst @@ -107,14 +107,34 @@ parameter compatible with *pandas* semantics. ---- -.. function:: rolling_median(x, window_size, min_periods = window_size) +.. function:: rolling_median(x, window_size, min_periods = window_size, expect_nan = FALSE) - *Rolling Median* — Computes the rolling median over a numeric vector using an ordered multiset - with a tracked median iterator. Time complexity: O(log n) per element. + *Rolling Median* — Automatically selects the fastest algorithm based on + window size and expected NaN density. + + Dispatch rules (windows > 2 000 always use ``TwoHeapMedian``): + + .. list-table:: + :header-rows: 1 + :widths: 20 25 55 + + * - ``expect_nan`` + - w ≤ 600 + - 601–2 000 + * - ``FALSE`` + - ``FlatMedian`` + - ``MultisetMedian`` + * - ``TRUE`` + - ``FlatMedian`` + - ``FlatMedian`` (601–1 500) / ``TwoHeapMedian`` (1 501–2 000) :param x: A numeric vector of type double. :param window_size: Positive integer window length. - :param min_periods: Minimum number of non-``NA`` observations required in a window to return a result. Defaults to ``window_size``. + :param min_periods: Minimum number of non-``NA`` observations required in + a window to return a result. Defaults to ``window_size``. + :param expect_nan: Logical. If ``TRUE``, switches dispatch thresholds to + NaN-robust paths to avoid ``MultisetMedian``'s O(n) iterator-shift + degradation on NaN-heavy streams. Defaults to ``FALSE``. :returns: A numeric vector with rolling median values. .. rubric:: Example @@ -124,6 +144,9 @@ parameter compatible with *pandas* semantics. x <- as.double(c(1, 3, 2, 5, 4)) rolling_median(x, 3L) + # NaN-heavy data — use expect_nan = TRUE for large windows + rolling_median(x, 1000L, expect_nan = TRUE) + ---- diff --git a/include/FlatMedian.hpp b/include/FlatMedian.hpp new file mode 100644 index 0000000..a904988 --- /dev/null +++ b/include/FlatMedian.hpp @@ -0,0 +1,23 @@ +#pragma once +#include "RollingMetric.hpp" +#include +#include +#include + +class FlatMedian : public RollingMetric { + friend class RollingMetric; + +public: + explicit FlatMedian(std::size_t window_size); + double get_median() const; + +private: + void update_impl(double x); + void skip_impl(); + double get_value_impl() const; + std::size_t current_size_impl() const; + + std::size_t window_size_; + std::vector sorted_; + std::queue history_; +}; diff --git a/include/SlidingMedian.hpp b/include/SlidingMedian.hpp new file mode 100644 index 0000000..46d3bfe --- /dev/null +++ b/include/SlidingMedian.hpp @@ -0,0 +1,26 @@ +#pragma once +#include "FlatMedian.hpp" +#include "MultisetMedian.hpp" +#include "RollingMetric.hpp" +#include "TwoHeapMedian.hpp" +#include + +class SlidingMedian : public RollingMetric { + friend class RollingMetric; + +public: + static constexpr std::size_t kFlatClean = 600; + static constexpr std::size_t kFlatNaN = 1500; + static constexpr std::size_t kHeapClean = 2000; + + explicit SlidingMedian(std::size_t window_size, bool expect_nan = false); + double get_median() const; + +private: + void update_impl(double x); + void skip_impl(); + double get_value_impl() const; + std::size_t current_size_impl() const; + + std::variant impl_; +}; diff --git a/include/TwoHeapMedian.hpp b/include/TwoHeapMedian.hpp new file mode 100644 index 0000000..c678ab5 --- /dev/null +++ b/include/TwoHeapMedian.hpp @@ -0,0 +1,32 @@ +#pragma once +#include "RollingMetric.hpp" +#include +#include +#include + +class TwoHeapMedian : public RollingMetric { + friend class RollingMetric; + +public: + explicit TwoHeapMedian(std::size_t window_size); + + double get_median() const; + +private: + void update_impl(double new_value); + void skip_impl(); + double get_value_impl() const; + std::size_t current_size_impl() const; + void evict(double oldest); + void clean_lower() const; + void clean_upper() const; + void insert_value(double x); + + std::size_t window_size_; + mutable std::priority_queue lower; + mutable std::priority_queue, std::greater> + upper; + mutable std::unordered_map pending; + mutable std::size_t lower_eff{0}, upper_eff{0}; + std::queue history_; +}; diff --git a/man/rolling_median.Rd b/man/rolling_median.Rd index 59133b4..7a80e78 100644 --- a/man/rolling_median.Rd +++ b/man/rolling_median.Rd @@ -4,7 +4,7 @@ \alias{rolling_median} \title{Rolling Median} \usage{ -rolling_median(x, window_size, min_periods = window_size) +rolling_median(x, window_size, min_periods = window_size, expect_nan = FALSE) } \arguments{ \item{x}{A numeric vector of type double.} @@ -13,13 +13,21 @@ rolling_median(x, window_size, min_periods = window_size) \item{min_periods}{Minimum number of non-\code{NA} observations required in a window to return a result. Defaults to \code{window_size}.} + +\item{expect_nan}{Logical. If \code{TRUE}, hints that the input may contain +many \code{NA} values; switches dispatch thresholds to NaN-robust paths +(avoids MultisetMedian's iterator-shift degradation). Defaults to +\code{FALSE}.} } \value{ A numeric vector with rolling median values. } \description{ -Computes the rolling median over a numeric vector using an ordered multiset -with a tracked median iterator. Time complexity: O(log n) per element. +Computes the rolling median over a numeric vector. Automatically selects the +fastest algorithm based on window size and expected NaN density: FlatMedian +(sorted vector) for small windows, MultisetMedian (red-black tree) for +medium windows, or TwoHeapMedian (lazy-deletion heaps) for large windows or +NaN-heavy data. } \examples{ x <- as.double(c(1, 3, 2, 5, 4)) diff --git a/py_package/CMakeLists.txt b/py_package/CMakeLists.txt index 66dd5ac..4ed2068 100644 --- a/py_package/CMakeLists.txt +++ b/py_package/CMakeLists.txt @@ -1,6 +1,9 @@ cmake_minimum_required(VERSION 3.15) project(robust_rolling_python LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + find_package(Python REQUIRED COMPONENTS Interpreter Development.Module) find_package(pybind11 CONFIG REQUIRED) @@ -9,14 +12,17 @@ pybind11_add_module(robust_rolling_core py_wrapper.cpp) target_include_directories(robust_rolling_core PRIVATE ../include) target_sources(robust_rolling_core PRIVATE + ../src_core/FlatMedian.cpp ../src_core/MonotonicMax.cpp ../src_core/MonotonicMin.cpp ../src_core/MultisetMedian.cpp ../src_core/SlidingCovariance.cpp ../src_core/SlidingMean.cpp + ../src_core/SlidingMedian.cpp ../src_core/SlidingMoments.cpp ../src_core/SlidingMomentsPrefix.cpp ../src_core/SlidingWelfordRing.cpp + ../src_core/TwoHeapMedian.cpp ) if(MSVC) diff --git a/py_package/py_wrapper.cpp b/py_package/py_wrapper.cpp index 595eef1..8eab70a 100644 --- a/py_package/py_wrapper.cpp +++ b/py_package/py_wrapper.cpp @@ -1,11 +1,14 @@ +#include "../include/FlatMedian.hpp" #include "../include/MonotonicMax.hpp" #include "../include/MonotonicMin.hpp" #include "../include/MultisetMedian.hpp" #include "../include/SlidingCovariance.hpp" #include "../include/SlidingMean.hpp" +#include "../include/SlidingMedian.hpp" #include "../include/SlidingMoments.hpp" #include "../include/SlidingMomentsPrefix.hpp" #include "../include/SlidingWelfordRing.hpp" +#include "../include/TwoHeapMedian.hpp" #include #include #include @@ -99,8 +102,24 @@ PYBIND11_MODULE(robust_rolling_core, m) { }, py::arg("input"), py::arg("min_periods") = 0); + py::class_(m, "FlatMedian") + .def(py::init(), py::arg("window_size")) + .def("update", &FlatMedian::update) + .def("get_median", &FlatMedian::get_median) + .def( + "process_batch", + [](FlatMedian &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](FlatMedian &m) { return m.get_median(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); + py::class_(m, "MultisetMedian") - .def(py::init()) + .def(py::init(), py::arg("window_size")) .def("update", &MultisetMedian::update) .def("get_median", &MultisetMedian::get_median) .def( @@ -115,6 +134,39 @@ PYBIND11_MODULE(robust_rolling_core, m) { }, py::arg("input"), py::arg("min_periods") = 0); + py::class_(m, "TwoHeapMedian") + .def(py::init(), py::arg("window_size")) + .def("update", &TwoHeapMedian::update) + .def("get_median", &TwoHeapMedian::get_median) + .def( + "process_batch", + [](TwoHeapMedian &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](TwoHeapMedian &m) { return m.get_median(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); + + py::class_(m, "SlidingMedian") + .def(py::init(), py::arg("window_size"), + py::arg("expect_nan") = false) + .def("update", &SlidingMedian::update) + .def("get_median", &SlidingMedian::get_median) + .def( + "process_batch", + [](SlidingMedian &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](SlidingMedian &m) { return m.get_median(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); + py::class_(m, "SlidingMean") .def(py::init()) .def("update", &SlidingMean::update) diff --git a/py_package/robustrolling/__init__.py b/py_package/robustrolling/__init__.py index f821da8..d4c71e2 100644 --- a/py_package/robustrolling/__init__.py +++ b/py_package/robustrolling/__init__.py @@ -3,14 +3,17 @@ import numpy as np from robust_rolling_core import ( + FlatMedian, MonotonicMax, MonotonicMin, MultisetMedian, SlidingCovariance, SlidingMean, + SlidingMedian, SlidingMoments, SlidingMomentsPrefix, SlidingWelford, + TwoHeapMedian, ) try: @@ -20,12 +23,15 @@ _HAS_PANDAS = False __all__ = [ + "FlatMedian", "MonotonicMax", "MonotonicMin", "MultisetMedian", "SlidingCovariance", + "SlidingMedian", "SlidingMoments", "SlidingWelford", + "TwoHeapMedian", "rolling_max", "rolling_min", "rolling_variance", @@ -171,12 +177,15 @@ def rolling_variance(x, window_size: int, min_periods: int | None = None, return _wrap(result, x) -def rolling_median(x, window_size: int, min_periods: int | None = None): +def rolling_median(x, window_size: int, min_periods: int | None = None, + expect_nan: bool = False): """ Compute the rolling median over a sliding window. - Uses a ``std::multiset`` with a tracked median iterator. - Time complexity: O(log n) per element. + Automatically selects the fastest algorithm for the given window size and + NaN density. Use ``expect_nan=True`` if the input is expected to contain + a significant fraction of NaN values (avoids MultisetMedian's O(n) + iterator-shift degradation at large windows). Parameters ---------- @@ -187,6 +196,9 @@ def rolling_median(x, window_size: int, min_periods: int | None = None): min_periods : int, optional Minimum number of non-NaN observations required to return a result. Defaults to ``window_size`` (pandas-compatible semantics). + expect_nan : bool, optional + Hint that the input may contain many NaN values. Switches the + dispatch thresholds to NaN-robust paths. Defaults to ``False``. Returns ------- @@ -204,7 +216,7 @@ def rolling_median(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = MultisetMedian(window_size).process_batch(arr, mp) + result = SlidingMedian(window_size, expect_nan).process_batch(arr, mp) return _wrap(result, x) diff --git a/py_package/tests/test_engine.py b/py_package/tests/test_engine.py index 51d9202..8ef23c9 100644 --- a/py_package/tests/test_engine.py +++ b/py_package/tests/test_engine.py @@ -1,4 +1,4 @@ -"""Tests for the C++ engine layer (rrc.*) — stateful rolling objects.""" +"""Tests for the C++ engine layer (rrc.*) - stateful rolling objects.""" import math import numpy as np @@ -173,86 +173,132 @@ def test_rejects_2d_input(self): rrc.MonotonicMin(2).process_batch(np.ones((2, 3))) -# ── MultisetMedian ───────────────────────────────────────────────────────────── +# ── SlidingMedian ────────────────────────────────────────────────────────────── -class TestMultisetMedian: +class TestSlidingMedian: def test_known_values_odd_window(self): x = np.array([1.0, 3.0, 2.0, 5.0, 4.0]) - np.testing.assert_allclose(rrc.MultisetMedian(3).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(3).process_batch(x), median_ref(x, 3), rtol=1e-12) def test_known_values_even_window(self): x = np.array([1.0, 3.0, 2.0, 4.0]) - np.testing.assert_allclose(rrc.MultisetMedian(4).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(4).process_batch(x), median_ref(x, 4), rtol=1e-12) def test_window_size_1_identity(self): x = np.array([-1.0, 0.0, 10.0, 2.0]) - np.testing.assert_allclose(rrc.MultisetMedian(1).process_batch(x), x) + np.testing.assert_allclose(rrc.SlidingMedian(1).process_batch(x), x) - def test_window_size_2_regression(self): - # Regression: window_size=2 caused segfault before fix + def test_window_size_2(self): x = np.array([3.0, 1.0, 2.0, 5.0, 4.0]) - np.testing.assert_allclose(rrc.MultisetMedian(2).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(2).process_batch(x), median_ref(x, 2), rtol=1e-12) - def test_even_window_descending_fill_regression(self): - # Regression: even window filled descending caused wrong mid_ position + def test_even_window_descending_fill(self): x = np.array([4.0, 3.0, 2.0, 1.0]) - np.testing.assert_allclose(rrc.MultisetMedian(4).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(4).process_batch(x), median_ref(x, 4), rtol=1e-12) @pytest.mark.parametrize("k", [2, 3, 5]) def test_against_naive_reference(self, k): x = np.array([-2.0, 6.0, 1.0, -8.0, 0.0, 8.0, -1.0]) - np.testing.assert_allclose(rrc.MultisetMedian(k).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(k).process_batch(x), median_ref(x, k), rtol=1e-12) def test_large_array_against_reference(self): np.random.seed(42) x = np.random.randn(1000) k = 15 - np.testing.assert_allclose(rrc.MultisetMedian(k).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(k).process_batch(x), median_ref(x, k), rtol=1e-10) def test_element_entering_equals_leaving(self): x = np.array([1.0, 2.0, 3.0, 1.0, 2.0, 3.0]) - np.testing.assert_allclose(rrc.MultisetMedian(3).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(3).process_batch(x), median_ref(x, 3), rtol=1e-12) def test_window_larger_than_array(self): x = np.array([3.0, 1.0, 4.0, 1.0]) - np.testing.assert_allclose(rrc.MultisetMedian(20).process_batch(x), + np.testing.assert_allclose(rrc.SlidingMedian(20).process_batch(x), median_ref(x, 20), rtol=1e-12) def test_nan_does_not_contribute(self): # window=3, [1, 2, NaN, 4]: at NaN window=[1,2,NaN] -> median=1.5 x = np.array([1.0, 2.0, np.nan, 4.0]) - out = rrc.MultisetMedian(3).process_batch(x) + out = rrc.SlidingMedian(3).process_batch(x) assert np.isclose(out[2], 1.5, atol=1e-12) assert np.isclose(out[3], 3.0, atol=1e-12) # window=[2,NaN,4] -> median([2,4])=3 + def test_expect_nan_flag_same_results(self): + np.random.seed(7) + x = np.random.randn(200) + x[np.random.rand(200) < 0.15] = np.nan + k = 20 + out_default = rrc.SlidingMedian(k, False).process_batch(x) + out_nan = rrc.SlidingMedian(k, True).process_batch(x) + np.testing.assert_allclose(out_default, out_nan, rtol=1e-12, equal_nan=True) + def test_empty_input(self): - out = rrc.MultisetMedian(3).process_batch(np.array([])) + out = rrc.SlidingMedian(3).process_batch(np.array([])) assert len(out) == 0 and out.dtype == np.float64 def test_rejects_zero_window(self): with pytest.raises(ValueError, match="Window length must be greater than 0"): - rrc.MultisetMedian(0) + rrc.SlidingMedian(0) def test_rejects_none_window(self): with pytest.raises(TypeError): - rrc.MultisetMedian(None) + rrc.SlidingMedian(None) def test_rejects_2d_input(self): with pytest.raises(RuntimeError, match="Input must be 1D array"): - rrc.MultisetMedian(2).process_batch(np.ones((2, 3))) + rrc.SlidingMedian(2).process_batch(np.ones((2, 3))) def test_integer_input_converted_to_float64(self): - out = rrc.MultisetMedian(2).process_batch(np.array([1, 2, 3], dtype=np.int32)) + out = rrc.SlidingMedian(2).process_batch(np.array([1, 2, 3], dtype=np.int32)) assert out.dtype == np.float64 + @pytest.mark.parametrize("k", [100, 700, 2500]) + def test_dispatch_thresholds_correct_result(self, k): + np.random.seed(k) + x = np.random.randn(k * 3) + np.testing.assert_allclose(rrc.SlidingMedian(k).process_batch(x), + median_ref(x, k), rtol=1e-10) + + +# ── MultisetMedian (raw algorithm, advanced use) ─────────────────────────────── + +class TestMultisetMedian: + + def test_known_values_odd_window(self): + x = np.array([1.0, 3.0, 2.0, 5.0, 4.0]) + np.testing.assert_allclose(rrc.MultisetMedian(3).process_batch(x), + median_ref(x, 3), rtol=1e-12) + + def test_window_size_2_regression(self): + # Regression: window_size=2 caused segfault before fix + x = np.array([3.0, 1.0, 2.0, 5.0, 4.0]) + np.testing.assert_allclose(rrc.MultisetMedian(2).process_batch(x), + median_ref(x, 2), rtol=1e-12) + + def test_even_window_descending_fill_regression(self): + # Regression: even window filled descending caused wrong mid_ position + x = np.array([4.0, 3.0, 2.0, 1.0]) + np.testing.assert_allclose(rrc.MultisetMedian(4).process_batch(x), + median_ref(x, 4), rtol=1e-12) + + @pytest.mark.parametrize("k", [2, 3, 5]) + def test_against_naive_reference(self, k): + x = np.array([-2.0, 6.0, 1.0, -8.0, 0.0, 8.0, -1.0]) + np.testing.assert_allclose(rrc.MultisetMedian(k).process_batch(x), + median_ref(x, k), rtol=1e-12) + + def test_rejects_zero_window(self): + with pytest.raises(ValueError, match="Window length must be greater than 0"): + rrc.MultisetMedian(0) + # ── SlidingMean ──────────────────────────────────────────────────────────────── diff --git a/py_package/tests/test_robustness.py b/py_package/tests/test_robustness.py index 22aed20..f1d7847 100644 --- a/py_package/tests/test_robustness.py +++ b/py_package/tests/test_robustness.py @@ -1,4 +1,4 @@ -"""Memory layout, dtype, and fuzz tests — cross-cutting robustness.""" +"""Memory layout, dtype, and fuzz tests - cross-cutting robustness.""" import numpy as np import pandas as pd import pytest @@ -8,8 +8,8 @@ from conftest import nan_allclose -_ENGINE_CLASSES = [rrc.MonotonicMax, rrc.MonotonicMin, rrc.MultisetMedian, rrc.SlidingWelford] -_ENGINE_IDS = ["MonotonicMax", "MonotonicMin", "MultisetMedian", "SlidingWelford"] +_ENGINE_CLASSES = [rrc.MonotonicMax, rrc.MonotonicMin, rrc.SlidingMedian, rrc.SlidingWelford] +_ENGINE_IDS = ["MonotonicMax", "MonotonicMin", "SlidingMedian", "SlidingWelford"] _SIMPLE_FNS = [rr.rolling_max, rr.rolling_min, rr.rolling_median, rr.rolling_variance, rr.rolling_mean] _SIMPLE_IDS = ["max", "min", "median", "variance", "mean"] @@ -108,4 +108,4 @@ def test_moments_random_nan_matches_pandas(self, fn, pd_fn): def test_stress_many_instances_no_crash(self): x = np.random.randn(100).astype(np.float64) for _ in range(2000): - rrc.MultisetMedian(10).process_batch(x) + rrc.SlidingMedian(10).process_batch(x) diff --git a/src/r_wrapper.cpp b/src/r_wrapper.cpp index fc8177d..b203726 100644 --- a/src/r_wrapper.cpp +++ b/src/r_wrapper.cpp @@ -1,8 +1,8 @@ #include "MonotonicMax.hpp" #include "MonotonicMin.hpp" -#include "MultisetMedian.hpp" #include "SlidingCovariance.hpp" #include "SlidingMean.hpp" +#include "SlidingMedian.hpp" #include "SlidingMoments.hpp" #include "SlidingMomentsPrefix.hpp" #include "SlidingWelfordRing.hpp" @@ -96,7 +96,8 @@ SEXP rolling_min_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { return r_result; } -SEXP rolling_median_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { +SEXP rolling_median_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods, + SEXP r_expect_nan) { if (!Rf_isReal(r_data)) { Rf_error("Input data must be a numeric vector."); } @@ -105,15 +106,16 @@ SEXP rolling_median_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); std::size_t mp = read_min_periods(r_min_periods); + bool expect_nan = Rf_asLogical(r_expect_nan) == TRUE; - MultisetMedian multiset_median(k); + SlidingMedian sliding_median(k, expect_nan); SEXP r_result; PROTECT(r_result = Rf_allocVector(REALSXP, n)); double *output_ptr = REAL(r_result); - multiset_median.process_batch(input_ptr, static_cast(n), - output_ptr, mp); + sliding_median.process_batch(input_ptr, static_cast(n), + output_ptr, mp); UNPROTECT(1); return r_result; @@ -257,13 +259,14 @@ SEXP rolling_variance_fast_c(SEXP r_data, SEXP r_window_size, SlidingMomentsPrefix smp(k); SEXP r_result; PROTECT(r_result = Rf_allocVector(REALSXP, n)); - smp.variance_batch(input_ptr, static_cast(n), REAL(r_result), mp); + smp.variance_batch(input_ptr, static_cast(n), REAL(r_result), + mp); UNPROTECT(1); return r_result; } SEXP rolling_skewness_fast_c(SEXP r_data, SEXP r_window_size, - SEXP r_min_periods) { + SEXP r_min_periods) { if (!Rf_isReal(r_data)) Rf_error("Input data must be a numeric vector."); double *input_ptr = REAL(r_data); @@ -273,13 +276,14 @@ SEXP rolling_skewness_fast_c(SEXP r_data, SEXP r_window_size, SlidingMomentsPrefix smp(k); SEXP r_result; PROTECT(r_result = Rf_allocVector(REALSXP, n)); - smp.skewness_batch(input_ptr, static_cast(n), REAL(r_result), mp); + smp.skewness_batch(input_ptr, static_cast(n), REAL(r_result), + mp); UNPROTECT(1); return r_result; } SEXP rolling_kurtosis_fast_c(SEXP r_data, SEXP r_window_size, - SEXP r_min_periods) { + SEXP r_min_periods) { if (!Rf_isReal(r_data)) Rf_error("Input data must be a numeric vector."); double *input_ptr = REAL(r_data); @@ -289,7 +293,8 @@ SEXP rolling_kurtosis_fast_c(SEXP r_data, SEXP r_window_size, SlidingMomentsPrefix smp(k); SEXP r_result; PROTECT(r_result = Rf_allocVector(REALSXP, n)); - smp.kurtosis_batch(input_ptr, static_cast(n), REAL(r_result), mp); + smp.kurtosis_batch(input_ptr, static_cast(n), REAL(r_result), + mp); UNPROTECT(1); return r_result; } @@ -298,13 +303,16 @@ static const R_CallMethodDef CallEntries[] = { {"rolling_variance_c", reinterpret_cast(&rolling_variance_c), 3}, {"rolling_max_c", reinterpret_cast(&rolling_max_c), 3}, {"rolling_min_c", reinterpret_cast(&rolling_min_c), 3}, - {"rolling_median_c", reinterpret_cast(&rolling_median_c), 3}, + {"rolling_median_c", reinterpret_cast(&rolling_median_c), 4}, {"rolling_mean_c", reinterpret_cast(&rolling_mean_c), 4}, {"rolling_skewness_c", reinterpret_cast(&rolling_skewness_c), 3}, {"rolling_kurtosis_c", reinterpret_cast(&rolling_kurtosis_c), 3}, - {"rolling_variance_fast_c", reinterpret_cast(&rolling_variance_fast_c), 3}, - {"rolling_skewness_fast_c", reinterpret_cast(&rolling_skewness_fast_c), 3}, - {"rolling_kurtosis_fast_c", reinterpret_cast(&rolling_kurtosis_fast_c), 3}, + {"rolling_variance_fast_c", + reinterpret_cast(&rolling_variance_fast_c), 3}, + {"rolling_skewness_fast_c", + reinterpret_cast(&rolling_skewness_fast_c), 3}, + {"rolling_kurtosis_fast_c", + reinterpret_cast(&rolling_kurtosis_fast_c), 3}, {"rolling_cov_c", reinterpret_cast(&rolling_cov_c), 4}, {"rolling_cor_c", reinterpret_cast(&rolling_cor_c), 4}, {nullptr, nullptr, 0}}; diff --git a/src_core/FlatMedian.cpp b/src_core/FlatMedian.cpp new file mode 100644 index 0000000..1893e61 --- /dev/null +++ b/src_core/FlatMedian.cpp @@ -0,0 +1,47 @@ +#include "FlatMedian.hpp" +#include +#include + +FlatMedian::FlatMedian(std::size_t size) : window_size_(size) { + if (size == 0) + throw std::invalid_argument("Window length must be greater than 0"); + sorted_.reserve(size); +} + +std::size_t FlatMedian::current_size_impl() const { return sorted_.size(); } + +double FlatMedian::get_value_impl() const { + if (sorted_.empty()) + return std::numeric_limits::quiet_NaN(); + const std::size_t n = sorted_.size(); + return (n % 2 == 1) ? sorted_[n / 2] + : (sorted_[n / 2 - 1] + sorted_[n / 2]) / 2.0; +} + +double FlatMedian::get_median() const { return get_value(); } + +void FlatMedian::update_impl(double x) { + if (history_.size() >= window_size_) { + double oldest = history_.front(); + history_.pop(); + if (!std::isnan(oldest)) { + auto it = std::lower_bound(sorted_.begin(), sorted_.end(), oldest); + sorted_.erase(it); + } + } + auto pos = std::lower_bound(sorted_.begin(), sorted_.end(), x); + sorted_.insert(pos, x); + history_.push(x); +} + +void FlatMedian::skip_impl() { + if (history_.size() >= window_size_) { + double oldest = history_.front(); + history_.pop(); + if (!std::isnan(oldest)) { + auto it = std::lower_bound(sorted_.begin(), sorted_.end(), oldest); + sorted_.erase(it); + } + } + history_.push(std::numeric_limits::quiet_NaN()); +} diff --git a/src_core/MultisetMedian.cpp b/src_core/MultisetMedian.cpp index 7e1b121..e6816e0 100644 --- a/src_core/MultisetMedian.cpp +++ b/src_core/MultisetMedian.cpp @@ -33,7 +33,7 @@ double MultisetMedian::get_median() const { return get_value(); } -// Recompute mid_ from scratch — used after size-changing operations. +// Recompute mid_ from scratch - used after size-changing operations. void MultisetMedian::recompute_mid() { mid_ = window_data_.begin(); std::advance(mid_, window_data_.size() / 2); @@ -63,7 +63,7 @@ void MultisetMedian::update_impl(double new_value) { window_data_.insert(new_value); if (!std::isnan(oldest_val)) { - // Normal evict+insert: size stays same — use incremental mid_ adjustment. + // Normal evict+insert: size stays same use incremental mid_ adjustment. if (new_value < *mid_) { mid_--; } diff --git a/src_core/SlidingMedian.cpp b/src_core/SlidingMedian.cpp new file mode 100644 index 0000000..e958f2d --- /dev/null +++ b/src_core/SlidingMedian.cpp @@ -0,0 +1,40 @@ +#include "SlidingMedian.hpp" + +using Impl = std::variant; + +static Impl make_impl(std::size_t size, bool expect_nan) { + if (expect_nan) { + if (size <= SlidingMedian::kFlatNaN) + return Impl(std::in_place_index<0>, size); + return Impl(std::in_place_index<2>, size); + } + if (size <= SlidingMedian::kFlatClean) + return Impl(std::in_place_index<0>, size); + if (size <= SlidingMedian::kHeapClean) + return Impl(std::in_place_index<1>, size); + return Impl(std::in_place_index<2>, size); +} + +SlidingMedian::SlidingMedian(std::size_t size, bool expect_nan) + : impl_(make_impl(size, expect_nan)) { +} + +double SlidingMedian::get_median() const { + return get_value(); +} + +void SlidingMedian::update_impl(double x) { + std::visit([x](auto &m) { m.update(x); }, impl_); +} + +void SlidingMedian::skip_impl() { + std::visit([](auto &m) { m.skip(); }, impl_); +} + +double SlidingMedian::get_value_impl() const { + return std::visit([](const auto &m) { return m.get_value(); }, impl_); +} + +std::size_t SlidingMedian::current_size_impl() const { + return std::visit([](const auto &m) { return m.current_size(); }, impl_); +} diff --git a/src_core/TwoHeapMedian.cpp b/src_core/TwoHeapMedian.cpp new file mode 100644 index 0000000..3c48b64 --- /dev/null +++ b/src_core/TwoHeapMedian.cpp @@ -0,0 +1,117 @@ +#include "TwoHeapMedian.hpp" +#include +#include + +TwoHeapMedian::TwoHeapMedian(std::size_t size) : window_size_(size) { + if (size == 0) + throw std::invalid_argument("Window length must be greater than 0"); +} + +void TwoHeapMedian::clean_lower() const { + while (!lower.empty()) { + auto it = pending.find(lower.top()); + if (it == pending.end()) + break; + lower.pop(); + if (--it->second == 0) + pending.erase(it); + } +} + +void TwoHeapMedian::clean_upper() const { + while (!upper.empty()) { + auto it = pending.find(upper.top()); + if (it == pending.end()) + break; + upper.pop(); + if (--it->second == 0) + pending.erase(it); + } +} + +std::size_t TwoHeapMedian::current_size_impl() const { + return lower_eff + upper_eff; +} + +double TwoHeapMedian::get_value_impl() const { + if (lower_eff == 0 && upper_eff == 0) + return std::numeric_limits::quiet_NaN(); + clean_lower(); + if ((lower_eff + upper_eff) % 2 == 1) + return lower.top(); + clean_upper(); + return (lower.top() + upper.top()) / 2.0; +} + +void TwoHeapMedian::evict(double oldest) { + clean_lower(); + const bool in_lower = !lower.empty() && oldest <= lower.top(); + pending[oldest]++; + if (in_lower) { + lower_eff--; + if (lower_eff < upper_eff) { + clean_upper(); + lower.push(upper.top()); + upper.pop(); + lower_eff++; + upper_eff--; + } + } else { + upper_eff--; + if (lower_eff > upper_eff + 1) { + upper.push(lower.top()); + lower.pop(); + upper_eff++; + lower_eff--; + } + } +} + +void TwoHeapMedian::insert_value(double x) { + clean_lower(); + if (lower.empty() || x <= lower.top()) { + lower.push(x); + lower_eff++; + if (lower_eff > upper_eff + 1) { + upper.push(lower.top()); + lower.pop(); + lower_eff--; + upper_eff++; + } + } else { + upper.push(x); + upper_eff++; + if (lower_eff < upper_eff) { + clean_upper(); + lower.push(upper.top()); + upper.pop(); + upper_eff--; + lower_eff++; + } + } +} + +void TwoHeapMedian::update_impl(double x) { + if (history_.size() >= window_size_) { + double oldest = history_.front(); + history_.pop(); + if (!std::isnan(oldest)) + evict(oldest); + } + insert_value(x); + history_.push(x); +} + +void TwoHeapMedian::skip_impl() { + if (history_.size() >= window_size_) { + double oldest = history_.front(); + history_.pop(); + if (!std::isnan(oldest)) + evict(oldest); + } + history_.push(std::numeric_limits::quiet_NaN()); +} + +double TwoHeapMedian::get_median() const { + return get_value(); +} diff --git a/tests/test_flat_median.cxx b/tests/test_flat_median.cxx new file mode 100644 index 0000000..1a318fa --- /dev/null +++ b/tests/test_flat_median.cxx @@ -0,0 +1,107 @@ +#include "FlatMedian.hpp" + +#include +#include +#include + +TEST(FlatMedianTest, ThrowsOnZeroWindowSize) { + EXPECT_THROW(FlatMedian(0), std::invalid_argument); +} + +TEST(FlatMedianTest, InitialStateIsNaN) { + FlatMedian m(3); + EXPECT_EQ(m.current_size(), 0U); + EXPECT_TRUE(std::isnan(m.get_median())); +} + +TEST(FlatMedianTest, WarmUpAndSliding) { + FlatMedian m(3); + + m.update(10.0); + EXPECT_DOUBLE_EQ(m.get_median(), 10.0); + + m.update(20.0); + EXPECT_DOUBLE_EQ(m.get_median(), 15.0); // [10, 20] + + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 10.0); // [5, 10, 20] + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); // [4, 5, 20] after evicting 10 +} + +TEST(FlatMedianTest, EvenWindowMedian) { + FlatMedian m(4); + m.update(4.0); + m.update(3.0); + m.update(2.0); + m.update(1.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.5); // [1, 2, 3, 4] +} + +TEST(FlatMedianTest, WindowSize1) { + FlatMedian m(1); + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); + m.update(3.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); +} + +TEST(FlatMedianTest, HandlesDuplicates) { + FlatMedian m(3); + m.update(5.0); + m.update(5.0); + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); + + m.update(1.0); // evict 5 -> [1, 5, 5] + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); + + m.update(1.0); // evict 5 -> [1, 1, 5] + EXPECT_DOUBLE_EQ(m.get_median(), 1.0); +} + +TEST(FlatMedianTest, SkipDoesNotContributeToWindow) { + // window=3, sequence [1, 2, skip, 4]: + // after skip: window=[1,2,NaN] -> valid=[1,2], median=1.5 + // after update(4): window=[2,NaN,4] -> valid=[2,4], median=3.0 + FlatMedian m(3); + m.update(1.0); + m.update(2.0); + + m.skip(); + EXPECT_DOUBLE_EQ(m.get_median(), 1.5); + EXPECT_EQ(m.current_size(), 2U); + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); + EXPECT_EQ(m.current_size(), 2U); +} + +TEST(FlatMedianTest, CrtpInterfaceMatchesMedian) { + FlatMedian m(3); + RollingMetric &base = m; + + EXPECT_TRUE(std::isnan(base.get_value())); + + base.update(7.0); + base.update(1.0); + base.update(4.0); // [1, 4, 7] + + EXPECT_EQ(base.current_size(), 3U); + EXPECT_DOUBLE_EQ(base.get_value(), m.get_median()); +} + +TEST(FlatMedianTest, ProcessBatch) { + FlatMedian m(3); + const double nan = std::numeric_limits::quiet_NaN(); + const double input[] = {1.0, 3.0, 2.0, nan, 5.0}; + double output[5]; + m.process_batch(input, 5, output, /*min_periods=*/2); + + EXPECT_TRUE(std::isnan(output[0])); // size=1 < 2 + EXPECT_DOUBLE_EQ(output[1], 2.0); // [1,3] -> 2 + EXPECT_DOUBLE_EQ(output[2], 2.0); // [1,2,3] -> 2 + EXPECT_DOUBLE_EQ(output[3], 2.5); // [2,3,NaN] -> valid=[2,3] -> 2.5 + EXPECT_DOUBLE_EQ(output[4], 3.5); // [2,NaN,5] -> valid=[2,5] -> 3.5 +} diff --git a/tests/test_sliding_median.cxx b/tests/test_sliding_median.cxx new file mode 100644 index 0000000..d678c53 --- /dev/null +++ b/tests/test_sliding_median.cxx @@ -0,0 +1,139 @@ +#include "FlatMedian.hpp" +#include "SlidingMedian.hpp" + +#include +#include +#include +#include + +TEST(SlidingMedianTest, ThrowsOnZeroWindowSize) { + EXPECT_THROW(SlidingMedian(0), std::invalid_argument); +} + +TEST(SlidingMedianTest, InitialStateIsNaN) { + EXPECT_TRUE(std::isnan(SlidingMedian(10).get_median())); + EXPECT_TRUE(std::isnan(SlidingMedian(1000).get_median())); + EXPECT_TRUE(std::isnan(SlidingMedian(3000).get_median())); +} + +TEST(SlidingMedianTest, DispatchBoundariesCleanData) { + // kFlatClean=600: <=600 -> Flat, 601..2000 -> Multiset, >2000 -> TwoHeap + for (std::size_t w : {600u, 601u, 2000u, 2001u}) { + SlidingMedian m(w); + m.update(1.0); + m.update(3.0); + m.update(2.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.0) << "window=" << w; + } +} + +TEST(SlidingMedianTest, DispatchBoundariesNaNData) { + // kFlatNaN=1500: <=1500 -> Flat, >1500 -> TwoHeap + for (std::size_t w : {1500u, 1501u}) { + SlidingMedian m(w, /*expect_nan=*/true); + m.update(1.0); + m.update(3.0); + m.update(2.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.0) << "window=" << w; + } +} + +TEST(SlidingMedianTest, WarmUpAndSliding) { + SlidingMedian m(3); + m.update(1.0); + m.update(2.0); + m.update(3.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.0); + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); // evict 1 -> [2, 3, 4] +} + +TEST(SlidingMedianTest, SkipDoesNotContributeToWindow) { + SlidingMedian m(3); + m.update(1.0); + m.update(2.0); + + m.skip(); + EXPECT_DOUBLE_EQ(m.get_median(), 1.5); + EXPECT_EQ(m.current_size(), 2U); + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); // [2, NaN, 4] -> valid=[2,4] +} + +TEST(SlidingMedianTest, CrtpInterfaceMatchesMedian) { + SlidingMedian m(3); + RollingMetric &base = m; + + base.update(7.0); + base.update(1.0); + base.update(4.0); // [1, 4, 7] + + EXPECT_EQ(base.current_size(), 3U); + EXPECT_DOUBLE_EQ(base.get_value(), m.get_median()); +} + +TEST(SlidingMedianTest, ProcessBatch) { + SlidingMedian m(3); + const double nan = std::numeric_limits::quiet_NaN(); + const double input[] = {1.0, 3.0, 2.0, nan, 5.0}; + double output[5]; + m.process_batch(input, 5, output, /*min_periods=*/2); + + EXPECT_TRUE(std::isnan(output[0])); // size=1 < 2 + EXPECT_DOUBLE_EQ(output[1], 2.0); // [1,3] + EXPECT_DOUBLE_EQ(output[2], 2.0); // [1,2,3] + EXPECT_DOUBLE_EQ(output[3], 2.5); // [2,3,NaN] -> valid=[2,3] + EXPECT_DOUBLE_EQ(output[4], 3.5); // [2,NaN,5] -> valid=[2,5] +} + +// Cross-validation: all dispatch paths must agree with the canonical implementation + +static const std::vector kSeq = {5, 3, 1, 7, 2, 9, 4, 6, 8, 0, + 5, 5, 5, 2, 9, 1, 3, 7, 4}; + +TEST(SlidingMedianTest, MatchesFlatMedian_FlatPath) { + FlatMedian ref(7); + SlidingMedian m(7); // <= kFlatClean(600) -> Flat + for (double v : kSeq) { + ref.update(v); m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +} + +TEST(SlidingMedianTest, MatchesFlatMedian_MultisetPath) { + FlatMedian ref(700); + SlidingMedian m(700); // kFlatClean < 700 <= kHeapClean -> Multiset + for (double v : kSeq) { + ref.update(v); m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +} + +TEST(SlidingMedianTest, MatchesFlatMedian_TwoHeapPath) { + FlatMedian ref(2500); + SlidingMedian m(2500); // > kHeapClean(2000) -> TwoHeap + for (double v : kSeq) { + ref.update(v); m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +} + +TEST(SlidingMedianTest, MatchesFlatMedian_ExpectNaNFlatPath) { + FlatMedian ref(100); + SlidingMedian m(100, /*expect_nan=*/true); // <= kFlatNaN(1500) -> Flat + for (double v : kSeq) { + ref.update(v); m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +} + +TEST(SlidingMedianTest, MatchesFlatMedian_ExpectNaNTwoHeapPath) { + FlatMedian ref(2000); + SlidingMedian m(2000, /*expect_nan=*/true); // > kFlatNaN(1500) -> TwoHeap + for (double v : kSeq) { + ref.update(v); m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +} diff --git a/tests/test_two_heap_median.cxx b/tests/test_two_heap_median.cxx new file mode 100644 index 0000000..b197d0b --- /dev/null +++ b/tests/test_two_heap_median.cxx @@ -0,0 +1,121 @@ +#include "FlatMedian.hpp" +#include "TwoHeapMedian.hpp" + +#include +#include +#include + +TEST(TwoHeapMedianTest, ThrowsOnZeroWindowSize) { + EXPECT_THROW(TwoHeapMedian(0), std::invalid_argument); +} + +TEST(TwoHeapMedianTest, InitialStateIsNaN) { + TwoHeapMedian m(3); + EXPECT_EQ(m.current_size(), 0U); + EXPECT_TRUE(std::isnan(m.get_median())); +} + +TEST(TwoHeapMedianTest, WarmUpAndSliding) { + TwoHeapMedian m(3); + m.update(10.0); + EXPECT_DOUBLE_EQ(m.get_median(), 10.0); + + m.update(20.0); + EXPECT_DOUBLE_EQ(m.get_median(), 15.0); // [10, 20] + + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 10.0); // [5, 10, 20] + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); // [4, 5, 20] +} + +TEST(TwoHeapMedianTest, EvenWindowAndWindowSize2) { + TwoHeapMedian m(2); + m.update(1.0); + m.update(2.0); + EXPECT_DOUBLE_EQ(m.get_median(), 1.5); + m.update(3.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.5); +} + +TEST(TwoHeapMedianTest, WindowSize1) { + TwoHeapMedian m(1); + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); + m.update(3.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); + m.update(8.0); + EXPECT_DOUBLE_EQ(m.get_median(), 8.0); +} + +TEST(TwoHeapMedianTest, HandlesDuplicatesWithLazyDeletion) { + // Multiple identical values exercise the pending-map eviction path + TwoHeapMedian m(3); + m.update(5.0); + m.update(5.0); + m.update(5.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); + + m.update(3.0); + EXPECT_DOUBLE_EQ(m.get_median(), 5.0); // [3, 5, 5] + + m.update(1.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); // [1, 3, 5] + + m.update(2.0); + EXPECT_DOUBLE_EQ(m.get_median(), 2.0); // [1, 2, 3] +} + +TEST(TwoHeapMedianTest, SkipDoesNotContributeToWindow) { + // Same semantics as MultisetMedian / FlatMedian + TwoHeapMedian m(3); + m.update(1.0); + m.update(2.0); + + m.skip(); + EXPECT_DOUBLE_EQ(m.get_median(), 1.5); + EXPECT_EQ(m.current_size(), 2U); + + m.update(4.0); + EXPECT_DOUBLE_EQ(m.get_median(), 3.0); // [2, NaN, 4] -> valid=[2,4] + EXPECT_EQ(m.current_size(), 2U); +} + +TEST(TwoHeapMedianTest, MultipleSkips) { + TwoHeapMedian m(4); + m.update(1.0); + m.skip(); + m.skip(); + m.update(2.0); + EXPECT_DOUBLE_EQ(m.get_median(), 1.5); + EXPECT_EQ(m.current_size(), 2U); +} + +TEST(TwoHeapMedianTest, CrtpInterfaceMatchesMedian) { + TwoHeapMedian m(3); + RollingMetric &base = m; + + EXPECT_TRUE(std::isnan(base.get_value())); + + base.update(7.0); + base.update(1.0); + base.update(4.0); // [1, 4, 7] + + EXPECT_EQ(base.current_size(), 3U); + EXPECT_DOUBLE_EQ(base.get_value(), m.get_median()); +} + +TEST(TwoHeapMedianTest, AgreeWithFlatMedian) { + // Cross-check: every step must match the reference O(n) implementation + const std::vector seq = {5, 3, 1, 7, 2, 9, 4, 6, 8, 0, + 5, 5, 5, 2, 9, 1, 3, 7, 4}; + FlatMedian ref(5); + TwoHeapMedian m(5); + + for (double v : seq) { + ref.update(v); + m.update(v); + EXPECT_DOUBLE_EQ(m.get_median(), ref.get_median()); + } +}