diff --git a/.claude-plugin/CHANGELOG.md b/.claude-plugin/CHANGELOG.md new file mode 100644 index 000000000..2b4385438 --- /dev/null +++ b/.claude-plugin/CHANGELOG.md @@ -0,0 +1,87 @@ + + +# Changelog + +All notable changes to the Hamilton Claude Code plugin will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [2.0.0] - 2025-02-04 + +### Changed (Breaking) +- **Plugin location**: Moved from `.claude/plugins/hamilton/` to `.claude-plugin/` at repo root + - `.claude/` is now reserved for user configuration only + - Simpler installation: `claude plugin install ./.claude-plugin --scope user` + - Self-contained plugin structure +- **Split into focused skills**: Reorganized from single monolithic skill into 6 specialized skills: + - `hamilton-dev-workflow` - Systematic 5-step development workflow (NEW) + - `hamilton-core` - Core patterns, decorators, testing, debugging + - `hamilton-scale` - Async, Ray, Dask, Spark patterns + - `hamilton-llm` - LLM and RAG workflows + - `hamilton-observability` - Monitoring, tracking, lineage + - `hamilton-integrations` - Airflow, FastAPI, Streamlit, etc. + +### Added +- **hamilton-dev-workflow skill**: Systematic workflow for building Hamilton DAGs + - Step 1: Natural language → DOT graph (token-efficient DAG design) + - Step 2: DOT graph → Function signatures + docstrings + - Step 3: Validate DAG with Hamilton CLI (`hamilton build`) + - Step 4: TDD implementation (implement node-by-node with tests) + - Step 5: Pragmatic type annotations (use `Any` + MonkeyType) + - Optimized for LLMs: structured, incremental, validatable approach + - Avoids monolithic implementation and "spaghetti code" + +### Fixed +- Installation instructions now reference correct `.claude-plugin/` path +- DOT graph examples now show correct data flow direction (upstream → downstream) + +### Documentation +- New workflow-based development guide +- Updated README.md with simplified installation +- Improved contributor documentation + +## [1.0.0] - 2025-01-31 + +### Added +- Initial release of Hamilton Claude Code plugin +- Comprehensive skill for Hamilton DAG development +- Support for creating new Hamilton modules with best practices +- Function modifier guidance (@parameterize, @config.when, @extract_columns, @check_output, etc.) +- Code conversion assistance (Python scripts → Hamilton modules) +- DAG visualization and understanding +- Debugging assistance for common issues +- Data quality validation patterns +- LLM/RAG workflow examples +- Feature engineering patterns +- Integration examples: + - Airflow + - FastAPI + - Streamlit + - Jupyter notebooks +- Parallel execution patterns (ThreadPool, Ray, Dask, Spark) +- Caching strategies +- Testing guidance + +### Documentation +- Comprehensive SKILL.md with all Hamilton patterns +- examples.md with 60+ production-ready code examples +- README.md with installation and usage instructions +- Plugin manifest (plugin.json) and marketplace (marketplace.json) diff --git a/.claude-plugin/README.md b/.claude-plugin/README.md new file mode 100644 index 000000000..7c7a3d3ee --- /dev/null +++ b/.claude-plugin/README.md @@ -0,0 +1,257 @@ + + +# Apache Hamilton Plugin for Claude Code + +A comprehensive AI assistant skill for [Apache Hamilton](https://github.com/apache/hamilton) development, designed to help you build, debug, and optimize Apache Hamilton DAGs using Claude Code. + +## What is This? 
+ +This is a [Claude Code plugin](https://code.claude.com/docs/en/plugins.md) that provides expert assistance for Apache Hamilton development. When active, Claude Code understands Apache Hamilton's patterns, best practices, and can help you: + +- 🏗️ **Create new Apache Hamilton modules** with proper patterns and decorators +- 🔍 **Understand existing DAGs** by explaining dataflow and dependencies +- 🎨 **Apply function modifiers** correctly (@parameterize, @config.when, @check_output, etc.) +- 🐛 **Debug issues** in DAG definitions and execution +- 🔄 **Convert Python scripts** to Apache Hamilton modules +- ⚡ **Optimize pipelines** with caching, parallelization, and best practices +- ✅ **Write tests** for Apache Hamilton functions +- 📊 **Generate visualizations** of your DAGs + +## Installation + +### Option 1: Install via Plugin System (Recommended for Users) + +```bash +# Add the Hamilton plugin marketplace +/plugin marketplace add apache/hamilton + +# Install the plugin +/plugin install hamilton --scope user +``` + +Or in one command: +```bash +claude plugin install hamilton@apache/hamilton --scope user +``` + +**Installation scopes:** +- `--scope user` - Available in all your projects (recommended) +- `--scope project` - Only in current project +- `--scope local` - Testing/development only + +### Option 2: For Apache Hamilton Contributors + +If you've cloned the Apache Hamilton repository, the plugin is already available at `.claude-plugin/` and will be automatically discovered by Claude Code. No installation needed! + +### Option 3: Manual Installation from Cloned Repo + +Install the plugin from your local clone: + +```bash +# From within the Hamilton repo directory +claude plugin install ./.claude-plugin --scope user + +# Or copy the plugin directory +cp -r .claude-plugin ~/.claude/plugins/hamilton +``` + +## Usage + +### Automatic Invocation + +Claude Code will automatically use this skill when it detects you're working with Apache Hamilton code. Just ask questions or give instructions naturally: + +``` +"Help me create an Apache Hamilton module for processing customer data" +"Explain what this DAG does" +"Convert this pandas script to Apache Hamilton" +"Add caching to my expensive computation function" +"Why am I getting a circular dependency error?" +``` + +### Manual Invocation + +You can explicitly invoke the skill using the `/hamilton` command: + +``` +/hamilton create a feature engineering module with rolling averages +/hamilton explain the dataflow in my_functions.py +/hamilton optimize this DAG for parallel execution +``` + +## What the Skill Knows + +This skill has deep knowledge of: + +- **Core Apache Hamilton concepts**: Drivers, DAGs, nodes, function-based definitions +- **Function modifiers**: All decorators (@parameterize, @config.when, @extract_columns, @check_output, @save_to, @load_from, @cache, @pipe, @does, @mutate, @step, etc.) 
+- **Execution patterns**: Sequential, parallel, distributed (Ray, Dask, Spark) +- **Data quality**: Validation, schema checking, data quality pipelines +- **I/O patterns**: Materialization, data loaders, result adapters +- **Integration patterns**: Airflow, Streamlit, FastAPI, Jupyter +- **LLM workflows**: RAG pipelines, document processing, embeddings +- **Testing strategies**: Unit testing functions, integration testing DAGs +- **Debugging techniques**: Circular dependencies, visualization, lineage tracing + +## Examples + +### Creating a New Apache Hamilton Module + +``` +"Create an Apache Hamilton module that loads data from a CSV, cleans it by removing +nulls, calculates a 7-day rolling average of the 'sales' column, and outputs +the top 10 days by sales." +``` + +Claude will generate: +- Properly structured functions with type hints +- Correct dependency declarations via parameters +- Appropriate docstrings +- Driver setup code +- Suggestions for visualization + +### Converting Existing Code + +``` +"Convert this script to Apache Hamilton: + +import pandas as pd +df = pd.read_csv('data.csv') +df['feature'] = df['col_a'] * 2 + df['col_b'] +result = df.groupby('category')['feature'].mean() +" +``` + +Claude will refactor it into a clean Apache Hamilton module with separate functions for each transformation step. + +### Applying Decorators + +``` +"I need to create rolling averages for 7, 30, and 90 day windows. +How do I do this in Apache Hamilton without repeating code?" +``` + +Claude will show you how to use `@parameterize` to create multiple nodes from a single function. + +### Debugging + +``` +"I'm getting an error: 'Could not find parameter 'processed_data' in graph'. +What's wrong?" +``` + +Claude will help identify the issue (likely a typo or missing function definition) and suggest fixes. + +## Skill Features + +### Allowed Tools + +This skill is configured with permissions to: +- Read files (`Read`, `Grep`, `Glob`) +- Run Python code (`Bash(python:*)`) +- Search for files (`Bash(find:*)`) +- Run tests (`Bash(pytest:*)`) + +These tools are automatically permitted when the skill is active, streamlining the workflow. + +### Reference Materials + +The skill includes additional reference files: + +- **[examples.md](skills/hamilton/examples.md)** - Comprehensive code examples for common patterns + - Basic DAG creation + - Advanced function modifiers + - LLM & RAG workflows + - Feature engineering patterns + - Data quality validation + - Parallel execution + - Integration patterns (Airflow, FastAPI, Streamlit) + +## Requirements + +- **Claude Code CLI** - Install from https://code.claude.com +- **Apache Hamilton** - The skill works with any version, but references Hamilton 1.x+ patterns +- **Python 3.9+** - For running generated Apache Hamilton code + +## Contributing + +This plugin is open source and part of the Apache Hamilton project! We welcome contributions: + +### Found a Bug? + +Please [file an issue](https://github.com/apache/hamilton/issues/new) on GitHub with: +- A clear description of the problem +- Steps to reproduce +- Expected vs actual behavior +- Your Hamilton and Claude Code versions + +### Want to Improve It? + +Even better - submit a pull request! + +1. **Fork the repository**: https://github.com/apache/hamilton +2. **Make your changes**: Edit files in `.claude-plugin/` +3. **Test thoroughly**: Try the skill with various Apache Hamilton scenarios +4. 
**Submit a PR**: Include a clear description of your improvements + +**Types of contributions we love:** +- 📚 Add new examples to `examples.md` +- 📝 Improve instructions in `SKILL.md` +- 🐛 Fix bugs or inaccuracies +- ✨ Add support for new Apache Hamilton features +- 📖 Enhance documentation + +See [CONTRIBUTING.md](../../../CONTRIBUTING.md) in the Apache Hamilton repo for detailed guidelines. + +## Philosophy + +This skill follows Apache Hamilton's core philosophy: + +- **Declarative over imperative**: Guide users toward function-based definitions +- **Separation of concerns**: Keep definition, execution, and observation separate +- **Reusability**: Encourage patterns that make code testable and portable +- **Simplicity**: Prefer simple solutions over over-engineering + +## Changelog + +### v1.0.0 (2025-01-31) +- Initial release +- Comprehensive Apache Hamilton DAG creation assistance +- Support for all major function modifiers +- LLM/RAG workflow patterns +- Feature engineering examples +- Data quality validation patterns +- Integration examples (Airflow, FastAPI, Streamlit) + +## Learn More + +- **Apache Hamilton Documentation**: https://hamilton.apache.org +- **GitHub Repository**: https://github.com/apache/hamilton +- **Apache Hamilton Examples**: See `examples/` directory in the repo (60+ production examples) +- **DAGWorks Blog**: https://blog.dagworks.io +- **Community Slack**: Join via Apache Hamilton GitHub repo + +## License + +This plugin is part of the Apache Hamilton project and is licensed under the Apache 2.0 License. + +--- + +**Happy Apache Hamilton coding with Claude! 🚀** diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 000000000..ace6234b3 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,35 @@ +{ + "name": "hamilton", + "version": "2.0.0", + "description": "Expert AI assistant for Apache Hamilton framework development - create DAGs, apply decorators, debug dataflows, and optimize pipelines", + "author": { + "name": "Hamilton Team", + "email": "dev@hamilton.apache.org" + }, + "homepage": "https://github.com/apache/hamilton", + "repository": "https://github.com/apache/hamilton", + "license": "Apache-2.0", + "keywords": [ + "hamilton", + "dag", + "dataflow", + "workflow", + "pipeline", + "data-engineering", + "ml-ops", + "feature-engineering", + "llm", + "rag", + "async", + "spark", + "observability" + ], + "skills": [ + "./skills/hamilton-dev-workflow", + "./skills/core", + "./skills/scale", + "./skills/llm", + "./skills/observability", + "./skills/integrations" + ] +} diff --git a/.claude-plugin/skills/core/SKILL.md b/.claude-plugin/skills/core/SKILL.md new file mode 100644 index 000000000..48c4095c6 --- /dev/null +++ b/.claude-plugin/skills/core/SKILL.md @@ -0,0 +1,395 @@ + + +--- +name: hamilton-core +description: Core Hamilton patterns for creating DAGs, applying decorators, testing, and debugging dataflows. Use for basic Hamilton development tasks. +allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(pytest:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton Core Development Assistant + +Apache Hamilton is a lightweight Python framework for building Directed Acyclic Graphs (DAGs) of data transformations using declarative, function-based definitions. 
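In a nutshell: each function is a node, and a parameter named after another function (or a declared input) becomes an edge. A minimal sketch — the function and column names here are illustrative only:

```python
import pandas as pd

def revenue(orders: pd.DataFrame) -> float:
    """Node 'revenue' — depends on whatever node or input is named 'orders'."""
    return orders["amount"].sum()

def revenue_per_order(revenue: float, order_count: int) -> float:
    """Node 'revenue_per_order' — its edges come straight from the parameter names."""
    return revenue / order_count
```

The sections below walk through the same idea with a full module, driver setup, and the decorators that extend it.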
+ +## Core Principles + +**Function-Based DAG Definition** +- Functions with type hints define nodes in the DAG +- Function parameters automatically create edges (dependencies) +- Function names become node names in the DAG +- Pure functions enable easy testing and reusability + +**Key Architecture Components** +- **Functions**: Define transformations with parameters as dependencies +- **Driver**: Builds and manages DAG execution (`.execute()` runs the DAG) +- **FunctionGraph**: Internal DAG representation +- **Function Modifiers**: Decorators that modify DAG behavior +- **Adapters**: Result formatters and lifecycle hooks + +**Separation of Concerns** +- **Definition layer**: Pure Python functions (testable, reusable) +- **Execution layer**: Driver configuration (where/how to run) +- **Observation layer**: Monitoring, lineage, caching + +## Common Tasks + +### 1. Creating New Hamilton Modules + +**Basic Module Structure:** +```python +""" +Module docstring explaining the DAG's purpose. +""" +import pandas as pd +from hamilton.function_modifiers import extract_columns + +def raw_data(data_path: str) -> pd.DataFrame: + """Load raw data from source. + + :param data_path: Path to data file (passed as input) + :return: Raw DataFrame + """ + return pd.read_csv(data_path) + +def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame: + """Remove null values and duplicates. + + :param raw_data: Raw data from previous node + :return: Cleaned DataFrame + """ + return raw_data.dropna().drop_duplicates() + +def feature_a(cleaned_data: pd.DataFrame) -> pd.Series: + """Calculate feature A. + + :param cleaned_data: Cleaned data + :return: Feature A values + """ + return cleaned_data['column_a'] * 2 +``` + +**Driver Setup:** +```python +from hamilton import driver +import my_functions + +dr = driver.Driver({}, my_functions) +results = dr.execute( + ['feature_a', 'cleaned_data'], + inputs={'data_path': 'data.csv'} +) +``` + +**Best Practices:** +1. ✅ Add type hints to ALL function signatures +2. ✅ Write clear docstrings with :param and :return +3. ✅ Keep functions pure (no side effects) +4. ✅ Name functions after the output they produce +5. ✅ Use function parameters for dependencies (not globals) +6. ✅ Create unit tests for each function +7. ❌ Don't use classes unless needed (functions are preferred) +8. ❌ Don't mutate inputs (return new objects) + +### 2. 
Applying Function Modifiers (Decorators) + +**Configuration & Polymorphism:** +```python +from hamilton.function_modifiers import config + +@config.when(model_type='linear') +def predictions(features: pd.DataFrame) -> pd.Series: + """Linear model predictions.""" + from sklearn.linear_model import LinearRegression + model = LinearRegression() + return model.fit_predict(features) + +@config.when(model_type='tree') +def predictions(features: pd.DataFrame) -> pd.Series: + """Tree model predictions.""" + from sklearn.tree import DecisionTreeRegressor + model = DecisionTreeRegressor() + return model.fit_predict(features) + +# Use: driver.Driver({'model_type': 'linear'}, module) +``` + +**Parameterization - Creating Multiple Nodes:** +```python +from hamilton.function_modifiers import parameterize + +@parameterize( + rolling_7d={'window': 7}, + rolling_30d={'window': 30}, + rolling_90d={'window': 90}, +) +def rolling_average(spend: pd.Series, window: int) -> pd.Series: + """Calculate rolling average for different windows.""" + return spend.rolling(window).mean() + +# Creates 3 nodes: rolling_7d, rolling_30d, rolling_90d +``` + +**Column Extraction - DataFrames to Series:** +```python +from hamilton.function_modifiers import extract_columns + +@extract_columns('feature_1', 'feature_2', 'feature_3') +def features(cleaned_data: pd.DataFrame) -> pd.DataFrame: + """Generate multiple features.""" + return pd.DataFrame({ + 'feature_1': cleaned_data['a'] * 2, + 'feature_2': cleaned_data['b'] ** 2, + 'feature_3': cleaned_data['a'] + cleaned_data['b'], + }) + +# Creates 3 nodes: feature_1, feature_2, feature_3 (each a Series) +``` + +**Data Quality Validation:** +```python +from hamilton.function_modifiers import check_output +import pandera as pa + +@check_output( + data_type=float, + range=(0, 100), + importance="fail" +) +def revenue_percentage(revenue: float, total: float) -> float: + """Calculate revenue as percentage.""" + return (revenue / total) * 100 + +# With Pandera schemas +@check_output( + schema=pa.SeriesSchema(float, pa.Check.greater_than(0)), + importance="fail" +) +def positive_values(data: pd.Series) -> pd.Series: + """Ensure all values are positive.""" + return data +``` + +**I/O Materialization:** +```python +from hamilton.function_modifiers import save_to, load_from +from hamilton.io.materialization import to + +@save_to(to.csv(path="output.csv")) +def final_results(aggregated_data: pd.DataFrame) -> pd.DataFrame: + """Save final results to CSV.""" + return aggregated_data + +@load_from(from_='data.parquet', reader='parquet') +def input_data() -> pd.DataFrame: + """Load data from parquet.""" + pass # Function body ignored when using @load_from +``` + +### 3. 
Converting Existing Code to Hamilton + +**Before (Script):** +```python +import pandas as pd + +df = pd.read_csv('data.csv') +df = df.dropna() +df['feature'] = df['col_a'] * 2 +result = df.groupby('category')['feature'].mean() +print(result) +``` + +**After (Hamilton Module):** +```python +"""Data processing DAG.""" +import pandas as pd + +def raw_data(data_path: str) -> pd.DataFrame: + """Load raw data.""" + return pd.read_csv(data_path) + +def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame: + """Remove nulls.""" + return raw_data.dropna() + +def feature(cleaned_data: pd.DataFrame) -> pd.Series: + """Calculate feature.""" + return cleaned_data['col_a'] * 2 + +def data_with_feature(cleaned_data: pd.DataFrame, feature: pd.Series) -> pd.DataFrame: + """Add feature to dataset.""" + df = cleaned_data.copy() + df['feature'] = feature + return df + +def result(data_with_feature: pd.DataFrame) -> pd.Series: + """Aggregate by category.""" + return data_with_feature.groupby('category')['feature'].mean() +``` + +**Conversion Guidelines:** +1. Identify distinct computation steps +2. Extract each step into a pure function +3. Use previous step's variable name as function parameter +4. Add type hints and docstrings +5. Remove imperative variable assignments +6. Test each function independently + +### 4. Visualizing & Understanding DAGs + +**Generate Visualization:** +```python +from hamilton import driver +import my_functions + +dr = driver.Driver({}, my_functions) + +# Create visualization +dr.display_all_functions('dag.png') # All nodes +dr.visualize_execution( + ['final_output'], + 'execution.png', + inputs={'input_data': ...} +) # Execution path only +``` + +**Understanding DAG Structure:** +- Each function becomes a node +- Function parameters create directed edges +- No cycles allowed (DAG = Directed Acyclic Graph) +- Execution order determined by dependencies +- Multiple paths execute in parallel when possible + +**Debugging Tips:** +1. Check for circular dependencies (A depends on B depends on A) +2. Verify all parameter names match existing function names +3. Look for typos in parameter names +4. Use `dr.list_available_variables()` to see all nodes +5. Check `dr.what_is_downstream_of('node_name')` for dependencies + +### 5. 
Testing Hamilton Functions + +**Unit Testing Pattern:** +```python +import pytest +import pandas as pd +from my_functions import cleaned_data, feature + +def test_cleaned_data(): + """Test data cleaning.""" + raw = pd.DataFrame({ + 'col_a': [1, 2, None, 4], + 'col_b': ['a', 'b', 'c', 'd'] + }) + result = cleaned_data(raw) + assert len(result) == 3 + assert result['col_a'].isna().sum() == 0 + +def test_feature(): + """Test feature calculation.""" + data = pd.DataFrame({'col_a': [1, 2, 3]}) + result = feature(data) + pd.testing.assert_series_equal( + result, + pd.Series([2, 4, 6], name='col_a') + ) +``` + +**Integration Testing with Driver:** +```python +def test_full_pipeline(): + """Test complete DAG execution.""" + from hamilton import driver + import my_functions + + dr = driver.Driver({}, my_functions) + result = dr.execute( + ['result'], + inputs={'data_path': 'test_data.csv'} + ) + assert 'result' in result + assert result['result'].sum() > 0 +``` + +## Common Pitfalls & Solutions + +**Circular Dependencies:** +```python +# ❌ Bad - circular dependency +def a(b: int) -> int: + return b + 1 + +def b(a: int) -> int: + return a + 1 + +# ✅ Good - break the cycle +def a(input_value: int) -> int: + return input_value + 1 + +def b(a: int) -> int: + return a + 1 +``` + +**Missing Type Hints:** +```python +# ❌ Bad - no type hints +def process(data): + return data * 2 + +# ✅ Good - clear types +def process(data: pd.Series) -> pd.Series: + return data * 2 +``` + +**Mutating Inputs:** +```python +# ❌ Bad - mutates input +def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame: + df[col_name] = 0 # Modifies original! + return df + +# ✅ Good - returns new object +def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame: + result = df.copy() + result[col_name] = 0 + return result +``` + +## Key Files & Locations + +- **Core library**: `hamilton/` - Main package code +- **Driver**: `hamilton/driver.py` - Main orchestration class +- **Function modifiers**: `hamilton/function_modifiers/` - Decorators +- **Examples**: `examples/` - Production examples +- **Tests**: `tests/` - Unit and integration tests +- **Docs**: `docs/` - Official documentation + +## Getting Help + +- **Documentation**: `docs/` directory in repo +- **Examples**: `examples/` directory for patterns +- **Community**: Apache Hamilton Slack, GitHub issues +- **Other Skills**: Use `/hamilton-scale` for async/Spark, `/hamilton-llm` for AI workflows + +## Additional Resources + +For detailed reference material, see: +- Apache Hamilton official docs at hamilton.apache.org +- Apache Hamilton GitHub repository examples folder diff --git a/.claude-plugin/skills/hamilton-dev-workflow/SKILL.md b/.claude-plugin/skills/hamilton-dev-workflow/SKILL.md new file mode 100644 index 000000000..35a910b21 --- /dev/null +++ b/.claude-plugin/skills/hamilton-dev-workflow/SKILL.md @@ -0,0 +1,522 @@ + + +--- +name: hamilton-dev-workflow +description: Systematic 5-step workflow for building Hamilton DAGs - DOT graphs, signatures, validation, TDD implementation. Use this workflow when creating new Hamilton modules from scratch. +allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(hamilton:*), Bash(pytest:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton Development Workflow for Claude Code + +**TL;DR**: Build Hamilton DAGs systematically using DOT → Signatures → Validation → TDD implementation. + +## Why This Workflow? 
+ +This workflow optimizes for: +- **Token efficiency**: DOT language is extremely concise +- **Early validation**: Catch graph errors before implementation +- **LLM-friendly**: Well-structured, incremental approach +- **Testability**: Build complex systems node-by-node with tests +- **Pragmatic typing**: Handle type annotations strategically + +## The 5-Step Workflow + +### Step 1: Natural Language → DOT Graph + +**Convert user requirements into DOT language representation** + +DOT language is: +- Simple and well-understood by LLMs +- Extremely token-efficient +- Perfect for representing DAG structure +- Easy to validate visually + +**Example:** +``` +User: "Load CSV data, clean nulls, calculate 7-day rolling average, output top 10" + +DOT representation: +digraph hamilton_dag { + // Inputs + data_path [shape=box, color=blue] + + // Pipeline (arrows show data flow: upstream -> downstream) + data_path -> raw_data + raw_data -> cleaned_data + cleaned_data -> rolling_7d + rolling_7d -> top_10 +} +``` + +**Best Practices:** +- Use descriptive node names (snake_case) +- Mark inputs with `[shape=box, color=blue]` +- Add comments for clarity +- Keep it simple - just the graph structure + +### Step 2: DOT Graph → Function Signatures + Docstrings + +**Convert DOT nodes into Python function signatures** + +This step is mechanical and can be automated. For each node: +1. Create a function with the node name +2. Add parameters based on incoming edges +3. Add return type annotation (use `Any` if uncertain) +4. Write a clear docstring explaining what the node computes + +**Example:** +```python +"""Data processing pipeline.""" +from typing import Any +import pandas as pd + +def raw_data(data_path: str) -> pd.DataFrame: + """Load raw data from CSV file. + + :param data_path: Path to CSV file + :return: Raw DataFrame + """ + pass + +def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame: + """Remove null values and duplicates. + + :param raw_data: Raw data from previous step + :return: Cleaned DataFrame + """ + pass + +def rolling_7d(cleaned_data: pd.DataFrame) -> pd.Series: + """Calculate 7-day rolling average of sales. + + :param cleaned_data: Cleaned data + :return: 7-day rolling average + """ + pass + +def top_10(rolling_7d: pd.Series) -> pd.Series: + """Get top 10 days by value. 
+ + :param rolling_7d: Rolling average values + :return: Top 10 values + """ + pass +``` + +**Type Annotation Strategy:** +- **Known types**: Use specific types (pd.DataFrame, pd.Series, int, str) +- **Uncertain**: Use `Any` - you'll fix this later with MonkeyType +- **Complex types**: Start with `Any`, refine incrementally + +### Step 3: Validate DAG with Hamilton CLI + +**Verify the graph structure before implementing logic** + +Use the Hamilton CLI to validate: +```bash +# Validate the DAG structure +hamilton build --module my_pipeline + +# Visualize the graph +hamilton build --module my_pipeline --output dag.png + +# List all nodes +hamilton build --module my_pipeline --list-nodes +``` + +**What to check:** +- ✅ No circular dependencies +- ✅ All edges connect properly +- ✅ Input nodes are identified correctly +- ✅ Output nodes are what you expect +- ✅ No typos in function/parameter names + +**If validation fails:** +- Fix the function signatures (parameter names, function names) +- Re-run `hamilton build` +- Don't proceed to implementation until validation passes + +### Step 4: Function Signatures → TDD Implementation + +**Implement nodes incrementally with test-driven development** + +This is where the workflow shines for LLMs. Instead of writing all code at once: + +**Process:** +1. Pick ONE node to implement (start with inputs, work forward) +2. Write a test for that node first +3. Implement the node to pass the test +4. Run the test to verify +5. Move to the next node + +**Example - Node by node:** + +```python +# Test for raw_data +def test_raw_data(tmp_path): + """Test CSV loading.""" + # Setup + csv_file = tmp_path / "data.csv" + csv_file.write_text("col_a,col_b\n1,2\n3,4") + + # Execute + result = raw_data(str(csv_file)) + + # Verify + assert isinstance(result, pd.DataFrame) + assert len(result) == 2 + assert list(result.columns) == ['col_a', 'col_b'] + +# Implementation +def raw_data(data_path: str) -> pd.DataFrame: + """Load raw data from CSV file.""" + return pd.read_csv(data_path) +``` + +```python +# Test for cleaned_data +def test_cleaned_data(): + """Test data cleaning.""" + # Setup + raw = pd.DataFrame({ + 'col_a': [1, 2, None, 4], + 'col_b': [1, 1, 2, 2] + }) + + # Execute + result = cleaned_data(raw) + + # Verify + assert len(result) == 3 # One null removed + assert result['col_a'].isna().sum() == 0 + +# Implementation +def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame: + """Remove null values and duplicates.""" + return raw_data.dropna().drop_duplicates() +``` + +**Why this works:** +- **Focused context**: LLM only thinks about one node at a time +- **Immediate validation**: Each node is tested before moving on +- **Easy refactoring**: Well-tested nodes can be safely refactored +- **Handles complexity**: Build complex pipelines without "spaghetti code" +- **Clear progress**: Know exactly what's done and what's left + +### Step 5: Handle Type Annotations Pragmatically + +**Use MonkeyType or runtime inference to add accurate types** + +Don't spend time guessing types upfront. 
Instead: + +**Option A: Use `Any` everywhere initially** +```python +from typing import Any + +def my_function(input_data: Any) -> Any: + """Process data.""" + # Implementation here + pass +``` + +**Option B: Disable type validation in Driver** +```python +from hamilton import driver +from hamilton.lifecycle import base + +# Use an adapter that doesn't validate types +class NoTypeCheckAdapter(base.BaseDo): + """Adapter that skips type validation.""" + pass + +dr = driver.Driver( + {}, + module, + adapter=NoTypeCheckAdapter() +) +``` + +**Option C: Run MonkeyType to infer types** + +After implementing and testing: +```bash +# Record runtime types +monkeytype run -m pytest tests/ + +# Generate type stubs +monkeytype stub my_pipeline + +# Apply types to source +monkeytype apply my_pipeline +``` + +MonkeyType will: +- Observe actual values passed at runtime +- Infer concrete types from observations +- Generate accurate type annotations +- Update your source files + +**Result**: Accurate types without upfront guessing! + +**Important**: MonkeyType isn't perfect, but it's: +- ✅ Much faster than manual annotation +- ✅ Based on real runtime behavior +- ✅ Good enough for first drafts +- ✅ Easy to refine manually later + +## Complete Example + +**User Request:** +"Create a pipeline that loads JSON data, filters by status='active', enriches with external API data, and outputs to Parquet" + +**Step 1: DOT Graph** +```dot +digraph pipeline { + // Inputs + json_path [shape=box, color=blue] + api_endpoint [shape=box, color=blue] + output_path [shape=box, color=blue] + + // Pipeline (arrows show data flow: upstream -> downstream) + json_path -> raw_data + raw_data -> active_records + active_records -> enriched_data + api_endpoint -> enriched_data + enriched_data -> final_output + output_path -> final_output +} +``` + +**Step 2: Function Signatures** +```python +"""Data enrichment pipeline.""" +from typing import Any +import pandas as pd + +def raw_data(json_path: str) -> pd.DataFrame: + """Load data from JSON file. + + :param json_path: Path to JSON file + :return: Raw DataFrame + """ + pass + +def active_records(raw_data: pd.DataFrame) -> pd.DataFrame: + """Filter for active status only. + + :param raw_data: Raw data + :return: Filtered DataFrame + """ + pass + +def enriched_data(active_records: pd.DataFrame, api_endpoint: str) -> pd.DataFrame: + """Enrich with external API data. + + :param active_records: Active records + :param api_endpoint: API endpoint URL + :return: Enriched DataFrame + """ + pass + +def final_output(enriched_data: pd.DataFrame, output_path: str) -> str: + """Save to Parquet file. 
+ + :param enriched_data: Enriched data + :param output_path: Output file path + :return: Path to saved file + """ + pass +``` + +**Step 3: Validate** +```bash +hamilton build --module enrichment_pipeline +# Output: ✓ DAG is valid, 4 nodes, 0 cycles +``` + +**Step 4: TDD Implementation** + +*Test 1:* +```python +def test_raw_data(tmp_path): + json_file = tmp_path / "data.json" + json_file.write_text('[{"id": 1, "status": "active"}]') + result = raw_data(str(json_file)) + assert len(result) == 1 +``` + +*Implementation 1:* +```python +def raw_data(json_path: str) -> pd.DataFrame: + return pd.read_json(json_path) +``` + +*Test 2:* +```python +def test_active_records(): + raw = pd.DataFrame([ + {"id": 1, "status": "active"}, + {"id": 2, "status": "inactive"} + ]) + result = active_records(raw) + assert len(result) == 1 + assert result.iloc[0]["status"] == "active" +``` + +*Implementation 2:* +```python +def active_records(raw_data: pd.DataFrame) -> pd.DataFrame: + return raw_data[raw_data["status"] == "active"] +``` + +...continue for each node... + +**Step 5: Add Types with MonkeyType** +```bash +monkeytype run -m pytest tests/ +monkeytype apply enrichment_pipeline +``` + +Result: Accurate type annotations added automatically! + +## Key Benefits of This Workflow + +### For LLMs (Claude): +- **Structured thinking**: Each step has clear boundaries +- **Token efficient**: DOT is concise, avoids redundant code +- **Incremental**: Build one piece at a time +- **Validatable**: Check work at each step +- **Manageable complexity**: Never overwhelmed with full implementation + +### For Developers: +- **Visual validation**: See the DAG before coding +- **Test coverage**: Every node is tested +- **Maintainable**: Well-structured, documented code +- **Debuggable**: Issues isolated to specific nodes +- **Refactorable**: Safe to modify with test coverage + +## Advanced Patterns + +### Using Function Modifiers + +Add decorators in Step 2: + +```python +from hamilton.function_modifiers import parameterize + +@parameterize( + rolling_7d={'window': 7}, + rolling_30d={'window': 30}, + rolling_90d={'window': 90}, +) +def rolling_average(data: pd.Series, window: int) -> pd.Series: + """Calculate rolling average. + + :param data: Input data + :param window: Rolling window size + :return: Rolling average + """ + pass +``` + +Update DOT graph to reflect expanded nodes: +```dot +data -> rolling_7d +data -> rolling_30d +data -> rolling_90d +``` + +### Async Functions + +Mark async in Step 2: + +```python +async def api_data(endpoint: str, record_id: int) -> dict: + """Fetch data from API. + + :param endpoint: API endpoint + :param record_id: Record ID to fetch + :return: API response data + """ + pass +``` + +Use async driver: +```python +from hamilton import async_driver + +dr = await async_driver.Builder().with_modules(module).build() +``` + +## Common Pitfalls + +### ❌ Implementing before validating +Don't write function bodies before running `hamilton build`. Validate structure first! + +### ❌ Writing all code at once +Don't implement all nodes before testing. Go node-by-node with TDD. + +### ❌ Guessing type annotations +Don't waste time guessing types. Use `Any` + MonkeyType or disable validation initially. + +### ❌ Skipping the DOT step +Don't jump straight to code. The DOT graph is your blueprint - validate it first. + +### ❌ Not running tests +Don't implement the next node until the current one passes its test. 
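## Running the Finished DAG

Once every node has a passing test, the whole DAG can be exercised end to end with a short driver script. A minimal sketch, assuming the example module above is saved as `enrichment_pipeline.py` (the file paths and endpoint URL are placeholders):

```python
from hamilton import driver
import enrichment_pipeline

dr = driver.Driver({}, enrichment_pipeline)

result = dr.execute(
    ["final_output"],
    inputs={
        "json_path": "data/records.json",
        "api_endpoint": "https://api.example.com/enrich",
        "output_path": "out/enriched.parquet",
    },
)
print(result["final_output"])  # path to the saved Parquet file
```

If this run surfaces an issue, the failing node's unit test is the place to reproduce and fix it before re-running the pipeline.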
+ +## Quick Reference + +```bash +# Step 3: Validate DAG +hamilton build --module my_module +hamilton build --module my_module --output dag.png + +# Step 4: Run tests +pytest tests/test_my_module.py::test_my_function -v + +# Step 5: Infer types +monkeytype run -m pytest tests/ +monkeytype apply my_module + +# Full pipeline validation +python -c "from hamilton import driver; import my_module; driver.Driver({}, my_module)" +``` + +## Summary + +1. **DOT first**: Design the graph in DOT language (token-efficient, validatable) +2. **Signatures next**: Convert to function signatures with docstrings +3. **Validate early**: Use `hamilton build` before implementing +4. **TDD everything**: Implement one node at a time with tests +5. **Types pragmatically**: Use Any + MonkeyType or disable validation initially + +**This workflow enables Claude to build complex Hamilton DAGs systematically, avoiding the pitfalls of monolithic implementation.** + +--- + +For additional patterns and examples, see the skill-specific guides: +- `/hamilton-core` - Core patterns and decorators +- `/hamilton-scale` - Async, Ray, Dask, Spark +- `/hamilton-llm` - LLM and RAG workflows +- `/hamilton-observability` - Monitoring and tracking +- `/hamilton-integrations` - Airflow, FastAPI, Streamlit diff --git a/.claude-plugin/skills/integrations/SKILL.md b/.claude-plugin/skills/integrations/SKILL.md new file mode 100644 index 000000000..a4c82a733 --- /dev/null +++ b/.claude-plugin/skills/integrations/SKILL.md @@ -0,0 +1,549 @@ + + +--- +name: hamilton-integrations +description: Hamilton integration patterns for Airflow, Dagster, FastAPI, Streamlit, Jupyter notebooks, and other frameworks. Use when integrating Hamilton with other tools. +allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(jupyter:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton Integrations + +This skill covers integrating Hamilton with orchestrators, web frameworks, notebooks, and other tools. + +## Why Integrate Hamilton? 
+ +Hamilton focuses on **dataflow definition**, letting you integrate with: +- **Orchestrators** (Airflow, Dagster, Prefect) - Schedule and monitor +- **Web frameworks** (FastAPI, Flask) - Serve predictions +- **Dashboards** (Streamlit, Plotly Dash) - Interactive visualization +- **Notebooks** (Jupyter) - Interactive development +- **Experiment tracking** (MLflow, Weights & Biases) - Track experiments + +## Airflow Integration + +**Use Case:** Schedule Hamilton DAGs as Airflow tasks + +```python +"""Hamilton in Airflow DAG.""" +from airflow import DAG +from airflow.operators.python import PythonOperator +from datetime import datetime, timedelta +from hamilton import driver +import my_hamilton_module + +def run_hamilton_pipeline(**context): + """Execute Hamilton DAG within Airflow task.""" + dr = driver.Driver({}, my_hamilton_module) + + results = dr.execute( + ['final_output'], + inputs={ + 'data_date': context['ds'], # Airflow execution date + 'run_id': context['run_id'] + } + ) + + # Push results to XCom for downstream tasks + context['task_instance'].xcom_push(key='results', value=results) + return results + +default_args = { + 'owner': 'data-team', + 'depends_on_past': False, + 'start_date': datetime(2024, 1, 1), + 'retries': 2, + 'retry_delay': timedelta(minutes=5) +} + +with DAG( + 'hamilton_etl_pipeline', + default_args=default_args, + schedule_interval='@daily', + catchup=False +) as dag: + + hamilton_task = PythonOperator( + task_id='run_hamilton_etl', + python_callable=run_hamilton_pipeline, + provide_context=True + ) +``` + +**Multiple Hamilton DAGs in Airflow:** + +```python +"""Orchestrate multiple Hamilton pipelines.""" +def run_data_ingestion(**context): + """Hamilton pipeline 1: Ingest data.""" + import ingestion_module + dr = driver.Driver({}, ingestion_module) + return dr.execute(['ingested_data'], inputs={'date': context['ds']}) + +def run_feature_engineering(**context): + """Hamilton pipeline 2: Feature engineering.""" + import feature_module + # Get data from previous task + ingested_data = context['task_instance'].xcom_pull(task_ids='ingest') + dr = driver.Driver({}, feature_module) + return dr.execute(['features'], inputs={'raw_data': ingested_data}) + +def run_model_training(**context): + """Hamilton pipeline 3: Train model.""" + import training_module + features = context['task_instance'].xcom_pull(task_ids='features') + dr = driver.Driver({}, training_module) + return dr.execute(['trained_model'], inputs={'features': features}) + +with DAG('ml_pipeline', schedule_interval='@weekly') as dag: + ingest = PythonOperator(task_id='ingest', python_callable=run_data_ingestion) + features = PythonOperator(task_id='features', python_callable=run_feature_engineering) + train = PythonOperator(task_id='train', python_callable=run_model_training) + + ingest >> features >> train +``` + +## Dagster Integration + +**Use Case:** Define Hamilton as Dagster assets + +```python +"""Hamilton in Dagster.""" +from dagster import asset, AssetExecutionContext +from hamilton import driver +import my_hamilton_module + +@asset +def customer_features(context: AssetExecutionContext) -> dict: + """Execute Hamilton DAG as Dagster asset.""" + dr = driver.Driver({}, my_hamilton_module) + + context.log.info("Starting Hamilton pipeline") + + results = dr.execute( + ['customer_segments', 'feature_importance'], + inputs={'data_path': '/data/customers.csv'} + ) + + context.log.info(f"Generated {len(results['customer_segments'])} segments") + + return results + +@asset(deps=[customer_features]) +def 
segment_report(context: AssetExecutionContext, customer_features: dict) -> str: + """Use Hamilton output in downstream Dagster asset.""" + segments = customer_features['customer_segments'] + # Generate report + return f"Processed {len(segments)} segments" +``` + +## FastAPI Integration + +**Use Case:** Serve Hamilton DAGs as REST API endpoints + +```python +"""Hamilton as FastAPI microservice.""" +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from hamilton import driver +import prediction_module + +app = FastAPI(title="ML Prediction Service") + +# Initialize driver once at startup +prediction_driver = driver.Driver({}, prediction_module) + +class PredictionRequest(BaseModel): + """Request schema.""" + user_id: str + feature_a: float + feature_b: float + feature_c: float + +class PredictionResponse(BaseModel): + """Response schema.""" + user_id: str + prediction: float + confidence: float + +@app.post("/predict", response_model=PredictionResponse) +def predict(request: PredictionRequest): + """Stateless prediction endpoint.""" + try: + result = prediction_driver.execute( + ['prediction', 'confidence'], + inputs=request.dict() + ) + + return PredictionResponse( + user_id=request.user_id, + prediction=result['prediction'], + confidence=result['confidence'] + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/health") +def health_check(): + """Health check endpoint.""" + return {"status": "healthy", "service": "hamilton-predictor"} + +# Run with: uvicorn main:app --reload +``` + +**Async FastAPI with Async Hamilton:** + +```python +"""Async Hamilton with FastAPI.""" +from fastapi import FastAPI +from hamilton import async_driver +import async_prediction_module + +app = FastAPI() + +# Async driver initialization +prediction_driver = None + +@app.on_event("startup") +async def startup(): + """Initialize async driver on startup.""" + global prediction_driver + prediction_driver = await async_driver.Builder()\ + .with_modules(async_prediction_module)\ + .build() + +@app.post("/predict") +async def predict(request: PredictionRequest): + """Async prediction endpoint.""" + result = await prediction_driver.execute( + ['prediction'], + inputs=request.dict() + ) + return {"prediction": result['prediction']} +``` + +## Streamlit Integration + +**Use Case:** Interactive data apps with Hamilton + +```python +"""Hamilton-powered Streamlit dashboard.""" +import streamlit as st +from hamilton import driver +import analytics_module + +st.title("Customer Analytics Dashboard") + +# Sidebar controls +date_range = st.sidebar.date_input("Select date range", []) +metric = st.sidebar.selectbox("Metric", ["revenue", "users", "conversions"]) +segment = st.sidebar.multiselect("Segments", ["new", "returning", "churned"]) + +# Execute Hamilton DAG with user inputs +if st.sidebar.button("Run Analysis"): + with st.spinner("Running analysis..."): + dr = driver.Driver({'metric': metric}, analytics_module) + + results = dr.execute( + ['daily_metrics', 'segment_breakdown', 'trends'], + inputs={ + 'date_range': date_range, + 'segments': segment + } + ) + + # Display results + st.header("Daily Metrics") + st.line_chart(results['daily_metrics']) + + st.header("Segment Breakdown") + st.bar_chart(results['segment_breakdown']) + + st.header("Trends") + st.dataframe(results['trends']) + + # Visualize DAG + st.header("Pipeline Visualization") + dr.visualize_execution( + ['trends'], + './pipeline.png', + inputs={'date_range': date_range, 'segments': segment} + ) + 
st.image('./pipeline.png') +``` + +## Jupyter Notebook Integration + +**Use Case:** Interactive development and experimentation + +### Jupyter Magic Extension + +```python +"""Use Hamilton directly in notebooks.""" +# Install magic extension +%load_ext hamilton.plugins.jupyter_magic + +# Define Hamilton functions in cells +%%cell_to_module my_analysis --display + +import pandas as pd + +def raw_data(csv_path: str) -> pd.DataFrame: + """Load data.""" + return pd.read_csv(csv_path) + +def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame: + """Clean data.""" + return raw_data.dropna() + +def summary_stats(cleaned_data: pd.DataFrame) -> dict: + """Calculate summary.""" + return { + 'mean': cleaned_data['value'].mean(), + 'std': cleaned_data['value'].std() + } + +# Displays DAG visualization automatically! +``` + +### Standard Notebook Pattern + +```python +"""Hamilton in Jupyter without magic.""" +# Cell 1: Define Hamilton functions +# my_functions.py equivalent +def load_data(data_path: str) -> pd.DataFrame: + return pd.read_csv(data_path) + +def process_data(load_data: pd.DataFrame) -> pd.DataFrame: + return load_data.dropna() + +# Cell 2: Create driver +from hamilton import driver +import sys + +# Add current module to driver +dr = driver.Driver({}, sys.modules[__name__]) + +# Cell 3: Execute and explore +results = dr.execute( + ['process_data'], + inputs={'data_path': 'data.csv'} +) + +results['process_data'].head() + +# Cell 4: Visualize +dr.visualize_execution( + ['process_data'], + './notebook_dag.png', + inputs={'data_path': 'data.csv'} +) + +from IPython.display import Image +Image('./notebook_dag.png') +``` + +## MLflow Integration + +**Use Case:** Track experiments and models + +```python +"""Hamilton with MLflow tracking.""" +from hamilton import driver +from hamilton.plugins.mlflow_extensions import MLFlowTracker +import mlflow +import training_module + +# Configure MLflow +mlflow.set_tracking_uri("http://localhost:5000") +mlflow.set_experiment("customer_churn") + +with mlflow.start_run(): + # Create MLflow adapter + mlflow_tracker = MLFlowTracker( + experiment_name="customer_churn", + run_name="baseline_model_v1", + tags={"model_type": "random_forest", "version": "1.0"} + ) + + # Hamilton driver with MLflow tracking + dr = driver.Builder()\ + .with_config({'model_type': 'random_forest'})\ + .with_modules(training_module)\ + .with_adapters(mlflow_tracker)\ + .build() + + results = dr.execute( + ['trained_model', 'metrics'], + inputs={'training_data': train_df} + ) + + # Log additional metrics + mlflow.log_metrics(results['metrics']) + mlflow.log_param("features_count", len(results['features'])) +``` + +## Weights & Biases Integration + +```python +"""Hamilton with W&B tracking.""" +import wandb +from hamilton import driver +import experiment_module + +# Initialize W&B +wandb.init(project="ml-experiments", name="experiment-42") + +# Configure Hamilton +config = { + 'learning_rate': 0.001, + 'batch_size': 32, + 'epochs': 10 +} + +# Log config to W&B +wandb.config.update(config) + +dr = driver.Driver(config, experiment_module) + +results = dr.execute( + ['trained_model', 'validation_metrics'], + inputs={'data_path': '/data/train.csv'} +) + +# Log results to W&B +wandb.log({ + "val_accuracy": results['validation_metrics']['accuracy'], + "val_loss": results['validation_metrics']['loss'] +}) + +wandb.finish() +``` + +## Flask Integration + +```python +"""Hamilton with Flask.""" +from flask import Flask, request, jsonify +from hamilton import driver +import service_module + +app = 
Flask(__name__) +service_driver = driver.Driver({}, service_module) + +@app.route('/api/process', methods=['POST']) +def process_data(): + """Process data endpoint.""" + data = request.get_json() + + try: + results = service_driver.execute( + ['processed_result'], + inputs=data + ) + return jsonify(results) + except Exception as e: + return jsonify({'error': str(e)}), 500 + +@app.route('/health') +def health(): + return jsonify({'status': 'healthy'}) + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000) +``` + +## dbt Integration + +**Use Case:** Hamilton for transformations, dbt for SQL + +```python +"""Hamilton after dbt transformations.""" +import subprocess +from hamilton import driver +import post_dbt_module + +def run_dbt() -> dict: + """Run dbt pipeline.""" + result = subprocess.run(['dbt', 'run'], capture_output=True) + return {'status': 'success' if result.returncode == 0 else 'failed'} + +def dbt_output_path(run_dbt: dict) -> str: + """Get dbt output location.""" + return './target/output.csv' + +# Rest of Hamilton DAG uses dbt output +def post_dbt_analysis(dbt_output_path: str) -> pd.DataFrame: + """Analyze dbt output.""" + return pd.read_csv(dbt_output_path) +``` + +## Kedro Integration + +```python +"""Use Hamilton within Kedro pipelines.""" +from kedro.pipeline import Pipeline, node +from hamilton import driver +import hamilton_transformations + +def run_hamilton_node(**inputs): + """Execute Hamilton as Kedro node.""" + dr = driver.Driver({}, hamilton_transformations) + return dr.execute(['output'], inputs=inputs) + +def create_pipeline(**kwargs) -> Pipeline: + return Pipeline([ + node( + func=run_hamilton_node, + inputs=["raw_data", "parameters"], + outputs="hamilton_results", + name="hamilton_transformation" + ) + ]) +``` + +## Best Practices + +1. **Initialize driver once** - Reuse driver across requests in web services +2. **Separate concerns** - Orchestrator handles scheduling, Hamilton handles dataflow +3. **Use async** - FastAPI + async Hamilton for I/O-bound workflows +4. **Track everywhere** - Add HamiltonTracker to production integrations +5. **Health checks** - Expose health endpoints for monitoring +6. **Error handling** - Wrap Hamilton execution in try/except +7. **Configuration** - Pass environment-specific config to Hamilton + +## Choosing the Right Integration + +| Use Case | Tool | When to Use | +|----------|------|-------------| +| Schedule pipelines | Airflow, Dagster | Daily/weekly batch processing | +| Serve predictions | FastAPI, Flask | Real-time ML inference | +| Interactive dashboards | Streamlit | Business intelligence, exploration | +| Development | Jupyter | Prototyping, analysis | +| Experiment tracking | MLflow, W&B | ML model development | +| SQL + Python | dbt | Most data in warehouse, some Python logic | + +## Additional Resources + +- For core Hamilton patterns, use `/hamilton-core` +- For observability, use `/hamilton-observability` +- Examples: github.com/apache/hamilton/tree/main/examples diff --git a/.claude-plugin/skills/llm/SKILL.md b/.claude-plugin/skills/llm/SKILL.md new file mode 100644 index 000000000..dbf1e03c6 --- /dev/null +++ b/.claude-plugin/skills/llm/SKILL.md @@ -0,0 +1,507 @@ + + +--- +name: hamilton-llm +description: LLM and AI workflow patterns for Hamilton including RAG pipelines, embeddings, vector databases, and prompt engineering. Use for building AI applications with Hamilton. 
+allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(pytest:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton for LLM & AI Workflows + +This skill covers patterns for building LLM applications, RAG pipelines, and AI workflows with Apache Hamilton. + +## Why Hamilton for LLM Workflows? + +**Key Benefits:** +- **Modular prompts**: Each prompt component is a testable function +- **Dependency tracking**: Clear lineage from data → embeddings → retrieval → generation +- **Async parallelization**: Multiple LLM calls happen concurrently +- **Caching**: Avoid redundant expensive API calls +- **Observability**: Track LLM calls, costs, and performance +- **Testing**: Unit test prompts, retrieval, and generation separately + +## RAG Pipeline Pattern + +**Complete RAG Implementation:** + +```python +"""RAG pipeline with Hamilton.""" +import openai +from typing import List, Dict +import aiohttp + +# 1. Document Loading & Processing +async def document_text(document_url: str) -> str: + """Fetch document from URL.""" + async with aiohttp.ClientSession() as session: + async with session.get(document_url) as resp: + return await resp.text() + +def document_chunks( + document_text: str, + chunk_size: int = 1000, + overlap: int = 100 +) -> List[str]: + """Split document into overlapping chunks.""" + chunks = [] + start = 0 + while start < len(document_text): + end = start + chunk_size + chunks.append(document_text[start:end]) + start = end - overlap + return chunks + +# 2. Embedding Generation +async def embeddings( + document_chunks: List[str], + embedding_model: str = 'text-embedding-3-small' +) -> List[List[float]]: + """Generate embeddings for chunks.""" + client = openai.AsyncOpenAI() + response = await client.embeddings.create( + input=document_chunks, + model=embedding_model + ) + return [item.embedding for item in response.data] + +# 3. Vector Store +async def vector_store_ids( + embeddings: List[List[float]], + document_chunks: List[str] +) -> List[str]: + """Store embeddings in vector database.""" + import pinecone + index = pinecone.Index('documents') + + vectors = [ + (f"chunk_{i}", emb, {"text": chunk}) + for i, (emb, chunk) in enumerate(zip(embeddings, document_chunks)) + ] + await index.upsert(vectors) + return [v[0] for v in vectors] + +# 4. Query & Retrieval +async def query_embedding( + query: str, + embedding_model: str = 'text-embedding-3-small' +) -> List[float]: + """Generate embedding for query.""" + client = openai.AsyncOpenAI() + response = await client.embeddings.create( + input=[query], + model=embedding_model + ) + return response.data[0].embedding + +async def retrieved_chunks( + query_embedding: List[float], + top_k: int = 5 +) -> List[str]: + """Retrieve relevant chunks from vector store.""" + import pinecone + index = pinecone.Index('documents') + + results = await index.query( + vector=query_embedding, + top_k=top_k, + include_metadata=True + ) + return [match['metadata']['text'] for match in results['matches']] + +# 5. Prompt Construction +def rag_prompt(query: str, retrieved_chunks: List[str]) -> str: + """Construct RAG prompt with context.""" + context = "\n\n".join(retrieved_chunks) + return f"""Answer the question based on the context below. + +Context: +{context} + +Question: {query} + +Answer:""" + +# 6. 
LLM Generation +async def llm_response( + rag_prompt: str, + model: str = "gpt-4" +) -> str: + """Generate response using LLM.""" + client = openai.AsyncOpenAI() + response = await client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": rag_prompt}] + ) + return response.choices[0].message.content + +# Execute the RAG pipeline +from hamilton import async_driver +import rag_pipeline + +dr = await async_driver.Builder().with_modules(rag_pipeline).build() + +# Indexing phase +await dr.execute( + ['vector_store_ids'], + inputs={'document_url': 'https://example.com/doc.pdf'} +) + +# Query phase +result = await dr.execute( + ['llm_response'], + inputs={'query': 'What is the main topic?'} +) +``` + +## Multi-Provider Pattern + +**Support multiple LLM providers:** + +```python +"""Multi-provider LLM configuration.""" +from hamilton.function_modifiers import config +import openai +import anthropic + +@config.when(provider='openai') +def llm_client__openai() -> openai.AsyncOpenAI: + """OpenAI client.""" + return openai.AsyncOpenAI() + +@config.when(provider='anthropic') +def llm_client__anthropic() -> anthropic.AsyncAnthropic: + """Anthropic client.""" + return anthropic.AsyncAnthropic() + +@config.when(provider='openai') +async def llm_response__openai( + llm_client: openai.AsyncOpenAI, + prompt: str +) -> str: + """Generate with OpenAI.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": prompt}] + ) + return response.choices[0].message.content + +@config.when(provider='anthropic') +async def llm_response__anthropic( + llm_client: anthropic.AsyncAnthropic, + prompt: str +) -> str: + """Generate with Anthropic.""" + response = await llm_client.messages.create( + model="claude-3-5-sonnet-20241022", + max_tokens=1024, + messages=[{"role": "user", "content": prompt}] + ) + return response.content[0].text + +# Switch providers via config +dr = await async_driver.Builder()\ + .with_config({'provider': 'anthropic'})\ + .with_modules(llm_module)\ + .build() +``` + +## Parallel LLM Calls + +**Multiple analyses in parallel:** + +```python +"""Run multiple LLM analyses concurrently.""" +import openai + +async def llm_client() -> openai.AsyncOpenAI: + """Shared LLM client.""" + return openai.AsyncOpenAI() + +def summarization_prompt(document: str) -> str: + """Prompt for summarization.""" + return f"Summarize this document:\n\n{document}" + +def keyword_prompt(document: str) -> str: + """Prompt for keyword extraction.""" + return f"Extract 5 key topics from this document:\n\n{document}" + +def sentiment_prompt(document: str) -> str: + """Prompt for sentiment analysis.""" + return f"Analyze the sentiment of this document:\n\n{document}" + +# These three run in parallel! 
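# (Why this parallelizes: `summary`, `keywords`, and `sentiment` each depend only on
#  `llm_client` and their own prompt node — none consumes another's output — so when
#  they are requested together, the async driver can await all three concurrently.)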
+async def summary(llm_client: openai.AsyncOpenAI, summarization_prompt: str) -> str: + """Generate summary.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": summarization_prompt}] + ) + return response.choices[0].message.content + +async def keywords(llm_client: openai.AsyncOpenAI, keyword_prompt: str) -> List[str]: + """Extract keywords.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": keyword_prompt}] + ) + return response.choices[0].message.content.split('\n') + +async def sentiment(llm_client: openai.AsyncOpenAI, sentiment_prompt: str) -> str: + """Analyze sentiment.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": sentiment_prompt}] + ) + return response.choices[0].message.content + +def document_analysis(summary: str, keywords: List[str], sentiment: str) -> dict: + """Combine all analyses.""" + return {"summary": summary, "keywords": keywords, "sentiment": sentiment} + +# Result: 3 LLM calls happen concurrently, ~3x faster! +``` + +## Caching LLM Calls + +**Save API costs with caching:** + +```python +"""Cache expensive LLM calls.""" +from hamilton.function_modifiers import cache +import openai + +@cache(behavior="default") # Cache LLM responses +async def document_summary(document_text: str, llm_client: openai.AsyncOpenAI) -> str: + """Generate summary (cached). + + First call: Makes API request, caches result + Subsequent calls: Retrieves from cache (free & instant!) + """ + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{ + "role": "user", + "content": f"Summarize this document:\n\n{document_text}" + }] + ) + return response.choices[0].message.content + +# First run: Costs $0.01 +# Second run: $0.00, instant retrieval +# Savings on 1000 documents: $10! +``` + +## Prompt Engineering Patterns + +**Modular prompts:** + +```python +"""Build complex prompts from components.""" +def system_message(task_type: str) -> str: + """System message based on task.""" + templates = { + "summarize": "You are an expert at creating concise summaries.", + "extract": "You are an expert at extracting structured information.", + "analyze": "You are an expert at analyzing content and providing insights." + } + return templates[task_type] + +def user_context(document: str, max_length: int = 2000) -> str: + """Truncate document if needed.""" + return document[:max_length] if len(document) > max_length else document + +def instruction(task_type: str) -> str: + """Task-specific instruction.""" + instructions = { + "summarize": "Provide a 3-sentence summary.", + "extract": "Extract key entities and dates.", + "analyze": "Analyze the main themes and sentiment." 
+ } + return instructions[task_type] + +def messages(system_message: str, user_context: str, instruction: str) -> List[dict]: + """Combine into message list.""" + return [ + {"role": "system", "content": system_message}, + {"role": "user", "content": f"{user_context}\n\n{instruction}"} + ] + +async def llm_result(llm_client: openai.AsyncOpenAI, messages: List[dict]) -> str: + """Execute LLM call.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=messages + ) + return response.choices[0].message.content +``` + +## Structured Output with Pydantic + +**Parse LLM output into structured data:** + +```python +"""Structured extraction with validation.""" +from pydantic import BaseModel, Field +from typing import List +import openai + +class ExtractedEntity(BaseModel): + """Structured entity.""" + name: str = Field(description="Entity name") + type: str = Field(description="Entity type (person, org, location)") + relevance: float = Field(description="Relevance score 0-1", ge=0, le=1) + +class ExtractionResult(BaseModel): + """Complete extraction result.""" + entities: List[ExtractedEntity] + summary: str + +async def structured_extraction( + document: str, + llm_client: openai.AsyncOpenAI +) -> ExtractionResult: + """Extract structured data from document.""" + response = await llm_client.beta.chat.completions.parse( + model="gpt-4o-2024-08-06", + messages=[{ + "role": "user", + "content": f"Extract entities from:\n\n{document}" + }], + response_format=ExtractionResult + ) + return response.choices[0].message.parsed +``` + +## Agent Patterns + +**Multi-step agent with Hamilton:** + +```python +"""Agent with tool use.""" +from typing import Literal + +def agent_prompt(query: str, available_tools: List[str]) -> str: + """Create agent prompt with tools.""" + tools_desc = "\n".join(f"- {tool}" for tool in available_tools) + return f"""You have access to these tools: +{tools_desc} + +User query: {query} + +What tool should be used? Respond with just the tool name.""" + +async def tool_selection( + llm_client: openai.AsyncOpenAI, + agent_prompt: str +) -> Literal["search", "calculate", "summarize"]: + """LLM selects which tool to use.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": agent_prompt}] + ) + return response.choices[0].message.content.strip().lower() + +@config.when_in(tool_selection=["search"]) +async def tool_result__search(query: str) -> str: + """Execute search tool.""" + # Implement search logic + return "Search results..." + +@config.when_in(tool_selection=["calculate"]) +def tool_result__calculate(query: str) -> str: + """Execute calculation tool.""" + # Implement calculation logic + return "Calculation result..." + +async def final_response( + llm_client: openai.AsyncOpenAI, + query: str, + tool_result: str +) -> str: + """Generate final response with tool result.""" + response = await llm_client.chat.completions.create( + model="gpt-4", + messages=[{ + "role": "user", + "content": f"User asked: {query}\nTool returned: {tool_result}\n\nProvide final answer:" + }] + ) + return response.choices[0].message.content +``` + +## Testing LLM Workflows + +**Unit test prompts and logic:** + +```python +"""Test LLM components.""" +import pytest + +def test_prompt_construction(): + """Test prompt building logic.""" + from llm_module import rag_prompt + + query = "What is Hamilton?" 
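+    # sample chunks standing in for what retrieved_chunks() would return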
+ chunks = ["Hamilton is a framework", "It builds DAGs"] + + prompt = rag_prompt(query, chunks) + + assert "Hamilton is a framework" in prompt + assert "What is Hamilton?" in prompt + assert "Context:" in prompt + +async def test_retrieval(): + """Test retrieval logic (mock vector store).""" + # Mock vector store responses + # Test retrieval function + pass + +def test_structured_output(): + """Test Pydantic parsing.""" + from llm_module import ExtractionResult, ExtractedEntity + + result = ExtractionResult( + entities=[ + ExtractedEntity(name="Hamilton", type="product", relevance=0.9) + ], + summary="A framework for building DAGs" + ) + + assert len(result.entities) == 1 + assert result.entities[0].name == "Hamilton" +``` + +## Best Practices + +1. **Modularize prompts** - Each component is a testable function +2. **Cache aggressively** - LLM calls are expensive +3. **Use async** - Parallelize independent LLM calls +4. **Structure outputs** - Use Pydantic for parsing +5. **Handle failures** - Add retry logic and fallbacks +6. **Track costs** - Monitor token usage +7. **Version prompts** - Use config for prompt variants + +## Additional Resources + +- For async patterns, use `/hamilton-scale` +- For observability, use `/hamilton-observability` +- Apache Hamilton LLM examples: github.com/apache/hamilton/tree/main/examples/LLM_Workflows diff --git a/.claude-plugin/skills/observability/SKILL.md b/.claude-plugin/skills/observability/SKILL.md new file mode 100644 index 000000000..5346f6283 --- /dev/null +++ b/.claude-plugin/skills/observability/SKILL.md @@ -0,0 +1,493 @@ + + +--- +name: hamilton-observability +description: Hamilton UI and SDK patterns for tracking, monitoring, and debugging dataflows. Use for observability, lineage tracking, and production monitoring. +allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(hamilton:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton Observability & UI + +This skill covers the Hamilton UI, SDK, and observability patterns for tracking and monitoring your dataflows in development and production. + +## What is Hamilton UI? + +Hamilton UI is a web-based dashboard for: +- **Tracking DAG executions** - See every run with inputs, outputs, and timing +- **Visualizing dataflows** - Interactive DAG visualization +- **Debugging failures** - Inspect errors and intermediate values +- **Lineage tracking** - Understand data provenance +- **Performance monitoring** - Identify bottlenecks +- **Team collaboration** - Share DAGs and results + +## Quick Start + +### 1. Install Hamilton with UI Support + +```bash +pip install "sf-hamilton[sdk,ui]" +``` + +### 2. Start the Hamilton UI + +```bash +# Start the UI server locally +hamilton ui + +# UI will be available at http://localhost:8241 +``` + +### 3. Add Tracking to Your Code + +```python +"""Add HamiltonTracker to your driver.""" +from hamilton_sdk import adapters +from hamilton import driver + +# Create tracker +tracker = adapters.HamiltonTracker( + project_id=1, # Your project ID from UI + username="your.email@example.com", + dag_name="my_pipeline", + tags={"environment": "dev", "team": "data-science"} +) + +# Build driver with tracker +dr = driver.Builder()\ + .with_config(your_config)\ + .with_modules(*your_modules)\ + .with_adapters(tracker)\ + .build() + +# Execute as normal - runs are automatically tracked! +results = dr.execute(['final_output'], inputs={'data_path': 'data.csv'}) +``` + +### 4. 
View in UI + +Open http://localhost:8241 and see: +- Your DAG visualization +- Execution history +- Node-level timing +- Input/output values + +## HamiltonTracker Features + +### Basic Tracking + +```python +"""Minimal tracking setup.""" +from hamilton_sdk import adapters + +tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="etl_pipeline" +) + +# Attach to driver +dr = driver.Builder().with_adapters(tracker).build() +``` + +### Advanced Tracking with Tags + +```python +"""Use tags for filtering and organization.""" +tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="ml_training", + tags={ + "environment": "production", + "model_version": "v2.1", + "team": "ml-platform", + "experiment_id": "exp_123" + } +) + +# Tags appear in UI for filtering and search +``` + +### Async Tracking + +```python +"""Track async workflows.""" +from hamilton import async_driver +from hamilton_sdk import adapters + +tracker = adapters.AsyncHamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="async_rag_pipeline" +) + +dr = await async_driver.Builder()\ + .with_modules(async_module)\ + .with_adapters(tracker)\ + .build() + +result = await dr.execute(['llm_response'], inputs={'query': 'test'}) +``` + +## Project Organization + +### Creating Projects + +Projects group related DAGs together: + +```bash +# Create project via UI +# 1. Open http://localhost:8241 +# 2. Click "New Project" +# 3. Name it (e.g., "Customer Analytics") +# 4. Get the project_id + +# Or via API +import requests +response = requests.post( + "http://localhost:8241/api/v1/projects", + json={"name": "Customer Analytics", "description": "Customer data pipelines"} +) +project_id = response.json()['id'] +``` + +### Organizing by Team + +```python +"""Organize DAGs by team and environment.""" +# Team A - Development +tracker_team_a_dev = adapters.HamiltonTracker( + project_id=1, # "Team A Analytics" project + username="user@example.com", + dag_name="user_segmentation", + tags={"team": "team-a", "env": "dev"} +) + +# Team A - Production +tracker_team_a_prod = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="user_segmentation", + tags={"team": "team-a", "env": "prod"} +) + +# Team B - Different project +tracker_team_b = adapters.HamiltonTracker( + project_id=2, # "Team B ML" project + username="user@example.com", + dag_name="recommendation_model", + tags={"team": "team-b", "env": "dev"} +) +``` + +## Debugging with Hamilton UI + +### Inspecting Failed Runs + +When a DAG fails, the UI shows: +1. **Which node failed** - Visual highlighting +2. **Error message** - Full stack trace +3. **Inputs to failed node** - Inspect what caused the failure +4. **Successful nodes** - What completed before failure +5. 
**Timing** - Where time was spent before failure + +```python +"""DAG fails at 'processed_data' node.""" +# In UI: +# - Navigate to failed run +# - Click on red 'processed_data' node +# - See error: "ValueError: Cannot convert string to float" +# - Inspect inputs: raw_data contains 'N/A' strings +# - Fix data cleaning logic +``` + +### Comparing Runs + +Compare two DAG runs side-by-side: +- Input differences +- Timing changes +- Output value changes +- Code changes + +```python +"""Compare dev vs prod performance.""" +# Run 1: Development (10 seconds) +# Run 2: Production (45 seconds) + +# In UI: +# - Select both runs +# - Click "Compare" +# - See: 'feature_engineering' node is 8x slower in prod +# - Reason: Prod has 10x more data +# - Solution: Add caching or parallelize +``` + +### Node-Level Inspection + +Drill into any node to see: +- Execution time +- Input values +- Output values (if stored) +- Error details (if failed) +- Code version + +## Lineage Tracking + +### Understanding Data Provenance + +Hamilton UI automatically tracks: +- **Upstream dependencies** - What data contributed to this result? +- **Downstream impact** - What depends on this node? +- **Cross-DAG lineage** - Track data between different pipelines + +```python +"""Track lineage across training and inference.""" +# Training pipeline +training_tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="model_training", + tags={"stage": "training", "model_version": "v2.1"} +) + +# Inference pipeline (same project) +inference_tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="model_inference", + tags={"stage": "inference", "model_version": "v2.1"} +) + +# In UI: Filter by model_version="v2.1" to see both pipelines +``` + +## Production Monitoring + +### Key Metrics to Track + +```python +"""Track production metrics.""" +tracker = adapters.HamiltonTracker( + project_id=1, + username="service@example.com", + dag_name="production_etl", + tags={ + "environment": "production", + "service": "data-pipeline", + "version": os.getenv("SERVICE_VERSION", "unknown"), + "host": os.getenv("HOSTNAME", "unknown") + } +) + +# Monitor in UI: +# - Execution frequency (runs per hour) +# - Success rate (failures per day) +# - Execution time trends +# - Node-level performance +``` + +### Alerting on Failures + +```python +"""Set up failure notifications.""" +# Hamilton UI can send alerts on: +# - DAG failures +# - Slow executions (> threshold) +# - Specific node failures + +# Configure in UI: +# 1. Go to Project Settings +# 2. Set up webhook or email alerts +# 3. 
Define alert conditions +``` + +### Performance Monitoring + +Track performance over time: + +```python +"""Monitor performance degradation.""" +# Week 1: Average execution time = 5 minutes +# Week 2: Average execution time = 8 minutes +# Week 3: Average execution time = 12 minutes + +# In UI: +# - View execution time chart +# - Identify 'data_processing' node is slowing down +# - Root cause: Data volume increased 3x +# - Solution: Add partitioning or switch to Spark +``` + +## Integration with Other Tools + +### MLflow Integration + +```python +"""Track both Hamilton and MLflow.""" +from hamilton_sdk import adapters +import mlflow + +hamilton_tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="ml_training" +) + +# Use both adapters +dr = driver.Builder()\ + .with_adapters(hamilton_tracker, mlflow_tracker)\ + .build() + +# Results tracked in both Hamilton UI and MLflow +``` + +### Airflow Integration + +```python +"""Track Hamilton DAGs in Airflow tasks.""" +from airflow import DAG +from airflow.operators.python import PythonOperator +from hamilton_sdk import adapters + +def run_hamilton_pipeline(**context): + """Execute Hamilton with tracking.""" + tracker = adapters.HamiltonTracker( + project_id=1, + username="airflow@example.com", + dag_name="airflow_etl", + tags={ + "airflow_dag": context['dag'].dag_id, + "airflow_run": context['run_id'], + "task": context['task_instance'].task_id + } + ) + + dr = driver.Builder()\ + .with_modules(my_module)\ + .with_adapters(tracker)\ + .build() + + return dr.execute(['output'], inputs=context['params']) + +with DAG('my_dag', schedule_interval='@daily') as dag: + task = PythonOperator( + task_id='hamilton_pipeline', + python_callable=run_hamilton_pipeline + ) +``` + +## SDK Advanced Usage + +### Querying Runs Programmatically + +```python +"""Query Hamilton UI via SDK.""" +from hamilton_sdk import client + +# Connect to Hamilton UI +hc = client.HamiltonClient( + base_url="http://localhost:8241", + username="user@example.com" +) + +# Get recent runs +runs = hc.get_runs( + project_id=1, + dag_name="my_pipeline", + limit=10 +) + +for run in runs: + print(f"Run {run.id}: {run.status} in {run.duration}s") + +# Get specific run details +run_detail = hc.get_run(run_id=runs[0].id) +print(f"Inputs: {run_detail.inputs}") +print(f"Outputs: {run_detail.outputs}") +``` + +### Custom Metadata + +```python +"""Add custom metadata to runs.""" +tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="my_pipeline", + tags={ + "git_commit": subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip(), + "git_branch": subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).decode().strip(), + "dataset_version": "v2024.01", + "experiment_name": "baseline_v2" + } +) + +# All metadata searchable in UI +``` + +## Best Practices + +1. **Use descriptive dag_names** - Make them searchable (e.g., "user_segmentation_daily" not "pipeline_1") +2. **Tag consistently** - Use standard keys (environment, team, version) +3. **Track production** - Always enable tracking in production +4. **Monitor trends** - Set up dashboards for key metrics +5. **Clean up old runs** - Archive or delete runs after retention period +6. **Use projects** - Organize by team/domain, not by environment +7. 
**Document tags** - Create team standard for tag keys and values + +## Troubleshooting + +### UI Not Showing Runs + +```bash +# Check UI is running +curl http://localhost:8241/api/v1/ping + +# Check tracker configuration +tracker = adapters.HamiltonTracker( + project_id=1, # Does this project exist? + username="user@example.com", # Is this user registered? + dag_name="my_pipeline", + api_url="http://localhost:8241" # Override if UI is on different host +) +``` + +### Slow UI Performance + +```python +"""Optimize tracking for large DAGs.""" +tracker = adapters.HamiltonTracker( + project_id=1, + username="user@example.com", + dag_name="large_pipeline", + # Don't capture large outputs + capture_data_statistics=False, # Skip stats collection + # Or be selective about what to capture +) +``` + +## Additional Resources + +- For core Hamilton patterns, use `/hamilton-core` +- For scaling patterns, use `/hamilton-scale` +- Hamilton UI docs: hamilton.apache.org/concepts/ui +- Hamilton SDK docs: github.com/apache/hamilton/tree/main/ui/sdk diff --git a/.claude-plugin/skills/scale/SKILL.md b/.claude-plugin/skills/scale/SKILL.md new file mode 100644 index 000000000..104dc1eee --- /dev/null +++ b/.claude-plugin/skills/scale/SKILL.md @@ -0,0 +1,347 @@ + + +--- +name: hamilton-scale +description: Performance and parallelization patterns for Hamilton including async I/O, Spark, Ray, Dask, caching, and multithreading. Use for scaling Hamilton workflows. +allowed-tools: Read, Grep, Glob, Bash(python:*), Bash(pytest:*) +user-invocable: true +disable-model-invocation: false +--- + +# Hamilton Scaling & Performance + +This skill covers parallelization strategies and performance optimization for Apache Hamilton workflows. + +## When to Scale + +Choose your scaling strategy based on workload: +- **Async**: I/O-bound operations (API calls, database queries) +- **MultiThreading**: Synchronous I/O (legacy APIs without async support) +- **Spark**: Large datasets (multi-GB to multi-TB) on clusters +- **Ray/Dask**: Distributed Python computation +- **Caching**: Avoid redundant expensive computations + +## Async Execution for I/O-Bound Workflows + +**When to Use:** +- API calls (LLM providers, REST APIs, GraphQL) +- Database queries (async ORM operations) +- Dependent chains of I/O-bound calls +- Multiple parallel external service calls + +**Key Benefits:** +- Automatic parallelization of independent async operations +- Natural expression of dependent chains +- Mix async and sync functions in the same DAG +- Significantly faster than sequential execution + +**Basic Async Pattern:** +```python +from hamilton import async_driver +import aiohttp +from typing import List + +# Mix async and sync functions +async def api_data(user_id: str) -> dict: + """Fetch from API (async I/O).""" + async with aiohttp.ClientSession() as session: + async with session.get(f"https://api.example.com/users/{user_id}") as resp: + return await resp.json() + +def transform_data(api_data: dict) -> dict: + """Transform data (sync CPU).""" + return {k: v.upper() if isinstance(v, str) else v for k, v in api_data.items()} + +async def save_data(transform_data: dict) -> str: + """Save to database (async I/O).""" + async with db_pool.acquire() as conn: + await conn.execute("INSERT INTO data VALUES ($1)", transform_data) + return "success" + +# Use AsyncDriver +import my_async_module +dr = await async_driver.Builder().with_modules(my_async_module).build() +result = await dr.execute(['save_data'], inputs={'user_id': '123'}) +``` + +**Parallel I/O 
Operations:** +```python +# These three operations execute in parallel automatically! +async def user_data(user_id: str) -> dict: + """Fetch user data.""" + async with aiohttp.ClientSession() as session: + async with session.get(f"https://api.example.com/users/{user_id}") as resp: + return await resp.json() + +async def user_orders(user_id: str) -> List[dict]: + """Fetch user orders (parallel with user_data).""" + async with aiohttp.ClientSession() as session: + async with session.get(f"https://api.example.com/orders?user={user_id}") as resp: + return await resp.json() + +async def user_preferences(user_id: str) -> dict: + """Fetch preferences (parallel with both above).""" + async with db_pool.acquire() as conn: + return await conn.fetchrow("SELECT * FROM preferences WHERE user_id=$1", user_id) + +def user_profile(user_data: dict, user_orders: List[dict], user_preferences: dict) -> dict: + """Combine all data (waits for all three to complete).""" + return {"data": user_data, "orders": user_orders, "preferences": user_preferences} +``` + +## MultiThreading for Sync I/O + +For synchronous I/O-bound code (legacy APIs, blocking libraries): + +```python +from hamilton import driver +from hamilton.execution import executors + +# Use multithreading for sync I/O operations +dr = driver.Builder()\ + .with_modules(my_functions)\ + .with_local_executor(executors.MultiThreadingExecutor(max_tasks=10))\ + .build() + +# Sync functions that do I/O will run concurrently +results = dr.execute(['final_output'], inputs={...}) +``` + +**When to Use:** +- Synchronous I/O code (requests library, blocking DB drivers) +- Legacy APIs without async support +- Simple parallelization without code rewrite + +## Scaling with Apache Spark + +**When to Use Spark:** +- Dataset size exceeds single-machine memory (multi-GB to multi-TB) +- You have access to a Spark cluster +- Need distributed data processing + +**Basic PySpark Pattern:** +```python +from pyspark.sql import DataFrame as SparkDataFrame, SparkSession +from hamilton.plugins import h_spark + +def raw_data(spark_session: SparkSession) -> SparkDataFrame: + """Load data into Spark.""" + return spark_session.read.csv("data.csv", header=True) + +def filtered_data(raw_data: SparkDataFrame) -> SparkDataFrame: + """Filter using Spark operations.""" + return raw_data.filter(raw_data.age > 18) + +def aggregated_stats(filtered_data: SparkDataFrame) -> SparkDataFrame: + """Aggregate using Spark.""" + return filtered_data.groupBy("country").count() + +# Driver Setup +dr = driver.Builder()\ + .with_modules(my_spark_functions)\ + .with_adapters(h_spark.SPARK_INPUT_CHECK)\ + .build() + +result = dr.execute(['aggregated_stats'], inputs={'spark_session': spark}) +``` + +**Column-Level Transformations with @with_columns:** +```python +from hamilton.plugins.h_spark import with_columns +import pandas as pd + +# File: map_transforms.py +def normalized_amount(amount: pd.Series) -> pd.Series: + """Pandas UDF for normalization.""" + return (amount - amount.mean()) / amount.std() + +def amount_category(normalized_amount: pd.Series) -> pd.Series: + """Categorize based on normalized amount.""" + return pd.cut(normalized_amount, bins=[-float('inf'), -1, 1, float('inf')], + labels=['low', 'medium', 'high']) + +# Main dataflow +@with_columns( + map_transforms, + columns_to_pass=["amount"] +) +def enriched_data(raw_data: SparkDataFrame) -> SparkDataFrame: + """Apply pandas UDFs to Spark DataFrame.""" + return raw_data +``` + +**Spark Best Practices:** +1. 
Keep transformations lazy - Don't call `.collect()` until final nodes
+2. Use `@with_columns` for map operations
+3. Partition wisely between major operations
+4. Cache DataFrames accessed multiple times
+5. Test with small data first (`.limit(1000)`)
+
+## Ray for Distributed Python
+
+**When to Use Ray:**
+- Distributed Python computation
+- Machine learning workloads
+- Custom parallelization logic
+
+```python
+from hamilton import base, driver
+from hamilton.plugins import h_ray
+import ray
+
+ray.init()
+
+# The adapter ships each node to Ray; result_builder controls the final result shape
+ray_adapter = h_ray.RayGraphAdapter(result_builder=base.DictResult())
+
+dr = driver.Driver({}, my_functions, adapter=ray_adapter)
+results = dr.execute(['large_computation'], inputs={...})
+```
+
+## Dask for Parallel Computing
+
+**When to Use Dask:**
+- Pandas-like operations on larger-than-memory datasets
+- Parallel computation on single machine or cluster
+- Incremental migration from pandas
+
+```python
+from hamilton import driver
+from hamilton.plugins import h_dask
+from dask import distributed
+
+cluster = distributed.LocalCluster()
+client = distributed.Client(cluster)
+
+dask_executor = h_dask.DaskExecutor(client=client)
+
+dr = driver.Builder()\
+    .enable_dynamic_execution(allow_experimental_mode=True)\
+    .with_remote_executor(dask_executor)\
+    .with_modules(my_functions)\
+    .build()
+```
+
+## Caching for Performance
+
+**When to Use Caching:**
+- Expensive API calls (LLM inference, external services)
+- Time-consuming data processing
+- Iterative development in notebooks
+- Shared preprocessing between pipelines
+
+**Basic Caching Setup:**
+```python
+from hamilton import driver
+
+# Enable caching
+dr = driver.Builder()\
+    .with_modules(my_functions)\
+    .with_cache()\
+    .build()
+
+# First execution: computes and caches
+result1 = dr.execute(['final_output'], inputs={'data_path': 'data.csv'})
+
+# Second execution: retrieves from cache (instant!)
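+# Cache hits are keyed on each function's code plus its input values, so editing a
+# function or changing an input only recomputes the affected nodes.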
+result2 = dr.execute(['final_output'], inputs={'data_path': 'data.csv'}) +``` + +**Controlling Cache Behavior:** +```python +from hamilton.function_modifiers import cache + +# Always recompute (for data loaders) +@cache(behavior="recompute") +def live_api_data(api_key: str) -> dict: + """Always fetch fresh data.""" + import requests + response = requests.get("https://api.example.com/data", + headers={"Authorization": api_key}) + return response.json() + +# Never cache (for non-deterministic operations) +@cache(behavior="disable") +def random_sample(data: pd.DataFrame) -> pd.DataFrame: + """Random sampling should not be cached.""" + return data.sample(frac=0.1) + +# Custom format for efficiency +@cache(format="parquet") +def large_dataframe(processed_data: pd.DataFrame) -> pd.DataFrame: + """Store as parquet for efficiency.""" + return processed_data +``` + +**Driver-Level Cache Control:** +```python +dr = driver.Builder()\ + .with_modules(my_functions)\ + .with_cache( + recompute=['raw_data'], # Always recompute these + disable=['random_sample'], # Never cache these + path="./my_cache" # Custom location + )\ + .build() + +# Force complete refresh +dr = driver.Builder()\ + .with_modules(my_functions)\ + .with_cache(recompute=True)\ + .build() +``` + +**Cache Inspection:** +```python +# Visualize what was cached vs executed +dr.cache.view_run() # Green = cache hit, Orange = executed + +# Access cached results +run_id = dr.cache.last_run_id +data_version = dr.cache.data_versions[run_id]['my_node'] +cached_result = dr.cache.result_store.get(data_version) +``` + +## Choosing the Right Scaling Strategy + +**Decision Matrix:** + +| Workload Type | Strategy | Use When | +|--------------|----------|----------| +| I/O-bound (async-capable) | AsyncDriver | Multiple API calls, async libraries available | +| I/O-bound (sync only) | MultiThreading | Legacy APIs, blocking I/O | +| Large datasets | Spark | Multi-GB/TB data, cluster available | +| Python computation | Ray/Dask | Custom parallel logic, ML workloads | +| Expensive operations | Caching | Repeated computations, LLM calls | + +**Combining Strategies:** +- Cache + Async = Cache expensive API calls, parallelize independent calls +- Spark + Caching = Cache intermediate Spark results +- Ray + Caching = Distribute computation, cache results + +## Performance Tips + +1. **Profile first** - Identify bottlenecks before optimizing +2. **Start simple** - Begin with caching, add parallelization if needed +3. **Measure impact** - Benchmark before and after changes +4. **Consider costs** - Serialization overhead can negate parallelization benefits +5. 
**Test at scale** - Small data may hide parallelization overhead
+
+## Additional Resources
+
+- For basic Hamilton patterns, use `/hamilton-core`
+- For LLM-specific workflows, use `/hamilton-llm`
+- Apache Hamilton docs: hamilton.apache.org/concepts/parallel-task
diff --git a/.gitignore b/.gitignore
index 5ed52fc7b..8184334b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -155,3 +155,4 @@ examples/**/hamilton-env
 
 # hamilton default caching directory
 **/.hamilton_cache/
+.claude/settings.local.json
diff --git a/docs/ecosystem/claude-code-plugin.md b/docs/ecosystem/claude-code-plugin.md
new file mode 100644
index 000000000..d72c2a617
--- /dev/null
+++ b/docs/ecosystem/claude-code-plugin.md
@@ -0,0 +1,406 @@
+
+
+# Claude Code Plugin for Hamilton
+
+The Hamilton Claude Code plugin provides AI-powered assistance for developing Hamilton DAGs using [Claude Code](https://code.claude.com), Anthropic's official CLI tool.
+
+## What is Claude Code?
+
+[Claude Code](https://code.claude.com) is an AI-powered CLI tool that helps you build software faster by providing intelligent code assistance, debugging help, and code generation capabilities. The Hamilton plugin extends Claude Code with deep knowledge of Hamilton patterns and best practices.
+
+## Features
+
+The Hamilton Claude Code plugin provides expert assistance for:
+
+- 🏗️ **Creating new Hamilton modules** with proper patterns and decorators
+- 🔍 **Understanding existing DAGs** by explaining dataflow and dependencies
+- 🎨 **Applying function modifiers** correctly (@parameterize, @config.when, @check_output, etc.)
+- 🐛 **Debugging issues** in DAG definitions and execution
+- 🔄 **Converting Python scripts** to Hamilton modules
+- ⚡ **Optimizing pipelines** with caching, parallelization, and best practices
+- ✅ **Writing tests** for Hamilton functions
+- 📊 **Generating visualizations** of your DAGs
+
+## Installation
+
+### Prerequisites
+
+First, install Claude Code:
+
+```bash
+# Install Claude Code CLI
+curl -fsSL https://cli.claude.ai/install.sh | sh
+```
+
+For more installation options, see the [Claude Code documentation](https://code.claude.com/docs/en/install.html).
+
+### Install the Hamilton Plugin
+
+Once Claude Code is installed, you can add the Hamilton plugin:
+
+```bash
+# Add the Hamilton plugin marketplace
+/plugin marketplace add apache/hamilton
+
+# Install the plugin (available across all projects)
+/plugin install hamilton --scope user
+```
+
+Or combine into a single command:
+```bash
+claude plugin install hamilton@apache/hamilton --scope user
+```
+
+**Installation scopes:**
+- `--scope user` - Available in all your projects (recommended)
+- `--scope project` - Only in the current project
+- `--scope local` - Testing/development only
+
+### For Contributors
+
+If you've cloned the Hamilton repository, the plugin is already available at `.claude-plugin/` in the repo root and will be automatically discovered by Claude Code when you work in the repo. No installation needed!
+
+## Usage
+
+### Automatic Assistance
+
+Claude Code will automatically use the Hamilton plugin when it detects you're working with Hamilton code. Just ask questions or give instructions naturally:
+
+```bash
+claude "Help me create a Hamilton module for processing customer data"
+claude "Explain what this DAG does"
+claude "Convert this pandas script to Hamilton"
+claude "Add caching to my expensive computation function"
+claude "Why am I getting a circular dependency error?"
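+claude "Write unit tests for my Hamilton feature engineering functions"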
+``` + +### Manual Invocation + +You can explicitly invoke the skill using the `/hamilton` command: + +```bash +claude /hamilton create a feature engineering module with rolling averages +claude /hamilton explain the dataflow in my_functions.py +claude /hamilton optimize this DAG for parallel execution +``` + +### Interactive Mode + +For ongoing assistance, start an interactive session: + +```bash +# Start Claude Code in your Hamilton project +cd /path/to/your/hamilton/project +claude +``` + +Then interact naturally: +``` +You: I need to create a DAG that processes customer transactions +Claude: I'll help you create a Hamilton module for processing customer transactions... + +You: Add validation to ensure all amounts are positive +Claude: I'll add a @check_output decorator with validation... +``` + +## Examples + +### Creating a New Hamilton Module + +**Prompt:** +``` +Create a Hamilton module that loads sales data from a CSV, removes nulls, +calculates a 7-day rolling average, and outputs the top 10 days by sales. +``` + +**Claude generates:** +```python +"""Sales analysis DAG.""" +import pandas as pd + +def raw_sales(csv_path: str) -> pd.DataFrame: + """Load sales data from CSV. + + :param csv_path: Path to sales CSV file + :return: Raw sales DataFrame + """ + return pd.read_csv(csv_path) + +def cleaned_sales(raw_sales: pd.DataFrame) -> pd.DataFrame: + """Remove null values from sales data. + + :param raw_sales: Raw sales data + :return: Cleaned sales DataFrame + """ + return raw_sales.dropna() + +def rolling_avg_7d(cleaned_sales: pd.DataFrame) -> pd.Series: + """Calculate 7-day rolling average of sales. + + :param cleaned_sales: Cleaned sales data + :return: 7-day rolling average + """ + return cleaned_sales['sales'].rolling(window=7, min_periods=1).mean() + +def top_10_sales_days( + cleaned_sales: pd.DataFrame, + rolling_avg_7d: pd.Series +) -> pd.DataFrame: + """Get top 10 days by rolling average. + + :param cleaned_sales: Cleaned sales data + :param rolling_avg_7d: 7-day rolling average + :return: Top 10 sales days + """ + result = cleaned_sales.copy() + result['rolling_avg'] = rolling_avg_7d + return result.nlargest(10, 'rolling_avg') + +# Usage: +# from hamilton import driver +# dr = driver.Driver({}, __name__) +# results = dr.execute(['top_10_sales_days'], inputs={'csv_path': 'sales.csv'}) +``` + +### Converting Existing Code + +**Prompt:** +``` +Convert this script to Hamilton: + +import pandas as pd +df = pd.read_csv('data.csv') +df['feature'] = df['col_a'] * 2 + df['col_b'] +result = df.groupby('category')['feature'].mean() +``` + +**Claude refactors to:** +```python +"""Data processing DAG.""" +import pandas as pd + +def raw_data(data_path: str) -> pd.DataFrame: + """Load raw data from CSV.""" + return pd.read_csv(data_path) + +def feature(raw_data: pd.DataFrame) -> pd.Series: + """Calculate feature from columns.""" + return raw_data['col_a'] * 2 + raw_data['col_b'] + +def data_with_feature(raw_data: pd.DataFrame, feature: pd.Series) -> pd.DataFrame: + """Add feature column to dataset.""" + result = raw_data.copy() + result['feature'] = feature + return result + +def result(data_with_feature: pd.DataFrame) -> pd.Series: + """Calculate mean feature by category.""" + return data_with_feature.groupby('category')['feature'].mean() +``` + +### Applying Decorators + +**Prompt:** +``` +I need to create rolling averages for 7, 30, and 90 day windows. +How do I do this without repeating code? 
+``` + +**Claude suggests:** +```python +from hamilton.function_modifiers import parameterize + +@parameterize( + rolling_7d={'window': 7}, + rolling_30d={'window': 30}, + rolling_90d={'window': 90}, +) +def rolling_average(sales: pd.Series, window: int) -> pd.Series: + """Calculate rolling average for different windows. + + Creates three nodes: rolling_7d, rolling_30d, rolling_90d + """ + return sales.rolling(window=window, min_periods=1).mean() +``` + +### Debugging + +**Prompt:** +``` +I'm getting: "Could not find parameter 'processed_data' in graph" +``` + +**Claude analyzes:** +``` +This error occurs when a function parameter references a node that doesn't exist +in your DAG. Common causes: + +1. Typo in parameter name (e.g., 'processed_data' vs 'process_data') +2. Missing function definition for 'processed_data' +3. Function excluded via @hamilton_exclude or @config.when + +Check your module for a function named 'processed_data'. If it doesn't exist, +either: +- Add the function, or +- Fix the parameter name to match an existing function +``` + +## What the Plugin Knows + +The plugin has deep knowledge of Hamilton: + +### Core Concepts +- Function-based DAG definitions +- Driver configuration and execution +- Node dependencies via parameters +- Type hints and annotations + +### Function Modifiers +- **Configuration**: @config.when, @hamilton_exclude +- **Parameterization**: @parameterize, @parameterize_sources, @parameterize_values +- **Column extraction**: @extract_columns, @extract_fields +- **Data quality**: @check_output, @check_output_custom, @schema +- **I/O**: @save_to, @load_from, @dataloader, @datasaver +- **Transformation**: @pipe, @does, @mutate, @step +- **Advanced**: @subdag, @resolve, @cache + +### Integration Patterns +- Airflow orchestration +- FastAPI microservices +- Streamlit dashboards +- Jupyter notebooks +- Ray/Dask/Spark distributed execution + +### LLM Workflows +- RAG pipeline patterns +- Document chunking +- Vector database operations +- Embedding generation + +### Best Practices +- Testing strategies +- Code organization +- Error handling +- Performance optimization + +## Plugin Structure + +The plugin is organized as follows: + +``` +.claude/plugins/hamilton/ +├── .claude-plugin/ +│ ├── plugin.json # Plugin manifest +│ └── marketplace.json # Marketplace configuration +├── skills/ +│ └── hamilton/ +│ ├── SKILL.md # Main skill instructions +│ ├── examples.md # Code examples and patterns +│ └── README.md # Skill documentation +└── README.md # Plugin documentation +``` + +For contributors, the skill exists in `.claude/plugins/hamilton/skills/hamilton/` for immediate use. + +## Contributing + +Found a bug or want to improve the plugin? We'd love your help! + +### Report Issues + +Please [file an issue](https://github.com/apache/hamilton/issues/new) with: +- Clear description of the problem +- Steps to reproduce +- Expected vs actual behavior +- Hamilton and Claude Code versions + +### Submit Pull Requests + +1. Fork the repository: https://github.com/apache/hamilton +2. Make changes in `.claude/plugins/hamilton/skills/hamilton/` +3. Test thoroughly with various scenarios +4. Submit a PR with a clear description + +**Contribution ideas:** +- 📚 Add new examples to `examples.md` +- 📝 Improve instructions in `SKILL.md` +- 🐛 Fix bugs or inaccuracies +- ✨ Add support for new Hamilton features +- 📖 Enhance documentation + +See [CONTRIBUTING.md](https://github.com/apache/hamilton/blob/main/CONTRIBUTING.md) for guidelines. 
+ +## Requirements + +- **Claude Code CLI** - v0.1.0 or later +- **Hamilton** - v1.0.0 or later (plugin works with any version) +- **Python** - 3.9 or later + +## Troubleshooting + +### Plugin Not Loading + +If the plugin isn't recognized: + +```bash +# Check installed plugins +claude plugin list + +# Reinstall if needed +claude plugin uninstall hamilton +claude plugin install hamilton@apache/hamilton --scope user +``` + +### Skill Not Activating + +If Claude doesn't seem to use Hamilton knowledge: + +```bash +# Manually invoke the skill +claude /hamilton + +# Or mention Hamilton explicitly in your prompt +claude "Using Hamilton framework, create a DAG for..." +``` + +### Permission Errors + +The plugin requests permission to: +- Read files (Read, Grep, Glob) +- Run Python code (python, pytest) +- Search files (find) + +If prompted, approve these permissions for the best experience. + +## Learn More + +- **Hamilton Documentation**: https://hamilton.apache.org +- **Claude Code Documentation**: https://code.claude.com/docs +- **Hamilton GitHub**: https://github.com/apache/hamilton +- **Hamilton Examples**: https://github.com/apache/hamilton/tree/main/examples +- **Community Slack**: Join via Hamilton GitHub repo + +## License + +This plugin is part of the Apache Hamilton project and is licensed under the Apache 2.0 License. + +--- + +**Enhance your Hamilton development with AI! 🚀** diff --git a/docs/ecosystem/index.md b/docs/ecosystem/index.md index 4c4dd9e86..161843a33 100644 --- a/docs/ecosystem/index.md +++ b/docs/ecosystem/index.md @@ -118,6 +118,7 @@ Improve your development workflow: |------------|-------------|---------------| | **Jupyter** | Notebook magic commands | [Examples](https://github.com/apache/hamilton/tree/main/examples/jupyter_notebook_magic) | | **VS Code** | Language server and extension | [VS Code Guide](../hamilton-vscode/index.rst) | +| **Claude Code** | AI assistant plugin for Hamilton development | [Plugin Guide](claude-code-plugin.md) | | **tqdm** | Progress bars | [Lifecycle Hook](../reference/lifecycle-hooks/ProgressBar.rst) | ### Cloud Providers & Infrastructure @@ -220,3 +221,9 @@ If you've created a plugin or integration for Apache Hamilton, we'd love to incl - ⭐ Star us on [GitHub](https://github.com/apache/hamilton) - 🐦 Follow [@hamilton_os](https://twitter.com/hamilton_os) on Twitter/X - 📧 Join the [mailing lists](../asf/index.rst) for announcements + +```{toctree} +:hidden: + +claude-code-plugin +```