-
Notifications
You must be signed in to change notification settings - Fork 507
Enhance async functionality and logging in AgentOps SDK #1162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Review Summary🏷️ Draft Comments (18)
|
WalkthroughThis PR refactors the AgentOps client and SDK to support asynchronous workflows throughout the authentication, HTTP, and telemetry layers. Core API clients, HTTP handling, and validation logic are modernized to use async methods and aiohttp, with improved error handling and dynamic JWT token management. The telemetry exporter and tracing setup now support runtime token refresh. Tests and configuration are updated to align with the new async architecture, and dependency management is updated to include aiohttp. Changes
Sequence DiagramThis diagram shows the interactions between components: sequenceDiagram
title BaseApiClient Asynchronous HTTP Operations
participant Client as "Client Code"
participant BaseApiClient
participant HttpClient
Note over BaseApiClient: Changed from synchronous to<br/>asynchronous HTTP methods
%% GET request flow
Client->>BaseApiClient: async get(path, headers)
activate BaseApiClient
BaseApiClient->>BaseApiClient: _get_full_url(path)
BaseApiClient->>HttpClient: async_request(method="get", url, headers, timeout)
activate HttpClient
Note right of HttpClient: Replaced direct HTTP client<br/>with static HttpClient class
HttpClient-->>BaseApiClient: response_data (Dict or None)
deactivate HttpClient
BaseApiClient-->>Client: JSON response as dictionary
deactivate BaseApiClient
%% POST request flow
Client->>BaseApiClient: async post(path, data, headers)
activate BaseApiClient
BaseApiClient->>BaseApiClient: _get_full_url(path)
BaseApiClient->>HttpClient: async_request(method="post", url, data, headers, timeout)
activate HttpClient
HttpClient-->>BaseApiClient: response_data (Dict or None)
deactivate HttpClient
BaseApiClient-->>Client: JSON response as dictionary
deactivate BaseApiClient
%% Error handling
Client->>BaseApiClient: async any_http_method()
activate BaseApiClient
BaseApiClient->>HttpClient: async_request()
activate HttpClient
Note over HttpClient: Exception occurs
HttpClient-->>BaseApiClient: Exception
deactivate HttpClient
BaseApiClient-->>Client: Raise Exception with details
deactivate BaseApiClient
Note over Client, HttpClient: All HTTP methods (get, post, put, delete)<br/>now return Optional[Dict[str, Any]]<br/>instead of requests.Response
Note for WindsurfPlease change the default marketplace provider to the following in the windsurf settings:Marketplace Extension Gallery Service URL: https://marketplace.visualstudio.com/_apis/public/gallery Marketplace Gallery Item URL: https://marketplace.visualstudio.com/items Entelligence.ai can learn from your feedback. Simply add 👍 / 👎 emojis to teach it your preferences. More shortcuts belowEmoji Descriptions:
Interact with the Bot:
Also you can trigger various commands with the bot by doing The current supported commands are
More commands to be added soon. |
Review Summary🏷️ Draft Comments (19)
|
WalkthroughThis PR modernizes the AgentOps client and SDK by introducing asynchronous HTTP handling using aiohttp, refactoring authentication and telemetry flows to support dynamic JWT token management, and improving error handling and flexibility throughout. The update enhances compatibility with async applications, streamlines configuration and validation, and ensures robust, non-blocking authentication and telemetry export. Test and configuration files are updated to align with the new async-first architecture. Changes
Sequence DiagramThis diagram shows the interactions between components: sequenceDiagram
title BaseApiClient Asynchronous HTTP Operations
participant Client as "Client Code"
participant BaseApiClient
participant HttpClient
Note over BaseApiClient: Changed from synchronous to<br/>asynchronous HTTP methods
%% POST request flow
Client->>BaseApiClient: async post(path, data, headers)
activate BaseApiClient
BaseApiClient->>BaseApiClient: _get_full_url(path)
BaseApiClient->>HttpClient: async_request(method="post", url, data, headers, timeout)
activate HttpClient
Note right of HttpClient: Replaced direct HTTP client<br/>with HttpClient static method
HttpClient-->>BaseApiClient: response_data (Dict or None)
deactivate HttpClient
BaseApiClient-->>Client: JSON response as dictionary
deactivate BaseApiClient
%% GET request flow
Client->>BaseApiClient: async get(path, headers)
activate BaseApiClient
BaseApiClient->>BaseApiClient: _get_full_url(path)
BaseApiClient->>HttpClient: async_request(method="get", url, headers=headers, timeout)
activate HttpClient
HttpClient-->>BaseApiClient: response_data (Dict or None)
deactivate HttpClient
BaseApiClient-->>Client: JSON response as dictionary
deactivate BaseApiClient
%% Error handling
Client->>BaseApiClient: async any_http_method()
activate BaseApiClient
BaseApiClient->>HttpClient: async_request()
activate HttpClient
Note over HttpClient: Exception occurs
HttpClient-->>BaseApiClient: Exception
deactivate HttpClient
alt Exception handling
BaseApiClient-->>Client: Raise Exception with details
end
deactivate BaseApiClient
Note over Client, HttpClient: All HTTP methods (post, get, put, delete)<br/>now return Optional[Dict[str, Any]]<br/>instead of requests.Response
Note for WindsurfPlease change the default marketplace provider to the following in the windsurf settings:Marketplace Extension Gallery Service URL: https://marketplace.visualstudio.com/_apis/public/gallery Marketplace Gallery Item URL: https://marketplace.visualstudio.com/items Entelligence.ai can learn from your feedback. Simply add 👍 / 👎 emojis to teach it your preferences. More shortcuts belowEmoji Descriptions:
Interact with the Bot:
Also you can trigger various commands with the bot by doing The current supported commands are
More commands to be added soon. |
…ical headers from being overridden by user-supplied values. Update tests to verify protection of critical headers and ensure proper JWT token usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances the AgentOps SDK by migrating from synchronous to asynchronous HTTP operations and improving authentication handling. The changes implement dynamic JWT token management, graceful error handling, and better logging throughout the system.
- Migrated HTTP and authentication flows to async using aiohttp with fallback support
- Refactored client and SDK for non-blocking JWT authentication with dynamic token updates
- Enhanced error handling to be more graceful, preventing crashes on authentication failures
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_validation.py | Updated tests to use synchronous wrapper and mock async functions |
| tests/unit/sdk/test_exporters.py | Enhanced security tests for header protection in OTLP exporter |
| pyproject.toml | Added aiohttp dependency for async HTTP functionality |
| agentops/validation.py | Migrated to async JWT token fetching with graceful error handling |
| agentops/sdk/exporters.py | Implemented dynamic JWT authentication and header security protection |
| agentops/sdk/core.py | Updated telemetry setup to support JWT providers and async operations |
| agentops/config.py | Changed API key validation to log warnings instead of throwing exceptions |
| agentops/client/http/http_client.py | Added async HTTP client methods with aiohttp support |
| agentops/client/client.py | Implemented async authentication task management |
| agentops/client/api/versions/v4.py | Enhanced V4 API client with better error handling |
| agentops/client/api/versions/v3.py | Migrated V3 authentication to async with graceful failures |
| agentops/client/api/base.py | Converted base API client methods to async |
|
|
||
| @patch("agentops.validation.requests.post") | ||
| def test_get_jwt_token_success(self, mock_post): | ||
| @patch("tests.unit.test_validation.get_jwt_token_sync") |
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The patch path 'tests.unit.test_validation.get_jwt_token_sync' is incorrect. It should patch 'agentops.validation.get_jwt_token_sync' since that's where the function is defined and imported from.
| @patch("tests.unit.test_validation.get_jwt_token_sync") | |
| @patch("agentops.validation.get_jwt_token_sync") |
|
|
||
| @patch("agentops.validation.requests.post") | ||
| def test_get_jwt_token_failure(self, mock_post): | ||
| @patch("tests.unit.test_validation.get_jwt_token_sync") |
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The patch path 'tests.unit.test_validation.get_jwt_token_sync' is incorrect. It should patch 'agentops.validation.get_jwt_token_sync' since that's where the function is defined and imported from.
| @patch("tests.unit.test_validation.get_jwt_token_sync") | |
| @patch("agentops.validation.get_jwt_token_sync") |
| @patch("agentops.get_client") | ||
| @patch("agentops.validation.requests.post") | ||
| def test_get_jwt_token_from_env(self, mock_post, mock_get_client, mock_getenv): | ||
| @patch("tests.unit.test_validation.get_jwt_token_sync") |
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The patch path 'tests.unit.test_validation.get_jwt_token_sync' is incorrect. It should patch 'agentops.validation.get_jwt_token_sync' since that's where the function is defined and imported from.
| @patch("tests.unit.test_validation.get_jwt_token_sync") | |
| @patch("agentops.validation.get_jwt_token_sync") |
| def get_metrics_headers(): | ||
| token = jwt_provider() if jwt_provider else None | ||
| return {"Authorization": f"Bearer {token}"} if token else {} | ||
|
|
||
| metric_exporter = OTLPMetricExporter(endpoint=metrics_endpoint, headers=get_metrics_headers()) | ||
|
|
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The headers are computed once at initialization but the JWT token may change over time. The OTLPMetricExporter should receive a dynamic header provider or be updated to use the same pattern as the AuthenticatedOTLPExporter.
| def get_metrics_headers(): | |
| token = jwt_provider() if jwt_provider else None | |
| return {"Authorization": f"Bearer {token}"} if token else {} | |
| metric_exporter = OTLPMetricExporter(endpoint=metrics_endpoint, headers=get_metrics_headers()) | |
| class DynamicHeaderOTLPMetricExporter(OTLPMetricExporter): | |
| def __init__(self, endpoint: str, jwt_provider: Optional[Callable[[], Optional[str]]]): | |
| self._jwt_provider = jwt_provider | |
| super().__init__(endpoint=endpoint, headers={}) | |
| def _update_headers(self): | |
| token = self._jwt_provider() if self._jwt_provider else None | |
| self._headers = {"Authorization": f"Bearer {token}"} if token else {} | |
| def export(self, *args, **kwargs): | |
| self._update_headers() | |
| return super().export(*args, **kwargs) | |
| metric_exporter = DynamicHeaderOTLPMetricExporter(endpoint=metrics_endpoint, jwt_provider=jwt_provider) |
| # Close the old session if it exists but is closed | ||
| if cls._async_session is not None and cls._async_session.closed: | ||
| cls._async_session = None | ||
|
|
||
| # Create connector with connection pooling | ||
| connector = aiohttp.TCPConnector( | ||
| limit=100, # Total connection pool size | ||
| limit_per_host=30, # Per-host connection limit | ||
| ttl_dns_cache=300, # DNS cache TTL | ||
| use_dns_cache=True, | ||
| enable_cleanup_closed=True, | ||
| ) | ||
|
|
||
| # Create session with default headers | ||
| headers = { | ||
| "Content-Type": "application/json", | ||
| "User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}", | ||
| } | ||
|
|
||
| cls._async_session = aiohttp.ClientSession( | ||
| connector=connector, headers=headers, timeout=aiohttp.ClientTimeout(total=30) | ||
| ) |
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The session check and creation is not thread-safe. Multiple threads could simultaneously create sessions, potentially causing resource leaks. Consider using the same locking pattern as get_session().
| # Close the old session if it exists but is closed | |
| if cls._async_session is not None and cls._async_session.closed: | |
| cls._async_session = None | |
| # Create connector with connection pooling | |
| connector = aiohttp.TCPConnector( | |
| limit=100, # Total connection pool size | |
| limit_per_host=30, # Per-host connection limit | |
| ttl_dns_cache=300, # DNS cache TTL | |
| use_dns_cache=True, | |
| enable_cleanup_closed=True, | |
| ) | |
| # Create session with default headers | |
| headers = { | |
| "Content-Type": "application/json", | |
| "User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}", | |
| } | |
| cls._async_session = aiohttp.ClientSession( | |
| connector=connector, headers=headers, timeout=aiohttp.ClientTimeout(total=30) | |
| ) | |
| with cls._async_session_lock: | |
| if cls._async_session is None or cls._async_session.closed: # Double-check locking | |
| # Close the old session if it exists but is closed | |
| if cls._async_session is not None and cls._async_session.closed: | |
| cls._async_session = None | |
| # Create connector with connection pooling | |
| connector = aiohttp.TCPConnector( | |
| limit=100, # Total connection pool size | |
| limit_per_host=30, # Per-host connection limit | |
| ttl_dns_cache=300, # DNS cache TTL | |
| use_dns_cache=True, | |
| enable_cleanup_closed=True, | |
| ) | |
| # Create session with default headers | |
| headers = { | |
| "Content-Type": "application/json", | |
| "User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}", | |
| } | |
| cls._async_session = aiohttp.ClientSession( | |
| connector=connector, headers=headers, timeout=aiohttp.ClientTimeout(total=30) | |
| ) |
| if self._auth_task and not self._auth_task.done(): | ||
| return # Task already running | ||
|
|
||
| try: | ||
| loop = asyncio.get_event_loop() | ||
| if loop.is_running(): | ||
| # Use existing event loop | ||
| self._auth_task = loop.create_task(self._fetch_auth_async(api_key)) | ||
| else: |
Copilot
AI
Jul 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check for auth task status is not thread-safe. Multiple threads could pass the check simultaneously and create multiple auth tasks. Consider adding proper locking around this check.
| if self._auth_task and not self._auth_task.done(): | |
| return # Task already running | |
| try: | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| # Use existing event loop | |
| self._auth_task = loop.create_task(self._fetch_auth_async(api_key)) | |
| else: | |
| with self._auth_lock: | |
| if self._auth_task and not self._auth_task.done(): | |
| return # Task already running | |
| try: | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| # Use existing event loop | |
| self._auth_task = loop.create_task(self._fetch_auth_async(api_key)) | |
| else: | |
| # Create new event loop in background thread | |
| def run_async_auth(): | |
| asyncio.run(self._fetch_auth_async(api_key)) | |
| import threading | |
| auth_thread = threading.Thread(target=run_async_auth, daemon=True) | |
| auth_thread.start() | |
| except RuntimeError: |
* Enhance async functionality and logging in AgentOps SDK * Enhance LLM span validation and improve V4 API client functionality * Refactor header handling in AuthenticatedOTLPExporter to prevent critical headers from being overridden by user-supplied values. Update tests to verify protection of critical headers and ensure proper JWT token usage.
EntelligenceAI PR Summary