diff --git a/.gitignore b/.gitignore index 2b72f82..e49cf44 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,9 @@ src/MultisetMedian.cpp src/SlidingWelford.cpp src/SlidingWelfordRing.cpp src/SlidingMoments.cpp +src/SlidingMean.cpp +src/SlidingCovariance.cpp +src/SlidingMomentsPrefix.cpp inst/include/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b9790c..a85b32f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ add_library(rolling_core src_core/MonotonicMin.cpp src_core/SlidingWelford.cpp src_core/SlidingWelfordRing.cpp + src_core/SlidingMean.cpp src_core/SlidingMoments.cpp src_core/SlidingCovariance.cpp ) diff --git a/R/rolling_metrics.R b/R/rolling_metrics.R index 4f650fc..9d27ee1 100644 --- a/R/rolling_metrics.R +++ b/R/rolling_metrics.R @@ -1,16 +1,3 @@ -# Count non-NA values inside each rolling window using vectorised cumsum. -.count_non_na <- function(x, window_size) { - cum <- cumsum(!is.na(x)) - lagged <- c(rep(0L, window_size), cum)[seq_along(x)] - cum - lagged -} - -.apply_min_periods <- function(result, x, window_size, min_periods) { - if (min_periods == 0L || length(x) == 0L) return(result) - result[.count_non_na(x, window_size) < min_periods] <- NA_real_ - result -} - .check_window <- function(window_size) { if (!is.finite(window_size) || window_size < 1L) stop("window_size must be a positive finite integer.", call. = FALSE) @@ -33,6 +20,9 @@ #' @param min_periods Minimum number of non-\code{NA} observations required in #' a window to return a result. Defaults to \code{window_size} (pandas #' semantics). Positions with fewer non-\code{NA} values yield \code{NA}. +#' @param method \code{"stable"} (default) uses the Welford online algorithm. +#' \code{"fast"} uses a prefix-sum approach (faster, but susceptible to +#' catastrophic cancellation when values are large and variance is small). #' #' @return #' A numeric vector with rolling sample variance values. Entries are @@ -45,13 +35,17 @@ #' @examples #' x <- as.double(c(1, 2, 3, 4)) #' rolling_variance(x, 3L) -rolling_variance <- function(x, window_size, min_periods = window_size) { +rolling_variance <- function(x, window_size, min_periods = window_size, + method = "stable") { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_variance_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + if (method == "fast") + .Call("rolling_variance_fast_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") + else + .Call("rolling_variance_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Maximum @@ -76,9 +70,8 @@ rolling_max <- function(x, window_size, min_periods = window_size) { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_max_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_max_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Minimum @@ -103,9 +96,8 @@ rolling_min <- function(x, window_size, min_periods = window_size) { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_min_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_min_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Median @@ -131,9 +123,8 @@ rolling_median <- function(x, window_size, min_periods = window_size) { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_median_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_median_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Mean @@ -145,6 +136,10 @@ rolling_median <- function(x, window_size, min_periods = window_size) { #' @param window_size Positive integer window length. #' @param min_periods Minimum number of non-\code{NA} observations required in #' a window to return a result. Defaults to \code{window_size}. +#' @param assume_finite If \code{TRUE}, assumes the input contains no +#' \code{NA} values and uses a faster SIMD prefix-sum path. Passing +#' \code{TRUE} when \code{NA}s are present produces incorrect results. +#' Defaults to \code{FALSE}. #' #' @return A numeric vector with rolling mean values. #' @@ -153,13 +148,13 @@ rolling_median <- function(x, window_size, min_periods = window_size) { #' @examples #' x <- as.double(c(1, 2, 3, 4)) #' rolling_mean(x, 3L) -rolling_mean <- function(x, window_size, min_periods = window_size) { +rolling_mean <- function(x, window_size, min_periods = window_size, + assume_finite = FALSE) { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_mean_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_mean_c", x, as.integer(window_size), as.integer(mp), + as.logical(assume_finite), PACKAGE = "robustrolling") } #' @title Rolling Skewness @@ -172,6 +167,9 @@ rolling_mean <- function(x, window_size, min_periods = window_size) { #' @param window_size Positive integer window length. #' @param min_periods Minimum number of non-\code{NA} observations required in #' a window to return a result. Defaults to \code{window_size}. +#' @param method \code{"stable"} (default) uses Terriberry's online algorithm. +#' \code{"fast"} uses a prefix-sum approach (faster, but susceptible to +#' catastrophic cancellation when values are large and variance is small). #' #' @return A numeric vector with rolling skewness values. #' @@ -180,13 +178,17 @@ rolling_mean <- function(x, window_size, min_periods = window_size) { #' @examples #' x <- as.double(c(1, 2, 3, 4, 5)) #' rolling_skewness(x, 3L) -rolling_skewness <- function(x, window_size, min_periods = window_size) { +rolling_skewness <- function(x, window_size, min_periods = window_size, + method = "stable") { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_skewness_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + if (method == "fast") + .Call("rolling_skewness_fast_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") + else + .Call("rolling_skewness_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Kurtosis @@ -199,6 +201,9 @@ rolling_skewness <- function(x, window_size, min_periods = window_size) { #' @param window_size Positive integer window length. #' @param min_periods Minimum number of non-\code{NA} observations required in #' a window to return a result. Defaults to \code{window_size}. +#' @param method \code{"stable"} (default) uses Terriberry's online algorithm. +#' \code{"fast"} uses a prefix-sum approach (faster, but susceptible to +#' catastrophic cancellation when values are large and variance is small). #' #' @return A numeric vector with rolling excess kurtosis values. #' @@ -207,13 +212,17 @@ rolling_skewness <- function(x, window_size, min_periods = window_size) { #' @examples #' x <- as.double(c(1, 2, 3, 4, 5)) #' rolling_kurtosis(x, 4L) -rolling_kurtosis <- function(x, window_size, min_periods = window_size) { +rolling_kurtosis <- function(x, window_size, min_periods = window_size, + method = "stable") { x <- as.double(x) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_kurtosis_c", x, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + if (method == "fast") + .Call("rolling_kurtosis_fast_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") + else + .Call("rolling_kurtosis_c", x, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Covariance @@ -241,9 +250,8 @@ rolling_cov <- function(x, y, window_size, min_periods = window_size) { if (length(x) != length(y)) stop("x and y must have the same length.", call. = FALSE) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_cov_c", x, y, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_cov_c", x, y, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } #' @title Rolling Correlation @@ -271,7 +279,6 @@ rolling_cor <- function(x, y, window_size, min_periods = window_size) { if (length(x) != length(y)) stop("x and y must have the same length.", call. = FALSE) .check_window(window_size) mp <- .check_min_periods(min_periods, window_size) - result <- .Call("rolling_cor_c", x, y, as.integer(window_size), - PACKAGE = "robustrolling") - .apply_min_periods(result, x, window_size, mp) + .Call("rolling_cor_c", x, y, as.integer(window_size), as.integer(mp), + PACKAGE = "robustrolling") } diff --git a/include/MonotonicMax.hpp b/include/MonotonicMax.hpp index c356219..048bdd9 100644 --- a/include/MonotonicMax.hpp +++ b/include/MonotonicMax.hpp @@ -28,4 +28,5 @@ class MonotonicMax : public RollingMetric { std::size_t window_size_; std::size_t current_tick_; std::deque deque_; + std::deque non_nan_ticks_; }; diff --git a/include/MonotonicMin.hpp b/include/MonotonicMin.hpp index 2b3ffc1..4d8e2be 100644 --- a/include/MonotonicMin.hpp +++ b/include/MonotonicMin.hpp @@ -28,4 +28,5 @@ class MonotonicMin : public RollingMetric { std::size_t window_size_; std::size_t current_tick_; std::deque deque_; + std::deque non_nan_ticks_; }; diff --git a/include/RollingMetric.hpp b/include/RollingMetric.hpp index ef7c9ca..9c287a6 100644 --- a/include/RollingMetric.hpp +++ b/include/RollingMetric.hpp @@ -21,7 +21,7 @@ template class RollingMetric { void skip() { static_cast(this)->skip_impl(); } void process_batch(const double *input_data, std::size_t length, - double *output_data) { + double *output_data, std::size_t min_periods = 0) { for (std::size_t i = 0; i < length; ++i) { if (std::isnan(input_data[i])) { this->skip(); @@ -30,6 +30,8 @@ template class RollingMetric { this->update(input_data[i]); output_data[i] = this->get_value(); } + if (this->current_size() < min_periods) + output_data[i] = std::numeric_limits::quiet_NaN(); } } diff --git a/include/SlidingCovariance.hpp b/include/SlidingCovariance.hpp index eb205bb..8ec59e5 100644 --- a/include/SlidingCovariance.hpp +++ b/include/SlidingCovariance.hpp @@ -8,6 +8,7 @@ class SlidingCovariance { public: explicit SlidingCovariance(std::size_t window_size); void update(double x, double y); + std::size_t current_size() const; double get_covariance() const; double get_correlation() const; double get_mean_x() const; diff --git a/include/SlidingMean.hpp b/include/SlidingMean.hpp new file mode 100644 index 0000000..9eacaa1 --- /dev/null +++ b/include/SlidingMean.hpp @@ -0,0 +1,32 @@ +#pragma once +#include "RollingMetric.hpp" +#include +#include +#include +#include +#include + +class SlidingMean : public RollingMetric { + friend class RollingMetric; + +public: + explicit SlidingMean(std::size_t window_size); + double get_mean() const; + void fast_mean_batch(const double *in, std::size_t n, double *out, + std::size_t min_periods) const; + void fast_mean_batch_finite(const double *in, std::size_t n, double *out, + std::size_t min_periods) const; + +private: + void update_impl(double value); + void skip_impl(); + double get_value_impl() const; + std::size_t current_size_impl() const; + + std::size_t window_size_; + std::vector buffer_; + std::size_t head_{0}; + std::size_t n_written_{0}; + std::size_t count_{0}; + double sum_{0.0}; +}; diff --git a/include/SlidingMoments.hpp b/include/SlidingMoments.hpp index 12def1d..989f8c1 100644 --- a/include/SlidingMoments.hpp +++ b/include/SlidingMoments.hpp @@ -8,6 +8,7 @@ class SlidingMoments { public: explicit SlidingMoments(std::size_t window_size); void update(double x); + void update_skewness_only(double value); void reset(); std::size_t current_size() const; double get_skewness() const; // adjusted Fisher-Pearson diff --git a/include/SlidingMomentsPrefix.hpp b/include/SlidingMomentsPrefix.hpp new file mode 100644 index 0000000..44c03d8 --- /dev/null +++ b/include/SlidingMomentsPrefix.hpp @@ -0,0 +1,20 @@ +#pragma once +#include +#include +#include +#include + +class SlidingMomentsPrefix { +public: + explicit SlidingMomentsPrefix(std::size_t window_size); + + void variance_batch(const double *in, std::size_t n, double *out, + std::size_t min_periods) const; + void skewness_batch(const double *in, std::size_t n, double *out, + std::size_t min_periods) const; + void kurtosis_batch(const double *in, std::size_t n, double *out, + std::size_t min_periods) const; + +private: + std::size_t window_size_; +}; \ No newline at end of file diff --git a/inst/tinytest/test_rolling_moments.R b/inst/tinytest/test_rolling_moments.R index 2902e51..1afb360 100644 --- a/inst/tinytest/test_rolling_moments.R +++ b/inst/tinytest/test_rolling_moments.R @@ -148,3 +148,49 @@ expect_equal(robustrolling::rolling_kurtosis(numeric(0), 4L), numeric(0)) # Input validation expect_true(is.double(robustrolling::rolling_kurtosis(1:5, 4L))) expect_error(robustrolling::rolling_kurtosis(as.double(1:5), 0L)) + +# ---- method = "fast" (SlidingMomentsPrefix) ---------------------------------- + +x_fast <- as.double(c(1, 3, -1, 5, 2, 8, 0, 4)) + +# fast skewness matches stable on well-conditioned data +for (k in c(4L, 5L, 6L)) { + expect_equal( + robustrolling::rolling_skewness(x_fast, k, min_periods = 1L, method = "fast"), + robustrolling::rolling_skewness(x_fast, k, min_periods = 1L), + tolerance = 1e-9 + ) +} + +# fast kurtosis matches stable on well-conditioned data +for (k in c(4L, 5L, 6L)) { + expect_equal( + robustrolling::rolling_kurtosis(x_fast, k, min_periods = 1L, method = "fast"), + robustrolling::rolling_kurtosis(x_fast, k, min_periods = 1L), + tolerance = 1e-9 + ) +} + +# warmup NAs — skewness needs 3 obs, kurtosis needs 4 +x_lin <- as.double(1:10) +expect_true(all(is.na(robustrolling::rolling_skewness(x_lin, 5L, method = "fast")[1:4]))) +expect_true(all(is.na(robustrolling::rolling_kurtosis(x_lin, 5L, method = "fast")[1:4]))) + +# min_periods respected +x_mp <- as.double(c(1, 2, 3, 4, 5)) +out_mp <- robustrolling::rolling_skewness(x_mp, 5L, min_periods = 3L, method = "fast") +expect_true(is.na(out_mp[1])) +expect_true(is.na(out_mp[2])) +expect_false(is.na(out_mp[3])) + +# NA in input handled correctly +x_na2 <- as.double(c(1, 2, NA_real_, 4, 5, 6, 7, 8)) +expect_equal( + robustrolling::rolling_skewness(x_na2, 5L, min_periods = 1L, method = "fast"), + robustrolling::rolling_skewness(x_na2, 5L, min_periods = 1L), + tolerance = 1e-9 +) + +# empty input +expect_equal(robustrolling::rolling_skewness(numeric(0), 3L, method = "fast"), numeric(0)) +expect_equal(robustrolling::rolling_kurtosis(numeric(0), 4L, method = "fast"), numeric(0)) diff --git a/inst/tinytest/test_rolling_variance.R b/inst/tinytest/test_rolling_variance.R index ee45543..35aacd2 100644 --- a/inst/tinytest/test_rolling_variance.R +++ b/inst/tinytest/test_rolling_variance.R @@ -95,3 +95,40 @@ expect_equal(res_mp0, robustrolling::rolling_variance(x_full, 3L, min_periods = # min_periods validation expect_error(robustrolling::rolling_variance(as.double(1:5), 3L, min_periods = -1L)) expect_error(robustrolling::rolling_variance(as.double(1:5), 3L, min_periods = 4L)) + +# ---- method = "fast" (SlidingMomentsPrefix) ---------------------------------- + +x_fast <- as.double(c(1, 3, -1, 5, 2, 8, 0, 4)) + +# fast matches stable on well-conditioned data +for (k in c(3L, 4L, 5L)) { + expect_equal( + robustrolling::rolling_variance(x_fast, k, min_periods = 1L, method = "fast"), + robustrolling::rolling_variance(x_fast, k, min_periods = 1L), + tolerance = 1e-9 + ) +} + +# warmup NAs — variance needs 2 obs +x_lin <- as.double(1:8) +expect_true(is.na(robustrolling::rolling_variance(x_lin, 4L, method = "fast")[1])) +expect_true(is.na(robustrolling::rolling_variance(x_lin, 4L, method = "fast")[2])) +expect_true(is.na(robustrolling::rolling_variance(x_lin, 4L, method = "fast")[3])) +expect_false(is.na(robustrolling::rolling_variance(x_lin, 4L, method = "fast")[4])) + +# min_periods respected +x_mp <- as.double(c(1, 2, 3, 4, 5)) +out_mp <- robustrolling::rolling_variance(x_mp, 4L, min_periods = 2L, method = "fast") +expect_true(is.na(out_mp[1])) +expect_false(is.na(out_mp[2])) + +# NA in input handled correctly +x_na2 <- as.double(c(1, 2, NA_real_, 4, 5, 6, 7, 8)) +expect_equal( + robustrolling::rolling_variance(x_na2, 4L, min_periods = 1L, method = "fast"), + robustrolling::rolling_variance(x_na2, 4L, min_periods = 1L), + tolerance = 1e-9 +) + +# empty input +expect_equal(robustrolling::rolling_variance(numeric(0), 3L, method = "fast"), numeric(0)) diff --git a/man/rolling_kurtosis.Rd b/man/rolling_kurtosis.Rd index f33caa7..f63faed 100644 --- a/man/rolling_kurtosis.Rd +++ b/man/rolling_kurtosis.Rd @@ -4,7 +4,7 @@ \alias{rolling_kurtosis} \title{Rolling Kurtosis} \usage{ -rolling_kurtosis(x, window_size, min_periods = window_size) +rolling_kurtosis(x, window_size, min_periods = window_size, method = "stable") } \arguments{ \item{x}{A numeric vector of type double.} @@ -13,6 +13,10 @@ rolling_kurtosis(x, window_size, min_periods = window_size) \item{min_periods}{Minimum number of non-\code{NA} observations required in a window to return a result. Defaults to \code{window_size}.} + +\item{method}{\code{"stable"} (default) uses Terriberry's online algorithm. +\code{"fast"} uses a prefix-sum approach (faster, but susceptible to +catastrophic cancellation when values are large and variance is small).} } \value{ A numeric vector with rolling excess kurtosis values. diff --git a/man/rolling_mean.Rd b/man/rolling_mean.Rd index ed5f340..f03110b 100644 --- a/man/rolling_mean.Rd +++ b/man/rolling_mean.Rd @@ -4,7 +4,7 @@ \alias{rolling_mean} \title{Rolling Mean} \usage{ -rolling_mean(x, window_size, min_periods = window_size) +rolling_mean(x, window_size, min_periods = window_size, assume_finite = FALSE) } \arguments{ \item{x}{A numeric vector of type double.} @@ -13,6 +13,11 @@ rolling_mean(x, window_size, min_periods = window_size) \item{min_periods}{Minimum number of non-\code{NA} observations required in a window to return a result. Defaults to \code{window_size}.} + +\item{assume_finite}{If \code{TRUE}, assumes the input contains no +\code{NA} values and uses a faster SIMD prefix-sum path. Passing +\code{TRUE} when \code{NA}s are present produces incorrect results. +Defaults to \code{FALSE}.} } \value{ A numeric vector with rolling mean values. diff --git a/man/rolling_skewness.Rd b/man/rolling_skewness.Rd index 1069bb3..639c38f 100644 --- a/man/rolling_skewness.Rd +++ b/man/rolling_skewness.Rd @@ -4,7 +4,7 @@ \alias{rolling_skewness} \title{Rolling Skewness} \usage{ -rolling_skewness(x, window_size, min_periods = window_size) +rolling_skewness(x, window_size, min_periods = window_size, method = "stable") } \arguments{ \item{x}{A numeric vector of type double.} @@ -13,6 +13,10 @@ rolling_skewness(x, window_size, min_periods = window_size) \item{min_periods}{Minimum number of non-\code{NA} observations required in a window to return a result. Defaults to \code{window_size}.} + +\item{method}{\code{"stable"} (default) uses Terriberry's online algorithm. +\code{"fast"} uses a prefix-sum approach (faster, but susceptible to +catastrophic cancellation when values are large and variance is small).} } \value{ A numeric vector with rolling skewness values. diff --git a/man/rolling_variance.Rd b/man/rolling_variance.Rd index 8a22842..d097e39 100644 --- a/man/rolling_variance.Rd +++ b/man/rolling_variance.Rd @@ -4,7 +4,7 @@ \alias{rolling_variance} \title{Rolling Sample Variance} \usage{ -rolling_variance(x, window_size, min_periods = window_size) +rolling_variance(x, window_size, min_periods = window_size, method = "stable") } \arguments{ \item{x}{A numeric vector of type double.} @@ -14,6 +14,10 @@ rolling_variance(x, window_size, min_periods = window_size) \item{min_periods}{Minimum number of non-\code{NA} observations required in a window to return a result. Defaults to \code{window_size} (pandas semantics). Positions with fewer non-\code{NA} values yield \code{NA}.} + +\item{method}{\code{"stable"} (default) uses the Welford online algorithm. +\code{"fast"} uses a prefix-sum approach (faster, but susceptible to +catastrophic cancellation when values are large and variance is small).} } \value{ A numeric vector with rolling sample variance values. Entries are diff --git a/py_package/CMakeLists.txt b/py_package/CMakeLists.txt index 7d73631..ef466a8 100644 --- a/py_package/CMakeLists.txt +++ b/py_package/CMakeLists.txt @@ -13,10 +13,13 @@ target_sources(robust_rolling_core PRIVATE ../src_core/MonotonicMin.cpp ../src_core/MultisetMedian.cpp ../src_core/SlidingCovariance.cpp + ../src_core/SlidingMean.cpp ../src_core/SlidingMoments.cpp + ../src_core/SlidingMomentsPrefix.cpp ../src_core/SlidingWelfordRing.cpp ) -target_compile_options(robust_rolling_core PRIVATE -O3 -DNDEBUG) +target_compile_options(robust_rolling_core PRIVATE -O3 -DNDEBUG -flto) +target_link_options(robust_rolling_core PRIVATE -flto) install(TARGETS robust_rolling_core LIBRARY DESTINATION .) \ No newline at end of file diff --git a/py_package/py_wrapper.cpp b/py_package/py_wrapper.cpp index 4773a4a..1319452 100644 --- a/py_package/py_wrapper.cpp +++ b/py_package/py_wrapper.cpp @@ -2,7 +2,9 @@ #include "../include/MonotonicMin.hpp" #include "../include/MultisetMedian.hpp" #include "../include/SlidingCovariance.hpp" +#include "../include/SlidingMean.hpp" #include "../include/SlidingMoments.hpp" +#include "../include/SlidingMomentsPrefix.hpp" #include "../include/SlidingWelfordRing.hpp" #include #include @@ -14,7 +16,8 @@ template py::array_t process_batch_generic( Metric &metric, py::array_t input_array, - Getter getter) { + Getter getter, std::size_t min_periods) { + py::buffer_info in_info = input_array.request(); if (in_info.ndim != 1) throw std::runtime_error("Input must be 1D array"); @@ -36,6 +39,9 @@ py::array_t process_batch_generic( metric.update(value); out_ptr[i] = getter(metric); } + + if (metric.current_size() < min_periods) + out_ptr[i] = std::numeric_limits::quiet_NaN(); } return result_array; @@ -48,51 +54,95 @@ PYBIND11_MODULE(robust_rolling_core, m) { .def(py::init()) .def("update", &SlidingWelfordRing::update) .def("get_variance", &SlidingWelfordRing::get_value) - .def("process_batch", - [](SlidingWelfordRing &self, - py::array_t - input) { - return process_batch_generic( - self, input, - [](SlidingWelfordRing &m) { return m.get_variance(); }); - }); + .def( + "process_batch", + [](SlidingWelfordRing &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, + [](SlidingWelfordRing &m) { return m.get_variance(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); py::class_(m, "MonotonicMax") .def(py::init()) .def("update", &MonotonicMax::update) .def("get_max", &MonotonicMax::get_max) - .def("process_batch", - [](MonotonicMax &self, - py::array_t - input) { - return process_batch_generic( - self, input, [](MonotonicMax &m) { return m.get_max(); }); - }); + .def( + "process_batch", + [](MonotonicMax &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](MonotonicMax &m) { return m.get_max(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); py::class_(m, "MonotonicMin") .def(py::init()) .def("update", &MonotonicMin::update) .def("get_min", &MonotonicMin::get_min) - .def("process_batch", - [](MonotonicMin &self, - py::array_t - input) { - return process_batch_generic( - self, input, [](MonotonicMin &m) { return m.get_min(); }); - }); + .def( + "process_batch", + [](MonotonicMin &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](MonotonicMin &m) { return m.get_min(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); py::class_(m, "MultisetMedian") .def(py::init()) .def("update", &MultisetMedian::update) .def("get_median", &MultisetMedian::get_median) - .def("process_batch", - [](MultisetMedian &self, - py::array_t - input) { - return process_batch_generic( - self, input, - [](MultisetMedian &m) { return m.get_median(); }); - }); + .def( + "process_batch", + [](MultisetMedian &self, + py::array_t + input, + std::size_t min_periods) { + return process_batch_generic( + self, input, [](MultisetMedian &m) { return m.get_median(); }, + min_periods); + }, + py::arg("input"), py::arg("min_periods") = 0); + + py::class_(m, "SlidingMean") + .def(py::init()) + .def("update", &SlidingMean::update) + .def("get_mean", &SlidingMean::get_mean) + .def( + "process_batch", + [](SlidingMean &self, + py::array_t + input, + std::size_t min_periods, bool assume_finite) { + py::buffer_info info = input.request(); + if (info.ndim != 1) + throw std::runtime_error("Input must be 1D array"); + std::size_t n = static_cast(info.shape[0]); + auto result = py::array_t( + py::array::ShapeContainer{info.shape[0]}, + py::array::StridesContainer{ + static_cast(sizeof(double))}); + const double *in_ptr = static_cast(info.ptr); + double *out_ptr = static_cast(result.request().ptr); + if (assume_finite) + self.fast_mean_batch_finite(in_ptr, n, out_ptr, min_periods); + else + self.fast_mean_batch(in_ptr, n, out_ptr, min_periods); + return result; + }, + py::arg("input"), py::arg("min_periods") = 0, + py::arg("assume_finite") = false); py::class_(m, "SlidingMoments") .def(py::init()) @@ -105,7 +155,9 @@ PYBIND11_MODULE(robust_rolling_core, m) { .def("process_mean_batch", [](SlidingMoments &self, py::array_t - input) { + input, + std::size_t min_periods) { + // min_periods=0 means no restriction py::buffer_info info = input.request(); if (info.ndim != 1) throw std::runtime_error("Input must be 1D array"); @@ -117,14 +169,17 @@ PYBIND11_MODULE(robust_rolling_core, m) { auto *out = static_cast(result.request().ptr); for (py::ssize_t i = 0; i < info.shape[0]; ++i) { self.update(in[i]); - out[i] = self.get_mean(); + out[i] = self.current_size() < min_periods + ? std::numeric_limits::quiet_NaN() + : self.get_mean(); } return result; }) .def("process_skewness_batch", [](SlidingMoments &self, py::array_t - input) { + input, + std::size_t min_periods) { py::buffer_info info = input.request(); if (info.ndim != 1) throw std::runtime_error("Input must be 1D array"); @@ -135,15 +190,18 @@ PYBIND11_MODULE(robust_rolling_core, m) { const auto *in = static_cast(info.ptr); auto *out = static_cast(result.request().ptr); for (py::ssize_t i = 0; i < info.shape[0]; ++i) { - self.update(in[i]); - out[i] = self.get_skewness(); + self.update_skewness_only(in[i]); + out[i] = self.current_size() < min_periods + ? std::numeric_limits::quiet_NaN() + : self.get_skewness(); } return result; }) .def("process_kurtosis_batch", [](SlidingMoments &self, py::array_t - input) { + input, + std::size_t min_periods) { py::buffer_info info = input.request(); if (info.ndim != 1) throw std::runtime_error("Input must be 1D array"); @@ -155,7 +213,9 @@ PYBIND11_MODULE(robust_rolling_core, m) { auto *out = static_cast(result.request().ptr); for (py::ssize_t i = 0; i < info.shape[0]; ++i) { self.update(in[i]); - out[i] = self.get_kurtosis(); + out[i] = self.current_size() < min_periods + ? std::numeric_limits::quiet_NaN() + : self.get_kurtosis(); } return result; }); @@ -167,48 +227,83 @@ PYBIND11_MODULE(robust_rolling_core, m) { .def("get_correlation", &SlidingCovariance::get_correlation) .def("get_mean_x", &SlidingCovariance::get_mean_x) .def("get_mean_y", &SlidingCovariance::get_mean_y) - .def("process_covariance_batch", - [](SlidingCovariance &self, - py::array_t x, - py::array_t y) { - py::buffer_info xi = x.request(), yi = y.request(); - if (xi.ndim != 1 || yi.ndim != 1) - throw std::runtime_error("Inputs must be 1D arrays"); - if (xi.shape[0] != yi.shape[0]) - throw std::runtime_error("x and y must have the same length"); - auto result = py::array_t( - py::array::ShapeContainer{xi.shape[0]}, - py::array::StridesContainer{ - static_cast(sizeof(double))}); - const auto *xp = static_cast(xi.ptr); - const auto *yp = static_cast(yi.ptr); - auto *out = static_cast(result.request().ptr); - for (py::ssize_t i = 0; i < xi.shape[0]; ++i) { - self.update(xp[i], yp[i]); - out[i] = self.get_covariance(); - } - return result; - }) - .def("process_correlation_batch", - [](SlidingCovariance &self, - py::array_t x, - py::array_t y) { - py::buffer_info xi = x.request(), yi = y.request(); - if (xi.ndim != 1 || yi.ndim != 1) - throw std::runtime_error("Inputs must be 1D arrays"); - if (xi.shape[0] != yi.shape[0]) - throw std::runtime_error("x and y must have the same length"); - auto result = py::array_t( - py::array::ShapeContainer{xi.shape[0]}, - py::array::StridesContainer{ - static_cast(sizeof(double))}); - const auto *xp = static_cast(xi.ptr); - const auto *yp = static_cast(yi.ptr); - auto *out = static_cast(result.request().ptr); - for (py::ssize_t i = 0; i < xi.shape[0]; ++i) { - self.update(xp[i], yp[i]); - out[i] = self.get_correlation(); - } - return result; - }); + .def( + "process_covariance_batch", + [](SlidingCovariance &self, + py::array_t x, + py::array_t y) { + py::buffer_info xi = x.request(), yi = y.request(); + if (xi.ndim != 1 || yi.ndim != 1) + throw std::runtime_error("Inputs must be 1D arrays"); + if (xi.shape[0] != yi.shape[0]) + throw std::runtime_error("x and y must have the same length"); + auto result = py::array_t( + py::array::ShapeContainer{xi.shape[0]}, + py::array::StridesContainer{ + static_cast(sizeof(double))}); + const auto *xp = static_cast(xi.ptr); + const auto *yp = static_cast(yi.ptr); + auto *out = static_cast(result.request().ptr); + for (py::ssize_t i = 0; i < xi.shape[0]; ++i) { + self.update(xp[i], yp[i]); + out[i] = self.get_covariance(); + } + return result; + }) + .def( + "process_correlation_batch", + [](SlidingCovariance &self, + py::array_t x, + py::array_t y) { + py::buffer_info xi = x.request(), yi = y.request(); + if (xi.ndim != 1 || yi.ndim != 1) + throw std::runtime_error("Inputs must be 1D arrays"); + if (xi.shape[0] != yi.shape[0]) + throw std::runtime_error("x and y must have the same length"); + auto result = py::array_t( + py::array::ShapeContainer{xi.shape[0]}, + py::array::StridesContainer{ + static_cast(sizeof(double))}); + const auto *xp = static_cast(xi.ptr); + const auto *yp = static_cast(yi.ptr); + auto *out = static_cast(result.request().ptr); + for (py::ssize_t i = 0; i < xi.shape[0]; ++i) { + self.update(xp[i], yp[i]); + out[i] = self.get_correlation(); + } + return result; + }); + + auto make_prefix_batch = [&](auto method_ptr) { + return [method_ptr]( + SlidingMomentsPrefix &self, + py::array_t + input, + std::size_t min_periods) { + py::buffer_info info = input.request(); + if (info.ndim != 1) + throw std::runtime_error("Input must be 1D array"); + std::size_t n = static_cast(info.shape[0]); + auto result = py::array_t( + py::array::ShapeContainer{info.shape[0]}, + py::array::StridesContainer{ + static_cast(sizeof(double))}); + (self.*method_ptr)(static_cast(info.ptr), n, + static_cast(result.request().ptr), + min_periods); + return result; + }; + }; + + py::class_(m, "SlidingMomentsPrefix") + .def(py::init()) + .def("variance_batch", + make_prefix_batch(&SlidingMomentsPrefix::variance_batch), + py::arg("input"), py::arg("min_periods") = 0) + .def("skewness_batch", + make_prefix_batch(&SlidingMomentsPrefix::skewness_batch), + py::arg("input"), py::arg("min_periods") = 0) + .def("kurtosis_batch", + make_prefix_batch(&SlidingMomentsPrefix::kurtosis_batch), + py::arg("input"), py::arg("min_periods") = 0); } diff --git a/py_package/robustrolling/__init__.py b/py_package/robustrolling/__init__.py index 42d9e8c..f821da8 100644 --- a/py_package/robustrolling/__init__.py +++ b/py_package/robustrolling/__init__.py @@ -7,7 +7,9 @@ MonotonicMin, MultisetMedian, SlidingCovariance, + SlidingMean, SlidingMoments, + SlidingMomentsPrefix, SlidingWelford, ) @@ -48,26 +50,6 @@ def _wrap(result: np.ndarray, original): return result -def _count_non_nan_in_window(arr: np.ndarray, window_size: int) -> np.ndarray: - not_nan = (~np.isnan(arr)).astype(np.float64) - cum = np.cumsum(not_nan) - lagged = np.empty_like(cum) - lagged[:window_size] = 0.0 - if len(arr) > window_size: - lagged[window_size:] = cum[:-window_size] - return cum - lagged - - -def _apply_min_periods(result: np.ndarray, arr: np.ndarray, - window_size: int, min_periods: int) -> np.ndarray: - if min_periods == 0 or len(arr) == 0: - return result - non_na_count = _count_non_nan_in_window(arr, window_size) - result = result.copy() - result[non_na_count < min_periods] = np.nan - return result - - def _resolve_min_periods(min_periods: int | None, window_size: int) -> int: mp = window_size if min_periods is None else int(min_periods) if mp < 0 or mp > window_size: @@ -107,8 +89,7 @@ def rolling_max(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = MonotonicMax(window_size).process_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + result = MonotonicMax(window_size).process_batch(arr, mp) return _wrap(result, x) @@ -142,17 +123,15 @@ def rolling_min(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = MonotonicMin(window_size).process_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + result = MonotonicMin(window_size).process_batch(arr, mp) return _wrap(result, x) -def rolling_variance(x, window_size: int, min_periods: int | None = None): +def rolling_variance(x, window_size: int, min_periods: int | None = None, + method: str = "stable"): """ Compute the rolling sample variance (ddof=1) over a sliding window. - Uses the Welford online algorithm with a ring buffer for O(1) updates. - Parameters ---------- x : array-like @@ -162,6 +141,11 @@ def rolling_variance(x, window_size: int, min_periods: int | None = None): min_periods : int, optional Minimum number of non-NaN observations required to return a result. Defaults to ``window_size`` (pandas-compatible semantics). + method : {"stable", "fast"}, optional + ``"stable"`` uses the Welford online algorithm (numerically stable, + default). ``"fast"`` uses a prefix-sum approach (faster for large + arrays, but susceptible to catastrophic cancellation when values are + large and variance is small). Returns ------- @@ -180,8 +164,10 @@ def rolling_variance(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = SlidingWelford(window_size).process_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + if method == "fast": + result = SlidingMomentsPrefix(window_size).variance_batch(arr, mp) + else: + result = SlidingWelford(window_size).process_batch(arr, mp) return _wrap(result, x) @@ -218,12 +204,11 @@ def rolling_median(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = MultisetMedian(window_size).process_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + result = MultisetMedian(window_size).process_batch(arr, mp) return _wrap(result, x) -def rolling_mean(x, window_size: int, min_periods: int | None = None): +def rolling_mean(x, window_size: int, min_periods: int | None = None, assume_finite: bool = False): """ Compute the rolling arithmetic mean over a sliding window. @@ -236,7 +221,11 @@ def rolling_mean(x, window_size: int, min_periods: int | None = None): min_periods : int, optional Minimum number of non-NaN observations required to return a result. Defaults to ``window_size`` (pandas-compatible semantics). - + assume_finite : bool, optional + If ``True``, assumes the input contains no ``NaN`` values and uses a + faster single-pass prefix-sum path with SIMD acceleration. + Passing ``True`` when the input does contain ``NaN`` produces + incorrect results. Defaults to ``False``. Returns ------- numpy.ndarray or pandas.Series @@ -253,18 +242,15 @@ def rolling_mean(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = SlidingMoments(window_size).process_mean_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + result = SlidingMean(window_size).process_batch(arr, mp, assume_finite) return _wrap(result, x) -def rolling_skewness(x, window_size: int, min_periods: int | None = None): +def rolling_skewness(x, window_size: int, min_periods: int | None = None, + method: str = "stable"): """ Compute the rolling adjusted Fisher-Pearson skewness over a sliding window. - Uses Terriberry's 4th-moment online algorithm for O(1) updates. - Requires at least 3 valid observations per window. - Parameters ---------- x : array-like @@ -274,6 +260,10 @@ def rolling_skewness(x, window_size: int, min_periods: int | None = None): min_periods : int, optional Minimum number of non-NaN observations required to return a result. Defaults to ``window_size`` (pandas-compatible semantics). + method : {"stable", "fast"}, optional + ``"stable"`` uses Terriberry's online algorithm (numerically stable, + default). ``"fast"`` uses a prefix-sum approach (faster for large + arrays, but susceptible to catastrophic cancellation). Returns ------- @@ -292,16 +282,18 @@ def rolling_skewness(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = SlidingMoments(window_size).process_skewness_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + if method == "fast": + result = SlidingMomentsPrefix(window_size).skewness_batch(arr, mp) + else: + result = SlidingMoments(window_size).process_skewness_batch(arr, mp) return _wrap(result, x) -def rolling_kurtosis(x, window_size: int, min_periods: int | None = None): +def rolling_kurtosis(x, window_size: int, min_periods: int | None = None, + method: str = "stable"): """ Compute the rolling excess kurtosis (Fisher definition) over a sliding window. - Uses Terriberry's 4th-moment online algorithm for O(1) updates. Returns excess kurtosis (normal distribution = 0). Requires at least 4 valid observations per window. @@ -314,6 +306,10 @@ def rolling_kurtosis(x, window_size: int, min_periods: int | None = None): min_periods : int, optional Minimum number of non-NaN observations required to return a result. Defaults to ``window_size`` (pandas-compatible semantics). + method : {"stable", "fast"}, optional + ``"stable"`` uses Terriberry's online algorithm (numerically stable, + default). ``"fast"`` uses a prefix-sum approach (faster for large + arrays, but susceptible to catastrophic cancellation). Returns ------- @@ -332,8 +328,10 @@ def rolling_kurtosis(x, window_size: int, min_periods: int | None = None): """ arr = _to_float64(x) mp = _resolve_min_periods(min_periods, window_size) - result = SlidingMoments(window_size).process_kurtosis_batch(arr) - result = _apply_min_periods(result, arr, window_size, mp) + if method == "fast": + result = SlidingMomentsPrefix(window_size).kurtosis_batch(arr, mp) + else: + result = SlidingMoments(window_size).process_kurtosis_batch(arr, mp) return _wrap(result, x) diff --git a/py_package/tests/test_bindings.py b/py_package/tests/test_bindings.py index e5ea449..79302df 100644 --- a/py_package/tests/test_bindings.py +++ b/py_package/tests/test_bindings.py @@ -298,6 +298,93 @@ def test_integer_input_converted_to_float64(self): out = rrc.MultisetMedian(2).process_batch(np.array([1, 2, 3], dtype=np.int32)) assert out.dtype == np.float64 +# ── C++ engine — SlidingMean ───────────────────────────────────────────────────── + +def _mean_ref(x, k): + out = np.full(len(x), np.nan) + for i in range(len(x)): + w = x[max(0, i - k + 1):i + 1] + valid = w[~np.isnan(w)] + if len(valid) >= 1: + out[i] = valid.mean() + return out + + +class TestSlidingMean: + + def test_known_values(self): + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + out = rrc.SlidingMean(3).process_batch(x) + _nan_allclose(out, [1.0, 1.5, 2.0, 3.0, 4.0]) + + def test_window_size_1_identity(self): + x = np.array([3.0, 7.0, -1.0]) + out = rrc.SlidingMean(1).process_batch(x) + _nan_allclose(out, x) + + def test_window_larger_than_array(self): + out = rrc.SlidingMean(10).process_batch(np.array([2.0, 4.0, 6.0])) + _nan_allclose(out, [2.0, 3.0, 4.0]) + + def test_constant_sequence(self): + out = rrc.SlidingMean(3).process_batch(np.full(6, 5.0)) + np.testing.assert_allclose(out, 5.0, atol=1e-12) + + @pytest.mark.parametrize("k", [2, 3, 5]) + def test_against_naive_reference(self, k): + x = np.array([-3.0, -1.0, 0.0, 2.0, 10.0, 7.0, 7.0, 8.0]) + out = rrc.SlidingMean(k).process_batch(x) + np.testing.assert_allclose(out, _mean_ref(x, k), rtol=1e-12, atol=1e-12, + equal_nan=True) + + def test_nan_does_not_contribute(self): + x = np.array([1.0, np.nan, 3.0, 4.0, 5.0]) + out = rrc.SlidingMean(3).process_batch(x) + # window [1, nan, 3] → mean of [1, 3] = 2.0 + assert np.isclose(out[2], 2.0, atol=1e-12) + + def test_nan_advances_window(self): + x = np.array([1.0, 2.0, np.nan, 4.0, 5.0]) + out = rrc.SlidingMean(3).process_batch(x) + # window [2, nan, 4] → mean of [2, 4] = 3.0 + assert np.isclose(out[3], 3.0, atol=1e-12) + + def test_all_nan_returns_nan(self): + x = np.array([np.nan, np.nan, np.nan]) + out = rrc.SlidingMean(2).process_batch(x) + assert np.all(np.isnan(out)) + + def test_empty_input(self): + out = rrc.SlidingMean(3).process_batch(np.array([])) + assert len(out) == 0 and out.dtype == np.float64 + + def test_rejects_zero_window(self): + with pytest.raises(ValueError, match="Window length must be greater than 0"): + rrc.SlidingMean(0) + + def test_rejects_2d_input(self): + with pytest.raises(RuntimeError, match="Input must be 1D array"): + rrc.SlidingMean(2).process_batch(np.ones((2, 3))) + + def test_integer_input_converted_to_float64(self): + out = rrc.SlidingMean(2).process_batch(np.array([1, 2, 3, 4], dtype=np.int32)) + assert out.dtype == np.float64 + + def test_min_periods_masks_warmup(self): + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + out = rrc.SlidingMean(3).process_batch(x, min_periods=3) + assert np.isnan(out[0]) and np.isnan(out[1]) + assert np.isclose(out[2], 2.0, atol=1e-12) + + def test_min_periods_with_nan_input(self): + x = np.array([1.0, np.nan, 3.0, 4.0, 5.0]) + out = rrc.SlidingMean(3).process_batch(x, min_periods=2) + # pos 1: window [1, nan] → 1 non-NaN < 2 → NaN + assert np.isnan(out[1]) + # pos 2: window [1, nan, 3] → 2 non-NaN >= 2 → 2.0 + assert np.isclose(out[2], 2.0, atol=1e-12) + + # ── C++ engine — SlidingMoments ────────────────────────────────────────────────── class TestSlidingMoments: @@ -509,6 +596,13 @@ def test_rolling_variance_matches_pandas(self, data, k, mp): kw = {} if mp is None else {"min_periods": mp} _nan_allclose(rr.rolling_variance(arr, k, **kw), expected) + @pytest.mark.parametrize("data,k,mp", _CASE_PARAMS, ids=_CASE_IDS) + def test_rolling_mean_matches_pandas(self, data, k, mp): + arr = np.array(data, dtype=np.float64) + expected = pd.Series(arr).rolling(k, min_periods=_pd_mp(mp, k)).mean().to_numpy() + kw = {} if mp is None else {"min_periods": mp} + _nan_allclose(rr.rolling_mean(arr, k, **kw), expected) + # ── High-level API — min_periods edge cases ─────────────────────────────────── @@ -895,4 +989,70 @@ def test_series_input_returns_series(self): def test_empty_input(self): out = rr.rolling_cov(np.array([]), np.array([]), 3) - assert len(out) == 0 \ No newline at end of file + assert len(out) == 0 + + +# ── method="fast" (SlidingMomentsPrefix) ───────────────────────────────────── + +class TestFastMethod: + + @pytest.mark.parametrize("fn,pd_fn", [ + (lambda x, k: rr.rolling_variance(x, k, method="fast"), + lambda s, k: s.rolling(k).var().to_numpy()), + (lambda x, k: rr.rolling_skewness(x, k, method="fast"), + lambda s, k: s.rolling(k).skew().to_numpy()), + (lambda x, k: rr.rolling_kurtosis(x, k, method="fast"), + lambda s, k: s.rolling(k).kurt().to_numpy()), + ], ids=["variance", "skewness", "kurtosis"]) + def test_fast_matches_pandas_no_nan(self, fn, pd_fn): + rng = np.random.default_rng(0) + x = rng.standard_normal(300) + k = 10 + _nan_allclose(fn(x, k), pd_fn(pd.Series(x), k), rtol=1e-9, atol=1e-9) + + @pytest.mark.parametrize("fn_stable,fn_fast", [ + (lambda x, k, mp: rr.rolling_variance(x, k, min_periods=mp), + lambda x, k, mp: rr.rolling_variance(x, k, min_periods=mp, method="fast")), + (lambda x, k, mp: rr.rolling_skewness(x, k, min_periods=mp), + lambda x, k, mp: rr.rolling_skewness(x, k, min_periods=mp, method="fast")), + (lambda x, k, mp: rr.rolling_kurtosis(x, k, min_periods=mp), + lambda x, k, mp: rr.rolling_kurtosis(x, k, min_periods=mp, method="fast")), + ], ids=["variance", "skewness", "kurtosis"]) + def test_fast_agrees_with_stable_with_nan(self, fn_stable, fn_fast): + rng = np.random.default_rng(1) + x = rng.standard_normal(300) + x[rng.random(300) < 0.1] = np.nan + k, mp = 10, 5 + _nan_allclose(fn_fast(x, k, mp), fn_stable(x, k, mp), rtol=1e-9, atol=1e-9) + + def test_fast_variance_warmup_nan(self): + x = np.arange(1.0, 11.0) + out = rr.rolling_variance(x, 5, method="fast") + assert all(np.isnan(out[:4])) + assert not np.isnan(out[4]) + + def test_fast_skewness_needs_3_obs(self): + x = np.arange(1.0, 11.0) + out = rr.rolling_skewness(x, 5, method="fast") + assert all(np.isnan(out[:4])) + + def test_fast_kurtosis_needs_4_obs(self): + x = np.arange(1.0, 11.0) + out = rr.rolling_kurtosis(x, 5, method="fast") + assert all(np.isnan(out[:4])) + + def test_fast_min_periods(self): + x = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + out = rr.rolling_variance(x, 3, min_periods=2, method="fast") + assert np.isnan(out[0]) + assert not np.isnan(out[1]) + + def test_fast_all_nan_window_returns_nan(self): + x = np.array([np.nan, np.nan, np.nan, 1.0, 2.0, 3.0]) + out = rr.rolling_variance(x, 3, min_periods=1, method="fast") + assert np.isnan(out[2]) + assert not np.isnan(out[5]) + + def test_fast_empty_input(self): + for fn in [rr.rolling_variance, rr.rolling_skewness, rr.rolling_kurtosis]: + assert len(fn(np.array([]), 3, method="fast")) == 0 \ No newline at end of file diff --git a/src/SlidingCovariance.cpp b/src/SlidingCovariance.cpp deleted file mode 100644 index 00d2cc0..0000000 --- a/src/SlidingCovariance.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include "SlidingCovariance.hpp" -#include - -SlidingCovariance::SlidingCovariance(std::size_t window_size) - : window_size_(window_size) { - if (window_size_ == 0) - throw std::invalid_argument("Window length must be greater than 0"); - - buffer_x_.resize(window_size, std::numeric_limits::quiet_NaN()); - buffer_y_.resize(window_size, std::numeric_limits::quiet_NaN()); -} - -static void cov_add(double x, double y, std::size_t &count_, double &mean_x_, - double &mean_y_, double &C_, double &M2_x_, double &M2_y_) { - count_++; - double n = static_cast(count_); - double delta_x = x - mean_x_; - double delta_y = y - mean_y_; - - mean_x_ += delta_x / n; - mean_y_ += delta_y / n; - - double delta_x2 = x - mean_x_; - double delta_y2 = y - mean_y_; - - C_ += delta_x * delta_y2; - M2_x_ += delta_x * delta_x2; - M2_y_ += delta_y * delta_y2; -} - -static void cov_remove(double x_out, double y_out, std::size_t &count_, - double &mean_x_, double &mean_y_, double &C_, - double &M2_x_, double &M2_y_) { - if (count_ == 1) { - mean_x_ = 0.0; - mean_y_ = 0.0; - C_ = 0.0; - M2_x_ = 0.0; - M2_y_ = 0.0; - count_ = 0; - return; - } - - double n = static_cast(count_); - double mean_x_new = (n * mean_x_ - x_out) / (n - 1); - double mean_y_new = (n * mean_y_ - y_out) / (n - 1); - - double dx = x_out - mean_x_new; - double dy = y_out - mean_y_new; - - C_ -= dx * (y_out - mean_y_); - M2_x_ -= dx * (x_out - mean_x_); - M2_y_ -= dy * (y_out - mean_y_); - M2_x_ = std::max(0.0, M2_x_); - M2_y_ = std::max(0.0, M2_y_); - - mean_x_ = mean_x_new; - mean_y_ = mean_y_new; - count_--; -} - -double SlidingCovariance::get_mean_x() const { - if (count_ == 0) - return std::numeric_limits::quiet_NaN(); - return mean_x_; -} - -double SlidingCovariance::get_mean_y() const { - if (count_ == 0) - return std::numeric_limits::quiet_NaN(); - return mean_y_; -} - -double SlidingCovariance::get_covariance() const { - if (count_ < 2) - return std::numeric_limits::quiet_NaN(); - return C_ / static_cast(count_ - 1); -} - -double SlidingCovariance::get_correlation() const { - if (count_ < 2 || M2_x_ <= 0.0 || M2_y_ <= 0.0) - return std::numeric_limits::quiet_NaN(); - return C_ / std::sqrt(M2_x_ * M2_y_); -} - -void SlidingCovariance::update(double x, double y) { - bool ring_full = (n_written_ >= window_size_); - - if (ring_full) { - double x_out = buffer_x_[head_]; - double y_out = buffer_y_[head_]; - - if (!std::isnan(x_out) && !std::isnan(y_out)) - cov_remove(x_out, y_out, count_, mean_x_, mean_y_, C_, M2_x_, M2_y_); - } - - buffer_x_[head_] = x; - buffer_y_[head_] = y; - head_++; - if (head_ == window_size_) - head_ = 0; - if (!ring_full) - n_written_++; - - if (!std::isnan(x) && !std::isnan(y)) - cov_add(x, y, count_, mean_x_, mean_y_, C_, M2_x_, M2_y_); -} \ No newline at end of file diff --git a/src/r_wrapper.cpp b/src/r_wrapper.cpp index 9929719..25f9470 100644 --- a/src/r_wrapper.cpp +++ b/src/r_wrapper.cpp @@ -2,7 +2,9 @@ #include "MonotonicMin.hpp" #include "MultisetMedian.hpp" #include "SlidingCovariance.hpp" +#include "SlidingMean.hpp" #include "SlidingMoments.hpp" +#include "SlidingMomentsPrefix.hpp" #include "SlidingWelfordRing.hpp" #include @@ -19,7 +21,14 @@ std::size_t read_window_size(SEXP r_window_size) { return static_cast(value); } -SEXP rolling_variance_c(SEXP r_data, SEXP r_window_size) { +std::size_t read_min_periods(SEXP r_min_periods) { + int value = Rf_asInteger(r_min_periods); + if (value == NA_INTEGER || value < 0) + Rf_error("min_periods must be a non-negative integer."); + return static_cast(value); +} + +SEXP rolling_variance_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) { Rf_error("Input data must be a numeric vector."); } @@ -27,6 +36,7 @@ SEXP rolling_variance_c(SEXP r_data, SEXP r_window_size) { double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); SlidingWelfordRing welford(k); @@ -34,13 +44,13 @@ SEXP rolling_variance_c(SEXP r_data, SEXP r_window_size) { PROTECT(r_result = Rf_allocVector(REALSXP, n)); double *output_ptr = REAL(r_result); - welford.process_batch(input_ptr, static_cast(n), output_ptr); + welford.process_batch(input_ptr, static_cast(n), output_ptr, mp); UNPROTECT(1); return r_result; } -SEXP rolling_max_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_max_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) { Rf_error("Input data must be a numeric vector."); } @@ -48,6 +58,7 @@ SEXP rolling_max_c(SEXP r_data, SEXP r_window_size) { double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); MonotonicMax monotonic_max(k); @@ -56,13 +67,13 @@ SEXP rolling_max_c(SEXP r_data, SEXP r_window_size) { double *output_ptr = REAL(r_result); monotonic_max.process_batch(input_ptr, static_cast(n), - output_ptr); + output_ptr, mp); UNPROTECT(1); return r_result; } -SEXP rolling_min_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_min_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) { Rf_error("Input data must be a numeric vector."); } @@ -70,6 +81,7 @@ SEXP rolling_min_c(SEXP r_data, SEXP r_window_size) { double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); MonotonicMin monotonic_min(k); @@ -78,13 +90,13 @@ SEXP rolling_min_c(SEXP r_data, SEXP r_window_size) { double *output_ptr = REAL(r_result); monotonic_min.process_batch(input_ptr, static_cast(n), - output_ptr); + output_ptr, mp); UNPROTECT(1); return r_result; } -SEXP rolling_median_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_median_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) { Rf_error("Input data must be a numeric vector."); } @@ -92,6 +104,7 @@ SEXP rolling_median_c(SEXP r_data, SEXP r_window_size) { double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); MultisetMedian multiset_median(k); @@ -100,42 +113,47 @@ SEXP rolling_median_c(SEXP r_data, SEXP r_window_size) { double *output_ptr = REAL(r_result); multiset_median.process_batch(input_ptr, static_cast(n), - output_ptr); + output_ptr, mp); UNPROTECT(1); return r_result; } -SEXP rolling_mean_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_mean_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods, + SEXP r_assume_finite) { if (!Rf_isReal(r_data)) Rf_error("Input data must be a numeric vector."); double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); + bool assume_finite = Rf_asLogical(r_assume_finite) == TRUE; - SlidingMoments sm(k); + SlidingMean sm(k); SEXP r_result; PROTECT(r_result = Rf_allocVector(REALSXP, n)); double *output_ptr = REAL(r_result); - for (R_xlen_t i = 0; i < n; ++i) { - sm.update(input_ptr[i]); - output_ptr[i] = sm.get_mean(); - } + if (assume_finite) + sm.fast_mean_batch_finite(input_ptr, static_cast(n), + output_ptr, mp); + else + sm.fast_mean_batch(input_ptr, static_cast(n), output_ptr, mp); UNPROTECT(1); return r_result; } -SEXP rolling_skewness_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_skewness_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) Rf_error("Input data must be a numeric vector."); double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); SlidingMoments sm(k); @@ -145,20 +163,23 @@ SEXP rolling_skewness_c(SEXP r_data, SEXP r_window_size) { for (R_xlen_t i = 0; i < n; ++i) { sm.update(input_ptr[i]); - output_ptr[i] = sm.get_skewness(); + output_ptr[i] = sm.current_size() < mp + ? std::numeric_limits::quiet_NaN() + : sm.get_skewness(); } UNPROTECT(1); return r_result; } -SEXP rolling_kurtosis_c(SEXP r_data, SEXP r_window_size) { +SEXP rolling_kurtosis_c(SEXP r_data, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_data)) Rf_error("Input data must be a numeric vector."); double *input_ptr = REAL(r_data); R_xlen_t n = XLENGTH(r_data); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); SlidingMoments sm(k); @@ -168,14 +189,16 @@ SEXP rolling_kurtosis_c(SEXP r_data, SEXP r_window_size) { for (R_xlen_t i = 0; i < n; ++i) { sm.update(input_ptr[i]); - output_ptr[i] = sm.get_kurtosis(); + output_ptr[i] = sm.current_size() < mp + ? std::numeric_limits::quiet_NaN() + : sm.get_kurtosis(); } UNPROTECT(1); return r_result; } -SEXP rolling_cov_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { +SEXP rolling_cov_c(SEXP r_x, SEXP r_y, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_x) || !Rf_isReal(r_y)) Rf_error("Input data must be numeric vectors."); R_xlen_t n = XLENGTH(r_x); @@ -185,6 +208,7 @@ SEXP rolling_cov_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { double *x_ptr = REAL(r_x); double *y_ptr = REAL(r_y); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); SlidingCovariance sc(k); @@ -194,14 +218,16 @@ SEXP rolling_cov_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { for (R_xlen_t i = 0; i < n; ++i) { sc.update(x_ptr[i], y_ptr[i]); - output_ptr[i] = sc.get_covariance(); + output_ptr[i] = sc.current_size() < mp + ? std::numeric_limits::quiet_NaN() + : sc.get_covariance(); } UNPROTECT(1); return r_result; } -SEXP rolling_cor_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { +SEXP rolling_cor_c(SEXP r_x, SEXP r_y, SEXP r_window_size, SEXP r_min_periods) { if (!Rf_isReal(r_x) || !Rf_isReal(r_y)) Rf_error("Input data must be numeric vectors."); R_xlen_t n = XLENGTH(r_x); @@ -211,6 +237,7 @@ SEXP rolling_cor_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { double *x_ptr = REAL(r_x); double *y_ptr = REAL(r_y); std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); SlidingCovariance sc(k); @@ -220,23 +247,76 @@ SEXP rolling_cor_c(SEXP r_x, SEXP r_y, SEXP r_window_size) { for (R_xlen_t i = 0; i < n; ++i) { sc.update(x_ptr[i], y_ptr[i]); - output_ptr[i] = sc.get_correlation(); + output_ptr[i] = sc.current_size() < mp + ? std::numeric_limits::quiet_NaN() + : sc.get_correlation(); } UNPROTECT(1); return r_result; } +SEXP rolling_variance_fast_c(SEXP r_data, SEXP r_window_size, + SEXP r_min_periods) { + if (!Rf_isReal(r_data)) + Rf_error("Input data must be a numeric vector."); + double *input_ptr = REAL(r_data); + R_xlen_t n = XLENGTH(r_data); + std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); + SlidingMomentsPrefix smp(k); + SEXP r_result; + PROTECT(r_result = Rf_allocVector(REALSXP, n)); + smp.variance_batch(input_ptr, static_cast(n), REAL(r_result), mp); + UNPROTECT(1); + return r_result; +} + +SEXP rolling_skewness_fast_c(SEXP r_data, SEXP r_window_size, + SEXP r_min_periods) { + if (!Rf_isReal(r_data)) + Rf_error("Input data must be a numeric vector."); + double *input_ptr = REAL(r_data); + R_xlen_t n = XLENGTH(r_data); + std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); + SlidingMomentsPrefix smp(k); + SEXP r_result; + PROTECT(r_result = Rf_allocVector(REALSXP, n)); + smp.skewness_batch(input_ptr, static_cast(n), REAL(r_result), mp); + UNPROTECT(1); + return r_result; +} + +SEXP rolling_kurtosis_fast_c(SEXP r_data, SEXP r_window_size, + SEXP r_min_periods) { + if (!Rf_isReal(r_data)) + Rf_error("Input data must be a numeric vector."); + double *input_ptr = REAL(r_data); + R_xlen_t n = XLENGTH(r_data); + std::size_t k = read_window_size(r_window_size); + std::size_t mp = read_min_periods(r_min_periods); + SlidingMomentsPrefix smp(k); + SEXP r_result; + PROTECT(r_result = Rf_allocVector(REALSXP, n)); + smp.kurtosis_batch(input_ptr, static_cast(n), REAL(r_result), mp); + UNPROTECT(1); + return r_result; +} + static const R_CallMethodDef CallEntries[] = { - {"rolling_variance_c", reinterpret_cast(&rolling_variance_c), 2}, - {"rolling_max_c", reinterpret_cast(&rolling_max_c), 2}, - {"rolling_min_c", reinterpret_cast(&rolling_min_c), 2}, - {"rolling_median_c", reinterpret_cast(&rolling_median_c), 2}, - {"rolling_mean_c", reinterpret_cast(&rolling_mean_c), 2}, - {"rolling_skewness_c", reinterpret_cast(&rolling_skewness_c), 2}, - {"rolling_kurtosis_c", reinterpret_cast(&rolling_kurtosis_c), 2}, - {"rolling_cov_c", reinterpret_cast(&rolling_cov_c), 3}, - {"rolling_cor_c", reinterpret_cast(&rolling_cor_c), 3}, + {"rolling_variance_c", reinterpret_cast(&rolling_variance_c), 3}, + {"rolling_max_c", reinterpret_cast(&rolling_max_c), 3}, + {"rolling_min_c", reinterpret_cast(&rolling_min_c), 3}, + {"rolling_median_c", reinterpret_cast(&rolling_median_c), 3}, + {"rolling_mean_c", reinterpret_cast(&rolling_mean_c), 4}, + {"rolling_skewness_c", reinterpret_cast(&rolling_skewness_c), 3}, + {"rolling_kurtosis_c", reinterpret_cast(&rolling_kurtosis_c), 3}, + {"rolling_variance_fast_c", reinterpret_cast(&rolling_variance_fast_c), 3}, + {"rolling_skewness_fast_c", reinterpret_cast(&rolling_skewness_fast_c), 3}, + {"rolling_kurtosis_fast_c", reinterpret_cast(&rolling_kurtosis_fast_c), 3}, + {"rolling_cov_c", reinterpret_cast(&rolling_cov_c), 4}, + {"rolling_cor_c", reinterpret_cast(&rolling_cor_c), 4}, {nullptr, nullptr, 0}}; } // namespace diff --git a/src_core/MonotonicMax.cpp b/src_core/MonotonicMax.cpp index d8b8e9b..c4f1bbf 100644 --- a/src_core/MonotonicMax.cpp +++ b/src_core/MonotonicMax.cpp @@ -15,7 +15,7 @@ double MonotonicMax::get_value_impl() const { } std::size_t MonotonicMax::current_size_impl() const { - return std::min(current_tick_, window_size_); + return non_nan_ticks_.size(); } double MonotonicMax::get_max() const { @@ -27,6 +27,10 @@ void MonotonicMax::skip_impl() { current_tick_ - deque_.front().tick_index >= window_size_) { deque_.pop_front(); } + while (!non_nan_ticks_.empty() && + current_tick_ - non_nan_ticks_.front() >= window_size_) { + non_nan_ticks_.pop_front(); + } current_tick_++; } @@ -40,5 +44,10 @@ void MonotonicMax::update_impl(double value) { current_tick_ - deque_.front().tick_index >= window_size_) { deque_.pop_front(); } + while (!non_nan_ticks_.empty() && + current_tick_ - non_nan_ticks_.front() >= window_size_) { + non_nan_ticks_.pop_front(); + } + non_nan_ticks_.push_back(current_tick_); current_tick_++; } diff --git a/src_core/MonotonicMin.cpp b/src_core/MonotonicMin.cpp index 6934f41..becbe95 100644 --- a/src_core/MonotonicMin.cpp +++ b/src_core/MonotonicMin.cpp @@ -15,7 +15,7 @@ double MonotonicMin::get_value_impl() const { } std::size_t MonotonicMin::current_size_impl() const { - return std::min(current_tick_, window_size_); + return non_nan_ticks_.size(); } double MonotonicMin::get_min() const { @@ -27,6 +27,10 @@ void MonotonicMin::skip_impl() { current_tick_ - deque_.front().tick_index >= window_size_) { deque_.pop_front(); } + while (!non_nan_ticks_.empty() && + current_tick_ - non_nan_ticks_.front() >= window_size_) { + non_nan_ticks_.pop_front(); + } current_tick_++; } @@ -40,5 +44,10 @@ void MonotonicMin::update_impl(double value) { current_tick_ - deque_.front().tick_index >= window_size_) { deque_.pop_front(); } + while (!non_nan_ticks_.empty() && + current_tick_ - non_nan_ticks_.front() >= window_size_) { + non_nan_ticks_.pop_front(); + } + non_nan_ticks_.push_back(current_tick_); current_tick_++; } diff --git a/src_core/SlidingCovariance.cpp b/src_core/SlidingCovariance.cpp index 00d2cc0..ddce404 100644 --- a/src_core/SlidingCovariance.cpp +++ b/src_core/SlidingCovariance.cpp @@ -59,6 +59,8 @@ static void cov_remove(double x_out, double y_out, std::size_t &count_, count_--; } +std::size_t SlidingCovariance::current_size() const { return count_; } + double SlidingCovariance::get_mean_x() const { if (count_ == 0) return std::numeric_limits::quiet_NaN(); diff --git a/src_core/SlidingMean.cpp b/src_core/SlidingMean.cpp new file mode 100644 index 0000000..d73c9c4 --- /dev/null +++ b/src_core/SlidingMean.cpp @@ -0,0 +1,148 @@ +#include "SlidingMean.hpp" +#include + +#if defined(__ARM_NEON) +#include +#elif defined(__AVX2__) +#include +#endif + +SlidingMean::SlidingMean(std::size_t window_size) : window_size_(window_size) { + if (window_size_ == 0) + throw std::invalid_argument("Window length must be greater than 0"); + buffer_.resize(window_size_, std::numeric_limits::quiet_NaN()); +} + +void SlidingMean::fast_mean_batch(const double *in, std::size_t n, double *out, + std::size_t min_periods) const { + std::vector prefix_sum(n + 1, 0.0); + std::vector prefix_count(n + 1, 0.0); + + for (std::size_t i = 0; i < n; ++i) { + if (std::isnan(in[i])) { + prefix_sum[i + 1] = prefix_sum[i]; + prefix_count[i + 1] = prefix_count[i]; + } else { + prefix_sum[i + 1] = prefix_sum[i] + in[i]; + prefix_count[i + 1] = prefix_count[i] + 1.0; + } + } + + for (std::size_t i = 0; i < std::min(window_size_ - 1, n); ++i) { + double cnt = prefix_count[i + 1]; + out[i] = cnt >= static_cast(min_periods) + ? prefix_sum[i + 1] / cnt + : std::numeric_limits::quiet_NaN(); + } + +#if defined(__ARM_NEON) + { + float64x2_t mp_vec = vdupq_n_f64(static_cast(min_periods)); + float64x2_t nan_vec = vdupq_n_f64(std::numeric_limits::quiet_NaN()); + std::size_t i = window_size_ - 1; + for (; i + 1 < n; i += 2) { + float64x2_t ws = vsubq_f64(vld1q_f64(&prefix_sum[i + 1]), + vld1q_f64(&prefix_sum[i - window_size_ + 1])); + float64x2_t wc = + vsubq_f64(vld1q_f64(&prefix_count[i + 1]), + vld1q_f64(&prefix_count[i - window_size_ + 1])); + uint64x2_t mask = vcgeq_f64(wc, mp_vec); + vst1q_f64(&out[i], vbslq_f64(mask, vdivq_f64(ws, wc), nan_vec)); + } + for (; i < n; ++i) { + double cnt = prefix_count[i + 1] - prefix_count[i - window_size_ + 1]; + double sm = prefix_sum[i + 1] - prefix_sum[i - window_size_ + 1]; + out[i] = cnt >= static_cast(min_periods) + ? sm / cnt + : std::numeric_limits::quiet_NaN(); + } + } +#else + for (std::size_t i = window_size_ - 1; i < n; ++i) { + double cnt = prefix_count[i + 1] - prefix_count[i - window_size_ + 1]; + double sm = prefix_sum[i + 1] - prefix_sum[i - window_size_ + 1]; + out[i] = cnt >= static_cast(min_periods) + ? sm / cnt + : std::numeric_limits::quiet_NaN(); + } +#endif +} + +void SlidingMean::fast_mean_batch_finite(const double *in, std::size_t n, + double *out, + std::size_t min_periods) const { + std::vector prefix(n + 1); + prefix[0] = 0.0; + for (std::size_t i = 0; i < n; ++i) + prefix[i + 1] = prefix[i] + in[i]; + + for (std::size_t i = 0; i < std::min(window_size_ - 1, n); ++i) { + std::size_t cnt = i + 1; + out[i] = cnt >= min_periods ? prefix[i + 1] / static_cast(cnt) + : std::numeric_limits::quiet_NaN(); + } + + const double inv_k = 1.0 / static_cast(window_size_); +#if defined(__ARM_NEON) + { + float64x2_t inv_k_vec = vdupq_n_f64(inv_k); + std::size_t i = window_size_ - 1; + for (; i + 1 < n; i += 2) { + float64x2_t a = vld1q_f64(&prefix[i + 1]); + float64x2_t b = vld1q_f64(&prefix[i - window_size_ + 1]); + vst1q_f64(&out[i], vmulq_f64(vsubq_f64(a, b), inv_k_vec)); + } + for (; i < n; ++i) + out[i] = (prefix[i + 1] - prefix[i - window_size_ + 1]) * inv_k; + } +#else + for (std::size_t i = window_size_ - 1; i < n; ++i) + out[i] = (prefix[i + 1] - prefix[i - window_size_ + 1]) * inv_k; +#endif +} + +void SlidingMean::update_impl(double value) { + bool ring_full = (n_written_ >= window_size_); + if (ring_full) { + double x_out = buffer_[head_]; + if (!std::isnan(x_out)) { + sum_ -= x_out; + --count_; + } + } + buffer_[head_] = value; + head_ = (head_ + 1) % window_size_; + if (!ring_full) + ++n_written_; + sum_ += value; + ++count_; +} + +void SlidingMean::skip_impl() { + bool ring_full = (n_written_ >= window_size_); + if (ring_full) { + double x_out = buffer_[head_]; + if (!std::isnan(x_out)) { + sum_ -= x_out; + --count_; + } + } + buffer_[head_] = std::numeric_limits::quiet_NaN(); + head_ = (head_ + 1) % window_size_; + if (!ring_full) + ++n_written_; +} + +double SlidingMean::get_value_impl() const { + return get_mean(); +} + +std::size_t SlidingMean::current_size_impl() const { + return count_; +} + +double SlidingMean::get_mean() const { + if (count_ == 0) + return std::numeric_limits::quiet_NaN(); + return sum_ / static_cast(count_); +} diff --git a/src_core/SlidingMoments.cpp b/src_core/SlidingMoments.cpp index 780b39f..fef0e24 100644 --- a/src_core/SlidingMoments.cpp +++ b/src_core/SlidingMoments.cpp @@ -51,6 +51,39 @@ static void moments_remove(double x_out, std::size_t &count_, double &mean_, count_--; } +static void moments_add_3(double x, std::size_t &count_, double &mean_, + double &M2_, double &M3_) { + count_++; + double n = static_cast(count_); + double delta = x - mean_; + double delta_n = delta / n; + double term = delta * delta_n * (n - 1); + M3_ += term * delta_n * (n - 2) - 3 * delta_n * M2_; + M2_ += term; + mean_ += delta_n; +} + +static void moments_remove_3(double x_out, std::size_t &count_, double &mean_, + double &M2_, double &M3_) { + if (count_ == 1) { + mean_ = 0.0; + M2_ = 0.0; + M3_ = 0.0; + count_ = 0; + return; + } + double n = static_cast(count_); + double mean_new = (n * mean_ - x_out) / (n - 1); + double delta = x_out - mean_new; + double delta_n = delta / n; + double term = delta * delta_n * (n - 1); + double M2_old = M2_ - term; + M3_ -= term * delta_n * (n - 2) - 3 * delta_n * M2_old; + M2_ = std::max(0.0, M2_old); + mean_ = mean_new; + count_--; +} + size_t SlidingMoments::current_size() const { return count_; } @@ -114,3 +147,19 @@ void SlidingMoments::update(double value) { if (!std::isnan(value)) moments_add(value, count_, mean_, M2_, M3_, M4_); } + +void SlidingMoments::update_skewness_only(double value) { + bool ring_full = (n_written_ >= window_size_); + if (ring_full) { + double x_out = buffer_[head_]; + if (!std::isnan(x_out)) + moments_remove_3(x_out, count_, mean_, M2_, M3_); + } + buffer_[head_] = value; + if (++head_ == window_size_) + head_ = 0; + if (!ring_full) + n_written_++; + if (!std::isnan(value)) + moments_add_3(value, count_, mean_, M2_, M3_); +} \ No newline at end of file diff --git a/src_core/SlidingMomentsPrefix.cpp b/src_core/SlidingMomentsPrefix.cpp new file mode 100644 index 0000000..7fe2482 --- /dev/null +++ b/src_core/SlidingMomentsPrefix.cpp @@ -0,0 +1,126 @@ +#include "SlidingMomentsPrefix.hpp" +#include + +SlidingMomentsPrefix::SlidingMomentsPrefix(std::size_t window_size) + : window_size_(window_size) { +} + +static constexpr double kNaN = std::numeric_limits::quiet_NaN(); + +void SlidingMomentsPrefix::variance_batch(const double *in, std::size_t n, + double *out, + std::size_t min_periods) const { + std::vector ps(n + 1, 0.0), ps2(n + 1, 0.0), pn(n + 1, 0.0); + for (std::size_t i = 0; i < n; ++i) { + if (std::isnan(in[i])) { + ps[i + 1] = ps[i]; + ps2[i + 1] = ps2[i]; + pn[i + 1] = pn[i]; + } else { + ps[i + 1] = ps[i] + in[i]; + ps2[i + 1] = ps2[i] + in[i] * in[i]; + pn[i + 1] = pn[i] + 1.0; + } + } + + auto compute = [&](std::size_t l, std::size_t r) -> double { + double cnt = pn[r] - pn[l]; + if (cnt < 2.0 || cnt < static_cast(min_periods)) + return kNaN; + double s = ps[r] - ps[l]; + double s2 = ps2[r] - ps2[l]; + return std::max(0.0, (s2 - s * s / cnt) / (cnt - 1.0)); + }; + + for (std::size_t i = 0; i < std::min(window_size_ - 1, n); ++i) + out[i] = compute(0, i + 1); + for (std::size_t i = window_size_ - 1; i < n; ++i) + out[i] = compute(i - window_size_ + 1, i + 1); +} + +void SlidingMomentsPrefix::skewness_batch(const double *in, std::size_t n, + double *out, + std::size_t min_periods) const { + std::vector ps(n + 1, 0), ps2(n + 1, 0), ps3(n + 1, 0), pn(n + 1, 0); + for (std::size_t i = 0; i < n; ++i) { + if (std::isnan(in[i])) { + ps[i + 1] = ps[i]; + ps2[i + 1] = ps2[i]; + ps3[i + 1] = ps3[i]; + pn[i + 1] = pn[i]; + } else { + double x = in[i]; + ps[i + 1] = ps[i] + x; + ps2[i + 1] = ps2[i] + x * x; + ps3[i + 1] = ps3[i] + x * x * x; + pn[i + 1] = pn[i] + 1.0; + } + } + + auto compute = [&](std::size_t l, std::size_t r) -> double { + double cnt = pn[r] - pn[l]; + if (cnt < 3.0 || cnt < static_cast(min_periods)) + return kNaN; + double s1 = ps[r] - ps[l]; + double s2 = ps2[r] - ps2[l]; + double s3 = ps3[r] - ps3[l]; + double mean = s1 / cnt; + double m2 = s2 / cnt - mean * mean; + if (m2 <= 0.0) + return kNaN; + double m3 = s3 / cnt - 3.0 * mean * s2 / cnt + 2.0 * mean * mean * mean; + double g1 = m3 / std::pow(m2, 1.5); + return g1 * std::sqrt(cnt * (cnt - 1.0)) / (cnt - 2.0); + }; + + for (std::size_t i = 0; i < std::min(window_size_ - 1, n); ++i) + out[i] = compute(0, i + 1); + for (std::size_t i = window_size_ - 1; i < n; ++i) + out[i] = compute(i - window_size_ + 1, i + 1); +} + +void SlidingMomentsPrefix::kurtosis_batch(const double *in, std::size_t n, + double *out, + std::size_t min_periods) const { + std::vector ps(n + 1, 0), ps2(n + 1, 0), ps3(n + 1, 0), ps4(n + 1, 0), + pn(n + 1, 0); + for (std::size_t i = 0; i < n; ++i) { + if (std::isnan(in[i])) { + ps[i + 1] = ps[i]; + ps2[i + 1] = ps2[i]; + ps3[i + 1] = ps3[i]; + ps4[i + 1] = ps4[i]; + pn[i + 1] = pn[i]; + } else { + double x = in[i], x2 = x * x; + ps[i + 1] = ps[i] + x; + ps2[i + 1] = ps2[i] + x2; + ps3[i + 1] = ps3[i] + x2 * x; + ps4[i + 1] = ps4[i] + x2 * x2; + pn[i + 1] = pn[i] + 1.0; + } + } + + auto compute = [&](std::size_t l, std::size_t r) -> double { + double cnt = pn[r] - pn[l]; + if (cnt < 4.0 || cnt < static_cast(min_periods)) + return kNaN; + double s1 = ps[r] - ps[l]; + double s2 = ps2[r] - ps2[l]; + double s3 = ps3[r] - ps3[l]; + double s4 = ps4[r] - ps4[l]; + double mean = s1 / cnt; + double m2 = s2 / cnt - mean * mean; + if (m2 <= 0.0) + return kNaN; + double m4 = s4 / cnt - 4.0 * mean * s3 / cnt + + 6.0 * mean * mean * s2 / cnt - 3.0 * mean * mean * mean * mean; + double g2 = m4 / (m2 * m2) - 3.0; + return (cnt - 1.0) / ((cnt - 2.0) * (cnt - 3.0)) * ((cnt + 1.0) * g2 + 6.0); + }; + + for (std::size_t i = 0; i < std::min(window_size_ - 1, n); ++i) + out[i] = compute(0, i + 1); + for (std::size_t i = window_size_ - 1; i < n; ++i) + out[i] = compute(i - window_size_ + 1, i + 1); +} \ No newline at end of file