diff --git a/src/aggregate.py b/src/aggregate.py
index 967a6c7..6f9a6b6 100755
--- a/src/aggregate.py
+++ b/src/aggregate.py
@@ -11,17 +11,12 @@
import tempfile
from pathlib import Path
-from aggregation import (
- load_config,
- save_config,
- DocsFetcher,
- transform_directory_structure,
- copy_targeted_docs,
- process_all_markdown,
-)
-from aggregation.releases import generate_release_docs
-from aggregation.release_notes import generate_release_notes_docs
+from aggregation import (DocsFetcher, copy_targeted_docs, load_config,
+ process_all_markdown, save_config,
+ transform_directory_structure)
from aggregation.flavor_matrix import generate_flavor_matrix_docs
+from aggregation.release_notes import generate_release_notes_docs
+from aggregation.releases import generate_release_docs
def transform_repo_docs(
@@ -34,14 +29,20 @@ def transform_repo_docs(
print(f"\n{'='*60}")
print(f"Transforming docs for: {repo_name}")
print(f"{'='*60}")
-
+
source_dir = temp_dir / repo_name
target_dir = docs_dir / repo.target_path
-
+
# Step 1: Copy files with 'github_target_path:' frontmatter
print(f"\nStep 1: Processing targeted files...")
- copy_targeted_docs(str(source_dir), str(docs_dir), repo_name, repo.media_directories, repo.root_files)
-
+ copy_targeted_docs(
+ str(source_dir),
+ str(docs_dir),
+ repo_name,
+ repo.media_directories,
+ repo.root_files,
+ )
+
# Step 2: Transform project structure
print(f"\nStep 2: Transforming project structure...")
transform_directory_structure(
@@ -51,11 +52,11 @@ def transform_repo_docs(
repo.special_files,
repo.media_directories,
)
-
+
# Step 3: Process markdown files
print(f"\nStep 3: Processing markdown files...")
process_all_markdown(str(target_dir), repo_name)
-
+
print(f"\n✓ Transformation complete for {repo_name}")
return True
@@ -68,32 +69,32 @@ def aggregate_repo(
) -> tuple:
"""
Aggregate documentation for a single repository.
-
+
Returns:
Tuple of (success, resolved_commit_hash)
"""
print(f"\n{'='*60}")
print(f"Aggregating: {repo.name}")
print(f"{'='*60}")
-
+
# Create output directory for this repo
repo_output_dir = temp_dir / repo.name
repo_output_dir.mkdir(parents=True, exist_ok=True)
-
+
# Fetch the repository
result = fetcher.fetch(repo, repo_output_dir)
-
+
if not result.success:
print(f"✗ Failed to fetch {repo.name}")
return False, result.resolved_commit
-
+
# Transform the fetched docs
transform_success = transform_repo_docs(repo, docs_dir, temp_dir)
-
+
if not transform_success:
print(f"✗ Failed to transform {repo.name}")
return False, result.resolved_commit
-
+
return True, result.resolved_commit
@@ -117,7 +118,7 @@ def main() -> int:
%(prog)s --update-locks
""",
)
-
+
parser.add_argument(
"--config",
default="repos-config.json",
@@ -137,25 +138,25 @@ def main() -> int:
action="store_true",
help="Update commit locks: fetch and update config with resolved commit hashes",
)
-
+
args = parser.parse_args()
-
+
# Determine script directory
script_dir = Path(__file__).parent.resolve()
project_root = script_dir.parent
-
+
# Resolve paths
# Config files are in project root, not in src/
if not Path(args.config).is_absolute():
config_path = project_root / args.config
else:
config_path = Path(args.config)
-
+
if not Path(args.docs_dir).is_absolute():
docs_dir = project_root / args.docs_dir
else:
docs_dir = Path(args.docs_dir)
-
+
# Load configuration
print(f"{'='*60}")
print("Garden Linux Documentation Aggregation")
@@ -167,56 +168,56 @@ def main() -> int:
if args.update_locks:
print("Update commit locks: ENABLED")
print()
-
+
repos = load_config(str(config_path))
-
+
# Create temporary directory for fetched docs
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
print(f"Temporary directory: {temp_dir}\n")
-
+
# Initialize fetcher
fetcher = DocsFetcher(project_root, update_locks=args.update_locks)
-
+
# Track resolved commits for locking
resolved_commits = {}
success_count = 0
fail_count = 0
-
+
# Aggregate each repository
for repo in repos:
# Filter by repo if specified
if args.repo and repo.name != args.repo:
continue
-
+
success, resolved_commit = aggregate_repo(
repo,
docs_dir,
temp_dir,
fetcher,
)
-
+
if success:
success_count += 1
if resolved_commit:
resolved_commits[repo.name] = resolved_commit
else:
fail_count += 1
-
+
# Update config with resolved commits if locking
if args.update_locks and resolved_commits:
print(f"\n{'='*60}")
print("Updating config with resolved commits...")
print(f"{'='*60}\n")
-
+
for repo in repos:
if repo.name in resolved_commits:
repo.commit = resolved_commits[repo.name]
print(f" {repo.name}: {resolved_commits[repo.name]}")
-
+
save_config(str(config_path), repos)
print(f"\n✓ Config updated: {config_path}")
-
+
# Generate flavor matrix documentation after all repos are aggregated
# Use docs/projects/gardenlinux path since temp_dir is cleaned up
gardenlinux_docs_path = docs_dir / "projects" / "gardenlinux"
@@ -225,33 +226,33 @@ def main() -> int:
print("Generating flavor matrix documentation...")
print(f"{'='*60}\n")
generate_flavor_matrix_docs(docs_dir, gardenlinux_docs_path)
-
+
# Generate release documentation from GLRD
print(f"\n{'='*60}")
print("Generating release documentation...")
print(f"{'='*60}\n")
generate_release_docs(docs_dir)
-
+
# Generate release notes from GitHub
print(f"\n{'='*60}")
print("Fetching release notes from GitHub...")
print(f"{'='*60}\n")
generate_release_notes_docs(docs_dir)
-
+
# Summary
print(f"\n{'='*60}")
print("Documentation aggregation complete!")
print(f"{'='*60}\n")
print(f"Successful: {success_count}")
print(f"Failed: {fail_count}")
-
+
print("\nNext steps:")
print(" 1. Review the changes in docs/projects/")
print(" 2. Run 'make dev' or 'pnpm run docs:dev' to preview")
print(" 3. Commit the changes if satisfied")
-
+
return 0 if fail_count == 0 else 1
if __name__ == "__main__":
- sys.exit(main())
\ No newline at end of file
+ sys.exit(main())
diff --git a/src/aggregation/__init__.py b/src/aggregation/__init__.py
index 5c4e302..4a0c7cc 100644
--- a/src/aggregation/__init__.py
+++ b/src/aggregation/__init__.py
@@ -1,23 +1,15 @@
"""Aggregation package for docs-ng documentation aggregation."""
# Re-export commonly used functions for backward compatibility with tests
-from .transformer import (
- rewrite_links,
- ensure_frontmatter,
- quote_yaml_value,
- parse_frontmatter,
-)
-
-from .models import RepoConfig, AggregateResult
from .config import load_config, save_config
from .fetcher import DocsFetcher
-from .structure import (
- transform_directory_structure,
- copy_targeted_docs,
- process_all_markdown,
-)
-from .releases import generate_release_docs
from .flavor_matrix import generate_flavor_matrix_docs
+from .models import AggregateResult, RepoConfig
+from .releases import generate_release_docs
+from .structure import (copy_targeted_docs, process_all_markdown,
+ transform_directory_structure)
+from .transformer import (ensure_frontmatter, parse_frontmatter,
+ quote_yaml_value, rewrite_links)
__all__ = [
# Models
@@ -41,4 +33,4 @@
"generate_release_docs",
# Flavor Matrix
"generate_flavor_matrix_docs",
-]
\ No newline at end of file
+]
diff --git a/src/aggregation/config.py b/src/aggregation/config.py
index 16af74f..6fbe1a6 100644
--- a/src/aggregation/config.py
+++ b/src/aggregation/config.py
@@ -2,7 +2,7 @@
import json
import sys
-from typing import Dict, List
+from typing import List
from .models import RepoConfig
@@ -10,26 +10,26 @@
def load_config(config_path: str) -> List[RepoConfig]:
"""
Load and validate repository configuration.
-
+
Args:
config_path: Path to JSON configuration file
-
+
Returns:
List of validated RepoConfig objects
"""
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
-
+
if "repos" not in config:
raise ValueError("Configuration must have 'repos' array")
-
+
repos = []
for repo_dict in config["repos"]:
repo = RepoConfig.from_dict(repo_dict)
repo.validate()
repos.append(repo)
-
+
return repos
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in config file: {e}", file=sys.stderr)
@@ -42,7 +42,7 @@ def load_config(config_path: str) -> List[RepoConfig]:
def save_config(config_path: str, repos: List[RepoConfig]) -> None:
"""
Save repository configuration to JSON file.
-
+
Args:
config_path: Path to JSON configuration file
repos: List of RepoConfig objects to save
@@ -60,12 +60,16 @@ def save_config(config_path: str, repos: List[RepoConfig]) -> None:
**({"root_files": repo.root_files} if repo.root_files else {}),
**({"structure": repo.structure} if repo.structure != "flat" else {}),
**({"special_files": repo.special_files} if repo.special_files else {}),
- **({"media_directories": repo.media_directories} if repo.media_directories else {}),
+ **(
+ {"media_directories": repo.media_directories}
+ if repo.media_directories
+ else {}
+ ),
}
for repo in repos
]
}
-
+
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2)
- f.write("\n")
\ No newline at end of file
+ f.write("\n")
diff --git a/src/aggregation/fetcher.py b/src/aggregation/fetcher.py
index b34dd4f..9af3d4c 100644
--- a/src/aggregation/fetcher.py
+++ b/src/aggregation/fetcher.py
@@ -5,15 +5,15 @@
import sys
import tempfile
from pathlib import Path
-from typing import Tuple, Optional
+from typing import Tuple
-from .models import RepoConfig, AggregateResult
+from .models import AggregateResult, RepoConfig
def _convert_to_git_pattern(pattern: str) -> str:
"""
Convert Python glob pattern to git sparse-checkout compatible pattern.
-
+
Git doesn't support **, so convert to folder prefix.
e.g., "features/**/*.md" -> "features/*"
"""
@@ -22,33 +22,33 @@ def _convert_to_git_pattern(pattern: str) -> str:
for i, part in enumerate(parts):
if "**" in part:
parts[i] = "*"
- return "/".join(parts[:i+1])
+ return "/".join(parts[: i + 1])
return pattern
return pattern
class DocsFetcher:
"""Handles fetching documentation from remote or local repositories."""
-
+
def __init__(self, project_root: Path, update_locks: bool = False):
"""
Initialize fetcher.
-
+
Args:
project_root: Root directory of docs-ng project
update_locks: Whether we're in update-locks mode (allows commit mismatches)
"""
self.project_root = project_root
self.update_locks = update_locks
-
+
def fetch(self, repo: RepoConfig, output_dir: Path) -> AggregateResult:
"""
Fetch documentation for a repository.
-
+
Args:
repo: Repository configuration
output_dir: Where to copy fetched files
-
+
Returns:
AggregateResult with success status and resolved commit
"""
@@ -58,31 +58,33 @@ def fetch(self, repo: RepoConfig, output_dir: Path) -> AggregateResult:
else:
success, commit = self._fetch_remote(repo, output_dir)
return AggregateResult(repo.name, success, commit)
-
+
def _fetch_remote(
self,
repo: RepoConfig,
output_dir: Path,
- ) -> Tuple[bool, Optional[str]]:
+ ) -> Tuple[bool, str | None]:
"""Fetch from remote repository using git sparse checkout."""
temp_dir = Path(tempfile.mkdtemp())
-
+
try:
print(f" Fetching from: {repo.url}")
print(f" Ref: {repo.ref}")
if repo.root_files:
print(f" Root files: {', '.join(repo.root_files)}")
print(f" Output: {output_dir}")
-
+
# Initialize git repository
- subprocess.run(["git", "init"], check=True, capture_output=True, cwd=temp_dir)
+ subprocess.run(
+ ["git", "init"], check=True, capture_output=True, cwd=temp_dir
+ )
subprocess.run(
["git", "remote", "add", "origin", repo.url],
check=True,
capture_output=True,
cwd=temp_dir,
)
-
+
# Fetch the ref (full history to support any locked commit)
print(" Fetching repository...")
subprocess.run(
@@ -91,7 +93,7 @@ def _fetch_remote(
capture_output=True,
cwd=temp_dir,
)
-
+
# Determine which commit to checkout:
# - In update-locks mode: always checkout the ref (latest)
# - In normal mode: use locked commit if available, otherwise checkout ref
@@ -99,7 +101,7 @@ def _fetch_remote(
checkout_ref = repo.ref
else:
checkout_ref = repo.commit
-
+
print(f" Checking out: {checkout_ref}")
subprocess.run(
["git", "checkout", checkout_ref],
@@ -107,7 +109,7 @@ def _fetch_remote(
capture_output=True,
cwd=temp_dir,
)
-
+
# Get resolved commit hash
result = subprocess.run(
["git", "rev-parse", "HEAD"],
@@ -118,21 +120,23 @@ def _fetch_remote(
)
resolved_commit = result.stdout.strip()
print(f" Resolved commit: {resolved_commit}")
-
+
# Copy docs to output directory
docs_source = temp_dir / repo.docs_path
if docs_source.exists():
print(f" Copying docs to {output_dir}")
self._copy_docs(docs_source, output_dir)
else:
- print(f" Warning: docs_path '{repo.docs_path}' not found in repository")
-
+ print(
+ f" Warning: docs_path '{repo.docs_path}' not found in repository"
+ )
+
# Copy root files if specified
self._copy_root_files(temp_dir, repo.root_files, output_dir)
-
+
print(" ✓ Fetch complete")
return True, resolved_commit
-
+
except subprocess.CalledProcessError as e:
print(f" Error: Git command failed: {e}", file=sys.stderr)
if e.stderr:
@@ -144,7 +148,7 @@ def _fetch_remote(
finally:
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
-
+
def _fetch_local(
self,
repo: RepoConfig,
@@ -158,45 +162,50 @@ def _fetch_local(
repo_abs_path = (self.project_root / repo_path).resolve()
else:
repo_abs_path = repo_path.resolve()
-
+
print(f" Copying from: {repo_abs_path}")
if repo.root_files:
print(f" Root files: {', '.join(repo.root_files)}")
print(f" Output: {output_dir}")
-
+
if not repo_abs_path.exists():
- print(f" Error: Local repository not found: {repo_abs_path}", file=sys.stderr)
+ print(
+ f" Error: Local repository not found: {repo_abs_path}",
+ file=sys.stderr,
+ )
return False
-
+
# Copy docs directory
docs_source = repo_abs_path / repo.docs_path
if docs_source.exists():
print(f" Copying docs from {repo.docs_path}/")
self._copy_docs(docs_source, output_dir)
else:
- print(f" Warning: docs_path '{repo.docs_path}' not found in local repository")
-
+ print(
+ f" Warning: docs_path '{repo.docs_path}' not found in local repository"
+ )
+
# Copy root files if specified
self._copy_root_files(repo_abs_path, repo.root_files, output_dir)
-
+
print(" ✓ Copy complete")
return True
-
+
except Exception as e:
print(f" Error: {e}", file=sys.stderr)
return False
-
+
@staticmethod
def _copy_docs(source: Path, dest: Path) -> None:
"""
Copy documentation directory contents.
-
+
Args:
source: Source docs directory
dest: Destination directory
"""
dest.mkdir(parents=True, exist_ok=True)
-
+
# Copy all regular files and directories
for item in source.iterdir():
target = dest / item.name
@@ -204,21 +213,21 @@ def _copy_docs(source: Path, dest: Path) -> None:
shutil.copy2(item, target)
elif item.is_dir():
shutil.copytree(item, target, dirs_exist_ok=True)
-
+
# Also copy hidden directories (like .media)
for item in source.glob(".*"):
if item.is_dir() and item.name not in [".", ".."]:
target = dest / item.name
shutil.copytree(item, target, dirs_exist_ok=True)
-
+
@staticmethod
def _copy_root_files(repo_root: Path, root_files: list, dest: Path) -> None:
"""
Copy specified root-level files and directories from repository.
-
+
Supports glob patterns like "features/*/*.md" to match specific files
without copying entire directories.
-
+
Args:
repo_root: Root directory of the repository
root_files: List of filenames/directories/patterns to copy
@@ -226,23 +235,25 @@ def _copy_root_files(repo_root: Path, root_files: list, dest: Path) -> None:
"""
if not root_files:
return
-
+
print(" Copying root files")
for filename in root_files:
clean_name = filename.rstrip("/")
-
- if '*' in clean_name or '?' in clean_name or '[' in clean_name:
+
+ if "*" in clean_name or "?" in clean_name or "[" in clean_name:
matches = list(repo_root.glob(clean_name))
if not matches:
print(f" Warning: {filename} not found (no matches)")
continue
-
+
for src in matches:
rel_path = src.relative_to(repo_root)
target = dest / rel_path
if src.is_dir():
try:
- shutil.copytree(src, target, dirs_exist_ok=True, symlinks=False)
+ shutil.copytree(
+ src, target, dirs_exist_ok=True, symlinks=False
+ )
print(f" ✓ {filename} -> {rel_path} (directory)")
except Exception as e:
print(f" Warning: Failed to copy {rel_path}: {e}")
@@ -256,7 +267,9 @@ def _copy_root_files(repo_root: Path, root_files: list, dest: Path) -> None:
target = dest / src.name
if src.is_dir():
try:
- shutil.copytree(src, target, dirs_exist_ok=True, symlinks=False)
+ shutil.copytree(
+ src, target, dirs_exist_ok=True, symlinks=False
+ )
print(f" ✓ {filename} (directory)")
except Exception as e:
print(f" Warning: Failed to copy {filename}: {e}")
@@ -265,5 +278,3 @@ def _copy_root_files(repo_root: Path, root_files: list, dest: Path) -> None:
print(f" ✓ {filename}")
else:
print(f" Warning: {filename} not found")
-
-
diff --git a/src/aggregation/flavor_matrix.py b/src/aggregation/flavor_matrix.py
index 187e0d0..ea7ac57 100644
--- a/src/aggregation/flavor_matrix.py
+++ b/src/aggregation/flavor_matrix.py
@@ -1,15 +1,13 @@
"""Generate flavor matrix documentation from flavors.yaml and feature dependencies."""
-import re
-import yaml
from pathlib import Path
-from typing import Optional, List, Tuple
+import yaml
from gardenlinux.features import Parser as FeaturesParser
from gardenlinux.flavors.parser import Parser as FlavorsParser
-def get_flavor_list(gardenlinux_repo_dir: Path) -> Optional[dict]:
+def get_flavor_list(gardenlinux_repo_dir: Path) -> dict | None:
"""Get flavor list by parsing flavors.yaml directly."""
flavors_file = gardenlinux_repo_dir / "flavors.yaml"
@@ -38,9 +36,7 @@ def get_flavor_list(gardenlinux_repo_dir: Path) -> Optional[dict]:
return None
-def generate_flavor_matrix_docs(
- docs_dir: Path, gardenlinux_repo_dir: Path
-) -> bool:
+def generate_flavor_matrix_docs(docs_dir: Path, gardenlinux_repo_dir: Path) -> bool:
"""
Generate flavor matrix page from flavors.yaml and feature dependencies.
@@ -66,7 +62,9 @@ def generate_flavor_matrix_docs(
return False
try:
- features_parser = FeaturesParser(str(features_dir)) # Default feature_dir_name is "features"
+ features_parser = FeaturesParser(
+ str(features_dir)
+ ) # Default feature_dir_name is "features"
except Exception as e:
print(f"Failed to initialize features parser: {e}")
return False
@@ -123,6 +121,7 @@ def link(feature: str) -> str:
# Step 5: Append table to existing aggregated file (keeps frontmatter and content)
output_file = docs_dir / "reference" / "flavor-matrix.md"
+ content: str = ""
if output_file.exists():
try:
existing_content = output_file.read_text()
@@ -134,6 +133,11 @@ def link(feature: str) -> str:
except Exception as e:
print(f"Warning: Could not read existing file: {e}")
+ if not content:
+ print(
+ f"Warning: Read existing file '{output_file}', but file contents are empty!"
+ )
+
output_dir = docs_dir / "reference"
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "flavor-matrix.md"
diff --git a/src/aggregation/glrd.py b/src/aggregation/glrd.py
index cb7946e..f7463e1 100644
--- a/src/aggregation/glrd.py
+++ b/src/aggregation/glrd.py
@@ -3,9 +3,9 @@
import json
import subprocess
import sys
-from typing import Optional
-def run_glrd_json(args: list[str]) -> Optional[dict]:
+
+def run_glrd_json(args: list[str]) -> dict | None:
"""Run glrd command with JSON output and return parsed data."""
try:
result = subprocess.run(
diff --git a/src/aggregation/models.py b/src/aggregation/models.py
index de4133e..eb13356 100644
--- a/src/aggregation/models.py
+++ b/src/aggregation/models.py
@@ -1,47 +1,47 @@
"""Data models for documentation aggregation."""
from dataclasses import dataclass, field
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Union
@dataclass
class RepoConfig:
"""Configuration for a single repository."""
-
+
name: str
url: str
docs_path: str
target_path: str
- ref: Optional[str] = None
- commit: Optional[str] = None
+ ref: str
+ commit: str | None = None
root_files: List[str] = field(default_factory=list)
structure: Union[str, Dict[str, str]] = "flat"
special_files: Dict[str, str] = field(default_factory=dict)
media_directories: List[str] = field(default_factory=list)
-
+
@property
def is_local(self) -> bool:
"""Check if this is a local file:// repository."""
return self.url.startswith("file://")
-
+
@property
def is_remote(self) -> bool:
"""Check if this is a remote https:// repository."""
return self.url.startswith("https://")
-
+
@property
def local_path(self) -> str:
"""Get local path by stripping file:// prefix."""
return self.url[7:] if self.is_local else ""
-
+
def validate(self) -> None:
"""Validate repository configuration."""
if not (self.is_local or self.is_remote):
raise ValueError(f"Invalid URL scheme for {self.name}: {self.url}")
-
+
if self.is_remote and not self.ref:
raise ValueError(f"Remote repository {self.name} must have 'ref' field")
-
+
@classmethod
def from_dict(cls, data: Dict) -> "RepoConfig":
"""Create RepoConfig from dictionary."""
@@ -50,7 +50,7 @@ def from_dict(cls, data: Dict) -> "RepoConfig":
url=data["url"],
docs_path=data["docs_path"],
target_path=data["target_path"],
- ref=data.get("ref"),
+ ref=data.get("ref") or "main",
commit=data.get("commit"),
root_files=data.get("root_files", []),
structure=data.get("structure", "flat"),
@@ -62,7 +62,7 @@ def from_dict(cls, data: Dict) -> "RepoConfig":
@dataclass
class AggregateResult:
"""Result of aggregating a single repository."""
-
+
repo_name: str
success: bool
- resolved_commit: Optional[str] = None
\ No newline at end of file
+ resolved_commit: str | None = None
diff --git a/src/aggregation/release_notes.py b/src/aggregation/release_notes.py
index d2cb080..8af3eb2 100644
--- a/src/aggregation/release_notes.py
+++ b/src/aggregation/release_notes.py
@@ -7,10 +7,7 @@
from datetime import datetime
from pathlib import Path
-from .glrd import (
- run_glrd_json,
- get_active_minor_versions
-)
+from .glrd import get_active_minor_versions
from .transformer import cleanup_github_markdown
GITHUB_API_URL = "https://api.github.com/repos/gardenlinux/gardenlinux/releases"
@@ -31,14 +28,14 @@ def parse_version(tag: str) -> tuple:
1592.18.0 -> (1592, 18, 0)
"""
# Remove leading 'v' if present
- tag = tag.lstrip('v')
+ tag = tag.lstrip("v")
# Split by dots and convert to integers
- parts = tag.split('.')
+ parts = tag.split(".")
version_nums = []
for part in parts:
# Extract numeric part (handle cases like 2150.1.0, 576.3.0)
- match = re.match(r'(\d+)', part)
+ match = re.match(r"(\d+)", part)
if match:
version_nums.append(int(match.group(1)))
else:
@@ -54,9 +51,7 @@ def parse_version(tag: str) -> tuple:
def sort_by_version(releases: list) -> list:
"""Sort releases by semantic version (highest first)."""
return sorted(
- releases,
- key=lambda r: parse_version(r.get("tag_name", "0")),
- reverse=True
+ releases, key=lambda r: parse_version(r.get("tag_name", "0")), reverse=True
)
@@ -69,7 +64,12 @@ def fetch_github_releases(per_page: int = 100) -> list:
while page <= max_pages and len(all_releases) < MAX_RELEASES:
try:
result = subprocess.run(
- ["curl", "-s", "-L", f"{GITHUB_API_URL}?per_page={per_page}&page={page}"],
+ [
+ "curl",
+ "-s",
+ "-L",
+ f"{GITHUB_API_URL}?per_page={per_page}&page={page}",
+ ],
capture_output=True,
text=True,
check=False,
@@ -154,7 +154,10 @@ def generate_release_notes_docs(docs_dir: Path) -> bool:
print("Querying GLRD for release status...")
active_versions = get_active_minor_versions()
if not active_versions:
- print("Warning: GLRD query failed, defaulting all releases to archived", file=sys.stderr)
+ print(
+ "Warning: GLRD query failed, defaulting all releases to archived",
+ file=sys.stderr,
+ )
# Filter releases (skip drafts)
filtered = []
@@ -179,12 +182,14 @@ def generate_release_notes_docs(docs_dir: Path) -> bool:
date = format_release_date(release.get("published_at", ""))
# Make version heading h1 (replace ## VersionName with # VersionName)
- content = re.sub(r'^##\s+' + re.escape(name) + r'$', '# ' + name, content, flags=re.MULTILINE)
+ content = re.sub(
+ r"^##\s+" + re.escape(name) + r"$", "# " + name, content, flags=re.MULTILINE
+ )
# Determine if this release is archived
# A release is active ONLY if it's explicitly in the active_versions dict
# All other releases are archived
- tag_without_v = tag_name.lstrip('v')
+ tag_without_v = tag_name.lstrip("v")
is_archived = tag_without_v not in active_versions
# Order: highest version = 1, second = 2, etc.
@@ -221,13 +226,15 @@ def generate_release_notes_docs(docs_dir: Path) -> bool:
filepath.write_text(page_content)
- release_list.append({
- "tag": tag_name,
- "name": name,
- "filename": filename,
- "date": format_release_date(release.get("published_at", "")),
- "is_archived": is_archived,
- })
+ release_list.append(
+ {
+ "tag": tag_name,
+ "name": name,
+ "filename": filename,
+ "date": format_release_date(release.get("published_at", "")),
+ "is_archived": is_archived,
+ }
+ )
print(f" Created: {filepath.relative_to(docs_dir)}")
print("Release notes generation complete.")
diff --git a/src/aggregation/releases.py b/src/aggregation/releases.py
index 236aba9..9aa88a3 100644
--- a/src/aggregation/releases.py
+++ b/src/aggregation/releases.py
@@ -1,22 +1,10 @@
"""Generate release documentation from GLRD."""
-import json
-import subprocess
import sys
from pathlib import Path
-from typing import Optional
-from .constants import (
- GANTT_THEME,
- RELEASES_TAG_URL,
- COMMITS_URL,
- LIFECYCLE_LINKS,
-)
-
-from .glrd import (
- run_glrd_json,
- get_active_minor_versions
-)
+from .constants import COMMITS_URL, GANTT_THEME, LIFECYCLE_LINKS
+from .glrd import get_active_minor_versions, run_glrd_json
def format_version(release: dict, active_versions: set[str]) -> tuple[str, str]:
@@ -38,7 +26,9 @@ def format_version(release: dict, active_versions: set[str]) -> tuple[str, str]:
if "major" in version_obj and "minor" in version_obj:
if "patch" in version_obj:
- version_str = f"{version_obj['major']}.{version_obj['minor']}.{version_obj['patch']}"
+ version_str = (
+ f"{version_obj['major']}.{version_obj['minor']}.{version_obj['patch']}"
+ )
else:
version_str = f"{version_obj['major']}.{version_obj['minor']}"
elif "major" in version_obj:
@@ -52,7 +42,9 @@ def format_version(release: dict, active_versions: set[str]) -> tuple[str, str]:
is_active = version_str in active_versions
if is_active:
- version_link = f"[{version_str}](release-notes/{version_str.replace('.', '-')}.html)"
+ version_link = (
+ f"[{version_str}](release-notes/{version_str.replace('.', '-')}.html)"
+ )
else:
version_link = f"[{version_str}](release-notes/archived/{version_str.replace('.', '-')}.html)"
else:
@@ -192,7 +184,9 @@ def get_timeline_section(gantt_chart: str, title: str) -> str:
"""
-def append_release_page(table: str, timeline: str, page_type: str = "maintained") -> str:
+def append_release_page(
+ table: str, timeline: str, page_type: str = "maintained"
+) -> str:
"""Append to an existing release page."""
return f"""
@@ -220,7 +214,10 @@ def generate_release_docs(docs_dir: Path) -> bool:
archived_data = run_glrd_json(["--archived"])
if active_data is None:
- print("Warning: Could not fetch active releases - skipping generation", file=sys.stderr)
+ print(
+ "Warning: Could not fetch active releases - skipping generation",
+ file=sys.stderr,
+ )
return False
active_table = generate_release_table(active_data, active_versions)
@@ -234,28 +231,32 @@ def generate_release_docs(docs_dir: Path) -> bool:
)
release_file = "maintained-releases.md"
- release_path = (releases_dir / release_file)
+ release_path = releases_dir / release_file
# Read existing file and keep only frontmatter and static content
# (everything before the generated tables)
existing_content = release_path.read_text()
- lines = existing_content.split('\n')
+ lines = existing_content.split("\n")
# Find where the generated content starts (look for "## Active Releases" heading)
static_lines = []
for i, line in enumerate(lines):
- if line.startswith('## Active Releases') or line.startswith('## Release Timeline'):
+ if line.startswith("## Active Releases") or line.startswith(
+ "## Release Timeline"
+ ):
break
static_lines.append(line)
# Write static content plus new generated content
- release_path.write_text('\n'.join(static_lines).rstrip() + '\n\n' + active_content)
+ release_path.write_text("\n".join(static_lines).rstrip() + "\n\n" + active_content)
print(f" Updated: {release_path}")
if archived_data is not None:
archived_table = generate_release_table(archived_data, active_versions)
archived_gantt = generate_mermaid_gantt(archived_data)
- archived_timeline = get_timeline_section(archived_gantt, "Archived Releases Timeline")
+ archived_timeline = get_timeline_section(
+ archived_gantt, "Archived Releases Timeline"
+ )
archived_content = append_release_page(
archived_table,
@@ -264,21 +265,25 @@ def generate_release_docs(docs_dir: Path) -> bool:
)
release_file = "archived-releases.md"
- release_path = (releases_dir / release_file)
+ release_path = releases_dir / release_file
# Read existing file and keep only frontmatter and static content
existing_content = release_path.read_text()
- lines = existing_content.split('\n')
+ lines = existing_content.split("\n")
# Find where the generated content starts (look for "## Out of Maintenance" heading)
static_lines = []
for i, line in enumerate(lines):
- if line.startswith('## Out of Maintenance') or line.startswith('## Archived Releases Timeline'):
+ if line.startswith("## Out of Maintenance") or line.startswith(
+ "## Archived Releases Timeline"
+ ):
break
static_lines.append(line)
# Write static content plus new generated content
- release_path.write_text('\n'.join(static_lines).rstrip() + '\n\n' + archived_content)
+ release_path.write_text(
+ "\n".join(static_lines).rstrip() + "\n\n" + archived_content
+ )
print(f" Updated: {release_path}")
else:
print("Warning: Could not fetch archived releases", file=sys.stderr)
diff --git a/src/aggregation/structure.py b/src/aggregation/structure.py
index 500120a..7f4877a 100644
--- a/src/aggregation/structure.py
+++ b/src/aggregation/structure.py
@@ -2,26 +2,22 @@
import shutil
from pathlib import Path
-from typing import Dict, List, Optional
+from typing import List
-from .transformer import (
- rewrite_links,
- fix_broken_project_links,
- ensure_frontmatter,
- parse_frontmatter,
-)
+from .transformer import (ensure_frontmatter, fix_broken_project_links,
+ parse_frontmatter, rewrite_links)
def transform_directory_structure(
source_dir: str,
target_dir: str,
structure_map,
- special_files: Optional[Dict] = None,
- media_dirs: Optional[List[str]] = None,
+ special_files: dict | None = None,
+ media_dirs: List[str] | None = None,
) -> None:
"""
Transform directory structure based on mapping.
-
+
Args:
source_dir: Source directory with fetched docs
target_dir: Target directory in docs/projects/
@@ -32,25 +28,25 @@ def transform_directory_structure(
source_path = Path(source_dir)
target_path = Path(target_dir)
target_path.mkdir(parents=True, exist_ok=True)
-
+
special_files = special_files or {}
media_dirs = media_dirs or []
-
+
if isinstance(structure_map, dict):
# Structured transformation with subdirectories specified
for old_name, new_name in structure_map.items():
old_path = source_path / old_name
new_path = target_path / new_name
-
+
if old_path.exists():
print(f" Transforming: {old_name} -> {new_name}")
shutil.copytree(old_path, new_path, dirs_exist_ok=True)
-
+
# Handle special files
for item in source_path.iterdir():
if item.name in structure_map:
continue
-
+
if item.name in special_files:
target_subdir = target_path / special_files[item.name]
target_subdir.mkdir(parents=True, exist_ok=True)
@@ -71,7 +67,7 @@ def transform_directory_structure(
and not item.name.startswith(".")
):
shutil.copytree(item, target_path / item.name, dirs_exist_ok=True)
-
+
else:
# Flat/sphinx structure - copy all files as-is (merged logic)
print(f" Copying {structure_map} structure")
@@ -83,11 +79,17 @@ def transform_directory_structure(
shutil.copytree(item, target_item, dirs_exist_ok=True)
-def copy_targeted_docs(source_dir: str, docs_dir: str, repo_name: str, media_dirs: Optional[List[str]] = None, root_files: Optional[List[str]] = None) -> None:
+def copy_targeted_docs(
+ source_dir: str,
+ docs_dir: str,
+ repo_name: str,
+ media_dirs: List[str] | None = None,
+ root_files: List[str] | None = None,
+) -> None:
"""
Copy markdown files with 'github_target_path:' frontmatter to their specified locations.
Also copies media directories to the common target path of targeted files.
-
+
Args:
source_dir: Source directory with fetched docs
docs_dir: Docs root directory
@@ -97,14 +99,14 @@ def copy_targeted_docs(source_dir: str, docs_dir: str, repo_name: str, media_dir
"""
source_path = Path(source_dir)
docs_path = Path(docs_dir)
-
+
if not source_path.exists():
print(f" [Warning] Source directory not found: {source_dir}")
return
-
+
# Find all markdown files (recursively in source_dir)
md_files = list(source_path.rglob("*.md"))
-
+
# Also check root_files if provided
# Note: root_files may have been flattened by the fetcher (e.g., src/README.md -> README.md)
# So we need to check both the original path and just the basename
@@ -113,15 +115,21 @@ def copy_targeted_docs(source_dir: str, docs_dir: str, repo_name: str, media_dir
for root_file in root_files:
# Try the full path first
root_file_path = source_path / root_file
-
+
# If that doesn't exist, try just the basename (in case fetcher flattened it)
if not root_file_path.exists():
root_file_path = source_path / Path(root_file).name
-
+
print(f" Checking: {root_file} -> {root_file_path}")
- print(f" Exists: {root_file_path.exists()}, Is file: {root_file_path.is_file() if root_file_path.exists() else 'N/A'}, Ends with .md: {root_file.endswith('.md')}")
-
- if root_file_path.exists() and root_file_path.is_file() and root_file.endswith('.md'):
+ print(
+ f" Exists: {root_file_path.exists()}, Is file: {root_file_path.is_file() if root_file_path.exists() else 'N/A'}, Ends with .md: {root_file.endswith('.md')}"
+ )
+
+ if (
+ root_file_path.exists()
+ and root_file_path.is_file()
+ and root_file.endswith(".md")
+ ):
# Add to list if not already there
if root_file_path not in md_files:
md_files.append(root_file_path)
@@ -130,53 +138,58 @@ def copy_targeted_docs(source_dir: str, docs_dir: str, repo_name: str, media_dir
print(f" Already in list")
else:
print(f" ✗ Skipped")
-
+
targeted_files = []
-
+
print(f" Scanning {len(md_files)} files for 'github_target_path:' frontmatter...")
-
+
for md_file in md_files:
try:
with open(md_file, "r", encoding="utf-8") as f:
content = f.read()
-
+
frontmatter, _ = parse_frontmatter(content)
-
+
# Check for 'github_target_path' in frontmatter
if frontmatter and ("github_target_path" in frontmatter):
- target_path = frontmatter.get("github_target_path") or frontmatter.get("target")
-
+ target_path = frontmatter.get("github_target_path") or frontmatter.get(
+ "target"
+ )
+
+ if target_path is None:
+ continue
+
# Strip leading 'docs/' if present
if target_path.startswith("docs/"):
target_path = target_path[5:]
-
+
target_file = docs_path / target_path
-
+
# Create parent directories if needed
target_file.parent.mkdir(parents=True, exist_ok=True)
-
+
# Copy the file
shutil.copy2(md_file, target_file)
-
+
# Apply markdown processing
content = ensure_frontmatter(content)
-
+
with open(target_file, "w", encoding="utf-8") as f:
f.write(content)
-
+
targeted_files.append((md_file.relative_to(source_path), target_path))
print(f" ✓ Copied: {md_file.name} → {target_path}")
-
+
except Exception as e:
print(f" [Warning] Error processing {md_file.name}: {e}")
-
+
if targeted_files:
print(f" ✓ Copied {len(targeted_files)} targeted file(s)")
-
+
# Copy media directories to maintain relative paths with targeted files
if media_dirs:
print(f" Copying media directories recursively...")
-
+
# Compute common ancestor of all targeted files for root-level media
target_paths = [Path(target_path) for _, target_path in targeted_files]
common_parent = None
@@ -189,22 +202,28 @@ def copy_targeted_docs(source_dir: str, docs_dir: str, repo_name: str, media_dir
if all(p in parents for parents in all_parents):
common_parent = p
break
-
+
for media_dir_name in media_dirs:
# Recursively find all instances of this media directory in the source
for media_dir in source_path.rglob(media_dir_name):
if media_dir.is_dir():
# Calculate relative path from source_path
rel_path = media_dir.relative_to(source_path)
-
+
# Determine if this is a root-level or nested media directory
if len(rel_path.parts) == 1:
# Root-level media directory: copy to common ancestor of targeted files
- if common_parent and common_parent != Path('.'):
- target_media = docs_path / common_parent / media_dir_name
+ if common_parent and common_parent != Path("."):
+ target_media = (
+ docs_path / common_parent / media_dir_name
+ )
target_media.parent.mkdir(parents=True, exist_ok=True)
- shutil.copytree(media_dir, target_media, dirs_exist_ok=True)
- print(f" ✓ Copied media: {common_parent / media_dir_name}")
+ shutil.copytree(
+ media_dir, target_media, dirs_exist_ok=True
+ )
+ print(
+ f" ✓ Copied media: {common_parent / media_dir_name}"
+ )
else:
# Nested media directory: copy to same relative path
target_media = docs_path / rel_path
@@ -223,20 +242,20 @@ def process_markdown_file(
) -> bool:
"""
Process a single markdown file: rewrite links, fix frontmatter.
-
+
Args:
file_path: Path to markdown file
repo_name: Repository name
target_dir: Target directory path
base_path: Base path for projects
-
+
Returns:
Success status
"""
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
-
+
# Calculate relative path from target_dir
file_path_obj = Path(file_path)
target_path_obj = Path(target_dir)
@@ -244,14 +263,14 @@ def process_markdown_file(
file_rel_path = str(file_path_obj.relative_to(target_path_obj))
except ValueError:
file_rel_path = ""
-
+
content = rewrite_links(content, repo_name, file_rel_path, base_path)
content = fix_broken_project_links(content, repo_name, target_dir, base_path)
content = ensure_frontmatter(content)
-
+
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
-
+
return True
except Exception as e:
print(f" [Warning] Error processing {file_path}: {e}")
@@ -261,13 +280,13 @@ def process_markdown_file(
def process_all_markdown(target_dir: str, repo_name: str) -> None:
"""
Process all markdown files in target directory.
-
+
Args:
target_dir: Target directory path
repo_name: Repository name
"""
target_path = Path(target_dir)
-
+
# Rename all README.md to index.md for VitePress
readme_files = list(target_path.rglob("README.md"))
for readme in readme_files:
@@ -275,13 +294,13 @@ def process_all_markdown(target_dir: str, repo_name: str) -> None:
if not index_file.exists():
readme.rename(index_file)
print(f" Renamed {readme.relative_to(target_path)} to index.md")
-
+
md_files = list(target_path.rglob("*.md"))
print(f" Processing {len(md_files)} markdown files...")
-
+
success_count = 0
for md_file in md_files:
if process_markdown_file(md_file, repo_name, target_dir):
success_count += 1
-
+
print(f" ✓ Processed {success_count}/{len(md_files)} files successfully")
diff --git a/src/aggregation/transformer.py b/src/aggregation/transformer.py
index 902a59d..275784d 100644
--- a/src/aggregation/transformer.py
+++ b/src/aggregation/transformer.py
@@ -2,7 +2,7 @@
import re
from pathlib import Path
-from typing import Optional, Dict, Tuple
+from typing import Dict, Tuple
def rewrite_links(
@@ -14,41 +14,46 @@ def rewrite_links(
) -> str:
"""
Rewrite internal markdown links to work with VitePress structure.
-
+
Args:
content: The markdown content
repo_name: Name of the repository
file_rel_path: Relative path of the file within the repo
base_path: Base path for projects
github_base: Base URL for GitHub organization
-
+
Returns:
Content with rewritten links
"""
file_dir = str(Path(file_rel_path).parent) if file_rel_path else ""
if file_dir == ".":
file_dir = ""
-
+
def replace_link(match):
text = match.group(1)
link = match.group(2)
-
+
# Skip external links
if link.startswith("http://") or link.startswith("https://"):
return match.group(0)
-
+
# Skip special protocols (mailto, tel, javascript, etc.)
- if ":" in link and not link.startswith("/") and not link.startswith("./") and not link.startswith("../"):
+ if (
+ ":" in link
+ and not link.startswith("/")
+ and not link.startswith("./")
+ and not link.startswith("../")
+ ):
return match.group(0)
-
+
# Skip anchors
if link.startswith("#"):
return match.group(0)
-
+
# Skip if already a /projects/ link
if link.startswith(f"{base_path}/"):
return match.group(0)
-
+
# Handle relative paths for .media directory
if ".media/" in link:
media_part = link
@@ -57,11 +62,11 @@ def replace_link(match):
media_part = media_part.replace("./", "")
new_link = f"{base_path}/{repo_name}/{media_part}"
return f"[{text}]({new_link})"
-
+
# Handle relative links
if link.startswith("../") or link.startswith("./"):
stripped_link = link.replace(".md", "")
-
+
# For ./ links (same directory)
if link.startswith("./"):
stripped_link = stripped_link.replace("./", "")
@@ -73,21 +78,23 @@ def replace_link(match):
# For ../ links, check if they go outside docs/
levels_up = link.count("../")
stripped_link = stripped_link.replace("../", "")
-
+
# Check if we go outside docs/
if file_dir:
dir_depth = len(file_dir.split("/"))
if levels_up > dir_depth:
# Link to GitHub
- new_link = f"{github_base}/{repo_name}/blob/main/{stripped_link}"
+ new_link = (
+ f"{github_base}/{repo_name}/blob/main/{stripped_link}"
+ )
return f"[{text}]({new_link})"
-
+
# Remove numbered prefixes
stripped_link = re.sub(r"\d+_(\w+)", r"\1", stripped_link)
new_link = f"{base_path}/{repo_name}/{stripped_link}"
-
+
return f"[{text}]({new_link})"
-
+
# Handle absolute paths from root
if link.startswith("/"):
if link.startswith(f"{base_path}/"):
@@ -96,7 +103,7 @@ def replace_link(match):
stripped_link = link.lstrip("/")
new_link = f"{github_base}/{repo_name}/blob/main/{stripped_link}"
return f"[{text}]({new_link})"
-
+
# Handle simple filenames (same directory)
if "/" not in link:
stripped_link = link.replace(".md", "")
@@ -105,17 +112,17 @@ def replace_link(match):
else:
new_link = f"{base_path}/{repo_name}/{stripped_link}"
return f"[{text}]({new_link})"
-
+
return match.group(0)
-
+
# Apply transform to markdown links
content = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", replace_link, content)
-
+
# Handle HTML media links
def replace_html_media_link(match):
attr_name = match.group(1)
link = match.group(2)
-
+
if link.startswith(f"{base_path}/"):
return match.group(0)
if ".media/" in link:
@@ -126,23 +133,23 @@ def replace_html_media_link(match):
new_link = f"{base_path}/{repo_name}/{media_part}"
return f'{attr_name}="{new_link}"'
return match.group(0)
-
+
content = re.sub(
r'(src|srcset)="([^"]*\.media/[^"]*)"',
replace_html_media_link,
content,
)
-
+
return content
def quote_yaml_value(value: str) -> str:
"""
Quote YAML value if needed, handling already-quoted values.
-
+
Args:
value: YAML value to potentially quote
-
+
Returns:
Quoted value if needed, otherwise original value
"""
@@ -150,20 +157,34 @@ def quote_yaml_value(value: str) -> str:
if value.startswith('"') and value.endswith('"'):
if not value.startswith('"\\"'):
return value
-
+
if value.startswith("'") and value.endswith("'"):
return value
-
+
special_chars = [
- ":", "#", "@", "`", "|", ">", "*", "&", "!",
- "%", "[", "]", "{", "}", ",", "?",
+ ":",
+ "#",
+ "@",
+ "`",
+ "|",
+ ">",
+ "*",
+ "&",
+ "!",
+ "%",
+ "[",
+ "]",
+ "{",
+ "}",
+ ",",
+ "?",
]
-
+
needs_quoting = any(char in value for char in special_chars)
-
+
if value and (value[0] in ['"', "'", " "] or value[-1] in [" "]):
needs_quoting = True
-
+
if needs_quoting:
if '"' not in value:
return f'"{value}"'
@@ -172,43 +193,43 @@ def quote_yaml_value(value: str) -> str:
else:
escaped_value = value.replace('"', '\\"')
return f'"{escaped_value}"'
-
+
return value
-def parse_frontmatter(content: str) -> Tuple[Optional[Dict[str, str]], str]:
+def parse_frontmatter(content: str) -> Tuple[Dict[str, str] | None, str]:
"""
Parse YAML frontmatter from markdown content.
-
+
Args:
content: Markdown content potentially with frontmatter
-
+
Returns:
Tuple of (frontmatter_dict, content_without_frontmatter)
or (None, original_content) if no frontmatter found.
"""
if not content.startswith("---\n"):
return None, content
-
+
try:
end_match = re.search(r"\n---\n", content[4:])
if not end_match:
return None, content
-
+
frontmatter_text = content[4 : 4 + end_match.start()]
rest_content = content[4 + end_match.end() :]
-
+
frontmatter_dict = {}
for line in frontmatter_text.split("\n"):
line = line.strip()
if not line or ":" not in line:
continue
-
+
key, value = line.split(":", 1)
key = key.strip()
value = value.strip().strip("\"'")
frontmatter_dict[key] = value
-
+
return frontmatter_dict, rest_content
except Exception as e:
print(f" [Warning] Failed to parse frontmatter: {e}")
@@ -218,21 +239,21 @@ def parse_frontmatter(content: str) -> Tuple[Optional[Dict[str, str]], str]:
def fix_yaml_frontmatter(frontmatter_text: str) -> str:
"""
Fix YAML frontmatter formatting.
-
+
Args:
frontmatter_text: Frontmatter content (without --- markers)
-
+
Returns:
Fixed frontmatter text
"""
lines = frontmatter_text.split("\n")
fixed_lines = []
-
+
for line in lines:
if not line.strip():
fixed_lines.append(line)
continue
-
+
if ":" in line:
parts = line.split(":", 1)
if len(parts) == 2:
@@ -241,19 +262,19 @@ def fix_yaml_frontmatter(frontmatter_text: str) -> str:
quoted_value = quote_yaml_value(value)
fixed_lines.append(f"{key}: {quoted_value}")
continue
-
+
fixed_lines.append(line)
-
+
return "\n".join(fixed_lines)
def ensure_frontmatter(content: str) -> str:
"""
Ensure frontmatter exists and fix YAML formatting.
-
+
Args:
content: Markdown content
-
+
Returns:
Content with fixed frontmatter
"""
@@ -263,14 +284,14 @@ def ensure_frontmatter(content: str) -> str:
if end_match:
frontmatter_content = content[4 : 4 + end_match.start()]
rest_content = content[4 + end_match.end() :]
-
+
# Parse and fix the frontmatter
fixed_frontmatter = fix_yaml_frontmatter(frontmatter_content)
-
+
return f"---\n{fixed_frontmatter}\n---\n\n{rest_content}"
except Exception:
print(" [Warning] Couldn't parse existing frontmatter!")
-
+
return content
@@ -284,34 +305,34 @@ def fix_broken_project_links(
"""
Fix links in /projects/ that point to non-existent files.
Replace with GitHub links.
-
+
Args:
content: Markdown content
repo_name: Repository name
target_dir: Target directory path
base_path: Base path for projects
github_base: GitHub base URL
-
+
Returns:
Content with fixed links
"""
target_path = Path(target_dir)
-
+
def check_and_fix_link(match):
text = match.group(1)
link = match.group(2)
-
+
# Only process /projects/{repo}/ links
if not link.startswith(f"{base_path}/{repo_name}/"):
return match.group(0)
-
+
# Extract the path after /projects/{repo}/
rel_path = link[len(f"{base_path}/{repo_name}/") :]
-
+
potential_file = target_path / f"{rel_path}.md"
potential_index = target_path / rel_path / "index.md"
potential_dir = target_path / rel_path
-
+
# If file exists, or directory exists with index.md, keep the link
if (
potential_file.exists()
@@ -323,10 +344,10 @@ def check_and_fix_link(match):
)
):
return match.group(0)
-
+
github_link = f"{github_base}/{repo_name}/blob/main/{rel_path}"
return f"[{text}]({github_link})"
-
+
content = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", check_and_fix_link, content)
return content
@@ -334,52 +355,52 @@ def check_and_fix_link(match):
def cleanup_github_markdown(content: str) -> str:
"""
Clean up GitHub release notes markdown for VitePress compatibility.
-
+
Handles common issues in GitHub release notes that cause VitePress parsing errors:
- Orphaned code fences followed by headers
- Empty code blocks
- Inconsistent fence markers
- Windows line endings
- Content followed by fence on same line
-
+
Args:
content: Raw markdown from GitHub release notes
-
+
Returns:
Cleaned markdown safe for VitePress rendering
"""
if not content:
return content
-
+
# Fix Windows line endings first
content = content.replace("\r\n", "\n").replace("\r", "\n")
-
+
# Remove or fix HTML details/summary tags that cause parsing issues
    # Replace <details><summary>content</summary>content</details> with just content
-    content = re.sub(r'<summary>([^<]*)</summary>', r'\1', content)
-    content = re.sub(r'<details>', '', content)
-    content = re.sub(r'</details>', '', content)
-    content = re.sub(r'<summary>', '', content)
-    content = re.sub(r'</summary>', '', content)
-    
+    content = re.sub(r"<summary>([^<]*)</summary>", r"\1", content)
+    content = re.sub(r"<details>", "", content)
+    content = re.sub(r"</details>", "", content)
+    content = re.sub(r"<summary>", "", content)
+    content = re.sub(r"</summary>", "", content)
+
# Fix patterns where content is followed by fence on same line
# e.g., "tag```" or "content````" should become "tag\n```" or "content\n```"
- content = re.sub(r'([^`])````', r'\1\n```', content)
- content = re.sub(r'([^`])```$', r'\1\n```', content, flags=re.MULTILINE)
- content = re.sub(r'`````', '```', content) # Fix five backticks
- content = re.sub(r'````', '```', content) # Fix four backticks
-
- lines = content.split('\n')
+ content = re.sub(r"([^`])````", r"\1\n```", content)
+ content = re.sub(r"([^`])```$", r"\1\n```", content, flags=re.MULTILINE)
+ content = re.sub(r"`````", "```", content) # Fix five backticks
+ content = re.sub(r"````", "```", content) # Fix four backticks
+
+ lines = content.split("\n")
cleaned = []
i = 0
-
+
while i < len(lines):
line = lines[i]
stripped = line.strip()
-
+
# Detect code fence
is_fence_start = stripped in ("```", "````", "`````")
-
+
# Handle orphan fence followed by header (add blank line)
if is_fence_start and i + 1 < len(lines):
next_line = lines[i + 1].strip()
@@ -388,7 +409,7 @@ def cleanup_github_markdown(content: str) -> str:
cleaned.append("") # Add blank line before header
i += 1
continue
-
+
# Handle empty code block (fence followed immediately by another fence)
if is_fence_start and i + 1 < len(lines):
next_stripped = lines[i + 1].strip()
@@ -396,18 +417,18 @@ def cleanup_github_markdown(content: str) -> str:
# Skip this empty block
i += 2
continue
-
+
# Normalize fence markers (```` or ````` -> ```)
if stripped == "````" or stripped == "`````":
cleaned.append("```")
i += 1
continue
-
+
cleaned.append(line)
i += 1
-
+
# Join and fix multiple blank lines
content = "\n".join(cleaned)
content = re.sub(r"\n{4,}", "\n\n\n", content)
-
+
return content
diff --git a/src/migration_tracker.py b/src/migration_tracker.py
index 28a8422..ea4f953 100755
--- a/src/migration_tracker.py
+++ b/src/migration_tracker.py
@@ -22,15 +22,15 @@
import argparse
import re
import sys
+from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
-from datetime import datetime
def extract_frontmatter(content: str) -> Optional[Dict[str, str]]:
"""Extract YAML frontmatter from markdown content."""
# Match frontmatter between --- markers
- match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
+ match = re.match(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
if not match:
return None
@@ -38,17 +38,17 @@ def extract_frontmatter(content: str) -> Optional[Dict[str, str]]:
frontmatter = {}
# Parse simple key: value pairs
- for line in frontmatter_text.split('\n'):
- if ':' in line:
- key, value = line.split(':', 1)
- frontmatter[key.strip()] = value.strip().strip('"\'')
+ for line in frontmatter_text.split("\n"):
+ if ":" in line:
+ key, value = line.split(":", 1)
+ frontmatter[key.strip()] = value.strip().strip("\"'")
return frontmatter
def scan_files(directory: Path, exclude_dirs: Optional[List[str]] = None) -> List[Dict]:
"""Scan all markdown files in directory for migration tracking data.
-
+
Args:
directory: Root directory to scan
exclude_dirs: List of directory names to exclude from scanning
@@ -56,18 +56,18 @@ def scan_files(directory: Path, exclude_dirs: Optional[List[str]] = None) -> Lis
files_data = []
exclude_dirs = exclude_dirs or []
- for md_file in directory.rglob('*.md'):
+ for md_file in directory.rglob("*.md"):
# Skip if any part of the full path matches excluded directories
if any(exclude_dir in str(md_file) for exclude_dir in exclude_dirs):
continue
# Skip hidden files and node_modules
- if any(part.startswith('.') for part in md_file.parts):
+ if any(part.startswith(".") for part in md_file.parts):
continue
- if 'node_modules' in md_file.parts:
+ if "node_modules" in md_file.parts:
continue
try:
- with open(md_file, 'r', encoding='utf-8') as f:
+ with open(md_file, "r", encoding="utf-8") as f:
content = f.read()
frontmatter = extract_frontmatter(content)
@@ -75,21 +75,26 @@ def scan_files(directory: Path, exclude_dirs: Optional[List[str]] = None) -> Lis
continue
# Check if this file has migration tracking fields
- if 'migration_status' in frontmatter:
- # Get relative path from the original scan directory
- # Get the relative path from the scan directory
- rel_path = md_file.relative_to(directory)
- # Prepend 'docs/' to create the full path
- full_path = 'docs/' + str(rel_path)
- files_data.append({
- 'path': full_path,
- 'status': frontmatter.get('migration_status', ''),
- 'source': frontmatter.get('migration_source', ''),
- 'issue': frontmatter.get('migration_issue', ''),
- 'stakeholder': frontmatter.get('migration_stakeholder', ''),
- 'approved': frontmatter.get('migration_approved', 'false').lower() == 'true',
- 'title': frontmatter.get('title', md_file.stem)
- })
+ if "migration_status" in frontmatter:
+ # Get relative path from the original scan directory
+ # Get the relative path from the scan directory
+ rel_path = md_file.relative_to(directory)
+ # Prepend 'docs/' to create the full path
+ full_path = "docs/" + str(rel_path)
+ files_data.append(
+ {
+ "path": full_path,
+ "status": frontmatter.get("migration_status", ""),
+ "source": frontmatter.get("migration_source", ""),
+ "issue": frontmatter.get("migration_issue", ""),
+ "stakeholder": frontmatter.get("migration_stakeholder", ""),
+ "approved": frontmatter.get(
+ "migration_approved", "false"
+ ).lower()
+ == "true",
+ "title": frontmatter.get("title", md_file.stem),
+ }
+ )
except Exception as e:
print(f"Warning: Could not process {md_file}: {e}", file=sys.stderr)
@@ -101,8 +106,8 @@ def filter_by_status(files_data: List[Dict], status_filter: str) -> List[Dict]:
if not status_filter:
return files_data
- statuses = [s.strip().lower() for s in status_filter.split(',')]
- return [f for f in files_data if f['status'].lower() in statuses]
+ statuses = [s.strip().lower() for s in status_filter.split(",")]
+ return [f for f in files_data if f["status"].lower() in statuses]
def generate_markdown_report(files_data: List[Dict]) -> str:
@@ -113,11 +118,11 @@ def generate_markdown_report(files_data: List[Dict]) -> str:
# Count by status
status_counts = {}
for f in files_data:
- status = f['status'] or 'unset'
+ status = f["status"] or "unset"
status_counts[status] = status_counts.get(status, 0) + 1
# Count approved
- approved_count = sum(1 for f in files_data if f['approved'])
+ approved_count = sum(1 for f in files_data if f["approved"])
total = len(files_data)
@@ -132,63 +137,71 @@ def generate_markdown_report(files_data: List[Dict]) -> str:
report.append("|--------|-------|------------|")
status_emoji = {
- 'done': '✅',
- 'new': '🆕',
- 'adapt': '🔄',
- 'merge': '🔀',
- 'unset': '❓'
+ "done": "✅",
+ "new": "🆕",
+ "adapt": "🔄",
+ "merge": "🔀",
+ "unset": "❓",
}
- for status in ['done', 'new', 'adapt', 'merge', 'unset']:
+ for status in ["done", "new", "adapt", "merge", "unset"]:
count = status_counts.get(status, 0)
percentage = round((count / total * 100)) if total > 0 else 0
- emoji = status_emoji.get(status, '•')
+ emoji = status_emoji.get(status, "•")
report.append(f"| {emoji} {status} | {count} | {percentage}% |")
- report.append(f"\n**Approved:** {approved_count}/{total} ({round(approved_count/total*100) if total > 0 else 0}%)\n")
+ report.append(
+ f"\n**Approved:** {approved_count}/{total} ({round(approved_count/total*100) if total > 0 else 0}%)\n"
+ )
# Group by status
- for status in ['new', 'adapt', 'merge', 'done', 'unset']:
- status_files = [f for f in files_data if (f['status'] or 'unset') == status]
+ for status in ["new", "adapt", "merge", "done", "unset"]:
+ status_files = [f for f in files_data if (f["status"] or "unset") == status]
if not status_files:
continue
- emoji = status_emoji.get(status, '•')
+ emoji = status_emoji.get(status, "•")
report.append(f"### {emoji} {status.title()} ({len(status_files)} files)\n")
# Table header
- if status == 'merge':
+ if status == "merge":
report.append("| Done | Approved | File | Sources | Issue | Stakeholder |")
report.append("|------|----------|------|---------|-------|-------------|")
- elif status in ['adapt', 'new']:
+ elif status in ["adapt", "new"]:
report.append("| Done | Approved | File | Source | Issue | Stakeholder |")
report.append("|------|----------|------|--------|-------|-------------|")
else:
report.append("| Done | Approved | File | Issue | Stakeholder |")
report.append("|------|----------|------|-------|-------------|")
- for f in sorted(status_files, key=lambda x: x['path']):
- path = f['path']
- issue = f['issue'] if f['issue'] else '-'
- stakeholder = f['stakeholder'] if f['stakeholder'] else '-'
+ for f in sorted(status_files, key=lambda x: x["path"]):
+ path = f["path"]
+ issue = f["issue"] if f["issue"] else "-"
+ stakeholder = f["stakeholder"] if f["stakeholder"] else "-"
# Checkbox for done status
- done_checkbox = '✅' if status == 'done' else '❌'
+ done_checkbox = "✅" if status == "done" else "❌"
# Checkbox for approved
- approved_checkbox = '✅' if f['approved'] else '❌'
-
- if status == 'merge':
- sources = f['source'].replace(',', ', ') if f['source'] else '-'
- report.append(f"| {done_checkbox} | {approved_checkbox} | `{path}` | {sources} | {issue} | {stakeholder} |")
- elif status in ['adapt', 'new']:
- source = f['source'] if f['source'] else '-'
- report.append(f"| {done_checkbox} | {approved_checkbox} | `{path}` | {source} | {issue} | {stakeholder} |")
+ approved_checkbox = "✅" if f["approved"] else "❌"
+
+ if status == "merge":
+ sources = f["source"].replace(",", ", ") if f["source"] else "-"
+ report.append(
+ f"| {done_checkbox} | {approved_checkbox} | `{path}` | {sources} | {issue} | {stakeholder} |"
+ )
+ elif status in ["adapt", "new"]:
+ source = f["source"] if f["source"] else "-"
+ report.append(
+ f"| {done_checkbox} | {approved_checkbox} | `{path}` | {source} | {issue} | {stakeholder} |"
+ )
else:
- report.append(f"| {done_checkbox} | {approved_checkbox} | `{path}` | {issue} | {stakeholder} |")
+ report.append(
+ f"| {done_checkbox} | {approved_checkbox} | `{path}` | {issue} | {stakeholder} |"
+ )
report.append("") # Empty line
- return '\n'.join(report)
+ return "\n".join(report)
def generate_csv_report(files_data: List[Dict]) -> str:
@@ -198,39 +211,41 @@ def generate_csv_report(files_data: List[Dict]) -> str:
lines = ["path,status,source,issue,stakeholder,approved"]
- for f in sorted(files_data, key=lambda x: x['path']):
- approved_str = 'true' if f['approved'] else 'false'
- lines.append(f'"{f["path"]}",{f["status"]},"{f["source"]}","{f["issue"]}","{f["stakeholder"]}",{approved_str}')
+ for f in sorted(files_data, key=lambda x: x["path"]):
+ approved_str = "true" if f["approved"] else "false"
+ lines.append(
+ f'"{f["path"]}",{f["status"]},"{f["source"]}","{f["issue"]}","{f["stakeholder"]}",{approved_str}'
+ )
- return '\n'.join(lines)
+ return "\n".join(lines)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
- description='Track documentation migration progress from frontmatter fields'
+ description="Track documentation migration progress from frontmatter fields"
)
parser.add_argument(
- '--dir',
+ "--dir",
type=Path,
- default=Path('.'),
- help='Directory to scan for markdown files (default: current directory)'
+ default=Path("."),
+ help="Directory to scan for markdown files (default: current directory)",
)
parser.add_argument(
- '--status',
+ "--status",
type=str,
- help='Filter by status (comma-separated): new,adapt,merge,done'
+ help="Filter by status (comma-separated): new,adapt,merge,done",
)
parser.add_argument(
- '--format',
- choices=['markdown', 'csv'],
- default='markdown',
- help='Output format (default: markdown)'
+ "--format",
+ choices=["markdown", "csv"],
+ default="markdown",
+ help="Output format (default: markdown)",
)
parser.add_argument(
- '--exclude-dir',
+ "--exclude-dir",
type=str,
- help='Comma-separated list of directory names to exclude from scanning'
+ help="Comma-separated list of directory names to exclude from scanning",
)
args = parser.parse_args()
@@ -246,9 +261,9 @@ def main():
# Process exclude-dir argument
exclude_dirs = []
if args.exclude_dir:
- exclude_dirs = [d.strip() for d in args.exclude_dir.split(',')]
+ exclude_dirs = [d.strip() for d in args.exclude_dir.split(",")]
print(f"Excluding directories: {exclude_dirs}", file=sys.stderr)
-
+
# Scan files
print(f"Scanning {args.dir}...", file=sys.stderr)
files_data = scan_files(args.dir, exclude_dirs)
@@ -257,10 +272,13 @@ def main():
# Filter if needed
if args.status:
files_data = filter_by_status(files_data, args.status)
- print(f"Filtered to {len(files_data)} files with status: {args.status}", file=sys.stderr)
+ print(
+ f"Filtered to {len(files_data)} files with status: {args.status}",
+ file=sys.stderr,
+ )
# Generate report
- if args.format == 'csv':
+ if args.format == "csv":
report = generate_csv_report(files_data)
else:
report = generate_markdown_report(files_data)
@@ -268,5 +286,5 @@ def main():
print(report)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()