Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 132 additions & 32 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@
h2_streams = #{} :: #{pos_integer() => {term(), tuple()}},
%% Current HTTP/2 stream ID for streaming body mode (body = stream)
h2_stream_id :: pos_integer() | undefined,
%% Per-stream recv_timeout watchdog timers (sync one-shot reads):
%% StreamId => timer ref. Fires {timeout, TRef, {h2_recv_timeout, StreamId}}
%% if no progress within recv_timeout, so a lost frame fails fast instead of
%% blocking until the connection dies. Re-armed on each DATA frame.
h2_timers = #{} :: #{pos_integer() => reference()},

%% HTTP/3 support (QUIC)
%% HTTP/3 connection reference from hackney_h3
Expand Down Expand Up @@ -834,20 +839,22 @@ connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) ->
%% Socket not connected
{next_state, closed, Data, [{reply, From, {ok, closed}}]};
connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) ->
%% Check for pending close message first (from active mode)
case has_pending_close(Socket) of
%% Stop active-mode delivery before reconciling the socket for checkout, so
%% no further {tcp,_}/{ssl,_} messages can land after we inspect it.
_ = Transport:setopts(Socket, [{active, false}]),
%% A pooled connection is only reusable if nothing arrived while it was idle.
%% #544: a server-initiated close (tcp_closed/ssl_closed) means drop it.
%% Unsolicited data is just as disqualifying: hackney does not pipeline, so
%% bytes that arrived while idle cannot belong to the next response. Reusing
%% such a socket would strand them (passive recv blocks on an empty buffer)
%% or corrupt the next read. Drop it and let the pool dial a fresh one.
case has_pending_close(Socket) orelse has_pending_data(Transport, Socket) of
true ->
%% Server closed the connection - transition to closed state
{next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]};
{next_state, closed, Data#conn_data{socket = undefined},
[{reply, From, {ok, closed}}]};
false ->
%% No close message pending - check socket health
case check_socket_health(Transport, Socket) of
ok ->
%% Socket is healthy - set to passive mode for checkout
%% This ensures the socket is ready for blocking recv operations
_ = Transport:setopts(Socket, [{active, false}]),
%% Flush any data messages that arrived while in active mode
flush_socket_messages(Socket),
{keep_state_and_data, [{reply, From, {ok, connected}}]};
{error, _} ->
{keep_state_and_data, [{reply, From, {ok, closed}}]}
Expand Down Expand Up @@ -1044,6 +1051,9 @@ connected({call, From}, {send_headers, Method, Path, Headers}, Data) ->
%% HTTP/2 owner messages from h2 library
connected(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) ->
handle_h2_event(Event, Data);
%% HTTP/2 per-stream recv_timeout watchdog (see arm_h2_timer/2).
connected(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) ->
handle_h2_recv_timeout(StreamId, TRef, Data);
%% h2_connection is linked via start_link; trap_exit surfaces its termination
%% as an 'EXIT' signal. Convert to the same cleanup path as the monitor DOWN.
connected(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) ->
Expand Down Expand Up @@ -1340,6 +1350,8 @@ streaming_body(info, {ssl_closed, Socket}, #conn_data{socket = Socket} = Data) -
%% can surface it. Mirrors the connected-state h2 handlers.
streaming_body(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) ->
handle_h2_event(Event, Data);
streaming_body(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) ->
handle_h2_recv_timeout(StreamId, TRef, Data);
streaming_body(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) ->
h2_on_closed(Reason, Data#conn_data{h2_conn = undefined, h2_mon = undefined});
streaming_body(info, {'DOWN', Mon, process, _Pid, Reason}, #conn_data{h2_mon = Mon} = Data) ->
Expand Down Expand Up @@ -2383,15 +2395,30 @@ has_pending_close(Socket) ->
false
end.

%% @private Flush any socket data messages that arrived while in active mode
%% This is called after setting the socket to passive mode.
%% Note: Close messages are checked separately via has_pending_close/1.
flush_socket_messages(Socket) ->
%% @private Detect unsolicited data that arrived while the socket was idle in
%% active mode. Returns true when any data is pending — such a connection is not
%% safe to reuse (hackney does not pipeline, so the bytes cannot belong to the
%% next response). Drains the mailbox so a dropped connection leaves nothing
%% behind, and peeks the socket buffer to close the active->passive race where a
%% {tcp,_}/{ssl,_} message has not yet landed. Must run after the socket is set
%% passive. Close messages are checked separately via has_pending_close/1.
has_pending_data(Transport, Socket) ->
HadMailbox = drain_socket_mailbox(Socket),
HadBuffer = case Transport:recv(Socket, 0, 0) of
{ok, _Bytes} -> true; %% bytes already buffered on the socket
{error, timeout} -> false; %% nothing pending - healthy idle socket
{error, _} -> true %% closed/other - not reusable
end,
HadMailbox orelse HadBuffer.

%% @private Remove any {tcp/ssl, Socket, Data} messages from the mailbox,
%% returning true if at least one was present.
drain_socket_mailbox(Socket) ->
receive
{tcp, Socket, _Data} -> flush_socket_messages(Socket);
{ssl, Socket, _Data} -> flush_socket_messages(Socket)
{tcp, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true;
{ssl, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true
after 0 ->
ok
false
end.

%% @private Notify pool that connection is available for reuse (async)
Expand Down Expand Up @@ -2670,6 +2697,65 @@ h2_start_failure(after_upgrade, From, Reason) ->
close_h2(H2Conn) ->
try h2_connection:close(H2Conn) catch _:_ -> ok end.

%% @private Arm a per-stream recv_timeout watchdog for a sync HTTP/2 read so a
%% lost frame fails fast with {error, timeout} instead of blocking until the
%% connection dies. No-op when recv_timeout is infinity.
arm_h2_timer(StreamId, #conn_data{recv_timeout = Timeout, h2_timers = Timers} = Data) ->
case Timeout of
infinity ->
Data;
_ ->
TRef = erlang:start_timer(Timeout, self(), {h2_recv_timeout, StreamId}),
Data#conn_data{h2_timers = maps:put(StreamId, TRef, Timers)}
end.

%% @private Cancel and forget a stream's recv_timeout watchdog, if any.
cancel_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) ->
case maps:take(StreamId, Timers) of
{TRef, Timers2} ->
_ = erlang:cancel_timer(TRef),
Data#conn_data{h2_timers = Timers2};
error ->
Data
end.

%% @private Reset a stream's recv_timeout watchdog after progress (headers or a
%% DATA frame). Keeps the deadline relative to the last byte received, matching
%% HTTP/1.1 per-recv timeout semantics. No-op for streams without a timer.
rearm_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) ->
case maps:is_key(StreamId, Timers) of
true -> arm_h2_timer(StreamId, cancel_h2_timer(StreamId, Data));
false -> Data
end.

%% @private Cancel every outstanding recv_timeout watchdog (connection gone).
cancel_all_h2_timers(#conn_data{h2_timers = Timers} = Data) ->
_ = maps:fold(fun(_StreamId, TRef, _Acc) -> erlang:cancel_timer(TRef) end,
ok, Timers),
Data#conn_data{h2_timers = #{}}.

%% @private A recv_timeout watchdog fired: if it is still the live timer for the
%% stream and a sync reader is parked, fail that reader and drop the stream.
%% A stale timer (re-armed or already completed) is ignored.
handle_h2_recv_timeout(StreamId, TRef,
#conn_data{h2_streams = Streams, h2_timers = Timers} = Data) ->
case maps:get(StreamId, Timers, undefined) of
TRef ->
Timers2 = maps:remove(StreamId, Timers),
case maps:get(StreamId, Streams, undefined) of
{From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync ->
Streams2 = maps:remove(StreamId, Streams),
{keep_state,
Data#conn_data{h2_streams = Streams2, h2_timers = Timers2,
request_from = undefined},
[{reply, From, {error, timeout}}]};
_ ->
{keep_state, Data#conn_data{h2_timers = Timers2}}
end;
_ ->
{keep_state, Data}
end.

%% @private Send an HTTP/2 request via the h2 library.
do_h2_request(From, Method, Path, Headers, Body, Data) ->
do_h2_send(From, Method, Path, Headers, Body,
Expand Down Expand Up @@ -2723,7 +2809,9 @@ do_h2_send(From, Method, Path, Headers, Body, StreamState, Mode, Data) ->
},
NewData = case Mode of
sync ->
NewData0#conn_data{request_from = From};
%% Watchdog the response so a lost frame fails fast rather
%% than blocking on the infinity gen_statem:call.
arm_h2_timer(StreamId, NewData0#conn_data{request_from = From});
{async, Ref, StreamTo, AsyncMode} ->
NewData0#conn_data{
async = AsyncMode,
Expand Down Expand Up @@ -2912,8 +3000,12 @@ handle_h2_event({response, StreamId, Status, Headers}, Data) ->
h2_on_response(StreamId, Status, Headers, Data);
handle_h2_event({data, StreamId, Body, EndStream}, Data) ->
h2_on_data(StreamId, Body, EndStream, Data);
handle_h2_event({trailers, _StreamId, _Headers}, Data) ->
{keep_state, Data};
handle_h2_event({trailers, StreamId, _Headers}, Data) ->
%% RFC 9113 §5.1: a trailing HEADERS frame carries END_STREAM. The h2 lib
%% delivers it as a {trailers,...} event with no terminal DATA frame, so
%% treat it as end-of-stream to release a reader parked on END_STREAM
%% (otherwise the body read hangs until the connection dies).
h2_on_data(StreamId, <<>>, true, Data);
handle_h2_event({stream_reset, StreamId, ErrorCode}, Data) ->
h2_on_stream_reset(StreamId, ErrorCode, Data);
handle_h2_event({goaway, _LastStreamId, ErrorCode}, Data) ->
Expand All @@ -2933,9 +3025,11 @@ h2_on_response(StreamId, Status, Headers, Data) ->
Streams2 = maps:put(StreamId,
{From, {sync, body, Status, Headers, <<>>}},
Streams),
{keep_state, Data#conn_data{h2_streams = Streams2,
status = Status,
response_headers = Headers}};
Data2 = rearm_h2_timer(StreamId,
Data#conn_data{h2_streams = Streams2,
status = Status,
response_headers = Headers}),
{keep_state, Data2};
{StreamTo, {async, AsyncMode, StreamTo, Ref, waiting_headers}} ->
StreamTo ! {hackney_response, Ref, {status, Status, <<>>}},
StreamTo ! {hackney_response, Ref, {headers, Headers}},
Expand Down Expand Up @@ -2976,15 +3070,18 @@ h2_on_data(StreamId, Body, EndStream, Data) ->
case EndStream of
true ->
Streams2 = maps:remove(StreamId, Streams),
{keep_state,
Data#conn_data{h2_streams = Streams2,
request_from = undefined},
Data2 = cancel_h2_timer(StreamId,
Data#conn_data{h2_streams = Streams2,
request_from = undefined}),
{keep_state, Data2,
[{reply, From, {ok, Status, Headers, NewAcc}}]};
false ->
Streams2 = maps:put(StreamId,
{From, {sync, body, Status, Headers, NewAcc}},
Streams),
{keep_state, Data#conn_data{h2_streams = Streams2}}
Data2 = rearm_h2_timer(StreamId,
Data#conn_data{h2_streams = Streams2}),
{keep_state, Data2}
end;
{StreamTo, {async, AsyncMode, StreamTo, Ref, streaming, Status, Headers}} ->
_ = case byte_size(Body) of
Expand Down Expand Up @@ -3055,10 +3152,12 @@ h2_on_data(StreamId, Body, EndStream, Data) ->
h2_on_stream_reset(StreamId, ErrorCode, Data) ->
#conn_data{h2_streams = Streams} = Data,
case maps:get(StreamId, Streams, undefined) of
{From, {sync, _}} ->
{From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync ->
Streams2 = maps:remove(StreamId, Streams),
{keep_state,
Data#conn_data{h2_streams = Streams2, request_from = undefined},
Data2 = cancel_h2_timer(StreamId,
Data#conn_data{h2_streams = Streams2,
request_from = undefined}),
{keep_state, Data2,
[{reply, From, {error, {stream_error, ErrorCode}}}]};
{StreamTo, {async, _, StreamTo, Ref, _, _, _}} ->
StreamTo ! {hackney_response, Ref, {error, {stream_error, ErrorCode}}},
Expand Down Expand Up @@ -3091,11 +3190,12 @@ h2_stream_parked_from(_) -> undefined.

h2_on_goaway(ErrorCode, Data) ->
{Replies, Data1} = collect_h2_aborts({goaway, ErrorCode}, Data),
{keep_state, Data1, Replies}.
{keep_state, cancel_all_h2_timers(Data1), Replies}.

h2_on_closed(Reason, Data) ->
{Replies, Data1} = collect_h2_aborts({closed, Reason}, Data),
Stripped = Data1#conn_data{h2_conn = undefined, h2_mon = undefined,
Data2 = cancel_all_h2_timers(Data1),
Stripped = Data2#conn_data{h2_conn = undefined, h2_mon = undefined,
socket = undefined},
%% Transition to closed. For pooled conns, closed(enter,...) keeps the
%% process alive for ?CLOSED_GRACE_MS so calls from workers that raced
Expand Down
132 changes: 132 additions & 0 deletions test/hackney_http2_trailers_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
%%% Tests for HTTP/2 responses that signal end-of-stream with a trailing
%%% HEADERS frame (trailers) instead of an END_STREAM DATA flag, and for the
%%% per-stream recv_timeout watchdog.
%%%
%%% Regression for the production hang: an h2 response with no content-length
%%% whose body is closed by trailers (what proxies/ALBs emit for streamed
%%% bodies) left the body reader parked forever, because hackney ignored the
%%% {trailers,...} event and waited for an END_STREAM DATA frame that never
%%% came. The reader must complete as soon as the peer signals end-of-stream.
-module(hackney_http2_trailers_tests).

-include_lib("eunit/include/eunit.hrl").

-define(BODY, (binary:copy(<<"x">>, 14000))).

cert_dir() ->
BeamDir = filename:dirname(code:which(?MODULE)),
Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]),
filename:join([filename:absname(Root), "test", "certs"]).

%%====================================================================
%% Fixture
%%====================================================================

http2_trailers_test_() ->
{setup,
fun setup/0,
fun cleanup/1,
fun(Ctx) ->
[{"body closed by trailers completes promptly",
{timeout, 30, fun() -> t_trailered_response(Ctx) end}},
{"trailered + END_STREAM-on-DATA both work across a reused pooled conn",
{timeout, 60, fun() -> t_pooled_reuse(Ctx) end}},
{"a never-ending response fails fast on recv_timeout",
{timeout, 30, fun() -> t_recv_timeout_safety_net(Ctx) end}}]
end}.

setup() ->
_ = application:ensure_all_started(hackney),
_ = application:ensure_all_started(h2),
Certs = cert_dir(),
{ok, Server} = h2:start_server(0, #{
cert => filename:join(Certs, "server.pem"),
key => filename:join(Certs, "server.key"),
handler => fun server_handler/5,
settings => #{max_concurrent_streams => unlimited}
}),
Port = h2:server_port(Server),
#{server => Server, port => Port}.

cleanup(#{server := Server}) ->
try h2:stop_server(Server) catch _:_ -> ok end,
ok.

%%====================================================================
%% Server handler
%%====================================================================

%% /trailered : body, then END_STREAM via a trailing HEADERS frame.
%% /normal : body with END_STREAM on the final DATA frame.
%% /stall : headers + a partial body, then never end the stream.
server_handler(Conn, Sid, _Method, Path, _Headers) ->
Json = [{<<"content-type">>, <<"application/json">>}],
case Path of
<<"/trailered">> ->
ok = h2:send_response(Conn, Sid, 200, Json),
ok = h2:send_data(Conn, Sid, ?BODY, false),
ok = h2:send_trailers(Conn, Sid, [{<<"x-trailer">>, <<"end">>}]);
<<"/normal">> ->
ok = h2:send_response(Conn, Sid, 200, Json),
ok = h2:send_data(Conn, Sid, ?BODY, true);
<<"/stall">> ->
ok = h2:send_response(Conn, Sid, 200, Json),
ok = h2:send_data(Conn, Sid, <<"partial">>, false),
%% Leave the stream open well past the client's recv_timeout.
timer:sleep(5000),
try h2:send_data(Conn, Sid, <<>>, true) catch _:_ -> ok end
end.

%%====================================================================
%% Tests
%%====================================================================

%% Bug 1a: a response closed by trailers must return the full body, not hang.
t_trailered_response(#{port := Port}) ->
{ok, 200, _Headers, Body} =
hackney:request(get, url(Port, <<"/trailered">>), [], <<>>, opts(false)),
?assertEqual(?BODY, Body).

%% Reuse the single pooled h2 connection for many sequential requests,
%% alternating trailered and END_STREAM-on-DATA responses; every read must
%% return the full body with no lost bytes and no recv_timeout.
t_pooled_reuse(#{port := Port}) ->
Pool = hackney_h2_trailers_pool,
_ = hackney_pool:start_pool(Pool, [{max_connections, 5}]),
try
lists:foreach(fun(N) ->
Path = case N rem 2 of
0 -> <<"/trailered">>;
1 -> <<"/normal">>
end,
{ok, 200, _H, Body} =
hackney:request(get, url(Port, Path), [], <<>>, opts(Pool)),
?assertEqual(?BODY, Body)
end, lists:seq(1, 40))
after
(try hackney_pool:stop_pool(Pool) catch _:_ -> ok end)
end.

%% Bug 1b: with no trailers and no END_STREAM, the watchdog must surface
%% {error, timeout} quickly instead of blocking until the connection dies.
t_recv_timeout_safety_net(#{port := Port}) ->
Opts = [{pool, false}, {protocols, [http2]}, {recv_timeout, 500},
{ssl_options, [{insecure, true}, {verify, verify_none}]}],
Started = erlang:monotonic_time(millisecond),
Result = hackney:request(get, url(Port, <<"/stall">>), [], <<>>, Opts),
Elapsed = erlang:monotonic_time(millisecond) - Started,
?assertEqual({error, timeout}, Result),
?assert(Elapsed < 3000).

%%====================================================================
%% Helpers
%%====================================================================

opts(Pool) ->
[{pool, Pool},
{protocols, [http2]},
{recv_timeout, 5000},
{ssl_options, [{insecure, true}, {verify, verify_none}]}].

url(Port, Path) ->
iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), Path]).
Loading
Loading