diff --git a/google/cloud/google_cloud_cpp_common.bzl b/google/cloud/google_cloud_cpp_common.bzl index 421c9c00a4160..5c86d6d9df7fc 100644 --- a/google/cloud/google_cloud_cpp_common.bzl +++ b/google/cloud/google_cloud_cpp_common.bzl @@ -55,6 +55,7 @@ google_cloud_cpp_common_hdrs = [ "internal/future_fwd.h", "internal/future_impl.h", "internal/future_then_impl.h", + "internal/generic_background_threads_impl.h", "internal/getenv.h", "internal/group_options.h", "internal/invocation_id_generator.h", diff --git a/google/cloud/google_cloud_cpp_common.cmake b/google/cloud/google_cloud_cpp_common.cmake index 8b3601db39f01..a892e6db3d02c 100644 --- a/google/cloud/google_cloud_cpp_common.cmake +++ b/google/cloud/google_cloud_cpp_common.cmake @@ -86,6 +86,7 @@ add_library( internal/future_impl.h internal/future_then_impl.cc internal/future_then_impl.h + internal/generic_background_threads_impl.h internal/getenv.cc internal/getenv.h internal/group_options.h diff --git a/google/cloud/google_cloud_cpp_rest_internal.cmake b/google/cloud/google_cloud_cpp_rest_internal.cmake index c2fa155d7ad9e..588e1da4bda2b 100644 --- a/google/cloud/google_cloud_cpp_rest_internal.cmake +++ b/google/cloud/google_cloud_cpp_rest_internal.cmake @@ -298,6 +298,7 @@ if (BUILD_TESTING) internal/rest_lro_helpers_test.cc internal/rest_opentelemetry_test.cc internal/rest_parse_json_error_test.cc + internal/rest_pure_background_threads_impl_test.cc internal/rest_request_test.cc internal/rest_response_test.cc internal/rest_retry_loop_test.cc diff --git a/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl b/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl index 003a9fe1d2080..c0e1909d4ca6b 100644 --- a/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl +++ b/google/cloud/google_cloud_cpp_rest_internal_unit_tests.bzl @@ -61,6 +61,7 @@ google_cloud_cpp_rest_internal_unit_tests = [ "internal/rest_lro_helpers_test.cc", "internal/rest_opentelemetry_test.cc", "internal/rest_parse_json_error_test.cc", + "internal/rest_pure_background_threads_impl_test.cc", "internal/rest_request_test.cc", "internal/rest_response_test.cc", "internal/rest_retry_loop_test.cc", diff --git a/google/cloud/internal/background_threads_impl.cc b/google/cloud/internal/background_threads_impl.cc index d5e10b1750699..174d57311eb12 100644 --- a/google/cloud/internal/background_threads_impl.cc +++ b/google/cloud/internal/background_threads_impl.cc @@ -21,47 +21,7 @@ namespace google { namespace cloud { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -namespace internal { - -AutomaticallyCreatedBackgroundThreads::AutomaticallyCreatedBackgroundThreads( - std::size_t thread_count) - : pool_(thread_count == 0 ? 1 : thread_count) { - std::generate_n(pool_.begin(), pool_.size(), [this] { - promise started; - auto thread = std::thread( - [](CompletionQueue cq, promise& started) { - started.set_value(); - cq.Run(); - }, - cq_, std::ref(started)); - started.get_future().wait(); - return thread; - }); -} - -AutomaticallyCreatedBackgroundThreads:: - ~AutomaticallyCreatedBackgroundThreads() { - Shutdown(); -} - -void AutomaticallyCreatedBackgroundThreads::Shutdown() { - cq_.Shutdown(); - for (auto& t : pool_) { -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - try { -#endif - t.join(); -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - } catch (std::system_error const& e) { - GCP_LOG(FATAL) << "AutomaticallyCreatedBackgroundThreads::Shutdown: " - << e.what(); - } -#endif - } - pool_.clear(); -} - -} // namespace internal +namespace internal {} // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google diff --git a/google/cloud/internal/background_threads_impl.h b/google/cloud/internal/background_threads_impl.h index a307ded7976cf..b13f477658e1d 100644 --- a/google/cloud/internal/background_threads_impl.h +++ b/google/cloud/internal/background_threads_impl.h @@ -17,6 +17,7 @@ #include "google/cloud/background_threads.h" #include "google/cloud/completion_queue.h" +#include "google/cloud/internal/generic_background_threads_impl.h" #include "google/cloud/version.h" #include #include @@ -40,19 +41,9 @@ class CustomerSuppliedBackgroundThreads : public BackgroundThreads { }; /// Create a background thread to perform background operations. -class AutomaticallyCreatedBackgroundThreads : public BackgroundThreads { - public: - explicit AutomaticallyCreatedBackgroundThreads(std::size_t thread_count = 1U); - ~AutomaticallyCreatedBackgroundThreads() override; - - CompletionQueue cq() const override { return cq_; } - void Shutdown(); - std::size_t pool_size() const { return pool_.size(); } - - private: - CompletionQueue cq_; - std::vector pool_; -}; +using AutomaticallyCreatedBackgroundThreads = + AutomaticallyCreatedBackgroundThreadsImpl; } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/internal/generic_background_threads_impl.h b/google/cloud/internal/generic_background_threads_impl.h new file mode 100644 index 0000000000000..ca1c79b71731c --- /dev/null +++ b/google/cloud/internal/generic_background_threads_impl.h @@ -0,0 +1,91 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H + +#include "google/cloud/future.h" +#include "google/cloud/log.h" +#include "google/cloud/version.h" +#include +#include +#include +#include + +namespace google { +namespace cloud { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace internal { + +template +struct DefaultQueueTraits { + static QueueType Create() { return QueueType(); } + static void Run(QueueType cq, promise& started) { + started.set_value(); + cq.Run(); + } +}; + +template > +class AutomaticallyCreatedBackgroundThreadsImpl : public BaseInterface { + public: + explicit AutomaticallyCreatedBackgroundThreadsImpl( + std::size_t thread_count = 1U) + : cq_(QueueTraits::Create()), + pool_(thread_count == 0 ? 1 : thread_count) { + std::generate_n(pool_.begin(), pool_.size(), [this] { + promise started; + auto thread = std::thread( + [](QueueType cq, promise& started) { + QueueTraits::Run(std::move(cq), started); + }, + cq_, std::ref(started)); + started.get_future().wait(); + return thread; + }); + } + + ~AutomaticallyCreatedBackgroundThreadsImpl() override { Shutdown(); } + + QueueType cq() const override { return cq_; } + std::size_t pool_size() const { return pool_.size(); } + + void Shutdown() { + cq_.Shutdown(); + for (auto& t : pool_) { +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + try { +#endif + if (t.joinable()) t.join(); +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + } catch (std::system_error const& e) { + GCP_LOG(FATAL) << "Shutdown error: " << e.what(); + } +#endif + } + pool_.clear(); + } + + private: + QueueType cq_; + std::vector pool_; +}; + +} // namespace internal +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_GENERIC_BACKGROUND_THREADS_IMPL_H diff --git a/google/cloud/internal/rest_pure_background_threads_impl.cc b/google/cloud/internal/rest_pure_background_threads_impl.cc index 525b92c0adb79..a830172037f80 100644 --- a/google/cloud/internal/rest_pure_background_threads_impl.cc +++ b/google/cloud/internal/rest_pure_background_threads_impl.cc @@ -25,48 +25,6 @@ namespace cloud { namespace rest_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -AutomaticallyCreatedRestPureBackgroundThreads:: - AutomaticallyCreatedRestPureBackgroundThreads(std::size_t thread_count) - : cq_(std::make_shared()), - pool_(thread_count == 0 ? 1 : thread_count) { - std::generate_n(pool_.begin(), pool_.size(), [this] { - promise started; - auto thread = std::thread( - [](RestPureCompletionQueue cq, promise& started, - internal::CallContext c) { - internal::ScopedCallContext scope(std::move(c)); - started.set_value(); - cq.Run(); - }, - cq_, std::ref(started), internal::CallContext{}); - started.get_future().wait(); - return thread; - }); -} - -AutomaticallyCreatedRestPureBackgroundThreads:: - ~AutomaticallyCreatedRestPureBackgroundThreads() { - Shutdown(); -} - -void AutomaticallyCreatedRestPureBackgroundThreads::Shutdown() { - cq_.Shutdown(); - for (auto& t : pool_) { -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - try { -#endif - t.join(); -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - } catch (std::system_error const& e) { - GCP_LOG(FATAL) - << "AutomaticallyCreatedRestPureBackgroundThreads::Shutdown: " - << e.what(); - } -#endif - } - pool_.clear(); -} - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace rest_internal } // namespace cloud diff --git a/google/cloud/internal/rest_pure_background_threads_impl.h b/google/cloud/internal/rest_pure_background_threads_impl.h index 4061729dd7f63..a9c8aef7c14ca 100644 --- a/google/cloud/internal/rest_pure_background_threads_impl.h +++ b/google/cloud/internal/rest_pure_background_threads_impl.h @@ -15,6 +15,8 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_REST_PURE_BACKGROUND_THREADS_IMPL_H +#include "google/cloud/internal/call_context.h" +#include "google/cloud/internal/generic_background_threads_impl.h" #include "google/cloud/internal/rest_pure_completion_queue_impl.h" #include "google/cloud/version.h" #include @@ -37,23 +39,25 @@ class RestPureBackgroundThreads { virtual RestPureCompletionQueue cq() const = 0; }; -/// Background threads that run on a RestPureCompletionQueue. -class AutomaticallyCreatedRestPureBackgroundThreads - : public RestPureBackgroundThreads { - public: - explicit AutomaticallyCreatedRestPureBackgroundThreads( - std::size_t thread_count = 1U); - ~AutomaticallyCreatedRestPureBackgroundThreads() override; - - RestPureCompletionQueue cq() const override { return cq_; } - void Shutdown(); - std::size_t pool_size() const { return pool_.size(); } - - private: - RestPureCompletionQueue cq_; - std::vector pool_; +struct RestPureQueueTraits { + static RestPureCompletionQueue Create() { + return RestPureCompletionQueue( + std::make_shared()); + } + static void Run(RestPureCompletionQueue cq, promise& started) { + internal::CallContext context; + internal::ScopedCallContext scope(std::move(context)); + started.set_value(); + cq.Run(); + } }; +/// Background threads that run on a RestPureCompletionQueue. +using AutomaticallyCreatedRestPureBackgroundThreads = + google::cloud::internal::AutomaticallyCreatedBackgroundThreadsImpl< + RestPureCompletionQueue, RestPureBackgroundThreads, + RestPureQueueTraits>; + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace rest_internal } // namespace cloud diff --git a/google/cloud/internal/rest_pure_background_threads_impl_test.cc b/google/cloud/internal/rest_pure_background_threads_impl_test.cc new file mode 100644 index 0000000000000..31ee8e68ec0c1 --- /dev/null +++ b/google/cloud/internal/rest_pure_background_threads_impl_test.cc @@ -0,0 +1,84 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/internal/rest_pure_background_threads_impl.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace rest_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::testing::Contains; +using ::testing::Not; + +/// @test Verify that automatically created completion queues are usable. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, IsActive) { + AutomaticallyCreatedRestPureBackgroundThreads actual; + EXPECT_EQ(1, actual.pool_size()); + + promise bg; + actual.cq().RunAsync([&bg] { bg.set_value(std::this_thread::get_id()); }); + EXPECT_NE(std::this_thread::get_id(), bg.get_future().get()); +} + +/// @test Verify that automatically created completion queues are usable. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, NoEmptyPools) { + AutomaticallyCreatedRestPureBackgroundThreads actual(0); + EXPECT_EQ(1, actual.pool_size()); + + promise bg; + actual.cq().RunAsync([&bg] { bg.set_value(std::this_thread::get_id()); }); + EXPECT_NE(std::this_thread::get_id(), bg.get_future().get()); +} + +/// @test Verify that automatically created completion queues work. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, ManyThreads) { + auto constexpr kThreadCount = 4; + AutomaticallyCreatedRestPureBackgroundThreads actual(kThreadCount); + EXPECT_EQ(kThreadCount, actual.pool_size()); + + std::vector> promises(100 * kThreadCount); + for (auto& p : promises) { + actual.cq().RunAsync([&p] { p.set_value(std::this_thread::get_id()); }); + } + std::set ids; + for (auto& p : promises) ids.insert(p.get_future().get()); + EXPECT_FALSE(ids.empty()); + EXPECT_GE(kThreadCount, ids.size()); + EXPECT_THAT(ids, Not(Contains(std::this_thread::get_id()))); +} + +/// @test Verify that automatically created completion queues work. +TEST(AutomaticallyCreatedRestPureBackgroundThreads, ManualShutdown) { + auto constexpr kThreadCount = 4; + AutomaticallyCreatedRestPureBackgroundThreads actual(kThreadCount); + EXPECT_EQ(kThreadCount, actual.pool_size()); + + std::vector> promises(2 * kThreadCount); + for (auto& p : promises) { + actual.cq().RunAsync([&p] { p.set_value(); }); + } + for (auto& p : promises) p.get_future().get(); + actual.Shutdown(); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace rest_internal +} // namespace cloud +} // namespace google