diff --git a/README.md b/README.md index 279f915..2a8faae 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,17 @@ ## Overview -Python client for Amp - a high-performance data infrastructure for blockchain data. +Python client for Amp - a database for blockchain data. **Features:** - **Query Client**: Issue Flight SQL queries to Amp servers - **Admin Client**: Manage datasets, deployments, and jobs programmatically +- **Registry Client**: Discover, search, and publish datasets to the Registry +- **Dataset Inspection**: Explore dataset schemas with `inspect()` and `describe()` methods - **Data Loaders**: Zero-copy loading into PostgreSQL, Redis, Snowflake, Delta Lake, Iceberg, and more - **Parallel Streaming**: High-throughput parallel data ingestion with automatic resume - **Manifest Generation**: Fluent API for creating and deploying datasets from SQL queries +- **Auto-Refreshing Auth**: Seamless authentication with automatic token refresh ## Dependencies 1. Rust @@ -45,7 +48,7 @@ from amp import Client client = Client(url="grpc://localhost:8815") # Execute query and convert to pandas -df = client.query("SELECT * FROM eth.blocks LIMIT 10").to_pandas() +df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_arrow().to_pandas() print(df) ``` @@ -63,7 +66,7 @@ client = Client( # Register and deploy a dataset job = ( - client.query("SELECT block_num, hash FROM eth.blocks") + client.sql("SELECT block_num, hash FROM eth.blocks") .with_dependency('eth', '_/eth_firehose@1.0.0') .register_as('_', 'my_dataset', '1.0.0', 'blocks', 'mainnet') .deploy(parallelism=4, end_block='latest', wait=True) @@ -76,12 +79,97 @@ print(f"Deployment completed: {job.status}") ```python # Load query results into PostgreSQL -loader = client.query("SELECT * FROM eth.blocks").load( - loader_type='postgresql', +result = client.sql("SELECT * FROM eth.blocks").load( connection='my_pg_connection', - table_name='eth_blocks' + destination='eth_blocks' ) -print(f"Loaded {loader.rows_written} rows") 
+print(f"Loaded {result.rows_loaded} rows") +``` + +### Authentication + +The client supports three authentication methods (in priority order): + +```python +from amp import Client + +# 1. Explicit token (highest priority) +client = Client( + url="grpc://localhost:8815", + auth_token="your-token" +) + +# 2. Environment variable +# export AMP_AUTH_TOKEN="your-token" +client = Client(url="grpc://localhost:8815") + +# 3. Shared auth file (auto-refresh, recommended) +# Uses ~/.amp/cache/amp_cli_auth (shared with TypeScript CLI) +client = Client( + url="grpc://localhost:8815", + auth=True # Automatically refreshes expired tokens +) +``` + +### Registry - Discovering Datasets + +```python +from amp import Client + +# Connect with registry support +client = Client( + query_url="grpc://localhost:8815", + registry_url="https://api.registry.amp.staging.thegraph.com", + auth=True +) + +# Search for datasets +results = client.registry.datasets.search('ethereum blocks') +for dataset in results.datasets[:5]: + print(f"{dataset.namespace}/{dataset.name} - {dataset.description}") + +# Get dataset details +dataset = client.registry.datasets.get('edgeandnode', 'ethereum-mainnet') +print(f"Latest version: {dataset.latest_version}") + +# Inspect dataset schema +client.registry.datasets.inspect('edgeandnode', 'ethereum-mainnet') +``` + +### Dataset Inspection + +Explore dataset schemas before querying: + +```python +from amp.registry import RegistryClient + +client = RegistryClient() + +# Pretty-print dataset structure (interactive) +client.datasets.inspect('edgeandnode', 'ethereum-mainnet') +# Output: +# Dataset: edgeandnode/ethereum-mainnet@latest +# +# blocks (21 columns) +# block_num UInt64 NOT NULL +# timestamp Timestamp(Nanosecond) NOT NULL +# hash FixedSizeBinary(32) NOT NULL +# ... 
+ +# Get structured schema data (programmatic) +schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet') + +# Find tables with specific columns +for table_name, columns in schema.items(): + col_names = [col['name'] for col in columns] + if 'block_num' in col_names: + print(f"Table '{table_name}' has block_num column") + +# Find all address columns (20-byte binary) +for table_name, columns in schema.items(): + addresses = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(20)'] + if addresses: + print(f"{table_name}: {', '.join(addresses)}") ``` ## Usage @@ -108,7 +196,9 @@ uv run apps/execute_query.py ### Getting Started - **[Admin Client Guide](docs/admin_client_guide.md)** - Complete guide for dataset management and deployment -- **[Admin API Reference](docs/api/admin_api.md)** - Full API documentation for admin operations +- **[Registry Guide](docs/registry-guide.md)** - Discover and search datasets in the Registry +- **[Dataset Inspection](docs/inspecting_datasets.md)** - Explore dataset schemas with `inspect()` and `describe()` +- **[Client API Reference](docs/api/client_api.md)** - Full API documentation for admin operations ### Features - **[Parallel Streaming Usage Guide](docs/parallel_streaming_usage.md)** - User guide for high-throughput parallel data loading diff --git a/docs/admin_client_guide.md b/docs/admin_client_guide.md index 693d28b..42e35e2 100644 --- a/docs/admin_client_guide.md +++ b/docs/admin_client_guide.md @@ -140,7 +140,7 @@ client = Client( ) # Query operations (Flight SQL) -df = client.query("SELECT * FROM eth.blocks LIMIT 10").to_pandas() +df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_pandas() # Admin operations (HTTP API) datasets = client.datasets.list_all() @@ -170,7 +170,7 @@ The legacy `url` parameter still works for Flight SQL: ```python # This still works client = Client(url="grpc://localhost:8815") -client.query("SELECT * FROM eth.blocks") +client.sql("SELECT * FROM eth.blocks") ``` ### 
Environment Variables @@ -376,7 +376,7 @@ The QueryBuilder provides a fluent API for generating manifests from SQL queries ```python # Build a query -query = client.query("SELECT block_num, hash FROM eth.blocks") +query = client.sql("SELECT block_num, hash FROM eth.blocks") # Add dependencies query = query.with_dependency('eth', '_/eth_firehose@1.0.0') @@ -409,7 +409,7 @@ The most powerful pattern combines query building, manifest generation, registra ```python # Build, register, and deploy in one chain job = ( - client.query("SELECT block_num, hash FROM eth.blocks") + client.sql("SELECT block_num, hash FROM eth.blocks") .with_dependency('eth', '_/eth_firehose@1.0.0') .register_as( namespace='_', @@ -432,7 +432,7 @@ print(f"Deployment completed: {job.status}") ```python manifest = ( - client.query(""" + client.sql(""" SELECT t.token_address, t.amount, @@ -453,8 +453,7 @@ manifest = ( ```python # 1. Develop query locally -# REVIEW: IS THIS CORRECT?? -query = client.query(""" +query = client.sql(""" SELECT block_num, COUNT(*) as tx_count @@ -506,7 +505,7 @@ if job.status == 'Completed': ```python # Register production version context = ( - client.query("SELECT * FROM processed_data") + client.sql("SELECT * FROM processed_data") .with_dependency('raw', '_/raw_data@2.0.0') .register_as('_', 'processed_data', '2.0.0', 'data', 'mainnet') ) @@ -691,7 +690,7 @@ thread.start() ```python # Always specify full dependency references query = ( - client.query("SELECT * FROM base.data") + client.sql("SELECT * FROM base.data") .with_dependency('base', '_/base_dataset@1.0.0') # Include version! 
) @@ -700,6 +699,6 @@ query = ( ## Next Steps -- See [API Reference](api/admin_api.md) for complete API documentation +- See [API Reference](api/client_api.md) for complete API documentation - Check [examples/admin/](../examples/admin/) for more code samples - Review the [Admin API OpenAPI spec](../specs/admin.spec.json) for endpoint details diff --git a/docs/api/admin_api.md b/docs/api/client_api.md similarity index 98% rename from docs/api/admin_api.md rename to docs/api/client_api.md index 1e66118..d03763a 100644 --- a/docs/api/admin_api.md +++ b/docs/api/client_api.md @@ -1,4 +1,4 @@ -# Admin API Reference +# Client API Reference Complete API reference for the Amp Admin Client. @@ -90,7 +90,7 @@ Access the SchemaClient for schema operations. #### Methods -##### `query(sql: str) -> QueryBuilder` +##### `sql(sql: str) -> QueryBuilder` Create a QueryBuilder for the given SQL query. @@ -103,7 +103,7 @@ Create a QueryBuilder for the given SQL query. **Example:** ```python -qb = client.query("SELECT * FROM eth.blocks LIMIT 10") +qb = client.sql("SELECT * FROM eth.blocks LIMIT 10") df = qb.to_pandas() ``` @@ -832,7 +832,7 @@ with_dependency(alias: str, reference: str) -> QueryBuilder ```python qb = ( - client.query("SELECT * FROM eth.blocks") + client.sql("SELECT * FROM eth.blocks") .with_dependency('eth', '_/eth_firehose@1.0.0') ) ``` @@ -856,7 +856,7 @@ to_manifest(table_name: str, network: str = 'mainnet') -> dict ```python manifest = ( - client.query("SELECT * FROM eth.blocks") + client.sql("SELECT * FROM eth.blocks") .with_dependency('eth', '_/eth_firehose@1.0.0') .to_manifest('blocks', 'mainnet') ) @@ -890,7 +890,7 @@ register_as( ```python job = ( - client.query("SELECT * FROM eth.blocks") + client.sql("SELECT * FROM eth.blocks") .with_dependency('eth', '_/eth_firehose@1.0.0') .register_as('_', 'my_dataset', '1.0.0', 'blocks') .deploy(parallelism=4, wait=True) @@ -937,7 +937,7 @@ deploy( ```python # Deploy and return immediately -context = 
client.query(...).register_as(...) +context = client.sql(...).register_as(...) job = context.deploy(parallelism=4) print(f"Started job {job.id}") @@ -965,7 +965,7 @@ client = Client( try: # Build and test query - query = client.query(""" + query = client.sql(""" SELECT block_num, hash, timestamp FROM eth.blocks WHERE block_num > 1000000 diff --git a/docs/inspecting_datasets.md b/docs/inspecting_datasets.md index ea3fb03..0476ac5 100644 --- a/docs/inspecting_datasets.md +++ b/docs/inspecting_datasets.md @@ -183,10 +183,10 @@ for ds in results.datasets[:5]: # Step 2: Inspect a dataset print("\nInspecting dataset structure:") -registry.datasets.inspect('graphops', 'ethereum-mainnet') +registry.datasets.inspect('edgeandnode', 'ethereum-mainnet') # Step 3: Get schema programmatically -schema = registry.datasets.describe('graphops', 'ethereum-mainnet') +schema = registry.datasets.describe('edgeandnode', 'ethereum-mainnet') # Step 4: Query based on discovered schema client = Client(query_url='grpc://your-server:1602', auth=True) diff --git a/docs/registry-guide.md b/docs/registry-guide.md index cd98cb0..404bdc7 100644 --- a/docs/registry-guide.md +++ b/docs/registry-guide.md @@ -29,7 +29,7 @@ client = Client( query_url='grpc://localhost:1602', # Flight SQL queries admin_url='http://localhost:8080', # Admin operations registry_url='https://api.registry.amp.staging.thegraph.com', # Registry (default) - auth=True # Use ~/.amp-cli-config for authentication + auth=True # Use ~/.amp/cache/amp_cli_auth for authentication ) # Search registry @@ -43,7 +43,7 @@ manifest = client.registry.datasets.get_manifest( dataset.latest_version.version_tag ) -client.admin.datasets.register( +client.datasets.register( namespace=dataset.namespace, name=dataset.name, revision=dataset.latest_version.version_tag, @@ -165,7 +165,7 @@ print(f'Dependencies: {list(manifest.get("dependencies", {}).keys())}') Publishing requires authentication. 
Set up your auth token: ```python -# Option 1: Use existing auth from ~/.amp-cli-config +# Option 1: Use existing auth from ~/.amp/cache/amp_cli_auth from amp import Client client = Client(auth=True) @@ -328,21 +328,21 @@ manifest = client.registry.datasets.get_manifest( # 4. Deploy dependency to local node print(f'Deploying {dataset.namespace}/{dataset.name}...') -client.admin.datasets.register( +client.datasets.register( namespace=dataset.namespace, name=dataset.name, revision=full_dataset.latest_version.version_tag, manifest=manifest ) -deploy_response = client.admin.datasets.deploy( +deploy_response = client.datasets.deploy( dataset.namespace, dataset.name, full_dataset.latest_version.version_tag ) # Wait for deployment -client.admin.jobs.wait_for_completion(deploy_response.job_id) +client.jobs.wait_for_completion(deploy_response.job_id) print('Dependency deployed!') # 5. Create derived dataset @@ -371,15 +371,15 @@ derived_manifest = { } # 6. Deploy derived dataset -client.admin.datasets.register( +client.datasets.register( namespace='_', name='my_sample', revision='1.0.0', manifest=derived_manifest ) -deploy_response = client.admin.datasets.deploy('_', 'my_sample', '1.0.0') -client.admin.jobs.wait_for_completion(deploy_response.job_id) +deploy_response = client.datasets.deploy('_', 'my_sample', '1.0.0') +client.jobs.wait_for_completion(deploy_response.job_id) print('Derived dataset deployed!') # 7. Query the data @@ -480,6 +480,6 @@ registry = RegistryClient( The Registry client uses the same authentication as the Admin API: -1. Interactive login: `~/.amp-cli-config` +1. Interactive login: `~/.amp/cache/amp_cli_auth` 2. Direct token: Pass `auth_token='your-token'` 3. 
Unified client: Set `auth=True` to use saved credentials diff --git a/src/amp/client.py b/src/amp/client.py index 38f1034..2eee462 100644 --- a/src/amp/client.py +++ b/src/amp/client.py @@ -508,9 +508,9 @@ def registry(self): >>> # Search for datasets >>> results = client.registry.datasets.search('ethereum blocks') >>> # Get a specific dataset - >>> dataset = client.registry.datasets.get('graphops', 'ethereum-mainnet') + >>> dataset = client.registry.datasets.get('edgeandnode', 'ethereum-mainnet') >>> # Fetch manifest - >>> manifest = client.registry.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest') + >>> manifest = client.registry.datasets.get_manifest('edgeandnode', 'ethereum-mainnet', 'latest') """ if not self._registry_client: raise ValueError( diff --git a/src/amp/registry/__init__.py b/src/amp/registry/__init__.py index 933a2ba..a4b9bfb 100644 --- a/src/amp/registry/__init__.py +++ b/src/amp/registry/__init__.py @@ -12,8 +12,8 @@ ... print(f"{dataset.namespace}/{dataset.name} - Score: {dataset.score}") >>> >>> # Get a specific dataset - >>> dataset = client.datasets.get('graphops', 'ethereum-mainnet') - >>> manifest = client.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest') + >>> dataset = client.datasets.get('edgeandnode', 'ethereum-mainnet') + >>> manifest = client.datasets.get_manifest('edgeandnode', 'ethereum-mainnet', 'latest') >>> >>> # Authenticated operations >>> client = RegistryClient(auth_token='your-token') diff --git a/src/amp/registry/datasets.py b/src/amp/registry/datasets.py index dd98c64..9bf94ef 100644 --- a/src/amp/registry/datasets.py +++ b/src/amp/registry/datasets.py @@ -117,7 +117,7 @@ def get(self, namespace: str, name: str) -> models.Dataset: """Get detailed information about a specific dataset. 
Args: - namespace: Dataset namespace (e.g., 'graphops', 'edgeandnode') + namespace: Dataset namespace (e.g., 'edgeandnode') name: Dataset name (e.g., 'ethereum-mainnet') Returns: @@ -125,7 +125,7 @@ def get(self, namespace: str, name: str) -> models.Dataset: Example: >>> client = RegistryClient() - >>> dataset = client.datasets.get('graphops', 'ethereum-mainnet') + >>> dataset = client.datasets.get('edgeandnode', 'ethereum-mainnet') >>> print(f"Latest version: {dataset.latest_version}") >>> print(f"Visibility: {dataset.visibility}") """ @@ -147,7 +147,7 @@ def list_versions(self, namespace: str, name: str) -> list[models.DatasetVersion Example: >>> client = RegistryClient() - >>> versions = client.datasets.list_versions('graphops', 'ethereum-mainnet') + >>> versions = client.datasets.list_versions('edgeandnode', 'ethereum-mainnet') >>> for version in versions: ... print(f" - v{version.version} ({version.status})") """ @@ -168,7 +168,7 @@ def get_version(self, namespace: str, name: str, version: str) -> models.Dataset Example: >>> client = RegistryClient() - >>> version = client.datasets.get_version('graphops', 'ethereum-mainnet', 'latest') + >>> version = client.datasets.get_version('edgeandnode', 'ethereum-mainnet', 'latest') >>> print(f"Version: {version.version}") >>> print(f"Created: {version.created_at}") """ @@ -191,7 +191,7 @@ def get_manifest(self, namespace: str, name: str, version: str) -> dict: Example: >>> client = RegistryClient() - >>> manifest = client.datasets.get_manifest('graphops', 'ethereum-mainnet', 'latest') + >>> manifest = client.datasets.get_manifest('edgeandnode', 'ethereum-mainnet', 'latest') >>> print(f"Dependencies: {list(manifest.get('dependencies', {}).keys())}") >>> print(f"Tables: {list(manifest.get('tables', {}).keys())}") """ @@ -241,8 +241,8 @@ def inspect(self, namespace: str, name: str, version: str = 'latest') -> None: Example: >>> client = RegistryClient() - >>> client.datasets.inspect('graphops', 
'ethereum-mainnet') - Dataset: graphops/ethereum-mainnet@latest + >>> client.datasets.inspect('edgeandnode', 'ethereum-mainnet') + Dataset: edgeandnode/ethereum-mainnet@latest blocks (4 columns) block_num UInt64 NOT NULL