-
Notifications
You must be signed in to change notification settings - Fork 7
cancel jobs #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cancel jobs #64
Conversation
Wiz Scan Summary
To detect these findings earlier in the dev lifecycle, try using Wiz Code VS Code Extension. |
internal/pkg/heimdall/job.go
Outdated
| } | ||
|
|
||
| // make sure we have a job object | ||
| job, ok := currentJob.(*job.Job) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a good reason for all of the endpoints to be returning generics? We wouldn't need to do a type check here if cancelJob expected a job object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Go does not support Liskov Substitution for return types in the same way Java does with covariant return types.
Go does not allow this- the method signature must exactly match.
|
|
||
| // Sleep until next poll time | ||
| time.Sleep(time.Duration(execCtx.PollingInterval)) | ||
| // Check for cancellation or sleep until next poll time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I picked the area where ECS plugin spends the most time (status polling) to implement the cancellation check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Termination and clean up should be separated. Plugin shouldn't care about termination.
It should be handled here func (h *Heimdall) runAsyncJob(ctx context.Context, j *job.Job) error { for all plugins.
Plugin should be changed from the type function to type interface with 2 functions(terminate and handle.
Our major plugins, Spark, ECS, Trino, Clickhouse allows this feature implementation.
Handle function should rely on all libraries and believe that library is context aware and respect of context cancelation.
What does it mean for PR?
- type Handler func(context.Context, *Runtime, *job.Job, *cluster.Cluster) error ->
Handle (context.Context, *Runtime, *job.Job, *cluster.Cluster) error
Terminate(context.Context, *Runtime, *job.Job, *cluster.Cluster) error
}
- Handle function doesn't contains
select {
case <-ctx.Done():
stopAllTasks(execCtx, "Job cancelled by user")
return nil
case <-time.After(time.Duration(execCtx.PollingInterval)):
} - when we run (h *Heimdall) runJob(job *job.Job, command *command.Command, cluster *cluster.Cluster, ctx context.Context) error {
We start 2 gorutines
- Execute job
- Check job status in db and if it's canceled terminate context. We can call handler.Terminate here because all resources in our hands or wait when Janitor cancel everything.
|
|
||
| // handler executes the Spark EKS job submission and execution. | ||
| func (s *sparkEksCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { | ||
| func (s *sparkEksCommandContext) handler(ct ct.Context, r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Context is set as a var here to be accessed globally. Wondering if this is the right approach to reassign global context to the context we pass in to the handler. @hladush @sanketjadhavSF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix
internal/pkg/heimdall/job.go
Outdated
| } | ||
|
|
||
| func (h *Heimdall) runJob(job *job.Job, command *command.Command, cluster *cluster.Cluster) error { | ||
| func (h *Heimdall) runJob(job *job.Job, command *command.Command, cluster *cluster.Cluster, ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
context is always the first parameter in golang
internal/pkg/heimdall/job.go
Outdated
| func (h *Heimdall) cancelJob(req *jobRequest) (any, error) { | ||
|
|
||
| // validate that job exists and get its current status | ||
| currentJob, err := h.getJob(req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have a user who likes to call api or click a button few times, cancel cancel, cancel, or UI had an issue and send 2 requests with tiny delete.
On machine 1 you execute lines from 173 to 188, after that on another machine the same request is processed and machine 1 executing and set status to cancel.
Can we make canceling an atomic operation?
internal/pkg/heimdall/job.go
Outdated
| func (h *Heimdall) cancelJob(req *jobRequest) (any, error) { | ||
|
|
||
| // validate that job exists and get its current status | ||
| currentJob, err := h.getJob(req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add metrics and logs for errors
| j.job_id | ||
| from | ||
| jobs j | ||
| join job_statuses js on j.job_status_id = js.job_status_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need join here if status.go has a clear mapping between job_status_id and name in DB?
| set | ||
| job_status_id = $1, | ||
| job_error = $2, | ||
| updated_at = extract(epoch from now())::int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should also add a column updated_by to log the username who has requested the job cancellation for future audit trails or debugging.
may be we can also add an optional column cancellation_reason for logging the cancellation reason as well. we can make it mandatory when we integrate the cancel api with the UI.
…into cancel_jobs merge in main
| } | ||
|
|
||
| func (t *commandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { | ||
| func (t *commandContext) handler(ct ct.Context, r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: in golang context is imported as context and usual shortage is ctx.
ctx context.Context. Let's unify it
|
|
||
| // execute request | ||
| result, err := fn(&payload) | ||
| result, err := fn(r.Context(), &payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like it
| pluginCtx, cancel := context.WithCancel(ctx) | ||
|
|
||
| // Start plugin execution in goroutine | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this idea
internal/pkg/heimdall/job.go
Outdated
| } | ||
|
|
||
| // check current job status | ||
| switch job.Status { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we run 1 query try to update job and if 0 rows or 1 row is updated return a result based on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this approach makes the most sense because we already have to make 1 call h.getJob, and based on the status we don't actually need to make a call unless its in a running or new state. I could reuse the updateAsyncJobStatus function though and get rid of this custom one. Thoughts? @hladush
internal/pkg/heimdall/job.go
Outdated
| } | ||
|
|
||
| // isJobCancelling checks if a specific job is in CANCELLING state | ||
| func (h *Heimdall) isJobCancelling(j *job.Job) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just getJobStatus and after that everyone will resolve it on top?
internal/pkg/pool/pool.go
Outdated
| Size int `yaml:"size,omitempty" json:"size,omitempty"` | ||
| Sleep int `yaml:"sleep,omitempty" json:"sleep,omitempty"` | ||
| queue chan T | ||
| queue chan *job.Job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pool is an abstract object which can do job on any time of items. Let's keep it abstract.
| ) | ||
|
|
||
| type commandContext struct { | ||
| type clickhouseCommandContext struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see why you named that in that way the general way in golang is to not use package prefix as a name in the type.
maybe let's name it execution context? or commandExecutionContext
|
|
||
| func (h *Heimdall) cancelJob(ctx context.Context, req *jobRequest) (any, error) { | ||
|
|
||
| sess, err := h.Database.NewSession(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| // Start cancellation monitoring for async jobs | ||
| if !j.IsSync { | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: separate method + metrics?
| return err | ||
| // Check if context was cancelled and mark status appropriately | ||
| if pluginCtx.Err() != nil { | ||
| j.Status = jobStatus.Cancelling // janitor will update to cancelled when resources are cleaned up |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have a func/step in janitor to update the status from CANCELLING to CANCELLED?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Janitor will do this once we implement. Once remote resources are shut down, it will move to cancelled.
3e1e6c6
| j.Status = status.Failed | ||
| j.Error = jobError.Error() | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is redundant. Status already set in job.go
PR Details
Clickup Link -
Description
This PR enables a job/{jobid}/cancel endpoint that will cancel any ASYNC jobs.
The way this mechanism works:
When the
cancelapi is hit, it immediately updates that job's status toCANCELLINGin the database and also updates thecancelled_byfield.Each machine running Heimdall will create a context cancellation for each async job when fired off. Immediately after starting the plugin handler, a separate Go routine starts polling the database (every 10 seconds) to see if the status changes to
CANCELLING. If the job status changes, then the routine cancels the context. All existing ASYNC jobs have components that respect context and will cause the job to fail due to the cancelled context. The job status will remain asCANCELLINGfor now until we implement the janitor resource clean-up which will terminate any remote resources and finalize with aCANCELEDstate.Types of changes
Checklist