-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_daemon.py
More file actions
77 lines (64 loc) · 2.12 KB
/
test_daemon.py
File metadata and controls
77 lines (64 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
import asyncio
import websockets
import json
import logging
from datetime import datetime
# Root logger setup: INFO and above, each record prefixed with its
# timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
async def subscribe_and_listen(uri, subscription_type, id):
    """Open a WebSocket to ``uri``, subscribe, and log every message received.

    A JSON subscribe request carrying ``subscription_type`` and ``id`` is
    sent first. The first message received afterwards is logged as the
    subscription response; every later message is logged as an event.
    The coroutine returns once the connection is closed while listening.
    A close that happens before the subscription response arrives is not
    handled here and propagates to the caller.
    """
    async with websockets.connect(uri) as ws:
        logging.info(f"Subscribing to {subscription_type} {id}")
        request = json.dumps(
            {
                "action": "subscribe",
                "type": subscription_type,
                "id": id,
            }
        )
        await ws.send(request)

        ack = await ws.recv()
        logging.info(f"Subscription response: {ack}")

        # Listen until the connection goes away; the loop only ends via
        # the ConnectionClosed exception raised by recv().
        try:
            while True:
                event = await ws.recv()
                logging.info(f"Received event: {event}")
        except websockets.exceptions.ConnectionClosed:
            logging.info("Connection closed")
async def test_multiple_subscriptions():
    """Run several subscriptions concurrently against the local daemon.

    One task is started per ``(type, id)`` pair, each running
    ``subscribe_and_listen`` against ``ws://localhost:8080``. The coroutine
    waits until every subscription has finished. If any subscription
    fails, the error is logged and the remaining subscriptions are
    cancelled and awaited before returning, so no task (or its socket)
    outlives this function.
    """
    uri = "ws://localhost:8080"

    # Test cases: (subscription type, object id)
    subscriptions = [
        ("invoice", "inv_123"),
        ("account", "acc_456"),
        ("address", "addr_789"),
        ("invoice", "inv_999"),
    ]

    # Create one task per subscription so they all run concurrently.
    tasks = [
        asyncio.create_task(subscribe_and_listen(uri, sub_type, sub_id))
        for sub_type, sub_id in subscriptions
    ]

    # Wait for all tasks
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        logging.info("Test terminated by user")
    except Exception as e:
        logging.error(f"Error during test: {e}")
    finally:
        # gather() propagates the first failure but leaves the sibling
        # tasks running; cancel whatever is still pending and wait for it
        # to unwind so connections are closed before we return.
        for task in tasks:
            if not task.done():
                task.cancel()
        # return_exceptions=True collects cancellations/errors as results
        # instead of raising them out of the cleanup path.
        await asyncio.gather(*tasks, return_exceptions=True)
def main():
    """Script entry point: run the subscription test and log how it ended.

    A keyboard interrupt is logged as a user-requested stop; any other
    exception escaping the event loop is logged as a test failure.
    """
    logging.info("Starting WebSocket daemon test")
    try:
        scenario = test_multiple_subscriptions()
        asyncio.run(scenario)
    except KeyboardInterrupt:
        logging.info("Test terminated by user")
    except Exception as exc:
        logging.error(f"Test failed: {exc}")
# Run the test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()