From 3b51cdbc8ef161ab327af49fc79c2c8284f4ba46 Mon Sep 17 00:00:00 2001 From: jorntx <> Date: Fri, 9 Jan 2026 15:40:18 +0100 Subject: [PATCH 1/6] First, second and third stab at limiter functionality Added option to limit path depth Added option to limit snapshots pr. url Both are optional --- .gitignore | 3 + README.md | 6 ++ pywaybackup/Arguments.py | 12 ++++ pywaybackup/PyWayBackup.py | 14 ++++- pywaybackup/SnapshotCollection.py | 98 ++++++++++++++++++++++++++++++- 5 files changed, 130 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 420fdf8..bb6b52e 100644 --- a/.gitignore +++ b/.gitignore @@ -132,6 +132,7 @@ venv/ ENV/ env.bak/ venv.bak/ +copilot-instructions.md # Spyder project settings .spyderproject @@ -167,3 +168,5 @@ cython_debug/ # custom test.py +waybackup_snapshots/ +output/ \ No newline at end of file diff --git a/README.md b/README.md index 253c33b..df8ff9b 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,12 @@ Parameters for archive.org CDX query. No effect on snapshot download itself. - **`--limit`** ``:
Limits the snapshots fetched from archive.org CDX. (Will have no effect on existing CDX files) +- **`--max-snapshots-per-url`** ``:
+  Limits the number of snapshots kept per unique URL after CDX insertion and filtering. Selection is distributed across the requested date range so the retained snapshots represent the whole timespan. Useful for capping the harvest size of large sites. Example: `--limit 10000 --max-snapshots-per-url 100` queries up to 10k snapshots, then keeps up to 100 per unique URL.
+
+- **`--path-depth`** ``:
+ Limit harvested URLs by path depth. `0` = only `example.com/` (root), `1` = root + immediate children (e.g. `example.com/foo/`), `2` = two levels, etc. Applied after CDX insertion and mode filtering. Combine with `--max-snapshots-per-url` to cap snapshots per kept URL. + - **Range Selection:**
Set the query range in years (`range`) or a timestamp (`start` and/or `end`). If `range` then ignores `start` and `end`. Format for timestamps: YYYYMMDDhhmmss. Timestamp can as specific as needed (year 2019, year+month+day 20190101, ...). diff --git a/pywaybackup/Arguments.py b/pywaybackup/Arguments.py index 470123e..05118f4 100644 --- a/pywaybackup/Arguments.py +++ b/pywaybackup/Arguments.py @@ -40,6 +40,18 @@ def __init__(self): behavior.add_argument("--retry", type=int, default=0, metavar="", help="retry failed downloads (opt tries as int, else infinite)") behavior.add_argument("--workers", type=int, default=1, metavar="", help="number of workers (simultaneous downloads)") behavior.add_argument("--delay", type=int, default=0, metavar="", help="delay between each download in seconds") + behavior.add_argument( + "--max-snapshots-per-url", + type=int, + metavar="", + help="limit number of snapshots to keep per unique URL (distributed across date range)", + ) + behavior.add_argument( + "--path-depth", + type=int, + metavar="", + help="limit harvested URLs by path depth: 0=root only, 1=include immediate children, etc.", + ) special = parser.add_argument_group("special") special.add_argument("--reset", action="store_true", help="reset the job and ignore existing cdx/db/csv files") diff --git a/pywaybackup/PyWayBackup.py b/pywaybackup/PyWayBackup.py index b17ced4..11f77c5 100644 --- a/pywaybackup/PyWayBackup.py +++ b/pywaybackup/PyWayBackup.py @@ -133,6 +133,8 @@ def __init__( keep: bool = False, silent: bool = True, debug: bool = False, + max_snapshots_per_url: int = None, + path_depth: int = None, **kwargs: dict, ): self._url = url @@ -158,6 +160,8 @@ def __init__( self._delay = delay self._reset = reset self._keep = keep + self._max_snapshots_per_url = max_snapshots_per_url + self._path_depth = path_depth # module exclusive self._silent = silent @@ -184,6 +188,8 @@ def __init__( + str(self._start) + str(self._end) + str(self._limit) + + str(self._max_snapshots_per_url) + + str(self._path_depth) + str(self._filetype) + str(self._statuscode) ) @@ -324,7 +330,13 @@ def _prep_collection(self) -> SnapshotCollection: SnapshotCollection: The initialized and loaded snapshot collection. 
""" collection = SnapshotCollection() - collection.load(mode=self._mode, cdxfile=self._cdxfile, csvfile=self._csvfile) + collection.load( + mode=self._mode, + cdxfile=self._cdxfile, + csvfile=self._csvfile, + max_snapshots_per_url=self._max_snapshots_per_url, + path_depth=self._path_depth, + ) collection.print_calculation() return collection diff --git a/pywaybackup/SnapshotCollection.py b/pywaybackup/SnapshotCollection.py index dcafcf9..25c5478 100644 --- a/pywaybackup/SnapshotCollection.py +++ b/pywaybackup/SnapshotCollection.py @@ -4,6 +4,7 @@ from pywaybackup.files import CDXfile, CSVfile from pywaybackup.Verbosity import Progressbar from pywaybackup.Verbosity import Verbosity as vb +from pywaybackup.helper import url_split func: callable @@ -19,7 +20,9 @@ def __init__(self): self.csvfile = None self._mode_first = False self._mode_last = False - + self._max_snapshots_per_url = None + self._path_depth = None + self._cdx_total = 0 # absolute amount of snapshots in cdx file self._snapshot_total = 0 # absolute amount of snapshots in db @@ -60,7 +63,7 @@ def _finalize_db(self): self.db.write_progress(self._snapshot_handled, self._snapshot_total) self.db.session.close() - def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile): + def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile, max_snapshots_per_url: int = None, path_depth: int = None): """ Insert the content of the cdx and csv file into the snapshot table. """ @@ -87,6 +90,9 @@ def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile): vb.write(verbose=True, content="\nAlready indexed snapshots") if not self.db.get_filter_complete(): vb.write(content="\nFiltering snapshots (last or first version)...") + # set per-url limit (if provided) and then filter + self._max_snapshots_per_url = max_snapshots_per_url + self._path_depth = path_depth self._filter_snapshots() # filter: keep newest or oldest based on MODE self.db.set_filter_complete() else: @@ -285,6 +291,94 @@ def _enumerate_counter(): offset += len(rows) _filter_mode() + + # Apply path-depth pruning if requested: remove snapshots whose URL path depth + # exceeds `self._path_depth`. Root path has depth 0; immediate children depth 1. + self._filter_path_depth_deleted = 0 + if self._path_depth is not None: + try: + depth_limit = int(self._path_depth) + except Exception: + depth_limit = None + if depth_limit is not None: + # iterate distinct origins and delete those exceeding depth + origins = ( + self.db.session.execute(select(waybackup_snapshots.url_origin).distinct()) + .scalars() + .all() + ) + total_deleted = 0 + for origin in origins: + if not origin: + continue + domain, subdir, filename = url_split(origin) + subdir = subdir or "" + path_segments = [p for p in subdir.strip("/").split("/") if p] + depth = len(path_segments) + if depth > depth_limit: + result = self.db.session.execute( + delete(waybackup_snapshots).where(waybackup_snapshots.url_origin == origin) + ) + total_deleted += result.rowcount + self.db.session.commit() + self._filter_path_depth_deleted += total_deleted + + # Apply per-URL snapshot limiting distributed across the date range + # This keeps up to `self._max_snapshots_per_url` snapshots per `url_origin`. + # Selection is distributed across the available timestamps so the whole + # date range is represented (first and last snapshots preserved when limit>1). 
+ self._filter_snapshot_deleted = 0 + limit = None + if self._max_snapshots_per_url: + try: + limit = int(self._max_snapshots_per_url) + except (TypeError, ValueError): + limit = None + if limit and limit > 0: + # find origins that exceed the limit + origins = ( + self.db.session.execute( + select(waybackup_snapshots.url_origin, func.count().label("cnt")) + .group_by(waybackup_snapshots.url_origin) + .having(func.count() > limit) + ) + .all() + ) + for origin_row in origins: + origin = origin_row[0] + # fetch ordered scids for this origin (ascending timestamps) + scid_rows = ( + self.db.session.execute( + select(waybackup_snapshots.scid) + .where(waybackup_snapshots.url_origin == origin) + .order_by(waybackup_snapshots.timestamp.asc()) + ) + .scalars() + .all() + ) + total = len(scid_rows) + if total <= limit: + continue + # compute indices to keep (distributed across range) + indices = [] + if limit == 1: + # pick middle snapshot as representative + indices = [total // 2] + else: + for i in range(limit): + idx = round(i * (total - 1) / (limit - 1)) + indices.append(int(idx)) + # ensure unique and valid indices + indices = sorted(set(max(0, min(total - 1, i)) for i in indices)) + keep_scids = [scid_rows[i] for i in indices] + # delete non-kept snapshots for this origin + stmt = delete(waybackup_snapshots).where( + and_(waybackup_snapshots.url_origin == origin, ~waybackup_snapshots.scid.in_(keep_scids)) + ) + result = self.db.session.execute(stmt) + self.db.session.commit() + self._filter_snapshot_deleted += result.rowcount + _enumerate_counter() self._filter_response = ( self.db.session.query(waybackup_snapshots).where(waybackup_snapshots.response.in_(["404", "301"])).count() From cb63549b580b1f42fe9e55c121f962adb26d4e01 Mon Sep 17 00:00:00 2001 From: jorntx <> Date: Fri, 16 Jan 2026 08:52:11 +0100 Subject: [PATCH 2/6] Enhance filename sanitization to include additional disallowed characters Add additional '#=!~' sanitizing to filenames and subdir foldernames --- pywaybackup/helper.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pywaybackup/helper.py b/pywaybackup/helper.py index ca5005a..8362b82 100644 --- a/pywaybackup/helper.py +++ b/pywaybackup/helper.py @@ -15,7 +15,7 @@ def sanitize_filename(filename: str) -> str: """ Sanitize a string to be used as (part of) a filename. 
""" - disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] + disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*", "=", "#", "!", "~"] for char in disallowed: filename = filename.replace(char, ".") filename = ".".join(filter(None, filename.split("."))) @@ -63,12 +63,12 @@ def url_split(url, index=False): else: filename = "index.html" if index else "" subdir = "/".join(path_parts).strip("/") - # sanitize subdir and filename for windows - if check_nt(): - special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|"] - for char in special_chars: - subdir = subdir.replace(char, f"%{ord(char):02x}") - filename = filename.replace(char, f"%{ord(char):02x}") + + # Sanitize special characters that are problematic in file- and foldernames + special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|", "#", "!", "~"] + for char in special_chars: + subdir = subdir.replace(char, f"%{ord(char):02x}") + filename = filename.replace(char, f"%{ord(char):02x}") filename = filename.replace("%20", " ") return domain, subdir, filename From 5b0d2de2eb5e219382cbc4ae5c48c0702979092f Mon Sep 17 00:00:00 2001 From: Victor Johnston Date: Mon, 19 Jan 2026 09:39:27 +0100 Subject: [PATCH 3/6] Add atomic claim of snapshots --- pywaybackup/Snapshot.py | 45 ++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/pywaybackup/Snapshot.py b/pywaybackup/Snapshot.py index 1b343c1..3755501 100644 --- a/pywaybackup/Snapshot.py +++ b/pywaybackup/Snapshot.py @@ -1,7 +1,7 @@ import os import threading -from pywaybackup.db import Database, select, update, waybackup_snapshots +from pywaybackup.db import Database, select, update, waybackup_snapshots, and_ from pywaybackup.helper import url_split @@ -70,20 +70,41 @@ def __on_sqlite(): return False def __get_row(): - with self._db.session.begin(): - row = self._db.session.execute( - select(waybackup_snapshots) + # Atomic claim: find next unprocessed scid, set response='LOCK' only if still unprocessed, + # then fetch that row. This avoids relying on FOR UPDATE or explicit nested transactions + # which can trigger "A transaction is already begun on this Session" errors. + + session = self._db.session + + # get next available SnapshotId + scid = ( + session.execute( + select(waybackup_snapshots.scid) .where(waybackup_snapshots.response.is_(None)) .order_by(waybackup_snapshots.scid) .limit(1) - .with_for_update(skip_locked=True) - ).scalar_one_or_none() - - if row is None: - return None - - row.response = "LOCK" - + ) + .scalar_one_or_none() + ) + + if scid is None: + return None + + # try to atomically claim the row by updating only if still unclaimed + result = session.execute( + update(waybackup_snapshots) + .where(and_(waybackup_snapshots.scid == scid, waybackup_snapshots.response.is_(None))) + .values(response="LOCK") + ) + + # if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker. + if result.rowcount == 0: + session.commit() + return None + + # The row has been claimed by the worker and can now be fetched. 
+ row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none() + session.commit() return row if __on_sqlite(): From e1dd090ca6d499f940eb29a535410cf5a4f29eba Mon Sep 17 00:00:00 2001 From: Victor Johnston Date: Mon, 19 Jan 2026 11:50:22 +0100 Subject: [PATCH 4/6] Harden closing of database connections --- pywaybackup/PyWayBackup.py | 7 ++ pywaybackup/Snapshot.py | 37 +++++++-- pywaybackup/SnapshotCollection.py | 130 +++++++++++++++++------------- pywaybackup/Worker.py | 21 +++++ pywaybackup/db.py | 25 +++++- 5 files changed, 158 insertions(+), 62 deletions(-) diff --git a/pywaybackup/PyWayBackup.py b/pywaybackup/PyWayBackup.py index 0ae84af..75418d1 100644 --- a/pywaybackup/PyWayBackup.py +++ b/pywaybackup/PyWayBackup.py @@ -363,6 +363,7 @@ def _workflow(self): resources after the backup is complete. """ + collection = None try: self._startup() @@ -384,6 +385,12 @@ def _workflow(self): self._keep = True ex.exception(message="", e=e) finally: + # if a collection was created during the workflow, close its DB session cleanly + try: + if collection: + collection.close() + except Exception: + pass self._shutdown() def paths(self, rel: bool = False) -> dict: diff --git a/pywaybackup/Snapshot.py b/pywaybackup/Snapshot.py index 3755501..75c5b54 100644 --- a/pywaybackup/Snapshot.py +++ b/pywaybackup/Snapshot.py @@ -3,6 +3,7 @@ from pywaybackup.db import Database, select, update, waybackup_snapshots, and_ from pywaybackup.helper import url_split +from pywaybackup.Verbosity import Verbosity as vb class Snapshot: @@ -77,6 +78,7 @@ def __get_row(): session = self._db.session # get next available SnapshotId + vb.write(verbose=True, content=f"[Snapshot.fetch] selecting next scid") scid = ( session.execute( select(waybackup_snapshots.scid) @@ -88,6 +90,7 @@ def __get_row(): ) if scid is None: + vb.write(verbose=True, content=f"[Snapshot.fetch] no unprocessed scid found") return None # try to atomically claim the row by updating only if still unclaimed @@ -98,13 +101,25 @@ def __get_row(): ) # if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker. + vb.write(verbose=True, content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}") if result.rowcount == 0: - session.commit() + try: + session.commit() + except Exception: + pass + vb.write(verbose=True, content=f"[Snapshot.fetch] scid={scid} already claimed by another worker") return None # The row has been claimed by the worker and can now be fetched. row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none() - session.commit() + try: + session.commit() + except Exception: + try: + session.rollback() + except Exception: + pass + vb.write(verbose=True, content=f"[Snapshot.fetch] claimed scid={scid} and fetched row") return row if __on_sqlite(): @@ -122,10 +137,20 @@ def modify(self, column, value): value: New value to set for the column. 
""" column = getattr(waybackup_snapshots, column) - self._db.session.execute( - update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value}) - ) - self._db.session.commit() + try: + vb.write(verbose=True, content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}") + self._db.session.execute( + update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value}) + ) + self._db.session.commit() + vb.write(verbose=True, content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}") + except Exception as e: + vb.write(verbose=True, content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back") + try: + self._db.session.rollback() + except Exception: + pass + raise def create_output(self): """ diff --git a/pywaybackup/SnapshotCollection.py b/pywaybackup/SnapshotCollection.py index dcafcf9..81b8055 100644 --- a/pywaybackup/SnapshotCollection.py +++ b/pywaybackup/SnapshotCollection.py @@ -163,45 +163,56 @@ def _insert_batch_safe(line_batch): vb.write(verbose=None, content="\nInserting CDX data into database...") - progressbar = Progressbar( + try: + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] starting insert_cdx operation") + progressbar = Progressbar( unit=" lines", total=self._cdx_total, desc="process cdx".ljust(15), ascii="░▒█", bar_format="{l_bar}{bar:50}{r_bar}{bar:-10b}", - ) - line_batchsize = 2500 - line_batch = [] - total_inserted = 0 - first_line = True - - with self.cdxfile as f: - for line in f: - if first_line: - first_line = False - continue - line = line.strip() - if line.endswith("]]"): - line = line.rsplit("]", 1)[0] - if line.endswith(","): - line = line.rsplit(",", 1)[0] - - try: - line_batch.append(__parse_line(line)) - except json.decoder.JSONDecodeError: - self._snapshot_faulty += 1 - continue - - if len(line_batch) >= line_batchsize: + ) + line_batchsize = 2500 + line_batch = [] + total_inserted = 0 + first_line = True + + with self.cdxfile as f: + for line in f: + if first_line: + first_line = False + continue + line = line.strip() + if line.endswith("]]"): + line = line.rsplit("]", 1)[0] + if line.endswith(","): + line = line.rsplit(",", 1)[0] + + try: + line_batch.append(__parse_line(line)) + except json.decoder.JSONDecodeError: + self._snapshot_faulty += 1 + continue + + if len(line_batch) >= line_batchsize: + total_inserted += _insert_batch_safe(line_batch=line_batch) + line_batch = [] + progressbar.update(line_batchsize) + + if line_batch: total_inserted += _insert_batch_safe(line_batch=line_batch) - line_batch = [] - progressbar.update(line_batchsize) - - if line_batch: - total_inserted += _insert_batch_safe(line_batch=line_batch) - progressbar.update(len(line_batch)) - - self.db.session.commit() + progressbar.update(len(line_batch)) + + self.db.session.commit() + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] insert_cdx commit successful") + except Exception as e: + vb.write(verbose=True, content=f"[SnapshotCollection._insert_cdx] exception: {e}; rolling back") + try: + self.db.session.rollback() + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback successful") + except Exception: + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback failed") + raise def _index_snapshots(self): """ @@ -297,29 +308,40 @@ def _skip_set(self): """ # ? 
for now per row / no bulk for compatibility - with self.csvfile as f: - total_skipped = 0 - for row in f: - self.db.session.execute( - update(waybackup_snapshots) - .where( - and_( - waybackup_snapshots.timestamp == row["timestamp"], - waybackup_snapshots.url_origin == row["url_origin"], + try: + vb.write(verbose=True, content="[SnapshotCollection._skip_set] applying CSV skips to DB") + with self.csvfile as f: + total_skipped = 0 + for row in f: + self.db.session.execute( + update(waybackup_snapshots) + .where( + and_( + waybackup_snapshots.timestamp == row["timestamp"], + waybackup_snapshots.url_origin == row["url_origin"], + ) + ) + .values( + url_archive=row["url_archive"], + redirect_url=row["redirect_url"], + redirect_timestamp=row["redirect_timestamp"], + response=row["response"], + file=row["file"], ) ) - .values( - url_archive=row["url_archive"], - redirect_url=row["redirect_url"], - redirect_timestamp=row["redirect_timestamp"], - response=row["response"], - file=row["file"], - ) - ) - total_skipped += 1 - - self.db.session.commit() - self._filter_skip = total_skipped + total_skipped += 1 + + self.db.session.commit() + self._filter_skip = total_skipped + vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] commit successful, total_skipped={total_skipped}") + except Exception as e: + vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] exception: {e}; rolling back") + try: + self.db.session.rollback() + vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback successful") + except Exception: + vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback failed") + raise def count_total(self) -> int: return self.db.session.query(waybackup_snapshots.scid).count() diff --git a/pywaybackup/Worker.py b/pywaybackup/Worker.py index 6f66a6c..4a9ff01 100644 --- a/pywaybackup/Worker.py +++ b/pywaybackup/Worker.py @@ -21,6 +21,27 @@ def init(self): self.db = Database() self.connection = http.client.HTTPSConnection("web.archive.org") + def close(self): + """ + Try to close the database and connection. 
+ """ + try: + if hasattr(self, "db") and self.db: + try: + vb.write(verbose=True, content=f"[Worker.close] closing DB for worker {self.id}") + self.db.close() + vb.write(verbose=True, content=f"[Worker.close] DB closed for worker {self.id}") + except Exception: + pass + finally: + try: + if hasattr(self, "connection") and self.connection: + vb.write(verbose=True, content=f"[Worker.close] closing connection for worker {self.id}") + self.connection.close() + vb.write(verbose=True, content=f"[Worker.close] connection closed for worker {self.id}") + except Exception: + pass + def assign_snapshot(self, total_amount: int): self.snapshot = Snapshot(self.db, output=self.output, mode=self.mode) self.total_amount = total_amount diff --git a/pywaybackup/db.py b/pywaybackup/db.py index 508f833..e4fb412 100644 --- a/pywaybackup/db.py +++ b/pywaybackup/db.py @@ -18,6 +18,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from typing import Optional # python 3.8 +from pywaybackup.Verbosity import Verbosity as vb Base = declarative_base() @@ -129,8 +130,28 @@ def __init__(self): self.session = self.sessman() def close(self): - self.session.commit() - self.session.close() + """ + Try to commit any pending work; if that fails, rollback to avoid leaving open transactions + """ + try: + if self.session.in_transaction(): + vb.write(verbose=True, content=f"[Database.close] session in transaction: committing") + try: + self.session.commit() + vb.write(verbose=True, content=f"[Database.close] commit successful") + except Exception as e: + vb.write(verbose=True, content=f"[Database.close] commit failed: {e}; rolling back") + try: + self.session.rollback() + vb.write(verbose=True, content=f"[Database.close] rollback successful") + except Exception: + vb.write(verbose=True, content=f"[Database.close] rollback failed") + finally: + try: + self.session.close() + vb.write(verbose=True, content=f"[Database.close] session closed") + except Exception as e: + vb.write(verbose=True, content=f"[Database.close] session close failed: {e}") def write_progress(self, done: int, total: int): """ From 603d544596ac252524747abbd54589b4bd3d65ee Mon Sep 17 00:00:00 2001 From: jorntx <> Date: Wed, 21 Jan 2026 13:56:28 +0100 Subject: [PATCH 5/6] Refactor: Remove limiter functionality: remove path depth option and update related logic --- .gitignore | 4 +- README.md | 3 - pywaybackup/Arguments.py | 6 - pywaybackup/PyWayBackup.py | 5 - pywaybackup/SnapshotCollection.py | 265 +++++++++++++++--------------- 5 files changed, 139 insertions(+), 144 deletions(-) diff --git a/.gitignore b/.gitignore index bb6b52e..1fd4802 100644 --- a/.gitignore +++ b/.gitignore @@ -169,4 +169,6 @@ cython_debug/ # custom test.py waybackup_snapshots/ -output/ \ No newline at end of file +output/ +.DS_Store +.github/.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index df8ff9b..2631eda 100644 --- a/README.md +++ b/README.md @@ -148,9 +148,6 @@ Parameters for archive.org CDX query. No effect on snapshot download itself. - **`--max-snapshots-per-url`** ``:
   Limits the number of snapshots kept per unique URL after CDX insertion and filtering. Selection is distributed across the requested date range so the retained snapshots represent the whole timespan. Useful for capping the harvest size of large sites. Example: `--limit 10000 --max-snapshots-per-url 100` queries up to 10k snapshots, then keeps up to 100 per unique URL.
 
-- **`--path-depth`** ``:
- Limit harvested URLs by path depth. `0` = only `example.com/` (root), `1` = root + immediate children (e.g. `example.com/foo/`), `2` = two levels, etc. Applied after CDX insertion and mode filtering. Combine with `--max-snapshots-per-url` to cap snapshots per kept URL. - - **Range Selection:**
Set the query range in years (`range`) or a timestamp (`start` and/or `end`). If `range` then ignores `start` and `end`. Format for timestamps: YYYYMMDDhhmmss. Timestamp can as specific as needed (year 2019, year+month+day 20190101, ...). diff --git a/pywaybackup/Arguments.py b/pywaybackup/Arguments.py index 05118f4..422aae4 100644 --- a/pywaybackup/Arguments.py +++ b/pywaybackup/Arguments.py @@ -46,12 +46,6 @@ def __init__(self): metavar="", help="limit number of snapshots to keep per unique URL (distributed across date range)", ) - behavior.add_argument( - "--path-depth", - type=int, - metavar="", - help="limit harvested URLs by path depth: 0=root only, 1=include immediate children, etc.", - ) special = parser.add_argument_group("special") special.add_argument("--reset", action="store_true", help="reset the job and ignore existing cdx/db/csv files") diff --git a/pywaybackup/PyWayBackup.py b/pywaybackup/PyWayBackup.py index 11f77c5..85589b5 100644 --- a/pywaybackup/PyWayBackup.py +++ b/pywaybackup/PyWayBackup.py @@ -134,7 +134,6 @@ def __init__( silent: bool = True, debug: bool = False, max_snapshots_per_url: int = None, - path_depth: int = None, **kwargs: dict, ): self._url = url @@ -161,7 +160,6 @@ def __init__( self._reset = reset self._keep = keep self._max_snapshots_per_url = max_snapshots_per_url - self._path_depth = path_depth # module exclusive self._silent = silent @@ -189,11 +187,9 @@ def __init__( + str(self._end) + str(self._limit) + str(self._max_snapshots_per_url) - + str(self._path_depth) + str(self._filetype) + str(self._statuscode) ) - # if sys.argv is empty, we assume this is being run as a module if not sys.argv[1:]: self._command = "pywaybackup_module" @@ -335,7 +331,6 @@ def _prep_collection(self) -> SnapshotCollection: cdxfile=self._cdxfile, csvfile=self._csvfile, max_snapshots_per_url=self._max_snapshots_per_url, - path_depth=self._path_depth, ) collection.print_calculation() return collection diff --git a/pywaybackup/SnapshotCollection.py b/pywaybackup/SnapshotCollection.py index 25c5478..eece04c 100644 --- a/pywaybackup/SnapshotCollection.py +++ b/pywaybackup/SnapshotCollection.py @@ -21,7 +21,6 @@ def __init__(self): self._mode_first = False self._mode_last = False self._max_snapshots_per_url = None - self._path_depth = None self._cdx_total = 0 # absolute amount of snapshots in cdx file self._snapshot_total = 0 # absolute amount of snapshots in db @@ -63,7 +62,7 @@ def _finalize_db(self): self.db.write_progress(self._snapshot_handled, self._snapshot_total) self.db.session.close() - def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile, max_snapshots_per_url: int = None, path_depth: int = None): + def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile, max_snapshots_per_url: int = None): """ Insert the content of the cdx and csv file into the snapshot table. """ @@ -92,7 +91,6 @@ def load(self, mode: str, cdxfile: CDXfile, csvfile: CSVfile, max_snapshots_per_ vb.write(content="\nFiltering snapshots (last or first version)...") # set per-url limit (if provided) and then filter self._max_snapshots_per_url = max_snapshots_per_url - self._path_depth = path_depth self._filter_snapshots() # filter: keep newest or oldest based on MODE self.db.set_filter_complete() else: @@ -239,94 +237,51 @@ def _index_snapshots(self): def _filter_snapshots(self): """ - Filter the snapshot table. - - - MODE_LAST → keep only the latest snapshot (highest timestamp) per url_origin. - - MODE_FIRST → keep only the earliest snapshot (lowest timestamp) per url_origin. 
+ Filter the snapshot table by applying mode and per-URL limits. + Orchestrates the full filtering pipeline. """ + self._filter_mode_snapshots() + self._filter_by_max_snapshots_per_url() + self._enumerate_counter() + self._count_response_status() - def _filter_mode(): - self._filter_mode = 0 - if self._mode_last or self._mode_first: - ordering = ( - waybackup_snapshots.timestamp.desc() if self._mode_last else waybackup_snapshots.timestamp.asc() - ) - # assign row numbers per url_origin - rownum = ( - func.row_number() - .over( - partition_by=waybackup_snapshots.url_origin, - order_by=ordering, - ) - .label("rn") - ) - subq = select(waybackup_snapshots.scid, rownum).subquery() - # keep rn == 1, delete all others - keepers = select(subq.c.scid).where(subq.c.rn == 1) - stmt = delete(waybackup_snapshots).where(~waybackup_snapshots.scid.in_(keepers)) - result = self.db.session.execute(stmt) - self.db.session.commit() - self._filter_mode = result.rowcount - - def _enumerate_counter(): - # this sets the counter (snapshot number x / y) to 1 ... n - offset = 1 - batch_size = 5000 - while True: - rows = ( - self.db.session.execute( - select(waybackup_snapshots.scid) - .where(waybackup_snapshots.counter.is_(None)) - .order_by(waybackup_snapshots.scid) - .limit(batch_size) - ) - .scalars() - .all() - ) - if not rows: - break - mappings = [{"scid": scid, "counter": i} for i, scid in enumerate(rows, start=offset)] - self.db.session.bulk_update_mappings(waybackup_snapshots, mappings) - self.db.session.commit() - offset += len(rows) - - _filter_mode() - - # Apply path-depth pruning if requested: remove snapshots whose URL path depth - # exceeds `self._path_depth`. Root path has depth 0; immediate children depth 1. - self._filter_path_depth_deleted = 0 - if self._path_depth is not None: - try: - depth_limit = int(self._path_depth) - except Exception: - depth_limit = None - if depth_limit is not None: - # iterate distinct origins and delete those exceeding depth - origins = ( - self.db.session.execute(select(waybackup_snapshots.url_origin).distinct()) - .scalars() - .all() + def _filter_mode_snapshots(self): + """ + Filter snapshots by mode (first or last version). + + - MODE_LAST: keep only the latest snapshot (highest timestamp) per url_origin. + - MODE_FIRST: keep only the earliest snapshot (lowest timestamp) per url_origin. + """ + self._filter_mode = 0 + if self._mode_last or self._mode_first: + ordering = ( + waybackup_snapshots.timestamp.desc() if self._mode_last else waybackup_snapshots.timestamp.asc() + ) + # assign row numbers per url_origin + rownum = ( + func.row_number() + .over( + partition_by=waybackup_snapshots.url_origin, + order_by=ordering, ) - total_deleted = 0 - for origin in origins: - if not origin: - continue - domain, subdir, filename = url_split(origin) - subdir = subdir or "" - path_segments = [p for p in subdir.strip("/").split("/") if p] - depth = len(path_segments) - if depth > depth_limit: - result = self.db.session.execute( - delete(waybackup_snapshots).where(waybackup_snapshots.url_origin == origin) - ) - total_deleted += result.rowcount - self.db.session.commit() - self._filter_path_depth_deleted += total_deleted - - # Apply per-URL snapshot limiting distributed across the date range - # This keeps up to `self._max_snapshots_per_url` snapshots per `url_origin`. - # Selection is distributed across the available timestamps so the whole - # date range is represented (first and last snapshots preserved when limit>1). 
+ .label("rn") + ) + subq = select(waybackup_snapshots.scid, rownum).subquery() + # keep rn == 1, delete all others + keepers = select(subq.c.scid).where(subq.c.rn == 1) + stmt = delete(waybackup_snapshots).where(~waybackup_snapshots.scid.in_(keepers)) + result = self.db.session.execute(stmt) + self.db.session.commit() + self._filter_mode = result.rowcount + + def _filter_by_max_snapshots_per_url(self): + """ + Limit snapshots per unique URL, distributed across the date range. + + Keeps up to `self._max_snapshots_per_url` snapshots per `url_origin`. + Selection is distributed across available timestamps to represent the full timespan. + First and last snapshots are preserved when limit > 1. + """ self._filter_snapshot_deleted = 0 limit = None if self._max_snapshots_per_url: @@ -334,52 +289,104 @@ def _enumerate_counter(): limit = int(self._max_snapshots_per_url) except (TypeError, ValueError): limit = None - if limit and limit > 0: - # find origins that exceed the limit - origins = ( + + if not (limit and limit > 0): + return + + # Find origins that exceed the limit + origins = ( + self.db.session.execute( + select(waybackup_snapshots.url_origin, func.count().label("cnt")) + .group_by(waybackup_snapshots.url_origin) + .having(func.count() > limit) + ) + .all() + ) + + for origin_row in origins: + origin = origin_row[0] + # Fetch ordered scids for this origin (ascending timestamps) + scid_rows = ( self.db.session.execute( - select(waybackup_snapshots.url_origin, func.count().label("cnt")) - .group_by(waybackup_snapshots.url_origin) - .having(func.count() > limit) + select(waybackup_snapshots.scid) + .where(waybackup_snapshots.url_origin == origin) + .order_by(waybackup_snapshots.timestamp.asc()) ) + .scalars() .all() ) - for origin_row in origins: - origin = origin_row[0] - # fetch ordered scids for this origin (ascending timestamps) - scid_rows = ( - self.db.session.execute( - select(waybackup_snapshots.scid) - .where(waybackup_snapshots.url_origin == origin) - .order_by(waybackup_snapshots.timestamp.asc()) - ) - .scalars() - .all() - ) - total = len(scid_rows) - if total <= limit: - continue - # compute indices to keep (distributed across range) - indices = [] - if limit == 1: - # pick middle snapshot as representative - indices = [total // 2] - else: - for i in range(limit): - idx = round(i * (total - 1) / (limit - 1)) - indices.append(int(idx)) - # ensure unique and valid indices - indices = sorted(set(max(0, min(total - 1, i)) for i in indices)) - keep_scids = [scid_rows[i] for i in indices] - # delete non-kept snapshots for this origin - stmt = delete(waybackup_snapshots).where( - and_(waybackup_snapshots.url_origin == origin, ~waybackup_snapshots.scid.in_(keep_scids)) - ) - result = self.db.session.execute(stmt) - self.db.session.commit() - self._filter_snapshot_deleted += result.rowcount + total = len(scid_rows) + if total <= limit: + continue + + # Compute indices to keep (distributed across range) + indices = self._compute_distributed_indices(total, limit) + keep_scids = [scid_rows[i] for i in indices] - _enumerate_counter() + # Delete non-kept snapshots for this origin + stmt = delete(waybackup_snapshots).where( + and_(waybackup_snapshots.url_origin == origin, ~waybackup_snapshots.scid.in_(keep_scids)) + ) + result = self.db.session.execute(stmt) + self.db.session.commit() + self._filter_snapshot_deleted += result.rowcount + + def _compute_distributed_indices(self, total: int, limit: int) -> list: + """ + Compute which indices to keep for time-distributed snapshot selection. 
+ + Ensures snapshots are evenly distributed across the date range, + preserving first and last snapshots when limit > 1. + + Args: + total: Total number of snapshots available. + limit: Maximum number of snapshots to keep. + + Returns: + Sorted list of indices to keep. + """ + indices = [] + if limit == 1: + # Pick middle snapshot as representative + indices = [total // 2] + else: + for i in range(limit): + idx = round(i * (total - 1) / (limit - 1)) + indices.append(int(idx)) + + # Ensure unique and valid indices + indices = sorted(set(max(0, min(total - 1, i)) for i in indices)) + return indices + + def _enumerate_counter(self): + """ + Assign sequential counter values to all snapshots. + + Sets the counter field (snapshot number x / y) for ordering and progress tracking. + Processes in batches for efficiency. + """ + offset = 1 + batch_size = 5000 + while True: + rows = ( + self.db.session.execute( + select(waybackup_snapshots.scid) + .where(waybackup_snapshots.counter.is_(None)) + .order_by(waybackup_snapshots.scid) + .limit(batch_size) + ) + .scalars() + .all() + ) + if not rows: + break + mappings = [{"scid": scid, "counter": i} for i, scid in enumerate(rows, start=offset)] + self.db.session.bulk_update_mappings(waybackup_snapshots, mappings) + self.db.session.commit() + offset += len(rows) + + def _count_response_status(self): + """Count snapshots with non-retrieval status codes (404, 301).""" self._filter_response = ( self.db.session.query(waybackup_snapshots).where(waybackup_snapshots.response.in_(["404", "301"])).count() ) From 6628fe603d2faa2cd7fcf69cc68057b8489b6bac Mon Sep 17 00:00:00 2001 From: jorntx <> Date: Wed, 21 Jan 2026 14:10:38 +0100 Subject: [PATCH 6/6] Refactor: split _filter_by_max_snapshots_per_url into 5 smaller methods and added doc strings --- pywaybackup/SnapshotCollection.py | 120 +++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 28 deletions(-) diff --git a/pywaybackup/SnapshotCollection.py b/pywaybackup/SnapshotCollection.py index eece04c..1fc1cba 100644 --- a/pywaybackup/SnapshotCollection.py +++ b/pywaybackup/SnapshotCollection.py @@ -283,17 +283,43 @@ def _filter_by_max_snapshots_per_url(self): First and last snapshots are preserved when limit > 1. """ self._filter_snapshot_deleted = 0 + limit = self._parse_snapshot_limit() + + if not (limit and limit > 0): + return + + origins = self._find_origins_exceeding_limit(limit) + + for origin_row in origins: + origin = origin_row[0] + deleted_count = self._apply_limit_to_origin(origin, limit) + self._filter_snapshot_deleted += deleted_count + + def _parse_snapshot_limit(self) -> int: + """ + Parse and validate the max_snapshots_per_url configuration. + + Returns: + The validated limit as an integer, or None if invalid/not set. + """ limit = None if self._max_snapshots_per_url: try: limit = int(self._max_snapshots_per_url) except (TypeError, ValueError): limit = None + return limit - if not (limit and limit > 0): - return - - # Find origins that exceed the limit + def _find_origins_exceeding_limit(self, limit: int) -> list: + """ + Query the database for origins that have more snapshots than the limit. + + Args: + limit: Maximum number of snapshots allowed per origin. + + Returns: + List of origin rows that exceed the limit. 
+ """ origins = ( self.db.session.execute( select(waybackup_snapshots.url_origin, func.count().label("cnt")) @@ -302,34 +328,72 @@ def _filter_by_max_snapshots_per_url(self): ) .all() ) + return origins - for origin_row in origins: - origin = origin_row[0] - # Fetch ordered scids for this origin (ascending timestamps) - scid_rows = ( - self.db.session.execute( - select(waybackup_snapshots.scid) - .where(waybackup_snapshots.url_origin == origin) - .order_by(waybackup_snapshots.timestamp.asc()) - ) - .scalars() - .all() - ) - total = len(scid_rows) - if total <= limit: - continue + def _apply_limit_to_origin(self, origin: str, limit: int) -> int: + """ + Apply the snapshot limit to a specific origin by deleting excess snapshots. + + Fetches all snapshots for the origin, computes which ones to keep using + distributed selection, and deletes the rest. + + Args: + origin: The url_origin to apply the limit to. + limit: Maximum number of snapshots to keep. + + Returns: + Number of snapshots deleted for this origin. + """ + scid_rows = self._fetch_ordered_scids(origin) + total = len(scid_rows) + + if total <= limit: + return 0 - # Compute indices to keep (distributed across range) - indices = self._compute_distributed_indices(total, limit) - keep_scids = [scid_rows[i] for i in indices] + indices = self._compute_distributed_indices(total, limit) + keep_scids = [scid_rows[i] for i in indices] + + deleted_count = self._delete_excess_snapshots(origin, keep_scids) + return deleted_count - # Delete non-kept snapshots for this origin - stmt = delete(waybackup_snapshots).where( - and_(waybackup_snapshots.url_origin == origin, ~waybackup_snapshots.scid.in_(keep_scids)) + def _fetch_ordered_scids(self, origin: str) -> list: + """ + Fetch all snapshot IDs for a given origin, ordered by timestamp ascending. + + Args: + origin: The url_origin to fetch snapshots for. + + Returns: + List of scid values in timestamp order. + """ + scid_rows = ( + self.db.session.execute( + select(waybackup_snapshots.scid) + .where(waybackup_snapshots.url_origin == origin) + .order_by(waybackup_snapshots.timestamp.asc()) ) - result = self.db.session.execute(stmt) - self.db.session.commit() - self._filter_snapshot_deleted += result.rowcount + .scalars() + .all() + ) + return scid_rows + + def _delete_excess_snapshots(self, origin: str, keep_scids: list) -> int: + """ + Delete snapshots for an origin that are not in the keep list. + + Args: + origin: The url_origin to delete snapshots from. + keep_scids: List of scid values to preserve. + + Returns: + Number of snapshots deleted. + """ + stmt = delete(waybackup_snapshots).where( + and_(waybackup_snapshots.url_origin == origin, ~waybackup_snapshots.scid.in_(keep_scids)) + ) + result = self.db.session.execute(stmt) + self.db.session.commit() + return result.rowcount def _compute_distributed_indices(self, total: int, limit: int) -> list: """