Skip to content
Merged

Dev #22

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Unit Tests

on:
push:
pull_request:

jobs:
test:
name: Python ${{ matrix.python-version }} / ${{ matrix.os }}
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

# opencv-python needs libGL on headless Ubuntu runners.
- name: Install system dependencies (Linux)
if: runner.os == 'Linux'
run: sudo apt-get update -q && sudo apt-get install -y libgl1

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -r requirements-dev.txt

- name: Run unit tests
run: python -m pytest tests/unit/ -q --tb=short

14 changes: 13 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "py3r_media"
version = "0.3.1"
version = "0.5.0"
authors = [
{ name="Marcel Schmutz", email="mschmut@ethz.ch" },
]
Expand All @@ -17,8 +17,20 @@ dependencies = [
"numpy >= 2",
"opencv-python >= 4",
"reactivex >= 4.0.0",
"av >= 13",
]

[project.optional-dependencies]
dev = [
"pytest >= 8",
]

[tool.setuptools.packages.find]
where = ["src/"]
include = ["py3r.media*"]

[tool.pytest.ini_options]
testpaths = ["tests"]
markers = [
"hardware: requires physical capture hardware (webcam, Pylon camera, …)",
]
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r requirements.txt
pytest >= 8
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
numpy>=2
opencv-python>=4
reactivex>=4.0.0
pypylon>=4.0.0
pypylon>=4.0.0
av>=13
6 changes: 6 additions & 0 deletions src/py3r/media/streaming/observables/reader_observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from reactivex.abc import ObserverBase
from reactivex.disposable import Disposable

from py3r.media.types import FatalReadError

log = logging.getLogger(__name__)

TItem = TypeVar("TItem")
Expand Down Expand Up @@ -69,6 +71,10 @@ def run():
try:
item = reader.read(read_timeout_seconds)
consecutive_errors = 0 # reset on success
except FatalReadError as ex:
if not stop.is_set():
observer.on_error(ex)
return
except Exception as ex:
if stop.is_set():
return
Expand Down
33 changes: 18 additions & 15 deletions src/py3r/media/streaming/operators/adaptive_pace.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ def adaptive_pace(
initial_interval: Optional[float] = None,
learn_rate: float = 0.1,
scheduler: Optional[rx.abc.SchedulerBase] = None,
_poll: float = 0.01,
) -> Callable[[rx.abc.ObservableBase[_T]], rx.Observable[_T]]:
def _op(source: rx.abc.ObservableBase[_T]) -> rx.Observable[_T]:
def _subscribe(observer: rx.abc.ObserverBase[_T], scheduler_: Optional[rx.abc.SchedulerBase] = None) -> Disposable:
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

q: queue.Queue[_T] = queue.Queue()
q: queue.Queue = queue.Queue()
last_time: Optional[float] = None
period: float = initial_interval or 0.0
stopped = threading.Event()
disposed = threading.Event()

def update_interval(now: float):
Expand All @@ -37,14 +37,14 @@ def update_interval(now: float):

def on_next(x: _T):
update_interval(time.perf_counter())
q.put(x)
q.put(("next", x))

def on_error(err: Exception):
stopped.set()
observer.on_error(err)
# Route through queue so buffered on_next items are delivered first.
q.put(("error", err))

def on_completed():
stopped.set()
q.put(("completed", None))

src_disp = source.subscribe(
on_next,
Expand All @@ -54,28 +54,31 @@ def on_completed():
)

def emit_loop():
nonlocal period
try:
while not disposed.is_set():
try:
item = q.get(timeout=0.01)
observer.on_next(item)
kind, value = q.get(timeout=_poll)
except queue.Empty:
pass
continue

if stopped.is_set() and q.empty():
if kind == "next":
observer.on_next(value)
elif kind == "error":
observer.on_error(value)
return
else: # completed
observer.on_completed()
return

# Pace: sleep for the learned interval, waking frequently
# so we can respond to dispose() quickly.
sleep_time = max(period, 1e-6)
deadline = time.perf_counter() + sleep_time
while True:
if disposed.is_set():
return
while not disposed.is_set():
remaining = deadline - time.perf_counter()
if remaining <= 0:
break
time.sleep(min(remaining, 0.01))
time.sleep(min(remaining, _poll))
except Exception as e:
observer.on_error(e)

Expand Down
3 changes: 2 additions & 1 deletion src/py3r/media/streaming/operators/observe_on_bounded.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def observe_on_bounded(
maxsize: int = 256,
policy: str = "block",
timeout: float = 0.1,
_worker_poll: float = 0.05,
) -> Callable[[rx.Observable[_T]], rx.Observable[_T]]:
"""
Like observe_on, but with a bounded internal queue.
Expand Down Expand Up @@ -89,7 +90,7 @@ def worker(_sc=None, _st=None):

try:
try:
action = q.get(timeout=0.05)
action = q.get(timeout=_worker_poll)
except queue.Empty:
if not disposed.is_set():
worker_disp.disposable = scheduler.schedule(worker)
Expand Down
19 changes: 12 additions & 7 deletions src/py3r/media/streaming/operators/subscribe_on_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@ def action(_sched, _state: Any = None):

def subscribe_on_future(
scheduler: rx.abc.SchedulerBase,
subscribed: Future[None] = Future(),
disposed: Future[None] = Future()
subscribed: Optional[Future[None]] = None,
disposed: Optional[Future[None]] = None,
) -> Callable[[rx.abc.ObservableBase[_T]], rx.Observable[_T]]:
# Create fresh Futures here (not in the signature) to avoid the
# mutable-default-argument trap where all callers share the same object.
if subscribed is None:
subscribed = Future()
if disposed is None:
disposed = Future()

def _op(source: rx.abc.ObservableBase[_T]) -> rx.Observable[_T]:

Expand All @@ -56,19 +62,18 @@ def subscribe(
d.disposable = m

def action(
sched: rx.abc.SchedulerBase,
_state: Optional[Any] = None,
sched: rx.abc.SchedulerBase,
_state: Optional[Any] = None,
):
try:
# subscribe upstream on this scheduler
inner_disp = source.subscribe(observer)
except BaseException as exc:
# subscription failed
if not subscribed.done():
subscribed.set_exception(exc)
raise
else:
# subscription succeeded; disposal will be signaled separately
# Subscription fully established — signal now so callers that
# blocked on subscribed.result() know it is safe to start producing.
d.disposable = FutureScheduledDisposable(sched, inner_disp, disposed)
if not subscribed.done():
subscribed.set_result(None)
Expand Down
33 changes: 33 additions & 0 deletions src/py3r/media/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,39 @@
import numpy as np


# ---------------------------------------------------------------------------
# Read error hierarchy
# ---------------------------------------------------------------------------

class ReadError(Exception):
"""Base class for errors raised by IReader.read()."""


class FatalReadError(ReadError):
"""
An unrecoverable read failure. ``reader_observable`` will forward this
immediately as ``on_error`` without consulting ``max_consecutive_errors``.

All other exceptions (including plain ``ReadError`` subclasses) are treated
as retryable by default — the source will be retried up to
``max_consecutive_errors`` times before the stream is terminated.

Use this only when retrying is known to be pointless or harmful (e.g. the
camera has been physically disconnected and the source has already set an
internal flag so that every subsequent ``read()`` would also fail
immediately). In most cases it is better to let the consecutive-error
threshold handle termination naturally.
"""


class GrabFailedError(ReadError):
"""The camera returned a result whose GrabSucceeded() flag is False."""


class GrabTimeoutError(ReadError):
"""RetrieveResult returned without a frame within the given timeout."""


@runtime_checkable
class HasImage(Protocol):
@property
Expand Down
Loading
Loading