diff --git a/solvebio/resource/object.py b/solvebio/resource/object.py index 3b75f0b8..2d5f7806 100644 --- a/solvebio/resource/object.py +++ b/solvebio/resource/object.py @@ -422,12 +422,13 @@ def create_shortcut(self, shortcut_full_path, **kwargs): return shortcut + @classmethod def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs): from solvebio import Vault from solvebio import Object - _client = kwargs.pop('client', None) or cls._client or client + _client = kwargs.pop("client", None) or cls._client or client local_path = os.path.expanduser(local_path) @@ -442,79 +443,88 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs): mimetype = mime_tuple[1] if mime_tuple[1] else mime_tuple[0] # Get file size size = os.path.getsize(local_path) - if size == 0: - print('WARNING: skipping empty object: {}'.format(local_path)) - return False # Check if object exists already and compare md5sums full_path, path_dict = Object.validate_full_path( - os.path.join('{}:{}'.format(vault.full_path, remote_path), - os.path.basename(local_path)), client=_client) + os.path.join( + "{}:{}".format(vault.full_path, remote_path), + os.path.basename(local_path), + ), + client=_client, + ) try: obj = cls.get_by_full_path(full_path, client=_client) if not obj.is_file: - print('WARNING: A {} currently exists at {}' - .format(obj.object_type, full_path)) + print( + "WARNING: A {} currently exists at {}".format( + obj.object_type, full_path + ) + ) else: # Check against md5sum of remote file if obj.md5 == local_md5: - print('WARNING: File {} (md5sum {}) already exists, ' - 'not uploading'.format(full_path, local_md5)) + print( + "WARNING: File {} (md5sum {}) already exists, " + "not uploading".format(full_path, local_md5) + ) return obj else: - if kwargs.get('archive_folder'): - obj._archive(kwargs['archive_folder']) - else: - print('WARNING: File {} exists on SolveBio with different ' - 'md5sum (local: {} vs remote: {}) Uploading anyway, ' - 'but not overwriting.' - .format(full_path, local_md5, obj.md5)) + print( + "WARNING: File {} exists on SolveBio with different " + "md5sum (local: {} vs remote: {}) Uploading anyway, " + "but not overwriting.".format(full_path, local_md5, obj.md5) + ) except NotFoundError: - obj = None pass # Lookup parent object - if kwargs.get('follow_shortcuts') and obj and obj.is_file: - vault_id = obj.vault_id - parent_object_id = obj.parent_object_id - filename = obj.filename + if path_dict["parent_path"] == "/": + parent_object_id = None else: - vault_id = vault.id - filename = os.path.basename(local_path) - if path_dict['parent_path'] == '/': - parent_object_id = None - else: - parent_obj = Object.get_by_full_path( - path_dict['parent_full_path'], assert_type='folder', - client=_client - ) - parent_object_id = parent_obj.id + parent_obj = Object.get_by_full_path( + path_dict["parent_full_path"], assert_type="folder", client=_client + ) + parent_object_id = parent_obj.id - description = kwargs.get('description') + description = kwargs.get("description") # Create the file, and upload it to the Upload URL obj = Object.create( - vault_id=vault_id, + vault_id=vault.id, parent_object_id=parent_object_id, - object_type='file', - filename=filename, + object_type="file", + filename=os.path.basename(local_path), md5=local_md5, mimetype=mimetype, size=size, description=description, - tags=kwargs.get('tags', []) or [], - client=_client + tags=kwargs.get("tags", []) or [], + client=_client, ) - print('Notice: File created for {0} at {1}'.format(local_path, - obj.path)) - print('Notice: Upload initialized') + print( + "Notice: File created for {0} at {1}".format(local_path, obj.path) + ) + print("Notice: Upload initialized") + + # Check if multipart upload is needed + if hasattr(obj, "is_multipart") and obj.is_multipart: + return cls._upload_multipart(obj, local_path, local_md5, **kwargs) + else: + return cls._upload_single_file( + obj, local_path, local_md5, mimetype, size, **kwargs + ) + + @classmethod + def _upload_single_file( + cls, obj, local_path, local_md5, mimetype, size, **kwargs + ): upload_url = obj.upload_url headers = { - 'Content-MD5': base64.b64encode(binascii.unhexlify(local_md5)), - 'Content-Type': mimetype, - 'Content-Length': str(size), + "Content-MD5": base64.b64encode(binascii.unhexlify(local_md5)), + "Content-Type": mimetype, + "Content-Length": str(size), } # Use a session with a retry policy to handle connection errors. @@ -524,45 +534,137 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs): total=max_retries, read=max_retries, connect=max_retries, - backoff_factor=2, - status_forcelist=(500, 502, 503, 504, 400), - allowed_methods=["HEAD", "OPTIONS", "GET", "PUT", "POST"] + backoff_factor=0.3, + status_forcelist=(500, 502, 504, 400), ) session.mount( - 'https://', requests.adapters.HTTPAdapter(max_retries=retry)) - - # Handle retries when upload fails due to an exception such as SSLError - n_retries = 0 - while True: - try: - upload_resp = session.put(upload_url, - data=open(local_path, 'rb'), - headers=headers) - except Exception as e: - if n_retries == max_retries: - obj.delete(force=True) - raise FileUploadError(str(e)) - - n_retries += 1 - print('WARNING: Retrying ({}/{}) failed upload for {}: {}'.format( - n_retries, max_retries, local_path, e)) - time.sleep(2 * n_retries) - else: - break + "https://", requests.adapters.HTTPAdapter(max_retries=retry) + ) + upload_resp = session.put( + upload_url, data=open(local_path, "rb"), headers=headers + ) if upload_resp.status_code != 200: - print('WARNING: Upload status code for {0} was {1}'.format( - local_path, upload_resp.status_code - )) + print( + "WARNING: Upload status code for {0} was {1}".format( + local_path, upload_resp.status_code + ) + ) # Clean up the failed upload obj.delete(force=True) raise FileUploadError(upload_resp.content) else: - print('Notice: Successfully uploaded {0} to {1}'.format(local_path, - obj.path)) + print( + "Notice: Successfully uploaded {0} to {1}".format( + local_path, obj.path + ) + ) return obj + @classmethod + def _upload_multipart(cls, obj, local_path, local_md5, **kwargs): + """Handle multipart upload for larger files""" + _client = kwargs.get("client") or cls._client or client + print(f"Notice: Upload ID {obj.upload_id}") + try: + # Get presigned URLs from the object + presigned_urls = obj.presigned_urls + + print( + "Notice: Starting multipart upload with {} parts...".format( + len(presigned_urls) + ) + ) + + # Step 2: Upload each part using presigned URLs + parts = [] + with open(local_path, "rb") as f: + for part_info in presigned_urls: + part_number = part_info.part_number + start_byte = part_info.start_byte + end_byte = part_info.end_byte + part_size = part_info.size + upload_url = part_info.upload_url + + print( + "Notice: Uploading part {}/{}... (bytes {}-{})".format( + part_number, len(presigned_urls), start_byte, end_byte + ) + ) + + # Seek to start position and read the exact part size + f.seek(start_byte) + chunk_data = f.read(part_size) + if not chunk_data: + break + + # Upload part with retry logic + session = requests.Session() + retry = Retry( + total=3, + backoff_factor=2, + status_forcelist=(500, 502, 503, 504), + allowed_methods=["PUT"], + ) + session.mount( + "https://", requests.adapters.HTTPAdapter(max_retries=retry) + ) + + headers = { + "Content-Length": str(len(chunk_data)), + } + + upload_resp = session.put( + upload_url, data=chunk_data, headers=headers + ) + + if upload_resp.status_code != 200: + raise FileUploadError( + "Failed to upload part {}: {}".format( + part_number, upload_resp.content + ) + ) + + # Get ETag from response + etag = upload_resp.headers.get("ETag", "").strip('"') + parts.append({"part_number": part_number, "etag": etag}) + + # Step 3: Complete multipart upload + print("Notice: Completing multipart upload....") + complete_data = { + "upload_id": obj.upload_id, + "physical_object_id": obj.upload_key, + "parts": parts, + } + + print(f"Notice: {complete_data}") + + complete_resp = _client.post("/v2/complete_multi_part", complete_data) + + if "message" in complete_resp: + print( + "Notice: Successfully uploaded {0} to {1} with multipart upload.".format( + local_path, obj.path + ) + ) + return obj + else: + raise Exception(complete_resp) + + except Exception as e: + # Clean up failed upload - best effort cleanup + try: + _client.delete( + obj.instance_url() + "/multipart-upload", + {}, + ) + except Exception: + pass # Best effort cleanup + + obj.delete(force=True) + raise FileUploadError("Multipart upload failed: {}".format(str(e))) + def _object_list_helper(self, **params): """Helper method to get objects within"""