diff --git a/docs/registry-guide.md b/docs/registry-guide.md new file mode 100644 index 0000000..cd98cb0 --- /dev/null +++ b/docs/registry-guide.md @@ -0,0 +1,485 @@ +# Registry API Guide + +The Amp Registry is a public catalog for discovering and sharing dataset manifests. This guide covers how to search for datasets, fetch manifests, and publish your own datasets. + +## Quick Start + +### Read-Only Operations (No Authentication Required) + +```python +from amp.registry import RegistryClient + +# Create registry client +registry = RegistryClient() + +# Search for datasets +results = registry.datasets.search('ethereum blocks', limit=10) +for dataset in results.datasets: + print(f'[{dataset.score:.2f}] {dataset.namespace}/{dataset.name}') + print(f' {dataset.description}') +``` + +### Using with Unified Client + +```python +from amp import Client + +# Initialize with all three APIs +client = Client( + query_url='grpc://localhost:1602', # Flight SQL queries + admin_url='http://localhost:8080', # Admin operations + registry_url='https://api.registry.amp.staging.thegraph.com', # Registry (default) + auth=True # Use ~/.amp-cli-config for authentication +) + +# Search registry +results = client.registry.datasets.search('ethereum') + +# Deploy found dataset to local node +dataset = results.datasets[0] +manifest = client.registry.datasets.get_manifest( + dataset.namespace, + dataset.name, + dataset.latest_version.version_tag +) + +client.admin.datasets.register( + namespace=dataset.namespace, + name=dataset.name, + revision=dataset.latest_version.version_tag, + manifest=manifest +) +``` + +## Discovery: Finding Datasets + +### List All Datasets + +```python +# Get first page of datasets +response = registry.datasets.list(limit=50, page=1) + +print(f'Total datasets: {response.total_count}') +print(f'Has more pages: {response.has_next_page}') + +for dataset in response.datasets: + print(f'{dataset.namespace}/{dataset.name} (v{dataset.latest_version.version_tag})') +``` + 
+### Full-Text Search + +Search by keywords in dataset names, descriptions, tags, and chains: + +```python +# Search with keyword +results = registry.datasets.search('ethereum blocks', limit=10) + +# Results are ranked by relevance +for dataset in results.datasets: + print(f'Score: {dataset.score:.2f}') + print(f'Dataset: {dataset.namespace}/{dataset.name}') + print(f'Description: {dataset.description}') + print() +``` + +### AI Semantic Search + +Use natural language queries for semantic matching (requires OpenAI configuration): + +```python +try: + # Natural language query + results = registry.datasets.ai_search( + 'find NFT transfer and marketplace data', + limit=5 + ) + + for dataset in results: + print(f'[{dataset.score:.2f}] {dataset.namespace}/{dataset.name}') +except Exception as e: + print(f'AI search not available: {e}') +``` + +## Exploring Datasets + +### Get Dataset Details + +```python +# Get full dataset information +dataset = registry.datasets.get('edgeandnode', 'ethereum_mainnet') + +print(f'Name: {dataset.name}') +print(f'Namespace: {dataset.namespace}') +print(f'Latest version: {dataset.latest_version.version_tag}') +print(f'Visibility: {dataset.visibility}') +print(f'Description: {dataset.description}') +print(f'Tags: {dataset.keywords}') +print(f'Chains: {dataset.indexing_chains}') +``` + +### List Versions + +```python +# Get all versions of a dataset +versions = registry.datasets.list_versions('edgeandnode', 'ethereum_mainnet') + +for version in versions: + print(f'v{version.version_tag} - {version.status}') + print(f' Created: {version.created_at}') + print(f' Changelog: {version.changelog}') +``` + +### Get Specific Version + +```python +# Get details of a specific version +version = registry.datasets.get_version( + 'edgeandnode', + 'ethereum_mainnet', + '1.0.0' +) + +print(f'Version: {version.version_tag}') +print(f'Status: {version.status}') +print(f'Dataset reference: {version.dataset_reference}') +``` + +### Fetch Manifest + +```python 
+# Get manifest for latest version +dataset = registry.datasets.get('edgeandnode', 'ethereum_mainnet') +manifest = registry.datasets.get_manifest( + dataset.namespace, + dataset.name, + dataset.latest_version.version_tag +) + +print(f'Kind: {manifest.get("kind")}') +print(f'Tables: {list(manifest.get("tables", {}).keys())}') +print(f'Dependencies: {list(manifest.get("dependencies", {}).keys())}') +``` + +## Publishing Datasets + +Publishing requires authentication. Set up your auth token: + +```python +# Option 1: Use existing auth from ~/.amp-cli-config +from amp import Client +client = Client(auth=True) + +# Option 2: Provide token directly +from amp.registry import RegistryClient +registry = RegistryClient(auth_token='your-token-here') +``` + +### Publish New Dataset + +```python +# Prepare your manifest +manifest = { + 'kind': 'manifest', + 'network': 'mainnet', + 'dependencies': {}, + 'tables': { + 'my_table': { + 'input': { + 'sql': 'SELECT * FROM source_table' + }, + 'schema': { + 'arrow': { + 'fields': [ + {'name': 'id', 'type': 'UInt64', 'nullable': False}, + {'name': 'value', 'type': 'String', 'nullable': True} + ] + } + }, + 'network': 'mainnet' + } + }, + 'functions': {} +} + +# Publish dataset +dataset = client.registry.datasets.publish( + namespace='myuser', + name='my_dataset', + version='1.0.0', + manifest=manifest, + visibility='public', + description='My custom dataset', + tags=['ethereum', 'custom'], + chains=['ethereum-mainnet'] +) + +print(f'Published: {dataset.namespace}/{dataset.name}@{dataset.latest_version.version_tag}') +``` + +### Publish New Version + +```python +# Update manifest for new version +updated_manifest = { + # ... 
your updated manifest +} + +# Publish new version +version = client.registry.datasets.publish_version( + namespace='myuser', + name='my_dataset', + version='1.1.0', + manifest=updated_manifest, + description='Added new features' +) + +print(f'Published version: {version.version_tag}') +``` + +### Update Metadata + +```python +# Update dataset description, tags, etc. +dataset = client.registry.datasets.update( + namespace='myuser', + name='my_dataset', + description='Updated description', + tags=['ethereum', 'defi', 'updated'], + chains=['ethereum-mainnet', 'arbitrum-one'] +) +``` + +### Change Visibility + +```python +# Make dataset private +dataset = client.registry.datasets.update_visibility( + 'myuser', + 'my_dataset', + 'private' +) + +# Make it public again +dataset = client.registry.datasets.update_visibility( + 'myuser', + 'my_dataset', + 'public' +) +``` + +### Manage Version Status + +```python +# Deprecate old version +version = client.registry.datasets.update_version_status( + 'myuser', + 'my_dataset', + '1.0.0', + 'deprecated' +) + +# Available statuses: 'draft', 'published', 'deprecated', 'archived' +``` + +### Delete Version + +```python +# Archive/delete a version +response = client.registry.datasets.delete_version( + 'myuser', + 'my_dataset', + '0.1.0' +) + +print(f'Deleted: {response.reference}') +``` + +## Complete Workflow: Search → Deploy + +This example shows the full workflow of finding a dataset, deploying it, and creating a derived dataset: + +```python +from amp import Client + +# Initialize client with all APIs +client = Client( + query_url='grpc://localhost:1602', + admin_url='http://localhost:8080', + auth=True +) + +# 1. Search for a dataset +print('Searching for ethereum datasets...') +results = client.registry.datasets.search('ethereum blocks', limit=5) +dataset = results.datasets[0] + +print(f'Found: {dataset.namespace}/{dataset.name}') + +# 2. 
Get full dataset details +full_dataset = client.registry.datasets.get(dataset.namespace, dataset.name) +print(f'Latest version: {full_dataset.latest_version.version_tag}') + +# 3. Fetch manifest +manifest = client.registry.datasets.get_manifest( + dataset.namespace, + dataset.name, + full_dataset.latest_version.version_tag +) + +# 4. Deploy dependency to local node +print(f'Deploying {dataset.namespace}/{dataset.name}...') +client.admin.datasets.register( + namespace=dataset.namespace, + name=dataset.name, + revision=full_dataset.latest_version.version_tag, + manifest=manifest +) + +deploy_response = client.admin.datasets.deploy( + dataset.namespace, + dataset.name, + full_dataset.latest_version.version_tag +) + +# Wait for deployment +client.admin.jobs.wait_for_completion(deploy_response.job_id) +print('Dependency deployed!') + +# 5. Create derived dataset +derived_manifest = { + 'kind': 'manifest', + 'network': 'mainnet', + 'dependencies': { + 'base': f'{dataset.namespace}/{dataset.name}@{full_dataset.latest_version.version_tag}' + }, + 'tables': { + 'sample_data': { + 'input': { + 'sql': 'SELECT * FROM base.blocks LIMIT 1000' + }, + 'schema': { + 'arrow': { + 'fields': [ + {'name': 'block_num', 'type': 'UInt64', 'nullable': False} + ] + } + }, + 'network': 'mainnet' + } + }, + 'functions': {} +} + +# 6. Deploy derived dataset +client.admin.datasets.register( + namespace='_', + name='my_sample', + revision='1.0.0', + manifest=derived_manifest +) + +deploy_response = client.admin.datasets.deploy('_', 'my_sample', '1.0.0') +client.admin.jobs.wait_for_completion(deploy_response.job_id) +print('Derived dataset deployed!') + +# 7. 
Query the data +result = client.sql('SELECT COUNT(*) FROM _/my_sample@1.0.0.sample_data') +print(f'Rows: {result.to_pandas()}') +``` + +## Error Handling + +```python +from amp.registry.errors import ( + RegistryError, + DatasetNotFoundError, + UnauthorizedError, + ValidationError +) + +try: + dataset = registry.datasets.get('unknown', 'dataset') +except DatasetNotFoundError as e: + print(f'Dataset not found: {e}') + print(f'Error code: {e.error_code}') + print(f'Request ID: {e.request_id}') +except UnauthorizedError: + print('Authentication required or invalid token') +except ValidationError as e: + print(f'Invalid input: {e}') +except RegistryError as e: + print(f'Registry error: {e}') +``` + +## Best Practices + +### Dataset Naming + +- Use lowercase letters, numbers, and underscores +- Must start with a letter or underscore (not a number) +- Examples: `ethereum_mainnet`, `uniswap_v3_trades`, `nft_transfers` + +### Version Tags + +- Use semantic versioning: `1.0.0`, `1.1.0`, `2.0.0` +- Or commit hashes: `8e0acc0` +- Or use `latest` for development + +### Dependencies + +- Always deploy dependencies before derived datasets +- Use version-pinned references: `namespace/dataset@1.0.0` +- Reference dependencies in SQL queries using aliases: + +```python +manifest = { + 'dependencies': { + 'eth': 'edgeandnode/ethereum_mainnet@1.0.0', + 'prices': 'edgeandnode/token_prices@2.1.0' + }, + 'tables': { + 'enriched_blocks': { + 'input': { + 'sql': ''' + SELECT + b.block_num, + b.timestamp, + p.eth_price + FROM eth.blocks b + LEFT JOIN prices.hourly p ON DATE_TRUNC('hour', b.timestamp) = p.hour + ''' + } + } + } +} +``` + +### Visibility + +- Use `public` for datasets you want to share +- Use `private` for internal/proprietary datasets +- Default is `public` when publishing + +## Environment Configuration + +### Registry URLs + +```python +# Staging (default) +registry = RegistryClient( + base_url='https://api.registry.amp.staging.thegraph.com' +) + +# Production (when 
available) +registry = RegistryClient( + base_url='https://api.registry.amp.thegraph.com' +) +``` + +### Authentication + +The Registry client uses the same authentication as the Admin API: + +1. Saved CLI credentials: set `auth=True` to reuse the token stored by the Amp CLI login +2. Static token: pass `auth_token='your-token'` or set the `AMP_AUTH_TOKEN` environment variable +3. Unified client: `Client(auth=True)` forwards the saved credentials to the registry client diff --git a/pyproject.toml b/pyproject.toml index d48e24e..258bc24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -142,6 +142,7 @@ select = [ ] exclude = [ "src/amp/FlightSql_pb2.py", # Generated protobuf file + "src/amp/registry/models.py", # Generated from OpenAPI spec "*notebook*" ] diff --git a/src/amp/__init__.py b/src/amp/__init__.py index 03c0873..dbba4c5 100644 --- a/src/amp/__init__.py +++ b/src/amp/__init__.py @@ -1,5 +1,6 @@ """Amp - Flight SQL client with comprehensive data loading capabilities.""" from amp.client import Client, QueryBuilder +from amp.registry import RegistryClient -__all__ = ['Client', 'QueryBuilder'] +__all__ = ['Client', 'QueryBuilder', 'RegistryClient'] diff --git a/src/amp/client.py b/src/amp/client.py index 760bfc7..38f1034 100644 --- a/src/amp/client.py +++ b/src/amp/client.py @@ -265,12 +265,13 @@ class Client: """Enhanced Flight SQL client with data loading capabilities. Supports both query operations (via Flight SQL) and optional admin operations - (via HTTP Admin API). + (via HTTP Admin API) and registry operations (via Registry API). Args: url: Flight SQL URL (for backward compatibility, treated as query_url) query_url: Query endpoint URL via Flight SQL (e.g., 'grpc://localhost:1602') admin_url: Optional Admin API URL (e.g., 'http://localhost:8080') + registry_url: Optional Registry API URL (default: staging registry) auth_token: Optional Bearer token for authentication (highest priority) auth: If True, load auth token from ~/.amp/cache (shared with TS CLI) @@ -293,6 +294,15 @@ class Client: >>> # Client with auth from environment variable >>> # export AMP_AUTH_TOKEN="eyJhbGci..." 
>>> client = Client(query_url='grpc://localhost:1602') + >>> + >>> # Client with registry support + >>> client = Client( + ... query_url='grpc://localhost:1602', + ... admin_url='http://localhost:8080', + ... registry_url='https://api.registry.amp.staging.thegraph.com', + ... auth=True + ... ) + >>> results = client.registry.datasets.search('ethereum') """ def __init__( @@ -300,6 +310,7 @@ def __init__( url: Optional[str] = None, query_url: Optional[str] = None, admin_url: Optional[str] = None, + registry_url: str = 'https://api.registry.amp.staging.thegraph.com', auth_token: Optional[str] = None, auth: bool = False, ): @@ -346,21 +357,40 @@ def get_token(): if admin_url: from amp.admin.client import AdminClient - # Pass auth=True if we have a get_token callable from auth file - # Otherwise pass the static token if available + # Pass through auth parameters to AdminClient so it can set up its own auth + # (AdminClient needs to manage its own AuthService for token refresh) if auth: - # Use auth file (auto-refreshing) + # Use auth file - AdminClient will set up AuthService for auto-refresh self._admin_client = AdminClient(admin_url, auth=True) elif auth_token or os.getenv('AMP_AUTH_TOKEN'): - # Use static token - static_token = auth_token or os.getenv('AMP_AUTH_TOKEN') - self._admin_client = AdminClient(admin_url, auth_token=static_token) + # Use static token (explicit param takes priority) + token = auth_token or os.getenv('AMP_AUTH_TOKEN') + self._admin_client = AdminClient(admin_url, auth_token=token) else: - # No auth + # No authentication self._admin_client = AdminClient(admin_url) else: self._admin_client = None + # Initialize optional Registry API client + if registry_url: + from amp.registry import RegistryClient + + # Pass through auth parameters to RegistryClient so it can set up its own auth + # (RegistryClient needs to manage its own AuthService for token refresh) + if auth: + # Use auth file - RegistryClient will set up AuthService for auto-refresh + 
self._registry_client = RegistryClient(registry_url, auth=True) + elif auth_token or os.getenv('AMP_AUTH_TOKEN'): + # Use static token (explicit param takes priority) + token = auth_token or os.getenv('AMP_AUTH_TOKEN') + self._registry_client = RegistryClient(registry_url, auth_token=token) + else: + # No authentication + self._registry_client = RegistryClient(registry_url) + else: + self._registry_client = None + def sql(self, query: str) -> QueryBuilder: """ Create a chainable query builder @@ -460,6 +490,35 @@ def schema(self): ) return self._admin_client.schema + @property + def registry(self): + """Access registry client for Registry API operations. + + Returns: + RegistryClient for dataset discovery, search, and publishing + + Raises: + ValueError: If registry_url was not provided during Client initialization + + Example: + >>> client = Client( + ... query_url='grpc://localhost:1602', + ... registry_url='https://api.registry.amp.staging.thegraph.com' + ... ) + >>> # Search for datasets + >>> results = client.registry.datasets.search('ethereum blocks') + >>> # Get a specific dataset + >>> dataset = client.registry.datasets.get('graphops', 'ethereum-mainnet') + >>> # Fetch manifest + >>> manifest = client.registry.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest') + """ + if not self._registry_client: + raise ValueError( + 'Registry API not configured. Provide registry_url parameter to Client() ' + 'to enable dataset discovery and search operations.' + ) + return self._registry_client + # Existing methods for backward compatibility def get_sql(self, query, read_all=False): """Execute SQL query and return Arrow data""" diff --git a/src/amp/registry/__init__.py b/src/amp/registry/__init__.py new file mode 100644 index 0000000..933a2ba --- /dev/null +++ b/src/amp/registry/__init__.py @@ -0,0 +1,27 @@ +"""Amp Registry API client. + +The Registry provides dataset discovery, search, and publishing capabilities. 
class RegistryClient:
    """Client for interacting with the Amp Registry API.

    The Registry API provides dataset discovery, search, and publishing
    capabilities. Requests are sent through a single ``httpx.Client``; the
    ``Authorization`` header is computed per request so that a refreshing
    token provider is consulted every time.

    Args:
        base_url: Base URL for the Registry API (default: staging registry).
        auth_token: Optional Bearer token for authenticated operations.
        auth: If True, obtain tokens from ``amp.auth.AuthService``.

    Authentication priority (highest to lowest):
        1. Explicit ``auth_token`` parameter
        2. ``AMP_AUTH_TOKEN`` environment variable
        3. ``auth=True`` (token callable supplied by ``AuthService``)

    Example:
        >>> # Read-only operations (no auth required)
        >>> client = RegistryClient()
        >>> datasets = client.datasets.search('ethereum')
        >>>
        >>> # Authenticated operations with explicit token
        >>> client = RegistryClient(auth_token='your-token')
        >>> client.datasets.publish(...)
        >>>
        >>> # Authenticated operations with saved credentials
        >>> client = RegistryClient(auth=True)
        >>> client.datasets.publish(...)
    """

    def __init__(
        self,
        base_url: str = 'https://api.registry.amp.staging.thegraph.com',
        auth_token: Optional[str] = None,
        auth: bool = False,
    ):
        """Initialize Registry client.

        Args:
            base_url: Base URL for the Registry API; trailing slashes are stripped.
            auth_token: Optional Bearer token for authentication.
            auth: If True, obtain tokens from ``amp.auth.AuthService``.

        Raises:
            ValueError: If both auth=True and auth_token are provided.
        """
        if auth and auth_token:
            raise ValueError('Cannot specify both auth=True and auth_token. Choose one authentication method.')

        self.base_url = base_url.rstrip('/')

        # Resolve the token provider: explicit param > env var > auth service.
        # The environment is read once so the check and the stored value agree.
        self._get_token = None
        static_token = auth_token or os.getenv('AMP_AUTH_TOKEN')
        if static_token:

            def get_token():
                return static_token

            self._get_token = get_token
        elif auth:
            # Imported lazily so the auth machinery is only loaded when requested.
            from amp.auth import AuthService

            self._get_token = AuthService().get_token

        # No auth header here: it is added per request in _request().
        self._http = httpx.Client(
            base_url=self.base_url,
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            timeout=30.0,
        )

        logger.info('Initialized Registry client for %s', base_url)

    @property
    def datasets(self):
        """Access the datasets client.

        Returns:
            RegistryDatasetsClient: A new client for dataset operations, bound
            to this registry client.
        """
        from .datasets import RegistryDatasetsClient

        return RegistryDatasetsClient(self)

    def _request(
        self,
        method: str,
        path: str,
        **kwargs,
    ) -> 'httpx.Response':
        """Make an HTTP request to the Registry API.

        Args:
            method: HTTP method (GET, POST, etc.).
            path: API path appended to ``base_url``; a value starting with
                ``'http'`` is used as the full URL instead.
            **kwargs: Additional arguments passed to ``httpx.Client.request``.
                A ``headers`` mapping is copied, never modified in place.

        Returns:
            httpx.Response: The response when its status code is below 400.

        Raises:
            RegistryError: If the transport fails or the API returns status >= 400.
        """
        url = path if path.startswith('http') else f'{self.base_url}{path}'

        # Copy so the caller's dict is left untouched; tolerate headers=None.
        headers = dict(kwargs.get('headers') or {})
        if self._get_token:
            headers['Authorization'] = f'Bearer {self._get_token()}'
        kwargs['headers'] = headers

        try:
            response = self._http.request(method, url, **kwargs)
        except httpx.RequestError as e:
            raise errors.RegistryError(f'Request failed: {e}') from e

        if response.status_code >= 400:
            self._handle_error(response)

        return response

    def _handle_error(self, response: 'httpx.Response') -> None:
        """Translate an error response into a registry exception.

        Args:
            response: HTTP error response.

        Raises:
            RegistryError: The exception built by ``errors.map_error`` when the
                body is a JSON object, otherwise a generic ``RegistryError``
                carrying the HTTP status code and raw body text.
        """
        try:
            error_data = response.json()
        except ValueError:
            # Body is not JSON at all (e.g. an HTML page from a proxy).
            error_data = None

        if not isinstance(error_data, dict):
            # Also covers JSON arrays/strings, which have no .get().
            raise errors.RegistryError(
                f'HTTP {response.status_code}: {response.text}',
                error_code=str(response.status_code),
            ) from None

        # Raised outside the try so the mapped exception can never be
        # swallowed by the JSON-parsing handler above.
        raise errors.map_error(
            error_data.get('error_code', ''),
            error_data.get('error_message', response.text),
            error_data.get('request_id', ''),
        )

    def close(self):
        """Close the HTTP client."""
        self._http.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()
import TYPE_CHECKING, Any, Dict, Optional + +from . import models + +if TYPE_CHECKING: + from .client import RegistryClient + +logger = logging.getLogger(__name__) + + +class RegistryDatasetsClient: + """Client for dataset operations in the Registry API. + + Provides methods for: + - Searching and discovering datasets + - Fetching dataset details and manifests + - Publishing datasets (requires authentication) + - Managing dataset visibility and versions + + Args: + registry_client: Parent RegistryClient instance + """ + + def __init__(self, registry_client: RegistryClient): + """Initialize datasets client. + + Args: + registry_client: Parent RegistryClient instance + """ + self._registry = registry_client + + # Read Operations (Public - No Auth Required) + + def list( + self, limit: int = 50, page: int = 1, sort_by: Optional[str] = None, direction: Optional[str] = None + ) -> models.DatasetListResponse: + """List all published datasets with pagination. + + Args: + limit: Maximum number of datasets to return (default: 50, max: 1000) + page: Page number (1-indexed, default: 1) + sort_by: Field to sort by (e.g., 'name', 'created_at', 'updated_at') + direction: Sort direction ('asc' or 'desc') + + Returns: + DatasetListResponse: Paginated list of datasets + + Example: + >>> client = RegistryClient() + >>> response = client.datasets.list(limit=10, page=1) + >>> print(f"Found {response.total_count} datasets") + >>> for dataset in response.datasets: + ... print(f" - {dataset.namespace}/{dataset.name}") + """ + params: Dict[str, Any] = {'limit': limit, 'page': page} + if sort_by: + params['sort_by'] = sort_by + if direction: + params['direction'] = direction + + response = self._registry._request('GET', '/api/v1/datasets', params=params) + return models.DatasetListResponse.model_validate(response.json()) + + def search(self, query: str, limit: int = 50, page: int = 1) -> models.DatasetSearchResponse: + """Search datasets using full-text keyword search. 
+ + Results are ranked by relevance score. + + Args: + query: Search query string + limit: Maximum number of results (default: 50, max: 1000) + page: Page number (1-indexed, default: 1) + + Returns: + DatasetSearchResponse: Search results with relevance scores + + Example: + >>> client = RegistryClient() + >>> results = client.datasets.search('ethereum blocks') + >>> for dataset in results.datasets: + ... print(f"[{dataset.score}] {dataset.namespace}/{dataset.name}") + """ + params = {'search': query, 'limit': limit, 'page': page} + response = self._registry._request('GET', '/api/v1/datasets/search', params=params) + return models.DatasetSearchResponse.model_validate(response.json()) + + def ai_search(self, query: str, limit: int = 50) -> list[models.DatasetWithScore]: + """Search datasets using AI-powered semantic search. + + Uses embeddings and natural language processing for better matching. + + Args: + query: Natural language search query + limit: Maximum number of results (default: 50) + + Returns: + list[DatasetWithScore]: List of datasets with relevance scores + + Example: + >>> client = RegistryClient() + >>> results = client.datasets.ai_search('find NFT transfer data') + >>> for dataset in results: + ... print(f"[{dataset.score}] {dataset.namespace}/{dataset.name}") + """ + params = {'search': query, 'limit': limit} + response = self._registry._request('GET', '/api/v1/datasets/search/ai', params=params) + return [models.DatasetWithScore.model_validate(d) for d in response.json()] + + def get(self, namespace: str, name: str) -> models.Dataset: + """Get detailed information about a specific dataset. 
+ + Args: + namespace: Dataset namespace (e.g., 'graphops', 'edgeandnode') + name: Dataset name (e.g., 'ethereum-mainnet') + + Returns: + Dataset: Complete dataset information + + Example: + >>> client = RegistryClient() + >>> dataset = client.datasets.get('graphops', 'ethereum-mainnet') + >>> print(f"Latest version: {dataset.latest_version}") + >>> print(f"Visibility: {dataset.visibility}") + """ + path = f'/api/v1/datasets/{namespace}/{name}' + response = self._registry._request('GET', path) + return models.Dataset.model_validate(response.json()) + + def list_versions(self, namespace: str, name: str) -> list[models.DatasetVersion]: + """List all versions of a dataset. + + Versions are returned sorted by latest first. + + Args: + namespace: Dataset namespace + name: Dataset name + + Returns: + list[DatasetVersion]: List of dataset versions + + Example: + >>> client = RegistryClient() + >>> versions = client.datasets.list_versions('graphops', 'ethereum-mainnet') + >>> for version in versions: + ... print(f" - v{version.version} ({version.status})") + """ + path = f'/api/v1/datasets/{namespace}/{name}/versions' + response = self._registry._request('GET', path) + return [models.DatasetVersion.model_validate(v) for v in response.json()] + + def get_version(self, namespace: str, name: str, version: str) -> models.DatasetVersion: + """Get details of a specific dataset version. 
+ + Args: + namespace: Dataset namespace + name: Dataset name + version: Version tag (e.g., '1.0.0', 'latest') + + Returns: + DatasetVersion: Version details + + Example: + >>> client = RegistryClient() + >>> version = client.datasets.get_version('graphops', 'ethereum-mainnet', 'latest') + >>> print(f"Version: {version.version}") + >>> print(f"Created: {version.created_at}") + """ + path = f'/api/v1/datasets/{namespace}/{name}/versions/{version}' + response = self._registry._request('GET', path) + return models.DatasetVersion.model_validate(response.json()) + + def get_manifest(self, namespace: str, name: str, version: str) -> dict: + """Fetch the manifest JSON for a specific dataset version. + + Manifests define the dataset structure, dependencies, and ETL logic. + + Args: + namespace: Dataset namespace + name: Dataset name + version: Version tag (e.g., '1.0.0', 'latest') + + Returns: + dict: Manifest JSON content + + Example: + >>> client = RegistryClient() + >>> manifest = client.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest') + >>> print(f"Dependencies: {list(manifest.get('dependencies', {}).keys())}") + >>> print(f"Tables: {list(manifest.get('tables', {}).keys())}") + """ + path = f'/api/v1/datasets/{namespace}/{name}/versions/{version}/manifest' + response = self._registry._request('GET', path) + return response.json() + + # Write Operations (Require Authentication) + + def publish( + self, + namespace: str, + name: str, + version: str, + manifest: dict, + visibility: str = 'public', + description: Optional[str] = None, + tags: Optional[list[str]] = None, + chains: Optional[list[str]] = None, + sources: Optional[list[str]] = None, + ) -> models.Dataset: + """Publish a new dataset to the registry. + + Requires authentication (Bearer token). 
def publish_version(
    self,
    namespace: str,
    name: str,
    version: str,
    manifest: dict,
    description: Optional[str] = None,
) -> models.DatasetVersion:
    """Publish a new version of an existing dataset.

    Requires authentication and ownership of the dataset.

    Sends a POST to
    ``/api/v1/owners/@me/datasets/{namespace}/{name}/versions/publish`` and
    validates the JSON response into a ``DatasetVersion`` model.

    Args:
        namespace: Dataset namespace
        name: Dataset name
        version: New version tag (e.g., '1.1.0')
        manifest: Dataset manifest JSON for this version
        description: Optional version description. Only sent when truthy, so
            an empty string is treated the same as ``None``.

    Returns:
        DatasetVersion: Created version

    Example:
        >>> client = RegistryClient(auth_token='your-token')
        >>> manifest = {...}  # Updated manifest
        >>> version = client.datasets.publish_version(
        ...     namespace='myuser',
        ...     name='my_dataset',
        ...     version='1.1.0',
        ...     manifest=manifest
        ... )
        >>> print(f"Published version: {version.version_tag}")
    """
    # NOTE(review): the generated InsertDatasetVersion schema names these
    # fields `version_tag` / `changelog` (plus required `kind` and `status`);
    # this payload sends `version` / `description`. Confirm against the live
    # API which shape the publish endpoint actually accepts.
    payload = {'version': version, 'manifest': manifest}
    if description:
        payload['description'] = description

    endpoint = f'/api/v1/owners/@me/datasets/{namespace}/{name}/versions/publish'
    response = self._registry._request('POST', endpoint, json=payload)
    return models.DatasetVersion.model_validate(response.json())
def update_version_status(self, namespace: str, name: str, version: str, status: str) -> models.DatasetVersion:
    """Change the status of a single dataset version.

    Requires authentication and ownership of the dataset.

    Issues a PATCH carrying ``{'status': status}`` to the version's owner
    endpoint and validates the JSON response into a ``DatasetVersion``.

    Args:
        namespace: Dataset namespace
        name: Dataset name
        version: Version tag
        status: New status ('draft', 'published', 'deprecated', or 'archived')

    Returns:
        DatasetVersion: Updated version

    Example:
        >>> client = RegistryClient(auth_token='your-token')
        >>> version = client.datasets.update_version_status(
        ...     'myuser', 'my_dataset', '1.0.0', 'deprecated'
        ... )
        >>> print(f"Status: {version.status}")
    """
    # NOTE(review): the UpdateDatasetVersionStatusDto schema description says
    # archiving goes through the DELETE endpoint; confirm whether 'archived'
    # is really accepted here.
    endpoint = f'/api/v1/owners/@me/datasets/{namespace}/{name}/versions/{version}'
    result = self._registry._request('PATCH', endpoint, json={'status': status})
    return models.DatasetVersion.model_validate(result.json())
class RegistryError(Exception):
    """Root of the Registry API exception hierarchy.

    Carries the machine-readable ``error_code`` and the ``request_id`` from
    the API error payload alongside the human-readable message.
    """

    def __init__(self, message: str, error_code: str = '', request_id: str = ''):
        super().__init__(message)
        self.error_code = error_code
        self.request_id = request_id


class DatasetNotFoundError(RegistryError):
    """The requested dataset does not exist."""


class DatasetVersionNotFoundError(RegistryError):
    """The requested dataset version does not exist."""


class UnauthorizedError(RegistryError):
    """Authentication is missing or invalid."""


class ForbiddenError(RegistryError):
    """The caller lacks permission for the requested operation."""


class ValidationError(RegistryError):
    """The request parameters were rejected as invalid."""


class RegistryDatabaseError(RegistryError):
    """A database operation failed while serving the request."""


# API error code -> exception class. Codes that are not listed here fall back
# to RegistryError in map_error().
ERROR_CODE_MAP: dict[str, type[RegistryError]] = {
    # Missing resources
    'NOT_FOUND': DatasetNotFoundError,
    'DATASET_NOT_FOUND': DatasetNotFoundError,
    'DATASET_VERSION_NOT_FOUND': DatasetVersionNotFoundError,
    # Authentication / authorization
    'UNAUTHORIZED': UnauthorizedError,
    'FORBIDDEN': ForbiddenError,
    # Bad request parameters
    'INVALID_QUERY_PARAMETERS': ValidationError,
    'LIMIT_TOO_LARGE': ValidationError,
    'LIMIT_INVALID': ValidationError,
    # Storage-layer failures
    'AMP_REGISTRY_DB_ERROR': RegistryDatabaseError,
    'DATASET_CONVERSION_ERROR': RegistryDatabaseError,
}


def map_error(error_code: str, error_message: str, request_id: str = '') -> RegistryError:
    """Build the exception instance that corresponds to an API error code.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error_code: Machine-readable error code from API response
        error_message: Human-readable error message
        request_id: Optional request ID for tracing

    Returns:
        RegistryError: Instance of the subclass registered for ``error_code``
        in ``ERROR_CODE_MAP``, or a plain ``RegistryError`` for unknown codes.
    """
    error_type = ERROR_CODE_MAP.get(error_code, RegistryError)
    details = {'error_code': error_code, 'request_id': request_id}
    return error_type(error_message, **details)
This allows version-pinned dependencies.', + examples=[ + 'edgeandnode/eth_transfers@1.0.0', + 'edgeandnode/eth_transfers@8e0acc0', + 'edgeandnode/eth_transfers@latest', + ], + pattern='^[a-z0-9_]+/[a-z_][a-z0-9_]*@[a-z0-9._-]+$', + ), + ] + + +class DatasetVersionStatus(Enum): + draft = 'draft' + published = 'published' + deprecated = 'deprecated' + archived = 'archived' + + +class DatasetVisibility(Enum): + private = 'private' + public = 'public' + + +class ErrorResponse(BaseModel): + """ + Standard error response returned by the API + + This struct represents error information returned in HTTP error responses. + It provides structured error details including a machine-readable error code + and human-readable message. + + ## Error Code Conventions + - Error codes use SCREAMING_SNAKE_CASE (e.g., `DATASET_NOT_FOUND`) + - Codes are stable and can be relied upon programmatically + - Messages may change and should only be used for display/logging + + ## Example JSON Response + ```json + { + "error_code": "DATASET_NOT_FOUND", + "error_message": "dataset 'eth_mainnet' version '1.0.0' not found", + "request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + } + ``` + """ + + error_code: Annotated[ + str, + Field( + description='Machine-readable error code in SCREAMING_SNAKE_CASE format\n\nError codes are stable across API versions and should be used\nfor programmatic error handling. Examples: `INVALID_SELECTOR`,\n`DATASET_NOT_FOUND`, `REGISTRY_DB_ERROR`' + ), + ] + error_message: Annotated[ + str, + Field( + description='Human-readable error message\n\nMessages provide detailed context about the error but may change\nover time. Use `error_code` for programmatic decisions.' + ), + ] + request_id: Annotated[ + Optional[str], + Field( + description='Request ID for tracing and correlation\n\nThis ID can be used to correlate error responses with server logs\nfor debugging and support purposes. The ID is generated per-request\nand appears in both logs and error responses.' 
+ ), + ] = None + + +class HealthcheckResponse(BaseModel): + status: str + version: str + + +class LivenessResponse(BaseModel): + status: str + + +class ManifestKind(Enum): + manifest = 'manifest' + evm_rpc = 'evm-rpc' + eth_beacon = 'eth-beacon' + firehose = 'firehose' + + +class ManifestTag(BaseModel): + created_at: Annotated[ + str, + Field(description='Timestamp when the ManifestTag record was created (immutable).'), + ] + dataset_reference: Annotated[ + str, + Field( + description='Dataset reference in the format: {namespace}/{name}@{version_tag}. This value is globally unique and is a pointer to a tagged and published Manifest.', + examples=[ + 'edgeandnode/eth_transfers@1.0.0', + 'edgeandnode/eth_transfers@8e0acc0', + 'edgeandnode/eth_transfers@latest', + ], + pattern='^[a-z0-9_]+/[a-z_][a-z0-9_]*@[a-z0-9._-]+$', + ), + ] + manifest: Optional[ManifestTag] = None + name: Annotated[ + str, + Field( + description='The dataset name. Pattern: lowercase, underscores allowed, alphanumeric, cannot start with a number.', + pattern='^[a-z_][a-z0-9_]*$', + ), + ] + namespace: Annotated[ + str, + Field( + description='The dataset namespace. This is the logical/enforced grouping mechanism for datasets. It can be the user 0x address that deployed the subgraph, a chosen username, or org.', + examples=[ + 'edgeandnode', + 'testuser.eth', + '0x85F036b4952B74A438d724EA93495FD6220B94b6', + ], + pattern='^[a-z0-9_]*$', + ), + ] + version_tag: Annotated[ + str, + Field( + description="The published version tag. This is basically the version label. 
Can be semver, a commit hash, or 'latest'.", + examples=['1.0.0', '0.1.0', '0.0.1', '8e0acc0', 'latest'], + pattern='^[a-z0-9._-]+$', + ), + ] + + +class SavedQuery(BaseModel): + created_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the SavedQuery was created', + examples=['2025-01-15T10:30:00Z'], + ), + ] + creator: Annotated[ + str, + Field( + description='Creator/owner of the saved query (ethereum address or user_id)', + examples=['0x1234567890123456789012345678901234567890'], + ), + ] + description: Annotated[ + Optional[str], + Field( + description='Optional description of what the query does', + examples=['Query to get all Ethereum transfers'], + ), + ] = None + id: Annotated[ + str, + Field( + description='Unique identifier for the saved query (UUID)', + examples=['123e4567-e89b-12d3-a456-426614174000'], + pattern='^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$', + ), + ] + name: Annotated[ + str, + Field(description='Name of the saved query', examples=['Get all transfers']), + ] + query: Annotated[ + str, + Field( + description='The SQL query string', + examples=['SELECT * FROM eth_transfers LIMIT 100'], + ), + ] + updated_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the SavedQuery was last updated', + examples=['2025-01-15T14:45:00Z'], + ), + ] + visibility: DatasetVisibility + + # Custom validator for datetime parsing + _parse_datetimes = field_validator('created_at', 'updated_at', mode='before')(parse_utc_datetime) + + +class ServiceStatus(BaseModel): + error: Optional[str] = None + status: str + + +class UpdateDatasetMetadataDto(BaseModel): + """ + Input for update the Datasets metadata fields: + - keywords + - README + - sources + - repository_url + - license + - description + """ + + description: Annotated[Optional[str], Field(description='Dataset description')] = None + indexing_chains: Annotated[list[str], Field(description='Chains being indexed by the dataset')] + 
keywords: Annotated[Optional[list[str]], Field(description='Keywords for dataset discovery')] = None + license: Annotated[Optional[str], Field(description='License covering the dataset')] = None + readme: Annotated[Optional[str], Field(description='User-defined README for the dataset')] = None + repository_url: Annotated[Optional[str], Field(description='VCS repository URL')] = None + source: Annotated[Optional[list[str]], Field(description='Source of data being materialized')] = None + + +class UpdateDatasetVersionStatusDto(BaseModel): + """ + Input for updating a DatasetVersion's status + """ + + status: Annotated[ + DatasetVersionStatus, + Field( + description='The new status for the dataset version (Draft or Published)\nNote: Use the DELETE endpoint to archive a version' + ), + ] + + +class UpdateDatasetVisibilityDto(BaseModel): + """ + Input for updating a Dataset's visibility + """ + + visibility: Annotated[DatasetVisibility, Field(description='The new visibility level for the dataset')] + + +class DatasetVersion(BaseModel): + ancestors: Annotated[ + Optional[list[DatasetVersionAncestry]], + Field( + description='Array of ancestor DatasetVersion references that this version extends from (version-pinned dependencies).' + ), + ] = None + changelog: Annotated[ + Optional[str], + Field( + description='A description of what changed with this version. Allows developers of the Dataset to communicate to downstream consumers what has changed with this version from previous versions. Migration guides, etc.', + examples=[ + 'Version 1.0.0 - Added new features and bug fixes', + 'Breaking change: Updated schema structure', + ], + ), + ] = None + created_at: Annotated[ + str, + Field( + description='Timestamp when the DatasetVersion record was created (immutable).', + examples=['2025-01-15T10:30:00Z'], + ), + ] + dataset_reference: Annotated[ + str, + Field( + description='Dataset reference in the format: {namespace}/{name}@{version_tag}. 
This value is globally unique and is a pointer to a tagged and published Manifest.', + examples=[ + 'edgeandnode/eth_transfers@1.0.0', + 'edgeandnode/eth_transfers@8e0acc0', + 'edgeandnode/eth_transfers@latest', + ], + pattern='^[a-z0-9_]+/[a-z_][a-z0-9_]*@[a-z0-9._-]+$', + ), + ] + descendants: Annotated[ + Optional[list[DatasetVersionAncestry]], + Field(description='Array of descendant DatasetVersion references that extend from this version.'), + ] = None + status: DatasetVersionStatus + version_tag: Annotated[ + str, + Field( + description="The published version tag. This is basically the version label. Can be semver, a commit hash, or 'latest'.", + examples=['1.0.0', '0.1.0', '0.0.1', '8e0acc0', 'latest'], + pattern='^[a-z0-9._-]+$', + ), + ] + + +class DatasetWithScore(BaseModel): + """ + Dataset with search relevance score. Extends the base Dataset with a weighted score indicating how well it matches the search query. Higher scores indicate better relevance. + """ + + created_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the Dataset record was created (immutable).', + examples=['2024-01-15T10:30:00Z'], + ), + ] + dataset_reference: Annotated[ + Optional[str], + Field( + description='Computed link to the latest DatasetVersion reference in PURL format.', + examples=['edgeandnode/eth_transfers@1.0.0'], + min_length=1, + pattern='^[a-z0-9_]+/[a-z_][a-z0-9_]*@[a-z0-9._-]+$', + ), + ] = None + description: Annotated[ + Optional[str], + Field( + description='Description of the dataset, its intended use, and purpose.', + examples=['Ethereum ERC20 token transfers indexed from mainnet'], + max_length=1024, + ), + ] = None + indexing_chains: Annotated[ + list[str], + Field( + description='Chains being indexed by the Dataset. 
Used for discovery by chain.', + examples=[['mainnet', 'arbitrum-one']], + ), + ] + keywords: Annotated[ + Optional[list[str]], + Field( + description='User-defined or derived keywords defining the usage of the dataset.', + examples=[['ERC20', 'DeFi', 'transfers']], + ), + ] = None + latest_version: Optional[DatasetVersion] = None + license: Annotated[ + Optional[str], + Field( + description='Usage license covering the Dataset.', + examples=['MIT', 'Apache-2.0'], + ), + ] = None + name: Annotated[ + str, + Field( + description='The dataset name. Lowercase, alphanumeric with underscores. Cannot start with a number.', + examples=['eth_transfers', 'nft_marketplace', 'defi_protocol'], + min_length=1, + pattern='^[a-z_][a-z0-9_]*$', + ), + ] + namespace: Annotated[ + str, + Field( + description='The dataset namespace. Logical grouping mechanism for datasets. Can be a user 0x address, username, or organization.', + examples=[ + 'edgeandnode', + 'testuser.eth', + '0x85F036b4952B74A438d724EA93495FD6220B94b6', + ], + min_length=1, + pattern='^[a-z0-9_]*$', + ), + ] + owner: Annotated[ + str, + Field( + description='Owner of the Dataset. Can be an organization or user 0x address.', + examples=['0x85F036b4952B74A438d724EA93495FD6220B94b6'], + ), + ] + readme: Annotated[ + Optional[str], + Field( + description='User-defined README for the Dataset providing usage examples and documentation.', + examples=['# ETH Transfers\\n\\nThis dataset indexes all Ethereum transfers...'], + ), + ] = None + repository_url: Annotated[ + Optional[str], + Field( + description='VCS repository URL containing the Dataset source code.', + examples=['https://github.com/edgeandnode/platform'], + ), + ] = None + score: Annotated[ + float, + Field( + description='Weighted relevance score indicating how well this dataset matches the search query. Higher scores indicate better relevance. 
Score is calculated based on matches in description, keywords, source, and indexing chains fields.', + examples=[0.95, 0.72, 0.45], + ), + ] + source: Annotated[ + Optional[list[str]], + Field( + description='Source of data being materialized by the Dataset (e.g., contract addresses, logs, transactions).', + examples=[['0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984']], + ), + ] = None + updated_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the Dataset record was last updated.', + examples=['2024-01-20T14:45:00Z'], + ), + ] + versions: Annotated[ + Optional[list[DatasetVersion]], + Field(description='Link to all DatasetVersion records that this Dataset is a parent of.'), + ] = None + visibility: DatasetVisibility + + # Custom validator for datetime parsing + _parse_datetimes = field_validator('created_at', 'updated_at', mode='before')(parse_utc_datetime) + + +class InsertDatasetVersion(BaseModel): + """ + Input for creating a new DatasetVersion. Contains the version tag, manifest hash, and manifest content. + """ + + changelog: Annotated[ + Optional[str], + Field( + description='Optional changelog describing what changed in this version.', + examples=[ + 'Added new features and bug fixes', + 'Breaking change: Updated schema structure', + ], + ), + ] = None + kind: ManifestKind + manifest: Annotated[ + dict[str, Any], + Field( + description='Manifest JSON content. This should be a valid datasets_derived::Manifest structure. The SHA256 hash will be calculated server-side.', + examples=[{'tables': [], 'version': '1.0'}], + ), + ] + status: DatasetVersionStatus + version_tag: Annotated[ + str, + Field( + description="Version tag (e.g., '1.0.0', 'latest', '8e0acc0'). 
Pattern: lowercase, numbers, dots, underscores, hyphens.", + examples=['1.0.0', '0.1.0', '8e0acc0', 'latest'], + pattern='^[a-z0-9._-]+$', + ), + ] + + +class Manifest(BaseModel): + created_at: Annotated[ + str, + Field( + description='Timestamp when the `Manifest` record was created (immutable).', + examples=['2025-01-15T10:30:00Z'], + ), + ] + kind: ManifestKind + manifest_hash: Annotated[ + str, + Field( + description='The SHA256 unique hash that represents that Manifest JSON content.', + examples=['e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'], + pattern='^[0-9a-fA-F]{64}$', + ), + ] + + +class ReadinessChecks(BaseModel): + database: ServiceStatus + + +class ReadinessResponse(BaseModel): + checks: ReadinessChecks + status: str + + +class Dataset(BaseModel): + """ + Top-level container for a user-defined, tagged, and published Dataset. Contains metadata and discovery information for datasets. + """ + + created_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the Dataset record was created (immutable).', + examples=['2024-01-15T10:30:00Z'], + ), + ] + dataset_reference: Annotated[ + Optional[str], + Field( + description='Computed link to the latest DatasetVersion reference in PURL format.', + examples=['edgeandnode/eth_transfers@1.0.0'], + min_length=1, + pattern='^[a-z0-9_]+/[a-z_][a-z0-9_]*@[a-z0-9._-]+$', + ), + ] = None + description: Annotated[ + Optional[str], + Field( + description='Description of the dataset, its intended use, and purpose.', + examples=['Ethereum ERC20 token transfers indexed from mainnet'], + max_length=1024, + ), + ] = None + indexing_chains: Annotated[ + list[str], + Field( + description='Chains being indexed by the Dataset. 
Used for discovery by chain.', + examples=[['mainnet', 'arbitrum-one']], + ), + ] + keywords: Annotated[ + Optional[list[str]], + Field( + description='User-defined or derived keywords defining the usage of the dataset.', + examples=[['ERC20', 'DeFi', 'transfers']], + ), + ] = None + latest_version: Optional[DatasetVersion] = None + license: Annotated[ + Optional[str], + Field( + description='Usage license covering the Dataset.', + examples=['MIT', 'Apache-2.0'], + ), + ] = None + name: Annotated[ + str, + Field( + description='The dataset name. Lowercase, alphanumeric with underscores. Cannot start with a number.', + examples=['eth_transfers', 'nft_marketplace', 'defi_protocol'], + min_length=1, + pattern='^[a-z_][a-z0-9_]*$', + ), + ] + namespace: Annotated[ + str, + Field( + description='The dataset namespace. Logical grouping mechanism for datasets. Can be a user 0x address, username, or organization.', + examples=[ + 'edgeandnode', + 'testuser.eth', + '0x85F036b4952B74A438d724EA93495FD6220B94b6', + ], + min_length=1, + pattern='^[a-z0-9_]*$', + ), + ] + owner: Annotated[ + str, + Field( + description='Owner of the Dataset. 
Can be an organization or user 0x address.', + examples=['0x85F036b4952B74A438d724EA93495FD6220B94b6'], + ), + ] + readme: Annotated[ + Optional[str], + Field( + description='User-defined README for the Dataset providing usage examples and documentation.', + examples=['# ETH Transfers\n\nThis dataset indexes all Ethereum transfers...'], + ), + ] = None + repository_url: Annotated[ + Optional[str], + Field( + description='VCS repository URL containing the Dataset source code.', + examples=['https://github.com/edgeandnode/platform'], + ), + ] = None + source: Annotated[ + Optional[list[str]], + Field( + description='Source of data being materialized by the Dataset (e.g., contract addresses, logs, transactions).', + examples=[['0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984']], + ), + ] = None + updated_at: Annotated[ + AwareDatetime, + Field( + description='Timestamp when the Dataset record was last updated.', + examples=['2024-01-20T14:45:00Z'], + ), + ] + versions: Annotated[ + Optional[list[DatasetVersion]], + Field(description='Link to all DatasetVersion records that this Dataset is a parent of.'), + ] = None + visibility: DatasetVisibility + + # Custom validator for datetime parsing + _parse_datetimes = field_validator('created_at', 'updated_at', mode='before')(parse_utc_datetime) + + +class DatasetListResponse(BaseModel): + datasets: Annotated[ + list[Dataset], + Field(description='List of the datasets being returned in this page'), + ] + has_next_page: Annotated[bool, Field(description='If true, there are more datasets that can be fetched')] + total_count: Annotated[int, Field(description='Total number of datasets matching the query filters')] + + +class DatasetSearchResponse(BaseModel): + datasets: Annotated[ + list[DatasetWithScore], + Field(description='List of the datasets being returned in this page'), + ] + has_next_page: Annotated[bool, Field(description='If true, there are more datasets that can be fetched')] + total_count: Annotated[int, 
class InsertDataset(BaseModel):
    """
    Input for creating a new Dataset. Contains metadata, discovery information, and the initial version to create. The owner will be automatically set to the authenticated user.
    """

    description: Annotated[
        Optional[str],
        Field(
            description='Description of the dataset, its intended use, and purpose.',
            examples=['Ethereum ERC20 token transfers indexed from mainnet'],
            max_length=1024,
        ),
    ] = None
    indexing_chains: Annotated[
        list[str],
        Field(
            description='Chains being indexed by the Dataset. Used for discovery by chain.',
            examples=[['mainnet', 'arbitrum-one']],
        ),
    ]
    keywords: Annotated[
        Optional[list[str]],
        Field(
            description='User-defined keywords defining the usage of the dataset.',
            examples=[['ERC20', 'DeFi', 'transfers']],
        ),
    ] = None
    license: Annotated[
        Optional[str],
        Field(
            description='Usage license covering the Dataset.',
            examples=['MIT', 'Apache-2.0', 'GPL-3.0'],
        ),
    ] = None
    name: Annotated[
        str,
        Field(
            description='The dataset name. Pattern: lowercase, alphanumeric with underscores, cannot start with a number.',
            examples=['eth_transfers', 'nft_marketplace', 'defi_protocol'],
            min_length=1,
            pattern='^[a-z_][a-z0-9_]*$',
        ),
    ]
    namespace: Annotated[
        str,
        Field(
            description='The dataset namespace. Pattern: lowercase, numbers, underscores.',
            examples=['edgeandnode', 'testuser_eth', 'my_org'],
            min_length=1,
            pattern='^[a-z0-9_]*$',
        ),
    ]
    # The example previously contained a double-escaped newline (literal
    # backslash + 'n'); it now uses real newlines.
    readme: Annotated[
        Optional[str],
        Field(
            description='User-defined README for the Dataset providing usage examples and documentation.',
            examples=['# ETH Transfers\n\nThis dataset indexes all Ethereum transfers...'],
        ),
    ] = None
    # The generated pattern had a double-escaped dot, which as a regex means
    # "literal backslash followed by any character" and therefore rejected
    # every dotted host name, including this field's own example. A raw string
    # with a single `\.` restores the intended "dot-separated labels" match.
    # NOTE(review): the file header says this module is generated by
    # datamodel-codegen from registry_openapi.json, so the escaping should
    # also be fixed in that spec or regeneration will reintroduce the bug.
    repository_url: Annotated[
        Optional[str],
        Field(
            description='VCS repository URL containing the Dataset source code.',
            examples=['https://github.com/edgeandnode/platform'],
            pattern=r'^https?://[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*(/.*)?$',
        ),
    ] = None
    source: Annotated[
        Optional[list[str]],
        Field(
            description='Source of data being materialized by the Dataset (e.g., contract addresses).',
            examples=[['0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984']],
        ),
    ] = None
    version: InsertDatasetVersion
    visibility: DatasetVisibility
def _pad_fraction(text: str) -> str:
    """Return *text* with its fractional-seconds field normalized to six digits."""
    head, dot, tail = text.partition('.')
    if not dot:
        return text
    # Leading run of digits after the dot is the fraction; anything after it
    # (e.g. a '+00:00' offset) is preserved untouched.
    digits = tail[: len(tail) - len(tail.lstrip('0123456789'))]
    if not digits:
        return text
    remainder = tail[len(digits):]
    return f'{head}.{digits[:6].ljust(6, "0")}{remainder}'


def parse_utc_datetime(value: Any) -> datetime:
    """Parse datetime values from the Registry API into aware datetimes.

    The API returns timestamps in format: '2025-11-18 01:28:40.434357 UTC'.
    ISO 8601 strings (including a trailing 'Z') are accepted as well. Input
    is normalized before calling ``datetime.fromisoformat`` because, before
    Python 3.11, that function rejects a 'Z' suffix and fractional seconds
    that are not exactly 3 or 6 digits long.

    Args:
        value: DateTime value from API (string or datetime)

    Returns:
        datetime: Parsed timezone-aware datetime. Values that carry no
        timezone information (naive strings or naive ``datetime`` objects)
        are interpreted as UTC; values with an explicit offset keep it.

    Raises:
        ValueError: If the value is not a datetime or a recognized string
    """
    if isinstance(value, datetime):
        # Honor the "timezone-aware" contract for datetime inputs too.
        return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)

    if isinstance(value, str):
        text = value.strip()
        if text.endswith(' UTC'):
            # Registry format: "YYYY-MM-DD HH:MM:SS.ffffff UTC"
            text = text[:-4].strip()
        elif text.endswith(('Z', 'z')):
            # RFC 3339 "Zulu" suffix; spelled as an explicit offset so that
            # interpreters older than 3.11 can parse it.
            text = f'{text[:-1]}+00:00'

        try:
            parsed = datetime.fromisoformat(_pad_fraction(text))
        except ValueError:
            pass  # fall through to the uniform error below
        else:
            return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)

    raise ValueError(f'Unable to parse datetime: {value}')
+ + Returns: + A Pydantic field_validator for parsing Registry API datetimes + """ + return field_validator('created_at', 'updated_at', mode='before')(lambda v: parse_utc_datetime(v)) diff --git a/tests/integration/registry/__init__.py b/tests/integration/registry/__init__.py new file mode 100644 index 0000000..6c0fd75 --- /dev/null +++ b/tests/integration/registry/__init__.py @@ -0,0 +1 @@ +"""Integration tests for Registry API.""" diff --git a/tests/integration/registry/test_datasets.py b/tests/integration/registry/test_datasets.py new file mode 100644 index 0000000..fbef327 --- /dev/null +++ b/tests/integration/registry/test_datasets.py @@ -0,0 +1,180 @@ +"""Integration tests for Registry datasets operations. + +These tests interact with the real staging Registry API. +Read operations don't require authentication. +""" + +import pytest + +from amp.registry import RegistryClient +from amp.registry.errors import DatasetNotFoundError + + +@pytest.fixture +def registry_client(): + """Create registry client for staging environment.""" + client = RegistryClient() + yield client + client.close() + + +class TestDatasetsList: + """Test dataset listing operations.""" + + def test_list_datasets_default(self, registry_client): + """Test listing datasets with default parameters.""" + response = registry_client.datasets.list() + + assert response.total_count >= 0 + assert isinstance(response.datasets, list) + assert response.has_next_page is not None + + if response.datasets: + dataset = response.datasets[0] + assert dataset.namespace + assert dataset.name + assert dataset.latest_version + + def test_list_datasets_with_limit(self, registry_client): + """Test listing with custom limit.""" + response = registry_client.datasets.list(limit=5, page=1) + + assert response.total_count >= 0 + assert len(response.datasets) <= 5 + + def test_list_datasets_pagination(self, registry_client): + """Test pagination works correctly.""" + page1 = registry_client.datasets.list(limit=10, 
class TestDatasetsGet:
    """Test getting dataset details."""

    def test_get_dataset_not_found(self, registry_client):
        """A lookup of a dataset that does not exist raises DatasetNotFoundError."""
        with pytest.raises(DatasetNotFoundError):
            registry_client.datasets.get('nonexistent', 'nonexistent_dataset')

    def test_get_dataset_success(self, registry_client):
        """Fetching a dataset taken from the listing returns matching details."""
        # Discover a dataset that exists instead of hard-coding a name.
        listing = registry_client.datasets.list(limit=1)
        if not listing.datasets:
            pytest.skip('No datasets available in registry')

        listed = listing.datasets[0]

        # Round-trip: fetch the same dataset by namespace/name.
        fetched = registry_client.datasets.get(listed.namespace, listed.name)

        from amp.registry.models import DatasetVisibility

        assert fetched.namespace == listed.namespace
        assert fetched.name == listed.name
        assert fetched.latest_version
        known_visibilities = [DatasetVisibility.public, DatasetVisibility.private]
        assert fetched.visibility in known_visibilities
+ assert version.version_tag + assert version.status in [ + DatasetVersionStatus.draft, + DatasetVersionStatus.published, + DatasetVersionStatus.deprecated, + DatasetVersionStatus.archived, + ] + + def test_get_version(self, registry_client): + """Test getting specific version details.""" + # Get a dataset and its versions + response = registry_client.datasets.list(limit=1) + + if not response.datasets: + pytest.skip('No datasets available in registry') + + dataset = response.datasets[0] + versions = registry_client.datasets.list_versions(dataset.namespace, dataset.name) + + if not versions: + pytest.skip('No versions available for dataset') + + # Get specific version + version_detail = registry_client.datasets.get_version(dataset.namespace, dataset.name, versions[0].version_tag) + + assert version_detail.version_tag == versions[0].version_tag + assert version_detail.status + + +class TestDatasetManifest: + """Test manifest operations.""" + + def test_get_manifest(self, registry_client): + """Test fetching dataset manifest.""" + # Get a dataset first + response = registry_client.datasets.list(limit=1) + + if not response.datasets: + pytest.skip('No datasets available in registry') + + dataset = response.datasets[0] + + # Skip if dataset has no latest_version + if not dataset.latest_version or not dataset.latest_version.version_tag: + pytest.skip('Dataset has no latest version') + + # Fetch manifest + manifest = registry_client.datasets.get_manifest( + dataset.namespace, dataset.name, dataset.latest_version.version_tag + ) + + assert isinstance(manifest, dict) + assert 'kind' in manifest or 'tables' in manifest or 'dependencies' in manifest + + def test_get_manifest_for_specific_version(self, registry_client): + """Test fetching manifest for a specific version.""" + # Get a dataset and list versions + response = registry_client.datasets.list(limit=1) + + if not response.datasets: + pytest.skip('No datasets available in registry') + + dataset = response.datasets[0] + 
versions = registry_client.datasets.list_versions(dataset.namespace, dataset.name) + + if not versions: + pytest.skip('No versions available for dataset') + + # Get manifest for specific version + manifest = registry_client.datasets.get_manifest(dataset.namespace, dataset.name, versions[0].version_tag) + + assert isinstance(manifest, dict) diff --git a/tests/integration/registry/test_search.py b/tests/integration/registry/test_search.py new file mode 100644 index 0000000..302d160 --- /dev/null +++ b/tests/integration/registry/test_search.py @@ -0,0 +1,208 @@ +"""Integration tests for Registry search operations. + +These tests interact with the real staging Registry API. +Search operations don't require authentication. +""" + +import pytest + +from amp.registry import RegistryClient + + +@pytest.fixture +def registry_client(): + """Create registry client for staging environment.""" + client = RegistryClient() + yield client + client.close() + + +class TestDatasetSearch: + """Test full-text search functionality.""" + + def test_search_empty_query(self, registry_client): + """Test search with empty query returns results.""" + results = registry_client.datasets.search('', limit=10) + + assert results.total_count >= 0 + assert isinstance(results.datasets, list) + + def test_search_with_keyword(self, registry_client): + """Test search with a common keyword.""" + results = registry_client.datasets.search('ethereum', limit=10) + + assert results.total_count >= 0 + assert isinstance(results.datasets, list) + + # If results exist, verify they have required fields + if results.datasets: + dataset = results.datasets[0] + assert dataset.namespace + assert dataset.name + assert hasattr(dataset, 'score') + assert isinstance(dataset.score, (int, float)) + + def test_search_results_have_scores(self, registry_client): + """Test that search results include relevance scores.""" + results = registry_client.datasets.search('blocks', limit=5) + + if results.datasets: + for dataset in 
results.datasets: + assert hasattr(dataset, 'score') + # Scores should be numeric + assert isinstance(dataset.score, (int, float)) + + def test_search_pagination(self, registry_client): + """Test search pagination.""" + page1 = registry_client.datasets.search('ethereum', limit=5, page=1) + page2 = registry_client.datasets.search('ethereum', limit=5, page=2) + + # Both pages should have valid responses + assert page1.total_count >= 0 + assert page2.total_count >= 0 + + # Total count should be the same across pages + if page1.total_count > 0: + assert page1.total_count == page2.total_count + + def test_search_limit_parameter(self, registry_client): + """Test that limit parameter is respected.""" + limit = 3 + results = registry_client.datasets.search('ethereum', limit=limit) + + # Results should not exceed limit + assert len(results.datasets) <= limit + + def test_search_no_results(self, registry_client): + """Test search with query unlikely to match anything.""" + results = registry_client.datasets.search('xyznonexistentqueryfoobar123', limit=10) + + # Should return valid response even with no matches + assert results.total_count >= 0 + assert isinstance(results.datasets, list) + + +class TestAISearch: + """Test AI semantic search functionality.""" + + def test_ai_search_basic(self, registry_client): + """Test basic AI search.""" + from amp.registry.errors import RegistryDatabaseError + + try: + results = registry_client.datasets.ai_search('find blockchain data', limit=5) + + assert isinstance(results, list) + + # If results exist, verify structure + if results: + dataset = results[0] + assert dataset.namespace + assert dataset.name + assert hasattr(dataset, 'score') + + except RegistryDatabaseError as e: + # AI search might not be configured on staging environment + if 'openai' in str(e).lower() or 'not been configured' in str(e).lower(): + pytest.skip(f'AI search not configured: {e}') + else: + raise + except Exception as e: + # Other potential errors + if 'not 
available' in str(e).lower() or 'not implemented' in str(e).lower(): + pytest.skip(f'AI search not available: {e}') + else: + raise + + def test_ai_search_natural_language(self, registry_client): + """Test AI search with natural language query.""" + from amp.registry.errors import RegistryDatabaseError + + try: + # Natural language queries should work + results = registry_client.datasets.ai_search('I need NFT transfer data', limit=5) + + assert isinstance(results, list) + + if results: + # Results should have scores + for dataset in results: + assert hasattr(dataset, 'score') + assert isinstance(dataset.score, (int, float)) + + except RegistryDatabaseError as e: + if 'openai' in str(e).lower() or 'not been configured' in str(e).lower(): + pytest.skip(f'AI search not configured: {e}') + else: + raise + except Exception as e: + if 'not available' in str(e).lower() or 'not implemented' in str(e).lower(): + pytest.skip(f'AI search not available: {e}') + else: + raise + + def test_ai_search_limit(self, registry_client): + """Test AI search respects limit parameter.""" + from amp.registry.errors import RegistryDatabaseError + + try: + limit = 3 + results = registry_client.datasets.ai_search('ethereum blocks', limit=limit) + + assert isinstance(results, list) + assert len(results) <= limit + + except RegistryDatabaseError as e: + if 'openai' in str(e).lower() or 'not been configured' in str(e).lower(): + pytest.skip(f'AI search not configured: {e}') + else: + raise + except Exception as e: + if 'not available' in str(e).lower() or 'not implemented' in str(e).lower(): + pytest.skip(f'AI search not available: {e}') + else: + raise + + +class TestSearchIntegration: + """Test integration between search and other operations.""" + + def test_search_then_get_dataset(self, registry_client): + """Test workflow: search → get dataset details.""" + # Search for datasets + results = registry_client.datasets.search('ethereum', limit=1) + + if not results.datasets: + pytest.skip('No 
search results available') + + dataset = results.datasets[0] + + # Get full dataset details + full_dataset = registry_client.datasets.get(dataset.namespace, dataset.name) + + assert full_dataset.namespace == dataset.namespace + assert full_dataset.name == dataset.name + + def test_search_then_get_manifest(self, registry_client): + """Test workflow: search → get manifest.""" + # Search for datasets + results = registry_client.datasets.search('blocks', limit=1) + + if not results.datasets: + pytest.skip('No search results available') + + dataset = results.datasets[0] + + # Get full dataset to get latest version + full_dataset = registry_client.datasets.get(dataset.namespace, dataset.name) + + # Skip if no latest version + if not full_dataset.latest_version or not full_dataset.latest_version.version_tag: + pytest.skip('Dataset has no latest version') + + # Fetch manifest + manifest = registry_client.datasets.get_manifest( + full_dataset.namespace, full_dataset.name, full_dataset.latest_version.version_tag + ) + + assert isinstance(manifest, dict) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 00b0488..f9fd98b 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -149,7 +149,8 @@ def getenv_side_effect(key, default=None): Client(query_url='grpc://localhost:1602', auth=True) # Verify auth file was used - mock_auth_service.assert_called_once() + # Note: AuthService is called twice - once for Flight SQL, once for RegistryClient + assert mock_auth_service.call_count == 2 mock_connect.assert_called_once() call_args = mock_connect.call_args middleware = call_args[1].get('middleware', [])