Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 149 additions & 11 deletions src/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger) *App {

mux.HandleFunc("/auth/login", a.login)
mux.HandleFunc("/auth/refresh", a.refresh)
mux.HandleFunc("/jobs", a.enqueueJob)
mux.HandleFunc("/jobs", a.handleJobs)
mux.HandleFunc("/jobs/status", a.getJobStatus)
mux.HandleFunc("/supervisors/status", a.getSupervisorStatus)
mux.HandleFunc("/supervisors/status/", a.getSupervisorStatusByID)
Expand Down Expand Up @@ -167,25 +167,163 @@ func (a *App) refresh(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, world!\n")
}

func (a *App) enqueueJob(w http.ResponseWriter, r *http.Request) {
a.log.Info("enqueueJob handler accessed", "remote_address", r.RemoteAddr)
payload := map[string]interface{}{
"task_id": 123,
"data": "test_data_123",
// CreateJobRequest is the JSON request body that createJob decodes when a
// job is submitted.
type CreateJobRequest struct {
	// Type names the kind of job; createJob rejects requests where it is empty.
	Type string `json:"type"`
	// Payload holds arbitrary job-specific data as a decoded JSON object.
	Payload map[string]any `json:"payload"`
	// RequiredGPU travels under the "gpu" key and is left out of encoded
	// output when empty.
	RequiredGPU string `json:"gpu,omitempty"`
}

// CreateJobResponse is the JSON body createJob writes back after a job has
// been created.
type CreateJobResponse struct {
// JobID is the identifier of the newly created job, encoded as "job_id".
JobID string `json:"job_id"`
}

// handleJobs dispatches requests on the /jobs route by HTTP method.
//
// POST is forwarded to createJob. Every other method is rejected with
// 405 Method Not Allowed, and the response carries an Allow header naming
// the supported method, as RFC 9110 requires for 405 responses.
func (a *App) handleJobs(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodPost:
		a.createJob(w, r)
	default:
		// Advertise what the route accepts so clients can correct the request.
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

func (a *App) createJob(w http.ResponseWriter, r *http.Request) {

a.log.Info("createJob handler accessed", "remote_address", r.RemoteAddr)

var req CreateJobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
a.log.Error("failed to decode request body", "err", err)
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

if req.Type == "" {
http.Error(w, "Job type is required", http.StatusBadRequest)
return
}
if err := a.scheduler.Enqueue("jobType", "gpuType", payload); err != nil {
a.log.Error("enqueue failed", "err", err, "payload", payload)
http.Error(w, "enqueue failed", http.StatusInternalServerError)
return
}
a.log.Info("job enqueued", "payload", payload)
w.WriteHeader(http.StatusAccepted)
fmt.Fprint(w, "enqueued")

a.log.Info("job created", "job_id", jobID, "type", req.Type, "gpu", req.RequiredGPU)

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)

response := CreateJobResponse{JobID: jobID}
if err := json.NewEncoder(w).Encode(response); err != nil {
a.log.Error("failed to encode response", "err", err)
}
}

func (a *App) getJobStatus(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
fmt.Fprintln(w, "job id=", id)
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

jobID := r.URL.Query().Get("id")
if jobID == "" {
// Try to get from path if query param not provided
path := strings.TrimPrefix(r.URL.Path, "/jobs/status/")
if path != "" && path != "/jobs/status" {
jobID = path
}
}

if jobID == "" {
http.Error(w, "Job ID is required", http.StatusBadRequest)
return
}

a.log.Info("getJobStatus handler accessed", "job_id", jobID, "remote_address", r.RemoteAddr)

job, err := a.statusRegistry.GetJobStatus(jobID)
if err != nil {
a.log.Error("failed to get job status", "job_id", jobID, "error", err)
http.Error(w, fmt.Sprintf("Job not found: %s", jobID), http.StatusNotFound)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(job); err != nil {
a.log.Error("failed to encode job status response", "error", err)
http.Error(w, "Failed to encode response", http.StatusInternalServerError)
return
}
}

// getSupervisorStatus writes a JSON object containing every supervisor the
// status registry returns ("supervisors") and how many there are ("count").
// A registry failure or an encoding failure is logged and answered with
// 500 Internal Server Error.
func (a *App) getSupervisorStatus(w http.ResponseWriter, r *http.Request) {
	all, err := a.statusRegistry.GetAllSupervisors()
	if err != nil {
		a.log.Error("failed to get supervisor status", "error", err)
		http.Error(w, "failed to get supervisor status", http.StatusInternalServerError)
		return
	}

	body := map[string]interface{}{
		"supervisors": all,
		"count":       len(all),
	}

	w.Header().Set("Content-Type", "application/json")
	encErr := json.NewEncoder(w).Encode(body)
	if encErr == nil {
		return
	}

	a.log.Error("failed to encode supervisor status response", "error", encErr)
	http.Error(w, "failed to encode response", http.StatusInternalServerError)
}

// getSupervisorStatusByID looks up a single supervisor and writes it as JSON.
//
// The consumer ID is whatever follows "/supervisors/status/" in the request
// path. An empty ID yields 400, a registry lookup error yields 404, and an
// encoding failure yields 500.
func (a *App) getSupervisorStatusByID(w http.ResponseWriter, r *http.Request) {
	const routePrefix = "/supervisors/status/"

	// The consumer ID is the remainder of the path after the route prefix.
	consumerID := strings.TrimPrefix(r.URL.Path, routePrefix)
	if len(consumerID) == 0 {
		http.Error(w, "consumer ID required", http.StatusBadRequest)
		return
	}

	found, lookupErr := a.statusRegistry.GetSupervisor(consumerID)
	if lookupErr != nil {
		a.log.Error("failed to get supervisor status", "consumer_id", consumerID, "error", lookupErr)
		http.Error(w, "supervisor not found", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	encErr := json.NewEncoder(w).Encode(found)
	if encErr == nil {
		return
	}

	a.log.Error("failed to encode supervisor status response", "error", encErr)
	http.Error(w, "failed to encode response", http.StatusInternalServerError)
}

// getAllSupervisors writes a JSON listing of supervisors from the status
// registry. When the "active" query parameter is exactly "true", only the
// supervisors returned by GetActiveSupervisors are listed; otherwise the
// result of GetAllSupervisors is used. The response also reports the count
// and echoes the filter as "active_only". Registry and encoding failures are
// logged and answered with 500 Internal Server Error.
func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) {
	onlyActive := r.URL.Query().Get("active") == "true"

	var (
		list []SupervisorStatus
		err  error
	)
	if !onlyActive {
		list, err = a.statusRegistry.GetAllSupervisors()
	} else {
		list, err = a.statusRegistry.GetActiveSupervisors()
	}
	if err != nil {
		a.log.Error("failed to get supervisors", "active_only", onlyActive, "error", err)
		http.Error(w, "failed to get supervisors", http.StatusInternalServerError)
		return
	}

	body := map[string]interface{}{
		"supervisors": list,
		"count":       len(list),
		"active_only": onlyActive,
	}

	w.Header().Set("Content-Type", "application/json")
	encErr := json.NewEncoder(w).Encode(body)
	if encErr == nil {
		return
	}

	a.log.Error("failed to encode supervisors response", "error", encErr)
	http.Error(w, "failed to encode response", http.StatusInternalServerError)
}

func (a *App) getSupervisorStatus(w http.ResponseWriter, r *http.Request) {
Expand Down
105 changes: 105 additions & 0 deletions src/images/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,108 @@ sh run_tests.sh

### nvidia-smi: command not found
- Install the [NVIDIA Container Toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html#installing-the-nvidia-container-toolkit)

# mist/images Package Documentation

## Overview

The `mist/images` package provides a `ContainerMgr` struct and related methods for managing Docker containers and volumes programmatically using the Docker Go SDK. It enforces limits on the number of containers and volumes, and provides safe creation, deletion, and lifecycle management.

---

## Main APIs

### `type ContainerMgr struct`
Manages Docker containers and volumes, enforces resource limits, and tracks active resources.

**Fields:**
- `ctx context.Context` — Context for Docker operations.
- `cli *client.Client` — Docker client.
- `containerLimit int` — Maximum allowed containers.
- `volumeLimit int` — Maximum allowed volumes.
- `containers map[string]struct{}` — Tracks active container IDs.
- `volumes map[string]struct{}` — Tracks active volume names.
- `mu sync.Mutex` — Mutex for concurrency protection.

---

### `func NewContainerMgr(client *client.Client, containerLimit, volumeLimit int) *ContainerMgr`
Creates a new `ContainerMgr` with the specified Docker client and resource limits.

---

### `func (mgr *ContainerMgr) createVolume(volumeName string) (volume.Volume, error)`
Creates a Docker volume with the given name, enforcing the volume limit.
Returns the created volume or an error.

---

### `func (mgr *ContainerMgr) removeVolume(volumeName string, force bool) error`
Removes a Docker volume by name.
Returns an error if the volume does not exist or is in use (unless `force` is true).

---

### `func (mgr *ContainerMgr) runContainer(imageName string, runtimeName string, volumeName string) (string, error)`
Creates and starts a container with the specified image, runtime, and volume attached at `/data`.
Enforces the container limit.
Returns the container ID or an error.

---

### `func (mgr *ContainerMgr) stopContainer(containerID string) error`
Stops a running container by ID.
Returns an error if the operation fails.

---

### `func (mgr *ContainerMgr) removeContainer(containerID string) error`
Removes a container by ID and deletes it from the internal tracking map.
Returns an error if the operation fails.

---

## Test Plan

The test suite (`serve_image_test.go`) covers the following scenarios:

- Create a volume, check it exists, delete it, check it no longer exists.
- Create a volume with the same name twice (should not fail).
- Remove a volume that doesn't exist (should fail or return error).
- Remove a volume in use (should fail or return error).
- Attach a volume that does not exist (should fail or return error).
- Two containers attach to the same volume (should succeed in Docker, but test for your policy).
- Two containers try to attach to the same volume at the same time (should succeed in Docker).
- Set a limit of 100 volumes (should fail on 101st if you enforce a limit).
- Set a limit of 10 containers (should fail on 11th if you enforce a limit).

---

## Example Usage

```go
cli, _ := client.NewClientWithOpts(client.FromEnv)
mgr := NewContainerMgr(cli, 10, 100)

vol, err := mgr.createVolume("myvol")
if err != nil { /* handle error */ }

cid, err := mgr.runContainerCuda("myvol")
if err != nil { /* handle error */ }

_ = mgr.stopContainer(cid)
_ = mgr.removeContainer(cid)
_ = mgr.removeVolume("myvol", true)
```

---

## Notes

- All resource-creating methods enforce limits and track resources in maps for accurate management.
- All destructive operations (`removeVolume`, `removeContainer`) return errors for non-existent or in-use resources.
- The package is designed for integration with Docker and expects a running Docker daemon.

---

**For more details, see the source code and comments in `serve_image.go`.**
35 changes: 35 additions & 0 deletions src/images/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module mist/images

go 1.24.6

require github.com/docker/docker v28.2.2+incompatible

require (
github.com/Microsoft/go-winio v0.4.21 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/moby/term v0.5.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/time v0.12.0 // indirect
gotest.tools/v3 v3.5.2 // indirect
)
Loading