Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

{deps, [
{cowboy, "2.12.0"},
{erlang_python, "1.8.1"}
{erlang_python, {git, "https://github.com/benoitc/erlang-python.git", {branch, "feature/py-worker-pool"}}}
]}.

{shell, [
Expand Down
7 changes: 4 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
{"1.2.0",
[{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.12.0">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.13.0">>},1},
{<<"erlang_python">>,{pkg,<<"erlang_python">>,<<"1.8.1">>},0},
{<<"erlang_python">>,
{git,"https://github.com/benoitc/erlang-python.git",
{ref,"9956584c3c8ced865c59704a36ef641b9e4d72e3"}},
0},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},1}]}.
[
{pkg_hash,[
{<<"cowboy">>, <<"F276D521A1FF88B2B9B4C54D0E753DA6C66DD7BE6C9FCA3D9418B561828A3731">>},
{<<"cowlib">>, <<"DB8F7505D8332D98EF50A3EF34B34C1AFDDEC7506E4EE4DD4A3A266285D282CA">>},
{<<"erlang_python">>, <<"4DAFC7AFD315F0D5D45792F722364D8721CED438E396B7D14F19518DC78D198B">>},
{<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>}]},
{pkg_hash_ext,[
{<<"cowboy">>, <<"8A7ABE6D183372CEB21CAA2709BEC928AB2B72E18A3911AA1771639BEF82651E">>},
{<<"cowlib">>, <<"E1E1284DC3FC030A64B1AD0D8382AE7E99DA46C3246B815318A4B848873800A4">>},
{<<"erlang_python">>, <<"0F5893100A92285096519F8111D3A6D3F5DCD18668545ADF9F4144899D5997C2">>},
{<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>}]}
].
20 changes: 19 additions & 1 deletion src/hornbeam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,22 @@ parse_app_spec(AppSpec) when is_binary(AppSpec) ->
%% @private
%% Bring the Python runtime in line with the requested configuration and
%% make sure the ASGI worker pool is up.
%%
%% Config is a map; the `workers` key defaults to 4. The runtime is
%% restarted when the live worker count does not match the request.
%% Returns `ok`, or propagates the failure from the restart / pool start.
ensure_python_runtime(Config) ->
    Desired = maps:get(workers, Config, 4),
    ok = application:set_env(erlang_python, num_workers, Desired),
    Outcome =
        case current_python_workers() of
            %% Already running with the desired worker count - nothing to do.
            {ok, Desired} -> ok;
            %% Wrong count, or the runtime is not up: bounce it.
            _Other -> restart_python_runtime()
        end,
    %% The worker pool is required for high-concurrency ASGI handling.
    case Outcome of
        ok -> ensure_worker_pool();
        Error -> Error
    end.

%% @private
%% Ensure py_worker_pool is running and initialized.
%%
%% The previous version returned `ok` on the already-initialized path but
%% py_worker_pool:start_link()'s raw result on the start path — by OTP
%% convention that is {ok, Pid}, so callers matching on `ok` would see an
%% inconsistent return depending on which branch ran. Normalize both
%% paths to `ok`, treat a lost start race ({already_started, _}) as
%% success, and surface real failures as {error, Reason}.
ensure_worker_pool() ->
    case py_worker_pool:stats() of
        #{initialized := true} ->
            ok;
        _ ->
            %% NOTE(review): assumes py_worker_pool:start_link/0 follows the
            %% OTP {ok, Pid} | {error, Reason} convention — confirm against
            %% the py_worker_pool module.
            case py_worker_pool:start_link() of
                {ok, _Pid} ->
                    ok;
                {error, {already_started, _Pid}} ->
                    %% Another caller started the pool concurrently; it is up.
                    ok;
                {error, _Reason} = Error ->
                    Error
            end
    end.

current_python_workers() ->
Expand All @@ -570,8 +581,15 @@ restart_python_runtime() ->
start_python_runtime() ->
case application:start(erlang_python) of
ok ->
%% Start the new worker pool for high-concurrency ASGI handling
ok = py_worker_pool:start_link(),
refresh_lifespan_manager();
{error, {already_started, erlang_python}} ->
%% Ensure worker pool is started
case py_worker_pool:stats() of
#{initialized := true} -> ok;
_ -> ok = py_worker_pool:start_link()
end,
refresh_lifespan_manager();
{error, Reason} ->
{error, {python_start_failed, Reason}}
Expand Down
182 changes: 169 additions & 13 deletions src/hornbeam_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ handle_websocket_upgrade(Req, State) ->
%%% ============================================================================

handle_wsgi(Req, State) ->
%% Build initial request map for hooks
%% Start profiling if enabled
Prof0 = hornbeam_profiler:start_request(),

%% Build initial request map for hooks (measures cowboy parsing overhead)
ReqInfo = build_request_info(Req),
Prof1 = hornbeam_profiler:mark(cowboy_parse, Prof0),

%% Run on_request hook
ReqInfo1 = hornbeam_http_hooks:run_on_request(ReqInfo),
Prof2 = hornbeam_profiler:mark(hooks_request, Prof1),

try
%% Get app module and callable from cached state (avoids ETS lookups)
Expand All @@ -108,24 +113,28 @@ handle_wsgi(Req, State) ->
Result = case PyContext of
undefined ->
%% Optimized py_wsgi:run/4 path (NIF-based marshalling)
run_wsgi_optimized(Req, AppModule, AppCallable, State);
run_wsgi_optimized_profiled(Req, AppModule, AppCallable, State, Prof2);
Ctx ->
%% Context affinity path - uses same worker as lifespan
run_wsgi_with_context(Req, AppModule, AppCallable, Ctx, TimeoutMs, State)
run_wsgi_with_context_profiled(Req, AppModule, AppCallable, Ctx, TimeoutMs, State, Prof2)
end,

case Result of
{ok, Response} ->
{ok, Response, Prof3} ->
%% Run on_response hook
Response1 = hornbeam_http_hooks:run_on_response(Response),
send_wsgi_response(Req, Response1, State);
Prof4 = hornbeam_profiler:mark(hooks_response, Prof3),
send_wsgi_response_profiled(Req, Response1, State, Prof4);
{error, {overloaded, Current, Max}} ->
hornbeam_profiler:end_request(Prof2),
overload_response(Req, Current, Max, State);
{error, Error} ->
hornbeam_profiler:end_request(Prof2),
handle_error(Req, Error, ReqInfo1, State)
end
catch
Class:Reason:Stack ->
hornbeam_profiler:end_request(Prof2),
error_logger:error_msg("WSGI handler error: ~p:~p~n~p~n",
[Class, Reason, Stack]),
handle_error(Req, {Class, Reason}, ReqInfo1, State)
Expand All @@ -145,6 +154,24 @@ run_wsgi_optimized(Req, AppModule, AppCallable, State) ->
Error
end.

%% @private
%% Profiled variant of the optimized WSGI path (NIF-based marshalling via
%% py_wsgi:run/4). Records scope_build / body_read / python_exec marks and
%% threads the profiler token back to the caller on success.
run_wsgi_optimized_profiled(Req, AppModule, AppCallable, State, Prof) ->
    Environ = build_environ_for_nif(Req, State),
    ProfScope = hornbeam_profiler:mark(scope_build, Prof),
    %% The request body is consumed inside build_environ_for_nif, so the
    %% body_read mark lands immediately after scope_build.
    ProfBody = hornbeam_profiler:mark(body_read, ProfScope),
    RunOpts = #{runner => <<"hornbeam_wsgi_runner">>},
    Outcome = py_wsgi:run(AppModule, AppCallable, Environ, RunOpts),
    case Outcome of
        {ok, {Status, Headers, Body}} ->
            Response = #{<<"status">> => Status,
                         <<"headers">> => Headers,
                         <<"body">> => Body},
            {ok, Response, hornbeam_profiler:mark(python_exec, ProfBody)};
        {error, _} = Error ->
            Error
    end.

%% @private
%% Context-aware fallback path using py:ctx_call
run_wsgi_with_context(Req, AppModule, AppCallable, PyContext, TimeoutMs, State) ->
Expand All @@ -162,6 +189,29 @@ run_wsgi_with_context(Req, AppModule, AppCallable, PyContext, TimeoutMs, State)
py:ctx_call(PyContext, hornbeam_wsgi_runner, run_wsgi,
[AppModule, AppCallable, Environ1], #{}, TimeoutMs).

%% @private
%% Profiled WSGI fallback pinned to the lifespan worker context via
%% py:ctx_call. Records scope_build / body_read / python_exec marks; the
%% profiler token is returned alongside the response on success.
run_wsgi_with_context_profiled(Req, AppModule, AppCallable, PyContext, TimeoutMs, State, Prof) ->
    %% Honour a mounted script_name when one is configured for this route.
    EnvOpts =
        case maps:get(script_name, State, undefined) of
            undefined -> #{};
            Script -> #{script_name => Script}
        end,
    Environ0 = hornbeam_wsgi:build_environ(Req, EnvOpts),
    ProfScope = hornbeam_profiler:mark(scope_build, Prof),
    %% Sub-path routing: override PATH_INFO when the router split it off.
    Environ =
        case maps:get(path_info, State, undefined) of
            undefined -> Environ0;
            Path -> Environ0#{<<"PATH_INFO">> => Path}
        end,
    ProfBody = hornbeam_profiler:mark(body_read, ProfScope),
    Args = [AppModule, AppCallable, Environ],
    case py:ctx_call(PyContext, hornbeam_wsgi_runner, run_wsgi, Args, #{}, TimeoutMs) of
        {ok, Response} ->
            {ok, Response, hornbeam_profiler:mark(python_exec, ProfBody)};
        Error ->
            Error
    end.

%% @private
%% Build environ dict for NIF optimization.
%% Uses binary keys which the NIF optimizes with interned strings.
Expand Down Expand Up @@ -261,6 +311,20 @@ send_wsgi_response(Req, Response, State) ->
Req2 = cowboy_req:reply(StatusCode, CowboyHeaders, Body, Req1),
{ok, Req2, State}.

%% @private
%% Reply to a WSGI request and close out the profiler sample.
%% Response must carry <<"status">>, <<"headers">> and <<"body">>;
%% <<"early_hints">> is optional and defaults to none.
send_wsgi_response_profiled(Req, Response, State, Prof) ->
    #{<<"status">> := Status,
      <<"headers">> := Headers,
      <<"body">> := Body} = Response,
    Hints = maps:get(<<"early_hints">>, Response, []),
    Code = parse_status_code(Status),
    RespHeaders = convert_headers(Headers),
    HintedReq = send_early_hints(Req, Hints),
    FinalReq = cowboy_req:reply(Code, RespHeaders, Body, HintedReq),
    %% Flush the per-request profile before handing control back to cowboy.
    hornbeam_profiler:end_request(Prof),
    {ok, FinalReq, State}.

%% @private
send_early_hints(Req, []) ->
Req;
Expand All @@ -286,11 +350,16 @@ parse_status_code(Status) when is_integer(Status) ->
%%% ============================================================================

handle_asgi(Req, State) ->
%% Build initial request map for hooks
%% Start profiling if enabled
Prof0 = hornbeam_profiler:start_request(),

%% Build initial request map for hooks (measures cowboy parsing overhead)
ReqInfo = build_request_info(Req),
Prof1 = hornbeam_profiler:mark(cowboy_parse, Prof0),

%% Run on_request hook
ReqInfo1 = hornbeam_http_hooks:run_on_request(ReqInfo),
Prof2 = hornbeam_profiler:mark(hooks_request, Prof1),

try
%% Get app module and callable from cached state (avoids ETS lookups)
Expand All @@ -300,6 +369,7 @@ handle_asgi(Req, State) ->

%% Read request body
{ok, ReqBody, Req2} = cowboy_req:read_body(Req),
Prof3 = hornbeam_profiler:mark(body_read, Prof2),

%% Determine ASGI execution mode:
%% - context_affinity: Use lifespan context (for shared module state)
Expand All @@ -313,40 +383,46 @@ handle_asgi(Req, State) ->
%% Context affinity path - uses same worker as lifespan
%% Required when app stores resources in module-level variables
PyContext = hornbeam_lifespan:get_context(),
run_asgi_with_context(Req, AppModule, AppCallable, ReqBody, PyContext, TimeoutMs, State);
run_asgi_with_context_profiled(Req, AppModule, AppCallable, ReqBody, PyContext, TimeoutMs, State, Prof3);
{false, true} ->
%% Bound context path - binds worker for request duration
%% Better for apps with multiple async operations
run_asgi_bound(Req, AppModule, AppCallable, ReqBody, TimeoutMs, State);
run_asgi_bound_profiled(Req, AppModule, AppCallable, ReqBody, TimeoutMs, State, Prof3);
{false, false} ->
%% Optimized py_asgi:run/5 path (NIF-based marshalling)
%% Best for simple request/response apps
run_asgi_optimized(Req, AppModule, AppCallable, ReqBody, State)
run_asgi_optimized_profiled(Req, AppModule, AppCallable, ReqBody, State, Prof3)
end,

case Result of
{ok, Response} ->
{ok, Response, Prof4} ->
%% Run on_response hook
Response1 = hornbeam_http_hooks:run_on_response(Response),
send_asgi_response(Req2, Response1, State);
Prof5 = hornbeam_profiler:mark(hooks_response, Prof4),
send_asgi_response_profiled(Req2, Response1, State, Prof5);
{error, {overloaded, Current, Max}} ->
hornbeam_profiler:end_request(Prof3),
overload_response(Req2, Current, Max, State);
{error, Error} ->
hornbeam_profiler:end_request(Prof3),
handle_error(Req2, Error, ReqInfo1, State)
end
catch
Class:Reason:Stack ->
hornbeam_profiler:end_request(Prof2),
error_logger:error_msg("ASGI handler error: ~p:~p~n~p~n",
[Class, Reason, Stack]),
handle_error(Req, {Class, Reason}, ReqInfo1, State)
end.

%% @private
%% Optimized path using py_asgi:run/5 with NIF marshalling
%% Now routes through worker pool for better concurrency
run_asgi_optimized(Req, AppModule, AppCallable, ReqBody, State) ->
Scope = build_scope_for_nif(Req, State),
case py_asgi:run(AppModule, AppCallable, Scope, ReqBody,
#{runner => <<"hornbeam_asgi_runner">>}) of
%% Use worker pool for better GIL handling under high concurrency
case py_worker_pool:asgi_run(AppModule, AppCallable, Scope, ReqBody,
#{runner => <<"hornbeam_asgi_runner">>}) of
{ok, {Status, Headers, Body}} ->
{ok, #{<<"status">> => Status,
<<"headers">> => Headers,
Expand All @@ -355,6 +431,22 @@ run_asgi_optimized(Req, AppModule, AppCallable, ReqBody, State) ->
Error
end.

%% @private
%% Profiled optimized ASGI path: NIF-marshalled scope dispatched through
%% py_worker_pool:asgi_run/5 for better GIL behaviour under concurrency.
%% Records scope_build / python_exec marks. (body_read is marked by the
%% caller, which reads the body before invoking this path.)
run_asgi_optimized_profiled(Req, AppModule, AppCallable, ReqBody, State, Prof) ->
    Scope = build_scope_for_nif(Req, State),
    ProfScope = hornbeam_profiler:mark(scope_build, Prof),
    RunOpts = #{runner => <<"hornbeam_asgi_runner">>},
    case py_worker_pool:asgi_run(AppModule, AppCallable, Scope, ReqBody, RunOpts) of
        {ok, {Status, Headers, Body}} ->
            Response = #{<<"status">> => Status,
                         <<"headers">> => Headers,
                         <<"body">> => Body},
            {ok, Response, hornbeam_profiler:mark(python_exec, ProfScope)};
        {error, _} = Error ->
            Error
    end.

%% @private
%% Context-aware fallback path using py:ctx_call
run_asgi_with_context(Req, AppModule, AppCallable, ReqBody, PyContext, TimeoutMs, State) ->
Expand All @@ -372,6 +464,28 @@ run_asgi_with_context(Req, AppModule, AppCallable, ReqBody, PyContext, TimeoutMs
py:ctx_call(PyContext, hornbeam_asgi_runner, run_asgi,
[AppModule, AppCallable, Scope1, ReqBody], #{}, TimeoutMs).

%% @private
%% Profiled ASGI fallback that runs on the lifespan worker context via
%% py:ctx_call, for apps keeping state in module-level variables.
%% Records scope_build / python_exec marks.
run_asgi_with_context_profiled(Req, AppModule, AppCallable, ReqBody, PyContext, TimeoutMs, State, Prof) ->
    %% Mounted apps expose their mount point as the ASGI root_path.
    ScopeOpts =
        case maps:get(script_name, State, undefined) of
            undefined -> #{};
            Script -> #{root_path => Script}
        end,
    Scope0 = hornbeam_asgi:build_scope(Req, ScopeOpts),
    ProfScope = hornbeam_profiler:mark(scope_build, Prof),
    %% Sub-path routing: rewrite both path and raw_path when split off.
    Scope =
        case maps:get(path_info, State, undefined) of
            undefined -> Scope0;
            Path -> Scope0#{<<"path">> => Path, <<"raw_path">> => Path}
        end,
    Args = [AppModule, AppCallable, Scope, ReqBody],
    case py:ctx_call(PyContext, hornbeam_asgi_runner, run_asgi, Args, #{}, TimeoutMs) of
        {ok, Response} ->
            {ok, Response, hornbeam_profiler:mark(python_exec, ProfScope)};
        Error ->
            Error
    end.

%% @private
%% Bound context path - binds a worker for the request duration.
%% This reduces overhead for apps with multiple async operations by
Expand All @@ -394,6 +508,30 @@ run_asgi_bound(Req, AppModule, AppCallable, ReqBody, TimeoutMs, State) ->
[AppModule, AppCallable, Scope1, ReqBody], #{}, TimeoutMs)
end).

%% @private
%% Profiled bound-context ASGI path: py:with_context binds one worker for
%% the duration of the request, and py:call runs the runner on it. Records
%% scope_build / python_exec marks.
run_asgi_bound_profiled(Req, AppModule, AppCallable, ReqBody, TimeoutMs, State, Prof) ->
    %% Mounted apps expose their mount point as the ASGI root_path.
    ScopeOpts =
        case maps:get(script_name, State, undefined) of
            undefined -> #{};
            Script -> #{root_path => Script}
        end,
    Scope0 = hornbeam_asgi:build_scope(Req, ScopeOpts),
    ProfScope = hornbeam_profiler:mark(scope_build, Prof),
    %% Sub-path routing: rewrite both path and raw_path when split off.
    Scope =
        case maps:get(path_info, State, undefined) of
            undefined -> Scope0;
            Path -> Scope0#{<<"path">> => Path, <<"raw_path">> => Path}
        end,
    Run = fun() ->
                  py:call(hornbeam_asgi_runner, run_asgi,
                          [AppModule, AppCallable, Scope, ReqBody], #{}, TimeoutMs)
          end,
    case py:with_context(Run) of
        {ok, Response} ->
            {ok, Response, hornbeam_profiler:mark(python_exec, ProfScope)};
        Error ->
            Error
    end.

%% @private
%% Build scope with atom keys for NIF optimization.
%% The NIF uses asgi_get_key_for_term which optimizes atom key lookups.
Expand Down Expand Up @@ -485,6 +623,24 @@ send_asgi_response(Req, Response, State) ->
Req2 = cowboy_req:reply(StatusCode, CowboyHeaders, Body, Req1),
{ok, Req2, State}.

%% @private
%% Reply to an ASGI request and close out the profiler sample.
%% Response must carry <<"status">>, <<"headers">> and <<"body">>;
%% <<"early_hints">> defaults to none. An integer status is used as-is,
%% an explicit `undefined` maps to 500, anything else goes through
%% parse_status_code/1.
send_asgi_response_profiled(Req, Response, State, Prof) ->
    #{<<"status">> := Status,
      <<"headers">> := Headers,
      <<"body">> := Body} = Response,
    Hints = maps:get(<<"early_hints">>, Response, []),
    Code =
        if
            Status =:= undefined -> 500;
            is_integer(Status) -> Status;
            true -> parse_status_code(Status)
        end,
    RespHeaders = convert_headers(Headers),
    HintedReq = send_early_hints(Req, Hints),
    FinalReq = cowboy_req:reply(Code, RespHeaders, Body, HintedReq),
    %% Flush the per-request profile before handing control back to cowboy.
    hornbeam_profiler:end_request(Prof),
    {ok, FinalReq, State}.

%%% ============================================================================
%%% Error handling
%%% ============================================================================
Expand Down
Loading