-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_scanner_with_node.py
More file actions
64 lines (50 loc) · 2.55 KB
/
test_scanner_with_node.py
File metadata and controls
64 lines (50 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import logging
import sys
import unittest
from unittest import TestCase
from eth_typing import Address
from pandas import DataFrame
from web3 import Web3
from src.local_node.event_scanner import JSONifiedState
from src.local_node.process_state_to_df import ProcessStateToDF
from src.local_node.scanner_runner import ScannerRunner
from src.uniswap_v2_pair_abi import UNISWAP_V2_PAIR_ABI
this_log = logging.getLogger(__name__)
this_log.setLevel(logging.INFO)
this_log.addHandler(logging.StreamHandler(sys.stdout))
class TestScannerWithNode(TestCase, ScannerRunner):
ETH_POOL_UNI_V2_CONTRACT_ADDRESS: Address = "0x85Cb0baB616Fe88a89A35080516a8928F38B518b"
FIRST_BLOCK = 11876000
NODE_URL = "http://1.1.1.1:1111"
ABI = UNISWAP_V2_PAIR_ABI
def test_web_3_sync(self):
w3 = Web3(Web3.HTTPProvider(self.NODE_URL, request_kwargs={'timeout': 60}))
self.assertTrue(w3.isConnected())
contract = w3.eth.contract(address=self.ETH_POOL_UNI_V2_CONTRACT_ADDRESS, abi=self.ABI)
print(contract)
event_filter = contract.events.Sync.createFilter(fromBlock=16478475-1000, toBlock=16478475)
events = event_filter.get_all_entries()
self.assertEqual(len(events), 2)
self.assertEqual(set(events[0].keys()), {'address', 'args', 'blockHash', 'blockNumber', 'event', 'logIndex',
'transactionHash', 'transactionIndex'})
self.assertEqual(events[0]["event"], 'Sync')
self.assertEqual(set(events[0]["args"].keys()), {'reserve0', 'reserve1'})
print(events[0]["args"])
event_filter = contract.events.Swap.createFilter(fromBlock=16478475 - 1000, toBlock=16478475)
swaps = event_filter.get_all_entries()
print(len(swaps))
print(swaps[0]["args"].keys())
@unittest.skip("Using as a work space, runs in 12 hours for ETH-POOL pair")
def test_scanner_runner(self):
ScannerRunner.run_scanner(
"test-state.json", self.FIRST_BLOCK, self.NODE_URL, self.ABI, self.ETH_POOL_UNI_V2_CONTRACT_ADDRESS)
@unittest.skip("Using as a work space")
def testReadJsonState(self):
json_state = JSONifiedState("test-state.json")
json_state.restore()
state = json_state.state
state_df = ProcessStateToDF.process_state(state["blocks"], self)
self.assertTrue(isinstance(state_df, DataFrame))
# store file as needed.
# state_df.to_csv("state/0x85Cb0baB616Fe88a89A35080516a8928F38B518b_df.csv")
state_df.to_pickle("state/0x85Cb0baB616Fe88a89A35080516a8928F38B518b_df.pkl")