From 8149ea98e02ccce880d78348f4f4912c857783e2 Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 20 Nov 2025 22:59:04 -0300 Subject: [PATCH 1/4] feat(utils): Add shared manifest inspection utilities - describe_manifest(): Extract structured schema from manifests - format_arrow_type(): Format Arrow types into readable strings - print_schema(): Pretty-print schema in human-readable format --- src/amp/utils/__init__.py | 5 + src/amp/utils/manifest_inspector.py | 98 ++++++++++++++++++ tests/unit/test_registry_inspect.py | 150 ++++++++++++++++++++++++++++ 3 files changed, 253 insertions(+) create mode 100644 src/amp/utils/__init__.py create mode 100644 src/amp/utils/manifest_inspector.py create mode 100644 tests/unit/test_registry_inspect.py diff --git a/src/amp/utils/__init__.py b/src/amp/utils/__init__.py new file mode 100644 index 0000000..74a3201 --- /dev/null +++ b/src/amp/utils/__init__.py @@ -0,0 +1,5 @@ +"""Utility modules for amp Python client.""" + +from .manifest_inspector import describe_manifest, format_arrow_type, print_schema + +__all__ = ['describe_manifest', 'format_arrow_type', 'print_schema'] diff --git a/src/amp/utils/manifest_inspector.py b/src/amp/utils/manifest_inspector.py new file mode 100644 index 0000000..0828fee --- /dev/null +++ b/src/amp/utils/manifest_inspector.py @@ -0,0 +1,98 @@ +"""Shared utilities for inspecting dataset manifests. + +This module provides functions to parse and display dataset schemas +from manifest files in a human-readable format. +""" + +from typing import Any, Dict + + +def describe_manifest(manifest: dict) -> Dict[str, list[Dict[str, str | bool]]]: + """Extract structured schema information from a manifest. + + Args: + manifest: Dataset manifest dictionary + + Returns: + dict: Mapping of table names to column information. 
Each column is a dict with: + - name: Column name + - type: Arrow type (simplified string representation) + - nullable: Whether the column allows NULL values + """ + tables = manifest.get('tables', {}) + + result = {} + for table_name, table_def in tables.items(): + schema = table_def.get('schema', {}).get('arrow', {}) + fields = schema.get('fields', []) + + columns = [] + for field in fields: + col_type = format_arrow_type(field.get('type')) + columns.append( + { + 'name': field.get('name', ''), + 'type': col_type, + 'nullable': field.get('nullable', True), + } + ) + + result[table_name] = columns + + return result + + +def format_arrow_type(type_def: Any) -> str: + """Format Arrow type definition into a readable string. + + Args: + type_def: Arrow type definition (str or dict) + + Returns: + str: Human-readable type string + """ + if isinstance(type_def, str): + return type_def + elif isinstance(type_def, dict): + # Handle complex types like Timestamp, FixedSizeBinary, Decimal128 + if 'Timestamp' in type_def: + unit = type_def['Timestamp'][0] if type_def['Timestamp'] else 'Unknown' + return f'Timestamp({unit})' + elif 'FixedSizeBinary' in type_def: + size = type_def['FixedSizeBinary'] + return f'FixedSizeBinary({size})' + elif 'Decimal128' in type_def: + precision, scale = type_def['Decimal128'] + return f'Decimal128({precision},{scale})' + else: + # Fallback for unknown complex types + return str(type_def) + else: + return str(type_def) + + +def print_schema(schema: Dict[str, list[Dict[str, Any]]], header: str = None) -> None: + """Pretty-print a schema dictionary. 
+ + Args: + schema: Schema dictionary from describe_manifest() + header: Optional header text to print before the schema + """ + if header: + print(f'\n{header}') + + if not schema: + print('\n (No tables found in manifest)') + return + + # Print each table + for table_name, columns in schema.items(): + print(f'\n{table_name} ({len(columns)} columns)') + for col in columns: + nullable_str = 'NULL ' if col['nullable'] else 'NOT NULL' + # Pad column name for alignment + col_name = col['name'].ljust(20) + col_type = col['type'].ljust(20) + print(f' {col_name} {col_type} {nullable_str}') + + print() # Empty line at end diff --git a/tests/unit/test_registry_inspect.py b/tests/unit/test_registry_inspect.py new file mode 100644 index 0000000..95e5a69 --- /dev/null +++ b/tests/unit/test_registry_inspect.py @@ -0,0 +1,150 @@ +"""Unit tests for registry dataset inspection methods.""" + +import pytest + +from amp.registry.datasets import RegistryDatasetsClient +from amp.utils.manifest_inspector import format_arrow_type + + +class MockRegistryClient: + """Mock registry client for testing.""" + + def __init__(self, manifest): + self.manifest = manifest + + def _request(self, method, path, params=None): + """Mock HTTP request.""" + + class MockResponse: + def json(self): + return manifest + + return MockResponse() + + +# Sample manifest for testing +manifest = { + 'kind': 'manifest', + 'dependencies': {}, + 'tables': { + 'blocks': { + 'schema': { + 'arrow': { + 'fields': [ + {'name': 'block_num', 'type': 'UInt64', 'nullable': False}, + {'name': 'timestamp', 'type': {'Timestamp': ['Nanosecond', '+00:00']}, 'nullable': False}, + {'name': 'hash', 'type': {'FixedSizeBinary': 32}, 'nullable': False}, + {'name': 'base_fee_per_gas', 'type': {'Decimal128': [38, 0]}, 'nullable': True}, + ] + } + }, + }, + 'transactions': { + 'schema': { + 'arrow': { + 'fields': [ + {'name': 'tx_hash', 'type': {'FixedSizeBinary': 32}, 'nullable': False}, + {'name': 'from', 'type': {'FixedSizeBinary': 20}, 
'nullable': False}, + {'name': 'value', 'type': {'Decimal128': [38, 0]}, 'nullable': True}, + ] + } + }, + }, + }, +} + + +@pytest.mark.unit +class TestDatasetInspection: + """Test dataset inspection methods.""" + + def test_format_arrow_type_primitive(self): + """Test formatting primitive Arrow types.""" + assert format_arrow_type('UInt64') == 'UInt64' + assert format_arrow_type('Binary') == 'Binary' + assert format_arrow_type('Boolean') == 'Boolean' + + def test_format_arrow_type_timestamp(self): + """Test formatting Timestamp types.""" + result = format_arrow_type({'Timestamp': ['Nanosecond', '+00:00']}) + assert result == 'Timestamp(Nanosecond)' + + result = format_arrow_type({'Timestamp': ['Microsecond', '+00:00']}) + assert result == 'Timestamp(Microsecond)' + + def test_format_arrow_type_fixed_binary(self): + """Test formatting FixedSizeBinary types.""" + result = format_arrow_type({'FixedSizeBinary': 32}) + assert result == 'FixedSizeBinary(32)' + + result = format_arrow_type({'FixedSizeBinary': 20}) + assert result == 'FixedSizeBinary(20)' + + def test_format_arrow_type_decimal(self): + """Test formatting Decimal128 types.""" + result = format_arrow_type({'Decimal128': [38, 0]}) + assert result == 'Decimal128(38,0)' + + result = format_arrow_type({'Decimal128': [18, 6]}) + assert result == 'Decimal128(18,6)' + + def test_describe_returns_correct_structure(self): + """Test that describe returns the expected structure.""" + # Create mock client with test manifest + mock_registry = MockRegistryClient(manifest) + client = RegistryDatasetsClient(mock_registry) + + # Mock get_manifest to return our test manifest + client.get_manifest = lambda ns, name, ver: manifest + + # Call describe + schema = client.describe('test', 'dataset', 'latest') + + # Verify structure + assert 'blocks' in schema + assert 'transactions' in schema + + # Check blocks table + blocks = schema['blocks'] + assert len(blocks) == 4 + assert blocks[0]['name'] == 'block_num' + assert 
blocks[0]['type'] == 'UInt64' + assert blocks[0]['nullable'] is False + + # Check formatted complex types + assert blocks[1]['name'] == 'timestamp' + assert blocks[1]['type'] == 'Timestamp(Nanosecond)' + + assert blocks[2]['name'] == 'hash' + assert blocks[2]['type'] == 'FixedSizeBinary(32)' + + assert blocks[3]['name'] == 'base_fee_per_gas' + assert blocks[3]['type'] == 'Decimal128(38,0)' + assert blocks[3]['nullable'] is True + + def test_describe_handles_empty_manifest(self): + """Test that describe handles manifests with no tables.""" + empty_manifest = {'kind': 'manifest', 'dependencies': {}, 'tables': {}} + + mock_registry = MockRegistryClient(empty_manifest) + client = RegistryDatasetsClient(mock_registry) + client.get_manifest = lambda ns, name, ver: empty_manifest + + schema = client.describe('test', 'dataset', 'latest') + assert schema == {} + + def test_describe_handles_nullable_field(self): + """Test that describe correctly identifies nullable fields.""" + mock_registry = MockRegistryClient(manifest) + client = RegistryDatasetsClient(mock_registry) + client.get_manifest = lambda ns, name, ver: manifest + + schema = client.describe('test', 'dataset', 'latest') + + # Check nullable fields + transactions = schema['transactions'] + value_field = next(col for col in transactions if col['name'] == 'value') + assert value_field['nullable'] is True + + from_field = next(col for col in transactions if col['name'] == 'from') + assert from_field['nullable'] is False From b6230ea6f559ad6556b9664fed0d97c49f9f79b4 Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 20 Nov 2025 22:59:30 -0300 Subject: [PATCH 2/4] feat(registry): Add inspect() and describe() methods to datasets client - describe(namespace, name, version): Returns structured schema dictionary mapping table names to column info (name, type, nullable) - inspect(namespace, name, version): Pretty-prints dataset structure in human-readable format for interactive exploration - Both methods use the shared manifest 
inspection utilities for consistency --- src/amp/registry/datasets.py | 68 ++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/src/amp/registry/datasets.py b/src/amp/registry/datasets.py index 9f39fc6..dd98c64 100644 --- a/src/amp/registry/datasets.py +++ b/src/amp/registry/datasets.py @@ -5,6 +5,8 @@ import logging from typing import TYPE_CHECKING, Any, Dict, Optional +from amp.utils.manifest_inspector import describe_manifest, print_schema + from . import models if TYPE_CHECKING: @@ -197,6 +199,72 @@ def get_manifest(self, namespace: str, name: str, version: str) -> dict: response = self._registry._request('GET', path) return response.json() + def describe(self, namespace: str, name: str, version: str = 'latest') -> Dict[str, list[Dict[str, str | bool]]]: + """Get a structured summary of tables and columns in a dataset. + + Returns a dictionary mapping table names to lists of column information, + making it easy to programmatically inspect the dataset schema. + + Args: + namespace: Dataset namespace + name: Dataset name + version: Version tag (default: 'latest') + + Returns: + dict: Mapping of table names to column information. Each column is a dict with: + - name: Column name (str) + - type: Arrow type (str, simplified representation) + - nullable: Whether the column allows NULL values (bool) + + Example: + >>> client = RegistryClient() + >>> schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet', 'latest') + >>> for table_name, columns in schema.items(): + ... print(f"\\nTable: {table_name}") + ... for col in columns: + ... nullable = "NULL" if col['nullable'] else "NOT NULL" + ... print(f" {col['name']}: {col['type']} {nullable}") + """ + manifest = self.get_manifest(namespace, name, version) + return describe_manifest(manifest) + + def inspect(self, namespace: str, name: str, version: str = 'latest') -> None: + """Pretty-print the structure of a dataset for easy inspection. 
+ + Displays tables and their columns in a human-readable format. + This is perfect for exploring datasets interactively. + + Args: + namespace: Dataset namespace + name: Dataset name + version: Version tag (default: 'latest') + + Example: + >>> client = RegistryClient() + >>> client.datasets.inspect('graphops', 'ethereum-mainnet') + Dataset: graphops/ethereum-mainnet@latest + + blocks (4 columns) + block_num UInt64 NOT NULL + timestamp Timestamp NOT NULL + hash FixedSizeBinary(32) NOT NULL + parent_hash FixedSizeBinary(32) NOT NULL + + transactions (23 columns) + block_num UInt64 NOT NULL + tx_hash FixedSizeBinary(32) NOT NULL + ... + """ + # Get dataset info + dataset = self.get(namespace, name) + header = f'Dataset: {namespace}/{name}@{version}' + if dataset.description: + header += f'\nDescription: {dataset.description}' + + # Get schema and print + schema = self.describe(namespace, name, version) + print_schema(schema, header=header) + # Write Operations (Require Authentication) def publish( From 744668ec7d929e44695d952d7119991ffa1618c7 Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 20 Nov 2025 23:06:29 -0300 Subject: [PATCH 3/4] feat(admin): Add inspect() and describe() methods to datasets client - describe(namespace, name, revision): Returns structured schema dictionary - inspect(namespace, name, revision): Pretty-prints dataset structure --- src/amp/admin/datasets.py | 75 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/src/amp/admin/datasets.py b/src/amp/admin/datasets.py index 4cc42d0..b47f107 100644 --- a/src/amp/admin/datasets.py +++ b/src/amp/admin/datasets.py @@ -4,7 +4,9 @@ including registration, deployment, versioning, and manifest operations. """ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Dict, Optional + +from amp.utils.manifest_inspector import describe_manifest, print_schema from . 
import models @@ -198,6 +200,77 @@ def get_manifest(self, namespace: str, name: str, revision: str) -> dict: response = self._admin._request('GET', path) return response.json() + def describe(self, namespace: str, name: str, revision: str = 'latest') -> Dict[str, list[Dict[str, str | bool]]]: + """Get a structured summary of tables and columns in a dataset. + + Returns a dictionary mapping table names to lists of column information, + making it easy to programmatically inspect the dataset schema. + + Args: + namespace: Dataset namespace + name: Dataset name + revision: Version tag (default: 'latest') + + Returns: + dict: Mapping of table names to column information. Each column is a dict with: + - name: Column name (str) + - type: Arrow type (str, simplified representation) + - nullable: Whether the column allows NULL values (bool) + + Example: + >>> client = AdminClient('http://localhost:8080') + >>> schema = client.datasets.describe('_', 'eth_firehose', 'latest') + >>> for table_name, columns in schema.items(): + ... print(f"\\nTable: {table_name}") + ... for col in columns: + ... nullable = "NULL" if col['nullable'] else "NOT NULL" + ... print(f" {col['name']}: {col['type']} {nullable}") + """ + manifest = self.get_manifest(namespace, name, revision) + return describe_manifest(manifest) + + def inspect(self, namespace: str, name: str, revision: str = 'latest') -> None: + """Pretty-print the structure of a dataset for easy inspection. + + Displays tables and their columns in a human-readable format. + This is perfect for exploring datasets interactively. + + Args: + namespace: Dataset namespace + name: Dataset name + revision: Version tag (default: 'latest') + + Example: + >>> client = AdminClient('http://localhost:8080') + >>> client.datasets.inspect('_', 'eth_firehose') + Dataset: _/eth_firehose@latest + + blocks (21 columns) + block_num UInt64 NOT NULL + timestamp Timestamp NOT NULL + hash FixedSizeBinary(32) NOT NULL + ... 
+ + transactions (24 columns) + tx_hash FixedSizeBinary(32) NOT NULL + from FixedSizeBinary(20) NOT NULL + to FixedSizeBinary(20) NULL + ... + """ + header = f'Dataset: {namespace}/{name}@{revision}' + + # Try to get version info for additional context (optional, might not always work) + try: + version_info = self.get_version(namespace, name, revision) + if hasattr(version_info, 'kind'): + header += f'\nKind: {version_info.kind}' + except Exception: + # If we can't get version info, that's okay - just continue + pass + + schema = self.describe(namespace, name, revision) + print_schema(schema, header=header) + def delete(self, namespace: str, name: str) -> None: """Delete all versions and metadata for a dataset. From 793ff37913905e89e643542b20f4a903adac0041 Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 20 Nov 2025 23:35:43 -0300 Subject: [PATCH 4/4] docs: Add comprehensive guide for dataset inspection - Full guide covering both Registry and Admin clients - Use cases: interactive exploration, finding specific columns, type inspection, checking nullability, building dynamic queries - Practical examples for finding Ethereum addresses and hashes - Complete API reference with expected output --- docs/inspecting_datasets.md | 211 ++++++++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 docs/inspecting_datasets.md diff --git a/docs/inspecting_datasets.md b/docs/inspecting_datasets.md new file mode 100644 index 0000000..ea3fb03 --- /dev/null +++ b/docs/inspecting_datasets.md @@ -0,0 +1,211 @@ +# Inspecting Dataset Schemas + +The Registry client and Admin API on standard client provide convenient methods to explore dataset structures without manually parsing manifests. 
+ +## Quick Start + +```python +from amp.registry import RegistryClient + +client = RegistryClient() # Note: Inspection functionality is also available on the Admin API of the regular client (Client()) + +# Pretty-print dataset structure +client.datasets.inspect('edgeandnode', 'ethereum-mainnet') + +# Get structured schema data +schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet') +``` + +## Methods + +### `inspect(namespace, name, version='latest')` + +Pretty-prints the dataset structure in a human-readable format. Perfect for interactive exploration. + +**Example Output:** +``` +Dataset: edgeandnode/ethereum-mainnet@latest +Description: Ethereum mainnet blockchain data + +blocks (21 columns) + block_num UInt64 NOT NULL + timestamp Timestamp(Nanosecond) NOT NULL + hash FixedSizeBinary(32) NOT NULL + parent_hash FixedSizeBinary(32) NOT NULL + miner FixedSizeBinary(20) NOT NULL + ... + +transactions (24 columns) + block_num UInt64 NOT NULL + tx_hash FixedSizeBinary(32) NOT NULL + from FixedSizeBinary(20) NOT NULL + to FixedSizeBinary(20) NULL + ... +``` + +### `describe(namespace, name, version='latest')` + +Returns a structured dictionary mapping table names to column information. Use this for programmatic access. + +**Returns:** +```python +{ + 'blocks': [ + {'name': 'block_num', 'type': 'UInt64', 'nullable': False}, + {'name': 'timestamp', 'type': 'Timestamp(Nanosecond)', 'nullable': False}, + {'name': 'hash', 'type': 'FixedSizeBinary(32)', 'nullable': False}, + ... + ], + 'transactions': [ + {'name': 'tx_hash', 'type': 'FixedSizeBinary(32)', 'nullable': False}, + ... + ] +} +``` + +## Use Cases + +### 1. Interactive Exploration + +```python +# Quickly see what's available +client.datasets.inspect('namespace', 'dataset-name') +``` + +### 2. 
Finding Specific Columns + +```python +schema = client.datasets.describe('namespace', 'dataset-name') + +# Find tables with specific columns +for table_name, columns in schema.items(): + col_names = [col['name'] for col in columns] + if 'address' in col_names: + print(f"Table '{table_name}' has an address column") +``` + +### 3. Finding Ethereum Addresses + +```python +schema = client.datasets.describe('namespace', 'dataset-name') + +# Find all address columns (20-byte binary fields) +for table_name, columns in schema.items(): + address_cols = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(20)'] + if address_cols: + print(f"{table_name}: {', '.join(address_cols)}") + +# Example output: +# blocks: miner +# transactions: from, to +# logs: address +``` + +### 4. Finding Transaction/Block Hashes + +```python +schema = client.datasets.describe('namespace', 'dataset-name') + +# Find all hash columns (32-byte binary fields) +for table_name, columns in schema.items(): + hash_cols = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(32)'] + if hash_cols: + print(f"{table_name}: {', '.join(hash_cols)}") + +# Example output: +# blocks: hash, parent_hash, state_root, transactions_root +# transactions: block_hash, tx_hash +# logs: block_hash, tx_hash, topic0, topic1, topic2, topic3 +``` + +### 5. Checking Nullable Columns + +```python +schema = client.datasets.describe('namespace', 'dataset-name') + +# Find columns that allow NULL values (important for data quality) +for table_name, columns in schema.items(): + nullable_cols = [col['name'] for col in columns if col['nullable']] + print(f"{table_name}: {len(nullable_cols)}/{len(columns)} nullable columns") + print(f" Nullable: {', '.join(nullable_cols[:5])}") + +# Example output: +# transactions: 5/24 nullable columns +# Nullable: to, gas_price, value, max_fee_per_gas, max_priority_fee_per_gas +``` + +### 6. 
Building Dynamic Queries + +```python +from amp.registry import RegistryClient +from amp import Client + +registry_client = RegistryClient() +client = Client( + query_url='grpc://localhost:1602', + admin_url='http://localhost:8080', + auth=True +) + +# Discover available tables +schema = registry_client.datasets.describe('namespace', 'dataset-name') +print(f"Available tables: {list(schema.keys())}") + +# Build query based on available columns +if 'blocks' in schema: + block_cols = [col['name'] for col in schema['blocks']] + if 'block_num' in block_cols and 'timestamp' in block_cols: + # Safe to query these columns + result = client.sql("SELECT block_num, timestamp FROM blocks LIMIT 10") +``` + +## Supported Arrow Types + +The `describe()` and `inspect()` methods handle these Arrow types: + +- **Primitives**: `UInt64`, `Int32`, `Boolean`, `Binary` +- **Timestamps**: `Timestamp(Nanosecond)`, `Timestamp(Microsecond)`, etc. +- **Fixed-size Binary**: `FixedSizeBinary(20)` (addresses), `FixedSizeBinary(32)` (hashes) +- **Decimals**: `Decimal128(38,0)` (large integers), `Decimal128(18,6)` (fixed-point) + +## Complete Example + +```python +from amp.registry import RegistryClient +from amp import Client + +# Step 1: Discover datasets +registry = RegistryClient() +results = registry.datasets.search('ethereum blocks') + +print("Available datasets:") +for ds in results.datasets[:5]: + print(f" • {ds.namespace}/{ds.name}") + +# Step 2: Inspect a dataset +print("\nInspecting dataset structure:") +registry.datasets.inspect('graphops', 'ethereum-mainnet') + +# Step 3: Get schema programmatically +schema = registry.datasets.describe('graphops', 'ethereum-mainnet') + +# Step 4: Query based on discovered schema +client = Client(query_url='grpc://your-server:1602', auth=True) + +# Find tables with block_num column +tables_with_blocks = [ + table for table, cols in schema.items() + if any(col['name'] == 'block_num' for col in cols) +] + +for table in tables_with_blocks: + print(f"\nQuerying {table}...") + results = 
client.sql(f"SELECT * FROM {table} LIMIT 5").to_arrow() + print(f" Rows: {len(results)}") +``` + +## Tips + +1. **Use `inspect()` interactively**: Great for Jupyter notebooks or REPL exploration +2. **Use `describe()` in scripts**: When you need programmatic access to schema info +3. **Check nullability**: The `nullable` field tells you if a column can have NULL values +4. **Version pinning**: Always specify a version in production (`version='1.2.3'`) instead of using `'latest'`