Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 174 additions & 72 deletions solvebio/resource/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,13 @@ def create_shortcut(self, shortcut_full_path, **kwargs):

return shortcut


@classmethod
def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
from solvebio import Vault
from solvebio import Object

_client = kwargs.pop('client', None) or cls._client or client
_client = kwargs.pop("client", None) or cls._client or client

local_path = os.path.expanduser(local_path)

Expand All @@ -442,79 +443,88 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
mimetype = mime_tuple[1] if mime_tuple[1] else mime_tuple[0]
# Get file size
size = os.path.getsize(local_path)
if size == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove this part? We should skip the empty files.

print('WARNING: skipping empty object: {}'.format(local_path))
return False

# Check if object exists already and compare md5sums
full_path, path_dict = Object.validate_full_path(
os.path.join('{}:{}'.format(vault.full_path, remote_path),
os.path.basename(local_path)), client=_client)
os.path.join(
"{}:{}".format(vault.full_path, remote_path),
os.path.basename(local_path),
),
client=_client,
)
try:
obj = cls.get_by_full_path(full_path, client=_client)
if not obj.is_file:
print('WARNING: A {} currently exists at {}'
.format(obj.object_type, full_path))
print(
"WARNING: A {} currently exists at {}".format(
obj.object_type, full_path
)
)
else:
# Check against md5sum of remote file
if obj.md5 == local_md5:
print('WARNING: File {} (md5sum {}) already exists, '
'not uploading'.format(full_path, local_md5))
print(
"WARNING: File {} (md5sum {}) already exists, "
"not uploading".format(full_path, local_md5)
)
return obj
else:
if kwargs.get('archive_folder'):
obj._archive(kwargs['archive_folder'])
else:
print('WARNING: File {} exists on SolveBio with different '
'md5sum (local: {} vs remote: {}) Uploading anyway, '
'but not overwriting.'
.format(full_path, local_md5, obj.md5))
print(
"WARNING: File {} exists on SolveBio with different "
"md5sum (local: {} vs remote: {}) Uploading anyway, "
"but not overwriting.".format(full_path, local_md5, obj.md5)
)
except NotFoundError:
obj = None
pass

# Lookup parent object
if kwargs.get('follow_shortcuts') and obj and obj.is_file:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this logic for the shortcuts resolution, so this shouldn't be removed

vault_id = obj.vault_id
parent_object_id = obj.parent_object_id
filename = obj.filename
if path_dict["parent_path"] == "/":
parent_object_id = None
else:
vault_id = vault.id
filename = os.path.basename(local_path)
if path_dict['parent_path'] == '/':
parent_object_id = None
else:
parent_obj = Object.get_by_full_path(
path_dict['parent_full_path'], assert_type='folder',
client=_client
)
parent_object_id = parent_obj.id
parent_obj = Object.get_by_full_path(
path_dict["parent_full_path"], assert_type="folder", client=_client
)
parent_object_id = parent_obj.id

description = kwargs.get('description')
description = kwargs.get("description")

# Create the file, and upload it to the Upload URL
obj = Object.create(
vault_id=vault_id,
vault_id=vault.id,
parent_object_id=parent_object_id,
object_type='file',
filename=filename,
object_type="file",
filename=os.path.basename(local_path),
md5=local_md5,
mimetype=mimetype,
size=size,
description=description,
tags=kwargs.get('tags', []) or [],
client=_client
tags=kwargs.get("tags", []) or [],
client=_client,
)
print('Notice: File created for {0} at {1}'.format(local_path,
obj.path))
print('Notice: Upload initialized')

print(
"Notice: File created for {0} at {1}".format(local_path, obj.path)
)
print("Notice: Upload initialized")

# Check if multipart upload is needed
if hasattr(obj, "is_multipart") and obj.is_multipart:
return cls._upload_multipart(obj, local_path, local_md5, **kwargs)
else:
return cls._upload_single_file(
obj, local_path, local_md5, mimetype, size, **kwargs
)

@classmethod
def _upload_single_file(
cls, obj, local_path, local_md5, mimetype, size, **kwargs
):
upload_url = obj.upload_url

headers = {
'Content-MD5': base64.b64encode(binascii.unhexlify(local_md5)),
'Content-Type': mimetype,
'Content-Length': str(size),
"Content-MD5": base64.b64encode(binascii.unhexlify(local_md5)),
"Content-Type": mimetype,
"Content-Length": str(size),
}

# Use a session with a retry policy to handle connection errors.
Expand All @@ -524,45 +534,137 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
total=max_retries,
read=max_retries,
connect=max_retries,
backoff_factor=2,
status_forcelist=(500, 502, 503, 504, 400),
allowed_methods=["HEAD", "OPTIONS", "GET", "PUT", "POST"]
backoff_factor=0.3,
status_forcelist=(500, 502, 504, 400),
)
session.mount(
'https://', requests.adapters.HTTPAdapter(max_retries=retry))

# Handle retries when upload fails due to an exception such as SSLError
n_retries = 0
while True:
try:
upload_resp = session.put(upload_url,
data=open(local_path, 'rb'),
headers=headers)
except Exception as e:
if n_retries == max_retries:
obj.delete(force=True)
raise FileUploadError(str(e))

n_retries += 1
print('WARNING: Retrying ({}/{}) failed upload for {}: {}'.format(
n_retries, max_retries, local_path, e))
time.sleep(2 * n_retries)
else:
break
"https://", requests.adapters.HTTPAdapter(max_retries=retry)
)
upload_resp = session.put(
upload_url, data=open(local_path, "rb"), headers=headers
)

if upload_resp.status_code != 200:
print('WARNING: Upload status code for {0} was {1}'.format(
local_path, upload_resp.status_code
))
print(
"WARNING: Upload status code for {0} was {1}".format(
local_path, upload_resp.status_code
)
)
# Clean up the failed upload
obj.delete(force=True)
raise FileUploadError(upload_resp.content)
else:
print('Notice: Successfully uploaded {0} to {1}'.format(local_path,
obj.path))
print(
"Notice: Successfully uploaded {0} to {1}".format(
local_path, obj.path
)
)

return obj

@classmethod
def _upload_multipart(cls, obj, local_path, local_md5, **kwargs):
"""Handle multipart upload for larger files"""
_client = kwargs.get("client") or cls._client or client
print(f"Notice: Upload ID {obj.upload_id}")
try:
# Get presigned URLs from the object
presigned_urls = obj.presigned_urls

print(
"Notice: Starting multipart upload with {} parts...".format(
len(presigned_urls)
)
)

# Step 2: Upload each part using presigned URLs
parts = []
with open(local_path, "rb") as f:
for part_info in presigned_urls:
part_number = part_info.part_number
start_byte = part_info.start_byte
end_byte = part_info.end_byte
part_size = part_info.size
upload_url = part_info.upload_url

print(
"Notice: Uploading part {}/{}... (bytes {}-{})".format(
part_number, len(presigned_urls), start_byte, end_byte
)
)

# Seek to start position and read the exact part size
f.seek(start_byte)
chunk_data = f.read(part_size)
if not chunk_data:
break

# Upload part with retry logic
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=2,
status_forcelist=(500, 502, 503, 504),
allowed_methods=["PUT"],
)
session.mount(
"https://", requests.adapters.HTTPAdapter(max_retries=retry)
)

headers = {
"Content-Length": str(len(chunk_data)),
}

upload_resp = session.put(
upload_url, data=chunk_data, headers=headers
)

if upload_resp.status_code != 200:
raise FileUploadError(
"Failed to upload part {}: {}".format(
part_number, upload_resp.content
)
)

# Get ETag from response
etag = upload_resp.headers.get("ETag", "").strip('"')
parts.append({"part_number": part_number, "etag": etag})

# Step 3: Complete multipart upload
print("Notice: Completing multipart upload....")
complete_data = {
"upload_id": obj.upload_id,
"physical_object_id": obj.upload_key,
"parts": parts,
}

print(f"Notice: {complete_data}")

complete_resp = _client.post("/v2/complete_multi_part", complete_data)

if "message" in complete_resp:
print(
"Notice: Successfully uploaded {0} to {1} with multipart upload.".format(
local_path, obj.path
)
)
return obj
else:
raise Exception(complete_resp)

except Exception as e:
# Clean up failed upload - best effort cleanup
try:
_client.delete(
obj.instance_url() + "/multipart-upload",
{},
)
except Exception:
pass # Best effort cleanup

obj.delete(force=True)
raise FileUploadError("Multipart upload failed: {}".format(str(e)))

def _object_list_helper(self, **params):
"""Helper method to get objects within"""

Expand Down
Loading