Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions router-tests/events/nats_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ var (

type ConfigPollerMock struct {
initConfig *nodev1.RouterConfig
updateConfig func(newConfig *nodev1.RouterConfig, oldVersion string) error
updateConfig func(newConfig *routerconfig.Response) error
ready chan struct{}
}

func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(newConfig *nodev1.RouterConfig, oldVersion string) error) {
func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(newConfig *routerconfig.Response) error) {
c.updateConfig = handler
close(c.ready)
}
Expand Down Expand Up @@ -1595,7 +1595,7 @@ func TestNatsEvents(t *testing.T) {
xEnv.WaitForSubscriptionCount(3, EventWaitTimeout)

// Swap config
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

// Wait for all providers to shut down and restart
require.Eventually(t, func() bool {
Expand Down
10 changes: 5 additions & 5 deletions router-tests/operations/cache_warmup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ func TestInMemoryPlanCacheFallback(t *testing.T) {
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
Expand Down Expand Up @@ -1061,7 +1061,7 @@ func TestInMemoryPlanCacheFallback(t *testing.T) {
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
Expand Down Expand Up @@ -1110,7 +1110,7 @@ func TestInMemoryPlanCacheFallback(t *testing.T) {
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id customDetails: details { forename } } }`,
Expand Down Expand Up @@ -1412,11 +1412,11 @@ func writeTestConfig(t *testing.T, version string, path string) {

type ConfigPollerMock struct {
initConfig *nodev1.RouterConfig
updateConfig func(newConfig *nodev1.RouterConfig, oldVersion string) error
updateConfig func(newConfig *routerconfig.Response) error
ready chan struct{}
}

func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(newConfig *nodev1.RouterConfig, oldVersion string) error) {
func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(_ *routerconfig.Response) error) {
c.updateConfig = handler
close(c.ready)
}
Expand Down
9 changes: 5 additions & 4 deletions router-tests/operations/plan_fallback_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
"github.com/wundergraph/cosmo/router/pkg/routerconfig"
)

func TestPlanFallbackCache(t *testing.T) {
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestPlanFallbackCache(t *testing.T) {
// Trigger config reload — new Ristretto cache is created (size 1).
<-pm.ready
pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

// After reload, slow queries should still be available via fallback cache.
waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) {
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestPlanFallbackCache(t *testing.T) {
// Trigger config reload — main plan cache is reset.
<-pm.ready
pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

// Wait for reload to complete by checking a slow query (which will be
// served from the fallback cache, confirming the new server is active).
Expand Down Expand Up @@ -293,15 +294,15 @@ func TestPlanFallbackCache(t *testing.T) {

// First reload
pm.initConfig.Version = "v2"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) {
assert.Equal(ct, "v2", res.Response.Header.Get("X-Router-Config-Version"))
})

// Second reload
pm.initConfig.Version = "v3"
require.NoError(t, pm.updateConfig(pm.initConfig, "v2"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) {
assert.Equal(ct, "v3", res.Response.Header.Get("X-Router-Config-Version"))
Expand Down
21 changes: 11 additions & 10 deletions router-tests/protocol/config_hot_reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (

"context"
"encoding/json"
"os"
"sync/atomic"
"testing"
"time"

"github.com/wundergraph/cosmo/router/pkg/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"os"
"sync/atomic"
"testing"
"time"

"github.com/wundergraph/cosmo/router/pkg/routerconfig"

Expand All @@ -33,11 +34,11 @@ var (

type ConfigPollerMock struct {
initConfig *nodev1.RouterConfig
updateConfig func(newConfig *nodev1.RouterConfig, oldVersion string) error
updateConfig func(response *routerconfig.Response) error
ready chan struct{}
}

func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(newConfig *nodev1.RouterConfig, oldVersion string) error) {
func (c *ConfigPollerMock) Subscribe(_ context.Context, handler func(response *routerconfig.Response) error) {
c.updateConfig = handler
close(c.ready)
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestConfigHotReloadPoller(t *testing.T) {
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestConfigHotReloadPoller(t *testing.T) {

// Swap config
pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))

res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestConfigHotReloadPoller(t *testing.T) {

// Swap config — the ReadJSON below expects a possible websocket close error,
// so use a deadline instead of WSReadJSON (which retries on errors)
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
err = conn.ReadJSON(&msg)
conn.SetReadDeadline(time.Time{})
Expand Down Expand Up @@ -657,7 +658,7 @@ func BenchmarkConfigHotReload(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))
require.NoError(t, pm.updateConfig(&routerconfig.Response{Config: pm.initConfig}))
}

})
Expand Down
108 changes: 86 additions & 22 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang-jwt/jwt/v5"
"github.com/klauspost/compress/gzhttp"
"github.com/klauspost/compress/gzip"
"github.com/wundergraph/cosmo/router/pkg/routerconfig"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
oteltrace "go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -91,9 +92,11 @@ type (
baseRouterConfigVersion string
mux *chi.Mux
// inFlightRequests is used to track the number of requests currently being processed
// does not include websocket (hijacked) connections
inFlightRequests *atomic.Uint64
graphMuxList []*graphMux
// does not include websocket (hijacked) connections.
inFlightRequests *atomic.Uint64
// graphMuxList contains all graph muxes of this graph server.
// It's keyed by mux name (feature flag name or empty string for base graph).
graphMuxList map[string]*graphMux
graphMuxListLock sync.Mutex
runtimeMetrics *rmetric.RuntimeMetrics
otlpEngineMetrics *rmetric.EngineMetrics
Expand Down Expand Up @@ -127,18 +130,20 @@ type buildMultiGraphHandlerOptions struct {
baseMux *chi.Mux
featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig
reloadPersistentState *ReloadPersistentState
currentGraphMuxes map[string]*graphMux
changes *routerconfig.Changes
}

// newGraphServer creates a new server instance.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {
func newGraphServer(ctx context.Context, r *Router, response *routerconfig.Response, proxy ProxyFunc) (*graphServer, error) {
/* Older versions of composition will not populate a compatibility version.
* Currently, all "old" router execution configurations are compatible as there have been no breaking
* changes.
* Upon the first breaking change to the execution config, an unpopulated compatibility version will
* also be unsupported (and the logic for IsRouterCompatibleWithExecutionConfig will need to be updated).
*/
if !execution_config.IsRouterCompatibleWithExecutionConfig(r.logger, routerConfig.CompatibilityVersion) {
return nil, fmt.Errorf(`the compatibility version "%s" is not compatible with this router version`, routerConfig.CompatibilityVersion)
if !execution_config.IsRouterCompatibleWithExecutionConfig(r.logger, response.Config.CompatibilityVersion) {
return nil, fmt.Errorf(`the compatibility version "%s" is not compatible with this router version`, response.Config.CompatibilityVersion)
}

isConnStoreEnabled := r.metricConfig.OpenTelemetry.ConnectionStats || r.metricConfig.Prometheus.ConnectionStats
Expand Down Expand Up @@ -186,9 +191,9 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
subgraphTransports: subgraphTransports,
playgroundHandler: r.playgroundHandler,
traceDialer: traceDialer,
baseRouterConfigVersion: routerConfig.GetVersion(),
baseRouterConfigVersion: response.Config.GetVersion(),
inFlightRequests: &atomic.Uint64{},
graphMuxList: make([]*graphMux, 0, 1),
graphMuxList: make(map[string]*graphMux, 1),
instanceData: InstanceData{
HostName: r.hostName,
ListenAddress: r.listenAddr,
Expand Down Expand Up @@ -294,23 +299,38 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
s.circuitBreakerManager = manager
}

routingUrlGroupings, err := getRoutingUrlGroupingForCircuitBreakers(routerConfig, s.overrideRoutingURLConfiguration, s.overrides)
routingUrlGroupings, err := getRoutingUrlGroupingForCircuitBreakers(response.Config, s.overrideRoutingURLConfiguration, s.overrides)
if err != nil {
return nil, err
}

gm, err := s.buildGraphMux(ctx, BuildGraphMuxOptions{
RouterConfigVersion: s.baseRouterConfigVersion,
EngineConfig: routerConfig.GetEngineConfig(),
ConfigSubgraphs: routerConfig.GetSubgraphs(),
RoutingUrlGroupings: routingUrlGroupings,
ReloadPersistentState: r.reloadPersistentState,
})
if err != nil {
return nil, fmt.Errorf("failed to build base mux: %w", err)
currentMuxes := currentGraphMuxes(r)
var gm *graphMux

mux, oldBaseGraphMuxExists := currentMuxes[""]
needNewBaseGraphMux := response.Changes == nil || response.Changes.BaseGraphChanged() || !oldBaseGraphMuxExists

if needNewBaseGraphMux {
// build new base graph mux
gm, err = s.buildGraphMux(ctx, BuildGraphMuxOptions{
RouterConfigVersion: s.baseRouterConfigVersion,
EngineConfig: response.Config.GetEngineConfig(),
ConfigSubgraphs: response.Config.GetSubgraphs(),
RoutingUrlGroupings: routingUrlGroupings,
ReloadPersistentState: r.reloadPersistentState,
})
if err != nil {
return nil, fmt.Errorf("failed to build base mux: %w", err)
}
} else {
gm = mux
gm.reused.Store(true)
s.graphMuxListLock.Lock()
s.graphMuxList[""] = gm
s.graphMuxListLock.Unlock()
}

featureFlagConfigMap := routerConfig.FeatureFlagConfigs.GetConfigByFeatureFlagName()
featureFlagConfigMap := response.Config.FeatureFlagConfigs.GetConfigByFeatureFlagName()
if len(featureFlagConfigMap) > 0 {
s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap)))
}
Expand All @@ -319,6 +339,8 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
baseMux: gm.mux,
featureFlagConfigs: featureFlagConfigMap,
reloadPersistentState: r.reloadPersistentState,
currentGraphMuxes: currentMuxes,
changes: response.Changes,
})
if err != nil {
return nil, fmt.Errorf("failed to build feature flag handler: %w", err)
Expand Down Expand Up @@ -477,6 +499,24 @@ func (s *graphServer) buildMultiGraphHandler(

// Build all the muxes for the feature flags in serial to avoid any race conditions
for featureFlagName, executionConfig := range opts.featureFlagConfigs {
if opts.changes != nil {
// if the ff is unchanged and still needed, we reuse it
_, hasChanged := opts.changes.ChangedConfigs[featureFlagName]
_, wasAdded := opts.changes.AddedConfigs[featureFlagName]

if !hasChanged && !wasAdded {
oldGraphMux, exists := opts.currentGraphMuxes[featureFlagName]
if exists {
featureFlagToMux[featureFlagName] = oldGraphMux.mux
s.graphMuxListLock.Lock()
s.graphMuxList[featureFlagName] = oldGraphMux
s.graphMuxListLock.Unlock()
oldGraphMux.reused.Store(true)
continue
}
}
}

gm, err := s.buildGraphMux(ctx, BuildGraphMuxOptions{
FeatureFlagName: featureFlagName,
RouterConfigVersion: executionConfig.GetVersion(),
Expand Down Expand Up @@ -544,7 +584,8 @@ func (s *graphServer) setupEngineStatistics(baseAttributes []attribute.KeyValue)
}

type graphMux struct {
mux *chi.Mux
mux *chi.Mux
reused atomic.Bool

planCache *ristretto.Cache[uint64, *planWithMetaData]
planFallbackCache *slowplancache.Cache[*planWithMetaData]
Expand Down Expand Up @@ -1734,7 +1775,7 @@ func (s *graphServer) buildGraphMux(

s.graphMuxListLock.Lock()
defer s.graphMuxListLock.Unlock()
s.graphMuxList = append(s.graphMuxList, gm)
s.graphMuxList[opts.FeatureFlagName] = gm

return gm, nil
}
Expand Down Expand Up @@ -1982,11 +2023,15 @@ func (s *graphServer) Shutdown(ctx context.Context) error {
}
}

// Shutdown all graphs muxes to release resources
// Shutdown graph muxes that are not reused by the next graph server, to release resources
// e.g. planner cache
s.graphMuxListLock.Lock()
defer s.graphMuxListLock.Unlock()
for _, mux := range s.graphMuxList {
if mux.reused.Load() {
mux.reused.Store(false) // set to false to avoid the mux from being skipped forever
continue
}
if err := mux.Shutdown(ctx); err != nil {
finalErr = errors.Join(finalErr, err)
}
Expand Down Expand Up @@ -2171,3 +2216,22 @@ func configureSubgraphOverwrites(

return subgraphs, nil
}

// currentGraphMuxes returns a snapshot of the graph muxes served by the
// currently running graph server, keyed by feature flag name (the empty
// string keys the base graph). It returns nil when no server state or no
// graph server exists yet, e.g. during the very first startup.
func currentGraphMuxes(r *Router) map[string]*graphMux {
	state := r.httpServer.state.Load()
	if state == nil {
		return nil
	}

	gs := state.graphServer
	if gs == nil {
		return nil
	}

	// Clone under the lock so the caller gets a stable view even if the
	// running server mutates its mux list afterwards.
	gs.graphMuxListLock.Lock()
	defer gs.graphMuxListLock.Unlock()

	return maps.Clone(gs.graphMuxList)
}
1 change: 1 addition & 0 deletions router/core/init_config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func InitializeConfigPoller(r *Router, registry *ProviderRegistry) (*configpolle
if hasSplitCfgFeature {
providerID := r.routerConfigPollerConfig.Storage.ProviderID
if providerID == "" {
r.logger.Debug("Use split-config poller to fetch execution config")
return newSplitConfigPoller(r)
}
r.logger.Info("split-config-loading feature is enabled but a custom storage provider is configured; falling back to regular config polling",
Expand Down
Loading
Loading