diff --git a/RFDs/0001-job-worker-service.md b/RFDs/0001-job-worker-service.md index b95029b..34e841f 100644 --- a/RFDs/0001-job-worker-service.md +++ b/RFDs/0001-job-worker-service.md @@ -18,8 +18,8 @@ To showcase a secure and efficient approach to running and managing arbitrary pr ### Components 1. **Library**: A reusable job management library -2. **API**: A gRPC API server -3. **Client**: A command-line interface (CLI) client +2. **API**: A gRPC API server that wraps the functionality of the library +3. **Client**: A command-line interface (CLI) client to interact with the API via gRPC ### Key Features @@ -27,12 +27,23 @@ To showcase a secure and efficient approach to running and managing arbitrary pr - Real-time output streaming with support for multiple concurrent clients - Resource control using cgroups (CPU, Memory, Disk I/O) - Secure communication using mTLS -- Simple role-based authorization scheme +- Simple authorization scheme: Users can only access jobs they created +- Rudimentary process isolation will be provided using PID namespace so that a job is not able to access other jobs - gRPC API for client-server communication #### Key Considerations - This design is not a distributed system and is only optimized to run on a single linux OS - If needed, the components of the library can be split into a distributed microservices framework inthe future +- The machine has enough resources to run all the jobs +- There is enough memory for stdout and stderr from all jobs +- This is as secure as we can make it, but without the following features: + - Jobs' filesystem access could be controlled by creating a pivot root and bind mounts to mount commonly needed directories like "/bin", "/lib", "/lib64", "/usr" etc. Not implenting this in the interest of time. +- Jobs are run as soon as they are received. No Scheduling is done +- cgroups v2 is supported by the the system +- Resource monitoring is out of scope +- Job timeouts are not implemented +- Automated sercets generation and renewal will not be implented. For this PoC, secrets and keys will be pregenerated +- Commands will not be sanitized for this PoC ## Detailed Design @@ -41,38 +52,49 @@ To showcase a secure and efficient approach to running and managing arbitrary pr #### a. In-memory Job Logger - Implement a thread-safe buffer to store output from runnning the command - Use a read-write mutex to ensure thread-safety for concurrent read/write operations -- Provide methods for adding logs, and getting a stream Reader to the log +- Provide methods for adding logs, and getting threadsafe and efficient readers to the log ```go // Logger represents an in-memory buffer for storing job logs. type Logger struct { mu sync.RWMutex - logs bytes.Buffer + // This condition variable will be used to signal readers when new data is available + cond *sync.Cond + logs *bytes.Buffer } // NewLogger creates a new Logger instance func NewLogger() *Logger -// AddLog appends new output from processes to the logs buffer -func (l *Logger) AddLog(output []byte) +// Write appends new output from processes to the logs buffer +// This conforms to io.Writer +func (l *Logger) Write(p []byte) (n int, err error) // Gets an instance of io.Reader that is able to read the data from the internal logs buffer +// This is basically a logReader that conforms to io.Reader +// Since io.Reader is read only, it will prevent the readers from modifying the logs buffer func (l *Logger) GetLogReader() io.Reader -// Size returns the current size of the logs buffer -func (l *Logger) Size() int +// logReader is a thread-safe reader for the Logger buffer. +type logReader struct { + l *Logger + offset int +} +// logReader creates a new logReader. Internally, it points to the Logger instance +// and accesses the same buffer. +func logReader(l *Logger) *logReader + +// Read reads data from the buffer in a thread-safe manner. +// It conforms to io.Reader +func (lr *logReader) Read(p []byte) (n int, err error) ``` #### b. Job Executor - Use `os/exec` package to create and manage child processes - Implement non-blocking I/O for stdout/stderr capture using `io.Pipe()` -- Use a separate goroutine for each of stdout and stderr to prevent blocking - Implement a context-based cancellation mechanism for graceful job termination -- Monitor resource usage using cgroups -- Provide callbacks for resource usage updates -- Use a Go channel to periodically read current resource usage (memory, I/O, and CPU usage) from the associated cgroup ```go // Executor is responsible for running a job and managing its lifecycle. @@ -92,7 +114,8 @@ type Executor struct { Command string Args []string cmd *exec.Cmd - logger *logger.Logger + outputLogger *logger.Logger + errorLogger *logger.Logger resourceManager *resourcemanager.ResourceManager // Context to propagate state and cancellations ctx context.Context @@ -116,9 +139,11 @@ func (e *Executor) Stop() error // Get status of the job. func (e *Executor) GetStatus() JobStatus -// Access the logger for the job -// Can be used to subscribe to output logs -func (e *Executor) GetLogger() *logger.Logger +// Access the stdout for the job +func (e *Executor) GetOutputReader() io.Reader + +// Access the stderr for the job +func (e *Executor) GetErrorReader() io.Reader func (e *Executor) GetStartTime() time.Time @@ -141,20 +166,17 @@ func (e *Executor) Wait() error - Create and manage cgroups for a single job - Set and enforce resource limits - Implement resource allocation strategies: - - Use CPU shares for fair CPU allocation - - Set memory limits with both soft and hard limits + - Use CPU weight for CPU allocation + - Set memory limits - Configure I/O weight for disk I/O prioritization ```go // ResourceManager monitors and controls resource usage for a job. // ResourceLimits defines the resource constraints for a job. type ResourceLimits struct { - CPUShares uint64 // CPU shares (relative weight) - MemoryLimit uint64 // Memory limit in bytes - IOReadBPS uint64 // I/O read rate limit in bytes per second - IOWriteBPS uint64 // I/O write rate limit in bytes per second - IOReadIOPS uint64 // I/O read rate limit in operations per second - IOWriteIOPS uint64 // I/O write rate limit in operations per second + CPUWeight uint64 // CPU weight (range of 1-10000) + MemoryLimit uint64 // Memory limit in bytes + IOMaxBPS uint64 // I/O maximum bandwidth rate in bytes per second } // ResourceManager represents the resources associated with a single job. @@ -199,7 +221,7 @@ service JobWorker { rpc GetJobStatus(JobId) returns (JobStatusResponse) {} // Streams the output of a job. - rpc GetJobOutput(stream JobOutputResquest) returns (stream JobOutputResponse) {} + rpc GetJobOutput(JobOutputRequest) returns (stream JobOutputResponse) {} } // StartJobRequest contains the information needed to start a new job. @@ -247,9 +269,10 @@ message StopJobResponse { message JobStatusResponse { // The current status of the job. JobStatus status = 1; + int32 errorcode = 2; + string error = 3; } - // JobOutputRequest requests to the number of bytes to read from the output of a job. // The server will keep streaming bytes_to_read number of bytes, until the client sends // another request to change the number of bytes_to_read @@ -258,10 +281,12 @@ message JobOutputRequest { unit64 bytes_to_read = 2; } -// JobOutputResponse contains a chunk of output data from a job. +// JobOutputResponse contains a chunk of output data from a job that can either be a message JobOutputResponse { - // output data from the job. - bytes data = 1; + oneof output { + bytes stdout = 1; + bytes stderr = 2; + } } // ResourceLimits defines the resource constraints for a job. @@ -339,9 +364,9 @@ Job ID: 550e8400-e29b-41d4-a716-446655440000 4. JobExecutor passes cgroup path to ResourceManager to set limits 5. ResourceManager sets resource limits 6. Executor runs job within cgroup constraints -7. Output and resource usage are streamed to logger +7. StdOut and StdErr are streamed to loggers 8. Client requests output log stream -9. Logger streams logs to client(s) +9. Loggers stream logs to client(s) Here's a visual representation of the job creation flow: ![alt text](job_worker.png) @@ -372,34 +397,22 @@ All jobs are run using cancellable contexts. 1. **Process Termination**: Implement forceful termination for unresponsive processes. Kill processes that don't respond to standard termination signals. -2. **Resource Exhaustion**: - - Monitor total resource usage and implement OOM killer - - Keep track of total resource usage and only start processes from the queue if there are enough resources - - Configure resources at the root level cgroup - - Implement a hard limit by enabling the OOM killer, with a configurable percentage of the input Resource Limit - -3. **Network Interruptions**: +2. **Network Interruptions**: - Ensure graceful handling of disconnections and reconnections - Stream logs from the beginning each time a streaming connection is established, as logs are stored in the Logger module -4. **Large Output Streams**: +3. **Large Output Streams**: - Consider persistent storage for large outputs - For this PoC, assume infinite memory, but note the need for a storage solution in production -5. **Concurrent Access**: +4. **Concurrent Access**: - Use in-memory logger to support multiple clients - Stream all output from the job to each client through goroutines when initiating the streaming gRPC to retrieve output -6. **Job Timeouts**: Implement configurable job timeouts - -7. **Invalid Commands**: Consider input sanitization and process sandboxing (future enhancement) - -8. **Certificate Expiration**: Implement certificate renewal policy (future enhancement) - ## Future Enhancements - Distributed architecture for high availability -- Container-based job isolation +- Better Job isolation using pivot root and bind mounts, and namespaces - Enhanced security measures (e.g., input sanitization, process sandboxing) - Certificate renewal automation - Persistent storage solution for large output streams