Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 289 additions & 7 deletions code_review_graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@
code-review-graph register <path> [--alias name]
code-review-graph unregister <path_or_alias>
code-review-graph repos
code-review-graph query <pattern> <target>
code-review-graph impact [--files ...] [--depth N]
code-review-graph search <query> [--kind KIND]
code-review-graph flows [--sort COLUMN]
code-review-graph flow [--id ID | --name NAME]
code-review-graph communities [--sort COLUMN]
code-review-graph community [--id ID | --name NAME]
code-review-graph architecture
code-review-graph large-functions [--min-lines N]
code-review-graph refactor <mode>
"""

from __future__ import annotations
Expand Down Expand Up @@ -89,6 +99,18 @@ def _print_banner() -> None:
{g}eval{r} Run evaluation benchmarks
{g}serve{r} Start MCP server

{b}Graph queries:{r}
{g}query{r} Query relationships {d}(callers_of, callees_of, ...){r}
{g}impact{r} Analyze blast radius of changes
{g}search{r} Search code entities by name/keyword
{g}flows{r} List execution flows by criticality
{g}flow{r} Get details of a single execution flow
{g}communities{r} List detected code communities
{g}community{r} Get details of a single community
{g}architecture{r} Architecture overview from communities
{g}large-functions{r} Find functions exceeding line threshold
{g}refactor{r} Rename preview, dead code, suggestions

{d}Run{r} {b}code-review-graph <command> --help{r} {d}for details{r}
""")

Expand Down Expand Up @@ -150,6 +172,24 @@ def _handle_init(args: argparse.Namespace) -> None:
print(" 2. Restart your AI coding tool to pick up the new config")


def _cli_post_process(store: object) -> None:
    """Run the shared post-processing pipeline and print a per-step summary.

    Count-style steps (signatures, FTS indexing) are reported only when
    nonzero; flow/community detection is reported whenever the pipeline
    returned a value, including zero.  Warnings, if any, are echoed last.
    """
    from .postprocessing import run_post_processing

    summary = run_post_processing(store)  # type: ignore[arg-type]

    sig_count = summary.get("signatures_computed")
    if sig_count:
        print(f"Signatures: {sig_count} nodes")

    fts_count = summary.get("fts_indexed")
    if fts_count:
        print(f"FTS indexed: {fts_count} nodes")

    # These two report even when the detected count is zero.
    for key, label in (("flows_detected", "Flows"),
                       ("communities_detected", "Communities")):
        value = summary.get(key)
        if value is not None:
            print(f"{label}: {value}")

    for warning in summary.get("warnings") or ():
        print(f"  Warning: {warning}")


def main() -> None:
"""Main CLI entry point."""
ap = argparse.ArgumentParser(
Expand Down Expand Up @@ -298,6 +338,120 @@ def main() -> None:
serve_cmd = sub.add_parser("serve", help="Start MCP server (stdio transport)")
serve_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# --- CLI-first commands (expose MCP tool functions directly) ---

# query
query_cmd = sub.add_parser(
"query", help="Query graph relationships (callers_of, callees_of, imports_of, etc.)"
)
query_cmd.add_argument(
"pattern",
choices=[
"callers_of", "callees_of", "imports_of", "importers_of",
"children_of", "tests_for", "inheritors_of", "file_summary",
],
help="Query pattern",
)
query_cmd.add_argument("target", help="Node name, qualified name, or file path")
query_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# impact
impact_cmd = sub.add_parser(
"impact", help="Analyze blast radius of changed files"
)
impact_cmd.add_argument(
"--files", nargs="*", default=None,
help="Changed files (auto-detected from git if omitted)",
)
impact_cmd.add_argument("--depth", type=int, default=2, help="BFS hops (default: 2)")
impact_cmd.add_argument("--base", default="HEAD~1", help="Git diff base (default: HEAD~1)")
impact_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# search
search_cmd = sub.add_parser("search", help="Search code entities by name/keyword")
search_cmd.add_argument("query", help="Search string")
search_cmd.add_argument(
"--kind", default=None,
choices=["File", "Class", "Function", "Type", "Test"],
help="Filter by entity kind",
)
search_cmd.add_argument("--limit", type=int, default=20, help="Max results (default: 20)")
search_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# flows
flows_cmd = sub.add_parser("flows", help="List execution flows by criticality")
flows_cmd.add_argument(
"--sort", default="criticality",
choices=["criticality", "depth", "node_count", "file_count", "name"],
help="Sort column (default: criticality)",
)
flows_cmd.add_argument("--limit", type=int, default=20, help="Max results (default: 20)")
flows_cmd.add_argument(
"--kind", default=None, help="Filter by entry point kind (e.g. Test, Function)"
)
flows_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# flow (single)
flow_cmd = sub.add_parser("flow", help="Get details of a single execution flow")
flow_cmd.add_argument("--id", type=int, default=None, help="Flow ID")
flow_cmd.add_argument("--name", default=None, help="Flow name (partial match)")
flow_cmd.add_argument(
"--source", action="store_true", help="Include source code snippets"
)
flow_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# communities
comms_cmd = sub.add_parser("communities", help="List detected code communities")
comms_cmd.add_argument(
"--sort", default="size", choices=["size", "cohesion", "name"],
help="Sort column (default: size)",
)
comms_cmd.add_argument("--min-size", type=int, default=0, help="Min community size")
comms_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# community (single)
comm_cmd = sub.add_parser("community", help="Get details of a single community")
comm_cmd.add_argument("--id", type=int, default=None, help="Community ID")
comm_cmd.add_argument("--name", default=None, help="Community name (partial match)")
comm_cmd.add_argument(
"--members", action="store_true", help="Include member node details"
)
comm_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# architecture
arch_cmd = sub.add_parser("architecture", help="Architecture overview from community structure")
arch_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# large-functions
large_cmd = sub.add_parser(
"large-functions", help="Find functions/classes exceeding line threshold"
)
large_cmd.add_argument("--min-lines", type=int, default=50, help="Min lines (default: 50)")
large_cmd.add_argument(
"--kind", default=None, choices=["Function", "Class", "File", "Test"],
help="Filter by kind",
)
large_cmd.add_argument("--path", default=None, help="Filter by file path substring")
large_cmd.add_argument("--limit", type=int, default=50, help="Max results (default: 50)")
large_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

# refactor
refactor_cmd = sub.add_parser(
"refactor", help="Rename preview, dead code detection, suggestions"
)
refactor_cmd.add_argument(
"mode", choices=["rename", "dead_code", "suggest"],
help="Operation mode",
)
refactor_cmd.add_argument("--old-name", default=None, help="(rename) Current symbol name")
refactor_cmd.add_argument("--new-name", default=None, help="(rename) New symbol name")
refactor_cmd.add_argument(
"--kind", default=None, choices=["Function", "Class"],
help="(dead_code) Filter by kind",
)
refactor_cmd.add_argument("--path", default=None, help="(dead_code) Filter by file path")
refactor_cmd.add_argument("--repo", default=None, help="Repository root (auto-detected)")

args = ap.parse_args()

if args.version:
Expand Down Expand Up @@ -392,18 +546,22 @@ def main() -> None:

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

from .graph import GraphStore
from .incremental import (
find_project_root,
find_repo_root,
full_build,
get_db_path,
incremental_update,
watch,
)

if args.command in ("update", "detect-changes"):
# update and detect-changes require git for diffing
# Commands that delegate to tool functions which create their own GraphStore.
# We skip opening a redundant store for these.
delegated_commands = {
"query", "impact", "search", "flows", "flow",
"communities", "community", "architecture",
"large-functions", "refactor",
}

if args.command in ("update", "detect-changes", "impact"):
# update, detect-changes, and impact require git for diffing
repo_root = Path(args.repo) if args.repo else find_repo_root()
if not repo_root:
logging.error(
Expand All @@ -416,6 +574,35 @@ def main() -> None:
repo_root = Path(args.repo) if args.repo else find_project_root()

db_path = get_db_path(repo_root)

# For delegated commands, warn if the graph hasn't been built yet, then
# delegate directly to the tool functions (they manage their own store).
if args.command in delegated_commands:
if not db_path.exists():
print(
"Error: Graph not built yet. "
"Run 'code-review-graph build' first."
)
sys.exit(1)
_run_delegated_command(args, repo_root)
return

# For non-delegated commands that need the graph DB (everything except build),
# warn if the DB is missing.
if args.command != "build" and not db_path.exists():
print(
"WARNING: Graph not built yet. "
"Run 'code-review-graph build' first."
)
print()

from .graph import GraphStore
from .incremental import (
full_build,
incremental_update,
watch,
)

store = GraphStore(db_path)

try:
Expand All @@ -427,13 +614,16 @@ def main() -> None:
)
if result["errors"]:
print(f"Errors: {len(result['errors'])}")
_cli_post_process(store)

elif args.command == "update":
result = incremental_update(repo_root, store, base=args.base)
print(
f"Incremental: {result['files_updated']} files updated, "
f"{result['total_nodes']} nodes, {result['total_edges']} edges"
)
if result.get("files_updated", 0) > 0:
_cli_post_process(store)

elif args.command == "status":
stats = store.get_stats()
Expand All @@ -459,7 +649,8 @@ def main() -> None:
)

elif args.command == "watch":
watch(repo_root, store)
from .postprocessing import run_post_processing
watch(repo_root, store, on_files_updated=run_post_processing)

elif args.command == "visualize":
from .visualization import generate_html
Expand Down Expand Up @@ -524,3 +715,94 @@ def main() -> None:

finally:
store.close()


def _run_delegated_command(args: argparse.Namespace, repo_root: Path) -> None:
"""Run commands that delegate to tool functions with their own GraphStore."""
if args.command == "query":
from .tools import query_graph
result = query_graph(
pattern=args.pattern, target=args.target,
repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "impact":
from .tools import get_impact_radius
result = get_impact_radius(
changed_files=args.files, max_depth=args.depth,
repo_root=str(repo_root), base=args.base,
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "search":
from .tools import semantic_search_nodes
result = semantic_search_nodes(
query=args.query, kind=args.kind, limit=args.limit,
repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "flows":
from .tools import list_flows
result = list_flows(
repo_root=str(repo_root), sort_by=args.sort,
limit=args.limit, kind=args.kind,
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "flow":
if args.id is None and args.name is None:
print("Error: provide --id or --name to select a flow.")
sys.exit(1)
from .tools import get_flow
result = get_flow(
flow_id=args.id, flow_name=args.name,
include_source=args.source, repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "communities":
from .tools import list_communities_func
result = list_communities_func(
repo_root=str(repo_root), sort_by=args.sort,
min_size=args.min_size,
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "community":
if args.id is None and args.name is None:
print("Error: provide --id or --name to select a community.")
sys.exit(1)
from .tools import get_community_func
result = get_community_func(
community_name=args.name, community_id=args.id,
include_members=args.members, repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "architecture":
from .tools import get_architecture_overview_func
result = get_architecture_overview_func(repo_root=str(repo_root))
print(json.dumps(result, indent=2, default=str))

elif args.command == "large-functions":
from .tools import find_large_functions
result = find_large_functions(
min_lines=args.min_lines, kind=args.kind,
file_path_pattern=args.path, limit=args.limit,
repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))

elif args.command == "refactor":
if args.mode == "rename" and (not args.old_name or not args.new_name):
print("Error: refactor rename requires --old-name and --new-name.")
sys.exit(1)
from .tools import refactor_func
result = refactor_func(
mode=args.mode, old_name=args.old_name,
new_name=args.new_name, kind=args.kind,
file_pattern=args.path, repo_root=str(repo_root),
)
print(json.dumps(result, indent=2, default=str))
Loading