Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 25 additions & 52 deletions dimos/protocol/pubsub/impl/test_lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

from collections.abc import Generator
import time
from typing import Any

import pytest
Expand All @@ -27,6 +26,7 @@
PickleLCM,
Topic,
)
from dimos.utils.testing.collector import CallbackCollector


@pytest.fixture
Expand Down Expand Up @@ -74,25 +74,19 @@ def __eq__(self, other: object) -> bool:

def test_LCMPubSubBase_pubsub(lcm_pub_sub_base: LCMPubSubBase) -> None:
lcm = lcm_pub_sub_base

received_messages: list[tuple[Any, Any]] = []
collector = CallbackCollector(1)

topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage)
test_message = MockLCMMessage("test_data")

def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))

lcm.subscribe(topic, callback)
lcm.subscribe(topic, collector)
lcm.publish(topic, test_message.lcm_encode())
time.sleep(0.1)
collector.wait()

assert len(received_messages) == 1
assert len(collector.results) == 1

received_data = received_messages[0][0]
received_topic = received_messages[0][1]

print(f"Received data: {received_data}, Topic: {received_topic}")
received_data = collector.results[0][0]
received_topic = collector.results[0][1]

assert isinstance(received_data, bytes)
assert received_data.decode() == "test_data"
Expand All @@ -102,24 +96,19 @@ def callback(msg: Any, topic: Any) -> None:


def test_lcm_autodecoder_pubsub(lcm: LCM) -> None:
received_messages: list[tuple[Any, Any]] = []
collector = CallbackCollector(1)

topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage)
test_message = MockLCMMessage("test_data")

def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))

lcm.subscribe(topic, callback)
lcm.subscribe(topic, collector)
lcm.publish(topic, test_message)
time.sleep(0.1)
collector.wait()

assert len(received_messages) == 1
assert len(collector.results) == 1

received_data = received_messages[0][0]
received_topic = received_messages[0][1]

print(f"Received data: {received_data}, Topic: {received_topic}")
received_data = collector.results[0][0]
received_topic = collector.results[0][1]

assert isinstance(received_data, MockLCMMessage)
assert received_data == test_message
Expand All @@ -138,61 +127,45 @@ def callback(msg: Any, topic: Any) -> None:
# passes some geometry types through LCM
@pytest.mark.parametrize("test_message", test_msgs)
def test_lcm_geometry_msgs_pubsub(test_message: Any, lcm: LCM) -> None:
received_messages: list[tuple[Any, Any]] = []
collector = CallbackCollector(1)

topic = Topic(topic="/test_topic", lcm_type=test_message.__class__)

def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))

lcm.subscribe(topic, callback)
lcm.subscribe(topic, collector)
lcm.publish(topic, test_message)
collector.wait()

time.sleep(0.1)

assert len(received_messages) == 1
assert len(collector.results) == 1

received_data = received_messages[0][0]
received_topic = received_messages[0][1]

print(f"Received data: {received_data}, Topic: {received_topic}")
received_data = collector.results[0][0]
received_topic = collector.results[0][1]

assert isinstance(received_data, test_message.__class__)
assert received_data == test_message

assert isinstance(received_topic, Topic)
assert received_topic == topic

print(test_message, topic)


# passes some geometry types through pickle LCM
@pytest.mark.parametrize("test_message", test_msgs)
def test_lcm_geometry_msgs_autopickle_pubsub(test_message: Any, pickle_lcm: PickleLCM) -> None:
lcm = pickle_lcm
received_messages: list[tuple[Any, Any]] = []
collector = CallbackCollector(1)

topic = Topic(topic="/test_topic")

def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))

lcm.subscribe(topic, callback)
lcm.subscribe(topic, collector)
lcm.publish(topic, test_message)
collector.wait()

time.sleep(0.1)
assert len(collector.results) == 1

assert len(received_messages) == 1

received_data = received_messages[0][0]
received_topic = received_messages[0][1]

print(f"Received data: {received_data}, Topic: {received_topic}")
received_data = collector.results[0][0]
received_topic = collector.results[0][1]

assert isinstance(received_data, test_message.__class__)
assert received_data == test_message

assert isinstance(received_topic, Topic)
assert received_topic == topic

print(test_message, topic)
98 changes: 31 additions & 67 deletions dimos/protocol/pubsub/impl/test_rospubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

from collections.abc import Generator
import threading

from dimos_lcm.geometry_msgs import PointStamped
import numpy as np
Expand All @@ -28,6 +27,7 @@
# Add msg_name to LCM PointStamped for testing nested message conversion
PointStamped.msg_name = "geometry_msgs.PointStamped"
from dimos.utils.data import get_data
from dimos.utils.testing.collector import CallbackCollector
from dimos.utils.testing.replay import TimedSensorReplay


Expand Down Expand Up @@ -57,20 +57,14 @@ def test_basic_conversion(publisher, subscriber):
Simple flat dimos.msgs type with no nesting (just x/y/z floats).
"""
topic = ROSTopic("/test_ros_topic", Vector3)
collector = CallbackCollector(1)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, Vector3(1.0, 2.0, 3.0))

assert event.wait(timeout=2.0), "No message received"
assert len(received) == 1
msg = received[0]
collector.wait()
assert len(collector.results) == 1
msg = collector.results[0][0]
assert msg.x == 1.0
assert msg.y == 2.0
assert msg.z == 3.0
Expand All @@ -95,21 +89,15 @@ def test_pointcloud2_pubsub(publisher, subscriber):
assert len(original) > 0, "Loaded empty pointcloud"

topic = ROSTopic("/test_pointcloud2", PointCloud2)
collector = CallbackCollector(1, timeout=5.0)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, original)

assert event.wait(timeout=5.0), "No PointCloud2 message received"
assert len(received) == 1
collector.wait()
assert len(collector.results) == 1

converted = received[0]
converted = collector.results[0][0]

# Verify point cloud data is preserved
original_points, _ = original.as_numpy()
Expand Down Expand Up @@ -147,20 +135,14 @@ def test_pointcloud2_empty_pubsub(publisher, subscriber):
)

topic = ROSTopic("/test_empty_pointcloud", PointCloud2)
collector = CallbackCollector(1)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, original)

assert event.wait(timeout=2.0), "No empty PointCloud2 message received"
assert len(received) == 1
assert len(received[0]) == 0
collector.wait()
assert len(collector.results) == 1
assert len(collector.results[0][0]) == 0


@pytest.mark.skipif_no_ros
Expand All @@ -178,21 +160,15 @@ def test_posestamped_pubsub(publisher, subscriber):
)

topic = ROSTopic("/test_posestamped", PoseStamped)
collector = CallbackCollector(1)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, original)

assert event.wait(timeout=2.0), "No PoseStamped message received"
assert len(received) == 1
collector.wait()
assert len(collector.results) == 1

converted = received[0]
converted = collector.results[0][0]

# Verify all fields preserved
assert converted.frame_id == original.frame_id
Expand Down Expand Up @@ -220,21 +196,15 @@ def test_pointstamped_pubsub(publisher, subscriber):
original.point.z = 3.5

topic = ROSTopic("/test_pointstamped", PointStamped)
collector = CallbackCollector(1)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, original)

assert event.wait(timeout=2.0), "No PointStamped message received"
assert len(received) == 1
collector.wait()
assert len(collector.results) == 1

converted = received[0]
converted = collector.results[0][0]

# Verify nested header fields are preserved
assert converted.header.frame_id == original.header.frame_id
Expand All @@ -260,21 +230,15 @@ def test_twist_pubsub(publisher, subscriber):
)

topic = ROSTopic("/test_twist", Twist)
collector = CallbackCollector(1)

received = []
event = threading.Event()

def callback(msg, t):
received.append(msg)
event.set()

subscriber.subscribe(topic, callback)
subscriber.subscribe(topic, collector)
publisher.publish(topic, original)

assert event.wait(timeout=2.0), "No Twist message received"
assert len(received) == 1
collector.wait()
assert len(collector.results) == 1

converted = received[0]
converted = collector.results[0][0]

# Verify linear velocity preserved
assert converted.linear.x == original.linear.x
Expand Down
Loading
Loading