Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ do_request(ConnPid, Method, Path, Headers0, Body, Options, URL, Host) ->
Options, URL, FollowRedirect, MaxRedirect, RedirectCount);
_ ->
%% Async request with optional redirect handling
async_request(ConnPid, MethodBin, Path, Headers3, Body, Async, StreamTo, FollowRedirect)
async_request(ConnPid, MethodBin, Path, Headers3, Body, Async, StreamTo, FollowRedirect, Options)
end,

case Result of
Expand Down Expand Up @@ -850,10 +850,15 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody,
undefined -> [];
InformFun -> [{inform_fun, InformFun}]
end,
ReqOpts = case proplists:get_value(auto_decompress, Options, false) of
ReqOpts1 = case proplists:get_value(auto_decompress, Options, false) of
true -> [{auto_decompress, true} | ReqOpts0];
false -> ReqOpts0
end,
%% Pass recv_timeout through to the connection so it's applied per-request
ReqOpts = case proplists:get_value(recv_timeout, Options) of
undefined -> ReqOpts1;
RecvTimeout -> [{recv_timeout, RecvTimeout} | ReqOpts1]
end,
case hackney_conn:request(ConnPid, Method, Path, HeadersList, FinalBody, infinity, ReqOpts) of
%% HTTP/2 returns body directly - handle 4-tuple first
{ok, Status, RespHeaders, RespBody} when Status >= 301, Status =< 303; Status =:= 307; Status =:= 308 ->
Expand Down Expand Up @@ -1055,13 +1060,18 @@ maybe_strip_auth_on_redirect(CurrentURL, NewURL, Options) ->
end
end.

async_request(ConnPid, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect) ->
async_request(ConnPid, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, Options) ->
%% Handle body encoding
{FinalHeaders, FinalBody} = encode_body(Headers, Body, []),
HeadersList = hackney_headers:to_list(FinalHeaders),
%% Build ReqOpts for recv_timeout (fix for issue #832)
ReqOpts = case proplists:get_value(recv_timeout, Options) of
undefined -> [];
RecvTimeout -> [{recv_timeout, RecvTimeout}]
end,
%% Note: Issue #646 - ownership transfer to StreamTo (when different from caller)
%% is handled atomically inside hackney_conn:do_request_async
case hackney_conn:request_async(ConnPid, Method, Path, HeadersList, FinalBody, AsyncMode, StreamTo, FollowRedirect) of
case hackney_conn:request_async(ConnPid, Method, Path, HeadersList, FinalBody, AsyncMode, StreamTo, FollowRedirect, ReqOpts) of
{ok, Ref} ->
{ok, Ref};
{error, Reason} ->
Expand Down
45 changes: 39 additions & 6 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
request_async/6,
request_async/7,
request_async/8,
request_async/9,
stream_next/1,
stop_async/1,
pause_stream/1,
Expand Down Expand Up @@ -324,6 +325,11 @@ request_async(Pid, Method, Path, Headers, Body, AsyncMode, StreamTo) ->
request_async(Pid, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect) ->
gen_statem:call(Pid, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect}).

-spec request_async(pid(), binary(), binary(), list(), binary() | iolist(), true | once, pid(), boolean(), list()) ->
{ok, reference()} | {error, term()}.
request_async(Pid, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, ReqOpts) ->
gen_statem:call(Pid, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, ReqOpts}).

%% @doc Request the next message in {async, once} mode.
-spec stream_next(pid()) -> ok | {error, term()}.
stream_next(Pid) ->
Expand Down Expand Up @@ -794,13 +800,19 @@ connected({call, From}, is_upgraded_ssl, #conn_data{upgraded_ssl = Upgraded}) ->
connected({call, From}, is_no_reuse, #conn_data{no_reuse = NoReuse}) ->
{keep_state_and_data, [{reply, From, NoReuse}]};

connected({call, From}, {request, Method, Path, Headers, Body, _ReqOpts}, #conn_data{protocol = http2} = Data) ->
connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, #conn_data{protocol = http2} = Data) ->
%% HTTP/2 request - use h2_machine (1xx not applicable for HTTP/2)
do_h2_request(From, Method, Path, Headers, Body, Data);
%% Allow recv_timeout to be overridden per-request (fix for issue #832)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{recv_timeout = RecvTimeout},
do_h2_request(From, Method, Path, Headers, Body, NewData);

connected({call, From}, {request, Method, Path, Headers, Body, _ReqOpts}, #conn_data{protocol = http3} = Data) ->
connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, #conn_data{protocol = http3} = Data) ->
%% HTTP/3 request - use hackney_h3 (1xx not applicable for HTTP/3)
do_h3_request(From, Method, Path, Headers, Body, Data);
%% Allow recv_timeout to be overridden per-request (fix for issue #832)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{recv_timeout = RecvTimeout},
do_h3_request(From, Method, Path, Headers, Body, NewData);

connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo}, #conn_data{protocol = http2} = Data) ->
%% HTTP/2 async request
Expand All @@ -826,6 +838,8 @@ connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, Data) -
%% HTTP/1.1 request
InformFun = proplists:get_value(inform_fun, ReqOpts, undefined),
AutoDecompress = proplists:get_value(auto_decompress, ReqOpts, false),
%% Allow recv_timeout to be overridden per-request (fix for issue #832)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{
request_from = From,
method = Method,
Expand All @@ -840,7 +854,8 @@ connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, Data) -
async_ref = undefined,
stream_to = undefined,
inform_fun = InformFun,
auto_decompress = AutoDecompress
auto_decompress = AutoDecompress,
recv_timeout = RecvTimeout
},
{next_state, sending, NewData, [{next_event, internal, {send_request, Method, Path, Headers, Body}}]};

Expand All @@ -849,9 +864,27 @@ connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode,
do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, false, Data);

connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect}, Data) ->
%% Start a new async request with redirect option
%% Start a new async request with redirect option (HTTP/1.1)
do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, Data);

connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, ReqOpts}, #conn_data{protocol = http2} = Data) ->
%% HTTP/2 async request with ReqOpts (fix for issue #832)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{recv_timeout = RecvTimeout},
do_h2_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, NewData);

connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, _FollowRedirect, ReqOpts}, #conn_data{protocol = http3} = Data) ->
%% HTTP/3 async request with ReqOpts (fix for issue #832, redirect not yet implemented for H3)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{recv_timeout = RecvTimeout},
do_h3_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, NewData);

connected({call, From}, {request_async, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, ReqOpts}, Data) ->
%% HTTP/1.1 async request with ReqOpts (fix for issue #832)
RecvTimeout = proplists:get_value(recv_timeout, ReqOpts, Data#conn_data.recv_timeout),
NewData = Data#conn_data{recv_timeout = RecvTimeout},
do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, NewData);

connected({call, From}, {send_headers, Method, Path, Headers}, #conn_data{protocol = http3} = Data) ->
%% HTTP/3 streaming body - send headers only via QUIC
do_h3_send_headers(From, Method, Path, Headers, Data);
Expand Down
23 changes: 23 additions & 0 deletions test/delay_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
%%% -*- erlang -*-
%%%
%%% Test resource that delays before responding
%%% Used to test timeout behavior

-module(delay_handler).

-export([init/2]).

init(Req0, State) ->
%% Extract delay seconds from path binding
Seconds = cowboy_req:binding(seconds, Req0),
DelayMs = binary_to_integer(Seconds) * 1000,

%% Sleep to simulate slow response
timer:sleep(DelayMs),

%% Return response
Body = <<"{\"delayed\": true}">>,
Req = cowboy_req:reply(200, #{
<<"content-type">> => <<"application/json">>
}, Body, Req0),
{ok, Req, State}.
98 changes: 98 additions & 0 deletions test/hackney_recv_timeout_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
%%% -*- erlang -*-
%%%
%%% Test for recv_timeout option with connection pooling
%%%
%%% Bug: When using connection pooling, the recv_timeout option passed in
%%% hackney:request/5 is ignored. The connection uses its original timeout
%%% from when it was created.
%%%
%%% Expected: Each request should respect its own recv_timeout option
%%% Actual: Pooled connections use the timeout they were created with

-module(hackney_recv_timeout_tests).

-include_lib("eunit/include/eunit.hrl").

-define(PORT, 8125).
-define(URL(Path), "http://127.0.0.1:" ++ integer_to_list(?PORT) ++ Path).

%%====================================================================
%% Test Setup
%%====================================================================

recv_timeout_test_() ->
{setup,
fun setup/0,
fun teardown/1,
[
{"recv_timeout works without pooling", fun test_timeout_no_pool/0},
{"recv_timeout should work with pooling", fun test_timeout_with_pool/0}
]}.

setup() ->
error_logger:tty(false),
{ok, _} = application:ensure_all_started(cowboy),
{ok, _} = application:ensure_all_started(hackney),

%% Start test server with delay endpoint
Host = '_',
Routes = [
{"/delay/:seconds", delay_handler, []},
{"/[...]", test_http_resource, []}
],
Dispatch = cowboy_router:compile([{Host, Routes}]),
{ok, _} = cowboy:start_clear(recv_timeout_test_server,
[{port, ?PORT}],
#{env => #{dispatch => Dispatch}}),
ok.

teardown(_) ->
cowboy:stop_listener(recv_timeout_test_server),
error_logger:tty(true),
ok.

%%====================================================================
%% Tests
%%====================================================================

test_timeout_no_pool() ->
%% Without pooling, recv_timeout should work correctly
%% Request to /delay/2 with 100ms timeout should timeout
Url = ?URL("/delay/2"),
Opts = [
{pool, false}, % Disable pooling
{recv_timeout, 100} % 100ms timeout
],

Result = hackney:request(get, Url, [], <<>>, Opts),

%% Should timeout
?assertEqual({error, timeout}, Result).

test_timeout_with_pool() ->
%% With pooling, recv_timeout should still work for each request
PoolName = recv_timeout_test_pool,

%% Create a dedicated pool for this test
ok = hackney_pool:start_pool(PoolName, [{pool_size, 1}]),

try
Url = ?URL("/delay/2"),

%% First request: create connection with long timeout (10000ms)
%% This should succeed (delay is only 2 seconds)
{ok, 200, _, _} = hackney:request(get, ?URL("/get"), [], <<>>,
[{pool, PoolName}, {recv_timeout, 10000}]),

%% Second request: try to use same pooled connection with 100ms timeout
%% This SHOULD timeout because the delay is 2 seconds
ShortTimeoutOpts = [{pool, PoolName}, {recv_timeout, 100}],

Result = hackney:request(get, Url, [], <<>>, ShortTimeoutOpts),

%% Expected: {error, timeout}
%% Bug: {ok, 200, _, _} - request succeeds because timeout is ignored
?assertEqual({error, timeout}, Result)
after
hackney_pool:stop_pool(PoolName)
end.
Loading