Skip to content

Commit e631349

Browse files
authored
Merge pull request #55 from benoitc/feature/owngil-imports-paths
Apply imports/paths to OWN_GIL sessions
2 parents 98728d1 + b0223bc commit e631349

File tree

7 files changed

+434
-3
lines changed

7 files changed

+434
-3
lines changed

c_src/py_nif.c

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6881,6 +6881,132 @@ static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
68816881
return ATOM_OK;
68826882
}
68836883

6884+
/**
6885+
* @brief NIF: Apply imports to OWN_GIL session
6886+
*
6887+
* Imports modules into the worker's sys.modules.
6888+
* Args: WorkerId, HandleId, Imports (list of {ModuleBin, FuncBin | all})
6889+
*/
6890+
static ERL_NIF_TERM nif_owngil_apply_imports(ErlNifEnv *env, int argc,
6891+
const ERL_NIF_TERM argv[]) {
6892+
if (argc != 3) {
6893+
return enif_make_badarg(env);
6894+
}
6895+
6896+
if (!subinterp_thread_pool_is_ready()) {
6897+
return ATOM_OK; /* Silently succeed if pool not ready */
6898+
}
6899+
6900+
unsigned int worker_id;
6901+
ErlNifUInt64 handle_id;
6902+
6903+
if (!enif_get_uint(env, argv[0], &worker_id) ||
6904+
!enif_get_uint64(env, argv[1], &handle_id)) {
6905+
return enif_make_badarg(env);
6906+
}
6907+
6908+
if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
6909+
return ATOM_OK; /* Invalid worker, silently succeed */
6910+
}
6911+
6912+
/* Serialize imports list to ETF */
6913+
ErlNifBinary payload_bin;
6914+
if (!enif_term_to_binary(env, argv[2], &payload_bin)) {
6915+
return ATOM_OK; /* Serialization failed, silently succeed */
6916+
}
6917+
6918+
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];
6919+
6920+
pthread_mutex_lock(&w->dispatch_mutex);
6921+
6922+
uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
6923+
owngil_header_t header = {
6924+
.magic = OWNGIL_MAGIC,
6925+
.version = OWNGIL_PROTOCOL_VERSION,
6926+
.msg_type = MSG_REQUEST,
6927+
.req_type = REQ_APPLY_IMPORTS,
6928+
.request_id = request_id,
6929+
.handle_id = handle_id,
6930+
.payload_len = payload_bin.size,
6931+
};
6932+
6933+
/* Write header and payload */
6934+
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
6935+
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size);
6936+
/* Wait for response */
6937+
owngil_header_t resp;
6938+
read(w->result_pipe[0], &resp, sizeof(resp));
6939+
}
6940+
6941+
enif_release_binary(&payload_bin);
6942+
pthread_mutex_unlock(&w->dispatch_mutex);
6943+
6944+
return ATOM_OK;
6945+
}
6946+
6947+
/**
6948+
* @brief NIF: Apply paths to OWN_GIL session
6949+
*
6950+
* Adds paths to the worker's sys.path.
6951+
* Args: WorkerId, HandleId, Paths (list of path binaries)
6952+
*/
6953+
static ERL_NIF_TERM nif_owngil_apply_paths(ErlNifEnv *env, int argc,
6954+
const ERL_NIF_TERM argv[]) {
6955+
if (argc != 3) {
6956+
return enif_make_badarg(env);
6957+
}
6958+
6959+
if (!subinterp_thread_pool_is_ready()) {
6960+
return ATOM_OK; /* Silently succeed if pool not ready */
6961+
}
6962+
6963+
unsigned int worker_id;
6964+
ErlNifUInt64 handle_id;
6965+
6966+
if (!enif_get_uint(env, argv[0], &worker_id) ||
6967+
!enif_get_uint64(env, argv[1], &handle_id)) {
6968+
return enif_make_badarg(env);
6969+
}
6970+
6971+
if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
6972+
return ATOM_OK; /* Invalid worker, silently succeed */
6973+
}
6974+
6975+
/* Serialize paths list to ETF */
6976+
ErlNifBinary payload_bin;
6977+
if (!enif_term_to_binary(env, argv[2], &payload_bin)) {
6978+
return ATOM_OK; /* Serialization failed, silently succeed */
6979+
}
6980+
6981+
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];
6982+
6983+
pthread_mutex_lock(&w->dispatch_mutex);
6984+
6985+
uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
6986+
owngil_header_t header = {
6987+
.magic = OWNGIL_MAGIC,
6988+
.version = OWNGIL_PROTOCOL_VERSION,
6989+
.msg_type = MSG_REQUEST,
6990+
.req_type = REQ_APPLY_PATHS,
6991+
.request_id = request_id,
6992+
.handle_id = handle_id,
6993+
.payload_len = payload_bin.size,
6994+
};
6995+
6996+
/* Write header and payload */
6997+
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
6998+
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size);
6999+
/* Wait for response */
7000+
owngil_header_t resp;
7001+
read(w->result_pipe[0], &resp, sizeof(resp));
7002+
}
7003+
7004+
enif_release_binary(&payload_bin);
7005+
pthread_mutex_unlock(&w->dispatch_mutex);
7006+
7007+
return ATOM_OK;
7008+
}
7009+
68847010
#else /* !HAVE_SUBINTERPRETERS */
68857011

68867012
/* Stub implementations for Python < 3.12 */
@@ -6980,6 +7106,18 @@ static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
69807106
return ATOM_OK;
69817107
}
69827108

7109+
static ERL_NIF_TERM nif_owngil_apply_imports(ErlNifEnv *env, int argc,
7110+
const ERL_NIF_TERM argv[]) {
7111+
(void)argc; (void)argv;
7112+
return ATOM_OK;
7113+
}
7114+
7115+
static ERL_NIF_TERM nif_owngil_apply_paths(ErlNifEnv *env, int argc,
7116+
const ERL_NIF_TERM argv[]) {
7117+
(void)argc; (void)argv;
7118+
return ATOM_OK;
7119+
}
7120+
69837121
#endif /* HAVE_SUBINTERPRETERS */
69847122

69857123
/* ============================================================================
@@ -7239,6 +7377,8 @@ static ErlNifFunc nif_funcs[] = {
72397377
{"owngil_create_session", 1, nif_owngil_create_session, 0},
72407378
{"owngil_submit_task", 7, nif_owngil_submit_task, 0},
72417379
{"owngil_destroy_session", 2, nif_owngil_destroy_session, 0},
7380+
{"owngil_apply_imports", 3, nif_owngil_apply_imports, 0},
7381+
{"owngil_apply_paths", 3, nif_owngil_apply_paths, 0},
72427382

72437383
/* Execution mode info */
72447384
{"execution_mode", 0, nif_execution_mode, 0},

c_src/py_subinterp_thread.c

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,105 @@ static void *worker_thread_main(void *arg) {
425425
continue;
426426
}
427427

428+
/* Handle apply imports - imports modules into sys.modules */
429+
if (header.req_type == REQ_APPLY_IMPORTS) {
430+
/* Payload is ETF list of {ModuleBin, FuncBin | all} tuples */
431+
if (payload != NULL && header.payload_len > 0) {
432+
ErlNifEnv *tmp_env = enif_alloc_env();
433+
if (tmp_env != NULL) {
434+
ERL_NIF_TERM imports_list;
435+
if (enif_binary_to_term(tmp_env, payload, header.payload_len,
436+
&imports_list, 0) != 0) {
437+
ERL_NIF_TERM head, tail = imports_list;
438+
int arity;
439+
const ERL_NIF_TERM *tuple;
440+
while (enif_get_list_cell(tmp_env, tail, &head, &tail)) {
441+
if (enif_get_tuple(tmp_env, head, &arity, &tuple) && arity == 2) {
442+
ErlNifBinary module_bin;
443+
if (enif_inspect_binary(tmp_env, tuple[0], &module_bin)) {
444+
char *module_name = enif_alloc(module_bin.size + 1);
445+
if (module_name != NULL) {
446+
memcpy(module_name, module_bin.data, module_bin.size);
447+
module_name[module_bin.size] = '\0';
448+
/* Skip __main__ */
449+
if (strcmp(module_name, "__main__") != 0) {
450+
PyObject *mod = PyImport_ImportModule(module_name);
451+
if (mod != NULL) {
452+
Py_DECREF(mod);
453+
} else {
454+
PyErr_Clear();
455+
}
456+
}
457+
enif_free(module_name);
458+
}
459+
}
460+
}
461+
}
462+
}
463+
enif_free_env(tmp_env);
464+
}
465+
}
466+
PyEval_SaveThread();
467+
/* Send OK response */
468+
owngil_header_t resp = {
469+
.magic = OWNGIL_MAGIC,
470+
.version = OWNGIL_PROTOCOL_VERSION,
471+
.msg_type = MSG_RESPONSE,
472+
.request_id = header.request_id,
473+
.payload_len = 0,
474+
};
475+
write_full(w->result_pipe[1], &resp, sizeof(resp));
476+
free(payload);
477+
continue;
478+
}
479+
480+
/* Handle apply paths - add paths to sys.path */
481+
if (header.req_type == REQ_APPLY_PATHS) {
482+
/* Payload is ETF list of path binaries */
483+
if (payload != NULL && header.payload_len > 0) {
484+
ErlNifEnv *tmp_env = enif_alloc_env();
485+
if (tmp_env != NULL) {
486+
ERL_NIF_TERM paths_list;
487+
if (enif_binary_to_term(tmp_env, payload, header.payload_len,
488+
&paths_list, 0) != 0) {
489+
PyObject *sys_path = PySys_GetObject("path");
490+
if (sys_path != NULL && PyList_Check(sys_path)) {
491+
ERL_NIF_TERM head, tail = paths_list;
492+
/* Insert in reverse order so first path ends up first */
493+
while (enif_get_list_cell(tmp_env, tail, &head, &tail)) {
494+
ErlNifBinary path_bin;
495+
if (enif_inspect_binary(tmp_env, head, &path_bin)) {
496+
PyObject *path_str = PyUnicode_FromStringAndSize(
497+
(const char *)path_bin.data, path_bin.size);
498+
if (path_str != NULL) {
499+
/* Check if path already in sys.path */
500+
int contains = PySequence_Contains(sys_path, path_str);
501+
if (contains == 0) {
502+
PyList_Insert(sys_path, 0, path_str);
503+
}
504+
Py_DECREF(path_str);
505+
}
506+
}
507+
}
508+
}
509+
}
510+
enif_free_env(tmp_env);
511+
}
512+
}
513+
PyEval_SaveThread();
514+
/* Send OK response */
515+
owngil_header_t resp = {
516+
.magic = OWNGIL_MAGIC,
517+
.version = OWNGIL_PROTOCOL_VERSION,
518+
.msg_type = MSG_RESPONSE,
519+
.request_id = header.request_id,
520+
.payload_len = 0,
521+
};
522+
write_full(w->result_pipe[1], &resp, sizeof(resp));
523+
free(payload);
524+
continue;
525+
}
526+
428527
/* Find namespace for this handle */
429528
subinterp_namespace_t *ns = worker_find_namespace(w, header.handle_id);
430529
if (ns == NULL) {

c_src/py_subinterp_thread.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ typedef enum {
9292
REQ_ASYNC_CALL = 5, /**< Async - response via erlang.send() */
9393
REQ_CREATE_NS = 10, /**< Create namespace for handle */
9494
REQ_DESTROY_NS = 11, /**< Destroy namespace for handle */
95+
REQ_APPLY_IMPORTS = 12, /**< Apply imports to namespace's sys.modules */
96+
REQ_APPLY_PATHS = 13, /**< Apply paths to namespace's sys.path */
9597
REQ_SHUTDOWN = 99, /**< Shutdown the worker */
9698
} owngil_req_type_t;
9799

src/py_event_loop_pool.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@
4444
%% Per-process namespace API
4545
exec/1, exec/2,
4646
eval/1, eval/2,
47-
get_all_loops/0
47+
get_all_loops/0,
48+
get_all_sessions/0,
49+
is_owngil_enabled/0
4850
]).
4951

5052
%% Legacy API
@@ -349,6 +351,24 @@ get_all_loops() ->
349351
{ok, [element(Idx, Loops) || Idx <- lists:seq(1, N)]}
350352
end.
351353

354+
%% @doc Get all OWN_GIL sessions in the pool.
355+
%%
356+
%% Returns a list of {WorkerId, HandleId} tuples for all active sessions.
357+
%% Used by py_import to apply imports/paths to running OWN_GIL sessions.
358+
-spec get_all_sessions() -> [{non_neg_integer(), non_neg_integer()}].
359+
get_all_sessions() ->
360+
case get_sessions_table() of
361+
undefined -> [];
362+
Tid ->
363+
ets:foldl(
364+
fun(#owngil_session{worker_id = W, handle_id = H}, Acc) ->
365+
[{W, H} | Acc]
366+
end,
367+
[],
368+
Tid
369+
)
370+
end.
371+
352372
%%% ============================================================================
353373
%%% Legacy API
354374
%%% ============================================================================

src/py_import.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ load_config() ->
387387
),
388388
ok.
389389

390-
%% @private Apply import to all running interpreters (contexts + event loops)
390+
%% @private Apply import to all running interpreters (contexts + event loops + OWN_GIL sessions)
391391
apply_import_to_interpreters(ModuleBin) ->
392392
Imports = [{ModuleBin, all}],
393393
%% Apply to all contexts
@@ -418,9 +418,21 @@ apply_import_to_interpreters(ModuleBin) ->
418418
);
419419
_ -> ok
420420
end,
421+
%% Apply to all OWN_GIL sessions (if enabled)
422+
case py_event_loop_pool:is_owngil_enabled() of
423+
true ->
424+
lists:foreach(
425+
fun({WorkerId, HandleId}) ->
426+
catch py_nif:owngil_apply_imports(WorkerId, HandleId, Imports)
427+
end,
428+
py_event_loop_pool:get_all_sessions()
429+
);
430+
false ->
431+
ok
432+
end,
421433
ok.
422434

423-
%% @private Apply path to all running interpreters (contexts + event loops)
435+
%% @private Apply path to all running interpreters (contexts + event loops + OWN_GIL sessions)
424436
apply_path_to_interpreters(PathBin) ->
425437
Paths = [PathBin],
426438
%% Apply to all contexts
@@ -451,6 +463,18 @@ apply_path_to_interpreters(PathBin) ->
451463
);
452464
_ -> ok
453465
end,
466+
%% Apply to all OWN_GIL sessions (if enabled)
467+
case py_event_loop_pool:is_owngil_enabled() of
468+
true ->
469+
lists:foreach(
470+
fun({WorkerId, HandleId}) ->
471+
catch py_nif:owngil_apply_paths(WorkerId, HandleId, Paths)
472+
end,
473+
py_event_loop_pool:get_all_sessions()
474+
);
475+
false ->
476+
ok
477+
end,
454478
ok.
455479

456480
%% @private Get all context pids from all pools

src/py_nif.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@
7979
owngil_create_session/1,
8080
owngil_submit_task/7,
8181
owngil_destroy_session/2,
82+
owngil_apply_imports/3,
83+
owngil_apply_paths/3,
8284
%% Execution mode info
8385
execution_mode/0,
8486
num_executors/0,
@@ -645,6 +647,18 @@ owngil_submit_task(_WorkerId, _HandleId, _CallerPid, _Ref, _Module, _Func, _Args
645647
owngil_destroy_session(_WorkerId, _HandleId) ->
646648
?NIF_STUB.
647649

650+
%% @doc Apply imports to an OWN_GIL session.
651+
%% Imports modules into the worker's sys.modules.
652+
-spec owngil_apply_imports(non_neg_integer(), non_neg_integer(), [{binary(), binary() | all}]) -> ok | {error, term()}.
653+
owngil_apply_imports(_WorkerId, _HandleId, _Imports) ->
654+
?NIF_STUB.
655+
656+
%% @doc Apply paths to an OWN_GIL session.
657+
%% Adds paths to the worker's sys.path.
658+
-spec owngil_apply_paths(non_neg_integer(), non_neg_integer(), [binary()]) -> ok | {error, term()}.
659+
owngil_apply_paths(_WorkerId, _HandleId, _Paths) ->
660+
?NIF_STUB.
661+
648662
%%% ============================================================================
649663
%%% Execution Mode Info
650664
%%% ============================================================================

0 commit comments

Comments
 (0)