diff --git a/software/libstf/memory_pool.cpp b/software/libstf/memory_pool.cpp index 6db0a31..0e7c0e3 100644 --- a/software/libstf/memory_pool.cpp +++ b/software/libstf/memory_pool.cpp @@ -1,4 +1,5 @@ #include "libstf/memory_pool.hpp" +#include "memory_pool.hpp" #include #include @@ -12,27 +13,30 @@ namespace libstf { // ---------------------------------------------------------------------------- std::string mallctl_error_to_string(int error) { - switch (error) { - case EINVAL: - return "The alignment parameter is not a power of 2 at least as large as sizeof(void " - "*)."; - case ENOENT: - return "Name or mib specifies an unknown/invalid value."; - case EPERM: - return "Attempt to read or write void value, or attempt to write read-only value."; - case EAGAIN: - return "A memory allocation failure occurred."; - case EFAULT: - return "EFAULT occurred"; - default: - return "Unknown error."; - } + switch (error) { + case EINVAL: + return "The alignment parameter is not a power of 2 at least as large as " + "sizeof(void " + "*)."; + case ENOENT: + return "Name or mib specifies an unknown/invalid value."; + case EPERM: + return "Attempt to read or write void value, or attempt to write read-only " + "value."; + case EAGAIN: + return "A memory allocation failure occurred."; + case EFAULT: + return "EFAULT occurred"; + default: + return "Unknown error."; + } } void check_mallctl_success(int error, std::string msg) { - if (error != 0) { - throw std::runtime_error(msg + " Got error: " + mallctl_error_to_string(error)); - } + if (error != 0) { + throw std::runtime_error(msg + + " Got error: " + mallctl_error_to_string(error)); + } } /** @@ -40,8 +44,9 @@ void check_mallctl_success(int error, std::string msg) { * @param control_name */ void je_mallctl_do(std::string control_name) { - check_mallctl_success(je_mallctl(control_name.c_str(), nullptr, nullptr, nullptr, 0), - "Failed to execute mallctl control " + control_name + "."); + check_mallctl_success( + 
je_mallctl(control_name.c_str(), nullptr, nullptr, nullptr, 0), + "Failed to execute mallctl control " + control_name + "."); } /** @@ -50,10 +55,11 @@ void je_mallctl_do(std::string control_name) { * @param control_name The name of the control to write to * @param value A pointer to the value to write! */ -template +template void je_mallctl_write(std::string control_name, A *value) { - check_mallctl_success(je_mallctl(control_name.c_str(), nullptr, nullptr, value, sizeof(A)), - "Failed to write to mallctl control " + control_name + "."); + check_mallctl_success( + je_mallctl(control_name.c_str(), nullptr, nullptr, value, sizeof(A)), + "Failed to write to mallctl control " + control_name + "."); } /** @@ -61,13 +67,13 @@ void je_mallctl_write(std::string control_name, A *value) { * @param control_name The name of the control to read from * @return A value of type A as returned by the given jemalloc control */ -template -A je_mallctl_read(std::string control_name) { - A value; - auto size = sizeof(A); - check_mallctl_success(je_mallctl(control_name.c_str(), &value, &size, nullptr, 0), - "Failed to read from mallctl control " + control_name + "."); - return std::move(value); +template A je_mallctl_read(std::string control_name) { + A value; + auto size = sizeof(A); + check_mallctl_success( + je_mallctl(control_name.c_str(), &value, &size, nullptr, 0), + "Failed to read from mallctl control " + control_name + "."); + return std::move(value); } /** @@ -77,14 +83,15 @@ A je_mallctl_read(std::string control_name) { * @param write_value The value to write * @return Reads & Writes from the given jemalloc control name */ -template +template A je_mallctl_read_write(std::string control_name, B *write_value) { - A read_value; - auto size = sizeof(A); - check_mallctl_success( - je_mallctl(control_name.c_str(), &read_value, &size, write_value, sizeof(B)), - "Failed to read/write from mallctl control " + control_name + "."); - return std::move(read_value); + A read_value; + auto 
size = sizeof(A); + check_mallctl_success(je_mallctl(control_name.c_str(), &read_value, &size, + write_value, sizeof(B)), + "Failed to read/write from mallctl control " + + control_name + "."); + return std::move(read_value); } // ---------------------------------------------------------------------------- @@ -93,377 +100,423 @@ A je_mallctl_read_write(std::string control_name, B *write_value) { // Assign a value to the zero_size_data while making sure its aligned according // to the default alignment. -alignas(HugePageMemoryPool::DEFAULT_ALIGNMENT) int64_t zero_size_data[1] = {0xFFFFULL}; +alignas(HugePageMemoryPool::DEFAULT_ALIGNMENT) int64_t zero_size_data[1] = { + 0xFFFFULL}; HugePageMemoryPool::HugePageMemoryPool() { - // Set some default options for jemalloc - // Immediately reuse pages - je_mallctl_write("arenas.dirty_decay_ms", 0); - - // Pre-allocate all 1 Gib huge pages available in the system - auto num_huge_pages = get_number_of_available_huge_pages(); - next_free_addr = mmap(nullptr, - num_huge_pages * PAGE_SIZE, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB | HUGE_PAGE_TYPE, - -1, - 0); - if (next_free_addr == MAP_FAILED) { - throw std::runtime_error( - "Could not allocated the expected number of 1GiB huge pages for HugePageMemoryPool."); - } - initial_address_ = next_free_addr; - total_capacity_ = num_huge_pages * PAGE_SIZE; - remaining_capacity = num_huge_pages * PAGE_SIZE; - - // Create a new jemalloc arena with customs hook - // -> The hook will request chunks of the 1GiB pages we pre-allocated - auto hooks = &hugepage_hooks; - arena_index = je_mallctl_read_write("arenas.create", &hooks); + // Set some default options for jemalloc + // Immediately reuse pages + je_mallctl_write("arenas.dirty_decay_ms", 0); + + // Pre-allocate all 1 Gib huge pages available in the system + auto num_huge_pages = get_number_of_available_huge_pages(); + next_free_addr = + mmap(nullptr, num_huge_pages * PAGE_SIZE, PROT_READ | PROT_WRITE, + 
MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB | HUGE_PAGE_TYPE, -1, 0); + if (next_free_addr == MAP_FAILED) { + throw std::runtime_error("Could not allocated the expected number of 1GiB " + "huge pages for HugePageMemoryPool."); + } + initial_address_ = next_free_addr; + total_capacity_ = num_huge_pages * PAGE_SIZE; + remaining_capacity = num_huge_pages * PAGE_SIZE; + + // Create a new jemalloc arena with customs hook + // -> The hook will request chunks of the 1GiB pages we pre-allocated + auto hooks = &hugepage_hooks; + arena_index = je_mallctl_read_write( + "arenas.create", &hooks); } HugePageMemoryPool::~HugePageMemoryPool() { - // Destroy all the tcaches we crated - // This is REQUIRED for destroying the arena and otherwise will lead to a SEGFAULT - for (auto tcache : tcache_ids) { - je_mallctl_write("tcache.destroy", &tcache.second); - } - - // It can take some time for the tcaches to be cleaned up. - // Unfortunately, the operation is not blocking. - // And there is no way to ask if the destruction has finished. - // This is really stupid. To prevent seg faults during termination, - // we sleep a fixed time here in the hope that after this, the tcache are destroyed. - std::this_thread::sleep_for(100ms); - - // Destroy the arena we created - std::ostringstream arena; - arena << "arena." << arena_index << ".destroy"; - je_mallctl_do(arena.str()); - - // Unmap all the pre-allocated memory - if (munmap(initial_address_, total_capacity_) == -1) { - std::cerr << "HugePageMemoryPool: Could not munmap the obtained huge page mappings." - << std::endl; - } + // Destroy all the tcaches we crated + // This is REQUIRED for destroying the arena and otherwise will lead to a + // SEGFAULT + for (auto tcache : tcache_ids) { + je_mallctl_write("tcache.destroy", &tcache.second); + } + + // It can take some time for the tcaches to be cleaned up. + // Unfortunately, the operation is not blocking. + // And there is no way to ask if the destruction has finished. 
+ // This is really stupid. To prevent seg faults during termination, + // we sleep a fixed time here in the hope that after this, the tcache are + // destroyed. + std::this_thread::sleep_for(100ms); + + // Destroy the arena we created + std::ostringstream arena; + arena << "arena." << arena_index << ".destroy"; + je_mallctl_do(arena.str()); + + // Unmap all the pre-allocated memory + if (munmap(initial_address_, total_capacity_) == -1) { + std::cerr << "HugePageMemoryPool: Could not munmap the obtained huge page " + "mappings." + << std::endl; + } } int HugePageMemoryPool::get_number_of_available_huge_pages() { - std::ostringstream path; - path << "/sys/kernel/mm/hugepages/hugepages-" << PAGE_SIZE_KB << "kB/free_hugepages"; - std::ifstream file(path.str()); - - if (!file.is_open()) { - throw std::runtime_error( - "It seems the target system does not have 1GiB huge pages enabled, which are required " - "for FPGA support. Please enable 1GiB huge pages in your system."); - } - - // Read out the number of free pages - int free_pages = 0; - file >> free_pages; - file.close(); - if (free_pages == 0) { - throw std::runtime_error( - "Your system has 0 free 1GiB huge pages. The FPGA support requires 1GiB huge pages. " - "Please enable additional 1GiB huge pages as described in the Maximus readme."); - } - return free_pages; + std::ostringstream path; + path << "/sys/kernel/mm/hugepages/hugepages-" << PAGE_SIZE_KB + << "kB/free_hugepages"; + std::ifstream file(path.str()); + + if (!file.is_open()) { + throw std::runtime_error( + "It seems the target system does not have 1GiB huge pages enabled, " + "which are required " + "for FPGA support. Please enable 1GiB huge pages in your system."); + } + + // Read out the number of free pages + int free_pages = 0; + file >> free_pages; + file.close(); + if (free_pages == 0) { + throw std::runtime_error("Your system has 0 free 1GiB huge pages. The FPGA " + "support requires 1GiB huge pages. 
" + "Please enable additional 1GiB huge pages as " + "described in the Maximus readme."); + } + return free_pages; } unsigned HugePageMemoryPool::get_tcache_id_for_calling_thread() { - // Jemalloc automatically allocates a so-called tcache for the calling - // thread of mallocx, rallocx, or dallocx calls. - // The Problem is that when we want to destroy the arena, jemalloc requires us to first, - // destroy all the (automatically created) tcaches. - // Since they are automatically managed, and we don't know how many threads called our memory - // pool, we have NO WAY of destroying them all... - // -> We need to manually manage them. That way, we know all created tcaches - // and can destroy them - ReadLock r_lock(tcache_lock); - auto thread_id = std::this_thread::get_id(); - auto exists = tcache_ids.find(thread_id); - if (exists != tcache_ids.end()) { - r_lock.unlock(); - return exists->second; - } - - // If we get here, the calling thread does not yet have a tcache. Create one! + // Jemalloc automatically allocates a so-called tcache for the calling + // thread of mallocx, rallocx, or dallocx calls. + // The Problem is that when we want to destroy the arena, jemalloc requires us + // to first, destroy all the (automatically created) tcaches. Since they are + // automatically managed, and we don't know how many threads called our memory + // pool, we have NO WAY of destroying them all... + // -> We need to manually manage them. That way, we know all created tcaches + // and can destroy them + ReadLock r_lock(tcache_lock); + auto thread_id = std::this_thread::get_id(); + auto exists = tcache_ids.find(thread_id); + if (exists != tcache_ids.end()) { r_lock.unlock(); - WriteLock w_lock(tcache_lock); - unsigned tcache_id = je_mallctl_read("tcache.create"); - tcache_ids.insert(std::make_pair(thread_id, tcache_id)); - w_lock.unlock(); - return tcache_id; + return exists->second; + } + + // If we get here, the calling thread does not yet have a tcache. Create one! 
+ r_lock.unlock(); + WriteLock w_lock(tcache_lock); + unsigned tcache_id = je_mallctl_read("tcache.create"); + tcache_ids.insert(std::make_pair(thread_id, tcache_id)); + w_lock.unlock(); + return tcache_id; } /** * Checks if the given buffer at `buf` is within the provided `bounds`. * Both the buffer and the bounds are encoded as a pair (base_addr, size). */ -inline bool is_buffer_within_bounds(std::pair bounds, std::pair buf) { - auto buf_start = static_cast(std::get<0>(buf)); - auto buf_end = buf_start + std::get<1>(buf); - auto bounds_start = static_cast(std::get<0>(bounds)); - auto bounds_end = bounds_start + std::get<1>(bounds); - - return buf_start >= bounds_start && buf_end <= bounds_end; +inline bool is_buffer_within_bounds(std::pair bounds, + std::pair buf) { + auto buf_start = static_cast(std::get<0>(buf)); + auto buf_end = buf_start + std::get<1>(buf); + auto bounds_start = static_cast(std::get<0>(bounds)); + auto bounds_end = bounds_start + std::get<1>(bounds); + + return buf_start >= bounds_start && buf_end <= bounds_end; } bool HugePageMemoryPool::is_in_bounds(void *ptr, size_t size) { - return is_buffer_within_bounds({initial_address_, total_capacity_}, {ptr, size}); + return is_buffer_within_bounds({initial_address_, total_capacity_}, + {ptr, size}); } -std::pair HugePageMemoryPool::get_page_boundaries(const void *ptr) { - if (!is_buffer_within_bounds({initial_address_, total_capacity_}, {const_cast(ptr), 0})) { - std::ostringstream err; - err << "The Provided address " << static_cast(ptr) - << " is not within the bounds of the HugePageMemoryPool"; - throw std::runtime_error(err.str()); - } - - auto initial = reinterpret_cast(const_cast(initial_address_)); - auto buf = reinterpret_cast(const_cast(ptr)); - auto n_th_page = (buf - initial) / PAGE_SIZE; - return std::make_pair(static_cast(initial + n_th_page * PAGE_SIZE), PAGE_SIZE); +std::pair +HugePageMemoryPool::get_page_boundaries(const void *ptr) { + if (!is_buffer_within_bounds({initial_address_, 
total_capacity_}, + {const_cast(ptr), 0})) { + std::ostringstream err; + err << "The Provided address " << static_cast(ptr) + << " is not within the bounds of the HugePageMemoryPool"; + throw std::runtime_error(err.str()); + } + + auto initial = + reinterpret_cast(const_cast(initial_address_)); + auto buf = reinterpret_cast(const_cast(ptr)); + auto n_th_page = (buf - initial) / PAGE_SIZE; + return std::make_pair(static_cast(initial + n_th_page * PAGE_SIZE), + PAGE_SIZE); } Status HugePageMemoryPool::allocate(size_t size, size_t alignment, void **out) { - if (size == 0) { - *out = ZeroSizePointer; - } else { - // Ensure the alignment is a power of two. See: https://stackoverflow.com/a/108360/5589776 - assert((alignment & (alignment - 1)) == 0); - auto tc = get_tcache_id_for_calling_thread(); - *out = je_mallocx( - size, MALLOCX_ALIGN(alignment) | MALLOCX_ARENA(arena_index) | MALLOCX_TCACHE(tc)); - if (*out == nullptr) { - return Status(StatusCode::OutOfMemory, - "HugePageMemoryPool is unable to allocate memory!"); - } - total_bytes_allocated_ += size; - bytes_allocated_ += size; - } - num_allocs_ += 1; - - return Status::OK(); -} - -Status HugePageMemoryPool::reallocate(size_t old_size, size_t new_size, size_t alignment, void **ptr) { - // We want to allocate from an existing zero allocation - if (*ptr == ZeroSizePointer) { - assert(old_size == 0); - return allocate(new_size, alignment, ptr); - } - // We want to decrease the new size to 0 - if (new_size == 0) { - free(*ptr, old_size, alignment); - *ptr = ZeroSizePointer; - return Status::OK(); - } - - // Ensure the alignment is a power of two. See: https://stackoverflow.com/a/108360/5589776 + if (size == 0) { + *out = ZeroSizePointer; + } else { + // Ensure the alignment is a power of two. 
See: + // https://stackoverflow.com/a/108360/5589776 assert((alignment & (alignment - 1)) == 0); auto tc = get_tcache_id_for_calling_thread(); - // Normal Re-allocation with jemalloc (which cannot handle size = 0) - *ptr = je_rallocx(*ptr, new_size, - MALLOCX_ALIGN(alignment) | MALLOCX_ARENA(arena_index) | MALLOCX_TCACHE(tc)); - - if (*ptr == nullptr) { - return Status(StatusCode::OutOfMemory, - "HugePageMemoryPool could not Reallocate as requested!"); + *out = + je_mallocx(size, MALLOCX_ALIGN(alignment) | MALLOCX_ARENA(arena_index) | + MALLOCX_TCACHE(tc)); + if (*out == nullptr) { + return Status(StatusCode::OutOfMemory, + "HugePageMemoryPool is unable to allocate memory!"); } + total_bytes_allocated_ += size; + bytes_allocated_ += size; + } + num_allocs_ += 1; - auto n_new_bytes = (new_size - old_size); - if (n_new_bytes >= 0) { - total_bytes_allocated_ += n_new_bytes; - } - bytes_allocated_ += n_new_bytes; + return Status::OK(); +} +Status HugePageMemoryPool::reallocate(size_t old_size, size_t new_size, + size_t alignment, void **ptr) { + // We want to allocate from an existing zero allocation + if (*ptr == ZeroSizePointer) { + assert(old_size == 0); + return allocate(new_size, alignment, ptr); + } + // We want to decrease the new size to 0 + if (new_size == 0) { + free(*ptr, old_size, alignment); + *ptr = ZeroSizePointer; return Status::OK(); + } + + // Ensure the alignment is a power of two. 
See: + // https://stackoverflow.com/a/108360/5589776 + assert((alignment & (alignment - 1)) == 0); + auto tc = get_tcache_id_for_calling_thread(); + // Normal Re-allocation with jemalloc (which cannot handle size = 0) + *ptr = je_rallocx(*ptr, new_size, + MALLOCX_ALIGN(alignment) | MALLOCX_ARENA(arena_index) | + MALLOCX_TCACHE(tc)); + + if (*ptr == nullptr) { + return Status(StatusCode::OutOfMemory, + "HugePageMemoryPool could not Reallocate as requested!"); + } + + auto n_new_bytes = (new_size - old_size); + if (n_new_bytes >= 0) { + total_bytes_allocated_ += n_new_bytes; + } + bytes_allocated_ += n_new_bytes; + + return Status::OK(); } void HugePageMemoryPool::free(void *ptr, size_t size, size_t alignment) { - if (ptr == ZeroSizePointer) { - assert(size == 0); - } else { - bytes_allocated_ -= size; - num_allocs_ -= 1; - auto tc = get_tcache_id_for_calling_thread(); - je_dallocx(ptr, MALLOCX_ARENA(arena_index) | MALLOCX_TCACHE(tc)); - } + if (ptr == ZeroSizePointer) { + assert(size == 0); + } else { + bytes_allocated_ -= size; + num_allocs_ -= 1; + auto tc = get_tcache_id_for_calling_thread(); + je_dallocx(ptr, MALLOCX_ARENA(arena_index) | MALLOCX_TCACHE(tc)); + } } -void *HugePageMemoryPool::huge_page_alloc(extent_hooks_t *hooks, - void *new_addr, - size_t size, - size_t alignment, - bool *zero, - bool *commit, +void *HugePageMemoryPool::huge_page_alloc(extent_hooks_t *hooks, void *new_addr, + size_t size, size_t alignment, + bool *zero, bool *commit, unsigned arena_ind) { - // When new_addr is != null, the man page says to return new_addr. - // -> Unclear what the intended behavior is (not documented) - // -> Ensure it is never NULL since we don't handle that case... 
- assert(new_addr == nullptr); - - allocations_mutex_.lock(); - - // Check if there is enough space to fit the requested size with alignment - auto aligned_address = std::align( - alignment, - size, - next_free_addr, - // Note: std::align decreases remaining_capacity automatically by the alignment bytes! - remaining_capacity); - - // There was not enough space remaining - if (aligned_address == nullptr) { - allocations_mutex_.unlock(); - std::cerr << "HugePageMemoryPool: Not enough huge page memory remaining to satisfy " - "jemalloc request over " - << size << " bytes. Please add additional 1GiB huge pages to your system." - << std::endl; - return nullptr; - } - - // There was enough space: Update values - remaining_capacity -= size; - next_free_addr = static_cast(static_cast(aligned_address) + size); - - // MAP_ANONYMOUS always ensures zero-ing of the memory and only returned commit memory - *zero = true; - *commit = true; + // When new_addr is != null, the man page says to return new_addr. + // -> Unclear what the intended behavior is (not documented) + // -> Ensure it is never NULL since we don't handle that case... + assert(new_addr == nullptr); + + allocations_mutex_.lock(); + + // Check if there is enough space to fit the requested size with alignment + auto aligned_address = + std::align(alignment, size, next_free_addr, + // Note: std::align decreases remaining_capacity automatically + // by the alignment bytes! + remaining_capacity); + + // There was not enough space remaining + if (aligned_address == nullptr) { allocations_mutex_.unlock(); - return aligned_address; + std::cerr << "HugePageMemoryPool: Not enough huge page memory remaining to " + "satisfy " + "jemalloc request over " + << size + << " bytes. Please add additional 1GiB huge pages to your system." 
+ << std::endl; + return nullptr; + } + + // There was enough space: Update values + remaining_capacity -= size; + next_free_addr = + static_cast(static_cast(aligned_address) + size); + + // MAP_ANONYMOUS always ensures zero-ing of the memory and only returned + // commit memory + *zero = true; + *commit = true; + allocations_mutex_.unlock(); + return aligned_address; } -bool HugePageMemoryPool::huge_page_dealloc( - extent_hooks_t *hooks, void *addr, size_t size, bool committed, unsigned arena_ind) { - // True = opt out from deallocation and retain the memory for future use - return true; +bool HugePageMemoryPool::huge_page_dealloc(extent_hooks_t *hooks, void *addr, + size_t size, bool committed, + unsigned arena_ind) { + // True = opt out from deallocation and retain the memory for future use + return true; } -bool HugePageMemoryPool::huge_page_decommit(extent_hooks_t *hooks, - void *addr, - size_t size, - size_t offset, - size_t length, - unsigned arena_ind) { - // True = Opt out from decommit - return true; +bool HugePageMemoryPool::huge_page_decommit(extent_hooks_t *hooks, void *addr, + size_t size, size_t offset, + size_t length, unsigned arena_ind) { + // True = Opt out from decommit + return true; } bool HugePageMemoryPool::huge_page_split_extend(extent_hooks_t *hooks, - void *addr, - size_t size, - size_t size_a, - size_t size_b, + void *addr, size_t size, + size_t size_a, size_t size_b, bool committed, unsigned arena_ind) { - // False = Pages are successfully splitted - // -> We don't really need to do anything, everything remains in one big chunk. - return false; + // False = Pages are successfully splitted + // -> We don't really need to do anything, everything remains in one big + // chunk. 
+ return false; } bool HugePageMemoryPool::huge_page_merge_extend(extent_hooks_t *hooks, - void *addr_a, - size_t size_a, - void *addr_b, - size_t size_b, + void *addr_a, size_t size_a, + void *addr_b, size_t size_b, bool committed, unsigned arena_ind) { - // False = Successfully merged extends - // Since all extends given to jemalloc are already continuous, we can always return false - return false; + // False = Successfully merged extends + // Since all extends given to jemalloc are already continuous, we can always + // return false + return false; } // ---------------------------------------------------------------------------- -// SimpleMemoryPool implementation +// LinearAllocator implementation // ---------------------------------------------------------------------------- -SimpleMemoryPool::~SimpleMemoryPool() { - std::lock_guard lock(allocated_buffers_mutex); - while (!allocated_buffers.empty()) { - auto &[ptr, size] = *allocated_buffers.begin(); - free(ptr, size); - } +LinearAllocator::LinearAllocator(size_t size) : size_(size), offset_(0) { + buffer_ = static_cast( + std::aligned_alloc(HugePageMemoryPool::DEFAULT_ALIGNMENT, size)); + if (!buffer_) + throw std::bad_alloc(); } -Status SimpleMemoryPool::allocate(size_t size, size_t alignment, void **out) { - if (size == 0) { - *out = nullptr; - } else { - // Ensure the alignment is a power of two. 
See: https://stackoverflow.com/a/108360/5589776 - assert((alignment & (alignment - 1)) == 0); - *out = std::aligned_alloc(alignment, size); - if (!out) { - return Status(StatusCode::OutOfMemory, "Unable to allocate memory!"); - } - - total_bytes_allocated_ += size; - bytes_allocated_ += size; - } +LinearAllocator::~LinearAllocator() { std::free(buffer_); } - { - std::lock_guard lock(allocated_buffers_mutex); - allocated_buffers.emplace(*out, size); - } - num_allocs_++; +bool LinearAllocator::allocate(size_t size, size_t alignment, void **out) { + size_t aligned = (offset_ + alignment - 1) & ~(alignment - 1); + if (aligned + size > size_) + return false; - return Status::OK(); + *out = buffer_ + aligned; + offset_ = aligned + size; + return true; } -Status SimpleMemoryPool::reallocate(size_t old_size, size_t new_size, size_t alignment, void **ptr) { - if (new_size == 0) { - free(*ptr, old_size, alignment); - *ptr = nullptr; - return Status::OK(); - } +void LinearAllocator::free(void *) {} - void* new_ptr = nullptr; - auto status = allocate(new_size, alignment, &new_ptr); - if (!status.ok()) { - return status; - } +// ---------------------------------------------------------------------------- +// SimpleMemoryPool implementation +// ---------------------------------------------------------------------------- - if (*ptr) { - std::memcpy(new_ptr, *ptr, std::min(old_size, new_size)); - free(*ptr, old_size, alignment); - } +constexpr size_t SIMPLE_ALLOCATOR_SIZE = 1 << 30; // 1GiB - auto n_new_bytes = (new_size - old_size); - if (n_new_bytes >= 0) { - total_bytes_allocated_ += n_new_bytes; +SimpleMemoryPool::SimpleMemoryPool() + : linear_allocator_(SIMPLE_ALLOCATOR_SIZE) {} + +SimpleMemoryPool::~SimpleMemoryPool() { + std::lock_guard lock(allocated_buffers_mutex); + while (!allocated_buffers.empty()) { + auto &[ptr, size] = *allocated_buffers.begin(); + free(ptr, size); + } +} + +Status SimpleMemoryPool::allocate(size_t size, size_t alignment, void **out) { + if (size == 
0) { + *out = nullptr; + } else { + // Ensure the alignment is a power of two. See: + // https://stackoverflow.com/a/108360/5589776 + assert((alignment & (alignment - 1)) == 0); + if (!linear_allocator_.allocate(size, alignment, out)) { + return Status(StatusCode::OutOfMemory, "Unable to allocate memory!"); } - bytes_allocated_ += n_new_bytes; - *ptr = new_ptr; + total_bytes_allocated_ += size; + bytes_allocated_ += size; + } + + { + std::lock_guard lock(allocated_buffers_mutex); + allocated_buffers.emplace(*out, size); + } + num_allocs_++; + + return Status::OK(); +} + +Status SimpleMemoryPool::reallocate(size_t old_size, size_t new_size, + size_t alignment, void **ptr) { + if (new_size == 0) { + free(*ptr, old_size, alignment); + *ptr = nullptr; return Status::OK(); + } + + void *new_ptr = nullptr; + auto status = allocate(new_size, alignment, &new_ptr); + if (!status.ok()) { + return status; + } + + if (*ptr) { + std::memcpy(new_ptr, *ptr, std::min(old_size, new_size)); + free(*ptr, old_size, alignment); + } + + auto n_new_bytes = (new_size - old_size); + if (n_new_bytes >= 0) { + total_bytes_allocated_ += n_new_bytes; + } + bytes_allocated_ += n_new_bytes; + + *ptr = new_ptr; + return Status::OK(); } void SimpleMemoryPool::free(void *ptr, size_t size, size_t alignment) { - if (ptr) { - bytes_allocated_ -= size; - num_allocs_ -= 1; - std::free(ptr); + if (ptr) { + bytes_allocated_ -= size; + num_allocs_ -= 1; + linear_allocator_.free(ptr); - std::lock_guard lock(allocated_buffers_mutex); - allocated_buffers.erase(ptr); - } + std::lock_guard lock(allocated_buffers_mutex); + allocated_buffers.erase(ptr); + } } -std::pair SimpleMemoryPool::get_page_boundaries(const void *ptr) { - std::lock_guard lock(allocated_buffers_mutex); +std::pair +SimpleMemoryPool::get_page_boundaries(const void *ptr) { + std::lock_guard lock(allocated_buffers_mutex); - for (const auto &buffer : allocated_buffers) { - if (is_buffer_within_bounds({buffer.first, buffer.second}, 
{const_cast(ptr), 0})) { - return {buffer.first, buffer.second}; - } + for (const auto &buffer : allocated_buffers) { + if (is_buffer_within_bounds({buffer.first, buffer.second}, + {const_cast(ptr), 0})) { + return {buffer.first, buffer.second}; } + } - std::ostringstream err; - err << "The Provided address " << ptr << " is not within the bounds of the memory pool!"; - throw std::runtime_error(err.str()); + std::ostringstream err; + err << "The Provided address " << ptr + << " is not within the bounds of the memory pool!"; + throw std::runtime_error(err.str()); } -} // namespace libstf +} // namespace libstf diff --git a/software/libstf/memory_pool.hpp b/software/libstf/memory_pool.hpp index e1e67c1..b3e181e 100644 --- a/software/libstf/memory_pool.hpp +++ b/software/libstf/memory_pool.hpp @@ -4,13 +4,13 @@ #include #include +#include #include #include -#include -#include -#include #include +#include #include +#include #include "jemalloc/jemalloc.h" @@ -27,255 +27,297 @@ namespace libstf { */ class MemoryPool { public: - virtual ~MemoryPool() = default; - - /** - * Allocates a memory block of at least the specified size. - * - * @param size The minimum number of bytes to allocate. - * @param alignment The required alignment for the memory block. - * @param out Pointer to store the address of the allocated memory. - * @return Status indicating success or failure. - */ - virtual Status allocate(size_t size, size_t alignment, void **out) = 0; - virtual Status allocate(size_t size, void **out) = 0; - - /** - * Resizes an existing allocated memory block. - * - * Since most platform allocators do not support aligned reallocation, - * this operation may involve copying the data to a new memory block. - * @param old_size The current size of the allocated memory block. - * @param new_size The desired new size of the memory block. - * @param alignment The alignment requirement of the memory block. - * @param ptr Pointer to the memory block to be resized. Updated on success. 
- * @return Status indicating success or failure. - */ - virtual Status reallocate(size_t old_size, size_t new_size, size_t alignment, void **ptr) = 0; - - /** - * Frees a previously allocated memory block. - * - * @param ptr Pointer to the start of the allocated memory block. - * @param size The size of the allocated memory block. - * Some allocators may use this for tracking memory usage or - * optimizing deallocation. - * @param alignment The alignment of the memory block. - */ - virtual void free(void *ptr, size_t size, size_t alignment) = 0; - virtual void free(void *ptr, size_t size) = 0; - - /** - * Returns the address and size for the page in which the given allocation was placed. - * This information can be used, e.g. for TLB mappings on FPGAs. - * @param ptr The address where the allocation begins, as returned by 'allocate' - * @return A pair of: Start address and size of the allocated page - */ - virtual std::pair get_page_boundaries(const void *ptr) = 0; - - /** - * Retrieves the current amount of allocated memory that has not been freed. - * - * @return The number of bytes currently allocated. - */ - virtual size_t bytes_allocated() const = 0; - - /** - * Retrieves the total amount of memory allocated since the pool's creation. - * - * @return The cumulative number of bytes allocated. - */ - virtual size_t total_bytes_allocated() const = 0; - - /** - * Retrieves the total number of allocation and reallocation requests. - * - * @return The number of times memory has been allocated or reallocated. - */ - virtual size_t num_allocations() const = 0; - - /** - * Retrieves the peak memory usage recorded by this memory pool. - * - * @return The highest number of bytes allocated at any point. - * Returns -1 if tracking is not implemented. - */ - virtual size_t max_memory() const = 0; - - /** - * Retrieves the name of the memory allocation backend in use. - * - * @return A string representing the backend (e.g., "system", "jemalloc"). 
- */ - virtual std::string backend_name() const = 0; + virtual ~MemoryPool() = default; + + /** + * Allocates a memory block of at least the specified size. + * + * @param size The minimum number of bytes to allocate. + * @param alignment The required alignment for the memory block. + * @param out Pointer to store the address of the allocated memory. + * @return Status indicating success or failure. + */ + virtual Status allocate(size_t size, size_t alignment, void **out) = 0; + virtual Status allocate(size_t size, void **out) = 0; + + /** + * Resizes an existing allocated memory block. + * + * Since most platform allocators do not support aligned reallocation, + * this operation may involve copying the data to a new memory block. + * @param old_size The current size of the allocated memory block. + * @param new_size The desired new size of the memory block. + * @param alignment The alignment requirement of the memory block. + * @param ptr Pointer to the memory block to be resized. Updated on success. + * @return Status indicating success or failure. + */ + virtual Status reallocate(size_t old_size, size_t new_size, size_t alignment, + void **ptr) = 0; + + /** + * Frees a previously allocated memory block. + * + * @param ptr Pointer to the start of the allocated memory block. + * @param size The size of the allocated memory block. + * Some allocators may use this for tracking memory usage or + * optimizing deallocation. + * @param alignment The alignment of the memory block. + */ + virtual void free(void *ptr, size_t size, size_t alignment) = 0; + virtual void free(void *ptr, size_t size) = 0; + + /** + * Returns the address and size for the page in which the given allocation was + * placed. This information can be used, e.g. for TLB mappings on FPGAs. 
+ * @param ptr The address where the allocation begins, as returned by + * 'allocate' + * @return A pair of: Start address and size of the allocated page + */ + virtual std::pair get_page_boundaries(const void *ptr) = 0; + + /** + * Retrieves the current amount of allocated memory that has not been freed. + * + * @return The number of bytes currently allocated. + */ + virtual size_t bytes_allocated() const = 0; + + /** + * Retrieves the total amount of memory allocated since the pool's creation. + * + * @return The cumulative number of bytes allocated. + */ + virtual size_t total_bytes_allocated() const = 0; + + /** + * Retrieves the total number of allocation and reallocation requests. + * + * @return The number of times memory has been allocated or reallocated. + */ + virtual size_t num_allocations() const = 0; + + /** + * Retrieves the peak memory usage recorded by this memory pool. + * + * @return The highest number of bytes allocated at any point. + * Returns -1 if tracking is not implemented. + */ + virtual size_t max_memory() const = 0; + + /** + * Retrieves the name of the memory allocation backend in use. + * + * @return A string representing the backend (e.g., "system", "jemalloc"). + */ + virtual std::string backend_name() const = 0; }; -// A static piece of memory for 0-size allocations, to return an aligned non-null pointer. This is -// required because Arrow memory pools (when we use this in other projects) need to support 0-byte -// allocations, reallocations, and deallocations but jemalloc does not support them! +// A static piece of memory for 0-size allocations, to return an aligned +// non-null pointer. This is required because Arrow memory pools (when we use +// this in other projects) need to support 0-byte allocations, reallocations, +// and deallocations but jemalloc does not support them! 
extern int64_t zero_size_data[1]; static void *const ZeroSizePointer = reinterpret_cast(&zero_size_data); /** * This class implements a MemoryPool that uses 1GiB huge pages. - * This is required for the FPGA support since all the data send/received from the FPGA - * needs to be mapped on the FPGAs TLB. Additionally, every TLB miss causes an FPGA-side interrupt - * and is handled in Coyotes kernel code. As small pages lead to many TLB misses, this can cause - * performance problems due to the large number of interrupts. The goal of this pool is to minimize - * such misses by using huge pages. + * This is required for the FPGA support since all the data sent/received from + * the FPGA needs to be mapped on the FPGA's TLB. Additionally, every TLB miss + * causes an FPGA-side interrupt and is handled in Coyote's kernel code. As small + * pages lead to many TLB misses, this can cause performance problems due to the + * large number of interrupts. The goal of this pool is to minimize such misses + * by using huge pages. * - * The pool is implemented as follows: During initialization, it pre-allocates all available 1GiB - * pages in the system. Under the hood it uses jemalloc to handle the actual - * allocation/free requests. Jemalloc uses smart (and very complex) mechanisms to minimize - * fragmentation. However, whenever it runs out of memory, it asks this pool via extension_hooks, - * which provides a chunk of the pre-allocated memory. The pre-allocated 1GiB pages will be - * unmaped/freed when the MemoryPool is destroyed. + * The pool is implemented as follows: During initialization, it pre-allocates + * all available 1GiB pages in the system. Under the hood it uses jemalloc to + * handle the actual allocation/free requests. Jemalloc uses smart (and very + * complex) mechanisms to minimize fragmentation. However, whenever it runs out + * of memory, it asks this pool via extension_hooks, which provides a chunk of + * the pre-allocated memory.
The pre-allocated 1GiB pages will be unmapped/freed + when the MemoryPool is destroyed. */ class HugePageMemoryPool : public MemoryPool { public: - // The size of the pages to use. These are 1GiB huge pages by default - static inline const size_t HUGE_PAGE_BITS = 30; - static inline const size_t PAGE_SIZE = 1 << HUGE_PAGE_BITS; - static inline const size_t PAGE_SIZE_KB = PAGE_SIZE / 1024; - static inline const size_t HUGE_PAGE_TYPE = (HUGE_PAGE_BITS << MAP_HUGE_SHIFT); - - // Note: This alignment is required by Coyote anyway - static inline const size_t DEFAULT_ALIGNMENT = 64; - - HugePageMemoryPool(); - ~HugePageMemoryPool(); + // The size of the pages to use. These are 1GiB huge pages by default + static inline const size_t HUGE_PAGE_BITS = 30; + static inline const size_t PAGE_SIZE = 1 << HUGE_PAGE_BITS; + static inline const size_t PAGE_SIZE_KB = PAGE_SIZE / 1024; + static inline const size_t HUGE_PAGE_TYPE = + (HUGE_PAGE_BITS << MAP_HUGE_SHIFT); + + // Note: This alignment is required by Coyote anyway + static inline const size_t DEFAULT_ALIGNMENT = 64; + + HugePageMemoryPool(); + ~HugePageMemoryPool(); + + Status allocate(size_t raw_size, size_t alignment, void **out) override; + Status allocate(size_t size, void **out) override { + return allocate(size, DEFAULT_ALIGNMENT, out); + } + + Status reallocate(size_t old_size, size_t new_size, size_t alignment, + void **ptr) override; + + void free(void *ptr, size_t size, size_t alignment) override; + void free(void *ptr, size_t size) override { + free(ptr, size, DEFAULT_ALIGNMENT); + } + + std::pair get_page_boundaries(const void *ptr) override; + + /** + * @param ptr The start address of the buffer to check + * @param size The size of the buffer to check + * @return Whether the given buffer and size have been allocated by this + * memory pool + */ + bool is_in_bounds(void *ptr, size_t size); + + size_t bytes_allocated() const override { return bytes_allocated_.load(); } + size_t total_bytes_allocated() const
override { + return total_bytes_allocated_.load(); + } + size_t num_allocations() const override { return num_allocs_.load(); } + size_t max_memory() const override { + return std::numeric_limits::max(); + } + std::string backend_name() const override { return "HugePageMemoryPool"; } + + void *initial_address() const { return initial_address_; } + size_t total_capacity() const { return total_capacity_; } - Status allocate(size_t raw_size, size_t alignment, void **out) override; - Status allocate(size_t size, void **out) override { return allocate(size, DEFAULT_ALIGNMENT, out); } - - Status reallocate(size_t old_size, size_t new_size, size_t alignment, void **ptr) override; - - void free(void *ptr, size_t size, size_t alignment) override; - void free(void *ptr, size_t size) override { free(ptr, size, DEFAULT_ALIGNMENT); } - - std::pair get_page_boundaries(const void *ptr) override; - - /** - * @param ptr The start address of the buffer to check - * @param size The size of the buffer to check - * @return Whether the given buffer and size have been allocated by this memory pool - */ - bool is_in_bounds(void *ptr, size_t size); +private: + /** + * @return The number of huge pages with PAGE_SIZE currently available in the + * system. 
Ensures this number is > 0 + */ + int get_number_of_available_huge_pages(); + + // Call back functions that get called by jemalloc to manage the underlying + // memory + static void *huge_page_alloc(extent_hooks_t *hooks, void *new_addr, + size_t size, size_t alignment, bool *zero, + bool *commit, unsigned arena_ind); + + static bool huge_page_dealloc(extent_hooks_t *hooks, void *addr, size_t size, + bool committed, unsigned arena_ind); + + static bool huge_page_decommit(extent_hooks_t *hooks, void *addr, size_t size, + size_t offset, size_t length, + unsigned arena_ind); + + static bool huge_page_split_extend(extent_hooks_t *hooks, void *addr, + size_t size, size_t size_a, size_t size_b, + bool committed, unsigned arena_ind); + + static bool huge_page_merge_extend(extent_hooks_t *hooks, void *addr_a, + size_t size_a, void *addr_b, size_t size_b, + bool committed, unsigned arena_ind); + + // Explicit management of thread caches (or tcaches) + unsigned get_tcache_id_for_calling_thread(); + ReaderWriterLock tcache_lock; + std::unordered_map tcache_ids; + + // The index of the area we allocate + unsigned arena_index; + // The struct of hooks we use for the allocation + // Needs to be static to ensure it lives long enough + static inline extent_hooks_t hugepage_hooks = { + huge_page_alloc, // alloc + huge_page_dealloc, // dalloc + nullptr, // destroy + nullptr, // commit + huge_page_decommit, // decommit + nullptr, // purge_lazy + nullptr, // purge_forced + huge_page_split_extend, // split + huge_page_merge_extend // merge + }; + + // The initial address of the memory we allocate + // -> Needed for the de-allocation + void *initial_address_; + // Total capacity of allocated memory + size_t total_capacity_; + // Atomics for the statistics + std::atomic total_bytes_allocated_{0}; + std::atomic bytes_allocated_{0}; + std::atomic num_allocs_{0}; + // How much capacity is remaining in the allocated huge page memory + static inline size_t remaining_capacity = 0; + // The 
next free address that can be returned to jemalloc + static inline void *next_free_addr = nullptr; + static inline std::mutex allocations_mutex_; +}; - size_t bytes_allocated() const override { return bytes_allocated_.load(); } - size_t total_bytes_allocated() const override { return total_bytes_allocated_.load(); } - size_t num_allocations() const override { return num_allocs_.load(); } - size_t max_memory() const override { return std::numeric_limits::max(); } - std::string backend_name() const override { return "HugePageMemoryPool"; } +class LinearAllocator { +public: + explicit LinearAllocator(size_t size); + ~LinearAllocator(); - void *initial_address() const { return initial_address_; } - size_t total_capacity() const { return total_capacity_; } + bool allocate(size_t size, size_t alignment, void **out); + void free(void *); private: - /** - * @return The number of huge pages with PAGE_SIZE currently available in the system. - * Ensures this number is > 0 - */ - int get_number_of_available_huge_pages(); - - // Call back functions that get called by jemalloc to manage the underlying memory - static void* huge_page_alloc(extent_hooks_t *hooks, - void *new_addr, - size_t size, - size_t alignment, - bool *zero, - bool *commit, - unsigned arena_ind); - - static bool huge_page_dealloc( - extent_hooks_t *hooks, void *addr, size_t size, bool committed, unsigned arena_ind); - - static bool huge_page_decommit( - extent_hooks_t *hooks, void *addr, size_t size, size_t offset, size_t length, unsigned arena_ind); - - static bool huge_page_split_extend( - extent_hooks_t *hooks, void* addr, size_t size, size_t size_a, size_t size_b, bool committed, unsigned arena_ind); - - static bool huge_page_merge_extend( - extent_hooks_t *hooks, void *addr_a, size_t size_a, void *addr_b, size_t size_b, bool committed, unsigned arena_ind); - - // Explicit management of thread caches (or tcaches) - unsigned get_tcache_id_for_calling_thread(); - ReaderWriterLock tcache_lock; - 
std::unordered_map tcache_ids; - - // The index of the area we allocate - unsigned arena_index; - // The struct of hooks we use for the allocation - // Needs to be static to ensure it lives long enough - static inline extent_hooks_t hugepage_hooks = { - huge_page_alloc, // alloc - huge_page_dealloc, // dalloc - nullptr, // destroy - nullptr, // commit - huge_page_decommit, // decommit - nullptr, // purge_lazy - nullptr, // purge_forced - huge_page_split_extend, // split - huge_page_merge_extend // merge - }; - - // The initial address of the memory we allocate - // -> Needed for the de-allocation - void *initial_address_; - // Total capacity of allocated memory - size_t total_capacity_; - // Atomics for the statistics - std::atomic total_bytes_allocated_{0}; - std::atomic bytes_allocated_{0}; - std::atomic num_allocs_{0}; - // How much capacity is remaining in the allocated huge page memory - static inline size_t remaining_capacity = 0; - // The next free address that can be returned to jemalloc - static inline void *next_free_addr = nullptr; - static inline std::mutex allocations_mutex_; + uint8_t *buffer_; + size_t size_; + size_t offset_; }; /** - * Implements a naive memory pool that is only used for simulation purposes in systems where there - * are no huge pages. + * Implements a naive memory pool that is only used for simulation purposes in + * systems where there are no huge pages. 
*/ class SimpleMemoryPool : public MemoryPool { public: - static inline const size_t PAGE_SIZE = 4096; + static inline const size_t PAGE_SIZE = 4096; - // Note: This alignment is required by Coyote anyway - static inline const size_t DEFAULT_ALIGNMENT = 64; + // Note: This alignment is required by Coyote anyway + static inline const size_t DEFAULT_ALIGNMENT = 64; - SimpleMemoryPool() = default; - ~SimpleMemoryPool() override; + SimpleMemoryPool(); + ~SimpleMemoryPool() override; - Status allocate(size_t size, size_t alignment, void **out) override; - Status allocate(size_t size, void **out) override { return allocate(size, DEFAULT_ALIGNMENT, out); }; + Status allocate(size_t size, size_t alignment, void **out) override; + Status allocate(size_t size, void **out) override { + return allocate(size, DEFAULT_ALIGNMENT, out); + }; - Status reallocate(size_t old_size, size_t new_size, size_t alignment, void **ptr) override; + Status reallocate(size_t old_size, size_t new_size, size_t alignment, + void **ptr) override; - void free(void *ptr, size_t size, size_t alignment) override; - void free(void *ptr, size_t size) override { free(ptr, size, DEFAULT_ALIGNMENT); }; + void free(void *ptr, size_t size, size_t alignment) override; + void free(void *ptr, size_t size) override { + free(ptr, size, DEFAULT_ALIGNMENT); + }; - std::pair get_page_boundaries(const void *ptr) override; + std::pair get_page_boundaries(const void *ptr) override; - size_t bytes_allocated() const override { return bytes_allocated_.load(); }; - size_t total_bytes_allocated() const override { return total_bytes_allocated_.load(); }; - size_t num_allocations() const override { return num_allocs_.load(); }; - size_t max_memory() const override { return std::numeric_limits::max(); }; - std::string backend_name() const override { return "SimpleMemoryPool"; }; + size_t bytes_allocated() const override { return bytes_allocated_.load(); }; + size_t total_bytes_allocated() const override { + return 
total_bytes_allocated_.load(); + }; + size_t num_allocations() const override { return num_allocs_.load(); }; + size_t max_memory() const override { + return std::numeric_limits::max(); + }; + std::string backend_name() const override { return "SimpleMemoryPool"; }; private: - // A map of all the pages that have been allocated and mapped for this thread - std::unordered_map allocated_buffers; - // Recursive mutex to protect allocated_buffers (recursive to allow destructor to call free()) - mutable std::recursive_mutex allocated_buffers_mutex; - - // Atomics for the statistics - std::atomic total_bytes_allocated_{0}; - std::atomic bytes_allocated_{0}; - std::atomic num_allocs_{0}; + // A map of all the pages that have been allocated and mapped for this thread + std::unordered_map allocated_buffers; + // Recursive mutex to protect allocated_buffers (recursive to allow destructor + // to call free()) + mutable std::recursive_mutex allocated_buffers_mutex; + + // Atomics for the statistics + std::atomic total_bytes_allocated_{0}; + std::atomic bytes_allocated_{0}; + std::atomic num_allocs_{0}; + + LinearAllocator linear_allocator_; }; -} // namespace libstf +} // namespace libstf diff --git a/software/libstf/output_handle.cpp b/software/libstf/output_handle.cpp index 21dafa9..0fae867 100644 --- a/software/libstf/output_handle.cpp +++ b/software/libstf/output_handle.cpp @@ -15,11 +15,22 @@ void OutputHandle::push_buffer(stream_t stream_id, Buffer buffer) { } void OutputHandle::mark_done(stream_t stream_id) { - std::lock_guard guard(output_buffers_mutex); + if (callback_thread != std::nullopt) { + callback_thread->join(); + callback_thread = std::nullopt; + } - finished_streams.set(stream_id); + { + std::lock_guard guard(output_buffers_mutex); - output_buffers_cv.notify_all(); + finished_streams.set(stream_id); + + output_buffers_cv.notify_all(); + } + + if (callback != nullptr) { + callback_thread = std::thread(callback, stream_id); + } } // 
---------------------------------------------------------------------------- @@ -28,6 +39,11 @@ void OutputHandle::mark_done(stream_t stream_id) { OutputHandle::~OutputHandle() { std::lock_guard guard(output_buffers_mutex); + if (callback_thread != std::nullopt) { + callback_thread->join(); + callback_thread = std::nullopt; + } + // Free any memory that has not been taken by users for (auto &queue : output_buffers) { while (!queue.empty()) { @@ -118,6 +134,17 @@ std::shared_ptr OutputHandle::get_next_stream_output(stream_t stream_id) { } } +void OutputHandle::add_callback(std::function callback) { + auto previous_callback = this->callback; + this->callback = [previous_callback, callback](stream_t stream) { + if (previous_callback != nullptr) { + previous_callback(stream); + } + + callback(stream); + }; +} + // ---------------------------------------------------------------------------- // Private methods // ---------------------------------------------------------------------------- diff --git a/software/libstf/output_handle.hpp b/software/libstf/output_handle.hpp index 655d62c..0705cc1 100644 --- a/software/libstf/output_handle.hpp +++ b/software/libstf/output_handle.hpp @@ -2,10 +2,12 @@ #include #include +#include #include #include #include #include +#include #include #include @@ -82,9 +84,19 @@ class OutputHandle { */ std::shared_ptr get_next_stream_output(stream_t stream_id); + /** + * @param callback The callback function to call when a stream is marked as done. + * + * Adds a callback function which will be called when a stream is marked as done, + * and thus all output data for this transfer has been received on the host. + */ + void add_callback(std::function callback); + private: const stream_t NUM_STREAMS; std::shared_ptr memory_pool; + std::function callback; + std::optional callback_thread; stream_mask_t active_streams; // Streams active for this handle stream_mask_t finished_streams; // Streams that have received their last buffer