diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp index fc3c6421a..ba130e768 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp @@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory { public: virtual std::unique_ptr Build() = 0; + [[nodiscard]] virtual bool IsFDv1Fallback() const { return false; } + virtual ~IFDv2SynchronizerFactory() = default; IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete; IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete; diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index b3caa5aba..695b08926 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -43,6 +43,7 @@ FDv2DataSystem::FDv2DataSystem( store_(), change_notifier_(store_, store_), initialize_called_(false), + last_logged_synchronizer_interrupted_(false), closed_(false), selector_(), initializer_index_(0), @@ -67,7 +68,6 @@ void FDv2DataSystem::Close() { if (active_conditions_) { active_conditions_->Close(); } - status_manager_->SetState(DataSourceStatus::DataSourceState::kOff); } std::shared_ptr FDv2DataSystem::GetFlag( @@ -121,6 +121,9 @@ void FDv2DataSystem::RunNextInitializer() { } else { auto& factory = initializer_factories_[initializer_index_++]; active_initializer_ = factory->Build(); + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": starting initializer " + << active_initializer_->Identity(); active_initializer_->Run().Then( [this](data_interfaces::FDv2SourceResult const& result) -> std::monostate { @@ -152,6 +155,8 @@ void FDv2DataSystem::OnInitializerResult( cs.change_set.selector.value.has_value(); ApplyChangeSet(std::move(cs.change_set)); if (has_selector) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": initializer succeeded"; got_basis = true; } }, @@ -185,6 +190,12 @@ void FDv2DataSystem::OnInitializerResult( if (closed_ || got_shutdown) { return; } + if (result.fdv1_fallback) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv1 fallback engaged"; + source_manager_.SwitchToFDv1Fallback(); + got_basis = true; + } } if (got_basis) { @@ -205,6 +216,10 @@ void FDv2DataSystem::StartSynchronizers() { } active_synchronizer_ = source_manager_.NextSynchronizer(); if (active_synchronizer_) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": starting synchronizer " + << active_synchronizer_->Identity(); + last_logged_synchronizer_interrupted_.store(false); active_conditions_ = BuildActiveConditions(); } else { exhausted = true; @@ -314,33 +329,37 @@ void FDv2DataSystem::OnSynchronizerResult( bool got_shutdown = false; bool advance = false; - std::visit(overloaded{ - [&](Result::ChangeSet& cs) { - ApplyChangeSet(std::move(cs.change_set)); - }, - [&](Result::Shutdown&) { got_shutdown = true; }, - [&](Result::Interrupted const& iv) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": synchronizer interrupted: " - << iv.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - iv.error.Kind(), iv.error.Message()); - }, - [&](Result::TerminalError const& te) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": synchronizer terminal error: " - << te.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - te.error.Kind(), te.error.Message()); - advance = true; - }, - [&](Result::Goodbye const&) { - // The synchronizer handles this internally. - }, - }, - result.value); + std::visit( + overloaded{ + [&](Result::ChangeSet& cs) { + last_logged_synchronizer_interrupted_.store(false); + ApplyChangeSet(std::move(cs.change_set)); + }, + [&](Result::Shutdown&) { got_shutdown = true; }, + [&](Result::Interrupted const& iv) { + if (!last_logged_synchronizer_interrupted_.exchange(true)) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() + << ": synchronizer interrupted: " << iv.error.Message(); + } + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + iv.error.Kind(), iv.error.Message()); + }, + [&](Result::TerminalError const& te) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": synchronizer terminal error: " << te.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + te.error.Kind(), te.error.Message()); + advance = true; + }, + [&](Result::Goodbye const&) { + // The synchronizer handles this internally. + }, + }, + result.value); { std::lock_guard lock(mutex_); @@ -349,7 +368,14 @@ void FDv2DataSystem::OnSynchronizerResult( active_conditions_.reset(); return; } - if (advance) { + if (result.fdv1_fallback) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv1 fallback engaged"; + source_manager_.SwitchToFDv1Fallback(); + active_synchronizer_.reset(); + active_conditions_.reset(); + advance = true; + } else if (advance) { source_manager_.BlockCurrentSynchronizer(); active_synchronizer_.reset(); active_conditions_.reset(); diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index 7957de429..2eb243140 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -51,9 +51,8 @@ namespace launchdarkly::server_side::data_systems { * Destruction protocol: * * The destructor cancels in-flight orchestration (closes the active - * source, emits status kOff), but does NOT block to drain executor - * callbacks that may already be queued. Before destroying, the caller - * must ensure both of: + * source) but does NOT block to drain executor callbacks that may + * already be queued. Before destroying, the caller must ensure both of: * * 1. The executor that orchestration callbacks run on has been stopped * AND any thread running it has been joined. Otherwise a previously- @@ -120,8 +119,6 @@ namespace launchdarkly::server_side::data_systems { * v * [Done; final status preserved] * - * Calling the destructor at any time -> [Closed; status kOff]. - * * Status transitions: * * kInitializing (initial) -> kValid on first successful ChangeSet apply. @@ -130,11 +127,10 @@ namespace launchdarkly::server_side::data_systems { * the initializer phase if not yet Valid). * kOff if all initializers exhaust without data * and no synchronizers are configured. - * kValid -> kInterrupted on errors; kOff in destructor or - * when synchronizers cycle through and exhaust. + * kValid -> kInterrupted on errors; kOff when + * synchronizers cycle through and exhaust. * kInterrupted -> kValid on next successful ChangeSet apply; - * kOff in destructor or on synchronizer - * exhaustion. + * kOff on synchronizer exhaustion. * kOff -> terminal. */ class FDv2DataSystem final : public data_interfaces::IDataSystem { @@ -283,6 +279,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { // Set by Initialize() to detect repeat or concurrent calls. std::atomic_bool initialize_called_; + // Suppresses consecutive "interrupted" logs from the active synchronizer. + std::atomic_bool last_logged_synchronizer_interrupted_; + // Orchestration state, guarded by mutex_. std::mutex mutex_; bool closed_; diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp index 362f13efd..4af2405a6 100644 --- a/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp @@ -11,9 +11,11 @@ SourceManager::SourceManager( std::vector> factories) { synchronizers_.reserve(factories.size()); for (auto& factory : factories) { - synchronizers_.push_back( - SynchronizerFactoryWithState{std::move(factory), State::kAvailable, - /*is_fdv1_fallback=*/false}); + bool const is_fdv1_fallback = factory->IsFDv1Fallback(); + synchronizers_.push_back(SynchronizerFactoryWithState{ + std::move(factory), + is_fdv1_fallback ? State::kBlocked : State::kAvailable, + is_fdv1_fallback}); } } @@ -44,6 +46,14 @@ void SourceManager::ResetSourceIndex() { synchronizer_index_ = -1; } +void SourceManager::SwitchToFDv1Fallback() { + for (auto& entry : synchronizers_) { + entry.state = + entry.is_fdv1_fallback ? State::kAvailable : State::kBlocked; + } + synchronizer_index_ = -1; +} + bool SourceManager::IsPrimeSynchronizer() const { for (std::size_t i = 0; i < synchronizers_.size(); ++i) { if (synchronizers_[i].state == State::kAvailable) { diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp index 1f310bfed..bd2347ccf 100644 --- a/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp @@ -23,8 +23,7 @@ namespace launchdarkly::server_side::data_systems { * by recovery, which wants to fall back to the most-preferred Available * synchronizer. * - * Each factory also carries an is_fdv1_fallback flag, currently always - * false. TODO: populate when the FDv1 fallback directive is implemented. + * Factories whose IsFDv1Fallback() returns true start in the Blocked state. * * Not thread-safe. The caller is responsible for serializing all calls. */ @@ -54,6 +53,14 @@ class SourceManager { */ void ResetSourceIndex(); + /** + * Blocks every non-FDv1 factory and unblocks the FDv1 fallback factory, + * if one was configured. Resets the iteration cursor so the next call to + * NextSynchronizer returns the FDv1 fallback. If no FDv1 fallback factory + * was configured, every factory is left blocked. + */ + void SwitchToFDv1Fallback(); + /** * Returns true if the currently tracked factory is the first Available * factory in the list. Returns false if no factory is currently tracked. @@ -73,9 +80,8 @@ class SourceManager { [[nodiscard]] std::size_t SynchronizerCount() const; /** - * Returns true if the currently tracked factory was configured as the - * FDv1 fallback synchronizer. Always false until the FDv1 fallback - * directive is implemented. + * Returns true if the currently tracked factory is the FDv1 fallback + * synchronizer. */ [[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const; diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 48d9b49d1..3170fcb7d 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -145,6 +145,15 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { std::unique_ptr source_; }; +class FDv1FallbackOneShotFactory : public OneShotSynchronizerFactory { + public: + explicit FDv1FallbackOneShotFactory( + std::unique_ptr source) + : OneShotSynchronizerFactory(std::move(source)) {} + + bool IsFDv1Fallback() const override { return true; } +}; + // Returns each pre-supplied source in order on successive Build() calls. // Returns nullptr once the supply is exhausted. Used in tests that exercise // wrap-around or recovery, where the same factory is built more than once. @@ -263,25 +272,6 @@ TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) { EXPECT_FALSE(ds.Initialized()); } -TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) { - auto logger = MakeNullLogger(); - boost::asio::io_context ioc; - data_components::DataSourceStatusManager status_manager; - - { - FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr, - /*recovery_condition_factory=*/nullptr, - ioc.get_executor(), &status_manager, logger); - ds.Initialize(); - ASSERT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kValid); - } - - // After ~FDv2DataSystem, status is Off. - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); -} - // ============================================================================ // Initializer phase // ============================================================================ @@ -1090,19 +1080,141 @@ TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) { status_manager.Status().State()); } +// ============================================================================ +// FDv1 fallback directive +// ============================================================================ + +TEST(FDv2DataSystemTest, SynchronizerFdv1FlagSwitchesToFdv1Adapter) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // FDv2 synchronizer emits a ChangeSet with the directive, then closes. + auto fdv2_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kNone, + {}, + data_model::Selector{}}}}; + r.fdv1_fallback = true; + return r; + }()}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + + // FDv1 adapter returns Shutdown when reached, ending orchestration. + auto fdv1_sync = + std::make_unique(std::vector{}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, SynchronizerFdv1FlagWithoutAdapterTransitionsOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + auto fdv2_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + r.fdv1_fallback = true; + return r; + }()}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(DataSourceStatus::DataSourceState::kOff, + status_manager.Status().State()); +} + +TEST(FDv2DataSystemTest, InitializerFdv1FlagSwitchesToFdv1Adapter) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Initializer returns Interrupted with the directive set. + FDv2SourceResult init_result{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + init_result.fdv1_fallback = true; + auto initializer = + std::make_unique(std::move(init_result)); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + auto fdv2_sync = + std::make_unique(std::vector{}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + auto* fdv2_factory_ptr = fdv2_factory.get(); + + auto fdv1_sync = + std::make_unique(std::vector{}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + // FDv2 synchronizer was skipped; FDv1 adapter was built and ran. + EXPECT_EQ(0, fdv2_factory_ptr->build_count_); + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + // ============================================================================ // Destruction protocol: in-flight orchestration // ============================================================================ // // The destructor contract (fdv2_data_system.hpp) requires the destructor to -// cancel in-flight orchestration (close the active source, transition status -// to kOff) without firing any continuation against the destroyed object. The -// caller's responsibility is to ensure the executor is no longer running by -// the time destruction begins; the orchestrator's responsibility is to leave -// nothing dangling. These two tests pin that contract for both phases. +// cancel in-flight orchestration (close the active source) without firing +// any continuation against the destroyed object. The caller's responsibility +// is to ensure the executor is no longer running by the time destruction +// begins; the orchestrator's responsibility is to leave nothing dangling. +// These two tests pin that contract for both phases. -TEST(FDv2DataSystemTest, - Destructor_WithInFlightInitializer_ClosesSourceAndStatusOff) { +TEST(FDv2DataSystemTest, Destructor_WithInFlightInitializer_ClosesSource) { auto logger = MakeNullLogger(); boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; @@ -1129,12 +1241,9 @@ TEST(FDv2DataSystemTest, // ~FDv2DataSystem ran with the initializer's Future still unresolved. EXPECT_TRUE(initializer_closed); - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); } -TEST(FDv2DataSystemTest, - Destructor_WithInFlightSynchronizer_ClosesSourceAndStatusOff) { +TEST(FDv2DataSystemTest, Destructor_WithInFlightSynchronizer_ClosesSource) { auto logger = MakeNullLogger(); boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; @@ -1160,6 +1269,4 @@ TEST(FDv2DataSystemTest, } EXPECT_TRUE(synchronizer_closed); - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); } diff --git a/libs/server-sdk/tests/source_manager_test.cpp b/libs/server-sdk/tests/source_manager_test.cpp index 244ca6400..77b207b46 100644 --- a/libs/server-sdk/tests/source_manager_test.cpp +++ b/libs/server-sdk/tests/source_manager_test.cpp @@ -43,6 +43,11 @@ class CountingFactory : public IFDv2SynchronizerFactory { int build_count = 0; }; +class FDv1FallbackFactory : public CountingFactory { + public: + bool IsFDv1Fallback() const override { return true; } +}; + } // namespace TEST(SourceManagerTest, EmptyManagerReportsZeroAvailable) { @@ -176,7 +181,7 @@ TEST(SourceManagerTest, ResetSourceIndexSkipsBlockedFirstFactory) { EXPECT_EQ(1, f1_ptr->build_count); } -TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) { +TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackFalseForFDv2Factory) { auto f0 = std::make_unique(); std::vector> factories; factories.push_back(std::move(f0)); @@ -185,3 +190,68 @@ TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) { mgr.NextSynchronizer(); EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); } + +TEST(SourceManagerTest, FDv1FallbackFactoryStartsBlockedAndIsSkipped) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + mgr.NextSynchronizer(); + EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); + EXPECT_EQ(0, fdv1_ptr->build_count); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackBlocksFDv2AndUnblocksFDv1) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(1, fdv1_ptr->build_count); + EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackWithoutAdapterBlocksEverything) { + auto fdv2 = std::make_unique(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + SourceManager mgr(std::move(factories)); + + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(0u, mgr.AvailableSynchronizerCount()); + EXPECT_EQ(nullptr, mgr.NextSynchronizer()); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackUnblocksPreviouslyBlockedFDv2) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(1, fdv1_ptr->build_count); + EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback()); +}