Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **WebSocket connection management API** — `GET /websocket/connections`, `GET /websocket/connections/{id}`, `DELETE /websocket/connections/{id}`, `POST /websocket/connections/{id}/send`, `GET /websocket/stats` added to the Admin API for real-time visibility, control, and server-initiated messaging of active WebSocket connections
- **Workspace-scoped stateful resources** — stateful resources, custom operations, and request logs are now isolated per workspace
- **`--workspace` persistent CLI flag** — scope any CLI command to a specific workspace without switching context
- **`?workspaceId=` API parameter** — all admin API endpoints now accept workspace filtering
Expand Down
80 changes: 69 additions & 11 deletions docs/src/content/docs/reference/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -945,21 +945,58 @@ Get SSE statistics.

### WebSocket Management

#### GET /admin/ws/connections
#### GET /websocket/connections

List active WebSocket connections.

#### GET /admin/ws/connections/{id}
**Response:**

Get connection details.
```json
{
"connections": [
{
"id": "ws-abc123",
"path": "/ws/orderbook",
"connectedAt": "2024-01-15T10:30:00Z",
"messagesSent": 42,
"messagesRecv": 7,
"status": "connected"
}
],
"stats": {
"totalConnections": 1,
"activeConnections": 1,
"totalMessagesSent": 42,
"totalMessagesRecv": 7,
"connectionsByMock": {}
}
}
```

#### DELETE /admin/ws/connections/{id}
#### GET /websocket/connections/{id}

Get details of a specific WebSocket connection.

**Response:** Single connection object (same shape as items in the list above). Returns `404` if not found.

#### DELETE /websocket/connections/{id}

Close a WebSocket connection.

#### POST /admin/ws/connections/{id}/send
**Response:**

```json
{
"message": "Connection closed",
"connection": "ws-abc123"
}
```

Returns `404` if the connection is not found.

#### POST /websocket/connections/{id}/send

Send a message to a specific connection.
Send a text or binary message to a specific active WebSocket connection.

**Request:**

Expand All @@ -970,18 +1007,39 @@ Send a message to a specific connection.
}
```

#### POST /admin/ws/broadcast
| Field | Type | Description |
|-------|------|-------------|
| `type` | string | Message type: `"text"` (default) or `"binary"` |
| `data` | string | Message payload. For `"text"`, a plain UTF-8 string. For `"binary"`, a **base64-encoded** string — the server decodes it before writing raw bytes to the WebSocket. |

Broadcast message to all connections.
**Response:**

#### GET /admin/ws/endpoints
```json
{
"message": "Message sent",
"connection": "ws-abc123",
"type": "text"
}
```

List configured WebSocket endpoints.
Returns `404` if the connection is not found.

#### GET /admin/ws/stats
#### GET /websocket/stats

Get WebSocket statistics.

**Response:**

```json
{
"totalConnections": 10,
"activeConnections": 2,
"totalMessagesSent": 500,
"totalMessagesRecv": 120,
"connectionsByMock": {}
}
```

---

### Stream Recordings (WebSocket/SSE)
Expand Down
104 changes: 104 additions & 0 deletions pkg/admin/engineclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,110 @@ func (c *Client) GetSSEStats(ctx context.Context) (*SSEStats, error) {
return &stats, nil
}

// ListWebSocketConnections returns all active WebSocket connections.
func (c *Client) ListWebSocketConnections(ctx context.Context) ([]*WebSocketConnection, error) {
resp, err := c.get(ctx, "/websocket/connections")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return nil, c.parseError(resp)
}

var result struct {
Connections []*WebSocketConnection `json:"connections"`
Count int `json:"count"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode WebSocket connections: %w", err)
}
return result.Connections, nil
}

// GetWebSocketConnection returns a specific WebSocket connection by ID.
func (c *Client) GetWebSocketConnection(ctx context.Context, id string) (*WebSocketConnection, error) {
resp, err := c.get(ctx, "/websocket/connections/"+url.PathEscape(id))
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound {
return nil, ErrNotFound
}
if resp.StatusCode != http.StatusOK {
return nil, c.parseError(resp)
}

var conn WebSocketConnection
if err := json.NewDecoder(resp.Body).Decode(&conn); err != nil {
return nil, fmt.Errorf("failed to decode WebSocket connection: %w", err)
}
return &conn, nil
}

// CloseWebSocketConnection closes a WebSocket connection by ID.
func (c *Client) CloseWebSocketConnection(ctx context.Context, id string) error {
resp, err := c.delete(ctx, "/websocket/connections/"+url.PathEscape(id))
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound {
return ErrNotFound
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return c.parseError(resp)
}
return nil
}

// SendToWebSocketConnection sends a text or binary message to a specific connection.
// msgType must be "text" (default) or "binary".
// For binary messages, data must be a base64-encoded string; the engine decodes
// it before sending the raw bytes over the WebSocket connection.
func (c *Client) SendToWebSocketConnection(ctx context.Context, id string, msgType string, data string) error {
body := map[string]string{
"type": msgType,
"data": data,
}
resp, err := c.post(ctx, "/websocket/connections/"+url.PathEscape(id)+"/send", body)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound {
return ErrNotFound
}
if resp.StatusCode != http.StatusOK {
return c.parseError(resp)
}
return nil
}

// GetWebSocketStats returns WebSocket statistics.
func (c *Client) GetWebSocketStats(ctx context.Context) (*WebSocketStats, error) {
resp, err := c.get(ctx, "/websocket/stats")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return nil, c.parseError(resp)
}

var stats WebSocketStats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, fmt.Errorf("failed to decode WebSocket stats: %w", err)
}
return &stats, nil
}

// HTTP helpers

func (c *Client) get(ctx context.Context, path string) (*http.Response, error) {
Expand Down
139 changes: 139 additions & 0 deletions pkg/admin/engineclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,145 @@ func TestHTTPMethods(t *testing.T) {
}
}

// --- WebSocket Tests ---

func TestListWebSocketConnections_Success(t *testing.T) {
resp := struct {
Connections []*WebSocketConnection `json:"connections"`
Count int `json:"count"`
}{
Connections: []*WebSocketConnection{{ID: "ws-1"}, {ID: "ws-2"}},
Count: 2,
}
_, c := mockServer(t, jsonHandler(t, 200, resp))

conns, err := c.ListWebSocketConnections(context.Background())
if err != nil {
t.Fatalf("ListWebSocketConnections() error = %v", err)
}
if len(conns) != 2 {
t.Errorf("ListWebSocketConnections() = %d, want 2", len(conns))
}
if conns[0].ID != "ws-1" {
t.Errorf("ListWebSocketConnections()[0].ID = %q, want %q", conns[0].ID, "ws-1")
}
}

func TestListWebSocketConnections_Error(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 503, nil))
_, err := c.ListWebSocketConnections(context.Background())
if err == nil {
t.Error("ListWebSocketConnections() error = nil, want error for 503")
}
}

func TestGetWebSocketConnection_Success(t *testing.T) {
conn := WebSocketConnection{ID: "ws-1", Status: "connected"}
_, c := mockServer(t, jsonHandler(t, 200, conn))

result, err := c.GetWebSocketConnection(context.Background(), "ws-1")
if err != nil {
t.Fatalf("GetWebSocketConnection() error = %v", err)
}
if result.ID != "ws-1" {
t.Errorf("GetWebSocketConnection().ID = %q, want %q", result.ID, "ws-1")
}
}

func TestGetWebSocketConnection_NotFound(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 404, nil))
_, err := c.GetWebSocketConnection(context.Background(), "missing")
if !errors.Is(err, ErrNotFound) {
t.Errorf("GetWebSocketConnection() error = %v, want ErrNotFound", err)
}
}

func TestCloseWebSocketConnection_Success(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 200, map[string]string{"message": "closed"}))
err := c.CloseWebSocketConnection(context.Background(), "ws-1")
if err != nil {
t.Errorf("CloseWebSocketConnection() error = %v, want nil", err)
}
}

func TestCloseWebSocketConnection_NoContent(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 204, nil))
err := c.CloseWebSocketConnection(context.Background(), "ws-1")
if err != nil {
t.Errorf("CloseWebSocketConnection() 204 error = %v, want nil", err)
}
}

func TestCloseWebSocketConnection_NotFound(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 404, nil))
err := c.CloseWebSocketConnection(context.Background(), "missing")
if !errors.Is(err, ErrNotFound) {
t.Errorf("CloseWebSocketConnection() error = %v, want ErrNotFound", err)
}
}

func TestSendToWebSocketConnection_Success(t *testing.T) {
var capturedBody map[string]string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewDecoder(r.Body).Decode(&capturedBody)
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]string{"message": "sent"})
}))
defer ts.Close()
c := New(ts.URL)

err := c.SendToWebSocketConnection(context.Background(), "ws-1", "text", "hello")
if err != nil {
t.Fatalf("SendToWebSocketConnection() error = %v", err)
}
if capturedBody["type"] != "text" {
t.Errorf("request body type = %q, want %q", capturedBody["type"], "text")
}
if capturedBody["data"] != "hello" {
t.Errorf("request body data = %q, want %q", capturedBody["data"], "hello")
}
}

func TestSendToWebSocketConnection_NotFound(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 404, nil))
err := c.SendToWebSocketConnection(context.Background(), "missing", "text", "hello")
if !errors.Is(err, ErrNotFound) {
t.Errorf("SendToWebSocketConnection() error = %v, want ErrNotFound", err)
}
}

func TestSendToWebSocketConnection_Error(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 500, ErrorResponse{Error: "engine_error", Message: "failed"}))
err := c.SendToWebSocketConnection(context.Background(), "ws-1", "text", "hello")
if err == nil {
t.Error("SendToWebSocketConnection() error = nil, want error for 500")
}
}

func TestGetWebSocketStats_Success(t *testing.T) {
stats := WebSocketStats{ActiveConnections: 3, TotalConnections: 10}
_, c := mockServer(t, jsonHandler(t, 200, stats))

result, err := c.GetWebSocketStats(context.Background())
if err != nil {
t.Fatalf("GetWebSocketStats() error = %v", err)
}
if result.ActiveConnections != 3 {
t.Errorf("GetWebSocketStats().ActiveConnections = %d, want 3", result.ActiveConnections)
}
if result.TotalConnections != 10 {
t.Errorf("GetWebSocketStats().TotalConnections = %d, want 10", result.TotalConnections)
}
}

func TestGetWebSocketStats_Error(t *testing.T) {
_, c := mockServer(t, jsonHandler(t, 503, nil))
_, err := c.GetWebSocketStats(context.Background())
if err == nil {
t.Error("GetWebSocketStats() error = nil, want error for 503")
}
}

// containsStr checks if s contains substr.
func containsStr(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsSubstring(s, substr))
Expand Down
4 changes: 3 additions & 1 deletion pkg/admin/engineclient/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package engineclient
import (
"errors"

types "github.com/getmockd/mockd/pkg/api/types"
"github.com/getmockd/mockd/pkg/api/types"
"github.com/getmockd/mockd/pkg/store"
)

Expand Down Expand Up @@ -47,6 +47,8 @@ type (
ProtocolHandler = types.ProtocolHandler
SSEConnection = types.SSEConnection
SSEStats = types.SSEStats
WebSocketConnection = types.WebSocketConnection
WebSocketStats = types.WebSocketStats
CustomOperationInfo = types.CustomOperationInfo
CustomOperationDetail = types.CustomOperationDetail
CustomOperationStep = types.CustomOperationStep
Expand Down
Loading