Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 222 additions & 14 deletions src/vulcanai/console/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

import argparse
import asyncio
import json
import sys
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional

import pyperclip # To paste the clipboard into the terminal
from textual import events, work
Expand All @@ -33,6 +37,7 @@
from vulcanai.console.utils import SpinnerHook, StreamToTextual, attach_ros_logger_to_console, common_prefix
from vulcanai.console.widget_custom_log_text_area import CustomLogTextArea
from vulcanai.console.widget_spinner import SpinnerStatus
from vulcanai.core.plan_types import GlobalPlan


class TextualLogSink:
Expand Down Expand Up @@ -169,6 +174,9 @@ def __init__(
# Suggestion index for RadioListModal
self.suggestion_index = -1
self.suggestion_index_changed = threading.Event()
# Log creation files
now = datetime.now()
self.execution_time = now.strftime("%Y-%m-%d-%H-%M")

async def on_mouse_down(self, event: MouseEvent) -> None:
"""
Expand Down Expand Up @@ -266,6 +274,8 @@ def worker() -> None:
"/plan": self.cmd_plan,
"/rerun": self.cmd_rerun,
"/bb": self.cmd_blackboard_state,
"/save_plans": self.cmd_save_plans,
"/load_plan": self.cmd_load_plan,
"/clear": self.cmd_clear,
"/exit": self.cmd_quit,
}
Expand All @@ -288,8 +298,8 @@ def worker() -> None:
self.manager.register_tools_from_file(tool_file_path)

# Entry points tools
for ep in self.tools_from_entrypoints:
self.manager.register_tools_from_entry_points(ep)
if self.tools_from_entrypoints != "":
self.manager.register_tools_from_entry_points(self.tools_from_entrypoints)

# Add user context
self.manager.add_user_context(self.user_context)
Expand Down Expand Up @@ -367,6 +377,153 @@ def worker(user_input: str = "") -> None:

# region Utilities

def save_history_log(self, output_dir: Path | str = None) -> Optional[Path]:
"""
Save the queries history to a '.log' file.
"""
if output_dir is None:
# Default path
output_dir = Path("./saved_plans/")
else:
output_dir = Path(output_dir)

# Create the output directory if it doesn't exist
output_dir.mkdir(parents=True, exist_ok=True)

try:
# Create a filename with timestamp
filename = f"{self.execution_time}_history.log"
filepath = output_dir / filename

# 186║
# 187╗
# 188╝
# 200╚
# 201╔
# 202╩
# 203╦
# 204╠
# 205═

# Log tittle
log_lines = [
"╔" + "═" * 78 + "╗",
"║ VulcanAI Console Session History".ljust(79) + "║",
f"║ Execution Time: {self.execution_time}".ljust(79) + "║",
"╚" + "═" * 78 + "╝",
"",
]

# Add queries from manager history
if self.manager and hasattr(self.manager, "history") and self.manager.history:
log_lines.append(f"Total Queries: {len(self.manager.history)}")
log_lines.append("-" * 48)
log_lines.append("")

for i, (user_text, plan_summary) in enumerate(self.manager.history, 1):
log_lines.append(f"Query #{i}")
log_lines.append(
f" User Input: {user_text.split(chr(10))[-1] if chr(10) in user_text else user_text}"
)
log_lines.append(f" Plan Summary: {plan_summary}")
log_lines.append("")
else:
log_lines.append("No queries recorded.")
log_lines.append("")

# Save to file
filepath.write_text("\n".join(log_lines), encoding="utf-8")

self.logger.log_console(f"History log saved to: {filepath}")
return filepath

except Exception as e:
self.logger.log_console(f"Error saving history log: {e}")
return None

def save_plans_to_json(self, output_dir: Path | str = None) -> list[Path]:
"""
Save all plans from 'self.plans_list' to JSON files.
"""
if output_dir is None:
# Default path
output_dir = Path("./saved_plans/")
else:
output_dir = Path(output_dir)

# Create the output directory if it doesn't exist
output_dir.mkdir(parents=True, exist_ok=True)

saved_files = []

for idx, plan in enumerate(self.plans_list):
if plan is None:
self.logger.log_console(f"Skipping plan {idx}: plan is None")
continue

try:
# Create a filename based on index and timestamp
filename = f"{self.execution_time}_plan_{idx}.json"
filepath = output_dir / filename

# Convert plan to JSON dictionary
plan_dict = plan.model_dump()

# Save to file
filepath.write_text(json.dumps(plan_dict, indent=2), encoding="utf-8")

saved_files.append(filepath)
self.logger.log_console(f"Saved plan {idx} to: {filepath}")

except Exception as e:
self.logger.log_console(f"Error saving plan {idx}: {e}")

if saved_files:
self.logger.log_console(f"Successfully saved {len(saved_files)} plan(s)")
else:
self.logger.log_console("No plans were saved")

return saved_files

def load_and_execute_plan(self, plan_file: Path | str) -> Optional[dict]:
"""
Load a plan from a JSON file and execute it.
"""
try:
plan_file = Path(plan_file)

if not plan_file.exists():
self.logger.log_console(f"Error: Plan file not found: {plan_file}")
return None

# Load the plan from JSON
plan_data = json.loads(plan_file.read_text(encoding="utf-8"))

# Convert JSON to GlobalPlan object
plan = GlobalPlan.model_validate(plan_data)

self.logger.log_console(f"Loaded plan from: {plan_file}")
self.logger.log_console("Executing plan...")

# Execute the plan using the manager's executor and blackboard
result = self.manager.executor.run(plan, self.manager.bb)

# Update the last blackboard state
self.last_bb = result.get("blackboard", None)

# Log the result
bb_parsed = str(self.last_bb).replace("<", "'").replace(">", "'")
self.logger.log_console(f"Plan execution completed. Output: {bb_parsed}")

return result

except json.JSONDecodeError as e:
self.logger.log_console(f"Error: Invalid JSON in plan file: {e}")
return None
except Exception as e:
self.logger.log_console(f"Error executing plan: {e}")
return None

def _apply_history_to_input(self) -> None:
"""
Function used to apply the current history index to the input box.
Expand Down Expand Up @@ -482,20 +639,22 @@ def cmd_help(self, _) -> None:
"___________________\n"
"<bold>Available commands:</bold>\n"
"‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n"
"/<bold>help</bold> - Show this help message\n"
"/<bold>tools</bold> - List available tools\n"
"/<bold>edit_tools</bold> - Edit the list of available tools\n"
"/<bold>change_k 'int'</bold> - Change the 'k' value for the top_k algorithm selection"
"/<bold>help</bold> - Show this help message\n"
"/<bold>tools</bold> - List available tools\n"
"/<bold>edit_tools</bold> - Edit the list of available tools\n"
"/<bold>change_k 'int'</bold> - Change the 'k' value for the top_k algorithm selection"
" or show the current value if no 'int' is provided\n"
"/<bold>history 'int'</bold> - Change the history depth or show the current value if no"
"/<bold>history 'int'</bold> - Change the history depth or show the current value if no"
" 'int' is provided\n"
"/<bold>show_history</bold> - Show the current history\n"
"/<bold>clear_history</bold> - Clear the history\n"
"/<bold>plan</bold> - Show the last generated plan\n"
"/<bold>rerun</bold> - Rerun the last plan\n"
"/<bold>bb</bold> - Show the last blackboard state\n"
"/<bold>clear</bold> - Clears the console screen\n"
"/<bold>exit</bold> - Exit the console\n"
"/<bold>show_history</bold> - Show the current history\n"
"/<bold>clear_history</bold> - Clear the history\n"
"/<bold>plan</bold> - Show the last generated plan\n"
"/<bold>rerun</bold> - Rerun the last plan\n"
"/<bold>save_plans 'dir'</bold> - Save all generated plans to JSON files (default: ./saved_plans/)\n"
"/<bold>load_plan 'path'</bold> - Load and execute a plan from a JSON file\n"
"/<bold>bb</bold> - Show the last blackboard state\n"
"/<bold>clear</bold> - Clears the console screen\n"
"/<bold>exit</bold> - Exit the console\n"
"<bold>Query any other text</bold> to process it with the LLM and execute the plan generated.\n\n"
"Add --image='path' to include images in the query. It can be used multiple times to add"
" more images.\n"
Expand Down Expand Up @@ -664,6 +823,55 @@ def cmd_blackboard_state(self, _) -> None:
else:
self.logger.log_console("No blackboard available.")

def cmd_save_plans(self, args) -> None:
"""Save all generated plans to JSON files and history log."""
output_dir = "./saved_plans/"
if len(args) > 0:
output_dir = args[0]

# Save history log first
history_log_path = self.save_history_log(output_dir)

if not self.plans_list:
self.logger.log_console("No plans to save")
if history_log_path:
self.logger.log_console(f"History log saved successfully to: {Path(output_dir).absolute()}")
return

saved_files = self.save_plans_to_json(output_dir)
if saved_files:
self.logger.log_console(f"All plans saved to: {Path(output_dir).absolute()}")
else:
self.logger.log_console("No plans were saved")

def cmd_load_plan(self, args) -> None:
self._load_plan_worker(args) # start worker (dont await)

@work(thread=True)
async def _load_plan_worker(self, args) -> None:
"""
Worker function used to run the command "rerun".
It has to be a worker(thead=True) because the call 'self.manager.executor.run'
might have a "call_from_thread" in the tool executed,
and it is only valid in non Textual app Threads (separated Thread).

@work runs on the app's event loop (app thread) and is for async, non-blocking code.
@work(thread=True) runs in a separate OS thread and is for blocking.

e.g.:
'move_turtle' tool contains a 'call_from_thread'
'ros2_topic' tool does not contains a 'call_from_thread'
"""

if len(args) == 0:
self.logger.log_console("Usage: /load_plan 'path/to/plan.json'")
return

plan_file = args[0]
result = self.load_and_execute_plan(plan_file)
if result is None:
self.logger.log_console(f"Failed to load and execute plan from: {plan_file}")

def cmd_clear(self, _) -> None:
self.left_pannel.clear_console()

Expand Down