From d3057fad38348ef3d65cd214bc269b88aad05009 Mon Sep 17 00:00:00 2001 From: Eric Garcia Date: Mon, 23 Feb 2026 16:13:31 -0600 Subject: [PATCH] add slurm version detection --- internal/slurm/detect.go | 61 ++++++++++++ internal/slurm/slurm.go | 159 +++++++++++++++--------------- internal/slurm/v23_02.go | 154 +++++++++++++++++++++++++++++ internal/slurm/v23_11.go | 190 ++++++++++++++++++++++++++++++++++++ internal/tracker/tracker.go | 2 +- 5 files changed, 483 insertions(+), 83 deletions(-) create mode 100644 internal/slurm/detect.go create mode 100644 internal/slurm/v23_02.go create mode 100644 internal/slurm/v23_11.go diff --git a/internal/slurm/detect.go b/internal/slurm/detect.go new file mode 100644 index 0000000..1cf4e5d --- /dev/null +++ b/internal/slurm/detect.go @@ -0,0 +1,61 @@ +package slurm + +import ( + "fmt" + "os/exec" + "strconv" + "strings" +) + +type slurmVersion struct { + Major int + Minor int + Micro int +} + +// detectVersion runs `sinfo --version` and parses the Slurm version string. +// The output format is: "slurm 23.11.9" +func detectVersion() (slurmVersion, error) { + cmd := exec.Command("sinfo", "--version") + out, err := cmd.Output() + if err != nil { + return slurmVersion{}, fmt.Errorf("sinfo --version failed: %w", err) + } + + line := strings.TrimSpace(string(out)) + // Expected format: "slurm 23.11.9" + parts := strings.Fields(line) + if len(parts) < 2 { + return slurmVersion{}, fmt.Errorf("unexpected sinfo --version output: %q", line) + } + + return parseVersionString(parts[1]) +} + +// parseVersionString parses a version string like "23.11.9" into a slurmVersion. +func parseVersionString(s string) (slurmVersion, error) { + parts := strings.SplitN(s, ".", 3) + if len(parts) < 2 { + return slurmVersion{}, fmt.Errorf("cannot parse version %q", s) + } + + major, err := strconv.Atoi(parts[0]) + if err != nil { + return slurmVersion{}, fmt.Errorf("invalid major version in %q: %w", s, err) + } + + minor, err := strconv.Atoi(parts[1]) + if err != nil { + return slurmVersion{}, fmt.Errorf("invalid minor version in %q: %w", s, err) + } + + micro := 0 + if len(parts) == 3 { + micro, err = strconv.Atoi(parts[2]) + if err != nil { + return slurmVersion{}, fmt.Errorf("invalid micro version in %q: %w", s, err) + } + } + + return slurmVersion{Major: major, Minor: minor, Micro: micro}, nil +} diff --git a/internal/slurm/slurm.go b/internal/slurm/slurm.go index 7a67e5e..06c0cff 100644 --- a/internal/slurm/slurm.go +++ b/internal/slurm/slurm.go @@ -2,7 +2,6 @@ package slurm import ( "bytes" - "encoding/json" "fmt" "os/exec" "time" @@ -10,25 +9,6 @@ import ( "github.com/rs/zerolog/log" ) -// SacctOutput represents the JSON output from sacct command -// Assumes slurm version 23.02.6 -type SacctOutput struct { - Meta struct { - Slurm struct { - Version struct { - Major int `json:"major"` - Minor int `json:"minor"` - Micro int `json:"micro"` - Release string `json:"release"` - Cluster string `json:"cluster"` - } `json:"version"` - } `json:"Slurm"` - } `json:"meta"` - Jobs []Job `json:"jobs"` - Errors []any `json:"errors"` - Warnings []any `json:"warnings"` -} - // NumberValue represents a value that can be set/infinite/number type NumberValue struct { Set bool `json:"set"` @@ -36,7 +16,16 @@ type NumberValue struct { Number int `json:"number"` } -// Job represents a single job from sacct output +// TresAlloc represents a TRES allocation entry +type TresAlloc struct { + Type string `json:"type"` + Name string `json:"name"` + ID int `json:"id"` + Count int `json:"count"` +} + +// Job is the canonical normalized job type used throughout the application. +// Version-specific parsers normalize their output into this type. type Job struct { JobID int `json:"job_id"` Name string `json:"name"` @@ -44,14 +33,14 @@ type Job struct { Account string `json:"account"` AllocationNodes int `json:"allocation_nodes"` State struct { - Current string `json:"current"` + Current string `json:"current"` // always a plain string (normalized from []string in 23.11) Reason string `json:"reason"` } `json:"state"` Partition string `json:"partition"` QOS string `json:"qos"` ExitCode struct { - Status string `json:"status"` - ReturnCode int `json:"return_code"` + Status string `json:"status"` // normalized from []string in 23.11 + ReturnCode int `json:"return_code"` // normalized from NumberValue in 23.11 } `json:"exit_code"` Time struct { Submission int64 `json:"submission"` @@ -99,68 +88,44 @@ type Job struct { SubmitLine string `json:"submit_line"` } -// TresAlloc represents a TRES allocation entry -type TresAlloc struct { - Type string `json:"type"` - Name string `json:"name"` - ID int `json:"id"` - Count int `json:"count"` -} - -// GetJobs queries sacct and returns parsed job data +// GetJobs detects the Slurm version, then queries sacct and returns normalized jobs. func GetJobs(lookbackMinutes int) ([]Job, error) { - // Calculate the start time - startTime := time.Now().Add(-time.Duration(lookbackMinutes) * time.Minute) - startTimeStr := startTime.Format("2006-01-02T15:04:05") - - log.Debug(). - Str("start_time", startTimeStr). - Msg("Querying sacct") - - // Build the sacct command - cmd := exec.Command("sacct", - "--parsable2", - "--allocations", - "--units=M", - "--allusers", - "-S", startTimeStr, - "-E", "now", - "-o", "JobIDRaw,JobID,User,Account,Partition,QOS,JobName,State,ExitCode,Submit,Start,End,ElapsedRaw,AllocCPUS,AllocNodes,CPUTimeRAW,MaxRSS,AveRSS,NodeList,AllocTRES", - "--json", - ) - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - if err := cmd.Run(); err != nil { - log.Error(). - Str("stderr", stderr.String()). - Err(err). - Msg("sacct command failed") - return nil, fmt.Errorf("sacct command failed: %w", err) + v, err := detectVersion() + if err != nil { + return nil, fmt.Errorf("failed to detect Slurm version: %w", err) } - cmdOutput := stdout.String() - // Parse the JSON output - var output SacctOutput - if err := json.Unmarshal([]byte(cmdOutput), &output); err != nil { - log.Error(). - Str("stdout", cmdOutput). - Err(err). - Msg("Failed to parse sacct JSON output") - return nil, fmt.Errorf("failed to parse sacct output: %w", err) + log.Info(). + Int("major", v.Major). + Int("minor", v.Minor). + Int("micro", v.Micro). + Msg("Detected Slurm version") + + switch { + case v.Major == 23 && v.Minor <= 2: + return getJobsV2302(lookbackMinutes) + default: + if v.Major != 23 || v.Minor != 11 { + log.Warn(). + Int("major", v.Major). + Int("minor", v.Minor). + Msg("Untested Slurm version, falling back to 23.11 parser") + } + return getJobsV2311(lookbackMinutes) } +} - return output.Jobs, nil +// GetCurrentState returns the normalized state string for a job. +func GetCurrentState(job *Job) string { + return job.State.Current } -// IsJobRunning returns true if the job is currently running +// IsJobRunning returns true if the job is currently running. func IsJobRunning(job *Job) bool { return job.State.Current == "RUNNING" } -// IsJobCompleted returns true if the job has reached a terminal state +// IsJobCompleted returns true if the job has reached a terminal state. func IsJobCompleted(job *Job) bool { completedStates := map[string]bool{ "COMPLETED": true, @@ -170,16 +135,13 @@ func IsJobCompleted(job *Job) bool { "PREEMPTED": true, "NODE_FAIL": true, } - return completedStates[job.State.Current] } -// CalculateCoreHoursForElapsed calculates core hours for a given elapsed time +// CalculateCoreHoursForElapsed calculates core hours for a given elapsed time. func CalculateCoreHoursForElapsed(job *Job, elapsedSeconds int) float64 { - // Elapsed time is in seconds elapsedHours := float64(elapsedSeconds) / 3600.0 - // Get allocated CPUs (cores) from TRES allocatedCPUs := 0 for _, tres := range job.Tres.Allocated { if tres.Type == "cpu" { @@ -187,12 +149,45 @@ func CalculateCoreHoursForElapsed(job *Job, elapsedSeconds int) float64 { break } } - - // Fallback to required CPUs if TRES not available if allocatedCPUs == 0 { allocatedCPUs = job.Required.CPUs } - // Core hours = CPUs * hours return float64(allocatedCPUs) * elapsedHours } + +// runSacct executes the sacct command and returns its raw JSON output. +// The sacct flags are identical across all supported Slurm versions. +func runSacct(lookbackMinutes int) ([]byte, error) { + startTime := time.Now().Add(-time.Duration(lookbackMinutes) * time.Minute) + startTimeStr := startTime.Format("2006-01-02T15:04:05") + + log.Debug(). + Str("start_time", startTimeStr). + Msg("Querying sacct") + + cmd := exec.Command("sacct", + "--parsable2", + "--allocations", + "--units=M", + "--allusers", + "-S", startTimeStr, + "-E", "now", + "-o", "JobIDRaw,JobID,User,Account,Partition,QOS,JobName,State,ExitCode,Submit,Start,End,ElapsedRaw,AllocCPUS,AllocNodes,CPUTimeRAW,MaxRSS,AveRSS,NodeList,AllocTRES", + "--json", + ) + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + log.Error(). + Str("stderr", stderr.String()). + Err(err). + Msg("sacct command failed") + return nil, fmt.Errorf("sacct command failed: %w", err) + } + + return stdout.Bytes(), nil +} diff --git a/internal/slurm/v23_02.go b/internal/slurm/v23_02.go new file mode 100644 index 0000000..00071be --- /dev/null +++ b/internal/slurm/v23_02.go @@ -0,0 +1,154 @@ +package slurm + +import ( + "encoding/json" + "fmt" +) + +// sacctOutput2302 matches the sacct --json output format for Slurm 23.02.x. +// Version fields are integers, and State.Current is a plain string. +type sacctOutput2302 struct { + Meta struct { + Slurm struct { + Version struct { + Major int `json:"major"` + Minor int `json:"minor"` + Micro int `json:"micro"` + Release string `json:"release"` + Cluster string `json:"cluster"` + } `json:"version"` + } `json:"Slurm"` + } `json:"meta"` + Jobs []job2302 `json:"jobs"` + Errors []any `json:"errors"` + Warnings []any `json:"warnings"` +} + +type job2302 struct { + JobID int `json:"job_id"` + Name string `json:"name"` + User string `json:"user"` + Account string `json:"account"` + AllocationNodes int `json:"allocation_nodes"` + State struct { + Current string `json:"current"` + Reason string `json:"reason"` + } `json:"state"` + Partition string `json:"partition"` + QOS string `json:"qos"` + ExitCode struct { + Status string `json:"status"` + ReturnCode int `json:"return_code"` + } `json:"exit_code"` + Time struct { + Submission int64 `json:"submission"` + Start int64 `json:"start"` + End int64 `json:"end"` + Elapsed int `json:"elapsed"` + Eligible int64 `json:"eligible"` + Suspended int `json:"suspended"` + Limit NumberValue `json:"limit"` + Planned NumberValue `json:"planned"` + System struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"system"` + Total struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"total"` + User struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"user"` + } `json:"time"` + Association struct { + Account string `json:"account"` + Cluster string `json:"cluster"` + Partition string `json:"partition"` + User string `json:"user"` + ID int `json:"id"` + } `json:"association"` + Cluster string `json:"cluster"` + Group string `json:"group"` + Nodes string `json:"nodes"` + Required struct { + CPUs int `json:"CPUs"` + MemoryPerCPU NumberValue `json:"memory_per_cpu"` + MemoryPerNode NumberValue `json:"memory_per_node"` + } `json:"required"` + Tres struct { + Allocated []TresAlloc `json:"allocated"` + Requested []TresAlloc `json:"requested"` + } `json:"tres"` + Flags []string `json:"flags"` + WorkingDirectory string `json:"working_directory"` + SubmitLine string `json:"submit_line"` +} + +// getJobsV2302 runs sacct and returns normalized jobs for Slurm 23.02.x. +func getJobsV2302(lookbackMinutes int) ([]Job, error) { + data, err := runSacct(lookbackMinutes) + if err != nil { + return nil, err + } + + var output sacctOutput2302 + if err := json.Unmarshal(data, &output); err != nil { + return nil, fmt.Errorf("failed to parse sacct output (23.02 format): %w", err) + } + + jobs := make([]Job, len(output.Jobs)) + for i, j := range output.Jobs { + jobs[i] = normalize2302(j) + } + return jobs, nil +} + +// normalize2302 converts a 23.02.x job to the canonical Job type. +// This version's format is closest to canonical, so the mapping is direct. +func normalize2302(j job2302) Job { + var out Job + out.JobID = j.JobID + out.Name = j.Name + out.User = j.User + out.Account = j.Account + out.AllocationNodes = j.AllocationNodes + out.State.Current = j.State.Current + out.State.Reason = j.State.Reason + out.Partition = j.Partition + out.QOS = j.QOS + out.ExitCode.Status = j.ExitCode.Status + out.ExitCode.ReturnCode = j.ExitCode.ReturnCode + out.Time.Submission = j.Time.Submission + out.Time.Start = j.Time.Start + out.Time.End = j.Time.End + out.Time.Elapsed = j.Time.Elapsed + out.Time.Eligible = j.Time.Eligible + out.Time.Suspended = j.Time.Suspended + out.Time.Limit = j.Time.Limit + out.Time.Planned = j.Time.Planned + out.Time.System.Seconds = j.Time.System.Seconds + out.Time.System.Microseconds = j.Time.System.Microseconds + out.Time.Total.Seconds = j.Time.Total.Seconds + out.Time.Total.Microseconds = j.Time.Total.Microseconds + out.Time.User.Seconds = j.Time.User.Seconds + out.Time.User.Microseconds = j.Time.User.Microseconds + out.Association.Account = j.Association.Account + out.Association.Cluster = j.Association.Cluster + out.Association.Partition = j.Association.Partition + out.Association.User = j.Association.User + out.Association.ID = j.Association.ID + out.Cluster = j.Cluster + out.Group = j.Group + out.Nodes = j.Nodes + out.Required.CPUs = j.Required.CPUs + out.Required.MemoryPerCPU = j.Required.MemoryPerCPU + out.Required.MemoryPerNode = j.Required.MemoryPerNode + out.Tres.Allocated = j.Tres.Allocated + out.Tres.Requested = j.Tres.Requested + out.Flags = j.Flags + out.WorkingDirectory = j.WorkingDirectory + out.SubmitLine = j.SubmitLine + return out +} diff --git a/internal/slurm/v23_11.go b/internal/slurm/v23_11.go new file mode 100644 index 0000000..e15f463 --- /dev/null +++ b/internal/slurm/v23_11.go @@ -0,0 +1,190 @@ +package slurm + +import ( + "encoding/json" + "fmt" +) + +// sacctOutput2311 matches the sacct --json output format for Slurm 23.11.x. +// Version fields are strings, State.Current is []string, and ExitCode uses NumberValue. +type sacctOutput2311 struct { + Meta struct { + Plugin struct { + Type string `json:"type"` + Name string `json:"name"` + DataParser string `json:"data_parser"` + AccountingStorage string `json:"accounting_storage"` + } `json:"plugin"` + Client struct { + Source string `json:"source"` + User string `json:"user"` + Group string `json:"group"` + } `json:"client"` + Command []string `json:"command"` + Slurm struct { + Version struct { + Major string `json:"major"` + Minor string `json:"minor"` + Micro string `json:"micro"` + } `json:"version"` + Release string `json:"release"` + Cluster string `json:"cluster"` + } `json:"slurm"` + } `json:"meta"` + Jobs []job2311 `json:"jobs"` + Errors []any `json:"errors"` + Warnings []any `json:"warnings"` +} + +type exitCode2311 struct { + Status []string `json:"status"` + ReturnCode NumberValue `json:"return_code"` + Signal struct { + ID NumberValue `json:"id"` + Name string `json:"name"` + } `json:"signal"` +} + +type job2311 struct { + JobID int `json:"job_id"` + Name string `json:"name"` + User string `json:"user"` + Account string `json:"account"` + AllocationNodes int `json:"allocation_nodes"` + State struct { + Current []string `json:"current"` + Reason string `json:"reason"` + } `json:"state"` + Partition string `json:"partition"` + QOS string `json:"qos"` + ExitCode exitCode2311 `json:"exit_code"` + DerivedExitCode exitCode2311 `json:"derived_exit_code"` + Time struct { + Submission int64 `json:"submission"` + Start int64 `json:"start"` + End int64 `json:"end"` + Elapsed int `json:"elapsed"` + Eligible int64 `json:"eligible"` + Suspended int `json:"suspended"` + Limit NumberValue `json:"limit"` + Planned NumberValue `json:"planned"` + System struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"system"` + Total struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"total"` + User struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"user"` + } `json:"time"` + Association struct { + Account string `json:"account"` + Cluster string `json:"cluster"` + Partition string `json:"partition"` + User string `json:"user"` + ID int `json:"id"` + } `json:"association"` + Cluster string `json:"cluster"` + Group string `json:"group"` + Nodes string `json:"nodes"` + Required struct { + CPUs int `json:"CPUs"` + MemoryPerCPU NumberValue `json:"memory_per_cpu"` + MemoryPerNode NumberValue `json:"memory_per_node"` + } `json:"required"` + Tres struct { + Allocated []TresAlloc `json:"allocated"` + Requested []TresAlloc `json:"requested"` + } `json:"tres"` + Flags []string `json:"flags"` + WorkingDirectory string `json:"working_directory"` + SubmitLine string `json:"submit_line"` +} + +// getJobsV2311 runs sacct and returns normalized jobs for Slurm 23.11.x. +func getJobsV2311(lookbackMinutes int) ([]Job, error) { + data, err := runSacct(lookbackMinutes) + if err != nil { + return nil, err + } + + var output sacctOutput2311 + if err := json.Unmarshal(data, &output); err != nil { + return nil, fmt.Errorf("failed to parse sacct output (23.11 format): %w", err) + } + + jobs := make([]Job, len(output.Jobs)) + for i, j := range output.Jobs { + jobs[i] = normalize2311(j) + } + return jobs, nil +} + +// normalize2311 converts a 23.11.x job to the canonical Job type. +// Key normalizations: +// - State.Current: first element of []string, or "" if empty +// - ExitCode.Status: first element of []string, or "" if empty +// - ExitCode.ReturnCode: NumberValue.Number +func normalize2311(j job2311) Job { + var out Job + out.JobID = j.JobID + out.Name = j.Name + out.User = j.User + out.Account = j.Account + out.AllocationNodes = j.AllocationNodes + + if len(j.State.Current) > 0 { + out.State.Current = j.State.Current[0] + } + out.State.Reason = j.State.Reason + + out.Partition = j.Partition + out.QOS = j.QOS + + if len(j.ExitCode.Status) > 0 { + out.ExitCode.Status = j.ExitCode.Status[0] + } + out.ExitCode.ReturnCode = j.ExitCode.ReturnCode.Number + + out.Time.Submission = j.Time.Submission + out.Time.Start = j.Time.Start + out.Time.End = j.Time.End + out.Time.Elapsed = j.Time.Elapsed + out.Time.Eligible = j.Time.Eligible + out.Time.Suspended = j.Time.Suspended + out.Time.Limit = j.Time.Limit + out.Time.Planned = j.Time.Planned + out.Time.System.Seconds = j.Time.System.Seconds + out.Time.System.Microseconds = j.Time.System.Microseconds + out.Time.Total.Seconds = j.Time.Total.Seconds + out.Time.Total.Microseconds = j.Time.Total.Microseconds + out.Time.User.Seconds = j.Time.User.Seconds + out.Time.User.Microseconds = j.Time.User.Microseconds + + out.Association.Account = j.Association.Account + out.Association.Cluster = j.Association.Cluster + out.Association.Partition = j.Association.Partition + out.Association.User = j.Association.User + out.Association.ID = j.Association.ID + + out.Cluster = j.Cluster + out.Group = j.Group + out.Nodes = j.Nodes + + out.Required.CPUs = j.Required.CPUs + out.Required.MemoryPerCPU = j.Required.MemoryPerCPU + out.Required.MemoryPerNode = j.Required.MemoryPerNode + + out.Tres.Allocated = j.Tres.Allocated + out.Tres.Requested = j.Tres.Requested + + out.Flags = j.Flags + out.WorkingDirectory = j.WorkingDirectory + out.SubmitLine = j.SubmitLine + + return out +} diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index f2278bb..3955f50 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -33,7 +33,7 @@ func ProcessJob(cfg *config.Config, job *slurm.Job, stateDriver *state.Driver, p if !isRunning && !isCompleted { log.Debug(). Int("job_id", job.JobID). - Str("state", job.State.Current). + Str("state", slurm.GetCurrentState(job)). Msg("Skipping job (not running or completed)") return nil }