Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions git_remote_s3/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import sys
import logging
import uuid
import boto3
import boto3.exceptions
from botocore.exceptions import (
Expand Down Expand Up @@ -362,12 +363,20 @@ def acquire_lock(self, remote_ref: str) -> Optional[str]:
"""

lock_key = f"{self.prefix}/{remote_ref}/LOCK#.lock"
# Identifies our PUT so a 412 caused by boto3 retrying our own
# successful write can be recognised and claimed. Set
# GIT_REMOTE_S3_OWNER_TOKEN to a stable per-client value to also
# recover locks leaked by a previous invocation of this helper.
token = (
os.environ.get("GIT_REMOTE_S3_OWNER_TOKEN", "").encode()
or uuid.uuid4().hex.encode()
)
try:
# Use conditional write to create the lock only if it does not exist
self.s3.put_object(
Bucket=self.bucket,
Key=lock_key,
Body=b"",
Body=token,
IfNoneMatch="*",
)
return lock_key
Expand All @@ -380,26 +389,35 @@ def acquire_lock(self, remote_ref: str) -> Optional[str]:
"412",
]
):
# Check if the existing lock is stale; if so, try to clear and acquire
# Take the existing lock if it's ours (boto3 retried our own
# successful PUT, or an outer-loop retry of this helper) or stale.
try:
head = self.s3.head_object(Bucket=self.bucket, Key=lock_key)
is_stale = False
last_modified = head.get("LastModified")
if last_modified is not None:
import datetime

now = datetime.datetime.now(tz=last_modified.tzinfo)
age = (now - last_modified).total_seconds()
if age > self.lock_ttl_seconds:
# Attempt to delete stale lock and re-acquire
self.s3.delete_object(Bucket=self.bucket, Key=lock_key)
# Retry conditional put
self.s3.put_object(
Bucket=self.bucket,
Key=lock_key,
Body=b"",
IfNoneMatch="*",
)
return lock_key
is_stale = age > self.lock_ttl_seconds
is_ours = False
try:
obj = self.s3.get_object(Bucket=self.bucket, Key=lock_key)
is_ours = obj["Body"].read() == token
except botocore.exceptions.ClientError:
pass
if is_ours or is_stale:
# Attempt to delete and re-acquire
self.s3.delete_object(Bucket=self.bucket, Key=lock_key)
# Retry conditional put
self.s3.put_object(
Bucket=self.bucket,
Key=lock_key,
Body=token,
IfNoneMatch="*",
)
return lock_key
except botocore.exceptions.ClientError as e:
logger.info(f"failed to check staleness of {lock_key} for {remote_ref}: {e}")
raise e
Expand Down
51 changes: 51 additions & 0 deletions test/remote_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,3 +821,54 @@ def head_object_side_effect(Bucket, Key):
c for c in session_client_mock.return_value.put_object.call_args_list if c.kwargs.get("Key", "").endswith(".lock")
]
assert len(put_lock_calls) >= 2


@patch("boto3.Session.client")
def test_acquire_lock_claims_own_lock_after_boto3_retry_after_success(session_client_mock):
    """Claim our own lock when a retried PutObject comes back as 412.

    When boto3 silently retries a PutObject that already succeeded
    server-side, the retry returns 412 PreconditionFailed. The lock body
    matches our just-issued token, so acquire_lock claims it instead of
    waiting for TTL.

    The test clears GIT_REMOTE_S3_OWNER_TOKEN for the duration of the call so
    the token under test is always the random per-call fallback; otherwise a
    short value set in the developer/CI environment would make the final
    length assertion fail spuriously.
    """
    # Local import keeps this test self-contained regardless of the module's
    # top-level imports.
    import os

    s3_remote = S3Remote(UriScheme.S3, None, "test_bucket", "test_prefix")
    remote_ref = f"refs/heads/{BRANCH}"
    lock_key = f"test_prefix/{remote_ref}/LOCK#.lock"

    # Body of the first conditional PUT, i.e. the token acquire_lock generated.
    captured_token = {}
    # Stateful: lock is present until DELETEd, then second PUT succeeds.
    lock_present = {"v": True}

    def put_object_side_effect(Bucket, Key, Body=None, **kwargs):
        if Key == lock_key and kwargs.get("IfNoneMatch") == "*":
            if "v" not in captured_token:
                captured_token["v"] = Body
            if lock_present["v"]:
                # Simulate the conditional write losing to an existing object.
                raise botocore.exceptions.ClientError(
                    {
                        "ResponseMetadata": {"HTTPStatusCode": 412},
                        "Error": {"Code": "PreconditionFailed"},
                    },
                    "put_object",
                )
            lock_present["v"] = True
        return {}

    def delete_object_side_effect(Bucket, Key, **kwargs):
        if Key == lock_key:
            lock_present["v"] = False
        return {}

    def get_object_side_effect(Bucket, Key, **kwargs):
        # The stored lock body is exactly the token from our first PUT.
        return {"Body": BytesIO(captured_token["v"])}

    session_client_mock.return_value.put_object.side_effect = put_object_side_effect
    session_client_mock.return_value.get_object.side_effect = get_object_side_effect
    session_client_mock.return_value.delete_object.side_effect = delete_object_side_effect
    # Non-stale lock — proves the claim path runs via own-token, not staleness.
    session_client_mock.return_value.head_object.return_value = {
        "LastModified": datetime.datetime.now(datetime.timezone.utc)
    }

    # patch.dict restores os.environ on exit, so removing the variable here
    # cannot leak into other tests.
    with patch.dict(os.environ):
        os.environ.pop("GIT_REMOTE_S3_OWNER_TOKEN", None)
        assert s3_remote.acquire_lock(remote_ref) == lock_key

    # Token is an unguessable per-call fingerprint, not a constant.
    assert len(captured_token["v"]) >= 16
    assert session_client_mock.return_value.get_object.called