diff --git a/proto/wg/cosmo/cacheevents/v1/cacheevents.proto b/proto/wg/cosmo/cacheevents/v1/cacheevents.proto index b42008fa99..1d1a1b1b6f 100644 --- a/proto/wg/cosmo/cacheevents/v1/cacheevents.proto +++ b/proto/wg/cosmo/cacheevents/v1/cacheevents.proto @@ -4,7 +4,8 @@ package wg.cosmo.cacheevents.v1; // CacheEventsService receives raw entity-cache decision events from the router. // Events are ingested per-fetch (one event per cache decision) and persisted -// into ClickHouse for analytics. +// into ClickHouse for analytics. See graphqlmetrics/cacheevents/ for the +// server-side processor and ClickHouse schema. service CacheEventsService { rpc PublishEntityCacheEvents(PublishEntityCacheEventsRequest) returns (PublishEntityCacheEventsResponse) {} diff --git a/router-tests/operations/feature_flag_rollouts_test.go b/router-tests/operations/feature_flag_rollouts_test.go new file mode 100644 index 0000000000..305997cc85 --- /dev/null +++ b/router-tests/operations/feature_flag_rollouts_test.go @@ -0,0 +1,227 @@ +package integration + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" + "github.com/wundergraph/cosmo/router/pkg/config" +) + +// setMyFFTrafficPercentage rewires the standard testenv `myff` feature flag +// to carry the given traffic_percentage. The myff flag swaps in a feature +// subgraph that adds a `productCount` field to Employee — when the rollout +// selector picks myff, the response includes productCount; when it falls +// through to base, the query errors with "Cannot query field productCount". +// That gives every assertion a clean rollout-vs-base discriminator without +// needing to count which goroutine got which bucket. 
+func setMyFFTrafficPercentage(routerConfig *nodev1.RouterConfig, pct uint32) { + if routerConfig.FeatureFlagConfigs == nil { + return + } + if myff, ok := routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName["myff"]; ok { + myff.TrafficPercentage = &pct + } +} + +// rolloutsEnabled returns the RouterOptions slice that turns the rollout +// selector on. Tests explicitly opt-in because testenv defaults the +// FeatureFlagRollouts.Enabled to its zero value (false). +func rolloutsEnabled() []core.Option { + return []core.Option{ + core.WithFeatureFlagRollouts(config.FeatureFlagRollouts{Enabled: true}), + } +} + +const ( + productCountQuery = `{ employees { id productCount } }` + + // Error response when the base graph (which doesn't define productCount) + // serves the request because the rollout selector either picked nothing + // or the flag was 0%. + // + // res.Body is a raw JSON string, so the inner double-quotes around the + // field name are JSON-escaped. Match the on-the-wire form, not the + // human-readable form. 
+ productCountFieldError = `Cannot query field \"productCount\"` +) + +func TestFeatureFlagRollouts(t *testing.T) { + t.Parallel() + + t.Run("traffic_percentage 100 routes every request to the rollout flag", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 100) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + for range 50 { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: productCountQuery}) + require.Equal(t, "myff", res.Response.Header.Get("X-Feature-Flag"), + "100%% rollout must always serve the flag's variant") + require.NotContains(t, res.Body, productCountFieldError) + } + }) + }) + + t.Run("traffic_percentage 0 never routes to the flag", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 0) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + for range 50 { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: productCountQuery}) + require.Empty(t, res.Response.Header.Get("X-Feature-Flag"), + "0%% rollout flag must never be picked") + require.Contains(t, res.Body, productCountFieldError) + } + }) + }) + + t.Run("header pin targeting a rollout flag is ignored", func(t *testing.T) { + t.Parallel() + + // myff at 0% means the rollout selector never picks it, AND the header + // pin must be bypassed because rollout flags aren't client-steerable. + // Net: every request falls through to base. 
+ testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 0) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: productCountQuery, + Header: map[string][]string{"X-Feature-Flag": {"myff"}}, + }) + require.Empty(t, res.Response.Header.Get("X-Feature-Flag"), + "header pin on a rollout flag must be ignored — base serves the request") + require.Contains(t, res.Body, productCountFieldError) + }) + }) + + t.Run("cookie pin targeting a rollout flag is ignored", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 0) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: productCountQuery, + Header: map[string][]string{"Cookie": {"feature_flag=myff"}}, + }) + require.Empty(t, res.Response.Header.Get("X-Feature-Flag"), + "cookie pin on a rollout flag must be ignored — base serves the request") + require.Contains(t, res.Body, productCountFieldError) + }) + }) + + t.Run("flag without traffic_percentage stays preview-only — header still works", func(t *testing.T) { + t.Parallel() + + // Selector is enabled, but myff has no traffic_percentage set, so it's + // a preview flag and header/cookie pinning still works exactly as before. 
+ testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: productCountQuery, + Header: map[string][]string{"X-Feature-Flag": {"myff"}}, + }) + require.Equal(t, "myff", res.Response.Header.Get("X-Feature-Flag")) + require.NotContains(t, res.Body, productCountFieldError) + }) + }) + + t.Run("rollouts disabled — header pin still works even with traffic_percentage set", func(t *testing.T) { + t.Parallel() + + // FeatureFlagRollouts.Enabled = false (default). Even with a non-zero + // traffic_percentage on myff, the selector is dormant: the flag stays + // header/cookie-pinnable and percentage is simply ignored. + testenv.Run(t, &testenv.Config{ + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 50) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: productCountQuery, + Header: map[string][]string{"X-Feature-Flag": {"myff"}}, + }) + require.Equal(t, "myff", res.Response.Header.Get("X-Feature-Flag"), + "selector off + header pin → flag still served the legacy way") + require.NotContains(t, res.Body, productCountFieldError) + }) + }) + + t.Run("traffic_percentage above 100 fails closed — selector disables itself", func(t *testing.T) { + t.Parallel() + + // 200% is rejected by newRolloutSelector with a logged error and + // returns nil — the selector is disabled, every unpinned request + // falls through to base. 
+ testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 200) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + for range 50 { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: productCountQuery}) + require.Empty(t, res.Response.Header.Get("X-Feature-Flag")) + require.Contains(t, res.Body, productCountFieldError) + } + }) + }) + + t.Run("traffic_percentage 50 distributes ~50/50 across rollout and base", func(t *testing.T) { + t.Parallel() + + // Statistical assertion. Random per-request bucketing → over many + // samples the empirical share should land near the target with a + // generous tolerance to avoid flake. + testenv.Run(t, &testenv.Config{ + RouterOptions: rolloutsEnabled(), + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + setMyFFTrafficPercentage(routerConfig, 50) + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + const samples = 1000 + rolloutHits := 0 + baseHits := 0 + for range samples { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: productCountQuery}) + switch { + case res.Response.Header.Get("X-Feature-Flag") == "myff": + rolloutHits++ + case strings.Contains(res.Body, productCountFieldError): + baseHits++ + default: + t.Fatalf("unexpected response: header=%q body=%q", + res.Response.Header.Get("X-Feature-Flag"), res.Body) + } + } + require.Equal(t, samples, rolloutHits+baseHits, "every request must hit one bucket") + + // Tolerance: ±10 percentage points across 1000 samples is comfortable + // (a true 50% Bernoulli with n=1000 has σ ≈ 1.6pp, so 10pp ≈ 6σ). 
+ gotRolloutPct := float64(rolloutHits) / float64(samples) + require.InDeltaf(t, 0.50, gotRolloutPct, 0.10, + "expected ~50%% rollout, got %.3f (rollout=%d base=%d)", + gotRolloutPct, rolloutHits, baseHits) + }) + }) +} diff --git a/router/core/feature_flag_rollout.go b/router/core/feature_flag_rollout.go new file mode 100644 index 0000000000..cde79a416a --- /dev/null +++ b/router/core/feature_flag_rollout.go @@ -0,0 +1,195 @@ +package core + +import ( + mathrand "math/rand/v2" + "net/http" + "sort" + + "github.com/wundergraph/cosmo/router/pkg/config" + "go.uber.org/zap" + + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" +) + +const rolloutBucketScale = 10000 // basis points: 100% == 10000 + +// rolloutRule is a resolved per-flag percentage range in basis points. +// A request whose bucket falls in [lo, hi) is routed to flagName. +type rolloutRule struct { + flagName string + lo, hi uint32 +} + +// rolloutSelector picks a feature flag for an unpinned request based on the +// per-flag traffic_percentage shipped in the execution config. Bucketing is +// uniformly random per request — there is no per-user stickiness. +type rolloutSelector struct { + rules []rolloutRule + rolloutFlags map[string]struct{} +} + +// newRolloutSelector resolves the proto traffic_percentage for every flag in +// ffConfigs into a single ordered list of buckets. A flag whose own percentage +// exceeds 100, or that would push the cumulative total over 100 (alphabetical +// order), is dropped with an error log while its siblings are kept. Returns nil +// when the feature is disabled or no flag is accepted; the error is always nil. +func newRolloutSelector( + cfg *config.FeatureFlagRollouts, + ffConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig, + logger *zap.Logger, +) (*rolloutSelector, error) { + if cfg == nil || !cfg.Enabled { + return nil, nil + } + + // Pure proto-driven: every flag whose execution config carries a + // traffic_percentage participates. Sort by name for stable ordering + // across reloads. 
+ var names []string + for name, ec := range ffConfigs { + if ec.TrafficPercentage == nil { + continue + } + names = append(names, name) + } + sort.Strings(names) + + // First pass: drop any flag whose own pct is over 100. This is always an + // operator typo — a single rollout slice can never exceed the whole graph. + accepted := names[:0] + for _, name := range names { + pct := ffConfigs[name].GetTrafficPercentage() + if pct > 100 { + logger.Error("feature_flag_rollouts: flag percentage exceeds 100; flag dropped, traffic falls through to base for it", + zap.String("flag", name), zap.Uint32("percentage", pct)) + continue + } + accepted = append(accepted, name) + } + + // Second pass: greedy fit under the 100% budget, alphabetical order. + // If the next flag would push cumulative over 100 it is dropped (logged) + // and we keep filling with the remaining flags. Earlier choice: returning + // nil here disabled every rollout flag on the graph and silently restored + // header/cookie pinnability — a single typo blew up the entire rollout. + // Drop-the-offender preserves siblings; the operator sees an explicit error in the log and the dropped flag's unpinned traffic falls through to base. + // NOTE(review): a dropped flag never enters rolloutFlags, so isRolloutFlag is false for it and header/cookie pins targeting it are not ignored — confirm intended. 
+ var sum uint64 + var cursor uint32 + rules := make([]rolloutRule, 0, len(accepted)) + rolloutFlags := make(map[string]struct{}, len(accepted)) + for _, name := range accepted { + pct := ffConfigs[name].GetTrafficPercentage() + if sum+uint64(pct) > 100 { + logger.Error("feature_flag_rollouts: flag would push cumulative percentage over 100; flag dropped, traffic falls through to base for it", + zap.String("flag", name), + zap.Uint32("percentage", pct), + zap.Uint64("cumulative_so_far", sum)) + continue + } + sum += uint64(pct) + rolloutFlags[name] = struct{}{} + if pct == 0 { + logger.Warn("feature_flag_rollouts: flag has percentage 0 (degenerate); flag will not be reachable", + zap.String("flag", name)) + continue + } + span := pct * (rolloutBucketScale / 100) + rules = append(rules, rolloutRule{flagName: name, lo: cursor, hi: cursor + span}) + cursor += span + } + + // No flag was accepted (none carried a percentage, or every one was dropped above); 0% flags stay in rolloutFlags and so do not reach this branch. + if len(rules) == 0 && len(rolloutFlags) == 0 { + logger.Warn("feature_flag_rollouts: no usable flags; selector disabled") + return nil, nil + } + + logRolloutFlagSummary(logger, ffConfigs, rolloutFlags, rules) + + return &rolloutSelector{ + rules: rules, + rolloutFlags: rolloutFlags, + }, nil +} + +// isRolloutFlag reports whether name is a rollout flag. The request handler +// uses this to decide whether a header/cookie pin should be ignored — rollout +// flags are never client-steerable. +func (s *rolloutSelector) isRolloutFlag(name string) bool { + if s == nil { + return false + } + _, ok := s.rolloutFlags[name] + return ok +} + +// pick chooses a rollout flag for an unpinned request. Each request is +// bucketed independently with math/rand/v2 so distribution holds in aggregate; +// individual clients may flicker between variants across requests. 
+func (s *rolloutSelector) pick(_ http.ResponseWriter, _ *http.Request) (flag, source string, ok bool) { + if s == nil || len(s.rules) == 0 { + return "", "", false + } + bucket := randomBucket() + for _, rule := range s.rules { + if bucket >= rule.lo && bucket < rule.hi { + return rule.flagName, "random", true + } + } + // Bucket landed outside any rule's range — request falls through to base. + return "", "random", false +} + +// randomBucket returns a uniform basis-point bucket in [0, rolloutBucketScale). +// +// math/rand/v2.Uint32N is lock-free per goroutine, ~5 ns, no syscall, and +// rejection-samples to remove the modulo bias `crypto/rand % N` would have. +// Traffic bucketing has no security requirement; predictability of the +// per-request bucket would not let a client influence the gate (the bucket +// itself is not visible to the client and is not derived from request +// content under the current "no stickiness" design). +func randomBucket() uint32 { + return mathrand.Uint32N(rolloutBucketScale) +} + +// logRolloutFlagSummary emits one line per feature flag at config load so +// operators can confirm reachability at a glance. 
+func logRolloutFlagSummary( + logger *zap.Logger, + ffConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig, + rolloutFlags map[string]struct{}, + rules []rolloutRule, +) { + if logger == nil || len(ffConfigs) == 0 { + return + } + pctByFlag := make(map[string]uint32, len(rules)) + for _, r := range rules { + pctByFlag[r.flagName] = (r.hi - r.lo) / (rolloutBucketScale / 100) + } + names := make([]string, 0, len(ffConfigs)) + for n := range ffConfigs { + names = append(names, n) + } + sort.Strings(names) + for _, name := range names { + if _, isRollout := rolloutFlags[name]; isRollout { + pct := pctByFlag[name] + if pct == 0 { + logger.Warn("feature flag has percentage=0 — unreachable", + zap.String("flag", name), zap.String("mode", "rollout")) + continue + } + logger.Info("feature flag registered", + zap.String("flag", name), + zap.String("mode", "rollout"), + zap.Uint32("percentage", pct)) + } else { + logger.Info("feature flag registered", + zap.String("flag", name), + zap.String("mode", "preview"), + zap.Strings("reachable_via", []string{"header", "cookie"})) + } + } +} diff --git a/router/core/feature_flag_rollout_test.go b/router/core/feature_flag_rollout_test.go new file mode 100644 index 0000000000..ec8cf09e35 --- /dev/null +++ b/router/core/feature_flag_rollout_test.go @@ -0,0 +1,183 @@ +package core + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router/pkg/config" + "go.uber.org/zap" + + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" +) + +func u32p(v uint32) *uint32 { return &v } + +func newSelector(t *testing.T, ffConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig) *rolloutSelector { + t.Helper() + sel, err := newRolloutSelector( + &config.FeatureFlagRollouts{Enabled: true}, + ffConfigs, + zap.NewNop(), + ) + require.NoError(t, err) + return sel +} + +func TestNewRolloutSelector_DisabledReturnsNil(t *testing.T) { + t.Parallel() + sel, err := 
newRolloutSelector( + &config.FeatureFlagRollouts{Enabled: false}, + map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "foo": {TrafficPercentage: u32p(10)}, + }, + zap.NewNop(), + ) + require.NoError(t, err) + require.Nil(t, sel) +} + +func TestNewRolloutSelector_NilCfgReturnsNil(t *testing.T) { + t.Parallel() + sel, err := newRolloutSelector(nil, nil, zap.NewNop()) + require.NoError(t, err) + require.Nil(t, sel) +} + +func TestNewRolloutSelector_NoFlagsReturnsNil(t *testing.T) { + t.Parallel() + sel, err := newRolloutSelector( + &config.FeatureFlagRollouts{Enabled: true}, + map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "preview_only": {}, // no traffic_percentage + }, + zap.NewNop(), + ) + require.NoError(t, err) + require.Nil(t, sel) +} + +func TestNewRolloutSelector_DropsOverflowingFlagButKeepsSiblings(t *testing.T) { + t.Parallel() + // "a"=60, "b"=60 — alphabetical order means "a" lands inside the budget + // (60), and "b" would push to 120 → "b" is dropped, "a" is preserved. + sel, err := newRolloutSelector( + &config.FeatureFlagRollouts{Enabled: true}, + map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "a": {TrafficPercentage: u32p(60)}, + "b": {TrafficPercentage: u32p(60)}, + }, + zap.NewNop(), + ) + require.NoError(t, err) + require.NotNil(t, sel) + require.True(t, sel.isRolloutFlag("a"), "a fits under budget and stays a rollout flag") + require.False(t, sel.isRolloutFlag("b"), "b would overflow budget and is dropped (falls through to base, no header/cookie pin)") +} + +func TestNewRolloutSelector_DropsAbove100PercentFlag(t *testing.T) { + t.Parallel() + // Single flag with pct > 100 is always an operator typo — the flag is + // dropped (logged); selector returns nil because nothing is left. 
+ sel, err := newRolloutSelector( + &config.FeatureFlagRollouts{Enabled: true}, + map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "a": {TrafficPercentage: u32p(101)}, + }, + zap.NewNop(), + ) + require.NoError(t, err) + require.Nil(t, sel) +} + +func TestNewRolloutSelector_ZeroPercentFlagIsRolloutButUnreachable(t *testing.T) { + t.Parallel() + sel := newSelector(t, map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "shadow": {TrafficPercentage: u32p(0)}, + "live": {TrafficPercentage: u32p(50)}, + }) + require.NotNil(t, sel) + // Both flags are rollout flags (header/cookie pins must be ignored) + require.True(t, sel.isRolloutFlag("shadow"), "0%% flags must still be rollout flags") + require.True(t, sel.isRolloutFlag("live")) + require.False(t, sel.isRolloutFlag("preview_only")) + + // Pick must never return the 0%% flag + for range 200 { + flag, _, _ := sel.pick(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil)) + require.NotEqual(t, "shadow", flag, "0%% flag must never be picked") + } +} + +func TestPick_HonorsCumulativeRanges(t *testing.T) { + t.Parallel() + // Two flags at 30% and 20% = 50% rollout, 50% base. With enough samples + // the empirical distribution should be close. + sel := newSelector(t, map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "a": {TrafficPercentage: u32p(30)}, + "b": {TrafficPercentage: u32p(20)}, + }) + require.NotNil(t, sel) + require.True(t, sel.isRolloutFlag("a")) + require.True(t, sel.isRolloutFlag("b")) + + const samples = 20000 + counts := map[string]int{"a": 0, "b": 0, "": 0} + for range samples { + flag, _, _ := sel.pick(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil)) + counts[flag]++ + } + + // Allow ±3pp tolerance on each band. 
+ const tolPP = 0.03 + checkBand := func(name string, want float64) { + t.Helper() + got := float64(counts[name]) / float64(samples) + require.InDeltaf(t, want, got, tolPP, "flag=%q got=%.3f want=%.3f", name, got, want) + } + checkBand("a", 0.30) + checkBand("b", 0.20) + checkBand("", 0.50) // base/fall-through +} + +func TestPick_IsRandomPerRequest(t *testing.T) { + t.Parallel() + sel := newSelector(t, map[string]*nodev1.FeatureFlagRouterExecutionConfig{ + "a": {TrafficPercentage: u32p(50)}, + }) + require.NotNil(t, sel) + + // Same identical request shape, called many times — both buckets must + // occur (random per request, no stickiness). + flagsSeen := map[string]struct{}{} + for range 200 { + flag, source, _ := sel.pick(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil)) + require.Equal(t, "random", source) + flagsSeen[flag] = struct{}{} + } + require.Contains(t, flagsSeen, "a", "expected to land on rollout flag at least once") + require.Contains(t, flagsSeen, "", "expected to fall through to base at least once") +} + +func TestNilSelector_IsRolloutFlagReturnsFalse(t *testing.T) { + t.Parallel() + var sel *rolloutSelector + require.False(t, sel.isRolloutFlag("anything")) +} + +func TestNilSelector_PickReturnsFalse(t *testing.T) { + t.Parallel() + var sel *rolloutSelector + flag, source, ok := sel.pick(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil)) + require.Equal(t, "", flag) + require.Equal(t, "", source) + require.False(t, ok) +} + +func TestRandomBucket_InRange(t *testing.T) { + t.Parallel() + for range 1000 { + b := randomBucket() + require.Less(t, b, uint32(rolloutBucketScale)) + } +} diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 568646a8e4..7315be716e 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -141,6 +141,7 @@ type buildMultiGraphHandlerOptions struct { baseMux *chi.Mux featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig reloadPersistentState 
*ReloadPersistentState + rolloutSelector *rolloutSelector } // newGraphServer creates a new server instance. @@ -349,10 +350,20 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap))) } + rolloutSel, err := newRolloutSelector( + &r.featureFlagRollouts, + featureFlagConfigMap, + s.logger, + ) + if err != nil { + s.logger.Error("feature_flag_rollouts: invalid config; rollouts disabled", zap.Error(err)) + } + multiGraphHandler, err := s.buildMultiGraphHandler(ctx, buildMultiGraphHandlerOptions{ baseMux: gm.mux, featureFlagConfigs: featureFlagConfigMap, reloadPersistentState: r.reloadPersistentState, + rolloutSelector: rolloutSel, }) if err != nil { return nil, fmt.Errorf("failed to build feature flag handler: %w", err) @@ -525,9 +536,13 @@ func (s *graphServer) buildMultiGraphHandler( } return func(w http.ResponseWriter, r *http.Request) { - // Extract the feature flag and run the corresponding mux - // 1. From the request header - // 2. From the cookie + // Flag selection precedence: + // 1. X-Feature-Flag header (preview flags only) + // 2. feature_flag cookie (preview flags only) + // 3. Rollout selector (rollout flags, by traffic %) + // + // Header/cookie pins targeting a *rollout* flag are ignored — + // rollout flags must not be steerable by clients. ff := strings.TrimSpace(r.Header.Get(featureFlagHeader)) if ff == "" { @@ -537,7 +552,25 @@ func (s *graphServer) buildMultiGraphHandler( } } + if ff != "" && opts.rolloutSelector != nil && opts.rolloutSelector.isRolloutFlag(ff) { + ff = "" + } + + if ff == "" && opts.rolloutSelector != nil { + if picked, _, ok := opts.rolloutSelector.pick(w, r); ok { + ff = picked + } + } + if mux, ok := featureFlagToMux[ff]; ok { + // Always echo the active flag. 
A previous attempt suppressed it + // for random rollout picks on the theory that the header lets a + // client grind onto the variant — but the variant must have an + // observably different response shape (otherwise the rollout has + // no purpose), so a grinder already has a side-channel. Stripping + // just hides debug signal from honest clients, breaks downstream + // caches keyed on the flag, and complicates integration tests + // that need to know which variant served the request. w.Header().Set(featureFlagHeader, ff) mux.ServeHTTP(w, r) return diff --git a/router/core/router.go b/router/core/router.go index 0126107603..3af4ad7057 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -2614,6 +2614,14 @@ func WithCacheWarmupConfig(cfg *config.CacheWarmupConfiguration) Option { } } +// WithFeatureFlagRollouts configures percentage-based traffic rollouts for +// feature flags shipped in the execution config. See FeatureFlagRollouts. +func WithFeatureFlagRollouts(cfg config.FeatureFlagRollouts) Option { + return func(r *Router) { + r.featureFlagRollouts = cfg + } +} + // WithPlanningDurationOverride sets a function that overrides the measured planning duration. // Used in tests to simulate slow queries that exceed the expensive query threshold. 
func WithPlanningDurationOverride(fn func(content string) time.Duration) Option { diff --git a/router/core/router_config.go b/router/core/router_config.go index 974ea2800b..9bf423ef7e 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -146,6 +146,7 @@ type Config struct { subgraphErrorPropagation config.SubgraphErrorPropagationConfiguration clientHeader config.ClientHeader cacheWarmup *config.CacheWarmupConfiguration + featureFlagRollouts config.FeatureFlagRollouts planningDurationOverride func(content string) time.Duration subscriptionHeartbeatInterval time.Duration hostName string diff --git a/router/core/supervisor_instance.go b/router/core/supervisor_instance.go index c6cd88a0da..6527cda7c7 100644 --- a/router/core/supervisor_instance.go +++ b/router/core/supervisor_instance.go @@ -270,6 +270,7 @@ func optionsFromResources(logger *zap.Logger, config *config.Config, reloadPersi WithRateLimitConfig(&config.RateLimit), WithClientHeader(config.ClientHeader), WithCacheWarmupConfig(&config.CacheWarmup), + WithFeatureFlagRollouts(config.FeatureFlagRollouts), WithMCP(config.MCP), WithConnectRPC(config.ConnectRPC), WithPlugins(config.Plugins), diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 4b854d67d7..26057a3148 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -1299,6 +1299,12 @@ type IntrospectionConfiguration struct { Secret string `yaml:"secret" env:"INTROSPECTION_SECRET"` } +// FeatureFlagRollouts toggles percentage-based traffic rollouts for feature +// flags shipped in the execution config. 
+type FeatureFlagRollouts struct { + Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"` +} + type Config struct { Version string `yaml:"version,omitempty" ignored:"true"` @@ -1377,6 +1383,8 @@ type Config struct { Plugins PluginsConfiguration `yaml:"plugins" envPrefix:"PLUGINS_"` WatchConfig WatchConfig `yaml:"watch_config" envPrefix:"WATCH_CONFIG_"` + + FeatureFlagRollouts FeatureFlagRollouts `yaml:"feature_flag_rollouts,omitempty" envPrefix:"FEATURE_FLAG_ROLLOUTS_"` } type WatchConfig struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 023c263d2c..bd554c7a64 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -4193,6 +4193,18 @@ } } } + }, + "feature_flag_rollouts": { + "type": "object", + "description": "Toggles percentage-based traffic rollouts for feature flags. When enabled, every flag whose execution config has traffic_percentage set becomes a rollout flag (header/cookie pins ignored). Per-request bucket is uniformly random. Off by default — opt-in to avoid changing routing semantics on upgrade.", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean", + "default": false, + "description": "Enable percentage-based feature flag rollouts." 
+ } + } } }, "$defs": { diff --git a/router/pkg/config/fixtures/full.yaml b/router/pkg/config/fixtures/full.yaml index 02280f4255..b951bc6279 100644 --- a/router/pkg/config/fixtures/full.yaml +++ b/router/pkg/config/fixtures/full.yaml @@ -548,3 +548,6 @@ apollo_compatibility_flags: enabled: true use_graphql_validation_failed_status: enabled: true + +feature_flag_rollouts: + enabled: true diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index e6a5c6145f..a9bcc3c80f 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -669,5 +669,8 @@ "Enabled": false, "Maximum": 10000000000 } + }, + "FeatureFlagRollouts": { + "Enabled": false } } \ No newline at end of file diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 2e93674fa7..4c45c470f9 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -1114,5 +1114,8 @@ "Enabled": true, "Maximum": 10000000000 } + }, + "FeatureFlagRollouts": { + "Enabled": true } } \ No newline at end of file