diff --git a/pyproject.toml b/pyproject.toml index 27e9482..6e0441d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,8 +19,13 @@ dependencies = [ "ollama==0.6.0", "openai==1.108.0", "prompt_toolkit==3.0.52", - "rich==14.1.0", + "rich==14.2.0", "sentence-transformers==5.1.0", + "textual==6.5.0", + "textual-dev==1.8.0", + "textual-serve==1.1.3", + "typeguard==2.13", + "pyperclip==1.11.0", ] [project.optional-dependencies] diff --git a/src/vulcanai/console/__init__.py b/src/vulcanai/console/__init__.py index ce0f2fa..dd75589 100644 --- a/src/vulcanai/console/__init__.py +++ b/src/vulcanai/console/__init__.py @@ -17,6 +17,8 @@ _EXPORTS = { "VulcanConsole": ".console:VulcanConsole", "VulcanAILogger": ".logger:VulcanAILogger", + "LogSink": ".logger:LogSink", + "RichStdoutSink": ".logger:RichStdoutSink", } __all__ = list(_EXPORTS.keys()) diff --git a/src/vulcanai/console/console.py b/src/vulcanai/console/console.py index 6450548..c101167 100644 --- a/src/vulcanai/console/console.py +++ b/src/vulcanai/console/console.py @@ -12,209 +12,1095 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from threading import Lock +from __future__ import annotations + import argparse -import os +import asyncio +import pyperclip # To paste the clipboard into the terminal +import sys +import threading -from prompt_toolkit import PromptSession -from rich.progress import Progress, SpinnerColumn, TextColumn -from vulcanai.models.model import IModelHooks -from vulcanai.console.logger import console +from textual import events, work +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, Vertical, VerticalScroll +from textual.events import MouseEvent +from textual.markup import escape # To remove potential errors in textual terminal +from textual.widgets import Input, Static +from vulcanai.console.widget_custom_log_text_area import CustomLogTextArea +from vulcanai.console.logger import VulcanAILogger +from vulcanai.console.modal_screens import CheckListModal, RadioListModal, ReverseSearchModal +from vulcanai.console.utils import attach_ros_logger_to_console, common_prefix, SpinnerHook, StreamToTextual +from vulcanai.console.widget_spinner import SpinnerStatus -class SpinnerHook(IModelHooks): - """ - Single entrant spinner controller for console. - - Starts the spinner on the first LLM request. - - Stops the spinner when LLM request is over. 
+ +class TextualLogSink: + """A default console that prints to standard output.""" + def __init__(self, textual_console) -> None: + self.console = textual_console + + def write(self, msg: str, color: str = "") -> None: + self.console.add_line(msg, color) + + +class VulcanConsole(App): + + # CSS Styles + # Two panels: left (log + input) and right (history + variables) + # Right panel: 48 characters length + # Left panel: fills remaining space + CSS = """ + Screen { + layout: horizontal; + } + + #left { + width: 1fr; + layout: vertical; + } + + #right { + width: 48; + layout: vertical; + border: tall #56AA08; + padding: 0; + } + + #logcontent { + height: auto; + min-height: 1; + max-height: 1fr; + border: tall #333333; + } + + #llm_spinner { + height: 0; + display: none; + content-align: left middle; + padding-left: 2; + } + + #cmd { + dock: bottom; + } + + #history_title { + content-align: center middle; + margin: 0; + padding: 0; + } + + #history_scroll { + height: 1fr; + margin: 1; + } + + #history { + width: 100%; + } """ - def __init__(self, console): - self.console = console - self._lock = Lock() - - def on_request_start(self) -> None: - with self._lock: - # First request => create spinner - self._progress = Progress( - SpinnerColumn(spinner_name="dots2"), - TextColumn("[blue]Querying LLM...[/blue]"), - console=self.console, - transient=True, - ) - self._progress.start() - self._task_id = self._progress.add_task("llm", start=True) - - def on_request_end(self) -> None: - with self._lock: - if self._progress is not None: - # Last request finished => stop spinner - try: - if self._task_id is not None: - self._progress.remove_task(self._task_id) - except Exception: - pass - self._progress.stop() - self._progress = None - self._task_id = None + # Bindings for the console + BINDINGS = [ + Binding("ctrl+l", "clear_log", "Clear log"), + Binding("f2", "show_help", "Show help", priority=True), + Binding("ctrl+r", "reverse_search", "Reverse search"), + Binding("ctrl+c", 
"stop_streaming_task", "Stop Streaming"), + Binding("up", "history_prev", show=False), + Binding("down", "history_next", show=False), + ] + + def __init__(self, + model: str = "gpt-5-nano", k: int = 7, iterative: bool = False, + register_from_file:str = "", tools_from_entrypoints: str = "", + user_context: str = "", main_node = None): + super().__init__() # Textual lib -class VulcanConsole: - def __init__(self, model: str = "gpt-5-nano", k: int = 7, iterative: bool = False): + # -- Main variables -- + # Manager instance self.manager = None - self.session = PromptSession() - self.last_plan = None + # List of generated plans + self.plans_list = [] + # Last generated blackboard state self.last_bb = None - self.hooks = SpinnerHook(console) - + # Spinner hook for LLM requests + self.spinner_status = None + self.hooks = None + # AI model self.model = model + # 'k' value for top_k tools selection self.k = k + # Iterative mode + self.iterative = iterative + # CustomLogTextArea instance + self.left_pannel = None + # Logger instance + self.logger = VulcanAILogger.default() + self.logger.set_textualizer_console(TextualLogSink(self)) - self.init_manager(iterative) + # Tools to register from entry points + self.register_from_file = register_from_file + self.tools_from_entrypoints = tools_from_entrypoints + self.user_context = user_context + # ROS 2 main node + self.main_node = main_node - # Override hooks with spinner controller - try: - self.manager.llm.set_hooks(self.hooks) - except Exception: - pass + # -- Console extra variables -- + + # Commands available in the console + self.commands = None + # Tab completion state + self.tab_matches = [] + # Current index in the tab matches + self.tab_index = 0 + + # Terminal qol + self.history = [] + + # Streaming task control + self.stream_task = None + # Suggestion index for RadioListModal + self.suggestion_index = -1 + self.suggestion_index_changed = threading.Event() + + + async def on_mouse_down(self, event: MouseEvent) -> None: + 
""" + Function used to paste the string for the user clipboard + + on_mouse_down() function is called when a mouse button is pressed. + In this case, only the middle button is used to paste the clipboard content. + """ + + if event.button == 2: # Middle click + await self._paste_clipboard() + event.prevent_default() + event.stop() - def run(self): - self.print("VulcanAI Interactive Console") - self.print("Type 'exit' to quit.\n") + async def on_mount(self) -> None: + """ + Function called when the console is mounted. + """ - while True: + self.left_pannel = self.query_one("#logcontent", CustomLogTextArea) + self.spinner_status = self.query_one("#llm_spinner", SpinnerStatus) + self.hooks = SpinnerHook(self.spinner_status) + + # Disable terminal input + self.set_input_enabled(False) + sys.stdout = StreamToTextual(self, "stdout") + sys.stderr = StreamToTextual(self, "stderr") + + self.loop = asyncio.get_running_loop() + asyncio.create_task(self.bootstrap()) + + def compose(self) -> ComposeResult: + """ + Function used to create the console layout. + It is called at the beggining of the console execution. 
+ """ + + color_tmp = VulcanAILogger.vulcanai_theme["vulcanai"] + + vulcanai_title_slant = \ +f"""[{color_tmp}] + _ __ __ ___ ____ +| | / /_ __/ /________ ____ / | / _/ +| | / / / / / / ___/ __ `/ __ \/ /| | / / +| |/ / /_/ / / /__/ /_/ / / / / ___ |_/ / +|___/\__,_/_/\___/\__,_/_/ /_/_/ |_/___/[/{color_tmp}] +""" + + # Textual layout + with Horizontal(): + # Left + with Vertical(id="left"): + # Log Area + logcontent = CustomLogTextArea(id="logcontent") + yield logcontent + # Spinner Area + yield SpinnerStatus(logcontent, id="llm_spinner") + # Input Area + yield Input(placeholder="> ", id="cmd") + + # Right + with Vertical(id="right"): + # Title Area + yield Static(vulcanai_title_slant, id="history_title") + # Variable info Area + yield Static(f" Loading info...", id="variables") + # History Area + with VerticalScroll(id="history_scroll"): + # NOTE: markup=True so [bold reverse] works + yield Static("", id="history", markup=True) + + async def bootstrap(self) -> None: + """ + Function used to initialize the console manager. + Print information at runtime execution of a function, without blocking the main thread + so Textual Log does not freeze. + """ + + def worker() -> None: + """ + Worker function to run in a separate thread. 
+ """ + + self.init_manager() + + # -- Add the commands -- + # Command registry: name -> handler + self.commands = { + "/help": self.cmd_help, + "/tools": self.cmd_tools, + "/edit_tools": self.cmd_edit_tools, + "/change_k": self.cmd_change_k, + "/history": self.cmd_history_index, + "/show_history": self.cmd_show_history, + "/clear_history": self.cmd_clear_history, + "/plan": self.cmd_plan, + "/rerun": self.cmd_rerun, + "/bb": self.cmd_blackboard_state, + "/clear": self.cmd_clear, + "/exit": self.cmd_quit, + } + + # Tab matches initialization + self.tab_matches = [] + self.tab_index = 0 + + # -- Spinner controller -- try: - user_input = self.session.prompt("[USER] >>> ") - if user_input.strip().lower() in ("exit", "quit"): - break + self.manager.llm.set_hooks(self.hooks) + except Exception: + pass - # Internal commands start with / - if user_input.startswith("/"): - self.handle_command(user_input) - continue + # -- Register tools -- + # Default tools - # Check for image input. Must be always at the end of the input + # File paths tools + for tool_file_path in self.register_from_file: + self.manager.register_tools_from_file(tool_file_path) + + # Entry points tools + if self.tools_from_entrypoints != "": + self.manager.register_tools_from_entry_points(self.tools_from_entrypoints) + + # Add user context + self.manager.add_user_context(self.user_context) + # Add console to blackboard + self.manager.bb["console"] = self + + # Add the shared node to the console manager blackboard to be used by tools + if self.main_node != None: + self.manager.bb["main_node"] = self.main_node + attach_ros_logger_to_console(self, self.main_node) + + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, lambda: worker()) + + self.is_ready = True + self.logger.log_console("VulcanAI Interactive Console") + self.logger.log_console("Use 'Ctrl+Q' to quit.") + + # Activate the terminal input + self.set_input_enabled(True) + + async def queriestrap(self, user_input: str="") -> None: 
+ """ + Function used to handle user requests. + Print information at runtime execution of a function, without blocking the main thread + so Textual Log does not freeze. + """ + + def worker(user_input: str="") -> None: + """ + Worker function to run in a separate thread. + + user_input: (str) The user input to process. + """ + # Disable terminal input + self.set_input_enabled(False) + + try: images = [] + + # Add the images if "--image=" in user_input: images = self.get_images(user_input) # Handle user request try: result = self.manager.handle_user_request(user_input, context={"images": images}) + except Exception as e: - self.print(f"[error]Error handling request:[/error] {e}") - continue + self.logger.log_msg(f"[error]Error handling request:[/error] {e}") + return - self.last_plan = result.get("plan", None) + # Store the plan and blackboard state + self.plans_list.append(result.get("plan", None)) self.last_bb = result.get("blackboard", None) - self.print(f"Output of plan: {result.get('blackboard', {None})}") + # Print the backboard state + bb_ret = result.get('blackboard', None) + bb_ret = str(bb_ret).replace('<', '\'').replace('>', '\'') + self.logger.log_console(f"Output of plan: {bb_ret}") except KeyboardInterrupt: - console.print("[yellow]Exiting...[/yellow]") - break + self.logger.log_msg("Exiting...") + return except EOFError: - console.print("[yellow]Exiting...[/yellow]") - break + self.logger.log_msg("Exiting...") + return - def init_manager(self, iterative: bool = False): - if iterative: - from vulcanai.core.manager_iterator import IterativeManager as ConsoleManager + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, lambda: worker(user_input)) + + # Activate the terminal input + self.set_input_enabled(True) + + # region Utilities + + def _apply_history_to_input(self) -> None: + """ + Function used to apply the current history index to the input box. + + Used in the history navigation actions (up/down keys). 
+ """ + # Get the input box + cmd_input = self.query_one("#cmd", Input) + if self.history_index is None or self.history_index == len(self.history): + cmd_input.value = "" else: - from vulcanai.core.manager_plan import PlanManager as ConsoleManager - console.print(f"[console]Initializing Manager '{ConsoleManager.__name__}'...[/console]") - self.manager = ConsoleManager(model=self.model, k=self.k) - self.print(f"Manager initialized with model '{self.model}'.") - - def handle_command(self, cmd: str): - """Process internal console commands.""" - if cmd == "/help": - help_msg = ( - "Available commands:\n" - "/help - Show this help message\n" - "/tools - List available tools\n" - "/change_k - Change the 'k' value for the top_k algorithm selection\n" - "/history - Change the history depth or show the current value if no is provided\n" - "/show_history - Show the current history\n" - "/plan - Show the last generated plan\n" - "/rerun - Rerun the last plan\n" - "/bb - Show the last blackboard state\n" - "/clear - Clear the console screen\n" - "exit - Exit the console\n" - "Query any other text to process it with the LLM and execute the plan generated." - "Add --image= to include images in the query. It can be used multiple times to add more images." 
- " Example: ' --image=/path/to/image1 --image=/path/to/image2'" - ) - self.print(help_msg) - - elif cmd == "/tools": - help_msg = f"\nAvailable tools (current index k={self.manager.k}):\n" - for tool in self.manager.registry.tools.values(): - help_msg += f"- {tool.name}: {tool.description}\n" - self.print(help_msg) - - elif cmd.startswith("/change_k"): - parts = cmd.split() - if len(parts) != 2 or not parts[1].isdigit(): - self.print(f"[error]Usage: /change_k [/error] - Actual k is {self.manager.k}") + # Set the input value to the current history index + cmd_input.value = self.history[self.history_index] + + # Focus the input box + cmd_input.cursor_position = len(cmd_input.value) + cmd_input.focus() + + def _update_history_panel(self) -> None: + """ + Function used to update the right panel 'history' widget with + the current history list of written commands/queries. + """ + # Get the history widget + history_widget = self.query_one("#history", Static) + + plan_count = 0 + lines = [] + for i, cmd in enumerate(self.history): + cmd_esc = escape(cmd) + prefix = "" + tmp_color = "" + if len(cmd_esc) > 0 and cmd_esc[0] != '/': + tmp_color = VulcanAILogger.vulcanai_theme["vulcanai"] + prefix = f" [{tmp_color}][Plan {plan_count}][/{tmp_color}]\n" + plan_count += 1 + else: + tmp_color = VulcanAILogger.vulcanai_theme["console"] + + text = f"{prefix} [{tmp_color}]{i+1}:[/{tmp_color}] {escape(cmd)}" + if self.history_index is not None and self.history_index == i: + # Highlight current selection + text = f"[bold reverse]{text}[/]" + lines.append(text) + + history_widget.update("\n".join(lines)) + + def _update_variables_panel(self) -> None: + """ + Function used to update the right panel 'variables' widget with + the current variables info (model, k, history_depth). 
+ """ + + text = f" AI model: {self.model.replace('ollama-', '')}\n K = {self.manager.k}\n history_depth = {self.manager.history_depth}" + kvalue_widget = self.query_one("#variables", Static) + kvalue_widget.update(text) + + @work # Runs in a worker. waiting won't freeze the UI + async def open_checklist(self, tools_list: list[str], active_tools_num: int) -> None: + """ + Function used to open a Checklist ModalScreen in the console. + Used in the /edit_tools command. + """ + # Create the checklist dialog + selected = await self.push_screen_wait(CheckListModal(tools_list, active_tools_num)) + + if selected is None: + self.logger.log_msg("Selection cancelled.") + else: + + # Iterate over all tools and activate/deactivate accordingly + # to the selection made by the user + for tool_tmp in tools_list: + # Remove "- " prefix + tool = tool_tmp[2:] + + if tool_tmp in selected: + # Current tool checbox, activated + if self.manager.registry.activate_tool(tool): + self.logger.log_console(f"Activated tool '{tool}'") + else: + # Current tool checbox, deactivated + if self.manager.registry.deactivate_tool(tool): + self.logger.log_console(f"Deactivated tool '{tool}'") + + @work + async def open_radiolist(self, option_list: list[str], tool: str = "") -> str: + """ + Function used to open a RadioList ModalScreen in the console. + Used in the tool suggestion selection, for default tools. 
+ """ + # Create the checklist dialog + selected = await self.push_screen_wait(RadioListModal(option_list)) + + if selected is None: + self.logger.log_tool(f"Suggestion cancelled", tool_name=tool) + self.suggestion_index = -2 + return + + self.logger.log_tool(f"Selected suggestion: \"{option_list[selected]}\"", tool_name=tool) + self.suggestion_index = selected + self.suggestion_index_changed.set() # signal change + + + # endregion + + # region Commands + + def cmd_help(self, _) -> None: + table = "\n".join( + [ + "___________________\n" + "Available commands:\n" + "‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n" + "/help - Show this help message\n" + "/tools - List available tools\n" + "/edit_tools - Edit the list of available tools\n" + "/change_k 'int' - Change the 'k' value for the top_k algorithm selection or show the current value if no 'int' is provided\n" + "/history 'int' - Change the history depth or show the current value if no 'int' is provided\n" + "/show_history - Show the current history\n" + "/clear_history - Clear the history\n" + "/plan - Show the last generated plan\n" + "/rerun - Rerun the last plan\n" + "/bb - Show the last blackboard state\n" + "/clear - Clears the console screen\n" + "/exit - Exit the console\n" + "Query any other text to process it with the LLM and execute the plan generated.\n\n" + "Add --image='path' to include images in the query. 
It can be used multiple times to add more images.\n" + "Example: 'user_prompt' --image=/path/to/image1 --image=/path/to/image2'\n" + "___________________\n" + "Available keybinds:\n" + "‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n" + "F2 - Show this help message\n" + "F3 - Copy selection area\n" + "Ctrl+Q - Exit the console\n" + "Ctrl+L - Clears the console screen\n" + "Ctrl+U - Clears the entire command line input\n" + "Ctrl+K - Clears from the cursor to then end of the line\n" + "Ctrl+W - Delete the word before the cursor\n" + "Ctrl+'left/right' - Move cursor backward/forward by one word\n" + "Ctrl+R - Reverse search through command history (try typing part of a previous command).\n" + ] + ) + self.logger.log_console(table, "console") + + def cmd_tools(self, _) -> None: + tmp_msg = f"(current index k={self.manager.k})" + tool_msg = ("_" * len(tmp_msg)) + '\n' + tool_msg += f"Available tools:\n" + tool_msg += tmp_msg + '\n' + ("‾" * len(tmp_msg)) + '\n' + + for tool in self.manager.registry.tools.values(): + tool_msg += f"- {tool.name}: {tool.description}\n" + self.logger.log_console(tool_msg, "console") + + def cmd_edit_tools(self, _) -> None: + tools_list = [] + for tool in self.manager.registry.tools.values(): + tools_list.append(f"- {tool.name}") + + active_tools_num = len(tools_list) + + for deactivated_tool in self.manager.registry.deactivated_tools.values(): + tools_list.append(f"- {deactivated_tool.name}") + + self.open_checklist(tools_list, active_tools_num) + + def cmd_change_k(self, args) -> None: + if len(args) == 0: + self.logger.log_console(f"Current 'k' is {self.manager.k}") + return + if len(args) != 1 or not args[0].isdigit(): + self.logger.log_console(f"Usage: /change_k 'int' - Actual 'k' is {self.manager.k}") + return + + new_k = int(args[0]) + self.manager.k = new_k + self.manager.update_k_index(new_k) + # Update right panel info + self._update_variables_panel() + + def cmd_history_index(self, args) -> None: + if len(args) == 0: + self.logger.log_console(f"Current 
'history depth' is {self.manager.history_depth}") + return + if len(args) != 1 or not args[0].isdigit(): + self.logger.log_console(f"Usage: /history 'int' - Actual 'history depth' is {self.manager.history_depth}") + return + + new_hist = int(args[0]) + self.manager.update_history_depth(new_hist) + # Update right panel info + self._update_variables_panel() + + def cmd_show_history(self, _) -> None: + if not self.manager.history: + self.logger.log_console("No history available.") + return + + history_msg = \ + "________________\n" + \ + "Current history:\n" + \ + "(oldest first)\n" + \ + "‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n" + + self.logger.log_console(history_msg, "console") + for i, (user_text, plan_summary) in enumerate(self.manager.history): + user_req_cmd = user_text.split('\n') + self.logger.log_msg(f"{i+1}. [USER] >>> {user_req_cmd[1]}\n",) + self.logger.log_msg(f"Plan summary: {plan_summary}\n") + + def cmd_clear_history(self, _) -> None: + # Reset history + self.history.clear() + self.history_index = None + + # Empty right panel 'history' + history_widget = self.query_one("#history", Static) + history_widget.update("") + # Clear the list of plans + self.plans_list.clear() + + # Add feedback line + self.logger.log_msg("History cleared.") + + def cmd_plan(self, _) -> None: + if len(self.plans_list) > 0: + self.logger.log_console("Last generated plan:") + self.logger.log_console(str(self.plans_list[-1]), color="white") + else: + self.logger.log_console("No plan has been generated yet.") + + def cmd_rerun(self, args) -> None: + self._rerun_worker(args) # start worker (dont await) + + @work(thread=True) + async def _rerun_worker(self, args) -> None: + """ + Worker function used to run the command "rerun". + It has to be a worker(thead=True) because the call 'self.manager.executor.run' + might have a "call_from_thread" in the tool executed, + and it is only valid in non Textual app Threads (separated Thread). 
+ + @work runs on the app's event loop (app thread) and is for async, non-blocking code. + @work(thread=True) runs in a separate OS thread and is for blocking. + + e.g.: + 'move_turtle' tool contains a 'call_from_thread' + 'ros2_topic' tool does not contains a 'call_from_thread' + """ + selected_plan = 0 + if len(args) == 0: + # No index specified. Last plan selected + selected_plan = len(self.plans_list) - 1 + elif len(args) != 1 or not args[0].isdigit(): + self.logger.log_console("Usage: /rerun 'int'") + return + else: + selected_plan = int(args[0]) + if selected_plan < -1: + self.logger.log_console("Usage: /rerun 'int' [int >= -1].") return - new_k = int(parts[1]) - self.manager.k = new_k - self.print(f"Changed k to {new_k}") - - elif cmd.startswith("/history"): - parts = cmd.split() - if len(parts) == 1: - self.print(f"Current history depth is {self.manager.history_depth}") + + if not self.plans_list: + self.logger.log_console("No plan to rerun.") + return + + self.logger.log_console(f"Rerunning {selected_plan}-th plan...") + + # Execute the plan + result = self.manager.executor.run(self.plans_list[selected_plan], self.manager.bb) + + last_bb = result.get("blackboard", None) + last_bb_parsed = str(last_bb).replace("<", "'").replace(">", "'") + + # UI updates must happen on the app thread: + def apply_result(): + self.last_bb = last_bb + self.logger.log_console(f"Output of rerun: {last_bb_parsed}") + + self.call_from_thread(apply_result) + + def cmd_blackboard_state(self, _) -> None: + if self.last_bb: + self.logger.log_console("Lastest blackboard state:") + # Parse the blackboard to avoid <...> issues in textual + last_bb_parsed = str(self.last_bb) + last_bb_parsed = last_bb_parsed.replace('<', '\'').replace('>', '\'') + self.logger.log_console(last_bb_parsed) + else: + self.logger.log_console("No blackboard available.") + + def cmd_clear(self, _) -> None: + self.left_pannel.clear_console() + + def cmd_quit(self, _) -> None: + self.exit() + + # endregion + + # 
region Logging + + def add_line(self, input: str, + color: str = "", + subprocess_flag: bool = False) -> None: + """ + Function used to write an input in the VulcanAI terminal. + """ + # Split incoming text into individual lines + lines = input.splitlines() + + color_begin = "" + color_end = "" + if color != "": + color_begin = f"<{color}>" + color_end = f"" + + + # Append each line; deque automatically truncates old ones + for line in lines: + line_processed = line + if subprocess_flag: + line_processed = escape(line) + text = f"{color_begin}{line_processed}{color_end}" + if not self.left_pannel.append_line(text): + self.logger.log_console(f"Warning: Trying to add an empty line.") + + def delete_last_line(self): + """ + Function used to remove the last line in the VulcanAI terminal. + """ + self.left_pannel.delete_last_row() + + # endregion + + # region Input + + def set_input_enabled(self, enabled: bool) -> None: + """ + Function used to enable/disable the terminal input box. + """ + cmd = self.query_one("#cmd", Input) + cmd.disabled = not enabled + if enabled: + self.set_focus(cmd) + + async def on_input_submitted(self, event: Input.Submitted) -> None: + """ + Function called when the user submits a command in the input box. + It handles the command, queries and updates the history. 
+ """ + if not self.is_ready: + # Console not ready yet + return + + cmd = event.value.strip() + if not cmd: + # Empty command + return + + cmd_input = self.query_one("#cmd", Input) + + try: + if event.input.id != "cmd": + # Not the command input box return - if len(parts) != 2 or not parts[1].isdigit(): - self.print(f"[error]Usage: /history [/error] - Actual history depth is {self.manager.history_depth}") + + # Get the user input and strip leading/trailing spaces + user_input = (event.value or "").strip() + + # The the user_input in the history navigation list (used when the up, down keys are pressed) + self.history.append(user_input) + self.history_index = len(self.history) + # Update the right panel 'history' widget + self._update_history_panel() + event.input.value = "" + event.input.focus() + + # Reset tab state + self.tab_matches = [] + self.tab_index = 0 + + if not user_input: + cmd_input.focus() return - new_hist = int(parts[1]) - self.manager.update_history_depth(new_hist) - elif cmd == "/show_history": - if not self.manager.history: - self.print("No history available.") + # Echo what the user typed + self.logger.log_user(cmd) + + # If it start with '/', just print it as output and stop here + if user_input.startswith("/"): + self.handle_command(user_input) return - help_msg = "\nCurrent history (oldest first):\n" - for i, (user_text, plan_summary) in enumerate(self.manager.history): - help_msg += f"{i+1}. 
User: {user_text}\n Plan summary: {plan_summary}\n" - self.print(help_msg) - - elif cmd == "/plan": - if self.last_plan: - self.print("Last generated plan:") - console.print(self.last_plan) - else: - self.print("No plan has been generated yet.") - elif cmd == "/rerun": - if self.last_plan: - self.print("Rerunning last plan...") - result = self.manager.executor.run(self.last_plan, self.manager.bb) - self.last_bb = result.get("blackboard", None) - self.print(f"Output of rerun: {result.get('blackboard', {None})}") - else: - self.print("No plan to rerun.") + await asyncio.sleep(0) + asyncio.create_task(self.queriestrap(user_input)) + + except KeyboardInterrupt: + self.logger.log_msg("Exiting...") + return + except EOFError: + self.logger.log_msg("Exiting...") + return + + def handle_command(self, user_input: str) -> None: + """ + Function used to handle slash-commands in the console. + 1. If the command is known, execute it. + 2. If the command is unknown, print an error message. + """ + # Parse as a command + parts = user_input.split() + cmd = parts[0].lower() + args = parts[1:] + + handler = self.commands.get(cmd) + if handler is None: + # Only complain for slash-commands + self.logger.log_console(f"Unknown command: {cmd}. Type '/help'.") + else: + try: + handler(args) + except Exception as e: + self.logger.log_msg(f"[error]Error: {e!r}[/error]") + + async def _paste_clipboard(self) -> None: + """ + Function used to paste the clipboard content into the terminal input box. 
+ """ + # Get the input box + cmd_input = self.query_one("#cmd", Input) + + try: + paste_text = pyperclip.paste() or "" + except Exception as e: + self.logger.log_msg(f"[error]Clipboard error: {e}[/error]") + return + + if not paste_text: + # Nothing to paste + return + + # Remove endlines + cut = paste_text.find("\n") + if cut != -1: + # Only paste up to the first newline + paste_text = paste_text[:cut] + + value = cmd_input.value + cursor = cmd_input.cursor_position + + # Insert text at cursor + cmd_input.value = value[:cursor] + paste_text + value[cursor:] + cmd_input.cursor_position = cursor + len(paste_text) + cmd_input.focus() + + + async def on_key(self, event: events.Key) -> None: + """ + Function used to handle key events in the terminal input box. + + It handles: + - "tab": Autocomplete command. + - "ctr+w": Delete the word before the cursor. + - "ctrl+delete", "escape": Delete the word after the curso. + - "ctrl+v": Paste the clipboard content into the terminal input box. + """ + key = event.key + cmd_input = self.query_one("#cmd", Input) + + # -- tab: Autocomplete ------------------------------------------------ + if key == "tab": + # Current text + # (don’t strip right-side spaces; keep user’s spacing) + raw = cmd_input.value or "" + # Leading spaces are not part of the command token + left = len(raw) - len(raw.lstrip()) + value = raw[left:] + + if len(value) <= 0: + # Nothing typed yet + return - elif cmd == "/bb": - if self.last_bb: - self.print("Last blackboard state:") - console.print(self.last_bb) + # Split into head (first token) and the remainder + head, *_ = value.split(maxsplit=1) + # Nothing typed yet: list all commands + all_cmds = sorted(self.commands) if self.commands else [] + if not all_cmds: + # No commands available + return + + # Get matching commands + self.tab_matches = [c for c in all_cmds if c.startswith(head)] if head else all_cmds + self.tab_index = 0 + + matches = self.tab_matches + if not matches: + # No matches, do nothing + 
cmd_input.focus() + event.prevent_default() + event.stop() + return + + # If multiple matches, check for a longer common prefix to insert first + if len(matches) > 1: + # Find common prefix + prefix, commands = common_prefix(matches) + new_value = prefix + # If the common prefix is just the original head, cycle through options + self.logger.log_user(cmd_input.value) + self.logger.log_console(commands) else: - self.print("No blackboard available.") + # Single match: complete directly + new_value = matches[0] + + # Rebuild the input value: + cmd_input.value = new_value + cmd_input.cursor_position = len(cmd_input.value) + cmd_input.focus() + event.prevent_default() + event.stop() + + return + + # - ctrl+w: Delete before cursor -------------------------------------- + if key == "ctrl+w": + # Input value and cursor position + value = cmd_input.value + cursor = cmd_input.cursor_position + i = cursor - 1 + # Flag used to detect first non-space character + first_char = False + + # Iterate backwards to find the start of the previous word + while i > 0: + if value[i] == ' ': + # First space after a word + if first_char: + i += 1 + break + else: + first_char = True + i -= 1 + + # Update input value and cursor position + cmd_input.value = value[:i] + value[cursor:] + cmd_input.cursor_position = i + cmd_input.focus() + event.prevent_default() + event.stop() + + return + + # - escape/ctrl+delete: Delete after cursor --------------------------- + if key in ("ctrl+delete", "escape") : + # Input value and cursor position + value = cmd_input.value + cursor = cmd_input.cursor_position + i = cursor + n = len(value) + count = 0 + # Flag used to detect first non-space character + first_char = False + + # Iterate forwards to find the end of the next word + while i < n: + if(value[i] == ' '): + if first_char: + break + else: + first_char = True + i += 1 + count +=1 + + # Update input value and cursor position + cmd_input.value = value[:cursor] + value[i:] + cmd_input.cursor_position = 
max(i - count, 0) + cmd_input.focus() + event.prevent_default() + event.stop() + + return + + # -- ctrl+v: Paste clipboard ------------------------------------------ + if key == "ctrl+v": + # Paste clipboard content + await self._paste_clipboard() + event.prevent_default() + event.stop() + + return + + # Any other keypress resets tab cycle if the prefix changes + if len(key) == 1 or key in ("backspace", "delete"): + # Reset tab state + self.tab_matches = [] + self.tab_index = 0 + + def set_stream_task(self, input_stream): + """ + Function used in the tools to set the current streaming task. + with this variable the user can finish the execution of the + task by using the signal "Ctrl + C" + """ + self.stream_task = input_stream + + # endregion + + # region Actions (key bindings) + + def action_clear_log(self) -> None: + """ + Function used to clear the console log area. + Used in the Ctrl + L key binding. + + Binding("ctrl+l", "clear_log", ...), + """ + self.cmd_clear(_=None) + + def action_show_help(self) -> None: + """ + Function used to show the help command in the console. + Used in the F2 key binding. + + Binding("f2", "show_help", ...), + """ + self.logger.log_user("/help") + self.cmd_help(_=None) + + def action_reverse_search(self) -> None: + """ + Function used to open the Reverse Search ModalScreen in the console. + Used in the Ctrl + R key binding. - elif cmd == "/clear": - os.system("clear") + Binding("ctrl+r", "reverse_search", ...) + """ + if not self.history: + return + + def done(result: str | None) -> None: + """ + Callback when modal closes. + """ + + cmd_input = self.query_one("#cmd", Input) + if result: + cmd_input.value = result + cmd_input.cursor_position = len(result) + cmd_input.focus() + + self.push_screen(ReverseSearchModal(self.history), done) + + def action_stop_streaming_task(self) -> None: + """ + Function used to stop the current streaming task in the console. + Used in the Ctrl + C key binding. 
+ + Binding("ctrl+c", "stop_streaming_task", ...), + """ + if self.stream_task != None and not self.stream_task.done(): + # Cancel the streaming task + self.stream_task.cancel() # Triggers CancelledError in the task + self.stream_task = None else: - self.print(f"[error]Unknown command {cmd}[/error]") + # No streaming task running, just notify the user + self.notify("Press (Ctrl+Q) to exit.") + + def action_history_prev(self) -> None: + """ + Function used to navigate to the previous command in the history. + Used in the Up Arrow key binding. + + Binding("up", "history_prev", ...), + """ + if not self.history: + # No history, do nothing + return + if self.history_index is None: + # The history index class it is not initialized + self.history_index = len(self.history) - 1 + else: + self.history_index = max(0, self.history_index - 1) + + self._apply_history_to_input() + self._update_history_panel() + + def action_history_next(self) -> None: + """ + Function used to navigate to the next command in the history. + Used in the Down Arrow key binding. + + Binding("down", "history_next", ...), + """ + if not self.history: + # No history, do nothing + return + + if self.history_index is None: + self.history_index = len(self.history) + elif self.history_index >= len(self.history) - 1: + self.history_index = len(self.history) + else: + self.history_index += 1 - def print(self, msg: str): - console.print(f"[console]{msg}[/console]") + self._apply_history_to_input() + self._update_history_panel() - def get_images(self, user_input: str): + # endregion + + def run_console(self) -> None: + """ + Function used to run VulcanAI. + """ + self.run() + + + def init_manager(self) -> None: + """ + Function used to initialize VulcanAI Manager. 
+ """ + if self.iterative: + from vulcanai.core.manager_iterator import IterativeManager as ConsoleManager + else: + from vulcanai.core.manager_plan import PlanManager as ConsoleManager + + self.logger.log_console(f"Initializing Manager '{ConsoleManager.__name__}'...") + + self.manager = ConsoleManager(model=self.model, k=self.k, logger=self.logger) + + self.logger.log_console(f"Manager initialized with model '{self.model.replace('ollama-', '')}'") + # Update right panel info + self._update_variables_panel() + + def get_images(self, user_input: str) -> None: + """ + Function used to get the images added by the user in the LLM query. + """ parts = user_input.split() images = [] @@ -224,7 +1110,7 @@ def get_images(self, user_input: str): return images -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="VulcanAI Interactive Console") parser.add_argument( "--model", type=str, default="gpt-5-nano", @@ -247,15 +1133,13 @@ def main(): help="Enable Iterative Manager (default: off)" ) + args = parser.parse_args() - console = VulcanConsole(model=args.model, k=args.k, iterative=args.iterative) - if args.register_from_file: - for file in args.register_from_file: - console.manager.register_tools_from_file(file) - if args.register_from_entry_point: - for entry_point in args.register_from_entry_point: - console.manager.register_tools_from_entry_points(entry_point) - console.run() + + console = VulcanConsole(register_from_file=args.register_from_file, + tools_from_entrypoints=args.register_from_entry_point, + model=args.model, k=args.k, iterative=args.iterative) + console.run_console() if __name__ == "__main__": diff --git a/src/vulcanai/console/logger.py b/src/vulcanai/console/logger.py index fe94f23..a225cdd 100644 --- a/src/vulcanai/console/logger.py +++ b/src/vulcanai/console/logger.py @@ -12,68 +12,192 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import re
from typing import Protocol, Optional


class LogSink(Protocol):
    """
    Structural interface for log output destinations.

    Anything exposing ``write(msg, color="")`` (a Rich stdout wrapper,
    a Textual console adapter, a test double, ...) can act as a sink.
    """

    def write(self, msg: str, color: str = "") -> None:
        ...


class RichStdoutSink:
    """
    Default sink: prints to standard output through a Rich Console.

    Rich is imported lazily so the module stays importable when only a
    custom sink (e.g. the Textual console) is in use.
    """

    def __init__(self, logger_theme) -> None:
        from rich.console import Console
        from rich.theme import Theme
        self.console = Console(theme=Theme(logger_theme))

    def write(self, msg: str, color: str = "") -> None:
        # `color` is accepted for LogSink compatibility; Rich renders the
        # markup already embedded in `msg`.
        self.console.print(msg)


class VulcanAILogger:

    """
    Logger class for VulcanAI components.
    Provides methods to log messages with different tags and colors.
    """

    # Hex color associated with each message tag.
    vulcanai_theme = {
        "registry": "#068399",
        "manager": "#0d87c0",
        "executor": "#15B606",
        "vulcanai": "#56AA08",
        "user": "#91DD16",
        "validator": "#C49C00",
        "tool": "#EB921E",
        "error": "#FF0000",
        "console": "#8F6296",
        "warning": "#D8C412",
    }

    # Singleton storage for default().
    _default_instance: Optional["VulcanAILogger"] = None
    # When True, `<...>` tags are converted back into Rich "[...]" markup;
    # the Textual sink disables this (see set_textualizer_console).
    _rich_markup = True

    @classmethod
    def default(cls) -> "VulcanAILogger":
        """Get the default VulcanAILogger instance.
        This method acts as a singleton factory for the logger."""
        if cls._default_instance is None:
            cls._default_instance = cls()
        return cls._default_instance

    def __init__(self, sink: Optional[LogSink] = None):
        # A default Rich stdout console is used if no sink is provided.
        self.sink: LogSink = sink or RichStdoutSink(VulcanAILogger.vulcanai_theme)

    # region UTILS

    def set_sink(self, sink: "LogSink") -> None:
        """Set a new sink for the logger."""
        self.sink = sink

    def set_textualizer_console(self, textual_console) -> None:
        """Set a Textual console as sink and disable Rich markup re-emission."""
        self.set_sink(textual_console)
        VulcanAILogger._rich_markup = False

    def parse_color(self, msg):
        """
        Parse custom [tag]...[/tag] markers in messages and convert them
        to <color>...</color> pairs based on the vulcanai_theme colors.
        Unknown tags are left untouched.
        """

        # Matches [tag] or [/tag]
        pattern = re.compile(r'\[(\/?)([^\]]+)\]')

        def replace_tag(match):
            slash, tag = match.groups()

            # If the tag is defined in the theme, replace it
            if tag in self.vulcanai_theme:
                return f"<{slash}{self.vulcanai_theme[tag]}>"

            # Otherwise, keep the original tag
            return match.group(0)

        return pattern.sub(replace_tag, msg)

    def parse_rich_markup(self, msg: str) -> str:
        """Convert <...> style/color tags back to Rich [...] markup when enabled."""
        if VulcanAILogger._rich_markup:
            # Convert style tags: <bold> ... </bold> -> [bold] ... [/bold]
            msg = re.sub(r"<(/?)(bold|italic|underline|reverse|dim)>", r"[\1\2]", msg)
            # Convert hex colors: <#RRGGBB> ... </#RRGGBB> -> [#RRGGBB] ... [/#RRGGBB]
            msg = re.sub(r"<(/?)(#[0-9a-fA-F]{6})>", r"[\1\2]", msg)
        return msg

    def process_msg(self, msg: str, prefix: str = "", color: str = "") -> str:
        """Prepend `prefix`, wrap `msg` in color tags and normalize the markup."""
        color_begin = color_end = ""
        if color != "":
            # Resolve theme names (e.g. "warning") to their hex value.
            if color in self.vulcanai_theme:
                color = self.vulcanai_theme[color]
            color_begin = f"<{color}>"
            # Fixed: the closing tag was previously an empty string, so
            # colored messages were never terminated.
            color_end = f"</{color}>"

        msg = f"{prefix}{color_begin}{msg}{color_end}"

        return self.parse_rich_markup(self.parse_color(msg))

    # endregion

    # region LOG

    def log_manager(self, msg: str, error: bool = False, color: str = ""):
        if error:
            prefix = "[error][MANAGER] [ERROR][/error] "
        else:
            prefix = "[manager][MANAGER][/manager] "

        self.sink.write(self.process_msg(msg, prefix=prefix, color=color))

    def log_executor(self, msg: str, error: bool = False, tool: bool = False, tool_name: str = '', color: str = ""):
        if error:
            prefix = "[error][EXECUTOR] [ERROR][/error] "
        elif tool:
            # Delegate tool messages to log_tool (error flag intentionally not forwarded).
            self.log_tool(msg, tool_name=tool_name)
            return
        else:
            prefix = "[executor][EXECUTOR][/executor] "

        self.sink.write(self.process_msg(msg, prefix=prefix, color=color))

    def log_tool(self, msg: str, tool_name: str = '', error: bool = False, color: str = ""):
        if tool_name:
            tag = f"[TOOL {tool_name}]"
        else:
            tag = '[TOOL]'
        if error:
            prefix = f"[error]{tag} [ERROR][/error] "
        else:
            prefix = f"[tool]{tag}[/tool] "

        self.sink.write(self.process_msg(msg, prefix=prefix, color=color))

    def log_registry(self, msg: str, error: bool = False, color: str = ""):
        if error:
            prefix = "[error][REGISTRY] [ERROR][/error] "
        else:
            prefix = "[registry][REGISTRY][/registry] "

        self.sink.write(self.process_msg(msg, prefix=prefix, color=color))

    def log_validator(self, msg: str, error: bool = False, color: str = ""):
        if error:
            prefix = "[error][VALIDATOR] [ERROR][/error] "
        else:
            prefix = "[validator][VALIDATOR][/validator] "

        self.sink.write(self.process_msg(msg, prefix=prefix, color=color))

    def log_console(self, msg: str, color: str = ""):
        """Log a console-level message; defaults to the 'console' theme color."""
        if color == "":
            msg = f"[console]{msg}[/console]"

        # Fixed: previously this method skipped parse_rich_markup, so on a
        # Rich sink the raw <#RRGGBB> tags were printed literally.
        processed_msg = self.parse_rich_markup(self.parse_color(msg))
        if color == "":
            self.sink.write(processed_msg)
        else:
            # Resolve the requested color; unknown names fall back to no color.
            # Fixed: the looked-up color (not the console color) is forwarded.
            log_color = self.vulcanai_theme.get(color, "")
            self.sink.write(processed_msg, log_color)

    def log_msg(self, msg: str, error: bool = False, color: str = ""):
        if error:
            color = self.vulcanai_theme.get("error", "")

        self.sink.write(self.process_msg(msg, color=color))

    def log_user(self, msg: str):
        prefix = "[user][USER] >>>[/user] "

        self.sink.write(self.process_msg(msg, prefix=prefix))

    # endregion
# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from textual import events
from textual.app import ComposeResult
from textual.containers import VerticalScroll, Horizontal, Vertical, Container
from textual.screen import ModalScreen
from textual.widgets import Input, Checkbox, Button, Label, RadioSet, RadioButton


class ReverseSearchModal(ModalScreen[str | None]):
    """
    Bottom modal implementing shell-style reverse-i-search over the
    command history. Dismisses with the matched command, the raw query
    when nothing matched, or None on cancel.
    """

    DEFAULT_CSS = """
    ReverseSearchModal {
        align-horizontal: center;
        align-vertical: bottom;
    }

    #rev-box {
        width: 100%;
        height: 3;
        background: $surface;
        border-top: solid $accent;
        padding: 0 1;
        layout: horizontal;
    }

    #rev-label {
        width: 2fr;
        content-align: left middle;
    }

    #rev-input {
        width: 1fr;
    }
    """

    def __init__(self, history: list[str]) -> None:
        super().__init__()
        self.history = history
        # Current query typed by the user.
        self.search_query: str = ""
        # Index into self.history of the current match (None = no match yet).
        self.match_index: int | None = None

    def compose(self) -> ComposeResult:
        with Container(id="rev-box"):
            # Label shows "(reverse-i-search)`query`: match"
            yield Label("(reverse-i-search)``: ", id="rev-label")
            yield Input(placeholder="type to search…", id="rev-input")

    def on_mount(self) -> None:
        self.query_one("#rev-input", Input).focus()

    def update_label(self) -> None:
        """
        Update the label based on the current query + best match from history.
        """

        label = self.query_one("#rev-label", Label)
        query = self.search_query

        if not self.history or not query:
            label.update(f"(reverse-i-search)`{query}`: ")
            return

        # Start from the end and search backwards for the first match
        start = len(self.history) if self.match_index is None else self.match_index
        found = None
        for i in range(start - 1, -1, -1):
            if query in self.history[i]:
                found = (i, self.history[i])
                break

        if found is not None:
            self.match_index, match = found
            label.update(f"(reverse-i-search)`{query}`: {match}")
        else:
            self.match_index = None
            label.update(f"(reverse-i-search)`{query}`: ")

    def on_input_changed(self, event: Input.Changed) -> None:
        """
        Whenever the user types in the textbox, update the query + label.
        """

        if event.input.id != "rev-input":
            return
        self.search_query = event.value
        self.match_index = None  # restart from the most recent entry
        self.update_label()

    async def on_key(self, event: events.Key) -> None:
        key = event.key

        # Accept the current match
        if key in ("enter", "tab"):
            result: str | None
            if self.match_index is not None and 0 <= self.match_index < len(self.history):
                result = self.history[self.match_index]
            else:
                # No match: fall back to the raw query (or None if empty)
                result = self.search_query or None

            self.dismiss(result)
            event.stop()
            return

        # Cancel with Esc / Ctrl+C
        if key in ("escape", "ctrl+c"):
            self.dismiss(None)
            event.stop()
            return

        # Ctrl+R while in the modal = go to the previous (older) match
        if key == "ctrl+r" and self.search_query:
            # Look for a match starting before the current match_index
            start = self.match_index if self.match_index is not None else len(self.history)
            found = None
            for i in range(start - 1, -1, -1):
                if self.search_query in self.history[i]:
                    found = (i, self.history[i])
                    break

            if found is not None:
                self.match_index, match = found
                label = self.query_one("#rev-label", Label)
                label.update(f"(reverse-i-search)`{self.search_query}`: {match}")

            event.prevent_default()
            event.stop()
            return


class CheckListModal(ModalScreen[list[str] | None]):
    """
    Modal with a scrollable checkbox list. Dismisses with the list of
    checked lines on Submit, or None on Cancel.
    """

    CSS = """
    CheckListModal {
        align: center middle;
    }

    .dialog {
        width: 60%;
        max-width: 90%;
        height: 80%;       /* fixed portion of terminal */
        border: round $accent;
        padding: 1 2;
        background: $panel;
    }

    .title {
        text-align: center;
        margin-bottom: 1;
    }

    /* This is the important part */
    .checkbox-list {
        height: 1fr;       /* take all remaining vertical space */
        /* no max-height, no overflow-y here */
    }

    .btns {
        height: 3;         /* give buttons row a fixed height */
        padding-top: 1;
        content-align: right middle;
    }
    """

    def __init__(self, lines: list[str], active_tools_num: int = 0) -> None:
        super().__init__()
        self.lines = list(lines)
        # The first `active_tools_num` entries start checked.
        self.active_tools_num = active_tools_num

    def compose(self) -> ComposeResult:
        with Vertical(classes="dialog"):
            yield Label("Pick tools you want to enable", classes="title")

            # Scrollable checkbox list
            with VerticalScroll(classes="checkbox-list"):
                for i, line in enumerate(self.lines, start=1):
                    yield Checkbox(line, value=i <= self.active_tools_num, id=f"cb{i}")

            # Buttons
            with Horizontal(classes="btns"):
                yield Button("Cancel", variant="default", id="cancel")
                yield Button("Submit", variant="primary", id="submit")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "submit":
            boxes = list(self.query(Checkbox))
            selected = [self.lines[i] for i, cb in enumerate(boxes) if cb.value]
            self.dismiss(selected)
        elif event.button.id == "cancel":
            self.dismiss(None)

    def on_mount(self) -> None:
        first_cb = self.query_one(Checkbox)
        self.set_focus(first_cb)


class RadioListModal(ModalScreen[int | None]):
    """
    Modal with a single-select radio list. Dismisses with the index of
    the selected option on Submit, or None on Cancel.

    NOTE(review): the previous generic was `str | None`, but the modal
    dismisses the pressed *index* — fixed to `int | None`.
    """

    CSS = """
    RadioListModal {
        align: center middle;
    }

    .dialog {
        width: 60%;
        max-width: 90%;
        height: 40%;
        border: round $accent;
        padding: 1 2;
        background: $panel;
    }

    .title {
        text-align: center;
        margin-bottom: 1;
    }

    .radio-list {
        height: 1fr;
    }

    .btns {
        height: 3;
        padding-top: 1;
        content-align: right middle;
    }
    """

    def __init__(self, lines: list[str], default_index: int = 0) -> None:
        super().__init__()
        self.lines = lines
        self.default_index = default_index

    def compose(self) -> ComposeResult:
        with Vertical(classes="dialog"):
            yield Label("Pick one option", classes="title")

            # One-select radio list
            with VerticalScroll(classes="radio-list"):
                with RadioSet(id="radio-set"):
                    for i, line in enumerate(self.lines):
                        yield RadioButton(
                            line,
                            id=f"rb{i}",
                            value=(i == self.default_index)
                        )

            # Buttons
            with Horizontal(classes="btns"):
                yield Button("Cancel", variant="default", id="cancel")
                yield Button("Submit", variant="primary", id="submit")

    def on_mount(self) -> None:
        first_rb = self.query_one(RadioButton)
        self.set_focus(first_rb)

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "submit":
            radioset = self.query_one("#radio-set", RadioSet)
            # NOTE(review): pressed_index is -1 when nothing is selected —
            # callers treat negative indices as "no selection".
            selected = radioset.pressed_index
            if selected is not None:
                self.dismiss(selected)
            else:
                self.dismiss(None)
        elif event.button.id == "cancel":
            # Fixed: Cancel previously did nothing, leaving the modal open.
            self.dismiss(None)
import asyncio
import difflib
import heapq
import subprocess
import sys
import time


class StreamToTextual:
    """
    File-like adapter that redirects a stdout/stderr stream into the
    Textual terminal (via the app's thread-safe call_from_thread).
    """

    def __init__(self, app, stream_name: str = "stdout"):
        self.app = app
        # Keep the real stream so flush() still reaches the terminal.
        self.real_stream = getattr(sys, stream_name)

    def write(self, data: str):
        if not data:
            return

        # Whitespace-only chunks are dropped; the original data
        # (including its newline) is forwarded untouched.
        if data.strip():
            # Ensure the update happens on the app thread
            self.app.call_from_thread(self.app.add_line, data)

    def flush(self):
        self.real_stream.flush()


class SpinnerHook:
    """
    Single entrant spinner controller for the console.
    - Starts the spinner on the LLM request.
    - Stops the spinner when the LLM request is over.
    """

    def __init__(self, spinner_status):
        self.spinner_status = spinner_status

    def on_request_start(self, text="Querying LLM..."):
        self.spinner_status.start(text)

    def on_request_end(self):
        self.spinner_status.stop()


def attach_ros_logger_to_console(console, node):
    """
    Redirect a ROS node's logger into the console so its prints stop
    overlapping the terminal output.
    """

    logger = node.get_logger()

    def info_hook(msg, *args, **kwargs):
        console.call_from_thread(console.logger.log_msg, f"[ROS] [INFO] {msg}")

    def warn_hook(msg, *args, **kwargs):
        console.call_from_thread(console.logger.log_msg, f"[ROS] [WARN] {msg}")

    def error_hook(msg, *args, **kwargs):
        console.call_from_thread(console.logger.log_msg, f"[ROS] [ERROR] {msg}")

    logger.info = info_hook
    logger.warning = warn_hook
    logger.error = error_hook


def common_prefix(strings: list[str]) -> tuple[str, str]:
    """
    Return the longest common prefix of `strings` together with the
    space-joined candidate list (used to echo completion options).

    Fixed: the previous version did not shorten the prefix when a later
    string was a strict prefix of the running one (["abc", "ab"] -> "abc"),
    and returned a bare "" for empty input (crashing tuple unpacking).
    """
    if not strings:
        return "", ""

    prefix = strings[0]
    for s in strings[1:]:
        limit = min(len(prefix), len(s))
        j = 0
        while j < limit and prefix[j] == s[j]:
            j += 1
        prefix = prefix[:j]
        if not prefix:
            # No common prefix left: stop early
            break

    return prefix, " ".join(strings)


async def run_streaming_cmd_async(console, args: list[str],
                                  max_duration: float = 60,
                                  max_lines: int = 1000,
                                  echo: bool = True,
                                  tool_name="") -> str:
    """
    Run a subprocess and stream its output line by line into the console.

    Stops when max_lines lines were printed, max_duration seconds have
    elapsed, or the task is cancelled (Ctrl+C via the console).
    """
    # Imported lazily so this module stays importable without the Textual UI.
    from textual.markup import escape

    # Unpack the command
    cmd, *cmd_args = args

    # Create the subprocess (stderr merged into stdout)
    process = await asyncio.create_subprocess_exec(
        cmd,
        *cmd_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )

    assert process.stdout is not None

    start_time = time.monotonic()
    line_count = 0

    try:
        # Subprocess main loop. Read line by line
        async for raw_line in process.stdout:
            line = raw_line.decode(errors="ignore").rstrip("\n")

            # Print the line (escaped so it cannot break Textual markup)
            if echo:
                console.add_line(escape(line))

            # Count the line
            line_count += 1
            if max_lines is not None and line_count >= max_lines:
                console.logger.log_tool(f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}", tool_name=tool_name)
                console.set_stream_task(None)
                process.terminate()
                break

            # Check duration
            if max_duration and (time.monotonic() - start_time) >= max_duration:
                console.logger.log_tool(f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s", tool_name=tool_name)
                console.set_stream_task(None)
                process.terminate()
                break

    except asyncio.CancelledError:
        # Task was cancelled → stop the subprocess and propagate
        console.logger.log_tool("[tool]Cancellation received:[/tool] terminating subprocess...", tool_name=tool_name)
        process.terminate()
        raise
    # Not strictly necessary: the Textual terminal gets the keyboard input
    except KeyboardInterrupt:
        # Ctrl+C pressed → stop subprocess
        console.logger.log_tool("[tool]Ctrl+C received:[/tool] terminating subprocess...", tool_name=tool_name)
        process.terminate()

    finally:
        try:
            await asyncio.wait_for(process.wait(), timeout=3.0)
        except asyncio.TimeoutError:
            console.logger.log_tool("Subprocess didn't exit in time → killing it.", tool_name=tool_name, error=True)
            process.kill()
            await process.wait()

    return "Process stopped due to Ctrl+C"


def execute_subprocess(console, tool_name, base_args, max_duration, max_lines):
    """
    Launch run_streaming_cmd_async as a task on the Textual event loop,
    regardless of which thread (Textual or ROS) is calling.
    """

    stream_task = None

    def _on_done(task: asyncio.Task) -> None:
        if task.cancelled():
            # Normal path (user Ctrl+C) → don't log as an error
            return

        try:
            task.result()
        except Exception as e:
            console.logger.log_msg(f"Echo task error: {e!r}\n", error=True)

    def _launcher() -> None:
        nonlocal stream_task
        # This always runs in the Textual event-loop thread
        loop = asyncio.get_running_loop()
        stream_task = loop.create_task(
            run_streaming_cmd_async(
                console,
                base_args,
                max_duration=max_duration,
                max_lines=max_lines,
                tool_name=tool_name
            )
        )
        # Fixed: the callback/registration previously ran *before* the
        # launcher, so stream_task was still None (AttributeError on the
        # callback, and Ctrl+C could not find the task when launched from
        # the ROS thread).
        stream_task.add_done_callback(_on_done)
        console.set_stream_task(stream_task)

    try:
        # Are we already in the Textual event loop thread?
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop here → probably the ROS thread. Bounce into the Textual
        # thread; `console.app` is the Textual App instance.
        console.app.call_from_thread(_launcher)
    else:
        # We *are* in the loop → just launch directly.
        _launcher()

    # Fixed closing tag: was "[tool]...[tool]"
    console.logger.log_tool("[tool]Subprocess created![/tool]", tool_name=tool_name)


def run_oneshot_cmd(args: list[str]) -> str:
    """
    Run a command to completion and return its combined stdout+stderr.
    Raises Exception with the captured output on a non-zero exit code.
    """
    try:
        return subprocess.check_output(
            args,
            stderr=subprocess.STDOUT,
            text=True
        )

    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")


def suggest_string(console, tool_name, string_name, input_string, real_string_list):
    """
    If `input_string` is not in `real_string_list`, open a RadioList modal
    with the most similar candidates and return the one the user picks
    (None on cancel or when the input already exists).

    Blocks the calling (tool) thread on console.suggestion_index_changed
    until the modal is dismissed.
    """

    ret = None

    def _similarity(a: str, b: str) -> float:
        """Return a similarity score between 0 and 1."""
        return difflib.SequenceMatcher(None, a, b).ratio()

    def _get_suggestions(real_string_list_comp: list[str], string_comp: str) -> tuple[str, list[str]]:
        """
        Search for the most "similar" string in a list.

        Used in ROS 2 CLI commands to offer the most similar entry when
        the queried topic does not exist.

        Example:
            real_string_list_comp = [
                "/parameter_events",
                "/rosout",
                "/turtle1/cmd_vel",
                "/turtle1/color_sensor",
                "/turtle1/pose",
            ]
            string_comp = "trtle1"

        @return
            str: the most "similar" string
            list[str]: the list sorted by similarity score
        """

        topic_list_pq = []
        n = len(string_comp)

        for string in real_string_list_comp:
            m = len(string)
            # Calculate the similarity
            score = _similarity(string_comp, string)
            # Give more value to candidates of comparable length.
            score -= abs(n - m) * 0.005
            # Max-heap (via negative score)
            heapq.heappush(topic_list_pq, (-score, string))

        # Pop the ordered list
        ret_list: list[str] = []
        _, most_topic_similar = heapq.heappop(topic_list_pq)

        ret_list.append(most_topic_similar)

        while topic_list_pq:
            _, topic = heapq.heappop(topic_list_pq)
            ret_list.append(topic)

        return most_topic_similar, ret_list

    if input_string not in real_string_list:

        console.logger.log_tool(f"{string_name}: \"{input_string}\" does not exists", tool_name=tool_name)

        # Get the suggestions list sorted by similarity score
        # (NOTE(review): assumes real_string_list is non-empty here)
        _, topic_sim_list = _get_suggestions(real_string_list, input_string)

        # Open the ModalScreen
        console.open_radiolist(topic_sim_list, f"{tool_name}")

        # Wait for the user to select an item in the
        # RadioList ModalScreen
        console.suggestion_index_changed.wait()

        # Check if the user cancelled the suggestion
        if console.suggestion_index >= 0:
            ret = topic_sim_list[console.suggestion_index]

        # Reset the handshake state for the next suggestion
        console.suggestion_index = -1
        console.suggestion_index_changed.clear()

    return ret
+# See the License for the specific language governing permissions and +# limitations under the License. + + +from collections import defaultdict, deque +import pyperclip +import re +import threading + +from rich.style import Style +from textual.widgets import TextArea + + +class CustomLogTextArea(TextArea): + """ + TextArea-based Log Panel. Includes the following features: + - Selectable text. + - Color text. + + NOTE: Uses private TextArea internals (_highlights, _theme). + This may break on future Textual versions. + """ + + BINDINGS = [ + ("f3", "copy_selection", "Copy selection"), + ] + + # Maximum number of lines to keep in the log + MAX_LINES = 500 + + # body + TAG_RE = re.compile(r"<(?P[A-Za-z0-9_# ]+)>(?P.*?)") + # join tags + TAG_TOKEN_RE = re.compile(r"]+>") + + + def __init__(self, **kwargs): + super().__init__(read_only=True, **kwargs) + + # Lock used to avoid data races in 'self._lines_styles' + # when VulcanAI and ROS threads writes at the same time + self._lock = threading.Lock() + + # Internal variable used by the father class (TextArea) + # to change the colors of the lines in the Log. + # When the function self.refresh() is called, all the styles + # are refreshed (includes: colors, bold, italic, ...) + self._highlights = defaultdict(list) + + # Private variable of the number of lines + # that are currently being displayed + self._line_count = 0 + + # Private dictionary (int, list()) with all the style for the lines + # A row with N styles # (e.g.: N = 2: two colors, one bold and color) + # will have N entries. + self._lines_styles = deque(maxlen=self.MAX_LINES) + + # region UTILS + + def _trim_highlights(self) -> None: + """ + Function used to trim the CustomLogTextArea to the maximum number of lines. + + Keep only the last MAX_LINES lines (text + styles). 
+ """ + if self._line_count <= self.MAX_LINES: + # Less than max lines, nothing to do + return + + # MAX_LINES exceeded, need to trim + + # Calculate how many lines to remove + extra = self._line_count - self.MAX_LINES + + # 1) Remove the first 'extra' lines from the TextArea document. + # end=(extra, 0) means "start of line 'extra'", so it deletes lines [0..extra-1] + self.replace("", start=(0, 0), end=(extra, 0)) + + # 3) Update line count + self._line_count -= extra + + def _rebuild_highlights(self) -> None: + """ + Function used to rebuild the highlights of the TextArea based on the current + _lines_styles mapping. + + Each time a new line is added, or a line is modified, this function should be called + to update the _highlights variable used by the TextArea to render the styles. + """ + + # Clears the previous highlights + self._highlights.clear() + + # Rebuild highlights from _lines_styles + for row, value_list in enumerate(self._lines_styles): + for tuple in value_list: + self._highlights[row].append((tuple[0], tuple[1], tuple[2])) + + def join_nested_tags(self, input_text: str) -> str: + """ + Function used to join nested tags into combined tags. + + E.g.: + Input: "This is important text" + Output: "This is important text" + """ + out: list[str] = [] + stack: list[str] = [] + + pos = 0 + # Iterate over all tags in 'input_text' + for m in self.TAG_TOKEN_RE.finditer(input_text): + # Emit text between tags + text = input_text[pos:m.start()] + if text: + if stack: + combined = " ".join(stack) + out.append(f"<{combined}>{text}") + else: + out.append(text) + + # Process tag + token = m.group(0) + is_close = token.startswith("{tail}") + else: + out.append(tail) + + return "".join(out) + + def _ensure_style_token(self, style: str) -> str: + """ + Function used to ensure a style token exists in the theme. + If the style does not exist, it is created. + + E.g.: + style: style string (e.g., "red bold", "#FF0000 italic", "green underline", ...) 
+        """
+        style_list = style.split()
+
+        color = None
+        bold = False
+        italic = False
+        underline = False
+        dim = False
+
+        token_parts = []
+
+        # Iterate over style parts
+        for st in style_list:
+            if st.startswith("#"):
+                # Hex color
+                color = st
+                token_parts.append("hex_" + st[1:].upper())
+            elif st in {"bold", "italic", "underline", "dim"}:
+                # Text style
+                token_parts.append(st)
+                if st == "bold":
+                    bold = True
+                elif st == "italic":
+                    italic = True
+                elif st == "underline":
+                    underline = True
+                elif st == "dim":
+                    dim = True
+            else:
+                # Other colors
+
+                # Gray color is not supported
+                if st == "gray":
+                    color = "#8D8D8D"
+                    token_parts.append("hex_" + color[1:])
+                else:
+                    # Assume named color like "red", "yellow"
+                    color = st
+                    token_parts.append(st)
+
+        token = "_".join(token_parts)
+
+        # Register the style if it doesn't exist
+        if token not in self._theme.syntax_styles:
+            self._theme.syntax_styles[token] = Style(
+                color=color,
+                bold=bold,
+                italic=italic,
+                underline=underline,
+                dim=dim,
+            )
+
+        return token
+
+    # endregion
+
+    # region LOG
+
+    def append_line(self, text: str) -> bool:
+        """
+        Function used to append a new line to the CustomLogTextArea.
+
+        The 'text' parameter can include tags to specify styles.
+        E.g.:
+        "This is <red>red text</red>"
+        "This is <green bold>green bold text</green bold>"
+        """
+
+        # Check if the input text has a breakline
+        if "\n" in text or "\r" in text:
+            return False
+
+        # 'text' without tags
+        plain = ""
+        # Spans -> Multiple styles (start_col, end_col, token)
+        spans = []
+        # Position cursor in 'text'
+        cursor = 0
+
+        # First, join nested tags into combined tags
+        # for easier parsing.
+ text = self.join_nested_tags(text) + + # Parse tags and build plain text + 'spans' + # TAG_RE.finditer finds all tags in 'text' + for m in self.TAG_RE.finditer(text): + # Append text before the current tag + plain += text[cursor:m.start()] + # Get tag and body + tag = m.group("tag") + body = m.group("body") + # Record span + start = len(plain) + # Append body in plain text + plain += body + end = len(plain) + # Get or create style token + token = self._ensure_style_token(tag) + spans.append((start, end, token)) + # Move cursor after the current tag + cursor = m.end() + + # Append remaining text after last tag + plain += text[cursor:] + + # Append the new line with styles + # Use a lock to avoid race conditions when multiple threads + # try to write to the console at the same time. + # This can happen when VulcanAI and ROS nodes are logging simultaneously. + # + # e.g., VulcanAI manager logging and ROS listener logging. + # [EXECUTOR] Invoking 'move_turtle' with args: ... + # [ROS] [INFO] Publishing message 1 to ... 
+ with self._lock: + + # Append via document API to keep row tracking consistent + # Only add a newline before the new line if there is already content + insert_text = ("\n" if self.document.text else "") + plain + self.insert(insert_text, location=self.document.end) + + # Track styles for the new line (always at the end) + row = self._line_count + self._line_count += 1 + + if (self._line_count > self.MAX_LINES): + self._highlights.pop(self._line_count - self.MAX_LINES, None) + + # Store styles + # Each line may have multiple styles (spans) + current_line = [] + for start, end, token in spans: + current_line.append((start, end, token)) + + self._lines_styles.append(current_line) + + # Trim now + self._trim_highlights() + + # Scroll to end + self.scroll_end(animate=False) + + # Rebuild highlights and refresh + self._rebuild_highlights() + self.refresh() + + return True + + def delete_last_row(self) -> None: + """ + Function used to delete the last line in the CustomLogTextArea. + Used when a AI query is completed. To remove the "Querying..." line. + """ + with self._lock: + if self._line_count == 0: + # No lines, does nothing. + return + + last_row = self._line_count - 1 + + if last_row > 0: + # Delete the newline before the last line + the line itself + self.replace( + "", + start=(last_row - 1, len(self.document.get_line(last_row - 1))), + end=(last_row, len(self.document.get_line(last_row))), + ) + else: + # Only one line + self.replace( + "", + start=(0, 0), + end=(0, len(self.document.get_line(0))), + ) + + # Decrease line count and remove styles + self._line_count -= 1 + self._lines_styles.pop() + + # Rebuild highlights and refresh + self._rebuild_highlights() + self.refresh() + + # endregion + + def clear_console(self) -> None: + """ + Function used to clear the entire CustomLogTextArea. 
+ """ + + with self._lock: + # Clear internal structures + self._highlights.clear() + + # Clear the document + self._line_count = 0 + self._lines_styles.clear() + + # Refresh and clear the TextArea + self.refresh() + self.clear() + + def action_copy_selection(self) -> None: + """ + Action: Copies the selected text of the CustomLogTextArea + """ + + # Check if there is selected text + if self.selected_text == "": + self.notify("No text selected to copy!") + return + + # Copy to clipboard, using pyperclip library + pyperclip.copy(self.selected_text) + self.notify("Selected area copied to clipboard!") diff --git a/src/vulcanai/console/widget_spinner.py b/src/vulcanai/console/widget_spinner.py new file mode 100644 index 0000000..fc56aed --- /dev/null +++ b/src/vulcanai/console/widget_spinner.py @@ -0,0 +1,74 @@ +# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from rich.spinner import Spinner +from rich.text import Text +from textual.widgets import Static + + +class SpinnerStatus(Static): + """ + Widget to display a spinner indicating LLM activity. + It implements rich's Spinner and manages its display state, + starting and stopping it as needed through SpinnerHook. 
+ """ + def __init__(self, logcontent, **kwargs) -> None: + super().__init__(**kwargs) + self.logcontent = logcontent + self._spinner = Spinner("dots2", text="") + self._timer = None + self._forced_compact = False + + def on_mount(self) -> None: + self._timer = self.set_interval(1/30, self._refresh, pause=True) + self.display = False + self.styles.height = 0 + + def _refresh(self) -> None: + self.update(self._spinner) + + def _log_is_filling_space(self) -> bool: + """ + Determine if the log content is filling all available space. + """ + visible = max(1, self.logcontent.size.height) + lines = getattr(self.logcontent.document, "line_count", 0) + return lines > visible + + def start(self, text: str = "Querying LLM...") -> None: + self._spinner.text = Text(text, style="#0d87c0") + self.display = True + self.styles.height = 1 + if self._log_is_filling_space(): + self.logcontent.styles.height = "1fr" + self._forced_compact = True + self.logcontent.refresh(layout=True) + else: + self._forced_compact = False + self.refresh(layout=True) + + self._timer.resume() + + def stop(self) -> None: + self._timer.pause() + self.display = False + self.styles.height = 0 + if self._forced_compact: + self.logcontent.styles.height = "auto" + self._forced_compact = False + self.refresh(layout=True) + self.logcontent.refresh(layout=True) + + self.update("") diff --git a/src/vulcanai/core/agent.py b/src/vulcanai/core/agent.py index 4abf08c..febc67e 100644 --- a/src/vulcanai/core/agent.py +++ b/src/vulcanai/core/agent.py @@ -25,11 +25,12 @@ class Brand(str, Enum): class Agent: + """Interface to operate the LLM.""" def __init__(self, model_name: str, logger=None): self.brand, name = self._detect_brand(model_name) self.model = None - self.logger = logger or VulcanAILogger.log_manager + self.logger = logger or VulcanAILogger.default() self._load_model(name) def inference_plan( @@ -133,17 +134,20 @@ def _detect_brand(self, model_name: str) -> tuple[Brand, str]: def _load_model(self, model_name: 
str): if self.brand == Brand.gpt: from vulcanai.models.openai import OpenAIModel - self.logger(f"Using OpenAI API with model: {model_name}") + self.logger.log_manager(f"Using OpenAI API with model: " + \ + f"[manager]{model_name}[/manager]") self.model = OpenAIModel(model_name, self.logger) elif self.brand == Brand.gemini: from vulcanai.models.gemini import GeminiModel - self.logger(f"Using Gemini API with model: {model_name}") + self.logger.log_manager(f"Using Gemini API with model: " + \ + f"[manager]{model_name}[/manager]") self.model = GeminiModel(model_name, self.logger) elif self.brand == Brand.ollama: from vulcanai.models.ollama_model import OllamaModel - self.logger(f"Using Ollama API with model: {model_name}") + self.logger.log_manager(f"Using Ollama API with model: " + \ + f"[manager]{model_name}[/manager]") self.model = OllamaModel(model_name, self.logger) else: @@ -154,8 +158,8 @@ def set_hooks(self, hooks) -> None: if self.model: try: self.model.hooks = hooks - self.logger("LLM hooks set.") + self.logger.log_manager("LLM hooks set.") except Exception as e: - self.logger(f"Failed to set LLM hooks: {e}", error=True) + self.logger.log_manager(f"Failed to set LLM hooks: {e}", error=True) else: - self.logger("LLM model not initialized, cannot set hooks.", error=True) + self.logger.log_manager("LLM model not initialized, cannot set hooks.", error=True) diff --git a/src/vulcanai/core/executor.py b/src/vulcanai/core/executor.py index d473ee0..3cc41c3 100644 --- a/src/vulcanai/core/executor.py +++ b/src/vulcanai/core/executor.py @@ -59,7 +59,7 @@ class PlanExecutor: def __init__(self, registry, logger=None): self.registry = registry - self.logger = logger or VulcanAILogger.log_executor + self.logger = logger or VulcanAILogger.default() def run(self, plan: GlobalPlan, bb: Blackboard) -> Dict[str, Any]: """ @@ -80,19 +80,22 @@ def _run_plan_node(self, node: PlanBase, bb: Blackboard) -> bool: """Run a PlanNode with execution control parameters.""" # Evaluate 
PlanNode-level condition if node.condition and not self.safe_eval(node.condition, bb): - self.logger(f"Skipping PlanNode {node.kind} due to not fulfilled condition={node.condition}") + self.logger.log_executor(f"Skipping PlanNode {node.kind} due to not fulfilled " + \ + f"condition={node.condition}") return True attempts = node.retry + 1 if node.retry else 1 for i in range(attempts): ok = self._execute_plan_node_with_timeout(node, bb) if ok and self._check_success(node, bb): - self.logger(f"PlanNode {node.kind} succeeded on attempt {i+1}/{attempts}") + self.logger.log_executor(f"PlanNode [registry]{node.kind}[/registry] " + \ + f"[executor]succeeded[/executor] " + \ + f"on attempt {i+1}/{attempts}") return True - self.logger(f"PlanNode {node.kind} failed on attempt {i+1}/{attempts}", error=True) - + self.logger.log_executor(f"PlanNode {node.kind} failed on attempt {i+1}/{attempts}", error=True) if node.on_fail: - self.logger(f"Executing on_fail branch for PlanNode {node.kind}") + self.logger.log_executor(f"Executing on_fail branch for PlanNode " + \ + f"[registry]{node.kind}[/registry]") # Execute the on_fail branch but ignore its result and return False self._run_plan_node(node.on_fail, bb) @@ -106,7 +109,7 @@ def _execute_plan_node_with_timeout(self, node: PlanBase, bb: Blackboard) -> boo future = executor.submit(self._execute_plan_node, node, bb) return future.result(timeout=node.timeout_ms / 1000.0) except concurrent.futures.TimeoutError: - self.logger(f"PlanNode {node.kind} timed out after {node.timeout_ms} ms") + self.logger.log_executor(f"PlanNode {node.kind} timed out after {node.timeout_ms} ms", error=True) return False else: return self._execute_plan_node(node, bb) @@ -126,13 +129,15 @@ def _execute_plan_node(self, node: PlanBase, bb: Blackboard) -> bool: return all(results) # Pydantic should have validated this already - self.logger(f"Unknown PlanNode kind {node.kind}, skipping", error=True) + + self.logger.log_executor(f"Unknown PlanNode kind 
{node.kind}, skipping", error=True) return True def _run_step(self, step: Step, bb: Blackboard, parallel: bool = False) -> bool: # Evaluate Step-level condition if step.condition and not self.safe_eval(step.condition, bb): - self.logger(f"Skipping step [italic]'{step.tool}'[/italic] due to condition={step.condition}") + self.logger.log_executor(f"Skipping step '{step.tool}' " + \ + f"due to condition=[executor]{step.condition}[/executor]") return True # Bind args with blackboard placeholders @@ -150,7 +155,8 @@ def _run_step(self, step: Step, bb: Blackboard, parallel: bool = False) -> bool: if ok and self._check_success(step, bb, is_step=True): return True else: - self.logger(f"Step [italic]'{step.tool}'[/italic] attempt {i+1}/{attempts} failed") + self.logger.log_executor(f"Step [executor]'{step.tool}'[/executor] " + \ + f"attempt {i+1}/{attempts} failed") return False @@ -161,10 +167,12 @@ def _check_success(self, entity: Step | PlanBase, bb: Blackboard, is_step: bool return True log_value = entity.tool if is_step else entity.kind if self.safe_eval(entity.success_criteria, bb): - self.logger(f"Entity '{log_value}' succeeded with criteria={entity.success_criteria}") + self.logger.log_executor(f"Entity '{log_value}' [executor]succeeded[/executor] " + \ + f"with criteria={entity.success_criteria}") return True else: - self.logger(f"Entity '{log_value}' failed with criteria={entity.success_criteria}") + self.logger.log_executor(f"Entity '{log_value}' [error]failed[/error] " + \ + f"with criteria={entity.success_criteria}") return False def safe_eval(self, expr: str, bb: Blackboard) -> bool: @@ -179,7 +187,7 @@ def safe_eval(self, expr: str, bb: Blackboard) -> bool: # Eval does not correctly evaluate dot notation with nested dicts return bool(eval(sub_expr)) except Exception as e: - self.logger(f"Condition evaluation failed: {expr} ({e})") + self.logger.log_executor(f"Condition evaluation failed: {expr} ({e})", error=True) return False def _make_bb_subs(self, expr: 
str, bb: Blackboard) -> str: @@ -192,7 +200,7 @@ def _make_bb_subs(self, expr: str, bb: Blackboard) -> str: expr = expr.replace(f"{{{{{match}}}}}", str(val)) return expr except Exception as e: - self.logger(f"Blackboard substitution failed: {expr} ({e})", error=True) + self.logger.log_executor(f"Blackboard substitution failed: {expr} ({e})", error=True) return expr def _bind_args(self, args: List[ArgValue], schema: List[Tuple[str, str]], bb: Blackboard) -> List[ArgValue]: @@ -241,17 +249,32 @@ def _call_tool(self, bb: Blackboard = None, parallel: bool = False) -> Tuple[bool, Any]: """Invoke a registered tool.""" + tool = self.registry.tools.get(tool_name) if not tool: - self.logger(f"Tool [italic]'{tool_name}'[/italic] not found", error=True) + self.logger.log_executor(f"Tool '{tool_name}' not found", error=True) return False, None # Convert args list to dict arg_dict = {a.key: a.val for a in args} tool.bb = bb + first = True + + msg = f"Invoking [executor]'{tool_name}'[/executor] with args:" + msg += "'{" + for key, value in arg_dict.items(): + if first: + msg += f"[validator]'{key}'[/validator]: " + \ + f"[registry]'{value}'[/registry]" + else: + msg += f", [validator]'{key}'[/validator]: " + \ + f"[registry]'{value}'[/registry]" + first = False + msg+="}'" + self.logger.log_executor(msg) + start = time.time() - self.logger(f"Invoking [italic]'{tool_name}'[/italic] with args: [italic]'{arg_dict}'[/italic]") tool_log = "" try: if timeout_ms: @@ -274,13 +297,28 @@ def _call_tool(self, result = tool.run(**arg_dict) tool_log = buff.getvalue().strip() if tool_log: - self.logger(f"{tool_log}", tool=True, tool_name=tool_name) + self.logger.log_executor(f"{tool_log}: {tool_name}") elapsed = (time.time() - start) * 1000 - self.logger(f"Executed [italic]'{tool_name}'[/italic] in {elapsed:.1f} ms with result: {result}") + self.logger.log_executor(f"Executed [executor]'{tool_name}'[/executor] " + \ + f"in [registry]{elapsed:.1f} ms[/registry] " + \ + f"with result:") + + if 
isinstance(result, dict): + for key, value in result.items(): + if key == "ros2": + continue + self.logger.log_msg(f"{key}") + self.logger.log_msg(value) + else: + self.logger.log_msg(result) + return True, result except concurrent.futures.TimeoutError: - self.logger(f"Execution of [italic]'{tool_name}'[/italic] timed out after {timeout_ms} ms") + self.logger.log_executor(f"Execution of [executor]'{tool_name}'[/executor] " + \ + f"[error]timed out[/error] " + \ + f"after [registry]{timeout_ms}[/registry] ms") return False, None except Exception as e: - self.logger(f"Execution failed for [italic]'{tool_name}'[/italic]: {e}") + self.logger.log_executor(f"Execution [error]failed[/error] for " + \ + f"[executor]'{tool_name}'[/executor]: {e}") return False, None diff --git a/src/vulcanai/core/manager.py b/src/vulcanai/core/manager.py index 128335a..93e094a 100644 --- a/src/vulcanai/core/manager.py +++ b/src/vulcanai/core/manager.py @@ -22,23 +22,25 @@ from vulcanai.tools.tool_registry import ToolRegistry + class ToolManager: """Manages the LLM Agent and calls the executor with the LLM output.""" def __init__( self, model: str, - registry: Optional[ToolRegistry]=None, - validator: Optional[PlanValidator]=None, - k: int=10, + registry: Optional[ToolRegistry] = None, + validator: Optional[PlanValidator] = None, + k: int = 10, hist_depth: int = 3, - logger=None + logger: Optional[VulcanAILogger] = None ): - self.logger = logger or VulcanAILogger.log_manager - self.llm = Agent(model, self.logger) + # Logger default to a stdout logger if none is provided (StdoutLogSink) + self.logger = logger or VulcanAILogger.default() + self.llm = Agent(model, logger=self.logger) self.k = k - self.registry = registry or ToolRegistry(logger=(logger or VulcanAILogger.log_registry)) + self.registry = registry or ToolRegistry(logger=self.logger) self.validator = validator - self.executor = PlanExecutor(self.registry, logger=(logger or VulcanAILogger.log_executor)) + self.executor = 
PlanExecutor(self.registry, logger=self.logger) self.bb = Blackboard() self.user_context = "" # History is saved as a list of Tuples of user requests and plan summaries @@ -88,6 +90,7 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s :return: A dictionary with the execution result, including the plan used and the final blackboard state. """ try: + # Get plan from LLM plan = self.get_plan_from_user_request(user_text, context) if not plan: @@ -97,12 +100,12 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s try: self.validator.validate(plan) except Exception as e: - VulcanAILogger.log_validator(f"Plan validation error: {e}") + self.logger.log_validator(f"Plan validation: {e}", error=True) raise e # Execute plan ret = self.execute_plan(plan) except Exception as e: - self.logger(f"Error handling user request: {e}", error=True) + self.logger.log_manager(f"Error handling user request: {e}", error=True) ret = {"error": str(e)} return ret @@ -129,7 +132,7 @@ def get_plan_from_user_request(self, user_text: str, context: Dict[str, Any] = N # Query LLM plan = self.llm.inference_plan(system_prompt, user_prompt, images, self.history) - self.logger(f"Plan received:\n{plan}") + self.logger.log_manager(f"Plan received:\n{plan}") # Save to history if plan: self._add_to_history(user_prompt, plan.summary) @@ -164,7 +167,7 @@ def _build_prompt(self, user_text: str, ctx: Dict[str, Any]) -> Tuple[str, str]: """ tools = self.registry.top_k(user_text, self.k) if not tools: - self.logger("No tools available in the registry.", error=True) + self.logger.log_manager(f"No tools available in the registry.", error=True) return "", "" tool_descriptions = [] for tool in tools: @@ -196,13 +199,22 @@ def update_history_depth(self, new_depth: int): :param new_depth: The new history depth. 
""" self.history_depth = max(0, int(new_depth)) - self.logger(f"Updated history depth to {new_depth}") + self.logger.log_console(f"Updated history depth to {new_depth}") if len(self.history) > self.history_depth: if self.history_depth <= 0: self.history = [] else: self.history = self.history[-self.history_depth:] + def update_k_index(self, new_k: int): + """ + Update the k index. + + :param new_k: The new k index. + """ + self.k = max(1, int(new_k)) + self.logger.log_console(f"Updated k index to {new_k}") + def _get_prompt_template(self) -> str: template = """ You are a planner assistant controlling a robotic system. diff --git a/src/vulcanai/core/manager_iterator.py b/src/vulcanai/core/manager_iterator.py index a209006..37aa960 100644 --- a/src/vulcanai/core/manager_iterator.py +++ b/src/vulcanai/core/manager_iterator.py @@ -79,13 +79,13 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s self.goal = self._get_goal_from_user_request(user_text, context) self._timeline.append({"iteration": self.iter, "event": TimelineEvent.GOAL_SET.value}) except Exception as e: - self.logger(f"Error getting goal from user request: {e}", error=True) + self.logger.log_manager(f"Error getting goal from user request: {e}", error=True) return {"error": "Error getting goal from user request."} skip_verification_step = False while self.iter < self.max_iters: self.iter += 1 - self.logger(f"--- Iteration {self.iter} ---") + self.logger.log_manager(f"--- Iteration {self.iter} ---") try: # Check progress toward the goal and stop if achieved if not skip_verification_step and self._verify_progress(): @@ -96,13 +96,13 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s # Get plan from LLM plan = self.get_plan_from_user_request(user_text, context) if not plan: - self.logger(f"Error getting plan from model", error=True) + self.logger.log_manager(f"Error getting plan from model", error=True) self._timeline.append({"iteration": 
self.iter, "event": TimelineEvent.PLAN_GENERATION_FAILED.value}) skip_verification_step = True continue plan_str = str(plan.plan) if plan_str in self._used_plans: - self.logger("LLM produced a repeated plan. Stopping iterations.", error=True) + self.logger.log_manager("LLM produced a repeated plan. Stopping iterations.", error=True) self._timeline.append({"iteration": self.iter, "event": TimelineEvent.PLAN_REPEATED.value}) skip_verification_step = True continue @@ -113,7 +113,7 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s try: self.validator.validate(plan) except Exception as e: - self.logger(f"Plan validation error. Asking for new plan: {e}") + self.logger.log_manager(f"Plan validation error. Asking for new plan: {e}") self._timeline.append({"iteration": self.iter, "event": TimelineEvent.PLAN_NOT_VALID.value, "detail": str(e)}) continue @@ -134,10 +134,10 @@ def handle_user_request(self, user_text: str, context: Dict[str, Any]) -> Dict[s # If execution was successful, return the result if not ret.get("success", False): - self.logger(f"Iteration {self.iter} failed.", error=True) + self.logger.log_manager(f"Iteration {self.iter} failed.", error=True) except Exception as e: - self.logger(f"Error handling user request: {e}", error=True) + self.logger.log_manager(f"Error handling user request: {e}", error=True) return {"error": str(e), "timeline": self._timeline} return {"timeline": self._timeline, "success": self.bb.get("goal_achieved", False), "blackboard": self.bb.copy(), "plan": plan} @@ -192,7 +192,7 @@ def _build_prompt(self, user_text: str, ctx: Dict[str, Any]) -> Tuple[str, str]: """Override the prompt building method to include iteration information.""" tools = self.registry.top_k(user_text, self.k) if not tools: - self.logger("No tools available in the registry.", error=True) + self.logger.log_manager("No tools available in the registry.", error=True) return "", "" tool_descriptions = [] for tool in tools: @@ -389,7 
+389,7 @@ def _get_goal_from_user_request(self, user_text, context) -> GoalSpec: # Query LLM goal = self.llm.inference_goal(system_prompt, user_prompt, self.history) - self.logger(f"Goal received:\n{goal}") + self.logger.log_manager(f"Goal received:\n{goal}") if not goal: raise Exception("Error: Goal inference failed to produce a valid goal.") @@ -418,7 +418,7 @@ def _init_single_tool_plan(self): def _set_single_tool_params(self, tool_name: str, args: List[ArgValue]): """Set the parameters of the single tool plan.""" if not self._single_tool_plan or not self._single_tool_plan.plan: - self.logger("Single tool plan is not initialized properly.", error=True) + self.logger.log_manager("Single tool plan is not initialized properly.", error=True) return self._single_tool_plan.plan[0].steps[0].tool = tool_name self._single_tool_plan.plan[0].steps[0].args = args @@ -436,7 +436,7 @@ def _verify_progress(self) -> bool: if mode == "objective": # Check if goal is already achieved if self._is_goal_achieved(): - self.logger("Goal achieved in objective mode. Stopping iterations.") + self.logger.log_manager("Goal achieved in objective mode. 
Stopping iterations.") return True else: return False @@ -458,7 +458,7 @@ def _verify_progress(self) -> bool: images_paths.extend(self.bb.get(tool.name, {}).get("images", [])) # Build prompt to check if goal is achieved system_prompt, user_prompt = self._build_validation_prompt(instruction, evidence) - self.logger(f"Running perceptual verification with instruction: {instruction} - Evidence: {evidence_keys} - Images: {images_paths}") + self.logger.log_manager(f"Running perceptual verification with instruction: {instruction} - Evidence: {evidence_keys} - Images: {images_paths}") try: validation = self.llm.inference_validation(system_prompt, user_prompt, images_paths, self.history) @@ -466,13 +466,13 @@ def _verify_progress(self) -> bool: self.bb["ai_validation"] = validation self.bb["goal_achieved"] = success if success: - self.logger(f"Goal achieved in perceptual mode. Stopping iterations. - {validation}") + self.logger.log_manager(f"Goal achieved in perceptual mode. Stopping iterations. - {validation}") return True else: - self.logger(f"Goal not achieved in perceptual mode. - {validation}") + self.logger.log_manager(f"Goal not achieved in perceptual mode. 
- {validation}") return False except Exception as e: - self.logger(f"Error during perceptual verification: {e}", error=True) + self.logger.log_manager(f"Error during perceptual verification: {e}", error=True) return False def _run_verification_tools(self): @@ -484,25 +484,25 @@ def _run_verification_tools(self): if self._timeline and len(self._timeline) > 0: last_event = self._timeline[-1].get('event', "N/A") if last_event not in (TimelineEvent.PLAN_EXECUTED.value, TimelineEvent.GOAL_SET.value): - self.logger("Skipping verification tools as no plan has been executed in this iteration.") + self.logger.log_manager("Skipping verification tools as no plan has been executed in this iteration.") return for verify_tool in self.goal.verify_tools: tool_name = verify_tool.tool tool_args = verify_tool.args tool = self.registry.tools.get(tool_name) if not tool: - self.logger(f"Verification tool '{tool_name}' not found in registry.", error=True) + self.logger.log_manager(f"Verification tool '{tool_name}' not found in registry.", error=True) continue try: self.bb self._set_single_tool_params(tool_name, tool_args) - self.logger(f"Running verification tool: {tool_name}") + self.logger.log_manager(f"Running verification tool: {tool_name}") result = self.execute_plan(self._single_tool_plan) if not result.get("success"): self.bb[tool_name] = {"error": "Validation tool execution failed"} - self.logger(f"Error running verification tool '{tool_name}'. BB entry of this tool will be empty.", error=True) + self.logger.log_manager(f"Error running verification tool '{tool_name}'. 
BB entry of this tool will be empty.", error=True)
             except Exception as e:
-                self.logger(f"Error running verification tool '{tool_name}': {e}", error=True)
+                self.logger.log_manager(f"Error running verification tool '{tool_name}': {e}", error=True)
                 continue
 
     def _add_to_history(self, user_text: str, plan_summary: str):
diff --git a/src/vulcanai/core/plan_types.py b/src/vulcanai/core/plan_types.py
index e426957..678e019 100644
--- a/src/vulcanai/core/plan_types.py
+++ b/src/vulcanai/core/plan_types.py
@@ -15,6 +15,7 @@
 from pydantic import BaseModel, Field
 from typing import List, Literal, Optional, Union
+from vulcanai.console.logger import VulcanAILogger
 
 Kind = Literal["SEQUENCE","PARALLEL"]
@@ -78,31 +79,59 @@ class GlobalPlan(BaseModel):
     def __str__(self) -> str:
         lines = []
         if self.summary:
-            lines.append(f"- [bold]Plan Summary[/bold]: {self.summary}\n")
+            lines.append(f"- Plan Summary: {self.summary}\n")
+
+        color_tool = VulcanAILogger.vulcanai_theme["executor"]
+        color_variable = VulcanAILogger.vulcanai_theme["validator"]
+        color_value = VulcanAILogger.vulcanai_theme["registry"]
+        color_error = VulcanAILogger.vulcanai_theme["error"]
 
         for i, node in enumerate(self.plan, 1):
-            lines.append(f"- PlanNode {i}: kind={node.kind}")
+            # - PlanNode : kind=
+            lines.append(f"- PlanNode {i}: <{color_variable}>kind=" + \
+                         f"<{color_value}>{node.kind}")
+
             if node.condition:
-                lines.append(f"  Condition: {node.condition}")
+                # Condition: 
+                lines.append(f"\tCondition: <{color_value}>{node.condition}")
             if node.retry:
-                lines.append(f"  Retry: {node.retry}")
+                # Retry: 
+                lines.append(f"\t<{color_error}>Retry: " + \
+                             f"<{color_value}>{node.retry}")
             if node.timeout_ms:
-                lines.append(f"  Timeout: {node.timeout_ms} ms")
+                # Timeout:  ms
+                lines.append(f"\t<{color_error}>Timeout: " + \
+                             f"<{color_value}>{node.timeout_ms} ms")
             if node.success_criteria:
-                lines.append(f"  Success Criteria: {node.success_criteria}")
+                # Success Criteria: 
+                lines.append(f"\t<{color_tool}>Success Criteria: " + 
\ + f"<{color_value}>{node.success_criteria}") if node.on_fail: - lines.append(f" On Fail: {node.on_fail.kind} with {len(node.on_fail.steps)} steps") + # On Fail: with steps + lines.append(f"\tOn Fail: <{color_value}>{node.on_fail.kind} with " + \ + f"<{color_value}>{len(node.on_fail.steps)} steps") for j, step in enumerate(node.steps, 1): - arg_str = ", ".join([f"{a.key}={a.val}" for a in step.args]) if step.args else "no args" - lines.append(f" Step {j}: {step.tool}({arg_str})") + #arg_str: =, ..., = + arg_str = ", ".join([f"<{color_variable}>{a.key}=" + \ + f"<{color_value}>{a.val}" for a in step.args]) \ + if step.args else f"<{color_value}>no args" + # Step : () + lines.append(f"\tStep {j}: <{color_tool}>{step.tool}({arg_str})") if step.condition: - lines.append(f" Condition: {step.condition}") + # Condition: + lines.append(f"\t Condition: <{color_value}>{step.condition}") if step.retry: - lines.append(f" Retry: {step.retry}") + # Condition: + lines.append(f"\t <{color_error}>Retry: " + \ + f"<{color_value}>{step.retry}") if step.timeout_ms: - lines.append(f" Timeout: {step.timeout_ms} ms") + # Timeout: ms + lines.append(f"\t <{color_error}>Timeout: " + \ + f"<{color_value}>{step.timeout_ms} ms") if step.success_criteria: - lines.append(f" Success Criteria: {step.success_criteria}") + # Success Criteria: + lines.append(f"\t <{color_tool}>Success Criteria: " + \ + f"<{color_value}>{step.success_criteria}") return "\n".join(lines) @@ -123,7 +152,7 @@ class GoalSpec(BaseModel): def __str__(self) -> str: lines = [] if self.summary: - lines.append(f"- [bold]Goal Summary[/bold]: {self.summary}") + lines.append(f"- Goal Summary: {self.summary}") if self.mode: lines.append(f"- Verification Mode: {self.mode}") diff --git a/src/vulcanai/models/gemini.py b/src/vulcanai/models/gemini.py index 2816c46..c7baef6 100644 --- a/src/vulcanai/models/gemini.py +++ b/src/vulcanai/models/gemini.py @@ -26,6 +26,7 @@ class GeminiModel(IModel): + """ Wrapper for most of Google models, 
Gemini mainly. """ def __init__(self, model_name:str, logger=None, hooks: Optional[IModelHooks] = None): super().__init__() @@ -35,7 +36,7 @@ def __init__(self, model_name:str, logger=None, hooks: Optional[IModelHooks] = N try: self.model = genai.Client(api_key=os.environ.get("GEMINI_API_KEY")) except Exception as e: - self.logger(f"Missing Gemini API Key: {e}", error=True) + self.logger.log_manager(f"ERROR. Missing Gemini API Key: {e}", error=True) def _inference( self, @@ -83,11 +84,14 @@ def _inference( contents=messages, config=cfg, ) + + # Extract parsed object safely parsed_response: Optional[T] = None try: parsed_response = response.parsed except Exception as e: - self.logger(f"Failed to get parsed goal from Gemini response, falling back to text: {e}", error=True) + self.logger.log_manager(f"ERROR. Failed to get parsed goal from Gemini response, " + \ + f"falling back to text: {e}", error=True) finally: # Notify hooks of request end try: @@ -110,13 +114,16 @@ def _inference( import json parsed_response = GoalSpec(**json.loads(raw)) except Exception as e: - self.logger(f"Failed to parse raw {response_cls.__name__} JSON: {e}", error=True) - + self.logger.log_manager(f"ERROR. 
Failed to parse raw {response_cls.__name__} JSON: {e}", + error=True) end = time.time() - self.logger(f"Gemini response time: {end - start:.3f} seconds") + self.logger.log_manager(f"Gemini response time: {end - start:.3f} seconds") usage = getattr(response, "usage_metadata", None) if usage: - self.logger(f"Prompt tokens: {usage.prompt_token_count}, Completion tokens: {usage.candidates_token_count}") + input_tokens = usage.prompt_token_count + output_tokens = usage.candidates_token_count + self.logger.log_manager(f"Prompt tokens: [manager]{input_tokens}[/manager], " + \ + f"Completion tokens: [manager]{output_tokens}[/manager]") return parsed_response @@ -130,7 +137,8 @@ def _build_user_content(self, user_text: str, images: Optional[Iterable[str]]) - import requests img = requests.get(image_path) if img.status_code != 200: - self.logger(f"Failed to fetch image from URL: {image_path}", error=True) + self.logger.log_manager(f"ERROR. Failed to fetch image from URL '{image_path}' ", + error=True) continue content.append(gtypes.Part.from_bytes(data=img.content, mime_type=img.headers.get("Content-Type", "image/png"))) else: @@ -143,7 +151,9 @@ def _build_user_content(self, user_text: str, images: Optional[Iterable[str]]) - )) except Exception as e: # Fail soft on a single bad image but continue with others - self.logger(f"Image '{image_path}' could not be encoded: {e}", error=True) + + self.logger.log_manager(f"Fail soft. Image '{image_path}' could not be encoded: {e}", + error=True) return content diff --git a/src/vulcanai/models/ollama_model.py b/src/vulcanai/models/ollama_model.py index 6ce1dd4..7e9877f 100644 --- a/src/vulcanai/models/ollama_model.py +++ b/src/vulcanai/models/ollama_model.py @@ -24,6 +24,7 @@ class OllamaModel(IModel): + """ Wrapper for Ollama models. 
""" def __init__(self, model_name: str, logger=None, hooks: Optional[IModelHooks] = None): super().__init__() @@ -33,7 +34,7 @@ def __init__(self, model_name: str, logger=None, hooks: Optional[IModelHooks] = try: self.model = ollama except Exception as e: - self.logger(f"Error loading Ollama client: {e}", error=True) + self.logger.log_manager(f"ERROR. Missing a API Key: {e}", error=True) def _inference( self, @@ -77,7 +78,7 @@ def _inference( options={"temperature": 0.1} ) except Exception as e: - self.logger(f"Ollama API error: {e}", error=True) + self.logger.log_manager(f"ERROR. Ollama API: {e}", error=True) return None finally: # Notify hooks of request end @@ -91,14 +92,16 @@ def _inference( try: parsed = response_cls.model_validate_json(completion.message.content) except Exception as e: - self.logger(f"Failed to parse response into {response_cls.__name__}: {e}", error=True) + self.logger.log_manager(f"ERROR. Failed to parse response into {response_cls.__name__}: {e}", + error=True) end = time.time() - self.logger(f"Ollama response time: {end - start:.3f} seconds") + self.logger.log_manager(f"Ollama response time: {end - start:.3f} seconds") try: input_tokens = completion.prompt_eval_count output_tokens = completion.eval_count - self.logger(f"Prompt tokens: {input_tokens}, Completion tokens: {output_tokens}") + self.logger.log_manager(f"Prompt tokens: [manager]{input_tokens}[/manager], " + \ + f"Completion tokens: [manager]{output_tokens}[/manager]") except Exception: pass @@ -115,7 +118,9 @@ def _build_user_content(self, user_text: str, images: Optional[Iterable[str]]) - encoded_images.append(base64_image) except Exception as e: # Fail soft on a single bad image but continue with others - self.logger(f"Image '{image_path}' could not be encoded: {e}", error=True) + + self.logger.log_manager(f"Fail soft. 
Image '{image_path}' could not be encoded: {e}", + error=True) if encoded_images: content["images"] = encoded_images return content diff --git a/src/vulcanai/models/openai.py b/src/vulcanai/models/openai.py index dde0f69..172e434 100644 --- a/src/vulcanai/models/openai.py +++ b/src/vulcanai/models/openai.py @@ -23,8 +23,8 @@ # Generic type variable for response classes T = TypeVar('T', GlobalPlan, GoalSpec, AIValidation) - class OpenAIModel(IModel): + """ Wrapper for OpenAI models. """ def __init__(self, model_name: str, logger=None, hooks: Optional[IModelHooks] = None): super().__init__() @@ -34,7 +34,7 @@ def __init__(self, model_name: str, logger=None, hooks: Optional[IModelHooks] = try: self.model = OpenAI() except Exception as e: - self.logger(f"Missing OpenAI API Key: {e}", error=True) + self.logger.log_manager(f"Missing OpenAI API Key: {e}", error=True) def _inference( self, @@ -77,7 +77,7 @@ def _inference( response_format=response_cls, ) except Exception as e: - self.logger(f"OpenAI API error: {e}", error=True) + self.logger.log_manager(f"OpenAI API: {e}", error=True) return None finally: # Notify hooks of request end @@ -91,14 +91,15 @@ def _inference( try: parsed = completion.choices[0].message.parsed except Exception as e: - self.logger(f"Failed to parse response into {response_cls.__name__}: {e}", error=True) + self.logger.log_manager(f"Failed to parse response into {response_cls.__name__}: {e}", error=True) end = time.time() - self.logger(f"GPT response time: {end - start:.3f} seconds") + self.logger.log_manager(f"GPT response time: [manager]{end - start:.3f} seconds[/manager]") try: input_tokens = completion.usage.prompt_tokens output_tokens = completion.usage.completion_tokens - self.logger(f"Prompt tokens: {input_tokens}, Completion tokens: {output_tokens}") + self.logger.log_manager(f"Prompt tokens: [manager]{input_tokens}[/manager], " + \ + f"Completion tokens: [manager]{output_tokens}[/manager]") except Exception: pass @@ -121,7 +122,8 @@ def 
_build_user_content(self, user_text: str, images: Optional[Iterable[str]]) - }) except Exception as e: # Fail soft on a single bad image but continue with others - self.logger(f"Image '{image_path}' could not be encoded: {e}", error=True) + + self.logger.log_manager(f"Fail soft. Image '{image_path}' could not be encoded: {e}", error=True) return content def _build_messages( diff --git a/src/vulcanai/tools/tool_registry.py b/src/vulcanai/tools/tool_registry.py index 40dc0a9..0807deb 100644 --- a/src/vulcanai/tools/tool_registry.py +++ b/src/vulcanai/tools/tool_registry.py @@ -37,7 +37,7 @@ class HelpTool(ITool): """A tool that provides help information.""" name = "help" description = "Provides help information for using the library. It can list all available tools or" \ - " give info about the usage of a specific tool if is provided as an argument." + " give info about the usage of a specific tool if 'tool_name' is provided as an argument." tags = ["help", "info", "documentation", "usage", "developer", "manual", "available tools"] input_schema = [("tool", "string")] output_schema = {"info": "str"} @@ -72,12 +72,15 @@ def run(self, **kwargs): class ToolRegistry: + """Holds all known tools and performs vector search over metadata.""" def __init__(self, embedder=None, logger=None): - # Logging function - self.logger = logger or VulcanAILogger.log_registry - # Dictionary of tool name -> tool instance + # Logging function from the class VulcanConsole + self.logger = logger or VulcanAILogger.default() + # Dictionary of tools (name -> tool instance) self.tools: Dict[str, ITool] = {} + # Dictionary of deactivated_tools (name -> tool instance) + self.deactivated_tools: Dict[str, ITool] = {} # Embedding model for tool metadata self.embedder = embedder or SBERTEmbedder() # Simple in-memory index of (name, embedding) @@ -95,18 +98,57 @@ def register_tool(self, tool: ITool, solve_deps: bool = True): # Avoid duplicates if tool.name in self.tools: return + self.tools[tool.name] = 
tool if tool.is_validation_tool: self.validation_tools.append(tool.name) emb = self.embedder.embed(self._doc(tool)) self._index.append((tool.name, emb)) - self.logger(f"Registered tool: {tool.name}") + self.logger.log_registry(f"Registered tool: [registry]{tool.name}[/registry]") self.help_tool.available_tools = self.tools if solve_deps: # Get class of tool if issubclass(type(tool), CompositeTool): self._resolve_dependencies(tool) + def activate_tool(self, tool_name) -> bool: + """Activate a singles tool instance.""" + # Check if the tool is already active + if tool_name in self.tools: + return False + # Check if the tool is deactivated + if tool_name not in self.deactivated_tools: + self.logger.log_registry(f"Tool [registry]'{tool_name}'[/registry] " + \ + f"not found in the deactivated tools list.", error=True) + return False + + # Add the tool to the active tools + self.tools[tool_name] = self.deactivated_tools[tool_name] + + # Removed the tool from the deactivated tools + del self.deactivated_tools[tool_name] + + return True + + def deactivate_tool(self, tool_name) -> bool: + """Deactivate a singles tool instance.""" + # Check if the tool is already deactivated + if tool_name in self.deactivated_tools: + return False + # Check if the tool is active + if tool_name not in self.tools: + self.logger.log_registry(f"Tool [registry]'{tool_name}'[/registry] "+ \ + f"not found in the active tools list.", error=True) + return False + + # Add the tool to the deactivated tools + self.deactivated_tools[tool_name] = self.tools[tool_name] + + # Removed the tool from the active tools + del self.tools[tool_name] + + return True + def register(self): """Register all loaded classes marked with @vulcanai_tool.""" composite_classes = [] @@ -129,7 +171,7 @@ def _resolve_dependencies(self, tool: CompositeTool): for dep_name in tool.dependencies: dep_tool = self.tools.get(dep_name) if dep_tool is None: - self.logger(f"Dependency '{dep_name}' for tool '{tool.name}' not found.", 
error=True) + self.logger.log_registry(f"ERROR. Dependency '{dep_name}' for tool '{tool.name}' not found.", error=True) else: tool.resolved_deps[dep_name] = dep_tool @@ -148,8 +190,7 @@ def _load_tools_from_file(self, path: str): spec.loader.exec_module(module) self._loaded_modules.append(module) except Exception as e: - self.logger(f"Error loading tools from {path}: {e}", error=True) - + self.logger.log_registry(f"Could not load tools from {path}: {e}", error=True) def discover_tools_from_file(self, path: str): """Load tools from a Python file and register them.""" self._load_tools_from_file(path) @@ -172,7 +213,7 @@ def discover_ros(self): def top_k(self, query: str, k: int = 5, validation: bool = False) -> list[ITool]: """Return top-k tools most relevant to the query.""" if not self._index: - self.logger("No tools registered.", error=True) + self.logger.log_registry("No tools registered.", error=True) return [] # Filter tools based on validation flag @@ -185,11 +226,11 @@ def top_k(self, query: str, k: int = 5, validation: bool = False) -> list[ITool] active_names: set = val_names if validation else nonval_names if not active_names: # If there is no tool for the requested category, be explicit and return [] - self.logger( - f"No matching tools for the requested mode ({'validation' if validation else 'action'}).", error=True + self.logger.log_registry( + f"No matching tools for the requested mode ({'validation' if validation else 'action'}).", + error=True ) return [] - # If k > number of ALL tools, return required tools if k > len(self._index): return [self.tools[name] for name in active_names] @@ -197,9 +238,10 @@ def top_k(self, query: str, k: int = 5, validation: bool = False) -> list[ITool] filtered_index = [(name, vec) for (name, vec) in self._index if name in active_names] if not filtered_index: # Index might be stale; log and return [] - self.logger("Index has no entries for the selected tool subset.", error=True) - return [] + 
self.logger.log_registry("Index has no entries for the selected tool subset.", + error=True) + return [] # If k > number of required tools, return required tools if k > len(filtered_index): return [self.tools[name] for name in active_names] diff --git a/tests/integration_tests/test_iterative_manager.py b/tests/integration_tests/test_iterative_manager.py index d858036..f1d548d 100644 --- a/tests/integration_tests/test_iterative_manager.py +++ b/tests/integration_tests/test_iterative_manager.py @@ -14,10 +14,17 @@ import pytest -from vulcanai import ArgValue, GlobalPlan, PlanNode, Step -from vulcanai import IterativeManager -from vulcanai import AtomicTool, ValidationTool -from vulcanai.core.manager_iterator import TimelineEvent +from vulcanai import ( + ArgValue, + AtomicTool, + GlobalPlan, + IterativeManager, + PlanNode, + Step, + TimelineEvent, + ValidationTool, + VulcanAILogger, +) class DummyTool(AtomicTool): @@ -123,7 +130,7 @@ def top_k(self, user_text, k, validation=False): class MockAgent: """Mock agent that records prompts passed to inference_plan() and inference_goal().""" - def __init__(self, plans=[], goal=None, validation=None, success_validation=0): + def __init__(self, plans=[], goal=None, validation=None, success_validation=0, logger=None): """ :param plans: list[GlobalPlan] to return on successive inference_plan() calls :param goal: GoalSpec to return on inference_goal() calls @@ -177,12 +184,19 @@ def make_single_step_plan(summary="plan", tool="dummy_tool", key="arg", val="x", ### Fixtures ################# +class ListSink: + def __init__(self): + self.lines = [] # list[str] + + def write(self, msg: str, color: str = "") -> None: + self.lines.append(msg) + @pytest.fixture def logger(): - def _logger(msg, error=False): - print(("ERROR: " if error else "") + str(msg)) - pass - return _logger + sink = ListSink() + log = VulcanAILogger(sink=sink) + log.set_sink(sink) + return log @pytest.fixture(autouse=True) def patch_core_symbols(monkeypatch):