Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions c_src/py_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -859,21 +859,20 @@ static void executor_stop(void) {
/**
* Main function for a multi-executor thread.
* Each executor has its own queue and processes requests independently.
*
* GIL handling: Acquire GIL only when processing work, not while idle.
* This prevents idle executors from competing with dirty schedulers
* running actual Python work via the context-based API.
*/
static void *multi_executor_thread_main(void *arg) {
executor_t *exec = (executor_t *)arg;

/* Acquire GIL for this thread */
PyGILState_STATE gstate = PyGILState_Ensure();

exec->running = true;

while (!exec->shutdown) {
py_request_t *req = NULL;

/* Release GIL while waiting for work */
Py_BEGIN_ALLOW_THREADS

/* Wait for work - NO GIL held while idle */
pthread_mutex_lock(&exec->mutex);
while (exec->queue_head == NULL && !exec->shutdown) {
pthread_cond_wait(&exec->cond, &exec->mutex);
Expand All @@ -890,8 +889,6 @@ static void *multi_executor_thread_main(void *arg) {
}
pthread_mutex_unlock(&exec->mutex);

Py_END_ALLOW_THREADS

if (req != NULL) {
if (req->type == PY_REQ_SHUTDOWN) {
pthread_mutex_lock(&req->mutex);
Expand All @@ -900,10 +897,16 @@ static void *multi_executor_thread_main(void *arg) {
pthread_mutex_unlock(&req->mutex);
break;
} else {
/* Process the request with GIL held */
/* Acquire GIL only for actual work */
PyGILState_STATE gstate = PyGILState_Ensure();

/* Process the request */
process_request(req);

/* Signal completion */
/* Release GIL immediately after processing */
PyGILState_Release(gstate);

/* Signal completion (outside GIL) */
pthread_mutex_lock(&req->mutex);
req->completed = true;
pthread_cond_signal(&req->cond);
Expand All @@ -913,7 +916,6 @@ static void *multi_executor_thread_main(void *arg) {
}

exec->running = false;
PyGILState_Release(gstate);

return NULL;
}
Expand Down Expand Up @@ -953,8 +955,8 @@ static int multi_executor_start(int num_executors) {
return 0;
}

if (num_executors <= 0) {
num_executors = 4;
if (num_executors < MIN_EXECUTORS) {
num_executors = MIN_EXECUTORS;
}
if (num_executors > MAX_EXECUTORS) {
num_executors = MAX_EXECUTORS;
Expand Down
2 changes: 1 addition & 1 deletion c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg
default:
/* Start multiple executors for GIL contention mode */
{
int num_exec = 4; /* Default */
int num_exec = MIN_EXECUTORS; /* Fallback if not provided */
/* Check for config */
if (argc > 0 && enif_is_map(env, argv[0])) {
ERL_NIF_TERM key = enif_make_atom(env, "num_executors");
Expand Down
8 changes: 7 additions & 1 deletion c_src/py_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -1063,11 +1063,17 @@ typedef struct {
* @{
*/

/**
* @def MIN_EXECUTORS
* @brief Minimum number of executor threads in the pool
*/
#define MIN_EXECUTORS 2

/**
* @def MAX_EXECUTORS
* @brief Maximum number of executor threads in the pool
*/
#define MAX_EXECUTORS 16
#define MAX_EXECUTORS 32

/**
* @struct executor_t
Expand Down
6 changes: 5 additions & 1 deletion src/erlang_python_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ init([]) ->
ContextMode = application:get_env(erlang_python, context_mode, auto),
NumAsyncWorkers = application:get_env(erlang_python, num_async_workers, 2),

%% Default executors: 4 (benchmarked sweet spot for most workloads)
%% Can be overridden via {erlang_python, [{num_executors, N}]}
NumExecutors = application:get_env(erlang_python, num_executors, 4),

%% Initialize Python runtime first
ok = py_nif:init(),
ok = py_nif:init(#{num_executors => NumExecutors}),

%% Initialize the semaphore ETS table for rate limiting
ok = py_semaphore:init(),
Expand Down