Skip to content

Commit 8d4299f

Browse files
authored
DATAUP-499 Switch parent_job_id for batch_id for 'true' batch jobs (#404)
* Switch parent_job_id for batch_id for 'true' batch jobs Per the request of the front end devs, we want a way to distinguish 'true' batch jobs created by run_job_batch from 'manual' batch jobs created by including a parent_job_id in the job input params. As such, true batch jobs now have a batch_id field in the job status dict, replacing the parent_job_id field. That field remains in the job_input. A job must only ever have zero or one of the fields, never both. * run black * Match spec, change some spec keys Changed the abandon_children return dict to match the spec Updated the check_jobs_batch spec to match nomenclature elsewhere * Update names and docs to batch from parent
1 parent bd218d6 commit 8d4299f

12 files changed

Lines changed: 454 additions & 331 deletions

execution_engine2.html

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

execution_engine2.spec

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,12 @@
181181
} BatchParams;
182182

183183
typedef structure {
184-
job_id parent_job_id;
184+
job_id batch_id;
185185
list<job_id> child_job_ids;
186186
} BatchSubmission;
187187

188188
typedef structure {
189-
job_id parent_job_id;
189+
job_id batch_id;
190190
list<job_id> child_job_ids;
191191
boolean as_admin;
192192
} AbandonChildren;
@@ -460,7 +460,7 @@
460460
retry_ids - list - list of jobs that are retried based off of this job
461461
retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself
462462

463-
parent_job_id - str - job_id taken from job_input.parent_job_id
463+
batch_id - str - the coordinating job, if the job is a child job created via run_job_batch
464464
batch_job - bool - whether or not this is a batch parent container
465465
child_jobs - array - Only parent container should have child job ids
466466

@@ -501,6 +501,7 @@
501501
int error_code;
502502
string errormsg;
503503
int terminated_code;
504+
string batch_id;
504505

505506
} JobState;
506507

@@ -510,12 +511,12 @@
510511
funcdef check_job(CheckJobParams params) returns (JobState job_state) authentication required;
511512

512513
/*
513-
parent_job - state of parent job
514-
job_states - states of child jobs
514+
batch_jobstate - state of the coordinating job for the batch
515+
child_jobstates - states of child jobs
515516
IDEA: ADD aggregate_states - count of all available child job states, even if they are zero
516517
*/
517518
typedef structure {
518-
JobState parent_jobstate;
519+
JobState batch_jobstate;
519520
list<JobState> child_jobstates;
520521
} CheckJobBatchResults;
521522

lib/execution_engine2/db/models/models.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ class JobInput(EmbeddedDocument):
165165
service_ver = StringField(required=True)
166166
app_id = StringField()
167167
source_ws_objects = ListField()
168+
# this ID is for jobs submitted via run_job with a parent_job_id field included by the
169+
# client. For this case, the parent job is not updated at all.
168170
parent_job_id = StringField()
169171
requirements = EmbeddedDocumentField(JobRequirements)
170172
narrative_cell_info = EmbeddedDocumentField(Meta, required=True)
@@ -320,7 +322,12 @@ class Job(Document):
320322
job_input = EmbeddedDocumentField(JobInput, required=True)
321323
job_output = DynamicField()
322324
condor_job_ads = DynamicField()
323-
child_jobs = ListField() # Only parent container should have child jobs
325+
# this is the ID of the coordinating job created as part of run_job_batch. Only child jobs
326+
# in a "true" batch job maintained by EE2 should have this field. Coordinating jobs will
327+
# be updated with the child ID in child_jobs, unlike "fake" batch jobs that are created
328+
# outside of the EE2 codebase using the 'parent_job_id' field.
329+
batch_id = StringField()
330+
child_jobs = ListField() # Only coordinating jobs should have child jobs
324331
# batch_parent_container = BooleanField(default=False) # Only parent container should have this
325332
retry_ids = ListField() # The retry_parent has been used to launch these jobs
326333
# Only present on a retried job, not it's parent. If attempting to retry this job, use its parent instead

lib/execution_engine2/execution_engine2Impl.py

Lines changed: 325 additions & 234 deletions
Large diffs are not rendered by default.

lib/execution_engine2/sdk/EE2Runjob.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
_REQUIREMENTS_LIST = "requirements_list"
5454
_METHOD = "method"
5555
_APP_ID = "app_id"
56+
_BATCH_ID = "batch_id"
5657
_PARENT_JOB_ID = "parent_job_id"
5758
_PARENT_RETRY_JOB_ID = "retry_parent"
5859
_RETRY_IDS = "retry_ids"
@@ -98,6 +99,7 @@ def _init_job_rec(
9899
{_SERVICE_VER} (app version)
99100
{_APP_ID} (app UI)
100101
{_SOURCE_WS_OBJECTS} (collected workspace objects for this app)
102+
{_BATCH_ID} (parent of the job for EE2 batch jobs, the parent should be updated)
101103
{_PARENT_JOB_ID} (parent of this job, doesn't update/notify the parent)
102104
{_META} (narrative cell information)
103105
@@ -158,6 +160,7 @@ def _init_job_rec(
158160
parent_retry_job_id = params.get(_PARENT_RETRY_JOB_ID)
159161
if parent_retry_job_id:
160162
job.retry_parent = str(parent_retry_job_id)
163+
job.batch_id = str(params.get(_BATCH_ID)) if params.get(_BATCH_ID) else None
161164

162165
job_id = self.sdkmr.save_job(job)
163166
self.sdkmr.get_kafka_client().send_kafka_message(
@@ -241,7 +244,10 @@ def _prepare_to_run(self, params, concierge_params=None) -> JobSubmissionParamet
241244
AppInfo(params[_METHOD], params.get(_APP_ID)),
242245
params[_JOB_REQUIREMENTS],
243246
UserCreds(self.sdkmr.get_user_id(), self.sdkmr.get_token()),
244-
parent_job_id=params.get(_PARENT_JOB_ID),
247+
# a job should have a parent ID or a batch ID or nothing, but never both
248+
# Do we want to distinguish between the two cases in the sub params?
249+
# It's informational only for Condor
250+
parent_job_id=params.get(_BATCH_ID) or params.get(_PARENT_JOB_ID),
245251
wsid=params.get(_WORKSPACE_ID),
246252
source_ws_objects=params.get(_SOURCE_WS_OBJECTS),
247253
)
@@ -287,7 +293,7 @@ def _abort_child_jobs(self, child_job_ids):
287293
# TODO Maybe add a retry here?
288294
self.logger.error(f"Couldn't cancel child job {e}")
289295

290-
def _create_parent_job(self, wsid, meta):
296+
def _create_batch_job(self, wsid, meta):
291297
"""
292298
This creates the parent job for all children to mark as their ancestor
293299
:param params:
@@ -320,11 +326,11 @@ def _create_parent_job(self, wsid, meta):
320326
)
321327
return j
322328

323-
def _run_batch(self, parent_job: Job, params):
329+
def _run_batch(self, batch_job: Job, params):
324330
child_jobs = []
325331

326332
for job_param in params:
327-
job_param[_PARENT_JOB_ID] = str(parent_job.id)
333+
job_param[_BATCH_ID] = str(batch_job.id)
328334
try:
329335
child_jobs.append(str(self._run(params=job_param)))
330336
except Exception as e:
@@ -334,8 +340,8 @@ def _run_batch(self, parent_job: Job, params):
334340
self._abort_child_jobs(child_jobs)
335341
raise e
336342

337-
parent_job.child_jobs = child_jobs
338-
self.sdkmr.save_job(parent_job)
343+
batch_job.child_jobs = child_jobs
344+
self.sdkmr.save_job(batch_job)
339345

340346
return child_jobs
341347

@@ -365,12 +371,12 @@ def run_batch(
365371
self._check_workspace_permissions_list(wsids)
366372

367373
self._add_job_requirements(params, bool(as_admin)) # as_admin checked above
368-
self._check_job_arguments(params, has_parent_job=True)
374+
self._check_job_arguments(params, batch_job=True)
369375

370-
parent_job = self._create_parent_job(wsid=wsid, meta=meta)
371-
children_jobs = self._run_batch(parent_job=parent_job, params=params)
376+
batch_job = self._create_batch_job(wsid=wsid, meta=meta)
377+
children_jobs = self._run_batch(batch_job=batch_job, params=params)
372378

373-
return {_PARENT_JOB_ID: str(parent_job.id), "child_job_ids": children_jobs}
379+
return {_BATCH_ID: str(batch_job.id), "child_job_ids": children_jobs}
374380

375381
# modifies the jobs in place
376382
def _add_job_requirements(self, jobs: List[Dict[str, Any]], is_write_admin: bool):
@@ -464,7 +470,7 @@ def _rethrow_incorrect_params_with_error_prefix(
464470
raise error
465471
raise IncorrectParamsException(f"{error_prefix}{error.args[0]}") from error
466472

467-
def _check_job_arguments(self, jobs, has_parent_job=False):
473+
def _check_job_arguments(self, jobs, batch_job=False):
468474
# perform sanity checks before creating any jobs, including the parent job for batch jobs
469475
for i, job in enumerate(jobs):
470476
# Could make an argument checker method, or a class that doesn't require a job id.
@@ -482,7 +488,7 @@ def _check_job_arguments(self, jobs, has_parent_job=False):
482488
)
483489
except IncorrectParamsException as e:
484490
self._rethrow_incorrect_params_with_error_prefix(e, pre)
485-
if has_parent_job and job.get(_PARENT_JOB_ID):
491+
if batch_job and job.get(_PARENT_JOB_ID):
486492
raise IncorrectParamsException(
487493
f"{pre}batch jobs may not specify a parent job ID"
488494
)
@@ -535,12 +541,11 @@ def _validate_retry_presubmit(self, job_id: str, as_admin: bool = False):
535541
job = self.sdkmr.get_job_with_permission(
536542
job_id, JobPermissions.WRITE, as_admin=as_admin
537543
) # type: Job
538-
job_input = job.job_input # type: JobInput
539544

540-
parent_job = None
541-
if job_input.parent_job_id:
542-
parent_job = self.sdkmr.get_job_with_permission(
543-
job_input.parent_job_id, JobPermissions.WRITE, as_admin=as_admin
545+
batch_job = None
546+
if job.batch_id:
547+
batch_job = self.sdkmr.get_job_with_permission(
548+
job.batch_id, JobPermissions.WRITE, as_admin=as_admin
544549
)
545550

546551
if job.batch_job:
@@ -553,9 +558,9 @@ def _validate_retry_presubmit(self, job_id: str, as_admin: bool = False):
553558
f"Error retrying job {job_id} with status {job.status}: can only retry jobs with status 'error' or 'terminated'"
554559
)
555560

556-
return job, parent_job
561+
return job, batch_job
557562

558-
def _retry(self, job_id: str, job: Job, parent_job: Job, as_admin: bool = False):
563+
def _retry(self, job_id: str, job: Job, batch_job: Job, as_admin: bool = False):
559564
# Cannot retry a retried job, you must retry the retry_parent
560565
if job.retry_parent:
561566
return self.retry(str(job.retry_parent), as_admin=as_admin)
@@ -569,13 +574,15 @@ def _retry(self, job_id: str, job: Job, parent_job: Job, as_admin: bool = False)
569574
retry_job_id = self.run(params=run_job_params, as_admin=as_admin)
570575

571576
# Save that the job has been retried, and increment the count. Notify the parent(s)
572-
# 1) Notify the parent container that it has a new child..
573-
if parent_job:
577+
# 1) Notify the batch container that it has a new child. Note that the parent jobs of
578+
# 'manual' batch jobs using the job_input.parent_job_id field *are not* modified to
579+
# include their children, so we don't do that here either.
580+
if batch_job:
574581
try:
575-
parent_job.modify(add_to_set__child_jobs=retry_job_id)
582+
batch_job.modify(add_to_set__child_jobs=retry_job_id)
576583
except Exception as e:
577584
self._db_update_failure(
578-
job_that_failed_operation=str(parent_job.id),
585+
job_that_failed_operation=str(batch_job.id),
579586
job_to_abort=retry_job_id,
580587
exception=e,
581588
)
@@ -611,11 +618,11 @@ def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]:
611618
:param as_admin: Run with admin permission
612619
:return: The child job id that has been retried
613620
"""
614-
job, parent_job = self._validate_retry_presubmit(
621+
job, batch_job = self._validate_retry_presubmit(
615622
job_id=job_id, as_admin=as_admin
616623
)
617624
return self._retry(
618-
job_id=job_id, job=job, parent_job=parent_job, as_admin=as_admin
625+
job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin
619626
)
620627

621628
def retry_multiple(
@@ -646,14 +653,14 @@ def retry_multiple(
646653
# Check all inputs before attempting to start submitting jobs
647654
retried_jobs = []
648655
jobs = []
649-
parent_jobs = []
656+
batch_jobs = []
650657
for job_id in job_ids:
651658
try:
652-
job, parent_job = self._validate_retry_presubmit(
659+
job, batch_job = self._validate_retry_presubmit(
653660
job_id=job_id, as_admin=as_admin
654661
)
655662
jobs.append(job)
656-
parent_jobs.append(parent_job)
663+
batch_jobs.append(batch_job)
657664
except Exception as e:
658665
raise RetryFailureException(e)
659666

@@ -664,7 +671,7 @@ def retry_multiple(
664671
self._retry(
665672
job_id=job_id,
666673
job=jobs[i],
667-
parent_job=parent_jobs[i],
674+
batch_job=batch_jobs[i],
668675
as_admin=as_admin,
669676
)
670677
)
@@ -810,6 +817,8 @@ def get_job_params(self, job_id, as_admin=False):
810817
job_params["service_ver"] = job_input.service_ver
811818
job_params[_APP_ID] = job_input.app_id
812819
job_params[_WORKSPACE_ID] = job_input.wsid
820+
# This is specfically the data in the job params, which includes any manually submitted
821+
# parent job information but does not include batch job information
813822
job_params[_PARENT_JOB_ID] = job_input.parent_job_id
814823
job_params[_SOURCE_WS_OBJECTS] = job_input.source_ws_objects
815824

lib/execution_engine2/sdk/EE2Status.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -474,9 +474,7 @@ def check_jobs(
474474
del mongo_rec["_id"]
475475
mongo_rec["retry_count"] = len(job["retry_ids"])
476476
mongo_rec["job_id"] = str(job.id)
477-
mongo_rec["parent_job_id"] = (
478-
job.job_input.parent_job_id if job.job_input else None
479-
)
477+
mongo_rec["batch_id"] = job.batch_id
480478
mongo_rec["created"] = int(job.id.generation_time.timestamp() * 1000)
481479
mongo_rec["updated"] = int(job.updated * 1000)
482480
if job.estimating:
@@ -560,26 +558,28 @@ def _send_exec_stats_to_catalog(self, job_id):
560558

561559
self.sdkmr.get_catalog().log_exec_stats(log_exec_stats_params)
562560

563-
def abandon_children(self, parent_job_id, child_job_ids, as_admin=False) -> Dict:
564-
if not parent_job_id:
565-
raise ValueError("Please provide valid parent_job id")
561+
def abandon_children(self, batch_id, child_job_ids, as_admin=False) -> Dict:
562+
# Note this does not work for 'manual' batch jobs as the parent job is
563+
# never updated with the child jobs. It will only work with batch jobs specifically
564+
# created by the run_job_batch endpoint.
565+
if not batch_id:
566+
raise ValueError("Please provide valid batch_id")
566567
if not child_job_ids:
567568
raise ValueError("Please provide job_ids of children to abandon")
568569

569570
job = self.sdkmr.get_job_with_permission(
570-
parent_job_id, JobPermissions.WRITE, as_admin=as_admin
571+
batch_id, JobPermissions.WRITE, as_admin=as_admin
571572
) # type: Job
572573
for child_job_id in child_job_ids:
573574
if child_job_id not in job.child_jobs:
574575
raise ChildrenNotFoundError(
575576
f"Couldn't find {child_job_id} in {child_job_ids}"
576577
)
577578

578-
with self.sdkmr.get_mongo_util().mongo_engine_connection():
579-
job.update(pull_all__child_jobs=child_job_ids)
580-
job.reload()
579+
job.update(pull_all__child_jobs=child_job_ids)
580+
job.reload()
581581

582-
return {"parent_job_id": parent_job_id, "child_jobs": job.child_jobs}
582+
return {"batch_id": batch_id, "child_job_ids": job.child_jobs}
583583

584584
def start_job(self, job_id, skip_estimation=True, as_admin=False):
585585
"""

lib/execution_engine2/sdk/SDKMethodRunner.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,10 +358,10 @@ def start_job(self, job_id, skip_estimation=True, as_admin=False):
358358
)
359359

360360
# Endpoints: Changing a job's status
361-
def abandon_children(self, parent_job_id, child_job_ids, as_admin=False):
361+
def abandon_children(self, batch_id, child_job_ids, as_admin=False):
362362
"""Authorization Required Read/Write"""
363363
return self.get_jobs_status().abandon_children(
364-
parent_job_id=parent_job_id, child_job_ids=child_job_ids, as_admin=as_admin
364+
batch_id=batch_id, child_job_ids=child_job_ids, as_admin=as_admin
365365
)
366366

367367
def update_job_status(self, job_id, status, as_admin=False):
@@ -433,7 +433,7 @@ def get_job_status_field(self, job_id, as_admin=False):
433433

434434
def check_job_batch(
435435
self,
436-
parent_job_id,
436+
batch_id,
437437
check_permission=True,
438438
exclude_fields=None,
439439
as_admin=False,
@@ -448,7 +448,7 @@ def check_job_batch(
448448
raise ValueError("You can't exclude child jobs from this endpoint")
449449

450450
parent_job_status = self.get_jobs_status().check_job(
451-
job_id=parent_job_id,
451+
job_id=batch_id,
452452
check_permission=check_permission,
453453
exclude_fields=exclude_fields,
454454
)
@@ -462,7 +462,7 @@ def check_job_batch(
462462
return_list=1,
463463
)["job_states"]
464464
return {
465-
"parent_jobstate": parent_job_status,
465+
"batch_jobstate": parent_job_status,
466466
"child_jobstates": child_job_states,
467467
}
468468

0 commit comments

Comments
 (0)