Reorg aware streaming support #4
- Classes: StreamingResultIterator, ReorgAwareStream, BlockRange, BatchMetadata, ResponseBatch, ResponseBatchType, ResponseBatchWithReorg, ResumeWatermark
- Not yet wired up with user-callable functions
- Add load_stream_continuous() for streaming with reorg handling
- Enhance LoadResult to be prepared to handle reorgs
- Add query_and_load_streaming() to Client
- Use query_and_load_streaming() in QueryBuilder.load() if stream=True
- Add test_reorg_result_string_representation
- Enhance test_all_loaders_implement_required_methods to check whether each data loader has a real implementation
- Remove now-redundant test test_create_table_from_schema_not_just_pass
        iceberg_table = self._catalog.load_table(table_identifier)
    except NoSuchTableError:
        self.logger.warning(f"Table '{table_identifier}' does not exist, skipping reorg handling")
        return
Wouldn't a full table scan be problematic for very large datasets?
    row_mask = pa.array([j == i for j in range(current_table.num_rows)])
    keep_mask = pa.compute.and_(keep_mask, pa.compute.invert(row_mask))
    break
This is very inefficient: it creates a full-length array for every deleted row, which is unnecessary. A simple list of boolean flags would work here instead.
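A minimal sketch of the flag-list approach suggested here, assuming the loader already knows the indices of the rows to drop. The helper name `build_keep_flags` is hypothetical and not part of the PR; the point is only that one list is allocated and updated in place, instead of one full-length mask per deleted row.

```python
from typing import Iterable, List


def build_keep_flags(num_rows: int, rows_to_drop: Iterable[int]) -> List[bool]:
    """Return one boolean per row: True to keep the row, False to drop it.

    Hypothetical helper for illustration; a single list is allocated once
    and individual flags are flipped, so the cost is O(num_rows + drops).
    """
    keep = [True] * num_rows
    for i in rows_to_drop:
        keep[i] = False
    return keep


keep = build_keep_flags(5, [1, 3])

# With pyarrow available, the flags would be converted to a mask only once,
# e.g. current_table.filter(pa.array(keep)), rather than on every iteration.
```

Compared with combining a fresh `pa.array` mask per match, this keeps memory use proportional to the table's row count regardless of how many rows are removed.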
incrypto32
left a comment
LGTM apart from the full table scans in the Iceberg and Deltalake loaders and the deletion logic in the Deltalake loader.
I'll add the commits to fix that
Signed-off-by: Ford <ford@edgeandnode.com>
I'm going to go ahead and merge this and consider the specific loader implementations to be in beta (needing optimization and hardening).
Reorg Aware Streaming Support for amp-python
Summary
This PR adds comprehensive blockchain reorganization (reorg) handling and streaming support to the amp-python client library. It enables real-time data streaming with automatic handling of blockchain reorganizations across all supported data loaders.
Key Features
1. Streaming Infrastructure
- ResponseBatchWithReorg type for streaming data and reorg events
- BlockRange metadata tracking for multi-network support
2. Enhanced Client API
- load() method in QueryBuilder with streaming support
- stream parameter enables continuous data streaming
- handle_reorgs parameter
3. Universal Reorg Support
4. Metadata Architecture
- _meta_block_ranges column across all loaders
- [{"network": "ethereum", "start": 100, "end": 110}]
Changes by Component
Core Library (552 lines added)
- src/amp/client.py: Enhanced QueryBuilder.load() with streaming support
- src/amp/loaders/base.py: Added load_stream_continuous() and _handle_reorg()
- src/amp/streaming/: New module with iterator, types, and reorg handling
Data Loaders (740 lines added)
Testing (2,063 lines added)
Documentation (355 lines added)
Usage Example
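As a rough illustration of the metadata format listed under Metadata Architecture, the sketch below shows how a `_meta_block_ranges` value could be used to decide which stored rows a reorg invalidates. The function name `rows_affected_by_reorg` and the rule that a reorg invalidates every row whose range reaches the reorg block or later on that network are assumptions made for this example, not the loaders' actual implementation.

```python
import json
from typing import List


def rows_affected_by_reorg(meta_column: List[str], network: str, reorg_start: int) -> List[int]:
    """Return indices of rows whose block range on `network` ends at or after `reorg_start`.

    Hypothetical helper: each entry of `meta_column` is a JSON string in the
    _meta_block_ranges format, e.g. [{"network": "ethereum", "start": 100, "end": 110}].
    """
    affected = []
    for idx, raw in enumerate(meta_column):
        ranges = json.loads(raw)
        if any(r["network"] == network and r["end"] >= reorg_start for r in ranges):
            affected.append(idx)
    return affected


rows = [
    '[{"network": "ethereum", "start": 100, "end": 110}]',
    '[{"network": "ethereum", "start": 111, "end": 120}]',
    '[{"network": "polygon", "start": 105, "end": 130}]',
]

# Rows touching ethereum block 115 or later would need to be deleted and reloaded.
affected = rows_affected_by_reorg(rows, "ethereum", 115)
```

Because each row carries its own network and block range, rows from other networks (the polygon row here) are left untouched by an ethereum reorg.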
Commits