From d8ecee7a0b38552d7e4e14f931f9997fe6ee4685 Mon Sep 17 00:00:00 2001 From: Abdellatif Labreche Date: Thu, 26 Feb 2026 13:47:39 +0100 Subject: [PATCH] Extract compaction logic to Compactor class (SOLID) --- src/jsonldb/compactor.py | 89 ++++++++++++++++++++++++++++++++++++++++ src/jsonldb/database.py | 51 +++-------------------- 2 files changed, 94 insertions(+), 46 deletions(-) create mode 100644 src/jsonldb/compactor.py diff --git a/src/jsonldb/compactor.py b/src/jsonldb/compactor.py new file mode 100644 index 0000000..e91daec --- /dev/null +++ b/src/jsonldb/compactor.py @@ -0,0 +1,89 @@ +import json +import struct +from pathlib import Path +from typing import Iterator + +from .index import Index +from .record import Record +from .wal import WAL + + +class Compactor: + def __init__( + self, data_path: Path, index: Index, wal: WAL, index_fields: list[str] + ): + self.data_path = data_path + self.index = index + self.wal = wal + self.index_fields = index_fields + + def compact(self) -> int: + compacted_records = self._merge_records() + + offset_map = self._write_compacted_records(compacted_records) + + self._rebuild_indexes(offset_map) + + self.wal.clear() + + return len(offset_map) + + def _merge_records(self) -> dict[str, Record]: + merged = {} + + for record in self._scan_data(): + merged[record.id] = record + + for entry in self.wal.scan(): + if entry.op == "delete": + merged.pop(entry.record_id, None) + elif entry.op in ("insert", "update") and entry.data: + merged[entry.record_id] = Record(id=entry.record_id, data=entry.data) + + return merged + + def _scan_data(self) -> Iterator[Record]: + if not self.data_path.exists(): + return + with open(self.data_path, "r") as f: + for line in f: + line = line.strip() + if line: + yield Record.from_jsonl(line) + + def _write_compacted_records( + self, records: dict[str, Record] + ) -> dict[str, tuple[int, int]]: + new_data_path = self.data_path.with_suffix(".jsonl.tmp") + offset_map = {} + + with open(new_data_path, "w") as f: + offset = 0 + for record_id, record in records.items(): + line = record.to_jsonl() + "\n" + f.write(line) + end_offset = f.tell() + offset_map[record_id] = (offset, end_offset) + offset = end_offset + + if self.data_path.exists(): + self.data_path.unlink() + new_data_path.rename(self.data_path) + + return offset_map + + def _rebuild_indexes(self, offset_map: dict[str, tuple[int, int]]) -> None: + self.index.clear() + + for record_id, (offset, end_offset) in offset_map.items(): + self.index.put(record_id.encode(), struct.pack("QQ", offset, end_offset)) + + with open(self.data_path, "r") as f: + f.seek(offset) + line = f.read(end_offset - offset).strip() + record = Record.from_jsonl(line) + + for field in self.index_fields: + if field in record.data: + index_key = f"{field}:{json.dumps(record.data[field])}".encode() + self.index.put_index(index_key, record_id.encode()) diff --git a/src/jsonldb/database.py b/src/jsonldb/database.py index 2d5030f..0a394d8 100644 --- a/src/jsonldb/database.py +++ b/src/jsonldb/database.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Any, Iterator +from .compactor import Compactor from .index import Index from .record import Record from .wal import WAL, WalEntry @@ -21,6 +22,7 @@ def __init__(self, path: str, index_fields: list[str] | None = None): self._index_fields = index_fields or [] self.wal = WAL(self.wal_path) + self.compactor = Compactor(self.data_path, self.index, self.wal, self._index_fields) def insert(self, data: dict[str, Any]) -> str: record_id = str(uuid.uuid4()) @@ -68,10 +70,10 @@ def _get_from_wal(self, record_id: str) -> Record | None: for entry in self.wal.scan(): if entry.record_id == record_id: last_entry = entry - + if last_entry is None: return None - + if last_entry.op == "delete": return None if last_entry.op in ("insert", "update") and last_entry.data: @@ -140,47 +142,4 @@ def count(self) -> int: return sum(1 for _ in self.all()) def compact(self) -> int: - compacted_records = {} - - for record in self._scan_data(): - compacted_records[record.id] = record - - for entry in self.wal.scan(): - if entry.op == "delete": - compacted_records.pop(entry.record_id, None) - elif entry.op in ("insert", "update") and entry.data: - compacted_records[entry.record_id] = Record(id=entry.record_id, data=entry.data) - - new_data_path = self.data_path.with_suffix(".jsonl.tmp") - seen = {} - - with open(new_data_path, "w") as f: - offset = 0 - for record_id, record in compacted_records.items(): - line = record.to_jsonl() + "\n" - f.write(line) - end_offset = f.tell() - seen[record_id] = (offset, end_offset) - offset = end_offset - - if self.data_path.exists(): - self.data_path.unlink() - new_data_path.rename(self.data_path) - - self.index.clear() - for record_id, (offset, end_offset) in seen.items(): - self.index.put(record_id.encode(), struct.pack("QQ", offset, end_offset)) - - with open(self.data_path, "r") as f: - f.seek(offset) - line = f.read(end_offset - offset).strip() - record = Record.from_jsonl(line) - - for field in self._index_fields: - if field in record.data: - index_key = f"{field}:{json.dumps(record.data[field])}".encode() - self.index.put_index(index_key, record_id.encode()) - - self.wal.clear() - - return len(seen) + return self.compactor.compact()