66from typing import Dict , List , NamedTuple
77from bson .objectid import ObjectId
88from mongoengine import connect , connection
9- from pymongo import MongoClient , UpdateOne
9+ from pymongo import MongoClient
1010from pymongo .errors import ServerSelectionTimeoutError
1111
1212from execution_engine2 .db .models .models import JobLog , Job , Status , TerminatedCode
@@ -33,7 +33,8 @@ def __init__(self, config: Dict):
3333 self .mongo_pass = config ["mongo-password" ]
3434 self .retry_rewrites = parse_bool (config ["mongo-retry-rewrites" ])
3535 self .mongo_authmechanism = config ["mongo-authmechanism" ]
36- self .mongo_collection = None
36+ self ._col_jobs = config ["mongo-jobs-collection" ]
37+ self ._col_logs = config ["mongo-logs-collection" ]
3738 self ._start_local_service ()
3839 self .logger = logging .getLogger ("ee2" )
3940 self .pymongoc = self ._get_pymongo_client ()
@@ -155,41 +156,28 @@ def _get_collection(
155156
156157 return pymongo_client , mongoengine_client
157158
158- @contextmanager
159- def pymongo_client (self , mongo_collection ):
160- """
161- Instantiates a mongo client to be used as a context manager
162- Closes the connection at the end
163- :return:
164- """
165- self .mongo_collection = mongo_collection
166- yield self .pymongoc
167-
def get_workspace_jobs(self, workspace_id):
    """
    Collect the ids of every Job document whose ``wsid`` matches.

    :param workspace_id: value compared against the ``wsid`` field of Job
    :return: list of job ids, each converted to ``str``
    """
    with self.mongo_engine_connection():
        found_ids = []
        # The query is consumed while the mongoengine connection context is open.
        for record in Job.objects(wsid=workspace_id):
            found_ids.append(str(record.id))
        return found_ids
172163
def get_job_log_pymongo(self, job_id: str = None):
    """
    Fetch the raw log document for a job through the shared pymongo client.

    The document is looked up by ``_id`` in the logs collection
    (``self._col_logs``) of the database ``self.mongo_database``.

    :param job_id: id of the job; converted to an ``ObjectId`` for the lookup
    :return: the document returned by ``find_one``
    :raises ValueError: if building the filter or running the query raises;
        the message embeds the formatted traceback and the original
        exception is attached as the cause
    :raises RecordNotFoundException: if the lookup yields no document
    """
    job_log_col = self.pymongoc[self.mongo_database][self._col_logs]
    try:
        find_filter = {"_id": ObjectId(job_id)}
        job_log = job_log_col.find_one(find_filter)
    except Exception as e:
        error_msg = "Unable to find job\n "
        error_msg += "ERROR -- {}:\n {}".format(
            e, "".join(traceback.format_exception(None, e, e.__traceback__))
        )
        # Chain explicitly so the original failure stays available as __cause__.
        raise ValueError(error_msg) from e

    # A falsy result means no document matched the id.
    if not job_log:
        raise RecordNotFoundException(
            "Cannot find job log with id: {}".format(job_id)
        )

    return job_log
195183
@@ -290,88 +278,24 @@ def update_job_to_queued(
290278 raise ValueError ("None of the 3 arguments can be falsy" )
291279 # could also test that the job ID is a valid job ID rather than having mongo throw an
292280 # error
293- mongo_collection = self .config ["mongo-jobs-collection" ]
294- queue_time_now = time .time ()
295- with self .pymongo_client (mongo_collection ) as pymongo_client :
296- ee2_jobs_col = pymongo_client [self .mongo_database ][mongo_collection ]
297- # should we check that the job was updated and do something if it wasn't?
298- ee2_jobs_col .update_one (
299- {"_id" : ObjectId (job_id ), "status" : Status .created .value },
300- {"$set" : {"status" : Status .queued .value , "queued" : queue_time_now }},
301- )
302- # originally had a single query, but seems safer to always record the scheduler
303- # state no matter the state of the job
304- ee2_jobs_col .update_one (
305- {"_id" : ObjectId (job_id )},
306- {
307- "$set" : {
308- "scheduler_id" : scheduler_id ,
309- "scheduler_type" : scheduler_type ,
310- }
311- },
312- )
313-
314- def update_jobs_to_queued (
315- self , job_id_pairs : List [JobIdPair ], scheduler_type : str = "condor"
316- ) -> None :
317- f"""
318- * Adds scheduler id to list of jobs
319- * Updates a list of { Status .created .value } jobs to queued. Does not work on jobs that already have gone through any other
320- status transition. If the record is not in the { Status .created .value } status, nothing will happen
321- :param job_id_pairs: A list of pairs of Job Ids and Scheduler Ids
322- :param scheduler_type: The scheduler this job was queued in, default condor
323- """
324-
325- bulk_update_scheduler_jobs = []
326- bulk_update_created_to_queued = []
327281 queue_time_now = time .time ()
328- for job_id_pair in job_id_pairs :
329- if job_id_pair .job_id is None :
330- raise ValueError (
331- f"Provided a bad job_id_pair, missing job_id for { job_id_pair .scheduler_id } "
332- )
333- elif job_id_pair .scheduler_id is None :
334- raise ValueError (
335- f"Provided a bad job_id_pair, missing scheduler_id for { job_id_pair .job_id } "
336- )
337-
338- bulk_update_scheduler_jobs .append (
339- UpdateOne (
340- {
341- "_id" : ObjectId (job_id_pair .job_id ),
342- },
343- {
344- "$set" : {
345- "scheduler_id" : job_id_pair .scheduler_id ,
346- "scheduler_type" : scheduler_type ,
347- }
348- },
349- )
350- )
351- bulk_update_created_to_queued .append (
352- UpdateOne (
353- {
354- "_id" : ObjectId (job_id_pair .job_id ),
355- "status" : Status .created .value ,
356- },
357- {
358- "$set" : {
359- "status" : Status .queued .value ,
360- "queued" : queue_time_now ,
361- }
362- },
363- )
364- )
365- # Update provided jobs with scheduler id. Then only update non terminated jobs into updated status.
366- mongo_collection = self .config ["mongo-jobs-collection" ]
367-
368- if bulk_update_scheduler_jobs :
369- with self .pymongo_client (mongo_collection ) as pymongo_client :
370- ee2_jobs_col = pymongo_client [self .mongo_database ][mongo_collection ]
371- # Bulk Update to add scheduler ids
372- ee2_jobs_col .bulk_write (bulk_update_scheduler_jobs , ordered = False )
373- # Bulk Update to add queued status ids
374- ee2_jobs_col .bulk_write (bulk_update_created_to_queued , ordered = False )
282+ ee2_jobs_col = self .pymongoc [self .mongo_database ][self ._col_jobs ]
283+ # should we check that the job was updated and do something if it wasn't?
284+ ee2_jobs_col .update_one (
285+ {"_id" : ObjectId (job_id ), "status" : Status .created .value },
286+ {"$set" : {"status" : Status .queued .value , "queued" : queue_time_now }},
287+ )
288+ # originally had a single query, but seems safer to always record the scheduler
289+ # state no matter the state of the job
290+ ee2_jobs_col .update_one (
291+ {"_id" : ObjectId (job_id )},
292+ {
293+ "$set" : {
294+ "scheduler_id" : scheduler_id ,
295+ "scheduler_type" : scheduler_type ,
296+ }
297+ },
298+ )
375299
376300 def cancel_job (self , job_id = None , terminated_code = None ):
377301 """
@@ -542,26 +466,6 @@ def insert_jobs(self, jobs_to_insert: List[Job]) -> List[ObjectId]:
542466 inserted = Job .objects .insert (doc_or_docs = jobs_to_insert , load_bulk = False )
543467 return inserted
544468
545- def insert_one (self , doc ):
546- """
547- insert a doc into collection
548- """
549- self .logger .debug ("start inserting document" )
550-
551- with self .pymongo_client (self .mongo_collection ) as pymongo_client :
552- try :
553- rec = pymongo_client [self .mongo_database ][
554- self .mongo_collection
555- ].insert_one (doc )
556- except Exception as e :
557- error_msg = "Cannot insert doc\n "
558- error_msg += "ERROR -- {}:\n {}" .format (
559- e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
560- )
561- raise ValueError (error_msg )
562-
563- return rec .inserted_id
564-
565469 def _push_job_logs (self , log_lines : JobLog , job_id : str , record_count : int ):
566470 """append a list of job logs, and update the record count"""
567471
@@ -574,80 +478,15 @@ def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int):
574478 "updated" : time .time (),
575479 }
576480 update = {"$push" : push_op , "$set" : set_op }
577- with self .pymongo_client (self .mongo_collection ) as pymongo_client :
578- job_col = pymongo_client [self .mongo_database ][self .mongo_collection ]
579- try :
580- job_col .update_one (update_filter , update , upsert = False )
581- except Exception as e :
582- error_msg = "Cannot update doc\n ERROR -- {}:\n {}" .format (
583- e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
584- )
585- raise ValueError (error_msg )
586-
587- slc = job_col .find_one ({"_id" : ObjectId (job_id )}).get ("stored_line_count" )
588-
589- return slc
590-
591- def update_one (self , doc , job_id ):
592- """
593- update existing records or create if they do not exist
594- https://docs.mongodb.com/manual/reference/operator/update/set/
595- """
596- with self .pymongo_client (self .mongo_collection ) as pymongo_client :
597- job_col = pymongo_client [self .mongo_database ][self .mongo_collection ]
598- try :
599- update_filter = {"_id" : ObjectId (job_id )}
600- update = {"$set" : doc }
601- job_col .update_one (update_filter , update , upsert = True )
602- except Exception as e :
603- error_msg = "Connot update doc\n "
604- error_msg += "ERROR -- {}:\n {}" .format (
605- e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
606- )
607- raise ValueError (error_msg )
608-
609- return True
610-
611- def delete_one (self , job_id ):
612- """
613- delete a doc by _id
614- """
615- self .logger .debug ("start deleting document" )
616- with self .pymongo_client (self .mongo_collection ) as pymongo_client :
617- job_col = pymongo_client [self .mongo_database ][self .mongo_collection ]
618- try :
619- delete_filter = {"_id" : ObjectId (job_id )}
620- job_col .delete_one (delete_filter )
621- except Exception as e :
622- error_msg = "Connot delete doc\n "
623- error_msg += "ERROR -- {}:\n {}" .format (
624- e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
625- )
626- raise ValueError (error_msg )
627-
628- return True
629-
630- def find_in (self , elements , field_name , projection = None , batch_size = 1000 ):
631- """
632- return cursor that contains docs which field column is in elements
633- """
634- self .logger .debug ("start querying MongoDB" )
635-
636- with self .pymongo_client (self .mongo_collection ) as pymongo_client :
637- job_col = pymongo_client [self .mongo_database ][self .mongo_collection ]
638- try :
639- result = job_col .find (
640- {field_name : {"$in" : elements }},
641- projection = projection ,
642- batch_size = batch_size ,
643- )
644- except Exception as e :
645- error_msg = "Connot query doc\n "
646- error_msg += "ERROR -- {}:\n {}" .format (
647- e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
648- )
649- raise ValueError (error_msg )
481+ job_col = self .pymongoc [self .mongo_database ][self ._col_logs ]
482+ try :
483+ job_col .update_one (update_filter , update , upsert = False )
484+ except Exception as e :
485+ error_msg = "Cannot update doc\n ERROR -- {}:\n {}" .format (
486+ e , "" .join (traceback .format_exception (None , e , e .__traceback__ ))
487+ )
488+ raise ValueError (error_msg )
650489
651- self . logger . debug ( "returned {} results" . format ( result . count ()) )
490+ slc = job_col . find_one ({ "_id" : ObjectId ( job_id )}). get ( "stored_line_count" )
652491
653- return result
492+ return slc
0 commit comments