-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexpmanager.py
More file actions
760 lines (697 loc) · 34.1 KB
/
expmanager.py
File metadata and controls
760 lines (697 loc) · 34.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
import enum
import subprocess
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import glob
import shutil
import time
from typing import Callable, Optional
import re
import threading
class Status(enum.Enum):
    """Lifecycle state of a single Experiment run."""
    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
def loop_until_ctrl_c(callback: Optional[Callable[[], None]] = None, interval: float = 1.0):
    """
    Block until the user presses Ctrl+C, ticking once per *interval* seconds.

    If *callback* is given it is invoked every tick; exceptions it raises
    are swallowed so the loop keeps running.  Only KeyboardInterrupt ends
    the loop.
    """
    def _tick() -> None:
        if callback:
            try:
                callback()
            except Exception:
                # Keep looping no matter what the callback does.
                pass
        time.sleep(interval)

    try:
        while True:
            _tick()
    except KeyboardInterrupt:
        print("\nInterrupted by user. Exiting loop.")
class Experiment():
    """One fuzzing / coverage / build-check run of a DL-library API in Docker.

    Wraps the docker lifecycle (create/start/exec/copy/stop/remove) for a
    single (dll, ver, mode, api) combination and collects results under a
    per-experiment result directory on the host.
    """
    def __init__(self, dll: str, mode: str, ver: str, api: str, cpus: int = 16, mem: int = 16, check_valid: bool = False, time_budget: int = 180, itv: int = 60, debug: bool = False, slurm: bool = False, vs: Optional[str] = None, gpu: bool = False):
        """Configure the experiment and verify the docker image exists.

        Args:
            dll: Library under test ("tf" or "torch").
            mode: Run mode (e.g. "fuzz" or "cov").
            ver: Library version tag baked into the image name.
            api: Target API name, or "all".
            cpus: CPU quota for the container (used on non-SLURM runs).
            mem: Memory limit in GB (currently not enforced).
            check_valid: When True, run the harness build check instead of fuzzing.
            time_budget: Fuzzing time budget in seconds.
            itv: Interval in seconds for coverage/corpus bucketing.
            debug: When True, keep intermediate profraw/profdata files.
            slurm: When True, pin container CPUs to the SLURM allow-list.
            vs: Optional baseline tag mixed into container/result names.
            gpu: When True, use the GPU image and expose all GPUs.

        Raises:
            RuntimeError: if the required docker image has not been built.
        """
        self.dll = dll
        self.mode = mode
        self.ver = ver
        self.api = api
        self.cpus = cpus
        self.mem = mem
        self.check_valid = check_valid
        self.time_budget = time_budget
        self.status = Status.NOT_STARTED
        self.image_name = f"ncsuswat/flashfuzz:{self.dll}{self.ver}-{self.mode}{'-gpu' if gpu else ''}"
        self.vs = vs
        self.gpu = gpu
        # include vs tag in container name if provided
        self.container_name = (
            f"{self.api}_{self.dll}{self.ver}_{self.mode}_{self.vs}"
            if self.vs else f"{self.api}_{self.dll}{self.ver}_{self.mode}"
        )
        self.container_id = None
        self.itv = itv
        self.debug = debug
        self.slurm = slurm
        # result directory includes vs tag if provided to disambiguate baselines
        vs_suffix = f"-{self.vs}" if self.vs else ""
        if self.api == "all":
            self.result_dir = f"./_{self.mode}_result/{self.dll}{self.ver}-{self.mode}-{self.time_budget}s{vs_suffix}/"
        else:
            self.result_dir = f"./_{self.mode}_result/{self.dll}{self.ver}-{self.mode}-{self.time_budget}s{vs_suffix}/{self.api}/"
        # Fail fast: refuse to construct an experiment whose image is missing.
        self.check_image()
def check_image(self):
cmd = f"docker images -q {self.image_name}"
proc = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
out, _ = proc.communicate()
image_available = out.decode().strip() != ""
if not image_available:
print(f"Image {self.image_name} not found. Please build it first.")
raise RuntimeError(f"Image {self.image_name} not found. Please build it first.")
# print(f"Image {self.image_name} is available.")
def classify_with_itv(self, itv: int):
# check if dir exists
if not os.path.exists(self.result_dir):
print(f"Result directory {self.result_dir} does not exist. Cannot classify.")
return
# Allow defaulting to instance interval
itv = itv or getattr(self, "itv", 60)
if itv <= 0:
print("Interval must be a positive integer (seconds).")
return
# Determine candidate experiment output dirs
dirs = glob.glob(f"{self.result_dir}/{self.dll}.*")
if not dirs:
print(f"No directories found in {self.result_dir} matching pattern {self.dll}.*")
return
for base_dir in dirs:
seed_dir = os.path.join(base_dir, "corpus")
if not os.path.isdir(seed_dir):
print(f"Corpus directory not found: {seed_dir}")
return
# Collect all files (recursively) in corpus
files: list[str] = []
for root, _dirs, fnames in os.walk(seed_dir):
for fname in fnames:
fpath = os.path.join(root, fname)
if os.path.isfile(fpath):
files.append(fpath)
if not files:
print(f"No files found in corpus: {seed_dir}")
return
# Sort by modification time (approx creation order on Linux)
files.sort(key=lambda p: os.path.getmtime(p))
t0 = os.path.getmtime(files[0])
# Destination base for buckets
dest_base = os.path.join(base_dir, f"corpus_itv_{itv}")
os.makedirs(dest_base, exist_ok=True)
bucket_counts: dict[str, int] = {}
for f in files:
dt = os.path.getmtime(f) - t0
bucket = int(dt // itv)
start = bucket * itv
end = (bucket + 1) * itv
bucket_name = f"{start}-{end}"
bucket_dir = os.path.join(dest_base, bucket_name)
os.makedirs(bucket_dir, exist_ok=True)
# Avoid overwriting files with same basename
base_name = os.path.basename(f)
target = os.path.join(bucket_dir, base_name)
if os.path.exists(target):
name, ext = os.path.splitext(base_name)
i = 1
while os.path.exists(target):
target = os.path.join(bucket_dir, f"{name}_{i}{ext}")
i += 1
try:
shutil.copy2(f, target)
except Exception as e:
print(f"Failed to copy {f} -> {target}: {e}")
continue
bucket_counts[bucket_name] = bucket_counts.get(bucket_name, 0) + 1
# Print summary for this base_dir
total = sum(bucket_counts.values())
print(f"Classified {total} files from {seed_dir} into {len(bucket_counts)} buckets under {dest_base}.")
def create_docker_container(self):
cmd = f'docker create --name {self.container_name} {self.image_name}'
try:
proc = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
self.container_id = proc.stdout.strip()
print(f"Created container {self.container_name} with ID: {self.container_id}")
except subprocess.CalledProcessError as e:
print(f"Failed to create container {self.container_name}.")
print(f"Stderr: {e.stderr}")
raise
def _get_allowed_cpus_list(self) -> Optional[str]:
"""Return the current process CPU allow-list in Linux format (e.g., '0-3,8-11')."""
try:
with open("/proc/self/status", "r") as f:
for line in f:
if line.startswith("Cpus_allowed_list:"):
return line.split(":", 1)[1].strip()
except Exception:
pass
# Fallback: try cgroup v2 effective cpus
for path in [
"/sys/fs/cgroup/cpuset.cpus.effective",
"/sys/fs/cgroup/cpuset/cpuset.cpus",
]:
try:
if os.path.exists(path):
content = open(path, "r").read().strip()
if content:
return content
except Exception:
continue
return None
def _count_cpus_in_list(self, cpu_list: str) -> int:
total = 0
for part in cpu_list.split(","):
part = part.strip()
if not part:
continue
if "-" in part:
a, b = part.split("-", 1)
try:
start = int(a)
end = int(b)
if end >= start:
total += (end - start + 1)
except ValueError:
continue
else:
try:
int(part)
total += 1
except ValueError:
continue
return total
def start_docker_container(self):
# Build docker run command with optional SLURM-aware CPU pinning
base_cmd = f"docker run -itd --name {self.container_name}"
extra = []
if self.slurm:
cpus_list = self._get_allowed_cpus_list()
if cpus_list:
extra.append(f"--cpuset-cpus {cpus_list}")
# Optionally also set --cpus to the allowed count for fairness
try:
count = self._count_cpus_in_list(cpus_list)
if count > 0:
extra.append(f"--cpus {count}")
except Exception:
pass
else:
extra.append(f"--cpus {self.cpus}")
if self.gpu:
extra.append("--gpus all")
# Memory limit is currently not enforced; uncomment to enable:
# extra.append(f"-m {self.mem}g")
cmd = f"{base_cmd} {' '.join(extra)} {self.image_name}"
try:
subprocess.run(cmd, shell=True, check=True)
print(f"Started container {self.container_name}.")
except subprocess.CalledProcessError as e:
print(f"Failed to start container {self.container_name}.")
print(f"Stderr: {e.stderr}")
raise
    def execute_command(self, command: str):
        """Run *command* inside the container via `docker exec` (best-effort).

        Errors are swallowed so a non-zero exit from a fuzzing command does
        not abort the overall experiment flow; output is captured and
        discarded.
        """
        cmd = f'docker exec {self.container_name} sh -c "{command}"'
        try:
            subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
        except Exception:
            pass
def stop_docker_container(self):
cmd = f"docker stop {self.container_name}"
try:
subprocess.run(cmd, shell=True, check=True)
print(f"Stopped container {self.container_name}.")
except subprocess.CalledProcessError as e:
print(f"Failed to stop container {self.container_name}.")
print(f"Stderr: {e.stderr}")
raise
def remove_docker_container(self, prune_volumes: bool = True):
# Use -v to remove anonymous volumes associated with the container
vol_flag = "-v" if prune_volumes else ""
cmd = f"docker rm {vol_flag} {self.container_name}"
try:
subprocess.run(cmd, shell=True, check=True)
if prune_volumes:
print(f"Removed container {self.container_name} and its anonymous volumes.")
else:
print(f"Removed container {self.container_name}.")
except subprocess.CalledProcessError as e:
print(f"Failed to remove container {self.container_name}.")
print(f"Stderr: {e.stderr}")
raise
def force_remove_container(self):
"""
Force kill and remove the container along with anonymous volumes.
This will not raise on failure; it's a best-effort cleanup.
"""
cmd = f"docker rm -f -v {self.container_name}"
try:
subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True)
print(f"Force removed container {self.container_name} (if it existed).")
except Exception:
# Best-effort; ignore any errors
pass
def copy_results_from_container(self, src: str, dest: str):
cmd = f"docker cp {self.container_name}:{src} {dest}"
subprocess.run(f"mkdir -p {self.result_dir}", shell=True, check=True)
try:
subprocess.run(cmd, shell=True, check=True)
print(f"Copied results from {src} to {dest}.")
except Exception:
pass
def copy_from_container(self, src: str, dest: str):
"""Copy a single file or directory from container to host, creating parent dirs."""
os.makedirs(os.path.dirname(os.path.abspath(dest)), exist_ok=True)
cmd = f"docker cp {self.container_name}:{src} {dest}"
try:
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
print(f"Copied {src} -> {dest}")
except subprocess.CalledProcessError as e:
print(f"Failed to copy {src}: {e.stderr.strip() if e.stderr else e}")
def copy_files_to_container(self, src: str, dest: str):
cmd = f"docker cp {src} {self.container_name}:{dest}"
try:
subprocess.run(cmd, shell=True, check=True)
print(f"Copied files from {src} to {dest} in container {self.container_name}.")
except subprocess.CalledProcessError as e:
print(f"Failed to copy files to container {self.container_name}.")
print(f"Stderr: {e.stderr}")
raise
    def tf_fuzz(self):
        """Run a TensorFlow fuzzing campaign for this API in a fresh container.

        Builds the harness (without recompiling), runs fuzz.sh, copies the
        logs/artifacts back to result_dir, then computes validity stats.
        Any failure flips status to FAILED; otherwise COMPLETED.
        """
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            # Generate the per-API harness; --no-compile skips library rebuild.
            self.execute_command(f"cd /root/tensorflow/fuzz/ && python3 build_test_harness.py --dll {self.dll} --mode {self.mode} --ver {self.ver} --time_budget {self.time_budget} --no-compile")
            self.execute_command(f"mkdir -p /root/tensorflow/fuzz/{self.api}/artifacts")
            self.execute_command(f"cd /root/tensorflow/fuzz/{self.api} && bash fuzz.sh > execution.log")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/{self.api}/execution.log", self.result_dir)
            self.copy_results_from_container(f"/root/tensorflow/fuzz/{self.api}/fuzz-0.log", self.result_dir)
            self.copy_results_from_container(f"/root/tensorflow/fuzz/{self.api}/artifacts/", self.result_dir)
            # Compute and persist validity stats for this API
            try:
                self._compute_and_write_stats()
            except Exception as e:
                print(f"Failed to compute stats for {self.api}: {e}")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    def tf_check_valid(self):
        """Build-check all TensorFlow harnesses and collect build artifacts.

        Runs build_test_harness.py with --check_build, copies the
        success/fail/summary files into result_dir/build_status, pulls the
        build logs of failing APIs, and prints the build summary.
        """
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            self.execute_command(f"cd /root/tensorflow/fuzz/ && python3 -u build_test_harness.py --dll {self.dll} --mode {self.mode} --check_build > check.log")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/{self.api}", f"{self.result_dir}/{self.api}")
            os.makedirs(f"{self.result_dir}/build_status", exist_ok=True)
            self.copy_results_from_container(f"/root/tensorflow/fuzz/success_apis.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/fail_apis.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/build_summary.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/check.log", f"{self.result_dir}/build_status/")
            # Fetch per-API build logs for every API that failed to build.
            fail_file = f"{self.result_dir}/build_status/fail_apis.txt"
            fail_apis = self._load_fail_api_list(fail_file)
            self._copy_build_logs("/root/tensorflow/fuzz", fail_apis)
            with open(f"{self.result_dir}/build_status/build_summary.txt", "r") as f:
                summary = f.read()
            print(f"Build Summary: {summary}")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    def tf_cov_api(self):
        """Run interval-based coverage fuzzing for one TF API and fetch the data."""
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            self.execute_command(f"cd /root/tensorflow/fuzz/{self.api} && python3 coverage_fuzzing.py --interval {self.itv} --max-time {self.time_budget} --api {self.api}")
            self.copy_results_from_container(f"/root/tensorflow/fuzz/{self.api}/coverage_data", f"{self.result_dir}/coverage_data")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    # --- torch support ---
    def torch_fuzz(self):
        """Run a PyTorch fuzzing campaign for this API in a fresh container.

        Mirrors tf_fuzz() but with PyTorch's /root/fuzz layout.
        """
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            # Generate the per-API harness; --no-compile skips library rebuild.
            self.execute_command(f"cd /root/fuzz/ && python3 build_test_harness.py --dll {self.dll} --mode {self.mode} --ver {self.ver} --time_budget {self.time_budget} --no-compile")
            self.execute_command(f"mkdir -p /root/fuzz/{self.api}/artifacts")
            self.execute_command(f"cd /root/fuzz/{self.api} && bash fuzz.sh > execution.log")
            self.copy_results_from_container(f"/root/fuzz/{self.api}/execution.log", self.result_dir)
            self.copy_results_from_container(f"/root/fuzz/{self.api}/fuzz-0.log", self.result_dir)
            self.copy_results_from_container(f"/root/fuzz/{self.api}/artifacts/", self.result_dir)
            # Compute and persist validity stats for this API
            try:
                self._compute_and_write_stats()
            except Exception as e:
                print(f"Failed to compute stats for {self.api}: {e}")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    def torch_check_valid(self):
        """Build-check all PyTorch harnesses and collect build artifacts.

        Mirrors tf_check_valid() but with PyTorch's /root/fuzz layout.
        """
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            self.execute_command(f"cd /root/fuzz/ && python3 -u build_test_harness.py --dll {self.dll} --mode {self.mode} --check_build > check.log")
            self.copy_results_from_container(f"/root/fuzz/{self.api}", f"{self.result_dir}/{self.api}")
            os.makedirs(f"{self.result_dir}/build_status", exist_ok=True)
            self.copy_results_from_container(f"/root/fuzz/success_apis.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/fuzz/fail_apis.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/fuzz/build_summary.txt", f"{self.result_dir}/build_status/")
            self.copy_results_from_container(f"/root/fuzz/check.log", f"{self.result_dir}/build_status/")
            # Fetch per-API build logs for every API that failed to build.
            fail_file = f"{self.result_dir}/build_status/fail_apis.txt"
            fail_apis = self._load_fail_api_list(fail_file)
            self._copy_build_logs("/root/fuzz", fail_apis)
            with open(f"{self.result_dir}/build_status/build_summary.txt", "r") as f:
                summary = f.read()
            print(f"Build Summary: {summary}")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    def torch_cov_api(self):
        """Run interval-based coverage fuzzing for one torch API and fetch the data."""
        self.status = Status.RUNNING
        try:
            self.check_image()
            self.start_docker_container()
            self.execute_command(f"cd /root/fuzz/{self.api} && python3 coverage_fuzzing.py --interval {self.itv} --max-time {self.time_budget} --api {self.api}")
            self.copy_results_from_container(f"/root/fuzz/{self.api}/coverage_data", f"{self.result_dir}/coverage_data")
            self.status = Status.COMPLETED
        except Exception:
            self.status = Status.FAILED
    def merge_coverage_files(self) -> None:
        """Incrementally merge per-API .profraw files, interval by interval.

        For each time interval, profraw files from every API's
        coverage_data/<interval> directory are collected under
        result_dir/all/<interval>, shipped into a fresh container, and
        merged via merge_profraw.py.  The previous interval's
        merged.profdata is chained in so coverage accumulates over time.
        Each interval spins up and tears down its own container.
        """
        print("Merging coverage files...")
        output_dir = f"{self.result_dir}/all"
        os.makedirs(output_dir, exist_ok=True)
        previous_interval_name: Optional[str] = None
        for idx in range(0, self.time_budget, self.itv):
            interval_start = idx
            interval_end = min(idx + self.itv, self.time_budget)
            interval_name = f"{interval_start}-{interval_end}"
            interval_dir = os.path.join(output_dir, interval_name)
            os.makedirs(interval_dir, exist_ok=True)
            api_dirs = glob.glob(f"{self.result_dir}/{self.dll}.*")
            if not api_dirs:
                print(f"No {self.dll} API directories found.")
                return
            # Gather every API's profraw files for this interval.
            for api_dir in api_dirs:
                profraw_file = os.path.join(api_dir, "coverage_data", f"{interval_name}", "*.profraw")
                profraw_files = glob.glob(profraw_file)
                if not profraw_files:
                    print(f"No profraw files found for {api_dir} in interval {interval_name}.")
                    continue
                for profraw in profraw_files:
                    shutil.copy2(profraw, interval_dir)
                    print(f"Copied {profraw} to {interval_dir}")
            # Merge inside container; choose base_root by dll
            base_root = "/root/tensorflow/fuzz" if self.dll == "tf" else "/root/fuzz"
            try:
                self.check_image()
                self.start_docker_container()
                self.copy_files_to_container(f"{output_dir}/{interval_name}", f"{base_root}/")
                if previous_interval_name:
                    self.copy_files_to_container(f"{output_dir}/{previous_interval_name}", f"{base_root}/{previous_interval_name}")
                # First interval merges standalone; later intervals chain the
                # previous merged.profdata for cumulative coverage.
                if not previous_interval_name:
                    self.execute_command(f"cd {base_root} && python3 merge_profraw.py --dll {self.dll} --dir {interval_name}")
                else:
                    self.execute_command(f"cd {base_root} && python3 merge_profraw.py --dll {self.dll} --dir {interval_name} --previous {previous_interval_name}/merged.profdata")
                self.copy_results_from_container(f"{base_root}/{interval_name}/merged.profdata", f"{interval_dir}/merged.profdata")
                previous_interval_name = interval_name
                # delete the profraw files to save space
                profraw_files = glob.glob(os.path.join(interval_dir, "*.profraw"))
                if not self.debug:
                    for profraw_file in profraw_files:
                        os.remove(profraw_file)
            except KeyboardInterrupt:
                print(f"\nKeyboard interrupt received.")
            except Exception as e:
                print(f"Failed to merge profraw files for interval {interval_name}: {e}")
            finally:
                # Always tear down this interval's container, best-effort.
                try:
                    self.stop_docker_container()
                    self.remove_docker_container()
                except Exception:
                    pass
    def get_coverage_results(self):
        """Extract branch-coverage reports from each interval's merged.profdata.

        For every interval with a merged.profdata, runs
        get_coverage_results.py inside a fresh container, copies out the
        <interval>.txt report and the HTML report, then prints the
        'Covered branches' count per interval.
        """
        print("Getting coverage results...")
        output_dir = f"{self.result_dir}/all"
        os.makedirs(output_dir, exist_ok=True)
        for idx in range(0, self.time_budget, self.itv):
            interval_start = idx
            interval_end = min(idx + self.itv, self.time_budget)
            interval_name = f"{interval_start}-{interval_end}"
            interval_dir = os.path.join(output_dir, interval_name)
            os.makedirs(interval_dir, exist_ok=True)
            # copy the merged.profdata file to the output directory
            merged_profdata_file = os.path.join(interval_dir, "merged.profdata")
            if not os.path.exists(merged_profdata_file):
                print(f"No merged.profdata file found for interval {interval_name}.")
                continue
            # Run results extraction inside container; choose base_root by dll
            base_root = "/root/tensorflow/fuzz" if self.dll == "tf" else "/root/fuzz"
            try:
                self.check_image()
                self.start_docker_container()
                self.copy_files_to_container(merged_profdata_file, f"{base_root}/")
                # loop_until_ctrl_c()
                if self.dll == "tf":
                    self.execute_command(f"cd {base_root} && python3 get_coverage_results.py --binary /root/tensorflow/bazel-bin/tensorflow/libtensorflow_cc.so.2.16.1 --dll {self.dll} --require tensorflow/core/kernels --coverage_file merged.profdata --out {interval_name}.txt")
                else:
                    self.execute_command(f"cd {base_root} && python3 get_coverage_results.py --binary /root/pytorch/build-fuzz/lib/libtorch_cpu.so --dll {self.dll} --require aten/src/ATen/native --coverage_file merged.profdata --out {interval_name}.txt")
                self.copy_results_from_container(f"{base_root}/{interval_name}.txt", f"{output_dir}/{interval_name}.txt")
                self.copy_results_from_container(f"{base_root}/coverage_html", f"{interval_dir}/coverage_html")
                # remove merged_profdata_file to save space
                if not self.debug:
                    os.remove(merged_profdata_file)
            except KeyboardInterrupt:
                print(f"\nKeyboard interrupt received.")
            except Exception as e:
                print(f"Failed to get coverage results for interval {interval_name}: {e}")
            finally:
                # Always tear down this interval's container, best-effort.
                try:
                    self.stop_docker_container()
                    self.remove_docker_container()
                except Exception:
                    pass
        # print the coverage summary
        for idx in range(0, self.time_budget, self.itv):
            interval_start = idx
            interval_end = min(idx + self.itv, self.time_budget)
            interval_name = f"{interval_start}-{interval_end}"
            interval_dir = os.path.join(output_dir, interval_name)
            # Note: interval_dir + ".txt" is output_dir/<interval>.txt — the
            # same path the report was copied to above.
            coverage_results_file = f"{interval_dir}.txt"
            if not os.path.exists(coverage_results_file):
                print(f"No coverage_results.txt file found for interval {interval_name}.")
                continue
            with open(coverage_results_file, "r") as f:
                coverage_summary = f.read()
            pattern = r"(?<=Covered branches: )\d+"
            coverage_number = re.search(pattern, coverage_summary)
            if coverage_number:
                coverage_number = coverage_number.group()
                print(f"{interval_name}: {coverage_number}")
def _parse_fuzz_logs_for_stats(self, directory: str) -> tuple[int, int]:
"""
Scan fuzz logs in a directory to compute total rounds and invalid count.
- Rounds: prefer 'stat::number_of_executed_units', fallback to 'Done X runs'.
- Invalid: lines containing either 'Exception caught:' or 'CPU Execution error'.
Returns: (rounds, invalid_count)
"""
# Find fuzz-*.log files
log_files = sorted(glob.glob(os.path.join(directory, "fuzz-*.log")))
if not log_files:
# Nothing to parse
return 0, 0
total_rounds: Optional[int] = None
done_runs: Optional[int] = None
invalid_count = 0
for log_path in log_files:
try:
with open(log_path, "r", errors="ignore") as f:
for line in f:
# Count invalid markers
if ("Exception caught:" in line) or ("CPU Execution error" in line):
invalid_count += 1
# Capture rounds stats (prefer explicit stat line)
if "stat::number_of_executed_units:" in line:
try:
val = int(line.strip().split(":", 1)[1])
total_rounds = val
except Exception:
pass
elif line.startswith("Done ") and " runs in " in line:
# Example: Done 541361 runs in 602 second(s)
try:
parts = line.split()
# parts[1] should be the number following 'Done'
val = int(parts[1])
done_runs = val
except Exception:
pass
except FileNotFoundError:
continue
rounds = total_rounds if total_rounds is not None else (done_runs if done_runs is not None else 0)
return rounds, invalid_count
def _compute_and_write_stats(self) -> None:
"""Compute rounds and validity ratio for current API and write stat.txt."""
# Determine directory for this experiment's logs
target_dir = self.result_dir.rstrip("/")
os.makedirs(target_dir, exist_ok=True)
rounds, invalid = self._parse_fuzz_logs_for_stats(target_dir)
valid = max(0, rounds - invalid)
ratio = (valid / rounds) if rounds > 0 else 0.0
# Prepare output
lines = [
f"api: {self.api}",
f"rounds: {rounds}",
f"invalid: {invalid}",
f"valid: {valid}",
f"validity_ratio: {ratio:.6f}",
]
# Print summary
print(" | ".join(lines))
# Write to stat.txt under result dir
stat_path = os.path.join(target_dir, "stat.txt")
try:
with open(stat_path, "w") as sf:
sf.write("\n".join(lines) + "\n")
except Exception as e:
print(f"Failed to write stats file {stat_path}: {e}")
def _load_fail_api_list(self, fail_file: str) -> list[str]:
"""Parse fail_apis.txt and return API names."""
apis: list[str] = []
try:
with open(fail_file, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
api = line.split(":", 1)[0].strip()
if api:
apis.append(api)
except FileNotFoundError:
print(f"Fail file not found: {fail_file}")
except Exception as e:
print(f"Error reading fail file {fail_file}: {e}")
return apis
    def _copy_build_logs(self, base_remote: str, apis: list[str], log_name: str = "build.log"):
        """Copy per-API build logs out of the container.

        Args:
            base_remote: Root fuzz directory inside the container.
            apis: API names whose logs should be fetched (typically the
                failing ones from fail_apis.txt).
            log_name: Log filename inside each API's directory.
        """
        if not apis:
            return
        out_base = os.path.join(self.result_dir, "build_logs")
        for api in apis:
            dest_dir = os.path.join(out_base, api)
            dest = os.path.join(dest_dir, log_name)
            remote = f"{base_remote}/{api}/{log_name}"
            try:
                self.copy_from_container(remote, dest)
            except Exception:
                # Best effort; keep going
                pass
def run(self):
if self.dll == "tf":
if self.check_valid:
self.tf_check_valid()
elif self.mode == "fuzz":
self.tf_fuzz()
elif self.mode == "cov":
if self.api != "all":
self.tf_cov_api()
elif self.dll == "torch":
if self.check_valid:
self.torch_check_valid()
elif self.mode == "fuzz":
self.torch_fuzz()
elif self.mode == "cov":
if self.api != "all":
self.torch_cov_api()
class Scheduler():
    """Runs queued Experiments sequentially or in a thread pool."""
    def __init__(self, num_parallel: int = 1):
        """
        Args:
            num_parallel: Number of experiments to run concurrently
                (1 means sequential execution).
        """
        # Pending experiments; drained as runs finish in run_all().
        self.experiments: list[Experiment] = []
        self.num_parallel = num_parallel
    def add_experiment(self, experiment: Experiment):
        """Queue *experiment* for a later run_all() call."""
        self.experiments.append(experiment)
    def _run_experiment(self, exp: Experiment):
        """Helper function to run a single experiment and handle cleanup.

        A daemon watchdog thread force-removes the container if the run
        exceeds twice the experiment's time budget (no timeout for build
        checks).  The container is always stopped/removed on exit, falling
        back to a force-remove if the graceful path fails.

        Returns:
            tuple: (exp.api, exp.status) after the run.
        """
        # Watchdog: hard stop at 2x time budget to avoid hanging containers
        timeout_seconds = None if exp.check_valid else max(1, int(2 * getattr(exp, "time_budget", 180)))
        cancel_event = threading.Event()
        def watchdog():
            if timeout_seconds is None:
                return
            # Wait for either completion signal or timeout
            triggered = not cancel_event.wait(timeout=timeout_seconds)
            if triggered:
                # Only act if not already completed
                if exp.status not in (Status.COMPLETED,):
                    print(f"Time budget exceeded for {exp.api} (>{timeout_seconds}s). Forcing docker cleanup...")
                    try:
                        exp.force_remove_container()
                    finally:
                        # Mark as failed due to timeout if not already set
                        if exp.status != Status.COMPLETED:
                            exp.status = Status.FAILED
        wd_thread = threading.Thread(target=watchdog, name=f"wd-{exp.container_name}", daemon=True)
        wd_thread.start()
        try:
            print(f"Running experiment for {exp.api}...")
            exp.run()
            if exp.status == Status.COMPLETED:
                print(f"Experiment for {exp.api} completed successfully.")
            else:
                print(f"Experiment for {exp.api} failed.")
        except KeyboardInterrupt:
            print(f"\nKeyboard interrupt received. Cleaning up experiment for {exp.api}...")
            exp.status = Status.FAILED
        finally:
            # Cancel watchdog and cleanup container
            cancel_event.set()
            try:
                exp.stop_docker_container()
                exp.remove_docker_container()
            except Exception:
                # Fall back to force remove in case normal stop/rm fails
                try:
                    exp.force_remove_container()
                except Exception:
                    pass
        return exp.api, exp.status
    def run_all(self):
        """Run every queued experiment, sequentially or in a thread pool.

        Finished experiments are removed from self.experiments so a second
        call does not re-run them.  Ctrl+C cancels pending futures and
        re-raises in the parallel path.
        """
        # Take a snapshot so we only run what existed at call time
        to_run = list(self.experiments)
        if self.num_parallel == 1:
            for exp in tqdm(to_run, desc="Running Experiments"):
                try:
                    self._run_experiment(exp)
                finally:
                    # Remove finished experiment to prevent re-execution
                    try:
                        self.experiments.remove(exp)
                    except ValueError:
                        pass
        else:
            with ThreadPoolExecutor(max_workers=self.num_parallel) as executor:
                with tqdm(total=len(to_run), desc="Running Experiments") as pbar:
                    future_to_exp = {executor.submit(self._run_experiment, exp): exp for exp in to_run}
                    try:
                        for future in as_completed(future_to_exp):
                            pbar.update(1)
                            exp = future_to_exp[future]
                            try:
                                future.result()
                            finally:
                                # Remove finished experiment to prevent re-execution
                                try:
                                    self.experiments.remove(exp)
                                except ValueError:
                                    pass
                    except KeyboardInterrupt:
                        print("\nInterrupted. Cancelling remaining experiments...")
                        # Best-effort: only not-yet-started futures can be cancelled.
                        for f in future_to_exp:
                            f.cancel()
                        raise