Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ List of the most important changes for each release.

## 0.8.14
- Adds utility for addressing immediate FK constraints caused by Django upgrade, automatically performed for morango models in a Django migration.
- Adds retry behavior for low-level connection issues not handled by `urllib3` retries
- Allows repeat pushes and pulls of buffers during transfer
- Ignores HTTP 404 errors during sync or transfer session closure, which may occur if they're already closed

## 0.8.13
- Removes multiprocessing fallback for RSA key generation to avoid leaving zombie processes; key generation now stays in-process
Expand Down
2 changes: 1 addition & 1 deletion morango/sync/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def _invoke_middleware(self, context, middleware):
return context.stage_status
except Exception as e:
# always log the error itself
logger.error(e)
logger.exception(e)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

context.update(stage_status=transfer_statuses.ERRORED, error=e)
# fire completed signal, after context update. handlers can use context to detect error
signal.completed.fire(context=prepared_context or context)
Expand Down
10 changes: 3 additions & 7 deletions morango/sync/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1390,9 +1390,8 @@ def close_transfer_session(self, context):
Closes remote transfer session

:type context: NetworkSessionContext
:return: The Response
"""
return context.connection._close_transfer_session(context.transfer_session)
context.connection._close_transfer_session(context.transfer_session)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response object wasn't really necessary, so this no longer returns it.


def put_buffers(self, context, buffers):
"""
Expand Down Expand Up @@ -1763,8 +1762,5 @@ def handle(self, context):
"""
:type context: NetworkSessionContext
"""
response = self.close_transfer_session(context)
remote_status = transfer_statuses.COMPLETED
if response.status_code < 200 or response.status_code >= 300:
remote_status = transfer_statuses.ERRORED
return remote_status
self.close_transfer_session(context)
return transfer_statuses.COMPLETED
Comment on lines -1770 to +1766

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this code, we call raise_for_status() so this seemed unnecessary and because we may get a 404 with a retry, we wouldn't have an accurate response. This now relies on the controller to set the status.

294 changes: 257 additions & 37 deletions morango/sync/session.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
import logging
from contextlib import contextmanager

from requests import exceptions
from morango import __version__
from requests.adapters import HTTPAdapter
from requests.sessions import Session
from requests.utils import super_len
from requests.packages.urllib3.util.url import parse_url
from urllib3.exceptions import MaxRetryError
from urllib3.util.url import parse_url

from morango import __version__
from morango.utils import nullcontext
from morango.utils import serialize_capabilities_to_client_request
from morango.utils import SETTINGS


logger = logging.getLogger(__name__)


_RETRY_REQUEST_EXCEPTIONS = (
exceptions.ConnectionError,
Comment thread
bjester marked this conversation as resolved.
exceptions.ChunkedEncodingError,
exceptions.ContentDecodingError,
)


def _headers_content_length(headers):
"""
Obtains the value of 'Content-Length' from the provided headers.

:param headers: Dictionary of headers
:type headers: dict
:return: The integer value of 'Content-Length' if found and valid, otherwise 0.
:rtype: int
"""
try:
content_length = int(headers.get("Content-Length", 0))
if content_length > 0:
Expand All @@ -24,79 +43,280 @@ def _headers_content_length(headers):


def _length_of_headers(headers):
"""
Calculates the total length of all headers.

:param headers: Dictionary of headers
:type headers: dict
:return: The total length of the string representation of all headers.
:rtype: int
"""
return super_len(
"\n".join(["{}: {}".format(key, value) for key, value in headers.items()])
)


def _is_retryable_method(retries, method):
"""
Checks if the request method is configured as retryable.

:type retries: urllib3.util.retry.Retry
:type method: str|None
:rtype: bool
"""
allowed_methods = getattr(retries, "allowed_methods", None)
if allowed_methods is False or allowed_methods is None:
return True
return method.upper() in allowed_methods if method is not None else False


def _log_response_error(err, response):
"""
Logs an error and its associated response content.

:param err: The exception instance that represents the error encountered.
:type err: Exception
:param response: The HTTP response object associated with the error.
If None, it is interpreted as no response being available.
:type response: Optional[Response]
"""
try:
response_content = response.content if response else "(no response)"
except Exception:
response_content = "(unable to read response)"
logger.error(
"{} Reason: {}".format(err.__class__.__name__, response_content)
)


class ContextualRetryHTTPAdapter(HTTPAdapter):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice - I think I came across this recently somewhere either in Kolibri or ricecooker and had wondered why we hadn't used this approach before. I approve!

@contextmanager
def use_retries(self, max_retries):
"""
Context manager for temporarily changing the retry configuration.

:param max_retries: The temporary Retry object
:type max_retries: urllib3.util.retry.Retry
"""
original_retries = self.max_retries
try:
self.max_retries = max_retries
yield
finally:
self.max_retries = original_retries


class SessionWrapper(Session):
"""
Wrapper around `requests.sessions.Session` in order to implement logging around all request errors.
"""

bytes_sent = 0
bytes_received = 0

def __init__(self):
def __init__(self, max_retries):
Comment thread
bjester marked this conversation as resolved.
"""
:param max_retries: The urllib3 Retry object
:type max_retries: urllib3.util.retry.Retry
"""
super(SessionWrapper, self).__init__()
self.max_retries = max_retries

user_agent_header = "morango/{}".format(__version__)
if SETTINGS.CUSTOM_INSTANCE_INFO is not None:
instances = list(SETTINGS.CUSTOM_INSTANCE_INFO)
if instances:
user_agent_header += " " + "{}/{}".format(instances[0], SETTINGS.CUSTOM_INSTANCE_INFO.get(instances[0]))
self.headers["User-Agent"] = "{} {}".format(user_agent_header, self.headers["User-Agent"])
self.hooks["response"].append(self._track_bytes_received)
Comment thread
bjester marked this conversation as resolved.
self.bytes_sent = 0
self.bytes_received = 0

def request(self, method, url, **kwargs):
response = None
# use custom adapter
adapter = ContextualRetryHTTPAdapter()
self.mount("http://", adapter)
self.mount("https://", adapter)

def _track_bytes_sent(self, request):
"""
Request hook that tracks the size of the request, by capturing the size of headers and the
request body. Note: python requests only supports the `response` hook, so this is invoked
manually

:type request: requests.Request|requests.PreparedRequest
"""
try:
response = super(SessionWrapper, self).request(method, url, **kwargs)
parsed_url = parse_url(request.url)
# we don't bother checking if the content length header exists here because we've probably
# been given the request body as Morango sends bodies that aren't streamed, so the
# underlying requests code will set it appropriately
self.bytes_sent += len("{} {} HTTP/1.1".format(request.method, parsed_url.path))
self.bytes_sent += _length_of_headers(request.headers)
self.bytes_sent += _headers_content_length(request.headers)
except Exception as e:
# tracking bandwidth usage is useful but not critical
logger.exception(e)

# capture bytes received from the response, the length header could be missing if it's
# a chunked response though
content_length = _headers_content_length(response.headers)
if not content_length:
content_length = super_len(response.content)
def _track_bytes_received(self, response, *args, **kwargs):
"""
Response hook that tracks the size of the response, by capturing the size of headers and
the response body

:type response: requests.Response
"""
try:
# headers:
self.bytes_received += len(
"HTTP/1.1 {} {}".format(response.status_code, response.reason)
)
self.bytes_received += _length_of_headers(response.headers)
self.bytes_received += content_length

response.raise_for_status()
return response
except exceptions.RequestException as req_err:
# we want to log all request errors for debugging purposes
if response is None:
response = req_err.response
# body:
# capture bytes received from the response, the length header could be missing if it's
# a chunked response though
content_length = _headers_content_length(response.headers)
if not content_length:
content_length = super_len(response.content)
self.bytes_received += content_length
except Exception as e:
# tracking bandwidth usage is useful but not critical
logger.exception(e)

response_content = response.content if response else "(no response)"
logger.error(
"{} Reason: {}".format(req_err.__class__.__name__, response_content)
)
raise req_err
def _get_adapter(self, url):
"""
:param url: the request URL
:type url: bytes|str|None
:rtype: Optional[HTTPAdapter]
"""
if url is None:
return None
# requests allows bytes
if isinstance(url, bytes):
url = url.decode("utf-8")
try:
return self.get_adapter(url)
except exceptions.InvalidSchema:
return None

def prepare_request(self, request):
"""
Override request preparer so we can get the prepared content length, for tracking
transfer sizes
Override request preparer so we can add morango capabilities to the request, and invoke
the sent bytes hook.

:type request: requests.Request
:rtype: requests.PreparedRequest
"""
# add header with client's morango capabilities so server has that information
serialize_capabilities_to_client_request(request)
prepped = super(SessionWrapper, self).prepare_request(request)
parsed_url = parse_url(request.url)
return super(SessionWrapper, self).prepare_request(request)

# we don't bother checking if the content length header exists here because we've probably
# been given the request body as Morango sends bodies that aren't streamed, so the
# underlying requests code will set it appropriately
self.bytes_sent += len("{} {} HTTP/1.1".format(request.method, parsed_url.path))
self.bytes_sent += _length_of_headers(prepped.headers)
self.bytes_sent += _headers_content_length(prepped.headers)
def request(self, method, url, **kwargs):
"""
Issues an HTTP request, with conditional retry behavior if passed `is_retryable` kwarg, and
logs any errors from the request flow.

:param method: The HTTP request method (e.g., 'GET', 'POST', 'PUT', etc.).
:type method: str
:param url: The URL to send the request to.
:type url: str
:param kwargs: Additional arguments to pass to the underlying request method.
:return: The HTTP response object obtained from the request.
:rtype: requests.Response
:raises Exception: Logs then re-raises any exception that occurs during the request.
"""
adapter = self._get_adapter(url)

# super's request has strict kwarg list, so we have to pop `is_retryable` and modify
# the adapter state based on the value
is_retryable = kwargs.pop("is_retryable", False)
if is_retryable and isinstance(adapter, ContextualRetryHTTPAdapter):
ctx = adapter.use_retries(self.max_retries)
else:
ctx = nullcontext()

with ctx:
response = None
try:
response = super(SessionWrapper, self).request(method, url, **kwargs)
response.raise_for_status()
return response
except Exception as e:
Comment thread
bjester marked this conversation as resolved.
if response is None:
response = getattr(e, 'response', None)

_log_response_error(e, response)
raise e

def send(self, request, **kwargs):
"""
Issues an HTTP request with automatic retry handling for transport-level failures and
logging of request-related errors.

:param request: The prepared request
:type request: requests.PreparedRequest
:param kwargs: Additional arguments to pass to the underlying `send` method.
:return: The HTTP response object obtained from the request.
:rtype: requests.Response
"""
adapter = self._get_adapter(request.url)
retries = adapter.max_retries if adapter is not None else None

return prepped
while True:
# sent bytes from low-level retries in urllib3 are not captured, but this is good enough
self._track_bytes_sent(request)
response = None
try:
response = super(SessionWrapper, self).send(request, **kwargs)
return response
except exceptions.RequestException as e:
retries = self._should_retry(retries, request, e, response=response)
if retries is None:
raise e

def _should_retry(self, retries, request, e, response=None):
Comment thread
bjester marked this conversation as resolved.
"""
Determines whether a request should be retried based on the provided criteria, including
the retry settings, the type of exception occurred, and the HTTP method used. This method
also handles sleeping between retries and increments retry counters appropriately. If
retries are exhausted, it raises a `RetryError`.

:param retries: The retry configuration instance that tracks and manages retry attempts.
:type retries: Optional[urllib3.util.retry.Retry]
:param request: The HTTP request object that specifies the details of the request being made.
:type request: requests.PreparedRequest
:param e: The exception raised during the request execution that triggered this check.
:type e: requests.exceptions.RequestException
:param response: Optional. The HTTP response object corresponding to the request, if available.
:type response: Optional[requests.Response]
:return: The new Retry object if the retry is allowed, otherwise None.
:rtype: Optional[urllib3.util.retry.Retry]
"""
if response is None:
response = e.response

if (
retries is None
Comment thread
bjester marked this conversation as resolved.
or retries.total == 0
or not isinstance(e, _RETRY_REQUEST_EXCEPTIONS)
or not _is_retryable_method(retries, request.method)
):
return None

# requests might wrap `MaxRetryError` in a `ConnectionError`
if any(isinstance(arg, MaxRetryError) for arg in e.args):
return None

try:
# may raise if retries have been exhausted
retries = retries.increment(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increment returns a new Retry object every time, hence why we pass it around.

method=request.method,
url=request.url,
response=response,
error=e
)
logger.debug(f"Sleeping before retrying {request.method} request to {request.url}")
retries.sleep(response)
return retries
except MaxRetryError as _e:
# re-raise wrapped exception, like requests would. see `HTTPAdapter.send`
raise exceptions.RetryError(_e.reason, request=request, response=response)

def reset_transfer_bytes(self):
"""
Expand Down
Loading
Loading