-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlaunch_nersc.py
More file actions
executable file
·548 lines (481 loc) · 25 KB
/
launch_nersc.py
File metadata and controls
executable file
·548 lines (481 loc) · 25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
#!/usr/bin/env python
#
# Submit jobs to run at NERSC via swif2
#
# See more detailed documentation here:
# https://halldweb.jlab.org/wiki/index.php/HOWTO_Execute_a_Launch_using_NERSC
#
# This will run commands submitting several recon jobs
# with the run/file numbers hardcoded into this script.
# Here is how this is supposed to work:
#
# This will run swif2 with all of the SBATCH(slurm) options
# passed via command line. This includes the shifter image
# that should be used. swif2 will then take care of getting
# the file from tape and transferring it to NERSC. Once
# the file is there, it will submit the job to Cori.
#
# When the job wakes up, it will be in a subdirectory of the
# NERSC scratch disk that swif2 has already setup.
# This directory will contain a symbolic link pointing
# to the raw data file which is somewhere else on the scratch
# disk.
#
# The container will run the /launch/script_nersc.sh script
# where /launch has been mounted in the container from the
# "launch" directory in the project directory. The jana
# config file is also kept in the launch directory.
#
# The container will also have /cvmfs mounted. The standard
# gluex containers have links built in so that /group will
# point to the appropriate subdirectory of /cvmfs making the
# the GlueX software available. The script_nersc.sh script
# will use this to setup the environment and then run hd_root
# using the /launch/jana_recon_nersc.config file.
#
# A couple of more notes:
#
# 1. The CCDB and RCDB used comes from an sqlite file in
# CVMFS. These are copied to the local node in /tmp at
# the beginning of the job and deleted at the end. The
# timestamp used is hardcoded in /launch/jana_recon_nersc.config
#
# 2. NERSC requires that the program being run is actually
# a script that starts with #!/XXX/YYY . It is actually a
# SLURM script where additional SLURM options could be set.
# We do not put them there though. All SLURM options are
# passed via the sbatch command swif2 runs and that we specify
# here. The /launch/script_nersc.sh script is trivial and only
# runs shifter passing any arguments we give to it here in
# the swif2 command.
#
# 3. The output directory is created by this script
# to allow group writing since the files are copied using
# the davidl account on globus but swif2 is being run from
# the gxproj4 account.
#
#
# For the recon_2018-08_ver00 launch, the data were separated into
# separate batches following the boundaries defined here:
# https://halldweb.jlab.org/wiki-private/index.php/Fall_2018_Dataset_Summary
#
# BATCH 1: 50677-51035
# BATCH 2: 51036-51203
# BATCH 3: 51204-51383
# BATCH 4: 51497-51638, 51683-51687, 51722-51768
#
# Number of jobs by batch:
# BATCH 1: 14833
# BATCH 2: 12154
# BATCH 3: 9835
# BATCH 4: 10363 = 8373+189+1801
import subprocess
import math
import glob
import sys
import os
# mysql.connector not available via system and must come via PYTHONPATH
if not os.getenv('PYTHONPATH') : sys.path.append('/group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5/ccdb/ccdb_1.06.06/python')
import mysql.connector
#TESTMODE = True # True=only print commands, but don't actually submit jobs
TESTMODE = False # True=only print commands, but don't actually submit jobs
VERBOSE = 3 # 1 is default
### 2019-11 monitoring ver13
##RUNPERIOD = '2018-01'
RUNPERIOD = '2019-11'
#LAUNCHTYPE = 'recon' # 'offmon' or 'recon'
LAUNCHTYPE = 'offmon' # 'offmon' or 'recon'
VER = 'test_13'
BATCH = '01'
#WORKFLOW = LAUNCHTYPE+'_'+RUNPERIOD+'_ver'+VER+'_batch'+BATCH
WORKFLOW = LAUNCHTYPE+'_'+RUNPERIOD+'_ver'+VER+'_batch'+BATCH+'_igal'
NAME = 'GLUEX_' + LAUNCHTYPE
#RCDB_QUERY = '@is_2018production and @status_approved' # Comment out for all runs in range MINRUN-MAXRUN
RCDB_QUERY = '@is_dirc_production' # Comment out for all runs in range MINRUN-MAXRUN
#RUNS = [71463,71469,71724] # List of runs to process. If empty, MINRUN-MAXRUN are searched in RCDB
RUNS = [71350] # List of runs to process. If empty, MINRUN-MAXRUN are searched in RCDB
MINRUN = 71350 # If RUNS is empty, then RCDB queried for this range
MAXRUN = 71591 # If RUNS is empty, then RCDB queried for this range
#MINRUN = 41511 # If RUNS is empty, then RCDB queried for this range
#MAXRUN = 51768 # If RUNS is empty, then RCDB queried for this range
#MAXRUN = 41511 # If RUNS is empty, then RCDB queried for this range
MINFILENO = 0 # Min file number to process for each run (n.b. file numbers start at 0!)
#MAXFILENO = 1000 # Max file number to process for each run (n.b. file numbers start at 0!)
MAXFILENO = 5 # Max file number to process for each run (n.b. file numbers start at 0!)
FILE_FRACTION = 1.0 # Fraction of files to process for each run in specified range (see GetFileNumbersToProcess)
MAX_CONCURRENT_JOBS = '2100' # Maximum number of jobs swif2 will have in flight at once
EXCLUDE_RUNS = [] # Runs that should be excluded from processing
PROJECT = 'm3120'
TIMELIMIT = '24:00:00' # Set time limit (2.4 timeslonger for KNL than haswell)
QOS = 'regular' # debug, regular, premium, low, flex, scavenger
NODETYPE = 'knl' # haswell, knl (quad,cache)
IMAGE = 'docker:markito3/gluex_docker_devel'
#RECONVERSION = 'halld_recon/halld_recon-recon-2018_08-ver02' # must exist in /group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5-cntr
RECONVERSION = 'halld_recon/halld_recon-4.16.1' # must exist in /group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5-cntr
#halld_recon-4.15.0
SCRIPTFILE = '/launch/script_nersc.sh'
CONFIG = '/launch/jana_'+LAUNCHTYPE+'_nersc.config'
RCDB_HOST = 'hallddb.jlab.org'
RCDB_USER = 'rcdb'
RCDB = None
BAD_RCDB_QUERY_RUNS = [] # will be filled with runs that are missing evio_file_count field in RCDB query
BAD_FILE_COUNT_RUNS = [] # will be filled with runs where number of evio files could not be obtained by any method
# Set output directory depending on launch type
if LAUNCHTYPE=='offmon':
OUTPUTTOP = 'mss:/mss/halld/halld-scratch/offline_monitoring/RunPeriod-'+RUNPERIOD+'/ver'+VER # prefix with mss: for tape or file: for filesyste
elif LAUNCHTYPE=='recon':
OUTPUTTOP = 'mss:/mss/halld/RunPeriod-'+RUNPERIOD+'/recon/ver'+VER
else:
print 'Unknown launch type "'+LAUNCHTYPE+'"! Don\'t know where to put output files!'
sys.exit(-1)
#----------------------------------------------------
def MakeJob(RUN,FILE):
global Njobs, NJOBS_SUBMITTED, DIRS_CREATED
JOB_STR = '%s_%06d_%03d' % (NAME, RUN, FILE)
EVIOFILE = 'hd_rawdata_%06d_%03d.evio' % (RUN, FILE)
MSSFILE = '/mss/halld/RunPeriod-%s/rawdata/Run%06d/%s' % (RUNPERIOD, RUN, EVIOFILE)
# The OUTPUTDIR variable is a fully qualified path used to pre-create
# the output directories for the job files. If the files are going to
# /mss, then we must replace the mss:/mss part at the beginning with
# /lustre/expphy/cache. Otherwise, just use the path as given in OUTPUTTOP
OUTPUTDIR = OUTPUTTOP.split(':',1)[1] # just directory part
if OUTPUTTOP.startswith('mss:/mss'): OUTPUTDIR = OUTPUTDIR.replace('/mss','/lustre/expphy/cache')
# Get list of output file names and mappings to final directory and file name.
# The outfiles variable is a map of local file(key) to output file with path(value)
# The path is relative to the OUTPUTDIR directory.
if LAUNCHTYPE == 'recon':
outfiles = ReconOutFiles(RUN, FILE)
elif LAUNCHTYPE == 'offmon':
outfiles = OffmonOutFiles(RUN, FILE)
else:
print 'Unknown launch type (' + LAUNCHTYPE + ')! Unable to form output file list'
# SLURM options
SBATCH = ['-sbatch']
SBATCH += ['-A', PROJECT]
SBATCH += ['--volume="/global/project/projectdirs/%s/launch:/launch"' % PROJECT]
SBATCH += ['--image=%s' % IMAGE]
SBATCH += ['--time=%s' % TIMELIMIT]
SBATCH += ['--nodes=1']
SBATCH += ['--tasks-per-node=1']
SBATCH += ['--cpus-per-task=64']
SBATCH += ['--qos='+QOS]
SBATCH += ['-C', NODETYPE]
SBATCH += ['-L', 'project']
# Command for job to run
CMD = ['/global/project/projectdirs/%s/launch/run_shifter.sh' % PROJECT]
CMD += ['--module=cvmfs']
CMD += ['--']
CMD += [SCRIPTFILE]
CMD += [CONFIG] # arg 1: JANA config file
CMD += [RECONVERSION] # arg 2: sim-recon version
CMD += [str(RUN)] # arg 3: run <--+ run and file number used to name job_info
CMD += [str(FILE)] # arg 4: file <--+ directory only.
# Make swif2 command
SWIF2_CMD = ['swif2']
SWIF2_CMD += ['add-job']
SWIF2_CMD += ['-workflow', WORKFLOW]
SWIF2_CMD += ['-name', JOB_STR]
SWIF2_CMD += ['-input', EVIOFILE, 'mss:'+MSSFILE]
for src,dest in outfiles.iteritems(): SWIF2_CMD += ['-output', src, OUTPUTTOP + '/' + dest]
SWIF2_CMD += SBATCH + ['::'] + CMD
# Get list of output directories so we can pre-create them with proper
# permissions. Normally, we wouldn't have to make the directories, but if using
# a Globus account with a different user than the one running swif2,
# there will be permissions errors otherwise.
outdirs = []
for (infile, outpath) in outfiles.iteritems():
if infile.startswith('match:'): # If using wildcards, the outputpath already is the output directory
outdirs.append(outpath)
else:
outdirs.append(os.path.dirname(outpath))
# Pare down list of outdirs to only those that don't already exist
new_outdirs = [x for x in outdirs if x not in DIRS_CREATED]
# Set umask to make directories group writable (but not world writable)
os.umask(0002)
# Print commands
if VERBOSE > 1:
for d in new_outdirs: print 'mkdir -p ' + OUTPUTDIR + '/' + d
if VERBOSE > 2 : print ' '.join(SWIF2_CMD)
elif VERBOSE > 1 : print ' --- Job will be created for run:' + str(RUN) + ' file:' + str(FILE)
if not TESTMODE:
for d in new_outdirs:
if not os.path.exists(OUTPUTDIR + '/' + d) :
os.makedirs(OUTPUTDIR + '/' + d)
DIRS_CREATED.append(OUTPUTDIR + '/' + d)
subprocess.check_call(SWIF2_CMD)
NJOBS_SUBMITTED += 1
#----------------------------------------------------
def OffmonOutFiles(RUN, FILE):
# Return list of output directory/filename mappings for a
# offline monitoring job.
# Map of local file(key) to output file(value)
RFSTR = '%06d_%03d' % (RUN, FILE)
outfiles = {}
outfiles['job_info_%s.tgz' % RFSTR ] = 'job_info/%06d/job_info_%s.tgz' % (RUN, RFSTR)
#outfiles['dana_rest_coherent_peak.hddm' ] = 'dana_rest_coherent_peak/%06d/dana_rest_coherent_peak_%s.hddm' % (RUN, RFSTR)
outfiles['dana_rest.hddm' ] = 'REST/%06d/dana_rest_%s.hddm' % (RUN, RFSTR)
outfiles['converted_random.hddm' ] = 'converted_random/%06d/converted_random_%s.hddm' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.omega.evio' % RFSTR ] = 'omega/%06d/omega_%s.evio' % (RUN, RFSTR)
outfiles['hd_root.root' ] = 'hists/%06d/hd_root_%s.root' % (RUN, RFSTR)
#outfiles['p3pi_excl_skim.root' ] = 'p3pi_excl_skim/%06d/p3pi_excl_skim_%s.root' % (RUN, RFSTR)
outfiles['tree_bcal_hadronic_eff.root' ] = 'tree_bcal_hadronic_eff/%06d/tree_bcal_hadronic_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_fcal_hadronic_eff.root' ] = 'tree_fcal_hadronic_eff/%06d/tree_fcal_hadronic_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_PSFlux.root' ] = 'tree_PSFlux/%06d/tree_PSFlux_%s.root' % (RUN, RFSTR)
outfiles['tree_trackeff.root' ] = 'tree_trackeff/%06d/tree_trackeff_%s.root' % (RUN, RFSTR)
outfiles['tree_p2k_dirc.root' ] = 'tree_p2k_dirc/%06d/tree_p2k_dirc_%s.root' % (RUN, RFSTR)
outfiles['tree_p2pi_dirc.root' ] = 'tree_p2k_dirc/%06d/tree_ppi_dirc_%s.root' % (RUN, RFSTR)
#outfiles['TOF_TDC_shift_%06d.txt' % RUN ] = 'TOF_TDC_shift/%06d/TOF_TDC_shift_%s.txt' % (RUN, RFSTR)
outfiles['hd_root_tofcalib.root' ] = 'hd_root_tofcalib/%06d/hd_root_tofcalib_%s.root' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.random.evio' % RFSTR ] = 'random/%06d/hd_rawdata_%s.random.evio' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.BCAL-LED.evio' % RFSTR ] = 'BCAL-LED/%06d/hd_rawdata_%s.BCAL-LED.evio' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.FCAL-LED.evio' % RFSTR ] = 'FCAL-LED/%06d/hd_rawdata_%s.FCAL-LED.evio' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.sync.evio' % RFSTR ] = 'sync/%06d/hd_rawdata_%s.sync' % (RUN, RFSTR)
#outfiles['tree_TPOL.root' ] = 'TPOL_tree/%06d/tree_TPOL_%s.root' % (RUN, RFSTR)
outfiles['pi0fcaltofskim.evio' ] = 'pi0fcaltofskim/%06d/pi0fcaltofskim_%s.evio' % (RUN, RFSTR)
outfiles['syncskim.evio' ] = 'syncskim/%06d/syncskim_%s.root' % (RUN, RFSTR)
outfiles['TPOL_tree.root' ] = 'TPOL_tree/%06d/TPOL_tree_%s.root' % (RUN, RFSTR)
return outfiles
#----------------------------------------------------
def ReconOutFiles(RUN, FILE):
# Return list of output directory/filename mappings for a
# reconstruction job.
# Map of local file(key) to output file(value)
RFSTR = '%06d_%03d' % (RUN, FILE)
outfiles = {}
outfiles['job_info_%s.tgz' % RFSTR ] = 'job_info/%06d/job_info_%s.tgz' % (RUN, RFSTR)
outfiles['match:converted_random*.hddm' ] = 'converted_random/%06d' % (RUN)
outfiles['dana_rest_coherent_peak.hddm' ] = 'dana_rest_coherent_peak/%06d/dana_rest_coherent_peak_%s.hddm' % (RUN, RFSTR)
outfiles['dana_rest.hddm' ] = 'REST/%06d/dana_rest_%s.hddm' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.exclusivepi0.evio' % RFSTR] = 'exclusivepi0/%06d/exclusivepi0_%s.evio' % (RUN, RFSTR)
outfiles['hd_rawdata_%s.omega.evio' % RFSTR ] = 'omega/%06d/omega_%s.evio' % (RUN, RFSTR)
outfiles['hd_rawdata_%s.pi0bcalskim.evio' % RFSTR ] = 'pi0bcalskim/%06d/pi0bcalskim_%s.evio' % (RUN, RFSTR)
outfiles['hd_root.root' ] = 'hists/%06d/hd_root_%s.root' % (RUN, RFSTR)
outfiles['p3pi_excl_skim.root' ] = 'p3pi_excl_skim/%06d/p3pi_excl_skim_%s.root' % (RUN, RFSTR)
outfiles['tree_bcal_hadronic_eff.root' ] = 'tree_bcal_hadronic_eff/%06d/tree_bcal_hadronic_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_fcal_hadronic_eff.root' ] = 'tree_fcal_hadronic_eff/%06d/tree_fcal_hadronic_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_PSFlux.root' ] = 'tree_PSFlux/%06d/tree_PSFlux_%s.root' % (RUN, RFSTR)
outfiles['tree_sc_eff.root' ] = 'tree_sc_eff/%06d/tree_sc_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_tof_eff.root' ] = 'tree_tof_eff/%06d/tree_tof_eff_%s.root' % (RUN, RFSTR)
outfiles['tree_TPOL.root' ] = 'tree_TPOL/%06d/tree_TPOL_%s.root' % (RUN, RFSTR)
outfiles['tree_trackeff.root' ] = 'tree_trackeff/%06d/tree_trackeff_%s.root' % (RUN, RFSTR)
outfiles['tree_TS_scaler.root' ] = 'tree_TS_scaler/%06d/tree_TS_scaler_%s.root' % (RUN, RFSTR)
#outfiles['hd_rawdata_%s.cal_high_energy_skim.evio' % RFSTR] = 'cal_high_energy_skim/%06d/cal_high_energy_skim_%s.evio'% (RUN, RFSTR)
return outfiles
#----------------------------------------------------
def GetRunInfo():
# Get the list of runs to process and the number of EVIO files for each.
# The list is returned in the form of a dictionary with the run numbers
# as keys and the maximum evio file number for that run as values.
# Which runs show up in the list depends on how the RUNS and RCDB_QUERY
# globals are set:
#
# RUNS is not None: All runs in the list are included
# RUNS is empty and RCDB_QUERY is None: All runs in the range MINRUN-MAXRUN inclusive are included
# RUNS is empty and RCDB_QUERY is not None: RCDB is queried for the list of runs.
#
# n.b. that for the first 2 options above, the GetNumEVIOFiles routine
# below is called which queries the RCDB via mysql directly so the RCDB
# python module does not actually need to be in PYTHONPATH. For the 3rd
# option, the RCDB python API is used so it is needed.
global RUNS, MINRUN, MAXRUN, RCDB_QUERY, RUN_LIST_SOURCE, BAD_RCDB_QUERY_RUNS, BAD_FILE_COUNT_RUNS
good_runs = {}
# If RCDB_QUERY is not defined, define with value None
try: RCDB_QUERY
except : RCDB_QUERY = None
# Query through RCDB API
if len(RUNS)==0 and RCDB_QUERY!=None:
RUN_LIST_SOURCE = 'RCDB ' + str(MINRUN) + '-' + str(MAXRUN) + ' (query="' + RCDB_QUERY + '")'
print 'Querying RCDB for run list ....'
# Import RCDB python module. Add a path on the CUE just in case
# PYTHONPATH is not already set
sys.path.append('/group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5/rcdb/rcdb_0.06.00/python')
import rcdb
db = rcdb.RCDBProvider('mysql://' + RCDB_USER + '@' + RCDB_HOST + '/rcdb')
print 'RCDB_QUERY = ' + RCDB_QUERY
for r in db.select_runs(RCDB_QUERY, MINRUN, MAXRUN):
evio_files_count = r.get_condition_value('evio_files_count')
if evio_files_count == None:
print('ERROR in RCDB: Run ' + str(r.number) + ' has no value for evio_files_count!...')
BAD_RCDB_QUERY_RUNS.append( int(r.number) )
print('Attempting to extract number of files by examining /mss ...')
rawdatafiles = glob.glob('/mss/halld/RunPeriod-'+RUNPERIOD+'/rawdata/Run%06d/hd_rawdata_%06d_*.evio' % (r.number,r.number))
if len(rawdatafiles) > 0: evio_files_count = len(rawdatafiles)
if evio_files_count == None:
print('ERROR getting number of files for: Run ' + str(r.number) )
BAD_FILE_COUNT_RUNS.append( int(r.number) )
continue
good_runs[r.number] = int(evio_files_count)
elif len(RUNS)==0 :
RUN_LIST_SOURCE = 'All runs in range ' + str(MINRUN) + '-' + str(MAXRUN)
print 'Getting info for all runs in range ' + str(MINRUN) + '-' + str(MAXRUN) + ' ....'
for RUN in range(MINRUN, MAXRUN+1): good_runs[RUN] = GetNumEVIOFiles(RUN)
else:
RUN_LIST_SOURCE = 'Custom list: ' + ' '.join([str(x) for x in RUNS])
print 'Getting info for runs : ' + ' '.join([str(x) for x in RUNS])
for RUN in RUNS: good_runs[RUN] = GetNumEVIOFiles(RUN)
# Filter out runs in the EXCLUDE_RUNS list
global EXCLUDE_RUNS
good_runs_filtered = {}
for run in good_runs.keys():
if run not in EXCLUDE_RUNS: good_runs_filtered[run] = good_runs[run]
return good_runs_filtered
#----------------------------------------------------
def GetNumEVIOFiles(RUN):
global BAD_RCDB_QUERY_RUNS, BAD_FILE_COUNT_RUNS
# Access RCDB to get the number of EVIO files for this run.
# n.b. the file numbers start from 0 so the last valid file
# number will be one less than the value returned
global RCDB, cnx, cur
if not RCDB :
try:
RCDB = 'mysql://' + RCDB_USER + '@' + RCDB_HOST + '/rcdb'
cnx = mysql.connector.connect(user=RCDB_USER, host=RCDB_HOST, database='rcdb')
cur = cnx.cursor() # using dictionary=True crashes when running on ifarm (??)
except Exception as e:
print 'Error connecting to RCDB: ' + RCDB
print str(e)
sys.exit(-1)
Nfiles = 0
sql = 'SELECT int_value from conditions,condition_types WHERE condition_type_id=condition_types.id'
sql += ' AND condition_types.name="evio_files_count" AND run_number=' + str(RUN);
cur.execute(sql)
c_rows = cur.fetchall()
if len(c_rows)>0 :
Nfiles = int(c_rows[0][0])
else:
BAD_RCDB_QUERY_RUNS.append(RUN)
print('Attempting to extract number of files by examining /mss ...')
rawdatafiles = glob.glob('/mss/halld/RunPeriod-'+RUNPERIOD+'/rawdata/Run%06d/hd_rawdata_%06d_*.evio' % (RUN,RUN))
if len(rawdatafiles) > 0:
Nfiles = len(rawdatafiles)
else:
BAD_FILE_COUNT_RUNS.append(RUN)
return Nfiles
#----------------------------------------------------
def GetFileNumbersToProcess(Nfiles):
# This will return a list of file numbers to process for a run
# given the number of files given by Nfiles. The list is determined
# by the values MINFILENO, MAXFILENO and FILE_FRACTION.
# First, the actual range of file numbers for the run is determined
# by MINFILENO-MAXFILENO, but clipped if necessary to Nfiles.
# Next, a list of files in that range representing FILE_FRACTION
# of the range is formed and returned.
#
# Example 1: Process first 10 files of each run:
# MINFILENO = 0
# MAXFILENO = 9
# FILE_FRACTION = 1.0
#
# Example 2: Process first 5% of files of each run of
# files distributed throughout run:
# MINFILENO = 0
# MAXFILENO = 1000 n.b. set this to something really big
# FILE_FRACTION = 0.05
#
global MINFILENO, MAXFILENO, FILE_FRACTION
# Make sure MINFILENO is not greater than max file number for the run
if MINFILENO >= Nfiles : return []
# Limit max file number to how many there are for this run according to RCDB
maxfile = MAXFILENO+1 # set to 1 past the actual last file number we want to process
if Nfiles < maxfile : maxfile = Nfiles
# If FILE_FRACTION is 1.0 then we want all files in the range.
if FILE_FRACTION == 1.0: return range( MINFILENO, maxfile)
# At this point, maxfile should be one greater than the last file
# number we want to process. If the last file we want to process
# is the last file in the run, then it could be a short file. Thus,
# use the next to the last file in the run to determine the range.
if Nfiles < MAXFILENO : maxfile -= 1
# Number of files in run to process
Nrange = float(maxfile-1) - float(MINFILENO)
N = math.ceil(FILE_FRACTION * Nrange)
if N<2 : return [MINFILENO]
nskip = Nrange/(N-1)
filenos = []
for i in range(0, int(N)): filenos.append(int(i*nskip))
# print 'Nrange=%f N=%f nskip=%f ' % (Nrange, N, nskip)
return filenos
#----------------------------------------------------
# --------------- MAIN --------------------
# Get list of runs with number of evio files for each.
# (parameters for search set at top of file)
good_runs = GetRunInfo()
# Print some info before doing anything
Njobs = 0
for n in [x for (y,x) in good_runs.iteritems()]:
Njobs += len(GetFileNumbersToProcess(n))
# if n<MAXFILENO : Njobs += (n-MINFILENO+1)
# else: Njobs += (MAXFILENO-MINFILENO+1)
if VERBOSE > 0:
print '================================================='
print 'Launch Summary ' + ('**** TEST MODE ****' if TESTMODE else '')
print '-----------------------------------------------'
print ' RunPeriod: ' + RUNPERIOD
print ' Launch type: ' + LAUNCHTYPE
print ' Version: ' + VER
print ' batch: ' + BATCH
print ' WORKFLOW: ' + WORKFLOW
print ' Origin of run list: ' + RUN_LIST_SOURCE
print ' Number of runs: ' + str(len(good_runs))
print ' Number of jobs: ' + str(Njobs) + ' (maximum ' + str(MAXFILENO-MINFILENO+1) + ' files/run)'
print ' Min. file no.: ' + str(MINFILENO)
print ' Max. file no.: ' + str(MAXFILENO)
print ' NERSC project: ' + PROJECT
print ' Time limit per job: ' + TIMELIMIT
print ' Shifter/Docker image: ' + IMAGE
print ' halld_recon version: ' + RECONVERSION + ' (from CVMFS)'
print ' Output directory: ' + OUTPUTTOP
print '================================================='
# Create workflow
cmd = ['swif2', 'create', '-workflow', WORKFLOW]
cmd += ['-site', 'nersc/cori', '-site-storage', 'nersc:'+PROJECT]
cmd += ['-max-concurrent', MAX_CONCURRENT_JOBS]
if VERBOSE>0 : print 'Workflow creation command: ' + ' '.join(cmd)
if TESTMODE:
print '(TEST MODE so command will not be run)'
else:
(cmd_out, cmd_err) = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
if VERBOSE>0:
if len(cmd_err)>0 :
if VERBOSE>1 : print cmd_err
print 'Command returned error message. Assuming workflow already exists'
else:
print cmd_out
# Run workflow
cmd = ['swif2', 'run', '-workflow', WORKFLOW]
if VERBOSE>0 : print 'Command to start workflow: ' + ' '.join(cmd)
if TESTMODE:
print '(TEST MODE so command will not be run)'
else:
(cmd_out, cmd_err) = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
if VERBOSE>0:
print cmd_out
print cmd_err
# Loop over runs
NJOBS_SUBMITTED = 0
DIRS_CREATED = [] # keeps track of local directories we create so we don't create them twice
if VERBOSE>0 :
print 'Submitting jobs ....'
print '-----------------------------------------------'
for (RUN, Nfiles) in good_runs.iteritems():
# Get list of files to process
files_to_process = GetFileNumbersToProcess( Nfiles )
# Loop over files, creating job for each
for FILE in files_to_process:
MakeJob(RUN, FILE)
if VERBOSE>0:
sys.stdout.write(' ' + str(NJOBS_SUBMITTED) + '/' + str(Njobs) + ' jobs \r')
sys.stdout.flush()
print('\n')
print('NOTE: The values in BAD_RCDB_QUERY_RUNS is informative about what is missing from')
print(' the RCDB. An attempt to recover the information from the /mss filesystem')
print(' was also made. Values in BAD_FILE_COUNT_RUNS are ones for which that failed.')
print(' Thus, only runs listed in BAD_FILE_COUNT_RUNS will not have any jobs submitted')
print 'BAD_RCDB_QUERY_RUNS=' + str(BAD_RCDB_QUERY_RUNS)
print 'BAD_FILE_COUNT_RUNS=' + str(BAD_FILE_COUNT_RUNS)
print('')
print('WORKFLOW: ' + WORKFLOW)
print('------------------------------------')
print('Number of runs: ' + str(len(good_runs)) + ' (only good runs)')
print(str(NJOBS_SUBMITTED) + '/' + str(Njobs) + ' jobs submitted')
print(str(len(DIRS_CREATED)) + ' directories created for output')
print('')