diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go
index 58158b2c6..d7d56568e 100644
--- a/pkg/provisioning/service.go
+++ b/pkg/provisioning/service.go
@@ -67,6 +67,212 @@ func NewService(
	}
}

// parseYamlInput parses a YAML input (either a file path or an inline YAML
// string) and returns all pipeline configurations found in it, together with
// the source path ("" when the input was an inline YAML string).
func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) ([]config.Pipeline, string, error) {
	// Heuristic: a short, single-line input that exists on disk is treated as
	// a file path; anything else is parsed as an inline YAML document.
	isFilePath := false
	if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 {
		if _, err := os.Stat(yamlInput); err == nil {
			isFilePath = true
		}
	}

	var source string
	var pipelineConfigs []config.Pipeline

	if isFilePath {
		s.logger.Debug(ctx).
			Str("file_path", yamlInput).
			Msg("parsing pipeline configurations from file")

		configs, err := s.parsePipelineConfigFile(ctx, yamlInput)
		if err != nil {
			return nil, "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err)
		}
		if len(configs) == 0 {
			return nil, "", cerrors.New("no pipelines found in the YAML file")
		}

		pipelineConfigs = configs
		source = yamlInput
	} else {
		s.logger.Debug(ctx).
			Msg("parsing pipeline configurations from YAML string")

		configs, err := s.parser.Parse(ctx, strings.NewReader(yamlInput))
		if err != nil {
			return nil, "", cerrors.Errorf("failed to parse YAML string: %w", err)
		}
		if len(configs) == 0 {
			return nil, "", cerrors.New("no pipelines found in the YAML string")
		}

		pipelineConfigs = configs
		source = ""
	}

	s.logger.Debug(ctx).
		Int("pipeline_count", len(pipelineConfigs)).
		Msg("found pipelines in YAML")

	return pipelineConfigs, source, nil
}

// UpsertYamlResult represents the result of processing a YAML input with
// multiple pipelines.
type UpsertYamlResult struct {
	PipelineIDs []string `json:"pipelineIDs"` // IDs of all successfully processed pipelines
}

// UpsertYaml parses a YAML input (either a file path or a YAML string) and
// creates or updates pipelines. If a pipeline doesn't exist, it will be
// created. If it does exist, it will be updated. This can be used to reload
// pipeline configurations without restarting Conduit.
//
// All pipeline definitions found in the YAML input are processed. On partial
// failure the returned result still lists the successfully processed pipeline
// IDs, alongside the joined errors for the pipelines that failed.
func (s *Service) UpsertYaml(ctx context.Context, yamlInput string) (*UpsertYamlResult, error) {
	pipelineConfigs, source, err := s.parseYamlInput(ctx, yamlInput)
	if err != nil {
		return nil, err
	}

	result := &UpsertYamlResult{
		PipelineIDs: make([]string, 0, len(pipelineConfigs)),
	}

	s.logger.Info(ctx).
		Int("pipeline_count", len(pipelineConfigs)).
		Msg("processing pipelines from YAML")

	var errs []error
	for i, pipelineConfig := range pipelineConfigs {
		pipelineID, err := s.reloadPipeline(ctx, pipelineConfig, source)
		if err != nil {
			errs = append(errs, cerrors.Errorf("failed to process pipeline %d (%s): %w", i, pipelineConfig.ID, err))
			continue
		}
		result.PipelineIDs = append(result.PipelineIDs, pipelineID)
	}

	// Return the partial result together with the joined errors so callers
	// can still see which pipelines succeeded.
	if len(errs) > 0 {
		return result, cerrors.Join(errs...)
	}
	return result, nil
}

// reloadPipeline handles the common logic for reloading a pipeline from either
// a file or a YAML string. It creates a new pipeline if it doesn't exist, or
// updates an existing one if it does. It returns the processed pipeline's ID.
func (s *Service) reloadPipeline(ctx context.Context, pipelineConfig config.Pipeline, source string) (string, error) {
	pipelineID := pipelineConfig.ID

	pipelineInstance, err := s.pipelineService.Get(ctx, pipelineID)
	switch {
	case err == nil:
		// Pipeline exists, update it.
		return s.upsertPipeline(ctx, pipelineInstance, pipelineConfig, source)
	case cerrors.Is(err, pipeline.ErrInstanceNotFound):
		// Pipeline doesn't exist, create it.
		return s.upsertPipeline(ctx, nil, pipelineConfig, source)
	default:
		return "", cerrors.Errorf("error getting pipeline instance with ID %q: %w", pipelineID, err)
	}
}

// upsertPipeline creates a new pipeline or updates an existing one with the
// given configuration. If pipelineInstance is nil a new pipeline is created;
// otherwise the existing pipeline is stopped (if running), deleted, and
// re-provisioned with the new configuration.
func (s *Service) upsertPipeline(ctx context.Context, pipelineInstance *pipeline.Instance, pipelineConfig config.Pipeline, source string) (string, error) {
	pipelineID := pipelineConfig.ID
	isCreate := pipelineInstance == nil

	if isCreate {
		s.logger.Info(ctx).
			Str("pipeline_id", pipelineID).
			Msg("pipeline doesn't exist, creating new pipeline")
	} else {
		// Only pipelines provisioned by config may be overwritten this way;
		// pipelines created through other means are rejected.
		if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig {
			return "", cerrors.Errorf("pipeline with ID %q was not provisioned by config: %w", pipelineID, ErrNotProvisionedByConfig)
		}

		// Stop the pipeline before replacing it, if it is currently running.
		if pipelineInstance.GetStatus() == pipeline.StatusRunning {
			s.logger.Debug(ctx).
				Str("pipeline_id", pipelineID).
				Msg("stopping pipeline before updating configuration")

			err := s.lifecycleService.Stop(ctx, pipelineID, false)
			if err != nil {
				// NOTE(review): matching on the error message text is fragile;
				// if the lifecycle service exposes a sentinel error for "not
				// running", prefer cerrors.Is here — TODO confirm.
				if !strings.Contains(err.Error(), "pipeline not running") {
					return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err)
				}
				s.logger.Debug(ctx).
					Str("pipeline_id", pipelineID).
					Msg("pipeline was not running, continuing with update")
			}
		}

		// Delete the existing pipeline so it can be recreated from scratch
		// with the new configuration.
		s.logger.Debug(ctx).
			Str("pipeline_id", pipelineID).
			Msg("deleting existing pipeline before recreating")

		if err := s.Delete(ctx, pipelineID); err != nil {
			return "", cerrors.Errorf("could not delete existing pipeline %q: %w", pipelineID, err)
		}
	}

	// Provision the pipeline with the (new) configuration.
	s.logger.Debug(ctx).
		Str("pipeline_id", pipelineID).
		Msg("provisioning pipeline")

	if err := s.provisionPipeline(ctx, pipelineConfig); err != nil {
		return "", cerrors.Errorf("pipeline %q, error while provisioning: %w", pipelineID, err)
	}

	// Log success, distinguishing create from update.
	msg := "pipeline configuration updated successfully"
	if isCreate {
		msg = "pipeline created successfully"
	}
	s.logger.Info(ctx).
		Str("pipeline_id", pipelineID).
		Str("source", source).
		Msg(msg)

	return pipelineID, nil
}

// Init provision pipelines defined in pipelinePath directory. should initialize pipeline service
// before calling this function, and all pipelines should be stopped.
func (s *Service) Init(ctx context.Context) error {
diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go
index db9b753f5..6bc2800b6 100644
--- a/pkg/provisioning/service_test.go
+++ b/pkg/provisioning/service_test.go
@@ -16,6 +16,7 @@ package provisioning

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

@@ -479,6 +480,275 @@ func TestService_Delete(t *testing.T) {
	is.NoErr(err)
}

// TestService_UpsertYaml exercises Service.UpsertYaml end to end against real
// (non-mock) services: it creates pipelines from an inline YAML string and a
// YAML file, updates them, and verifies multi-pipeline YAML documents.
func TestService_UpsertYaml(t *testing.T) {
	is := is.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Clean up output files that will be created during the test.
	t.Cleanup(func() {
		for _, path := range []string{"./output-v1.txt", "./output-v2.txt", "./output-v3.txt", "./input.txt"} {
			_ = os.Remove(path)
		}
	})

	// Set up a real service with non-mock components for actual runtime verification.
	logger := log.InitLogger(zerolog.InfoLevel, log.FormatCLI)
	logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{})

	// Create a temporary database.
	db, err := badger.New(logger.Logger, t.TempDir()+"/test.db")
	is.NoErr(err)
	t.Cleanup(func() {
		err := db.Close()
		is.NoErr(err)
	})

	// Set up the necessary services.
	tokenService := connutils.NewAuthManager()
	schemaRegistry, err := schemaregistry.NewSchemaRegistry(db)
	is.NoErr(err)
	connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, tokenService)

	// Set up plugin services.
	connPluginService := conn_plugin.NewPluginService(
		logger,
		builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, connSchemaService),
		standalone.NewRegistry(logger, ""),
		tokenService,
	)
	connPluginService.Init(ctx, "conn-utils-token:12345")

	procPluginService := proc_plugin.NewPluginService(
		logger,
		proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
		nil,
	)

	// Set up core services.
	plService := pipeline.NewService(logger, db)
	connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
	procService := processor.NewService(logger, db, procPluginService)

	// Set up lifecycle service.
	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
		MinDelay:         time.Second,
		MaxDelay:         10 * time.Minute,
		BackoffFactor:    2,
		MaxRetries:       0,
		MaxRetriesWindow: 5 * time.Minute,
	}
	lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService)

	// Create the provisioning service.
	service := NewService(db, logger, plService, connService, procService, connPluginService, lifecycleService, "")

	// Base YAML template for a single pipeline.
	singlePipelineTemplate := `version: 2.2
pipelines:
  - id: test-pipeline
    status: running
    name: Test Pipeline
    description: %s
    connectors:
      - id: source
        type: source
        plugin: builtin:file
        settings:
          path: ./input.txt
      - id: destination
        type: destination
        plugin: builtin:file
        settings:
          path: %s`

	// Base YAML template for multiple pipelines.
	// Note: each pipeline needs its own YAML document with version number.
	multiPipelineTemplate := `---
version: 2.2
pipelines:
  - id: test-pipeline-1
    status: running
    name: Test Pipeline 1
    description: %s
    connectors:
      - id: source-1
        type: source
        plugin: builtin:file
        settings:
          path: ./input.txt
      - id: destination-1
        type: destination
        plugin: builtin:file
        settings:
          path: %s
---
version: 2.2
pipelines:
  - id: test-pipeline-2
    status: running
    name: Test Pipeline 2
    description: %s
    connectors:
      - id: source-2
        type: source
        plugin: builtin:file
        settings:
          path: ./input.txt
      - id: destination-2
        type: destination
        plugin: builtin:file
        settings:
          path: %s`

	// Define test cases.
	type testCase struct {
		name         string
		source       string
		sourceType   string
		description  string
		outputPath   string
		multiple     bool   // whether to use multiple pipelines
		description2 string // description for second pipeline (if multiple)
		outputPath2  string // output path for second pipeline (if multiple)
	}

	testCases := []testCase{
		{
			name:        "Initial configuration (single pipeline)",
			sourceType:  "string",
			description: "Initial configuration",
			outputPath:  "./output-v1.txt",
			multiple:    false,
		},
		{
			name:        "Update via YAML string (single pipeline)",
			sourceType:  "string",
			description: "Updated configuration via string",
			outputPath:  "./output-v2.txt",
			multiple:    false,
		},
		{
			name:        "Update via YAML file (single pipeline)",
			sourceType:  "file",
			description: "Updated configuration via file",
			outputPath:  "./output-v3.txt",
			multiple:    false,
		},
		{
			name:         "Multiple pipelines via YAML string",
			sourceType:   "string",
			description:  "First pipeline in multi-config",
			outputPath:   "./output-multi-1.txt",
			multiple:     true,
			description2: "Second pipeline in multi-config",
			outputPath2:  "./output-multi-2.txt",
		},
	}

	// Create a temporary file for the file-based test case.
	tmpfile, err := os.CreateTemp("", "test-pipeline-*.yaml")
	is.NoErr(err)
	defer os.Remove(tmpfile.Name())

	// Clean up additional output files created by the multi-pipeline case.
	t.Cleanup(func() {
		for _, path := range []string{"./output-multi-1.txt", "./output-multi-2.txt"} {
			_ = os.Remove(path)
		}
	})

	// Run each test case.
	for i, tc := range testCases {
		// Small delay between test cases so the pipeline has time to stabilize.
		if i > 0 {
			time.Sleep(1 * time.Second)
		}
		t.Logf("Running test case %d: %s", i+1, tc.name)

		// Generate the YAML content based on whether we're testing single or
		// multiple pipelines.
		var yamlContent string
		if tc.multiple {
			yamlContent = fmt.Sprintf(multiPipelineTemplate, tc.description, tc.outputPath, tc.description2, tc.outputPath2)
		} else {
			yamlContent = fmt.Sprintf(singlePipelineTemplate, tc.description, tc.outputPath)
		}

		// Set up the source based on the source type.
		var source string
		if tc.sourceType == "file" {
			// Write the YAML content to the file.
			err = os.WriteFile(tmpfile.Name(), []byte(yamlContent), 0o644)
			is.NoErr(err)
			source = tmpfile.Name()
		} else {
			source = yamlContent
		}

		// Reload the pipeline(s) with the current configuration.
		var expectedPipelineIDs []string
		if tc.multiple {
			expectedPipelineIDs = []string{"test-pipeline-1", "test-pipeline-2"} // both pipeline IDs in multi-pipeline case
		} else {
			expectedPipelineIDs = []string{"test-pipeline"} // single pipeline case
		}

		result, err := service.UpsertYaml(ctx, source)
		is.NoErr(err)
		is.Equal(len(result.PipelineIDs), len(expectedPipelineIDs))

		// Check that all expected pipeline IDs are in the result.
		for _, expectedID := range expectedPipelineIDs {
			found := false
			for _, actualID := range result.PipelineIDs {
				if actualID == expectedID {
					found = true
					break
				}
			}
			is.True(found)
		}

		// Give the pipeline time to update.
		time.Sleep(500 * time.Millisecond)

		if tc.multiple {
			// Verify the first pipeline was created/updated correctly.
			pl1, err := plService.Get(ctx, "test-pipeline-1")
			is.NoErr(err)
			is.Equal(pl1.Config.Name, "Test Pipeline 1")
			is.Equal(pl1.Config.Description, tc.description)

			// Get the destination connector directly and verify its path setting.
			destConn1, err := connService.Get(ctx, "test-pipeline-1:destination-1")
			is.NoErr(err)
			is.Equal(destConn1.Config.Settings["path"], tc.outputPath)

			// Verify the second pipeline was created/updated correctly.
			pl2, err := plService.Get(ctx, "test-pipeline-2")
			is.NoErr(err)
			is.Equal(pl2.Config.Name, "Test Pipeline 2")
			is.Equal(pl2.Config.Description, tc.description2)

			// Get the destination connector directly and verify its path setting.
			destConn2, err := connService.Get(ctx, "test-pipeline-2:destination-2")
			is.NoErr(err)
			is.Equal(destConn2.Config.Settings["path"], tc.outputPath2)
		} else {
			// Verify the configuration was applied correctly for the single
			// pipeline. NOTE: named "pl" (not "pipeline") to avoid shadowing
			// the imported pipeline package.
			pl, err := plService.Get(ctx, "test-pipeline")
			is.NoErr(err)
			is.Equal(pl.Config.Name, "Test Pipeline")
			is.Equal(pl.Config.Description, tc.description)

			// Get the destination connector directly and verify its path setting.
			destConn, err := connService.Get(ctx, "test-pipeline:destination")
			is.NoErr(err)
			is.Equal(destConn.Config.Settings["path"], tc.outputPath)
		}
	}
}

func TestService_IntegrationTestServices(t *testing.T) {
	is := is.New(t)
	ctx, killAll := context.WithCancel(context.Background())