Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 267 additions & 0 deletions data_collection.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Error loading module `ublox_gps`: No module named 'serial'\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "2e48bbd61f5b48b7addfb65f463e3b07",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Controller()"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import ipywidgets.widgets as widgets\n",
"from jetbot import Robot, Camera, bgr8_to_jpeg, Heartbeat\n",
"import traitlets\n",
"import os\n",
"import ipywidgets as widgets\n",
"from IPython.display import display\n",
"import traitlets\n",
"from jetbot import Camera, bgr8_to_jpeg\n",
"from datetime import datetime\n",
"\n",
"# establish a connection with the controller\n",
"controller = widgets.Controller(index=0) # change index if necessary\n",
"display(controller)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# link controller to motors\n",
"robot = Robot()\n",
"\n",
"left_link = traitlets.dlink((controller.axes[1], 'value'), (robot.left_motor, 'value'), transform=lambda x: -x)\n",
"right_link = traitlets.dlink((controller.axes[3], 'value'), (robot.right_motor, 'value'), transform=lambda x: -x)\n",
"\n",
"# establish video feed\n",
"image = widgets.Image(format='jpeg', width=300, height=300)\n",
"camera = Camera.instance()\n",
"camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)\n",
"\n",
"# create killswitch function that ceases the JetBot's functions if disconnected\n",
"def handle_heartbeat_status(change):\n",
" if change['new'] == Heartbeat.Status.dead:\n",
" camera_link.unlink()\n",
" left_link.unlink()\n",
" right_link.unlink()\n",
" robot.stop()\n",
"\n",
"heartbeat = Heartbeat(period=0.5)\n",
"heartbeat.observe(handle_heartbeat_status, names='status')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# global state\n",
"run_initialized = False\n",
"run_dirs = {}\n",
"\n",
"# initialize directory structure\n",
"def init_run_dirs():\n",
" global run_initialized, run_dirs\n",
" suffix = suffix_input.value.strip()\n",
" if not suffix:\n",
" print(\"❌ Please enter a suffix before saving.\")\n",
" return False\n",
"\n",
" base_dir = os.path.join('./data', suffix)\n",
" labels = ['left', 'forward', 'right']\n",
" run_dirs = {}\n",
"\n",
" for label in labels:\n",
" subfolder = f\"{label}_{suffix}\"\n",
" full_path = os.path.join(base_dir, subfolder)\n",
" os.makedirs(full_path, exist_ok=True)\n",
" run_dirs[label] = full_path\n",
"\n",
" run_initialized = True\n",
" print(f\"✅ Initialized run folders under: {base_dir}\")\n",
" return True\n",
"\n",
"# save snapshot\n",
"def save_snapshot(label):\n",
" if not run_initialized:\n",
" if not init_run_dirs():\n",
" return\n",
" dir_path = run_dirs[label]\n",
" suffix = suffix_input.value.strip()\n",
" filename = f\"{label}_{suffix}_{len(os.listdir(dir_path))}.jpg\"\n",
" path = os.path.join(dir_path, filename)\n",
" with open(path, 'wb') as f:\n",
" f.write(image.value)\n",
"\n",
"# save and update counter\n",
"def save_and_update(label):\n",
" save_snapshot(label)\n",
" count_widgets[label].value = len(os.listdir(run_dirs[label]))\n",
" \n",
"# controller bindings\n",
"def make_button_handler(label):\n",
" def handler(change):\n",
" if change['new'] == 1.0:\n",
" save_and_update(label)\n",
" return handler"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# create filename suffix input\n",
"suffix_input = widgets.Text(\n",
" value='',\n",
" placeholder='enter run name (e.g. run1)',\n",
" description='run name:',\n",
" layout=widgets.Layout(width='300px')\n",
")\n",
"\n",
"# widgets\n",
"button_layout = widgets.Layout(width='128px', height='64px')\n",
"count_widgets = {\n",
" 'left': widgets.IntText(layout=button_layout, value=0),\n",
" 'forward': widgets.IntText(layout=button_layout, value=0),\n",
" 'right': widgets.IntText(layout=button_layout, value=0),\n",
"}\n",
"buttons = {\n",
" 'left': widgets.Button(description='⬅️ (LB)', layout=button_layout),\n",
" 'forward': widgets.Button(description='⬆️ (L2/R2)', layout=button_layout),\n",
" 'right': widgets.Button(description='➡️ (RB)', layout=button_layout),\n",
"}\n",
"buttons['left'].on_click(lambda x: save_and_update('left'))\n",
"buttons['forward'].on_click(lambda x: save_and_update('forward'))\n",
"buttons['right'].on_click(lambda x: save_and_update('right'))\n",
"\n",
"controller.buttons[4].observe(make_button_handler('left'), names='value')\n",
"controller.buttons[5].observe(make_button_handler('right'), names='value')\n",
"controller.buttons[6].observe(make_button_handler('forward'), names='value')\n",
"controller.buttons[7].observe(make_button_handler('forward'), names='value')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "c92139a8cdd24be686bc9e04b4958d98",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Text(value='', description='Run name:', layout=Layout(width='300px'), placeholder='Enter run name (e.g. run1)'…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "9d1eb473c0dc4e559f7b340bee27a5f4",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(IntText(value=0, layout=Layout(height='64px', width='128px')), IntText(value=0, layout=Layout(h…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "3c988fa66b4e49c88536e68aa344a86d",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"HBox(children=(Button(description='⬅️ (LB)', layout=Layout(height='64px', width='128px'), style=ButtonStyle())…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "b67e4caba6bf4b9c92454ad8acadf93f",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Image(value=b'\\xff\\xd8\\xff\\xe0\\x00\\x10JFIF\\x00\\x01\\x01\\x00\\x00\\x01\\x00\\x01\\x00\\x00\\xff\\xdb\\x00C\\x00\\x02\\x01\\x0…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# display GUI\n",
"display(suffix_input)\n",
"display(widgets.HBox([count_widgets['left'], count_widgets['forward'], count_widgets['right']]))\n",
"display(widgets.HBox([buttons['left'], buttons['forward'], buttons['right']]))\n",
"display(image) # comment out as needed if JetBot keeps freezing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Loading