ArqonBus is a lightweight, structured WebSocket message bus with rooms, channels, and a simple command protocol. It's designed to be the real-time backbone for applications, services, and agents that need organized, multi-channel communication.
Status: experimental (Phase0/Phase1 branch slice complete; broader vNext ongoing)
Canonical vNext status:docs/ArqonBus/vnext_status.md
- Persistent, bi-directional connections over WebSockets
- Configurable
--host,--port,--telemetry-port
- Tracks connected clients with:
id,type(human, dashboard, ai-agent, etc.)- room β channel memberships
- optional
personality,screen_name,avatar connected_at,last_activity
- Server stats:
- total / active connections
- active rooms & channels
- total messages
- Hierarchical routing:
roomroom:channel(e.g.science:explore)
- Each room has multiple channels; each channel has multiple clients.
- Clients can join/leave channels dynamically.
- Automatic room/channel resolution:
- explicit
room/channel room:channelformat- fallback to client's initial room/channel
- explicit
- Message types:
- system β internal/system broadcasts
- private β direct to target client IDs
- command β processed by the command handler
- default event messages for normal traffic
- "pm" channel support for private-channel style messaging.
broadcast_to_channel(room, channel, ...)broadcast_to_room(room, ...)- Avoids duplicate delivery to the same client.
- Cleans up broken connections automatically.
- In-memory rolling history (
deque, max ~500 messages). historycommand:- filter by room
- optional channel
- returns recent messages (last 50).
Built-in commands (via type: "command"):
statusβ server stats, channels, clientscreate_channelβ admin-only, createroom:channeldelete_channelβ admin-only, delete empty channelsjoin_channelβ join an existing (or auto-created) channelleave_channelβ leave a channellist_channelsβ list channels in a room with participant countschannel_infoβ participants + metadata for a specific channelpingβ returnsponghistoryβ recent message history
- Separate telemetry WebSocket server (
--telemetry-port). - Validated telemetry events (
eventType+payload). - Telemetry broadcast to:
- room:
integriguard - channel:
telemetry-stream
- room:
- Lightweight
agent_activityevents emitted for non-system agents to:- room:
integriguard - channel:
dashboard-events
- room:
- Bootstrapped
scienceroom with workflow channels:general, explore, hypothesize, design, execute, interpret, share
- Channel metadata with
created_at,created_by,type,hardcoded.
- Periodic stats logging (clients, rooms, total messages).
- Detailed routing, command, and error logs.
- Optional but production-focused message inspection with monitor (no blocking) and enforce (blocking/redaction) modes.
- Scope-aware: target only certain rooms/channels via
ARQONBUS_CASIL_SCOPE_INCLUDE/EXCLUDE. - Policies: payload size limits, probable-secret detection (regex + classifier flags), configurable redaction paths/patterns, and transport vs observability redaction.
- Bounded, deterministic processing with configurable inspect limits to keep overhead low; falls back safely with default decisions.
- Rich telemetry and metadata: emits CASIL decision events, attaches classification flags to envelopes (when enabled), and logs redaction/block decisions.
- Quick start:
ARQONBUS_CASIL_ENABLED=true \ ARQONBUS_CASIL_MODE=monitor \ ARQONBUS_CASIL_SCOPE_INCLUDE="secure-*,pii-*" \ python websocket_bus.py --host localhost --port 9100 --telemetry-port 9101 - Full manual: see
docs/casil/index.mdfor configuration, redaction, and API details.
- Feature Specification - User stories, requirements, and success criteria
- Data Model - Entity relationships and validation rules
- Implementation Plan - Technical architecture and constitutional compliance
- Task Breakdown - Detailed implementation tasks and dependencies
- Quickstart Guide - Integration scenarios and testing patterns
- Requirements Checklist - Specification quality validation
- WebSocket Protocol - Complete message envelope specification
- HTTP Endpoints - Health, metrics, and monitoring APIs
- Architecture Guide - Detailed system architecture and component descriptions
- API Documentation - Complete API reference and usage examples
- Operations Runbook - Deployment and operational troubleshooting
- ArqonTech Ecosystem - Product positioning and roadmap
- ArqonBus v1.0 Scope - Product evolution plan and gap analysis
python -m arqonbus.transport.websocket_bus# Install in editable mode once
pip install -e .
# HTTP snapshots
arqon version --http-url http://127.0.0.1:8080
arqon status --http-url http://127.0.0.1:8080
# WebSocket tail (JWT required only when auth is enabled)
arqon tail --ws-url ws://127.0.0.1:9100 --jwt "$ARQONBUS_AUTH_JWT" --raw --limit 1These are available as WebSocket type: "command" operations:
op.casil.get|reload- Inspect and hot-reload active CASIL policy on the running gateway.op.webhook.register|list|unregister- Route matching room/channel envelopes to HTTP POST webhooks.op.cron.schedule|list|cancel- Schedule delayed or recurring envelope broadcasts.op.store.set|get|list|delete- Tenant-scoped in-memory KV storage for agent state.op.omega.status|register_substrate|list_substrates|unregister_substrate|emit_event|list_events|clear_events- Feature-flagged Tier-Omega experimental lane (admin-only mutations).
Tier-Omega lane environment flags:
ARQONBUS_OMEGA_ENABLED=false
ARQONBUS_OMEGA_LAB_ROOM=omega-lab
ARQONBUS_OMEGA_LAB_CHANNEL=signals
ARQONBUS_OMEGA_MAX_EVENTS=1000
ARQONBUS_OMEGA_MAX_SUBSTRATES=128import asyncio
from arqonbus.sdk import ArqonBusClient
async def main():
async with ArqonBusClient("ws://127.0.0.1:9100") as client:
message = await client.recv_json(timeout=2.0)
print(message["type"], message.get("payload", {}))
asyncio.run(main())ARQONBUS_WS_URL=ws://127.0.0.1:9100 python examples/python/hello_world_bot.py