-
Notifications
You must be signed in to change notification settings - Fork 455
feat(storage): implement GCS overrun logging and resume prevention #16150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
45b2018
bfd9113
37a3cef
1682ace
9b4616b
bf89846
6b74031
c09eeec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| #include "google/cloud/grpc_error_delegate.h" | ||
| #include "google/cloud/internal/opentelemetry.h" | ||
| #include "google/rpc/status.pb.h" | ||
| #include <limits> | ||
| #include <memory> | ||
| #include <utility> | ||
|
|
||
|
|
@@ -165,8 +166,20 @@ std::unique_ptr<storage::AsyncReaderConnection> ObjectDescriptorImpl::Read( | |
| auto hash_function = CreateHashFunction(is_full_read); | ||
| auto hash_validator = CreateHashValidator(is_full_read); | ||
|
|
||
| auto range = std::make_shared<ReadRange>(p.start, p.length, hash_function, | ||
| std::move(hash_validator)); | ||
| absl::optional<std::int64_t> limit; | ||
| if (p.start < 0) { | ||
| if (p.start == (std::numeric_limits<std::int64_t>::min)()) { | ||
| limit = (std::numeric_limits<std::int64_t>::max)(); | ||
| } else { | ||
| limit = -p.start; | ||
| } | ||
| } else if (p.length > 0) { | ||
| limit = p.length; | ||
| } | ||
|
|
||
| auto range = std::make_shared<ReadRange>( | ||
| p.start, limit, hash_function, std::move(hash_validator), | ||
| read_object_spec_.bucket(), read_object_spec_.object()); | ||
|
|
||
| std::unique_lock<std::mutex> lk(mu_); | ||
| if (stream_manager_->Empty()) { | ||
|
|
@@ -329,16 +342,20 @@ void ObjectDescriptorImpl::OnRead( | |
| std::move(*response->mutable_read_handle()); | ||
| } | ||
| auto copy = it->active_ranges; | ||
| // Release the lock while notifying the ranges. The notifications may trigger | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: move this comment before |
||
| // application code, and that code may callback on this class. | ||
| bool is_transcoded = false; | ||
| absl::optional<std::int64_t> object_size; | ||
| if (metadata_.has_value()) { | ||
| is_transcoded = metadata_->content_encoding() == "gzip"; | ||
| object_size = metadata_->size(); | ||
| } | ||
|
v-pratap marked this conversation as resolved.
|
||
| lk.unlock(); | ||
| for (auto& range_data : *response->mutable_object_data_ranges()) { | ||
| auto id = range_data.read_range().read_id(); | ||
| auto const l = copy.find(id); | ||
| if (l == copy.end()) continue; | ||
| // TODO(#15104) - Consider returning if the range is done, and then | ||
| // skipping CleanupDoneRanges(). | ||
| l->second->OnRead(std::move(range_data)); | ||
| l->second->OnRead(std::move(range_data), is_transcoded, object_size); | ||
| } | ||
| lk.lock(); | ||
| stream_manager_->CleanupDoneRanges(it); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,10 @@ absl::optional<google::storage::v2::ReadRange> ReadRange::RangeForResume( | |
| range.set_read_id(read_id); | ||
| std::lock_guard<std::mutex> lk(mu_); | ||
| if (status_.has_value()) return absl::nullopt; | ||
| if (requested_length_.has_value() && *requested_length_ >= 0 && | ||
| received_bytes_ >= static_cast<std::size_t>(*requested_length_)) { | ||
| return absl::nullopt; | ||
| } | ||
| range.set_read_offset(offset_); | ||
| range.set_read_length(length_); | ||
| return range; | ||
|
|
@@ -61,15 +65,20 @@ void ReadRange::OnFinish(Status status) { | |
| std::unique_lock<std::mutex> lk(mu_); | ||
| if (status_) return; | ||
| status_ = std::move(status); | ||
| CheckOverrun(); | ||
| if (!wait_) return; | ||
| auto p = std::move(*wait_); | ||
| wait_.reset(); | ||
| lk.unlock(); | ||
| p.set_value(*status_); | ||
| } | ||
|
|
||
| void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { | ||
| void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, | ||
| bool is_transcoded, | ||
| absl::optional<std::int64_t> object_size) { | ||
| std::unique_lock<std::mutex> lk(mu_); | ||
| is_transcoded_ = is_transcoded_ || is_transcoded; | ||
| if (object_size.has_value()) object_size_ = object_size; | ||
| if (status_) return; | ||
| auto* check_summed_data = data.mutable_checksummed_data(); | ||
| auto content = StealMutableContent(*data.mutable_checksummed_data()); | ||
|
|
@@ -81,13 +90,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { | |
| } | ||
|
|
||
| offset_ += content.size(); | ||
| received_bytes_ += content.size(); | ||
| if (length_ != 0) length_ -= std::min<std::int64_t>(content.size(), length_); | ||
| auto p = ReadPayloadImpl::Make(std::move(content)); | ||
|
|
||
| if (data.range_end()) { | ||
| status_ = Status{}; | ||
| CheckOverrun(); | ||
| auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); | ||
| if (result.is_mismatch) { | ||
| bool transcoded_download = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add a comment explaining why we're performing this check. Same comment for this change under reader_connection_resume.cc. |
||
| is_transcoded_ && | ||
| (!object_size_.has_value() || | ||
| (received_bytes_ != static_cast<std::size_t>(*object_size_))); | ||
| if (result.is_mismatch && !transcoded_download) { | ||
| status_ = google::cloud::internal::DataLossError( | ||
| absl::StrCat("mismatched checksums detected at the end of the " | ||
| "download, received={", | ||
|
|
@@ -105,6 +120,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { | |
| payload_ = std::move(p); | ||
| } | ||
|
|
||
| void ReadRange::CheckOverrun() { | ||
| if (requested_length_.has_value() && *requested_length_ >= 0 && | ||
| received_bytes_ > static_cast<std::size_t>(*requested_length_) && | ||
| !is_transcoded_ && !logged_warning_) { | ||
| logged_warning_ = true; | ||
| GCP_LOG(WARNING) << "storage: received " | ||
| << (received_bytes_ - | ||
| static_cast<std::size_t>(*requested_length_)) | ||
| << " more bytes than requested from GCS for bucket \"" | ||
| << bucket_name_ << "\", object \"" << object_name_ << "\""; | ||
| } | ||
| } | ||
|
|
||
| void ReadRange::Notify(std::unique_lock<std::mutex> lk, | ||
| storage::ReadPayload p) { | ||
| auto wait = std::move(*wait_); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: pls add a comment about how limit is calculated for readers who are unfamiliar with GCS range request behaviors.