|
1 | | -'# ComStock™, Copyright (c) 2025 Alliance for Sustainable Energy, LLC. All rights reserved.' |
| 1 | +# ComStock™, Copyright (c) 2025 Alliance for Sustainable Energy, LLC. All rights reserved. |
2 | 2 | # See top level LICENSE.txt file for license terms. |
3 | 3 | import os |
4 | 4 | import s3fs |
|
20 | 20 | import polars as pl |
21 | 21 | import re |
22 | 22 | import datetime |
| 23 | +from natsort import natsort_keygen, natsorted |
23 | 24 | from pathlib import Path |
24 | 25 |
|
25 | 26 | from comstockpostproc.naming_mixin import NamingMixin |
@@ -154,7 +155,7 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc |
154 | 155 | upgrade_pqts.append(f's3://{p}') |
155 | 156 | else: |
156 | 157 | upgrade_pqts.append(p) |
157 | | - upgrade_pqts.sort() |
| 158 | + upgrade_pqts.sort(key=natsort_keygen()) |
158 | 159 | if len(upgrade_pqts) > 0: |
159 | 160 | upgrade_dfs = [] |
160 | 161 | for file_path in upgrade_pqts: |
@@ -250,29 +251,21 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc |
250 | 251 | self._aggregate_failure_summaries() |
251 | 252 |
|
def _aggregate_failure_summaries(self):
    """Combine all per-upgrade failure_summary_*.csv files into one file.

    load_data() writes one failure_summary_<upgrade_id>.csv per upgrade into
    the output directory. This merges their lines into a single
    failure_summary_aggregated.csv (deduplicated, first occurrence wins,
    input order preserved) and deletes the per-upgrade files afterwards.
    """
    fs = self.output_dir["fs"]          # fsspec-style filesystem (local or s3fs)
    fs_path = self.output_dir["fs_path"]

    # Natural sort so e.g. failure_summary_2.csv sorts before failure_summary_10.csv
    summary_paths = natsorted(
        p for p in fs.ls(fs_path)
        if Path(p).name.startswith("failure_summary_") and Path(p).name.endswith(".csv")
    )

    # Dedupe with a seen-set while preserving first-seen order; this avoids
    # the O(n^2) `line not in list` membership scan when summaries are large.
    lines = []
    seen = set()
    for file_path in summary_paths:
        logger.debug(f"Aggregating failure summary from {file_path!r}")
        with fs.open(file_path, "r") as f:
            for line in f:
                if line not in seen:
                    seen.add(line)
                    lines.append(line)
        fs.rm(file_path)

    with fs.open(f"{fs_path}/failure_summary_aggregated.csv", "w") as f:
        f.writelines(lines)
276 | 269 |
|
277 | 270 | def download_data(self): |
278 | 271 |
|
@@ -334,27 +327,26 @@ def download_data(self): |
334 | 327 |
|
335 | 328 | # upgrades/upgrade=*/results_up*.parquet |
336 | 329 | if self.include_upgrades: |
337 | | - if len(glob.glob(f'{self.data_dir}/results_up*.parquet')) < 2: |
338 | | - if self.s3_inpath is None: |
339 | | - logger.info('The s3 path passed to the constructor is invalid, ' |
340 | | - 'cannot check for results_up**.parquet files to download') |
341 | | - else: |
342 | | - upgrade_parquet_path = f'{prfx}/upgrades' |
343 | | - resp = s3_resource.Bucket(bucket_name).objects.filter(Prefix=upgrade_parquet_path).all() |
344 | | - for obj in list(resp): |
345 | | - obj_path = obj.key |
346 | | - obj_name = obj_path.split('/')[-1] |
347 | | - m = re.search('results_up(.*).parquet', obj_name) |
348 | | - if not m: |
349 | | - continue |
350 | | - upgrade_id = m.group(1) |
351 | | - if upgrade_id in self.upgrade_ids_to_skip: |
352 | | - logger.info(f'Skipping data download for upgrade {upgrade_id}') |
353 | | - continue |
354 | | - results_data_path = os.path.join(self.data_dir, obj_name) |
355 | | - if not os.path.exists(results_data_path): |
356 | | - logger.info(f'Downloading {obj_path} from the {bucket_name} bucket') |
357 | | - s3_resource.Object(bucket_name, obj_path).download_file(results_data_path) |
| 330 | + if self.s3_inpath is None: |
| 331 | + logger.info('The s3 path passed to the constructor is invalid, ' |
| 332 | + 'cannot check for results_up**.parquet files to download') |
| 333 | + else: |
| 334 | + upgrade_parquet_path = f'{prfx}/upgrades' |
| 335 | + resp = s3_resource.Bucket(bucket_name).objects.filter(Prefix=upgrade_parquet_path).all() |
| 336 | + for obj in natsorted(resp, key=lambda obj: obj.key): |
| 337 | + obj_path = obj.key |
| 338 | + obj_name = obj_path.split('/')[-1] |
| 339 | + m = re.search('results_up(.*).parquet', obj_name) |
| 340 | + if not m: |
| 341 | + continue |
| 342 | + upgrade_id = m.group(1) |
| 343 | + if upgrade_id in self.upgrade_ids_to_skip: |
| 344 | + logger.info(f'Skipping data download for upgrade {upgrade_id}') |
| 345 | + continue |
| 346 | + results_data_path = os.path.join(self.data_dir, obj_name) |
| 347 | + if not os.path.exists(results_data_path): |
| 348 | + logger.info(f'Downloading {obj_path} from the {bucket_name} bucket') |
| 349 | + s3_resource.Object(bucket_name, obj_path).download_file(results_data_path) |
358 | 350 |
|
359 | 351 | # buildstock.csv |
360 | 352 | #1. check the file in the data_dir |
@@ -543,7 +535,7 @@ def load_data(self, upgrade_id, acceptable_failure_percentage=0.01, drop_failed_ |
543 | 535 | failure_summaries = [] |
544 | 536 |
|
545 | 537 | # Load results, identify failed runs |
546 | | - for upgrade_id in [0, upgrade_id]: |
| 538 | + for upgrade_id in [np.int64(0), upgrade_id]: |
547 | 539 |
|
548 | 540 | # Skip specified upgrades |
549 | 541 | if upgrade_id in self.upgrade_ids_to_skip: |
@@ -645,7 +637,7 @@ def load_data(self, upgrade_id, acceptable_failure_percentage=0.01, drop_failed_ |
645 | 637 | .when( |
646 | 638 | (pl.col(self.COMP_STATUS).is_null())) |
647 | 639 | .then(pl.lit(ST_FAIL_NO_STATUS)) |
648 | | - # Sucessful, but upgrade was NA, so has no results |
| 640 | + # Successful, but upgrade was NA, so has no results |
649 | 641 | .when( |
650 | 642 | (pl.col(self.COMP_STATUS) == 'Invalid')) |
651 | 643 | .then(pl.lit(ST_NA)) |
|
0 commit comments