Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@
request_from :: {pid(), reference()} | undefined,
method :: binary() | undefined,
path :: binary() | undefined,
%% Whether the current request carried Connection: close (the caller asked
%% the server to close). Folded into the keepalive decision at checkin so a
%% requested close is never reused. Reset on every request send.
request_close = false :: boolean(),

%% Response state
version :: {integer(), integer()} | undefined,
Expand Down Expand Up @@ -583,9 +587,12 @@ is_no_reuse(Pid) ->

%% @doc Get the flags the pool needs for its checkin decision in one call:
%% whether the connection was SSL upgraded, opted into SSL pooling, must not
%% be reused (proxy tunnels), and the negotiated protocol.
%% be reused (proxy tunnels), the negotiated protocol, whether the response
%% requires the connection to close (keepalive), and whether the socket is
%% proven ready to pool.
-spec checkin_info(pid()) -> #{upgraded_ssl := boolean(), no_reuse := boolean(),
pool_ssl := boolean(), protocol := atom()}.
pool_ssl := boolean(), protocol := atom(),
should_close := boolean(), ready := boolean()}.
checkin_info(Pid) ->
gen_statem:call(Pid, checkin_info).

Expand Down Expand Up @@ -921,7 +928,9 @@ connected({call, From}, is_no_reuse, #conn_data{no_reuse = NoReuse}) ->
{keep_state_and_data, [{reply, From, NoReuse}]};

connected({call, From}, checkin_info, Data) ->
{keep_state_and_data, [{reply, From, checkin_info_map(Data)}]};
%% In connected state: prove the socket is ready before the pool may keep it.
Map = (checkin_info_map(Data))#{ready => socket_ready(Data)},
{keep_state_and_data, [{reply, From, Map}]};

connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, #conn_data{protocol = http2} = Data) ->
%% HTTP/2 request - use h2_machine (1xx not applicable for HTTP/2)
Expand Down Expand Up @@ -1232,10 +1241,15 @@ streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) ->
RequestLine = build_request_line(Method, Path),
HeaderLines = [[Name, <<": ">>, Value, <<"\r\n">>] || {Name, Value} <- HeadersList],
HeadersData = [RequestLine, HeaderLines, <<"\r\n">>],
%% Record whether the caller asked the server to close (the request line is
%% built here, not via do_send_request/5), for the keepalive decision.
RequestClose = hackney_keepalive:request_closes(HeadersWithTE),
case Transport:send(Socket, HeadersData) of
ok ->
From = Data#conn_data.request_from,
{keep_state, Data#conn_data{request_from = undefined}, [{reply, From, ok}]};
{keep_state, Data#conn_data{request_from = undefined,
request_close = RequestClose},
[{reply, From, ok}]};
{error, Reason} ->
From = Data#conn_data.request_from,
{next_state, closed, Data, [{reply, From, {error, Reason}}]}
Expand Down Expand Up @@ -1840,7 +1854,9 @@ handle_common({call, From}, is_no_reuse, _State, #conn_data{no_reuse = NoReuse})
{keep_state_and_data, [{reply, From, NoReuse}]};

handle_common({call, From}, checkin_info, _State, Data) ->
{keep_state_and_data, [{reply, From, checkin_info_map(Data)}]};
%% Any non-connected state (e.g. already closed) is not poolable.
Map = (checkin_info_map(Data))#{ready => false},
{keep_state_and_data, [{reply, From, Map}]};

handle_common({call, From}, get_protocol, _State, #conn_data{protocol = Protocol}) ->
{keep_state_and_data, [{reply, From, Protocol}]};
Expand Down Expand Up @@ -1894,14 +1910,18 @@ reset_async(Data) ->
stream_to = undefined
}.

%% @private Check if connection should be closed based on response headers
should_close_connection(#conn_data{response_headers = undefined}) ->
%% @private Whether the connection must close (not be reused) for keepalive
%% reasons. Distinguishes "no response observed yet" (a fresh or idle pooled
%% conn, which stays poolable) from a parsed response, whose close decision
%% follows RFC 7230 via hackney_keepalive:should_close/3.
should_close_connection(#conn_data{response_headers = undefined, version = undefined}) ->
%% No request/response cycle has run on this connection: keep it poolable so
%% a plain checkout/checkin (no request) still returns the live conn to the
%% pool.
false;
should_close_connection(#conn_data{response_headers = Headers}) ->
case hackney_headers:get_value(<<"connection">>, Headers) of
undefined -> false;
Value -> hackney_bstr:to_lower(Value) =:= <<"close">>
end.
should_close_connection(#conn_data{version = Version, response_headers = Headers,
request_close = RequestClose}) ->
hackney_keepalive:should_close(Version, Headers, RequestClose).

%% @private Finish async streaming - close or return to connected based on Connection header
finish_async_streaming(Data) ->
Expand Down Expand Up @@ -1960,6 +1980,10 @@ do_send_request(Method, Path, Headers, Body, Data) ->
%% Build request headers
FinalHeaders = build_headers(Method, Headers, Body, Netloc),

%% Record whether the caller asked the server to close, for the keepalive
%% decision at checkin. Reset per request.
Data1 = Data#conn_data{request_close = hackney_keepalive:request_closes(FinalHeaders)},

%% Build request line and headers
Path1 = case Path of
<<>> -> <<"/">>;
Expand All @@ -1975,7 +1999,7 @@ do_send_request(Method, Path, Headers, Body, Data) ->
%% Send body if present
case send_body(Transport, Socket, Body) of
ok ->
{ok, Data};
{ok, Data1};
{error, Reason} ->
{error, Reason}
end;
Expand Down Expand Up @@ -2428,6 +2452,16 @@ has_pending_close(Socket) ->
false
end.

%% @private Whether the socket is proven ready to pool: present, no queued close,
%% and still connected. Runs in the conn process (peername + receive-after-0,
%% both fast); consuming a queued close here makes a server-closed idle conn
%% report not-ready. Does not disable active-once, so no stranded-byte regression.
socket_ready(#conn_data{socket = undefined}) ->
false;
socket_ready(#conn_data{transport = Transport, socket = Socket}) ->
not has_pending_close(Socket) andalso
check_socket_health(Transport, Socket) =:= ok.

%% @private Drain and return any {tcp/ssl, Socket, Data} bytes queued in the
%% mailbox. #544 puts idle pooled sockets in {active, once}, which delivers the
%% next inbound bytes (the start of the response on a reused connection) as a
Expand Down Expand Up @@ -2478,16 +2512,24 @@ notify_pool_available_sync(#conn_data{pool_pid = undefined}) ->
ok;
notify_pool_available_sync(#conn_data{pool_pid = PoolPid, upgraded_ssl = UpgradedSsl,
no_reuse = NoReuse, pool_ssl = PoolSsl,
protocol = Protocol}) ->
protocol = Protocol} = Data) ->
%% do_checkin_with_close_flag/3 pools whenever this flag is false, with no
%% further check, so also close on a keepalive-close response and on an
%% unready socket - matching the async-cast checkin gate.
ShouldClose = NoReuse orelse (UpgradedSsl andalso not PoolSsl)
orelse Protocol =/= http1,
orelse Protocol =/= http1
orelse should_close_connection(Data)
orelse not socket_ready(Data),
gen_server:call(PoolPid, {checkin_sync, self(), ShouldClose}, 5000).

%% @private Flags for the pool's checkin decision, see checkin_info/1.
%% @private Flags for the pool's checkin decision, see checkin_info/1. The
%% `ready' flag is added per-state by the checkin_info handlers, since it
%% depends on the conn being in `connected' with a healthy socket.
checkin_info_map(#conn_data{upgraded_ssl = UpgradedSsl, no_reuse = NoReuse,
pool_ssl = PoolSsl, protocol = Protocol}) ->
pool_ssl = PoolSsl, protocol = Protocol} = Data) ->
#{upgraded_ssl => UpgradedSsl, no_reuse => NoReuse,
pool_ssl => PoolSsl, protocol => Protocol}.
pool_ssl => PoolSsl, protocol => Protocol,
should_close => should_close_connection(Data)}.

%% @private Start an async request
do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, Data) ->
Expand Down
85 changes: 85 additions & 0 deletions src/hackney_keepalive.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
%%% -*- erlang -*-
%%%
%%% This file is part of hackney released under the Apache 2 license.
%%% See the NOTICE for more information.
%%%
%%% Copyright (c) 2012-2024, Benoît Chesneau <benoitc@e-engura.org>

%% @doc HTTP/1.x keepalive semantics.
%%
%% Single source of truth for deciding whether an HTTP/1.x connection must be
%% closed (not reused) after a response, per RFC 7230. The `Connection' header
%% is a list-valued, hop-by-hop field (RFC 7230 3.2.2, 6.1): a recipient may get
%% it as several header lines or as one comma-joined value, and both are
%% equivalent. It is forbidden in HTTP/2 and HTTP/3, so these rules apply only to
%% HTTP/1.x; multiplexed conns are never pooled in `available'.
%%
%% Every function tolerates undefined or malformed header objects so a bad header
%% can never crash the keepalive decision.
-module(hackney_keepalive).

-export([should_close/3,
request_closes/1,
connection_tokens/1]).

%% @doc Whether a parsed HTTP/1.x response means the connection must close.
%%
%% The caller (hackney_conn:should_close_connection/1) guards the "no response
%% observed yet" case; here `Version'/`RespHeaders' describe a response that was
%% actually parsed. Order matters: an explicit `close' wins over version default.
-spec should_close(Version, RespHeaders, RequestClose) -> boolean() when
Version :: {integer(), integer()} | undefined,
RespHeaders :: term(),
RequestClose :: boolean().
should_close(_Version, _RespHeaders, true) ->
%% We asked the server to close (request carried Connection: close).
true;
should_close(Version, RespHeaders, false) ->
Tokens = connection_tokens(RespHeaders),
case lists:member(<<"close">>, Tokens) of
true ->
true;
false ->
case Version of
{1, 1} ->
%% HTTP/1.1 default is keep-alive; an absent Connection header
%% stays persistent and poolable.
false;
{1, 0} ->
%% HTTP/1.0 default is close unless it opts into keep-alive.
not lists:member(<<"keep-alive">>, Tokens);
_ ->
%% Unknown version on a parsed response: close on the safe side.
true
end
end.

%% @doc Whether request headers carry `Connection: close'.
-spec request_closes(term()) -> boolean().
request_closes(ReqHeaders) ->
lists:member(<<"close">>, connection_tokens(ReqHeaders)).

%% @doc Lower-cased, trimmed tokens from every `Connection' header.
%%
%% Defensive at each layer: an undefined or malformed header object yields `[]',
%% and a value that does not convert to a binary is skipped rather than crashing.
-spec connection_tokens(term()) -> [binary()].
connection_tokens(undefined) ->
[];
connection_tokens(Headers) ->
Values = try hackney_headers:lookup(<<"connection">>, Headers)
catch _:_ -> []
end,
lists:flatmap(fun({_Key, Value}) -> value_tokens(Value) end, Values).

%% @private
value_tokens(Value) ->
case (try hackney_bstr:to_binary(Value) catch _:_ -> error end) of
error ->
[];
Bin ->
Lower = hackney_bstr:to_lower(Bin),
Parts = binary:split(Lower, <<",">>, [global]),
Trimmed = [hackney_bstr:trim(P) || P <- Parts],
[T || T <- Trimmed, T =/= <<>>]
end.
79 changes: 49 additions & 30 deletions src/hackney_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -874,17 +874,12 @@ h3_connection_key(Host0, Port, Transport, Options) ->
stop_conn(Pid) ->
try hackney_conn:stop(Pid) catch _:_ -> ok end.

%% @private Find a reusable idle connection for `Key', discarding any that are
%% no longer keepalive-ready. Only a conn that is_ready reports `{ok, connected}'
%% is handed out; a closed conn is stopped and dropped (never reanimated). Fresh
%% dialing for an empty bucket is the caller's `none' branch, off the pool's hot
%% path. The SSL alias exists only to mark intent at the SSL checkout site.
find_available(Key, Available) ->
find_available(Key, Available, true).

%% @private Like find_available/2 but never redials a closed connection.
%% Used for SSL buckets: hackney_conn:connect/1 on an upgraded conn would
%% reconnect the raw transport without the upgrade handshake options, so a
%% closed conn is stopped and dropped instead.
find_available_ssl(Key, Available) ->
find_available(Key, Available, false).

find_available(Key, Available, Redial) ->
case maps:find(Key, Available) of
{ok, [Pid | Rest]} ->
Available2 = case Rest of
Expand All @@ -896,34 +891,36 @@ find_available(Key, Available, Redial) ->
true ->
%% is_ready checks both state and socket health in one call.
%% The connection can die between is_process_alive/1 above
%% and these gen_statem calls (flaky network); the resulting
%% and this gen_statem call (flaky network); the resulting
%% noproc exit must not crash the pool, so skip and move on.
try hackney_conn:is_ready(Pid) of
{ok, connected} -> {ok, Pid, Available2};
{ok, closed} when Redial ->
%% Connection closed, try reconnect
try hackney_conn:connect(Pid) of
ok -> {ok, Pid, Available2};
_ -> find_available(Key, Available2, Redial)
catch
_:_ -> find_available(Key, Available2, Redial)
end;
{ok, closed} ->
{ok, connected} ->
{ok, Pid, Available2};
_ ->
%% Closed or unusable: discard it rather than redial
%% from inside the pool. Reanimating a closed pid would
%% break the "only keepalive conns are reused" invariant
%% and a redial here would block the pool on connect.
stop_conn(Pid),
find_available(Key, Available2, Redial);
_ -> find_available(Key, Available2, Redial)
find_available(Key, Available2)
catch
_:_ -> find_available(Key, Available2, Redial)
_:_ -> find_available(Key, Available2)
end;
false ->
find_available(Key, Available2, Redial)
find_available(Key, Available2)
end;
{ok, []} ->
none;
error ->
none
end.

%% @private SSL-bucket variant. Now identical to find_available/2 (closed conns
%% are always dropped, never redialed); kept as a named alias to mark intent at
%% the SSL checkout site.
find_available_ssl(Key, Available) ->
find_available(Key, Available).

%% @private SSL checkout miss: reuse or dial a TCP connection for the caller
%% to upgrade. It is recorded in in_use under the SSL key so the checkin
%% decision can tell it apart from plain TCP checkouts.
Expand Down Expand Up @@ -1031,9 +1028,14 @@ do_checkin(Pid, State) ->
%% Check if connection is still alive
case is_process_alive(Pid) of
true ->
%% One call fetches every flag the decision needs
Info = try hackney_conn:checkin_info(Pid) catch _:_ -> #{} end,
case checkin_poolable(Key, Info) andalso pool_has_idle_room(State) of
%% One call fetches every flag the decision needs. A failed
%% checkin_info means we cannot prove the conn keepalive/ready,
%% so treat it as not poolable (close) rather than pooling blind.
Poolable = case checkin_info(Pid) of
{ok, Info} -> checkin_poolable(Key, Info);
error -> false
end,
case Poolable andalso pool_has_idle_room(State) of
true ->
checkin_pool(Pid, Key, InUse2, State);
false ->
Expand All @@ -1060,10 +1062,27 @@ do_checkin(Pid, State) ->
checkin_poolable({_Host, _Port, hackney_ssl, _TlsKey}, Info) ->
maps:get(no_reuse, Info, true) =:= false andalso
maps:get(upgraded_ssl, Info, false) =:= true andalso
maps:get(protocol, Info, undefined) =:= http1;
maps:get(protocol, Info, undefined) =:= http1 andalso
keepalive_ready(Info);
checkin_poolable(_TcpKey, Info) ->
maps:get(no_reuse, Info, false) =:= false andalso
maps:get(upgraded_ssl, Info, false) =:= false.
maps:get(upgraded_ssl, Info, false) =:= false andalso
keepalive_ready(Info).

%% @private Fetch the conn's checkin flags, or `error' if the call fails (the
%% conn died between is_process_alive/1 and here). Caller treats `error' as
%% not poolable.
checkin_info(Pid) ->
try {ok, hackney_conn:checkin_info(Pid)}
catch _:_ -> error
end.

%% @private Shared keepalive/readiness gate for checkin: only pool a conn whose
%% response left it reusable and whose socket is proven ready. Defaults are the
%% safe side so an unknown flag closes rather than pools.
keepalive_ready(Info) ->
maps:get(should_close, Info, true) =:= false andalso
maps:get(ready, Info, false) =:= true.

%% @private Close branch of a checkin: drop the monitor and keep the host's
%% TCP prewarm warm (the replacement for a closed conn is a TCP conn that
Expand Down
5 changes: 4 additions & 1 deletion test/hackney_integration_tests_async_long_headers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ dummy_server_loop(LSock, Port, StatusCode) ->
]),
send(Sock, Response),
ok = gen_tcp:shutdown(Sock, read_write),
dummy_server_loop(LSock, RedirectUrl, StatusCode).
%% Recurse with Port (not RedirectUrl): the loop must keep accepting. Pool
%% prewarm reconnects to this host after a closed conn is checked in, so a
%% second accept happens; passing the binary URL here crashed integer_to_list.
dummy_server_loop(LSock, Port, StatusCode).

send(Sock, << Data :128/binary, Rest/binary>>) ->
ok = gen_tcp:send(Sock, Data),
Expand Down
Loading
Loading