Skip to content

Commit 72d5941

Browse files
committed
Implement scalable I/O model with worker-per-context
Restructure the event loop to follow Erlang's scalable I/O model:
- Add py_event_worker: a dedicated worker process per Python context that receives FD events and timers directly via enif_select.
- Add py_event_worker_registry: ETS-based O(1) worker lookup by loop_id.
- Add py_event_worker_sup: a simple_one_for_one supervisor for workers.

C layer changes:
- Add worker_pid, has_worker, and loop_id fields to erlang_event_loop_t.
- Add the nif_event_loop_set_worker and nif_event_loop_set_id NIFs.
- Update add_reader/add_writer/call_later/cancel_timer to route to the worker when one is available, falling back to the router for backward compatibility.

Performance improvements (vs. baseline):
- Single-worker timer throughput: +11.7%
- Timer latency p95: -8.5%
- TCP scaling efficiency at 2 workers: +20.6%

All 113 tests pass.
1 parent e5ce2b4 commit 72d5941

13 files changed

+1161
-30
lines changed

c_src/py_event_loop.c

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,66 @@ ERL_NIF_TERM nif_event_loop_set_router(ErlNifEnv *env, int argc,
510510
return ATOM_OK;
511511
}
512512

513+
/**
514+
* event_loop_set_worker(LoopRef, WorkerPid) -> ok
515+
* Scalable I/O model: set the worker process for direct event routing.
516+
*/
517+
ERL_NIF_TERM nif_event_loop_set_worker(ErlNifEnv *env, int argc,
518+
const ERL_NIF_TERM argv[]) {
519+
(void)argc;
520+
521+
erlang_event_loop_t *loop;
522+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
523+
(void **)&loop)) {
524+
return make_error(env, "invalid_loop");
525+
}
526+
527+
if (!enif_get_local_pid(env, argv[1], &loop->worker_pid)) {
528+
return make_error(env, "invalid_pid");
529+
}
530+
531+
loop->has_worker = true;
532+
533+
/* Also set as router for compatibility */
534+
if (!loop->has_router) {
535+
loop->router_pid = loop->worker_pid;
536+
loop->has_router = true;
537+
}
538+
539+
return ATOM_OK;
540+
}
541+
542+
/**
543+
* event_loop_set_id(LoopRef, LoopId) -> ok
544+
*/
545+
ERL_NIF_TERM nif_event_loop_set_id(ErlNifEnv *env, int argc,
546+
const ERL_NIF_TERM argv[]) {
547+
(void)argc;
548+
549+
erlang_event_loop_t *loop;
550+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
551+
(void **)&loop)) {
552+
return make_error(env, "invalid_loop");
553+
}
554+
555+
ErlNifBinary id_bin;
556+
if (!enif_inspect_binary(env, argv[1], &id_bin)) {
557+
char atom_buf[64];
558+
if (!enif_get_atom(env, argv[1], atom_buf, sizeof(atom_buf), ERL_NIF_LATIN1)) {
559+
return make_error(env, "invalid_id");
560+
}
561+
strncpy(loop->loop_id, atom_buf, sizeof(loop->loop_id) - 1);
562+
loop->loop_id[sizeof(loop->loop_id) - 1] = '\0';
563+
} else {
564+
size_t copy_len = id_bin.size < sizeof(loop->loop_id) - 1 ?
565+
id_bin.size : sizeof(loop->loop_id) - 1;
566+
memcpy(loop->loop_id, id_bin.data, copy_len);
567+
loop->loop_id[copy_len] = '\0';
568+
}
569+
570+
return ATOM_OK;
571+
}
572+
513573
/**
514574
* add_reader(LoopRef, Fd, CallbackId) -> {ok, FdRef}
515575
*/
@@ -533,9 +593,11 @@ ERL_NIF_TERM nif_add_reader(ErlNifEnv *env, int argc,
533593
return make_error(env, "invalid_callback_id");
534594
}
535595

536-
if (!loop->has_router) {
596+
/* Scalable I/O: prefer worker, fall back to router */
597+
if (!loop->has_worker && !loop->has_router) {
537598
return make_error(env, "no_router");
538599
}
600+
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
539601

540602
/* Allocate fd resource */
541603
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
@@ -547,7 +609,7 @@ ERL_NIF_TERM nif_add_reader(ErlNifEnv *env, int argc,
547609
fd_res->fd = fd;
548610
fd_res->read_callback_id = callback_id;
549611
fd_res->write_callback_id = 0;
550-
fd_res->owner_pid = loop->router_pid;
612+
fd_res->owner_pid = *target_pid;
551613
fd_res->reader_active = true;
552614
fd_res->writer_active = false;
553615
fd_res->loop = loop;
@@ -558,14 +620,14 @@ ERL_NIF_TERM nif_add_reader(ErlNifEnv *env, int argc,
558620
fd_res->owns_fd = false;
559621

560622
/* Monitor owner process for cleanup on death */
561-
if (enif_monitor_process(env, fd_res, &loop->router_pid,
623+
if (enif_monitor_process(env, fd_res, target_pid,
562624
&fd_res->owner_monitor) == 0) {
563625
fd_res->monitor_active = true;
564626
}
565627

566628
/* Register with Erlang scheduler for read monitoring */
567629
int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ,
568-
fd_res, &loop->router_pid, enif_make_ref(env));
630+
fd_res, target_pid, enif_make_ref(env));
569631

570632
if (ret < 0) {
571633
if (fd_res->monitor_active) {
@@ -643,9 +705,11 @@ ERL_NIF_TERM nif_add_writer(ErlNifEnv *env, int argc,
643705
return make_error(env, "invalid_callback_id");
644706
}
645707

646-
if (!loop->has_router) {
708+
/* Scalable I/O: prefer worker, fall back to router */
709+
if (!loop->has_worker && !loop->has_router) {
647710
return make_error(env, "no_router");
648711
}
712+
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
649713

650714
/* Allocate fd resource */
651715
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
@@ -657,7 +721,7 @@ ERL_NIF_TERM nif_add_writer(ErlNifEnv *env, int argc,
657721
fd_res->fd = fd;
658722
fd_res->read_callback_id = 0;
659723
fd_res->write_callback_id = callback_id;
660-
fd_res->owner_pid = loop->router_pid;
724+
fd_res->owner_pid = *target_pid;
661725
fd_res->reader_active = false;
662726
fd_res->writer_active = true;
663727
fd_res->loop = loop;
@@ -668,14 +732,14 @@ ERL_NIF_TERM nif_add_writer(ErlNifEnv *env, int argc,
668732
fd_res->owns_fd = false;
669733

670734
/* Monitor owner process for cleanup on death */
671-
if (enif_monitor_process(env, fd_res, &loop->router_pid,
735+
if (enif_monitor_process(env, fd_res, target_pid,
672736
&fd_res->owner_monitor) == 0) {
673737
fd_res->monitor_active = true;
674738
}
675739

676740
/* Register with Erlang scheduler for write monitoring */
677741
int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE,
678-
fd_res, &loop->router_pid, enif_make_ref(env));
742+
fd_res, target_pid, enif_make_ref(env));
679743

680744
if (ret < 0) {
681745
if (fd_res->monitor_active) {
@@ -755,14 +819,16 @@ ERL_NIF_TERM nif_call_later(ErlNifEnv *env, int argc,
755819
return make_error(env, "invalid_callback_id");
756820
}
757821

758-
if (!loop->has_router) {
822+
/* Scalable I/O: prefer worker, fall back to router */
823+
if (!loop->has_worker && !loop->has_router) {
759824
return make_error(env, "no_router");
760825
}
826+
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
761827

762828
/* Create timer reference */
763829
ERL_NIF_TERM timer_ref = enif_make_ref(env);
764830

765-
/* Send message to router: {start_timer, DelayMs, CallbackId, TimerRef} */
831+
/* Send message to target: {start_timer, DelayMs, CallbackId, TimerRef} */
766832
ERL_NIF_TERM msg = enif_make_tuple4(
767833
env,
768834
ATOM_START_TIMER,
@@ -771,7 +837,7 @@ ERL_NIF_TERM nif_call_later(ErlNifEnv *env, int argc,
771837
timer_ref
772838
);
773839

774-
if (!enif_send(env, &loop->router_pid, NULL, msg)) {
840+
if (!enif_send(env, target_pid, NULL, msg)) {
775841
return make_error(env, "send_failed");
776842
}
777843

@@ -793,14 +859,16 @@ ERL_NIF_TERM nif_cancel_timer(ErlNifEnv *env, int argc,
793859

794860
ERL_NIF_TERM timer_ref = argv[1];
795861

796-
if (!loop->has_router) {
862+
/* Scalable I/O: prefer worker, fall back to router */
863+
if (!loop->has_worker && !loop->has_router) {
797864
return make_error(env, "no_router");
798865
}
866+
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
799867

800-
/* Send message to router: {cancel_timer, TimerRef} */
868+
/* Send message to target: {cancel_timer, TimerRef} */
801869
ERL_NIF_TERM msg = enif_make_tuple2(env, ATOM_CANCEL_TIMER, timer_ref);
802870

803-
if (!enif_send(env, &loop->router_pid, NULL, msg)) {
871+
if (!enif_send(env, target_pid, NULL, msg)) {
804872
return make_error(env, "send_failed");
805873
}
806874

c_src/py_event_loop.h

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,27 @@ typedef struct {
165165
* @brief Main state for the Erlang-backed asyncio event loop
166166
*
167167
* This structure maintains all state needed for the event loop:
168-
* - Reference to the Erlang router process
168+
* - Reference to the Erlang worker process (scalable I/O model)
169+
* - Reference to the Erlang router process (legacy)
169170
* - Pending events queue
170171
* - Synchronization primitives
171172
*/
172173
typedef struct erlang_event_loop {
173-
/** @brief PID of the py_event_router gen_server */
174+
/** @brief PID of the py_event_router gen_server (legacy) */
174175
ErlNifPid router_pid;
175176

176177
/** @brief Whether router_pid has been set */
177178
bool has_router;
178179

180+
/** @brief PID of the py_event_worker gen_server (scalable I/O model) */
181+
ErlNifPid worker_pid;
182+
183+
/** @brief Whether worker_pid has been set */
184+
bool has_worker;
185+
186+
/** @brief Loop identifier for routing */
187+
char loop_id[64];
188+
179189
/** @brief Mutex protecting the event loop state */
180190
pthread_mutex_t mutex;
181191

@@ -301,13 +311,29 @@ ERL_NIF_TERM nif_event_loop_destroy(ErlNifEnv *env, int argc,
301311
const ERL_NIF_TERM argv[]);
302312

303313
/**
304-
* @brief Set the router PID for the event loop
314+
* @brief Set the router PID for the event loop (legacy)
305315
*
306316
* NIF: event_loop_set_router(LoopRef, RouterPid) -> ok | {error, Reason}
307317
*/
308318
ERL_NIF_TERM nif_event_loop_set_router(ErlNifEnv *env, int argc,
309319
const ERL_NIF_TERM argv[]);
310320

321+
/**
322+
* @brief Set the worker PID for the event loop (scalable I/O model)
323+
*
324+
* NIF: event_loop_set_worker(LoopRef, WorkerPid) -> ok | {error, Reason}
325+
*/
326+
ERL_NIF_TERM nif_event_loop_set_worker(ErlNifEnv *env, int argc,
327+
const ERL_NIF_TERM argv[]);
328+
329+
/**
330+
* @brief Set the loop identifier
331+
*
332+
* NIF: event_loop_set_id(LoopRef, LoopId) -> ok | {error, Reason}
333+
*/
334+
ERL_NIF_TERM nif_event_loop_set_id(ErlNifEnv *env, int argc,
335+
const ERL_NIF_TERM argv[]);
336+
311337
/**
312338
* @brief Register a file descriptor for read monitoring
313339
*

c_src/py_nif.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,8 @@ static ErlNifFunc nif_funcs[] = {
18821882
{"event_loop_new", 0, nif_event_loop_new, 0},
18831883
{"event_loop_destroy", 1, nif_event_loop_destroy, 0},
18841884
{"event_loop_set_router", 2, nif_event_loop_set_router, 0},
1885+
{"event_loop_set_worker", 2, nif_event_loop_set_worker, 0},
1886+
{"event_loop_set_id", 2, nif_event_loop_set_id, 0},
18851887
{"event_loop_wakeup", 1, nif_event_loop_wakeup, 0},
18861888
{"add_reader", 3, nif_add_reader, 0},
18871889
{"remove_reader", 2, nif_remove_reader, 0},

0 commit comments

Comments
 (0)