diff --git a/f1-telemetry/Dockerfile b/f1-telemetry/Dockerfile new file mode 100644 index 0000000..8bcc2df --- /dev/null +++ b/f1-telemetry/Dockerfile @@ -0,0 +1,22 @@ +# Two-stage build. Produces a small image containing both binaries; the +# docker-compose file decides which one to run via `command:`. + +FROM golang:1.22-alpine AS build +WORKDIR /src + +# Cache deps separately for fast incremental builds. +COPY go.mod go.sum* ./ +RUN go mod download || true + +COPY . . + +RUN CGO_ENABLED=0 go build -o /out/server ./cmd/server +RUN CGO_ENABLED=0 go build -o /out/simulator ./cmd/simulator + +FROM gcr.io/distroless/static:nonroot +WORKDIR /app +COPY --from=build /out/server /app/server +COPY --from=build /out/simulator /app/simulator +USER nonroot:nonroot +# Default to running the server; docker-compose overrides per-service. +CMD ["/app/server"] diff --git a/f1-telemetry/F1_Telemetry_UI_view.jpg b/f1-telemetry/F1_Telemetry_UI_view.jpg new file mode 100644 index 0000000..9af06f4 Binary files /dev/null and b/f1-telemetry/F1_Telemetry_UI_view.jpg differ diff --git a/f1-telemetry/README.md b/f1-telemetry/README.md new file mode 100644 index 0000000..69adf96 --- /dev/null +++ b/f1-telemetry/README.md @@ -0,0 +1,164 @@ +# F1 Telemetry — CedarDB Live Demo + +![F1 telemetry UI](./F1_Telemetry_UI_view.jpg) + +A small Go project that simulates a 20 car Formula 1 race, streams 10 Hz +telemetry into CedarDB, and renders a live dashboard fed by the same database +the simulator writes into (e.g. an HTAP workload). + +The point of the demo is the **concurrent ingest + analytic reads** story: +the simulator is `INSERT`-ing into `telemetry` and `laps` continuously while +the web server runs `SELECT`s against those tables every 200 ms to drive +the SSE stream. + +What is _SSE_? SSE means _Server-Sent Events_: a streaming HTTP protocol where the server +keeps a long-lived response open and writes data: `{...}\n\n` chunks down it as +things happen, instead of the client polling repeatedly. + +## Quick Start: Run it in Docker + +``` +docker compose up --build +``` + +* Open a browser tab to [the app](http://localhost:8080) +* Be sure to also click the _SQL queries_ [link](http://localhost:8080/static/queries.html) at the bottom of the page + +## What's inside + +``` +schema.sql -- 5 tables: sessions, drivers, telemetry, laps, events +docker-compose.yml -- CedarDB + simulator + web server +Dockerfile -- builds both Go binaries +cmd/simulator/main.go -- runs the race +cmd/server/main.go -- serves the dashboard +internal/db/db.go -- pgx pool wrapper +internal/sim/track.go -- Cedar Park Circuit: Catmull-Rom spline through + hand-picked control points (straight, hairpin, + chicane, sweepers) +internal/sim/simulator.go -- 20 cars, 10 Hz, COPY-batched INSERTs + (per-driver pace + corner bias + form drift) +internal/web/server.go -- HTTP + SSE + leaderboard HTML fragment + /static +internal/web/templates/ -- HTMX + small inline JS dashboard +internal/web/static/ -- embedded /static/ assets (logo) +``` + + +## Run it (local Go, your own CedarDB) + +If you already have CedarDB running: + +``` +export DATABASE_URL="postgres://USER:PASS@HOST:5432/DB?sslmode=disable" +psql "$DATABASE_URL" -f schema.sql + +go run ./cmd/simulator -laps=50 -hz=10 -track=cedar-park & +go run ./cmd/server -addr=:8080 +# open http://localhost:8080 +``` + +## Picking a track + +Six selectable circuits are baked in. Each is drawn from a hand-picked set +of control points run through a centripetal Catmull-Rom spline, with the +matching lap length chosen to put average speed in a realistic F1 band. + +``` +go run ./cmd/simulator -list-tracks +``` + +| key | display name | lap | character | +|----------------|---------------------|-------|----------------------------------------------------------| +| `cedar-park` | Cedar Park Circuit | 5.4 km| Long straight, fast right-hander, chicane, hairpin | +| `sprint-oval` | Sprint Oval | 4.2 km| Two long straights, two big sweepers — fastest avg speed | +| `highline` | Highline Esses | 5.2 km| Flowing shallow esses across a long horizontal band | +| `crescent` | Crescent Bay | 5.1 km| Asymmetric arc with a tight infield twist on the south | +| `pinewood` | Pinewood Climb | 4.8 km| Vertical layout, two opposed hairpins, narrow infield | +| `downtown` | Downtown Loop | 4.4 km| Street circuit — boxy, right-angle turns, slow avg | + +Use `-track ` to pick one, or `-track random` to let the simulator +choose. The dashboard reads the chosen preset's name from `sessions.track` +and rebuilds the same geometry — no track polyline is persisted, both +sides derive it from the same `internal/sim/track.go` preset map. + +The dashboard also draws a checkered band across the start/finish line so +you can see where lap timing begins. Position and orientation come from +the `/track.json` endpoint, which returns the start point plus a unit +normal vector at trackPos = 0. + +## Architecture + +``` + ┌──────────────┐ INSERT (COPY) ┌────────────────────┐ + │ simulator │ ─────────────────────► │ CedarDB │ + │ (20 cars, │ telemetry + laps │ sessions/drivers/ │ + │ 10 Hz) │ │ telemetry/laps/ │ + └──────────────┘ │ events │ + └─────────┬──────────┘ + │ 4 concurrent reads: + │ - SSE snapshot (every 200 ms) + │ - leaderboard (every 500 ms) + │ - speed-map agg (every 3 s) + │ - ingest-rate (every 1 s) + ▼ + ┌────────────────────┐ + │ Go web server │ + │ /sse/state │ + │ /api/leaderboard │ + │ /api/speed-map │ + │ /api/ingest-rate │ + └─────────┬──────────┘ + ▼ + ┌────────────────────┐ + │ browser (HTMX) │ + │ track map + │ + │ speed ribbon + │ + │ leaderboard + │ + │ live ingest rate │ + └────────────────────┘ +``` + +All four reads run against the same `telemetry` table the simulator is +INSERTing into. The speed-map and ingest-rate panels exist specifically to +make the concurrent-OLAP-on-OLTP story visible without saying it out loud. + +## Key queries to read + +The single query worth opening the source for is the SSE snapshot in +`internal/web/server.go::loadSnapshot`: + +```sql +WITH latest AS ( + SELECT DISTINCT ON (driver_id) + driver_id, lap, pos_x, pos_y, speed_kph, + rpm, gear, throttle, brake, drs + FROM telemetry + WHERE session_id = $1 + ORDER BY driver_id, ts DESC +), +lap_stats AS ( + SELECT driver_id, + MIN(lap_time_ms) AS best_lap_ms, + (ARRAY_AGG(lap_time_ms ORDER BY lap_number DESC))[1] AS last_lap_ms + FROM laps WHERE session_id = $1 GROUP BY driver_id +) +SELECT ... FROM latest JOIN drivers JOIN lap_stats ... +``` + +This runs five times a second over the same table the simulator is +inserting into. `DISTINCT ON` aligns with the +`(session_id, driver_id, ts DESC)` index so the latest row per driver is +returned without scanning the whole partition. + +## Knobs + +``` +go run ./cmd/simulator -laps=50 -hz=10 + ↑ ↑ + │ └─ inserts/sec = 20 * hz (200 default) + └─ how many laps before the session ends +``` + +Try `-hz=50` for ~1000 inserts/sec, or run multiple simulators against the +same DB (each creates its own `session_id`). + diff --git a/f1-telemetry/build.sh b/f1-telemetry/build.sh new file mode 100755 index 0000000..7e63092 --- /dev/null +++ b/f1-telemetry/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +CGO_ENABLED=0 go build -o ./out/server ./cmd/server +CGO_ENABLED=0 go build -o ./out/simulator ./cmd/simulator + diff --git a/f1-telemetry/clean.sh b/f1-telemetry/clean.sh new file mode 100755 index 0000000..dd0277e --- /dev/null +++ b/f1-telemetry/clean.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +rm -f ./out/server +rm -f ./out/simulator + diff --git a/f1-telemetry/cmd/server/main.go b/f1-telemetry/cmd/server/main.go new file mode 100644 index 0000000..8f0a9de --- /dev/null +++ b/f1-telemetry/cmd/server/main.go @@ -0,0 +1,53 @@ +// Command server runs the web dashboard. Connect with your browser to +// http://localhost:8080 once the simulator has produced at least one row. +package main + +import ( + "context" + "flag" + "log" + "net/http" + "os/signal" + "syscall" + "time" + + "github.com/cedardb-demo/f1-telemetry/internal/db" + "github.com/cedardb-demo/f1-telemetry/internal/web" +) + +func main() { + addr := flag.String("addr", ":8080", "http listen addr") + flag.Parse() + + ctx, cancel := signal.NotifyContext(context.Background(), + syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + pool, err := db.Connect(ctx) + if err != nil { + log.Fatalf("db: %v", err) + } + defer pool.Close() + + srv, err := web.NewServer(pool) + if err != nil { + log.Fatalf("server: %v", err) + } + + httpSrv := &http.Server{ + Addr: *addr, + Handler: srv.Routes(), + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + <-ctx.Done() + shutdown, c := context.WithTimeout(context.Background(), 5*time.Second) + defer c() + _ = httpSrv.Shutdown(shutdown) + }() + + log.Printf("dashboard listening on %s", *addr) + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %v", err) + } +} diff --git a/f1-telemetry/cmd/simulator/main.go b/f1-telemetry/cmd/simulator/main.go new file mode 100644 index 0000000..5facaaf --- /dev/null +++ b/f1-telemetry/cmd/simulator/main.go @@ -0,0 +1,109 @@ +// Command simulator drives the 20-car race loop and INSERTs telemetry/laps +// into CedarDB. Run this in one terminal; run cmd/server in another. +// +// Pick a track with -track ; list available presets with -list-tracks. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/cedardb-demo/f1-telemetry/internal/db" + "github.com/cedardb-demo/f1-telemetry/internal/sim" +) + +func main() { + totalLaps := flag.Int("laps", 50, "total race laps") + tickHz := flag.Int("hz", 10, "telemetry sample rate") + trackKey := flag.String("track", "cedar-park", + `track preset key — see -list-tracks for the full set, or "random"`) + listTracks := flag.Bool("list-tracks", false, "print the available track presets and exit") + resetSchema := flag.Bool("reset-schema", false, + "drop & recreate all tables on startup — destroys all data from prior sessions") + flag.Parse() + + if *listTracks { + printTracks() + return + } + + chosen := *trackKey + if strings.EqualFold(strings.TrimSpace(chosen), "random") { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + chosen = sim.Presets[rng.Intn(len(sim.Presets))].Key + log.Printf("track=random picked=%s", chosen) + } + + ctx, cancel := signal.NotifyContext(context.Background(), + os.Interrupt, syscall.SIGTERM) + defer cancel() + + pool, err := db.Connect(ctx) + if err != nil { + log.Fatalf("db: %v", err) + } + defer pool.Close() + log.Printf("connected to CedarDB") + + // Schema bootstrap. schema.sql is idempotent (every statement is + // CREATE … IF NOT EXISTS), so we always call ApplySchema on cold + // start — no SchemaPresent gating. The -reset-schema flag layers + // destructive DROPs on top via ResetSchema. + // + // SchemaPresent is still called for diagnostic logging so it's + // obvious from stdout whether the tables were already there. + if *resetSchema { + log.Printf("-reset-schema set: wiping all data and re-creating tables") + if err := db.ResetSchema(ctx, pool); err != nil { + log.Fatalf("reset schema: %v", err) + } + } else { + present, err := db.SchemaPresent(ctx, pool) + switch { + case err != nil: + log.Printf("schema-presence probe failed (continuing anyway): %v", err) + case present: + log.Printf("schema-presence probe: sessions table already present") + default: + log.Printf("schema-presence probe: sessions table missing") + } + if err := db.ApplySchema(ctx, pool); err != nil { + log.Fatalf("apply schema: %v", err) + } + } + + track := sim.GenerateTrack(chosen, 240) + drivers := sim.DefaultGrid() + + s, err := sim.NewSimulator(ctx, pool, track, drivers, *totalLaps, *tickHz) + if err != nil { + log.Fatalf("setup: %v", err) + } + log.Printf("session_id=%d track=%q drivers=%d laps=%d hz=%d", + s.SessionID, track.Name, len(drivers), *totalLaps, *tickHz) + + if err := s.Run(ctx); err != nil && ctx.Err() == nil { + log.Fatalf("run: %v", err) + } + log.Printf("session %d finished cleanly", s.SessionID) +} + +func printTracks() { + fmt.Println("Available track presets:") + fmt.Println() + for _, p := range sim.Presets { + fmt.Printf(" %-13s %s\n", p.Key, p.Display) + fmt.Printf(" %-13s %s\n", "", p.Tagline) + fmt.Printf(" %-13s %.0f m\n", "", p.LapMeters) + fmt.Println() + } + fmt.Println(`Use: -track (or "random" to pick one at random)`) +} diff --git a/f1-telemetry/docker-compose.yml b/f1-telemetry/docker-compose.yml new file mode 100644 index 0000000..c628558 --- /dev/null +++ b/f1-telemetry/docker-compose.yml @@ -0,0 +1,54 @@ +# Brings up CedarDB alongside the Go simulator and web dashboard. +# +# IMPORTANT: the CedarDB image name and the env vars below reflect the +# CedarDB Postgres-protocol container as of mid-2025. If your image differs, +# adjust `image:` and the DATABASE_URL accordingly -- the rest of the demo +# is purely standard Postgres wire protocol and should not need to change. + +services: + cedardb: + image: cedardb/cedardb:latest + container_name: f1-cedardb + # CedarDB speaks the Postgres protocol on 5432. + ports: + - "5432:5432" + environment: + # These mirror the postgres image conventions; if the CedarDB image + # uses different names check its docs and rename. + CEDAR_PASSWORD: postgres + VERBOSITY: DEBUG1 + volumes: + - cedar-data:/var/lib/cedardb + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d cedar || exit 1"] + interval: 3s + timeout: 2s + retries: 30 + + simulator: + build: . + container_name: f1-simulator + command: ["/app/simulator", "-track=random", "-laps=50", "-hz=10"] + depends_on: + cedardb: + condition: service_healthy + environment: + DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require" + restart: on-failure + + server: + build: . + container_name: f1-server + command: ["/app/server", "-addr=:8080"] + depends_on: + cedardb: + condition: service_healthy + environment: + DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require" + ports: + - "8080:8080" + restart: on-failure + +volumes: + cedar-data: + diff --git a/f1-telemetry/go.mod b/f1-telemetry/go.mod new file mode 100644 index 0000000..b48d244 --- /dev/null +++ b/f1-telemetry/go.mod @@ -0,0 +1,14 @@ +module github.com/cedardb-demo/f1-telemetry + +go 1.22 + +require github.com/jackc/pgx/v5 v5.6.0 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.18.0 // indirect +) diff --git a/f1-telemetry/go.sum b/f1-telemetry/go.sum new file mode 100644 index 0000000..b9bb4b2 --- /dev/null +++ b/f1-telemetry/go.sum @@ -0,0 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/f1-telemetry/internal/db/db.go b/f1-telemetry/internal/db/db.go new file mode 100644 index 0000000..d898a6b --- /dev/null +++ b/f1-telemetry/internal/db/db.go @@ -0,0 +1,53 @@ +// Package db wraps the connection pool to CedarDB. +// +// CedarDB speaks the Postgres wire protocol, so we use pgx unmodified. The +// only thing different from a normal Postgres setup is the DSN, which the +// caller supplies via DATABASE_URL. +package db + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// Connect dials CedarDB using DATABASE_URL from the environment. +// +// Example DSN: +// +// postgres://postgres:postgres@localhost:5432/cedar?sslmode=disable +func Connect(ctx context.Context) (*pgxpool.Pool, error) { + dsn := os.Getenv("DATABASE_URL") + if dsn == "" { + return nil, fmt.Errorf("DATABASE_URL is not set") + } + + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("parse DATABASE_URL: %w", err) + } + // Generous pool: the simulator opens 1 long-lived conn for COPY-style + // inserts, the web server opens several short-lived ones for dashboard + // queries running concurrently. + cfg.MaxConns = 20 + cfg.MinConns = 2 + cfg.MaxConnLifetime = 30 * time.Minute + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("dial CedarDB: %w", err) + } + + // Verify the server is reachable before returning. Otherwise the first + // real query is the one that fails, which makes errors confusing. + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := pool.Ping(pingCtx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping CedarDB: %w", err) + } + return pool, nil +} diff --git a/f1-telemetry/internal/db/schema.go b/f1-telemetry/internal/db/schema.go new file mode 100644 index 0000000..916542a --- /dev/null +++ b/f1-telemetry/internal/db/schema.go @@ -0,0 +1,158 @@ +package db + +import ( + "context" + _ "embed" + "fmt" + "log" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// SchemaSQL is the contents of the canonical schema.sql, embedded into the +// simulator binary at build time. Lives next to db.go so the //go:embed +// directive can pick it up — Go embed can only see files in the same +// package directory or below it. +// +// If you edit schema.sql you don't need to do anything else: a `go build` +// re-bakes it into the binary. +// +//go:embed schema.sql +var SchemaSQL string + +// SchemaPresent returns true if the canonical `sessions` table already +// exists in the connected database. We use this as a cheap "has the schema +// been applied?" check on simulator startup, so subsequent runs don't wipe +// data when the schema is already in place. +// +// We query information_schema.tables (SQL-standard, present in CedarDB) +// rather than Postgres's `to_regclass`, which CedarDB doesn't yet expose. +func SchemaPresent(ctx context.Context, pool *pgxpool.Pool) (bool, error) { + var present bool + if err := pool.QueryRow(ctx, ` + SELECT (SELECT COUNT(*) FROM information_schema.tables + WHERE table_name = 'sessions') = 1 + `).Scan(&present); err != nil { + return false, fmt.Errorf("schema presence check: %w", err) + } + return present, nil +} + +// ApplySchema runs the embedded schema.sql against the database. The file +// is idempotent — every statement uses CREATE TABLE IF NOT EXISTS or +// CREATE INDEX IF NOT EXISTS — so it's safe to call on every cold start +// without losing prior data. For destructive re-init, call ResetSchema +// instead (drops everything first, then calls ApplySchema). +// +// Two non-obvious things going on here: +// +// - We execute each statement separately, because CedarDB's protocol +// implementation doesn't currently accept multi-statement queries. The +// splitter strips `--` line comments and breaks on `;`; that's safe for +// our DDL since none of it embeds semicolons inside string literals or +// function bodies. +// - We force `pgx.QueryExecModeSimpleProtocol` on each Exec. pgx's default +// mode (CacheStatement) sends Parse → Bind → Execute through the +// extended protocol and caches the resulting prepared statement. +// CedarDB rejects (or, worse, silently swallows) DDL prepared that way. +// Simple-protocol Exec just sends the SQL text as a `Q` message, which +// CedarDB's parser handles correctly. +// +// Per-statement log lines give you visual confirmation that each piece of +// the schema was accepted; this is a once-per-cold-start operation so the +// chattiness doesn't matter. +func ApplySchema(ctx context.Context, pool *pgxpool.Pool) error { + statements := splitSQLStatements(SchemaSQL) + log.Printf("applying schema: %d statements", len(statements)) + for i, stmt := range statements { + if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf( + "apply schema (statement %d of %d failed): %w\n--- failing statement ---\n%s\n", + i+1, len(statements), err, stmt, + ) + } + log.Printf(" [%2d/%d] %s — ok", i+1, len(statements), firstLine(stmt)) + } + log.Printf("schema applied") + return nil +} + +// ResetSchema drops every table the demo owns and then re-applies the +// embedded schema. Wired behind the simulator's `-reset-schema` flag so +// it's never called accidentally. Each DROP is sent via the simple +// protocol for the same reason ApplySchema uses simple protocol — +// CedarDB doesn't accept DDL through prepare/bind/execute. +func ResetSchema(ctx context.Context, pool *pgxpool.Pool) error { + // Order matters because of foreign-key-style cascades: drop the + // fact tables first, then the dimensions. CASCADE would handle this + // for us with real FKs, but we don't define any, so we just sequence. + drops := []string{ + "DROP TABLE IF EXISTS telemetry CASCADE", + "DROP TABLE IF EXISTS laps CASCADE", + "DROP TABLE IF EXISTS events CASCADE", + "DROP TABLE IF EXISTS drivers CASCADE", + "DROP TABLE IF EXISTS sessions CASCADE", + } + log.Printf("resetting schema: dropping %d tables", len(drops)) + for i, stmt := range drops { + if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf( + "reset schema (drop %d of %d failed): %w\n--- failing statement ---\n%s\n", + i+1, len(drops), err, stmt, + ) + } + log.Printf(" [%d/%d] %s — ok", i+1, len(drops), stmt) + } + return ApplySchema(ctx, pool) +} + +// firstLine returns the first non-empty line of the statement, trimmed to +// a sensible length, for the per-statement progress log. +func firstLine(stmt string) string { + for _, line := range strings.Split(stmt, "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + if len(line) > 70 { + return line[:67] + "..." + } + return line + } + return "(empty)" +} + +// splitSQLStatements turns a SQL script into a slice of individual +// statements. Two-step process: strip `--` line comments first so semicolons +// in trailing comments don't confuse the splitter, then split on `;` and +// drop empty fragments. +// +// This is good enough for DDL that doesn't include string literals or +// PL/pgSQL function bodies with embedded `;` or `--`. The schema we ship +// satisfies that constraint; if you add `CREATE FUNCTION ... $$ ... $$` +// blocks or string DEFAULTs containing semicolons you'll need a smarter +// tokenizer here. +func splitSQLStatements(sql string) []string { + // Strip `-- …` line comments. We keep the newline so line numbers in + // downstream error messages still line up roughly with the source file. + var clean strings.Builder + clean.Grow(len(sql)) + for _, line := range strings.Split(sql, "\n") { + if idx := strings.Index(line, "--"); idx >= 0 { + line = line[:idx] + } + clean.WriteString(line) + clean.WriteByte('\n') + } + + out := make([]string, 0, 16) + for _, raw := range strings.Split(clean.String(), ";") { + stmt := strings.TrimSpace(raw) + if stmt != "" { + out = append(out, stmt) + } + } + return out +} diff --git a/f1-telemetry/internal/db/schema.sql b/f1-telemetry/internal/db/schema.sql new file mode 100644 index 0000000..fb20c14 --- /dev/null +++ b/f1-telemetry/internal/db/schema.sql @@ -0,0 +1,87 @@ +-- F1 telemetry schema for CedarDB (Postgres dialect). +-- +-- Five tables: +-- sessions - one row per race / practice session +-- drivers - 20 driver entries (one-time seed data) +-- telemetry - high-frequency car state (this is the hot table: ~200 rows/sec) +-- laps - one row per completed lap, with sector splits +-- events - race events (start, fastest lap, overtake, etc.) +-- +-- The dashboard joins telemetry x drivers for live state, and reads laps for +-- timing. All of this happens while the simulator is INSERT-ing into +-- telemetry/laps in real time, which is the CedarDB differentiator we want +-- to show: concurrent ingest + analytic queries on the same table. +-- +-- This file is intentionally idempotent: every statement is guarded with +-- IF NOT EXISTS so the simulator can call it on every cold start without +-- risk of wiping prior data. For destructive re-init, run the simulator +-- with the `-reset-schema` flag, which DROPs each table first. + +CREATE TABLE IF NOT EXISTS sessions ( + session_id BIGSERIAL PRIMARY KEY, + name TEXT NOT NULL, + track TEXT NOT NULL, + total_laps INTEGER NOT NULL, + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + ended_at TIMESTAMPTZ +); + +CREATE TABLE IF NOT EXISTS drivers ( + driver_id INTEGER PRIMARY KEY, + code TEXT NOT NULL, -- 3-letter code, e.g. 'VER' + full_name TEXT NOT NULL, + team TEXT NOT NULL, + car_number INTEGER NOT NULL, + color_hex TEXT NOT NULL -- team color for the UI +); + +-- Hot path. Inserted ~200 rows/sec (20 cars x 10 Hz). +CREATE TABLE IF NOT EXISTS telemetry ( + session_id BIGINT NOT NULL, + driver_id INTEGER NOT NULL, + ts TIMESTAMPTZ NOT NULL, + lap INTEGER NOT NULL, + track_pos DOUBLE PRECISION NOT NULL, -- 0..1 around the lap + pos_x DOUBLE PRECISION NOT NULL, + pos_y DOUBLE PRECISION NOT NULL, + speed_kph DOUBLE PRECISION NOT NULL, + rpm INTEGER NOT NULL, + gear SMALLINT NOT NULL, + throttle REAL NOT NULL, -- 0..1 + brake REAL NOT NULL, -- 0..1 + drs BOOLEAN NOT NULL +); + +-- Two indexes that match the two read patterns the dashboard uses: +-- 1) latest sample per (session, driver) -> track map + telemetry panel +-- 2) latest samples across whole session -> any "newest-first" scan +CREATE INDEX IF NOT EXISTS telemetry_session_driver_ts_idx + ON telemetry (session_id, driver_id, ts DESC); + +CREATE INDEX IF NOT EXISTS telemetry_session_ts_idx + ON telemetry (session_id, ts DESC); + +CREATE TABLE IF NOT EXISTS laps ( + session_id BIGINT NOT NULL, + driver_id INTEGER NOT NULL, + lap_number INTEGER NOT NULL, + lap_time_ms INTEGER NOT NULL, + sector1_ms INTEGER NOT NULL, + sector2_ms INTEGER NOT NULL, + sector3_ms INTEGER NOT NULL, + finished_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (session_id, driver_id, lap_number) +); + +CREATE INDEX IF NOT EXISTS laps_session_lap_idx ON laps (session_id, lap_number); + +CREATE TABLE IF NOT EXISTS events ( + event_id BIGSERIAL PRIMARY KEY, + session_id BIGINT NOT NULL, + driver_id INTEGER, + ts TIMESTAMPTZ NOT NULL, + kind TEXT NOT NULL, -- 'race_start','fastest_lap','overtake','drs_open' + detail TEXT +); + +CREATE INDEX IF NOT EXISTS events_session_ts_idx ON events (session_id, ts DESC); diff --git a/f1-telemetry/internal/sim/simulator.go b/f1-telemetry/internal/sim/simulator.go new file mode 100644 index 0000000..e070ded --- /dev/null +++ b/f1-telemetry/internal/sim/simulator.go @@ -0,0 +1,439 @@ +package sim + +import ( + "context" + "fmt" + "math" + "math/rand" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// Driver is the static description of one of the 20 entries. +type Driver struct { + ID int + Code string + FullName string + Team string + Number int + Color string + + // PaceFactor is the per-driver multiplier on target speed: 1.00 is + // average, 1.02 is a top-tier car, 0.95 is a backmarker. + PaceFactor float64 + + // CornerBias models a driver's preferred corner type. Positive values + // mean the driver is faster than their baseline in HIGH-speed corners + // (and proportionally slower in slow ones); negative is the opposite. + // Range roughly ±0.006 (0.6% of pace). + CornerBias float64 +} + +// DefaultGrid returns a 20-driver grid with fictional teams/codes so we +// don't impersonate real drivers. +// +// Pace model +// - Each TEAM has a baseline pace, top to bottom: 1.020 .. 0.930. +// - Within a team the two drivers are offset by ±0.005 around the team +// mean (so a ~1% intra-team gap, roughly one second per lap on a 100- +// second lap) — big enough that teammates visibly separate over 50 +// laps, small enough to stay realistic. +// - On top of that, each driver picks up a small random "sprinkle" +// (±0.0025) and a per-driver CornerBias (±0.006) so that some drivers +// gain in fast sweepers while others gain in slow corners. Average +// pace is preserved. +// +// The grid RNG is seeded with a fixed value so the demo is reproducible +// across runs (same fast/slow drivers each time); the per-tick sim noise +// (in advance()) still uses a wall-clock seed for variety. +func DefaultGrid() []Driver { + type teamSpec struct { + name string + color string + pace float64 + } + teams := []teamSpec{ + {"Apex Racing", "#1ee2a2", 1.020}, + {"Velocity GP", "#ff5577", 1.010}, + {"Crimson Wing", "#cc2222", 1.000}, + {"Solaris", "#ffaa00", 0.990}, + {"Nordic", "#3aa6ff", 0.980}, + {"Vortex", "#9966ff", 0.970}, + {"Iron Lotus", "#22cc88", 0.960}, + {"Meridian", "#bbbbbb", 0.950}, + {"Polestar Works", "#ff8c1a", 0.940}, + {"Sable", "#888888", 0.930}, + } + codes := []string{ + "ANV", "BRK", "CRO", "DEL", "EZK", "FAR", "GRV", "HIN", "ILA", "JEN", + "KOR", "LIA", "MUR", "NEV", "ORS", "PAX", "QIN", "ROS", "SAV", "TOR", + } + names := []string{ + "A. Anvers", "B. Brokk", "C. Cross", "D. Delacroix", "E. Ezakov", + "F. Faruq", "G. Greaves", "H. Hinata", "I. Ilanov", "J. Jensen", + "K. Korva", "L. Liang", "M. Murat", "N. Neves", "O. Orsini", + "P. Paxson", "Q. Quintero", "R. Rosso", "S. Savic", "T. Torres", + } + + gridRNG := rand.New(rand.NewSource(0xCEDA12B)) + out := make([]Driver, 20) + for i := 0; i < 20; i++ { + team := teams[i/2] + + // Intra-team: the #1 driver is 0.005 faster than the team mean, + // the #2 driver 0.005 slower. ~1% gap, ~1s/lap. + intraTeam := 0.005 + if i%2 == 1 { + intraTeam = -0.005 + } + // Small extra sprinkle so the order isn't perfectly tidy. + sprinkle := (gridRNG.Float64()*2 - 1) * 0.0025 + + // Corner specialty: zero-mean across drivers. A driver with +0.006 + // is ~0.6% faster than baseline on full straights / fast sweepers + // and ~0.6% slower in slow corners; the opposite for negative. + cornerBias := (gridRNG.Float64()*2 - 1) * 0.006 + + out[i] = Driver{ + ID: i + 1, + Code: codes[i], + FullName: names[i], + Team: team.name, + Number: 10 + i, + Color: team.color, + PaceFactor: team.pace + intraTeam + sprinkle, + CornerBias: cornerBias, + } + } + return out +} + +// state holds the per-driver mutable state during simulation. +type state struct { + driver Driver + trackPos float64 // 0..1 + speedKph float64 + lap int + lapStart time.Time + sectorStart time.Time + sector int // 0,1,2 + sectorTimes [3]int // ms + bestLap int // ms + finished bool + + // formDelta is a slow-varying per-driver pace bonus that random-walks + // during the race. It represents the driver's "form" / tyre state / + // fuel mass / general luck-of-the-stint. Bounded to ±0.008 so it never + // dominates the static PaceFactor difference between teams. + formDelta float64 +} + +// Simulator drives 20 cars around a Track and inserts both telemetry samples +// and completed laps into CedarDB. +type Simulator struct { + Pool *pgxpool.Pool + Track *Track + Drivers []Driver + SessionID int64 + TotalLaps int + TickHz int + + state []state + rng *rand.Rand +} + +// NewSimulator creates a session row, seeds drivers, and prepares per-car +// state on a starting grid. It does NOT begin the loop -- call Run for that. +func NewSimulator(ctx context.Context, pool *pgxpool.Pool, track *Track, drivers []Driver, totalLaps, tickHz int) (*Simulator, error) { + var sessionID int64 + err := pool.QueryRow(ctx, + `INSERT INTO sessions (name, track, total_laps) + VALUES ($1, $2, $3) RETURNING session_id`, + "Live Demo Race", track.Name, totalLaps, + ).Scan(&sessionID) + if err != nil { + return nil, fmt.Errorf("create session: %w", err) + } + + // Upsert drivers (idempotent across runs). + batch := &pgx.Batch{} + for _, d := range drivers { + batch.Queue( + `INSERT INTO drivers (driver_id, code, full_name, team, car_number, color_hex) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (driver_id) DO UPDATE SET + code=EXCLUDED.code, full_name=EXCLUDED.full_name, + team=EXCLUDED.team, car_number=EXCLUDED.car_number, + color_hex=EXCLUDED.color_hex`, + d.ID, d.Code, d.FullName, d.Team, d.Number, d.Color) + } + if err := pool.SendBatch(ctx, batch).Close(); err != nil { + return nil, fmt.Errorf("seed drivers: %w", err) + } + + // Starting grid: small backward offset per car so they don't all stack at + // the start line. Position is in track_pos units (fraction of a lap). + now := time.Now() + st := make([]state, len(drivers)) + for i, d := range drivers { + st[i] = state{ + driver: d, + trackPos: -0.001 * float64(i), // wraps to ~0.999 etc. + speedKph: 0, + lap: 1, + lapStart: now, + sectorStart: now, + } + } + + return &Simulator{ + Pool: pool, + Track: track, + Drivers: drivers, + SessionID: sessionID, + TotalLaps: totalLaps, + TickHz: tickHz, + state: st, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + }, nil +} + +// Run advances the simulation at TickHz until all cars finish TotalLaps or +// the context is cancelled. Each tick produces one telemetry row per car; +// laps are inserted whenever a car crosses the start/finish line. +func (s *Simulator) Run(ctx context.Context) error { + // Insert a 'race_start' event for the timeline. + _, _ = s.Pool.Exec(ctx, + `INSERT INTO events (session_id, ts, kind, detail) VALUES ($1, now(), 'race_start', $2)`, + s.SessionID, fmt.Sprintf("%d cars, %d laps", len(s.Drivers), s.TotalLaps)) + + interval := time.Second / time.Duration(s.TickHz) + tick := time.NewTicker(interval) + defer tick.Stop() + + dt := 1.0 / float64(s.TickHz) // seconds per tick + + // Pre-allocate the batch we send each tick. + tele := make([][]any, 0, len(s.Drivers)) + + for { + select { + case <-ctx.Done(): + s.endSession(context.Background()) + return ctx.Err() + case now := <-tick.C: + tele = tele[:0] + allDone := true + + for i := range s.state { + cs := &s.state[i] + if cs.finished { + continue + } + allDone = false + s.advance(cs, dt) + + p, _ := s.Track.PosAt(cs.trackPos) + gear, rpm, throttle, brake, drs := derive(cs.speedKph, cs.trackPos, s.Track) + + tele = append(tele, []any{ + s.SessionID, cs.driver.ID, now, cs.lap, + cs.trackPos, p.X, p.Y, cs.speedKph, + rpm, gear, throttle, brake, drs, + }) + + // Check sector / lap crossings. + s.handleSectors(ctx, cs, now) + } + + if len(tele) > 0 { + _, err := s.Pool.CopyFrom(ctx, + pgx.Identifier{"telemetry"}, + []string{"session_id", "driver_id", "ts", "lap", + "track_pos", "pos_x", "pos_y", "speed_kph", + "rpm", "gear", "throttle", "brake", "drs"}, + pgx.CopyFromRows(tele), + ) + if err != nil { + return fmt.Errorf("copy telemetry: %w", err) + } + } + + if allDone { + s.endSession(ctx) + return nil + } + } + } +} + +// advance moves a single car forward by dt seconds. +// +// Pace is composed each tick from four pieces so that 20 cars with the same +// underlying physics actually finish in different positions: +// +// final_pace = driver.PaceFactor // static team + intra-team rank +// + driver.CornerBias * mix // dynamic, depends on corner type +// + state.formDelta // slow random walk, lap-scale drift +// + ε (1.5 km/h jitter) // sub-corner noise +// +// `mix` is 0 in a 90 km/h hairpin and 1 on a 320 km/h straight, so the bias +// term reverses sign smoothly across the lap. +func (s *Simulator) advance(cs *state, dt float64) { + baseTarget := s.Track.TargetSpeedAt(cs.trackPos) + + // Corner-type weighting: 0 = slow corner, 1 = fast straight. + const slowKph = 110.0 + const fastKph = 290.0 + mix := (baseTarget - slowKph) / (fastKph - slowKph) + if mix < 0 { + mix = 0 + } + if mix > 1 { + mix = 1 + } + // Positive CornerBias adds in fast sections, subtracts in slow ones. + cornerAdj := cs.driver.CornerBias * (2*mix - 1) + + // Slow random walk on form. ±1e-4 per tick, bounded to ±0.008. At 10 Hz + // this drifts on the order of 0.001/sec, so it takes a couple of laps + // to traverse the full range — i.e. lap-scale drift, not noise. + cs.formDelta += (s.rng.Float64()*2 - 1) * 1e-4 + if cs.formDelta > 0.008 { + cs.formDelta = 0.008 + } else if cs.formDelta < -0.008 { + cs.formDelta = -0.008 + } + + pace := cs.driver.PaceFactor + cornerAdj + cs.formDelta + target := baseTarget * pace + // Tiny noise so cars don't move in lockstep within a corner. + target += (s.rng.Float64()*2 - 1) * 1.5 + + // Accelerate / brake toward target. Real F1 cars hit ~3-5g braking and + // ~1-2g accel; we pick numbers that look right at 10Hz. + const accelKphPerSec = 95.0 + const brakeKphPerSec = 230.0 + delta := target - cs.speedKph + maxStep := accelKphPerSec * dt + if delta < 0 { + maxStep = brakeKphPerSec * dt + } + if math.Abs(delta) <= maxStep { + cs.speedKph = target + } else if delta > 0 { + cs.speedKph += maxStep + } else { + cs.speedKph -= maxStep + } + if cs.speedKph < 30 { + cs.speedKph = 30 // never crawl: avoids div-by-zero everywhere + } + + // Move along the track. trackPos is a fraction of a lap [0,1), so + // delta-pos = (meters traveled this tick) / (lap length in meters). + mps := cs.speedKph * 1000.0 / 3600.0 + cs.trackPos += (mps * dt) / s.Track.LapMeters +} + +// derive computes RPM, gear, throttle, brake, DRS from speed + track context. +// These are derived (not independently simulated) but consistent. +func derive(speedKph, trackPos float64, t *Track) (gear int16, rpm int, throttle, brake float32, drs bool) { + // Gear from speed bands. + switch { + case speedKph < 80: + gear = 2 + case speedKph < 130: + gear = 3 + case speedKph < 180: + gear = 4 + case speedKph < 230: + gear = 5 + case speedKph < 270: + gear = 6 + case speedKph < 305: + gear = 7 + default: + gear = 8 + } + // RPM rises within each gear, snaps down on shift. Roughly 8000-12000. + gearBase := []int{0, 7000, 8000, 8500, 9000, 9500, 10000, 10500, 11000} + rpm = gearBase[gear] + int(((speedKph-float64((int(gear)-1)*45))/45.0)*1500) + if rpm < 6500 { + rpm = 6500 + } + if rpm > 12000 { + rpm = 12000 + } + + // Throttle / brake based on speed vs the local target. Anticipatory: if the + // target ahead is much lower, we must be braking; if higher, on the gas. + target := t.TargetSpeedAt(trackPos) + switch { + case target < speedKph-15: + brake = float32(math.Min(1.0, (speedKph-target)/80.0)) + throttle = 0 + case target > speedKph+5: + throttle = 1 + brake = 0 + default: + throttle = 0.6 + brake = 0 + } + + // DRS open on long fast sections (rough proxy: target speed > 290). + drs = target > 290 && speedKph > 250 + return +} + +// handleSectors detects sector crossings and lap completion. +func (s *Simulator) handleSectors(ctx context.Context, cs *state, now time.Time) { + // Sector boundaries at 1/3, 2/3, 1.0. + boundaries := []float64{1.0 / 3.0, 2.0 / 3.0, 1.0} + for cs.sector < 3 && cs.trackPos >= boundaries[cs.sector] { + elapsed := int(now.Sub(cs.sectorStart) / time.Millisecond) + cs.sectorTimes[cs.sector] = elapsed + cs.sector++ + cs.sectorStart = now + if cs.sector == 3 { + // Lap complete. + lapMs := cs.sectorTimes[0] + cs.sectorTimes[1] + cs.sectorTimes[2] + s.insertLap(ctx, cs, lapMs, now) + cs.lap++ + cs.lapStart = now + cs.sector = 0 + cs.sectorTimes = [3]int{} + cs.trackPos -= 1.0 // wrap + if cs.bestLap == 0 || lapMs < cs.bestLap { + cs.bestLap = lapMs + } + if cs.lap > s.TotalLaps { + cs.finished = true + } + } + } +} + +func (s *Simulator) insertLap(ctx context.Context, cs *state, lapMs int, finishedAt time.Time) { + _, err := s.Pool.Exec(ctx, + `INSERT INTO laps (session_id, driver_id, lap_number, lap_time_ms, + sector1_ms, sector2_ms, sector3_ms, finished_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING`, + s.SessionID, cs.driver.ID, cs.lap, lapMs, + cs.sectorTimes[0], cs.sectorTimes[1], cs.sectorTimes[2], finishedAt, + ) + if err != nil { + // Non-fatal: log and continue. The simulation should not stop because + // one INSERT failed. + fmt.Printf("insert lap: %v\n", err) + } +} + +func (s *Simulator) endSession(ctx context.Context) { + _, _ = s.Pool.Exec(ctx, + `UPDATE sessions SET ended_at = now() WHERE session_id = $1`, + s.SessionID) +} diff --git a/f1-telemetry/internal/sim/track.go b/f1-telemetry/internal/sim/track.go new file mode 100644 index 0000000..7124d62 --- /dev/null +++ b/f1-telemetry/internal/sim/track.go @@ -0,0 +1,302 @@ +package sim + +import ( + "math" + "strings" +) + +// Track is a closed loop of waypoints. Each waypoint carries a target speed +// that the cars try to drive at while passing through it. By giving curvy +// regions a low target speed and straights a high one, we get realistic-looking +// throttle/brake/gear behavior without simulating actual vehicle dynamics. +type Track struct { + Name string + Points []Point // closed loop, Points[0] == start/finish line + TargetKph []float64 // len == len(Points), target speed at each waypoint + + // Length is the geometric arc length of the SVG polyline, in the same + // arbitrary units as Points. Used for proportional lookup along the + // closed curve (see PosAt), nothing else. + Length float64 + + // LapMeters is the physical lap length used by the simulator to convert + // km/h into "fraction of lap per tick". Decoupling this from Length means + // we can pick a realistic F1 lap length (~5.4 km) without having to scale + // the SVG geometry up. + LapMeters float64 + + cumLen []float64 // cumulative arc length, for track_pos lookup +} + +type Point struct { + X, Y float64 +} + +// Preset describes one of the selectable track variants. Add new tracks by +// appending to the Presets slice — make sure the control points form a clean +// counter-clockwise loop with no self-intersections. +type Preset struct { + Key string // machine key (used by the simulator's -track flag) + Display string // human name; written into sessions.track + LapMeters float64 // physical lap length used by the simulator + Tagline string // one-line description shown by -list-tracks + Controls []Point // hand-picked spline control points (CCW) +} + +// Presets are the 6 selectable circuits. They're deliberately drawn to look +// obviously different from each other so it's easy to tell which one is up. +var Presets = []Preset{ + { + Key: "cedar-park", Display: "Cedar Park Circuit", LapMeters: 5400, + Tagline: "Balanced — long straight, fast right-hander, chicane, hairpin", + Controls: []Point{ + {160, 600}, {310, 600}, {480, 595}, {650, 590}, + {790, 565}, {860, 510}, {885, 420}, {870, 330}, + {790, 265}, {680, 230}, {570, 200}, {495, 230}, {420, 200}, + {310, 215}, {210, 260}, {155, 335}, + {130, 425}, {150, 520}, {135, 580}, + }, + }, + { + Key: "sprint-oval", Display: "Sprint Oval", LapMeters: 4200, + Tagline: "High-speed — long straights, two big sweepers, no real corners", + Controls: []Point{ + {160, 380}, {200, 250}, {340, 175}, {660, 175}, + {800, 250}, {840, 380}, {800, 510}, {660, 585}, + {340, 585}, {200, 510}, + }, + }, + { + Key: "highline", Display: "Highline Esses", LapMeters: 5200, + Tagline: "Flowing — shallow sweepers across a long horizontal band", + Controls: []Point{ + {140, 470}, {300, 410}, {470, 460}, {640, 400}, + {800, 450}, {875, 510}, {820, 570}, {660, 585}, + {480, 580}, {300, 575}, {170, 555}, + }, + }, + { + Key: "crescent", Display: "Crescent Bay", LapMeters: 5100, + Tagline: "Asymmetric arc with a tight infield twist on the south side", + Controls: []Point{ + {140, 540}, {310, 590}, {490, 595}, {660, 580}, + {790, 520}, {855, 410}, {820, 290}, {700, 220}, + {570, 230}, {490, 310}, {420, 360}, {340, 320}, + {240, 360}, {170, 450}, + }, + }, + { + Key: "pinewood", Display: "Pinewood Climb", LapMeters: 4800, + Tagline: "Vertical layout — two opposed hairpins, narrow infield", + Controls: []Point{ + {420, 620}, {510, 600}, {590, 560}, {635, 480}, + {590, 410}, {510, 380}, {460, 330}, {480, 250}, + {560, 190}, {660, 170}, {740, 220}, {750, 320}, + {700, 400}, {640, 460}, {640, 540}, {560, 590}, + }, + }, + { + Key: "downtown", Display: "Downtown Loop", LapMeters: 4400, + Tagline: "Street circuit — boxy silhouette, right-angle turns, slow average", + Controls: []Point{ + {180, 580}, {380, 590}, {560, 585}, {700, 565}, + {760, 510}, {760, 430}, {700, 410}, {640, 430}, + {640, 320}, {730, 290}, {730, 200}, {560, 190}, + {390, 195}, {300, 240}, {290, 340}, {240, 380}, + {240, 460}, {190, 490}, + }, + }, +} + +// PresetByName returns the preset whose Key or Display matches (case- and +// whitespace-insensitive). Falls back to Presets[0] (Cedar Park) if no +// match — that's intentional so a typo at the CLI gives you a working race +// rather than a panic. +func PresetByName(name string) Preset { + n := strings.ToLower(strings.TrimSpace(name)) + for _, p := range Presets { + if strings.ToLower(p.Key) == n || strings.ToLower(p.Display) == n { + return p + } + } + return Presets[0] +} + +// GenerateTrack builds the named preset by interpolating its control points +// with a centripetal Catmull-Rom spline. nameOrKey may be either a preset's +// Key (e.g. "cedar-park") or its Display name (e.g. "Cedar Park Circuit"); +// unknown names fall back to the first preset. +// +// nPoints is a hint for the desired number of sampled waypoints; it's +// rounded down to a clean number of samples per control segment. +func GenerateTrack(nameOrKey string, nPoints int) *Track { + p := PresetByName(nameOrKey) + if nPoints < 100 { + nPoints = 240 + } + samplesPerSeg := nPoints / len(p.Controls) + if samplesPerSeg < 8 { + samplesPerSeg = 8 + } + pts := closedCatmullRom(p.Controls, samplesPerSeg) + n := len(pts) + + t := &Track{ + Name: p.Display, + Points: pts, + TargetKph: make([]float64, n), + cumLen: make([]float64, n), + LapMeters: p.LapMeters, + } + + // Cumulative arc length, then total length (closing the loop). + for i := 1; i < n; i++ { + dx := t.Points[i].X - t.Points[i-1].X + dy := t.Points[i].Y - t.Points[i-1].Y + t.cumLen[i] = t.cumLen[i-1] + math.Hypot(dx, dy) + } + closeDx := t.Points[0].X - t.Points[n-1].X + closeDy := t.Points[0].Y - t.Points[n-1].Y + t.Length = t.cumLen[n-1] + math.Hypot(closeDx, closeDy) + + // Target speed at each waypoint, based on local curvature. Tight corners + // get low target speeds; straight sections get high ones. Curvature is + // approximated from the angle change across a small window scaled to the + // point density. + window := n / 40 + if window < 4 { + window = 4 + } + for i := 0; i < n; i++ { + a := (i - window + n) % n + b := (i + window) % n + v1x := t.Points[i].X - t.Points[a].X + v1y := t.Points[i].Y - t.Points[a].Y + v2x := t.Points[b].X - t.Points[i].X + v2y := t.Points[b].Y - t.Points[i].Y + ang := math.Atan2(v1x*v2y-v1y*v2x, v1x*v2x+v1y*v2y) + curve := math.Abs(ang) + + const maxKph = 325.0 + const minKph = 85.0 + kph := maxKph - (maxKph-minKph)*math.Min(1.0, curve/0.45) + t.TargetKph[i] = kph + } + + // Smooth the target-speed profile so braking zones extend back from the + // apex (cars start slowing before the corner). + for pass := 0; pass < 6; pass++ { + out := make([]float64, n) + for i := range t.TargetKph { + p := t.TargetKph[(i-1+n)%n] + c := t.TargetKph[i] + nx := t.TargetKph[(i+1)%n] + blend := 0.25*p + 0.5*c + 0.25*nx + out[i] = math.Min(blend, c+10) + } + t.TargetKph = out + } + + return t +} + +// PosAt returns the (x, y) point and the index of the waypoint just before +// the given track_pos in [0, 1). It interpolates linearly between waypoints. +func (t *Track) PosAt(pos float64) (Point, int) { + pos = pos - math.Floor(pos) // wrap to [0, 1) + target := pos * t.Length + + lo, hi := 0, len(t.cumLen)-1 + for lo < hi { + mid := (lo + hi + 1) / 2 + if t.cumLen[mid] <= target { + lo = mid + } else { + hi = mid - 1 + } + } + i := lo + next := (i + 1) % len(t.Points) + segStart := t.cumLen[i] + var segLen float64 + if next == 0 { + segLen = t.Length - segStart + } else { + segLen = t.cumLen[next] - segStart + } + frac := 0.0 + if segLen > 0 { + frac = (target - segStart) / segLen + } + p := Point{ + X: t.Points[i].X + frac*(t.Points[next].X-t.Points[i].X), + Y: t.Points[i].Y + frac*(t.Points[next].Y-t.Points[i].Y), + } + return p, i +} + +// TargetSpeedAt returns the target km/h at the given track position, with a +// short look-ahead so cars start braking BEFORE the corner. +func (t *Track) TargetSpeedAt(pos float64) float64 { + const lookAhead = 0.02 + _, i := t.PosAt(pos + lookAhead) + _, j := t.PosAt(pos + 2*lookAhead) + a := t.TargetKph[i] + b := t.TargetKph[j] + if b < a { + return b + } + return a +} + +// closedCatmullRom interpolates a closed loop of control points using a +// centripetal Catmull-Rom spline. +func closedCatmullRom(ctrl []Point, samplesPerSeg int) []Point { + n := len(ctrl) + out := make([]Point, 0, n*samplesPerSeg) + for i := 0; i < n; i++ { + p0 := ctrl[(i-1+n)%n] + p1 := ctrl[i] + p2 := ctrl[(i+1)%n] + p3 := ctrl[(i+2)%n] + + t0 := 0.0 + t1 := t0 + math.Sqrt(pointDist(p0, p1)) + t2 := t1 + math.Sqrt(pointDist(p1, p2)) + t3 := t2 + math.Sqrt(pointDist(p2, p3)) + if t1 <= t0 { + t1 = t0 + 1e-6 + } + if t2 <= t1 { + t2 = t1 + 1e-6 + } + if t3 <= t2 { + t3 = t2 + 1e-6 + } + + for s := 0; s < samplesPerSeg; s++ { + t := t1 + (t2-t1)*float64(s)/float64(samplesPerSeg) + out = append(out, catmullRomEval(p0, p1, p2, p3, t0, t1, t2, t3, t)) + } + } + return out +} + +// catmullRomEval evaluates the centripetal Catmull-Rom spline at parameter t, +// using the Barry-Goldman pyramid (three lerp levels). +func catmullRomEval(p0, p1, p2, p3 Point, t0, t1, t2, t3, t float64) Point { + a1 := lerpPoint(p0, p1, (t-t0)/(t1-t0)) + a2 := lerpPoint(p1, p2, (t-t1)/(t2-t1)) + a3 := lerpPoint(p2, p3, (t-t2)/(t3-t2)) + b1 := lerpPoint(a1, a2, (t-t0)/(t2-t0)) + b2 := lerpPoint(a2, a3, (t-t1)/(t3-t1)) + return lerpPoint(b1, b2, (t-t1)/(t2-t1)) +} + +func lerpPoint(a, b Point, t float64) Point { + return Point{X: a.X + t*(b.X-a.X), Y: a.Y + t*(b.Y-a.Y)} +} + +func pointDist(a, b Point) float64 { + return math.Hypot(a.X-b.X, a.Y-b.Y) +} diff --git a/f1-telemetry/internal/web/server.go b/f1-telemetry/internal/web/server.go new file mode 100644 index 0000000..56245fc --- /dev/null +++ b/f1-telemetry/internal/web/server.go @@ -0,0 +1,536 @@ +package web + +import ( + "context" + "embed" + "encoding/json" + "fmt" + "html/template" + "io/fs" + "log" + "math" + "net/http" + "strconv" + "time" + + "github.com/cedardb-demo/f1-telemetry/internal/sim" + "github.com/jackc/pgx/v5/pgxpool" +) + +//go:embed templates/*.html +var templatesFS embed.FS + +// staticFS holds files served under /static/. Drop replacement assets +// (e.g. the real cedardb-logo.svg or a PNG variant) into internal/web/static +// and rebuild — they'll be picked up automatically. +// +//go:embed static/* +var staticFS embed.FS + +// Server holds the HTTP routes and CedarDB pool. +type Server struct { + Pool *pgxpool.Pool + tpl *template.Template + static http.Handler +} + +func NewServer(pool *pgxpool.Pool) (*Server, error) { + tpl, err := template.ParseFS(templatesFS, "templates/*.html") + if err != nil { + return nil, fmt.Errorf("parse templates: %w", err) + } + sub, err := fs.Sub(staticFS, "static") + if err != nil { + return nil, fmt.Errorf("static sub: %w", err) + } + return &Server{ + Pool: pool, + tpl: tpl, + static: http.FileServer(http.FS(sub)), + }, nil +} + +func (s *Server) Routes() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/", s.handleIndex) + mux.HandleFunc("/track.json", s.handleTrackJSON) + mux.HandleFunc("/sse/state", s.handleSSE) + mux.HandleFunc("/api/leaderboard", s.handleLeaderboard) + mux.HandleFunc("/api/speed-map", s.handleSpeedMap) + mux.HandleFunc("/api/ingest-rate", s.handleIngestRate) + mux.Handle("/static/", http.StripPrefix("/static/", s.static)) + return mux +} + +// --- Models the SSE payload uses ----------------------------------------- + +type carState struct { + DriverID int `json:"driver_id"` + Code string `json:"code"` + Team string `json:"team"` + Color string `json:"color"` + Lap int `json:"lap"` + PosX float64 `json:"x"` + PosY float64 `json:"y"` + SpeedKph float64 `json:"speed_kph"` + RPM int `json:"rpm"` + Gear int16 `json:"gear"` + Throttle float32 `json:"throttle"` + Brake float32 `json:"brake"` + DRS bool `json:"drs"` + BestLapMs int `json:"best_lap_ms"` + LastLapMs int `json:"last_lap_ms"` +} + +type sessionInfo struct { + SessionID int64 `json:"session_id"` + Name string `json:"name"` + Track string `json:"track"` + TotalLaps int `json:"total_laps"` +} + +type stateSnapshot struct { + Session sessionInfo `json:"session"` + Cars []carState `json:"cars"` + StampMs int64 `json:"stamp_ms"` +} + +// --- Handlers ------------------------------------------------------------- + +func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + if err := s.tpl.ExecuteTemplate(w, "index.html", nil); err != nil { + log.Printf("template: %v", err) + } +} + +// handleTrackJSON returns the track polyline plus finish-line geometry so +// the client can render the SVG once per session. The track is stored in +// the DB only as session.track (display name); the geometry is regenerated +// deterministically server-side from the matching preset in +// internal/sim/track.go. +// +// The finish_line field contains the start/finish point and a unit normal +// vector pointing perpendicular to the racing line at that point, so the +// client can rotate a checkered marker into place without having to figure +// out the tangent itself. +func (s *Server) handleTrackJSON(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var trackName string + err := s.Pool.QueryRow(ctx, ` + SELECT track FROM sessions + ORDER BY (ended_at IS NULL) DESC, started_at DESC + LIMIT 1 + `).Scan(&trackName) + if err != nil { + // No session yet — fall back to the default preset so the page + // still loads with something visible. + trackName = sim.Presets[0].Display + } + t := sim.GenerateTrack(trackName, 240) + + pts := make([][2]float64, len(t.Points)) + for i, p := range t.Points { + pts[i] = [2]float64{p.X, p.Y} + } + + // Unit normal to the racing line at trackPos = 0, pointing "left" of the + // direction of travel. Used by the client to orient the checkered band. + p0 := t.Points[0] + p1 := t.Points[1%len(t.Points)] + dx := p1.X - p0.X + dy := p1.Y - p0.Y + norm := math.Hypot(dx, dy) + if norm == 0 { + norm = 1 + } + nx, ny := -dy/norm, dx/norm + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "points": pts, + "track_name": t.Name, + "finish_line": map[string]float64{ + "x": p0.X, + "y": p0.Y, + "nx": nx, + "ny": ny, + }, + }) +} + +// handleSSE streams a JSON snapshot of the live race state every 200ms. +// +// This is the panel that demonstrates "concurrent ingest + analytic reads on +// the same table." Each tick we run a window-function query over `telemetry` +// to pick the latest row per driver while the simulator is still INSERTing. +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + ctx := r.Context() + tick := time.NewTicker(200 * time.Millisecond) + defer tick.Stop() + + // Initial push. + if err := s.pushSnapshot(ctx, w, flusher); err != nil { + return + } + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + if err := s.pushSnapshot(ctx, w, flusher); err != nil { + return + } + } + } +} + +func (s *Server) pushSnapshot(ctx context.Context, w http.ResponseWriter, f http.Flusher) error { + snap, err := s.loadSnapshot(ctx) + if err != nil { + // Skip this tick on error but keep the stream open. + log.Printf("snapshot: %v", err) + return nil + } + buf, err := json.Marshal(snap) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { + return err + } + f.Flush() + return nil +} + +// loadSnapshot picks the live session and runs one query that returns the +// most recent telemetry row per driver, joined with driver metadata, plus +// their best / last lap times. +// +// Query notes: +// - DISTINCT ON (driver_id) ... ORDER BY driver_id, ts DESC matches the +// telemetry_session_driver_ts_idx index. +// - The laps lookup is cheap because the table is small relative to telemetry. +// +// CedarDB executes both reads concurrently with the simulator's INSERTs, +// which is the differentiator we want to showcase. +func (s *Server) loadSnapshot(ctx context.Context) (stateSnapshot, error) { + // Latest live (not-yet-ended) session, falling back to most recent. + var sess sessionInfo + err := s.Pool.QueryRow(ctx, ` + SELECT session_id, name, track, total_laps + FROM sessions + ORDER BY (ended_at IS NULL) DESC, started_at DESC + LIMIT 1 + `).Scan(&sess.SessionID, &sess.Name, &sess.Track, &sess.TotalLaps) + if err != nil { + return stateSnapshot{}, err + } + + rows, err := s.Pool.Query(ctx, ` + WITH latest AS ( + SELECT DISTINCT ON (driver_id) + driver_id, ts, lap, pos_x, pos_y, speed_kph, + rpm, gear, throttle, brake, drs + FROM telemetry + WHERE session_id = $1 + ORDER BY driver_id, ts DESC + ), + lap_stats AS ( + SELECT driver_id, + MIN(lap_time_ms) AS best_lap_ms, + (ARRAY_AGG(lap_time_ms ORDER BY lap_number DESC))[1] AS last_lap_ms + FROM laps + WHERE session_id = $1 + GROUP BY driver_id + ) + SELECT d.driver_id, d.code, d.team, d.color_hex, + l.lap, l.pos_x, l.pos_y, l.speed_kph, + l.rpm, l.gear, l.throttle, l.brake, l.drs, + COALESCE(s.best_lap_ms, 0), COALESCE(s.last_lap_ms, 0) + FROM latest l + JOIN drivers d ON d.driver_id = l.driver_id + LEFT JOIN lap_stats s ON s.driver_id = l.driver_id + ORDER BY d.driver_id + `, sess.SessionID) + if err != nil { + return stateSnapshot{}, err + } + defer rows.Close() + + cars := make([]carState, 0, 20) + for rows.Next() { + var c carState + if err := rows.Scan( + &c.DriverID, &c.Code, &c.Team, &c.Color, + &c.Lap, &c.PosX, &c.PosY, &c.SpeedKph, + &c.RPM, &c.Gear, &c.Throttle, &c.Brake, &c.DRS, + &c.BestLapMs, &c.LastLapMs, + ); err != nil { + return stateSnapshot{}, err + } + cars = append(cars, c) + } + return stateSnapshot{ + Session: sess, + Cars: cars, + StampMs: time.Now().UnixMilli(), + }, rows.Err() +} + +// handleLeaderboard is an HTMX-friendly HTML fragment returning a sorted +// leaderboard. The client polls this via hx-trigger="every 500ms". +func (s *Server) handleLeaderboard(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var sessID int64 + if err := s.Pool.QueryRow(ctx, ` + SELECT session_id FROM sessions + ORDER BY (ended_at IS NULL) DESC, started_at DESC LIMIT 1 + `).Scan(&sessID); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // "Race position" here is a simple approximation: lap * 1.0 + track_pos. + // Real F1 would use sector crossings, but this is good enough for a demo. + rows, err := s.Pool.Query(ctx, ` + WITH latest AS ( + SELECT DISTINCT ON (driver_id) + driver_id, ts, lap, track_pos + FROM telemetry + WHERE session_id = $1 + ORDER BY driver_id, ts DESC + ), + lap_stats AS ( + SELECT driver_id, + MIN(lap_time_ms) AS best_lap_ms, + MAX(lap_number) AS max_lap_number + FROM laps WHERE session_id = $1 GROUP BY driver_id + ), + last_lap AS ( + SELECT lp.driver_id, lp.lap_time_ms AS last_lap_ms + FROM laps lp + JOIN lap_stats ls + ON ls.driver_id = lp.driver_id + AND ls.max_lap_number = lp.lap_number + WHERE lp.session_id = $1 + ) + SELECT d.code, d.team, d.color_hex, + l.lap, l.track_pos, + COALESCE(s.best_lap_ms, 0), COALESCE(ll.last_lap_ms, 0), + (MAX(l.lap + l.track_pos) OVER () - (l.lap + l.track_pos)) AS gap_laps + FROM latest l + JOIN drivers d ON d.driver_id = l.driver_id + LEFT JOIN lap_stats s ON s.driver_id = l.driver_id + LEFT JOIN last_lap ll ON ll.driver_id = l.driver_id + ORDER BY (l.lap + l.track_pos) DESC + `, sessID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer rows.Close() + + type row struct { + Pos int + Code string + Team string + Color string + Lap int + Best string + Last string + Gap string + } + var out []row + leaderLastMs := 0 + i := 0 + for rows.Next() { + i++ + var ( + code, team, color string + lap int + tp, gapLaps float64 + best, last int + ) + if err := rows.Scan(&code, &team, &color, &lap, &tp, &best, &last, &gapLaps); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if i == 1 { + leaderLastMs = last + } + out = append(out, row{ + Pos: i, + Code: code, + Team: team, + Color: color, + Lap: lap, + Best: fmtLap(best), + Last: fmtLap(last), + Gap: fmtGap(i, gapLaps, leaderLastMs), + }) + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := s.tpl.ExecuteTemplate(w, "leaderboard.html", out); err != nil { + log.Printf("leaderboard tpl: %v", err) + } +} + +// handleSpeedMap returns avg speed per 1%-of-lap bucket across the whole +// session — a 100-row aggregation over the hot `telemetry` table that the +// simulator is INSERTing into. The dashboard polls this every few seconds +// and renders the result as a colored ribbon along the racing line, so the +// audience can watch the analytical query refresh against the same table +// the inserts are hitting. +func (s *Server) handleSpeedMap(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + var sessID int64 + if err := s.Pool.QueryRow(ctx, ` + SELECT session_id FROM sessions + ORDER BY (ended_at IS NULL) DESC, started_at DESC LIMIT 1 + `).Scan(&sessID); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + rows, err := s.Pool.Query(ctx, ` + SELECT WIDTH_BUCKET(track_pos, 0, 1, 100) AS bucket, + AVG(speed_kph) AS avg_kph, + COUNT(*) AS samples + FROM telemetry + WHERE session_id = $1 + GROUP BY bucket + ORDER BY bucket + `, sessID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer rows.Close() + + type bucket struct { + Bucket int `json:"bucket"` + AvgKph float64 `json:"avg_kph"` + Samples int64 `json:"samples"` + } + var out []bucket + for rows.Next() { + var b bucket + if err := rows.Scan(&b.Bucket, &b.AvgKph, &b.Samples); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + out = append(out, b) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"buckets": out}) +} + +// handleIngestRate returns the recent ingest rate and cumulative row count +// for the live session — a tiny query that's deliberately cheap to run, but +// whose entire point is to demonstrate the database measuring itself while +// it's also handling the simulator's INSERT stream. The dashboard polls this +// once per second and displays it in the footer. +func (s *Server) handleIngestRate(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + var sessID int64 + if err := s.Pool.QueryRow(ctx, ` + SELECT session_id FROM sessions + ORDER BY (ended_at IS NULL) DESC, started_at DESC LIMIT 1 + `).Scan(&sessID); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var rowsLast1s, totalRows int64 + err := s.Pool.QueryRow(ctx, ` + SELECT + COUNT(*) FILTER (WHERE ts > now() - interval '1 second') AS rows_last_1s, + COUNT(*) AS total_rows + FROM telemetry + WHERE session_id = $1 + `, sessID).Scan(&rowsLast1s, &totalRows) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "rows_per_sec": rowsLast1s, + "total_rows": totalRows, + }) +} + +func fmtLap(ms int) string { + if ms <= 0 { + return "—" + } + m := ms / 60000 + rem := ms - m*60000 + sec := rem / 1000 + frac := rem - sec*1000 + if m > 0 { + return strconv.Itoa(m) + ":" + zPad(sec, 2) + "." + zPad(frac, 3) + } + return strconv.Itoa(sec) + "." + zPad(frac, 3) +} + +// fmtGap formats a "gap to leader" cell for the leaderboard. Inputs: +// - pos: 1-based finishing-order position (1 = leader). +// - gapLaps: gap in lap-fractions (leader_pos - this_pos), so 0.05 = 5% +// of a lap. +// - leaderLastMs: the leader's most recent lap time in ms, used to convert +// gapLaps into seconds. If 0 (leader hasn't completed a lap yet), the +// gap is shown as a lap-fraction instead. +// +// Output: +// - "—" for the leader +// - "+N LAP[S]" when at least one full lap behind +// - "+S.SSSs" using the leader's lap time when available +// - "+0.XXX L" as a fallback when the leader hasn't set a lap yet +func fmtGap(pos int, gapLaps float64, leaderLastMs int) string { + if pos == 1 { + return "—" + } + if gapLaps >= 1.0 { + n := int(gapLaps) + if n == 1 { + return "+1 LAP" + } + return "+" + strconv.Itoa(n) + " LAPS" + } + if leaderLastMs > 0 { + gapSec := gapLaps * float64(leaderLastMs) / 1000.0 + // "+S.SSSs" — show three decimals for small gaps to read like F1 TV. + whole := int(gapSec) + frac := int((gapSec - float64(whole)) * 1000) + return "+" + strconv.Itoa(whole) + "." + zPad(frac, 3) + "s" + } + // No leader lap time yet — fall back to fractional lap. + whole := int(gapLaps) + frac := int((gapLaps - float64(whole)) * 1000) + return "+" + strconv.Itoa(whole) + "." + zPad(frac, 3) + " L" +} + +func zPad(n, width int) string { + s := strconv.Itoa(n) + for len(s) < width { + s = "0" + s + } + return s +} diff --git a/f1-telemetry/internal/web/static/cedardb-logo.svg b/f1-telemetry/internal/web/static/cedardb-logo.svg new file mode 100644 index 0000000..192623c --- /dev/null +++ b/f1-telemetry/internal/web/static/cedardb-logo.svg @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/f1-telemetry/internal/web/static/checkered-flag.svg b/f1-telemetry/internal/web/static/checkered-flag.svg new file mode 100644 index 0000000..e8ae19c --- /dev/null +++ b/f1-telemetry/internal/web/static/checkered-flag.svg @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/f1-telemetry/internal/web/static/queries.html b/f1-telemetry/internal/web/static/queries.html new file mode 100644 index 0000000..a5bb112 --- /dev/null +++ b/f1-telemetry/internal/web/static/queries.html @@ -0,0 +1,611 @@ + + + + +SQL Queries — F1 Telemetry CedarDB Demo + + + + + + + + +
+

SQL queries · F1 telemetry × CedarDB

+ For each SQL statement run by the simulator or dashboard: the role it plays and how often it fires +
+ +

+ The demo's headline story is concurrent OLTP+OLAP on the same table. + The simulator streams ~200 rows/second into the telemetry table; + the dashboard runs five different read queries against the + same table, at refresh rates from 200ms to 3s: no replicas, no + ETL, and no batching. This page is the working catalog of those queries. +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + +
PhaseQuerySourceCadenceTables
SSchema-presence probedb.SchemaPresentonce / cold startinformation_schema.tables
SApply schema (idempotent DDL)db.ApplySchemaonce / cold startsessions · drivers · telemetry · laps · events
SReset schema (destructive)db.ResetSchemaonly with -reset-schemaall 5 tables
WCreate sessionsimulator.NewSimulatoronce / racesessions
WSeed drivers (batch upsert)simulator.NewSimulator20 / racedrivers
WRace-start eventsimulator.Runonce / raceevents
WTelemetry bulk INSERT (COPY)simulator.Run10 Hz · 20 rows / ticktelemetry
WLap-completed INSERTsimulator.insertLap~1 000 / racelaps
WEnd sessionsimulator.endSessiononce / racesessions
RPick current session5 endpoints share this~8 / s combined / clientsessions
RSSE state snapshotweb.loadSnapshot5 Hz (every 200 ms)telemetry · laps · drivers
RLeaderboard with gap to leaderweb.handleLeaderboard2 Hz (every 500 ms)telemetry · laps · drivers
RSpeed-map heatmapweb.handleSpeedMap0.33 Hz (every 3 s)telemetry
RLive ingest rateweb.handleIngestRate1 Hztelemetry
+ +

Setup path · Simulator startup3 stages

+ +
+
+

§0.1Schema-presence probe

+
+ internal/db/schema.go · SchemaPresent() + runs: once at simulator cold start + tables: information_schema.tables +
+
+
SELECT (SELECT COUNT(*) FROM information_schema.tables
+         WHERE table_name = 'sessions') = 1;
+
+

A cheap "have I been here before?" check the simulator logs on every cold start. + Purely diagnostic — ApplySchema is called either way, since + every statement in schema.sql is guarded with + IF NOT EXISTS. We probe information_schema.tables on + purpose: it's SQL-standard and supported by CedarDB.

+
+
+ +
+
+

§0.2Apply schema (idempotent DDL)

+
+ internal/db/schema.go · ApplySchema() + runs: once at simulator cold start — ~10 statements + tables: sessions · drivers · telemetry · laps · events +
+
+
CREATE TABLE IF NOT EXISTS sessions  ( … );
+CREATE TABLE IF NOT EXISTS drivers   ( … );
+CREATE TABLE IF NOT EXISTS telemetry ( … );
+CREATE INDEX IF NOT EXISTS telemetry_session_driver_ts_idx
+    ON telemetry (session_id, driver_id, ts DESC);
+CREATE INDEX IF NOT EXISTS telemetry_session_ts_idx
+    ON telemetry (session_id, ts DESC);
+CREATE TABLE IF NOT EXISTS laps     ( … );
+CREATE INDEX IF NOT EXISTS laps_session_lap_idx ON laps (session_id, lap_number);
+CREATE TABLE IF NOT EXISTS events    ( … );
+CREATE INDEX IF NOT EXISTS events_session_ts_idx ON events (session_id, ts DESC);
+
+

The canonical schema lives in internal/db/schema.sql and is + embedded into the simulator binary via //go:embed.

+

Indexes created here are the ones every read query on this page relies + on: telemetry_session_driver_ts_idx drives §8 / §9's + DISTINCT ON (driver_id) seek; laps_session_lap_idx + backs §9's MAX(lap_number) + self-join lookup.

+
+
+ +
+
+

§0.3Reset schema (destructive)

+
+ internal/db/schema.go · ResetSchema() + runs: only when started with -reset-schema + tables: telemetry · laps · events · drivers · sessions +
+
+
DROP TABLE IF EXISTS telemetry CASCADE;
+DROP TABLE IF EXISTS laps      CASCADE;
+DROP TABLE IF EXISTS events    CASCADE;
+DROP TABLE IF EXISTS drivers   CASCADE;
+DROP TABLE IF EXISTS sessions  CASCADE;
+-- then re-runs §0.2 to recreate everything
+
+

Wired behind the simulator's -reset-schema CLI flag so it's never called accidentally.

+
+
+ +

Write path · Simulator → CedarDB6 statements

+ +
+
+

§1Create session

+
+ internal/sim/simulator.go · NewSimulator() + runs: once at race start + tables: sessions +
+
+
INSERT INTO sessions (name, track, total_laps)
+VALUES ($1, $2, $3)
+RETURNING session_id;
+
+

Allocates the BIGSERIAL session_id that every downstream write carries + as its tenant key. track is the human display name of the preset chosen + by the -track CLI flag (e.g. "Cedar Park Circuit") — the dashboard + reads it back later and uses it to regenerate matching geometry on the client side, + so we never persist the polyline itself.

+
+
+ +
+
+

§2Seed drivers (batch upsert)

+
+ internal/sim/simulator.go · NewSimulator() + runs: 20 statements in one pgx Batch, once at race start + tables: drivers +
+
+
INSERT INTO drivers (driver_id, code, full_name, team, car_number, color_hex)
+VALUES ($1, $2, $3, $4, $5, $6)
+ON CONFLICT (driver_id) DO UPDATE SET
+    code       = EXCLUDED.code,
+    full_name  = EXCLUDED.full_name,
+    team       = EXCLUDED.team,
+    car_number = EXCLUDED.car_number,
+    color_hex  = EXCLUDED.color_hex;
+
+

Idempotent upsert. The drivers table is small, fixed (20 rows), and never written + to during the race — it's separated from the hot fact table on purpose. Running this + in a pgx.Batch keeps the round-trip cost to one network turn.

+
+
+ +
+
+

§3Race-start event

+
+ internal/sim/simulator.go · Run() + runs: once at race start + tables: events +
+
+
INSERT INTO events (session_id, ts, kind, detail)
+VALUES ($1, now(), 'race_start', $2);
+
+

The events table is a low-volume audit log indexed + (session_id, ts DESC). kind is intended for an extensible set of + values — 'race_start', 'fastest_lap', 'overtake', + 'drs_open', etc.; though only race_start is wired in today. + Good spot to hang future "what just happened" tickers off.

+
+
+ +
+
+

§4Telemetry bulk INSERT (COPY)

+
+ internal/sim/simulator.go · Run() + runs: every tick — default 10 Hz × 20 rows = ~200 rows / second + tables: telemetry (hot) +
+
+
pool.CopyFrom(ctx,
+    pgx.Identifier{"telemetry"},
+    []string{"session_id", "driver_id", "ts", "lap",
+             "track_pos", "pos_x", "pos_y", "speed_kph",
+             "rpm", "gear", "throttle", "brake", "drs"},
+    pgx.CopyFromRows(tele),
+)
+
+

This is the workload the demo is built around. pgx's CopyFrom uses + the Postgres COPY FROM STDIN binary protocol over the wire — bypassing + the parser entirely, much cheaper than 20 separate INSERTs per tick. + Every read query on this page is competing with this stream of writes against the + same table; the headline of the demo is that none of them block.

+

Indexes maintained on every row: (session_id, driver_id, ts DESC) and (session_id, ts DESC).

+
+
+ +
+
+

§5Lap-completed INSERT

+
+ internal/sim/simulator.go · insertLap() + runs: once per car per lap — ~1 000 inserts over a 50-lap race + tables: laps +
+
+
INSERT INTO laps (session_id, driver_id, lap_number, lap_time_ms,
+                  sector1_ms, sector2_ms, sector3_ms, finished_at)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ON CONFLICT DO NOTHING;
+
+

Conflict target is the natural primary key (session_id, driver_id, lap_number). + ON CONFLICT DO NOTHING makes retries safe — handy because a transient error + in the lap insert shouldn't kill the simulator. laps is small relative to + telemetry, but it's the authoritative source of Best/Last lap times for + the leaderboard.

+
+
+ +
+
+

§6End session

+
+ internal/sim/simulator.go · endSession() + runs: once at race finish or context cancellation + tables: sessions +
+
+
UPDATE sessions
+SET ended_at = now()
+WHERE session_id = $1;
+
+

ended_at IS NULL is how every read endpoint distinguishes a live race + from a completed one — see the ORDER BY (ended_at IS NULL) DESC, started_at DESC + pattern in §7. A simulator that crashes mid-race will leave this NULL, which is why + the read queries fall back to "most-recent" rather than panicking.

+
+
+ +

Read path · Dashboard ← CedarDB5 distinct queries

+ +
+
+

§7Pick the current session

+
+ server.go · shared by 5 endpoints + runs: ~8 times per second combined per connected client + tables: sessions +
+
+
SELECT session_id, name, track, total_laps
+FROM sessions
+ORDER BY (ended_at IS NULL) DESC,
+         started_at DESC
+LIMIT 1;
+
+

Boolean-as-sort-key trick: (ended_at IS NULL) DESC ranks unended (live) + sessions above ended ones, and started_at DESC tiebreaks among each + group. So the dashboard automatically follows whichever race is live right now, or + falls back to the most-recently-finished session if none is live.

+

Cadence breakdown per connected browser: + loadSnapshot (SSE) at 5 Hz, + handleLeaderboard at 2 Hz, + handleIngestRate at 1 Hz, + handleSpeedMap at ~0.33 Hz, + plus handleTrackJSON once on initial page load. + That's the "~8 / s combined" figure in the cadence tag.

+

One variant worth knowing about: handleTrackJSON uses the + same ORDER BY (ended_at IS NULL) DESC, started_at DESC LIMIT 1 + pattern but projects a single column — SELECT track FROM sessions … + — because all it needs is the track display name to regenerate the polyline + geometry server-side. Same plan, smaller payload.

+
+
+ +
+
+

§8SSE state snapshot

+
+ internal/web/server.go · loadSnapshot() + runs: every 200 ms per connected browser + tables: telemetry · laps · drivers +
+
+
WITH latest AS (
+    SELECT DISTINCT ON (driver_id)
+           driver_id, ts, lap, pos_x, pos_y, speed_kph,
+           rpm, gear, throttle, brake, drs
+    FROM telemetry
+    WHERE session_id = $1
+    ORDER BY driver_id, ts DESC
+),
+lap_stats AS (
+    SELECT driver_id,
+           MIN(lap_time_ms)                                       AS best_lap_ms,
+           (ARRAY_AGG(lap_time_ms ORDER BY lap_number DESC))[1]   AS last_lap_ms
+    FROM laps
+    WHERE session_id = $1
+    GROUP BY driver_id
+)
+SELECT d.driver_id, d.code, d.team, d.color_hex,
+       l.lap, l.pos_x, l.pos_y, l.speed_kph,
+       l.rpm, l.gear, l.throttle, l.brake, l.drs,
+       COALESCE(s.best_lap_ms, 0), COALESCE(s.last_lap_ms, 0)
+FROM latest l
+JOIN drivers d ON d.driver_id = l.driver_id
+LEFT JOIN lap_stats s ON s.driver_id = l.driver_id
+ORDER BY d.driver_id;
+
+

The centerpiece query. DISTINCT ON (driver_id) … ORDER BY driver_id, ts DESC + returns exactly one row per driver — their most recent telemetry sample — supported + by the telemetry_session_driver_ts_idx index (session_id, driver_id, ts DESC) + so the planner can do an index-only seek per group rather than scanning the partition. + The lap_stats CTE pulls Best/Last from the smaller laps table, + joined back in for the per-car telemetry panel.

+
+
+ +
+
+

§9Leaderboard with gap-to-leader

+
+ internal/web/server.go · handleLeaderboard() + runs: every 500 ms (HTMX polling) + tables: telemetry · laps · drivers +
+
+
WITH latest AS (
+    SELECT DISTINCT ON (driver_id)
+           driver_id, ts, lap, track_pos
+    FROM telemetry
+    WHERE session_id = $1
+    ORDER BY driver_id, ts DESC
+),
+lap_stats AS (
+    SELECT driver_id,
+           MIN(lap_time_ms) AS best_lap_ms,
+           MAX(lap_number)  AS max_lap_number
+    FROM laps
+    WHERE session_id = $1
+    GROUP BY driver_id
+),
+last_lap AS (
+    SELECT lp.driver_id, lp.lap_time_ms AS last_lap_ms
+    FROM laps lp
+    JOIN lap_stats ls
+      ON ls.driver_id      = lp.driver_id
+     AND ls.max_lap_number = lp.lap_number
+    WHERE lp.session_id = $1
+)
+SELECT d.code, d.team, d.color_hex,
+       l.lap, l.track_pos,
+       COALESCE(s.best_lap_ms, 0),
+       COALESCE(ll.last_lap_ms, 0),
+       (MAX(l.lap + l.track_pos) OVER ()
+         - (l.lap + l.track_pos))                AS gap_laps
+FROM latest l
+JOIN drivers d ON d.driver_id = l.driver_id
+LEFT JOIN lap_stats s ON s.driver_id = l.driver_id
+LEFT JOIN last_lap  ll ON ll.driver_id = l.driver_id
+ORDER BY (l.lap + l.track_pos) DESC;
+
+

Three CTEs plus a window function in a single round-trip. + MAX(l.lap + l.track_pos) OVER () + is the magic that gives every row the leader's race position, so the app can compute + "gap to leader" in lap-fractions; Go-side, the leader's last lap time is captured from + the first scanned row and used to convert lap-fractions into seconds for F1-TV-style + +1.234s formatting.

+

Returns: 20 rows. Rendered into an HTML fragment by Go's html/template + and swapped into the leaderboard panel by HTMX.

+
+
+ +
+
+

§10Speed-map heatmap

+
+ internal/web/server.go · handleSpeedMap() + runs: every 3 seconds + tables: telemetry +
+
+
SELECT WIDTH_BUCKET(track_pos, 0, 1, 100) AS bucket,
+       AVG(speed_kph)                    AS avg_kph,
+       COUNT(*)                          AS samples
+FROM telemetry
+WHERE session_id = $1
+GROUP BY bucket
+ORDER BY bucket;
+
+

The OLAP-flavoured one. Scans every telemetry row for the session — no + DISTINCT ON shortcut — and bins by 1%-of-lap track_pos. Returns up to + 100 rows. The dashboard pollers turn it into the colored ribbon laid along the + racing line on the track map.

+

The query gets more work to do as the race progresses (more telemetry + rows accumulate), which makes for a good "watch the database absorb the workload" + demo — pair it with §11 to show the simulator's writes keep ticking at full speed + regardless.

+
+
+ +
+
+

§11Live ingest rate (meta-query)

+
+ internal/web/server.go · handleIngestRate() + runs: every 1 second + tables: telemetry +
+
+
SELECT
+    COUNT(*) FILTER (WHERE ts > now() - interval '1 second') AS rows_last_1s,
+    COUNT(*)                                                  AS total_rows
+FROM telemetry
+WHERE session_id = $1;
+
+

The meta-query: the database is measuring its own ingest rate while continuing + to ingest. COUNT(*) FILTER (WHERE ...) is standard Postgres syntax that + computes both counts in a single index scan. Displayed in the dashboard footer so + the audience sees the row counter ticking up live, with every other panel still + refreshing concurrently.

+
+
+ + + + diff --git a/f1-telemetry/internal/web/templates/index.html b/f1-telemetry/internal/web/templates/index.html new file mode 100644 index 0000000..0ce578d --- /dev/null +++ b/f1-telemetry/internal/web/templates/index.html @@ -0,0 +1,426 @@ + + + + +F1 Telemetry — CedarDB Live Demo + + + + +
+

+ +   + + F1 TELEMETRY +

+
+ Session · · + Lap 0/0 · + SSE every 200 ms · last frame +
+
+ +
+
+

Track Map · Live Positions

+ + + + + + + + + + + + + + + + + +
+ +
+
+

Leaderboard (Live)

+
+

Waiting for data…

+
+
+
+

Per-Car Telemetry

+
+
+
+
+ +
+ + INSERTs: — rows/sec · + — total · + same table also serving SSE, leaderboard, speed-map · CedarDB · + SQL queries ↗ + + connecting… +
+ + + + diff --git a/f1-telemetry/internal/web/templates/leaderboard.html b/f1-telemetry/internal/web/templates/leaderboard.html new file mode 100644 index 0000000..1911882 --- /dev/null +++ b/f1-telemetry/internal/web/templates/leaderboard.html @@ -0,0 +1,20 @@ + + + + + + + + {{range .}} + + + + + + + + + + {{end}} + +
PDriverTeamLapGapLastBest
{{.Pos}}{{.Code}}{{.Team}}{{.Lap}}{{.Gap}}{{.Last}}{{.Best}}