Add non otel profiling #2044

Open
jperez999 wants to merge 3 commits into NVIDIA:main from jperez999:add-non-otel-profiling

Conversation

@jperez999
Collaborator

Description

This PR adds minimal non-OTEL profiling support for local analysis of memory and storage usage throughout a run. It also adds a tool to visualize the output of the profiling run.

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.
  • If adjusting docker-compose.yaml environment variables, have you ensured those are mimicked in the Helm values.yaml file?

@jperez999 jperez999 self-assigned this May 15, 2026
@jperez999 jperez999 requested review from a team as code owners May 15, 2026 13:28
@jperez999 jperez999 requested a review from nkmcalli May 15, 2026 13:28
@greptile-apps
Contributor

greptile-apps Bot commented May 15, 2026

Greptile Summary

This PR adds opt-in, non-OTEL profiling (NR_STAGE_TIMING=1) to the Ray batch executor: a detached named actor collects per-batch timing and memory records emitted by AbstractOperator.run, a driver-side sampler thread snapshots PSS-based system memory every second, and a new stage_timing_viz.py CLI renders the JSON output as charts.

  • abstract_operator.py wraps the preprocess/process/postprocess chain with time.perf_counter and psutil RSS snapshots, falling back cleanly when profiling is disabled.
  • executor.py manages the collector/sampler lifecycle around ds.to_pandas() and propagates env vars into the Ray runtime — but three ds.repartition() calls have been accidentally commented out, silently disabling GLOBAL_BATCH_GROUP_KEYS partitioning and target_num_rows_per_block tuning for all runs regardless of whether profiling is on.
  • stage_timing.py (new, ~620 lines) implements the collector actor and _MemorySampler thread; the module-level _collector_cache in worker processes is never invalidated after stop_collector kills the actor, causing all timing records to be silently dropped on any subsequent ingest() call in the same Ray session.
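The per-phase timing described in the first bullet can be illustrated with a minimal sketch. This is not the PR's code: `run_timed` and `PhaseTimings` are hypothetical names, the real `AbstractOperator.run` also takes memory snapshots, and the real `StageRecord` may carry different fields.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class PhaseTimings:
    """Hypothetical record type; not the PR's StageRecord."""

    pre_ms: float
    proc_ms: float
    post_ms: float


def run_timed(
    batch: Any,
    preprocess: Callable[[Any], Any],
    process: Callable[[Any], Any],
    postprocess: Callable[[Any], Any],
    enabled: bool,
) -> Tuple[Any, Optional[PhaseTimings]]:
    """Run the three phases, reading the clock only when profiling is on."""
    if not enabled:
        # Fast path: identical behaviour, no timing overhead.
        return postprocess(process(preprocess(batch))), None

    t0 = time.perf_counter()
    batch = preprocess(batch)
    t1 = time.perf_counter()
    batch = process(batch)
    t2 = time.perf_counter()
    batch = postprocess(batch)
    t3 = time.perf_counter()
    timings = PhaseTimings((t1 - t0) * 1e3, (t2 - t1) * 1e3, (t3 - t2) * 1e3)
    return batch, timings
```

Keeping the disabled path free of clock reads is what makes "falling back cleanly" cheap: the operator result is the same either way, and only the optional timings differ.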

Confidence Score: 2/5

Not safe to merge as-is: the commented-out repartition calls affect all pipeline runs, not just profiling runs, meaning operators that rely on GLOBAL_BATCH_GROUP_KEYS co-location will silently receive wrong partitioning in production.

Two real defects are present. The repartition regression is unconditional — it activates even when NR_STAGE_TIMING is off — so every pipeline execution is affected. The stale worker cache means multi-run timing sessions produce silently empty reports. Both are current, observable misbehaviours on the changed code paths.

executor.py requires the repartition calls to be restored before any merge; stage_timing.py requires the worker-side cache invalidation strategy to be revisited.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_retriever/src/nemo_retriever/graph/executor.py | Adds timing collector lifecycle around ingest(); however three ds.repartition() calls are commented out and replaced with pass, silently disabling GLOBAL_BATCH_GROUP_KEYS partitioning and target_num_rows_per_block behaviour. |
| nemo_retriever/src/nemo_retriever/utils/stage_timing.py | New 620-line profiling module; module-level _collector_cache never resets its tried flag so worker processes use a stale handle after the actor is recycled between ingest() calls. Several except Exception: pass blocks also violate no-bare-except. Copyright year is 2024-25 instead of 2026. |
| nemo_retriever/src/nemo_retriever/graph/abstract_operator.py | Wraps preprocess/process/postprocess with timing instrumentation when enabled; falls back cleanly when profiling is off. Module-level psutil.Process() is called at import time but is low-risk. |
| nemo_retriever/src/nemo_retriever/graph/operator_archetype.py | Propagates _nr_stage_name from the archetype to its resolved delegate so timing records consistently use the pipeline node name rather than the variant class name. Change is minimal and correct. |
| nemo_retriever/src/nemo_retriever/utils/stage_timing_viz.py | New CLI visualisation tool; print() usage is acceptable for a CLI script. Copyright year is 2024-25 instead of 2026. No tests added. |
| nemo_retriever/src/nemo_retriever/utils/stage_timing_memory.md | Documentation-only file explaining PSS vs RSS accounting and baseline subtraction design decisions. No code issues. |

Sequence Diagram

sequenceDiagram
    participant Driver as RayDataExecutor (driver)
    participant Collector as StageTimingCollector (Ray actor)
    participant Sampler as _MemorySampler (thread)
    participant Worker as Ray Worker (AbstractOperator.run)

    Driver->>Collector: start_collector() create named detached actor
    Driver->>Sampler: start_memory_sampler(collector)
    loop every 1 s
        Sampler->>Collector: collector.record_sample.remote(SystemSample)
    end
    loop for each node in pipeline
        Driver->>Worker: ds.map_batches(operator_cls, ...)
        Worker->>Worker: preprocess, process, postprocess
        Worker->>Collector: record_timing() via _get_collector() cache
    end
    Driver->>Driver: ds.to_pandas() materialise pipeline
    Driver->>Sampler: stop_memory_sampler()
    Driver->>Collector: ray.get(collector.dump())
    Driver->>Collector: ray.get(collector.dump_samples())
    Driver->>Driver: write_report(records, memory_samples)
    Driver->>Collector: stop_collector() ray.kill(handle)
    note over Worker: On 2nd ingest() call, worker _collector_cache
    note over Worker: still points at killed actor, records silently dropped
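The sampler loop in the diagram can be sketched as an interruptible background thread. This is an illustrative stand-in, not the PR's `_MemorySampler`: `IntervalSampler`, `read_value`, and `sink` are hypothetical names, and in the PR the sink would presumably forward to `collector.record_sample.remote(...)` with a PSS-based reading.

```python
import threading
import time
from typing import Any, Callable, Tuple


class IntervalSampler(threading.Thread):
    """Calls read_value() every interval_s seconds until stop() is called."""

    def __init__(
        self,
        read_value: Callable[[], Any],
        sink: Callable[[Tuple[float, Any]], None],
        interval_s: float = 1.0,
    ) -> None:
        super().__init__(daemon=True)
        self._read_value = read_value
        self._sink = sink
        self._interval_s = interval_s
        self._stop_event = threading.Event()

    def run(self) -> None:
        # Event.wait doubles as an interruptible sleep: it returns True
        # as soon as stop() sets the event, which ends the loop promptly.
        while not self._stop_event.wait(self._interval_s):
            self._sink((time.time(), self._read_value()))

    def stop(self, timeout_s: float = 2.0) -> None:
        self._stop_event.set()
        self.join(timeout=timeout_s)
```

Using an `Event` rather than `time.sleep` means teardown does not have to wait out a full sampling interval before the thread exits.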

Comments Outside Diff (1)

  1. nemo_retriever/src/nemo_retriever/utils/stage_timing.py, line 598-628

    P2 Silent except Exception: pass at non-boundary helpers

    stop_memory_sampler and stop_collector both catch Exception and pass silently without any log. Per the no-bare-except rule, a broad catch is only acceptable at defined boundaries when the exception is at minimum logged with context. Teardown failures here could leave a zombie sampler thread running or a detached actor leaking resources, yet there would be no signal in the logs.

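One way to address the silent-teardown comment is to log and report failure instead of passing. The sketch below is hypothetical: `stop_sampler` and its parameters are illustrative and do not match the PR's `stop_memory_sampler` signature, which is not shown in this review.

```python
import logging
import threading

logger = logging.getLogger("stage_timing")


def stop_sampler(thread: threading.Thread, stop_event: threading.Event, timeout_s: float = 2.0) -> bool:
    """Signal the sampler thread to stop; return True only if it exited."""
    try:
        stop_event.set()
        thread.join(timeout=timeout_s)
        if thread.is_alive():
            # join() timed out: surface the zombie thread in the logs.
            logger.warning("stage_timing: sampler thread still alive after %.1fs", timeout_s)
            return False
        return True
    except Exception:
        # Teardown must not break the pipeline, but it should leave a trace.
        logger.warning("stage_timing: failed to stop sampler", exc_info=True)
        return False
```

The broad catch stays, since teardown runs at a boundary where raising would mask the pipeline result, but every failure path now produces a log line with context.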
Prompt To Fix All With AI
Fix the following 7 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 7
nemo_retriever/src/nemo_retriever/graph/executor.py:389-397
**Repartition calls silently disabled**

The three `ds.repartition(...)` calls that control block layout for `GLOBAL_BATCH_GROUP_KEYS` operators (hash-partitioning so co-keyed rows stay co-located) and `target_num_rows_per_block` tuning have been commented out and replaced with `pass`. Any operator that sets `GLOBAL_BATCH_GROUP_KEYS` will now silently receive arbitrarily-partitioned batches, breaking the guarantee that related rows land on the same actor. Similarly, `target_num_rows_per_block` overrides are completely ignored. These look like debugging leftovers that should be restored before merging.

### Issue 2 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing.py:417-431
**Stale worker-side actor cache across multiple `ingest()` calls**

`_collector_cache` is a module-level dict in each Ray worker process. After the driver calls `stop_collector` (which kills the named actor) and a subsequent `ingest()` call calls `start_collector()` to create a fresh one, each long-lived worker process still has `"tried": True` pointing at the now-dead actor handle. `record_timing` calls `old_handle.record.remote(...)`, which raises `RayActorError`, swallowed by `except Exception: pass`, so every timing record for the second (and all further) runs is silently dropped. A `"tried"` flag that never resets means the cache can never recover once the actor is recycled.

### Issue 3 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing.py:189-193
The `except Exception: pass` pattern in `record_timing` silently swallows all errors, including the `RayActorError` that surfaces when the collector has been recycled. Per the `no-bare-except` rule, a catch-all at a defined boundary must at least log with context. Adding a `debug`-level log here also makes the stale-cache problem visible during troubleshooting.

```suggestion
    try:
        handle.record.remote(StageRecord(**fields))
    except Exception as exc:
        # Never let timing break the pipeline.
        logger.debug("stage_timing: record_timing failed (handle may be stale): %s", exc)
```

### Issue 4 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing.py:1
Copyright year in the SPDX header reads `2024-25`. The repository rule requires using the current year for new files; this file is being added in 2026.

```suggestion
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
```

### Issue 5 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing_viz.py:1
Same copyright year issue as in `stage_timing.py`: `2024-25` should be the current year `2026` for newly added files.

```suggestion
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
```

### Issue 6 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing.py:598-628
**Silent `except Exception: pass` at non-boundary helpers**

`stop_memory_sampler` and `stop_collector` both catch `Exception` and pass silently without any log. Per the `no-bare-except` rule, a broad catch is only acceptable at defined boundaries when the exception is at minimum logged with context. Teardown failures here could leave a zombie sampler thread running or a detached actor leaking resources, yet there would be no signal in the logs.

### Issue 7 of 7
nemo_retriever/src/nemo_retriever/utils/stage_timing.py:1-10
**No tests for new `stage_timing` module**

`stage_timing.py` adds ~620 lines of new business logic including `format_report`, `slugify_graph_label`, `resolve_report_path`, `make_named_operator_class`, the `_MemorySampler` thread, and `write_report`. None of this is covered by a corresponding test file. The `test-mirrors-source-structure` and `test-coverage-new-code` rules both require new source modules to have matching tests. Similarly, `stage_timing_viz.py` has no tests for its data-loading helpers.

Reviews (1): Last reviewed commit: "format fix" | Re-trigger Greptile

Comment on lines 389 to +397
     if n_blocks > 1:
-        ds = ds.repartition(num_blocks=n_blocks, keys=group_keys, shuffle=True)
+        # ds = ds.repartition(num_blocks=n_blocks, keys=group_keys, shuffle=True)
+        pass
     else:
-        ds = ds.repartition(num_blocks=1)
+        # ds = ds.repartition(num_blocks=1)
+        pass
 elif target_num_rows_per_block is not None and int(target_num_rows_per_block) > 0:
-    ds = ds.repartition(target_num_rows_per_block=int(target_num_rows_per_block))
+    # ds = ds.repartition(target_num_rows_per_block=int(target_num_rows_per_block))
+    pass

P1 Repartition calls silently disabled

The three ds.repartition(...) calls that control block layout for GLOBAL_BATCH_GROUP_KEYS operators (hash-partitioning so co-keyed rows stay co-located) and target_num_rows_per_block tuning have been commented out and replaced with pass. Any operator that sets GLOBAL_BATCH_GROUP_KEYS will now silently receive arbitrarily-partitioned batches, breaking the guarantee that related rows land on the same actor. Similarly, target_num_rows_per_block overrides are completely ignored. These look like debugging leftovers that should be restored before merging.
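A regression check for the restored branches could look roughly like the sketch below. It assumes the hunk sits under an outer `if group_keys:` condition, which the diff does not show; `apply_block_layout` and `RecordingDataset` are hypothetical names, and the stand-in dataset only records keyword arguments rather than exercising Ray.

```python
from typing import Any, Dict, List, Optional, Sequence


class RecordingDataset:
    """Stand-in for a Ray Dataset that records repartition() kwargs."""

    def __init__(self, calls: Optional[List[Dict[str, Any]]] = None) -> None:
        self.calls: List[Dict[str, Any]] = list(calls or [])

    def repartition(self, **kwargs: Any) -> "RecordingDataset":
        return RecordingDataset(self.calls + [kwargs])


def apply_block_layout(
    ds: Any,
    group_keys: Optional[Sequence[str]],
    n_blocks: int,
    target_num_rows_per_block: Optional[int] = None,
) -> Any:
    """The three repartition branches from the diff, with the calls restored."""
    if group_keys:  # assumed outer condition, not visible in the hunk
        if n_blocks > 1:
            ds = ds.repartition(num_blocks=n_blocks, keys=group_keys, shuffle=True)
        else:
            ds = ds.repartition(num_blocks=1)
    elif target_num_rows_per_block is not None and int(target_num_rows_per_block) > 0:
        ds = ds.repartition(target_num_rows_per_block=int(target_num_rows_per_block))
    return ds
```

A test along these lines would have failed on the commented-out version, because `calls` would stay empty on every branch.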

Comment on lines +417 to +431
row = (
    stage[: widths[0]].ljust(widths[0])
    + _fmt_int(n_batches).rjust(widths[1])
    + _fmt_int(rows_in).rjust(widths[2])
    + f"{total_ms / 1000:.2f}".rjust(widths[3])
    + f"{pre_avg:.2f}".rjust(widths[4])
    + f"{proc_avg:.2f}".rjust(widths[5])
    + f"{post_avg:.2f}".rjust(widths[6])
    + f"{ms_per_row:.3f}".rjust(widths[7])
)
lines.append(row)
lines.append(sep)
lines.append(f"sum of stage wall-time (worker-side, parallel): {pipeline_total_ms / 1000:.2f} s")
body = "\n".join(lines)

P1 Stale worker-side actor cache across multiple ingest() calls

_collector_cache is a module-level dict in each Ray worker process. After the driver calls stop_collector (which kills the named actor) and a subsequent ingest() call calls start_collector() to create a fresh one, each long-lived worker process still has "tried": True pointing at the now-dead actor handle. record_timing calls old_handle.record.remote(...), which raises RayActorError, swallowed by except Exception: pass, so every timing record for the second (and all further) runs is silently dropped. A "tried" flag that never resets means the cache can never recover once the actor is recycled.
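One possible invalidation strategy is to reset the cache whenever a send fails, so the next call looks the actor up again. The sketch below is plain Python with injected callables and is not the PR's implementation: `lookup` stands in for resolving the named actor, `send` for something like `handle.record.remote(record)`, and it assumes the failure surfaces as an exception at the call site, as the comment describes.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("stage_timing")

# Module-level cache, mirroring the shape described in the review comment.
_collector_cache: Dict[str, Any] = {"tried": False, "handle": None}


def _get_collector(lookup: Callable[[], Any]) -> Any:
    """Resolve the collector once and reuse it until invalidated."""
    if not _collector_cache["tried"]:
        _collector_cache["tried"] = True
        try:
            _collector_cache["handle"] = lookup()
        except Exception as exc:
            logger.debug("stage_timing: collector lookup failed: %s", exc)
            _collector_cache["handle"] = None
    return _collector_cache["handle"]


def _invalidate_collector() -> None:
    """Forget the cached handle so the next call performs a fresh lookup."""
    _collector_cache["tried"] = False
    _collector_cache["handle"] = None


def record_timing(record: Any, lookup: Callable[[], Any], send: Callable[[Any, Any], None]) -> bool:
    """Send one record; on failure, log and reset the cache. Never raises."""
    handle = _get_collector(lookup)
    if handle is None:
        return False
    try:
        send(handle, record)
        return True
    except Exception as exc:
        logger.debug("stage_timing: send failed, resetting cache: %s", exc)
        _invalidate_collector()
        return False
```

With this approach the first record after the actor is recycled is still lost, but later records recover. A failed lookup is still cached until the next invalidation, so a worker that starts before the collector exists would need a separate retry policy.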

Comment on lines +189 to +193
try:
    handle.record.remote(StageRecord(**fields))
except Exception:
    # Never let timing break the pipeline.
    pass

P2 The except Exception: pass pattern in record_timing silently swallows all errors, including the RayActorError that surfaces when the collector has been recycled. Per the no-bare-except rule, a catch-all at a defined boundary must at least log with context. Adding a debug-level log here also makes the stale-cache problem visible during troubleshooting.

Suggested change
-    try:
-        handle.record.remote(StageRecord(**fields))
-    except Exception:
-        # Never let timing break the pipeline.
-        pass
+    try:
+        handle.record.remote(StageRecord(**fields))
+    except Exception as exc:
+        # Never let timing break the pipeline.
+        logger.debug("stage_timing: record_timing failed (handle may be stale): %s", exc)

@@ -0,0 +1,617 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.

P2 Copyright year in the SPDX header reads 2024-25. The repository rule requires using the current year for new files; this file is being added in 2026.

Suggested change
- # SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
+ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.

@@ -0,0 +1,575 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.

P2 Same copyright year issue as in stage_timing.py: 2024-25 should be the current year 2026 for newly added files.

Suggested change
- # SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
+ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.

Comment on lines +1 to +10
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Per-stage / per-batch timing for the Ray batch executor.

When the ``NR_STAGE_TIMING=1`` environment variable is set, the
``RayDataExecutor`` starts a detached named Ray actor that collects
records emitted by :class:`AbstractOperator.run` on every worker.
After the pipeline materialises, the executor pulls the records,

P2 No tests for new stage_timing module

stage_timing.py adds ~620 lines of new business logic including format_report, slugify_graph_label, resolve_report_path, make_named_operator_class, the _MemorySampler thread, and write_report. None of this is covered by a corresponding test file. The test-mirrors-source-structure and test-coverage-new-code rules both require new source modules to have matching tests. Similarly, stage_timing_viz.py has no tests for its data-loading helpers.

