Skip to content

Commit 3cef645

Browse files
damnjanovictanjaTatjana Damnjanović
andauthored
[EDPOPS-144] Add multipart upload support (#498)
* Set py 3.8 as required version * [EDPOPS-144] Add multipart upload enhancements * linting * linting * linting --------- Co-authored-by: Tatjana Damnjanović <tatjana@solvebio.com>
1 parent d9f0808 commit 3cef645

1 file changed

Lines changed: 130 additions & 2 deletions

File tree

solvebio/resource/object.py

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,12 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
434434
# Get vault
435435
vault = Vault.get_by_full_path(vault_full_path, client=_client)
436436

437-
# Get MD5
438-
local_md5, _ = md5sum(local_path, multipart_threshold=None)
437+
# Get MD5 and check if multipart upload is needed
438+
multipart_threshold = kwargs.get(
439+
"multipart_threshold", 64 * 1024 * 1024
440+
) # 64MB default
441+
local_md5, _ = md5sum(local_path, multipart_threshold=multipart_threshold)
442+
439443
# Get a mimetype of file
440444
mime_tuple = mimetypes.guess_type(local_path)
441445
# If a file is compressed get a compression type, otherwise a file type
@@ -509,6 +513,27 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
509513
obj.path))
510514
print('Notice: Upload initialized')
511515

516+
# Check if multipart upload is needed
517+
if hasattr(obj, "is_multipart") and obj.is_multipart:
518+
return cls._upload_multipart(obj, local_path, local_md5, **kwargs)
519+
else:
520+
return cls._upload_single_file(obj, local_path, **kwargs)
521+
522+
@classmethod
523+
def _upload_single_file(cls, obj, local_path, **kwargs):
524+
"""Handle single-part upload for smaller files"""
525+
import mimetypes
526+
527+
# Get a mimetype of file
528+
mime_tuple = mimetypes.guess_type(local_path)
529+
# If a file is compressed get a compression type, otherwise a file type
530+
mimetype = mime_tuple[1] if mime_tuple[1] else mime_tuple[0]
531+
# Get file size
532+
size = os.path.getsize(local_path)
533+
534+
# Get MD5 for single part upload
535+
local_md5, _ = md5sum(local_path, multipart_threshold=None)
536+
512537
upload_url = obj.upload_url
513538

514539
headers = {
@@ -563,6 +588,109 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
563588

564589
return obj
565590

591+
@classmethod
592+
def _upload_multipart(cls, obj, local_path, local_md5, **kwargs):
593+
"""Handle multipart upload for larger files"""
594+
_client = kwargs.get("client") or cls._client or client
595+
print(f"Notice: Upload ID {obj.upload_id}")
596+
try:
597+
# Get presigned URLs from the object
598+
presigned_urls = obj.presigned_urls
599+
600+
print(
601+
"Notice: Starting multipart upload with {} parts...".format(
602+
len(presigned_urls)
603+
)
604+
)
605+
606+
# Step 2: Upload each part using presigned URLs
607+
parts = []
608+
with open(local_path, "rb") as f:
609+
for part_info in presigned_urls:
610+
part_number = part_info.part_number
611+
start_byte = part_info.start_byte
612+
end_byte = part_info.end_byte
613+
part_size = part_info.size
614+
upload_url = part_info.upload_url
615+
616+
print(
617+
"Notice: Uploading part {}/{}... (bytes {}-{})".format(
618+
part_number, len(presigned_urls), start_byte, end_byte
619+
)
620+
)
621+
622+
# Seek to start position and read the exact part size
623+
f.seek(start_byte)
624+
chunk_data = f.read(part_size)
625+
if not chunk_data:
626+
break
627+
628+
# Upload part with retry logic
629+
session = requests.Session()
630+
retry = Retry(
631+
total=3,
632+
backoff_factor=2,
633+
status_forcelist=(500, 502, 503, 504),
634+
allowed_methods=["PUT"],
635+
)
636+
session.mount(
637+
"https://", requests.adapters.HTTPAdapter(max_retries=retry)
638+
)
639+
640+
headers = {
641+
"Content-Length": str(len(chunk_data)),
642+
}
643+
644+
upload_resp = session.put(
645+
upload_url, data=chunk_data, headers=headers
646+
)
647+
648+
if upload_resp.status_code != 200:
649+
raise FileUploadError(
650+
"Failed to upload part {}: {}".format(
651+
part_number, upload_resp.content
652+
)
653+
)
654+
655+
# Get ETag from response
656+
etag = upload_resp.headers.get("ETag", "").strip('"')
657+
parts.append({"part_number": part_number, "etag": etag})
658+
659+
# Step 3: Complete multipart upload
660+
print("Notice: Completing multipart upload....")
661+
complete_data = {
662+
"upload_id": obj.upload_id,
663+
"physical_object_id": obj.upload_key,
664+
"parts": parts,
665+
}
666+
667+
print(f"Notice: {complete_data}")
668+
669+
complete_resp = _client.post("/v2/complete_multi_part", complete_data)
670+
671+
if "message" in complete_resp:
672+
print(
673+
"Notice: Successfully uploaded {0} to {1} with multipart upload.".format(
674+
local_path, obj.path
675+
)
676+
)
677+
return obj
678+
else:
679+
raise Exception(complete_resp)
680+
681+
except Exception as e:
682+
# Clean up failed upload - best effort cleanup
683+
try:
684+
_client.delete(
685+
obj.instance_url() + "/multipart-upload",
686+
{},
687+
)
688+
except Exception:
689+
pass # Best effort cleanup
690+
691+
obj.delete(force=True)
692+
raise FileUploadError("Multipart upload failed: {}".format(str(e)))
693+
566694
def _object_list_helper(self, **params):
567695
"""Helper method to get objects within"""
568696

0 commit comments

Comments
 (0)