Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/).

---

## [0.1.9] - 2025-08-12

### Added
- Parser support for `EqualsSubstitution` and `NotEqualsSubstitution`
- Test coverage for `nav2_localization.launch.py`
- LaunchMap Watermark

### Fixed
- Composable node containers visualization without being called in launch description

## [0.1.8] - 2025-08-08

### Added
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "launchmap",
"displayName": "LaunchMap",
"description": "Visualize ROS2 Launch Files",
"version": "0.1.8",
"version": "0.1.9",
"publisher": "kodorobotics",
"icon": "assets/launchmap-logo.png",
"bugs": {
Expand Down
10 changes: 6 additions & 4 deletions parser/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@ def current_namespace(self) -> str | None:

## Composable node groups

def has_composable_node_group(self, container_name: str):
return container_name in self.composable_node_groups

def register_composable_node_group(self, container_name: str, container_metadata: dict):
self.composable_node_groups[container_name].update(container_metadata)

def extend_composable_node_group(self, container_name: str, nodes):
self.composable_node_groups[container_name]["composable_nodes"].extend(nodes)

def get_composable_node_groups(self) -> list[dict]:
results = []
def get_composable_node_groups(self) -> dict:
results = {}
for name, data in self.composable_node_groups.items():
if not data["composable_nodes"]:
continue
Expand All @@ -109,8 +112,7 @@ def get_composable_node_groups(self) -> list[dict]:
if value not in (None, "", [], {}):
entry[key] = value

results.append(entry)

results[name] = entry
return results

## Utility
Expand Down
2 changes: 1 addition & 1 deletion parser/entrypoint/parser_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def parse_launch_file(filepath: str) -> dict:
),
"environment_variables": context.introspection.get_environment_variables(),
"python_expressions": context.introspection.get_python_expressions(),
"composable_node_containers": sorted(context.get_composable_node_groups()),
"composable_containers": context.get_composable_node_groups(),
"additional_components": context.introspection.get_registered_entities(),
}

Expand Down
7 changes: 3 additions & 4 deletions parser/entrypoint/user_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from parser.entrypoint.parser_runner import parse_launch_file
from parser.parser.introspection_utils import (
collect_composable_node_containers,
collect_environment_variable_usages,
collect_event_handler_usages,
collect_launch_config_usages,
Expand All @@ -29,10 +30,8 @@ def parse_and_format_launch_file(filepath: str) -> dict:
"""
raw = parse_launch_file(filepath)
grouped = group_entities_by_type(raw["parsed"] + raw["additional_components"])

composable_node_containers = raw.get("composable_node_containers")
if composable_node_containers:
grouped["composable_nodes_container"] = composable_node_containers

grouped = collect_composable_node_containers(grouped, raw["composable_containers"])

launch_argument_usages = collect_launch_config_usages(grouped)
if launch_argument_usages:
Expand Down
40 changes: 40 additions & 0 deletions parser/parser/handlers/equals_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright (c) 2025 Kodo Robotics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast

from parser.context import ParseContext
from parser.parser.postprocessing import simplify_launch_configurations
from parser.parser.registry import register_handler
from parser.resolution.utils import resolve_call_signature


@register_handler("EqualsSubstitution", "launch.substitutions.EqualsSubstitution")
def handle_equals_substitution(node: ast.Call, context: ParseContext) -> dict:
args, _ = resolve_call_signature(node, context.engine)
if len(args) != 2:
raise ValueError("EqualsSubstitution must receive exactly two arguments.")

left, right = (simplify_launch_configurations(arg) for arg in args)
return f"${{EqualsSubstitution:{left}, {right}}}"


@register_handler("NotEqualsSubstitution", "launch.substitutions.NotEqualsSubstitution")
def handle_not_equals_substitution(node: ast.Call, context: ParseContext) -> dict:
args, _ = resolve_call_signature(node, context.engine)
if len(args) != 2:
raise ValueError("NotEqualsSubstitution must receive exactly two arguments.")

left, right = (simplify_launch_configurations(arg) for arg in args)
return f"${{NotEqualsSubstitution:{left}, {right}}}"
7 changes: 6 additions & 1 deletion parser/parser/handlers/group_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ def handle_group_action(node: ast.Call, context: ParseContext) -> dict:
resolved_flat = flatten_once(raw_expr)

namespace = None
parameters = []
actions = []
for item in resolved_flat:
if isinstance(item, dict) and item.get("type") == "PushRosNamespace":
if isinstance(item, dict) and item.get("type") == "PushROSNamespace":
namespace = item.get("namespace")
context.push_namespace(namespace)
elif isinstance(item, dict) and item.get("type") == "SetParameter":
parameters.append({item.get("name"): item.get("value")})
else:
actions.append(item)

Expand All @@ -55,5 +58,7 @@ def handle_group_action(node: ast.Call, context: ParseContext) -> dict:

if namespace:
result["namespace"] = namespace
if parameters:
result["parameters"] = parameters

return result
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import ast

from parser.context import ParseContext
from parser.parser.postprocessing import simplify_launch_configurations
from parser.parser.registry import register_handler
from parser.parser.utils.common import flatten_once, group_entities_by_type
from parser.resolution.utils import resolve_call_signature
Expand All @@ -30,13 +31,25 @@ def handle_load_composable_nodes(node: ast.Call, context: ParseContext) -> dict:
grouped = group_entities_by_type(resolved_flat)
composable_nodes = grouped.get("unattached_composable_nodes", [])

# Add additional metadata to composable nodes
condition = kwargs.get("condition", {})
if condition:
for idx, _ in enumerate(composable_nodes):
composable_nodes[idx].update({"condition": condition})

# Determine target container
target_container = kwargs.get("target_container")
target_container = simplify_launch_configurations(kwargs.get("target_container"))
if not target_container:
raise ValueError("LoadComposableNodes requires a target_container to be specified.")
if isinstance(target_container, list):
target_container = "".join(target_container)

# Ensure group is registered
first_instance = not context.has_composable_node_group(target_container)
context.register_composable_node_group(target_container, {"target_container": target_container})
context.extend_composable_node_group(target_container, composable_nodes)

if first_instance:
return {"type": "ComposableNodeContainer", "target_container": target_container}

return {"type": "LoadComposableNodes", "target_container": target_container}
5 changes: 3 additions & 2 deletions parser/parser/handlers/push_ros_namespace_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from parser.resolution.utils import resolve_call_signature


@register_handler("PushRosNamespace", "launch_ros.actions.PushRosNamespace")
@register_handler("PushROSNamespace", "launch_ros.actions.PushROSNamespace",
"PushRosNamespace", "launch_ros.actions.PushRosNamespace")
def handle_push_ros_namespace(node: ast.Call, context: ParseContext):
args, kwargs = resolve_call_signature(node, context.engine)

ns = args[0] if args else kwargs.get("namespace")
return {"type": "PushRosNamespace", "namespace": ns}
return {"type": "PushROSNamespace", "namespace": ns}
4 changes: 4 additions & 0 deletions parser/parser/handlers/set_parameter_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
def handle_set_parameter(node: ast.Call, context: ParseContext) -> dict:
args, kwargs = resolve_call_signature(node, context.engine)

if len(args) == 2:
kwargs["name"] = args[0]
kwargs["value"] = args[1]

name = kwargs.get("name")
value = kwargs.get("value")

Expand Down
27 changes: 27 additions & 0 deletions parser/parser/introspection_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,30 @@ def walk(obj, path):
walk(entry, f"{top_key}[{idx}]")

return usages

def collect_composable_node_containers(grouped: dict, composable_containers: dict) -> dict:
"""
Recursively walk the grouped data and return the instances of composable_nodes_container
"""
def walk(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key == "composable_nodes_container":
containers = obj.get("composable_nodes_container")
for idx, container in enumerate(containers):
container_name = container.get("target_container")
new_container = composable_containers.get(container_name, {})
obj[key][idx] = new_container
else:
walk(value)

elif isinstance(obj, list):
for idx, item in enumerate(obj):
walk(item)

elif isinstance(obj, tuple):
for idx, item in enumerate(obj):
walk(item)

walk(grouped)
return grouped
Loading