diff --git a/code_review_graph/communities.py b/code_review_graph/communities.py
index cc14cab..28d6cc1 100644
--- a/code_review_graph/communities.py
+++ b/code_review_graph/communities.py
@@ -453,36 +453,45 @@ def store_communities(
     # that are tightly coupled to the DB transaction lifecycle.
     conn = store._conn
 
-    # Clear existing data
-    conn.execute("DELETE FROM communities")
-    conn.execute("UPDATE nodes SET community_id = NULL")
-
-    count = 0
-    for comm in communities:
-        cursor = conn.execute(
-            """INSERT INTO communities (name, level, cohesion, size, dominant_language, description)
-            VALUES (?, ?, ?, ?, ?, ?)""",
-            (
-                comm["name"],
-                comm.get("level", 0),
-                comm.get("cohesion", 0.0),
-                comm["size"],
-                comm.get("dominant_language", ""),
-                comm.get("description", ""),
-            ),
-        )
-        community_id = cursor.lastrowid
-
-        # Update community_id on member nodes
-        member_qns = comm.get("members", [])
-        for qn in member_qns:
-            conn.execute(
-                "UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
-                (community_id, qn),
+    if conn.in_transaction:
+        logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
+        conn.rollback()
+    conn.execute("BEGIN IMMEDIATE")
+    try:
+        # Clear existing data
+        conn.execute("DELETE FROM communities")
+        conn.execute("UPDATE nodes SET community_id = NULL")
+
+        count = 0
+        for comm in communities:
+            cursor = conn.execute(
+                """INSERT INTO communities
+                (name, level, cohesion, size, dominant_language, description)
+                VALUES (?, ?, ?, ?, ?, ?)""",
+                (
+                    comm["name"],
+                    comm.get("level", 0),
+                    comm.get("cohesion", 0.0),
+                    comm["size"],
+                    comm.get("dominant_language", ""),
+                    comm.get("description", ""),
+                ),
             )
-            count += 1
-
-    conn.commit()
+            community_id = cursor.lastrowid
+
+            # Update community_id on member nodes
+            member_qns = comm.get("members", [])
+            for qn in member_qns:
+                conn.execute(
+                    "UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
+                    (community_id, qn),
+                )
+            count += 1
+
+        conn.commit()
+    except BaseException:
+        conn.rollback()
+        raise
 
     return count
 
diff --git a/code_review_graph/flows.py b/code_review_graph/flows.py
index 8c7374c..c74713a 100644
--- a/code_review_graph/flows.py
+++ b/code_review_graph/flows.py
@@ -299,41 +299,49 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int:
     # tightly coupled to the DB transaction lifecycle.
     conn = store._conn
 
-    # Clear old data.
-    conn.execute("DELETE FROM flow_memberships")
-    conn.execute("DELETE FROM flows")
-
-    count = 0
-    for flow in flows:
-        path_json = json.dumps(flow.get("path", []))
-        conn.execute(
-            """INSERT INTO flows
-            (name, entry_point_id, depth, node_count, file_count,
-             criticality, path_json)
-            VALUES (?, ?, ?, ?, ?, ?, ?)""",
-            (
-                flow["name"],
-                flow["entry_point_id"],
-                flow["depth"],
-                flow["node_count"],
-                flow["file_count"],
-                flow["criticality"],
-                path_json,
-            ),
-        )
-        flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
-
-        # Insert memberships.
-        node_ids = flow.get("path", [])
-        for position, node_id in enumerate(node_ids):
+    if conn.in_transaction:
+        logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
+        conn.rollback()
+    conn.execute("BEGIN IMMEDIATE")
+    try:
+        # Clear old data.
+        conn.execute("DELETE FROM flow_memberships")
+        conn.execute("DELETE FROM flows")
+
+        count = 0
+        for flow in flows:
+            path_json = json.dumps(flow.get("path", []))
             conn.execute(
-                "INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
-                "VALUES (?, ?, ?)",
-                (flow_id, node_id, position),
+                """INSERT INTO flows
+                (name, entry_point_id, depth, node_count, file_count,
+                 criticality, path_json)
+                VALUES (?, ?, ?, ?, ?, ?, ?)""",
+                (
+                    flow["name"],
+                    flow["entry_point_id"],
+                    flow["depth"],
+                    flow["node_count"],
+                    flow["file_count"],
+                    flow["criticality"],
+                    path_json,
+                ),
             )
-            count += 1
-
-    conn.commit()
+            flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
+
+            # Insert memberships.
+            node_ids = flow.get("path", [])
+            for position, node_id in enumerate(node_ids):
+                conn.execute(
+                    "INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
+                    "VALUES (?, ?, ?)",
+                    (flow_id, node_id, position),
+                )
+            count += 1
+
+        conn.commit()
+    except BaseException:
+        conn.rollback()
+        raise
 
     return count
 
diff --git a/code_review_graph/graph.py b/code_review_graph/graph.py
index 2fe2ccd..765aea8 100644
--- a/code_review_graph/graph.py
+++ b/code_review_graph/graph.py
@@ -8,6 +8,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import sqlite3
 import threading
 import time
@@ -20,6 +21,8 @@
 from .migrations import get_schema_version, run_migrations
 from .parser import EdgeInfo, NodeInfo
 
+logger = logging.getLogger(__name__)
+
 # ---------------------------------------------------------------------------
 # Schema
 # ---------------------------------------------------------------------------
@@ -232,6 +235,9 @@ def store_file_nodes_edges(
         self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = ""
     ) -> None:
         """Atomically replace all data for a file."""
+        if self._conn.in_transaction:
+            logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
+            self._conn.rollback()
         self._conn.execute("BEGIN IMMEDIATE")
         try:
             self.remove_file_data(file_path)
@@ -258,6 +264,10 @@ def get_metadata(self, key: str) -> Optional[str]:
     def commit(self) -> None:
         self._conn.commit()
 
+    def rollback(self) -> None:
+        """Rollback the current transaction."""
+        self._conn.rollback()
+
     # --- Read operations ---
 
     def get_node(self, qualified_name: str) -> Optional[GraphNode]:
diff --git a/code_review_graph/search.py b/code_review_graph/search.py
index d2eb84e..7d3ce47 100644
--- a/code_review_graph/search.py
+++ b/code_review_graph/search.py
@@ -35,23 +35,28 @@ def rebuild_fts_index(store: GraphStore) -> int:
     # the FTS5 virtual table DDL, which is tightly coupled to SQLite internals.
     conn = store._conn
 
-    # Drop and recreate the FTS table to avoid content-sync mismatch issues
-    conn.execute("DROP TABLE IF EXISTS nodes_fts")
-    conn.execute("""
-        CREATE VIRTUAL TABLE nodes_fts USING fts5(
-            name, qualified_name, file_path, signature,
-            tokenize='porter unicode61'
-        )
-    """)
-    conn.commit()
-
-    # Populate from nodes table
-    conn.execute("""
-        INSERT INTO nodes_fts(rowid, name, qualified_name, file_path, signature)
-        SELECT id, name, qualified_name, file_path, COALESCE(signature, '')
-        FROM nodes
-    """)
-    conn.commit()
+    if conn.in_transaction:
+        logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
+        conn.rollback()
+    conn.execute("BEGIN IMMEDIATE")
+    try:
+        # Drop and recreate the FTS table with content sync to match migration v5
+        conn.execute("DROP TABLE IF EXISTS nodes_fts")
+        conn.execute("""
+            CREATE VIRTUAL TABLE nodes_fts USING fts5(
+                name, qualified_name, file_path, signature,
+                content='nodes', content_rowid='rowid',
+                tokenize='porter unicode61'
+            )
+        """)
+
+        # Rebuild from the content table (nodes) using the FTS5 rebuild command
+        conn.execute("INSERT INTO nodes_fts(nodes_fts) VALUES('rebuild')")
+
+        conn.commit()
+    except BaseException:
+        conn.rollback()
+        raise
 
     count = conn.execute("SELECT count(*) FROM nodes_fts").fetchone()[0]
     logger.info("FTS index rebuilt: %d rows indexed", count)
diff --git a/code_review_graph/tools/build.py b/code_review_graph/tools/build.py
index ad832ec..b8a5cae 100644
--- a/code_review_graph/tools/build.py
+++ b/code_review_graph/tools/build.py
@@ -85,6 +85,7 @@ def build_or_update_graph(
             store.update_node_signature(node_id, sig[:512])
         store.commit()
     except (sqlite3.OperationalError, TypeError, KeyError) as e:
+        store.rollback()
         logger.warning("Signature computation failed: %s", e)
         warnings.append(f"Signature computation failed: {type(e).__name__}: {e}")
 
@@ -95,6 +96,7 @@ def build_or_update_graph(
         fts_count = rebuild_fts_index(store)
         build_result["fts_indexed"] = fts_count
     except (sqlite3.OperationalError, ImportError) as e:
+        store.rollback()
         logger.warning("FTS index rebuild failed: %s", e)
         warnings.append(f"FTS index rebuild failed: {type(e).__name__}: {e}")
 
@@ -107,6 +109,7 @@ def build_or_update_graph(
         count = _store_flows(store, flows)
         build_result["flows_detected"] = count
     except (sqlite3.OperationalError, ImportError) as e:
+        store.rollback()
         logger.warning("Flow detection failed: %s", e)
         warnings.append(f"Flow detection failed: {type(e).__name__}: {e}")
 
@@ -123,6 +126,7 @@ def build_or_update_graph(
         count = _store_communities(store, comms)
         build_result["communities_detected"] = count
     except (sqlite3.OperationalError, ImportError) as e:
+        store.rollback()
         logger.warning("Community detection failed: %s", e)
         warnings.append(f"Community detection failed: {type(e).__name__}: {e}")