Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions app/graphql/mutations/velorum/generate_flow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# frozen_string_literal: true

module Mutations
module Velorum
class GenerateFlow < BaseMutation
description 'Start a Velorum flow generation job.'

field :execution_identifier,
type: GraphQL::Types::String,
null: true,
description: 'Identifier that can be used to subscribe to the generated flow response.'

argument :flow_id,
type: Types::GlobalIdType[::Flow],
required: false,
description: 'Flow to update with the prompt'
argument :model_identifier,
type: GraphQL::Types::String,
required: true,
description: 'Selected Velorum model identifier'
argument :project_id,
type: Types::GlobalIdType[::NamespaceProject],
required: true,
description: 'Project to generate a flow for'
argument :prompt,
type: GraphQL::Types::String,
required: true,
description: 'Prompt to send to Velorum'

def resolve(project_id:, prompt:, model_identifier:, flow_id: nil)
return error_response(:invalid_setting, 'Velorum is disabled') unless velorum_enabled?

project = SagittariusSchema.object_from_id(project_id)
return error_response(:project_not_found, 'Invalid project id') if project.nil?

flow = flow_id.present? ? SagittariusSchema.object_from_id(flow_id) : nil
return error_response(:flow_not_found, 'Flow does not exist') if flow_id.present? && flow.nil?
if flow.present? && flow.project != project
return error_response(:invalid_flow, 'Flow does not belong to the project')
end
return error_response(:no_primary_runtime, 'Project has no primary runtime') if project.primary_runtime.nil?

return error_response(:missing_permission, 'Missing permission') unless allowed?(project, flow)

execution_identifier = SecureRandom.uuid
VelorumGenerateFlowJob.perform_later(execution_identifier, project.id, prompt, model_identifier, flow&.id)

{ execution_identifier: execution_identifier, errors: [] }
end

private

def velorum_enabled?
Sagittarius::Configuration.config[:velorum][:enabled]
end

def allowed?(project, flow)
return false unless Ability.allowed?(current_authentication, :read_velorum_config, :global)

ability = flow.present? ? :update_flow : :create_flow
subject = flow || project

Ability.allowed?(current_authentication, ability, subject)
end

def error_response(error_code, message)
{
execution_identifier: nil,
errors: [create_error(error_code, message)],
}
end
end
end
end
9 changes: 9 additions & 0 deletions app/graphql/subscription_triggers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ def self.execution_result(execution_result)
context: { visibility_profile: :execution }
)
end

def self.velorum_generate_flow(execution_identifier, flow)
SagittariusSchema.subscriptions.trigger(
:velorum_generate_flow,
{ execution_identifier: execution_identifier },
flow,
context: { visibility_profile: :execution }
)
end
end
27 changes: 27 additions & 0 deletions app/graphql/subscriptions/velorum/generate_flow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

module Subscriptions
module Velorum
class GenerateFlow < BaseSubscription
description 'Generate a flow through Velorum and close the subscription with the generated flow'

argument :execution_identifier,
type: GraphQL::Types::String,
required: true,
description: 'Velorum generation request identifier returned by the mutation'

field :flow,
type: GraphQL::Types::JSON,
null: true,
description: 'Generated flow returned by Velorum'

def subscribe(**)
:no_response
end

def update(*)
unsubscribe(flow: object)
end
end
end
end
1 change: 1 addition & 0 deletions app/graphql/types/mutation_type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MutationType < Types::BaseObject
mount_mutation Mutations::Users::PasswordReset
mount_mutation Mutations::Users::Register
mount_mutation Mutations::Users::Update
mount_mutation Mutations::Velorum::GenerateFlow
mount_mutation Mutations::Echo
end
end
Expand Down
1 change: 1 addition & 0 deletions app/graphql/types/subscription_type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class SubscriptionType < Types::BaseObject
include Sagittarius::Graphql::MountSubscription

mount_subscription Subscriptions::Namespaces::Projects::Flows::ExecutionResult
mount_subscription Subscriptions::Velorum::GenerateFlow
mount_subscription Subscriptions::Echo
end
end
23 changes: 23 additions & 0 deletions app/jobs/velorum_generate_flow_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

class VelorumGenerateFlowJob < ApplicationJob
def perform(execution_identifier, project_id, prompt, model_identifier, flow_id = nil)
project = NamespaceProject.find_by(id: project_id)
return if project.nil?

flow = flow_id.present? ? Flow.find_by(id: flow_id) : nil
return if flow_id.present? && flow.nil?

response = Velorum::GenerateFlowService.new(
nil,
project: project,
prompt: prompt,
model_identifier: model_identifier,
flow: flow,
authorize: false
).execute
return unless response.success?

SubscriptionTriggers.velorum_generate_flow(execution_identifier, response.payload[:flow])
end
end
10 changes: 10 additions & 0 deletions app/models/flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,14 @@ def to_grpc
signature: signature
)
end

def to_generation_grpc
Tucana::Shared::GenerationFlow.new(
name: name,
type: flow_type.identifier,
starting_node_id: starting_node&.id&.to_s,
settings: flow_settings.map(&:to_grpc),
node_functions: node_functions.map(&:to_grpc)
)
end
end
19 changes: 19 additions & 0 deletions app/models/flow_type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,23 @@ def parsed_version
def runtime_identifier
runtime_flow_type&.identifier
end

def to_grpc
Tucana::Shared::FlowType.new(
identifier: identifier,
settings: flow_type_settings.map(&:to_grpc),
editable: editable,
name: names.map(&:to_grpc),
description: descriptions.map(&:to_grpc),
documentation: documentations.map(&:to_grpc),
display_message: display_messages.map(&:to_grpc),
alias: aliases.map(&:to_grpc),
version: version,
display_icon: display_icon,
definition_source: definition_source,
linked_data_type_identifiers: referenced_data_types.map(&:identifier),
signature: signature,
runtime_identifier: runtime_identifier
)
end
end
14 changes: 14 additions & 0 deletions app/models/flow_type_setting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,18 @@ class FlowTypeSetting < ApplicationRecord

scope :active, -> { where(removed_at: nil) }
scope :removed, -> { where.not(removed_at: nil) }

def to_grpc
args = {
identifier: identifier,
unique: unique.to_s.upcase.to_sym,
name: names.map(&:to_grpc),
description: descriptions.map(&:to_grpc),
optional: optional,
hidden: hidden,
default_value: Tucana::Shared::Value.from_ruby(default_value),
}

Tucana::Shared::FlowTypeSetting.new(**args)
end
end
140 changes: 140 additions & 0 deletions app/services/velorum/generate_flow_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# frozen_string_literal: true

module Velorum
class GenerateFlowService
CACHE_KEY_PREFIX = 'velorum/generate_flow_definitions'

def initialize(
current_authentication,
project:,
prompt:,
model_identifier:,
flow: nil,
client: nil,
cache: Rails.cache,
config: Sagittarius::Configuration.config[:velorum],
authorize: true
)
@current_authentication = current_authentication
@project = project
@prompt = prompt
@model_identifier = model_identifier
@flow = flow
@client = client
@cache = cache
@config = config
@authorize = authorize
end

def execute
return missing_permission_response if authorize? && !velorum_config_allowed?
return disabled_response unless config[:enabled]
return flow_project_mismatch_response if flow.present? && flow.project != project
return missing_permission_response if authorize? && !allowed?
return no_primary_runtime_response if runtime.nil?

response = flow.present? ? client.flow(flow_request) : client.prompt(prompt_request)
write_cache(response.cached_until)

ServiceResponse.success(
message: 'Generated flow',
payload: {
flow: GenerationFlowSerializer.new(response.flow).to_h,
cached_until: response.cached_until,
usage: response.usage,
}
)
end

private

attr_reader :current_authentication, :project, :prompt, :model_identifier, :flow, :cache, :config

def authorize?
@authorize
end

def prompt_request
Tucana::Velorum::PromptRequest.new(**base_request_args)
end

def flow_request
Tucana::Velorum::FlowRequest.new(**base_request_args, flow: flow.to_generation_grpc)
end

def base_request_args
args = {
prompt: prompt,
project_id: project.id,
model_identifier: model_identifier,
}

return args if definitions_cached?

args.merge(
functions: runtime.function_definitions.map(&:to_grpc),
data_types: runtime.data_types.map(&:to_grpc),
flow_types: runtime.flow_types.map(&:to_grpc)
)
end

def definitions_cached?
cached_until = cache.read(cache_key).to_i
cached_until > current_time_ms
end

def write_cache(cached_until)
return if cached_until.to_i <= current_time_ms

cache.write(cache_key, cached_until, expires_in: ((cached_until - current_time_ms) / 1000.0).ceil.seconds)
end

def cache_key
[
CACHE_KEY_PREFIX,
"project:#{project.id}",
"runtime:#{runtime.id}",
"model:#{model_identifier}"
].join(':')
end

def current_time_ms
(Time.now.to_f * 1000).to_i
end

def runtime
project.primary_runtime
end

def client
@client ||= Sagittarius::Velorum::Client.new
end

def velorum_config_allowed?
Ability.allowed?(current_authentication, :read_velorum_config, :global)
end

def allowed?
ability = flow.present? ? :update_flow : :create_flow
subject = flow || project

Ability.allowed?(current_authentication, ability, subject)
end

def disabled_response
ServiceResponse.error(message: 'Velorum is disabled', error_code: :invalid_setting)
end

def flow_project_mismatch_response
ServiceResponse.error(message: 'Flow does not belong to the project', error_code: :invalid_flow)
end

def missing_permission_response
ServiceResponse.error(message: 'Missing permission', error_code: :missing_permission)
end

def no_primary_runtime_response
ServiceResponse.error(message: 'Project has no primary runtime', error_code: :no_primary_runtime)
end
end
end
Loading