Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 61 additions & 48 deletions RFDs/0001-job-worker-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,32 @@ To showcase a secure and efficient approach to running and managing arbitrary pr
### Components

1. **Library**: A reusable job management library
2. **API**: A gRPC API server
3. **Client**: A command-line interface (CLI) client
2. **API**: A gRPC API server that wraps the functionality of the library
3. **Client**: A command-line interface (CLI) client to interact with the API via gRPC

### Key Features

- Job management (start, stop, query status, get output)
- Real-time output streaming with support for multiple concurrent clients
- Resource control using cgroups (CPU, Memory, Disk I/O)
- Secure communication using mTLS
- Simple role-based authorization scheme
- Simple authorization scheme: Users can only access jobs they created
- Rudimentary process isolation will be provided using PID namespace so that a job is not able to access other jobs
- gRPC API for client-server communication

#### Key Considerations
- This design is not a distributed system and is only optimized to run on a single linux OS
- If needed, the components of the library can be split into a distributed microservices framework inthe future
- The machine has enough resources to run all the jobs
- There is enough memory for stdout and stderr from all jobs
- This is as secure as we can make it, but without the following features:
- Jobs' filesystem access could be controlled by creating a pivot root and bind mounts to mount commonly needed directories like "/bin", "/lib", "/lib64", "/usr" etc. Not implenting this in the interest of time.
- Jobs are run as soon as they are received. No Scheduling is done
- cgroups v2 is supported by the the system
- Resource monitoring is out of scope
- Job timeouts are not implemented
- Automated sercets generation and renewal will not be implented. For this PoC, secrets and keys will be pregenerated
- Commands will not be sanitized for this PoC

## Detailed Design

Expand All @@ -41,38 +52,49 @@ To showcase a secure and efficient approach to running and managing arbitrary pr
#### a. In-memory Job Logger
- Implement a thread-safe buffer to store output from runnning the command
- Use a read-write mutex to ensure thread-safety for concurrent read/write operations
- Provide methods for adding logs, and getting a stream Reader to the log
- Provide methods for adding logs, and getting threadsafe and efficient readers to the log

```go
// Logger represents an in-memory buffer for storing job logs.

type Logger struct {
mu sync.RWMutex
logs bytes.Buffer
// This condition variable will be used to signal readers when new data is available
cond *sync.Cond
logs *bytes.Buffer
}

// NewLogger creates a new Logger instance
func NewLogger() *Logger

// AddLog appends new output from processes to the logs buffer
func (l *Logger) AddLog(output []byte)
// Write appends new output from processes to the logs buffer
// This conforms to io.Writer
func (l *Logger) Write(p []byte) (n int, err error)

// Gets an instance of io.Reader that is able to read the data from the internal logs buffer
// This is basically a logReader that conforms to io.Reader
// Since io.Reader is read only, it will prevent the readers from modifying the logs buffer
func (l *Logger) GetLogReader() io.Reader

// Size returns the current size of the logs buffer
func (l *Logger) Size() int
// logReader is a thread-safe reader for the Logger buffer.
type logReader struct {
l *Logger
offset int
}
// logReader creates a new logReader. Internally, it points to the Logger instance
// and accesses the same buffer.
func logReader(l *Logger) *logReader

// Read reads data from the buffer in a thread-safe manner.
// It conforms to io.Reader
func (lr *logReader) Read(p []byte) (n int, err error)

```

#### b. Job Executor
- Use `os/exec` package to create and manage child processes
- Implement non-blocking I/O for stdout/stderr capture using `io.Pipe()`
- Use a separate goroutine for each of stdout and stderr to prevent blocking
- Implement a context-based cancellation mechanism for graceful job termination
- Monitor resource usage using cgroups
- Provide callbacks for resource usage updates
- Use a Go channel to periodically read current resource usage (memory, I/O, and CPU usage) from the associated cgroup

```go
// Executor is responsible for running a job and managing its lifecycle.
Expand All @@ -92,7 +114,8 @@ type Executor struct {
Command string
Args []string
cmd *exec.Cmd
logger *logger.Logger
outputLogger *logger.Logger
errorLogger *logger.Logger
resourceManager *resourcemanager.ResourceManager
// Context to propagate state and cancellations
ctx context.Context
Expand All @@ -116,9 +139,11 @@ func (e *Executor) Stop() error
// Get status of the job.
func (e *Executor) GetStatus() JobStatus

// Access the logger for the job
// Can be used to subscribe to output logs
func (e *Executor) GetLogger() *logger.Logger
// Access the stdout for the job
func (e *Executor) GetOutputReader() io.Reader

// Access the stderr for the job
func (e *Executor) GetErrorReader() io.Reader

func (e *Executor) GetStartTime() time.Time

Expand All @@ -141,20 +166,17 @@ func (e *Executor) Wait() error
- Create and manage cgroups for a single job
- Set and enforce resource limits
- Implement resource allocation strategies:
- Use CPU shares for fair CPU allocation
- Set memory limits with both soft and hard limits
- Use CPU weight for CPU allocation
- Set memory limits
- Configure I/O weight for disk I/O prioritization
```go
// ResourceManager monitors and controls resource usage for a job.

// ResourceLimits defines the resource constraints for a job.
type ResourceLimits struct {
CPUShares uint64 // CPU shares (relative weight)
MemoryLimit uint64 // Memory limit in bytes
IOReadBPS uint64 // I/O read rate limit in bytes per second
IOWriteBPS uint64 // I/O write rate limit in bytes per second
IOReadIOPS uint64 // I/O read rate limit in operations per second
IOWriteIOPS uint64 // I/O write rate limit in operations per second
CPUWeight uint64 // CPU weight (range of 1-10000)
MemoryLimit uint64 // Memory limit in bytes
IOMaxBPS uint64 // I/O maximum bandwidth rate in bytes per second
}

// ResourceManager represents the resources associated with a single job.
Expand Down Expand Up @@ -199,7 +221,7 @@ service JobWorker {
rpc GetJobStatus(JobId) returns (JobStatusResponse) {}

// Streams the output of a job.
rpc GetJobOutput(stream JobOutputResquest) returns (stream JobOutputResponse) {}
rpc GetJobOutput(JobOutputRequest) returns (stream JobOutputResponse) {}
}

// StartJobRequest contains the information needed to start a new job.
Expand Down Expand Up @@ -247,9 +269,10 @@ message StopJobResponse {
message JobStatusResponse {
// The current status of the job.
JobStatus status = 1;
int32 errorcode = 2;
string error = 3;
}


// JobOutputRequest requests to the number of bytes to read from the output of a job.
// The server will keep streaming bytes_to_read number of bytes, until the client sends
// another request to change the number of bytes_to_read
Expand All @@ -258,10 +281,12 @@ message JobOutputRequest {
unit64 bytes_to_read = 2;
}

// JobOutputResponse contains a chunk of output data from a job.
// JobOutputResponse contains a chunk of output data from a job that can either be a
message JobOutputResponse {
// output data from the job.
bytes data = 1;
oneof output {
bytes stdout = 1;
bytes stderr = 2;
}
}

// ResourceLimits defines the resource constraints for a job.
Expand Down Expand Up @@ -339,9 +364,9 @@ Job ID: 550e8400-e29b-41d4-a716-446655440000
4. JobExecutor passes cgroup path to ResourceManager to set limits
5. ResourceManager sets resource limits
6. Executor runs job within cgroup constraints
7. Output and resource usage are streamed to logger
7. StdOut and StdErr are streamed to loggers
8. Client requests output log stream
9. Logger streams logs to client(s)
9. Loggers stream logs to client(s)

Here's a visual representation of the job creation flow:
![alt text](job_worker.png)
Expand Down Expand Up @@ -372,34 +397,22 @@ All jobs are run using cancellable contexts.

1. **Process Termination**: Implement forceful termination for unresponsive processes. Kill processes that don't respond to standard termination signals.

2. **Resource Exhaustion**:
- Monitor total resource usage and implement OOM killer
- Keep track of total resource usage and only start processes from the queue if there are enough resources
- Configure resources at the root level cgroup
- Implement a hard limit by enabling the OOM killer, with a configurable percentage of the input Resource Limit

3. **Network Interruptions**:
2. **Network Interruptions**:
- Ensure graceful handling of disconnections and reconnections
- Stream logs from the beginning each time a streaming connection is established, as logs are stored in the Logger module

4. **Large Output Streams**:
3. **Large Output Streams**:
- Consider persistent storage for large outputs
- For this PoC, assume infinite memory, but note the need for a storage solution in production

5. **Concurrent Access**:
4. **Concurrent Access**:
- Use in-memory logger to support multiple clients
- Stream all output from the job to each client through goroutines when initiating the streaming gRPC to retrieve output

6. **Job Timeouts**: Implement configurable job timeouts

7. **Invalid Commands**: Consider input sanitization and process sandboxing (future enhancement)

8. **Certificate Expiration**: Implement certificate renewal policy (future enhancement)

## Future Enhancements

- Distributed architecture for high availability
- Container-based job isolation
- Better Job isolation using pivot root and bind mounts, and namespaces
- Enhanced security measures (e.g., input sanitization, process sandboxing)
- Certificate renewal automation
- Persistent storage solution for large output streams
Expand Down