From a94ba920ed0d77eeae6c829356f892c92148f253 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Tue, 25 Nov 2025 13:23:50 -0800 Subject: [PATCH 1/5] remove metrics --- apps/grpo/main.py | 42 +++++++++++++---------------- src/forge/actors/generator.py | 9 ------- src/forge/actors/reference_model.py | 8 +----- src/forge/actors/replay_buffer.py | 10 +------ src/forge/actors/trainer/titan.py | 4 --- 5 files changed, 20 insertions(+), 53 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index cd355e20b..d744cdfb6 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -210,11 +210,6 @@ async def evaluate_response( ) reward_breakdown[reward_fn_name] = reward # per function reward - record_metric( - f"reward/evaluate_response/sum_{reward_fn_name}_reward", - reward, - Reduce.SUM, - ) record_metric( f"reward/evaluate_response/avg_{reward_fn_name}_reward", reward, @@ -226,18 +221,13 @@ async def evaluate_response( Reduce.STD, ) + # avg total reward record_metric( "reward/evaluate_response/avg_total_reward", reward, Reduce.MEAN, ) - record_metric( - f"reward/evaluate_response/count_{reward_fn_name}_calls", - 1, - Reduce.SUM, - ) - avg_reward: float = total_rewards / len(self.reward_functions) return reward_breakdown, avg_reward @@ -305,17 +295,6 @@ async def sample(self) -> dict[str, str] | None: try: sample = next(self._iterator) - record_metric("dataset/sample/count_samples_generated", 1, Reduce.SUM) - record_metric( - "dataset/sample/avg_sample_len", - len(sample["request"]), - Reduce.MEAN, - ) - record_metric( - "dataset/sample/max_sample_len", - len(sample["request"]), - Reduce.MAX, - ) record_metric("dataset/sample/current_epoch", self._epoch, Reduce.MAX) return sample @@ -442,8 +421,6 @@ async def continuous_rollouts(): print("Dataloader is empty, exiting continuous rollout") return - t.step("data_loading") - prompt, target = sample["request"], sample["target"] responses: list[Completion] = await policy.generate.route(prompt) t.step("policy_generation") @@ -477,6 +454,23 @@ async def continuous_rollouts(): input_ids[i, :max_req_tokens] = episode.request_tensor input_ids[i, max_req_tokens:] = episode.response_tensor + # Track token-based metrics (computed for free from already-tokenized data) + prompt_tokens = episode.completion.prompt_ids.shape[0] + response_tokens = episode.completion.token_ids.shape[0] + + record_metric("episode/avg_prompt_tokens", prompt_tokens, Reduce.MEAN) + record_metric("episode/max_prompt_tokens", prompt_tokens, Reduce.MAX) + record_metric("episode/min_prompt_tokens", prompt_tokens, Reduce.MIN) + record_metric( + "episode/avg_response_tokens", response_tokens, Reduce.MEAN + ) + record_metric( + "episode/max_response_tokens", response_tokens, Reduce.MAX + ) + record_metric( + "episode/min_response_tokens", response_tokens, Reduce.MIN + ) + # drop episodes if # 1> reward std-dev is very small (including all 0s and all 1s) # 2> response is potentially truncated (response_len >= max_res_tokens) diff --git a/src/forge/actors/generator.py b/src/forge/actors/generator.py index 1696214fc..2ab5a7598 100644 --- a/src/forge/actors/generator.py +++ b/src/forge/actors/generator.py @@ -336,8 +336,6 @@ async def generate( priority=priority, data_parallel_rank=None, # We do not support DP ) - t.step("process_inputs") - # Wait until we're accepting requests (releases lock while waiting) # If accepting_requests is True, continue immediately (holding the lock) # If False, release lock, wait for notification, re-acquire and recheck @@ -369,7 +367,6 @@ async def generate( 
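A minimal sketch of the condition-variable gate that the comment above describes, assuming request_lock is an asyncio.Condition and accepting_requests a plain flag; the names mirror this file, but the sketch is illustrative rather than Forge's actual implementation:

import asyncio

class RequestGate:
    # Illustrative stand-in for the accepting_requests / request_lock pattern in Generator.
    def __init__(self) -> None:
        self.accepting_requests = True
        self.request_lock = asyncio.Condition()

    async def wait_until_accepting(self) -> None:
        async with self.request_lock:
            # wait_for releases the lock while blocked, then re-acquires it and
            # rechecks the predicate each time another task calls notify_all().
            await self.request_lock.wait_for(lambda: self.accepting_requests)

    async def pause(self) -> None:
        async with self.request_lock:
            self.accepting_requests = False

    async def resume(self) -> None:
        async with self.request_lock:
            self.accepting_requests = True
            self.request_lock.notify_all()

The same primitive is what update_weights relies on further down when it waits for len(self.requests) == 0 before swapping weights.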
self.requests[request_id] = (parent_req, request_fut) completions = await request_fut - t.step("generate") # Log some metrics record_metric( @@ -380,12 +377,6 @@ async def generate( for completion in completions: num_generated_tokens = len(completion.token_ids) - record_metric( - "generator/generate/sum_tokens_generated", - num_generated_tokens, - Reduce.SUM, - ) - record_metric( "generator/generate/avg_tokens_generated", num_generated_tokens, diff --git a/src/forge/actors/reference_model.py b/src/forge/actors/reference_model.py index 2f9983b56..46a89bd71 100644 --- a/src/forge/actors/reference_model.py +++ b/src/forge/actors/reference_model.py @@ -144,21 +144,15 @@ async def forward( """ # Record reference model metrics record_metric("reference_perf/forward/count_forward_passes", 1, Reduce.SUM) - record_metric( - "reference_perf/forward/avg_sequence_length", - input_ids.shape[1], - Reduce.MEAN, - ) t = Tracer("reference_perf/forward", timer="gpu", track_memory=True) t.start() self.engine.gc_handler.run(self.step) - t.step("garbage_collection") model_parts = self.engine.model_parts parallel_dims = self.engine.parallel_dims input_ids = input_ids.to("cuda") - t.step("to_device") + # optional_context_parallel_ctx = ( # dist_utils.create_context_parallel_ctx( # cp_mesh=parallel_dims.world_mesh["cp"], diff --git a/src/forge/actors/replay_buffer.py b/src/forge/actors/replay_buffer.py index 74bd66210..c69fb3641 100644 --- a/src/forge/actors/replay_buffer.py +++ b/src/forge/actors/replay_buffer.py @@ -87,8 +87,6 @@ async def sample( Returns: A list of sampled episodes with shape (dp_size, bsz, ...) or None if there are not enough episodes in the buffer. """ - # Record sample request metric - record_metric("buffer/sample/count_sample_requests", 1, Reduce.SUM) total_samples = self.dp_size * self.batch_size @@ -98,7 +96,7 @@ async def sample( # Calculate metrics if len(self.buffer) > 0: record_metric( - "buffer/sample/avg_data_utilization", + "buffer/sample/demand_to_size_ratio", total_samples / len(self.buffer), Reduce.MEAN, ) @@ -135,12 +133,6 @@ async def sample( max(sampled_policy_ages), Reduce.MAX, ) - record_metric( - "buffer/sample/min_sampled_policy_age", - min(sampled_policy_ages), - Reduce.MIN, - ) - # Reshape into (dp_size, bsz, ...) reshaped_episodes = [ sampled_episodes[dp_idx * self.batch_size : (dp_idx + 1) * self.batch_size] diff --git a/src/forge/actors/trainer/titan.py b/src/forge/actors/trainer/titan.py index 63e072276..399db1edc 100644 --- a/src/forge/actors/trainer/titan.py +++ b/src/forge/actors/trainer/titan.py @@ -205,13 +205,11 @@ async def push_weights(self, policy_version: int) -> None: sd = self.engine.checkpointer.states["model"].state_dict() flattened_state_dict, _ = flatten_state_dict(sd) - t.step("flatten_state_dict") if self.engine.checkpointer.sd_adapter is None: raise RuntimeError( "Trying to save checkpoint in HF safetensors format, but sd_adapter is not provided." 
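The reshape at the tail of ReplayBuffer.sample (shown in the replay_buffer.py hunk above) turns a flat list of dp_size * batch_size sampled episodes into one batch per data-parallel rank. A standalone sketch of that indexing, using plain lists purely for illustration:

def reshape_for_dp(sampled_episodes: list, dp_size: int, batch_size: int) -> list[list]:
    # Group a flat list of dp_size * batch_size episodes into dp_size contiguous batches.
    assert len(sampled_episodes) == dp_size * batch_size
    return [
        sampled_episodes[dp_idx * batch_size : (dp_idx + 1) * batch_size]
        for dp_idx in range(dp_size)
    ]

# reshape_for_dp(list(range(6)), dp_size=2, batch_size=3) -> [[0, 1, 2], [3, 4, 5]]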
) hf_state_dict = self.engine.checkpointer.sd_adapter.to_hf(flattened_state_dict) - t.step("to_hf") if self.use_dcp: key = get_dcp_whole_state_dict_key(policy_version) dcp_id = f"{self.dcp_path}/{key}" @@ -225,12 +223,10 @@ async def push_weights(self, policy_version: int) -> None: param_names=hf_state_dict.keys(), ) await ts.put(key, dcp_handle) - t.step("dcp_save") else: for name, param in hf_state_dict.items(): key = get_param_key(policy_version, name) await ts.put(key, param) - t.step("ts_save") t.stop() end_time = time.perf_counter() logger.info("Completed weights push in %.2f seconds", end_time - start_time) From 0019c33d1cb07aa559637296f14115259b2368da Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Tue, 25 Nov 2025 13:31:06 -0800 Subject: [PATCH 2/5] comment --- apps/grpo/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index d744cdfb6..d3b47cc81 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -454,7 +454,7 @@ async def continuous_rollouts(): input_ids[i, :max_req_tokens] = episode.request_tensor input_ids[i, max_req_tokens:] = episode.response_tensor - # Track token-based metrics (computed for free from already-tokenized data) + # Track token-based metrics prompt_tokens = episode.completion.prompt_ids.shape[0] response_tokens = episode.completion.token_ids.shape[0] From 27254124bb26c6f0f1019e4c21288bc4f033b13b Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Wed, 26 Nov 2025 12:35:53 -0800 Subject: [PATCH 3/5] remove more --- apps/grpo/main.py | 9 ++++-- src/forge/actors/generator.py | 50 +++++++++-------------------- src/forge/actors/reference_model.py | 2 -- src/forge/actors/replay_buffer.py | 2 -- src/forge/actors/trainer/titan.py | 5 +-- 5 files changed, 23 insertions(+), 45 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index d3b47cc81..ba002c2c1 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -179,14 +179,19 @@ def simple_grpo_loss( loss = -(mean_policy_loss - beta * mean_kl) # Log metrics + # TODO: Better design - have loss function return all metrics as a dict, + # then record them in rl_trainer so all training metrics are in one namespace + # and we avoid doing .item here, which is not compile friendly record_metric("grpo_loss/kl_divergence_mean", mean_kl.item(), Reduce.MEAN) record_metric( "grpo_loss/kl_divergence_max", (kl * padding_mask).max().item(), Reduce.MAX ) - record_metric("grpo_loss/policy_loss", mean_policy_loss.item(), Reduce.MEAN) + record_metric( + "grpo_loss/policy_gradient_loss", mean_policy_loss.item(), Reduce.MEAN + ) + record_metric("grpo_loss/total_loss", loss.item(), Reduce.MEAN) record_metric("grpo_loss/advantage_mean", advantages.mean().item(), Reduce.MEAN) record_metric("grpo_loss/advantage_std", advantages.std().item(), Reduce.MEAN) - return loss diff --git a/src/forge/actors/generator.py b/src/forge/actors/generator.py index 2ab5a7598..37ba5a0bf 100644 --- a/src/forge/actors/generator.py +++ b/src/forge/actors/generator.py @@ -10,6 +10,7 @@ import logging import os import sys +import time from collections.abc import Mapping from copy import copy from dataclasses import dataclass, field @@ -258,8 +259,6 @@ async def _fetch_weights( version: int, ) -> dict[str, SharedTensorHandle]: """Fetch weights from torchstore and return a dict of {name: SharedTensorHandle}.""" - t = Tracer("generator_perf/_fetch_weights") - t.start() prefix = get_param_prefix(version) matching_keys = await ts.keys(prefix) hf_param_names = [extract_param_name(key) for key in 
matching_keys] @@ -282,8 +281,6 @@ def split_keys(keys): for sd in sub_state_dicts: state_dict.update(sd) - t.stop() - return state_dict @endpoint @@ -375,13 +372,6 @@ async def generate( Reduce.SUM, ) - for completion in completions: - num_generated_tokens = len(completion.token_ids) - record_metric( - "generator/generate/avg_tokens_generated", - num_generated_tokens, - Reduce.MEAN, - ) t.stop() return completions @@ -456,37 +446,36 @@ async def update_weights(self, version: int) -> None: async with self.request_lock: self.accepting_requests = False curr_requests = [fut for _, fut in self.requests.values()] + if curr_requests: - # Record pending requests metrics + # Record pending requests count record_metric( - "generator_perf/update_weights/avg_pending_requests", + "generator_perf/update_weights/sum_pending_requests", len(curr_requests), - Reduce.MEAN, - ) - record_metric( - "generator_perf/update_weights/max_pending_requests", - len(curr_requests), - Reduce.MAX, + Reduce.SUM, ) logger.debug(f"Waiting for {len(curr_requests)} pending requests") + # Start timing the wait + wait_start = time.perf_counter() + # Wait until all pending requests have been processed # TODO: If generating long sequences, this might be long and will block # generator weight updates await self.request_lock.wait_for(lambda: len(self.requests) == 0) - # Record weight update metrics - record_metric( - "generator/update_weights/count_weight_updates", 1, Reduce.SUM - ) + if curr_requests: + wait_duration = time.perf_counter() - wait_start + record_metric( + "generator_perf/update_weights/avg_waiting_for_generation_duration_s", + wait_duration, + Reduce.MEAN, + ) logger.debug(f"Starting weight update on {self.__class__.__name__}") if fetch_fut is not None: - t = Tracer("generator_perf/waiting_for_fetch_weights") - t.start() fetched_weights = await fetch_fut - t.stop() # Call update_weights on every policy_worker await self.worker.update_weights.call( shared_memory_state_dict=fetched_weights @@ -663,10 +652,6 @@ async def update_weights( model = self.worker.model_runner.model if shared_memory_state_dict is not None: logger.info("[PolicyWorker] update weights from shared memory.") - t = Tracer( - "generator_worker_perf/update_weights_from_shared_memory", timer="gpu" - ) - t.start() loaded_weights = set() for name, param_handle in shared_memory_state_dict.items(): # Use context manager for automatic cleanup @@ -676,7 +661,6 @@ async def update_weights( del param loaded_weights.update(loaded) logger.info(f"[PolicyWorker] updated {len(loaded_weights)} parameters") - t.stop() return # normal update_weights without shared memory prefetching if version is None: @@ -689,8 +673,6 @@ async def update_weights( dcp_whole_state_dict_key = get_dcp_whole_state_dict_key(version) use_dcp_for_weight_sync = dcp_whole_state_dict_key in matching_keys loaded_weights = set() - t = Tracer("generator_worker_perf/update_weights_from_torchstore", timer="gpu") - t.start() if use_dcp_for_weight_sync: dcp_handle = await ts.get(dcp_whole_state_dict_key) @@ -711,8 +693,6 @@ async def update_weights( del param loaded_weights.update(loaded) - t.stop() - @endpoint async def save_model_params(self): """Save model parameters before weight update, used for testing purposes only.""" diff --git a/src/forge/actors/reference_model.py b/src/forge/actors/reference_model.py index 46a89bd71..42efd9054 100644 --- a/src/forge/actors/reference_model.py +++ b/src/forge/actors/reference_model.py @@ -176,13 +176,11 @@ async def forward( self.step += 1 if 
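compute_logprobs, called a few lines below on input_ids[:, max_req_tokens:], extracts the per-token log-probabilities of the response tokens from the reference model's logits. A minimal sketch of that computation, assuming the logits have already been sliced and aligned to the target positions (the actual forge helper may handle the usual one-position shift and any masking differently):

import torch

def compute_logprobs_sketch(logits: torch.Tensor, target_ids: torch.Tensor) -> torch.Tensor:
    # logits: [batch, seq, vocab]; target_ids: [batch, seq]
    logprobs = torch.log_softmax(logits.float(), dim=-1)
    # Pick out the log-probability assigned to each realized target token.
    return logprobs.gather(-1, target_ids.unsqueeze(-1)).squeeze(-1)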
isinstance(logits, DTensor): logits = logits.full_tensor() - t.step("forward") if not return_logprobs: t.stop() return logits else: logprobs = compute_logprobs(logits, input_ids[:, max_req_tokens:]) - t.step("compute_logprobs") t.stop() return logprobs diff --git a/src/forge/actors/replay_buffer.py b/src/forge/actors/replay_buffer.py index c69fb3641..f62f8d550 100644 --- a/src/forge/actors/replay_buffer.py +++ b/src/forge/actors/replay_buffer.py @@ -13,7 +13,6 @@ from forge.controller import ForgeActor from forge.observability.metrics import record_metric, Reduce -from forge.observability.perf_tracker import trace from monarch.actor import endpoint @@ -75,7 +74,6 @@ async def add(self, episode: "Episode") -> None: record_metric("buffer/add/count_episodes_added", 1, Reduce.SUM) @endpoint - @trace("buffer_perf/sample", track_memory=False) async def sample( self, curr_policy_version: int ) -> tuple[tuple[Any, ...], ...] | None: diff --git a/src/forge/actors/trainer/titan.py b/src/forge/actors/trainer/titan.py index 399db1edc..6a4fa3cb4 100644 --- a/src/forge/actors/trainer/titan.py +++ b/src/forge/actors/trainer/titan.py @@ -176,7 +176,7 @@ async def train_step( # TODO: delete item() to avoid cpu-gpu sync loss = loss.detach().item() - record_metric("rl_trainer/avg_loss", loss, Reduce.MEAN) + record_metric("rl_trainer/loss", loss, Reduce.MEAN) # These are placeholder values until the loss function exposes these metrics # record_metric("rl_trainer/step/avg_kl_divergence", 0.0, Reduce.MEAN) @@ -195,8 +195,6 @@ async def train_step( @endpoint async def push_weights(self, policy_version: int) -> None: """Push weights to torchstore in HF format.""" - t = Tracer("rl_trainer_perf/push_weights", timer="gpu", track_memory=True) - t.start() logger.info(f"Pushing weights for policy version {policy_version}") start_time = time.perf_counter() @@ -227,7 +225,6 @@ async def push_weights(self, policy_version: int) -> None: for name, param in hf_state_dict.items(): key = get_param_key(policy_version, name) await ts.put(key, param) - t.stop() end_time = time.perf_counter() logger.info("Completed weights push in %.2f seconds", end_time - start_time) From 66ca475fa618c1b45c52fdbf5d529ee0fcc2c3b0 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Wed, 26 Nov 2025 12:45:48 -0800 Subject: [PATCH 4/5] rename --- apps/grpo/main.py | 2 +- src/forge/actors/generator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index ba002c2c1..8952d05be 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -484,7 +484,7 @@ async def continuous_rollouts(): max_response_len = max(e.completion.token_ids.shape[0] for e in episodes) drop = rewards_std < 1e-3 or max_response_len >= max_res_tokens record_metric( - "main/continuous_rollouts/dropped_episodes", + "main/continuous_rollouts/unfit_for_training_dropped_episodes", 1 if drop else 0, Reduce.SUM, ) diff --git a/src/forge/actors/generator.py b/src/forge/actors/generator.py index 37ba5a0bf..4889f183d 100644 --- a/src/forge/actors/generator.py +++ b/src/forge/actors/generator.py @@ -450,7 +450,7 @@ async def update_weights(self, version: int) -> None: if curr_requests: # Record pending requests count record_metric( - "generator_perf/update_weights/sum_pending_requests", + "generator_perf/update_weights/sum_pending_gen_requests", len(curr_requests), Reduce.SUM, ) From aeb08e924aec992af06abb4ab3f2c26fb56debe1 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Mon, 8 Dec 2025 07:56:19 -0800 Subject: [PATCH 5/5] improve logging 
episodes dropped --- apps/grpo/main.py | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 8952d05be..7c057d078 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -214,7 +214,8 @@ async def evaluate_response( reward_fn, "__name__", reward_fn.__class__.__name__ ) reward_breakdown[reward_fn_name] = reward - # per function reward + + # log per fn reward and avg total record_metric( f"reward/evaluate_response/avg_{reward_fn_name}_reward", reward, @@ -226,7 +227,6 @@ async def evaluate_response( Reduce.STD, ) - # avg total reward record_metric( "reward/evaluate_response/avg_total_reward", reward, @@ -478,16 +478,34 @@ async def continuous_rollouts(): # drop episodes if # 1> reward std-dev is very small (including all 0s and all 1s) - # 2> response is potentially truncated (response_len >= max_res_tokens) + # 2> any response was truncated (didn't end with EOS) + # TODO: change it to filter only truncated episodes instead of dropping entire group rewards = [e.reward for e in episodes] rewards_std = torch.std(torch.tensor(rewards)) - max_response_len = max(e.completion.token_ids.shape[0] for e in episodes) - drop = rewards_std < 1e-3 or max_response_len >= max_res_tokens + is_low_variance = rewards_std < 1e-3 + num_truncated = sum( + 1 for e in episodes if e.completion.stop_reason == "length" + ) + is_truncated = num_truncated > 0 + drop = is_low_variance or is_truncated + + n = len(episodes) + record_metric( + "main/continuous_rollouts/episodes_dropped/low_variance", + n if is_low_variance else 0, + Reduce.SUM, + ) record_metric( - "main/continuous_rollouts/unfit_for_training_dropped_episodes", - 1 if drop else 0, + "main/continuous_rollouts/episodes_dropped/truncated", + num_truncated, Reduce.SUM, ) + record_metric( + "main/continuous_rollouts/episodes_dropped/total", + n if drop else 0, + Reduce.SUM, + ) + if drop: del input_ids, episodes continue
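The TODO above proposes filtering only the truncated responses instead of dropping the whole group. A hedged sketch of what that could look like, reusing the Episode/Completion fields from this file; it is illustrative only and ignores the work of rebuilding input_ids for the kept subset:

import torch

def filter_truncated_episodes(episodes: list, min_reward_std: float = 1e-3) -> list:
    # Keep only responses that stopped naturally (not cut off at max length).
    kept = [e for e in episodes if e.completion.stop_reason != "length"]
    # A group is only useful for GRPO if the surviving rewards still carry signal.
    if len(kept) < 2:
        return []
    rewards = torch.tensor([e.reward for e in kept], dtype=torch.float32)
    if torch.std(rewards) < min_reward_std:
        return []
    return kept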