
Commit 0438c26

remove need for configmap (use job creation dates instead). support pagination. limit memory usage.
1 parent 83e8b53 commit 0438c26

1 file changed: 97 additions & 20 deletions

src/mas/devops/saas/job_cleaner.py

@@ -10,33 +10,110 @@
 
 from kubernetes import client
 import logging
+import itertools
 
 logger = logging.getLogger(__name__)
 
+# TODO: dry-run mode that just logs (does not delete anything)
 
-def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str):
-    core_v1_api = client.CoreV1Api(k8s_client)
+
+# TODO: test case: four jobs with same cleanup_group id but different namespaces
+
+
+def job_details(job, label):
+    name = job.metadata.name
+    namespace = job.metadata.namespace
+    creation_timestamp = job.metadata.creation_timestamp
+    cleanup_group = job.metadata.labels[label]
+
+    return f"{name} {namespace} {cleanup_group} {creation_timestamp}"
+
+
+def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str, limit: int = 100):
     batch_v1_api = client.BatchV1Api(k8s_client)
 
-    cms = core_v1_api.list_config_map_for_all_namespaces(label_selector=label)
+    # we need to be sure we have all Jobs loaded up front (we can't do the cleanup page by page)
+    # so a page boundary may cut a cleanup_group in half, which would cause inconsistent behaviour
+
+    # set of tuples (namespace, cleanup_group_id)
+    cleanup_groups = set()
+    _continue = None
+    while True:
+
+        # to avoid loading all jobs into memory at once (there may be a LOT),
+        # do an initial query to look for all unique group_ids in the cluster
+        # later, for each group_id, another query to find all jobs belonging to that group
+        # We're trading cpu time / network io for memory here..
+
+        jobs_page = batch_v1_api.list_job_for_all_namespaces(
+            label_selector=label,
+            limit=limit,
+            _continue=_continue
+        )
+        _continue = jobs_page.metadata._continue
+
+        for job in jobs_page.items:
+            cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label]))
+
+        if _continue is None:
+            break
+
+    # NOTE: it's possible for things to change in the cluster while this process is ongoing
+    # e.g.:
+    # - a new sync cycle creates a newer version of Job; not a problem, just means an orphaned job will stick around for one extra cycle
+    # - a new cleanup group appears; not a problem, the new cleanup group will be handled in the next cycle
+    # - ... other race conditions?
+    # this process is eventually consistent
+
+    # Now we know all the cleanup group ids in the cluster
+    # we can deal with each one separately; we only have to load the job resources for that particular group into memory at once
+    # (we have to load into memory in order to guarantee the jobs are sorted by creation_date
+    # if we could (can?) rely on K8S to always return them in this order then we could evaluate each page of Jobs lazily
+    for (namespace, cleanup_group_id) in cleanup_groups:
+
+        print()
+        print()
+        print(f"{namespace} / {cleanup_group_id}")
+        print("============================")
+
+        # page through all jobs in this namespace and group, and chain together all the resulting iterators
+        job_items_iters = []
+        while True:
+            jobs_page = batch_v1_api.list_namespaced_job(
+                namespace,
+                label_selector=f"{label}={cleanup_group_id}",
+                limit=limit,
+                _continue=_continue
+            )
+            job_items_iters.append(jobs_page.items)
+            _continue = jobs_page.metadata._continue
+            if _continue is None:
+                break
+
+        jobs = itertools.chain(*job_items_iters)
 
-    for cm in cms.items:
-        cm_ns = cm.metadata.namespace
-        job_cleanup_group = cm.metadata.labels[label]
-        logger.info("")
-        logger.info(f"{job_cleanup_group} in {cm_ns}")
-        logger.info("-------------------------------")
-        try:
-            current_job_name = cm.data['current_job_name']
-            logger.info(f"Current Job Name: {current_job_name}")
+        # sort the jobs by creation_timestamp
+        jobs_sorted = iter(sorted(
+            jobs,
+            key=lambda group_job: group_job.metadata.creation_timestamp,
+            reverse=True
+        ))
 
-            # get all Jobs in the same namespace as the configmap that have LABEL: job_cleanup_group
-            jobs_in_cleanup_group = batch_v1_api.list_namespaced_job(cm_ns, label_selector=f"{label}={job_cleanup_group}")
+        # inspect the first Job - i.e. the one created most recently
+        # whatever happens we definitely will not be deleting this job (in this cycle, at least)
+        most_recent_job = next(jobs_sorted)
+        print()
+        print("Most recent Job")
+        print("------")
+        print(job_details(most_recent_job, label))
 
-            for job in jobs_in_cleanup_group.items:
-                job_name = job.metadata.name
-                if job_name != current_job_name:
-                    logger.info(f"Deleting old Job resource: {job_name}")
+        # TODO: prune prior jobs even if most recent job has failed?
+        # or leave them be as they may provide valuable debugging info?
 
-        except Exception as e:
-            logger.error(f"Skipping {job_cleanup_group} in {cm_ns}: {repr(e)}")
+        print()
+        print("Old Jobs to be pruned")
+        print("------")
+        for job in jobs_sorted:
+            # prune prior jobs even if most recent job has failed?
+            # or leave them be as they may provide valuable debugging info?
+            print(job_details(job, label))
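
For orientation, a minimal sketch of how cleanup_jobs might be driven; it is not part of this commit. The import path follows the file location above, and the label key example.com/job-cleanup-group, the kubeconfig fallback, and the limit value are placeholder assumptions for illustration.

    # Driver sketch (assumed usage, not from this commit).
    import logging

    from kubernetes import client, config

    from mas.devops.saas.job_cleaner import cleanup_jobs

    logging.basicConfig(level=logging.INFO)

    # Use in-cluster credentials when running as a Pod, otherwise fall back to kubeconfig.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    k8s_client = client.ApiClient()

    # Placeholder label key: Jobs carrying this label are grouped for cleanup by its value.
    cleanup_jobs(k8s_client, label="example.com/job-cleanup-group", limit=100)

Passing a smaller limit trades more API round trips for a smaller peak memory footprint, which is the trade-off the comments in the diff describe.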

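The new code still only prints the Jobs it would prune; the TODOs point at a dry-run flag and an eventual delete. A possible shape for that delete step, sketched under the assumption that pruning would use the standard BatchV1Api delete call with background propagation so each Job's Pods are garbage-collected too:

    # Hypothetical pruning step (not in this commit). jobs_sorted has already
    # yielded the most recent Job, so everything left in the iterator is older.
    for job in jobs_sorted:
        logger.info(f"Deleting old Job resource: {job.metadata.name} in {job.metadata.namespace}")
        batch_v1_api.delete_namespaced_job(
            name=job.metadata.name,
            namespace=job.metadata.namespace,
            # "Background" asks the garbage collector to also remove the Pods owned by the Job.
            body=client.V1DeleteOptions(propagation_policy="Background"),
        )

A dry_run parameter on cleanup_jobs could gate this loop to preserve the log-only behaviour the first TODO asks for.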