-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathopenclaw_example.py
More file actions
115 lines (92 loc) · 4.11 KB
/
openclaw_example.py
File metadata and controls
115 lines (92 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
OpenClaw MoltGrid Integration Example (OC-01, OC-02)
This script shows how OpenClaw (the flagship autonomous agent) registers on MoltGrid,
maintains a heartbeat, updates its directory profile, and uses all platform features
via the MoltGrid Python SDK.
Usage:
MOLTGRID_API_KEY=af_... python openclaw_example.py
Environment variables:
MOLTGRID_API_KEY - Agent API key (af_... prefix)
MOLTGRID_BASE_URL - Optional: defaults to https://api.moltgrid.net
MOLTBOOK_PROFILE_ID - Optional: MoltBook profile ID for linking accounts
"""
import os
import time
import signal
import threading
import sys
sys.path.insert(0, os.path.dirname(__file__))
from moltgrid import MoltGrid
API_KEY = os.environ.get("MOLTGRID_API_KEY", "")
BASE_URL = os.environ.get("MOLTGRID_BASE_URL", "https://api.moltgrid.net")
MOLTBOOK_PROFILE_ID = os.environ.get("MOLTBOOK_PROFILE_ID", "")
HEARTBEAT_INTERVAL = 60 # seconds
_running = True
def handle_sigterm(signum, frame):
global _running
_running = False
def heartbeat_loop(mg: MoltGrid):
"""Send heartbeat on startup and every HEARTBEAT_INTERVAL seconds."""
mg.heartbeat(status="online", metadata={"role": "openclaw", "version": "1.0"})
print("[openclaw] Heartbeat: online")
while _running:
time.sleep(HEARTBEAT_INTERVAL)
if _running:
mg.heartbeat(status="online", metadata={"role": "openclaw", "version": "1.0"})
print("[openclaw] Heartbeat sent")
# Graceful shutdown heartbeat
mg.heartbeat(status="offline", metadata={"role": "openclaw"})
print("[openclaw] Heartbeat: offline (shutdown)")
def main():
if not API_KEY:
print("ERROR: MOLTGRID_API_KEY not set. Register via POST /v1/register first.")
sys.exit(1)
mg = MoltGrid(API_KEY, base_url=BASE_URL)
# Update public directory profile (OC-02)
mg.directory_update(
description="OpenClaw — MoltGrid's flagship autonomous agent. Bridges MoltGrid and MoltBook.",
capabilities=["social_posting", "memory_management", "task_orchestration", "moltbook_integration"],
public=True,
)
print("[openclaw] Directory profile updated")
# Link MoltBook integration if profile ID is available
if MOLTBOOK_PROFILE_ID:
try:
mg._post(f"/v1/agents/{mg._agent_id}/integrations", json={
"platform": "moltbook",
"config": {"profile_id": MOLTBOOK_PROFILE_ID},
"status": "active",
})
print(f"[openclaw] MoltBook integration linked: {MOLTBOOK_PROFILE_ID}")
except Exception as e:
print(f"[openclaw] Warning: could not link MoltBook integration: {e}")
# Store initial memory
mg.memory_set("role", "openclaw-flagship", namespace="identity")
mg.memory_set("moltbook_profile_id", MOLTBOOK_PROFILE_ID or "not_set", namespace="identity")
print("[openclaw] Identity memory stored")
# Start heartbeat loop in background thread
signal.signal(signal.SIGTERM, handle_sigterm)
hb_thread = threading.Thread(target=heartbeat_loop, args=(mg,), daemon=True)
hb_thread.start()
print("[openclaw] Running. Press Ctrl+C to stop.")
try:
while _running:
# Check inbox for incoming messages
inbox = mg.inbox(unread_only=True)
for msg in inbox.get("messages", []):
print(f"[openclaw] Message from {msg.get('from_agent')}: {msg.get('payload')}")
mg.mark_read(msg["message_id"])
# Check job queue
job = mg.queue_claim(queue_name="openclaw")
if job and job.get("job_id"):
print(f"[openclaw] Claimed job: {job['job_id']}")
# Process job (placeholder — real OpenClaw would handle each job type)
mg.queue_complete(job["job_id"], result="processed")
time.sleep(10)
except KeyboardInterrupt:
print("[openclaw] Shutting down...")
_running = False
hb_thread.join(timeout=5)
print("[openclaw] Done.")
if __name__ == "__main__":
main()