From d77922097976301ee2cd0088ff601da923ca3df9 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 23 Jun 2026 14:17:03 -0600 Subject: [PATCH] feat(test): honor io_env::stop_token in capy::test awaitables (#317) All capy::test mock awaitables now resolve to error::canceled when the environment's stop token is requested, so code under test can exercise its cancellation paths against the mocks. The synchronous mocks check the token up front; stream::read_some wakes a blocked read via a stop callback while preserving its synchronous fast path when data is already buffered. io_env reaches an awaitable only through await_suspend, so the synchronous mocks must suspend once to observe it. They return false from await_suspend (conditional no-suspend) rather than self-transferring via the returned handle: the return-h form leaks the awaiting coroutine frame under run_async. Also removes now-unused includes from the any_* tests. --- .../ROOT/pages/7.testing/7a.drivers.adoc | 30 +++ include/boost/capy/test/buffer_sink.hpp | 62 +++-- include/boost/capy/test/buffer_source.hpp | 39 ++-- include/boost/capy/test/read_source.hpp | 51 ++++- include/boost/capy/test/read_stream.hpp | 44 ++-- include/boost/capy/test/stream.hpp | 201 +++++++++++++--- include/boost/capy/test/write_sink.hpp | 94 ++++++-- include/boost/capy/test/write_stream.hpp | 41 ++-- test/unit/io/any_read_source.cpp | 1 - test/unit/io/any_write_sink.cpp | 1 - test/unit/io/any_write_stream.cpp | 1 - test/unit/test/buffer_sink.cpp | 51 +++++ test/unit/test/buffer_source.cpp | 25 ++ test/unit/test/read_source.cpp | 48 ++++ test/unit/test/read_stream.cpp | 25 ++ test/unit/test/stream.cpp | 216 ++++++++++++++++++ test/unit/test/write_sink.cpp | 88 +++++++ test/unit/test/write_stream.cpp | 25 ++ 18 files changed, 914 insertions(+), 129 deletions(-) diff --git a/doc/modules/ROOT/pages/7.testing/7a.drivers.adoc b/doc/modules/ROOT/pages/7.testing/7a.drivers.adoc index bd440f257..e6ec0c91e 100644 --- a/doc/modules/ROOT/pages/7.testing/7a.drivers.adoc +++ b/doc/modules/ROOT/pages/7.testing/7a.drivers.adoc @@ -120,6 +120,36 @@ context such as a thread pool. `on_error(ep)` on failure. |=== +=== Exercising Cancellation + +The stop token supplied to `run_blocking` propagates through the execution +environment to every `capy::test` awaitable. When the token is requested, the +next awaited test operation resolves to `error::canceled` instead of +performing its work, so code under test can be driven down its cancellation +paths without real I/O. + +[source,cpp] +---- +std::stop_source src; +src.request_stop(); + +run_blocking(src.get_token())([&]() -> task<> +{ + read_stream rs; + rs.provide("ignored"); + + char buf[32]; + auto [ec, n] = co_await rs.read_some(make_buffer(buf)); + assert(ec == cond::canceled); // honored the stop token +}()); +---- + +The mock sources, sinks, and buffer adapters complete synchronously, so they +check the token up front: an outstanding stop request yields `error::canceled` +on the next operation. A connected `stream` whose `read_some` is *blocked* +waiting for its peer is resumed with `error::canceled` when the token fires; a +read that can satisfy from already-buffered data is unaffected. + == fuse `fuse` tests every error-handling path in a coroutine by injecting failures diff --git a/include/boost/capy/test/buffer_sink.hpp b/include/boost/capy/test/buffer_sink.hpp index 5c6991f02..31996615e 100644 --- a/include/boost/capy/test/buffer_sink.hpp +++ b/include/boost/capy/test/buffer_sink.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -152,6 +153,10 @@ class buffer_sink @return An awaitable that await-returns `(error_code)`. + @par Cancellation + If the environment's stop token has been requested, the commit + completes immediately with `error::canceled` and commits no data. + @see fuse */ auto @@ -161,29 +166,31 @@ class buffer_sink { buffer_sink* self_; std::size_t n_; - - bool await_ready() const noexcept { return true; } - - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // - // Capy uses a two-layer awaitable system: the promise's - // await_transform wraps awaitables in a transform_awaiter whose - // standard await_suspend(coroutine_handle) calls this custom - // 2-argument overload, passing the io_env from the coroutine's - // context. For synchronous test awaitables like this one, the - // coroutine never suspends, so this is not invoked. The signature - // exists to allow the same awaitable type to work with both - // synchronous (test) and asynchronous (real I/O) code. - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend is + // the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend; it resumes + // immediately, having observed the stop token. See io_env, + // IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result<> await_resume() { + if(canceled_) + return {error::canceled}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec}; @@ -209,6 +216,11 @@ class buffer_sink @return An awaitable that await-returns `(error_code)`. + @par Cancellation + If the environment's stop token has been requested, the operation + completes immediately with `error::canceled`, commits no data, and + does not signal end-of-stream. + @see fuse */ auto @@ -218,21 +230,27 @@ class buffer_sink { buffer_sink* self_; std::size_t n_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // See the comment on commit(std::size_t) for a detailed explanation. - void await_suspend( + // Reads the stop token without suspending; see the comment + // on commit() for details. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result<> await_resume() { + if(canceled_) + return {error::canceled}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec}; diff --git a/include/boost/capy/test/buffer_source.hpp b/include/boost/capy/test/buffer_source.hpp index f80ad244d..0c1a2542f 100644 --- a/include/boost/capy/test/buffer_source.hpp +++ b/include/boost/capy/test/buffer_source.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -142,6 +143,10 @@ class buffer_source @return An awaitable that await-returns `(error_code,std::span)`. + @par Cancellation + If the environment's stop token has been requested, the pull + completes immediately with `error::canceled` and an empty span. + @see consume, fuse */ auto @@ -151,29 +156,31 @@ class buffer_source { buffer_source* self_; std::span dest_; - - bool await_ready() const noexcept { return true; } - - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // - // Capy uses a two-layer awaitable system: the promise's - // await_transform wraps awaitables in a transform_awaiter whose - // standard await_suspend(coroutine_handle) calls this custom - // 2-argument overload, passing the io_env from the coroutine's - // context. For synchronous test awaitables like this one, the - // coroutine never suspends, so this is not invoked. The signature - // exists to allow the same awaitable type to work with both - // synchronous (test) and asynchronous (real I/O) code. - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend is + // the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend; it resumes + // immediately, having observed the stop token. See io_env, + // IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result> await_resume() { + if(canceled_) + return {error::canceled, {}}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, {}}; diff --git a/include/boost/capy/test/read_source.hpp b/include/boost/capy/test/read_source.hpp index 4ab265a30..1fd19ce0e 100644 --- a/include/boost/capy/test/read_source.hpp +++ b/include/boost/capy/test/read_source.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -123,6 +124,12 @@ class read_source @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + If the environment's stop token has been requested, the read + completes immediately with `error::canceled` and transfers no + data. An empty buffer sequence is a no-op that completes + successfully regardless of the stop token. + @see fuse */ template @@ -133,13 +140,23 @@ class read_source { read_source* self_; MB buffers_; - - bool await_ready() const noexcept { return true; } - - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend is + // the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend; it resumes + // immediately, having observed the stop token. See io_env, + // IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result @@ -148,6 +165,9 @@ class read_source if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; @@ -183,6 +203,12 @@ class read_source @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + If the environment's stop token has been requested, the read + completes immediately with `error::canceled` and transfers no + data. An empty buffer sequence is a no-op that completes + successfully regardless of the stop token. + @see fuse */ template @@ -193,13 +219,19 @@ class read_source { read_source* self_; MB buffers_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - void await_suspend( + // Reads the stop token without suspending; see the comment + // on read_some() for details. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result @@ -208,6 +240,9 @@ class read_source if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; diff --git a/include/boost/capy/test/read_stream.hpp b/include/boost/capy/test/read_stream.hpp index d31143507..6e1207769 100644 --- a/include/boost/capy/test/read_stream.hpp +++ b/include/boost/capy/test/read_stream.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -125,6 +126,13 @@ class read_stream @par Exception Safety No-throw guarantee. + @par Cancellation + If the environment's stop token has been requested, the read + completes immediately with `error::canceled` and transfers no + data. This lets code under test exercise its cancellation paths. + An empty buffer sequence is a no-op that completes successfully + regardless of the stop token. + @param buffers The mutable buffer sequence to receive data. @return An awaitable that await-returns `(error_code,std::size_t)`. @@ -139,34 +147,36 @@ class read_stream { read_stream* self_; MB buffers_; - - bool await_ready() const noexcept { return true; } - - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // - // Capy uses a two-layer awaitable system: the promise's - // await_transform wraps awaitables in a transform_awaiter whose - // standard await_suspend(coroutine_handle) calls this custom - // 2-argument overload, passing the io_env from the coroutine's - // context. For synchronous test awaitables like this one, the - // coroutine never suspends, so this is not invoked. The signature - // exists to allow the same awaitable type to work with both - // synchronous (test) and asynchronous (real I/O) code. - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend + // is the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend — it resumes + // immediately — so the read still completes synchronously + // while having observed the stop token. See io_env, IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result await_resume() { // Empty buffer is a no-op regardless of - // stream state or fuse. + // stream state, stop token, or fuse. if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; diff --git a/include/boost/capy/test/stream.hpp b/include/boost/capy/test/stream.hpp index 32078e678..21ce1f918 100644 --- a/include/boost/capy/test/stream.hpp +++ b/include/boost/capy/test/stream.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -24,7 +25,9 @@ #include #include +#include #include +#include #include #include #include @@ -89,6 +92,10 @@ class stream std::size_t max_read_size = std::size_t(-1); continuation pending_cont_; executor_ref pending_ex; + // Points at the suspended reader's claim flag (owned by the + // read awaitable). Lets a peer wake coordinate with a stop + // callback so the parked read is resumed exactly once. + std::atomic* pending_claimed = nullptr; bool eof = false; }; @@ -103,20 +110,31 @@ class stream { } + // Resume a suspended reader on this side, if any. Claims the + // reader's atomic so it is never double-resumed by a racing + // stop callback; the loser of the race skips the post. + static void wake(half& side) + { + if(! side.pending_cont_.h) + return; + if(! side.pending_claimed || + ! side.pending_claimed->exchange( + true, std::memory_order_acq_rel)) + { + side.pending_ex.post(side.pending_cont_); + } + side.pending_cont_.h = {}; + side.pending_ex = {}; + side.pending_claimed = nullptr; + } + // Set closed and resume any suspended readers // with eof on both sides. void close() { closed = true; for(auto& side : sides) - { - if(side.pending_cont_.h) - { - side.pending_ex.post(side.pending_cont_); - side.pending_cont_.h = {}; - side.pending_ex = {}; - } - } + wake(side); } }; @@ -166,12 +184,7 @@ class stream int peer = 1 - index_; auto& side = state_->sides[peer]; side.eof = true; - if(side.pending_cont_.h) - { - side.pending_ex.post(side.pending_cont_); - side.pending_cont_.h = {}; - side.pending_ex = {}; - } + state::wake(side); } /** Set the maximum bytes returned per read. @@ -204,17 +217,106 @@ class stream @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + Cancellation applies only to a read that would otherwise suspend: + if no data is available and the environment's stop token is + requested (before or during the wait), the read resumes with + `error::canceled`. A read that can complete immediately from + buffered data is unaffected by the stop token. + @see fuse, close */ template auto read_some(MB buffers) { + // The read suspends when no data is available, parking its + // continuation on the side until the peer writes/closes. To + // support cancellation it follows the same pattern as + // delay_awaitable: a stop callback claims the resume (racing + // the peer wake via an atomic) and posts the continuation + // through the executor. Because it owns a std::atomic and a + // std::stop_callback, the awaitable needs explicit move and + // destruction (the task promise moves it into its + // transform_awaiter before awaiting). struct awaitable { stream* self_; MB buffers_; + // Declared before stop_cb_buf_: the stop callback reads + // these, so they must outlive a blocking stop_cb_ destructor. + continuation cont_; + executor_ref ex_; + half* side_ = nullptr; + std::atomic claimed_{false}; + bool canceled_ = false; + bool stop_cb_active_ = false; + + struct cancel_fn + { + awaitable* self_; + + void operator()() const noexcept + { + if(! self_->claimed_.exchange( + true, std::memory_order_acq_rel)) + { + self_->canceled_ = true; + self_->ex_.post(self_->cont_); + } + } + }; + + using stop_cb_t = std::stop_callback; + + // Declared last: its destructor may block while the callback + // accesses the members above. A union gives correct alignment + // for stop_cb_t without an alignas specifier, which avoids + // MSVC's C4324 padding warning on this function-local class + // (the member-level pragma used by delay_awaitable does not + // suppress it here). Lifetime is managed manually: placement + // new in await_suspend, explicit destruction once done. + union { stop_cb_t stop_cb_; }; + + awaitable(stream* self, MB buffers) noexcept + : self_(self) + , buffers_(buffers) + { + } + + /// @pre Not yet awaited (no active stop callback). + awaitable(awaitable&& o) noexcept + : self_(o.self_) + , buffers_(o.buffers_) + , cont_(o.cont_) + , ex_(o.ex_) + , side_(o.side_) + , claimed_(o.claimed_.load(std::memory_order_relaxed)) + , canceled_(o.canceled_) + , stop_cb_active_(std::exchange(o.stop_cb_active_, false)) + { + } + + ~awaitable() + { + if(stop_cb_active_) + stop_cb_.~stop_cb_t(); + // Unlink from the side if still parked (e.g. the + // coroutine was destroyed while suspended), so a later + // peer wake does not dereference a freed claim flag. + if(side_ && side_->pending_claimed == &claimed_) + { + side_->pending_cont_.h = {}; + side_->pending_ex = {}; + side_->pending_claimed = nullptr; + } + } + + awaitable(awaitable const&) = delete; + awaitable& operator=(awaitable const&) = delete; + awaitable& operator=(awaitable&&) = delete; + bool await_ready() const noexcept { if(buffer_empty(buffers_)) @@ -229,19 +331,54 @@ class stream std::coroutine_handle<> h, io_env const* env) noexcept { + // Park the continuation, then register the stop callback. + // If stop is already requested, the callback fires inline + // during construction: it claims the resume and posts the + // continuation through the executor (never a symmetric + // self-transfer, which would leak this frame under + // run_async). The parked read is then resumed with + // error::canceled by the run loop. auto& side = self_->state_->sides[ self_->index_]; + cont_.h = h; + ex_ = env->executor; + side_ = &side; side.pending_cont_.h = h; side.pending_ex = env->executor; + side.pending_claimed = &claimed_; + + ::new(static_cast(&stop_cb_)) stop_cb_t( + env->stop_token, cancel_fn{this}); + stop_cb_active_ = true; + return std::noop_coroutine(); } io_result await_resume() { + if(stop_cb_active_) + { + stop_cb_.~stop_cb_t(); + stop_cb_active_ = false; + } + if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + { + // The stop callback posted us but left the side + // untouched; unlink if a peer wake has not already. + if(side_ && side_->pending_claimed == &claimed_) + { + side_->pending_cont_.h = {}; + side_->pending_ex = {}; + side_->pending_claimed = nullptr; + } + return {error::canceled, 0}; + } + auto* st = self_->state_.get(); auto& side = st->sides[ self_->index_]; @@ -287,6 +424,12 @@ class stream @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + If the environment's stop token has been requested, the write + completes immediately with `error::canceled` and transfers no + data. An empty buffer sequence is a no-op that completes + successfully regardless of the stop token. + @see fuse, close */ template @@ -297,13 +440,20 @@ class stream { stream* self_; CB buffers_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - void await_suspend( + // The write completes synchronously; await_suspend is only + // used to observe the environment's stop token. Returning + // false means the coroutine does not actually suspend. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result @@ -313,6 +463,9 @@ class stream if(n == 0) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto* st = self_->state_.get(); if(st->closed) @@ -333,12 +486,7 @@ class stream side.buf.data() + old_size, n), buffers_, n); - if(side.pending_cont_.h) - { - side.pending_ex.post(side.pending_cont_); - side.pending_cont_.h = {}; - side.pending_ex = {}; - } + state::wake(side); return {{}, n}; } @@ -363,12 +511,7 @@ class stream int peer = 1 - index_; auto& side = state_->sides[peer]; side.buf.append(sv); - if(side.pending_cont_.h) - { - side.pending_ex.post(side.pending_cont_); - side.pending_cont_.h = {}; - side.pending_ex = {}; - } + state::wake(side); } /** Read from this stream and verify the content. diff --git a/include/boost/capy/test/write_sink.hpp b/include/boost/capy/test/write_sink.hpp index 319342692..87a18cb55 100644 --- a/include/boost/capy/test/write_sink.hpp +++ b/include/boost/capy/test/write_sink.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -21,7 +22,6 @@ #include #include -#include #include #include @@ -157,6 +157,12 @@ class write_sink @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + If the environment's stop token has been requested, the write + completes immediately with `error::canceled` and transfers no + data. An empty buffer sequence is a no-op that completes + successfully regardless of the stop token. + @see fuse */ template @@ -167,13 +173,23 @@ class write_sink { write_sink* self_; CB buffers_; - - bool await_ready() const noexcept { return true; } - - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend is + // the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend; it resumes + // immediately, having observed the stop token. See io_env, + // IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result @@ -182,6 +198,9 @@ class write_sink if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; @@ -218,6 +237,11 @@ class write_sink @return An awaitable that await-returns `(error_code,std::size_t)`. + @par Cancellation + If the environment's stop token has been requested, the write + completes immediately with `error::canceled` and transfers no + data. + @see fuse */ template @@ -228,18 +252,27 @@ class write_sink { write_sink* self_; CB buffers_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - void await_suspend( + // Reads the stop token without suspending; see the comment + // on write_some() for details. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result await_resume() { + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; @@ -279,6 +312,11 @@ class write_sink @par Exception Safety No-throw guarantee. + @par Cancellation + If the environment's stop token has been requested, the operation + completes immediately with `error::canceled`, transfers no data, + and does not signal end-of-stream. + @param buffers The const buffer sequence containing data to write. @return An awaitable that await-returns `(error_code,std::size_t)`. @@ -293,18 +331,27 @@ class write_sink { write_sink* self_; CB buffers_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - void await_suspend( + // Reads the stop token without suspending; see the comment + // on write_some() for details. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result await_resume() { + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; @@ -343,6 +390,11 @@ class write_sink @par Exception Safety No-throw guarantee. + @par Cancellation + If the environment's stop token has been requested, the operation + completes immediately with `error::canceled` and does not signal + end-of-stream. + @return An awaitable that await-returns `(error_code)`. @see fuse @@ -353,21 +405,27 @@ class write_sink struct awaitable { write_sink* self_; + bool canceled_ = false; - bool await_ready() const noexcept { return true; } + bool await_ready() const noexcept { return false; } - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // See the comment on write(CB buffers) for a detailed explanation. - void await_suspend( + // Reads the stop token without suspending; see the comment + // on write_some() for details. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result<> await_resume() { + if(canceled_) + return {error::canceled}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec}; diff --git a/include/boost/capy/test/write_stream.hpp b/include/boost/capy/test/write_stream.hpp index 6e36abd3a..2e8f32312 100644 --- a/include/boost/capy/test/write_stream.hpp +++ b/include/boost/capy/test/write_stream.hpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Michael Vandeberg // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -142,6 +143,12 @@ class write_stream @par Exception Safety No-throw guarantee. + @par Cancellation + If the environment's stop token has been requested, the write + completes immediately with `error::canceled` and transfers no + data. An empty buffer sequence is a no-op that completes + successfully regardless of the stop token. + @param buffers The const buffer sequence containing data to write. @return An awaitable that await-returns `(error_code,std::size_t)`. @@ -156,24 +163,23 @@ class write_stream { write_stream* self_; CB buffers_; - - bool await_ready() const noexcept { return true; } - - // This method is required to satisfy Capy's IoAwaitable concept, - // but is never called because await_ready() returns true. - // - // Capy uses a two-layer awaitable system: the promise's - // await_transform wraps awaitables in a transform_awaiter whose - // standard await_suspend(coroutine_handle) calls this custom - // 2-argument overload, passing the io_env from the coroutine's - // context. For synchronous test awaitables like this one, the - // coroutine never suspends, so this is not invoked. The signature - // exists to allow the same awaitable type to work with both - // synchronous (test) and asynchronous (real I/O) code. - void await_suspend( + bool canceled_ = false; + + bool await_ready() const noexcept { return false; } + + // The operation completes synchronously, but await_suspend is + // the only place io_env is delivered (the promise's + // transform_awaiter forwards it here). Returning false means + // the coroutine does not actually suspend; it resumes + // immediately, having observed the stop token. See io_env, + // IoAwaitable. + bool + await_suspend( std::coroutine_handle<>, - io_env const*) const noexcept + io_env const* env) noexcept { + canceled_ = env->stop_token.stop_requested(); + return false; } io_result @@ -182,6 +188,9 @@ class write_stream if(buffer_empty(buffers_)) return {{}, 0}; + if(canceled_) + return {error::canceled, 0}; + auto ec = self_->f_.maybe_fail(); if(ec) return {ec, 0}; diff --git a/test/unit/io/any_read_source.cpp b/test/unit/io/any_read_source.cpp index b178edb47..247d0d9e7 100644 --- a/test/unit/io/any_read_source.cpp +++ b/test/unit/io/any_read_source.cpp @@ -25,7 +25,6 @@ #include #include -#include #include namespace boost { diff --git a/test/unit/io/any_write_sink.cpp b/test/unit/io/any_write_sink.cpp index 6d3b0852b..1b0f09034 100644 --- a/test/unit/io/any_write_sink.cpp +++ b/test/unit/io/any_write_sink.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include diff --git a/test/unit/io/any_write_stream.cpp b/test/unit/io/any_write_stream.cpp index 484b43af4..c75aaa278 100644 --- a/test/unit/io/any_write_stream.cpp +++ b/test/unit/io/any_write_stream.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/test/unit/test/buffer_sink.cpp b/test/unit/test/buffer_sink.cpp index de0381648..34cc1c7ae 100644 --- a/test/unit/test/buffer_sink.cpp +++ b/test/unit/test/buffer_sink.cpp @@ -12,12 +12,14 @@ #include #include +#include #include #include "test/unit/test_helpers.hpp" #include #include +#include #include namespace boost { @@ -280,6 +282,53 @@ class buffer_sink_test BOOST_TEST(eof_success_count > 0); } + void + testCommitCanceled() + { + // commit awaited with an already-requested stop token returns + // error::canceled and commits nothing. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + buffer_sink bs; + mutable_buffer arr[4]; + auto bufs = bs.prepare(arr); + std::memcpy(bufs[0].data(), "hello", 5); + + auto [ec] = co_await bs.commit(5); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(bs.size(), 0u); + }()); + BOOST_TEST(ran); + } + + void + testCommitEofCanceled() + { + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + buffer_sink bs; + mutable_buffer arr[4]; + auto bufs = bs.prepare(arr); + std::memcpy(bufs[0].data(), "hello", 5); + + auto [ec] = co_await bs.commit_eof(5); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(bs.size(), 0u); + BOOST_TEST(! bs.eof_called()); + }()); + BOOST_TEST(ran); + } + void run() { @@ -294,6 +343,8 @@ class buffer_sink_test testClear(); testFuseErrorInjectionCommit(); testFuseErrorInjectionCommitEof(); + testCommitCanceled(); + testCommitEofCanceled(); } }; diff --git a/test/unit/test/buffer_source.cpp b/test/unit/test/buffer_source.cpp index 398c055cf..86fba9166 100644 --- a/test/unit/test/buffer_source.cpp +++ b/test/unit/test/buffer_source.cpp @@ -19,6 +19,7 @@ #include "test/unit/test_helpers.hpp" #include +#include #include namespace boost { @@ -295,6 +296,29 @@ class buffer_source_test BOOST_TEST(r.success); } + void + testPullCanceled() + { + // pull awaited with an already-requested stop token returns + // error::canceled and an empty buffer span. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + buffer_source bs; + bs.provide("hello world"); + + const_buffer arr[4]; + auto [ec, bufs] = co_await bs.pull(arr); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST(bufs.empty()); + }()); + BOOST_TEST(ran); + } + void run() { @@ -310,6 +334,7 @@ class buffer_source_test testMaxPullSizeMultiple(); testFuseErrorInjection(); testClearAndReuse(); + testPullCanceled(); } }; diff --git a/test/unit/test/read_source.cpp b/test/unit/test/read_source.cpp index b96324714..b161a4b21 100644 --- a/test/unit/test/read_source.cpp +++ b/test/unit/test/read_source.cpp @@ -18,6 +18,7 @@ #include "test/unit/test_helpers.hpp" #include +#include #include namespace boost { @@ -496,6 +497,50 @@ class read_source_test BOOST_TEST(read_success_count > 0); } + void + testReadCanceled() + { + // read awaited with an already-requested stop token returns + // error::canceled instead of data. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + read_source rs; + rs.provide("hello world"); + + char buf[32] = {}; + auto [ec, n] = co_await rs.read(make_buffer(buf)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + }()); + BOOST_TEST(ran); + } + + void + testReadSomeCanceled() + { + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + read_source rs; + rs.provide("hello world"); + + char buf[32] = {}; + auto [ec, n] = co_await rs.read_some(make_buffer(buf)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + }()); + BOOST_TEST(ran); + } + void run() { @@ -523,6 +568,9 @@ class read_source_test testReadSomeMaxReadSize(); testReadSomeBufferSequence(); testReadSomeFuseErrorInjection(); + + testReadCanceled(); + testReadSomeCanceled(); } }; diff --git a/test/unit/test/read_stream.cpp b/test/unit/test/read_stream.cpp index 5e3f7dcc5..b3b59ddec 100644 --- a/test/unit/test/read_stream.cpp +++ b/test/unit/test/read_stream.cpp @@ -19,6 +19,7 @@ #include "test/unit/test_helpers.hpp" #include +#include #include namespace boost { @@ -346,6 +347,29 @@ class read_stream_test BOOST_TEST(r.success); } + void + testReadSomeCanceled() + { + // A read_some awaited with a stop token that is already + // requested returns error::canceled instead of data. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + read_stream rs; + rs.provide("hello world"); + + char buf[32] = {}; + auto [ec, n] = co_await rs.read_some(make_buffer(buf)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + }()); + BOOST_TEST(ran); + } + void run() { @@ -364,6 +388,7 @@ class read_stream_test testClearAndReuse(); testMaxReadSize(); testMaxReadSizeMultiple(); + testReadSomeCanceled(); } }; diff --git a/test/unit/test/stream.cpp b/test/unit/test/stream.cpp index d2e666a1a..4494b55b8 100644 --- a/test/unit/test/stream.cpp +++ b/test/unit/test/stream.cpp @@ -15,6 +15,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include #include @@ -23,7 +28,11 @@ #include "test/unit/test_helpers.hpp" #include +#include +#include +#include #include +#include namespace boost { namespace capy { @@ -658,6 +667,205 @@ class stream_test BOOST_TEST(write_success_count > 0); } + //-------------------------------------------- + // + // Cancellation + // + //-------------------------------------------- + + void + testReadSomeCancellationWhileSuspended() + { + // A read that suspends waiting for the peer is resumed with + // error::canceled when the environment's stop token fires. + // Driven manually on a queuing executor for deterministic, + // non-blocking control (mirrors async_mutex's cancellation test). + auto [a, b] = make_stream_pair(); + std::queue> q; + queuing_executor ex(q); + std::stop_source ss; + + std::error_code reader_ec; + bool reader_done = false; + + auto reader = [](stream& s, + std::error_code& out_ec, bool& done) -> task<> + { + char buf[32] = {}; + auto [ec, n] = co_await s.read_some(make_buffer(buf)); + out_ec = ec; + (void)n; + done = true; + }(b, reader_ec, reader_done); + + auto h = reader.handle(); + reader.release(); + io_env env; + env.executor = executor_ref(ex); + env.stop_token = ss.get_token(); + h.promise().set_environment(&env); + + // No data available: the read suspends. + h.resume(); + BOOST_TEST(! reader_done); + + // Requesting stop posts the continuation through the executor. + ss.request_stop(); + BOOST_TEST(! q.empty()); + + q.front().resume(); + q.pop(); + + BOOST_TEST(reader_done); + BOOST_TEST(reader_ec == make_error_code(error::canceled)); + + h.destroy(); + } + + void + testReadSomeStopRequestedBeforeSuspend() + { + // A read that would block (no data) but whose stop token is + // already requested resolves to error::canceled without parking. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + auto [a, b] = make_stream_pair(); + // No data provided: the read would otherwise suspend. + char buf[32] = {}; + auto [ec, n] = co_await b.read_some(make_buffer(buf)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + }()); + BOOST_TEST(ran); + } + + void + testReadSomeDestroyWhileSuspended() + { + // Destroying a coroutine while its read is parked must tear down + // the stop callback and unlink the pending slot, so a later peer + // operation does not touch a freed claim flag. + auto [a, b] = make_stream_pair(); + std::queue> q; + queuing_executor ex(q); + std::stop_source ss; + + auto reader = [](stream& s) -> task<> + { + char buf[32] = {}; + auto [ec, n] = co_await s.read_some(make_buffer(buf)); + (void)ec; + (void)n; + }(b); + + auto h = reader.handle(); + reader.release(); + io_env env; + env.executor = executor_ref(ex); + env.stop_token = ss.get_token(); + h.promise().set_environment(&env); + + h.resume(); + BOOST_TEST(q.empty()); + + // Destroy the suspended reader; the awaitable destructor runs. + h.destroy(); + + // The pending slot is unlinked: a subsequent peer write finds no + // parked reader and does not dereference the freed claim flag. + a.provide("late"); + BOOST_TEST(true); + } + + void + testReadSomeCancellationCrossThread() + { + // A read suspended on one thread is cancelled by a stop request + // issued from another, exercising the stop-callback path under + // real concurrency (validated by TSan). Only the stop callback + // crosses threads; the stream itself is touched solely by the + // pool thread, honoring its single-threaded contract. + thread_pool pool(1); + std::latch done(1); + std::latch suspended(1); + std::stop_source ss; + + auto [a, b] = make_stream_pair(); + std::error_code reader_ec; + + auto reader = [&]() -> task<> + { + char buf[32] = {}; + suspended.count_down(); + auto [ec, n] = co_await b.read_some(make_buffer(buf)); + reader_ec = ec; + (void)n; + }; + + run_async(pool.get_executor(), ss.get_token(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { done.count_down(); })(reader()); + + suspended.wait(); + // Give await_suspend time to register the stop callback. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ss.request_stop(); + + done.wait(); + BOOST_TEST(reader_ec == make_error_code(error::canceled)); + } + + void + testReadSomeStopWithDataAvailable() + { + // When data is already buffered, a read completes normally even + // if the stop token is set: cancellation only applies to a read + // that would otherwise block. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + auto [a, b] = make_stream_pair(); + a.provide("hello"); + + char buf[32] = {}; + auto [ec, n] = co_await b.read_some(make_buffer(buf)); + ran = true; + BOOST_TEST(! ec); + BOOST_TEST_EQ(n, 5u); + BOOST_TEST_EQ(std::string_view(buf, n), "hello"); + }()); + BOOST_TEST(ran); + } + + void + testWriteSomeCancellation() + { + // write_some never blocks, so it honors the stop token up front: + // an already-requested stop yields error::canceled. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + auto [a, b] = make_stream_pair(); + auto [ec, n] = co_await a.write_some( + const_buffer("hi", 2)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + }()); + BOOST_TEST(ran); + } + void run() { @@ -689,6 +897,14 @@ class stream_test testLoopback(); testFuseReadErrorInjection(); testFuseWriteErrorInjection(); + + // Cancellation + testReadSomeCancellationWhileSuspended(); + testReadSomeStopRequestedBeforeSuspend(); + testReadSomeDestroyWhileSuspended(); + testReadSomeCancellationCrossThread(); + testReadSomeStopWithDataAvailable(); + testWriteSomeCancellation(); } }; diff --git a/test/unit/test/write_sink.cpp b/test/unit/test/write_sink.cpp index 0009a7623..e1102f596 100644 --- a/test/unit/test/write_sink.cpp +++ b/test/unit/test/write_sink.cpp @@ -12,11 +12,13 @@ #include #include +#include #include #include "test/unit/test_helpers.hpp" #include +#include #include namespace boost { @@ -520,6 +522,87 @@ class write_sink_test BOOST_TEST(write_success_count > 0); } + void + testWriteCanceled() + { + // Each write op awaited with an already-requested stop token + // returns error::canceled and has no effect. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + write_sink ws; + auto [ec, n] = co_await ws.write( + const_buffer("hello", 5)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + BOOST_TEST_EQ(ws.size(), 0u); + }()); + BOOST_TEST(ran); + } + + void + testWriteSomeCanceled() + { + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + write_sink ws; + auto [ec, n] = co_await ws.write_some( + const_buffer("hello", 5)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + BOOST_TEST_EQ(ws.size(), 0u); + }()); + BOOST_TEST(ran); + } + + void + testWriteEofWithBuffersCanceled() + { + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + write_sink ws; + auto [ec, n] = co_await ws.write_eof( + const_buffer("hello", 5)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + BOOST_TEST_EQ(ws.size(), 0u); + BOOST_TEST(! ws.eof_called()); + }()); + BOOST_TEST(ran); + } + + void + testWriteEofCanceled() + { + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + write_sink ws; + auto [ec] = co_await ws.write_eof(); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST(! ws.eof_called()); + }()); + BOOST_TEST(ran); + } + void run() { @@ -549,6 +632,11 @@ class write_sink_test testWriteSomeBufferSequence(); testWriteSomeMaxWriteSize(); testWriteSomeFuseErrorInjection(); + + testWriteCanceled(); + testWriteSomeCanceled(); + testWriteEofWithBuffersCanceled(); + testWriteEofCanceled(); } }; diff --git a/test/unit/test/write_stream.cpp b/test/unit/test/write_stream.cpp index 7c2249ec1..a426661d5 100644 --- a/test/unit/test/write_stream.cpp +++ b/test/unit/test/write_stream.cpp @@ -13,11 +13,13 @@ #include #include #include +#include #include #include "test/unit/test_helpers.hpp" #include +#include #include namespace boost { @@ -320,6 +322,28 @@ class write_stream_test BOOST_TEST(r.success); } + void + testWriteSomeCanceled() + { + // write_some awaited with an already-requested stop token + // returns error::canceled and writes nothing. + std::stop_source ss; + ss.request_stop(); + bool ran = false; + run_blocking(ss.get_token())( + [&]() -> task<> + { + write_stream ws; + auto [ec, n] = co_await ws.write_some( + const_buffer("hello", 5)); + ran = true; + BOOST_TEST(ec == cond::canceled); + BOOST_TEST_EQ(n, 0u); + BOOST_TEST_EQ(ws.size(), 0u); + }()); + BOOST_TEST(ran); + } + void run() { @@ -337,6 +361,7 @@ class write_stream_test testExpectExcessData(); testMaxWriteSize(); testMaxWriteSizeMultiple(); + testWriteSomeCanceled(); } };