Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions google/cloud/storage/internal/object_read_streambuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/storage/hash_mismatch_error.h"
#include "google/cloud/storage/internal/hash_function.h"
#include "google/cloud/internal/make_status.h"
#include "google/cloud/log.h"
#include <algorithm>
#include <cstring>
#include <memory>
Expand All @@ -37,6 +38,18 @@ std::streamoff InitialOffset(ReadObjectRangeRequest const& request) {
return request.StartingByte();
}

absl::optional<std::int64_t> ExpectedBytes(
ReadObjectRangeRequest const& request) {
if (request.HasOption<ReadRange>()) {
auto const& range = request.GetOption<ReadRange>().value();
return range.end - range.begin;
}
Comment on lines +43 to +46

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If ReadRange is configured such that range.end <= range.begin (or if it represents an open-ended range where end is not set/defaults to 0), range.end - range.begin will be negative or zero. This would cause total_bytes_received_ > *expected_bytes_ to evaluate to true incorrectly, triggering false warning logs.

We should defensively check that range.end > range.begin before returning the difference.

  if (request.HasOption<ReadRange>()) {
    auto const& range = request.GetOption<ReadRange>().value();
    if (range.end > range.begin) {
      return range.end - range.begin;
    }
    return absl::nullopt;
  }
References
  1. Prefer defensive code, such as explicit checks, even if they seem redundant based on the current implementation of a framework, as the framework's contract may change in the future.

if (request.HasOption<ReadLast>()) {
return request.GetOption<ReadLast>().value();
}
return absl::nullopt;
}

} // namespace

ObjectReadStreambuf::ObjectReadStreambuf(
Expand All @@ -45,14 +58,20 @@ ObjectReadStreambuf::ObjectReadStreambuf(
: source_(std::move(source)),
source_pos_(InitialOffset(request)),
hash_function_(CreateHashFunction(request)),
hash_validator_(CreateHashValidator(request)) {}
hash_validator_(CreateHashValidator(request)),
bucket_name_(request.bucket_name()),
object_name_(request.object_name()),
expected_bytes_(ExpectedBytes(request)) {}

ObjectReadStreambuf::ObjectReadStreambuf(ReadObjectRangeRequest const&,
ObjectReadStreambuf::ObjectReadStreambuf(ReadObjectRangeRequest const& request,
Status status)
: source_(new ObjectReadErrorSource(status)),
source_pos_(-1),
hash_validator_(CreateNullHashValidator()),
status_(std::move(status)) {}
status_(std::move(status)),
bucket_name_(request.bucket_name()),
object_name_(request.object_name()),
expected_bytes_(ExpectedBytes(request)) {}

ObjectReadStreambuf::pos_type ObjectReadStreambuf::seekpos(
pos_type /*pos*/, std::ios_base::openmode /*which*/) {
Expand All @@ -77,6 +96,16 @@ ObjectReadStreambuf::pos_type ObjectReadStreambuf::seekoff(
bool ObjectReadStreambuf::IsOpen() const { return source_->IsOpen(); }

void ObjectReadStreambuf::Close() {
if (expected_bytes_.has_value() && total_bytes_received_ > *expected_bytes_) {
if (!transformation().has_value()) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with how other member variables (like expected_bytes_, total_bytes_received_, bucket_name_, and object_name_) are accessed in this function and in xsgetn(), we should access the member variable transformation_ directly instead of calling the accessor method transformation().

    if (!transformation_.has_value()) {

GCP_LOG(WARNING) << "storage: received "
<< (total_bytes_received_ - *expected_bytes_)
<< " more bytes than requested from GCS for bucket "
<< bucket_name_ << ", object " << object_name_;
}
expected_bytes_ = absl::nullopt;
}

auto response = source_->Close();
if (!response.ok()) {
ReportError(std::move(response).status());
Expand Down Expand Up @@ -224,6 +253,8 @@ std::streamsize ObjectReadStreambuf::xsgetn(char* s, std::streamsize count) {
if (!size_) size_ = std::move(read->size);
if (!transformation_) transformation_ = std::move(read->transformation);

total_bytes_received_ += read->bytes_received;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The total_bytes_received_ counter is only updated here in xsgetn(). However, ObjectReadStreambuf also overrides underflow() to handle buffered/single-character reads (e.g., via std::getline or operator>>). If a user reads from the stream using those methods, underflow() will read from the source but total_bytes_received_ will not be incremented, leading to an under-count and missed warnings.

Please ensure total_bytes_received_ is also updated in underflow(), or refactor the byte-tracking logic into a common helper method called by both xsgetn() and underflow().


if (source_pos_ >= 0) {
source_pos_ += static_cast<std::streamoff>(read->bytes_received);
} else if (size_) {
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/internal/object_read_streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ class ObjectReadStreambuf : public std::basic_streambuf<char> {
absl::optional<std::string> storage_class_;
absl::optional<std::uint64_t> size_;
absl::optional<std::string> transformation_;
std::string bucket_name_;
std::string object_name_;
absl::optional<std::int64_t> expected_bytes_;
std::int64_t total_bytes_received_ = 0;
};

} // namespace internal
Expand Down
80 changes: 79 additions & 1 deletion google/cloud/storage/internal/object_read_streambuf_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "google/cloud/storage/internal/object_read_streambuf.h"
#include "google/cloud/storage/testing/mock_client.h"
#include "google/cloud/testing_util/scoped_log.h"
#include <gmock/gmock.h>
#include <memory>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -173,6 +173,84 @@ TEST(ObjectReadStreambufTest, WrongSeek) {
EXPECT_TRUE(stream.fail());
}

TEST(ObjectReadStreambufTest, LogsWarningOnExcessBytes) {
testing_util::ScopedLog log;
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read).WillOnce(Return(ReadSourceResult{15, {}}));
EXPECT_CALL(*read_source, Close())
.WillOnce(Return(HttpResponse{200, "OK", {}}));

ObjectReadStreambuf buf(
ReadObjectRangeRequest("test-bucket", "test-object")
.set_option(ReadRange(0, 10)),
std::move(read_source));

std::istream stream(&buf);
std::vector<char> v(1024);
stream.read(v.data(), 15);
buf.Close();

auto const log_lines = log.ExtractLines();
EXPECT_THAT(
log_lines,
testing::Contains(testing::HasSubstr(
"storage: received 5 more bytes than requested from GCS for bucket "
"test-bucket, object test-object")));
}

TEST(ObjectReadStreambufTest, NoWarningWhenExactBytesReceived) {
testing_util::ScopedLog log;
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read).WillOnce(Return(ReadSourceResult{10, {}}));
EXPECT_CALL(*read_source, Close())
.WillOnce(Return(HttpResponse{200, "OK", {}}));

ObjectReadStreambuf buf(
ReadObjectRangeRequest("test-bucket", "test-object")
.set_option(ReadRange(0, 10)),
std::move(read_source));

std::istream stream(&buf);
std::vector<char> v(1024);
stream.read(v.data(), 10);
buf.Close();

auto const log_lines = log.ExtractLines();
EXPECT_THAT(
log_lines,
testing::Not(testing::Contains(testing::HasSubstr(
"more bytes than requested from GCS for bucket"))));
}

TEST(ObjectReadStreambufTest, NoWarningWhenDecompressed) {
testing_util::ScopedLog log;
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
auto response = ReadSourceResult{15, {}};
response.transformation = "gunzipped";
EXPECT_CALL(*read_source, Read).WillOnce(Return(response));
EXPECT_CALL(*read_source, Close())
.WillOnce(Return(HttpResponse{200, "OK", {}}));

ObjectReadStreambuf buf(
ReadObjectRangeRequest("test-bucket", "test-object")
.set_option(ReadRange(0, 10)),
std::move(read_source));

std::istream stream(&buf);
std::vector<char> v(1024);
stream.read(v.data(), 15);
buf.Close();

auto const log_lines = log.ExtractLines();
EXPECT_THAT(
log_lines,
testing::Not(testing::Contains(testing::HasSubstr(
"more bytes than requested from GCS for bucket"))));
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Loading