Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion doc/modules/ROOT/pages/4.guide/4e.tcp-acceptor.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ connection on `peer` is closed first.
The operation is asynchronous—your coroutine suspends until a connection
arrives or an error occurs.

There is also a returning overload that constructs the peer socket for you,
associated with the acceptor's execution context:

[source,cpp]
----
auto [ec, peer] = co_await acc.accept();
----

Prefer the returning overload for the common case: it is simpler and guarantees
the acceptor and socket use the same `io_context`. Use the
`accept(tcp_socket&)` form when you pre-allocate or recycle sockets and want to
manage their lifetime yourself.

=== Errors

Common accept errors:
Expand All @@ -158,7 +171,8 @@ Common accept errors:
=== Preconditions

* The tcp_acceptor must be listening (`is_open() == true`)
* The peer socket must be associated with the same execution context
* For `accept(tcp_socket&)`, the peer socket must be associated with the same
execution context as the acceptor (the returning overload guarantees this)

== Cancellation

Expand Down
56 changes: 56 additions & 0 deletions include/boost/corosio/native/native_tcp_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,44 @@ class native_tcp_acceptor : public tcp_acceptor
}
};

struct native_accept_value_awaitable
{
native_tcp_acceptor& acc_;
tcp_socket peer_;
std::stop_token token_;
mutable std::error_code ec_;
mutable io_object::implementation* peer_impl_ = nullptr;

explicit native_accept_value_awaitable(native_tcp_acceptor& acc)
: acc_(acc)
, peer_(acc.context())
{
}

bool await_ready() const noexcept
{
return token_.stop_requested();
}

capy::io_result<tcp_socket> await_resume() noexcept
{
if (token_.stop_requested())
return {make_error_code(std::errc::operation_canceled),
std::move(peer_)};
if (!ec_ && peer_impl_)
acc_.reset_peer_impl(peer_, peer_impl_);
return {ec_, std::move(peer_)};
}

auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
-> std::coroutine_handle<>
{
token_ = env->stop_token;
return acc_.get_impl().accept(
h, env->executor, token_, &ec_, &peer_impl_);
}
};

public:
/** Construct a native acceptor from an execution context.

Expand Down Expand Up @@ -208,6 +246,24 @@ class native_tcp_acceptor : public tcp_acceptor
return native_accept_awaitable(*this, peer);
}

/** Asynchronously accept an incoming connection, returning the peer.

Calls the backend implementation directly, bypassing virtual
dispatch. Otherwise identical to @ref tcp_acceptor::accept().

@return An awaitable yielding `io_result<tcp_socket>`.

@throws std::logic_error if the acceptor is not listening.

This acceptor must outlive the returned awaitable.
*/
auto accept()
{
if (!is_open())
detail::throw_logic_error("accept: acceptor not listening");
return native_accept_value_awaitable(*this);
}

/** Asynchronously wait for the acceptor to be ready.

Calls the backend implementation directly, bypassing virtual
Expand Down
83 changes: 83 additions & 0 deletions include/boost/corosio/tcp_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,45 @@ class BOOST_COROSIO_DECL tcp_acceptor : public io_object
}
};

struct accept_value_awaitable
{
tcp_acceptor& acc_;
tcp_socket peer_;
std::stop_token token_;
mutable std::error_code ec_;
mutable io_object::implementation* peer_impl_ = nullptr;

explicit accept_value_awaitable(tcp_acceptor& acc)
: acc_(acc)
, peer_(acc.context())
{
}

bool await_ready() const noexcept
{
return token_.stop_requested();
}

capy::io_result<tcp_socket> await_resume() noexcept
{
if (token_.stop_requested())
return {make_error_code(std::errc::operation_canceled),
std::move(peer_)};

if (!ec_ && peer_impl_)
peer_.h_.reset(peer_impl_);
return {ec_, std::move(peer_)};
}

auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
-> std::coroutine_handle<>
{
token_ = env->stop_token;
return acc_.get().accept(
h, env->executor, token_, &ec_, &peer_impl_);
}
};

public:
/** Destructor.

Expand Down Expand Up @@ -341,6 +380,8 @@ class BOOST_COROSIO_DECL tcp_acceptor : public io_object
// Use peer socket
}
@endcode

@see accept()
*/
auto accept(tcp_socket& peer)
{
Expand All @@ -349,6 +390,48 @@ class BOOST_COROSIO_DECL tcp_acceptor : public io_object
return accept_awaitable(*this, peer);
}

/** Initiate an asynchronous accept operation, returning the peer.

Accepts an incoming connection and returns a newly constructed
socket for it, associated with this acceptor's execution context.
The acceptor must be listening before calling this function.

The caller does not pre-construct the peer socket; the returned
socket shares this acceptor's execution context.

The operation supports cancellation via `std::stop_token` through
the affine awaitable protocol. If the associated stop token is
triggered, the operation completes immediately with
`errc::operation_canceled`.

@return An awaitable that completes with `io_result<tcp_socket>`.
On success the payload is the connected peer socket; on failure
(including cancellation) the error code is set and the payload
socket is unconnected. Errors include:
- operation_canceled: Cancelled via stop_token or cancel().
Check `ec == cond::canceled` for portable comparison.

@par Preconditions
The acceptor must be listening (`is_open() == true`). This acceptor
must outlive the returned awaitable.

@par Example
@code
auto [ec, peer] = co_await acc.accept();
if (!ec) {
// peer is a connected socket
}
@endcode

@see accept(tcp_socket&)
*/
auto accept()
{
if (!is_open())
detail::throw_logic_error("accept: acceptor not listening");
return accept_value_awaitable(*this);
}

/** Wait for an incoming connection or readiness condition.

Suspends until the listen socket is ready in the
Expand Down
51 changes: 51 additions & 0 deletions test/unit/native/native_tcp_acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ struct native_tcp_acceptor_test
decltype(std::declval<tcp_acceptor&>().accept(
std::declval<tcp_socket&>()))>,
"native_tcp_acceptor::accept must shadow tcp_acceptor::accept");
static_assert(
!std::is_same_v<
decltype(std::declval<native_tcp_acceptor<Backend>&>().accept()),
decltype(std::declval<tcp_acceptor&>().accept())>,
"native_tcp_acceptor::accept() must shadow tcp_acceptor::accept()");
static_assert(
!std::is_same_v<
decltype(std::declval<native_tcp_acceptor<Backend>&>().wait(
Expand Down Expand Up @@ -120,6 +125,51 @@ struct native_tcp_acceptor_test
BOOST_TEST(!wait_ec);
}

// Exercise the shadowed returning accept() awaitable on the
// devirtualized path: it yields a connected peer socket.
void testNativeAcceptReturning()
{
io_context ioc(Backend);
auto ex = ioc.get_executor();

native_tcp_acceptor<Backend> acc(ioc);
acc.open();
acc.set_option(native_socket_option::reuse_address(true));
auto bec = acc.bind(endpoint(ipv4_address::loopback(), 0));
BOOST_TEST(!bec);
auto lec = acc.listen();
BOOST_TEST(!lec);
auto port = acc.local_endpoint().port();

native_tcp_socket<Backend> client(ioc);
client.open();

std::error_code accept_ec;
bool accept_done = false;
bool peer_connected = false;

auto acceptor = [&]() -> capy::task<> {
auto [ec, peer] = co_await acc.accept();
accept_ec = ec;
if (!ec)
peer_connected = peer.remote_endpoint().port() != 0;
accept_done = true;
};
auto connector = [&]() -> capy::task<> {
auto [ec] = co_await client.connect(
endpoint(ipv4_address::loopback(), port));
(void)ec;
};

capy::run_async(ex)(acceptor());
capy::run_async(ex)(connector());
ioc.run();

BOOST_TEST(accept_done);
BOOST_TEST(!accept_ec);
BOOST_TEST(peer_connected);
}

#ifdef SO_REUSEPORT
void testNativeReusePort()
{
Expand All @@ -143,6 +193,7 @@ struct native_tcp_acceptor_test
testAcceptorMoveConstruct();
testAcceptorPolymorphicSlice();
testWait();
testNativeAcceptReturning();
#ifdef SO_REUSEPORT
testNativeReusePort();
#endif
Expand Down
59 changes: 59 additions & 0 deletions test/unit/tcp_acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,64 @@ struct tcp_acceptor_test
acc.close();
}

void testAcceptReturning()
{
// Returning overload: accept() yields the peer socket directly,
// associated with the acceptor's execution context.
io_context ioc(Backend);
tcp_acceptor acc(ioc);
acc.open(tcp::v6());
acc.set_option(socket_option::reuse_address(true));
auto ec = acc.bind(endpoint(ipv6_address::loopback(), 0));
BOOST_TEST(!ec);
ec = acc.listen();
BOOST_TEST(!ec);
auto port = acc.local_endpoint().port();

tcp_socket client(ioc);

bool accept_done = false;
bool connect_done = false;
bool peer_local_v6 = false;
bool peer_remote_v6 = false;
std::error_code accept_ec, connect_ec;

auto ex = ioc.get_executor();
capy::run_async(ex)(
[](tcp_acceptor& a, std::error_code& ec_out, bool& done,
bool& local_v6, bool& remote_v6) -> capy::task<> {
auto [ec, peer] = co_await a.accept();
ec_out = ec;
if (!ec)
{
local_v6 = peer.local_endpoint().is_v6();
remote_v6 = peer.remote_endpoint().is_v6();
}
done = true;
}(acc, accept_ec, accept_done, peer_local_v6, peer_remote_v6));

capy::run_async(ex)(
[](tcp_socket& s, endpoint ep, std::error_code& ec_out,
bool& done) -> capy::task<> {
auto [ec] = co_await s.connect(ep);
ec_out = ec;
done = true;
}(client, endpoint(ipv6_address::loopback(), port), connect_ec,
connect_done));

ioc.run();

BOOST_TEST(accept_done);
BOOST_TEST(!accept_ec);
BOOST_TEST(connect_done);
BOOST_TEST(!connect_ec);
BOOST_TEST(peer_local_v6);
BOOST_TEST(peer_remote_v6);

client.close();
acc.close();
}

void testDualStackAccept()
{
io_context ioc(Backend);
Expand Down Expand Up @@ -666,6 +724,7 @@ struct tcp_acceptor_test
// IPv6
testListenV6();
testAcceptV6();
testAcceptReturning();

// Dual-stack
testDualStackAccept();
Expand Down
Loading