From 763708f76c8193dd173b83066dc0deec02255b7b Mon Sep 17 00:00:00 2001 From: Katie Strader Date: Thu, 25 Jun 2026 11:41:00 -0700 Subject: [PATCH 1/8] fix: set up log formatting to human readable text and set that as the dfault --- .../.dlt-example/config.toml | 4 +- .../.dlt-example/config.toml | 4 +- src/openhound/core/logging.py | 90 ++++++++++++++++--- tests/test_log_handlers.py | 66 +++++++++++++- 4 files changed, 150 insertions(+), 14 deletions(-) diff --git a/example-configurations/bloodhound-community/.dlt-example/config.toml b/example-configurations/bloodhound-community/.dlt-example/config.toml index 8122887..e1b7caa 100644 --- a/example-configurations/bloodhound-community/.dlt-example/config.toml +++ b/example-configurations/bloodhound-community/.dlt-example/config.toml @@ -2,8 +2,10 @@ [runtime] http_show_error_body = true log_cli_level = "WARNING" -log_format = "JSON" log_rotate_when = "midnight" +# Default: logs are set to human readable text +# To switch to structured JSON instead, uncomment the line below (must be uppercase "JSON") +# log_format = "JSON" [extract] workers = 8 diff --git a/example-configurations/bloodhound-enterprise/.dlt-example/config.toml b/example-configurations/bloodhound-enterprise/.dlt-example/config.toml index 119f7f9..584e4b4 100644 --- a/example-configurations/bloodhound-enterprise/.dlt-example/config.toml +++ b/example-configurations/bloodhound-enterprise/.dlt-example/config.toml @@ -2,8 +2,10 @@ [runtime] http_show_error_body = true log_cli_level = "WARNING" -log_format = "JSON" log_rotate_when = "midnight" +# Default: logs are set to human readable text +# To switch to structured JSON instead, uncomment the line below (must be uppercase "JSON") +# log_format = "JSON" [extract] workers = 8 diff --git a/src/openhound/core/logging.py b/src/openhound/core/logging.py index a11e253..a53233b 100644 --- a/src/openhound/core/logging.py +++ b/src/openhound/core/logging.py @@ -17,6 +17,8 @@ VALID_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] +VALID_FORMATS = ["json", "text"] + # This should be a complete list of default fields as part of a LogRecord # this is used to prevent custom fields overwriting the default LogRecord entries DEFAULT_LOG_FIELDS = [ @@ -155,6 +157,46 @@ def format(self, record: logging.LogRecord) -> str: return json.dumps(log_data) +class OpenHoundTextFormatter(logging.Formatter): + """Human-readable plain-text formatter for OpenHound file logs""" + + def format(self, record: logging.LogRecord) -> str: + """Format a log record as a single readable line with structured context. + + Args: + record (logging.LogRecord): The original log record + + Returns: + str: A human-readable plain-text representation of the log record + """ + timestamp = self.formatTime(record, "%Y-%m-%d %H:%M:%S") + location = f"{record.name}:{record.funcName}:{record.lineno}" + log_line = ( + f"{timestamp} [{record.levelname}] {location} - {record.getMessage()}" + ) + + # Append any custom/extra fields (e.g. resource, phase, extension) so the + # structured context is preserved without the noise of full JSON + extras = { + key: value + for key, value in record.__dict__.items() + if ( + not key.startswith("_") + and key not in DEFAULT_LOG_FIELDS + and key != "taskName" + and value is not None + ) + } + if extras: + extra_str = " ".join(f"{key}={value}" for key, value in extras.items()) + log_line = f"{log_line} | {extra_str}" + + if record.exc_info: + log_line = f"{log_line}\n{self.formatException(record.exc_info)}" + + return log_line + + class OpenHoundRichFormatter(logging.Formatter): """Custom formatter for Rich when logging in CLI mode""" @@ -182,6 +224,7 @@ def __init__( interval: int = 1, cli_level: str = "ERROR", base_path: str | None = None, + log_format: str = "text", ): self.level = level self.name = name @@ -192,6 +235,7 @@ def __init__( self.backup_count = backup_count self.interval = interval self.cli_level = cli_level + self.log_format = log_format self.base_path = Path(base_path) if base_path else self.default_platform_path() self.log_file_path: Path | None = None @@ -210,6 +254,14 @@ def _validate_level(level: str, default: str = "INFO") -> str: return default + @staticmethod + def _validate_format(log_format: str, default: str = "text") -> str: + normalized_format = log_format.lower() + if normalized_format in VALID_FORMATS: + return normalized_format + + return default + @staticmethod def _is_valid_path(base_path: Path, spec_path: Path) -> Path: """Validate the user provided path to prevent directory traversal and ensure it's within the base path @@ -275,12 +327,18 @@ def setup(self) -> None: dlt_interval = dlt.config.get("runtime.log_interval", int) dlt_cli_level = dlt.config.get("runtime.log_cli_level", str) dlt_log_path = dlt.config.get("runtime.log_path", str) + # dlt only treats the exact value "JSON" specially. We read it to select json logs and + # fall back to text for anything else (including dlt's default format string). + dlt_log_format = dlt.config.get("runtime.log_format", str) # Check if the DLT config values are valid and if so override the defaults self.level = self._validate_level(dlt_level or self.level, default="INFO") self.cli_level = self._validate_level( dlt_cli_level or self.cli_level, default="ERROR" ) + self.log_format = self._validate_format( + dlt_log_format or self.log_format, default="text" + ) # Override the base path if log_path is set in DLT config, otherwise use the default platform path self.base_path = Path(dlt_log_path) if dlt_log_path else self.base_path @@ -303,17 +361,29 @@ def setup(self) -> None: self.root_logger.handlers.clear() self.handlers[self.runtime_mode](self.root_logger, self.log_file_path) + def _file_formatter(self) -> logging.Formatter: + """Return the formatter for file/stdout handlers based on the configured log format. + + Returns: + logging.Formatter: A JSON formatter when log_format is 'json', otherwise a + human-readable plain-text formatter. + """ + if self.log_format == "json": + return OpenHoundJSONFormatter() + return OpenHoundTextFormatter() + def container_handlers(self, logger: logging.Logger, file_path: Path) -> None: """Set the logging handler/format when running in a container""" - json_formatter = OpenHoundJSONFormatter() + formatter = self._file_formatter() - # Log to stdout in JSON for better compatibility with container-based logging systems + # Log to stdout for better compatibility with container-based logging systems. + # Output is human-readable text by default; set runtime.log_format = "JSON" for structured JSON. stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setFormatter(json_formatter) + stdout_handler.setFormatter(formatter) logger.addHandler(stdout_handler) - # But also log the same json format to a file for persistence and debugging when needed + # But also log the same format to a file for persistence and debugging when needed rotating_file_handler = RotatingFileHandler( file_path, when=self.rotate_when, @@ -321,7 +391,7 @@ def container_handlers(self, logger: logging.Logger, file_path: Path) -> None: backupCount=self.backup_count, max_bytes=self.max_bytes, ) - rotating_file_handler.setFormatter(json_formatter) + rotating_file_handler.setFormatter(formatter) # This regular expression overrides the default extMatch to recognize both # default time based rotation filenames and size based rotation filenames (which gets a seconds added as well) rotating_file_handler.extMatch = re.compile( @@ -347,8 +417,8 @@ def cli_handlers(self, logger: logging.Logger, file_path: Path) -> None: console_handler.setFormatter(rich_formatter) logger.addHandler(console_handler) - # But also save the logs to a file in JSON format :) - json_formatter = OpenHoundJSONFormatter() + # But also save the logs to a file using the configured format (text by default) + file_formatter = self._file_formatter() rotating_file_handler = RotatingFileHandler( file_path, when=self.rotate_when, @@ -356,7 +426,7 @@ def cli_handlers(self, logger: logging.Logger, file_path: Path) -> None: backupCount=self.backup_count, max_bytes=self.max_bytes, ) - rotating_file_handler.setFormatter(json_formatter) + rotating_file_handler.setFormatter(file_formatter) # This regular expression overrides the default extMatch to recognize both # default time based rotation filenames and size based rotation filenames (which gets a seconds added as well) rotating_file_handler.extMatch = re.compile( @@ -367,7 +437,7 @@ def cli_handlers(self, logger: logging.Logger, file_path: Path) -> None: def service_handlers(self, logger: logging.Logger, file_path: Path) -> None: """Set the logging handler/format when running the OpenHound service""" - json_formatter = OpenHoundJSONFormatter() + file_formatter = self._file_formatter() rotating_file_handler = RotatingFileHandler( file_path, when=self.rotate_when, @@ -375,7 +445,7 @@ def service_handlers(self, logger: logging.Logger, file_path: Path) -> None: backupCount=self.backup_count, max_bytes=self.max_bytes, ) - rotating_file_handler.setFormatter(json_formatter) + rotating_file_handler.setFormatter(file_formatter) # This regular expression overrides the default extMatch to recognize both # default time based rotation filenames and size based rotation filenames (which gets a seconds added as well) rotating_file_handler.extMatch = re.compile( diff --git a/tests/test_log_handlers.py b/tests/test_log_handlers.py index 01f59a8..1a6ca1f 100644 --- a/tests/test_log_handlers.py +++ b/tests/test_log_handlers.py @@ -1,7 +1,15 @@ import json import logging -from openhound.core.logging import RotatingFileHandler, logger_override +import pytest + +from openhound.core.logging import ( + CustomLogger, + OpenHoundJSONFormatter, + OpenHoundTextFormatter, + RotatingFileHandler, + logger_override, +) def test_root_handler_setup(): @@ -59,8 +67,10 @@ def test_dlt_extension_handlers(): ) -def test_log_routing_content(tmp_path, caplog): +def test_log_routing_content(tmp_path, caplog, monkeypatch): """Test that logs are correctly routed and that the files are created for the expected paths""" + # Pin JSON (dlt's only crash-safe override value) so the JSON-parsing assertions are self-contained. + monkeypatch.setenv("RUNTIME__LOG_FORMAT", "JSON") logger_override.base_path = tmp_path logger_override.setup() logger_override.set_handler("test_extension") @@ -94,3 +104,55 @@ def test_log_routing_content(tmp_path, caplog): assert ext_logs_json[0]["message"] == "Extension DLT log", ( "The extension log message should be present in 'ext_test_extension.log'" ) + + +def test_validate_format_defaults_to_text(): + """Test that the log format validation accepts text/json and falls back to text""" + assert CustomLogger._validate_format("JSON") == "json" + assert CustomLogger._validate_format("text") == "text" + assert CustomLogger._validate_format("invalid") == "text" + + +def test_file_formatter_selection(): + """Test that the configured log_format selects the matching file formatter""" + json_logger = CustomLogger("openhound.log", log_format="json") + assert isinstance(json_logger._file_formatter(), OpenHoundJSONFormatter), ( + "log_format 'json' should produce a JSON formatter" + ) + + text_logger = CustomLogger("openhound.log", log_format="text") + assert isinstance(text_logger._file_formatter(), OpenHoundTextFormatter), ( + "log_format 'text' should produce a plain-text formatter" + ) + + +def test_text_formatter_produces_plain_text(): + """Test that the text formatter produces a readable, non-JSON line with extras""" + formatter = OpenHoundTextFormatter() + record = logging.LogRecord( + name="openhound.core.collect", + level=logging.ERROR, + pathname="collect.py", + lineno=61, + msg="Starting collector %s", + args=("github",), + exc_info=None, + func="run", + ) + record.resource = "scim_users" + record.taskName = None + + output = formatter.format(record) + + assert "Starting collector github" in output, ( + "The formatted message should be rendered with its args" + ) + assert "[ERROR" in output, "The log level should be present in the output" + assert "openhound.core.collect:run:61" in output, ( + "The logger/function/line location should be present in the output" + ) + assert "resource=scim_users" in output, "Extra fields should be preserved" + assert "taskName" not in output, "Null extras like taskName should be dropped" + + with pytest.raises(json.JSONDecodeError): + json.loads(output) From ad507772713e14842febb531d7461787ca60064a Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 25 Jun 2026 09:46:56 -0500 Subject: [PATCH 2/8] BED-8747: Check-in to BHE during collection --- src/openhound/scheduler/service.py | 36 ++++++++++++++++++++++ tests/test_bhe_job_scheduling.py | 49 ++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index 5ad21bb..c1bab0d 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -1,5 +1,6 @@ import logging import signal +import threading import time from concurrent.futures import Future, ProcessPoolExecutor from concurrent.futures.process import BrokenProcessPool @@ -69,6 +70,7 @@ def __init__( token_id: str, collector_name: str, interval: int = 5, + checkin_interval: int = 30, ): # BHE client settings self.bhe_uri = bhe_uri @@ -79,6 +81,9 @@ def __init__( # Interval how often to check for a job self.interval = interval + # Interval how often to send a check-in while a job is running + self.checkin_interval = checkin_interval + # Stores the ID of currently running BHE job self.job_running: int | None = None @@ -89,14 +94,40 @@ def __init__( # Exit condition, changed to True when the process needs to stop self.exit = False + # Check-in thread and stop event + self._checkin_stop = threading.Event() + self._checkin_thread: threading.Thread | None = None + def _exit_handler(self, sig: int, frame): """Handle SIGINT and SIGTERM signals. Sets self.exit to True to stop the while loop""" self.exit = True logger.warning(f"Received signal {sig}, shutting down gracefully.") + def _checkin(self) -> None: + """Send a check-in request to BHE to keep the client's LastCheckIn fresh. + + Only sends a request when a job is currently running. Exceptions are caught and + logged as warnings — they do not crash the thread or affect the running job. + """ + if self.job_running is None: + return + try: + self.client.jobs_current + logger.debug(f"Check-in sent for job {self.job_running}.") + except Exception: + logger.warning("Check-in request to BHE failed.", exc_info=True) + + def _checkin_loop(self) -> None: + """Background loop that calls _checkin every checkin_interval seconds.""" + while not self._checkin_stop.wait(self.checkin_interval): + self._checkin() + def _shutdown(self) -> None: """Shut down the executor and wait for running tasks to finish. Executed when the service is shut down.""" logger.info("Collection service stopping.") + self._checkin_stop.set() + if self._checkin_thread is not None: + self._checkin_thread.join(timeout=5) self.executor.shutdown(wait=True, cancel_futures=True) logger.info("Collection service stopped.") @@ -233,6 +264,11 @@ def start(self) -> None: logger.info( f"Service started, monitoring {self.bhe_uri} every {self.interval} seconds." ) + self._checkin_stop.clear() + self._checkin_thread = threading.Thread( + target=self._checkin_loop, daemon=True, name="bhe-checkin" + ) + self._checkin_thread.start() try: while not self.exit: self._poll() diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py index e4051b0..19c409b 100644 --- a/tests/test_bhe_job_scheduling.py +++ b/tests/test_bhe_job_scheduling.py @@ -247,6 +247,55 @@ def broken_submit(*args, **kwargs): } +def test_checkin_calls_jobs_current_when_job_running(mock_service, monkeypatch): + """_checkin() should call jobs_current when a job is running.""" + mock_service.job_running = 123 + called = [] + + def fake_jobs_current(self): + called.append(True) + + monkeypatch.setattr( + mock_service.client.__class__, "jobs_current", property(fake_jobs_current) + ) + + mock_service._checkin() + + assert len(called) == 1 + + +def test_checkin_noop_when_no_job_running(mock_service, monkeypatch): + """_checkin() should not call jobs_current when no job is running.""" + assert mock_service.job_running is None + called = [] + + def fake_jobs_current(self): + called.append(True) + + monkeypatch.setattr( + mock_service.client.__class__, "jobs_current", property(fake_jobs_current) + ) + + mock_service._checkin() + + assert len(called) == 0 + + +def test_checkin_swallows_exception(mock_service, monkeypatch): + """_checkin() should swallow exceptions from jobs_current without re-raising.""" + mock_service.job_running = 123 + + def raise_error(self): + raise RuntimeError("BHE unreachable") + + monkeypatch.setattr( + mock_service.client.__class__, "jobs_current", property(raise_error) + ) + + # Should not raise + mock_service._checkin() + + def test_scheduler_ingest_opengraph(mock_service, mock_bloodhound_api, monkeypatch): """Run the DLT pipeline with the openhound-faker collector + check the amount of ingested nodes + edges""" monkeypatch.setenv( From 87df7268f5e83e9a56827cfda50ff1d5caccee6d Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 25 Jun 2026 10:48:59 -0500 Subject: [PATCH 3/8] refactor - no thread, rely on existing interval --- src/openhound/scheduler/service.py | 34 ++++++++---------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index c1bab0d..5cd03c9 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -1,6 +1,5 @@ import logging import signal -import threading import time from concurrent.futures import Future, ProcessPoolExecutor from concurrent.futures.process import BrokenProcessPool @@ -70,7 +69,6 @@ def __init__( token_id: str, collector_name: str, interval: int = 5, - checkin_interval: int = 30, ): # BHE client settings self.bhe_uri = bhe_uri @@ -81,9 +79,6 @@ def __init__( # Interval how often to check for a job self.interval = interval - # Interval how often to send a check-in while a job is running - self.checkin_interval = checkin_interval - # Stores the ID of currently running BHE job self.job_running: int | None = None @@ -94,10 +89,6 @@ def __init__( # Exit condition, changed to True when the process needs to stop self.exit = False - # Check-in thread and stop event - self._checkin_stop = threading.Event() - self._checkin_thread: threading.Thread | None = None - def _exit_handler(self, sig: int, frame): """Handle SIGINT and SIGTERM signals. Sets self.exit to True to stop the while loop""" self.exit = True @@ -107,7 +98,7 @@ def _checkin(self) -> None: """Send a check-in request to BHE to keep the client's LastCheckIn fresh. Only sends a request when a job is currently running. Exceptions are caught and - logged as warnings — they do not crash the thread or affect the running job. + logged as warnings — they do not crash the loop or affect the running job. """ if self.job_running is None: return @@ -117,17 +108,9 @@ def _checkin(self) -> None: except Exception: logger.warning("Check-in request to BHE failed.", exc_info=True) - def _checkin_loop(self) -> None: - """Background loop that calls _checkin every checkin_interval seconds.""" - while not self._checkin_stop.wait(self.checkin_interval): - self._checkin() - def _shutdown(self) -> None: """Shut down the executor and wait for running tasks to finish. Executed when the service is shut down.""" logger.info("Collection service stopping.") - self._checkin_stop.set() - if self._checkin_thread is not None: - self._checkin_thread.join(timeout=5) self.executor.shutdown(wait=True, cancel_futures=True) logger.info("Collection service stopped.") @@ -264,14 +247,15 @@ def start(self) -> None: logger.info( f"Service started, monitoring {self.bhe_uri} every {self.interval} seconds." ) - self._checkin_stop.clear() - self._checkin_thread = threading.Thread( - target=self._checkin_loop, daemon=True, name="bhe-checkin" - ) - self._checkin_thread.start() + tick = 1 # seconds per loop iteration + last_poll = time.monotonic() - self.interval # trigger _poll() immediately on first tick try: while not self.exit: - self._poll() - time.sleep(self.interval) + now = time.monotonic() + if now - last_poll >= self.interval: + self._poll() + self._checkin() # no-op when idle + last_poll = now + time.sleep(tick) finally: self._shutdown() From 0f2a8e327309f3a203d6c98121302a243ad24764 Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 25 Jun 2026 10:57:02 -0500 Subject: [PATCH 4/8] cleanup --- src/openhound/scheduler/service.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index 5cd03c9..c5a7f04 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -247,15 +247,10 @@ def start(self) -> None: logger.info( f"Service started, monitoring {self.bhe_uri} every {self.interval} seconds." ) - tick = 1 # seconds per loop iteration - last_poll = time.monotonic() - self.interval # trigger _poll() immediately on first tick try: while not self.exit: - now = time.monotonic() - if now - last_poll >= self.interval: - self._poll() - self._checkin() # no-op when idle - last_poll = now - time.sleep(tick) + self._poll() + self._checkin() + time.sleep(self.interval) finally: self._shutdown() From cb3e7a79d7d4f5f3f987bb6fe6d438b0aec2ca30 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Thu, 25 Jun 2026 18:23:44 +0200 Subject: [PATCH 5/8] Updated checkin --- src/openhound/scheduler/service.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index c5a7f04..e4e1e22 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -94,20 +94,6 @@ def _exit_handler(self, sig: int, frame): self.exit = True logger.warning(f"Received signal {sig}, shutting down gracefully.") - def _checkin(self) -> None: - """Send a check-in request to BHE to keep the client's LastCheckIn fresh. - - Only sends a request when a job is currently running. Exceptions are caught and - logged as warnings — they do not crash the loop or affect the running job. - """ - if self.job_running is None: - return - try: - self.client.jobs_current - logger.debug(f"Check-in sent for job {self.job_running}.") - except Exception: - logger.warning("Check-in request to BHE failed.", exc_info=True) - def _shutdown(self) -> None: """Shut down the executor and wait for running tasks to finish. Executed when the service is shut down.""" logger.info("Collection service stopping.") @@ -237,6 +223,8 @@ def _poll(self) -> None: available_job = self.check_jobs() if available_job: self._start_job(available_job) + else: + self.client.jobs_current except Exception: logger.exception("Error checking for or starting jobs.") @@ -250,7 +238,6 @@ def start(self) -> None: try: while not self.exit: self._poll() - self._checkin() time.sleep(self.interval) finally: self._shutdown() From 901ce5daefb394c7fbd67fd0a7097ce2ac24b678 Mon Sep 17 00:00:00 2001 From: Joey Dreijer Date: Thu, 25 Jun 2026 18:28:46 +0200 Subject: [PATCH 6/8] Updated polling + set default interval --- src/openhound/scheduler/service.py | 2 +- src/scheduler.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index e4e1e22..10dc76a 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -68,7 +68,7 @@ def __init__( token_key: str, token_id: str, collector_name: str, - interval: int = 5, + interval: int = 15, ): # BHE client settings self.bhe_uri = bhe_uri diff --git a/src/scheduler.py b/src/scheduler.py index 9e37573..63781c9 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -15,7 +15,12 @@ def start(): # Load BHE config and secrets bhe_uri = dlt.config["destination.bloodhoundenterprise.url"] collector_name = dlt.config["destination.bloodhoundenterprise.collector_name"] - interval = dlt.config.get("destination.bloodhoundenterprise.interval", int) + interval = dlt.config("destination.bloodhoundenterprise.interval") + try: + interval = int(interval) + except (TypeError, ValueError): + logger.warning(f"Invalid interval: {interval}, defaulting to 15") + interval = 15 # Load BHE secrets token_key = dlt.secrets["destination.bloodhoundenterprise.token_key"] From 94d55f651c76e1715fb6c3b96b55212dfa2c66f4 Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 25 Jun 2026 13:20:45 -0500 Subject: [PATCH 7/8] hardcode poll interval to 30s, remove interval from example and config --- .../helm/openhound/templates/deployment.yaml | 4 --- deployments/helm/openhound/values.yaml | 1 - deployments/helm/values.example.yaml | 1 - .../.dlt-example/secrets_github.toml | 1 - .../.dlt-example/secrets_jamf.toml | 1 - .../.dlt-example/secrets_okta.toml | 1 - src/openhound/scheduler/service.py | 5 ++-- src/scheduler.py | 7 ------ tests/test_bhe_job_scheduling.py | 25 +++++++++++++------ 9 files changed, 20 insertions(+), 26 deletions(-) diff --git a/deployments/helm/openhound/templates/deployment.yaml b/deployments/helm/openhound/templates/deployment.yaml index 24e6484..4e15d27 100644 --- a/deployments/helm/openhound/templates/deployment.yaml +++ b/deployments/helm/openhound/templates/deployment.yaml @@ -59,10 +59,6 @@ spec: - name: DESTINATION__BLOODHOUNDENTERPRISE__URL value: {{ .Values.destination.bloodhoundEnterprise.url | quote }} {{- end }} - {{- if .Values.destination.bloodhoundEnterprise.interval }} - - name: DESTINATION__BLOODHOUNDENTERPRISE__INTERVAL - value: {{ .Values.destination.bloodhoundEnterprise.interval | quote }} - {{- end }} {{- range $name, $value := .Values.env }} - name: {{ $name }} value: {{ $value | quote }} diff --git a/deployments/helm/openhound/values.yaml b/deployments/helm/openhound/values.yaml index fbab6a4..a495bcc 100644 --- a/deployments/helm/openhound/values.yaml +++ b/deployments/helm/openhound/values.yaml @@ -46,7 +46,6 @@ collector: destination: bloodhoundEnterprise: url: "" - interval: "" resources: { } diff --git a/deployments/helm/values.example.yaml b/deployments/helm/values.example.yaml index c24097b..271922d 100644 --- a/deployments/helm/values.example.yaml +++ b/deployments/helm/values.example.yaml @@ -25,4 +25,3 @@ collector: destination: bloodhoundEnterprise: url: https://test.bloodhoundenterprise.io - interval: "300" diff --git a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_github.toml b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_github.toml index 56640e7..77c7b6f 100644 --- a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_github.toml +++ b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_github.toml @@ -1,5 +1,4 @@ [destination.bloodhoundenterprise] -interval = "300" token_id = "client_token_id" token_key = "client_token_key" url = "bhe_url" diff --git a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_jamf.toml b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_jamf.toml index 5aea0c8..d65f0b8 100644 --- a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_jamf.toml +++ b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_jamf.toml @@ -1,5 +1,4 @@ [destination.bloodhoundenterprise] -interval = "300" token_id = "client_token_id" token_key = "client_token_key" url = "bhe_url" diff --git a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_okta.toml b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_okta.toml index cb8daa8..cdae32b 100644 --- a/example-configurations/bloodhound-enterprise/.dlt-example/secrets_okta.toml +++ b/example-configurations/bloodhound-enterprise/.dlt-example/secrets_okta.toml @@ -1,5 +1,4 @@ [destination.bloodhoundenterprise] -interval = "300" token_id = "client_token_id" token_key = "client_token_key" url = "bhe_url" diff --git a/src/openhound/scheduler/service.py b/src/openhound/scheduler/service.py index 10dc76a..cb25ac5 100644 --- a/src/openhound/scheduler/service.py +++ b/src/openhound/scheduler/service.py @@ -13,6 +13,8 @@ logger = logging.getLogger(__name__) +POLL_INTERVAL = 30 # seconds; fixed poll/check-in cadence, must remain below BHE's 600s client-checkin timeout + class ExtensionNotFoundError(Exception): """Raised when the configured collector extension cannot be found.""" @@ -68,7 +70,6 @@ def __init__( token_key: str, token_id: str, collector_name: str, - interval: int = 15, ): # BHE client settings self.bhe_uri = bhe_uri @@ -77,7 +78,7 @@ def __init__( bhe_uri=bhe_uri, token_key=token_key, token_id=token_id ) # Interval how often to check for a job - self.interval = interval + self.interval = POLL_INTERVAL # Stores the ID of currently running BHE job self.job_running: int | None = None diff --git a/src/scheduler.py b/src/scheduler.py index 63781c9..d18fa16 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -15,12 +15,6 @@ def start(): # Load BHE config and secrets bhe_uri = dlt.config["destination.bloodhoundenterprise.url"] collector_name = dlt.config["destination.bloodhoundenterprise.collector_name"] - interval = dlt.config("destination.bloodhoundenterprise.interval") - try: - interval = int(interval) - except (TypeError, ValueError): - logger.warning(f"Invalid interval: {interval}, defaulting to 15") - interval = 15 # Load BHE secrets token_key = dlt.secrets["destination.bloodhoundenterprise.token_key"] @@ -33,7 +27,6 @@ def start(): token_key=token_key, token_id=token_id, collector_name=collector_name, - interval=interval, ) svc.start() diff --git a/tests/test_bhe_job_scheduling.py b/tests/test_bhe_job_scheduling.py index 19c409b..13ea67c 100644 --- a/tests/test_bhe_job_scheduling.py +++ b/tests/test_bhe_job_scheduling.py @@ -114,7 +114,6 @@ def mock_request(method, url, **kwargs): bhe_uri="http://localhost:8000", token_key="test-key", token_id="test-id", - interval=1, collector_name="openhound-faker", ) @@ -248,8 +247,11 @@ def broken_submit(*args, **kwargs): def test_checkin_calls_jobs_current_when_job_running(mock_service, monkeypatch): - """_checkin() should call jobs_current when a job is running.""" + """_poll() should call jobs_current via the else-branch check-in when a job is running.""" + # Simulate a job in progress with no completed future — skips the completion handler, + # reaches the else-branch, and triggers jobs_current as a check-in heartbeat. mock_service.job_running = 123 + mock_service.future = None # no completed future to handle called = [] def fake_jobs_current(self): @@ -259,14 +261,17 @@ def fake_jobs_current(self): mock_service.client.__class__, "jobs_current", property(fake_jobs_current) ) - mock_service._checkin() + mock_service._poll() assert len(called) == 1 def test_checkin_noop_when_no_job_running(mock_service, monkeypatch): - """_checkin() should not call jobs_current when no job is running.""" + """_poll() should not call jobs_current via the else-branch check-in when no job is running.""" + # When idle (job_running is None), _poll() takes the if-branch and calls check_jobs() + # instead of the else-branch check-in. jobs_current should never be touched. assert mock_service.job_running is None + mock_service.future = None called = [] def fake_jobs_current(self): @@ -275,15 +280,19 @@ def fake_jobs_current(self): monkeypatch.setattr( mock_service.client.__class__, "jobs_current", property(fake_jobs_current) ) + # Stub check_jobs so _poll doesn't try to start a job; we only care the else-branch doesn't fire + monkeypatch.setattr(mock_service, "check_jobs", lambda: None) - mock_service._checkin() + mock_service._poll() assert len(called) == 0 def test_checkin_swallows_exception(mock_service, monkeypatch): - """_checkin() should swallow exceptions from jobs_current without re-raising.""" + """_poll() should swallow exceptions raised by jobs_current in the check-in else-branch.""" + # A transient BHE error during check-in must not crash the service loop. mock_service.job_running = 123 + mock_service.future = None # no completed future to handle def raise_error(self): raise RuntimeError("BHE unreachable") @@ -292,8 +301,8 @@ def raise_error(self): mock_service.client.__class__, "jobs_current", property(raise_error) ) - # Should not raise - mock_service._checkin() + # Should not raise — _poll's except block absorbs the error + mock_service._poll() def test_scheduler_ingest_opengraph(mock_service, mock_bloodhound_api, monkeypatch): From 01f28044ac691c3ce19a88f3e9f011cd3e803eba Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Thu, 25 Jun 2026 13:32:02 -0500 Subject: [PATCH 8/8] fix typo in enterprise docker-compose --- example-configurations/bloodhound-enterprise/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example-configurations/bloodhound-enterprise/docker-compose.yml b/example-configurations/bloodhound-enterprise/docker-compose.yml index b489a12..ea0febb 100644 --- a/example-configurations/bloodhound-enterprise/docker-compose.yml +++ b/example-configurations/bloodhound-enterprise/docker-compose.yml @@ -1,5 +1,5 @@ x-scheduler: &scheduler - image: specterops/openhound:${IMAGE_VERSION:latest-enterprise} + image: specterops/openhound:${IMAGE_VERSION:-latest-enterprise} restart: unless-stopped init: true volumes: