Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions agentops/client/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ def __call__(self, api_key: str) -> str:

class BaseApiClient:
"""
Base class for API communication with connection pooling.
Base class for API communication with async HTTP methods.

This class provides the core HTTP functionality without authentication.
It should be used for APIs that don't require authentication.
All HTTP methods are asynchronous.
"""

def __init__(self, endpoint: str):
Expand Down Expand Up @@ -72,16 +72,16 @@ def _get_full_url(self, path: str) -> str:
"""
return f"{self.endpoint}{path}"

def request(
async def async_request(
self,
method: str,
path: str,
data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 30,
) -> requests.Response:
) -> Optional[Dict[str, Any]]:
"""
Make a generic HTTP request
Make a generic async HTTP request

Args:
method: HTTP method (e.g., 'get', 'post', 'put', 'delete')
Expand All @@ -91,72 +91,71 @@ def request(
timeout: Request timeout in seconds

Returns:
Response from the API
JSON response as dictionary, or None if request failed

Raises:
Exception: If the request fails
"""
url = self._get_full_url(path)

try:
response = self.http_client.request(method=method, url=url, data=data, headers=headers, timeout=timeout)

self.last_response = response
return response
except requests.RequestException as e:
self.last_response = None
response_data = await self.http_client.async_request(
method=method, url=url, data=data, headers=headers, timeout=timeout
)
return response_data
except Exception as e:
raise Exception(f"{method.upper()} request failed: {str(e)}") from e

def post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
async def post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Make POST request
Make async POST request

Args:
path: API endpoint path
data: Request payload
headers: Request headers

Returns:
Response from the API
JSON response as dictionary, or None if request failed
"""
return self.request("post", path, data=data, headers=headers)
return await self.async_request("post", path, data=data, headers=headers)

def get(self, path: str, headers: Dict[str, str]) -> requests.Response:
async def get(self, path: str, headers: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Make GET request
Make async GET request

Args:
path: API endpoint path
headers: Request headers

Returns:
Response from the API
JSON response as dictionary, or None if request failed
"""
return self.request("get", path, headers=headers)
return await self.async_request("get", path, headers=headers)

def put(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
async def put(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Make PUT request
Make async PUT request

Args:
path: API endpoint path
data: Request payload
headers: Request headers

Returns:
Response from the API
JSON response as dictionary, or None if request failed
"""
return self.request("put", path, data=data, headers=headers)
return await self.async_request("put", path, data=data, headers=headers)

def delete(self, path: str, headers: Dict[str, str]) -> requests.Response:
async def delete(self, path: str, headers: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""
Make DELETE request
Make async DELETE request

Args:
path: API endpoint path
headers: Request headers

Returns:
Response from the API
JSON response as dictionary, or None if request failed
"""
return self.request("delete", path, headers=headers)
return await self.async_request("delete", path, headers=headers)
54 changes: 29 additions & 25 deletions agentops/client/api/versions/v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from agentops.client.api.base import BaseApiClient
from agentops.client.api.types import AuthTokenResponse
from agentops.exceptions import ApiServerException
from agentops.client.http.http_client import HttpClient
from agentops.logging import logger
from termcolor import colored

Expand All @@ -24,42 +24,46 @@ def __init__(self, endpoint: str):
# Set up with V3-specific auth endpoint
super().__init__(endpoint)

def fetch_auth_token(self, api_key: str) -> AuthTokenResponse:
path = "/v3/auth/token"
data = {"api_key": api_key}
headers = self.prepare_headers()

r = self.post(path, data, headers)
async def fetch_auth_token(self, api_key: str) -> AuthTokenResponse:
"""
Asynchronously fetch authentication token.

if r.status_code != 200:
error_msg = f"Authentication failed: {r.status_code}"
try:
error_data = r.json()
if "error" in error_data:
error_msg = f"{error_data['error']}"
except Exception:
pass
logger.error(f"{error_msg} - Perhaps an invalid API key?")
raise ApiServerException(error_msg)
Args:
api_key: The API key to authenticate with

Returns:
AuthTokenResponse containing token and project information, or None if failed
"""
try:
jr = r.json()
token = jr.get("token")
path = "/v3/auth/token"
data = {"api_key": api_key}
headers = self.prepare_headers()

# Build full URL
url = self._get_full_url(path)

# Make async request
response_data = await HttpClient.async_request(
method="POST", url=url, data=data, headers=headers, timeout=30
)

token = response_data.get("token")
if not token:
raise ApiServerException("No token in authentication response")
logger.warning("Authentication failed: Perhaps an invalid API key?")
return None

# Check project premium status
if jr.get("project_prem_status") != "pro":
if response_data.get("project_prem_status") != "pro":
logger.info(
colored(
"\x1b[34mYou're on the agentops free plan 🤔\x1b[0m",
"blue",
)
)

return jr
except Exception as e:
logger.error(f"Failed to process authentication response: {str(e)}")
raise ApiServerException(f"Failed to process authentication response: {str(e)}")
return response_data

except Exception:
return None

# Add V3-specific API methods here
133 changes: 87 additions & 46 deletions agentops/client/api/versions/v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
This module provides the client for the V4 version of the AgentOps API.
"""

from typing import Optional, Union, Dict
from typing import Optional, Union, Dict, Any

import requests
from agentops.client.api.base import BaseApiClient
from agentops.client.http.http_client import HttpClient
from agentops.exceptions import ApiServerException
from agentops.client.api.types import UploadedObjectResponse
from agentops.helpers.version import get_agentops_version


class V4Client(BaseApiClient):
"""Client for the AgentOps V4 API"""

auth_token: str
def __init__(self, endpoint: str):
"""Initialize the V4 API client."""
super().__init__(endpoint)
self.auth_token: Optional[str] = None

def set_auth_token(self, token: str):
"""
Expand All @@ -36,69 +40,106 @@ def prepare_headers(self, custom_headers: Optional[Dict[str, str]] = None) -> Di
Headers dictionary with standard headers and any custom headers
"""
headers = {
"Authorization": f"Bearer {self.auth_token}",
"User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}",
}

# Only add Authorization header if we have a token
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"

if custom_headers:
headers.update(custom_headers)
return headers

def upload_object(self, body: Union[str, bytes]) -> UploadedObjectResponse:
def post(self, path: str, body: Union[str, bytes], headers: Optional[Dict[str, str]] = None) -> requests.Response:
"""
Upload an object to the API and return the response.
Make a POST request to the V4 API.

Args:
body: The object to upload, either as a string or bytes.
path: The API path to POST to
body: The request body (string or bytes)
headers: Optional headers to include

Returns:
UploadedObjectResponse: The response from the API after upload.
The response object
"""
if isinstance(body, bytes):
body = body.decode("utf-8")
url = self._get_full_url(path)
request_headers = headers or self.prepare_headers()

response = self.post("/v4/objects/upload/", body, self.prepare_headers())
return HttpClient.get_session().post(url, json={"body": body}, headers=request_headers, timeout=30)

if response.status_code != 200:
error_msg = f"Upload failed: {response.status_code}"
try:
error_data = response.json()
if "error" in error_data:
error_msg = error_data["error"]
except Exception:
pass
raise ApiServerException(error_msg)
def upload_object(self, body: Union[str, bytes]) -> Dict[str, Any]:
"""
Upload an object to the V4 API.

Args:
body: The object body to upload

Returns:
Dictionary containing upload response data

Raises:
ApiServerException: If the upload fails
"""
try:
response_data = response.json()
return UploadedObjectResponse(**response_data)
except Exception as e:
raise ApiServerException(f"Failed to process upload response: {str(e)}")
# Convert bytes to string for consistency with test expectations
if isinstance(body, bytes):
body = body.decode("utf-8")

response = self.post("/v4/objects/upload/", body, self.prepare_headers())

if response.status_code != 200:
error_msg = f"Upload failed: {response.status_code}"
try:
error_data = response.json()
if "error" in error_data:
error_msg = error_data["error"]
except:
pass
raise ApiServerException(error_msg)

try:
return response.json()
except Exception as e:
raise ApiServerException(f"Failed to process upload response: {str(e)}")
except requests.exceptions.RequestException as e:
raise ApiServerException(f"Failed to upload object: {e}")

def upload_logfile(self, body: Union[str, bytes], trace_id: int) -> UploadedObjectResponse:
def upload_logfile(self, body: Union[str, bytes], trace_id: str) -> Dict[str, Any]:
"""
Upload an log file to the API and return the response.
Upload a logfile to the V4 API.

Args:
body: The log file to upload, either as a string or bytes.
body: The logfile content to upload
trace_id: The trace ID associated with the logfile

Returns:
UploadedObjectResponse: The response from the API after upload.
"""
if isinstance(body, bytes):
body = body.decode("utf-8")
Dictionary containing upload response data

response = self.post("/v4/logs/upload/", body, {**self.prepare_headers(), "Trace-Id": str(trace_id)})
Raises:
ApiServerException: If the upload fails
"""
try:
# Convert bytes to string for consistency with test expectations
if isinstance(body, bytes):
body = body.decode("utf-8")

headers = {**self.prepare_headers(), "Trace-Id": str(trace_id)}
response = self.post("/v4/logs/upload/", body, headers)

if response.status_code != 200:
error_msg = f"Upload failed: {response.status_code}"
try:
error_data = response.json()
if "error" in error_data:
error_msg = error_data["error"]
except:
pass
raise ApiServerException(error_msg)

if response.status_code != 200:
error_msg = f"Upload failed: {response.status_code}"
try:
error_data = response.json()
if "error" in error_data:
error_msg = error_data["error"]
except Exception:
pass
raise ApiServerException(error_msg)

try:
response_data = response.json()
return UploadedObjectResponse(**response_data)
except Exception as e:
raise ApiServerException(f"Failed to process upload response: {str(e)}")
return response.json()
except Exception as e:
raise ApiServerException(f"Failed to process upload response: {str(e)}")
except requests.exceptions.RequestException as e:
raise ApiServerException(f"Failed to upload logfile: {e}")
Loading
Loading