feat: auto-tiered sync, parquet sidecar cache, incremental extraction#127
feat: auto-tiered sync, parquet sidecar cache, incremental extraction#127BeArchiTek wants to merge 5 commits into
Conversation
Adds three layered features to `infrahub-sync` (auto-tiering →
parquet/sidecar cache → cursor-driven incremental) plus a CLI
defaults flip and a bench harness, all on a single commit toward main
now that `chore/migrate-to-ty` is merged.
## 1. Auto-tiered sync order
* `infrahub_sync/dependency_graph.py`: `build_dependency_graph`,
`compute_tiers` (with optional-edge cycle breaking), `flatten_tiers`.
* `SyncConfig.compute_order()` falls back to auto-tiered when `order:`
is omitted from `config.yml`.
* `infrahub-sync sync --parallel` (now default on) runs tier-by-tier
via `Potenda.sync_in_tiers`; warns and falls back to serial when
`order:` is set explicitly.
* All `examples/*/config.yml` have their `order:` lists commented out
with an "uncomment to override" note — auto-tier is the default path.
## 2. Parquet sidecar cache + apply workflow
* `infrahub_sync/cache/` scaffold: run_id allocation, atomic Parquet
writes (`PLAN_SCHEMA`, `ERRORS_SCHEMA`, per-resource
`write_resource_side` with `_extract_ts` / `_source_id` /
`_tombstone`), JSON sidecars (cursors, rowcounts, run.json,
schema-sub-hash), cross-process `pipeline_lock`, and a
`RowcountGuardrail` that rejects >50 % drops without
`--allow-rowcount-drop`.
* `Potenda` writes side A/B parquet snapshots after each load and
serializes the diff into `plan.parquet`.
* `infrahub-sync diff` wraps in `pipeline_lock` and persists run.json
+ plan.parquet.
* New `infrahub-sync apply --run-id <id>` re-applies a cached plan;
refuses on schema-subhash mismatch.
## 3. Concurrent loads + sync writes cache artifacts
* `Potenda` loads source and destination concurrently by default
(`--concurrent-load`). Disable for non-thread-safe custom adapters.
* `infrahub-sync sync` (serial and `--parallel`) writes plan.parquet
+ run.json + baseline rowcounts on success.
* New `--continue-on-error` logs and skips peer relationships whose
identifiers are missing instead of aborting.
## 4. Incremental (changed-since) extraction
* `DiffSyncMixin` adapter contract: `cursor_tier_for`,
`list_changed_since`, `list_existing_ids`. Default tier is NONE.
* `Potenda.load_one_side` chooses incremental vs full based on a
schema-subhash gate, a `RunCounterFile`-driven periodic full-resync
cadence (default every 10 runs, configurable via
`incremental.full_resync_every`), and the `--full-extract` CLI flag.
* `infrahub_sync/cache/incremental.py`: `previous_successful_run_dir`,
`should_use_incremental`, `hydrate_from_parquet` (replays prior
side snapshots into the adapter store), and per-side cursor sidecars
(`load_cursors` / `persist_cursors`).
* Adapters wired:
- NetBox / Nautobot: TIMESTAMP via pynetbox/pynautobot
`endpoint.filter(last_updated__gte=…)`. Nautobot gracefully falls
back to a full extract when an endpoint rejects the filter — the
fallback predicate checks `status_code == 400` and parses the
response JSON, not the message wording.
- Infrahub: TIMESTAMP via GraphQL
`node_metadata__updated_at__after`.
* Snapshots include identifier fields (DiffSync needs them on
hydrate); destination re-snapshots post-sync.
## 5. CLI defaults flip
`--parallel` and `--full-extract` now default to True; `--concurrent-load`
already did. A stock `infrahub-sync sync` runs tier-by-tier on the
auto-computed dep graph and re-extracts every resource each run; the
cursor-driven warm path is opt-in via `--no-full-extract`.
## 6. Bench
`scripts/bench-clean-nautobot.sh` rebuilds Infrahub between scenarios so
each cold+warm pair sees a fresh destination. Recent results on the
nautobot-v2 demo dataset (interfaces dropped, ~14 kinds):
| Scenario | Cold | Warm |
| ------------------------------------------- | -----: | -----: |
| baseline (serial, no concurrent) | 463.8s | 154.3s |
| --parallel + --concurrent-load | 437.5s | 132.3s |
| --no-full-extract (cursor-driven warm) | 434.5s | 6.3s |
`tasks/bench.py` provides an `invoke bench.run` task that compares the
auto-tier × concurrent-load matrix; pass `--scenarios` to subset it.
## Docs
* `docs/docs/guides/run.mdx` — diff/apply/sync flag tables updated.
* `docs/docs/reference/cli.mdx` — regenerated from Typer.
* `docs/docs/reference/cache-layout.mdx` — new: on-disk layout +
apply / `--allow-rowcount-drop` semantics.
* `docs/docs/reference/incremental-extraction.mdx` — new: activation
conditions, supported backends, soft-delete cadence,
`--no-full-extract` opt-in.
* `examples/nautobot-v2_to_infrahub/config.yml` refreshed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (14)
✅ Files skipped from review due to trivial changes (2)
🚧 Files skipped from review as they are similar to previous changes (7)
WalkthroughAdds incremental extraction with cursor tiers, Parquet-backed cache (snapshots, plan, sidecars), auto-tiered write-order computation, and CLI run locking/status with apply-from-plan support. Updates adapters (Infrahub, NetBox, Nautobot) for incremental methods and error handling. Extends Potenda to orchestrate cached loads, concurrent loading, plan serialization/application, rowcount guardrails, and tiered sync. Documentation, examples, sidebar, scripts, tasks, and comprehensive tests are included. |
Deploying infrahub-sync with
|
| Latest commit: |
b25942a
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://da409ad2.infrahub-sync.pages.dev |
| Branch Preview URL: | https://feat-parquet-sidecar-cache.infrahub-sync.pages.dev |
`tests/adapters/test_nautobot_incremental.py:80` imports `pynautobot` inside the fallback regression test. pynautobot is an optional extra, not in `--extra dev`, so ty's import resolution fails in CI even though the test is gated and skipped at runtime when the lib is missing. Add the same `# ty: ignore[unresolved-import] # optional dep, see pyproject extras` annotation already used on the adapter file's top import. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 14
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/test_potenda_parallel.py (1)
85-93:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winStrengthen tier-boundary assertion to require full tier-0 completion.
Line 90 currently only proves that one
Tag/Rolehappened beforeDevice. The docstring/comment says noDevicecall should occur before the previous tier is done. Assert bothTagandRolewere seen before anyDevice.Suggested assertion update
- seen_tier0 = False + seen_tier0: set[str] = set() for _, kind in by_time: if kind == "Device": - assert seen_tier0 + assert seen_tier0 == {"Tag", "Role"} if kind in {"Tag", "Role"}: - seen_tier0 = True + seen_tier0.add(kind)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_potenda_parallel.py` around lines 85 - 93, The current loop in tests/test_potenda_parallel.py only checks that some Tag/Role occurred before a Device by using seen_tier0; update the assertion to require that both a Tag and a Role have completed before any Device call. Replace the boolean seen_tier0 with two flags (e.g., seen_tag and seen_role) and in the for-loop over by_time (sorted dst.calls) set these flags when kind == "Tag" or kind == "Role", and change the Device check to assert seen_tag and seen_role are True before allowing a Device entry.
🧹 Nitpick comments (4)
tests/cache/test_sidecars.py (1)
37-46: ⚡ Quick winAssert all persisted run metadata fields you set.
This test writes
modeandsummary["diff_rows"]but doesn’t verify them after reload, so a partial persistence regression could pass unnoticed.Proposed test delta
def test_run_file_records_status(tmp_path: Path) -> None: f = RunFile.load_or_default(tmp_path / "run.json") f.status = "dry-run" f.mode = "diff" f.summary = {"resources": 17, "diff_rows": 42} f.save() g = RunFile.load_or_default(tmp_path / "run.json") assert g.status == "dry-run" + assert g.mode == "diff" assert g.summary["resources"] == 17 + assert g.summary["diff_rows"] == 42🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/cache/test_sidecars.py` around lines 37 - 46, The test test_run_file_records_status sets RunFile fields status, mode, and summary (including summary["diff_rows"]) but only asserts status and summary["resources"]; update the test to assert that mode was persisted and that summary["diff_rows"] equals 42 after reloading via RunFile.load_or_default so mode and the nested summary key are verified alongside status and resources.tests/adapters/test_nautobot_incremental.py (1)
30-31: ⚡ Quick winScope the
_create_nautobot_clientpatch to avoid cross-test leakage.Direct class-level assignment in
_make_adaptercan leak globally across tests. Preferpatch.object(...)(or pytestmonkeypatch) scoped to adapter construction.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/adapters/test_nautobot_incremental.py` around lines 30 - 31, The test currently assigns NautobotAdapter._create_nautobot_client at class level which can leak the stub across tests; instead, scope the patch to adapter construction by using patch.object(NautobotAdapter, "_create_nautobot_client", ...) or pytest's monkeypatch.setattr inside the test or the _make_adapter helper so the MagicMock is applied only for the duration of creating the NautobotAdapter instance (reference NautobotAdapter._create_nautobot_client and the _make_adapter/_make_adapter-like helper used to return the adapter).infrahub_sync/cache/sidecars.py (1)
46-103: ⚡ Quick winAdd concise docstrings to the remaining public sidecar classes.
RowcountsFile,RunFile, andSchemaHashFileare public and currently undocumented.📝 Suggested update
`@dataclass` class RowcountsFile: + """Stores per-resource row counts for a run.""" path: Path counts: dict[str, int] = field(default_factory=dict) @@ `@dataclass` class RunFile: + """Stores run lifecycle metadata (status/mode/summary/timestamps).""" path: Path @@ `@dataclass` class SchemaHashFile: + """Stores the schema sub-hash used to validate cache compatibility.""" path: Path value: str = ""As per coding guidelines,
infrahub_sync/**/*.py: Public functions and classes require concise docstrings.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infrahub_sync/cache/sidecars.py` around lines 46 - 103, Add concise docstrings to the public sidecar classes RowcountsFile, RunFile, and SchemaHashFile by placing a short one- or two-sentence description immediately under each class definition that explains the purpose of the class and its important attributes/methods (e.g., RowcountsFile: stores resource counts and provides load_or_default, set, get, save; RunFile: tracks run metadata and uses KEYS with load_or_default and save; SchemaHashFile: stores schema hash with load and save). Keep each docstring brief, in imperative/descriptive form, and mention any return semantics where relevant (e.g., load_or_default may return a default instance when the path does not exist).infrahub_sync/potenda/__init__.py (1)
265-274: 💤 Low valueSecond future's exception may be swallowed if both fail.
When using
wait([src_fut, dst_fut], return_when=FIRST_EXCEPTION), if both futures fail, the loop at lines 273-274 will re-raise the first failure fromsrc_fut.result()but never checkdst_fut.result(). This is typically fine since we want to surface at least one error, but the second error's details are lost.This is acceptable behavior for a fail-fast strategy, but consider logging the second exception if it exists, so operators can see all failures.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infrahub_sync/potenda/__init__.py` around lines 265 - 274, The current use of ThreadPoolExecutor with wait(..., return_when=FIRST_EXCEPTION) may re-raise only the first future's exception when iterating over src_fut.result() and dst_fut.result(), losing the second failure's details; update the block that submits self.source_load and self.destination_load (src_fut and dst_fut) to call .result() for each future inside a try/except that re-raises the first exception but also captures and logs (using the module logger) any secondary exception from the other future so both errors are preserved; keep the fail-fast behavior (FIRST_EXCEPTION) but ensure destination_load/source_load exceptions are logged if they occur after the first raised error.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/docs/reference/config.mdx`:
- Around line 39-41: The docs currently imply --parallel is opt-in; update the
infrahub-sync sync docs to state that --parallel is enabled by default and
describe how to disable it (mention the flag name --no-parallel or the CLI
equivalent), and clarify that the engine processes one tier at a time per
destination by narrowing top_level but runs multiple tiers in parallel across
destinations when enabled; update the sentence starting with "Set `--parallel`
on `infrahub-sync sync`" to reflect default-on behavior and include the disable
example and short note about the default semantics.
In `@examples/infrahub_to_peering-manager/config.yml`:
- Around line 16-25: The example config contains an embedded API token (the
literal token on the config line referenced in the comment) that must be
redacted; replace the hard-coded token with a placeholder or environment
variable reference (e.g. INFRAHUB_API_TOKEN or a clearly marked
<REDACTED_TOKEN>) and update any corresponding example usage/comments to
instruct using an env var, and scan the rest of examples for any other
credential-like literals to remove or replace similarly.
In `@examples/netbox_to_infrahub/config.yml`:
- Line 25: Replace the real-looking API token value for the token key in the
example config with a redacted placeholder (e.g. REPLACE_WITH_API_TOKEN or
<REDACTED>) so no credential-like secrets are shipped in examples; locate the
token: entry in the examples/netbox_to_infrahub/config.yml and update its value
accordingly.
In `@examples/peeringdb_to_infrahub/config.yml`:
- Around line 24-27: Redact the sensitive token value in the example config
(replace the literal token string under the token key with a placeholder like
"<REDACTED_TOKEN>" or an instruction to set via env) and fix the typo in the
config key from dyffsync_flags to diffsync_flags so the example uses the correct
setting name (ensure any values under diffsync_flags remain valid YAML/list
format).
In `@infrahub_sync/adapters/netbox.py`:
- Around line 69-75: The _resolve_endpoint function currently lets a bare
AttributeError bubble up for bad mapping strings; wrap the loop that calls
getattr in a try/except AttributeError and raise a specific config validation
error (e.g., ConfigValidationError or ConfigError — create it if it doesn't
exist) with a clear message that includes the offending mapping value and chain
the original exception (raise ... from e) so callers get both context and the
underlying error; ensure you only catch AttributeError (don't use a broad except
Exception:).
In `@infrahub_sync/cache/__init__.py`:
- Around line 21-31: compute_schema_subhash() currently builds payload only from
schema_mapping names/identifiers/field names, which omits mapping-critical
inputs; update the payload construction (the payload variable used in
compute_schema_subhash) to serialize full mapping semantics for each entry in
config.schema_mapping — include mapping definitions, mapping type/flags (e.g.,
field mapping vs reference vs static), per-field mapping details, filters,
transforms and any other mapping-specific options — ensure deterministic
ordering (sort lists/keys) before hashing so changes to mapping, filters or
transforms will change the sub-hash and prevent stale plan/cursor reuse.
In `@infrahub_sync/cache/cursors.py`:
- Around line 20-35: Add concise class docstrings to the public classes
CursorTier and CursorState: for CursorTier, document that it is an IntEnum
representing cache cursor tiers and briefly list/describe the enum members
(NONE, PAGE_TOKEN, TIMESTAMP, INFRAHUB_DIFF); for CursorState, document that it
is a frozen dataclass holding a CursorTier and an optional value, describe the
meaning of the fields (tier and value) and note the __post_init__ constraint
that non-NONE tiers require a non-None value. Place the docstrings immediately
under each class declaration.
In `@infrahub_sync/cache/guardrails.py`:
- Around line 22-29: Add concise docstrings to the public class
RowcountGuardrail and its public method check describing their purpose,
parameters, and behavior: document that RowcountGuardrail tracks previous row
counts, what drop_threshold and allow_drop control, and that triggered records
store resources that exceeded the threshold; for check(resource, *, current)
document parameters (resource: str, current: int), what constitutes a trigger
(percentage drop > drop_threshold), side effects (appends to triggered, raises
or blocks drops depending on allow_drop), and return type (None). Place the
docstrings immediately above the class definition and the check method using
standard triple-quote format.
In `@infrahub_sync/cache/paths.py`:
- Around line 18-21: Validate that sync_name and run_id are single safe path
segments before joining: use pathlib.PurePath(sync_name).parts (and same for
run_id) and reject values where len(parts) != 1 or parts[0] in (".","..") or
os.path.isabs(sync_name) (and run_id). On validation failure raise ValueError
(or a clear custom exception) instead of joining; then join only the validated
segment to the base (INFRAHUB_SYNC_CACHE_DIR or
Path.cwd()/".infrahub-sync-cache"). Apply the same checks where run_id is joined
(the other block mentioned around lines 34-36).
In `@infrahub_sync/cli.py`:
- Around line 260-265: When catching ValueError from ptd.load_both_sides() in
the serial path, set run_file.status = "failed" and persist the run file before
calling print_error_and_abort so run.json no longer incorrectly shows "running";
i.e., in the except block assign run_file.status = "failed" and call the run
file's save/persist method (e.g., run_file.save() or run_file.write()) then call
print_error_and_abort(str(exc)).
In `@tests/adapters/test_nautobot_incremental.py`:
- Line 54: Remove the unused type-ignore annotations on the assignment lines
that set adapter.client.dcim.devices = fake_endpoint (and the equivalent
assignments with fake_endpoint elsewhere in the same test file); delete the
trailing "# ty: ignore[unresolved-attribute]" comments from those statements so
the linter no longer reports unused suppressions, then run the test/lint suite
to confirm nothing else breaks.
In `@tests/cache/test_concurrent_load.py`:
- Around line 50-53: The current check uses only src_start < dst_end which can
be true for sequential runs; instead compute both start and end timestamps for
each adapter (use next(... for label == "start") and next(... for label ==
"end") on src.events and dst.events to get src_start, src_end, dst_start,
dst_end) and assert interval overlap by asserting src_start < dst_end and
dst_start < src_end so the time ranges actually intersect.
In `@tests/cache/test_locks.py`:
- Around line 23-37: The test test_pipeline_lock_excludes_concurrent_run is
using time.sleep to wait for the child process to acquire the lock, which is
flaky; change it to coordinate with the child via a multiprocessing.Event
(create an Event in the parent, pass it into _hold_lock, have _hold_lock set the
Event once it holds the lock), wait on that Event with a bounded timeout before
asserting pipeline_lock("p1", timeout=0.05) raises Timeout, and ensure the child
process is always cleaned up in a finally block (join with timeout and terminate
if still alive) so teardown is bounded.
In `@tests/test_dependency_graph.py`:
- Around line 106-110: The test currently asserts completeness against cfg.order
(loop over cfg.order), which is skipped when order is omitted; change the
assertion to use the schema-mapping names instead: compute the expected name set
from cfg.schema_mapping (e.g., its keys) and then assert each name from that set
appears in flat = set(flatten_tiers(tiers)), replacing the loop that references
cfg.order so the test catches tiering regressions even when order is absent.
---
Outside diff comments:
In `@tests/test_potenda_parallel.py`:
- Around line 85-93: The current loop in tests/test_potenda_parallel.py only
checks that some Tag/Role occurred before a Device by using seen_tier0; update
the assertion to require that both a Tag and a Role have completed before any
Device call. Replace the boolean seen_tier0 with two flags (e.g., seen_tag and
seen_role) and in the for-loop over by_time (sorted dst.calls) set these flags
when kind == "Tag" or kind == "Role", and change the Device check to assert
seen_tag and seen_role are True before allowing a Device entry.
---
Nitpick comments:
In `@infrahub_sync/cache/sidecars.py`:
- Around line 46-103: Add concise docstrings to the public sidecar classes
RowcountsFile, RunFile, and SchemaHashFile by placing a short one- or
two-sentence description immediately under each class definition that explains
the purpose of the class and its important attributes/methods (e.g.,
RowcountsFile: stores resource counts and provides load_or_default, set, get,
save; RunFile: tracks run metadata and uses KEYS with load_or_default and save;
SchemaHashFile: stores schema hash with load and save). Keep each docstring
brief, in imperative/descriptive form, and mention any return semantics where
relevant (e.g., load_or_default may return a default instance when the path does
not exist).
In `@infrahub_sync/potenda/__init__.py`:
- Around line 265-274: The current use of ThreadPoolExecutor with wait(...,
return_when=FIRST_EXCEPTION) may re-raise only the first future's exception when
iterating over src_fut.result() and dst_fut.result(), losing the second
failure's details; update the block that submits self.source_load and
self.destination_load (src_fut and dst_fut) to call .result() for each future
inside a try/except that re-raises the first exception but also captures and
logs (using the module logger) any secondary exception from the other future so
both errors are preserved; keep the fail-fast behavior (FIRST_EXCEPTION) but
ensure destination_load/source_load exceptions are logged if they occur after
the first raised error.
In `@tests/adapters/test_nautobot_incremental.py`:
- Around line 30-31: The test currently assigns
NautobotAdapter._create_nautobot_client at class level which can leak the stub
across tests; instead, scope the patch to adapter construction by using
patch.object(NautobotAdapter, "_create_nautobot_client", ...) or pytest's
monkeypatch.setattr inside the test or the _make_adapter helper so the MagicMock
is applied only for the duration of creating the NautobotAdapter instance
(reference NautobotAdapter._create_nautobot_client and the
_make_adapter/_make_adapter-like helper used to return the adapter).
In `@tests/cache/test_sidecars.py`:
- Around line 37-46: The test test_run_file_records_status sets RunFile fields
status, mode, and summary (including summary["diff_rows"]) but only asserts
status and summary["resources"]; update the test to assert that mode was
persisted and that summary["diff_rows"] equals 42 after reloading via
RunFile.load_or_default so mode and the nested summary key are verified
alongside status and resources.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8ce9723b-ed4c-4e2e-8ff6-35672a1ed9d2
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (73)
.gitignore.vale/styles/spelling-exceptions.txtdocs/docs/guides/run.mdxdocs/docs/readme.mdxdocs/docs/reference/cache-layout.mdxdocs/docs/reference/cli.mdxdocs/docs/reference/config.mdxdocs/docs/reference/incremental-extraction.mdxdocs/sidebars.tsexamples/aci_to_infrahub/config.ymlexamples/custom_adapter/config.ymlexamples/device42_to_infrahub/config.ymlexamples/infrahub_to_peering-manager/config.ymlexamples/ipfabric_to_infrahub/config.ymlexamples/librenms_to_infrahub/config.ymlexamples/nautobot-v1_to_infrahub/config.ymlexamples/nautobot-v2_to_infrahub/config.ymlexamples/netbox_to_infrahub/config.ymlexamples/observium_to_infrahub/config.ymlexamples/peering-manager_to_infrahub/config.ymlexamples/peeringdb_to_infrahub/config.ymlexamples/prometheus_to_infrahub (node_exporter)/config.ymlexamples/slurpit_to_infrahub/config.ymlinfrahub_sync/__init__.pyinfrahub_sync/adapters/infrahub.pyinfrahub_sync/adapters/nautobot.pyinfrahub_sync/adapters/netbox.pyinfrahub_sync/cache/__init__.pyinfrahub_sync/cache/cursors.pyinfrahub_sync/cache/guardrails.pyinfrahub_sync/cache/incremental.pyinfrahub_sync/cache/locks.pyinfrahub_sync/cache/parquet_io.pyinfrahub_sync/cache/paths.pyinfrahub_sync/cache/sidecars.pyinfrahub_sync/cli.pyinfrahub_sync/dependency_graph.pyinfrahub_sync/potenda/__init__.pyinfrahub_sync/utils.pypyproject.tomlscripts/bench-clean-nautobot.shscripts/bench-incremental-only.shtasks/__init__.pytasks/bench.pytests/adapters/test_infrahub_incremental.pytests/adapters/test_infrahub_peer_identifier.pytests/adapters/test_nautobot_incremental.pytests/adapters/test_netbox_incremental.pytests/cache/__init__.pytests/cache/test_apply_plan.pytests/cache/test_cli_sync_cache.pytests/cache/test_concurrent_load.pytests/cache/test_cursors.pytests/cache/test_guardrails.pytests/cache/test_incremental_engine.pytests/cache/test_incremental_helpers.pytests/cache/test_locks.pytests/cache/test_parquet_io.pytests/cache/test_paths.pytests/cache/test_plan_serialization.pytests/cache/test_potenda_cache_hook.pytests/cache/test_run_counter.pytests/cache/test_schema_subhash_persist.pytests/cache/test_sidecars.pytests/cache/test_sync_cache_flow.pytests/test_cli_full_extract.pytests/test_cli_parallel.pytests/test_dependency_graph.pytests/test_diffsync_mixin_contract.pytests/test_get_potenda_top_level.pytests/test_potenda_parallel.pytests/test_potenda_tiers.pytests/test_sync_config_order.py
💤 Files with no reviewable changes (1)
- .vale/styles/spelling-exceptions.txt
Verified each of the 14 inline comments. Fixes (11) + skipped (3, with
reasoning below):
## Correctness / security
- `infrahub_sync/cache/__init__.py`: `compute_schema_subhash` now hashes
the full mapping semantics — per-resource `mapping`, per-field
`mapping`/`reference`/`static`, plus `filters` and `transforms`. The
previous payload only captured names + identifiers, so editing a
filter or transform would not bust the cache and a stale plan could
be reused.
- `infrahub_sync/cache/paths.py`: validate `sync_name` and `run_id` are
single relative path segments (reject absolute paths and `..`).
Closes a path-traversal risk in `infrahub-sync apply --run-id <id>`
where the operator-supplied id is joined into a filesystem path.
- `infrahub_sync/cli.py`: serial sync path now sets
`run_file.status = "failed"` and saves before calling
`print_error_and_abort()` when `ptd.load_both_sides()` raises. The
parallel path already did this; the serial fork was leaking
`status: "running"` in run.json after a failed load.
## Robustness
- `infrahub_sync/adapters/{netbox,nautobot}.py`: `_resolve_endpoint`
now wraps the `getattr` walk so a bad `schema_mapping.mapping`
string surfaces as `ValueError("Invalid <vendor> mapping path …")`
instead of a bare `AttributeError`.
- `examples/netbox_to_infrahub/config.yml`: revert the token to the
placeholder — my squash inadvertently restored an older real-looking
token. The placeholder matches main.
## Tests
- `tests/cache/test_concurrent_load.py`: the original
`assert src_start < dst_end` was trivially true for sequential
execution. Tightened to `assert max(starts) < min(ends)` so the
test actually proves overlap.
- `tests/test_dependency_graph.py`: the completeness assertion looped
over `cfg.order`, which is empty when the example opts into
auto-tiering — silently a no-op. Replaced with a loop over
`cfg.schema_mapping` kinds so tiering regressions get caught.
## Docs / style
- `docs/docs/reference/config.mdx`: document `--parallel` as on by
default and mention `--no-parallel`.
- `infrahub_sync/cache/cursors.py`: add concise class docstrings on
`CursorTier` and `CursorState`.
- `infrahub_sync/cache/guardrails.py`: add docstrings on
`RowcountGuardrail` and `.check()`.
## Skipped
- Pre-existing token at `examples/infrahub_to_peering-manager/config.yml:14`
and `examples/peeringdb_to_infrahub/config.yml:12`, plus the
`dyffsync_flags` typo at `peeringdb_to_infrahub/config.yml:22`: all
pre-existing on main, outside this PR's diff. Worth fixing in a
dedicated redaction sweep, not bundled here.
- `tests/cache/test_locks.py:37` sleep-based synchronization: the
flake hypothesis is real but the test passes consistently and the
multiprocessing.Event refactor adds complexity without an observed
failure to justify it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
infrahub_sync/adapters/netbox.py (1)
168-171:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winNetBox model_loader lacks ValidationError handling present in NautobotAdapter.
The Nautobot adapter's
model_loaderhandlesValidationErrorwithcontinue_on_error, but this NetBox implementation does not. This inconsistency means--continue-on-errorwon't work the same way for NetBox sources.Suggested fix to add consistent error handling
# Create model instances after filtering and transforming for data in self._records_to_diffsync(element=element, model=model, raw_records=raw_records): - item = model(**data) - self.add(item) + continue_on_error = getattr(self, "continue_on_error", False) + try: + item = model(**data) + except ValidationError as exc: + if not continue_on_error: + raise + logger.warning( + "Skipping %s[%s]: cannot build DiffSync model " + "(likely a required peer was skipped earlier). Pydantic errors: %s", + model_name, + data.get("local_id"), + exc.errors(include_url=False), + ) + continue + self.add(item)Also add the import at the top:
from pydantic import ValidationError🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infrahub_sync/adapters/netbox.py` around lines 168 - 171, The NetBox adapter's model_loader loop currently constructs model(**data) and self.add(item) without catching pydantic ValidationError like the NautobotAdapter does; update model_loader to import pydantic.ValidationError and wrap the model(**data)/self.add(item) in a try/except that catches ValidationError and, when continue_on_error is true, logs the error and continues, otherwise re-raises the exception, mirroring the behavior implemented in NautobotAdapter's model_loader.
🧹 Nitpick comments (1)
examples/netbox_to_infrahub/config.yml (1)
25-25: 💤 Low valueConsider using a more explicit placeholder for the API token.
While the all-
astring is not a real token, it still resembles a valid NetBox API token format and may trigger secret scanners. A more explicit placeholder would be clearer for users.Suggested change
- token: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + token: "${NETBOX_TOKEN}" # or: "<REDACTED_NETBOX_TOKEN>"As per coding guidelines:
examples/**/*.{yaml,yml}: Example configurations must be minimal, accurate, and redacted of sensitive information.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@examples/netbox_to_infrahub/config.yml` at line 25, The example uses a realistic-looking API token string for the token key which can trigger secret scanners; replace the current value for the token field with an explicit, clearly redacted placeholder (e.g., "REDACTED_TOKEN" or "<NETBOX_API_TOKEN>") so the token key remains visible but no plausible secret is present and update any nearby comment to indicate this is a placeholder for the NetBox API token.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@infrahub_sync/adapters/netbox.py`:
- Around line 168-171: The NetBox adapter's model_loader loop currently
constructs model(**data) and self.add(item) without catching pydantic
ValidationError like the NautobotAdapter does; update model_loader to import
pydantic.ValidationError and wrap the model(**data)/self.add(item) in a
try/except that catches ValidationError and, when continue_on_error is true,
logs the error and continues, otherwise re-raises the exception, mirroring the
behavior implemented in NautobotAdapter's model_loader.
---
Nitpick comments:
In `@examples/netbox_to_infrahub/config.yml`:
- Line 25: The example uses a realistic-looking API token string for the token
key which can trigger secret scanners; replace the current value for the token
field with an explicit, clearly redacted placeholder (e.g., "REDACTED_TOKEN" or
"<NETBOX_API_TOKEN>") so the token key remains visible but no plausible secret
is present and update any nearby comment to indicate this is a placeholder for
the NetBox API token.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3589ad84-c977-4812-b7e9-89778352a758
📒 Files selected for processing (12)
docs/docs/reference/config.mdxexamples/netbox_to_infrahub/config.ymlinfrahub_sync/adapters/nautobot.pyinfrahub_sync/adapters/netbox.pyinfrahub_sync/cache/__init__.pyinfrahub_sync/cache/cursors.pyinfrahub_sync/cache/guardrails.pyinfrahub_sync/cache/paths.pyinfrahub_sync/cli.pytests/adapters/test_nautobot_incremental.pytests/cache/test_concurrent_load.pytests/test_dependency_graph.py
✅ Files skipped from review due to trivial changes (1)
- docs/docs/reference/config.mdx
minitriga
left a comment
There was a problem hiding this comment.
Overall this is a high-quality PR with a well-considered architecture. The bench numbers are compelling. A few issues worth addressing before merging — see inline comments. The main asks: fix the double filter_records call in netbox.py, collapse the two compute_tiers calls, guard the empty-snapshot hydration case, replace assert type-narrowing with explicit if/raise, and confirm the .vale removals are intentional.
| logger.info("%s: Loading %d/%d %s", self.type, len(filtered_objs), total, resource_name) | ||
| # Transform records | ||
| transformed_objs = model.transform_records(records=filtered_objs, schema_mapping=element) | ||
| filtered_count = len(model.filter_records(records=raw_records, schema_mapping=element)) |
There was a problem hiding this comment.
Double filter_records call. filter_records is called here to derive a log count, then _records_to_diffsync (line 169) calls it a second time internally via the same filter_records → transform_records pipeline. Every record gets filtered twice per load.
Consider computing the filtered list once and passing it to _records_to_diffsync (or adding an optional pre-filtered path), or simply omitting the log count here and relying on the count logged inside _records_to_diffsync.
| as specified in the schema mapping, and loads the processed data into the adapter. | ||
| """ | ||
| # Retrieve schema mapping for this model | ||
| for element in self.config.schema_mapping: |
There was a problem hiding this comment.
Asymmetric logging vs NetBox. The original model_loader logged Loading X/Y (filtered/total); the NetBox path still does this. Nautobot's refactored path only logs the total (or nothing for the source path). Minor inconsistency but can confuse operators comparing adapter output during a real sync run.
| if not explicit_order: | ||
| from infrahub_sync.dependency_graph import compute_tiers | ||
|
|
||
| tiers, _dropped = compute_tiers(sync_instance.schema_mapping) |
There was a problem hiding this comment.
compute_tiers called twice on the hot path. compute_order() (called on line 238) internally calls compute_tiers again when order is empty. This PR therefore runs two topological sorts of the same graph on every invocation.
Consider having compute_order() return the tiers alongside the flat list, or passing the already-computed tiers result from here into a leaner version of compute_order that skips the sort.
| { | ||
| "_extract_ts": pa.array([], type=pa.timestamp("ns", tz="UTC")), | ||
| "_source_id": pa.array([], type=pa.string()), | ||
| "_tombstone": pa.array([], type=pa.bool_()), |
There was a problem hiding this comment.
Empty snapshot loses resource columns. When rows is empty, the written Parquet file has only the three internal columns (_extract_ts, _source_id, _tombstone). When hydrate_from_parquet later reads this file, cols = [c for c in table.column_names if c not in SNAPSHOT_INTERNAL_COLUMNS] will be [], so every add_row call receives {} — pydantic validation will fail on any required identifier field.
This matters concretely when an adapter returns zero destination records on a first diff (common for a fresh Infrahub), then a subsequent warm run tries to hydrate side B from that empty snapshot. Suggest writing an empty table with the correct per-resource schema, or skipping snapshot hydration when the file has zero rows and falling back to model_loader.
| concurrent_load=concurrent_load, | ||
| ) | ||
| ptd.run_id = rid # expose for CLI logging | ||
| ptd.cache_root = rdir.parent # .infrahub-sync-cache/<sync_name>/ |
There was a problem hiding this comment.
Post-construction attribute mutation. run_id, cache_root, _schema_subhash, and force_full_extract are all set on ptd after the constructor returns — the object is in a partially-initialised state between Potenda(...) and the last assignment on line 284. If Potenda.__init__ ever calls into a method that reads any of these (or a subclass overrides __init__), it will see the None/empty defaults declared in potenda/__init__.py rather than the intended values.
These four fields are already declared in Potenda.__init__ (lines 55–57 and friends); consider promoting them to constructor parameters so the object is fully valid on construction.
| sync_instance = get_instance(name=name, config_file=config_file, directory=directory) | ||
| if not sync_instance: | ||
| print_error_and_abort("Failed to load sync instance.") | ||
| assert sync_instance is not None # type-narrowing; print_error_and_abort raises |
There was a problem hiding this comment.
assert for type-narrowing in production code. assert statements are eliminated at python -O (optimised runtime). If the runner ever invokes with -O, this becomes a silent None-dereference on the next line. Same pattern on lines 144, 216, 241, 308, and 318.
Replace with an explicit check:
if sync_instance is None:
raise RuntimeError("sync_instance is unexpectedly None")Or, since print_error_and_abort already raises typer.Exit, annotate its return type as NoReturn so the type checker narrows without any runtime guard needed.
| return field in str(exc) | ||
| if isinstance(payload, dict) and field in payload: | ||
| return True | ||
| return field in str(exc) |
There was a problem hiding this comment.
Overly broad fallback in _is_unknown_filter_error. The final return field in str(exc) fires when JSON parsing failed, but str(exc) can include the field name in unrelated contexts (log messages, repr of nested objects, HTTP body fragments). The JSON-key check on line 46 is reliable and specific; the string-match fallback may produce false positives for a 400 that happens to mention last_updated__gte for a different reason.
Consider tightening: only use the JSON check, or require both status_code == 400 and at least one of the field-in-JSON conditions to be satisfied before treating it as an unknown-filter error.
| optional = _collect_optional_edges(schema_mapping) | ||
| dropped: list[tuple[str, str]] = [] | ||
|
|
||
| for _ in range(_MAX_CYCLE_BREAK_ATTEMPTS): |
There was a problem hiding this comment.
Cycle-breaking loop drops one edge per iteration. With 50 max attempts, each breaking only one optional edge and re-running a full topological sort, this is O(n_cycles × sort_cost) in the worst case. For real schema_mappings (tens of kinds) this is fine, but the budget is invisible from the error message.
A simpler and faster approach: collect all optional edges appearing in any reported cycle in a single pass, drop them all, and retry once. That reduces it to at most 2 sorts regardless of cycle count.
| location (e.g., an NFS mount used by a fleet of runners). | ||
| """ | ||
| _require_safe_segment(sync_name, "sync_name") | ||
| base = os.environ.get("INFRAHUB_SYNC_CACHE_DIR") |
There was a problem hiding this comment.
INFRAHUB_SYNC_CACHE_DIR env-var not sanitised. cache_root_for calls _require_safe_segment on sync_name (line 33) but not on the INFRAHUB_SYNC_CACHE_DIR value itself. A misconfigured or malicious env var like INFRAHUB_SYNC_CACHE_DIR=/etc would be accepted and used as the base. The risk is low in practice (only an operator can set env vars), but it's worth documenting or adding a validation note.
| if not path.exists(): | ||
| return cls(path=path) | ||
| data = json.loads(path.read_text(encoding="utf-8")) | ||
| return cls(path=path, **{k: data.get(k) for k in cls.KEYS if data.get(k) is not None}) |
There was a problem hiding this comment.
load_or_default silently drops keys whose stored value is None. The if data.get(k) is not None guard means a serialised {"status": null} is treated identically to a missing key and falls back to the dataclass default ("pending"). For finished_at this is intentional. For status, it would silently reset a stored null to "pending", which could mask a corrupted run file.
Consider using k in data instead of data.get(k) is not None and letting None be a valid stored value for fields that allow it.
| context: Context, | ||
| name: str = "from-netbox", | ||
| directory: str = "examples/", | ||
| csv_out: str = "bench-results.csv", |
There was a problem hiding this comment.
Bench artefacts written to the repo root by default. bench-results.csv (line 75) and .bench-filtered-config.yml (line 113) both default to repo_root / <name>. Neither is in .gitignore. Anyone running invoke bench.run will get a dirty worktree. Consider defaulting csv_out to a temp directory or at least adding both paths to .gitignore.
| Containerlab | ||
| content_type | ||
| convert_query_response | ||
| coroutine |
There was a problem hiding this comment.
Intentional spelling-exception removals? This patch removes diffsync, Typer, Prefect, Slurp'it, cron, cutover, and declaratively from the Vale spelling exceptions. diffsync and Typer are still used in the codebase and docs. If these were removed to silence unrelated Vale noise, that's fine — but if it was accidental collateral, it will cause doc linting failures.
Redact credential-like API tokens in the peering example configs (including the unflagged peering-manager_to_infrahub sibling carrying the same token) to the placeholder convention. Correct the dyffsync_flags -> diffsync_flags typo in peeringdb_to_infrahub, which silently dropped SKIP_UNMATCHED_SRC. Replace the sleep-based synchronisation in test_pipeline_lock_excludes_concurrent_run with a multiprocessing.Event signalled once the child holds the lock, plus a bounded finally teardown, to remove a CI race. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- adapters: filter records once per load (drop double filter_records) and log filtered/total symmetrically across NetBox and Nautobot - nautobot: tighten _is_unknown_filter_error so a non-JSON 400 only matches when the body names the field AND mentions a filter, avoiding false positives - cli: drop asserts used for type-narrowing (stripped under python -O); raise explicitly when run_dir is unexpectedly missing - dependency_graph: break all optional edges found in cycles in one pass instead of one edge per topological sort - cache/paths: expand and reject '..' traversal in INFRAHUB_SYNC_CACHE_DIR - cache/sidecars: RunFile.load_or_default keeps a stored null instead of silently resetting it to the dataclass default - engine: compute order and tiers in a single topological pass; pass run_id, cache_root and schema_subhash to Potenda.__init__ so the object is valid on construction rather than mutated afterwards - gitignore bench artifacts; restore still-used Vale spelling exceptions Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
New Features
orderfield is now optional.diff) and apply operations for repeatable syncs.--parallelflag.--concurrent-load,--full-extract,--allow-rowcount-drop,--continue-on-error,--run-id.Documentation
Chores
.infrahub-sync-cache/to.gitignore.Three layered features on top of the recently-landed ty migration, plus a CLI defaults flip and a bench harness. One commit so the history is easy to review and revert as a unit.
What's in here
1. Auto-tiered sync order
infrahub_sync/dependency_graph.py:build_dependency_graph,compute_tiers(with optional-edge cycle breaking),flatten_tiers.SyncConfig.compute_order()falls back to auto-tiered whenorder:is omitted.infrahub-sync sync --parallel(now default on) runs tier-by-tier viaPotenda.sync_in_tiers; warns and falls back to serial whenorder:is set explicitly.examples/*/config.ymlhave theirorder:lists commented out with an "uncomment to override" note.2. Parquet sidecar cache + apply workflow
infrahub_sync/cache/scaffold: atomic Parquet writes (PLAN_SCHEMA,ERRORS_SCHEMA, per-resourcewrite_resource_sidewith_extract_ts/_source_id/_tombstone), JSON sidecars (cursors, rowcounts, run.json, schema-sub-hash), cross-processpipeline_lock, and aRowcountGuardrailthat rejects >50% drops without--allow-rowcount-drop.Potendawrites side A/B parquet snapshots after each load and serializes the diff intoplan.parquet.infrahub-sync diffwraps inpipeline_lockand persists run.json + plan.parquet.infrahub-sync apply --run-id <id>re-applies a cached plan; refuses on schema-subhash mismatch.3. Concurrent loads + sync writes cache artifacts
Potendaloads source and destination concurrently by default (--concurrent-load). Disable for non-thread-safe custom adapters.infrahub-sync sync(serial and--parallel) writes plan.parquet + run.json + baseline rowcounts on success.--continue-on-errorlogs and skips peer relationships whose identifiers are missing instead of aborting.4. Incremental (changed-since) extraction
DiffSyncMixinadapter contract:cursor_tier_for,list_changed_since,list_existing_ids. Default tier NONE.Potenda.load_one_sidepicks incremental vs full based on a schema-subhash gate, aRunCounterFile-driven periodic full-resync cadence (default every 10 runs, configurable viaincremental.full_resync_every), and--full-extract.endpoint.filter(last_updated__gte=…). Nautobot gracefully falls back to a full extract when an endpoint rejects the filter (predicate checksstatus_code == 400and parses the response JSON, not the message wording).node_metadata__updated_at__after.5. CLI defaults flip
--parallel,--full-extract, and--concurrent-loaddefault to True.A stock
infrahub-sync syncruns tier-by-tier on the auto-computed dep graph and re-extracts every resource each run.The cursor-driven warm path is opt-in via
--no-full-extract.Bench numbers (nautobot-v2 demo dataset, interfaces dropped, ~14 kinds)
Reproducer:
scripts/bench-clean-nautobot.shrebuilds Infrahub between scenarios so each cold+warm pair sees a fresh destination.Docs
docs/docs/guides/run.mdx— diff/apply/sync flag tables updated.docs/docs/reference/cli.mdx— regenerated.docs/docs/reference/cache-layout.mdx— new.docs/docs/reference/incremental-extraction.mdx— new.examples/nautobot-v2_to_infrahub/config.ymlrefreshed.Test plan
uv run invoke lintexits 0.uv run pytest -o "addopts=" tests/ -q→ 93 passed (13 known fails ontests/adapters/test_{netbox,nautobot}_incremental.pydue to optional pynetbox/pynautobot not in the basic dev env).uv run infrahub-sync list --directory examples/lists all projects.uv run infrahub-sync --help,diff --help,sync --help,apply --helpexit 0.🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
--parallelflag--concurrent-loadflag to speed up dual-side data loading--full-extract/--no-full-extractflags to control extraction mode--allow-rowcount-drop) to prevent accidental data loss--continue-on-errorflag to tolerate missing peer identifiersapplycommand to replay cached sync plansDocumentation
Improvements