Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 38 additions & 29 deletions code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,36 +453,45 @@ def store_communities(
# that are tightly coupled to the DB transaction lifecycle.
conn = store._conn

# Clear existing data
conn.execute("DELETE FROM communities")
conn.execute("UPDATE nodes SET community_id = NULL")

count = 0
for comm in communities:
cursor = conn.execute(
"""INSERT INTO communities (name, level, cohesion, size, dominant_language, description)
VALUES (?, ?, ?, ?, ?, ?)""",
(
comm["name"],
comm.get("level", 0),
comm.get("cohesion", 0.0),
comm["size"],
comm.get("dominant_language", ""),
comm.get("description", ""),
),
)
community_id = cursor.lastrowid

# Update community_id on member nodes
member_qns = comm.get("members", [])
for qn in member_qns:
conn.execute(
"UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
(community_id, qn),
if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
conn.execute("BEGIN IMMEDIATE")
try:
# Clear existing data
conn.execute("DELETE FROM communities")
conn.execute("UPDATE nodes SET community_id = NULL")

count = 0
for comm in communities:
cursor = conn.execute(
"""INSERT INTO communities
(name, level, cohesion, size, dominant_language, description)
VALUES (?, ?, ?, ?, ?, ?)""",
(
comm["name"],
comm.get("level", 0),
comm.get("cohesion", 0.0),
comm["size"],
comm.get("dominant_language", ""),
comm.get("description", ""),
),
)
count += 1

conn.commit()
community_id = cursor.lastrowid

# Update community_id on member nodes
member_qns = comm.get("members", [])
for qn in member_qns:
conn.execute(
"UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
(community_id, qn),
)
count += 1

conn.commit()
except BaseException:
conn.rollback()
raise
return count


Expand Down
74 changes: 41 additions & 33 deletions code_review_graph/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,41 +299,49 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int:
# tightly coupled to the DB transaction lifecycle.
conn = store._conn

# Clear old data.
conn.execute("DELETE FROM flow_memberships")
conn.execute("DELETE FROM flows")

count = 0
for flow in flows:
path_json = json.dumps(flow.get("path", []))
conn.execute(
"""INSERT INTO flows
(name, entry_point_id, depth, node_count, file_count,
criticality, path_json)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
flow["name"],
flow["entry_point_id"],
flow["depth"],
flow["node_count"],
flow["file_count"],
flow["criticality"],
path_json,
),
)
flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]

# Insert memberships.
node_ids = flow.get("path", [])
for position, node_id in enumerate(node_ids):
if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
conn.execute("BEGIN IMMEDIATE")
try:
# Clear old data.
conn.execute("DELETE FROM flow_memberships")
conn.execute("DELETE FROM flows")

count = 0
for flow in flows:
path_json = json.dumps(flow.get("path", []))
conn.execute(
"INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
"VALUES (?, ?, ?)",
(flow_id, node_id, position),
"""INSERT INTO flows
(name, entry_point_id, depth, node_count, file_count,
criticality, path_json)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
flow["name"],
flow["entry_point_id"],
flow["depth"],
flow["node_count"],
flow["file_count"],
flow["criticality"],
path_json,
),
)
count += 1

conn.commit()
flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]

# Insert memberships.
node_ids = flow.get("path", [])
for position, node_id in enumerate(node_ids):
conn.execute(
"INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
"VALUES (?, ?, ?)",
(flow_id, node_id, position),
)
count += 1

conn.commit()
except BaseException:
conn.rollback()
raise
return count


Expand Down
10 changes: 10 additions & 0 deletions code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import annotations

import json
import logging
import sqlite3
import threading
import time
Expand All @@ -20,6 +21,8 @@
from .migrations import get_schema_version, run_migrations
from .parser import EdgeInfo, NodeInfo

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -232,6 +235,9 @@ def store_file_nodes_edges(
self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = ""
) -> None:
"""Atomically replace all data for a file."""
if self._conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
self._conn.rollback()
self._conn.execute("BEGIN IMMEDIATE")
try:
self.remove_file_data(file_path)
Expand All @@ -258,6 +264,10 @@ def get_metadata(self, key: str) -> Optional[str]:
def commit(self) -> None:
self._conn.commit()

def rollback(self) -> None:
"""Rollback the current transaction."""
self._conn.rollback()

# --- Read operations ---

def get_node(self, qualified_name: str) -> Optional[GraphNode]:
Expand Down
41 changes: 24 additions & 17 deletions code_review_graph/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

logger = logging.getLogger(__name__)

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# FTS5 index management
Expand All @@ -35,23 +37,28 @@ def rebuild_fts_index(store: GraphStore) -> int:
# the FTS5 virtual table DDL, which is tightly coupled to SQLite internals.
conn = store._conn

# Drop and recreate the FTS table to avoid content-sync mismatch issues
conn.execute("DROP TABLE IF EXISTS nodes_fts")
conn.execute("""
CREATE VIRTUAL TABLE nodes_fts USING fts5(
name, qualified_name, file_path, signature,
tokenize='porter unicode61'
)
""")
conn.commit()

# Populate from nodes table
conn.execute("""
INSERT INTO nodes_fts(rowid, name, qualified_name, file_path, signature)
SELECT id, name, qualified_name, file_path, COALESCE(signature, '')
FROM nodes
""")
conn.commit()
if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
conn.execute("BEGIN IMMEDIATE")
try:
# Drop and recreate the FTS table with content sync to match migration v5
conn.execute("DROP TABLE IF EXISTS nodes_fts")
conn.execute("""
CREATE VIRTUAL TABLE nodes_fts USING fts5(
name, qualified_name, file_path, signature,
content='nodes', content_rowid='rowid',
tokenize='porter unicode61'
)
""")

# Rebuild from the content table (nodes) using the FTS5 rebuild command
conn.execute("INSERT INTO nodes_fts(nodes_fts) VALUES('rebuild')")

conn.commit()
except BaseException:
conn.rollback()
raise

count = conn.execute("SELECT count(*) FROM nodes_fts").fetchone()[0]
logger.info("FTS index rebuilt: %d rows indexed", count)
Expand Down
4 changes: 4 additions & 0 deletions code_review_graph/tools/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def build_or_update_graph(
store.update_node_signature(node_id, sig[:512])
store.commit()
except (sqlite3.OperationalError, TypeError, KeyError) as e:
store.rollback()
logger.warning("Signature computation failed: %s", e)
warnings.append(f"Signature computation failed: {type(e).__name__}: {e}")

Expand All @@ -95,6 +96,7 @@ def build_or_update_graph(
fts_count = rebuild_fts_index(store)
build_result["fts_indexed"] = fts_count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("FTS index rebuild failed: %s", e)
warnings.append(f"FTS index rebuild failed: {type(e).__name__}: {e}")

Expand All @@ -107,6 +109,7 @@ def build_or_update_graph(
count = _store_flows(store, flows)
build_result["flows_detected"] = count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("Flow detection failed: %s", e)
warnings.append(f"Flow detection failed: {type(e).__name__}: {e}")

Expand All @@ -123,6 +126,7 @@ def build_or_update_graph(
count = _store_communities(store, comms)
build_result["communities_detected"] = count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("Community detection failed: %s", e)
warnings.append(f"Community detection failed: {type(e).__name__}: {e}")

Expand Down