Aydeing/log-collector

Project: Concurrency-First Log Collector & Health Checker (Go)

One-line: A production-style, concurrency-heavy Go service that ingests logs/health pings, batches and persists them to pluggable sinks, and runs concurrent health checks — built with testable modules, observability, graceful shutdown, and benchmarks to showcase performance for interviews.


Goals

  • Demonstrate deep Go systems skills: goroutines, channels, context, worker pools, batching, backpressure, graceful shutdown, and testing/benchmarking.
  • Design for production: metrics, tracing hooks, retries, rate-limiting, circuit breakers, and pluggable sinks.
  • Provide a reproducible repo with Docker + docker-compose and README so recruiters and interviewers can run and benchmark locally.
  • Surface interview talking points and resume-ready bullets.

High-level architecture

      +-----------+           +---------------+          +---------+
      |  Clients  | --HTTP--> | Ingest Server | --ch-->  | Batcher |
      +-----------+           +---------------+          +----+----+
                                                              |
                                           +------------------+------------------+
                                           |                  |                  |
                                           v                  v                  v
                                      +----------+       +----------+       +----------+
                                      | Worker 1 |       | Worker 2 |  ...  | Worker N |
                                      +----+-----+       +----+-----+       +----+-----+
                                           |                  |                  |
                                           +------------------+------------------+
                                                              |
                                                              v
                                                          +-------+
                                                          | Sinks |
                                                          +-------+

Components:

  • HTTP Ingest API (POST /logs, POST /health)
  • In-memory buffer with bounded capacity, backpressure to clients (HTTP 429 / timeout)
  • Batcher that groups messages by size/time and sends to worker pool
  • Worker pool that writes to sinks (Postgres sink, File sink, Optional Kafka/Redis sink)
  • Health checker: independent component that pings registered services concurrently and records status history
  • Observability: Prometheus metrics, OpenTelemetry traces (hooks), structured logs
  • Admin endpoints: /metrics (Prometheus), /healthz, /admin/jobs

Non-functional requirements

  • Graceful shutdown (context.Context, sync.WaitGroup) ensuring inflight batches flush
  • Config-driven (env + config file) with sane defaults
  • Tests: unit tests for batcher, worker, sink; integration test using docker-compose + local Postgres
  • Benchmarks: logs/sec and end-to-end latency under load
  • CI: run go test ./... and go vet ./..., plus golangci-lint (recommended), then build the Docker image

Folder layout

/cmd/collector/main.go
/internal/ingest/server.go
/internal/ingest/handler.go
/internal/batcher/batcher.go
/internal/worker/worker.go
/internal/sink/sink.go
/internal/sink/postgres/postgres.go
/internal/sink/file/file.go
/internal/health/checker.go
/internal/config/config.go
/internal/metrics/metrics.go
/internal/tracing/tracing.go
/test/integration/*
/docker/Dockerfile
/docker/docker-compose.yml
/README.md

Key design patterns & decisions

  • Bounded channel buffers for ingress to limit memory usage.
  • Batcher with time-or-size trigger: flush when len(batch) >= maxSize or time.Since(first) >= maxWait.
  • Worker pool: N workers read from a batches channel; each worker writes to sink with retry/backoff.
  • Pluggable sink interface:
// internal/sink/sink.go
package sink

import "context"

type Record struct {
    Timestamp int64
    Type      string // "log" | "health"
    Payload   []byte
}

type Sink interface {
    WriteBatch(ctx context.Context, records []Record) error
    Close() error
}
  • Backpressure: When ingress channel is full, server returns 429 or blocks for a short timeout using select and time.After.
  • Graceful shutdown: HTTP server shutdown -> close ingest channels -> wait for batcher and workers to finish with WaitGroup.
  • Context propagation: pass context.Context with request ID to every step for tracing and cancellations.

Example: core components (skeletons)

cmd/collector/main.go

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/your/module/internal/config"
    "github.com/your/module/internal/ingest"
    "github.com/your/module/internal/metrics"
    "github.com/your/module/internal/sink/postgres"
)

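func main() {
    ctx := context.Background()

    cfg := config.LoadFromEnv()
    metrics.Init(cfg)

    pg, err := postgres.New(cfg.PostgresDSN)
    if err != nil {
        log.Fatalf("pg init: %v", err)
    }
    defer pg.Close()

    srv := ingest.NewServer(cfg, pg)

    // Run the server in the background and hand its exit error back to main.
    // Calling log.Fatalf inside the goroutine would exit the process without
    // running deferred cleanup and could abort a shutdown that is in progress.
    runErr := make(chan error, 1)
    go func() { runErr <- srv.Run(ctx) }()

    // Wait for a termination signal or for the server to stop on its own.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
    select {
    case <-sig:
    case err := <-runErr:
        log.Printf("server run: %v", err)
    }

    // Graceful shutdown: give in-flight batches up to 20 seconds to flush.
    shutdownCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
    defer cancel()
    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("shutdown err: %v", err)
    }
}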

internal/batcher/batcher.go (concept)

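package batcher

import (
    "context"
    "time"
)

// Batcher groups messages read from in and sends them on out when a batch
// reaches maxSize or has been pending for maxWait, whichever comes first.
type Batcher struct {
    in      chan []byte
    out     chan [][]byte // batches
    maxSize int
    maxWait time.Duration
}

func New(in chan []byte, out chan [][]byte, maxSize int, maxWait time.Duration) *Batcher {
    return &Batcher{in: in, out: out, maxSize: maxSize, maxWait: maxWait}
}

// Run batches messages until ctx is cancelled or in is closed. It flushes the
// pending batch before returning and then closes out, which assumes the
// Batcher is the only sender on out.
func (b *Batcher) Run(ctx context.Context) {
    defer close(b.out)

    var (
        batch  [][]byte
        timer  *time.Timer
        timerC <-chan time.Time // nil while no batch is pending; a nil channel never fires
    )

    flush := func() {
        if timer != nil {
            timer.Stop()
            timer, timerC = nil, nil
        }
        if len(batch) == 0 {
            return
        }
        b.out <- batch
        batch = nil
    }

    for {
        select {
        case <-ctx.Done():
            flush()
            return
        case msg, ok := <-b.in:
            if !ok { // in was closed: flush what is left and stop
                flush()
                return
            }
            batch = append(batch, msg)
            if len(batch) >= b.maxSize {
                flush()
                continue
            }
            if timer == nil { // first message of a new batch starts the maxWait clock
                timer = time.NewTimer(b.maxWait)
                timerC = timer.C
            }
        case <-timerC:
            flush()
        }
    }
}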

Operational concerns

  • Schema: store logs and health checks in separate tables. Speed up Postgres writes with COPY or multi-row INSERT statements instead of one INSERT per record; the pgx driver exposes COPY through CopyFrom.
  • Batch size tuning: benchmark different sizes (100, 500, 2000) to find optimal throughput vs latency.
  • Idempotency: include id field for dedupe if necessary.
  • Sinks: for production, prefer a durable queue (Kafka) between batcher and workers; for demo, Postgres or files are fine.

Observability & Metrics to show in interviews

Expose and collect:

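  • ingest_requests_total, ingest_errors_total, and a per-second ingest rate (derivable in PromQL as rate(ingest_requests_total[1m]) rather than exported as a separate metric)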
  • batch_size, batches_flushed_total
  • worker_failures_total, worker_retries_total
  • write_latency_seconds histograms for sink writes
  • Prometheus + Grafana dashboard showing throughput vs latency and error rates

Also add structured logging with request ID and sample OpenTelemetry traces (instrumented at batch boundary and sink write).


Testing & Benchmarks

  • Unit tests for batcher logic: ensure time/size flush behavior.
  • Integration test: docker-compose with the service + Postgres, run a script that posts N messages and assert counts in DB.
  • Benchmarks: go test -bench . for critical components (batcher, sink writer). Provide simple wrk or hey scripts for end-to-end load tests.

Example benchmark command in README:

# run locally
docker-compose -f docker/docker-compose.yml up --build -d
./scripts/load_test.sh 10000 50  # send 10k messages with 50 concurrent clients

docker-compose (example)

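version: '3.8'
services:
  collector:
    # Paths resolve relative to this file (docker/), so the build context
    # must point at the repository root to include the Go sources.
    build:
      context: ..
      dockerfile: docker/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - POSTGRES_DSN=postgres://postgres:pass@postgres:5432/logs?sslmode=disable
    # Controls start order only; the collector should still retry its
    # first database connection.
    depends_on:
      - postgres
  postgres:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: pass
      # Create the "logs" database that the DSN above refers to.
      POSTGRES_DB: logs
    ports:
      - "5432:5432"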

CI / Quality checks

  • go test ./...
  • go vet ./...
  • golangci-lint run (setup in CI)
  • Build Docker image and run basic smoke tests

Resume bullets (copy-paste)

  • Built a high-throughput Go-based log collector and health-checker with batching, worker-pool concurrency, and pluggable sinks (Postgres, file). Implemented backpressure, graceful shutdown, and Prometheus metrics; benchmarked to Xk req/s (local).
  • Designed and implemented a concurrent health checker that performs thousands of concurrent probes using goroutines and context cancellation, storing latency and status history.

Interview talking points

  • Why choose Go: binary deployment, goroutines for cheap concurrency, predictable performance.
  • Batcher design: tradeoffs between size vs latency; how time-or-size triggers work; how to handle partial failures.
  • Backpressure: how bounded channels and request timeouts protect the service.
  • Graceful shutdown: steps to ensure no data loss and closing resources safely.
  • Observability: key metrics, how you used them to tune batch size and worker count.
  • Scaling beyond single node: add Kafka, partitioning by tenant/service, autoscaling workers, consistent hashing for stateful sinks.

Roadmap & milestones (2–6 weeks depending on effort)

  • Week 0: Repo init, config, CI basics, Dockerfile
  • Week 1: HTTP ingest + basic in-memory channel + simple sink (file)
  • Week 2: Batcher + worker pool + Postgres sink + integration tests
  • Week 3: Health checker + admin endpoints + metrics + graceful shutdown
  • Week 4: Benchmarks, optimization, README, Docker-compose examples, resume copy
  • Week 5–6: Optional: Kafka sink, OpenTelemetry tracing, Grafana dashboards, production hardening

Acceptance criteria (what to demo to a recruiter)

  1. Run docker-compose up and start the service.
  2. Run load script sending 10k messages; show Prometheus metrics and DB counts.
  3. Show benchmark numbers (throughput, p95 latency) and explain tuning choices.
  4. Show test coverage for core modules (batcher, worker), show the CI badge, and run the same CI checks locally.

Next steps I can do for you right now (pick one, I will generate the content in the canvas):

  1. Generate full cmd/collector/main.go, internal/ingest server & handlers, and a working internal/batcher + internal/worker with a Postgres sink — a runnable minimal demo.
  2. Create the docker/Dockerfile and docker-compose.yml and scripts/load_test.sh for end-to-end testing.
  3. Write the unit tests + benchmarks for batcher & worker.
  4. Generate Prometheus metrics + Grafana dashboard JSON and example queries.
  5. Produce a short README with run instructions and sample curl commands.

End of project spec.
