diff --git a/src/core/ipc.c b/src/core/ipc.c index f2562541..9c597525 100644 --- a/src/core/ipc.c +++ b/src/core/ipc.c @@ -51,6 +51,9 @@ #endif #include "lang/eval.h" +#include "lang/env.h" +#include "lang/internal.h" +#include "table/sym.h" /* ===== Compression (delta + RLE) ===== */ @@ -182,6 +185,126 @@ static bool validate_creds(const uint8_t* buf, uint8_t cred_len, return ct_eq(pw_buf, secret, 256); } +/* ===== Connection hooks (.ipc.on.*) ===== + * + * Five user-settable lambdas that intercept the connection lifecycle. + * Lookup is by interned sym id; we cache the ids in `hook_syms[]` so the + * fast path is a single ray_env_get + RAY_LAMBDA-type check per dispatch. + * + * `g_current_handle` is the thread-local value `.ipc.handle` reads back + * to Rayfall. HOOK_SCOPE saves/restores it around each invocation, so a + * hook that opens its own connection (calling `.ipc.handle` inside a + * nested `.ipc.send` round-trip) still sees the outer handle when its + * body resumes. Default value -1 means "no hook is currently on the + * stack" — exposed verbatim through the builtin. + * + * Errors: + * - on.open / on.close / on.async: logged to stderr, swallowed. + * - on.sync: error becomes the response (same as a raw `eval` error). + * - on.auth: error treated as reject (handshake refused). */ + +/* Hook indices must match the order in `src/lang/env.c` + * (g_ipc_hook_syms[]) — the `ray_sym_ipc_hook(idx)` getter assumes this + * mapping. Keep them in lockstep. */ +enum { + IPC_HOOK_OPEN = 0, + IPC_HOOK_CLOSE = 1, + IPC_HOOK_SYNC = 2, + IPC_HOOK_ASYNC = 3, + IPC_HOOK_AUTH = 4, + IPC_HOOK_COUNT = 5, +}; + +static _Thread_local int64_t g_current_handle = -1; + +int64_t ray_ipc_current_handle(void) { + return g_current_handle; +} + +/* Fetch the hook lambda if one is installed and is in fact a lambda. + * Non-lambda bindings (cleared via `set .ipc.on.X 0` or never bound) + * yield NULL — caller falls back to default behaviour. Returns a + * borrowed ref; do not release. Sym IDs come from env.c's central + * cache, so a runtime destroy/recreate cycle invalidates them in one + * place and the lookup here always sees IDs from the current sym + * table. */ +static ray_t* hook_lookup(int idx) { + int64_t sym = ray_sym_ipc_hook(idx); + if (sym < 0) return NULL; + ray_t* fn = ray_env_get(sym); + if (!fn || fn->type != RAY_LAMBDA) return NULL; + return fn; +} + +/* Call a single-arg hook for lifecycle events (on.open / on.close). + * Errors are logged and swallowed — a buggy logging hook must never + * wedge connection teardown. */ +static void hook_call_lifecycle(int idx, int64_t handle) { + ray_t* fn = hook_lookup(idx); + if (!fn) return; + ray_t* arg = make_i64(handle); + if (!arg || RAY_IS_ERR(arg)) { if (arg) ray_release(arg); return; } + int64_t prev = g_current_handle; + g_current_handle = handle; + ray_t* r = call_fn1(fn, arg); + g_current_handle = prev; + if (r && RAY_IS_ERR(r)) { + const char* name = (idx == IPC_HOOK_OPEN) ? ".ipc.on.open" : ".ipc.on.close"; + fprintf(stderr, "ipc: %s hook raised an error (handle=%lld)\n", + name, (long long)handle); + } + ray_release(arg); + if (r && r != RAY_NULL_OBJ) ray_release(r); +} + +/* Call the on.auth hook with (user, pass) string atoms. Returns: + * - 1 → hook ran and returned truthy; caller continues the handshake. + * - 0 → hook ran and returned falsy (or errored); caller rejects. + * - -1 → no hook installed; caller uses the existing pass-through. + * The constant-time secret compare in validate_creds always runs first, + * so this hook can only narrow access — never widen it. */ +static int hook_call_auth(int64_t handle, const uint8_t* cred_buf, + uint8_t cred_len) { + ray_t* fn = hook_lookup(IPC_HOOK_AUTH); + if (!fn) return -1; + + /* Split user:pass exactly the way validate_creds does — colon- + * separated, with the leading user part possibly empty. Strip the + * trailing NUL the client appends so the hook sees clean strings. */ + const char* creds = (const char*)cred_buf; + const char* colon = memchr(creds, ':', cred_len); + const char* upart = creds; + size_t ulen = colon ? (size_t)(colon - creds) : 0; + const char* ppart = colon ? colon + 1 : creds; + size_t plen = colon ? (size_t)(cred_len - (ppart - creds)) + : (size_t)cred_len; + if (plen > 0 && ppart[plen - 1] == '\0') plen--; + + ray_t* u = ray_str(upart, ulen); + ray_t* p = ray_str(ppart, plen); + if (!u || !p || RAY_IS_ERR(u) || RAY_IS_ERR(p)) { + if (u && !RAY_IS_ERR(u)) ray_release(u); + if (p && !RAY_IS_ERR(p)) ray_release(p); + return 0; /* allocation failure → reject conservatively */ + } + int64_t prev = g_current_handle; + g_current_handle = handle; + ray_t* r = call_fn2(fn, u, p); + g_current_handle = prev; + ray_release(u); + ray_release(p); + + int ok; + if (!r || RAY_IS_ERR(r)) { + fprintf(stderr, "ipc: .ipc.on.auth hook raised an error — rejecting\n"); + ok = 0; + } else { + ok = is_truthy(r) ? 1 : 0; + } + if (r && r != RAY_NULL_OBJ) ray_release(r); + return ok; +} + static void send_response(ray_sock_t fd, ray_t* result) { int64_t ser_size = ray_serde_size(result); @@ -299,7 +422,28 @@ static ray_t* eval_payload_core(uint8_t* payload, size_t payload_len, ray_t* result = NULL; if (msg && !RAY_IS_ERR(msg)) { - if (msg->type == -RAY_STR) { + /* Dispatch through `.ipc.on.sync` / `.ipc.on.async` hook if + * installed; otherwise fall back to v1's inline-eval default. + * The hook receives the raw deserialised payload — same shape + * any other Rayfall lambda would see — so a hook installed + * as `{[m] eval m}` reproduces the default behaviour. */ + int hook_idx = (hdr->msgtype == RAY_IPC_MSG_SYNC) ? IPC_HOOK_SYNC + : IPC_HOOK_ASYNC; + ray_t* hook = hook_lookup(hook_idx); + if (hook) { + result = call_fn1(hook, msg); + ray_release(msg); + /* Async errors have nowhere to go on the wire (async never + * sends a response), so log + drop here. Without this the + * caller would silently release the error and the operator + * would never see the hook misbehaving. */ + if (result && RAY_IS_ERR(result) && + hdr->msgtype == RAY_IPC_MSG_ASYNC) { + fprintf(stderr, "ipc: .ipc.on.async hook raised an error\n"); + ray_release(result); + result = NULL; + } + } else if (msg->type == -RAY_STR) { const char* str = ray_str_ptr(msg); size_t slen = ray_str_len(msg); if (str && slen > 0) { @@ -492,6 +636,10 @@ static ray_t* ipc_read_handshake(ray_poll_t* poll, ray_selector_t* sel) cd->phase = RAY_IPC_PHASE_HEADER; sel->rx.read_fn = ipc_read_header; ray_poll_rx_request(poll, sel, sizeof(ray_ipc_header_t)); + /* No-auth path: connection is now fully ready for inbound messages. + * Fire `.ipc.on.open` AFTER we've requested the next read, so a + * hook that calls back into the server can't race the read pump. */ + hook_call_lifecycle(IPC_HOOK_OPEN, sel->id); return NULL; } @@ -525,6 +673,17 @@ static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel) bool ok = validate_creds(sel->rx.buf->data + 1, cred_len, poll->auth_secret); + + /* Secondary user-defined check via `.ipc.on.auth`. Only consulted + * when the constant-time secret compare already passed — this hook + * can narrow access (deny extras) but never widen it. Errors and + * falsy returns flip `ok` to false, triggering the same reject byte + * + deregister the secret-mismatch path would. */ + if (ok) { + int hook_ok = hook_call_auth(sel->id, sel->rx.buf->data + 1, cred_len); + if (hook_ok == 0) ok = false; + } + uint8_t result = ok ? 0x00 : 0x01; ray_sock_send((ray_sock_t)sel->fd, &result, 1); @@ -536,6 +695,10 @@ static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel) cd->phase = RAY_IPC_PHASE_HEADER; sel->rx.read_fn = ipc_read_header; ray_poll_rx_request(poll, sel, sizeof(ray_ipc_header_t)); + /* Auth path: fully handshaked and authed — connection is now ready + * for inbound messages. Same ordering as the no-auth branch above: + * fire AFTER the next read is requested. */ + hook_call_lifecycle(IPC_HOOK_OPEN, sel->id); return NULL; } @@ -573,10 +736,19 @@ static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel) bool prev_restricted = ray_eval_get_restricted(); ray_eval_set_restricted(cd->restricted); + /* Expose this connection's selector id to `.ipc.handle` for the + * duration of any `.ipc.on.sync` / `.ipc.on.async` hook that runs + * inside eval_payload. Save/restore so a hook that itself opens + * a nested IPC round-trip doesn't leave the wrong handle visible + * when its caller resumes. */ + int64_t prev_handle = g_current_handle; + g_current_handle = sel->id; + /* Eval and produce result */ ray_t* result = eval_payload(sel->rx.buf->data, (size_t)sel->rx.buf->offset, &cd->hdr); + g_current_handle = prev_handle; ray_eval_set_restricted(prev_restricted); /* Send response for sync messages */ @@ -595,7 +767,20 @@ static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel) static void ipc_on_close(ray_poll_t* poll, ray_selector_t* sel) { (void)poll; + /* Fire `.ipc.on.close` BEFORE tearing the per-conn state down so a + * hook reading `.ipc.handle` still sees this connection's id, and + * before the listener's own close path (which would otherwise also + * route through here) runs the hook with a stale fd. Guard on: + * - sel->data: the listener itself has no conn data. + * - phase ≥ HEADER: the connection actually completed handshake + * (otherwise no matching on.open was fired, so on.close must + * also stay silent to keep the pair balanced for the user). */ if (sel->data) { + ray_ipc_conn_data_t* cd = (ray_ipc_conn_data_t*)sel->data; + if (cd->phase == RAY_IPC_PHASE_HEADER || + cd->phase == RAY_IPC_PHASE_PAYLOAD) { + hook_call_lifecycle(IPC_HOOK_CLOSE, sel->id); + } ray_sys_free(sel->data); sel->data = NULL; } @@ -630,6 +815,15 @@ int64_t ray_ipc_listen(ray_poll_t* poll, uint16_t port) static void conn_close(ray_ipc_server_t* srv, ray_ipc_conn_t* c) { + /* `.ipc.on.close` fires only for conns that were actually opened — + * a slot whose phase never advanced past HANDSHAKE/CREDS was never + * announced via on.open and so shouldn't be announced via on.close. + * Keeps the pair balanced for the user. */ + if (c->phase == RAY_IPC_PHASE_HEADER || + c->phase == RAY_IPC_PHASE_PAYLOAD) { + hook_call_lifecycle(IPC_HOOK_CLOSE, (int64_t)(c - srv->conns)); + } + #if defined(__linux__) epoll_ctl(srv->poll_fd, EPOLL_CTL_DEL, c->fd, NULL); #elif defined(__APPLE__) @@ -678,6 +872,8 @@ static void conn_on_handshake(ray_ipc_server_t* srv, ray_ipc_conn_t* c) c->rx_need = sizeof(ray_ipc_header_t); c->phase = RAY_IPC_PHASE_HEADER; + /* Legacy path mirror of the poll-path post-handshake fire. */ + hook_call_lifecycle(IPC_HOOK_OPEN, (int64_t)(c - srv->conns)); } static void conn_on_header(ray_ipc_server_t* srv, ray_ipc_conn_t* c) @@ -702,8 +898,16 @@ static void conn_on_payload(ray_ipc_server_t* srv, ray_ipc_conn_t* c) bool prev = ray_eval_get_restricted(); ray_eval_set_restricted(srv->restricted); + /* Conn-array index doubles as the handle on the legacy path — + * stable for the connection's lifetime, distinct across active + * connections, freed back to the pool on close. Mirrored shape + * of the poll path's sel->id. */ + int64_t prev_handle = g_current_handle; + g_current_handle = (int64_t)(c - srv->conns); + ray_t* result = eval_payload(c->rx_buf, c->rx_len, &c->hdr); + g_current_handle = prev_handle; ray_eval_set_restricted(prev); if (c->hdr.msgtype == RAY_IPC_MSG_SYNC) @@ -735,6 +939,14 @@ static void conn_on_creds(ray_ipc_server_t* srv, ray_ipc_conn_t* c) uint8_t cred_len = c->rx_buf[0]; bool ok = validate_creds(c->rx_buf + 1, cred_len, srv->auth_secret); + /* Legacy path mirror of the poll-path on.auth call: same handle-as- + * conn-index convention, same narrowing semantics. */ + if (ok) { + int hook_ok = hook_call_auth((int64_t)(c - srv->conns), + c->rx_buf + 1, cred_len); + if (hook_ok == 0) ok = false; + } + uint8_t result = ok ? 0x00 : 0x01; ray_sock_send(c->fd, &result, 1); @@ -748,6 +960,7 @@ static void conn_on_creds(ray_ipc_server_t* srv, ray_ipc_conn_t* c) c->rx_len = 0; c->rx_need = sizeof(ray_ipc_header_t); c->phase = RAY_IPC_PHASE_HEADER; + hook_call_lifecycle(IPC_HOOK_OPEN, (int64_t)(c - srv->conns)); } static void conn_on_readable(ray_ipc_server_t* srv, ray_ipc_conn_t* c) diff --git a/src/core/ipc.h b/src/core/ipc.h index 52f61689..e3aa7536 100644 --- a/src/core/ipc.h +++ b/src/core/ipc.h @@ -63,6 +63,16 @@ size_t ray_ipc_decompress(const uint8_t* src, size_t clen, #define RAY_IPC_FLAG_VERBOSE 0x04 #define RAY_IPC_MAX_CONNS 256 +/* ===== Connection hooks (.ipc.on.*) ===== */ + +/* Current connection handle, readable from Rayfall via the `.ipc.handle` + * builtin while a `.ipc.on.*` hook is on the stack. Set/restored by the + * server around every hook invocation; defaults to -1 outside any hook. + * Thread-local — IPC dispatch is single-threaded today, but the storage + * class keeps the value scoped to the dispatch thread should that ever + * change. */ +int64_t ray_ipc_current_handle(void); + /* ===== Poll-based IPC (new API) ===== */ /* Register IPC listener on poll. Returns selector id or -1. */ diff --git a/src/lang/compile.c b/src/lang/compile.c index 61bc2cf8..586d913a 100644 --- a/src/lang/compile.c +++ b/src/lang/compile.c @@ -236,11 +236,18 @@ static void compile_list(compiler_t *c, ray_t *ast) { * ray_env_set_local enforces on the tree-walking path. * Setting c->error aborts bytecode emission; call_lambda * then falls back to the tree-walking interpreter which - * raises the proper `reserve` error via ray_let_fn. */ + * raises the proper `reserve` error via ray_let_fn. + * The five `.ipc.on.*` connection-hook names are exempt — + * they're user-settable, so a `let .ipc.on.open ...` in a + * compiled body must emit the same OP_STOREENV as any other + * local binding. Without this, the bytecode path would + * abort and silently fall back to the interpreter on every + * such write. */ if (sym_id == sf_let && n == 3) { ray_t *name_obj = elems[1]; if (name_obj->type != -RAY_SYM || - ray_sym_is_reserved(name_obj->i64)) { + (ray_sym_is_reserved(name_obj->i64) && + !ray_sym_is_ipc_hook(name_obj->i64))) { c->error = true; return; } diff --git a/src/lang/env.c b/src/lang/env.c index 125ced49..6b5d8280 100644 --- a/src/lang/env.c +++ b/src/lang/env.c @@ -130,9 +130,52 @@ static _Thread_local int32_t scope_depth = 0; int32_t ray_env_scope_depth(void) { return scope_depth; } int32_t ray_env_global_count(void) { return g_env.count; } +/* The five connection-hook sym ids carved out of the reserved-name reject. + * Populated lazily on first probe; idempotent — ray_sym_intern is content- + * keyed, so repeated calls return the same id. The carve-out applies ONLY + * to these five names; everything else under `.ipc.*` (open/send/close/ + * handle) stays unsettable, as do all `.sys.*` / `.os.*` / etc. + * Lambda parameters with these names remain forbidden — that's pure + * shadowing with no legitimate use. */ +static int64_t g_ipc_hook_syms[5] = {0}; +static bool g_ipc_hook_syms_ready = false; + +static void ipc_hook_syms_ensure(void) { + if (g_ipc_hook_syms_ready) return; + g_ipc_hook_syms[0] = ray_sym_intern(".ipc.on.open", strlen(".ipc.on.open")); + g_ipc_hook_syms[1] = ray_sym_intern(".ipc.on.close", strlen(".ipc.on.close")); + g_ipc_hook_syms[2] = ray_sym_intern(".ipc.on.sync", strlen(".ipc.on.sync")); + g_ipc_hook_syms[3] = ray_sym_intern(".ipc.on.async", strlen(".ipc.on.async")); + g_ipc_hook_syms[4] = ray_sym_intern(".ipc.on.auth", strlen(".ipc.on.auth")); + g_ipc_hook_syms_ready = true; +} + +bool ray_sym_is_ipc_hook(int64_t sym_id) { + ipc_hook_syms_ensure(); + for (int i = 0; i < 5; i++) + if (g_ipc_hook_syms[i] == sym_id) return true; + return false; +} + +int64_t ray_sym_ipc_hook(int idx) { + if (idx < 0 || idx >= 5) return -1; + ipc_hook_syms_ensure(); + return g_ipc_hook_syms[idx]; +} + ray_err_t ray_env_init(void) { memset(&g_env, 0, sizeof(g_env)); scope_depth = 0; + /* The IPC hook sym IDs are interned lazily on first probe via + * `ipc_hook_syms_ensure`. We deliberately do NOT pre-intern here: + * single-char operator names (`+`, `-`, `*`, …) are registered just + * after `ray_env_init` returns, and they need the low sym-ID slots. + * Several call sites (notably `resolve` on tables of small ints) + * depend on those low IDs corresponding to operator chars so the + * "slen < 2 / operator-char" guard rejects them. Stealing IDs 1-5 + * for long dotted names would silently promote `[1 2 3]` columns to + * SYM. Lazy interning runs after all builtin registration, so the + * hook syms land safely past the operator block. */ return RAY_OK; } @@ -143,6 +186,12 @@ void ray_env_destroy(void) { if (g_env.vals[i]) ray_release(g_env.vals[i]); } memset(&g_env, 0, sizeof(g_env)); + /* Invalidate the IPC hook sym cache: the runtime destroy path tears + * down the sym table too, so the cached sym IDs in `g_ipc_hook_syms` + * are stale by the time the next runtime spins up. Reset the flag + * so the next ray_env_init re-interns fresh IDs from the new sym + * table. */ + g_ipc_hook_syms_ready = false; } /* Flat (non-dotted) lookup — scope stack top-down, then global env. @@ -541,7 +590,11 @@ ray_err_t ray_env_bind_flat(int64_t sym_id, ray_t* val) { } ray_err_t ray_env_set(int64_t sym_id, ray_t* val) { - if (ray_sym_is_reserved(sym_id)) return RAY_ERR_RESERVED; + /* Reserved-namespace gate, with a narrow carve-out for the five IPC + * connection hooks. Everything else under `.ipc.*` and all other + * dotted system namespaces remain unsettable. */ + if (ray_sym_is_reserved(sym_id) && !ray_sym_is_ipc_hook(sym_id)) + return RAY_ERR_RESERVED; /* Same machinery as ray_env_bind, but routes through the user-flagged * binder so the journal snapshot can pick this slot. Without this * flip, env_bind_global would also be reached via ray_env_bind below @@ -662,8 +715,11 @@ ray_err_t ray_env_set_local(int64_t sym_id, ray_t* val) { /* Reserved names (.sys.*, .os.*, .csv.*, .ipc.*) can only be * populated by builtin registration (ray_env_bind). Refuse at * every user-reachable binding path so `(let .sys.gc 99)` or a - * lambda parameter named `.sys.gc` cannot shadow the builtin. */ - if (ray_sym_is_reserved(sym_id)) return RAY_ERR_RESERVED; + * lambda parameter named `.sys.gc` cannot shadow the builtin. + * Narrow carve-out: the five `.ipc.on.*` connection-hook syms are + * user-settable here too, matching the global `ray_env_set` path. */ + if (ray_sym_is_reserved(sym_id) && !ray_sym_is_ipc_hook(sym_id)) + return RAY_ERR_RESERVED; if (scope_depth <= 0) return ray_env_set(sym_id, val); if (ray_sym_is_dotted(sym_id)) { return env_set_dotted(sym_id, val, lookup_top_frame, env_bind_local); diff --git a/src/lang/env.h b/src/lang/env.h index 25170c2a..1e34aae1 100644 --- a/src/lang/env.h +++ b/src/lang/env.h @@ -69,6 +69,27 @@ ray_err_t ray_env_bind_flat(int64_t sym_id, ray_t* val); * installer) refuse such names so system bindings can't be shadowed. */ bool ray_sym_is_reserved(int64_t sym_id); +/* True if a symbol is one of the five user-settable IPC connection-hook + * names: `.ipc.on.open`, `.ipc.on.close`, `.ipc.on.sync`, `.ipc.on.async`, + * `.ipc.on.auth`. These are the narrow carve-out from the reserved-name + * reject in `ray_env_set` / `ray_env_set_local` and the compile-time `let` + * guard; everything else under `.ipc.*` stays unsettable. Lazy-interned + * on first probe — idempotent within one sym-table lifetime; the cache + * is invalidated in `ray_env_destroy` so it survives runtime cycles. */ +bool ray_sym_is_ipc_hook(int64_t sym_id); + +/* Stable index → sym-id accessor for the five IPC hook names. Used by + * `src/core/ipc.c` to look up hook bindings from `g_env` without having + * to maintain its own duplicate cache (and its own stale-cache bug + * across runtime cycles). `idx` must be one of: + * 0 = .ipc.on.open + * 1 = .ipc.on.close + * 2 = .ipc.on.sync + * 3 = .ipc.on.async + * 4 = .ipc.on.auth + * Out-of-range returns -1. */ +int64_t ray_sym_ipc_hook(int idx); + /* Resolve a name for a Rayfall expression (tree-walking eval or bytecode * op_resolve): returns an OWNED ref (rc >= 1) that the caller must * release, or NULL if undefined. Unlike ray_env_get which returns a diff --git a/src/lang/eval.c b/src/lang/eval.c index 2f6cac11..e4f9d71f 100644 --- a/src/lang/eval.c +++ b/src/lang/eval.c @@ -2818,6 +2818,9 @@ static void ray_register_builtins(void) { register_unary( ".ipc.open", RAY_FN_RESTRICTED, ray_hopen_fn); register_unary( ".ipc.close", RAY_FN_RESTRICTED, ray_hclose_fn); register_binary(".ipc.send", RAY_FN_RESTRICTED, ray_hsend_fn); + /* Current connection handle inside any `.ipc.on.*` hook, -1 otherwise. + * Variadic for the `(.ipc.handle)` / `(.ipc.handle 0)` convention. */ + register_vary( ".ipc.handle", RAY_FN_NONE, ray_ipc_handle_fn); /* Remote-REPL session control under `.repl.*`. Once .repl.connect * succeeds, the local REPL line-loop reroutes each subsequent input diff --git a/src/lang/internal.h b/src/lang/internal.h index 13fffe64..2df8f3eb 100644 --- a/src/lang/internal.h +++ b/src/lang/internal.h @@ -470,6 +470,7 @@ ray_t* ray_de_fn(ray_t* val); ray_t* ray_hopen_fn(ray_t* x); ray_t* ray_hclose_fn(ray_t* x); ray_t* ray_hsend_fn(ray_t* handle, ray_t* msg); +ray_t* ray_ipc_handle_fn(ray_t** args, int64_t n); ray_t* ray_set_splayed_fn(ray_t** args, int64_t n); ray_t* ray_get_splayed_fn(ray_t** args, int64_t n); ray_t* ray_get_parted_fn(ray_t** args, int64_t n); diff --git a/src/ops/system.c b/src/ops/system.c index 9ccc90b4..02187135 100644 --- a/src/ops/system.c +++ b/src/ops/system.c @@ -843,3 +843,12 @@ ray_t* ray_hsend_fn(ray_t* handle, ray_t* msg) { return ray_error("type", "message not serializable"); return ray_ipc_send(h, msg); } + +/* (.ipc.handle) → i64 current connection handle inside any `.ipc.on.*` + * hook, or -1 outside any hook. Registered variadic so both + * `(.ipc.handle)` (no args) and `(.ipc.handle 0)` (one arg, ignored) + * work — matches the convention of `.sys.gc` / `.sys.info`. */ +ray_t* ray_ipc_handle_fn(ray_t** args, int64_t n) { + (void)args; (void)n; + return make_i64(ray_ipc_current_handle()); +} diff --git a/test/rfl/system/reserved_namespace.rfl b/test/rfl/system/reserved_namespace.rfl index 3f7577f5..af84bcc4 100644 --- a/test/rfl/system/reserved_namespace.rfl +++ b/test/rfl/system/reserved_namespace.rfl @@ -24,8 +24,8 @@ ;; gc, exec, build, mem, info, cmd, timeit, listen, env (count .os) -- 4 ;; getenv, setenv, size, list -(count .ipc) -- 3 -;; open, close, send +(count .ipc) -- 4 +;; open, close, send, handle (count .csv) -- 4 ;; read, splayed, parted, write ;; .db.* — three-level reserved namespace for storage I/O. The @@ -115,3 +115,32 @@ internals !- name ;; User-level dotted writes under a non-`.` root still work. (set myns.x 1) myns.x -- 1 +;; Connection-hook carve-out: the five `.ipc.on.*` names are the only +;; dotted-reserved names a user can set. `set` accepts them, `let` +;; accepts them, and the rest of `.ipc.*` (open / close / send / handle) +;; stays unsettable. +(set .ipc.on.open (fn [h] h)) +(set .ipc.on.close (fn [h] h)) +(set .ipc.on.sync (fn [m] m)) +(set .ipc.on.async (fn [m] m)) +(set .ipc.on.auth (fn [u p] true)) +;; Each binding resolves back to a callable lambda. +(nil? .ipc.on.open) -- false +(nil? .ipc.on.close) -- false +(nil? .ipc.on.sync) -- false +(nil? .ipc.on.async) -- false +(nil? .ipc.on.auth) -- false +;; Anything ELSE under `.ipc.*` (or any other reserved root) is still +;; rejected with `reserve`, even names that visually resemble a hook. +(set .ipc.handle 9) !- reserve +(set .ipc.open (fn [x] x)) !- reserve +(set .ipc.send (fn [a b] a)) !- reserve +(set .ipc.on.bogus (fn [x] x)) !- reserve +(let .ipc.on.bogus (fn [x] x)) !- reserve +;; Clearing a hook with a non-lambda value is allowed — the server +;; treats anything that isn't a lambda as "no hook installed". +(set .ipc.on.open 0) +.ipc.on.open -- 0 +;; `.ipc.handle` outside any hook reads -1 — the thread-local default. +(.ipc.handle) -- -1 +(.ipc.handle 0) -- -1 diff --git a/test/test_ipc.c b/test/test_ipc.c index f456574c..e7045a9a 100644 --- a/test/test_ipc.c +++ b/test/test_ipc.c @@ -1492,6 +1492,170 @@ static test_result_t test_ipc_send_lazy_msg(void) { PASS(); } +/* ---- test_ipc_hooks_lifecycle ------------------------------------------- */ +/* + * End-to-end exercise of the `.ipc.on.*` connection hooks on the legacy + * server path. Three hooks are installed via `set`; a single round-trip + * (open → SYNC eval → close) drives them all. We track side effects + * through plain user-bound globals that hooks mutate, then read those + * globals back after the connection lifecycle has completed. + * + * Covers: the reserved-namespace allow-list for .ipc.on.*, hook_lookup, + * hook_call_lifecycle, the sync-message dispatch replacement in + * eval_payload_core, and `.ipc.handle` thread-local set/restore around + * the eval window. The "_hook_sync_handle" assertion is the cross- + * thread piece: the server thread writes through ray_env_set into the + * same global env the test thread reads via ray_env_resolve. + */ +static test_result_t test_ipc_hooks_lifecycle(void) { + /* Wire up counters + hooks BEFORE the server thread starts so the + * very first `.ipc.on.open` fired sees the binding. */ + const char* setup = + "(set _hook_open 0)" + "(set _hook_close 0)" + "(set _hook_sync_handle (- 0 99))" + "(set _hook_sync_msg 0)" + /* Lifecycle hooks: just bump counters. Return value ignored. */ + "(set .ipc.on.open (fn [h] (set _hook_open (+ _hook_open 1))))" + "(set .ipc.on.close (fn [h] (set _hook_close (+ _hook_close 1))))" + /* Sync hook: capture the handle visible via `.ipc.handle`, then + * parse + eval the inbound message and return its result so the + * client still gets a sensible response. We use `parse + eval` + * because Rayfall's `eval` operates on a parsed AST, while the + * client sends a string source — the dual-path the default in + * eval_payload_core handles is now the user's responsibility. */ + "(set .ipc.on.sync (fn [m] " + " (set _hook_sync_handle (.ipc.handle)) " + " (set _hook_sync_msg 1) " + " (eval (parse m))))"; + ray_t* r = ray_eval_str(setup); + TEST_ASSERT_NOT_NULL(r); + TEST_ASSERT_FALSE(RAY_IS_ERR(r)); + if (r != RAY_NULL_OBJ) ray_release(r); + + ray_ipc_server_t srv; + ray_err_t err = ray_ipc_server_init(&srv, 0); + TEST_ASSERT_EQ_I(err, RAY_OK); + + uint16_t port = get_listen_port(srv.listen_fd); + TEST_ASSERT((port) > (0), "port > 0"); + + ray_vm_t* srv_vm = make_server_vm(); + TEST_ASSERT_NOT_NULL(srv_vm); + + ipc_thread_ctx_t ctx = { .srv = &srv, .vm = srv_vm }; + ray_thread_t tid; + ray_thread_create(&tid, server_thread_fn, &ctx); + + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + TEST_ASSERT((h) >= (0), "h >= 0"); + + /* One SYNC round-trip — drives on.open (after handshake), on.sync + * (during eval), and on.close (on client-side close below). */ + ray_t* msg = ray_str("(+ 2 3)", 7); + ray_t* resp = ray_ipc_send(h, msg); + ray_release(msg); + TEST_ASSERT_NOT_NULL(resp); + TEST_ASSERT_FALSE(RAY_IS_ERR(resp)); + TEST_ASSERT_EQ_I(resp->type, -RAY_I64); + TEST_ASSERT_EQ_I(resp->i64, 5); + ray_release(resp); + + /* Trigger on.close on the SERVER side by tearing the connection + * down from underneath the poll loop. Closing the client socket + * makes the server's recv return 0, which routes through conn_close + * → hook_call_lifecycle(CLOSE). */ + ray_ipc_close(h); + sleep_ms(50); + + /* Stop the server before reading hook side effects — guarantees + * the close hook has fired (otherwise we'd race the poll loop). */ + srv.running = false; + ray_thread_join(tid); + ray_ipc_server_destroy(&srv); + ray_sys_free(srv_vm); + + /* Read counters back through the global env. on.open + on.sync + * + on.close each fired exactly once. `_hook_sync_handle` records + * `.ipc.handle` as seen INSIDE the sync hook — must equal the + * legacy server's conn-array index (0 for the only active conn). */ + int64_t sym_open = ray_sym_intern("_hook_open", strlen("_hook_open")); + int64_t sym_close = ray_sym_intern("_hook_close", strlen("_hook_close")); + int64_t sym_h = ray_sym_intern("_hook_sync_handle", strlen("_hook_sync_handle")); + int64_t sym_msg = ray_sym_intern("_hook_sync_msg", strlen("_hook_sync_msg")); + + ray_t* v_open = ray_env_get(sym_open); TEST_ASSERT_NOT_NULL(v_open); + ray_t* v_close = ray_env_get(sym_close); TEST_ASSERT_NOT_NULL(v_close); + ray_t* v_h = ray_env_get(sym_h); TEST_ASSERT_NOT_NULL(v_h); + ray_t* v_msg = ray_env_get(sym_msg); TEST_ASSERT_NOT_NULL(v_msg); + + TEST_ASSERT_EQ_I(v_open->i64, 1); + TEST_ASSERT_EQ_I(v_close->i64, 1); + TEST_ASSERT_EQ_I(v_msg->i64, 1); + TEST_ASSERT_EQ_I(v_h->i64, 0); + + /* `.ipc.handle` outside any hook reads back -1. */ + ray_t* handle_outside = ray_eval_str("(.ipc.handle)"); + TEST_ASSERT_NOT_NULL(handle_outside); + TEST_ASSERT_FALSE(RAY_IS_ERR(handle_outside)); + TEST_ASSERT_EQ_I(handle_outside->type, -RAY_I64); + TEST_ASSERT_EQ_I(handle_outside->i64, -1); + if (handle_outside != RAY_NULL_OBJ) ray_release(handle_outside); + PASS(); +} + +/* ---- test_ipc_hooks_auth_narrow ----------------------------------------- */ +/* + * Poll-based server with `-u` auth + a `.ipc.on.auth` hook that rejects + * the username "ban". Drives the on.auth narrowing path: the secret + * compare passes (correct password) but the hook returns false, so the + * handshake is refused. A second client with username "ok" succeeds — + * proving the hook can selectively narrow rather than blanket-reject. + */ +static test_result_t test_ipc_hooks_auth_narrow(void) { + ray_t* r = ray_eval_str( + "(set .ipc.on.auth (fn [u p] (!= u \"ban\")))"); + TEST_ASSERT_NOT_NULL(r); + TEST_ASSERT_FALSE(RAY_IS_ERR(r)); + if (r != RAY_NULL_OBJ) ray_release(r); + + ray_poll_t* poll = ray_poll_create(); + TEST_ASSERT_NOT_NULL(poll); + strcpy(poll->auth_secret, "secret"); + + int64_t listener_id = ray_ipc_listen(poll, 0); + TEST_ASSERT((listener_id) >= (0), "listener_id >= 0"); + ray_selector_t* listener_sel = ray_poll_get(poll, listener_id); + TEST_ASSERT_NOT_NULL(listener_sel); + uint16_t port = get_listen_port((ray_sock_t)listener_sel->fd); + TEST_ASSERT((port) > (0), "port > 0"); + + ray_vm_t* srv_vm = make_server_vm(); + TEST_ASSERT_NOT_NULL(srv_vm); + + poll_thread_ctx_t pctx = { .poll = poll, .vm = srv_vm, .running = 1 }; + ray_thread_t tid; + ray_thread_create(&tid, (void(*)(void*))poll_server_thread_fn, &pctx); + sleep_ms(20); + + /* "ban":secret → password is correct, but the hook returns false + * → handshake rejected with the same 0x01 byte the wrong-password + * path uses, so the client surfaces -3 (auth rejected). */ + int64_t h_banned = ray_ipc_connect("127.0.0.1", port, "ban", "secret"); + TEST_ASSERT_EQ_I(h_banned, -3); + + /* "ok":secret → both checks pass, connection succeeds. */ + int64_t h_ok = ray_ipc_connect("127.0.0.1", port, "ok", "secret"); + TEST_ASSERT((h_ok) >= (0), "h_ok >= 0"); + if (h_ok >= 0) ray_ipc_close(h_ok); + + poll_stop(poll, port); + ray_thread_join(tid); + ray_poll_destroy(poll); + ray_sys_free(srv_vm); + PASS(); +} + /* ---- Registry ------------------------------------------------------------ */ const test_entry_t ipc_entries[] = { @@ -1523,5 +1687,7 @@ const test_entry_t ipc_entries[] = { { "ipc/server_conn_swap", test_ipc_server_conn_swap, ipc_setup, ipc_teardown }, { "ipc/journal_restricted", test_ipc_journal_restricted, ipc_setup, ipc_teardown }, { "ipc/send_lazy_msg", test_ipc_send_lazy_msg, ipc_setup, ipc_teardown }, + { "ipc/hooks_lifecycle", test_ipc_hooks_lifecycle, ipc_setup, ipc_teardown }, + { "ipc/hooks_auth_narrow", test_ipc_hooks_auth_narrow, ipc_setup, ipc_teardown }, { NULL, NULL, NULL, NULL }, }; diff --git a/website/docs/ipc.html b/website/docs/ipc.html index e80b65f7..338fe966 100644 --- a/website/docs/ipc.html +++ b/website/docs/ipc.html @@ -330,6 +330,100 @@

.ipc.close

Message types. .ipc.send uses synchronous messaging — it blocks until the server returns a result. Asynchronous (fire-and-forget) messaging is available via the C API (ray_ipc_send_async). +

Connection Hooks

+

The server side exposes the inbound connection lifecycle to Rayfall code through five hook slots under .ipc.on.*. Each is a user-installable lambda; when unbound the server falls back to its built-in default, so installing a hook is purely opt-in.

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameSignatureFiresReturn value
.ipc.on.open(fn [h] ...)Inbound connection fully handshaked (and authed when -u/-U is on), just before the first header read.Ignored.
.ipc.on.close(fn [h] ...)Inbound connection about to close, before its socket is closed and per-conn state is freed. Pairs 1-to-1 with .ipc.on.open — never fires for connections that died mid-handshake.Ignored.
.ipc.on.sync(fn [m] ...)Inbound sync request, after the payload is deserialised into m. Replaces the default in-process eval.Serialised and shipped to the client as the response.
.ipc.on.async(fn [m] ...)Inbound async message. Same dispatch point as on.sync, but the wire produces no response.Ignored (errors are logged to stderr).
.ipc.on.auth(fn [u p] ...)After the constant-time secret compare in -u/-U auth passes. Servers started without auth never reach this hook.Truthy = accept connection, falsy / error = reject and close.
.ipc.handle(.ipc.handle)Builtin readable inside any of the five hooks above — returns the current connection's handle.-1 outside any hook.
+ +

Install with plain set or the colon binder:

+ +
(set .ipc.on.open  (fn [h] (println "+ " h)))
+(set .ipc.on.close (fn [h] (println "- " h)))
+
+;; Sync hook receives the raw deserialised payload.  Strings need an
+;; explicit parse before eval; the default in-server dispatch does this
+;; for you, but a hook gets the message as-is.
+(set .ipc.on.sync  (fn [m] (eval (parse m))))
+
+;; Narrow auth: hook runs AFTER the password check, so it can only
+;; deny extras — never widen access.  Here, deny the username "ban".
+(set .ipc.on.auth  (fn [u p] (!= u "ban")))
+ +
+ Reserved-namespace carve-out. The five .ipc.on.* names are the only dotted-reserved names a user can set — the rest of .ipc.* (open, close, send, handle) and every other system namespace (.sys.*, .os.*, .csv.*, …) stays unsettable and returns a reserve error on any binding attempt. +
+ +

Clear a hook by assigning a non-lambda value:

+ +
;; Cleared — server falls back to the default behaviour.
+(set .ipc.on.open 0)
+ +

Anything that isn’t a callable lambda is treated as “no hook installed”, so a stale binding never wedges the server. Per-hook error handling:

+ + + + + + + + + + + + + + +
HookError in hook body
.ipc.on.open / .ipc.on.closeLogged to stderr, swallowed. Connection teardown proceeds.
.ipc.on.syncSerialised and shipped to the client as the response — same as a raw eval error.
.ipc.on.asyncLogged to stderr, dropped. No wire response on async.
.ipc.on.authTreated as reject. Same 0x01 handshake byte as a wrong-password rejection.
+ +

Hooks run under the same restricted-mode flag the inbound message would otherwise see — a .ipc.on.sync installed on a -U server cannot escalate privilege; the same blocked-builtins list above applies inside the hook body.

+

Serialization with ser

The ser builtin converts any value to a binary buffer (a U8 vector). Pass it any Rayforce value — atom, vector, list, or table:

diff --git a/website/docs/reference.html b/website/docs/reference.html index e6f1b146..c69b8b62 100644 --- a/website/docs/reference.html +++ b/website/docs/reference.html @@ -739,6 +739,12 @@

IPC (Inter-Process Communication)

.ipc.openunaryrestrictedOpen TCP connection to host:port, returns handle(.ipc.open "localhost:5000") .ipc.closeunaryrestrictedClose an IPC connection handle(.ipc.close h) .ipc.sendbinaryrestrictedSend a value over an IPC handle (sync request)(.ipc.send h "(sum (til 100))") + .ipc.handlevariadic—Current connection handle inside any .ipc.on.* hook, -1 outside(.ipc.handle) + .ipc.on.openhookuser-settableFires after inbound connection completes handshake; arg = handle(set .ipc.on.open (fn [h] ...)) + .ipc.on.closehookuser-settableFires before inbound connection teardown; arg = handle(set .ipc.on.close (fn [h] ...)) + .ipc.on.synchookuser-settableIntercepts sync messages; return value becomes the response(set .ipc.on.sync (fn [m] (eval (parse m)))) + .ipc.on.asynchookuser-settableIntercepts async messages; return value ignored(set .ipc.on.async (fn [m] ...)) + .ipc.on.authhookuser-settableNarrows -u/-U auth; truthy = accept, falsy = reject(set .ipc.on.auth (fn [u p] (!= u "ban")))