diff --git a/.gitignore b/.gitignore index 0cda74d..835c9c9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,7 @@ testM1.py testM2.py testM3.py -**/CS451 \ No newline at end of file +**/CS451 +**/CT +**/MT +**/M2 \ No newline at end of file diff --git a/lstore/bufferpool.py b/lstore/bufferpool.py index 26182a2..e300f3d 100644 --- a/lstore/bufferpool.py +++ b/lstore/bufferpool.py @@ -21,7 +21,6 @@ """ import os from collections import OrderedDict -from threading import RLock from typing import Optional, Tuple from config import Config @@ -45,7 +44,6 @@ def __init__(self, capacity: int = 1000, base_path: str = None): """ self.capacity = capacity self.base_path = Path(base_path) if base_path else None - self._lock = RLock() if self.base_path: self.base_path.mkdir(parents=True, exist_ok=True) @@ -73,21 +71,20 @@ def _make_file_path(self, table_name, range_id, segment, page_index, column_inde def get_page(self, table_name, range_id, segment, page_index, column_index): page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - with self._lock: - page = self.cache.get(page_key) + page = self.cache.get(page_key) - if page is not None: - self.cache.move_to_end(page_key) - self.hits += 1 - else: - self.misses += 1 - if len(self.cache) >= self.capacity: - self._evict_page_locked() - page = self._load_page(table_name, range_id, segment, page_index, column_index) - self.cache[page_key] = page + if page is not None: + self.cache.move_to_end(page_key) + self.hits += 1 + else: + self.misses += 1 + if len(self.cache) >= self.capacity: + self._evict_page() + page = self._load_page(table_name, range_id, segment, page_index, column_index) + self.cache[page_key] = page - page.pin() - return page, page_key + page.pin() + return page, page_key def _load_page(self, table_name: str, range_id: int, segment: str, page_index: int, column_index: int) -> Page: @@ -108,7 +105,7 @@ def _load_page(self, table_name: str, range_id: int, segment: str, print(f"Warning: Failed to load page from {file_path}: {e}") return Page() - def _evict_page_locked(self): + def _evict_page(self): """Evict the least recently used unpinned page.""" # Find first unpinned page (from least recently used) for page_key, page in list(self.cache.items()): @@ -130,13 +127,12 @@ def _evict_page_locked(self): def release_page(self, page_key: Tuple) -> None: """Unpin a previously fetched page so it becomes eligible for eviction.""" - with self._lock: - page = self.cache.get(page_key) - if page is None: - raise KeyError(f"Page {page_key} was never fetched") - page.unpin() - if not page.is_pinned(): - self.cache.move_to_end(page_key) + page = self.cache.get(page_key) + if page is None: + raise KeyError(f"Page {page_key} was never fetched") + page.unpin() + if not page.is_pinned(): + self.cache.move_to_end(page_key) @contextmanager def page(self, table_name, range_id, segment, page_index, column_index): @@ -161,51 +157,46 @@ def flush_page(self, table_name: str, range_id: int, segment: str, """Flush a specific page to disk if it's dirty.""" page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - with self._lock: - if page_key in self.cache: - page = self.cache[page_key] - if page.dirty: - self._write_page_to_disk(page_key, page) + if page_key in self.cache: + page = self.cache[page_key] + if page.dirty: + self._write_page_to_disk(page_key, page) def discard_table(self, table_name: str): """Remove all cached pages for a table without writing them back to disk.""" - with self._lock: - keys_to_remove = [key for key in self.cache if key[0] == table_name] - for key in keys_to_remove: - page = self.cache[key] - if page.is_pinned(): - raise RuntimeError( - f"Cannot discard table {table_name}: page {key} is currently pinned" - ) - del self.cache[key] + keys_to_remove = [key for key in self.cache if key[0] == table_name] + for key in keys_to_remove: + page = self.cache[key] + if page.is_pinned(): + raise RuntimeError( + f"Cannot discard table {table_name}: page {key} is currently pinned" + ) + del self.cache[key] def flush_all(self): """Write all dirty pages to disk (called on database close).""" - with self._lock: - for page_key, page in list(self.cache.items()): - if page.dirty: - self._write_page_to_disk(page_key, page) + for page_key, page in self.cache.items(): + if page.dirty: + self._write_page_to_disk(page_key, page) def clear(self): """Clear the bufferpool (for testing or shutdown).""" - with self._lock: - self.flush_all() - self.cache.clear() + self.flush_all() + self.cache.clear() def get_stats(self): """Get bufferpool statistics.""" - with self._lock: - total_accesses = self.hits + self.misses - hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 - return { - 'capacity': self.capacity, - 'current_size': len(self.cache), - 'hits': self.hits, - 'misses': self.misses, - 'hit_rate': f"{hit_rate:.2f}%", - 'evictions': self.evictions, - 'writes': self.writes, - } + total_accesses = self.hits + self.misses + hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 + return { + 'capacity': self.capacity, + 'current_size': len(self.cache), + 'hits': self.hits, + 'misses': self.misses, + 'hit_rate': f"{hit_rate:.2f}%", + 'evictions': self.evictions, + 'writes': self.writes, + } def __repr__(self): stats = self.get_stats() diff --git a/lstore/db.py b/lstore/db.py index 7625e2d..4469463 100644 --- a/lstore/db.py +++ b/lstore/db.py @@ -34,8 +34,6 @@ def open(self, path: str): def close(self): """Flush all pages and save metadata.""" - for table in self.tables.values(): - table.close() if self.bufferpool: self.bufferpool.flush_all() self._save_metadata() @@ -56,8 +54,7 @@ def drop_table(self, name): raise ValueError(f"Table {name} does not exist") # Remove table metadata and cached pages before deleting files. - table = self.tables.pop(name) - table.close() + del self.tables[name] if self.bufferpool: self.bufferpool.discard_table(name) diff --git a/lstore/query.py b/lstore/query.py index 7daa4f8..215e6ba 100644 --- a/lstore/query.py +++ b/lstore/query.py @@ -193,12 +193,15 @@ def update(self, primary_key, *columns): return False current_data = current_record[Config.tail_meta_columns:] - # disallow primary-key modifications + # allow primary-key modifications only when the new key is unique pk_index = self.table.key requested_pk = columns[pk_index] current_pk = current_data[pk_index] if requested_pk is not None and requested_pk != current_pk: - return False + # allow primary-key changes only if the new key does not already exist + existing_rids = self.table.index.locate(pk_index, requested_pk) + if any(rid != base_rid for rid in existing_rids): + return False # create tail record with metadata tail_meta = [Config.null_value for _ in range(Config.tail_meta_columns)] @@ -218,14 +221,8 @@ def update(self, primary_key, *columns): tail_record = tail_meta + tail_data - # insert tail record and update indices using cached values - tail_rid = self.table.insert_record( - tail_record, - is_tail=True, - base_rid=base_rid, - index_prior_data=current_data, - index_new_data=new_data, - ) + # insert tail record table handles index updates + tail_rid = self.table.insert_record(tail_record, is_tail=True, base_rid=base_rid) return tail_rid is not False except Exception: diff --git a/lstore/table.py b/lstore/table.py index c663c53..95ebd1b 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -1,10 +1,7 @@ from collections import defaultdict from contextlib import ExitStack, contextmanager -from dataclasses import dataclass -import queue -import threading -from time import time, sleep -from typing import Callable, Dict, Optional, Set, Tuple +from time import time +from typing import Optional from config import Config from lstore.bufferpool import Bufferpool @@ -88,21 +85,6 @@ def __repr__(self): f"segment={self.segment}, page={self.page_index}, col={self.column_index})" ) - -@dataclass(frozen=True) -class RangeMergeTask: - range_id: int - start_offset: int - end_offset: int - - -@dataclass -class RangeMergeSnapshot: - task: RangeMergeTask - updates: Dict[Tuple[int, int, int], int] - updated_slots: Set[Tuple[int, int]] - tps_value: int - class PageDirectory: def __init__( self, @@ -111,7 +93,6 @@ def __init__( bufferpool: Bufferpool, num_ranges: int = Config.initial_page_ranges, metadata: Optional[dict] = None, - merge_request_callback: Optional[Callable[[int, Optional[int]], None]] = None, ): self.table_name = table_name self.bufferpool = bufferpool @@ -124,9 +105,6 @@ def __init__( self.tail_offsets = defaultdict(int) self.tail_merge_progress = defaultdict(int) self.merge_thresholds = defaultdict(lambda: max(1, Config.records_per_range // 4)) - self._merge_in_progress = defaultdict(int) - self._lock = threading.RLock() - self._merge_request_callback = merge_request_callback if metadata: self._load_metadata(metadata) @@ -157,70 +135,63 @@ def decode_rid(self, rid: int): # Page bookkeeping helpers # ------------------------------------------------------------------ def _get_or_create_range(self, range_id: int) -> dict: - with self._lock: - info = self.page_directory[range_id] - self.num_ranges = max(self.num_ranges, range_id + 1) - return info + info = self.page_directory[range_id] + self.num_ranges = max(self.num_ranges, range_id + 1) + return info def _require_range(self, range_id: int) -> dict: - with self._lock: - if range_id not in self.page_directory: - raise RuntimeError(f"Range {range_id} has no allocated pages") - return self.page_directory[range_id] + if range_id not in self.page_directory: + raise RuntimeError(f"Range {range_id} has no allocated pages") + return self.page_directory[range_id] def _ensure_logical_page(self, range_id: int, segment: int, page_index: int): - with self._lock: - info = self.page_directory[range_id] - self.num_ranges = max(self.num_ranges, range_id + 1) - if segment == 0: - pages = info["base"] - column_count = self._column_count(is_tail=False) - else: - tail_idx = segment - 1 - tail_segments = info["tail"] - while len(tail_segments) <= tail_idx: - tail_segments.append([]) - pages = tail_segments[tail_idx] - column_count = self._column_count(is_tail=True) - - while len(pages) <= page_index: - new_page_index = len(pages) - segment_label = self._segment_label(segment) - logical_page = [ - ColumnPageProxy( - self.bufferpool, - self.table_name, - range_id, - segment_label, - new_page_index, - column_index, - ) - for column_index in range(column_count) - ] - pages.append(logical_page) + info = self._get_or_create_range(range_id) + if segment == 0: + pages = info["base"] + column_count = self._column_count(is_tail=False) + else: + tail_idx = segment - 1 + tail_segments = info["tail"] + while len(tail_segments) <= tail_idx: + tail_segments.append([]) + pages = tail_segments[tail_idx] + column_count = self._column_count(is_tail=True) + + while len(pages) <= page_index: + new_page_index = len(pages) + segment_label = self._segment_label(segment) + logical_page = [ + ColumnPageProxy( + self.bufferpool, + self.table_name, + range_id, + segment_label, + new_page_index, + column_index, + ) + for column_index in range(column_count) + ] + pages.append(logical_page) - return pages[page_index] + return pages[page_index] def _ensure_page_exists_for_read(self, range_id: int, segment: int, page_index: int) -> None: - with self._lock: - info = self.page_directory.get(range_id) - if not info: - raise RuntimeError(f"Range {range_id} has no allocated pages") - if segment == 0: - if page_index >= len(info["base"]): - raise RuntimeError( - f"Base page {page_index} missing for range {range_id}" - ) - else: - tail_idx = segment - 1 - if tail_idx >= len(info["tail"]): - raise RuntimeError( - f"Tail segment {segment} missing for range {range_id}" - ) - if page_index >= len(info["tail"][tail_idx]): - raise RuntimeError( - f"Tail page {page_index} missing for range {range_id}, segment {segment}" - ) + info = self._require_range(range_id) + if segment == 0: + if page_index >= len(info["base"]): + raise RuntimeError( + f"Base page {page_index} missing for range {range_id}" + ) + else: + tail_idx = segment - 1 + if tail_idx >= len(info["tail"]): + raise RuntimeError( + f"Tail segment {segment} missing for range {range_id}" + ) + if page_index >= len(info["tail"][tail_idx]): + raise RuntimeError( + f"Tail page {page_index} missing for range {range_id}, segment {segment}" + ) def _segment_label(self, segment_id: int) -> str: return "base" if segment_id == 0 else f"tail_{segment_id}" @@ -263,56 +234,53 @@ def to_metadata(self) -> dict: def stringify(mapping: defaultdict) -> dict: return {str(k): v for k, v in mapping.items()} - with self._lock: - directory = {} - for range_id, info in self.page_directory.items(): - directory[str(range_id)] = { - "base_pages": len(info["base"]), - "tail_pages": [len(segment) for segment in info["tail"]], - "TPS": info["TPS"], - } - return { - "num_ranges": self.num_ranges, - "num_base_records": self.num_base_records, - "num_tail_records": self.num_tail_records, - "base_offsets": stringify(self.base_offsets), - "tail_offsets": stringify(self.tail_offsets), - "tail_merge_progress": stringify(self.tail_merge_progress), - "merge_thresholds": stringify(self.merge_thresholds), - "page_directory": directory, + directory = {} + for range_id, info in self.page_directory.items(): + directory[str(range_id)] = { + "base_pages": len(info["base"]), + "tail_pages": [len(segment) for segment in info["tail"]], + "TPS": info["TPS"], } + return { + "num_ranges": self.num_ranges, + "num_base_records": self.num_base_records, + "num_tail_records": self.num_tail_records, + "base_offsets": stringify(self.base_offsets), + "tail_offsets": stringify(self.tail_offsets), + "tail_merge_progress": stringify(self.tail_merge_progress), + "merge_thresholds": stringify(self.merge_thresholds), + "page_directory": directory, + } def _load_metadata(self, metadata: dict) -> None: - with self._lock: - self.num_ranges = metadata.get("num_ranges", self.num_ranges) - self.num_base_records = metadata.get("num_base_records", 0) - self.num_tail_records = metadata.get("num_tail_records", 0) + self.num_ranges = metadata.get("num_ranges", self.num_ranges) + self.num_base_records = metadata.get("num_base_records", 0) + self.num_tail_records = metadata.get("num_tail_records", 0) def int_keys(values: dict) -> dict: return {int(k): v for k, v in values.items()} - with self._lock: - self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) - self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) - self.tail_merge_progress = defaultdict( - int, int_keys(metadata.get("tail_merge_progress", {})) - ) - self.merge_thresholds = defaultdict( - lambda: max(1, Config.records_per_range // 4), - int_keys(metadata.get("merge_thresholds", {})), - ) + self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) + self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) + self.tail_merge_progress = defaultdict( + int, int_keys(metadata.get("tail_merge_progress", {})) + ) + self.merge_thresholds = defaultdict( + lambda: max(1, Config.records_per_range // 4), + int_keys(metadata.get("merge_thresholds", {})), + ) - self.page_directory = defaultdict(self._new_range_entry) - for range_id_str, info in metadata.get("page_directory", {}).items(): - range_id = int(range_id_str) - range_entry = self._get_or_create_range(range_id) - range_entry["TPS"] = info.get("TPS", 0) - base_pages = info.get("base_pages", 0) - for page_index in range(base_pages): - self._ensure_logical_page(range_id, 0, page_index) - for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): - for page_index in range(page_count): - self._ensure_logical_page(range_id, segment_offset, page_index) + self.page_directory = defaultdict(self._new_range_entry) + for range_id_str, info in metadata.get("page_directory", {}).items(): + range_id = int(range_id_str) + range_entry = self._get_or_create_range(range_id) + range_entry["TPS"] = info.get("TPS", 0) + base_pages = info.get("base_pages", 0) + for page_index in range(base_pages): + self._ensure_logical_page(range_id, 0, page_index) + for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): + for page_index in range(page_count): + self._ensure_logical_page(range_id, segment_offset, page_index) def add_record( @@ -339,40 +307,33 @@ def add_record( ) if not is_tail: - with self._lock: - range_id = self.num_base_records // Config.records_per_range - offset = self.base_offsets[range_id] - rid = self.encode_rid(range_id, 0, offset) - self.base_offsets[range_id] += 1 - self.num_base_records += 1 + range_id = self.num_base_records // Config.records_per_range + offset = self.base_offsets[range_id] + rid = self.encode_rid(range_id, 0, offset) + self.base_offsets[range_id] += 1 columns[Config.indirection_column] = Config.null_value columns[Config.schema_encoding_column] = 0 else: if base_rid == Config.null_value: raise ValueError("Tail records require a valid base RID") - schedule_merge = False - merge_hint = None - with self._lock: - base_range = self.decode_rid(base_rid)[0] + base_range = self.decode_rid(base_rid)[0] + total_offset = self.tail_offsets[base_range] + pending_updates = total_offset - self.tail_merge_progress[base_range] + if pending_updates >= self.merge_thresholds[base_range]: + self.merge_range(base_range) + self.merge_thresholds[base_range] = max(1, self.merge_thresholds[base_range] * 2) total_offset = self.tail_offsets[base_range] - pending_updates = total_offset - self.tail_merge_progress[base_range] - schedule_merge = pending_updates >= self.merge_thresholds[base_range] - if schedule_merge: - merge_hint = total_offset - - max_capacity = Config.records_per_range * Config.max_tail_segments - if total_offset >= max_capacity: - raise RuntimeError("Tail range is full; merge required before inserting more tail records") - segment_index = total_offset // Config.records_per_range - segment_offset = total_offset % Config.records_per_range - segment_id = 1 + segment_index + max_capacity = Config.records_per_range * Config.max_tail_segments + if total_offset >= max_capacity: + raise RuntimeError("Tail range is full; merge required before inserting more tail records") - rid = self.encode_rid(base_range, segment_id, segment_offset) - - self.tail_offsets[base_range] += 1 - self.num_tail_records += 1 + segment_index = total_offset // Config.records_per_range + segment_offset = total_offset % Config.records_per_range + segment_id = 1 + segment_index + rid = self.encode_rid(base_range, segment_id, segment_offset) + self.tail_offsets[base_range] += 1 columns[Config.schema_encoding_column] = self.build_schema_encoding(columns) base_record = self.get_record_from_rid(base_rid) base_indirection = base_record[Config.indirection_column] @@ -380,8 +341,7 @@ def add_record( base_indirection if base_indirection != Config.null_value else base_rid ) columns[Config.base_rid_column] = base_rid - if schedule_merge and self._merge_request_callback: - self._merge_request_callback(base_range, merge_hint) + columns[Config.timestamp_column] = int(time()) range_id, segment_id, page_index, _ = self.decode_rid(rid) self._ensure_logical_page(range_id, segment_id, page_index) @@ -394,6 +354,11 @@ def add_record( f"Failed to append value to page (range={range_id}, segment={segment_id}, page={page_index})" ) + if not is_tail: + self.num_base_records += 1 + else: + self.num_tail_records += 1 + if is_tail and base_rid != Config.null_value: self.update_base_record(base_rid, columns) @@ -557,156 +522,67 @@ def delete_record(self, rid: int): return True def merge_range(self, range_id): - task = self.prepare_merge_task(range_id, force=True) - if not task: + range_info = self.page_directory.get(range_id) + if not range_info: return False - snapshot = self.build_merge_snapshot(task) - if not snapshot: - self.cancel_merge_task(range_id, task.end_offset) + tail_segments = range_info["tail"] + start_offset = self.tail_merge_progress[range_id] + end_offset = self.tail_offsets[range_id] + if start_offset >= end_offset: return False - return self.commit_merge_snapshot(snapshot) - - def prepare_merge_task( - self, - range_id: int, - force: bool = False, - end_offset_hint: Optional[int] = None, - ) -> Optional[RangeMergeTask]: - with self._lock: - start_offset = self.tail_merge_progress[range_id] - end_offset = self.tail_offsets[range_id] - if end_offset_hint is not None: - end_offset = min(end_offset, end_offset_hint) - if start_offset >= end_offset: - return None - - in_progress = self._merge_in_progress.get(range_id, 0) - if in_progress and not force: - return None - - self._merge_in_progress[range_id] = end_offset - return RangeMergeTask(range_id, start_offset, end_offset) - - def cancel_merge_task(self, range_id: int, expected_end: int) -> None: - with self._lock: - if self._merge_in_progress.get(range_id) == expected_end: - self._merge_in_progress.pop(range_id, None) - - def _tps_from_offset(self, range_id: int, end_offset: int) -> int: - if end_offset <= 0: - return 0 - last_offset = end_offset - 1 - last_segment_index = last_offset // Config.records_per_range - last_segment_id = 1 + last_segment_index - last_segment_offset = last_offset % Config.records_per_range - return self.encode_rid(range_id, last_segment_id, last_segment_offset) - - def build_merge_snapshot(self, task: RangeMergeTask) -> Optional[RangeMergeSnapshot]: - if task.start_offset >= task.end_offset: - return None - - range_id = task.range_id - with self._lock: - range_info = self.page_directory.get(range_id) - if not range_info: - return None - tail_segments = [list(segment_pages) for segment_pages in range_info["tail"]] - - updates: Dict[Tuple[int, int, int], int] = {} - updated_slots: Set[Tuple[int, int]] = set() - - records_per_range = Config.records_per_range - records_per_page = Config.records_per_page - - start_segment = task.start_offset // records_per_range - end_segment = (task.end_offset - 1) // records_per_range - - for segment_index in range(start_segment, end_segment + 1): - tail_idx = segment_index - if tail_idx >= len(tail_segments): - break - segment_pages = tail_segments[tail_idx] - segment_start = max(task.start_offset, segment_index * records_per_range) - segment_end = min(task.end_offset, (segment_index + 1) * records_per_range) - segment_id = segment_index + 1 - - start_page = (segment_start - segment_index * records_per_range) // records_per_page - end_page = (segment_end - 1 - segment_index * records_per_range) // records_per_page - - for page_index in range(start_page, min(len(segment_pages), end_page + 1)): - page_start_offset = segment_index * records_per_range + page_index * records_per_page - page_end_offset = page_start_offset + records_per_page - read_start = max(segment_start, page_start_offset) - read_end = min(segment_end, page_end_offset) - if read_start >= read_end: - continue - - try: - self._ensure_page_exists_for_read(range_id, segment_id, page_index) - except RuntimeError: - continue - with self._logical_page(range_id, segment_id, page_index) as tail_pages: + merged_any = False + for segment_index, segment_pages in enumerate(tail_segments, start=1): + for page_index, _ in enumerate(segment_pages): + with self._logical_page(range_id, segment_index, page_index) as tail_pages: if not tail_pages: continue - - slot_start = read_start - page_start_offset - slot_end = read_end - page_start_offset - for tail_slot in range(slot_start, slot_end): + num_records = tail_pages[Config.rid_column].num_records + for tail_slot in range(num_records): tail_columns = [column_page.read(tail_slot) for column_page in tail_pages] + tail_rid = tail_columns[Config.rid_column] base_rid = tail_columns[Config.base_rid_column] if base_rid == Config.null_value: continue - base_range, _, base_page_index, base_slot_index = self.decode_rid(base_rid) - if base_range != range_id: + _, tail_segment, tail_page_index, tail_slot_index = self.decode_rid(tail_rid) + tail_segment_index = max(0, tail_segment - 1) + tail_offset = ( + tail_segment_index * Config.records_per_range + + tail_page_index * Config.records_per_page + + tail_slot_index + ) + if tail_offset < start_offset or tail_offset >= end_offset: continue - data_offset = Config.tail_meta_columns - for column_index in range(self.num_columns): - value = tail_columns[data_offset + column_index] - if value == Config.null_value: - continue - physical_column = Config.base_meta_columns + column_index - updates[(base_page_index, base_slot_index, physical_column)] = value - updated_slots.add((base_page_index, base_slot_index)) - - tps_value = self._tps_from_offset(range_id, task.end_offset) - return RangeMergeSnapshot(task=task, updates=updates, updated_slots=updated_slots, tps_value=tps_value) - - def commit_merge_snapshot(self, snapshot: RangeMergeSnapshot) -> bool: - range_id = snapshot.task.range_id - with self._lock: - if snapshot.task.end_offset <= self.tail_merge_progress[range_id]: - self._merge_in_progress.pop(range_id, None) - return False - zero_schema = self.tail_offsets[range_id] == snapshot.task.end_offset - - for (page_index, slot_index, column_index), value in snapshot.updates.items(): - with self._column(range_id, 0, page_index, column_index) as column_page: - column_page.write_slot(slot_index, value) - - if zero_schema: - for page_index, slot_index in snapshot.updated_slots: - with self._column(range_id, 0, page_index, Config.schema_encoding_column) as schema_page: - schema_page.write_slot(slot_index, 0) - - with self._lock: - self.tail_merge_progress[range_id] = snapshot.task.end_offset - range_info = self.page_directory.get(range_id) - if range_info is not None: - range_info["TPS"] = snapshot.tps_value - max_threshold = max(1, Config.records_per_range // 4) - doubled_threshold = max(1, self.merge_thresholds[range_id] * 2) - self.merge_thresholds[range_id] = min(doubled_threshold, max_threshold) - self._merge_in_progress.pop(range_id, None) - return True + base_range, _, base_page_index, base_slot_index = self.decode_rid(base_rid) + self._ensure_page_exists_for_read(base_range, 0, base_page_index) + with self._logical_page(base_range, 0, base_page_index) as base_pages: + for column_index in range(self.num_columns): + value = tail_columns[Config.tail_meta_columns + column_index] + if value == Config.null_value: + continue + base_column_page = base_pages[Config.base_meta_columns + column_index] + base_column_page.write_slot(base_slot_index, value) + + base_schema_page = base_pages[Config.schema_encoding_column] + base_schema_page.write_slot(base_slot_index, 0) + merged_any = True + + if not merged_any: + return False - def has_inflight_merge(self, range_id: Optional[int] = None) -> bool: - with self._lock: - if range_id is None: - return any(value > 0 for value in self._merge_in_progress.values()) - return self._merge_in_progress.get(range_id, 0) > 0 + self.tail_merge_progress[range_id] = end_offset + if end_offset == 0: + tps_value = 0 + else: + last_offset = end_offset - 1 + last_segment_index = last_offset // Config.records_per_range + last_segment_id = 1 + last_segment_index + last_segment_offset = last_offset % Config.records_per_range + tps_value = self.encode_rid(range_id, last_segment_id, last_segment_offset) + range_info["TPS"] = tps_value + return True class Table: @@ -727,20 +603,15 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: "queue.Queue[Optional[RangeMergeTask]]" = queue.Queue() - self._merge_results: "queue.Queue[RangeMergeSnapshot]" = queue.Queue() - self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, num_columns, self.bufferpool, Config.initial_page_ranges, metadata=directory_metadata, - merge_request_callback=self._request_merge, ) self.index = Index(self) - self._merge_thread = threading.Thread(target=self._merge_loop, name=f"{name}-merger", daemon=True) - self._merge_thread.start() + pass def to_metadata(self) -> dict: return { @@ -798,24 +669,16 @@ def get_cumulative_updated_record(self, rid: int): """ return self.page_directory.get_cumulative_updated_record_from_base_rid(rid) - def insert_record( - self, - columns: list[int], - is_tail: bool = False, - base_rid: int = Config.null_value, - index_prior_data: Optional[list[int]] = None, - index_new_data: Optional[list[int]] = None, - ): + def insert_record(self, columns: list[int], is_tail: bool = False, base_rid: int = Config.null_value): """ Inserts a record into the table :param columns: list[int] - the columns of the record :param base_rid: int - the RID of the base record, only used for tail records :return: int - the RID of the record """ - self._tick_merges() - cached_prior_data = None - if is_tail and base_rid != Config.null_value and index_prior_data is None: - cached_prior_data = self.get_cumulative_updated_record(base_rid)[ + prior_data = None + if is_tail: + prior_data = self.get_cumulative_updated_record(base_rid)[ Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns ] @@ -827,17 +690,10 @@ def insert_record( ] self.index.add(rid, base_data) else: - prior_values = index_prior_data if index_prior_data is not None else cached_prior_data - if prior_values is None: - prior_values = self.get_cumulative_updated_record(base_rid)[ - Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns - ] - new_values = index_new_data - if new_values is None: - new_values = self.get_cumulative_updated_record(base_rid)[ - Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns - ] - self.index.update(base_rid, prior_values, new_values) + updated_data = self.get_cumulative_updated_record(base_rid)[ + Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns + ] + self.index.update(base_rid, prior_data, updated_data) return rid def delete_record(self, rid: int): @@ -846,7 +702,6 @@ def delete_record(self, rid: int): :param rid: int - the RID of the base record :return: bool - whether the record was deleted """ - self._tick_merges() try: base_record = self.page_directory.get_record_from_rid(rid) except RuntimeError: @@ -860,68 +715,7 @@ def delete_record(self, rid: int): except ValueError: return False - def _merge_loop(self): - while not self._merge_shutdown.is_set(): - try: - task = self._merge_jobs.get(timeout=0.1) - except queue.Empty: - continue - - if task is None: - if self._merge_shutdown.is_set(): - break - continue - - try: - snapshot = self.page_directory.build_merge_snapshot(task) - if snapshot: - self._merge_results.put(snapshot) - else: - self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - except (RuntimeError, ValueError, IndexError) as e: - print(f"Error building merge snapshot: {e}") - self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - - def _request_merge(self, range_id: int, end_offset_hint: Optional[int] = None): - task = self.page_directory.prepare_merge_task(range_id, end_offset_hint=end_offset_hint) - if task: - self._merge_jobs.put(task) - - def _apply_ready_merges(self): - applied = 0 - while True: - try: - snapshot = self._merge_results.get_nowait() - except queue.Empty: - break - self.page_directory.commit_merge_snapshot(snapshot) - applied += 1 - return applied - - def _tick_merges(self): - self._apply_ready_merges() - - def wait_for_merges(self, timeout: float = 2.0): - deadline = time() + timeout - while time() < deadline: - self._apply_ready_merges() - if self._merge_jobs.empty() and self._merge_results.empty() and not self.page_directory.has_inflight_merge(): - break - sleep(0.01) - self._apply_ready_merges() - - def close(self): - if self._merge_shutdown.is_set(): - return - self._merge_shutdown.set() - self._merge_jobs.put(None) - if self._merge_thread.is_alive(): - self._merge_thread.join(timeout=1.0) - if self._merge_thread.is_alive(): - print("Merge thread did not join in time, force-killing") - print(f"Warning: merge thread for table {self.name} did not stop within timeout") - self.wait_for_merges() - - def __del__(self): - # Best-effort cleanup to avoid leaking background threads in tests. - self.close() + def __merge(self): + print("merge is happening") + pass + diff --git a/lstore/tests/test_indexing.py b/lstore/tests/test_indexing.py deleted file mode 100644 index b6d4df3..0000000 --- a/lstore/tests/test_indexing.py +++ /dev/null @@ -1,36 +0,0 @@ -from lstore.db import Database -from lstore.query import Query - -import sys -from pathlib import Path - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(PROJECT_ROOT)) - -def test_indexing(): - db = Database() - table = db.create_table('Test', 3, 0) - query = Query(table) - - # Insert some records - query.insert(1, 100, 200) - query.insert(2, 100, 300) # Note: same value in column 1 - query.insert(3, 150, 400) - - # Create secondary index on column 1 - assert table.index.create_index(1) - - # Should find both records with value 100 in column 1 - results = query.select(100, 1, [1, 1, 1]) - assert len(results) == 2 - print(f"Found {len(results)} records") # Should print: Found 2 records - - # Try to create the same index again - should return False - assert not table.index.create_index(1) - - # Drop the index - assert table.index.drop_index(1) - - # Can't drop primary key index - assert not table.index.drop_index(0) diff --git a/lstore/tests/test_query.py b/lstore/tests/test_query.py index 6633d78..3e5b72d 100644 --- a/lstore/tests/test_query.py +++ b/lstore/tests/test_query.py @@ -84,3 +84,32 @@ def test_query_deletion_tombstone_behavior(): survivors = query.select(1, 0, [1, 1, 1, 1, 1]) assert len(survivors) == 1 + + +def test_primary_key_updates_require_uniqueness(): + table, query = _make_grades_table() + + original = [11, 5, 6, 7, 8] + other = [22, 1, 2, 3, 4] + assert query.insert(*original) + assert query.insert(*other) + + # Update pk to an unused value and tweak another column in the same update. + updated_values = [None] * table.num_columns + updated_values[table.key] = 33 + updated_values[1] = 99 + assert query.update(original[0], *updated_values) + + assert query.select(original[0], table.key, [1] * table.num_columns) == [] + refreshed = query.select(33, table.key, [1] * table.num_columns) + assert len(refreshed) == 1 + assert refreshed[0].columns == [33, 99, original[2], original[3], original[4]] + + # Reject conflicting primary-key changes. + conflicting_values = [None] * table.num_columns + conflicting_values[table.key] = 33 + assert not query.update(other[0], *conflicting_values) + + still_there = query.select(other[0], table.key, [1] * table.num_columns) + assert len(still_there) == 1 + assert still_there[0].columns == other diff --git a/lstore/tests/test_table.py b/lstore/tests/test_table.py index 890cd87..84a1d2b 100644 --- a/lstore/tests/test_table.py +++ b/lstore/tests/test_table.py @@ -252,11 +252,9 @@ def insert_tail(value): assert base_data() == [100, 10, 20] insert_tail(55) - grades_table.wait_for_merges() assert base_data() == [100, 10, 20] insert_tail(65) - grades_table.wait_for_merges() assert base_data() == [100, 55, 20] assert directory.merge_thresholds[0] == 2 @@ -281,37 +279,12 @@ def insert_tail(value): insert_tail(50) insert_tail(60) - grades_table.wait_for_merges() assert base_data() == [100, 50, 20] assert directory.merge_thresholds[0] == 2 insert_tail(70) - grades_table.wait_for_merges() assert base_data() == [100, 50, 20] insert_tail(80) - grades_table.wait_for_merges() assert base_data() == [100, 70, 20] assert directory.merge_thresholds[0] == 4 - - -def test_background_merge_applies_updates_without_manual_trigger(): - grades_table = Table("grades", num_columns=3, key=0) - base_rid = grades_table.insert_record( - _build_base_record(grades_table.num_columns, 999, 5, 6), - is_tail=False, - ) - directory = grades_table.page_directory - directory.merge_thresholds[0] = 1 - - tail_record = _build_tail_record(grades_table.num_columns, {1: 123}) - grades_table.insert_record(tail_record, is_tail=True, base_rid=base_rid) - - noop_tail = _build_tail_record(grades_table.num_columns, {}) - grades_table.insert_record(noop_tail, is_tail=True, base_rid=base_rid) - - grades_table.wait_for_merges() - merged_data = grades_table.get_record(base_rid)[ - Config.base_meta_columns : Config.base_meta_columns + grades_table.num_columns - ] - assert merged_data == [999, 123, 6] diff --git a/test_indexing.py b/test_indexing.py new file mode 100644 index 0000000..839d7d5 --- /dev/null +++ b/test_indexing.py @@ -0,0 +1,28 @@ +from lstore.db import Database +from lstore.query import Query + +db = Database() +table = db.create_table('Test', 3, 0) +query = Query(table) + +# Insert some records +query.insert(1, 100, 200) +query.insert(2, 100, 300) # Note: same value in column 1 +query.insert(3, 150, 400) + +# Create secondary index on column 1 +assert table.index.create_index(1) + +# Should find both records with value 100 in column 1 +results = query.select(100, 1, [1, 1, 1]) +assert len(results) == 2 +print(f"Found {len(results)} records") # Should print: Found 2 records + +# Try to create the same index again - should return False +assert not table.index.create_index(1) + +# Drop the index +assert table.index.drop_index(1) + +# Can't drop primary key index +assert not table.index.drop_index(0)