Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/include/cuvs/detail/jit_lto/common_fragments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct tag_i8 {};
struct tag_u8 {};
struct tag_filter_none {};
struct tag_filter_bitset {};
struct tag_filter_udf {};

struct tag_bitset_u32 {};

Expand Down
5 changes: 4 additions & 1 deletion cpp/include/cuvs/neighbors/cagra.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ struct search_params : cuvs::neighbors::search_params {
/**
* A parameter indicating the rate of nodes to be filtered-out, when filtering is used.
* The value must be equal to or greater than 0.0 and less than 1.0. Default value is
* negative, in which case the filtering rate is automatically calculated.
* negative, in which case the filtering rate is automatically calculated when possible.
* For `filtering::udf_filter`, CAGRA uses `udf_filter::filtering_rate` when this value is
* negative. If both values are negative, CAGRA assumes 0.0 because a UDF's selectivity cannot be
* inferred from the source string.
*/
float filtering_rate = -1.0;
};
Expand Down
44 changes: 43 additions & 1 deletion cpp/include/cuvs/neighbors/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

#include <memory>
#include <numeric>
#include <string>
#include <type_traits>
#include <utility>

#ifdef __cpp_lib_bitops
#include <bit>
Expand Down Expand Up @@ -495,7 +497,7 @@ namespace filtering {
* @{
*/

enum class FilterType { None, Bitmap, Bitset };
enum class FilterType { None, Bitmap, Bitset, UDF };

struct base_filter {
~base_filter() = default;
Expand Down Expand Up @@ -615,6 +617,46 @@ struct bitset_filter : public base_filter {
void to_csr(raft::resources const& handle, csr_matrix_t& csr);
};

/**
* @brief JIT-LTO user-defined filter predicate.
*
* The source must define a device function named by @c function_name with signature:
*
* @code{.cpp}
* __device__ bool cuvs_filter_udf(uint32_t query_id, source_index_t source_id, void* filter_data);
* @endcode
*
* Return @c true to allow a source vector to appear in the results and @c false to reject it.
* @c filter_data is passed through unchanged and must point to device-accessible memory when the
* UDF dereferences it. CAGRA currently provides @c source_index_t as @c uint32_t in the generated
* JIT fragment.
*/
struct udf_filter : public base_filter {
/** CUDA C++ source containing the device predicate. */
std::string source;
/** Opaque device-accessible pointer passed to the predicate. */
void* filter_data = nullptr;
/** Estimated fraction of rows rejected by the predicate, or negative if unknown. */
float filtering_rate = -1.0f;
/** Device function name to call from the generated CAGRA sample filter. */
std::string function_name = "cuvs_filter_udf";

udf_filter() = default;

explicit udf_filter(std::string source,
void* filter_data = nullptr,
float filtering_rate = -1.0f,
std::string function_name = "cuvs_filter_udf")
: source(std::move(source)),
filter_data(filter_data),
filtering_rate(filtering_rate),
function_name(std::move(function_name))
{
}

FilterType get_filter_type() const override { return FilterType::UDF; }
};

/** @} */ // end group neighbors_filtering

/**
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/neighbors/cagra.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include <rmm/cuda_stream_view.hpp>

#include <algorithm>

namespace cuvs::neighbors::cagra {

// Member function implementations for cagra::index
Expand Down Expand Up @@ -380,6 +382,25 @@ void search(raft::resources const& res,
auto sample_filter_copy = sample_filter;
return search_with_filtering<T, IdxT, decltype(sample_filter_copy), OutputIdxT>(
res, params_copy, idx, queries, neighbors, distances, sample_filter_copy);
} catch (const std::bad_cast&) {
}

try {
auto& sample_filter =
dynamic_cast<const cuvs::neighbors::filtering::udf_filter&>(sample_filter_ref);
search_params params_copy = params;
if (params.filtering_rate < 0.0) {
const float min_filtering_rate = 0.0f;
const float max_filtering_rate = 0.999f;
params_copy.filtering_rate =
sample_filter.filtering_rate < 0.0f
? 0.0f
: std::min(std::max(sample_filter.filtering_rate, min_filtering_rate),
max_filtering_rate);
}
auto sample_filter_copy = sample_filter;
return search_with_filtering<T, IdxT, decltype(sample_filter_copy), OutputIdxT>(
res, params_copy, idx, queries, neighbors, distances, sample_filter_copy);
} catch (const std::bad_cast&) {
RAFT_FAIL("Unsupported sample filter type");
}
Expand Down
256 changes: 256 additions & 0 deletions cpp/src/neighbors/detail/cagra/cagra_filter_payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights
* reserved. SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include "../../sample_filter.cuh" // public filter types
#include "../sample_filter_data.cuh"
#include "jit_lto_kernels/cagra_filter_payload.cuh"

#include <raft/core/error.hpp>

#include <cuda_runtime_api.h>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <type_traits>
#include <unordered_map>
#include <vector>

namespace cuvs::neighbors::cagra::detail {

template <typename SourceIndexT>
using cagra_filter_data_storage = ::cuvs::neighbors::detail::bitset_filter_data_t<SourceIndexT>;

template <typename PayloadT>
std::uint64_t cagra_payload_hash(PayloadT const& payload)
{
static_assert(std::is_trivially_copyable_v<PayloadT>);
constexpr std::uint64_t kOffset = 1469598103934665603ull;
constexpr std::uint64_t kPrime = 1099511628211ull;
auto const* bytes = reinterpret_cast<unsigned char const*>(&payload);
std::uint64_t hash = kOffset;
for (std::size_t i = 0; i < sizeof(PayloadT); ++i) {
hash ^= bytes[i];
hash *= kPrime;
}
return hash;
}

template <typename PayloadT>
struct cagra_device_payload_owner {
struct state {
PayloadT host_payload{};
PayloadT* device_payload{nullptr};
cudaStream_t stream{};
cudaEvent_t ready_event{};
int device{-1};
std::mutex mutex;

explicit state(PayloadT payload) : host_payload(payload) {}

~state() noexcept
{
if (device_payload != nullptr) {
RAFT_CUDA_TRY_NO_THROW(cudaFreeAsync(device_payload, stream));
}
if (ready_event != nullptr) { RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(ready_event)); }
}

PayloadT* dev_ptr(cudaStream_t cuda_stream)
{
std::lock_guard<std::mutex> lock(mutex);
if (device_payload == nullptr) {
RAFT_CUDA_TRY(cudaGetDevice(&device));
RAFT_CUDA_TRY(cudaMallocAsync(
reinterpret_cast<void**>(&device_payload), sizeof(PayloadT), cuda_stream));
RAFT_CUDA_TRY(cudaMemcpyAsync(
device_payload, &host_payload, sizeof(PayloadT), cudaMemcpyHostToDevice, cuda_stream));
RAFT_CUDA_TRY(cudaEventCreateWithFlags(&ready_event, cudaEventDisableTiming));
RAFT_CUDA_TRY(cudaEventRecord(ready_event, cuda_stream));
stream = cuda_stream;
Comment on lines +67 to +75
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

file=$(fd -p 'cagra_filter_payload.hpp' | head -n1)

echo '--- state::dev_ptr initialization block ---'
sed -n '64,80p' "$file"

echo
echo '--- cache insertion before initialization ---'
sed -n '116,129p' "$file"

echo
echo '--- fallible CUDA setup calls and nearby cleanup/catch ---'
rg -n 'cudaMallocAsync|cudaMemcpyAsync|cudaEventCreateWithFlags|cudaEventRecord|cudaFreeAsync|catch|device_payload|ready_event' "$file"

Repository: rapidsai/cuvs

Length of output: 2714


🏁 Script executed:

#!/bin/bash
set -euo pipefail

file=$(fd -p 'cagra_filter_payload.hpp' | head -n1)

echo "=== File: $file ==="

echo
echo "---- state struct definition (around lines 40-90) ----"
sed -n '35,95p' "$file"

echo
echo "---- cache insertion (around lines 100-150) ----"
sed -n '95,150p' "$file"

echo
echo "---- get_cagra_device_payload (around lines 150-220) ----"
sed -n '150,230p' "$file"

echo
echo "---- RAFT_CUDA_TRY macro behavior (best-effort grep) ----"
rg -n "define RAFT_CUDA_TRY" -S . | head -n 20 || true

Repository: rapidsai/cuvs

Length of output: 7203


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n "define RAFT_CUDA_TRY\\(" -S . --hidden --no-ignore-vcs

Repository: rapidsai/cuvs

Length of output: 1948


HIGH: Partially-initialized cached device payload on CUDA failure (missing rollback)

cagra_device_payload_owner::dev_ptr() inserts a new state into cache_[key] before state::dev_ptr() runs. If any state::dev_ptr() CUDA call fails after cudaMallocAsync succeeds (e.g., cudaMemcpyAsync, cudaEventCreateWithFlags, cudaEventRecord), the cached state can persist with device_payload != nullptr while ready_event remains default/invalid. Subsequent calls take the else path and call cudaStreamWaitEvent(cuda_stream, ready_event, 0), and the leaked allocation can persist for process lifetime since the cached entry is never removed.

Suggested fix
     PayloadT* dev_ptr(cudaStream_t cuda_stream)
     {
       std::lock_guard<std::mutex> lock(mutex);
       if (device_payload == nullptr) {
         RAFT_CUDA_TRY(cudaGetDevice(&device));
-        RAFT_CUDA_TRY(cudaMallocAsync(
-          reinterpret_cast<void**>(&device_payload), sizeof(PayloadT), cuda_stream));
-        RAFT_CUDA_TRY(cudaMemcpyAsync(
-          device_payload, &host_payload, sizeof(PayloadT), cudaMemcpyHostToDevice, cuda_stream));
-        RAFT_CUDA_TRY(cudaEventCreateWithFlags(&ready_event, cudaEventDisableTiming));
-        RAFT_CUDA_TRY(cudaEventRecord(ready_event, cuda_stream));
-        stream = cuda_stream;
+        PayloadT* new_device_payload{nullptr};
+        cudaEvent_t new_ready_event{};
+        try {
+          RAFT_CUDA_TRY(cudaMallocAsync(
+            reinterpret_cast<void**>(&new_device_payload), sizeof(PayloadT), cuda_stream));
+          RAFT_CUDA_TRY(cudaMemcpyAsync(new_device_payload,
+                                        &host_payload,
+                                        sizeof(PayloadT),
+                                        cudaMemcpyHostToDevice,
+                                        cuda_stream));
+          RAFT_CUDA_TRY(cudaEventCreateWithFlags(&new_ready_event, cudaEventDisableTiming));
+          RAFT_CUDA_TRY(cudaEventRecord(new_ready_event, cuda_stream));
+          device_payload = new_device_payload;
+          ready_event    = new_ready_event;
+          stream         = cuda_stream;
+        } catch (...) {
+          if (new_ready_event != nullptr) {
+            RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(new_ready_event));
+          }
+          if (new_device_payload != nullptr) {
+            RAFT_CUDA_TRY_NO_THROW(cudaFreeAsync(new_device_payload, cuda_stream));
+          }
+          throw;
+        }
       } else {
         RAFT_CUDA_TRY(cudaStreamWaitEvent(cuda_stream, ready_event, 0));
       }
       return device_payload;
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp/src/neighbors/detail/cagra/cagra_filter_payload.hpp` around lines 67 -
75, cagra_device_payload_owner::dev_ptr() can leave cache_[key] with a
partially-initialized state (device_payload set but ready_event invalid) if any
CUDA call after cudaMallocAsync fails; update state::dev_ptr() to perform
rollback on any CUDA failure: if cudaMallocAsync succeeded then on subsequent
error call cudaFreeAsync (or cudaFree if stream not available) to free
device_payload, destroy any created ready_event via cudaEventDestroy, reset
state members (device_payload = nullptr, ready_event = default/0, stream =
nullptr) and ensure the cache entry is removed or marked invalid (erase
cache_[key] or set a flag) so future calls do not use the leaked pointer and
invalid event; wrap CUDA calls with a try/catch or error-check path to execute
this cleanup whenever a RAFT_CUDA_TRY-like check reports failure.

} else {
RAFT_CUDA_TRY(cudaStreamWaitEvent(cuda_stream, ready_event, 0));
}
return device_payload;
}
};

// PayloadT is copied to device by value. Pointer fields inside PayloadT are shallow-copied and
// must already point to device-addressable memory that remains valid while the cached payload is
// usable.
struct cache_key {
std::uint64_t payload_hash{};
int device{};

bool operator==(cache_key const& other) const
{
return payload_hash == other.payload_hash && device == other.device;
}
};

struct cache_key_hash {
std::size_t operator()(cache_key const& key) const
{
auto seed = static_cast<std::size_t>(key.payload_hash);
seed ^= static_cast<std::size_t>(key.device) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
return seed;
}
};

cagra_device_payload_owner() = default;

void* dev_ptr(PayloadT payload, cudaStream_t stream) const
{
int device{};
RAFT_CUDA_TRY(cudaGetDevice(&device));

// Keep cached payload copies for process lifetime to avoid per-search allocation/copy churn.
// Cross-stream reuse is ordered by each state's ready_event before kernels consume the pointer.
const auto key = cache_key{cagra_payload_hash(payload), device};
std::shared_ptr<state> selected_state;
{
std::lock_guard<std::mutex> lock(cache_mutex_);
auto& entries = cache_[key];
for (auto const& cached : entries) {
if (std::memcmp(&cached->host_payload, &payload, sizeof(PayloadT)) == 0) {
selected_state = cached;
break;
}
}
if (selected_state == nullptr) {
selected_state = std::make_shared<state>(payload);
entries.push_back(selected_state);
}
}

return selected_state->dev_ptr(stream);
}

private:
mutable std::mutex cache_mutex_;
mutable std::unordered_map<cache_key, std::vector<std::shared_ptr<state>>, cache_key_hash> cache_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: state does not need to be a shared_ptr anymore as the owner itself is static

};

template <typename T>
struct is_bitset_filter : std::false_type {};

template <typename bitset_t, typename index_t>
struct is_bitset_filter<::cuvs::neighbors::filtering::bitset_filter<bitset_t, index_t>>
: std::true_type {};

template <typename T>
struct is_udf_filter : std::false_type {};

template <>
struct is_udf_filter<::cuvs::neighbors::filtering::udf_filter> : std::true_type {};

template <typename SourceIndexT, typename FilterT>
cagra_filter_data_storage<SourceIndexT> make_cagra_filter_data_storage(const FilterT& filter)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok to address later: name this make_cagra_bitset_filter_storage so we can add factories for other pre-compiled types and don't need the specific alias anymore

{
const auto bitset_view = filter.view();
return cagra_filter_data_storage<SourceIndexT>{
const_cast<std::uint32_t*>(bitset_view.data()),
static_cast<SourceIndexT>(bitset_view.size()),
static_cast<SourceIndexT>(bitset_view.get_original_nbits())};
}

template <typename PayloadT>
void* get_cagra_device_payload(PayloadT payload, cudaStream_t stream)
{
static cagra_device_payload_owner<PayloadT> owner;
return owner.dev_ptr(payload, stream);
}
Comment on lines +162 to +167
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this in the factory? So the factory returns a perfectly valid device payload directly


template <typename SourceIndexT, typename FilterT>
void fill_cagra_sample_filter(cagra_sample_filter<SourceIndexT>& out,
const FilterT& filter,
cudaStream_t stream)
{
using DecayedFilter = std::decay_t<FilterT>;
if constexpr (is_bitset_filter<DecayedFilter>::value) {
out.filter_data = get_cagra_device_payload(make_cagra_filter_data_storage<SourceIndexT>(filter),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not need to call here then, going by above comment

stream);
} else if constexpr (is_udf_filter<DecayedFilter>::value) {
out.filter_data = filter.filter_data;
}
}

template <typename SourceIndexT, typename FilterT>
std::uint64_t cagra_filter_payload_hash(const FilterT& filter)
{
using DecayedFilter = std::decay_t<FilterT>;
if constexpr (is_bitset_filter<DecayedFilter>::value) {
return cagra_payload_hash(make_cagra_filter_data_storage<SourceIndexT>(filter));
} else if constexpr (requires { filter.filter; }) {
return cagra_filter_payload_hash<SourceIndexT>(filter.filter);
} else {
return 0;
}
}

template <typename FilterT>
void* cagra_filter_data_ptr(const FilterT& filter)
{
using DecayedFilter = std::decay_t<FilterT>;
if constexpr (is_udf_filter<DecayedFilter>::value) {
return filter.filter_data;
} else if constexpr (requires { filter.filter; }) {
return cagra_filter_data_ptr(filter.filter);
} else {
return nullptr;
}
}

template <typename SampleFilterT>
std::uint32_t cagra_filter_query_id_offset(const SampleFilterT& sample_filter)
{
if constexpr (requires {
sample_filter.filter;
sample_filter.offset;
}) {
return sample_filter.offset;
} else {
return 0;
}
}

/// Host: fill @ref cagra_sample_filter from a CAGRA filter object.
template <typename SourceIndexT, typename SampleFilterT>
cagra_sample_filter<SourceIndexT> extract_cagra_sample_filter(const SampleFilterT& sample_filter,
cudaStream_t stream)
{
cagra_sample_filter<SourceIndexT> out;
if constexpr (requires {
sample_filter.filter;
sample_filter.offset;
}) {
out.query_id_offset = sample_filter.offset;
fill_cagra_sample_filter(out, sample_filter.filter, stream);
} else {
fill_cagra_sample_filter(out, sample_filter, stream);
}
return out;
}

/// Host: find UDF compile/link metadata only. Query offsets stay in the runtime payload produced
/// by @ref extract_cagra_sample_filter and are applied before calling the linked sample_filter.
template <typename SampleFilterT>
const ::cuvs::neighbors::filtering::udf_filter* get_cagra_udf_filter(
const SampleFilterT& sample_filter)
{
using DecayedFilter = std::decay_t<SampleFilterT>;
if constexpr (is_udf_filter<DecayedFilter>::value) {
return &sample_filter;
} else if constexpr (requires { sample_filter.filter; }) {
return get_cagra_udf_filter(sample_filter.filter);
} else {
return nullptr;
}
}

} // namespace cuvs::neighbors::cagra::detail
Loading
Loading