Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libs/server-sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ target_sources(${LIBNAME}
data_systems/fdv2/source_manager.cpp
data_systems/fdv2/fdv2_data_system.hpp
data_systems/fdv2/fdv2_data_system.cpp
data_systems/fdv2/fdv1_adapter_synchronizer.hpp
data_systems/fdv2/fdv1_adapter_synchronizer.cpp
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
data_systems/background_sync/sources/streaming/event_handler.hpp
Expand Down
187 changes: 187 additions & 0 deletions libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#include "fdv1_adapter_synchronizer.hpp"

#include <utility>

namespace launchdarkly::server_side::data_systems {

using data_interfaces::FDv2SourceResult;

// ----- State -----

bool FDv1AdapterSynchronizer::State::TryStart() {
std::lock_guard lock(mutex_);
if (started_ || closed_) {
return false;
}
started_ = true;
return true;
}

bool FDv1AdapterSynchronizer::State::MarkClosed() {
std::lock_guard lock(mutex_);
closed_ = true;
return started_;
}

async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::State::GetNext() {
std::lock_guard lock(mutex_);
if (!result_queue_.empty()) {
auto result = std::move(result_queue_.front());
result_queue_.pop_front();
return async::MakeFuture(std::move(result));
}
return pending_promise_.emplace().GetFuture();
}

void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() {
std::optional<async::Promise<FDv2SourceResult>> promise;
{
std::lock_guard lock(mutex_);
if (pending_promise_) {
promise = std::move(pending_promise_);
pending_promise_.reset();
}
}
if (promise) {
promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
}

void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) {
std::optional<async::Promise<FDv2SourceResult>> promise;
{
std::lock_guard lock(mutex_);
if (closed_) {
return;
}
if (pending_promise_) {
promise = std::move(pending_promise_);
pending_promise_.reset();
} else {
result_queue_.push_back(std::move(result));
return;
}
}
// Resolve outside the lock — Promise::Resolve may invoke inline
// continuations that could call back into Notify or GetNext.
promise->Resolve(std::move(result));
}

// ----- ConvertingDestination -----

FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination(
std::weak_ptr<State> state)
: state_(std::move(state)) {}

void FDv1AdapterSynchronizer::ConvertingDestination::Init(
data_model::SDKDataSet data_set) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.reserve(data_set.flags.size() + data_set.segments.size());
for (auto& [key, flag] : data_set.flags) {
changes.push_back({key, std::move(flag)});
}
for (auto& [key, segment] : data_set.segments) {
changes.push_back({key, std::move(segment)});
}
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kFull, std::move(changes),
data_model::Selector{}}}});
}

void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
std::string const& key,
data_model::FlagDescriptor flag) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.push_back({key, std::move(flag)});
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kPartial, std::move(changes),
data_model::Selector{}}}});
}

void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
std::string const& key,
data_model::SegmentDescriptor segment) {
auto state = state_.lock();
if (!state) {
return;
}
data_interfaces::ChangeSetData changes;
changes.push_back({key, std::move(segment)});
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kPartial, std::move(changes),
data_model::Selector{}}}});
}

std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity()
const {
static std::string const identity = "FDv1 adapter destination";
return identity;
}

// ----- FDv1AdapterSynchronizer -----

FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source)
: state_(std::make_shared<State>()),
destination_(std::make_unique<ConvertingDestination>(state_)),
fdv1_source_(std::move(fdv1_source)) {}

FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() {
Close();
}

async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::Next(
data_model::Selector /*selector*/) {
auto closed = close_promise_.GetFuture();
if (closed.IsFinished()) {
return async::MakeFuture(
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
if (state_->TryStart()) {
fdv1_source_->StartAsync(destination_.get(),
/*bootstrap_data=*/nullptr);
}
auto result_future = state_->GetNext();
if (result_future.IsFinished()) {
return result_future;
}
return async::WhenAny(closed, result_future)
.Then(
[state = state_, result_future](std::size_t const& idx) mutable
-> async::Future<FDv2SourceResult> {
if (idx == 0) {
state->ResolvePendingAsShutdown();
return async::MakeFuture(
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
}
return result_future;
},
async::kInlineExecutor);
}

void FDv1AdapterSynchronizer::Close() {
if (!close_promise_.Resolve(std::monostate{})) {
return;
}
if (state_->MarkClosed()) {
fdv1_source_->ShutdownAsync([] {});
}
}

std::string const& FDv1AdapterSynchronizer::Identity() const {
static std::string const identity = "FDv1 fallback adapter";
return identity;
}

} // namespace launchdarkly::server_side::data_systems
107 changes: 107 additions & 0 deletions libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include "../../data_interfaces/destination/idestination.hpp"
#include "../../data_interfaces/source/idata_synchronizer.hpp"
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"

#include <launchdarkly/async/promise.hpp>

#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <variant>

namespace launchdarkly::server_side::data_systems {

/**
* Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface.
*
* FDv1 Init/Upsert callbacks delivered through an internal IDestination are
* translated into FDv2SourceResult::ChangeSet results, with empty selectors
* and fdv1_fallback = false (the directive does not re-fire from FDv1 data).
*
* Threading: Next() and Close() may be called from any thread; only one
* Next() may be outstanding at a time. The adapter blocks in its destructor
* waiting for the FDv1 source's ShutdownAsync completion, so no callbacks
* are in flight when the wrapped source is destroyed.
*/
class FDv1AdapterSynchronizer final
: public data_interfaces::IFDv2Synchronizer {
public:
explicit FDv1AdapterSynchronizer(
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source);

~FDv1AdapterSynchronizer() override;

async::Future<data_interfaces::FDv2SourceResult> Next(
data_model::Selector selector) override;
void Close() override;
[[nodiscard]] std::string const& Identity() const override;

private:
/**
* Holds the lifecycle, result queue, and pending Next() promise; shared
* with the FDv1 source's IDestination via the inner ConvertingDestination.
* All methods are thread-safe.
*/
class State {
public:
// Returns true if this call transitioned Initial → Started; false if
// already started or already closed. Used to gate the one-time
// StartAsync call on the wrapped FDv1 source.
bool TryStart();

// Marks the state closed and returns whether the source was started
// before the transition (so the caller knows whether ShutdownAsync
// needs to be called).
bool MarkClosed();

async::Future<data_interfaces::FDv2SourceResult> GetNext();

// Resolves any pending Next() promise with Shutdown and clears it.
// Called on the close path so the abandoned promise doesn't leave
// potential continuations dangling.
void ResolvePendingAsShutdown();

void Notify(data_interfaces::FDv2SourceResult result);

private:
// Protected by mutex_.
mutable std::mutex mutex_;
bool started_ = false;
bool closed_ = false;
std::optional<async::Promise<data_interfaces::FDv2SourceResult>>
pending_promise_;
std::deque<data_interfaces::FDv2SourceResult> result_queue_;
};

/**
* Translates FDv1 IDestination callbacks into FDv2 results queued on
* State. Thread-safe (delegates to State).
*/
class ConvertingDestination final : public data_interfaces::IDestination {
public:
explicit ConvertingDestination(std::weak_ptr<State> state);
void Init(data_model::SDKDataSet data_set) override;
void Upsert(std::string const& key,
data_model::FlagDescriptor flag) override;
void Upsert(std::string const& key,
data_model::SegmentDescriptor segment) override;
[[nodiscard]] std::string const& Identity() const override;

private:
std::weak_ptr<State> state_;
};

// const after construction.
std::shared_ptr<State> const state_;
std::unique_ptr<ConvertingDestination> const destination_;
std::unique_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_;

// Thread-safe primitive.
async::Promise<std::monostate> close_promise_;
};

} // namespace launchdarkly::server_side::data_systems
Loading
Loading