Практический учебный гайд по ROS 2 на примере AI-робота с архитектурой LLM + VLM + сенсоры + исполнительные узлы. Главная цель — показать базовые паттерны ROS 2 на живых примерах, которые можно запускать даже без реального железа.
- Для тех, кто изучает ROS 2 с нуля и хочет быстро понять узлы, топики, сервисы и общую архитектуру.
- Для тех, кто собирает AI-робота и хочет сначала обкатать логику локально, а потом перенести ее на реальную платформу.
Этот гайд вырос из реального проекта мобильного робота на ROS 2, где роль «мозга» выполняет AI-агент на базе OpenAI API: связка LLM + VLM, Realtime API и обычного API для логики, зрения, речи и принятия решений.
Целевая аппаратная платформа проекта такая:
- Шасси и движение — Wheeltec Ackerman, драйвер Pololu Dual на VNH5019, контроллер Raspberry Pi Pico 2.
- Вычислительный центр — Raspberry Pi 5 (8 GB RAM) с Active Cooler.
- Хранение — SanDisk Extreme microSDXC 128 GB.
- Зрение и голова — 3D-камера Orbbec Gemini 335, Pan-Tilt кронштейн Hiwonder 2 DoF, контроллер сервоприводов LSC-16.
- Интерфейсы и безопасность — фронтальные и тыловые ToF-сонары, USB-микрофон, I2S-усилитель Adafruit MAX98357A и динамик.
Этот репозиторий нужен не для демонстрации железа, а для понятного и воспроизводимого изучения ROS 2. Поэтому здесь акцент на архитектуре и учебных узлах-заглушках: камере, аудио, сонарах, AI-модулях и базовых ROS 2-сценариях, которые можно запускать локально. Сначала — понимание архитектуры в безопасной песочнице, потом — перенос на реального робота.
mock_sonar.py— виртуальный ToF-сонар, публикующий дистанцию в/sensor/sonar_front.safety_stop.py— простой safety-узел, реагирующий на близкое препятствие.head_controller.py— сервис управления Pan-Tilt головой черезset_head_position.mock_camera.py— виртуальная камера, публикующаяsensor_msgs/Imageв/camera/color/image_raw.vlm_agent_stub.py— заглушка VLM-узла, которая берет свежие кадры и имитирует отправку в vision-модель.mock_audio.py— эмуляция микрофона и динамика через ROS 2 топики.ai_driver.py— переводчик простых текстовых AI-команд вAckermannDriveStamped.electronic_differential.py— расчет скоростей левого и правого колеса для Ackermann-шасси.ai_brain.py— учебный центральный AI-узел, оркестрирующий сенсоры, речь, сервисы и команды движения.llm_parser.py— парсер структурированного JSON-ответа от LLM с валидацией через Pydantic.
- Базовый граф узлов ROS 2 — кто публикует, кто подписывается, кто вызывает сервисы.
- Сенсоры в симуляции — камера, сонар, аудио-вход и аудио-выход.
- AI-оркестрацию — как центральный узел принимает данные и инициирует действия.
- Кинематику Ackermann — как абстрактная команда движения превращается в параметры для шасси.
- Подготовку к LLM Function Calling — как безопасно принимать структурированные команды от модели.
Этот репозиторий показывает учебную и архитектурную основу. Некоторые узлы здесь намеренно реализованы как моки, стабы и демонстрационные сценарии, а не как production-ready интеграции с реальными драйверами, OpenAI API и железом. Это сделано специально, чтобы путь входа в ROS 2 был простым, наглядным и воспроизводимым.
Многие думают, что ROS (Robot Operating System) — это операционная система вроде Windows или Ubuntu. На самом деле это набор библиотек и инструментов (фреймворк/middleware), который помогает разработчикам создавать приложения для роботов.
Главная задача ROS 2 — заставить разные железки (лидары, камеры, моторы) и программы (алгоритмы зрения, навигацию) обмениваться данными быстро, надежно и по единому стандарту.
Ниже собраны главные термины, которые вам нужно знать, прежде чем писать код.
Аналогия: Сотрудники в большой компании или органы в теле человека.
Узел (Node) — это минимальная единица программы в ROS 2, которая выполняет одну конкретную задачу. Правильная архитектура робота состоит из множества маленьких узлов, а не из одной огромной программы.
- Пример 1: Узел
camera_nodeтолько получает картинку с камеры и передает её дальше. - Пример 2: Узел
wheel_motor_nodeтолько крутит колесо. - Пример 3: Узел
brain_nodeпринимает решения.
Если один узел упадет (например, зависнет драйвер камеры), остальные продолжат работать!
Аналогия: Радиостанция и радиослушатели.
Узлам нужно общаться. Главный способ общения в ROS 2 — это Топики. Это односторонняя непрерывная передача данных (Publish / Subscribe).
- Publisher (Издатель): Узел, который отправляет данные в топик.
- Subscriber (Подписчик): Узел, который читает данные из топика.
- Message (Сообщение): Строгий формат данных, который передается по топику (например, число, строка, массив данных с лазера, картинка).
Пример: Узел камеры (Publisher) непрерывно транслирует видео в топик /video_stream. Узел распознавания лиц (Subscriber) и узел записи видео (Subscriber) одновременно читают этот топик. Камере всё равно, кто её слушает и слушает ли вообще.
Аналогия: Вызов официанта в ресторане.
Топики транслируют данные постоянно (например, 30 раз в секунду). Но что, если нам нужно просто задать вопрос и получить ответ один раз? Для этого нужны Сервисы.
Сервисы работают по принципу Запрос -> Ответ (Request/Response). Это синхронная операция: пока клиент не получит ответ от сервера, он будет ждать.
- Пример: Узел навигации запрашивает у узла карты: "Дай мне текущую карту помещения" (Запрос). Узел карты отправляет файл (Ответ).
Аналогия: Заказ доставки пиццы. Вы делаете заказ, вам периодически приходят СМС о статусе ("Готовится", "Курьер в пути"), и в конце курьер отдает пиццу.
Экшены нужны для долгих задач. В отличие от Сервисов, они не блокируют систему, пока выполняется задача, и могут присылать промежуточный прогресс (Feedback). Экшен можно отменить в любой момент.
- Пример: Вы отправляете роботу команду: "Едь в соседнюю комнату". Робот начинает движение, раз в секунду шлет Feedback ("Осталось 5 метров... 4 метра..."), а в конце присылает Result ("Я на месте!").
Весь код в ROS 2 строго организован по папкам.
- Workspace (Рабочее пространство): Главная папка вашего проекта (то есть корень репозитория). Внутри неё всегда есть папка
src, где лежит исходный код. - Package (Пакет): Папка внутри
src, содержащая связанные узлы, библиотеки и настройки. Пакет — это то, чем разработчики делятся на GitHub. Пакеты можно писать на C++ или Python. - Colcon: Программа-сборщик. Она берет исходный код из папки
srcи превращает его в рабочие программы (собирает/компилирует).
ROS 2 имеет очень мощный интерфейс командной строки. Вот команды, которые вы будете использовать каждый день (выполнять в терминале с активированной средой ROS 2):
ros2 node list— показать все запущенные узлы.ros2 node info /node_name— показать детальную информацию об узле (какие топики он читает/пишет).
ros2 topic list— показать все активные топики в системе.ros2 topic echo /topic_name— вывести в терминал данные, которые прямо сейчас летят по топику (очень полезно для дебага!).ros2 topic hz /topic_name— проверить частоту (сколько сообщений в секунду приходит в топик).
ros2 run <package_name> <executable_name>— запустить конкретный узел из пакета (например,ros2 run demo_nodes_cpp talker).ros2 launch <package_name> <launch_file_name>.py— запустить сразу несколько узлов с заданными настройками (используется для запуска всего робота целиком).
- Пишите узлы, которые делают что-то одно, но хорошо.
- Связывайте их топиками для постоянного потока данных.
- Используйте сервисы для быстрых разовых запросов.
- Используйте экшены для долгих физических действий робота.
- Собирайте всё это через colcon и делитесь своими пакетами!
Этот метод решает главную проблему ROS 2 на современных Linux-системах — боль с графическими драйверами (Wayland, Intel Iris) при использовании Docker.
Почему Pixi?
- 100% Изоляция: ROS 2 устанавливается локально в папку вашего проекта (
.pixi). Ваша основная система остается девственно чистой. Никаких конфликтов зависимостей! - Нативная графика: RViz2 и Gazebo работают мгновенно, используя родные драйверы Ubuntu. Не нужно пробрасывать
xhost,DISPLAYили мучиться с флагами Docker. - Удаление в один клик: Хотите снести ROS? Просто удалите папку проекта.
Откройте терминал и выполните установку (если Pixi еще не установлен):
curl -fsSL https://pixi.sh/install.sh | bashpixi стала доступна.
Стандартные каналы conda могут не содержать свежих сборок Jazzy. Мы подключим прямой сервер prefix.dev.
Если вы клонировали этот репозиторий с GitHub, просто перейдите в папку проекта и инициализируйте среду:
git clone https://github.com/pipedude/ros2-ai-robot-guide-ru.git
cd ros2-ai-robot-guide-ru
pixi initДобавляем правильные источники пакетов (важен порядок!):
pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forgeУстанавливаем полную версию ROS 2 Jazzy (Desktop) и инструмент colcon для сборки нашего будущего кода. Находясь в корне проекта, выполните:
pixi add ros-jazzy-desktop colcon-common-extensions(Загрузка займет несколько минут. Скачается около 1.5–2 Гб данных: Python, ROS, библиотеки Qt и симуляторы).
Проверим, подхватила ли среда вашу видеокарту:
pixi run rviz2Если открылось окно с 3D-сеткой (даже если есть предупреждение No tf data) — поздравляю, графическая подсистема работает идеально! Закройте окно (или нажмите Ctrl+C в терминале), чтобы продолжить.
Теперь напишем код. Чтобы не писать pixi run перед каждой командой, давайте "войдем" внутрь нашей изолированной среды:
pixi shell(Слева в терминале появится приписка с именем вашего проекта.)
Если вы работаете в Windsurf, удобно сразу открыть текущую папку проекта прямо из терминала:
windsurf .Точка в конце означает: открыть текущую папку проекта.
1. Создаем рабочую область и пакет:
mkdir src && cd src
ros2 pkg create --build-type ament_python my_first_package --dependencies rclpy std_msgs2. Пишем код узла:
Перейдите по пути src/my_first_package/my_first_package/ и создайте там файл hello_node.py. Вставьте этот код:
import rclpy
from rclpy.node import Node
class HelloNode(Node):
def __init__(self):
super().__init__('hello_node')
self.get_logger().info("Привет, ROS 2! Робот проснулся!")
self.count = 1
# Таймер: вызываем функцию каждую секунду
self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
self.get_logger().info(f"Я работаю уже {self.count} секунд. Полет нормальный!")
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = HelloNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()3. Регистрируем скрипт:
Откройте файл setup.py (он находится в src/my_first_package/) и найдите блок entry_points в самом низу. Измените его так:
entry_points={
'console_scripts':[
'hello = my_first_package.hello_node:main'
],
},Если ты сейчас находишься в папке src, вернись на уровень выше — в корень проекта — и собери код:
cd ..
colcon buildПодгружаем новые собранные файлы в текущую сессию:
source install/setup.bashЗапускаем нашего робота:
ros2 run my_first_package helloОжидаемый вывод в терминале:
[INFO] [hello_node]: Привет, ROS 2! Робот проснулся![INFO] [hello_node]: Я работаю уже 1 секунд. Полет нормальный!
[INFO][hello_node]: Я работаю уже 2 секунд. Полет нормальный!
🎉 Вы великолепны! Теперь у вас есть мощная, надежная и абсолютно безопасная для вашей ОС среда для изучения робототехники.
Ниже — актуальный учебный маршрут для текущего состояния репозитория. Он привязан к тем узлам и сценариям, которые уже есть в проекте или прямо подготавливаются в рамках этого гайда.
Модуль 1: Базовый каркас (Среда и Узлы)
- Урок 1 (Начинаем сегодня): Инициализация ROS 2 через Pixi, создание рабочего пространства (workspace), написание первых Python-узлов (Node) в Windsurf.
- Урок 2: Топики (Topics). Создаем виртуальный ToF-сонар (Publisher) и узел безопасности (Subscriber), который будет кричать "Стоп!", если препятствие близко.
- Урок 3: Сервисы и Экшены (Services & Actions). Написание сервиса для Pan-Tilt "головы" (повернуть камеру на заданный угол) и экшена для длительных задач ("едь к точке").
Модуль 2: Кинематика и Управление (Шасси Ackerman)
- Урок 4: Сообщения управления. Изучение
ackermann_msgs. Как преобразовать команды от AI (вперед, лево, право) в угол поворота рулевых колес и скорость (для связки Pico 2 + VNH5019). - Урок 5: TF2 (Трансформации). Понимание систем координат робота: как "глаза" (камера Gemini 335) расположены относительно центра колес.
- Урок 6: Визуализация. Запуск RViz2. Виртуальное отображение нашего робота и "фейковых" данных с сенсоров без реального железа.
Модуль 3: Органы чувств (Зрение и Слух)
- Урок 7: Работа с изображениями. Подписка на
sensor_msgs/Image. Как в Python перехватывать кадры для отправки в VLM (OpenAI Vision). - Урок 8: Работа со звуком в ROS 2. Как упаковать аудиопоток с микрофона в топики и как воспроизводить звук (ответы LLM) через Adafruit I2S Amp.
Модуль 4: AI-Оркестратор (Ядро проекта)
- Урок 9: Учебный AI-мозг и оркестрация. Разбор узла
ai_brain.py, который слушает аудио и сенсоры, вызывает сервис головы и публикует команды движения. - Урок 10: Парсинг команд от LLM в ROS 2. Как заставить LLM выдавать структурированные JSON-команды, которые узел
llm_parser.pyпереведет в движения Ackermann и повороты сервоприводов.
Важно: Мост к реальному железу, Micro-ROS, запуск Orbbec Gemini на Raspberry Pi 5 и аппаратная интеграция будут добавлены позже. В текущую версию гайда они намеренно не включены.
В IDE Windsurf (можешь использовать VS Code, мне нравится Windsurf) мы будем использовать pixi для управления зависимостями. Это избавит нас от боли с системными зависимостями Python и ROS 2.
Шаг 1: Инициализация проекта Открой терминал в Windsurf и перейди в папку проекта:
# Если репозиторий уже клонирован с GitHub
cd ros2-ai-robot-guide-ru
# Если начинаешь с нуля без клонирования
# mkdir my_project && cd my_project
pixi initТеперь добавим каналы https://prefix.dev/robostack-jazzy и conda-forge (как в текущем pixi.toml) и установим базовые пакеты ROS 2 Jazzy и Python:
pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forge
# Устанавливаем базовые ROS 2 пакеты и инструменты для сборки
pixi add ros-jazzy-desktop ros-jazzy-rclpy compilers cmake pkg-configШаг 2: Создание первого скрипта (Виртуальный "Мозг" и "Сонар")
Пока мы не создали сложную структуру ROS-пакетов (с colcon), давай напишем простой скрипт, чтобы понять суть rclpy (ROS Client Library for Python).
Создай файл mock_sonar.py в корне проекта:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random
class MockSonarNode(Node):
def __init__(self):
# Название узла в графе ROS 2
super().__init__('mock_sonar_front')
# Создаем Publisher: публикуем дистанцию (Float32) в топик '/sensor/sonar_front'
self.publisher_ = self.create_publisher(Float32, '/sensor/sonar_front', 10)
# Таймер, который вызывает функцию каждую 1 секунду
timer_period = 1.0
self.timer = self.create_timer(timer_period, self.timer_callback)
self.get_logger().info('Виртуальный передний ToF-сонар запущен!')
def timer_callback(self):
msg = Float32()
# Симулируем дистанцию от 0.2 до 3.0 метров
msg.data = random.uniform(0.2, 3.0)
self.publisher_.publish(msg)
self.get_logger().info(f'Публикую дистанцию: {msg.data:.2f} м')
def main(args=None):
rclpy.init(args=args)
node = MockSonarNode()
try:
rclpy.spin(node) # Крутим цикл, пока не нажмем Ctrl+C
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Шаг 3: Запуск Так как мы используем Pixi, нам нужно запускать скрипт в его окружении:
pixi run python mock_sonar.pyТы должен увидеть в терминале логи, как твой виртуальный сонар (будущий ToF) начинает публиковать данные.
- Выполни эти шаги в Windsurf.
- Проверь, работает ли скрипт.
- Дополнительный челлендж: Пока скрипт работает, открой второй терминал в Windsurf, активируй оболочку pixi (
pixi shell) и введи командуros2 topic list, а затемros2 topic echo /sensor/sonar_front.
Что сейчас произошло "под капотом"?
В ROS 2 нет центрального "сервера" (как это было в ROS 1). Когда ты запустил свой Python-скрипт, он создал узел-Publisher. Когда ты ввел команду ros2 topic echo, ROS 2 временно создал безымянный узел-Subscriber. Благодаря протоколу DDS (Data Distribution Service) они автоматически "нашли" друг друга в локальной сети (в данном случае внутри твоей ОС) и начали обмениваться сообщениями типа Float32 по каналу (топику) /sensor/sonar_front.
Так как твоим роботом будет управлять AI-агент (LLM), нам нужно учитывать задержки сети и возможные галлюцинации нейросети. Если LLM скажет "Едь вперед", а перед роботом стена, робот не должен ждать ответа от AI — у него должен сработать базовый рефлекс.
Сейчас мы напишем узел-подписчик, который будет слушать наш сонар и кричать "СТОП!", если дистанция меньше 0.5 метра.
Шаг 1: Создаем скрипт подписчика
В корне твоего проекта (там же, где mock_sonar.py) создай новый файл safety_stop.py:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
class SafetyNode(Node):
def __init__(self):
super().__init__('safety_node')
# Создаем Subscriber.
# Аргументы: Тип сообщения, Имя топика, Функция-колбек, Размер очереди(QoS)
self.subscription = self.create_subscription(
Float32,
'/sensor/sonar_front',
self.sonar_callback,
10
)
self.subscription # предотвращаем удаление переменной сборщиком мусора
# Задаем критическую дистанцию в метрах
self.critical_distance = 0.5
self.get_logger().info('Узел безопасности запущен. Охраняю периметр!')
def sonar_callback(self, msg):
# Эта функция вызывается АВТОМАТИЧЕСКИ каждый раз, когда в топик приходит новое сообщение
distance = msg.data
if distance < self.critical_distance:
# Используем уровень логирования WARN (предупреждение) - он выделится цветом
self.get_logger().warning(f'АВАРИЙНАЯ ОСТАНОВКА! Препятствие на расстоянии {distance:.2f} м!')
else:
self.get_logger().info(f'Путь свободен (до препятствия {distance:.2f} м).')
def main(args=None):
rclpy.init(args=args)
safety_node = SafetyNode()
try:
rclpy.spin(safety_node) # Ждем сообщений бесконечно
except KeyboardInterrupt:
pass
finally:
safety_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Шаг 2: Запускаем связку узлов
Для этого нам понадобятся оба терминала в Windsurf:
- В первом терминале убедись, что работает твой "сонар" (если остановил, запусти снова):
pixi run python mock_sonar.py
- Во втором терминале останови
echo(Ctrl+C), убедись, что ты в корне проекта, и запусти узел безопасности:pixi run python safety_stop.py
Ты увидишь, как второй скрипт читает данные от первого. Большую часть времени он будет писать "Путь свободен", но как только random в первом скрипте выдаст число меньше 0.5, второй скрипт выдаст желтое/оранжевое предупреждение АВАРИЙНАЯ ОСТАНОВКА!.
В будущем реальном роботе именно этот safety_node будет перехватывать управление двигателями на Raspberry Pi Pico 2 и игнорировать команды движения от LLM, пока путь не освободится.
Теперь мы переходим к третьему уроку, который критически важен для твоего робота.
В чем отличие от Топиков?
- Топик — это радиостанция. Сонар (Publisher) бесконечно кричит в эфир "Дистанция 1 метр!", а Безопасность (Subscriber) это слушает. Это поток данных без гарантии доставки.
- Сервис — это звонок по телефону (Запрос -> Ответ). Клиент говорит: "Поверни голову на 45 градусов вправо". Сервер делает это и отвечает: "Готово, повернул".
Для твоего кронштейна Hiwonder 2 DoF Pan Tilt (где стоят два сервопривода: один крутит головой влево-вправо / Pan, другой вверх-вниз / Tilt) сервисы подходят идеально!
Твой будущий AI-агент (LLM) не будет постоянно слать координаты. Он скажет один раз: "Посмотри на человека слева" -> вызовет Сервис -> дождется ответа, что камера повернулась -> возьмет кадр с Orbbec Gemini.
Пока мы не создали свой тип сообщений (для этого понадобится сборщик colcon, к которому мы перейдем позже), мы используем стандартный ROS 2 тип сервиса AddTwoInts (Сложение двух чисел).
Давай "схитрим" и представим, что:
- Число
a— это угол Pan (влево-вправо, от -90 до 90 градусов) - Число
b— это угол Tilt (вверх-вниз, от -90 до 90 градусов)
Создай файл head_controller.py в корне проекта:
import rclpy
from rclpy.node import Node
# Импортируем стандартный тип сервиса (Запрос: a, b. Ответ: sum)
from example_interfaces.srv import AddTwoInts
import time
class HeadControllerService(Node):
def __init__(self):
super().__init__('head_controller')
# Создаем Service Server
# Аргументы: Тип сервиса, Имя сервиса, Функция-обработчик
self.srv = self.create_service(AddTwoInts, 'set_head_position', self.move_head_callback)
# Текущие углы (робот смотрит прямо)
self.current_pan = 0
self.current_tilt = 0
self.get_logger().info('Контроллер Pan-Tilt головы запущен. Жду команд (Сервисов)...')
def move_head_callback(self, request, response):
# Эта функция вызывается, когда кто-то "звонит" в этот сервис
target_pan = request.a
target_tilt = request.b
self.get_logger().info(f'Получена команда: повернуть голову на Pan={target_pan}°, Tilt={target_tilt}°')
# Имитируем физическое время работы сервоприводов LSC-16 (1 секунда)
self.get_logger().info('Моторы жжжжжж...')
time.sleep(1.0)
# Обновляем текущие координаты
self.current_pan = target_pan
self.current_tilt = target_tilt
# Формируем ответ клиенту.
# В AddTwoInts ответ хранится в поле "sum". Мы запишем туда 1 (успех)
response.sum = 1
self.get_logger().info('Движение завершено. Отправляю ответ.')
return response
def main(args=None):
rclpy.init(args=args)
head_controller = HeadControllerService()
try:
rclpy.spin(head_controller)
except KeyboardInterrupt:
pass
finally:
head_controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()В первом терминале запусти наш новый узел:
pixi run python head_controller.pyОткрой второй терминал (убедись, что ты в окружении pixi shell или используй префикс pixi run).
В ROS 2 есть потрясающая утилита для тестирования — мы можем стать "клиентом" прямо из командной строки!
Введи команду, которая прикажет роботу посмотреть вправо (45°) и чуть вверх (15°):
pixi run ros2 service call /set_head_position example_interfaces/srv/AddTwoInts "{a: 45, b: 15}"Что должно произойти:
- Во втором терминале ты увидишь запрос, а через секунду — ответ
response: example_interfaces.srv.AddTwoInts_Response(sum=1). - В первом терминале (где работает скрипт) ты увидишь логи: получение команды, звук моторов
Моторы жжжжжж...и рапорт об успехе.
Попробуй вызвать сервис пару раз с разными углами! Это точь-в-точь то действие, которое твоя LLM будет выполнять через Python-код, когда решит "оглядеться".
Как это вписывается в твой будущий AI-кейс:
В будущем твоя LLM через OpenAI API сгенерирует JSON, например: {"action": "look_around", "pan": 45, "tilt": 15}. Твой центральный Python-скрипт распарсит этот JSON и "дернет" именно этот ROS 2 сервис. Пока голова робота физически не повернется, LLM не получит кадр с камеры Orbbec Gemini.
(Примечание: В ROS 2 есть еще Actions / Экшены. Это как Сервисы, но для очень долгих задач, которые можно отменить в процессе — например, "едь в другую комнату". Мы изучим их чуть позже, когда дойдем до навигации).
А сейчас мы переходим к самому интересному!
Твое шасси Wheeltec использует Ackermann steering (Аккермановская рулевая трапеция). Это значит, что твой робот управляется как настоящий автомобиль: у него есть передние рулевые колеса (задают угол поворота) и задние ведущие колеса (задают скорость движения). Он не может крутиться на месте, как робот-пылесос.
В ROS 2 для таких роботов есть специальный стандарт сообщений — ackermann_msgs. В будущем твой контроллер Raspberry Pi Pico 2 будет подписан на эти сообщения и будет переводить их в ШИМ-сигналы для драйвера Pololu VNH5019.
Давай напишем узел "Транслятор", который будет слушать простые текстовые команды от AI (например, "вперед", "налево") и переводить их в физические команды для шасси.
Так как ackermann_msgs не входит в самую базовую поставку, нам нужно добавить его в наше окружение pixi. Открой терминал и введи:
pixi add ros-jazzy-ackermann-msgsСоздай файл ai_driver.py в корне проекта:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped
class AIDriverNode(Node):
def __init__(self):
super().__init__('ai_driver_translator')
# Подписчик на текстовые команды (Топик: /ai_command)
self.subscription = self.create_subscription(
String, '/ai_command', self.command_callback, 10)
# Паблишер физических команд для шасси (Топик: /drive)
self.publisher = self.create_publisher(
AckermannDriveStamped, '/drive', 10)
self.get_logger().info('Транслятор AI -> Ackermann запущен. Жду текстовых команд в топик /ai_command')
def command_callback(self, msg):
command = msg.data.lower()
# Создаем пустое сообщение Ackermann
drive_msg = AckermannDriveStamped()
# Логика перевода текста в физику:
# speed (м/с) - скорость
# steering_angle (радианы) - угол поворота руля
if command == 'вперед':
drive_msg.drive.speed = 0.5
drive_msg.drive.steering_angle = 0.0
elif command == 'налево':
drive_msg.drive.speed = 0.3
drive_msg.drive.steering_angle = 0.5 # Примерно 28 градусов влево
elif command == 'направо':
drive_msg.drive.speed = 0.3
drive_msg.drive.steering_angle = -0.5 # Примерно 28 градусов вправо
elif command == 'назад':
drive_msg.drive.speed = -0.3
drive_msg.drive.steering_angle = 0.0
elif command == 'стоп':
drive_msg.drive.speed = 0.0
drive_msg.drive.steering_angle = 0.0
else:
self.get_logger().warning(f'AI выдал неизвестную команду: {command}')
return
self.publisher.publish(drive_msg)
self.get_logger().info(f'AI сказал "{command}". Едем: Скорость={drive_msg.drive.speed} м/с, Руль={drive_msg.drive.steering_angle} рад')
def main(args=None):
rclpy.init(args=args)
node = AIDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()-
В первом терминале запусти транслятор:
pixi run python ai_driver.py
-
Во втором терминале мы будем играть роль AI. Отправим команду "налево" в топик
/ai_command:pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'налево'}"Попробуй отправить разные команды: "вперед", "назад", "стоп", "прыжок" (чтобы проверить неизвестную команду).
-
(Дополнительно) В третьем терминале ты можешь посмотреть, как эти команды конвертируются в формат для моторов:
pixi run ros2 topic echo /drive
Тестирование: Отправьте команду через терминал и посмотрите на вывод. Но здесь есть важный нюанс: наше шасси использует два независимых мотора сзади без механического дифференциала. Как это решить?
Решение для двух моторов без дифференциала: В классическом автомобиле стоит механический дифференциал. Если задняя ось жестко заблокирована, колеса будут проскальзывать, робот будет "прыгать" в поворотах, а моторы и драйвер VNH5019 будут перегреваться.
Мы реализуем "Электронный дифференциал" (E-Diff).
Пакет ros-jazzy-ackermann-msgs описывает идеальную кинематическую модель. В сообщении AckermannDrive мы передаем желаемую скорость центра робота и желаемый угол поворота рулевых колес.
AI-агенту вообще не нужно знать, сколько у тебя моторов. Он просто говорит: "Едь вперед со скоростью 0.5 м/с и поверни руль на 0.3 радиана".
А вот дальше, между топиком /drive и контроллером Raspberry Pi Pico 2, мы ставим специальный узел — Кинематический вычислитель (Kinematics Solver).
Вот как это будет работать математически:
Пусть
- Скорость левого колеса:
$V_{left} = V \cdot (1 - \frac{W}{2R})$ - Скорость правого колеса:
$V_{right} = V \cdot (1 + \frac{W}{2R})$
Это будет отличным продолжением Урока 4. Мы напишем скрипт, который слушает AckermannDrive из предыдущего шага и рассчитывает скорости для левого и правого мотора.
Создай файл electronic_differential.py в папке проекта:
import rclpy
from rclpy.node import Node
from ackermann_msgs.msg import AckermannDriveStamped
import math
class ElectronicDifferential(Node):
def __init__(self):
super().__init__('electronic_differential')
# Подписываемся на команды от AI (или навигации)
self.subscription = self.create_subscription(
AckermannDriveStamped, '/drive', self.drive_callback, 10)
# Габариты твоего шасси Wheeltec (примерные в метрах, потом замеришь рулеткой)
self.wheelbase = 0.25 # L: расстояние между передней и задней осью
self.track_width = 0.15 # W: расстояние между левым и правым задним колесом
self.get_logger().info('Электронный дифференциал запущен. Жду команды /drive...')
def drive_callback(self, msg):
v = msg.drive.speed
delta = msg.drive.steering_angle
# Если едем прямо или стоим (угол поворота близок к нулю)
if abs(delta) < 0.01:
v_left = v
v_right = v
else:
# Радиус поворота центра задней оси
R = self.wheelbase / math.tan(delta)
# Рассчитываем скорости для каждого колеса
v_left = v * (1 - self.track_width / (2 * R))
v_right = v * (1 + self.track_width / (2 * R))
# В будущем этот узел будет отправлять v_left, v_right и delta
# по Serial или I2C в Raspberry Pi Pico 2.
# А пока просто красиво логируем:
if v == 0:
status = "СТОИМ"
elif delta > 0.01:
status = f"ПОВОРОТ НАЛЕВО (Внутреннее колесо медленнее)"
elif delta < -0.01:
status = f"ПОВОРОТ НАПРАВО (Внутреннее колесо медленнее)"
else:
status = "ЕДЕМ ПРЯМО"
self.get_logger().info(
f'\n--- {status} ---\n'
f'Целевая: Скорость={v:.2f} м/с, Руль={delta:.2f} рад\n'
f'Мотор ЛЕВЫЙ: {v_left:.2f} м/с\n'
f'Мотор ПРАВЫЙ: {v_right:.2f} м/с\n'
f'Серво РУЛЬ: {delta:.2f} рад'
)
def main(args=None):
rclpy.init(args=args)
node = ElectronicDifferential()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()- Запусти
ai_driver.pyиз предыдущего шага в первом терминале. - Запусти
electronic_differential.pyво втором терминале. - В третьем терминале отправь команду
налевочерез топик (как мы делали в прошлом уроке):pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'налево'}"
Что ты увидишь во втором терминале:
Когда придет команда налево, ai_driver.py скажет: "едем со скоростью 0.3 и поворачиваем на 0.5 рад".
А новый узел подхватит это и высчитает, что левый мотор должен крутиться медленнее, а правый — быстрее.
Эта архитектурная прослойка развязывает высокоуровневый ИИ и физику конкретного железа. Когда приедет Raspberry Pi Pico 2, этот код либо переедет в микроконтроллер, либо останется в Python на RPi 5, отправляя готовые скорости колес.
Попробуй запустить и отправить команду поворота! Это потрясающее чувство, когда математика оживает в логах.
Мы завершаем второй модуль (Кинематика и Управление) важнейшей темой, без которой компьютерное зрение (VLM) и навигация просто не смогут работать.
Зачем это твоему AI-роботу? Представь: твоя 3D-камера Orbbec Gemini 335 видит препятствие ровно в 1 метре от себя. VLM-модель говорит: "Препятствие на расстоянии 1 метр". Но камера стоит на кронштейне в передней части робота, а центр вращения колес (откуда робот считает свои координаты) находится на 20 сантиметров сзади. Если робот поедет вперед на 1 метр, он врежется в препятствие!
Чтобы этого не происходило, в ROS 2 есть TF2 (Transform Framework). Это система, которая хранит "дерево" систем координат. У нас будут как минимум две точки:
base_link— центр робота (между задними колесами).camera_link— глаз камеры.
Давай посмотрим на это своими глазами в виртуальном пространстве, раз у нас пока нет "железа"!
RViz2 (ROS Visualization) — это мощнейший 3D-инструмент. Он позволяет видеть то, что "думает" робот.
Открой терминал в Windsurf и введи:
pixi run rviz2Откроется черное окно с сеткой. Это пустое сознание твоего робота.
- В левой панели (Displays) найди поле Fixed Frame. Сейчас там написано
map. Измени это слово наbase_link(впиши вручную и нажми Enter). - Нажми кнопку Add (внизу слева) -> Выбери по вкладке By topic -> Найди
/tf-> Выбери TF -> Нажми OK. Пока ничего не появится, потому что мы еще не создали наши координаты. Не закрывай RViz2!
Так как кронштейн прикручен к шасси жестко, расстояние между центром робота и основанием головы не меняется. Для таких вещей в ROS 2 есть встроенный узел, и нам даже не нужно писать Python-код!
Открой второй терминал в Windsurf и введи эту команду (она запустит стандартный узел ROS 2 для публикации статических трансформаций):
pixi run ros2 run tf2_ros static_transform_publisher --x 0.15 --y 0.0 --z 0.25 --yaw 0 --pitch 0 --roll 0 --frame-id base_link --child-frame-id camera_linkЧто значат эти цифры? Мы говорим ROS 2: "Камера (camera_link) находится на 15 см спереди (x 0.15) и на 25 см выше (z 0.25) центра робота (base_link)".
Вернись в окно RViz2. Ты должен увидеть, как в центре сетки появились оси координат!
- Одна координата в центре сетки — это твой
base_link(центр задней оси). - Вторая координата висит чуть спереди и сверху в воздухе — это твой
camera_link(где будет стоять Orbbec Gemini). - Между ними будет желтая линия, показывающая их связь.
(Красная стрелка = ось X / вперед, Зеленая = ось Y / влево, Синяя = ось Z / вверх).
- Запусти
rviz2и командуstatic_transform_publisher. - Настрой отображение TF в RViz2.
- Сделай скриншот этого 3D-пространства.
Почему это так круто для твоего VLM-агента? Когда VLM (OpenAI) распознает объект на картинке с камеры (например, "вижу мяч в 1 метре от меня"), ROS 2 через этот TF-каркас автоматически пересчитает: "Ага, мяч в 1 метре от камеры, значит, от колес до него 1.15 метра. Можно ехать!". Тебе не придется считать эти смещения вручную в коде.
Переходим к Уроку 7. Так как физической камеры Orbbec Gemini 335 у нас пока нет, мы напишем Виртуальную Камеру, которая будет генерировать видеопоток. А затем напишем Узел-заглушку VLM-агента, который будет перехватывать эти кадры.
В мире ROS 2 и Python золотым стандартом для работы с картинками является связка OpenCV и библиотеки cv_bridge (она переводит картинки из формата OpenCV в формат сообщений ROS 2 sensor_msgs/Image и обратно).
Открой терминал в Windsurf (убедись, что ты в корне проекта) и добавь нужные пакеты в окружение Pixi:
pixi add ros-jazzy-cv-bridge ros-jazzy-sensor-msgs opencvСоздай файл mock_camera.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
class MockCamera(Node):
def __init__(self):
super().__init__('mock_camera')
# Публикуем в топик, стандартный для цветных камер Orbbec
self.publisher_ = self.create_publisher(Image, '/camera/color/image_raw', 10)
# Таймер для 10 FPS (1 кадр каждые 0.1 сек)
self.timer = self.create_timer(0.1, self.timer_callback)
self.bridge = CvBridge()
self.frame_count = 0
self.get_logger().info('Виртуальная камера Orbbec запущена (10 FPS)...')
def timer_callback(self):
# 1. Генерируем "фейковый" кадр с помощью OpenCV (черный фон 640x480)
img = np.zeros((480, 640, 3), dtype=np.uint8)
# 2. Рисуем меняющийся текст, чтобы видеть, что видео идет
text = f"Simulated Vision: Frame {self.frame_count}"
cv2.putText(img, text, (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
self.frame_count += 1
# 3. Конвертируем OpenCV картинку (numpy array) в ROS 2 Image Message
msg = self.bridge.cv2_to_imgmsg(img, encoding="bgr8")
# 4. Публикуем в топик
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = MockCamera()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Отправлять видео 10-30 FPS в OpenAI API — это верный путь сжечь все лимиты и деньги за 5 минут. Поэтому архитектура AI-робота строится иначе: мы подписываемся на видеопоток, всегда сохраняем в памяти только самый свежий кадр, а в API отправляем его только раз в несколько секунд (или когда происходит важное событие/команда).
Создай файл vlm_agent_stub.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class VLMAgentStub(Node):
def __init__(self):
super().__init__('vlm_agent')
self.subscription = self.create_subscription(
Image,
'/camera/color/image_raw',
self.image_callback,
10 # QoS (размер очереди)
)
self.bridge = CvBridge()
self.latest_cv_image = None
# Таймер, имитирующий обращение к OpenAI (например, раз в 3 секунды)
self.timer = self.create_timer(3.0, self.process_vision_api)
self.get_logger().info('AI Vision Агент запущен. Жду видеопоток...')
def image_callback(self, msg):
# Эта функция вызывается 10 раз в секунду.
# Мы НЕ делаем здесь тяжелых вычислений, просто обновляем кэш свежего кадра.
self.latest_cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
def process_vision_api(self):
# Эта функция вызывается раз в 3 секунды.
if self.latest_cv_image is not None:
height, width, channels = self.latest_cv_image.shape
self.get_logger().info(
f'>>> [API CALL] Беру свежий кадр размером {width}x{height} '
f'и "отправляю" его в OpenAI GPT-4o Vision...'
)
# В будущем здесь будет код:
# 1. cv2.imencode('.jpg', self.latest_cv_image)
# 2. base64.b64encode
# 3. client.chat.completions.create(...)
else:
self.get_logger().warning('Кадров пока нет...')
def main(args=None):
rclpy.init(args=args)
node = VLMAgentStub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()- В первом терминале запусти виртуальную камеру:
pixi run python mock_camera.py
- Во втором терминале запусти AI-агента:
pixi run python vlm_agent_stub.py
(Дополнительно)
В ROS 2 (и RViz2) можно смотреть видео с камер!
3. В третьем терминале открой pixi run rviz2.
4. Нажми Add внизу слева -> Выбери по вкладке By topic -> Найди /camera/color/image_raw -> Выбери Image -> Нажми OK.
В левом нижнем углу RViz появится окошко, где будет транслироваться твое зелёное текстовое сообщение!
Запусти оба скрипта (камеру и агента) и посмотри на логи. VLM-агент должен регулярно сообщать об "отправке" кадров.
Ты только что реализовал асинхронный пайплайн компьютерного зрения! В будущем вместо заглушки text = ... в кадре будет реальная картинка твоей комнаты, а вместо get_logger().info — вызов библиотеки openai.
Мы плавно подошли к финалу нашей виртуальной сборки. У нас есть "Мышцы" (Сервисы головы и Эккермановское шасси), у нас есть "Зрение" и "Осязание" (Сонар). Чего не хватает? Слуха, Голоса и Мозга.
Так как твой робот будет использовать OpenAI Realtime API (которое работает напрямую с потоковым аудио, минуя стадию распознавания текста/STT на самом роботе), нам нужно подготовить "трубы" для передачи звука.
В реальном RPi 5 звук с твоего USB-микрофона и вывод на усилитель Adafruit I2S мы будем захватывать библиотекой sounddevice или pyaudio, упаковывая в сырые байты (PCM 16-bit).
Сейчас, чтобы не возиться с драйверами аудиокарты твоего ПК внутри виртуального окружения, мы создадим заглушки топиков. В терминале добавь пакет для сырых массивов:
pixi add ros-jazzy-std-msgsСоздай файл mock_audio.py. Он будет имитировать человека, который раз в 10 секунд говорит в микрофон команду, и "колонку", которая ждет ответов от AI:
import rclpy
from rclpy.node import Node
from std_msgs.msg import ByteMultiArray, String
import time
class AudioSystem(Node):
def __init__(self):
super().__init__('audio_system')
# Микрофон (Публикует аудио-байты)
self.mic_pub = self.create_publisher(ByteMultiArray, '/audio/mic_raw', 10)
# Динамик (Слушает аудио-байты от AI и "воспроизводит" их)
self.speaker_sub = self.create_subscription(
ByteMultiArray, '/audio/speaker_raw', self.speaker_callback, 10)
self.timer = self.create_timer(10.0, self.simulate_user_speech)
self.get_logger().info('Аудиосистема (Микрофон + Динамик Adafruit) запущена.')
def simulate_user_speech(self):
# Имитируем, что пользователь что-то сказал (например, "Робот, что ты видишь?")
msg = ByteMultiArray()
# В реальности здесь будет массив байтов PCM 16-bit.
# Мы упакуем фейковую строку в байты для теста
msg.data = b"VOICE_COMMAND: Look around and tell me what you see!"
self.get_logger().info('🎙️ Человек сказал фразу в микрофон...')
self.mic_pub.publish(msg)
def speaker_callback(self, msg):
# В реальности здесь байты отправляются на I2S Amp
# Мы просто декодируем для теста
text = bytes(msg.data).decode('utf-8', errors='ignore')
self.get_logger().info(f'🔊 ДИНАМИК ВОСПРОИЗВОДИТ: {text}')
def main(args=None):
rclpy.init(args=args)
node = AudioSystem()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()А теперь Самое Главное. Мы пишем центральный узел, который свяжет всё, что мы сделали в предыдущих уроках, воедино!
Этот узел будет получать аудио от микрофона, брать самый свежий кадр с камеры, проверять сонар, отправлять всё это в OpenAI (пока виртуально) и выдавать команды шасси, голове и динамику.
Создай файл ai_brain.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import ByteMultiArray, String, Float32
from example_interfaces.srv import AddTwoInts
import json
class AIBrain(Node):
def __init__(self):
super().__init__('ai_brain')
# --- ПОДПИСКИ (Органы чувств) ---
self.create_subscription(Image, '/camera/color/image_raw', self.image_cb, 10)
self.create_subscription(ByteMultiArray, '/audio/mic_raw', self.audio_cb, 10)
self.create_subscription(Float32, '/sensor/sonar_front', self.sonar_cb, 10)
# --- ПАБЛИШЕРЫ (Действия) ---
self.drive_pub = self.create_publisher(String, '/ai_command', 10)
self.speaker_pub = self.create_publisher(ByteMultiArray, '/audio/speaker_raw', 10)
# --- КЛИЕНТЫ СЕРВИСОВ ---
self.head_client = self.create_client(AddTwoInts, 'set_head_position')
# Состояние памяти робота
self.latest_image = None
self.front_distance = 999.0
self.get_logger().info('🧠 AI Brain запущен и ждет данных...')
def image_cb(self, msg):
self.latest_image = "КАДР_В_ПАМЯТИ" # В реальности: cv_bridge перехват
def sonar_cb(self, msg):
self.front_distance = msg.data
def audio_cb(self, msg):
user_voice = bytes(msg.data).decode('utf-8')
self.get_logger().info(f'🧠 Услышал голос: {user_voice}')
self.process_ai_reasoning()
def process_ai_reasoning(self):
self.get_logger().info('🧠 Отправляю [Голос + Фото + Данные Сонара] в OpenAI Realtime API...')
# --- ИМИТАЦИЯ ОТВЕТА ОТ OPENAI (LLM+VLM) ---
# В реальности OpenAI Realtime API пришлет нам аудио-поток с голосом
# и JSON с вызовом функций (Function Calling) для моторов.
if self.front_distance < 1.0:
ai_thought = "Вижу препятствие близко. Нужно остановиться и сказать об этом."
drive_cmd = "стоп"
voice_reply = b"I stopped because there is an obstacle right in front of me!"
else:
ai_thought = "Путь свободен. Поверну голову посмотреть и поеду вперед."
drive_cmd = "вперед"
voice_reply = b"Moving forward and scanning the area."
# Запрашиваем поворот головы (асинхронно)
self.turn_head(45, 0)
self.get_logger().info(f'🧠 Решение AI: {ai_thought}')
# Говорим голосом
speaker_msg = ByteMultiArray()
speaker_msg.data = voice_reply
self.speaker_pub.publish(speaker_msg)
# Даем команду шасси
drive_msg = String()
drive_msg.data = drive_cmd
self.drive_pub.publish(drive_msg)
def turn_head(self, pan, tilt):
if not self.head_client.wait_for_service(timeout_sec=1.0):
self.get_logger().warning('Сервис головы недоступен!')
return
req = AddTwoInts.Request()
req.a = pan
req.b = tilt
self.get_logger().info(f'🧠 Даю команду моторам головы: Pan={pan}, Tilt={tilt}')
self.head_client.call_async(req) # Асинхронный вызов
def main(args=None):
rclpy.init(args=args)
node = AIBrain()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Сейчас мы запустим учебную версию архитектуры, которая повторяет логику будущего робота, но пока работает на локальных моках и заглушках. Тебе понадобится открыть 5 терминалов (в Windsurf можно разделить панели) и запустить наши узлы из прошлых уроков в каждом из них:
- Терминал 1 (Сонар и Камера):
(Можно запустить в одном через
&или открыть два под-терминала)pixi run python mock_sonar.py - Терминал 2 (Железо - Голова и Шасси):
pixi run python head_controller.py - Терминал 3 (Железо - Транслятор E-Diff):
pixi run python ai_driver.py - Терминал 4 (Аудио система):
pixi run python mock_audio.py - Терминал 5 (Мозг AI):
pixi run python ai_brain.py
Что ты должен увидеть в этой симфонии:
Раз в 10 секунд mock_audio будет "кричать" в микрофон. ai_brain это услышит, проверит дистанцию от mock_sonar. Если дистанция больше 1 метра, Мозг отправит сервис-запрос в head_controller (он пожужжит моторами), отправит команду "вперед" в ai_driver, а в динамик скажет "Moving forward". Если сонар выдал < 1.0 метр, Мозг скажет "стоп".
Запусти это, понаблюдай за логами терминала 5 (Мозг AI) и терминала 2/3 (Железо). Если всё это свяжется — поздравляю, мы полностью спроектировали программную архитектуру твоего агента без реального железа!
В предыдущих уроках наш робот понимал только жестко заданные текстовые команды вроде "вперед" или "налево". Но современные LLM (особенно в связке с OpenAI Realtime API) могут выдавать структурированные ответы. Заставлять LLM писать текст, а потом парсить его через if/else — это антипаттерн в 2026 году.
Вместо этого мы используем Function Calling (Вызов функций) или Structured Outputs (Структурированные форматы). Мы дадим нейросети строгую схему того, что умеет робот, и она будет возвращать точный JSON с цифрами.
В Python золотым стандартом для валидации данных от LLM является библиотека Pydantic. OpenAI API нативно поддерживает генерацию ответов прямо в Pydantic-схемы.
Сейчас мы используем узел llm_parser.py, который принимает структурированный JSON от LLM и переводит его в прямые команды для шасси и головы. Это новый, более строгий путь управления, который может использоваться вместо старого текстового транслятора ai_driver.py.
Открой терминал в Windsurf и добавь библиотеку:
pixi add pydanticСоздай файл llm_parser.py в корне проекта:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped
from example_interfaces.srv import AddTwoInts
from pydantic import BaseModel, Field, ValidationError
from typing import Literal, Union, List
# --- ОПРЕДЕЛЯЕМ СХЕМУ (API НАШЕГО РОБОТА ДЛЯ LLM) ---
class DriveCommand(BaseModel):
action: Literal["drive"]
speed: float = Field(..., description="Скорость в м/с. От -0.5 (назад) до 0.5 (вперед)")
steering: float = Field(..., description="Угол руля в радианах. От -0.5 (вправо) до 0.5 (влево)")
class HeadCommand(BaseModel):
action: Literal["turn_head"]
pan: int = Field(..., description="Поворот головы влево-вправо (градусы, от -90 до 90)")
tilt: int = Field(..., description="Наклон головы вверх-вниз (градусы, от -90 до 90)")
class LlmResponse(BaseModel):
thought: str = Field(..., description="Мысли агента вслух: почему он принял такое решение")
commands: List[Union[DriveCommand, HeadCommand]] = Field(..., description="Список действий")
# --- ROS 2 УЗЕЛ ---
class LlmCommandParser(Node):
def __init__(self):
super().__init__('llm_parser')
# Слушаем сырой JSON от LLM
self.subscription = self.create_subscription(
String, '/llm_raw_json', self.json_callback, 10)
# Паблишер напрямую в Электронный Дифференциал (минуя текстовый транслятор)
self.drive_pub = self.create_publisher(AckermannDriveStamped, '/drive', 10)
# Клиент для управления головой
self.head_client = self.create_client(AddTwoInts, 'set_head_position')
self.get_logger().info('🤖 LLM JSON Парсер запущен. Жду структурированных команд...')
def json_callback(self, msg):
raw_json = msg.data
try:
# 1. Pydantic автоматически валидирует JSON!
# Если LLM ошиблась или погаллюцинировала, код выбросит ошибку, и робот не сойдет с ума.
parsed_data = LlmResponse.model_validate_json(raw_json)
self.get_logger().info(f'💡 Мысль AI: "{parsed_data.thought}"')
# 2. Выполняем список команд по очереди
for cmd in parsed_data.commands:
if cmd.action == "drive":
self.execute_drive(cmd.speed, cmd.steering)
elif cmd.action == "turn_head":
self.execute_head(cmd.pan, cmd.tilt)
except ValidationError as e:
self.get_logger().error(f'❌ LLM прислала кривой JSON (Галлюцинация): {e}')
def execute_drive(self, speed, steering):
drive_msg = AckermannDriveStamped()
drive_msg.drive.speed = speed
drive_msg.drive.steering_angle = steering
self.drive_pub.publish(drive_msg)
self.get_logger().info(f'🚗 [ШАССИ] Отправлена команда: скорость {speed} м/с, руль {steering} рад')
def execute_head(self, pan, tilt):
if self.head_client.wait_for_service(timeout_sec=0.2):
req = AddTwoInts.Request(a=pan, b=tilt)
self.head_client.call_async(req)
self.get_logger().info(f'👀 [ГОЛОВА] Отправлен запрос: Pan={pan}°, Tilt={tilt}°')
else:
self.get_logger().warning('⚠️ Сервис головы не в сети!')
def main(args=None):
rclpy.init(args=args)
node = LlmCommandParser()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()В будущем, когда ты будешь настраивать OpenAI API, ты сможешь использовать эти схемы как основу для Structured Outputs / Function Calling, чтобы модель возвращала не свободный текст, а валидируемую структуру данных. Тогда ROS 2-узел сможет сразу переводить ответ модели в физические действия.
Давай сымитируем идеальный ответ от LLM. Нейросеть увидела что-то интересное справа, решила повернуть туда голову и медленно подъехать.
- В первом терминале запусти наш E-Diff (Электронный дифференциал), чтобы он принял команду для шасси:
pixi run python electronic_differential.py
- Во втором терминале запусти Сервер головы:
pixi run python head_controller.py
- В третьем терминале запусти наш новый узел Парсера:
pixi run python llm_parser.py
- В четвертом терминале мы отправим "ответ от нейросети" (JSON-строку) в топик
/llm_raw_json:pixi run ros2 topic pub --once /llm_raw_json std_msgs/msg/String '{data: "{\"thought\": \"Вижу интересную тень справа. Поверну камеру и медленно подъеду.\", \"commands\":[{\"action\": \"turn_head\", \"pan\": -45, \"tilt\": 10}, {\"action\": \"drive\", \"speed\": 0.2, \"steering\": -0.3}]}"}'
Что произойдет?
Парсер llm_parser.py мгновенно раскусит этот JSON, напишет мысль AI в консоль, а затем асинхронно пнет сервис head_controller и топик /drive (где его радостно встретит electronic_differential.py и рассчитает скорости для колес).
Это — абсолютная архитектурная вершина для роботов с LLM. Попробуй запустить этот тест.
- Репозиторий уже полезен для изучения ROS 2 без физического робота.
- Основные учебные узлы уже есть в коде и соответствуют большей части гайда.
- OpenAI-интеграция пока показана на уровне архитектуры и заглушек, а не как готовый production-клиент.
- Аппаратный мост к реальному железу пока отложен и сознательно не входит в текущую версию README.