diff --git a/app/graphql/mutations/velorum/generate_flow.rb b/app/graphql/mutations/velorum/generate_flow.rb new file mode 100644 index 00000000..ba608c27 --- /dev/null +++ b/app/graphql/mutations/velorum/generate_flow.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +module Mutations + module Velorum + class GenerateFlow < BaseMutation + description 'Start a Velorum flow generation job.' + + field :execution_identifier, + type: GraphQL::Types::String, + null: true, + description: 'Identifier that can be used to subscribe to the generated flow response.' + + argument :flow_id, + type: Types::GlobalIdType[::Flow], + required: false, + description: 'Flow to update with the prompt' + argument :model_identifier, + type: GraphQL::Types::String, + required: true, + description: 'Selected Velorum model identifier' + argument :project_id, + type: Types::GlobalIdType[::NamespaceProject], + required: true, + description: 'Project to generate a flow for' + argument :prompt, + type: GraphQL::Types::String, + required: true, + description: 'Prompt to send to Velorum' + + def resolve(project_id:, prompt:, model_identifier:, flow_id: nil) + return error_response(:invalid_setting, 'Velorum is disabled') unless velorum_enabled? + + project = SagittariusSchema.object_from_id(project_id) + return error_response(:project_not_found, 'Invalid project id') if project.nil? + + flow = flow_id.present? ? SagittariusSchema.object_from_id(flow_id) : nil + return error_response(:flow_not_found, 'Flow does not exist') if flow_id.present? && flow.nil? + if flow.present? && flow.project != project + return error_response(:invalid_flow, 'Flow does not belong to the project') + end + return error_response(:no_primary_runtime, 'Project has no primary runtime') if project.primary_runtime.nil? + + return error_response(:missing_permission, 'Missing permission') unless allowed?(project, flow) + + execution_identifier = SecureRandom.uuid + VelorumGenerateFlowJob.perform_later(execution_identifier, project.id, prompt, model_identifier, flow&.id) + + { execution_identifier: execution_identifier, errors: [] } + end + + private + + def velorum_enabled? + Sagittarius::Configuration.config[:velorum][:enabled] + end + + def allowed?(project, flow) + return false unless Ability.allowed?(current_authentication, :read_velorum_config, :global) + + ability = flow.present? ? :update_flow : :create_flow + subject = flow || project + + Ability.allowed?(current_authentication, ability, subject) + end + + def error_response(error_code, message) + { + execution_identifier: nil, + errors: [create_error(error_code, message)], + } + end + end + end +end diff --git a/app/graphql/subscription_triggers.rb b/app/graphql/subscription_triggers.rb index fabc741e..5cb369e1 100644 --- a/app/graphql/subscription_triggers.rb +++ b/app/graphql/subscription_triggers.rb @@ -9,4 +9,13 @@ def self.execution_result(execution_result) context: { visibility_profile: :execution } ) end + + def self.velorum_generate_flow(execution_identifier, flow) + SagittariusSchema.subscriptions.trigger( + :velorum_generate_flow, + { execution_identifier: execution_identifier }, + flow, + context: { visibility_profile: :execution } + ) + end end diff --git a/app/graphql/subscriptions/velorum/generate_flow.rb b/app/graphql/subscriptions/velorum/generate_flow.rb new file mode 100644 index 00000000..f502c236 --- /dev/null +++ b/app/graphql/subscriptions/velorum/generate_flow.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Subscriptions + module Velorum + class GenerateFlow < BaseSubscription + description 'Generate a flow through Velorum and close the subscription with the generated flow' + + argument :execution_identifier, + type: GraphQL::Types::String, + required: true, + description: 'Velorum generation request identifier returned by the mutation' + + field :flow, + type: GraphQL::Types::JSON, + null: true, + description: 'Generated flow returned by Velorum' + + def subscribe(**) + :no_response + end + + def update(*) + unsubscribe(flow: object) + end + end + end +end diff --git a/app/graphql/types/mutation_type.rb b/app/graphql/types/mutation_type.rb index 4575bf18..fbead1bc 100644 --- a/app/graphql/types/mutation_type.rb +++ b/app/graphql/types/mutation_type.rb @@ -47,6 +47,7 @@ class MutationType < Types::BaseObject mount_mutation Mutations::Users::PasswordReset mount_mutation Mutations::Users::Register mount_mutation Mutations::Users::Update + mount_mutation Mutations::Velorum::GenerateFlow mount_mutation Mutations::Echo end end diff --git a/app/graphql/types/subscription_type.rb b/app/graphql/types/subscription_type.rb index 07b5bfbc..e28dd2e2 100644 --- a/app/graphql/types/subscription_type.rb +++ b/app/graphql/types/subscription_type.rb @@ -7,6 +7,7 @@ class SubscriptionType < Types::BaseObject include Sagittarius::Graphql::MountSubscription mount_subscription Subscriptions::Namespaces::Projects::Flows::ExecutionResult + mount_subscription Subscriptions::Velorum::GenerateFlow mount_subscription Subscriptions::Echo end end diff --git a/app/jobs/velorum_generate_flow_job.rb b/app/jobs/velorum_generate_flow_job.rb new file mode 100644 index 00000000..52bc245c --- /dev/null +++ b/app/jobs/velorum_generate_flow_job.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class VelorumGenerateFlowJob < ApplicationJob + def perform(execution_identifier, project_id, prompt, model_identifier, flow_id = nil) + project = NamespaceProject.find_by(id: project_id) + return if project.nil? + + flow = flow_id.present? ? Flow.find_by(id: flow_id) : nil + return if flow_id.present? && flow.nil? + + response = Velorum::GenerateFlowService.new( + nil, + project: project, + prompt: prompt, + model_identifier: model_identifier, + flow: flow, + authorize: false + ).execute + return unless response.success? + + SubscriptionTriggers.velorum_generate_flow(execution_identifier, response.payload[:flow]) + end +end diff --git a/app/models/flow.rb b/app/models/flow.rb index 8fce28b0..7c451750 100644 --- a/app/models/flow.rb +++ b/app/models/flow.rb @@ -64,4 +64,14 @@ def to_grpc signature: signature ) end + + def to_generation_grpc + Tucana::Shared::GenerationFlow.new( + name: name, + type: flow_type.identifier, + starting_node_id: starting_node&.id&.to_s, + settings: flow_settings.map(&:to_grpc), + node_functions: node_functions.map(&:to_grpc) + ) + end end diff --git a/app/models/flow_type.rb b/app/models/flow_type.rb index 6b5c1876..3928ad35 100644 --- a/app/models/flow_type.rb +++ b/app/models/flow_type.rb @@ -42,4 +42,23 @@ def parsed_version def runtime_identifier runtime_flow_type&.identifier end + + def to_grpc + Tucana::Shared::FlowType.new( + identifier: identifier, + settings: flow_type_settings.map(&:to_grpc), + editable: editable, + name: names.map(&:to_grpc), + description: descriptions.map(&:to_grpc), + documentation: documentations.map(&:to_grpc), + display_message: display_messages.map(&:to_grpc), + alias: aliases.map(&:to_grpc), + version: version, + display_icon: display_icon, + definition_source: definition_source, + linked_data_type_identifiers: referenced_data_types.map(&:identifier), + signature: signature, + runtime_identifier: runtime_identifier + ) + end end diff --git a/app/models/flow_type_setting.rb b/app/models/flow_type_setting.rb index 7213d08c..6c41e84b 100644 --- a/app/models/flow_type_setting.rb +++ b/app/models/flow_type_setting.rb @@ -27,4 +27,18 @@ class FlowTypeSetting < ApplicationRecord scope :active, -> { where(removed_at: nil) } scope :removed, -> { where.not(removed_at: nil) } + + def to_grpc + args = { + identifier: identifier, + unique: unique.to_s.upcase.to_sym, + name: names.map(&:to_grpc), + description: descriptions.map(&:to_grpc), + optional: optional, + hidden: hidden, + default_value: Tucana::Shared::Value.from_ruby(default_value), + } + + Tucana::Shared::FlowTypeSetting.new(**args) + end end diff --git a/app/services/velorum/generate_flow_service.rb b/app/services/velorum/generate_flow_service.rb new file mode 100644 index 00000000..b52b37d6 --- /dev/null +++ b/app/services/velorum/generate_flow_service.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +module Velorum + class GenerateFlowService + CACHE_KEY_PREFIX = 'velorum/generate_flow_definitions' + + def initialize( + current_authentication, + project:, + prompt:, + model_identifier:, + flow: nil, + client: nil, + cache: Rails.cache, + config: Sagittarius::Configuration.config[:velorum], + authorize: true + ) + @current_authentication = current_authentication + @project = project + @prompt = prompt + @model_identifier = model_identifier + @flow = flow + @client = client + @cache = cache + @config = config + @authorize = authorize + end + + def execute + return missing_permission_response if authorize? && !velorum_config_allowed? + return disabled_response unless config[:enabled] + return flow_project_mismatch_response if flow.present? && flow.project != project + return missing_permission_response if authorize? && !allowed? + return no_primary_runtime_response if runtime.nil? + + response = flow.present? ? client.flow(flow_request) : client.prompt(prompt_request) + write_cache(response.cached_until) + + ServiceResponse.success( + message: 'Generated flow', + payload: { + flow: GenerationFlowSerializer.new(response.flow).to_h, + cached_until: response.cached_until, + usage: response.usage, + } + ) + end + + private + + attr_reader :current_authentication, :project, :prompt, :model_identifier, :flow, :cache, :config + + def authorize? + @authorize + end + + def prompt_request + Tucana::Velorum::PromptRequest.new(**base_request_args) + end + + def flow_request + Tucana::Velorum::FlowRequest.new(**base_request_args, flow: flow.to_generation_grpc) + end + + def base_request_args + args = { + prompt: prompt, + project_id: project.id, + model_identifier: model_identifier, + } + + return args if definitions_cached? + + args.merge( + functions: runtime.function_definitions.map(&:to_grpc), + data_types: runtime.data_types.map(&:to_grpc), + flow_types: runtime.flow_types.map(&:to_grpc) + ) + end + + def definitions_cached? + cached_until = cache.read(cache_key).to_i + cached_until > current_time_ms + end + + def write_cache(cached_until) + return if cached_until.to_i <= current_time_ms + + cache.write(cache_key, cached_until, expires_in: ((cached_until - current_time_ms) / 1000.0).ceil.seconds) + end + + def cache_key + [ + CACHE_KEY_PREFIX, + "project:#{project.id}", + "runtime:#{runtime.id}", + "model:#{model_identifier}" + ].join(':') + end + + def current_time_ms + (Time.now.to_f * 1000).to_i + end + + def runtime + project.primary_runtime + end + + def client + @client ||= Sagittarius::Velorum::Client.new + end + + def velorum_config_allowed? + Ability.allowed?(current_authentication, :read_velorum_config, :global) + end + + def allowed? + ability = flow.present? ? :update_flow : :create_flow + subject = flow || project + + Ability.allowed?(current_authentication, ability, subject) + end + + def disabled_response + ServiceResponse.error(message: 'Velorum is disabled', error_code: :invalid_setting) + end + + def flow_project_mismatch_response + ServiceResponse.error(message: 'Flow does not belong to the project', error_code: :invalid_flow) + end + + def missing_permission_response + ServiceResponse.error(message: 'Missing permission', error_code: :missing_permission) + end + + def no_primary_runtime_response + ServiceResponse.error(message: 'Project has no primary runtime', error_code: :no_primary_runtime) + end + end +end diff --git a/app/services/velorum/generation_flow_serializer.rb b/app/services/velorum/generation_flow_serializer.rb new file mode 100644 index 00000000..a249b44a --- /dev/null +++ b/app/services/velorum/generation_flow_serializer.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +module Velorum + class GenerationFlowSerializer + def initialize(flow) + @flow = flow + end + + def to_h + { + name: flow.name, + type: flow.type, + starting_node_id: blank_zero(flow.starting_node_id), + settings: flow.settings.map { |setting| flow_setting_to_h(setting) }, + nodes: flow.node_functions.map { |node| node_to_h(node) }, + } + end + + private + + attr_reader :flow + + def node_to_h(node) + { + id: blank_zero(node.database_id), + function_identifier: node.runtime_function_id, + next_node_id: blank_zero(node.next_node_id), + definition_source: node.definition_source, + parameters: node.parameters.map { |parameter| parameter_to_h(parameter) }, + } + end + + def parameter_to_h(parameter) + { + id: blank_zero(parameter.database_id), + parameter_identifier: parameter.runtime_parameter_id, + cast: parameter.cast, + value: node_value_to_h(parameter.value), + } + end + + def node_value_to_h(value) + return {} if value.nil? + + if value.literal_value + { literal_value: value.literal_value.to_ruby(true) } + elsif value.reference_value + { reference_value: reference_value_to_h(value.reference_value) } + elsif value.sub_flow + { sub_flow_value: sub_flow_to_h(value.sub_flow) } + else + {} + end + end + + def reference_value_to_h(value) + hash = { + reference_path: value.paths.map { |path| reference_path_to_h(path) }, + } + + if value.input_type + hash[:node_function_id] = blank_zero(value.input_type.node_id) + hash[:parameter_index] = blank_zero(value.input_type.parameter_index) + hash[:input_index] = blank_zero(value.input_type.input_index) + elsif !value.flow_input + hash[:node_function_id] = blank_zero(value.node_id) + end + + hash + end + + def reference_path_to_h(path) + { + path: path.path, + array_index: blank_zero(path.array_index), + } + end + + def sub_flow_to_h(sub_flow) + { + starting_node_id: blank_zero(sub_flow.starting_node_id), + function_identifier: sub_flow.function_identifier, + signature: sub_flow.signature, + settings: sub_flow.settings.map { |setting| sub_flow_setting_to_h(setting) }, + } + end + + def sub_flow_setting_to_h(setting) + { + identifier: setting.identifier, + default_value: setting.default_value&.to_ruby(true), + optional: setting.optional, + hidden: setting.hidden, + } + end + + def flow_setting_to_h(setting) + { + id: blank_zero(setting.database_id), + flow_setting_id: setting.flow_setting_id, + value: setting.value&.to_ruby(true), + cast: setting.cast, + } + end + + def blank_zero(value) + return if value.blank? || value.to_s == '0' + + value + end + end +end diff --git a/docs/graphql/mutation/velorumgenerateflow.md b/docs/graphql/mutation/velorumgenerateflow.md new file mode 100644 index 00000000..ff8bd5a9 --- /dev/null +++ b/docs/graphql/mutation/velorumgenerateflow.md @@ -0,0 +1,23 @@ +--- +title: velorumGenerateFlow +--- + +Start a Velorum flow generation job. + +## Arguments + +| Name | Type | Description | +|------|------|-------------| +| `clientMutationId` | [`String`](../scalar/string.md) | A unique identifier for the client performing the mutation. | +| `flowId` | [`FlowID`](../scalar/flowid.md) | Flow to update with the prompt | +| `modelIdentifier` | [`String!`](../scalar/string.md) | Selected Velorum model identifier | +| `projectId` | [`NamespaceProjectID!`](../scalar/namespaceprojectid.md) | Project to generate a flow for | +| `prompt` | [`String!`](../scalar/string.md) | Prompt to send to Velorum | + +## Fields + +| Name | Type | Description | +|------|------|-------------| +| `clientMutationId` | [`String`](../scalar/string.md) | A unique identifier for the client performing the mutation. | +| `errors` | [`[Error!]!`](../object/error.md) | Errors encountered during execution of the mutation. | +| `executionIdentifier` | [`String`](../scalar/string.md) | Identifier that can be used to subscribe to the generated flow response. | diff --git a/docs/graphql/subscription/velorumgenerateflow.md b/docs/graphql/subscription/velorumgenerateflow.md new file mode 100644 index 00000000..dfe25b78 --- /dev/null +++ b/docs/graphql/subscription/velorumgenerateflow.md @@ -0,0 +1,17 @@ +--- +title: velorumGenerateFlow +--- + +Generate a flow through Velorum and close the subscription with the generated flow + +## Arguments + +| Name | Type | Description | +|------|------|-------------| +| `executionIdentifier` | [`String!`](../scalar/string.md) | Velorum generation request identifier returned by the mutation | + +## Fields + +| Name | Type | Description | +|------|------|-------------| +| `flow` | [`JSON`](../scalar/json.md) | Generated flow returned by Velorum | diff --git a/lib/sagittarius/velorum/client.rb b/lib/sagittarius/velorum/client.rb index 77973150..56df5215 100644 --- a/lib/sagittarius/velorum/client.rb +++ b/lib/sagittarius/velorum/client.rb @@ -14,15 +14,27 @@ def initialize( end def models - stub.models(Tucana::Velorum::ModelsRequest.new, metadata: authentication_metadata) + info_stub.models(Tucana::Velorum::ModelsRequest.new, metadata: authentication_metadata) + end + + def prompt(request) + generate_stub.prompt(request, metadata: authentication_metadata) + end + + def flow(request) + generate_stub.flow(request, metadata: authentication_metadata) end private attr_reader :host, :jwt_secret, :jwt_ttl_minutes - def stub - @stub ||= Tucana::Velorum::InfoService::Stub.new(host, :this_channel_is_insecure) + def info_stub + @info_stub ||= Tucana::Velorum::InfoService::Stub.new(host, :this_channel_is_insecure) + end + + def generate_stub + @generate_stub ||= Tucana::Velorum::GenerateService::Stub.new(host, :this_channel_is_insecure) end def authentication_metadata diff --git a/spec/jobs/velorum_generate_flow_job_spec.rb b/spec/jobs/velorum_generate_flow_job_spec.rb new file mode 100644 index 00000000..fc3ab5f7 --- /dev/null +++ b/spec/jobs/velorum_generate_flow_job_spec.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe VelorumGenerateFlowJob do + include ActiveJob::TestHelper + + let(:execution_identifier) { SecureRandom.uuid } + let(:project) { create(:namespace_project, primary_runtime: create(:runtime)) } + let(:service) { instance_double(Velorum::GenerateFlowService, execute: service_response) } + let(:flow) { { name: 'Generated flow', type: 'default', nodes: [] } } + let(:service_response) { ServiceResponse.success(payload: { flow: flow }) } + + before do + allow(Velorum::GenerateFlowService).to receive(:new).and_return(service) + allow(SubscriptionTriggers).to receive(:velorum_generate_flow) + end + + it 'calls Velorum in the background and triggers the subscription response' do + perform_enqueued_jobs do + described_class.perform_later(execution_identifier, project.id, 'Generate a flow', 'gpt-5') + end + + expect(Velorum::GenerateFlowService).to have_received(:new).with( + nil, + project: project, + prompt: 'Generate a flow', + model_identifier: 'gpt-5', + flow: nil, + authorize: false + ) + expect(SubscriptionTriggers).to have_received(:velorum_generate_flow).with(execution_identifier, flow) + end + + it 'does not trigger the subscription when the project is gone' do + perform_enqueued_jobs do + described_class.perform_later(execution_identifier, -1, 'Generate a flow', 'gpt-5') + end + + expect(Velorum::GenerateFlowService).not_to have_received(:new) + expect(SubscriptionTriggers).not_to have_received(:velorum_generate_flow) + end +end diff --git a/spec/lib/sagittarius/velorum/client_spec.rb b/spec/lib/sagittarius/velorum/client_spec.rb index d9ed37a8..cff5831d 100644 --- a/spec/lib/sagittarius/velorum/client_spec.rb +++ b/spec/lib/sagittarius/velorum/client_spec.rb @@ -3,7 +3,8 @@ require 'rails_helper' RSpec.describe Sagittarius::Velorum::Client do - let(:stub) { instance_double(Tucana::Velorum::InfoService::Stub) } + let(:info_stub) { instance_double(Tucana::Velorum::InfoService::Stub) } + let(:generate_stub) { instance_double(Tucana::Velorum::GenerateService::Stub) } let(:response) { Tucana::Velorum::ModelsResponse.new } let(:jwt_secret) { 'velorum-secret' } let(:jwt_ttl_minutes) { 15 } @@ -11,8 +12,13 @@ before do allow(Time).to receive(:now).and_return(time) - allow(Tucana::Velorum::InfoService::Stub).to receive(:new).and_return(stub) - allow(stub).to receive(:models).and_return(response) + allow(Tucana::Velorum::InfoService::Stub).to receive(:new).and_return(info_stub) + allow(Tucana::Velorum::GenerateService::Stub).to receive(:new).and_return(generate_stub) + allow(info_stub).to receive(:models).and_return(response) + allow(generate_stub).to receive_messages( + prompt: Tucana::Velorum::FlowResponse.new, + flow: Tucana::Velorum::FlowResponse.new + ) end it 'uses the configured Velorum gRPC host to request models' do @@ -25,7 +31,7 @@ expect(Tucana::Velorum::InfoService::Stub) .to have_received(:new) .with('velorum.example:50052', :this_channel_is_insecure) - expect(stub).to have_received(:models).with( + expect(info_stub).to have_received(:models).with( an_instance_of(Tucana::Velorum::ModelsRequest), metadata: a_hash_including(authorization: kind_of(String)) ) @@ -38,7 +44,7 @@ jwt_ttl_minutes: jwt_ttl_minutes ).models - expect(stub).to have_received(:models) do |_, options| + expect(info_stub).to have_received(:models) do |_, options| token = options.fetch(:metadata).fetch(:authorization) encoded_header, encoded_payload, encoded_signature = token.split('.') signature_body = [encoded_header, encoded_payload].join('.') @@ -59,6 +65,39 @@ end end + it 'uses the generated Velorum gRPC host to request flow generation from a prompt' do + request = Tucana::Velorum::PromptRequest.new(prompt: 'Generate a flow') + + described_class.new( + host: 'velorum.example:50052', + jwt_secret: jwt_secret, + jwt_ttl_minutes: jwt_ttl_minutes + ).prompt(request) + + expect(Tucana::Velorum::GenerateService::Stub) + .to have_received(:new) + .with('velorum.example:50052', :this_channel_is_insecure) + expect(generate_stub).to have_received(:prompt).with( + request, + metadata: a_hash_including(authorization: kind_of(String)) + ) + end + + it 'uses the generated Velorum gRPC host to request flow generation from an existing flow' do + request = Tucana::Velorum::FlowRequest.new(prompt: 'Adjust the flow') + + described_class.new( + host: 'velorum.example:50052', + jwt_secret: jwt_secret, + jwt_ttl_minutes: jwt_ttl_minutes + ).flow(request) + + expect(generate_stub).to have_received(:flow).with( + request, + metadata: a_hash_including(authorization: kind_of(String)) + ) + end + it 'raises a clear error when no Velorum JWT secret is configured' do expect do described_class.new(host: 'velorum.example:50052', jwt_secret: nil).models diff --git a/spec/models/flow_spec.rb b/spec/models/flow_spec.rb index 140a4ee2..dd26f223 100644 --- a/spec/models/flow_spec.rb +++ b/spec/models/flow_spec.rb @@ -140,4 +140,42 @@ ) end end + + describe '#to_generation_grpc' do + let(:flow) do + create( + :flow, + flow_type: create(:flow_type, identifier: 'HTTP'), + flow_settings: [ + create(:flow_setting, flow_setting_id: 'HTTP_URL', object: { url: '/some-url' }) + ] + ) + end + + before do + runtime = create(:runtime, namespace: flow.project.namespace) + rfd = create(:runtime_function_definition, runtime: runtime) + fd = create(:function_definition, runtime_function_definition: rfd) + func = create(:node_function, function_definition: fd, flow: flow) + + flow.update!(starting_node: func) + end + + it 'returns a Velorum generation flow' do + grpc_object = flow.to_generation_grpc + + expect(grpc_object).to be_a(Tucana::Shared::GenerationFlow) + expect(grpc_object.to_h).to include( + name: flow.name, + type: 'HTTP', + starting_node_id: flow.starting_node.id.to_s, + settings: [ + a_hash_including(flow_setting_id: 'HTTP_URL') + ], + node_functions: [ + a_hash_including(database_id: flow.starting_node.id) + ] + ) + end + end end diff --git a/spec/models/flow_type_setting_spec.rb b/spec/models/flow_type_setting_spec.rb index 3a05356d..5355ad4c 100644 --- a/spec/models/flow_type_setting_spec.rb +++ b/spec/models/flow_type_setting_spec.rb @@ -34,4 +34,32 @@ expect(described_class.removed).not_to include(active_setting) end end + + describe '#to_grpc' do + let(:setting) do + create( + :flow_type_setting, + identifier: 'HTTP_URL', + unique: :project, + default_value: '/status', + optional: true, + hidden: true + ) + end + + it 'returns a shared flow type setting definition' do + grpc_object = setting.to_grpc + + expect(grpc_object).to be_a(Tucana::Shared::FlowTypeSetting) + expect(grpc_object.to_h).to include( + identifier: 'HTTP_URL', + unique: :PROJECT, + default_value: { + string_value: '/status', + }, + optional: true, + hidden: true + ) + end + end end diff --git a/spec/models/flow_type_spec.rb b/spec/models/flow_type_spec.rb index e4c8a854..5b5cf5c9 100644 --- a/spec/models/flow_type_spec.rb +++ b/spec/models/flow_type_spec.rb @@ -54,4 +54,39 @@ end end end + + describe '#to_grpc' do + let(:flow_type) do + create( + :flow_type, + identifier: 'HTTP', + editable: true, + version: '1.2.3', + display_icon: 'network', + definition_source: 'sagittarius', + signature: '(input: REST_ADAPTER_INPUT): HTTP_RESPONSE', + flow_type_settings: [ + build(:flow_type_setting, identifier: 'HTTP_URL', default_value: '/status') + ] + ) + end + + it 'returns a shared flow type definition' do + grpc_object = flow_type.to_grpc + + expect(grpc_object).to be_a(Tucana::Shared::FlowType) + expect(grpc_object.to_h).to include( + identifier: 'HTTP', + editable: true, + version: '1.2.3', + display_icon: 'network', + definition_source: 'sagittarius', + signature: '(input: REST_ADAPTER_INPUT): HTTP_RESPONSE', + runtime_identifier: flow_type.runtime_flow_type.identifier, + settings: [ + a_hash_including(identifier: 'HTTP_URL') + ] + ) + end + end end diff --git a/spec/requests/graphql/mutation/velorum/generate_flow_spec.rb b/spec/requests/graphql/mutation/velorum/generate_flow_spec.rb new file mode 100644 index 00000000..474dc02b --- /dev/null +++ b/spec/requests/graphql/mutation/velorum/generate_flow_spec.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'velorumGenerateFlow Mutation' do + include GraphqlHelpers + + subject(:mutate!) { post_graphql mutation, variables: variables, current_user: current_user } + + let(:mutation) do + <<~GQL + mutation($input: VelorumGenerateFlowInput!) { + velorumGenerateFlow(input: $input) { + executionIdentifier + #{error_query} + } + } + GQL + end + + let(:current_user) { create(:user) } + let(:runtime) { create(:runtime) } + let(:project) { create(:namespace_project, primary_runtime: runtime) } + let(:variables) do + { + input: { + projectId: project.to_global_id.to_s, + prompt: 'Generate a flow', + modelIdentifier: 'gpt-5', + }, + } + end + + before do + allow(Sagittarius::Configuration).to receive(:config) + .and_return(velorum: { enabled: true }) + allow(VelorumGenerateFlowJob).to receive(:perform_later) + + create(:namespace_member, namespace: project.namespace, user: current_user) + stub_allowed_ability(NamespaceProjectPolicy, :create_flow, user: current_user, subject: project) + end + + it 'returns an execution identifier and enqueues the Velorum generation job' do + mutate! + + execution_identifier = graphql_data_at(:velorum_generate_flow, :execution_identifier) + expect(execution_identifier).to be_present + expect(graphql_data_at(:velorum_generate_flow, :errors)).to eq([]) + expect(VelorumGenerateFlowJob).to have_received(:perform_later).with( + execution_identifier, + project.id, + 'Generate a flow', + 'gpt-5', + nil + ) + end + + context 'when Velorum is disabled' do + before do + allow(Sagittarius::Configuration).to receive(:config) + .and_return(velorum: { enabled: false }) + end + + it 'returns an error and does not enqueue a job' do + mutate! + + expect(graphql_data_at(:velorum_generate_flow, :execution_identifier)).to be_nil + expect(graphql_data_at(:velorum_generate_flow, :errors, 0, :error_code)).to eq('INVALID_SETTING') + expect(VelorumGenerateFlowJob).not_to have_received(:perform_later) + end + end +end diff --git a/spec/requests/graphql/subscription/velorum/generate_flow_spec.rb b/spec/requests/graphql/subscription/velorum/generate_flow_spec.rb new file mode 100644 index 00000000..92dd311a --- /dev/null +++ b/spec/requests/graphql/subscription/velorum/generate_flow_spec.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'velorumGenerateFlow Subscription', type: :channel do + include AuthenticationHelpers + include ActionCable::Channel::TestCase::Behavior + + include_context 'with graphql subscription support' + + tests GraphqlChannel + + let(:user) { create(:user) } + let(:token) { "Session #{authorization_token(user)}" } + let(:execution_identifier) { SecureRandom.uuid } + let(:flow) { { name: 'Generated flow', type: 'default', nodes: [] } } + let(:subscription_query) do + <<~GQL + subscription($executionIdentifier: String!) { + velorumGenerateFlow(executionIdentifier: $executionIdentifier) { + flow + } + } + GQL + end + + before do + subscribe(token: token) + end + + it 'delivers a generated flow for the matching execution identifier and closes the subscription' do + perform :execute, + query: subscription_query, + variables: { executionIdentifier: execution_identifier } + + SubscriptionTriggers.velorum_generate_flow(execution_identifier, flow) + + expect(transmissions.last).to include('more' => false) + expect(transmissions.last.dig('result', 'data', 'velorumGenerateFlow')).to eq( + 'flow' => { 'name' => 'Generated flow', 'type' => 'default', 'nodes' => [] } + ) + end +end diff --git a/spec/services/velorum/generate_flow_service_spec.rb b/spec/services/velorum/generate_flow_service_spec.rb new file mode 100644 index 00000000..96086504 --- /dev/null +++ b/spec/services/velorum/generate_flow_service_spec.rb @@ -0,0 +1,181 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Velorum::GenerateFlowService do + subject(:service_response) do + described_class.new( + current_authentication, + project: project, + prompt: prompt, + model_identifier: model_identifier, + flow: flow, + client: client, + cache: cache, + config: { enabled: true } + ).execute + end + + let(:current_authentication) { instance_double(UserSession) } + let(:project) { instance_double(NamespaceProject, id: 12, primary_runtime: runtime) } + let(:runtime) do + instance_double( + Runtime, + id: 9, + function_definitions: [function_definition], + data_types: [data_type], + flow_types: [flow_type] + ) + end + let(:function_definition) { instance_double(FunctionDefinition, to_grpc: grpc_function_definition) } + let(:data_type) { instance_double(DataType, to_grpc: grpc_data_type) } + let(:flow_type) { instance_double(FlowType, to_grpc: grpc_flow_type) } + let(:grpc_function_definition) { Tucana::Shared::FunctionDefinition.new(runtime_name: 'sum') } + let(:grpc_data_type) { Tucana::Shared::DefinitionDataType.new(identifier: 'number') } + let(:grpc_flow_type) { Tucana::Shared::FlowType.new(identifier: 'default') } + let(:client) { instance_double(Sagittarius::Velorum::Client) } + let(:prompt_requests) { [] } + let(:flow_requests) { [] } + let(:cache) { ActiveSupport::Cache::MemoryStore.new } + let(:prompt) { 'Generate a flow' } + let(:model_identifier) { 'gpt-5' } + let(:flow) { nil } + let(:cached_until) { 1_900_000_000_000 } + let(:generated_flow) do + Tucana::Shared::GenerationFlow.new( + name: 'Generated flow', + type: 'default', + starting_node_id: '1', + settings: [ + Tucana::Shared::FlowSetting.new( + flow_setting_id: 'region', + value: Tucana::Shared::Value.from_ruby('eu') + ) + ], + node_functions: [ + Tucana::Shared::NodeFunction.new( + database_id: 1, + runtime_function_id: 'sum', + parameters: [ + Tucana::Shared::NodeParameter.new( + runtime_parameter_id: 'left', + value: Tucana::Shared::NodeValue.new(literal_value: Tucana::Shared::Value.from_ruby(1)) + ) + ] + ) + ] + ) + end + let(:flow_response) do + Tucana::Velorum::FlowResponse.new( + flow: generated_flow, + cached_until: cached_until, + usage: 42 + ) + end + + before do + allow(Time).to receive(:now).and_return(Time.zone.local(2026, 6, 12, 10, 0, 0)) + allow(Ability).to receive(:allowed?).and_return(true) + allow(client).to receive(:prompt) do |request| + prompt_requests << request + flow_response + end + allow(client).to receive(:flow) do |request| + flow_requests << request + flow_response + end + end + + it 'sends available definitions with a prompt request when Velorum has no valid cache marker' do + expect(service_response).to be_success + + expect(client).to have_received(:prompt) do |request| + expect(request).to be_a(Tucana::Velorum::PromptRequest) + expect(request.prompt).to eq(prompt) + expect(request.project_id).to eq(project.id) + expect(request.model_identifier).to eq(model_identifier) + expect(request.functions).to eq([grpc_function_definition]) + expect(request.data_types).to eq([grpc_data_type]) + expect(request.flow_types).to eq([grpc_flow_type]) + end + + expect(service_response.payload).to include(cached_until: cached_until, usage: 42) + expect(service_response.payload[:flow]).to include( + name: 'Generated flow', + type: 'default', + starting_node_id: '1' + ) + end + + it 'omits definitions while the Velorum cache marker is still valid' do + service_response + + second_response = described_class.new( + current_authentication, + project: project, + prompt: prompt, + model_identifier: model_identifier, + client: client, + cache: cache, + config: { enabled: true } + ).execute + + expect(second_response).to be_success + expect(prompt_requests.size).to eq(2) + expect(prompt_requests.last.functions).to be_empty + expect(prompt_requests.last.data_types).to be_empty + expect(prompt_requests.last.flow_types).to be_empty + end + + context 'with an existing flow' do + let(:flow) do + instance_double( + Flow, + project: project, + to_generation_grpc: Tucana::Shared::GenerationFlow.new(name: 'Existing flow') + ) + end + + it 'uses the Velorum Flow RPC and checks update permissions' do + expect(service_response).to be_success + + expect(Ability).to have_received(:allowed?).with(current_authentication, :update_flow, flow) + expect(client).to have_received(:flow) do |request| + expect(request).to be_a(Tucana::Velorum::FlowRequest) + expect(request.flow.name).to eq('Existing flow') + end + end + end + + context 'when the project does not have a primary runtime' do + let(:runtime) { nil } + + it 'returns an error response' do + expect(service_response).to be_error + expect(service_response.payload[:error_code]).to eq(:no_primary_runtime) + expect(client).not_to have_received(:prompt) + end + end + + context 'when Velorum is disabled' do + subject(:service_response) do + described_class.new( + current_authentication, + project: project, + prompt: prompt, + model_identifier: model_identifier, + client: client, + cache: cache, + config: { enabled: false } + ).execute + end + + it 'returns an error without calling Velorum' do + expect(service_response).to be_error + expect(service_response.payload[:error_code]).to eq(:invalid_setting) + expect(client).not_to have_received(:prompt) + expect(client).not_to have_received(:flow) + end + end +end diff --git a/spec/services/velorum/generation_flow_serializer_spec.rb b/spec/services/velorum/generation_flow_serializer_spec.rb new file mode 100644 index 00000000..74b4b370 --- /dev/null +++ b/spec/services/velorum/generation_flow_serializer_spec.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Velorum::GenerationFlowSerializer do + it 'serializes a generated flow into frontend-consumable JSON data' do + flow = Tucana::Shared::GenerationFlow.new( + name: 'Generated flow', + type: 'default', + node_functions: [ + Tucana::Shared::NodeFunction.new( + runtime_function_id: 'sum', + parameters: [ + Tucana::Shared::NodeParameter.new( + runtime_parameter_id: 'left', + value: Tucana::Shared::NodeValue.new(literal_value: Tucana::Shared::Value.from_ruby(1)) + ) + ] + ) + ] + ) + + expect(described_class.new(flow).to_h).to include( + name: 'Generated flow', + type: 'default', + nodes: [ + a_hash_including( + function_identifier: 'sum', + parameters: [ + a_hash_including( + parameter_identifier: 'left', + value: { literal_value: 1 } + ) + ] + ) + ] + ) + end +end