Skip to content

Unified connection management across all protocols#13

Open
zach-snell wants to merge 18 commits intomainfrom
feature/unified-connection-management
Open

Unified connection management across all protocols#13
zach-snell wants to merge 18 commits intomainfrom
feature/unified-connection-management

Conversation

@zach-snell
Copy link
Copy Markdown
Contributor

@zach-snell zach-snell commented Apr 8, 2026

Summary

Unifies connection/session management across all connection-style protocols, based on the excellent work from @pr0head in #9 and #11.

  • WebSocket: Connection management API (list, get, close, send, stats) + auto-disconnect on mock update with close code 1012
  • SSE: Auto-disconnect on mock update + enriched response types (was only returning ID/MockID, now returns full connection details) + engine-level tests
  • MQTT: New connection management API with per-client control, subscription visibility, and granular disconnect
  • gRPC: Stream tracking for streaming RPCs with cancel support and codes.Unavailable on mock update
  • CLI: mockd [websocket|sse|mqtt|grpc] connections [list|get|close] + stats for all protocols
  • Desktop UI: Connections view with protocol tabs, stats cards, live table, send dialog, close actions, detail panel
  • Docker CI: Fixed build triggers (PR + main + tags + manual dispatch)

What changed

  • All 4 protocols follow the same layering: Admin API → Engine Client → Engine Control API → ConnectionManager
  • Auto-disconnect pattern: WebSocket uses 1012 (Service Restart), SSE uses context cancellation, MQTT uses clean disconnect, gRPC uses codes.Unavailable
  • Pre-v1 breaking changes: SSE list/get responses now include full connection details, WebSocket response removed unpopulated fields

Based on community contribution

Cherry-picked and extended @pr0head's work from PRs #9 and #11. Applied fixes identified in review (type validation, response field alignment, dead code removal, etc.) and extended the same patterns to SSE, MQTT, and gRPC.

Closes #8
Closes #10

Test plan

  • go build ./... passes
  • go vet ./... passes
  • go test ./... passes
  • Start mockd, connect a WebSocket client, verify connection appears in mockd websocket connections
  • Update a WebSocket mock, verify client receives close code 1012
  • CLI: test mockd websocket stats, mockd sse stats, mockd mqtt stats, mockd grpc stats

@pr0head — would love your thoughts on this and if you could give it a test. We took your WebSocket work as the foundation and extended the same patterns to SSE, MQTT, and gRPC. Let us know if anything looks off or if you have suggestions.

pr0head and others added 11 commits April 5, 2026 18:28
Add full MQTT connection management across all API layers:
- Broker: ListClientInfos, GetClientInfo, DisconnectClient, GetConnectionStats
- Engine API: GET/DELETE /mqtt/connections/{id}, GET /mqtt/stats
- Control API adapter: bridges broker methods to engine API
- Engine client: HTTP client methods for MQTT endpoints
- Admin API: handlers with stats provider pattern, routes at /mqtt-connections/
- Auto-disconnect: cleanly disconnect all MQTT clients before stopping broker
  on mock update/delete, giving clients a chance to reconnect

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add StreamTracker to the gRPC server that tracks active streaming RPCs
(server-stream, client-stream, bidirectional). Unary RPCs are counted
but not tracked as active connections since they are stateless.

Key changes:
- StreamTracker in pkg/grpc/stream_tracker.go with Register/Unregister,
  message counting, Cancel/CancelAll, and aggregate stats
- Hooked into all three streaming handlers in server.go, replacing raw
  metrics.ActiveConnections calls with tracker-based management
- Admin API endpoints: GET/DELETE /grpc/connections/{id}, GET /grpc/stats
- Engine Control API routes and ControlAPIAdapter methods
- Engine client methods for the admin server to call through
- Auto-disconnect on mock update: unregisterHandlerLocked cancels active
  gRPC streams with codes.Unavailable before stopping the server, so
  clients with retry policies reconnect automatically
- Full test coverage: stream tracker unit tests, engine API handler
  tests, admin handler tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When SSE mocks are updated, deleted, toggled, or cleared, active SSE
connections now get disconnected automatically. This prevents clients
from streaming stale data after configuration changes.

Changes:
- Add DisconnectSSEByMock() to Handler (handler_protocol.go) which
  delegates to SSEConnectionManager.CloseByMock()
- Add case mock.TypeHTTP in unregisterHandlerLocked (mock_manager.go)
  that disconnects SSE connections when the HTTP mock has SSE config
- Add 6 integration tests covering update, delete, toggle, clear,
  multi-connection, and cross-mock isolation scenarios

The SSE close mechanism uses context cancellation: cancelling each
connection's context causes the event loop to exit, the response writer
to close, and the client to receive EOF. EventSource clients will
auto-reconnect and pick up the new configuration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
SSE API responses now return full connection details (path, clientIp,
userAgent, connectedAt, eventsSent, bytesSent, status) instead of the
slimmed SSEStreamInfo (ID, MockID only). This aligns SSE with the
WebSocket API which already returns rich connection info.

Changes:
- SSE list, get-single, and mock-specific list endpoints now return
  full SSEConnection structs from engineclient (admin/sse_handlers.go)
- Add mock-specific WebSocket connection endpoints for parity with SSE:
  GET /mocks/{id}/websocket/connections and
  DELETE /mocks/{id}/websocket/connections (admin/websocket_handlers.go)
- Register the new WebSocket routes (admin/routes.go)
- Add engine-level SSE handler tests covering list, get, close, and
  stats handlers (engine/api/handlers_test.go)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add `connections` subcommands (list, get, close) and `stats` commands
for WebSocket, SSE, MQTT, and gRPC protocols. WebSocket also gets a
`connections send` command with --binary flag support.

New files: ws_connections.go, sse.go, mqtt_connections.go, grpc_connections.go
Extended: client.go (16 new AdminClient interface methods + implementations),
print.go (formatStringSlice helper)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…onnection management

Critical fixes:
- C3: MQTT ConnectedAt now populated via OnSessionEstablished/OnDisconnect hooks
  with clientConnectedAt tracking map on the Broker struct
- C4: Canonical API types (WebSocketConnectionListResponse, SSEConnectionListResponse,
  MQTTConnectionListResponse, GRPCStreamListResponse) now use Stats field matching
  what admin handlers actually return, fixing CLI count:0 bug
- C5: CHANGELOG expanded with SSE, MQTT, gRPC, CLI, and UI connection management entries
- C6: API docs updated with correct WebSocket paths, full SSE/MQTT/gRPC connection
  management sections, mock-scoped WebSocket endpoints, and response examples

High-priority fixes:
- H1: gRPC stream handlers now return codes.Canceled for client-initiated cancellations
  and codes.Unavailable only for admin/mock-update cancellations, using per-stream
  adminCancelled tracking in StreamTracker
- H2: Removed dead code CancelAllWithStatus() and UnavailableError() from stream_tracker.go

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@pr0head
Copy link
Copy Markdown

pr0head commented Apr 8, 2026

Hi @zach-snell!

I was able to test only the WebSocket functionality — it works correctly. The control API works as expected, and when mock data for a WebSocket is changed, the connection is terminated (as intended).

A couple of points I’d like to highlight:

  1. sseStatsProvider.GetStats() returns sse.ConnectionStats{...}, while all other providers return engineclient.*Stats.
  2. Engine handlers always return 404 on any error from CloseMQTTConnection / CancelGRPCStream. It seems that a 500 status code would be more appropriate here — up to you (depending on whether the error level can be inferred).

Great work — I’m really glad to see the project evolving so quickly! ❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] WebSocket mock: close active connections on mock update [FEATURE] Admin API WebSocket management

2 participants