12 changes: 6 additions & 6 deletions cmd/e2e/configs/basic.go
@@ -148,12 +148,12 @@ func setLogStorage(t *testing.T, c *config.Config, logStorage LogStorageType) er
case LogStorageTypePostgres:
postgresURL := testinfra.NewPostgresConfig(t)
c.PostgresURL = postgresURL
// case LogStorageTypeClickHouse:
// clickHouseConfig := testinfra.NewClickHouseConfig(t)
// c.ClickHouse.Addr = clickHouseConfig.Addr
// c.ClickHouse.Username = clickHouseConfig.Username
// c.ClickHouse.Password = clickHouseConfig.Password
// c.ClickHouse.Database = clickHouseConfig.Database
case LogStorageTypeClickHouse:
clickHouseConfig := testinfra.NewClickHouseConfig(t)
c.ClickHouse.Addr = clickHouseConfig.Addr
c.ClickHouse.Username = clickHouseConfig.Username
c.ClickHouse.Password = clickHouseConfig.Password
c.ClickHouse.Database = clickHouseConfig.Database
default:
return fmt.Errorf("invalid log storage type: %s", logStorage)
}
14 changes: 7 additions & 7 deletions cmd/e2e/suites_test.go
@@ -178,13 +178,13 @@ func (s *basicSuite) TearDownSuite() {
s.e2eSuite.TearDownSuite()
}

// func TestCHBasicSuite(t *testing.T) {
// t.Parallel()
// if testing.Short() {
// t.Skip("skipping e2e test")
// }
// suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse})
// }
func TestBasicSuiteWithCH(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping e2e test")
}
suite.Run(t, &basicSuite{logStorageType: configs.LogStorageTypeClickHouse})
}

func TestPGBasicSuite(t *testing.T) {
t.Parallel()
5 changes: 4 additions & 1 deletion internal/apirouter/retry_handlers.go
@@ -52,7 +52,10 @@ func (h *RetryHandlers) Retry(c *gin.Context) {
return
}

event, err := h.logStore.RetrieveEvent(c, tenantID, eventID)
event, err := h.logStore.RetrieveEvent(c, logstore.RetrieveEventRequest{
TenantID: tenantID,
EventID: eventID,
})
if err != nil {
AbortWithError(c, http.StatusInternalServerError, NewErrInternalServer(err))
return
71 changes: 36 additions & 35 deletions internal/config/config.go
@@ -9,6 +9,7 @@ import (

"github.com/caarlos0/env/v9"
"github.com/hookdeck/outpost/internal/backoff"
"github.com/hookdeck/outpost/internal/clickhouse"
"github.com/hookdeck/outpost/internal/migrator"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/telemetry"
@@ -61,10 +62,10 @@ type Config struct {
HTTPUserAgent string `yaml:"http_user_agent" env:"HTTP_USER_AGENT" desc:"Custom HTTP User-Agent string for outgoing webhook deliveries. If unset, a default (OrganizationName/Version) is used." required:"N"`

// Infrastructure
Redis RedisConfig `yaml:"redis"`
// ClickHouse ClickHouseConfig `yaml:"clickhouse"`
PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"Y"`
MQs *MQsConfig `yaml:"mqs"`
Redis RedisConfig `yaml:"redis"`
ClickHouse ClickHouseConfig `yaml:"clickhouse"`
PostgresURL string `yaml:"postgres" env:"POSTGRES_URL" desc:"Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'." required:"N"`
MQs *MQsConfig `yaml:"mqs"`

// PublishMQ
PublishMQ PublishMQConfig `yaml:"publishmq"`
@@ -131,9 +132,9 @@ func (c *Config) InitDefaults() {
Host: "127.0.0.1",
Port: 6379,
}
// c.ClickHouse = ClickHouseConfig{
// Database: "outpost",
// }
c.ClickHouse = ClickHouseConfig{
Database: "outpost",
}
c.MQs = &MQsConfig{
RabbitMQ: RabbitMQConfig{
Exchange: "outpost",
@@ -378,24 +379,24 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig {
}
}

// type ClickHouseConfig struct {
// Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'. Required if ClickHouse is used for log storage." required:"C"`
// Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
// Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
// Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
// }

// func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
// if c.Addr == "" {
// return nil
// }
// return &clickhouse.ClickHouseConfig{
// Addr: c.Addr,
// Username: c.Username,
// Password: c.Password,
// Database: c.Database,
// }
// }
type ClickHouseConfig struct {
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
}

func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
if c.Addr == "" {
return nil
}
return &clickhouse.ClickHouseConfig{
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
}
}

type AlertConfig struct {
CallbackURL string `yaml:"callback_url" env:"ALERT_CALLBACK_URL" desc:"URL to which Outpost will send a POST request when an alert is triggered (e.g., for destination failures)." required:"N"`
@@ -447,10 +448,10 @@ func (c *Config) ToTelemetryApplicationInfo() telemetry.ApplicationInfo {
portalEnabled := c.APIKey != "" && c.APIJWTSecret != ""

entityStore := "redis"
logStore := "TODO"
// if c.ClickHouse.Addr != "" {
// logStore = "clickhouse"
// }
logStore := ""
if c.ClickHouse.Addr != "" {
logStore = "clickhouse"
}
if c.PostgresURL != "" {
logStore = "postgres"
}
@@ -471,11 +472,11 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts {
PG: migrator.MigrationOptsPG{
URL: c.PostgresURL,
},
// CH: migrator.MigrationOptsCH{
// Addr: c.ClickHouse.Addr,
// Username: c.ClickHouse.Username,
// Password: c.ClickHouse.Password,
// Database: c.ClickHouse.Database,
// },
CH: migrator.MigrationOptsCH{
Addr: c.ClickHouse.Addr,
Username: c.ClickHouse.Username,
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
},
}
}
9 changes: 1 addition & 8 deletions internal/config/validation.go
@@ -89,17 +89,10 @@ func (c *Config) validateRedis() error {
}

// validateLogStorage validates the ClickHouse / PG configuration
// Temporary: disable CH as it's not fully supported yet
func (c *Config) validateLogStorage() error {
// if c.ClickHouse.Addr == "" && c.PostgresURL == "" {
// return ErrMissingLogStorage
// }
if c.PostgresURL == "" {
if c.ClickHouse.Addr == "" && c.PostgresURL == "" {
return ErrMissingLogStorage
}
// if c.ClickHouse.Addr != "" {
// return fmt.Errorf("ClickHouse is not currently supported")
// }
return nil
}

8 changes: 6 additions & 2 deletions internal/deliverymq/messagehandler.go
@@ -12,6 +12,7 @@ import (
"github.com/hookdeck/outpost/internal/destregistry"
"github.com/hookdeck/outpost/internal/idempotence"
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/logstore"
"github.com/hookdeck/outpost/internal/models"
"github.com/hookdeck/outpost/internal/mqs"
"github.com/hookdeck/outpost/internal/scheduler"
@@ -96,7 +97,7 @@ type DestinationGetter interface {
}

type EventGetter interface {
RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error)
RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error)
}

type DeliveryTracer interface {
@@ -417,7 +418,10 @@ func (h *messageHandler) ensureDeliveryEvent(ctx context.Context, deliveryEvent
return nil
}

event, err := h.logStore.RetrieveEvent(ctx, deliveryEvent.Event.TenantID, deliveryEvent.Event.ID)
event, err := h.logStore.RetrieveEvent(ctx, logstore.RetrieveEventRequest{
TenantID: deliveryEvent.Event.TenantID,
EventID: deliveryEvent.Event.ID,
})
if err != nil {
return err
}
7 changes: 4 additions & 3 deletions internal/deliverymq/mock_test.go
@@ -9,6 +9,7 @@ import (

"github.com/hookdeck/outpost/internal/alert"
"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/logstore"
"github.com/hookdeck/outpost/internal/models"
mqs "github.com/hookdeck/outpost/internal/mqs"
"github.com/hookdeck/outpost/internal/scheduler"
@@ -111,12 +112,12 @@ func (m *mockEventGetter) clearError() {
m.err = nil
}

func (m *mockEventGetter) RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error) {
func (m *mockEventGetter) RetrieveEvent(ctx context.Context, req logstore.RetrieveEventRequest) (*models.Event, error) {
if m.err != nil {
return nil, m.err
}
m.lastRetrievedID = eventID
event, ok := m.events[eventID]
m.lastRetrievedID = req.EventID
event, ok := m.events[req.EventID]
if !ok {
return nil, errors.New("event not found")
}
109 changes: 109 additions & 0 deletions internal/logstore/chlogstore/README.md
@@ -0,0 +1,109 @@
# chlogstore

ClickHouse implementation of the LogStore interface.

## Schema

A single denormalized table: each row represents one delivery attempt for an event.

| Table | Engine | Partition | Order By |
|-------|--------|-----------|----------|
| `event_log` | ReplacingMergeTree | `toYYYYMMDD(delivery_time)` | `(tenant_id, destination_id, delivery_time, event_id, delivery_id)` |

**Secondary indexes** (sketched in the DDL below):
- `idx_event_id` - bloom_filter for event_id lookups
- `idx_topic` - bloom_filter for topic filtering
- `idx_status` - set index for status filtering
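
For orientation, the schema above corresponds to DDL along the following lines. This is a hedged sketch: the column types, index granularities, and the `IF NOT EXISTS` guard are assumptions, not the migration shipped with this package. It is written against the clickhouse-go v2 `driver.Conn` interface that the batch example further down also uses.

```go
import (
    "context"

    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// Illustrative DDL matching the schema table above; column types are assumptions.
const createEventLog = `
CREATE TABLE IF NOT EXISTS event_log (
    event_id           String,
    tenant_id          String,
    destination_id     String,
    topic              String,
    eligible_for_retry Bool,
    event_time         DateTime64(3),
    metadata           String,
    data               String,
    delivery_id        String,
    delivery_event_id  String,
    status             LowCardinality(String),
    delivery_time      DateTime64(3),
    code               String,
    response_data      String,
    INDEX idx_event_id event_id TYPE bloom_filter GRANULARITY 4,
    INDEX idx_topic    topic    TYPE bloom_filter GRANULARITY 4,
    INDEX idx_status   status   TYPE set(0)       GRANULARITY 4
)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMMDD(delivery_time)
ORDER BY (tenant_id, destination_id, delivery_time, event_id, delivery_id)`

// createSchema creates the table if it does not already exist.
func createSchema(ctx context.Context, conn driver.Conn) error {
    return conn.Exec(ctx, createEventLog)
}
```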

## Design Principles

### Stateless Queries

All queries are designed to be stateless:
- No `GROUP BY`, no aggregation
- Direct row access with `ORDER BY` + `LIMIT`
- O(limit) performance regardless of total data volume

This avoids the scaling issues of aggregation-based queries that must scan all matching rows before applying LIMIT.
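
To make that concrete, here is a sketch of query assembly (illustrative Go, not the package's actual builder; the filter names and the `SELECT *` shorthand are simplifications): filters and the cursor only ever add `WHERE` clauses, so the statement stays an ordered scan with a `LIMIT`.

```go
import "strings"

// buildListQuery sketches how filters compose: every condition is another WHERE
// clause, and the query remains ORDER BY + LIMIT with no GROUP BY or aggregation.
func buildListQuery(tenantID, destinationID, status string, limit int) (string, []any) {
    where := []string{"tenant_id = ?"}
    args := []any{tenantID}
    if destinationID != "" {
        where = append(where, "destination_id = ?")
        args = append(args, destinationID)
    }
    if status != "" {
        where = append(where, "status = ?")
        args = append(args, status)
    }
    query := "SELECT * FROM event_log WHERE " + strings.Join(where, " AND ") +
        " ORDER BY delivery_time DESC, delivery_id DESC LIMIT ?"
    return query, append(args, limit)
}
```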

### Eventual Consistency

ReplacingMergeTree deduplicates rows asynchronously during background merges. This means:
- Duplicate inserts (retries) may briefly appear as multiple rows
- Background merges consolidate duplicates within seconds to minutes
- Production queries do NOT use `FINAL` to avoid full-scan overhead

For most use cases (log viewing), brief duplicates are acceptable.
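
Where an exactly deduplicated view is required after all (for example, a test asserting row counts), one option is to trigger the merge explicitly rather than pay for `FINAL` on every read. A minimal sketch, assuming the same clickhouse-go `driver.Conn` as elsewhere in this README; the package's own tests may handle this differently.

```go
import (
    "context"

    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// forceDedup asks ClickHouse to merge the table's parts now, collapsing
// ReplacingMergeTree duplicates. Handy in tests; avoid on hot production tables.
func forceDedup(ctx context.Context, conn driver.Conn) error {
    return conn.Exec(ctx, "OPTIMIZE TABLE event_log FINAL")
}
```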

## Operations

### ListDeliveryEvent

Direct index scan with cursor-based pagination.

```sql
SELECT
event_id, tenant_id, destination_id, topic, eligible_for_retry,
event_time, metadata, data,
delivery_id, delivery_event_id, status, delivery_time, code, response_data
FROM event_log
WHERE tenant_id = ?
AND [optional filters: destination_id, status, topic, time ranges]
AND [cursor condition]
ORDER BY delivery_time DESC, delivery_id DESC
LIMIT 101
```

**Cursor design:**
- Format: `v1:{sortBy}:{sortOrder}:{position}` (base62 encoded)
- Position for delivery_time sort: `{timestamp}::{delivery_id}`
- Position for event_time sort: `{event_timestamp}::{event_id}::{delivery_timestamp}::{delivery_id}`
- Validates that the sort params match the request; mismatched cursors are rejected (see the sketch below)
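
A minimal sketch of the encode/validate round trip. The names are illustrative rather than the package's real cursor type, and base62 is approximated with `math/big` (which supports bases up to 62); the actual encoding details may differ.

```go
import (
    "fmt"
    "math/big"
    "strings"
)

// cursor carries the sort configuration plus the position of the last row returned.
type cursor struct {
    SortBy    string // "delivery_time" or "event_time"
    SortOrder string // "asc" or "desc"
    Position  string // e.g. "{timestamp}::{delivery_id}"
}

// encode renders "v1:{sortBy}:{sortOrder}:{position}" and base62-encodes it.
func (c cursor) encode() string {
    plain := fmt.Sprintf("v1:%s:%s:%s", c.SortBy, c.SortOrder, c.Position)
    var n big.Int
    n.SetBytes([]byte(plain))
    return n.Text(62)
}

// decode reverses encode and rejects cursors whose sort params don't match the request.
func decode(raw, wantSortBy, wantSortOrder string) (cursor, error) {
    var n big.Int
    if _, ok := n.SetString(raw, 62); !ok {
        return cursor{}, fmt.Errorf("invalid cursor encoding")
    }
    parts := strings.SplitN(string(n.Bytes()), ":", 4)
    if len(parts) != 4 || parts[0] != "v1" {
        return cursor{}, fmt.Errorf("invalid cursor format")
    }
    c := cursor{SortBy: parts[1], SortOrder: parts[2], Position: parts[3]}
    if c.SortBy != wantSortBy || c.SortOrder != wantSortOrder {
        return cursor{}, fmt.Errorf("cursor does not match requested sort")
    }
    return c, nil
}
```

Because the sort parameters travel inside the cursor, a cursor minted for a `delivery_time` sort cannot silently be replayed against an `event_time` sort.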

**Backward pagination:**
- Reverses ORDER BY direction
- Reverses comparison operator in cursor condition
- Reverses the fetched results so the page comes back in the requested order (see the sketch below)
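
The same flip-and-reverse idea in miniature (illustrative helpers, not the store's actual code; the operator mapping assumes the default descending sort shown above):

```go
// pageDirection flips the ORDER BY direction and the cursor comparison
// operator when paginating backward.
func pageDirection(backward bool) (orderDir, cursorCmp string) {
    if backward {
        return "ASC", ">" // walk the index in the opposite direction
    }
    return "DESC", "<"
}

// reverseRows restores the requested (newest-first) order after a backward fetch.
func reverseRows[T any](rows []T) {
    for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
        rows[i], rows[j] = rows[j], rows[i]
    }
}
```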

### RetrieveEvent

Direct lookup by tenant_id and event_id.

```sql
SELECT
event_id, tenant_id, destination_id, topic, eligible_for_retry,
event_time, metadata, data
FROM event_log
WHERE tenant_id = ? AND event_id = ?
LIMIT 1
```

When a destination filter is provided, the query also adds `AND destination_id = ?`.
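
From Go, callers go through the `logstore` interface updated in this PR rather than issuing SQL directly. A minimal sketch, with `store`, `ctx`, and the IDs assumed to be in scope and error handling shortened:

```go
// Look up a single event scoped to a tenant.
event, err := store.RetrieveEvent(ctx, logstore.RetrieveEventRequest{
    TenantID: tenantID,
    EventID:  eventID,
})
if err != nil {
    return nil, err
}
// event is a *models.Event; nil handling is up to the caller.
```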

### InsertManyDeliveryEvent

Batch insert using ClickHouse's native batch API.

```go
batch, _ := conn.PrepareBatch(ctx, `
INSERT INTO event_log (
event_id, tenant_id, destination_id, topic, eligible_for_retry,
event_time, metadata, data,
delivery_id, delivery_event_id, status, delivery_time, code, response_data
)
`)
for _, de := range deliveryEvents {
batch.Append(...)
}
batch.Send()
```

**Idempotency:** ReplacingMergeTree deduplicates rows with identical ORDER BY keys during background merges.

## Performance Characteristics

| Operation | Complexity | Notes |
|-----------|------------|-------|
| ListDeliveryEvent | O(limit) | Index scan, stops at LIMIT |
| RetrieveEvent | O(1) | Single row lookup via bloom filter |
| InsertManyDeliveryEvent | O(batch) | Batch insert, async dedup |