Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies = [
"click>=8.0.0",
"gitpython>=3.1.0",
"httpx",
"jupytext>=1.16.0",
"loguru>=0.7.0",
"pathspec>=0.12.1",
"pydantic",
Expand Down
141 changes: 22 additions & 119 deletions src/gitingest/utils/notebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from __future__ import annotations

import json
from itertools import chain
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

import jupytext
from jupytext.config import JupytextConfiguration

from gitingest.utils.exceptions import InvalidNotebookError
from gitingest.utils.logging_config import get_logger
Expand All @@ -24,7 +25,8 @@ def process_notebook(file: Path, *, include_output: bool = True) -> str:
file : Path
The path to the Jupyter notebook file.
include_output : bool
Whether to include cell outputs in the generated script (default: ``True``).
Whether to include cell outputs in the generated script (Not supported by Jupytext).
This parameter is kept for backward compatibility but is ignored.

Returns
-------
Expand All @@ -37,123 +39,24 @@ def process_notebook(file: Path, *, include_output: bool = True) -> str:
If the notebook file is invalid or cannot be processed.

"""
try:
with file.open(encoding="utf-8") as f:
notebook: dict[str, Any] = json.load(f)
except json.JSONDecodeError as exc:
msg = f"Invalid JSON in notebook: {file}"
raise InvalidNotebookError(msg) from exc

# Check if the notebook contains worksheets
worksheets = notebook.get("worksheets")
if worksheets:
logger.warning(
"Worksheets are deprecated as of IPEP-17. Consider updating the notebook. "
"(See: https://github.com/jupyter/nbformat and "
"https://github.com/ipython/ipython/wiki/IPEP-17:-Notebook-Format-4#remove-multiple-worksheets "
"for more information.)",
if include_output:
# Jupytext does not support including outputs in the generated script
# We log a debug message to inform the user
logger.debug(
"Jupytext does not support including outputs in the generated script. 'include_output' is ignored."
)

if len(worksheets) > 1:
logger.warning(
"Multiple worksheets detected. Combining all worksheets into a single script.",
)

cells = list(chain.from_iterable(ws["cells"] for ws in worksheets))

else:
cells = notebook["cells"]

result = ["# Jupyter notebook converted to Python script."]

for cell in cells:
cell_str = _process_cell(cell, include_output=include_output)
if cell_str:
result.append(cell_str)

return "\n\n".join(result) + "\n"


def _process_cell(cell: dict[str, Any], *, include_output: bool) -> str | None:
    """Process a Jupyter notebook cell and return the cell content as a string.

    Markdown and raw cells are wrapped in a triple-quoted block so they read as
    a multi-line comment in the generated script. Code cells are emitted as-is,
    optionally followed by their outputs rendered as ``#`` comment lines.

    Parameters
    ----------
    cell : dict[str, Any]
        The cell dictionary from a Jupyter notebook.
    include_output : bool
        Whether to include cell outputs in the generated script.

    Returns
    -------
    str | None
        The cell content as a string, or ``None`` if the cell is empty.

    Raises
    ------
    ValueError
        If an unexpected cell type is encountered.

    """
    cell_type = cell["cell_type"]

    # Validate cell type and handle unexpected types
    if cell_type not in ("markdown", "code", "raw"):
        msg = f"Unknown cell type: {cell_type}"
        raise ValueError(msg)

    # ``"".join`` accepts both a list of lines and a single string source.
    cell_str = "".join(cell["source"])

    # Skip empty cells
    if not cell_str:
        return None

    # Convert Markdown and raw cells to multi-line comments
    if cell_type in ("markdown", "raw"):
        return f'"""\n{cell_str}\n"""'

    # Add cell output as comments
    outputs = cell.get("outputs")
    if include_output and outputs:
        raw_lines: list[str] = []
        for output in outputs:
            extracted = _extract_output(output)
            # A multi-line field may be stored as one string instead of a list
            # of lines; extending a list with a bare string would split it into
            # single characters, so normalise it to lines first.
            if isinstance(extracted, str):
                extracted = extracted.splitlines(keepends=True)
            raw_lines.extend(extracted)

        cell_str += "\n# Output:\n# " + "\n# ".join(raw_lines)

    return cell_str


def _extract_output(output: dict[str, Any]) -> list[str]:
"""Extract the output from a Jupyter notebook cell.

Parameters
----------
output : dict[str, Any]
The output dictionary from a Jupyter notebook cell.

Returns
-------
list[str]
The output as a list of strings.

Raises
------
ValueError
If an unknown output type is encountered.

"""
output_type = output["output_type"]

if output_type == "stream":
return output["text"]
try:
# Read the notebook using jupytext
notebook = jupytext.read(file)

if output_type in ("execute_result", "display_data"):
return output["data"]["text/plain"]
# Convert to Python script
# using "py:percent" format to preserve cell structure
config = JupytextConfiguration()
# We can add more config here if needed

if output_type == "error":
return [f"Error: {output['ename']}: {output['evalue']}"]
return jupytext.writes(notebook, fmt="py:percent")

msg = f"Unknown output type: {output_type}"
raise ValueError(msg)
except Exception as exc:
msg = f"Error processing notebook {file}: {exc}"
raise InvalidNotebookError(msg) from exc
34 changes: 34 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,40 @@ def write_notebook(tmp_path: Path) -> WriteNotebookFunc:
"""

def _write_notebook(name: str, content: dict[str, Any]) -> Path:
# Add minimal required fields for valid notebook v4
if "nbformat" not in content:
content["nbformat"] = 4
if "nbformat_minor" not in content:
content["nbformat_minor"] = 5
if "metadata" not in content:
content["metadata"] = {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3",
},
"language_info": {
"codemirror_mode": {"name": "ipython", "version": 3},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.0",
},
}

# Ensure cells have required fields
if "cells" in content:
for cell in content["cells"]:
if "metadata" not in cell:
cell["metadata"] = {}
if cell["cell_type"] == "code":
if "outputs" not in cell:
cell["outputs"] = []
if "execution_count" not in cell:
cell["execution_count"] = None

notebook_path = tmp_path / name
with notebook_path.open(mode="w", encoding="utf-8") as f:
json.dump(content, f)
Expand Down
Loading