Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/knowhere/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ConflictError,
GatewayTimeoutError,
InternalServerError,
InvalidStateError,
JobFailedError,
KnowhereError,
NotFoundError,
Expand All @@ -30,6 +31,7 @@
PollingTimeoutError,
RateLimitError,
ServiceUnavailableError,
ValidationError,
)
from knowhere._types import PollProgressCallback, UploadProgressCallback
from knowhere._version import __version__
Expand Down Expand Up @@ -58,6 +60,8 @@
"__version__",
# Exceptions
"KnowhereError",
"ValidationError",
"InvalidStateError",
"APIConnectionError",
"APITimeoutError",
"APIStatusError",
Expand Down
127 changes: 96 additions & 31 deletions src/knowhere/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from knowhere._exceptions import (
APIConnectionError,
APITimeoutError,
ValidationError,
makeStatusError,
)
from knowhere._logging import getLogger, redactSensitiveHeaders
Expand All @@ -35,17 +36,23 @@

_logger = getLogger()

# Error codes that are safe to retry
_RETRYABLE_ERROR_CODES: frozenset[str] = frozenset({
"rate_limit_exceeded",
"service_unavailable",
"gateway_timeout",
"internal_server_error",
"timeout",
# Error codes that are always safe to retry (matches server ALWAYS_RETRYABLE_ERROR_CODES)
_ALWAYS_RETRYABLE_ERROR_CODES: frozenset[str] = frozenset({
"ABORTED", # 409 - Concurrency conflict
"UNAVAILABLE", # 503 - Service temporarily down
"DEADLINE_EXCEEDED", # 504 - Timeout
})

# Status codes that are safe to retry
_RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({408, 429, 500, 502, 503, 504})
# RESOURCE_EXHAUSTED (429) is conditionally retryable:
# - Rate limit: details.retry_after present → RETRY
# - Quota exceeded: no retry_after → DO NOT RETRY
_CONDITIONALLY_RETRYABLE_ERROR_CODE: str = "RESOURCE_EXHAUSTED"

# HTTP status codes that are always safe to retry
_ALWAYS_RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({409, 502, 503, 504})

# HTTP status code that is conditionally retryable (only with retry_after)
_CONDITIONALLY_RETRYABLE_STATUS_CODE: int = 429


class BaseClient:
Expand All @@ -71,7 +78,7 @@ def __init__(
# Resolve: arg > env > default
resolved_key: Optional[str] = api_key or os.environ.get(ENV_API_KEY)
if not resolved_key:
raise ValueError(
raise ValidationError(
"An API key must be provided via the 'api_key' argument "
f"or the {ENV_API_KEY} environment variable."
)
Expand Down Expand Up @@ -122,12 +129,68 @@ def _shouldRetry(
self,
status_code: int,
error_code: Optional[str] = None,
details: Optional[Any] = None,
details: Optional[Dict[str, Any]] = None,
) -> bool:
"""Decide whether a request should be retried."""
if error_code and error_code in _RETRYABLE_ERROR_CODES:
"""Decide whether a request should be retried.

Follows server-side retry semantics:
- ABORTED, UNAVAILABLE, DEADLINE_EXCEEDED → always retry
- RESOURCE_EXHAUSTED (429) → retry only if details.retry_after present
- All other errors → never retry
"""
if error_code:
if error_code in _ALWAYS_RETRYABLE_ERROR_CODES:
return True
if error_code == _CONDITIONALLY_RETRYABLE_ERROR_CODE:
return self._hasRetryAfter(details)
return False

# Fallback to status code when error_code is unavailable
if status_code in _ALWAYS_RETRYABLE_STATUS_CODES:
return True
return status_code in _RETRYABLE_STATUS_CODES
if status_code == _CONDITIONALLY_RETRYABLE_STATUS_CODE:
return self._hasRetryAfter(details)
return False

@staticmethod
def _hasRetryAfter(details: Optional[Dict[str, Any]]) -> bool:
"""Check if details contains a retry_after hint."""
if not isinstance(details, dict):
return False
retry_after: Any = details.get("retry_after")
return retry_after is not None

@staticmethod
def _extractRetryAfter(
error_body: Optional[Dict[str, Any]],
response: httpx.Response,
) -> Optional[float]:
"""Extract retry_after from the response body or Retry-After header.

The server puts retry_after in ``error.details.retry_after``.
Falls back to the HTTP ``Retry-After`` header.
"""
# Prefer body: error.details.retry_after
if isinstance(error_body, dict):
err_obj: Any = error_body.get("error", error_body)
if isinstance(err_obj, dict):
details: Any = err_obj.get("details")
if isinstance(details, dict):
raw: Any = details.get("retry_after")
if raw is not None:
try:
return float(raw)
except (ValueError, TypeError):
pass

# Fallback: HTTP Retry-After header
header_raw: Optional[str] = response.headers.get("retry-after")
if header_raw is not None:
try:
return float(header_raw)
except (ValueError, TypeError):
pass
return None

def _calculateRetryDelay(
self,
Expand Down Expand Up @@ -257,24 +320,24 @@ def _request(
response
)
error_code: Optional[str] = None
error_details: Optional[Dict[str, Any]] = None
if isinstance(error_body, dict):
err_obj: Any = error_body.get("error", error_body)
if isinstance(err_obj, dict):
error_code = err_obj.get("code")
raw_details: Any = err_obj.get("details")
if isinstance(raw_details, dict):
error_details = raw_details

if (
attempt < self.max_retries
and self._shouldRetry(response.status_code, error_code)
and self._shouldRetry(
response.status_code, error_code, error_details
)
):
retry_after_raw: Optional[str] = response.headers.get(
"retry-after"
retry_after_val: Optional[float] = self._extractRetryAfter(
error_body, response
)
retry_after_val: Optional[float] = None
if retry_after_raw:
try:
retry_after_val = float(retry_after_raw)
except (ValueError, TypeError):
pass
delay = self._calculateRetryDelay(attempt, retry_after_val)
_logger.warning(
"Retryable error %d on attempt %d/%d, retrying in %.1fs",
Expand Down Expand Up @@ -404,22 +467,24 @@ async def _request(

error_body: Optional[Dict[str, Any]] = self._parseErrorResponse(response)
error_code: Optional[str] = None
error_details: Optional[Dict[str, Any]] = None
if isinstance(error_body, dict):
err_obj: Any = error_body.get("error", error_body)
if isinstance(err_obj, dict):
error_code = err_obj.get("code")
raw_details: Any = err_obj.get("details")
if isinstance(raw_details, dict):
error_details = raw_details

if (
attempt < self.max_retries
and self._shouldRetry(response.status_code, error_code)
and self._shouldRetry(
response.status_code, error_code, error_details
)
):
retry_after_raw: Optional[str] = response.headers.get("retry-after")
retry_after_val: Optional[float] = None
if retry_after_raw:
try:
retry_after_val = float(retry_after_raw)
except (ValueError, TypeError):
pass
retry_after_val: Optional[float] = self._extractRetryAfter(
error_body, response
)
delay = self._calculateRetryDelay(attempt, retry_after_val)
_logger.warning(
"Retryable error %d on attempt %d/%d, retrying in %.1fs",
Expand Down
9 changes: 5 additions & 4 deletions src/knowhere/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from knowhere._base_client import AsyncAPIClient, SyncAPIClient
from knowhere._constants import DEFAULT_POLL_INTERVAL, DEFAULT_POLL_TIMEOUT
from knowhere._exceptions import ValidationError
from knowhere._logging import getLogger
from knowhere._types import (
PollProgressCallback,
Expand Down Expand Up @@ -94,9 +95,9 @@ def parse(
Provide exactly one of *url* or *file*.
"""
if url and file:
raise ValueError("Provide either 'url' or 'file', not both.")
raise ValidationError("Provide either 'url' or 'file', not both.")
if not url and file is None:
raise ValueError("Provide either 'url' or 'file'.")
raise ValidationError("Provide either 'url' or 'file'.")

# Determine source type and create job
if url:
Expand Down Expand Up @@ -196,9 +197,9 @@ async def parse(
) -> ParseResult:
"""Parse a document end-to-end (async version)."""
if url and file:
raise ValueError("Provide either 'url' or 'file', not both.")
raise ValidationError("Provide either 'url' or 'file', not both.")
if not url and file is None:
raise ValueError("Provide either 'url' or 'file'.")
raise ValidationError("Provide either 'url' or 'file'.")

if url:
job: Job = await self.jobs.create(
Expand Down
1 change: 1 addition & 0 deletions src/knowhere/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# Retry configuration
DEFAULT_MAX_RETRIES: int = 5
DEFAULT_UPLOAD_MAX_RETRIES: int = 2

# Polling configuration
MAX_POLL_INTERVAL: float = 30.0
Expand Down
Loading