diff --git a/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp index 11cfa7e4e..77b144935 100644 --- a/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp +++ b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp @@ -14,8 +14,11 @@ namespace launchdarkly { /** * Protocol state machine for the FDv2 wire format. * - * Accumulates put-object and delete-object events between a server-intent - * and payload-transferred event, then emits a complete FDv2ChangeSet. + * A server-intent opens a transfer cycle: put-object and delete-object + * events accumulate until a payload-transferred event, which emits an + * FDv2ChangeSet. The handler then remains active — subsequent put/delete + * + payload-transferred cycles emit kPartial changesets reusing the prior + * intent, until the server sends a new server-intent, error, or goodbye. * * Shared between the polling and streaming synchronizers. */ @@ -95,6 +98,13 @@ class FDv2ProtocolHandler { */ void Reset(); + /** + * @return true if event_type is one that the protocol handler recognizes + * and may dispatch on. Events outside this set are spec-defined as + * "unrecognized data that can be safely ignored". + */ + static bool IsKnownEvent(std::string_view event_type); + FDv2ProtocolHandler() = default; private: diff --git a/libs/internal/src/fdv2_protocol_handler.cpp b/libs/internal/src/fdv2_protocol_handler.cpp index 13b8e6823..c07b7fbfe 100644 --- a/libs/internal/src/fdv2_protocol_handler.cpp +++ b/libs/internal/src/fdv2_protocol_handler.cpp @@ -14,6 +14,12 @@ static char const* const kGoodbye = "goodbye"; using Error = FDv2ProtocolHandler::Error; +bool FDv2ProtocolHandler::IsKnownEvent(std::string_view event_type) { + return event_type == kServerIntent || event_type == kPutObject || + event_type == kDeleteObject || event_type == kPayloadTransferred || + event_type == kError || event_type == kGoodbye; +} + FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent( std::string_view event_type, boost::json::value const& data) { @@ -78,9 +84,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleServerIntent( FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject( boost::json::value const& data) { - if (state_ == State::kInactive) { - return std::monostate{}; - } auto result = boost::json::value_to< tl::expected, JsonError>>(data); if (!result) { @@ -101,9 +104,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject( FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleDeleteObject( boost::json::value const& data) { - if (state_ == State::kInactive) { - return std::monostate{}; - } auto result = boost::json::value_to< tl::expected, JsonError>>(data); if (!result) { @@ -152,7 +152,10 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePayloadTransferred( type, std::move(changes_), data_model::Selector{data_model::Selector::State{transferred.version, transferred.state}}}; - Reset(); + // Transition to kPartial so subsequent put-object/payload-transferred + // cycles work without requiring a new server-intent. + changes_.clear(); + state_ = State::kPartial; return changeset; } @@ -160,7 +163,9 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleError( boost::json::value const& data) { auto result = boost::json::value_to< tl::expected, JsonError>>(data); - Reset(); + // Discard any partial-payload accumulation but keep state intact so + // the next put-object/payload-transferred cycle continues normally. + changes_.clear(); if (!result) { return Error::JsonParseError(result.error(), "could not deserialize error event"); diff --git a/libs/internal/tests/fdv2_protocol_handler_test.cpp b/libs/internal/tests/fdv2_protocol_handler_test.cpp index fb428c243..9e0cc8e18 100644 --- a/libs/internal/tests/fdv2_protocol_handler_test.cpp +++ b/libs/internal/tests/fdv2_protocol_handler_test.cpp @@ -235,6 +235,29 @@ TEST(FDv2ProtocolHandlerTest, EXPECT_TRUE(cs->changes.empty()); } +TEST(FDv2ProtocolHandlerTest, ErrorMidPayloadDiscardsPartialAcceptsSubsequent) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "abandoned", kFlagJson)); + handler.HandleEvent( + "error", boost::json::parse(R"({"reason":"something went wrong"})")); + + // After the error, a fresh put + payload-transferred (without an + // intervening server-intent) emits a changeset containing only the + // post-error put. + handler.HandleEvent("put-object", + MakePutObject("flag", "fresh", kFlagJson)); + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + ASSERT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "fresh"); +} + TEST(FDv2ProtocolHandlerTest, ErrorEventWithIdSetsServerId) { FDv2ProtocolHandler handler; @@ -353,3 +376,30 @@ TEST(FDv2ProtocolHandlerTest, PayloadTransferredWithoutServerIntentIsError) { ASSERT_NE(err, nullptr); EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kProtocolError); } + +TEST(FDv2ProtocolHandlerTest, ConsecutivePayloadsWithoutNewServerIntent) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "first", kFlagJson)); + auto first = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s1", 1)); + auto* cs1 = std::get_if(&first); + ASSERT_NE(cs1, nullptr); + ASSERT_EQ(cs1->changes.size(), 1u); + EXPECT_EQ(cs1->changes[0].key, "first"); + + // A subsequent payload arrives without an intervening server-intent + // (streaming incremental update): the handler emits a kPartial + // changeset reusing the prior intent. + handler.HandleEvent("put-object", + MakePutObject("flag", "second", kFlagJson)); + auto second = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s2", 2)); + auto* cs2 = std::get_if(&second); + ASSERT_NE(cs2, nullptr); + ASSERT_EQ(cs2->changes.size(), 1u); + EXPECT_EQ(cs2->changes[0].key, "second"); + EXPECT_EQ(cs2->type, data_model::ChangeSetType::kPartial); +} diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index 260153737..1178aff46 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -1,4 +1,5 @@ #include "fdv2_polling_impl.hpp" +#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp" #include "fdv2_changeset_translation.hpp" #include @@ -11,7 +12,6 @@ namespace launchdarkly::server_side::data_systems { -static char const* const kFDv2PollPath = "/sdk/poll"; static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback"; static char const* const kErrorParsingBody = @@ -44,7 +44,8 @@ network::HttpRequest MakeFDv2PollRequest( config::built::ServiceEndpoints const& endpoints, config::built::HttpProperties const& http_properties, data_model::Selector const& selector, - std::optional const& filter_key) { + std::optional const& filter_key, + Logger const& logger) { config::builders::HttpPropertiesBuilder const builder(http_properties); auto parsed = boost::urls::parse_uri(endpoints.PollingBaseUrl()); @@ -54,12 +55,25 @@ network::HttpRequest MakeFDv2PollRequest( } boost::urls::url u = parsed.value(); - u.set_path(u.path() + kFDv2PollPath); + // A trailing '/' on the base URL appears as an empty final segment; + // remove it so subsequent push_backs don't produce a double slash. + auto segs = u.segments(); + if (!segs.empty() && segs.back().empty()) { + segs.pop_back(); + } + segs.push_back("sdk"); + segs.push_back("poll"); if (selector.value) { u.params().append({"basis", selector.value->state}); } if (filter_key) { - u.params().append({"filter", *filter_key}); + if (detail::ValidateFilterKey(*filter_key)) { + u.params().append({"filter", *filter_key}); + } else { + LD_LOG(logger, LogLevel::kError) + << "data source config: provided filter is invalid, will " + "request full environment instead"; + } } return {std::string(u.buffer()), network::HttpMethod::kGet, builder.Build(), diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp index 086d3c8db..047a4788d 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp @@ -19,7 +19,8 @@ network::HttpRequest MakeFDv2PollRequest( config::built::ServiceEndpoints const& endpoints, config::built::HttpProperties const& http_properties, data_model::Selector const& selector, - std::optional const& filter_key); + std::optional const& filter_key, + Logger const& logger); // Parse an HTTP response from the FDv2 polling endpoint through the protocol // handler and return the appropriate result. identity is used in log messages diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp index 05620161d..2f07d95b2 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -20,7 +20,8 @@ FDv2PollingInitializer::FDv2PollingInitializer( : request_(MakeFDv2PollRequest(endpoints, http_properties, std::move(selector), - std::move(filter_key))), + std::move(filter_key), + logger)), requester_(executor, http_properties.Tls()), state_(std::make_shared(logger)) {} diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp index 84df5e0f3..5fce4c07a 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -34,7 +34,7 @@ FDv2PollingSynchronizer::State::State( async::Future FDv2PollingSynchronizer::State::Request( data_model::Selector const& selector) const { auto request = MakeFDv2PollRequest(endpoints_, http_properties_, selector, - filter_key_); + filter_key_, logger_); // Promise must be in a shared_ptr because Requester requires callbacks // to be copy-constructible (stored in std::function). diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index a87a648f2..c43483fc3 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -1,4 +1,5 @@ #include "streaming_synchronizer.hpp" +#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp" #include "fdv2_changeset_translation.hpp" #include @@ -74,10 +75,14 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( boost::urls::url u = parsed.value(); - // Safer way of appending /sdk/stream than string concatenation: avoids - // double slashes if the base URL has a trailing slash. - u.segments().push_back("sdk"); - u.segments().push_back("stream"); + // A trailing '/' on the base URL appears as an empty final segment; + // remove it so subsequent push_backs don't produce a double slash. + auto segs = u.segments(); + if (!segs.empty() && segs.back().empty()) { + segs.pop_back(); + } + segs.push_back("sdk"); + segs.push_back("stream"); // basis and filter are not added here — they are appended per-connect by // the on_connect hook (OnConnect), so that each (re)connection uses the @@ -168,7 +173,13 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { u.params().set("basis", latest_selector_.value->state); } if (filter_key_) { - u.params().set("filter", *filter_key_); + if (detail::ValidateFilterKey(*filter_key_)) { + u.params().set("filter", *filter_key_); + } else { + LD_LOG(logger_, LogLevel::kError) + << "data source config: provided filter is invalid, will " + "request full environment instead"; + } } req->target(u.encoded_target()); } @@ -183,6 +194,10 @@ void FDv2StreamingSynchronizer::State::OnResponse( } void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { + if (!FDv2ProtocolHandler::IsKnownEvent(event.type())) { + return; + } + boost::system::error_code ec; auto data = boost::json::parse(event.data(), ec); if (ec) { @@ -191,6 +206,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}}); + std::lock_guard lock(mutex_); + if (sse_client_) { + sse_client_->async_restart("FDv2 parse error"); + } return; } @@ -250,6 +269,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { << kIdentity << ": " << r.message; Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ MakeError(ErrorKind::kInvalidData, 0, r.message)}}); + std::lock_guard lock(mutex_); + if (sse_client_) { + sse_client_->async_restart("FDv2 protocol error"); + } } else { static_assert(always_false_v, "non-exhaustive visitor"); } diff --git a/libs/server-sdk/tests/fdv2_polling_impl_test.cpp b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp index b4ec5c332..c9ed53da5 100644 --- a/libs/server-sdk/tests/fdv2_polling_impl_test.cpp +++ b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -107,3 +108,47 @@ TEST(HandleFDv2PollResponseTest, NetworkErrorDoesNotSetFlag) { std::holds_alternative(result.value)); EXPECT_FALSE(result.fdv1_fallback); } + +TEST(MakeFDv2PollRequestTest, BaseWithTrailingSlashDoesNotProduceDoubleSlash) { + auto logger = MakeNullLogger(); + config::shared::built::ServiceEndpoints endpoints{"http://example.com/", "", + ""}; + auto props = + config::shared::Defaults::HttpProperties(); + auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{}, + std::nullopt, logger); + EXPECT_EQ(req.Url(), "http://example.com/sdk/poll"); +} + +TEST(MakeFDv2PollRequestTest, BaseWithSubpathTrailingSlashJoinsCleanly) { + auto logger = MakeNullLogger(); + config::shared::built::ServiceEndpoints endpoints{ + "http://example.com/relay/", "", ""}; + auto props = + config::shared::Defaults::HttpProperties(); + auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{}, + std::nullopt, logger); + EXPECT_EQ(req.Url(), "http://example.com/relay/sdk/poll"); +} + +TEST(MakeFDv2PollRequestTest, ValidFilterKeyIsIncluded) { + auto logger = MakeNullLogger(); + config::shared::built::ServiceEndpoints endpoints{"http://example.com", "", + ""}; + auto props = + config::shared::Defaults::HttpProperties(); + auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{}, + std::string{"my-filter_1.0"}, logger); + EXPECT_EQ(req.Url(), "http://example.com/sdk/poll?filter=my-filter_1.0"); +} + +TEST(MakeFDv2PollRequestTest, InvalidFilterKeyIsDropped) { + auto logger = MakeNullLogger(); + config::shared::built::ServiceEndpoints endpoints{"http://example.com", "", + ""}; + auto props = + config::shared::Defaults::HttpProperties(); + auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{}, + std::string{"has spaces"}, logger); + EXPECT_EQ(req.Url(), "http://example.com/sdk/poll"); +} diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index 115209eda..68687adb5 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -294,6 +294,29 @@ TEST(FDv2StreamingSynchronizerTest, OnConnectWithFilterKeyAppendsFilter) { EXPECT_EQ(req.target(), "/sdk/stream?filter=my-filter"); } +TEST(FDv2StreamingSynchronizerTest, OnConnectInvalidFilterKeyIsDropped) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::string("has spaces"), 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + boost::beast::http::request req; + + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // A filter key that fails validation is silently dropped from the URL. + EXPECT_EQ(req.target(), "/sdk/stream"); +} + TEST(FDv2StreamingSynchronizerTest, OnConnectReconnectUsesLatestSelector) { auto logger = MakeNullLogger(); IoContextRunner runner; @@ -551,6 +574,26 @@ TEST(FDv2StreamingSynchronizerTest, ServerErrorEventReturnsInterrupted) { std::string::npos); } +TEST(FDv2StreamingSynchronizerTest, UnknownEventWithGarbageBodyIsIgnored) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + auto mock_client = std::make_shared(); + FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client); + + sse::Event unknown_with_garbage("whatever", "not json"); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + unknown_with_garbage); + + EXPECT_EQ(mock_client->restart_count_, 0); +} + TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { auto logger = MakeNullLogger(); IoContextRunner runner; @@ -561,6 +604,9 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { 1s); FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + auto mock_client = std::make_shared(); + FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client); + sse::Event bad_event("server-intent", "this is not json"); // Act: deliver an event whose data field cannot be parsed as JSON. @@ -569,13 +615,43 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { auto result = future.WaitForResult(2s); // Assert: the synchronizer reports Interrupted{kInvalidData} so the - // orchestrator knows the stream produced unparseable bytes. + // orchestrator knows the stream produced unparseable bytes, and drives + // the SSE client to restart so the next connection starts clean. ASSERT_TRUE(result.has_value()); auto* interrupted = std::get_if(&result->value); ASSERT_NE(interrupted, nullptr); EXPECT_EQ(interrupted->error.Kind(), FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); + EXPECT_EQ(mock_client->restart_count_, 1); +} + +TEST(FDv2StreamingSynchronizerTest, + SchemaViolationServerIntentTriggersRestart) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + auto mock_client = std::make_shared(); + FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client); + + // Well-formed JSON, but the shape doesn't match a server-intent payload. + sse::Event bad_event("server-intent", + R"({"data":{"flags":true,"segments":{}}})"); + + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, bad_event); + auto future = synchronizer.Next(data_model::Selector{}); + auto result = future.WaitForResult(2s); + + ASSERT_TRUE(result.has_value()); + ASSERT_NE(std::get_if(&result->value), + nullptr); + EXPECT_EQ(mock_client->restart_count_, 1); } TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) {