5 changes: 5 additions & 0 deletions .gitignore
@@ -132,6 +132,7 @@ venv/
ENV/
env.bak/
venv.bak/
copilot-instructions.md

# Spyder project settings
.spyderproject
@@ -167,3 +168,7 @@

# custom
test.py
waybackup_snapshots/
output/
.DS_Store
.github/.DS_Store
3 changes: 3 additions & 0 deletions README.md
@@ -145,6 +145,9 @@ Parameters for archive.org CDX query. No effect on snapshot download itself.
- **`--limit`** `<count>`:<br>
Limits the number of snapshots fetched from the archive.org CDX API. (Has no effect on existing CDX files.)

- **`--max-snapshots-per-url`** `<count>`:<br>
Limits the number of snapshots kept per unique URL after CDX insertion and filtering. The selection is distributed across the requested date range, so the retained snapshots represent the whole timespan (see the sketch below). Useful for capping harvest depth on large sites. Example: `--limit 10000 --max-snapshots-per-url 100` queries up to 10,000 snapshots, then keeps at most 100 per unique URL.
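  A minimal sketch of such an evenly spread selection, assuming a timestamp-sorted snapshot list; `pick_distributed` and `snapshots` are hypothetical names for illustration, not pywaybackup internals:

  ```python
  def pick_distributed(snapshots: list, max_per_url: int) -> list:
      """Keep at most max_per_url snapshots, evenly spaced over the sorted list."""
      if max_per_url is None or len(snapshots) <= max_per_url:
          return snapshots
      step = len(snapshots) / max_per_url  # step > 1, so the picked indices are distinct
      return [snapshots[int(i * step)] for i in range(max_per_url)]

  # e.g. 1000 snapshots of one URL capped at 100 keeps roughly every 10th snapshot
  ```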

- **Range Selection:**<br>
Set the query range in years (`range`) or as timestamps (`start` and/or `end`). If `range` is given, `start` and `end` are ignored. Timestamp format: YYYYMMDDhhmmss; a timestamp can be as specific as needed (year only: 2019, year+month+day: 20190101, ...). A module-level usage sketch follows below.
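  When using pywaybackup as a module, these parameters can be passed to the `PyWayBackup` constructor added in this PR. A hedged sketch: only `url` and `max_snapshots_per_url` appear verbatim as parameters in this diff; the import path and the remaining keyword names are assumptions inferred from the `_start`/`_end`/`_limit` attributes and may differ in the released API:

  ```python
  from pywaybackup.PyWayBackup import PyWayBackup  # import path assumed from this PR's file layout

  job = PyWayBackup(
      url="example.com",
      start="20190101",   # YYYYMMDDhhmmss, truncated here to day precision
      end="20200101",
      limit=10000,
      max_snapshots_per_url=100,
  )
  ```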

8 changes: 7 additions & 1 deletion pywaybackup/Arguments.py
@@ -40,7 +40,13 @@ def __init__(self):
behavior.add_argument("--retry", type=int, default=0, metavar="", help="retry failed downloads (opt tries as int, else infinite)")
behavior.add_argument("--workers", type=int, default=1, metavar="", help="number of workers (simultaneous downloads)")
behavior.add_argument("--delay", type=int, default=0, metavar="", help="delay between each download in seconds")
behavior.add_argument("--wait", type=int, default=15, metavar="", help="seconds to wait before renewing connection after HTTP errors or snapshot download errors (default: 15)",)
behavior.add_argument(
    "--max-snapshots-per-url",
    type=int,
    metavar="",
    help="limit number of snapshots to keep per unique URL (distributed across date range)",
)
behavior.add_argument("--wait", type=int, default=15, metavar="", help="seconds to wait before renewing connection after HTTP errors or snapshot download errors (default: 15)")

special = parser.add_argument_group("special")
special.add_argument("--reset", action="store_true", help="reset the job and ignore existing cdx/db/csv files")
18 changes: 16 additions & 2 deletions pywaybackup/PyWayBackup.py
@@ -134,6 +134,7 @@ def __init__(
keep: bool = False,
silent: bool = True,
debug: bool = False,
max_snapshots_per_url: int = None,
**kwargs: dict,
):
self._url = url
@@ -161,6 +162,7 @@ def __init__(

self._reset = reset
self._keep = keep
self._max_snapshots_per_url = max_snapshots_per_url

# module exclusive
self._silent = silent
@@ -187,10 +189,10 @@ def __init__(
+ str(self._start)
+ str(self._end)
+ str(self._limit)
+ str(self._max_snapshots_per_url)
+ str(self._filetype)
+ str(self._statuscode)
)

# if sys.argv is empty, we assume this is being run as a module
if not sys.argv[1:]:
self._command = "pywaybackup_module"
@@ -327,7 +329,12 @@ def _prep_collection(self) -> SnapshotCollection:
SnapshotCollection: The initialized and loaded snapshot collection.
"""
collection = SnapshotCollection()
collection.load(mode=self._mode, cdxfile=self._cdxfile, csvfile=self._csvfile)
collection.load(
    mode=self._mode,
    cdxfile=self._cdxfile,
    csvfile=self._csvfile,
    max_snapshots_per_url=self._max_snapshots_per_url,
)
collection.print_calculation()
return collection

@@ -363,6 +370,7 @@ def _workflow(self):
resources after the backup is complete.

"""
collection = None
try:
self._startup()

@@ -384,6 +392,12 @@
self._keep = True
ex.exception(message="", e=e)
finally:
# if a collection was created during the workflow, close its DB session cleanly
try:
    if collection:
        collection.close()
except Exception:
    pass
self._shutdown()

def paths(self, rel: bool = False) -> dict:
78 changes: 62 additions & 16 deletions pywaybackup/Snapshot.py
@@ -1,8 +1,9 @@
import os
import threading

from pywaybackup.db import Database, select, update, waybackup_snapshots
from pywaybackup.db import Database, select, update, waybackup_snapshots, and_
from pywaybackup.helper import url_split
from pywaybackup.Verbosity import Verbosity as vb


class Snapshot:
@@ -70,20 +71,55 @@ def __on_sqlite():
return False

def __get_row():
    # Atomic claim: find the next unprocessed scid, set response='LOCK' only if it is
    # still unprocessed, then fetch that row. This avoids relying on FOR UPDATE or
    # explicit nested transactions, which can trigger
    # "A transaction is already begun on this Session" errors.

    session = self._db.session

    # get the next available snapshot id
    vb.write(verbose=True, content="[Snapshot.fetch] selecting next scid")
    scid = (
        session.execute(
            select(waybackup_snapshots.scid)
            .where(waybackup_snapshots.response.is_(None))
            .order_by(waybackup_snapshots.scid)
            .limit(1)
        )
        .scalar_one_or_none()
    )

    if scid is None:
        vb.write(verbose=True, content="[Snapshot.fetch] no unprocessed scid found")
        return None

    # try to atomically claim the row by updating it only while it is still unclaimed
    result = session.execute(
        update(waybackup_snapshots)
        .where(and_(waybackup_snapshots.scid == scid, waybackup_snapshots.response.is_(None)))
        .values(response="LOCK")
    )

    # if another worker claimed the row first, rowcount is 0 and this worker backs off
    vb.write(verbose=True, content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}")
    if result.rowcount == 0:
        try:
            session.commit()
        except Exception:
            pass
        vb.write(verbose=True, content=f"[Snapshot.fetch] scid={scid} already claimed by another worker")
        return None

    # the row is now claimed by this worker and can be fetched
    row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none()
    try:
        session.commit()
    except Exception:
        try:
            session.rollback()
        except Exception:
            pass
    vb.write(verbose=True, content=f"[Snapshot.fetch] claimed scid={scid} and fetched row")
    return row

if __on_sqlite():
@@ -101,10 +137,20 @@ def modify(self, column, value):
value: New value to set for the column.
"""
column = getattr(waybackup_snapshots, column)
try:
    vb.write(verbose=True, content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}")
    self._db.session.execute(
        update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value})
    )
    self._db.session.commit()
    vb.write(verbose=True, content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}")
except Exception as e:
    vb.write(verbose=True, content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back")
    try:
        self._db.session.rollback()
    except Exception:
        pass
    raise

def create_output(self):
"""