From 7f966fe6ed2f0124d4d07d90bb7f54d5e6a1d571 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Tue, 23 Jun 2026 23:32:16 +0200 Subject: [PATCH 1/8] Add defer wrapper --- src/openhound/core/app.py | 13 ++++++++++++- src/openhound/core/app.pyi | 1 + src/openhound/core/resources.py | 18 +++++++++++++++--- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index 971aecf..a592a96 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -21,7 +21,7 @@ from openhound.core.models.extension import Extension from openhound.core.preproc import PreProcContext, PreProcessor from openhound.core.progress import Progress -from openhound.core.resources import safe_resource_wrapper +from openhound.core.resources import safe_defer_wrapper, safe_resource_wrapper logger = logging.getLogger(__name__) @@ -292,6 +292,17 @@ def wrapper( return decorator + def defer(self): + """Decorator to register a DLT defer with added exception handling.""" + + def decorator(func: Callable) -> DltResource: + safe_func = safe_defer_wrapper(func) + decorated = dlt.defer(safe_func) + return decorated # type: ignore + + logger.debug(f"Registering defer for {self.name}") + return decorator + def transformer( self, *dlt_args, diff --git a/src/openhound/core/app.pyi b/src/openhound/core/app.pyi index 9f61cd5..ace80d0 100644 --- a/src/openhound/core/app.pyi +++ b/src/openhound/core/app.pyi @@ -99,6 +99,7 @@ class OpenHound: parallelized: bool = False, _impl_cls: type[DltSource] = DltSource, ) -> Any: ... + def defer(self): ... 
def resource( self, data: Optional[Any] = None, diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 344f7c5..06efe0e 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -1,13 +1,25 @@ -import logging - - import functools import inspect +import logging from typing import Callable logger = logging.getLogger(__name__) +def safe_defer_wrapper(func: Callable) -> Callable: + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + try: + gen = func(*args, **kwargs) + return gen + except Exception as e: + logger.error(f"Error executing DLT defer: {e}") + return + + return sync_wrapper + + def safe_resource_wrapper(func: Callable, resource_name: str) -> Callable: """Wrap a DLT resource to catch and log exceptions without stopping the entire pipeline. Can either be sync or async generator function. From f0a17e5614595a8a2d4fd59befb1f8e1445e385a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:35:27 +0200 Subject: [PATCH 2/8] Add safe_defer_wrapper to resource/transformers that return a defer function --- src/openhound/core/resources.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 06efe0e..4b25bb4 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -6,16 +6,18 @@ logger = logging.getLogger(__name__) -def safe_defer_wrapper(func: Callable) -> Callable: +def safe_defer_wrapper(func: Callable, resource_name: str | None = None) -> Callable: @functools.wraps(func) def sync_wrapper(*args, **kwargs): try: - gen = func(*args, **kwargs) - return gen + return func(*args, **kwargs) except Exception as e: - logger.error(f"Error executing DLT defer: {e}") - return + logger.error( + f"Error executing DLT defer: {e}", + extra={"resource": resource_name, "phase": "defer_execution"}, + ) + return [] return sync_wrapper @@ -48,11 +50,12 @@ 
def sync_wrapper(*args, **kwargs): return if inspect.isgenerator(gen): - # Note: Don't use while item: := next(gen, None) because this will stop the full iterator - # if the resource yields any empty value while True: try: item = next(gen) + if callable(item): + item = safe_defer_wrapper(item, resource_name) + yield item except StopIteration: break @@ -65,8 +68,10 @@ def sync_wrapper(*args, **kwargs): }, ) continue - else: + if callable(gen): + gen = safe_defer_wrapper(gen, resource_name) + yield gen @functools.wraps(func) @@ -86,11 +91,12 @@ async def async_wrapper(*args, **kwargs): return if inspect.isasyncgen(gen): - # Note: Don't use while item: := next(gen, None) because this will stop the full iterator - # if the resource yields any empty value while True: try: item = await gen.__anext__() + if callable(item): + item = safe_defer_wrapper(item, resource_name) + yield item except StopAsyncIteration: break @@ -102,11 +108,14 @@ async def async_wrapper(*args, **kwargs): "phase": "resource_iteration", }, ) - continue + else: try: result = await gen + if callable(result): + result = safe_defer_wrapper(result, resource_name) + yield result except Exception as e: logger.error( From 9acde82264d2f87b20945c27dcee889fad1c5d3a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:38:41 +0200 Subject: [PATCH 3/8] Updated defer decorator to mimic dlt behaviour --- src/openhound/core/app.py | 11 +++-------- src/openhound/core/app.pyi | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/openhound/core/app.py b/src/openhound/core/app.py index a592a96..526a49c 100644 --- a/src/openhound/core/app.py +++ b/src/openhound/core/app.py @@ -292,16 +292,11 @@ def wrapper( return decorator - def defer(self): + def defer(self, func: Callable) -> Callable: """Decorator to register a DLT defer with added exception handling.""" - - def decorator(func: Callable) -> DltResource: - safe_func = safe_defer_wrapper(func) - decorated = dlt.defer(safe_func) - 
return decorated # type: ignore - logger.debug(f"Registering defer for {self.name}") - return decorator + safe_func = safe_defer_wrapper(func) + return dlt.defer(safe_func) def transformer( self, diff --git a/src/openhound/core/app.pyi b/src/openhound/core/app.pyi index ace80d0..1720a40 100644 --- a/src/openhound/core/app.pyi +++ b/src/openhound/core/app.pyi @@ -99,7 +99,7 @@ class OpenHound: parallelized: bool = False, _impl_cls: type[DltSource] = DltSource, ) -> Any: ... - def defer(self): ... + def defer(self, func: Callable[..., Any]) -> Callable[..., Callable[[], Any]]: ... def resource( self, data: Optional[Any] = None, From 9e7b981e97919659ed0994aec5bf3626013c063a Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:49:22 +0200 Subject: [PATCH 4/8] Remove double wrapping, assume collectors need to use `@app.defer` and not `@dlt.defer` --- src/openhound/core/resources.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 4b25bb4..7bbc825 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -53,9 +53,6 @@ def sync_wrapper(*args, **kwargs): while True: try: item = next(gen) - if callable(item): - item = safe_defer_wrapper(item, resource_name) - yield item except StopIteration: break @@ -69,9 +66,6 @@ def sync_wrapper(*args, **kwargs): ) continue else: - if callable(gen): - gen = safe_defer_wrapper(gen, resource_name) - yield gen @functools.wraps(func) @@ -94,9 +88,6 @@ async def async_wrapper(*args, **kwargs): while True: try: item = await gen.__anext__() - if callable(item): - item = safe_defer_wrapper(item, resource_name) - yield item except StopAsyncIteration: break @@ -113,9 +104,6 @@ async def async_wrapper(*args, **kwargs): else: try: result = await gen - if callable(result): - result = safe_defer_wrapper(result, resource_name) - yield result except Exception as e: logger.error( From 3c15fbdcd7e596eabfb95e559ec056fbc40fc38b 
Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 15:58:33 +0200 Subject: [PATCH 5/8] Remove unused resource_name input (since these are not always available) --- src/openhound/core/resources.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/openhound/core/resources.py b/src/openhound/core/resources.py index 7bbc825..900eb11 100644 --- a/src/openhound/core/resources.py +++ b/src/openhound/core/resources.py @@ -6,7 +6,15 @@ logger = logging.getLogger(__name__) -def safe_defer_wrapper(func: Callable, resource_name: str | None = None) -> Callable: +def safe_defer_wrapper(func: Callable) -> Callable: + """Wrap a DLT defer to catch and log exceptions without stopping the entire pipeline. + + Args: + func: The defer function + + Returns: + Wrapped function that catches exceptions and continues (if possible of course) + """ @functools.wraps(func) def sync_wrapper(*args, **kwargs): @@ -15,7 +23,7 @@ def sync_wrapper(*args, **kwargs): except Exception as e: logger.error( f"Error executing DLT defer: {e}", - extra={"resource": resource_name, "phase": "defer_execution"}, + extra={"phase": "defer_execution"}, ) return [] From 911cb40cd0cde8f839118f0f8fd40fdff840a647 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:12:10 +0200 Subject: [PATCH 6/8] Added test to see if the pipeline still continues with resource, transformer and defer failures --- tests/test_safe_dlt_wrappers.py | 96 +++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 tests/test_safe_dlt_wrappers.py diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py new file mode 100644 index 0000000..c1ee057 --- /dev/null +++ b/tests/test_safe_dlt_wrappers.py @@ -0,0 +1,96 @@ +import logging + +from pydantic import BaseModel + +from openhound.core.app import OpenHound +from openhound.core.collect import Collector +from openhound.core.progress import Progress + + +class 
ExampleResource(BaseModel): + id: int + source: str + + +class Computer(BaseModel): + id: int + hostname: str + + +class User(BaseModel): + id: int + email: str + + +class UserDetails(BaseModel): + id: int + email: str + office: str + + +def test_dlt_wrapper_pipeline_continues( + caplog, + monkeypatch, + tmp_path, +): + monkeypatch.setenv("DLT_DATA_DIR", str(tmp_path / ".dlt")) + monkeypatch.setattr( + "openhound.core.collect.logger_override.set_handler", lambda name: None + ) + caplog.set_level(logging.ERROR, logger="openhound.core.resources") + + app = OpenHound("safe_wrapper_test", "TEST") + + @app.resource(name="computers", columns=Computer) + def computers(): + yield {"id": 1, "hostname": "DESKTOP-12345"} + yield {"id": 2, "hostname": "DESKTOP-54321"} + raise RuntimeError("resource failed after valid rows") + + @app.transformer(name="users", columns=User) + def users(computer): + if computer["id"] == 1: + yield {"id": 10, "email": "someuser@example.org"} + raise RuntimeError("transformer failed after valid row") + + yield {"id": 20, "email": "someuser2@example.org"} + + @app.transformer(name="user_details", columns=UserDetails) + def user_details(user): + + @app.defer + def deferred_child(user_input): + if user_input["id"] == 1: + raise RuntimeError("defer failed for parent") + + return {"id": 20, "email": "someuser2@example.org", "office": "Amsterdam"} + + yield deferred_child(user) + + @app.source(name="safe_wrapper_test", max_table_nesting=0) + def source(): + computers_resource = computers() + return ( + computers_resource, + computers_resource | users(), + computers_resource | user_details(), + ) + + collector = Collector( + name="safe_wrapper_test", + output_path=tmp_path / "output", + progress=Progress.log, + ) + + load_info = collector.run(source()) + + assert load_info is not None + + messages = [record.getMessage() for record in caplog.records] + phases = {getattr(record, "phase", None) for record in caplog.records} + + assert any("resource failed 
after valid rows" in message for message in messages) + assert any("transformer failed after valid row" in message for message in messages) + assert any("defer failed for parent" in message for message in messages) + assert "resource_iteration" in phases + assert "defer_execution" in phases From 3597c3c906edb75e974d8c997ebae72f91d71ab1 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:14:34 +0200 Subject: [PATCH 7/8] Remove unused ExampleResource model --- tests/test_safe_dlt_wrappers.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py index c1ee057..519e3d4 100644 --- a/tests/test_safe_dlt_wrappers.py +++ b/tests/test_safe_dlt_wrappers.py @@ -7,11 +7,6 @@ from openhound.core.progress import Progress -class ExampleResource(BaseModel): - id: int - source: str - - class Computer(BaseModel): id: int hostname: str From cf9938ffe42a64f818ce1399423efb37ce74c629 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Wed, 24 Jun 2026 16:14:53 +0200 Subject: [PATCH 8/8] Simplify UserDetails test model by inheriting from User --- tests/test_safe_dlt_wrappers.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_safe_dlt_wrappers.py b/tests/test_safe_dlt_wrappers.py index 519e3d4..f401666 100644 --- a/tests/test_safe_dlt_wrappers.py +++ b/tests/test_safe_dlt_wrappers.py @@ -17,9 +17,7 @@ class User(BaseModel): email: str -class UserDetails(BaseModel): - id: int - email: str +class UserDetails(User): office: str