diff --git a/r/NAMESPACE b/r/NAMESPACE index bd4985db..b5e1ace5 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -38,6 +38,9 @@ export(collect_features) export(continuous_palette) export(convert_matrix_type) export(convert_to_fragments) +export(cor_dense) +export(cov_dense) +export(crossprod_dense) export(discrete_palette) export(draw_trackplot_grid) export(expm1_slow) @@ -62,6 +65,7 @@ export(match_gene_symbol) export(matrix_stats) export(matrix_type) export(merge_cells) +export(merge_peaks_iterative) export(min_by_col) export(min_by_row) export(min_scalar) @@ -118,6 +122,7 @@ export(shift_fragments) export(storage_order) export(subset_lengths) export(svds) +export(tcrossprod_dense) export(tile_matrix) export(trackplot_bulk) export(trackplot_combine) diff --git a/r/NEWS.md b/r/NEWS.md index 571579dd..00eb6507 100644 --- a/r/NEWS.md +++ b/r/NEWS.md @@ -3,6 +3,7 @@ ## Features - Add `write_matrix_anndata_hdf5_dense()` which allows writing matrices in AnnData's dense format, most commonly used for `obsm` or `varm` matrices. (Thanks to @ycli1995 for pull request #166) - Add `get_demo_mat()`, `get_demo_frags()` and `remove_demo_data()` to retrieve a small test matrix/fragments object from the [PBMC 3k dataset from 10X Genomics](https://www.10xgenomics.com/datasets/pbmc-from-a-healthy-donor-granulocytes-removed-through-cell-sorting-3-k-1-standard-2-0-0). (pull request #193) +- Add `cor_dense()`, `cov_dense()`, `crossprod_dense()`, `tcrossprod_dense()` which can accelerate all-by-all feature correlation and related statistics. Recommended for up to 10k features. (pull request #243) ## Improvements - Speed up taking large subsets of large concatenated matrices, e.g. selecting 9M cells from a 10M cell matrix composed of ~100 concatenated pieces. (pull request #179) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 1d2ed577..681409e7 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -761,6 +761,10 @@ matrix_max_per_col_cpp <- function(matrix) { .Call(`_BPCells_matrix_max_per_col_cpp`, matrix) } +dense_transpose_multiply_cpp <- function(matrix, buffer_bytes, threads) { + .Call(`_BPCells_dense_transpose_multiply_cpp`, matrix, buffer_bytes, threads) +} + pseudobulk_matrix_cpp <- function(mat, cell_groups, method, transpose) { .Call(`_BPCells_pseudobulk_matrix_cpp`, mat, cell_groups, method, transpose) } diff --git a/r/R/matrix.R b/r/R/matrix.R index de5a9848..7a1b7465 100644 --- a/r/R/matrix.R +++ b/r/R/matrix.R @@ -717,6 +717,165 @@ rlang::on_load({ } }) +#' Matrix correlation, covariance, or cross-product with dense output +#' @rdname crossprod_dense +#' +#' @description +#' These functions provide optimized versions of `tcrossprod()`, `crossprod()`, `cov()`, and `cor()` that take +#' BPCells matrices as inputs and produce dense matrix outputs. These functions will run more quickly than +#' combining multiple BPCells operations to produce equivalent output. +#' They are mainly recommended for sparse matrices with ~10k features or fewer, +#' since compute and memory costs will rise with the square of the number of features (see details). +#' +#' Note that `cor_dense()` and `cov_dense()` calculate correlation/covariance of *columns* to match `cov()` and `cor()`. +#' To calculate feature correlations on a standard `feature x cell` matrix, it will be necessary to pass `t(mat)` as the +#' first argument. +#' +#' @param x (IterableMatrix) Input matrix. In general, disk-backed matrices should have cell-major storage ordering. 
(See details, +#' or `transpose_storage_order()`) +#' @param buffer_bytes (integer) In-memory chunk size to use during computations. Performance is best when this is slightly below +#' the largest CPU cache size on the computer running BPCells. +#' @param threads (integer) Number of threads to use during execution. Thread counts higher than 4-8 may see diminishing speedups +#' @param method (character) Must be `"pearson"`. Provided for compatibility with `cov()` and `cor()` but otherwise ignored. +#' @param ... Reserved for potential future argument additions +#' +#' @return Symmetric, square dense matrix. Results should match the base R `crossprod()`, `tcrossprod()`, `cov()`, and `cor()`. +#' +#' @details +#' +#' **Input storage order** +#' +#' The input matrix must be row-major for `crossprod_dense()`, `cor_dense()`, and `cov_dense()`. +#' For `tcrossprod_dense()`, the input must be col-major. Stated another way: when these functions are used to calculate +#' feature correlations, the cell axis should always be the major axis. The functions will raise an error if +#' the input requires running `transpose_storage_order()` to switch to the required storage order. +#' +#' **Efficiency considerations** +#' +#' These functions require only 1 or 2 matrix passes, so they will run much faster than using `%*%` to perform +#' disk-backed sparse-sparse matrix multiplies. +#' +#' The memory required for the output matrix scales with the square of the feature count and the computational +#' work scales with the square of the number of non-zero entries per feature. So in typical cases running with +#' 20k features will take 100x the time and memory compared to 2k features. Correlation of 2k features from +#' 1.3M cells should take under 1 minute single-threaded, and under 15 seconds with 8 threads. 
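As a quick illustration of the claims above, here is a sketch (assuming `mat` is a small feature x cell `IterableMatrix` with col-major storage, so both code paths can be compared directly; the thread count is arbitrary):

```r
# Slow path: disk-backed sparse-sparse multiply via %*%, then densify
gram_slow <- as.matrix(mat %*% t(mat))

# Fast path added in this PR: streaming passes with a dense accumulator
gram_fast <- tcrossprod_dense(mat, threads = 4)

all.equal(gram_slow, gram_fast)

# Rough output-size check for the quadratic memory cost described above:
# a dense double matrix needs 8 * n_features^2 bytes (~3 GB at 20k features).
```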
+#' +#' @examples +#' set.seed(12345) +#' base_r_mat <- matrix(rpois(3*5, lambda=1), nrow=3, ncol=5) +#' rownames(base_r_mat) <- paste0("row", 1:3) +#' base_r_mat +#' +#' mat <- as(base_r_mat, "IterableMatrix") +#' +#' tcrossprod_dense(mat) +#' +#' all.equal(tcrossprod_dense(mat), tcrossprod(base_r_mat)) +#' all.equal(crossprod_dense(t(mat)), crossprod(t(base_r_mat))) +#' all.equal(cov_dense(t(mat)), cov(t(base_r_mat))) +#' all.equal(cor_dense(t(mat)), cor(t(base_r_mat))) +#' @export +tcrossprod_dense <- function(x, ..., buffer_bytes=5e6, threads=1L) { + assert_is(x, "IterableMatrix") + if (x@transpose) rlang::abort(c("Input matrix must have col-major storage order.", "Please use transpose_storage_order().")) + assert_is_wholenumber(buffer_bytes) + assert_is_wholenumber(threads) + assert_true(buffer_bytes >= 48*nrow(x)) + rlang::check_dots_empty() + + # Specializations to speed up linear algebra offset transformations + # Notice that if X = A + R*t(C): + # X*t(X) = A*t(A) + A*C*t(R) + t(A*C*t(R)) + R*t(C)*C*t(R) + # Requires 1 extra matrix pass, and 2-4x the output memory depending on how aggressive garbage collection is + if (is(x, "TransformLinearResidual") || (is(x, "TransformScaleShift") && any(x@active_transforms[,"shift"]))) { + if (is(x, "TransformScaleShift") && any(x@active_transforms[,"shift"])) { + col_shift <- rep.int(0, ncol(x)) + row_shift <- rep.int(0, nrow(x)) + if (x@active_transforms["row", "shift"]) row_shift <- x@row_params[2,] + if (x@active_transforms["col", "shift"]) col_shift <- x@col_params[2,] + if (x@active_transforms["global", "shift"]) row_shift <- row_shift + x@global_params[2] + x@active_transforms[,"shift"] <- FALSE + + R <- cbind(1, row_shift) + C <- cbind(col_shift, 1) + } + if (is(x, "TransformLinearResidual")) { + R <- -t(x@row_params) + C <- t(x@col_params) + x <- x@matrix + } + res <- tcrossprod_dense(x, buffer_bytes=buffer_bytes, threads=threads) + if (threads > 1) x <- parallel_split(x, threads=threads, chunks=4*threads) + # Run frequent garbage collection on fast mode to hopefully avoid big matrix copies + # without slowing down execution too much + offset <- tcrossprod(x %*% C, R) + res <- res + offset + gc(verbose=FALSE, full=FALSE) + offset <- t(offset) + gc(verbose=FALSE, full=FALSE) + res <- res + offset + gc(verbose=FALSE, full=FALSE) + res <- res + R %*% crossprod(C) %*% t(R) + gc(verbose=FALSE, full=FALSE) + return(res) + } + + buffer_bytes <- as.integer(buffer_bytes) + threads <- as.integer(threads) + + res <- dense_transpose_multiply_cpp(iterate_matrix(x), buffer_bytes=buffer_bytes, threads=threads) + rownames(res) <- rownames(x) + colnames(res) <- rownames(x) + + return(res) +} + +#' @rdname crossprod_dense +#' @export +crossprod_dense <- function(x, ..., buffer_bytes=5e6, threads=1L) { + assert_is(x, "IterableMatrix") + if (!x@transpose) rlang::abort(c("Input matrix must have row-major storage order.", "Please use transpose_storage_order().")) + assert_is_wholenumber(buffer_bytes) + assert_is_wholenumber(threads) + assert_true(buffer_bytes >= 48*ncol(x)) + rlang::check_dots_empty() + + return(tcrossprod_dense(t(x), buffer_bytes=buffer_bytes, threads=threads)) +} + +#' @rdname crossprod_dense +#' @export +cov_dense <- function(x, ..., method="pearson", buffer_bytes=5e6, threads=1L) { + assert_is(x, "IterableMatrix") + if (!x@transpose) rlang::abort(c("Input matrix must have row-major storage order.", "Please use transpose_storage_order().")) + assert_is_wholenumber(buffer_bytes) + assert_is_wholenumber(threads) + assert_true(buffer_bytes 
>= 48*ncol(x)) + assert_true(method == "pearson") + + col_means <- matrix_stats(x, threads=threads, col_stats="mean")$col_stats["mean",] + res <- crossprod_dense(add_cols(x, -col_means), buffer_bytes=buffer_bytes, threads=threads) + res <- res / (nrow(x) - 1) + return(res) +} + +#' @rdname crossprod_dense +#' @export +cor_dense <- function(x, ..., method="pearson", buffer_bytes=5e6, threads=1L) { + assert_is(x, "IterableMatrix") + if (!x@transpose) rlang::abort(c("Input matrix must have row-major storage order.", "Please use transpose_storage_order().")) + assert_is_wholenumber(buffer_bytes) + assert_is_wholenumber(threads) + assert_true(buffer_bytes >= 48*ncol(x)) + assert_true(method == "pearson") + + res <- cov_dense(x, method=method, buffer_bytes=buffer_bytes, threads=threads) + std_dev_inv <- sqrt(1/diag(res)) + res <- t(res * std_dev_inv) * std_dev_inv + return(res) +} + + # Index subsetting setClass("MatrixSubset", contains = "IterableMatrix", diff --git a/r/man/crossprod_dense.Rd b/r/man/crossprod_dense.Rd new file mode 100644 index 00000000..f312051c --- /dev/null +++ b/r/man/crossprod_dense.Rd @@ -0,0 +1,77 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/matrix.R +\name{tcrossprod_dense} +\alias{tcrossprod_dense} +\alias{crossprod_dense} +\alias{cov_dense} +\alias{cor_dense} +\title{Matrix correlation, covariance, or cross-product with dense output} +\usage{ +tcrossprod_dense(x, ..., buffer_bytes = 5e+06, threads = 1L) + +crossprod_dense(x, ..., buffer_bytes = 5e+06, threads = 1L) + +cov_dense(x, ..., method = "pearson", buffer_bytes = 5e+06, threads = 1L) + +cor_dense(x, ..., method = "pearson", buffer_bytes = 5e+06, threads = 1L) +} +\arguments{ +\item{x}{(IterableMatrix) Input matrix. In general, disk-backed matrices should have cell-major storage ordering. (See details, +or \code{transpose_storage_order()})} + +\item{...}{Reserved for potential future argument additions} + +\item{buffer_bytes}{(integer) In-memory chunk size to use during computations. Performance is best when this is slightly below +the largest CPU cache size on the computer running BPCells.} + +\item{threads}{(integer) Number of threads to use during execution. Thread counts higher than 4-8 may see diminishing speedups} + +\item{method}{(character) Must be \code{"pearson"}. Provided for compatibility with \code{cov()} and \code{cor()} but otherwise ignored.} +} +\value{ +Symmetric, square dense matrix. Results should match the base R \code{crossprod()}, \code{tcrossprod()}, \code{cov()}, and \code{cor()}. +} +\description{ +These functions provide optimized versions of \code{tcrossprod()}, \code{crossprod()}, \code{cov()}, and \code{cor()} that take +BPCells matrices as inputs and produce dense matrix outputs. These functions will run more quickly than +combining multiple BPCells operations to produce equivalent output. +They are mainly recommended for sparse matrices with ~10k features or fewer, +since compute and memory costs will rise with the square of the number of features (see details). + +Note that \code{cor_dense()} and \code{cov_dense()} calculate correlation/covariance of \emph{columns} to match \code{cov()} and \code{cor()}. +To calculate feature correlations on a standard \verb{feature x cell} matrix, it will be necessary to pass \code{t(mat)} as the +first argument. +} +\details{ +\strong{Input storage order} + +The input matrix must be row-major for \code{crossprod_dense()}, \code{cor_dense()}, and \code{cov_dense()}. 
+For \code{tcrossprod_dense()}, the input must be col-major. Stated another way: when these functions are used to calculate +feature correlations, the cell axis should always be the major axis. The functions will raise an error if +the input requires running \code{transpose_storage_order()} to switch to the required storage order. + +\strong{Efficiency considerations} + +These functions require only 1 or 2 matrix passes, so they will run much faster than using \code{\%*\%} to perform +disk-backed sparse-sparse matrix multiplies. + +The memory required for the output matrix scales with the square of the feature count and the computational +work scales with the square of the number of non-zero entries per feature. So in typical cases running with +20k features will take 100x the time and memory compared to 2k features. Correlation of 2k features from +1.3M cells should take under 1 minute single-threaded, and under 15 seconds with 8 threads. +} +\examples{ +set.seed(12345) +base_r_mat <- matrix(rpois(3*5, lambda=1), nrow=3, ncol=5) +rownames(base_r_mat) <- paste0("row", 1:3) +base_r_mat + +mat <- as(base_r_mat, "IterableMatrix") + +tcrossprod_dense(mat) + +all.equal(tcrossprod_dense(mat), tcrossprod(base_r_mat)) +all.equal(crossprod_dense(t(mat)), crossprod(t(base_r_mat))) +all.equal(cov_dense(t(mat)), cov(t(base_r_mat))) +all.equal(cor_dense(t(mat)), cor(t(base_r_mat))) +} diff --git a/r/pkgdown/_pkgdown.yml b/r/pkgdown/_pkgdown.yml index 1d4c6026..e9ff8a7a 100644 --- a/r/pkgdown/_pkgdown.yml +++ b/r/pkgdown/_pkgdown.yml @@ -139,6 +139,7 @@ reference: - checksum - apply_by_row - regress_out + - crossprod_dense - IterableMatrix-methods - pseudobulk_matrix diff --git a/r/src/Makevars.in b/r/src/Makevars.in index c816cc37..98d8b97f 100644 --- a/r/src/Makevars.in +++ b/r/src/Makevars.in @@ -53,6 +53,7 @@ bpcells-cpp/matrixTransforms/Round.o \ bpcells-cpp/matrixTransforms/SCTransform.o \ bpcells-cpp/matrixTransforms/Scale.o \ bpcells-cpp/matrixTransforms/Shift.o \ +bpcells-cpp/matrixUtils/DenseTransposeMultiply.o \ bpcells-cpp/matrixUtils/WilcoxonRankSum.o \ bpcells-cpp/matrixUtils/Pseudobulk.o \ bpcells-cpp/matrixUtils/Quantile.o \ diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 697d8b3f..72ad315e 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2510,6 +2510,19 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// dense_transpose_multiply_cpp +Eigen::MatrixXd dense_transpose_multiply_cpp(SEXP matrix, size_t buffer_bytes, size_t threads); +RcppExport SEXP _BPCells_dense_transpose_multiply_cpp(SEXP matrixSEXP, SEXP buffer_bytesSEXP, SEXP threadsSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type matrix(matrixSEXP); + Rcpp::traits::input_parameter< size_t >::type buffer_bytes(buffer_bytesSEXP); + Rcpp::traits::input_parameter< size_t >::type threads(threadsSEXP); + rcpp_result_gen = Rcpp::wrap(dense_transpose_multiply_cpp(matrix, buffer_bytes, threads)); + return rcpp_result_gen; +END_RCPP +} // pseudobulk_matrix_cpp List pseudobulk_matrix_cpp(SEXP mat, std::vector cell_groups, std::vector method, bool transpose); RcppExport SEXP _BPCells_pseudobulk_matrix_cpp(SEXP matSEXP, SEXP cell_groupsSEXP, SEXP methodSEXP, SEXP transposeSEXP) { @@ -2742,6 +2755,7 @@ static const R_CallMethodDef CallEntries[] = { {"_BPCells_apply_matrix_double_cpp", (DL_FUNC) &_BPCells_apply_matrix_double_cpp, 3}, {"_BPCells_matrix_max_per_row_cpp", (DL_FUNC) &_BPCells_matrix_max_per_row_cpp, 1}, 
{"_BPCells_matrix_max_per_col_cpp", (DL_FUNC) &_BPCells_matrix_max_per_col_cpp, 1}, + {"_BPCells_dense_transpose_multiply_cpp", (DL_FUNC) &_BPCells_dense_transpose_multiply_cpp, 3}, {"_BPCells_pseudobulk_matrix_cpp", (DL_FUNC) &_BPCells_pseudobulk_matrix_cpp, 4}, {"_BPCells_matrix_quantile_per_col_cpp", (DL_FUNC) &_BPCells_matrix_quantile_per_col_cpp, 4}, {"_BPCells_matrix_identical_uint32_t_cpp", (DL_FUNC) &_BPCells_matrix_identical_uint32_t_cpp, 2}, diff --git a/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.cpp b/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.cpp new file mode 100644 index 00000000..e455583b --- /dev/null +++ b/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.cpp @@ -0,0 +1,408 @@ +#// Copyright 2025 BPCells contributors +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#include +#include +#include + +#include "DenseTransposeMultiply.h" + +// Overall approach: +// - Load from input to in-memory CSC sparse matrix chunks +// - Multiply each chunk with its transpose and update the dense output +// - Doing an explicit transpose ensures that we have better cache locality +// when updating the output, as we can apply all outputs to a single column at a time +// - Use multi-threading to split up the multiply work, and buffer chunks so that +// data loading can overlap with multiply computations + +// Internal helper classes and functions +namespace { + +/** + * Represent a compressed sparse matrix in memory (CSC or CSR format) + * - val, idx (length >= # non-zeros) + * - ptr (length # cols/rows + 1) + * Unusual features in this code: + * - ptr might temporarily be modified during other algorithms + * - val and idx might have extra items beyond the number of non-zeros, which should be left + * untouched by other algorithms + */ +struct CSparseMat { + std::vector val; + std::vector idx; + std::vector ptr; +}; + +/** + * Initialize the out.ptr vector + * Preconditions: + * - `in` is a valid CSparseMat, though in.idx and in.val might have extra values at the end which + * should be left in place. + * - out.ptr is sized correctly + * Postconditions: + * - out.ptr is a valid pointer index for the transposed compressed storage view + */ +inline void init_transpose_ptr(const CSparseMat &in, CSparseMat &out) { + for (auto &x : out.ptr) { + x = 0; + } + for (size_t i = 0; i < in.ptr.back(); i++) { + out.ptr[in.idx[i] + 1]++; + } + for (size_t i = 1; i < out.ptr.size(); i++) { + out.ptr[i] += out.ptr[i - 1]; + } +} + +/** + * Transpose compressed sparse matrix storage. + * Preconditions: + * - `in` is a valid CSparseMat, though in.idx and in.val might have extra values at the end which + * should be left in place. 
(same for out.idx and out.val having extra usage beyond the number of + * nonzeros) + * - `out.ptr` has been initialized by `init_transpose_ptr` + * Postconditions: + * - `out` holds a transposed copy of `in` + */ +inline void transpose(const CSparseMat &in, CSparseMat &out) { + if (out.idx.size() < in.ptr.back()) out.idx.resize(in.ptr.back()); + if (out.val.size() < in.ptr.back()) out.val.resize(in.ptr.back()); + + for (size_t c = 0; c < in.ptr.size() - 1; c++) { + for (size_t i = in.ptr[c]; i < in.ptr[c + 1]; i++) { + size_t i_out = out.ptr[in.idx[i]]; + out.val[i_out] = in.val[i]; + out.idx[i_out] = c; + out.ptr[in.idx[i]]++; + } + } + + // Shift row_ptr values to give the correct pointers again + std::memmove( + out.ptr.data() + 1, + out.ptr.data(), + (out.ptr.size() - 1) * sizeof(decltype(out.ptr)::value_type) + ); + out.ptr[0] = 0; +} + +/** + * Perform a subset of the sparse multiply operations + * @param trans Row-major sparse storage of the column chunk + * @param m Col-major sparse storage of the column chunk + * @param ret Output dense results matrix + * @param task_id ID of the task (< n_tasks) + * @param n_tasks How many total tasks the multiply is being split into + * + * To try to balance work among tasks, each task calculates output entries of + * every n_tasks columns. + */ +void chunk_multiply_worker( + const CSparseMat &trans, + const CSparseMat &m, + Eigen::MatrixXd &ret, + size_t task_id, + size_t n_tasks +) { + for (size_t r = task_id; r < trans.ptr.size() - 1; r += n_tasks) { + double *out = &ret(0, r); + for (size_t i = trans.ptr[r]; i < trans.ptr[r + 1]; i++) { + uint32_t c = trans.idx[i]; + double vi = trans.val[i]; + for (size_t j = m.ptr[c]; j < m.ptr[c + 1]; j++) { + if (m.idx[j] >= r) { + out[m.idx[j]] += vi * m.val[j]; + } + } + } + } +} + +/** + * Load CSparseMat chunks from a MatrixLoader + */ +class SparseChunkLoader { + private: + std::unique_ptr> loader_; + const size_t buf_items_; + bool finished_matrix_ = false; + bool finished_column_ = true; + + CSparseMat mat_; + + public: + SparseChunkLoader(std::unique_ptr> &&mat, size_t buf_items) + : loader_(std::move(mat)) + , buf_items_(buf_items) { + + mat_.ptr.push_back(0); + mat_.val.reserve(buf_items); + mat_.idx.reserve(buf_items); + } + + // Return a reference to the CSparseMat chunk + CSparseMat &mat(); + + /** Load a new compressed sparse chunk of data + * @return true if new data was loaded, false if the matrix loader is finished + */ + bool next(); + + // Swap the contents of the internal matrix with another CSparseMat, + // preserving any leftovers at the end of the internal matrix and overwriting `other` + void swap_mat(CSparseMat &other); +}; + +// Allow a leader thread to coordinate with worker threads +class WorkerBarrier { + private: + const size_t workers_; + size_t waiting_ = 0; + size_t epoch_ = 0; + bool finished = false; + std::mutex m_; + std::condition_variable worker_cv_, leader_cv_; + + public: + WorkerBarrier(size_t workers) : workers_(workers) {} + + // Call from leader thread. Resumes execution for all worker threads without blocking + void start_workers() { + { + std::unique_lock lk(m_); + leader_cv_.wait(lk, [this] { return waiting_ == workers_; }); + epoch_++; + waiting_ = 0; + } + worker_cv_.notify_all(); + } + + // Call from leader thread. Blocks until all workers have reached a wait point + void wait_for_workers() { + std::unique_lock lk(m_); + leader_cv_.wait(lk, [this] { return waiting_ == workers_; }); + } + + // Call from a worker thread after performing work. 
+    // Will pause execution until leader thread next calls start_workers() or finish().
+    // Returns false once no more work is left to be done (i.e. finish has been called)
+    bool wait_for_leader() {
+        std::unique_lock<std::mutex> lk(m_);
+        size_t run = epoch_;
+        waiting_++;
+        if (waiting_ == workers_) {
+            lk.unlock();
+            leader_cv_.notify_one();
+            lk.lock();
+        }
+        worker_cv_.wait(lk, [this, run] { return epoch_ != run || finished; });
+        return !finished;
+    }
+
+    // Call from leader thread. Signals to worker threads to finish at their next wait point.
+    void finish() {
+        {
+            std::unique_lock<std::mutex> lk(m_);
+            finished = true;
+        }
+        worker_cv_.notify_all();
+    }
+
+    size_t workers() const { return workers_; }
+};
+
+} // namespace
+
+Eigen::MatrixXd dense_transpose_multiply(
+    std::unique_ptr<MatrixLoader<double>> &&mat,
+    size_t buffer_bytes,
+    size_t threads,
+    std::atomic<bool> *user_interrupt
+) {
+    if (buffer_bytes < 48 * mat->rows()) {
+        throw std::runtime_error(
+            "dense_transpose_multiply: buffer_bytes must be at least 48 * mat.rows()"
+        );
+    }
+
+    threads = std::max(threads, (size_t) 1);
+
+    std::vector<std::thread> thread_vec;
+    thread_vec.reserve(threads - 1);
+
+    Eigen::MatrixXd ret(mat->rows(), mat->rows());
+    ret.setZero();
+
+    size_t buf_items = buffer_bytes / (12 * 4);
+    if (threads == 1) buf_items *= 2;
+
+    CSparseMat trans, trans_next, m;
+    trans.ptr.resize(mat->rows() + 1);
+    trans_next.ptr.resize(mat->rows() + 1);
+
+    SparseChunkLoader l(std::move(mat), buf_items);
+    CSparseMat &m_next = l.mat();
+
+    // Handle single-threaded case standalone
+    if (threads <= 1) {
+        while (l.next()) {
+            init_transpose_ptr(m_next, trans_next);
+            transpose(m_next, trans_next);
+            transpose(trans_next, m_next);
+            chunk_multiply_worker(trans_next, m_next, ret, 0, 1);
+            if (user_interrupt != NULL && *user_interrupt) break;
+        }
+
+        ret = ret.selfadjointView<Eigen::Lower>();
+        return ret;
+    }
+
+    // Set up multiplication worker threads:
+    // pull from a shared "queue" with more tasks than threads in case of uneven task lengths
+    WorkerBarrier b(threads - 1);
+    const size_t tasks = std::min(threads * 10, trans.ptr.size() - 1);
+    std::atomic<size_t> task_id(0);
+    auto run_worker = [&]() {
+        while (b.wait_for_leader()) {
+            while (true) {
+                size_t id = task_id.fetch_add(1);
+                if (id >= tasks) break;
+                chunk_multiply_worker(trans, m, ret, id, tasks);
+            }
+        }
+    };
+    for (size_t i = 0; i < threads - 1; i++) {
+        thread_vec.emplace_back(run_worker);
+    }
+
+    // Load first batch of data
+    bool finished = !l.next();
+    init_transpose_ptr(m_next, trans_next);
+    transpose(m_next, trans_next);
+    // Second transpose ensures our column-major chunk has nonzeros ordered by row, which is common
+    // but not guaranteed straight from the MatrixLoader interface.
+ transpose(trans_next, m_next); + + while (!finished) { + l.swap_mat(m); + std::swap(trans_next, trans); + + // Run workers while loading next data + task_id = 0; + b.start_workers(); + + finished = !l.next(); + init_transpose_ptr(m_next, trans_next); + transpose(m_next, trans_next); + transpose(trans_next, m_next); // See above comment for why second transpose is needed + + // If the main thread has finished a load before the worker threads + // are done, then do some chunk_multiply work tasks + while (true) { + size_t id = task_id.fetch_add(1); + if (id >= tasks) break; + chunk_multiply_worker(trans, m, ret, id, tasks); + } + + if (user_interrupt != NULL && *user_interrupt) break; + b.wait_for_workers(); + } + b.wait_for_workers(); + l.swap_mat(m); + std::swap(trans_next, trans); + b.start_workers(); + b.wait_for_workers(); + b.finish(); + + for (auto &t : thread_vec) { + t.join(); + } + + return ret.selfadjointView(); +} + +namespace { + +CSparseMat &SparseChunkLoader::mat() { return mat_; } + +bool SparseChunkLoader::next() { + if (finished_matrix_) return false; + + // Move any partial column data to the front of the buffers + size_t leftover = mat_.val.size() - mat_.ptr.back(); + std::memmove( + mat_.val.data(), + mat_.val.data() + mat_.ptr.back(), + leftover * sizeof(decltype(mat_.val)::value_type) + ); + mat_.val.resize(leftover); + std::memmove( + mat_.idx.data(), + mat_.idx.data() + mat_.ptr.back(), + leftover * sizeof(decltype(mat_.idx)::value_type) + ); + mat_.idx.resize(leftover); + + mat_.ptr.clear(); + mat_.ptr.push_back(0); + + uint32_t first_col = 0; + while (true) { + if (finished_column_ && !loader_->nextCol()) { + finished_matrix_ = true; + return true; + } + + while (!finished_column_ || loader_->load()) { + finished_column_ = true; + uint32_t cap = loader_->capacity(); + if (cap + mat_.val.size() > buf_items_) { + finished_column_ = false; + return true; + } + mat_.val.insert(mat_.val.end(), loader_->valData(), loader_->valData() + cap); + mat_.idx.insert(mat_.idx.end(), loader_->rowData(), loader_->rowData() + cap); + } + finished_column_ = true; + if (mat_.ptr.size() == 1) first_col = loader_->currentCol(); + // In case loader_->nextCol() ever skipped a column (I don't think the current interfaces do + // that, but being safe) + if (loader_->currentCol() + 1 > mat_.ptr.size() + first_col) { + mat_.ptr.insert( + mat_.ptr.end(), (loader_->currentCol() - first_col) - (mat_.ptr.size() - 1), 0 + ); + } + mat_.ptr.push_back(mat_.val.size()); + } +} + +void SparseChunkLoader::swap_mat(CSparseMat &other) { + // Transfer any partial column data from mat_ to other as needed. 
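+    // (Entries past ptr.back() belong to a column that was only partially loaded when the
+    // buffer filled. They are copied to the front of `other` here, and after the std::swap
+    // below they end up back in mat_, so the next call to next() can continue that column
+    // while the caller receives only the completed columns.)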
+ size_t leftover = mat_.val.size() - mat_.ptr.back(); + other.val.resize(leftover); + other.idx.resize(leftover); + other.ptr.resize(1); + other.ptr[0] = 0; + std::memmove( + other.val.data(), + mat_.val.data() + mat_.ptr.back(), + leftover * sizeof(decltype(mat_.val)::value_type) + ); + std::memmove( + other.idx.data(), + mat_.idx.data() + mat_.ptr.back(), + leftover * sizeof(decltype(mat_.idx)::value_type) + ); + mat_.val.resize(mat_.ptr.back()); + mat_.idx.resize(mat_.ptr.back()); + + // Perform the swap + std::swap(mat_, other); +} + +} // namespace diff --git a/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.h b/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.h new file mode 100644 index 00000000..fd07d4a4 --- /dev/null +++ b/r/src/bpcells-cpp/matrixUtils/DenseTransposeMultiply.h @@ -0,0 +1,41 @@ +// Copyright 2025 BPCells contributors +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#pragma once + +#include +#include + +#ifndef RCPP_EIGEN +#include +#else +#define RCPP_NO_RTTI +#define RCPP_NO_SUGAR +#include +#endif + +#include "../matrixIterators/MatrixIterator.h" + + +/** + * Calculate mat * t(mat), returning a dense matrix output + * @param mat Unique pointer to the matrix + * @param buffer_bytes Number of bytes to use for buffering matrix columns. Must be at least + * mat.rows()*48. Recommended to be just below the size of the largest CPU cache (several MB) + * @param threads Number of threads to use + * @param user_interrupt Signal for user-requested early termination of computations + * + * This uses a one-pass algorithm to load contiguous column chunks of mat, perform sparse-sparse + * matrix multiplies in memory and update a dense output matrix. 
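+ * Only the lower triangle of the output (row index >= column index) is accumulated during
+ * the pass; the result is mirrored into the full symmetric matrix before returning.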
+ */
+Eigen::MatrixXd dense_transpose_multiply(
+    std::unique_ptr<MatrixLoader<double>> &&mat,
+    size_t buffer_bytes,
+    size_t threads,
+    std::atomic<bool> *user_interrupt
+);
\ No newline at end of file
diff --git a/r/src/matrix_utils.cpp b/r/src/matrix_utils.cpp
index 223feec0..84187568 100644
--- a/r/src/matrix_utils.cpp
+++ b/r/src/matrix_utils.cpp
@@ -1,5 +1,5 @@
 // Copyright 2021 BPCells contributors
-// 
+//
 // Licensed under the Apache License, Version 2.0 or the MIT license
 // , at your
 // option. This file may not be copied, modified, or distributed
@@ -22,9 +22,10 @@
 #include "bpcells-cpp/matrixIterators/RenameDims.h"
 #include "bpcells-cpp/matrixIterators/SVD.h"
 #include "bpcells-cpp/matrixIterators/TSparseMatrixWriter.h"
-#include "bpcells-cpp/matrixUtils/WilcoxonRankSum.h"
+#include "bpcells-cpp/matrixUtils/DenseTransposeMultiply.h"
 #include "bpcells-cpp/matrixUtils/Pseudobulk.h"
 #include "bpcells-cpp/matrixUtils/Quantile.h"
+#include "bpcells-cpp/matrixUtils/WilcoxonRankSum.h"
 #include "bpcells-cpp/matrixIterators/MatrixAccumulators.h"
 #include "R_array_io.h"
 #include "R_interrupts.h"
@@ -670,6 +671,16 @@ NumericVector matrix_max_per_col_cpp(SEXP matrix) {
     return Rcpp::wrap(result);
 }
 
+// Compute x * t(x) as a dense output
+// [[Rcpp::export]]
+Eigen::MatrixXd dense_transpose_multiply_cpp(SEXP matrix, size_t buffer_bytes, size_t threads) {
+    return run_with_R_interrupt_check(
+        &dense_transpose_multiply,
+        take_unique_xptr<MatrixLoader<double>>(matrix),
+        buffer_bytes,
+        threads
+    );
+}
 
 // [[Rcpp::export]]
 List pseudobulk_matrix_cpp(SEXP mat,
@@ -688,7 +699,6 @@ List pseudobulk_matrix_cpp(SEXP mat,
             methodFlags = methodFlags | PseudobulkStatsMethod::NonZeros | PseudobulkStatsMethod::Mean | PseudobulkStatsMethod::Variance;
         }
     }
-
     PseudobulkStats res = run_with_R_interrupt_check(
         &pseudobulk_matrix,
         take_unique_xptr<MatrixLoader<double>>(mat),
diff --git a/r/tests/testthat/test-matrix_stats.R b/r/tests/testthat/test-matrix_stats.R
index 5d74f16d..8c9c4c4e 100644
--- a/r/tests/testthat/test-matrix_stats.R
+++ b/r/tests/testthat/test-matrix_stats.R
@@ -253,4 +253,39 @@
   m_quantiles <- m0 %>% matrixStats::rowQuantiles(probs = c(0.25, 0.75), type = 7)
   m_quantiles_bpcells <- rowQuantiles(m1, probs = c(0.25, 0.75), type = 7)
   expect_equal(m_quantiles, m_quantiles_bpcells)
+})
+
+test_that("crossprod_dense family works", {
+  m <- generate_sparse_matrix(20, 10)
+  rownames(m) <- paste0("row", seq_len(nrow(m)))
+  colnames(m) <- paste0("col", seq_len(ncol(m)))
+  m_bp <- as(m, "IterableMatrix")
+  m <- as.matrix(m)
+
+  row_offset <- runif(nrow(m))
+  col_offset <- runif(ncol(m))
+
+  latent_data <- tibble::tibble(x=runif(ncol(m)), y=rep.int(1, ncol(m)))
+  m_regress <- regress_out(m_bp, latent_data)
+
+  matrix_cases <- tibble::tribble(
+    ~base, ~bpcells,
+    m, m_bp,
+    m+1, m_bp+1,
+    add_cols(m, col_offset), add_cols(m_bp, col_offset),
+    add_rows(m, row_offset), add_rows(m_bp, row_offset),
+    as.matrix(m_regress), m_regress
+  )
+  for (i in seq_len(nrow(matrix_cases))) {
+    for (threads in 1:2) {
+      m <- matrix_cases$base[[i]]
+      m_bp <- matrix_cases$bpcells[[i]]
+      expect_equal(tcrossprod(m), tcrossprod_dense(m_bp, threads=threads))
+      expect_equal(crossprod(t(m)), crossprod_dense(t(m_bp), threads=threads))
+      expect_equal(cor(t(m)), cor_dense(t(m_bp), threads=threads))
+      expect_equal(cov(t(m)), cov_dense(t(m_bp), threads=threads))
+    }
+  }
+  expect_error(crossprod_dense(m_bp))
+  expect_error(tcrossprod_dense(t(m_bp)))
 })
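A usage sketch of the new exports on a disk-backed dataset (the path, feature cutoff, and thread count below are hypothetical; `open_matrix_dir()` and `matrix_stats()` are existing BPCells functions):

```r
library(BPCells)

# Feature x cell counts written with cells as the major storage axis (hypothetical path)
mat <- open_matrix_dir("pbmc_counts")

# Keep the dense output manageable: correlate only the 2,000 most-expressed features
feature_means <- matrix_stats(mat, row_stats = "mean")$row_stats["mean", ]
mat_top <- mat[order(feature_means, decreasing = TRUE)[1:2000], ]

# cor_dense() correlates *columns*, so pass the transposed (cell x feature) view
cor_mat <- cor_dense(t(mat_top), threads = 4)
dim(cor_mat)  # 2000 x 2000 dense, symmetric matrix
```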