Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/internal_modules/roc_core/memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool MemoryLimiter::acquire(size_t num_bytes) {
}
cpu_relax();
} while (true);
roc_log(LogError,
roc_log(LogTrace,
"memory limiter (%s): could not acquire bytes due to limit: requested=%lu "
"acquired=%lu limit=%lu",
name_, (unsigned long)num_bytes, (unsigned long)current,
Expand Down
56 changes: 51 additions & 5 deletions src/internal_modules/roc_netio/target_libuv/roc_netio/udp_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ UdpPort::UdpPort(const UdpConfig& config,
, loop_(event_loop)
, handle_initialized_(false)
, write_sem_initialized_(false)
, recv_retry_timer_initialized_(false)
, multicast_group_joined_(false)
, recv_started_(false)
, want_close_(false)
Expand Down Expand Up @@ -200,11 +201,14 @@ void UdpPort::close_cb_(uv_handle_t* handle) {

if (handle == (uv_handle_t*)&self.handle_) {
self.handle_initialized_ = false;
} else if (handle == (uv_handle_t*)&self.recv_retry_timer_) {
self.recv_retry_timer_initialized_ = false;
} else {
self.write_sem_initialized_ = false;
}

if (self.handle_initialized_ || self.write_sem_initialized_) {
if (self.handle_initialized_ || self.write_sem_initialized_
|| self.recv_retry_timer_initialized_) {
return;
}

Expand All @@ -224,11 +228,8 @@ void UdpPort::alloc_cb_(uv_handle_t* handle, size_t size, uv_buf_t* buf) {

core::BufferPtr bp = self.packet_factory_.new_packet_buffer();
if (!bp) {
roc_log(LogError, "udp port: %s: can't allocate buffer", self.descriptor());

buf->base = NULL;
buf->len = 0;

return;
}

Expand All @@ -242,6 +243,25 @@ void UdpPort::alloc_cb_(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
buf->len = size;
}

void UdpPort::recv_retry_cb_(uv_timer_t* handle) {
roc_panic_if_not(handle);

UdpPort& self = *(UdpPort*)handle->data;

if (self.want_close_ || self.recv_started_) {
return;
}

roc_log(LogDebug, "udp port: %s: retrying recv after alloc pause", self.descriptor());

if (int err = uv_udp_recv_start(&self.handle_, alloc_cb_, recv_cb_)) {
roc_log(LogError, "udp port: %s: uv_udp_recv_start(): [%s] %s", self.descriptor(),
uv_err_name(err), uv_strerror(err));
return;
}
self.recv_started_ = true;
}

void UdpPort::recv_cb_(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf,
Expand All @@ -252,6 +272,26 @@ void UdpPort::recv_cb_(uv_udp_t* handle,

UdpPort& self = *(UdpPort*)handle->data;

// alloc_cb_() sets buf->base to NULL when buffer allocation fails.
// libuv still invokes recv_cb_() in this case; pause receiving to avoid
// busy-waiting and schedule a retry timer.
if (!buf->base) {
roc_log(LogDebug, "udp port: %s: can't allocate buffer, pausing recv",
self.descriptor());

uv_udp_recv_stop(&self.handle_);
self.recv_started_ = false;

if (!self.recv_retry_timer_initialized_) {
uv_timer_init(&self.loop_, &self.recv_retry_timer_);
self.recv_retry_timer_.data = &self;
self.recv_retry_timer_initialized_ = true;
}
uv_timer_start(&self.recv_retry_timer_, recv_retry_cb_, 200, 0);

return;
}

address::SocketAddr src_addr;
if (sockaddr) {
if (!src_addr.set_host_port_saddr(sockaddr)) {
Expand Down Expand Up @@ -480,7 +520,8 @@ bool UdpPort::try_nonblocking_write_(const packet::PacketPtr& pp) {
}

bool UdpPort::fully_closed_() const {
if (!handle_initialized_ && !write_sem_initialized_) {
if (!handle_initialized_ && !write_sem_initialized_
&& !recv_retry_timer_initialized_) {
return true;
}

Expand Down Expand Up @@ -510,6 +551,11 @@ void UdpPort::start_closing_() {
leave_multicast_group_();
}

if (recv_retry_timer_initialized_ && !uv_is_closing((uv_handle_t*)&recv_retry_timer_)) {
uv_timer_stop(&recv_retry_timer_);
uv_close((uv_handle_t*)&recv_retry_timer_, close_cb_);
}

if (handle_initialized_ && !uv_is_closing((uv_handle_t*)&handle_)) {
uv_close((uv_handle_t*)&handle_, close_cb_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class UdpPort : public BasicPort, private packet::IWriter {
const sockaddr* addr,
unsigned flags);

static void recv_retry_cb_(uv_timer_t* handle);

static void write_sem_cb_(uv_async_t* handle);
static void send_cb_(uv_udp_send_t* req, int status);

Expand Down Expand Up @@ -142,6 +144,9 @@ class UdpPort : public BasicPort, private packet::IWriter {
uv_async_t write_sem_;
bool write_sem_initialized_;

uv_timer_t recv_retry_timer_;
bool recv_retry_timer_initialized_;

bool multicast_group_joined_;
bool recv_started_;
bool want_close_;
Expand Down
18 changes: 13 additions & 5 deletions src/internal_modules/roc_node/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@ namespace node {

Context::Context(const ContextConfig& config, core::IArena& arena)
: arena_(arena)
, packet_mem_limiter_("packet_mem", config.max_packet_pool_bytes)
, frame_mem_limiter_("frame_mem", config.max_frame_pool_bytes)
, packet_pool_("packet_pool", arena_)
, packet_buffer_pool_(
"packet_buffer_pool", arena_, sizeof(core::Buffer) + config.max_packet_size)
, frame_buffer_pool_(
"frame_buffer_pool", arena_, sizeof(core::Buffer) + config.max_frame_size)
, limited_packet_pool_(packet_pool_, packet_mem_limiter_)
, limited_packet_buffer_pool_(packet_buffer_pool_, packet_mem_limiter_)
, limited_frame_buffer_pool_(frame_buffer_pool_, frame_mem_limiter_)
, encoding_map_(arena_)
, network_loop_(packet_pool_, packet_buffer_pool_, arena_)
, network_loop_(limited_packet_pool_, limited_packet_buffer_pool_, arena_)
, control_loop_(network_loop_, arena_) {
roc_log(LogDebug, "context: initializing");
roc_log(LogDebug,
"context: initializing: packet_pool_limit=%lu frame_pool_limit=%lu",
(unsigned long)config.max_packet_pool_bytes,
(unsigned long)config.max_frame_pool_bytes);
}

Context::~Context() {
Expand All @@ -39,15 +47,15 @@ core::IArena& Context::arena() {
}

core::IPool& Context::packet_pool() {
return packet_pool_;
return limited_packet_pool_;
}

core::IPool& Context::packet_buffer_pool() {
return packet_buffer_pool_;
return limited_packet_buffer_pool_;
}

core::IPool& Context::frame_buffer_pool() {
return frame_buffer_pool_;
return limited_frame_buffer_pool_;
}

rtp::EncodingMap& Context::encoding_map() {
Expand Down
25 changes: 24 additions & 1 deletion src/internal_modules/roc_node/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "roc_core/allocation_policy.h"
#include "roc_core/atomic.h"
#include "roc_core/iarena.h"
#include "roc_core/limited_pool.h"
#include "roc_core/memory_limiter.h"
#include "roc_core/ref_counted.h"
#include "roc_core/slab_pool.h"
#include "roc_ctl/control_loop.h"
Expand All @@ -34,9 +36,19 @@ struct ContextConfig {
//! Maximum size in bytes of an audio frame.
size_t max_frame_size;

//! Maximum total bytes for packet pools (packet objects + packet buffers).
//! 0 = unlimited (no memory limit enforced).
size_t max_packet_pool_bytes;

//! Maximum total bytes for frame buffer pool.
//! 0 = unlimited (no memory limit enforced).
size_t max_frame_pool_bytes;

ContextConfig()
: max_packet_size(2048)
, max_frame_size(4096) {
, max_frame_size(4096)
, max_packet_pool_bytes(32 * 1024 * 1024)
, max_frame_pool_bytes(8 * 1024 * 1024) {
}
};

Expand Down Expand Up @@ -76,12 +88,23 @@ class Context : public core::RefCounted<Context, core::ManualAllocation> {
private:
core::IArena& arena_;

// Memory limiters (destroyed last, after all pools release memory).
core::MemoryLimiter packet_mem_limiter_;
core::MemoryLimiter frame_mem_limiter_;

// Underlying slab pools.
core::SlabPool<packet::Packet> packet_pool_;
core::SlabPool<core::Buffer> packet_buffer_pool_;
core::SlabPool<core::Buffer> frame_buffer_pool_;

// Limited pool wrappers (destroyed before slab pools).
core::LimitedPool limited_packet_pool_;
core::LimitedPool limited_packet_buffer_pool_;
core::LimitedPool limited_frame_buffer_pool_;

rtp::EncodingMap encoding_map_;

// Event loops (destroyed first, returning all objects to pools).
netio::NetworkLoop network_loop_;
ctl::ControlLoop control_loop_;
};
Expand Down
Loading