Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
b1abed3
feat: add message passing system for event publishing
xernobyl Feb 2, 2026
8370bab
chore: removed excessive comments
xernobyl Feb 2, 2026
430e068
chore: remove PublishEvents
xernobyl Feb 2, 2026
3304b89
chore: refactored service
xernobyl Feb 2, 2026
4604dbb
chore: further refactoring
xernobyl Feb 2, 2026
6607359
chore: webhooks implementation
xernobyl Feb 2, 2026
a105a1d
chore: implemented retryable http
xernobyl Feb 3, 2026
3a6f528
chore: concurrency limit; singleflight
xernobyl Feb 3, 2026
585c6bd
feat: goose based migrations
xernobyl Feb 3, 2026
64a8507
Merge feat/goose-migrations into feat/webhooks
xernobyl Feb 3, 2026
1d5549d
Merge branch 'main' into feat/webhooks
xernobyl Feb 3, 2026
76baf01
Merge branch 'main' into feat/message-passing
xernobyl Feb 3, 2026
192e59e
Merge branch 'feat/message-passing' into feat/webhooks
xernobyl Feb 3, 2026
d9104d1
chore: changed context
xernobyl Feb 3, 2026
de1d103
chore: updated openapi.yaml
xernobyl Feb 3, 2026
7aa4fc4
chore: fixed tenant_id
xernobyl Feb 3, 2026
c5a2d9f
chore: webhook id idempotency
xernobyl Feb 3, 2026
c52e1fc
chore: remove file
xernobyl Feb 3, 2026
f6694ba
chore: changed webhooks redirect / timeouts to fall in line with stan…
xernobyl Feb 3, 2026
43407cf
chore: further alignment with standard webhooks
xernobyl Feb 3, 2026
8095505
chore: migration change / comments
xernobyl Feb 4, 2026
8608177
Merge branch 'feat/goose-migrations' into feat/webhooks
xernobyl Feb 4, 2026
849e9db
chore: slight event refactor
xernobyl Feb 4, 2026
d1df6dd
chore: added publisher queue
xernobyl Feb 4, 2026
1e9526e
chore: message publisher worker shutdown
xernobyl Feb 4, 2026
ef319d7
chore: river based webhooks implementation
xernobyl Feb 4, 2026
80a99a3
chore: Updated linting rules, and fixed problems
xernobyl Feb 4, 2026
e694e1b
chore: additional performance and safety lint rules
xernobyl Feb 4, 2026
cec0503
chore: additional performance linting rules
xernobyl Feb 4, 2026
7d3cf14
Merge branch 'main' into chore/linting-rules
xernobyl Feb 5, 2026
aec5193
Merge branch 'chore/linting-rules' into feat/webhooks
xernobyl Feb 5, 2026
906c184
chore: added riverui to docker compose
xernobyl Feb 5, 2026
b6911a9
Merge branch 'chore/linting-rules' into feat/webhooks
xernobyl Feb 5, 2026
25ebf00
chore: improve river performance by using batches ("InsertMany" inste…
xernobyl Feb 5, 2026
5251371
chore: additional env vars in example
xernobyl Feb 5, 2026
84fc5a1
Merge branch 'main' into feat/webhooks
xernobyl Feb 5, 2026
d555d9a
chore: assorted fixes
xernobyl Feb 5, 2026
77c1740
chore: messages on bulk deletes
xernobyl Feb 5, 2026
af9332a
chore: vulnerability fix
xernobyl Feb 6, 2026
f1b7b06
Merge branch 'main' into feat/webhooks
xernobyl Feb 9, 2026
1420d8d
chore: updated deletes
xernobyl Feb 10, 2026
49fca62
chore: fixed error checking
xernobyl Feb 10, 2026
a3b9874
chore: unnecessary error checking
xernobyl Feb 10, 2026
96a3b01
chore: url validation
xernobyl Feb 10, 2026
6ad81a1
chore: add hard limit to webhooks
xernobyl Feb 10, 2026
345520c
chore: .env.example update
xernobyl Feb 10, 2026
d7e0727
chore: updated timestamp types in DB
xernobyl Feb 10, 2026
e02ce29
chore: fixed timestamps
xernobyl Feb 10, 2026
62c41c5
chore: early webhook key validation
xernobyl Feb 10, 2026
adbd378
chore: remove limit in listing webhooks that listen for events
xernobyl Feb 10, 2026
0dc82e0
chore: renamed for clarity
xernobyl Feb 10, 2026
0ab5f99
chore: refactor changed fields
xernobyl Feb 10, 2026
81f31ea
chore: refactor main
xernobyl Feb 10, 2026
78925b1
chore: merged migrations; updated indexes
xernobyl Feb 10, 2026
f1780e9
chore: refactored event types
xernobyl Feb 10, 2026
81906fe
chore: fix integration tests
xernobyl Feb 10, 2026
791b9f0
chore: fix make
xernobyl Feb 10, 2026
81fcb75
chore: fix build
xernobyl Feb 10, 2026
43e135c
chore: fix documentation
xernobyl Feb 10, 2026
31a9838
chore: fix workflow
xernobyl Feb 10, 2026
41997bb
chore: fix shutdown double call
xernobyl Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,25 @@ PORT=8080
# Default: info
# Valid values: debug, info, warn, error
LOG_LEVEL=info

# Message publisher: event channel buffer size (optional). Default: 1024
MESSAGE_PUBLISHER_QUEUE_MAX_SIZE=16384

# Message publisher: per-event processing timeout in seconds (optional). Default: 10
MESSAGE_PUBLISHER_PER_EVENT_TIMEOUT_SECONDS=10

# Graceful shutdown timeout in seconds (optional). Default: 30
SHUTDOWN_TIMEOUT_SECONDS=30

# Webhook max fan-out per event (optional)
# Max number of webhook jobs enqueued per event; excess is capped and logged. Default: 500
WEBHOOK_MAX_FAN_OUT_PER_EVENT=500

# Webhook max count (optional)
# Max total webhooks allowed; creation returns 403 Forbidden when limit reached. Default: 500
WEBHOOK_MAX_COUNT=500

# River UI basic auth (optional, used when running River UI via docker compose)
# Defaults: admin / changeme if unset
RIVER_BASIC_AUTH_USER=admin
RIVER_BASIC_AUTH_PASS=changeme
5 changes: 5 additions & 0 deletions .github/workflows/api-contract-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ jobs:
- name: Initialize database schema
run: make init-db

- name: Run River migrations
run: |
go install github.com/riverqueue/river/cmd/river@latest
make river-migrate

- name: Build application
run: make build

Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ jobs:
- name: Initialize database schema
run: make init-db

- name: Run River migrations
run: |
go install github.com/riverqueue/river/cmd/river@latest
make river-migrate

- name: Run integration tests
run: make tests

Expand Down
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Repository Guidelines

## Project Structure & Module Organization
- `cmd/api/` holds the API server entrypoint (`main.go`).
- `cmd/api/` holds the API server (package `main`: `main.go`, `app.go`). Build/run the package, e.g. `go run ./cmd/api` or `make run`.
- `internal/` contains core application layers: `api/handlers`, `api/middleware`, `service`, `repository`, `models`, and `config`.
- `pkg/` provides shared utilities (currently `pkg/database`).
- `migrations/` stores SQL migration files (goose); use `-- +goose up` / `-- +goose down` annotations.
Expand All @@ -13,7 +13,7 @@
- `make build`: build the API binary to `bin/api`.
- `make tests`: run integration tests in `tests/`.
- `make tests-coverage`: generate `coverage.html`.
- `make init-db`: run goose migrations up using `DATABASE_URL`. `make migrate-status` and `make migrate-validate` for status and validation. New migrations go in `migrations/` with goose annotations (`-- +goose up` / `-- +goose down`). Name files with a sequential number and short description (e.g. `002_add_webhooks_table.sql`); goose orders by the numeric prefix.
- `make init-db`: run goose migrations up using `DATABASE_URL`. `make migrate-status` and `make migrate-validate` for status and validation. New migrations go in `migrations/` with goose annotations (`-- +goose up` / `-- +goose down`). Name files with a sequential number and short description (e.g. `002_add_webhooks_table.sql`); goose orders by the numeric prefix. For webhook delivery, run `make river-migrate` after `init-db` to apply River job queue migrations.
- `make fmt`: format code (runs `golangci-lint run --fix`; uses gofumpt/gci from config).
- `make lint`: run `golangci-lint` (includes format checks; requires `make install-tools`).

Expand Down
60 changes: 37 additions & 23 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
.PHONY: help tests tests-coverage build run init-db clean docker-up docker-down docker-clean deps install-tools fmt lint dev-setup test-all test-unit schemathesis install-hooks migrate-status migrate-validate
.PHONY: help tests tests-coverage build run init-db clean docker-up docker-down docker-clean deps install-tools fmt lint dev-setup test-all test-unit schemathesis install-hooks migrate-status migrate-validate river-migrate

# Default target - show help
help:
@echo "Available targets:"
@echo " make help - Show this help message"
@echo " make dev-setup - Set up development environment (docker, deps, tools, schema, hooks)"
@echo " make build - Build the API server"
@echo " make run - Run the API server"
@echo " make test-unit - Run unit tests (fast, no database)"
@echo " make tests - Run integration tests"
@echo " make test-all - Run all tests (unit + integration)"
@echo " make tests-coverage - Run tests with coverage report"
@echo " make init-db - Initialize database schema (run migrations with goose)"
@echo " make migrate-status - Show migration status"
@echo " make help - Show this help message"
@echo " make dev-setup - Set up development environment (docker, deps, tools, schema, hooks)"
@echo " make build - Build the API server"
@echo " make run - Run the API server"
@echo " make test-unit - Run unit tests (fast, no database)"
@echo " make tests - Run integration tests"
@echo " make test-all - Run all tests (unit + integration)"
@echo " make tests-coverage - Run tests with coverage report"
@echo " make init-db - Initialize database schema (run migrations with goose)"
@echo " make migrate-status - Show migration status"
@echo " make migrate-validate - Validate migration files (no DB)"
@echo " make fmt - Format code (golangci-lint run --fix)"
@echo " make lint - Run linter (includes format checks)"
@echo " make deps - Install Go dependencies"
@echo " make install-tools - Install development tools (golangci-lint, govulncheck, goose)"
@echo " make install-hooks - Install git hooks"
@echo " make docker-up - Start Docker containers"
@echo " make docker-down - Stop Docker containers"
@echo " make docker-clean - Stop Docker containers and remove volumes"
@echo " make clean - Clean build artifacts"
@echo " make schemathesis - Run Schemathesis API tests (requires API server running)"
@echo " make river-migrate - Run River job queue migrations (required for webhook delivery)"
@echo " make fmt - Format code (golangci-lint run --fix)"
@echo " make lint - Run linter (includes format checks)"
@echo " make deps - Install Go dependencies"
@echo " make install-tools - Install development tools (golangci-lint, govulncheck, goose)"
@echo " make install-hooks - Install git hooks"
@echo " make docker-up - Start Docker containers"
@echo " make docker-down - Stop Docker containers"
@echo " make docker-clean - Stop Docker containers and remove volumes"
@echo " make clean - Clean build artifacts"
@echo " make schemathesis - Run Schemathesis API tests (requires API server running)"

# Run all tests (integration tests in tests/ directory)
tests:
Expand All @@ -49,7 +50,7 @@ tests-coverage:
# Build the API server
build:
@echo "Building API server..."
go build -o bin/api cmd/api/main.go
go build -o bin/api ./cmd/api
@echo "Binary created: bin/api"

# Run the API server
Expand All @@ -71,7 +72,7 @@ run:
echo ".env file created with default values."; \
fi
@echo "Starting API server..."
go run cmd/api/main.go
go run ./cmd/api

# Initialize database schema (run goose migrations up)
init-db:
Expand Down Expand Up @@ -109,6 +110,19 @@ migrate-validate:
@goose -dir migrations validate
@echo "Migration files are valid"

# Run River job queue migrations (required for webhook delivery)
river-migrate:
@command -v river >/dev/null 2>&1 || { echo "Error: river CLI not found. Install with: go install github.com/riverqueue/river/cmd/river@latest"; exit 1; }
@if [ -f .env ]; then \
export $$(grep -v '^#' .env | xargs) && \
if [ -z "$$DATABASE_URL" ]; then echo "Error: DATABASE_URL not found in .env"; exit 1; fi && \
river migrate-up --database-url "$$DATABASE_URL"; \
else \
if [ -z "$$DATABASE_URL" ]; then echo "Error: DATABASE_URL not set"; exit 1; fi && \
river migrate-up --database-url "$$DATABASE_URL"; \
fi
@echo "River migrations applied"

# Start Docker containers
docker-up:
@echo "Starting Docker containers..."
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ API_KEY=your-secret-key-here

5. Start the server:
```bash
go run ./cmd/api/main.go
go run ./cmd/api
```

## API Endpoints
Expand Down
162 changes: 162 additions & 0 deletions cmd/api/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Package main provides the application lifecycle: bootstrap, run, and shutdown.
package main

import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"

"github.com/formbricks/hub/internal/api/handlers"
"github.com/formbricks/hub/internal/api/middleware"
"github.com/formbricks/hub/internal/config"
"github.com/formbricks/hub/internal/repository"
"github.com/formbricks/hub/internal/service"
"github.com/formbricks/hub/internal/workers"
)

// App holds all server dependencies and coordinates startup and shutdown.
type App struct {
cfg *config.Config
db *pgxpool.Pool
server *http.Server
river *river.Client[pgx.Tx]
message *service.MessagePublisherManager
runCtx context.Context
cancel context.CancelFunc
}

// NewApp builds and wires all components. It does not start the HTTP server or River;
// call Run to start and block until shutdown or failure.
func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
ctx, cancel := context.WithCancel(context.Background())

messageManager := service.NewMessagePublisherManager(cfg.MessagePublisherBufferSize, cfg.MessagePublisherPerEventTimeout)

webhooksRepo := repository.NewWebhooksRepository(db)
webhookSender := service.NewWebhookSenderImpl(webhooksRepo)
webhookWorker := workers.NewWebhookDispatchWorker(webhooksRepo, webhookSender)
riverWorkers := river.NewWorkers()
river.AddWorker(riverWorkers, webhookWorker)

riverClient, err := river.NewClient(riverpgxv5.New(db), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: cfg.WebhookDeliveryMaxConcurrent},
},
Workers: riverWorkers,
})
if err != nil {
cancel()
messageManager.Shutdown()
return nil, fmt.Errorf("create River client: %w", err)
}

webhookProvider := service.NewWebhookProvider(riverClient, webhooksRepo, cfg.WebhookDeliveryMaxAttempts, cfg.WebhookMaxFanOutPerEvent)
messageManager.RegisterProvider(webhookProvider)

webhooksService := service.NewWebhooksService(webhooksRepo, messageManager, cfg.WebhookMaxCount)
webhooksHandler := handlers.NewWebhooksHandler(webhooksService)

feedbackRecordsRepo := repository.NewFeedbackRecordsRepository(db)
feedbackRecordsService := service.NewFeedbackRecordsService(feedbackRecordsRepo, messageManager)
feedbackRecordsHandler := handlers.NewFeedbackRecordsHandler(feedbackRecordsService)
healthHandler := handlers.NewHealthHandler()

server := newHTTPServer(cfg, healthHandler, feedbackRecordsHandler, webhooksHandler)

return &App{
cfg: cfg,
db: db,
server: server,
river: riverClient,
message: messageManager,
runCtx: ctx,
cancel: cancel,
}, nil
}

// newHTTPServer builds the HTTP server and muxes (no auth on /health, API key on /v1/).
func newHTTPServer(cfg *config.Config, health *handlers.HealthHandler, feedback *handlers.FeedbackRecordsHandler, webhooks *handlers.WebhooksHandler) *http.Server {
public := http.NewServeMux()
public.HandleFunc("GET /health", health.Check)

protected := http.NewServeMux()
protected.HandleFunc("POST /v1/feedback-records", feedback.Create)
protected.HandleFunc("GET /v1/feedback-records", feedback.List)
protected.HandleFunc("GET /v1/feedback-records/{id}", feedback.Get)
protected.HandleFunc("PATCH /v1/feedback-records/{id}", feedback.Update)
protected.HandleFunc("DELETE /v1/feedback-records/{id}", feedback.Delete)
protected.HandleFunc("DELETE /v1/feedback-records", feedback.BulkDelete)
protected.HandleFunc("POST /v1/webhooks", webhooks.Create)
protected.HandleFunc("GET /v1/webhooks", webhooks.List)
protected.HandleFunc("GET /v1/webhooks/{id}", webhooks.Get)
protected.HandleFunc("PATCH /v1/webhooks/{id}", webhooks.Update)
protected.HandleFunc("DELETE /v1/webhooks/{id}", webhooks.Delete)

protectedWithAuth := middleware.Auth(cfg.APIKey)(protected)
mux := http.NewServeMux()
mux.Handle("/v1/", protectedWithAuth)
mux.Handle("/", public)

return &http.Server{
Addr: ":" + cfg.Port,
Handler: middleware.Logging(mux),
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
}

// Run starts the HTTP server and River, then blocks until ctx is cancelled (e.g. signal)
// or a component fails. On component failure it cancels internal context and returns the error.
// Caller should then call Shutdown.
func (a *App) Run(ctx context.Context) error {
runErr := make(chan error, 1)

go func() {
if err := a.river.Start(a.runCtx); err != nil && !errors.Is(err, context.Canceled) {
select {
case runErr <- fmt.Errorf("river: %w", err):
default:
}
}
}()

go func() {
slog.Info("Starting server", "port", a.cfg.Port)
if err := a.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
select {
case runErr <- fmt.Errorf("server: %w", err):
default:
}
}
}()

select {
case err := <-runErr:
a.cancel()
return err
case <-ctx.Done():
return nil
}
}

// Shutdown stops the server, message publisher, and River in order. Call after Run returns.
func (a *App) Shutdown(ctx context.Context) error {
defer a.message.Shutdown()
if err := a.server.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
_ = a.river.Stop(ctx)
return fmt.Errorf("server shutdown: %w", err)
}
if err := a.river.Stop(ctx); err != nil {
return fmt.Errorf("river stop: %w", err)
}
return nil
}
Loading
Loading