-
Notifications
You must be signed in to change notification settings - Fork 0
Feat/toponav #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feat/toponav #2
Changes from all commits
bb4f753
ef5611d
7235987
b1d440e
7391263
84cf662
cfa8770
a3fe014
027571d
58bb478
b30be61
8062a09
346702e
eb27a56
3955fac
67f789a
70fce1b
85652f1
e069585
833b7db
622de7d
a377e0f
2c215f3
799acdf
07066cc
8119420
f9c8c20
3221057
1506bf5
5c74b35
caef8a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
|
|
||
| import argparse | ||
| import math | ||
| import sys | ||
| from collections import deque | ||
| from pathlib import Path | ||
| from typing import Deque, Optional, Tuple | ||
|
|
@@ -15,11 +16,26 @@ | |
| import torch | ||
| from PIL import Image as PILImage | ||
|
|
||
| _THIS_FILE = Path(__file__).resolve() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. この辺のpath周りの設定,変更が不要なようにも思えるのですが,変更が必要なら理由を教えてください |
||
| _REPO_ROOT_CANDIDATES = [ | ||
| _THIS_FILE.parents[2], | ||
| _THIS_FILE.parents[4] / "src" / "NavVLA" if len(_THIS_FILE.parents) > 4 else None, | ||
| ] | ||
| for _repo_root in reversed([path for path in _REPO_ROOT_CANDIDATES if path is not None and (path / "OmniVLA").exists()]): | ||
| for _path in (_repo_root, _repo_root / "OmniVLA", _repo_root / "OmniVLA" / "inference"): | ||
| if str(_path) not in sys.path: | ||
| sys.path.insert(0, str(_path)) | ||
|
|
||
| _INSTALLED_INFERENCE_DIR = _THIS_FILE.parents[1] / "OmniVLA" / "inference" | ||
| if _INSTALLED_INFERENCE_DIR.exists() and str(_INSTALLED_INFERENCE_DIR) not in sys.path: | ||
| sys.path.insert(0, str(_INSTALLED_INFERENCE_DIR)) | ||
|
|
||
| from OmniVLA.inference.utils_policy import ( | ||
| load_model, | ||
| transform_images_PIL_mask, | ||
| ) | ||
| from .preprocess import build_mask, build_omnivla_edge_inputs, image_to_cv2, load_yaml | ||
| from .preprocess import build_mask, build_omnivla_edge_inputs, image_msg_to_bgr, image_to_cv2, load_yaml | ||
| from .toponav import TopologicalNavigator | ||
|
|
||
| import rclpy | ||
| from geometry_msgs.msg import PoseStamped, Twist | ||
|
|
@@ -41,6 +57,7 @@ def __init__( | |
| self.autonomous_flag = False | ||
| self.context_queue = [] | ||
| self.obs_image = None | ||
| self.obs_image_bgr = None | ||
| self.package_share_dir = package_share_dir | ||
|
|
||
| self.nav_cfg = load_yaml(nav_config_path) | ||
|
|
@@ -49,6 +66,7 @@ def __init__( | |
| self.init_params() | ||
| self.init_model() | ||
| self.init_model_modality() | ||
| self.init_toponav() | ||
|
|
||
| self.image_sub = self.create_subscription(Image, "/image_raw", self.image_callback, 10) | ||
| self.autonomous_sub = self.create_subscription(Bool, "/autonomous", self.autonomous_callback, 10) | ||
|
|
@@ -61,7 +79,9 @@ def __init__( | |
|
|
||
| def init_params(self) -> None: | ||
| self.context_size = self.nav_cfg.get("context_size", 5) | ||
| self.waypoint_spacing = self.nav_cfg.get("metric_waypoint_spacing", 0.1) | ||
| self.metric_waypoint_spacing = self.nav_cfg.get("metric_waypoint_spacing", 0.1) | ||
| self.waypoint_spacing = self.nav_cfg.get("waypoint_spacing", 1) | ||
| self.action_scale = self.metric_waypoint_spacing * self.waypoint_spacing | ||
| self.waypoint_select = self.nav_cfg.get("waypoint_select", 4) | ||
| self.linear_max_vel = self.nav_cfg.get("linear_max_vel", 0.3) | ||
| self.angular_max_vel = self.nav_cfg.get("angular_max_vel", 0.3) | ||
|
|
@@ -151,7 +171,46 @@ def init_model_modality(self) -> None: | |
| self.goal_image_tensor = transform_images_PIL_mask(goal_pil, self.mask_goal).to(self.device) | ||
| self.goal_pose_tensor = torch.tensor([goal_pose], dtype=torch.float32, device=self.device) | ||
| self.modality_tensor = torch.tensor([self.modality_id], dtype=torch.long, device=self.device) | ||
|
|
||
|
|
||
| def _update_text_feature(self) -> None: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. メソッド名に統一感が無いです |
||
| prompt = self.latest_prompt if self.use_prompt else "No language instruction" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. modality id次第で切り替えればuse_promptというパラメータを用意せずに済みそうですね その際にプロンプトが与えられていないのであれば,エラーを返すようにしたほうが良いと思います. |
||
| token = clip.tokenize(prompt, truncate=True).to(self.device) | ||
| with torch.no_grad(): | ||
| self.feat_text = self.text_encoder.encode_text(token) | ||
|
|
||
| def resolve_package_path(self, raw_path: str) -> Path: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ちょっと気持ち悪いことをしている |
||
| path = Path(raw_path) | ||
| return path if path.is_absolute() else self.package_share_dir / path | ||
|
|
||
| def init_toponav(self) -> None: | ||
| self.use_toponav = bool(self.nav_cfg.get("use_toponav", False)) | ||
| self.toponav = None | ||
| self.toponav_current_index = None | ||
| self.toponav_goal_index = None | ||
| self.toponav_min_score = float(self.nav_cfg.get("toponav_min_score", -1.0)) | ||
|
|
||
| if not self.use_toponav: | ||
| return | ||
|
|
||
| if not self.use_goal_image: | ||
| raise ValueError("Toponav requires a modality_id that uses goal_image.") | ||
|
|
||
| topomap_path = self.resolve_package_path(str(self.nav_cfg.get("topomap_path", "config/topomap/topomap.yaml"))) | ||
| image_dir = self.resolve_package_path(str(self.nav_cfg.get("topomap_image_dir", "config/topomap/images"))) | ||
| weight_path = self.resolve_package_path(str(self.nav_cfg.get("placenet_weight_path", "deployment/weights/placenet.pt"))) | ||
|
|
||
| self.toponav = TopologicalNavigator( | ||
| topomap_path=topomap_path, | ||
| image_dir=image_dir, | ||
| weight_path=weight_path, | ||
| device=self.device, | ||
| image_size=self.goal_size, | ||
| crop_size=int(self.nav_cfg.get("toponav_crop_size", 288)), | ||
| delta=float(self.nav_cfg.get("toponav_delta", 5.0)), | ||
| window_lower=int(self.nav_cfg.get("toponav_window_lower", -1)), | ||
| window_upper=int(self.nav_cfg.get("toponav_window_upper", 10)), | ||
| ) | ||
| self.get_logger().info(f"Toponav loaded: nodes={len(self.toponav.nodes)}, topomap={topomap_path}") | ||
|
|
||
| def autonomous_callback(self, msg: Bool) -> None: | ||
| self.autonomous_flag = bool(msg.data) | ||
|
|
@@ -162,6 +221,8 @@ def prompt_callback(self, msg: String) -> None: | |
| self._update_text_feature() | ||
|
|
||
| def image_callback(self, msg: Image) -> None: | ||
| self.obs_image_bgr = image_msg_to_bgr(msg) | ||
|
|
||
| cv_image = image_to_cv2(msg, self.clip_size) | ||
| self.obs_image = PILImage.fromarray(cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)) | ||
|
|
||
|
|
@@ -176,6 +237,8 @@ def timer_callback(self) -> None: | |
| if len(self.context_queue) < self.context_size + 1: | ||
| return | ||
|
|
||
| self.update_toponav_goal() | ||
|
|
||
| obs_images, map_images, cur_large_img = build_omnivla_edge_inputs( | ||
| context_queue=self.context_queue, | ||
| current_image=self.obs_image, | ||
|
|
@@ -208,6 +271,31 @@ def timer_callback(self) -> None: | |
| self.publisher_path(waypoints) | ||
| self.publisher_command_velocity(linear_vel, angular_vel) | ||
|
|
||
| def update_toponav_goal(self) -> None: | ||
| if self.toponav is None or self.obs_image_bgr is None: | ||
| return | ||
|
|
||
| current_index, score = self.toponav.estimate_current_node(self.obs_image_bgr) | ||
| if score < self.toponav_min_score: | ||
| self.get_logger().warn( | ||
| f"Toponav score below threshold: score={score:.3f}, threshold={self.toponav_min_score:.3f}" | ||
| ) | ||
| return | ||
|
|
||
| goal_index = self.toponav.select_goal_node(current_index) | ||
| if current_index == self.toponav_current_index and goal_index == self.toponav_goal_index: | ||
| return | ||
|
|
||
| goal_pil = self.toponav.load_goal_image(goal_index) | ||
| self.goal_image_tensor = transform_images_PIL_mask(goal_pil, self.mask_goal).to(self.device) | ||
| self.toponav_current_index = current_index | ||
| self.toponav_goal_index = goal_index | ||
| current_node = self.toponav.nodes[current_index] | ||
| goal_node = self.toponav.nodes[goal_index] | ||
| self.get_logger().info( | ||
| "Toponav goal updated: " | ||
| f"current_id={current_node.node_id}, goal_id={goal_node.node_id}, score={score:.3f}" | ||
| ) | ||
|
|
||
| def publisher_path(self, waypoints: np.ndarray) -> None: | ||
| msg = NavPath() | ||
|
|
@@ -217,8 +305,8 @@ def publisher_path(self, waypoints: np.ndarray) -> None: | |
| for wp in waypoints: | ||
| pose = PoseStamped() | ||
| pose.header = msg.header | ||
| x = float(wp[0]) * self.waypoint_spacing | ||
| y = float(wp[1]) * self.waypoint_spacing | ||
| x = float(wp[0]) * self.action_scale | ||
| y = float(wp[1]) * self.action_scale | ||
| yaw = math.atan2(float(wp[3]), float(wp[2])) | ||
|
|
||
| pose.pose.position.x = x | ||
|
|
@@ -241,8 +329,8 @@ def action_to_waypoints_and_cmd_vel(self, action_pred: np.ndarray) -> Tuple[np.n | |
| selected = max(0, min(self.waypoint_select, waypoints.shape[0] - 1)) | ||
|
|
||
| dx, dy, hx, hy = [float(v) for v in waypoints[selected]] | ||
| dx *= self.waypoint_spacing | ||
| dy *= self.waypoint_spacing | ||
| dx *= self.action_scale | ||
| dy *= self.action_scale | ||
|
|
||
| eps = 1e-8 | ||
| dt = 1.0 / 3.0 | ||
|
|
@@ -257,9 +345,6 @@ def action_to_waypoints_and_cmd_vel(self, action_pred: np.ndarray) -> Tuple[np.n | |
| linear_vel = dx / dt | ||
| angular_vel = math.atan(dy / dx) / dt | ||
|
|
||
| linear_vel = float(np.clip(linear_vel, 0.0, 0.5)) | ||
| angular_vel = float(np.clip(angular_vel, -1.0, 1.0)) | ||
|
|
||
| maxv = float(self.linear_max_vel) | ||
| maxw = float(self.angular_max_vel) | ||
| if abs(linear_vel) <= maxv: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.