# robot_registry.py
import os
import sqlite3
from contextlib import closing
from datetime import datetime
class RobotRegistry:
    """Registry of robots persisted in a SQLite database file.

    Three tables are used: ``robots`` (identity and registration date) and
    the optional one-to-one ``battery_specs`` and ``lidar_specs`` tables,
    all keyed by ``robot_id``.  Every method opens its own short-lived
    connection, so an instance never holds an open database handle.

    SQLite errors raised while executing statements are reported with
    ``print`` and turned into a ``None``/empty result instead of being
    raised; errors raised while opening the connection propagate.
    """

    # DDL executed on construction; IF NOT EXISTS makes it safe to re-run.
    _SCHEMA = (
        """
        CREATE TABLE IF NOT EXISTS robots (
            robot_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            manufacturer TEXT NOT NULL,
            registration_date TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS battery_specs (
            robot_id TEXT PRIMARY KEY,
            capacity REAL NOT NULL,
            type TEXT NOT NULL,
            nominal_voltage REAL NOT NULL,
            FOREIGN KEY (robot_id) REFERENCES robots(robot_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS lidar_specs (
            robot_id TEXT PRIMARY KEY,
            model TEXT NOT NULL,
            max_range REAL NOT NULL,
            points_per_second INTEGER NOT NULL,
            FOREIGN KEY (robot_id) REFERENCES robots(robot_id)
        )
        """,
    )

    def __init__(self, db_file='robot_registry.db'):
        """Remember the database path and make sure the tables exist.

        Args:
            db_file: Path of the SQLite database file to open or create.
        """
        self.db_file = db_file
        self._create_tables()

    def _get_connection(self):
        """Open a new connection with name-based row access.

        Foreign-key enforcement is switched on because SQLite leaves it
        off for every new connection; without it the FOREIGN KEY clauses
        in the schema are never checked.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            conn.row_factory = sqlite3.Row  # Enable column access by name
            conn.execute("PRAGMA foreign_keys = ON")
        except sqlite3.Error:
            # Do not leak the handle if the connection cannot be configured.
            conn.close()
            raise
        return conn

    def _create_tables(self):
        """Create the robots, battery_specs and lidar_specs tables if missing."""
        with closing(self._get_connection()) as conn:
            try:
                with conn:  # commit on success, roll back on error
                    for statement in self._SCHEMA:
                        conn.execute(statement)
            except sqlite3.Error as e:
                print(f"Error creating tables: {e}")

    def register_robot(self, robot_id, name, manufacturer, battery_data=None, lidar_data=None):
        """Insert a new robot and, optionally, its battery and lidar specs.

        All rows are written in one transaction, so a failure leaves no
        partial registration behind.

        Args:
            robot_id: Primary key of the new robot.
            name: Robot name.
            manufacturer: Manufacturer name.
            battery_data: Optional dict with the keys ``capacity``,
                ``type`` and ``nominal_voltage``.
            lidar_data: Optional dict with the keys ``model``,
                ``max_range`` and ``points_per_second``.

        Returns:
            ``robot_id`` on success; ``None`` when a constraint is violated
            (for example the id is already registered) or another SQLite
            error occurs.

        Raises:
            KeyError: If a supplied spec dict lacks one of its keys.
        """
        with closing(self._get_connection()) as conn:
            try:
                with conn:  # commit on success, roll back on error
                    conn.execute(
                        """
                        INSERT INTO robots (robot_id, name, manufacturer, registration_date)
                        VALUES (?, ?, ?, ?)
                        """,
                        (robot_id, name, manufacturer, str(datetime.now())),
                    )
                    if battery_data:
                        conn.execute(
                            """
                            INSERT INTO battery_specs (robot_id, capacity, type, nominal_voltage)
                            VALUES (?, ?, ?, ?)
                            """,
                            (
                                robot_id,
                                battery_data['capacity'],
                                battery_data['type'],
                                battery_data['nominal_voltage'],
                            ),
                        )
                    if lidar_data:
                        conn.execute(
                            """
                            INSERT INTO lidar_specs (robot_id, model, max_range, points_per_second)
                            VALUES (?, ?, ?, ?)
                            """,
                            (
                                robot_id,
                                lidar_data['model'],
                                lidar_data['max_range'],
                                lidar_data['points_per_second'],
                            ),
                        )
                return robot_id
            except sqlite3.IntegrityError:
                # Constraint violations (e.g. duplicate id) are reported
                # silently through the return value.
                return None
            except sqlite3.Error as e:
                print(f"Error registering robot: {e}")
                return None

    def update_robot(self, robot_id, name=None, manufacturer=None, battery_data=None, lidar_data=None):
        """Update a robot's fields and/or insert-or-replace its specs.

        Only the arguments that are truthy are applied; ``None`` or an
        empty value leaves the corresponding data untouched.  Spec dicts
        are applied even when neither ``name`` nor ``manufacturer`` is
        given.  All changes are made in one transaction.

        Args:
            robot_id: Id of the robot to update.
            name: New name, or a falsy value to keep the current one.
            manufacturer: New manufacturer, or a falsy value to keep it.
            battery_data: Optional dict with the keys ``capacity``,
                ``type`` and ``nominal_voltage``.
            lidar_data: Optional dict with the keys ``model``,
                ``max_range`` and ``points_per_second``.

        Returns:
            None.  SQLite errors (including specs for an unknown
            ``robot_id``, which violate the foreign key) are printed and
            the transaction is rolled back.

        Raises:
            KeyError: If a supplied spec dict lacks one of its keys.
        """
        assignments = []
        params = []
        if name:
            assignments.append("name = ?")
            params.append(name)
        if manufacturer:
            assignments.append("manufacturer = ?")
            params.append(manufacturer)

        # Nothing at all to change: skip the database round trip.
        if not (assignments or battery_data or lidar_data):
            return

        with closing(self._get_connection()) as conn:
            try:
                with conn:  # commit on success, roll back on error
                    if assignments:
                        sql = "UPDATE robots SET " + ", ".join(assignments) + " WHERE robot_id = ?"
                        conn.execute(sql, (*params, robot_id))
                    if battery_data:
                        conn.execute(
                            """
                            INSERT INTO battery_specs (robot_id, capacity, type, nominal_voltage)
                            VALUES (?, ?, ?, ?)
                            ON CONFLICT(robot_id) DO UPDATE SET
                                capacity = excluded.capacity,
                                type = excluded.type,
                                nominal_voltage = excluded.nominal_voltage
                            """,
                            (
                                robot_id,
                                battery_data['capacity'],
                                battery_data['type'],
                                battery_data['nominal_voltage'],
                            ),
                        )
                    if lidar_data:
                        conn.execute(
                            """
                            INSERT INTO lidar_specs (robot_id, model, max_range, points_per_second)
                            VALUES (?, ?, ?, ?)
                            ON CONFLICT(robot_id) DO UPDATE SET
                                model = excluded.model,
                                max_range = excluded.max_range,
                                points_per_second = excluded.points_per_second
                            """,
                            (
                                robot_id,
                                lidar_data['model'],
                                lidar_data['max_range'],
                                lidar_data['points_per_second'],
                            ),
                        )
            except sqlite3.Error as e:
                print(f"Error updating robot: {e}")

    def get_robot_details(self, robot_id):
        """Return one robot with its specs as a nested dict.

        Args:
            robot_id: Id of the robot to look up.

        Returns:
            A dict with the keys ``robot_id``, ``name``, ``manufacturer``,
            ``registration_date``, ``battery_data`` and ``lidar_data``.
            The last two are dicts, or ``None`` when the robot has no row
            in the matching specs table.  ``None`` is returned when the
            robot does not exist or a SQLite error occurs.
        """
        with closing(self._get_connection()) as conn:
            try:
                row = conn.execute(
                    """
                    SELECT r.*, b.capacity AS battery_capacity, b.type AS battery_type,
                           b.nominal_voltage AS battery_nominal_voltage,
                           l.model AS lidar_model, l.max_range AS lidar_max_range,
                           l.points_per_second AS lidar_points_per_second
                    FROM robots r
                    LEFT JOIN battery_specs b ON r.robot_id = b.robot_id
                    LEFT JOIN lidar_specs l ON r.robot_id = l.robot_id
                    WHERE r.robot_id = ?
                    """,
                    (robot_id,),
                ).fetchone()
            except sqlite3.Error as e:
                print(f"Error getting robot details: {e}")
                return None

        if row is None:
            return None

        # The LEFT JOINs yield NULL columns when a specs row is absent;
        # a NOT NULL column of each table is used as the presence marker.
        battery = None
        if row['battery_capacity'] is not None:
            battery = {
                'capacity': row['battery_capacity'],
                'type': row['battery_type'],
                'nominal_voltage': row['battery_nominal_voltage'],
            }
        lidar = None
        if row['lidar_model'] is not None:
            lidar = {
                'model': row['lidar_model'],
                'max_range': row['lidar_max_range'],
                'points_per_second': row['lidar_points_per_second'],
            }
        return {
            'robot_id': row['robot_id'],
            'name': row['name'],
            'manufacturer': row['manufacturer'],
            'registration_date': row['registration_date'],
            'battery_data': battery,
            'lidar_data': lidar,
        }

    def list_robots(self):
        """Return the ids of all registered robots.

        Returns:
            A list of ``robot_id`` values; an empty list when there are no
            robots or a SQLite error occurs.
        """
        with closing(self._get_connection()) as conn:
            try:
                rows = conn.execute("SELECT robot_id FROM robots").fetchall()
            except sqlite3.Error as e:
                print(f"Error listing robots: {e}")
                return []
        return [row['robot_id'] for row in rows]