diff --git a/NEWS.md b/NEWS.md index 299baa72..56ec8ce1 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,24 @@ # NEWS +4.4.4 - 2026-06-17 +------------------ + +### Fixed + +- HTTP/2: a connection is no longer reused after the peer sends `GOAWAY` while + keeping the socket open (as AWS ALB does to recycle connections). The + connection is retired so the pool dials a fresh one, instead of being handed + out again with new streams the peer ignores until `recv_timeout`. +- HTTP/2: when the per-stream `recv_timeout` watchdog fires, the stalled stream + is cancelled (`RST_STREAM`) so the peer stops sending and the connection is not + reused with an orphaned stream. +- HTTP/1.1: bytes that issue #544's idle `{active, once}` delivers to the + connection mailbox on a reused connection are now buffered and fed to the next + request instead of dropping the connection (refines the 4.4.3 behavior below), + so a reused request no longer blocks to `recv_timeout` while the response sits + stranded as an unread message. The idle buffer is bounded, and a server close + still refuses reuse (#544). + 4.4.3 - 2026-06-17 ------------------ diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index 15dab47d..5c3ca8a5 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -119,6 +119,11 @@ %% late-arriving calls race the pool DOWN cleanup and still get a proper %% error reply instead of exit:{normal, _}. See issue #836. -define(CLOSED_GRACE_MS, 50). +%% Cap on bytes buffered from an idle HTTP/1.1 connection via #544 {active, +%% once}. A well-behaved peer sends nothing while idle; the next response's +%% stranded prefix is small. Past this, treat the peer as misbehaving (flooding +%% an idle connection) and drop it rather than buffer unboundedly. +-define(MAX_IDLE_BUFFER, 65536). 
%% State data record -record(conn_data, { @@ -838,24 +843,26 @@ connected({call, From}, verify_socket, #conn_data{transport = Transport, socket connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) -> %% Socket not connected {next_state, closed, Data, [{reply, From, {ok, closed}}]}; -connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) -> +connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket, + buffer = Buffer} = Data) -> %% Stop active-mode delivery before reconciling the socket for checkout, so %% no further {tcp,_}/{ssl,_} messages can land after we inspect it. _ = Transport:setopts(Socket, [{active, false}]), - %% A pooled connection is only reusable if nothing arrived while it was idle. - %% #544: a server-initiated close (tcp_closed/ssl_closed) means drop it. - %% Unsolicited data is just as disqualifying: hackney does not pipeline, so - %% bytes that arrived while idle cannot belong to the next response. Reusing - %% such a socket would strand them (passive recv blocks on an empty buffer) - %% or corrupt the next read. Drop it and let the pool dial a fresh one. - case has_pending_close(Socket) orelse has_pending_data(Transport, Socket) of + case has_pending_close(Socket) of true -> + %% #544: the server closed the idle connection - never reuse it. {next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]}; false -> case check_socket_health(Transport, Socket) of ok -> - {keep_state_and_data, [{reply, From, {ok, connected}}]}; + %% Bytes delivered to the mailbox while idle in {active, once} + %% are the start of the response; keep them in the read buffer + %% so the next request consumes them instead of stranding them. 
+ Drained = drain_socket_data(Socket), + {keep_state, + Data#conn_data{buffer = <>}, + [{reply, From, {ok, connected}}]}; {error, _} -> {keep_state_and_data, [{reply, From, {ok, closed}}]} end @@ -965,7 +972,10 @@ connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, Data) - status = undefined, reason = undefined, response_headers = undefined, - buffer = <<>>, + %% NOTE: buffer is intentionally preserved (not reset to <<>>). It is + %% empty after any complete response, but may hold response bytes that + %% #544 {active, once} stranded into the mailbox and connected(info,...) + %% buffered; the next request must consume them. async = false, async_ref = undefined, stream_to = undefined, @@ -1040,7 +1050,8 @@ connected({call, From}, {send_headers, Method, Path, Headers}, Data) -> status = undefined, reason = undefined, response_headers = undefined, - buffer = <<>>, + %% buffer preserved (see the {request,...} handler): may hold stranded + %% response bytes buffered from #544 {active, once}. async = false, async_ref = undefined, stream_to = undefined @@ -1076,10 +1087,13 @@ connected(info, {ssl_error, Socket, _Reason}, #conn_data{socket = Socket} = Data %% Unexpected data received while idle - HTTP/1.1 only (H/2 socket is owned %% by h2_connection; H/3 uses QUIC messages). -connected(info, {tcp, Socket, _UnexpectedData}, #conn_data{socket = Socket} = Data) -> - {next_state, closed, Data#conn_data{socket = undefined}}; -connected(info, {ssl, Socket, _UnexpectedData}, #conn_data{socket = Socket} = Data) -> - {next_state, closed, Data#conn_data{socket = undefined}}; +%% Bytes delivered by #544 {active, once} while idle are the start of the next +%% response on a reused connection. Buffer them (do NOT treat as a broken +%% connection) and re-arm close detection, so the next request consumes them. 
+connected(info, {tcp, Socket, Data}, #conn_data{socket = Socket} = D) -> + buffer_idle_data(Data, D); +connected(info, {ssl, Socket, Data}, #conn_data{socket = Socket} = D) -> + buffer_idle_data(Data, D); %% HTTP/3 message handling connected(info, {h3, ConnRef, {stream_headers, StreamId, Headers, Fin}}, @@ -1131,15 +1145,20 @@ connected(EventType, Event, Data) -> %% State: sending - Sending request data %%==================================================================== -sending(enter, connected, #conn_data{transport = Transport, socket = Socket}) -> - %% Set socket to passive mode for blocking send/recv operations - %% (socket was in active mode while idle in connected state) - %% Note: socket may be undefined for HTTP/3 (QUIC) connections - _ = case Socket of - undefined -> ok; - _ -> Transport:setopts(Socket, [{active, false}]) - end, - keep_state_and_data; +sending(enter, connected, #conn_data{transport = Transport, socket = Socket, + buffer = Buffer} = Data) -> + %% Deterministically leave {active, once} before sending the request, and + %% drain any bytes already delivered to the mailbox into the read buffer so + %% the request/response cycle never runs with stranded data (the reuse hang). + %% Note: socket may be undefined for HTTP/3 (QUIC) connections. + case Socket of + undefined -> + keep_state_and_data; + _ -> + _ = Transport:setopts(Socket, [{active, false}]), + Drained = drain_socket_data(Socket), + {keep_state, Data#conn_data{buffer = <>}} + end; sending(internal, {send_request, Method, Path, Headers, Body}, Data) -> case do_send_request(Method, Path, Headers, Body, Data) of @@ -1186,15 +1205,18 @@ streaming_body(enter, connected, #conn_data{protocol = http2}) -> %% mode. hackney_conn must NOT flip it to passive or the h2 lib stops %% receiving frames (the response would never arrive). 
keep_state_and_data; -streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket}) -> - %% Set socket to passive mode for blocking send/recv operations - %% (socket was in active mode while idle in connected state) - %% Note: socket may be undefined for HTTP/3 (QUIC) connections - _ = case Socket of - undefined -> ok; - _ -> Transport:setopts(Socket, [{active, false}]) - end, - keep_state_and_data; +streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket, + buffer = Buffer} = Data) -> + %% Same as sending(enter): go passive and un-strand any mailbox bytes before + %% the request/response cycle (socket may be undefined for HTTP/3 QUIC). + case Socket of + undefined -> + keep_state_and_data; + _ -> + _ = Transport:setopts(Socket, [{active, false}]), + Drained = drain_socket_data(Socket), + {keep_state, Data#conn_data{buffer = <>}} + end; streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) -> %% Send only headers, then return ok and wait for body chunks @@ -2333,7 +2355,18 @@ stream_body_chunk_result({error, Reason}, _Data) -> %% @private Receive data from socket recv_data(#conn_data{transport = Transport, socket = Socket, recv_timeout = Timeout}) -> - Transport:recv(Socket, 0, Timeout). + %% Consume any bytes stranded in the mailbox by #544 {active, once} before + %% falling back to a passive socket read, so a reused connection never blocks + %% on an empty socket buffer while the response sits unread as a message. + case drain_socket_data(Socket) of + <<>> -> + case has_pending_close(Socket) of + true -> {error, closed}; + false -> Transport:recv(Socket, 0, Timeout) + end; + Bytes -> + {ok, Bytes} + end. %% @private Determine if we should enable active mode when entering connected state %% We only want active mode for close detection when the connection is truly idle @@ -2395,30 +2428,36 @@ has_pending_close(Socket) -> false end. 
-%% @private Detect unsolicited data that arrived while the socket was idle in -%% active mode. Returns true when any data is pending — such a connection is not -%% safe to reuse (hackney does not pipeline, so the bytes cannot belong to the -%% next response). Drains the mailbox so a dropped connection leaves nothing -%% behind, and peeks the socket buffer to close the active->passive race where a -%% {tcp,_}/{ssl,_} message has not yet landed. Must run after the socket is set -%% passive. Close messages are checked separately via has_pending_close/1. -has_pending_data(Transport, Socket) -> - HadMailbox = drain_socket_mailbox(Socket), - HadBuffer = case Transport:recv(Socket, 0, 0) of - {ok, _Bytes} -> true; %% bytes already buffered on the socket - {error, timeout} -> false; %% nothing pending - healthy idle socket - {error, _} -> true %% closed/other - not reusable - end, - HadMailbox orelse HadBuffer. +%% @private Drain and return any {tcp/ssl, Socket, Data} bytes queued in the +%% mailbox. #544 puts idle pooled sockets in {active, once}, which delivers the +%% next inbound bytes (the start of the response on a reused connection) as a +%% mailbox message and reverts the socket to passive. Those bytes are real +%% response data and must reach the parser, so we drain-and-return them rather +%% than discard them. Close/error messages are handled via has_pending_close/1. +drain_socket_data(Socket) -> + drain_socket_data(Socket, <<>>). -%% @private Remove any {tcp/ssl, Socket, Data} messages from the mailbox, -%% returning true if at least one was present. -drain_socket_mailbox(Socket) -> +drain_socket_data(Socket, Acc) -> receive - {tcp, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true; - {ssl, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true + {tcp, Socket, Data} -> drain_socket_data(Socket, <>); + {ssl, Socket, Data} -> drain_socket_data(Socket, <>) after 0 -> - false + Acc + end. 
+ +%% @private Buffer bytes that #544 {active, once} delivered on an idle HTTP/1.1 +%% connection (the start of the next response on reuse) and re-arm close +%% detection, so the next request consumes them. Bounded by ?MAX_IDLE_BUFFER: a +%% peer flooding an idle connection is dropped rather than buffered unboundedly. +buffer_idle_data(Data, #conn_data{socket = Socket, transport = Transport, + buffer = Buffer} = D) -> + NewBuffer = <>, + case byte_size(NewBuffer) > ?MAX_IDLE_BUFFER of + true -> + {next_state, closed, D#conn_data{socket = undefined}}; + false -> + _ = Transport:setopts(Socket, [{active, once}]), + {keep_state, D#conn_data{buffer = NewBuffer}} end. %% @private Notify pool that connection is available for reuse (async) @@ -2476,7 +2515,8 @@ do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowR status = undefined, reason = undefined, response_headers = undefined, - buffer = <<>>, + %% buffer preserved (see the {request,...} handler): may hold stranded + %% response bytes buffered from #544 {active, once}. async = AsyncMode, async_ref = Ref, stream_to = StreamTo, @@ -2738,12 +2778,18 @@ cancel_all_h2_timers(#conn_data{h2_timers = Timers} = Data) -> %% stream and a sync reader is parked, fail that reader and drop the stream. %% A stale timer (re-armed or already completed) is ignored. handle_h2_recv_timeout(StreamId, TRef, - #conn_data{h2_streams = Streams, h2_timers = Timers} = Data) -> + #conn_data{h2_streams = Streams, h2_timers = Timers, + h2_conn = H2Conn} = Data) -> case maps:get(StreamId, Timers, undefined) of TRef -> Timers2 = maps:remove(StreamId, Timers), case maps:get(StreamId, Streams, undefined) of {From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync -> + %% RST_STREAM(CANCEL) the stalled stream so the peer stops + %% sending for it and the h2 layer drops it; otherwise the + %% pooled connection would be reused with an orphaned stream + %% still open (h2_conn_usable only checks the conn state). 
+ _ = cancel_h2_stream(H2Conn, StreamId), Streams2 = maps:remove(StreamId, Streams), {keep_state, Data#conn_data{h2_streams = Streams2, h2_timers = Timers2, @@ -2756,6 +2802,11 @@ handle_h2_recv_timeout(StreamId, TRef, {keep_state, Data} end. +%% @private RST_STREAM(CANCEL) a stalled HTTP/2 stream, tolerating a dead conn. +cancel_h2_stream(undefined, _StreamId) -> ok; +cancel_h2_stream(H2Conn, StreamId) -> + try h2_connection:cancel_stream(H2Conn, StreamId) catch _:_ -> ok end. + %% @private Send an HTTP/2 request via the h2 library. do_h2_request(From, Method, Path, Headers, Body, Data) -> do_h2_send(From, Method, Path, Headers, Body, @@ -3188,9 +3239,26 @@ h2_stream_parked_from({stream, headers, _, _, _, From}) -> From; h2_stream_parked_from({stream, body_full, _, _, _, From}) -> From; h2_stream_parked_from(_) -> undefined. -h2_on_goaway(ErrorCode, Data) -> +h2_on_goaway(ErrorCode, #conn_data{h2_conn = H2Conn, h2_mon = H2Mon} = Data) -> + %% A GOAWAY means the peer will not service new streams on this connection. + %% AWS ALBs recycle connections this way, sending GOAWAY but keeping the + %% socket open for a drain window. Leaving the conn `connected` and pooled + %% made checkout_h2/h2_conn_usable keep handing it out, so every reused + %% request opened a stream past last_stream_id that the peer ignored and hung + %% to recv_timeout. Tear the connection down and transition to `closed` (like + %% h2_on_closed/2): the pool then stops reusing it (h2_conn_usable requires + %% `connected`) and new requests dial a fresh connection. in-flight streams + %% are aborted with the goaway error as before. {Replies, Data1} = collect_h2_aborts({goaway, ErrorCode}, Data), - {keep_state, cancel_all_h2_timers(Data1), Replies}. 
+ Data2 = cancel_all_h2_timers(Data1), + _ = case H2Mon of + undefined -> ok; + _ -> erlang:demonitor(H2Mon, [flush]) + end, + close_h2(H2Conn), + Stripped = Data2#conn_data{h2_conn = undefined, h2_mon = undefined, + socket = undefined, no_reuse = true}, + {next_state, closed, Stripped, Replies}. h2_on_closed(Reason, Data) -> {Replies, Data1} = collect_h2_aborts({closed, Reason}, Data), diff --git a/test/hackney_h1_reuse_tests.erl b/test/hackney_h1_reuse_tests.erl new file mode 100644 index 00000000..3f9d383c --- /dev/null +++ b/test/hackney_h1_reuse_tests.erl @@ -0,0 +1,152 @@ +%%% Regression for the HTTP/1.1 reused-connection hang (active-once stranding). +%%% +%%% Idle pooled sockets are put in {active, once} for #544 close detection. That +%%% delivers the next inbound bytes (the start of the response on a reused +%%% connection) as a {tcp/ssl,Socket,Data} mailbox message and reverts the +%%% socket to passive; a passive recv then blocked on the empty socket buffer +%%% while the bytes sat stranded. The fix consumes those bytes (feeds them to the +%%% parser) instead of discarding the connection. +-module(hackney_h1_reuse_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BODY, <<"{\"ok\":true,\"n\":12345}">>). + +h1_reuse_test_() -> + {setup, + fun() -> _ = application:ensure_all_started(hackney), ok end, + fun(_) -> ok end, + [{"50 reused requests complete promptly with intact bodies", + {timeout, 60, fun reuse_no_hang/0}}, + {"response bytes stranded by active-once are consumed, not lost", + {timeout, 30, fun stranded_bytes_consumed/0}}]}. + +%% Test A: many sequential reused requests must never block to recv_timeout and +%% must return byte-intact bodies (pool and {pool, false}). 
+reuse_no_hang() -> + {Server, Port} = start_server(), + Pool = h1_reuse_pool, + catch hackney_pool:stop_pool(Pool), + ok = hackney_pool:start_pool(Pool, [{pool_size, 1}]), + Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]), + try + [ run_one(Url, [{pool, Pool}]) || _ <- lists:seq(1, 50) ], + [ run_one(Url, [{pool, false}]) || _ <- lists:seq(1, 50) ], + ok + after + catch hackney_pool:stop_pool(Pool), + stop_server(Server) + end. + +run_one(Url, PoolOpt) -> + Opts = PoolOpt ++ [{recv_timeout, 3000}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}], + T0 = erlang:monotonic_time(millisecond), + R = hackney:request(get, Url, [], <<>>, Opts), + Ms = erlang:monotonic_time(millisecond) - T0, + ?assert(Ms < 3000), + ?assertMatch({ok, 200, _, ?BODY}, R). + +%% Test B (deterministic): reuse one conn pid, strand the next response as an +%% {ssl,Socket,Data} message while idle in {active, once}, then issue the next +%% request. Before the fix the conn closed and the request failed with +%% {error, invalid_state}; after the fix the stranded bytes are consumed. +stranded_bytes_consumed() -> + {Server, Port} = start_server(), + {ok, ConnPid} = hackney_conn_sup:start_conn( + #{host => "localhost", port => Port, transport => hackney_ssl, + ssl_options => [{insecure, true}, {verify, verify_none}], + recv_timeout => 3000, idle_timeout => infinity}), + ok = hackney_conn:connect(ConnPid), + try + %% Request 1 over the raw conn; leaves it idle in connected/{active,once}. + {ok, 200, _H1} = hackney_conn:request(ConnPid, <<"GET">>, <<"/">>, [], <<>>), + {ok, ?BODY} = hackney_conn:body(ConnPid), + {connected, ConnData} = sys:get_state(ConnPid), + Socket = element(8, ConnData), %% #conn_data.socket + %% Deliver the next response as an active-once mailbox message. + Stranded = <<"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\n" + "content-length: 8\r\n\r\nSTRANDED">>, + ConnPid ! 
{ssl, Socket, Stranded}, + timer:sleep(50), + %% The conn must NOT have closed on the stranded bytes. + ?assertMatch({connected, _}, sys:get_state(ConnPid)), + %% Next request (/silent: server sends nothing) must read the stranded + %% response, not hang or fail. + T0 = erlang:monotonic_time(millisecond), + {ok, 200, _H2} = hackney_conn:request(ConnPid, <<"GET">>, <<"/silent">>, [], <<>>), + {ok, Body2} = hackney_conn:body(ConnPid), + Ms = erlang:monotonic_time(millisecond) - T0, + ?assert(Ms < 3000), + ?assertEqual(<<"STRANDED">>, Body2) + after + catch hackney_conn:close(ConnPid), + stop_server(Server) + end. + +%%==================================================================== +%% Minimal keep-alive HTTP/1.1 TLS server. /silent consumes the request +%% without responding. +%%==================================================================== + +start_server() -> + Certs = cert_dir(), + {ok, LSock} = ssl:listen(0, + [{certfile, filename:join(Certs, "server.pem")}, + {keyfile, filename:join(Certs, "server.key")}, + {versions, ['tlsv1.2', 'tlsv1.3']}, + {active, false}, {mode, binary}, {reuseaddr, true}]), + {ok, {_, Port}} = ssl:sockname(LSock), + Pid = spawn(fun() -> accept_loop(LSock) end), + {Pid, Port}. + +stop_server(Pid) -> exit(Pid, shutdown), ok. + +accept_loop(LSock) -> + case ssl:transport_accept(LSock, 2000) of + {ok, TSock} -> + spawn(fun() -> + case ssl:handshake(TSock, 5000) of + {ok, Sock} -> serve(Sock); + _ -> ok + end + end), + accept_loop(LSock); + {error, timeout} -> accept_loop(LSock); + {error, _} -> ok + end. + +serve(Sock) -> + case read_request(Sock, <<>>) of + {ok, Path} -> + case Path of + <<"/silent">> -> ok; + _ -> ok = ssl:send(Sock, response()) + end, + serve(Sock); + closed -> ok + end. 
+ +read_request(Sock, Acc) -> + case binary:match(Acc, <<"\r\n\r\n">>) of + nomatch -> + case ssl:recv(Sock, 0, 30000) of + {ok, Data} -> read_request(Sock, <>); + {error, _} -> closed + end; + _ -> + Path = case binary:split(Acc, <<" ">>, [global]) of + [_Method, P | _] -> P; + _ -> <<"/">> + end, + {ok, Path} + end. + +response() -> + [<<"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: ">>, + integer_to_list(byte_size(?BODY)), <<"\r\n\r\n">>, ?BODY]. + +cert_dir() -> + BeamDir = filename:dirname(code:which(?MODULE)), + Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]), + filename:join([filename:absname(Root), "test", "certs"]). diff --git a/test/hackney_http2_goaway_tests.erl b/test/hackney_http2_goaway_tests.erl new file mode 100644 index 00000000..f9cedf34 --- /dev/null +++ b/test/hackney_http2_goaway_tests.erl @@ -0,0 +1,161 @@ +%%% Regression for the HTTP/2 GOAWAY pooled-reuse hang. +%%% +%%% An h2 server (like an AWS ALB) recycles a connection by sending GOAWAY and +%%% keeping the socket open for a drain window. hackney used to leave such a +%%% connection `connected` and pooled, so checkout_h2 kept handing it out and +%%% every reused request opened a stream past last_stream_id that the peer +%%% ignored, hanging to recv_timeout. The fix tears the connection down on +%%% GOAWAY so the pool dials a fresh one. +%%% +%%% This test embeds a minimal frame-level h2 server (via the h2 dep's h2_frame +%%% / h2_hpack) that answers N requests then sends GOAWAY and stays open. +-module(hackney_http2_goaway_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(PREFACE, <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>). +-define(GOAWAY_AFTER, 5). +-define(BODY, <<"{\"d\":\"", (binary:copy(<<"x">>, 13988))/binary, "\"}">>). + +goaway_reuse_test_() -> + {timeout, 60, fun goaway_does_not_hang_pooled_reuse/0}. + +%% Drive 12 sequential requests through a pool against a server that GOAWAYs +%% every 5 responses but keeps the socket open. 
No request may hang to +%% recv_timeout, and reuse after GOAWAY must dial a fresh connection and succeed. +goaway_does_not_hang_pooled_reuse() -> + _ = application:ensure_all_started(hackney), + _ = application:ensure_all_started(h2), + {ServerPid, Port} = start_server(), + Pool = goaway_test_pool, + catch hackney_pool:stop_pool(Pool), + ok = hackney_pool:start_pool(Pool, [{max_connections, 5}]), + Opts = [{pool, Pool}, {protocols, [http2]}, {recv_timeout, 2000}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}], + Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]), + try + Results = [ begin R = fetch(Url, Opts), timer:sleep(100), R end + || _ <- lists:seq(1, 12) ], + %% Regression: a GOAWAY'd-but-open connection must never strand a reused + %% request at recv_timeout. + Hangs = [ R || {error, timeout} = R <- Results ], + ?assertEqual([], Hangs), + %% Reuse after GOAWAY dials fresh and succeeds. The tail requests are well + %% past the first GOAWAY, on connections opened after it. + ExpectedBytes = byte_size(?BODY), + Tail = lists:nthtail(8, Results), + [ ?assertEqual({ok, 200, ExpectedBytes}, R) || R <- Tail ], + %% At most the single request that races the GOAWAY in-flight may come + %% back as a clean (retryable) goaway error rather than a 200. + Ok = [ R || {ok, 200, _} = R <- Results ], + ?assert(length(Ok) >= 11) + after + catch hackney_pool:stop_pool(Pool), + unlink(ServerPid), + exit(ServerPid, shutdown) + end. + +fetch(Url, Opts) -> + case hackney:request(get, Url, [], <<>>, Opts) of + {ok, S, _H, B} when is_binary(B) -> {ok, S, byte_size(B)}; + {ok, S, _H, Ref} -> + case hackney:body(Ref) of + {ok, B} -> {ok, S, byte_size(B)}; + {error, E} -> {error, E} + end; + {ok, S, _H} -> {ok, S, 0}; + {error, E} -> {error, E} + end. 
+ +%%==================================================================== +%% Minimal frame-level h2 server: answer ?GOAWAY_AFTER requests, then GOAWAY +%% and keep the socket open (ignore later streams). +%%==================================================================== + +start_server() -> + Certs = cert_dir(), + {ok, LSock} = ssl:listen(0, + [{certfile, filename:join(Certs, "server.pem")}, + {keyfile, filename:join(Certs, "server.key")}, + {alpn_preferred_protocols, [<<"h2">>]}, + {versions, ['tlsv1.2', 'tlsv1.3']}, + {active, false}, {mode, binary}, {reuseaddr, true}]), + {ok, {_, Port}} = ssl:sockname(LSock), + Pid = spawn_link(fun() -> accept_loop(LSock) end), + {Pid, Port}. + +accept_loop(LSock) -> + case ssl:transport_accept(LSock, 2000) of + {ok, TSock} -> + spawn(fun() -> serve(TSock) end), + accept_loop(LSock); + {error, timeout} -> accept_loop(LSock); + {error, closed} -> ok + end. + +serve(TSock) -> + case ssl:handshake(TSock, 5000) of + {ok, Sock} -> + case recv_preface(Sock, <<>>) of + {ok, Rest} -> + send(Sock, h2_frame:settings([])), + loop(Sock, Rest, #{enc => h2_hpack:new_context(), count => 0, goaway => false}); + _ -> ok + end; + _ -> ok + end. + +recv_preface(_Sock, Acc) when byte_size(Acc) >= 24 -> + <> = Acc, + case Pre of ?PREFACE -> {ok, Rest}; _ -> {error, bad_preface} end; +recv_preface(Sock, Acc) -> + case ssl:recv(Sock, 0, 5000) of + {ok, Data} -> recv_preface(Sock, <>); + {error, R} -> {error, R} + end. + +loop(Sock, Buf, St) -> + case h2_frame:decode(Buf) of + {ok, Frame, Rest} -> + case handle(Sock, Frame, St) of + {continue, St2} -> loop(Sock, Rest, St2); + stop -> ok + end; + {more, _} -> + case ssl:recv(Sock, 0, 30000) of + {ok, Data} -> loop(Sock, <>, St); + {error, _} -> ok + end; + {error, _, Rest} -> loop(Sock, Rest, St); + {error, _} -> ok + end. 
+ +handle(Sock, {settings, _}, St) -> send(Sock, h2_frame:settings_ack()), {continue, St}; +handle(_Sock, {settings_ack}, St) -> {continue, St}; +handle(Sock, {ping, D}, St) -> send(Sock, h2_frame:ping_ack(D)), {continue, St}; +handle(_Sock, {window_update, _, _}, St) -> {continue, St}; +handle(_Sock, {goaway, _, _, _}, _St) -> stop; +handle(_Sock, {rst_stream, _, _}, St) -> {continue, St}; +handle(_Sock, {headers, _Sid, _B, _E, _H}, #{goaway := true} = St) -> + %% After GOAWAY we no longer answer new streams (the client's read would + %% hang here without the fix). + {continue, St}; +handle(Sock, {headers, Sid, _B, _E, _H}, #{enc := Enc, count := C} = St) -> + {HBlock, Enc2} = h2_hpack:encode( + [{<<":status">>, <<"200">>}, {<<"content-type">>, <<"application/json">>}], Enc), + send(Sock, h2_frame:headers(Sid, HBlock, false)), + send(Sock, h2_frame:data(Sid, ?BODY, true)), + C2 = C + 1, + St2 = St#{enc := Enc2, count := C2}, + case C2 >= ?GOAWAY_AFTER of + true -> send(Sock, h2_frame:goaway(Sid, no_error, <<>>)), {continue, St2#{goaway := true}}; + false -> {continue, St2} + end; +handle(_Sock, _Other, St) -> {continue, St}. + +send(Sock, FrameData) -> ssl:send(Sock, h2_frame:encode(FrameData)). + +cert_dir() -> + BeamDir = filename:dirname(code:which(?MODULE)), + Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]), + filename:join([filename:absname(Root), "test", "certs"]). 
diff --git a/test/hackney_pool_tests.erl b/test/hackney_pool_tests.erl index 8a0b17f5..51bd33a0 100644 --- a/test/hackney_pool_tests.erl +++ b/test/hackney_pool_tests.erl @@ -59,8 +59,8 @@ hackney_pool_integration_test_() -> {"queue timeout", {timeout, 120, fun test_queue_timeout/0}}, {"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}}, {"server close detected when idle (issue #544)", {timeout, 30, fun test_server_close_detected/0}}, - {"unsolicited idle data refuses reuse; clean conn stays reusable", - {timeout, 30, fun test_idle_data_refuses_reuse/0}}, + {"bytes stranded while idle are buffered, not dropped; conn stays reusable", + {timeout, 30, fun test_idle_data_consumed_not_dropped/0}}, {"checkout survives a connection dying mid-liveness-check (PR #869)", {timeout, 30, fun test_checkout_survives_dying_connection/0}}, {"checkout_ssl without pooled conns returns needs_upgrade", @@ -828,19 +828,18 @@ test_server_close_detected() -> ok = hackney_pool:stop_pool(test_pool_server_close). -%% Bug 2: a pooled HTTP/1.1 connection that received unsolicited bytes while -%% idle must not be reused. hackney does not pipeline, so bytes arriving while -%% idle can never belong to the next response; reusing the socket would strand -%% them (passive recv blocks on an empty buffer) or corrupt the next read. A -%% clean idle connection is still reusable, so keep-alive is preserved. -test_idle_data_refuses_reuse() -> +%% A pooled HTTP/1.1 connection that received bytes while idle in {active, once} +%% (the start of the next response on a reused connection) must BUFFER them and +%% stay reusable - not drop the connection and lose the bytes. A clean idle +%% connection is still reusable (keep-alive preserved); a server close is still +%% refused (#544, see test_server_close_detected). 
+test_idle_data_consumed_not_dropped() -> Self = self(), {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]), {ok, ServerPort} = inet:port(ListenSock), ServerPid = spawn_link(fun() -> {ok, ClientSock} = gen_tcp:accept(ListenSock, 5000), Self ! {server, accepted}, - receive {write, Bytes} -> gen_tcp:send(ClientSock, Bytes) end, receive stop -> ok end, gen_tcp:close(ClientSock), gen_tcp:close(ListenSock) @@ -852,15 +851,19 @@ test_idle_data_refuses_reuse() -> hackney_pool:checkout("127.0.0.1", ServerPort, hackney_tcp, Opts), receive {server, accepted} -> ok after 5000 -> error(timeout_accept) end, - %% Clean connection: is_ready sets the socket passive and keeps it reusable. - ?assertEqual({ok, connected}, hackney_conn:is_ready(ConnPid)), + {connected, ConnData} = sys:get_state(ConnPid), + Socket = element(8, ConnData), %% #conn_data.socket - %% Server pushes unsolicited bytes into the now-passive socket buffer. - ServerPid ! {write, <<"unsolicited">>}, + %% Deliver bytes as an active-once mailbox message (as #544 would on reuse). + ConnPid ! {tcp, Socket, <<"the-next-response">>}, timer:sleep(50), - %% The connection must now be refused rather than reused with stale bytes. - ?assertEqual({ok, closed}, hackney_conn:is_ready(ConnPid)), + %% Connection is NOT dropped; the bytes are buffered for the next request, + %% and is_ready keeps it reusable. + ?assertMatch({connected, _}, sys:get_state(ConnPid)), + ?assertEqual({ok, connected}, hackney_conn:is_ready(ConnPid)), + {connected, ConnData2} = sys:get_state(ConnPid), + ?assertEqual(<<"the-next-response">>, element(9, ConnData2)), %% #conn_data.buffer ServerPid ! stop, ok = hackney_pool:stop_pool(test_pool_idle_data). 
diff --git a/test/repro_h1_handler.erl b/test/repro_h1_handler.erl new file mode 100644 index 00000000..e82b85c9 --- /dev/null +++ b/test/repro_h1_handler.erl @@ -0,0 +1,10 @@ +%%% Cowboy handler for the HTTP/1.1 negative control: 200, application/json, +%%% ~14 KB body, NO content-length (chunked), connection kept alive. +-module(repro_h1_handler). +-export([init/2]). + +init(Req0, State) -> + Body = <<"{\"d\":\"", (binary:copy(<<"x">>, 13988))/binary, "\"}">>, + Req1 = cowboy_req:stream_reply(200, #{<<"content-type">> => <<"application/json">>}, Req0), + ok = cowboy_req:stream_body(Body, fin, Req1), + {ok, Req1, State}. diff --git a/test/repro_h1_hang.erl b/test/repro_h1_hang.erl new file mode 100644 index 00000000..fb649afc --- /dev/null +++ b/test/repro_h1_hang.erl @@ -0,0 +1,190 @@ +%%% Reproduction harness for the HTTP/1.1 reused-connection hang (active-once +%%% stranding). Raw keep-alive TLS HTTP/1.1 server with a `presend` knob that +%%% delivers the next response's bytes while the client socket is idle in +%%% {active, once}, plus a sequential hackney client loop. +-module(repro_h1_hang). +-compile(nowarn_deprecated_catch). + +-export([run/1, start_server/1, stop_server/1, client_loop/3, test_b/0]). + +-define(BODY, <<"{\"ok\":true,\"n\":12345}">>). 

%% Run one HTTP/1.1 reuse scenario against a freshly started local server.
%%
%% Knobs read here: `pool` (pool name, or `false` for no pool; default false),
%% `n` (number of sequential requests; default 20) and `idle_ms` (sleep after
%% each request; default 0). The whole map is also passed to start_server/1.
%% Returns the result list produced by client_loop/4.
run(Knobs) ->
    _ = application:ensure_all_started(hackney),
    Pool = maps:get(pool, Knobs, false),
    N = maps:get(n, Knobs, 20),
    IdleMs = maps:get(idle_ms, Knobs, 0),
    {Server, Port} = start_server(Knobs),
    %% Pool setup happens inside the try so the server is always stopped,
    %% even when starting the pool fails.
    try
        PoolOpt = case Pool of
            false ->
                [{pool, false}];
            _ ->
                %% Drop any leftover pool of the same name from an earlier run.
                catch hackney_pool:stop_pool(Pool),
                ok = hackney_pool:start_pool(Pool, [{pool_size, 5}]),
                [{pool, Pool}]
        end,
        Opts = PoolOpt ++ [{recv_timeout, 3000},
                           {ssl_options, [{insecure, true}, {verify, verify_none}]}],
        io:format("~n========== h1 reuse ~p, n=~w ==========~n", [Knobs, N]),
        client_loop(Port, N, Opts, IdleMs)
    after
        case Pool of
            false -> ok;
            _ -> catch hackney_pool:stop_pool(Pool)
        end,
        stop_server(Server)
    end.

%% Same as client_loop/4 with no idle time between requests.
client_loop(Port, N, Opts) ->
    client_loop(Port, N, Opts, 0).

%% Issue N sequential GETs to https://localhost:Port/, sleeping IdleMs
%% milliseconds after each one when IdleMs is not 0. Prints the indexes that
%% timed out ("hangs") and the other errors, then returns the list of
%% {Index, ElapsedMs, ok | {error, Reason}} tuples.
client_loop(Port, N, Opts, IdleMs) ->
    Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]),
    Results = [ begin
                    R = do_one(Url, Opts, I),
                    case IdleMs of
                        0 -> ok;
                        _ -> timer:sleep(IdleMs)
                    end,
                    R
                end || I <- lists:seq(1, N) ],
    Hangs = [ I || {I, _, {error, timeout}} <- Results ],
    Errs = [ {I, E} || {I, _, {error, E}} <- Results, E =/= timeout ],
    io:format(" ---- hangs=~p other_errors=~p~n", [Hangs, Errs]),
    Results.

%% Perform request number I, print one line with its duration and outcome and
%% return {I, ElapsedMs, ok | {error, Reason}}.
do_one(Url, Opts, I) ->
    T0 = erlang:monotonic_time(millisecond),
    R = fetch(Url, Opts),
    Ms = erlang:monotonic_time(millisecond) - T0,
    Detail = case R of
        {ok, S, B} -> io_lib:format("ok status=~w bytes=~w", [S, B]);
        {error, timeout} -> "HANG (recv_timeout)";
        {error, E} -> io_lib:format("error: ~p", [E])
    end,
    io:format(" [~3w] ~6w ms ~s~n", [I, Ms, Detail]),
    {I, Ms, case R of {ok, _, _} -> ok; Other -> Other end}.

%% Issue one GET through hackney and normalise the reply to
%% {ok, Status, BodyBytes} | {error, Reason}. Handles the three reply shapes
%% matched below: body returned inline, body behind a reference that must be
%% read with hackney:body/1, and a reply without a body.
fetch(Url, Opts) ->
    case hackney:request(get, Url, [], <<>>, Opts) of
        {ok, S, _H, B} when is_binary(B) -> {ok, S, byte_size(B)};
        {ok, S, _H, Ref} ->
            case hackney:body(Ref) of
                {ok, B} -> {ok, S, byte_size(B)};
                {error, E} -> {error, E}
            end;
        {ok, S, _H} -> {ok, S, 0};
        {error, E} -> {error, E}
    end.

%%====================================================================
%% Test B: deterministic active-once stranding on a reused conn pid
%%====================================================================

%% Reuse ONE conn pid (bypassing the pool's is_ready), strand the next response
%% as an {ssl,Socket,Data} message while the conn is idle in {active,once}, then
%% issue the next request and assert it consumes the stranded bytes.
%%
%% The outcome is only printed; nothing is asserted on the second request, so
%% the caller has to read the "req2" line to judge the result.
test_b() ->
    _ = application:ensure_all_started(hackney),
    {Server, Port} = start_server(#{}),
    {ok, ConnPid} = hackney_conn_sup:start_conn(
        #{host => "localhost", port => Port, transport => hackney_ssl,
          ssl_options => [{insecure, true}, {verify, verify_none}],
          recv_timeout => 3000, idle_timeout => infinity}),
    ok = hackney_conn:connect(ConnPid),
    try
        {ok, 200, _H1} = hackney_conn:request(ConnPid, <<"GET">>, <<"/">>, [], <<>>),
        {ok, Body1} = hackney_conn:body(ConnPid),
        io:format("req1 body=~p alive=~p~n", [Body1, is_process_alive(ConnPid)]),
        %% conn is now idle in connected/{active,once}. Grab its socket.
        {StateName, ConnData} = sys:get_state(ConnPid),
        %% NOTE(review): position 8 is presumably #conn_data.socket; the record
        %% is defined in hackney_conn, so confirm if its layout changes.
        Socket = element(8, ConnData),
        io:format("state=~p~n", [StateName]),
        %% Strand the NEXT response as an active-once mailbox message.
        FakeResp = <<"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length: 8\r\n\r\nSTRANDED">>,
        ConnPid ! {ssl, Socket, FakeResp},
        %% Give the connection process time to handle the injected message.
        timer:sleep(50),
        io:format("after strand: alive=~p state=~p~n",
                  [is_process_alive(ConnPid), (catch element(1, sys:get_state(ConnPid)))]),
        %% Request 2 to /silent (server does not respond) -> must read the
        %% stranded bytes, not hang or fail.
        T0 = erlang:monotonic_time(millisecond),
        R = (catch hackney_conn:request(ConnPid, <<"GET">>, <<"/silent">>, [], <<>>)),
        R2 = case R of
            {ok, 200, _} -> hackney_conn:body(ConnPid);
            Other -> Other
        end,
        Ms = erlang:monotonic_time(millisecond) - T0,
        io:format("req2 (~w ms) -> ~p~n", [Ms, R2])
    after
        catch hackney_conn:close(ConnPid),
        stop_server(Server)
    end.

%%====================================================================
%% Raw keep-alive HTTP/1.1 TLS server
%%====================================================================

%% Open a TLS listener on an ephemeral port using the test certificates and
%% spawn the acceptor. Returns {AcceptorPid, Port}.
start_server(Knobs) ->
    Certs = cert_dir(),
    {ok, LSock} = ssl:listen(0,
        [{certfile, filename:join(Certs, "server.pem")},
         {keyfile, filename:join(Certs, "server.key")},
         {versions, ['tlsv1.2', 'tlsv1.3']},
         {active, false}, {mode, binary}, {reuseaddr, true}]),
    {ok, {_, Port}} = ssl:sockname(LSock),
    Pid = spawn(fun() -> accept_loop(LSock, Knobs) end),
    {Pid, Port}.

%% Kill the acceptor process returned by start_server/1.
stop_server(Pid) -> exit(Pid, shutdown), ok.

%% Accept connections forever, serving each one in its own process. An accept
%% timeout just retries; any other accept error ends the loop.
accept_loop(LSock, Knobs) ->
    case ssl:transport_accept(LSock, 2000) of
        {ok, TSock} ->
            spawn(fun() ->
                case ssl:handshake(TSock, 5000) of
                    {ok, Sock} -> serve(Sock, Knobs, 0, false);
                    _ -> ok
                end
            end),
            accept_loop(LSock, Knobs);
        {error, timeout} -> accept_loop(LSock, Knobs);
        {error, _} -> ok
    end.

%% Serve requests on one keep-alive connection. N counts the requests read so
%% far. No response is written when the path is /silent or when SkipNext is
%% true; SkipNext is set after a `presend`, because the response for the
%% following request has already been written ahead of time.
serve(Sock, Knobs, N, SkipNext) ->
    case read_request(Sock, <<>>) of
        {ok, Path} ->
            Silent = Path =:= <<"/silent">>,
            case SkipNext orelse Silent of
                true ->
                    serve(Sock, Knobs, N + 1, false);
                false ->
                    ok = ssl:send(Sock, response()),
                    case maps:get(presend, Knobs, false) of
                        true ->
                            timer:sleep(maps:get(presend_delay, Knobs, 5)),
                            ok = ssl:send(Sock, response()), %% strand next response
                            serve(Sock, Knobs, N + 1, true);
                        false ->
                            serve(Sock, Knobs, N + 1, false)
                    end
            end;
        closed -> ok
    end.

%% Read from Sock until Acc contains the end of the request head
%% ("\r\n\r\n"), then return {ok, Path} with the request-line path. Returns
%% `closed` on any receive error. No request body is read.
read_request(Sock, Acc) ->
    case binary:match(Acc, <<"\r\n\r\n">>) of
        nomatch ->
            case ssl:recv(Sock, 0, 30000) of
                {ok, Data} -> read_request(Sock, <<Acc/binary, Data/binary>>);
                {error, _} -> closed
            end;
        _ ->
            %% Extract the request-line path (GET <path> HTTP/1.1).
            Path = case binary:split(Acc, <<" ">>, [global]) of
                [_Method, P | _] -> P;
                _ -> <<"/">>
            end,
            {ok, Path}
    end.

%% The fixed 200 response (iolist) with an explicit content-length for ?BODY.
response() ->
    [<<"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: ">>,
     integer_to_list(byte_size(?BODY)), <<"\r\n\r\n">>, ?BODY].

%% Absolute path of test/certs, located five directories above the directory
%% holding this module's beam file.
cert_dir() ->
    BeamDir = filename:dirname(code:which(?MODULE)),
    Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]),
    filename:join([filename:absname(Root), "test", "certs"]).
diff --git a/test/repro_h2_hang.erl b/test/repro_h2_hang.erl
new file mode 100644
index 00000000..eb866ba8
--- /dev/null
+++ b/test/repro_h2_hang.erl
@@ -0,0 +1,489 @@
%%% Reproduction + localization harness for the intermittent HTTP/2 hang on
%%% reused/pooled connections (hackney 4.4.3).
%%%
%%% Run from the test profile shell, e.g.:
%%% rebar3 as test shell --eval 'repro_h2_hang:scenario(flow_control_reuse), halt().'
%%%
%%% Scenarios (escalation ladder): baseline, flow_control_reuse, framing_small,
%%% framing_big, trailers, negative_control_h1. `all/0` runs the h2 ladder.
-module(repro_h2_hang).
-compile(nowarn_deprecated_catch).

-export([scenario/1, all/0,
         start_h2_server/1, stop_h2_server/1,
         client_loop/3, concurrent_rounds/4, direct_h2/3, dump/1,
         raw_run/2, raw_matrix/0]).

-define(RECV_TIMEOUT, 5000).

%%====================================================================
%% Scenarios
%%====================================================================

%% Each clause picks the server knobs, the request count and the receive
%% timeout for one named scenario and hands them to the matching driver.
scenario(baseline) ->
    Knobs = #{body_size => 14000, frame_count => 1},
    run_h2(<<"baseline: 14KB, single DATA + END_STREAM, 50 reused requests">>,
           Knobs, 50, ?RECV_TIMEOUT);

scenario(flow_control_reuse) ->
    %% Lead hypothesis: connection-level recv window (65535) depletes across
    %% reused streams if the client fails to send WINDOW_UPDATE(0). 14KB x N
    %% crosses 65535 by request ~5.
    Knobs = #{body_size => 14000, frame_count => 1},
    run_h2(<<"flow_control_reuse: 14KB no-content-length, 30 reused requests">>,
           Knobs, 30, ?RECV_TIMEOUT);

scenario(framing_small) ->
    Knobs = #{body_size => 14000, frame_count => 28, inter_frame_ms => 1},
    run_h2(<<"framing_small: 14KB split into 28 x 500B DATA frames, 30 requests">>,
           Knobs, 30, ?RECV_TIMEOUT);

scenario(framing_big) ->
    %% >64KB forces a per-stream WINDOW_UPDATE within a single response.
    Knobs = #{body_size => 200000, frame_count => 1},
    run_h2(<<"framing_big: 200KB body, 10 reused requests">>,
           Knobs, 10, ?RECV_TIMEOUT);

scenario(trailers) ->
    %% Regression: confirm 4.4.3 still handles trailered END_STREAM.
    Knobs = #{body_size => 14000, frame_count => 1, trailers => true},
    run_h2(<<"trailers: 14KB body closed by trailers, 20 reused requests">>,
           Knobs, 20, ?RECV_TIMEOUT);

scenario(idle_between) ->
    %% Let the connection go idle between requests, as in prod, so any
    %% control frame the peer sends while idle is processed before reuse.
    Knobs = #{body_size => 14000, frame_count => 1},
    run_h2_idle(<<"idle_between: 14KB, 100ms idle between 20 reused requests">>,
                Knobs, 20, 5000, 100);

scenario(small_window) ->
    %% Server advertises a tiny initial_window_size (its receive window; gates
    %% client->server DATA, not the GET response, but rules the knob out).
    Settings = #{initial_window_size => 1024, max_concurrent_streams => unlimited},
    Knobs = #{body_size => 14000, frame_count => 1, settings => Settings},
    run_h2(<<"small_window: server initial_window_size=1024, 20 reused requests">>,
           Knobs, 20, 5000);

scenario(concurrency) ->
    Knobs = #{body_size => 14000, frame_count => 1},
    concurrency(Knobs, 20, 8, 5000);

scenario(negative_control_h1) ->
    negative_control_h1(30).

%% Run the h2 ladder in order and pair every scenario name with its result.
all() ->
    Ladder = [baseline, flow_control_reuse, framing_small, framing_big, trailers],
    lists:map(fun(Name) -> {Name, scenario(Name)} end, Ladder).

%%====================================================================
%% h2 scenario driver
%%====================================================================

%% Start an h2 server configured by Knobs, recreate the `repro_pool` pool,
%% run N sequential requests through it and print and return the summary map.
%% The pool and the server are torn down whatever the outcome.
run_h2(Label, Knobs, N, RecvTimeout) ->
    io:format("~n========== ~s ==========~n", [Label]),
    ensure_started(),
    {Server, Port} = start_h2_server(Knobs),
    PoolName = repro_pool,
    catch hackney_pool:stop_pool(PoolName),
    ok = hackney_pool:start_pool(PoolName, [{max_connections, 5}]),
    TlsOpts = [{insecure, true}, {verify, verify_none}],
    ReqOpts = [{pool, PoolName}, {protocols, [http2]}, {recv_timeout, RecvTimeout},
               {ssl_options, TlsOpts}],
    try
        Summary = client_loop(Port, N, ReqOpts),
        report(Summary),
        Summary
    after
        catch hackney_pool:stop_pool(PoolName),
        stop_h2_server(Server)
    end.

%% Like run_h2/4, but sleeps IdleMs milliseconds after every request. Prints
%% the summary and returns the raw per-request result list (not the summary).
run_h2_idle(Label, Knobs, N, RecvTimeout, IdleMs) ->
    io:format("~n========== ~s ==========~n", [Label]),
    ensure_started(),
    {Server, Port} = start_h2_server(Knobs),
    PoolName = repro_pool,
    catch hackney_pool:stop_pool(PoolName),
    ok = hackney_pool:start_pool(PoolName, [{max_connections, 5}]),
    TlsOpts = [{insecure, true}, {verify, verify_none}],
    ReqOpts = [{pool, PoolName}, {protocols, [http2]}, {recv_timeout, RecvTimeout},
               {ssl_options, TlsOpts}],
    Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]),
    OneThenIdle = fun(I) ->
        Outcome = do_one(Url, ReqOpts, I),
        timer:sleep(IdleMs),
        Outcome
    end,
    try
        Results = lists:map(OneThenIdle, lists:seq(1, N)),
        report(summarize_results(N, Results)),
        Results
    after
        catch hackney_pool:stop_pool(PoolName),
        stop_h2_server(Server)
    end.

%% Concurrency: Rounds of Conc simultaneous requests over the shared pool (one
%% multiplexed h2 connection), to exercise the multi-stream / reuse races.
%% Starts its own h2 server and `repro_pool` pool and tears both down again.
concurrency(Knobs, Rounds, Conc, RecvTimeout) ->
    io:format("~n========== concurrency: ~w rounds x ~w concurrent, 14KB ==========~n",
              [Rounds, Conc]),
    ensure_started(),
    {Server, Port} = start_h2_server(Knobs),
    Pool = repro_pool,
    catch hackney_pool:stop_pool(Pool),
    ok = hackney_pool:start_pool(Pool, [{max_connections, 10}]),
    Opts = [{pool, Pool}, {protocols, [http2]}, {recv_timeout, RecvTimeout},
            {ssl_options, [{insecure, true}, {verify, verify_none}]}],
    Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]),
    try
        concurrent_rounds(Url, Opts, Rounds, Conc)
    after
        catch hackney_pool:stop_pool(Pool),
        stop_h2_server(Server)
    end.

%% Run Rounds rounds of Conc parallel requests, one round after the other.
%% Prints a line per round plus a final summary and returns every
%% {Round, Worker, ElapsedMs, Result} tuple, rounds in ascending order.
concurrent_rounds(Url, Opts, Rounds, Conc) ->
    Parent = self(),
    %% One result list per round, joined once at the end rather than appended
    %% to a growing accumulator.
    PerRound = [ run_round(Parent, Url, Opts, Round, Conc)
                 || Round <- lists:seq(1, Rounds) ],
    All = lists:append(PerRound),
    TotalHangs = [ X || {_R, _W, _Ms, {error, timeout}} = X <- All ],
    io:format(" ---- concurrency summary: ~w reqs | total hangs=~w~n",
              [length(All), length(TotalHangs)]),
    case TotalHangs of
        [] -> io:format(" RESULT: no hang reproduced~n");
        _ -> io:format(" RESULT: HANG reproduced (~w)~n", [length(TotalHangs)])
    end,
    All.

%% Spawn Conc monitored workers that each perform one request and report back
%% to Parent, wait for them, print the round line and return the round results.
run_round(Parent, Url, Opts, Round, Conc) ->
    Workers = [ spawn_monitor(fun() ->
                    T0 = erlang:monotonic_time(millisecond),
                    R = fetch(Url, Opts),
                    Ms = erlang:monotonic_time(millisecond) - T0,
                    Parent ! {done, self(), Round, W, Ms, R}
                end) || W <- lists:seq(1, Conc) ],
    RoundRes = collect(length(Workers), []),
    Hangs = [ {W, Ms} || {_R, W, Ms, {error, timeout}} <- RoundRes ],
    Slow = [ {W, Ms} || {_R, W, Ms, {ok, _, _}} <- RoundRes, Ms >= 1000 ],
    io:format(" round ~3w: ok=~w hangs=~w slow=~w~n",
              [Round, length([1 || {_, _, _, {ok, _, _}} <- RoundRes]),
               length(Hangs), length(Slow)]),
    case Hangs of
        [] -> ok;
        _ -> io:format(" HANGS: ~p~n", [Hangs])
    end,
    RoundRes.

%% Gather N worker outcomes from the mailbox. A `done` message yields a
%% {Round, Worker, Ms, Result} entry (after waiting up to 5 s for that
%% worker's 'DOWN'); a 'DOWN' with a non-normal reason yields a crash entry.
%% Gives up and returns what it has after 30 s without a matching message.
collect(0, Gathered) ->
    Gathered;
collect(Remaining, Gathered) ->
    receive
        {done, Worker, Round, W, Ms, Outcome} ->
            receive
                {'DOWN', _, process, Worker, _} -> ok
            after 5000 ->
                ok
            end,
            Entry = {Round, W, Ms, Outcome},
            collect(Remaining - 1, [Entry | Gathered]);
        {'DOWN', _, process, _Pid, Why} when Why =/= normal ->
            Crash = {crash, 0, 0, {error, Why}},
            collect(Remaining - 1, [Crash | Gathered])
    after 30000 ->
        Gathered
    end.

%% Build the summary map for a list of {Index, Ms, ok | {error, Reason}}
%% results: timeouts go to `hangs`, other errors to `errors`, and successful
%% requests that took at least 1000 ms to `slow`.
summarize_results(N, Results) ->
    Hangs = [ I || {I, _Ms, {error, timeout}} <- Results ],
    Errors = [ {I, E} || {I, _Ms, {error, E}} <- Results, E =/= timeout ],
    Slow = [ {I, Ms} || {I, Ms, ok} <- Results, Ms >= 1000 ],
    #{n => N, hangs => Hangs, errors => Errors, slow => Slow, results => Results}.

%%====================================================================
%% Raw frame-level server runs (ALB-quirk injection)
%%====================================================================

%% Run the sequential client loop against repro_h2_raw_server started with
%% Knobs (`n` requests, default 20), print and return the summary map.
raw_run(Label, Knobs) ->
    io:format("~n========== RAW ~s | ~p ==========~n", [Label, Knobs]),
    ensure_started(),
    Server = repro_h2_raw_server:start(Knobs),
    Port = repro_h2_raw_server:port(Server),
    PoolName = repro_pool,
    catch hackney_pool:stop_pool(PoolName),
    ok = hackney_pool:start_pool(PoolName, [{max_connections, 5}]),
    TlsOpts = [{insecure, true}, {verify, verify_none}],
    ReqOpts = [{pool, PoolName}, {protocols, [http2]}, {recv_timeout, 5000},
               {ssl_options, TlsOpts}],
    Count = maps:get(n, Knobs, 20),
    try
        Summary = client_loop(Port, Count, ReqOpts),
        report(Summary),
        Summary
    after
        catch hackney_pool:stop_pool(PoolName),
        catch repro_h2_raw_server:stop(element(1, Server))
    end.

%% Try the ALB-quirk matrix until one hangs.
%% Runs every quirk case below through raw_run/2, in order, and returns the
%% list of {Label, Summary} pairs.
raw_matrix() ->
    Matrix = [
        {<<"no quirk (raw baseline)">>,
         #{quirk => none, n => 20}},
        {<<"SETTINGS after each response">>,
         #{quirk => settings, quirk_when => after_each, n => 20}},
        {<<"SETTINGS after first response">>,
         #{quirk => settings, quirk_when => after_first, n => 20}},
        {<<"SETTINGS mid first response">>,
         #{quirk => settings, quirk_when => mid_first, frame_count => 4, n => 20}},
        {<<"SETTINGS mid each response">>,
         #{quirk => settings, quirk_when => mid_each, frame_count => 4, n => 20}},
        {<<"SETTINGS(iw=65535) after each">>,
         #{quirk => {settings, [{initial_window_size, 65535}]}, quirk_when => after_each, n => 20}},
        {<<"SETTINGS(iw=16384) after each">>,
         #{quirk => {settings, [{initial_window_size, 16384}]}, quirk_when => after_each, n => 20}},
        {<<"PING after each response">>,
         #{quirk => ping, quirk_when => after_each, n => 20}},
        {<<"WINDOW_UPDATE after each">>,
         #{quirk => window_update, quirk_when => after_each, n => 20}}
    ],
    lists:map(fun({Label, Knobs}) -> {Label, raw_run(Label, Knobs)} end, Matrix).

%%====================================================================
%% h2 server (h2 dep), with knobs
%%====================================================================

%% Start an h2 server on an ephemeral port with the test certificates. Every
%% request is answered by handle_req/6 with Knobs; the `settings` knob
%% overrides the default server settings. Returns {Server, Port}.
start_h2_server(Knobs) ->
    CertDir = cert_dir(),
    DefaultSettings = #{max_concurrent_streams => unlimited},
    Config = #{
        cert => filename:join(CertDir, "server.pem"),
        key => filename:join(CertDir, "server.key"),
        handler => fun(Conn, Sid, M, P, H) -> handle_req(Conn, Sid, M, P, H, Knobs) end,
        settings => maps:get(settings, Knobs, DefaultSettings)
    },
    {ok, Server} = h2:start_server(0, Config),
    {Server, h2:server_port(Server)}.

%% Stop the server; a failure while stopping is ignored.
stop_h2_server(Server) ->
    catch h2:stop_server(Server),
    ok.

%% Answer one request on stream Sid: 200 + JSON body built from the knobs
%% `body_size` (default 14000), `frame_count` (default 1), `inter_frame_ms`
%% (default 0) and `trailers` (default false). With trailers, the last DATA
%% frame does not end the stream; the trailers do.
handle_req(Conn, Sid, _M, _P, _H, Knobs) ->
    BodySize = maps:get(body_size, Knobs, 14000),
    FrameCount = maps:get(frame_count, Knobs, 1),
    DelayMs = maps:get(inter_frame_ms, Knobs, 0),
    Trailers = maps:get(trailers, Knobs, false),
    Body = make_body(BodySize),
    %% 200, application/json, NO content-length, connection kept alive.
    ok = h2:send_response(Conn, Sid, 200, [{<<"content-type">>, <<"application/json">>}]),
    case Trailers of
        true ->
            send_frames(Conn, Sid, Body, FrameCount, DelayMs, false),
            h2:send_trailers(Conn, Sid, [{<<"x-trailer">>, <<"end">>}]);
        false ->
            send_frames(Conn, Sid, Body, FrameCount, DelayMs, true)
    end.

%% Split Body into FrameCount DATA frames. LastFin marks whether the final
%% frame carries END_STREAM (false when trailers will close the stream).
send_frames(Conn, Sid, Body, FrameCount, DelayMs, LastFin) ->
    Chunks = chunkify(Body, FrameCount),
    send_chunks(Conn, Sid, Chunks, DelayMs, LastFin).

%% Send the chunks in order, sleeping DelayMs (when not 0) after every chunk
%% except the last. Only the last chunk is sent with the LastFin flag.
send_chunks(Conn, Sid, [Last], _DelayMs, LastFin) ->
    ok = h2:send_data(Conn, Sid, Last, LastFin);
send_chunks(Conn, Sid, [C | Rest], DelayMs, LastFin) ->
    ok = h2:send_data(Conn, Sid, C, false),
    case DelayMs of
        0 -> ok;
        _ -> timer:sleep(DelayMs)
    end,
    send_chunks(Conn, Sid, Rest, DelayMs, LastFin).

%% Split Body into pieces of ceil(byte_size(Body) / N) bytes (at least 1);
%% the last piece holds the remainder, so fewer than N pieces can result.
chunkify(Body, 1) -> [Body];
chunkify(Body, N) when N > 1 ->
    Size = max(1, (byte_size(Body) + N - 1) div N),
    chunkify_1(Body, Size).

chunkify_1(Body, Size) when byte_size(Body) =< Size -> [Body];
chunkify_1(Body, Size) ->
    <<C:Size/binary, Rest/binary>> = Body,
    [C | chunkify_1(Rest, Size)].

%% Build the response body. For Size >= 12 it is a JSON object {"d":"xx..."}
%% with Size - 12 filler bytes inside an 8-byte wrapper, i.e. Size - 4 bytes
%% in total; smaller sizes give exactly Size bytes of plain "x".
make_body(Size) when Size >= 12 ->
    Fill = binary:copy(<<"x">>, Size - 12),
    <<"{\"d\":\"", Fill/binary, "\"}">>;
make_body(Size) ->
    binary:copy(<<"x">>, Size).

%%====================================================================
%% hackney client loop
%%====================================================================

%% Perform N sequential GETs against https://localhost:Port/ and return the
%% summary map: `n`, `hangs` (indexes that timed out), `errors` (other
%% errors), `slow` (successes of 1000 ms or more) and the raw `results`.
client_loop(Port, N, Opts) ->
    Url = iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), <<"/">>]),
    Results = lists:map(fun(I) -> do_one(Url, Opts, I) end, lists:seq(1, N)),
    Hangs = [ I || {I, _Ms, {error, timeout}} <- Results ],
    Errors = [ {I, E} || {I, _Ms, {error, E}} <- Results, E =/= timeout ],
    Slow = [ {I, Ms} || {I, Ms, ok} <- Results, Ms >= 1000 ],
    #{n => N, hangs => Hangs, errors => Errors, slow => Slow, results => Results}.

%% Time request number I, print its outcome line and return
%% {I, ElapsedMs, ok | {error, Reason}}.
do_one(Url, Opts, I) ->
    Started = erlang:monotonic_time(millisecond),
    Reply = fetch(Url, Opts),
    Elapsed = erlang:monotonic_time(millisecond) - Started,
    {Tag, Detail} = summarize(Reply),
    io:format(" [~3w] ~6w ms ~s~n", [I, Elapsed, Detail]),
    {I, Elapsed, Tag}.

%% Issue one GET and reduce the reply to {ok, Status, BodyBytes} or
%% {error, Reason}.
fetch(Url, Opts) ->
    %% In this hackney version request/5 returns the full body in the 4th
    %% element (h1 fetches it internally; h2 delivers it in the reply). Only a
    %% streaming ref needs hackney:body/1.
    Reply = hackney:request(get, Url, [], <<>>, Opts),
    case Reply of
        {ok, Status, _H, Body} when is_binary(Body) ->
            {ok, Status, byte_size(Body)};
        {ok, Status, _H, Ref} ->
            case hackney:body(Ref) of
                {ok, Body} -> {ok, Status, byte_size(Body)};
                {error, E} -> {error, E}
            end;
        {ok, Status, _H} ->
            {ok, Status, 0};
        {error, E} ->
            {error, E}
    end.

%% Map a fetch/2 result to {Tag, PrintableDetail}; Tag is `ok` on success and
%% the error tuple otherwise.
summarize({ok, Status, Bytes}) ->
    {ok, io_lib:format("ok status=~w bytes=~w", [Status, Bytes])};
summarize({error, timeout}) ->
    {{error, timeout}, "HANG (recv_timeout) -> {error, timeout}"};
summarize({error, E}) ->
    {{error, E}, io_lib:format("error: ~p", [E])}.

%% Print the one-line summary for a summary map, followed by the verdict line
%% (whether any request hit recv_timeout).
report(#{n := N, hangs := Hangs, errors := Errors, slow := Slow}) ->
    Counts = [N, length(Hangs), length(Errors), length(Slow)],
    io:format(" ---- summary: ~w requests | hangs=~w | other_errors=~w | slow(>=1s)=~w~n",
              Counts),
    if
        Hangs =:= [] ->
            io:format(" RESULT: no hang reproduced~n");
        true ->
            io:format(" RESULT: HANG reproduced at request(s) ~p~n", [Hangs])
    end.

%%====================================================================
%% Negative control: same body over HTTP/1.1 via cowboy
%%====================================================================

%% Serve "/" with repro_h1_handler over a cowboy TLS listener on an ephemeral
%% port, run N sequential HTTP/1.1 requests through the `repro_pool_h1` pool,
%% then print and return the summary map. Pool and listener are stopped
%% afterwards whatever the outcome.
negative_control_h1(N) ->
    io:format("~n========== negative_control_h1: 14KB chunked over http1, ~w requests ==========~n", [N]),
    ensure_started(),
    {ok, _} = application:ensure_all_started(cowboy),
    Routes = [{'_', [{"/", repro_h1_handler, []}]}],
    Dispatch = cowboy_router:compile(Routes),
    CertDir = cert_dir(),
    TransportOpts = [{port, 0},
                     {certfile, filename:join(CertDir, "server.pem")},
                     {keyfile, filename:join(CertDir, "server.key")}],
    {ok, _} = cowboy:start_tls(repro_h1_listener, TransportOpts,
                               #{env => #{dispatch => Dispatch}}),
    Port = ranch:get_port(repro_h1_listener),
    PoolName = repro_pool_h1,
    catch hackney_pool:stop_pool(PoolName),
    ok = hackney_pool:start_pool(PoolName, [{max_connections, 5}]),
    ReqOpts = [{pool, PoolName}, {protocols, [http1]}, {recv_timeout, ?RECV_TIMEOUT},
               {ssl_options, [{insecure, true}, {verify, verify_none}]}],
    try
        Summary = client_loop(Port, N, ReqOpts),
        report(Summary),
        Summary
    after
        catch hackney_pool:stop_pool(PoolName),
        catch cowboy:stop_listener(repro_h1_listener)
    end.

%%====================================================================
%% Part C: drive the h2 dep client directly (bypass hackney)
%%====================================================================

%% Open one h2 connection to localhost:Port with ConnSettings, perform N
%% sequential GETs on it and return the list of {Index, Outcome} pairs, where
%% Outcome is `ok`, `timeout` or the error term.
direct_h2(Port, N, ConnSettings) ->
    io:format("~n========== direct_h2: ~w sequential GETs on ONE h2 connection ==========~n", [N]),
    ensure_started(),
    ConnOpts = #{verify => verify_none, settings => ConnSettings},
    {ok, Conn} = h2:connect("localhost", Port, ConnOpts),
    ok = h2:wait_connected(Conn, 5000),
    Hdrs = [{<<":method">>, <<"GET">>},
            {<<":scheme">>, <<"https">>},
            {<<":authority">>, <<"localhost">>},
            {<<":path">>, <<"/">>}],
    Results = lists:map(fun(I) -> direct_one(Conn, Hdrs, I) end, lists:seq(1, N)),
    catch h2:close(Conn),
    Hangs = [ I || {I, timeout} <- Results ],
    io:format(" ---- direct_h2 summary: hangs=~p~n", [Hangs]),
    Results.

%% Send request number I, read its full response, print one line and return
%% {I, ok | timeout | Other}.
direct_one(Conn, Hdrs, I) ->
    Started = erlang:monotonic_time(millisecond),
    Outcome = case h2:request(Conn, Hdrs) of
        {ok, StreamId} -> direct_read(Conn, StreamId, <<>>);
        {error, E} -> {error, E}
    end,
    Elapsed = erlang:monotonic_time(millisecond) - Started,
    case Outcome of
        {ok, Status, Bytes} ->
            io:format(" [~3w] ~6w ms ok status=~w bytes=~w~n", [I, Elapsed, Status, Bytes]),
            {I, ok};
        timeout ->
            io:format(" [~3w] ~6w ms HANG (no END_STREAM in 5s)~n", [I, Elapsed]),
            {I, timeout};
        Other ->
            io:format(" [~3w] ~6w ms ~p~n", [I, Elapsed, Other]),
            {I, Other}
    end.

%% Wait up to 5 s for the response headers of StreamId, then read the body.
%% A stream reset, GOAWAY or connection close is returned as {error, _}.
direct_read(Conn, StreamId, Acc) ->
    receive
        {h2, Conn, {response, StreamId, Status, _Hdrs}} ->
            direct_read_body(Conn, StreamId, Status, Acc);
        {h2, Conn, {stream_reset, StreamId, Code}} ->
            {error, {stream_reset, Code}};
        {h2, Conn, {goaway, _, Code}} ->
            {error, {goaway, Code}};
        {h2, Conn, closed} ->
            {error, closed}
    after 5000 ->
        timeout
    end.

%% Accumulate DATA for StreamId until a frame flagged as final (or trailers)
%% arrives and return {ok, Status, TotalBodyBytes}. Each wait is limited to
%% 5 s; on expiry the atom `timeout` is returned.
direct_read_body(Conn, StreamId, Status, Acc) ->
    receive
        {h2, Conn, {data, StreamId, Data, true}} ->
            {ok, Status, byte_size(<<Acc/binary, Data/binary>>)};
        {h2, Conn, {data, StreamId, Data, false}} ->
            direct_read_body(Conn, StreamId, Status, <<Acc/binary, Data/binary>>);
        {h2, Conn, {trailers, StreamId, _}} ->
            {ok, Status, byte_size(Acc)};
        {h2, Conn, {stream_reset, StreamId, Code}} ->
            {error, {stream_reset, Code}}
    after 5000 ->
        timeout
    end.

%%====================================================================
%% Localization dump
%%====================================================================

%% Print the h2 connections found in the state of pool PoolName, then dump
%% each connection process with dump_conn/1.
dump(PoolName) ->
    PoolPid = hackney_pool:find_pool(PoolName),
    io:format("~n==== DUMP pool=~p pid=~p ====~n", [PoolName, PoolPid]),
    St = sys:get_state(PoolPid),
    H2Conns = pool_h2_conns(St),
    io:format("h2_connections: ~p~n", [H2Conns]),
    [ dump_conn(ConnPid) || {_Key, ConnPid} <- H2Conns ],
    ok.

%% The pool state is a record, and pulling the h2_connections map out by
%% position would be fragile. Instead take the first field that is a
%% non-empty map whose values are all pids and return it as a {Key, Pid}
%% list, or [] when no field qualifies.
pool_h2_conns(St) when is_tuple(St) ->
    %% Find the map field that looks like #{Key => pid()}.
    Maps = [ M || M <- tuple_to_list(St), is_map(M),
                  lists:all(fun erlang:is_pid/1, maps:values(M)), map_size(M) > 0 ],
    case Maps of
        [H2 | _] -> maps:to_list(H2);
        [] -> []
    end.

%% Print state name, raw state data and process info of one hackney_conn
%% process, then the same for the h2 connection pid found in its state.
dump_conn(ConnPid) ->
    io:format("~n-- hackney_conn ~p --~n", [ConnPid]),
    {State, Data} = sys:get_state(ConnPid),
    io:format(" state=~p~n", [State]),
    print_conn_data(Data),
    print_proc("hackney_conn", ConnPid),
    case conn_data_field(Data, h2_conn) of
        undefined -> io:format(" (no h2_conn)~n");
        H2Conn -> dump_h2_conn(H2Conn)
    end.

%% Print process info and sys state of the h2 connection when it is a live
%% pid; otherwise just report it as dead.
dump_h2_conn(H2Conn) ->
    io:format("~n-- h2_connection ~p --~n", [H2Conn]),
    Alive = is_pid(H2Conn) andalso is_process_alive(H2Conn),
    case Alive of
        true ->
            print_proc("h2_connection", H2Conn),
            io:format(" h2 sys:get_state =~n ~p~n", [catch sys:get_state(H2Conn)]);
        false ->
            io:format(" h2_connection dead~n")
    end.

%% #conn_data is a record defined in another module, so no field is picked
%% out by name or position here: the raw tuple is printed as-is.
print_conn_data(Data) ->
    io:format(" conn_data (raw)=~n ~p~n", [Data]).

%% Best-effort lookup of the h2 connection pid: returns the LAST element of
%% the state tuple that is a pid (alive or not), or `undefined` when the
%% tuple holds no pid. Check the result against the raw dump.
conn_data_field(Data, h2_conn) ->
    Fields = tuple_to_list(Data),
    case lists:filter(fun erlang:is_pid/1, Fields) of
        [] -> undefined;
        Pids -> lists:last(Pids)
    end.

%% Print selected process_info items for Pid under Label.
print_proc(Label, Pid) ->
    Items = [current_function, current_stacktrace, status,
             message_queue_len, messages],
    Info = process_info(Pid, Items),
    io:format(" ~s process_info:~n ~p~n", [Label, Info]).

%%====================================================================
%% Helpers
%%====================================================================

%% Start hackney and h2 with their dependencies; start results are ignored.
ensure_started() ->
    _ = application:ensure_all_started(hackney),
    _ = application:ensure_all_started(h2),
    ok.

%% Absolute path of test/certs, located five directories above the directory
%% holding this module's beam file.
cert_dir() ->
    BeamDir = filename:dirname(code:which(?MODULE)),
    Up = lists:duplicate(5, ".."),
    Root = filename:join([BeamDir | Up]),
    filename:join([filename:absname(Root), "test", "certs"]).
diff --git a/test/repro_h2_raw_server.erl b/test/repro_h2_raw_server.erl
new file mode 100644
index 00000000..302dc70d
--- /dev/null
+++ b/test/repro_h2_raw_server.erl
@@ -0,0 +1,216 @@
%%% Raw, frame-level HTTP/2 TLS server for reproducing ALB-specific behaviour
%%% the high-level h2-dep server cannot emit: a SETTINGS / PING / WINDOW_UPDATE
%%% frame injected after (or mid) a response, exact framing, etc.
%%%
%%% Uses the h2 dep's h2_frame (encode/decode) and h2_hpack (encode) so we only
%%% drive the wire, not reimplement HPACK. Request header blocks are ignored
%%% (we never decode them) - we just respond on the same stream id.
%%%
%%% Knobs (map):
%%% body_size :: integer() (default 14000)
%%% frame_count :: integer() (default 1) split body into N DATA frames
%%% inter_frame_ms :: integer() (default 0)
%%% server_settings:: [{atom(),int()}] sent in the initial SETTINGS
%%% quirk :: none | settings | {settings, [{atom(),int()}]}
%%% | ping | window_update
%%% quirk_when :: after_each | after_first | mid_first | mid_each
-module(repro_h2_raw_server).

-export([start/1, stop/1, port/1]).

-define(PREFACE, <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>).

%% Listen for TLS connections offering ALPN "h2" on an ephemeral port and
%% start a linked acceptor. Returns {AcceptorPid, Port}.
start(Knobs) ->
    CertDir = cert_dir(),
    ListenOpts = [{certfile, filename:join(CertDir, "server.pem")},
                  {keyfile, filename:join(CertDir, "server.key")},
                  {alpn_preferred_protocols, [<<"h2">>]},
                  {versions, ['tlsv1.2', 'tlsv1.3']},
                  {active, false}, {mode, binary}, {reuseaddr, true}],
    {ok, LSock} = ssl:listen(0, ListenOpts),
    {ok, {_, Port}} = ssl:sockname(LSock),
    Acceptor = spawn_link(fun() -> accept_loop(LSock, Knobs) end),
    {Acceptor, Port}.

%% Stop the acceptor. It is unlinked first so that killing it does not take
%% the caller down with it. Takes the pid, not the {Pid, Port} pair.
stop(Pid) ->
    unlink(Pid),
    exit(Pid, shutdown),
    ok.

%% Port element of the pair returned by start/1.
port({_Pid, Port}) -> Port.

%% Accept connections until the listen socket is closed, handling each one in
%% its own process. An accept timeout simply retries.
accept_loop(LSock, Knobs) ->
    Accepted = ssl:transport_accept(LSock, 5000),
    case Accepted of
        {ok, TSock} ->
            Parent = self(),
            spawn(fun() -> handshake(Parent, TSock, Knobs) end),
            accept_loop(LSock, Knobs);
        {error, timeout} ->
            accept_loop(LSock, Knobs);
        {error, closed} ->
            ok
    end.

%% Complete the TLS handshake, consume the client connection preface, send the
%% initial SETTINGS (knob `server_settings`, default []) and enter the frame
%% loop. Any handshake or preface failure ends the process quietly.
handshake(_Parent, TSock, Knobs) ->
    case ssl:handshake(TSock, 5000) of
        {ok, Sock} ->
            %% Read the 24-byte client connection preface, then frames.
            case recv_preface(Sock, <<>>) of
                {ok, Rest} ->
                    %% Send our initial SETTINGS immediately.
                    ServerSettings = maps:get(server_settings, Knobs, []),
                    send(Sock, h2_frame:settings(ServerSettings)),
                    EncCtx = h2_hpack:new_context(),
                    St = #{enc => EncCtx, count => 0, goaway => false},
                    conn_loop(Sock, Rest, St, Knobs);
                {error, _} -> ok
            end;
        {error, _} -> ok
    end.

%% Read until at least 24 bytes are buffered, then check them against
%% ?PREFACE. Returns {ok, BytesAfterPreface}, {error, bad_preface} or the
%% receive error.
recv_preface(_Sock, Acc) when byte_size(Acc) >= 24 ->
    <<Pre:24/binary, Rest/binary>> = Acc,
    case Pre of
        ?PREFACE -> {ok, Rest};
        _ -> {error, bad_preface}
    end;
recv_preface(Sock, Acc) ->
    case ssl:recv(Sock, 0, 5000) of
        {ok, Data} -> recv_preface(Sock, <<Acc/binary, Data/binary>>);
        {error, R} -> {error, R}
    end.

%% Decode and handle frames from Buf, receiving more bytes whenever the
%% decoder asks for them. Ends on a receive error, on a decode error that
%% returns no remainder, or when handle_frame/4 returns `stop`.
conn_loop(Sock, Buf, St, Knobs) ->
    case h2_frame:decode(Buf) of
        {ok, Frame, Rest} ->
            case handle_frame(Sock, Frame, St, Knobs) of
                {continue, St2} ->
                    conn_loop(Sock, Rest, St2, Knobs);
                stop -> ok
            end;
        {more, _} ->
            case ssl:recv(Sock, 0, 60000) of
                {ok, Data} -> conn_loop(Sock, <<Buf/binary, Data/binary>>, St, Knobs);
                {error, _} -> ok
            end;
        {error, _, Rest} ->
            %% A decode error that still returns a remainder: skip and go on.
            conn_loop(Sock, Rest, St, Knobs);
        {error, _} ->
            ok
    end.

%% React to one decoded frame. Returns {continue, State} or `stop`.
%% SETTINGS and PING are acknowledged, HEADERS triggers a response, a client
%% GOAWAY stops the loop, and everything else is ignored.
handle_frame(Sock, {settings, _List}, St, _Knobs) ->
    send(Sock, h2_frame:settings_ack()),
    {continue, St};
handle_frame(_Sock, {settings_ack}, St, _Knobs) ->
    {continue, St};
handle_frame(Sock, {ping, Data}, St, _Knobs) ->
    send(Sock, h2_frame:ping_ack(Data)),
    {continue, St};
handle_frame(_Sock, {ping_ack, _}, St, _Knobs) ->
    {continue, St};
handle_frame(_Sock, {window_update, _, _}, St, _Knobs) ->
    {continue, St};
handle_frame(_Sock, {goaway, _, _, _}, _St, _Knobs) ->
    stop;
handle_frame(_Sock, {rst_stream, _, _}, St, _Knobs) ->
    {continue, St};
%% After we have sent GOAWAY, ignore new streams (the ALB stops answering
%% streams past last_stream_id). The client's read then hangs to recv_timeout.
handle_frame(_Sock, {headers, _StreamId, _B, _E, _H}, #{goaway := true} = St, _Knobs) ->
    {continue, St};
handle_frame(Sock, {headers, StreamId, _Block, _EndStream, _EndHeaders}, St, Knobs) ->
    #{enc := EncCtx, count := Count} = St,
    Count2 = Count + 1,
    EncCtx2 = respond(Sock, StreamId, EncCtx, Count2, Knobs),
    St2 = St#{enc := EncCtx2, count := Count2},
    maybe_goaway(Sock, StreamId, Count2, St2, Knobs);
handle_frame(_Sock, _Other, St, _Knobs) ->
    {continue, St}.

%% Recycle the connection like an ALB: after N responses, send GOAWAY with
%% last_stream_id = the stream we just answered, then either keep the socket
%% open (the dangerous case: hackney may keep reusing a doomed conn) or close it.
%% N is the knob `goaway_after` (default infinity = never); `goaway_close`
%% (default false) selects closing the socket after a 50 ms pause.
maybe_goaway(Sock, LastStreamId, Count, St, Knobs) ->
    case maps:get(goaway_after, Knobs, infinity) of
        N when is_integer(N), Count >= N ->
            send(Sock, h2_frame:goaway(LastStreamId, no_error, <<>>)),
            case maps:get(goaway_close, Knobs, false) of
                true -> timer:sleep(50), ssl:close(Sock), stop;
                false -> {continue, St#{goaway := true}}
            end;
        _ ->
            {continue, St}
    end.

%% Write the response for StreamId: HEADERS (status 200, JSON content type),
%% then the body, with the configured quirk frame injected after the headers
%% and/or after the body. ReqCount is the 1-based number of this response on
%% the connection. Returns the updated HPACK encoder context.
respond(Sock, StreamId, EncCtx, ReqCount, Knobs) ->
    BodySize = maps:get(body_size, Knobs, 14000),
    FrameCount = maps:get(frame_count, Knobs, 1),
    DelayMs = maps:get(inter_frame_ms, Knobs, 0),
    Body = make_body(BodySize),
    %% 200, application/json, NO content-length.
    {HBlock, EncCtx2} = h2_hpack:encode(
        [{<<":status">>, <<"200">>},
         {<<"content-type">>, <<"application/json">>}], EncCtx),
    send(Sock, h2_frame:headers(StreamId, HBlock, false)),
    maybe_quirk(Sock, mid, ReqCount, Knobs),
    send_body(Sock, StreamId, Body, FrameCount, DelayMs),
    maybe_quirk(Sock, after_resp, ReqCount, Knobs),
    EncCtx2.

%% Split Body into FrameCount chunks and send them as DATA frames.
send_body(Sock, StreamId, Body, FrameCount, DelayMs) ->
    Chunks = chunkify(Body, FrameCount),
    send_chunks(Sock, StreamId, Chunks, DelayMs).

%% Send the chunks as DATA frames in order; only the last one is sent with
%% the final flag set. Sleeps DelayMs (when not 0) after every other chunk.
send_chunks(Sock, StreamId, [Last], _DelayMs) ->
    send(Sock, h2_frame:data(StreamId, Last, true));
send_chunks(Sock, StreamId, [C | Rest], DelayMs) ->
    send(Sock, h2_frame:data(StreamId, C, false)),
    case DelayMs of
        0 -> ok;
        _ -> timer:sleep(DelayMs)
    end,
    send_chunks(Sock, StreamId, Rest, DelayMs).

%% Inject the quirk frame at the requested point.
%% Point is `mid` (between headers and body) or `after_resp`; the knob
%% `quirk_when` (default after_each) decides whether this point and this
%% request number qualify.
maybe_quirk(Sock, Point, ReqCount, Knobs) ->
    Quirk = maps:get(quirk, Knobs, none),
    When = maps:get(quirk_when, Knobs, after_each),
    Active = case {Point, When} of
        {after_resp, after_each} -> true;
        {after_resp, after_first} -> ReqCount =:= 1;
        {mid, mid_each} -> true;
        {mid, mid_first} -> ReqCount =:= 1;
        _ -> false
    end,
    case Active of
        false -> ok;
        true -> emit_quirk(Sock, Quirk)
    end.

%% Send the frame that corresponds to the configured quirk (nothing for none).
emit_quirk(_Sock, none) -> ok;
emit_quirk(Sock, settings) -> send(Sock, h2_frame:settings([]));
emit_quirk(Sock, {settings, L}) -> send(Sock, h2_frame:settings(L));
emit_quirk(Sock, ping) -> send(Sock, h2_frame:ping(<<0,0,0,0,0,0,0,7>>));
emit_quirk(Sock, window_update) -> send(Sock, h2_frame:window_update(0, 1000)).

%% Encode a frame and write it to the socket; returns ssl:send/2's result.
send(Sock, FrameData) ->
    ssl:send(Sock, h2_frame:encode(FrameData)).

%% Split Body into pieces of ceil(byte_size(Body) / N) bytes (at least 1);
%% the last piece holds the remainder, so fewer than N pieces can result.
chunkify(Body, 1) -> [Body];
chunkify(Body, N) when N > 1 ->
    Size = max(1, (byte_size(Body) + N - 1) div N),
    chunkify_1(Body, Size).

chunkify_1(Body, Size) when byte_size(Body) =< Size -> [Body];
chunkify_1(Body, Size) ->
    <<C:Size/binary, Rest/binary>> = Body,
    [C | chunkify_1(Rest, Size)].

%% Build the response body. For Size >= 12 it is a JSON object {"d":"xx..."}
%% with Size - 12 filler bytes inside an 8-byte wrapper, i.e. Size - 4 bytes
%% in total; smaller sizes give exactly Size bytes of plain "x".
make_body(Size) when Size >= 12 ->
    Fill = binary:copy(<<"x">>, Size - 12),
    <<"{\"d\":\"", Fill/binary, "\"}">>;
make_body(Size) ->
    binary:copy(<<"x">>, Size).

%% Absolute path of test/certs, located five directories above the directory
%% holding this module's beam file.
cert_dir() ->
    BeamDir = filename:dirname(code:which(?MODULE)),
    Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]),
    filename:join([filename:absname(Root), "test", "certs"]).