Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

"github.com/openshift/monitoring-plugin/internal/managementrouter"
Expand Down Expand Up @@ -127,18 +128,11 @@ func createHTTPServer(ctx context.Context, cfg *Config) (*http.Server, error) {
var k8sconfig *rest.Config
var err error

// Uncomment the following line for local development:
// k8sconfig, err = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
// if err != nil {
// return nil, fmt.Errorf("cannot get kubeconfig from file: %w", err)
// }

// Comment the following line for local development:
var k8sclient *dynamic.DynamicClient
if acmMode || alertManagementAPIMode {
k8sconfig, err = rest.InClusterConfig()
k8sconfig, err = loadKubeConfig()
if err != nil {
return nil, fmt.Errorf("cannot get in cluster config: %w", err)
return nil, fmt.Errorf("cannot load kubernetes config: %w", err)
}

k8sclient, err = dynamic.NewForConfig(k8sconfig)
Expand All @@ -154,15 +148,20 @@ func createHTTPServer(ctx context.Context, cfg *Config) (*http.Server, error) {
// hang the entire server startup indefinitely.
var managementClient management.Client
if alertManagementAPIMode {
const initTimeout = 30 * time.Second
initCtx, initCancel := context.WithTimeout(ctx, initTimeout)
defer initCancel()

k8sClient, err := k8s.NewClient(initCtx, k8sconfig)
// The k8s client must receive the long-lived ctx so that its
// informers keep running for the lifetime of the server.
// Do NOT pass a timeout-scoped context here: informers that
// are started with a cancelled context stop watching and the
// relabeled-rules cache freezes.
k8sClient, err := k8s.NewClient(ctx, k8sconfig)
if err != nil {
return nil, fmt.Errorf("failed to create k8s client for alert management API: %w", err)
}

const initTimeout = 30 * time.Second
initCtx, initCancel := context.WithTimeout(ctx, initTimeout)
defer initCancel()

if err := k8sClient.TestConnection(initCtx); err != nil {
return nil, fmt.Errorf("failed to connect to kubernetes cluster for alert management API: %w", err)
}
Expand Down Expand Up @@ -402,6 +401,23 @@ func configHandler(cfg *Config) (http.HandlerFunc, *PluginConfig) {
}), &pluginConfig
}

// loadKubeConfig returns a *rest.Config by preferring KUBECONFIG (useful for
// local development and CI) and falling back to in-cluster service-account config.
func loadKubeConfig() (*rest.Config, error) {
if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf("cannot build config from KUBECONFIG: %w", err)
}
return cfg, nil
}
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("cannot get in-cluster config: %w", err)
}
return cfg, nil
}

func startProxy(cfg *Config, k8sclient *dynamic.DynamicClient, tlsConfig *tls.Config, timeout time.Duration, kind proxy.KindType, port proxy.ProxyPort) {
proxyRouter := setupProxyRoutes(cfg, k8sclient, kind)
proxyRouter.Use(corsHeaderMiddleware())
Expand Down
7 changes: 4 additions & 3 deletions test/e2e/create_alert_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ func TestCreateUserDefinedAlertRule(t *testing.T) {
}
defer cleanup()

createExpr := "vector(1) or vector(0)"
payload := managementrouter.CreateAlertRuleRequest{
AlertingRule: &managementrouter.AlertRuleSpec{
Alert: strPtr("E2ECreateAlert"),
Expr: strPtr("vector(1)"),
Expr: &createExpr,
For: strPtr("1m"),
Labels: &map[string]string{
"severity": "info",
Expand Down Expand Up @@ -102,8 +103,8 @@ func TestCreateUserDefinedAlertRule(t *testing.T) {
for _, rule := range group.Rules {
if rule.Alert == "E2ECreateAlert" {
foundAlert = true
if rule.Expr.String() != "vector(1)" {
t.Errorf("Expected expr 'vector(1)', got %q", rule.Expr.String())
if rule.Expr.String() != createExpr {
t.Errorf("Expected expr %q, got %q", createExpr, rule.Expr.String())
}
if rule.For == nil || string(*rule.For) != "1m" {
t.Errorf("Expected for '1m', got %v", rule.For)
Expand Down
5 changes: 5 additions & 0 deletions test/e2e/delete_alert_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -42,6 +43,10 @@ func TestDeleteAlertRule(t *testing.T) {

t.Logf("Created 3 rules with IDs: %v", ruleIDs)

// Allow time for the informer watch event to propagate and
// the relabeled-rules cache to sync the new PrometheusRule.
time.Sleep(2 * time.Second)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to be flaky: a better option is to

  1. create the rules
  2. check that they are present
  3. delete them

All 3 steps should use a retry mechanism in case the API has a blip.


deleteReq := managementrouter.BulkDeleteAlertRulesRequest{
RuleIds: []string{ruleIDs[0], ruleIDs[1]},
}
Expand Down
78 changes: 77 additions & 1 deletion test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (

osmv1client "github.com/openshift/client-go/monitoring/clientset/versioned"
monitoringv1client "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
authv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/openshift/monitoring-plugin/pkg/k8s"
Expand Down Expand Up @@ -73,12 +77,17 @@ func New() (*Framework, error) {
return nil, fmt.Errorf("failed to create osmv1 clientset: %w", err)
}

bearerToken, err := resolveToken(clientset, config)
if err != nil {
return nil, fmt.Errorf("failed to resolve bearer token: %w", err)
}

f = &Framework{
Clientset: clientset,
Monitoringv1clientset: monitoringv1clientset,
Osmv1clientset: osmv1clientset,
PluginURL: pluginURL,
BearerToken: config.BearerToken,
BearerToken: bearerToken,
}

return f, nil
Expand Down Expand Up @@ -135,3 +144,70 @@ func (f *Framework) AuthorizedRequest(ctx context.Context, method, url string, b
}
return req, nil
}

// resolveToken returns a bearer token for authenticating HTTP requests to the
// plugin server. It tries, in order: inline BearerToken, BearerTokenFile, and
// finally creates a short-lived ServiceAccount token (for CI kubeconfigs that
// use client-certificate auth).
func resolveToken(clientset *kubernetes.Clientset, config *rest.Config) (string, error) {
if config.BearerToken != "" {
return config.BearerToken, nil
}
if config.BearerTokenFile != "" {
data, err := os.ReadFile(config.BearerTokenFile)
if err != nil {
return "", fmt.Errorf("reading bearer token file: %w", err)
}
token := strings.TrimSpace(string(data))
if token != "" {
return token, nil
}
}
return createServiceAccountToken(clientset)
}

// createServiceAccountToken creates a ServiceAccount with cluster-admin
// privileges and returns a short-lived bearer token for it. This covers CI
// environments where the kubeconfig authenticates via client certificates
// and no bearer token is available.
func createServiceAccountToken(clientset *kubernetes.Clientset) (string, error) {
ctx := context.Background()
const saName = "e2e-management-api"
const ns = "default"

sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{Name: saName},
}
if _, err := clientset.CoreV1().ServiceAccounts(ns).Create(ctx, sa, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
return "", fmt.Errorf("creating service account: %w", err)
}

crb := &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{Name: saName},
Subjects: []rbacv1.Subject{{
Kind: rbacv1.ServiceAccountKind,
Name: saName,
Namespace: ns,
}},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "ClusterRole",
Name: "cluster-admin",
},
}
if _, err := clientset.RbacV1().ClusterRoleBindings().Create(ctx, crb, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
return "", fmt.Errorf("creating cluster role binding: %w", err)
}

expSeconds := int64(3600)
treq := &authv1.TokenRequest{
Spec: authv1.TokenRequestSpec{
ExpirationSeconds: &expSeconds,
},
}
resp, err := clientset.CoreV1().ServiceAccounts(ns).CreateToken(ctx, saName, treq, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("requesting service account token: %w", err)
}
return resp.Status.Token, nil
}
9 changes: 8 additions & 1 deletion test/e2e/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
Expand All @@ -17,10 +18,16 @@ func strPtr(s string) *string { return &s }
func createRuleViaAPI(t *testing.T, f *framework.Framework, ctx context.Context, namespace, alertName, prName string) string {
t.Helper()

// Each rule needs a unique expression so the spec-equivalence check
// does not reject it as a duplicate of a rule from another test.
// absent() returns 1 when the selector matches nothing, which is
// always the case for a fabricated metric name.
expr := fmt.Sprintf("absent(nonexistent{e2e_rule=%q})", alertName)

payload := managementrouter.CreateAlertRuleRequest{
AlertingRule: &managementrouter.AlertRuleSpec{
Alert: &alertName,
Expr: strPtr("vector(1)"),
Expr: &expr,
For: strPtr("1m"),
Labels: &map[string]string{
"severity": "info",
Expand Down