3434
3535
3636class LifecycleMixin (BaseHandler ):
37-
3837 async def _recover_upload_state (
3938 self , client : S3Client , bucket : str , key : str , upload_id : str , context : str = ""
4039 ) -> MultipartUploadState :
4140 from s3proxy .state import reconstruct_upload_state_from_s3
4241
4342 logger .warning (
4443 "RECOVER_STATE_FROM_S3" ,
45- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." , context = context ,
44+ bucket = bucket ,
45+ key = key ,
46+ upload_id = upload_id [:20 ] + "..." ,
47+ context = context ,
4648 )
4749
4850 state = await reconstruct_upload_state_from_s3 (
@@ -54,7 +56,9 @@ async def _recover_upload_state(
5456 await self .multipart_manager .store_reconstructed_state (bucket , key , upload_id , state )
5557 logger .info (
5658 "RECOVER_STATE_SUCCESS" ,
57- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
59+ bucket = bucket ,
60+ key = key ,
61+ upload_id = upload_id [:20 ] + "..." ,
5862 parts_recovered = len (state .parts ),
5963 )
6064 return state
@@ -83,7 +87,8 @@ async def handle_create_multipart_upload(
8387 upload_metadata [hdr [11 :]] = val
8488
8589 resp = await client .create_multipart_upload (
86- bucket , key ,
90+ bucket ,
91+ key ,
8792 content_type = content_type ,
8893 metadata = upload_metadata ,
8994 tagging = tagging ,
@@ -104,19 +109,25 @@ async def handle_create_multipart_upload(
104109 if attempt == 0 :
105110 logger .warning (
106111 "PERSIST_STATE_RETRY" ,
107- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
112+ bucket = bucket ,
113+ key = key ,
114+ upload_id = upload_id [:20 ] + "..." ,
108115 error = str (e ),
109116 )
110117 else :
111118 logger .error (
112119 "PERSIST_STATE_FAILED" ,
113- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
120+ bucket = bucket ,
121+ key = key ,
122+ upload_id = upload_id [:20 ] + "..." ,
114123 error = str (e ),
115124 )
116125
117126 logger .info (
118127 "CREATE_MULTIPART_COMPLETE" ,
119- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
128+ bucket = bucket ,
129+ key = key ,
130+ upload_id = upload_id [:20 ] + "..." ,
120131 )
121132
122133 return Response (
@@ -146,7 +157,9 @@ async def handle_complete_multipart_upload(
146157
147158 logger .info (
148159 "COMPLETE_MULTIPART" ,
149- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
160+ bucket = bucket ,
161+ key = key ,
162+ upload_id = upload_id [:20 ] + "..." ,
150163 client_parts = len (completed_parts ),
151164 s3_parts = len (s3_parts ),
152165 total_mb = f"{ total_plaintext / 1024 / 1024 :.2f} MB" ,
@@ -166,7 +179,9 @@ async def handle_complete_multipart_upload(
166179 # lose the DEK, making the object permanently undecryptable.
167180 wrapped_dek = crypto .wrap_key (state .dek , self .settings .kek )
168181 await save_multipart_metadata (
169- client , bucket , key ,
182+ client ,
183+ bucket ,
184+ key ,
170185 MultipartMetadata (
171186 version = 1 ,
172187 part_count = len (completed_parts ),
@@ -179,7 +194,9 @@ async def handle_complete_multipart_upload(
179194
180195 logger .info (
181196 "COMPLETE_MULTIPART_SUCCESS" ,
182- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
197+ bucket = bucket ,
198+ key = key ,
199+ upload_id = upload_id [:20 ] + "..." ,
183200 total_parts = len (completed_parts ),
184201 total_mb = f"{ total_plaintext / 1024 / 1024 :.2f} MB" ,
185202 )
@@ -214,7 +231,9 @@ def internal_to_client_part(internal_part_number: int) -> int:
214231 # Upload exists but DEK is missing - internal state corruption
215232 logger .error (
216233 "RECOVER_DEK_MISSING" ,
217- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
234+ bucket = bucket ,
235+ key = key ,
236+ upload_id = upload_id [:20 ] + "..." ,
218237 message = "Upload exists in S3 but DEK state is missing" ,
219238 )
220239 except Exception :
@@ -236,7 +255,9 @@ def internal_to_client_part(internal_part_number: int) -> int:
236255
237256 logger .debug (
238257 "RECOVER_STATE_GROUPING" ,
239- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
258+ bucket = bucket ,
259+ key = key ,
260+ upload_id = upload_id [:20 ] + "..." ,
240261 s3_parts = len (parts_resp .get ("Parts" , [])),
241262 client_parts = sorted (client_parts .keys ()),
242263 )
@@ -255,22 +276,29 @@ def internal_to_client_part(internal_part_number: int) -> int:
255276 plaintext_size = crypto .plaintext_size (ciphertext_size )
256277 etag = s3_part .get ("ETag" , "" ).strip ('"' )
257278
258- internal_parts_meta .append (InternalPartMetadata (
259- internal_part_number = internal_num ,
260- plaintext_size = plaintext_size ,
261- ciphertext_size = ciphertext_size ,
262- etag = etag ,
263- ))
279+ internal_parts_meta .append (
280+ InternalPartMetadata (
281+ internal_part_number = internal_num ,
282+ plaintext_size = plaintext_size ,
283+ ciphertext_size = ciphertext_size ,
284+ etag = etag ,
285+ )
286+ )
264287 part_plaintext_size += plaintext_size
265288 part_ciphertext_size += ciphertext_size
266289
267290 first_etag = internal_s3_parts [0 ].get ("ETag" , "" ).strip ('"' )
268291
269292 await self .multipart_manager .add_part (
270- bucket , key , upload_id ,
293+ bucket ,
294+ key ,
295+ upload_id ,
271296 PartMetadata (
272- client_part_num , part_plaintext_size , part_ciphertext_size ,
273- first_etag , "" ,
297+ client_part_num ,
298+ part_plaintext_size ,
299+ part_ciphertext_size ,
300+ first_etag ,
301+ "" ,
274302 internal_parts = internal_parts_meta ,
275303 ),
276304 )
@@ -279,7 +307,9 @@ def internal_to_client_part(internal_part_number: int) -> int:
279307 except Exception as e :
280308 logger .error (
281309 "RECOVER_STATE_FOR_COMPLETE_FAILED" ,
282- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
310+ bucket = bucket ,
311+ key = key ,
312+ upload_id = upload_id [:20 ] + "..." ,
283313 error = str (e ),
284314 )
285315 return state
@@ -320,22 +350,24 @@ def _build_s3_parts(
320350 )
321351 for ip in sorted_internal :
322352 etag = f'"{ ip .etag } "' if not ip .etag .startswith ('"' ) else ip .etag
323- s3_parts .append ({
324- "PartNumber" : ip .internal_part_number ,
325- "ETag" : etag ,
326- })
353+ s3_parts .append (
354+ {
355+ "PartNumber" : ip .internal_part_number ,
356+ "ETag" : etag ,
357+ }
358+ )
327359 else :
328- s3_parts .append ({
329- "PartNumber" : client_part_num ,
330- "ETag" : cp ["ETag" ],
331- })
360+ s3_parts .append (
361+ {
362+ "PartNumber" : client_part_num ,
363+ "ETag" : cp ["ETag" ],
364+ }
365+ )
332366 else :
333367 missing_parts .append (client_part_num )
334368
335369 if missing_parts :
336- raise S3Error .invalid_part (
337- f"Parts { missing_parts } were never uploaded"
338- )
370+ raise S3Error .invalid_part (f"Parts { missing_parts } were never uploaded" )
339371 if not s3_parts :
340372 raise S3Error .invalid_part ("No valid parts found" )
341373
@@ -357,8 +389,11 @@ async def _handle_complete_error(
357389 if error_code == "EntityTooSmall" :
358390 logger .warning (
359391 "ENTITY_TOO_SMALL" ,
360- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
361- parts = len (s3_parts ), total_plaintext = total_plaintext ,
392+ bucket = bucket ,
393+ key = key ,
394+ upload_id = upload_id [:20 ] + "..." ,
395+ parts = len (s3_parts ),
396+ total_plaintext = total_plaintext ,
362397 )
363398 with contextlib .suppress (Exception ):
364399 await client .abort_multipart_upload (bucket , key , upload_id )
@@ -380,7 +415,9 @@ async def handle_abort_multipart_upload(
380415
381416 logger .info (
382417 "ABORT_MULTIPART" ,
383- bucket = bucket , key = key , upload_id = upload_id [:20 ] + "..." ,
418+ bucket = bucket ,
419+ key = key ,
420+ upload_id = upload_id [:20 ] + "..." ,
384421 )
385422
386423 await asyncio .gather (
0 commit comments