Skip to content

Commit 651bd29

Browse files
authored
Add registry client for interacting with the public dataset registry (#20)
* registry: Implement Registry API client infrastructure

  Add comprehensive Registry API client for dataset discovery, search, and publishing. The Registry is an external service for storing and sharing dataset manifests.

  Components:
  - RegistryClient: HTTP client with authentication support
  - RegistryDatasetsClient: Dataset operations (read/write)
  - Custom error handling with error code mapping
  - Pydantic models generated from OpenAPI spec
  - Custom datetime validators for API timestamp format

  Key features:
  - Dataset search (full-text and AI semantic)
  - Dataset browsing and version management
  - Manifest fetching
  - Publishing capabilities (requires auth)
  - Metadata management

* client: Integrate registry client with unified Client class
* tests: Add integration tests for registry API
* docs: Registry client guide
* fix(client): Remove duplicated auth resolution logic
  - AdminClient: Pass auth=True or auth_token directly (was duplicating the if/elif/else chain)
  - RegistryClient: Use get_token() callable for current token value
* feat(registry): Add auto-refreshing auth token support

  Update RegistryClient to support the same auth patterns as AdminClient
1 parent 84475e1 commit 651bd29

File tree

14 files changed

+2444
-10
lines changed

14 files changed

+2444
-10
lines changed

docs/registry-guide.md

Lines changed: 485 additions & 0 deletions
Large diffs are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ select = [
142142
]
143143
exclude = [
144144
"src/amp/FlightSql_pb2.py", # Generated protobuf file
145+
"src/amp/registry/models.py", # Generated from OpenAPI spec
145146
"*notebook*"
146147
]
147148

src/amp/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Amp - Flight SQL client with comprehensive data loading capabilities."""
22

33
from amp.client import Client, QueryBuilder
4+
from amp.registry import RegistryClient
45

5-
__all__ = ['Client', 'QueryBuilder']
6+
__all__ = ['Client', 'QueryBuilder', 'RegistryClient']

src/amp/client.py

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,13 @@ class Client:
265265
"""Enhanced Flight SQL client with data loading capabilities.
266266
267267
Supports both query operations (via Flight SQL) and optional admin operations
268-
(via HTTP Admin API).
268+
(via HTTP Admin API) and registry operations (via Registry API).
269269
270270
Args:
271271
url: Flight SQL URL (for backward compatibility, treated as query_url)
272272
query_url: Query endpoint URL via Flight SQL (e.g., 'grpc://localhost:1602')
273273
admin_url: Optional Admin API URL (e.g., 'http://localhost:8080')
274+
registry_url: Optional Registry API URL (default: staging registry)
274275
auth_token: Optional Bearer token for authentication (highest priority)
275276
auth: If True, load auth token from ~/.amp/cache (shared with TS CLI)
276277
@@ -293,13 +294,23 @@ class Client:
293294
>>> # Client with auth from environment variable
294295
>>> # export AMP_AUTH_TOKEN="eyJhbGci..."
295296
>>> client = Client(query_url='grpc://localhost:1602')
297+
>>>
298+
>>> # Client with registry support
299+
>>> client = Client(
300+
... query_url='grpc://localhost:1602',
301+
... admin_url='http://localhost:8080',
302+
... registry_url='https://api.registry.amp.staging.thegraph.com',
303+
... auth=True
304+
... )
305+
>>> results = client.registry.datasets.search('ethereum')
296306
"""
297307

298308
def __init__(
299309
self,
300310
url: Optional[str] = None,
301311
query_url: Optional[str] = None,
302312
admin_url: Optional[str] = None,
313+
registry_url: str = 'https://api.registry.amp.staging.thegraph.com',
303314
auth_token: Optional[str] = None,
304315
auth: bool = False,
305316
):
@@ -346,21 +357,40 @@ def get_token():
346357
if admin_url:
347358
from amp.admin.client import AdminClient
348359

349-
# Pass auth=True if we have a get_token callable from auth file
350-
# Otherwise pass the static token if available
360+
# Pass through auth parameters to AdminClient so it can set up its own auth
361+
# (AdminClient needs to manage its own AuthService for token refresh)
351362
if auth:
352-
# Use auth file (auto-refreshing)
363+
# Use auth file - AdminClient will set up AuthService for auto-refresh
353364
self._admin_client = AdminClient(admin_url, auth=True)
354365
elif auth_token or os.getenv('AMP_AUTH_TOKEN'):
355-
# Use static token
356-
static_token = auth_token or os.getenv('AMP_AUTH_TOKEN')
357-
self._admin_client = AdminClient(admin_url, auth_token=static_token)
366+
# Use static token (explicit param takes priority)
367+
token = auth_token or os.getenv('AMP_AUTH_TOKEN')
368+
self._admin_client = AdminClient(admin_url, auth_token=token)
358369
else:
359-
# No auth
370+
# No authentication
360371
self._admin_client = AdminClient(admin_url)
361372
else:
362373
self._admin_client = None
363374

375+
# Initialize optional Registry API client
376+
if registry_url:
377+
from amp.registry import RegistryClient
378+
379+
# Pass through auth parameters to RegistryClient so it can set up its own auth
380+
# (RegistryClient needs to manage its own AuthService for token refresh)
381+
if auth:
382+
# Use auth file - RegistryClient will set up AuthService for auto-refresh
383+
self._registry_client = RegistryClient(registry_url, auth=True)
384+
elif auth_token or os.getenv('AMP_AUTH_TOKEN'):
385+
# Use static token (explicit param takes priority)
386+
token = auth_token or os.getenv('AMP_AUTH_TOKEN')
387+
self._registry_client = RegistryClient(registry_url, auth_token=token)
388+
else:
389+
# No authentication
390+
self._registry_client = RegistryClient(registry_url)
391+
else:
392+
self._registry_client = None
393+
364394
def sql(self, query: str) -> QueryBuilder:
365395
"""
366396
Create a chainable query builder
@@ -460,6 +490,35 @@ def schema(self):
460490
)
461491
return self._admin_client.schema
462492

493+
@property
def registry(self):
    """Return the Registry API client attached to this Client.

    Returns:
        The object stored in ``self._registry_client`` (a RegistryClient
        used for dataset discovery, search, and publishing).

    Raises:
        ValueError: If ``self._registry_client`` is unset/falsy, i.e. no
            registry client was created during Client initialization.

    Example:
        >>> client = Client(
        ...     query_url='grpc://localhost:1602',
        ...     registry_url='https://api.registry.amp.staging.thegraph.com'
        ... )
        >>> # Search for datasets
        >>> results = client.registry.datasets.search('ethereum blocks')
        >>> # Get a specific dataset
        >>> dataset = client.registry.datasets.get('graphops', 'ethereum-mainnet')
        >>> # Fetch manifest
        >>> manifest = client.registry.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest')
    """
    registry_client = self._registry_client
    if registry_client:
        return registry_client

    # Nothing configured: fail loudly with guidance rather than returning None.
    raise ValueError(
        'Registry API not configured. Provide registry_url parameter to Client() '
        'to enable dataset discovery and search operations.'
    )
521+
463522
# Existing methods for backward compatibility
464523
def get_sql(self, query, read_all=False):
465524
"""Execute SQL query and return Arrow data"""

src/amp/registry/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""Amp Registry API client.
2+
3+
The Registry provides dataset discovery, search, and publishing capabilities.
4+
5+
Example:
6+
>>> from amp.registry import RegistryClient
7+
>>>
8+
>>> # Read-only operations
9+
>>> client = RegistryClient()
10+
>>> results = client.datasets.search('ethereum blocks')
11+
>>> for dataset in results.datasets:
12+
... print(f"{dataset.namespace}/{dataset.name} - Score: {dataset.score}")
13+
>>>
14+
>>> # Get a specific dataset
15+
>>> dataset = client.datasets.get('graphops', 'ethereum-mainnet')
16+
>>> manifest = client.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest')
17+
>>>
18+
>>> # Authenticated operations
19+
>>> client = RegistryClient(auth_token='your-token')
20+
>>> client.datasets.publish(...)
21+
"""
22+
23+
from . import errors, models
24+
from .client import RegistryClient
25+
from .datasets import RegistryDatasetsClient
26+
27+
__all__ = ['RegistryClient', 'RegistryDatasetsClient', 'errors', 'models']

src/amp/registry/client.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
"""Registry API client."""
2+
3+
import logging
4+
import os
5+
from typing import Optional
6+
7+
import httpx
8+
9+
from . import errors
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class RegistryClient:
    """Client for interacting with the Amp Registry API.

    The Registry API provides dataset discovery, search, and publishing capabilities.

    Args:
        base_url: Base URL for the Registry API (default: staging registry)
        auth_token: Optional Bearer token for authenticated operations (highest priority)
        auth: If True, load auth token from ~/.amp/cache (shared with TS CLI)

    Authentication Priority (highest to lowest):
        1. Explicit auth_token parameter
        2. AMP_AUTH_TOKEN environment variable
        3. auth=True - reads from ~/.amp/cache/amp_cli_auth

    Example:
        >>> # Read-only operations (no auth required)
        >>> client = RegistryClient()
        >>> datasets = client.datasets.search('ethereum')
        >>>
        >>> # Authenticated operations with explicit token
        >>> client = RegistryClient(auth_token='your-token')
        >>> client.datasets.publish(...)
        >>>
        >>> # Authenticated operations with auth file (auto-refresh)
        >>> client = RegistryClient(auth=True)
        >>> client.datasets.publish(...)
    """

    def __init__(
        self,
        base_url: str = 'https://api.registry.amp.staging.thegraph.com',
        auth_token: Optional[str] = None,
        auth: bool = False,
    ):
        """Initialize Registry client.

        Args:
            base_url: Base URL for the Registry API
            auth_token: Optional Bearer token for authentication
            auth: If True, load auth token from ~/.amp/cache

        Raises:
            ValueError: If both auth=True and auth_token are provided
        """
        if auth and auth_token:
            raise ValueError('Cannot specify both auth=True and auth_token. Choose one authentication method.')

        # Trailing slashes are stripped so paths can be appended with a plain f-string.
        self.base_url = base_url.rstrip('/')

        # Resolve the token provider with priority: explicit param > env var > auth file.
        # `_get_token` stays None when no authentication is configured.
        self._get_token = None
        static_token = auth_token or os.getenv('AMP_AUTH_TOKEN')
        if static_token:
            # Priority 1/2: a static token, wrapped in a callable so that
            # _request can treat static and refreshing tokens the same way.
            def get_token():
                return static_token

            self._get_token = get_token
        elif auth:
            # Priority 3: token provider backed by the auth file.
            from amp.auth import AuthService

            self._get_token = AuthService().get_token

        # Create HTTP client (no auth header here - it is added per request)
        self._http = httpx.Client(
            base_url=self.base_url,
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            timeout=30.0,
        )

        logger.info('Initialized Registry client for %s', base_url)

    @property
    def datasets(self):
        """Access the datasets client.

        Returns:
            RegistryDatasetsClient: Client for dataset operations
        """
        from .datasets import RegistryDatasetsClient

        return RegistryDatasetsClient(self)

    def _request(
        self,
        method: str,
        path: str,
        **kwargs,
    ) -> 'httpx.Response':
        """Make an HTTP request to the Registry API.

        Args:
            method: HTTP method (GET, POST, etc.)
            path: API path (without base URL), or an absolute http(s) URL
            **kwargs: Additional arguments to pass to httpx

        Returns:
            httpx.Response: HTTP response

        Raises:
            RegistryError: If the request fails or the API returns status >= 400
        """
        # Only a real scheme prefix marks an absolute URL; a relative path that
        # merely begins with the letters "http" is still joined to base_url.
        is_absolute = path.startswith(('http://', 'https://'))
        url = path if is_absolute else f'{self.base_url}{path}'

        # Work on a copy so the caller's headers mapping is never mutated, and
        # tolerate an explicit headers=None.
        headers = dict(kwargs.get('headers') or {})
        if self._get_token:
            # The provider is called per request so a refreshing provider can
            # hand back its current token.
            headers['Authorization'] = f'Bearer {self._get_token()}'
        kwargs['headers'] = headers

        try:
            response = self._http.request(method, url, **kwargs)
        except httpx.RequestError as e:
            raise errors.RegistryError(f'Request failed: {e}') from e

        # Handle error responses
        if response.status_code >= 400:
            self._handle_error(response)

        return response

    def _http_error(self, response: 'httpx.Response'):
        """Build the generic RegistryError used when the error body is unusable."""
        return errors.RegistryError(
            f'HTTP {response.status_code}: {response.text}',
            error_code=str(response.status_code),
        )

    def _handle_error(self, response: 'httpx.Response') -> None:
        """Handle error responses from the API.

        Args:
            response: HTTP error response

        Raises:
            RegistryError: Mapped exception for the error, or a generic
                ``HTTP <status>: <body>`` error when the body is not a JSON
                object or the error code cannot be mapped
        """
        try:
            error_data = response.json()
        except ValueError:
            # Body is not valid JSON (e.g. an HTML error page from a proxy).
            raise self._http_error(response) from None

        if not isinstance(error_data, dict):
            # Valid JSON but not an object (list, string, null): it has no
            # error fields to read.
            raise self._http_error(response)

        error_code = error_data.get('error_code', '')
        error_message = error_data.get('error_message', response.text)
        request_id = error_data.get('request_id', '')

        try:
            mapped = errors.map_error(error_code, error_message, request_id)
        except (ValueError, KeyError):
            # Mapping itself failed; fall back to the generic error.
            raise self._http_error(response) from None

        # Raised outside the try so a mapped exception can never be swallowed
        # by the fallback handler above.
        raise mapped

    def close(self):
        """Close the HTTP client."""
        self._http.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()

0 commit comments

Comments
 (0)