-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_connection.py
More file actions
99 lines (83 loc) · 3.22 KB
/
test_connection.py
File metadata and controls
99 lines (83 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
"""
Test WebSocket connection to TankTank Hyperfy world
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from src.hyperfy.auth import create_hyperfy_auth
from src.hyperfy.connector import create_hyperfy_connector
async def test_connection():
    """Run an end-to-end connection check against the configured Hyperfy world.

    The check loads settings from the local ``config`` module, builds the
    auth object and the connector, connects, sends one chat message, polls
    ``receive_message`` ten times (sleeping one second after each poll) and
    finally disconnects. Progress is reported on stdout.

    Returns:
        bool: ``True`` when every step completed. ``False`` when ``config``
        cannot be imported, no token info is available, ``connect()``
        returns a falsy value, or any step raises an exception.
    """
    print("🚀 Testing Connection to TankTank World")
    print("=" * 50)

    # Load configuration: every public attribute of config.py becomes an entry.
    try:
        import config
    except ImportError:
        print("❌ Could not load config.py")
        return False
    else:
        test_config = {
            key: getattr(config, key)
            for key in dir(config)
            if not key.startswith('_')
        }

    print(f"🌐 World URL: {test_config.get('HYPERFY_WORLD_URL')}")

    # Top-level boundary of the script: any failure below is reported and
    # turned into a False result rather than a traceback.
    try:
        # Create authentication
        print("1. Setting up authentication...")
        auth = create_hyperfy_auth(test_config)
        token_info = auth.get_token_info()
        if not token_info:
            print(" ❌ Authentication failed")
            return False
        print(f" ✅ Authenticated as User ID: {token_info['user_id']}")

        # Create connector
        print("2. Creating WebSocket connector...")
        connector = create_hyperfy_connector(test_config, auth)

        # Basic handler: log every message type, and echo chat contents.
        async def handle_message(message):
            msg_type = message.get("type", "unknown")
            print(f" 📨 Received: {msg_type}")
            if msg_type == "chat":
                username = message.get("username", "Unknown")
                text = message.get("text", "")
                print(f" 💬 {username}: {text}")

        for event_name in ("chat", "userJoined", "userLeft"):
            connector.add_message_handler(event_name, handle_message)

        print("3. Attempting connection...")
        if not await connector.connect():
            print(" ❌ Connection failed")
            return False
        print(" ✅ Connected successfully!")

        # From here on the connection is open, so it must be closed even if
        # sending or receiving raises; otherwise the connector would be left
        # connected when the outer handler swallows the error.
        try:
            # Send a test message
            await asyncio.sleep(1)
            await connector.send_chat_message("Hello TankTank! I'm Eliza, testing my connection 🤖")

            # Listen for a few seconds
            print("4. Listening for messages (10 seconds)...")
            for _ in range(10):
                message = await connector.receive_message()
                if message:
                    # NOTE(review): presumably dispatches to the handlers
                    # registered above; confirm against the connector.
                    await connector.handle_message(message)
                await asyncio.sleep(1)
        finally:
            print("5. Disconnecting...")
            await connector.disconnect()

        print(" ✅ Disconnected successfully")
        return True
    except Exception as e:
        print(f" ❌ Error: {e}")
        return False
if __name__ == "__main__":
    # Drive the async check to completion, print a summary banner, and map
    # the boolean outcome onto the process exit status (0 = pass, 1 = fail).
    passed = asyncio.run(test_connection())
    print("\n" + "=" * 50)
    if not passed:
        print("❌ Connection test failed")
        sys.exit(1)
    print("🎉 Connection test successful!")
    print("Ready to run the full agent!")
    sys.exit(0)