README.md (6 additions, 0 deletions)
@@ -127,6 +127,8 @@ DVUploader provides several environment variables that allow you to control retr
- `DVUPLOADER_MIN_RETRY_TIME`: Minimum wait time between retries in seconds (default: 1)
- `DVUPLOADER_RETRY_MULTIPLIER`: Multiplier for exponential backoff (default: 0.1)
- `DVUPLOADER_MAX_PKG_SIZE`: Maximum package size in bytes (default: 2GB)
- `DVUPLOADER_LOCK_WAIT_TIME`: Time to wait between checks for dataset lock (default: 10 seconds)
- `DVUPLOADER_LOCK_TIMEOUT`: Timeout for dataset lock check in seconds (default: 300 seconds)

**Setting via environment:**
```bash
@@ -135,6 +137,8 @@
export DVUPLOADER_MAX_RETRY_TIME=300
export DVUPLOADER_MIN_RETRY_TIME=2
export DVUPLOADER_RETRY_MULTIPLIER=0.2
export DVUPLOADER_MAX_PKG_SIZE=3221225472 # 3GB
export DVUPLOADER_LOCK_WAIT_TIME=5
export DVUPLOADER_LOCK_TIMEOUT=300
```

**Setting programmatically:**
@@ -148,6 +152,8 @@
dv.config(
min_retry_time=2,
retry_multiplier=0.2,
max_package_size=3 * 1024**3,  # 3GB
lock_wait_time=5,
lock_timeout=300,
)

# Continue with your upload as normal
dvuploader/cli.py (14 additions, 6 deletions)
@@ -1,9 +1,10 @@
import yaml
import typer

from pathlib import Path
from pydantic import BaseModel
from typing import List, Optional

import typer
import yaml
from pydantic import BaseModel

from dvuploader import DVUploader, File
from dvuploader.utils import add_directory

@@ -29,6 +30,7 @@ class CliInput(BaseModel):

app = typer.Typer()


def _enumerate_filepaths(filepaths: List[str], recurse: bool) -> List[File]:
"""
Take a list of filepaths and transform it into a list of File objects, optionally recursing into each of them.
@@ -39,7 +41,7 @@ def _enumerate_filepaths(filepaths: List[str], recurse: bool) -> List[File]:

Returns:
List[File]: A list of File objects representing the files extracted from all filepaths.

Raises:
FileNotFoundError: If a filepath does not exist.
IsADirectoryError: If recurse is False and a filepath points to a directory instead of a file.
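The body of `_enumerate_filepaths` is collapsed in this diff. As a rough sketch of the behavior its docstring describes — assuming `add_directory(directory)` from `dvuploader.utils` returns a `List[File]` for a directory tree — the function might look like this:

```python
from pathlib import Path
from typing import List

from dvuploader import File
from dvuploader.utils import add_directory


def _enumerate_filepaths(filepaths: List[str], recurse: bool) -> List[File]:
    files: List[File] = []

    for filepath in filepaths:
        path = Path(filepath)

        if not path.exists():
            raise FileNotFoundError(f"Filepath '{filepath}' does not exist.")

        if path.is_dir():
            if not recurse:
                raise IsADirectoryError(
                    f"'{filepath}' is a directory, but recursion is disabled."
                )
            # Assumed helper: walks the directory and collects File objects
            files += add_directory(str(path))
        else:
            files.append(File(filepath=str(path)))

    return files
```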
@@ -183,6 +185,9 @@ def main(
if filepaths is None:
filepaths = []

if recurse is None:
recurse = False

_validate_inputs(
filepaths=filepaths,
pid=pid,
@@ -200,7 +205,10 @@
api_token=api_token,
dataverse_url=dataverse_url,
persistent_id=pid,
files=_enumerate_filepaths(filepaths=filepaths, recurse=recurse),
files=_enumerate_filepaths(
filepaths=filepaths,
recurse=recurse,
),
)

uploader = DVUploader(files=cli_input.files)
dvuploader/config.py (4 additions, 0 deletions)
@@ -2,6 +2,8 @@


def config(
lock_wait_time: int = 10,
lock_timeout: int = 300,
max_retries: int = 15,
max_retry_time: int = 240,
min_retry_time: int = 1,
@@ -54,3 +56,5 @@
os.environ["DVUPLOADER_MIN_RETRY_TIME"] = str(min_retry_time)
os.environ["DVUPLOADER_RETRY_MULTIPLIER"] = str(retry_multiplier)
os.environ["DVUPLOADER_MAX_PKG_SIZE"] = str(max_package_size)
os.environ["DVUPLOADER_LOCK_WAIT_TIME"] = str(lock_wait_time)
os.environ["DVUPLOADER_LOCK_TIMEOUT"] = str(lock_timeout)
dvuploader/directupload.py (22 additions, 6 deletions)
@@ -1,21 +1,27 @@
import asyncio
import httpx
from io import BytesIO
import json
import os
from typing import Dict, List, Optional, Tuple
from io import BytesIO
from typing import AsyncGenerator, Dict, List, Optional, Tuple
from urllib.parse import urljoin

import aiofiles
from typing import AsyncGenerator
import httpx
from rich.progress import Progress, TaskID

from dvuploader.file import File
from dvuploader.utils import build_url
from dvuploader.utils import build_url, init_logging, wait_for_dataset_unlock

TESTING = bool(os.environ.get("DVUPLOADER_TESTING", False))
MAX_FILE_DISPLAY = int(os.environ.get("DVUPLOADER_MAX_FILE_DISPLAY", 50))
MAX_RETRIES = int(os.environ.get("DVUPLOADER_MAX_RETRIES", 10))

LOCK_WAIT_TIME = int(os.environ.get("DVUPLOADER_LOCK_WAIT_TIME", 10))
LOCK_TIMEOUT = int(os.environ.get("DVUPLOADER_LOCK_TIMEOUT", 300))

assert isinstance(LOCK_WAIT_TIME, int), "DVUPLOADER_LOCK_WAIT_TIME must be an integer"
assert isinstance(LOCK_TIMEOUT, int), "DVUPLOADER_LOCK_TIMEOUT must be an integer"

assert isinstance(MAX_FILE_DISPLAY, int), (
"DVUPLOADER_MAX_FILE_DISPLAY must be an integer"
)
@@ -27,6 +33,9 @@
UPLOAD_ENDPOINT = "/api/datasets/:persistentId/addFiles?persistentId="
REPLACE_ENDPOINT = "/api/datasets/:persistentId/replaceFiles?persistentId="

# Initialize logging
init_logging()


async def direct_upload(
files: List[File],
@@ -250,7 +259,7 @@ async def _upload_singlepart(
"headers": headers,
"url": ticket["url"],
"content": upload_bytes(
file=file.handler, # type: ignore
file=file.get_handler(), # type: ignore
progress=progress,
pbar=pbar,
hash_func=file.checksum._hash_fun,
@@ -549,6 +558,13 @@ async def _add_files_to_ds(
pbar: Progress bar for registration.
"""

await wait_for_dataset_unlock(
session=session,
persistent_id=pid,
sleep_time=LOCK_WAIT_TIME,
timeout=LOCK_TIMEOUT,
)

novel_url = urljoin(dataverse_url, UPLOAD_ENDPOINT + pid)
replace_url = urljoin(dataverse_url, REPLACE_ENDPOINT + pid)

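`wait_for_dataset_unlock` itself lives in `dvuploader.utils` and is not shown in this diff. A minimal sketch of what such a helper could look like — assuming Dataverse's `GET /api/datasets/:persistentId/locks` endpoint and an `httpx.AsyncClient` whose base URL and API token are already configured:

```python
import asyncio

import httpx

LOCKS_ENDPOINT = "/api/datasets/:persistentId/locks?persistentId="


async def wait_for_dataset_unlock(
    session: httpx.AsyncClient,
    persistent_id: str,
    sleep_time: int,
    timeout: int,
) -> None:
    """Polls the locks endpoint until the dataset is unlocked or timeout is reached."""
    elapsed = 0

    while elapsed < timeout:
        response = await session.get(LOCKS_ENDPOINT + persistent_id)
        response.raise_for_status()

        # An empty "data" list means no locks are currently held on the dataset
        if not response.json().get("data"):
            return

        await asyncio.sleep(sleep_time)
        elapsed += sleep_time

    raise TimeoutError(
        f"Dataset {persistent_id} is still locked after {timeout} seconds."
    )
```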
dvuploader/dvuploader.py (14 additions, 6 deletions)
@@ -1,15 +1,15 @@
import asyncio
from urllib.parse import urljoin
import httpx
import os
import rich
from typing import Dict, List, Optional
from urllib.parse import urljoin

import httpx
import rich
from pydantic import BaseModel
from rich.progress import Progress
from rich.table import Table
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress
from rich.table import Table

from dvuploader.directupload import (
TICKET_ENDPOINT,
@@ -239,7 +239,13 @@ def _check_duplicates(
to_skip.append(file.file_id)

if replace_existing:
assert file.file_id is not None, "File ID is required"
assert isinstance(file.file_id, int), "File ID must be an integer"

ds_file = self._get_dsfile_by_id(file.file_id, ds_files)

assert ds_file is not None, "Dataset file not found"

if not self._check_size(file, ds_file):
file._unchanged_data = False
else:
@@ -359,10 +365,12 @@ def _check_hashes(file: File, dsFile: Dict):
dsFile.get("directoryLabel", ""), dsFile["dataFile"]["filename"]
)

directory_label = file.directory_label if file.directory_label else ""

return (
file.checksum.value == hash_value
and file.checksum.type == hash_algo
and path == os.path.join(file.directory_label, file.file_name) # type: ignore
and path == os.path.join(directory_label, file.file_name) # type: ignore
)

@staticmethod
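The `directory_label` fallback above matters because `File.directory_label` is optional: `os.path.join` raises on `None` components, which is what the original comparison hit for files at the dataset root. A quick illustration:

```python
import os

# With the fallback, files at the dataset root join cleanly:
assert os.path.join("", "data.csv") == "data.csv"

# Without it, a None directory label breaks the path comparison:
try:
    os.path.join(None, "data.csv")  # type: ignore[arg-type]
except TypeError as err:
    print(err)  # e.g. "expected str, bytes or os.PathLike object, not NoneType"
```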
dvuploader/file.py (19 additions, 5 deletions)
@@ -130,7 +130,6 @@ def extract_file_name(self):

if self.handler is None:
self._validate_filepath(self.filepath)
self.handler = open(self.filepath, "rb")
self._size = os.path.getsize(self.filepath)
else:
self._size = len(self.handler.read())
Expand All @@ -147,6 +146,15 @@ def extract_file_name(self):

return self

def get_handler(self) -> IO:
"""
Returns the user-supplied handler if present; otherwise opens the file at `filepath`.
"""
if self.handler is not None:
return self.handler

return open(self.filepath, "rb")

@staticmethod
def _validate_filepath(path):
"""
@@ -190,21 +198,27 @@ def update_checksum_chunked(self, blocksize=2**20):
Note:
This method resets the file position to the start after reading.
"""
assert self.handler is not None, "File handler is not initialized."
assert self.checksum is not None, "Checksum is not initialized."
assert self.checksum._hash_fun is not None, "Checksum hash function is not set."

handler = self.get_handler()

while True:
buf = self.handler.read(blocksize)
buf = handler.read(blocksize)

if not isinstance(buf, bytes):
buf = buf.encode()

if not buf:
break
self.checksum._hash_fun.update(buf)

self.handler.seek(0)

if self.handler is not None: # type: ignore
# A user-supplied handler stays open, so seek it back to the start after reading.
self.handler.seek(0)
else:
# Path-based handlers are opened just-in-time, so this one can be closed.
handler.close()

def __del__(self):
if self.handler is not None: