From d60110c1a16f3cdc22b0f613c85985ab6a7044ef Mon Sep 17 00:00:00 2001 From: flamboh Date: Wed, 12 Nov 2025 23:18:33 -0800 Subject: [PATCH 1/5] Merge process now runs truly runs contention free on a background thread added daemon handler and persistent merge queue allowing multiple ranges to queue for merge at once ensure all bufferpool states are maintained in-line with merge add locking to bufferpool and table --- lstore/bufferpool.py | 103 +++---- lstore/db.py | 5 +- lstore/query.py | 10 +- lstore/table.py | 542 +++++++++++++++++++++++++------------ lstore/tests/test_table.py | 27 ++ 5 files changed, 466 insertions(+), 221 deletions(-) diff --git a/lstore/bufferpool.py b/lstore/bufferpool.py index e300f3d..26182a2 100644 --- a/lstore/bufferpool.py +++ b/lstore/bufferpool.py @@ -21,6 +21,7 @@ """ import os from collections import OrderedDict +from threading import RLock from typing import Optional, Tuple from config import Config @@ -44,6 +45,7 @@ def __init__(self, capacity: int = 1000, base_path: str = None): """ self.capacity = capacity self.base_path = Path(base_path) if base_path else None + self._lock = RLock() if self.base_path: self.base_path.mkdir(parents=True, exist_ok=True) @@ -71,20 +73,21 @@ def _make_file_path(self, table_name, range_id, segment, page_index, column_inde def get_page(self, table_name, range_id, segment, page_index, column_index): page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - page = self.cache.get(page_key) + with self._lock: + page = self.cache.get(page_key) - if page is not None: - self.cache.move_to_end(page_key) - self.hits += 1 - else: - self.misses += 1 - if len(self.cache) >= self.capacity: - self._evict_page() - page = self._load_page(table_name, range_id, segment, page_index, column_index) - self.cache[page_key] = page + if page is not None: + self.cache.move_to_end(page_key) + self.hits += 1 + else: + self.misses += 1 + if len(self.cache) >= self.capacity: + self._evict_page_locked() + page = self._load_page(table_name, range_id, segment, page_index, column_index) + self.cache[page_key] = page - page.pin() - return page, page_key + page.pin() + return page, page_key def _load_page(self, table_name: str, range_id: int, segment: str, page_index: int, column_index: int) -> Page: @@ -105,7 +108,7 @@ def _load_page(self, table_name: str, range_id: int, segment: str, print(f"Warning: Failed to load page from {file_path}: {e}") return Page() - def _evict_page(self): + def _evict_page_locked(self): """Evict the least recently used unpinned page.""" # Find first unpinned page (from least recently used) for page_key, page in list(self.cache.items()): @@ -127,12 +130,13 @@ def _evict_page(self): def release_page(self, page_key: Tuple) -> None: """Unpin a previously fetched page so it becomes eligible for eviction.""" - page = self.cache.get(page_key) - if page is None: - raise KeyError(f"Page {page_key} was never fetched") - page.unpin() - if not page.is_pinned(): - self.cache.move_to_end(page_key) + with self._lock: + page = self.cache.get(page_key) + if page is None: + raise KeyError(f"Page {page_key} was never fetched") + page.unpin() + if not page.is_pinned(): + self.cache.move_to_end(page_key) @contextmanager def page(self, table_name, range_id, segment, page_index, column_index): @@ -157,46 +161,51 @@ def flush_page(self, table_name: str, range_id: int, segment: str, """Flush a specific page to disk if it's dirty.""" page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - if page_key in self.cache: - page = self.cache[page_key] - if page.dirty: - self._write_page_to_disk(page_key, page) + with self._lock: + if page_key in self.cache: + page = self.cache[page_key] + if page.dirty: + self._write_page_to_disk(page_key, page) def discard_table(self, table_name: str): """Remove all cached pages for a table without writing them back to disk.""" - keys_to_remove = [key for key in self.cache if key[0] == table_name] - for key in keys_to_remove: - page = self.cache[key] - if page.is_pinned(): - raise RuntimeError( - f"Cannot discard table {table_name}: page {key} is currently pinned" - ) - del self.cache[key] + with self._lock: + keys_to_remove = [key for key in self.cache if key[0] == table_name] + for key in keys_to_remove: + page = self.cache[key] + if page.is_pinned(): + raise RuntimeError( + f"Cannot discard table {table_name}: page {key} is currently pinned" + ) + del self.cache[key] def flush_all(self): """Write all dirty pages to disk (called on database close).""" - for page_key, page in self.cache.items(): - if page.dirty: - self._write_page_to_disk(page_key, page) + with self._lock: + for page_key, page in list(self.cache.items()): + if page.dirty: + self._write_page_to_disk(page_key, page) def clear(self): """Clear the bufferpool (for testing or shutdown).""" - self.flush_all() - self.cache.clear() + with self._lock: + self.flush_all() + self.cache.clear() def get_stats(self): """Get bufferpool statistics.""" - total_accesses = self.hits + self.misses - hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 - return { - 'capacity': self.capacity, - 'current_size': len(self.cache), - 'hits': self.hits, - 'misses': self.misses, - 'hit_rate': f"{hit_rate:.2f}%", - 'evictions': self.evictions, - 'writes': self.writes, - } + with self._lock: + total_accesses = self.hits + self.misses + hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 + return { + 'capacity': self.capacity, + 'current_size': len(self.cache), + 'hits': self.hits, + 'misses': self.misses, + 'hit_rate': f"{hit_rate:.2f}%", + 'evictions': self.evictions, + 'writes': self.writes, + } def __repr__(self): stats = self.get_stats() diff --git a/lstore/db.py b/lstore/db.py index 4469463..7625e2d 100644 --- a/lstore/db.py +++ b/lstore/db.py @@ -34,6 +34,8 @@ def open(self, path: str): def close(self): """Flush all pages and save metadata.""" + for table in self.tables.values(): + table.close() if self.bufferpool: self.bufferpool.flush_all() self._save_metadata() @@ -54,7 +56,8 @@ def drop_table(self, name): raise ValueError(f"Table {name} does not exist") # Remove table metadata and cached pages before deleting files. - del self.tables[name] + table = self.tables.pop(name) + table.close() if self.bufferpool: self.bufferpool.discard_table(name) diff --git a/lstore/query.py b/lstore/query.py index 3be6c0b..7daa4f8 100644 --- a/lstore/query.py +++ b/lstore/query.py @@ -218,8 +218,14 @@ def update(self, primary_key, *columns): tail_record = tail_meta + tail_data - # insert tail record table handles index updates - tail_rid = self.table.insert_record(tail_record, is_tail=True, base_rid=base_rid) + # insert tail record and update indices using cached values + tail_rid = self.table.insert_record( + tail_record, + is_tail=True, + base_rid=base_rid, + index_prior_data=current_data, + index_new_data=new_data, + ) return tail_rid is not False except Exception: diff --git a/lstore/table.py b/lstore/table.py index 95ebd1b..006ee5d 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -1,7 +1,10 @@ from collections import defaultdict from contextlib import ExitStack, contextmanager -from time import time -from typing import Optional +from dataclasses import dataclass +import queue +import threading +from time import time, sleep +from typing import Callable, Dict, Optional, Set, Tuple from config import Config from lstore.bufferpool import Bufferpool @@ -85,6 +88,21 @@ def __repr__(self): f"segment={self.segment}, page={self.page_index}, col={self.column_index})" ) + +@dataclass(frozen=True) +class RangeMergeTask: + range_id: int + start_offset: int + end_offset: int + + +@dataclass +class RangeMergeSnapshot: + task: RangeMergeTask + updates: Dict[Tuple[int, int, int], int] + updated_slots: Set[Tuple[int, int]] + tps_value: int + class PageDirectory: def __init__( self, @@ -93,6 +111,7 @@ def __init__( bufferpool: Bufferpool, num_ranges: int = Config.initial_page_ranges, metadata: Optional[dict] = None, + merge_request_callback: Optional[Callable[[int], None]] = None, ): self.table_name = table_name self.bufferpool = bufferpool @@ -105,6 +124,9 @@ def __init__( self.tail_offsets = defaultdict(int) self.tail_merge_progress = defaultdict(int) self.merge_thresholds = defaultdict(lambda: max(1, Config.records_per_range // 4)) + self._merge_in_progress = defaultdict(int) + self._lock = threading.RLock() + self._merge_request_callback = merge_request_callback if metadata: self._load_metadata(metadata) @@ -135,63 +157,70 @@ def decode_rid(self, rid: int): # Page bookkeeping helpers # ------------------------------------------------------------------ def _get_or_create_range(self, range_id: int) -> dict: - info = self.page_directory[range_id] - self.num_ranges = max(self.num_ranges, range_id + 1) - return info + with self._lock: + info = self.page_directory[range_id] + self.num_ranges = max(self.num_ranges, range_id + 1) + return info def _require_range(self, range_id: int) -> dict: - if range_id not in self.page_directory: - raise RuntimeError(f"Range {range_id} has no allocated pages") - return self.page_directory[range_id] + with self._lock: + if range_id not in self.page_directory: + raise RuntimeError(f"Range {range_id} has no allocated pages") + return self.page_directory[range_id] def _ensure_logical_page(self, range_id: int, segment: int, page_index: int): - info = self._get_or_create_range(range_id) - if segment == 0: - pages = info["base"] - column_count = self._column_count(is_tail=False) - else: - tail_idx = segment - 1 - tail_segments = info["tail"] - while len(tail_segments) <= tail_idx: - tail_segments.append([]) - pages = tail_segments[tail_idx] - column_count = self._column_count(is_tail=True) - - while len(pages) <= page_index: - new_page_index = len(pages) - segment_label = self._segment_label(segment) - logical_page = [ - ColumnPageProxy( - self.bufferpool, - self.table_name, - range_id, - segment_label, - new_page_index, - column_index, - ) - for column_index in range(column_count) - ] - pages.append(logical_page) + with self._lock: + info = self.page_directory[range_id] + self.num_ranges = max(self.num_ranges, range_id + 1) + if segment == 0: + pages = info["base"] + column_count = self._column_count(is_tail=False) + else: + tail_idx = segment - 1 + tail_segments = info["tail"] + while len(tail_segments) <= tail_idx: + tail_segments.append([]) + pages = tail_segments[tail_idx] + column_count = self._column_count(is_tail=True) + + while len(pages) <= page_index: + new_page_index = len(pages) + segment_label = self._segment_label(segment) + logical_page = [ + ColumnPageProxy( + self.bufferpool, + self.table_name, + range_id, + segment_label, + new_page_index, + column_index, + ) + for column_index in range(column_count) + ] + pages.append(logical_page) - return pages[page_index] + return pages[page_index] def _ensure_page_exists_for_read(self, range_id: int, segment: int, page_index: int) -> None: - info = self._require_range(range_id) - if segment == 0: - if page_index >= len(info["base"]): - raise RuntimeError( - f"Base page {page_index} missing for range {range_id}" - ) - else: - tail_idx = segment - 1 - if tail_idx >= len(info["tail"]): - raise RuntimeError( - f"Tail segment {segment} missing for range {range_id}" - ) - if page_index >= len(info["tail"][tail_idx]): - raise RuntimeError( - f"Tail page {page_index} missing for range {range_id}, segment {segment}" - ) + with self._lock: + info = self.page_directory.get(range_id) + if not info: + raise RuntimeError(f"Range {range_id} has no allocated pages") + if segment == 0: + if page_index >= len(info["base"]): + raise RuntimeError( + f"Base page {page_index} missing for range {range_id}" + ) + else: + tail_idx = segment - 1 + if tail_idx >= len(info["tail"]): + raise RuntimeError( + f"Tail segment {segment} missing for range {range_id}" + ) + if page_index >= len(info["tail"][tail_idx]): + raise RuntimeError( + f"Tail page {page_index} missing for range {range_id}, segment {segment}" + ) def _segment_label(self, segment_id: int) -> str: return "base" if segment_id == 0 else f"tail_{segment_id}" @@ -234,53 +263,56 @@ def to_metadata(self) -> dict: def stringify(mapping: defaultdict) -> dict: return {str(k): v for k, v in mapping.items()} - directory = {} - for range_id, info in self.page_directory.items(): - directory[str(range_id)] = { - "base_pages": len(info["base"]), - "tail_pages": [len(segment) for segment in info["tail"]], - "TPS": info["TPS"], + with self._lock: + directory = {} + for range_id, info in self.page_directory.items(): + directory[str(range_id)] = { + "base_pages": len(info["base"]), + "tail_pages": [len(segment) for segment in info["tail"]], + "TPS": info["TPS"], + } + return { + "num_ranges": self.num_ranges, + "num_base_records": self.num_base_records, + "num_tail_records": self.num_tail_records, + "base_offsets": stringify(self.base_offsets), + "tail_offsets": stringify(self.tail_offsets), + "tail_merge_progress": stringify(self.tail_merge_progress), + "merge_thresholds": stringify(self.merge_thresholds), + "page_directory": directory, } - return { - "num_ranges": self.num_ranges, - "num_base_records": self.num_base_records, - "num_tail_records": self.num_tail_records, - "base_offsets": stringify(self.base_offsets), - "tail_offsets": stringify(self.tail_offsets), - "tail_merge_progress": stringify(self.tail_merge_progress), - "merge_thresholds": stringify(self.merge_thresholds), - "page_directory": directory, - } def _load_metadata(self, metadata: dict) -> None: - self.num_ranges = metadata.get("num_ranges", self.num_ranges) - self.num_base_records = metadata.get("num_base_records", 0) - self.num_tail_records = metadata.get("num_tail_records", 0) + with self._lock: + self.num_ranges = metadata.get("num_ranges", self.num_ranges) + self.num_base_records = metadata.get("num_base_records", 0) + self.num_tail_records = metadata.get("num_tail_records", 0) def int_keys(values: dict) -> dict: return {int(k): v for k, v in values.items()} - self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) - self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) - self.tail_merge_progress = defaultdict( - int, int_keys(metadata.get("tail_merge_progress", {})) - ) - self.merge_thresholds = defaultdict( - lambda: max(1, Config.records_per_range // 4), - int_keys(metadata.get("merge_thresholds", {})), - ) + with self._lock: + self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) + self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) + self.tail_merge_progress = defaultdict( + int, int_keys(metadata.get("tail_merge_progress", {})) + ) + self.merge_thresholds = defaultdict( + lambda: max(1, Config.records_per_range // 4), + int_keys(metadata.get("merge_thresholds", {})), + ) - self.page_directory = defaultdict(self._new_range_entry) - for range_id_str, info in metadata.get("page_directory", {}).items(): - range_id = int(range_id_str) - range_entry = self._get_or_create_range(range_id) - range_entry["TPS"] = info.get("TPS", 0) - base_pages = info.get("base_pages", 0) - for page_index in range(base_pages): - self._ensure_logical_page(range_id, 0, page_index) - for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): - for page_index in range(page_count): - self._ensure_logical_page(range_id, segment_offset, page_index) + self.page_directory = defaultdict(self._new_range_entry) + for range_id_str, info in metadata.get("page_directory", {}).items(): + range_id = int(range_id_str) + range_entry = self._get_or_create_range(range_id) + range_entry["TPS"] = info.get("TPS", 0) + base_pages = info.get("base_pages", 0) + for page_index in range(base_pages): + self._ensure_logical_page(range_id, 0, page_index) + for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): + for page_index in range(page_count): + self._ensure_logical_page(range_id, segment_offset, page_index) def add_record( @@ -307,33 +339,36 @@ def add_record( ) if not is_tail: - range_id = self.num_base_records // Config.records_per_range - offset = self.base_offsets[range_id] - rid = self.encode_rid(range_id, 0, offset) - self.base_offsets[range_id] += 1 + with self._lock: + range_id = self.num_base_records // Config.records_per_range + offset = self.base_offsets[range_id] + rid = self.encode_rid(range_id, 0, offset) + self.base_offsets[range_id] += 1 + self.num_base_records += 1 columns[Config.indirection_column] = Config.null_value columns[Config.schema_encoding_column] = 0 else: if base_rid == Config.null_value: raise ValueError("Tail records require a valid base RID") - base_range = self.decode_rid(base_rid)[0] - total_offset = self.tail_offsets[base_range] - pending_updates = total_offset - self.tail_merge_progress[base_range] - if pending_updates >= self.merge_thresholds[base_range]: - self.merge_range(base_range) - self.merge_thresholds[base_range] = max(1, self.merge_thresholds[base_range] * 2) + + schedule_merge = False + with self._lock: + base_range = self.decode_rid(base_rid)[0] total_offset = self.tail_offsets[base_range] + pending_updates = total_offset - self.tail_merge_progress[base_range] + if pending_updates >= self.merge_thresholds[base_range]: + schedule_merge = True - max_capacity = Config.records_per_range * Config.max_tail_segments - if total_offset >= max_capacity: - raise RuntimeError("Tail range is full; merge required before inserting more tail records") + max_capacity = Config.records_per_range * Config.max_tail_segments + if total_offset >= max_capacity: + raise RuntimeError("Tail range is full; merge required before inserting more tail records") - segment_index = total_offset // Config.records_per_range - segment_offset = total_offset % Config.records_per_range - segment_id = 1 + segment_index + segment_index = total_offset // Config.records_per_range + segment_offset = total_offset % Config.records_per_range + segment_id = 1 + segment_index + + rid = self.encode_rid(base_range, segment_id, segment_offset) - rid = self.encode_rid(base_range, segment_id, segment_offset) - self.tail_offsets[base_range] += 1 columns[Config.schema_encoding_column] = self.build_schema_encoding(columns) base_record = self.get_record_from_rid(base_rid) base_indirection = base_record[Config.indirection_column] @@ -341,6 +376,11 @@ def add_record( base_indirection if base_indirection != Config.null_value else base_rid ) columns[Config.base_rid_column] = base_rid + if schedule_merge and self._merge_request_callback: + self._merge_request_callback(base_range) + with self._lock: + self.tail_offsets[base_range] += 1 + self.num_tail_records += 1 columns[Config.timestamp_column] = int(time()) range_id, segment_id, page_index, _ = self.decode_rid(rid) @@ -354,11 +394,6 @@ def add_record( f"Failed to append value to page (range={range_id}, segment={segment_id}, page={page_index})" ) - if not is_tail: - self.num_base_records += 1 - else: - self.num_tail_records += 1 - if is_tail and base_rid != Config.null_value: self.update_base_record(base_rid, columns) @@ -522,68 +557,150 @@ def delete_record(self, rid: int): return True def merge_range(self, range_id): - range_info = self.page_directory.get(range_id) - if not range_info: + task = self.prepare_merge_task(range_id, force=True) + if not task: return False - tail_segments = range_info["tail"] - start_offset = self.tail_merge_progress[range_id] - end_offset = self.tail_offsets[range_id] - if start_offset >= end_offset: + snapshot = self.build_merge_snapshot(task) + if not snapshot: + self.cancel_merge_task(range_id, task.end_offset) return False + return self.commit_merge_snapshot(snapshot) + + def prepare_merge_task(self, range_id: int, force: bool = False) -> Optional[RangeMergeTask]: + with self._lock: + start_offset = self.tail_merge_progress[range_id] + end_offset = self.tail_offsets[range_id] + if start_offset >= end_offset: + return None + + in_progress = self._merge_in_progress.get(range_id, 0) + if in_progress and not force: + return None + + self._merge_in_progress[range_id] = end_offset + return RangeMergeTask(range_id, start_offset, end_offset) + + def cancel_merge_task(self, range_id: int, expected_end: int) -> None: + with self._lock: + if self._merge_in_progress.get(range_id) == expected_end: + self._merge_in_progress.pop(range_id, None) + + def _tps_from_offset(self, range_id: int, end_offset: int) -> int: + if end_offset <= 0: + return 0 + last_offset = end_offset - 1 + last_segment_index = last_offset // Config.records_per_range + last_segment_id = 1 + last_segment_index + last_segment_offset = last_offset % Config.records_per_range + return self.encode_rid(range_id, last_segment_id, last_segment_offset) + + def build_merge_snapshot(self, task: RangeMergeTask) -> Optional[RangeMergeSnapshot]: + if task.start_offset >= task.end_offset: + return None + + range_id = task.range_id + with self._lock: + range_info = self.page_directory.get(range_id) + if not range_info: + return None + tail_segments = [list(segment_pages) for segment_pages in range_info["tail"]] + + updates: Dict[Tuple[int, int, int], int] = {} + updated_slots: Set[Tuple[int, int]] = set() + + records_per_range = Config.records_per_range + records_per_page = Config.records_per_page + + start_segment = task.start_offset // records_per_range + end_segment = (task.end_offset - 1) // records_per_range + + for segment_index in range(start_segment, end_segment + 1): + tail_idx = segment_index + if tail_idx >= len(tail_segments): + break + segment_pages = tail_segments[tail_idx] + segment_start = max(task.start_offset, segment_index * records_per_range) + segment_end = min(task.end_offset, (segment_index + 1) * records_per_range) + segment_id = segment_index + 1 + + start_page = (segment_start - segment_index * records_per_range) // records_per_page + end_page = (segment_end - 1 - segment_index * records_per_range) // records_per_page + + for page_index in range(start_page, min(len(segment_pages), end_page + 1)): + page_start_offset = segment_index * records_per_range + page_index * records_per_page + page_end_offset = page_start_offset + records_per_page + read_start = max(segment_start, page_start_offset) + read_end = min(segment_end, page_end_offset) + if read_start >= read_end: + continue + + try: + self._ensure_page_exists_for_read(range_id, segment_id, page_index) + except RuntimeError: + continue - merged_any = False - for segment_index, segment_pages in enumerate(tail_segments, start=1): - for page_index, _ in enumerate(segment_pages): - with self._logical_page(range_id, segment_index, page_index) as tail_pages: + with self._logical_page(range_id, segment_id, page_index) as tail_pages: if not tail_pages: continue - num_records = tail_pages[Config.rid_column].num_records - for tail_slot in range(num_records): + + slot_start = read_start - page_start_offset + slot_end = read_end - page_start_offset + for tail_slot in range(slot_start, slot_end): tail_columns = [column_page.read(tail_slot) for column_page in tail_pages] - tail_rid = tail_columns[Config.rid_column] base_rid = tail_columns[Config.base_rid_column] if base_rid == Config.null_value: continue - _, tail_segment, tail_page_index, tail_slot_index = self.decode_rid(tail_rid) - tail_segment_index = max(0, tail_segment - 1) - tail_offset = ( - tail_segment_index * Config.records_per_range - + tail_page_index * Config.records_per_page - + tail_slot_index - ) - if tail_offset < start_offset or tail_offset >= end_offset: - continue - base_range, _, base_page_index, base_slot_index = self.decode_rid(base_rid) - self._ensure_page_exists_for_read(base_range, 0, base_page_index) - with self._logical_page(base_range, 0, base_page_index) as base_pages: - for column_index in range(self.num_columns): - value = tail_columns[Config.tail_meta_columns + column_index] - if value == Config.null_value: - continue - base_column_page = base_pages[Config.base_meta_columns + column_index] - base_column_page.write_slot(base_slot_index, value) - - base_schema_page = base_pages[Config.schema_encoding_column] - base_schema_page.write_slot(base_slot_index, 0) - merged_any = True - - if not merged_any: - return False + if base_range != range_id: + continue - self.tail_merge_progress[range_id] = end_offset - if end_offset == 0: - tps_value = 0 - else: - last_offset = end_offset - 1 - last_segment_index = last_offset // Config.records_per_range - last_segment_id = 1 + last_segment_index - last_segment_offset = last_offset % Config.records_per_range - tps_value = self.encode_rid(range_id, last_segment_id, last_segment_offset) - range_info["TPS"] = tps_value + data_offset = Config.tail_meta_columns + for column_index in range(self.num_columns): + value = tail_columns[data_offset + column_index] + if value == Config.null_value: + continue + physical_column = Config.base_meta_columns + column_index + updates[(base_page_index, base_slot_index, physical_column)] = value + updated_slots.add((base_page_index, base_slot_index)) + + tps_value = self._tps_from_offset(range_id, task.end_offset) + return RangeMergeSnapshot(task=task, updates=updates, updated_slots=updated_slots, tps_value=tps_value) + + def commit_merge_snapshot(self, snapshot: RangeMergeSnapshot) -> bool: + range_id = snapshot.task.range_id + with self._lock: + if snapshot.task.end_offset <= self.tail_merge_progress[range_id]: + self._merge_in_progress.pop(range_id, None) + return False + zero_schema = self.tail_offsets[range_id] == snapshot.task.end_offset + + for (page_index, slot_index, column_index), value in snapshot.updates.items(): + with self._column(range_id, 0, page_index, column_index) as column_page: + column_page.write_slot(slot_index, value) + + if zero_schema: + for page_index, slot_index in snapshot.updated_slots: + with self._column(range_id, 0, page_index, Config.schema_encoding_column) as schema_page: + schema_page.write_slot(slot_index, 0) + + with self._lock: + self.tail_merge_progress[range_id] = snapshot.task.end_offset + range_info = self.page_directory.get(range_id) + if range_info is not None: + range_info["TPS"] = snapshot.tps_value + max_threshold = max(1, Config.records_per_range // 4) + doubled_threshold = max(1, self.merge_thresholds[range_id] * 2) + self.merge_thresholds[range_id] = min(doubled_threshold, max_threshold) + self._merge_in_progress.pop(range_id, None) return True + def has_inflight_merge(self, range_id: Optional[int] = None) -> bool: + with self._lock: + if range_id is None: + return any(value > 0 for value in self._merge_in_progress.values()) + return self._merge_in_progress.get(range_id, 0) > 0 + class Table: """ @@ -603,15 +720,20 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() + self._merge_jobs: queue.Queue[Optional[RangeMergeTask]] = queue.Queue() + self._merge_results: queue.Queue[RangeMergeSnapshot] = queue.Queue() + self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, num_columns, self.bufferpool, Config.initial_page_ranges, metadata=directory_metadata, + merge_request_callback=self._request_merge, ) self.index = Index(self) - pass + self._merge_thread = threading.Thread(target=self._merge_loop, name=f"{name}-merger", daemon=True) + self._merge_thread.start() def to_metadata(self) -> dict: return { @@ -669,16 +791,24 @@ def get_cumulative_updated_record(self, rid: int): """ return self.page_directory.get_cumulative_updated_record_from_base_rid(rid) - def insert_record(self, columns: list[int], is_tail: bool = False, base_rid: int = Config.null_value): + def insert_record( + self, + columns: list[int], + is_tail: bool = False, + base_rid: int = Config.null_value, + index_prior_data: Optional[list[int]] = None, + index_new_data: Optional[list[int]] = None, + ): """ Inserts a record into the table :param columns: list[int] - the columns of the record :param base_rid: int - the RID of the base record, only used for tail records :return: int - the RID of the record """ - prior_data = None - if is_tail: - prior_data = self.get_cumulative_updated_record(base_rid)[ + self._tick_merges() + cached_prior_data = None + if is_tail and base_rid != Config.null_value and index_prior_data is None: + cached_prior_data = self.get_cumulative_updated_record(base_rid)[ Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns ] @@ -690,10 +820,17 @@ def insert_record(self, columns: list[int], is_tail: bool = False, base_rid: int ] self.index.add(rid, base_data) else: - updated_data = self.get_cumulative_updated_record(base_rid)[ - Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns - ] - self.index.update(base_rid, prior_data, updated_data) + prior_values = index_prior_data if index_prior_data is not None else cached_prior_data + if prior_values is None: + prior_values = self.get_cumulative_updated_record(base_rid)[ + Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns + ] + new_values = index_new_data + if new_values is None: + new_values = self.get_cumulative_updated_record(base_rid)[ + Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns + ] + self.index.update(base_rid, prior_values, new_values) return rid def delete_record(self, rid: int): @@ -702,6 +839,7 @@ def delete_record(self, rid: int): :param rid: int - the RID of the base record :return: bool - whether the record was deleted """ + self._tick_merges() try: base_record = self.page_directory.get_record_from_rid(rid) except RuntimeError: @@ -715,6 +853,68 @@ def delete_record(self, rid: int): except ValueError: return False + def _merge_loop(self): + while not self._merge_shutdown.is_set(): + try: + task = self._merge_jobs.get(timeout=0.1) + except queue.Empty: + continue + + if task is None: + if self._merge_shutdown.is_set(): + break + continue + + try: + snapshot = self.page_directory.build_merge_snapshot(task) + if snapshot: + self._merge_results.put(snapshot) + else: + self.page_directory.cancel_merge_task(task.range_id, task.end_offset) + except Exception: + self.page_directory.cancel_merge_task(task.range_id, task.end_offset) + + def _request_merge(self, range_id: int): + task = self.page_directory.prepare_merge_task(range_id) + if task: + self._merge_jobs.put(task) + + def _apply_ready_merges(self): + applied = 0 + while True: + try: + snapshot = self._merge_results.get_nowait() + except queue.Empty: + break + self.page_directory.commit_merge_snapshot(snapshot) + applied += 1 + return applied + + def _tick_merges(self): + self._apply_ready_merges() + + def wait_for_merges(self, timeout: float = 2.0): + deadline = time() + timeout + while time() < deadline: + self._apply_ready_merges() + if self._merge_jobs.empty() and self._merge_results.empty() and not self.page_directory.has_inflight_merge(): + break + sleep(0.01) + self._apply_ready_merges() + + def close(self): + if self._merge_shutdown.is_set(): + return + self._merge_shutdown.set() + self._merge_jobs.put(None) + if self._merge_thread.is_alive(): + self._merge_thread.join(timeout=1.0) + self.wait_for_merges() + + def __del__(self): + # Best-effort cleanup to avoid leaking background threads in tests. + self.close() + def __merge(self): print("merge is happening") pass diff --git a/lstore/tests/test_table.py b/lstore/tests/test_table.py index 84a1d2b..890cd87 100644 --- a/lstore/tests/test_table.py +++ b/lstore/tests/test_table.py @@ -252,9 +252,11 @@ def insert_tail(value): assert base_data() == [100, 10, 20] insert_tail(55) + grades_table.wait_for_merges() assert base_data() == [100, 10, 20] insert_tail(65) + grades_table.wait_for_merges() assert base_data() == [100, 55, 20] assert directory.merge_thresholds[0] == 2 @@ -279,12 +281,37 @@ def insert_tail(value): insert_tail(50) insert_tail(60) + grades_table.wait_for_merges() assert base_data() == [100, 50, 20] assert directory.merge_thresholds[0] == 2 insert_tail(70) + grades_table.wait_for_merges() assert base_data() == [100, 50, 20] insert_tail(80) + grades_table.wait_for_merges() assert base_data() == [100, 70, 20] assert directory.merge_thresholds[0] == 4 + + +def test_background_merge_applies_updates_without_manual_trigger(): + grades_table = Table("grades", num_columns=3, key=0) + base_rid = grades_table.insert_record( + _build_base_record(grades_table.num_columns, 999, 5, 6), + is_tail=False, + ) + directory = grades_table.page_directory + directory.merge_thresholds[0] = 1 + + tail_record = _build_tail_record(grades_table.num_columns, {1: 123}) + grades_table.insert_record(tail_record, is_tail=True, base_rid=base_rid) + + noop_tail = _build_tail_record(grades_table.num_columns, {}) + grades_table.insert_record(noop_tail, is_tail=True, base_rid=base_rid) + + grades_table.wait_for_merges() + merged_data = grades_table.get_record(base_rid)[ + Config.base_meta_columns : Config.base_meta_columns + grades_table.num_columns + ] + assert merged_data == [999, 123, 6] From fdcb66af9ff2f122fb296107062cca67ee05c898 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 13 Nov 2025 10:36:05 -0800 Subject: [PATCH 2/5] remove unused __merge function, move indexing test into tests folder --- lstore/table.py | 7 +------ lstore/tests/test_indexing.py | 36 +++++++++++++++++++++++++++++++++++ test_indexing.py | 28 --------------------------- 3 files changed, 37 insertions(+), 34 deletions(-) create mode 100644 lstore/tests/test_indexing.py delete mode 100644 test_indexing.py diff --git a/lstore/table.py b/lstore/table.py index 006ee5d..67ad1a0 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -913,9 +913,4 @@ def close(self): def __del__(self): # Best-effort cleanup to avoid leaking background threads in tests. - self.close() - - def __merge(self): - print("merge is happening") - pass - + self.close() \ No newline at end of file diff --git a/lstore/tests/test_indexing.py b/lstore/tests/test_indexing.py new file mode 100644 index 0000000..b6d4df3 --- /dev/null +++ b/lstore/tests/test_indexing.py @@ -0,0 +1,36 @@ +from lstore.db import Database +from lstore.query import Query + +import sys +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +def test_indexing(): + db = Database() + table = db.create_table('Test', 3, 0) + query = Query(table) + + # Insert some records + query.insert(1, 100, 200) + query.insert(2, 100, 300) # Note: same value in column 1 + query.insert(3, 150, 400) + + # Create secondary index on column 1 + assert table.index.create_index(1) + + # Should find both records with value 100 in column 1 + results = query.select(100, 1, [1, 1, 1]) + assert len(results) == 2 + print(f"Found {len(results)} records") # Should print: Found 2 records + + # Try to create the same index again - should return False + assert not table.index.create_index(1) + + # Drop the index + assert table.index.drop_index(1) + + # Can't drop primary key index + assert not table.index.drop_index(0) diff --git a/test_indexing.py b/test_indexing.py deleted file mode 100644 index 839d7d5..0000000 --- a/test_indexing.py +++ /dev/null @@ -1,28 +0,0 @@ -from lstore.db import Database -from lstore.query import Query - -db = Database() -table = db.create_table('Test', 3, 0) -query = Query(table) - -# Insert some records -query.insert(1, 100, 200) -query.insert(2, 100, 300) # Note: same value in column 1 -query.insert(3, 150, 400) - -# Create secondary index on column 1 -assert table.index.create_index(1) - -# Should find both records with value 100 in column 1 -results = query.select(100, 1, [1, 1, 1]) -assert len(results) == 2 -print(f"Found {len(results)} records") # Should print: Found 2 records - -# Try to create the same index again - should return False -assert not table.index.create_index(1) - -# Drop the index -assert table.index.drop_index(1) - -# Can't drop primary key index -assert not table.index.drop_index(0) From f13b3a74db8e408c8d78f60fc325e928b25e62f2 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 13 Nov 2025 10:58:05 -0800 Subject: [PATCH 3/5] fix type instantiation issue with merge jobs and results --- lstore/table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index 67ad1a0..83f106d 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -720,8 +720,8 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: queue.Queue[Optional[RangeMergeTask]] = queue.Queue() - self._merge_results: queue.Queue[RangeMergeSnapshot] = queue.Queue() + self._merge_jobs: 'queue.Queue[Optional[RangeMergeTask]]' = queue.Queue() + self._merge_results: 'queue.Queue[RangeMergeSnapshot]' = queue.Queue() self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, From a8a325c6766ea9e586b81bebaa6fb203a55ef0cd Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 13 Nov 2025 11:05:18 -0800 Subject: [PATCH 4/5] remove general exception handeler in table --- lstore/table.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index 83f106d..8c28ffd 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -720,8 +720,8 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: 'queue.Queue[Optional[RangeMergeTask]]' = queue.Queue() - self._merge_results: 'queue.Queue[RangeMergeSnapshot]' = queue.Queue() + self._merge_jobs: "queue.Queue[Optional[RangeMergeTask]]" = queue.Queue() + self._merge_results: "queue.Queue[RangeMergeSnapshot]" = queue.Queue() self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, @@ -871,7 +871,8 @@ def _merge_loop(self): self._merge_results.put(snapshot) else: self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - except Exception: + except (RuntimeError, ValueError, IndexError) as e: + print(f"Error building merge snapshot: {e}") self.page_directory.cancel_merge_task(task.range_id, task.end_offset) def _request_merge(self, range_id: int): From cfc1a4e8059b629a9778f1a499b1a62cf40f03f1 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 13 Nov 2025 11:26:03 -0800 Subject: [PATCH 5/5] tail merge schedule fixed --- lstore/table.py | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index 8c28ffd..c663c53 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -111,7 +111,7 @@ def __init__( bufferpool: Bufferpool, num_ranges: int = Config.initial_page_ranges, metadata: Optional[dict] = None, - merge_request_callback: Optional[Callable[[int], None]] = None, + merge_request_callback: Optional[Callable[[int, Optional[int]], None]] = None, ): self.table_name = table_name self.bufferpool = bufferpool @@ -350,14 +350,15 @@ def add_record( else: if base_rid == Config.null_value: raise ValueError("Tail records require a valid base RID") - schedule_merge = False + merge_hint = None with self._lock: base_range = self.decode_rid(base_rid)[0] total_offset = self.tail_offsets[base_range] pending_updates = total_offset - self.tail_merge_progress[base_range] - if pending_updates >= self.merge_thresholds[base_range]: - schedule_merge = True + schedule_merge = pending_updates >= self.merge_thresholds[base_range] + if schedule_merge: + merge_hint = total_offset max_capacity = Config.records_per_range * Config.max_tail_segments if total_offset >= max_capacity: @@ -369,6 +370,9 @@ def add_record( rid = self.encode_rid(base_range, segment_id, segment_offset) + self.tail_offsets[base_range] += 1 + self.num_tail_records += 1 + columns[Config.schema_encoding_column] = self.build_schema_encoding(columns) base_record = self.get_record_from_rid(base_rid) base_indirection = base_record[Config.indirection_column] @@ -377,11 +381,7 @@ def add_record( ) columns[Config.base_rid_column] = base_rid if schedule_merge and self._merge_request_callback: - self._merge_request_callback(base_range) - with self._lock: - self.tail_offsets[base_range] += 1 - self.num_tail_records += 1 - + self._merge_request_callback(base_range, merge_hint) columns[Config.timestamp_column] = int(time()) range_id, segment_id, page_index, _ = self.decode_rid(rid) self._ensure_logical_page(range_id, segment_id, page_index) @@ -566,10 +566,17 @@ def merge_range(self, range_id): return False return self.commit_merge_snapshot(snapshot) - def prepare_merge_task(self, range_id: int, force: bool = False) -> Optional[RangeMergeTask]: + def prepare_merge_task( + self, + range_id: int, + force: bool = False, + end_offset_hint: Optional[int] = None, + ) -> Optional[RangeMergeTask]: with self._lock: start_offset = self.tail_merge_progress[range_id] end_offset = self.tail_offsets[range_id] + if end_offset_hint is not None: + end_offset = min(end_offset, end_offset_hint) if start_offset >= end_offset: return None @@ -875,8 +882,8 @@ def _merge_loop(self): print(f"Error building merge snapshot: {e}") self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - def _request_merge(self, range_id: int): - task = self.page_directory.prepare_merge_task(range_id) + def _request_merge(self, range_id: int, end_offset_hint: Optional[int] = None): + task = self.page_directory.prepare_merge_task(range_id, end_offset_hint=end_offset_hint) if task: self._merge_jobs.put(task) @@ -910,8 +917,11 @@ def close(self): self._merge_jobs.put(None) if self._merge_thread.is_alive(): self._merge_thread.join(timeout=1.0) + if self._merge_thread.is_alive(): + print("Merge thread did not join in time, force-killing") + print(f"Warning: merge thread for table {self.name} did not stop within timeout") self.wait_for_merges() def __del__(self): # Best-effort cleanup to avoid leaking background threads in tests. - self.close() \ No newline at end of file + self.close()