Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions plugin/framework/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,32 @@ def safe_json_loads(text: Any, default: Any = None) -> Any:
return default


def handle_network_error(e, context="network_operation"):
"""Handle network errors with appropriate logging and user messaging."""
import logging
log = logging.getLogger(__name__)

error_payload = format_error_payload(e)

if isinstance(e, NetworkError):
log.error(f"Network error [{context}]: {e.message}",
extra={"network_error": error_payload})
else:
# Wrap non-network exceptions
wrapped = NetworkError(
f"Network-related error in {context}",
code="NETWORK_WRAPPED_ERROR",
details={
"original_error": str(e),
"type": type(e).__name__,
"context": context
}
)
log.error(f"Wrapped network error [{context}]: {wrapped.message}")
error_payload = format_error_payload(wrapped)

return error_payload

def safe_python_literal_eval(text: Any, default: Any = None) -> Any:
"""Safely parse a Python-style literal (e.g. from an LLM) without using ast.literal_eval.
Supports scalars (bool, None, number, string) and simple JSON-compatible lists/dicts.
Expand Down
70 changes: 70 additions & 0 deletions plugin/framework/retry_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import time
import random
from functools import wraps
from typing import Callable, TypeVar, Any

from plugin.framework.errors import NetworkError

T = TypeVar('T')

def retry_with_backoff(
max_attempts: int = 3,
base_delay: float = 0.1,
max_delay: float = 2.0,
retry_exceptions: tuple = (ConnectionError, TimeoutError, OSError),
logger=None
) -> Callable:
"""Retry decorator with exponential backoff.

Args:
max_attempts: Maximum number of retry attempts
base_delay: Base delay in seconds
max_delay: Maximum delay in seconds
retry_exceptions: Tuple of exceptions to retry on
logger: Optional logger for retry logging
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
attempt = 1
last_exception = None

while attempt <= max_attempts:
try:
return func(*args, **kwargs)
except retry_exceptions as e:
last_exception = e
if attempt == max_attempts:
break

# Exponential backoff with jitter
delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
delay = delay * random.uniform(0.5, 1.5) # Add jitter

if logger:
logger.warning(
f"Attempt {attempt} failed: {str(e)}. "
f"Retrying in {delay:.2f}s..."
)

time.sleep(delay)
attempt += 1

# If all attempts failed, raise the last exception
if last_exception:
raise NetworkError(
f"Operation failed after {max_attempts} attempts",
code="NETWORK_RETRY_FAILED",
details={
"attempts": max_attempts,
"last_error": str(last_exception),
"type": type(last_exception).__name__
}
) from last_exception

# Shouldn't reach here
raise NetworkError("Unexpected retry failure")

return wrapper

return decorator
107 changes: 51 additions & 56 deletions plugin/modules/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from plugin.framework.auth import resolve_auth_for_config, build_auth_headers, AuthError
from plugin.framework.errors import NetworkError, safe_json_loads
from plugin.framework.utils import get_url_hostname, get_url_path_and_query
from plugin.framework.retry_decorator import retry_with_backoff

from plugin.modules.http.errors import format_error_message, _format_http_error_response
from plugin.modules.http.ssl_helpers import (
Expand Down Expand Up @@ -449,6 +450,13 @@ def stream_completion(
stop_checker=stop_checker,
)

@retry_with_backoff(
max_attempts=3,
base_delay=0.2,
max_delay=3.0,
retry_exceptions=(ConnectionError, TimeoutError, OSError, http.client.HTTPException),
logger=log
)
def _run_streaming_loop(
self,
method,
Expand All @@ -459,7 +467,6 @@ def _run_streaming_loop(
on_thinking=None,
on_delta=None,
stop_checker=None,
_retry=True,
):
"""Common low-level streaming engine."""
init_logging(self.ctx)
Expand Down Expand Up @@ -575,29 +582,13 @@ def _run_streaming_loop(
if stop_checker and stop_checker():
log.error("Connection error during stop; exiting streaming loop")
return "stop"
if self._enable_local_ssl_fallback(e):
return self._run_streaming_loop(
method, path, body, headers,
on_content=on_content,
on_thinking=on_thinking,
on_delta=on_delta,
stop_checker=stop_checker,
_retry=False,
)

err_msg = format_error_message(e)
if _retry:
log.warning("Retrying streaming request once on fresh connection")
return self._run_streaming_loop(
method, path, body, headers,
on_content=on_content,
on_thinking=on_thinking,
on_delta=on_delta,
stop_checker=stop_checker,
_retry=False,
)
log.error("Connection retry failed: %s" % err_msg)
raise NetworkError(err_msg, code="CONNECTION_ERROR", context={"url": path}) from e
# Check for local SSL fallback; state changes internally
self._enable_local_ssl_fallback(e)

# Re-raise so the @retry_with_backoff decorator catches it
raise e

except NetworkError:
self._close_connection()
raise
Expand Down Expand Up @@ -649,6 +640,35 @@ def stream_chat_response(
stop_checker=stop_checker,
)

@retry_with_backoff(
max_attempts=3,
base_delay=0.2,
max_delay=3.0,
retry_exceptions=(ConnectionError, TimeoutError, OSError, http.client.HTTPException),
logger=log
)
def _execute_sync_request(self, method, path, body, headers):
"""Execute a synchronous request with retry logic."""
try:
conn = self._get_connection()
conn.request(method, path, body=body, headers=headers)
response = conn.getresponse()
if response.status != 200:
err_body = response.read().decode("utf-8", errors="replace")
log.error("API Error %d: %s" % (response.status, err_body))
self._close_connection()
raise NetworkError(
_format_http_error_response(response.status, response.reason, err_body),
code="HTTP_ERROR",
context={"url": path, "status": response.status}
)
return json.loads(response.read().decode("utf-8"))
except (http.client.HTTPException, socket.error, OSError) as e:
log.error("Connection error in sync request, closing: %s" % e)
self._close_connection()
self._enable_local_ssl_fallback(e)
raise e

def request_with_tools(
self,
messages,
Expand Down Expand Up @@ -711,39 +731,14 @@ def request_with_tools(
usage = message_snapshot.get("usage", {})
else:
# Sync path
result = None
for attempt in (0, 1):
try:
conn = self._get_connection()
conn.request(method, path, body=body, headers=headers)
response = conn.getresponse()
if response.status != 200:
err_body = response.read().decode("utf-8", errors="replace")
log.error("API Error %d: %s" % (response.status, err_body))
self._close_connection()
raise NetworkError(
_format_http_error_response(response.status, response.reason, err_body),
code="HTTP_ERROR",
context={"url": path, "status": response.status}
)
result = json.loads(response.read().decode("utf-8"))
break
except (http.client.HTTPException, socket.error, OSError) as e:
log.error("Connection error, closing: %s" % e)
self._close_connection()
if self._enable_local_ssl_fallback(e):
continue
if attempt == 0:
log.warning("Retrying request_with_tools once on fresh connection")
continue
log.error("Connection retry failed: %s" % format_error_message(e))
raise NetworkError(format_error_message(e), code="CONNECTION_ERROR", context={"url": path}) from e
except NetworkError:
raise
except Exception as e:
err_msg = format_error_message(e)
log.error("request_with_tools ERROR: %s -> %s" % (type(e).__name__, err_msg))
raise NetworkError(err_msg, context={"url": path}) from e
try:
result = self._execute_sync_request(method, path, body, headers)
except NetworkError:
raise
except Exception as e:
err_msg = format_error_message(e)
log.error("request_with_tools ERROR: %s -> %s" % (type(e).__name__, err_msg))
raise NetworkError(err_msg, context={"url": path}) from e

log.debug("=== Sync response: %s" % json.dumps(result, indent=2))

Expand Down
8 changes: 8 additions & 0 deletions plugin/modules/http/mcp_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from plugin.framework.main_thread import execute_on_main_thread
from plugin.framework.errors import WriterAgentException, safe_json_loads
from plugin.framework.retry_decorator import retry_with_backoff
from plugin.modules.http.mcp_state import (
MCPState, MCPStateStr, EventKind, MCPEvent,
ParseRequestEffect, ResolveDocumentEffect,
Expand Down Expand Up @@ -531,6 +532,13 @@ def _process_jsonrpc(self, msg, document_url=None):

# ── Backpressure execution ───────────────────────────────────────

@retry_with_backoff(
max_attempts=3,
base_delay=0.1,
max_delay=1.0,
retry_exceptions=(BusyError, TimeoutError),
logger=log
)
def _execute_with_backpressure(self, tool_name, arguments, document_url=None):
"""Execute a tool on the VCL main thread with backpressure."""
acquired = _tool_semaphore.acquire(timeout=_WAIT_TIMEOUT)
Expand Down
99 changes: 99 additions & 0 deletions plugin/tests/test_network_resilience.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import pytest
import time
from unittest.mock import MagicMock, patch

from plugin.framework.errors import NetworkError, format_error_payload
from plugin.framework.retry_decorator import retry_with_backoff
from plugin.framework.errors import handle_network_error
from plugin.modules.http.mcp_protocol import BusyError

def test_retry_success_on_first_try():
mock_func = MagicMock(return_value="success")

@retry_with_backoff(max_attempts=3, base_delay=0.01)
def test_func():
return mock_func()

result = test_func()
assert result == "success"
assert mock_func.call_count == 1

def test_retry_success_after_failure():
mock_func = MagicMock(side_effect=[ConnectionError("fail"), "success"])

@retry_with_backoff(max_attempts=3, base_delay=0.01)
def test_func():
return mock_func()

result = test_func()
assert result == "success"
assert mock_func.call_count == 2

def test_retry_exhaustion_raises_network_error():
mock_func = MagicMock(side_effect=ConnectionError("fail"))

@retry_with_backoff(max_attempts=3, base_delay=0.01)
def test_func():
return mock_func()

with pytest.raises(NetworkError) as excinfo:
test_func()

assert "Operation failed after 3 attempts" in str(excinfo.value)
assert excinfo.value.code == "NETWORK_RETRY_FAILED"
assert mock_func.call_count == 3

def test_retry_ignores_unspecified_exceptions():
mock_func = MagicMock(side_effect=ValueError("fail"))

@retry_with_backoff(max_attempts=3, base_delay=0.01, retry_exceptions=(ConnectionError,))
def test_func():
return mock_func()

with pytest.raises(ValueError) as excinfo:
test_func()

assert mock_func.call_count == 1

@patch("time.sleep")
def test_retry_backoff_timing(mock_sleep):
mock_func = MagicMock(side_effect=ConnectionError("fail"))

@retry_with_backoff(max_attempts=3, base_delay=0.1, max_delay=1.0)
def test_func():
return mock_func()

with pytest.raises(NetworkError):
test_func()

assert mock_sleep.call_count == 2
# First sleep base_delay * (2^0) = 0.1 * jitter
# Second sleep base_delay * (2^1) = 0.2 * jitter
# Just check sleep was called

def test_handle_network_error_with_network_error():
err = NetworkError("Test error", code="TEST_CODE")
payload = handle_network_error(err, "test_context")

assert payload["status"] == "error"
assert payload["code"] == "TEST_CODE"
assert payload["message"] == "Test error"

def test_handle_network_error_with_other_exception():
err = ValueError("Test error")
payload = handle_network_error(err, "test_context")

assert payload["status"] == "error"
assert payload["code"] == "NETWORK_WRAPPED_ERROR"
assert "Network-related error in test_context" in payload["message"]
assert payload["details"]["type"] == "ValueError"

def test_mcp_busy_error_retry():
mock_func = MagicMock(side_effect=[BusyError("busy"), "success"])

@retry_with_backoff(max_attempts=3, base_delay=0.01, retry_exceptions=(BusyError,))
def execute():
return mock_func()

assert execute() == "success"
assert mock_func.call_count == 2