Skip to content
9 changes: 7 additions & 2 deletions include/hmll/linux/backend/iouring.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,13 @@ static inline int hmll_io_uring_slot_find_available(const struct hmll_iouring_io
{
for (unsigned i = 0; i < HMLL_URING_IOBUSY_WORDS; ++i) {
const int pos = __builtin_ffsll(~iobusy.bits[i]);
if (pos > 0)
return (int)(i * 64) + pos - 1;
if (pos > 0) {
const int slot = (int)(i * 64) + pos - 1;
// Ensure we don't return slots beyond QUEUE_DEPTH
if (slot >= (int)HMLL_URING_QUEUE_DEPTH)
return -1;
return slot;
}
}
return -1;
}
Expand Down
109 changes: 104 additions & 5 deletions lib/linux/backend/iouring.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sched.h>
#include <sys/utsname.h>
#include "hmll/hmll.h"
#include "hmll/memory.h"
Expand All @@ -14,6 +17,61 @@
#include <driver_types.h>
#endif

/* ── NUMA topology helpers ──────────────────────────────────────────── */

/**
 * Resolve the NUMA node a CUDA device is attached to.
 *
 * Looks up the device's PCI bus id via CUDA, then reads
 * /sys/bus/pci/devices/<busid>/numa_node from sysfs.
 *
 * @param device_idx  CUDA device index.
 * @return NUMA node number, or -1 when CUDA support is compiled out, the
 *         device is unknown, or sysfs does not expose a node for it.
 */
static int hmll_get_gpu_numa_node(const int device_idx)
{
#if defined(__HMLL_CUDA_ENABLED__)
    char pci_bus_id[64] = {0};
    if (cudaDeviceGetPCIBusId(pci_bus_id, sizeof(pci_bus_id), device_idx) != cudaSuccess)
        return -1;

    /* sysfs device directories use lowercase hex; CUDA reports uppercase. */
    for (char *p = pci_bus_id; *p != '\0'; ++p)
        if (*p >= 'A' && *p <= 'Z')
            *p += 'a' - 'A';

    char path[256];
    snprintf(path, sizeof(path), "/sys/bus/pci/devices/%s/numa_node", pci_bus_id);

    int node = -1;
    FILE *f = fopen(path, "r");
    if (f) {
        if (fscanf(f, "%d", &node) != 1)
            node = -1;
        fclose(f);
    }
    return node;
#else
    (void)device_idx;
    return -1;
#endif
}

/**
 * Return the lowest-numbered CPU listed for a NUMA node.
 *
 * Reads /sys/devices/system/node/node<N>/cpulist; the list begins with the
 * smallest CPU id (e.g. "0-23,48-71" -> 0), so parsing the leading integer
 * is sufficient.
 *
 * @param numa_node  NUMA node index; negative values are rejected.
 * @return first CPU id on the node, or -1 on any failure.
 */
static int hmll_get_first_cpu_on_node(const int numa_node)
{
    if (numa_node < 0)
        return -1;

    char path[256];
    snprintf(path, sizeof(path), "/sys/devices/system/node/node%d/cpulist", numa_node);

    FILE *f = fopen(path, "r");
    if (f == NULL)
        return -1;

    int first_cpu;
    const int parsed = fscanf(f, "%d", &first_cpu);
    fclose(f);

    return parsed == 1 ? first_cpu : -1;
}

/* ── runtime kernel version detection ───────────────────────────────── */
static inline unsigned hmll_kernel_version_internal(unsigned maj, unsigned min)
{
Expand Down Expand Up @@ -80,6 +138,7 @@ static struct hmll_error hmll_io_uring_register_staging_buffers(
}

unsigned char *arena = hmll_alloc(HMLL_URING_QUEUE_DEPTH * HMLL_URING_BUFFER_SIZE, device, HMLL_MEM_STAGING);

if (!arena) {
ctx->error = HMLL_ERR(HMLL_ERR_ALLOCATION_FAILED);
return ctx->error;
Expand Down Expand Up @@ -139,13 +198,14 @@ static inline void hmll_io_uring_reclaim_slots(
struct hmll_io_uring_cuda_context *dctx = fetcher->device_ctx;
for (size_t i = 0; i < HMLL_URING_QUEUE_DEPTH; ++i) {
struct hmll_io_uring_cuda_context *cd = dctx + i;
if (hmll_io_uring_slot_is_busy(fetcher->iobusy, i)) {
if (cd->state == HMLL_CUDA_STREAM_MEMCPY && cudaEventQuery(cd->done) == cudaSuccess) {
if (hmll_io_uring_slot_is_busy(fetcher->iobusy, i) && cd->state == HMLL_CUDA_STREAM_MEMCPY) {
if (cudaEventQuery(cd->done) == cudaSuccess) {
hmll_io_uring_cuda_stream_set_idle(&cd->state);
hmll_io_uring_slot_set_available(&fetcher->iobusy, cd->slot);
}
}
}

#else
HMLL_UNUSED(fetcher);
HMLL_UNUSED(device);
Expand Down Expand Up @@ -196,6 +256,7 @@ static inline void hmll_io_uring_prep_sqe(
#if defined(__HMLL_CUDA_ENABLED__)
else if (hmll_device_is_cuda(device)) {
struct hmll_io_uring_cuda_context *dctx = fetcher->device_ctx;

dctx[slot].offset = offset;
io_uring_prep_read_fixed(sqe, iofile, fetcher->iovecs[slot].iov_base, len, offset, slot);
io_uring_sqe_set_data(sqe, dctx + slot);
Expand Down Expand Up @@ -296,7 +357,7 @@ static ssize_t hmll_io_uring_fetch_loop(
if (count == 0 && n_inflight > 0) {
struct io_uring_cqe *cqe;
if (unlikely(io_uring_wait_cqe(&fetcher->ioring, &cqe) < 0)) {
ctx->error = HMLL_ERR(HMLL_ERR_IO_ERROR);
ctx->error = HMLL_SYS_ERR(errno);
return -1;
}
cqes[0] = cqe;
Expand Down Expand Up @@ -758,8 +819,19 @@ static struct hmll_error hmll_io_uring_queue_init(
const struct hmll_device device
) {
(void)ctx;

/* Detect NUMA node for the target device and pin SQPOLL thread accordingly */
int numa_node = -1;
int sq_cpu = 0;

if (hmll_device_is_cuda(device)) {
numa_node = hmll_get_gpu_numa_node(device.idx);
int cpu = hmll_get_first_cpu_on_node(numa_node);
if (cpu >= 0) sq_cpu = cpu;
}

struct io_uring_params params = {
.sq_thread_cpu = 0,
.sq_thread_cpu = (unsigned)sq_cpu,
.flags = hmll_io_uring_get_setup_flags(),
.sq_thread_idle = 500
};
Expand All @@ -771,6 +843,33 @@ static struct hmll_error hmll_io_uring_queue_init(
return HMLL_ERR(HMLL_ERR_CUDA_SET_DEVICE_FAILED);
}

/* Pin this thread to the GPU's NUMA node for optimal memory allocation */
if (numa_node >= 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
char path[256];
snprintf(path, sizeof(path), "/sys/devices/system/node/node%d/cpulist", numa_node);
FILE *f = fopen(path, "r");
if (f) {
char buf[1024] = {0};
if (fgets(buf, sizeof(buf), f)) {
/* Parse cpulist format: "0-23,48-71" */
char *tok = strtok(buf, ",\n");
while (tok) {
int lo, hi;
if (sscanf(tok, "%d-%d", &lo, &hi) == 2) {
for (int c = lo; c <= hi; c++) CPU_SET(c, &cpuset);
} else if (sscanf(tok, "%d", &lo) == 1) {
CPU_SET(lo, &cpuset);
}
tok = strtok(NULL, ",\n");
}
}
fclose(f);
sched_setaffinity(0, sizeof(cpuset), &cpuset);
}
}

struct hmll_io_uring_cuda_context *data = calloc(HMLL_URING_QUEUE_DEPTH, sizeof(struct hmll_io_uring_cuda_context));
backend->device_ctx = (void *)data;

Expand Down Expand Up @@ -887,7 +986,7 @@ void hmll_io_uring_destroy(void *ptr)
}
}

munmap(backend->iovecs[0].iov_base, HMLL_URING_QUEUE_DEPTH * sizeof(struct iovec));
cudaFreeHost(backend->iovecs[0].iov_base);
free(backend->device_ctx);
backend->device_ctx = NULL;
}
Expand Down
7 changes: 4 additions & 3 deletions lib/rust/hmll/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
);

// Store in an array to ensure proper lifetime
let sources = [source];
let source_size = source.size();
let sources = vec![source];

// Create a weight loader
println!("\nCreating weight loader...");
let mut loader = WeightLoader::new(&sources, Device::Cpu, LoaderKind::Auto)?;
let mut loader = WeightLoader::new(sources, Device::Cpu, LoaderKind::Auto)?;
println!("✓ Loader created successfully");
println!(" Device: {}", loader.device());
println!(" Number of sources: {}", loader.num_sources());

// Fetch some data from the beginning of the file
let fetch_size = end - start;
let actual_fetch_size = fetch_size.min(sources[0].size());
let actual_fetch_size = fetch_size.min(source_size);
println!(
"\nFetching {} bytes ({:.2} MB)...",
actual_fetch_size,
Expand Down
2 changes: 1 addition & 1 deletion lib/rust/hmll/examples/multi_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// Create a weight loader for all sources
println!("\nCreating weight loader for {} sources...", sources.len());
let mut loader = WeightLoader::new(&sources, Device::Cpu, LoaderKind::Auto)?;
let mut loader = WeightLoader::new(sources, Device::Cpu, LoaderKind::Auto)?;
println!("✓ Loader created successfully");

// Display information about each source
Expand Down
77 changes: 45 additions & 32 deletions lib/rust/hmll/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,41 @@ impl From<Range> for ops::Range<usize> {
/// - **Empty**: Zero-length buffer with no memory.
/// - **Owned**: Allocated memory that is freed when the buffer is dropped.
/// - **SourceView**: Zero-copy pointer into mmap'd memory, kept alive via Arc.
pub struct Buffer {
pub struct BufferInner {
buf: hmll_iobuf,
kind: BufferKind,
}

impl Drop for BufferInner {
    fn drop(&mut self) {
        // Only Owned buffers hold hmll-allocated memory that must be
        // released here; release it exactly once, and never for null.
        let owns_allocation = matches!(self.kind, BufferKind::Owned);
        if owns_allocation && !self.buf.ptr.is_null() {
            unsafe { hmll_free_buffer(&mut self.buf) };
        }
        // SourceView buffers: dropping the inner Arc decrements the
        // refcount; the final drop of SourceHandle unmaps the memory.
    }
}

// SAFETY: BufferInner is safe to send across threads because:
// - The buffer data is immutable after creation (read-only access)
// - Owned buffers: memory is allocated by hmll, only freed on drop
// - SourceView buffers: memory is mmap'd and kept alive by Arc<SourceHandle>
// - No internal mutation occurs after construction
//
// Callers must NOT mutate data through `as_ptr()` - doing so would be UB.
unsafe impl Send for BufferInner {}
unsafe impl Sync for BufferInner {}

#[derive(Clone)]
pub struct Buffer(Arc<BufferInner>);

impl std::fmt::Debug for Buffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Buffer")
.field("size", &self.buf.size)
.field("ptr", &self.buf.ptr)
.field("size", &self.0.buf.size)
.field("ptr", &self.0.buf.ptr)
.field("device", &self.device())
.field("owned", &self.is_owned())
.finish()
Expand All @@ -112,14 +137,14 @@ impl Buffer {
/// This is useful when you need to represent a zero-length fetch result.
#[inline(always)]
pub fn empty(device: Device) -> Self {
Self {
Self(Arc::new(BufferInner {
buf: hmll_iobuf {
size: 0,
ptr: std::ptr::null_mut(),
device: device.to_raw(),
},
kind: BufferKind::Empty,
}
}))
}

/// Create a new owned buffer from an `hmll_iobuf`.
Expand All @@ -132,10 +157,10 @@ impl Buffer {
/// and that the memory was allocated via hmll allocation functions.
#[inline(always)]
pub(crate) unsafe fn from_raw_owned(buf: hmll_iobuf) -> Self {
Self {
Self(Arc::new(BufferInner {
buf,
kind: BufferKind::Owned,
}
}))
}

/// Create a zero-copy view into mmap'd source memory.
Expand All @@ -153,28 +178,28 @@ impl Buffer {
device: Device,
source_handle: Arc<SourceHandle>,
) -> Self {
Self {
Self(Arc::new(BufferInner {
buf: hmll_iobuf {
size,
ptr,
device: device.to_raw(),
},
kind: BufferKind::SourceView(source_handle),
}
}))
}

/// Get the buffer as a byte slice (CPU only).
#[inline]
pub fn as_slice(&self) -> Option<&[u8]> {
if self.device() == Device::Cpu {
if self.buf.ptr.is_null() || self.buf.size == 0 {
if self.0.buf.ptr.is_null() || self.0.buf.size == 0 {
// Return empty slice for empty/null buffers
Some(&[])
} else {
unsafe {
Some(std::slice::from_raw_parts(
self.buf.ptr as *const u8,
self.buf.size,
self.0.buf.ptr as *const u8,
self.0.buf.size,
))
}
}
Expand All @@ -185,26 +210,26 @@ impl Buffer {

/// Get the size of the buffer in bytes.
#[inline(always)]
pub const fn len(&self) -> usize {
self.buf.size
pub fn len(&self) -> usize {
self.0.buf.size
}

/// Check if the buffer is empty.
#[inline(always)]
pub const fn is_empty(&self) -> bool {
self.buf.size == 0
pub fn is_empty(&self) -> bool {
self.0.buf.size == 0
}

/// Get the device where the buffer is located.
#[inline(always)]
pub fn device(&self) -> Device {
Device::from_raw(self.buf.device)
Device::from_raw(self.0.buf.device)
}

/// Get a raw pointer to the buffer.
#[inline(always)]
pub const fn as_ptr(&self) -> *const u8 {
self.buf.ptr as *const u8
pub fn as_ptr(&self) -> *const u8 {
self.0.buf.ptr as *const u8
}

/// Convert to a Vec (copies data if on CPU).
Expand All @@ -223,18 +248,6 @@ impl Buffer {
/// and are kept alive by an Arc reference to the source.
#[inline(always)]
pub fn is_owned(&self) -> bool {
matches!(self.kind, BufferKind::Owned)
}
}

impl Drop for Buffer {
fn drop(&mut self) {
if let BufferKind::Owned = self.kind {
if !self.buf.ptr.is_null() {
unsafe { hmll_free_buffer(&mut self.buf) };
}
}
// For SourceView: the Arc is dropped automatically, decrementing refcount.
// When the last Arc is dropped, SourceHandle::drop() unmaps the memory.
matches!(self.0.kind, BufferKind::Owned)
}
}
Loading
Loading