diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 757b8f4..3fa0410 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -35,7 +35,28 @@ jobs:
- name: Run unit tests
run: |
- python -m unittest discover -s tests/unittest -t . -p "test*.py" -v
+ python -m unittest discover -s tests/unittest -t . -p "test*.py" --ignore-patterns "test_default_tools.py" -v
+
+ ros2_unittests:
+ name: ROS 2 unit tests (default tools)
+ runs-on: ubuntu-24.04
+ container:
+ image: eprosima/vulcanexus:kilted-desktop
+
+ steps:
+ - name: Sync repository
+ uses: eProsima/eProsima-CI/external/checkout@v0
+
+ - name: Install VulcanAI library
+ run: |
+ python3 -m pip install -U pip --break-system-packages
+ python3 -m pip install -e .[test] --break-system-packages
+
+ - name: Run ROS 2 default tools tests
+ shell: bash
+ run: |
+ source /opt/ros/jazzy/setup.bash
+ python3 -m unittest discover -s tests/unittest -t . -p "test_default_tools.py" -v
integration:
name: Integration tests (pytest)
diff --git a/pyproject.toml b/pyproject.toml
index a7ae25c..ee51a38 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,3 +56,6 @@ select = ["E", "F", "I"]
[tool.ruff.lint.isort]
known-first-party = ["vulcanai"]
+
+[project.entry-points."ros2_default_tools"]
+default_tools = "vulcanai.tools.default_tools"
diff --git a/src/vulcanai/console/console.py b/src/vulcanai/console/console.py
index b333a5a..675f833 100644
--- a/src/vulcanai/console/console.py
+++ b/src/vulcanai/console/console.py
@@ -50,6 +50,7 @@ class VulcanConsole(App):
# Two panels: left (log + input) and right (history + variables)
# Right panel: 48 characters length
# Left panel: fills remaining space
+
CSS = """
Screen {
layout: horizontal;
@@ -74,6 +75,13 @@ class VulcanConsole(App):
border: tall #333333;
}
+ #streamcontent {
+ height: 0;
+ min-height: 0;
+ border: solid #56AA08;
+ display: none;
+ }
+
#llm_spinner {
height: 0;
display: none;
@@ -120,6 +128,7 @@ def __init__(
tools_from_entrypoints: str = "",
user_context: str = "",
main_node=None,
+ default_tools: bool = True,
):
super().__init__() # Textual lib
@@ -137,10 +146,14 @@ def __init__(
self.model = model
# 'k' value for top_k tools selection
self.k = k
+ # Flag to indicate if default tools should be enabled
+ self.default_tools = default_tools
# Iterative mode
self.iterative = iterative
# CustomLogTextArea instance
- self.left_pannel = None
+ self.main_pannel = None
+ # Subprocess output panel
+ self.stream_pannel = None
# Logger instance
self.logger = VulcanAILogger.default()
self.logger.set_textualizer_console(TextualLogSink(self))
@@ -166,6 +179,8 @@ def __init__(
# Streaming task control
self.stream_task = None
+ # Route logger output to subprocess panel when needed.
+ self._route_logs_to_stream_panel = False
# Suggestion index for RadioListModal
self.suggestion_index = -1
self.suggestion_index_changed = threading.Event()
@@ -188,7 +203,8 @@ async def on_mount(self) -> None:
Function called when the console is mounted.
"""
- self.left_pannel = self.query_one("#logcontent", CustomLogTextArea)
+ self.main_pannel = self.query_one("#logcontent", CustomLogTextArea)
+ self.stream_pannel = self.query_one("#streamcontent", CustomLogTextArea)
self.spinner_status = self.query_one("#llm_spinner", SpinnerStatus)
self.hooks = SpinnerHook(self.spinner_status)
@@ -197,7 +213,7 @@ async def on_mount(self) -> None:
sys.stdout = StreamToTextual(self, "stdout")
sys.stderr = StreamToTextual(self, "stderr")
- if self.main_node is not None:
+ if self.main_node is not None or self.default_tools:
attach_ros_logger_to_console(self)
self.loop = asyncio.get_running_loop()
@@ -223,6 +239,9 @@ def compose(self) -> ComposeResult:
with Horizontal():
# Left
with Vertical(id="left"):
+ # Subprocess stream area (hidden by default, shown on-demand)
+ streamcontent = CustomLogTextArea(id="streamcontent")
+ yield streamcontent
# Log Area
logcontent = CustomLogTextArea(id="logcontent")
yield logcontent
@@ -248,7 +267,6 @@ async def bootstrap(self) -> None:
Blocking operations (file I/O) run in executor, non-blocking in event loop.
"""
- # Initialize manager (potentially blocking, run in executor)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.init_manager)
@@ -278,6 +296,15 @@ async def bootstrap(self) -> None:
except Exception:
pass
+ # Load a default ROS 2 node if default tools are enabled but no node is provided
+ if self.default_tools and self.main_node is None:
+ try:
+ from vulcanai.tools.default_tools import ROS2DefaultToolNode
+
+ self.main_node = ROS2DefaultToolNode()
+ except ImportError:
+ self.logger.log_console("Unable to load ROS 2 default node for default tools.")
+
# -- Register tools (file I/O - run in executor) --
# File paths tools
for tool_file_path in self.register_from_file:
@@ -298,7 +325,7 @@ async def bootstrap(self) -> None:
self.is_ready = True
self.logger.log_console("VulcanAI Interactive Console")
- self.logger.log_console("Use 'Ctrl+Q' to quit.")
+ self.logger.log_console("Use '/exit' or press 'Ctrl+Q' to quit.")
# Activate the terminal input
self.set_input_enabled(True)
@@ -345,8 +372,11 @@ def worker(user_input: str = "") -> None:
self.logger.log_console(f"Output of plan: {bb_ret}")
except KeyboardInterrupt:
- self.logger.log_msg("Exiting...")
- return
+ if self.stream_task is None:
+ self.logger.log_msg("Exiting...")
+ else:
+ self.stream_task.cancel() # triggers CancelledError in the task
+ self.stream_task = None
except EOFError:
self.logger.log_msg("Exiting...")
return
@@ -447,17 +477,20 @@ async def open_checklist(self, tools_list: list[str], active_tools_num: int) ->
self.logger.log_console(f"Deactivated tool '{tool}'")
@work
- async def open_radiolist(self, option_list: list[str], tool: str = "") -> str:
+ async def open_radiolist(
+ self, option_list: list[str], tool: str = "", category: str = "", input_string: str = ""
+ ) -> str:
"""
Function used to open a RadioList ModalScreen in the console.
Used in the tool suggestion selection, for default tools.
"""
# Create the checklist dialog
- selected = await self.push_screen_wait(RadioListModal(option_list))
+ selected = await self.push_screen_wait(RadioListModal(option_list, category, input_string))
if selected is None:
self.logger.log_tool("Suggestion cancelled", tool_name=tool)
self.suggestion_index = -2
+ self.suggestion_index_changed.set()
return
self.logger.log_tool(f'Selected suggestion: "{option_list[selected]}"', tool_name=tool)
@@ -484,7 +517,7 @@ def cmd_help(self, _) -> None:
"/show_history - Show the current history\n"
"/clear_history - Clear the history\n"
"/plan - Show the last generated plan\n"
- "/rerun - Rerun the last plan\n"
+ "/rerun 'int' - Rerun the last plan or the specified plan by index\n"
"/bb - Show the last blackboard state\n"
"/clear - Clears the console screen\n"
"/exit - Exit the console\n"
@@ -624,12 +657,15 @@ async def _rerun_worker(self, args) -> None:
else:
selected_plan = int(args[0])
if selected_plan < -1:
- self.logger.log_console("Usage: /rerun 'int' [int >= -1].")
+ self.logger.log_console("Usage: /rerun 'int' [int > -1].")
return
if not self.plans_list:
self.logger.log_console("No plan to rerun.")
return
+ elif selected_plan >= len(self.plans_list):
+ self.logger.log_console("Selected Plan index do not exists. selected_plan >= len(executed_plans).")
+ return
self.logger.log_console(f"Rerunning {selected_plan}-th plan...")
@@ -657,7 +693,9 @@ def cmd_blackboard_state(self, _) -> None:
self.logger.log_console("No blackboard available.")
def cmd_clear(self, _) -> None:
- self.left_pannel.clear_console()
+ if self.stream_pannel is not None:
+ self.stream_pannel.clear_console()
+ self.main_pannel.clear_console()
def cmd_quit(self, _) -> None:
self.exit()
@@ -666,6 +704,54 @@ def cmd_quit(self, _) -> None:
# region Logging
+ def show_subprocess_panel(self) -> None:
+ """
+ Show the dedicated subprocess output panel at the top of the main panel.
+ """
+ if self.stream_pannel is None:
+ return
+
+ self.stream_pannel.clear_console()
+ self.stream_pannel.display = True
+ self.stream_pannel.styles.height = 12
+ self.stream_pannel.refresh(layout=True)
+ self.refresh(layout=True)
+
+ def change_route_logs(self, value: bool = False) -> None:
+ """
+ Route logger sink output to stream panel.
+
+ value = True -> Stream panel
+ value = False -> Main panel
+ """
+
+ self._route_logs_to_stream_panel = value
+
+ def hide_subprocess_panel(self) -> None:
+ """
+ Hide the subprocess output panel and return space to the main log panel.
+ """
+ if self.stream_pannel is None:
+ return
+
+ self.stream_pannel.display = False
+ self.stream_pannel.styles.height = 0
+ self.stream_pannel.refresh(layout=True)
+ self.refresh(layout=True)
+
+ def add_subprocess_line(self, input: str) -> None:
+ """
+ Write output into the dedicated subprocess panel.
+ """
+ if self.stream_pannel is None:
+ self.add_line(input)
+ return
+
+ lines = input.splitlines()
+ for line in lines:
+ if not self.stream_pannel.append_line(line):
+ self.logger.log_console("Warning: Trying to add an empty subprocess line.")
+
def add_line(self, input: str, color: str = "", subprocess_flag: bool = False) -> None:
"""
Function used to write an input in the VulcanAI terminal.
@@ -679,20 +765,24 @@ def add_line(self, input: str, color: str = "", subprocess_flag: bool = False) -
color_begin = f"<{color}>"
color_end = f"{color}>"
+ target_panel = self.main_pannel
+ if self._route_logs_to_stream_panel and self.stream_pannel is not None and self.stream_pannel.display:
+ target_panel = self.stream_pannel
+
# Append each line; deque automatically truncates old ones
for line in lines:
line_processed = line
if subprocess_flag:
line_processed = escape(line)
text = f"{color_begin}{line_processed}{color_end}"
- if not self.left_pannel.append_line(text):
+ if not target_panel.append_line(text):
self.logger.log_console("Warning: Trying to add an empty line.")
def delete_last_line(self):
"""
Function used to remove the last line in the VulcanAI terminal.
"""
- self.left_pannel.delete_last_row()
+ self.main_pannel.delete_last_row()
# endregion
@@ -961,7 +1051,7 @@ async def on_key(self, event: events.Key) -> None:
def set_stream_task(self, input_stream):
"""
Function used in the tools to set the current streaming task.
- with this variable the user can finish the execution of the
+ With this variable the user can finish the execution of the
task by using the signal "Ctrl + C"
"""
self.stream_task = input_stream
@@ -1088,7 +1178,7 @@ def init_manager(self) -> None:
self.logger.log_console(f"Initializing Manager '{ConsoleManager.__name__}'...")
- self.manager = ConsoleManager(model=self.model, k=self.k, logger=self.logger)
+ self.manager = ConsoleManager(model=self.model, k=self.k, logger=self.logger, default_tools=self.default_tools)
self.logger.log_console(f"Manager initialized with model '{self.model.replace('ollama-', '')}'")
# Update right panel info
diff --git a/src/vulcanai/console/logger.py b/src/vulcanai/console/logger.py
index a6b6f88..9c781d5 100644
--- a/src/vulcanai/console/logger.py
+++ b/src/vulcanai/console/logger.py
@@ -45,8 +45,8 @@ class VulcanAILogger:
"executor": "#15B606",
"vulcanai": "#56AA08",
"user": "#91DD16",
- "validator": "#C49C00",
- "tool": "#EB921E",
+ "validator": "#9600C4",
+ "tool": "#C49C00",
"error": "#FF0000",
"console": "#8F6296",
"warning": "#D8C412",
diff --git a/src/vulcanai/console/modal_screens.py b/src/vulcanai/console/modal_screens.py
index 829ad13..81bcb3b 100644
--- a/src/vulcanai/console/modal_screens.py
+++ b/src/vulcanai/console/modal_screens.py
@@ -15,6 +15,7 @@
from textual import events
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
+from textual.content import Content
from textual.screen import ModalScreen
from textual.widgets import Button, Checkbox, Input, Label, RadioButton, RadioSet
@@ -170,9 +171,16 @@ class CheckListModal(ModalScreen[list[str] | None]):
}
.btns {
- height: 3; /* give buttons row a fixed height */
- padding-top: 1;
- content-align: right middle;
+ height: auto;
+ width: 100%;
+ margin-top: 1;
+ padding: 0;
+ content-align: center middle;
+ align-horizontal: center;
+ }
+
+ .btns Button {
+ padding: 0 3;
}
"""
@@ -209,6 +217,18 @@ def on_mount(self) -> None:
class RadioListModal(ModalScreen[str | None]):
+ class SquareRadioButton(RadioButton):
+ # BUTTON_INNER = '●'
+ @property
+ def _button(self) -> Content:
+ button_style = self.get_visual_style("toggle--button")
+ symbol = "☒" if self.value else "☐"
+ return Content.assemble(
+ (" ", button_style),
+ (symbol, button_style),
+ (" ", button_style),
+ )
+
CSS = """
RadioListModal {
align: center middle;
@@ -218,7 +238,6 @@ class RadioListModal(ModalScreen[str | None]):
width: 60%;
max-width: 90%;
height: 40%;
- border: round $accent;
padding: 1 2;
background: $panel;
}
@@ -233,26 +252,36 @@ class RadioListModal(ModalScreen[str | None]):
}
.btns {
- height: 3;
- padding-top: 1;
- content-align: right middle;
+ height: auto;
+ width: 100%;
+ margin-top: 1;
+ padding: 0;
+ content-align: center middle;
+ align-horizontal: center;
+ }
+
+ .btns Button {
+ padding: 0 1;
}
"""
- def __init__(self, lines: list[str], default_index: int = 0) -> None:
+ def __init__(self, lines: list[str], category: str = "", input_string: str = "", default_index: int = 0) -> None:
super().__init__()
self.lines = lines
+ self.category = category
+ self.input_string = input_string
self.default_index = default_index
def compose(self) -> ComposeResult:
+ dialog_msg = f"{self.category} '{self.input_string}' does not exist. Choose a suggestion:"
with Vertical(classes="dialog"):
- yield Label("Pick one option", classes="title")
+ yield Label(dialog_msg, classes="title")
# One-select radio list
with VerticalScroll(classes="radio-list"):
with RadioSet(id="radio-set"):
for i, line in enumerate(self.lines):
- yield RadioButton(line, id=f"rb{i}", value=(i == self.default_index))
+ yield self.SquareRadioButton(line, id=f"rb{i}", value=(i == self.default_index))
# Buttons
with Horizontal(classes="btns"):
@@ -260,7 +289,7 @@ def compose(self) -> ComposeResult:
yield Button("Submit", variant="primary", id="submit")
def on_mount(self) -> None:
- first_rb = self.query_one(RadioButton)
+ first_rb = self.query_one(self.SquareRadioButton)
self.set_focus(first_rb)
def on_button_pressed(self, event: Button.Pressed) -> None:
diff --git a/src/vulcanai/console/utils.py b/src/vulcanai/console/utils.py
index a6e72b7..4c8dce6 100644
--- a/src/vulcanai/console/utils.py
+++ b/src/vulcanai/console/utils.py
@@ -1,4 +1,4 @@
-# Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,8 +14,6 @@
import asyncio
-import difflib
-import heapq
import subprocess
import sys
import threading
@@ -57,10 +55,18 @@ def __init__(self, spinner_status):
self.spinner_status = spinner_status
def on_request_start(self, text="Querying LLM..."):
- self.spinner_status.start(text)
+ app = getattr(self.spinner_status, "app", None)
+ if app is not None and threading.current_thread() is not threading.main_thread():
+ app.call_from_thread(self.spinner_status.start, text)
+ else:
+ self.spinner_status.start(text)
def on_request_end(self):
- self.spinner_status.stop()
+ app = getattr(self.spinner_status, "app", None)
+ if app is not None and threading.current_thread() is not threading.main_thread():
+ app.call_from_thread(self.spinner_status.stop)
+ else:
+ self.spinner_status.stop()
def attach_ros_logger_to_console(console):
@@ -178,7 +184,6 @@ async def run_streaming_cmd_async(
if echo:
line_processed = escape(line)
console.add_line(line_processed)
-
# Count the line
line_count += 1
if max_lines is not None and line_count >= max_lines:
@@ -205,8 +210,6 @@ async def run_streaming_cmd_async(
except KeyboardInterrupt:
# Ctrl+C pressed → stop subprocess
console.logger.log_tool("[tool]Ctrl+C received:[/tool] terminating subprocess...", tool_name=tool_name)
- process.terminate()
-
finally:
try:
await asyncio.wait_for(process.wait(), timeout=3.0)
@@ -234,6 +237,8 @@ def _launcher() -> None:
tool_name=tool_name, # tool_header_str
)
)
+ # Keep the real task reference so Ctrl+C can cancel it.
+ console.set_stream_task(stream_task)
def _on_done(task: asyncio.Task) -> None:
if task.cancelled():
@@ -251,18 +256,12 @@ def _on_done(task: asyncio.Task) -> None:
stream_task.add_done_callback(_on_done)
- try:
- # Are we already in the Textual event loop thread?
- asyncio.get_running_loop()
- except RuntimeError:
- # No loop here → probably ROS thread. Bounce into Textual thread.
- console.app.call_from_thread(_launcher)
- else:
- # We *are* in the loop → just launch directly.
+ # Worker threads may have their own asyncio loop; only run directly on UI thread.
+ if threading.current_thread() is threading.main_thread():
_launcher()
+ else:
+ console.app.call_from_thread(_launcher)
- # Store the task in the console to be able to cancel it later
- console.set_stream_task(stream_task)
console.logger.log_tool("[tool]Subprocess created![tool]", tool_name=tool_name)
@@ -272,80 +271,3 @@ def run_oneshot_cmd(args: list[str]) -> str:
except subprocess.CalledProcessError as e:
raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")
-
-
-def suggest_string(console, tool_name, string_name, input_string, real_string_list):
- ret = None
-
- def _similarity(a: str, b: str) -> float:
- """Return a similarity score between 0 and 1."""
- return difflib.SequenceMatcher(None, a, b).ratio()
-
- def _get_suggestions(real_string_list_comp: list[str], string_comp: str) -> tuple[str, list[str]]:
- """
- Function used to search for the most "similar" string in a list.
-
- Used in ROS2 cli commands to used the "simmilar" in case
- the queried topic does not exists.
-
- Example:
- real_string_list_comp = [
- "/parameter_events",
- "/rosout",
- "/turtle1/cmd_vel",
- "/turtle1/color_sensor",
- "/turtle1/pose",
- ]
- string_comp = "trtle1"
-
- @return
- str: the most "similar" string
- list[str] a sorted list by a similitud value
- """
-
- topic_list_pq = []
- n = len(string_comp)
-
- for string in real_string_list_comp:
- m = len(string)
- # Calculate the similitud
- score = _similarity(string_comp, string)
- # Give more value for the nearest size comparations.
- score -= abs(n - m) * 0.005
- # Max-heap (via negative score)
- heapq.heappush(topic_list_pq, (-score, string))
-
- # Pop ordered list
- ret_list: list[str] = []
- _, most_topic_similar = heapq.heappop(topic_list_pq)
-
- ret_list.append(most_topic_similar)
-
- while topic_list_pq:
- _, topic = heapq.heappop(topic_list_pq)
- ret_list.append(topic)
-
- return most_topic_similar, ret_list
-
- if input_string not in real_string_list:
- console.logger.log_tool(f'{string_name}: "{input_string}" does not exists', tool_name=tool_name)
-
- # Get the suggestions list sorted by similitud value
- _, topic_sim_list = _get_suggestions(real_string_list, input_string)
-
- # Open the ModalScreen
- console.open_radiolist(topic_sim_list, f"{tool_name}")
-
- # Wait for the user to select and item in the
- # RadioList ModalScreen
- console.suggestion_index_changed.wait()
-
- # Check if the user cancelled the suggestion
- if console.suggestion_index >= 0:
- ret = topic_sim_list[console.suggestion_index]
-
- # Reset suggestion index
- console.suggestion_index = -1
- console.suggestion_index_changed.clear()
-
- return ret
diff --git a/src/vulcanai/console/widget_custom_log_text_area.py b/src/vulcanai/console/widget_custom_log_text_area.py
index dd60f4c..bc624a4 100644
--- a/src/vulcanai/console/widget_custom_log_text_area.py
+++ b/src/vulcanai/console/widget_custom_log_text_area.py
@@ -21,6 +21,8 @@
from rich.style import Style
from textual.widgets import TextArea
+from vulcanai.console.logger import VulcanAILogger
+
class CustomLogTextArea(TextArea):
"""
@@ -45,7 +47,10 @@ class CustomLogTextArea(TextArea):
TAG_TOKEN_RE = re.compile(r"?[^>]+>")
def __init__(self, **kwargs):
- super().__init__(read_only=True, **kwargs)
+ # Disable the TextArea cursor in this read-only log panel.
+ # This prevents Textual from auto-scrolling to keep cursor/selection visible
+ # every time new text is inserted.
+ super().__init__(read_only=True, show_cursor=False, **kwargs)
# Lock used to avoid data races in 'self._lines_styles'
# when VulcanAI and ROS threads writes at the same time
@@ -68,6 +73,17 @@ def __init__(self, **kwargs):
# region UTILS
+ def is_near_vertical_scroll_end(self, tolerance: int = 1) -> bool:
+ """
+ Return True if the viewport is at, or very close to, the vertical end.
+
+ A small tolerance avoids false negatives after layout changes where
+ scroll position can be off by one line.
+ """
+ if not self.size:
+ return True
+ return (self.max_scroll_y - self.scroll_offset.y) <= max(0, tolerance)
+
def _trim_highlights(self) -> None:
"""
Function used to trim the CustomLogTextArea to the maximum number of lines.
@@ -279,6 +295,10 @@ def append_line(self, text: str) -> bool:
# [EXECUTOR] Invoking 'move_turtle' with args: ...
# [ROS] [INFO] Publishing message 1 to ...
with self._lock:
+ # Terminal-like behavior:
+ # keep following output only if the user was already at the bottom.
+ should_follow_output = self.is_near_vertical_scroll_end()
+
# Append via document API to keep row tracking consistent
# Only add a newline before the new line if there is already content
insert_text = ("\n" if self.document.text else "") + plain
@@ -301,8 +321,11 @@ def append_line(self, text: str) -> bool:
# Trim now
self._trim_highlights()
- # Scroll to end
- self.scroll_end(animate=False)
+ # Scroll to end only when the user was already at the bottom.
+ if should_follow_output:
+ self.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Ensure we stay anchored after any pending layout updates.
+ self.call_after_refresh(self.scroll_end, animate=False, immediate=True, x_axis=False)
# Rebuild highlights and refresh
self._rebuild_highlights()
@@ -374,6 +397,12 @@ def action_copy_selection(self) -> None:
self.notify("No text selected to copy!")
return
- # Copy to clipboard, using pyperclip library
- pyperclip.copy(self.selected_text)
- self.notify("Selected area copied to clipboard!")
+ try:
+ # Copy to clipboard, using pyperclip library
+ pyperclip.copy(self.selected_text)
+ self.notify("Selected area copied to clipboard!")
+ except Exception as e:
+ error_color = VulcanAILogger.vulcanai_theme["error"]
+ self.append_line(f"<{error_color}>Clipboard error: {e}{error_color}>")
+ self.notify(f"Clipboard error: {e}")
+ return
diff --git a/src/vulcanai/console/widget_spinner.py b/src/vulcanai/console/widget_spinner.py
index a2019ed..b8c7a40 100644
--- a/src/vulcanai/console/widget_spinner.py
+++ b/src/vulcanai/console/widget_spinner.py
@@ -46,9 +46,14 @@ def _log_is_filling_space(self) -> bool:
"""
visible = max(1, self.logcontent.size.height)
lines = getattr(self.logcontent.document, "line_count", 0)
- return lines > visible
+ return lines >= visible
def start(self, text: str = "Querying LLM...") -> None:
+ # Keep the log anchored at bottom if the user was already following output.
+ if hasattr(self.logcontent, "is_near_vertical_scroll_end"):
+ was_at_bottom = self.logcontent.is_near_vertical_scroll_end()
+ else:
+ was_at_bottom = self.logcontent.is_vertical_scroll_end
self._spinner.text = Text(text, style="#0d87c0")
self.display = True
self.styles.height = 1
@@ -59,10 +64,19 @@ def start(self, text: str = "Querying LLM...") -> None:
else:
self._forced_compact = False
self.refresh(layout=True)
+ if was_at_bottom:
+ self.logcontent.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Re-anchor after layout has settled.
+ self.call_after_refresh(self.logcontent.scroll_end, animate=False, immediate=True, x_axis=False)
self._timer.resume()
def stop(self) -> None:
+ # Keep the log anchored at bottom if the user was already following output.
+ if hasattr(self.logcontent, "is_near_vertical_scroll_end"):
+ was_at_bottom = self.logcontent.is_near_vertical_scroll_end()
+ else:
+ was_at_bottom = self.logcontent.is_vertical_scroll_end
self._timer.pause()
self.display = False
self.styles.height = 0
@@ -71,5 +85,9 @@ def stop(self) -> None:
self._forced_compact = False
self.refresh(layout=True)
self.logcontent.refresh(layout=True)
+ if was_at_bottom:
+ self.logcontent.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Re-anchor after layout has settled.
+ self.call_after_refresh(self.logcontent.scroll_end, animate=False, immediate=True, x_axis=False)
self.update("")
diff --git a/src/vulcanai/core/executor.py b/src/vulcanai/core/executor.py
index 9b3010b..61d13f7 100644
--- a/src/vulcanai/core/executor.py
+++ b/src/vulcanai/core/executor.py
@@ -277,9 +277,9 @@ def _call_tool(
msg += "'{"
for key, value in arg_dict.items():
if first:
- msg += f"[validator]'{key}'[/validator]: " + f"[registry]'{value}'[/registry]"
+ msg += f"[tool]'{key}'[/tool]: " + f"[registry]'{value}'[/registry]"
else:
- msg += f", [validator]'{key}'[/validator]: " + f"[registry]'{value}'[/registry]"
+ msg += f", [tool]'{key}'[/tool]: " + f"[registry]'{value}'[/registry]"
first = False
msg += "}'"
self.logger.log_executor(msg)
@@ -315,15 +315,6 @@ def _call_tool(
+ "with result:"
)
- if isinstance(result, dict):
- for key, value in result.items():
- if key == "ros2":
- continue
- self.logger.log_msg(f"{key}")
- self.logger.log_msg(value)
- else:
- self.logger.log_msg(result)
-
return True, result
except concurrent.futures.TimeoutError:
self.logger.log_executor(
diff --git a/src/vulcanai/core/manager.py b/src/vulcanai/core/manager.py
index 66936d3..43988e3 100644
--- a/src/vulcanai/core/manager.py
+++ b/src/vulcanai/core/manager.py
@@ -33,12 +33,13 @@ def __init__(
k: int = 10,
hist_depth: int = 3,
logger: Optional[VulcanAILogger] = None,
+ default_tools: bool = True,
):
# Logger default to a stdout logger if none is provided (StdoutLogSink)
self.logger = logger or VulcanAILogger.default()
self.llm = Agent(model, logger=self.logger)
self.k = k
- self.registry = registry or ToolRegistry(logger=self.logger)
+ self.registry = registry or ToolRegistry(logger=self.logger, default_tools=default_tools)
self.validator = validator
self.executor = PlanExecutor(self.registry, logger=self.logger)
self.bb = Blackboard()
diff --git a/src/vulcanai/core/manager_iterator.py b/src/vulcanai/core/manager_iterator.py
index 8414792..e940b6d 100644
--- a/src/vulcanai/core/manager_iterator.py
+++ b/src/vulcanai/core/manager_iterator.py
@@ -43,8 +43,9 @@ def __init__(
logger=None,
max_iters: int = 5,
step_timeout_ms: Optional[int] = None,
+ default_tools: bool = True,
):
- super().__init__(model, registry, validator, k, max(3, hist_depth), logger)
+ super().__init__(model, registry, validator, k, max(3, hist_depth), logger, default_tools)
self.iter: int = 0
self.max_iters: int = int(max_iters)
diff --git a/src/vulcanai/core/manager_plan.py b/src/vulcanai/core/manager_plan.py
index 78b3cbb..32bfbdc 100644
--- a/src/vulcanai/core/manager_plan.py
+++ b/src/vulcanai/core/manager_plan.py
@@ -30,8 +30,17 @@ def __init__(
k: int = 5,
hist_depth: int = 3,
logger=None,
+ default_tools=True,
):
- super().__init__(model, registry=registry, validator=validator, k=k, hist_depth=hist_depth, logger=logger)
+ super().__init__(
+ model,
+ registry=registry,
+ validator=validator,
+ k=k,
+ hist_depth=hist_depth,
+ logger=logger,
+ default_tools=default_tools,
+ )
def _get_prompt_template(self) -> str:
"""
diff --git a/src/vulcanai/core/plan_types.py b/src/vulcanai/core/plan_types.py
index bdf915f..72d4fd7 100644
--- a/src/vulcanai/core/plan_types.py
+++ b/src/vulcanai/core/plan_types.py
@@ -89,7 +89,7 @@ def __str__(self) -> str:
lines.append(f"- Plan Summary: {self.summary}\n")
color_tool = VulcanAILogger.vulcanai_theme["executor"]
- color_variable = VulcanAILogger.vulcanai_theme["validator"]
+ color_variable = VulcanAILogger.vulcanai_theme["tool"]
color_value = VulcanAILogger.vulcanai_theme["registry"]
color_error = VulcanAILogger.vulcanai_theme["error"]
diff --git a/src/vulcanai/tools/default_tools.py b/src/vulcanai/tools/default_tools.py
new file mode 100644
index 0000000..841e6fa
--- /dev/null
+++ b/src/vulcanai/tools/default_tools.py
@@ -0,0 +1,1064 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+This file contains the default tools given by VulcanAI.
+It contains atomic tools used to call ROS2 CLI.
+"""
+
+import importlib
+import json
+import threading
+import time
+from concurrent.futures import Future
+
+from vulcanai import AtomicTool, vulcanai_tool
+from vulcanai.tools.utils import execute_subprocess, last_output_lines, run_oneshot_cmd, suggest_string
+
+# ROS2 imports
+try:
+ import rclpy
+ from rclpy.node import Node
+ from rclpy.task import Future
+except ImportError:
+ raise ImportError("Unable to load default tools because no ROS 2 installation was found.")
+
+
+# This class contains a ROS 2 node that will be loaded if none is provided to launch ROS 2 default tools
+class ROS2DefaultToolNode(Node):
+ def __init__(self, name: str = "vulcanai_ros2_default_tools_node"):
+ if not rclpy.ok():
+ rclpy.init()
+ super().__init__(name)
+ # Dictionary to store created clients
+ self._vulcan_clients = {}
+ # Dictionary to store created publishers
+ self._vulcan_publishers = {}
+
+ # Ensure entities creation is thread-safe.
+ self.node_lock = threading.Lock()
+
+ def get_client(self, srv_type, srv_name):
+ """
+ Get a cached client for the specified service type and name or
+ create a new one if it doesn't exist.
+ """
+ key = (srv_type, srv_name)
+ with self.node_lock:
+ if key not in self._vulcan_clients:
+ client = self.create_client(srv_type, srv_name)
+ self._vulcan_clients[key] = client
+ self.get_logger().info(f"Created new client for {srv_name}")
+ return self._vulcan_clients[key]
+
+ def get_publisher(self, msg_type, topic_name):
+ """
+ Get a cached publisher for the specified message type and topic name or
+ create a new one if it doesn't exist.
+ """
+ key = (msg_type, topic_name)
+ with self.node_lock:
+ if key not in self._vulcan_publishers:
+ publisher = self.create_publisher(msg_type, topic_name, 10)
+ self._vulcan_publishers[key] = publisher
+ self.get_logger().info(f"Created new publisher for {topic_name}")
+ return self._vulcan_publishers[key]
+
+ def wait_for_message(self, msg_type, topic: str, timeout_sec: float = None):
+ """
+ Block until a message is received or timeout expires.
+ Subscriptions are created on demand and destroyed after use to avoid
+ handling spins and callbacks in a separate thread.
+ """
+ future = Future()
+
+ def callback(msg):
+ if not future.done():
+ future.set_result(msg)
+
+ sub = self.create_subscription(msg_type, topic, callback, 10)
+
+ rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
+ self.destroy_subscription(sub)
+
+ if future.done():
+ return future.result()
+ return None
+
+
+"""
+Available ROS 2 CLI commands that can be run with the tools in this file:
+
+- ros2 node
+ Commands:
+ info Output information about a node
+ list Output a list of available nodes
+
+- ros2 topic
+ Commands:
+ bw Display bandwidth used by topic
+ delay Display delay of topic from timestamp in header
+ echo Output messages from a topic
+ find Output a list of available topics of a given type
+ hz Print the average receiving rate to screen
+ info Print information about a topic
+ list Output a list of available topics
+ pub Publish a message to a topic
+ type Print a topic's type
+
+- ros2 service
+ Commands:
+ call Call a service
+ echo Echo a service
+ find Output a list of available services of a given type
+ info Print information about a service
+ list Output a list of available services
+ type Output a service's type
+
+- ros2 action
+ Commands:
+ info Print information about an action
+ list Output a list of action names
+ send_goal Send an action goal
+ type Print a action's type
+ echo Echo a action
+ find Find actions from type
+
+
+- ros2 param
+ Commands:
+ delete Delete parameter
+ describe Show descriptive information about declared parameters
+ dump Show all of the parameters of a node in a YAML file format
+ get Get parameter
+ list Output a list of available parameters
+ load Load parameter file for a node
+ set Set parameter
+
+
+- ros2 pkg
+ Commands:
+ executables Output a list of package specific executables
+ list Output a list of available packages
+ prefix Output the prefix path of a package
+ xml Output the XML of the package manifest or a specific ta
+
+
+- ros2 interfaces
+ Commands:
+ list List all interface types available
+ package Output a list of available interface types within one package
+ packages Output a list of packages that provide interfaces
+"""
+
+
+@vulcanai_tool
+class Ros2NodeTool(AtomicTool):
+ name = "ros2_node"
+ description = (
+ "Wrapper for `ros2 node` CLI."
+ "Run any subcommand: 'list', 'info'"
+ "With an optional argument 'node_name' for 'info' subcommand."
+ )
+ tags = ["ros2", "nodes", "cli", "info", "diagnostics"]
+
+ # - `command` lets you pick a single subcommand (list/info).
+ input_schema = [
+ ("command", "string"), # Command
+ ("topic_name", "string?"), # (optional) Topic name. (info/bw/delay/hz/type/pub)
+ ]
+
+ output_schema = {
+ "output": "string", # list of ros2 nodes or info of a node.
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None) # optional explicit subcommand
+ node_name = kwargs.get("node_name", "")
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ # -- Node name suggestions --
+ node_name_list_str = run_oneshot_cmd(["ros2", "node", "list"])
+ node_name_list = node_name_list_str.splitlines()
+
+ # -- Run `ros2 node list` ---------------------------------------------
+ if command == "list":
+ result["output"] = node_name_list_str
+
+ # -- Run `ros2 node info ` --------------------------------------
+ else:
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_topic = suggest_string(console, self.name, "Node", node_name, node_name_list)
+ if suggested_topic is not None:
+ node_name = suggested_topic
+
+ if not node_name:
+ raise ValueError("`command='{}'` requires `node_name`.".format("info"))
+
+ info_output = run_oneshot_cmd(["ros2", "node", "info", node_name])
+ result["output"] = info_output
+
+ return result
+
+
+@vulcanai_tool
+class Ros2TopicTool(AtomicTool):
+ name = "ros2_topic"
+ description = (
+ "Wrapper for `ros2 topic` CLI."
+ "Run a subcommand from: 'list', 'info', 'find', 'type', 'bw', 'delay', 'hz'."
+ "With optional arguments like 'topic_name', 'message_type', 'max_duration' or 'max_lines'"
+ )
+ tags = ["ros2", "topics", "cli", "info"]
+
+ # - `command` lets you pick a single subcommand (bw/hz/delay/find/pub/type).
+ input_schema = [
+ ("command", "string"), # Command
+ ("topic_name", "string?"), # (optional) Topic name. (info/bw/delay/hz/type/pub)
+ ("msg_type", "string?"), # (optional) Message type (`find` , `pub` )
+ ("max_duration", "number?"), # (optional) Seconds for streaming commands (bw/hz/delay)
+ ("max_lines", "int?"), # (optional) Cap number of lines for streaming commands
+ ]
+
+ output_schema = {
+ "output": "string", # output
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None) # optional explicit subcommand
+ topic_name = kwargs.get("topic_name", None)
+ msg_type = kwargs.get("msg_type", None)
+ # Streaming commands variables
+ max_duration = kwargs.get("max_duration", 60.0)
+ max_lines = kwargs.get("max_lines", 1000)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ topic_name_list_str = run_oneshot_cmd(["ros2", "topic", "list"])
+ topic_name_list = topic_name_list_str.splitlines()
+
+ # -- Topic name suggestions --
+ if command == "find":
+ # TODO?
+ """suggested_type = suggest_string(console, self.name, "Topic", msg_type, topic_name_list)
+ if suggested_type is not None:
+ msg_type = suggested_type"""
+ elif command != "list":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_topic_name = suggest_string(console, self.name, "Topic", topic_name, topic_name_list)
+ if suggested_topic_name is not None:
+ topic_name = suggested_topic_name
+
+ # Check if the topic_name is null (suggest_string() failed)
+ if not topic_name:
+ raise ValueError("`command='{}'` requires `topic_name`.".format(command))
+
+ # -- Commands --
+ # -- ros2 topic list --------------------------------------------------
+ if command == "list":
+ result["output"] = topic_name_list_str
+
+ # -- ros2 topic info -------------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "topic", "info", topic_name])
+ result["output"] = info_output
+
+ # -- ros2 topic find -------------------------------------------
+ elif command == "find":
+ find_output = run_oneshot_cmd(["ros2", "topic", "find", msg_type])
+ find_topics = [line.strip() for line in find_output.splitlines() if line.strip()]
+ result["output"] = ", ".join(find_topics)
+
+ # -- ros2 topic type -------------------------------------
+ elif command == "type":
+ type_output = run_oneshot_cmd(["ros2", "topic", "type", topic_name])
+ result["output"] = type_output
+
+ # -- ros2 topic bw ---------------------------------------
+ elif command == "bw":
+ base_args = ["ros2", "topic", "bw", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- ros2 topic delay ------------------------------------
+ elif command == "delay":
+ base_args = ["ros2", "topic", "delay", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- ros2 topic hz ---------------------------------------
+ elif command == "hz":
+ base_args = ["ros2", "topic", "hz", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, info, echo, bw, delay, hz, find, pub, type."
+ )
+
+ return result
+
+
+@vulcanai_tool
+class Ros2ServiceTool(AtomicTool):
+ name = "ros2_service"
+ description = (
+ "Wrapper for `ros2 service` CLI."
+ "Run any subcommand: 'list', 'info', 'type', 'find', 'call', 'echo'."
+ "With optional arguments like 'service_name', 'service_type', "
+ "'call', 'args', 'max_duration' or 'max_lines'"
+ )
+ tags = ["ros2", "services", "cli", "info", "call"]
+
+ # - `command` = "list", "info", "type", "call", "echo", "find"
+ input_schema = [
+ ("command", "string"), # Command
+ ("service_name", "string?"), # (optional) Service name. "info", "type", "call", "echo"
+ ("service_type", "string?"), # (optional) Service type. "find", "call"
+ ("call", "bool?"), # (optional) backwards-compatible call flag
+ ("args", "string?"), # (optional) YAML/JSON-like request data for `call`
+ ("max_duration", "number?"), # (optional) Maximum duration
+ ("max_lines", "int?"), # (optional) Maximum lines
+ ]
+
+ output_schema = {
+ "output": "string", # `ros2 service list`
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None)
+ service_name = kwargs.get("service_name", None)
+ service_type = kwargs.get("service_type", None)
+ call_args = kwargs.get("args", None)
+ # Streaming commands variables
+ max_duration = kwargs.get("max_duration", 2.0) # default for echo
+ max_lines = kwargs.get("max_lines", 50)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ service_name_list_str = run_oneshot_cmd(["ros2", "service", "list"])
+ service_name_list = service_name_list_str.splitlines()
+
+ # -- Service name suggestions --
+ if command == "find":
+ # TODO?
+ """suggested_type = suggest_string(console, self.name, "Service_Type", service_type, service_name_list)
+ if suggested_type is not None:
+ service_type = suggested_type"""
+
+ elif command != "list":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_service_name = suggest_string(console, self.name, "Service", service_name, service_name_list)
+ if suggested_service_name is not None:
+ service_name = suggested_service_name
+
+ # Check if the service_name is null (suggest_string() failed)
+ if not service_name:
+ raise ValueError("`command='{}'` requires `service_name`.".format(command))
+
+ # -- ros2 service list ------------------------------------------------
+ if command == "list":
+ result["output"] = service_name_list_str
+
+ # -- ros2 service info ---------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "service", "info", service_name])
+ result["output"] = info_output
+
+ # -- ros2 service type ---------------------------------
+ elif command == "type":
+ type_output = run_oneshot_cmd(["ros2", "service", "type", service_name])
+ result["output"] = type_output.strip()
+
+ # -- ros2 service find -----------------------------------------
+ elif command == "find":
+ find_output = run_oneshot_cmd(["ros2", "service", "find", service_type])
+ result["output"] = find_output
+
+ # -- ros2 service call service_name service_type ----------------------
+ elif command == "call":
+ if call_args is None:
+ raise ValueError("`command='call'` requires `args`.")
+
+ # If service_type not given, detect it
+ if not service_type:
+ type_output = run_oneshot_cmd(["ros2", "service", "type", service_name])
+ service_type = type_output.strip()
+
+ call_output = run_oneshot_cmd(["ros2", "service", "call", service_name, service_type, call_args])
+ result["output"] = call_output
+
+ # -- ros2 service echo service_name -----------------------------------
+ elif command == "echo":
+ base_args = ["ros2", "service", "echo", service_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- unknown ------------------------------------------------------------
+ else:
+ raise ValueError(f"Unknown command '{command}'. Expected one of: list, info, type, call, echo, find.")
+
+ return result
+
+
+@vulcanai_tool
+class Ros2ActionTool(AtomicTool):
+ name = "ros2_action"
+ description = (
+ "Wrapper for `ros2 action` CLI."
+ "Run any subcommand: 'list', 'info', 'type', 'send_goal'."
+ "With optional arguments like 'action_name', 'action_type', "
+ "'goal_args'"
+ )
+ tags = ["ros2", "actions", "cli", "info", "goal"]
+
+ # - `command`: "list", "info", "type", "send_goal"
+ input_schema = [
+ ("command", "string"), # Command
+ ("action_name", "string?"), # (optional) Action name
+ ("action_type", "string?"), # (optional) Action type. "find"
+ ("send_goal", "bool?"), # (optional) legacy flag (backwards compatible)
+ ("goal_args", "string?"), # (optional) goal YAML, e.g. '{order: 5}'
+ ]
+
+ output_schema = {
+ "output": "string", # `ros2 action list`
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None)
+ action_name = kwargs.get("action_name", None)
+ action_type = kwargs.get("action_type", None)
+ goal_args = kwargs.get("goal_args", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ action_name_list_str = run_oneshot_cmd(["ros2", "action", "list"])
+ action_name_list = action_name_list_str.splitlines()
+
+ # -- Action name suggestions --
+ if command != "list":
+ # Check if the topic is not available ros2 topic list
+ suggested_action_name = suggest_string(console, self.name, "Action", action_name, action_name_list)
+ if suggested_action_name is not None:
+ action_name = suggested_action_name
+
+ # Check if the action_name is null (suggest_string() failed)
+ if not action_name:
+ raise ValueError("`command='{}'` requires `action_name`.".format(command))
+
+ # -- ros2 action list -------------------------------------------------
+ if command == "list":
+ result["output"] = action_name_list_str
+
+ # -- ros2 action info -----------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "action", "info", action_name])
+ result["output"] = info_output
+
+ # -- ros2 action type ------------------------------------
+ elif command == "get":
+ get_output = run_oneshot_cmd(["ros2", "param", "get", node_name, param_name])
+ result["output"] = get_output
+
+ # -- ros2 param describe -------------------------------
+ elif command == "describe":
+ describe_output = run_oneshot_cmd(["ros2", "param", "describe", node_name, param_name])
+ result["output"] = describe_output
+
+ # -- ros2 param set ------------------------
+ elif command == "set":
+ if set_value is None:
+ raise ValueError("`command='set'` requires `set_value`.")
+
+ set_output = run_oneshot_cmd(["ros2", "param", "set", node_name, param_name, set_value])
+ result["output"] = set_output
+
+ # -- ros2 param delete ----------------------------------
+ elif command == "delete":
+ delete_output = run_oneshot_cmd(["ros2", "param", "delete", node_name, param_name])
+ result["output"] = delete_output
+
+ # -- ros2 param dump [file_path] -------------------------------
+ elif command == "dump":
+ # Two modes:
+ # - If file_path given, write to file with --output-file
+ # - Otherwise, capture YAML from stdout
+ if file_path:
+ dump_output = run_oneshot_cmd(["ros2", "param", "dump", node_name, "--output-file", file_path])
+ # CLI usually prints a line like "Saved parameters to file..."
+ # so we just expose that.
+ result["output"] = dump_output or f"Dumped parameters to {file_path}"
+ else:
+ dump_output = run_oneshot_cmd(["ros2", "param", "dump", node_name])
+ result["output"] = dump_output
+
+ # -- ros2 param load -------------------------------
+ elif command == "load":
+ if not file_path:
+ raise ValueError("`command='load'` `file_path`.")
+
+ load_output = run_oneshot_cmd(["ros2", "param", "load", node_name, file_path])
+ result["output"] = load_output
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, get, describe, set, delete, dump, load."
+ )
+
+ return result
+
+
+@vulcanai_tool
+class Ros2PkgTool(AtomicTool):
+ name = "ros2_pkg"
+ description = "Wrapper for `ros2 pkg` CLI.Run any subcommand: 'list', 'executables'."
+ tags = ["ros2", "pkg", "packages", "cli", "introspection"]
+
+ # If package_name is not provided, runs: `ros2 pkg list`
+ # If provided, runs: `ros2 pkg executables `
+ input_schema = [
+ ("command", "string"), # Command
+ ]
+
+ output_schema = {
+ "output": "string", # list of packages or list of executables for a package.
+ }
+
+ def run(self, **kwargs):
+ # Get the package name if provided by the query
+ command = kwargs.get("command", None)
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ # -- Run `ros2 pkg list` ----------------------------------------------
+ if command == "list":
+ pkg_name_list = run_oneshot_cmd(["ros2", "pkg", "list"])
+ result["output"] = pkg_name_list
+
+ # -- Run `ros2 pkg executables` ---------------------------------------
+ elif command == "executables":
+ pkg_name_list = run_oneshot_cmd(["ros2", "pkg", "executables"])
+ result["output"] = pkg_name_list
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(f"Unknown command '{command}'. Expected one of: list, executables, prefix, xml")
+
+ return result
+
+
+@vulcanai_tool
+class Ros2InterfaceTool(AtomicTool):
+ name = "ros2_interface"
+ description = (
+ "Wrapper for `ros2 interface` CLI."
+ "Run any subcommand: 'list', 'packages', 'package', 'show'."
+ "With optional arguments like 'interface_name'."
+ )
+ tags = ["ros2", "interface", "msg", "srv", "cli", "introspection"]
+
+ # - `command` lets you pick a single subcommand (list/packages/package).
+ input_schema = [
+ ("command", "string"), # Command
+ ("interface_name", "string?"), # (optional) Name of the interface, e.g. "std_msgs/msg/String".
+ # If not provided, the command is `ros2 interface list`.
+ # Otherwise `ros2 interface show `.
+ ]
+
+ output_schema = {
+ "output": "string", # list of interfaces (as list of strings) or full interface definition.
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ # Get the interface name if provided by the query
+ command = kwargs.get("command", None)
+ interface_name = kwargs.get("interface_name", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ interface_name_list_str = run_oneshot_cmd(["ros2", "interface", "list"])
+ interface_name_list = interface_name_list_str.splitlines()
+
+ package_name_list_str = run_oneshot_cmd(["ros2", "interface", "packages"])
+ package_name_list = package_name_list_str.splitlines()
+
+ # -- ros2 interface list ----------------------------------------------
+ if interface_name is None:
+ result["output"] = interface_name_list_str
+
+ # -- ros2 interface packages ------------------------------------------
+ elif command == "packages":
+ result["output"] = package_name_list_str
+
+ # -- ros2 interface package --------------------------------
+ elif command == "package":
+ package_name = interface_name
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_package_name = suggest_string(console, self.name, "Interface", package_name, package_name_list)
+ if suggested_package_name is not None:
+ package_name = suggested_package_name
+
+ # Check if the interface_name is null (suggest_string() failed)
+ if not interface_name:
+ raise ValueError("`command='{}'` requires `interface_name`.".format(command))
+
+ info_output = run_oneshot_cmd(["ros2", "topic", "package", package_name])
+ result["output"] = info_output
+
+ # -- ros2 interface show --------------------------------
+ elif command == "show":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_interface_name = suggest_string(
+ console, self.name, "Interface", interface_name, interface_name_list
+ )
+ if suggested_interface_name is not None:
+ interface_name = suggested_interface_name
+
+ # Check if the interface_name is null (suggest_string() failed)
+ if not interface_name:
+ raise ValueError("`command='{}'` requires `interface_name`.".format(command))
+
+ info_output = run_oneshot_cmd(["ros2", "topic", "show", interface_name])
+ result["output"] = info_output
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, info, echo, bw, delay, hz, find, pub, type."
+ )
+
+ return result
+
+
+def import_msg_type(type_str: str, node):
+ """
+ Dynamically import a ROS 2 message type from its string identifier.
+
+ This function resolves a ROS 2 message type expressed as a string
+ (e.g. `"std_msgs/msg/String"`) into the corresponding Python message class
+ (`std_msgs.msg.String`).
+ """
+ info_list = type_str.split("/")
+
+ if len(info_list) != 3:
+ pkg = "std_msgs"
+ msg_name = info_list[-1]
+ node.get_logger().warn(
+ f"Cannot import ROS message type '{type_str}'. " + "Adding default pkg 'std_msgs' instead."
+ )
+ else:
+ pkg, _, msg_name = info_list
+
+ module = importlib.import_module(f"{pkg}.msg")
+
+ return getattr(module, msg_name)
+
+
+@vulcanai_tool
+class Ros2PublishTool(AtomicTool):
+ name = "ros_publish"
+ description = (
+ "Publish one or more messages to a given ROS 2 topic . "
+ "Or execute 'ros2 topic pub '. "
+ "Supports both simple string messages (for std_msgs/msg/String) and custom message types. "
+ "For custom types, pass message_data as a JSON object with field names and values. "
+ "By default 10 messages 'Hello from VulcanAI PublishTool!' "
+ "with type 'std_msgs/msg/String' in topic '/chatter' "
+ "with 0.1 seconds of delay between messages to publish"
+ 'Example for custom type: msg_type=\'my_pkg/msg/MyMessage\', message_data=\'{"index": 1, "message": "Hello"}\''
+ )
+ tags = ["ros2", "publish", "message", "std_msgs"]
+
+ input_schema = [
+ ("topic", "string"), # e.g. "/chatter"
+ ("message_data", "string?"), # (optional) payload - string for std_msgs/String or JSON for custom types
+ ("msg_type", "string?"), # (optional) e.g. "std_msgs/msg/String" or "my_pkg/msg/CustomMsg"
+ ("max_lines", "int?"), # (optional) number of messages to publish
+ ("max_duration", "int?"), # (optional) stop after this seconds
+ ("period_sec", "float?"), # (optional) delay between publishes (in seconds)
+ ("message", "string?"), # (deprecated) use message_data instead
+ ]
+
+ output_schema = {
+ "published": "bool",
+ "count": "int",
+ "topic": "string",
+ "output": "string",
+ }
+
+ def msg_from_dict(self, msg, values: dict):
+ """
+ Populate a ROS 2 message instance from a Python dictionary.
+
+ This function recursively assigns values from a dictionary to the
+ corresponding fields of a ROS 2 message instance.
+
+ Supports:
+ - Primitive fields (int, float, bool, string)
+ - Nested ROS 2 messages
+
+ """
+ for field, value in values.items():
+ attr = getattr(msg, field)
+ if hasattr(attr, "__slots__"):
+ self.msg_from_dict(attr, value)
+ else:
+ setattr(msg, field, value)
+
+ def run(self, **kwargs):
+ # Ros2 node to create the Publisher and print the log information
+ node = self.bb.get("main_node", None)
+ if node is None:
+ raise Exception("Could not find shared node, aborting...")
+ # Optional console handle to route logs to the subprocess panel.
+ console = self.bb.get("console", None)
+
+ result = {
+ "published": "False",
+ "count": "0",
+ "topic": "",
+ "output": "",
+ }
+
+ panel_enabled = console is not None and hasattr(console, "show_subprocess_panel")
+ if panel_enabled:
+ console.call_from_thread(console.show_subprocess_panel)
+ if hasattr(console, "change_route_logs"):
+ console.call_from_thread(console.change_route_logs, True)
+
+ topic_name = kwargs.get("topic", "/chatter")
+ # Support both 'message_data' (new) and 'message' (deprecated)
+ message_data = kwargs.get("message_data", kwargs.get("message", "Hello from VulcanAI PublishTool!"))
+ msg_type_str = kwargs.get("msg_type", "std_msgs/msg/String")
+
+ max_duration = kwargs.get("max_duration", 60)
+ if not isinstance(max_duration, int):
+ max_duration = 60
+
+ max_lines = kwargs.get("max_lines", 200)
+ if not isinstance(max_lines, int):
+ max_lines = 200
+
+ period_sec = kwargs.get("period_sec", 0.1)
+
+ qos_depth = 10
+
+ if console is None:
+ print("[ERROR] Console not is None")
+
+ return result
+
+ published_msgs = []
+ publisher = None
+ cancel_token = None
+
+ try:
+ if not topic_name:
+ console.call_from_thread(console.logger.log_msg, "[ROS] [ERROR] No topic provided.")
+ return result
+
+ result["topic"] = topic_name
+
+ if max_lines <= 0:
+ # No messages to publish
+ console.call_from_thread(
+ console.logger.log_msg, "[ROS] [WARN] max_lines <= 0, nothing to publish."
+ )
+ return result
+
+ MsgType = import_msg_type(msg_type_str, node)
+ publisher = node.create_publisher(MsgType, topic_name, qos_depth)
+ cancel_token = Future()
+ console.set_stream_task(cancel_token)
+ console.logger.log_tool("[tool]Publisher created![tool]", tool_name=self.name)
+
+ for _ in range(max_lines):
+ if cancel_token.cancelled():
+ console.logger.log_tool("[tool]Ctrl+C received:[/tool] stopping publish...", tool_name=self.name)
+ break
+
+ msg = MsgType()
+
+ # Try to populate message based on message type
+ if hasattr(msg, "data"):
+ # Standard message type with a 'data' field (e.g., std_msgs/msg/String)
+ msg.data = message_data
+ else:
+ # Custom message type - parse message_data as JSON
+ try:
+ payload = json.loads(message_data)
+ self.msg_from_dict(msg, payload)
+ except json.JSONDecodeError as e:
+ console.call_from_thread(
+ console.logger.log_msg,
+ "[ROS] [ERROR] Failed to parse message_data as JSON for custom type"
+ + f"'{msg_type_str}': {e}",
+ )
+ return result
+
+ if hasattr(msg, "data"):
+ console.call_from_thread(
+ console.logger.log_msg, f"[ROS] [INFO] Publishing: '{msg.data}'"
+ )
+ else:
+ console.call_from_thread(
+ console.logger.log_msg,
+ f"[ROS] [INFO] Publishing custom message to '{topic_name}'",
+ )
+ publisher.publish(msg)
+ published_msgs.append(msg.data if hasattr(msg, "data") else str(msg))
+
+ rclpy.spin_once(node, timeout_sec=0.05)
+
+ if period_sec and period_sec > 0.0:
+ time.sleep(period_sec)
+
+ finally:
+ console.set_stream_task(None)
+ if panel_enabled:
+ if hasattr(console, "change_route_logs"):
+ console.call_from_thread(console.change_route_logs, False)
+ console.call_from_thread(console.hide_subprocess_panel)
+ if publisher is not None:
+ try:
+ node.destroy_publisher(publisher)
+ except Exception:
+ pass
+
+ result["subscribed"] = "True"
+ result["published_msgs"] = published_msgs
+ result["count"] = len(published_msgs)
+ return result
+
+
+@vulcanai_tool
+class Ros2SubscribeTool(AtomicTool):
+ name = "ros_subscribe"
+ description = (
+ "Subscribe to a topic or execute 'ros2 topic echo ' "
+ "and stop after receiving N messages or max duration."
+ )
+ tags = ["ros2", "subscribe", "topic", "std_msgs"]
+
+ input_schema = [
+ ("topic", "string"), # topic name
+ ("max_lines", "int?"), # (optional) stop after this number of messages
+ ("max_duration", "int?"), # (optional) stop after this seconds
+ ]
+
+ output_schema = {
+ "subscribed": "bool",
+ "count": "int",
+ "topic": "string",
+ "output": "string",
+ }
+
+ def run(self, **kwargs):
+ # Ros2 node to create the Publisher and print the log information
+ node = self.bb.get("main_node", None)
+ if node is None:
+ raise Exception("Could not find shared node, aborting...")
+ # Optional console handle to support Ctrl+C cancellation.
+ console = self.bb.get("console", None)
+
+ result = {
+ "subscribed": "False",
+ "subscribed_msgs": "",
+ "count": "0",
+ "topic": "",
+ }
+
+ topic_name = kwargs.get("topic", None)
+ max_duration = kwargs.get("max_duration", 60)
+ if not isinstance(max_duration, int):
+ max_duration = 60
+
+ max_lines = kwargs.get("max_lines", 200)
+ if not isinstance(max_lines, int):
+ max_lines = 200
+
+ # "--field data" prints only the data field from each message
+ # instead of the full YAML message
+ # "--no-arr" do not print array fields of messages
+ base_args = ["ros2", "topic", "echo", topic_name, "--field", "data", "--no-arr"]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = last_output_lines(console, self.name, ret, n_lines=10)
+
+ if ret is not None:
+ result["subscribed"] = "True"
+ result["count"] = len(ret)
+ result["topic"] = topic_name
+
+ return result
diff --git a/src/vulcanai/tools/tool_registry.py b/src/vulcanai/tools/tool_registry.py
index 686c8af..2ea5743 100644
--- a/src/vulcanai/tools/tool_registry.py
+++ b/src/vulcanai/tools/tool_registry.py
@@ -78,7 +78,7 @@ def run(self, **kwargs):
class ToolRegistry:
"""Holds all known tools and performs vector search over metadata."""
- def __init__(self, embedder=None, logger=None):
+ def __init__(self, embedder=None, logger=None, default_tools=True):
# Logging function from the class VulcanConsole
self.logger = logger or VulcanAILogger.default()
# Dictionary of tools (name -> tool instance)
@@ -97,6 +97,14 @@ def __init__(self, embedder=None, logger=None):
# Validation tools list to retrieve validation tools separately
self.validation_tools: List[str] = []
+ # Default tools
+ if default_tools:
+ try:
+ self.discover_tools_from_entry_points("ros2_default_tools")
+ except ImportError as e:
+ self.logger.log_msg(f"[error]{e}[/error]")
+ raise
+
def register_tool(self, tool: ITool, solve_deps: bool = True):
"""Register a single tool instance."""
# Avoid duplicates
diff --git a/src/vulcanai/tools/utils.py b/src/vulcanai/tools/utils.py
new file mode 100644
index 0000000..abef914
--- /dev/null
+++ b/src/vulcanai/tools/utils.py
@@ -0,0 +1,267 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import difflib
+import heapq
+import subprocess
+import threading
+import time
+
+from textual.markup import escape # To remove potential errors in textual terminal
+
+
+async def run_streaming_cmd_async(
+ console, args: list[str], max_duration: float = 60, max_lines: int = 1000, echo: bool = True, tool_name=""
+) -> str:
+ # Unpack the command
+ cmd, *cmd_args = args
+
+ captured_lines: list[str] = []
+ process = None
+ try:
+ # Create the subprocess
+ process = await asyncio.create_subprocess_exec(
+ cmd,
+ *cmd_args,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+
+ assert process.stdout is not None
+
+ start_time = time.monotonic()
+ line_count = 0
+
+ # Subprocess main loop. Read line by line
+ async for raw_line in process.stdout:
+ line = raw_line.decode(errors="ignore").rstrip("\n")
+
+ # Print the line
+ if echo:
+ if args[:3] == ["ros2", "topic", "echo"] and line:
+ msg = line.strip()
+ if msg == "---":
+ continue
+ msg = msg.strip("'\"")
+ line = f"[ROS] [INFO] I heard: [{msg}]"
+
+ captured_lines.append(line)
+ line_processed = escape(line)
+ if hasattr(console, "add_subprocess_line"):
+ console.add_subprocess_line(line_processed)
+ else:
+ console.add_line(line_processed)
+
+ # Count the line
+ line_count += 1
+ if max_lines is not None and line_count >= max_lines:
+ console.logger.log_tool(f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}", tool_name=tool_name)
+ console.set_stream_task(None)
+ process.terminate()
+ break
+
+ # Check duration
+ if max_duration and (time.monotonic() - start_time) >= max_duration:
+ console.logger.log_tool(
+ f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s", tool_name=tool_name
+ )
+ console.set_stream_task(None)
+ process.terminate()
+ break
+
+ except asyncio.CancelledError:
+ # Task was cancelled → stop the subprocess
+ console.logger.log_tool("[tool]Cancellation received:[/tool] terminating subprocess...", tool_name=tool_name)
+ if process is not None:
+ process.terminate()
+
+ # Not necessary, textual terminal get the keyboard input
+ except KeyboardInterrupt:
+ # Ctrl+C pressed → stop subprocess
+ console.logger.log_tool("[tool]Ctrl+C received:[/tool] terminating subprocess...", tool_name=tool_name)
+ if process is not None:
+ process.terminate()
+
+ finally:
+ try:
+ if process is not None:
+ await asyncio.wait_for(process.wait(), timeout=3.0)
+ except asyncio.TimeoutError:
+ console.logger.log_tool("Subprocess didn't exit in time → killing it.", tool_name=tool_name, error=True)
+ if process is not None:
+ process.kill()
+ await process.wait()
+ finally:
+ if hasattr(console, "hide_subprocess_panel"):
+ console.hide_subprocess_panel()
+
+ return "\n".join(captured_lines)
+
+
+def execute_subprocess(console, tool_name, base_args, max_duration, max_lines):
+ stream_task = None
+ done_event = threading.Event()
+ result = {"output": ""}
+
+ def _launcher() -> None:
+ nonlocal stream_task
+ if hasattr(console, "show_subprocess_panel"):
+ console.show_subprocess_panel()
+
+ # This always runs in the Textual event-loop thread
+ loop = asyncio.get_running_loop()
+ stream_task = loop.create_task(
+ run_streaming_cmd_async(
+ console,
+ base_args,
+ max_duration=max_duration,
+ max_lines=max_lines,
+ tool_name=tool_name, # tool_header_str
+ )
+ )
+ # Keep the real task reference so Ctrl+C can cancel it.
+ console.set_stream_task(stream_task)
+
+ def _on_done(task: asyncio.Task) -> None:
+ try:
+ if not task.cancelled():
+ result["output"] = task.result() or ""
+ except Exception as e:
+ console.logger.log_msg(f"Echo task error: {e!r}\n", error=True)
+ finally:
+ done_event.set()
+
+ stream_task.add_done_callback(_on_done)
+
+ # `/rerun` workers can have their own asyncio loop in a non-UI thread.
+ # Route UI/task creation to Textual app thread unless we are already there.
+ if threading.current_thread() is threading.main_thread():
+ _launcher()
+ else:
+ # `console.app` is your Textual App instance.
+ console.app.call_from_thread(_launcher)
+
+ console.logger.log_tool("[tool]Subprocess created![tool]", tool_name=tool_name)
+ # Wait for streaming command to finish and return collected lines.
+ # In UI thread we avoid blocking to prevent deadlocks.
+ if threading.current_thread() is threading.main_thread():
+ return ""
+ done_event.wait()
+ return result["output"]
+
+
+def run_oneshot_cmd(args: list[str]) -> str:
+ try:
+ return subprocess.check_output(args, stderr=subprocess.STDOUT, text=True)
+
+ except subprocess.CalledProcessError as e:
+ raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")
+
+
+def suggest_string(console, tool_name, string_name, input_string, real_string_list):
+ ret = None
+
+ def _similarity(a: str, b: str) -> float:
+ """Return a similarity score between 0 and 1."""
+ return difflib.SequenceMatcher(None, a, b).ratio()
+
+ def _get_suggestions(real_string_list_comp: list[str], string_comp: str) -> tuple[str, list[str]]:
+ """
+ Function used to search for the most "similar" string in a list.
+
+ Used in ROS2 cli commands to used the "simmilar" in case
+ the queried topic does not exists.
+
+ Example:
+ real_string_list_comp = [
+ "/parameter_events",
+ "/rosout",
+ "/turtle1/cmd_vel",
+ "/turtle1/color_sensor",
+ "/turtle1/pose",
+ ]
+ string_comp = "trtle1"
+
+ @return
+ str: the most "similar" string
+ list[str] a sorted list by a similitud value
+ """
+
+ topic_list_pq = []
+ n = len(string_comp)
+
+ for string in real_string_list_comp:
+ m = len(string)
+ # Calculate the similitud
+ score = _similarity(string_comp, string)
+ # Give more value for the nearest size comparations.
+ score -= abs(n - m) * 0.005
+ # Max-heap (via negative score)
+ heapq.heappush(topic_list_pq, (-score, string))
+
+ # Pop ordered list
+ ret_list: list[str] = []
+ _, most_topic_similar = heapq.heappop(topic_list_pq)
+
+ ret_list.append(most_topic_similar)
+
+ while topic_list_pq:
+ _, topic = heapq.heappop(topic_list_pq)
+ ret_list.append(topic)
+
+ return most_topic_similar, ret_list
+
+ # Add '/' for Topic, service, action, node
+ ros_categories_list = ["Topic", "Service", "Action", "Node"]
+ if string_name in ros_categories_list and len(input_string) > 0 and input_string[0] != "/":
+ input_string = f"/{input_string}"
+ ret = input_string
+
+ if input_string not in real_string_list:
+ console.logger.log_tool(f'{string_name}: "{input_string}" does not exists', tool_name=tool_name)
+
+ # Get the suggestions list sorted by similitud value
+ _, topic_sim_list = _get_suggestions(real_string_list, input_string)
+
+ # Open the ModalScreen
+ console.open_radiolist(topic_sim_list, tool_name, string_name, input_string)
+
+ # Wait for the user to select and item in the
+ # RadioList ModalScreen
+ console.suggestion_index_changed.wait()
+
+ # Check if the user cancelled the suggestion
+ if console.suggestion_index >= 0:
+ ret = topic_sim_list[console.suggestion_index]
+
+ # Reset suggestion index
+ console.suggestion_index = -1
+ console.suggestion_index_changed.clear()
+
+ return ret
+
+
+def last_output_lines(console, tool_name: str, output: str, n_lines: int = 10) -> str:
+ """
+ Keep only the last `max_lines` lines in tool output and log this behavior.
+ """
+ lines = output.splitlines()
+ if console is not None and hasattr(console, "logger"):
+ console.logger.log_tool(
+ f"Returning only the last {n_lines} lines in result['output'].",
+ tool_name=tool_name,
+ )
+ return "\n".join(lines[-n_lines:])
diff --git a/tests/unittest/test_default_tools.py b/tests/unittest/test_default_tools.py
new file mode 100644
index 0000000..58c4871
--- /dev/null
+++ b/tests/unittest/test_default_tools.py
@@ -0,0 +1,438 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Unit tests for the ROS 2 default tools (ros2_topic).
+
+These tests require a working ROS 2 environment (rclpy importable and
+ros2 CLI available). Tests are automatically skipped when ROS 2 is not
+installed.
+
+Tests call the tool's ``run()`` method directly with a minimal blackboard
+containing a mock console that implements the methods the tool relies on
+(logging, suggestion handling, subprocess panel, stream task).
+Background ``ros2 topic pub`` processes are used to create observable topics.
+"""
+
+import asyncio
+import importlib
+import os
+import signal
+import subprocess
+import sys
+import threading
+import time
+import unittest
+
+# ---------------------------------------------------------------------------
+# Skip entire module when ROS 2 is not available
+# ---------------------------------------------------------------------------
+try:
+ import rclpy # noqa: F401
+
+ ROS2_AVAILABLE = True
+except ImportError:
+ ROS2_AVAILABLE = False
+
+# Also check that the ros2 CLI is on PATH
+try:
+ subprocess.check_output(["ros2", "topic", "list"], stderr=subprocess.STDOUT, timeout=10)
+ ROS2_CLI_AVAILABLE = True
+except Exception:
+ ROS2_CLI_AVAILABLE = False
+
+
+# Make src-layout importable
+CURRENT_DIR = os.path.dirname(__file__)
+SRC_DIR = os.path.abspath(os.path.join(CURRENT_DIR, os.path.pardir, os.path.pardir, "src"))
+if SRC_DIR not in sys.path:
+ sys.path.insert(0, SRC_DIR)
+
+
+# ---------------------------------------------------------------------------
+# Mock console — implements only the interface used by tools and utils
+# ---------------------------------------------------------------------------
+class _MockLogger:
+ """Collects log calls for optional inspection."""
+
+ def __init__(self):
+ self.messages = []
+
+ def log_tool(self, msg, **kwargs):
+ self.messages.append(msg)
+
+ def log_msg(self, msg, **kwargs):
+ self.messages.append(msg)
+
+ def log_console(self, msg, **kwargs):
+ self.messages.append(msg)
+
+
+class _MockApp:
+ """Provides an asyncio event loop on a background thread,
+ mimicking Textual's ``app.call_from_thread()`` for
+ ``execute_subprocess``."""
+
+ def __init__(self):
+ self._loop = asyncio.new_event_loop()
+ self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
+ self._thread.start()
+
+ def call_from_thread(self, fn, *args, **kwargs):
+ self._loop.call_soon_threadsafe(fn)
+
+ def stop(self):
+ self._loop.call_soon_threadsafe(self._loop.stop)
+ self._thread.join(timeout=5)
+
+
+class MockConsole:
+ """Implements the console interface expected by tools and their helpers.
+
+ Covers:
+ - ``logger`` — used for all tool logging
+ - ``app`` — used by execute_subprocess
+ (provides asyncio event loop)
+ - ``set_stream_task`` — used by execute_subprocess
+ - ``show/hide_subprocess_panel`` — used by execute_subprocess
+ - ``add_line / add_subprocess_line`` — used by run_streaming_cmd_async
+ - ``change_route_logs`` — used by streaming commands
+ - ``suggestion_index*`` — used by suggest_string when a topic
+ is not found (auto-selects first match)
+ - ``open_radiolist`` — used by suggest_string modal
+ """
+
+ def __init__(self):
+ self.logger = _MockLogger()
+ self.app = _MockApp()
+ # suggest_string support: auto-accept the first suggestion
+ self.suggestion_index = 0
+ self.suggestion_index_changed = threading.Event()
+ self.suggestion_index_changed.set() # unblock immediately
+ self._stream_task = None
+
+ def set_stream_task(self, task):
+ self._stream_task = task
+
+ def show_subprocess_panel(self):
+ pass
+
+ def hide_subprocess_panel(self):
+ pass
+
+ def change_route_logs(self, value):
+ pass
+
+ def add_line(self, text):
+ pass
+
+ def add_subprocess_line(self, text):
+ pass
+
+ def open_radiolist(self, items, tool_name, string_name, input_string):
+ # Auto-select index 0 (the best match) without user interaction
+ pass
+
+ def stop(self):
+ self.app.stop()
+
+
+# ---------------------------------------------------------------------------
+# Helper: background ROS 2 publisher
+# ---------------------------------------------------------------------------
+def start_background_publisher(
+ topic: str,
+ msg_type: str = "std_msgs/msg/String",
+ rate: float = 10.0,
+ message: str = "hello_test",
+):
+ """Launch ``ros2 topic pub`` in a subprocess and return the Popen handle."""
+ proc = subprocess.Popen(
+ [
+ "ros2",
+ "topic",
+ "pub",
+ topic,
+ msg_type,
+ f"{{data: '{message}'}}",
+ "--rate",
+ str(rate),
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ # Give the publisher a moment to register with the ROS graph
+ time.sleep(2)
+ return proc
+
+
+def stop_background_publisher(proc: subprocess.Popen):
+ """Terminate a background publisher gracefully."""
+ if proc.poll() is None:
+ proc.send_signal(signal.SIGINT)
+ try:
+ proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait()
+
+
+# ===========================================================================
+# Test class
+# ===========================================================================
+@unittest.skipUnless(
+ ROS2_AVAILABLE and ROS2_CLI_AVAILABLE,
+ "ROS 2 (rclpy + ros2 CLI) not available — skipping",
+)
+class TestRos2TopicTool(unittest.TestCase):
+ """Direct tests for ``Ros2TopicTool.run()``.
+
+ Each test instantiates the tool, injects a blackboard with a
+ MockConsole and a ROS2DefaultToolNode, and calls ``run()`` directly.
+ """
+
+ # -----------------------------------------------------------------------
+ # Fixtures
+ # -----------------------------------------------------------------------
+ @classmethod
+ def setUpClass(cls):
+ """Import the default tools module and the ROS2DefaultToolNode."""
+ default_tools_mod = importlib.import_module("vulcanai.tools.default_tools")
+
+ cls.Ros2TopicTool = default_tools_mod.Ros2TopicTool
+ cls.ROS2DefaultToolNode = default_tools_mod.ROS2DefaultToolNode
+
+ def setUp(self):
+ self.console = MockConsole()
+ self.node = self.ROS2DefaultToolNode()
+ self._bg_publishers = []
+
+ def tearDown(self):
+ for proc in self._bg_publishers:
+ stop_background_publisher(proc)
+ self._bg_publishers.clear()
+ self.node.destroy_node()
+ self.console.stop()
+
+ @classmethod
+ def tearDownClass(cls):
+ if rclpy.ok():
+ rclpy.shutdown()
+
+ # -----------------------------------------------------------------------
+ # Helpers
+ # -----------------------------------------------------------------------
+ def _run_topic(self, **kwargs):
+ """Create a Ros2TopicTool, inject a blackboard, and call run()."""
+ tool = self.Ros2TopicTool()
+ tool.bb = {"console": self.console, "main_node": self.node}
+ return tool.run(**kwargs)
+
+ def _run_topic_threaded(self, **kwargs):
+ """Run the tool from a worker thread.
+
+ Streaming commands (bw, hz, delay) use ``execute_subprocess`` which
+ requires a non-main thread so it can call
+ ``console.app.call_from_thread()`` and then block on a done-event.
+ """
+ result = {}
+ error = {}
+
+ def worker():
+ try:
+ result["value"] = self._run_topic(**kwargs)
+ except Exception as e:
+ error["exc"] = e
+
+ t = threading.Thread(target=worker)
+ t.start()
+ t.join(timeout=30)
+ if "exc" in error:
+ raise error["exc"]
+ return result.get("value")
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic list
+ # -----------------------------------------------------------------------
+ def test_topic_list(self):
+ """`list` should succeed and contain /rosout."""
+ result = self._run_topic(command="list")
+
+ self.assertIn("output", result)
+ self.assertIn("/rosout", result["output"])
+
+ def test_topic_list_with_background_pub(self):
+ """After starting a background publisher, `list` should include
+ that topic."""
+ topic = "/vulcan_test_topic_list"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="list")
+
+ self.assertIn(topic, result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic info
+ # -----------------------------------------------------------------------
+ def test_topic_info(self):
+ """`info` on a published topic returns type and publisher count."""
+ topic = "/vulcan_test_topic_info"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="info", topic_name=topic)
+
+ self.assertIn("std_msgs/msg/String", result["output"])
+ self.assertIn("Publisher count:", result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic type
+ # -----------------------------------------------------------------------
+ def test_topic_type(self):
+ """`type` returns the message type of a topic."""
+ topic = "/vulcan_test_topic_type"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="type", topic_name=topic)
+
+ self.assertIn("std_msgs/msg/String", result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic find
+ # -----------------------------------------------------------------------
+ def test_topic_find(self):
+ """`find` locates topics by message type."""
+ topic = "/vulcan_test_topic_find"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="find", msg_type="std_msgs/msg/String")
+
+ self.assertIn(topic, result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic bw (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_bw(self):
+ """`bw` measures bandwidth on an active topic."""
+ topic = "/vulcan_test_topic_bw"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="bw",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic hz (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_hz(self):
+ """`hz` measures the publishing rate of a topic."""
+ topic = "/vulcan_test_topic_hz"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="hz",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic delay (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_delay(self):
+ """`delay` runs without error on an active topic.
+
+ Note: ``ros2 topic delay`` requires messages with a header stamp.
+ With ``std_msgs/msg/String`` (no header) the output may be empty
+ or contain a warning, but the command should not crash.
+ """
+ topic = "/vulcan_test_topic_delay"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="delay",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — streaming commands with custom max_duration / max_lines
+ # -----------------------------------------------------------------------
+ def test_topic_hz_custom_limits(self):
+ """`hz` respects custom max_duration and max_lines."""
+ topic = "/vulcan_test_topic_hz_limits"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="hz",
+ topic_name=topic,
+ max_duration=3.0,
+ max_lines=5,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+
+ # -----------------------------------------------------------------------
+ # Tests — error cases
+ # -----------------------------------------------------------------------
+ def test_topic_unknown_command_raises(self):
+ """An unknown subcommand should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="nonexistent")
+
+ with self.assertRaises(ValueError):
+ self._run_topic(command="nonexistent", topic_name="/rosout")
+
+ def test_topic_info_missing_topic_name_raises(self):
+ """`info` without a topic_name should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="info")
+
+ def test_topic_type_missing_topic_name_raises(self):
+ """`type` without a topic_name should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="type")
+
+ def test_topic_find_missing_msg_type_raises(self):
+ """`find` without a msg_type should raise an exception."""
+ with self.assertRaises(Exception):
+ self._run_topic(command="find")
+
+
+if __name__ == "__main__":
+ unittest.main()