From a643c2ce8fc997a3cfdea8f7e7fa7c67728a123b Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Thu, 3 Apr 2025 19:34:19 -0600 Subject: [PATCH 1/4] add functionlity to hot reload yaml configs --- pkg/provisioning/service.go | 127 +++++++++++++++++++++++ pkg/provisioning/service_test.go | 168 +++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index 58158b2c6..486484bc7 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -67,6 +67,133 @@ func NewService( } } +// ReloadYaml parses a YAML input (either a file path or a YAML string) and reloads a pipeline configuration. +// This can be used to reload a pipeline configuration without restarting Conduit. +func (s *Service) ReloadYaml(ctx context.Context, yamlInput string) (string, error) { + // First, determine if the input is a file path or a YAML string + isFilePath := false + if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 { + if _, err := os.Stat(yamlInput); err == nil { + isFilePath = true + } + } + + var configs []config.Pipeline + var err error + var source string + + if isFilePath { + s.logger.Debug(ctx). + Str("file_path", yamlInput). + Msg("reloading pipeline configuration from file") + + configs, err = s.parsePipelineConfigFile(ctx, yamlInput) + if err != nil { + return "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err) + } + source = yamlInput + } else { + s.logger.Debug(ctx). + Msg("reloading pipeline configuration from YAML string") + + reader := strings.NewReader(yamlInput) + configs, err = s.parser.Parse(ctx, reader) + if err != nil { + return "", cerrors.Errorf("failed to parse YAML string: %w", err) + } + source = "" + } + + if len(configs) == 0 { + return "", cerrors.New("no pipelines found in the YAML input") + } + + pipelineID := configs[0].ID + s.logger.Debug(ctx). + Str("pipeline_id", pipelineID). + Msg("using first pipeline from YAML") + + return s.reloadPipeline(ctx, configs, pipelineID, source) +} + +// reloadPipeline is a helper function that handles the common logic for reloading a pipeline +// from either a file or a YAML string. +func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline, pipelineID string, source string) (string, error) { + // Find the pipeline with the specified ID + var targetConfig config.Pipeline + found := false + for _, cfg := range configs { + if cfg.ID == pipelineID { + targetConfig = cfg + found = true + break + } + } + + if !found { + return "", cerrors.Errorf("pipeline with ID %q not found in source %q", pipelineID, source) + } + + // Check if pipeline already exists + pipelineInstance, err := s.pipelineService.Get(ctx, pipelineID) + if err != nil { + if cerrors.Is(err, pipeline.ErrInstanceNotFound) { + // Pipeline doesn't exist, create it + s.logger.Info(ctx). + Str("pipeline_id", pipelineID). + Msg("pipeline doesn't exist, creating new pipeline") + } else { + return "", cerrors.Errorf("error getting pipeline instance with ID %q: %w", pipelineID, err) + } + } else { + // Pipeline exists, check if it was provisioned by config + if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig { + return "", cerrors.Errorf("pipeline with ID %q was not provisioned by config: %w", pipelineID, ErrNotProvisionedByConfig) + } + + // Check if pipeline is running and stop it if needed + pipelineWasRunning := pipelineInstance.GetStatus() == pipeline.StatusRunning + if pipelineWasRunning { + s.logger.Debug(ctx). + Str("pipeline_id", pipelineID). + Msg("stopping pipeline before updating configuration") + + err = s.lifecycleService.Stop(ctx, pipelineID, false) + if err != nil { + return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err) + } + } + + // Delete the existing pipeline + s.logger.Debug(ctx). + Str("pipeline_id", pipelineID). + Msg("deleting existing pipeline before recreating") + + // Use s.Delete to properly clean up the pipeline + err = s.Delete(ctx, pipelineID) + if err != nil { + return "", cerrors.Errorf("could not delete existing pipeline %q: %w", pipelineID, err) + } + } + + // Provision the pipeline with the new configuration + s.logger.Debug(ctx). + Str("pipeline_id", pipelineID). + Msg("provisioning pipeline with updated configuration") + + err = s.provisionPipeline(ctx, targetConfig) + if err != nil { + return "", cerrors.Errorf("pipeline %q, error while provisioning: %w", pipelineID, err) + } + + s.logger.Info(ctx). + Str("pipeline_id", pipelineID). + Str("source", source). + Msg("pipeline configuration reloaded successfully") + + return pipelineID, nil +} + // Init provision pipelines defined in pipelinePath directory. should initialize pipeline service // before calling this function, and all pipelines should be stopped. func (s *Service) Init(ctx context.Context) error { diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index db9b753f5..11822390c 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -16,6 +16,7 @@ package provisioning import ( "context" + "fmt" "os" "testing" "time" @@ -479,6 +480,173 @@ func TestService_Delete(t *testing.T) { is.NoErr(err) } +func TestService_ReloadYaml(t *testing.T) { + is := is.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Add cleanup for output files that will be created during the test + t.Cleanup(func() { + for _, path := range []string{"./output-v1.txt", "./output-v2.txt", "./output-v3.txt", "./input.txt"} { + _ = os.Remove(path) + } + }) + + // Set up a real service with non-mock components for actual runtime verification + logger := log.InitLogger(zerolog.InfoLevel, log.FormatCLI) + logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{}) + + // Create a temporary database + db, err := badger.New(logger.Logger, t.TempDir()+"/test.db") + is.NoErr(err) + t.Cleanup(func() { + err := db.Close() + is.NoErr(err) + }) + + // Set up the necessary services + tokenService := connutils.NewAuthManager() + schemaRegistry, err := schemaregistry.NewSchemaRegistry(db) + is.NoErr(err) + connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, tokenService) + + // Set up plugin services + connPluginService := conn_plugin.NewPluginService( + logger, + builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, connSchemaService), + standalone.NewRegistry(logger, ""), + tokenService, + ) + connPluginService.Init(ctx, "conn-utils-token:12345") + + procPluginService := proc_plugin.NewPluginService( + logger, + proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry), + nil, + ) + + // Set up core services + plService := pipeline.NewService(logger, db) + connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)) + procService := processor.NewService(logger, db, procPluginService) + + // Set up lifecycle service + errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + MaxRetriesWindow: 5 * time.Minute, + } + lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService) + + // Create the provisioning service + service := NewService(db, logger, plService, connService, procService, connPluginService, lifecycleService, "") + + // Define the base YAML template + yamlTemplate := `version: 2.0 +pipelines: + - id: test-pipeline + status: running + name: Test Pipeline + description: %s + connectors: + - id: source + type: source + plugin: builtin:file + settings: + path: ./input.txt + - id: destination + type: destination + plugin: builtin:file + settings: + path: %s` + + // Define test cases + type testCase struct { + name string + source string + sourceType string + description string + outputPath string + } + + testCases := []testCase{ + { + name: "Initial configuration", + sourceType: "string", + description: "Initial configuration", + outputPath: "./output-v1.txt", + }, + { + name: "Update via YAML string", + sourceType: "string", + description: "Updated configuration via string", + outputPath: "./output-v2.txt", + }, + { + name: "Update via YAML file", + sourceType: "file", + description: "Updated configuration via file", + outputPath: "./output-v3.txt", + }, + } + + // Create a temporary file for the file-based test case + tmpfile, err := os.CreateTemp("", "test-pipeline-*.yaml") + is.NoErr(err) + defer os.Remove(tmpfile.Name()) + + // Run each test case + for i, tc := range testCases { + // Add a small delay between test cases to ensure the pipeline has time to stabilize + if i > 0 { + time.Sleep(1 * time.Second) + } + t.Logf("Running test case %d: %s", i+1, tc.name) + + // Generate the YAML content + yamlContent := fmt.Sprintf(yamlTemplate, tc.description, tc.outputPath) + + // Set up the source based on the source type + var source string + if tc.sourceType == "file" { + // Write the YAML content to the file + err = os.WriteFile(tmpfile.Name(), []byte(yamlContent), 0o644) + is.NoErr(err) + source = tmpfile.Name() + } else { + source = yamlContent + } + + // Reload the pipeline with the current configuration + pipelineID, err := service.ReloadYaml(ctx, source) + is.NoErr(err) + is.Equal(pipelineID, "test-pipeline") + + // Give the pipeline time to update + time.Sleep(500 * time.Millisecond) + + // Verify the configuration was applied correctly + pipeline, err := plService.Get(ctx, "test-pipeline") + is.NoErr(err) + is.Equal(pipeline.Config.Name, "Test Pipeline") + is.Equal(pipeline.Config.Description, tc.description) + + // Get the destination connector and verify its path setting + destConnID := "" + for _, connID := range pipeline.ConnectorIDs { + conn, err := connService.Get(ctx, connID) + is.NoErr(err) + if conn.ID == "test-pipeline:destination" { + destConnID = conn.ID + is.Equal(conn.Config.Settings["path"], tc.outputPath) + } + } + is.True(destConnID != "") + } +} + func TestService_IntegrationTestServices(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) From 807e085962e821da90d2681f0e74f952947d49df Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Thu, 3 Apr 2025 21:10:27 -0600 Subject: [PATCH 2/4] change reloadYaml to upsertYaml --- pkg/provisioning/service.go | 140 ++++++++++++++++++++----------- pkg/provisioning/service_test.go | 2 +- 2 files changed, 94 insertions(+), 48 deletions(-) diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index 486484bc7..bc9eef907 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -67,9 +67,8 @@ func NewService( } } -// ReloadYaml parses a YAML input (either a file path or a YAML string) and reloads a pipeline configuration. -// This can be used to reload a pipeline configuration without restarting Conduit. -func (s *Service) ReloadYaml(ctx context.Context, yamlInput string) (string, error) { +// parseYamlInput parses a YAML input (either a file path or a YAML string) and returns the first pipeline configuration. +func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) (config.Pipeline, string, error) { // First, determine if the input is a file path or a YAML string isFilePath := false if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 { @@ -78,75 +77,108 @@ func (s *Service) ReloadYaml(ctx context.Context, yamlInput string) (string, err } } - var configs []config.Pipeline - var err error var source string + var pipelineConfig config.Pipeline + // Parse the YAML input if isFilePath { s.logger.Debug(ctx). Str("file_path", yamlInput). - Msg("reloading pipeline configuration from file") + Msg("parsing pipeline configuration from file") - configs, err = s.parsePipelineConfigFile(ctx, yamlInput) + // Parse the file and get the first pipeline configuration + pipelineConfigs, err := s.parsePipelineConfigFile(ctx, yamlInput) if err != nil { - return "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err) + return config.Pipeline{}, "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err) + } + + if len(pipelineConfigs) == 0 { + return config.Pipeline{}, "", cerrors.New("no pipelines found in the YAML file") } + + // Use the first pipeline from the file + pipelineConfig = pipelineConfigs[0] source = yamlInput } else { s.logger.Debug(ctx). - Msg("reloading pipeline configuration from YAML string") + Msg("parsing pipeline configuration from YAML string") reader := strings.NewReader(yamlInput) - configs, err = s.parser.Parse(ctx, reader) + pipelineConfigs, err := s.parser.Parse(ctx, reader) if err != nil { - return "", cerrors.Errorf("failed to parse YAML string: %w", err) + return config.Pipeline{}, "", cerrors.Errorf("failed to parse YAML string: %w", err) + } + + if len(pipelineConfigs) == 0 { + return config.Pipeline{}, "", cerrors.New("no pipelines found in the YAML string") } - source = "" - } - if len(configs) == 0 { - return "", cerrors.New("no pipelines found in the YAML input") + // Use the first pipeline from the string + pipelineConfig = pipelineConfigs[0] + source = "" } - pipelineID := configs[0].ID + // Extract the pipeline ID + pipelineID := pipelineConfig.ID s.logger.Debug(ctx). Str("pipeline_id", pipelineID). - Msg("using first pipeline from YAML") + Msg("using pipeline from YAML") - return s.reloadPipeline(ctx, configs, pipelineID, source) + return pipelineConfig, source, nil } -// reloadPipeline is a helper function that handles the common logic for reloading a pipeline -// from either a file or a YAML string. -func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline, pipelineID string, source string) (string, error) { - // Find the pipeline with the specified ID - var targetConfig config.Pipeline - found := false - for _, cfg := range configs { - if cfg.ID == pipelineID { - targetConfig = cfg - found = true - break - } +// UpsertYaml parses a YAML input (either a file path or a YAML string) and creates or updates a pipeline. +// If the pipeline doesn't exist, it will be created. If it does exist, it will be updated. +// This can be used to reload a pipeline configuration without restarting Conduit. +// If the YAML contains multiple pipeline definitions, only the first one will be used. +func (s *Service) UpsertYaml(ctx context.Context, yamlInput string) (string, error) { + // Parse the YAML input + pipelineConfig, source, err := s.parseYamlInput(ctx, yamlInput) + if err != nil { + return "", err } - if !found { - return "", cerrors.Errorf("pipeline with ID %q not found in source %q", pipelineID, source) - } + return s.reloadPipeline(ctx, pipelineConfig, source) +} + +// reloadPipeline is a helper function that handles the common logic for reloading a pipeline +// from either a file or a YAML string. It will create a new pipeline if it doesn't exist, +// or update an existing one if it does. +func (s *Service) reloadPipeline(ctx context.Context, pipelineConfig config.Pipeline, source string) (string, error) { + // Extract the pipeline ID from the config + pipelineID := pipelineConfig.ID // Check if pipeline already exists pipelineInstance, err := s.pipelineService.Get(ctx, pipelineID) if err != nil { if cerrors.Is(err, pipeline.ErrInstanceNotFound) { // Pipeline doesn't exist, create it - s.logger.Info(ctx). - Str("pipeline_id", pipelineID). - Msg("pipeline doesn't exist, creating new pipeline") + return s.upsertPipeline(ctx, nil, pipelineConfig, source) } else { return "", cerrors.Errorf("error getting pipeline instance with ID %q: %w", pipelineID, err) } } else { - // Pipeline exists, check if it was provisioned by config + // Pipeline exists, update it + return s.upsertPipeline(ctx, pipelineInstance, pipelineConfig, source) + } +} + +// upsertPipeline creates a new pipeline or updates an existing one with the given configuration. +// If pipelineInstance is nil, a new pipeline will be created. Otherwise, the existing pipeline will be updated. +func (s *Service) upsertPipeline(ctx context.Context, pipelineInstance *pipeline.Instance, pipelineConfig config.Pipeline, source string) (string, error) { + pipelineID := pipelineConfig.ID + + // Check if we're creating a new pipeline or updating an existing one + isCreate := pipelineInstance == nil + + if isCreate { + // Creating a new pipeline + s.logger.Info(ctx). + Str("pipeline_id", pipelineID). + Msg("pipeline doesn't exist, creating new pipeline") + } else { + // Updating an existing pipeline + // Check if the pipeline was provisioned by config if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig { return "", cerrors.Errorf("pipeline with ID %q was not provisioned by config: %w", pipelineID, ErrNotProvisionedByConfig) } @@ -158,9 +190,15 @@ func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline, Str("pipeline_id", pipelineID). Msg("stopping pipeline before updating configuration") - err = s.lifecycleService.Stop(ctx, pipelineID, false) + err := s.lifecycleService.Stop(ctx, pipelineID, false) if err != nil { - return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err) + // Ignore the error if the pipeline is not running + if !strings.Contains(err.Error(), "pipeline not running") { + return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err) + } + s.logger.Debug(ctx). + Str("pipeline_id", pipelineID). + Msg("pipeline was not running, continuing with update") } } @@ -170,26 +208,34 @@ func (s *Service) reloadPipeline(ctx context.Context, configs []config.Pipeline, Msg("deleting existing pipeline before recreating") // Use s.Delete to properly clean up the pipeline - err = s.Delete(ctx, pipelineID) + err := s.Delete(ctx, pipelineID) if err != nil { return "", cerrors.Errorf("could not delete existing pipeline %q: %w", pipelineID, err) } } - // Provision the pipeline with the new configuration + // Provision the pipeline with the configuration s.logger.Debug(ctx). Str("pipeline_id", pipelineID). - Msg("provisioning pipeline with updated configuration") + Msg("provisioning pipeline") - err = s.provisionPipeline(ctx, targetConfig) + err := s.provisionPipeline(ctx, pipelineConfig) if err != nil { return "", cerrors.Errorf("pipeline %q, error while provisioning: %w", pipelineID, err) } - s.logger.Info(ctx). - Str("pipeline_id", pipelineID). - Str("source", source). - Msg("pipeline configuration reloaded successfully") + // Log success message based on whether we created or updated the pipeline + if isCreate { + s.logger.Info(ctx). + Str("pipeline_id", pipelineID). + Str("source", source). + Msg("pipeline created successfully") + } else { + s.logger.Info(ctx). + Str("pipeline_id", pipelineID). + Str("source", source). + Msg("pipeline configuration updated successfully") + } return pipelineID, nil } diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 11822390c..7126e6385 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -620,7 +620,7 @@ pipelines: } // Reload the pipeline with the current configuration - pipelineID, err := service.ReloadYaml(ctx, source) + pipelineID, err := service.UpsertYaml(ctx, source) is.NoErr(err) is.Equal(pipelineID, "test-pipeline") From 4c30e9a02b1c12da7559904daea14147d6147b72 Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Fri, 4 Apr 2025 11:27:01 -0600 Subject: [PATCH 3/4] tweaks to test --- pkg/provisioning/service_test.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 7126e6385..1fbdf81dd 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -480,7 +480,7 @@ func TestService_Delete(t *testing.T) { is.NoErr(err) } -func TestService_ReloadYaml(t *testing.T) { +func TestService_UpsertYaml(t *testing.T) { is := is.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -633,17 +633,10 @@ pipelines: is.Equal(pipeline.Config.Name, "Test Pipeline") is.Equal(pipeline.Config.Description, tc.description) - // Get the destination connector and verify its path setting - destConnID := "" - for _, connID := range pipeline.ConnectorIDs { - conn, err := connService.Get(ctx, connID) - is.NoErr(err) - if conn.ID == "test-pipeline:destination" { - destConnID = conn.ID - is.Equal(conn.Config.Settings["path"], tc.outputPath) - } - } - is.True(destConnID != "") + // Get the destination connector directly and verify its path setting + destConn, err := connService.Get(ctx, "test-pipeline:destination") + is.NoErr(err) + is.Equal(destConn.Config.Settings["path"], tc.outputPath) } } From 589d26585c3fa2b975885fcbcc7ad180070f5ee4 Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Wed, 16 Apr 2025 14:28:21 -0600 Subject: [PATCH 4/4] upsert multiple pipelines in single config --- pkg/provisioning/service.go | 95 ++++++++++++------- pkg/provisioning/service_test.go | 155 ++++++++++++++++++++++++++----- 2 files changed, 196 insertions(+), 54 deletions(-) diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index bc9eef907..d7d56568e 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -67,8 +67,8 @@ func NewService( } } -// parseYamlInput parses a YAML input (either a file path or a YAML string) and returns the first pipeline configuration. -func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) (config.Pipeline, string, error) { +// parseYamlInput parses a YAML input (either a file path or a YAML string) and returns all pipeline configurations. +func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) ([]config.Pipeline, string, error) { // First, determine if the input is a file path or a YAML string isFilePath := false if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 { @@ -78,67 +78,100 @@ func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) (config. } var source string - var pipelineConfig config.Pipeline + var pipelineConfigs []config.Pipeline // Parse the YAML input if isFilePath { s.logger.Debug(ctx). Str("file_path", yamlInput). - Msg("parsing pipeline configuration from file") + Msg("parsing pipeline configurations from file") - // Parse the file and get the first pipeline configuration - pipelineConfigs, err := s.parsePipelineConfigFile(ctx, yamlInput) + // Parse the file and get all pipeline configurations + configs, err := s.parsePipelineConfigFile(ctx, yamlInput) if err != nil { - return config.Pipeline{}, "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err) + return nil, "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err) } - if len(pipelineConfigs) == 0 { - return config.Pipeline{}, "", cerrors.New("no pipelines found in the YAML file") + if len(configs) == 0 { + return nil, "", cerrors.New("no pipelines found in the YAML file") } - // Use the first pipeline from the file - pipelineConfig = pipelineConfigs[0] + // Use all pipelines from the file + pipelineConfigs = configs source = yamlInput } else { s.logger.Debug(ctx). - Msg("parsing pipeline configuration from YAML string") + Msg("parsing pipeline configurations from YAML string") reader := strings.NewReader(yamlInput) - pipelineConfigs, err := s.parser.Parse(ctx, reader) + configs, err := s.parser.Parse(ctx, reader) if err != nil { - return config.Pipeline{}, "", cerrors.Errorf("failed to parse YAML string: %w", err) + return nil, "", cerrors.Errorf("failed to parse YAML string: %w", err) } - if len(pipelineConfigs) == 0 { - return config.Pipeline{}, "", cerrors.New("no pipelines found in the YAML string") + if len(configs) == 0 { + return nil, "", cerrors.New("no pipelines found in the YAML string") } - // Use the first pipeline from the string - pipelineConfig = pipelineConfigs[0] + // Use all pipelines from the string + pipelineConfigs = configs source = "" } - // Extract the pipeline ID - pipelineID := pipelineConfig.ID + // Log the number of pipelines found s.logger.Debug(ctx). - Str("pipeline_id", pipelineID). - Msg("using pipeline from YAML") + Int("pipeline_count", len(pipelineConfigs)). + Msg("found pipelines in YAML") - return pipelineConfig, source, nil + return pipelineConfigs, source, nil } -// UpsertYaml parses a YAML input (either a file path or a YAML string) and creates or updates a pipeline. -// If the pipeline doesn't exist, it will be created. If it does exist, it will be updated. -// This can be used to reload a pipeline configuration without restarting Conduit. -// If the YAML contains multiple pipeline definitions, only the first one will be used. -func (s *Service) UpsertYaml(ctx context.Context, yamlInput string) (string, error) { +// UpsertYamlResult represents the result of processing a YAML input with multiple pipelines +type UpsertYamlResult struct { + PipelineIDs []string `json:"pipelineIDs"` // IDs of all successfully processed pipelines +} + +// UpsertYaml parses a YAML input (either a file path or a YAML string) and creates or updates pipelines. +// If a pipeline doesn't exist, it will be created. If it does exist, it will be updated. +// This can be used to reload pipeline configurations without restarting Conduit. +// The function processes all pipeline definitions found in the YAML input. +// Returns a list of all successfully processed pipeline IDs, or an error if parsing failed. +func (s *Service) UpsertYaml(ctx context.Context, yamlInput string) (*UpsertYamlResult, error) { // Parse the YAML input - pipelineConfig, source, err := s.parseYamlInput(ctx, yamlInput) + pipelineConfigs, source, err := s.parseYamlInput(ctx, yamlInput) if err != nil { - return "", err + return nil, err + } + + // Initialize the result + result := &UpsertYamlResult{ + PipelineIDs: make([]string, 0, len(pipelineConfigs)), + } + + // Process all pipelines + var errs []error + + s.logger.Info(ctx). + Int("pipeline_count", len(pipelineConfigs)). + Msg("processing pipelines from YAML") + + for i, pipelineConfig := range pipelineConfigs { + // Process the pipeline + pipelineID, err := s.reloadPipeline(ctx, pipelineConfig, source) + if err != nil { + errs = append(errs, cerrors.Errorf("failed to process pipeline %d (%s): %w", i, pipelineConfig.ID, err)) + } else { + // Add successfully processed pipeline ID to the result + result.PipelineIDs = append(result.PipelineIDs, pipelineID) + } + } + + // If there were any errors, return them along with the partial result + if len(errs) > 0 { + return result, cerrors.Join(errs...) } - return s.reloadPipeline(ctx, pipelineConfig, source) + return result, nil } // reloadPipeline is a helper function that handles the common logic for reloading a pipeline diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 1fbdf81dd..6bc2800b6 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -543,8 +543,8 @@ func TestService_UpsertYaml(t *testing.T) { // Create the provisioning service service := NewService(db, logger, plService, connService, procService, connPluginService, lifecycleService, "") - // Define the base YAML template - yamlTemplate := `version: 2.0 + // Define the base YAML template for single pipeline + singlePipelineTemplate := `version: 2.2 pipelines: - id: test-pipeline status: running @@ -562,33 +562,87 @@ pipelines: settings: path: %s` + // Define the base YAML template for multiple pipelines + // Note: Each pipeline needs its own YAML document with version number + multiPipelineTemplate := `--- +version: 2.2 +pipelines: + - id: test-pipeline-1 + status: running + name: Test Pipeline 1 + description: %s + connectors: + - id: source-1 + type: source + plugin: builtin:file + settings: + path: ./input.txt + - id: destination-1 + type: destination + plugin: builtin:file + settings: + path: %s +--- +version: 2.2 +pipelines: + - id: test-pipeline-2 + status: running + name: Test Pipeline 2 + description: %s + connectors: + - id: source-2 + type: source + plugin: builtin:file + settings: + path: ./input.txt + - id: destination-2 + type: destination + plugin: builtin:file + settings: + path: %s` + // Define test cases type testCase struct { - name string - source string - sourceType string - description string - outputPath string + name string + source string + sourceType string + description string + outputPath string + multiple bool // Whether to use multiple pipelines + description2 string // Description for second pipeline (if multiple) + outputPath2 string // Output path for second pipeline (if multiple) } testCases := []testCase{ { - name: "Initial configuration", + name: "Initial configuration (single pipeline)", sourceType: "string", description: "Initial configuration", outputPath: "./output-v1.txt", + multiple: false, }, { - name: "Update via YAML string", + name: "Update via YAML string (single pipeline)", sourceType: "string", description: "Updated configuration via string", outputPath: "./output-v2.txt", + multiple: false, }, { - name: "Update via YAML file", + name: "Update via YAML file (single pipeline)", sourceType: "file", description: "Updated configuration via file", outputPath: "./output-v3.txt", + multiple: false, + }, + { + name: "Multiple pipelines via YAML string", + sourceType: "string", + description: "First pipeline in multi-config", + outputPath: "./output-multi-1.txt", + multiple: true, + description2: "Second pipeline in multi-config", + outputPath2: "./output-multi-2.txt", }, } @@ -597,6 +651,13 @@ pipelines: is.NoErr(err) defer os.Remove(tmpfile.Name()) + // Add cleanup for additional output files that will be created during the test + t.Cleanup(func() { + for _, path := range []string{"./output-multi-1.txt", "./output-multi-2.txt"} { + _ = os.Remove(path) + } + }) + // Run each test case for i, tc := range testCases { // Add a small delay between test cases to ensure the pipeline has time to stabilize @@ -605,8 +666,13 @@ pipelines: } t.Logf("Running test case %d: %s", i+1, tc.name) - // Generate the YAML content - yamlContent := fmt.Sprintf(yamlTemplate, tc.description, tc.outputPath) + // Generate the YAML content based on whether we're testing single or multiple pipelines + var yamlContent string + if tc.multiple { + yamlContent = fmt.Sprintf(multiPipelineTemplate, tc.description, tc.outputPath, tc.description2, tc.outputPath2) + } else { + yamlContent = fmt.Sprintf(singlePipelineTemplate, tc.description, tc.outputPath) + } // Set up the source based on the source type var source string @@ -620,23 +686,66 @@ pipelines: } // Reload the pipeline with the current configuration - pipelineID, err := service.UpsertYaml(ctx, source) + var expectedPipelineIDs []string + if tc.multiple { + expectedPipelineIDs = []string{"test-pipeline-1", "test-pipeline-2"} // Both pipeline IDs in multi-pipeline case + } else { + expectedPipelineIDs = []string{"test-pipeline"} // Single pipeline case + } + + result, err := service.UpsertYaml(ctx, source) is.NoErr(err) - is.Equal(pipelineID, "test-pipeline") + is.Equal(len(result.PipelineIDs), len(expectedPipelineIDs)) + + // Check that all expected pipeline IDs are in the result + for _, expectedID := range expectedPipelineIDs { + found := false + for _, actualID := range result.PipelineIDs { + if actualID == expectedID { + found = true + break + } + } + is.True(found) + } // Give the pipeline time to update time.Sleep(500 * time.Millisecond) - // Verify the configuration was applied correctly - pipeline, err := plService.Get(ctx, "test-pipeline") - is.NoErr(err) - is.Equal(pipeline.Config.Name, "Test Pipeline") - is.Equal(pipeline.Config.Description, tc.description) + if tc.multiple { + // Verify the first pipeline was created/updated correctly + pipeline1, err := plService.Get(ctx, "test-pipeline-1") + is.NoErr(err) + is.Equal(pipeline1.Config.Name, "Test Pipeline 1") + is.Equal(pipeline1.Config.Description, tc.description) - // Get the destination connector directly and verify its path setting - destConn, err := connService.Get(ctx, "test-pipeline:destination") - is.NoErr(err) - is.Equal(destConn.Config.Settings["path"], tc.outputPath) + // Get the destination connector directly and verify its path setting + destConn1, err := connService.Get(ctx, "test-pipeline-1:destination-1") + is.NoErr(err) + is.Equal(destConn1.Config.Settings["path"], tc.outputPath) + + // Verify the second pipeline was created/updated correctly + pipeline2, err := plService.Get(ctx, "test-pipeline-2") + is.NoErr(err) + is.Equal(pipeline2.Config.Name, "Test Pipeline 2") + is.Equal(pipeline2.Config.Description, tc.description2) + + // Get the destination connector directly and verify its path setting + destConn2, err := connService.Get(ctx, "test-pipeline-2:destination-2") + is.NoErr(err) + is.Equal(destConn2.Config.Settings["path"], tc.outputPath2) + } else { + // Verify the configuration was applied correctly for single pipeline + pipeline, err := plService.Get(ctx, "test-pipeline") + is.NoErr(err) + is.Equal(pipeline.Config.Name, "Test Pipeline") + is.Equal(pipeline.Config.Description, tc.description) + + // Get the destination connector directly and verify its path setting + destConn, err := connService.Get(ctx, "test-pipeline:destination") + is.NoErr(err) + is.Equal(destConn.Config.Settings["path"], tc.outputPath) + } } }