From 5aa68f9074b41735e45f0489e2dd0b1e7670a5bf Mon Sep 17 00:00:00 2001 From: Jack Lund <118699768+lundj227@users.noreply.github.com> Date: Thu, 20 Nov 2025 11:29:10 -0800 Subject: [PATCH 1/7] Merge pull request #28 from flamboh/jack added assignment 3 instructions in a txt file --- assignment3.txt | 150 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 assignment3.txt diff --git a/assignment3.txt b/assignment3.txt new file mode 100644 index 0000000..57afb9b --- /dev/null +++ b/assignment3.txt @@ -0,0 +1,150 @@ +Assignment 3: Multi-threaded, In-memory & Durable L-Store +CS 451/551 - Fall 2025 +Deadline: November 30, 2025 at 11:59pm +In this assignment, we will add to the L-Store support for transactional semantics and +concurrent execution. +The main objective of this assignment consists of two parts: +(1) Transaction Semantics: to create the concept of the multi-statement transaction with the +property that either all statements (operations) are successfully executed and the transaction +commits or none will and the transaction aborts (i.e., atomicity). +(2) Concurrency Control: to allow running multiple transactions concurrently while providing +serializable isolation semantics by employing two-phase locking (2PL) without the need to wait +for locks. +The overall goal of this milestone is to create a multi-threaded and durable L-Store capable of +handling transactions. +# Transaction Semantics +In database systems, a transaction is a logical unit of work that accesses and/or modifies the +database, and it may contain one or more read and write operations. A transaction in a database +must maintain four essential properties: Atomicity, Consistency, Isolation, and Durability, +commonly known together as ACID. +Atomicity: Transactions are often composed of multiple statements (read or write operations). +Atomicity guarantees that each transaction is treated as a single "unit", which either succeeds +completely, or fails completely: if any of the statements constituting a transaction fails to complete, +the entire transaction fails, and the database is left unchanged. An atomic system must guarantee +atomicity in every situation, including power failures, errors, and crashes. A guarantee of atomicity +prevents updates to the database from occurring only partially. +Isolation: Transactions are often executed concurrently (e.g., multiple transactions reading and +writing to a table at the same time). Isolation ensures that the concurrent execution of transactions +leaves the database in the same state that would have been obtained if the transactions were +executed sequentially. Isolation is the main goal of concurrency control; depending on the method +used, the effects of an incomplete transaction might not even be visible to other transactions. +Durability: Durability guarantees that once a transaction has been committed, it will remain +committed even in the case of a system failure (e.g., power outage or crash). This usually means +that completed transactions (or their effects) are recorded in non-volatile memory. +In the previous assignment, we focused on the durability aspect. The first goal of this assignment +is to add the notion of a multi-statement transaction, that is, to create a transaction consisting of +a set of read and write operations where its execution adheres to the atomicity property. If one of +the transaction operations fails (perhaps due to failure in acquiring locks), the transaction must +abort, and all effects of the transaction (any operation executed already) must be rolled back and +undo. Any base/tail record created as a result of an aborted transaction need not be removed from +the database, it can just be marked as deleted. If the transaction runs successfully, it should commit +to the database and the resulting changes should persist. For an aborted transaction, the thread +should keep trying to execute it until it commits. +# Multithreading Concurrency Control +Until now, our L-Store implementation has been limited to a single-threaded execution, namely, +serial execution of transactions one at a time. However, any commercial database must have the +ability to support the concurrent execution of transactions in order to fully utilize all available +hardware resources. +The concurrent execution adds many interesting challenges to the database design and +implementation; protecting against race conditions while coping with the contention that may +occur among threads that access shared data. In general, it is the role of the concurrency control +(offering the Isolation property of ACID) layer or transaction manager to handle these concurrency +intricacies through the use of clever synchronization primitives such as locks and semaphores (i.e., +pessimistic concurrency). +We will adopt the strict 2PL protocol for this milestone along with no wait property (which +eliminates deadlocks), meaning if a transaction requests a shared or exclusive lock on a record that +cannot be granted, the transaction simply aborts and undo any changes it has made. You may create +a lock manager (typically a hashtable) that would allow (un)locking each record by a transaction. +You have complete freedom on how to implement your 2PL and lock manager. Of course, you are +encouraged to implement any other advanced concurrency protocols that you wish as a bonus. +Furthermore, you need to pay attention when accessing any shared data structures such as indexes +or bufferpool. You need to protect these data structures with an additional set of locks, and you +have the complete freedom to design your own scheme. Due to the Global Interpreter Lock +(GIL), real multithreading is not achievable in CPython as the CPython interpreter only allows +one thread to run Python bytecode at a time, a limitation of the language. However, multithreading +conceptually is possible and useful, especially when performing any I/O operations, as they are +handled outside of the interpreter. Therefore, when a thread is blocked by an I/O request, another +thread can still run the bytecode. As a result, Python code can only achieve concurrency, not +true parallelism. +Note that although no two threads can access the same resource at the same time due to the GIL +limitation, race conditions can still occur as several operations, such as evicting a page from the +buffer pool, are not inherently atomic, meaning a context switch to a different thread might happen +in the middle of the operation. If not handled properly, these situations can result in inconsistencies +and data corruption. More importantly, when executing multi-statement transactions, multiple +transactions may access an overlapping set of records, which is why 2PL is needed. +One should use the threading module in Python to work with threads. This module provides a highlevel threading interface and synchronization primitives. As thread creations are costly, databases +often avoid spawning and removing threads on the fly and rely on a fixed-size number of worker +threads (a pool of threads) initialized at the start of the application to distribute the workload. +Transactions will be assigned to worker threads, and they will concurrently execute the assigned +transactions to them. The threading module offers implementations for a number of common +locking primitives. The threading.Lock class is an implementation of a Mutually Exclusive +(Mutex) lock that can be used by the 2PL protocol for locking records. Note that a Lock does not +by itself “lock” any objects but is merely acquired or released by different threads. The +programmer decides what resources are to be protected by each lock. +# Implementation +We have provided a code skeleton that can be used as a baseline for developing your project. This +skeleton is merely a suggestion, and you are free and even encouraged to come up with your own +design. You will find three main classes in the provided skeleton. Some of the needed methods in +each class are provided as stubs. But you must implement the APIs listed in db.py, query.py, +table.py, transaction.py, transaction_worker.py, and index.py; you also need to ensure that you can +run main.py and tester files to allow auto-grading as well. We have provided several such methods +to guide you through the implementation. +The Database class is a general interface to the database and handles high-level operations such +as starting and shutting down the database instance and loading the database from stored disk files. +This class also handles the creation and deletion of tables via the create and drop function. The +create function will create a new table in the database. The Table constructor takes as input the +name of the table, the number of columns, and the index of the key column. The drop function +drops the specified table. In this milestone, we have also added open and close functions for +reading and writing all data (not the indexes) to files at the restart. +The Query class provides standard SQL operations such as insert, select, update, delete, and sum. +The select function returns the specified set of columns from the record with the given search key +(the search is not the same as the primary key). In this assignment, we use any column as the search +key for the select function; thus, returning more than one row and exploiting secondary indexes to +speed up the querying. The insert function will insert a new record in the table. All columns should +be passed a non-NULL value when inserting. The update function updates values for the specified +set of columns. The delete function will delete the record with the specified key from the table. +The sum function will sum over the values of the selected column for a range of records specified +by their key values. We query tables by direct function calls rather than parsing SQL queries. +The Transaction class allows for the creation and management of transactions. Queries are added +to the transaction through the add_query method. This method takes as input a Query object, the +method (update, select, etc.) to be called on that query and the arguments to the method. These +will be saved in a list and called in the order they were added when the query is run. +The TransactionWorker class is a representation of a worker thread in the template code. It is +initialized with a list of transactions to run concurrently with other worker instances. You may +create a fixed number of workers, each with its own thread, and pass them a list of functions to +run. The tester code for this milestone will create its own TransactionWorker instances and assign +Transactions to them. +The Table class provides the core of our relational storage functionality. All columns are 64-bit +integers in this implementation. Users mainly interact with tables through queries. Tables provide +a logical view of the actual physically stored data and mostly manage the storage and retrieval of +data. Each table is responsible for managing its pages and requires an internal page directory that, +given a RID, returns the actual physical location of the record. The table class should also manage +the periodical merge of its corresponding page ranges. +The Index class provides the interface to add or remove indexes to speed up queries (e.g., select +or update). Each Table will have a single Index object accessible through table.index that holds the +indices on various columns. Given a search key on a column, its index should efficiently locate all +records matching the search key. The primary key column of all tables is indexed by default. The +external API for this class exposes the two functions create_index and drop_index. These functions +are accessed by the tester through the table.index handle. No index should be created on a non-key +column unless the user has called create_index. +The Page class provides low-level physical storage capabilities. In the provided skeleton, each +page has a fixed size of 4096 KB. This should provide optimal performance when persisting to +disk, as most hard drives have blocks of the same size. You can experiment with different sizes. +This class is mostly used internally by the Table class to store and retrieve records. While working +with this class, keep in mind that tail and base pages should be identical from the hardware’s point +of view. +The config.py file is meant to act as centralized storage for all the configuration options and the +constant values used in the code. It is good practice to organize such information into a Singleton +object accessible from every file in the project. This class will find more use when implementing +persistence. +Self-Testing Scripts: +You will also observe that there are several scripts in the repository, namely, m1/m2/m3_tester.py +and exam_tester_m1/m2/m3.py. Those scripts are for your self-debugging and testing. Here, m1 +refers to the assignment 1, m2 is for assignment 2, and m3 is for assignment 3. +Milestone Deliverables/Grading Scheme: What to submit? +At the end of this assignment, each team needs to submit a working L-Store implementation. Your +submission should have a working and correct implementation of multi-threaded L-Store (a zip +folder). Further, your submission should successfully run and pass main.py; otherwise, a grade +of zero will be received on the auto-grading component of the assignment. No presentation needs +to be submitted with this assignment. The submission is made through Canvas, and only one group +member must submit the package on behalf of the entire group. \ No newline at end of file From dc8b8b83d8e36c0f4f802791d84a65d3def1a406 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 14:58:03 -0800 Subject: [PATCH 2/7] update gitignore, fix to meet milestone 2 primary key tests --- .gitignore | 5 ++++- lstore/query.py | 7 +++++-- lstore/tests/test_query.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 0cda74d..835c9c9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,7 @@ testM1.py testM2.py testM3.py -**/CS451 \ No newline at end of file +**/CS451 +**/CT +**/MT +**/M2 \ No newline at end of file diff --git a/lstore/query.py b/lstore/query.py index 3be6c0b..215e6ba 100644 --- a/lstore/query.py +++ b/lstore/query.py @@ -193,12 +193,15 @@ def update(self, primary_key, *columns): return False current_data = current_record[Config.tail_meta_columns:] - # disallow primary-key modifications + # allow primary-key modifications only when the new key is unique pk_index = self.table.key requested_pk = columns[pk_index] current_pk = current_data[pk_index] if requested_pk is not None and requested_pk != current_pk: - return False + # allow primary-key changes only if the new key does not already exist + existing_rids = self.table.index.locate(pk_index, requested_pk) + if any(rid != base_rid for rid in existing_rids): + return False # create tail record with metadata tail_meta = [Config.null_value for _ in range(Config.tail_meta_columns)] diff --git a/lstore/tests/test_query.py b/lstore/tests/test_query.py index 6633d78..3e5b72d 100644 --- a/lstore/tests/test_query.py +++ b/lstore/tests/test_query.py @@ -84,3 +84,32 @@ def test_query_deletion_tombstone_behavior(): survivors = query.select(1, 0, [1, 1, 1, 1, 1]) assert len(survivors) == 1 + + +def test_primary_key_updates_require_uniqueness(): + table, query = _make_grades_table() + + original = [11, 5, 6, 7, 8] + other = [22, 1, 2, 3, 4] + assert query.insert(*original) + assert query.insert(*other) + + # Update pk to an unused value and tweak another column in the same update. + updated_values = [None] * table.num_columns + updated_values[table.key] = 33 + updated_values[1] = 99 + assert query.update(original[0], *updated_values) + + assert query.select(original[0], table.key, [1] * table.num_columns) == [] + refreshed = query.select(33, table.key, [1] * table.num_columns) + assert len(refreshed) == 1 + assert refreshed[0].columns == [33, 99, original[2], original[3], original[4]] + + # Reject conflicting primary-key changes. + conflicting_values = [None] * table.num_columns + conflicting_values[table.key] = 33 + assert not query.update(other[0], *conflicting_values) + + still_there = query.select(other[0], table.key, [1] * table.num_columns) + assert len(still_there) == 1 + assert still_there[0].columns == other From 45cf911852e4ab197bf37a80cffa848c90b93ce1 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 15:23:52 -0800 Subject: [PATCH 3/7] Revert "tail merge schedule fixed" This reverts commit cfc1a4e8059b629a9778f1a499b1a62cf40f03f1. --- lstore/table.py | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index c663c53..8c28ffd 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -111,7 +111,7 @@ def __init__( bufferpool: Bufferpool, num_ranges: int = Config.initial_page_ranges, metadata: Optional[dict] = None, - merge_request_callback: Optional[Callable[[int, Optional[int]], None]] = None, + merge_request_callback: Optional[Callable[[int], None]] = None, ): self.table_name = table_name self.bufferpool = bufferpool @@ -350,15 +350,14 @@ def add_record( else: if base_rid == Config.null_value: raise ValueError("Tail records require a valid base RID") + schedule_merge = False - merge_hint = None with self._lock: base_range = self.decode_rid(base_rid)[0] total_offset = self.tail_offsets[base_range] pending_updates = total_offset - self.tail_merge_progress[base_range] - schedule_merge = pending_updates >= self.merge_thresholds[base_range] - if schedule_merge: - merge_hint = total_offset + if pending_updates >= self.merge_thresholds[base_range]: + schedule_merge = True max_capacity = Config.records_per_range * Config.max_tail_segments if total_offset >= max_capacity: @@ -370,9 +369,6 @@ def add_record( rid = self.encode_rid(base_range, segment_id, segment_offset) - self.tail_offsets[base_range] += 1 - self.num_tail_records += 1 - columns[Config.schema_encoding_column] = self.build_schema_encoding(columns) base_record = self.get_record_from_rid(base_rid) base_indirection = base_record[Config.indirection_column] @@ -381,7 +377,11 @@ def add_record( ) columns[Config.base_rid_column] = base_rid if schedule_merge and self._merge_request_callback: - self._merge_request_callback(base_range, merge_hint) + self._merge_request_callback(base_range) + with self._lock: + self.tail_offsets[base_range] += 1 + self.num_tail_records += 1 + columns[Config.timestamp_column] = int(time()) range_id, segment_id, page_index, _ = self.decode_rid(rid) self._ensure_logical_page(range_id, segment_id, page_index) @@ -566,17 +566,10 @@ def merge_range(self, range_id): return False return self.commit_merge_snapshot(snapshot) - def prepare_merge_task( - self, - range_id: int, - force: bool = False, - end_offset_hint: Optional[int] = None, - ) -> Optional[RangeMergeTask]: + def prepare_merge_task(self, range_id: int, force: bool = False) -> Optional[RangeMergeTask]: with self._lock: start_offset = self.tail_merge_progress[range_id] end_offset = self.tail_offsets[range_id] - if end_offset_hint is not None: - end_offset = min(end_offset, end_offset_hint) if start_offset >= end_offset: return None @@ -882,8 +875,8 @@ def _merge_loop(self): print(f"Error building merge snapshot: {e}") self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - def _request_merge(self, range_id: int, end_offset_hint: Optional[int] = None): - task = self.page_directory.prepare_merge_task(range_id, end_offset_hint=end_offset_hint) + def _request_merge(self, range_id: int): + task = self.page_directory.prepare_merge_task(range_id) if task: self._merge_jobs.put(task) @@ -917,11 +910,8 @@ def close(self): self._merge_jobs.put(None) if self._merge_thread.is_alive(): self._merge_thread.join(timeout=1.0) - if self._merge_thread.is_alive(): - print("Merge thread did not join in time, force-killing") - print(f"Warning: merge thread for table {self.name} did not stop within timeout") self.wait_for_merges() def __del__(self): # Best-effort cleanup to avoid leaking background threads in tests. - self.close() + self.close() \ No newline at end of file From 3f7785c1ec95771c570747e137294b54478e5d0a Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 15:23:52 -0800 Subject: [PATCH 4/7] Revert "remove general exception handeler in table" This reverts commit a8a325c6766ea9e586b81bebaa6fb203a55ef0cd. --- lstore/table.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index 8c28ffd..83f106d 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -720,8 +720,8 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: "queue.Queue[Optional[RangeMergeTask]]" = queue.Queue() - self._merge_results: "queue.Queue[RangeMergeSnapshot]" = queue.Queue() + self._merge_jobs: 'queue.Queue[Optional[RangeMergeTask]]' = queue.Queue() + self._merge_results: 'queue.Queue[RangeMergeSnapshot]' = queue.Queue() self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, @@ -871,8 +871,7 @@ def _merge_loop(self): self._merge_results.put(snapshot) else: self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - except (RuntimeError, ValueError, IndexError) as e: - print(f"Error building merge snapshot: {e}") + except Exception: self.page_directory.cancel_merge_task(task.range_id, task.end_offset) def _request_merge(self, range_id: int): From fe4494c2d306b91b40424f5951644974231b2874 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 15:23:52 -0800 Subject: [PATCH 5/7] Revert "fix type instantiation issue with merge jobs and results" This reverts commit f13b3a74db8e408c8d78f60fc325e928b25e62f2. --- lstore/table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lstore/table.py b/lstore/table.py index 83f106d..67ad1a0 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -720,8 +720,8 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: 'queue.Queue[Optional[RangeMergeTask]]' = queue.Queue() - self._merge_results: 'queue.Queue[RangeMergeSnapshot]' = queue.Queue() + self._merge_jobs: queue.Queue[Optional[RangeMergeTask]] = queue.Queue() + self._merge_results: queue.Queue[RangeMergeSnapshot] = queue.Queue() self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, From 1e61ea7b56a699af29247795249471769a21884b Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 15:23:52 -0800 Subject: [PATCH 6/7] Revert "remove unused __merge function, move indexing test into tests folder" This reverts commit fdcb66af9ff2f122fb296107062cca67ee05c898. --- lstore/table.py | 7 ++++++- lstore/tests/test_indexing.py | 36 ----------------------------------- test_indexing.py | 28 +++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 37 deletions(-) delete mode 100644 lstore/tests/test_indexing.py create mode 100644 test_indexing.py diff --git a/lstore/table.py b/lstore/table.py index 67ad1a0..006ee5d 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -913,4 +913,9 @@ def close(self): def __del__(self): # Best-effort cleanup to avoid leaking background threads in tests. - self.close() \ No newline at end of file + self.close() + + def __merge(self): + print("merge is happening") + pass + diff --git a/lstore/tests/test_indexing.py b/lstore/tests/test_indexing.py deleted file mode 100644 index b6d4df3..0000000 --- a/lstore/tests/test_indexing.py +++ /dev/null @@ -1,36 +0,0 @@ -from lstore.db import Database -from lstore.query import Query - -import sys -from pathlib import Path - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -if str(PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(PROJECT_ROOT)) - -def test_indexing(): - db = Database() - table = db.create_table('Test', 3, 0) - query = Query(table) - - # Insert some records - query.insert(1, 100, 200) - query.insert(2, 100, 300) # Note: same value in column 1 - query.insert(3, 150, 400) - - # Create secondary index on column 1 - assert table.index.create_index(1) - - # Should find both records with value 100 in column 1 - results = query.select(100, 1, [1, 1, 1]) - assert len(results) == 2 - print(f"Found {len(results)} records") # Should print: Found 2 records - - # Try to create the same index again - should return False - assert not table.index.create_index(1) - - # Drop the index - assert table.index.drop_index(1) - - # Can't drop primary key index - assert not table.index.drop_index(0) diff --git a/test_indexing.py b/test_indexing.py new file mode 100644 index 0000000..839d7d5 --- /dev/null +++ b/test_indexing.py @@ -0,0 +1,28 @@ +from lstore.db import Database +from lstore.query import Query + +db = Database() +table = db.create_table('Test', 3, 0) +query = Query(table) + +# Insert some records +query.insert(1, 100, 200) +query.insert(2, 100, 300) # Note: same value in column 1 +query.insert(3, 150, 400) + +# Create secondary index on column 1 +assert table.index.create_index(1) + +# Should find both records with value 100 in column 1 +results = query.select(100, 1, [1, 1, 1]) +assert len(results) == 2 +print(f"Found {len(results)} records") # Should print: Found 2 records + +# Try to create the same index again - should return False +assert not table.index.create_index(1) + +# Drop the index +assert table.index.drop_index(1) + +# Can't drop primary key index +assert not table.index.drop_index(0) From 7a6d56f6b3626977c40e4c59e08bdaf7a47f50f4 Mon Sep 17 00:00:00 2001 From: flamboh Date: Thu, 20 Nov 2025 15:23:52 -0800 Subject: [PATCH 7/7] Revert "Merge process now runs truly runs contention free on a background thread" This reverts commit d60110c1a16f3cdc22b0f613c85985ab6a7044ef. --- lstore/bufferpool.py | 103 ++++--- lstore/db.py | 5 +- lstore/query.py | 10 +- lstore/table.py | 542 ++++++++++++------------------------- lstore/tests/test_table.py | 27 -- 5 files changed, 221 insertions(+), 466 deletions(-) diff --git a/lstore/bufferpool.py b/lstore/bufferpool.py index 26182a2..e300f3d 100644 --- a/lstore/bufferpool.py +++ b/lstore/bufferpool.py @@ -21,7 +21,6 @@ """ import os from collections import OrderedDict -from threading import RLock from typing import Optional, Tuple from config import Config @@ -45,7 +44,6 @@ def __init__(self, capacity: int = 1000, base_path: str = None): """ self.capacity = capacity self.base_path = Path(base_path) if base_path else None - self._lock = RLock() if self.base_path: self.base_path.mkdir(parents=True, exist_ok=True) @@ -73,21 +71,20 @@ def _make_file_path(self, table_name, range_id, segment, page_index, column_inde def get_page(self, table_name, range_id, segment, page_index, column_index): page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - with self._lock: - page = self.cache.get(page_key) + page = self.cache.get(page_key) - if page is not None: - self.cache.move_to_end(page_key) - self.hits += 1 - else: - self.misses += 1 - if len(self.cache) >= self.capacity: - self._evict_page_locked() - page = self._load_page(table_name, range_id, segment, page_index, column_index) - self.cache[page_key] = page + if page is not None: + self.cache.move_to_end(page_key) + self.hits += 1 + else: + self.misses += 1 + if len(self.cache) >= self.capacity: + self._evict_page() + page = self._load_page(table_name, range_id, segment, page_index, column_index) + self.cache[page_key] = page - page.pin() - return page, page_key + page.pin() + return page, page_key def _load_page(self, table_name: str, range_id: int, segment: str, page_index: int, column_index: int) -> Page: @@ -108,7 +105,7 @@ def _load_page(self, table_name: str, range_id: int, segment: str, print(f"Warning: Failed to load page from {file_path}: {e}") return Page() - def _evict_page_locked(self): + def _evict_page(self): """Evict the least recently used unpinned page.""" # Find first unpinned page (from least recently used) for page_key, page in list(self.cache.items()): @@ -130,13 +127,12 @@ def _evict_page_locked(self): def release_page(self, page_key: Tuple) -> None: """Unpin a previously fetched page so it becomes eligible for eviction.""" - with self._lock: - page = self.cache.get(page_key) - if page is None: - raise KeyError(f"Page {page_key} was never fetched") - page.unpin() - if not page.is_pinned(): - self.cache.move_to_end(page_key) + page = self.cache.get(page_key) + if page is None: + raise KeyError(f"Page {page_key} was never fetched") + page.unpin() + if not page.is_pinned(): + self.cache.move_to_end(page_key) @contextmanager def page(self, table_name, range_id, segment, page_index, column_index): @@ -161,51 +157,46 @@ def flush_page(self, table_name: str, range_id: int, segment: str, """Flush a specific page to disk if it's dirty.""" page_key = self._make_page_key(table_name, range_id, segment, page_index, column_index) - with self._lock: - if page_key in self.cache: - page = self.cache[page_key] - if page.dirty: - self._write_page_to_disk(page_key, page) + if page_key in self.cache: + page = self.cache[page_key] + if page.dirty: + self._write_page_to_disk(page_key, page) def discard_table(self, table_name: str): """Remove all cached pages for a table without writing them back to disk.""" - with self._lock: - keys_to_remove = [key for key in self.cache if key[0] == table_name] - for key in keys_to_remove: - page = self.cache[key] - if page.is_pinned(): - raise RuntimeError( - f"Cannot discard table {table_name}: page {key} is currently pinned" - ) - del self.cache[key] + keys_to_remove = [key for key in self.cache if key[0] == table_name] + for key in keys_to_remove: + page = self.cache[key] + if page.is_pinned(): + raise RuntimeError( + f"Cannot discard table {table_name}: page {key} is currently pinned" + ) + del self.cache[key] def flush_all(self): """Write all dirty pages to disk (called on database close).""" - with self._lock: - for page_key, page in list(self.cache.items()): - if page.dirty: - self._write_page_to_disk(page_key, page) + for page_key, page in self.cache.items(): + if page.dirty: + self._write_page_to_disk(page_key, page) def clear(self): """Clear the bufferpool (for testing or shutdown).""" - with self._lock: - self.flush_all() - self.cache.clear() + self.flush_all() + self.cache.clear() def get_stats(self): """Get bufferpool statistics.""" - with self._lock: - total_accesses = self.hits + self.misses - hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 - return { - 'capacity': self.capacity, - 'current_size': len(self.cache), - 'hits': self.hits, - 'misses': self.misses, - 'hit_rate': f"{hit_rate:.2f}%", - 'evictions': self.evictions, - 'writes': self.writes, - } + total_accesses = self.hits + self.misses + hit_rate = (self.hits / total_accesses * 100) if total_accesses > 0 else 0 + return { + 'capacity': self.capacity, + 'current_size': len(self.cache), + 'hits': self.hits, + 'misses': self.misses, + 'hit_rate': f"{hit_rate:.2f}%", + 'evictions': self.evictions, + 'writes': self.writes, + } def __repr__(self): stats = self.get_stats() diff --git a/lstore/db.py b/lstore/db.py index 7625e2d..4469463 100644 --- a/lstore/db.py +++ b/lstore/db.py @@ -34,8 +34,6 @@ def open(self, path: str): def close(self): """Flush all pages and save metadata.""" - for table in self.tables.values(): - table.close() if self.bufferpool: self.bufferpool.flush_all() self._save_metadata() @@ -56,8 +54,7 @@ def drop_table(self, name): raise ValueError(f"Table {name} does not exist") # Remove table metadata and cached pages before deleting files. - table = self.tables.pop(name) - table.close() + del self.tables[name] if self.bufferpool: self.bufferpool.discard_table(name) diff --git a/lstore/query.py b/lstore/query.py index d1afc52..215e6ba 100644 --- a/lstore/query.py +++ b/lstore/query.py @@ -221,14 +221,8 @@ def update(self, primary_key, *columns): tail_record = tail_meta + tail_data - # insert tail record and update indices using cached values - tail_rid = self.table.insert_record( - tail_record, - is_tail=True, - base_rid=base_rid, - index_prior_data=current_data, - index_new_data=new_data, - ) + # insert tail record table handles index updates + tail_rid = self.table.insert_record(tail_record, is_tail=True, base_rid=base_rid) return tail_rid is not False except Exception: diff --git a/lstore/table.py b/lstore/table.py index 006ee5d..95ebd1b 100644 --- a/lstore/table.py +++ b/lstore/table.py @@ -1,10 +1,7 @@ from collections import defaultdict from contextlib import ExitStack, contextmanager -from dataclasses import dataclass -import queue -import threading -from time import time, sleep -from typing import Callable, Dict, Optional, Set, Tuple +from time import time +from typing import Optional from config import Config from lstore.bufferpool import Bufferpool @@ -88,21 +85,6 @@ def __repr__(self): f"segment={self.segment}, page={self.page_index}, col={self.column_index})" ) - -@dataclass(frozen=True) -class RangeMergeTask: - range_id: int - start_offset: int - end_offset: int - - -@dataclass -class RangeMergeSnapshot: - task: RangeMergeTask - updates: Dict[Tuple[int, int, int], int] - updated_slots: Set[Tuple[int, int]] - tps_value: int - class PageDirectory: def __init__( self, @@ -111,7 +93,6 @@ def __init__( bufferpool: Bufferpool, num_ranges: int = Config.initial_page_ranges, metadata: Optional[dict] = None, - merge_request_callback: Optional[Callable[[int], None]] = None, ): self.table_name = table_name self.bufferpool = bufferpool @@ -124,9 +105,6 @@ def __init__( self.tail_offsets = defaultdict(int) self.tail_merge_progress = defaultdict(int) self.merge_thresholds = defaultdict(lambda: max(1, Config.records_per_range // 4)) - self._merge_in_progress = defaultdict(int) - self._lock = threading.RLock() - self._merge_request_callback = merge_request_callback if metadata: self._load_metadata(metadata) @@ -157,70 +135,63 @@ def decode_rid(self, rid: int): # Page bookkeeping helpers # ------------------------------------------------------------------ def _get_or_create_range(self, range_id: int) -> dict: - with self._lock: - info = self.page_directory[range_id] - self.num_ranges = max(self.num_ranges, range_id + 1) - return info + info = self.page_directory[range_id] + self.num_ranges = max(self.num_ranges, range_id + 1) + return info def _require_range(self, range_id: int) -> dict: - with self._lock: - if range_id not in self.page_directory: - raise RuntimeError(f"Range {range_id} has no allocated pages") - return self.page_directory[range_id] + if range_id not in self.page_directory: + raise RuntimeError(f"Range {range_id} has no allocated pages") + return self.page_directory[range_id] def _ensure_logical_page(self, range_id: int, segment: int, page_index: int): - with self._lock: - info = self.page_directory[range_id] - self.num_ranges = max(self.num_ranges, range_id + 1) - if segment == 0: - pages = info["base"] - column_count = self._column_count(is_tail=False) - else: - tail_idx = segment - 1 - tail_segments = info["tail"] - while len(tail_segments) <= tail_idx: - tail_segments.append([]) - pages = tail_segments[tail_idx] - column_count = self._column_count(is_tail=True) - - while len(pages) <= page_index: - new_page_index = len(pages) - segment_label = self._segment_label(segment) - logical_page = [ - ColumnPageProxy( - self.bufferpool, - self.table_name, - range_id, - segment_label, - new_page_index, - column_index, - ) - for column_index in range(column_count) - ] - pages.append(logical_page) + info = self._get_or_create_range(range_id) + if segment == 0: + pages = info["base"] + column_count = self._column_count(is_tail=False) + else: + tail_idx = segment - 1 + tail_segments = info["tail"] + while len(tail_segments) <= tail_idx: + tail_segments.append([]) + pages = tail_segments[tail_idx] + column_count = self._column_count(is_tail=True) + + while len(pages) <= page_index: + new_page_index = len(pages) + segment_label = self._segment_label(segment) + logical_page = [ + ColumnPageProxy( + self.bufferpool, + self.table_name, + range_id, + segment_label, + new_page_index, + column_index, + ) + for column_index in range(column_count) + ] + pages.append(logical_page) - return pages[page_index] + return pages[page_index] def _ensure_page_exists_for_read(self, range_id: int, segment: int, page_index: int) -> None: - with self._lock: - info = self.page_directory.get(range_id) - if not info: - raise RuntimeError(f"Range {range_id} has no allocated pages") - if segment == 0: - if page_index >= len(info["base"]): - raise RuntimeError( - f"Base page {page_index} missing for range {range_id}" - ) - else: - tail_idx = segment - 1 - if tail_idx >= len(info["tail"]): - raise RuntimeError( - f"Tail segment {segment} missing for range {range_id}" - ) - if page_index >= len(info["tail"][tail_idx]): - raise RuntimeError( - f"Tail page {page_index} missing for range {range_id}, segment {segment}" - ) + info = self._require_range(range_id) + if segment == 0: + if page_index >= len(info["base"]): + raise RuntimeError( + f"Base page {page_index} missing for range {range_id}" + ) + else: + tail_idx = segment - 1 + if tail_idx >= len(info["tail"]): + raise RuntimeError( + f"Tail segment {segment} missing for range {range_id}" + ) + if page_index >= len(info["tail"][tail_idx]): + raise RuntimeError( + f"Tail page {page_index} missing for range {range_id}, segment {segment}" + ) def _segment_label(self, segment_id: int) -> str: return "base" if segment_id == 0 else f"tail_{segment_id}" @@ -263,56 +234,53 @@ def to_metadata(self) -> dict: def stringify(mapping: defaultdict) -> dict: return {str(k): v for k, v in mapping.items()} - with self._lock: - directory = {} - for range_id, info in self.page_directory.items(): - directory[str(range_id)] = { - "base_pages": len(info["base"]), - "tail_pages": [len(segment) for segment in info["tail"]], - "TPS": info["TPS"], - } - return { - "num_ranges": self.num_ranges, - "num_base_records": self.num_base_records, - "num_tail_records": self.num_tail_records, - "base_offsets": stringify(self.base_offsets), - "tail_offsets": stringify(self.tail_offsets), - "tail_merge_progress": stringify(self.tail_merge_progress), - "merge_thresholds": stringify(self.merge_thresholds), - "page_directory": directory, + directory = {} + for range_id, info in self.page_directory.items(): + directory[str(range_id)] = { + "base_pages": len(info["base"]), + "tail_pages": [len(segment) for segment in info["tail"]], + "TPS": info["TPS"], } + return { + "num_ranges": self.num_ranges, + "num_base_records": self.num_base_records, + "num_tail_records": self.num_tail_records, + "base_offsets": stringify(self.base_offsets), + "tail_offsets": stringify(self.tail_offsets), + "tail_merge_progress": stringify(self.tail_merge_progress), + "merge_thresholds": stringify(self.merge_thresholds), + "page_directory": directory, + } def _load_metadata(self, metadata: dict) -> None: - with self._lock: - self.num_ranges = metadata.get("num_ranges", self.num_ranges) - self.num_base_records = metadata.get("num_base_records", 0) - self.num_tail_records = metadata.get("num_tail_records", 0) + self.num_ranges = metadata.get("num_ranges", self.num_ranges) + self.num_base_records = metadata.get("num_base_records", 0) + self.num_tail_records = metadata.get("num_tail_records", 0) def int_keys(values: dict) -> dict: return {int(k): v for k, v in values.items()} - with self._lock: - self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) - self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) - self.tail_merge_progress = defaultdict( - int, int_keys(metadata.get("tail_merge_progress", {})) - ) - self.merge_thresholds = defaultdict( - lambda: max(1, Config.records_per_range // 4), - int_keys(metadata.get("merge_thresholds", {})), - ) + self.base_offsets = defaultdict(int, int_keys(metadata.get("base_offsets", {}))) + self.tail_offsets = defaultdict(int, int_keys(metadata.get("tail_offsets", {}))) + self.tail_merge_progress = defaultdict( + int, int_keys(metadata.get("tail_merge_progress", {})) + ) + self.merge_thresholds = defaultdict( + lambda: max(1, Config.records_per_range // 4), + int_keys(metadata.get("merge_thresholds", {})), + ) - self.page_directory = defaultdict(self._new_range_entry) - for range_id_str, info in metadata.get("page_directory", {}).items(): - range_id = int(range_id_str) - range_entry = self._get_or_create_range(range_id) - range_entry["TPS"] = info.get("TPS", 0) - base_pages = info.get("base_pages", 0) - for page_index in range(base_pages): - self._ensure_logical_page(range_id, 0, page_index) - for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): - for page_index in range(page_count): - self._ensure_logical_page(range_id, segment_offset, page_index) + self.page_directory = defaultdict(self._new_range_entry) + for range_id_str, info in metadata.get("page_directory", {}).items(): + range_id = int(range_id_str) + range_entry = self._get_or_create_range(range_id) + range_entry["TPS"] = info.get("TPS", 0) + base_pages = info.get("base_pages", 0) + for page_index in range(base_pages): + self._ensure_logical_page(range_id, 0, page_index) + for segment_offset, page_count in enumerate(info.get("tail_pages", []), start=1): + for page_index in range(page_count): + self._ensure_logical_page(range_id, segment_offset, page_index) def add_record( @@ -339,36 +307,33 @@ def add_record( ) if not is_tail: - with self._lock: - range_id = self.num_base_records // Config.records_per_range - offset = self.base_offsets[range_id] - rid = self.encode_rid(range_id, 0, offset) - self.base_offsets[range_id] += 1 - self.num_base_records += 1 + range_id = self.num_base_records // Config.records_per_range + offset = self.base_offsets[range_id] + rid = self.encode_rid(range_id, 0, offset) + self.base_offsets[range_id] += 1 columns[Config.indirection_column] = Config.null_value columns[Config.schema_encoding_column] = 0 else: if base_rid == Config.null_value: raise ValueError("Tail records require a valid base RID") - - schedule_merge = False - with self._lock: - base_range = self.decode_rid(base_rid)[0] + base_range = self.decode_rid(base_rid)[0] + total_offset = self.tail_offsets[base_range] + pending_updates = total_offset - self.tail_merge_progress[base_range] + if pending_updates >= self.merge_thresholds[base_range]: + self.merge_range(base_range) + self.merge_thresholds[base_range] = max(1, self.merge_thresholds[base_range] * 2) total_offset = self.tail_offsets[base_range] - pending_updates = total_offset - self.tail_merge_progress[base_range] - if pending_updates >= self.merge_thresholds[base_range]: - schedule_merge = True - max_capacity = Config.records_per_range * Config.max_tail_segments - if total_offset >= max_capacity: - raise RuntimeError("Tail range is full; merge required before inserting more tail records") + max_capacity = Config.records_per_range * Config.max_tail_segments + if total_offset >= max_capacity: + raise RuntimeError("Tail range is full; merge required before inserting more tail records") - segment_index = total_offset // Config.records_per_range - segment_offset = total_offset % Config.records_per_range - segment_id = 1 + segment_index - - rid = self.encode_rid(base_range, segment_id, segment_offset) + segment_index = total_offset // Config.records_per_range + segment_offset = total_offset % Config.records_per_range + segment_id = 1 + segment_index + rid = self.encode_rid(base_range, segment_id, segment_offset) + self.tail_offsets[base_range] += 1 columns[Config.schema_encoding_column] = self.build_schema_encoding(columns) base_record = self.get_record_from_rid(base_rid) base_indirection = base_record[Config.indirection_column] @@ -376,11 +341,6 @@ def add_record( base_indirection if base_indirection != Config.null_value else base_rid ) columns[Config.base_rid_column] = base_rid - if schedule_merge and self._merge_request_callback: - self._merge_request_callback(base_range) - with self._lock: - self.tail_offsets[base_range] += 1 - self.num_tail_records += 1 columns[Config.timestamp_column] = int(time()) range_id, segment_id, page_index, _ = self.decode_rid(rid) @@ -394,6 +354,11 @@ def add_record( f"Failed to append value to page (range={range_id}, segment={segment_id}, page={page_index})" ) + if not is_tail: + self.num_base_records += 1 + else: + self.num_tail_records += 1 + if is_tail and base_rid != Config.null_value: self.update_base_record(base_rid, columns) @@ -557,149 +522,67 @@ def delete_record(self, rid: int): return True def merge_range(self, range_id): - task = self.prepare_merge_task(range_id, force=True) - if not task: + range_info = self.page_directory.get(range_id) + if not range_info: return False - snapshot = self.build_merge_snapshot(task) - if not snapshot: - self.cancel_merge_task(range_id, task.end_offset) + tail_segments = range_info["tail"] + start_offset = self.tail_merge_progress[range_id] + end_offset = self.tail_offsets[range_id] + if start_offset >= end_offset: return False - return self.commit_merge_snapshot(snapshot) - - def prepare_merge_task(self, range_id: int, force: bool = False) -> Optional[RangeMergeTask]: - with self._lock: - start_offset = self.tail_merge_progress[range_id] - end_offset = self.tail_offsets[range_id] - if start_offset >= end_offset: - return None - - in_progress = self._merge_in_progress.get(range_id, 0) - if in_progress and not force: - return None - - self._merge_in_progress[range_id] = end_offset - return RangeMergeTask(range_id, start_offset, end_offset) - - def cancel_merge_task(self, range_id: int, expected_end: int) -> None: - with self._lock: - if self._merge_in_progress.get(range_id) == expected_end: - self._merge_in_progress.pop(range_id, None) - - def _tps_from_offset(self, range_id: int, end_offset: int) -> int: - if end_offset <= 0: - return 0 - last_offset = end_offset - 1 - last_segment_index = last_offset // Config.records_per_range - last_segment_id = 1 + last_segment_index - last_segment_offset = last_offset % Config.records_per_range - return self.encode_rid(range_id, last_segment_id, last_segment_offset) - - def build_merge_snapshot(self, task: RangeMergeTask) -> Optional[RangeMergeSnapshot]: - if task.start_offset >= task.end_offset: - return None - - range_id = task.range_id - with self._lock: - range_info = self.page_directory.get(range_id) - if not range_info: - return None - tail_segments = [list(segment_pages) for segment_pages in range_info["tail"]] - - updates: Dict[Tuple[int, int, int], int] = {} - updated_slots: Set[Tuple[int, int]] = set() - - records_per_range = Config.records_per_range - records_per_page = Config.records_per_page - - start_segment = task.start_offset // records_per_range - end_segment = (task.end_offset - 1) // records_per_range - - for segment_index in range(start_segment, end_segment + 1): - tail_idx = segment_index - if tail_idx >= len(tail_segments): - break - segment_pages = tail_segments[tail_idx] - segment_start = max(task.start_offset, segment_index * records_per_range) - segment_end = min(task.end_offset, (segment_index + 1) * records_per_range) - segment_id = segment_index + 1 - - start_page = (segment_start - segment_index * records_per_range) // records_per_page - end_page = (segment_end - 1 - segment_index * records_per_range) // records_per_page - - for page_index in range(start_page, min(len(segment_pages), end_page + 1)): - page_start_offset = segment_index * records_per_range + page_index * records_per_page - page_end_offset = page_start_offset + records_per_page - read_start = max(segment_start, page_start_offset) - read_end = min(segment_end, page_end_offset) - if read_start >= read_end: - continue - - try: - self._ensure_page_exists_for_read(range_id, segment_id, page_index) - except RuntimeError: - continue - with self._logical_page(range_id, segment_id, page_index) as tail_pages: + merged_any = False + for segment_index, segment_pages in enumerate(tail_segments, start=1): + for page_index, _ in enumerate(segment_pages): + with self._logical_page(range_id, segment_index, page_index) as tail_pages: if not tail_pages: continue - - slot_start = read_start - page_start_offset - slot_end = read_end - page_start_offset - for tail_slot in range(slot_start, slot_end): + num_records = tail_pages[Config.rid_column].num_records + for tail_slot in range(num_records): tail_columns = [column_page.read(tail_slot) for column_page in tail_pages] + tail_rid = tail_columns[Config.rid_column] base_rid = tail_columns[Config.base_rid_column] if base_rid == Config.null_value: continue - base_range, _, base_page_index, base_slot_index = self.decode_rid(base_rid) - if base_range != range_id: + _, tail_segment, tail_page_index, tail_slot_index = self.decode_rid(tail_rid) + tail_segment_index = max(0, tail_segment - 1) + tail_offset = ( + tail_segment_index * Config.records_per_range + + tail_page_index * Config.records_per_page + + tail_slot_index + ) + if tail_offset < start_offset or tail_offset >= end_offset: continue - data_offset = Config.tail_meta_columns - for column_index in range(self.num_columns): - value = tail_columns[data_offset + column_index] - if value == Config.null_value: - continue - physical_column = Config.base_meta_columns + column_index - updates[(base_page_index, base_slot_index, physical_column)] = value - updated_slots.add((base_page_index, base_slot_index)) - - tps_value = self._tps_from_offset(range_id, task.end_offset) - return RangeMergeSnapshot(task=task, updates=updates, updated_slots=updated_slots, tps_value=tps_value) - - def commit_merge_snapshot(self, snapshot: RangeMergeSnapshot) -> bool: - range_id = snapshot.task.range_id - with self._lock: - if snapshot.task.end_offset <= self.tail_merge_progress[range_id]: - self._merge_in_progress.pop(range_id, None) - return False - zero_schema = self.tail_offsets[range_id] == snapshot.task.end_offset - - for (page_index, slot_index, column_index), value in snapshot.updates.items(): - with self._column(range_id, 0, page_index, column_index) as column_page: - column_page.write_slot(slot_index, value) - - if zero_schema: - for page_index, slot_index in snapshot.updated_slots: - with self._column(range_id, 0, page_index, Config.schema_encoding_column) as schema_page: - schema_page.write_slot(slot_index, 0) - - with self._lock: - self.tail_merge_progress[range_id] = snapshot.task.end_offset - range_info = self.page_directory.get(range_id) - if range_info is not None: - range_info["TPS"] = snapshot.tps_value - max_threshold = max(1, Config.records_per_range // 4) - doubled_threshold = max(1, self.merge_thresholds[range_id] * 2) - self.merge_thresholds[range_id] = min(doubled_threshold, max_threshold) - self._merge_in_progress.pop(range_id, None) - return True + base_range, _, base_page_index, base_slot_index = self.decode_rid(base_rid) + self._ensure_page_exists_for_read(base_range, 0, base_page_index) + with self._logical_page(base_range, 0, base_page_index) as base_pages: + for column_index in range(self.num_columns): + value = tail_columns[Config.tail_meta_columns + column_index] + if value == Config.null_value: + continue + base_column_page = base_pages[Config.base_meta_columns + column_index] + base_column_page.write_slot(base_slot_index, value) + + base_schema_page = base_pages[Config.schema_encoding_column] + base_schema_page.write_slot(base_slot_index, 0) + merged_any = True + + if not merged_any: + return False - def has_inflight_merge(self, range_id: Optional[int] = None) -> bool: - with self._lock: - if range_id is None: - return any(value > 0 for value in self._merge_in_progress.values()) - return self._merge_in_progress.get(range_id, 0) > 0 + self.tail_merge_progress[range_id] = end_offset + if end_offset == 0: + tps_value = 0 + else: + last_offset = end_offset - 1 + last_segment_index = last_offset // Config.records_per_range + last_segment_id = 1 + last_segment_index + last_segment_offset = last_offset % Config.records_per_range + tps_value = self.encode_rid(range_id, last_segment_id, last_segment_offset) + range_info["TPS"] = tps_value + return True class Table: @@ -720,20 +603,15 @@ def __init__( self.key = key self.num_columns = num_columns self.bufferpool = bufferpool or Bufferpool() - self._merge_jobs: queue.Queue[Optional[RangeMergeTask]] = queue.Queue() - self._merge_results: queue.Queue[RangeMergeSnapshot] = queue.Queue() - self._merge_shutdown = threading.Event() self.page_directory = PageDirectory( name, num_columns, self.bufferpool, Config.initial_page_ranges, metadata=directory_metadata, - merge_request_callback=self._request_merge, ) self.index = Index(self) - self._merge_thread = threading.Thread(target=self._merge_loop, name=f"{name}-merger", daemon=True) - self._merge_thread.start() + pass def to_metadata(self) -> dict: return { @@ -791,24 +669,16 @@ def get_cumulative_updated_record(self, rid: int): """ return self.page_directory.get_cumulative_updated_record_from_base_rid(rid) - def insert_record( - self, - columns: list[int], - is_tail: bool = False, - base_rid: int = Config.null_value, - index_prior_data: Optional[list[int]] = None, - index_new_data: Optional[list[int]] = None, - ): + def insert_record(self, columns: list[int], is_tail: bool = False, base_rid: int = Config.null_value): """ Inserts a record into the table :param columns: list[int] - the columns of the record :param base_rid: int - the RID of the base record, only used for tail records :return: int - the RID of the record """ - self._tick_merges() - cached_prior_data = None - if is_tail and base_rid != Config.null_value and index_prior_data is None: - cached_prior_data = self.get_cumulative_updated_record(base_rid)[ + prior_data = None + if is_tail: + prior_data = self.get_cumulative_updated_record(base_rid)[ Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns ] @@ -820,17 +690,10 @@ def insert_record( ] self.index.add(rid, base_data) else: - prior_values = index_prior_data if index_prior_data is not None else cached_prior_data - if prior_values is None: - prior_values = self.get_cumulative_updated_record(base_rid)[ - Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns - ] - new_values = index_new_data - if new_values is None: - new_values = self.get_cumulative_updated_record(base_rid)[ - Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns - ] - self.index.update(base_rid, prior_values, new_values) + updated_data = self.get_cumulative_updated_record(base_rid)[ + Config.tail_meta_columns : Config.tail_meta_columns + self.num_columns + ] + self.index.update(base_rid, prior_data, updated_data) return rid def delete_record(self, rid: int): @@ -839,7 +702,6 @@ def delete_record(self, rid: int): :param rid: int - the RID of the base record :return: bool - whether the record was deleted """ - self._tick_merges() try: base_record = self.page_directory.get_record_from_rid(rid) except RuntimeError: @@ -853,68 +715,6 @@ def delete_record(self, rid: int): except ValueError: return False - def _merge_loop(self): - while not self._merge_shutdown.is_set(): - try: - task = self._merge_jobs.get(timeout=0.1) - except queue.Empty: - continue - - if task is None: - if self._merge_shutdown.is_set(): - break - continue - - try: - snapshot = self.page_directory.build_merge_snapshot(task) - if snapshot: - self._merge_results.put(snapshot) - else: - self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - except Exception: - self.page_directory.cancel_merge_task(task.range_id, task.end_offset) - - def _request_merge(self, range_id: int): - task = self.page_directory.prepare_merge_task(range_id) - if task: - self._merge_jobs.put(task) - - def _apply_ready_merges(self): - applied = 0 - while True: - try: - snapshot = self._merge_results.get_nowait() - except queue.Empty: - break - self.page_directory.commit_merge_snapshot(snapshot) - applied += 1 - return applied - - def _tick_merges(self): - self._apply_ready_merges() - - def wait_for_merges(self, timeout: float = 2.0): - deadline = time() + timeout - while time() < deadline: - self._apply_ready_merges() - if self._merge_jobs.empty() and self._merge_results.empty() and not self.page_directory.has_inflight_merge(): - break - sleep(0.01) - self._apply_ready_merges() - - def close(self): - if self._merge_shutdown.is_set(): - return - self._merge_shutdown.set() - self._merge_jobs.put(None) - if self._merge_thread.is_alive(): - self._merge_thread.join(timeout=1.0) - self.wait_for_merges() - - def __del__(self): - # Best-effort cleanup to avoid leaking background threads in tests. - self.close() - def __merge(self): print("merge is happening") pass diff --git a/lstore/tests/test_table.py b/lstore/tests/test_table.py index 890cd87..84a1d2b 100644 --- a/lstore/tests/test_table.py +++ b/lstore/tests/test_table.py @@ -252,11 +252,9 @@ def insert_tail(value): assert base_data() == [100, 10, 20] insert_tail(55) - grades_table.wait_for_merges() assert base_data() == [100, 10, 20] insert_tail(65) - grades_table.wait_for_merges() assert base_data() == [100, 55, 20] assert directory.merge_thresholds[0] == 2 @@ -281,37 +279,12 @@ def insert_tail(value): insert_tail(50) insert_tail(60) - grades_table.wait_for_merges() assert base_data() == [100, 50, 20] assert directory.merge_thresholds[0] == 2 insert_tail(70) - grades_table.wait_for_merges() assert base_data() == [100, 50, 20] insert_tail(80) - grades_table.wait_for_merges() assert base_data() == [100, 70, 20] assert directory.merge_thresholds[0] == 4 - - -def test_background_merge_applies_updates_without_manual_trigger(): - grades_table = Table("grades", num_columns=3, key=0) - base_rid = grades_table.insert_record( - _build_base_record(grades_table.num_columns, 999, 5, 6), - is_tail=False, - ) - directory = grades_table.page_directory - directory.merge_thresholds[0] = 1 - - tail_record = _build_tail_record(grades_table.num_columns, {1: 123}) - grades_table.insert_record(tail_record, is_tail=True, base_rid=base_rid) - - noop_tail = _build_tail_record(grades_table.num_columns, {}) - grades_table.insert_record(noop_tail, is_tail=True, base_rid=base_rid) - - grades_table.wait_for_merges() - merged_data = grades_table.get_record(base_rid)[ - Config.base_meta_columns : Config.base_meta_columns + grades_table.num_columns - ] - assert merged_data == [999, 123, 6]