Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions deploy/local/docker-compose/consumoor-clickhouse-init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/bin/bash
# Initialize the consumoor ClickHouse database by copying all table schemas from default.
set -e

CH_HOST="${CH_HOST:-xatu-clickhouse-01}"

# Invoke clickhouse-client against $CH_HOST, passing every argument given to
# this function through unchanged. The exit status is clickhouse-client's.
ch() {
  local -a client_args=(--host="$CH_HOST" "$@")
  clickhouse-client "${client_args[@]}"
}

echo "Creating consumoor database..."
ch --query="CREATE DATABASE IF NOT EXISTS consumoor ON CLUSTER '{cluster}'"

echo "Getting local tables from default database..."
tables=$(ch --query="SELECT name FROM system.tables WHERE database = 'default' AND name LIKE '%_local' AND engine LIKE 'Replicated%' ORDER BY name")

# The query returns one table name per line. Load the names into an array so
# they are never subject to word-splitting or pathname expansion.
table_list=()
if [ -n "$tables" ]; then
  mapfile -t table_list <<< "$tables"
fi

table_count=0
for table in "${table_list[@]}"; do
  [ -n "$table" ] || continue
  echo "Copying: default.$table -> consumoor.$table"

  # Fetch the DDL on its own, outside any pipeline. The script runs with
  # `set -e` but without `pipefail`, so a failure of this command inside a
  # pipeline would be masked by the exit status of the last stage.
  ddl=$(ch --format=TSVRaw --query="SHOW CREATE TABLE default.\`$table\`")
  if [ -z "$ddl" ]; then
    echo "ERROR: empty CREATE TABLE statement returned for default.$table" >&2
    exit 1
  fi

  # Rewrite the CREATE TABLE DDL for the consumoor database:
  # 1. Replace database qualifier: default. -> consumoor.
  # 2. Add ON CLUSTER after the table name (end of the first line)
  # 3. Append /consumoor to the ZK path (first quoted arg in MergeTree())
  #    to ensure unique paths regardless of path pattern variant
  # `ch --multiquery` is the last stage, so its failure aborts the script.
  printf '%s\n' "$ddl" \
    | sed \
        -e '1s/^CREATE TABLE default\./CREATE TABLE IF NOT EXISTS consumoor./' \
        -e "1s/\$/ ON CLUSTER '{cluster}'/" \
        -e "s|MergeTree('/\([^']*\)'|MergeTree('/\1/consumoor'|" \
    | ch --multiquery

  # Create the corresponding distributed table (same name minus "_local").
  dist_table="${table%_local}"
  echo "Creating distributed: consumoor.$dist_table"
  ch --query="CREATE TABLE IF NOT EXISTS consumoor.\`$dist_table\` ON CLUSTER '{cluster}' AS consumoor.\`$table\` ENGINE = Distributed('{cluster}', 'consumoor', '$table', rand())"

  table_count=$((table_count + 1))
done

echo ""
echo "Consumoor database initialization complete! ($table_count tables copied)"
194 changes: 194 additions & 0 deletions deploy/local/docker-compose/smoke-test-correctness.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#!/bin/bash
# Smoke test comparing Vector (default db) vs Consumoor (consumoor db) ClickHouse output.
#
# Usage:
# ./smoke-test-correctness.sh [ingest|compare|all]
#
# Modes:
# ingest - Send test events from testdata captures to xatu-server HTTP API
# compare - Compare default vs consumoor ClickHouse tables
# all - Ingest, wait, then compare (default)
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)"
CAPTURES_DIR="$REPO_ROOT/pkg/consumoor/testdata/captures"
XATU_SERVER_URL="${XATU_SERVER_URL:-http://localhost:8087/v1/events}"
CH_HOST="${CH_HOST:-localhost}"
CH_PORT="${CH_PORT:-8123}"
FLUSH_WAIT="${FLUSH_WAIT:-15}"

MODE="${1:-all}"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Run a single SQL statement through the ClickHouse HTTP interface.
# Arguments: $1 - the SQL text, sent as the request body
# Outputs:   the response body on stdout
# Returns:   curl's exit status (non-zero on transport or HTTP errors)
ch_query() {
  local sql=$1
  local endpoint="http://${CH_HOST}:${CH_PORT}/"
  curl -sf "$endpoint" --data-binary "$sql"
}

# ── Ingest ──────────────────────────────────────────────────────────────────
# Send every captured event to the xatu-server HTTP API.
#
# Each non-blank line of every *.jsonl file in $CAPTURES_DIR is wrapped in a
# {"events":[...]} request body and POSTed to $XATU_SERVER_URL on its own.
#
# Globals:  CAPTURES_DIR, XATU_SERVER_URL, RED, GREEN, NC (all read)
# Outputs:  progress lines and a final summary on stdout
# Returns:  0 (failed sends are reported in the summary, not via the status)
ingest_events() {
  echo "=== Ingesting test events via xatu-server HTTP API ==="
  echo "Server: $XATU_SERVER_URL"
  echo "Captures: $CAPTURES_DIR"
  echo ""

  local total=0
  local sent=0
  local skipped=0
  local failed=0
  local jsonl_file filename line payload http_code
  local line_num events

  for jsonl_file in "$CAPTURES_DIR"/*.jsonl; do
    # When nothing matches, the glob is left as the literal pattern; do not
    # count that as a (skipped) capture file.
    [ -e "$jsonl_file" ] || continue

    filename=$(basename "$jsonl_file" .jsonl)
    total=$((total + 1))

    # Skip empty files
    if [ ! -s "$jsonl_file" ]; then
      skipped=$((skipped + 1))
      continue
    fi

    # Send each line as a separate CreateEventsRequest
    line_num=0
    events=0
    # `|| [ -n "$line" ]` keeps a final line that has no trailing newline.
    while IFS= read -r line || [ -n "$line" ]; do
      line_num=$((line_num + 1))

      # A blank line would produce the malformed body {"events":[]}.
      [ -n "$line" ] || continue
      events=$((events + 1))

      # Wrap the DecoratedEvent in a CreateEventsRequest
      payload="{\"events\":[$line]}"

      # No -f here: with -f curl exits non-zero on HTTP >= 400, and the
      # fallback below would then replace the real status code with "000".
      # The fallback now only applies to transport-level failures.
      http_code=$(curl -s -o /dev/null -w "%{http_code}" \
        -X POST "$XATU_SERVER_URL" \
        -H "Content-Type: application/json" \
        -d "$payload" 2>/dev/null) || http_code="000"

      case "$http_code" in
        200 | 202 | 204) ;;
        *)
          echo -e " ${RED}FAIL${NC} $filename line $line_num (HTTP $http_code)"
          failed=$((failed + 1))
          ;;
      esac
    done < "$jsonl_file"

    echo -e " ${GREEN}SENT${NC} $filename ($events events)"
    sent=$((sent + 1))
  done

  echo ""
  echo "Ingest summary: total=$total sent=$sent skipped=$skipped failed=$failed"
}

# ── Compare ─────────────────────────────────────────────────────────────────
# Compare every Distributed table in the consumoor database with the table of
# the same name in the default database.
#
# Per table: skip it when it has no Distributed counterpart in default, mark it
# EMPTY when both sides have 0 rows, DIFF when the row counts differ, and
# otherwise compare row contents with EXCEPT in BOTH directions.
#
# Globals:  RED, GREEN, YELLOW, NC (read); uses ch_query for all queries
# Outputs:  one status line per table plus a summary on stdout
# Returns:  1 if any table differs, 0 otherwise; exits 1 when the consumoor
#           database has no Distributed tables at all
compare_tables() {
  echo "=== Comparing default vs consumoor ClickHouse tables ==="
  echo ""

  # Get tables that exist in consumoor database
  local consumoor_tables
  consumoor_tables=$(ch_query "SELECT name FROM system.tables WHERE database = 'consumoor' AND name NOT LIKE '%_local' AND engine = 'Distributed' ORDER BY name")

  if [ -z "$consumoor_tables" ]; then
    echo -e "${RED}ERROR: No tables found in consumoor database${NC}"
    echo "Make sure xatu-clickhouse-consumoor-init has completed."
    exit 1
  fi

  local total=0
  local match=0
  local diff=0
  local skip=0
  local empty=0
  local table default_exists default_count consumoor_count
  local only_default only_consumoor diff_count

  while IFS= read -r table; do
    [ -z "$table" ] && continue
    total=$((total + 1))

    # Check if table exists in default
    default_exists=$(ch_query "SELECT count() FROM system.tables WHERE database = 'default' AND name = '$table' AND engine = 'Distributed'")
    if [ "$default_exists" = "0" ]; then
      echo -e " ${YELLOW}SKIP${NC} $table (not in default db)"
      skip=$((skip + 1))
      continue
    fi

    # Get row counts
    default_count=$(ch_query "SELECT count() FROM default.\`$table\`")
    consumoor_count=$(ch_query "SELECT count() FROM consumoor.\`$table\`")

    if [ "$default_count" = "0" ] && [ "$consumoor_count" = "0" ]; then
      echo -e " ${YELLOW}EMPTY${NC} $table (both 0 rows)"
      empty=$((empty + 1))
      continue
    fi

    if [ "$default_count" != "$consumoor_count" ]; then
      echo -e " ${RED}DIFF${NC} $table (default=$default_count consumoor=$consumoor_count)"
      diff=$((diff + 1))
      continue
    fi

    # Same count: compare row contents with EXCEPT in both directions.
    # One direction alone is not enough: with equal counts, default={A,A} and
    # consumoor={A,B} gives an empty "default EXCEPT consumoor" even though
    # row B exists only in consumoor.
    only_default=$(ch_query "SELECT count() FROM (SELECT * FROM default.\`$table\` EXCEPT SELECT * FROM consumoor.\`$table\`)" 2>/dev/null) || only_default="error"
    only_consumoor=$(ch_query "SELECT count() FROM (SELECT * FROM consumoor.\`$table\` EXCEPT SELECT * FROM default.\`$table\`)" 2>/dev/null) || only_consumoor="error"

    # Anything that is not a plain non-negative integer is treated as a
    # failed comparison rather than being fed into arithmetic.
    case "$only_default" in '' | *[!0-9]*) only_default="error" ;; esac
    case "$only_consumoor" in '' | *[!0-9]*) only_consumoor="error" ;; esac

    if [ "$only_default" = "error" ] || [ "$only_consumoor" = "error" ]; then
      diff_count="error"
    else
      diff_count=$((only_default + only_consumoor))
    fi

    if [ "$diff_count" = "error" ]; then
      echo -e " ${YELLOW}SKIP${NC} $table (schema mismatch, cannot EXCEPT)"
      skip=$((skip + 1))
    elif [ "$diff_count" = "0" ]; then
      echo -e " ${GREEN}MATCH${NC} $table ($default_count rows)"
      match=$((match + 1))
    else
      echo -e " ${RED}DIFF${NC} $table ($default_count rows, $diff_count differ)"
      diff=$((diff + 1))
    fi
  done <<< "$consumoor_tables"

  echo ""
  echo "Compare summary: total=$total match=$match diff=$diff skip=$skip empty=$empty"

  if [ "$diff" -gt 0 ]; then
    echo ""
    echo -e "${RED}FAIL: $diff tables have differences${NC}"
    return 1
  elif [ "$match" -gt 0 ]; then
    echo ""
    echo -e "${GREEN}PASS: All $match populated tables match${NC}"
    return 0
  else
    echo ""
    echo -e "${YELLOW}WARN: No tables had data to compare${NC}"
    return 0
  fi
}

# ── Main ────────────────────────────────────────────────────────────────────
# Run the step(s) selected by $MODE; any unrecognised mode prints the usage
# line and exits with status 1.
if [[ "$MODE" == "ingest" ]]; then
  ingest_events
elif [[ "$MODE" == "compare" ]]; then
  compare_tables
elif [[ "$MODE" == "all" ]]; then
  ingest_events
  echo ""
  # Pause between ingest and compare for the configured number of seconds.
  echo "Waiting ${FLUSH_WAIT}s for both pipelines to flush..."
  sleep "$FLUSH_WAIT"
  echo ""
  compare_tables
else
  echo "Usage: $0 [ingest|compare|all]"
  exit 1
fi
26 changes: 26 additions & 0 deletions deploy/local/docker-compose/xatu-consumoor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Consumoor configuration for the local docker-compose stack; docker-compose
# mounts this file into the xatu-consumoor container as
# /etc/consumoor/config.yaml.
logging: "debug"
metricsAddr: ":9091"

# Kafka source settings.
kafka:
brokers:
- xatu-kafka:29092
topics:
# Topic-name pattern. The local docker-compose Kafka init step creates
# "proto-<topic>" topics for consumoor (protobuf encoding).
- "^proto-.+"
consumerGroup: xatu-consumoor-v3
encoding: protobuf
commitInterval: 1s
deliveryMode: message

# ClickHouse sink settings.
clickhouse:
# Targets the "consumoor" database, which consumoor-clickhouse-init.sh creates
# as a copy of the table schemas in "default".
dsn: "clickhouse://xatu-clickhouse-01:9000/consumoor"
# Appended to every table name before writing, so inserts go directly to the
# "*_local" ReplicatedMergeTree tables instead of the Distributed tables.
tableSuffix: "_local"
chgo:
maxConns: 32
queryTimeout: 120s
# Default batch settings applied to all tables.
defaults:
batchSize: 1000
# 10485760 bytes = 10 MiB
batchBytes: 10485760
flushInterval: 1s
bufferSize: 10000
insertSettings:
# NOTE(review): presumably passed through as the ClickHouse insert_quorum
# setting, where 0 disables quorum writes - confirm.
insert_quorum: 0
52 changes: 52 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ services:
dockerfile: Dockerfile
ports:
- "${XATU_SERVER_ADDRESS:-0.0.0.0}:${XATU_SERVER_PORT:-8080}:8080"
- "${XATU_SERVER_HTTP_ADDRESS:-0.0.0.0}:${XATU_SERVER_HTTP_PORT:-8087}:8087"
# environment:
# SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME: ${SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME:-xatu}
# SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD: ${SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD:-example}
Expand Down Expand Up @@ -355,6 +356,17 @@ services:
kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --topic "$$topic"
done

# Proto topics for consumoor (protobuf encoding)
for topic in "$${large_message_topics[@]}"; do
echo "Creating proto topic with 10MB message limit: proto-$$topic";
kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --config max.message.bytes=10485760 --topic "proto-$$topic"
done

for topic in "$${regular_topics[@]}"; do
echo "Creating proto topic: proto-$$topic";
kafka-topics --create --if-not-exists --bootstrap-server xatu-kafka:29092 --partitions 1 --replication-factor 1 --config cleanup.policy=delete --config retention.ms=259200000 --config compression.type=snappy --config segment.bytes=536870912 --topic "proto-$$topic"
done

sleep 3;
depends_on:
xatu-kafka:
Expand Down Expand Up @@ -565,6 +577,24 @@ services:
condition: service_healthy
networks:
- xatu-net
xatu-clickhouse-consumoor-init:
profiles:
- ""
image: "clickhouse/clickhouse-server:${CHVER:-latest}"
container_name: xatu-clickhouse-consumoor-init
entrypoint: ["bash", "/init.sh"]
volumes:
- ./deploy/local/docker-compose/consumoor-clickhouse-init.sh:/init.sh:ro
depends_on:
xatu-clickhouse-migrator:
condition: service_completed_successfully
xatu-clickhouse-01:
condition: service_healthy
xatu-clickhouse-02:
condition: service_healthy
networks:
- xatu-net

tempo-init:
image: busybox:latest
user: root
Expand Down Expand Up @@ -638,6 +668,28 @@ services:
depends_on:
- xatu-server

xatu-consumoor:
profiles:
- ""
command: consumoor --config /etc/consumoor/config.yaml
container_name: xatu-consumoor
build:
context: .
dockerfile: Dockerfile
volumes:
- ./deploy/local/docker-compose/xatu-consumoor.yaml:/etc/consumoor/config.yaml
networks:
- xatu-net
depends_on:
xatu-kafka:
condition: service_healthy
xatu-clickhouse-01:
condition: service_healthy
xatu-clickhouse-consumoor-init:
condition: service_completed_successfully
xatu-init-kafka:
condition: service_completed_successfully

xatu-cannon:
profiles:
- "cannon"
Expand Down
17 changes: 8 additions & 9 deletions pkg/consumoor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ Kafka (Benthos kafka_franz) → xatu_clickhouse output (decode + route + classif

### Adding a New Event

1. Add or update a table route in `sinks/clickhouse/transform/flattener/tables/<domain>/*.go` using:
`flattener.`
` From(...).`
` To(...).`
` Apply(...).`
` Build()`
2. For custom behavior, implement `flattener.Route` directly or use `.If(...)` / `.Mutator(...)` on the route pipeline
3. Write a ClickHouse migration for the target table
4. Add or update unit tests in `sinks/clickhouse/transform/flattener/routes_test.go`
1. Generate a typed row struct using `chgo-rowgen` (see `tables/<domain>/cmd/`) — produces a `.gen.go` file with `ToMap()`, `SetMetadata()`, `SetAnyColumn()`, `GetColumn()`, etc.
2. Add a hand-written `$table.go` in `sinks/clickhouse/transform/flattener/tables/<domain>/` with:
- `flattenXxx()` function returning `[]map[string]any`
- `newXxxRow()` constructor calling `setRuntime()`, `SetMetadata()`, `setPayload()`, and optionally `setClientAdditionalData()` / `setServerAdditionalData()`
- `init()` registering via `catalog.MustRegister(flattener.NewStaticRoute(tableName, eventNames, flattenFn))`
3. For conditional routing, use `flattener.WithStaticRoutePredicate(...)` option
4. Write a ClickHouse migration for the target table
5. Add or update unit tests in `sinks/clickhouse/transform/flattener/routes_test.go`

### Configuration

Expand Down
4 changes: 4 additions & 0 deletions pkg/consumoor/consumoor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func New(

router := consrouter.New(log, registeredRoutes, disabledEvents, metrics)

// Register columnar batch factories from routes on the writer so
// each table gets zero-reflection inserts.
writer.RegisterBatchFactories(registeredRoutes)

c := &Consumoor{
log: log.WithField("component", "consumoor"),
config: config,
Expand Down
5 changes: 5 additions & 0 deletions pkg/consumoor/sinks/clickhouse/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type Config struct {
// DSN is the ClickHouse connection string.
DSN string `yaml:"dsn"`

// TableSuffix is appended to every table name before writing.
// For example, set to "_local" to bypass Distributed tables and write
// directly to ReplicatedMergeTree tables in a clustered setup.
TableSuffix string `yaml:"tableSuffix"`

// Defaults are the default batch settings for all tables.
Defaults TableConfig `yaml:"defaults"`

Expand Down
Loading
Loading