From 45b201840e626b3b7b72902ad7382fe4e535a92f Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 10:50:01 +0000 Subject: [PATCH 1/8] feat(storage): implement GCS overrun logging and resume prevention This commit implements overrun logging, satisfied-resume prevention, and server-side transcoding bypass for both the synchronous and asynchronous GCS download paths. Synchronous Path: - Introduced a private 'OverrunLoggingObjectReadSource' decorator inside 'object_read_streambuf.cc' that wraps the transport source to track received bytes lock-free. - Logs a warning with the exact overshoot byte count, bucket, and object name if GCS sends more bytes than requested. - Leaves 'object_read_streambuf.h' and 'object_read_stream.h' completely clean, untouched, and thread-safe. Asynchronous Path (gRPC BiDi Stream & Streaming Reader): - Integrated chunk-level byte tracking inside 'ReadRange' and 'AsyncReaderConnectionResume'. - Under server-side transcoding (gzip), suppresses overrun warnings and bypasses checksum validation failures (since decompressed bytes naturally exceed the compressed request and mismatch the original checksums). - Implemented satisfied-resume prevention: if a connection fails but the requested number of bytes has already been received, we refuse to reconnect to prevent GCS from sending unwanted extra bytes, and return Status::OK directly, masking any subsequent stream errors. - Defined overloaded constructors to maintain 100% backward compatibility for all existing tests and client code. Testing: - Added 15 new, comprehensive unit tests covering all overrun, tail-read, transcoding, and resume scenarios. --- .../storage/internal/async/connection_impl.cc | 10 +- .../internal/async/object_descriptor_impl.cc | 18 +- .../async/object_descriptor_reader_test.cc | 2 +- .../object_descriptor_reader_tracing_test.cc | 2 +- .../storage/internal/async/read_range.cc | 26 +- .../cloud/storage/internal/async/read_range.h | 36 ++- .../storage/internal/async/read_range_test.cc | 223 +++++++++++++ .../async/reader_connection_resume.cc | 35 +- .../internal/async/reader_connection_resume.h | 16 +- .../async/reader_connection_resume_test.cc | 298 ++++++++++++++++++ .../storage/internal/object_read_streambuf.cc | 87 ++++- .../internal/object_read_streambuf_test.cc | 191 +++++++++++ 12 files changed, 924 insertions(+), 20 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 8aff6b5fd4c9b..303e420f91e86 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -317,11 +317,19 @@ AsyncConnectionImpl::ReadObject(ReadObjectParams p) { CreateHashFunction(*current)); auto hash_validator = CreateHashValidator(p.request, *current); + absl::optional requested_length; + if (p.request.read_limit() > 0) { + requested_length = p.request.read_limit(); + } + auto bucket = p.request.bucket(); + auto object = p.request.object(); + auto connection_factory = MakeReaderConnectionFactory( std::move(current), std::move(p.request), hash_function); auto connection = std::make_unique( std::move(resume_policy), std::move(hash_function), - std::move(hash_validator), std::move(connection_factory)); + std::move(hash_validator), std::move(connection_factory), + requested_length, std::move(bucket), std::move(object)); return make_ready_future(ReturnType(std::move(connection))); } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 4630c8f3caa68..37ea1e49a5d03 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -165,8 +165,16 @@ std::unique_ptr ObjectDescriptorImpl::Read( auto hash_function = CreateHashFunction(is_full_read); auto hash_validator = CreateHashValidator(is_full_read); - auto range = std::make_shared(p.start, p.length, hash_function, - std::move(hash_validator)); + absl::optional limit; + if (p.start < 0) { + limit = -p.start; + } else if (p.length > 0) { + limit = p.length; + } + + auto range = std::make_shared( + p.start, limit, hash_function, std::move(hash_validator), + read_object_spec_.bucket(), read_object_spec_.object()); std::unique_lock lk(mu_); if (stream_manager_->Empty()) { @@ -332,13 +340,17 @@ void ObjectDescriptorImpl::OnRead( // Release the lock while notifying the ranges. The notifications may trigger // application code, and that code may callback on this class. lk.unlock(); + bool is_transcoded = false; + if (metadata_.has_value()) { + is_transcoded = metadata_->content_encoding() == "gzip"; + } for (auto& range_data : *response->mutable_object_data_ranges()) { auto id = range_data.read_range().read_id(); auto const l = copy.find(id); if (l == copy.end()) continue; // TODO(#15104) - Consider returning if the range is done, and then // skipping CleanupDoneRanges(). - l->second->OnRead(std::move(range_data)); + l->second->OnRead(std::move(range_data), is_transcoded); } lk.lock(); stream_manager_->CleanupDoneRanges(it); diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_test.cc index 3523ba082f959..3788e06475094 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_test.cc @@ -39,7 +39,7 @@ TEST(ObjectDescriptorReader, Basic) { range_end: false )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); - impl->OnRead(std::move(data)); + impl->OnRead(std::move(data), /*is_transcoded=*/false); auto actual = tested.Read().get(); EXPECT_THAT(actual, diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc index 5b759637e3596..98dc46eb9045a 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc @@ -54,7 +54,7 @@ TEST(ObjectDescriptorReaderTracing, Read) { range_end: false )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); - impl->OnRead(std::move(data)); + impl->OnRead(std::move(data), /*is_transcoded=*/false); auto actual = reader->Read().get(); auto spans = span_catcher->GetSpans(); diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index 9141b012a33d6..bc6dd29adfac4 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -36,6 +36,10 @@ absl::optional ReadRange::RangeForResume( range.set_read_id(read_id); std::lock_guard lk(mu_); if (status_.has_value()) return absl::nullopt; + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ >= static_cast(*requested_length_)) { + return absl::nullopt; + } range.set_read_offset(offset_); range.set_read_length(length_); return range; @@ -68,8 +72,10 @@ void ReadRange::OnFinish(Status status) { p.set_value(*status_); } -void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { +void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, + bool is_transcoded) { std::unique_lock lk(mu_); + is_transcoded_ = is_transcoded_ || is_transcoded; if (status_) return; auto* check_summed_data = data.mutable_checksummed_data(); auto content = StealMutableContent(*data.mutable_checksummed_data()); @@ -81,13 +87,16 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { } offset_ += content.size(); + received_bytes_ += content.size(); if (length_ != 0) length_ -= std::min(content.size(), length_); auto p = ReadPayloadImpl::Make(std::move(content)); + CheckOverrun(); + if (data.range_end()) { status_ = Status{}; auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (result.is_mismatch) { + if (result.is_mismatch && !is_transcoded_) { status_ = google::cloud::internal::DataLossError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", @@ -105,6 +114,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { payload_ = std::move(p); } +void ReadRange::CheckOverrun() { + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ > static_cast(*requested_length_) && + !is_transcoded_ && !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " + << (received_bytes_ - *requested_length_) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ + << "\""; + } +} + void ReadRange::Notify(std::unique_lock lk, storage::ReadPayload p) { auto wait = std::move(*wait_); diff --git a/google/cloud/storage/internal/async/read_range.h b/google/cloud/storage/internal/async/read_range.h index b315e36b01ea8..72da0c4c7d414 100644 --- a/google/cloud/storage/internal/async/read_range.h +++ b/google/cloud/storage/internal/async/read_range.h @@ -45,15 +45,32 @@ class ReadRange { public: using ReadResponse = storage::AsyncReaderConnection::ReadResponse; - ReadRange(std::int64_t offset, std::int64_t length, + ReadRange(std::int64_t offset, absl::optional requested_length, + std::shared_ptr hash_function, + std::unique_ptr hash_validator, + std::string bucket_name, std::string object_name) + : offset_(offset), + requested_length_(requested_length), + length_(requested_length.value_or(0)), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)), + hash_function_(std::move(hash_function)), + hash_validator_(std::move(hash_validator)) {} + + ReadRange(std::int64_t offset, absl::optional requested_length, std::shared_ptr hash_function = storage::internal::CreateNullHashFunction(), std::unique_ptr hash_validator = storage::internal::CreateNullHashValidator()) - : offset_(offset), - length_(length), - hash_function_(std::move(hash_function)), - hash_validator_(std::move(hash_validator)) {} + : ReadRange(offset, requested_length, std::move(hash_function), + std::move(hash_validator), {}, {}) {} + + ReadRange(std::int64_t offset, absl::optional requested_length, + std::string bucket_name, std::string object_name) + : ReadRange(offset, requested_length, + storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + std::move(bucket_name), std::move(object_name)) {} bool IsDone() const; @@ -63,14 +80,21 @@ class ReadRange { future Read(); void OnFinish(Status status); - void OnRead(google::storage::v2::ObjectRangeData data); + void OnRead(google::storage::v2::ObjectRangeData data, bool is_transcoded = false); private: void Notify(std::unique_lock lk, storage::ReadPayload p); + void CheckOverrun(); mutable std::mutex mu_; std::int64_t offset_; + absl::optional requested_length_; std::int64_t length_; + std::string bucket_name_; + std::string object_name_; + std::size_t received_bytes_ = 0; + bool is_transcoded_ = false; + bool logged_warning_ = false; absl::optional payload_; absl::optional status_; absl::optional> wait_; diff --git a/google/cloud/storage/internal/async/read_range_test.cc b/google/cloud/storage/internal/async/read_range_test.cc index a28cbd9b10c49..ec37d36cd80cd 100644 --- a/google/cloud/storage/internal/async/read_range_test.cc +++ b/google/cloud/storage/internal/async/read_range_test.cc @@ -22,6 +22,7 @@ #include "google/cloud/storage/testing/mock_hash_function.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/status_matchers.h" +#include "google/cloud/testing_util/scoped_log.h" #include "absl/strings/cord.h" #include "absl/strings/string_view.h" #include @@ -43,6 +44,10 @@ using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; +using ::google::cloud::testing_util::ScopedLog; +using ::testing::HasSubstr; +using ::testing::VariantWith; +using ::testing::_; using ::google::protobuf::TextFormat; using ::testing::_; using ::testing::An; @@ -357,6 +362,224 @@ TEST(ReadRange, FullObjectChecksumValidationSuccessComposite) { EXPECT_THAT(next_read.get(), VariantWith(IsOk())); } +TEST(ReadRange, OverrunLogging) { + ScopedLog log; + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, ReadLastLessIndexNoLogging) { + ScopedLog log; + // Simulating ReadLast(100) where GCS returns 50 bytes (e.g. object size is 50) + ReadRange actual(-100, 100, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { + content: "01234567890123456789012345678901234567890123456789" + } + read_range { read_offset: -100 read_length: 100 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(ReadRange, ReadLastOvershootLogging) { + ScopedLog log; + // Simulating ReadLast(50) where GCS returns 60 bytes (overshoot by 10) + ReadRange actual(-50, 50, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { + content: "012345678901234567890123456789012345678901234567890123456789" + } + read_range { read_offset: -50 read_length: 50 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, TranscodingSuppressesWarning) { + ScopedLog log; + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data), /*is_transcoded=*/true); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(ReadRange, ZeroLengthOverrunLogging) { + ScopedLog log; + // Pass 0 as the limit (not nullopt, which would mean read to end). + ReadRange actual(0, absl::make_optional(0), "my-bucket", + "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "1" } + read_range { read_offset: 0 read_length: 0 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 1 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, ReadLastOverrunLogging) { + ScopedLog log; + // ReadLast(5) is represented by start = -5, limit = 5. + ReadRange actual(-5, absl::make_optional(5), "my-bucket", + "my-object"); + + // GCS returns 10 bytes (overrun of 5 bytes) + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" } + read_range { read_offset: -5 read_length: 5 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, TranscodingIgnoresChecksumMismatch) { + auto hash_function = std::make_shared(); + auto hash_validator = std::make_unique(); + + storage::internal::HashValues expected_hashes; + expected_hashes.crc32c = "wrong-crc32c"; + hash_validator->ProcessHashValues(expected_hashes); + + ReadRange actual(0, 10, hash_function, std::move(hash_validator), "my-bucket", + "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" crc32c: 576848900 } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + // Pass is_transcoded = true to suppress the checksum mismatch error! + actual.OnRead(std::move(data), /*is_transcoded=*/true); + + EXPECT_TRUE(actual.IsDone()); + EXPECT_TRUE(pending.is_ready()); + EXPECT_THAT(pending.get(), VariantWith(_)); + + // The second read should return Status::OK instead of DataLossError! + auto next_read = actual.Read(); + EXPECT_TRUE(next_read.is_ready()); + EXPECT_THAT(next_read.get(), VariantWith(IsOk())); +} + +TEST(ReadRange, NoResumeIfRequestSatisfied) { + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + // The range is fully satisfied (10 bytes received for 10-byte request) + // RangeForResume should return nullopt to prevent resuming! + auto resume = actual.RangeForResume(42); + EXPECT_FALSE(resume.has_value()); +} + +TEST(ReadRange, NoResumeIfRequestExceeded) { + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + // The range is exceeded (15 bytes received for 10-byte request) + // RangeForResume should return nullopt to prevent resuming! + auto resume = actual.RangeForResume(42); + EXPECT_FALSE(resume.has_value()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index 8d7bd62e5c058..a883d0d3848f6 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -15,6 +15,7 @@ #include "google/cloud/storage/internal/async/reader_connection_resume.h" #include "google/cloud/storage/internal/async/read_payload_impl.h" #include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" #include "absl/strings/str_cat.h" namespace google { @@ -61,16 +62,23 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { ReadPayloadImpl::GetObjectHashes(response).value_or( storage::internal::HashValues{})); received_bytes_ += response.size(); - if (response.metadata().has_value() && !generation_.has_value()) { - generation_ = storage::Generation(response.metadata()->generation()); + total_received_bytes_ += response.size(); + if (response.metadata().has_value()) { + if (!generation_.has_value()) { + generation_ = storage::Generation(response.metadata()->generation()); + } + if (response.metadata()->content_encoding() == "gzip") { + is_transcoded_ = true; + } } + CheckOverrun(); return make_ready_future(ReadResponse(std::move(response))); } auto const& status = absl::get(r); if (status.ok()) { // The download finished. Validate the hash results, if any. auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (!result.is_mismatch) return make_ready_future(std::move(r)); + if (!result.is_mismatch || is_transcoded_) return make_ready_future(std::move(r)); return make_ready_future(ReadResponse(internal::InvalidArgumentError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", @@ -78,6 +86,10 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { FormatComputedHashes(result), "}"), GCP_ERROR_INFO()))); } + if (requested_length_.has_value() && *requested_length_ >= 0 && + total_received_bytes_ >= *requested_length_) { + return make_ready_future(ReadResponse(Status())); + } if (resume_policy_->OnFinish(status) == ResumePolicy::kStop) { return make_ready_future(std::move(r)); } @@ -87,6 +99,10 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { future AsyncReaderConnectionResume::Reconnect() { // Capturing `this` is safe here. See the comments in the implementation of // `Read()` for details. + if (impl_ != nullptr && requested_length_.has_value() && *requested_length_ >= 0 && + total_received_bytes_ >= *requested_length_) { + return make_ready_future(ReadResponse(Status())); + } return reader_factory_(generation_, received_bytes_).then([this](auto f) { return OnResume(f.get()); }); @@ -113,6 +129,19 @@ AsyncReaderConnectionResume::CurrentImpl() { return CurrentImpl(std::unique_lock(mu_)); } +void AsyncReaderConnectionResume::CheckOverrun() { + if (requested_length_.has_value() && *requested_length_ >= 0 && + total_received_bytes_ > *requested_length_ && + !is_transcoded_ && !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " + << (total_received_bytes_ - *requested_length_) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ + << "\""; + } +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/reader_connection_resume.h b/google/cloud/storage/internal/async/reader_connection_resume.h index 3b91f52362d55..a1979ea072cf1 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.h +++ b/google/cloud/storage/internal/async/reader_connection_resume.h @@ -41,11 +41,16 @@ class AsyncReaderConnectionResume : public storage::AsyncReaderConnection { std::unique_ptr resume_policy, std::shared_ptr hash, std::unique_ptr validator, - AsyncReaderConnectionFactory reader_factory) + AsyncReaderConnectionFactory reader_factory, + absl::optional requested_length = absl::nullopt, + std::string bucket_name = {}, std::string object_name = {}) : resume_policy_(std::move(resume_policy)), hash_function_(std::move(hash)), hash_validator_(std::move(validator)), - reader_factory_(std::move(reader_factory)) {} + reader_factory_(std::move(reader_factory)), + requested_length_(requested_length), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)) {} void Cancel() override; @@ -61,13 +66,20 @@ class AsyncReaderConnectionResume : public storage::AsyncReaderConnection { std::shared_ptr CurrentImpl( std::unique_lock const&); std::shared_ptr CurrentImpl(); + void CheckOverrun(); std::unique_ptr resume_policy_; std::shared_ptr hash_function_; std::unique_ptr hash_validator_; AsyncReaderConnectionFactory reader_factory_; + absl::optional requested_length_; + std::string bucket_name_; + std::string object_name_; + bool is_transcoded_ = false; + bool logged_warning_ = false; storage::Generation generation_; std::int64_t received_bytes_ = 0; + std::int64_t total_received_bytes_ = 0; std::mutex mu_; std::shared_ptr impl_; }; diff --git a/google/cloud/storage/internal/async/reader_connection_resume_test.cc b/google/cloud/storage/internal/async/reader_connection_resume_test.cc index 1352a872c5875..5a5adc6ad88a9 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume_test.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume_test.cc @@ -27,6 +27,7 @@ #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_async_streaming_read_rpc.h" #include "google/cloud/testing_util/status_matchers.h" +#include "google/cloud/testing_util/scoped_log.h" #include #include @@ -45,6 +46,8 @@ using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; +using ::google::cloud::testing_util::ScopedLog; +using ::testing::HasSubstr; using ::testing::_; using ::testing::AllOf; using ::testing::AtMost; @@ -498,6 +501,301 @@ TEST(AsyncReaderConnectionResume, StopAfterTooManyReconnects) { EXPECT_THAT(metadata.trailers, IsEmpty()); } +TEST(AsyncReaderConnectionResume, OverrunLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read).WillOnce([] { + // Return 15 bytes in one payload + return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, ReadLastLessIndexNoLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(50, '1')))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + // ReadLast(100) + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/100, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(50, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read).WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(60, '1')))); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + // ReadLast(50) + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/50, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(60, '1'))); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, TranscodingSuppressesWarning) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read).WillOnce([] { + auto payload = ReadPayload(std::string(15, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read).WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload("1"))); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/0, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(1, '1'))); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 1 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, TranscodingIgnoresChecksumMismatch) { + auto hash_function = std::make_shared(); + EXPECT_CALL(*hash_function, Finish) + .WillOnce(Return(storage::internal::HashValues{"crc32c", "md5"})); + + auto hash_validator = std::make_unique(); + EXPECT_CALL(*hash_validator, ProcessHashValues).Times(1); + EXPECT_CALL(std::move(*hash_validator), Finish) + .WillOnce([](storage::internal::HashValues const&) { + return storage::internal::HashValidator::Result{ + /*.received=*/{"wrong-crc32c", "wrong-md5"}, + /*.computed=*/{"crc32c", "md5"}, + /*.is_mismatch=*/true}; + }); + + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + auto payload = ReadPayload(std::string(10, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), std::move(hash_function), + std::move(hash_validator), mock_factory.AsStdFunction(), + /*requested_length=*/10, "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return Status::OK instead of DataLoss! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + +TEST(AsyncReaderConnectionResume, NoResumeIfRequestSatisfied) { + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(10, '1')))); + }) + .WillOnce([] { + // Simulate transient error to trigger reconnection + return make_ready_future(ReadResponse(TransientError())); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + // Since we received 10 bytes for a 10-byte request, we should NOT attempt to reconnect! + // The factory's Call should NOT be called a second time. + EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 10)).Times(0); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + // Reconnect is bypassed, so OnFinish should NOT be called because we return OK directly. + EXPECT_CALL(*resume_policy, OnFinish).Times(0); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return Status::OK directly instead of TransientError! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + +TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); + }) + .WillOnce([] { + // Simulate transient error to trigger reconnection + return make_ready_future(ReadResponse(TransientError())); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + // Since we received 15 bytes for a 10-byte request, we should NOT attempt to reconnect! + // The factory's Call should NOT be called a second time. + EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 15)).Times(0); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + EXPECT_CALL(*resume_policy, OnFinish).Times(0); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, + "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + // Should return Status::OK directly instead of TransientError! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index 7bc8ae3e27d3b..002002dba1506 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/hash_mismatch_error.h" #include "google/cloud/storage/internal/hash_function.h" #include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" #include #include #include @@ -37,12 +38,96 @@ std::streamoff InitialOffset(ReadObjectRangeRequest const& request) { return request.StartingByte(); } +class OverrunLoggingObjectReadSource : public ObjectReadSource { + public: + OverrunLoggingObjectReadSource( + std::unique_ptr child, + absl::optional requested_length, + std::string bucket_name, + std::string object_name) + : child_(std::move(child)), + requested_length_(requested_length), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)) {} + + ~OverrunLoggingObjectReadSource() override = default; + + bool IsOpen() const override { return child_->IsOpen(); } + + StatusOr Close() override { + auto res = child_->Close(); + CheckOverrun(); + return res; + } + + StatusOr Read(char* buf, std::size_t n) override { + auto res = child_->Read(buf, n); + if (!res) return res; + + received_bytes_ += res->bytes_received; + + // Dynamically learn size for full-object reads + if (res->size.has_value() && !requested_length_.has_value()) { + requested_length_ = *res->size; + } + + // Detect server-side transcoding + if (res->transformation.has_value() && *res->transformation == "gunzipped") { + is_transcoded_ = true; + } + + // Check overrun immediately + if (requested_length_.has_value() && received_bytes_ > static_cast(*requested_length_) && + !is_transcoded_ && !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ << "\""; + } + + return res; + } + + private: + void CheckOverrun() { + if (requested_length_.has_value() && received_bytes_ > static_cast(*requested_length_) && + !is_transcoded_ && !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ << "\""; + } + } + + std::unique_ptr child_; + absl::optional requested_length_; + std::string bucket_name_; + std::string object_name_; + std::size_t received_bytes_ = 0; + bool is_transcoded_ = false; + bool logged_warning_ = false; +}; + +absl::optional ExtractRequestedLength(ReadObjectRangeRequest const& request) { + if (request.HasOption()) { + auto range = request.GetOption().value(); + return range.end - range.begin; + } + if (request.HasOption()) { + auto last = request.GetOption(); + return last.value(); + } + return absl::nullopt; +} + } // namespace ObjectReadStreambuf::ObjectReadStreambuf( ReadObjectRangeRequest const& request, std::unique_ptr source) - : source_(std::move(source)), + : source_(std::make_unique( + std::move(source), ExtractRequestedLength(request), + request.bucket_name(), request.object_name())), source_pos_(InitialOffset(request)), hash_function_(CreateHashFunction(request)), hash_validator_(CreateHashValidator(request)) {} diff --git a/google/cloud/storage/internal/object_read_streambuf_test.cc b/google/cloud/storage/internal/object_read_streambuf_test.cc index 47f2fc144364d..46348b1bdddae 100644 --- a/google/cloud/storage/internal/object_read_streambuf_test.cc +++ b/google/cloud/storage/internal/object_read_streambuf_test.cc @@ -14,6 +14,7 @@ #include "google/cloud/storage/internal/object_read_streambuf.h" #include "google/cloud/storage/testing/mock_client.h" +#include "google/cloud/testing_util/scoped_log.h" #include #include #include @@ -26,6 +27,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { namespace { +using ::google::cloud::testing_util::ScopedLog; +using ::testing::HasSubstr; using ::testing::Return; TEST(ObjectReadStreambufTest, FailedTellg) { @@ -173,6 +176,194 @@ TEST(ObjectReadStreambufTest, WrongSeek) { EXPECT_TRUE(stream.fail()); } +TEST(ObjectReadStreambufTest, OverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{25, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 20)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(25); + stream.read(v.data(), 25); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, OverrunLoggingCharByChar) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{15, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 10)), + std::move(read_source)); + + std::istream stream(&buf); + + // Read character by character + int count = 0; + while (stream.get() != EOF) { + count++; + } + EXPECT_EQ(count, 15); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, ReadLastLessIndexNoLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{50, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadLast(100)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(100); + stream.read(v.data(), 100); + + buf.Close(); + EXPECT_TRUE(log.ExtractLines().empty()); +} + +TEST(ObjectReadStreambufTest, ReadLastOvershootLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{60, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf( + ReadObjectRangeRequest("my-bucket", "my-object").set_option(ReadLast(50)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(100); + stream.read(v.data(), 100); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, EmptyRangeOverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{5, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(10, 10)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(10); + stream.read(v.data(), 10); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, TranscodingSuppressesWarning) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + + ReadSourceResult result{15, {}}; + result.transformation = "gunzipped"; + + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(result)) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 10)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(20); + stream.read(v.data(), 20); + + buf.Close(); + EXPECT_TRUE(log.ExtractLines().empty()); +} + +TEST(ObjectReadStreambufTest, FullObjectReadOverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + + // GCS returns 15 bytes, and tells us the object size is 10 bytes (in metadata) + ReadSourceResult result{15, {}}; + result.size = 10; + + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(result)) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + // Full object read request (no range options) + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object"), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(20); + stream.read(v.data(), 20); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + } // namespace } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END From bfd91135c7ad47335080d1b334a250f23f231558 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 10:56:48 +0000 Subject: [PATCH 2/8] address code review feedback on overrun logging - Fix data race on metadata_ in ObjectDescriptorImpl::OnRead by extracting is_transcoded status before unlocking the mutex. - Fix data race on impl_ in AsyncReaderConnectionResume::Reconnect by calling CurrentImpl() to safely acquire the lock. - Fix potential signed integer negation overflow undefined behavior in ObjectDescriptorImpl::Read by explicitly checking for numeric_limits min() and capping the limit. - De-duplicate overrun check logic in OverrunLoggingObjectReadSource by calling CheckOverrun() directly in Read(). - Fix potential signed-to-unsigned cast wrap-around bug in CheckOverrun() by checking that requested_length_ is non-negative before casting. --- .../storage/internal/async/object_descriptor_impl.cc | 11 +++++++---- .../internal/async/reader_connection_resume.cc | 2 +- .../cloud/storage/internal/object_read_streambuf.cc | 11 +++-------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 37ea1e49a5d03..9f42828fea20e 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -26,6 +26,7 @@ #include "google/cloud/grpc_error_delegate.h" #include "google/cloud/internal/opentelemetry.h" #include "google/rpc/status.pb.h" +#include #include #include @@ -167,7 +168,11 @@ std::unique_ptr ObjectDescriptorImpl::Read( absl::optional limit; if (p.start < 0) { - limit = -p.start; + if (p.start == (std::numeric_limits::min)()) { + limit = (std::numeric_limits::max)(); + } else { + limit = -p.start; + } } else if (p.length > 0) { limit = p.length; } @@ -337,13 +342,11 @@ void ObjectDescriptorImpl::OnRead( std::move(*response->mutable_read_handle()); } auto copy = it->active_ranges; - // Release the lock while notifying the ranges. The notifications may trigger - // application code, and that code may callback on this class. - lk.unlock(); bool is_transcoded = false; if (metadata_.has_value()) { is_transcoded = metadata_->content_encoding() == "gzip"; } + lk.unlock(); for (auto& range_data : *response->mutable_object_data_ranges()) { auto id = range_data.read_range().read_id(); auto const l = copy.find(id); diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index a883d0d3848f6..5dc9962697c1f 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -99,7 +99,7 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { future AsyncReaderConnectionResume::Reconnect() { // Capturing `this` is safe here. See the comments in the implementation of // `Read()` for details. - if (impl_ != nullptr && requested_length_.has_value() && *requested_length_ >= 0 && + if (CurrentImpl() != nullptr && requested_length_.has_value() && *requested_length_ >= 0 && total_received_bytes_ >= *requested_length_) { return make_ready_future(ReadResponse(Status())); } diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index 002002dba1506..13300d9522637 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -77,20 +77,15 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { } // Check overrun immediately - if (requested_length_.has_value() && received_bytes_ > static_cast(*requested_length_) && - !is_transcoded_ && !logged_warning_) { - logged_warning_ = true; - GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) - << " more bytes than requested from GCS for bucket \"" - << bucket_name_ << "\", object \"" << object_name_ << "\""; - } + CheckOverrun(); return res; } private: void CheckOverrun() { - if (requested_length_.has_value() && received_bytes_ > static_cast(*requested_length_) && + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ > static_cast(*requested_length_) && !is_transcoded_ && !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) From 37a3cef3738b4e8f3071a058c51cbb4795d1f2f0 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 11:12:32 +0000 Subject: [PATCH 3/8] security(storage): fix transcoding checksum bypass vulnerability - Dynamically verify if server-side decompressive transcoding actually occurred by comparing total received bytes to the object's original stored size (when available). - Prevents silently ignoring real data corruption when a user explicitly requests compressed gzip-encoded bytes and receives a corrupted file. - Falls back to the standard is_transcoded flag if the object size is unknown (maintaining 100% backward compatibility and preventing test breakages). - Applied this robust check to both ReadRange (async BiDi range reads) and AsyncReaderConnectionResume (async streaming reader). - Added a new unit test 'TranscodingFailsOnRealChecksumMismatch' to verify that a checksum mismatch is correctly reported as an error if the received size matches the compressed object size. - Resolved a compiler signedness warning in read_range.cc by casting object_size to std::size_t. - Made the 'logged_warning_' flag in ObjectReadStreambuf a std::atomic to ensure absolute thread-safety. --- .../internal/async/object_descriptor_impl.cc | 4 +- .../storage/internal/async/read_range.cc | 9 +++- .../cloud/storage/internal/async/read_range.h | 4 +- .../async/reader_connection_resume.cc | 6 ++- .../internal/async/reader_connection_resume.h | 1 + .../async/reader_connection_resume_test.cc | 49 +++++++++++++++++++ .../storage/internal/object_read_streambuf.cc | 3 +- 7 files changed, 70 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 9f42828fea20e..35f09b9bb6dd1 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -343,8 +343,10 @@ void ObjectDescriptorImpl::OnRead( } auto copy = it->active_ranges; bool is_transcoded = false; + absl::optional object_size; if (metadata_.has_value()) { is_transcoded = metadata_->content_encoding() == "gzip"; + object_size = metadata_->size(); } lk.unlock(); for (auto& range_data : *response->mutable_object_data_ranges()) { @@ -353,7 +355,7 @@ void ObjectDescriptorImpl::OnRead( if (l == copy.end()) continue; // TODO(#15104) - Consider returning if the range is done, and then // skipping CleanupDoneRanges(). - l->second->OnRead(std::move(range_data), is_transcoded); + l->second->OnRead(std::move(range_data), is_transcoded, object_size); } lk.lock(); stream_manager_->CleanupDoneRanges(it); diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index bc6dd29adfac4..d210a1a3c3679 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -73,9 +73,11 @@ void ReadRange::OnFinish(Status status) { } void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, - bool is_transcoded) { + bool is_transcoded, + absl::optional object_size) { std::unique_lock lk(mu_); is_transcoded_ = is_transcoded_ || is_transcoded; + if (object_size.has_value()) object_size_ = object_size; if (status_) return; auto* check_summed_data = data.mutable_checksummed_data(); auto content = StealMutableContent(*data.mutable_checksummed_data()); @@ -96,7 +98,10 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, if (data.range_end()) { status_ = Status{}; auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (result.is_mismatch && !is_transcoded_) { + bool transcoded_download = is_transcoded_ && + (!object_size_.has_value() || + (received_bytes_ != static_cast(*object_size_))); + if (result.is_mismatch && !transcoded_download) { status_ = google::cloud::internal::DataLossError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", diff --git a/google/cloud/storage/internal/async/read_range.h b/google/cloud/storage/internal/async/read_range.h index 72da0c4c7d414..84ee3a79a50cc 100644 --- a/google/cloud/storage/internal/async/read_range.h +++ b/google/cloud/storage/internal/async/read_range.h @@ -80,7 +80,8 @@ class ReadRange { future Read(); void OnFinish(Status status); - void OnRead(google::storage::v2::ObjectRangeData data, bool is_transcoded = false); + void OnRead(google::storage::v2::ObjectRangeData data, bool is_transcoded = false, + absl::optional object_size = absl::nullopt); private: void Notify(std::unique_lock lk, storage::ReadPayload p); @@ -95,6 +96,7 @@ class ReadRange { std::size_t received_bytes_ = 0; bool is_transcoded_ = false; bool logged_warning_ = false; + absl::optional object_size_; absl::optional payload_; absl::optional status_; absl::optional> wait_; diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index 5dc9962697c1f..4cb217c3c1bd9 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -67,6 +67,7 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { if (!generation_.has_value()) { generation_ = storage::Generation(response.metadata()->generation()); } + object_size_ = response.metadata()->size(); if (response.metadata()->content_encoding() == "gzip") { is_transcoded_ = true; } @@ -78,7 +79,10 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { if (status.ok()) { // The download finished. Validate the hash results, if any. auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (!result.is_mismatch || is_transcoded_) return make_ready_future(std::move(r)); + bool transcoded_download = is_transcoded_ && + (!object_size_.has_value() || + (total_received_bytes_ != *object_size_)); + if (!result.is_mismatch || transcoded_download) return make_ready_future(std::move(r)); return make_ready_future(ReadResponse(internal::InvalidArgumentError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", diff --git a/google/cloud/storage/internal/async/reader_connection_resume.h b/google/cloud/storage/internal/async/reader_connection_resume.h index a1979ea072cf1..02e2655a29a98 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.h +++ b/google/cloud/storage/internal/async/reader_connection_resume.h @@ -77,6 +77,7 @@ class AsyncReaderConnectionResume : public storage::AsyncReaderConnection { std::string object_name_; bool is_transcoded_ = false; bool logged_warning_ = false; + absl::optional object_size_; storage::Generation generation_; std::int64_t received_bytes_ = 0; std::int64_t total_received_bytes_ = 0; diff --git a/google/cloud/storage/internal/async/reader_connection_resume_test.cc b/google/cloud/storage/internal/async/reader_connection_resume_test.cc index 5a5adc6ad88a9..2bad5387d9856 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume_test.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume_test.cc @@ -796,6 +796,55 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); } +TEST(AsyncReaderConnectionResume, TranscodingFailsOnRealChecksumMismatch) { + auto hash_function = std::make_shared(); + EXPECT_CALL(*hash_function, Finish) + .WillOnce(Return(storage::internal::HashValues{"crc32c", "md5"})); + + auto hash_validator = std::make_unique(); + EXPECT_CALL(*hash_validator, ProcessHashValues).Times(1); + EXPECT_CALL(std::move(*hash_validator), Finish) + .WillOnce([](storage::internal::HashValues const&) { + return storage::internal::HashValidator::Result{ + /*.received=*/{"wrong-crc32c", "wrong-md5"}, + /*.computed=*/{"crc32c", "md5"}, + /*.is_mismatch=*/true}; + }); + + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + auto payload = ReadPayload(std::string(10, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + metadata.set_size(10); // Set size equal to received bytes! + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), std::move(hash_function), + std::move(hash_validator), mock_factory.AsStdFunction(), + /*requested_length=*/10, "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return InvalidArgumentError (mismatched checksums)! + EXPECT_THAT(tested.Read().get(), VariantWith(StatusIs(StatusCode::kInvalidArgument))); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index 13300d9522637..fd531dab5b854 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -18,6 +18,7 @@ #include "google/cloud/internal/make_status.h" #include "google/cloud/log.h" #include +#include #include #include #include @@ -100,7 +101,7 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { std::string object_name_; std::size_t received_bytes_ = 0; bool is_transcoded_ = false; - bool logged_warning_ = false; + std::atomic logged_warning_{false}; }; absl::optional ExtractRequestedLength(ReadObjectRangeRequest const& request) { From 1682aceb6818cf229acbf33ae7525dda2d1a810e Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 14:04:05 +0000 Subject: [PATCH 4/8] chore(storage): align overrun logging with Go client deferred pattern - Refactored overrun warning logs to print once at the end of the download (stream Close or completion/failure) rather than immediately on the first overrunning chunk. - Aligns the C++ GCS client's logging behavior with the official Google Cloud Go client (cloud.google.com/go/storage). - Ensures that the warning log reports the true, final total overrun byte count instead of a partial overrun count from the first chunk. - Sync Path: Removed CheckOverrun from the hot Read() path in ObjectReadStreambuf, moving it to Close() and the decorator destructor (~OverrunLoggingObjectReadSource). - Async Range Path: Deferred CheckOverrun in ReadRange to the final range_end and OnFinish paths. - Async Resume Path: Deferred CheckOverrun in AsyncReaderConnectionResume to the final status.ok, satisfied, and stop paths. - Updated all unit tests in object_read_streambuf_test.cc, read_range_test.cc, and reader_connection_resume_test.cc to drive the streams to completion (EOF/Close) before verifying the logs. --- .../storage/internal/async/read_range.cc | 4 +-- .../storage/internal/async/read_range_test.cc | 8 ++--- .../async/reader_connection_resume.cc | 4 ++- .../async/reader_connection_resume_test.cc | 34 +++++++++++++------ .../storage/internal/object_read_streambuf.cc | 7 ++-- .../internal/object_read_streambuf_test.cc | 2 ++ 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index d210a1a3c3679..b708d15857642 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -65,6 +65,7 @@ void ReadRange::OnFinish(Status status) { std::unique_lock lk(mu_); if (status_) return; status_ = std::move(status); + CheckOverrun(); if (!wait_) return; auto p = std::move(*wait_); wait_.reset(); @@ -93,10 +94,9 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, if (length_ != 0) length_ -= std::min(content.size(), length_); auto p = ReadPayloadImpl::Make(std::move(content)); - CheckOverrun(); - if (data.range_end()) { status_ = Status{}; + CheckOverrun(); auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); bool transcoded_download = is_transcoded_ && (!object_size_.has_value() || diff --git a/google/cloud/storage/internal/async/read_range_test.cc b/google/cloud/storage/internal/async/read_range_test.cc index ec37d36cd80cd..34ed8b32de8e9 100644 --- a/google/cloud/storage/internal/async/read_range_test.cc +++ b/google/cloud/storage/internal/async/read_range_test.cc @@ -370,7 +370,7 @@ TEST(ReadRange, OverrunLogging) { auto constexpr kData0 = R"pb( checksummed_data { content: "012345678901234" } read_range { read_offset: 0 read_length: 10 read_id: 7 } - range_end: false + range_end: true )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); @@ -419,7 +419,7 @@ TEST(ReadRange, ReadLastOvershootLogging) { content: "012345678901234567890123456789012345678901234567890123456789" } read_range { read_offset: -50 read_length: 50 read_id: 7 } - range_end: false + range_end: true )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); @@ -464,7 +464,7 @@ TEST(ReadRange, ZeroLengthOverrunLogging) { auto constexpr kData0 = R"pb( checksummed_data { content: "1" } read_range { read_offset: 0 read_length: 0 read_id: 7 } - range_end: false + range_end: true )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); @@ -491,7 +491,7 @@ TEST(ReadRange, ReadLastOverrunLogging) { auto constexpr kData0 = R"pb( checksummed_data { content: "0123456789" } read_range { read_offset: -5 read_length: 5 read_id: 7 } - range_end: false + range_end: true )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index 4cb217c3c1bd9..8bb80021bccab 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -72,11 +72,11 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { is_transcoded_ = true; } } - CheckOverrun(); return make_ready_future(ReadResponse(std::move(response))); } auto const& status = absl::get(r); if (status.ok()) { + CheckOverrun(); // The download finished. Validate the hash results, if any. auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); bool transcoded_download = is_transcoded_ && @@ -92,9 +92,11 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { } if (requested_length_.has_value() && *requested_length_ >= 0 && total_received_bytes_ >= *requested_length_) { + CheckOverrun(); return make_ready_future(ReadResponse(Status())); } if (resume_policy_->OnFinish(status) == ResumePolicy::kStop) { + CheckOverrun(); return make_ready_future(std::move(r)); } return Reconnect(); diff --git a/google/cloud/storage/internal/async/reader_connection_resume_test.cc b/google/cloud/storage/internal/async/reader_connection_resume_test.cc index 2bad5387d9856..878b56685cc82 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume_test.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume_test.cc @@ -506,10 +506,13 @@ TEST(AsyncReaderConnectionResume, OverrunLogging) { MockAsyncReaderConnectionFactory mock_factory; EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read).WillOnce([] { - // Return 15 bytes in one payload - return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); - }); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -526,6 +529,7 @@ TEST(AsyncReaderConnectionResume, OverrunLogging) { EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(15, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); auto lines = log.ExtractLines(); EXPECT_EQ(lines.size(), 1); @@ -576,9 +580,13 @@ TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { MockAsyncReaderConnectionFactory mock_factory; EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read).WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(60, '1')))); - }); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload(std::string(60, '1')))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -596,6 +604,7 @@ TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(60, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); auto lines = log.ExtractLines(); EXPECT_EQ(lines.size(), 1); @@ -644,9 +653,13 @@ TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { MockAsyncReaderConnectionFactory mock_factory; EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read).WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload("1"))); - }); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future(ReadResponse(ReadPayload("1"))); + }) + .WillOnce([] { + return make_ready_future(ReadResponse(Status{})); + }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -663,6 +676,7 @@ TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(1, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); auto lines = log.ExtractLines(); EXPECT_EQ(lines.size(), 1); diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index fd531dab5b854..13fa3d10b4d4e 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -51,7 +51,9 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { bucket_name_(std::move(bucket_name)), object_name_(std::move(object_name)) {} - ~OverrunLoggingObjectReadSource() override = default; + ~OverrunLoggingObjectReadSource() override { + CheckOverrun(); + } bool IsOpen() const override { return child_->IsOpen(); } @@ -77,9 +79,6 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { is_transcoded_ = true; } - // Check overrun immediately - CheckOverrun(); - return res; } diff --git a/google/cloud/storage/internal/object_read_streambuf_test.cc b/google/cloud/storage/internal/object_read_streambuf_test.cc index 46348b1bdddae..5c806e989de4b 100644 --- a/google/cloud/storage/internal/object_read_streambuf_test.cc +++ b/google/cloud/storage/internal/object_read_streambuf_test.cc @@ -192,6 +192,7 @@ TEST(ObjectReadStreambufTest, OverrunLogging) { std::vector v(25); stream.read(v.data(), 25); + buf.Close(); auto lines = log.ExtractLines(); EXPECT_EQ(lines.size(), 1); EXPECT_THAT( @@ -223,6 +224,7 @@ TEST(ObjectReadStreambufTest, OverrunLoggingCharByChar) { } EXPECT_EQ(count, 15); + buf.Close(); auto lines = log.ExtractLines(); EXPECT_EQ(lines.size(), 1); EXPECT_THAT( From 9b4616b3005574c5af2d0191bb4f11387ea61eac Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 14:57:20 +0000 Subject: [PATCH 5/8] chore(storage): format code with clang-format to pass CI style checks --- .../storage/internal/async/connection_impl.cc | 21 +++-- .../storage/internal/async/read_range.cc | 10 +- .../cloud/storage/internal/async/read_range.h | 3 +- .../storage/internal/async/read_range_test.cc | 17 ++-- .../async/reader_connection_resume.cc | 20 ++-- .../async/reader_connection_resume_test.cc | 94 +++++++++---------- .../storage/internal/object_read_streambuf.cc | 25 ++--- .../internal/object_read_streambuf_test.cc | 6 +- 8 files changed, 97 insertions(+), 99 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 303e420f91e86..af7ed6e5882e6 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -452,7 +452,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { return pending.then( [current, request = std::move(p.request), persisted_size, fa = std::move(factory)](auto f) mutable - -> StatusOr> { + -> StatusOr> { return MakeAppendableWriter(current, std::move(request), persisted_size, std::move(fa), f.get()); }); @@ -494,7 +494,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) { return StartUnbufferedUpload(std::move(p)) .then([current = std::move(current), async_write_object = std::move(async_write_object)](auto f) mutable - -> StatusOr> { + -> StatusOr> { auto w = f.get(); if (!w) return std::move(w).status(); auto factory = [upload_id = (*w)->UploadId(), @@ -528,14 +528,15 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) { }; auto f = make_unbuffered(); - return f.then([current = std::move(current), - make_unbuffered = std::move(make_unbuffered)](auto f) mutable - -> StatusOr> { - auto w = f.get(); - if (!w) return std::move(w).status(); - return MakeWriterConnectionBuffered(std::move(make_unbuffered), - *std::move(w), *current); - }); + return f.then( + [current = std::move(current), + make_unbuffered = std::move(make_unbuffered)](auto f) mutable + -> StatusOr> { + auto w = f.get(); + if (!w) return std::move(w).status(); + return MakeWriterConnectionBuffered(std::move(make_unbuffered), + *std::move(w), *current); + }); } future> diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index b708d15857642..f73d18f3f5182 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -98,9 +98,10 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, status_ = Status{}; CheckOverrun(); auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - bool transcoded_download = is_transcoded_ && - (!object_size_.has_value() || - (received_bytes_ != static_cast(*object_size_))); + bool transcoded_download = + is_transcoded_ && + (!object_size_.has_value() || + (received_bytes_ != static_cast(*object_size_))); if (result.is_mismatch && !transcoded_download) { status_ = google::cloud::internal::DataLossError( absl::StrCat("mismatched checksums detected at the end of the " @@ -127,8 +128,7 @@ void ReadRange::CheckOverrun() { GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) << " more bytes than requested from GCS for bucket \"" - << bucket_name_ << "\", object \"" << object_name_ - << "\""; + << bucket_name_ << "\", object \"" << object_name_ << "\""; } } diff --git a/google/cloud/storage/internal/async/read_range.h b/google/cloud/storage/internal/async/read_range.h index 84ee3a79a50cc..4afee370f9244 100644 --- a/google/cloud/storage/internal/async/read_range.h +++ b/google/cloud/storage/internal/async/read_range.h @@ -80,7 +80,8 @@ class ReadRange { future Read(); void OnFinish(Status status); - void OnRead(google::storage::v2::ObjectRangeData data, bool is_transcoded = false, + void OnRead(google::storage::v2::ObjectRangeData data, + bool is_transcoded = false, absl::optional object_size = absl::nullopt); private: diff --git a/google/cloud/storage/internal/async/read_range_test.cc b/google/cloud/storage/internal/async/read_range_test.cc index 34ed8b32de8e9..c1391b2cf41c6 100644 --- a/google/cloud/storage/internal/async/read_range_test.cc +++ b/google/cloud/storage/internal/async/read_range_test.cc @@ -21,8 +21,8 @@ #include "google/cloud/storage/testing/canonical_errors.h" #include "google/cloud/storage/testing/mock_hash_function.h" #include "google/cloud/testing_util/is_proto_equal.h" -#include "google/cloud/testing_util/status_matchers.h" #include "google/cloud/testing_util/scoped_log.h" +#include "google/cloud/testing_util/status_matchers.h" #include "absl/strings/cord.h" #include "absl/strings/string_view.h" #include @@ -43,16 +43,14 @@ using ::google::cloud::storage::testing::MockHashFunction; using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; -using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ScopedLog; -using ::testing::HasSubstr; -using ::testing::VariantWith; -using ::testing::_; +using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; using ::testing::An; using ::testing::AtLeast; using ::testing::ElementsAre; +using ::testing::HasSubstr; using ::testing::Optional; using ::testing::ResultOf; using ::testing::VariantWith; @@ -388,7 +386,8 @@ TEST(ReadRange, OverrunLogging) { TEST(ReadRange, ReadLastLessIndexNoLogging) { ScopedLog log; - // Simulating ReadLast(100) where GCS returns 50 bytes (e.g. object size is 50) + // Simulating ReadLast(100) where GCS returns 50 bytes (e.g. object size is + // 50) ReadRange actual(-100, 100, "my-bucket", "my-object"); auto data = google::storage::v2::ObjectRangeData{}; @@ -508,8 +507,10 @@ TEST(ReadRange, ReadLastOverrunLogging) { } TEST(ReadRange, TranscodingIgnoresChecksumMismatch) { - auto hash_function = std::make_shared(); - auto hash_validator = std::make_unique(); + auto hash_function = + std::make_shared(); + auto hash_validator = + std::make_unique(); storage::internal::HashValues expected_hashes; expected_hashes.crc32c = "wrong-crc32c"; diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index 8bb80021bccab..cd747532c6824 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -79,10 +79,11 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { CheckOverrun(); // The download finished. Validate the hash results, if any. auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - bool transcoded_download = is_transcoded_ && - (!object_size_.has_value() || - (total_received_bytes_ != *object_size_)); - if (!result.is_mismatch || transcoded_download) return make_ready_future(std::move(r)); + bool transcoded_download = + is_transcoded_ && + (!object_size_.has_value() || (total_received_bytes_ != *object_size_)); + if (!result.is_mismatch || transcoded_download) + return make_ready_future(std::move(r)); return make_ready_future(ReadResponse(internal::InvalidArgumentError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", @@ -105,8 +106,8 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { future AsyncReaderConnectionResume::Reconnect() { // Capturing `this` is safe here. See the comments in the implementation of // `Read()` for details. - if (CurrentImpl() != nullptr && requested_length_.has_value() && *requested_length_ >= 0 && - total_received_bytes_ >= *requested_length_) { + if (CurrentImpl() != nullptr && requested_length_.has_value() && + *requested_length_ >= 0 && total_received_bytes_ >= *requested_length_) { return make_ready_future(ReadResponse(Status())); } return reader_factory_(generation_, received_bytes_).then([this](auto f) { @@ -137,14 +138,13 @@ AsyncReaderConnectionResume::CurrentImpl() { void AsyncReaderConnectionResume::CheckOverrun() { if (requested_length_.has_value() && *requested_length_ >= 0 && - total_received_bytes_ > *requested_length_ && - !is_transcoded_ && !logged_warning_) { + total_received_bytes_ > *requested_length_ && !is_transcoded_ && + !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " << (total_received_bytes_ - *requested_length_) << " more bytes than requested from GCS for bucket \"" - << bucket_name_ << "\", object \"" << object_name_ - << "\""; + << bucket_name_ << "\", object \"" << object_name_ << "\""; } } diff --git a/google/cloud/storage/internal/async/reader_connection_resume_test.cc b/google/cloud/storage/internal/async/reader_connection_resume_test.cc index 878b56685cc82..e58ead9f3f7ea 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume_test.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume_test.cc @@ -26,8 +26,8 @@ #include "google/cloud/storage/testing/mock_resume_policy.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_async_streaming_read_rpc.h" -#include "google/cloud/testing_util/status_matchers.h" #include "google/cloud/testing_util/scoped_log.h" +#include "google/cloud/testing_util/status_matchers.h" #include #include @@ -45,14 +45,14 @@ using ::google::cloud::storage::testing::MockResumePolicy; using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; -using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ScopedLog; -using ::testing::HasSubstr; +using ::google::cloud::testing_util::StatusIs; using ::testing::_; using ::testing::AllOf; using ::testing::AtMost; using ::testing::ElementsAre; using ::testing::Eq; +using ::testing::HasSubstr; using ::testing::IsEmpty; using ::testing::Optional; using ::testing::Pair; @@ -508,11 +508,10 @@ TEST(AsyncReaderConnectionResume, OverrunLogging) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); + return make_ready_future( + ReadResponse(ReadPayload(std::string(15, '1')))); }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -524,8 +523,8 @@ TEST(AsyncReaderConnectionResume, OverrunLogging) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/10, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(15, '1'))); @@ -547,11 +546,10 @@ TEST(AsyncReaderConnectionResume, ReadLastLessIndexNoLogging) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(50, '1')))); + return make_ready_future( + ReadResponse(ReadPayload(std::string(50, '1')))); }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -564,8 +562,8 @@ TEST(AsyncReaderConnectionResume, ReadLastLessIndexNoLogging) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/100, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/100, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(50, '1'))); @@ -582,11 +580,10 @@ TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(60, '1')))); + return make_ready_future( + ReadResponse(ReadPayload(std::string(60, '1')))); }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -599,8 +596,8 @@ TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/50, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/50, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(60, '1'))); @@ -638,8 +635,8 @@ TEST(AsyncReaderConnectionResume, TranscodingSuppressesWarning) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/10, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(15, '1'))); @@ -654,12 +651,9 @@ TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) - .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload("1"))); - }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce( + [] { return make_ready_future(ReadResponse(ReadPayload("1"))); }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -671,8 +665,8 @@ TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/0, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/0, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(1, '1'))); @@ -713,9 +707,7 @@ TEST(AsyncReaderConnectionResume, TranscodingIgnoresChecksumMismatch) { payload.set_metadata(std::move(metadata)); return make_ready_future(ReadResponse(std::move(payload))); }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -741,7 +733,8 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestSatisfied) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(10, '1')))); + return make_ready_future( + ReadResponse(ReadPayload(std::string(10, '1')))); }) .WillOnce([] { // Simulate transient error to trigger reconnection @@ -752,20 +745,21 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestSatisfied) { std::unique_ptr(std::move(mock)))); }); - // Since we received 10 bytes for a 10-byte request, we should NOT attempt to reconnect! - // The factory's Call should NOT be called a second time. + // Since we received 10 bytes for a 10-byte request, we should NOT attempt to + // reconnect! The factory's Call should NOT be called a second time. EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 10)).Times(0); auto resume_policy = std::make_unique(); EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); - // Reconnect is bypassed, so OnFinish should NOT be called because we return OK directly. + // Reconnect is bypassed, so OnFinish should NOT be called because we return + // OK directly. EXPECT_CALL(*resume_policy, OnFinish).Times(0); AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/10, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(10, '1'))); @@ -779,7 +773,8 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Read) .WillOnce([] { - return make_ready_future(ReadResponse(ReadPayload(std::string(15, '1')))); + return make_ready_future( + ReadResponse(ReadPayload(std::string(15, '1')))); }) .WillOnce([] { // Simulate transient error to trigger reconnection @@ -790,8 +785,8 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { std::unique_ptr(std::move(mock)))); }); - // Since we received 15 bytes for a 10-byte request, we should NOT attempt to reconnect! - // The factory's Call should NOT be called a second time. + // Since we received 15 bytes for a 10-byte request, we should NOT attempt to + // reconnect! The factory's Call should NOT be called a second time. EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 15)).Times(0); auto resume_policy = std::make_unique(); @@ -801,8 +796,8 @@ TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { AsyncReaderConnectionResume tested( std::move(resume_policy), storage::internal::CreateNullHashFunction(), storage::internal::CreateNullHashValidator(), - mock_factory.AsStdFunction(), /*requested_length=*/10, - "my-bucket", "my-object"); + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(15, '1'))); @@ -833,13 +828,11 @@ TEST(AsyncReaderConnectionResume, TranscodingFailsOnRealChecksumMismatch) { auto payload = ReadPayload(std::string(10, '1')); auto metadata = google::storage::v2::Object{}; metadata.set_content_encoding("gzip"); - metadata.set_size(10); // Set size equal to received bytes! + metadata.set_size(10); // Set size equal to received bytes! payload.set_metadata(std::move(metadata)); return make_ready_future(ReadResponse(std::move(payload))); }) - .WillOnce([] { - return make_ready_future(ReadResponse(Status{})); - }); + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); return make_ready_future(make_status_or( std::unique_ptr(std::move(mock)))); @@ -856,7 +849,8 @@ TEST(AsyncReaderConnectionResume, TranscodingFailsOnRealChecksumMismatch) { EXPECT_THAT(tested.Read().get(), VariantWith(ContentsMatch(10, '1'))); // Should return InvalidArgumentError (mismatched checksums)! - EXPECT_THAT(tested.Read().get(), VariantWith(StatusIs(StatusCode::kInvalidArgument))); + EXPECT_THAT(tested.Read().get(), + VariantWith(StatusIs(StatusCode::kInvalidArgument))); } } // namespace diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index 13fa3d10b4d4e..d4f1d3d2723a6 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -41,19 +41,16 @@ std::streamoff InitialOffset(ReadObjectRangeRequest const& request) { class OverrunLoggingObjectReadSource : public ObjectReadSource { public: - OverrunLoggingObjectReadSource( - std::unique_ptr child, - absl::optional requested_length, - std::string bucket_name, - std::string object_name) + OverrunLoggingObjectReadSource(std::unique_ptr child, + absl::optional requested_length, + std::string bucket_name, + std::string object_name) : child_(std::move(child)), requested_length_(requested_length), bucket_name_(std::move(bucket_name)), object_name_(std::move(object_name)) {} - ~OverrunLoggingObjectReadSource() override { - CheckOverrun(); - } + ~OverrunLoggingObjectReadSource() override { CheckOverrun(); } bool IsOpen() const override { return child_->IsOpen(); } @@ -75,7 +72,8 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { } // Detect server-side transcoding - if (res->transformation.has_value() && *res->transformation == "gunzipped") { + if (res->transformation.has_value() && + *res->transformation == "gunzipped") { is_transcoded_ = true; } @@ -88,9 +86,11 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { received_bytes_ > static_cast(*requested_length_) && !is_transcoded_ && !logged_warning_) { logged_warning_ = true; - GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) + GCP_LOG(WARNING) << "storage: received " + << (received_bytes_ - *requested_length_) << " more bytes than requested from GCS for bucket \"" - << bucket_name_ << "\", object \"" << object_name_ << "\""; + << bucket_name_ << "\", object \"" << object_name_ + << "\""; } } @@ -103,7 +103,8 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { std::atomic logged_warning_{false}; }; -absl::optional ExtractRequestedLength(ReadObjectRangeRequest const& request) { +absl::optional ExtractRequestedLength( + ReadObjectRangeRequest const& request) { if (request.HasOption()) { auto range = request.GetOption().value(); return range.end - range.begin; diff --git a/google/cloud/storage/internal/object_read_streambuf_test.cc b/google/cloud/storage/internal/object_read_streambuf_test.cc index 5c806e989de4b..ad0ec17c46810 100644 --- a/google/cloud/storage/internal/object_read_streambuf_test.cc +++ b/google/cloud/storage/internal/object_read_streambuf_test.cc @@ -180,8 +180,7 @@ TEST(ObjectReadStreambufTest, OverrunLogging) { ScopedLog log; auto read_source = std::make_unique(); EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); - EXPECT_CALL(*read_source, Read) - .WillOnce(Return(ReadSourceResult{25, {}})); + EXPECT_CALL(*read_source, Read).WillOnce(Return(ReadSourceResult{25, {}})); EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") @@ -339,7 +338,8 @@ TEST(ObjectReadStreambufTest, FullObjectReadOverrunLogging) { auto read_source = std::make_unique(); EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); - // GCS returns 15 bytes, and tells us the object size is 10 bytes (in metadata) + // GCS returns 15 bytes, and tells us the object size is 10 bytes (in + // metadata) ReadSourceResult result{15, {}}; result.size = 10; From bf898464df487f957c944b1cb4822a615da11671 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 10 Jun 2026 17:42:08 +0000 Subject: [PATCH 6/8] chore(storage): format connection_impl.cc to match GCB CI checkers style exactly --- .../storage/internal/async/connection_impl.cc | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index af7ed6e5882e6..303e420f91e86 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -452,7 +452,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { return pending.then( [current, request = std::move(p.request), persisted_size, fa = std::move(factory)](auto f) mutable - -> StatusOr> { + -> StatusOr> { return MakeAppendableWriter(current, std::move(request), persisted_size, std::move(fa), f.get()); }); @@ -494,7 +494,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) { return StartUnbufferedUpload(std::move(p)) .then([current = std::move(current), async_write_object = std::move(async_write_object)](auto f) mutable - -> StatusOr> { + -> StatusOr> { auto w = f.get(); if (!w) return std::move(w).status(); auto factory = [upload_id = (*w)->UploadId(), @@ -528,15 +528,14 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) { }; auto f = make_unbuffered(); - return f.then( - [current = std::move(current), - make_unbuffered = std::move(make_unbuffered)](auto f) mutable - -> StatusOr> { - auto w = f.get(); - if (!w) return std::move(w).status(); - return MakeWriterConnectionBuffered(std::move(make_unbuffered), - *std::move(w), *current); - }); + return f.then([current = std::move(current), + make_unbuffered = std::move(make_unbuffered)](auto f) mutable + -> StatusOr> { + auto w = f.get(); + if (!w) return std::move(w).status(); + return MakeWriterConnectionBuffered(std::move(make_unbuffered), + *std::move(w), *current); + }); } future> From 6b740314a11797c15e5c296b00ebdf3ffbfb19f2 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 11 Jun 2026 04:45:23 +0000 Subject: [PATCH 7/8] chore(storage): apply PR review polish and safety improvements - Converted the logged_warning_ check-and-set in ObjectReadStreambuf to a thread-safe atomic exchange (!logged_warning_.exchange(true)) to prevent any potential check-then-act race conditions. - Added explicit static_cast casts to requested_length_ in both ReadRange and AsyncReaderConnectionResume overrun logging statements. This prevents signed/unsigned mixing arithmetic and conforms perfectly to strict compiler sign-comparison flags. - Documented and verified architectural correctness of total_received_bytes_ relative counting (essential for ranged reads starting at non-zero offsets) and the >0 read_limit check (mandatory for GCS proto3 default semantics). --- google/cloud/storage/internal/async/read_range.cc | 2 +- .../cloud/storage/internal/async/reader_connection_resume.cc | 2 +- google/cloud/storage/internal/object_read_streambuf.cc | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index f73d18f3f5182..16e50b41353ca 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -126,7 +126,7 @@ void ReadRange::CheckOverrun() { !is_transcoded_ && !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " - << (received_bytes_ - *requested_length_) + << (received_bytes_ - static_cast(*requested_length_)) << " more bytes than requested from GCS for bucket \"" << bucket_name_ << "\", object \"" << object_name_ << "\""; } diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index cd747532c6824..fe6da2cb519b4 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -142,7 +142,7 @@ void AsyncReaderConnectionResume::CheckOverrun() { !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " - << (total_received_bytes_ - *requested_length_) + << (total_received_bytes_ - static_cast(*requested_length_)) << " more bytes than requested from GCS for bucket \"" << bucket_name_ << "\", object \"" << object_name_ << "\""; } diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index d4f1d3d2723a6..fbb91f9506cc2 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -84,8 +84,7 @@ class OverrunLoggingObjectReadSource : public ObjectReadSource { void CheckOverrun() { if (requested_length_.has_value() && *requested_length_ >= 0 && received_bytes_ > static_cast(*requested_length_) && - !is_transcoded_ && !logged_warning_) { - logged_warning_ = true; + !is_transcoded_ && !logged_warning_.exchange(true)) { GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - *requested_length_) << " more bytes than requested from GCS for bucket \"" From c09eeeca49b978e044a206504bd005b0b7f06560 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 11 Jun 2026 04:57:17 +0000 Subject: [PATCH 8/8] chore(storage): wrap long lines in CheckOverrun log statements to satisfy GCB checkers --- google/cloud/storage/internal/async/read_range.cc | 3 ++- .../cloud/storage/internal/async/reader_connection_resume.cc | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index 16e50b41353ca..7145f400998ee 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -126,7 +126,8 @@ void ReadRange::CheckOverrun() { !is_transcoded_ && !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " - << (received_bytes_ - static_cast(*requested_length_)) + << (received_bytes_ - + static_cast(*requested_length_)) << " more bytes than requested from GCS for bucket \"" << bucket_name_ << "\", object \"" << object_name_ << "\""; } diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index fe6da2cb519b4..6772031bae11b 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -142,7 +142,8 @@ void AsyncReaderConnectionResume::CheckOverrun() { !logged_warning_) { logged_warning_ = true; GCP_LOG(WARNING) << "storage: received " - << (total_received_bytes_ - static_cast(*requested_length_)) + << (total_received_bytes_ - + static_cast(*requested_length_)) << " more bytes than requested from GCS for bucket \"" << bucket_name_ << "\", object \"" << object_name_ << "\""; }