diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 8aff6b5fd4c9b..303e420f91e86 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -317,11 +317,19 @@ AsyncConnectionImpl::ReadObject(ReadObjectParams p) { CreateHashFunction(*current)); auto hash_validator = CreateHashValidator(p.request, *current); + absl::optional requested_length; + if (p.request.read_limit() > 0) { + requested_length = p.request.read_limit(); + } + auto bucket = p.request.bucket(); + auto object = p.request.object(); + auto connection_factory = MakeReaderConnectionFactory( std::move(current), std::move(p.request), hash_function); auto connection = std::make_unique( std::move(resume_policy), std::move(hash_function), - std::move(hash_validator), std::move(connection_factory)); + std::move(hash_validator), std::move(connection_factory), + requested_length, std::move(bucket), std::move(object)); return make_ready_future(ReturnType(std::move(connection))); } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 4630c8f3caa68..35f09b9bb6dd1 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -26,6 +26,7 @@ #include "google/cloud/grpc_error_delegate.h" #include "google/cloud/internal/opentelemetry.h" #include "google/rpc/status.pb.h" +#include #include #include @@ -165,8 +166,20 @@ std::unique_ptr ObjectDescriptorImpl::Read( auto hash_function = CreateHashFunction(is_full_read); auto hash_validator = CreateHashValidator(is_full_read); - auto range = std::make_shared(p.start, p.length, hash_function, - std::move(hash_validator)); + absl::optional limit; + if (p.start < 0) { + if (p.start == (std::numeric_limits::min)()) { + limit = (std::numeric_limits::max)(); + } else { + limit = -p.start; + } + } else if (p.length > 0) { + limit = p.length; + } + + auto range = std::make_shared( + p.start, limit, hash_function, std::move(hash_validator), + read_object_spec_.bucket(), read_object_spec_.object()); std::unique_lock lk(mu_); if (stream_manager_->Empty()) { @@ -329,8 +342,12 @@ void ObjectDescriptorImpl::OnRead( std::move(*response->mutable_read_handle()); } auto copy = it->active_ranges; - // Release the lock while notifying the ranges. The notifications may trigger - // application code, and that code may callback on this class. + bool is_transcoded = false; + absl::optional object_size; + if (metadata_.has_value()) { + is_transcoded = metadata_->content_encoding() == "gzip"; + object_size = metadata_->size(); + } lk.unlock(); for (auto& range_data : *response->mutable_object_data_ranges()) { auto id = range_data.read_range().read_id(); @@ -338,7 +355,7 @@ void ObjectDescriptorImpl::OnRead( if (l == copy.end()) continue; // TODO(#15104) - Consider returning if the range is done, and then // skipping CleanupDoneRanges(). - l->second->OnRead(std::move(range_data)); + l->second->OnRead(std::move(range_data), is_transcoded, object_size); } lk.lock(); stream_manager_->CleanupDoneRanges(it); diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_test.cc index 3523ba082f959..3788e06475094 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_test.cc @@ -39,7 +39,7 @@ TEST(ObjectDescriptorReader, Basic) { range_end: false )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); - impl->OnRead(std::move(data)); + impl->OnRead(std::move(data), /*is_transcoded=*/false); auto actual = tested.Read().get(); EXPECT_THAT(actual, diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc index 5b759637e3596..98dc46eb9045a 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc @@ -54,7 +54,7 @@ TEST(ObjectDescriptorReaderTracing, Read) { range_end: false )pb"; EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); - impl->OnRead(std::move(data)); + impl->OnRead(std::move(data), /*is_transcoded=*/false); auto actual = reader->Read().get(); auto spans = span_catcher->GetSpans(); diff --git a/google/cloud/storage/internal/async/read_range.cc b/google/cloud/storage/internal/async/read_range.cc index 9141b012a33d6..7145f400998ee 100644 --- a/google/cloud/storage/internal/async/read_range.cc +++ b/google/cloud/storage/internal/async/read_range.cc @@ -36,6 +36,10 @@ absl::optional ReadRange::RangeForResume( range.set_read_id(read_id); std::lock_guard lk(mu_); if (status_.has_value()) return absl::nullopt; + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ >= static_cast(*requested_length_)) { + return absl::nullopt; + } range.set_read_offset(offset_); range.set_read_length(length_); return range; @@ -61,6 +65,7 @@ void ReadRange::OnFinish(Status status) { std::unique_lock lk(mu_); if (status_) return; status_ = std::move(status); + CheckOverrun(); if (!wait_) return; auto p = std::move(*wait_); wait_.reset(); @@ -68,8 +73,12 @@ void ReadRange::OnFinish(Status status) { p.set_value(*status_); } -void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { +void ReadRange::OnRead(google::storage::v2::ObjectRangeData data, + bool is_transcoded, + absl::optional object_size) { std::unique_lock lk(mu_); + is_transcoded_ = is_transcoded_ || is_transcoded; + if (object_size.has_value()) object_size_ = object_size; if (status_) return; auto* check_summed_data = data.mutable_checksummed_data(); auto content = StealMutableContent(*data.mutable_checksummed_data()); @@ -81,13 +90,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { } offset_ += content.size(); + received_bytes_ += content.size(); if (length_ != 0) length_ -= std::min(content.size(), length_); auto p = ReadPayloadImpl::Make(std::move(content)); if (data.range_end()) { status_ = Status{}; + CheckOverrun(); auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (result.is_mismatch) { + bool transcoded_download = + is_transcoded_ && + (!object_size_.has_value() || + (received_bytes_ != static_cast(*object_size_))); + if (result.is_mismatch && !transcoded_download) { status_ = google::cloud::internal::DataLossError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", @@ -105,6 +120,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) { payload_ = std::move(p); } +void ReadRange::CheckOverrun() { + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ > static_cast(*requested_length_) && + !is_transcoded_ && !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " + << (received_bytes_ - + static_cast(*requested_length_)) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ << "\""; + } +} + void ReadRange::Notify(std::unique_lock lk, storage::ReadPayload p) { auto wait = std::move(*wait_); diff --git a/google/cloud/storage/internal/async/read_range.h b/google/cloud/storage/internal/async/read_range.h index b315e36b01ea8..4afee370f9244 100644 --- a/google/cloud/storage/internal/async/read_range.h +++ b/google/cloud/storage/internal/async/read_range.h @@ -45,15 +45,32 @@ class ReadRange { public: using ReadResponse = storage::AsyncReaderConnection::ReadResponse; - ReadRange(std::int64_t offset, std::int64_t length, + ReadRange(std::int64_t offset, absl::optional requested_length, + std::shared_ptr hash_function, + std::unique_ptr hash_validator, + std::string bucket_name, std::string object_name) + : offset_(offset), + requested_length_(requested_length), + length_(requested_length.value_or(0)), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)), + hash_function_(std::move(hash_function)), + hash_validator_(std::move(hash_validator)) {} + + ReadRange(std::int64_t offset, absl::optional requested_length, std::shared_ptr hash_function = storage::internal::CreateNullHashFunction(), std::unique_ptr hash_validator = storage::internal::CreateNullHashValidator()) - : offset_(offset), - length_(length), - hash_function_(std::move(hash_function)), - hash_validator_(std::move(hash_validator)) {} + : ReadRange(offset, requested_length, std::move(hash_function), + std::move(hash_validator), {}, {}) {} + + ReadRange(std::int64_t offset, absl::optional requested_length, + std::string bucket_name, std::string object_name) + : ReadRange(offset, requested_length, + storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + std::move(bucket_name), std::move(object_name)) {} bool IsDone() const; @@ -63,14 +80,24 @@ class ReadRange { future Read(); void OnFinish(Status status); - void OnRead(google::storage::v2::ObjectRangeData data); + void OnRead(google::storage::v2::ObjectRangeData data, + bool is_transcoded = false, + absl::optional object_size = absl::nullopt); private: void Notify(std::unique_lock lk, storage::ReadPayload p); + void CheckOverrun(); mutable std::mutex mu_; std::int64_t offset_; + absl::optional requested_length_; std::int64_t length_; + std::string bucket_name_; + std::string object_name_; + std::size_t received_bytes_ = 0; + bool is_transcoded_ = false; + bool logged_warning_ = false; + absl::optional object_size_; absl::optional payload_; absl::optional status_; absl::optional> wait_; diff --git a/google/cloud/storage/internal/async/read_range_test.cc b/google/cloud/storage/internal/async/read_range_test.cc index a28cbd9b10c49..c1391b2cf41c6 100644 --- a/google/cloud/storage/internal/async/read_range_test.cc +++ b/google/cloud/storage/internal/async/read_range_test.cc @@ -21,6 +21,7 @@ #include "google/cloud/storage/testing/canonical_errors.h" #include "google/cloud/storage/testing/mock_hash_function.h" #include "google/cloud/testing_util/is_proto_equal.h" +#include "google/cloud/testing_util/scoped_log.h" #include "google/cloud/testing_util/status_matchers.h" #include "absl/strings/cord.h" #include "absl/strings/string_view.h" @@ -42,12 +43,14 @@ using ::google::cloud::storage::testing::MockHashFunction; using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; +using ::google::cloud::testing_util::ScopedLog; using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; using ::testing::An; using ::testing::AtLeast; using ::testing::ElementsAre; +using ::testing::HasSubstr; using ::testing::Optional; using ::testing::ResultOf; using ::testing::VariantWith; @@ -357,6 +360,227 @@ TEST(ReadRange, FullObjectChecksumValidationSuccessComposite) { EXPECT_THAT(next_read.get(), VariantWith(IsOk())); } +TEST(ReadRange, OverrunLogging) { + ScopedLog log; + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, ReadLastLessIndexNoLogging) { + ScopedLog log; + // Simulating ReadLast(100) where GCS returns 50 bytes (e.g. object size is + // 50) + ReadRange actual(-100, 100, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { + content: "01234567890123456789012345678901234567890123456789" + } + read_range { read_offset: -100 read_length: 100 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(ReadRange, ReadLastOvershootLogging) { + ScopedLog log; + // Simulating ReadLast(50) where GCS returns 60 bytes (overshoot by 10) + ReadRange actual(-50, 50, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { + content: "012345678901234567890123456789012345678901234567890123456789" + } + read_range { read_offset: -50 read_length: 50 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, TranscodingSuppressesWarning) { + ScopedLog log; + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data), /*is_transcoded=*/true); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(ReadRange, ZeroLengthOverrunLogging) { + ScopedLog log; + // Pass 0 as the limit (not nullopt, which would mean read to end). + ReadRange actual(0, absl::make_optional(0), "my-bucket", + "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "1" } + read_range { read_offset: 0 read_length: 0 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 1 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, ReadLastOverrunLogging) { + ScopedLog log; + // ReadLast(5) is represented by start = -5, limit = 5. + ReadRange actual(-5, absl::make_optional(5), "my-bucket", + "my-object"); + + // GCS returns 10 bytes (overrun of 5 bytes) + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" } + read_range { read_offset: -5 read_length: 5 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ReadRange, TranscodingIgnoresChecksumMismatch) { + auto hash_function = + std::make_shared(); + auto hash_validator = + std::make_unique(); + + storage::internal::HashValues expected_hashes; + expected_hashes.crc32c = "wrong-crc32c"; + hash_validator->ProcessHashValues(expected_hashes); + + ReadRange actual(0, 10, hash_function, std::move(hash_validator), "my-bucket", + "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" crc32c: 576848900 } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: true + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + // Pass is_transcoded = true to suppress the checksum mismatch error! + actual.OnRead(std::move(data), /*is_transcoded=*/true); + + EXPECT_TRUE(actual.IsDone()); + EXPECT_TRUE(pending.is_ready()); + EXPECT_THAT(pending.get(), VariantWith(_)); + + // The second read should return Status::OK instead of DataLossError! + auto next_read = actual.Read(); + EXPECT_TRUE(next_read.is_ready()); + EXPECT_THAT(next_read.get(), VariantWith(IsOk())); +} + +TEST(ReadRange, NoResumeIfRequestSatisfied) { + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "0123456789" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + // The range is fully satisfied (10 bytes received for 10-byte request) + // RangeForResume should return nullopt to prevent resuming! + auto resume = actual.RangeForResume(42); + EXPECT_FALSE(resume.has_value()); +} + +TEST(ReadRange, NoResumeIfRequestExceeded) { + ReadRange actual(0, 10, "my-bucket", "my-object"); + + auto data = google::storage::v2::ObjectRangeData{}; + auto constexpr kData0 = R"pb( + checksummed_data { content: "012345678901234" } + read_range { read_offset: 0 read_length: 10 read_id: 7 } + range_end: false + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data)); + + auto pending = actual.Read(); + actual.OnRead(std::move(data)); + + // The range is exceeded (15 bytes received for 10-byte request) + // RangeForResume should return nullopt to prevent resuming! + auto resume = actual.RangeForResume(42); + EXPECT_FALSE(resume.has_value()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/reader_connection_resume.cc b/google/cloud/storage/internal/async/reader_connection_resume.cc index 8d7bd62e5c058..6772031bae11b 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume.cc @@ -15,6 +15,7 @@ #include "google/cloud/storage/internal/async/reader_connection_resume.h" #include "google/cloud/storage/internal/async/read_payload_impl.h" #include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" #include "absl/strings/str_cat.h" namespace google { @@ -61,16 +62,28 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { ReadPayloadImpl::GetObjectHashes(response).value_or( storage::internal::HashValues{})); received_bytes_ += response.size(); - if (response.metadata().has_value() && !generation_.has_value()) { - generation_ = storage::Generation(response.metadata()->generation()); + total_received_bytes_ += response.size(); + if (response.metadata().has_value()) { + if (!generation_.has_value()) { + generation_ = storage::Generation(response.metadata()->generation()); + } + object_size_ = response.metadata()->size(); + if (response.metadata()->content_encoding() == "gzip") { + is_transcoded_ = true; + } } return make_ready_future(ReadResponse(std::move(response))); } auto const& status = absl::get(r); if (status.ok()) { + CheckOverrun(); // The download finished. Validate the hash results, if any. auto result = std::move(*hash_validator_).Finish(hash_function_->Finish()); - if (!result.is_mismatch) return make_ready_future(std::move(r)); + bool transcoded_download = + is_transcoded_ && + (!object_size_.has_value() || (total_received_bytes_ != *object_size_)); + if (!result.is_mismatch || transcoded_download) + return make_ready_future(std::move(r)); return make_ready_future(ReadResponse(internal::InvalidArgumentError( absl::StrCat("mismatched checksums detected at the end of the " "download, received={", @@ -78,7 +91,13 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { FormatComputedHashes(result), "}"), GCP_ERROR_INFO()))); } + if (requested_length_.has_value() && *requested_length_ >= 0 && + total_received_bytes_ >= *requested_length_) { + CheckOverrun(); + return make_ready_future(ReadResponse(Status())); + } if (resume_policy_->OnFinish(status) == ResumePolicy::kStop) { + CheckOverrun(); return make_ready_future(std::move(r)); } return Reconnect(); @@ -87,6 +106,10 @@ future AsyncReaderConnectionResume::OnRead(ReadResponse r) { future AsyncReaderConnectionResume::Reconnect() { // Capturing `this` is safe here. See the comments in the implementation of // `Read()` for details. + if (CurrentImpl() != nullptr && requested_length_.has_value() && + *requested_length_ >= 0 && total_received_bytes_ >= *requested_length_) { + return make_ready_future(ReadResponse(Status())); + } return reader_factory_(generation_, received_bytes_).then([this](auto f) { return OnResume(f.get()); }); @@ -113,6 +136,19 @@ AsyncReaderConnectionResume::CurrentImpl() { return CurrentImpl(std::unique_lock(mu_)); } +void AsyncReaderConnectionResume::CheckOverrun() { + if (requested_length_.has_value() && *requested_length_ >= 0 && + total_received_bytes_ > *requested_length_ && !is_transcoded_ && + !logged_warning_) { + logged_warning_ = true; + GCP_LOG(WARNING) << "storage: received " + << (total_received_bytes_ - + static_cast(*requested_length_)) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ << "\""; + } +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/reader_connection_resume.h b/google/cloud/storage/internal/async/reader_connection_resume.h index 3b91f52362d55..02e2655a29a98 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume.h +++ b/google/cloud/storage/internal/async/reader_connection_resume.h @@ -41,11 +41,16 @@ class AsyncReaderConnectionResume : public storage::AsyncReaderConnection { std::unique_ptr resume_policy, std::shared_ptr hash, std::unique_ptr validator, - AsyncReaderConnectionFactory reader_factory) + AsyncReaderConnectionFactory reader_factory, + absl::optional requested_length = absl::nullopt, + std::string bucket_name = {}, std::string object_name = {}) : resume_policy_(std::move(resume_policy)), hash_function_(std::move(hash)), hash_validator_(std::move(validator)), - reader_factory_(std::move(reader_factory)) {} + reader_factory_(std::move(reader_factory)), + requested_length_(requested_length), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)) {} void Cancel() override; @@ -61,13 +66,21 @@ class AsyncReaderConnectionResume : public storage::AsyncReaderConnection { std::shared_ptr CurrentImpl( std::unique_lock const&); std::shared_ptr CurrentImpl(); + void CheckOverrun(); std::unique_ptr resume_policy_; std::shared_ptr hash_function_; std::unique_ptr hash_validator_; AsyncReaderConnectionFactory reader_factory_; + absl::optional requested_length_; + std::string bucket_name_; + std::string object_name_; + bool is_transcoded_ = false; + bool logged_warning_ = false; + absl::optional object_size_; storage::Generation generation_; std::int64_t received_bytes_ = 0; + std::int64_t total_received_bytes_ = 0; std::mutex mu_; std::shared_ptr impl_; }; diff --git a/google/cloud/storage/internal/async/reader_connection_resume_test.cc b/google/cloud/storage/internal/async/reader_connection_resume_test.cc index 1352a872c5875..e58ead9f3f7ea 100644 --- a/google/cloud/storage/internal/async/reader_connection_resume_test.cc +++ b/google/cloud/storage/internal/async/reader_connection_resume_test.cc @@ -26,6 +26,7 @@ #include "google/cloud/storage/testing/mock_resume_policy.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_async_streaming_read_rpc.h" +#include "google/cloud/testing_util/scoped_log.h" #include "google/cloud/testing_util/status_matchers.h" #include #include @@ -44,12 +45,14 @@ using ::google::cloud::storage::testing::MockResumePolicy; using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; +using ::google::cloud::testing_util::ScopedLog; using ::google::cloud::testing_util::StatusIs; using ::testing::_; using ::testing::AllOf; using ::testing::AtMost; using ::testing::ElementsAre; using ::testing::Eq; +using ::testing::HasSubstr; using ::testing::IsEmpty; using ::testing::Optional; using ::testing::Pair; @@ -498,6 +501,358 @@ TEST(AsyncReaderConnectionResume, StopAfterTooManyReconnects) { EXPECT_THAT(metadata.trailers, IsEmpty()); } +TEST(AsyncReaderConnectionResume, OverrunLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string(15, '1')))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, ReadLastLessIndexNoLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string(50, '1')))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + // ReadLast(100) + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/100, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(50, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(AsyncReaderConnectionResume, ReadLastOvershootLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string(60, '1')))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + // ReadLast(50) + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/50, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(60, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, TranscodingSuppressesWarning) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read).WillOnce([] { + auto payload = ReadPayload(std::string(15, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + + auto lines = log.ExtractLines(); + EXPECT_TRUE(lines.empty()); +} + +TEST(AsyncReaderConnectionResume, ZeroLengthOverrunLogging) { + ScopedLog log; + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce( + [] { return make_ready_future(ReadResponse(ReadPayload("1"))); }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/0, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(1, '1'))); + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); + + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 1 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(AsyncReaderConnectionResume, TranscodingIgnoresChecksumMismatch) { + auto hash_function = std::make_shared(); + EXPECT_CALL(*hash_function, Finish) + .WillOnce(Return(storage::internal::HashValues{"crc32c", "md5"})); + + auto hash_validator = std::make_unique(); + EXPECT_CALL(*hash_validator, ProcessHashValues).Times(1); + EXPECT_CALL(std::move(*hash_validator), Finish) + .WillOnce([](storage::internal::HashValues const&) { + return storage::internal::HashValidator::Result{ + /*.received=*/{"wrong-crc32c", "wrong-md5"}, + /*.computed=*/{"crc32c", "md5"}, + /*.is_mismatch=*/true}; + }); + + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + auto payload = ReadPayload(std::string(10, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), std::move(hash_function), + std::move(hash_validator), mock_factory.AsStdFunction(), + /*requested_length=*/10, "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return Status::OK instead of DataLoss! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + +TEST(AsyncReaderConnectionResume, NoResumeIfRequestSatisfied) { + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string(10, '1')))); + }) + .WillOnce([] { + // Simulate transient error to trigger reconnection + return make_ready_future(ReadResponse(TransientError())); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + // Since we received 10 bytes for a 10-byte request, we should NOT attempt to + // reconnect! The factory's Call should NOT be called a second time. + EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 10)).Times(0); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + // Reconnect is bypassed, so OnFinish should NOT be called because we return + // OK directly. + EXPECT_CALL(*resume_policy, OnFinish).Times(0); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return Status::OK directly instead of TransientError! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + +TEST(AsyncReaderConnectionResume, NoResumeIfRequestExceeded) { + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string(15, '1')))); + }) + .WillOnce([] { + // Simulate transient error to trigger reconnection + return make_ready_future(ReadResponse(TransientError())); + }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + // Since we received 15 bytes for a 10-byte request, we should NOT attempt to + // reconnect! The factory's Call should NOT be called a second time. + EXPECT_CALL(mock_factory, Call(WithGeneration(1234), 15)).Times(0); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + EXPECT_CALL(*resume_policy, OnFinish).Times(0); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), storage::internal::CreateNullHashFunction(), + storage::internal::CreateNullHashValidator(), + mock_factory.AsStdFunction(), /*requested_length=*/10, "my-bucket", + "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(15, '1'))); + // Should return Status::OK directly instead of TransientError! + EXPECT_THAT(tested.Read().get(), VariantWith(IsOk())); +} + +TEST(AsyncReaderConnectionResume, TranscodingFailsOnRealChecksumMismatch) { + auto hash_function = std::make_shared(); + EXPECT_CALL(*hash_function, Finish) + .WillOnce(Return(storage::internal::HashValues{"crc32c", "md5"})); + + auto hash_validator = std::make_unique(); + EXPECT_CALL(*hash_validator, ProcessHashValues).Times(1); + EXPECT_CALL(std::move(*hash_validator), Finish) + .WillOnce([](storage::internal::HashValues const&) { + return storage::internal::HashValidator::Result{ + /*.received=*/{"wrong-crc32c", "wrong-md5"}, + /*.computed=*/{"crc32c", "md5"}, + /*.is_mismatch=*/true}; + }); + + MockAsyncReaderConnectionFactory mock_factory; + EXPECT_CALL(mock_factory, Call(WithGeneration(0), 0)).WillOnce([] { + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Read) + .WillOnce([] { + auto payload = ReadPayload(std::string(10, '1')); + auto metadata = google::storage::v2::Object{}; + metadata.set_content_encoding("gzip"); + metadata.set_size(10); // Set size equal to received bytes! + payload.set_metadata(std::move(metadata)); + return make_ready_future(ReadResponse(std::move(payload))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + EXPECT_CALL(*mock, GetRequestMetadata).Times(AtMost(1)); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(mock)))); + }); + + auto resume_policy = std::make_unique(); + EXPECT_CALL(*resume_policy, OnStartSuccess).Times(1); + + AsyncReaderConnectionResume tested( + std::move(resume_policy), std::move(hash_function), + std::move(hash_validator), mock_factory.AsStdFunction(), + /*requested_length=*/10, "my-bucket", "my-object"); + + EXPECT_THAT(tested.Read().get(), + VariantWith(ContentsMatch(10, '1'))); + // Should return InvalidArgumentError (mismatched checksums)! + EXPECT_THAT(tested.Read().get(), + VariantWith(StatusIs(StatusCode::kInvalidArgument))); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/object_read_streambuf.cc b/google/cloud/storage/internal/object_read_streambuf.cc index 7bc8ae3e27d3b..fbb91f9506cc2 100644 --- a/google/cloud/storage/internal/object_read_streambuf.cc +++ b/google/cloud/storage/internal/object_read_streambuf.cc @@ -16,7 +16,9 @@ #include "google/cloud/storage/hash_mismatch_error.h" #include "google/cloud/storage/internal/hash_function.h" #include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" #include +#include #include #include #include @@ -37,12 +39,90 @@ std::streamoff InitialOffset(ReadObjectRangeRequest const& request) { return request.StartingByte(); } +class OverrunLoggingObjectReadSource : public ObjectReadSource { + public: + OverrunLoggingObjectReadSource(std::unique_ptr child, + absl::optional requested_length, + std::string bucket_name, + std::string object_name) + : child_(std::move(child)), + requested_length_(requested_length), + bucket_name_(std::move(bucket_name)), + object_name_(std::move(object_name)) {} + + ~OverrunLoggingObjectReadSource() override { CheckOverrun(); } + + bool IsOpen() const override { return child_->IsOpen(); } + + StatusOr Close() override { + auto res = child_->Close(); + CheckOverrun(); + return res; + } + + StatusOr Read(char* buf, std::size_t n) override { + auto res = child_->Read(buf, n); + if (!res) return res; + + received_bytes_ += res->bytes_received; + + // Dynamically learn size for full-object reads + if (res->size.has_value() && !requested_length_.has_value()) { + requested_length_ = *res->size; + } + + // Detect server-side transcoding + if (res->transformation.has_value() && + *res->transformation == "gunzipped") { + is_transcoded_ = true; + } + + return res; + } + + private: + void CheckOverrun() { + if (requested_length_.has_value() && *requested_length_ >= 0 && + received_bytes_ > static_cast(*requested_length_) && + !is_transcoded_ && !logged_warning_.exchange(true)) { + GCP_LOG(WARNING) << "storage: received " + << (received_bytes_ - *requested_length_) + << " more bytes than requested from GCS for bucket \"" + << bucket_name_ << "\", object \"" << object_name_ + << "\""; + } + } + + std::unique_ptr child_; + absl::optional requested_length_; + std::string bucket_name_; + std::string object_name_; + std::size_t received_bytes_ = 0; + bool is_transcoded_ = false; + std::atomic logged_warning_{false}; +}; + +absl::optional ExtractRequestedLength( + ReadObjectRangeRequest const& request) { + if (request.HasOption()) { + auto range = request.GetOption().value(); + return range.end - range.begin; + } + if (request.HasOption()) { + auto last = request.GetOption(); + return last.value(); + } + return absl::nullopt; +} + } // namespace ObjectReadStreambuf::ObjectReadStreambuf( ReadObjectRangeRequest const& request, std::unique_ptr source) - : source_(std::move(source)), + : source_(std::make_unique( + std::move(source), ExtractRequestedLength(request), + request.bucket_name(), request.object_name())), source_pos_(InitialOffset(request)), hash_function_(CreateHashFunction(request)), hash_validator_(CreateHashValidator(request)) {} diff --git a/google/cloud/storage/internal/object_read_streambuf_test.cc b/google/cloud/storage/internal/object_read_streambuf_test.cc index 47f2fc144364d..ad0ec17c46810 100644 --- a/google/cloud/storage/internal/object_read_streambuf_test.cc +++ b/google/cloud/storage/internal/object_read_streambuf_test.cc @@ -14,6 +14,7 @@ #include "google/cloud/storage/internal/object_read_streambuf.h" #include "google/cloud/storage/testing/mock_client.h" +#include "google/cloud/testing_util/scoped_log.h" #include #include #include @@ -26,6 +27,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { namespace { +using ::google::cloud::testing_util::ScopedLog; +using ::testing::HasSubstr; using ::testing::Return; TEST(ObjectReadStreambufTest, FailedTellg) { @@ -173,6 +176,196 @@ TEST(ObjectReadStreambufTest, WrongSeek) { EXPECT_TRUE(stream.fail()); } +TEST(ObjectReadStreambufTest, OverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read).WillOnce(Return(ReadSourceResult{25, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 20)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(25); + stream.read(v.data(), 25); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, OverrunLoggingCharByChar) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{15, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 10)), + std::move(read_source)); + + std::istream stream(&buf); + + // Read character by character + int count = 0; + while (stream.get() != EOF) { + count++; + } + EXPECT_EQ(count, 15); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, ReadLastLessIndexNoLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{50, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadLast(100)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(100); + stream.read(v.data(), 100); + + buf.Close(); + EXPECT_TRUE(log.ExtractLines().empty()); +} + +TEST(ObjectReadStreambufTest, ReadLastOvershootLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{60, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf( + ReadObjectRangeRequest("my-bucket", "my-object").set_option(ReadLast(50)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(100); + stream.read(v.data(), 100); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 10 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, EmptyRangeOverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(ReadSourceResult{5, {}})) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(10, 10)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(10); + stream.read(v.data(), 10); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + +TEST(ObjectReadStreambufTest, TranscodingSuppressesWarning) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + + ReadSourceResult result{15, {}}; + result.transformation = "gunzipped"; + + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(result)) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object") + .set_option(ReadRange(0, 10)), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(20); + stream.read(v.data(), 20); + + buf.Close(); + EXPECT_TRUE(log.ExtractLines().empty()); +} + +TEST(ObjectReadStreambufTest, FullObjectReadOverrunLogging) { + ScopedLog log; + auto read_source = std::make_unique(); + EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true)); + + // GCS returns 15 bytes, and tells us the object size is 10 bytes (in + // metadata) + ReadSourceResult result{15, {}}; + result.size = 10; + + EXPECT_CALL(*read_source, Read) + .WillOnce(Return(result)) + .WillRepeatedly(Return(ReadSourceResult{0, {}})); + EXPECT_CALL(*read_source, Close()).WillRepeatedly(Return(HttpResponse{})); + + // Full object read request (no range options) + ObjectReadStreambuf buf(ReadObjectRangeRequest("my-bucket", "my-object"), + std::move(read_source)); + + std::istream stream(&buf); + std::vector v(20); + stream.read(v.data(), 20); + + buf.Close(); + auto lines = log.ExtractLines(); + EXPECT_EQ(lines.size(), 1); + EXPECT_THAT( + lines[0], + HasSubstr( + "storage: received 5 more bytes than requested from GCS for bucket " + "\"my-bucket\", object \"my-object\"")); +} + } // namespace } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END