diff --git a/README.md b/README.md index d2c3d40..f662944 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,11 @@ configurations. interacting with the cluster. Process job details, accounting information, cluster configuration details with your favorite AI application (like Claude) which supports MCP extensions. +- **Ray Integration**: Examples and tools for integrating SGE/Grid Engine with +[Ray](https://www.ray.io/), a distributed computing framework. Deploy Ray clusters +on SGE infrastructure with automated head and worker node management. +(See the [`examples/ray`](examples/ray) directory and [`docs/ray-integration.md`](docs/ray-integration.md) +for more information.) ## Go API Development Container diff --git a/cmd/adapter/go.mod b/cmd/adapter/go.mod index f95594f..4cd3a5a 100644 --- a/cmd/adapter/go.mod +++ b/cmd/adapter/go.mod @@ -7,6 +7,7 @@ replace github.com/hpc-gridware/go-clusterscheduler => ./../../ require ( github.com/gorilla/mux v1.8.1 github.com/hpc-gridware/go-clusterscheduler v0.0.0-20240914092021-0285da16cc31 + github.com/spf13/cobra v1.9.1 ) require ( @@ -14,7 +15,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/bridges/otelslog v0.8.0 // indirect @@ -28,5 +28,5 @@ require ( go.opentelemetry.io/otel/sdk/log v0.9.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect - golang.org/x/sys v0.29.0 // indirect + golang.org/x/sys v0.32.0 // indirect ) diff --git a/cmd/adapter/go.sum b/cmd/adapter/go.sum index ff94502..3b1a487 100644 --- a/cmd/adapter/go.sum +++ b/cmd/adapter/go.sum @@ -57,10 +57,10 @@ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qq go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/cmd/diag/go.sum b/cmd/diag/go.sum new file mode 100644 index 0000000..632ba28 --- /dev/null +++ b/cmd/diag/go.sum @@ -0,0 +1,22 @@ +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod 
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= +github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= +golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/cmd/simulator/go.sum b/cmd/simulator/go.sum index f8fb7ce..8d74f2f 100644 --- a/cmd/simulator/go.sum +++ b/cmd/simulator/go.sum @@ -18,14 +18,14 @@ github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= -golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= +golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/docs/ray-integration.md b/docs/ray-integration.md new file mode 100644 index 0000000..9e482e7 --- /dev/null +++ b/docs/ray-integration.md @@ -0,0 +1,360 @@ +# Integrating SGE/Grid Engine with Ray + +This guide explains how to integrate Sun Grid Engine (SGE) / Open Grid Engine / Gridware Cluster Scheduler with [Ray](https://www.ray.io/), a distributed computing framework. + +## Overview + +Ray is a unified framework for scaling AI and Python applications. It provides distributed computing capabilities through a cluster of workers. While Ray has built-in support for cloud providers and schedulers like SLURM, it can also be integrated with SGE/Grid Engine clusters. + +## Architecture + +The integration follows a similar pattern to Ray's SLURM integration: + +1. **Head Node**: A Ray head node runs on one SGE execution host +2. **Worker Nodes**: Ray workers are submitted as SGE jobs +3. **Auto-scaling**: Workers can be dynamically added or removed based on workload +4. **Resource Management**: SGE handles resource allocation and job scheduling + +## Integration Approaches + +### Approach 1: Manual Cluster Setup + +Launch Ray head and workers manually using SGE job submission: + +```bash +# Submit Ray head node +qsub -b y -cwd -o ray_head.log -e ray_head.err \ + ray start --head --port=6379 --dashboard-host=0.0.0.0 + +# Wait for head node to start and get its address +HEAD_NODE_IP=$(qstat -j | grep exec_host | cut -d'@' -f2 | cut -d'.' -f1) + +# Submit Ray workers +qsub -b y -cwd -t 1-10 -o ray_worker.\$TASK_ID.log -e ray_worker.\$TASK_ID.err \ + ray start --address=${HEAD_NODE_IP}:6379 +``` + +### Approach 2: Ray Cluster Launcher with SGE Backend + +Ray provides a cluster launcher that can be extended to support SGE. This approach uses a YAML configuration file to define the cluster. 
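+
+Once a custom node provider module is available on the submit host (the example
+configuration below references a hypothetical `ray_sge_provider` module), the
+cluster would be driven with the standard Ray launcher commands. The following is
+a sketch of the intended workflow, not a tested recipe:
+
+```bash
+# Start the cluster described by the SGE-backed configuration file
+ray up ray-sge-cluster.yaml
+
+# Tear the cluster down again when finished
+ray down ray-sge-cluster.yaml
+```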
+ +#### Prerequisites + +- SGE/Grid Engine cluster properly configured +- Ray installed on all execution hosts +- Python 3.8+ on all nodes (Python 3.9+ recommended) +- Network connectivity between all SGE execution hosts + +## Configuration + +### Example Ray Cluster Configuration (ray-sge-cluster.yaml) + +```yaml +# A unique identifier for the cluster +cluster_name: ray-sge-cluster + +# The maximum number of workers the cluster will have at any given time +max_workers: 10 + +# Cloud provider-specific configuration +# For SGE, we use a custom node provider +provider: + type: external + module: ray_sge_provider # Custom module for SGE integration + +# How Ray will authenticate with newly launched nodes +auth: + ssh_user: your_username + ssh_private_key: ~/.ssh/id_rsa + +# SGE-specific settings +sge: + # Queue to submit jobs to + queue: all.q + + # Parallel environment (for multi-slot jobs) + parallel_environment: smp + + # Project to charge resources to + project: your_project + + # Additional qsub options + qsub_options: "-l h_rt=24:00:00 -l mem_free=4G" + +# Configuration for the head node +head_node: + # Resources allocated to head node + resources: + slots: 4 + memory: "16G" + +# Configuration for worker nodes +worker_nodes: + # Resources per worker + resources: + slots: 2 + memory: "8G" + + # Minimum number of workers to maintain + min_workers: 2 + + # Maximum number of workers to scale to + max_workers: 10 + +# Commands to run on each node +setup_commands: + - pip install -U ray[default] + - pip install -U numpy pandas + +# Command to start Ray on the head node +head_start_ray_commands: + - ray stop + - ray start --head --port=6379 --dashboard-host=0.0.0.0 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start Ray on worker nodes +worker_start_ray_commands: + - ray stop + - ray start --address=$RAY_HEAD_IP:6379 +``` + +## Implementation with Go API + +This repository provides a Go API for SGE that can be used to build Ray integration tools. 
Here's an example: + +### Example: Ray Worker Launcher + +Create a Go application that manages Ray workers as SGE jobs: + +```go +package main + +import ( + "context" + "fmt" + "os" + "time" + + qsub "github.com/hpc-gridware/go-clusterscheduler/pkg/qsub/v9.0" + qstat "github.com/hpc-gridware/go-clusterscheduler/pkg/qstat/v9.0" + "github.com/hpc-gridware/go-clusterscheduler/pkg/qdel/v9.0" +) + +type RayClusterManager struct { + qsub qsub.QSub + qstat qstat.QStat + qdel qdel.QDel + headIP string + workerJobs []int64 +} + +func NewRayClusterManager(headIP string) (*RayClusterManager, error) { + qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{}) + if err != nil { + return nil, err + } + + qst, err := qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{}) + if err != nil { + return nil, err + } + + qd, err := qdel.NewCommandLineQDel(qdel.CommandLineQDelConfig{}) + if err != nil { + return nil, err + } + + return &RayClusterManager{ + qsub: qs, + qstat: qst, + qdel: qd, + headIP: headIP, + workerJobs: make([]int64, 0), + }, nil +} + +func (r *RayClusterManager) StartHead(ctx context.Context) (int64, error) { + jobId, _, err := r.qsub.Submit(ctx, qsub.JobOptions{ + Command: "ray", + CommandArgs: []string{"start", "--head", "--port=6379", "--dashboard-host=0.0.0.0"}, + Binary: qsub.ToPtr(true), + JobName: qsub.ToPtr("ray-head"), + OutputPath: qsub.ToPtr("ray_head.log"), + ErrorPath: qsub.ToPtr("ray_head.err"), + MemoryLimit: qsub.ToPtr("16G"), + }) + + if err != nil { + return 0, fmt.Errorf("failed to submit head node: %w", err) + } + + fmt.Printf("Ray head node submitted with job ID: %d\n", jobId) + return jobId, nil +} + +func (r *RayClusterManager) AddWorkers(ctx context.Context, count int) error { + rayAddress := fmt.Sprintf("%s:6379", r.headIP) + + jobId, _, err := r.qsub.Submit(ctx, qsub.JobOptions{ + Command: "ray", + CommandArgs: []string{"start", "--address=" + rayAddress}, + Binary: qsub.ToPtr(true), + JobName: qsub.ToPtr("ray-worker"), + OutputPath: qsub.ToPtr("ray_worker.$JOB_ID.log"), + ErrorPath: qsub.ToPtr("ray_worker.$JOB_ID.err"), + JobArray: qsub.ToPtr(fmt.Sprintf("1-%d", count)), + MemoryLimit: qsub.ToPtr("8G"), + }) + + if err != nil { + return fmt.Errorf("failed to submit workers: %w", err) + } + + r.workerJobs = append(r.workerJobs, jobId) + fmt.Printf("Submitted %d Ray workers with job ID: %d\n", count, jobId) + return nil +} + +func (r *RayClusterManager) Shutdown(ctx context.Context) error { + // Delete all worker jobs + for _, jobId := range r.workerJobs { + if err := r.qdel.DeleteJobs(ctx, []int64{jobId}); err != nil { + fmt.Printf("Warning: failed to delete job %d: %v\n", jobId, err) + } + } + + fmt.Println("Ray cluster shutdown complete") + return nil +} + +func main() { + if len(os.Args) < 2 { + fmt.Println("Usage: ray-cluster-manager ") + os.Exit(1) + } + + ctx := context.Background() + manager, err := NewRayClusterManager(os.Args[1]) + if err != nil { + fmt.Printf("Error creating cluster manager: %v\n", err) + os.Exit(1) + } + + // Start head node + headJobID, err := manager.StartHead(ctx) + if err != nil { + fmt.Printf("Error starting head node: %v\n", err) + os.Exit(1) + } + + // Wait for head node to be ready + fmt.Println("Waiting for head node to start...") + time.Sleep(30 * time.Second) + + // Add workers + if err := manager.AddWorkers(ctx, 5); err != nil { + fmt.Printf("Error adding workers: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Ray cluster is running. 
Head node job ID: %d\n", headJobID) + fmt.Println("Press Ctrl+C to shutdown...") + + // Keep running until interrupted + select {} +} +``` + +## Best Practices + +1. **Resource Specification**: Always specify memory and CPU requirements to help SGE schedule jobs efficiently +2. **Job Arrays**: Use SGE job arrays for launching multiple identical workers +3. **Error Handling**: Implement proper error handling and logging for job submission failures +4. **Cleanup**: Always clean up SGE jobs when shutting down the Ray cluster +5. **Monitoring**: Use `qstat` to monitor job status and implement health checks +6. **Network Configuration**: Ensure all SGE execution hosts can communicate on Ray's ports (default: 6379 for Redis) + +## Monitoring and Debugging + +### Check Ray Cluster Status + +```bash +# From the head node +ray status + +# Check SGE job status +qstat -f +``` + +### View Ray Dashboard + +The Ray dashboard is accessible at `http://:8265` by default. + +### Logs + +- Ray logs: `~/ray/session_*/logs/` +- SGE job output: `ray_head.log`, `ray_worker.*.log` +- SGE job errors: `ray_head.err`, `ray_worker.*.err` + +## Resource Management + +SGE provides fine-grained resource management that can be leveraged: + +- **Slots**: Map to Ray CPUs using `-pe smp N` +- **Memory**: Specify using `-l mem_free=XG` +- **Runtime**: Set maximum runtime using `-l h_rt=HH:MM:SS` +- **Queues**: Direct jobs to specific queues using `-q queue_name` + +## Example: Submitting a Ray Application + +Once your Ray cluster is running on SGE: + +```python +import ray + +# Connect to existing cluster +ray.init(address="auto") + +# Your Ray application +@ray.remote +def compute_task(x): + return x * x + +# Submit tasks +futures = [compute_task.remote(i) for i in range(100)] +results = ray.get(futures) + +print(f"Results: {results}") +``` + +## Troubleshooting + +### Workers Cannot Connect to Head + +- Verify network connectivity between execution hosts +- Check that port 6379 is open +- Ensure `RAY_HEAD_IP` is correctly set + +### Jobs Stuck in Queue + +- Check SGE queue status: `qstat -f` +- Verify resource availability: `qhost` +- Review queue configuration: `qconf -sq ` + +### Out of Memory Errors + +- Increase memory request in job submission: `-l mem_free=16G` +- Monitor actual memory usage: `qacct -j ` + +## Further Reading + +- [Ray Documentation](https://docs.ray.io/) +- [Ray Cluster Launcher](https://docs.ray.io/en/latest/cluster/launcher.html) +- [SGE/Grid Engine Documentation](https://github.com/hpc-gridware/clusterscheduler) +- [go-clusterscheduler Examples](../examples/) + +## Support + +For issues related to: +- **SGE Integration**: Open an issue in this repository +- **Ray Core**: Visit [Ray GitHub](https://github.com/ray-project/ray) +- **SGE/Grid Engine**: Visit [Gridware Cluster Scheduler](https://github.com/hpc-gridware/clusterscheduler) diff --git a/examples/customqacct/go.sum b/examples/customqacct/go.sum index 9826afd..f649201 100644 --- a/examples/customqacct/go.sum +++ b/examples/customqacct/go.sum @@ -14,10 +14,10 @@ github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 
-golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/examples/ray/README.md b/examples/ray/README.md new file mode 100644 index 0000000..4edf585 --- /dev/null +++ b/examples/ray/README.md @@ -0,0 +1,423 @@ +# Ray on SGE/Grid Engine Integration Example + +This directory contains examples and tools for integrating Ray (distributed computing framework) with SGE/Grid Engine using the go-clusterscheduler API. + +## Overview + +Ray is a unified framework for scaling AI and Python applications. This integration allows you to: +- Launch Ray clusters on SGE/Grid Engine infrastructure +- Manage Ray head and worker nodes as SGE jobs +- Leverage SGE's resource management and scheduling capabilities +- Run distributed Ray applications on HPC clusters + +## Files in This Directory + +- **`ray_cluster_manager.go`**: Go application that manages Ray clusters using SGE +- **`start_ray_cluster.sh`**: Bash script to start a Ray cluster on SGE +- **`stop_ray_cluster.sh`**: Bash script to stop a Ray cluster on SGE +- **`example_ray_job.py`**: Example Python application demonstrating Ray usage on SGE +- **`README.md`**: This file + +## Prerequisites + +### On All SGE Execution Hosts + +1. **Ray Installation**: + ```bash + pip install ray[default] + ``` + +2. **Python 3.8+** (Python 3.9+ recommended): + ```bash + python --version + ``` + +3. **Network Connectivity**: Ensure all execution hosts can communicate on: + - Port 6379 (Ray default port for Redis) + - Port 8265 (Ray dashboard, optional) + +### For Go Application + +1. **Go 1.19+**: + ```bash + go version + ``` + +2. **SGE Client Tools**: `qsub`, `qstat`, `qdel` must be available + +## Usage + +### Method 1: Using Shell Scripts (Recommended for Quick Start) + +#### Start a Ray Cluster + +```bash +# Start with default settings (5 workers) +./start_ray_cluster.sh + +# Start with custom number of workers +./start_ray_cluster.sh 10 +``` + +This will: +1. Submit the Ray head node as an SGE job +2. Wait for the head node to start +3. Submit worker nodes as an SGE job array +4. Create `ray_cluster_info.txt` with connection details + +#### Run a Ray Application + +After the cluster starts, use the connection information: + +```bash +# Check the cluster info +cat ray_cluster_info.txt + +# Run the example application +python example_ray_job.py --address :6379 +``` + +#### Stop the Ray Cluster + +```bash +./stop_ray_cluster.sh +``` + +This will delete all Ray-related SGE jobs. + +### Method 2: Using Go Application + +#### Build the Application + +```bash +go build -o ray-cluster-manager ray_cluster_manager.go +``` + +#### Run the Manager + +```bash +./ray-cluster-manager +``` + +The Go application will: +1. Start a Ray head node +2. Launch 5 worker nodes (configurable in code) +3. Monitor the cluster status +4. Provide connection information +5. 
Wait for Ctrl+C to shutdown + +You can modify the configuration in the `main()` function: +```go +const ( + rayPort = 6379 // Ray communication port + headMemory = "16G" // Memory for head node + workerMemory = "8G" // Memory per worker + workerCount = 5 // Number of workers +) +``` + +## Example Ray Application + +The included `example_ray_job.py` demonstrates several Ray features: + +### 1. Pi Estimation (Monte Carlo) + +Distributed Monte Carlo sampling to estimate π: +```python +python example_ray_job.py --address :6379 +``` + +### 2. Matrix Operations + +Distributed matrix multiplications: +```python +python example_ray_job.py --address :6379 --no-pi --no-actor +``` + +### 3. Ray Actors + +Stateful distributed actors: +```python +python example_ray_job.py --address :6379 --no-pi --no-matrix +``` + +## Advanced Configuration + +### Customizing Resource Requests + +Edit the scripts or Go code to change SGE resource requests: + +**In Shell Script**: +```bash +# Edit start_ray_cluster.sh +HEAD_MEMORY="32G" # Increase head node memory +WORKER_MEMORY="16G" # Increase worker memory + +# Add more SGE options +qsub ... -l h_rt=48:00:00 -pe smp 4 ... +``` + +**In Go Code**: +```go +// Edit ray_cluster_manager.go +jobId, _, err := r.qsub.Submit(ctx, qsub.JobOptions{ + // ... existing options ... + MemoryLimit: qsub.ToPtr("32G"), + ParallelEnvironment: qsub.ToPtr("smp"), + Slots: qsub.ToPtr(4), +}) +``` + +### Specifying SGE Queue + +```bash +# In shell script +qsub -q high_priority.q ... + +# In Go code +Queue: qsub.ToPtr("high_priority.q"), +``` + +### Using Parallel Environments + +For multi-core Ray workers: + +```bash +qsub -pe smp 4 ... # Request 4 cores per worker +``` + +## Monitoring + +### Check Cluster Status + +```bash +# View all jobs +qstat -f + +# View specific job details +qstat -j + +# Ray cluster status (from head node) +ray status --address=:6379 +``` + +### View Logs + +```bash +# Head node logs +tail -f ray_head.log + +# Worker logs (for task array) +tail -f ray_worker.*.log + +# Ray internal logs (on the head node) +ls ~/ray/session_*/logs/ +``` + +### Access Ray Dashboard + +The Ray dashboard provides a web interface for monitoring: + +```bash +# Find the head node hostname +qstat -j | grep exec_host + +# Access dashboard at: +# http://:8265 +``` + +## Troubleshooting + +### Workers Cannot Connect to Head Node + +**Symptoms**: Workers fail to connect, logs show connection errors + +**Solutions**: +1. Verify network connectivity: + ```bash + # From a worker node + telnet 6379 + ``` + +2. Check firewall settings: + ```bash + # Ensure port 6379 is open + ``` + +3. Verify head node is running: + ```bash + qstat -j + ``` + +### Jobs Stuck in Queue + +**Symptoms**: Jobs remain in "qw" state + +**Solutions**: +1. Check resource availability: + ```bash + qhost + qstat -g c + ``` + +2. Verify queue configuration: + ```bash + qconf -sq + ``` + +3. Reduce resource requests (memory, slots) + +### Out of Memory Errors + +**Symptoms**: Jobs die with OOM errors in logs + +**Solutions**: +1. Increase memory request: + ```bash + qsub -l mem_free=32G ... + ``` + +2. Reduce workload per task + +3. Check actual memory usage: + ```bash + qacct -j + ``` + +### Ray Connection Timeout + +**Symptoms**: `ray.init()` times out + +**Solutions**: +1. Verify Ray is running on head node: + ```bash + # On head node + ray status + ``` + +2. Check head node logs: + ```bash + cat ray_head.log + ``` + +3. Ensure using correct address: + ```bash + cat ray_cluster_info.txt + ``` + +## Best Practices + +1. 
**Resource Planning**: + - Head node: 4+ cores, 16+ GB RAM + - Workers: 2+ cores, 8+ GB RAM + - Adjust based on workload + +2. **Network Configuration**: + - Ensure low latency between nodes + - Use high-speed interconnect if available + +3. **Job Arrays**: + - Use SGE job arrays for launching multiple identical workers + - More efficient than individual job submissions + +4. **Cleanup**: + - Always shutdown clusters when done + - Use `stop_ray_cluster.sh` or Ctrl+C in Go app + +5. **Logging**: + - Monitor logs for errors + - Keep logs for debugging + - Use unique log names for each cluster + +6. **Testing**: + - Start with small clusters (1-2 workers) + - Scale up after verifying connectivity + - Test with simple workloads first + +## Architecture Diagram + +``` +┌─────────────────────────────────────────────────────────┐ +│ SGE Cluster │ +│ │ +│ ┌──────────────┐ │ +│ │ SGE Qmaster │ (Job Scheduler) │ +│ └──────┬───────┘ │ +│ │ │ +│ │ Submits Jobs │ +│ │ │ +│ ┌────▼────────────────────────────────┐ │ +│ │ │ │ +│ ┌─▼────────────┐ ┌──────────────▼──┐ │ +│ │ Exec Host 1 │ │ Exec Host 2 │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ ┌──────────────┐│ │ +│ │ │ Ray Head │◄├─────────┼─┤ Ray Worker 1 ││ │ +│ │ │ (qsub) │ │ 6379 │ │ (qsub) ││ │ +│ │ └──────────┘ │ │ └──────────────┘│ │ +│ │ │ │ ┌──────────────┐│ │ +│ │ │◄────────┼─┤ Ray Worker 2 ││ │ +│ │ │ │ │ (qsub) ││ │ +│ └──────────────┘ │ └──────────────┘│ │ +│ └──────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────┘ + +User Application (Python/Go) + │ + │ ray.init() + ▼ + Ray Head ─────► Ray Workers + (Port 6379) +``` + +## Integration with Existing Workflows + +### Submitting Ray Jobs via SGE + +You can wrap Ray applications in SGE jobs: + +```bash +#!/bin/bash +#$ -N my-ray-job +#$ -cwd +#$ -j y + +# Connect to existing Ray cluster +export RAY_ADDRESS=":6379" + +# Run your application +python my_ray_app.py +``` + +### Auto-scaling (Advanced) + +For dynamic worker scaling based on load, you can: +1. Monitor Ray metrics +2. Use `qsub` to add workers +3. Use `qdel` to remove idle workers + +Example monitoring script: +```bash +# Check Ray load +ray status --address=$RAY_ADDRESS + +# Add workers if needed +if [ $LOAD -gt 80 ]; then + qsub -t 1-5 ... ray start --address=$RAY_ADDRESS +fi +``` + +## Further Reading + +- [Complete Integration Guide](../../docs/ray-integration.md) +- [Ray Documentation](https://docs.ray.io/) +- [SGE/Grid Engine](https://github.com/hpc-gridware/clusterscheduler) +- [go-clusterscheduler API](../../README.md) + +## Support + +For issues or questions: +- SGE Integration: Open an issue in this repository +- Ray Core: [Ray GitHub](https://github.com/ray-project/ray) +- Grid Engine: [Gridware Cluster Scheduler](https://github.com/hpc-gridware/clusterscheduler) diff --git a/examples/ray/example_ray_job.py b/examples/ray/example_ray_job.py new file mode 100755 index 0000000..dc5c888 --- /dev/null +++ b/examples/ray/example_ray_job.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +Example Ray application to run on SGE cluster + +This example demonstrates how to connect to a Ray cluster +running on SGE and execute distributed tasks. + +Usage: + 1. Start Ray cluster with start_ray_cluster.sh + 2. Get the Ray address from ray_cluster_info.txt + 3. Run: python example_ray_job.py --address :6379 +""" + +import argparse +import time +import ray +import numpy as np + + +@ray.remote +def compute_pi_sample(n_samples): + """ + Monte Carlo estimation of Pi using random sampling. 
+ Each task generates n_samples random points. + Uses vectorized operations for better performance. + """ + # Generate random points efficiently using numpy + points = np.random.random((n_samples, 2)) + # Calculate distances from origin + distances_squared = np.sum(points**2, axis=1) + # Count points inside unit circle + inside_circle = np.sum(distances_squared <= 1) + return inside_circle + + +@ray.remote +def matrix_multiply(matrix_a, matrix_b): + """ + Simple matrix multiplication task. + """ + return np.dot(matrix_a, matrix_b) + + +@ray.remote +class Counter: + """ + Example Ray actor for stateful computation. + """ + def __init__(self): + self.count = 0 + + def increment(self): + self.count += 1 + return self.count + + def get_count(self): + return self.count + + +def estimate_pi(n_tasks=100, samples_per_task=1000000): + """ + Estimate Pi using distributed Monte Carlo sampling. + """ + print(f"\nEstimating Pi using {n_tasks} tasks...") + print(f"Each task samples {samples_per_task} points") + + start_time = time.time() + + # Submit tasks in parallel + futures = [compute_pi_sample.remote(samples_per_task) for _ in range(n_tasks)] + + # Gather results + results = ray.get(futures) + + # Calculate Pi estimate + total_inside = sum(results) + total_samples = n_tasks * samples_per_task + pi_estimate = 4.0 * total_inside / total_samples + + elapsed = time.time() - start_time + + print(f"Pi estimate: {pi_estimate:.6f}") + print(f"Actual Pi: {np.pi:.6f}") + print(f"Error: {abs(pi_estimate - np.pi):.6f}") + print(f"Time taken: {elapsed:.2f} seconds") + + return pi_estimate + + +def matrix_operations(): + """ + Demonstrate distributed matrix operations. + """ + print("\nPerforming distributed matrix operations...") + + # Create random matrices + size = 1000 + n_operations = 20 + + print(f"Matrix size: {size}x{size}") + print(f"Number of operations: {n_operations}") + + start_time = time.time() + + # Submit matrix multiplication tasks + futures = [] + for i in range(n_operations): + matrix_a = np.random.rand(size, size) + matrix_b = np.random.rand(size, size) + futures.append(matrix_multiply.remote(matrix_a, matrix_b)) + + # Wait for all results + results = ray.get(futures) + + elapsed = time.time() - start_time + + print(f"Completed {n_operations} matrix multiplications") + print(f"Time taken: {elapsed:.2f} seconds") + print(f"Throughput: {n_operations/elapsed:.2f} operations/second") + + +def actor_example(): + """ + Demonstrate Ray actors for stateful computation. 
+ """ + print("\nDemonstrating Ray actors...") + + # Create multiple counter actors + counters = [Counter.remote() for _ in range(5)] + + # Increment each counter multiple times + futures = [] + for counter in counters: + for _ in range(10): + futures.append(counter.increment.remote()) + + # Wait for all increments + ray.get(futures) + + # Get final counts + counts = ray.get([counter.get_count.remote() for counter in counters]) + + print(f"Created {len(counters)} counter actors") + print(f"Final counts: {counts}") + print(f"Total count: {sum(counts)}") + + +def main(): + parser = argparse.ArgumentParser(description='Example Ray application on SGE') + parser.add_argument('--address', type=str, default='auto', + help='Ray cluster address (e.g., head_host:6379)') + parser.add_argument('--no-pi', action='store_true', + help='Skip Pi estimation') + parser.add_argument('--no-matrix', action='store_true', + help='Skip matrix operations') + parser.add_argument('--no-actor', action='store_true', + help='Skip actor example') + + args = parser.parse_args() + + print("=" * 60) + print("Ray on SGE - Example Application") + print("=" * 60) + + # Connect to Ray cluster + print(f"\nConnecting to Ray cluster at: {args.address}") + + try: + ray.init(address=args.address, ignore_reinit_error=True) + except Exception as e: + print(f"Error connecting to Ray cluster: {e}") + print("\nMake sure:") + print("1. Ray cluster is running (check ray_cluster_info.txt)") + print("2. You're on a host that can connect to the head node") + print("3. The address is correct (format: hostname:port)") + return 1 + + # Display cluster info + print("\nCluster Information:") + print(f"Available resources: {ray.available_resources()}") + print(f"Cluster nodes: {len(ray.nodes())}") + + # Run examples + try: + if not args.no_pi: + estimate_pi(n_tasks=50, samples_per_task=1000000) + + if not args.no_matrix: + matrix_operations() + + if not args.no_actor: + actor_example() + + print("\n" + "=" * 60) + print("All tasks completed successfully!") + print("=" * 60) + + except Exception as e: + print(f"\nError during execution: {e}") + return 1 + + finally: + # Note: Don't shutdown Ray if using an existing cluster + # ray.shutdown() + pass + + return 0 + + +if __name__ == '__main__': + exit(main()) diff --git a/examples/ray/ray_cluster_manager.go b/examples/ray/ray_cluster_manager.go new file mode 100644 index 0000000..0f0b5f0 --- /dev/null +++ b/examples/ray/ray_cluster_manager.go @@ -0,0 +1,301 @@ +/*___INFO__MARK_BEGIN__*/ +/************************************************************************* +* Copyright 2024 HPC-Gridware GmbH +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* +************************************************************************/ +/*___INFO__MARK_END__*/ + +package main + +import ( + "context" + "fmt" + "os" + "os/exec" + "os/signal" + "strings" + "syscall" + "time" + + qstat "github.com/hpc-gridware/go-clusterscheduler/pkg/qstat/v9.0" + qsub "github.com/hpc-gridware/go-clusterscheduler/pkg/qsub/v9.0" +) + +// RayClusterManager manages a Ray cluster using SGE as the backend scheduler +type RayClusterManager struct { + qsub qsub.Qsub + qstat qstat.QStat + headJobID int64 + workerJobs []int64 +} + +// NewRayClusterManager creates a new RayClusterManager +func NewRayClusterManager() (*RayClusterManager, error) { + qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{}) + if err != nil { + return nil, fmt.Errorf("failed to create qsub client: %w", err) + } + + qst, err := qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{}) + if err != nil { + return nil, fmt.Errorf("failed to create qstat client: %w", err) + } + + return &RayClusterManager{ + qsub: qs, + qstat: qst, + workerJobs: make([]int64, 0), + }, nil +} + +// StartHead starts the Ray head node as an SGE job +func (r *RayClusterManager) StartHead(ctx context.Context, port int, memory string) (int64, error) { + rayCommand := fmt.Sprintf("ray start --head --port=%d --dashboard-host=0.0.0.0 --block", port) + + jobId, _, err := r.qsub.Submit(ctx, qsub.JobOptions{ + Command: "bash", + CommandArgs: []string{"-c", rayCommand}, + Binary: qsub.ToPtr(true), + JobName: qsub.ToPtr("ray-head"), + StdOut: []string{"ray_head.log"}, + StdErr: []string{"ray_head.err"}, + ScopedResources: qsub.SimpleLRequest(map[string]string{"mem_free": memory}), + Shell: qsub.ToPtr(false), + MergeStdOutErr: qsub.ToPtr(true), + }) + + if err != nil { + return 0, fmt.Errorf("failed to submit head node job: %w", err) + } + + r.headJobID = jobId + fmt.Printf("✓ Ray head node submitted with job ID: %d\n", jobId) + fmt.Printf(" Monitor with: qstat -j %d\n", jobId) + fmt.Printf(" View logs: tail -f ray_head.log\n") + return jobId, nil +} + +// GetHeadNodeAddress retrieves the hostname where the head node is running +func (r *RayClusterManager) GetHeadNodeAddress(ctx context.Context) (string, error) { + if r.headJobID == 0 { + return "", fmt.Errorf("head node not started") + } + + // Poll for the job to be running + for i := 0; i < 30; i++ { + jobs, err := r.qstat.ViewJobsOfUser([]string{}) + if err != nil { + return "", fmt.Errorf("failed to get job info: %w", err) + } + + for _, job := range jobs { + if job.JobID == int(r.headJobID) && job.State == "r" { + // Extract hostname from queue info + // Format is typically: queue@hostname + if job.Queue != "" { + // Extract hostname from queue@hostname format + parts := strings.Split(job.Queue, "@") + if len(parts) >= 2 { + return parts[1], nil + } + return job.Queue, nil + } + } + } + + time.Sleep(2 * time.Second) + } + + return "", fmt.Errorf("timeout waiting for head node to start") +} + +// AddWorkers adds Ray worker nodes as SGE jobs +func (r *RayClusterManager) AddWorkers(ctx context.Context, count int, headAddress string, port int, memory string) error { + rayAddress := fmt.Sprintf("%s:%d", headAddress, port) + rayCommand := fmt.Sprintf("ray start --address=%s --block", rayAddress) + + jobId, _, err := r.qsub.Submit(ctx, qsub.JobOptions{ + Command: "bash", + CommandArgs: []string{"-c", rayCommand}, + Binary: qsub.ToPtr(true), + JobName: qsub.ToPtr("ray-worker"), + StdOut: []string{"ray_worker.$TASK_ID.log"}, + StdErr: []string{"ray_worker.$TASK_ID.err"}, + 
JobArray: qsub.ToPtr(fmt.Sprintf("1-%d", count)), + ScopedResources: qsub.SimpleLRequest(map[string]string{"mem_free": memory}), + Shell: qsub.ToPtr(false), + MergeStdOutErr: qsub.ToPtr(true), + }) + + if err != nil { + return fmt.Errorf("failed to submit worker jobs: %w", err) + } + + r.workerJobs = append(r.workerJobs, jobId) + fmt.Printf("✓ Submitted %d Ray workers with job array ID: %d\n", count, jobId) + fmt.Printf(" Monitor with: qstat -j %d\n", jobId) + return nil +} + +// GetClusterStatus returns the status of all Ray cluster jobs +func (r *RayClusterManager) GetClusterStatus(ctx context.Context) error { + jobs, err := r.qstat.ViewJobsOfUser([]string{}) + if err != nil { + return fmt.Errorf("failed to get job info: %w", err) + } + + fmt.Println("\n=== Ray Cluster Status ===") + + // Head node status + if r.headJobID != 0 { + found := false + for _, job := range jobs { + if job.JobID == int(r.headJobID) { + fmt.Printf("Head Node (Job %d): State=%s, Queue=%s\n", + r.headJobID, job.State, job.Queue) + found = true + break + } + } + if !found { + fmt.Printf("Head Node (Job %d): Completed or not found\n", r.headJobID) + } + } + + // Worker status + for _, workerJobID := range r.workerJobs { + workerCount := 0 + for _, job := range jobs { + if job.JobID == int(workerJobID) { + workerCount++ + } + } + if workerCount > 0 { + fmt.Printf("Workers (Job Array %d): %d tasks running\n", workerJobID, workerCount) + } + } + + fmt.Println("=========================\n") + return nil +} + +// Shutdown terminates all Ray cluster jobs using qdel command +func (r *RayClusterManager) Shutdown(ctx context.Context) error { + fmt.Println("\nShutting down Ray cluster...") + + // Delete worker jobs first + for _, jobId := range r.workerJobs { + cmd := exec.Command("qdel", fmt.Sprintf("%d", jobId)) + if err := cmd.Run(); err != nil { + fmt.Printf("⚠ Warning: failed to delete worker job %d: %v\n", jobId, err) + } else { + fmt.Printf("✓ Deleted worker job array %d\n", jobId) + } + } + + // Delete head node + if r.headJobID != 0 { + cmd := exec.Command("qdel", fmt.Sprintf("%d", r.headJobID)) + if err := cmd.Run(); err != nil { + fmt.Printf("⚠ Warning: failed to delete head job %d: %v\n", r.headJobID, err) + } else { + fmt.Printf("✓ Deleted head node job %d\n", r.headJobID) + } + } + + fmt.Println("✓ Ray cluster shutdown complete") + return nil +} + +func main() { + // Configuration + const ( + rayPort = 6379 + headMemory = "16G" + workerMemory = "8G" + workerCount = 5 + ) + + ctx := context.Background() + + // Create cluster manager + manager, err := NewRayClusterManager() + if err != nil { + fmt.Printf("Error creating cluster manager: %v\n", err) + os.Exit(1) + } + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Start head node + fmt.Println("Starting Ray cluster...") + headJobID, err := manager.StartHead(ctx, rayPort, headMemory) + if err != nil { + fmt.Printf("Error starting head node: %v\n", err) + os.Exit(1) + } + + // Wait for head node to be scheduled and get its address + fmt.Println("\nWaiting for head node to start...") + time.Sleep(10 * time.Second) + + headAddress, err := manager.GetHeadNodeAddress(ctx) + if err != nil { + fmt.Printf("Error getting head node address: %v\n", err) + fmt.Println("\nPlease manually determine the head node hostname:") + fmt.Printf(" 1. Run: qstat -j %d\n", headJobID) + fmt.Println(" 2. Look for 'exec_host' line and note the hostname") + fmt.Println(" 3. 
Re-run this tool or manually submit workers") + manager.Shutdown(ctx) + os.Exit(1) + } else { + fmt.Printf("✓ Head node running at: %s:%d\n", headAddress, rayPort) + } + + // Start workers + fmt.Println("\nStarting Ray workers...") + if err := manager.AddWorkers(ctx, workerCount, headAddress, rayPort, workerMemory); err != nil { + fmt.Printf("Error adding workers: %v\n", err) + manager.Shutdown(ctx) + os.Exit(1) + } + + // Display status + time.Sleep(5 * time.Second) + manager.GetClusterStatus(ctx) + + // Display connection information + fmt.Println("=== Ray Cluster Information ===") + fmt.Printf("Head node job ID: %d\n", headJobID) + fmt.Printf("Ray address: %s:%d\n", headAddress, rayPort) + fmt.Printf("Dashboard: http://%s:8265\n", headAddress) + fmt.Println("\nTo connect from Python:") + fmt.Println(" import ray") + fmt.Printf(" ray.init(address='%s:%d')\n", headAddress, rayPort) + fmt.Println("\nPress Ctrl+C to shutdown the cluster...") + fmt.Println("================================\n") + + // Wait for shutdown signal + <-sigChan + fmt.Println("\nReceived shutdown signal...") + + // Shutdown cluster + if err := manager.Shutdown(ctx); err != nil { + fmt.Printf("Error during shutdown: %v\n", err) + os.Exit(1) + } +} diff --git a/examples/ray/requirements.txt b/examples/ray/requirements.txt new file mode 100644 index 0000000..079d410 --- /dev/null +++ b/examples/ray/requirements.txt @@ -0,0 +1,3 @@ +# Python requirements for Ray on SGE examples +ray[default]>=2.0.0 +numpy>=1.20.0 diff --git a/examples/ray/start_ray_cluster.sh b/examples/ray/start_ray_cluster.sh new file mode 100755 index 0000000..dfb27e2 --- /dev/null +++ b/examples/ray/start_ray_cluster.sh @@ -0,0 +1,118 @@ +#!/bin/bash +# +# Script to start a Ray cluster on SGE +# Usage: ./start_ray_cluster.sh [num_workers] +# + +set -e + +NUM_WORKERS=${1:-5} +RAY_PORT=6379 +HEAD_MEMORY="16G" +WORKER_MEMORY="8G" + +echo "Starting Ray cluster with $NUM_WORKERS workers..." + +# Submit Ray head node +echo "Submitting Ray head node..." +HEAD_JOB_OUTPUT=$(qsub -b y -cwd -j y -o ray_head.log -N ray-head \ + -l mem_free=$HEAD_MEMORY \ + ray start --head --port=$RAY_PORT --dashboard-host=0.0.0.0 --block 2>&1) + +# Extract job ID - try multiple patterns for different SGE versions +# Use POSIX-compliant regex for better portability +HEAD_JOB_ID=$(echo "$HEAD_JOB_OUTPUT" | sed -n 's/.*Your job \([0-9][0-9]*\).*/\1/p' | head -1) +if [ -z "$HEAD_JOB_ID" ]; then + HEAD_JOB_ID=$(echo "$HEAD_JOB_OUTPUT" | sed -n 's/.*Your job-ID is \([0-9][0-9]*\).*/\1/p' | head -1) +fi +if [ -z "$HEAD_JOB_ID" ]; then + HEAD_JOB_ID=$(echo "$HEAD_JOB_OUTPUT" | grep -o '[0-9][0-9]*' | head -1) +fi + +if [ -z "$HEAD_JOB_ID" ]; then + echo "Error: Could not extract job ID from qsub output:" + echo "$HEAD_JOB_OUTPUT" + exit 1 +fi + +echo "Head node submitted with job ID: $HEAD_JOB_ID" +echo "Waiting for head node to start..." + +# Wait for head node to start +sleep 15 + +# Get head node hostname +HEAD_HOST="" +MAX_ATTEMPTS=30 +ATTEMPT=0 + +while [ $ATTEMPT -lt $MAX_ATTEMPTS ]; do + HEAD_HOST=$(qstat -j $HEAD_JOB_ID 2>/dev/null | grep "exec_host" | awk -F'[@:]' '{print $2}' || echo "") + + if [ ! -z "$HEAD_HOST" ]; then + echo "Head node running on: $HEAD_HOST" + break + fi + + ATTEMPT=$((ATTEMPT + 1)) + sleep 2 +done + +if [ -z "$HEAD_HOST" ]; then + echo "Error: Could not determine head node hostname" + echo "Check the job status with: qstat -j $HEAD_JOB_ID" + exit 1 +fi + +# Submit Ray workers as job array +echo "Submitting $NUM_WORKERS Ray workers..." 
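+# The workers are submitted as a single SGE job array (-t 1-$NUM_WORKERS).
+# $TASK_ID in the -o path is escaped so the shell leaves it untouched and SGE
+# expands it per array task, giving each worker its own log file.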
+WORKER_JOB_OUTPUT=$(qsub -b y -cwd -j y -o ray_worker.\$TASK_ID.log -N ray-worker \ + -t 1-$NUM_WORKERS \ + -l mem_free=$WORKER_MEMORY \ + ray start --address=$HEAD_HOST:$RAY_PORT --block 2>&1) + +# Extract job ID - try multiple patterns for different SGE versions +# Use POSIX-compliant regex for better portability +WORKER_JOB_ID=$(echo "$WORKER_JOB_OUTPUT" | sed -n 's/.*Your job-array \([0-9][0-9]*\).*/\1/p' | head -1) +if [ -z "$WORKER_JOB_ID" ]; then + WORKER_JOB_ID=$(echo "$WORKER_JOB_OUTPUT" | sed -n 's/.*Your job \([0-9][0-9]*\).*/\1/p' | head -1) +fi +if [ -z "$WORKER_JOB_ID" ]; then + WORKER_JOB_ID=$(echo "$WORKER_JOB_OUTPUT" | grep -o '[0-9][0-9]*' | head -1) +fi + +if [ -z "$WORKER_JOB_ID" ]; then + echo "Warning: Could not extract worker job ID from qsub output:" + echo "$WORKER_JOB_OUTPUT" + echo "Workers may not have been submitted correctly." +fi + +echo "Workers submitted with job array ID: $WORKER_JOB_ID" + +# Save cluster information +cat > ray_cluster_info.txt < # to delete specific jobs" + exit 1 +fi + +# Extract job IDs from cluster info +HEAD_JOB_ID=$(grep "Head Job ID:" ray_cluster_info.txt | awk '{print $NF}') +WORKER_JOB_ID=$(grep "Worker Job Array ID:" ray_cluster_info.txt | awk '{print $NF}') + +echo "Shutting down Ray cluster..." +echo "Head Job ID: $HEAD_JOB_ID" +echo "Worker Job Array ID: $WORKER_JOB_ID" + +# Delete worker jobs +if [ ! -z "$WORKER_JOB_ID" ]; then + echo "Deleting worker jobs..." + qdel $WORKER_JOB_ID 2>/dev/null || echo "Worker jobs already deleted or not found" +fi + +# Delete head job +if [ ! -z "$HEAD_JOB_ID" ]; then + echo "Deleting head job..." + qdel $HEAD_JOB_ID 2>/dev/null || echo "Head job already deleted or not found" +fi + +echo "Ray cluster shutdown complete" +echo "" +echo "Cluster information archived to: ray_cluster_info.txt.bak" +mv ray_cluster_info.txt ray_cluster_info.txt.bak diff --git a/examples/resources/go.sum b/examples/resources/go.sum index c4863de..632ba28 100644 --- a/examples/resources/go.sum +++ b/examples/resources/go.sum @@ -12,10 +12,10 @@ github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/examples/testexample/go.sum b/examples/testexample/go.sum index dddf382..1e024f1 100644 --- a/examples/testexample/go.sum +++ b/examples/testexample/go.sum @@ -26,10 +26,10 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod 
h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= diff --git a/go.mod b/go.mod index d1d867a..5f88139 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.23.1 require ( github.com/goccy/go-json v0.10.4 - github.com/jackc/pgx/v5 v5.7.5 github.com/mark3labs/mcp-go v0.33.0 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.1 @@ -27,17 +26,12 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect github.com/google/uuid v1.6.0 // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/spf13/cast v1.7.1 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect - golang.org/x/crypto v0.37.0 // indirect golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect golang.org/x/tools v0.29.0 // indirect diff --git a/go.sum b/go.sum index c7ec237..eeaae6e 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -18,14 +17,6 @@ github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgY github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= -github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= -github.com/jackc/pgx/v5 v5.7.5/go.mod 
h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -42,9 +33,6 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= @@ -73,14 +61,10 @@ go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCt go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= -golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= -golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= -golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= @@ -92,6 +76,5 @@ google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=