Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 193 additions & 95 deletions mpesakit/http_client/mpesa_async_http_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,98 @@
"""MpesaAsyncHttpClient: An asynchronous client for making HTTP requests to the M-Pesa API."""

from typing import Dict, Any, Optional
from urllib import response
import httpx
import logging

from mpesakit.errors import MpesaError, MpesaApiException
from .http_client import AsyncHttpClient
from urllib.parse import urljoin

from tenacity import (
AsyncRetrying,
RetryCallState,
before_sleep_log,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)

logger = logging.getLogger(__name__)


def handle_request_error(response: httpx.Response):
"""Handles non-successful HTTP responses.

This function is now responsible for converting HTTP status codes
and JSON parsing errors into MpesaApiException.
"""
if response.is_success:
return
try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response.text,
)
)
Comment on lines +38 to +45
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

git ls-files | grep -E "(mpesa|error)" | head -20

Repository: Byte-Barn/mpesakit

Length of output: 836


🏁 Script executed:

cat -n mpesakit/http_client/mpesa_async_http_client.py | wc -l

Repository: Byte-Barn/mpesakit

Length of output: 65


🏁 Script executed:

# First, let's look at the MpesaError class definition
rg -n "class MpesaError" --type py -A 15

Repository: Byte-Barn/mpesakit

Length of output: 1021


🏁 Script executed:

# Now let's examine the three error handling locations
cat -n mpesakit/http_client/mpesa_async_http_client.py | sed -n '30,50p'

Repository: Byte-Barn/mpesakit

Length of output: 869


🏁 Script executed:

# Check lines around 154-170 and 211-224
cat -n mpesakit/http_client/mpesa_async_http_client.py | sed -n '150,175p'

Repository: Byte-Barn/mpesakit

Length of output: 1150


🏁 Script executed:

cat -n mpesakit/http_client/mpesa_async_http_client.py | sed -n '205,230p'

Repository: Byte-Barn/mpesakit

Length of output: 1289


🏁 Script executed:

# Find where raw_response is used downstream
rg -n "raw_response" --type py -B 2 -A 2

Repository: Byte-Barn/mpesakit

Length of output: 5253


raw_response field receives inconsistent types across error paths.

handle_request_error passes raw_response=response_data (a dict), while the except blocks in post() and get() pass raw_response=getattr(response, "text", None) (a str or None). Any downstream code that reads MpesaError.raw_response will encounter different types depending on the error path, which will cause AttributeError/TypeError at runtime if the field is assumed to be one type.

Normalize to a consistent type (e.g., always str or always dict):

🛠 Proposed fix for `handle_request_error`
     raise MpesaApiException(
         MpesaError(
             error_code=f"HTTP_{response.status_code}",
             error_message=error_message,
             status_code=response.status_code,
-            raw_response=response_data,
+            raw_response=response.text,
         )
     )

Also applies to synchronous equivalents in mpesakit/http_client/mpesa_http_client.py.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response.text,
)
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mpesakit/http_client/mpesa_async_http_client.py` around lines 37 - 44, The
MpesaError.raw_response is receiving mixed types (dict from handle_request_error
vs str/None from post()/get()), so normalize it to a single type (choose string)
across code paths: in handle_request_error convert response_data (dict) to a
JSON string (or str()) before passing as raw_response to
MpesaError/MpesaApiException, and in post() and get() ensure the except blocks
pass raw_response as response.text or str(response_text) (or None) consistently;
make the same change in the synchronous client
(mpesakit/http_client/mpesa_http_client.py) and update any tests or callers
expecting a dict accordingly, referencing the functions/methods
handle_request_error, post, get, MpesaError and MpesaApiException.



def handle_retry_exception(retry_state: RetryCallState):
"""Custom hook to handle exceptions after all retries fail.

It raises a custom MpesaApiException with the appropriate error code.
"""
if retry_state.outcome:
exception = retry_state.outcome.exception()

if isinstance(exception, httpx.TimeoutException):
raise MpesaApiException(
MpesaError(error_code="REQUEST_TIMEOUT", error_message=str(exception))
) from exception
elif isinstance(exception, httpx.ConnectError):
raise MpesaApiException(
MpesaError(error_code="CONNECTION_ERROR", error_message="Failed to connect to M-Pesa API.")
) from exception

raise MpesaApiException(
MpesaError(error_code="REQUEST_FAILED", error_message=str(exception))
) from exception

raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message="An unknown retry error occurred.",
)
)


def retry_enabled(enabled: bool):
"""Factory function to conditionally enable retries.

Args:
enabled (bool): Whether to enable retry logic.

Returns:
A retry condition function.
"""
base_retry = retry_if_exception_type(
httpx.TimeoutException
) | retry_if_exception_type(httpx.ConnectError)

def _retry(retry_state):
if not enabled:
return False
return base_retry(retry_state)

return _retry


class MpesaAsyncHttpClient(AsyncHttpClient):
Expand All @@ -20,140 +108,150 @@ class MpesaAsyncHttpClient(AsyncHttpClient):
base_url: str
_client: httpx.AsyncClient

def __init__(self, env: str = "sandbox"):
def __init__(self, env: str = "sandbox", retry_enabled_flag: bool = True, wait_strategy=None, stop_strategy=None):
"""Initializes the MpesaAsyncHttpClient with the specified environment."""
self.base_url = self._resolve_base_url(env)
self._client = httpx.AsyncClient(base_url=self.base_url)

self._retry_enabled_flag = retry_enabled_flag
self._wait_strategy = wait_strategy or wait_random_exponential(multiplier=5, max=8)
self._stop_strategy = stop_strategy or stop_after_attempt(3)

def _resolve_base_url(self, env: str) -> str:
if env.lower() == "production":
return "https://api.safaricom.co.ke"
return "https://sandbox.safaricom.co.ke"

def _build_retrying(self):
return AsyncRetrying(
retry=retry_enabled(enabled=self._retry_enabled_flag),
wait=self._wait_strategy,
stop=self._stop_strategy,
retry_error_callback=handle_retry_exception,
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
Comment on lines +125 to +133
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find project dependency files to identify tenacity version
find . -type f \( -name "requirements.txt" -o -name "setup.py" -o -name "pyproject.toml" -o -name "setup.cfg" \) | head -20

Repository: Byte-Barn/mpesakit

Length of output: 78


🏁 Script executed:

# Read the specific file being reviewed
head -150 mpesakit/http_client/mpesa_async_http_client.py | tail -50

Repository: Byte-Barn/mpesakit

Length of output: 1975


🏁 Script executed:

# Get the full context of the _build_retrying method and surrounding code
sed -n '110,145p' mpesakit/http_client/mpesa_async_http_client.py

Repository: Byte-Barn/mpesakit

Length of output: 1545


🌐 Web query:

tenacity library reraise=True retry_error_callback precedence behavior

💡 Result:

In tenacity, when retries are exhausted (i.e., the stop condition is met), retry_error_callback takes precedence over reraise=True:

  • If retry_error_callback is set, Tenacity schedules only that callback and returns its value as the function result; it does not raise RetryError or re-raise the last exception (unless your callback raises). This happens because the stop-handling path checks retry_error_callback first and returns before reaching the reraise / RetryError logic. [1]
  • reraise=True only applies in the no-callback case: Tenacity constructs retry_error_cls(last_attempt) (default RetryError), and if reraise is true it calls retry_exc.reraise() to re-raise the last attempt’s exception; otherwise it raises RetryError. [1][2]

So the effective precedence is:

  1. retry_error_callback (if provided) ⇒ return callback value (or raise whatever the callback raises)
  2. else if reraise=Trueraise last attempt exception
  3. else ⇒ raise RetryError [1]

Sources: [1] [2]


🏁 Script executed:

# Find where handle_retry_exception is defined
rg "def handle_retry_exception" --type py -A 10

Repository: Byte-Barn/mpesakit

Length of output: 1988


🏁 Script executed:

# Check imports in the file to understand where handle_retry_exception comes from
head -30 mpesakit/http_client/mpesa_async_http_client.py

Repository: Byte-Barn/mpesakit

Length of output: 851


🏁 Script executed:

# Search for handle_retry_exception usage and definition across the codebase
rg "handle_retry_exception" --type py -B 2 -A 5

Repository: Byte-Barn/mpesakit

Length of output: 3175


Remove reraise=True—it is dead code that conflicts with the callback precedence.

Tenacity's retry_error_callback takes precedence over reraise=True. When a callback is provided, the reraise parameter is ignored and the callback's return value (or raised exception) is used instead. Your handle_retry_exception callback correctly maps httpx.TimeoutException and httpx.ConnectError to MpesaApiException, so the exception wrapping works as intended—but reraise=True never executes and merely creates confusion.

The synchronous counterpart in mpesa_http_client.py does not include this parameter. Remove reraise=True from _build_retrying() to match the sync version and clarify intent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mpesakit/http_client/mpesa_async_http_client.py` around lines 125 - 133, The
_build_retrying method currently passes reraise=True to AsyncRetrying which is
dead code when retry_error_callback is provided; remove the reraise=True
argument from the AsyncRetrying call in mpesa_async_http_client.py so behavior
matches the sync version (mpesa_http_client.py) and relies on
handle_retry_exception to map/raise MpesaApiException; keep all other parameters
(retry, wait, stop, retry_error_callback=handle_retry_exception, before_sleep)
unchanged.


async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._client.aclose()

async def async_raw_post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str], timeout: int = 10
) -> httpx.Response:
"""Low-level POST request - may raise httpx exceptions."""
full_url = urljoin(self.base_url, url)

async def post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str]
) -> Dict[str, Any]:
"""Sends an asynchronous POST request to the M-Pesa API."""
try:
async for attempt in self._build_retrying():
with attempt:
response = await self._client.post(
full_url, json=json, headers=headers, timeout=timeout
)
break

if response is None:
raise RuntimeError("Retry loop exited without returning a response or raising an exception.")

return response

Comment on lines +137 to +154
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Pipeline failure: Mypy [return] — missing return statement in async_raw_post.

After the async for loop there is no explicit return or raise, so Mypy (correctly) flags this as a missing return path. At runtime tenacity's retry_error_callback always raises before the loop can exhaust silently, but Mypy can't infer that. Add an unreachable sentinel raise after the loop to satisfy the type checker and document the invariant.

🛠 Proposed fix
     async def async_raw_post(
         self, url: str, json: Dict[str, Any], headers: Dict[str, str], timeout: int = 10
     ) -> httpx.Response:
         """Low-level POST request - may raise httpx exceptions."""
         full_url = urljoin(self.base_url, url)
 
         async for attempt in self._build_retrying():
             with attempt:
                 return await self._client.post(
                     full_url, json=json, headers=headers, timeout=timeout
                 )
+
+        raise RuntimeError("async_raw_post: unreachable – retry_error_callback should have raised")
🧰 Tools
🪛 GitHub Actions: Code Quality

[error] 136-136: Mypy: Missing return statement. [return]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mpesakit/http_client/mpesa_async_http_client.py` around lines 136 - 147, The
async_raw_post function's async for loop using self._build_retrying() can
conceptually fall through without returning, causing Mypy to flag a missing
return; after the loop add an explicit unreachable sentinel (e.g., raise
RuntimeError("unreachable: retry exhausted") or AssertionError) to satisfy the
type checker and document the invariant, referencing async_raw_post and the
retry iterator returned by _build_retrying to indicate why the raise is safe.

response = await self._client.post(
url, json=json, headers=headers, timeout=10
)


try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.is_success:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)
async def post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str]
) -> Dict[str, Any]:
"""Sends a POST request to the M-Pesa API.

return response_data
Args:
url (str): The URL path for the request.
json (Dict[str, Any]): The JSON payload for the request body.
headers (Dict[str, str]): The HTTP headers for the request.
timeout (int): The timeout for the request in seconds.

except httpx.TimeoutException:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
)
)
except httpx.ConnectError:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
)
Returns:
Dict[str, Any]: The JSON response from the API.
"""
response: httpx.Response | None = None
try:
response = await self.async_raw_post(
url, json=json, headers=headers, timeout=10
)
except httpx.HTTPError as e:
handle_request_error(response)
return response.json()

except (httpx.RequestError, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message="HTTP request failed.",
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
)
) from e
Comment on lines +179 to +187
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

REQUEST_TIMEOUT/CONNECTION_ERROR codes are silently downgraded to REQUEST_FAILED when retries are disabled.

When retry_enabled_flag=False, retry_enabled() returns False for every failed attempt. Tenacity's return_last_value path re-raises the original httpx.TimeoutException/httpx.ConnectError directly — handle_retry_exception is only invoked when the stop condition triggers, not when retry returns False. The except (httpx.RequestError, ValueError) block then catches the raw exception and maps it uniformly to REQUEST_FAILED, losing the more specific codes.

The same issue affects get() (lines 237–245). Consider mapping the exception types explicitly in the except handler to preserve specificity:

🛠 Proposed fix
-        except (httpx.RequestError, ValueError) as e:
+        except httpx.TimeoutException as e:
             raise MpesaApiException(
                 MpesaError(
-                    error_code="REQUEST_FAILED",
-                    error_message="HTTP request failed.",
+                    error_code="REQUEST_TIMEOUT",
+                    error_message=str(e),
                     status_code=getattr(response, "status_code", None),
                     raw_response=getattr(response, "text", None),
                 )
             ) from e
+        except httpx.ConnectError as e:
+            raise MpesaApiException(
+                MpesaError(
+                    error_code="CONNECTION_ERROR",
+                    error_message="Failed to connect to M-Pesa API.",
+                    status_code=getattr(response, "status_code", None),
+                    raw_response=getattr(response, "text", None),
+                )
+            ) from e
+        except (httpx.RequestError, ValueError) as e:
+            raise MpesaApiException(
+                MpesaError(
+                    error_code="REQUEST_FAILED",
+                    error_message="HTTP request failed.",
+                    status_code=getattr(response, "status_code", None),
+                    raw_response=getattr(response, "text", None),
+                )
+            ) from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mpesakit/http_client/mpesa_async_http_client.py` around lines 179 - 187, The
generic except block that maps all httpx.RequestError/ValueError to
REQUEST_FAILED must be changed to preserve specific error codes when timeouts or
connection errors occur: in the except (httpx.RequestError, ValueError) as e
handler (used by the request/send path and in get()), inspect the exception type
(isinstance(e, httpx.TimeoutException) -> use error_code "REQUEST_TIMEOUT";
isinstance(e, httpx.ConnectError) -> "CONNECTION_ERROR"; else ->
"REQUEST_FAILED"), construct the MpesaError with the appropriate error_message
and include status_code/text from getattr(response, ...) if present, then raise
MpesaApiException from e; ensure this change is applied to both get() and the
other request-handling block so retry_disabled paths keep specific codes and
preserve exception chaining and existing use of MpesaError/MpesaApiException.


async def get(

async def async_raw_get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Sends an asynchronous GET request to the M-Pesa API."""
try:
if headers is None:
headers = {}

response = await self._client.get(
url, params=params, headers=headers, timeout=10
)
timeout: int = 10,
) -> httpx.Response:
"""Low-level GET request - may raise httpx exceptions."""
if headers is None:
headers = {}
full_url = urljoin(self.base_url, url)

try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.is_success:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
async for attempt in self._build_retrying():
with attempt:
response = await self._client.get(
full_url, params=params, headers=headers, timeout=timeout
)

if response is None:
raise RuntimeError("Retry loop exited without returning a response or raising an exception.")

return response

return response_data


Comment thread
coderabbitai[bot] marked this conversation as resolved.
except httpx.TimeoutException:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
)
)
except httpx.ConnectError:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
)
)
except httpx.HTTPError as e:
async def get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Sends a GET request to the M-Pesa API.

Args:
url (str): The URL path for the request.
params (Optional[Dict[str, Any]]): The URL parameters.
headers (Optional[Dict[str, str]]): The HTTP headers.
timeout (int): The timeout for the request in seconds.

Returns:
Dict[str, Any]: The JSON response from the API.
"""
response: httpx.Response | None = None
try:
response = await self.async_raw_get(url, params, headers, timeout = 10)
handle_request_error(response)
return response.json()
except (httpx.RequestError, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message="HTTP request failed.",
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
)
) from e

async def aclose(self):
"""Manually close the underlying httpx client connection pool."""
await self._client.aclose()

async def __aenter__(self) -> "MpesaAsyncHttpClient":
"""Context manager entry point."""
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit point. Closes the client."""
await self._client.aclose()
4 changes: 2 additions & 2 deletions mpesakit/mpesa_express/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class StkPushSimulateRequest(BaseModel):
json_schema_extra={
"example": {
"BusinessShortCode": 654321,
"Password": "bXlwYXNzd29yZA==",
"Password": "bXlwYXNzd29yZA==", # nosec B105
"Timestamp": "20240607123045",
"TransactionType": "CustomerPayBillOnline",
"Amount": 10,
Expand Down Expand Up @@ -499,7 +499,7 @@ class StkPushQueryRequest(BaseModel):
json_schema_extra={
"example": {
"BusinessShortCode": 654321,
"Password": "bXlwYXNzd29yZA==",
"Password": "bXlwYXNzd29yZA==", # nosec B105
"Timestamp": "20240607123045",
"CheckoutRequestID": "ws_CO_DMZ_123212312_2342347678234",
}
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/http_client/test_mpesa_async_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def test_post_success(async_client):

assert result == {"foo": "bar"}
mock_post.assert_called_once()
mock_post.assert_called_with("/test", json={"a": 1}, headers={"h": "v"}, timeout=10)
assert mock_post.call_args[0][0] == "https://sandbox.safaricom.co.ke/test"


@pytest.mark.asyncio
Expand Down Expand Up @@ -126,7 +126,7 @@ async def test_post_generic_httpx_error(async_client):
await async_client.post("/error", json={}, headers={})

assert exc.value.error.error_code == "REQUEST_FAILED"
assert "protocol error" in exc.value.error.error_message
assert "HTTP request failed" in exc.value.error.error_message


@pytest.mark.asyncio
Expand All @@ -141,7 +141,7 @@ async def test_get_success(async_client):

assert result == {"foo": "bar"}
mock_get.assert_called_once()
mock_get.assert_called_with("/test", params={"a": 1}, headers={"h": "v"}, timeout=10)
assert mock_get.call_args[0][0] == "https://sandbox.safaricom.co.ke/test"


@pytest.mark.asyncio
Expand Down Expand Up @@ -171,7 +171,7 @@ async def test_get_timeout(async_client):
await async_client.get("/timeout")

assert exc.value.error.error_code == "REQUEST_TIMEOUT"
assert "timed out" in exc.value.error.error_message
assert "Test Timeout" in exc.value.error.error_message


@pytest.mark.asyncio
Expand Down
Loading