From 73bca99b2dfaedcb15ff135357e415bd926be054 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Mon, 29 Jun 2026 15:24:31 -0700 Subject: [PATCH 1/5] Post 2.17 requests declares urllib3 a normal dep --- morango/sync/session.py | 4 ++-- morango/sync/syncsession.py | 7 ++++--- setup.py | 3 ++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/morango/sync/session.py b/morango/sync/session.py index bbe03161..e092e0f6 100644 --- a/morango/sync/session.py +++ b/morango/sync/session.py @@ -1,11 +1,11 @@ import logging from requests import exceptions -from morango import __version__ from requests.sessions import Session from requests.utils import super_len -from requests.packages.urllib3.util.url import parse_url +from urllib3.util.url import parse_url +from morango import __version__ from morango.utils import serialize_capabilities_to_client_request from morango.utils import SETTINGS diff --git a/morango/sync/syncsession.py b/morango/sync/syncsession.py index b3f61603..fbf49de3 100644 --- a/morango/sync/syncsession.py +++ b/morango/sync/syncsession.py @@ -10,11 +10,12 @@ from urllib.parse import urljoin from urllib.parse import urlparse +from django.db import connection +from django.db import transaction from django.utils import timezone from requests.adapters import HTTPAdapter from requests.exceptions import HTTPError -from requests.packages.urllib3.util.retry import Retry -from django.db import transaction, connection +from urllib3.util.retry import Retry from .session import SessionWrapper from morango.api.serializers import CertificateSerializer @@ -38,11 +39,11 @@ from morango.sync.context import LocalSessionContext from morango.sync.context import NetworkSessionContext from morango.sync.controller import SessionController +from morango.sync.utils import lock_partitions from morango.sync.utils import SyncSignal from morango.sync.utils import SyncSignalGroup from morango.utils import CAPABILITIES from morango.utils import pid_exists -from morango.sync.utils import lock_partitions if GZIP_BUFFER_POST in CAPABILITIES: from gzip import GzipFile diff --git a/setup.py b/setup.py index 69140a2a..7213f9c4 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,8 @@ "rsa<4.10", "djangorestframework>3.10", "django-ipware==4.0.2", - "requests", + "requests>=2.27.1", + "urllib3>=1.26.0", "ifcfg", ], license="MIT", From d3a64a1f71e56ca54de39290fca68ca94c7f2eee Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 30 Jun 2026 11:52:17 -0700 Subject: [PATCH 2/5] Allow repeat push/pull of buffers --- morango/sync/utils.py | 14 +++++++++--- tests/testapp/tests/test_api.py | 39 +++++++++++++++++++++++++++------ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/morango/sync/utils.py b/morango/sync/utils.py index a025b55e..5201657e 100644 --- a/morango/sync/utils.py +++ b/morango/sync/utils.py @@ -128,18 +128,26 @@ def validate_and_create_buffer_data( # noqa: C901 buffer_list += [Buffer(**record)] with transaction.atomic(): - transfer_session.records_transferred += len(data) + deleted_buffers, _ = Buffer.objects.filter( + transfer_session=transfer_session, + model_uuid__in=[record["model_uuid"] for record in data] + ).delete() + RecordMaxCounterBuffer.objects.filter( + transfer_session=transfer_session, + model_uuid__in=[record["model_uuid"] for record in data] + ).delete() if connection is not None: transfer_session.bytes_sent = connection.bytes_sent if connection is not None: transfer_session.bytes_received = connection.bytes_received - transfer_session.save() - Buffer.objects.bulk_create(buffer_list) RecordMaxCounterBuffer.objects.bulk_create(rmcb_list) + transfer_session.records_transferred += len(buffer_list) - deleted_buffers + transfer_session.save() + class SyncSignal(object): """ diff --git a/tests/testapp/tests/test_api.py b/tests/testapp/tests/test_api.py index 2b1367d5..f0a873aa 100644 --- a/tests/testapp/tests/test_api.py +++ b/tests/testapp/tests/test_api.py @@ -897,7 +897,7 @@ def build_buffer_item(self, **kwargs): return buffermodel - def make_buffer_post_request(self, buffers, expected_status=201, gzip=False): + def make_buffer_post_request(self, buffers, expected_status=201, gzip=False, pre_count=0): serialized_recs = BufferSerializer(buffers, many=True) # extract that data that is to be posted @@ -911,9 +911,13 @@ def make_buffer_post_request(self, buffers, expected_status=201, gzip=False): headers["content_type"] = "application/gzip" headers["format"] = None - # delete the records from the DB so we don't conflict when we POST - Buffer.objects.all().delete() - RecordMaxCounterBuffer.objects.all().delete() + # delete the records to match pre_count if zero + if pre_count == 0: + Buffer.objects.all().delete() + RecordMaxCounterBuffer.objects.all().delete() + else: + self.assertEqual(Buffer.objects.count(), pre_count) + self.assertEqual(RecordMaxCounterBuffer.objects.count(), pre_count * 3) response = self.client.post(reverse("buffers-list"), data, **headers) self.assertEqual(response.status_code, expected_status) @@ -964,6 +968,22 @@ def test_push_fails_for_pull_transfersession(self): rec_3 = self.build_buffer_item(transfer_session=rec_1.transfer_session) self.make_buffer_post_request([rec_1, rec_2, rec_3], expected_status=403) + def test_push_repeat_chunk(self): + rec_1 = self.build_buffer_item(push=True, filter=self.default_push_filter) + transfer_session = rec_1.transfer_session + rec_2 = self.build_buffer_item(transfer_session=transfer_session) + # need to create a third buffer so it doesn't mark the transfer complete after pushing 2 + self.build_buffer_item(transfer_session=transfer_session) + self.assertEqual(transfer_session.records_transferred, 0) + + self.make_buffer_post_request([rec_1, rec_2], expected_status=201) + transfer_session.refresh_from_db() + self.assertEqual(transfer_session.records_transferred, 2) + + self.make_buffer_post_request([rec_1, rec_2], expected_status=201, pre_count=2) + transfer_session.refresh_from_db() + self.assertEqual(transfer_session.records_transferred, 2) + def create_records_for_pulling(self, count=3, **kwargs): assert count >= 1 @@ -1025,11 +1045,9 @@ def make_buffer_get_request( transfer_session_id=t_id, model_uuid__in=model_uuids ).delete() - # run the validation logic to ensure no errors were returned - errors = validate_and_create_buffer_data( + validate_and_create_buffer_data( data, TransferSession.objects.get(id=t_id) ) - self.assertFalse(errors) # check that the correct number of buffer items were created self.assertEqual( @@ -1061,6 +1079,13 @@ def test_pull_valid_buffer_list(self): self.make_buffer_get_request(transfer_session_id=transfer_session_id) + def test_pull_repeat(self): + + transfer_session_id = self.create_records_for_pulling(count=3) + + self.make_buffer_get_request(transfer_session_id=transfer_session_id, expected_count=3) + self.make_buffer_get_request(transfer_session_id=transfer_session_id, expected_count=3) + def test_pull_fails_when_transfer_session_id_not_specified(self): self.create_records_for_pulling() From 0fd9fa3ea09ae7853c56c14b4033540e51525d91 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 30 Jun 2026 14:06:13 -0700 Subject: [PATCH 3/5] Ignore 404s on close/destroy requests due to possible retry --- morango/sync/operations.py | 10 +--- morango/sync/syncsession.py | 27 ++++++++- tests/testapp/tests/sync/test_syncsession.py | 61 ++++++++++++++++++++ 3 files changed, 89 insertions(+), 9 deletions(-) diff --git a/morango/sync/operations.py b/morango/sync/operations.py index d14c5497..b29767a5 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -1390,9 +1390,8 @@ def close_transfer_session(self, context): Closes remote transfer session :type context: NetworkSessionContext - :return: The Response """ - return context.connection._close_transfer_session(context.transfer_session) + context.connection._close_transfer_session(context.transfer_session) def put_buffers(self, context, buffers): """ @@ -1763,8 +1762,5 @@ def handle(self, context): """ :type context: NetworkSessionContext """ - response = self.close_transfer_session(context) - remote_status = transfer_statuses.COMPLETED - if response.status_code < 200 or response.status_code >= 300: - remote_status = transfer_statuses.ERRORED - return remote_status + self.close_transfer_session(context) + return transfer_statuses.COMPLETED diff --git a/morango/sync/syncsession.py b/morango/sync/syncsession.py index fbf49de3..9991b53c 100644 --- a/morango/sync/syncsession.py +++ b/morango/sync/syncsession.py @@ -6,6 +6,7 @@ import os import socket import uuid +from functools import wraps from io import BytesIO from urllib.parse import urljoin from urllib.parse import urlparse @@ -53,6 +54,7 @@ DBBackend = load_backend(connection) + def _join_with_logical_operator(lst, operator): op = ") {operator} (".format(operator=operator) return "(({items}))".format(items=op.join(lst)) @@ -77,6 +79,25 @@ def _get_client_ip_for_server(server_host, server_port): return IP +def ignore_404(target): + """ + Decorator that wraps callables to ignore 404s caused by its use of requests + :param target: A callable + :return: A callable + """ + @wraps(target) + def wrapper(*args, **kwargs): + try: + return target(*args, **kwargs) + except HTTPError as e: + if e.response is None or e.response.status_code != 404: + raise e + else: + logger.debug(f"Ignoring 404 raised by {target.__name__}") + + return wrapper + + # borrowed from https://github.com/django/django/blob/1.11.20/django/utils/text.py#L295 def compress_string(s, compresslevel=9): zbuf = BytesIO() @@ -479,13 +500,15 @@ def _update_transfer_session(self, data, transfer_session): json=data, ) + @ignore_404 def _close_transfer_session(self, transfer_session): - return self.session.delete( + self.session.delete( self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id) ) + @ignore_404 def _close_sync_session(self, sync_session): - return self.session.delete( + self.session.delete( self.urlresolve(api_urls.SYNCSESSION, lookup=sync_session.id) ) diff --git a/tests/testapp/tests/sync/test_syncsession.py b/tests/testapp/tests/sync/test_syncsession.py index d41f5d87..1068c4ff 100644 --- a/tests/testapp/tests/sync/test_syncsession.py +++ b/tests/testapp/tests/sync/test_syncsession.py @@ -21,6 +21,7 @@ from morango.models.certificates import Key from morango.models.certificates import ScopeDefinition from morango.models.core import SyncSession +from morango.models.core import TransferSession from morango.models.fields.crypto import SharedKey from morango.sync.context import LocalSessionContext from morango.sync.context import NetworkSessionContext @@ -236,6 +237,66 @@ def create(**data): self.network_connection.close_sync_session(client.sync_session) self.assertEqual(SyncSession.objects.filter(active=True).count(), 0) + def test_close_transfer_session_ignores_404(self): + mock_response = mock.Mock() + mock_response.status_code = 404 + transfer_session = mock.Mock(spec=TransferSession) + transfer_session.id = uuid.uuid4().hex + + with mock.patch.object( + self.network_connection.session, + "delete", + side_effect=HTTPError(response=mock_response), + ): + # should not raise even though the server returned 404 + self.network_connection._close_transfer_session(transfer_session) + + def test_close_sync_session_ignores_404(self): + mock_response = mock.Mock() + mock_response.status_code = 404 + sync_session = mock.Mock(spec=SyncSession) + sync_session.id = uuid.uuid4().hex + + with mock.patch.object( + self.network_connection.session, + "delete", + side_effect=HTTPError(response=mock_response), + ): + # should not raise even though the server returned 404 + self.network_connection._close_sync_session(sync_session) + + def test_close_sync_session_raises_500(self): + mock_response = mock.Mock() + mock_response.status_code = 500 + sync_session = mock.Mock(spec=SyncSession) + sync_session.id = uuid.uuid4().hex + + e = HTTPError(response=mock_response) + + with mock.patch.object( + self.network_connection.session, + "delete", + side_effect=e, + ): + with self.assertRaises(type(e)) as raised: + self.network_connection._close_sync_session(sync_session) + self.assertEqual(e, raised.exception) + + def test_close_sync_session_raises_no_response(self): + sync_session = mock.Mock(spec=SyncSession) + sync_session.id = uuid.uuid4().hex + + e = HTTPError() + + with mock.patch.object( + self.network_connection.session, + "delete", + side_effect=e, + ): + with self.assertRaises(type(e)) as raised: + self.network_connection._close_sync_session(sync_session) + self.assertEqual(e, raised.exception) + @mock.patch.object(SyncSession.objects, "create") def test_resume_sync_session(self, mock_create): def create(**data): From ea045b9677fc2216f7fcb40332f81a6fa0cd1c01 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 30 Jun 2026 14:29:22 -0700 Subject: [PATCH 4/5] Add retry behavior for low level connection issues not captured by urllib3's retries --- morango/sync/controller.py | 2 +- morango/sync/session.py | 290 ++++++++++++++++--- morango/sync/syncsession.py | 38 +-- morango/utils.py | 13 + tests/testapp/tests/sync/test_session.py | 68 ++++- tests/testapp/tests/sync/test_syncsession.py | 25 ++ 6 files changed, 367 insertions(+), 69 deletions(-) diff --git a/morango/sync/controller.py b/morango/sync/controller.py index 92e38e6a..3b3e14f3 100644 --- a/morango/sync/controller.py +++ b/morango/sync/controller.py @@ -270,7 +270,7 @@ def _invoke_middleware(self, context, middleware): return context.stage_status except Exception as e: # always log the error itself - logger.error(e) + logger.exception(e) context.update(stage_status=transfer_statuses.ERRORED, error=e) # fire completed signal, after context update. handlers can use context to detect error signal.completed.fire(context=prepared_context or context) diff --git a/morango/sync/session.py b/morango/sync/session.py index e092e0f6..96f1eee0 100644 --- a/morango/sync/session.py +++ b/morango/sync/session.py @@ -1,11 +1,15 @@ import logging +from contextlib import contextmanager from requests import exceptions +from requests.adapters import HTTPAdapter from requests.sessions import Session from requests.utils import super_len +from urllib3.exceptions import MaxRetryError from urllib3.util.url import parse_url from morango import __version__ +from morango.utils import nullcontext from morango.utils import serialize_capabilities_to_client_request from morango.utils import SETTINGS @@ -13,7 +17,22 @@ logger = logging.getLogger(__name__) +_RETRY_REQUEST_EXCEPTIONS = ( + exceptions.ConnectionError, + exceptions.ChunkedEncodingError, + exceptions.ContentDecodingError, +) + + def _headers_content_length(headers): + """ + Obtains the value of 'Content-Length' from the provided headers. + + :param headers: Dictionary of headers + :type headers: dict + :return: The integer value of 'Content-Length' if found and valid, otherwise 0. + :rtype: int + """ try: content_length = int(headers.get("Content-Length", 0)) if content_length > 0: @@ -24,79 +43,280 @@ def _headers_content_length(headers): def _length_of_headers(headers): + """ + Calculates the total length of all headers. + + :param headers: Dictionary of headers + :type headers: dict + :return: The total length of the string representation of all headers. + :rtype: int + """ return super_len( "\n".join(["{}: {}".format(key, value) for key, value in headers.items()]) ) +def _is_retryable_method(retries, method): + """ + Checks if the request method is configured as retryable. + + :type retries: urllib3.util.retry.Retry + :type method: str|None + :rtype: bool + """ + allowed_methods = getattr(retries, "allowed_methods", None) + if allowed_methods is False or allowed_methods is None: + return True + return method.upper() in allowed_methods if method is not None else False + + +def _log_response_error(err, response): + """ + Logs an error and its associated response content. + + :param err: The exception instance that represents the error encountered. + :type err: Exception + :param response: The HTTP response object associated with the error. + If None, it is interpreted as no response being available. + :type response: Optional[Response] + """ + try: + response_content = response.content if response else "(no response)" + except Exception: + response_content = "(unable to read response)" + logger.error( + "{} Reason: {}".format(err.__class__.__name__, response_content) + ) + + +class ContextualRetryHTTPAdapter(HTTPAdapter): + @contextmanager + def use_retries(self, max_retries): + """ + Context manager for temporarily changing the retry configuration. + + :param max_retries: The temporary Retry object + :type max_retries: urllib3.util.retry.Retry + """ + original_retries = self.max_retries + try: + self.max_retries = max_retries + yield + finally: + self.max_retries = original_retries + + class SessionWrapper(Session): """ Wrapper around `requests.sessions.Session` in order to implement logging around all request errors. """ - bytes_sent = 0 - bytes_received = 0 - - def __init__(self): + def __init__(self, max_retries): + """ + :param max_retries: The urllib3 Retry object + :type max_retries: urllib3.util.retry.Retry + """ super(SessionWrapper, self).__init__() + self.max_retries = max_retries + user_agent_header = "morango/{}".format(__version__) if SETTINGS.CUSTOM_INSTANCE_INFO is not None: instances = list(SETTINGS.CUSTOM_INSTANCE_INFO) if instances: user_agent_header += " " + "{}/{}".format(instances[0], SETTINGS.CUSTOM_INSTANCE_INFO.get(instances[0])) self.headers["User-Agent"] = "{} {}".format(user_agent_header, self.headers["User-Agent"]) + self.hooks["response"].append(self._track_bytes_received) + self.bytes_sent = 0 + self.bytes_received = 0 - def request(self, method, url, **kwargs): - response = None + # use custom adapter + adapter = ContextualRetryHTTPAdapter() + self.mount("http://", adapter) + self.mount("https://", adapter) + + def _track_bytes_sent(self, request): + """ + Request hook that tracks the size of the request, by capturing the size of headers and the + request body. Note: python requests only supports the `response` hook, so this is invoked + manually + + :type request: requests.Request|requests.PreparedRequest + """ try: - response = super(SessionWrapper, self).request(method, url, **kwargs) + parsed_url = parse_url(request.url) + # we don't bother checking if the content length header exists here because we've probably + # been given the request body as Morango sends bodies that aren't streamed, so the + # underlying requests code will set it appropriately + self.bytes_sent += len("{} {} HTTP/1.1".format(request.method, parsed_url.path)) + self.bytes_sent += _length_of_headers(request.headers) + self.bytes_sent += _headers_content_length(request.headers) + except Exception as e: + # tracking bandwidth usage is useful but not critical + logger.exception(e) - # capture bytes received from the response, the length header could be missing if it's - # a chunked response though - content_length = _headers_content_length(response.headers) - if not content_length: - content_length = super_len(response.content) + def _track_bytes_received(self, response, *args, **kwargs): + """ + Response hook that tracks the size of the response, by capturing the size of headers and + the response body + :type response: requests.Response + """ + try: + # headers: self.bytes_received += len( "HTTP/1.1 {} {}".format(response.status_code, response.reason) ) self.bytes_received += _length_of_headers(response.headers) - self.bytes_received += content_length - response.raise_for_status() - return response - except exceptions.RequestException as req_err: - # we want to log all request errors for debugging purposes - if response is None: - response = req_err.response + # body: + # capture bytes received from the response, the length header could be missing if it's + # a chunked response though + content_length = _headers_content_length(response.headers) + if not content_length: + content_length = super_len(response.content) + self.bytes_received += content_length + except Exception as e: + # tracking bandwidth usage is useful but not critical + logger.exception(e) - response_content = response.content if response else "(no response)" - logger.error( - "{} Reason: {}".format(req_err.__class__.__name__, response_content) - ) - raise req_err + def _get_adapter(self, url): + """ + :param url: the request URL + :type url: bytes|str|None + :rtype: Optional[HTTPAdapter] + """ + if url is None: + return None + # requests allows bytes + if isinstance(url, bytes): + url = url.decode("utf-8") + try: + return self.get_adapter(url) + except exceptions.InvalidSchema: + return None def prepare_request(self, request): """ - Override request preparer so we can get the prepared content length, for tracking - transfer sizes + Override request preparer so we can add morango capabilities to the request, and invoke + the sent bytes hook. :type request: requests.Request :rtype: requests.PreparedRequest """ # add header with client's morango capabilities so server has that information serialize_capabilities_to_client_request(request) - prepped = super(SessionWrapper, self).prepare_request(request) - parsed_url = parse_url(request.url) + return super(SessionWrapper, self).prepare_request(request) + + def request(self, method, url, **kwargs): + """ + Issues an HTTP request, with conditional retry behavior if passed `is_retryable` kwarg, and + logs any errors from the request flow. + + :param method: The HTTP request method (e.g., 'GET', 'POST', 'PUT', etc.). + :type method: str + :param url: The URL to send the request to. + :type url: str + :param kwargs: Additional arguments to pass to the underlying request method. + :return: The HTTP response object obtained from the request. + :rtype: requests.Response + :raises Exception: Logs then re-raises any exception that occurs during the request. + """ + adapter = self._get_adapter(url) + + # super's request has strict kwarg list, so we have to pop `is_retryable` and modify + # the adapter state based on the value + is_retryable = kwargs.pop("is_retryable", False) + if is_retryable and isinstance(adapter, ContextualRetryHTTPAdapter): + ctx = adapter.use_retries(self.max_retries) + else: + ctx = nullcontext() - # we don't bother checking if the content length header exists here because we've probably - # been given the request body as Morango sends bodies that aren't streamed, so the - # underlying requests code will set it appropriately - self.bytes_sent += len("{} {} HTTP/1.1".format(request.method, parsed_url.path)) - self.bytes_sent += _length_of_headers(prepped.headers) - self.bytes_sent += _headers_content_length(prepped.headers) + with ctx: + response = None + try: + response = super(SessionWrapper, self).request(method, url, **kwargs) + response.raise_for_status() + return response + except Exception as e: + if response is None: + response = getattr(e, 'response', None) + + _log_response_error(e, response) + raise e + + def send(self, request, **kwargs): + """ + Issues an HTTP request with automatic retry handling for transport-level failures and + logging of request-related errors. + + :param request: The prepared request + :type request: requests.PreparedRequest + :param kwargs: Additional arguments to pass to the underlying `send` method. + :return: The HTTP response object obtained from the request. + :rtype: requests.Response + """ + adapter = self._get_adapter(request.url) + retries = adapter.max_retries if adapter is not None else None - return prepped + while True: + # sent bytes from low-level retries in urllib3 are not captured, but this is good enough + self._track_bytes_sent(request) + response = None + try: + response = super(SessionWrapper, self).send(request, **kwargs) + return response + except exceptions.RequestException as e: + retries = self._should_retry(retries, request, e, response=response) + if retries is None: + raise e + + def _should_retry(self, retries, request, e, response=None): + """ + Determines whether a request should be retried based on the provided criteria, including + the retry settings, the type of exception occurred, and the HTTP method used. This method + also handles sleeping between retries and increments retry counters appropriately. If + retries are exhausted, it raises a `RetryError`. + + :param retries: The retry configuration instance that tracks and manages retry attempts. + :type retries: Optional[urllib3.util.retry.Retry] + :param request: The HTTP request object that specifies the details of the request being made. + :type request: requests.PreparedRequest + :param e: The exception raised during the request execution that triggered this check. + :type e: requests.exceptions.RequestException + :param response: Optional. The HTTP response object corresponding to the request, if available. + :type response: Optional[requests.Response] + :return: The new Retry object if the retry is allowed, otherwise None. + :rtype: Optional[urllib3.util.retry.Retry] + """ + if response is None: + response = e.response + + if ( + retries is None + or retries.total == 0 + or not isinstance(e, _RETRY_REQUEST_EXCEPTIONS) + or not _is_retryable_method(retries, request.method) + ): + return None + + # requests might wrap `MaxRetryError` in a `ConnectionError` + if any(isinstance(arg, MaxRetryError) for arg in e.args): + return None + + try: + # may raise if retries have been exhausted + retries = retries.increment( + method=request.method, + url=request.url, + response=response, + error=e + ) + logger.debug(f"Sleeping before retrying {request.method} request to {request.url}") + retries.sleep(response) + return retries + except MaxRetryError as _e: + # re-raise wrapped exception, like requests would. see `HTTPAdapter.send` + raise exceptions.RetryError(_e.reason, request=request, response=response) def reset_transfer_bytes(self): """ diff --git a/morango/sync/syncsession.py b/morango/sync/syncsession.py index 9991b53c..611e1e70 100644 --- a/morango/sync/syncsession.py +++ b/morango/sync/syncsession.py @@ -14,7 +14,6 @@ from django.db import connection from django.db import transaction from django.utils import timezone -from requests.adapters import HTTPAdapter from requests.exceptions import HTTPError from urllib3.util.retry import Retry @@ -152,16 +151,18 @@ def __init__( self.base_url = base_url self.compresslevel = compresslevel # set up requests session with retry logic - self.session = SessionWrapper() # sleep for {backoff factor} * (2 ^ ({number of total retries} - 1)) between requests # with 7 retry attempts, sleep escalation becomes (0.6s, 1.2s, ..., 38.4s) - retry = Retry(total=retries, backoff_factor=backoff_factor) - adapter = HTTPAdapter(max_retries=retry) - self.session.mount("http://", adapter) - self.session.mount("https://", adapter) + self.session = SessionWrapper( + Retry( + total=retries, + backoff_factor=backoff_factor, + allowed_methods=None, # allow any method + ) + ) # get morango information about server self.server_info = self.session.get( - urljoin(self.base_url, api_urls.INFO) + urljoin(self.base_url, api_urls.INFO), is_retryable=True ).json() self.capabilities = self.server_info.get("capabilities", []) self.chunk_size = chunk_size @@ -457,13 +458,13 @@ def push_signed_client_certificate_chain( return certificate def _get_public_key(self): - return self.session.get(self.urlresolve(api_urls.PUBLIC_KEY)) + return self.session.get(self.urlresolve(api_urls.PUBLIC_KEY), is_retryable=True) def _get_nonce(self): - return self.session.post(self.urlresolve(api_urls.NONCE)) + return self.session.post(self.urlresolve(api_urls.NONCE), is_retryable=True) def _get_certificate_chain(self, params): - return self.session.get(self.urlresolve(api_urls.CERTIFICATE), params=params) + return self.session.get(self.urlresolve(api_urls.CERTIFICATE), params=params, is_retryable=True) def _certificate_signing(self, data, userargs, password): # convert user arguments into query str for passing to auth layer @@ -483,33 +484,33 @@ def _create_sync_session(self, data): def _get_sync_session(self, sync_session): return self.session.get( - self.urlresolve(api_urls.SYNCSESSION, lookup=sync_session.id) + self.urlresolve(api_urls.SYNCSESSION, lookup=sync_session.id), is_retryable=True ) def _create_transfer_session(self, data): - return self.session.post(self.urlresolve(api_urls.TRANSFERSESSION), json=data) + return self.session.post(self.urlresolve(api_urls.TRANSFERSESSION), json=data, is_retryable=True) def _get_transfer_session(self, transfer_session): return self.session.get( - self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id) + self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id), is_retryable=True ) def _update_transfer_session(self, data, transfer_session): return self.session.patch( self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id), - json=data, + json=data, is_retryable=True ) @ignore_404 def _close_transfer_session(self, transfer_session): self.session.delete( - self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id) + self.urlresolve(api_urls.TRANSFERSESSION, lookup=transfer_session.id), is_retryable=True ) @ignore_404 def _close_sync_session(self, sync_session): self.session.delete( - self.urlresolve(api_urls.SYNCSESSION, lookup=sync_session.id) + self.urlresolve(api_urls.SYNCSESSION, lookup=sync_session.id), is_retryable=True ) def _push_record_chunk(self, data): @@ -523,9 +524,10 @@ def _push_record_chunk(self, data): self.urlresolve(api_urls.BUFFER), data=gzipped_data, headers={"content-type": "application/gzip"}, + is_retryable=True ) else: - return self.session.post(self.urlresolve(api_urls.BUFFER), json=data) + return self.session.post(self.urlresolve(api_urls.BUFFER), json=data, is_retryable=True) def _pull_record_chunk(self, transfer_session): # pull records from server for given transfer session @@ -534,7 +536,7 @@ def _pull_record_chunk(self, transfer_session): "offset": transfer_session.records_transferred, "transfer_session_id": transfer_session.id, } - return self.session.get(self.urlresolve(api_urls.BUFFER), params=params) + return self.session.get(self.urlresolve(api_urls.BUFFER), params=params, is_retryable=True) class SyncClientSignals(SyncSignal): diff --git a/morango/utils.py b/morango/utils.py index 9c433814..a903480e 100644 --- a/morango/utils.py +++ b/morango/utils.py @@ -143,3 +143,16 @@ def exception_path(exc): if isinstance(exc, Exception): exc_cls = exc.__class__ return f"{exc_cls.__module__}.{exc_cls.__name__}" + + +class nullcontext: + """Replace this with contextlib.nullcontext when python3.6 support is dropped""" + + def __init__(self, value=None): + self.value = value + + def __enter__(self, *args, **kwargs): + return self.value + + def __exit__(self, exc_type, exc_val, exc_tb): + pass diff --git a/tests/testapp/tests/sync/test_session.py b/tests/testapp/tests/sync/test_session.py index 25c5baf7..adfdce36 100644 --- a/tests/testapp/tests/sync/test_session.py +++ b/tests/testapp/tests/sync/test_session.py @@ -1,8 +1,11 @@ import mock from django.test import TestCase +from requests.exceptions import ConnectionError from requests.exceptions import HTTPError from requests.exceptions import RequestException - +from requests.exceptions import RetryError +from urllib3.exceptions import MaxRetryError +from urllib3.util.retry import Retry from morango.sync.session import _length_of_headers from morango.sync.session import SessionWrapper @@ -12,14 +15,22 @@ class SessionWrapperTestCase(TestCase): @mock.patch("morango.sync.session.Session.request") def test_request(self, mocked_super_request): headers = {"Content-Length": 1024} - expected = mocked_super_request.return_value = mock.Mock( + mock_response = mock.Mock( headers=headers, raise_for_status=mock.Mock(), status_code=200, reason="OK" ) - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) + + def dispatch_hooks(method, url, **kwargs): + for hook in wrapper.hooks.get("response", []): + hook(mock_response) + return mock_response + + mocked_super_request.side_effect = dispatch_hooks + actual = wrapper.request("GET", "test_url", is_test=True) mocked_super_request.assert_called_once_with("GET", "test_url", is_test=True) - self.assertEqual(expected, actual) + self.assertEqual(mock_response, actual) head_length = len("HTTP/1.1 200 OK") + _length_of_headers(headers) self.assertEqual(wrapper.bytes_received, 1024 + head_length) @@ -28,12 +39,12 @@ def test_request_user_agent(self): from morango import __version__ as morango_version from requests import __version__ as requests_version - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) expected_user_agent = "morango/{} python-requests/{}".format(morango_version, requests_version) self.assertEqual(wrapper.headers["User-Agent"], expected_user_agent) with self.settings(CUSTOM_INSTANCE_INFO={"kolibri": "0.16.0"}): - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) expected_user_agent = "morango/{} kolibri/0.16.0 python-requests/{}".format(morango_version, requests_version) self.assertEqual(wrapper.headers["User-Agent"], expected_user_agent) @@ -49,7 +60,7 @@ def test_request__not_ok(self, mocked_super_request, mocked_logger): raise_for_status.side_effect = HTTPError(response=expected) - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) with self.assertRaises(HTTPError): wrapper.request("GET", "test_url", is_test=True) @@ -64,7 +75,7 @@ def test_request__not_ok(self, mocked_super_request, mocked_logger): def test_request__really_not_ok(self, mocked_super_request, mocked_logger): mocked_super_request.side_effect = RequestException() - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) with self.assertRaises(RequestException): wrapper.request("GET", "test_url", is_test=True) @@ -76,18 +87,45 @@ def test_request__really_not_ok(self, mocked_super_request, mocked_logger): @mock.patch("morango.sync.session.Session.prepare_request") def test_prepare_request(self, mocked_super_prepare_request): - headers = {"Content-Length": 256} - expected = mocked_super_prepare_request.return_value = mock.Mock( - headers=headers, - ) + expected = mocked_super_prepare_request.return_value = mock.Mock() request = mock.Mock(url="http://test_app/path/to/resource", method="GET", headers={}) - wrapper = SessionWrapper() + wrapper = SessionWrapper(Retry.DEFAULT) actual = wrapper.prepare_request(request) mocked_super_prepare_request.assert_called_once_with(request) self.assertEqual(expected, actual) - head_length = len("GET /path/to/resource HTTP/1.1") + _length_of_headers( - headers + self.assertEqual(wrapper.bytes_sent, 0) + + @mock.patch("morango.sync.session.Session.send") + def test_send(self, mocked_super_send): + headers = {"Content-Length": 256} + mocked_super_send.return_value = mock.Mock() + + prepared_request = mock.Mock( + url="http://test_app/path/to/resource", + method="GET", + headers=headers, ) + wrapper = SessionWrapper(Retry.DEFAULT) + wrapper.send(prepared_request) + + mocked_super_send.assert_called_once_with(prepared_request) + head_length = len("GET /path/to/resource HTTP/1.1") + _length_of_headers(headers) self.assertEqual(wrapper.bytes_sent, 256 + head_length) + + def test_should_retry__raises_retry_error_on_max_retries(self): + retries = mock.Mock() + retries.allowed_methods = None + retries.increment.side_effect = MaxRetryError(None, "http://test_app/path/to/resource") + + prepared_request = mock.Mock( + url="http://test_app/path/to/resource", + method="GET", + ) + + wrapper = SessionWrapper(Retry.DEFAULT) + with self.assertRaises(RetryError) as cm: + wrapper._should_retry(retries, prepared_request, ConnectionError()) + + self.assertIs(cm.exception.request, prepared_request) diff --git a/tests/testapp/tests/sync/test_syncsession.py b/tests/testapp/tests/sync/test_syncsession.py index 1068c4ff..07c5fc3b 100644 --- a/tests/testapp/tests/sync/test_syncsession.py +++ b/tests/testapp/tests/sync/test_syncsession.py @@ -4,7 +4,9 @@ import mock from django.test.testcases import LiveServerTestCase from django.test.utils import override_settings +from requests.exceptions import ChunkedEncodingError from requests.exceptions import HTTPError +from requests.sessions import Session from ..helpers import BaseClientTestCase from ..helpers import BaseTransferClientTestCase @@ -135,6 +137,29 @@ def test_get_remote_certs(self): ) self.assertSetEqual(set(certs), set(remote_certs)) + def test_get_remote_certs__retries_chunked_encoding_error(self): + certs = self.subset_cert.get_ancestors(include_self=True) + original_send = Session.send + attempts = {"chunked_encoding_errors": 0} + + def flaky_request(session, request, **kwargs): + if request.method == "GET" and attempts["chunked_encoding_errors"] == 0: + attempts["chunked_encoding_errors"] += 1 + raise ChunkedEncodingError("Connection broken") + return original_send(session, request, **kwargs) + + with mock.patch( + "morango.sync.session.Session.send", + autospec=True, + side_effect=flaky_request, + ): + remote_certs = self.network_connection.get_remote_certificates( + self.root_cert.id + ) + + self.assertEqual(1, attempts["chunked_encoding_errors"]) + self.assertSetEqual(set(certs), set(remote_certs)) + @mock.patch.object(SessionWrapper, "request") def test_csr(self, mock_request): # mock a "signed" cert being returned by server From ddb551fc5c443f2911b3e66d5f4b87ee5c2318e4 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 30 Jun 2026 14:33:22 -0700 Subject: [PATCH 5/5] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ec2bb29..876b0e43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ List of the most important changes for each release. ## 0.8.14 - Adds utility for addressing immediate FK constraints caused by Django upgrade, automatically performed for morango models in a Django migration. +- Adds retry behavior for low-level connection issues not handled by `urllib3` retries +- Allows repeat pushes and pulls of buffers during transfer +- Ignores HTTP 404 errors during sync or transfer session closure, which may occur if they're already closed ## 0.8.13 - Removes multiprocessing fallback for RSA key generation to avoid leaving zombie processes; key generation now stays in-process