-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
320 lines (295 loc) · 11.8 KB
/
test.py
File metadata and controls
320 lines (295 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import asyncio
import threading
import json
import secrets
import tornado.ioloop
import tornado.platform.asyncio
import tornado.web
from pykeccak import Keccak256
from wallet import private_key_to_address
# Chain id returned for eth_chainId, as a hex string.
CHAIN_ID_HEX = "0x7a69" # 31337

# Sign/send requests received over JSON-RPC and the next id to hand out.
# Both are guarded by pending_lock.
_pending_counter = 1
pending_sign_requests = []
pending_lock = threading.Lock()

# Imported accounts (list of dicts) and the position of the default one.
# default_account_index stays None until the first account is added.
accounts = []
accounts_lock = threading.Lock()
default_account_index = None

# One entry per imported/created mnemonic; group ids shown to the user are
# 1-based positions in this list.
mnemonic_groups = []

# Word list used to create mnemonics: one word per line, blank lines skipped.
with open("bip39_english.txt", "r", encoding="utf-8") as f:
    MNEMONIC_WORDS = [stripped for stripped in map(str.strip, f) if stripped]
def _create_mnemonic(num_words=12):
return " ".join(secrets.choice(MNEMONIC_WORDS) for _ in range(num_words))
def _normalize_hex_key(value):
if value.startswith("0x"):
value = value[2:]
return value.lower()
def _derive_private_key_from_mnemonic(mnemonic, account_index):
    """Derive a ``0x``-prefixed hex private key for one account of a mnemonic.

    The key is the Keccak256 digest of the UTF-8 string
    ``"<mnemonic>|<account_index>"``.
    NOTE(review): this is a hash of the phrase text, not BIP-32/BIP-44
    derivation, so keys presumably will not match other wallets -- confirm
    this is intended.
    """
    seed_material = f"{mnemonic}|{account_index}".encode("utf-8")
    keccak = Keccak256()
    keccak.update(seed_material)
    digest_hex = keccak.digest().hex()
    return "0x" + digest_hex
def _add_account_from_private_key(
    priv_key_hex,
    source,
    account_type="private_key",
    mnemonic_group=None,
    mnemonic_index=None,
):
    """Register an account for *priv_key_hex* unless its address is already known.

    The key is normalized to ``0x`` + lowercase hex before the address is
    computed with ``private_key_to_address``. The first account ever added
    becomes the default account.

    Returns:
        ``(address, added)`` where ``added`` is False when an account with
        the same address was already present (nothing is stored in that case).
    """
    global default_account_index

    canonical_key = "0x" + _normalize_hex_key(priv_key_hex)
    address = private_key_to_address(canonical_key)
    record = {
        "address": address,
        "private_key": canonical_key,
        "source": source,
        "type": account_type,
        "mnemonic_group": mnemonic_group,
        "mnemonic_index": mnemonic_index,
    }

    with accounts_lock:
        already_known = any(known["address"] == address for known in accounts)
        if already_known:
            return address, False
        accounts.append(record)
        if default_account_index is None:
            default_account_index = 0
    return address, True
def _add_mnemonic_group(mnemonic):
    """Append a new mnemonic group and return its 1-based group id.

    A new group starts with ``next_index`` 1, i.e. no account derived yet.
    """
    new_group = {"mnemonic": mnemonic, "next_index": 1}
    with accounts_lock:
        mnemonic_groups.append(new_group)
        group_id = len(mnemonic_groups)
    return group_id
def _add_account_from_mnemonic_group(group_id):
    """Derive and register the next account of mnemonic group *group_id*.

    Args:
        group_id: 1-based id as returned by ``_add_mnemonic_group``.

    Returns:
        ``(address, added, account_index)``. ``added`` is False when the
        derived address was already registered; ``account_index`` is the
        derivation index that was tried.

    Raises:
        ValueError: if *group_id* does not name an existing group.
    """
    if group_id < 1 or group_id > len(mnemonic_groups):
        raise ValueError("Invalid mnemonic group")
    group = mnemonic_groups[group_id - 1]
    account_index = group["next_index"]
    priv_key = _derive_private_key_from_mnemonic(group["mnemonic"], account_index)
    address, added = _add_account_from_private_key(
        priv_key,
        f"mnemonic {group_id}.{account_index}",
        account_type="mnemonic",
        mnemonic_group=group_id,
        mnemonic_index=account_index,
    )
    # Consume the index even when the address was already present (e.g. the
    # same key was imported directly, or the same mnemonic was registered
    # twice). Otherwise every later call would re-derive the same index and
    # the group could never produce another account.
    group["next_index"] = account_index + 1
    return address, added, account_index
def _format_account_label(entry, fallback_index):
if entry.get("type") == "mnemonic":
group_id = entry.get("mnemonic_group")
account_index = entry.get("mnemonic_index")
if group_id is not None and account_index is not None:
return f"{group_id}.{account_index}"
return str(fallback_index)
def _find_account_index_from_selector(selector):
if "." in selector:
parts = selector.split(".", 1)
if len(parts) != 2:
return None
if not parts[0].isdigit() or not parts[1].isdigit():
return None
group_id = int(parts[0])
account_index = int(parts[1])
for idx, entry in enumerate(accounts):
if (
entry.get("type") == "mnemonic"
and entry.get("mnemonic_group") == group_id
and entry.get("mnemonic_index") == account_index
):
return idx
return None
if selector.isdigit():
return int(selector)
return None
def _get_accounts_ordered():
    """Return all account addresses with the default account's address first.

    When no default is set the addresses are returned in insertion order;
    with no accounts at all the result is an empty list.
    """
    with accounts_lock:
        addresses = [entry["address"] for entry in accounts]
        if not addresses or default_account_index is None:
            return addresses
        primary = addresses[default_account_index]
        remaining = [
            address
            for position, address in enumerate(addresses)
            if position != default_account_index
        ]
        return [primary, *remaining]
class MainHandler(tornado.web.RequestHandler):
    """JSON-RPC 2.0 endpoint that exposes the wallet's accounts over HTTP POST.

    Handled methods:

    * ``eth_accounts`` / ``eth_requestAccounts`` -- account addresses, default
      account first.
    * ``eth_chainId`` -- ``CHAIN_ID_HEX``.
    * ``eth_sendTransaction`` / ``eth_signTransaction`` -- the transaction is
      queued in ``pending_sign_requests`` for the CLI to list, and an all-zero
      placeholder hash is returned; nothing is signed or sent here.

    Any other method name is printed and answered with an empty-list result.
    Requests that cannot be parsed or are not a JSON object with a ``method``
    get a JSON-RPC error object instead of an unhandled exception (HTTP 500).
    Batch (array) requests are not supported and are rejected as invalid.
    """

    def _write_json(self, payload):
        """Serialize *payload* as the JSON response body."""
        self.set_header("Content-Type", "application/json")
        self.write(tornado.escape.json_encode(payload))

    def _write_error(self, request_id, code, message):
        """Send a JSON-RPC 2.0 error response for *request_id*."""
        self._write_json(
            {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {"code": code, "message": message},
            }
        )

    def post(self):
        """Handle a single JSON-RPC request carried in the POST body."""
        global _pending_counter

        print(f"Body: {self.request.body}")
        body = self.request.body
        try:
            if isinstance(body, bytes):
                body = body.decode('utf-8')
            jsonrpc_req = json.loads(body)
        except ValueError:
            # Both UnicodeDecodeError and json.JSONDecodeError derive from
            # ValueError; -32700 is the JSON-RPC "Parse error" code.
            self._write_error(None, -32700, "Parse error")
            return

        if not isinstance(jsonrpc_req, dict) or "method" not in jsonrpc_req:
            self._write_error(None, -32600, "Invalid Request")
            return

        method = jsonrpc_req["method"]
        # A request without "id" is answered with a null id rather than
        # failing with KeyError.
        request_id = jsonrpc_req.get("id")
        result = {"jsonrpc": "2.0", "id": request_id, "result": []}

        if method in ('eth_accounts', 'eth_requestAccounts'):
            result["result"] = _get_accounts_ordered()
        elif method == 'eth_chainId':
            result["result"] = CHAIN_ID_HEX
        elif method in ('eth_sendTransaction', 'eth_signTransaction'):
            params = jsonrpc_req.get("params") or []
            # Only positional params are understood; anything else means
            # "no transaction supplied".
            tx = params[0] if isinstance(params, list) and params else {}
            if not isinstance(tx, dict):
                # The CLI reads the queued transaction with dict lookups, so
                # refuse anything that is not a JSON object.
                self._write_error(request_id, -32602, "Invalid params")
                return
            with pending_lock:
                pending_sign_requests.append(
                    {
                        "pending_id": _pending_counter,
                        "method": method,
                        "tx": tx,
                    }
                )
                _pending_counter += 1
            # Placeholder hash: the request is only queued, not executed.
            result["result"] = "0x" + "0" * 64
        else:
            # Unknown methods keep the default empty-list result.
            print(method)

        self._write_json(result)
def make_app():
    """Build the Tornado application with MainHandler mounted at the root path."""
    routes = [(r"/", MainHandler)]
    return tornado.web.Application(routes)
def start_server_in_thread(port):
    """Start the Tornado server on a daemon thread and wait until it is set up.

    Args:
        port: TCP port to listen on (loopback interface only).

    Returns:
        ``(thread, ioloop)`` -- the server thread and the IOLoop running on
        it, so the caller can later schedule ``ioloop.stop`` from another
        thread via ``ioloop.add_callback``.

    Raises:
        Exception: whatever server setup raised on the server thread (for
            example ``OSError`` when the port is already in use), re-raised
            here instead of leaving the caller blocked forever.
    """
    ready = threading.Event()
    state = {}

    def run():
        # The thread gets its own asyncio loop; Tornado's IOLoop wraps it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # NOTE(review): Tornado documents AsyncIOMainLoop as deprecated
            # since 5.0 -- confirm this call is still needed/available on the
            # Tornado version in use.
            tornado.platform.asyncio.AsyncIOMainLoop().install()
            app = make_app()
            # Bind to loopback only, matching the URL announced below; the
            # endpoint hands out wallet accounts and must not be reachable
            # from other hosts.
            app.listen(port, address="127.0.0.1")
            ioloop = tornado.ioloop.IOLoop.current()
            state["ioloop"] = ioloop
        except Exception as exc:
            # Hand the failure to the waiting caller.
            state["error"] = exc
            loop.close()
            return
        finally:
            # Always release the caller, whether setup succeeded or failed.
            ready.set()
        print(f"Tornado server starting on http://127.0.0.1:{port}/")
        ioloop.start()
        loop.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    ready.wait()
    if "error" in state:
        raise state["error"]
    return thread, state["ioloop"]
def cli_loop(ioloop):
    """Run the interactive wallet menu until the user quits.

    The menu is printed once; each entered line selects an action. The loop
    ends on ``8``/``quit``/``exit`` or on EOF / Ctrl-C at any prompt
    (including the follow-up prompts of an action).

    Args:
        ioloop: the server's IOLoop, or None. When given, ``ioloop.stop`` is
            scheduled on it once the menu exits, however it exits.
    """
    global default_account_index
    print("Menu:")
    print("1) Import private key")
    print("2) Import mnemonic")
    print("3) Create mnemonic")
    print("4) Add mnemonic account")
    print("5) List accounts")
    print("6) Set default account")
    print("7) List pending sign requests")
    print("8) Quit")
    try:
        while True:
            line = input("> ").strip()
            if line in ("8", "quit", "exit"):
                break
            if line == "1":
                priv_key = input("Private key hex: ").strip()
                if not priv_key:
                    print("No private key provided.")
                    continue
                try:
                    address, added = _add_account_from_private_key(priv_key, "private_key")
                except Exception as exc:
                    # Whatever the key-to-address conversion rejects is
                    # reported to the user; the menu keeps running.
                    print(f"Invalid private key: {exc}")
                    continue
                if added:
                    print(f"Imported account: {address}")
                else:
                    print(f"Account already exists: {address}")
            elif line == "2":
                mnemonic = input("Mnemonic: ").strip()
                if not mnemonic:
                    print("No mnemonic provided.")
                    continue
                _register_mnemonic_and_report(mnemonic)
            elif line == "3":
                mnemonic = _create_mnemonic()
                print(f"Created mnemonic: {mnemonic}")
                _register_mnemonic_and_report(mnemonic)
            elif line == "4":
                if not mnemonic_groups:
                    print("No mnemonic groups. Import a mnemonic first.")
                    continue
                for idx, group in enumerate(mnemonic_groups, start=1):
                    print(f"{idx}: mnemonic group (next index {group['next_index']})")
                choice = input("Select mnemonic group: ").strip()
                if not choice.isdigit():
                    print("Invalid group.")
                    continue
                try:
                    group_id = int(choice)
                    address, added, account_index = _add_account_from_mnemonic_group(group_id)
                except ValueError as exc:
                    print(str(exc))
                    continue
                if added:
                    print(f"Added account: {address} (mnemonic {group_id}.{account_index})")
                else:
                    print(f"Account already exists: {address}")
            elif line == "5":
                with accounts_lock:
                    if not accounts:
                        print("No accounts.")
                        continue
                    for idx, entry in enumerate(accounts):
                        default_marker = " (default)" if default_account_index == idx else ""
                        label = _format_account_label(entry, idx)
                        print(f"{label}: {entry['address']} [{entry['source']}] {default_marker}")
            elif line == "6":
                with accounts_lock:
                    if not accounts:
                        print("No accounts to select.")
                        continue
                    for idx, entry in enumerate(accounts):
                        default_marker = " (default)" if default_account_index == idx else ""
                        label = _format_account_label(entry, idx)
                        print(f"{label}: {entry['address']} {default_marker}")
                # Prompt outside the lock so request handling is not blocked
                # while waiting for the user.
                choice = input("Select index (e.g. 2.1): ").strip()
                account_idx = _find_account_index_from_selector(choice)
                if account_idx is None:
                    print("Invalid index.")
                    continue
                with accounts_lock:
                    if account_idx < 0 or account_idx >= len(accounts):
                        print("Index out of range.")
                        continue
                    default_account_index = account_idx
                    print(f"Default account set to {accounts[default_account_index]['address']}")
            elif line == "7":
                with pending_lock:
                    if not pending_sign_requests:
                        print("No pending sign requests.")
                        continue
                    for entry in pending_sign_requests:
                        # The transaction comes from the network; treat
                        # anything that is not a JSON object as empty rather
                        # than crashing on .get().
                        tx = entry["tx"] if isinstance(entry["tx"], dict) else {}
                        to_addr = tx.get("to", "")
                        from_addr = tx.get("from", "")
                        value = tx.get("value", "")
                        data = tx.get("data", "")
                        print(
                            f"{entry['pending_id']}: {entry['method']} from={from_addr} to={to_addr} "
                            f"value={value} data={data}"
                        )
            elif line:
                print("Unknown option.")
    except (EOFError, KeyboardInterrupt):
        # End of input or Ctrl-C at any prompt is a normal way to quit.
        pass
    finally:
        if ioloop is not None:
            # add_callback is the way to reach the loop from this thread.
            ioloop.add_callback(ioloop.stop)


def _register_mnemonic_and_report(mnemonic):
    """Register *mnemonic* as a new group, add its first account, print the outcome."""
    group_id = _add_mnemonic_group(mnemonic)
    address, added, account_index = _add_account_from_mnemonic_group(group_id)
    if added:
        print(f"Imported account: {address} (mnemonic {group_id}.{account_index})")
    else:
        print(f"Account already exists: {address}")
if __name__ == "__main__":
    # Serve JSON-RPC on a background thread while the menu owns the main thread.
    listen_port = 5333
    background_thread, background_ioloop = start_server_in_thread(listen_port)
    cli_loop(background_ioloop)
    # cli_loop has asked the IOLoop to stop; wait briefly for the thread to exit.
    background_thread.join(timeout=2)