Skip to content

Commit 467b017

Browse files
committed
Add OWN_GIL mode to event loop pool (#44)
Enable true parallel Python execution with per-worker GILs. Sessions map PIDs to workers with automatic cleanup on process exit. Config: {event_loop_pool_owngil, true}
1 parent 1e58d0c commit 467b017

File tree

8 files changed

+1285
-15
lines changed

8 files changed

+1285
-15
lines changed

c_src/py_nif.c

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6154,6 +6154,219 @@ static ERL_NIF_TERM nif_subinterp_thread_pool_stats(ErlNifEnv *env, int argc,
61546154
return map;
61556155
}
61566156

6157+
/**
6158+
* @brief NIF: Create OWN_GIL session for event loop pool
6159+
*
6160+
* Creates a new namespace in a worker thread for a calling process.
6161+
* Uses the worker_hint for worker assignment (typically loop index).
6162+
*
6163+
* Returns {ok, WorkerId, HandleId} on success.
6164+
*/
6165+
static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc,
6166+
const ERL_NIF_TERM argv[]) {
6167+
if (argc != 1) {
6168+
return enif_make_badarg(env);
6169+
}
6170+
6171+
if (!subinterp_thread_pool_is_ready()) {
6172+
return enif_make_tuple2(env, ATOM_ERROR,
6173+
enif_make_atom(env, "pool_not_ready"));
6174+
}
6175+
6176+
unsigned int worker_hint;
6177+
if (!enif_get_uint(env, argv[0], &worker_hint)) {
6178+
return enif_make_badarg(env);
6179+
}
6180+
6181+
/* Use worker_hint to select worker (modulo num_workers for safety) */
6182+
int num_workers = g_thread_pool.num_workers;
6183+
if (num_workers <= 0) {
6184+
return enif_make_tuple2(env, ATOM_ERROR,
6185+
enif_make_atom(env, "no_workers"));
6186+
}
6187+
6188+
int worker_id = worker_hint % num_workers;
6189+
uint64_t handle_id = atomic_fetch_add(&g_thread_pool.next_handle_id, 1);
6190+
6191+
/* Send create namespace request to worker */
6192+
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];
6193+
6194+
pthread_mutex_lock(&w->dispatch_mutex);
6195+
6196+
uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
6197+
owngil_header_t header = {
6198+
.magic = OWNGIL_MAGIC,
6199+
.version = OWNGIL_PROTOCOL_VERSION,
6200+
.msg_type = MSG_REQUEST,
6201+
.req_type = REQ_CREATE_NS,
6202+
.request_id = request_id,
6203+
.handle_id = handle_id,
6204+
.payload_len = 0,
6205+
};
6206+
6207+
/* Write header */
6208+
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header)) {
6209+
pthread_mutex_unlock(&w->dispatch_mutex);
6210+
return enif_make_tuple2(env, ATOM_ERROR,
6211+
enif_make_atom(env, "write_failed"));
6212+
}
6213+
6214+
/* Wait for response */
6215+
owngil_header_t resp;
6216+
if (read(w->result_pipe[0], &resp, sizeof(resp)) != sizeof(resp)) {
6217+
pthread_mutex_unlock(&w->dispatch_mutex);
6218+
return enif_make_tuple2(env, ATOM_ERROR,
6219+
enif_make_atom(env, "read_failed"));
6220+
}
6221+
6222+
pthread_mutex_unlock(&w->dispatch_mutex);
6223+
6224+
if (resp.msg_type != MSG_RESPONSE) {
6225+
return enif_make_tuple2(env, ATOM_ERROR,
6226+
enif_make_atom(env, "create_failed"));
6227+
}
6228+
6229+
return enif_make_tuple3(env, ATOM_OK,
6230+
enif_make_uint(env, worker_id),
6231+
enif_make_uint64(env, handle_id));
6232+
}
6233+
6234+
/**
6235+
* @brief NIF: Submit async task to OWN_GIL worker
6236+
*
6237+
* Submits a task to run in the worker's asyncio event loop.
6238+
* Result is sent to CallerPid as {async_result, Ref, Result}.
6239+
*/
6240+
static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc,
6241+
const ERL_NIF_TERM argv[]) {
6242+
if (argc != 7) {
6243+
return enif_make_badarg(env);
6244+
}
6245+
6246+
if (!subinterp_thread_pool_is_ready()) {
6247+
return enif_make_tuple2(env, ATOM_ERROR,
6248+
enif_make_atom(env, "pool_not_ready"));
6249+
}
6250+
6251+
unsigned int worker_id;
6252+
ErlNifUInt64 handle_id;
6253+
ErlNifPid caller_pid;
6254+
6255+
if (!enif_get_uint(env, argv[0], &worker_id) ||
6256+
!enif_get_uint64(env, argv[1], &handle_id) ||
6257+
!enif_get_local_pid(env, argv[2], &caller_pid)) {
6258+
return enif_make_badarg(env);
6259+
}
6260+
6261+
if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
6262+
return enif_make_tuple2(env, ATOM_ERROR,
6263+
enif_make_atom(env, "invalid_worker"));
6264+
}
6265+
6266+
/* Build payload tuple: {Module, Func, Args, Kwargs, CallerPid, Ref} */
6267+
ERL_NIF_TERM caller_pid_term = enif_make_pid(env, &caller_pid);
6268+
ERL_NIF_TERM kwargs = enif_make_new_map(env);
6269+
ERL_NIF_TERM payload_tuple = enif_make_tuple6(env,
6270+
argv[4], /* Module */
6271+
argv[5], /* Func */
6272+
argv[6], /* Args */
6273+
kwargs, /* Kwargs */
6274+
caller_pid_term,
6275+
argv[3] /* Ref */
6276+
);
6277+
6278+
/* Serialize to ETF */
6279+
ErlNifBinary payload_bin;
6280+
if (!enif_term_to_binary(env, payload_tuple, &payload_bin)) {
6281+
return enif_make_tuple2(env, ATOM_ERROR,
6282+
enif_make_atom(env, "serialization_failed"));
6283+
}
6284+
6285+
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];
6286+
6287+
pthread_mutex_lock(&w->dispatch_mutex);
6288+
6289+
uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
6290+
owngil_header_t header = {
6291+
.magic = OWNGIL_MAGIC,
6292+
.version = OWNGIL_PROTOCOL_VERSION,
6293+
.msg_type = MSG_REQUEST,
6294+
.req_type = REQ_ASYNC_CALL,
6295+
.request_id = request_id,
6296+
.handle_id = handle_id,
6297+
.payload_len = payload_bin.size,
6298+
};
6299+
6300+
/* Write header and payload */
6301+
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header) ||
6302+
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size) != (ssize_t)payload_bin.size) {
6303+
pthread_mutex_unlock(&w->dispatch_mutex);
6304+
enif_release_binary(&payload_bin);
6305+
return enif_make_tuple2(env, ATOM_ERROR,
6306+
enif_make_atom(env, "write_failed"));
6307+
}
6308+
6309+
enif_release_binary(&payload_bin);
6310+
pthread_mutex_unlock(&w->dispatch_mutex);
6311+
6312+
/* For async, we don't wait for response - worker sends directly to caller */
6313+
return ATOM_OK;
6314+
}
6315+
6316+
/**
6317+
* @brief NIF: Destroy OWN_GIL session
6318+
*
6319+
* Cleans up the namespace in the worker thread.
6320+
*/
6321+
static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
6322+
const ERL_NIF_TERM argv[]) {
6323+
if (argc != 2) {
6324+
return enif_make_badarg(env);
6325+
}
6326+
6327+
if (!subinterp_thread_pool_is_ready()) {
6328+
return ATOM_OK; /* Nothing to clean up */
6329+
}
6330+
6331+
unsigned int worker_id;
6332+
ErlNifUInt64 handle_id;
6333+
6334+
if (!enif_get_uint(env, argv[0], &worker_id) ||
6335+
!enif_get_uint64(env, argv[1], &handle_id)) {
6336+
return enif_make_badarg(env);
6337+
}
6338+
6339+
if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
6340+
return ATOM_OK; /* Invalid worker, nothing to do */
6341+
}
6342+
6343+
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];
6344+
6345+
pthread_mutex_lock(&w->dispatch_mutex);
6346+
6347+
uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
6348+
owngil_header_t header = {
6349+
.magic = OWNGIL_MAGIC,
6350+
.version = OWNGIL_PROTOCOL_VERSION,
6351+
.msg_type = MSG_REQUEST,
6352+
.req_type = REQ_DESTROY_NS,
6353+
.request_id = request_id,
6354+
.handle_id = handle_id,
6355+
.payload_len = 0,
6356+
};
6357+
6358+
/* Write header */
6359+
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
6360+
/* Wait for response */
6361+
owngil_header_t resp;
6362+
read(w->result_pipe[0], &resp, sizeof(resp));
6363+
}
6364+
6365+
pthread_mutex_unlock(&w->dispatch_mutex);
6366+
6367+
return ATOM_OK;
6368+
}
6369+
61576370
#else /* !HAVE_SUBINTERPRETERS */
61586371

61596372
/* Stub implementations for Python < 3.12 */
@@ -6232,6 +6445,27 @@ static ERL_NIF_TERM nif_subinterp_thread_pool_stats(ErlNifEnv *env, int argc,
62326445
return map;
62336446
}
62346447

6448+
/* OWN_GIL session stubs for non-subinterpreter builds */
6449+
static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc,
6450+
const ERL_NIF_TERM argv[]) {
6451+
(void)argc; (void)argv;
6452+
return enif_make_tuple2(env, ATOM_ERROR,
6453+
enif_make_atom(env, "not_supported"));
6454+
}
6455+
6456+
static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc,
6457+
const ERL_NIF_TERM argv[]) {
6458+
(void)argc; (void)argv;
6459+
return enif_make_tuple2(env, ATOM_ERROR,
6460+
enif_make_atom(env, "not_supported"));
6461+
}
6462+
6463+
static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
6464+
const ERL_NIF_TERM argv[]) {
6465+
(void)argc; (void)argv;
6466+
return ATOM_OK;
6467+
}
6468+
62356469
#endif /* HAVE_SUBINTERPRETERS */
62366470

62376471
/* ============================================================================
@@ -6486,6 +6720,11 @@ static ErlNifFunc nif_funcs[] = {
64866720
{"subinterp_thread_cast", 4, nif_subinterp_thread_cast, 0},
64876721
{"subinterp_thread_async_call", 6, nif_subinterp_thread_async_call, 0},
64886722

6723+
/* OWN_GIL session management for event loop pool */
6724+
{"owngil_create_session", 1, nif_owngil_create_session, 0},
6725+
{"owngil_submit_task", 7, nif_owngil_submit_task, 0},
6726+
{"owngil_destroy_session", 2, nif_owngil_destroy_session, 0},
6727+
64896728
/* Execution mode info */
64906729
{"execution_mode", 0, nif_execution_mode, 0},
64916730
{"num_executors", 0, nif_num_executors, 0},

0 commit comments

Comments
 (0)