Conversation
- Add scripts/train/debug/sft-tokenization.sh for quick testing (~5 min)
- Add Beaker progress updates via maybe_update_beaker_description
- Only save checkpoints when --resume flag is enabled

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use incremental append-only checkpointing instead of serializing all tokens to JSON (was taking ~2 hours per checkpoint with 19B tokens)
- Stream tokens to binary files, only save metadata in JSON checkpoint
- Remove GPU requirement from tokenization script (CPU-only task)
- Switch from print statements to logger.info

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
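For context, here is a minimal sketch of the append-only pattern these commits describe. It reuses the helper names referenced in the review below (append_tokens_to_disk, save_checkpoint_metadata), but the exact signatures are assumptions, as is the labels file sharing the tokens' dtype:

    import json
    import os

    import numpy as np

    def append_tokens_to_disk(output_dir, token_ids, labels_mask, boundaries, token_dtype):
        # "ab" append mode means each flush costs O(batch), not O(all tokens
        # written so far), which is what made the old all-tokens JSON
        # checkpoint take hours.
        with open(os.path.join(output_dir, "_partial_tokens.bin"), "ab") as f:
            np.asarray(token_ids, dtype=token_dtype).tofile(f)
        with open(os.path.join(output_dir, "_partial_labels.bin"), "ab") as f:
            np.asarray(labels_mask, dtype=token_dtype).tofile(f)  # dtype is an assumption
        with open(os.path.join(output_dir, "_partial_boundaries.bin"), "ab") as f:
            np.asarray(boundaries, dtype=np.int64).tofile(f)  # (start, end) pairs

    def save_checkpoint_metadata(output_dir, state):
        # Only small metadata (sample counts, positions) goes in the JSON now.
        with open(os.path.join(output_dir, "_checkpoint.json"), "w") as f:
            json.dump(state, f)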
Summary of Changes

This pull request significantly enhances the robustness and scalability of the SFT data conversion process by refactoring its checkpointing logic. The core change replaces in-memory storage of processed token data with an incremental, disk-based approach. This lets large datasets be processed without exhausting memory and allows jobs to be resumed reliably, improving overall operational efficiency and stability. The changes also include better logging practices and a dedicated debug script to streamline development and testing.
Code Review
This pull request significantly improves the SFT data conversion script by refactoring the checkpointing mechanism to be more memory-efficient. It moves from storing all token data in a single JSON file to streaming it to partial binary files, which is a crucial enhancement for handling large datasets. The consistent use of a logger instead of print statements is also a welcome improvement.
My review identifies a critical bug in the new resume logic that could lead to data duplication. I've also included a couple of medium-severity suggestions to enhance the script's robustness and maintainability.
| print(f" Remaining samples: {len(train_dataset) - state['samples_processed']:,}") | ||
| print(f"================================") | ||
| start_idx = checkpoint["samples_processed"] | ||
| current_position, _ = load_progress(output_dir, token_dtype) |
There's a critical issue in the resume logic that can lead to data duplication. When resuming, current_position is determined by the size of partial files on disk (load_progress), while start_idx (the sample to resume from) is read from the checkpoint metadata. If the script crashes after appending data to partial files but before the metadata is updated, on resume, it will re-process samples that are already on disk, leading to duplicated data in the final output.
To ensure consistency, current_position should be loaded from the checkpoint metadata, not from the partial files. Additionally, the partial files should be truncated to match the state recorded in the checkpoint. This prevents re-processing and data duplication.
I've suggested changing this line to load current_position from the checkpoint. You'll also need to add logic to truncate the partial files (_partial_tokens.bin, _partial_labels.bin, _partial_boundaries.bin) to the sizes consistent with current_position and start_idx from the checkpoint to fully resolve the issue.
Suggested change:

    - current_position, _ = load_progress(output_dir, token_dtype)
    + current_position = checkpoint["current_position"]
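One way to implement the truncation half of that fix, as a sketch: the boundaries_written field is hypothetical (any record of how many boundary pairs had been flushed at the last successful checkpoint would do), and the labels file is assumed to share token_dtype:

    import json
    import os

    import numpy as np

    def truncate_partials_to_checkpoint(output_dir, token_dtype):
        # Trim partial files back to the state recorded at the last successful
        # metadata update, discarding bytes appended by an interrupted flush.
        with open(os.path.join(output_dir, "_checkpoint.json")) as f:
            checkpoint = json.load(f)

        token_bytes = checkpoint["current_position"] * np.dtype(token_dtype).itemsize
        boundary_bytes = checkpoint["boundaries_written"] * 2 * np.dtype(np.int64).itemsize  # hypothetical field

        for name, size in [
            ("_partial_tokens.bin", token_bytes),
            ("_partial_labels.bin", token_bytes),  # assumes labels share token_dtype
            ("_partial_boundaries.bin", boundary_bytes),
        ]:
            path = os.path.join(output_dir, name)
            if os.path.exists(path) and os.path.getsize(path) > size:
                with open(path, "r+b") as fh:
                    fh.truncate(size)
        return checkpoint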
      os.remove(checkpoint_path)
    - print(f"Removed checkpoint file: {checkpoint_path}")
    + logger.info(f"Removed checkpoint file: {checkpoint_path}")
      for partial_file in ["_partial_tokens.bin", "_partial_labels.bin", "_partial_boundaries.bin"]:
The filenames for partial checkpoint data (_partial_tokens.bin, etc.) are hardcoded as string literals here and in other functions (append_tokens_to_disk, load_progress, main). This can make maintenance difficult and risks inconsistencies if a filename needs to be changed.
Consider defining these filenames as constants at the module level and reusing them. This would centralize the definitions and improve code clarity and maintainability.
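For instance (constant names are illustrative):

    # Module-level constants so append_tokens_to_disk, load_progress, and the
    # cleanup code all refer to the same filenames.
    PARTIAL_TOKENS_FILE = "_partial_tokens.bin"
    PARTIAL_LABELS_FILE = "_partial_labels.bin"
    PARTIAL_BOUNDARIES_FILE = "_partial_boundaries.bin"
    PARTIAL_FILES = (PARTIAL_TOKENS_FILE, PARTIAL_LABELS_FILE, PARTIAL_BOUNDARIES_FILE)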
    if os.path.exists(tokens_path):
        token_count = os.path.getsize(tokens_path) // np.dtype(token_dtype).itemsize
        boundaries_data = np.fromfile(boundaries_path, dtype=np.int64).reshape(-1, 2)
        boundaries = [tuple(b) for b in boundaries_data.tolist()]
        return token_count, boundaries
The function checks for the existence of tokens_path but then unconditionally reads from boundaries_path. If the script is interrupted after writing the tokens file but before the boundaries file, this will cause a FileNotFoundError. To make the function more robust, you should also check if boundaries_path exists before attempting to read from it.
Suggested change:

    - if os.path.exists(tokens_path):
    + if os.path.exists(tokens_path) and os.path.exists(boundaries_path):
          token_count = os.path.getsize(tokens_path) // np.dtype(token_dtype).itemsize
          boundaries_data = np.fromfile(boundaries_path, dtype=np.int64).reshape(-1, 2)
          boundaries = [tuple(b) for b in boundaries_data.tolist()]
          return token_count, boundaries
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3c9ae74053
    append_tokens_to_disk(output_dir, token_ids, labels_mask, document_boundaries, token_dtype)
    token_ids.clear()
    labels_mask.clear()
    document_boundaries.clear()
    save_checkpoint_metadata(output_dir, {
Make checkpoint flush atomic to avoid duplicates
If the process is interrupted after append_tokens_to_disk runs but before save_checkpoint_metadata updates _checkpoint.json, the partial files will already include the latest samples while samples_processed still points to the previous checkpoint. On resume, the loop restarts from the older start_idx but uses current_position from the partial files, so those samples are reprocessed and appended again, corrupting the token stream and document boundaries. This can happen on any crash/kill during the small window between the two calls; consider making the flush+metadata update atomic (e.g., write metadata first with a temp marker, or validate/truncate partial files against samples_processed on resume).
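A sketch of the temp-file approach, assuming the _checkpoint.json path used elsewhere in the script (the function name is illustrative):

    import json
    import os

    def save_checkpoint_metadata_atomic(output_dir, state):
        # os.replace is atomic on POSIX, so a crash leaves either the old or
        # the new _checkpoint.json on disk, never a half-written file. A
        # resume path that truncates partial files to this metadata then
        # covers the window between the data flush and this call.
        checkpoint_path = os.path.join(output_dir, "_checkpoint.json")
        tmp_path = checkpoint_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp_path, checkpoint_path)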