diff --git a/Gemfile b/Gemfile index f53504c..c3c4d9f 100644 --- a/Gemfile +++ b/Gemfile @@ -6,7 +6,7 @@ ruby "3.2.2" gem "rails", "~> 7.1.2" # Use postgresql as the database for Active Record -gem "pg", "~> 1.1" +gem "pg", "1.6.2" # Use the Puma web server [https://github.com/puma/puma] gem "puma", ">= 5.0" @@ -76,10 +76,14 @@ gem "httparty", "~> 0.21.0" gem "order_query", "~> 0.5.3" -gem 'facet_rails_common', git: 'https://github.com/0xfacet/facet_rails_common.git' +gem 'facet_rails_common', git: 'https://github.com/0xfacet/facet_rails_common.git', ref: 'lenient_base64' gem "cbor", "~> 0.5.9" +gem "net-http-persistent", "~> 4.0" + +gem "concurrent-ruby", "~> 1.2" + gem 'rswag-api' gem 'rswag-ui' diff --git a/Gemfile.lock b/Gemfile.lock index fe1bf41..9b2a5b2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,6 +1,7 @@ GIT remote: https://github.com/0xfacet/facet_rails_common.git - revision: dd72807b5e51dc6fd7969f320cbb9bedfa92c4a5 + revision: 52b9b5e34183028d095b6e8fc0f1126bfb703f97 + ref: lenient_base64 specs: facet_rails_common (0.1.0) order_query (~> 0.5.3) @@ -94,8 +95,8 @@ GEM rbtree3 (~> 0.6) ast (2.4.2) awesome_print (1.9.2) - base64 (0.2.0) - bigdecimal (3.1.5) + base64 (0.3.0) + bigdecimal (3.3.1) bootsnap (1.17.0) msgpack (~> 1.2) builder (3.2.4) @@ -105,8 +106,8 @@ GEM activesupport tzinfo coderay (1.1.3) - concurrent-ruby (1.2.2) - connection_pool (2.4.1) + concurrent-ruby (1.3.5) + connection_pool (3.0.2) crass (1.0.6) dalli (3.2.6) date (3.3.4) @@ -118,8 +119,7 @@ GEM dotenv-rails (2.8.1) dotenv (= 2.8.1) railties (>= 3.2) - drb (2.2.0) - ruby2_keywords + drb (2.2.3) erubi (1.12.0) eth (0.5.11) forwardable (~> 1.3) @@ -140,7 +140,7 @@ GEM multi_xml (>= 0.5.2) httpparty (0.2.0) httparty (> 0) - i18n (1.14.1) + i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.7.1) irb (1.10.1) @@ -175,10 +175,12 @@ GEM method_source (1.0.0) mini_mime (1.1.5) mini_portile2 (2.8.5) - minitest (5.20.0) + minitest (5.26.2) msgpack (1.7.2) multi_xml (0.6.0) - mutex_m (0.2.0) + mutex_m (0.3.0) + net-http-persistent (4.0.8) + connection_pool (>= 2.2.4, < 4) net-imap (0.4.8) date net-protocol @@ -196,13 +198,15 @@ GEM nokogiri (1.15.5-x86_64-linux) racc (~> 1.4) openssl (3.2.0) - order_query (0.5.3) - activerecord (>= 5.0, < 7.2) - activesupport (>= 5.0, < 7.2) + order_query (0.5.6) + activerecord (>= 5.0, < 8.2) + activesupport (>= 5.0, < 8.2) parser (3.2.2.4) ast (~> 2.4.1) racc - pg (1.5.4) + pg (1.6.2-aarch64-linux) + pg (1.6.2-arm64-darwin) + pg (1.6.2-x86_64-linux) pkg-config (1.5.6) pry (0.14.2) coderay (~> 1.1) @@ -294,7 +298,6 @@ GEM rswag-ui (2.13.0) actionpack (>= 3.1, < 7.2) railties (>= 3.1, < 7.2) - ruby2_keywords (0.0.5) rubyzip (2.3.2) scout_apm (5.3.5) parser @@ -303,7 +306,7 @@ GEM stackprof (0.2.25) stringio (3.1.0) thor (1.3.0) - timeout (0.4.1) + timeout (0.4.4) tzinfo (2.0.6) concurrent-ruby (~> 1.0) webrick (1.8.1) @@ -327,6 +330,7 @@ DEPENDENCIES cbor (~> 0.5.9) clipboard clockwork (~> 3.0) + concurrent-ruby (~> 1.2) dalli (~> 3.2) debug dotenv-rails (~> 2.8) @@ -336,8 +340,9 @@ DEPENDENCIES httpparty (~> 0.2.0) kaminari (~> 1.2) memoist (~> 0.16.2) + net-http-persistent (~> 4.0) order_query (~> 0.5.3) - pg (~> 1.1) + pg (= 1.6.2) pry puma (>= 5.0) rack-cors (~> 2.0) diff --git a/app/models/eth_block.rb b/app/models/eth_block.rb index 3028b3b..cbb0934 100644 --- a/app/models/eth_block.rb +++ b/app/models/eth_block.rb @@ -19,18 +19,23 @@ class BlockNotReadyToImportError < StandardError; end primary_key: :block_number, inverse_of: :eth_block end - + before_validation :generate_attestation_hash, if: -> { imported_at.present? } - - def self.ethereum_client - @_ethereum_client ||= begin - client_class = ENV.fetch('ETHEREUM_CLIENT_CLASS', 'AlchemyClient').constantize - - client_class.new( - api_key: ENV['ETHEREUM_CLIENT_API_KEY'], - base_url: ENV.fetch('ETHEREUM_CLIENT_BASE_URL') - ) - end + + def self.rpc_client + @_rpc_client ||= PooledRpcClient.new( + base_url: ENV.fetch('ETHEREUM_CLIENT_BASE_URL'), + api_key: ENV['ETHEREUM_CLIENT_API_KEY'] + ) + end + + def self.prefetcher + @_prefetcher ||= L1RpcPrefetcher.new(ethereum_client: rpc_client) + end + + def self.reset_prefetcher! + @_prefetcher&.shutdown + @_prefetcher = nil end def self.beacon_client @@ -65,81 +70,47 @@ def self.blocks_behind (cached_global_block_number - next_block_to_import) + 1 end - def self.import_batch_size - [blocks_behind, ENV.fetch('BLOCK_IMPORT_BATCH_SIZE', 2).to_i].min - end - def self.import_blocks_until_done + blocks_imported = 0 + start_time = Time.current + total_ethscriptions = 0 + loop do begin - block_numbers = EthBlock.next_blocks_to_import(import_batch_size) - - if block_numbers.blank? - raise BlockNotReadyToImportError.new("Block not ready") + block_number = next_block_to_import + raise BlockNotReadyToImportError.new("No block to import") if block_number.nil? + + response = prefetcher.fetch(block_number) + result = import_block( + block_number, + response[:block_response], + response[:relevant_transactions] + ) + + total_ethscriptions += result.ethscriptions_imported + blocks_imported += 1 + prefetcher.clear_older_than(block_number - 10) + + if blocks_imported % 100 == 0 + elapsed = Time.current - start_time + rate = (blocks_imported / elapsed).round(1) + stats = prefetcher.stats + puts "Imported #{blocks_imported} blocks (#{rate} bl/s) | #{total_ethscriptions} ethscriptions | Prefetcher: #{stats[:fulfilled]}/#{stats[:fulfilled] + stats[:pending]} ready" end - - EthBlock.import_blocks(block_numbers) - rescue BlockNotReadyToImportError => e + + rescue BlockNotReadyToImportError, L1RpcPrefetcher::BlockFetchError => e puts "#{e.message}. Stopping import." break end end end - - def self.import_next_block - next_block_to_import.tap do |block| - import_blocks([block]) - end - end - - def self.import_blocks(block_numbers) - logger.info "Block Importer: importing blocks #{block_numbers.join(', ')}" - start = Time.current - _blocks_behind = blocks_behind - - block_by_number_promises = block_numbers.map do |block_number| - Concurrent::Promise.execute do - [block_number, ethereum_client.get_block(block_number)] - end - end - - receipts_promises = block_numbers.map do |block_number| - Concurrent::Promise.execute do - [ - block_number, - ethereum_client.get_transaction_receipts( - block_number, - blocks_behind: _blocks_behind - ) - ] - end - end - - block_by_number_responses = block_by_number_promises.map(&:value!).sort_by(&:first) - receipts_responses = receipts_promises.map(&:value!).sort_by(&:first) - - res = [] - - block_by_number_responses.zip(receipts_responses).each do |(block_number1, block_by_number_response), (block_number2, receipts_response)| - raise "Mismatched block numbers: #{block_number1} and #{block_number2}" unless block_number1 == block_number2 - res << import_block(block_number1, block_by_number_response, receipts_response) - end - - blocks_per_second = (block_numbers.length / (Time.current - start)).round(2) - puts "Imported #{res.map(&:ethscriptions_imported).sum} ethscriptions" - puts "Imported #{block_numbers.length} blocks. #{blocks_per_second} blocks / s" - - block_numbers - end - - def self.import_block(block_number, block_by_number_response, receipts_response) - ActiveRecord::Base.transaction do - validate_ready_to_import!(block_by_number_response, receipts_response) + def self.import_block(block_number, block_by_number_response, relevant_transactions) + ActiveRecord::Base.transaction do result = block_by_number_response['result'] - + parent_block = EthBlock.find_by(block_number: block_number - 1) - + if (block_number > genesis_blocks.max) && parent_block.blockhash != result['parentHash'] Airbrake.notify(" Reorg detected: #{block_number}, @@ -147,12 +118,15 @@ def self.import_block(block_number, block_by_number_response, receipts_response) #{result['parentHash']}, Deleting block(s): #{EthBlock.where("block_number >= ?", parent_block.block_number).pluck(:block_number).join(', ')} ") - + EthBlock.where("block_number >= ?", parent_block.block_number).delete_all - + + # Clear prefetcher cache - it has stale data from the old chain + reset_prefetcher! + return OpenStruct.new(ethscriptions_imported: 0) end - + block_record = create!( block_number: block_number, blockhash: result['hash'], @@ -161,59 +135,26 @@ def self.import_block(block_number, block_by_number_response, receipts_response) timestamp: result['timestamp'].to_i(16), is_genesis_block: genesis_blocks.include?(block_number) ) - - receipts = receipts_response['result']['receipts'] - - tx_record_instances = result['transactions'].map do |tx| - current_receipt = receipts.detect { |receipt| receipt['transactionHash'] == tx['hash'] } - - gas_price = current_receipt['effectiveGasPrice'].to_i(16).to_d - gas_used = current_receipt['gasUsed'].to_i(16).to_d - transaction_fee = gas_price * gas_used - - EthTransaction.new( - block_number: block_record.block_number, - block_timestamp: block_record.timestamp, - block_blockhash: block_record.blockhash, - transaction_hash: tx['hash'], - from_address: tx['from'], - to_address: tx['to'], - created_contract_address: current_receipt['contractAddress'], - transaction_index: tx['transactionIndex'].to_i(16), - input: tx['input'], - status: current_receipt['status']&.to_i(16), - logs: current_receipt['logs'], - gas_price: gas_price, - gas_used: gas_used, - transaction_fee: transaction_fee, - value: tx['value'].to_i(16).to_d, - blob_versioned_hashes: tx['blobVersionedHashes'].presence || [] - ) - end - - possibly_relevant = tx_record_instances.select(&:possibly_relevant?) - - if possibly_relevant.present? - EthTransaction.import!(possibly_relevant) - + + ethscriptions_imported = 0 + + if relevant_transactions.present? + EthTransaction.import!(relevant_transactions) + eth_transactions = EthTransaction.where(block_number: block_number).order(transaction_index: :asc) - eth_transactions.each(&:process!) - + ethscriptions_imported = eth_transactions.map(&:ethscription).compact.size end - + EthTransaction.prune_transactions(block_number) - Token.process_block(block_record) - block_record.create_attachments_for_previous_block - block_record.update!(imported_at: Time.current) - + puts "Block Importer: imported block #{block_number}" - - OpenStruct.new(ethscriptions_imported: ethscriptions_imported.to_i) + + OpenStruct.new(ethscriptions_imported: ethscriptions_imported) end rescue ActiveRecord::RecordNotUnique => e if e.message.include?("eth_blocks") && e.message.include?("block_number") @@ -261,7 +202,7 @@ def create_attachments_for_previous_block end def self.uncached_global_block_number - ethereum_client.get_block_number.tap do |block_number| + rpc_client.get_block_number.tap do |block_number| Rails.cache.write('global_block_number', block_number, expires_in: 1.second) end end @@ -270,18 +211,6 @@ def self.cached_global_block_number Rails.cache.read('global_block_number') || uncached_global_block_number end - def self.validate_ready_to_import!(block_by_number_response, receipts_response) - is_ready = block_by_number_response.present? && - block_by_number_response.dig('result', 'hash').present? && - receipts_response.present? && - receipts_response.dig('error', 'code') != -32600 && - receipts_response.dig('error', 'message') != "Block being processed - please try again later" - - unless is_ready - raise BlockNotReadyToImportError.new("Block not ready") - end - end - def self.next_block_to_import next_blocks_to_import(1).first end @@ -299,15 +228,15 @@ def self.next_blocks_to_import(n) (max_db_block + 1..max_db_block + n).to_a end - + def generate_attestation_hash hash = Digest::SHA256.new - + self.parent_state_hash = EthBlock.where(block_number: block_number - 1). limit(1).pluck(:state_hash).first - + hash << parent_state_hash.to_s - + hash << hashable_attributes.map do |attr| send(attr) end.to_json @@ -321,16 +250,16 @@ def generate_attestation_hash self.state_hash = "0x" + hash.hexdigest end - + delegate :quoted_hashable_attributes, :associations_to_hash, to: :class def hashable_attributes self.class.hashable_attributes(self.class) end - + def check_attestation_hash current_hash = state_hash - + current_hash == generate_attestation_hash && parent_state_hash == EthBlock.find_by(block_number: block_number - 1)&.generate_attestation_hash ensure @@ -344,10 +273,10 @@ def association_scope(association) def self.associations_to_hash reflect_on_all_associations(:has_many).sort_by(&:name) end - + def self.all_hashable_attrs classes = [self, associations_to_hash.map(&:klass)].flatten - + classes.map(&:column_names).flatten.uniq.sort - [ 'state_hash', 'parent_state_hash', @@ -357,7 +286,7 @@ def self.all_hashable_attrs 'imported_at' ] end - + def self.hashable_attributes(klass) (all_hashable_attrs & klass.column_names).sort end diff --git a/config/main_importer_clock.rb b/config/main_importer_clock.rb index b1a010d..7655f39 100644 --- a/config/main_importer_clock.rb +++ b/config/main_importer_clock.rb @@ -3,6 +3,14 @@ require './config/environment' require 'active_support/time' +# Ensure prefetcher shuts down cleanly on process exit +at_exit do + puts "Shutting down prefetcher..." + EthBlock.reset_prefetcher! +rescue => e + puts "Error shutting down prefetcher: #{e.message}" +end + module Clockwork handler do |job| puts "Running #{job}" @@ -10,9 +18,9 @@ module Clockwork error_handler do |error| report_exception_every = 15.minutes - + exception_key = ["clockwork-airbrake", error.class, error.message, error.backtrace[0]].to_cache_key - + last_reported_at = Rails.cache.read(exception_key) if last_reported_at.blank? || (Time.zone.now - last_reported_at > report_exception_every) diff --git a/lib/alchemy_client.rb b/lib/alchemy_client.rb deleted file mode 100644 index 984bab8..0000000 --- a/lib/alchemy_client.rb +++ /dev/null @@ -1,90 +0,0 @@ -class AlchemyClient - attr_accessor :base_url, :api_key - - def initialize(base_url: ENV['ETHEREUM_CLIENT_BASE_URL'], api_key:) - self.base_url = base_url.chomp('/') - self.api_key = api_key - end - - def get_block(block_number) - query_api( - method: 'eth_getBlockByNumber', - params: ['0x' + block_number.to_s(16), true] - ) - end - - def get_transaction_receipts(block_number, blocks_behind: nil) - use_individual = ENV.fetch('ETHEREUM_NETWORK') == "eth-sepolia" && - blocks_behind.present? && - blocks_behind < 5 - - if use_individual - get_transaction_receipts_individually(block_number) - else - get_transaction_receipts_batch(block_number) - end - end - - def get_transaction_receipts_batch(block_number) - query_api( - method: 'alchemy_getTransactionReceipts', - params: [{ blockNumber: "0x" + block_number.to_s(16) }] - ) - end - - def get_transaction_receipts_individually(block_number) - block_info = query_api( - method: 'eth_getBlockByNumber', - params: ['0x' + block_number.to_s(16), false] - ) - - transactions = block_info['result']['transactions'] - - receipts = transactions.map do |transaction| - Concurrent::Promise.execute do - get_transaction_receipt(transaction)['result'] - end - end.map(&:value!) - - { - 'id' => 1, - 'jsonrpc' => '2.0', - 'result' => { - 'receipts' => receipts - } - } - end - - def get_transaction_receipt(transaction_hash) - query_api( - method: 'eth_getTransactionReceipt', - params: [transaction_hash] - ) - end - - def get_block_number - query_api(method: 'eth_blockNumber')['result'].to_i(16) - end - - private - - def query_api(method:, params: []) - data = { - id: 1, - jsonrpc: "2.0", - method: method, - params: params - } - - url = [base_url, api_key].join('/') - - HTTParty.post(url, body: data.to_json, headers: headers).parsed_response - end - - def headers - { - 'Accept' => 'application/json', - 'Content-Type' => 'application/json' - } - end -end diff --git a/lib/ethscription_test_helper.rb b/lib/ethscription_test_helper.rb index 3c799b2..de71f7b 100644 --- a/lib/ethscription_test_helper.rb +++ b/lib/ethscription_test_helper.rb @@ -1,11 +1,12 @@ module EthscriptionTestHelper def self.create_from_hash(hash) - resp = AlchemyClient.query_api( + client = EthBlock.rpc_client + resp = client.send(:query_api, method: 'eth_getTransactionByHash', params: [hash] )['result'] - - resp2 = AlchemyClient.query_api( + + resp2 = client.send(:query_api, method: 'eth_getTransactionReceipt', params: [hash] )['result'] diff --git a/lib/l1_rpc_prefetcher.rb b/lib/l1_rpc_prefetcher.rb new file mode 100644 index 0000000..60969b1 --- /dev/null +++ b/lib/l1_rpc_prefetcher.rb @@ -0,0 +1,225 @@ +class L1RpcPrefetcher + class BlockFetchError < StandardError; end + + def initialize(ethereum_client:, + ahead: ENV.fetch('L1_PREFETCH_FORWARD', 100).to_i, + threads: ENV.fetch('L1_PREFETCH_THREADS', 10).to_i) + @eth = ethereum_client + @ahead = ahead + @threads = threads + @fetch_timeout = ENV.fetch('L1_PREFETCH_TIMEOUT', 30).to_i + + @pool = Concurrent::FixedThreadPool.new(threads) + @promises = Concurrent::Map.new + @last_chain_tip = @eth.get_block_number + @highest_queued = 0 # Track highest block we've queued + + Rails.logger.info "L1RpcPrefetcher initialized: #{threads} threads, #{ahead} blocks ahead, #{@fetch_timeout}s timeout" + end + + # Proactively queue blocks for prefetching - optimized to avoid redundant work + def ensure_prefetched(from_block) + # Skip if we've already queued far enough ahead + return if @highest_queued >= from_block + @ahead + + latest = cached_chain_tip(from_block) + to_block = [from_block + @ahead, latest].min + + # Only queue blocks we haven't queued yet + start_block = [@highest_queued + 1, from_block].max + return if start_block > to_block + + (start_block..to_block).each { |n| enqueue_single(n) } + @highest_queued = to_block + end + + # Get block data, waiting if necessary + def fetch(block_number) + # Only ensure_prefetched every 10 blocks to reduce overhead + ensure_prefetched(block_number) if block_number % 10 == 0 || !@promises.key?(block_number) + + promise = @promises[block_number] || enqueue_single(block_number) + + begin + result = promise.value!(@fetch_timeout) + rescue => e + @promises.delete(block_number) + raise BlockFetchError.new("Block #{block_number} fetch failed: #{e.class} - #{e.message}") + end + + if result.nil? + @promises.delete(block_number) + raise BlockFetchError.new("Block #{block_number} fetch timed out after #{@fetch_timeout}s") + end + + if result == :not_ready + @promises.delete(block_number) + raise BlockFetchError.new("Block #{block_number} not yet available on L1") + end + + result + end + + # Memory management - remove old promises + def clear_older_than(min_keep) + return if min_keep.nil? + + deleted = 0 + @promises.keys.each do |n| + if n < min_keep + @promises.delete(n) + deleted += 1 + end + end + + Rails.logger.debug "[PREFETCH] Cleared #{deleted} promises older than #{min_keep}" if deleted > 0 + end + + # Statistics for monitoring + def stats + fulfilled = 0 + pending = 0 + + @promises.each_pair do |_, promise| + if promise.fulfilled? + fulfilled += 1 + elsif promise.pending? + pending += 1 + end + end + + { + promises_total: @promises.size, + fulfilled: fulfilled, + pending: pending, + threads_active: @pool.length, + threads_queued: @pool.queue_length + } + end + + # Graceful shutdown + def shutdown + Rails.logger.info "[PREFETCH] Shutting down..." + @pool.shutdown + terminated = @pool.wait_for_termination(3) + @pool.kill unless terminated + + @promises.clear + + if terminated + Rails.logger.info "[PREFETCH] Thread pool shut down successfully" + else + Rails.logger.warn "[PREFETCH] Shutdown timed out after 3s, pool killed" + end + + terminated + rescue => e + Rails.logger.error "[PREFETCH] Error during shutdown: #{e.message}" + false + end + + private + + def enqueue_single(block_number) + @promises.compute_if_absent(block_number) do + Concurrent::Promise.execute(executor: @pool) do + fetch_job(block_number) + end.rescue do |e| + Rails.logger.error "[PREFETCH] Block #{block_number}: #{e.class} - #{e.message}" + @promises.delete(block_number) + raise e + end + end + end + + def fetch_job(block_number) + block_response = @eth.get_block(block_number) + + # Handle case where block doesn't exist yet + if block_response.nil? || block_response.dig('result', 'hash').nil? + return :not_ready + end + + receipts_response = @eth.get_transaction_receipts(block_number) + receipts = receipts_response.dig('result', 'receipts') + result = block_response['result'] + tx_count = result['transactions'].size + + unless receipts + Rails.logger.warn "[PREFETCH] Block #{block_number}: receipts missing" + return :not_ready + end + + # Validate receipts count matches transactions - incomplete receipts can cause lost ethscriptions + if receipts.size != tx_count + Rails.logger.warn "[PREFETCH] Block #{block_number}: receipts count mismatch (#{receipts.size} vs #{tx_count} txs)" + return :not_ready + end + + block_timestamp = result['timestamp'].to_i(16) + block_blockhash = result['hash'] + + relevant_transactions = build_relevant_transactions( + block_number: block_number, + block_timestamp: block_timestamp, + block_blockhash: block_blockhash, + transactions: result['transactions'], + receipts: receipts + ) + + { + block_number: block_number, + block_response: block_response, + receipts_response: receipts_response, + relevant_transactions: relevant_transactions + } + end + + def build_relevant_transactions(block_number:, block_timestamp:, block_blockhash:, transactions:, receipts:) + receipts_by_hash = receipts.index_by { |r| r['transactionHash'] } + + tx_instances = transactions.map do |tx| + receipt = receipts_by_hash[tx['hash']] + next unless receipt + + gas_price = receipt['effectiveGasPrice'].to_i(16).to_d + gas_used = receipt['gasUsed'].to_i(16).to_d + + # Pre-Byzantium blocks (before 4370000) didn't have status field + status_value = block_number <= 4370000 ? nil : receipt['status']&.to_i(16) + + EthTransaction.new( + block_number: block_number, + block_timestamp: block_timestamp, + block_blockhash: block_blockhash, + transaction_hash: tx['hash'], + from_address: tx['from'], + to_address: tx['to'], + created_contract_address: receipt['contractAddress'], + transaction_index: tx['transactionIndex'].to_i(16), + input: tx['input'], + status: status_value, + logs: receipt['logs'], + gas_price: gas_price, + gas_used: gas_used, + transaction_fee: gas_price * gas_used, + value: tx['value'].to_i(16).to_d, + blob_versioned_hashes: tx['blobVersionedHashes'].presence || [] + ) + end.compact + + tx_instances.select(&:possibly_relevant?) + end + + def cached_chain_tip(from_block) + distance = @last_chain_tip - from_block + + if distance > 10 + # Far from tip, use cached value + @last_chain_tip + else + # Near tip, refresh + @last_chain_tip = @eth.get_block_number + end + end +end diff --git a/lib/pooled_rpc_client.rb b/lib/pooled_rpc_client.rb new file mode 100644 index 0000000..3cf322c --- /dev/null +++ b/lib/pooled_rpc_client.rb @@ -0,0 +1,91 @@ +require 'net/http/persistent' +require 'json' + +class PooledRpcClient + class RpcError < StandardError; end + + MAX_RETRIES = 3 + RETRY_DELAY = 0.5 + + attr_reader :base_url + + def initialize(base_url:, api_key: nil, pool_size: 20) + @base_url = base_url.chomp('/') + @api_key = api_key + + url = [@base_url, @api_key].compact.join('/') + @uri = URI.parse(url) + + @http = Net::HTTP::Persistent.new(name: "eth_rpc_#{@uri.host}", pool_size: pool_size) + @http.open_timeout = 5 + @http.read_timeout = 15 + @http.idle_timeout = 30 + + @request_id = Concurrent::AtomicFixnum.new(0) + end + + def get_block(block_number) + query_api( + method: 'eth_getBlockByNumber', + params: ['0x' + block_number.to_s(16), true] + ) + end + + def get_transaction_receipts(block_number, **_options) + result = query_api( + method: 'eth_getBlockReceipts', + params: ['0x' + block_number.to_s(16)] + )['result'] + + { + 'id' => 1, + 'jsonrpc' => '2.0', + 'result' => { + 'receipts' => result + } + } + end + + def get_block_number + query_api(method: 'eth_blockNumber')['result'].to_i(16) + end + + private + + def query_api(method:, params: []) + retries = 0 + + begin + request = Net::HTTP::Post.new(@uri.request_uri) + request['Content-Type'] = 'application/json' + request['Accept'] = 'application/json' + request.body = { + id: @request_id.increment, + jsonrpc: '2.0', + method: method, + params: params + }.to_json + + response = @http.request(@uri, request) + result = JSON.parse(response.body) + + if result['error'] + raise RpcError, "RPC error: #{result['error']['message']} (code: #{result['error']['code']})" + end + + result + rescue Net::HTTP::Persistent::Error, Errno::ECONNRESET, Errno::ETIMEDOUT, + Errno::ECONNREFUSED, SocketError, Timeout::Error, RpcError => e + retries += 1 + if retries <= MAX_RETRIES + sleep(RETRY_DELAY * retries) + retry + end + Rails.logger.error "[PooledRpcClient] #{method} failed after #{MAX_RETRIES} retries: #{e.class} - #{e.message}" + raise + rescue JSON::ParserError => e + Rails.logger.error "[PooledRpcClient] Invalid JSON response: #{e.message}" + raise + end + end +end diff --git a/lib/universal_client.rb b/lib/universal_client.rb deleted file mode 100644 index e8fa055..0000000 --- a/lib/universal_client.rb +++ /dev/null @@ -1,59 +0,0 @@ -class UniversalClient - attr_accessor :base_url, :api_key - - def initialize(base_url: ENV['ETHEREUM_CLIENT_BASE_URL'], api_key: nil) - self.base_url = base_url.chomp('/') - self.api_key = api_key - end - - def headers - { - 'Accept' => 'application/json', - 'Content-Type' => 'application/json' - } - end - - def query_api(method:, params: []) - data = { - id: 1, - jsonrpc: '2.0', - method: method, - params: params - } - url = [base_url, api_key].join('/') - HTTParty.post(url, body: data.to_json, headers: headers).parsed_response - end - - def get_block(block_number) - query_api( - method: 'eth_getBlockByNumber', - params: ['0x' + block_number.to_s(16), true] - ) - end - - def get_transaction_receipt(transaction_hash) - query_api( - method: 'eth_getTransactionReceipt', - params: [transaction_hash] - ) - end - - def get_transaction_receipts(block_number, blocks_behind: nil) - receipts = query_api( - method: 'eth_getBlockReceipts', - params: ["0x" + block_number.to_s(16)] - )['result'] - - { - 'id' => 1, - 'jsonrpc' => '2.0', - 'result' => { - 'receipts' => receipts - } - } - end - - def get_block_number - query_api(method: 'eth_blockNumber')['result'].to_i(16) - end -end