diff --git a/README.md b/README.md index e9d3614..b33e310 100644 --- a/README.md +++ b/README.md @@ -2,19 +2,24 @@ [![CI](https://github.com/johnburbridge/python-time-based-experiment/actions/workflows/ci.yml/badge.svg)](https://github.com/johnburbridge/python-time-based-experiment/actions/workflows/ci.yml) -A Python package providing two implementations of a time-based storage system for managing events with timestamps. +A Python package providing two implementations of a time-based storage system for managing events with timestamps. This library is useful for applications that need to efficiently store and query time-based data, such as event logs, time series data, monitoring systems, and schedulers. ## Features - Two storage implementations: - `TimeBasedStorage`: Uses a sorted list for efficient range queries - `TimeBasedStorageHeap`: Uses a heap for efficient insertion and earliest event access +- Thread-safe variants: + - `ThreadSafeTimeBasedStorage`: Thread-safe version of TimeBasedStorage + - `ThreadSafeTimeBasedStorageHeap`: Thread-safe version of TimeBasedStorageHeap - Support for: - Event creation and deletion - Range queries - Duration-based queries - Day-of-week queries - Earliest/latest event access + - Timestamp collision handling + - Generic typing (store any data type) ## Installation @@ -22,41 +27,107 @@ A Python package providing two implementations of a time-based storage system fo pip install time_based_storage ``` -## Usage +## Basic Usage ```python from datetime import datetime, timedelta -from time_based_storage import TimeBasedStorage, TimeBasedStorageHeap, Event +from time_based_storage import TimeBasedStorage, TimeBasedStorageHeap # Create storage instances -storage = TimeBasedStorage() -heap_storage = TimeBasedStorageHeap() +storage = TimeBasedStorage[str]() # Type annotation for stored values +heap_storage = TimeBasedStorageHeap[str]() -# Create events -event1 = Event(timestamp=datetime(2024, 1, 1, 10, 0), data="Event 1") -event2 = 
Event(timestamp=datetime(2024, 1, 1, 11, 0), data="Event 2") +# Add events with timestamps +storage.add(datetime(2024, 1, 1, 10, 0), "Event 1") +storage.add(datetime(2024, 1, 1, 11, 0), "Event 2") +storage.add(datetime(2024, 1, 1, 12, 0), "Event 3") -# Add events -storage.create_event(event1.timestamp, event1.data) -heap_storage.create_event(event1.timestamp, event1.data) - -# Query events +# Query events in a time range start_time = datetime(2024, 1, 1, 10, 30) end_time = datetime(2024, 1, 1, 11, 30) -events_in_range = storage.get_events_in_range(start_time, end_time) +events_in_range = storage.get_range(start_time, end_time) # Returns ["Event 2"] + +# Query events within a duration (last hour) +duration = 3600 # seconds +recent_events = storage.get_duration(duration) + +# Get all events and timestamps +all_events = storage.get_all() +all_timestamps = storage.get_timestamps() + +# Remove an event +storage.remove(datetime(2024, 1, 1, 11, 0)) # Removes "Event 2" -# Get events within duration -duration = timedelta(hours=1) -recent_events = storage.get_events_by_duration(duration) +# Clear all events +storage.clear() +``` + +## Thread-Safe Usage -# Get events by day of week (0 = Monday, 6 = Sunday) -monday_events = storage.get_events_by_day_of_week(0) +For multithreaded applications, use the thread-safe variants: -# Get earliest/latest events -earliest = heap_storage.get_earliest_event() -latest = storage.get_latest_event() +```python +from datetime import datetime +from time_based_storage import ThreadSafeTimeBasedStorage, ThreadSafeTimeBasedStorageHeap +import threading + +# Create thread-safe storage +storage = ThreadSafeTimeBasedStorage[int]() + +# Use in multiple threads +def producer(): + for i in range(10): + storage.add(datetime.now(), i) + +def consumer(): + # Wait for data with timeout + if storage.wait_for_data(timeout=1.0): + data = storage.get_all() + print(f"Received: {data}") + +# Start threads +producer_thread = threading.Thread(target=producer) +consumer_thread = 
threading.Thread(target=consumer) +producer_thread.start() +consumer_thread.start() ``` +## Choosing the Right Implementation + +### TimeBasedStorage +- **Best for**: Applications with frequent range queries or sorted access patterns +- **Advantages**: Efficient range queries, direct index access +- **Trade-offs**: Slower insertion (O(n)) + +### TimeBasedStorageHeap +- **Best for**: Applications needing fast insertion or frequent access to earliest events +- **Advantages**: Fast insertion, efficient earliest event access +- **Trade-offs**: Less efficient for range queries + +## API Reference + +### Common Methods (Both Implementations) + +| Method | Description | Time Complexity (list / heap) | +|--------|-------------|-----------------| +| `add(timestamp, value)` | Add a value at a specific timestamp | O(n) / O(log n) | +| `get_value_at(timestamp)` | Get value at a specific timestamp | O(1) / O(n) | +| `get_range(start, end)` | Get values in a time range | O(log n) / O(n log n) | +| `get_duration(seconds)` | Get values within a duration | O(log n) / O(n log n) | +| `remove(timestamp)` | Remove value at a timestamp | O(n) / O(log n) | +| `clear()` | Remove all values | O(1) | +| `size()` | Get number of stored events | O(1) | +| `is_empty()` | Check if storage is empty | O(1) | +| `get_all()` | Get all stored values | O(n) | +| `get_timestamps()` | Get all timestamps | O(n) | +| `add_unique_timestamp()` | Add with timestamp collision handling | Varies | + +### Thread-Safe Additional Methods + +| Method | Description | Notes | +|--------|-------------|-------| +| `wait_for_data(timeout)` | Wait for data to be available | Blocks until data or timeout | +| `notify_data_available()` | Notify waiting threads | Called automatically on add | + ## Performance Characteristics ### TimeBasedStorage @@ -72,12 +143,58 @@ latest = storage.get_latest_event() - Earliest Event: O(1) - Latest Event: O(n log n) +## Use Cases + +This library is well-suited for: + +- Event logging and analysis +- 
Time series data storage +- Monitoring systems +- Event scheduling +- Message queues with time-based priorities +- Session tracking + +For more detailed use cases for concurrent access scenarios, see [Concurrent Use Cases](time_based_storage/docs/concurrent_use_cases.md). + ## Testing -Run the test suite: +Run the complete test suite: + +```bash +# From the project root +cd time_based_storage +python -m pytest tests/ -v +``` + +## Development + +### Setup Development Environment + +```bash +git clone https://github.com/johnburbridge/python-time-based-experiment.git +cd python-time-based-experiment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +pip install -e time_based_storage/ +pip install pytest black flake8 +``` + +### Code Style + +This project uses: +- Black for code formatting +- Flake8 for linting + +Apply formatting: + +```bash +black time_based_storage/src time_based_storage/tests +``` + +Check style: ```bash -python -m unittest time_based_storage/tests/test_storage.py +flake8 time_based_storage/src time_based_storage/tests ``` ## License diff --git a/time_based_storage/docs/architecture.md b/time_based_storage/docs/architecture.md new file mode 100644 index 0000000..ff86f3c --- /dev/null +++ b/time_based_storage/docs/architecture.md @@ -0,0 +1,163 @@ +# Time-Based Storage Architecture + +This document provides an overview of the architecture, design decisions, and implementation details of the time-based storage library. + +## System Architecture + +The library is organized with a clear separation of concerns: + +``` +time_based_storage/ +├── core/ +│ ├── __init__.py +│ ├── base.py # Base implementation +│ └── heap.py # Heap-based implementation +└── concurrent/ + ├── __init__.py + ├── thread_safe.py # Thread-safe wrapper for base implementation + └── thread_safe_heap.py # Thread-safe wrapper for heap implementation +``` + +### Core Components + +1. 
**`TimeBasedStorage` (core/base.py)** + - Stores events in a sorted structure + - Optimized for range queries + - Maintains chronological order + +2. **`TimeBasedStorageHeap` (core/heap.py)** + - Uses Python's heapq module for efficient insertion + - Optimized for accessing the earliest event + - Maintains partial ordering based on timestamps + +### Concurrent Components + +1. **`ThreadSafeTimeBasedStorage` (concurrent/thread_safe.py)** + - Thread-safe wrapper around TimeBasedStorage + - Uses locks to ensure thread safety + - Provides waiting/notification mechanism + +2. **`ThreadSafeTimeBasedStorageHeap` (concurrent/thread_safe_heap.py)** + - Thread-safe wrapper around TimeBasedStorageHeap + - Uses locks to ensure thread safety + - Maintains the efficiency of the underlying heap + +## Design Decisions + +### 1. Implementation Variants + +Two different implementations were created to support different access patterns: + +- **List-based implementation**: Prioritizes efficient range queries and timestamp lookups, with O(log n) complexity for these operations but O(n) for insertions. +- **Heap-based implementation**: Prioritizes efficient insertion and earliest event access, with O(log n) complexity for insertions but O(n log n) for range queries. + +This allows users to choose the implementation that best matches their access patterns. + +### 2. Generics Support + +The library uses Python's generic typing to allow storing any type of data: + +```python +storage = TimeBasedStorage[int]() # Store integers +storage = TimeBasedStorage[str]() # Store strings +storage = TimeBasedStorage[dict]() # Store dictionaries +``` + +This provides flexibility while maintaining type safety. + +### 3. Thread Safety as a Wrapper + +Thread safety is implemented as wrappers around the base implementations rather than being built into the core classes. 
This architecture: + +- Keeps the core implementations simple and focused +- Allows users to choose between thread-safe and non-thread-safe variants +- Separates concurrency concerns from data structure logic +- Makes the code easier to test and maintain + +### 4. Timestamp Collision Handling + +The library provides explicit handling for timestamp collisions: + +- By default, `add()` will raise a ValueError if a timestamp collision occurs +- `add_unique_timestamp()` method automatically adjusts timestamps to ensure uniqueness + +This approach allows users to decide how to handle conflicts rather than silently adopting a strategy. + +## Implementation Details + +### Storage Backend + +Both implementations use Python's built-in data structures: + +1. **TimeBasedStorage**: + - Uses a dictionary (`self.values`) for O(1) lookup by timestamp + - Maintains a sorted list of timestamps (`self.timestamps`) + - Uses binary search for range queries + +2. **TimeBasedStorageHeap**: + - Uses a binary min-heap for fast insertion and earliest event access + - Uses a dictionary for direct timestamp lookup + +### Thread Safety Implementation + +Thread safety is achieved using Python's threading primitives: + +1. **ReentrantLock**: Used to protect shared data structures +2. **Condition Variables**: Used for wait/notify mechanism +3. **Local storage**: Used to prevent race conditions + +Performance considerations: +- Fine-grained locking where possible +- Minimized critical sections +- Read operations can proceed concurrently + +### Error Handling + +The library follows these error handling principles: + +1. **Explicit errors**: Raises specific exceptions rather than returning error codes +2. **Validation**: Input parameters are validated early +3. **Idempotent operations**: Some operations are designed to be safely repeated +4. 
**Clear error messages**: Error messages clearly indicate the issue + +## Performance Considerations + +### TimeBasedStorage + +- **Space complexity**: O(n) where n is the number of stored events +- **Time complexity**: + - Insertion: O(n) due to maintaining sorted order + - Lookup by timestamp: O(1) + - Range queries: O(log n + k) using binary search, where k is the number of events returned + - Iteration: O(n) to visit all events (timestamps are already kept in sorted order) + +### TimeBasedStorageHeap + +- **Space complexity**: O(n) +- **Time complexity**: + - Insertion: O(log n) using heapq + - Lookup by timestamp: O(1) using dictionary + - Range queries: O(n log n) + - Earliest event access: O(1) + +## Testing Strategy + +The library employs a comprehensive testing strategy: + +1. **Unit tests** for individual components +2. **Integration tests** for component interactions +3. **Concurrency tests** for thread-safe variants +4. **Stress tests** to ensure performance under load +5. **Edge case tests** for boundary conditions + +## Future Improvements + +Potential areas for enhancement: + +1. **Persistence**: Add support for saving/loading from disk +2. **Advanced indexing**: Support for multiple indexes beyond timestamp +3. **Event expiration**: Automatic removal of old events +4. **Batch operations**: Optimized bulk operations +5. **Distributed storage**: Support for distributed deployment +6. **Time zone handling**: Better support for time zone conversions +7. **Compression**: Data compression for large datasets \ No newline at end of file diff --git a/time_based_storage/docs/examples.py b/time_based_storage/docs/examples.py new file mode 100644 index 0000000..ab72762 --- /dev/null +++ b/time_based_storage/docs/examples.py @@ -0,0 +1,170 @@ +""" +Examples for using the time_based_storage package in various scenarios. 
+""" + +import time +import threading +from datetime import datetime, timedelta +from time_based_storage import ( + TimeBasedStorage, + TimeBasedStorageHeap, + ThreadSafeTimeBasedStorage, + ThreadSafeTimeBasedStorageHeap +) + + +def example_basic_usage(): + """Demonstrate basic usage of TimeBasedStorage.""" + print("\n=== Basic Usage ===") + + # Create a storage instance with string values + storage = TimeBasedStorage[str]() + + # Add events + now = datetime.now() + storage.add(now - timedelta(minutes=30), "Event from 30 minutes ago") + storage.add(now - timedelta(minutes=20), "Event from 20 minutes ago") + storage.add(now - timedelta(minutes=10), "Event from 10 minutes ago") + storage.add(now, "Current event") + + # Get all events + print(f"Total events: {storage.size()}") + all_events = storage.get_all() + print(f"All events: {all_events}") + + # Get range of events + start_time = now - timedelta(minutes=25) + end_time = now - timedelta(minutes=5) + range_events = storage.get_range(start_time, end_time) + print(f"Events between 25 and 5 minutes ago: {range_events}") + + # Get recent events (within last 15 minutes) + duration = 15 * 60 # 15 minutes in seconds + recent_events = storage.get_duration(duration) + print(f"Events in the last 15 minutes: {recent_events}") + + # Remove an event + removal_time = now - timedelta(minutes=20) + storage.remove(removal_time) + print(f"After removal, events: {storage.get_all()}") + + +def example_timestamp_collision_handling(): + """Demonstrate how to handle timestamp collisions.""" + print("\n=== Timestamp Collision Handling ===") + + storage = TimeBasedStorage[str]() + + # Create a timestamp + timestamp = datetime(2024, 1, 1, 12, 0, 0) + + # Add first event + storage.add(timestamp, "First event") + print(f"Added event at {timestamp}") + + try: + # Try to add another event with the same timestamp + storage.add(timestamp, "Second event") + except ValueError as e: + print(f"Error: {e}") + + # Use add_unique_timestamp to handle 
collisions + unique_timestamp = storage.add_unique_timestamp(timestamp, "Second event") + print(f"Added with unique timestamp: {unique_timestamp}") + + # Get all timestamps + all_timestamps = storage.get_timestamps() + print(f"All timestamps: {all_timestamps}") + + # Get all values + all_values = storage.get_all() + print(f"All values: {all_values}") + + +def example_thread_safe_storage(): + """Demonstrate usage of thread-safe storage with multiple threads.""" + print("\n=== Thread-Safe Storage ===") + + # Create thread-safe storage + storage = ThreadSafeTimeBasedStorage[int]() + event = threading.Event() + + def producer(): + """Add values to the storage.""" + for i in range(5): + timestamp = datetime.now() + storage.add(timestamp, i) + print(f"Producer: Added {i} at {timestamp}") + time.sleep(0.5) + event.set() # Signal consumer to stop + + def consumer(): + """Read values from the storage.""" + while not event.is_set(): + if storage.wait_for_data(timeout=0.2): + values = storage.get_all() + timestamps = storage.get_timestamps() + print(f"Consumer: Current values: {values}") + print(f"Consumer: Total entries: {len(timestamps)}") + else: + print("Consumer: No new data") + + # Start threads + producer_thread = threading.Thread(target=producer) + consumer_thread = threading.Thread(target=consumer) + + producer_thread.start() + consumer_thread.start() + + # Wait for threads to complete + producer_thread.join() + consumer_thread.join() + + +def example_event_monitoring_system(): + """Demonstrate a practical use case: event monitoring system.""" + print("\n=== Event Monitoring System Example ===") + + monitor = TimeBasedStorageHeap[dict]() + + # Simulate monitoring events with different priorities + events = [ + {"type": "INFO", "message": "System started", "priority": 1}, + {"type": "WARNING", "message": "High CPU usage", "priority": 2}, + {"type": "ERROR", "message": "Database connection failed", "priority": 3}, + {"type": "INFO", "message": "User logged in", 
"priority": 1}, + {"type": "CRITICAL", "message": "Out of memory", "priority": 4}, + ] + + # Add events with timestamps + now = datetime.now() + for i, event in enumerate(events): + # Events are 2 min apart; the newest is 1 min old so the 2-minute query below finds it + timestamp = now - timedelta(minutes=9) + timedelta(minutes=i*2) + monitor.add(timestamp, event) + + # Get all events + all_events = monitor.get_all() + print("All monitoring events:") + for event in all_events: + print(f"- [{event['type']}] {event['message']} (Priority: {event['priority']})") + + # Get high priority events (WARNING, ERROR, CRITICAL) + high_priority = [e for e in all_events if e['priority'] >= 2] + print("\nHigh priority events:") + for event in high_priority: + print(f"- [{event['type']}] {event['message']} (Priority: {event['priority']})") + + # Get most recent event (last 2 minutes) + duration = 2 * 60 # 2 minutes in seconds + recent = monitor.get_duration(duration) + print("\nMost recent events (last 2 minutes):") + for event in recent: + print(f"- [{event['type']}] {event['message']}") + + +if __name__ == "__main__": + example_basic_usage() + example_timestamp_collision_handling() + example_thread_safe_storage() + example_event_monitoring_system() \ No newline at end of file