Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/jsonldb/compactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
import struct
from pathlib import Path
from typing import Iterator

from .index import Index
from .record import Record
from .wal import WAL


class Compactor:
def __init__(
self, data_path: Path, index: Index, wal: WAL, index_fields: list[str]
):
self.data_path = data_path
self.index = index
self.wal = wal
self.index_fields = index_fields

def compact(self) -> int:
compacted_records = self._merge_records()

offset_map = self._write_compacted_records(compacted_records)

self._rebuild_indexes(offset_map)

self.wal.clear()

return len(offset_map)

def _merge_records(self) -> dict[str, Record]:
merged = {}

for record in self._scan_data():
merged[record.id] = record

for entry in self.wal.scan():
if entry.op == "delete":
merged.pop(entry.record_id, None)
elif entry.op in ("insert", "update") and entry.data:
merged[entry.record_id] = Record(id=entry.record_id, data=entry.data)

return merged

def _scan_data(self) -> Iterator[Record]:
if not self.data_path.exists():
return
with open(self.data_path, "r") as f:
for line in f:
line = line.strip()
if line:
yield Record.from_jsonl(line)

def _write_compacted_records(
self, records: dict[str, Record]
) -> dict[str, tuple[int, int]]:
new_data_path = self.data_path.with_suffix(".jsonl.tmp")
offset_map = {}

with open(new_data_path, "w") as f:
offset = 0
for record_id, record in records.items():
line = record.to_jsonl() + "\n"
f.write(line)
end_offset = f.tell()
offset_map[record_id] = (offset, end_offset)
offset = end_offset

if self.data_path.exists():
self.data_path.unlink()
new_data_path.rename(self.data_path)

return offset_map

def _rebuild_indexes(self, offset_map: dict[str, tuple[int, int]]) -> None:
self.index.clear()

for record_id, (offset, end_offset) in offset_map.items():
self.index.put(record_id.encode(), struct.pack("QQ", offset, end_offset))

with open(self.data_path, "r") as f:
f.seek(offset)
line = f.read(end_offset - offset).strip()
record = Record.from_jsonl(line)

for field in self.index_fields:
if field in record.data:
index_key = f"{field}:{json.dumps(record.data[field])}".encode()
self.index.put_index(index_key, record_id.encode())
47 changes: 3 additions & 44 deletions src/jsonldb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Any, Iterator

from .compactor import Compactor
from .index import Index
from .record import Record
from .wal import WAL, WalEntry
Expand All @@ -21,6 +22,7 @@ def __init__(self, path: str, index_fields: list[str] | None = None):
self._index_fields = index_fields or []

self.wal = WAL(self.wal_path)
self.compactor = Compactor(self.data_path, self.index, self.wal, self._index_fields)

def insert(self, data: dict[str, Any]) -> str:
record_id = str(uuid.uuid4())
Expand Down Expand Up @@ -140,47 +142,4 @@ def count(self) -> int:
return sum(1 for _ in self.all())

def compact(self) -> int:
compacted_records = {}

for record in self._scan_data():
compacted_records[record.id] = record

for entry in self.wal.scan():
if entry.op == "delete":
compacted_records.pop(entry.record_id, None)
elif entry.op in ("insert", "update") and entry.data:
compacted_records[entry.record_id] = Record(id=entry.record_id, data=entry.data)

new_data_path = self.data_path.with_suffix(".jsonl.tmp")
seen = {}

with open(new_data_path, "w") as f:
offset = 0
for record_id, record in compacted_records.items():
line = record.to_jsonl() + "\n"
f.write(line)
end_offset = f.tell()
seen[record_id] = (offset, end_offset)
offset = end_offset

if self.data_path.exists():
self.data_path.unlink()
new_data_path.rename(self.data_path)

self.index.clear()
for record_id, (offset, end_offset) in seen.items():
self.index.put(record_id.encode(), struct.pack("QQ", offset, end_offset))

with open(self.data_path, "r") as f:
f.seek(offset)
line = f.read(end_offset - offset).strip()
record = Record.from_jsonl(line)

for field in self._index_fields:
if field in record.data:
index_key = f"{field}:{json.dumps(record.data[field])}".encode()
self.index.put_index(index_key, record_id.encode())

self.wal.clear()

return len(seen)
return self.compactor.compact()