-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrun_analysis.py
More file actions
439 lines (354 loc) · 14.4 KB
/
run_analysis.py
File metadata and controls
439 lines (354 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
#!/usr/bin/env python3
"""
Main execution script for DarkBottomLine framework.
"""
import argparse
import logging
import time
import yaml
import os
from pathlib import Path
from typing import Dict, Any, Optional, Union, List
try:
from coffea import processor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
COFFEA_AVAILABLE = True
except ImportError:
COFFEA_AVAILABLE = False
logging.warning("Coffea not available. Using fallback implementation.")
from darkbottomline.processor import DarkBottomLineProcessor, DarkBottomLineCoffeaProcessor
from darkbottomline.utils.chunk_optimizer import (
optimize_chunk_size_for_files,
parse_chunk_size_arg,
)
def setup_logging(level: str = "INFO"):
    """Configure the root logger with a timestamped message format.

    Args:
        level: Logging level name, case-insensitive (e.g. "debug", "INFO").
    """
    numeric_level = getattr(logging, level.upper())
    logging.basicConfig(
        level=numeric_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
def load_config(config_path: str) -> Dict[str, Any]:
    """Read a YAML configuration file and apply framework defaults.

    Args:
        config_path: Path to configuration file.

    Returns:
        Parsed configuration dictionary with ``save_skims`` defaulted to
        True (``main()`` may later override it from the command line).
    """
    with open(config_path, 'r') as cfg_file:
        cfg = yaml.safe_load(cfg_file)
    # Skim saving defaults on; CLI flags take precedence downstream.
    cfg['save_skims'] = True
    return cfg
def load_events(input_path: str, max_events: Optional[int] = None) -> Any:
    """
    Load events from a NanoAOD ROOT file.

    Uses coffea NanoEvents when available; otherwise falls back to a plain
    uproot read of the "Events" tree.

    Args:
        input_path: Path to input ROOT file.
        max_events: Maximum number of events to load (None = all).

    Returns:
        Events array (NanoEvents or awkward arrays, depending on backend).

    Raises:
        ImportError: If neither coffea nor uproot is importable.
    """
    if COFFEA_AVAILABLE:
        # Bug fix: NanoEventsFactory.from_root limits events via
        # entry_start/entry_stop (event indices); `maxchunks` is a
        # run_uproot_job keyword and was wrong here.
        events = NanoEventsFactory.from_root(
            input_path,
            schemaclass=NanoAODSchema,
            entry_stop=max_events
        ).events()
        return events
    # Fallback: read the Events tree directly with uproot. Keep the try
    # narrow so only the import failure maps to the ImportError below.
    try:
        import uproot
    except ImportError:
        raise ImportError("Neither Coffea nor uproot available. Cannot load events.")
    with uproot.open(input_path) as f:
        events = f["Events"].arrays()
    if max_events:
        events = events[:max_events]
    return events
def run_analysis_iterative(
    events: Any,
    processor: DarkBottomLineProcessor
) -> Dict[str, Any]:
    """Process all events in a single pass on the local process.

    Args:
        events: Events to process.
        processor: Analysis processor; its ``process`` method is invoked once.

    Returns:
        Whatever the processor's ``process`` call returns.
    """
    logging.info("Running analysis iteratively...")
    t0 = time.time()
    output = processor.process(events)
    elapsed = time.time() - t0
    logging.info(f"Iterative processing completed in {elapsed:.2f} seconds")
    return output
def run_analysis_futures(
    fileset: Dict[str, List[str]],
    processor_instance: DarkBottomLineCoffeaProcessor,
    workers: int = 4,
    chunksize: int = 50000,
    maxchunks: Optional[int] = None
) -> Dict[str, Any]:
    """Run the analysis in parallel via coffea's futures executor.

    Args:
        fileset: Mapping of dataset name to a list of input file paths.
        processor_instance: Coffea-compatible processor instance.
        workers: Number of worker processes.
        chunksize: Events per processing chunk.
        maxchunks: Optional cap on the number of chunks processed.

    Returns:
        Analysis results, or None when coffea is unavailable (caller is
        expected to fall back to iterative execution).
    """
    if not COFFEA_AVAILABLE:
        logging.warning("Coffea not available. Falling back to iterative execution.")
        return None

    from coffea.processor import run_uproot_job, FuturesExecutor

    logging.info(f"Running analysis with futures executor ({workers} workers, chunksize={chunksize})...")
    if maxchunks:
        logging.info(f"Limiting to {maxchunks} chunks")

    t0 = time.time()
    # run_uproot_job instantiates the executor itself, so we hand over the
    # class plus its constructor arguments rather than an instance.
    output = run_uproot_job(
        fileset,
        "Events",
        processor_instance,
        executor=FuturesExecutor,
        executor_args={"workers": workers},
        chunksize=chunksize,
        maxchunks=maxchunks,
    )
    elapsed = time.time() - t0
    logging.info(f"Futures processing completed in {elapsed:.2f} seconds")
    return output
def run_analysis_dask(
    fileset: Dict[str, List[str]],
    processor_instance: DarkBottomLineCoffeaProcessor,
    workers: int = 4,
    chunksize: int = 200000,
    maxchunks: Optional[int] = None
) -> Dict[str, Any]:
    """
    Run analysis using Dask executor with run_uproot_job.

    Returns None (instead of raising) when coffea or dask is missing,
    signalling the caller to fall back to another execution mode; genuine
    runtime failures during Dask execution are re-raised.

    Args:
        fileset: Dictionary mapping dataset names to file paths
        processor_instance: Coffea-compatible processor instance
        workers: Number of parallel workers
        chunksize: Number of events per chunk
        maxchunks: Optional limit on number of chunks to process
    Returns:
        Analysis results
    """
    if not COFFEA_AVAILABLE:
        logging.warning("Coffea not available. Falling back to iterative execution.")
        return None
    try:
        # Imports inside the try so a missing dask maps to the ImportError
        # fallback at the bottom rather than crashing at module import.
        from dask.distributed import Client
        from coffea.processor import run_uproot_job, DaskExecutor
        logging.info(f"Running analysis with Dask executor ({workers} workers, chunksize={chunksize})...")
        if maxchunks:
            logging.info(f"Limiting to {maxchunks} chunks")
        start_time = time.time()
        # Start Dask client; `client = None` lets the finally block tell a
        # failed construction apart from a live client that needs closing.
        client = None
        try:
            client = Client(n_workers=workers, timeout=120)
            # Wait for workers to be ready (with timeout). Best-effort: if
            # the wait times out we proceed with whatever workers came up.
            try:
                client.wait_for_workers(workers, timeout=60)
                logging.info(f"Dask client ready with {len(client.scheduler_info()['workers'])} workers")
            except Exception as e:
                logging.warning(f"Timeout waiting for workers, continuing anyway: {e}")
            # For DaskExecutor, we need to pass the client in executor_args
            # since run_uproot_job will instantiate the executor
            result = run_uproot_job(
                fileset,
                "Events",
                processor_instance,
                executor=DaskExecutor,
                executor_args={"client": client},
                chunksize=chunksize,
                maxchunks=maxchunks,
            )
            processing_time = time.time() - start_time
            logging.info(f"Dask processing completed in {processing_time:.2f} seconds")
            return result
        except Exception as e:
            # Execution errors are logged and re-raised (unlike the
            # ImportError fallback below, which degrades gracefully).
            logging.error(f"Dask execution error: {e}")
            raise
        finally:
            # Ensure client is properly closed even on failure; a close
            # error is only worth a warning at this point.
            if client is not None:
                try:
                    client.close()
                except Exception as e:
                    logging.warning(f"Error closing Dask client: {e}")
    except ImportError:
        logging.warning("Dask not available. Falling back to iterative execution.")
        return None
def save_results(results: Dict[str, Any], output_path: str):
    """Write analysis results in a format chosen by the output extension.

    ``.parquet`` and ``.root`` paths dispatch to the dedicated writers;
    any other extension is pickled.

    Args:
        results: Analysis results.
        output_path: Destination file path.
    """
    logging.info(f"Saving results to {output_path}")
    writers = (
        ('.parquet', save_parquet_results),
        ('.root', save_root_results),
    )
    for suffix, writer in writers:
        if output_path.endswith(suffix):
            writer(results, output_path)
            return
    save_pickle_results(results, output_path)
def save_parquet_results(results: Dict[str, Any], output_path: str):
    """Save histogram results as a Parquet file.

    Each entry of ``results["histograms"]`` becomes one flat column:
    plain dicts contribute their "values" entry, hist-like objects their
    flattened ``.values()``. Falls back to pickle when pandas is missing
    (NOTE(review): the pickle is then written under the ``.parquet`` path).

    Args:
        results: Analysis results; histograms read from ``results["histograms"]``.
        output_path: Destination Parquet file path.
    """
    try:
        import pandas as pd  # unused `import awkward as ak` removed
        # Convert histograms to DataFrame format.
        # Bug fix: check for dict FIRST -- `hasattr(hist, 'values')` is also
        # True for plain dicts (dict.values is a method), so the original
        # dict branch was unreachable and dicts crashed on `.flatten()`.
        data = {}
        for name, hist in results.get("histograms", {}).items():
            if isinstance(hist, dict):
                data[name] = hist.get("values", [])
            elif hasattr(hist, 'values'):
                data[name] = hist.values().flatten()
            else:
                data[name] = []
        df = pd.DataFrame(data)
        df.to_parquet(output_path)
        logging.info(f"Saved results to {output_path}")
    except ImportError:
        logging.warning("pandas not available. Falling back to pickle.")
        save_pickle_results(results, output_path)
def save_root_results(results: Dict[str, Any], output_path: str):
    """Save results as ROOT file.

    Histogram-like objects (those exposing ``.values``) from
    ``results["histograms"]`` are written as ROOT objects, then the
    metadata mapping is stored under the "metadata" key.

    NOTE(review): ``f["metadata"] = <dict>`` relies on uproot being able to
    serialize the actual metadata payload -- confirm with a real results
    object; arbitrary dicts may not be writable.

    Args:
        results: Analysis results.
        output_path: Destination ROOT file path.
    """
    try:
        import uproot
        with uproot.recreate(output_path) as f:
            # Save histograms (skip entries that don't look hist-like)
            for name, hist in results.get("histograms", {}).items():
                if hasattr(hist, 'values'):
                    f[name] = hist
            # Save metadata
            f["metadata"] = results.get("metadata", {})
        logging.info(f"Saved results to {output_path}")
    except ImportError:
        logging.warning("uproot not available. Falling back to pickle.")
        save_pickle_results(results, output_path)
def save_pickle_results(results: Dict[str, Any], output_path: str):
    """Serialize results to a pickle file.

    Args:
        results: Analysis results.
        output_path: Destination pickle file path.
    """
    import pickle
    with open(output_path, 'wb') as out:
        pickle.dump(results, out)
    logging.info(f"Saved results to {output_path}")
def main():
    """Command-line entry point: parse arguments, run the analysis, save results."""
    parser = argparse.ArgumentParser(description="DarkBottomLine Analysis Framework")
    # Required arguments
    parser.add_argument("--config", required=True, help="Path to configuration YAML file")
    parser.add_argument("--input", required=True, help="Path to input NanoAOD file")
    parser.add_argument("--output", required=True, help="Path to output file")
    # Optional arguments
    parser.add_argument("--executor", choices=["iterative", "futures", "dask"],
                        default="iterative", help="Execution backend")
    parser.add_argument("--workers", type=int, default=4,
                        help="Number of parallel workers")
    parser.add_argument("--chunk-size", type=str, default="50000",
                        help="Number of events per chunk for futures/dask executors. Use 'auto' for automatic optimization, or specify an integer (default: 50000)")
    parser.add_argument("--max-events", type=int, default=None,
                        help="Maximum number of events to process (converted to maxchunks for run_uproot_job)")
    parser.add_argument("--save-skims", action="store_true",
                        help="Save skimmed events")
    parser.add_argument("--data", action="store_true",
                        help="Input is collision data: apply golden JSON lumi mask and skip MC-only weights")
    parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        default="INFO", help="Logging level")
    args = parser.parse_args()

    setup_logging(args.log_level)

    logging.info(f"Loading configuration from {args.config}")
    config = load_config(args.config)
    # Command-line flags take precedence over config-file values.
    config['save_skims'] = args.save_skims
    if args.data:
        config.setdefault("data", {})["is_data"] = True

    logging.info("Initializing processor...")
    base_processor = DarkBottomLineProcessor(config)

    # Parse chunk size argument ("auto" parses to None -> optimize below)
    if isinstance(args.chunk_size, str):
        chunk_size = parse_chunk_size_arg(args.chunk_size)
    else:
        chunk_size = args.chunk_size

    # Build input file list: a .txt input is a file list (one path per
    # line, '#' comments allowed); anything else is a single ROOT file.
    if args.input.endswith('.txt'):
        with open(args.input, 'r') as f:
            input_files = [line.strip() for line in f if line.strip() and not line.strip().startswith('#')]
    else:
        input_files = [args.input]

    # Auto-optimize chunk size when requested and a parallel executor is used
    if chunk_size is None and args.executor in ["futures", "dask"]:
        logging.info("Auto-optimizing chunk size based on input files...")
        try:
            # Estimate available memory (default: 8GB per worker, conservative)
            # This is a rough estimate - users can override with explicit chunk-size
            available_memory_mb = 8000  # 8GB default
            chunk_size = optimize_chunk_size_for_files(
                input_files=input_files,
                available_memory_mb=available_memory_mb,
                num_workers=args.workers,
                executor=args.executor,
            )
            logging.info(f"Auto-optimized chunk size: {chunk_size:,} events")
        except Exception as e:
            logging.warning(f"Failed to auto-optimize chunk size: {e}")
            # Fallback to defaults
            chunk_size = 50000 if args.executor == "futures" else 200000
            logging.info(f"Using default chunk size: {chunk_size:,} events")

    # Run analysis
    logging.info(f"Starting analysis with {args.executor} executor...")
    start_time = time.time()
    if args.executor == "iterative":
        logging.info(f"Loading events from {args.input}")
        events = load_events(args.input, args.max_events)
        results = run_analysis_iterative(events, base_processor)
    elif args.executor == "futures":
        fileset = {"dataset": input_files}
        chunksize = chunk_size
        maxchunks = None
        if args.max_events:
            # Ceiling division: enough chunks to cover max_events
            maxchunks = (args.max_events + chunksize - 1) // chunksize
        coffea_processor = DarkBottomLineCoffeaProcessor(config)
        results = run_analysis_futures(fileset, coffea_processor, args.workers, chunksize, maxchunks)
    elif args.executor == "dask":
        fileset = {"dataset": input_files}
        chunksize = chunk_size if chunk_size is not None else 200000  # Default to 200k for dask
        maxchunks = None
        if args.max_events:
            maxchunks = (args.max_events + chunksize - 1) // chunksize
        coffea_processor = DarkBottomLineCoffeaProcessor(config)
        results = run_analysis_dask(fileset, coffea_processor, args.workers, chunksize, maxchunks)
    else:
        raise ValueError(f"Unknown executor: {args.executor}")

    # Robustness fix: the parallel runners return None when their backend is
    # unavailable; honor their documented iterative fallback instead of
    # crashing in save_results(None, ...).
    if results is None:
        logging.warning("Parallel executor unavailable; falling back to iterative execution.")
        events = load_events(args.input, args.max_events)
        results = run_analysis_iterative(events, base_processor)

    total_time = time.time() - start_time
    logging.info(f"Analysis completed in {total_time:.2f} seconds")

    # Save results
    save_results(results, args.output)

    # Bug fix: summaries come from the processor *instance*; the original
    # referenced `processor`, which is the coffea.processor module (or an
    # undefined name when coffea is missing) and has no such methods.
    if "cutflow" in results:
        print("\n" + base_processor.get_cutflow_summary())
    if "metadata" in results:
        print("\n" + base_processor.get_processing_summary())
    logging.info("Analysis completed successfully!")


if __name__ == "__main__":
    main()