Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,89 @@ def preset_set_priority(
console.print("\n[dim]Lower priority = higher precedence in template resolution[/dim]")


@preset_app.command("enable")
def preset_enable(
pack_id: str = typer.Argument(help="Preset ID to enable"),
):
"""Enable a disabled preset."""
from .presets import PresetManager

project_root = Path.cwd()

# Check if we're in a spec-kit project
specify_dir = project_root / ".specify"
if not specify_dir.exists():
console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)")
console.print("Run this command from a spec-kit project root")
raise typer.Exit(1)

manager = PresetManager(project_root)

# Check if preset is installed
if not manager.registry.is_installed(pack_id):
console.print(f"[red]Error:[/red] Preset '{pack_id}' is not installed")
raise typer.Exit(1)

# Get current metadata
metadata = manager.registry.get(pack_id)
if metadata is None or not isinstance(metadata, dict):
Comment thread
mbachorik marked this conversation as resolved.
console.print(f"[red]Error:[/red] Preset '{pack_id}' not found in registry (corrupted state)")
raise typer.Exit(1)

if metadata.get("enabled", True):
console.print(f"[yellow]Preset '{pack_id}' is already enabled[/yellow]")
raise typer.Exit(0)

# Enable the preset
manager.registry.update(pack_id, {"enabled": True})

console.print(f"[green]✓[/green] Preset '{pack_id}' enabled")
console.print("\nTemplates from this preset will now be included in resolution.")
Comment thread
mbachorik marked this conversation as resolved.
console.print("[dim]Note: Previously registered commands/skills remain active.[/dim]")


@preset_app.command("disable")
def preset_disable(
pack_id: str = typer.Argument(help="Preset ID to disable"),
):
"""Disable a preset without removing it."""
from .presets import PresetManager

project_root = Path.cwd()

# Check if we're in a spec-kit project
specify_dir = project_root / ".specify"
if not specify_dir.exists():
console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)")
console.print("Run this command from a spec-kit project root")
raise typer.Exit(1)

manager = PresetManager(project_root)

# Check if preset is installed
if not manager.registry.is_installed(pack_id):
Comment thread
mbachorik marked this conversation as resolved.
console.print(f"[red]Error:[/red] Preset '{pack_id}' is not installed")
raise typer.Exit(1)

# Get current metadata
metadata = manager.registry.get(pack_id)
if metadata is None or not isinstance(metadata, dict):
console.print(f"[red]Error:[/red] Preset '{pack_id}' not found in registry (corrupted state)")
raise typer.Exit(1)

if not metadata.get("enabled", True):
console.print(f"[yellow]Preset '{pack_id}' is already disabled[/yellow]")
raise typer.Exit(0)

# Disable the preset
manager.registry.update(pack_id, {"enabled": False})

console.print(f"[green]✓[/green] Preset '{pack_id}' disabled")
console.print("\nTemplates from this preset will be skipped during resolution.")
console.print("[dim]Note: Previously registered commands/skills remain active until preset removal.[/dim]")
console.print(f"To re-enable: specify preset enable {pack_id}")
Comment thread
mbachorik marked this conversation as resolved.


# ===== Preset Catalog Commands =====


Expand Down
56 changes: 46 additions & 10 deletions src/specify_cli/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,17 @@ def _load(self) -> dict:

try:
with open(self.registry_path, 'r') as f:
return json.load(f)
data = json.load(f)
# Validate loaded data is a dict (handles corrupted registry files)
if not isinstance(data, dict):
return {
"schema_version": self.SCHEMA_VERSION,
"extensions": {}
}
Comment thread
mbachorik marked this conversation as resolved.
# Normalize extensions field (handles corrupted extensions value)
if not isinstance(data.get("extensions"), dict):
data["extensions"] = {}
return data
except (json.JSONDecodeError, FileNotFoundError):
# Corrupted or missing registry, start fresh
return {
Expand Down Expand Up @@ -296,8 +306,16 @@ def restore(self, extension_id: str, metadata: dict):
Args:
extension_id: Extension ID
metadata: Complete extension metadata including installed_at

Raises:
ValueError: If metadata is None or not a dict
"""
self.data["extensions"][extension_id] = dict(metadata)
if metadata is None or not isinstance(metadata, dict):
raise ValueError(f"Cannot restore '{extension_id}': metadata must be a dict")
# Ensure extensions dict exists (handle corrupted registry)
if not isinstance(self.data.get("extensions"), dict):
self.data["extensions"] = {}
self.data["extensions"][extension_id] = copy.deepcopy(metadata)
self._save()

def remove(self, extension_id: str):
Expand All @@ -320,10 +338,16 @@ def get(self, extension_id: str) -> Optional[dict]:
extension_id: Extension ID

Returns:
Deep copy of extension metadata, or None if not found
Deep copy of extension metadata, or None if not found or corrupted
"""
entry = self.data["extensions"].get(extension_id)
return copy.deepcopy(entry) if entry is not None else None
extensions = self.data.get("extensions")
if not isinstance(extensions, dict):
return None
entry = extensions.get(extension_id)
Comment thread
mbachorik marked this conversation as resolved.
# Return None for missing or corrupted (non-dict) entries
if entry is None or not isinstance(entry, dict):
return None
return copy.deepcopy(entry)
Comment thread
mbachorik marked this conversation as resolved.

def list(self) -> Dict[str, dict]:
"""Get all installed extensions.
Expand All @@ -332,9 +356,12 @@ def list(self) -> Dict[str, dict]:
from accidentally mutating nested internal registry state.

Returns:
Dictionary of extension_id -> metadata (deep copies)
Dictionary of extension_id -> metadata (deep copies), empty dict if corrupted
"""
return copy.deepcopy(self.data["extensions"])
extensions = self.data.get("extensions", {}) or {}
if not isinstance(extensions, dict):
return {}
return copy.deepcopy(extensions)
Comment thread
mbachorik marked this conversation as resolved.
Outdated

def is_installed(self, extension_id: str) -> bool:
"""Check if extension is installed.
Expand All @@ -343,17 +370,23 @@ def is_installed(self, extension_id: str) -> bool:
extension_id: Extension ID

Returns:
True if extension is installed
True if extension is installed, False if not or registry corrupted
"""
return extension_id in self.data["extensions"]
extensions = self.data.get("extensions")
if not isinstance(extensions, dict):
return False
return extension_id in extensions

def list_by_priority(self) -> List[tuple]:
def list_by_priority(self, include_disabled: bool = False) -> List[tuple]:
"""Get all installed extensions sorted by priority.

Lower priority number = higher precedence (checked first).
Extensions with equal priority are sorted alphabetically by ID
for deterministic ordering.

Args:
include_disabled: If True, include disabled extensions. Default False.

Returns:
List of (extension_id, metadata_copy) tuples sorted by priority.
Metadata is deep-copied to prevent accidental mutation.
Expand All @@ -365,6 +398,9 @@ def list_by_priority(self) -> List[tuple]:
for ext_id, meta in extensions.items():
if not isinstance(meta, dict):
continue
# Skip disabled extensions unless explicitly requested
if not include_disabled and not meta.get("enabled", True):
continue
Comment thread
mbachorik marked this conversation as resolved.
metadata_copy = copy.deepcopy(meta)
metadata_copy["priority"] = normalize_priority(metadata_copy.get("priority", 10))
sortable_extensions.append((ext_id, metadata_copy))
Expand Down
87 changes: 76 additions & 11 deletions src/specify_cli/presets.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,17 @@ def _load(self) -> dict:

try:
with open(self.registry_path, 'r') as f:
return json.load(f)
data = json.load(f)
# Validate loaded data is a dict (handles corrupted registry files)
if not isinstance(data, dict):
return {
"schema_version": self.SCHEMA_VERSION,
"presets": {}
}
Comment thread
mbachorik marked this conversation as resolved.
# Normalize presets field (handles corrupted presets value)
if not isinstance(data.get("presets"), dict):
data["presets"] = {}
return data
except (json.JSONDecodeError, FileNotFoundError):
return {
"schema_version": self.SCHEMA_VERSION,
Expand Down Expand Up @@ -306,32 +316,73 @@ def update(self, pack_id: str, updates: dict):
self.data["presets"][pack_id] = merged
self._save()

def restore(self, pack_id: str, metadata: dict):
"""Restore preset metadata to registry without modifying timestamps.

Use this method for rollback scenarios where you have a complete backup
of the registry entry (including installed_at) and want to restore it
exactly as it was.

Args:
pack_id: Preset ID
metadata: Complete preset metadata including installed_at

Raises:
ValueError: If metadata is None or not a dict
"""
if metadata is None or not isinstance(metadata, dict):
raise ValueError(f"Cannot restore '{pack_id}': metadata must be a dict")
# Ensure presets dict exists (handle corrupted registry)
if not isinstance(self.data.get("presets"), dict):
self.data["presets"] = {}
self.data["presets"][pack_id] = copy.deepcopy(metadata)
Comment thread
mbachorik marked this conversation as resolved.
self._save()

def get(self, pack_id: str) -> Optional[dict]:
"""Get preset metadata from registry.

Returns a deep copy to prevent callers from accidentally mutating
nested internal registry state without going through the write path.

Args:
pack_id: Preset ID

Returns:
Pack metadata or None if not found
Deep copy of preset metadata, or None if not found or corrupted
"""
return self.data["presets"].get(pack_id)
packs = self.data.get("presets")
if not isinstance(packs, dict):
return None
entry = packs.get(pack_id)
Comment thread
mbachorik marked this conversation as resolved.
# Return None for missing or corrupted (non-dict) entries
if entry is None or not isinstance(entry, dict):
return None
return copy.deepcopy(entry)

def list(self) -> Dict[str, dict]:
"""Get all installed presets.

Returns a deep copy of the presets mapping to prevent callers
from accidentally mutating nested internal registry state.

Returns:
Dictionary of pack_id -> metadata
Dictionary of pack_id -> metadata (deep copies), empty dict if corrupted
"""
return self.data["presets"]
packs = self.data.get("presets", {}) or {}
if not isinstance(packs, dict):
return {}
return copy.deepcopy(packs)
Comment thread
mbachorik marked this conversation as resolved.
Outdated

def list_by_priority(self) -> List[tuple]:
def list_by_priority(self, include_disabled: bool = False) -> List[tuple]:
"""Get all installed presets sorted by priority.

Lower priority number = higher precedence (checked first).
Presets with equal priority are sorted alphabetically by ID
for deterministic ordering.

Args:
include_disabled: If True, include disabled presets. Default False.

Returns:
List of (pack_id, metadata_copy) tuples sorted by priority.
Metadata is deep-copied to prevent accidental mutation.
Expand All @@ -343,6 +394,9 @@ def list_by_priority(self) -> List[tuple]:
for pack_id, meta in packs.items():
if not isinstance(meta, dict):
continue
# Skip disabled presets unless explicitly requested
if not include_disabled and not meta.get("enabled", True):
continue
metadata_copy = copy.deepcopy(meta)
metadata_copy["priority"] = normalize_priority(metadata_copy.get("priority", 10))
sortable_packs.append((pack_id, metadata_copy))
Expand All @@ -358,9 +412,12 @@ def is_installed(self, pack_id: str) -> bool:
pack_id: Preset ID

Returns:
True if pack is installed
True if pack is installed, False if not or registry corrupted
"""
return pack_id in self.data["presets"]
packs = self.data.get("presets")
if not isinstance(packs, dict):
return False
return pack_id in packs


class PresetManager:
Expand Down Expand Up @@ -1466,12 +1523,20 @@ def _get_all_extensions_by_priority(self) -> list[tuple[int, str, dict | None]]:
return []

registry = ExtensionRegistry(self.extensions_dir)
registered_extensions = registry.list_by_priority()
registered_extension_ids = {ext_id for ext_id, _ in registered_extensions}
# Use raw registry keys to track ALL extensions (including corrupted entries)
# This prevents corrupted entries from being picked up as "unregistered" dirs
registered_extension_ids = set(registry.list().keys())
Comment thread
mbachorik marked this conversation as resolved.
Outdated

# Get enabled extensions for resolution (list_by_priority skips corrupted/disabled)
Comment thread
mbachorik marked this conversation as resolved.
Outdated
all_registered = registry.list_by_priority(include_disabled=True)

all_extensions: list[tuple[int, str, dict | None]] = []

for ext_id, metadata in registered_extensions:
# Only include enabled extensions in the result
for ext_id, metadata in all_registered:
# Skip disabled extensions
if not metadata.get("enabled", True):
continue
priority = normalize_priority(metadata.get("priority") if metadata else None)
all_extensions.append((priority, ext_id, metadata))

Expand Down
Loading
Loading