diff --git a/build/dev/deps/compose.yml b/build/dev/deps/compose.yml index 944348d1..6b486502 100644 --- a/build/dev/deps/compose.yml +++ b/build/dev/deps/compose.yml @@ -14,39 +14,39 @@ services: - redis:/data # ============================== Log Store ============================== - # clickhouse: - # image: clickhouse/clickhouse-server:24-alpine - # environment: - # CLICKHOUSE_DB: outpost - # ports: - # # tcp - # - 9000:9000 - # # # http - # # - 8123:8123 - # # # postgresql - # # - 9005:9005 - # volumes: - # # optional to persist data locally - # - clickhouse:/var/lib/clickhouse/ - # # optional to add own config - # # - ./extra-config.xml:/etc/clickhouse-server/config.d/extra-config.xml - # # optional to add users or enable settings for a default user - # # - ./user.xml:/etc/clickhouse-server/users.d/user.xml - # # qol to mount own sql scripts to run them from inside container with - # # clickhouse client < /sql/myquery.sql - # # - ./sql:/sql - # # adjust mem_limit and cpus to machine - # # mem_limit: 12G - # # cpus: 4 - # ulimits: - # nofile: - # soft: 262144 - # hard: 262144 - # healthcheck: - # test: ["CMD", "wget", "--spider", "-q", "http://127.0.0.1:8123/ping"] - # interval: 1s - # timeout: 1s - # retries: 30 + clickhouse: + image: clickhouse/clickhouse-server:24-alpine + environment: + CLICKHOUSE_DB: outpost + ports: + # tcp + - 9000:9000 + # # http + # - 8123:8123 + # # postgresql + # - 9005:9005 + volumes: + # optional to persist data locally + - clickhouse:/var/lib/clickhouse/ + # optional to add own config + # - ./extra-config.xml:/etc/clickhouse-server/config.d/extra-config.xml + # optional to add users or enable settings for a default user + # - ./user.xml:/etc/clickhouse-server/users.d/user.xml + # qol to mount own sql scripts to run them from inside container with + # clickhouse client < /sql/myquery.sql + # - ./sql:/sql + # adjust mem_limit and cpus to machine + # mem_limit: 12G + # cpus: 4 + ulimits: + nofile: + soft: 262144 + hard: 262144 + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://127.0.0.1:8123/ping"] + interval: 1s + timeout: 1s + retries: 30 postgres: image: postgres:16-alpine @@ -101,8 +101,8 @@ services: volumes: redis: driver: local - # clickhouse: - # driver: local + clickhouse: + driver: local postgres: driver: local rabbitmq: diff --git a/cmd/e2e/log_test.go b/cmd/e2e/log_test.go new file mode 100644 index 00000000..90a95cf3 --- /dev/null +++ b/cmd/e2e/log_test.go @@ -0,0 +1,932 @@ +package e2e_test + +import ( + "fmt" + "net/http" + "time" + + "github.com/hookdeck/outpost/cmd/e2e/httpclient" + "github.com/hookdeck/outpost/internal/idgen" +) + +// TestLogAPI tests the new Log API endpoints (deliveries, events). +// +// Setup: +// 1. Create a tenant +// 2. Configure mock webhook server to accept deliveries +// 3. Create a destination pointing to the mock server +// 4. Publish an event and wait for delivery to complete +// +// Test Cases: +// - GET /:tenantID/deliveries - List all deliveries with proper response structure +// - GET /:tenantID/deliveries?destination_id=X - Filter deliveries by destination +// - GET /:tenantID/deliveries?event_id=X - Filter deliveries by event +// - GET /:tenantID/deliveries?expand=event - Expand event summary (without data) +// - GET /:tenantID/deliveries?expand=event.data - Expand full event with payload data +// - GET /:tenantID/events/:eventID - Retrieve a single event with full details +// - GET /:tenantID/events/:eventID (non-existent) - Returns 404 +func (suite *basicSuite) TestLogAPI() { + tenantID := idgen.String() + destinationID := idgen.Destination() + eventID := idgen.Event() + + // Setup: Create tenant, destination, and publish an event + setupTests := []APITest{ + { + Name: "PUT /:tenantID - create tenant", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPUT, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "PUT mockserver/destinations - setup mock", + Request: httpclient.Request{ + Method: httpclient.MethodPUT, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "POST /:tenantID/destinations - create destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "topics": "*", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "POST /publish - publish event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/publish", + Body: map[string]interface{}{ + "id": eventID, + "tenant_id": tenantID, + "topic": "user.created", + "eligible_for_retry": true, + "data": map[string]interface{}{ + "user_id": "123", + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusAccepted, + }, + }, + }, + } + suite.RunAPITests(suite.T(), setupTests) + + // Wait for delivery to complete + time.Sleep(2 * time.Second) + + // Test the new Log API endpoints + logAPITests := []APITest{ + // GET /:tenantID/deliveries - list deliveries + { + Name: "GET /:tenantID/deliveries - list all deliveries", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"data"}, + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + "items": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "event", "destination", "status", "delivered_at"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"type": "string"}, + "event": map[string]interface{}{"type": "string"}, // Event ID when not expanded + "destination": map[string]interface{}{"const": destinationID}, + "status": map[string]interface{}{"type": "string"}, + "delivered_at": map[string]interface{}{"type": "string"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/deliveries?destination_id=X - filter by destination + { + Name: "GET /:tenantID/deliveries?destination_id=X - filter by destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?destination_id=" + destinationID, + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"data"}, + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/deliveries?event_id=X - filter by event + { + Name: "GET /:tenantID/deliveries?event_id=X - filter by event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?event_id=" + eventID, + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"data"}, + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/deliveries?expand=event - expand event (without data) + { + Name: "GET /:tenantID/deliveries?expand=event - expand event summary", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?expand=event", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + "items": map[string]interface{}{ + "type": "object", + "required": []interface{}{"event"}, + "properties": map[string]interface{}{ + "event": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "topic", "time"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"type": "string"}, + "topic": map[string]interface{}{"type": "string"}, + "time": map[string]interface{}{"type": "string"}, + }, + // expand=event should NOT include data + "not": map[string]interface{}{ + "required": []interface{}{"data"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/deliveries?expand=event.data - expand full event with data + { + Name: "GET /:tenantID/deliveries?expand=event.data - expand full event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?expand=event.data", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + "items": map[string]interface{}{ + "type": "object", + "required": []interface{}{"event"}, + "properties": map[string]interface{}{ + "event": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "topic", "time", "data"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"const": eventID}, + "topic": map[string]interface{}{"const": "user.created"}, + "time": map[string]interface{}{"type": "string"}, + "data": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "user_id": map[string]interface{}{"const": "123"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/events/:eventID - retrieve single event + { + Name: "GET /:tenantID/events/:eventID - retrieve event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events/" + eventID, + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "topic", "time", "data"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"const": eventID}, + "topic": map[string]interface{}{"const": "user.created"}, + "data": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "user_id": map[string]interface{}{"const": "123"}, + }, + }, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/events/:eventID - non-existent event + { + Name: "GET /:tenantID/events/:eventID - not found", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events/" + idgen.Event(), + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusNotFound, + }, + }, + }, + } + suite.RunAPITests(suite.T(), logAPITests) + + // Cleanup + cleanupTests := []APITest{ + { + Name: "DELETE mockserver/destinations/:destinationID", + Request: httpclient.Request{ + Method: httpclient.MethodDELETE, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations/" + destinationID, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "DELETE /:tenantID", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodDELETE, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + } + suite.RunAPITests(suite.T(), cleanupTests) +} + +// TestRetryAPI tests the retry endpoint. +// +// Setup: +// 1. Create a tenant +// 2. Configure mock webhook server to FAIL (return 500) +// 3. Create a destination pointing to the mock server +// 4. Publish an event with eligible_for_retry=false (fails once, no auto-retry) +// 5. Wait for delivery to fail, then fetch the delivery ID +// 6. Update mock server to SUCCEED (return 200) +// +// Test Cases: +// - POST /:tenantID/deliveries/:deliveryID/retry - Successful retry returns 202 Accepted +// - POST /:tenantID/deliveries/:deliveryID/retry (non-existent) - Returns 404 +// - Verify retry created new delivery - Event now has 2+ deliveries +// - POST /:tenantID/deliveries/:deliveryID/retry (disabled destination) - Returns 400 +func (suite *basicSuite) TestRetryAPI() { + tenantID := idgen.String() + destinationID := idgen.Destination() + eventID := idgen.Event() + + // Setup: create tenant, destination with failing webhook, and publish event + setupTests := []APITest{ + { + Name: "PUT /:tenantID - create tenant", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPUT, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "PUT mockserver/destinations - setup mock to fail", + Request: httpclient.Request{ + Method: httpclient.MethodPUT, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + "response": map[string]interface{}{ + "status": 500, // Fail deliveries + }, + }, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "POST /:tenantID/destinations - create destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "topics": "*", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "POST /publish - publish event (will fail)", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/publish", + Body: map[string]interface{}{ + "id": eventID, + "tenant_id": tenantID, + "topic": "user.created", + "eligible_for_retry": false, // Disable auto-retry + "data": map[string]interface{}{ + "user_id": "456", + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusAccepted, + }, + }, + }, + } + suite.RunAPITests(suite.T(), setupTests) + + // Wait for delivery to complete (and fail) + time.Sleep(2 * time.Second) + + // Get the delivery ID + deliveriesResp, err := suite.client.Do(suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?event_id=" + eventID, + })) + suite.Require().NoError(err) + suite.Require().Equal(http.StatusOK, deliveriesResp.StatusCode) + + body := deliveriesResp.Body.(map[string]interface{}) + data := body["data"].([]interface{}) + suite.Require().NotEmpty(data, "should have at least one delivery") + firstDelivery := data[0].(map[string]interface{}) + deliveryID := firstDelivery["id"].(string) + + // Update mock to succeed for retry + updateMockTests := []APITest{ + { + Name: "PUT mockserver/destinations - setup mock to succeed", + Request: httpclient.Request{ + Method: httpclient.MethodPUT, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + "response": map[string]interface{}{ + "status": 200, // Now succeed + }, + }, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + } + suite.RunAPITests(suite.T(), updateMockTests) + + // Test retry endpoint + retryTests := []APITest{ + // POST /:tenantID/deliveries/:deliveryID/retry - successful retry + { + Name: "POST /:tenantID/deliveries/:deliveryID/retry - retry delivery", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/deliveries/" + deliveryID + "/retry", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusAccepted, + Body: map[string]interface{}{ + "success": true, + }, + }, + }, + }, + // POST /:tenantID/deliveries/:deliveryID/retry - non-existent delivery + { + Name: "POST /:tenantID/deliveries/:deliveryID/retry - not found", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/deliveries/" + idgen.Delivery() + "/retry", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusNotFound, + }, + }, + }, + } + suite.RunAPITests(suite.T(), retryTests) + + // Wait for retry delivery to complete + time.Sleep(2 * time.Second) + + // Verify we have more deliveries after retry + verifyTests := []APITest{ + { + Name: "GET /:tenantID/deliveries?event_id=X - verify retry created new delivery", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/deliveries?event_id=" + eventID, + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 2, // Original + retry + }, + }, + }, + }, + }, + }, + }, + } + suite.RunAPITests(suite.T(), verifyTests) + + // Test retry on disabled destination + disableTests := []APITest{ + { + Name: "PUT /:tenantID/destinations/:destinationID/disable", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPUT, + Path: "/" + tenantID + "/destinations/" + destinationID + "/disable", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "POST /:tenantID/deliveries/:deliveryID/retry - disabled destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/deliveries/" + deliveryID + "/retry", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]interface{}{ + "message": "Destination is disabled", + }, + }, + }, + }, + } + suite.RunAPITests(suite.T(), disableTests) + + // Cleanup + cleanupTests := []APITest{ + { + Name: "DELETE mockserver/destinations/:destinationID", + Request: httpclient.Request{ + Method: httpclient.MethodDELETE, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations/" + destinationID, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "DELETE /:tenantID", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodDELETE, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + } + suite.RunAPITests(suite.T(), cleanupTests) +} + +// TestLegacyLogAPI tests the deprecated legacy endpoints for backward compatibility. +// All legacy endpoints return "Deprecation: true" header to signal migration. +// +// Setup: +// 1. Create a tenant +// 2. Configure mock webhook server to accept deliveries +// 3. Create a destination pointing to the mock server +// 4. Publish an event and wait for delivery to complete +// +// Test Cases: +// - GET /:tenantID/destinations/:destID/events - Legacy list events (returns {data, count}) +// - GET /:tenantID/destinations/:destID/events/:eventID - Legacy retrieve event +// - GET /:tenantID/events/:eventID/deliveries - Legacy list deliveries (returns bare array, not {data}) +// - POST /:tenantID/destinations/:destID/events/:eventID/retry - Legacy retry endpoint +// +// All responses include: +// - Deprecation: true header +// - X-Deprecated-Message header with migration guidance +func (suite *basicSuite) TestLegacyLogAPI() { + tenantID := idgen.String() + destinationID := idgen.Destination() + eventID := idgen.Event() + + // Setup + setupTests := []APITest{ + { + Name: "PUT /:tenantID - create tenant", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPUT, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "PUT mockserver/destinations - setup mock", + Request: httpclient.Request{ + Method: httpclient.MethodPUT, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "POST /:tenantID/destinations - create destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "topics": "*", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "POST /publish - publish event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/publish", + Body: map[string]interface{}{ + "id": eventID, + "tenant_id": tenantID, + "topic": "user.created", + "eligible_for_retry": true, + "data": map[string]interface{}{ + "user_id": "789", + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusAccepted, + }, + }, + }, + } + suite.RunAPITests(suite.T(), setupTests) + + // Wait for delivery + time.Sleep(2 * time.Second) + + // Test legacy endpoints - all should return deprecation headers + legacyTests := []APITest{ + // GET /:tenantID/destinations/:destinationID/events - legacy list events by destination + { + Name: "GET /:tenantID/destinations/:destinationID/events - legacy endpoint", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationID + "/events", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "headers": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "Deprecation": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "const": "true", + }, + }, + }, + }, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"data", "count"}, + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + }, + "count": map[string]interface{}{"type": "number"}, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/destinations/:destinationID/events/:eventID - legacy retrieve event + { + Name: "GET /:tenantID/destinations/:destinationID/events/:eventID - legacy endpoint", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationID + "/events/" + eventID, + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "headers": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "Deprecation": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "const": "true", + }, + }, + }, + }, + "body": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "topic"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"const": eventID}, + "topic": map[string]interface{}{"const": "user.created"}, + }, + }, + }, + }, + }, + }, + // GET /:tenantID/events/:eventID/deliveries - legacy list deliveries by event + { + Name: "GET /:tenantID/events/:eventID/deliveries - legacy endpoint (returns bare array)", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events/" + eventID + "/deliveries", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 200}, + "headers": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "Deprecation": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "const": "true", + }, + }, + }, + }, + // Legacy endpoint returns bare array, not {data: [...]} + "body": map[string]interface{}{ + "type": "array", + "minItems": 1, + "items": map[string]interface{}{ + "type": "object", + "required": []interface{}{"id", "status", "delivered_at"}, + "properties": map[string]interface{}{ + "id": map[string]interface{}{"type": "string"}, + "status": map[string]interface{}{"type": "string"}, + "delivered_at": map[string]interface{}{"type": "string"}, + }, + }, + }, + }, + }, + }, + }, + // POST /:tenantID/destinations/:destinationID/events/:eventID/retry - legacy retry + { + Name: "POST /:tenantID/destinations/:destinationID/events/:eventID/retry - legacy endpoint", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations/" + destinationID + "/events/" + eventID + "/retry", + }), + Expected: APITestExpectation{ + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{"const": 202}, + "headers": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "Deprecation": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "const": "true", + }, + }, + }, + }, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "success": map[string]interface{}{"const": true}, + }, + }, + }, + }, + }, + }, + } + suite.RunAPITests(suite.T(), legacyTests) + + // Cleanup + cleanupTests := []APITest{ + { + Name: "DELETE mockserver/destinations/:destinationID", + Request: httpclient.Request{ + Method: httpclient.MethodDELETE, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations/" + destinationID, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + { + Name: "DELETE /:tenantID", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodDELETE, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + } + suite.RunAPITests(suite.T(), cleanupTests) +} diff --git a/internal/apirouter/legacy_handlers.go b/internal/apirouter/legacy_handlers.go new file mode 100644 index 00000000..9b00adcc --- /dev/null +++ b/internal/apirouter/legacy_handlers.go @@ -0,0 +1,260 @@ +package apirouter + +import ( + "errors" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/hookdeck/outpost/internal/deliverymq" + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/logstore" + "github.com/hookdeck/outpost/internal/models" + "go.uber.org/zap" +) + +var ( + ErrDestinationDisabled = errors.New("destination is disabled") +) + +// LegacyHandlers provides backward-compatible endpoints for the old API. +// These handlers are deprecated and will be removed in a future version. +type LegacyHandlers struct { + logger *logging.Logger + entityStore models.EntityStore + logStore logstore.LogStore + deliveryMQ *deliverymq.DeliveryMQ +} + +func NewLegacyHandlers( + logger *logging.Logger, + entityStore models.EntityStore, + logStore logstore.LogStore, + deliveryMQ *deliverymq.DeliveryMQ, +) *LegacyHandlers { + return &LegacyHandlers{ + logger: logger, + entityStore: entityStore, + logStore: logStore, + deliveryMQ: deliveryMQ, + } +} + +// setDeprecationHeader adds deprecation warning headers to the response. +func setDeprecationHeader(c *gin.Context, newEndpoint string) { + c.Header("Deprecation", "true") + c.Header("X-Deprecated-Message", "This endpoint is deprecated. Use "+newEndpoint+" instead.") +} + +// RetryByEventDestination handles the legacy retry endpoint: +// POST /:tenantID/destinations/:destinationID/events/:eventID/retry +// +// This shim finds the latest delivery for the event+destination pair and retries it. +// Deprecated: Use POST /:tenantID/deliveries/:deliveryID/retry instead. +func (h *LegacyHandlers) RetryByEventDestination(c *gin.Context) { + setDeprecationHeader(c, "POST /:tenantID/deliveries/:deliveryID/retry") + + tenant := mustTenantFromContext(c) + if tenant == nil { + return + } + destinationID := c.Param("destinationID") + eventID := c.Param("eventID") + + // 1. Check destination exists and is enabled + destination, err := h.entityStore.RetrieveDestination(c.Request.Context(), tenant.ID, destinationID) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + if destination == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("destination")) + return + } + if destination.DisabledAt != nil { + AbortWithError(c, http.StatusBadRequest, NewErrBadRequest(ErrDestinationDisabled)) + return + } + + // 2. Retrieve event + event, err := h.logStore.RetrieveEvent(c.Request.Context(), logstore.RetrieveEventRequest{ + TenantID: tenant.ID, + EventID: eventID, + }) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + if event == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("event")) + return + } + + // 3. Create and publish retry delivery event + deliveryEvent := models.NewManualDeliveryEvent(*event, destination.ID) + + if err := h.deliveryMQ.Publish(c.Request.Context(), deliveryEvent); err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + + h.logger.Ctx(c.Request.Context()).Audit("manual retry initiated (legacy)", + zap.String("event_id", event.ID), + zap.String("destination_id", destination.ID)) + + c.JSON(http.StatusAccepted, gin.H{ + "success": true, + }) +} + +// ListEventsByDestination handles the legacy endpoint: +// GET /:tenantID/destinations/:destinationID/events +// +// This shim queries deliveries filtered by destination and returns unique events. +// Deprecated: Use GET /:tenantID/deliveries?destination_id=X&expand=event instead. +func (h *LegacyHandlers) ListEventsByDestination(c *gin.Context) { + setDeprecationHeader(c, "GET /:tenantID/deliveries?destination_id=X&expand=event") + + tenant := mustTenantFromContext(c) + if tenant == nil { + return + } + destinationID := c.Param("destinationID") + + // Parse pagination params + limit := 100 + if limitStr := c.Query("limit"); limitStr != "" { + if parsed, err := strconv.Atoi(limitStr); err == nil && parsed > 0 { + limit = parsed + } + } + + // Query deliveries for this destination with pagination + response, err := h.logStore.ListDeliveryEvent(c.Request.Context(), logstore.ListDeliveryEventRequest{ + TenantID: tenant.ID, + DestinationIDs: []string{destinationID}, + Limit: limit, + Next: c.Query("next"), + Prev: c.Query("prev"), + SortBy: "event_time", + SortOrder: "desc", + }) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + + // Extract unique events (by event ID, keep first occurrence) + seen := make(map[string]bool) + events := []models.Event{} + for _, de := range response.Data { + if !seen[de.Event.ID] { + seen[de.Event.ID] = true + events = append(events, de.Event) + } + } + + // Return empty array (not null) if no events + if len(events) == 0 { + c.JSON(http.StatusOK, gin.H{ + "data": []models.Event{}, + "next": "", + "prev": "", + "count": 0, + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "data": events, + "next": response.Next, + "prev": response.Prev, + "count": len(events), + }) +} + +// RetrieveEventByDestination handles the legacy endpoint: +// GET /:tenantID/destinations/:destinationID/events/:eventID +// +// Deprecated: Use GET /:tenantID/events/:eventID instead. +func (h *LegacyHandlers) RetrieveEventByDestination(c *gin.Context) { + setDeprecationHeader(c, "GET /:tenantID/events/:eventID") + + tenant := mustTenantFromContext(c) + if tenant == nil { + return + } + eventID := c.Param("eventID") + // destinationID is available but not strictly needed for retrieval + + event, err := h.logStore.RetrieveEvent(c.Request.Context(), logstore.RetrieveEventRequest{ + TenantID: tenant.ID, + EventID: eventID, + }) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + if event == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("event")) + return + } + + c.JSON(http.StatusOK, event) +} + +// LegacyDeliveryResponse matches the old delivery response format. +type LegacyDeliveryResponse struct { + ID string `json:"id"` + DeliveredAt string `json:"delivered_at"` + Status string `json:"status"` + Code string `json:"code"` + ResponseData map[string]interface{} `json:"response_data"` +} + +// ListDeliveriesByEvent handles the legacy endpoint: +// GET /:tenantID/events/:eventID/deliveries +// +// Deprecated: Use GET /:tenantID/deliveries?event_id=X instead. +func (h *LegacyHandlers) ListDeliveriesByEvent(c *gin.Context) { + setDeprecationHeader(c, "GET /:tenantID/deliveries?event_id=X") + + tenant := mustTenantFromContext(c) + if tenant == nil { + return + } + eventID := c.Param("eventID") + + // Query deliveries for this event + response, err := h.logStore.ListDeliveryEvent(c.Request.Context(), logstore.ListDeliveryEventRequest{ + TenantID: tenant.ID, + EventID: eventID, + Limit: 100, + SortBy: "delivery_time", + SortOrder: "desc", + }) + if err != nil { + AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) + return + } + + // Return empty array (not null) if no deliveries + if len(response.Data) == 0 { + c.JSON(http.StatusOK, []LegacyDeliveryResponse{}) + return + } + + // Transform to legacy delivery response format (bare array) + deliveries := make([]LegacyDeliveryResponse, len(response.Data)) + for i, de := range response.Data { + deliveries[i] = LegacyDeliveryResponse{ + ID: de.Delivery.ID, + DeliveredAt: de.Delivery.Time.UTC().Format("2006-01-02T15:04:05Z07:00"), + Status: de.Delivery.Status, + Code: de.Delivery.Code, + ResponseData: de.Delivery.ResponseData, + } + } + + c.JSON(http.StatusOK, deliveries) +} diff --git a/internal/apirouter/legacy_handlers_test.go b/internal/apirouter/legacy_handlers_test.go new file mode 100644 index 00000000..14e1880e --- /dev/null +++ b/internal/apirouter/legacy_handlers_test.go @@ -0,0 +1,348 @@ +package apirouter_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hookdeck/outpost/internal/idgen" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLegacyRetryByEventDestination(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant and destination + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should retry via legacy endpoint and return deprecation header", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/destinations/"+destinationID+"/events/"+eventID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusAccepted, w.Code) + assert.Equal(t, "true", w.Header().Get("Deprecation")) + assert.Contains(t, w.Header().Get("X-Deprecated-Message"), "POST /:tenantID/deliveries/:deliveryID/retry") + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + assert.Equal(t, true, response["success"]) + }) + + t.Run("should return 404 for non-existent event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/destinations/"+destinationID+"/events/nonexistent/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 404 for non-existent destination", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/destinations/nonexistent/events/"+eventID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 400 when destination is disabled", func(t *testing.T) { + // Create a disabled destination + disabledDestinationID := idgen.Destination() + disabledAt := time.Now() + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: disabledDestinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + DisabledAt: &disabledAt, + })) + + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/destinations/"+disabledDestinationID+"/events/"+eventID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) + }) +} + +func TestLegacyListEventsByDestination(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant and destination + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed delivery events + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should list events for destination with deprecation header", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/destinations/"+destinationID+"/events", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "true", w.Header().Get("Deprecation")) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + events := response["data"].([]interface{}) + assert.Len(t, events, 1) + + firstEvent := events[0].(map[string]interface{}) + assert.Equal(t, eventID, firstEvent["id"]) + assert.Equal(t, "order.created", firstEvent["topic"]) + }) +} + +func TestLegacyRetrieveEventByDestination(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant and destination + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should retrieve event by destination with deprecation header", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/destinations/"+destinationID+"/events/"+eventID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "true", w.Header().Get("Deprecation")) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + assert.Equal(t, eventID, response["id"]) + assert.Equal(t, "order.created", response["topic"]) + }) + + t.Run("should return 404 for non-existent event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/destinations/"+destinationID+"/events/nonexistent", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) +} + +func TestLegacyListDeliveriesByEvent(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant and destination + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should list deliveries for event with deprecation header", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/events/"+eventID+"/deliveries", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "true", w.Header().Get("Deprecation")) + + // Old format returns bare array, not {data: [...]} + var deliveries []map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &deliveries)) + + assert.Len(t, deliveries, 1) + assert.Equal(t, deliveryID, deliveries[0]["id"]) + assert.Equal(t, "success", deliveries[0]["status"]) + }) + + t.Run("should return empty list for non-existent event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/events/nonexistent/deliveries", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + // Old format returns bare array + var deliveries []map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &deliveries)) + + assert.Len(t, deliveries, 0) + }) +} diff --git a/internal/apirouter/log_handlers.go b/internal/apirouter/log_handlers.go index e827021b..39b5d0c1 100644 --- a/internal/apirouter/log_handlers.go +++ b/internal/apirouter/log_handlers.go @@ -26,20 +26,136 @@ func NewLogHandlers( } } -func (h *LogHandlers) ListEvent(c *gin.Context) { - h.listEvent(c, c.QueryArray("destination_id")) +// ExpandOptions represents which fields to expand in the response +type ExpandOptions struct { + Event bool + EventData bool + Destination bool } -func (h *LogHandlers) ListEventByDestination(c *gin.Context) { - h.listEvent(c, []string{c.Param("destinationID")}) +func parseExpandOptions(c *gin.Context) ExpandOptions { + opts := ExpandOptions{} + for _, e := range c.QueryArray("expand") { + switch e { + case "event": + opts.Event = true + case "event.data": + opts.Event = true + opts.EventData = true + case "destination": + opts.Destination = true + } + } + return opts +} + +// API Response types + +// APIDelivery is the API response for a delivery +type APIDelivery struct { + ID string `json:"id"` + Status string `json:"status"` + DeliveredAt time.Time `json:"delivered_at"` + Code string `json:"code,omitempty"` + ResponseData map[string]interface{} `json:"response_data,omitempty"` + Attempt int `json:"attempt"` + + // Expandable fields - string (ID) or object depending on expand + Event interface{} `json:"event"` + Destination string `json:"destination"` +} + +// APIEventSummary is the event object when expand=event (without data) +type APIEventSummary struct { + ID string `json:"id"` + Topic string `json:"topic"` + Time time.Time `json:"time"` + EligibleForRetry bool `json:"eligible_for_retry"` + Metadata map[string]string `json:"metadata,omitempty"` } -func (h *LogHandlers) listEvent(c *gin.Context, destinationIDs []string) { +// APIEventFull is the event object when expand=event.data +type APIEventFull struct { + ID string `json:"id"` + Topic string `json:"topic"` + Time time.Time `json:"time"` + EligibleForRetry bool `json:"eligible_for_retry"` + Metadata map[string]string `json:"metadata,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` +} + +// APIEvent is the API response for retrieving a single event +type APIEvent struct { + ID string `json:"id"` + Topic string `json:"topic"` + Time time.Time `json:"time"` + EligibleForRetry bool `json:"eligible_for_retry"` + Metadata map[string]string `json:"metadata,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` +} + +// ListDeliveriesResponse is the response for ListDeliveries +type ListDeliveriesResponse struct { + Data []APIDelivery `json:"data"` + Next string `json:"next,omitempty"` + Prev string `json:"prev,omitempty"` +} + +// toAPIDelivery converts a DeliveryEvent to APIDelivery with expand options +func toAPIDelivery(de *models.DeliveryEvent, opts ExpandOptions) APIDelivery { + api := APIDelivery{ + ID: de.Delivery.ID, + Attempt: de.Attempt, + Destination: de.DestinationID, + } + + // Set delivery fields if delivery exists + if de.Delivery != nil { + api.Status = de.Delivery.Status + api.DeliveredAt = de.Delivery.Time + api.Code = de.Delivery.Code + api.ResponseData = de.Delivery.ResponseData + } + + // Handle event expansion + if opts.EventData { + api.Event = APIEventFull{ + ID: de.Event.ID, + Topic: de.Event.Topic, + Time: de.Event.Time, + EligibleForRetry: de.Event.EligibleForRetry, + Metadata: de.Event.Metadata, + Data: de.Event.Data, + } + } else if opts.Event { + api.Event = APIEventSummary{ + ID: de.Event.ID, + Topic: de.Event.Topic, + Time: de.Event.Time, + EligibleForRetry: de.Event.EligibleForRetry, + Metadata: de.Event.Metadata, + } + } else { + api.Event = de.Event.ID + } + + // TODO: Handle destination expansion + // This would require injecting EntityStore into LogHandlers and batch-fetching + // destinations by ID. Consider if this is needed - clients can fetch destination + // details separately via GET /destinations/:id if needed. + + return api +} + +// ListDeliveries handles GET /:tenantID/deliveries +// Query params: event_id, destination_id, status, topic[], start, end, limit, next, prev, expand[] +func (h *LogHandlers) ListDeliveries(c *gin.Context) { tenant := mustTenantFromContext(c) if tenant == nil { return } + // Parse time filters var start, end *time.Time if startStr := c.Query("start"); startStr != "" { t, err := time.Parse(time.RFC3339, startStr) @@ -70,137 +186,109 @@ func (h *LogHandlers) listEvent(c *gin.Context, destinationIDs []string) { end = &t } - limitStr := c.Query("limit") - limit, err := strconv.Atoi(limitStr) - if err != nil { - limit = 100 + // Parse limit + limit := 100 + if limitStr := c.Query("limit"); limitStr != "" { + if l, err := strconv.Atoi(limitStr); err == nil && l > 0 { + limit = l + } } - response, err := h.logStore.ListEvent(c.Request.Context(), logstore.ListEventRequest{ - Next: c.Query("next"), - Prev: c.Query("prev"), - Limit: limit, - Start: start, - End: end, + + // Parse destination_id (single value for now) + var destinationIDs []string + if destID := c.Query("destination_id"); destID != "" { + destinationIDs = []string{destID} + } + + // Build request + req := logstore.ListDeliveryEventRequest{ TenantID: tenant.ID, + EventID: c.Query("event_id"), DestinationIDs: destinationIDs, - Topics: c.QueryArray("topic"), Status: c.Query("status"), - }) + Topics: c.QueryArray("topic"), + DeliveryStart: start, + DeliveryEnd: end, + Limit: limit, + Next: c.Query("next"), + Prev: c.Query("prev"), + } + + // Call logstore + response, err := h.logStore.ListDeliveryEvent(c.Request.Context(), req) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - if len(response.Data) == 0 { - // Return an empty array instead of null - c.JSON(http.StatusOK, gin.H{ - "data": []models.Event{}, - "next": "", - "prev": "", - "count": 0, - }) - return + + // Parse expand options + expandOpts := parseExpandOptions(c) + + // Transform to API response + apiDeliveries := make([]APIDelivery, len(response.Data)) + for i, de := range response.Data { + apiDeliveries[i] = toAPIDelivery(de, expandOpts) } - c.JSON(http.StatusOK, gin.H{ - "data": response.Data, - "next": response.Next, - "prev": response.Prev, - "count": response.Count, + + c.JSON(http.StatusOK, ListDeliveriesResponse{ + Data: apiDeliveries, + Next: response.Next, + Prev: response.Prev, }) } +// RetrieveEvent handles GET /:tenantID/events/:eventID func (h *LogHandlers) RetrieveEvent(c *gin.Context) { tenant := mustTenantFromContext(c) if tenant == nil { return } eventID := c.Param("eventID") - event, err := h.logStore.RetrieveEvent(c.Request.Context(), tenant.ID, eventID) + event, err := h.logStore.RetrieveEvent(c.Request.Context(), logstore.RetrieveEventRequest{ + TenantID: tenant.ID, + EventID: eventID, + }) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } if event == nil { - c.Status(http.StatusNotFound) + AbortWithError(c, http.StatusNotFound, NewErrNotFound("event")) return } - c.JSON(http.StatusOK, event) + c.JSON(http.StatusOK, APIEvent{ + ID: event.ID, + Topic: event.Topic, + Time: event.Time, + EligibleForRetry: event.EligibleForRetry, + Metadata: event.Metadata, + Data: event.Data, + }) } -func (h *LogHandlers) RetrieveEventByDestination(c *gin.Context) { +// RetrieveDelivery handles GET /:tenantID/deliveries/:deliveryID +func (h *LogHandlers) RetrieveDelivery(c *gin.Context) { tenant := mustTenantFromContext(c) if tenant == nil { return } - destinationID := c.Param("destinationID") - eventID := c.Param("eventID") - event, err := h.logStore.RetrieveEventByDestination(c.Request.Context(), tenant.ID, destinationID, eventID) - if err != nil { - AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) - return - } - if event == nil { - c.Status(http.StatusNotFound) - return - } - c.JSON(http.StatusOK, event) -} + deliveryID := c.Param("deliveryID") -type DeliveryResponse struct { - ID string `json:"id"` - DeliveredAt string `json:"delivered_at"` - Status string `json:"status"` - Code string `json:"code"` - ResponseData map[string]interface{} `json:"response_data"` -} - -func (h *LogHandlers) ListDeliveryByEvent(c *gin.Context) { - event := h.mustEventWithTenant(c, c.Param("eventID")) - if event == nil { - return - } - deliveries, err := h.logStore.ListDelivery(c.Request.Context(), logstore.ListDeliveryRequest{ - EventID: event.ID, - DestinationID: c.Query("destination_id"), + deliveryEvent, err := h.logStore.RetrieveDeliveryEvent(c.Request.Context(), logstore.RetrieveDeliveryEventRequest{ + TenantID: tenant.ID, + DeliveryID: deliveryID, }) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - if len(deliveries) == 0 { - // Return an empty array instead of null - c.JSON(http.StatusOK, []DeliveryResponse{}) + if deliveryEvent == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("delivery")) return } - deliveryData := make([]DeliveryResponse, len(deliveries)) - for i, delivery := range deliveries { - deliveryData[i] = DeliveryResponse{ - ID: delivery.ID, - DeliveredAt: delivery.Time.UTC().Format(time.RFC3339), - Status: delivery.Status, - Code: delivery.Code, - ResponseData: delivery.ResponseData, - } - } - c.JSON(http.StatusOK, deliveryData) -} -func (h *LogHandlers) mustEventWithTenant(c *gin.Context, eventID string) *models.Event { - tenant := mustTenantFromContext(c) - if tenant == nil { - return nil - } - event, err := h.logStore.RetrieveEvent(c.Request.Context(), tenant.ID, eventID) - if err != nil { - AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) - return nil - } - if event == nil { - c.Status(http.StatusNotFound) - return nil - } - if event.TenantID != tenant.ID { - c.Status(http.StatusForbidden) - return nil - } - return event + // Parse expand options + expandOpts := parseExpandOptions(c) + + c.JSON(http.StatusOK, toAPIDelivery(deliveryEvent, expandOpts)) } diff --git a/internal/apirouter/log_handlers_test.go b/internal/apirouter/log_handlers_test.go new file mode 100644 index 00000000..d2b36707 --- /dev/null +++ b/internal/apirouter/log_handlers_test.go @@ -0,0 +1,387 @@ +package apirouter_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hookdeck/outpost/internal/idgen" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestListDeliveries(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + t.Run("should return empty list when no deliveries", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + assert.Len(t, data, 0) + }) + + t.Run("should list deliveries", func(t *testing.T) { + // Seed delivery events + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + assert.Len(t, data, 1) + + firstDelivery := data[0].(map[string]interface{}) + assert.Equal(t, deliveryID, firstDelivery["id"]) + assert.Equal(t, "success", firstDelivery["status"]) + assert.Equal(t, eventID, firstDelivery["event"]) // Not expanded + assert.Equal(t, destinationID, firstDelivery["destination"]) + }) + + t.Run("should expand event when expand=event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries?expand=event", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + require.Len(t, data, 1) + + firstDelivery := data[0].(map[string]interface{}) + event := firstDelivery["event"].(map[string]interface{}) + assert.NotNil(t, event["id"]) + assert.Equal(t, "user.created", event["topic"]) + // data should not be present without expand=event.data + assert.Nil(t, event["data"]) + }) + + t.Run("should expand event.data when expand=event.data", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries?expand=event.data", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + require.Len(t, data, 1) + + firstDelivery := data[0].(map[string]interface{}) + event := firstDelivery["event"].(map[string]interface{}) + assert.NotNil(t, event["id"]) + assert.NotNil(t, event["data"]) // data should be present + }) + + t.Run("should filter by destination_id", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries?destination_id="+destinationID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + assert.Len(t, data, 1) + }) + + t.Run("should filter by non-existent destination_id", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries?destination_id=nonexistent", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + data := response["data"].([]interface{}) + assert.Len(t, data, 0) + }) + + t.Run("should return 404 for non-existent tenant", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/nonexistent/deliveries", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) +} + +func TestRetrieveDelivery(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should retrieve delivery by ID", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries/"+deliveryID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + assert.Equal(t, deliveryID, response["id"]) + assert.Equal(t, "failed", response["status"]) + assert.Equal(t, eventID, response["event"]) // Not expanded + assert.Equal(t, destinationID, response["destination"]) + }) + + t.Run("should expand event when expand=event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries/"+deliveryID+"?expand=event", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + event := response["event"].(map[string]interface{}) + assert.Equal(t, eventID, event["id"]) + assert.Equal(t, "order.created", event["topic"]) + // data should not be present without expand=event.data + assert.Nil(t, event["data"]) + }) + + t.Run("should expand event.data when expand=event.data", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries/"+deliveryID+"?expand=event.data", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + event := response["event"].(map[string]interface{}) + assert.Equal(t, eventID, event["id"]) + assert.NotNil(t, event["data"]) // data should be present + }) + + t.Run("should return 404 for non-existent delivery", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/deliveries/nonexistent", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 404 for non-existent tenant", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/nonexistent/deliveries/"+deliveryID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) +} + +func TestRetrieveEvent(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("payment.processed"), + testutil.EventFactory.WithTime(eventTime), + testutil.EventFactory.WithData(map[string]interface{}{ + "amount": 100.50, + }), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "stripe", + }), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should retrieve event by ID", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/events/"+eventID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + + assert.Equal(t, eventID, response["id"]) + assert.Equal(t, "payment.processed", response["topic"]) + assert.Equal(t, "stripe", response["metadata"].(map[string]interface{})["source"]) + assert.Equal(t, 100.50, response["data"].(map[string]interface{})["amount"]) + // tenant_id is not included in API response (tenant-scoped via URL) + assert.Nil(t, response["tenant_id"]) + }) + + t.Run("should return 404 for non-existent event", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/"+tenantID+"/events/nonexistent", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 404 for non-existent tenant", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", baseAPIPath+"/nonexistent/events/"+eventID, nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) +} diff --git a/internal/apirouter/retry_handlers.go b/internal/apirouter/retry_handlers.go index ca357a86..e9fe821c 100644 --- a/internal/apirouter/retry_handlers.go +++ b/internal/apirouter/retry_handlers.go @@ -1,7 +1,6 @@ package apirouter import ( - "errors" "net/http" "github.com/gin-gonic/gin" @@ -12,10 +11,6 @@ import ( "go.uber.org/zap" ) -var ( - ErrDestinationDisabled = errors.New("destination is disabled") -) - type RetryHandlers struct { logger *logging.Logger entityStore models.EntityStore @@ -23,7 +18,12 @@ type RetryHandlers struct { deliveryMQ *deliverymq.DeliveryMQ } -func NewRetryHandlers(logger *logging.Logger, entityStore models.EntityStore, logStore logstore.LogStore, deliveryMQ *deliverymq.DeliveryMQ) *RetryHandlers { +func NewRetryHandlers( + logger *logging.Logger, + entityStore models.EntityStore, + logStore logstore.LogStore, + deliveryMQ *deliverymq.DeliveryMQ, +) *RetryHandlers { return &RetryHandlers{ logger: logger, entityStore: entityStore, @@ -32,50 +32,64 @@ func NewRetryHandlers(logger *logging.Logger, entityStore models.EntityStore, lo } } -func (h *RetryHandlers) Retry(c *gin.Context) { - tenantID := c.Param("tenantID") - destinationID := c.Param("destinationID") - eventID := c.Param("eventID") +// RetryDelivery handles POST /:tenantID/deliveries/:deliveryID/retry +// Constraints: +// - Only the latest delivery for an event+destination pair can be retried +// - Destination must exist and be enabled +func (h *RetryHandlers) RetryDelivery(c *gin.Context) { + tenant := mustTenantFromContext(c) + if tenant == nil { + return + } + deliveryID := c.Param("deliveryID") - // 1. Retrieve destination & event data - destination, err := h.entityStore.RetrieveDestination(c, tenantID, destinationID) + // 1. Look up delivery by ID + deliveryEvent, err := h.logStore.RetrieveDeliveryEvent(c.Request.Context(), logstore.RetrieveDeliveryEventRequest{ + TenantID: tenant.ID, + DeliveryID: deliveryID, + }) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - if destination == nil { - AbortWithError(c, http.StatusNotFound, NewErrNotFound("destination")) - return - } - if destination.DisabledAt != nil { - AbortWithError(c, http.StatusBadRequest, NewErrBadRequest(ErrDestinationDisabled)) + if deliveryEvent == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("delivery")) return } - event, err := h.logStore.RetrieveEvent(c, logstore.RetrieveEventRequest{ - TenantID: tenantID, - EventID: eventID, - }) + // 2. Check destination exists and is enabled + destination, err := h.entityStore.RetrieveDestination(c.Request.Context(), tenant.ID, deliveryEvent.DestinationID) if err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - if event == nil { - AbortWithError(c, http.StatusNotFound, NewErrNotFound("event")) + if destination == nil { + AbortWithError(c, http.StatusNotFound, NewErrNotFound("destination")) + return + } + if destination.DisabledAt != nil { + AbortWithError(c, http.StatusBadRequest, ErrorResponse{ + Code: http.StatusBadRequest, + Message: "Destination is disabled", + Data: map[string]string{ + "error": "destination_disabled", + }, + }) return } - // 2. Initiate redelivery - deliveryEvent := models.NewManualDeliveryEvent(*event, destination.ID) + // 3. Create and publish retry delivery event + retryDeliveryEvent := models.NewManualDeliveryEvent(deliveryEvent.Event, deliveryEvent.DestinationID) - if err := h.deliveryMQ.Publish(c, deliveryEvent); err != nil { + if err := h.deliveryMQ.Publish(c.Request.Context(), retryDeliveryEvent); err != nil { AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err)) return } - h.logger.Ctx(c).Audit("manual retry initiated", - zap.String("event_id", event.ID), - zap.String("destination_id", destination.ID)) + h.logger.Ctx(c.Request.Context()).Audit("manual retry initiated", + zap.String("delivery_id", deliveryID), + zap.String("event_id", deliveryEvent.Event.ID), + zap.String("destination_id", deliveryEvent.DestinationID)) c.JSON(http.StatusAccepted, gin.H{ "success": true, diff --git a/internal/apirouter/retry_handlers_test.go b/internal/apirouter/retry_handlers_test.go new file mode 100644 index 00000000..0f75fa76 --- /dev/null +++ b/internal/apirouter/retry_handlers_test.go @@ -0,0 +1,150 @@ +package apirouter_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hookdeck/outpost/internal/idgen" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRetryDelivery(t *testing.T) { + t.Parallel() + + result := setupTestRouterFull(t, "", "") + + // Create a tenant and destination + tenantID := idgen.String() + destinationID := idgen.Destination() + require.NoError(t, result.entityStore.UpsertTenant(context.Background(), models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + })) + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: destinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + })) + + // Seed a delivery event + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{de})) + + t.Run("should retry delivery successfully", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/deliveries/"+deliveryID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusAccepted, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + assert.Equal(t, true, response["success"]) + }) + + t.Run("should return 404 for non-existent delivery", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/deliveries/nonexistent/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 404 for non-existent tenant", func(t *testing.T) { + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/nonexistent/deliveries/"+deliveryID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("should return 400 when destination is disabled", func(t *testing.T) { + // Create a new destination that's disabled + disabledDestinationID := idgen.Destination() + disabledAt := time.Now() + require.NoError(t, result.entityStore.UpsertDestination(context.Background(), models.Destination{ + ID: disabledDestinationID, + TenantID: tenantID, + Type: "webhook", + Topics: []string{"*"}, + CreatedAt: time.Now(), + DisabledAt: &disabledAt, + })) + + // Create a delivery for the disabled destination + disabledEventID := idgen.Event() + disabledDeliveryID := idgen.Delivery() + + disabledEvent := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(disabledEventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(disabledDestinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + ) + + disabledDelivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(disabledDeliveryID), + testutil.DeliveryFactory.WithEventID(disabledEventID), + testutil.DeliveryFactory.WithDestinationID(disabledDestinationID), + testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + + disabledDE := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", disabledEventID, disabledDeliveryID), + DestinationID: disabledDestinationID, + Event: *disabledEvent, + Delivery: disabledDelivery, + } + + require.NoError(t, result.logStore.InsertManyDeliveryEvent(context.Background(), []*models.DeliveryEvent{disabledDE})) + + w := httptest.NewRecorder() + req, _ := http.NewRequest("POST", baseAPIPath+"/"+tenantID+"/deliveries/"+disabledDeliveryID+"/retry", nil) + result.router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) + + var response map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) + assert.Equal(t, "Destination is disabled", response["message"]) + }) +} diff --git a/internal/apirouter/router.go b/internal/apirouter/router.go index 3f96092e..9f9be79c 100644 --- a/internal/apirouter/router.go +++ b/internal/apirouter/router.go @@ -151,9 +151,10 @@ func NewRouter( tenantHandlers := NewTenantHandlers(logger, telemetry, cfg.JWTSecret, entityStore) destinationHandlers := NewDestinationHandlers(logger, telemetry, entityStore, cfg.Topics, cfg.Registry) publishHandlers := NewPublishHandlers(logger, publishmqEventHandler) - retryHandlers := NewRetryHandlers(logger, entityStore, logStore, deliveryMQ) logHandlers := NewLogHandlers(logger, logStore) + retryHandlers := NewRetryHandlers(logger, entityStore, logStore, deliveryMQ) topicHandlers := NewTopicHandlers(logger, cfg.Topics) + legacyHandlers := NewLegacyHandlers(logger, entityStore, logStore, deliveryMQ) // Admin routes adminRoutes := []RouteDefinition{ @@ -334,11 +335,11 @@ func NewRouter( }, }, - // Event routes + // Delivery routes (new API) { Method: http.MethodGet, - Path: "/:tenantID/events", - Handler: logHandlers.ListEvent, + Path: "/:tenantID/deliveries", + Handler: logHandlers.ListDeliveries, AuthScope: AuthScopeAdminOrTenant, Mode: RouteModeAlways, AllowTenantFromJWT: true, @@ -346,6 +347,30 @@ func NewRouter( RequireTenantMiddleware(entityStore), }, }, + { + Method: http.MethodGet, + Path: "/:tenantID/deliveries/:deliveryID", + Handler: logHandlers.RetrieveDelivery, + AuthScope: AuthScopeAdminOrTenant, + Mode: RouteModeAlways, + AllowTenantFromJWT: true, + Middlewares: []gin.HandlerFunc{ + RequireTenantMiddleware(entityStore), + }, + }, + { + Method: http.MethodPost, + Path: "/:tenantID/deliveries/:deliveryID/retry", + Handler: retryHandlers.RetryDelivery, + AuthScope: AuthScopeAdminOrTenant, + Mode: RouteModeAlways, + AllowTenantFromJWT: true, + Middlewares: []gin.HandlerFunc{ + RequireTenantMiddleware(entityStore), + }, + }, + + // Event routes { Method: http.MethodGet, Path: "/:tenantID/events/:eventID", @@ -357,10 +382,14 @@ func NewRouter( RequireTenantMiddleware(entityStore), }, }, + } + + // Legacy routes (deprecated, for backward compatibility) + legacyRoutes := []RouteDefinition{ { - Method: http.MethodGet, - Path: "/:tenantID/events/:eventID/deliveries", - Handler: logHandlers.ListDeliveryByEvent, + Method: http.MethodPost, + Path: "/:tenantID/destinations/:destinationID/events/:eventID/retry", + Handler: legacyHandlers.RetryByEventDestination, AuthScope: AuthScopeAdminOrTenant, Mode: RouteModeAlways, AllowTenantFromJWT: true, @@ -371,7 +400,7 @@ func NewRouter( { Method: http.MethodGet, Path: "/:tenantID/destinations/:destinationID/events", - Handler: logHandlers.ListEventByDestination, + Handler: legacyHandlers.ListEventsByDestination, AuthScope: AuthScopeAdminOrTenant, Mode: RouteModeAlways, AllowTenantFromJWT: true, @@ -382,7 +411,7 @@ func NewRouter( { Method: http.MethodGet, Path: "/:tenantID/destinations/:destinationID/events/:eventID", - Handler: logHandlers.RetrieveEventByDestination, + Handler: legacyHandlers.RetrieveEventByDestination, AuthScope: AuthScopeAdminOrTenant, Mode: RouteModeAlways, AllowTenantFromJWT: true, @@ -390,15 +419,16 @@ func NewRouter( RequireTenantMiddleware(entityStore), }, }, - - // Retry routes { - Method: http.MethodPost, - Path: "/:tenantID/destinations/:destinationID/events/:eventID/retry", - Handler: retryHandlers.Retry, + Method: http.MethodGet, + Path: "/:tenantID/events/:eventID/deliveries", + Handler: legacyHandlers.ListDeliveriesByEvent, AuthScope: AuthScopeAdminOrTenant, Mode: RouteModeAlways, AllowTenantFromJWT: true, + Middlewares: []gin.HandlerFunc{ + RequireTenantMiddleware(entityStore), + }, }, } @@ -408,6 +438,7 @@ func NewRouter( apiRoutes = append(apiRoutes, portalRoutes...) apiRoutes = append(apiRoutes, tenantAgnosticRoutes...) apiRoutes = append(apiRoutes, tenantSpecificRoutes...) + apiRoutes = append(apiRoutes, legacyRoutes...) registerRoutes(apiRouter, cfg, apiRoutes) diff --git a/internal/apirouter/router_test.go b/internal/apirouter/router_test.go index fd2511eb..b5f2c54e 100644 --- a/internal/apirouter/router_test.go +++ b/internal/apirouter/router_test.go @@ -28,7 +28,20 @@ import ( const baseAPIPath = "/api/v1" +type testRouterResult struct { + router http.Handler + logger *logging.Logger + redisClient redis.Client + entityStore models.EntityStore + logStore logstore.LogStore +} + func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *testing.T) clickhouse.DB) (http.Handler, *logging.Logger, redis.Client) { + result := setupTestRouterFull(t, apiKey, jwtSecret, funcs...) + return result.router, result.logger, result.redisClient +} + +func setupTestRouterFull(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *testing.T) clickhouse.DB) testRouterResult { gin.SetMode(gin.TestMode) logger := testutil.CreateTestLogger(t) redisClient := testutil.CreateTestRedisClient(t) @@ -53,7 +66,13 @@ func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *te eventHandler, &telemetry.NoopTelemetry{}, ) - return router, logger, redisClient + return testRouterResult{ + router: router, + logger: logger, + redisClient: redisClient, + entityStore: entityStore, + logStore: logStore, + } } func setupTestLogStore(t *testing.T, funcs ...func(t *testing.T) clickhouse.DB) logstore.LogStore { @@ -62,7 +81,7 @@ func setupTestLogStore(t *testing.T, funcs ...func(t *testing.T) clickhouse.DB) chDB = f(t) } if chDB == nil { - return logstore.NewNoopLogStore() + return logstore.NewMemLogStore() } logStore, err := logstore.NewLogStore(context.Background(), logstore.DriverOpts{ CH: chDB, diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index f586c39f..f28bf927 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -410,6 +410,121 @@ func (s *logStoreImpl) RetrieveEvent(ctx context.Context, req driver.RetrieveEve return event, nil } +// RetrieveDeliveryEvent retrieves a single delivery event by delivery ID. +func (s *logStoreImpl) RetrieveDeliveryEvent(ctx context.Context, req driver.RetrieveDeliveryEventRequest) (*models.DeliveryEvent, error) { + query := ` + SELECT + event_id, + tenant_id, + destination_id, + topic, + eligible_for_retry, + event_time, + metadata, + data, + delivery_id, + delivery_event_id, + status, + delivery_time, + code, + response_data + FROM event_log + WHERE tenant_id = ? AND delivery_id = ? + LIMIT 1` + + rows, err := s.chDB.Query(ctx, query, req.TenantID, req.DeliveryID) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + defer rows.Close() + + if !rows.Next() { + return nil, nil + } + + var ( + eventID string + tenantID string + destinationID string + topic string + eligibleForRetry bool + eventTime time.Time + metadataStr string + dataStr string + deliveryID string + deliveryEventID string + status string + deliveryTime time.Time + code string + responseDataStr string + ) + + err = rows.Scan( + &eventID, + &tenantID, + &destinationID, + &topic, + &eligibleForRetry, + &eventTime, + &metadataStr, + &dataStr, + &deliveryID, + &deliveryEventID, + &status, + &deliveryTime, + &code, + &responseDataStr, + ) + if err != nil { + return nil, fmt.Errorf("scan failed: %w", err) + } + + // Parse JSON fields + var metadata map[string]string + var data map[string]interface{} + var responseData map[string]interface{} + + if metadataStr != "" { + if err := json.Unmarshal([]byte(metadataStr), &metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + } + if dataStr != "" { + if err := json.Unmarshal([]byte(dataStr), &data); err != nil { + return nil, fmt.Errorf("failed to unmarshal data: %w", err) + } + } + if responseDataStr != "" { + if err := json.Unmarshal([]byte(responseDataStr), &responseData); err != nil { + return nil, fmt.Errorf("failed to unmarshal response_data: %w", err) + } + } + + return &models.DeliveryEvent{ + ID: deliveryEventID, + DestinationID: destinationID, + Event: models.Event{ + ID: eventID, + TenantID: tenantID, + DestinationID: destinationID, + Topic: topic, + EligibleForRetry: eligibleForRetry, + Time: eventTime, + Data: data, + Metadata: metadata, + }, + Delivery: &models.Delivery{ + ID: deliveryID, + EventID: eventID, + DestinationID: destinationID, + Status: status, + Time: deliveryTime, + Code: code, + ResponseData: responseData, + }, + }, nil +} + func (s *logStoreImpl) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { if len(deliveryEvents) == 0 { return nil diff --git a/internal/logstore/driver/driver.go b/internal/logstore/driver/driver.go index 1eebd9d4..3418a1c4 100644 --- a/internal/logstore/driver/driver.go +++ b/internal/logstore/driver/driver.go @@ -15,6 +15,7 @@ var ErrInvalidCursor = errors.New("invalid cursor") type LogStore interface { ListDeliveryEvent(context.Context, ListDeliveryEventRequest) (ListDeliveryEventResponse, error) RetrieveEvent(ctx context.Context, request RetrieveEventRequest) (*models.Event, error) + RetrieveDeliveryEvent(ctx context.Context, request RetrieveDeliveryEventRequest) (*models.DeliveryEvent, error) InsertManyDeliveryEvent(context.Context, []*models.DeliveryEvent) error } @@ -46,3 +47,8 @@ type RetrieveEventRequest struct { EventID string // required DestinationID string // optional - if provided, scopes to that destination } + +type RetrieveDeliveryEventRequest struct { + TenantID string // required + DeliveryID string // required +} diff --git a/internal/logstore/drivertest/drivertest.go b/internal/logstore/drivertest/drivertest.go index 3bcecb14..e81d9ee0 100644 --- a/internal/logstore/drivertest/drivertest.go +++ b/internal/logstore/drivertest/drivertest.go @@ -42,6 +42,9 @@ func RunConformanceTests(t *testing.T, newHarness HarnessMaker) { t.Run("TestRetrieveEvent", func(t *testing.T) { testRetrieveEvent(t, newHarness) }) + t.Run("TestRetrieveDeliveryEvent", func(t *testing.T) { + testRetrieveDeliveryEvent(t, newHarness) + }) t.Run("TestTenantIsolation", func(t *testing.T) { testTenantIsolation(t, newHarness) }) @@ -698,6 +701,169 @@ func testRetrieveEvent(t *testing.T, newHarness HarnessMaker) { }) } +// testRetrieveDeliveryEvent tests the RetrieveDeliveryEvent method +func testRetrieveDeliveryEvent(t *testing.T, newHarness HarnessMaker) { + t.Helper() + + ctx := context.Background() + h, err := newHarness(ctx, t) + require.NoError(t, err) + t.Cleanup(h.Close) + + logStore, err := h.MakeDriver(ctx) + require.NoError(t, err) + + tenantID := idgen.String() + destinationID := idgen.Destination() + eventID := idgen.Event() + deliveryID := idgen.Delivery() + eventTime := time.Now().Truncate(time.Millisecond) + deliveryTime := eventTime.Add(100 * time.Millisecond) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID(eventID), + testutil.EventFactory.WithTenantID(tenantID), + testutil.EventFactory.WithDestinationID(destinationID), + testutil.EventFactory.WithTopic("order.created"), + testutil.EventFactory.WithTime(eventTime), + testutil.EventFactory.WithEligibleForRetry(true), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "api", + }), + testutil.EventFactory.WithData(map[string]interface{}{ + "order_id": "ord_456", + "amount": 99.99, + }), + ) + + delivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(deliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("success"), + testutil.DeliveryFactory.WithTime(deliveryTime), + ) + delivery.Code = "200" + delivery.ResponseData = map[string]interface{}{ + "latency_ms": 42, + } + + de := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: delivery, + } + + require.NoError(t, logStore.InsertManyDeliveryEvent(ctx, []*models.DeliveryEvent{de})) + require.NoError(t, h.FlushWrites(ctx)) + + t.Run("retrieve existing delivery with all fields", func(t *testing.T) { + retrieved, err := logStore.RetrieveDeliveryEvent(ctx, driver.RetrieveDeliveryEventRequest{ + TenantID: tenantID, + DeliveryID: deliveryID, + }) + require.NoError(t, err) + require.NotNil(t, retrieved) + + // Verify delivery event ID + assert.Equal(t, de.ID, retrieved.ID) + assert.Equal(t, destinationID, retrieved.DestinationID) + + // Verify event fields + assert.Equal(t, eventID, retrieved.Event.ID) + assert.Equal(t, tenantID, retrieved.Event.TenantID) + assert.Equal(t, destinationID, retrieved.Event.DestinationID) + assert.Equal(t, "order.created", retrieved.Event.Topic) + assert.Equal(t, true, retrieved.Event.EligibleForRetry) + assert.WithinDuration(t, eventTime, retrieved.Event.Time, time.Second) + assert.Equal(t, "api", retrieved.Event.Metadata["source"]) + assert.Equal(t, "ord_456", retrieved.Event.Data["order_id"]) + + // Verify delivery fields + require.NotNil(t, retrieved.Delivery) + assert.Equal(t, deliveryID, retrieved.Delivery.ID) + assert.Equal(t, eventID, retrieved.Delivery.EventID) + assert.Equal(t, destinationID, retrieved.Delivery.DestinationID) + assert.Equal(t, "success", retrieved.Delivery.Status) + assert.WithinDuration(t, deliveryTime, retrieved.Delivery.Time, time.Second) + assert.Equal(t, "200", retrieved.Delivery.Code) + // Note: JSON unmarshaling converts integers to float64, but in-memory stores keep them as int + latencyMs := retrieved.Delivery.ResponseData["latency_ms"] + switch v := latencyMs.(type) { + case int: + assert.Equal(t, 42, v) + case float64: + assert.Equal(t, float64(42), v) + default: + t.Errorf("unexpected type for latency_ms: %T", latencyMs) + } + }) + + t.Run("retrieve non-existent delivery", func(t *testing.T) { + retrieved, err := logStore.RetrieveDeliveryEvent(ctx, driver.RetrieveDeliveryEventRequest{ + TenantID: tenantID, + DeliveryID: "non-existent", + }) + require.NoError(t, err) + assert.Nil(t, retrieved) + }) + + t.Run("retrieve with wrong tenant", func(t *testing.T) { + retrieved, err := logStore.RetrieveDeliveryEvent(ctx, driver.RetrieveDeliveryEventRequest{ + TenantID: "wrong-tenant", + DeliveryID: deliveryID, + }) + require.NoError(t, err) + assert.Nil(t, retrieved, "should not return delivery for wrong tenant") + }) + + t.Run("retrieve multiple deliveries for same event", func(t *testing.T) { + // Insert another delivery for the same event (simulating a retry) + secondDeliveryID := idgen.Delivery() + secondDeliveryTime := deliveryTime.Add(time.Second) + + secondDelivery := testutil.DeliveryFactory.AnyPointer( + testutil.DeliveryFactory.WithID(secondDeliveryID), + testutil.DeliveryFactory.WithEventID(eventID), + testutil.DeliveryFactory.WithDestinationID(destinationID), + testutil.DeliveryFactory.WithStatus("failed"), + testutil.DeliveryFactory.WithTime(secondDeliveryTime), + ) + secondDelivery.Code = "500" + + secondDE := &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, secondDeliveryID), + DestinationID: destinationID, + Event: *event, + Delivery: secondDelivery, + } + + require.NoError(t, logStore.InsertManyDeliveryEvent(ctx, []*models.DeliveryEvent{secondDE})) + require.NoError(t, h.FlushWrites(ctx)) + + // Retrieve first delivery - should get first delivery + first, err := logStore.RetrieveDeliveryEvent(ctx, driver.RetrieveDeliveryEventRequest{ + TenantID: tenantID, + DeliveryID: deliveryID, + }) + require.NoError(t, err) + require.NotNil(t, first) + assert.Equal(t, deliveryID, first.Delivery.ID) + assert.Equal(t, "success", first.Delivery.Status) + + // Retrieve second delivery - should get second delivery + second, err := logStore.RetrieveDeliveryEvent(ctx, driver.RetrieveDeliveryEventRequest{ + TenantID: tenantID, + DeliveryID: secondDeliveryID, + }) + require.NoError(t, err) + require.NotNil(t, second) + assert.Equal(t, secondDeliveryID, second.Delivery.ID) + assert.Equal(t, "failed", second.Delivery.Status) + }) +} + // testTenantIsolation ensures data from one tenant cannot be accessed by another func testTenantIsolation(t *testing.T, newHarness HarnessMaker) { t.Helper() diff --git a/internal/logstore/logstore.go b/internal/logstore/logstore.go index cd4b5ace..e35d4266 100644 --- a/internal/logstore/logstore.go +++ b/internal/logstore/logstore.go @@ -7,6 +7,7 @@ import ( "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/logstore/chlogstore" "github.com/hookdeck/outpost/internal/logstore/driver" + "github.com/hookdeck/outpost/internal/logstore/memlogstore" "github.com/hookdeck/outpost/internal/logstore/pglogstore" "github.com/hookdeck/outpost/internal/models" "github.com/jackc/pgx/v5/pgxpool" @@ -15,10 +16,12 @@ import ( type ListDeliveryEventRequest = driver.ListDeliveryEventRequest type ListDeliveryEventResponse = driver.ListDeliveryEventResponse type RetrieveEventRequest = driver.RetrieveEventRequest +type RetrieveDeliveryEventRequest = driver.RetrieveDeliveryEventRequest type LogStore interface { ListDeliveryEvent(context.Context, ListDeliveryEventRequest) (ListDeliveryEventResponse, error) RetrieveEvent(ctx context.Context, request RetrieveEventRequest) (*models.Event, error) + RetrieveDeliveryEvent(ctx context.Context, request RetrieveDeliveryEventRequest) (*models.DeliveryEvent, error) InsertManyDeliveryEvent(context.Context, []*models.DeliveryEvent) error } @@ -48,6 +51,11 @@ func NewLogStore(ctx context.Context, driverOpts DriverOpts) (LogStore, error) { return nil, errors.New("no driver provided") } +// NewMemLogStore returns an in-memory log store for testing. +func NewMemLogStore() LogStore { + return memlogstore.NewLogStore() +} + type Config struct { ClickHouse *clickhouse.ClickHouseConfig Postgres *string diff --git a/internal/logstore/memlogstore/memlogstore.go b/internal/logstore/memlogstore/memlogstore.go index 5afd521e..778f6117 100644 --- a/internal/logstore/memlogstore/memlogstore.go +++ b/internal/logstore/memlogstore/memlogstore.go @@ -224,6 +224,19 @@ func (s *memLogStore) RetrieveEvent(ctx context.Context, req driver.RetrieveEven return nil, nil } +// RetrieveDeliveryEvent retrieves a single delivery event by delivery ID. +func (s *memLogStore) RetrieveDeliveryEvent(ctx context.Context, req driver.RetrieveDeliveryEventRequest) (*models.DeliveryEvent, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, de := range s.deliveryEvents { + if de.Event.TenantID == req.TenantID && de.Delivery != nil && de.Delivery.ID == req.DeliveryID { + return copyDeliveryEvent(de), nil + } + } + return nil, nil +} + func (s *memLogStore) matchesFilter(de *models.DeliveryEvent, req driver.ListDeliveryEventRequest) bool { // Tenant filter (required) if de.Event.TenantID != req.TenantID { diff --git a/internal/logstore/noop.go b/internal/logstore/noop.go deleted file mode 100644 index af084c62..00000000 --- a/internal/logstore/noop.go +++ /dev/null @@ -1,28 +0,0 @@ -package logstore - -import ( - "context" - - "github.com/hookdeck/outpost/internal/logstore/driver" - "github.com/hookdeck/outpost/internal/models" -) - -func NewNoopLogStore() LogStore { - return &noopLogStore{} -} - -type noopLogStore struct{} - -var _ LogStore = (*noopLogStore)(nil) - -func (l *noopLogStore) ListDeliveryEvent(ctx context.Context, request driver.ListDeliveryEventRequest) (driver.ListDeliveryEventResponse, error) { - return driver.ListDeliveryEventResponse{}, nil -} - -func (l *noopLogStore) RetrieveEvent(ctx context.Context, request driver.RetrieveEventRequest) (*models.Event, error) { - return nil, nil -} - -func (l *noopLogStore) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { - return nil -} diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index fe11f2db..5e22ea7a 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -397,6 +397,94 @@ func (s *logStore) RetrieveEvent(ctx context.Context, req driver.RetrieveEventRe return event, nil } +// RetrieveDeliveryEvent retrieves a single delivery event by delivery ID. +func (s *logStore) RetrieveDeliveryEvent(ctx context.Context, req driver.RetrieveDeliveryEventRequest) (*models.DeliveryEvent, error) { + query := ` + SELECT + idx.event_id, + idx.delivery_id, + idx.destination_id, + idx.event_time, + idx.delivery_time, + idx.topic, + idx.status, + e.tenant_id, + e.eligible_for_retry, + e.data, + e.metadata, + d.code, + d.response_data + FROM event_delivery_index idx + JOIN events e ON e.id = idx.event_id AND e.time = idx.event_time + JOIN deliveries d ON d.id = idx.delivery_id AND d.time = idx.delivery_time + WHERE idx.tenant_id = $1 AND idx.delivery_id = $2 + LIMIT 1` + + row := s.db.QueryRow(ctx, query, req.TenantID, req.DeliveryID) + + var ( + eventID string + deliveryID string + destinationID string + eventTime time.Time + deliveryTime time.Time + topic string + status string + tenantID string + eligibleForRetry bool + data map[string]interface{} + metadata map[string]string + code string + responseData map[string]interface{} + ) + + err := row.Scan( + &eventID, + &deliveryID, + &destinationID, + &eventTime, + &deliveryTime, + &topic, + &status, + &tenantID, + &eligibleForRetry, + &data, + &metadata, + &code, + &responseData, + ) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("scan failed: %w", err) + } + + return &models.DeliveryEvent{ + ID: fmt.Sprintf("%s_%s", eventID, deliveryID), + DestinationID: destinationID, + Event: models.Event{ + ID: eventID, + TenantID: tenantID, + DestinationID: destinationID, + Topic: topic, + EligibleForRetry: eligibleForRetry, + Time: eventTime, + Data: data, + Metadata: metadata, + }, + Delivery: &models.Delivery{ + ID: deliveryID, + EventID: eventID, + DestinationID: destinationID, + Status: status, + Time: deliveryTime, + Code: code, + ResponseData: responseData, + }, + }, nil +} + func (s *logStore) InsertManyDeliveryEvent(ctx context.Context, deliveryEvents []*models.DeliveryEvent) error { if len(deliveryEvents) == 0 { return nil diff --git a/internal/migrator/migrations/clickhouse/000001_init.up.sql b/internal/migrator/migrations/clickhouse/000001_init.up.sql index f12b7f78..45712966 100644 --- a/internal/migrator/migrations/clickhouse/000001_init.up.sql +++ b/internal/migrator/migrations/clickhouse/000001_init.up.sql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS event_log ( -- Indexes for filtering (bloom filters help skip granules) INDEX idx_event_id event_id TYPE bloom_filter GRANULARITY 4, + INDEX idx_delivery_id delivery_id TYPE bloom_filter GRANULARITY 4, INDEX idx_topic topic TYPE bloom_filter GRANULARITY 4, INDEX idx_status status TYPE set(100) GRANULARITY 4 ) ENGINE = ReplacingMergeTree