Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 131 additions & 28 deletions google/cloud/storage/internal/async/writer_connection_buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class AsyncWriterConnectionBufferedState
buffer_size_hwm_(buffer_size_hwm),
impl_(std::move(impl)) {
finalized_future_ = finalized_.get_future();
closed_future_ = closed_.get_future();
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
SetFinalized(std::unique_lock<std::mutex>(mu_),
Expand Down Expand Up @@ -121,6 +122,19 @@ class AsyncWriterConnectionBufferedState
return std::move(finalized_future_);
}

future<Status> Close(storage::WritePayload const& p) {
std::unique_lock<std::mutex> lk(mu_);
if (close_ || closed_promise_completed_) {
return make_ready_future(internal::FailedPreconditionError(
"Close() already called", GCP_ERROR_INFO()));
}
resend_buffer_.Append(WritePayloadImpl::GetImpl(p));
close_ = true;
// Force flush to drain the buffer first.
HandleNewData(std::move(lk), true);
return std::move(closed_future_);
}
Comment thread
kalragauri marked this conversation as resolved.

future<Status> Flush(storage::WritePayload const& p) {
std::unique_lock<std::mutex> lk(mu_);
// Create a new promise for this flush operation.
Expand Down Expand Up @@ -207,6 +221,10 @@ class AsyncWriterConnectionBufferedState
// FinalizeStep will set the finalizing_ flag.
return FinalizeStep(std::move(lk));
}
if (close_ && !closing_) {
// CloseStep will set the closing_ flag.
return CloseStep(std::move(lk));
}
Comment thread
kalragauri marked this conversation as resolved.
// If not finalizing, check if an empty flush is needed.
if (flush_) {
// Pass empty payload to FlushStep
Expand Down Expand Up @@ -239,6 +257,22 @@ class AsyncWriterConnectionBufferedState
SetFinalized(std::unique_lock<std::mutex>(mu_), std::move(result));
}

void CloseStep(std::unique_lock<std::mutex> lk) {
if (closing_) return;
closing_ = true;
auto impl = Impl(lk);
lk.unlock();
(void)impl->Close(storage::WritePayload{})
.then([w = WeakFromThis()](auto f) {
if (auto self = w.lock()) return self->OnClose(f.get());
});
}

void OnClose(Status const& result) {
if (!result.ok()) return Resume(std::move(result));
SetClosed(std::unique_lock<std::mutex>(mu_), std::move(result));
}

void FlushStep(std::unique_lock<std::mutex> lk, absl::Cord payload) {
auto impl = Impl(lk);
lk.unlock();
Expand Down Expand Up @@ -343,40 +377,41 @@ class AsyncWriterConnectionBufferedState
}

void Resume(Status const& s) {
// Capture the finalization state *before* starting the async resume.
// Capture the finalization and close state *before* starting the async
// resume.
bool was_finalizing;
bool was_closing;
{
std::unique_lock<std::mutex> lk(mu_);
was_finalizing = finalizing_;
was_closing = closing_;
if (!s.ok() && cancelled_) {
return SetError(std::move(lk), std::move(s));
}
}
// Pass the original status `s` and `was_finalizing` to the callback.
factory_().then([s, was_finalizing, w = WeakFromThis()](auto f) {
if (auto self = w.lock())
return self->OnResume(s, was_finalizing, f.get());
});
// Pass the original status `s`, `was_finalizing`, and `was_closing` to the
// callback.
factory_().then(
[s, was_finalizing, was_closing, w = WeakFromThis()](auto f) {
if (auto self = w.lock())
return self->OnResume(s, was_finalizing, was_closing, f.get());
});
}

void OnResume(
Status const& original_status, bool was_finalizing,
Status const& original_status, bool was_finalizing, bool was_closing,
StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> impl) {
std::unique_lock<std::mutex> lk(mu_);

// Resume was *not* triggered by finalization failure.
// Resume was *not* triggered by finalization or close failure.
if (!impl) return SetError(std::move(lk), std::move(impl).status());
// On successful resume, immediately update the active connection.
impl_ = std::move(*impl);

auto state = impl_->PersistedState();
if (was_finalizing) {
// If resuming due to a finalization error, we *must* complete the
// finalized_ promise now, based on the resume attempt's outcome.
// The resume attempt itself failed. Use that error.
if (!impl) return SetError(std::move(lk), std::move(impl).status());

// Resume attempt succeeded, check the persisted state.
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Resume found the object is finalized. Success.
return SetFinalized(
Expand All @@ -391,7 +426,21 @@ class AsyncWriterConnectionBufferedState
return SetError(std::move(lk), std::move(original_status));
}

auto state = impl_->PersistedState();
if (was_closing) {
// If resuming due to a close error, we must complete the
// closed_ promise now, based on the resume attempt's outcome.
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Resume found the object is finalized (which implies closed). Success.
return SetClosed(std::move(lk), Status{});
}
// Resume succeeded, but the object is still not finalized/closed.
// This means the original close attempt failed permanently.
// Use the original status that triggered the resume. Reset closing_
// before setting the error, as the attempt is now over.
closing_ = false;
return SetError(std::move(lk), std::move(original_status));
}

if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(std::move(lk), absl::get<google::storage::v2::Object>(
Expand Down Expand Up @@ -432,6 +481,29 @@ class AsyncWriterConnectionBufferedState
p.set_value(std::move(object)); // Set value on the moved promise
}

void SetClosed(std::unique_lock<std::mutex> lk, Status const& status) {
resend_buffer_.Clear();
writing_ = false;
close_ = false;
closing_ = false;
flush_ = false;
// Check if the promise has already been completed.
if (closed_promise_completed_) {
return;
}
// Mark the promise as completed before moving it.
closed_promise_completed_ = true;
auto handlers = ClearHandlers(lk);
// Also clear any pending flush promises on success.
auto pending_flushes = std::move(pending_flush_promises_);
auto p = std::move(closed_); // Move the member promise.
lk.unlock();
// Notify handlers and pending flushes after releasing the lock.
for (auto& h : handlers) h->Execute(status);
for (auto& pf : pending_flushes) pf.set_value(status);
p.set_value(status); // Set value on the moved promise.
}

void SetFlushed(std::unique_lock<std::mutex> lk, Status const& result) {
if (!result.ok()) return SetError(std::move(lk), std::move(result));
flush_ = false; // Reset flush flag; WriteLoop may set it again.
Expand Down Expand Up @@ -465,27 +537,32 @@ class AsyncWriterConnectionBufferedState
writing_ = false;
finalize_ = false;
finalizing_ = false; // Reset finalizing flag
close_ = false;
closing_ = false; // Reset closing flag
flush_ = false;

// Always clear handlers and pending flushes on error.
auto handlers = ClearHandlers(lk);
auto pending_flushes = std::move(pending_flush_promises_);

// Check if the finalized promise has already been completed.
if (finalized_promise_completed_) {
// Finalized promise already set, just notify handlers and pending
// flushes.
lk.unlock(); // Release lock before notifying
for (auto& h : handlers) h->Execute(status);
for (auto& pf : pending_flushes) pf.set_value(status);
return;
bool complete_finalized = false;
promise<StatusOr<google::storage::v2::Object>> finalized_to_complete;
if (!finalized_promise_completed_) {
finalized_promise_completed_ = true;
finalized_to_complete = std::move(finalized_);
complete_finalized = true;
}

// Check if the closed promise has already been completed.
bool complete_closed = false;
promise<Status> closed_to_complete;
if (!closed_promise_completed_) {
closed_promise_completed_ = true;
closed_to_complete = std::move(closed_);
complete_closed = true;
}

// Mark the finalized promise as completed *before* moving it under the
// lock.
finalized_promise_completed_ = true;
// Move the finalized promise.
auto p = std::move(finalized_);
lk.unlock(); // Release lock before notifying

// Notify handlers first.
Expand All @@ -494,8 +571,13 @@ class AsyncWriterConnectionBufferedState
for (auto& pf : pending_flushes) {
pf.set_value(status);
}
// Set error on the moved finalized promise *once*.
p.set_value(status);
// Set error on the moved promises *once*.
if (complete_finalized) {
finalized_to_complete.set_value(status);
}
if (complete_closed) {
closed_to_complete.set_value(status);
}
}

std::shared_ptr<storage::AsyncWriterConnection> Impl(
Expand Down Expand Up @@ -540,6 +622,14 @@ class AsyncWriterConnectionBufferedState
// finalized_.
future<StatusOr<google::storage::v2::Object>> finalized_future_;

// The result of calling `Close()`. Note that only one such call is ever
// made.
promise<Status> closed_;

// Retrieve the future in the constructor, as some operations reset
// closed_.
future<Status> closed_future_;

// Queue of promises for outstanding Flush() calls.
std::deque<promise<Status>> pending_flush_promises_;

Expand All @@ -557,6 +647,15 @@ class AsyncWriterConnectionBufferedState
// If true, all the data to finalize an upload is in `resend_buffer_`.
bool finalize_ = false;

// If true, all the data to close an upload is in `resend_buffer_`.
bool close_ = false;

// True if CloseStep has been initiated. Prevents re-entry.
bool closing_ = false;

// Tracks if the final promise (`closed_`) has been completed.
bool closed_promise_completed_ = false;

// If true, all data should be uploaded with `Flush()`.
bool flush_ = false;

Expand Down Expand Up @@ -659,6 +758,10 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection {
return state_->Flush(std::move(p));
}

future<Status> Close(storage::WritePayload p) override {
return state_->Close(std::move(p));
}

future<StatusOr<std::int64_t>> Query() override { return state_->Query(); }

RpcMetadata GetRequestMetadata() override {
Expand Down
Loading
Loading