Complete guide for developing and extending IngestKit.
Last updated: 2025-11-15
## Contents

- Development Environment Setup
- Project Structure
- Development Workflow
- Adding New Event Types
- Schema Management
- Testing
- Building and Deployment
## Development Environment Setup

**Prerequisites:**

- Go 1.24+ (tested with 1.25) - Backend services
- Docker & Docker Compose - Infrastructure (PostgreSQL, Redpanda)
- Make - Build automation
- Optional: Python 3.9+ and Node.js 18+ for SDK development
**Quick Start:**

```bash
# Clone the repository
git clone <repo-url>
cd ingestkit

# Start infrastructure
make up

# Build IngestKit CLI and services
make build

# Generate code from schema
make generate

# Create database tables
make db-create

# Run the API server
make run-api

# In another terminal, run the consumer
make run-consumer
```

Create a `.env` file in the project root:
```bash
# API Server
API_PORT=8080
API_KEY_1=dev_key_1234567890:default
API_KEY_2=sk_test_tenant_alpha:tenant_alpha
API_KEY_3=dev_key_ecommerce:ecommerce-demo

# Consumer
CONSUMER_WORKERS=4
CONSUMER_BATCH_SIZE=500
CONSUMER_BATCH_TIMEOUT_MS=20

# Database
POSTGRES_HOST=localhost
POSTGRES_PORT=5433
POSTGRES_USER=ingestkit
POSTGRES_PASSWORD=ingestkit_dev
POSTGRES_DB=ingestkit

# Kafka
REDPANDA_ADDR=localhost:19092
REDPANDA_TOPIC=ingestkit.events
```

## Project Structure

```
ingestkit/
├── cmd/                      # Main applications
│   ├── api/                  # HTTP API server
│   ├── consumer/             # Kafka consumer
│   └── cli/                  # IngestKit CLI tool
├── internal/                 # Private application code
│   ├── api/                  # API handlers and middleware
│   │   └── middleware/       # Auth, CORS, rate limiting
│   ├── messaging/            # Kafka producer/consumer
│   ├── schema/               # Schema parsing and generators
│   │   └── templates/        # Code generation templates
│   ├── storage/              # PostgreSQL operations
│   │   ├── partitions.go     # Auto-partition management
│   │   └── dlq.go            # Dead letter queue
│   └── validation/           # Event validation
├── generated/                # Auto-generated code (gitignored)
│   ├── sql/                  # SQL DDL
│   ├── models/               # Go structs
│   ├── storage/              # COPY protocol writers
│   ├── consumer/             # Event handlers
│   └── sdk/                  # Client SDKs
│       ├── python/           # Python SDK
│       └── typescript/       # TypeScript SDK
├── schema/                   # Schema definitions
│   └── events.yaml           # Single source of truth
├── init-db/                  # Database initialization
│   └── 01-init.sql           # Metadata tables
├── examples/                 # Example applications
│   ├── blog-flask/           # Python/Flask blog analytics
│   └── ecommerce-express/    # TypeScript/Express e-commerce
└── docs/                     # Documentation
```
## Development Workflow

1. **Edit Schema**: modify `schema/events.yaml`
2. **Generate Code**: run `make generate`
3. **Update Database**: run `make db-create` (creates new columns/tables)
4. **Build Services**: run `make build`
5. **Restart Services**: restart the API server and consumer
6. **Test Changes**: use the examples or curl
```bash
# Terminal 1: Watch schema and regenerate
while true; do
  inotifywait -e modify schema/events.yaml
  make generate && make build
done

# Terminal 2: API server (restart manually after rebuild)
make run-api

# Terminal 3: Consumer (restart manually after rebuild)
make run-consumer
```

**Check API Server Logs:**
```bash
# API logs show validation errors, authentication issues
make run-api

# Look for:
# - Event types, API key count, rate limit settings
# - Schema validation failures, auth errors
```

**Check Consumer Logs:**
```bash
# Consumer logs show batch processing, partition creation
make run-consumer

# Look for:
# - Batch: N events
# - Wrote N event_type events
# - Errors in processing or DLQ writes
```

**Check Database State:**
```bash
# Connect to database
make db-connect
```

```sql
-- List event tables
SELECT tablename FROM pg_tables WHERE tablename LIKE 'events_%';

-- Check event counts (approximate, from table statistics)
SELECT relname, n_live_tup
FROM pg_stat_user_tables
WHERE relname LIKE 'events_%';

-- Count a single table exactly
SELECT COUNT(*) FROM events_user_signup;

-- Check DLQ
SELECT * FROM ingestkit_meta.dead_letter_queue;
```

**Check Kafka Messages:**
```bash
# List topics
docker exec -it redpanda rpk topic list

# Consume from topic
docker exec -it redpanda rpk topic consume ingestkit.events
```

## Adding New Event Types

Edit `schema/events.yaml`:
```yaml
events:
  # ... existing events ...
  new_event_name:
    description: "Description of your event"
    fields:
      field_name:
        type: string
        required: true
        description: "Field description"
      optional_field:
        type: integer
        required: false
        description: "Optional field"
```

Then run:

```bash
make generate
```

This generates:
- SQL DDL in `generated/sql/schema.sql`
- Go models in `generated/models/events.go`
- Storage writers in `generated/storage/writer.go`
- Consumer handlers in `generated/consumer/handler.go`
```bash
make db-create
```

This executes the generated SQL to create the tables.
Regenerate the client SDKs:

```bash
# Python SDK
./bin/ingestkit sdk generate --lang python --api-url http://localhost:8080

# TypeScript SDK
./bin/ingestkit sdk generate --lang typescript --api-url http://localhost:8080
```

Send a test event:

```bash
curl -X POST http://localhost:8080/v1/events/new_event_name \
  -H "Authorization: Bearer dev_key_1234567890" \
  -H "Content-Type: application/json" \
  -d '{
    "field_name": "value",
    "optional_field": 123
  }'
```

## Schema Management

The schema version is defined in `schema/events.yaml`:
```yaml
version: "1.0"
```

**Additive changes only** (for backwards compatibility):

- Add new optional fields
- Add new event types
- Don't remove or rename fields

**Breaking changes** (require a version bump):

- Change field types
- Make optional fields required
- Remove fields
- Rename fields

**Version bump process:**

```yaml
version: "2.0"  # Increment version
```

Then regenerate everything:

```bash
make generate
make db-create
make build
```
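The additive-vs-breaking rules above can be checked mechanically before a release. The sketch below is not an IngestKit tool, just an illustration of the rules, with each field reduced to a name plus (type, required):

```go
package main

import "fmt"

// Field is a minimal stand-in for one entry under `fields:` in events.yaml.
type Field struct {
	Type     string
	Required bool
}

// BreakingChanges lists schema edits that require a major version bump:
// removed fields, changed types, optional fields made required, and new
// required fields. Purely additive edits produce no entries.
func BreakingChanges(prev, next map[string]Field) []string {
	var out []string
	for name, p := range prev {
		n, ok := next[name]
		switch {
		case !ok:
			out = append(out, name+": removed")
		case n.Type != p.Type:
			out = append(out, name+": type changed")
		case n.Required && !p.Required:
			out = append(out, name+": made required")
		}
	}
	for name, n := range next {
		if _, ok := prev[name]; !ok && n.Required {
			out = append(out, name+": new required field")
		}
	}
	return out
}

func main() {
	prev := map[string]Field{"user_id": {"string", true}}
	next := map[string]Field{
		"user_id": {"string", true},
		"plan":    {"string", false}, // new optional field: additive, allowed
	}
	fmt.Println(BreakingChanges(prev, next)) // []
}
```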
Clients can fetch the current schema:
```bash
# Fetch schema with ETag support
curl -i http://localhost:8080/schema

# With caching
curl -H "If-None-Match: <previous-etag>" http://localhost:8080/schema
```

Schema push is protected by a dedicated `ADMIN_SCHEMA_KEY` environment variable. This endpoint is NOT available to regular API keys - only to platform administrators.
**Setup Admin Key (Server)**

Add to your server `.env`:

```bash
ADMIN_SCHEMA_KEY=admin_secret_key_here
```

**Method 1: Using IngestKit CLI (Recommended)**
```bash
# Push schema to local server (requires admin key)
ingestkit schema push --api-key admin_secret_key_here

# Push schema to remote server
ingestkit schema push \
  --api-url https://api.ingestkit.com \
  --api-key admin_secret_key_here

# Or use environment variables
export INGESTKIT_API_URL=https://api.ingestkit.com
export INGESTKIT_API_KEY=admin_secret_key_here
ingestkit schema push
```

**Method 2: Using curl**
```bash
# Push updated schema to server (requires admin key)
curl -X POST http://localhost:8080/v1/schema/push \
  -H "Authorization: Bearer admin_secret_key_here" \
  -H "Content-Type: application/x-yaml" \
  --data-binary @schema/events.yaml
```

**What Happens After Push**
The server will:

- Validate the schema before accepting it
- Create an automatic backup (e.g., `schema/events.yaml.backup.1731672000`)
- Update the schema file
- Return validation results with an event count
The schema push does NOT automatically reload the server or regenerate code. You must apply the changes yourself:

**Method 1: Using Makefile (Recommended)**

```bash
# Apply all schema changes (generate + build + db-create)
make schema-apply

# Then restart services:
make run-api       # In one terminal
make run-consumer  # In another terminal
```

**Method 2: Manual Steps**
```bash
# 1. Regenerate code from new schema
make generate

# 2. Rebuild binaries
make build

# 3. Apply database migrations
make db-create

# 4. Restart API server
# Stop current API and run:
make run-api

# 5. Restart Consumer
# Stop current consumer and run:
make run-consumer
```

This manual workflow is intentional for safety: hot-reloading schemas in production could cause data inconsistencies or downtime.
## Testing

```bash
# Run all tests
go test ./...

# Run with coverage
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

# Run specific package
go test ./internal/validation/...
```

**Integration tests:**

```bash
# Start infrastructure
make up

# Run integration tests
go test -tags=integration ./tests/integration/...
```

**Load tests:**

```bash
# Quick load test (100 RPS for 10s)
make loadtest-quick

# Full load test (1000 RPS for 60s)
make loadtest
```

**Running the examples:**

```bash
# Blog example (Python)
cd examples/blog-flask
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python main.py

# E-commerce example (TypeScript)
cd examples/ecommerce-express
npm install
npm start
```

## Building and Deployment

```bash
# Build all binaries (api, consumer, ingestkit CLI)
make build

# Build individual components
go build -o bin/api ./cmd/api
go build -o bin/consumer ./cmd/consumer
go build -o bin/ingestkit ./cmd/cli
```

**Docker:**

```bash
# Build Docker images
docker build -t ingestkit-api -f Dockerfile.api .
docker build -t ingestkit-consumer -f Dockerfile.consumer .

# Run with Docker Compose
docker-compose up -d
```

**Production checklist:**

- Set production API keys (not `dev_key_*`)
- Configure appropriate rate limits
- Set CONSUMER_WORKERS based on load
- Enable connection pooling
- Configure log levels (not DEBUG)
- Set up monitoring (Prometheus /metrics endpoints)
- Configure TLS for API
- Set up Redpanda cluster (not single node)
- Configure PostgreSQL replication
- Set up automated backups
- Configure DLQ alerting
**Production environment variables:**

```bash
# API Server
API_PORT=8080
LOG_LEVEL=info
RATE_LIMIT_RPS=1000

# Consumer
CONSUMER_WORKERS=8          # Scale based on load
CONSUMER_BATCH_SIZE=500
CONSUMER_BATCH_TIMEOUT_MS=20

# Database
DATABASE_URL=postgres://user:pass@postgres:5432/ingestkit?sslmode=require

# Kafka
KAFKA_BROKERS=kafka1:9092,kafka2:9092,kafka3:9092
KAFKA_TOPIC=ingestkit.events
```

**Database maintenance:**

```bash
# ⚠️ Warning: Deletes all data
make db-reset
```

```bash
make db-stats
```

**Metrics:**

```bash
# API metrics
curl http://localhost:8080/metrics

# Consumer metrics
curl http://localhost:8081/metrics
```

**Adding custom middleware:**

1. Create middleware in `internal/api/middleware/`
2. Add it to the middleware chain in `cmd/api/main.go`
3. Test with integration tests
Example:

```go
// internal/api/middleware/custom.go
func CustomMiddleware() fiber.Handler {
	return func(c *fiber.Ctx) error {
		// Middleware logic
		return c.Next()
	}
}
```

```go
// cmd/api/main.go
app.Use(middleware.CustomMiddleware())
```

## Troubleshooting

**Problem:** Generated code doesn't match schema
**Solution:**

```bash
# Clean and regenerate
rm -rf generated/
make generate
make build
```

**Problem:** Schema changed but tables not updated
**Solution:**

```bash
# For development (⚠️ loses data)
make db-reset
make db-create

# For production: Write manual migration
psql $DATABASE_URL -f migrations/001_add_column.sql
```

**Problem:** Import errors after renaming
**Solution:**

```bash
# Update go.mod module path
go mod edit -module github.com/yourorg/ingestkit
find . -name "*.go" -exec sed -i 's|old/import/path|new/import/path|g' {} +
go mod tidy
```

## Contributing

- Fork the repository
- Create a feature branch
- Make changes
- Add tests
- Run `make test`
- Submit PR