Skip to content
14 changes: 12 additions & 2 deletions libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ namespace launchdarkly {
/**
* Protocol state machine for the FDv2 wire format.
*
* Accumulates put-object and delete-object events between a server-intent
* and payload-transferred event, then emits a complete FDv2ChangeSet.
* A server-intent opens a transfer cycle: put-object and delete-object
* events accumulate until a payload-transferred event, which emits an
* FDv2ChangeSet. The handler then remains active — subsequent put/delete
* + payload-transferred cycles emit kPartial changesets reusing the prior
* intent, until the server sends a new server-intent, error, or goodbye.
*
* Shared between the polling and streaming synchronizers.
*/
Expand Down Expand Up @@ -95,6 +98,13 @@ class FDv2ProtocolHandler {
*/
void Reset();

/**
* @return true if event_type is one that the protocol handler recognizes
* and may dispatch on. Events outside this set are spec-defined as
* "unrecognized data that can be safely ignored".
*/
static bool IsKnownEvent(std::string_view event_type);

FDv2ProtocolHandler() = default;

private:
Expand Down
21 changes: 13 additions & 8 deletions libs/internal/src/fdv2_protocol_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ static char const* const kGoodbye = "goodbye";

using Error = FDv2ProtocolHandler::Error;

bool FDv2ProtocolHandler::IsKnownEvent(std::string_view event_type) {
return event_type == kServerIntent || event_type == kPutObject ||
event_type == kDeleteObject || event_type == kPayloadTransferred ||
event_type == kError || event_type == kGoodbye;
}

FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent(
std::string_view event_type,
boost::json::value const& data) {
Expand Down Expand Up @@ -78,9 +84,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleServerIntent(

FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject(
boost::json::value const& data) {
if (state_ == State::kInactive) {
return std::monostate{};
}
auto result = boost::json::value_to<
tl::expected<std::optional<PutObject>, JsonError>>(data);
if (!result) {
Expand All @@ -101,9 +104,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject(

FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleDeleteObject(
boost::json::value const& data) {
if (state_ == State::kInactive) {
return std::monostate{};
}
auto result = boost::json::value_to<
tl::expected<std::optional<DeleteObject>, JsonError>>(data);
if (!result) {
Expand Down Expand Up @@ -152,15 +152,20 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePayloadTransferred(
type, std::move(changes_),
data_model::Selector{data_model::Selector::State{transferred.version,
transferred.state}}};
Reset();
// Transition to kPartial so subsequent put-object/payload-transferred
// cycles work without requiring a new server-intent.
changes_.clear();
state_ = State::kPartial;
return changeset;
}

FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleError(
boost::json::value const& data) {
auto result = boost::json::value_to<
tl::expected<std::optional<FDv2Error>, JsonError>>(data);
Reset();
// Discard any partial-payload accumulation but keep state intact so
// the next put-object/payload-transferred cycle continues normally.
changes_.clear();
if (!result) {
return Error::JsonParseError(result.error(),
"could not deserialize error event");
Expand Down
50 changes: 50 additions & 0 deletions libs/internal/tests/fdv2_protocol_handler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,29 @@ TEST(FDv2ProtocolHandlerTest,
EXPECT_TRUE(cs->changes.empty());
}

TEST(FDv2ProtocolHandlerTest, ErrorMidPayloadDiscardsPartialAcceptsSubsequent) {
FDv2ProtocolHandler handler;

handler.HandleEvent("server-intent", MakeServerIntent("xfer-full"));
handler.HandleEvent("put-object",
MakePutObject("flag", "abandoned", kFlagJson));
handler.HandleEvent(
"error", boost::json::parse(R"({"reason":"something went wrong"})"));

// After the error, a fresh put + payload-transferred (without an
// intervening server-intent) emits a changeset containing only the
// post-error put.
handler.HandleEvent("put-object",
MakePutObject("flag", "fresh", kFlagJson));
auto result = handler.HandleEvent("payload-transferred",
MakePayloadTransferred("s", 1));

auto* cs = std::get_if<data_model::FDv2ChangeSet>(&result);
ASSERT_NE(cs, nullptr);
ASSERT_EQ(cs->changes.size(), 1u);
EXPECT_EQ(cs->changes[0].key, "fresh");
}

TEST(FDv2ProtocolHandlerTest, ErrorEventWithIdSetsServerId) {
FDv2ProtocolHandler handler;

Expand Down Expand Up @@ -353,3 +376,30 @@ TEST(FDv2ProtocolHandlerTest, PayloadTransferredWithoutServerIntentIsError) {
ASSERT_NE(err, nullptr);
EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kProtocolError);
}

TEST(FDv2ProtocolHandlerTest, ConsecutivePayloadsWithoutNewServerIntent) {
FDv2ProtocolHandler handler;

handler.HandleEvent("server-intent", MakeServerIntent("xfer-full"));
handler.HandleEvent("put-object",
MakePutObject("flag", "first", kFlagJson));
auto first = handler.HandleEvent("payload-transferred",
MakePayloadTransferred("s1", 1));
auto* cs1 = std::get_if<data_model::FDv2ChangeSet>(&first);
ASSERT_NE(cs1, nullptr);
ASSERT_EQ(cs1->changes.size(), 1u);
EXPECT_EQ(cs1->changes[0].key, "first");

// A subsequent payload arrives without an intervening server-intent
// (streaming incremental update): the handler emits a kPartial
// changeset reusing the prior intent.
handler.HandleEvent("put-object",
MakePutObject("flag", "second", kFlagJson));
auto second = handler.HandleEvent("payload-transferred",
MakePayloadTransferred("s2", 2));
auto* cs2 = std::get_if<data_model::FDv2ChangeSet>(&second);
ASSERT_NE(cs2, nullptr);
ASSERT_EQ(cs2->changes.size(), 1u);
EXPECT_EQ(cs2->changes[0].key, "second");
EXPECT_EQ(cs2->type, data_model::ChangeSetType::kPartial);
}
22 changes: 18 additions & 4 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "fdv2_polling_impl.hpp"
#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp"
#include "fdv2_changeset_translation.hpp"

#include <launchdarkly/network/http_error_messages.hpp>
Expand All @@ -11,7 +12,6 @@

namespace launchdarkly::server_side::data_systems {

static char const* const kFDv2PollPath = "/sdk/poll";
static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback";

static char const* const kErrorParsingBody =
Expand Down Expand Up @@ -44,7 +44,8 @@ network::HttpRequest MakeFDv2PollRequest(
config::built::ServiceEndpoints const& endpoints,
config::built::HttpProperties const& http_properties,
data_model::Selector const& selector,
std::optional<std::string> const& filter_key) {
std::optional<std::string> const& filter_key,
Logger const& logger) {
config::builders::HttpPropertiesBuilder const builder(http_properties);

auto parsed = boost::urls::parse_uri(endpoints.PollingBaseUrl());
Expand All @@ -54,12 +55,25 @@ network::HttpRequest MakeFDv2PollRequest(
}

boost::urls::url u = parsed.value();
u.set_path(u.path() + kFDv2PollPath);
// A trailing '/' on the base URL appears as an empty final segment;
// remove it so subsequent push_backs don't produce a double slash.
auto segs = u.segments();
Comment thread
beekld marked this conversation as resolved.
if (!segs.empty() && segs.back().empty()) {
segs.pop_back();
}
segs.push_back("sdk");
segs.push_back("poll");
if (selector.value) {
u.params().append({"basis", selector.value->state});
}
if (filter_key) {
u.params().append({"filter", *filter_key});
if (detail::ValidateFilterKey(*filter_key)) {
Comment thread
beekld marked this conversation as resolved.
u.params().append({"filter", *filter_key});
} else {
LD_LOG(logger, LogLevel::kError)
<< "data source config: provided filter is invalid, will "
"request full environment instead";
}
}

return {std::string(u.buffer()), network::HttpMethod::kGet, builder.Build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ network::HttpRequest MakeFDv2PollRequest(
config::built::ServiceEndpoints const& endpoints,
config::built::HttpProperties const& http_properties,
data_model::Selector const& selector,
std::optional<std::string> const& filter_key);
std::optional<std::string> const& filter_key,
Logger const& logger);

// Parse an HTTP response from the FDv2 polling endpoint through the protocol
// handler and return the appropriate result. identity is used in log messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ FDv2PollingInitializer::FDv2PollingInitializer(
: request_(MakeFDv2PollRequest(endpoints,
http_properties,
std::move(selector),
std::move(filter_key))),
std::move(filter_key),
logger)),
requester_(executor, http_properties.Tls()),
state_(std::make_shared<State>(logger)) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#include "polling_synchronizer.hpp"
#include "fdv2_polling_impl.hpp"

Expand Down Expand Up @@ -34,7 +34,7 @@
async::Future<network::HttpResult> FDv2PollingSynchronizer::State::Request(
data_model::Selector const& selector) const {
auto request = MakeFDv2PollRequest(endpoints_, http_properties_, selector,
filter_key_);
filter_key_, logger_);

// Promise must be in a shared_ptr because Requester requires callbacks
// to be copy-constructible (stored in std::function).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "streaming_synchronizer.hpp"
#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp"
#include "fdv2_changeset_translation.hpp"

#include <launchdarkly/async/timer.hpp>
Expand Down Expand Up @@ -74,10 +75,14 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(

boost::urls::url u = parsed.value();

// Safer way of appending /sdk/stream than string concatenation: avoids
// double slashes if the base URL has a trailing slash.
u.segments().push_back("sdk");
u.segments().push_back("stream");
// A trailing '/' on the base URL appears as an empty final segment;
// remove it so subsequent push_backs don't produce a double slash.
auto segs = u.segments();
if (!segs.empty() && segs.back().empty()) {
segs.pop_back();
}
segs.push_back("sdk");
segs.push_back("stream");

// basis and filter are not added here — they are appended per-connect by
// the on_connect hook (OnConnect), so that each (re)connection uses the
Expand Down Expand Up @@ -168,7 +173,13 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) {
u.params().set("basis", latest_selector_.value->state);
}
if (filter_key_) {
u.params().set("filter", *filter_key_);
if (detail::ValidateFilterKey(*filter_key_)) {
u.params().set("filter", *filter_key_);
} else {
LD_LOG(logger_, LogLevel::kError)
<< "data source config: provided filter is invalid, will "
"request full environment instead";
}
}
req->target(u.encoded_target());
}
Expand All @@ -183,6 +194,10 @@ void FDv2StreamingSynchronizer::State::OnResponse(
}

void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
if (!FDv2ProtocolHandler::IsKnownEvent(event.type())) {
return;
}

boost::system::error_code ec;
auto data = boost::json::parse(event.data(), ec);
if (ec) {
Expand All @@ -191,6 +206,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}});
std::lock_guard lock(mutex_);
if (sse_client_) {
sse_client_->async_restart("FDv2 parse error");
}
return;
}

Expand Down Expand Up @@ -250,6 +269,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
<< kIdentity << ": " << r.message;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, r.message)}});
std::lock_guard lock(mutex_);
if (sse_client_) {
sse_client_->async_restart("FDv2 protocol error");
}
} else {
static_assert(always_false_v<T>, "non-exhaustive visitor");
}
Expand Down
45 changes: 45 additions & 0 deletions libs/server-sdk/tests/fdv2_polling_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <data_systems/fdv2/fdv2_polling_impl.hpp>

#include <launchdarkly/config/shared/defaults.hpp>
#include <launchdarkly/fdv2_protocol_handler.hpp>
#include <launchdarkly/logging/logger.hpp>
#include <launchdarkly/network/http_requester.hpp>
Expand Down Expand Up @@ -107,3 +108,47 @@ TEST(HandleFDv2PollResponseTest, NetworkErrorDoesNotSetFlag) {
std::holds_alternative<FDv2SourceResult::Interrupted>(result.value));
EXPECT_FALSE(result.fdv1_fallback);
}

TEST(MakeFDv2PollRequestTest, BaseWithTrailingSlashDoesNotProduceDoubleSlash) {
auto logger = MakeNullLogger();
config::shared::built::ServiceEndpoints endpoints{"http://example.com/", "",
""};
auto props =
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
std::nullopt, logger);
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll");
}

TEST(MakeFDv2PollRequestTest, BaseWithSubpathTrailingSlashJoinsCleanly) {
auto logger = MakeNullLogger();
config::shared::built::ServiceEndpoints endpoints{
"http://example.com/relay/", "", ""};
auto props =
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
std::nullopt, logger);
EXPECT_EQ(req.Url(), "http://example.com/relay/sdk/poll");
}

TEST(MakeFDv2PollRequestTest, ValidFilterKeyIsIncluded) {
auto logger = MakeNullLogger();
config::shared::built::ServiceEndpoints endpoints{"http://example.com", "",
""};
auto props =
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
std::string{"my-filter_1.0"}, logger);
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll?filter=my-filter_1.0");
}

TEST(MakeFDv2PollRequestTest, InvalidFilterKeyIsDropped) {
auto logger = MakeNullLogger();
config::shared::built::ServiceEndpoints endpoints{"http://example.com", "",
""};
auto props =
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
std::string{"has spaces"}, logger);
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll");
}
Loading
Loading