From 88233e5c97842184cb29cad3bc92255ec640d62c Mon Sep 17 00:00:00 2001 From: king_stroke Date: Mon, 22 Sep 2025 20:46:24 -0400 Subject: [PATCH 1/6] Added prometheus integration and monitoring --- src/go.mod | 17 ++++++ src/prometheus.go | 140 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 src/prometheus.go diff --git a/src/go.mod b/src/go.mod index 3e8eb50..0faab9a 100644 --- a/src/go.mod +++ b/src/go.mod @@ -5,7 +5,24 @@ go 1.24.3 require github.com/redis/go-redis/v9 v9.10.0 require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/prometheus/client_golang v1.23.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/src/prometheus.go b/src/prometheus.go new file mode 100644 index 0000000..1e6b0f0 --- /dev/null +++ b/src/prometheus.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + cpuinfo "github.com/shirou/gopsutil/v3/cpu" + diskinfo "github.com/shirou/gopsutil/v3/disk" + meminfo "github.com/shirou/gopsutil/v3/mem" +) + +func handle() { + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":2112", nil) +} + +// http metrics +var ( + httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "http_in_flight_requests", + Help: "Current number of in-flight HTTP requests.", + }) + httpRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "http_requests_total", + Help: "Total HTTP requests by handler/method/status.", + }, []string{"handler", "method", "code"}) + httpRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "HTTP request latency in seconds.", + Buckets: prometheus.DefBuckets, + }, []string{"handler", "method", "code"}) +) + +// job metrics +var ( + jobsStarted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_started_total", + Help: "Total number of jobs started.", + }, []string{"job_type", "gpu"}) + jobsCompleted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_completed_total", + Help: "Total number of jobs completed successfully.", + }, []string{"job_type", "gpu"}) + jobsFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_failed_total", + Help: "Total number of jobs that failed.", + }, []string{"job_type", "gpu"}) + runningJobs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "running_jobs", + Help: "Total jobs currently 
running", + }, []string{"gpu"}) +) + +// system metrics (cpu, memory, disk, etc.) +var ( + // cpu (percent of total) + systemCPUPercent = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_cpu_usage_percent", + Help: "Host CPU usage percentage (all cores averaged).", + }) + + // memory + systemMemTotalBytes = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_memory_total_bytes", + Help: "Host total memory bytes.", + }) + systemMemUsedBytes = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_memory_used_bytes", + Help: "Host used memory bytes.", + }) + + // disk per mountpoint + systemDiskTotalBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "system_disk_total_bytes", + Help: "Total disk bytes for a mountpoint.", + }, []string{"mountpoint"}) + + systemDiskUsedBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "system_disk_used_bytes", + Help: "Used disk bytes for a mountpoint.", + }, []string{"mountpoint"}) +) + +// this function may need to run per server to capture local system metrics + +func startSystemCollector(ctx context.Context) { + + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + + select { + case <-ctx.Done(): + return + + case <-ticker.C: + + // cpu + pct, err := cpuinfo.Percent(0, false) + + if err == nil && len(pct) > 0 { + systemCPUPercent.Set(pct[0]) + } else { + // TODO: log err + } + + // memory + m, err := meminfo.VirtualMemory() + if err == nil { + systemMemTotalBytes.Set(float64(m.Total)) + systemMemUsedBytes.Set(float64(m.Used)) + } else { + // TODO: log err + } + + // disk capture + parts, err := diskinfo.Partitions(false) + if err == nil { + for _, p := range parts { + if u, err := diskinfo.Usage(p.Mountpoint); err == nil { + // Use a consistent label key (e.g., mountpoint) + systemDiskTotalBytes.WithLabelValues(u.Path).Set(float64(u.Total)) + systemDiskUsedBytes.WithLabelValues(u.Path).Set(float64(u.Used)) + } else { + // TODO: log err + } + } + } else { + // TODO: log err + } + } + } + }() +} From e8ee41152b03832598ab6f3e0ae08b7c3bde0c14 Mon Sep 17 00:00:00 2001 From: king_stroke Date: Mon, 22 Sep 2025 20:48:53 -0400 Subject: [PATCH 2/6] Added prometheuss integration and monitoring --- src/go.sum | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/go.sum b/src/go.sum index 30e2f56..51e361f 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -6,8 +8,46 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 4c87de22082757bbbe716632718a56c0918812b2 Mon Sep 17 00:00:00 2001 From: avii778 <142438557+avii778@users.noreply.github.com> Date: Mon, 29 Sep 2025 13:28:19 -0400 Subject: [PATCH 3/6] Fixed issues and added prometheus and grafana setup --- docker-compose.yml | 25 +++++++++++++ prometheus.yml | 9 +++++ src/api.go | 24 +++++++++---- src/int_test.go | 4 +-- src/prometheus.go | 90 +++++++++++++++++++++++++++++++++++++++------- src/supervisor.go | 14 ++++++-- 6 files changed, 142 insertions(+), 24 deletions(-) create mode 100644 prometheus.yml diff --git a/docker-compose.yml b/docker-compose.yml index b400a10..a5445b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,33 @@ services: - redis_data:/data - ./redis.conf:/usr/local/etc/redis/redis.conf:ro command: redis-server /usr/local/etc/redis/redis.conf + prometheus: + image: prom/prometheus:latest + ports: ["9090:9090"] + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prom-data:/prometheus + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + + grafana: + image: grafana/grafana:latest + ports: ["3001:3000"] + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + volumes: + - grafana_data:/var/lib/grafana + depends_on: [prometheus] volumes: redis_data: + driver: local + prom-data: + driver: local + grafana_data: driver: local \ No newline at end of file diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 0000000..28ed0f7 --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,9 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: "app" + metrics_path: /metrics + static_configs: + - targets: ["host.docker.internal:3000"] diff --git a/src/api.go b/src/api.go index d44428d..bc5b61f 100644 --- a/src/api.go +++ b/src/api.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" ) @@ -22,14 +23,15 @@ type App struct { httpServer *http.Server wg sync.WaitGroup log *slog.Logger + metrics *Metrics } -func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { +func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App { client := redis.NewClient(&redis.Options{Addr: redisAddr}) scheduler := NewScheduler(redisAddr, log) consumerID := fmt.Sprintf("worker_%d", os.Getpid()) - supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log) + supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log, metrics) mux := http.NewServeMux() a := &App{ @@ -38,12 +40,15 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { supervisor: supervisor, httpServer: &http.Server{Addr: ":3000", Handler: mux}, log: log, + metrics: metrics, } - mux.HandleFunc("/auth/login", a.login) - mux.HandleFunc("/auth/refresh", a.refresh) - mux.HandleFunc("/jobs", a.enqueueJob) - mux.HandleFunc("/jobs/status", a.getJobStatus) + mux.Handle("/auth/login", a.metrics.WrapHTTP("auth_login", http.HandlerFunc(a.login))) + mux.Handle("/auth/refresh", a.metrics.WrapHTTP("auth_refresh", http.HandlerFunc(a.refresh))) + mux.Handle("/jobs", a.metrics.WrapHTTP("jobs", http.HandlerFunc(a.enqueueJob))) + mux.Handle("/jobs/status", a.metrics.WrapHTTP("jobs_status", 
http.HandlerFunc(a.getJobStatus))) + + mux.Handle("/metrics", promhttp.Handler()) a.log.Info("new app initialized", "redis_address", redisAddr, "gpu_type", gpuType, "http_address", a.httpServer.Addr) @@ -105,7 +110,9 @@ func main() { fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err) os.Exit(1) } - app := NewApp("localhost:6379", "AMD", log) + + metrics := NewMetrics() + app := NewApp("localhost:6379", "AMD", log, metrics) if err := app.Start(); err != nil { log.Error("failed to start app", "err", err) @@ -114,6 +121,9 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() + + go app.metrics.StartCollecting(ctx) + <-ctx.Done() log.Info("shutdown signal received") diff --git a/src/int_test.go b/src/int_test.go index dac0636..1d30ec8 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -33,9 +33,9 @@ func TestIntegration(t *testing.T) { fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err) os.Exit(1) } - + metrics := NewMetrics() consumerID := fmt.Sprintf("worker_%d", os.Getpid()) - supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog) + supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog, metrics) if err := supervisor.Start(); err != nil { t.Errorf("Failed to start supervisor: %v", err) diff --git a/src/prometheus.go b/src/prometheus.go index 1e6b0f0..a5cffc7 100644 --- a/src/prometheus.go +++ b/src/prometheus.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "net/http" "time" @@ -13,11 +14,6 @@ import ( meminfo "github.com/shirou/gopsutil/v3/mem" ) -func handle() { - http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(":2112", nil) -} - // http metrics var ( httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{ @@ -49,7 +45,7 @@ var ( Name: "jobs_failed_total", Help: "Total number of jobs that failed.", }, []string{"job_type", "gpu"}) - runningJobs = promauto.NewCounterVec(prometheus.CounterOpts{ + runningJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "running_jobs", Help: "Total jobs currently running", }, []string{"gpu"}) @@ -85,8 +81,78 @@ var ( }, []string{"mountpoint"}) ) -// this function may need to run per server to capture local system metrics +type Metrics struct { + HTTPInFlight prometheus.Gauge + HTTPRequestsTotal *prometheus.CounterVec + HTTPRequestDuration *prometheus.HistogramVec + + JobsStarted *prometheus.CounterVec + JobsCompleted *prometheus.CounterVec + JobsFailed *prometheus.CounterVec + RunningJobs *prometheus.GaugeVec + + SystemCPUPercent prometheus.Gauge + SystemMemTotalBytes prometheus.Gauge + SystemMemUsedBytes prometheus.Gauge + SystemDiskTotalBytes *prometheus.GaugeVec + SystemDiskUsedBytes *prometheus.GaugeVec +} + +func NewMetrics() *Metrics { + return &Metrics{ + HTTPInFlight: httpInFlight, + HTTPRequestsTotal: httpRequestsTotal, + HTTPRequestDuration: httpRequestDuration, + + JobsStarted: jobsStarted, + JobsCompleted: jobsCompleted, + JobsFailed: jobsFailed, + RunningJobs: runningJobs, + + SystemCPUPercent: systemCPUPercent, + SystemMemTotalBytes: systemMemTotalBytes, + SystemMemUsedBytes: systemMemUsedBytes, + SystemDiskTotalBytes: systemDiskTotalBytes, + SystemDiskUsedBytes: systemDiskUsedBytes, + } +} + +func (m *Metrics) StartCollecting(ctx context.Context) { + startSystemCollector(ctx) + +} + +// Wrap http wraps around the http handlers to collect metrics +func (m *Metrics) WrapHTTP(name string, next http.Handler) http.Handler { + return promhttp.InstrumentHandlerInFlight( + httpInFlight, + 
promhttp.InstrumentHandlerDuration(
+			httpRequestDuration.MustCurryWith(prometheus.Labels{"handler": name}),
+			promhttp.InstrumentHandlerCounter(
+				httpRequestsTotal.MustCurryWith(prometheus.Labels{"handler": name}),
+				next,
+			),
+		),
+	)
+}
+
+func (m *Metrics) TrackJob(ctx context.Context, jobType, gpu string, fn func(context.Context) error) error {
+
+	jobsStarted.WithLabelValues(jobType, gpu).Inc()
+	runningJobs.WithLabelValues(gpu).Inc()
+	defer runningJobs.WithLabelValues(gpu).Dec()
+	err := fn(ctx)
+
+	if err != nil {
+		jobsFailed.WithLabelValues(jobType, gpu).Inc()
+		return err
+	}
+	jobsCompleted.WithLabelValues(jobType, gpu).Inc()
+	return nil
+}
+
+// startSystemCollector collects metrics from the host where this process runs; the values reflect the local machine.
 
 func startSystemCollector(ctx context.Context) {
 
 	go func() {
@@ -107,7 +173,7 @@
 			if err == nil && len(pct) > 0 {
 				systemCPUPercent.Set(pct[0])
 			} else {
-				// TODO: log err
+				slog.Error("failed to get cpu percent", "err", err)
 			}
 
 			// memory
@@ -116,7 +182,7 @@
 				systemMemTotalBytes.Set(float64(m.Total))
 				systemMemUsedBytes.Set(float64(m.Used))
 			} else {
-				// TODO: log err
+				slog.Error("failed to get memory info", "err", err)
 			}
 
 			// disk capture
@@ -124,15 +190,15 @@
 			if err == nil {
 				for _, p := range parts {
 					if u, err := diskinfo.Usage(p.Mountpoint); err == nil {
-						// Use a consistent label key (e.g., mountpoint)
+
 						systemDiskTotalBytes.WithLabelValues(u.Path).Set(float64(u.Total))
 						systemDiskUsedBytes.WithLabelValues(u.Path).Set(float64(u.Used))
 					} else {
-						// TODO: log err
+						slog.Error("failed to get disk usage", "mountpoint", p.Mountpoint, "err", err)
 					}
 				}
 			} else {
-				// TODO: log err
+				slog.Error("failed to get disk partitions", "err", err)
 			}
 		}
 	}
diff --git a/src/supervisor.go b/src/supervisor.go
index 5c5756b..c1e49ed 100644
--- a/src/supervisor.go
+++ b/src/supervisor.go
@@ -20,9 +20,10 @@ type Supervisor struct {
 	gpuType    string
 	wg         sync.WaitGroup
 	log        *slog.Logger
+	metrics    *Metrics
 }
 
-func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger) *Supervisor {
+func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger, metrics *Metrics) *Supervisor {
 	client := redis.NewClient(&redis.Options{
 		Addr: redisAddr,
 	})
@@ -36,6 +37,7 @@ func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger) *Sup
 		consumerID: consumerID,
 		gpuType:    gpuType,
 		log:        log,
+		metrics:    metrics,
 	}
 }
 
@@ -126,9 +128,15 @@ func (s *Supervisor) handleMessage(message redis.XMessage) {
 	s.log.Info("processing job", "job_id", job.ID, "job_type", job.Type)
 
 	// Simulate job processing
-	success := s.processJob(job)
+	gpuLabel := s.gpuType // e.g.
"AMD" or "NVIDIA" + err := s.metrics.TrackJob(context.Background(), job.Type, gpuLabel, func(ctx context.Context) error { + if s.processJob(job) { + return nil + } + return fmt.Errorf("job failed") + }) - if success { + if err == nil { s.ackMessage(message.ID) s.log.Info("job completed successfully", "job_id", job.ID) } else { From 6f6720adc37ececd8eb5e1ae6b6b148f1323d8f7 Mon Sep 17 00:00:00 2001 From: avii778 <142438557+avii778@users.noreply.github.com> Date: Mon, 3 Nov 2025 18:41:01 -0500 Subject: [PATCH 4/6] Added monitoring endpoints --- src/api.go | 43 +++++++++++++++++++++++-------------------- src/int_test.go | 1 + 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/api.go b/src/api.go index 15085a6..75f9f38 100644 --- a/src/api.go +++ b/src/api.go @@ -19,13 +19,13 @@ import ( ) type App struct { - redisClient *redis.Client - scheduler *Scheduler - supervisor *Supervisor - httpServer *http.Server - wg sync.WaitGroup - log *slog.Logger - metrics *Metrics + redisClient *redis.Client + scheduler *Scheduler + supervisor *Supervisor + httpServer *http.Server + wg sync.WaitGroup + log *slog.Logger + metrics *Metrics statusRegistry *StatusRegistry } @@ -38,23 +38,26 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log, metrics) mux := http.NewServeMux() + + mux.Handle("/metrics", promhttp.Handler()) + a := &App{ - redisClient: client, - scheduler: scheduler, - supervisor: supervisor, - httpServer: &http.Server{Addr: ":3000", Handler: mux}, - log: log, - metrics: metrics, - statusRegistry: statusRegistry, + redisClient: client, + scheduler: scheduler, + supervisor: supervisor, + httpServer: &http.Server{Addr: ":3000", Handler: mux}, + log: log, + metrics: metrics, + statusRegistry: statusRegistry, } mux.Handle("/auth/login", a.metrics.WrapHTTP("auth_login", http.HandlerFunc(a.login))) - mux.Handle("/auth/refresh", a.metrics.WrapHTTP("auth_refresh", http.HandlerFunc(a.refresh))) - mux.Handle("/jobs", a.metrics.WrapHTTP("jobs", http.HandlerFunc(a.enqueueJob))) - mux.Handle("/jobs/status", a.metrics.WrapHTTP("jobs_status", http.HandlerFunc(a.getJobStatus))) - mux.Handle("/supervisors/status", a.metrics.WrapHTTP("supervisors_status", http.HandlerFunc(a.getSupervisorStatus))) - mux.Handle("/supervisors/status/", a.metrics.WrapHTTP("supervisors_status_by_id", http.HandlerFunc(a.getSupervisorStatusByID))) - mux.Handle("/supervisors", a.metrics.WrapHTTP("supervisors", http.HandlerFunc(a.getAllSupervisors))) + mux.Handle("/auth/refresh", a.metrics.WrapHTTP("auth_refresh", http.HandlerFunc(a.refresh))) + mux.Handle("/jobs", a.metrics.WrapHTTP("jobs", http.HandlerFunc(a.enqueueJob))) + mux.Handle("/jobs/status", a.metrics.WrapHTTP("jobs_status", http.HandlerFunc(a.getJobStatus))) + mux.Handle("/supervisors/status", a.metrics.WrapHTTP("supervisors_status", http.HandlerFunc(a.getSupervisorStatus))) + mux.Handle("/supervisors/status/", a.metrics.WrapHTTP("supervisors_status_by_id", http.HandlerFunc(a.getSupervisorStatusByID))) + mux.Handle("/supervisors", a.metrics.WrapHTTP("supervisors", http.HandlerFunc(a.getAllSupervisors))) a.log.Info("new app initialized", "redis_address", redisAddr, "gpu_type", gpuType, "http_address", a.httpServer.Addr) diff --git a/src/int_test.go b/src/int_test.go index e17753a..fa8a82a 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "log/slog" "os" "os/signal" "sync" From 2eaf975af2cbc707be8de1052df76abb8d6e7ee0 
Mon Sep 17 00:00:00 2001 From: avii778 <142438557+avii778@users.noreply.github.com> Date: Mon, 3 Nov 2025 18:45:33 -0500 Subject: [PATCH 5/6] Json with relevant grafana queries --- grafana_queries.json | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 grafana_queries.json diff --git a/grafana_queries.json b/grafana_queries.json new file mode 100644 index 0000000..62fc22b --- /dev/null +++ b/grafana_queries.json @@ -0,0 +1,35 @@ +{ + "grafana_queries": { + "http_metrics": { + "http_requests_per_second": "sum by (job) (rate(http_requests_total{job=\"$job\"}[$__rate_interval]))", + "http_in_flight_requests": "sum by (handler) (http_in_flight_requests{job=\"$job\"})", + "http_failure_rate_percent": "100 * sum(rate(http_requests_total{job=\"$job\", code=~\"5..\"}[$__rate_interval])) / sum(rate(http_requests_total{job=\"$job\"}[$__rate_interval]))", + "http_latency_p50": "histogram_quantile(0.50, sum by (le) (rate(http_request_duration_seconds_bucket{job=\"$job\"}[$__rate_interval])))", + "http_latency_p99": "histogram_quantile(0.99, sum by (le) (rate(http_request_duration_seconds_bucket{job=\"$job\"}[$__rate_interval])))" + }, + + "system_metrics": { + "cpu_usage_cores": "rate(process_cpu_seconds_total{job=\"$job\"}[$__rate_interval])", + "memory_resident_bytes": "process_resident_memory_bytes{job=\"$job\"}", + "heap_alloc_bytes": "go_memstats_heap_alloc_bytes{job=\"$job\"}", + "goroutines": "go_goroutines{job=\"$job\"}", + "threads": "go_threads{job=\"$job\"}", + "gc_pause_seconds_avg": "rate(go_gc_duration_seconds_sum{job=\"$job\"}[$__rate_interval]) / rate(go_gc_duration_seconds_count{job=\"$job\"}[$__rate_interval])", + "gc_pause_seconds_p99": "histogram_quantile(0.99, sum by (le) (rate(go_gc_duration_seconds_bucket{job=\"$job\"}[$__rate_interval])))", + "process_uptime_seconds": "time() - process_start_time_seconds{job=\"$job\"}" + }, + + "disk_metrics": { + "disk_usage_bytes": "node_filesystem_size_bytes{fstype!=\"tmpfs\", job=\"$job\"} - node_filesystem_free_bytes{fstype!=\"tmpfs\", job=\"$job\"}", + "disk_free_bytes": "node_filesystem_free_bytes{fstype!=\"tmpfs\", job=\"$job\"}", + "disk_io_reads_per_second": "rate(node_disk_reads_completed_total{job=\"$job\"}[$__rate_interval])", + "disk_io_writes_per_second": "rate(node_disk_writes_completed_total{job=\"$job\"}[$__rate_interval])" + }, + + "health_and_status": { + "target_up": "up{job=\"$job\"}", + "scrape_duration_seconds": "prometheus_target_interval_length_seconds{quantile=\"0.99\"}", + "scrape_sample_rate": "sum by (job) (rate(prometheus_tsdb_head_samples_appended_total[$__rate_interval]))" + } + } +} From 6344c9577d5ca0800aebca64483a10409fb926e9 Mon Sep 17 00:00:00 2001 From: avii778 <142438557+avii778@users.noreply.github.com> Date: Mon, 5 Jan 2026 19:43:22 -0500 Subject: [PATCH 6/6] Added basic http and redis health-checking --- src/api.go | 16 +++++ src/health.go | 149 ++++++++++++++++++++++++++++++++++++++++++++++ src/prometheus.go | 13 ++++ 3 files changed, 178 insertions(+) create mode 100644 src/health.go diff --git a/src/api.go b/src/api.go index 75f9f38..5a4c526 100644 --- a/src/api.go +++ b/src/api.go @@ -27,6 +27,7 @@ type App struct { log *slog.Logger metrics *Metrics statusRegistry *StatusRegistry + health *HealthChecker } func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App { @@ -49,6 +50,7 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App log: log, metrics: metrics, statusRegistry: statusRegistry, + health: 
NewHealthChecker(client, log, 5*time.Second),
 	}
 
 	mux.Handle("/auth/login", a.metrics.WrapHTTP("auth_login", http.HandlerFunc(a.login)))
@@ -58,6 +60,19 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App
 	mux.Handle("/supervisors/status", a.metrics.WrapHTTP("supervisors_status", http.HandlerFunc(a.getSupervisorStatus)))
 	mux.Handle("/supervisors/status/", a.metrics.WrapHTTP("supervisors_status_by_id", http.HandlerFunc(a.getSupervisorStatusByID)))
 	mux.Handle("/supervisors", a.metrics.WrapHTTP("supervisors", http.HandlerFunc(a.getAllSupervisors)))
+	mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+
+	mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		ready, _ := a.health.Ready() // per-check results are not used here yet
+
+		if !ready {
+			w.WriteHeader(http.StatusServiceUnavailable)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+	}))
 
 	a.log.Info("new app initialized", "redis_address", redisAddr, "gpu_type", gpuType, "http_address", a.httpServer.Addr)
 
@@ -138,6 +153,7 @@ func main() {
 	defer stop()
 
 	go app.metrics.StartCollecting(ctx)
+	go app.health.Start(ctx)
 
 	<-ctx.Done()
 	log.Info("shutdown signal received")
diff --git a/src/health.go b/src/health.go
new file mode 100644
index 0000000..4821d2f
--- /dev/null
+++ b/src/health.go
@@ -0,0 +1,149 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+)
+
+type CheckResult struct {
+	OK       bool
+	Latency  time.Duration
+	LastErr  string
+	LastTime time.Time
+}
+
+type HealthChecker struct {
+	log      *slog.Logger
+	redis    *redis.Client
+	http     *http.Client
+	interval time.Duration
+
+	mu      sync.RWMutex
+	results map[string]CheckResult
+}
+
+func NewHealthChecker(redis *redis.Client, log *slog.Logger, interval time.Duration) *HealthChecker {
+	return &HealthChecker{
+		log:      log,
+		redis:    redis,
+		http:     &http.Client{},
+		interval: interval,
+		results:  make(map[string]CheckResult),
+	}
+}
+
+func (h *HealthChecker) Start(ctx context.Context) {
+
+	h.runAllChecksOnce(ctx)
+
+	ticker := time.NewTicker(h.interval)
+	go func() {
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				h.runAllChecksOnce(ctx)
+			}
+		}
+	}()
+}
+
+func (h *HealthChecker) Ready() (bool, map[string]CheckResult) {
+
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
+	// copy for race safety
+	out := make(map[string]CheckResult, len(h.results))
+	for k, v := range h.results {
+		out[k] = v
+	}
+
+	crit := []string{"redis", "self_http"}
+	for _, name := range crit {
+		r, ok := h.results[name]
+		if !ok || !r.OK {
+			return false, out
+		}
+	}
+	return true, out
+
+}
+
+func (h *HealthChecker) runAllChecksOnce(ctx context.Context) {
+	h.runCheck(ctx, "redis", 250*time.Millisecond, h.checkRedis)
+	h.runCheck(ctx, "self_http", 250*time.Millisecond, h.checkSelfHTTP)
+}
+
+func (h *HealthChecker) runCheck(parent context.Context, s string, timeout time.Duration, f func(context.Context) error) {
+
+	start := time.Now()
+	cctx, cancel := context.WithTimeout(parent, timeout)
+	err := f(cctx)
+	cancel()
+	lat := time.Since(start)
+
+	var errStr string
+
+	if err != nil {
+		errStr = err.Error()
+		h.log.Warn("health check failed", "check", s, "err", errStr)
+	}
+
+	h.mu.Lock()
+	h.results[s] = CheckResult{
+		OK:       err == nil,
+		Latency:  lat,
+		LastErr:  errStr,
+		LastTime: time.Now(),
+	}
+	h.mu.Unlock()
+
+	// send to prometheus
+	if err != nil {
+		healthCheckOK.WithLabelValues(s).Set(0)
+	} else {
+		healthCheckOK.WithLabelValues(s).Set(1)
+	}
+
+	healthCheckLatency.WithLabelValues(s).Set(lat.Seconds())
+}
+
+func (h *HealthChecker) checkRedis(c context.Context) error {
+
+	if h.redis == nil {
+		return errors.New("redis client is nil")
+	}
+
+	return h.redis.Ping(c).Err()
+}
+
+// probe an HTTP endpoint to detect a stale or wedged server
+func (h *HealthChecker) checkSelfHTTP(ctx context.Context) error {
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "some_http_endpoint_to_check", nil) // not sure what to put here yet, please provide feedback on what to check
+
+	if err != nil {
+		return err
+	}
+
+	resp, err := h.http.Do(req)
+	if err != nil {
+		return err
+	}
+
+	defer resp.Body.Close()
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return errors.New("self_http returned non-2xx")
+	}
+	return nil
+}
diff --git a/src/prometheus.go b/src/prometheus.go
index a5cffc7..7fc6bae 100644
--- a/src/prometheus.go
+++ b/src/prometheus.go
@@ -14,6 +14,19 @@ import (
 	meminfo "github.com/shirou/gopsutil/v3/mem"
 )
 
+// health check metrics
+var (
+	healthCheckOK = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "health_check_ok",
+		Help: "1 if the named health check is passing, else 0.",
+	}, []string{"check"})
+
+	healthCheckLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "health_check_latency_seconds",
+		Help: "Latency of the named health check in seconds.",
+	}, []string{"check"})
+)
+
 // http metrics
 var (
 	httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{
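
The metrics plumbing from PATCH 3 can be smoke-tested without Prometheus or Grafana running, since promauto registers every series on the default registry at init. The sketch below is illustrative only and is not part of the patches: the file name, the "ping" handler label, and the "example"/"AMD" job labels are assumptions; it relies solely on NewMetrics, WrapHTTP, and TrackJob as they appear in src/prometheus.go.

// metrics_example_test.go (hypothetical file name) - smoke test for the
// WrapHTTP and TrackJob helpers introduced in PATCH 3.
package main

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestMetricsWrappersSmoke(t *testing.T) {
	m := NewMetrics()

	// Requests through the wrapped handler add samples to http_requests_total
	// and http_request_duration_seconds with handler="ping", and track
	// http_in_flight_requests while the request is active.
	srv := httptest.NewServer(m.WrapHTTP("ping", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) },
	)))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		t.Fatalf("request failed: %v", err)
	}
	resp.Body.Close()

	// TrackJob increments jobs_started_total, keeps running_jobs up to date,
	// and records success or failure from the callback's error.
	if err := m.TrackJob(context.Background(), "example", "AMD", func(ctx context.Context) error {
		return nil
	}); err != nil {
		t.Fatalf("TrackJob returned error: %v", err)
	}
}

Scraping GET /metrics on :3000 afterwards (the target configured in prometheus.yml) should show the new series alongside the default Go and process collectors that the Grafana queries rely on.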
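For the health checking added in PATCH 6, a similar hedged sketch follows. It assumes a Redis instance on localhost:6379 (the address NewApp already uses) and only logs the outcome rather than asserting, because checkSelfHTTP still points at the "some_http_endpoint_to_check" placeholder; until that URL targets something real (the new /healthz on :3000 would be a natural choice), the self_http check fails and Ready() reports false.

// health_example_test.go (hypothetical file name) - smoke test for the
// HealthChecker introduced in PATCH 6. Requires a local Redis; the address
// and interval below are assumptions, not part of the patches.
package main

import (
	"context"
	"log/slog"
	"os"
	"testing"
	"time"

	"github.com/redis/go-redis/v9"
)

func TestHealthCheckerSmoke(t *testing.T) {
	log := slog.New(slog.NewTextHandler(os.Stderr, nil))
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	h := NewHealthChecker(client, log, time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	h.Start(ctx) // runs one pass immediately, then re-checks every interval

	// With the fix above, health_check_ok{check="redis"} is 1 when the ping
	// succeeds; self_http stays 0 until its target URL is made real.
	ready, results := h.Ready()
	t.Logf("ready=%v redis=%+v self_http=%+v", ready, results["redis"], results["self_http"])
}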