From bce2779a370f323cbdfef36b5ef9ecd4ec47d14b Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Tue, 9 Jun 2026 08:57:24 +0000 Subject: [PATCH 1/4] fix(storage): Add Close() support to AsyncWriterConnectionBuffered --- .../async/writer_connection_buffered.cc | 155 +++++++++++--- .../async/writer_connection_buffered_test.cc | 197 ++++++++++++++++++ 2 files changed, 324 insertions(+), 28 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index ec179e207b516..8e1499dd49d43 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -72,6 +72,7 @@ class AsyncWriterConnectionBufferedState buffer_size_hwm_(buffer_size_hwm), impl_(std::move(impl)) { finalized_future_ = finalized_.get_future(); + closed_future_ = closed_.get_future(); auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { SetFinalized(std::unique_lock(mu_), @@ -121,6 +122,15 @@ class AsyncWriterConnectionBufferedState return std::move(finalized_future_); } + future Close(storage::WritePayload const& p) { + std::unique_lock lk(mu_); + resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); + close_ = true; + // Force flush to drain the buffer first. + HandleNewData(std::move(lk), true); + return std::move(closed_future_); + } + future Flush(storage::WritePayload const& p) { std::unique_lock lk(mu_); // Create a new promise for this flush operation. @@ -207,6 +217,10 @@ class AsyncWriterConnectionBufferedState // FinalizeStep will set the finalizing_ flag. return FinalizeStep(std::move(lk)); } + if (close_ && !closing_) { + // CloseStep will set the closing_ flag. + return CloseStep(std::move(lk)); + } // If not finalizing, check if an empty flush is needed. if (flush_) { // Pass empty payload to FlushStep @@ -239,6 +253,22 @@ class AsyncWriterConnectionBufferedState SetFinalized(std::unique_lock(mu_), std::move(result)); } + void CloseStep(std::unique_lock lk) { + if (closing_) return; + closing_ = true; + auto impl = Impl(lk); + lk.unlock(); + (void)impl->Close(storage::WritePayload{}) + .then([w = WeakFromThis()](auto f) { + if (auto self = w.lock()) return self->OnClose(f.get()); + }); + } + + void OnClose(Status result) { + if (!result.ok()) return Resume(std::move(result)); + SetClosed(std::unique_lock(mu_), std::move(result)); + } + void FlushStep(std::unique_lock lk, absl::Cord payload) { auto impl = Impl(lk); lk.unlock(); @@ -343,40 +373,41 @@ class AsyncWriterConnectionBufferedState } void Resume(Status const& s) { - // Capture the finalization state *before* starting the async resume. + // Capture the finalization and close state before starting the async + // resume. bool was_finalizing; + bool was_closing; { std::unique_lock lk(mu_); was_finalizing = finalizing_; + was_closing = closing_; if (!s.ok() && cancelled_) { return SetError(std::move(lk), std::move(s)); } } - // Pass the original status `s` and `was_finalizing` to the callback. - factory_().then([s, was_finalizing, w = WeakFromThis()](auto f) { - if (auto self = w.lock()) - return self->OnResume(s, was_finalizing, f.get()); - }); + // Pass the original status `s`, `was_finalizing`, and `was_closing` to the + // callback. + factory_().then( + [s, was_finalizing, was_closing, w = WeakFromThis()](auto f) { + if (auto self = w.lock()) + return self->OnResume(s, was_finalizing, was_closing, f.get()); + }); } void OnResume( - Status const& original_status, bool was_finalizing, + Status const& original_status, bool was_finalizing, bool was_closing, StatusOr> impl) { std::unique_lock lk(mu_); - // Resume was *not* triggered by finalization failure. + // Resume was not triggered by finalization or close failure. if (!impl) return SetError(std::move(lk), std::move(impl).status()); // On successful resume, immediately update the active connection. impl_ = std::move(*impl); + auto state = impl_->PersistedState(); if (was_finalizing) { // If resuming due to a finalization error, we *must* complete the // finalized_ promise now, based on the resume attempt's outcome. - // The resume attempt itself failed. Use that error. - if (!impl) return SetError(std::move(lk), std::move(impl).status()); - - // Resume attempt succeeded, check the persisted state. - auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { // Resume found the object is finalized. Success. return SetFinalized( @@ -391,7 +422,21 @@ class AsyncWriterConnectionBufferedState return SetError(std::move(lk), std::move(original_status)); } - auto state = impl_->PersistedState(); + if (was_closing) { + // If resuming due to a close error, we *must* complete the + // closed_ promise now, based on the resume attempt's outcome. + if (absl::holds_alternative(state)) { + // Resume found the object is finalized (which implies closed). Success. + return SetClosed(std::move(lk), Status{}); + } + // Resume succeeded, but the object is still not finalized/closed. + // This means the original close attempt failed permanently. + // Use the original status that triggered the resume. Reset closing_ + // before setting the error, as the attempt is now over. + closing_ = false; + return SetError(std::move(lk), std::move(original_status)); + } + if (absl::holds_alternative(state)) { // Found finalized object (maybe finalized concurrently or resumed). return SetFinalized(std::move(lk), absl::get( @@ -432,6 +477,29 @@ class AsyncWriterConnectionBufferedState p.set_value(std::move(object)); // Set value on the moved promise } + void SetClosed(std::unique_lock lk, Status const& status) { + resend_buffer_.Clear(); + writing_ = false; + close_ = false; + closing_ = false; + flush_ = false; + // Check if the promise has already been completed. + if (closed_promise_completed_) { + return; + } + // Mark the promise as completed before moving it. + closed_promise_completed_ = true; + auto handlers = ClearHandlers(lk); + // Also clear any pending flush promises on success. + auto pending_flushes = std::move(pending_flush_promises_); + auto p = std::move(closed_); // Move the member promise + lk.unlock(); + // Notify handlers and pending flushes after releasing the lock. + for (auto& h : handlers) h->Execute(status); + for (auto& pf : pending_flushes) pf.set_value(status); + p.set_value(status); // Set value on the moved promise. + } + void SetFlushed(std::unique_lock lk, Status const& result) { if (!result.ok()) return SetError(std::move(lk), std::move(result)); flush_ = false; // Reset flush flag; WriteLoop may set it again. @@ -465,6 +533,8 @@ class AsyncWriterConnectionBufferedState writing_ = false; finalize_ = false; finalizing_ = false; // Reset finalizing flag + close_ = false; + closing_ = false; // Reset closing flag flush_ = false; // Always clear handlers and pending flushes on error. @@ -472,20 +542,23 @@ class AsyncWriterConnectionBufferedState auto pending_flushes = std::move(pending_flush_promises_); // Check if the finalized promise has already been completed. - if (finalized_promise_completed_) { - // Finalized promise already set, just notify handlers and pending - // flushes. - lk.unlock(); // Release lock before notifying - for (auto& h : handlers) h->Execute(status); - for (auto& pf : pending_flushes) pf.set_value(status); - return; + bool complete_finalized = false; + promise> finalized_to_complete; + if (!finalized_promise_completed_) { + finalized_promise_completed_ = true; + finalized_to_complete = std::move(finalized_); + complete_finalized = true; + } + + // Check if the closed promise has already been completed. + bool complete_closed = false; + promise closed_to_complete; + if (!closed_promise_completed_) { + closed_promise_completed_ = true; + closed_to_complete = std::move(closed_); + complete_closed = true; } - // Mark the finalized promise as completed *before* moving it under the - // lock. - finalized_promise_completed_ = true; - // Move the finalized promise. - auto p = std::move(finalized_); lk.unlock(); // Release lock before notifying // Notify handlers first. @@ -494,8 +567,13 @@ class AsyncWriterConnectionBufferedState for (auto& pf : pending_flushes) { pf.set_value(status); } - // Set error on the moved finalized promise *once*. - p.set_value(status); + // Set error on the moved promises *once*. + if (complete_finalized) { + finalized_to_complete.set_value(status); + } + if (complete_closed) { + closed_to_complete.set_value(status); + } } std::shared_ptr Impl( @@ -540,6 +618,14 @@ class AsyncWriterConnectionBufferedState // finalized_. future> finalized_future_; + // The result of calling `Close()`. Note that only one such call is ever + // made. + promise closed_; + + // Retrieve the future in the constructor, as some operations reset + // closed_. + future closed_future_; + // Queue of promises for outstanding Flush() calls. std::deque> pending_flush_promises_; @@ -557,6 +643,15 @@ class AsyncWriterConnectionBufferedState // If true, all the data to finalize an upload is in `resend_buffer_`. bool finalize_ = false; + // If true, all the data to close an upload is in `resend_buffer_`. + bool close_ = false; + + // True if CloseStep has been initiated. Prevents re-entry. + bool closing_ = false; + + // Tracks if the final promise (`closed_`) has been completed. + bool closed_promise_completed_ = false; + // If true, all data should be uploaded with `Flush()`. bool flush_ = false; @@ -659,6 +754,10 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection { return state_->Flush(std::move(p)); } + future Close(storage::WritePayload p) override { + return state_->Close(std::move(p)); + } + future> Query() override { return state_->Query(); } RpcMetadata GetRequestMetadata() override { diff --git a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc index ed01b4581a3f0..6c604541d0f23 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc @@ -771,6 +771,203 @@ TEST(WriteConnectionBuffered, ErrorFailsPendingFlushes) { EXPECT_THAT(f2.get(), StatusIs(resume_error.code())); } +TEST(WriteConnectionBuffered, CloseEmpty) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillOnce(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillRepeatedly([&](auto) { + return sequencer.PushBack("Close").then([](auto f) -> Status { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + EXPECT_EQ(connection->UploadId(), "test-upload-id"); + EXPECT_THAT(connection->PersistedState(), VariantWith(0)); + + auto close = connection->Close({}); + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + EXPECT_STATUS_OK(close.get()); +} + +TEST(WriteConnectionBuffered, CloseWithPayload) { + AsyncSequencer sequencer; + + auto expected_write_size = [](std::size_t n) { + return ResultOf( + "payload size", [](auto payload) { return payload.size(); }, Eq(n)); + }; + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + // The payload is flushed first. + EXPECT_CALL(*mock, Flush(expected_write_size(8 * 1024))).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); + }); + EXPECT_CALL(*mock, Query).WillOnce([&]() { + return sequencer.PushBack("Query").then([](auto) { + return make_status_or(static_cast(8 * 1024)); + }); + }); + // Then the stream is closed with empty payload. + EXPECT_CALL(*mock, Close(expected_write_size(0))).WillOnce([&](auto) { + return sequencer.PushBack("Close").then([](auto) { return Status{}; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + auto close = connection->Close(TestPayload(8 * 1024)); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Query"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + EXPECT_STATUS_OK(close.get()); +} + +TEST(WriteConnectionBuffered, CloseFailsAndResumeFails) { + AsyncSequencer sequencer; + auto resume_error = PermanentError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [](auto) { return TransientError(); }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&]() { + return sequencer.PushBack("Retry").then([&](auto) { + return StatusOr>( + resume_error); + }); + }); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Retry"); + next.first.set_value(true); + + EXPECT_THAT(close.get(), StatusIs(resume_error.code())); +} + +TEST(WriteConnectionBuffered, CloseFailsAndResumeSucceedsButNotClosed) { + AsyncSequencer sequencer; + auto close_error = TransientError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [close_error](auto) { return close_error; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&]() { + return sequencer.PushBack("Resume").then([](auto) { + auto resumed_mock = std::make_unique(); + EXPECT_CALL(*resumed_mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + return make_status_or(std::unique_ptr( + std::move(resumed_mock))); + }); + }); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Resume"); + next.first.set_value(true); + + EXPECT_THAT(close.get(), StatusIs(close_error.code())); +} + +TEST(WriteConnectionBuffered, CloseFailsAndResumeSucceedsAndFinalized) { + AsyncSequencer sequencer; + auto close_error = TransientError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [close_error](auto) { return close_error; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&]() { + return sequencer.PushBack("Resume").then([](auto) { + auto resumed_mock = std::make_unique(); + EXPECT_CALL(*resumed_mock, PersistedState) + .WillRepeatedly(Return(TestObject())); + return make_status_or(std::unique_ptr( + std::move(resumed_mock))); + }); + }); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Resume"); + next.first.set_value(true); + + EXPECT_STATUS_OK(close.get()); +} + TEST(WriteConnectionBuffered, Query) { AsyncSequencer sequencer; From fe49ad5615a3f27ce3101afef7c86f7ddcd1bb97 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Tue, 9 Jun 2026 09:46:49 +0000 Subject: [PATCH 2/4] Handle duplicate Close() calls --- .../async/writer_connection_buffered.cc | 12 +++++++---- .../async/writer_connection_buffered_test.cc | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index 8e1499dd49d43..3451cadf1e9e4 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -124,6 +124,10 @@ class AsyncWriterConnectionBufferedState future Close(storage::WritePayload const& p) { std::unique_lock lk(mu_); + if (close_ || closed_promise_completed_) { + return make_ready_future(internal::FailedPreconditionError( + "Close() already called", GCP_ERROR_INFO())); + } resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); close_ = true; // Force flush to drain the buffer first. @@ -373,7 +377,7 @@ class AsyncWriterConnectionBufferedState } void Resume(Status const& s) { - // Capture the finalization and close state before starting the async + // Capture the finalization and close state *before* starting the async // resume. bool was_finalizing; bool was_closing; @@ -399,7 +403,7 @@ class AsyncWriterConnectionBufferedState StatusOr> impl) { std::unique_lock lk(mu_); - // Resume was not triggered by finalization or close failure. + // Resume was *not* triggered by finalization or close failure. if (!impl) return SetError(std::move(lk), std::move(impl).status()); // On successful resume, immediately update the active connection. impl_ = std::move(*impl); @@ -423,7 +427,7 @@ class AsyncWriterConnectionBufferedState } if (was_closing) { - // If resuming due to a close error, we *must* complete the + // If resuming due to a close error, we must complete the // closed_ promise now, based on the resume attempt's outcome. if (absl::holds_alternative(state)) { // Resume found the object is finalized (which implies closed). Success. @@ -492,7 +496,7 @@ class AsyncWriterConnectionBufferedState auto handlers = ClearHandlers(lk); // Also clear any pending flush promises on success. auto pending_flushes = std::move(pending_flush_promises_); - auto p = std::move(closed_); // Move the member promise + auto p = std::move(closed_); // Move the member promise. lk.unlock(); // Notify handlers and pending flushes after releasing the lock. for (auto& h : handlers) h->Execute(status); diff --git a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc index 6c604541d0f23..3cbe405fe8753 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc @@ -798,6 +798,26 @@ TEST(WriteConnectionBuffered, CloseEmpty) { EXPECT_STATUS_OK(close.get()); } +TEST(WriteConnectionBuffered, DuplicateCloseFails) { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([](auto) { + return make_ready_future(Status{}); + }); + + MockFactory mock_factory; + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + auto close1 = connection->Close({}); + auto close2 = connection->Close({}); + + EXPECT_STATUS_OK(close1.get()); + EXPECT_THAT(close2.get(), StatusIs(StatusCode::kFailedPrecondition)); +} + TEST(WriteConnectionBuffered, CloseWithPayload) { AsyncSequencer sequencer; From baec59579fa356802cd7245d80b37026c7c71b2a Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Wed, 10 Jun 2026 08:24:07 +0000 Subject: [PATCH 3/4] Fix failing checks --- .../cloud/storage/internal/async/writer_connection_buffered.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index 3451cadf1e9e4..c7d67b9866873 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -268,7 +268,7 @@ class AsyncWriterConnectionBufferedState }); } - void OnClose(Status result) { + void OnClose(Status const& result) { if (!result.ok()) return Resume(std::move(result)); SetClosed(std::unique_lock(mu_), std::move(result)); } From a253210f652afc1ba14e7717a693bedd6db01266 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Fri, 12 Jun 2026 04:29:15 +0000 Subject: [PATCH 4/4] retrigger checks