diff --git a/README.md b/README.md index 48e9f20..9d286d9 100644 --- a/README.md +++ b/README.md @@ -47,100 +47,163 @@ Here's a simple example to get you started: ```ruby require 'mars' -# Define agents -class Agent1 < MARS::Agent +# Define a RubyLLM agent +class ResolveCountryAgent < RubyLLM::Agent + instructions "Answer with only the country name." end -class Agent2 < MARS::Agent +# Wrap the agent in a MARS step +class ResolveCountry < MARS::AgentStep + agent ResolveCountryAgent end -class Agent3 < MARS::Agent +# Plain Ruby steps subclass MARS::Step +class ResearchFood < MARS::Step + def run(input, ctx: {}) + result(value: "Typical food of #{input.value}") + end end -# Create agents -agent1 = Agent1.new -agent2 = Agent2.new -agent3 = Agent3.new +class ResearchSports < MARS::Step + def run(input, ctx: {}) + result(value: "Popular sports of #{input.value}") + end +end + +class BuildReport < MARS::Aggregator + def run(results, ctx: {}) + result( + value: { + country: ctx[:resolve_country].value, + food: results[0].value, + sports: results[1].value + } + ) + end +end -# Create a sequential workflow workflow = MARS::Workflows::Sequential.new( - "My First Workflow", - steps: [agent1, agent2, agent3] + "Country Report", + steps: [ + ResolveCountry.new, + MARS::Workflows::Parallel.new( + "country_details", + steps: [ + ResearchFood.new, + ResearchSports.new + ], + aggregator: BuildReport.new + ) + ] ) -# Run the workflow result = workflow.run("Your input here") +pp result.value ``` ## Core Concepts -### Agents +### Steps -Agents are the basic building blocks of MARS. They represent individual units of work: +Every executable object in MARS responds to `run`. Plain Ruby steps subclass `MARS::Step`: ```ruby -class CustomAgent < MARS::Agent - def system_prompt - "You are a helpful assistant" +class NormalizeQuestion < MARS::Step + def run(input, ctx: {}) + result(value: input.value.strip) end end -agent = CustomAgent.new( - options: { model: "gpt-4o" } -) +step = NormalizeQuestion.new +``` + +### Agent Steps + +`MARS::AgentStep` is a thin wrapper around a configured `RubyLLM::Agent`: + +```ruby +class CountryAgent < RubyLLM::Agent + instructions "Answer with only the country name." +end + +class ResolveCountry < MARS::AgentStep + agent CountryAgent +end ``` ### Sequential Workflows -Execute agents one after another, passing outputs as inputs: +Sequential workflows execute steps one after another, passing the previous output to the next step: ```ruby -sequential = MARS::Workflows::Sequential.new( +workflow = MARS::Workflows::Sequential.new( "Sequential Pipeline", - steps: [agent1, agent2, agent3] + steps: [ResolveCountry.new, NormalizeQuestion.new] ) ``` ### Parallel Workflows -Run multiple agents concurrently and aggregate their results: +Parallel workflows use ordered `steps:`. Without an aggregator they return an array of step outputs. With an aggregator they return a single value: ```ruby -aggregator = MARS::Aggregator.new( - "Results Aggregator", - operation: lambda { |results| results.join(", ") } -) +class BuildReport < MARS::Aggregator + def run(results, ctx: {}) + result( + value: { + country: ctx[:resolve_country].value, + food: results[0].value, + sports: results[1].value + } + ) + end +end parallel = MARS::Workflows::Parallel.new( "Parallel Pipeline", - steps: [agent1, agent2, agent3], - aggregator: aggregator + steps: [ + ResearchFood.new, + ResearchSports.new + ], + aggregator: BuildReport.new ) ``` ### Gates -Gates act as guards that either let the workflow continue or divert to a fallback path: +Gates branch out of the happy path when a condition matches. If the `check` returns `nil`, the workflow continues normally. If it returns a branch key, the selected branch runs and the current workflow stops: ```ruby +class TooBroad < MARS::Step + def run(input, ctx: {}) + result( + value: { + error: "Please ask about one country", + resolved_value: input.value + } + ) + end +end + gate = MARS::Gate.new( - "Validation Gate", - check: ->(input) { :failure unless input[:score] > 0.5 }, - fallbacks: { - failure: failure_workflow + "country_guard", + check: ->(input, _ctx) { :too_broad if input.value.split.size > 5 }, + branches: { + too_broad: TooBroad.new } ) ``` -Control halt scope — `:local` (default) stops only the parent workflow, `:global` propagates to the root: +### Context And Result + +Steps receive a shared `ctx:` object and workflows always return `MARS::Result`: ```ruby -gate = MARS::Gate.new( - "Critical Gate", - check: ->(input) { :error unless input[:valid] }, - fallbacks: { error: error_workflow }, - halt_scope: :global -) +result = workflow.run("Which is the largest country in Europe?") + +result.value # final workflow output +result.outputs[:research_food] # output captured for a step +result.stopped? # whether a gate branched out of the happy path ``` ### Visualization diff --git a/examples/complex_llm_workflow/generator.rb b/examples/complex_llm_workflow/generator.rb index 6bc1436..a0a5b95 100755 --- a/examples/complex_llm_workflow/generator.rb +++ b/examples/complex_llm_workflow/generator.rb @@ -1,6 +1,9 @@ #!/usr/bin/env ruby # frozen_string_literal: true +require "json" +require "net/http" +require "uri" require_relative "../../lib/mars" RubyLLM.configure do |config| @@ -37,75 +40,94 @@ def execute(latitude:, longitude:) end end -# Define LLMs -class Agent1 < MARS::AgentStep - def system_prompt - "You are a helpful assistant that can answer questions. - When asked about a country, only answer with its name." - end +class ResolveCountryAgent < RubyLLM::Agent + instructions "Answer with only the country name." end -class Agent2 < MARS::AgentStep - def system_prompt - "You are a helpful assistant that can answer questions and help with tasks. - Return information about the typical food of the country." - end +class TypicalFoodAgent < RubyLLM::Agent + instructions "Return information about the typical food of the country." end -class Agent3 < MARS::AgentStep - def system_prompt - "You are a helpful assistant that can answer questions and help with tasks. - Return information about the popular sports of the country." - end +class PopularSportsAgent < RubyLLM::Agent + instructions "Return information about the popular sports of the country." + schema SportsSchema.new +end - def schema - SportsSchema.new - end +class CapitalWeatherAgent < RubyLLM::Agent + instructions "Return the current weather of the country's capital." + tools Weather.new end -class Agent4 < MARS::AgentStep - def system_prompt - "You are a helpful assistant that can answer questions and help with tasks. - Return the current weather of the country's capital." - end +class ResolveCountry < MARS::AgentStep + agent ResolveCountryAgent +end + +class TypicalFood < MARS::AgentStep + agent TypicalFoodAgent +end - def tools - [Weather.new] +class PopularSports < MARS::AgentStep + agent PopularSportsAgent +end + +class CapitalWeather < MARS::AgentStep + agent CapitalWeatherAgent +end + +class TooBroad < MARS::Step + def run(input, ctx: {}) + result( + value: { + error: "Please ask about one country", + resolved_value: input.value + } + ) end end -# Create the LLMs -llm1 = Agent1.new -llm2 = Agent2.new -llm3 = Agent3.new -llm4 = Agent4.new +class BuildReport < MARS::Aggregator + def run(results, ctx: {}) + result( + value: { + country: ctx[:resolve_country].value, + food: results[0].value, + sports: results[1].value, + weather: results[2].value + } + ) + end +end parallel_workflow = MARS::Workflows::Parallel.new( "Parallel workflow", - steps: [llm2, llm3, llm4] -) - -error_workflow = MARS::Workflows::Sequential.new( - "Error workflow", - steps: [] + steps: [ + TypicalFood.new, + PopularSports.new, + CapitalWeather.new + ], + aggregator: BuildReport.new ) gate = MARS::Gate.new( - check: ->(input) { :failure unless input.split.length < 10 }, - fallbacks: { - failure: error_workflow + "country_guard", + check: ->(input, _ctx) { :failure unless input.value.split.length < 10 }, + branches: { + failure: TooBroad.new } ) sequential_workflow = MARS::Workflows::Sequential.new( "Sequential workflow", - steps: [llm1, gate, parallel_workflow] + steps: [ + ResolveCountry.new, + gate, + parallel_workflow + ] ) -# Generate and save the diagram diagram = MARS::Rendering::Mermaid.new(sequential_workflow).render File.write("examples/complex_llm_workflow/diagram.md", diagram) puts "Complex workflow diagram saved to: examples/complex_llm_workflow/diagram.md" -# Run the workflow -puts sequential_workflow.run("Which is the largest country in Europe?") +result = sequential_workflow.run("Which is the largest country in Europe?") +pp result.value diff --git a/examples/complex_workflow/generator.rb b/examples/complex_workflow/generator.rb index 1247e6d..7080cd1 100755 --- a/examples/complex_workflow/generator.rb +++ b/examples/complex_workflow/generator.rb @@ -3,63 +3,82 @@ require_relative "../../lib/mars" -# Define LLMs -class Agent1 < MARS::AgentStep +class NormalizeQuestion < MARS::Step + def run(input, ctx: {}) + result(value: input.value.strip) + end end -class Agent2 < MARS::AgentStep +class ResearchFood < MARS::Step + def run(input, ctx: {}) + result(value: "Typical food of #{input.value}") + end end -class Agent3 < MARS::AgentStep +class ResearchSports < MARS::Step + def run(input, ctx: {}) + result(value: "Popular sports of #{input.value}") + end end -class Agent4 < MARS::AgentStep +class ResearchWeather < MARS::Step + def run(input, ctx: {}) + result(value: "Current weather in the capital of #{input.value}") + end end -class Agent5 < MARS::AgentStep +class TooBroad < MARS::Step + def run(input, ctx: {}) + result( + value: { + error: "Please ask about one country", + resolved_value: input.value + } + ) + end end -# Create the LLMs -llm1 = Agent1.new -llm2 = Agent2.new -llm3 = Agent3.new -llm4 = Agent4.new -llm5 = Agent5.new - -# Create a parallel workflow (LLM 2 x LLM 3) parallel_workflow = MARS::Workflows::Parallel.new( "Parallel workflow", - steps: [llm2, llm3] + steps: [ + ResearchFood.new, + ResearchSports.new + ] ) -# Create a sequential workflow (Parallel workflow -> LLM 4) sequential_workflow = MARS::Workflows::Sequential.new( "Sequential workflow", - steps: [llm4, parallel_workflow] + steps: [ + parallel_workflow, + ResearchWeather.new + ] ) -# Create a parallel workflow (Sequential workflow x LLM 5) parallel_workflow2 = MARS::Workflows::Parallel.new( "Parallel workflow 2", - steps: [sequential_workflow, llm5] + steps: [ + sequential_workflow, + NormalizeQuestion.new + ] ) -# Create the gate that decides between exit or continue gate = MARS::Gate.new( - check: ->(input) { input[:result] }, - fallbacks: { - warning: sequential_workflow, - error: parallel_workflow + "country_guard", + check: ->(input, _ctx) { :too_broad if input.value.split.size > 5 }, + branches: { + too_broad: TooBroad.new } ) -# Create the main workflow: LLM 1 -> Gate main_workflow = MARS::Workflows::Sequential.new( "Main Pipeline", - steps: [llm1, gate, parallel_workflow2] + steps: [ + NormalizeQuestion.new, + gate, + parallel_workflow2 + ] ) -# Generate and save the diagram diagram = MARS::Rendering::Mermaid.new(main_workflow).render File.write("examples/complex_workflow/diagram.md", diagram) puts "Complex workflow diagram saved to: examples/complex_workflow/diagram.md" diff --git a/examples/parallel_workflow/generator.rb b/examples/parallel_workflow/generator.rb index 66378bd..c6592cc 100755 --- a/examples/parallel_workflow/generator.rb +++ b/examples/parallel_workflow/generator.rb @@ -3,31 +3,33 @@ require_relative "../../lib/mars" -# Define the LLMs -class Agent1 < MARS::AgentStep +class ResearchFood < MARS::Step + def run(input, ctx: {}) + result(value: "Typical food of #{input.value}") + end end -class Agent2 < MARS::AgentStep +class ResearchSports < MARS::Step + def run(input, ctx: {}) + result(value: "Popular sports of #{input.value}") + end end -class Agent3 < MARS::AgentStep +class ResearchWeather < MARS::Step + def run(input, ctx: {}) + result(value: "Current weather in the capital of #{input.value}") + end end -# Create the LLMs -llm1 = Agent1.new -llm2 = Agent2.new -llm3 = Agent3.new - -aggregator = MARS::Aggregator.new("Aggregator", operation: lambda(&:sum)) - -# Create the parallel workflow (LLM 1, LLM 2, LLM 3) parallel_workflow = MARS::Workflows::Parallel.new( "Parallel workflow", - steps: [llm1, llm2, llm3], - aggregator: aggregator + steps: [ + ResearchFood.new, + ResearchSports.new, + ResearchWeather.new + ] ) -# Generate and save the diagram diagram = MARS::Rendering::Mermaid.new(parallel_workflow).render File.write("examples/parallel_workflow/diagram.md", diagram) puts "Parallel workflow diagram saved to: examples/parallel_workflow/diagram.md" diff --git a/examples/simple_workflow/generator.rb b/examples/simple_workflow/generator.rb index 5ac45c3..7818dfd 100755 --- a/examples/simple_workflow/generator.rb +++ b/examples/simple_workflow/generator.rb @@ -3,46 +3,41 @@ require_relative "../../lib/mars" -# Define the LLMs -class Agent1 < MARS::AgentStep +class ResolveCountryAgent < RubyLLM::Agent + instructions "Answer with only the country name." end -class Agent2 < MARS::AgentStep +class ResolveCountry < MARS::AgentStep + agent ResolveCountryAgent end -class Agent3 < MARS::AgentStep +class TooBroad < MARS::Step + def run(input, ctx: {}) + result( + value: { + error: "Please ask about one country", + resolved_value: input.value + } + ) + end end -class Agent4 < MARS::AgentStep -end - -# Create the LLMs -llm1 = Agent1.new -llm2 = Agent2.new -llm3 = Agent3.new -llm4 = Agent4.new - -# Create the failure workflow (LLM 3) -failure_workflow = MARS::Workflows::Sequential.new( - "Failure workflow", - steps: [llm4] -) - -# Create the gate that decides between exit or continue gate = MARS::Gate.new( - check: ->(input) { input[:result] }, - fallbacks: { - failure: failure_workflow + "country_guard", + check: ->(input, _ctx) { :too_broad if input.value.split.size > 5 }, + branches: { + too_broad: TooBroad.new } ) -# Create the main workflow: LLM 1 -> Gate main_workflow = MARS::Workflows::Sequential.new( "Main Pipeline", - steps: [llm1, gate, llm2, llm3] + steps: [ + ResolveCountry.new, + gate + ] ) -# Generate and save the diagram diagram = MARS::Rendering::Mermaid.new(main_workflow).render File.write("examples/simple_workflow/diagram.md", diagram) puts "Simple workflow diagram saved to: examples/simple_workflow/diagram.md" diff --git a/lib/mars/agent_step.rb b/lib/mars/agent_step.rb index a519997..c5f5755 100644 --- a/lib/mars/agent_step.rb +++ b/lib/mars/agent_step.rb @@ -1,15 +1,15 @@ # frozen_string_literal: true module MARS - class AgentStep < Runnable + class AgentStep < Step class << self def agent(klass = nil) klass ? @agent_class = klass : @agent_class end end - def run(input) - self.class.agent.new.ask(input).content + def run(input, ctx: {}) + result(value: self.class.agent.new.ask(input.value).content) end end end diff --git a/lib/mars/aggregator.rb b/lib/mars/aggregator.rb index d21b3bd..f2e46cc 100644 --- a/lib/mars/aggregator.rb +++ b/lib/mars/aggregator.rb @@ -1,17 +1,17 @@ # frozen_string_literal: true module MARS - class Aggregator < Runnable + class Aggregator < Step attr_reader :operation def initialize(name = "Aggregator", operation: nil, **kwargs) super(name: name, **kwargs) - @operation = operation || ->(inputs) { inputs } + @operation = operation || ->(inputs, _ctx) { inputs } end - def run(inputs) - operation.call(inputs) + def run(inputs, ctx: {}) + Result.wrap(operation.call(inputs, ctx)) end end end diff --git a/lib/mars/context.rb b/lib/mars/context.rb new file mode 100644 index 0000000..564e777 --- /dev/null +++ b/lib/mars/context.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module MARS + class Context + class Stop < StandardError + attr_reader :result + + def initialize(result) + @result = Result.wrap(result, stopped: true) + super("Workflow stopped") + end + end + + attr_reader :current_input, :outputs, :state + + def initialize(input: nil, state: {}, global_state: nil) + @current_input = Result.wrap(input) + @outputs = {} + @state = global_state || state + end + + def [](step_name) + outputs[step_name.to_sym] + end + + def fetch(step_name, *default, &block) + outputs.fetch(step_name.to_sym, *default, &block) + end + + def record(step_name, output) + formatted = Result.wrap(output) + @outputs[step_name.to_sym] = formatted + @current_input = formatted + end + + def fork(input: current_input) + self.class.new(input: input, state: state) + end + + def merge(child_contexts) + child_contexts.each do |child| + @outputs.merge!(child.outputs) + end + + self + end + + def stop!(value = current_input) + raise Stop.new(value) + end + + alias_method :global_state, :state + end +end diff --git a/lib/mars/execution_context.rb b/lib/mars/execution_context.rb deleted file mode 100644 index 2e4b7be..0000000 --- a/lib/mars/execution_context.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -module MARS - class ExecutionContext - attr_reader :current_input, :outputs, :global_state - - def initialize(input: nil, global_state: {}) - @current_input = input - @outputs = {} - @global_state = global_state - end - - def [](step_name) - outputs[step_name.to_sym] - end - - def record(step_name, output) - @outputs[step_name.to_sym] = output - @current_input = output - end - - def fork(input: current_input) - self.class.new(input: input, global_state: global_state) - end - - def merge(child_contexts) - child_contexts.each do |child| - @outputs.merge!(child.outputs) - end - - self - end - end -end diff --git a/lib/mars/formatter.rb b/lib/mars/formatter.rb index f82fb9c..2569567 100644 --- a/lib/mars/formatter.rb +++ b/lib/mars/formatter.rb @@ -7,7 +7,7 @@ def format_input(context) end def format_output(output) - output + Result.wrap(output) end end end diff --git a/lib/mars/gate.rb b/lib/mars/gate.rb index 5ce7502..b2a9471 100644 --- a/lib/mars/gate.rb +++ b/lib/mars/gate.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module MARS - class Gate < Runnable + class Gate < Step class << self def check(&block) @check_block = block @@ -9,41 +9,47 @@ def check(&block) attr_reader :check_block - def fallback(key, runnable) - fallbacks_map[key] = runnable + def branch(key, runnable) + branches_map[key] = runnable end - def fallbacks_map - @fallbacks_map ||= {} + def fallback(key, runnable) + branch(key, runnable) end - def halt_scope(scope = nil) - scope ? @halt_scope = scope : @halt_scope + def branches_map + @branches_map ||= {} end end - def initialize(name = "Gate", check: nil, fallbacks: nil, halt_scope: nil, **kwargs) + def initialize(name = "Gate", check: nil, branches: nil, fallbacks: nil, **kwargs) super(name: name, **kwargs) @check = check || self.class.check_block - @fallbacks = fallbacks || self.class.fallbacks_map - @halt_scope = halt_scope || self.class.halt_scope || :local + @branches = branches || fallbacks || self.class.branches_map end - def run(input) - result = check.call(input) + def run(input, ctx: {}) + input = Result.wrap(input) + local_ctx = ctx.is_a?(Context) ? ctx : Context.new(input: input) + decision = evaluate_check(input, local_ctx) - return input unless result + return input unless decision - branch = fallbacks[result] - raise ArgumentError, "No fallback registered for #{result.inspect}" unless branch + branch = branches[decision] + raise ArgumentError, "No branch registered for #{decision.inspect}" unless branch - Halt.new(resolve_branch(branch).run(input), scope: @halt_scope) + branch_result = resolve_branch(branch).run(input, ctx: local_ctx.fork(input: input)) + Result.wrap(branch_result, stopped: true) end private - attr_reader :check, :fallbacks + attr_reader :check, :branches + + def evaluate_check(input, ctx) + check.call(input, ctx) + end def resolve_branch(branch) branch.is_a?(Class) ? branch.new : branch diff --git a/lib/mars/halt.rb b/lib/mars/halt.rb deleted file mode 100644 index 043e80e..0000000 --- a/lib/mars/halt.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -module MARS - class Halt - attr_reader :result, :scope - - def initialize(result, scope: :local) - @result = result - @scope = scope - end - - def local? = scope == :local - def global? = scope == :global - end -end diff --git a/lib/mars/rendering/graph/gate.rb b/lib/mars/rendering/graph/gate.rb index ccc5099..daf201a 100644 --- a/lib/mars/rendering/graph/gate.rb +++ b/lib/mars/rendering/graph/gate.rb @@ -6,17 +6,17 @@ module Graph module Gate include Base - def to_graph(builder, parent_id: nil, value: nil) - builder.add_node(node_id, name, Node::GATE) - builder.add_edge(parent_id, node_id, value) + def to_graph(builder, parent_id: nil, value: nil) + builder.add_node(node_id, name, Node::GATE) + builder.add_edge(parent_id, node_id, value) - sink_nodes = fallbacks.map do |fallback_key, branch| - branch.to_graph(builder, parent_id: node_id, value: fallback_key) - end - - sink_nodes.flatten + sink_nodes = branches.map do |branch_key, branch| + branch.to_graph(builder, parent_id: node_id, value: branch_key) end + + sink_nodes.flatten end end + end end end diff --git a/lib/mars/rendering/graph/parallel_workflow.rb b/lib/mars/rendering/graph/parallel_workflow.rb index 2c8493b..cbd34cd 100644 --- a/lib/mars/rendering/graph/parallel_workflow.rb +++ b/lib/mars/rendering/graph/parallel_workflow.rb @@ -8,25 +8,30 @@ module ParallelWorkflow def to_graph(builder, parent_id: nil, value: nil) builder.add_subgraph(node_id, name) if steps.any? - builder.add_node(aggregator.node_id, aggregator.name, Node::STEP) + builder.add_node(aggregator.node_id, aggregator.name, Node::STEP) if aggregator - build_steps_graph(builder, parent_id, value) + sink_nodes = build_steps_graph(builder, parent_id, value) - [aggregator.node_id] + aggregator ? [aggregator.node_id] : sink_nodes end private def build_steps_graph(builder, parent_id, value) + all_sink_nodes = [] + steps.each do |step| sink_nodes = step.to_graph(builder, parent_id: parent_id, value: value) + all_sink_nodes.concat(sink_nodes) builder.add_node_to_subgraph(node_id, step.node_id) sink_nodes.each do |sink_node| - aggregator.to_graph(builder, parent_id: sink_node) + builder.add_edge(sink_node, aggregator.node_id) if aggregator end end + + all_sink_nodes end end end diff --git a/lib/mars/result.rb b/lib/mars/result.rb new file mode 100644 index 0000000..ee24414 --- /dev/null +++ b/lib/mars/result.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +module MARS + class Result + attr_reader :value, :outputs, :state + + def self.wrap(result, stopped: nil, outputs: nil, state: nil) + wrapped = + case result + when self + result + when Hash + if envelope_hash?(result) + new( + value: result[:value], + stopped: result.fetch(:stopped, false), + outputs: result.fetch(:outputs, {}), + state: result[:state] + ) + else + new(value: result) + end + else + new(value: result) + end + + wrapped.with( + stopped: stopped.nil? ? wrapped.stopped? : stopped, + outputs: outputs || wrapped.outputs, + state: state || wrapped.state + ) + end + + def initialize(value:, stopped: false, outputs: {}, state: nil) + @value = value + @stopped = stopped + @outputs = outputs + @state = state + end + + def [](key) + case key.to_sym + when :value + value + when :stopped + stopped? + when :outputs + outputs + when :state + state + else + outputs[key.to_sym] + end + end + + def stopped? + @stopped + end + + def ok? + !stopped? + end + + def with(value: self.value, stopped: stopped?, outputs: self.outputs, state: self.state) + self.class.new( + value: value, + stopped: stopped, + outputs: outputs, + state: state + ) + end + + def to_h + { + value: value, + stopped: stopped?, + outputs: outputs, + state: state + } + end + + def ==(other) + other.is_a?(self.class) && to_h == other.to_h + end + + def self.envelope_hash?(result) + result.key?(:value) || result.key?(:stopped) || result.key?(:outputs) || result.key?(:state) + end + private_class_method :envelope_hash? + end +end diff --git a/lib/mars/runnable.rb b/lib/mars/runnable.rb index ea071e5..4b97f71 100644 --- a/lib/mars/runnable.rb +++ b/lib/mars/runnable.rb @@ -1,33 +1,5 @@ # frozen_string_literal: true module MARS - class Runnable - include Hooks - - attr_reader :name, :formatter - attr_accessor :state - - class << self - def step_name - return @step_name if defined?(@step_name) - return unless name - - name.split("::").last.gsub(/([a-z])([A-Z])/, '\1_\2').downcase - end - - def formatter(klass = nil) - klass ? @formatter_class = klass : @formatter_class - end - end - - def initialize(name: self.class.step_name, state: {}, formatter: nil) - @name = name - @state = state - @formatter = formatter || self.class.formatter&.new || Formatter.new - end - - def run(input) - raise NotImplementedError - end - end + Runnable = Step end diff --git a/lib/mars/step.rb b/lib/mars/step.rb new file mode 100644 index 0000000..f886cf9 --- /dev/null +++ b/lib/mars/step.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +module MARS + class Step + include Hooks + + attr_reader :name, :formatter + attr_accessor :state + + class << self + def step_name + return @step_name if defined?(@step_name) + return unless name + + name.split("::").last.gsub(/([a-z])([A-Z])/, '\1_\2').downcase + end + + def formatter(klass = nil) + klass ? @formatter_class = klass : @formatter_class + end + end + + def initialize(name: self.class.step_name, state: {}, formatter: nil) + @name = name + @state = state + @formatter = formatter || self.class.formatter&.new || Formatter.new + end + + def run(_input, ctx: {}) + raise NotImplementedError + end + + private + + def result(value:, stopped: false) + Result.new(value: value, stopped: stopped) + end + end +end diff --git a/lib/mars/workflows/parallel.rb b/lib/mars/workflows/parallel.rb index 12c5d41..dd11533 100644 --- a/lib/mars/workflows/parallel.rb +++ b/lib/mars/workflows/parallel.rb @@ -2,16 +2,17 @@ module MARS module Workflows - class Parallel < Runnable + class Parallel < Step def initialize(name, steps:, aggregator: nil, **kwargs) super(name: name, **kwargs) @steps = steps - @aggregator = aggregator || Aggregator.new("#{name} Aggregator") + @aggregator = aggregator end - def run(input) - context = ensure_context(input) + def run(input, ctx: {}) + nested = ctx.is_a?(Context) + context = nested ? ctx : ensure_context(input) errors = [] child_contexts = [] results = execute_steps(context, errors, child_contexts) @@ -20,16 +21,25 @@ def run(input) context.merge(child_contexts) - has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? } - unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r } - result = aggregator.run(unwrapped) - has_global_halt ? Halt.new(result, scope: :global) : result + value = aggregator ? aggregator.run(results, ctx: context) : result(value: results) + + return Result.wrap(value, stopped: false) if nested + + Result.wrap( + value, + outputs: context.outputs.dup, + state: context.state + ) end private attr_reader :steps, :aggregator + def ensure_context(input) + input.is_a?(Context) ? input : Context.new(input: input) + end + def execute_steps(context, errors, child_contexts) Async do |workflow| tasks = steps.map do |step| @@ -37,31 +47,31 @@ def execute_steps(context, errors, child_contexts) child_contexts << child_ctx workflow.async do - step.run_before_hooks(child_ctx) - - step_input = step.formatter.format_input(child_ctx) - result = step.run(step_input) - - if result.is_a?(Halt) - step.run_after_hooks(child_ctx, result) - result - else - formatted = step.formatter.format_output(result) - child_ctx.record(step.name, formatted) - step.run_after_hooks(child_ctx, formatted) - formatted - end + execute_step(step, child_ctx) rescue StandardError => e errors << { error: e, step_name: step.name } end end - tasks.map(&:wait) + tasks.map(&:wait).compact end.result end - def ensure_context(input) - input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input) + def execute_step(step, child_ctx) + step.run_before_hooks(child_ctx) + + step_input = Result.wrap(step.formatter.format_input(child_ctx)) + result = step.run(step_input, ctx: child_ctx) + formatted = Result.wrap(step.formatter.format_output(result)) + + child_ctx.record(step.name, formatted) + step.run_after_hooks(child_ctx, formatted) + formatted + rescue Context::Stop => e + formatted = Result.wrap(step.formatter.format_output(e.result), stopped: true) + child_ctx.record(step.name, formatted) + step.run_after_hooks(child_ctx, formatted) + formatted end end end diff --git a/lib/mars/workflows/sequential.rb b/lib/mars/workflows/sequential.rb index 008dd18..51628c5 100644 --- a/lib/mars/workflows/sequential.rb +++ b/lib/mars/workflows/sequential.rb @@ -2,48 +2,60 @@ module MARS module Workflows - class Sequential < Runnable + class Sequential < Step def initialize(name, steps:, **kwargs) super(name: name, **kwargs) @steps = steps end - def run(input) - context = ensure_context(input) + def run(input, ctx: {}) + nested = ctx.is_a?(Context) + context = nested ? ctx : ensure_context(input) + value, stopped = execute(context) - @steps.each do |step| - step.run_before_hooks(context) + return Result.wrap(value, stopped: false) if nested - step_input = step.formatter.format_input(context) - result = step.run(step_input) + Result.wrap( + value, + stopped: stopped, + outputs: context.outputs.dup, + state: context.state + ) + end - if result.is_a?(Halt) - if result.global? - step.run_after_hooks(context, result) - return result - end + private - formatted = step.formatter.format_output(result.result) - context.record(step.name, formatted) - step.run_after_hooks(context, formatted) - break - end + attr_reader :steps - formatted = step.formatter.format_output(result) + def ensure_context(input) + input.is_a?(Context) ? input : Context.new(input: input) + end + + def execute(context) + steps.each do |step| + result = execute_step(step, context) + return [result, true] if result.stopped? + rescue Context::Stop => e + formatted = Result.wrap(step.formatter.format_output(e.result), stopped: true) context.record(step.name, formatted) step.run_after_hooks(context, formatted) + return [formatted, true] end - context.current_input + [context.current_input, false] end - private + def execute_step(step, context) + step.run_before_hooks(context) - attr_reader :steps + step_input = Result.wrap(step.formatter.format_input(context)) + result = step.run(step_input, ctx: context) + formatted = Result.wrap(step.formatter.format_output(result)) - def ensure_context(input) - input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input) + context.record(step.name, formatted) + step.run_after_hooks(context, formatted) + formatted end end end diff --git a/spec/mars/agent_step_spec.rb b/spec/mars/agent_step_spec.rb index 18244a5..13a2925 100644 --- a/spec/mars/agent_step_spec.rb +++ b/spec/mars/agent_step_spec.rb @@ -18,6 +18,8 @@ end describe "#run" do + let(:context) { MARS::Context.new(input: "hello") } + let(:mock_agent_instance) do instance_double("RubyLLM::Agent").tap do |mock| allow(mock).to receive(:ask).and_return(instance_double("RubyLLM::Message", content: "agent response")) @@ -39,9 +41,9 @@ it "creates a new agent instance and calls ask" do step = step_class.new - result = step.run("hello") + result = step.run(MARS::Result.new(value: "hello"), ctx: context) - expect(result).to eq("agent response") + expect(result).to eq(MARS::Result.new(value: "agent response")) expect(mock_agent_class).to have_received(:new) expect(mock_agent_instance).to have_received(:ask).with("hello") end @@ -49,7 +51,7 @@ describe "inheritance" do it "inherits from MARS::Runnable" do - expect(described_class.ancestors).to include(MARS::Runnable) + expect(described_class.ancestors).to include(MARS::Step) end it "has access to name, formatter, and hooks from Runnable" do diff --git a/spec/mars/aggregator_spec.rb b/spec/mars/aggregator_spec.rb index 408803e..6b0bcd3 100644 --- a/spec/mars/aggregator_spec.rb +++ b/spec/mars/aggregator_spec.rb @@ -6,17 +6,20 @@ let(:aggregator) { described_class.new } it "returns the input as is" do - result = aggregator.run([1, 2, 3]) - expect(result).to eq([1, 2, 3]) + inputs = [MARS::Result.new(value: 1), MARS::Result.new(value: 2), MARS::Result.new(value: 3)] + result = aggregator.run(inputs) + expect(result).to eq(MARS::Result.new(value: inputs)) end end context "when initialized with an operation" do - let(:aggregator) { described_class.new("Aggregator", operation: lambda(&:join)) } + let(:aggregator) do + described_class.new("Aggregator", operation: lambda { |results, _ctx| results.map(&:value).join }) + end it "executes the operation and returns its value" do - result = aggregator.run(%w[a b c]) - expect(result).to eq("abc") + result = aggregator.run([MARS::Result.new(value: "a"), MARS::Result.new(value: "b"), MARS::Result.new(value: "c")]) + expect(result).to eq(MARS::Result.new(value: "abc")) end end end diff --git a/spec/mars/execution_context_spec.rb b/spec/mars/execution_context_spec.rb index d30d47d..a3eb4fe 100644 --- a/spec/mars/execution_context_spec.rb +++ b/spec/mars/execution_context_spec.rb @@ -1,15 +1,15 @@ # frozen_string_literal: true -RSpec.describe MARS::ExecutionContext do +RSpec.describe MARS::Context do describe "#current_input" do it "returns the initial input" do context = described_class.new(input: "query") - expect(context.current_input).to eq("query") + expect(context.current_input).to eq(MARS::Result.new(value: "query")) end - it "returns nil when no input is provided" do + it "wraps nil when no input is provided" do context = described_class.new - expect(context.current_input).to be_nil + expect(context.current_input).to eq(MARS::Result.new(value: nil)) end end @@ -18,14 +18,14 @@ context = described_class.new(input: "query") context.record(:researcher, "research result") - expect(context[:researcher]).to eq("research result") + expect(context[:researcher]).to eq(MARS::Result.new(value: "research result")) end it "updates current_input to the recorded output" do context = described_class.new(input: "query") context.record(:researcher, "research result") - expect(context.current_input).to eq("research result") + expect(context.current_input).to eq(MARS::Result.new(value: "research result")) end it "tracks multiple step outputs" do @@ -33,8 +33,13 @@ context.record(:researcher, "research result") context.record(:summarizer, "summary") - expect(context.outputs).to eq({ researcher: "research result", summarizer: "summary" }) - expect(context.current_input).to eq("summary") + expect(context.outputs).to eq( + { + researcher: MARS::Result.new(value: "research result"), + summarizer: MARS::Result.new(value: "summary") + } + ) + expect(context.current_input).to eq(MARS::Result.new(value: "summary")) end end @@ -48,19 +53,19 @@ describe "#global_state" do it "defaults to an empty hash" do context = described_class.new(input: "query") - expect(context.global_state).to eq({}) + expect(context.state).to eq({}) end it "accepts initial global state" do context = described_class.new(input: "query", global_state: { user_id: 42 }) - expect(context.global_state).to eq({ user_id: 42 }) + expect(context.state).to eq({ user_id: 42 }) end it "is mutable" do context = described_class.new(input: "query") - context.global_state[:key] = "value" + context.state[:key] = "value" - expect(context.global_state[:key]).to eq("value") + expect(context.state[:key]).to eq("value") end end @@ -69,23 +74,23 @@ context = described_class.new(input: "query") child = context.fork - expect(child.current_input).to eq("query") + expect(child.current_input).to eq(MARS::Result.new(value: "query")) end it "creates a child context with a custom input" do context = described_class.new(input: "query") child = context.fork(input: "custom") - expect(child.current_input).to eq("custom") + expect(child.current_input).to eq(MARS::Result.new(value: "custom")) end it "shares global_state with the parent" do context = described_class.new(input: "query", global_state: { shared: true }) child = context.fork - child.global_state[:added_by_child] = true + child.state[:added_by_child] = true - expect(context.global_state[:added_by_child]).to be(true) + expect(context.state[:added_by_child]).to be(true) end it "has independent outputs from the parent" do @@ -113,9 +118,29 @@ context.merge([child1, child2]) - expect(context[:step1]).to eq("output1") - expect(context[:branch_a]).to eq("result_a") - expect(context[:branch_b]).to eq("result_b") + expect(context[:step1]).to eq(MARS::Result.new(value: "output1")) + expect(context[:branch_a]).to eq(MARS::Result.new(value: "result_a")) + expect(context[:branch_b]).to eq(MARS::Result.new(value: "result_b")) + end + end + + describe "#fetch" do + it "fetches a stored output by step name" do + context = described_class.new(input: "query") + context.record(:researcher, "research result") + + expect(context.fetch(:researcher)).to eq(MARS::Result.new(value: "research result")) + end + end + + describe "#stop!" do + it "raises an internal stop signal with the provided value" do + context = described_class.new(input: "query") + + expect { context.stop!("done") } + .to raise_error(MARS::Context::Stop) do |error| + expect(error.result).to eq(MARS::Result.new(value: "done", stopped: true)) + end end end end diff --git a/spec/mars/formatter_spec.rb b/spec/mars/formatter_spec.rb index 55b17f3..868b62a 100644 --- a/spec/mars/formatter_spec.rb +++ b/spec/mars/formatter_spec.rb @@ -5,14 +5,14 @@ describe "#format_input" do it "returns the context's current_input" do - context = MARS::ExecutionContext.new(input: "hello") - expect(formatter.format_input(context)).to eq("hello") + context = MARS::Context.new(input: "hello") + expect(formatter.format_input(context)).to eq(MARS::Result.new(value: "hello")) end end describe "#format_output" do - it "returns the output unchanged" do - expect(formatter.format_output("result")).to eq("result") + it "normalizes raw output into a result" do + expect(formatter.format_output("result")).to eq(MARS::Result.new(value: "result")) end end @@ -20,11 +20,11 @@ let(:custom_formatter_class) do Class.new(described_class) do def format_input(context) - context.current_input.upcase + MARS::Result.new(value: context.current_input.value.upcase) end def format_output(output) - "formatted: #{output}" + MARS::Result.new(value: "formatted: #{output.value}", stopped: output.stopped?) end end end @@ -32,12 +32,13 @@ def format_output(output) let(:custom_formatter) { custom_formatter_class.new } it "can override format_input" do - context = MARS::ExecutionContext.new(input: "hello") - expect(custom_formatter.format_input(context)).to eq("HELLO") + context = MARS::Context.new(input: "hello") + expect(custom_formatter.format_input(context)).to eq(MARS::Result.new(value: "HELLO")) end it "can override format_output" do - expect(custom_formatter.format_output("result")).to eq("formatted: result") + expect(custom_formatter.format_output(MARS::Result.new(value: "result"))) + .to eq(MARS::Result.new(value: "formatted: result")) end end end diff --git a/spec/mars/gate_spec.rb b/spec/mars/gate_spec.rb index cea8849..081a1ec 100644 --- a/spec/mars/gate_spec.rb +++ b/spec/mars/gate_spec.rb @@ -1,18 +1,20 @@ # frozen_string_literal: true RSpec.describe MARS::Gate do + let(:context) { MARS::Context.new(input: "hello") } + let(:fallback_step) do - Class.new(MARS::Runnable) do - def run(input) - "fallback: #{input}" + Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "fallback: #{input.value}") end end.new end let(:error_step) do - Class.new(MARS::Runnable) do - def run(input) - "error: #{input}" + Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "error: #{input.value}") end end.new end @@ -22,54 +24,50 @@ def run(input) it "passes through when check returns falsy" do gate = described_class.new( "PassGate", - check: ->(_input) {}, - fallbacks: { fail: fallback_step } + check: ->(_input, _ctx) {}, + branches: { fail: fallback_step } ) - expect(gate.run("hello")).to eq("hello") + expect(gate.run(MARS::Result.new(value: "hello"), ctx: context)).to eq(MARS::Result.new(value: "hello")) end - it "halts with fallback result when check returns a key" do + it "returns branch result when check returns a key" do gate = described_class.new( "FailGate", - check: ->(_input) { :fail }, - fallbacks: { fail: fallback_step } + check: ->(_input, _ctx) { :fail }, + branches: { fail: fallback_step } ) - result = gate.run("hello") - expect(result).to be_a(MARS::Halt) - expect(result.result).to eq("fallback: hello") + expect(gate.run(MARS::Result.new(value: "hello"), ctx: context)).to eq(MARS::Result.new(value: "fallback: hello", stopped: true)) end it "raises when check returns an unregistered key" do gate = described_class.new( "BadGate", - check: ->(_input) { :unknown }, - fallbacks: { fail: fallback_step } + check: ->(_input, _ctx) { :unknown }, + branches: { fail: fallback_step } ) - expect { gate.run("hello") }.to raise_error(ArgumentError, /No fallback registered for :unknown/) + expect { gate.run(MARS::Result.new(value: "hello"), ctx: context) }.to raise_error(ArgumentError, /No branch registered for :unknown/) end - it "selects among multiple fallbacks" do + it "selects among multiple branches" do gate = described_class.new( "MultiFallback", - check: ->(input) { input[:error_type] }, - fallbacks: { timeout: fallback_step, auth: error_step } + check: ->(input, _ctx) { input.value[:error_type] }, + branches: { timeout: fallback_step, auth: error_step } ) - input = { error_type: :auth } - result = gate.run(input) - expect(result).to be_a(MARS::Halt) - expect(result.result).to eq("error: #{input}") + input = MARS::Result.new(value: { error_type: :auth }) + expect(gate.run(input, ctx: context)).to eq(MARS::Result.new(value: "error: #{input.value}", stopped: true)) end end context "with class-level DSL" do let(:fallback_cls) do - Class.new(MARS::Runnable) do - def run(input) - "handled: #{input}" + Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "handled: #{input.value}") end end end @@ -77,52 +75,48 @@ def run(input) it "uses check and fallback DSL" do cls = fallback_cls gate_class = Class.new(described_class) do - check { |input| :invalid if input.length > 5 } - fallback :invalid, cls + check { |input, _ctx| :invalid if input.value.length > 5 } + branch :invalid, cls end gate = gate_class.new("DSLGate") - expect(gate.run("hi")).to eq("hi") - expect(gate.run("longstring").result).to eq("handled: longstring") - end - - it "supports halt_scope DSL" do - cls = fallback_cls - gate_class = Class.new(described_class) do - check { |_input| :fail } - fallback :fail, cls - halt_scope :global - end - - result = gate_class.new("GlobalGate").run("test") - expect(result).to be_a(MARS::Halt) - expect(result).to be_global + expect(gate.run(MARS::Result.new(value: "hi"), ctx: context)).to eq(MARS::Result.new(value: "hi")) + expect(gate.run(MARS::Result.new(value: "longstring"), ctx: context)).to eq(MARS::Result.new(value: "handled: longstring", stopped: true)) end end + end - context "with halt scope" do - it "defaults to local scope" do - gate = described_class.new( - "LocalGate", - check: ->(_input) { :fail }, - fallbacks: { fail: fallback_step } - ) - - result = gate.run("hello") - expect(result).to be_local - end - - it "respects constructor halt_scope" do - gate = described_class.new( - "GlobalGate", - check: ->(_input) { :fail }, - fallbacks: { fail: fallback_step }, - halt_scope: :global - ) - - result = gate.run("hello") - expect(result).to be_global - end + describe "inside a workflow" do + it "stops the current happy path after executing the selected branch" do + branch = Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "branched:#{input.value}") + end + end.new(name: "branch_step") + + gate = described_class.new( + "gate", + check: ->(_input, _ctx) { :branch }, + branches: { branch: branch } + ) + + workflow = MARS::Workflows::Sequential.new( + "gate_workflow", + steps: [ + gate, + Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "after:#{input.value}") + end + end.new(name: "after_step") + ] + ) + + result = workflow.run("hello") + expect(result).to be_stopped + expect(result.value).to eq("branched:hello") + expect(result.outputs[:gate]).to eq(MARS::Result.new(value: "branched:hello", stopped: true)) + expect(result[:after_step]).to be_nil end end end diff --git a/spec/mars/halt_spec.rb b/spec/mars/halt_spec.rb deleted file mode 100644 index da3b9c5..0000000 --- a/spec/mars/halt_spec.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe MARS::Halt do - describe "#scope" do - it "defaults to :local" do - halt = described_class.new("result") - expect(halt.scope).to eq(:local) - expect(halt).to be_local - expect(halt).not_to be_global - end - - it "can be set to :global" do - halt = described_class.new("result", scope: :global) - expect(halt.scope).to eq(:global) - expect(halt).to be_global - expect(halt).not_to be_local - end - end - - describe "#result" do - it "stores the result" do - halt = described_class.new("hello") - expect(halt.result).to eq("hello") - end - end -end diff --git a/spec/mars/hooks_spec.rb b/spec/mars/hooks_spec.rb index d304a7c..586d3bf 100644 --- a/spec/mars/hooks_spec.rb +++ b/spec/mars/hooks_spec.rb @@ -35,7 +35,7 @@ def initialize(name) hookable_class.before_run { |ctx, step| calls << [ctx, step.name] } step = hookable_class.new("test_step") - context = MARS::ExecutionContext.new(input: "input") + context = MARS::Context.new(input: "input") step.run_before_hooks(context) expect(calls).to eq([[context, "test_step"]]) @@ -47,7 +47,7 @@ def initialize(name) hookable_class.before_run { |_ctx, _step| order << :second } step = hookable_class.new("test_step") - step.run_before_hooks(MARS::ExecutionContext.new(input: "input")) + step.run_before_hooks(MARS::Context.new(input: "input")) expect(order).to eq(%i[first second]) end @@ -59,10 +59,11 @@ def initialize(name) hookable_class.after_run { |ctx, result, step| calls << [ctx, result, step.name] } step = hookable_class.new("test_step") - context = MARS::ExecutionContext.new(input: "input") - step.run_after_hooks(context, "the result") + context = MARS::Context.new(input: "input") + result = MARS::Result.new(value: "the result") + step.run_after_hooks(context, result) - expect(calls).to eq([[context, "the result", "test_step"]]) + expect(calls).to eq([[context, result, "test_step"]]) end end diff --git a/spec/mars/rendering/mermaid_spec.rb b/spec/mars/rendering/mermaid_spec.rb index f4f8da1..0a471a1 100644 --- a/spec/mars/rendering/mermaid_spec.rb +++ b/spec/mars/rendering/mermaid_spec.rb @@ -3,7 +3,7 @@ RSpec.describe MARS::Rendering::Mermaid do it "renders a custom Runnable subclass as a box node" do step = Class.new(MARS::Runnable) do - def run(input) = input + def run(input, ctx: {}) = input end.new(name: "custom_step") mermaid = described_class.new(step) @@ -23,10 +23,10 @@ def run(input) = input it "renders a Gate as a diamond node" do gate = MARS::Gate.new( "my_gate", - check: ->(_input) { :branch }, + check: ->(_input, _ctx) { :branch }, fallbacks: { branch: Class.new(MARS::Runnable) do - def run(input) = input + def run(input, ctx: {}) = input end.new(name: "branch_step") } ) @@ -54,7 +54,11 @@ def run(input) = input it "renders a Parallel workflow with aggregator" do step1 = MARS::AgentStep.new(name: "step1") step2 = MARS::AgentStep.new(name: "step2") - workflow = MARS::Workflows::Parallel.new("parallel", steps: [step1, step2]) + workflow = MARS::Workflows::Parallel.new( + "parallel", + steps: [step1, step2], + aggregator: MARS::Aggregator.new("parallel aggregator") + ) mermaid = described_class.new(workflow) output = mermaid.render diff --git a/spec/mars/result_spec.rb b/spec/mars/result_spec.rb new file mode 100644 index 0000000..99976b2 --- /dev/null +++ b/spec/mars/result_spec.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +RSpec.describe MARS::Result do + describe ".wrap" do + it "wraps raw values" do + expect(described_class.wrap("hello")).to eq(described_class.new(value: "hello")) + end + + it "passes through result instances" do + result = described_class.new(value: "hello") + expect(described_class.wrap(result)).to eq(result) + end + + it "does not treat domain hashes as envelopes" do + payload = { country: "Uruguay" } + expect(described_class.wrap(payload)).to eq(described_class.new(value: payload)) + end + end + + describe "#[]" do + it "reads core attributes and step outputs" do + child = described_class.new(value: "child") + result = described_class.new(value: "root", stopped: true, outputs: { step: child }, state: { user_id: 1 }) + + expect(result[:value]).to eq("root") + expect(result[:stopped]).to be(true) + expect(result[:outputs]).to eq(step: child) + expect(result[:state]).to eq(user_id: 1) + expect(result[:step]).to eq(child) + end + end +end diff --git a/spec/mars/runnable_spec.rb b/spec/mars/runnable_spec.rb index 972a549..cad1d15 100644 --- a/spec/mars/runnable_spec.rb +++ b/spec/mars/runnable_spec.rb @@ -6,15 +6,15 @@ let(:runnable) { described_class.new } it "raises NotImplementedError" do - expect { runnable.run("any input") }.to raise_error(NotImplementedError) + expect { runnable.run({ value: "any input" }, ctx: {}) }.to raise_error(NotImplementedError) end end context "when implemented in a subclass" do let(:test_runnable_class) do Class.new(MARS::Runnable) do - def run(input) - "processed: #{input}" + def run(input, ctx: {}) + MARS::Result.new(value: "processed: #{input.value}") end end end @@ -22,8 +22,8 @@ def run(input) let(:runnable) { test_runnable_class.new } it "can be successfully overridden" do - result = runnable.run("test input") - expect(result).to eq("processed: test input") + result = runnable.run(MARS::Result.new(value: "test input"), ctx: {}) + expect(result).to eq(MARS::Result.new(value: "processed: test input")) end end @@ -37,7 +37,7 @@ def run(input) let(:runnable) { incomplete_runnable_class.new } it "still raises NotImplementedError" do - expect { runnable.run("input") }.to raise_error(NotImplementedError) + expect { runnable.run(MARS::Result.new(value: "input"), ctx: {}) }.to raise_error(NotImplementedError) end end end @@ -92,7 +92,7 @@ def run(input) klass.before_run { |_ctx, step| calls << step.name } step = klass.new(name: "test") - step.run_before_hooks(MARS::ExecutionContext.new(input: "x")) + step.run_before_hooks(MARS::Context.new(input: "x")) expect(calls).to eq(["test"]) end @@ -103,9 +103,10 @@ def run(input) klass.after_run { |_ctx, result, _step| calls << result } step = klass.new(name: "test") - step.run_after_hooks(MARS::ExecutionContext.new(input: "x"), "result") + result = MARS::Result.new(value: "result") + step.run_after_hooks(MARS::Context.new(input: "x"), result) - expect(calls).to eq(["result"]) + expect(calls).to eq([result]) end end diff --git a/spec/mars/workflows/parallel_spec.rb b/spec/mars/workflows/parallel_spec.rb index 97407c8..cb47257 100644 --- a/spec/mars/workflows/parallel_spec.rb +++ b/spec/mars/workflows/parallel_spec.rb @@ -1,7 +1,9 @@ # frozen_string_literal: true RSpec.describe MARS::Workflows::Parallel do - let(:sum_aggregator) { MARS::Aggregator.new("Sum Aggregator", operation: lambda(&:sum)) } + let(:sum_aggregator) do + MARS::Aggregator.new("Sum Aggregator", operation: lambda { |results, _ctx| MARS::Result.new(value: results.sum(&:value)) }) + end let(:add_step_class) do Class.new(MARS::Runnable) do @@ -10,9 +12,9 @@ def initialize(value, **kwargs) @value = value end - def run(input) + def run(input, ctx: {}) sleep 0.1 - input + @value + MARS::Result.new(value: input.value + @value) end end end @@ -24,8 +26,8 @@ def initialize(multiplier, **kwargs) @multiplier = multiplier end - def run(input) - input * @multiplier + def run(input, ctx: {}) + MARS::Result.new(value: input.value * @multiplier) end end end @@ -37,7 +39,7 @@ def initialize(message, **kwargs) @message = message end - def run(_input) + def run(_input, ctx: {}) raise StandardError, @message end end @@ -49,108 +51,121 @@ def run(_input) multiply_three = multiply_step_class.new(3, name: "multiply_three") add_two = add_step_class.new(2, name: "add_two") - workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two]) + workflow = described_class.new( + "math_workflow", + steps: [add_five, multiply_three, add_two] + ) # 10 + 5 = 15, 10 * 3 = 30, 10 + 2 = 12 - expect(workflow.run(10)).to eq([15, 30, 12]) + expect(workflow.run(10).value).to eq( + [ + MARS::Result.new(value: 15), + MARS::Result.new(value: 30), + MARS::Result.new(value: 12) + ] + ) end it "executes steps in parallel with a custom aggregator" do add_five = add_step_class.new(5, name: "add_five") multiply_three = multiply_step_class.new(3, name: "multiply_three") add_two = add_step_class.new(2, name: "add_two") - workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two], - aggregator: sum_aggregator) + workflow = described_class.new( + "math_workflow", + steps: [add_five, multiply_three, add_two], + aggregator: sum_aggregator + ) - expect(workflow.run(10)).to eq(57) + expect(workflow.run(10).value).to eq(57) end it "handles single step" do multiply_step = multiply_step_class.new(7, name: "multiply") workflow = described_class.new("single_step", steps: [multiply_step]) - expect(workflow.run(6)).to eq([42]) + expect(workflow.run(6).value).to eq([MARS::Result.new(value: 42)]) end it "returns empty array when no steps" do workflow = described_class.new("empty", steps: []) - expect(workflow.run(42)).to eq([]) + expect(workflow.run(42).value).to eq([]) end it "records outputs in context per step" do - step1 = Class.new(MARS::Runnable) do - def run(input) = "from_step1:#{input}" + step1 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "from_step1:#{input.value}") end.new(name: "step1") - step2 = Class.new(MARS::Runnable) do - def run(input) = "from_step2:#{input}" + step2 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "from_step2:#{input.value}") end.new(name: "step2") - context = MARS::ExecutionContext.new(input: "hello") + context = MARS::Context.new(input: "hello") workflow = described_class.new("ctx_workflow", steps: [step1, step2]) - workflow.run(context) + result = workflow.run(context) - expect(context[:step1]).to eq("from_step1:hello") - expect(context[:step2]).to eq("from_step2:hello") + expect(context[:step1]).to eq(MARS::Result.new(value: "from_step1:hello")) + expect(context[:step2]).to eq(MARS::Result.new(value: "from_step2:hello")) + expect(result.outputs[:step1]).to eq(MARS::Result.new(value: "from_step1:hello")) end it "forks context so parallel steps get independent current_input" do - step1 = Class.new(MARS::Runnable) do - def run(input) = "#{input}_modified" + step1 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "#{input.value}_modified") end.new(name: "step1") - step2 = Class.new(MARS::Runnable) do - def run(input) = "#{input}_also_modified" + step2 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "#{input.value}_also_modified") end.new(name: "step2") - context = MARS::ExecutionContext.new(input: "original") + context = MARS::Context.new(input: "original") workflow = described_class.new("fork_test", steps: [step1, step2]) workflow.run(context) # Both steps received the same original input - expect(context[:step1]).to eq("original_modified") - expect(context[:step2]).to eq("original_also_modified") + expect(context[:step1]).to eq(MARS::Result.new(value: "original_modified")) + expect(context[:step2]).to eq(MARS::Result.new(value: "original_also_modified")) end it "shares global_state across forked contexts" do - step1 = Class.new(MARS::Runnable) do - def run(_input) - "done" + step1 = Class.new(MARS::Step) do + def run(_input, ctx: {}) + MARS::Result.new(value: "done") end end.new(name: "step1") - context = MARS::ExecutionContext.new(input: "test", global_state: { shared: true }) + context = MARS::Context.new(input: "test", global_state: { shared: true }) workflow = described_class.new("shared_state", steps: [step1]) workflow.run(context) - expect(context.global_state[:shared]).to be true + expect(context.state[:shared]).to be true end it "calls formatter on each step" do uppercase_formatter = Class.new(MARS::Formatter) do def format_output(output) - output.upcase + MARS::Result.new(value: output.value.upcase, stopped: output.stopped?) end end - step = Class.new(MARS::Runnable) do - def run(input) = "result:#{input}" + step = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "result:#{input.value}") end.new(name: "step", formatter: uppercase_formatter.new) workflow = described_class.new("fmt_workflow", steps: [step]) - expect(workflow.run("hello")).to eq(["RESULT:HELLO"]) + expect(workflow.run("hello").value).to eq([MARS::Result.new(value: "RESULT:HELLO")]) end it "fires before_run and after_run hooks" do hook_log = [] - step_class = Class.new(MARS::Runnable) do + step_class = Class.new(MARS::Step) do before_run { |_ctx, step| hook_log << "before:#{step.name}" } after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" } - def run(input) = input + def run(input, ctx: {}) = input end step = step_class.new(name: "hooked") @@ -160,48 +175,33 @@ def run(input) = input expect(hook_log).to eq(["before:hooked", "after:hooked"]) end - it "unwraps local halts and returns plain result" do + it "returns branch results when a nested gate stops inside one branch" do gate = MARS::Gate.new( - "local_branch", - check: ->(_input) { :branch }, - fallbacks: { - branch: Class.new(MARS::Runnable) do - def run(input) - "branched:#{input}" + "branch_gate", + check: ->(_input, _ctx) { :branch }, + branches: { + branch: Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "branched:#{input.value}") end end.new(name: "branch_step") } ) add_five = add_step_class.new(5, name: "add_five") - workflow = described_class.new("halt_workflow", steps: [gate, add_five]) - - result = workflow.run(10) - expect(result).not_to be_a(MARS::Halt) - expect(result).to eq(["branched:10", 15]) - end - - it "propagates global halt to parent workflow" do - gate = MARS::Gate.new( - "global_branch", - check: ->(_input) { :branch }, - fallbacks: { - branch: Class.new(MARS::Runnable) do - def run(input) - "branched:#{input}" - end - end.new(name: "branch_step") - }, - halt_scope: :global + workflow = described_class.new( + "halt_workflow", + steps: [gate, add_five] ) - add_five = add_step_class.new(5, name: "add_five") - - workflow = described_class.new("halt_workflow", steps: [gate, add_five]) result = workflow.run(10) - expect(result).to be_a(MARS::Halt) - expect(result).to be_global - expect(result.result).to eq(["branched:10", 15]) + expect(result.value).to eq( + [ + MARS::Result.new(value: "branched:10", stopped: true), + MARS::Result.new(value: 15) + ] + ) + expect(result).not_to be_stopped end it "propagates errors from steps" do @@ -209,7 +209,10 @@ def run(input) error_step = error_step_class.new("Step failed", name: "error_step_one") error_step_two = error_step_class.new("Step failed two", name: "error_step_two") - workflow = described_class.new("error_workflow", steps: [add_step, error_step, error_step_two]) + workflow = described_class.new( + "error_workflow", + steps: [add_step, error_step, error_step_two] + ) expect { workflow.run(10) }.to raise_error( MARS::Workflows::AggregateError, @@ -221,7 +224,7 @@ def run(input) describe "inheritance" do it "inherits from MARS::Runnable" do workflow = described_class.new("test", steps: []) - expect(workflow).to be_a(MARS::Runnable) + expect(workflow).to be_a(MARS::Step) end end end diff --git a/spec/mars/workflows/sequential_spec.rb b/spec/mars/workflows/sequential_spec.rb index b2ea2b5..602d39e 100644 --- a/spec/mars/workflows/sequential_spec.rb +++ b/spec/mars/workflows/sequential_spec.rb @@ -8,8 +8,8 @@ def initialize(value, **kwargs) @value = value end - def run(input) - input + @value + def run(input, ctx: {}) + MARS::Result.new(value: input.value + @value) end end end @@ -21,8 +21,8 @@ def initialize(multiplier, **kwargs) @multiplier = multiplier end - def run(input) - input * @multiplier + def run(input, ctx: {}) + MARS::Result.new(value: input.value * @multiplier) end end end @@ -34,7 +34,7 @@ def initialize(message, **kwargs) @message = message end - def run(_input) + def run(_input, ctx: {}) raise StandardError, @message end end @@ -49,73 +49,76 @@ def run(_input) workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two]) # 10 + 5 = 15, 15 * 3 = 45, 45 + 2 = 47 - expect(workflow.run(10)).to eq(47) + result = workflow.run(10) + expect(result.value).to eq(47) + expect(result).not_to be_stopped end it "handles single step" do multiply_step = multiply_step_class.new(7, name: "multiply") workflow = described_class.new("single_step", steps: [multiply_step]) - expect(workflow.run(6)).to eq(42) + expect(workflow.run(6).value).to eq(42) end it "returns input unchanged when no steps" do workflow = described_class.new("empty", steps: []) - expect(workflow.run(42)).to eq(42) + expect(workflow.run(42).value).to eq(42) end it "records outputs in context accessible by step name" do - step1 = Class.new(MARS::Runnable) do - def run(input) = "from_step1:#{input}" + step1 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "from_step1:#{input.value}") end.new(name: "step1") - step2 = Class.new(MARS::Runnable) do - def run(input) = "from_step2:#{input}" + step2 = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "from_step2:#{input.value}") end.new(name: "step2") - context = MARS::ExecutionContext.new(input: "hello") + context = MARS::Context.new(input: "hello") workflow = described_class.new("ctx_workflow", steps: [step1, step2]) - workflow.run(context) + result = workflow.run(context) - expect(context[:step1]).to eq("from_step1:hello") - expect(context[:step2]).to eq("from_step2:from_step1:hello") + expect(context[:step1]).to eq(MARS::Result.new(value: "from_step1:hello")) + expect(context[:step2]).to eq(MARS::Result.new(value: "from_step2:from_step1:hello")) + expect(result.outputs[:step2]).to eq(MARS::Result.new(value: "from_step2:from_step1:hello")) end - it "wraps raw input in ExecutionContext automatically" do - step = Class.new(MARS::Runnable) do - def run(input) = "processed:#{input}" + it "wraps raw input in Context automatically" do + step = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "processed:#{input.value}") end.new(name: "step") workflow = described_class.new("auto_wrap", steps: [step]) - expect(workflow.run("raw")).to eq("processed:raw") + expect(workflow.run("raw").value).to eq("processed:raw") end it "calls formatter on each step" do uppercase_formatter = Class.new(MARS::Formatter) do def format_output(output) - output.upcase + MARS::Result.new(value: output.value.upcase, stopped: output.stopped?) end end - step = Class.new(MARS::Runnable) do - def run(input) = "result:#{input}" + step = Class.new(MARS::Step) do + def run(input, ctx: {}) = MARS::Result.new(value: "result:#{input.value}") end.new(name: "step", formatter: uppercase_formatter.new) workflow = described_class.new("fmt_workflow", steps: [step]) - expect(workflow.run("hello")).to eq("RESULT:HELLO") + expect(workflow.run("hello").value).to eq("RESULT:HELLO") end it "fires before_run and after_run hooks" do hook_log = [] - step_class = Class.new(MARS::Runnable) do + step_class = Class.new(MARS::Step) do before_run { |_ctx, step| hook_log << "before:#{step.name}" } after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" } - def run(input) = input + def run(input, ctx: {}) = input end step = step_class.new(name: "hooked") @@ -125,15 +128,15 @@ def run(input) = input expect(hook_log).to eq(["before:hooked", "after:hooked"]) end - it "halts locally when a gate triggers with local scope" do + it "stops the current workflow when a gate triggers" do add_five = add_step_class.new(5, name: "add_five") gate = MARS::Gate.new( - "local_gate", - check: ->(_input) { :branch }, - fallbacks: { - branch: Class.new(MARS::Runnable) do - def run(input) - "branched:#{input}" + "gate", + check: ->(_input, _ctx) { :branch }, + branches: { + branch: Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "branched:#{input.value}") end end.new(name: "branch_step") } @@ -144,65 +147,18 @@ def run(input) # 10 + 5 = 15, gate branches -> "branched:15", multiply_three is never reached result = workflow.run(10) - expect(result).to eq("branched:15") - expect(result).not_to be_a(MARS::Halt) - end - - it "propagates global halt without unwrapping" do - add_five = add_step_class.new(5, name: "add_five") - gate = MARS::Gate.new( - "global_gate", - check: ->(_input) { :branch }, - fallbacks: { - branch: Class.new(MARS::Runnable) do - def run(input) - "branched:#{input}" - end - end.new(name: "branch_step") - }, - halt_scope: :global - ) - multiply_three = multiply_step_class.new(3, name: "multiply_three") - - workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three]) - - result = workflow.run(10) - expect(result).to be_a(MARS::Halt) - expect(result).to be_global - expect(result.result).to eq("branched:15") - end - - it "propagates global halt through nested sequential workflows" do - inner_gate = MARS::Gate.new( - "inner_gate", - check: ->(_input) { :stop }, - fallbacks: { - stop: Class.new(MARS::Runnable) do - def run(input) - "stopped:#{input}" - end - end.new(name: "stop_step") - }, - halt_scope: :global - ) - - inner = described_class.new("inner", steps: [inner_gate]) - after_inner = add_step_class.new(100, name: "after_inner") - outer = described_class.new("outer", steps: [inner, after_inner]) - - result = outer.run(1) - expect(result).to be_a(MARS::Halt) - expect(result.result).to eq("stopped:1") + expect(result).to be_stopped + expect(result.value).to eq("branched:15") end it "consumes local halt — outer workflow continues" do inner_gate = MARS::Gate.new( "inner_gate", - check: ->(_input) { :stop }, - fallbacks: { - stop: Class.new(MARS::Runnable) do - def run(input) - "stopped:#{input}" + check: ->(_input, _ctx) { :stop }, + branches: { + stop: Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "stopped:#{input.value}") end end.new(name: "stop_step") } @@ -210,17 +166,17 @@ def run(input) inner = described_class.new("inner", steps: [inner_gate]) - string_step = Class.new(MARS::Runnable) do - def run(input) - "after:#{input}" + string_step = Class.new(MARS::Step) do + def run(input, ctx: {}) + MARS::Result.new(value: "after:#{input.value}") end end.new(name: "after_step") outer = described_class.new("outer", steps: [inner, string_step]) result = outer.run(1) - expect(result).to eq("after:stopped:1") - expect(result).not_to be_a(MARS::Halt) + expect(result.value).to eq("after:stopped:1") + expect(result).not_to be_stopped end it "propagates errors from steps" do @@ -236,7 +192,7 @@ def run(input) describe "inheritance" do it "inherits from MARS::Runnable" do workflow = described_class.new("test", steps: []) - expect(workflow).to be_a(MARS::Runnable) + expect(workflow).to be_a(MARS::Step) end end end