Skip to content

Commit ec9ed9d

Browse files
committed
Fixed formatting.
1 parent 3896b94 commit ec9ed9d

5 files changed

Lines changed: 65 additions & 45 deletions

File tree

examples/rpc_run_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import time
2+
23
from python.rcs.rpc.client import RcsClient
34

45
if __name__ == "__main__":

examples/rpc_run_server.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
from rcs.envs.base import ControlMode, RelativeTo
12
from rcs.envs.creators import SimEnvCreator
23
from rcs.envs.utils import (
3-
default_mujoco_cameraset_cfg,
4-
default_sim_gripper_cfg,
5-
default_sim_robot_cfg,
4+
default_mujoco_cameraset_cfg,
5+
default_sim_gripper_cfg,
6+
default_sim_robot_cfg,
67
)
7-
from rcs.envs.base import ControlMode, RelativeTo
88
from rcs.rpc.server import RcsServer
99

10+
1011
def run_server():
1112
env = SimEnvCreator()(
1213
control_mode=ControlMode.JOINTS,

python/rcs/rpc/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import rpyc
33
from rpyc.utils.classic import obtain
44

5+
56
class RcsClient(gym.Env):
67
def __init__(self, host='localhost', port=50051):
78
super().__init__()

python/rcs/rpc/server.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
21
# import wrapper
3-
from gymnasium import Wrapper
42
import rpyc
3+
from gymnasium import Wrapper
54
from rpyc.utils.server import ThreadedServer
6-
rpyc.core.protocol.DEFAULT_CONFIG['allow_pickle'] = True
5+
6+
rpyc.core.protocol.DEFAULT_CONFIG["allow_pickle"] = True
7+
78

89
@rpyc.service
910
class RcsServer(Wrapper, rpyc.Service):
10-
def __init__(self, env, host='localhost', port=50051):
11+
def __init__(self, env, host="localhost", port=50051):
1112
super().__init__(env)
1213
self.host = host
1314
self.port = port
@@ -25,25 +26,24 @@ def reset(self, **kwargs):
2526
@rpyc.exposed
2627
def get_obs(self):
2728
"""Get the current observation using the Wrapper base class if available."""
28-
if hasattr(super(), 'get_obs'):
29+
if hasattr(super(), "get_obs"):
2930
return super().get_obs()
30-
elif hasattr(self.env, 'get_obs'):
31+
if hasattr(self.env, "get_obs"):
3132
return self.env.get_obs()
32-
else:
33-
raise NotImplementedError("The environment does not have a get_obs method.")
33+
error = "The environment does not have a get_obs method."
34+
raise NotImplementedError(error)
3435

3536
@rpyc.exposed
3637
def unwrapped(self):
3738
"""Return the unwrapped environment using the Wrapper base class."""
3839
return super().unwrapped
39-
40+
4041
@rpyc.exposed
4142
def action_space(self):
4243
"""Return the action space using the Wrapper base class."""
4344
return super().action_space
4445

4546
def start(self):
46-
import time
4747
print(f"Starting RcsServer RPC (looped OneShotServer) on {self.host}:{self.port}")
4848
t = ThreadedServer(self, port=self.port)
49-
t.start()
49+
t.start()

python/tests/test_rpc.py

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,37 @@
1+
# noqa: type
2+
13
import multiprocessing
2-
import time
4+
import os
35
import socket
46
import sys
7+
import time
58
import traceback
6-
import os
9+
from contextlib import suppress
10+
from multiprocessing.context import ForkServerContext, SpawnContext
11+
from typing import Optional, Type, Union # Add Type and Union here
12+
713
import pytest
8-
from typing import Optional # Add this import at the top
9-
from rcs.envs.creators import SimEnvCreator
10-
from rcs.envs.utils import (
11-
default_mujoco_cameraset_cfg,
12-
default_sim_gripper_cfg,
13-
default_sim_robot_cfg,
14-
)
1514
from rcs.envs.base import ControlMode, RelativeTo
16-
from rcs.rpc.server import RcsServer
15+
from rcs.envs.creators import SimEnvCreator
16+
from rcs.envs.utils import default_sim_gripper_cfg, default_sim_robot_cfg
1717
from rcs.rpc.client import RcsClient
18+
from rcs.rpc.server import RcsServer
1819

1920
HOST = "127.0.0.1"
2021

22+
2123
def get_free_port() -> int:
2224
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2325
s.bind((HOST, 0))
2426
return s.getsockname()[1]
2527

28+
2629
def wait_for_port(
2730
host: str,
2831
port: int,
2932
timeout: float,
3033
server_proc: Optional[multiprocessing.Process] = None,
31-
err_q: Optional[multiprocessing.Queue] = None
34+
err_q: Optional[multiprocessing.Queue] = None,
3235
) -> None:
3336
start = time.time()
3437
last_exc = None
@@ -44,21 +47,17 @@ def wait_for_port(
4447
if server_proc is not None and not server_proc.is_alive():
4548
server_err = None
4649
if err_q is not None:
47-
try:
50+
with suppress(Exception):
4851
server_err = err_q.get_nowait()
49-
except Exception:
50-
pass
5152
msg = f"Server process exited early (exitcode={server_proc.exitcode})."
5253
if server_err:
5354
msg += f"\nServer traceback:\n{server_err}"
5455
raise RuntimeError(msg)
5556
time.sleep(0.2)
5657
server_err = None
5758
if err_q is not None:
58-
try:
59+
with suppress(Exception):
5960
server_err = err_q.get_nowait()
60-
except Exception:
61-
pass
6261
msg = f"Timed out waiting for {host}:{port} to open."
6362
if last_exc:
6463
msg += f" Last socket error: {last_exc}"
@@ -68,6 +67,7 @@ def wait_for_port(
6867
msg += f"\nServer traceback:\n{server_err}"
6968
raise TimeoutError(msg)
7069

70+
7171
def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
7272
try:
7373
env = SimEnvCreator()(
@@ -76,7 +76,7 @@ def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
7676
robot_cfg=default_sim_robot_cfg(),
7777
gripper_cfg=default_sim_gripper_cfg(),
7878
# Disabled to avoid rendering problem in python subprocess.
79-
#cameras=default_mujoco_cameraset_cfg(),
79+
# cameras=default_mujoco_cameraset_cfg(),
8080
max_relative_movement=0.1,
8181
relative_to=RelativeTo.LAST_STEP,
8282
)
@@ -90,20 +90,22 @@ def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
9090
time.sleep(1)
9191
except Exception:
9292
tb = "".join(traceback.format_exception(*sys.exc_info()))
93-
try:
93+
with suppress(Exception):
9494
err_q.put(tb)
95-
except Exception:
96-
pass
9795
sys.exit(1)
9896

99-
def _mp_context() -> multiprocessing.context.BaseContext:
97+
98+
def _mp_context() -> Union[SpawnContext, ForkServerContext]:
10099
# Prefer spawn to avoid fork-related issues with GL/MuJoCo/threaded libs
101100
methods = multiprocessing.get_all_start_methods()
102101
if "spawn" in methods:
103102
return multiprocessing.get_context("spawn")
104103
if "forkserver" in methods:
105104
return multiprocessing.get_context("forkserver")
106-
return multiprocessing.get_context(methods[0])
105+
106+
msg = "No suitable multiprocessing context found."
107+
raise RuntimeError(msg)
108+
107109

108110
def _external_server_from_env() -> tuple[str, int] | None:
109111
# Set RCS_TEST_HOST and RCS_TEST_PORT to reuse an already running server.
@@ -119,6 +121,7 @@ def _external_server_from_env() -> tuple[str, int] | None:
119121
return HOST, 50055
120122
return None
121123

124+
122125
def test_run_server_starts_and_stops():
123126
# Skip if reusing an external server
124127
ext = _external_server_from_env()
@@ -130,17 +133,25 @@ def test_run_server_starts_and_stops():
130133
server_proc = ctx.Process(target=run_server, args=(HOST, port, err_q))
131134
server_proc.start()
132135
try:
133-
wait_for_port(HOST, port, timeout=120.0, server_proc=server_proc, err_q=err_q)
136+
wait_for_port(HOST, port, timeout=120.0, server_proc=server_proc, err_q=err_q) # type: ignore
134137
assert server_proc.is_alive(), "Server process did not start as expected."
135138
finally:
136139
if server_proc.is_alive():
137140
server_proc.terminate()
138141
server_proc.join(timeout=5)
139142
assert not server_proc.is_alive(), "Server process did not terminate as expected."
140143

144+
141145
class TestRcsClientServer:
146+
client: RcsClient
147+
host: str = HOST
148+
port: int = 0
149+
server_proc = None
150+
err_q: Optional[multiprocessing.Queue] = None
151+
152+
142153
@classmethod
143-
def setup_class(cls):
154+
def setup_class(cls: Type["TestRcsClientServer"]):
144155
ext = _external_server_from_env()
145156
if ext:
146157
cls.host, cls.port = ext
@@ -156,11 +167,11 @@ def setup_class(cls):
156167
cls.server_proc = ctx.Process(target=run_server, args=(cls.host, cls.port, cls.err_q))
157168
cls.server_proc.start()
158169
# Wait until the server is actually listening or fail early if it crashed
159-
wait_for_port(cls.host, cls.port, timeout=180.0, server_proc=cls.server_proc, err_q=cls.err_q)
170+
wait_for_port(cls.host, cls.port, timeout=180.0, server_proc=cls.server_proc, err_q=cls.err_q) # type: ignore
160171
cls.client = RcsClient(host=cls.host, port=cls.port)
161172

162173
@classmethod
163-
def teardown_class(cls):
174+
def teardown_class(cls: Type["TestRcsClientServer"]):
164175
try:
165176
if getattr(cls, "client", None):
166177
cls.client.close()
@@ -188,8 +199,14 @@ def test_unwrapped(self):
188199
_ = self.client.unwrapped
189200

190201
def test_close(self):
191-
self.client.close()
202+
if self.client is not None:
203+
self.client.close()
192204
# Reconnect for further tests
193-
wait_for_port(self.__class__.host, self.__class__.port, timeout=15.0,
194-
server_proc=self.__class__.server_proc, err_q=self.__class__.err_q)
205+
wait_for_port(
206+
self.__class__.host,
207+
self.__class__.port,
208+
timeout=15.0,
209+
server_proc=self.__class__.server_proc, # type: ignore
210+
err_q=self.__class__.err_q,
211+
)
195212
self.__class__.client = RcsClient(host=self.__class__.host, port=self.__class__.port)

0 commit comments

Comments
 (0)