diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index e951a444..15dab47d 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -204,6 +204,11 @@ h2_streams = #{} :: #{pos_integer() => {term(), tuple()}}, %% Current HTTP/2 stream ID for streaming body mode (body = stream) h2_stream_id :: pos_integer() | undefined, + %% Per-stream recv_timeout watchdog timers (sync one-shot reads): + %% StreamId => timer ref. Fires {timeout, TRef, {h2_recv_timeout, StreamId}} + %% if no progress within recv_timeout, so a lost frame fails fast instead of + %% blocking until the connection dies. Re-armed on each DATA frame. + h2_timers = #{} :: #{pos_integer() => reference()}, %% HTTP/3 support (QUIC) %% HTTP/3 connection reference from hackney_h3 @@ -834,20 +839,22 @@ connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) -> %% Socket not connected {next_state, closed, Data, [{reply, From, {ok, closed}}]}; connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) -> - %% Check for pending close message first (from active mode) - case has_pending_close(Socket) of + %% Stop active-mode delivery before reconciling the socket for checkout, so + %% no further {tcp,_}/{ssl,_} messages can land after we inspect it. + _ = Transport:setopts(Socket, [{active, false}]), + %% A pooled connection is only reusable if nothing arrived while it was idle. + %% #544: a server-initiated close (tcp_closed/ssl_closed) means drop it. + %% Unsolicited data is just as disqualifying: hackney does not pipeline, so + %% bytes that arrived while idle cannot belong to the next response. Reusing + %% such a socket would strand them (passive recv blocks on an empty buffer) + %% or corrupt the next read. Drop it and let the pool dial a fresh one. + case has_pending_close(Socket) orelse has_pending_data(Transport, Socket) of true -> - %% Server closed the connection - transition to closed state - {next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]}; + {next_state, closed, Data#conn_data{socket = undefined}, + [{reply, From, {ok, closed}}]}; false -> - %% No close message pending - check socket health case check_socket_health(Transport, Socket) of ok -> - %% Socket is healthy - set to passive mode for checkout - %% This ensures the socket is ready for blocking recv operations - _ = Transport:setopts(Socket, [{active, false}]), - %% Flush any data messages that arrived while in active mode - flush_socket_messages(Socket), {keep_state_and_data, [{reply, From, {ok, connected}}]}; {error, _} -> {keep_state_and_data, [{reply, From, {ok, closed}}]} @@ -1044,6 +1051,9 @@ connected({call, From}, {send_headers, Method, Path, Headers}, Data) -> %% HTTP/2 owner messages from h2 library connected(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) -> handle_h2_event(Event, Data); +%% HTTP/2 per-stream recv_timeout watchdog (see arm_h2_timer/2). +connected(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) -> + handle_h2_recv_timeout(StreamId, TRef, Data); %% h2_connection is linked via start_link; trap_exit surfaces its termination %% as an 'EXIT' signal. Convert to the same cleanup path as the monitor DOWN. connected(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) -> @@ -1340,6 +1350,8 @@ streaming_body(info, {ssl_closed, Socket}, #conn_data{socket = Socket} = Data) - %% can surface it. Mirrors the connected-state h2 handlers. streaming_body(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) -> handle_h2_event(Event, Data); +streaming_body(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) -> + handle_h2_recv_timeout(StreamId, TRef, Data); streaming_body(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) -> h2_on_closed(Reason, Data#conn_data{h2_conn = undefined, h2_mon = undefined}); streaming_body(info, {'DOWN', Mon, process, _Pid, Reason}, #conn_data{h2_mon = Mon} = Data) -> @@ -2383,15 +2395,30 @@ has_pending_close(Socket) -> false end. -%% @private Flush any socket data messages that arrived while in active mode -%% This is called after setting the socket to passive mode. -%% Note: Close messages are checked separately via has_pending_close/1. -flush_socket_messages(Socket) -> +%% @private Detect unsolicited data that arrived while the socket was idle in +%% active mode. Returns true when any data is pending — such a connection is not +%% safe to reuse (hackney does not pipeline, so the bytes cannot belong to the +%% next response). Drains the mailbox so a dropped connection leaves nothing +%% behind, and peeks the socket buffer to close the active->passive race where a +%% {tcp,_}/{ssl,_} message has not yet landed. Must run after the socket is set +%% passive. Close messages are checked separately via has_pending_close/1. +has_pending_data(Transport, Socket) -> + HadMailbox = drain_socket_mailbox(Socket), + HadBuffer = case Transport:recv(Socket, 0, 0) of + {ok, _Bytes} -> true; %% bytes already buffered on the socket + {error, timeout} -> false; %% nothing pending - healthy idle socket + {error, _} -> true %% closed/other - not reusable + end, + HadMailbox orelse HadBuffer. + +%% @private Remove any {tcp/ssl, Socket, Data} messages from the mailbox, +%% returning true if at least one was present. +drain_socket_mailbox(Socket) -> receive - {tcp, Socket, _Data} -> flush_socket_messages(Socket); - {ssl, Socket, _Data} -> flush_socket_messages(Socket) + {tcp, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true; + {ssl, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true after 0 -> - ok + false end. %% @private Notify pool that connection is available for reuse (async) @@ -2670,6 +2697,65 @@ h2_start_failure(after_upgrade, From, Reason) -> close_h2(H2Conn) -> try h2_connection:close(H2Conn) catch _:_ -> ok end. +%% @private Arm a per-stream recv_timeout watchdog for a sync HTTP/2 read so a +%% lost frame fails fast with {error, timeout} instead of blocking until the +%% connection dies. No-op when recv_timeout is infinity. +arm_h2_timer(StreamId, #conn_data{recv_timeout = Timeout, h2_timers = Timers} = Data) -> + case Timeout of + infinity -> + Data; + _ -> + TRef = erlang:start_timer(Timeout, self(), {h2_recv_timeout, StreamId}), + Data#conn_data{h2_timers = maps:put(StreamId, TRef, Timers)} + end. + +%% @private Cancel and forget a stream's recv_timeout watchdog, if any. +cancel_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) -> + case maps:take(StreamId, Timers) of + {TRef, Timers2} -> + _ = erlang:cancel_timer(TRef), + Data#conn_data{h2_timers = Timers2}; + error -> + Data + end. + +%% @private Reset a stream's recv_timeout watchdog after progress (headers or a +%% DATA frame). Keeps the deadline relative to the last byte received, matching +%% HTTP/1.1 per-recv timeout semantics. No-op for streams without a timer. +rearm_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) -> + case maps:is_key(StreamId, Timers) of + true -> arm_h2_timer(StreamId, cancel_h2_timer(StreamId, Data)); + false -> Data + end. + +%% @private Cancel every outstanding recv_timeout watchdog (connection gone). +cancel_all_h2_timers(#conn_data{h2_timers = Timers} = Data) -> + _ = maps:fold(fun(_StreamId, TRef, _Acc) -> erlang:cancel_timer(TRef) end, + ok, Timers), + Data#conn_data{h2_timers = #{}}. + +%% @private A recv_timeout watchdog fired: if it is still the live timer for the +%% stream and a sync reader is parked, fail that reader and drop the stream. +%% A stale timer (re-armed or already completed) is ignored. +handle_h2_recv_timeout(StreamId, TRef, + #conn_data{h2_streams = Streams, h2_timers = Timers} = Data) -> + case maps:get(StreamId, Timers, undefined) of + TRef -> + Timers2 = maps:remove(StreamId, Timers), + case maps:get(StreamId, Streams, undefined) of + {From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync -> + Streams2 = maps:remove(StreamId, Streams), + {keep_state, + Data#conn_data{h2_streams = Streams2, h2_timers = Timers2, + request_from = undefined}, + [{reply, From, {error, timeout}}]}; + _ -> + {keep_state, Data#conn_data{h2_timers = Timers2}} + end; + _ -> + {keep_state, Data} + end. + %% @private Send an HTTP/2 request via the h2 library. do_h2_request(From, Method, Path, Headers, Body, Data) -> do_h2_send(From, Method, Path, Headers, Body, @@ -2723,7 +2809,9 @@ do_h2_send(From, Method, Path, Headers, Body, StreamState, Mode, Data) -> }, NewData = case Mode of sync -> - NewData0#conn_data{request_from = From}; + %% Watchdog the response so a lost frame fails fast rather + %% than blocking on the infinity gen_statem:call. + arm_h2_timer(StreamId, NewData0#conn_data{request_from = From}); {async, Ref, StreamTo, AsyncMode} -> NewData0#conn_data{ async = AsyncMode, @@ -2912,8 +3000,12 @@ handle_h2_event({response, StreamId, Status, Headers}, Data) -> h2_on_response(StreamId, Status, Headers, Data); handle_h2_event({data, StreamId, Body, EndStream}, Data) -> h2_on_data(StreamId, Body, EndStream, Data); -handle_h2_event({trailers, _StreamId, _Headers}, Data) -> - {keep_state, Data}; +handle_h2_event({trailers, StreamId, _Headers}, Data) -> + %% RFC 9113 §5.1: a trailing HEADERS frame carries END_STREAM. The h2 lib + %% delivers it as a {trailers,...} event with no terminal DATA frame, so + %% treat it as end-of-stream to release a reader parked on END_STREAM + %% (otherwise the body read hangs until the connection dies). + h2_on_data(StreamId, <<>>, true, Data); handle_h2_event({stream_reset, StreamId, ErrorCode}, Data) -> h2_on_stream_reset(StreamId, ErrorCode, Data); handle_h2_event({goaway, _LastStreamId, ErrorCode}, Data) -> @@ -2933,9 +3025,11 @@ h2_on_response(StreamId, Status, Headers, Data) -> Streams2 = maps:put(StreamId, {From, {sync, body, Status, Headers, <<>>}}, Streams), - {keep_state, Data#conn_data{h2_streams = Streams2, - status = Status, - response_headers = Headers}}; + Data2 = rearm_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + status = Status, + response_headers = Headers}), + {keep_state, Data2}; {StreamTo, {async, AsyncMode, StreamTo, Ref, waiting_headers}} -> StreamTo ! {hackney_response, Ref, {status, Status, <<>>}}, StreamTo ! {hackney_response, Ref, {headers, Headers}}, @@ -2976,15 +3070,18 @@ h2_on_data(StreamId, Body, EndStream, Data) -> case EndStream of true -> Streams2 = maps:remove(StreamId, Streams), - {keep_state, - Data#conn_data{h2_streams = Streams2, - request_from = undefined}, + Data2 = cancel_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + request_from = undefined}), + {keep_state, Data2, [{reply, From, {ok, Status, Headers, NewAcc}}]}; false -> Streams2 = maps:put(StreamId, {From, {sync, body, Status, Headers, NewAcc}}, Streams), - {keep_state, Data#conn_data{h2_streams = Streams2}} + Data2 = rearm_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2}), + {keep_state, Data2} end; {StreamTo, {async, AsyncMode, StreamTo, Ref, streaming, Status, Headers}} -> _ = case byte_size(Body) of @@ -3055,10 +3152,12 @@ h2_on_data(StreamId, Body, EndStream, Data) -> h2_on_stream_reset(StreamId, ErrorCode, Data) -> #conn_data{h2_streams = Streams} = Data, case maps:get(StreamId, Streams, undefined) of - {From, {sync, _}} -> + {From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync -> Streams2 = maps:remove(StreamId, Streams), - {keep_state, - Data#conn_data{h2_streams = Streams2, request_from = undefined}, + Data2 = cancel_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + request_from = undefined}), + {keep_state, Data2, [{reply, From, {error, {stream_error, ErrorCode}}}]}; {StreamTo, {async, _, StreamTo, Ref, _, _, _}} -> StreamTo ! {hackney_response, Ref, {error, {stream_error, ErrorCode}}}, @@ -3091,11 +3190,12 @@ h2_stream_parked_from(_) -> undefined. h2_on_goaway(ErrorCode, Data) -> {Replies, Data1} = collect_h2_aborts({goaway, ErrorCode}, Data), - {keep_state, Data1, Replies}. + {keep_state, cancel_all_h2_timers(Data1), Replies}. h2_on_closed(Reason, Data) -> {Replies, Data1} = collect_h2_aborts({closed, Reason}, Data), - Stripped = Data1#conn_data{h2_conn = undefined, h2_mon = undefined, + Data2 = cancel_all_h2_timers(Data1), + Stripped = Data2#conn_data{h2_conn = undefined, h2_mon = undefined, socket = undefined}, %% Transition to closed. For pooled conns, closed(enter,...) keeps the %% process alive for ?CLOSED_GRACE_MS so calls from workers that raced diff --git a/test/hackney_http2_trailers_tests.erl b/test/hackney_http2_trailers_tests.erl new file mode 100644 index 00000000..7caedd75 --- /dev/null +++ b/test/hackney_http2_trailers_tests.erl @@ -0,0 +1,132 @@ +%%% Tests for HTTP/2 responses that signal end-of-stream with a trailing +%%% HEADERS frame (trailers) instead of an END_STREAM DATA flag, and for the +%%% per-stream recv_timeout watchdog. +%%% +%%% Regression for the production hang: an h2 response with no content-length +%%% whose body is closed by trailers (what proxies/ALBs emit for streamed +%%% bodies) left the body reader parked forever, because hackney ignored the +%%% {trailers,...} event and waited for an END_STREAM DATA frame that never +%%% came. The reader must complete as soon as the peer signals end-of-stream. +-module(hackney_http2_trailers_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BODY, (binary:copy(<<"x">>, 14000))). + +cert_dir() -> + BeamDir = filename:dirname(code:which(?MODULE)), + Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]), + filename:join([filename:absname(Root), "test", "certs"]). + +%%==================================================================== +%% Fixture +%%==================================================================== + +http2_trailers_test_() -> + {setup, + fun setup/0, + fun cleanup/1, + fun(Ctx) -> + [{"body closed by trailers completes promptly", + {timeout, 30, fun() -> t_trailered_response(Ctx) end}}, + {"trailered + END_STREAM-on-DATA both work across a reused pooled conn", + {timeout, 60, fun() -> t_pooled_reuse(Ctx) end}}, + {"a never-ending response fails fast on recv_timeout", + {timeout, 30, fun() -> t_recv_timeout_safety_net(Ctx) end}}] + end}. + +setup() -> + _ = application:ensure_all_started(hackney), + _ = application:ensure_all_started(h2), + Certs = cert_dir(), + {ok, Server} = h2:start_server(0, #{ + cert => filename:join(Certs, "server.pem"), + key => filename:join(Certs, "server.key"), + handler => fun server_handler/5, + settings => #{max_concurrent_streams => unlimited} + }), + Port = h2:server_port(Server), + #{server => Server, port => Port}. + +cleanup(#{server := Server}) -> + try h2:stop_server(Server) catch _:_ -> ok end, + ok. + +%%==================================================================== +%% Server handler +%%==================================================================== + +%% /trailered : body, then END_STREAM via a trailing HEADERS frame. +%% /normal : body with END_STREAM on the final DATA frame. +%% /stall : headers + a partial body, then never end the stream. +server_handler(Conn, Sid, _Method, Path, _Headers) -> + Json = [{<<"content-type">>, <<"application/json">>}], + case Path of + <<"/trailered">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, ?BODY, false), + ok = h2:send_trailers(Conn, Sid, [{<<"x-trailer">>, <<"end">>}]); + <<"/normal">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, ?BODY, true); + <<"/stall">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, <<"partial">>, false), + %% Leave the stream open well past the client's recv_timeout. + timer:sleep(5000), + try h2:send_data(Conn, Sid, <<>>, true) catch _:_ -> ok end + end. + +%%==================================================================== +%% Tests +%%==================================================================== + +%% Bug 1a: a response closed by trailers must return the full body, not hang. +t_trailered_response(#{port := Port}) -> + {ok, 200, _Headers, Body} = + hackney:request(get, url(Port, <<"/trailered">>), [], <<>>, opts(false)), + ?assertEqual(?BODY, Body). + +%% Reuse the single pooled h2 connection for many sequential requests, +%% alternating trailered and END_STREAM-on-DATA responses; every read must +%% return the full body with no lost bytes and no recv_timeout. +t_pooled_reuse(#{port := Port}) -> + Pool = hackney_h2_trailers_pool, + _ = hackney_pool:start_pool(Pool, [{max_connections, 5}]), + try + lists:foreach(fun(N) -> + Path = case N rem 2 of + 0 -> <<"/trailered">>; + 1 -> <<"/normal">> + end, + {ok, 200, _H, Body} = + hackney:request(get, url(Port, Path), [], <<>>, opts(Pool)), + ?assertEqual(?BODY, Body) + end, lists:seq(1, 40)) + after + (try hackney_pool:stop_pool(Pool) catch _:_ -> ok end) + end. + +%% Bug 1b: with no trailers and no END_STREAM, the watchdog must surface +%% {error, timeout} quickly instead of blocking until the connection dies. +t_recv_timeout_safety_net(#{port := Port}) -> + Opts = [{pool, false}, {protocols, [http2]}, {recv_timeout, 500}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}], + Started = erlang:monotonic_time(millisecond), + Result = hackney:request(get, url(Port, <<"/stall">>), [], <<>>, Opts), + Elapsed = erlang:monotonic_time(millisecond) - Started, + ?assertEqual({error, timeout}, Result), + ?assert(Elapsed < 3000). + +%%==================================================================== +%% Helpers +%%==================================================================== + +opts(Pool) -> + [{pool, Pool}, + {protocols, [http2]}, + {recv_timeout, 5000}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}]. + +url(Port, Path) -> + iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), Path]). diff --git a/test/hackney_pool_tests.erl b/test/hackney_pool_tests.erl index 47300537..8a0b17f5 100644 --- a/test/hackney_pool_tests.erl +++ b/test/hackney_pool_tests.erl @@ -59,6 +59,8 @@ hackney_pool_integration_test_() -> {"queue timeout", {timeout, 120, fun test_queue_timeout/0}}, {"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}}, {"server close detected when idle (issue #544)", {timeout, 30, fun test_server_close_detected/0}}, + {"unsolicited idle data refuses reuse; clean conn stays reusable", + {timeout, 30, fun test_idle_data_refuses_reuse/0}}, {"checkout survives a connection dying mid-liveness-check (PR #869)", {timeout, 30, fun test_checkout_survives_dying_connection/0}}, {"checkout_ssl without pooled conns returns needs_upgrade", @@ -825,3 +827,40 @@ test_server_close_detected() -> ?assertEqual(0, FreeCount2), ok = hackney_pool:stop_pool(test_pool_server_close). + +%% Bug 2: a pooled HTTP/1.1 connection that received unsolicited bytes while +%% idle must not be reused. hackney does not pipeline, so bytes arriving while +%% idle can never belong to the next response; reusing the socket would strand +%% them (passive recv blocks on an empty buffer) or corrupt the next read. A +%% clean idle connection is still reusable, so keep-alive is preserved. +test_idle_data_refuses_reuse() -> + Self = self(), + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]), + {ok, ServerPort} = inet:port(ListenSock), + ServerPid = spawn_link(fun() -> + {ok, ClientSock} = gen_tcp:accept(ListenSock, 5000), + Self ! {server, accepted}, + receive {write, Bytes} -> gen_tcp:send(ClientSock, Bytes) end, + receive stop -> ok end, + gen_tcp:close(ClientSock), + gen_tcp:close(ListenSock) + end), + + ok = hackney_pool:start_pool(test_pool_idle_data, [{pool_size, 5}]), + Opts = [{pool, test_pool_idle_data}], + {ok, _PoolInfo, ConnPid} = + hackney_pool:checkout("127.0.0.1", ServerPort, hackney_tcp, Opts), + receive {server, accepted} -> ok after 5000 -> error(timeout_accept) end, + + %% Clean connection: is_ready sets the socket passive and keeps it reusable. + ?assertEqual({ok, connected}, hackney_conn:is_ready(ConnPid)), + + %% Server pushes unsolicited bytes into the now-passive socket buffer. + ServerPid ! {write, <<"unsolicited">>}, + timer:sleep(50), + + %% The connection must now be refused rather than reused with stale bytes. + ?assertEqual({ok, closed}, hackney_conn:is_ready(ConnPid)), + + ServerPid ! stop, + ok = hackney_pool:stop_pool(test_pool_idle_data).