Prefetcher #151
…lock model to use PooledRpcClient and L1RpcPrefetcher for improved block fetching; remove AlchemyClient and UniversalClient; add new PooledRpcClient and L1RpcPrefetcher classes for enhanced performance and reliability.
…ock model; implement association handling for hashable attributes.
Pull request overview
This PR introduces a new RPC pipeline with a prefetching mechanism to improve block import performance and reliability. The implementation replaces the existing Alchemy-specific client with a generic pooled RPC client and adds a multi-threaded prefetcher that queues block data ahead of import time.
Changes:
- Introduced `PooledRpcClient` using connection pooling via `net-http-persistent` with retry logic and configurable timeouts
- Implemented `L1RpcPrefetcher` with concurrent block fetching using thread pools, prefetch queue management, and transaction filtering
- Refactored `EthBlock` to use single-block sequential imports with prefetcher integration, reorg detection with prefetcher reset, and removed batch import and attestation hash logic
- Added graceful shutdown handling via `at_exit` hook in clockwork configuration
- Removed legacy `AlchemyClient` and `UniversalClient` classes
- Updated dependencies: pinned `pg` to 1.6.2, added `net-http-persistent` and `concurrent-ruby`, updated `facet_rails_common` reference
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| lib/pooled_rpc_client.rb | New generic RPC client with connection pooling, retry logic, and timeout handling |
| lib/l1_rpc_prefetcher.rb | New prefetcher with thread pool, concurrent block fetching, and transaction filtering |
| app/models/eth_block.rb | Refactored to use prefetcher, single-block imports, removed batch logic and attestation hashing |
| lib/ethscription_test_helper.rb | Updated to use new rpc_client instead of AlchemyClient |
| config/main_importer_clock.rb | Added at_exit handler for graceful prefetcher shutdown |
| Gemfile | Added net-http-persistent and concurrent-ruby dependencies, pinned pg version |
| Gemfile.lock | Updated dependency versions including concurrent-ruby, connection_pool, and various transitive dependencies |
| lib/alchemy_client.rb | Removed legacy Alchemy-specific client |
| lib/universal_client.rb | Removed legacy generic client |
```diff
     resp = client.send(:query_api,
       method: 'eth_getTransactionByHash',
       params: [hash]
     )['result']
-    resp2 = AlchemyClient.query_api(

+    resp2 = client.send(:query_api,
       method: 'eth_getTransactionReceipt',
       params: [hash]
```
Copilot AI, Jan 14, 2026
Calling a private method query_api using send bypasses encapsulation and makes the test code fragile to internal implementation changes. The PooledRpcClient class should expose a public method for getting transaction details by hash, or the test helper should use a different approach that doesn't rely on private methods.
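One way to act on this suggestion is sketched below. It is a minimal illustration only: `SketchRpcClient`, `get_transaction_by_hash`, `get_transaction_receipt`, and the injected transport block are hypothetical names, since the real `PooledRpcClient` interface is not shown in this excerpt.

```ruby
# Hypothetical sketch; PooledRpcClient's actual interface is not shown in this PR excerpt.
class SketchRpcClient
  # transport: any block taking (method, params) and returning a parsed JSON-RPC hash.
  def initialize(&transport)
    @transport = transport
  end

  # Public, intention-revealing wrappers that callers and tests can rely on.
  def get_transaction_by_hash(hash)
    query_api(method: 'eth_getTransactionByHash', params: [hash])['result']
  end

  def get_transaction_receipt(hash)
    query_api(method: 'eth_getTransactionReceipt', params: [hash])['result']
  end

  private

  # The low-level call stays private, so it can change without breaking callers.
  def query_api(method:, params: [])
    @transport.call(method, params)
  end
end
```

With wrappers like these, the test helper could call `client.get_transaction_by_hash(hash)` instead of `client.send(:query_api, ...)`.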
```ruby
rescue Net::HTTP::Persistent::Error, Errno::ECONNRESET, Errno::ETIMEDOUT,
       Errno::ECONNREFUSED, SocketError, Timeout::Error, RpcError => e
  retries += 1
  if retries <= MAX_RETRIES
    sleep(RETRY_DELAY * retries)
    retry
```
Copilot AI, Jan 14, 2026
The retry logic includes RpcError in the list of retryable exceptions at line 78, but RPC errors from the server (like invalid parameters or method not found) are typically not transient and shouldn't be retried. Only network-related errors should trigger retries. Consider separating RpcError into retriable and non-retriable categories, or remove RpcError from the retry list entirely.
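A rough sketch of the split between retriable and non-retriable errors follows. `SketchRpcError`, `NON_RETRIABLE_RPC_CODES`, and `with_retries` are hypothetical names. The three codes listed are the standard JSON-RPC 2.0 codes for invalid request, method not found, and invalid params; treating every other code as retriable is an assumption that would need checking against the provider in use.

```ruby
require 'timeout'

# Standard JSON-RPC 2.0 codes that a retry cannot fix:
# -32600 invalid request, -32601 method not found, -32602 invalid params.
NON_RETRIABLE_RPC_CODES = [-32600, -32601, -32602].freeze

# Hypothetical error class carrying the JSON-RPC error code.
class SketchRpcError < StandardError
  attr_reader :code

  def initialize(message, code:)
    super(message)
    @code = code
  end

  # Assumption: anything not known to be permanent may be transient.
  def retriable?
    !NON_RETRIABLE_RPC_CODES.include?(code)
  end
end

# Runs the block, retrying network errors and retriable RPC errors with a
# linearly growing delay; permanent RPC errors are re-raised immediately.
def with_retries(max_retries: 3, base_delay: 0.5)
  retries = 0
  begin
    yield
  rescue SketchRpcError, Timeout::Error, Errno::ECONNRESET,
         Errno::ETIMEDOUT, Errno::ECONNREFUSED => e
    raise if e.is_a?(SketchRpcError) && !e.retriable?
    retries += 1
    raise if retries > max_retries
    sleep(base_delay * retries)
    retry
  end
end
```

A "method not found" response then fails on the first attempt, while a connection reset still gets the full retry budget.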
```ruby
def ensure_prefetched(from_block)
  # Skip if we've already queued far enough ahead
  return if @highest_queued >= from_block + @ahead

  latest = cached_chain_tip(from_block)
  to_block = [from_block + @ahead, latest].min

  # Only queue blocks we haven't queued yet
  start_block = [@highest_queued + 1, from_block].max
  return if start_block > to_block

  (start_block..to_block).each { |n| enqueue_single(n) }
  @highest_queued = to_block
```
Copilot AI, Jan 14, 2026
The @highest_queued variable is read at line 23 and written at line 33 without any synchronization. In a concurrent environment where multiple threads might call ensure_prefetched or fetch simultaneously, this could lead to race conditions where blocks are queued multiple times or the tracking gets out of sync. Consider using an atomic variable or proper synchronization for @highest_queued.
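The synchronization suggested here could look roughly like the sketch below, which guards the read-check-write sequence with a standard-library `Mutex`. `SketchQueueTracker` is a hypothetical stand-in for the prefetcher: the chain tip is passed in as an argument rather than looked up, enqueueing is reduced to appending to an array, and the initial value of `-1` is an assumption.

```ruby
# Hypothetical stand-in for the prefetcher's queue bookkeeping; not the PR's actual class.
class SketchQueueTracker
  attr_reader :enqueued

  def initialize(ahead:)
    @ahead = ahead
    @highest_queued = -1 # assumption: nothing has been queued yet
    @enqueued = []
    @lock = Mutex.new
  end

  # chain_tip is an argument here; the PR obtains it via cached_chain_tip.
  def ensure_prefetched(from_block, chain_tip)
    # The whole check-then-update sequence runs under one lock, so two
    # threads cannot both decide to queue the same range.
    @lock.synchronize do
      return if @highest_queued >= from_block + @ahead

      to_block = [from_block + @ahead, chain_tip].min
      start_block = [@highest_queued + 1, from_block].max
      return if start_block > to_block

      (start_block..to_block).each { |n| @enqueued << n }
      @highest_queued = to_block
    end
  end
end
```

An atomic integer alone would not be enough for this method, because the decision to queue depends on both reading and then writing `@highest_queued`; the lock keeps those steps together.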
```ruby
gas_used = receipt['gasUsed'].to_i(16).to_d

# Pre-Byzantium blocks (before 4370000) didn't have status field
status_value = block_number <= 4370000 ? nil : receipt['status']&.to_i(16)
```
Copilot AI, Jan 14, 2026
The block number 4370000 represents the Byzantium hard fork block, but it's hardcoded without explanation. This magic number should be defined as a named constant (e.g., BYZANTIUM_BLOCK = 4370000) to make the code more maintainable and self-documenting. The comment mentions "Pre-Byzantium blocks" but having a constant would make this clearer.
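A minimal sketch of the named-constant version is shown below; `BYZANTIUM_FORK_BLOCK` and `receipt_status` are illustrative names. One deliberate difference from the excerpt: Byzantium activated at mainnet block 4,370,000, so this sketch uses `<` and treats the fork block itself as having a status field, whereas the original comparison uses `<=`. Which boundary the importer needs is an assumption worth confirming against existing data.

```ruby
# Mainnet block at which the Byzantium hard fork activated.
BYZANTIUM_FORK_BLOCK = 4_370_000

# Returns the receipt status as an integer, or nil when the block predates
# Byzantium (receipts had no status field) or the field is absent.
# Assumption: the fork block itself counts as post-fork.
def receipt_status(block_number, receipt)
  return nil if block_number < BYZANTIUM_FORK_BLOCK

  receipt['status']&.to_i(16)
end
```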
```ruby
{
  'id' => 1,
  'jsonrpc' => '2.0',
  'result' => {
    'receipts' => result
  }
}
```
Copilot AI, Jan 14, 2026
The method wraps the receipts in an extra structure with hardcoded id: 1, which differs from the actual request ID that might have been used. This wrapper structure appears to mimic the old client's response format for backward compatibility, but the hardcoded ID could cause confusion when debugging request/response pairs. Consider either returning the actual response structure or documenting why this wrapping is necessary.
```ruby
end.rescue do |e|
  Rails.logger.error "[PREFETCH] Block #{block_number}: #{e.class} - #{e.message}"
  @promises.delete(block_number)
  raise e
end
```
Copilot AI, Jan 14, 2026
In the rescue block, the code logs an error, deletes the promise from the map, and then re-raises the exception. However, the re-raised exception will bubble up through the Promise's error handling, potentially causing the error to be logged twice - once here and once when the promise is consumed. Consider whether the promise deletion and re-raise are both necessary, or if the error should be wrapped in a Promise rejection instead.
lib/l1_rpc_prefetcher.rb
Outdated
```ruby
)
t3 = Process.clock_gettime(Process::CLOCK_MONOTONIC)

tx_count = result['transactions'].size
```
Copilot AI, Jan 14, 2026
The variable tx_count is assigned twice - once at line 152 and again at line 178. The first assignment from line 152 is used in the receipt count validation at line 160, but then it's unnecessarily reassigned at line 178 before being used in the logging statement. The second assignment should be removed to avoid confusion and unnecessary computation.
```ruby
response = @http.request(@uri, request)
result = JSON.parse(response.body)
```
Copilot AI, Jan 14, 2026
The code doesn't check the HTTP response status code before attempting to parse the JSON body. Non-200 responses (like 500, 502, 503 errors) may not contain valid JSON, which will cause JSON::ParserError to be raised. The code should check response.is_a?(Net::HTTPSuccess) before parsing and raise an appropriate error for failed HTTP responses.
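The check could be sketched as below. `SketchHttpError` and `parse_rpc_response` are hypothetical names, and the helper inspects the numeric status code rather than calling `is_a?(Net::HTTPSuccess)` so that it works with any object exposing `code` and `body`; with a real `Net::HTTPResponse` either test should be equivalent for 2xx responses.

```ruby
require 'json'

# Hypothetical error type for transport-level failures.
class SketchHttpError < StandardError; end

# Validates the HTTP status before parsing, so an HTML error page surfaces
# as a clear HTTP error instead of a JSON::ParserError.
def parse_rpc_response(response)
  status = response.code.to_i
  unless (200..299).cover?(status)
    # Include a short body excerpt to help with debugging.
    raise SketchHttpError, "HTTP #{status}: #{response.body.to_s[0, 200]}"
  end

  JSON.parse(response.body)
rescue JSON::ParserError => e
  raise SketchHttpError, "Invalid JSON in RPC response: #{e.message}"
end
```

Raising a dedicated error type also lets the retry logic decide separately whether a 429 or 503 is worth another attempt.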
```ruby
begin
  result = promise.value!(@fetch_timeout)
rescue => e
  @promises.delete(block_number)
  raise BlockFetchError.new("Block #{block_number} fetch failed: #{e.class} - #{e.message}")
end
```
Copilot AI, Jan 14, 2026
Using a bare rescue (equivalent to rescue StandardError) is too broad here. The code catches any exception during promise value retrieval and converts it to a BlockFetchError, but this could mask programming errors like NoMethodError or ArgumentError that indicate bugs in the code. Consider being more specific about which exceptions to catch, or at least document why all StandardError subclasses should be converted to BlockFetchError.
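A narrower rescue might look like the sketch below. `SketchBlockFetchError`, `EXPECTED_FETCH_ERRORS`, and `wait_for_block` are hypothetical names, and the list of expected errors is an assumption limited to standard-library classes; the real code would also need whatever timeout or rejection errors the promise library raises.

```ruby
require 'socket'
require 'timeout'

# Hypothetical wrapper error, standing in for the PR's BlockFetchError.
class SketchBlockFetchError < StandardError; end

# Assumption: these are the failures expected from a network fetch.
# SystemCallError covers the Errno::* family (ECONNRESET, ETIMEDOUT, ...).
EXPECTED_FETCH_ERRORS = [Timeout::Error, IOError, SystemCallError, SocketError].freeze

# Wraps only expected fetch failures; programming errors such as
# NoMethodError or ArgumentError propagate unchanged.
def wait_for_block(block_number)
  yield
rescue *EXPECTED_FETCH_ERRORS => e
  raise SketchBlockFetchError, "Block #{block_number} fetch failed: #{e.class} - #{e.message}"
end
```

Because the new error is raised inside a `rescue` clause, Ruby sets its `cause` to the original exception, so the underlying failure remains available for logging.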
lib/l1_rpc_prefetcher.rb
Outdated
```ruby
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)

block_response = @eth.get_block(block_number)
t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
```
Copilot AI, Jan 14, 2026
This assignment to t1 is useless, since its value is never read.
…ming measurements and logging for improved clarity and performance.
Pull request overview
Copilot reviewed 8 out of 9 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
lib/l1_rpc_prefetcher.rb:1
- The magic number `10` for cache retention should be extracted to a named constant like `CACHE_RETENTION_WINDOW` to improve maintainability and make the cache management policy explicit.
class L1RpcPrefetcher
```ruby
block_record.create_attachments_for_previous_block

block_record.update!(imported_at: Time.current)
```
Copilot AI, Jan 14, 2026
The database trigger check_block_order_on_update() requires state_hash to be set when imported_at is not null (line 64-65 in migration). The removed before_validation :generate_attestation_hash callback was responsible for setting this. Without it, the update!(imported_at: Time.current) call will fail with a Postgres exception. You need to add block_record.generate_attestation_hash before this line, or restore the before_validation callback with the condition if: -> { imported_at.present? }.
```ruby
rescue Net::HTTP::Persistent::Error, Errno::ECONNRESET, Errno::ETIMEDOUT,
       Errno::ECONNREFUSED, SocketError, Timeout::Error, RpcError => e
```
Copilot AI, Jan 14, 2026
Retrying on RpcError will retry all RPC-level errors, including non-transient errors like invalid parameters or non-existent methods. RPC errors should be categorized, and only transient errors (like rate limits or temporary unavailability) should be retried. Non-transient errors should fail immediately to avoid wasting retry attempts on errors that won't succeed.
```ruby
# Get block data, waiting if necessary
def fetch(block_number)
  # Only ensure_prefetched every 10 blocks to reduce overhead
  ensure_prefetched(block_number) if block_number % 10 == 0 || !@promises.key?(block_number)
```
Copilot AI, Jan 14, 2026
The magic number 10 should be extracted to a named constant like PREFETCH_CHECK_INTERVAL to improve code clarity and make it easier to tune this parameter.
```ruby
gas_used = receipt['gasUsed'].to_i(16).to_d

# Pre-Byzantium blocks (before 4370000) didn't have status field
status_value = block_number <= 4370000 ? nil : receipt['status']&.to_i(16)
```
Copilot AI, Jan 14, 2026
The magic number 4370000 (Byzantium hard fork block) should be extracted to a named constant like BYZANTIUM_FORK_BLOCK to document its significance and improve code readability.
```ruby
at_exit do
  puts "Shutting down prefetcher..."
  EthBlock.reset_prefetcher!
rescue => e
  puts "Error shutting down prefetcher: #{e.message}"
end
```
Copilot AI, Jan 14, 2026
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The at_exit hook may not fire reliably during signal-based termination (SIGTERM/SIGKILL). Consider adding explicit signal handlers for SIGTERM and SIGINT to ensure graceful shutdown of the prefetcher, especially in production environments where processes are typically terminated with signals.
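For context, Ruby's default handling of SIGTERM and SIGINT raises an exception in the main thread, so `at_exit` hooks normally still run for those signals, while SIGKILL cannot be trapped by any handler. If explicit handlers are still wanted, one cautious pattern is to have the trap set a flag that the import loop checks, as sketched below. `ShutdownFlag` and `install_shutdown_traps` are hypothetical names, not part of the PR.

```ruby
# Hypothetical flag object shared between the signal handlers and the import loop.
class ShutdownFlag
  def initialize
    @requested = false
  end

  def request!
    @requested = true
  end

  def requested?
    @requested
  end
end

# Installs handlers that only flip the flag. Trap handlers should stay tiny:
# taking a Mutex or doing blocking work inside them is not safe.
def install_shutdown_traps(flag, signals: %w[TERM INT])
  signals.each do |sig|
    Signal.trap(sig) { flag.request! }
  end
end
```

The import loop would then run until `flag.requested?` becomes true, finish the block in progress, and call `EthBlock.reset_prefetcher!` from normal code rather than from inside the handler.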
```diff
   def self.create_from_hash(hash)
-    resp = AlchemyClient.query_api(
+    client = EthBlock.rpc_client
+    resp = client.send(:query_api,
```
Copilot AI, Jan 14, 2026
The reason will be displayed to describe this comment to others. Learn more.
Using send to call a private method :query_api violates encapsulation. Consider making query_api public or adding a dedicated test-only method like query_api_for_testing that delegates to the private method, making the test interface explicit.
```ruby
if blocks_imported % 100 == 0
  elapsed = Time.current - start_time
  rate = (blocks_imported / elapsed).round(1)
  stats = prefetcher.stats
  puts "Imported #{blocks_imported} blocks (#{rate} bl/s) | #{total_ethscriptions} ethscriptions | Prefetcher: #{stats[:fulfilled]}/#{stats[:fulfilled] + stats[:pending]} ready"
end
```
Copilot AI, Jan 14, 2026
The magic number 100 for logging frequency should be extracted to a named constant like PROGRESS_LOG_INTERVAL to make it configurable and improve code clarity.
```ruby
response = @http.request(@uri, request)
result = JSON.parse(response.body)
```
Copilot AI, Jan 14, 2026
There's no check for HTTP response status codes. Non-2xx responses (e.g., 429 rate limit, 503 service unavailable) should be handled explicitly before attempting to parse JSON, as they may return HTML error pages or non-JSON responses that will raise JSON::ParserError.
…ion hash when imported_at is present, enhancing data integrity.
Note
Introduces a pooled Ethereum RPC client with threaded prefetching and refactors block import to use it.
- Add `PooledRpcClient` and `L1RpcPrefetcher` for concurrent RPC fetching, caching, stats, and graceful shutdown
- Refactor `EthBlock` to use new RPC client/prefetcher; import one block at a time with periodic stats, cache pruning, and reorg handling that resets the prefetcher
- Add `at_exit` hook in `config/main_importer_clock.rb` to shut down the prefetcher cleanly
- Remove legacy clients (`lib/alchemy_client.rb`, `lib/universal_client.rb`); update test helper to use new client
- Pin `pg` to `1.6.2`, add `net-http-persistent` and `concurrent-ruby`, update `facet_rails_common` ref

Written by Cursor Bugbot for commit 0cdf777.