Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory {
public:
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;

[[nodiscard]] virtual bool IsFDv1Fallback() const { return false; }

virtual ~IFDv2SynchronizerFactory() = default;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;
Expand Down
84 changes: 55 additions & 29 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ FDv2DataSystem::FDv2DataSystem(
store_(),
change_notifier_(store_, store_),
initialize_called_(false),
last_logged_synchronizer_interrupted_(false),
closed_(false),
selector_(),
initializer_index_(0),
Expand All @@ -67,7 +68,6 @@ void FDv2DataSystem::Close() {
if (active_conditions_) {
active_conditions_->Close();
}
status_manager_->SetState(DataSourceStatus::DataSourceState::kOff);
}

std::shared_ptr<data_model::FlagDescriptor> FDv2DataSystem::GetFlag(
Expand Down Expand Up @@ -121,6 +121,9 @@ void FDv2DataSystem::RunNextInitializer() {
} else {
auto& factory = initializer_factories_[initializer_index_++];
active_initializer_ = factory->Build();
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": starting initializer "
<< active_initializer_->Identity();
active_initializer_->Run().Then(
[this](data_interfaces::FDv2SourceResult const& result)
-> std::monostate {
Expand Down Expand Up @@ -152,6 +155,8 @@ void FDv2DataSystem::OnInitializerResult(
cs.change_set.selector.value.has_value();
ApplyChangeSet(std::move(cs.change_set));
if (has_selector) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": initializer succeeded";
got_basis = true;
}
},
Expand Down Expand Up @@ -185,6 +190,12 @@ void FDv2DataSystem::OnInitializerResult(
if (closed_ || got_shutdown) {
return;
}
if (result.fdv1_fallback) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
source_manager_.SwitchToFDv1Fallback();
got_basis = true;
}
}

if (got_basis) {
Expand All @@ -205,6 +216,10 @@ void FDv2DataSystem::StartSynchronizers() {
}
active_synchronizer_ = source_manager_.NextSynchronizer();
if (active_synchronizer_) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": starting synchronizer "
<< active_synchronizer_->Identity();
last_logged_synchronizer_interrupted_.store(false);
active_conditions_ = BuildActiveConditions();
} else {
exhausted = true;
Expand Down Expand Up @@ -314,33 +329,37 @@ void FDv2DataSystem::OnSynchronizerResult(
bool got_shutdown = false;
bool advance = false;

std::visit(overloaded{
[&](Result::ChangeSet& cs) {
ApplyChangeSet(std::move(cs.change_set));
},
[&](Result::Shutdown&) { got_shutdown = true; },
[&](Result::Interrupted const& iv) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity() << ": synchronizer interrupted: "
<< iv.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
iv.error.Kind(), iv.error.Message());
},
[&](Result::TerminalError const& te) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity() << ": synchronizer terminal error: "
<< te.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
te.error.Kind(), te.error.Message());
advance = true;
},
[&](Result::Goodbye const&) {
// The synchronizer handles this internally.
},
},
result.value);
std::visit(
overloaded{
[&](Result::ChangeSet& cs) {
last_logged_synchronizer_interrupted_.store(false);
ApplyChangeSet(std::move(cs.change_set));
},
[&](Result::Shutdown&) { got_shutdown = true; },
[&](Result::Interrupted const& iv) {
if (!last_logged_synchronizer_interrupted_.exchange(true)) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity()
<< ": synchronizer interrupted: " << iv.error.Message();
}
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
iv.error.Kind(), iv.error.Message());
},
[&](Result::TerminalError const& te) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity()
<< ": synchronizer terminal error: " << te.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
te.error.Kind(), te.error.Message());
advance = true;
},
[&](Result::Goodbye const&) {
// The synchronizer handles this internally.
},
},
result.value);

{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -349,7 +368,14 @@ void FDv2DataSystem::OnSynchronizerResult(
active_conditions_.reset();
return;
}
if (advance) {
if (result.fdv1_fallback) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
source_manager_.SwitchToFDv1Fallback();
active_synchronizer_.reset();
active_conditions_.reset();
advance = true;
} else if (advance) {
source_manager_.BlockCurrentSynchronizer();
active_synchronizer_.reset();
active_conditions_.reset();
Expand Down
17 changes: 8 additions & 9 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ namespace launchdarkly::server_side::data_systems {
* Destruction protocol:
*
* The destructor cancels in-flight orchestration (closes the active
* source, emits status kOff), but does NOT block to drain executor
* callbacks that may already be queued. Before destroying, the caller
* must ensure both of:
* source) but does NOT block to drain executor callbacks that may
* already be queued. Before destroying, the caller must ensure both of:
*
* 1. The executor that orchestration callbacks run on has been stopped
* AND any thread running it has been joined. Otherwise a previously-
Expand Down Expand Up @@ -120,8 +119,6 @@ namespace launchdarkly::server_side::data_systems {
* v
* [Done; final status preserved]
*
* Calling the destructor at any time -> [Closed; status kOff].
*
* Status transitions:
*
* kInitializing (initial) -> kValid on first successful ChangeSet apply.
Expand All @@ -130,11 +127,10 @@ namespace launchdarkly::server_side::data_systems {
* the initializer phase if not yet Valid).
* kOff if all initializers exhaust without data
* and no synchronizers are configured.
* kValid -> kInterrupted on errors; kOff in destructor or
* when synchronizers cycle through and exhaust.
* kValid -> kInterrupted on errors; kOff when
* synchronizers cycle through and exhaust.
* kInterrupted -> kValid on next successful ChangeSet apply;
* kOff in destructor or on synchronizer
* exhaustion.
* kOff on synchronizer exhaustion.
* kOff -> terminal.
*/
class FDv2DataSystem final : public data_interfaces::IDataSystem {
Expand Down Expand Up @@ -283,6 +279,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
// Set by Initialize() to detect repeat or concurrent calls.
std::atomic_bool initialize_called_;

// Suppresses consecutive "interrupted" logs from the active synchronizer.
std::atomic_bool last_logged_synchronizer_interrupted_;

// Orchestration state, guarded by mutex_.
std::mutex mutex_;
bool closed_;
Expand Down
16 changes: 13 additions & 3 deletions libs/server-sdk/src/data_systems/fdv2/source_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ SourceManager::SourceManager(
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories) {
synchronizers_.reserve(factories.size());
for (auto& factory : factories) {
synchronizers_.push_back(
SynchronizerFactoryWithState{std::move(factory), State::kAvailable,
/*is_fdv1_fallback=*/false});
bool const is_fdv1_fallback = factory->IsFDv1Fallback();
synchronizers_.push_back(SynchronizerFactoryWithState{
std::move(factory),
is_fdv1_fallback ? State::kBlocked : State::kAvailable,
is_fdv1_fallback});
}
}

Expand Down Expand Up @@ -44,6 +46,14 @@ void SourceManager::ResetSourceIndex() {
synchronizer_index_ = -1;
}

void SourceManager::SwitchToFDv1Fallback() {
for (auto& entry : synchronizers_) {
entry.state =
entry.is_fdv1_fallback ? State::kAvailable : State::kBlocked;
}
synchronizer_index_ = -1;
}

bool SourceManager::IsPrimeSynchronizer() const {
for (std::size_t i = 0; i < synchronizers_.size(); ++i) {
if (synchronizers_[i].state == State::kAvailable) {
Expand Down
16 changes: 11 additions & 5 deletions libs/server-sdk/src/data_systems/fdv2/source_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ namespace launchdarkly::server_side::data_systems {
* by recovery, which wants to fall back to the most-preferred Available
* synchronizer.
*
* Each factory also carries an is_fdv1_fallback flag, currently always
* false. TODO: populate when the FDv1 fallback directive is implemented.
* Factories whose IsFDv1Fallback() returns true start in the Blocked state.
*
* Not thread-safe. The caller is responsible for serializing all calls.
*/
Expand Down Expand Up @@ -54,6 +53,14 @@ class SourceManager {
*/
void ResetSourceIndex();

/**
* Blocks every non-FDv1 factory and unblocks the FDv1 fallback factory,
* if one was configured. Resets the iteration cursor so the next call to
* NextSynchronizer returns the FDv1 fallback. If no FDv1 fallback factory
* was configured, every factory is left blocked.
*/
void SwitchToFDv1Fallback();

/**
* Returns true if the currently tracked factory is the first Available
* factory in the list. Returns false if no factory is currently tracked.
Expand All @@ -73,9 +80,8 @@ class SourceManager {
[[nodiscard]] std::size_t SynchronizerCount() const;

/**
* Returns true if the currently tracked factory was configured as the
* FDv1 fallback synchronizer. Always false until the FDv1 fallback
* directive is implemented.
* Returns true if the currently tracked factory is the FDv1 fallback
* synchronizer.
*/
[[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const;

Expand Down
Loading
Loading