Skip to content
10 changes: 9 additions & 1 deletion google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,19 @@ AsyncConnectionImpl::ReadObject(ReadObjectParams p) {
CreateHashFunction(*current));
auto hash_validator = CreateHashValidator(p.request, *current);

absl::optional<std::int64_t> requested_length;
if (p.request.read_limit() > 0) {
requested_length = p.request.read_limit();
}
auto bucket = p.request.bucket();
auto object = p.request.object();

auto connection_factory = MakeReaderConnectionFactory(
std::move(current), std::move(p.request), hash_function);
auto connection = std::make_unique<AsyncReaderConnectionResume>(
std::move(resume_policy), std::move(hash_function),
std::move(hash_validator), std::move(connection_factory));
std::move(hash_validator), std::move(connection_factory),
requested_length, std::move(bucket), std::move(object));

return make_ready_future(ReturnType(std::move(connection)));
}
Expand Down
27 changes: 22 additions & 5 deletions google/cloud/storage/internal/async/object_descriptor_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "google/cloud/grpc_error_delegate.h"
#include "google/cloud/internal/opentelemetry.h"
#include "google/rpc/status.pb.h"
#include <limits>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -165,8 +166,20 @@ std::unique_ptr<storage::AsyncReaderConnection> ObjectDescriptorImpl::Read(
auto hash_function = CreateHashFunction(is_full_read);
auto hash_validator = CreateHashValidator(is_full_read);

auto range = std::make_shared<ReadRange>(p.start, p.length, hash_function,
std::move(hash_validator));
absl::optional<std::int64_t> limit;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please add a comment explaining how `limit` is calculated, for readers who are unfamiliar with GCS range-request behavior.

// Derive the byte limit for this range from the request parameters:
//  - A negative `p.start` yields a limit of `-p.start` bytes.
//    NOTE(review): presumably a negative start means "the last N bytes of
//    the object" in GCS range requests -- confirm against the service docs.
//  - `-p.start` is not representable when `p.start` is the minimum
//    std::int64_t value, so that case is clamped to the maximum instead of
//    being negated.
//  - Otherwise a positive `p.length` is used as the limit.
//  - In every other case `limit` is left unset.
if (p.start < 0) {
if (p.start == (std::numeric_limits<std::int64_t>::min)()) {
limit = (std::numeric_limits<std::int64_t>::max)();
} else {
limit = -p.start;
}
} else if (p.length > 0) {
limit = p.length;
}

auto range = std::make_shared<ReadRange>(
p.start, limit, hash_function, std::move(hash_validator),
read_object_spec_.bucket(), read_object_spec_.object());

std::unique_lock<std::mutex> lk(mu_);
if (stream_manager_->Empty()) {
Expand Down Expand Up @@ -329,16 +342,20 @@ void ObjectDescriptorImpl::OnRead(
std::move(*response->mutable_read_handle());
}
auto copy = it->active_ranges;
// Release the lock while notifying the ranges. The notifications may trigger

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this comment before lk.unlock()?

// application code, and that code may callback on this class.
bool is_transcoded = false;
absl::optional<std::int64_t> object_size;
if (metadata_.has_value()) {
is_transcoded = metadata_->content_encoding() == "gzip";
object_size = metadata_->size();
}
Comment thread
v-pratap marked this conversation as resolved.
lk.unlock();
for (auto& range_data : *response->mutable_object_data_ranges()) {
auto id = range_data.read_range().read_id();
auto const l = copy.find(id);
if (l == copy.end()) continue;
// TODO(#15104) - Consider returning if the range is done, and then
// skipping CleanupDoneRanges().
l->second->OnRead(std::move(range_data));
l->second->OnRead(std::move(range_data), is_transcoded, object_size);
}
lk.lock();
stream_manager_->CleanupDoneRanges(it);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TEST(ObjectDescriptorReader, Basic) {
range_end: false
)pb";
EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data));
impl->OnRead(std::move(data));
impl->OnRead(std::move(data), /*is_transcoded=*/false);

auto actual = tested.Read().get();
EXPECT_THAT(actual,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST(ObjectDescriptorReaderTracing, Read) {
range_end: false
)pb";
EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data));
impl->OnRead(std::move(data));
impl->OnRead(std::move(data), /*is_transcoded=*/false);

auto actual = reader->Read().get();
auto spans = span_catcher->GetSpans();
Expand Down
32 changes: 30 additions & 2 deletions google/cloud/storage/internal/async/read_range.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ absl::optional<google::storage::v2::ReadRange> ReadRange::RangeForResume(
range.set_read_id(read_id);
std::lock_guard<std::mutex> lk(mu_);
if (status_.has_value()) return absl::nullopt;
if (requested_length_.has_value() && *requested_length_ >= 0 &&
received_bytes_ >= static_cast<std::size_t>(*requested_length_)) {
return absl::nullopt;
}
range.set_read_offset(offset_);
range.set_read_length(length_);
return range;
Expand All @@ -61,15 +65,20 @@ void ReadRange::OnFinish(Status status) {
std::unique_lock<std::mutex> lk(mu_);
if (status_) return;
status_ = std::move(status);
CheckOverrun();
if (!wait_) return;
auto p = std::move(*wait_);
wait_.reset();
lk.unlock();
p.set_value(*status_);
}

void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) {
void ReadRange::OnRead(google::storage::v2::ObjectRangeData data,
bool is_transcoded,
absl::optional<std::int64_t> object_size) {
std::unique_lock<std::mutex> lk(mu_);
is_transcoded_ = is_transcoded_ || is_transcoded;
if (object_size.has_value()) object_size_ = object_size;
if (status_) return;
auto* check_summed_data = data.mutable_checksummed_data();
auto content = StealMutableContent(*data.mutable_checksummed_data());
Expand All @@ -81,13 +90,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) {
}

offset_ += content.size();
received_bytes_ += content.size();
if (length_ != 0) length_ -= std::min<std::int64_t>(content.size(), length_);
auto p = ReadPayloadImpl::Make(std::move(content));

if (data.range_end()) {
status_ = Status{};
CheckOverrun();
auto result = std::move(*hash_validator_).Finish(hash_function_->Finish());
if (result.is_mismatch) {
bool transcoded_download =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a comment explaining why we're performing this check. Same comment for this change under reader_connection_resume.cc.

is_transcoded_ &&
(!object_size_.has_value() ||
(received_bytes_ != static_cast<std::size_t>(*object_size_)));
if (result.is_mismatch && !transcoded_download) {
status_ = google::cloud::internal::DataLossError(
absl::StrCat("mismatched checksums detected at the end of the "
"download, received={",
Expand All @@ -105,6 +120,19 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) {
payload_ = std::move(p);
}

// Logs a single warning when the range has received more bytes than were
// requested.
//
// Nothing is logged when the warning was already emitted, when the data has
// been flagged as transcoded, or when no non-negative requested length is
// recorded. The callers visible in this file invoke it while holding `mu_`.
void ReadRange::CheckOverrun() {
  // Skip ranges that already warned or whose data is marked as transcoded.
  if (logged_warning_ || is_transcoded_) return;
  // Without a non-negative requested length there is nothing to compare to.
  if (!requested_length_.has_value() || *requested_length_ < 0) return;

  auto const requested = static_cast<std::size_t>(*requested_length_);
  if (received_bytes_ <= requested) return;

  logged_warning_ = true;
  GCP_LOG(WARNING) << "storage: received " << (received_bytes_ - requested)
                   << " more bytes than requested from GCS for bucket \""
                   << bucket_name_ << "\", object \"" << object_name_ << "\"";
}

void ReadRange::Notify(std::unique_lock<std::mutex> lk,
storage::ReadPayload p) {
auto wait = std::move(*wait_);
Expand Down
39 changes: 33 additions & 6 deletions google/cloud/storage/internal/async/read_range.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,32 @@ class ReadRange {
public:
using ReadResponse = storage::AsyncReaderConnection::ReadResponse;

ReadRange(std::int64_t offset, std::int64_t length,
ReadRange(std::int64_t offset, absl::optional<std::int64_t> requested_length,
std::shared_ptr<storage::internal::HashFunction> hash_function,
std::unique_ptr<storage::internal::HashValidator> hash_validator,
std::string bucket_name, std::string object_name)
: offset_(offset),
requested_length_(requested_length),
length_(requested_length.value_or(0)),
bucket_name_(std::move(bucket_name)),
object_name_(std::move(object_name)),
hash_function_(std::move(hash_function)),
hash_validator_(std::move(hash_validator)) {}

ReadRange(std::int64_t offset, absl::optional<std::int64_t> requested_length,
std::shared_ptr<storage::internal::HashFunction> hash_function =
storage::internal::CreateNullHashFunction(),
std::unique_ptr<storage::internal::HashValidator> hash_validator =
storage::internal::CreateNullHashValidator())
: offset_(offset),
length_(length),
hash_function_(std::move(hash_function)),
hash_validator_(std::move(hash_validator)) {}
: ReadRange(offset, requested_length, std::move(hash_function),
std::move(hash_validator), {}, {}) {}

ReadRange(std::int64_t offset, absl::optional<std::int64_t> requested_length,
std::string bucket_name, std::string object_name)
: ReadRange(offset, requested_length,
storage::internal::CreateNullHashFunction(),
storage::internal::CreateNullHashValidator(),
std::move(bucket_name), std::move(object_name)) {}

bool IsDone() const;

Expand All @@ -63,14 +80,24 @@ class ReadRange {
future<ReadResponse> Read();
void OnFinish(Status status);

void OnRead(google::storage::v2::ObjectRangeData data);
void OnRead(google::storage::v2::ObjectRangeData data,
bool is_transcoded = false,
absl::optional<std::int64_t> object_size = absl::nullopt);

private:
void Notify(std::unique_lock<std::mutex> lk, storage::ReadPayload p);
void CheckOverrun();

mutable std::mutex mu_;
std::int64_t offset_;
absl::optional<std::int64_t> requested_length_;
std::int64_t length_;
std::string bucket_name_;
std::string object_name_;
std::size_t received_bytes_ = 0;
bool is_transcoded_ = false;
bool logged_warning_ = false;
absl::optional<std::int64_t> object_size_;
absl::optional<storage::ReadPayload> payload_;
absl::optional<Status> status_;
absl::optional<promise<ReadResponse>> wait_;
Expand Down
Loading
Loading