diff --git a/libs/internal/include/launchdarkly/async/promise.hpp b/libs/internal/include/launchdarkly/async/promise.hpp index ca9370334..fa83a8b09 100644 --- a/libs/internal/include/launchdarkly/async/promise.hpp +++ b/libs/internal/include/launchdarkly/async/promise.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace launchdarkly::async { diff --git a/libs/internal/include/launchdarkly/encoding/base_64.hpp b/libs/internal/include/launchdarkly/encoding/base_64.hpp index ae33c0eb3..0424a5fa6 100644 --- a/libs/internal/include/launchdarkly/encoding/base_64.hpp +++ b/libs/internal/include/launchdarkly/encoding/base_64.hpp @@ -13,4 +13,13 @@ namespace launchdarkly::encoding { */ std::string Base64UrlEncode(std::string const& input); +/** + * Return a standard base64 encoded version of the input string, using the + * RFC 4648 section 4 alphabet (with '+' and '/'). Unlike @ref Base64UrlEncode, + * this is NOT URL-safe. + * @param input The string to Base64 encode. + * @return The encoded value. + */ +std::string Base64Encode(std::string const& input); + } // namespace launchdarkly::encoding diff --git a/libs/internal/src/encoding/base_64.cpp b/libs/internal/src/encoding/base_64.cpp index 3d9ac4dfe..7b01bd7f4 100644 --- a/libs/internal/src/encoding/base_64.cpp +++ b/libs/internal/src/encoding/base_64.cpp @@ -1,10 +1,13 @@ #include +#include + #include #include #include #include #include +#include static unsigned char const kEncodeSize = 4; static unsigned char const kInputBytesPerEncodeSize = 3; @@ -75,4 +78,17 @@ std::string Base64UrlEncode(std::string const& input) { return out; } +std::string Base64Encode(std::string const& input) { + if (input.empty()) { + return {}; + } + // EVP_EncodeBlock writes 4 output characters per 3 input bytes (rounded up) + // plus a NUL terminator. + std::vector out(4 * ((input.size() + 2) / 3) + 1); + int const written = EVP_EncodeBlock( + out.data(), reinterpret_cast(input.data()), + static_cast(input.size())); + return std::string(reinterpret_cast(out.data()), written); +} + } // namespace launchdarkly::encoding diff --git a/libs/internal/tests/base_64_test.cpp b/libs/internal/tests/base_64_test.cpp index ffef5fcab..4b8c601b6 100644 --- a/libs/internal/tests/base_64_test.cpp +++ b/libs/internal/tests/base_64_test.cpp @@ -2,10 +2,11 @@ #include "launchdarkly/encoding/base_64.hpp" +using launchdarkly::encoding::Base64Encode; using launchdarkly::encoding::Base64UrlEncode; TEST(Base64Encoding, CanEncodeString) { - // Test vectors from RFC4668 + // Test vectors from RFC4648 // https://datatracker.ietf.org/doc/html/rfc4648#section-10 EXPECT_EQ("", Base64UrlEncode("")); EXPECT_EQ("Zg==", Base64UrlEncode("f")); @@ -15,3 +16,24 @@ TEST(Base64Encoding, CanEncodeString) { EXPECT_EQ("Zm9vYmE=", Base64UrlEncode("fooba")); EXPECT_EQ("Zm9vYmFy", Base64UrlEncode("foobar")); } + +TEST(Base64Encoding, StandardCanEncodeString) { + // Test vectors from RFC4648 + // https://datatracker.ietf.org/doc/html/rfc4648#section-10 + EXPECT_EQ("", Base64Encode("")); + EXPECT_EQ("Zg==", Base64Encode("f")); + EXPECT_EQ("Zm8=", Base64Encode("fo")); + EXPECT_EQ("Zm9v", Base64Encode("foo")); + EXPECT_EQ("Zm9vYg==", Base64Encode("foob")); + EXPECT_EQ("Zm9vYmE=", Base64Encode("fooba")); + EXPECT_EQ("Zm9vYmFy", Base64Encode("foobar")); +} + +TEST(Base64Encoding, StandardUsesNonUrlSafeAlphabet) { + // "???" encodes to a value ending in '/' under the standard alphabet and + // '_' under the URL-safe one; ">>>" exercises '+' versus '-'. + EXPECT_EQ("Pz8/", Base64Encode("???")); + EXPECT_EQ("Pz8_", Base64UrlEncode("???")); + EXPECT_EQ("Pj4+", Base64Encode(">>>")); + EXPECT_EQ("Pj4-", Base64UrlEncode(">>>")); +} diff --git a/libs/server-sdk-dynamodb-source/src/dynamodb_big_segment_store.cpp b/libs/server-sdk-dynamodb-source/src/dynamodb_big_segment_store.cpp index da8e3dc65..3dc633eb8 100644 --- a/libs/server-sdk-dynamodb-source/src/dynamodb_big_segment_store.cpp +++ b/libs/server-sdk-dynamodb-source/src/dynamodb_big_segment_store.cpp @@ -89,10 +89,9 @@ IBigSegmentStore::GetMembershipResult DynamoDBBigSegmentStore::GetMembership( it != item.end()) { if (it->second.GetType() != Aws::DynamoDB::Model::ValueType::STRING_SET) { - return tl::make_unexpected( - std::string("DynamoDB Big Segments '") + - kBigSegmentsIncludedAttribute + - "' is not of type STRING_SET"); + return tl::make_unexpected(std::string("DynamoDB Big Segments '") + + kBigSegmentsIncludedAttribute + + "' is not of type STRING_SET"); } for (auto const& ref : it->second.GetSS()) { included.emplace_back(ref); @@ -102,10 +101,9 @@ IBigSegmentStore::GetMembershipResult DynamoDBBigSegmentStore::GetMembership( it != item.end()) { if (it->second.GetType() != Aws::DynamoDB::Model::ValueType::STRING_SET) { - return tl::make_unexpected( - std::string("DynamoDB Big Segments '") + - kBigSegmentsExcludedAttribute + - "' is not of type STRING_SET"); + return tl::make_unexpected(std::string("DynamoDB Big Segments '") + + kBigSegmentsExcludedAttribute + + "' is not of type STRING_SET"); } for (auto const& ref : it->second.GetSS()) { excluded.emplace_back(ref); @@ -157,7 +155,9 @@ IBigSegmentStore::GetMetadataResult DynamoDBBigSegmentStore::GetMetadata() "DynamoDB Big Segments 'synchronizedOn' is not a valid integer"); } - return StoreMetadata{std::chrono::milliseconds{parsed}}; + // The stored value is a Unix-epoch millisecond count: system_clock's epoch. + return StoreMetadata{std::chrono::system_clock::time_point{ + std::chrono::milliseconds{parsed}}}; } } // namespace launchdarkly::server_side::integrations diff --git a/libs/server-sdk-dynamodb-source/tests/dynamodb_big_segment_store_test.cpp b/libs/server-sdk-dynamodb-source/tests/dynamodb_big_segment_store_test.cpp index 426cbd9bb..d367bfbfb 100644 --- a/libs/server-sdk-dynamodb-source/tests/dynamodb_big_segment_store_test.cpp +++ b/libs/server-sdk-dynamodb-source/tests/dynamodb_big_segment_store_test.cpp @@ -163,7 +163,8 @@ TEST_F(DynamoDBBigSegmentTests, GetMetadataWithEmptyPrefix) { ASSERT_TRUE(result); ASSERT_TRUE(result->has_value()); ASSERT_EQ(result->value().last_up_to_date, - std::chrono::milliseconds{1700000000000LL}); + std::chrono::system_clock::time_point{ + std::chrono::milliseconds{1700000000000LL}}); } TEST_F(DynamoDBBigSegmentTests, GetMembershipRejectsMalformedIncluded) { @@ -182,7 +183,8 @@ TEST_F(DynamoDBBigSegmentTests, GetMetadataReturnsSyncTime) { ASSERT_TRUE(result); ASSERT_TRUE(result->has_value()); ASSERT_EQ(result->value().last_up_to_date, - std::chrono::milliseconds{1700000000000LL}); + std::chrono::system_clock::time_point{ + std::chrono::milliseconds{1700000000000LL}}); } TEST_F(DynamoDBBigSegmentTests, GetMetadataAbsentSyncTimeReturnsNoMetadata) { diff --git a/libs/server-sdk-redis-source/src/redis_big_segment_store.cpp b/libs/server-sdk-redis-source/src/redis_big_segment_store.cpp index 94f14c0cf..7136bbf78 100644 --- a/libs/server-sdk-redis-source/src/redis_big_segment_store.cpp +++ b/libs/server-sdk-redis-source/src/redis_big_segment_store.cpp @@ -98,7 +98,9 @@ IBigSegmentStore::GetMetadataResult RedisBigSegmentStore::GetMetadata() const { "Redis Big Segments synchronized_on is not a valid integer"); } - return StoreMetadata{std::chrono::milliseconds{parsed}}; + // The stored value is a Unix-epoch millisecond count: system_clock's epoch. + return StoreMetadata{std::chrono::system_clock::time_point{ + std::chrono::milliseconds{parsed}}}; } } // namespace launchdarkly::server_side::integrations diff --git a/libs/server-sdk-redis-source/tests/redis_big_segment_store_test.cpp b/libs/server-sdk-redis-source/tests/redis_big_segment_store_test.cpp index 3817a6374..9ba3c1a49 100644 --- a/libs/server-sdk-redis-source/tests/redis_big_segment_store_test.cpp +++ b/libs/server-sdk-redis-source/tests/redis_big_segment_store_test.cpp @@ -143,7 +143,8 @@ TEST_F(RedisBigSegmentTests, GetMetadataWithEmptyPrefix) { ASSERT_TRUE(result); ASSERT_TRUE(result->has_value()); ASSERT_EQ(result->value().last_up_to_date, - std::chrono::milliseconds{1700000000000LL}); + std::chrono::system_clock::time_point{ + std::chrono::milliseconds{1700000000000LL}}); } TEST_F(RedisBigSegmentTests, GetMetadataReturnsSyncTime) { @@ -153,7 +154,8 @@ TEST_F(RedisBigSegmentTests, GetMetadataReturnsSyncTime) { ASSERT_TRUE(result); ASSERT_TRUE(result->has_value()); ASSERT_EQ(result->value().last_up_to_date, - std::chrono::milliseconds{1700000000000LL}); + std::chrono::system_clock::time_point{ + std::chrono::milliseconds{1700000000000LL}}); } TEST_F(RedisBigSegmentTests, GetMetadataRejectsMalformedSyncTime) { diff --git a/libs/server-sdk/include/launchdarkly/server_side/integrations/big_segments/big_segment_store_types.hpp b/libs/server-sdk/include/launchdarkly/server_side/integrations/big_segments/big_segment_store_types.hpp index a3ea7abaa..0b86e6517 100644 --- a/libs/server-sdk/include/launchdarkly/server_side/integrations/big_segments/big_segment_store_types.hpp +++ b/libs/server-sdk/include/launchdarkly/server_side/integrations/big_segments/big_segment_store_types.hpp @@ -87,11 +87,11 @@ class Membership { */ struct StoreMetadata { /** - * @brief Wall-clock timestamp (Unix epoch) at which the data populator - * (e.g. the LaunchDarkly Relay Proxy) last confirmed it had pushed all - * pending Big Segments updates to the store. + * @brief Wall-clock instant at which the data populator (e.g. the + * LaunchDarkly Relay Proxy) last confirmed it had pushed all pending Big + * Segments updates to the store. */ - std::chrono::milliseconds last_up_to_date; + std::chrono::system_clock::time_point last_up_to_date; }; } // namespace launchdarkly::server_side::integrations diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 29da3c307..94e12f113 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -44,6 +44,11 @@ target_sources(${LIBNAME} data_components/dependency_tracker/dependency_tracker.cpp data_components/expiration_tracker/expiration_tracker.hpp data_components/expiration_tracker/expiration_tracker.cpp + data_components/big_segments/big_segments_status.hpp + data_components/big_segments/membership_cache.hpp + data_components/big_segments/membership_cache.cpp + data_components/big_segments/big_segment_store_wrapper.hpp + data_components/big_segments/big_segment_store_wrapper.cpp data_interfaces/destination/itransactional_destination.hpp data_components/memory_store/memory_store.hpp data_components/memory_store/memory_store.cpp diff --git a/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.cpp b/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.cpp new file mode 100644 index 000000000..287261765 --- /dev/null +++ b/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.cpp @@ -0,0 +1,194 @@ +#include "big_segment_store_wrapper.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_components { + +namespace { +// The store is keyed by base64(sha256(contextKey)), matching what the Relay +// Proxy writes; raw context keys are never sent to the store. +std::string HashContextKey(std::string const& context_key) { + auto const digest = encoding::Sha256String(context_key); + return encoding::Base64Encode(std::string( + reinterpret_cast(digest.data()), digest.size())); +} +} // namespace + +BigSegmentStoreWrapper::BigSegmentStoreWrapper( + config::built::BigSegmentsConfig const& config, + boost::asio::any_io_executor executor, + Logger const& logger) + : store_(config.store), + stale_after_(config.stale_after), + poll_interval_(config.status_poll_interval), + logger_(logger), + executor_(std::move(executor)), + cache_(config.context_cache_size, config.context_cache_time) {} + +BigSegmentStoreWrapper::~BigSegmentStoreWrapper() { + poll_cancel_.Cancel(); +} + +void BigSegmentStoreWrapper::Start() { + ScheduleNextPoll(); +} + +BigSegmentStoreWrapper::GetMembershipResult +BigSegmentStoreWrapper::GetMembership(std::string const& context_key) { + auto membership = cache_.Get(context_key); + if (!membership.has_value()) { + auto const result = LoadMembership(context_key); + if (!result.has_value()) { + LD_LOG(logger_, LogLevel::kError) + << "Big Segment store returned error: " << result.error(); + MarkStoreUnavailable(); + return {integrations::Membership::FromSegmentRefs({}, {}), + BigSegmentsStatus::kStoreError}; + } + membership = *result; + } + + auto const status = GetStatus().stale ? BigSegmentsStatus::kStale + : BigSegmentsStatus::kHealthy; + return {std::move(*membership), status}; +} + +BigSegmentStoreWrapper::StoreMembership BigSegmentStoreWrapper::LoadMembership( + std::string const& context_key) { + std::shared_ptr query; + bool is_leader = false; + { + std::lock_guard lock(load_mutex_); + auto const it = in_flight_.find(context_key); + if (it != in_flight_.end()) { + query = it->second; + } else { + query = std::make_shared(); + in_flight_.emplace(context_key, query); + is_leader = true; + } + } + + if (!is_leader) { + std::unique_lock lock(query->mutex); + query->cv.wait(lock, [&query] { return query->result.has_value(); }); + return *query->result; + } + + // Query the store outside any lock, then publish the result to waiters and + // drop the in-flight entry. + auto result = store_->GetMembership(HashContextKey(context_key)); + if (result.has_value()) { + cache_.Set(context_key, *result); + } + { + std::lock_guard lock(load_mutex_); + in_flight_.erase(context_key); + } + { + std::lock_guard lock(query->mutex); + query->result = result; + } + query->cv.notify_all(); + return result; +} + +void BigSegmentStoreWrapper::MarkStoreUnavailable() { + BigSegmentStoreStatus new_status; + bool changed; + { + std::lock_guard lock(status_mutex_); + new_status.available = false; + new_status.stale = last_status_.has_value() && last_status_->stale; + changed = !last_status_.has_value() || *last_status_ != new_status; + last_status_ = new_status; + } + if (changed) { + status_signal_(new_status); + } +} + +void BigSegmentStoreWrapper::ScheduleNextPoll() { + auto weak_self = weak_from_this(); + async::Delay(executor_, poll_interval_, poll_cancel_.GetToken()) + .Then( + [weak_self](bool const& fired_normally) -> std::monostate { + if (!fired_normally) { + // Cancelled (wrapper destroyed); stop polling. + return {}; + } + if (auto self = weak_self.lock()) { + self->PollStoreAndUpdateStatus(); + self->ScheduleNextPoll(); + } + return {}; + }, + async::kInlineExecutor); +} + +BigSegmentStoreStatus BigSegmentStoreWrapper::GetStatus() { + { + std::lock_guard lock(status_mutex_); + if (last_status_.has_value()) { + return *last_status_; + } + } + // No poll has completed yet; do one now so the caller gets an accurate + // staleness reading rather than a default. + return PollStoreAndUpdateStatus(); +} + +BigSegmentStoreStatus BigSegmentStoreWrapper::PollStoreAndUpdateStatus() { + // Query the store outside the lock so a slow store doesn't block callers. + auto const metadata = store_->GetMetadata(); + if (!metadata.has_value()) { + LD_LOG(logger_, LogLevel::kError) + << "Big Segment store status query returned error: " + << metadata.error(); + } + + BigSegmentStoreStatus new_status; + bool changed; + { + std::lock_guard lock(status_mutex_); + if (metadata.has_value()) { + new_status.available = true; + new_status.stale = + !metadata->has_value() || IsStale((*metadata)->last_up_to_date); + } else { + new_status.available = false; + // Availability is bad; carry the last-known staleness forward. + new_status.stale = last_status_.has_value() && last_status_->stale; + } + changed = !last_status_.has_value() || *last_status_ != new_status; + last_status_ = new_status; + } + + // Broadcast after releasing the lock so a listener can't deadlock against + // it. Listeners are notified on the first poll and on any change. + if (changed) { + status_signal_(new_status); + } + return new_status; +} + +bool BigSegmentStoreWrapper::IsStale( + std::chrono::system_clock::time_point last_up_to_date) const { + return (std::chrono::system_clock::now() - last_up_to_date) >= stale_after_; +} + +std::unique_ptr BigSegmentStoreWrapper::OnStatusChange( + std::function handler) { + return std::make_unique( + status_signal_.connect(std::move(handler))); +} + +} // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.hpp b/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.hpp new file mode 100644 index 000000000..295382e68 --- /dev/null +++ b/libs/server-sdk/src/data_components/big_segments/big_segment_store_wrapper.hpp @@ -0,0 +1,154 @@ +#pragma once + +#include "big_segments_status.hpp" +#include "membership_cache.hpp" + +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_components { + +/** + * @brief Internal layer between the evaluator and a customer-provided + * @ref integrations::IBigSegmentStore. + * + * Adds context-key hashing, an LRU membership cache, a background task that + * polls the store's metadata to track availability and staleness, and a + * status-change broadcaster. + * + * Construct with @ref std::make_shared and call @ref Start once to begin + * background polling; polling stops when the wrapper is destroyed. + * + * Thread-safe: @ref GetMembership, @ref GetStatus, and @ref OnStatusChange may + * be called concurrently from any number of evaluation threads, including while + * the background poll runs on the executor. The store is only ever queried with + * the relevant lock released, so a slow store never blocks unrelated callers. + */ +class BigSegmentStoreWrapper + : public std::enable_shared_from_this { + public: + /** + * @param config Resolved Big Segments configuration. + * @param executor Executor the background poll runs on. + * @param logger Used for store-error and debug logging. Must outlive the + * wrapper. + */ + BigSegmentStoreWrapper(config::built::BigSegmentsConfig const& config, + boost::asio::any_io_executor executor, + Logger const& logger); + + ~BigSegmentStoreWrapper(); + + BigSegmentStoreWrapper(BigSegmentStoreWrapper const&) = delete; + BigSegmentStoreWrapper(BigSegmentStoreWrapper&&) = delete; + BigSegmentStoreWrapper& operator=(BigSegmentStoreWrapper const&) = delete; + BigSegmentStoreWrapper& operator=(BigSegmentStoreWrapper&&) = delete; + + /** + * @brief Begins periodic metadata polling on the executor. Call once after + * construction. + */ + void Start(); + + struct GetMembershipResult { + integrations::Membership membership; + BigSegmentsStatus status; + }; + + /** + * @brief Returns a context's Big Segments membership, with a status + * describing how trustworthy the answer is. + * + * May be called concurrently from multiple evaluation threads. If the + * status is @ref BigSegmentsStatus::kStoreError the lookup failed and the + * returned membership is empty. + * + * @param context_key The unhashed context key. + */ + [[nodiscard]] GetMembershipResult GetMembership( + std::string const& context_key); + + /** + * @brief Returns the current store health. If no metadata poll has + * completed yet, performs one synchronously on the calling thread first, so + * the first caller still gets an accurate staleness reading. + */ + [[nodiscard]] BigSegmentStoreStatus GetStatus(); + + /** + * @brief Registers a listener invoked whenever the store status changes + * (not on every poll). The returned connection unregisters on destruction. + */ + std::unique_ptr OnStatusChange( + std::function handler); + + private: + using StoreMembership = integrations::IBigSegmentStore::GetMembershipResult; + + // A store query shared by all callers that miss the same key concurrently: + // the leader fills result and notifies; waiters block on cv until then. + struct InFlightQuery { + std::mutex mutex; + std::condition_variable cv; + std::optional result; + }; + + // Returns the membership for a key, querying the store on a cache miss and + // coalescing concurrent misses of the same key into one query. + [[nodiscard]] StoreMembership LoadMembership( + std::string const& context_key); + + // Marks the store unavailable after an evaluation-time error, preserving + // the last-known staleness, and broadcasts if the status changed. + void MarkStoreUnavailable(); + + // Queries store metadata, atomically updates the status, broadcasts if it + // changed, and returns the new status. + BigSegmentStoreStatus PollStoreAndUpdateStatus(); + + [[nodiscard]] bool IsStale( + std::chrono::system_clock::time_point last_up_to_date) const; + + // Arms a one-shot delay that polls then re-arms itself, until cancelled. + void ScheduleNextPoll(); + + std::shared_ptr const store_; + std::chrono::milliseconds const stale_after_; + std::chrono::milliseconds const poll_interval_; + Logger const& logger_; + + boost::asio::any_io_executor executor_; + + // Cancels the pending poll delay on destruction, ending the poll loop. + async::CancellationSource poll_cancel_; + + MembershipCache cache_; + + // In-flight store queries keyed by context key. Protected by load_mutex_. + std::mutex load_mutex_; + std::unordered_map> in_flight_; + + // Protected by status_mutex_; nullopt until the first poll completes. + std::mutex status_mutex_; + std::optional last_status_; + + boost::signals2::signal status_signal_; +}; + +} // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/big_segments/big_segments_status.hpp b/libs/server-sdk/src/data_components/big_segments/big_segments_status.hpp new file mode 100644 index 000000000..9089f1792 --- /dev/null +++ b/libs/server-sdk/src/data_components/big_segments/big_segments_status.hpp @@ -0,0 +1,38 @@ +#pragma once + +namespace launchdarkly::server_side::data_components { + +/** + * @brief Trustworthiness of a Big Segments membership answer, surfaced on an + * evaluation's reason. + * + * - kHealthy: the store was queried successfully and its data is up to date. + * - kStale: queried successfully, but the data may be out of date. + * - kStoreError: the store query failed. + * - kNotConfigured: a flag referenced a Big Segment but no store is wired up, + * or the segment has no generation. + */ +enum class BigSegmentsStatus { kHealthy, kStale, kStoreError, kNotConfigured }; + +/** + * @brief Health of the Big Segments store as a whole, independent of any single + * context's membership. + */ +struct BigSegmentStoreStatus { + // The most recent store query or metadata poll succeeded. + bool available{false}; + // The store's last-known update is older than the configured threshold. + bool stale{false}; +}; + +inline bool operator==(BigSegmentStoreStatus const& a, + BigSegmentStoreStatus const& b) { + return a.available == b.available && a.stale == b.stale; +} + +inline bool operator!=(BigSegmentStoreStatus const& a, + BigSegmentStoreStatus const& b) { + return !(a == b); +} + +} // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/big_segments/membership_cache.cpp b/libs/server-sdk/src/data_components/big_segments/membership_cache.cpp new file mode 100644 index 000000000..9f1c1a41c --- /dev/null +++ b/libs/server-sdk/src/data_components/big_segments/membership_cache.cpp @@ -0,0 +1,70 @@ +#include "membership_cache.hpp" + +#include + +namespace launchdarkly::server_side::data_components { + +MembershipCache::MembershipCache( + std::size_t capacity, + std::chrono::milliseconds ttl, + std::function clock) + : capacity_(capacity), ttl_(ttl), clock_(std::move(clock)) {} + +std::optional MembershipCache::Get( + std::string const& key) { + std::lock_guard lock(mutex_); + + auto const it = entries_.find(key); + if (it == entries_.end()) { + return std::nullopt; + } + + if (clock_() >= it->second.expires_at) { + recency_.erase(it->second.recency_position); + entries_.erase(it); + return std::nullopt; + } + + // Move to front as most-recently-used. + recency_.splice(recency_.begin(), recency_, it->second.recency_position); + return it->second.membership; +} + +void MembershipCache::Set(std::string const& key, + integrations::Membership membership) { + std::lock_guard lock(mutex_); + + auto const expires_at = clock_() + ttl_; + + auto const it = entries_.find(key); + if (it != entries_.end()) { + recency_.splice(recency_.begin(), recency_, + it->second.recency_position); + it->second.membership = std::move(membership); + it->second.expires_at = expires_at; + return; + } + + recency_.push_front(key); + entries_.emplace( + key, Entry{recency_.begin(), std::move(membership), expires_at}); + + if (entries_.size() > capacity_) { + auto const& lru_key = recency_.back(); + entries_.erase(lru_key); + recency_.pop_back(); + } +} + +void MembershipCache::Clear() { + std::lock_guard lock(mutex_); + entries_.clear(); + recency_.clear(); +} + +std::size_t MembershipCache::Size() const { + std::lock_guard lock(mutex_); + return entries_.size(); +} + +} // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/src/data_components/big_segments/membership_cache.hpp b/libs/server-sdk/src/data_components/big_segments/membership_cache.hpp new file mode 100644 index 000000000..4f148553e --- /dev/null +++ b/libs/server-sdk/src/data_components/big_segments/membership_cache.hpp @@ -0,0 +1,93 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_components { + +/** + * @brief Bounded, time-expiring cache of Big Segment membership lookups, keyed + * by the unhashed context key. + * + * The cache holds at most a fixed number of entries; inserting beyond that + * evicts the least-recently-used entry. Each entry also has a time-to-live. + * + * Expiration is lazy — there is no background reaper. An expired entry is + * detected and dropped only when @ref Get is next called for its key (reported + * as a miss). Until then it keeps occupying a slot, counts toward @ref Size, + * and is eligible for normal LRU eviction like any live entry. The cache never + * exceeds its capacity regardless, because @ref Set evicts on insert. + * + * Thread-safe: every method is guarded by an internal mutex and may be called + * concurrently. Concurrent-load deduplication (querying the store once when + * several callers miss the same key at once) is not handled here; that is the + * caller's responsibility. + */ +class MembershipCache { + public: + /** + * @param capacity Maximum number of entries retained before LRU eviction. + * @param ttl How long an entry remains valid after insertion. + * @param clock Source of the current time, injectable for testing. + * Defaults to the steady clock. + */ + MembershipCache( + std::size_t capacity, + std::chrono::milliseconds ttl, + std::function clock = [] { + return std::chrono::steady_clock::now(); + }); + + /** + * @brief Returns the cached membership for a context key, or nullopt if the + * key is absent or its entry has expired. A hit refreshes the entry's LRU + * recency but not its expiration; an expired entry is removed. + */ + [[nodiscard]] std::optional Get( + std::string const& key); + + /** + * @brief Inserts or replaces the membership for a context key, marking it + * most-recently-used and resetting its expiration. Evicts the least- + * recently-used entry if this pushes the cache over capacity. + */ + void Set(std::string const& key, integrations::Membership membership); + + /** + * @brief Removes all entries. + */ + void Clear(); + + /** + * @brief Returns the number of entries currently held, including any that + * have expired but not yet been pruned. + */ + [[nodiscard]] std::size_t Size() const; + + private: + struct Entry { + // Position of this key in recency_, for O(1) LRU updates. + std::list::iterator recency_position; + integrations::Membership membership; + std::chrono::steady_clock::time_point expires_at; + }; + + std::size_t const capacity_; + std::chrono::milliseconds const ttl_; + std::function const clock_; + + mutable std::mutex mutex_; + // Most-recently-used at the front, least at the back. Protected by mutex_. + std::list recency_; + std::unordered_map entries_; +}; + +} // namespace launchdarkly::server_side::data_components diff --git a/libs/server-sdk/tests/big_segment_store_wrapper_test.cpp b/libs/server-sdk/tests/big_segment_store_wrapper_test.cpp new file mode 100644 index 000000000..3fb8945e8 --- /dev/null +++ b/libs/server-sdk/tests/big_segment_store_wrapper_test.cpp @@ -0,0 +1,368 @@ +#include + +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +using launchdarkly::server_side::data_components::BigSegmentsStatus; +using launchdarkly::server_side::data_components::BigSegmentStoreStatus; +using launchdarkly::server_side::data_components::BigSegmentStoreWrapper; +namespace integrations = launchdarkly::server_side::integrations; +namespace built = launchdarkly::server_side::config::built; +using namespace std::chrono_literals; + +namespace { + +// In-memory store whose metadata response the test controls. Records how many +// times GetMetadata was called and lets a test block until a target count. +class FakeBigSegmentStore : public integrations::IBigSegmentStore { + public: + GetMembershipResult GetMembership(std::string const&) const override { + std::unique_lock lock(mutex_); + ++membership_calls_; + cv_.notify_all(); + // If gated, hold the query "in flight" until ReleaseMembership opens + // it. + cv_.wait(lock, [this] { return gate_open_; }); + return membership_; + } + + GetMetadataResult GetMetadata() const override { + std::lock_guard lock(mutex_); + ++metadata_calls_; + cv_.notify_all(); + return metadata_; + } + + void SetMembership(GetMembershipResult membership) { + std::lock_guard lock(mutex_); + membership_ = std::move(membership); + } + + void SetMetadata(GetMetadataResult metadata) { + std::lock_guard lock(mutex_); + metadata_ = std::move(metadata); + } + + int MembershipCalls() const { + std::lock_guard lock(mutex_); + return membership_calls_; + } + + int MetadataCalls() const { + std::lock_guard lock(mutex_); + return metadata_calls_; + } + + void WaitForMembershipCalls(int n, std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + cv_.wait_for(lock, timeout, [&] { return membership_calls_ >= n; }); + } + + void WaitForMetadataCalls(int n, std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + cv_.wait_for(lock, timeout, [&] { return metadata_calls_ >= n; }); + } + + void BlockMembership() { + std::lock_guard lock(mutex_); + gate_open_ = false; + } + + void ReleaseMembership() { + { + std::lock_guard lock(mutex_); + gate_open_ = true; + } + cv_.notify_all(); + } + + private: + mutable std::mutex mutex_; + // Notified whenever a call is recorded or the gate opens. Tests wait on it + // for call counts; a gated GetMembership waits on it for the gate to open. + mutable std::condition_variable cv_; + + // All of the following are protected by mutex_. + mutable int metadata_calls_ = 0; + mutable int membership_calls_ = 0; + GetMetadataResult metadata_ = + std::optional{std::nullopt}; + GetMembershipResult membership_ = + integrations::Membership::FromSegmentRefs({}, {}); + // When false, GetMembership blocks until ReleaseMembership(). + bool gate_open_ = true; +}; + +// Collects status broadcasts and lets a test block until a target count. +class StatusCollector { + public: + void Add(BigSegmentStoreStatus status) { + std::lock_guard lock(mutex_); + statuses_.push_back(status); + cv_.notify_all(); + } + + bool WaitForCount(std::size_t n, std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, + [&] { return statuses_.size() >= n; }); + } + + std::vector Statuses() const { + std::lock_guard lock(mutex_); + return statuses_; + } + + private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::vector statuses_; +}; + +built::BigSegmentsConfig MakeConfig( + std::shared_ptr store, + std::chrono::milliseconds poll_interval, + std::chrono::milliseconds stale_after) { + built::BigSegmentsConfig config; + config.store = std::move(store); + config.context_cache_size = 1000; + config.context_cache_time = 5s; + config.status_poll_interval = poll_interval; + config.stale_after = stale_after; + return config; +} + +// Builds a wrapper over a store with the given metadata and returns the status +// from its initial synchronous poll. The executor is never run, since GetStatus +// polls inline when no background poll has happened yet. +BigSegmentStoreStatus StatusForMetadata( + integrations::IBigSegmentStore::GetMetadataResult metadata) { + auto store = std::make_shared(); + store->SetMetadata(std::move(metadata)); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + return wrapper->GetStatus(); +} + +} // namespace + +TEST(BigSegmentStoreWrapperStatusTest, FreshMetadataIsHealthy) { + auto status = StatusForMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + EXPECT_TRUE(status.available); + EXPECT_FALSE(status.stale); +} + +TEST(BigSegmentStoreWrapperStatusTest, OldMetadataIsStale) { + auto status = StatusForMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now() - 5min}); + EXPECT_TRUE(status.available); + EXPECT_TRUE(status.stale); +} + +TEST(BigSegmentStoreWrapperStatusTest, NeverSyncedIsStale) { + auto status = StatusForMetadata( + std::optional{std::nullopt}); + EXPECT_TRUE(status.available); + EXPECT_TRUE(status.stale); +} + +TEST(BigSegmentStoreWrapperStatusTest, StoreErrorIsUnavailable) { + auto status = StatusForMetadata(tl::make_unexpected("boom")); + EXPECT_FALSE(status.available); + EXPECT_FALSE(status.stale); +} + +TEST(BigSegmentStoreWrapperStatusTest, GetStatusCachesFirstPollResult) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + auto first = wrapper->GetStatus(); + auto second = wrapper->GetStatus(); + EXPECT_EQ(first, second); + + // Only the first GetStatus queries the store; the rest reuse its result. + EXPECT_EQ(1, store->MetadataCalls()); +} + +TEST(BigSegmentStoreWrapperStatusTest, BackgroundPollNotifiesOnlyOnChange) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto work_guard = boost::asio::make_work_guard(ioc); + std::thread io_thread([&ioc] { ioc.run(); }); + + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5ms, /*stale_after=*/2min), + ioc.get_executor(), logger); + + StatusCollector collector; + auto connection = wrapper->OnStatusChange( + [&collector](BigSegmentStoreStatus status) { collector.Add(status); }); + + wrapper->Start(); + + // First poll broadcasts the initial healthy status. + ASSERT_TRUE(collector.WaitForCount(1, 1s)); + + // Several more polls occur with unchanged metadata; none should broadcast. + store->WaitForMetadataCalls(5, 1s); + + // A change flips availability and broadcasts exactly once more. + store->SetMetadata(tl::make_unexpected("boom")); + ASSERT_TRUE(collector.WaitForCount(2, 1s)); + + // Stop the io loop before the wrapper is destroyed so no poll runs during + // destruction. + work_guard.reset(); + ioc.stop(); + io_thread.join(); + + auto statuses = collector.Statuses(); + ASSERT_EQ(2u, statuses.size()); + EXPECT_TRUE(statuses[0].available); + EXPECT_FALSE(statuses[1].available); +} + +TEST(BigSegmentStoreWrapperMembershipTest, + CacheMissReturnsMembershipFromStore) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + store->SetMembership( + integrations::Membership::FromSegmentRefs({"segA.g1"}, {"segB.g2"})); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + auto result = wrapper->GetMembership("ctx"); + EXPECT_EQ(BigSegmentsStatus::kHealthy, result.status); + EXPECT_EQ(true, result.membership.CheckMembership("segA.g1")); + EXPECT_EQ(false, result.membership.CheckMembership("segB.g2")); +} + +TEST(BigSegmentStoreWrapperMembershipTest, CacheHitAvoidsSecondQuery) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + store->SetMembership( + integrations::Membership::FromSegmentRefs({"segA.g1"}, {})); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + auto first = wrapper->GetMembership("ctx"); + auto second = wrapper->GetMembership("ctx"); + EXPECT_EQ(true, second.membership.CheckMembership("segA.g1")); + EXPECT_EQ(1, store->MembershipCalls()); +} + +TEST(BigSegmentStoreWrapperMembershipTest, + StoreErrorIsNotCachedAndMarksUnavailable) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + store->SetMembership(tl::make_unexpected("boom")); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + auto result = wrapper->GetMembership("ctx"); + EXPECT_EQ(BigSegmentsStatus::kStoreError, result.status); + EXPECT_FALSE(result.membership.CheckMembership("segA.g1").has_value()); + EXPECT_FALSE(wrapper->GetStatus().available); + + // The error was not cached, so the next lookup queries the store again. + (void)wrapper->GetMembership("ctx"); + EXPECT_EQ(2, store->MembershipCalls()); +} + +TEST(BigSegmentStoreWrapperMembershipTest, StaleStoreMakesMembershipStale) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now() - 5min}); + store->SetMembership( + integrations::Membership::FromSegmentRefs({"segA.g1"}, {})); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + auto result = wrapper->GetMembership("ctx"); + EXPECT_EQ(BigSegmentsStatus::kStale, result.status); + EXPECT_EQ(true, result.membership.CheckMembership("segA.g1")); +} + +TEST(BigSegmentStoreWrapperMembershipTest, ConcurrentMissesShareOneStoreQuery) { + auto store = std::make_shared(); + store->SetMetadata( + integrations::StoreMetadata{std::chrono::system_clock::now()}); + store->SetMembership( + integrations::Membership::FromSegmentRefs({"segA.g1"}, {})); + store->BlockMembership(); + + auto logger = launchdarkly::logging::NullLogger(); + boost::asio::io_context ioc; + auto wrapper = std::make_shared( + MakeConfig(store, /*poll_interval=*/5s, /*stale_after=*/2min), + ioc.get_executor(), logger); + + std::optional r1; + std::optional r2; + std::thread t1([&] { r1 = wrapper->GetMembership("ctx"); }); + std::thread t2([&] { r2 = wrapper->GetMembership("ctx"); }); + + // One caller becomes the leader and blocks in the store; while it is + // blocked the in-flight entry persists, so the brief pause lets the other + // caller reach the coalescing check and wait on that entry rather than + // issuing its own query. (If coalescing were broken the follower would call + // the store during the pause, pushing MembershipCalls to 2.) + store->WaitForMembershipCalls(1, 1s); + std::this_thread::sleep_for(50ms); + store->ReleaseMembership(); + + t1.join(); + t2.join(); + + ASSERT_TRUE(r1.has_value()); + ASSERT_TRUE(r2.has_value()); + EXPECT_EQ(true, r1->membership.CheckMembership("segA.g1")); + EXPECT_EQ(true, r2->membership.CheckMembership("segA.g1")); + EXPECT_EQ(1, store->MembershipCalls()); +} diff --git a/libs/server-sdk/tests/membership_cache_test.cpp b/libs/server-sdk/tests/membership_cache_test.cpp new file mode 100644 index 000000000..f1d6fb765 --- /dev/null +++ b/libs/server-sdk/tests/membership_cache_test.cpp @@ -0,0 +1,122 @@ +#include + +#include + +#include +#include + +using launchdarkly::server_side::data_components::MembershipCache; +using launchdarkly::server_side::integrations::Membership; +using namespace std::chrono_literals; + +namespace { + +// A clock whose value the test advances by hand, so TTL behavior is +// deterministic rather than dependent on wall-clock timing. +class FakeClock { + public: + void Advance(std::chrono::milliseconds by) { now_ += by; } + + std::function AsFn() { + return [this] { return now_; }; + } + + private: + std::chrono::steady_clock::time_point now_{}; +}; + +Membership MemberOf(std::string const& segment_ref) { + return Membership::FromSegmentRefs({segment_ref}, {}); +} + +} // namespace + +TEST(MembershipCacheTest, MissOnAbsentKey) { + MembershipCache cache(10, 1000ms); + EXPECT_FALSE(cache.Get("nobody").has_value()); +} + +TEST(MembershipCacheTest, ReturnsStoredValue) { + MembershipCache cache(10, 1000ms); + cache.Set("a", MemberOf("seg.g1")); + + auto result = cache.Get("a"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(true, result->CheckMembership("seg.g1")); +} + +TEST(MembershipCacheTest, SetReplacesExistingValue) { + MembershipCache cache(10, 1000ms); + cache.Set("a", MemberOf("first.g1")); + cache.Set("a", MemberOf("second.g1")); + + auto result = cache.Get("a"); + ASSERT_TRUE(result.has_value()); + EXPECT_FALSE(result->CheckMembership("first.g1").has_value()); + EXPECT_EQ(true, result->CheckMembership("second.g1")); + EXPECT_EQ(1u, cache.Size()); +} + +TEST(MembershipCacheTest, EvictsLeastRecentlyUsedAtCapacity) { + MembershipCache cache(2, 1000ms); + cache.Set("a", MemberOf("a.g1")); + cache.Set("b", MemberOf("b.g1")); + cache.Set("c", MemberOf("c.g1")); // evicts "a", the LRU + + EXPECT_EQ(2u, cache.Size()); + EXPECT_FALSE(cache.Get("a").has_value()); + EXPECT_TRUE(cache.Get("b").has_value()); + EXPECT_TRUE(cache.Get("c").has_value()); +} + +TEST(MembershipCacheTest, GetRefreshesRecency) { + MembershipCache cache(2, 1000ms); + cache.Set("a", MemberOf("a.g1")); + cache.Set("b", MemberOf("b.g1")); + + // Touch "a" so "b" becomes the least-recently-used. + EXPECT_TRUE(cache.Get("a").has_value()); + + cache.Set("c", MemberOf("c.g1")); // evicts "b" now, not "a" + + EXPECT_TRUE(cache.Get("a").has_value()); + EXPECT_FALSE(cache.Get("b").has_value()); + EXPECT_TRUE(cache.Get("c").has_value()); +} + +TEST(MembershipCacheTest, ExpiresAfterTtlAndIsRemoved) { + FakeClock clock; + MembershipCache cache(10, 1000ms, clock.AsFn()); + cache.Set("a", MemberOf("a.g1")); + + clock.Advance(999ms); + EXPECT_TRUE(cache.Get("a").has_value()); + + clock.Advance(1ms); // now exactly at the TTL boundary + EXPECT_FALSE(cache.Get("a").has_value()); + EXPECT_EQ(0u, cache.Size()); // the expired entry is dropped, not retained +} + +TEST(MembershipCacheTest, SetResetsExpiration) { + FakeClock clock; + MembershipCache cache(10, 1000ms, clock.AsFn()); + cache.Set("a", MemberOf("a.g1")); + + clock.Advance(600ms); + cache.Set("a", MemberOf("a.g1")); // re-insert resets the TTL + + clock.Advance(600ms); // 1200ms since first set, 600ms since the second + EXPECT_TRUE(cache.Get("a").has_value()); +} + +TEST(MembershipCacheTest, ClearRemovesEverything) { + MembershipCache cache(10, 1000ms); + cache.Set("a", MemberOf("a.g1")); + cache.Set("b", MemberOf("b.g1")); + + cache.Clear(); + + EXPECT_EQ(0u, cache.Size()); + EXPECT_FALSE(cache.Get("a").has_value()); + EXPECT_FALSE(cache.Get("b").has_value()); +}