
Commit d595dc1

Update so prune and reindex work per-stack
1 parent 4b4ad73 commit d595dc1

2 files changed

Lines changed: 77 additions & 38 deletions
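In outline, the commit replaces the single shared prune/reindex pass with a per-stack chain of RQ jobs: one copy job, then for each CI stack a prune job that depends on the copy and a reindex job that depends on the prune. A minimal sketch of that dependency pattern, assuming a local Redis and using stand-in worker functions and stack names (RQ is the queue library already used here; everything else below is illustrative):

from redis import Redis
from rq import Queue

def copy_binaries():  # stand-in for workers.copy_pr_binaries
    pass

def prune(stack):  # stand-in for workers.prune_mirror_duplicates
    pass

def reindex(stack):  # stand-in for workers.update_mirror_index
    pass

q = Queue(connection=Redis())
copy_job = q.enqueue(copy_binaries)
for stack in ["e4s", "ml-linux"]:  # stand-in for list_ci_stacks()
    prune_job = q.enqueue(prune, stack, depends_on=copy_job)
    q.enqueue(reindex, stack, depends_on=prune_job)

With depends_on, RQ starts a job only after its dependency has finished successfully, which is what makes the per-stack ordering safe.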


spackbot/handlers/mirrors.py

Lines changed: 49 additions & 17 deletions
@@ -3,55 +3,87 @@
 #
 # SPDX-License-Identifier: (Apache-2.0 OR MIT)

+from sh.contrib import git
+import os

 import spackbot.helpers as helpers
-from spackbot.helpers import pr_expected_base, pr_mirror_base_url, pr_shared_mirror
-from spackbot.workers import copy_pr_binaries, update_mirror_index, work_queue
+import spackbot.workers as workers

 # If we don't provide a timeout, the default in RQ is 180 seconds
 WORKER_JOB_TIMEOUT = 6 * 60 * 60

 logger = helpers.getLogger(__name__)

+def list_ci_stacks(branch=helpers.pr_expected_base):
+    with helpers.temp_dir() as cwd:
+        # Shallow clone of spack, used to discover the stacks
+        # available in the base branch.
+        git.clone("--branch", branch, "--depth", 1, helpers.spack_upstream, "spack-develop")
+
+        stacks = []
+        pipeline_root = f"{cwd}/spack-develop/share/spack/gitlab/cloud_pipelines/stacks/"
+        for stack in os.listdir(pipeline_root):
+            if os.path.isfile(f"{pipeline_root}/{stack}/spack.yaml"):
+                stacks.append(stack)
+
+        return stacks

 async def graduate_pr_binaries(event, gh):
     payload = event.data

     base_branch = payload["pull_request"]["base"]["ref"]
     is_merged = payload["pull_request"]["merged"]

-    if is_merged and base_branch == pr_expected_base:
+    if is_merged and base_branch == helpers.pr_expected_base:
         pr_number = payload["number"]
         pr_branch = payload["pull_request"]["head"]["ref"]

-        shared_mirror_url = f"{pr_mirror_base_url}/{pr_shared_mirror}"
+        shared_mirror_url = f"{helpers.pr_mirror_base_url}/{helpers.pr_shared_mirror}"

         logger.info(
             f"PR {pr_number}/{pr_branch} merged to develop, graduating binaries"
         )

-        ltask_q = work_queue.get_lqueue()
+        ltask_q = workers.work_queue.get_lqueue()
         copy_job = ltask_q.enqueue(
-            copy_pr_binaries,
+            workers.copy_pr_binaries,
             pr_number,
             pr_branch,
-            shared_mirror_url,
+            f"{shared_mirror_url}",
             job_timeout=WORKER_JOB_TIMEOUT,
         )
+        copy_job.meta["info"] = {
+            "type": "copy",
+            "stack": "all",
+        }
         logger.info(f"Copy job queued: {copy_job.id}")

-        pr_number_i = int(pr_number)
-        if pr_number_i % 100 == 0:
+        for stack in list_ci_stacks():
+            pr_stack_mirror = f"{helpers.pr_mirror_bucket}/{stack}"
+            publish_stack_mirror = f"{helpers.publish_mirror_bucket}/{helpers.pr_expected_base}/{stack}"
             prune_job = ltask_q.enqueue(
+                workers.prune_mirror_duplicates,
+                pr_stack_mirror,
+                publish_stack_mirror,
                 job_timeout=WORKER_JOB_TIMEOUT,
+                depends_on=copy_job,
             )
+            prune_job.meta["info"] = {
+                "type": "prune",
+                "stack": stack,
+            }
             logger.info(f"Pruning job queued: {prune_job.id}")

-        # If the index job queue has a job queued already, there is no need to
-        # schedule another one
-        update_job = ltask_q.enqueue(
-            update_mirror_index,
-            shared_mirror_url,
-            job_timeout=WORKER_JOB_TIMEOUT,
-        )
-        logger.info(f"Reindex job queued: {update_job.id}")
+            # If the index job queue has a job queued already, there is no need to
+            # schedule another one
+            update_job = ltask_q.enqueue(
+                workers.update_mirror_index,
+                f"{shared_mirror_url}/{stack}",
+                job_timeout=WORKER_JOB_TIMEOUT,
+                depends_on=prune_job,
+            )
+            update_job.meta["info"] = {
+                "type": "reindex",
+                "stack": stack,
+            }
+            logger.info(f"Reindex job queued: {update_job.id}")

spackbot/workers.py

Lines changed: 28 additions & 21 deletions
@@ -331,7 +331,6 @@ async def fix_style_task(event):
         event.data["issue"]["comments_url"], {}, data={"body": message}
     )

-
 async def find_latest_pipeline(url, headers, session):
     async with session.get(url, headers=headers) as response:
         pipeline_objects = await response.json()

@@ -394,7 +393,7 @@ async def download_spack_lock_files(url, headers, download_dir, session):

     return folder_list

-async def copy_pr_binaries(pr_number, pr_branch, shared_pr_mirror_url):
+async def copy_pr_binaries(stack, pr_number, pr_branch, shared_pr_mirror_url):
     """Find the latest gitlab pipeline for the PR, get the spack.lock
     for each child pipeline, and for each one, activate the environment
     and issue the spack buildcache sync command to copy between the

@@ -458,19 +457,26 @@ async def copy_pr_binaries(pr_number, pr_branch, shared_pr_mirror_url):
         pr_mirror_url
     ])

-def rq_has_reindex():
-    ltask_q = work_queue.get_lqueue()
-    for job in ltask_q.jobs:
-        if "update_mirror_index" in job.func_name:
-            return True
-    return False
-
+# Update index per stack mirror
 async def update_mirror_index(mirror_url):
     """Use spack buildcache command to update index on remote mirror"""

+    # Current job stack
+    job = get_current_job()
+    stack = job.meta["info"]["stack"]
+
+    # Check if another reindex for this stack is queued
+    do_reindex = True
+    ltask_q = work_queue.get_lqueue()
+    for job in ltask_q.jobs:
+        info = job.meta["info"]
+        if info["type"] == "reindex" and info["stack"] == stack:
+            do_reindex = False
+            break
+
     # Check the queue for more reindex jobs, if there are none,
     # run reindex on the graduated PR mirror.
-    if not rq_has_reindex():
+    if do_reindex:
         print(f"Updating binary index at {mirror_url}")
         await helpers.run_in_subprocess([
             "spack",
@@ -484,20 +490,22 @@ async def update_mirror_index(mirror_url):

 def hash_from_key(key):
     h = None
+    # 32 chars long
     if key.lower().endswith(("spec.json")):
         h = re.search(".*-([a-zA-Z0-9]+)\.spec\.json", key.lower())
     elif key.lower().endswith(("spack")):
         h = re.search(".*-([a-zA-Z0-9]+)\.spack", key.lower())
     return h

-async def prune_mirror_duplicates():
+# Prune per stack mirror
+async def prune_mirror_duplicates(pr_mirror_bucket, publish_mirror_bucket):
     s3 = boto3.resource("s3")
-    pr_bucket = s3.Bucket(helpers.pr_mirror_bucket)
-    publish_bucket = s3.Bucket(helpers.publish_mirror_bucket)
+    pr_bucket = s3.Bucket(pr_mirror_bucket)
+    publish_bucket = s3.Bucket(publish_mirror_bucket)

     # Get the hashes in the shared PR bucket
     pr_specs = {}
-    for obj in pr_bucket.filter(Prefix=helpers.pr_shared_bucket).objects():
+    for obj in pr_bucket.objects():
         if not obj.key.endswith(("spec.json")):
             continue

@@ -506,12 +514,12 @@ async def prune_mirror_duplicates():

     # Clean up anything that didn't work and log an error. This should never happen
     # but also shouldn't stop the pruning.
     if None in pr_specs:
-        logger.error("Encountered hashless spec.json in shared PR mirror")
+        logger.error("Encountered hashless spec.json in PR mirror")
         pr_specs.remove(None)

     # Check in the published base branch bucket for duplicates to delete
     delete_specs = {}
-    for obj in publish_bucket.filter(Prefix=helpers.pr_expected_base).objects():
+    for obj in publish_bucket.objects():
         if not obj.key.endswith(("spec.json")):
             continue

@@ -520,13 +528,12 @@ async def prune_mirror_duplicates():

         delete_specs.add(spec_hash)

     # Delete all of the objects with marked hashes
-    if delete_specs:
-        logger.info("Pruning PR mirror")
-        for obj in pr_bucket.filter(Prefix=helpers.pr_shared_bucket).objects():
+    for obj in pr_bucket.objects():
         if not obj.key.endswith((".spec.json", ".spack")):
             continue

         if hash_from_key(obj.key) in delete_specs:
-            logger.debug(f"""pr mirror pruning is deleteing {obj.key}
-                from s3://{helpers.pr_mirror_bucket}/{helpers.pr_shared_bucket}""")
+            logger.debug(
+                f"pr mirror pruning is deleting {obj.key} from s3://{pr_mirror_bucket}"
+            )
             obj.delete()
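Two details in prune_mirror_duplicates are worth flagging. First, pr_specs and delete_specs are initialized as dicts ({}) but used with the set methods remove and add, so set() appears to be intended. Second, the loops call pr_bucket.objects() (and the removed code called pr_bucket.filter(Prefix=...).objects()), while in boto3's documented resource API the collection is an attribute, Bucket.objects, iterated with .all() or .filter(Prefix=...). A minimal sketch of the documented usage (bucket name and prefix are stand-ins):

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("example-pr-mirror")  # stand-in bucket name

# All keys in the bucket:
for obj in bucket.objects.all():
    print(obj.key)

# Keys under a prefix, e.g. one stack's mirror:
for obj in bucket.objects.filter(Prefix="e4s/"):
    print(obj.key)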
