From e9a4fb2516c316d4eb8d50e3437ff43d1146fceb Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 12 Jun 2026 10:49:56 +0000 Subject: [PATCH 01/20] feat: create a generic wrapper for gRPC and REST pure background thread impl --- google/cloud/google_cloud_cpp_common.bzl | 1 + google/cloud/google_cloud_cpp_common.cmake | 1 + .../cloud/internal/background_threads_impl.cc | 42 +--------- .../cloud/internal/background_threads_impl.h | 17 +--- .../generic_background_threads_impl.h | 81 +++++++++++++++++++ .../rest_pure_background_threads_impl.cc | 42 ---------- .../rest_pure_background_threads_impl.h | 19 +---- 7 files changed, 92 insertions(+), 111 deletions(-) create mode 100644 google/cloud/internal/generic_background_threads_impl.h diff --git a/google/cloud/google_cloud_cpp_common.bzl b/google/cloud/google_cloud_cpp_common.bzl index 421c9c00a4160..5c86d6d9df7fc 100644 --- a/google/cloud/google_cloud_cpp_common.bzl +++ b/google/cloud/google_cloud_cpp_common.bzl @@ -55,6 +55,7 @@ google_cloud_cpp_common_hdrs = [ "internal/future_fwd.h", "internal/future_impl.h", "internal/future_then_impl.h", + "internal/generic_background_threads_impl.h", "internal/getenv.h", "internal/group_options.h", "internal/invocation_id_generator.h", diff --git a/google/cloud/google_cloud_cpp_common.cmake b/google/cloud/google_cloud_cpp_common.cmake index 8b3601db39f01..a892e6db3d02c 100644 --- a/google/cloud/google_cloud_cpp_common.cmake +++ b/google/cloud/google_cloud_cpp_common.cmake @@ -86,6 +86,7 @@ add_library( internal/future_impl.h internal/future_then_impl.cc internal/future_then_impl.h + internal/generic_background_threads_impl.h internal/getenv.cc internal/getenv.h internal/group_options.h diff --git a/google/cloud/internal/background_threads_impl.cc b/google/cloud/internal/background_threads_impl.cc index d5e10b1750699..174d57311eb12 100644 --- a/google/cloud/internal/background_threads_impl.cc +++ b/google/cloud/internal/background_threads_impl.cc @@ -21,47 +21,7 @@ namespace google { namespace cloud { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -namespace internal { - -AutomaticallyCreatedBackgroundThreads::AutomaticallyCreatedBackgroundThreads( - std::size_t thread_count) - : pool_(thread_count == 0 ? 1 : thread_count) { - std::generate_n(pool_.begin(), pool_.size(), [this] { - promise started; - auto thread = std::thread( - [](CompletionQueue cq, promise& started) { - started.set_value(); - cq.Run(); - }, - cq_, std::ref(started)); - started.get_future().wait(); - return thread; - }); -} - -AutomaticallyCreatedBackgroundThreads:: - ~AutomaticallyCreatedBackgroundThreads() { - Shutdown(); -} - -void AutomaticallyCreatedBackgroundThreads::Shutdown() { - cq_.Shutdown(); - for (auto& t : pool_) { -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - try { -#endif - t.join(); -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - } catch (std::system_error const& e) { - GCP_LOG(FATAL) << "AutomaticallyCreatedBackgroundThreads::Shutdown: " - << e.what(); - } -#endif - } - pool_.clear(); -} - -} // namespace internal +namespace internal {} // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google diff --git a/google/cloud/internal/background_threads_impl.h b/google/cloud/internal/background_threads_impl.h index a307ded7976cf..b13f477658e1d 100644 --- a/google/cloud/internal/background_threads_impl.h +++ b/google/cloud/internal/background_threads_impl.h @@ -17,6 +17,7 @@ #include "google/cloud/background_threads.h" #include "google/cloud/completion_queue.h" +#include "google/cloud/internal/generic_background_threads_impl.h" #include "google/cloud/version.h" #include #include @@ -40,19 +41,9 @@ class CustomerSuppliedBackgroundThreads : public BackgroundThreads { }; /// Create a background thread to perform background operations. -class AutomaticallyCreatedBackgroundThreads : public BackgroundThreads { - public: - explicit AutomaticallyCreatedBackgroundThreads(std::size_t thread_count = 1U); - ~AutomaticallyCreatedBackgroundThreads() override; - - CompletionQueue cq() const override { return cq_; } - void Shutdown(); - std::size_t pool_size() const { return pool_.size(); } - - private: - CompletionQueue cq_; - std::vector pool_; -}; +using AutomaticallyCreatedBackgroundThreads = + AutomaticallyCreatedBackgroundThreadsImpl; } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/internal/generic_background_threads_impl.h b/google/cloud/internal/generic_background_threads_impl.h new file mode 100644 index 0000000000000..ae1e3b1196420 --- /dev/null +++ b/google/cloud/internal/generic_background_threads_impl.h @@ -0,0 +1,81 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H + +#include "google/cloud/future.h" +#include "google/cloud/log.h" +#include "google/cloud/version.h" +#include +#include +#include +#include + +namespace google { +namespace cloud { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace internal { + +template +class AutomaticallyCreatedBackgroundThreadsImpl : public BaseInterface { + public: + explicit AutomaticallyCreatedBackgroundThreadsImpl( + std::size_t thread_count = 1U) + : pool_(thread_count == 0 ? 1 : thread_count) { + std::generate_n(pool_.begin(), pool_.size(), [this] { + promise started; + auto thread = std::thread( + [](QueueType cq, promise& started) { + started.set_value(); + cq.Run(); + }, + cq_, std::ref(started)); + started.get_future().wait(); + return thread; + }); + } + + ~AutomaticallyCreatedBackgroundThreadsImpl() override { Shutdown(); } + + QueueType cq() const override { return cq_; } + std::size_t pool_size() const { return pool_.size(); } + + void Shutdown() { + cq_.Shutdown(); + for (auto& t : pool_) { +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + try { +#endif + if (t.joinable()) t.join(); +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + } catch (std::system_error const& e) { + GCP_LOG(FATAL) << "Shutdown error: " << e.what(); + } +#endif + } + pool_.clear(); + } + + private: + QueueType cq_; + std::vector pool_; +}; + +} // namespace internal +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H diff --git a/google/cloud/internal/rest_pure_background_threads_impl.cc b/google/cloud/internal/rest_pure_background_threads_impl.cc index 525b92c0adb79..a830172037f80 100644 --- a/google/cloud/internal/rest_pure_background_threads_impl.cc +++ b/google/cloud/internal/rest_pure_background_threads_impl.cc @@ -25,48 +25,6 @@ namespace cloud { namespace rest_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -AutomaticallyCreatedRestPureBackgroundThreads:: - AutomaticallyCreatedRestPureBackgroundThreads(std::size_t thread_count) - : cq_(std::make_shared()), - pool_(thread_count == 0 ? 1 : thread_count) { - std::generate_n(pool_.begin(), pool_.size(), [this] { - promise started; - auto thread = std::thread( - [](RestPureCompletionQueue cq, promise& started, - internal::CallContext c) { - internal::ScopedCallContext scope(std::move(c)); - started.set_value(); - cq.Run(); - }, - cq_, std::ref(started), internal::CallContext{}); - started.get_future().wait(); - return thread; - }); -} - -AutomaticallyCreatedRestPureBackgroundThreads:: - ~AutomaticallyCreatedRestPureBackgroundThreads() { - Shutdown(); -} - -void AutomaticallyCreatedRestPureBackgroundThreads::Shutdown() { - cq_.Shutdown(); - for (auto& t : pool_) { -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - try { -#endif - t.join(); -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - } catch (std::system_error const& e) { - GCP_LOG(FATAL) - << "AutomaticallyCreatedRestPureBackgroundThreads::Shutdown: " - << e.what(); - } -#endif - } - pool_.clear(); -} - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace rest_internal } // namespace cloud diff --git a/google/cloud/internal/rest_pure_background_threads_impl.h b/google/cloud/internal/rest_pure_background_threads_impl.h index 4061729dd7f63..36d4593ebecfa 100644 --- a/google/cloud/internal/rest_pure_background_threads_impl.h +++ b/google/cloud/internal/rest_pure_background_threads_impl.h @@ -15,6 +15,7 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H +#include "google/cloud/internal/generic_background_threads_impl.h" #include "google/cloud/internal/rest_pure_completion_queue_impl.h" #include "google/cloud/version.h" #include @@ -38,21 +39,9 @@ class RestPureBackgroundThreads { }; /// Background threads that run on a RestPureCompletionQueue. -class AutomaticallyCreatedRestPureBackgroundThreads - : public RestPureBackgroundThreads { - public: - explicit AutomaticallyCreatedRestPureBackgroundThreads( - std::size_t thread_count = 1U); - ~AutomaticallyCreatedRestPureBackgroundThreads() override; - - RestPureCompletionQueue cq() const override { return cq_; } - void Shutdown(); - std::size_t pool_size() const { return pool_.size(); } - - private: - RestPureCompletionQueue cq_; - std::vector pool_; -}; +using AutomaticallyCreatedRestPureBackgroundThreads = + google::cloud::internal::AutomaticallyCreatedBackgroundThreadsImpl< + RestPureCompletionQueue, RestPureBackgroundThreads>; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace rest_internal From b36200c90c6db7be27c15151c8939957f3c613d6 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 12 Jun 2026 11:14:47 +0000 Subject: [PATCH 02/20] add unit tests and address review comments --- .../google_cloud_cpp_rest_internal.cmake | 1 + ...gle_cloud_cpp_rest_internal_unit_tests.bzl | 1 + .../generic_background_threads_impl.h | 18 +++- .../rest_pure_background_threads_impl.h | 17 +++- .../rest_pure_background_threads_impl_test.cc | 84 +++++++++++++++++++ 5 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 google/cloud/internal/rest_pure_background_threads_impl_test.cc diff --git a/google/cloud/google_cloud_cpp_rest_internal.cmake b/google/cloud/google_cloud_cpp_rest_internal.cmake index c2fa155d7ad9e..588e1da4bda2b 100644 --- a/google/cloud/google_cloud_cpp_rest_internal.cmake +++ b/google/cloud/google_cloud_cpp_rest_internal.cmake @@ -298,6 +298,7 @@ if (BUILD_TESTING) internal/rest_lro_helpers_test.cc internal/rest_opentelemetry_test.cc internal/rest_parse_json_error_test.cc + internal/rest_pure_background_threads_impl_test.cc internal/rest_request_test.cc internal/rest_response_test.cc internal/rest_retry_loop_test.cc diff --git a/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl b/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl index 003a9fe1d2080..c0e1909d4ca6b 100644 --- a/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl +++ b/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl @@ -61,6 +61,7 @@ google_cloud_cpp_rest_internal_unit_tests = [ "internal/rest_lro_helpers_test.cc", "internal/rest_opentelemetry_test.cc", "internal/rest_parse_json_error_test.cc", + "internal/rest_pure_background_threads_impl_test.cc", "internal/rest_request_test.cc", "internal/rest_response_test.cc", "internal/rest_retry_loop_test.cc", diff --git a/google/cloud/internal/generic_background_threads_impl.h b/google/cloud/internal/generic_background_threads_impl.h index ae1e3b1196420..ca1c79b71731c 100644 --- a/google/cloud/internal/generic_background_threads_impl.h +++ b/google/cloud/internal/generic_background_threads_impl.h @@ -28,18 +28,28 @@ namespace cloud { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { -template +template +struct DefaultQueueTraits { + static QueueType Create() { return QueueType(); } + static void Run(QueueType cq, promise& started) { + started.set_value(); + cq.Run(); + } +}; + +template > class AutomaticallyCreatedBackgroundThreadsImpl : public BaseInterface { public: explicit AutomaticallyCreatedBackgroundThreadsImpl( std::size_t thread_count = 1U) - : pool_(thread_count == 0 ? 1 : thread_count) { + : cq_(QueueTraits::Create()), + pool_(thread_count == 0 ? 1 : thread_count) { std::generate_n(pool_.begin(), pool_.size(), [this] { promise started; auto thread = std::thread( [](QueueType cq, promise& started) { - started.set_value(); - cq.Run(); + QueueTraits::Run(std::move(cq), started); }, cq_, std::ref(started)); started.get_future().wait(); diff --git a/google/cloud/internal/rest_pure_background_threads_impl.h b/google/cloud/internal/rest_pure_background_threads_impl.h index 36d4593ebecfa..a9c8aef7c14ca 100644 --- a/google/cloud/internal/rest_pure_background_threads_impl.h +++ b/google/cloud/internal/rest_pure_background_threads_impl.h @@ -15,6 +15,7 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H +#include "google/cloud/internal/call_context.h" #include "google/cloud/internal/generic_background_threads_impl.h" #include "google/cloud/internal/rest_pure_completion_queue_impl.h" #include "google/cloud/version.h" @@ -38,10 +39,24 @@ class RestPureBackgroundThreads { virtual RestPureCompletionQueue cq() const = 0; }; +struct RestPureQueueTraits { + static RestPureCompletionQueue Create() { + return RestPureCompletionQueue( + std::make_shared()); + } + static void Run(RestPureCompletionQueue cq, promise& started) { + internal::CallContext context; + internal::ScopedCallContext scope(std::move(context)); + started.set_value(); + cq.Run(); + } +}; + /// Background threads that run on a RestPureCompletionQueue. using AutomaticallyCreatedRestPureBackgroundThreads = google::cloud::internal::AutomaticallyCreatedBackgroundThreadsImpl< - RestPureCompletionQueue, RestPureBackgroundThreads>; + RestPureCompletionQueue, RestPureBackgroundThreads, + RestPureQueueTraits>; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace rest_internal diff --git a/google/cloud/internal/rest_pure_background_threads_impl_test.cc b/google/cloud/internal/rest_pure_background_threads_impl_test.cc new file mode 100644 index 0000000000000..31ee8e68ec0c1 --- /dev/null +++ b/google/cloud/internal/rest_pure_background_threads_impl_test.cc @@ -0,0 +1,84 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/internal/rest_pure_background_threads_impl.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace rest_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::testing::Contains; +using ::testing::Not; + +/// @test Verify that automatically created completion queues are usable. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, IsActive) { + AutomaticallyCreatedRestPureBackgroundThreads actual; + EXPECT_EQ(1, actual.pool_size()); + + promise bg; + actual.cq().RunAsync([&bg] { bg.set_value(std::this_thread::get_id()); }); + EXPECT_NE(std::this_thread::get_id(), bg.get_future().get()); +} + +/// @test Verify that automatically created completion queues are usable. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, NoEmptyPools) { + AutomaticallyCreatedRestPureBackgroundThreads actual(0); + EXPECT_EQ(1, actual.pool_size()); + + promise bg; + actual.cq().RunAsync([&bg] { bg.set_value(std::this_thread::get_id()); }); + EXPECT_NE(std::this_thread::get_id(), bg.get_future().get()); +} + +/// @test Verify that automatically created completion queues work. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, ManyThreads) { + auto constexpr kThreadCount = 4; + AutomaticallyCreatedRestPureBackgroundThreads actual(kThreadCount); + EXPECT_EQ(kThreadCount, actual.pool_size()); + + std::vector> promises(100 * kThreadCount); + for (auto& p : promises) { + actual.cq().RunAsync([&p] { p.set_value(std::this_thread::get_id()); }); + } + std::set ids; + for (auto& p : promises) ids.insert(p.get_future().get()); + EXPECT_FALSE(ids.empty()); + EXPECT_GE(kThreadCount, ids.size()); + EXPECT_THAT(ids, Not(Contains(std::this_thread::get_id()))); +} + +/// @test Verify that automatically created completion queues work. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, ManualShutdown) { + auto constexpr kThreadCount = 4; + AutomaticallyCreatedRestPureBackgroundThreads actual(kThreadCount); + EXPECT_EQ(kThreadCount, actual.pool_size()); + + std::vector> promises(2 * kThreadCount); + for (auto& p : promises) { + actual.cq().RunAsync([&p] { p.set_value(); }); + } + for (auto& p : promises) p.get_future().get(); + actual.Shutdown(); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace rest_internal +} // namespace cloud +} // namespace google From 32c53afca5e9224fed42454395e70cf2adf9956e Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 26 May 2026 13:24:34 +0000 Subject: [PATCH 03/20] feat(storage): add resource span attributes ACO ( App Centric Observability ) --- .../storage/internal/tracing_connection.cc | 31 ++++- .../storage/internal/tracing_connection.h | 3 + .../internal/tracing_connection_test.cc | 128 ++++++++++++++++++ 3 files changed, 157 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 6d7e20a2d349b..85994dea2ddfa 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -31,6 +31,17 @@ TracingConnection::TracingConnection(std::shared_ptr impl) Options TracingConnection::options() const { return impl_->options(); } +void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, + storage::BucketMetadata const& metadata) { + std::string id = "projects/" + std::to_string(metadata.project_number()) + "/buckets/" + metadata.name(); + std::string location = metadata.location(); + if (metadata.location_type() == "multi-region" || metadata.location_type() == "dual-region") { + location = "global"; + } + span.SetAttribute("gcp.resource.destination.id", id); + span.SetAttribute("gcp.resource.destination.location", location); +} + StatusOr TracingConnection::ListBuckets( storage::internal::ListBucketsRequest const& request) { // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client @@ -43,14 +54,18 @@ StatusOr TracingConnection::CreateBucket( storage::internal::CreateBucketRequest const& request) { auto span = internal::MakeSpan("storage::Client::CreateBucket"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateBucket(request)); + auto result = impl_->CreateBucket(request); + if (result.ok()) EnrichSpan(*span, *result); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetBucketMetadata( storage::internal::GetBucketMetadataRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetBucketMetadata"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetBucketMetadata(request)); + auto result = impl_->GetBucketMetadata(request); + if (result.ok()) EnrichSpan(*span, *result); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::DeleteBucket( @@ -64,14 +79,18 @@ StatusOr TracingConnection::UpdateBucket( storage::internal::UpdateBucketRequest const& request) { auto span = internal::MakeSpan("storage::Client::UpdateBucket"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->UpdateBucket(request)); + auto result = impl_->UpdateBucket(request); + if (result.ok()) EnrichSpan(*span, *result); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::PatchBucket( storage::internal::PatchBucketRequest const& request) { auto span = internal::MakeSpan("storage::Client::PatchBucket"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->PatchBucket(request)); + auto result = impl_->PatchBucket(request); + if (result.ok()) EnrichSpan(*span, *result); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetNativeBucketIamPolicy( @@ -100,7 +119,9 @@ StatusOr TracingConnection::LockBucketRetentionPolicy( storage::internal::LockBucketRetentionPolicyRequest const& request) { auto span = internal::MakeSpan("storage::Client::LockBucketRetentionPolicy"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->LockBucketRetentionPolicy(request)); + auto result = impl_->LockBucketRetentionPolicy(request); + if (result.ok()) EnrichSpan(*span, *result); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::InsertObjectMedia( diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 45df6239f956d..aab04a6bd8e1a 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -178,6 +178,9 @@ class TracingConnection : public storage::internal::StorageConnection { std::vector InspectStackStructure() const override; private: + void EnrichSpan(opentelemetry::trace::Span& span, + storage::BucketMetadata const& metadata); + std::shared_ptr impl_; }; diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index bda9bc46d0a2d..adbdb253b5dd5 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -41,6 +41,7 @@ using ::google::cloud::testing_util::SpanIsRoot; using ::google::cloud::testing_util::SpanKindIsClient; using ::google::cloud::testing_util::SpanNamed; using ::google::cloud::testing_util::SpanWithStatus; +using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ThereIsAnActiveSpan; using ::testing::AllOf; @@ -105,6 +106,31 @@ TEST(TracingClientTest, CreateBucket) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, CreateBucketSuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, CreateBucket).WillOnce([](auto const&) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + auto under_test = TracingConnection(mock); + auto actual = under_test.CreateBucket(storage::internal::CreateBucketRequest()); + EXPECT_THAT(actual, IsOk()); + EXPECT_THAT(span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::CreateBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", "us-east1"))))); +} + TEST(TracingClientTest, GetBucketMetadata) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); @@ -128,6 +154,32 @@ TEST(TracingClientTest, GetBucketMetadata) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, GetBucketMetadataSuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, GetBucketMetadata).WillOnce([](auto const&) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + auto under_test = TracingConnection(mock); + auto actual = under_test.GetBucketMetadata( + storage::internal::GetBucketMetadataRequest("test-bucket")); + EXPECT_THAT(actual, IsOk()); + EXPECT_THAT(span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::GetBucketMetadata"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", "us-east1"))))); +} + TEST(TracingClientTest, DeleteBucket) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); @@ -174,6 +226,31 @@ TEST(TracingClientTest, UpdateBucket) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, UpdateBucketSuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, UpdateBucket).WillOnce([](auto const&) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + auto under_test = TracingConnection(mock); + auto actual = under_test.UpdateBucket(storage::internal::UpdateBucketRequest()); + EXPECT_THAT(actual, IsOk()); + EXPECT_THAT(span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::UpdateBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", "us-east1"))))); +} + TEST(TracingClientTest, PatchBucket) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); @@ -196,6 +273,31 @@ TEST(TracingClientTest, PatchBucket) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, PatchBucketSuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, PatchBucket).WillOnce([](auto const&) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + auto under_test = TracingConnection(mock); + auto actual = under_test.PatchBucket(storage::internal::PatchBucketRequest()); + EXPECT_THAT(actual, IsOk()); + EXPECT_THAT(span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::PatchBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", "us-east1"))))); +} + TEST(TracingClientTest, GetNativeBucketIamPolicy) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); @@ -290,6 +392,32 @@ TEST(TracingClientTest, LockBucketRetentionPolicy) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, LockBucketRetentionPolicySuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, LockBucketRetentionPolicy).WillOnce([](auto const&) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + auto under_test = TracingConnection(mock); + auto actual = under_test.LockBucketRetentionPolicy( + storage::internal::LockBucketRetentionPolicyRequest()); + EXPECT_THAT(actual, IsOk()); + EXPECT_THAT(span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::LockBucketRetentionPolicy"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", "us-east1"))))); +} + TEST(TracingClientTest, InsertObjectMedia) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); From 520c711379159d8d4fe0f72be260b1db66e816b9 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Wed, 27 May 2026 10:22:56 +0000 Subject: [PATCH 04/20] Adding LRU cache and span attributes to other bucket and object operations --- .../storage/internal/tracing_connection.cc | 291 +++++++++++++++--- .../storage/internal/tracing_connection.h | 90 +++++- .../internal/tracing_connection_test.cc | 155 +++++++--- 3 files changed, 445 insertions(+), 91 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 85994dea2ddfa..b29f20156d930 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/internal/tracing_object_read_source.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" +#include #include #include #include @@ -29,17 +30,88 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN TracingConnection::TracingConnection(std::shared_ptr impl) : impl_(std::move(impl)) {} +TracingConnection::~TracingConnection() { + for (auto& f : bg_tasks_) { + if (f.valid()) f.wait(); + } +} + Options TracingConnection::options() const { return impl_->options(); } +void TracingConnection::CleanupCompletedTasks() { + std::lock_guard lock(mu_); + bg_tasks_.erase(std::remove_if(bg_tasks_.begin(), bg_tasks_.end(), + [](std::future const& f) { + return f.wait_for(std::chrono::seconds(0)) == + std::future_status::ready; + }), + bg_tasks_.end()); +} + +void TracingConnection::MaybeTriggerBackgroundFetch( + std::string const& bucket_name) { + CleanupCompletedTasks(); + + std::lock_guard lock(mu_); + if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { + return; + } + + in_flight_fetch_.insert(bucket_name); + + auto f = std::async(std::launch::async, [this, bucket_name]() { + storage::internal::GetBucketMetadataRequest request(bucket_name); + auto result = impl_->GetBucketMetadata(request); + + BucketCacheEntry entry; + if (result.ok()) { + entry.id = "projects/" + std::to_string(result->project_number()) + + "/buckets/" + result->name(); + entry.location = result->location(); + if (result->location_type() == "multi-region" || + result->location_type() == "dual-region") { + entry.location = "global"; + } + cache_.Put(bucket_name, std::move(entry)); + } else if (result.status().code() == StatusCode::kPermissionDenied) { + entry.id = "projects/_/buckets/" + bucket_name; + entry.location = "global"; + cache_.Put(bucket_name, std::move(entry)); + } + + std::lock_guard lock(mu_); + in_flight_fetch_.erase(bucket_name); + }); + + bg_tasks_.push_back(std::move(f)); +} + +void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, + std::string const& bucket_name) { + if (bucket_name.empty()) return; + auto entry = cache_.Get(bucket_name); + if (entry.has_value()) { + span.SetAttribute("gcp.resource.destination.id", entry->id); + span.SetAttribute("gcp.resource.destination.location", entry->location); + } else { + MaybeTriggerBackgroundFetch(bucket_name); + } +} + void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata) { - std::string id = "projects/" + std::to_string(metadata.project_number()) + "/buckets/" + metadata.name(); + std::string id = "projects/" + std::to_string(metadata.project_number()) + + "/buckets/" + metadata.name(); std::string location = metadata.location(); - if (metadata.location_type() == "multi-region" || metadata.location_type() == "dual-region") { + if (metadata.location_type() == "multi-region" || + metadata.location_type() == "dual-region") { location = "global"; } span.SetAttribute("gcp.resource.destination.id", id); span.SetAttribute("gcp.resource.destination.location", location); + + // Populate cache since we have metadata! + cache_.Put(metadata.name(), {id, location}); } StatusOr TracingConnection::ListBuckets( @@ -72,7 +144,10 @@ StatusOr TracingConnection::DeleteBucket( storage::internal::DeleteBucketRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteBucket"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteBucket(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteBucket(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::UpdateBucket( @@ -97,14 +172,20 @@ StatusOr TracingConnection::GetNativeBucketIamPolicy( storage::internal::GetBucketIamPolicyRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetNativeBucketIamPolicy"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetNativeBucketIamPolicy(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetNativeBucketIamPolicy(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::SetNativeBucketIamPolicy( storage::internal::SetNativeBucketIamPolicyRequest const& request) { auto span = internal::MakeSpan("storage::Client::SetNativeBucketIamPolicy"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->SetNativeBucketIamPolicy(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->SetNativeBucketIamPolicy(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -112,7 +193,10 @@ TracingConnection::TestBucketIamPermissions( storage::internal::TestBucketIamPermissionsRequest const& request) { auto span = internal::MakeSpan("storage::Client::TestBucketIamPermissions"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->TestBucketIamPermissions(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->TestBucketIamPermissions(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::LockBucketRetentionPolicy( @@ -128,21 +212,30 @@ StatusOr TracingConnection::InsertObjectMedia( storage::internal::InsertObjectMediaRequest const& request) { auto span = internal::MakeSpan("storage::Client::InsertObjectMedia"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->InsertObjectMedia(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->InsertObjectMedia(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::CopyObject( storage::internal::CopyObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::CopyObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CopyObject(request)); + EnrichSpan(*span, request.destination_bucket()); + auto result = impl_->CopyObject(request); + MaybeInvalidate(result, request.destination_bucket()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetObjectMetadata( storage::internal::GetObjectMetadataRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetObjectMetadata"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetObjectMetadata(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetObjectMetadata(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr> @@ -150,8 +243,12 @@ TracingConnection::ReadObject( storage::internal::ReadObjectRangeRequest const& request) { auto span = internal::MakeSpan("storage::Client::ReadObject"); auto scope = opentelemetry::trace::Scope(span); + EnrichSpan(*span, request.bucket_name()); auto reader = impl_->ReadObject(request); - if (!reader) return internal::EndSpan(*span, std::move(reader)); + if (!reader) { + MaybeInvalidate(reader, request.bucket_name()); + return internal::EndSpan(*span, std::move(reader)); + } return std::unique_ptr( std::make_unique(std::move(span), *std::move(reader))); @@ -162,42 +259,60 @@ StatusOr TracingConnection::ListObjects( // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client auto span = internal::MakeSpan("storage::Client::ListObjects"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ListObjects(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ListObjects(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::DeleteObject( storage::internal::DeleteObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::UpdateObject( storage::internal::UpdateObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::UpdateObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->UpdateObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UpdateObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::MoveObject( storage::internal::MoveObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::MoveObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->MoveObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->MoveObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::PatchObject( storage::internal::PatchObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::PatchObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->PatchObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->PatchObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::ComposeObject( storage::internal::ComposeObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::ComposeObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ComposeObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ComposeObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -205,14 +320,20 @@ TracingConnection::RewriteObject( storage::internal::RewriteObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::RewriteObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->RewriteObject(request)); + EnrichSpan(*span, request.destination_bucket()); + auto result = impl_->RewriteObject(request); + MaybeInvalidate(result, request.destination_bucket()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::RestoreObject( storage::internal::RestoreObjectRequest const& request) { auto span = internal::MakeSpan("storage::Client::RestoreObject"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->RestoreObject(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->RestoreObject(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -222,7 +343,10 @@ TracingConnection::CreateResumableUpload( auto span = internal::MakeSpan("storage::Client::WriteObject/CreateResumableUpload"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateResumableUpload(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->CreateResumableUpload(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -258,8 +382,10 @@ StatusOr> TracingConnection::UploadFileSimple( auto span = internal::MakeSpan("storage::Client::UploadFile/UploadFileSimple"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan( - *span, impl_->UploadFileSimple(file_name, file_size, request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UploadFileSimple(file_name, file_size, request); + if (!result) MaybeInvalidate(result.status(), request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr> TracingConnection::UploadFileResumable( @@ -268,8 +394,10 @@ StatusOr> TracingConnection::UploadFileResumable( auto span = internal::MakeSpan("storage::Client::UploadFile/UploadFileResumable"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, - impl_->UploadFileResumable(file_name, request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UploadFileResumable(file_name, request); + if (!result) MaybeInvalidate(result.status(), request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } Status TracingConnection::DownloadStreamToFile( @@ -278,8 +406,11 @@ Status TracingConnection::DownloadStreamToFile( auto span = internal::MakeSpan( "storage::Client::DownloadToFile/DownloadStreamToFile"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DownloadStreamToFile( - std::move(stream), file_name, request)); + EnrichSpan(*span, request.bucket_name()); + auto result = + impl_->DownloadStreamToFile(std::move(stream), file_name, request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, result); } StatusOr TracingConnection::ExecuteParallelUploadFile( @@ -300,42 +431,60 @@ TracingConnection::ListBucketAcl( // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client auto span = internal::MakeSpan("storage::Client::ListBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ListBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ListBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::CreateBucketAcl( storage::internal::CreateBucketAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::CreateBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->CreateBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::DeleteBucketAcl( storage::internal::DeleteBucketAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetBucketAcl( storage::internal::GetBucketAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::UpdateBucketAcl( storage::internal::UpdateBucketAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::UpdateBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->UpdateBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UpdateBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::PatchBucketAcl( storage::internal::PatchBucketAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::PatchBucketAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->PatchBucketAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->PatchBucketAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -344,42 +493,60 @@ TracingConnection::ListObjectAcl( // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client auto span = internal::MakeSpan("storage::Client::ListObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ListObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ListObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::CreateObjectAcl( storage::internal::CreateObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::CreateObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->CreateObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::DeleteObjectAcl( storage::internal::DeleteObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetObjectAcl( storage::internal::GetObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::UpdateObjectAcl( storage::internal::UpdateObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::UpdateObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->UpdateObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UpdateObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::PatchObjectAcl( storage::internal::PatchObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::PatchObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->PatchObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->PatchObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -388,7 +555,10 @@ TracingConnection::ListDefaultObjectAcl( // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client auto span = internal::MakeSpan("storage::Client::ListDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ListDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ListDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -396,7 +566,10 @@ TracingConnection::CreateDefaultObjectAcl( storage::internal::CreateDefaultObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::CreateDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->CreateDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -404,14 +577,20 @@ TracingConnection::DeleteDefaultObjectAcl( storage::internal::DeleteDefaultObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetDefaultObjectAcl( storage::internal::GetDefaultObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -419,14 +598,20 @@ TracingConnection::UpdateDefaultObjectAcl( storage::internal::UpdateDefaultObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::UpdateDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->UpdateDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->UpdateDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::PatchDefaultObjectAcl( storage::internal::PatchDefaultObjectAclRequest const& request) { auto span = internal::MakeSpan("storage::Client::PatchDefaultObjectAcl"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->PatchDefaultObjectAcl(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->PatchDefaultObjectAcl(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetServiceAccount( @@ -487,21 +672,30 @@ TracingConnection::ListNotifications( // TODO(#11395) - use a internal::MakeTracedStreamRange in storage::Client auto span = internal::MakeSpan("storage::Client::ListNotifications"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->ListNotifications(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->ListNotifications(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::CreateNotification( storage::internal::CreateNotificationRequest const& request) { auto span = internal::MakeSpan("storage::Client::CreateNotification"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->CreateNotification(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->CreateNotification(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr TracingConnection::GetNotification( storage::internal::GetNotificationRequest const& request) { auto span = internal::MakeSpan("storage::Client::GetNotification"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->GetNotification(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->GetNotification(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } StatusOr @@ -509,7 +703,10 @@ TracingConnection::DeleteNotification( storage::internal::DeleteNotificationRequest const& request) { auto span = internal::MakeSpan("storage::Client::DeleteNotification"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan(*span, impl_->DeleteNotification(request)); + EnrichSpan(*span, request.bucket_name()); + auto result = impl_->DeleteNotification(request); + MaybeInvalidate(result, request.bucket_name()); + return internal::EndSpan(*span, std::move(result)); } std::vector TracingConnection::InspectStackStructure() const { diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index aab04a6bd8e1a..5142178c549c8 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -18,8 +18,14 @@ #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" +#include "absl/types/optional.h" +#include +#include #include +#include #include +#include +#include #include namespace google { @@ -27,10 +33,70 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +struct BucketCacheEntry { + std::string id; + std::string location; +}; + +class BucketMetadataCache { + public: + explicit BucketMetadataCache(std::size_t max_size = 10000) + : max_size_(max_size) {} + + absl::optional Get(std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it == map_.end()) return absl::nullopt; + + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return it->second.first; + } + + void Put(std::string const& bucket_name, BucketCacheEntry entry) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + it->second.first = entry; + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return; + } + + if (map_.size() >= max_size_) { + auto oldest = list_.back(); + list_.pop_back(); + map_.erase(oldest); + } + + list_.push_front(bucket_name); + map_[bucket_name] = {std::move(entry), list_.begin()}; + } + + void Invalidate(std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + list_.erase(it->second.second); + map_.erase(it); + } + } + + private: + std::size_t max_size_; + std::mutex mu_; + std::list list_; + std::unordered_map::iterator>> + map_; +}; + class TracingConnection : public storage::internal::StorageConnection { public: explicit TracingConnection(std::shared_ptr impl); - ~TracingConnection() override = default; + ~TracingConnection() override; Options options() const override; @@ -178,10 +244,32 @@ class TracingConnection : public storage::internal::StorageConnection { std::vector InspectStackStructure() const override; private: + void EnrichSpan(opentelemetry::trace::Span& span, + std::string const& bucket_name); void EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata); + void MaybeTriggerBackgroundFetch(std::string const& bucket_name); + void CleanupCompletedTasks(); + + template + void MaybeInvalidate(StatusOr const& result, + std::string const& bucket_name) { + if (!result.ok() && result.status().code() == StatusCode::kNotFound) { + cache_.Invalidate(bucket_name); + } + } + + void MaybeInvalidate(Status const& status, std::string const& bucket_name) { + if (!status.ok() && status.code() == StatusCode::kNotFound) { + cache_.Invalidate(bucket_name); + } + } std::shared_ptr impl_; + BucketMetadataCache cache_; + std::mutex mu_; + std::unordered_set in_flight_fetch_; + std::vector> bg_tasks_; }; std::shared_ptr MakeTracingClient( diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index adbdb253b5dd5..16cf9159cf093 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -34,6 +34,7 @@ using ::google::cloud::storage::testing::MockClient; using ::google::cloud::storage::testing::MockObjectReadSource; using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::testing_util::InstallSpanCatcher; +using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::OTelAttribute; using ::google::cloud::testing_util::SpanHasAttributes; using ::google::cloud::testing_util::SpanHasInstrumentationScope; @@ -41,7 +42,6 @@ using ::google::cloud::testing_util::SpanIsRoot; using ::google::cloud::testing_util::SpanKindIsClient; using ::google::cloud::testing_util::SpanNamed; using ::google::cloud::testing_util::SpanWithStatus; -using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ThereIsAnActiveSpan; using ::testing::AllOf; @@ -119,16 +119,20 @@ TEST(TracingClientTest, CreateBucketSuccess) { return metadata; }); auto under_test = TracingConnection(mock); - auto actual = under_test.CreateBucket(storage::internal::CreateBucketRequest()); + auto actual = + under_test.CreateBucket(storage::internal::CreateBucketRequest()); EXPECT_THAT(actual, IsOk()); - EXPECT_THAT(span_catcher->GetSpans(), - ElementsAre(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("storage::Client::CreateBucket"), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk), - SpanHasAttributes( - OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), - OTelAttribute("gcp.resource.destination.location", "us-east1"))))); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::CreateBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); } TEST(TracingClientTest, GetBucketMetadata) { @@ -170,14 +174,69 @@ TEST(TracingClientTest, GetBucketMetadataSuccess) { auto actual = under_test.GetBucketMetadata( storage::internal::GetBucketMetadataRequest("test-bucket")); EXPECT_THAT(actual, IsOk()); - EXPECT_THAT(span_catcher->GetSpans(), - ElementsAre(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("storage::Client::GetBucketMetadata"), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk), - SpanHasAttributes( - OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), - OTelAttribute("gcp.resource.destination.location", "us-east1"))))); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::GetBucketMetadata"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); +} + +TEST(TracingClientTest, BucketMetadataCacheSuccess) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + + std::promise bg_fetch_done; + auto bg_fetch_future = bg_fetch_done.get_future(); + + EXPECT_CALL(*mock, GetObjectMetadata).WillOnce([](auto const&) { + return PermanentError(); + }); + + EXPECT_CALL(*mock, GetBucketMetadata) + .WillOnce([&bg_fetch_done](auto const& request) { + EXPECT_EQ("test-bucket", request.bucket_name()); + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + bg_fetch_done.set_value(); + return metadata; + }); + + auto under_test = TracingConnection(mock); + + (void)under_test.GetObjectMetadata( + storage::internal::GetObjectMetadataRequest("test-bucket", + "test-object")); + + bg_fetch_future.wait_for(std::chrono::seconds(5)); + + EXPECT_CALL(*mock, DeleteObject).WillOnce([](auto const&) { + return PermanentError(); + }); + + // Clear spans from GetObjectMetadata + (void)span_catcher->GetSpans(); + + (void)under_test.DeleteObject( + storage::internal::DeleteObjectRequest("test-bucket", "test-object")); + + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanNamed("storage::Client::DeleteObject"), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); } TEST(TracingClientTest, DeleteBucket) { @@ -239,16 +298,20 @@ TEST(TracingClientTest, UpdateBucketSuccess) { return metadata; }); auto under_test = TracingConnection(mock); - auto actual = under_test.UpdateBucket(storage::internal::UpdateBucketRequest()); + auto actual = + under_test.UpdateBucket(storage::internal::UpdateBucketRequest()); EXPECT_THAT(actual, IsOk()); - EXPECT_THAT(span_catcher->GetSpans(), - ElementsAre(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("storage::Client::UpdateBucket"), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk), - SpanHasAttributes( - OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), - OTelAttribute("gcp.resource.destination.location", "us-east1"))))); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::UpdateBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); } TEST(TracingClientTest, PatchBucket) { @@ -288,14 +351,17 @@ TEST(TracingClientTest, PatchBucketSuccess) { auto under_test = TracingConnection(mock); auto actual = under_test.PatchBucket(storage::internal::PatchBucketRequest()); EXPECT_THAT(actual, IsOk()); - EXPECT_THAT(span_catcher->GetSpans(), - ElementsAre(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("storage::Client::PatchBucket"), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk), - SpanHasAttributes( - OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), - OTelAttribute("gcp.resource.destination.location", "us-east1"))))); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::PatchBucket"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); } TEST(TracingClientTest, GetNativeBucketIamPolicy) { @@ -408,14 +474,17 @@ TEST(TracingClientTest, LockBucketRetentionPolicySuccess) { auto actual = under_test.LockBucketRetentionPolicy( storage::internal::LockBucketRetentionPolicyRequest()); EXPECT_THAT(actual, IsOk()); - EXPECT_THAT(span_catcher->GetSpans(), - ElementsAre(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("storage::Client::LockBucketRetentionPolicy"), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk), - SpanHasAttributes( - OTelAttribute("gcp.resource.destination.id", "projects/123456/buckets/test-bucket"), - OTelAttribute("gcp.resource.destination.location", "us-east1"))))); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("storage::Client::LockBucketRetentionPolicy"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasAttributes( + OTelAttribute("gcp.resource.destination.id", + "projects/123456/buckets/test-bucket"), + OTelAttribute("gcp.resource.destination.location", + "us-east1"))))); } TEST(TracingClientTest, InsertObjectMedia) { From 713c565b17d666b00b19cae76d1cad0e9aee49a9 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 2 Jun 2026 14:11:22 +0000 Subject: [PATCH 05/20] fix ci failures --- .../storage/internal/tracing_connection.cc | 17 +++++++++++++---- .../cloud/storage/internal/tracing_connection.h | 15 ++++++++++++--- .../storage/internal/tracing_connection_test.cc | 1 + 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index b29f20156d930..c63633494c1cb 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -36,6 +36,15 @@ TracingConnection::~TracingConnection() { } } +BucketMetadataCache& TracingConnection::cache() { + static BucketMetadataCache instance(10000); + return instance; +} + +void TracingConnection::ResetCacheForTesting() { + cache().Clear(); +} + Options TracingConnection::options() const { return impl_->options(); } void TracingConnection::CleanupCompletedTasks() { @@ -72,11 +81,11 @@ void TracingConnection::MaybeTriggerBackgroundFetch( result->location_type() == "dual-region") { entry.location = "global"; } - cache_.Put(bucket_name, std::move(entry)); + cache().Put(bucket_name, std::move(entry)); } else if (result.status().code() == StatusCode::kPermissionDenied) { entry.id = "projects/_/buckets/" + bucket_name; entry.location = "global"; - cache_.Put(bucket_name, std::move(entry)); + cache().Put(bucket_name, std::move(entry)); } std::lock_guard lock(mu_); @@ -89,7 +98,7 @@ void TracingConnection::MaybeTriggerBackgroundFetch( void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name) { if (bucket_name.empty()) return; - auto entry = cache_.Get(bucket_name); + auto entry = cache().Get(bucket_name); if (entry.has_value()) { span.SetAttribute("gcp.resource.destination.id", entry->id); span.SetAttribute("gcp.resource.destination.location", entry->location); @@ -111,7 +120,7 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, span.SetAttribute("gcp.resource.destination.location", location); // Populate cache since we have metadata! - cache_.Put(metadata.name(), {id, location}); + cache().Put(metadata.name(), {id, location}); } StatusOr TracingConnection::ListBuckets( diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 5142178c549c8..947dba8a27474 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -84,6 +84,12 @@ class BucketMetadataCache { } } + void Clear() { + std::lock_guard lock(mu_); + map_.clear(); + list_.clear(); + } + private: std::size_t max_size_; std::mutex mu_; @@ -98,6 +104,8 @@ class TracingConnection : public storage::internal::StorageConnection { explicit TracingConnection(std::shared_ptr impl); ~TracingConnection() override; + static void ResetCacheForTesting(); + Options options() const override; StatusOr ListBuckets( @@ -255,18 +263,19 @@ class TracingConnection : public storage::internal::StorageConnection { void MaybeInvalidate(StatusOr const& result, std::string const& bucket_name) { if (!result.ok() && result.status().code() == StatusCode::kNotFound) { - cache_.Invalidate(bucket_name); + cache().Invalidate(bucket_name); } } void MaybeInvalidate(Status const& status, std::string const& bucket_name) { if (!status.ok() && status.code() == StatusCode::kNotFound) { - cache_.Invalidate(bucket_name); + cache().Invalidate(bucket_name); } } + static BucketMetadataCache& cache(); + std::shared_ptr impl_; - BucketMetadataCache cache_; std::mutex mu_; std::unordered_set in_flight_fetch_; std::vector> bg_tasks_; diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 16cf9159cf093..8b402f3245306 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -188,6 +188,7 @@ TEST(TracingClientTest, GetBucketMetadataSuccess) { } TEST(TracingClientTest, BucketMetadataCacheSuccess) { + TracingConnection::ResetCacheForTesting(); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); From 0371f0d1499e5b475a6cdacdae6681b9fa4d0a78 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 2 Jun 2026 14:24:29 +0000 Subject: [PATCH 06/20] fix ci failures --- google/cloud/storage/internal/tracing_connection.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index c63633494c1cb..c6bd8e1b1ff58 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -41,9 +41,7 @@ BucketMetadataCache& TracingConnection::cache() { return instance; } -void TracingConnection::ResetCacheForTesting() { - cache().Clear(); -} +void TracingConnection::ResetCacheForTesting() { cache().Clear(); } Options TracingConnection::options() const { return impl_->options(); } From c58347f364cc48aca73097e6d0e0f46033026282 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Wed, 3 Jun 2026 08:09:39 +0000 Subject: [PATCH 07/20] fix ci failures --- .../storage/internal/tracing_connection.cc | 17 ++-- .../storage/internal/tracing_connection.h | 95 ------------------- .../internal/tracing_connection_test.cc | 5 +- 3 files changed, 15 insertions(+), 102 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index c6bd8e1b1ff58..7c1feb859a0dd 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -17,6 +17,7 @@ #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" #include +#include #include #include #include @@ -59,16 +60,21 @@ void TracingConnection::MaybeTriggerBackgroundFetch( std::string const& bucket_name) { CleanupCompletedTasks(); - std::lock_guard lock(mu_); - if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { + if (!cache().StartFetch(bucket_name)) { return; } - in_flight_fetch_.insert(bucket_name); - auto f = std::async(std::launch::async, [this, bucket_name]() { storage::internal::GetBucketMetadataRequest request(bucket_name); auto result = impl_->GetBucketMetadata(request); + std::cout << "BG Thread: GetBucketMetadata returned ok: " << result.ok() + << "\n"; + if (!result.ok()) { + std::cout << "BG Thread: GetBucketMetadata status: " + << result.status().message() + << " code: " << static_cast(result.status().code()) + << "\n"; + } BucketCacheEntry entry; if (result.ok()) { @@ -86,8 +92,7 @@ void TracingConnection::MaybeTriggerBackgroundFetch( cache().Put(bucket_name, std::move(entry)); } - std::lock_guard lock(mu_); - in_flight_fetch_.erase(bucket_name); + cache().EndFetch(bucket_name); }); bg_tasks_.push_back(std::move(f)); diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 947dba8a27474..bd648b1d68577 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -18,9 +18,6 @@ #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" -#include "absl/types/optional.h" -#include -#include #include #include #include @@ -33,72 +30,6 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -struct BucketCacheEntry { - std::string id; - std::string location; -}; - -class BucketMetadataCache { - public: - explicit BucketMetadataCache(std::size_t max_size = 10000) - : max_size_(max_size) {} - - absl::optional Get(std::string const& bucket_name) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it == map_.end()) return absl::nullopt; - - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); - return it->second.first; - } - - void Put(std::string const& bucket_name, BucketCacheEntry entry) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it != map_.end()) { - it->second.first = entry; - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); - return; - } - - if (map_.size() >= max_size_) { - auto oldest = list_.back(); - list_.pop_back(); - map_.erase(oldest); - } - - list_.push_front(bucket_name); - map_[bucket_name] = {std::move(entry), list_.begin()}; - } - - void Invalidate(std::string const& bucket_name) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it != map_.end()) { - list_.erase(it->second.second); - map_.erase(it); - } - } - - void Clear() { - std::lock_guard lock(mu_); - map_.clear(); - list_.clear(); - } - - private: - std::size_t max_size_; - std::mutex mu_; - std::list list_; - std::unordered_map::iterator>> - map_; -}; - class TracingConnection : public storage::internal::StorageConnection { public: explicit TracingConnection(std::shared_ptr impl); @@ -252,33 +183,7 @@ class TracingConnection : public storage::internal::StorageConnection { std::vector InspectStackStructure() const override; private: - void EnrichSpan(opentelemetry::trace::Span& span, - std::string const& bucket_name); - void EnrichSpan(opentelemetry::trace::Span& span, - storage::BucketMetadata const& metadata); - void MaybeTriggerBackgroundFetch(std::string const& bucket_name); - void CleanupCompletedTasks(); - - template - void MaybeInvalidate(StatusOr const& result, - std::string const& bucket_name) { - if (!result.ok() && result.status().code() == StatusCode::kNotFound) { - cache().Invalidate(bucket_name); - } - } - - void MaybeInvalidate(Status const& status, std::string const& bucket_name) { - if (!status.ok() && status.code() == StatusCode::kNotFound) { - cache().Invalidate(bucket_name); - } - } - - static BucketMetadataCache& cache(); - std::shared_ptr impl_; - std::mutex mu_; - std::unordered_set in_flight_fetch_; - std::vector> bg_tasks_; }; std::shared_ptr MakeTracingClient( diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 8b402f3245306..0bd1c9308e0d2 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include namespace google { @@ -33,6 +34,7 @@ namespace { using ::google::cloud::storage::testing::MockClient; using ::google::cloud::storage::testing::MockObjectReadSource; using ::google::cloud::storage::testing::canonical_errors::PermanentError; +using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::testing_util::InstallSpanCatcher; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::OTelAttribute; @@ -196,7 +198,7 @@ TEST(TracingClientTest, BucketMetadataCacheSuccess) { auto bg_fetch_future = bg_fetch_done.get_future(); EXPECT_CALL(*mock, GetObjectMetadata).WillOnce([](auto const&) { - return PermanentError(); + return TransientError(); }); EXPECT_CALL(*mock, GetBucketMetadata) @@ -218,6 +220,7 @@ TEST(TracingClientTest, BucketMetadataCacheSuccess) { "test-object")); bg_fetch_future.wait_for(std::chrono::seconds(5)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); EXPECT_CALL(*mock, DeleteObject).WillOnce([](auto const&) { return PermanentError(); From a96acb0f270afe78d2b819d69d68b5b62bbc81d4 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Wed, 3 Jun 2026 10:32:05 +0000 Subject: [PATCH 08/20] fix ci failures --- .../storage/internal/tracing_connection.cc | 64 +++++----- .../storage/internal/tracing_connection.h | 111 ++++++++++++++++++ 2 files changed, 145 insertions(+), 30 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 7c1feb859a0dd..327466cbef263 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/internal/tracing_object_read_source.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" +#include "google/cloud/options.h" #include #include #include @@ -64,36 +65,39 @@ void TracingConnection::MaybeTriggerBackgroundFetch( return; } - auto f = std::async(std::launch::async, [this, bucket_name]() { - storage::internal::GetBucketMetadataRequest request(bucket_name); - auto result = impl_->GetBucketMetadata(request); - std::cout << "BG Thread: GetBucketMetadata returned ok: " << result.ok() - << "\n"; - if (!result.ok()) { - std::cout << "BG Thread: GetBucketMetadata status: " - << result.status().message() - << " code: " << static_cast(result.status().code()) - << "\n"; - } - - BucketCacheEntry entry; - if (result.ok()) { - entry.id = "projects/" + std::to_string(result->project_number()) + - "/buckets/" + result->name(); - entry.location = result->location(); - if (result->location_type() == "multi-region" || - result->location_type() == "dual-region") { - entry.location = "global"; - } - cache().Put(bucket_name, std::move(entry)); - } else if (result.status().code() == StatusCode::kPermissionDenied) { - entry.id = "projects/_/buckets/" + bucket_name; - entry.location = "global"; - cache().Put(bucket_name, std::move(entry)); - } - - cache().EndFetch(bucket_name); - }); + auto current_options = google::cloud::internal::SaveCurrentOptions(); + auto f = + std::async(std::launch::async, [this, bucket_name, current_options]() { + google::cloud::internal::OptionsSpan span(current_options); + storage::internal::GetBucketMetadataRequest request(bucket_name); + auto result = impl_->GetBucketMetadata(request); + std::cout << "BG Thread: GetBucketMetadata returned ok: " << result.ok() + << "\n"; + if (!result.ok()) { + std::cout << "BG Thread: GetBucketMetadata status: " + << result.status().message() + << " code: " << static_cast(result.status().code()) + << "\n"; + } + + BucketCacheEntry entry; + if (result.ok()) { + entry.id = "projects/" + std::to_string(result->project_number()) + + "/buckets/" + result->name(); + entry.location = result->location(); + if (result->location_type() == "multi-region" || + result->location_type() == "dual-region") { + entry.location = "global"; + } + cache().Put(bucket_name, std::move(entry)); + } else if (result.status().code() == StatusCode::kPermissionDenied) { + entry.id = "projects/_/buckets/" + bucket_name; + entry.location = "global"; + cache().Put(bucket_name, std::move(entry)); + } + + cache().EndFetch(bucket_name); + }); bg_tasks_.push_back(std::move(f)); } diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index bd648b1d68577..4c0ce96b97846 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -18,6 +18,9 @@ #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" +#include "absl/types/optional.h" +#include +#include #include #include #include @@ -30,6 +33,88 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +struct BucketCacheEntry { + std::string id; + std::string location; +}; + +class BucketMetadataCache { + public: + explicit BucketMetadataCache(std::size_t max_size = 10000) + : max_size_(max_size) {} + + absl::optional Get(std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it == map_.end()) return absl::nullopt; + + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return it->second.first; + } + + void Put(std::string const& bucket_name, BucketCacheEntry entry) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + it->second.first = entry; + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return; + } + + if (map_.size() >= max_size_) { + auto oldest = list_.back(); + list_.pop_back(); + map_.erase(oldest); + } + + list_.push_front(bucket_name); + map_[bucket_name] = {std::move(entry), list_.begin()}; + } + + void Invalidate(std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + list_.erase(it->second.second); + map_.erase(it); + } + } + + void Clear() { + std::lock_guard lock(mu_); + map_.clear(); + list_.clear(); + in_flight_fetch_.clear(); + } + + bool StartFetch(std::string const& bucket_name) { + std::lock_guard lock(mu_); + if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { + return false; + } + in_flight_fetch_.insert(bucket_name); + return true; + } + + void EndFetch(std::string const& bucket_name) { + std::lock_guard lock(mu_); + in_flight_fetch_.erase(bucket_name); + } + + private: + std::size_t max_size_; + std::mutex mu_; + std::list list_; + std::unordered_map::iterator>> + map_; + std::unordered_set in_flight_fetch_; +}; + class TracingConnection : public storage::internal::StorageConnection { public: explicit TracingConnection(std::shared_ptr impl); @@ -183,7 +268,33 @@ class TracingConnection : public storage::internal::StorageConnection { std::vector InspectStackStructure() const override; private: + void EnrichSpan(opentelemetry::trace::Span& span, + std::string const& bucket_name); + static void EnrichSpan(opentelemetry::trace::Span& span, + storage::BucketMetadata const& metadata); + void MaybeTriggerBackgroundFetch(std::string const& bucket_name); + void CleanupCompletedTasks(); + + template + static void MaybeInvalidate(StatusOr const& result, + std::string const& bucket_name) { + if (!result.ok() && result.status().code() == StatusCode::kNotFound) { + cache().Invalidate(bucket_name); + } + } + + static void MaybeInvalidate(Status const& status, + std::string const& bucket_name) { + if (!status.ok() && status.code() == StatusCode::kNotFound) { + cache().Invalidate(bucket_name); + } + } + + static BucketMetadataCache& cache(); + std::shared_ptr impl_; + std::mutex mu_; + std::vector> bg_tasks_; }; std::shared_ptr MakeTracingClient( From 9365a01de44cdc0b758e76adf72f0845e26910ce Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Mon, 8 Jun 2026 11:51:19 +0000 Subject: [PATCH 09/20] fix ci failures --- ...plenty_clients_serially_integration_test.cc | 16 +++++++++------- ..._clients_simultaneously_integration_test.cc | 18 ++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc b/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc index f947d62118c76..b37026e27f79d 100644 --- a/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc +++ b/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc @@ -42,15 +42,17 @@ TEST_F(ObjectPlentyClientsSeriallyIntegrationTest, PlentyClientsSerially) { // own tests. if (UsingGrpc()) GTEST_SKIP(); - auto client = MakeIntegrationTestClient(); + auto options = Options{}.set(false); auto object_name = MakeRandomObjectName(); - std::string expected = LoremIpsum(); - StatusOr meta = client.InsertObject( - bucket_name_, object_name, expected, IfGenerationMatch(0)); - ASSERT_STATUS_OK(meta); - ScheduleForDelete(*meta); + { + auto client = MakeIntegrationTestClient(options); + StatusOr meta = client.InsertObject( + bucket_name_, object_name, expected, IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta); + ScheduleForDelete(*meta); + } // Track the number of open files to ensure every client creates the same // number of file descriptors and none are leaked. @@ -64,7 +66,7 @@ TEST_F(ObjectPlentyClientsSeriallyIntegrationTest, PlentyClientsSerially) { } std::size_t delta = 0; for (int i = 0; i != 100; ++i) { - auto read_client = MakeIntegrationTestClient(); + auto read_client = MakeIntegrationTestClient(options); auto stream = read_client.ReadObject(bucket_name_, object_name); char c; stream.read(&c, 1); diff --git a/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc b/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc index 3e489b2e23db0..82dee21ffb9ed 100644 --- a/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc +++ b/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/testing/object_integration_test.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/log.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/status_or.h" #include "google/cloud/testing_util/expect_exception.h" #include "google/cloud/testing_util/status_matchers.h" @@ -45,22 +46,23 @@ TEST_F(ObjectPlentyClientsSimultaneouslyIntegrationTest, // own tests. if (UsingGrpc()) GTEST_SKIP(); - auto client = MakeIntegrationTestClient(); + auto options = Options{}.set(false); auto object_name = MakeRandomObjectName(); - std::string expected = LoremIpsum(); - // Create the object, but only if it does not exist already. - StatusOr meta = client.InsertObject( - bucket_name_, object_name, expected, IfGenerationMatch(0)); - ASSERT_STATUS_OK(meta); - ScheduleForDelete(*meta); + { + auto client = MakeIntegrationTestClient(options); + StatusOr meta = client.InsertObject( + bucket_name_, object_name, expected, IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta); + ScheduleForDelete(*meta); + } auto num_fds_before_test = GetNumOpenFiles(); std::vector read_clients; std::vector read_streams; for (int i = 0; i != 100; ++i) { - auto read_client = MakeIntegrationTestClient(); + auto read_client = MakeIntegrationTestClient(options); auto stream = read_client.ReadObject(bucket_name_, object_name); char c; stream.read(&c, 1); From f2c6d981089fa4fe7b7dd3668b0c214b9d16faf7 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Mon, 8 Jun 2026 14:27:46 +0000 Subject: [PATCH 10/20] fix ci failures --- .../tests/object_plenty_clients_serially_integration_test.cc | 4 +++- .../object_plenty_clients_simultaneously_integration_test.cc | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc b/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc index b37026e27f79d..508010073c09c 100644 --- a/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc +++ b/google/cloud/storage/tests/object_plenty_clients_serially_integration_test.cc @@ -16,6 +16,7 @@ #include "google/cloud/storage/testing/object_integration_test.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/log.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/status_or.h" #include "google/cloud/testing_util/expect_exception.h" #include "google/cloud/testing_util/status_matchers.h" @@ -42,7 +43,8 @@ TEST_F(ObjectPlentyClientsSeriallyIntegrationTest, PlentyClientsSerially) { // own tests. if (UsingGrpc()) GTEST_SKIP(); - auto options = Options{}.set(false); + auto options = + Options{}.set(false); auto object_name = MakeRandomObjectName(); std::string expected = LoremIpsum(); diff --git a/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc b/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc index 82dee21ffb9ed..26204cc90c5fe 100644 --- a/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc +++ b/google/cloud/storage/tests/object_plenty_clients_simultaneously_integration_test.cc @@ -46,7 +46,8 @@ TEST_F(ObjectPlentyClientsSimultaneouslyIntegrationTest, // own tests. if (UsingGrpc()) GTEST_SKIP(); - auto options = Options{}.set(false); + auto options = + Options{}.set(false); auto object_name = MakeRandomObjectName(); std::string expected = LoremIpsum(); From 4974d799624b51f38c54bf1852951526b68fe72a Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 9 Jun 2026 10:56:34 +0000 Subject: [PATCH 11/20] move BucketMetadataCache to separate file --- .../storage/google_cloud_cpp_storage.cmake | 2 + .../storage/internal/bucket_metadata_cache.cc | 91 +++++++++++++++++++ .../storage/internal/bucket_metadata_cache.h | 65 +++++++++++++ .../storage/internal/tracing_connection.h | 87 +----------------- 4 files changed, 159 insertions(+), 86 deletions(-) create mode 100644 google/cloud/storage/internal/bucket_metadata_cache.cc create mode 100644 google/cloud/storage/internal/bucket_metadata_cache.h diff --git a/google/cloud/storage/google_cloud_cpp_storage.cmake b/google/cloud/storage/google_cloud_cpp_storage.cmake index f3d70b6766817..8e64ebef61389 100644 --- a/google/cloud/storage/google_cloud_cpp_storage.cmake +++ b/google/cloud/storage/google_cloud_cpp_storage.cmake @@ -74,6 +74,8 @@ add_library( internal/bucket_access_control_parser.h internal/bucket_acl_requests.cc internal/bucket_acl_requests.h + internal/bucket_metadata_cache.cc + internal/bucket_metadata_cache.h internal/bucket_metadata_parser.cc internal/bucket_metadata_parser.h internal/bucket_requests.cc diff --git a/google/cloud/storage/internal/bucket_metadata_cache.cc b/google/cloud/storage/internal/bucket_metadata_cache.cc new file mode 100644 index 0000000000000..ff0b8ce02ba4c --- /dev/null +++ b/google/cloud/storage/internal/bucket_metadata_cache.cc @@ -0,0 +1,91 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/storage/internal/bucket_metadata_cache.h" +#include +#include + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +absl::optional BucketMetadataCache::Get( + std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it == map_.end()) return absl::nullopt; + + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return it->second.first; +} + +void BucketMetadataCache::Put(std::string const& bucket_name, + BucketCacheEntry entry) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + it->second.first = std::move(entry); + list_.erase(it->second.second); + list_.push_front(bucket_name); + it->second.second = list_.begin(); + return; + } + + if (map_.size() >= max_size_) { + auto oldest = list_.back(); + list_.pop_back(); + map_.erase(oldest); + } + + list_.push_front(bucket_name); + map_[bucket_name] = {std::move(entry), list_.begin()}; +} + +void BucketMetadataCache::Invalidate(std::string const& bucket_name) { + std::lock_guard lock(mu_); + auto it = map_.find(bucket_name); + if (it != map_.end()) { + list_.erase(it->second.second); + map_.erase(it); + } +} + +void BucketMetadataCache::Clear() { + std::lock_guard lock(mu_); + map_.clear(); + list_.clear(); + in_flight_fetch_.clear(); +} + +bool BucketMetadataCache::StartFetch(std::string const& bucket_name) { + std::lock_guard lock(mu_); + if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { + return false; + } + in_flight_fetch_.insert(bucket_name); + return true; +} + +void BucketMetadataCache::EndFetch(std::string const& bucket_name) { + std::lock_guard lock(mu_); + in_flight_fetch_.erase(bucket_name); +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/storage/internal/bucket_metadata_cache.h b/google/cloud/storage/internal/bucket_metadata_cache.h new file mode 100644 index 0000000000000..cf7dc95971a62 --- /dev/null +++ b/google/cloud/storage/internal/bucket_metadata_cache.h @@ -0,0 +1,65 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_BUCKET_METADATA_CACHE_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_BUCKET_METADATA_CACHE_H + +#include "google/cloud/storage/version.h" +#include "absl/types/optional.h" +#include +#include +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +struct BucketCacheEntry { + std::string id; + std::string location; +}; + +class BucketMetadataCache { + public: + explicit BucketMetadataCache(std::size_t max_size = 10000) + : max_size_(max_size) {} + + absl::optional Get(std::string const& bucket_name); + void Put(std::string const& bucket_name, BucketCacheEntry entry); + void Invalidate(std::string const& bucket_name); + void Clear(); + bool StartFetch(std::string const& bucket_name); + void EndFetch(std::string const& bucket_name); + + private: + std::size_t max_size_; + std::mutex mu_; + std::list list_; + std::unordered_map::iterator>> + map_; + std::unordered_set in_flight_fetch_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_BUCKET_METADATA_CACHE_H diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 4c0ce96b97846..9a88d9693a7b4 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -15,17 +15,14 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_CONNECTION_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_CONNECTION_H +#include "google/cloud/storage/internal/bucket_metadata_cache.h" #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" -#include "absl/types/optional.h" #include -#include #include #include #include -#include -#include #include namespace google { @@ -33,88 +30,6 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -struct BucketCacheEntry { - std::string id; - std::string location; -}; - -class BucketMetadataCache { - public: - explicit BucketMetadataCache(std::size_t max_size = 10000) - : max_size_(max_size) {} - - absl::optional Get(std::string const& bucket_name) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it == map_.end()) return absl::nullopt; - - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); - return it->second.first; - } - - void Put(std::string const& bucket_name, BucketCacheEntry entry) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it != map_.end()) { - it->second.first = entry; - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); - return; - } - - if (map_.size() >= max_size_) { - auto oldest = list_.back(); - list_.pop_back(); - map_.erase(oldest); - } - - list_.push_front(bucket_name); - map_[bucket_name] = {std::move(entry), list_.begin()}; - } - - void Invalidate(std::string const& bucket_name) { - std::lock_guard lock(mu_); - auto it = map_.find(bucket_name); - if (it != map_.end()) { - list_.erase(it->second.second); - map_.erase(it); - } - } - - void Clear() { - std::lock_guard lock(mu_); - map_.clear(); - list_.clear(); - in_flight_fetch_.clear(); - } - - bool StartFetch(std::string const& bucket_name) { - std::lock_guard lock(mu_); - if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { - return false; - } - in_flight_fetch_.insert(bucket_name); - return true; - } - - void EndFetch(std::string const& bucket_name) { - std::lock_guard lock(mu_); - in_flight_fetch_.erase(bucket_name); - } - - private: - std::size_t max_size_; - std::mutex mu_; - std::list list_; - std::unordered_map::iterator>> - map_; - std::unordered_set in_flight_fetch_; -}; - class TracingConnection : public storage::internal::StorageConnection { public: explicit TracingConnection(std::shared_ptr impl); From de0a4bb959fff70bc0d4e387c70eba1f3a913008 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 9 Jun 2026 12:10:20 +0000 Subject: [PATCH 12/20] add unit tests for bucket_metdata_cache --- .../storage/google_cloud_cpp_storage.bzl | 2 + .../storage/google_cloud_cpp_storage.cmake | 1 + .../internal/bucket_metadata_cache_test.cc | 103 ++++++++++++++++++ .../storage/storage_client_unit_tests.bzl | 1 + 4 files changed, 107 insertions(+) create mode 100644 google/cloud/storage/internal/bucket_metadata_cache_test.cc diff --git a/google/cloud/storage/google_cloud_cpp_storage.bzl b/google/cloud/storage/google_cloud_cpp_storage.bzl index ed40ddb04ab5f..de3c9b4516942 100644 --- a/google/cloud/storage/google_cloud_cpp_storage.bzl +++ b/google/cloud/storage/google_cloud_cpp_storage.bzl @@ -49,6 +49,7 @@ google_cloud_cpp_storage_hdrs = [ "internal/binary_data_as_debug_string.h", "internal/bucket_access_control_parser.h", "internal/bucket_acl_requests.h", + "internal/bucket_metadata_cache.h", "internal/bucket_metadata_parser.h", "internal/bucket_requests.h", "internal/complex_option.h", @@ -164,6 +165,7 @@ google_cloud_cpp_storage_srcs = [ "internal/base64.cc", "internal/bucket_access_control_parser.cc", "internal/bucket_acl_requests.cc", + "internal/bucket_metadata_cache.cc", "internal/bucket_metadata_parser.cc", "internal/bucket_requests.cc", "internal/compute_engine_util.cc", diff --git a/google/cloud/storage/google_cloud_cpp_storage.cmake b/google/cloud/storage/google_cloud_cpp_storage.cmake index 8e64ebef61389..52e106efc9e5c 100644 --- a/google/cloud/storage/google_cloud_cpp_storage.cmake +++ b/google/cloud/storage/google_cloud_cpp_storage.cmake @@ -422,6 +422,7 @@ if (BUILD_TESTING) idempotency_policy_test.cc internal/base64_test.cc internal/bucket_acl_requests_test.cc + internal/bucket_metadata_cache_test.cc internal/bucket_requests_test.cc internal/complex_option_test.cc internal/compute_engine_util_test.cc diff --git a/google/cloud/storage/internal/bucket_metadata_cache_test.cc b/google/cloud/storage/internal/bucket_metadata_cache_test.cc new file mode 100644 index 0000000000000..9290e7b7d4e6f --- /dev/null +++ b/google/cloud/storage/internal/bucket_metadata_cache_test.cc @@ -0,0 +1,103 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/storage/internal/bucket_metadata_cache.h" +#include + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::testing::Eq; +using ::testing::IsFalse; +using ::testing::IsTrue; + +TEST(BucketMetadataCacheTest, HitAndMiss) { + BucketMetadataCache cache(10); + EXPECT_FALSE(cache.Get("test-bucket").has_value()); + + BucketCacheEntry entry{"projects/123/buckets/test-bucket", "us-central1"}; + cache.Put("test-bucket", entry); + + auto res = cache.Get("test-bucket"); + ASSERT_TRUE(res.has_value()); + EXPECT_THAT(res->id, Eq("projects/123/buckets/test-bucket")); + EXPECT_THAT(res->location, Eq("us-central1")); +} + +TEST(BucketMetadataCacheTest, PutUpdatesExisting) { + BucketMetadataCache cache(10); + BucketCacheEntry entry1{"projects/123/buckets/test-bucket", "us-central1"}; + cache.Put("test-bucket", entry1); + + BucketCacheEntry entry2{"projects/456/buckets/test-bucket", "global"}; + cache.Put("test-bucket", entry2); + + auto res = cache.Get("test-bucket"); + ASSERT_TRUE(res.has_value()); + EXPECT_THAT(res->id, Eq("projects/456/buckets/test-bucket")); + EXPECT_THAT(res->location, Eq("global")); +} + +TEST(BucketMetadataCacheTest, InvalidateAndClear) { + BucketMetadataCache cache(10); + BucketCacheEntry entry{"projects/123/buckets/test-bucket", "us-central1"}; + cache.Put("test-bucket", entry); + EXPECT_TRUE(cache.Get("test-bucket").has_value()); + + cache.Invalidate("test-bucket"); + EXPECT_FALSE(cache.Get("test-bucket").has_value()); + + cache.Put("bucket1", entry); + cache.Put("bucket2", entry); + EXPECT_TRUE(cache.Get("bucket1").has_value()); + EXPECT_TRUE(cache.Get("bucket2").has_value()); + + cache.Clear(); + EXPECT_FALSE(cache.Get("bucket1").has_value()); + EXPECT_FALSE(cache.Get("bucket2").has_value()); +} + +TEST(BucketMetadataCacheTest, EvictsOldest) { + BucketMetadataCache cache(2); + BucketCacheEntry entry{"id", "loc"}; + + cache.Put("b1", entry); + cache.Put("b2", entry); + EXPECT_TRUE(cache.Get("b1").has_value()); // b1 becomes most recent + + cache.Put("b3", entry); // pushes out b2 (oldest) + EXPECT_TRUE(cache.Get("b1").has_value()); + EXPECT_FALSE(cache.Get("b2").has_value()); + EXPECT_TRUE(cache.Get("b3").has_value()); +} + +TEST(BucketMetadataCacheTest, InFlightFetch) { + BucketMetadataCache cache(10); + EXPECT_THAT(cache.StartFetch("b1"), IsTrue()); + EXPECT_THAT(cache.StartFetch("b1"), IsFalse()); + + EXPECT_THAT(cache.StartFetch("b2"), IsTrue()); + + cache.EndFetch("b1"); + EXPECT_THAT(cache.StartFetch("b1"), IsTrue()); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/storage/storage_client_unit_tests.bzl b/google/cloud/storage/storage_client_unit_tests.bzl index 54c1c64a555b6..c08dc7b2119c7 100644 --- a/google/cloud/storage/storage_client_unit_tests.bzl +++ b/google/cloud/storage/storage_client_unit_tests.bzl @@ -43,6 +43,7 @@ storage_client_unit_tests = [ "idempotency_policy_test.cc", "internal/base64_test.cc", "internal/bucket_acl_requests_test.cc", + "internal/bucket_metadata_cache_test.cc", "internal/bucket_requests_test.cc", "internal/complex_option_test.cc", "internal/compute_engine_util_test.cc", From e68c833a6ec9a312e7f544629f5c8a8d09db2bd1 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 9 Jun 2026 13:33:29 +0000 Subject: [PATCH 13/20] more refactoring --- .../storage/internal/bucket_metadata_cache.cc | 24 +++++++++---------- .../storage/internal/bucket_metadata_cache.h | 2 ++ .../storage/internal/tracing_connection.cc | 11 +-------- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/google/cloud/storage/internal/bucket_metadata_cache.cc b/google/cloud/storage/internal/bucket_metadata_cache.cc index ff0b8ce02ba4c..3b6704c2bb7da 100644 --- a/google/cloud/storage/internal/bucket_metadata_cache.cc +++ b/google/cloud/storage/internal/bucket_metadata_cache.cc @@ -21,27 +21,27 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +void BucketMetadataCache::MoveToFront(std::list::iterator it) { + list_.splice(list_.begin(), list_, it); +} + absl::optional BucketMetadataCache::Get( std::string const& bucket_name) { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); auto it = map_.find(bucket_name); if (it == map_.end()) return absl::nullopt; - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); + MoveToFront(it->second.second); return it->second.first; } void BucketMetadataCache::Put(std::string const& bucket_name, BucketCacheEntry entry) { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); auto it = map_.find(bucket_name); if (it != map_.end()) { it->second.first = std::move(entry); - list_.erase(it->second.second); - list_.push_front(bucket_name); - it->second.second = list_.begin(); + MoveToFront(it->second.second); return; } @@ -56,7 +56,7 @@ void BucketMetadataCache::Put(std::string const& bucket_name, } void BucketMetadataCache::Invalidate(std::string const& bucket_name) { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); auto it = map_.find(bucket_name); if (it != map_.end()) { list_.erase(it->second.second); @@ -65,14 +65,14 @@ void BucketMetadataCache::Invalidate(std::string const& bucket_name) { } void BucketMetadataCache::Clear() { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); map_.clear(); list_.clear(); in_flight_fetch_.clear(); } bool BucketMetadataCache::StartFetch(std::string const& bucket_name) { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); if (in_flight_fetch_.find(bucket_name) != in_flight_fetch_.end()) { return false; } @@ -81,7 +81,7 @@ bool BucketMetadataCache::StartFetch(std::string const& bucket_name) { } void BucketMetadataCache::EndFetch(std::string const& bucket_name) { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); in_flight_fetch_.erase(bucket_name); } diff --git a/google/cloud/storage/internal/bucket_metadata_cache.h b/google/cloud/storage/internal/bucket_metadata_cache.h index cf7dc95971a62..6c51ba3e1dc74 100644 --- a/google/cloud/storage/internal/bucket_metadata_cache.h +++ b/google/cloud/storage/internal/bucket_metadata_cache.h @@ -48,6 +48,8 @@ class BucketMetadataCache { void EndFetch(std::string const& bucket_name); private: + void MoveToFront(std::list::iterator it); + std::size_t max_size_; std::mutex mu_; std::list list_; diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 327466cbef263..2024b58fe9c2a 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -18,7 +18,6 @@ #include "google/cloud/internal/opentelemetry.h" #include "google/cloud/options.h" #include -#include #include #include #include @@ -48,7 +47,7 @@ void TracingConnection::ResetCacheForTesting() { cache().Clear(); } Options TracingConnection::options() const { return impl_->options(); } void TracingConnection::CleanupCompletedTasks() { - std::lock_guard lock(mu_); + std::unique_lock lk(mu_); bg_tasks_.erase(std::remove_if(bg_tasks_.begin(), bg_tasks_.end(), [](std::future const& f) { return f.wait_for(std::chrono::seconds(0)) == @@ -71,14 +70,6 @@ void TracingConnection::MaybeTriggerBackgroundFetch( google::cloud::internal::OptionsSpan span(current_options); storage::internal::GetBucketMetadataRequest request(bucket_name); auto result = impl_->GetBucketMetadata(request); - std::cout << "BG Thread: GetBucketMetadata returned ok: " << result.ok() - << "\n"; - if (!result.ok()) { - std::cout << "BG Thread: GetBucketMetadata status: " - << result.status().message() - << " code: " << static_cast(result.status().code()) - << "\n"; - } BucketCacheEntry entry; if (result.ok()) { From c889791150ce6c9b8150bee7c74b0c9e4cfcea77 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 9 Jun 2026 15:07:03 +0000 Subject: [PATCH 14/20] more refactoring --- .../storage/internal/bucket_metadata_cache.cc | 13 +++ .../storage/internal/bucket_metadata_cache.h | 7 ++ .../storage/internal/tracing_connection.cc | 84 +++++++++---------- .../storage/internal/tracing_connection.h | 14 ++-- 4 files changed, 69 insertions(+), 49 deletions(-) diff --git a/google/cloud/storage/internal/bucket_metadata_cache.cc b/google/cloud/storage/internal/bucket_metadata_cache.cc index 3b6704c2bb7da..dc57578715f17 100644 --- a/google/cloud/storage/internal/bucket_metadata_cache.cc +++ b/google/cloud/storage/internal/bucket_metadata_cache.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/internal/bucket_metadata_cache.h" +#include "google/cloud/storage/bucket_metadata.h" #include #include @@ -21,6 +22,18 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +BucketCacheEntry BucketCacheEntry::FromMetadata( + storage::BucketMetadata const& m) { + std::string loc = m.location(); + if (m.location_type() == "multi-region" || + m.location_type() == "dual-region") { + loc = "global"; + } + return { + "projects/" + std::to_string(m.project_number()) + "/buckets/" + m.name(), + std::move(loc)}; +} + void BucketMetadataCache::MoveToFront(std::list::iterator it) { list_.splice(list_.begin(), list_, it); } diff --git a/google/cloud/storage/internal/bucket_metadata_cache.h b/google/cloud/storage/internal/bucket_metadata_cache.h index 6c51ba3e1dc74..172dc48772f8b 100644 --- a/google/cloud/storage/internal/bucket_metadata_cache.h +++ b/google/cloud/storage/internal/bucket_metadata_cache.h @@ -27,12 +27,19 @@ namespace google { namespace cloud { +namespace storage { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +class BucketMetadata; +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN struct BucketCacheEntry { std::string id; std::string location; + + static BucketCacheEntry FromMetadata(storage::BucketMetadata const& m); }; class BucketMetadataCache { diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 2024b58fe9c2a..36d454e22e189 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -56,6 +56,12 @@ void TracingConnection::CleanupCompletedTasks() { bg_tasks_.end()); } +void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, + BucketCacheEntry const& entry) { + span.SetAttribute("gcp.resource.destination.id", entry.id); + span.SetAttribute("gcp.resource.destination.location", entry.location); +} + void TracingConnection::MaybeTriggerBackgroundFetch( std::string const& bucket_name) { CleanupCompletedTasks(); @@ -65,30 +71,20 @@ void TracingConnection::MaybeTriggerBackgroundFetch( } auto current_options = google::cloud::internal::SaveCurrentOptions(); - auto f = - std::async(std::launch::async, [this, bucket_name, current_options]() { - google::cloud::internal::OptionsSpan span(current_options); - storage::internal::GetBucketMetadataRequest request(bucket_name); - auto result = impl_->GetBucketMetadata(request); - - BucketCacheEntry entry; - if (result.ok()) { - entry.id = "projects/" + std::to_string(result->project_number()) + - "/buckets/" + result->name(); - entry.location = result->location(); - if (result->location_type() == "multi-region" || - result->location_type() == "dual-region") { - entry.location = "global"; - } - cache().Put(bucket_name, std::move(entry)); - } else if (result.status().code() == StatusCode::kPermissionDenied) { - entry.id = "projects/_/buckets/" + bucket_name; - entry.location = "global"; - cache().Put(bucket_name, std::move(entry)); - } - - cache().EndFetch(bucket_name); - }); + auto f = std::async(std::launch::async, [this, bucket_name, + current_options]() { + google::cloud::internal::OptionsSpan span(current_options); + storage::internal::GetBucketMetadataRequest request(bucket_name); + auto result = impl_->GetBucketMetadata(request); + + if (result.ok()) { + cache().Put(bucket_name, BucketCacheEntry::FromMetadata(*result)); + } else if (result.status().code() == StatusCode::kPermissionDenied) { + cache().Put(bucket_name, {"projects/_/buckets/" + bucket_name, "global"}); + } + + cache().EndFetch(bucket_name); + }); bg_tasks_.push_back(std::move(f)); } @@ -98,8 +94,7 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, if (bucket_name.empty()) return; auto entry = cache().Get(bucket_name); if (entry.has_value()) { - span.SetAttribute("gcp.resource.destination.id", entry->id); - span.SetAttribute("gcp.resource.destination.location", entry->location); + EnrichSpan(span, *entry); } else { MaybeTriggerBackgroundFetch(bucket_name); } @@ -107,18 +102,9 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata) { - std::string id = "projects/" + std::to_string(metadata.project_number()) + - "/buckets/" + metadata.name(); - std::string location = metadata.location(); - if (metadata.location_type() == "multi-region" || - metadata.location_type() == "dual-region") { - location = "global"; - } - span.SetAttribute("gcp.resource.destination.id", id); - span.SetAttribute("gcp.resource.destination.location", location); - - // Populate cache since we have metadata! - cache().Put(metadata.name(), {id, location}); + auto entry = BucketCacheEntry::FromMetadata(metadata); + EnrichSpan(span, entry); + cache().Put(metadata.name(), std::move(entry)); } StatusOr TracingConnection::ListBuckets( @@ -143,7 +129,11 @@ StatusOr TracingConnection::GetBucketMetadata( auto span = internal::MakeSpan("storage::Client::GetBucketMetadata"); auto scope = opentelemetry::trace::Scope(span); auto result = impl_->GetBucketMetadata(request); - if (result.ok()) EnrichSpan(*span, *result); + if (result.ok()) { + EnrichSpan(*span, *result); + } else { + MaybeInvalidate(result, request.bucket_name()); + } return internal::EndSpan(*span, std::move(result)); } @@ -153,7 +143,9 @@ StatusOr TracingConnection::DeleteBucket( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->DeleteBucket(request); - MaybeInvalidate(result, request.bucket_name()); + if (result.ok() || result.status().code() == StatusCode::kNotFound) { + cache().Invalidate(request.bucket_name()); + } return internal::EndSpan(*span, std::move(result)); } @@ -162,7 +154,11 @@ StatusOr TracingConnection::UpdateBucket( auto span = internal::MakeSpan("storage::Client::UpdateBucket"); auto scope = opentelemetry::trace::Scope(span); auto result = impl_->UpdateBucket(request); - if (result.ok()) EnrichSpan(*span, *result); + if (result.ok()) { + EnrichSpan(*span, *result); + } else { + MaybeInvalidate(result, request.metadata().name()); + } return internal::EndSpan(*span, std::move(result)); } @@ -171,7 +167,11 @@ StatusOr TracingConnection::PatchBucket( auto span = internal::MakeSpan("storage::Client::PatchBucket"); auto scope = opentelemetry::trace::Scope(span); auto result = impl_->PatchBucket(request); - if (result.ok()) EnrichSpan(*span, *result); + if (result.ok()) { + EnrichSpan(*span, *result); + } else { + MaybeInvalidate(result, request.bucket()); + } return internal::EndSpan(*span, std::move(result)); } diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 9a88d9693a7b4..78cb909501fd3 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -187,22 +187,22 @@ class TracingConnection : public storage::internal::StorageConnection { std::string const& bucket_name); static void EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata); + static void EnrichSpan(opentelemetry::trace::Span& span, + BucketCacheEntry const& entry); void MaybeTriggerBackgroundFetch(std::string const& bucket_name); void CleanupCompletedTasks(); - template - static void MaybeInvalidate(StatusOr const& result, + static void MaybeInvalidate(Status const& status, std::string const& bucket_name) { - if (!result.ok() && result.status().code() == StatusCode::kNotFound) { + if (!status.ok() && status.code() == StatusCode::kNotFound) { cache().Invalidate(bucket_name); } } - static void MaybeInvalidate(Status const& status, + template + static void MaybeInvalidate(StatusOr const& result, std::string const& bucket_name) { - if (!status.ok() && status.code() == StatusCode::kNotFound) { - cache().Invalidate(bucket_name); - } + MaybeInvalidate(result.status(), bucket_name); } static BucketMetadataCache& cache(); From 394c2ee15183585eeff92d093a0a0d57bb6aa4fb Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 11 Jun 2026 06:14:03 +0000 Subject: [PATCH 15/20] address review comments --- google/cloud/storage/client.cc | 3 ++- .../storage/internal/bucket_metadata_cache.cc | 3 ++- .../cloud/storage/internal/tracing_connection.cc | 7 +++++++ google/cloud/storage/options.h | 16 +++++++++++++++- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/client.cc b/google/cloud/storage/client.cc index 559d2f87fca7c..ee138a3caf5be 100644 --- a/google/cloud/storage/client.cc +++ b/google/cloud/storage/client.cc @@ -555,7 +555,8 @@ Options DefaultOptions(Options opts) { STORAGE_CLIENT_DEFAULT_MAXIMUM_BACKOFF_DELAY, STORAGE_CLIENT_DEFAULT_BACKOFF_SCALING) .clone()) - .set(AlwaysRetryIdempotencyPolicy().clone()); + .set(AlwaysRetryIdempotencyPolicy().clone()) + .set(true); o = google::cloud::internal::MergeOptions(std::move(opts), std::move(o)); // If the application did not set `DownloadStallTimeoutOption` then use the diff --git a/google/cloud/storage/internal/bucket_metadata_cache.cc b/google/cloud/storage/internal/bucket_metadata_cache.cc index dc57578715f17..f663eb31714fc 100644 --- a/google/cloud/storage/internal/bucket_metadata_cache.cc +++ b/google/cloud/storage/internal/bucket_metadata_cache.cc @@ -50,6 +50,7 @@ absl::optional BucketMetadataCache::Get( void BucketMetadataCache::Put(std::string const& bucket_name, BucketCacheEntry entry) { + if (max_size_ == 0) return; std::unique_lock lk(mu_); auto it = map_.find(bucket_name); if (it != map_.end()) { @@ -58,7 +59,7 @@ void BucketMetadataCache::Put(std::string const& bucket_name, return; } - if (map_.size() >= max_size_) { + if (map_.size() >= max_size_ && !list_.empty()) { auto oldest = list_.back(); list_.pop_back(); map_.erase(oldest); diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 36d454e22e189..50d9619c6bc98 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -14,6 +14,7 @@ #include "google/cloud/storage/internal/tracing_connection.h" #include "google/cloud/storage/internal/tracing_object_read_source.h" +#include "google/cloud/storage/options.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" #include "google/cloud/options.h" @@ -92,6 +93,9 @@ void TracingConnection::MaybeTriggerBackgroundFetch( void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name) { if (bucket_name.empty()) return; + auto const enabled = + options().get(); + if (!enabled) return; auto entry = cache().Get(bucket_name); if (entry.has_value()) { EnrichSpan(span, *entry); @@ -102,6 +106,9 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata) { + auto const enabled = + options().get(); + if (!enabled) return; auto entry = BucketCacheEntry::FromMetadata(metadata); EnrichSpan(span, entry); cache().Put(metadata.name(), std::move(entry)); diff --git a/google/cloud/storage/options.h b/google/cloud/storage/options.h index f64f5d1f28e51..5c3fddeae543b 100644 --- a/google/cloud/storage/options.h +++ b/google/cloud/storage/options.h @@ -52,6 +52,19 @@ struct HttpVersionOption { using Type = std::string; }; +/** + * Enable/disable OpenTelemetry trace span enrichment with GCS bucket resource + * metadata. + * + * When enabled, the GCS client decorates spans with gcp.resource.destination.id + * and location attributes by fetching metadata in the background. + * + * @ingroup storage-options + */ +struct OTelSpanEnrichmentOption { + using Type = bool; +}; + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_experimental @@ -325,7 +338,8 @@ using ClientOptionList = ::google::cloud::OptionList< MaximumCurlSocketRecvSizeOption, MaximumCurlSocketSendSizeOption, TransferStallTimeoutOption, RetryPolicyOption, BackoffPolicyOption, IdempotencyPolicyOption, CARootsFilePathOption, - storage_experimental::HttpVersionOption>; + storage_experimental::HttpVersionOption, + storage_experimental::OTelSpanEnrichmentOption>; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage From 9bc244da3347ac153b2b7d124d2ecad3c9d422d4 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 11 Jun 2026 13:22:56 +0000 Subject: [PATCH 16/20] address review comments --- .../storage/internal/tracing_connection.cc | 28 ++---- .../storage/internal/tracing_connection.h | 8 +- .../internal/tracing_connection_test.cc | 90 +++++++++++++++++++ 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 50d9619c6bc98..0d81e911cb50b 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -33,6 +33,7 @@ TracingConnection::TracingConnection(std::shared_ptr impl) : impl_(std::move(impl)) {} TracingConnection::~TracingConnection() { + std::lock_guard lk(mu_); for (auto& f : bg_tasks_) { if (f.valid()) f.wait(); } @@ -87,9 +88,13 @@ void TracingConnection::MaybeTriggerBackgroundFetch( cache().EndFetch(bucket_name); }); - bg_tasks_.push_back(std::move(f)); + { + std::lock_guard lk(mu_); + bg_tasks_.push_back(std::move(f)); + } } + void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name) { if (bucket_name.empty()) return; @@ -228,7 +233,6 @@ StatusOr TracingConnection::InsertObjectMedia( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->InsertObjectMedia(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -238,7 +242,6 @@ StatusOr TracingConnection::CopyObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.destination_bucket()); auto result = impl_->CopyObject(request); - MaybeInvalidate(result, request.destination_bucket()); return internal::EndSpan(*span, std::move(result)); } @@ -248,7 +251,6 @@ StatusOr TracingConnection::GetObjectMetadata( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->GetObjectMetadata(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -260,7 +262,6 @@ TracingConnection::ReadObject( EnrichSpan(*span, request.bucket_name()); auto reader = impl_->ReadObject(request); if (!reader) { - MaybeInvalidate(reader, request.bucket_name()); return internal::EndSpan(*span, std::move(reader)); } return std::unique_ptr( @@ -285,7 +286,6 @@ StatusOr TracingConnection::DeleteObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->DeleteObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -295,7 +295,6 @@ StatusOr TracingConnection::UpdateObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->UpdateObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -305,7 +304,6 @@ StatusOr TracingConnection::MoveObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->MoveObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -315,7 +313,6 @@ StatusOr TracingConnection::PatchObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->PatchObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -325,7 +322,6 @@ StatusOr TracingConnection::ComposeObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->ComposeObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -336,7 +332,6 @@ TracingConnection::RewriteObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.destination_bucket()); auto result = impl_->RewriteObject(request); - MaybeInvalidate(result, request.destination_bucket()); return internal::EndSpan(*span, std::move(result)); } @@ -346,7 +341,6 @@ StatusOr TracingConnection::RestoreObject( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->RestoreObject(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -359,7 +353,6 @@ TracingConnection::CreateResumableUpload( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->CreateResumableUpload(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -398,7 +391,6 @@ StatusOr> TracingConnection::UploadFileSimple( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->UploadFileSimple(file_name, file_size, request); - if (!result) MaybeInvalidate(result.status(), request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -410,7 +402,6 @@ StatusOr> TracingConnection::UploadFileResumable( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->UploadFileResumable(file_name, request); - if (!result) MaybeInvalidate(result.status(), request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -423,7 +414,6 @@ Status TracingConnection::DownloadStreamToFile( EnrichSpan(*span, request.bucket_name()); auto result = impl_->DownloadStreamToFile(std::move(stream), file_name, request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, result); } @@ -509,7 +499,6 @@ TracingConnection::ListObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->ListObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -519,7 +508,6 @@ StatusOr TracingConnection::CreateObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->CreateObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -529,7 +517,6 @@ StatusOr TracingConnection::DeleteObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->DeleteObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -539,7 +526,6 @@ StatusOr TracingConnection::GetObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->GetObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -549,7 +535,6 @@ StatusOr TracingConnection::UpdateObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->UpdateObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } @@ -559,7 +544,6 @@ StatusOr TracingConnection::PatchObjectAcl( auto scope = opentelemetry::trace::Scope(span); EnrichSpan(*span, request.bucket_name()); auto result = impl_->PatchObjectAcl(request); - MaybeInvalidate(result, request.bucket_name()); return internal::EndSpan(*span, std::move(result)); } diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 78cb909501fd3..6dc8751a960eb 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -185,10 +185,10 @@ class TracingConnection : public storage::internal::StorageConnection { private: void EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name); - static void EnrichSpan(opentelemetry::trace::Span& span, - storage::BucketMetadata const& metadata); - static void EnrichSpan(opentelemetry::trace::Span& span, - BucketCacheEntry const& entry); + void EnrichSpan(opentelemetry::trace::Span& span, + storage::BucketMetadata const& metadata); + void EnrichSpan(opentelemetry::trace::Span& span, + BucketCacheEntry const& entry); void MaybeTriggerBackgroundFetch(std::string const& bucket_name); void CleanupCompletedTasks(); diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 0bd1c9308e0d2..bd81aac61d306 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -1685,6 +1685,96 @@ TEST(TracingClientTest, DeleteNotification) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, BucketMetadataMaybeInvalidateBucketLevelEvict) { + TracingConnection::ResetCacheForTesting(); + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, options) + .WillRepeatedly(testing::Return( + Options{}.set(true))); + + // Seed cache + EXPECT_CALL(*mock, GetBucketMetadata).WillOnce([](auto const&) { + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + + auto under_test = TracingConnection(mock); + (void)under_test.GetBucketMetadata( + storage::internal::GetBucketMetadataRequest("test-bucket")); + + // Fail a bucket-level operation with 404 (DeleteBucket) + EXPECT_CALL(*mock, DeleteBucket).WillOnce([](auto const&) { + return Status(StatusCode::kNotFound, "Bucket not found"); + }); + (void)under_test.DeleteBucket( + storage::internal::DeleteBucketRequest("test-bucket")); + + // Verify that the cache entry was evicted. + testing::Mock::VerifyAndClearExpectations(mock.get()); + EXPECT_CALL(*mock, options) + .WillRepeatedly(testing::Return( + Options{}.set(true))); + + EXPECT_CALL(*mock, GetObjectMetadata).WillOnce([](auto const&) { + return storage::ObjectMetadata(); + }); + EXPECT_CALL(*mock, GetBucketMetadata).WillOnce([](auto const&) { + return Status(StatusCode::kNotFound, "Bucket not found"); + }); + + (void)under_test.GetObjectMetadata( + storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); +} + +TEST(TracingClientTest, BucketMetadataMaybeInvalidateObjectLevelNoEvict) { + TracingConnection::ResetCacheForTesting(); + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, options) + .WillRepeatedly(testing::Return( + Options{}.set(true))); + + // Seed cache + EXPECT_CALL(*mock, GetBucketMetadata).WillOnce([](auto const&) { + storage::BucketMetadata metadata; + metadata.set_name("test-bucket"); + metadata.set_project_number(123456); + metadata.set_location("us-east1"); + metadata.set_location_type("regional"); + return metadata; + }); + + auto under_test = TracingConnection(mock); + (void)under_test.GetBucketMetadata( + storage::internal::GetBucketMetadataRequest("test-bucket")); + + // Fail an object-level operation with 404 (GetObjectMetadata) + EXPECT_CALL(*mock, GetObjectMetadata).WillOnce([](auto const&) { + return Status(StatusCode::kNotFound, "Object not found"); + }); + (void)under_test.GetObjectMetadata( + storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); + + // Verify that the cache entry was NOT evicted. + testing::Mock::VerifyAndClearExpectations(mock.get()); + EXPECT_CALL(*mock, options) + .WillRepeatedly(testing::Return( + Options{}.set(true))); + + EXPECT_CALL(*mock, GetObjectMetadata).WillOnce([](auto const&) { + return storage::ObjectMetadata(); + }); + EXPECT_CALL(*mock, GetBucketMetadata).Times(0); + + (void)under_test.GetObjectMetadata( + storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From 80f0ba02da5223c73cda46f3cc2a27d52595a935 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 11 Jun 2026 15:33:42 +0000 Subject: [PATCH 17/20] ci failure fix --- google/cloud/storage/internal/tracing_connection.cc | 1 - google/cloud/storage/internal/tracing_connection.h | 4 ++-- google/cloud/storage/internal/tracing_connection_test.cc | 9 ++++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 0d81e911cb50b..8436e380eb02a 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -94,7 +94,6 @@ void TracingConnection::MaybeTriggerBackgroundFetch( } } - void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name) { if (bucket_name.empty()) return; diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 6dc8751a960eb..0f5e59a039039 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -187,8 +187,8 @@ class TracingConnection : public storage::internal::StorageConnection { std::string const& bucket_name); void EnrichSpan(opentelemetry::trace::Span& span, storage::BucketMetadata const& metadata); - void EnrichSpan(opentelemetry::trace::Span& span, - BucketCacheEntry const& entry); + static void EnrichSpan(opentelemetry::trace::Span& span, + BucketCacheEntry const& entry); void MaybeTriggerBackgroundFetch(std::string const& bucket_name); void CleanupCompletedTasks(); diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index bd81aac61d306..d3305d023a126 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -1728,7 +1728,8 @@ TEST(TracingClientTest, BucketMetadataMaybeInvalidateBucketLevelEvict) { }); (void)under_test.GetObjectMetadata( - storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); + storage::internal::GetObjectMetadataRequest("test-bucket", + "test-object")); } TEST(TracingClientTest, BucketMetadataMaybeInvalidateObjectLevelNoEvict) { @@ -1758,7 +1759,8 @@ TEST(TracingClientTest, BucketMetadataMaybeInvalidateObjectLevelNoEvict) { return Status(StatusCode::kNotFound, "Object not found"); }); (void)under_test.GetObjectMetadata( - storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); + storage::internal::GetObjectMetadataRequest("test-bucket", + "test-object")); // Verify that the cache entry was NOT evicted. testing::Mock::VerifyAndClearExpectations(mock.get()); @@ -1772,7 +1774,8 @@ TEST(TracingClientTest, BucketMetadataMaybeInvalidateObjectLevelNoEvict) { EXPECT_CALL(*mock, GetBucketMetadata).Times(0); (void)under_test.GetObjectMetadata( - storage::internal::GetObjectMetadataRequest("test-bucket", "test-object")); + storage::internal::GetObjectMetadataRequest("test-bucket", + "test-object")); } } // namespace From d63973e181ca1e6b731785f84ecf3874f29e030e Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 12 Jun 2026 06:36:14 +0000 Subject: [PATCH 18/20] fix ci failure --- google/cloud/storage/internal/tracing_connection.cc | 2 +- google/cloud/storage/internal/tracing_connection.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 8436e380eb02a..14e3b53cc8290 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -109,7 +109,7 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, } void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, - storage::BucketMetadata const& metadata) { + storage::BucketMetadata const& metadata) const { auto const enabled = options().get(); if (!enabled) return; diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 0f5e59a039039..e05b953475301 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -186,7 +186,7 @@ class TracingConnection : public storage::internal::StorageConnection { void EnrichSpan(opentelemetry::trace::Span& span, std::string const& bucket_name); void EnrichSpan(opentelemetry::trace::Span& span, - storage::BucketMetadata const& metadata); + storage::BucketMetadata const& metadata) const; static void EnrichSpan(opentelemetry::trace::Span& span, BucketCacheEntry const& entry); void MaybeTriggerBackgroundFetch(std::string const& bucket_name); From 1e342de9c39bcd046023cfe93df798d286cd4e5f Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 12 Jun 2026 08:34:15 +0000 Subject: [PATCH 19/20] using completetion_queue to run background threads --- .../storage/internal/tracing_connection.cc | 55 +++++++++---------- .../storage/internal/tracing_connection.h | 9 +-- .../internal/tracing_connection_test.cc | 3 +- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 14e3b53cc8290..662189a0da73b 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -17,7 +17,11 @@ #include "google/cloud/storage/options.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" +#include "google/cloud/internal/rest_pure_background_threads_impl.h" #include "google/cloud/options.h" +#if GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#include "google/cloud/grpc_options.h" +#endif #include #include #include @@ -29,15 +33,27 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { +std::size_t DefaultThreadPoolSize(Options const& options) { +#if GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC + auto pool_size = options.get(); + if (pool_size == 0) return 1U; + return pool_size; +#else + (void)options; + return 1U; +#endif +} +} // namespace + TracingConnection::TracingConnection(std::shared_ptr impl) - : impl_(std::move(impl)) {} + : impl_(std::move(impl)), + background_threads_( + std::make_unique< + rest_internal::AutomaticallyCreatedRestPureBackgroundThreads>( + DefaultThreadPoolSize(impl_->options()))) {} -TracingConnection::~TracingConnection() { - std::lock_guard lk(mu_); - for (auto& f : bg_tasks_) { - if (f.valid()) f.wait(); - } -} +TracingConnection::~TracingConnection() = default; BucketMetadataCache& TracingConnection::cache() { static BucketMetadataCache instance(10000); @@ -48,16 +64,6 @@ void TracingConnection::ResetCacheForTesting() { cache().Clear(); } Options TracingConnection::options() const { return impl_->options(); } -void TracingConnection::CleanupCompletedTasks() { - std::unique_lock lk(mu_); - bg_tasks_.erase(std::remove_if(bg_tasks_.begin(), bg_tasks_.end(), - [](std::future const& f) { - return f.wait_for(std::chrono::seconds(0)) == - std::future_status::ready; - }), - bg_tasks_.end()); -} - void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, BucketCacheEntry const& entry) { span.SetAttribute("gcp.resource.destination.id", entry.id); @@ -66,15 +72,12 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, void TracingConnection::MaybeTriggerBackgroundFetch( std::string const& bucket_name) { - CleanupCompletedTasks(); - if (!cache().StartFetch(bucket_name)) { return; } auto current_options = google::cloud::internal::SaveCurrentOptions(); - auto f = std::async(std::launch::async, [this, bucket_name, - current_options]() { + background_threads_->cq().RunAsync([this, bucket_name, current_options]() { google::cloud::internal::OptionsSpan span(current_options); storage::internal::GetBucketMetadataRequest request(bucket_name); auto result = impl_->GetBucketMetadata(request); @@ -87,11 +90,6 @@ void TracingConnection::MaybeTriggerBackgroundFetch( cache().EndFetch(bucket_name); }); - - { - std::lock_guard lk(mu_); - bg_tasks_.push_back(std::move(f)); - } } void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, @@ -108,8 +106,9 @@ void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, } } -void TracingConnection::EnrichSpan(opentelemetry::trace::Span& span, - storage::BucketMetadata const& metadata) const { +void TracingConnection::EnrichSpan( + opentelemetry::trace::Span& span, + storage::BucketMetadata const& metadata) const { auto const enabled = options().get(); if (!enabled) return; diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index e05b953475301..c30d0dee2938f 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -19,11 +19,9 @@ #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" -#include +#include "google/cloud/internal/rest_pure_background_threads_impl.h" #include -#include #include -#include namespace google { namespace cloud { @@ -190,7 +188,6 @@ class TracingConnection : public storage::internal::StorageConnection { static void EnrichSpan(opentelemetry::trace::Span& span, BucketCacheEntry const& entry); void MaybeTriggerBackgroundFetch(std::string const& bucket_name); - void CleanupCompletedTasks(); static void MaybeInvalidate(Status const& status, std::string const& bucket_name) { @@ -208,8 +205,8 @@ class TracingConnection : public storage::internal::StorageConnection { static BucketMetadataCache& cache(); std::shared_ptr impl_; - std::mutex mu_; - std::vector> bg_tasks_; + std::unique_ptr + background_threads_; }; std::shared_ptr MakeTracingClient( diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index d3305d023a126..79d130996005b 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -57,7 +57,8 @@ TEST(TracingClientTest, Options) { }; auto mock = std::make_shared(); - EXPECT_CALL(*mock, options).WillOnce(Return(Options{}.set(42))); + EXPECT_CALL(*mock, options) + .WillRepeatedly(Return(Options{}.set(42))); auto under_test = TracingConnection(mock); auto const options = under_test.options(); EXPECT_EQ(42, options.get()); From 85efb53794753e123d70d2bbaad84307393f6d07 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 12 Jun 2026 14:13:01 +0000 Subject: [PATCH 20/20] use generic+background_thread --- .../storage/internal/tracing_connection.cc | 4 +--- .../storage/internal/tracing_connection.h | 24 +++++++++++++++++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 662189a0da73b..706bf839abdbd 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -17,7 +17,6 @@ #include "google/cloud/storage/options.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" -#include "google/cloud/internal/rest_pure_background_threads_impl.h" #include "google/cloud/options.h" #if GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC #include "google/cloud/grpc_options.h" @@ -49,8 +48,7 @@ std::size_t DefaultThreadPoolSize(Options const& options) { TracingConnection::TracingConnection(std::shared_ptr impl) : impl_(std::move(impl)), background_threads_( - std::make_unique< - rest_internal::AutomaticallyCreatedRestPureBackgroundThreads>( + std::make_unique( DefaultThreadPoolSize(impl_->options()))) {} TracingConnection::~TracingConnection() = default; diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index c30d0dee2938f..0ca9643905354 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -19,7 +19,13 @@ #include "google/cloud/storage/internal/storage_connection.h" #include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" +#include "google/cloud/internal/generic_background_threads_impl.h" +#if GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#include "google/cloud/background_threads.h" +#include "google/cloud/completion_queue.h" +#else #include "google/cloud/internal/rest_pure_background_threads_impl.h" +#endif #include #include @@ -204,9 +210,23 @@ class TracingConnection : public storage::internal::StorageConnection { static BucketMetadataCache& cache(); +#if GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC + using StorageBackgroundThreads = google::cloud::BackgroundThreads; + using AutomaticallyCreatedStorageBackgroundThreads = + google::cloud::internal::AutomaticallyCreatedBackgroundThreadsImpl< + google::cloud::CompletionQueue, google::cloud::BackgroundThreads>; +#else + using StorageBackgroundThreads = + google::cloud::rest_internal::RestPureBackgroundThreads; + using AutomaticallyCreatedStorageBackgroundThreads = + google::cloud::internal::AutomaticallyCreatedBackgroundThreadsImpl< + rest_internal::RestPureCompletionQueue, + rest_internal::RestPureBackgroundThreads, + rest_internal::RestPureQueueTraits>; +#endif + std::shared_ptr impl_; - std::unique_ptr - background_threads_; + std::unique_ptr background_threads_; }; std::shared_ptr MakeTracingClient(