Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Merged

a #11

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions host/aidsorter.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ cpu_threads=0
# in the image recognition system, but also
# consumes more processing power.
# If unset, the default is 640x480
resolution=1024x768
resolution=640x480
# The debounce in frames for the system to wait
# before registering a new object detection.
detector_debounce=5
# How many samples should the program take before
# considering the object as something that belongs
# to a bucket.
detector_samples=10

# The baudrate of the serial connection
baudrate=115200
Expand All @@ -20,7 +24,7 @@ mcu_connection_timeout=10.0
# This is separated by a comma
# Example
# bucket1_contents=
bucket1_contents=person
bucket2_contents=bottle,cup
bucket3_contents=cell phone,laptop
bucket4_contents=
bucket1_contents=noodles
bucket2_contents=biscuit
bucket3_contents=canned_goods
bucket4_contents=hygiene
195 changes: 131 additions & 64 deletions host/aidsorter/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,30 @@
# pyright: reportDeprecated=false

import time
from typing import Optional

import cv2

from aidsorter import detector, exceptions, info, visualizer
from aidsorter.fps_config import FPSConfig
from aidsorter.logger import LoggerFactory
from aidsorter.mcu import MCU
from aidsorter.mcu_config import MCUConfig


def getCameraCapture(camera_id: int, resolution: tuple[int, int]) -> cv2.VideoCapture:
logger = LoggerFactory().get_logger(__name__)
logger.info("Getting camera capture...")
cam_cap = cv2.VideoCapture(camera_id)
logger.info("Backend API name: %s", cam_cap.getBackendName())
if not cam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, resolution[0]):
raise exceptions.CameraError("Unable to set the camera width.")

if not cam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, resolution[1]):
raise exceptions.CameraError("Unable to set the camera height.")

return cam_cap


def capture(
config_path: str,
camera_id: int = 1,
Expand Down Expand Up @@ -46,8 +60,9 @@ def capture(
stats_style = visualizer.StatsStyle()
tf_detector = detector.create_detector(model_name, config.cpu_threads)
mcu = MCU(mcu_port, config.baudrate, config.mcu_connection_timeout)
prev_object_category: Optional[str] = None
object_sorting_in_progress: int = -1 # which bucket is targeted for the object
object_category_samples: list[str] = []
object_category_samples_expiration: int = 0

logger = LoggerFactory().get_logger(__name__)
logger.info("Starting the detection process...")
Expand All @@ -56,13 +71,7 @@ def capture(
"Using camera resolution: %sx%s", config.resolution[0], config.resolution[1]
)
logger.info("Using CPU threads: %s", config.cpu_threads)
cam_cap = cv2.VideoCapture(camera_id)
logger.info("Backend API name: %s", cam_cap.getBackendName())
if not cam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, config.resolution[0]):
raise exceptions.CameraError("Unable to set the camera width.")

if not cam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, config.resolution[1]):
raise exceptions.CameraError("Unable to set the camera height.")
cam_cap = getCameraCapture(camera_id, config.resolution)

logger.info("Starting camera loop...")
fps.start_time = time.time() # Start the timer
Expand All @@ -88,73 +97,116 @@ def capture(
],
)

if object_sorting_in_progress != -1:
# Here, we wait for the object to fall into the bucket.
try:
if mcu.ir_states[object_sorting_in_progress - 1]:
mcu.set_gate_state(object_sorting_in_progress, False)
# just acknowledge all IR states...
# _ = mcu.acknowledge_ir_state(object_sorting_in_progress)
logger.debug("Acknowledging all IR states...")
for i in range(1, 6):
_ = mcu.acknowledge_ir_state(i)

logger.info(
"Nahulog na siya kay %s.", object_sorting_in_progress
)
object_sorting_in_progress = -1
mcu.platform_deactivate()
mcu.show_statistics()

except IndexError:
logger.error("Unable to get IR statuses. Please check the MCU.")

continue

if len(detection_result.detections) > 1:
object_category_samples_expiration = 0
mcu.set_err_led(True)
logger.error(
"%s object/s detected. Will not proceed.",
len(detection_result.detections),
)

elif len(detection_result.detections) == 1:
object_category_samples_expiration = 0
mcu.set_err_led(False)
if object_sorting_in_progress != -1:
# Here, we wait for the object to fall into the bucket.
if mcu.ir_states[object_sorting_in_progress - 1] == 1:
mcu.set_gate_state(object_sorting_in_progress, False)
object_sorting_in_progress = -1
logger.info(
"Nahulog na siya kay %s.", object_sorting_in_progress
)
logger.debug(
"object_sorting_in_progress=%s", object_sorting_in_progress
)

object_category: str = (
detection_result.detections[0].categories[0].category_name
)

if (
len(object_category_samples) < config.detector_samples
and object_sorting_in_progress == -1
):
logger.debug("Taking sample #%s.", len(object_category_samples) + 1)
object_category_samples.append(object_category)
continue

logger.debug("Samples: %s", object_category_samples)
object_category = max(
set(object_category_samples), key=object_category_samples.count
)
logger.info(
"Object category from %s samples: %s (%s%%)",
config.detector_samples,
object_category,
object_category_samples.count(object_category)
/ config.detector_samples
* 100,
)
object_category_samples.clear()

# Here, we sort the object to the correct bucket.
if object_category in config.bucket_contents[0]:
logger.info("Object %s belongs to Bucket 1.", object_category)
mcu.set_gate_state(1, True)
mcu.platform_activate()
object_sorting_in_progress = 1

elif object_category in config.bucket_contents[1]:
logger.info("Object %s belongs to Bucket 2.", object_category)
mcu.set_gate_state(2, True)
mcu.platform_activate()
object_sorting_in_progress = 2

elif object_category in config.bucket_contents[2]:
logger.info("Object %s belongs to Bucket 3.", object_category)
mcu.set_gate_state(3, True)
mcu.platform_activate()
object_sorting_in_progress = 3

elif object_category in config.bucket_contents[3]:
logger.info("Object %s belongs to Bucket 4.", object_category)
mcu.set_gate_state(4, True)
mcu.platform_activate()
object_sorting_in_progress = 4

else:
# Here, we sort the object to the correct bucket.
object_category: str = (
detection_result.detections[0].categories[0].category_name
)
if prev_object_category != object_category:
if object_category in config.bucket_contents[0]:
logger.info(
"Object %s belongs to Bucket 1.", object_category
)
mcu.set_gate_state(1, True)
mcu.platform_activate()
object_sorting_in_progress = 1

elif object_category in config.bucket_contents[1]:
logger.info(
"Object %s belongs to Bucket 2.", object_category
)
mcu.set_gate_state(2, True)
mcu.platform_activate()
object_sorting_in_progress = 2

elif object_category in config.bucket_contents[2]:
logger.info(
"Object %s belongs to Bucket 3.", object_category
)
mcu.set_gate_state(3, True)
mcu.platform_activate()
object_sorting_in_progress = 3

elif object_category in config.bucket_contents[3]:
logger.info(
"Object %s belongs to Bucket 4.", object_category
)
mcu.set_gate_state(4, True)
mcu.platform_activate()
object_sorting_in_progress = 4

else:
logger.warning(
"Unknown object category: %s", object_category
)
logger.warning("Object will be put in Bucket 5.")
mcu.platform_activate()
object_sorting_in_progress = 5

prev_object_category = (
object_category # Update the previous object category
logger.warning("Unknown object category: %s", object_category)
logger.warning("Object will be put in Bucket 5.")
mcu.platform_activate()
object_sorting_in_progress = 5

mcu.acknowledge_object_sort()

elif len(detection_result.detections) == 0:
if len(object_category_samples) != 0:
if object_category_samples_expiration < config.detector_samples:
object_category_samples_expiration += 1
continue

object_category_samples_expiration = 0
logger.info(
"No object detected after %s frames. Clearing %s samples.",
config.detector_samples,
len(object_category_samples),
)
object_category_samples.clear()

# Calculate the FPS
if fps.frame_count % fps.avg_frame_count == 0:
Expand Down Expand Up @@ -189,6 +241,21 @@ def capture(
logger.info("KeyboardInterrupt detected.")
break

except exceptions.CameraError as e:
logger.error("An error occurred: %s", e)
logger.info("Restarting after %s seconds...", info.ERROR_RESTART_DELAY)
mcu.set_err_led(True)
time.sleep(info.ERROR_RESTART_DELAY)
mcu.set_err_led(False)
cam_cap = getCameraCapture(camera_id, config.resolution)

except Exception as e: # pylint: disable=broad-exception-caught
logger.critical("An error occurred: %s", e)
logger.info("Restarting after %s seconds...", info.ERROR_RESTART_DELAY)
mcu.set_err_led(True)
time.sleep(info.ERROR_RESTART_DELAY)
mcu.set_err_led(False)

logger.info("Releasing camera...")
cam_cap.release()
logger.info("Destroying all windows...")
Expand Down
3 changes: 2 additions & 1 deletion host/aidsorter/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
VERSION = (0, 3, 0)
TITLE = f"{NAME} v{'.'.join(map(str, VERSION))}"
PROTOCOL_VERSION = "1.0"
PROTOCOL_SEP = "\r\n"
PROTOCOL_SEP = "\n"
ERROR_RESTART_DELAY = 5 # in seconds

DEBUG_MODE = (
logging.DEBUG
Expand Down
Loading
Loading