From b5c3d7b707d69c1bc3bf12a892226b64c626cda3 Mon Sep 17 00:00:00 2001 From: Raihaan Sandhu <63803872+GaminRick7@users.noreply.github.com> Date: Sun, 21 Sep 2025 23:44:36 -0400 Subject: [PATCH 1/5] Implemented Status Table --- src/api.go | 112 ++++++++++++++++++++++++++++++++++++---- src/status.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++ src/supervisor.go | 6 +++ src/util.go | 20 ++++++-- 4 files changed, 251 insertions(+), 15 deletions(-) create mode 100644 src/status.go diff --git a/src/api.go b/src/api.go index f5d71ba..0a751ac 100644 --- a/src/api.go +++ b/src/api.go @@ -2,12 +2,14 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "log/slog" "net/http" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -16,34 +18,43 @@ import ( ) type App struct { - redisClient *redis.Client - scheduler *Scheduler - supervisor *Supervisor - httpServer *http.Server - wg sync.WaitGroup - log *slog.Logger + redisClient *redis.Client + scheduler *Scheduler + supervisor *Supervisor + httpServer *http.Server + wg sync.WaitGroup + log *slog.Logger + statusRegistry *StatusRegistry } func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { client := redis.NewClient(&redis.Options{Addr: redisAddr}) scheduler := NewScheduler(redisAddr, log) + statusRegistry := NewStatusRegistry(client, log) consumerID := fmt.Sprintf("worker_%d", os.Getpid()) supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log) + // Add dummy supervisors for testing + addDummySupervisors(statusRegistry, log) + mux := http.NewServeMux() a := &App{ - redisClient: client, - scheduler: scheduler, - supervisor: supervisor, - httpServer: &http.Server{Addr: ":3000", Handler: mux}, - log: log, + redisClient: client, + scheduler: scheduler, + supervisor: supervisor, + httpServer: &http.Server{Addr: ":3000", Handler: mux}, + log: log, + statusRegistry: statusRegistry, } mux.HandleFunc("/auth/login", a.login) mux.HandleFunc("/auth/refresh", a.refresh) mux.HandleFunc("/jobs", a.enqueueJob) mux.HandleFunc("/jobs/status", a.getJobStatus) + mux.HandleFunc("/supervisors/status", a.getSupervisorStatus) + mux.HandleFunc("/supervisors/status/", a.getSupervisorStatusByID) + mux.HandleFunc("/supervisors", a.getAllSupervisors) a.log.Info("new app initialized", "redis_address", redisAddr, "gpu_type", gpuType, "http_address", a.httpServer.Addr) @@ -58,6 +69,12 @@ func (a *App) Start() error { return err } + // Start supervisor + if err := a.supervisor.Start(); err != nil { + a.log.Error("supervisor start failed", "err", err) + return err + } + // Launch HTTP server a.wg.Add(1) go func() { @@ -164,3 +181,76 @@ func (a *App) getJobStatus(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("id") fmt.Fprintln(w, "job id=", id) } + +func (a *App) getSupervisorStatus(w http.ResponseWriter, r *http.Request) { + supervisors, err := a.statusRegistry.GetAllSupervisors() + if err != nil { + a.log.Error("failed to get supervisor status", "error", err) + http.Error(w, "failed to get supervisor status", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]interface{}{ + "supervisors": supervisors, + "count": len(supervisors), + }); err != nil { + a.log.Error("failed to encode supervisor status response", "error", err) + http.Error(w, "failed to encode response", http.StatusInternalServerError) + return + } +} + +func (a *App) getSupervisorStatusByID(w http.ResponseWriter, r *http.Request) { + // extract consumer ID from URL path + path := strings.TrimPrefix(r.URL.Path, "/supervisors/status/") + if path == "" { + http.Error(w, "consumer ID required", http.StatusBadRequest) + return + } + + supervisor, err := a.statusRegistry.GetSupervisor(path) + if err != nil { + a.log.Error("failed to get supervisor status", "consumer_id", path, "error", err) + http.Error(w, "supervisor not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(supervisor); err != nil { + a.log.Error("failed to encode supervisor status response", "error", err) + http.Error(w, "failed to encode response", http.StatusInternalServerError) + return + } +} + +func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) { + // check if we want only active supervisors + activeOnly := r.URL.Query().Get("active") == "true" + + var supervisors []SupervisorStatus + var err error + + if activeOnly { + supervisors, err = a.statusRegistry.GetActiveSupervisors() + } else { + supervisors, err = a.statusRegistry.GetAllSupervisors() + } + + if err != nil { + a.log.Error("failed to get supervisors", "active_only", activeOnly, "error", err) + http.Error(w, "failed to get supervisors", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]interface{}{ + "supervisors": supervisors, + "count": len(supervisors), + "active_only": activeOnly, + }); err != nil { + a.log.Error("failed to encode supervisors response", "error", err) + http.Error(w, "failed to encode response", http.StatusInternalServerError) + return + } +} diff --git a/src/status.go b/src/status.go new file mode 100644 index 0000000..09179fa --- /dev/null +++ b/src/status.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/redis/go-redis/v9" +) + +type StatusRegistry struct { + redisClient *redis.Client + log *slog.Logger +} + +func NewStatusRegistry(redisClient *redis.Client, log *slog.Logger) *StatusRegistry { + return &StatusRegistry{ + redisClient: redisClient, + log: log, + } +} + +func addDummySupervisors(statusRegistry *StatusRegistry, log *slog.Logger) { + now := time.Now() + + // three dummy supervisors with different statuses + dummySupervisors := []SupervisorStatus{ + { + ConsumerID: "worker_amd_001", + GPUType: "AMD", + Status: "active", + LastSeen: now, // now + StartedAt: now.Add(-2 * time.Hour), // 2hours ago + }, + { + ConsumerID: "worker_nvidia_002", + GPUType: "NVIDIA", + Status: "active", + LastSeen: now.Add(-30 * time.Second), // 30 seconds ago + StartedAt: now.Add(-1 * time.Hour), // 1 hour ago + }, + { + ConsumerID: "worker_tt_003", + GPUType: "TT", + Status: "inactive", + LastSeen: now.Add(-5 * time.Minute), // seen 5 minutes ago + StartedAt: now.Add(-3 * time.Hour), // 3 hours ago + }, + } + + // Add each dummy supervisor to the registry + for _, supervisor := range dummySupervisors { + if err := statusRegistry.UpdateStatus(supervisor.ConsumerID, supervisor); err != nil { + log.Error("failed to add dummy supervisor", "consumer_id", supervisor.ConsumerID, "error", err) + } else { + log.Info("added dummy supervisor", "consumer_id", supervisor.ConsumerID, "gpu_type", supervisor.GPUType, "status", supervisor.Status) + } + } +} + +func (sr *StatusRegistry) GetAllSupervisors() ([]SupervisorStatus, error) { + ctx := context.Background() + result := sr.redisClient.HGetAll(ctx, SupervisorStatusKey) + if result.Err() != nil { + return nil, fmt.Errorf("failed to get supervisor status: %w", result.Err()) + } + + var supervisors []SupervisorStatus + for consumerID, statusJSON := range result.Val() { + var status SupervisorStatus + if err := json.Unmarshal([]byte(statusJSON), &status); err != nil { + sr.log.Error("failed to unmarshal supervisor status", "consumer_id", consumerID, "error", err) + continue + } + supervisors = append(supervisors, status) + } + + return supervisors, nil +} + +func (sr *StatusRegistry) GetSupervisor(consumerID string) (*SupervisorStatus, error) { + ctx := context.Background() + result := sr.redisClient.HGet(ctx, SupervisorStatusKey, consumerID) + if result.Err() != nil { + return nil, fmt.Errorf("failed to get supervisor status: %w", result.Err()) + } + + var status SupervisorStatus + if err := json.Unmarshal([]byte(result.Val()), &status); err != nil { + return nil, fmt.Errorf("failed to unmarshal supervisor status: %w", err) + } + + return &status, nil +} + +func (sr *StatusRegistry) GetActiveSupervisors() ([]SupervisorStatus, error) { + allSupervisors, err := sr.GetAllSupervisors() + if err != nil { + return nil, err + } + + var activeSupervisors []SupervisorStatus + for _, supervisor := range allSupervisors { + if supervisor.Status == "active" { + activeSupervisors = append(activeSupervisors, supervisor) + } + } + + return activeSupervisors, nil +} + +func (sr *StatusRegistry) UpdateStatus(consumerID string, status SupervisorStatus) error { + ctx := context.Background() + statusJSON, err := json.Marshal(status) + if err != nil { + return fmt.Errorf("failed to marshal supervisor status: %w", err) + } + + result := sr.redisClient.HSet(ctx, SupervisorStatusKey, consumerID, string(statusJSON)) + if result.Err() != nil { + return fmt.Errorf("failed to update supervisor status: %w", result.Err()) + } + + sr.log.Info("supervisor status updated", "consumer_id", consumerID, "status", status.Status) + return nil +} diff --git a/src/supervisor.go b/src/supervisor.go index 5c5756b..09b0597 100644 --- a/src/supervisor.go +++ b/src/supervisor.go @@ -150,6 +150,12 @@ func (s *Supervisor) canHandleJob(job Job) bool { // TODO: Actually schedule a container here func (s *Supervisor) processJob(job Job) bool { + s.log.Info("starting job processing", "job_id", job.ID, "payload", job.Payload) + + // Simulate processing time + time.Sleep(100 * time.Millisecond) + + s.log.Info("job processing completed", "job_id", job.ID) return true } diff --git a/src/util.go b/src/util.go index 9a2e05c..0e52475 100644 --- a/src/util.go +++ b/src/util.go @@ -7,10 +7,14 @@ import ( ) const ( - StreamName = "jobs:stream" - ConsumerGroup = "workers" - MaxRetries = 3 - RetryDelay = 5 * time.Second + StreamName = "jobs:stream" + ConsumerGroup = "workers" + MaxRetries = 3 + RetryDelay = 5 * time.Second + SupervisorRegistry = "supervisors:registry" + SupervisorStatusKey = "supervisors:status" + HeartbeatInterval = 10 * time.Second + HeartbeatTimeout = 30 * time.Second ) type Job struct { @@ -22,6 +26,14 @@ type Job struct { RequiredGPU string `json:"gpu"` } +type SupervisorStatus struct { + ConsumerID string `json:"consumer_id"` + GPUType string `json:"gpu_type"` + Status string `json:"status"` // "active", "inactive", "failed" + LastSeen time.Time `json:"last_seen"` + StartedAt time.Time `json:"started_at"` +} + func generateJobID() string { return fmt.Sprintf("job_%d_%d", time.Now().UnixNano(), os.Getpid()) } From b887bff191d30bb319970a536ac8dd18408fad3d Mon Sep 17 00:00:00 2001 From: Raihaan Sandhu <63803872+GaminRick7@users.noreply.github.com> Date: Sun, 21 Sep 2025 23:57:42 -0400 Subject: [PATCH 2/5] rolled back unwanted util and supervisor changes --- src/supervisor.go | 6 ------ src/util.go | 12 ++++-------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/supervisor.go b/src/supervisor.go index 09b0597..5c5756b 100644 --- a/src/supervisor.go +++ b/src/supervisor.go @@ -150,12 +150,6 @@ func (s *Supervisor) canHandleJob(job Job) bool { // TODO: Actually schedule a container here func (s *Supervisor) processJob(job Job) bool { - s.log.Info("starting job processing", "job_id", job.ID, "payload", job.Payload) - - // Simulate processing time - time.Sleep(100 * time.Millisecond) - - s.log.Info("job processing completed", "job_id", job.ID) return true } diff --git a/src/util.go b/src/util.go index 0e52475..e2cd3c0 100644 --- a/src/util.go +++ b/src/util.go @@ -7,14 +7,10 @@ import ( ) const ( - StreamName = "jobs:stream" - ConsumerGroup = "workers" - MaxRetries = 3 - RetryDelay = 5 * time.Second - SupervisorRegistry = "supervisors:registry" - SupervisorStatusKey = "supervisors:status" - HeartbeatInterval = 10 * time.Second - HeartbeatTimeout = 30 * time.Second + StreamName = "jobs:stream" + ConsumerGroup = "workers" + MaxRetries = 3 + RetryDelay = 5 * time.Second ) type Job struct { From e35a6ab551290053315ce32af1b2cfe5c4a49b8b Mon Sep 17 00:00:00 2001 From: Raihaan Sandhu <63803872+GaminRick7@users.noreply.github.com> Date: Mon, 29 Sep 2025 15:00:12 -0400 Subject: [PATCH 3/5] fix: change SupervisorStatusType to enum --- src/api.go | 1 - src/status.go | 8 ++++---- src/util.go | 27 ++++++++++++++++++--------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/api.go b/src/api.go index 0a751ac..70c058b 100644 --- a/src/api.go +++ b/src/api.go @@ -225,7 +225,6 @@ func (a *App) getSupervisorStatusByID(w http.ResponseWriter, r *http.Request) { } func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) { - // check if we want only active supervisors activeOnly := r.URL.Query().Get("active") == "true" var supervisors []SupervisorStatus diff --git a/src/status.go b/src/status.go index 09179fa..621a78f 100644 --- a/src/status.go +++ b/src/status.go @@ -30,21 +30,21 @@ func addDummySupervisors(statusRegistry *StatusRegistry, log *slog.Logger) { { ConsumerID: "worker_amd_001", GPUType: "AMD", - Status: "active", + Status: SupervisorStateActive, LastSeen: now, // now StartedAt: now.Add(-2 * time.Hour), // 2hours ago }, { ConsumerID: "worker_nvidia_002", GPUType: "NVIDIA", - Status: "active", + Status: SupervisorStateActive, LastSeen: now.Add(-30 * time.Second), // 30 seconds ago StartedAt: now.Add(-1 * time.Hour), // 1 hour ago }, { ConsumerID: "worker_tt_003", GPUType: "TT", - Status: "inactive", + Status: SupervisorStateInactive, LastSeen: now.Add(-5 * time.Minute), // seen 5 minutes ago StartedAt: now.Add(-3 * time.Hour), // 3 hours ago }, @@ -103,7 +103,7 @@ func (sr *StatusRegistry) GetActiveSupervisors() ([]SupervisorStatus, error) { var activeSupervisors []SupervisorStatus for _, supervisor := range allSupervisors { - if supervisor.Status == "active" { + if supervisor.Status == SupervisorStateActive { activeSupervisors = append(activeSupervisors, supervisor) } } diff --git a/src/util.go b/src/util.go index e2cd3c0..f6ee922 100644 --- a/src/util.go +++ b/src/util.go @@ -7,10 +7,11 @@ import ( ) const ( - StreamName = "jobs:stream" - ConsumerGroup = "workers" - MaxRetries = 3 - RetryDelay = 5 * time.Second + StreamName = "jobs:stream" + ConsumerGroup = "workers" + SupervisorStatusKey = "supervisors:status" + MaxRetries = 3 + RetryDelay = 5 * time.Second ) type Job struct { @@ -22,12 +23,20 @@ type Job struct { RequiredGPU string `json:"gpu"` } +type SupervisorState string + +const ( + SupervisorStateActive SupervisorState = "active" + SupervisorStateInactive SupervisorState = "inactive" + SupervisorStateFailed SupervisorState = "failed" +) + type SupervisorStatus struct { - ConsumerID string `json:"consumer_id"` - GPUType string `json:"gpu_type"` - Status string `json:"status"` // "active", "inactive", "failed" - LastSeen time.Time `json:"last_seen"` - StartedAt time.Time `json:"started_at"` + ConsumerID string `json:"consumer_id"` + GPUType string `json:"gpu_type"` + Status SupervisorState `json:"status"` + LastSeen time.Time `json:"last_seen"` + StartedAt time.Time `json:"started_at"` } func generateJobID() string { From 1c80ec4048f9349b9d94fcf7c4d4037335a32cfa Mon Sep 17 00:00:00 2001 From: Raihaan Sandhu <63803872+GaminRick7@users.noreply.github.com> Date: Mon, 29 Sep 2025 16:18:39 -0400 Subject: [PATCH 4/5] fix: add dummy supervisor test and status table tests --- src/int_test.go | 138 ++++++++++++++++++++++++++++++++++++++++++++++++ src/status.go | 39 -------------- 2 files changed, 138 insertions(+), 39 deletions(-) diff --git a/src/int_test.go b/src/int_test.go index ff35165..7daf89b 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -9,11 +9,51 @@ import ( "sync" "syscall" "testing" + "time" "github.com/redis/go-redis/v9" ) +func addDummySupervisors(statusRegistry *StatusRegistry, log *slog.Logger) { + now := time.Now() + + dummySupervisors := []SupervisorStatus{ + { + ConsumerID: "worker_amd_001", + GPUType: "AMD", + Status: SupervisorStateActive, + LastSeen: now, // now + StartedAt: now.Add(-2 * time.Hour), // 2hours ago + }, + { + ConsumerID: "worker_nvidia_002", + GPUType: "NVIDIA", + Status: SupervisorStateActive, + LastSeen: now.Add(-30 * time.Second), // 30 seconds ago + StartedAt: now.Add(-1 * time.Hour), // 1 hour ago + }, + { + ConsumerID: "worker_tt_003", + GPUType: "TT", + Status: SupervisorStateInactive, + LastSeen: now.Add(-5 * time.Minute), // seen 5 minutes ago + StartedAt: now.Add(-3 * time.Hour), // 3 hours ago + }, + } + + for _, supervisor := range dummySupervisors { + if err := statusRegistry.UpdateStatus(supervisor.ConsumerID, supervisor); err != nil { + log.Error("failed to add dummy supervisor", "consumer_id", supervisor.ConsumerID, "error", err) + } else { + log.Info("added dummy supervisor", "consumer_id", supervisor.ConsumerID, "gpu_type", supervisor.GPUType, "status", supervisor.Status) + } + } +} + func TestIntegration(t *testing.T) { + os.Setenv("ENV", "test") + defer os.Unsetenv("ENV") + redisAddr := "localhost:6379" log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) @@ -58,3 +98,101 @@ func TestIntegration(t *testing.T) { wg.Wait() supervisor.Stop() } + +func TestDummySupervisors(t *testing.T) { + redisAddr := "localhost:6379" + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + + // Test 1: Dummy supervisors should be added in test environment + os.Setenv("ENV", "test") + defer os.Unsetenv("ENV") + + // Clean up Redis data before test + client := redis.NewClient(&redis.Options{Addr: redisAddr}) + defer client.Close() + client.FlushDB(context.Background()) + + app := NewApp(redisAddr, "AMD", log) + defer app.redisClient.Close() + + // Manually add dummy supervisors for testing + addDummySupervisors(app.statusRegistry, log) + + supervisors, err := app.statusRegistry.GetAllSupervisors() + if err != nil { + t.Errorf("Failed to get supervisors: %v", err) + } + // Verify dummy supervisor IDs exist + dummyIDs := []string{"worker_amd_001", "worker_nvidia_002", "worker_tt_003"} + for _, dummyID := range dummyIDs { + found := false + for _, supervisor := range supervisors { + if supervisor.ConsumerID == dummyID { + found = true + break + } + } + if !found { + t.Errorf("Expected dummy supervisor %s not found", dummyID) + } + } +} + +// Unit tests for StatusRegistry +func TestStatusRegistry_BasicOperations(t *testing.T) { + redisAddr := "localhost:6379" + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + + client := redis.NewClient(&redis.Options{Addr: redisAddr}) + defer client.Close() + client.FlushDB(context.Background()) + + registry := NewStatusRegistry(client, log) + + now := time.Now() + + // Test adding and retrieving a supervisor + status := SupervisorStatus{ + ConsumerID: "test_worker_001", + GPUType: "AMD", + Status: SupervisorStateActive, + LastSeen: now, + StartedAt: now.Add(-1 * time.Hour), + } + + // Add status + err := registry.UpdateStatus(status.ConsumerID, status) + if err != nil { + t.Errorf("UpdateStatus failed: %v", err) + } + + // Retrieve status + retrievedStatus, err := registry.GetSupervisor(status.ConsumerID) + if err != nil { + t.Errorf("GetSupervisor failed: %v", err) + } + + if retrievedStatus.Status != status.Status { + t.Errorf("Expected Status %s, got %s", status.Status, retrievedStatus.Status) + } + + // Test getting all supervisors + allSupervisors, err := registry.GetAllSupervisors() + if err != nil { + t.Errorf("GetAllSupervisors failed: %v", err) + } + + if len(allSupervisors) != 1 { + t.Errorf("Expected 1 supervisor, got %d", len(allSupervisors)) + } + + // Test getting active supervisors + activeSupervisors, err := registry.GetActiveSupervisors() + if err != nil { + t.Errorf("GetActiveSupervisors failed: %v", err) + } + + if len(activeSupervisors) != 1 { + t.Errorf("Expected 1 active supervisor, got %d", len(activeSupervisors)) + } +} diff --git a/src/status.go b/src/status.go index 621a78f..6b20f3e 100644 --- a/src/status.go +++ b/src/status.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log/slog" - "time" "github.com/redis/go-redis/v9" ) @@ -22,44 +21,6 @@ func NewStatusRegistry(redisClient *redis.Client, log *slog.Logger) *StatusRegis } } -func addDummySupervisors(statusRegistry *StatusRegistry, log *slog.Logger) { - now := time.Now() - - // three dummy supervisors with different statuses - dummySupervisors := []SupervisorStatus{ - { - ConsumerID: "worker_amd_001", - GPUType: "AMD", - Status: SupervisorStateActive, - LastSeen: now, // now - StartedAt: now.Add(-2 * time.Hour), // 2hours ago - }, - { - ConsumerID: "worker_nvidia_002", - GPUType: "NVIDIA", - Status: SupervisorStateActive, - LastSeen: now.Add(-30 * time.Second), // 30 seconds ago - StartedAt: now.Add(-1 * time.Hour), // 1 hour ago - }, - { - ConsumerID: "worker_tt_003", - GPUType: "TT", - Status: SupervisorStateInactive, - LastSeen: now.Add(-5 * time.Minute), // seen 5 minutes ago - StartedAt: now.Add(-3 * time.Hour), // 3 hours ago - }, - } - - // Add each dummy supervisor to the registry - for _, supervisor := range dummySupervisors { - if err := statusRegistry.UpdateStatus(supervisor.ConsumerID, supervisor); err != nil { - log.Error("failed to add dummy supervisor", "consumer_id", supervisor.ConsumerID, "error", err) - } else { - log.Info("added dummy supervisor", "consumer_id", supervisor.ConsumerID, "gpu_type", supervisor.GPUType, "status", supervisor.Status) - } - } -} - func (sr *StatusRegistry) GetAllSupervisors() ([]SupervisorStatus, error) { ctx := context.Background() result := sr.redisClient.HGetAll(ctx, SupervisorStatusKey) From 10ec784e8f01dc967c01de2f4eed83628809efbc Mon Sep 17 00:00:00 2001 From: Raihaan Sandhu <63803872+GaminRick7@users.noreply.github.com> Date: Mon, 29 Sep 2025 16:29:06 -0400 Subject: [PATCH 5/5] fix: remove addDummySupervisors from prod --- src/api.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/api.go b/src/api.go index 70c058b..b7b4ba9 100644 --- a/src/api.go +++ b/src/api.go @@ -35,9 +35,6 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { consumerID := fmt.Sprintf("worker_%d", os.Getpid()) supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log) - // Add dummy supervisors for testing - addDummySupervisors(statusRegistry, log) - mux := http.NewServeMux() a := &App{ redisClient: client,