-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmain.py
More file actions
177 lines (146 loc) · 5.98 KB
/
main.py
File metadata and controls
177 lines (146 loc) · 5.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import logging
from contextlib import ExitStack
from time import sleep
from typing import Annotated, Optional
import pyrealsense2 as rs
import rcsss
import rcsss.control.fr3_desk
import typer
from rcsss.camera.realsense import RealSenseCameraSet
from rcsss.control.record import PoseList
from rcsss.control.utils import load_creds_fr3_desk
from rcsss.envs.factories import get_urdf_path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# MAIN CLI
# Root Typer application; main() invokes it and the sub-applications are mounted onto it.
main_app = typer.Typer(help="CLI tool for the Robot Control Stack (RCS).")

# REALSENSE CLI
# Sub-application for the camera commands, mounted on the root app under the name "realsense".
realsense_app = typer.Typer()
main_app.add_typer(
    realsense_app,
    name="realsense",
    help="Commands to access the intel realsense camera. This includes tools such as reading out the serial numbers of connected devices.",
)
@realsense_app.command()
def serials():
    """Reads out the serial numbers of the connected realsense devices."""
    # Device discovery goes through a freshly created librealsense context.
    rs_context = rs.context()
    connected = RealSenseCameraSet.enumerate_connected_devices(rs_context)

    if len(connected) != 0:
        # One log line per device: product line followed by serial number.
        logger.info("Connected devices:")
        for camera in connected.values():
            logger.info(" %s: %s", camera.product_line, camera.serial)
    else:
        logger.warning("No realsense devices connected.")
# FR3 CLI
# Sub-application for the robot commands, mounted on the root app under the name "fr3".
fr3_app = typer.Typer()
main_app.add_typer(
    fr3_app,
    name="fr3",
    help="Commands to control a Franka Research 3. This includes tools that you would usually do with Franka's Desk interface.",
)
@fr3_app.command()
def home(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
    shut: Annotated[bool, typer.Option("-s", help="Should the robot be shut down")] = False,
    unlock: Annotated[bool, typer.Option("-u", help="unlocks the robot")] = False,
):
    """Moves the FR3 to home position"""
    # Credentials are obtained from the credential loader, not from CLI parameters.
    username, password = load_creds_fr3_desk()
    # Thin wrapper: both flags are forwarded positionally, in the order (shut, unlock).
    desk_api = rcsss.control.fr3_desk
    desk_api.home(ip, username, password, shut, unlock)
@fr3_app.command()
def info(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
    include_gripper: Annotated[bool, typer.Option("-g", help="includes gripper")] = False,
):
    """Prints info about the robots current joint position and end effector pose, optionally also the gripper."""
    # Credentials are obtained from the credential loader, not from CLI parameters.
    username, password = load_creds_fr3_desk()
    # Thin wrapper: the readout itself is delegated to rcsss.control.fr3_desk.info.
    desk_api = rcsss.control.fr3_desk
    desk_api.info(ip, username, password, include_gripper)
@fr3_app.command()
def lock(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
):
    """Locks the robot."""
    # Credentials are obtained from the credential loader, not from CLI parameters.
    username, password = load_creds_fr3_desk()
    # Thin wrapper around rcsss.control.fr3_desk.lock.
    desk_api = rcsss.control.fr3_desk
    desk_api.lock(ip, username, password)
@fr3_app.command()
def unlock(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
):
    """Prepares the robot by unlocking the joints and putting the robot into the FCI mode."""
    username, password = load_creds_fr3_desk()
    desk_api = rcsss.control.fr3_desk

    # Step 1: unlock through the module-level helper.
    desk_api.unlock(ip, username, password)

    # Step 2: open a Desk session just long enough to activate FCI.
    with desk_api.Desk(ip, username, password) as desk:
        desk.activate_fci()
@fr3_app.command()
def fci(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
    unlock: Annotated[bool, typer.Option("-u", help="unlocks the robot")] = False,
    shutdown: Annotated[bool, typer.Option("-s", help="After ctrl+c shuts the robot down")] = False,
):
    """Puts the robot into FCI mode, optionally unlocks the robot. Waits for ctrl+c to exit."""
    username, password = load_creds_fr3_desk()
    try:
        # The Desk is built inside the try so that ctrl+c during connection setup
        # is handled the same way as ctrl+c during the wait.
        desk = rcsss.control.fr3_desk.Desk(ip, username, password)
        with rcsss.control.fr3_desk.FCI(desk, unlock=unlock, lock_when_done=False):
            # Keep the FCI context open, idling in one-second naps, until interrupted.
            while True:
                sleep(1)
    except KeyboardInterrupt:
        # The FCI context has already been exited by the time this handler runs.
        if shutdown:
            rcsss.control.fr3_desk.shutdown(ip, username, password)
@fr3_app.command()
def guiding_mode(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
    disable: Annotated[bool, typer.Option("-d", help="Disable guiding mode")] = False,
    unlock: Annotated[bool, typer.Option("-u", help="unlocks the robot")] = False,
):
    """Enables or disables guiding mode."""
    # Credentials are obtained from the credential loader, not from CLI parameters.
    username, password = load_creds_fr3_desk()
    # Thin wrapper: both flags are forwarded positionally, in the order (disable, unlock).
    desk_api = rcsss.control.fr3_desk
    desk_api.guiding_mode(ip, username, password, disable, unlock)
@fr3_app.command()
def shutdown(
    ip: Annotated[str, typer.Argument(help="IP of the robot")],
):
    """Shuts the robot down"""
    # Credentials are obtained from the credential loader, not from CLI parameters.
    username, password = load_creds_fr3_desk()
    # Thin wrapper around rcsss.control.fr3_desk.shutdown.
    desk_api = rcsss.control.fr3_desk
    desk_api.shutdown(ip, username, password)
@fr3_app.command()
def record(
    ip_str: Annotated[str, typer.Argument(help="Name to IP dict. e.g. \"{'robot1': '192.168.100.1'}\"")],
    urdf_path: Annotated[Optional[str], typer.Option(help="Path to the urdf file")] = None,
    lpaths: Annotated[Optional[list[str]], typer.Option("--lpaths", help="Paths to load n recordings")] = None,
    spath: Annotated[Optional[str], typer.Option("--spath", help="Path to save the recording to")] = None,
    buttons: Annotated[bool, typer.Option("-b", help="Use the robots buttons instead of the keyboard")] = False,
):
    """Tool to record poses with multiple FR3 robots."""
    # Two modes, selected by --lpaths:
    #   * recordings given: enter a Desk.fci(...) context for every robot, load the
    #     poses from the given paths and replay them after a confirmation prompt;
    #   * otherwise: enter a Desk.guiding_mode(...) context for every robot and record
    #     new poses, driven by the keyboard or (with -b) by the robots' buttons, then
    #     save them if --spath was given.
    # Raises typer.BadParameter when ip_str is not a dict literal.
    import ast  # function-scope import: only this command needs it

    user, pw = load_creds_fr3_desk()
    urdf_path = get_urdf_path(urdf_path, allow_none_if_not_found=False)  # type: ignore

    # SECURITY: ip_str is raw command-line text. It used to be passed to eval(), which
    # executes arbitrary expressions; literal_eval only accepts Python literals, which
    # is all the documented "{'name': 'ip'}" form needs.
    try:
        parsed = ast.literal_eval(ip_str)
    except (ValueError, TypeError, SyntaxError) as err:
        msg = "ip_str must be a Python dict literal, e.g. \"{'robot1': '192.168.100.1'}\""
        raise typer.BadParameter(msg) from err
    if not isinstance(parsed, dict):
        msg = f"ip_str must describe a name-to-IP dict, got {type(parsed).__name__}"
        raise typer.BadParameter(msg)
    name2ip: dict[str, str] = parsed

    if lpaths:
        # Replay mode: all FCI contexts are released together when the stack exits.
        with ExitStack() as stack:
            for r_ip in name2ip.values():
                stack.enter_context(rcsss.control.fr3_desk.Desk.fci(r_ip, username=user, password=pw, unlock=True))
            p = PoseList.load(name2ip, lpaths, urdf_path=urdf_path)
            input("Press any key to replay")
            p.replay()
    else:
        # Record mode: all guiding-mode contexts are released together when the stack exits.
        with ExitStack() as stack:
            # Every context manager is created first and only then entered.
            gms = [
                rcsss.control.fr3_desk.Desk.guiding_mode(r_ip, username=user, password=pw, unlock=True)
                for r_ip in name2ip.values()
            ]
            for gm in gms:
                stack.enter_context(gm)
            p = PoseList(name2ip, urdf_path=urdf_path)
            if not buttons:
                p.record()
            else:
                p.start_button_recording()
                # Track which desks actually started listening so that exactly those
                # are stopped again, even if the wait is interrupted (e.g. ctrl+c).
                listening = []
                try:
                    for gm in gms:
                        gm.desk.listen(p.button_callback)
                        listening.append(gm)
                    # Poll the recorder's flag with a short nap instead of spinning
                    # on `pass`, which pinned a CPU core for the whole session.
                    # Presumably the flag is cleared via p.button_callback -- confirm.
                    while p._button_recording:
                        sleep(0.1)
                finally:
                    for gm in listening:
                        gm.desk.stop_listen()
            if spath is not None:
                p.save(spath)
def main():
    """Entry point: invoke the root Typer application ``main_app``."""
    main_app()