Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions internal/slurm/detect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package slurm

import (
"fmt"
"os/exec"
"strconv"
"strings"
)

type slurmVersion struct {
Major int
Minor int
Micro int
}

// detectVersion runs `sinfo --version` and parses the Slurm version string.
// The output format is: "slurm 23.11.9"
func detectVersion() (slurmVersion, error) {
cmd := exec.Command("sinfo", "--version")
out, err := cmd.Output()
if err != nil {
return slurmVersion{}, fmt.Errorf("sinfo --version failed: %w", err)
}

line := strings.TrimSpace(string(out))
// Expected format: "slurm 23.11.9"
parts := strings.Fields(line)
if len(parts) < 2 {
return slurmVersion{}, fmt.Errorf("unexpected sinfo --version output: %q", line)
}

return parseVersionString(parts[1])
}

// parseVersionString parses a version string like "23.11.9" into a slurmVersion.
func parseVersionString(s string) (slurmVersion, error) {
parts := strings.SplitN(s, ".", 3)
if len(parts) < 2 {
return slurmVersion{}, fmt.Errorf("cannot parse version %q", s)
}

major, err := strconv.Atoi(parts[0])
if err != nil {
return slurmVersion{}, fmt.Errorf("invalid major version in %q: %w", s, err)
}

minor, err := strconv.Atoi(parts[1])
if err != nil {
return slurmVersion{}, fmt.Errorf("invalid minor version in %q: %w", s, err)
}

micro := 0
if len(parts) == 3 {
micro, err = strconv.Atoi(parts[2])
if err != nil {
return slurmVersion{}, fmt.Errorf("invalid micro version in %q: %w", s, err)
}
}

return slurmVersion{Major: major, Minor: minor, Micro: micro}, nil
}
159 changes: 77 additions & 82 deletions internal/slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,45 @@ package slurm

import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"time"

"github.com/rs/zerolog/log"
)

// SacctOutput represents the JSON output from sacct command
// Assumes slurm version 23.02.6
type SacctOutput struct {
Meta struct {
Slurm struct {
Version struct {
Major int `json:"major"`
Minor int `json:"minor"`
Micro int `json:"micro"`
Release string `json:"release"`
Cluster string `json:"cluster"`
} `json:"version"`
} `json:"Slurm"`
} `json:"meta"`
Jobs []Job `json:"jobs"`
Errors []any `json:"errors"`
Warnings []any `json:"warnings"`
}

// NumberValue represents a value that can be set/infinite/number
type NumberValue struct {
Set bool `json:"set"`
Infinite bool `json:"infinite"`
Number int `json:"number"`
}

// Job represents a single job from sacct output
// TresAlloc represents a TRES allocation entry
type TresAlloc struct {
Type string `json:"type"`
Name string `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
}

// Job is the canonical normalized job type used throughout the application.
// Version-specific parsers normalize their output into this type.
type Job struct {
JobID int `json:"job_id"`
Name string `json:"name"`
User string `json:"user"`
Account string `json:"account"`
AllocationNodes int `json:"allocation_nodes"`
State struct {
Current string `json:"current"`
Current string `json:"current"` // always a plain string (normalized from []string in 23.11)
Reason string `json:"reason"`
} `json:"state"`
Partition string `json:"partition"`
QOS string `json:"qos"`
ExitCode struct {
Status string `json:"status"`
ReturnCode int `json:"return_code"`
Status string `json:"status"` // normalized from []string in 23.11
ReturnCode int `json:"return_code"` // normalized from NumberValue in 23.11
} `json:"exit_code"`
Time struct {
Submission int64 `json:"submission"`
Expand Down Expand Up @@ -99,68 +88,44 @@ type Job struct {
SubmitLine string `json:"submit_line"`
}

// TresAlloc represents a TRES allocation entry
type TresAlloc struct {
Type string `json:"type"`
Name string `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
}

// GetJobs queries sacct and returns parsed job data
// GetJobs detects the Slurm version, then queries sacct and returns normalized jobs.
func GetJobs(lookbackMinutes int) ([]Job, error) {
// Calculate the start time
startTime := time.Now().Add(-time.Duration(lookbackMinutes) * time.Minute)
startTimeStr := startTime.Format("2006-01-02T15:04:05")

log.Debug().
Str("start_time", startTimeStr).
Msg("Querying sacct")

// Build the sacct command
cmd := exec.Command("sacct",
"--parsable2",
"--allocations",
"--units=M",
"--allusers",
"-S", startTimeStr,
"-E", "now",
"-o", "JobIDRaw,JobID,User,Account,Partition,QOS,JobName,State,ExitCode,Submit,Start,End,ElapsedRaw,AllocCPUS,AllocNodes,CPUTimeRAW,MaxRSS,AveRSS,NodeList,AllocTRES",
"--json",
)

var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Run(); err != nil {
log.Error().
Str("stderr", stderr.String()).
Err(err).
Msg("sacct command failed")
return nil, fmt.Errorf("sacct command failed: %w", err)
v, err := detectVersion()
if err != nil {
return nil, fmt.Errorf("failed to detect Slurm version: %w", err)
}
cmdOutput := stdout.String()

// Parse the JSON output
var output SacctOutput
if err := json.Unmarshal([]byte(cmdOutput), &output); err != nil {
log.Error().
Str("stdout", cmdOutput).
Err(err).
Msg("Failed to parse sacct JSON output")
return nil, fmt.Errorf("failed to parse sacct output: %w", err)
log.Info().
Int("major", v.Major).
Int("minor", v.Minor).
Int("micro", v.Micro).
Msg("Detected Slurm version")

switch {
case v.Major == 23 && v.Minor <= 2:
return getJobsV2302(lookbackMinutes)
default:
if v.Major != 23 || v.Minor != 11 {
log.Warn().
Int("major", v.Major).
Int("minor", v.Minor).
Msg("Untested Slurm version, falling back to 23.11 parser")
}
return getJobsV2311(lookbackMinutes)
}
}

return output.Jobs, nil
// GetCurrentState returns the normalized state string for a job.
func GetCurrentState(job *Job) string {
return job.State.Current
}

// IsJobRunning returns true if the job is currently running
// IsJobRunning returns true if the job is currently running.
func IsJobRunning(job *Job) bool {
return job.State.Current == "RUNNING"
}

// IsJobCompleted returns true if the job has reached a terminal state
// IsJobCompleted returns true if the job has reached a terminal state.
func IsJobCompleted(job *Job) bool {
completedStates := map[string]bool{
"COMPLETED": true,
Expand All @@ -170,29 +135,59 @@ func IsJobCompleted(job *Job) bool {
"PREEMPTED": true,
"NODE_FAIL": true,
}

return completedStates[job.State.Current]
}

// CalculateCoreHoursForElapsed calculates core hours for a given elapsed time
// CalculateCoreHoursForElapsed calculates core hours for a given elapsed time.
func CalculateCoreHoursForElapsed(job *Job, elapsedSeconds int) float64 {
// Elapsed time is in seconds
elapsedHours := float64(elapsedSeconds) / 3600.0

// Get allocated CPUs (cores) from TRES
allocatedCPUs := 0
for _, tres := range job.Tres.Allocated {
if tres.Type == "cpu" {
allocatedCPUs = tres.Count
break
}
}

// Fallback to required CPUs if TRES not available
if allocatedCPUs == 0 {
allocatedCPUs = job.Required.CPUs
}

// Core hours = CPUs * hours
return float64(allocatedCPUs) * elapsedHours
}

// runSacct executes the sacct command and returns its raw JSON output.
// The sacct flags are identical across all supported Slurm versions.
func runSacct(lookbackMinutes int) ([]byte, error) {
startTime := time.Now().Add(-time.Duration(lookbackMinutes) * time.Minute)
startTimeStr := startTime.Format("2006-01-02T15:04:05")

log.Debug().
Str("start_time", startTimeStr).
Msg("Querying sacct")

cmd := exec.Command("sacct",
"--parsable2",
"--allocations",
"--units=M",
"--allusers",
"-S", startTimeStr,
"-E", "now",
"-o", "JobIDRaw,JobID,User,Account,Partition,QOS,JobName,State,ExitCode,Submit,Start,End,ElapsedRaw,AllocCPUS,AllocNodes,CPUTimeRAW,MaxRSS,AveRSS,NodeList,AllocTRES",
"--json",
)

var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Run(); err != nil {
log.Error().
Str("stderr", stderr.String()).
Err(err).
Msg("sacct command failed")
return nil, fmt.Errorf("sacct command failed: %w", err)
}

return stdout.Bytes(), nil
}
Loading
Loading