diff --git a/.github/KOYEB-DEPLOYMENT.md b/.github/KOYEB-DEPLOYMENT.md
new file mode 100644
index 0000000..4cd7c48
--- /dev/null
+++ b/.github/KOYEB-DEPLOYMENT.md
@@ -0,0 +1,119 @@
+# Koyeb Deployment Troubleshooting Guide
+
+This document provides detailed information about deploying to Koyeb and troubleshooting common issues, particularly related to Docker image access.
+
+## Common Deployment Errors
+
+### "Error while checking the validity of the docker image"
+
+This error typically occurs when Koyeb cannot access the Docker image specified in the deployment configuration. This could be due to:
+
+1. **The Docker image doesn't exist**
+   - The image tag might be incorrect
+   - The repository might not exist
+   - The image might not have been pushed successfully
+
+2. **Authentication issues**
+   - The Docker registry secret might be incorrectly configured
+   - The credentials might be incorrect or expired
+   - The secret might not be properly associated with the deployment
+
+3. **Registry path issues**
+   - The image path might be incorrectly formatted
+   - The registry domain might be missing or incorrect
+
+## Troubleshooting Steps
+
+### 1. Verify Docker Image Existence
+
+```bash
+# Check if the image exists locally
+docker images | grep duckdb-spawn
+
+# Try to pull the image to verify it's accessible
+docker pull your-dockerhub-username/duckdb-spawn:your-tag
+
+# List tags in your Docker Hub repository
+curl -s "https://registry.hub.docker.com/v2/repositories/your-dockerhub-username/duckdb-spawn/tags?page_size=100" | jq
+```
+
+### 2. Check Koyeb Secret Configuration
+
+```bash
+# List existing secrets
+koyeb secret list
+
+# Get details about the Docker registry secret
+koyeb secret get DOCKER_REPO_SECRET
+
+# Delete and recreate the secret if needed
+koyeb secret delete DOCKER_REPO_SECRET
+koyeb secret create DOCKER_REPO_SECRET \
+  --docker-registry-auth=username:password \
+  --docker-registry-server=docker.io \
+  --type=registry
+```
+
+### 3. Direct CLI Deployment
+
+If the GitHub Action is failing, try deploying directly with the CLI:
+
+```bash
+# Create app if it doesn't exist
+koyeb app create duckdb-spawn
+
+# Create service with explicit image reference
+koyeb service create api \
+  --app duckdb-spawn \
+  --docker docker.io/username/duckdb-spawn:tag \
+  --docker-private-registry-secret DOCKER_REPO_SECRET \
+  --ports 8000:http \
+  --routes /:8000 \
+  --env DATABASE_URL=/data/duckdb_spawn.db --env PYTHONUNBUFFERED=1 \
+  --instance-type nano \
+  --regions fra
+
+# Update existing service
+koyeb service update api \
+  --app duckdb-spawn \
+  --docker docker.io/username/duckdb-spawn:tag \
+  --docker-private-registry-secret DOCKER_REPO_SECRET
+```
+
+## Important Notes
+
+### Docker Hub Rate Limits
+
+Docker Hub has rate limits for image pulls:
+- Anonymous: 100 pulls / 6 hours
+- Free accounts: 200 pulls / 6 hours
+- Pro accounts: Higher limits
+
+If you're hitting rate limits, consider:
+- Authenticating all pull requests
+- Using a Pro account
+- Implementing a container registry cache
+
+### Docker Registry Credentials
+
+Best practices for Docker Hub credentials:
+1. Use access tokens instead of passwords
+2. Create tokens with limited scope (read-only if possible)
+3. Rotate tokens regularly
+4. Store tokens securely in GitHub Secrets
+
+### Koyeb Deployment Workflow
+
+The updated workflow in this repository:
+1. First tries to deploy using the GitHub Action
+2. If that fails, falls back to direct CLI deployment
+3. If that fails, attempts to update an existing service
+
+This provides multiple paths to success with detailed error information at each stage.
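The three-step fallback described above can be sketched as a small shell helper. This is a minimal illustration and not the repository's actual workflow code: `try_in_order` is a hypothetical name, and the command strings passed to it are placeholders for the action-based deployment, the `koyeb service create` call, and the `koyeb service update` call.

```bash
#!/usr/bin/env bash
# Sketch only: try_in_order is a hypothetical helper, not part of the repository.
# It runs each command string in turn and stops at the first one that exits with 0.
try_in_order() {
  attempt=1
  for cmd in "$@"; do
    echo "Attempt ${attempt}: ${cmd}" >&2
    if sh -c "$cmd"; then
      # Report the method that worked on stdout so callers can capture it
      echo "succeeded: ${cmd}"
      return 0
    else
      status=$?
      echo "Exit status ${status}, trying the next method" >&2
    fi
    attempt=$((attempt + 1))
  done
  echo "All deployment methods failed" >&2
  return 1
}

# Example call, with placeholders standing in for the real deployment commands:
# try_in_order "PRIMARY_DEPLOY_CMD" "CLI_CREATE_CMD" "CLI_UPDATE_CMD"
```

Each failed attempt reports its exit status on stderr before the next method runs, which is one way to get error information at every stage; only the first successful command is echoed on stdout.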
+
+## Reference Documentation
+
+- [Koyeb Docker Deployment Documentation](https://www.koyeb.com/docs/docker-deploy)
+- [GitHub Actions for Koyeb](https://www.koyeb.com/docs/deploy-with-github-actions)
+- [Koyeb CLI Documentation](https://www.koyeb.com/docs/cli/installation-cli)
+- [Docker Hub Authentication](https://docs.docker.com/docker-hub/access-tokens/)
\ No newline at end of file
diff --git a/.github/README.md b/.github/README.md
new file mode 100644
index 0000000..4c75137
--- /dev/null
+++ b/.github/README.md
@@ -0,0 +1,145 @@
+# DuckDB-Spawn CI/CD Pipeline
+
+This document outlines the CI/CD workflows set up for the DuckDB-Spawn project.
+
+## Overview
+
+The project uses GitHub Actions for CI/CD with deployments to Koyeb for hosting. We have the following environments:
+
+- **Production**: Main application deployed from the `main` branch
+- **Staging**: Testing environment deployed from the `dev` branch
+- **Preview**: Temporary environments for pull requests and feature development
+
+## Workflows
+
+### 1. CI/CD Pipeline (ci-cd.yml)
+
+Handles continuous integration tasks including:
+- Code formatting
+- Linting
+- Testing
+- Building the Docker image
+
+Triggered on push to `main` and `dev` branches, and on all pull requests to `main`.
+
+### 2. Production Deployment (koyeb-deploy.yml)
+
+Deploys the application to the production environment on Koyeb:
+- Builds and pushes the Docker image
+- Deploys the image to Koyeb
+- Tags the image as `latest`
+
+Triggered on push to the `main` branch.
+
+### 3. Staging Deployment (staging-deploy.yml)
+
+Deploys the application to the staging environment on Koyeb:
+- Builds and pushes the Docker image with the `staging` tag
+- Deploys the image to Koyeb staging environment
+
+Triggered on push to the `dev` branch.
+
+### 4. PR Preview Deployment (pr-preview.yml)
+
+Creates temporary preview environments for pull requests:
+- Builds and pushes a Docker image for the feature branch
+- Deploys to a dedicated preview environment on Koyeb
+- Comments on the PR with the deployment URL
+
+Triggered on new and updated pull requests to `main` and `dev` branches.
+
+### 5. Cleanup (cleanup.yml)
+
+Removes temporary preview environments when branches are deleted:
+- Cleans up Koyeb deployments
+- Can be triggered manually for cleanup
+
+Triggered on branch deletion events.
+
+## Environment Setup
+
+The following secrets need to be configured in GitHub:
+
+- `DOCKER_HUB_USERNAME`: Your Docker Hub username
+- `DOCKER_HUB_ACCESS_TOKEN`: Access token for Docker Hub
+- `KOYEB_API_TOKEN`: API token for accessing Koyeb services
+- `DOCKER_REPO_SECRET`: Secret for accessing private Docker repositories
+
+## Infrastructure as Code
+
+The `/infrastructure/pulumi` directory contains Pulumi infrastructure code for alternative deployment options:
+- Docker-based local deployment
+- Koyeb CLI deployment
+- Koyeb native provider deployment
+
+## Best Practices
+
+1. **Branch Protection**: Enable branch protection for `main` and `dev` branches
+2. **PR Reviews**: Require pull request reviews before merging
+3. **Environment Deployment**: Use GitHub Environments for deployment approval
+4. **Secrets Management**: Store all sensitive information in GitHub Secrets
+
+## Troubleshooting
+
+### Docker Registry Authentication Issues
+
+If you encounter errors related to Docker registry authentication, such as:
+
+```
+Error while checking the validity of the docker image: The image "docker.io/****/duckdb-spawn:COMMIT_HASH" was not found.
+```
+
+Try the following steps:
+
+1. **Run Debug Workflow**: Use the `debug-docker.yml` workflow to check Docker registry access
+2. **Verify Secrets**: Ensure `DOCKER_HUB_USERNAME` and `DOCKER_HUB_ACCESS_TOKEN` are correctly set in GitHub Secrets
+3. **Check Image Tags**: Verify that your Docker image is being correctly tagged and pushed
+4. **Koyeb Secret**: Make sure the Koyeb Docker registry secret is correctly configured
+
+If you see errors like:
+
+```
+jq: error (at :1): Cannot iterate over null (null)
+```
+
+This usually means:
+1. The Docker Hub repository doesn't exist yet - the workflow should now auto-create it
+2. Your Docker Hub credentials don't have permission to access or create repositories
+3. The Docker Hub API might be experiencing issues
+
+To resolve these problems:
+1. Run the `debug-docker.yml` workflow which will try to create the repository
+2. Check that your Docker Hub access token has appropriate permissions (read, write, delete)
+3. If auto-creation fails, manually create the repository in the Docker Hub web interface
+
+### Koyeb Deployment Failures
+
+If deployments to Koyeb fail:
+
+1. **API Token**: Ensure your `KOYEB_API_TOKEN` is valid and has the correct permissions
+2. **Service Configuration**: Verify the service configuration parameters in the workflow files
+3. **Resource Limits**: Check if you've hit any resource limits in your Koyeb account
+4. **Logs**: Review the Koyeb service logs for more detailed error information
+
+For detailed Koyeb deployment troubleshooting, see the [Koyeb Deployment Guide](./KOYEB-DEPLOYMENT.md).
+
+### Robust Deployment Strategy
+
+Our deployment workflow now includes:
+
+1. **Multi-stage verification** of Docker images
+2. **Multiple deployment methods** with fallbacks if the primary method fails:
+   - GitHub Action-based deployment
+   - Direct CLI deployment
+   - Service update method
+3. **Detailed error reporting** at each stage
+4. **Registry authentication optimization** to resolve common image access issues
+
+This approach ensures maximum reliability for deployments to both staging and production environments.
+
+### Common Workflow Fixes
+
+- **Linter Errors**: If you see YAML linter errors about environment values, remove the environment line if it's not needed
+- **Secret Access**: If secret access fails, verify that secrets are available to the workflow
+- **Action Versions**: Ensure you're using the latest versions of the GitHub Actions
+- **Checkout Token Error**: If you see `Error: Input required and not supplied: token` with `actions/checkout@v4`, ensure you add `with: token: ${{ secrets.GITHUB_TOKEN }}` to the checkout step.
\ No newline at end of file
diff --git a/.github/workflows/ci-cd.yml b/.github/workflows/ci-cd.yml
index 59019d2..d73144b 100644
--- a/.github/workflows/ci-cd.yml
+++ b/.github/workflows/ci-cd.yml
@@ -6,17 +6,125 @@ on:
   pull_request:
     branches: [ main ]
 
+# Limit concurrent runs of the same workflow on the same ref
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
 env:
   PYTHON_VERSION: '3.10'
 
+permissions:
+  contents: write
+
 jobs:
+  # For pull requests: Format code and auto-commit changes
+  format:
+    runs-on: ubuntu-latest
+    if: github.event_name == 'pull_request'
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          ref: ${{ github.head_ref }}
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+          cache: 'pip'
+
+      - name: Install formatting dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install black isort
+
+      - name: Format with Black and isort
+        run: |
+          black src/
+          isort src/
+
+          # Specifically format files that were mentioned in error message
+          black src/database/connection_manager.py src/routes/admin.py
+          isort src/database/connection_manager.py src/routes/admin.py
+
+      - name: Commit changes
+        uses: stefanzweifel/git-auto-commit-action@v5
+        with:
+          commit_message: "style: auto-format with Black and isort"
+          branch: ${{ github.head_ref }}
+
+  # For direct pushes to main/dev: First check formatting
+  check-format:
+    runs-on: ubuntu-latest
+    if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev')
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+          cache: 'pip'
+
+      - name: Install formatting dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install black isort
+
+      - name: Check format with Black and isort
+        run: |
+          black --check src/
+          isort --check src/
+
+  # For pull requests: Run tests after formatting
+  test-pr:
+    runs-on: ubuntu-latest
+    needs: [format]
+    if: github.event_name == 'pull_request'
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          ref: ${{ github.head_ref }}
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+          cache: 'pip'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+          pip install pytest pytest-cov
+
+      - name: Run tests with coverage
+        run: |
+          pytest --cov=src --cov-report=xml
+
+      - name: Upload coverage report
+        uses: actions/upload-artifact@v4
+        with:
+          name: coverage-report
+          path: coverage.xml
+          retention-days: 7
+
+  # For direct pushes to main/dev: Run tests after format checking
   test:
     runs-on: ubuntu-latest
+    needs: [check-format]
+    if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev')
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
 
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ env.PYTHON_VERSION }}
           cache: 'pip'
@@ -29,15 +137,60 @@ jobs:
 
       - name: Run tests with coverage
         run: |
-          pytest --cov=src
+          pytest --cov=src --cov-report=xml
+
+      - name: Upload coverage report
+        uses: actions/upload-artifact@v4
+        with:
+          name: coverage-report
+          path: coverage.xml
+          retention-days: 7
 
+  # For pull requests: Run linting after formatting
+  lint-pr:
+    runs-on: ubuntu-latest
+    needs: [format]
+    if: github.event_name == 'pull_request'
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          ref: ${{ github.head_ref }}
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+          cache: 'pip'
+
+      - name: Install linting dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install flake8 black isort
+
+      - name: Run linters
+        run: |
+          # Run black formatter in check mode
+          black --check src/
+
+          # Run isort to check import sorting
+          isort --check-only src/
+
+          # Run flake8 for additional style checks
+          flake8 src/ --config=.flake8
+
+  # For direct pushes to main/dev: Run linting after format checking
   lint:
     runs-on: ubuntu-latest
+    needs: [check-format]
+    if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev')
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
 
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ env.PYTHON_VERSION }}
           cache: 'pip'
@@ -56,4 +209,64 @@ jobs:
           isort --check-only src/
 
           # Run flake8 for additional style checks
-          flake8 src/ --config=.flake8
\ No newline at end of file
+          flake8 src/ --config=.flake8
+
+  # Build and push Docker image after tests and linting pass
+  build:
+    runs-on: ubuntu-latest
+    needs: [test, lint]
+    if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev')
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Set up Docker layer caching
+        uses: actions/cache@v3
+        with:
+          path: /tmp/.buildx-cache
+          key: ${{ runner.os }}-buildx-${{ github.sha }}
+          restore-keys: |
+            ${{ runner.os }}-buildx-
+
+      - name: Login to Docker Hub
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.DOCKER_HUB_USERNAME }}
+          password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
+
+      - name: Build and tag Docker image
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          push: false
+          load: true
+          tags: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
+          cache-from: |
+            type=local,src=/tmp/.buildx-cache
+            type=registry,ref=${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:latest
+          cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max
+
+      - name: Test Docker image
+        run: |
+          docker run --rm ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }} python -c "import sys; print(f'Python {sys.version} is working')"
+
+      - name: Push Docker image
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          push: true
+          tags: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
+          cache-from: |
+            type=local,src=/tmp/.buildx-cache
+            type=registry,ref=${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:latest
+          cache-to: type=inline
+
+      # Move cache for next run
+      - name: Move cache
+        run: |
+          rm -rf /tmp/.buildx-cache
+          mv /tmp/.buildx-cache-new /tmp/.buildx-cache
\ No newline at end of file
diff --git a/.github/workflows/cleanup.yml b/.github/workflows/cleanup.yml
new file mode 100644
index 0000000..8a87601
--- /dev/null
+++ b/.github/workflows/cleanup.yml
@@ -0,0 +1,37 @@
+name: Cleanup Koyeb Deployments
+
+on:
+  delete:
+  workflow_dispatch:
+    inputs:
+      branch:
+        description: 'Branch name to clean up'
+        required: true
+        default: ''
+
+jobs:
+  cleanup:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Set branch name for manual trigger
+        if: github.event_name == 'workflow_dispatch'
+        run: echo "BRANCH_NAME=${{ github.event.inputs.branch }}" >> $GITHUB_ENV
+
+      - name: Set branch name for delete event
+        if: github.event_name == 'delete'
+        run: echo "BRANCH_NAME=${{ github.event.ref }}" >> $GITHUB_ENV
+
+      - name: Install and configure the Koyeb CLI
+        uses: koyeb-community/koyeb-actions@v2
+        with:
+          api_token: "${{ secrets.KOYEB_API_TOKEN }}"
+
+      - name: Cleanup Koyeb application
+        uses: koyeb/action-git-deploy/cleanup@v1
+        with:
+          app-name: ${{ env.BRANCH_NAME == 'dev' && 'duckdb-spawn-staging' || 'duckdb-spawn-preview' }}
\ No newline at end of file
diff --git a/.github/workflows/debug-docker.yml b/.github/workflows/debug-docker.yml
new file mode 100644
index 0000000..b497a6e
--- /dev/null
+++ b/.github/workflows/debug-docker.yml
@@ -0,0 +1,145 @@
+name: Debug Docker Registry Access
+
+on:
+  workflow_dispatch:
+    inputs:
+      image_tag:
+        description: 'Specific Docker image tag to check'
+        required: false
+        default: 'latest'
+
+jobs:
+  debug-docker:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          token: ${{ secrets.WORKFLOW_PAT }}
+
+      - name: Login to Docker Hub
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.DOCKER_HUB_USERNAME }}
+          password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
+
+      - name: Check Docker Hub connection
+        run: |
+          echo "Checking Docker Hub connection..."
+          docker info
+          echo "✓ Docker Hub connection verified"
+
+      - name: Check and create repository if needed
+        run: |
+          echo "Checking if repository exists..."
+          REPO_EXISTS=$(curl -s -o /dev/null -w "%{http_code}" "https://hub.docker.com/v2/repositories/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn")
+
+          if [ "$REPO_EXISTS" = "404" ]; then
+            echo "Repository does not exist, attempting to create it..."
+            LOGIN_RESPONSE=$(curl -s -X POST "https://hub.docker.com/v2/users/login/" \
+              -H "Content-Type: application/json" \
+              -d "{\"username\":\"${{ secrets.DOCKER_HUB_USERNAME }}\",\"password\":\"${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}\"}")
+
+            echo "Login response status: $?"
+
+            TOKEN=$(echo $LOGIN_RESPONSE | jq -r '.token')
+
+            if [ "$TOKEN" = "null" ]; then
+              echo "❌ Failed to get authentication token. Login response:"
+              echo "$LOGIN_RESPONSE"
+              echo "This might be due to incorrect credentials or Docker Hub API changes."
+            else
+              CREATE_RESPONSE=$(curl -s -X POST "https://hub.docker.com/v2/repositories/" \
+                -H "Content-Type: application/json" \
+                -H "Authorization: JWT $TOKEN" \
+                -d "{\"namespace\":\"${{ secrets.DOCKER_HUB_USERNAME }}\",\"name\":\"duckdb-spawn\",\"description\":\"DuckDB Spawn API\",\"is_private\":false}")
+
+              echo "Repository creation response:"
+              echo "$CREATE_RESPONSE"
+            fi
+          else
+            echo "✓ Repository exists, proceeding with checks."
+          fi
+
+      - name: List available repositories
+        run: |
+          echo "Listing repositories for ${{ secrets.DOCKER_HUB_USERNAME }}..."
+          REPO_RESPONSE=$(curl -s "https://hub.docker.com/v2/users/${{ secrets.DOCKER_HUB_USERNAME }}/repositories?page_size=100")
+          if [[ $REPO_RESPONSE == *"\"detail\":\"Object not found\""* ]]; then
+            echo "User not found or no repositories available."
+          else
+            echo "Repositories found:"
+            echo "$REPO_RESPONSE" | jq -r '.results[].name' || echo "No repositories found or error parsing list"
+          fi
+
+      - name: List available tags
+        run: |
+          echo "Listing tags for duckdb-spawn..."
+          TAG_RESPONSE=$(curl -s "https://registry.hub.docker.com/v2/repositories/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn/tags?page_size=100")
+          if [[ $TAG_RESPONSE == *"\"detail\":\"Object not found\""* || $TAG_RESPONSE == *"\"results\":null"* ]]; then
+            echo "Repository not found or no tags available."
+            echo "The repository might not exist or you might not have permission to access it."
+          else
+            echo "Tags found:"
+            echo "$TAG_RESPONSE" | jq -r '.results[].name' || echo "Error parsing tag response"
+          fi
+
+      - name: Check specific tag
+        run: |
+          TAG="${{ github.event.inputs.image_tag }}"
+          echo "Checking for tag: $TAG"
+
+          # Try to pull the image
+          if docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${TAG} 2>/dev/null; then
+            echo "✓ Successfully pulled image ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${TAG}"
+          else
+            echo "✗ Failed to pull image ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${TAG}"
+            echo "The image may not exist or you may not have permission to access it."
+            echo "Let's check Docker Hub directly..."
+
+            TAG_RESPONSE=$(curl -s "https://registry.hub.docker.com/v2/repositories/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn/tags?page_size=100")
+            if [[ $TAG_RESPONSE == *"\"detail\":\"Object not found\""* || $TAG_RESPONSE == *"\"results\":null"* ]]; then
+              echo "Repository not found or no tags available."
+            else
+              echo "Available tags are:"
+              echo "$TAG_RESPONSE" | jq -r '.results[].name' || echo "Error parsing tag response"
+            fi
+          fi
+
+      - name: Create and test Docker secret for Koyeb
+        run: |
+          echo "Creating Docker credentials file..."
+          mkdir -p ~/.docker
+          echo '{"auths":{"https://index.docker.io/v1/":{"auth":"'$(echo -n "${{ secrets.DOCKER_HUB_USERNAME }}:${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}" | base64)'"}}}' > ~/.docker/config.json
+
+          # Install Koyeb CLI
+          curl -fsSL https://raw.githubusercontent.com/koyeb/koyeb-cli/master/install.sh | sh
+          export PATH=$PATH:$HOME/.koyeb/bin
+
+          # Configure Koyeb CLI
+          echo "token: ${{ secrets.KOYEB_API_TOKEN }}" > ~/.koyeb.yaml
+
+          # Create or update the secret
+          echo "Creating Docker registry secret in Koyeb..."
+          koyeb secret create DOCKER_REPO_SECRET --docker-registry-auth=${{ secrets.DOCKER_HUB_USERNAME }}:${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} --type=registry || echo "Secret already exists, trying to update..."
+
+          # Try to get the secret
+          echo "Verifying secret in Koyeb..."
+          koyeb secret get DOCKER_REPO_SECRET
+
+      - name: Create and push test image
+        if: github.event.inputs.image_tag == 'latest'
+        run: |
+          echo "Creating a simple test image to verify Docker Hub push access..."
+          echo "FROM hello-world" > Dockerfile.test
+
+          echo "Building test image..."
+          docker build -t ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:test-${{ github.run_id }} -f Dockerfile.test .
+
+          echo "Pushing test image..."
+          if docker push ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:test-${{ github.run_id }}; then
+            echo "✓ Successfully pushed test image to Docker Hub"
+          else
+            echo "❌ Failed to push test image to Docker Hub"
+            echo "This might be due to insufficient permissions or Docker Hub API issues."
+          fi
\ No newline at end of file
diff --git a/.github/workflows/koyeb-deploy.yml b/.github/workflows/koyeb-deploy.yml
index 62d4738..bbd5639 100644
--- a/.github/workflows/koyeb-deploy.yml
+++ b/.github/workflows/koyeb-deploy.yml
@@ -1,4 +1,4 @@
-name: Deploy to Koyeb
+name: Deploy to Koyeb Production
 
 on:
   push:
@@ -10,49 +10,169 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
+        with:
+          repository: DealExMachina/duckdb-spawn
+          token: ${{ secrets.WORKFLOW_PAT }}
+          ssh-strict: true
+          ssh-user: git
+          persist-credentials: true
+          clean: true
+          sparse-checkout-cone-mode: true
+          fetch-depth: 1
+          fetch-tags: false
+          show-progress: true
+          lfs: false
+          submodules: false
+          set-safe-directory: true
+        env:
+          PYTHON_VERSION: '3.10'
 
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v2
+        uses: docker/setup-buildx-action@v3
 
       - name: Login to Docker Hub
-        uses: docker/login-action@v2
+        uses: docker/login-action@v3
         with:
           username: ${{ secrets.DOCKER_HUB_USERNAME }}
           password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
 
       - name: Build and push Docker image
         id: docker_build
-        uses: docker/build-push-action@v4
+        uses: docker/build-push-action@v5
         with:
           context: .
           push: true
-          tags: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
+          tags: |
+            ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
+            ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:latest
+          cache-from: type=registry,ref=${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:latest
+          cache-to: type=inline
 
-      - name: Debug - Verify image exists
+      - name: Wait for image to be available
+        id: wait_for_image
         run: |
-          echo "Checking image: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}"
-          docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
-          echo "Image details:"
-          docker inspect ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
+          echo "Waiting for image to be available in Docker Hub..."
+          MAX_ATTEMPTS=10
+          ATTEMPT=1
+          IMAGE_AVAILABLE=false
+
+          while [ $ATTEMPT -le $MAX_ATTEMPTS ] && [ "$IMAGE_AVAILABLE" = "false" ]; do
+            echo "Attempt $ATTEMPT of $MAX_ATTEMPTS: Checking if image exists..."
+
+            # Try to pull the image
+            if docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }} 2>/dev/null; then
+              echo "✅ Image exists and is accessible on attempt $ATTEMPT"
+              IMAGE_AVAILABLE=true
+              break
+            else
+              echo "⏳ Image not yet available. Waiting before next attempt..."
+              sleep 30 # Wait 30 seconds between attempts
+              ATTEMPT=$((ATTEMPT + 1))
+            fi
+          done
+
+          if [ "$IMAGE_AVAILABLE" = "true" ]; then
+            echo "Docker image is now available and ready for deployment"
+            echo "image_available=true" >> $GITHUB_OUTPUT
+          else
+            echo "⚠️ WARNING: Image could not be verified after $MAX_ATTEMPTS attempts"
+            echo "This might be due to Docker Hub delays or API limitations"
+            echo "Will attempt deployment anyway, but it might fail if the image isn't ready"
+            echo "image_available=false" >> $GITHUB_OUTPUT
+
+            # List available tags to help diagnose issues
+            echo "All available tags in repository:"
+            curl -s "https://registry.hub.docker.com/v2/repositories/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn/tags?page_size=100" | jq -r '.results[].name' || echo "Error listing tags"
+          fi
 
-      - name: Configure Koyeb
+      - name: Install and configure the Koyeb CLI
         uses: koyeb-community/koyeb-actions@v2
         with:
           api_token: "${{ secrets.KOYEB_API_TOKEN }}"
 
-      - name: Deploy on Koyeb
+      - name: Verify Koyeb CLI and Docker registry secret
+        run: |
+          echo "Verifying Koyeb CLI installation..."
+          koyeb --help
+          koyeb app list
+
+          echo "Checking for existing Docker registry secret..."
+          if koyeb secret get DOCKER_REPO_SECRET &>/dev/null; then
+            echo "✅ Found existing Docker registry secret, using it for deployment"
+          else
+            echo "⚠️ Docker registry secret not found. Creating it..."
+            koyeb secret create DOCKER_REPO_SECRET \
+              --docker-registry-auth=${{ secrets.DOCKER_HUB_USERNAME }}:${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} \
+              --docker-registry-server=docker.io \
+              --type=registry
+            echo "Docker registry secret created"
+          fi
+
+      - name: Prepare direct deployment with Koyeb CLI
+        id: prepare_direct
+        run: |
+          echo "Preparing direct deployment in case GitHub Action fails..."
+
+          # Encode the service env variables for CLI usage (one --env flag per variable)
+          ENV_VARS="--env DATABASE_URL=/data/duckdb_spawn.db --env PYTHONUNBUFFERED=1 --env LOG_LEVEL=info --env ENVIRONMENT=production"
+
+          # Generate a command to directly deploy using the CLI as a fallback
+          CLI_DEPLOY_CMD="koyeb service create api \
+            --app duckdb-spawn \
+            --docker docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }} \
+            --docker-private-registry-secret DOCKER_REPO_SECRET \
+            --ports 8000:http \
+            --routes /:8000 \
+            ${ENV_VARS} \
+            --instance-type nano \
+            --regions fra \
+            --healthchecks 8000:http:/monitoring/health"
+
+          echo "CLI_DEPLOY_CMD<<EOF" >> $GITHUB_ENV
+          echo "$CLI_DEPLOY_CMD" >> $GITHUB_ENV
+          echo "EOF" >> $GITHUB_ENV
+
+      - name: Deploy to Koyeb using action
+        id: deploy_action
         uses: koyeb/action-git-deploy@v1
         with:
           app-name: duckdb-spawn
           service-name: api
           service-type: web
           docker: docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }}
-          service-env: DATABASE_URL=/data/duckdb_spawn.db,PYTHONUNBUFFERED=1,LOG_LEVEL=info,ENVIRONMENT=staging
-          service-ports: 8000:http
-          service-routes: /:8000
+          docker-private-registry-secret: DOCKER_REPO_SECRET
           service-instance-type: nano
           service-regions: fra
-          service-checks: 8000:http:/monitoring/health
-          docker-private-registry-secret: DOCKER_REPO_SECRET
+          service-env: DATABASE_URL=/data/duckdb_spawn.db,PYTHONUNBUFFERED=1,LOG_LEVEL=info,ENVIRONMENT=production
+          service-ports: 8000:http
+          service-routes: /:8000
+          service-checks: 8000:http:/monitoring/health
+        continue-on-error: true
+
+      - name: Fallback to direct Koyeb CLI if action failed
+        if: steps.deploy_action.outcome == 'failure'
+        run: |
+          echo "Action-based deployment failed, falling back to direct CLI deployment..."
+          echo "Executing: ${{ env.CLI_DEPLOY_CMD }}"
+
+          if ${{ env.CLI_DEPLOY_CMD }}; then
+            echo "✅ Direct CLI deployment successful!"
+          else
+            echo "❌ Direct CLI deployment also failed. Trying to update existing service..."
+
+            # Try to update if service exists
+            koyeb service update api \
+              --app duckdb-spawn \
+              --docker docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:${{ github.sha }} \
+              --docker-private-registry-secret DOCKER_REPO_SECRET
+          fi
+
+      - name: Verify deployment
+        run: |
+          echo "Verifying deployment status..."
+          koyeb service get -a duckdb-spawn api -o json | jq '.latest_deployment'
+
+          echo "Application URL:"
+          koyeb app get duckdb-spawn -o json | jq -r '.domains[0].domain'
\ No newline at end of file
diff --git a/.github/workflows/pr-preview.yml b/.github/workflows/pr-preview.yml
new file mode 100644
index 0000000..7d968c4
--- /dev/null
+++ b/.github/workflows/pr-preview.yml
@@ -0,0 +1,143 @@
+name: PR Preview Deployment
+
+on:
+  pull_request:
+    types: [opened, synchronize, reopened]
+    branches:
+      - main
+      - dev
+
+jobs:
+  deploy-preview:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          repository: DealExMachina/duckdb-spawn
+          token: ${{ secrets.WORKFLOW_PAT }}
+          ssh-strict: true
+          ssh-user: git
+          persist-credentials: true
+          clean: true
+          sparse-checkout-cone-mode: true
+          fetch-depth: 1
+          fetch-tags: false
+          show-progress: true
+          lfs: false
+          submodules: false
+          set-safe-directory: true
+        env:
+          PYTHON_VERSION: '3.10'
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Login to Docker Hub
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.DOCKER_HUB_USERNAME }}
+          password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
+
+      - name: Extract branch name
+        shell: bash
+        run: |
+          echo "BRANCH_NAME=$(echo ${GITHUB_HEAD_REF} | tr / -)" >> $GITHUB_ENV
+          echo "Branch name: ${GITHUB_HEAD_REF}"
+          echo "Sanitized: $(echo ${GITHUB_HEAD_REF} | tr / -)"
+
+      - name: Build and push Docker image
+        id: docker_build
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          push: true
+          tags: |
+            ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:preview-${{ env.BRANCH_NAME }}-${{ github.sha }}
+            ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:preview-${{ env.BRANCH_NAME }}
+          cache-from: type=registry,ref=${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:preview-${{ env.BRANCH_NAME }}
+          cache-to: type=inline
+
+      - name: Wait for image to be available
+        id: wait_for_image
+        run: |
+          echo "Waiting for image to be available in Docker Hub..."
+          MAX_ATTEMPTS=6 # Reduced attempts for PR previews for faster feedback
+          ATTEMPT=1
+          IMAGE_AVAILABLE=false
+
+          while [ $ATTEMPT -le $MAX_ATTEMPTS ] && [ "$IMAGE_AVAILABLE" = "false" ]; do
+            echo "Attempt $ATTEMPT of $MAX_ATTEMPTS: Checking if image exists..."
+
+            # Try to pull the image
+            if docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:preview-${{ env.BRANCH_NAME }}-${{ github.sha }} 2>/dev/null; then
+              echo "✅ Image exists and is accessible on attempt $ATTEMPT"
+              IMAGE_AVAILABLE=true
+              break
+            else
+              echo "⏳ Image not yet available. Waiting before next attempt..."
+              sleep 20 # Reduced wait time for PR previews
+              ATTEMPT=$((ATTEMPT + 1))
+            fi
+          done
+
+          if [ "$IMAGE_AVAILABLE" = "false" ]; then
+            echo "⚠️ WARNING: Image could not be verified, but proceeding with deployment"
+          fi
+
+      - name: Install and configure the Koyeb CLI
+        uses: koyeb-community/koyeb-actions@v2
+        with:
+          api_token: "${{ secrets.KOYEB_API_TOKEN }}"
+
+      - name: Verify Docker registry secret exists
+        run: |
+          echo "Checking for existing Docker registry secret..."
+          if koyeb secret get DOCKER_REPO_SECRET &>/dev/null; then
+            echo "✅ Found existing Docker registry secret, using it for deployment"
+          else
+            echo "⚠️ Docker registry secret not found. Creating it..."
+ koyeb secret create DOCKER_REPO_SECRET \ + --docker-registry-auth=${{ secrets.DOCKER_HUB_USERNAME }}:${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} \ + --docker-registry-server=docker.io \ + --type=registry + echo "Docker registry secret created" + fi + + - name: Deploy to Koyeb + id: deploy_action + uses: koyeb/action-git-deploy@v1 + with: + app-name: duckdb-spawn-preview-${{ env.BRANCH_NAME }} + service-name: api + service-type: web + docker: docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:preview-${{ env.BRANCH_NAME }}-${{ github.sha }} + docker-private-registry-secret: DOCKER_REPO_SECRET + service-instance-type: nano + service-regions: fra + service-env: DATABASE_URL=/data/duckdb_spawn.db PYTHONUNBUFFERED=1 LOG_LEVEL=info ENVIRONMENT=preview + service-ports: 8000:http + service-routes: /:8000 + service-checks: 8000:http://monitoring/health + continue-on-error: true + + - name: Get App URL + id: app_url + run: | + DOMAIN=$(koyeb app get duckdb-spawn-preview-${{ env.BRANCH_NAME }} -o json | jq -r '.domains[0].domain') + echo "APP_URL=https://$DOMAIN" >> $GITHUB_ENV + echo "url=https://$DOMAIN" >> $GITHUB_OUTPUT + + - name: Comment on PR with deployment URL + uses: actions/github-script@v6 + with: + github-token: ${{secrets.GITHUB_TOKEN}} + script: | + const domain = '${{ steps.app_url.outputs.url }}'; + const message = `✅ Preview environment deployed at: ${domain}`; + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: message + }); \ No newline at end of file diff --git a/.github/workflows/staging-deploy.yml b/.github/workflows/staging-deploy.yml index bc02bb4..32b7581 100644 --- a/.github/workflows/staging-deploy.yml +++ b/.github/workflows/staging-deploy.yml @@ -8,52 +8,176 @@ on: jobs: deploy: runs-on: ubuntu-latest - environment: - name: staging - url: https://duckdb-spawn-staging.koyeb.app steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 + 
with: + repository: DealExMachina/duckdb-spawn + token: ${{ secrets.WORKFLOW_PAT }} + ssh-strict: true + ssh-user: git + persist-credentials: true + clean: true + sparse-checkout-cone-mode: true + fetch-depth: 1 + fetch-tags: false + show-progress: true + lfs: false + submodules: false + set-safe-directory: true + env: + PYTHON_VERSION: "3.10" # quoted so YAML does not read it as the float 3.1 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to Docker Hub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKER_HUB_USERNAME }} password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} - name: Build and push Docker image id: docker_build - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . push: true - tags: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} + tags: | + ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} + ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging cache-from: type=registry,ref=${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging cache-to: type=inline - - name: Debug - Verify image exists + - name: Wait for image to be available + id: wait_for_image + run: | + echo "Waiting for image to be available in Docker Hub..." + MAX_ATTEMPTS=10 + ATTEMPT=1 + IMAGE_AVAILABLE=false + + while [ $ATTEMPT -le $MAX_ATTEMPTS ] && [ "$IMAGE_AVAILABLE" = "false" ]; do + echo "Attempt $ATTEMPT of $MAX_ATTEMPTS: Checking if image exists..." + + # Try to pull the image + if docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} 2>/dev/null; then + echo "✅ Image exists and is accessible on attempt $ATTEMPT" + IMAGE_AVAILABLE=true + break + else + echo "⏳ Image not yet available. Waiting before next attempt..."
+ sleep 30 # Wait 30 seconds between attempts + ATTEMPT=$((ATTEMPT + 1)) + fi + done + + if [ "$IMAGE_AVAILABLE" = "true" ]; then + echo "Docker image is now available and ready for deployment" + echo "image_available=true" >> $GITHUB_OUTPUT + else + echo "⚠️ WARNING: Image could not be verified after $MAX_ATTEMPTS attempts" + echo "This might be due to Docker Hub delays or API limitations" + echo "Will attempt deployment anyway, but it might fail if the image isn't ready" + echo "image_available=false" >> $GITHUB_OUTPUT + + # List available tags to help diagnose issues + echo "All available tags in repository:" + curl -s "https://registry.hub.docker.com/v2/repositories/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn/tags?page_size=100" | jq -r '.results[].name' || echo "Error listing tags" + fi + + - name: Install and configure the Koyeb CLI + uses: koyeb-community/koyeb-actions@v2 + with: + api_token: "${{ secrets.KOYEB_API_TOKEN }}" + + - name: Verify Koyeb CLI and Docker registry secret run: | - echo "Checking image: ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }}" - docker pull ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} - echo "Image details:" - docker inspect ${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} + echo "Verifying Koyeb CLI installation..." + koyeb --help + koyeb app list + + echo "Checking for existing Docker registry secret..." + if koyeb secret get DOCKER_REPO_SECRET &>/dev/null; then + echo "✅ Found existing Docker registry secret, using it for deployment" + else + echo "⚠️ Docker registry secret not found. Creating it..." 
+ koyeb secret create DOCKER_REPO_SECRET \ + --docker-registry-auth=${{ secrets.DOCKER_HUB_USERNAME }}:${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} \ + --docker-registry-server=docker.io \ + --type=registry + echo "Docker registry secret created" + fi - - name: Deploy to Koyeb + - name: Prepare direct deployment with Koyeb CLI + id: prepare_direct + run: | + echo "Preparing direct deployment in case GitHub Action fails..." + + # Encode the service env variables for CLI usage + ENV_VARS="DATABASE_URL=/data/duckdb_spawn.db PYTHONUNBUFFERED=1 LOG_LEVEL=info ENVIRONMENT=staging" + + # Check if app exists, create if not + if ! koyeb app get duckdb-spawn-staging &>/dev/null; then + echo "Creating app duckdb-spawn-staging..." + koyeb app create duckdb-spawn-staging + fi + + # Generate a command to directly deploy using the CLI as a fallback + CLI_DEPLOY_CMD="koyeb service create api \ + --app duckdb-spawn-staging \ + --docker docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} \ + --docker-private-registry-secret DOCKER_REPO_SECRET \ + --ports 8000:http \ + --routes /:8000 \ + --env \"${ENV_VARS}\" \ + --instance-type nano \ + --regions fra \ + --healthchecks 8000:http:monitoring/health" + + echo "CLI_DEPLOY_CMD<<EOF" >> $GITHUB_ENV + echo "$CLI_DEPLOY_CMD" >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV + + - name: Deploy to Koyeb using action + id: deploy_action uses: koyeb/action-git-deploy@v1 with: - api-token: ${{ secrets.KOYEB_API_TOKEN }} app-name: duckdb-spawn-staging service-name: api - service-env: DATABASE_URL=/data/duckdb_spawn.db PYTHONUNBUFFERED=1 LOG_LEVEL=info ENVIRONMENT=staging service-type: web - service-ports: 8000:http - service-routes: /:8000 + docker: docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} + docker-private-registry-secret: DOCKER_REPO_SECRET service-instance-type: nano service-regions: fra - service-health-check: /monitoring/health:8000 - docker-tag: docker.io/${{ secrets.DOCKER_HUB_USERNAME
}}/duckdb-spawn:staging-${{ github.sha }} - docker-registry-username: ${{ secrets.DOCKER_HUB_USERNAME }} - docker-registry-password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} \ No newline at end of file + service-env: DATABASE_URL=/data/duckdb_spawn.db PYTHONUNBUFFERED=1 LOG_LEVEL=info ENVIRONMENT=staging + service-ports: 8000:http + service-routes: /:8000 + service-checks: 8000:http://monitoring/health + continue-on-error: true + + - name: Fallback to direct Koyeb CLI if action failed + if: steps.deploy_action.outcome == 'failure' + run: | + echo "Action-based deployment failed, falling back to direct CLI deployment..." + echo "Executing: ${{ env.CLI_DEPLOY_CMD }}" + + if ${{ env.CLI_DEPLOY_CMD }}; then + echo "✅ Direct CLI deployment successful!" + else + echo "❌ Direct CLI deployment also failed. Trying to update existing service..." + + # Try to update if service exists + koyeb service update api \ + --app duckdb-spawn-staging \ + --docker docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/duckdb-spawn:staging-${{ github.sha }} \ + --docker-private-registry-secret DOCKER_REPO_SECRET || echo "Service update failed too" + fi + + - name: Verify deployment + run: | + echo "Verifying deployment status..." + koyeb service get -a duckdb-spawn-staging api -o json | jq '.latest_deployment' || echo "Couldn't get deployment status" + + echo "Application URL:" + koyeb app get duckdb-spawn-staging -o json | jq -r '.domains[0].domain' || echo "Couldn't get app domain" \ No newline at end of file diff --git a/README.md b/README.md index f865700..80f90b4 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,22 @@ pip install -r requirements.txt uvicorn src.main:app --reload ``` +## GitHub Workflows + +This project uses GitHub Actions for CI/CD pipelines, testing, and deployments. To use these workflows, you'll need to set up the following secrets: + +### Required Secrets + +- `WORKFLOW_PAT`: A GitHub Personal Access Token with `repo` and `workflow` scopes. 
This is used for actions that need to access the repository, especially for cross-repository checkout operations. +- `DOCKER_HUB_USERNAME`: Your Docker Hub username +- `DOCKER_HUB_ACCESS_TOKEN`: Docker Hub access token for pushing images +- `KOYEB_API_TOKEN`: API token for Koyeb deployments + +To create a Personal Access Token (PAT): +1. Go to GitHub Settings → Developer Settings → Personal access tokens → Tokens (classic) +2. Generate a new token with at least the `repo` and `workflow` scopes +3. Add this token as a repository secret named `WORKFLOW_PAT` + ## Testing Run tests with pytest: @@ -343,6 +359,51 @@ cd infrastructure/monitoring docker-compose up -d --force-recreate ``` +## Troubleshooting Docker Registry Secrets + +When deploying to Koyeb with a private Docker registry, ensure: + +1. The secret exists: + ```bash + koyeb secret get DOCKER_REPO_SECRET + ``` + +2. The Docker registry secret has the correct format: + ```bash + koyeb secret create DOCKER_REPO_SECRET \ + --docker-registry-auth=YOUR_USERNAME:YOUR_PASSWORD \ + --docker-registry-server=docker.io \ + --type=registry + ``` + +3. The Docker image reference in the deployment command includes the full path: + ``` + docker.io/username/duckdb-spawn:tag + ``` + +4. The deployment command correctly references the secret: + ``` + --docker-private-registry-secret DOCKER_REPO_SECRET + ``` + +### Koyeb CLI Commands + +Note that some Koyeb CLI commands might have changed. 
To verify Koyeb CLI installation and get help: + +```bash +koyeb --help +``` + +To list available apps: +```bash +koyeb app list +``` + +To check service status: +```bash +koyeb service get -a app-name service-name +``` + ## License MIT License @@ -368,4 +429,4 @@ Detailed documentation for the DuckDB Spawn project is available in the `docs` d - [Architecture](docs/ARCHITECTURE.md): Comprehensive explanation of the system architecture and design decisions - [Roadmap](docs/ROADMAP.md): Future development plans and feature timelines - [Agentic Research](docs/AGENTIC_RESEARCH.md): Research initiative on agentic data products using small language models -- [Sidecar Specification](docs/SIDECAR_SPEC.md): Technical specification for the agentic sidecar implementation +- [Sidecar Specification](docs/SIDECAR_SPEC.md): Technical specification for the agentic sidecar implementation \ No newline at end of file diff --git a/data_mesh_design_recommendations.md b/data_mesh_design_recommendations.md new file mode 100644 index 0000000..be1f2fb --- /dev/null +++ b/data_mesh_design_recommendations.md @@ -0,0 +1,336 @@ +# Data Mesh Design Review and Recommendations + +## Executive Summary + +The DuckDB Spawn project demonstrates a solid understanding of data mesh principles with its domain-oriented approach, self-contained architecture, and federated governance through an ontology server. While the foundational implementation is strong, there are several opportunities to enhance the design for better scalability, resilience, and alignment with data mesh best practices. + +## Current Architecture Strengths + +### 1. Domain Ownership Implementation +- ✅ **Self-contained service**: The project correctly encapsulates the project financing domain +- ✅ **Infrastructure as Code**: Pulumi integration enables true self-service deployment +- ✅ **Embedded database**: DuckDB choice eliminates external dependencies + +### 2. 
Data as a Product Mindset +- ✅ **Well-documented APIs**: FastAPI provides automatic OpenAPI documentation +- ✅ **Health monitoring**: Comprehensive health checks and metrics +- ✅ **Clear versioning**: Structured roadmap with version planning + +### 3. Technical Implementation Quality +- ✅ **Connection pooling**: Thread-safe database connection management +- ✅ **Async architecture**: FastAPI's async support for better performance +- ✅ **Security layers**: CORS, rate limiting, and trusted host middleware + +## Design Recommendations + +### 1. Enhanced Data Product Metadata + +**Current Gap**: Limited metadata about the data product itself (ownership, SLAs, data quality metrics) + +**Recommendation**: Implement a comprehensive data product descriptor +```python +# src/models/data_product.py +class DataProductDescriptor(BaseModel): + id: str = "project-financing-data-product" + name: str = "Project Financing Data Product" + domain: str = "finance" + owner: TeamInfo + version: str = "1.2.1" + sla: SLADefinition + quality_metrics: List[QualityMetric] + data_contracts: List[DataContract] + dependencies: List[str] # Other data products + +# Expose via endpoint +@router.get("/data-product/descriptor") +async def get_data_product_info(): + return DataProductDescriptor(...) +``` + +### 2. Data Contract Implementation + +**Current Gap**: No formal data contracts with consumers + +**Recommendation**: Implement contract-driven development +```python +# src/contracts/base.py +class DataContract(BaseModel): + contract_id: str + consumer: str + provider: str = "project-financing-dp" + schema_version: str + quality_assertions: List[QualityAssertion] + sla_terms: SLATerms + valid_from: datetime + valid_until: Optional[datetime] + +# src/contracts/validation.py +class ContractValidator: + async def validate_response(self, data: Any, contract: DataContract): + # Validate schema compliance + # Check quality assertions + # Verify SLA compliance + pass +``` + +### 3. 
Event-Driven Architecture Enhancement + +**Current Gap**: Limited event publishing capabilities + +**Recommendation**: Implement comprehensive event streaming +```python +# src/events/publisher.py +class EventPublisher: + def __init__(self, broker_config: BrokerConfig): + self.producer = self._create_producer(broker_config) + + async def publish_data_change(self, event: DataChangeEvent): + # Publish to domain-specific topic + topic = f"finance.projects.{event.event_type}" + await self.producer.send(topic, event.json()) + +# src/database/hooks.py +class DatabaseHooks: + @after_insert("projects") + async def on_project_created(self, project: Project): + event = DataChangeEvent( + event_type="created", + entity_type="project", + entity_id=project.project_id, + data=project.dict(), + timestamp=datetime.utcnow() + ) + await self.event_publisher.publish_data_change(event) +``` + +### 4. Multi-Tenancy Support + +**Current Gap**: No tenant isolation for multi-organization use + +**Recommendation**: Add tenant context and isolation +```python +# src/middleware/tenant.py +class TenantMiddleware: + async def __call__(self, request: Request, call_next): + tenant_id = extract_tenant_from_token(request.headers) + request.state.tenant_id = tenant_id + + # Set tenant context for database operations + with tenant_context(tenant_id): + response = await call_next(request) + return response + +# src/database/tenant_isolation.py +class TenantAwareConnectionManager: + def get_connection(self, tenant_id: str): + # Option 1: Separate databases per tenant + db_path = f"data/{tenant_id}/data_product.db" + + # Option 2: Row-level security with tenant_id column + conn = self._get_base_connection() + conn.execute(f"SET tenant_id = '{tenant_id}'") + return conn +``` + +### 5. 
Data Lineage and Provenance + +**Current Gap**: No tracking of data origin and transformations + +**Recommendation**: Implement lineage tracking +```python +# src/lineage/tracker.py +class DataLineageTracker: + def track_data_flow(self, operation: Operation): + lineage_record = LineageRecord( + source_system=operation.source, + transformation=operation.transformation, + destination="project-financing-dp", + timestamp=datetime.utcnow(), + user=operation.user, + impact_analysis=self._analyze_impact(operation) + ) + self._store_lineage(lineage_record) +``` + +### 6. Advanced Schema Evolution + +**Current Gap**: Basic schema fetching without version management + +**Recommendation**: Implement sophisticated schema evolution +```python +# src/schema/evolution.py +class SchemaEvolutionManager: + async def apply_migration(self, + from_version: str, + to_version: str): + migration_path = self._get_migration_path(from_version, to_version) + + for migration in migration_path: + # Apply backward compatible changes first + if migration.is_backward_compatible: + await self._apply_migration(migration) + else: + # Coordinate with consumers for breaking changes + await self._coordinate_breaking_change(migration) + + def validate_compatibility(self, + old_schema: Schema, + new_schema: Schema) -> CompatibilityReport: + # Check for breaking changes + # Validate data type compatibility + # Ensure required fields handling + pass +``` + +### 7. 
Distributed Query Federation + +**Current Gap**: No ability to query across data products + +**Recommendation**: Implement query federation capabilities +```python +# src/federation/query_engine.py +class FederatedQueryEngine: + async def execute_federated_query(self, query: FederatedQuery): + # Parse query to identify required data products + required_products = self._parse_data_products(query) + + # Execute sub-queries in parallel + sub_results = await asyncio.gather(*[ + self._query_data_product(dp, query) + for dp in required_products + ]) + + # Join results locally using DuckDB + return self._join_results(sub_results, query.join_conditions) +``` + +### 8. Observability Enhancement + +**Current Gap**: Basic metrics without distributed tracing + +**Recommendation**: Implement comprehensive observability +```python +# src/observability/tracing.py +from opentelemetry import trace + +tracer = trace.get_tracer(__name__) + +class TracingMiddleware: + async def __call__(self, request: Request, call_next): + with tracer.start_as_current_span("http_request") as span: + span.set_attribute("http.method", request.method) + span.set_attribute("http.url", str(request.url)) + span.set_attribute("data.product", "project-financing") + + # Propagate context to downstream services + headers = inject_trace_context(request.headers) + response = await call_next(request) + + span.set_attribute("http.status_code", response.status_code) + return response +``` + +### 9. 
Self-Service Data Discovery + +**Current Gap**: Limited discoverability features + +**Recommendation**: Implement data catalog integration +```python +# src/discovery/catalog.py +class DataCatalogIntegration: + async def register_data_product(self): + catalog_entry = CatalogEntry( + id=self.data_product_id, + name="Project Financing Data Product", + description=self.description, + schema=await self.get_current_schema(), + sample_queries=self.get_sample_queries(), + access_patterns=self.get_access_patterns(), + quality_score=await self.calculate_quality_score(), + tags=["finance", "projects", "investments"] + ) + await self.catalog_client.register(catalog_entry) +``` + +### 10. Resilience Patterns Enhancement + +**Current Gap**: Basic circuit breaker for ontology server + +**Recommendation**: Comprehensive resilience implementation +```python +# src/resilience/patterns.py +class ResilienceDecorator: + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type(TransientError) + ) + @circuit_breaker( + failure_threshold=5, + recovery_timeout=30, + expected_exception=ServiceUnavailable + ) + @timeout(seconds=5) + @bulkhead(max_concurrent=10) + async def call_external_service(self, *args, **kwargs): + # Resilient external service call + pass +``` + +## Architecture Evolution Recommendations + +### Phase 1: Foundation Enhancement (Next 3 months) +1. Implement data contracts and validation +2. Add comprehensive event publishing +3. Enhance observability with distributed tracing + +### Phase 2: Federation Capabilities (3-6 months) +1. Implement query federation engine +2. Add cross-product join capabilities +3. Build data product discovery service + +### Phase 3: Advanced Autonomy (6-12 months) +1. Integrate the planned agentic sidecar +2. Implement self-optimization capabilities +3. 
Add predictive scaling based on usage patterns + +## Testing Strategy Improvements + +```python +# tests/contract_testing.py +class ContractTests: + """Test data contracts with consumers""" + + @pytest.mark.contract + async def test_consumer_contract_compliance(self): + # Verify all contracts are satisfied + pass + +# tests/chaos_engineering.py +class ChaosTests: + """Test resilience under failure conditions""" + + @pytest.mark.chaos + async def test_ontology_server_failure_recovery(self): + # Simulate ontology server outage + # Verify fallback behavior + pass +``` + +## Security Enhancements + +1. **Zero-Trust Architecture**: Implement mTLS for service-to-service communication +2. **Data Encryption**: Add encryption at rest for sensitive financial data +3. **Audit Logging**: Comprehensive audit trail for all data operations +4. **Policy Engine**: Integrate Open Policy Agent for fine-grained authorization + +## Performance Optimizations + +1. **Query Optimization**: Implement query plan caching and optimization +2. **Materialized Views**: Pre-compute common aggregations +3. **Partitioning Strategy**: Time-based partitioning for historical data +4. **Connection Pool Tuning**: Dynamic pool sizing based on load + +## Conclusion + +The DuckDB Spawn project has a strong foundation for a data mesh implementation. The recommendations above will help evolve it into a more mature, production-ready data product that fully embraces data mesh principles while maintaining practical operability. Focus on implementing changes incrementally, starting with data contracts and enhanced observability, as these provide immediate value and lay groundwork for more advanced features. 
\ No newline at end of file diff --git a/decentralized_catalog_architecture.md b/decentralized_catalog_architecture.md new file mode 100644 index 0000000..fb02a4f --- /dev/null +++ b/decentralized_catalog_architecture.md @@ -0,0 +1,611 @@ +# Decentralized Catalog and Registry Architecture + +## Overview + +This proposal outlines a simple, decentralized catalog and registry system for data mesh architectures that enables data product discovery without central bottlenecks. The design prioritizes simplicity, resilience, and alignment with data mesh principles. + +## Architecture Principles + +1. **No Central Authority**: Each data product maintains its own metadata +2. **Peer Discovery**: Data products discover each other through gossip protocol +3. **Eventually Consistent**: Accept temporary inconsistencies for availability +4. **Self-Describing**: Each data product publishes standardized metadata +5. **Lightweight**: Minimal infrastructure overhead + +## High-Level Architecture + +```mermaid +graph TB + subgraph "Data Product A" + DPA[Data Product API] + LRA[Local Registry A] + GPA[Gossip Protocol A] + end + + subgraph "Data Product B" + DPB[Data Product API] + LRB[Local Registry B] + GPB[Gossip Protocol B] + end + + subgraph "Data Product C" + DPC[Data Product API] + LRC[Local Registry C] + GPC[Gossip Protocol C] + end + + subgraph "Discovery Service" + DS[Discovery API] + AGG[Aggregator] + CACHE[Registry Cache] + end + + GPA <--> GPB + GPB <--> GPC + GPC <--> GPA + + LRA --> DS + LRB --> DS + LRC --> DS + + DS --> AGG + AGG --> CACHE +``` + +## Core Components + +### 1. 
Data Product Self-Registration + +Each data product exposes its metadata through standardized endpoints: + +```python +# src/catalog/self_description.py +from pydantic import BaseModel, Field +from typing import List, Dict, Optional +from datetime import datetime + +class DataProductMetadata(BaseModel): + """Core metadata that every data product must provide""" + + # Identity + id: str = Field(description="Unique identifier (UUID)") + name: str = Field(description="Human-readable name") + version: str = Field(description="Semantic version") + domain: str = Field(description="Business domain") + + # Discovery + base_url: str = Field(description="Base API URL") + health_endpoint: str = "/health" + catalog_endpoint: str = "/catalog/metadata" + + # Description + description: str + owner: TeamInfo + contact: ContactInfo + + # Technical + data_formats: List[str] = ["json", "parquet", "csv"] + protocols: List[str] = ["http", "grpc"] + authentication: List[str] = ["oauth2", "api-key"] + + # Quality + sla: SLAInfo + quality_score: float = Field(ge=0, le=1) + + # Schema + schema_endpoint: str = "/schema" + schema_version: str + + # Timestamps + created_at: datetime + updated_at: datetime + last_seen: Optional[datetime] = None + +class TeamInfo(BaseModel): + name: str + email: str + slack_channel: Optional[str] + +class ContactInfo(BaseModel): + support_email: str + documentation_url: str + issue_tracker_url: Optional[str] + +class SLAInfo(BaseModel): + availability: float = Field(ge=0, le=100) + response_time_p99_ms: int + data_freshness_minutes: int +``` + +### 2. 
Local Registry Implementation + +Each data product maintains a local registry of discovered peers: + +```python +# src/catalog/local_registry.py +import asyncio +from typing import Dict, List, Optional +from datetime import datetime, timedelta +import aiohttp +from sqlalchemy import create_engine, Column, String, DateTime, Float +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +Base = declarative_base() + +class DataProductRecord(Base): + __tablename__ = 'discovered_products' + + id = Column(String, primary_key=True) + name = Column(String, nullable=False) + domain = Column(String, nullable=False) + base_url = Column(String, nullable=False) + version = Column(String, nullable=False) + quality_score = Column(Float) + last_seen = Column(DateTime, nullable=False) + metadata_json = Column(String, nullable=False) # Full metadata + +class LocalRegistry: + def __init__(self, db_path: str = "data/registry.db"): + self.engine = create_engine(f'sqlite:///{db_path}') + Base.metadata.create_all(self.engine) + self.Session = sessionmaker(bind=self.engine) + self._health_check_interval = 300 # 5 minutes + self._stale_threshold = timedelta(hours=24) + + async def register_peer(self, metadata: DataProductMetadata): + """Register or update a peer data product""" + session = self.Session() + try: + record = session.query(DataProductRecord).filter_by( + id=metadata.id + ).first() + + if record: + # Update existing + record.name = metadata.name + record.version = metadata.version + record.base_url = metadata.base_url + record.quality_score = metadata.quality_score + record.last_seen = datetime.utcnow() + record.metadata_json = metadata.json() + else: + # Create new + record = DataProductRecord( + id=metadata.id, + name=metadata.name, + domain=metadata.domain, + base_url=metadata.base_url, + version=metadata.version, + quality_score=metadata.quality_score, + last_seen=datetime.utcnow(), + metadata_json=metadata.json() + ) + 
+ session.add(record) + + session.commit() + finally: + session.close() + + async def get_peers_by_domain(self, domain: str) -> List[DataProductMetadata]: + """Get all active peers in a specific domain""" + session = self.Session() + try: + cutoff_time = datetime.utcnow() - self._stale_threshold + records = session.query(DataProductRecord).filter( + DataProductRecord.domain == domain, + DataProductRecord.last_seen > cutoff_time + ).all() + + return [ + DataProductMetadata.parse_raw(record.metadata_json) + for record in records + ] + finally: + session.close() + + async def health_check_peers(self): + """Periodic health check of registered peers""" + while True: + session = self.Session() + try: + records = session.query(DataProductRecord).all() + + for record in records: + metadata = DataProductMetadata.parse_raw(record.metadata_json) + if await self._check_peer_health(metadata): + record.last_seen = datetime.utcnow() + else: + # Mark as potentially stale but don't remove yet + pass + + session.commit() + finally: + session.close() + + await asyncio.sleep(self._health_check_interval) + + async def _check_peer_health(self, metadata: DataProductMetadata) -> bool: + """Check if a peer is healthy""" + try: + async with aiohttp.ClientSession() as session: + url = f"{metadata.base_url}{metadata.health_endpoint}" + async with session.get(url, timeout=5) as response: + return response.status == 200 + except Exception: + # Not a bare except, so task cancellation still propagates + return False +``` + +### 3.
Gossip Protocol Implementation + +Simple gossip protocol for peer discovery: + +```python +# src/catalog/gossip.py +import random +import asyncio +from typing import Set, List +import aiohttp +from .local_registry import LocalRegistry, DataProductMetadata + +class GossipProtocol: + def __init__(self, + local_registry: LocalRegistry, + self_metadata: DataProductMetadata, + seed_peers: List[str] = None): + self.registry = local_registry + self.self_metadata = self_metadata + self.seed_peers = seed_peers or [] + self.known_peers: Set[str] = set() + self.gossip_interval = 60 # seconds + self.fanout = 3 # number of peers to gossip with + + async def start(self): + """Start the gossip protocol""" + # Bootstrap from seed peers + await self._bootstrap() + + # Start periodic gossip + asyncio.create_task(self._gossip_loop()) + + async def _bootstrap(self): + """Bootstrap from seed peers""" + for peer_url in self.seed_peers: + try: + await self._exchange_catalog(peer_url) + except Exception as e: + print(f"Failed to bootstrap from {peer_url}: {e}") + + async def _gossip_loop(self): + """Main gossip loop""" + while True: + await asyncio.sleep(self.gossip_interval) + + # Get random subset of known peers + all_peers = await self.registry.get_all_peers() + if len(all_peers) > self.fanout: + selected_peers = random.sample(all_peers, self.fanout) + else: + selected_peers = all_peers + + # Exchange catalogs with selected peers + for peer in selected_peers: + asyncio.create_task(self._exchange_catalog(peer.base_url)) + + async def _exchange_catalog(self, peer_url: str): + """Exchange catalog information with a peer""" + try: + async with aiohttp.ClientSession() as session: + # Send our catalog + my_peers = await self.registry.get_all_peers() + my_catalog = { + "self": self.self_metadata.dict(), + "peers": [p.dict() for p in my_peers] + } + + exchange_url = f"{peer_url}/catalog/exchange" + async with session.post( + exchange_url, + json=my_catalog, + timeout=10 + ) as response: + if 
response.status == 200: + their_catalog = await response.json() + await self._process_peer_catalog(their_catalog) + except Exception as e: + print(f"Gossip exchange failed with {peer_url}: {e}") + + async def _process_peer_catalog(self, catalog: dict): + """Process catalog received from peer""" + # Register the peer itself + peer_metadata = DataProductMetadata(**catalog["self"]) + await self.registry.register_peer(peer_metadata) + + # Register their known peers + for peer_data in catalog.get("peers", []): + try: + metadata = DataProductMetadata(**peer_data) + await self.registry.register_peer(metadata) + except Exception as e: + print(f"Failed to register peer: {e}") +``` + +### 4. Discovery API Implementation + +Aggregated discovery service (optional centralized view): + +```python +# src/catalog/discovery_api.py +from fastapi import FastAPI, HTTPException, Query +from typing import List, Optional +from datetime import datetime, timedelta + +app = FastAPI(title="Data Mesh Discovery Service") + +class DiscoveryService: + def __init__(self, registry: LocalRegistry): + self.registry = registry + self.cache_ttl = 300 # 5 minutes + self._cache = {} + self._cache_timestamp = None + + async def search_data_products( + self, + domain: Optional[str] = None, + name_contains: Optional[str] = None, + min_quality_score: float = 0.0, + limit: int = 100 + ) -> List[DataProductMetadata]: + """Search for data products with filters""" + + # Get all products from registry + all_products = await self._get_all_products_cached() + + # Apply filters + filtered = all_products + + if domain: + filtered = [p for p in filtered if p.domain == domain] + + if name_contains: + filtered = [ + p for p in filtered + if name_contains.lower() in p.name.lower() + ] + + if min_quality_score > 0: + filtered = [ + p for p in filtered + if p.quality_score >= min_quality_score + ] + + # Sort by quality score and limit + filtered.sort(key=lambda p: p.quality_score, reverse=True) + return filtered[:limit] + 
+ async def _get_all_products_cached(self) -> List[DataProductMetadata]: + """Get all products with caching""" + now = datetime.utcnow() + + if (self._cache_timestamp and + now - self._cache_timestamp < timedelta(seconds=self.cache_ttl)): + return self._cache.get("all_products", []) + + # Refresh cache + all_products = await self.registry.get_all_active_peers() + self._cache["all_products"] = all_products + self._cache_timestamp = now + + return all_products + +discovery_service = DiscoveryService(LocalRegistry()) + +@app.get("/search", response_model=List[DataProductMetadata]) +async def search_data_products( + domain: Optional[str] = Query(None, description="Filter by domain"), + name: Optional[str] = Query(None, description="Filter by name (contains)"), + min_quality: float = Query(0.0, ge=0, le=1, description="Minimum quality score"), + limit: int = Query(100, ge=1, le=1000, description="Maximum results") +): + """Search for data products in the mesh""" + return await discovery_service.search_data_products( + domain=domain, + name_contains=name, + min_quality_score=min_quality, + limit=limit + ) + +@app.get("/domains", response_model=List[str]) +async def list_domains(): + """List all known domains in the mesh""" + all_products = await discovery_service._get_all_products_cached() + domains = list(set(p.domain for p in all_products)) + return sorted(domains) + +@app.get("/graph", response_model=dict) +async def get_dependency_graph(): + """Get the dependency graph of data products""" + # This would analyze dependencies between data products + # Implementation depends on how dependencies are tracked + pass +``` + +### 5. 
Integration with DuckDB Spawn + +Add catalog endpoints to your existing data product: + +```python +# src/routes/catalog.py +from datetime import datetime +from fastapi import APIRouter, HTTPException +from typing import Dict, List, Optional +from ..catalog.self_description import DataProductMetadata, TeamInfo, ContactInfo, SLAInfo +from ..catalog.local_registry import LocalRegistry +from ..catalog.gossip import GossipProtocol + +router = APIRouter(prefix="/catalog", tags=["catalog"]) + +# Initialize catalog components +SELF_METADATA = DataProductMetadata( + id="550e8400-e29b-41d4-a716-446655440000", + name="Project Financing Data Product", + version="1.2.1", + domain="finance", + base_url="https://finance-dp.example.com", + description="Manages project financing data including portfolios and risk metrics", + owner=TeamInfo( + name="Finance Team", + email="finance-team@example.com", + slack_channel="#finance-data" + ), + contact=ContactInfo( + support_email="finance-support@example.com", + documentation_url="https://docs.example.com/finance-dp" + ), + sla=SLAInfo( + availability=99.9, + response_time_p99_ms=500, + data_freshness_minutes=15 + ), + quality_score=0.95, + schema_version="2.1.0", + created_at=datetime(2024, 1, 1), + updated_at=datetime.utcnow() +) + +local_registry = LocalRegistry() +gossip = GossipProtocol( + local_registry, + SELF_METADATA, + seed_peers=["https://catalog-dp1.example.com", "https://catalog-dp2.example.com"] +) + +@router.on_event("startup") +async def start_gossip(): + """Start gossip protocol on startup""" + await gossip.start() + +@router.get("/metadata", response_model=DataProductMetadata) +async def get_self_metadata(): + """Get this data product's metadata""" + return SELF_METADATA + +@router.post("/exchange") +async def exchange_catalogs(catalog: Dict): + """Exchange catalog information with a peer (used by gossip protocol)""" + try: + await gossip._process_peer_catalog(catalog) + + # Return our catalog + my_peers = await local_registry.get_all_peers() + return { + 
"self": SELF_METADATA.dict(), + "peers": [p.dict() for p in my_peers] + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/peers", response_model=List[DataProductMetadata]) +async def list_known_peers(domain: Optional[str] = None): + """List all known data products in the mesh""" + if domain: + return await local_registry.get_peers_by_domain(domain) + else: + return await local_registry.get_all_active_peers() + +@router.get("/discover/{domain}", response_model=List[DataProductMetadata]) +async def discover_domain_products(domain: str): + """Discover data products in a specific domain""" + return await local_registry.get_peers_by_domain(domain) +``` + +## Deployment Options + +### Option 1: Fully Decentralized +- Each data product runs gossip protocol +- No central discovery service +- Clients query any data product for discovery + +### Option 2: Hybrid with Discovery Services +- Data products maintain local registries +- Optional discovery services aggregate information +- Provides better search capabilities +- Discovery services can fail without breaking the mesh + +### Option 3: DNS-SD Based Discovery +```python +# Alternative: DNS Service Discovery +from zeroconf import ServiceInfo, Zeroconf +import socket + +class DNSSDCatalog: + def __init__(self): + self.zeroconf = Zeroconf() + + def register_data_product(self, metadata: DataProductMetadata): + """Register data product via DNS-SD""" + info = ServiceInfo( + "_datamesh._tcp.local.", + f"{metadata.name}._datamesh._tcp.local.", + addresses=[socket.inet_aton("127.0.0.1")], + port=8000, + properties={ + "id": metadata.id, + "domain": metadata.domain, + "version": metadata.version, + "base_url": metadata.base_url, + "schema_version": metadata.schema_version + } + ) + self.zeroconf.register_service(info) +``` + +## Benefits of This Architecture + +1. **No Single Point of Failure**: Registry is distributed across all data products +2. 
**Self-Healing**: Gossip protocol ensures information spreads even with failures +3. **Low Overhead**: Minimal additional infrastructure required +4. **Flexible Discovery**: Multiple discovery patterns supported +5. **Gradual Adoption**: Data products can join the catalog incrementally + +## Implementation Checklist + +- [ ] Define standard metadata schema (DataProductMetadata) +- [ ] Implement local registry with SQLite +- [ ] Add catalog endpoints to data products +- [ ] Implement gossip protocol +- [ ] Deploy seed nodes for bootstrap +- [ ] Optional: Deploy discovery service for enhanced search +- [ ] Create client libraries for catalog queries +- [ ] Add monitoring for catalog health + +## Security Considerations + +1. **Authentication**: Validate gossip exchanges with mutual TLS or tokens +2. **Data Validation**: Validate all received metadata against schema +3. **Rate Limiting**: Limit gossip frequency to prevent DoS +4. **Access Control**: Some metadata might be restricted by domain + +## Monitoring + +```python +# src/catalog/metrics.py +from prometheus_client import Counter, Gauge, Histogram + +# Metrics for catalog operations +catalog_gossip_exchanges = Counter( + 'catalog_gossip_exchanges_total', + 'Total gossip exchanges', + ['status'] +) + +catalog_peer_count = Gauge( + 'catalog_peer_count', + 'Number of known peers', + ['domain'] +) + +catalog_query_duration = Histogram( + 'catalog_query_duration_seconds', + 'Catalog query duration' +) +``` \ No newline at end of file diff --git a/infrastructure/pulumi/koyeb_native.py b/infrastructure/pulumi/koyeb_native.py new file mode 100644 index 0000000..a14ba08 --- /dev/null +++ b/infrastructure/pulumi/koyeb_native.py @@ -0,0 +1,98 @@ +"""Koyeb deployment configuration using Pulumi with native Koyeb provider. 
+ +This module defines the infrastructure for deploying the DuckDB-SPAWN application to Koyeb +using the native Pulumi Koyeb provider instead of CLI commands.""" + +import pulumi +from pulumi import Config +import pulumi_koyeb as koyeb + +# Get configuration +config = Config("duckdb-spawn") +environment = config.require("environment") +log_level = config.require("logLevel") +image_tag = config.get("imageTag", "latest") + +# Docker registry configuration +registry_config = Config("registry") +registry_username = registry_config.require("username") +registry_password = registry_config.require_secret("password") + +# Define the Docker image +docker_image = f"docker.io/{registry_username}/duckdb-spawn:{environment}-{image_tag}" + +# Create Koyeb Provider +koyeb_provider = koyeb.Provider("koyeb-provider", + token=config.require_secret("koyeb_token"), +) + +# Create Koyeb app +app = koyeb.App("duckdb-spawn-app", + name=f"duckdb-spawn-{environment}", + opts=pulumi.ResourceOptions(provider=koyeb_provider) +) + +# Create Koyeb Service +service = koyeb.Service("duckdb-spawn-service", + app_name=app.name, + name="api", + definition=koyeb.ServiceDefinitionArgs( + instance=koyeb.ServiceDefinitionInstanceArgs( + type="nano", + region="fra", + ), + ports=[koyeb.ServiceDefinitionPortArgs( + port=8000, + protocol="http", + )], + routes=[koyeb.ServiceDefinitionRouteArgs( + path="/", + port=8000, + )], + health_check=koyeb.ServiceDefinitionHealthCheckArgs( + port=8000, + path="/monitoring/health", + protocol="http", + initial_delay_seconds=20, + timeout_seconds=10, + period_seconds=30, + success_threshold=1, + failure_threshold=3, + ), + env_vars=[ + koyeb.ServiceDefinitionEnvVarArgs( + key="DATABASE_URL", + value="/data/duckdb_spawn.db", + ), + koyeb.ServiceDefinitionEnvVarArgs( + key="PYTHONUNBUFFERED", + value="1", + ), + koyeb.ServiceDefinitionEnvVarArgs( + key="LOG_LEVEL", + value=log_level, + ), + koyeb.ServiceDefinitionEnvVarArgs( + key="ENVIRONMENT", + value=environment, + ), 
+ ], + deployments=[koyeb.ServiceDefinitionDeploymentArgs( + docker=koyeb.ServiceDefinitionDeploymentDockerArgs( + image=docker_image, + registry_auth=koyeb.ServiceDefinitionDeploymentDockerRegistryAuthArgs( + username=registry_username, + password=registry_password, + ), + ), + )], + ), + opts=pulumi.ResourceOptions(provider=koyeb_provider, depends_on=[app]) +) + +# Export outputs +pulumi.export("app_name", app.name) +pulumi.export("service_name", service.name) +pulumi.export("environment", environment) +pulumi.export("image_tag", image_tag) +pulumi.export("docker_image", docker_image) \ No newline at end of file diff --git a/infrastructure/pulumi/requirements.txt b/infrastructure/pulumi/requirements.txt index f52e979..26be4d4 100644 --- a/infrastructure/pulumi/requirements.txt +++ b/infrastructure/pulumi/requirements.txt @@ -1,2 +1,3 @@ pulumi>=3.0.0 -pulumi-docker>=4.0.0 \ No newline at end of file +pulumi-docker>=4.0.0 +pulumi-koyeb==0.1.11 \ No newline at end of file diff --git a/metadata_autogeneration_proposal.md b/metadata_autogeneration_proposal.md new file mode 100644 index 0000000..6c72901 --- /dev/null +++ b/metadata_autogeneration_proposal.md @@ -0,0 +1,567 @@ +# Metadata Autogeneration Proposal + +## Format Recommendation: JSON-LD + +I recommend **JSON-LD** for contracts because: +1. **Native JSON**: Works seamlessly with your existing Pydantic/FastAPI stack +2. **Semantic Context**: Adds meaning without complexity via `@context` +3. **Extensible**: Easy to add domain-specific vocabularies +4. **Machine-Readable**: Better for automated validation and discovery +5. **Human-Friendly**: Still readable when kept simple + +## Autogeneration Architecture + +```mermaid +graph LR + subgraph "Data Product" + PM[Pydantic Models] + FA[FastAPI Routes] + DB[DuckDB Schema] + + IG[Introspection<br/>Generator] + + PM --> IG + FA --> IG + DB --> IG + + IG --> MD[Metadata<br/>JSON-LD] + IG --> DC[Data Contract<br/>JSON-LD] + end + + MD --> REG[Registry] + DC --> CON[Contract Store] +``` + +## Implementation + +### 1. Metadata Introspection Module + +Create `src/metadata/introspector.py`: + +```python +from typing import Dict, List, Any, Optional +from pydantic import BaseModel +from fastapi import FastAPI +from datetime import datetime +import inspect +import duckdb +from ..database.connection_manager import DuckDBConnectionManager + +class SchemaField(BaseModel): + name: str + type: str + nullable: bool = True + description: Optional[str] = None + constraints: List[str] = [] + +class EndpointInfo(BaseModel): + path: str + method: str + summary: Optional[str] + parameters: List[Dict[str, Any]] + response_model: Optional[str] + rate_limit: Optional[str] + +class DataProductMetadata(BaseModel): + """Auto-generated metadata for data product""" + # JSON-LD context + context: Dict[str, str] = { + "@vocab": "https://schema.org/", + "dcat": "http://www.w3.org/ns/dcat#", + "dcterms": "http://purl.org/dc/terms/", + "datamesh": "https://datamesh.org/schema/" + } + + # Identity + id: str + type: str = "dcat:Dataset" + name: str + version: str + + # Generated metadata + tables: Dict[str, List[SchemaField]] + endpoints: List[EndpointInfo] + models: Dict[str, Dict[str, Any]] + + # Quality metrics (auto-calculated) + data_quality_score: float + api_coverage: float + documentation_score: float + + # Timestamps + generated_at: datetime + schema_version: str + +class MetadataIntrospector: + def __init__(self, app: FastAPI, db_manager: DuckDBConnectionManager): + self.app = app + self.db_manager = db_manager + + async def generate_metadata(self) -> Dict[str, Any]: + """Generate complete metadata for the data product""" + metadata = { + "@context": { + "@vocab": "https://schema.org/", + "dcat": "http://www.w3.org/ns/dcat#", + "dcterms": "http://purl.org/dc/terms/", + "datamesh": "https://datamesh.org/schema/" + }, + "@type": "dcat:Dataset", + "@id": 
f"urn:datamesh:finance:product:{self._get_product_id()}", + + # Basic info from environment + "name": self._get_product_name(), + "dcterms:identifier": self._get_product_id(), + "version": self._get_version(), + + # Auto-discovered components + "datamesh:dataSchemas": await self._introspect_database(), + "datamesh:apiEndpoints": self._introspect_endpoints(), + "datamesh:dataModels": self._introspect_models(), + + # Quality metrics + "dcat:qualityMetric": self._calculate_quality_metrics(), + + # Operational metadata + "dcterms:created": datetime.utcnow().isoformat(), + "dcterms:conformsTo": "https://datamesh.org/spec/1.0", + + # Access information + "dcat:accessURL": self._get_base_url(), + "dcat:mediaType": ["application/json", "application/parquet"], + "datamesh:protocols": ["REST", "GraphQL"] if self._has_graphql() else ["REST"], + } + + return metadata + + async def _introspect_database(self) -> List[Dict[str, Any]]: + """Introspect database schema""" + schemas = [] + + with self.db_manager.get_connection() as conn: + # Get all tables + tables = conn.execute(""" + SELECT table_schema, table_name + FROM information_schema.tables + WHERE table_schema NOT IN ('information_schema', 'pg_catalog') + """).fetchall() + + for schema_name, table_name in tables: + # Get columns for each table + columns = conn.execute(""" + SELECT + column_name, + data_type, + is_nullable, + column_default + FROM information_schema.columns + WHERE table_schema = ? AND table_name = ? 
+ ORDER BY ordinal_position + """, [schema_name, table_name]).fetchall() + + schema_info = { + "@type": "datamesh:TableSchema", + "name": f"{schema_name}.{table_name}", + "datamesh:fields": [ + { + "@type": "datamesh:Field", + "name": col[0], + "datamesh:dataType": self._map_sql_type(col[1]), + "datamesh:nullable": col[2] == 'YES', + "datamesh:defaultValue": col[3] + } + for col in columns + ] + } + schemas.append(schema_info) + + return schemas + + def _introspect_endpoints(self) -> List[Dict[str, Any]]: + """Introspect FastAPI endpoints""" + endpoints = [] + + for route in self.app.routes: + if hasattr(route, 'endpoint'): + endpoint_info = { + "@type": "datamesh:APIEndpoint", + "datamesh:path": route.path, + "datamesh:methods": list(route.methods), + "dcterms:description": route.endpoint.__doc__ or "", + "datamesh:parameters": self._get_endpoint_parameters(route), + } + + # Extract rate limit if present + if hasattr(route.endpoint, '__wrapped__'): + for decorator in route.endpoint.__wrapped__.__decorators__: + if 'limit' in str(decorator): + endpoint_info["datamesh:rateLimit"] = str(decorator) + + endpoints.append(endpoint_info) + + return endpoints + + def _introspect_models(self) -> Dict[str, Dict[str, Any]]: + """Introspect Pydantic models""" + models = {} + + # Get all Pydantic models from the app + for name, obj in inspect.getmembers(self.app): + if inspect.isclass(obj) and issubclass(obj, BaseModel): + schema = obj.schema() + models[name] = { + "@type": "datamesh:DataModel", + "datamesh:modelName": name, + "datamesh:jsonSchema": schema + } + + return models + + def _calculate_quality_metrics(self) -> Dict[str, float]: + """Calculate quality metrics""" + return { + "@type": "dcat:QualityMeasurement", + "datamesh:schemaCompleteness": self._calculate_schema_completeness(), + "datamesh:apiDocumentation": self._calculate_api_documentation_score(), + "datamesh:dataFreshness": self._calculate_data_freshness(), + } +``` + +### 2. 
Contract Generation + +Create `src/contracts/generator.py`: + +```python +from typing import Dict, List, Any, Optional +from pydantic import BaseModel +from datetime import datetime, timedelta + +class DataContract(BaseModel): + """Auto-generated data contract""" + # JSON-LD context for contracts + context: Dict[str, str] = { + "@vocab": "https://schema.org/", + "dcat": "http://www.w3.org/ns/dcat#", + "odrl": "http://www.w3.org/ns/odrl/2/", + "datamesh": "https://datamesh.org/schema/" + } + + # Contract identity + id: str + provider: str + consumer: str + + # Terms + data_products: List[str] + endpoints: List[Dict[str, Any]] + + # SLA + sla: Dict[str, Any] + + # Data quality assertions + quality_assertions: List[Dict[str, Any]] + + # Validity + valid_from: datetime + valid_until: Optional[datetime] + +class ContractGenerator: + def __init__(self, introspector: MetadataIntrospector): + self.introspector = introspector + + async def generate_contract( + self, + consumer_id: str, + requested_endpoints: List[str], + requested_quality: Dict[str, float] + ) -> Dict[str, Any]: + """Generate a data contract based on consumer requirements""" + + # Get current metadata + metadata = await self.introspector.generate_metadata() + + contract = { + "@context": { + "@vocab": "https://schema.org/", + "odrl": "http://www.w3.org/ns/odrl/2/", + "datamesh": "https://datamesh.org/contract/" + }, + "@type": "datamesh:DataContract", + "@id": f"urn:contract:{self._generate_contract_id()}", + + # Parties + "datamesh:provider": { + "@id": metadata["@id"], + "name": metadata["name"] + }, + "datamesh:consumer": { + "@id": f"urn:consumer:{consumer_id}", + "name": consumer_id + }, + + # What's being provided + "datamesh:dataProducts": [{ + "@id": metadata["@id"], + "datamesh:endpoints": self._filter_endpoints( + metadata["datamesh:apiEndpoints"], + requested_endpoints + ), + "datamesh:schemas": metadata["datamesh:dataSchemas"] + }], + + # Service Level Agreement + "datamesh:sla": { + "@type": 
"datamesh:SLA", + "datamesh:availability": 99.9, + "datamesh:responseTime": { + "p99": 500, + "unit": "milliseconds" + }, + "datamesh:throughput": { + "max": 1000, + "unit": "requests/minute" + } + }, + + # Quality Assertions + "datamesh:qualityAssertions": self._generate_quality_assertions( + requested_quality, + metadata["dcat:qualityMetric"] + ), + + # Terms + "odrl:permission": [{ + "odrl:action": "odrl:read", + "odrl:target": metadata["@id"], + "odrl:constraint": { + "odrl:dateTime": { + "odrl:after": datetime.utcnow().isoformat(), + "odrl:before": (datetime.utcnow() + timedelta(days=365)).isoformat() + } + } + }], + + # Metadata + "dcterms:created": datetime.utcnow().isoformat(), + "dcterms:valid": { + "start": datetime.utcnow().isoformat(), + "end": (datetime.utcnow() + timedelta(days=365)).isoformat() + } + } + + return contract + + def _generate_quality_assertions( + self, + requested: Dict[str, float], + available: Dict[str, float] + ) -> List[Dict[str, Any]]: + """Generate quality assertions based on requirements""" + assertions = [] + + for metric, requested_value in requested.items(): + available_value = available.get(metric, 0.0) + + assertion = { + "@type": "datamesh:QualityAssertion", + "datamesh:metric": metric, + "datamesh:operator": "gte", + "datamesh:threshold": min(requested_value, available_value), + "datamesh:current": available_value + } + + assertions.append(assertion) + + return assertions +``` + +### 3. 
Integration with Routes + +Add to `src/routes/metadata.py`: + +```python +from typing import Dict, List +from fastapi import APIRouter, HTTPException +from ..metadata.introspector import MetadataIntrospector +from ..contracts.generator import ContractGenerator + +router = APIRouter(prefix="/metadata", tags=["metadata"]) + +# Initialize introspector (`app` and `db_manager` are not defined in this module; the application must supply them) +introspector = MetadataIntrospector(app, db_manager) +contract_generator = ContractGenerator(introspector) + +@router.get("/") +async def get_metadata(): + """Get auto-generated metadata for this data product""" + return await introspector.generate_metadata() + +@router.get("/schemas") +async def get_schemas(): + """Get data schemas in JSON-LD format""" + schemas = await introspector._introspect_database() + return { + "@context": "https://datamesh.org/schema/", + "@graph": schemas + } + +@router.post("/contracts/generate") +async def generate_contract( + consumer_id: str, + requested_endpoints: List[str] = None, + quality_requirements: Dict[str, float] = None +): + """Generate a data contract for a consumer""" + contract = await contract_generator.generate_contract( + consumer_id=consumer_id, + requested_endpoints=requested_endpoints or [], + requested_quality=quality_requirements or {} + ) + + return contract + +@router.get("/contracts/template") +async def get_contract_template(): + """Get a contract template with all available options""" + metadata = await introspector.generate_metadata() + + return { + "@context": "https://datamesh.org/contract/", + "available_endpoints": [ + { + "path": ep["datamesh:path"], + "methods": ep["datamesh:methods"] + } + for ep in metadata["datamesh:apiEndpoints"] + ], + "available_quality_metrics": list(metadata["dcat:qualityMetric"].keys()), + "sla_options": { + "availability": [99.0, 99.9, 99.99], + "response_time_p99_ms": [100, 500, 1000], + "throughput_rpm": [100, 1000, 10000] + } + } +``` + +### 4. 
Validation & Monitoring + +Create `src/contracts/validator.py`: + +```python +from typing import Any, Dict + +class ContractValidator: + def __init__(self, contract: Dict[str, Any]): + self.contract = contract + + async def validate_compliance(self, metrics: Dict[str, float]) -> Dict[str, Any]: + """Validate current metrics against contract assertions""" + results = { + "compliant": True, + "violations": [], + "warnings": [] + } + + for assertion in self.contract.get("datamesh:qualityAssertions", []): + metric = assertion["datamesh:metric"] + threshold = assertion["datamesh:threshold"] + current = metrics.get(metric, 0) + + if current < threshold: + results["compliant"] = False + results["violations"].append({ + "metric": metric, + "expected": threshold, + "actual": current + }) + elif current < threshold * 1.1: # Within 10% of threshold + results["warnings"].append({ + "metric": metric, + "message": f"Close to threshold: {current} vs {threshold}" + }) + + return results +``` + +## Example Generated Metadata + +```json +{ + "@context": { + "@vocab": "https://schema.org/", + "dcat": "http://www.w3.org/ns/dcat#", + "datamesh": "https://datamesh.org/schema/" + }, + "@type": "dcat:Dataset", + "@id": "urn:datamesh:finance:product:550e8400", + "name": "Project Financing Data Product", + "version": "1.2.1", + + "datamesh:dataSchemas": [{ + "@type": "datamesh:TableSchema", + "name": "public.projects", + "datamesh:fields": [{ + "name": "project_id", + "datamesh:dataType": "uuid", + "datamesh:nullable": false + }] + }], + + "datamesh:apiEndpoints": [{ + "@type": "datamesh:APIEndpoint", + "datamesh:path": "/ops/projects", + "datamesh:methods": ["GET", "POST"], + "datamesh:rateLimit": "10/minute" + }], + + "dcat:qualityMetric": { + "datamesh:schemaCompleteness": 0.95, + "datamesh:apiDocumentation": 0.90, + "datamesh:dataFreshness": 0.99 + } +} +``` + +## Example Generated Contract + +```json +{ + "@context": { + "@vocab": "https://schema.org/", + "datamesh": "https://datamesh.org/contract/" + }, + "@type": 
"datamesh:DataContract", + "@id": "urn:contract:abc123", + + "datamesh:provider": { + "@id": "urn:datamesh:finance:product:550e8400", + "name": "Project Financing Data Product" + }, + + "datamesh:consumer": { + "@id": "urn:consumer:analytics-team", + "name": "analytics-team" + }, + + "datamesh:sla": { + "datamesh:availability": 99.9, + "datamesh:responseTime": { + "p99": 500, + "unit": "milliseconds" + } + }, + + "datamesh:qualityAssertions": [{ + "datamesh:metric": "dataFreshness", + "datamesh:operator": "gte", + "datamesh:threshold": 0.95 + }] +} +``` + +## Benefits + +1. **Zero Manual Work**: All metadata generated from code +2. **Always Current**: Reflects actual implementation +3. **Semantic**: JSON-LD provides meaning and context +4. **Validatable**: Contracts can be automatically validated +5. **Extensible**: Easy to add domain-specific fields + +Total implementation: ~400 lines for complete autogeneration! \ No newline at end of file diff --git a/multi_tenancy_proposal.md b/multi_tenancy_proposal.md new file mode 100644 index 0000000..5a98427 --- /dev/null +++ b/multi_tenancy_proposal.md @@ -0,0 +1,612 @@ +# Multi-Tenancy Proposal for Data Mesh + +## Overview + +A pragmatic multi-tenancy approach that leverages DuckDB's schema capabilities and FastAPI's middleware system. This design supports both shared and isolated deployment models without adding complexity. + +## Architecture Options + +### Option 1: Schema-Based Isolation (Recommended) + +```mermaid +graph TB + subgraph "Data Product Instance" + API[FastAPI] + subgraph "DuckDB" + Public[public schema
shared data] + T1[tenant_1 schema] + T2[tenant_2 schema] + T3[tenant_3 schema] + end + end + + API --> Public + API --> T1 + API --> T2 + API --> T3 +``` + +### Option 2: Database-Per-Tenant + +```mermaid +graph LR + subgraph "Data Product Instance" + API[FastAPI] + DB1[(tenant1.db)] + DB2[(tenant2.db)] + DB3[(tenant3.db)] + end + + API --> DB1 + API --> DB2 + API --> DB3 +``` + +## Implementation (Schema-Based) + +### 1. Tenant Context Middleware + +Create `src/middleware/tenant.py`: + +```python +from contextvars import ContextVar +from fastapi import Request, HTTPException +from typing import Optional +import jwt +import os + +class TenantContext: + """Per-request tenant context, held in a ContextVar so concurrent requests do not overwrite each other""" + _tenant_id: ContextVar[Optional[str]] = ContextVar("tenant_id", default=None) + + @classmethod + def set_tenant(cls, tenant_id: str): + cls._tenant_id.set(tenant_id) + + @classmethod + def get_tenant(cls) -> Optional[str]: + return cls._tenant_id.get() + + @classmethod + def clear(cls): + cls._tenant_id.set(None) + +async def tenant_middleware(request: Request, call_next): + """Extract and set tenant context from request""" + try: + # Option 1: From JWT token + auth_header = request.headers.get("Authorization", "") + if auth_header.startswith("Bearer "): + token = auth_header.split(" ")[1] + payload = jwt.decode( + token, + os.getenv("JWT_SECRET", "secret"), + algorithms=["HS256"] + ) + tenant_id = payload.get("tenant_id") + + # Option 2: From header + elif "X-Tenant-ID" in request.headers: + tenant_id = request.headers["X-Tenant-ID"] + + # Option 3: From subdomain + elif "." 
in request.headers.get("host", ""): + tenant_id = request.headers["host"].split(".")[0] + + else: + # Default tenant for backwards compatibility + tenant_id = "default" + + # Validate tenant + if not tenant_id or not tenant_id.replace("-", "").isalnum(): + raise HTTPException(status_code=400, detail="Invalid tenant ID") + + TenantContext.set_tenant(tenant_id) + response = await call_next(request) + TenantContext.clear() + + return response + + except Exception as e: + TenantContext.clear() + raise +``` + +### 2. Update Connection Manager + +Modify `src/database/connection_manager.py`: + +```python +from contextlib import contextmanager +from typing import Generator, Optional +import duckdb +from ..middleware.tenant import TenantContext + +class TenantAwareConnectionPool: + """Connection pool with tenant awareness""" + + def __init__(self, db_path: str = "data/data_product.db"): + self.db_path = db_path + self.initialized_tenants = set() + + @contextmanager + def get_connection(self, tenant_id: Optional[str] = None) -> Generator[duckdb.DuckDBPyConnection, None, None]: + """Get a connection with tenant context""" + if not tenant_id: + tenant_id = TenantContext.get_tenant() or "default" + + connection = duckdb.connect(self.db_path) + + try: + # Initialize tenant schema if needed + if tenant_id not in self.initialized_tenants: + self._initialize_tenant_schema(connection, tenant_id) + self.initialized_tenants.add(tenant_id) + + # Set search path to tenant schema + if tenant_id != "default": + connection.execute(f"SET search_path = tenant_{tenant_id}, public") + + yield connection + + finally: + connection.close() + + def _initialize_tenant_schema(self, conn: duckdb.DuckDBPyConnection, tenant_id: str): + """Initialize schema for a new tenant""" + if tenant_id == "default": + return # Use public schema for default + + schema_name = f"tenant_{tenant_id}" + + # Create schema if not exists + conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}") + + # Create 
tenant-specific tables + conn.execute(f""" + CREATE TABLE IF NOT EXISTS {schema_name}.projects ( + project_id UUID PRIMARY KEY, + project_name VARCHAR NOT NULL, + description TEXT, + total_amount DECIMAL(20,2) NOT NULL, + maturity_years INTEGER NOT NULL, + expected_tri DECIMAL(5,2) NOT NULL, + dscr DECIMAL(5,2) NOT NULL, + status VARCHAR NOT NULL, + creation_date DATE NOT NULL, + last_updated TIMESTAMP NOT NULL, + currency_code CHAR(3) NOT NULL DEFAULT 'USD' + ) + """) + + # Add other tables... + conn.commit() + +# Update the singleton to use tenant-aware pool +class DuckDBConnectionManager: + _instance = None + _pool = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(DuckDBConnectionManager, cls).__new__(cls) + cls._pool = TenantAwareConnectionPool() + return cls._instance + + @contextmanager + def get_connection(self, tenant_id: Optional[str] = None) -> Generator[duckdb.DuckDBPyConnection, None, None]: + with self._pool.get_connection(tenant_id) as conn: + yield conn +``` + +### 3. Tenant Configuration + +Create `src/models/tenant.py`: + +```python +from pydantic import BaseModel, Field +from typing import Optional, Dict +from datetime import datetime +from enum import Enum + +class TenantTier(str, Enum): + STARTER = "starter" + PROFESSIONAL = "professional" + ENTERPRISE = "enterprise" + +class TenantLimits(BaseModel): + max_projects: int = 100 + max_api_calls_per_day: int = 10000 + max_storage_gb: float = 10.0 + data_retention_days: int = 90 + +class TenantConfig(BaseModel): + tenant_id: str + name: str + tier: TenantTier = TenantTier.STARTER + limits: TenantLimits = Field(default_factory=TenantLimits) + features: Dict[str, bool] = { + "graphql_api": False, + "advanced_analytics": False, + "data_export": True, + "api_webhooks": False + } + created_at: datetime = Field(default_factory=datetime.utcnow) + is_active: bool = True +``` + +### 4. 
Tenant Management Routes + +Create `src/routes/tenants.py`: + +```python +from fastapi import APIRouter, HTTPException, Depends +from typing import List +from ..models.tenant import TenantConfig, TenantTier +from ..database.connection_manager import DuckDBConnectionManager +from ..middleware.tenant import TenantContext + +router = APIRouter(prefix="/admin/tenants", tags=["tenant-management"]) +db_manager = DuckDBConnectionManager() + +# Store tenant configs in a system table +TENANT_TABLE = """ +CREATE TABLE IF NOT EXISTS system.tenants ( + tenant_id VARCHAR PRIMARY KEY, + config_json TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL +) +""" + +@router.post("/", response_model=TenantConfig) +async def create_tenant(config: TenantConfig): + """Create a new tenant (admin only)""" + with db_manager.get_connection("system") as conn: + # Ensure system schema exists + conn.execute("CREATE SCHEMA IF NOT EXISTS system") + conn.execute(TENANT_TABLE) + + # Check if tenant exists + existing = conn.execute( + "SELECT 1 FROM system.tenants WHERE tenant_id = ?", + [config.tenant_id] + ).fetchone() + + if existing: + raise HTTPException(status_code=409, detail="Tenant already exists") + + # Insert tenant config + conn.execute(""" + INSERT INTO system.tenants (tenant_id, config_json, created_at, updated_at) + VALUES (?, ?, ?, ?) + """, [ + config.tenant_id, + config.json(), + config.created_at, + config.created_at + ]) + + conn.commit() + + return config + +@router.get("/{tenant_id}", response_model=TenantConfig) +async def get_tenant(tenant_id: str): + """Get tenant configuration""" + with db_manager.get_connection("system") as conn: + result = conn.execute( + "SELECT config_json FROM system.tenants WHERE tenant_id = ?", + [tenant_id] + ).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Tenant not found") + + return TenantConfig.parse_raw(result[0]) +``` + +### 5. 
Tenant-Aware Operations + +Update `src/routes/operations.py`: + +```python +from fastapi import Request, HTTPException, Depends +from typing import List +from ..models.tenant import TenantConfig, TenantTier +from ..database.connection_manager import DuckDBConnectionManager +from ..middleware.tenant import TenantContext +import uuid +from datetime import datetime +from ..utils.rate_limiter import limiter + +router = APIRouter(prefix="/ops", tags=["operations"]) +db_manager = DuckDBConnectionManager() + +# Placeholder for other models/schemas +class Project(BaseModel): + project_name: str + description: Optional[str] = None + total_amount: float + maturity_years: int + expected_tri: float + dscr: float + status: str + currency_code: str = "USD" + +async def get_tenant_config(tenant_id: str) -> TenantConfig: + """Helper to get tenant config from DB""" + with db_manager.get_connection("system") as conn: + result = conn.execute( + "SELECT config_json FROM system.tenants WHERE tenant_id = ?", + [tenant_id] + ).fetchone() + if not result: + raise HTTPException(status_code=404, detail="Tenant not found") + return TenantConfig.parse_raw(result[0]) + +async def get_project_count(tenant_id: str) -> int: + """Helper to get project count for a tenant""" + with db_manager.get_connection(tenant_id) as conn: + result = conn.execute( + "SELECT COUNT(*) FROM projects" + ).fetchone() + return result[0] + +@router.post("/projects", response_model=dict) +@limiter.limit("10/minute") +async def create_project(request: Request, project: Project): + """Create a new project in tenant's schema""" + tenant_id = TenantContext.get_tenant() + + # Check tenant limits + tenant_config = await get_tenant_config(tenant_id) + current_count = await get_project_count(tenant_id) + + if current_count >= tenant_config.limits.max_projects: + raise HTTPException( + status_code=429, + detail=f"Project limit ({tenant_config.limits.max_projects}) reached" + ) + + with db_manager.get_connection() as conn: + # 
Projects will be created in tenant's schema due to search_path + result = conn.execute(""" + INSERT INTO projects ( + project_id, project_name, description, total_amount, + maturity_years, expected_tri, dscr, status, + creation_date, last_updated, currency_code + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + RETURNING project_id + """, [ + str(uuid.uuid4()), + project.project_name, + project.description, + project.total_amount, + project.maturity_years, + project.expected_tri, + project.dscr, + project.status.value, + datetime.now().date(), + datetime.now(), + project.currency_code + ]).fetchone() + + conn.commit() + + return {"project_id": str(result[0]), "status": "created"} +``` + +### 6. Cross-Tenant Analytics (Optional) + +For admin/analytics across tenants: + +```python +from fastapi import APIRouter, Depends +from typing import List +from ..models.tenant import TenantConfig +from ..database.connection_manager import DuckDBConnectionManager +from ..middleware.tenant import TenantContext +from fastapi.security import HTTPBearer +import jwt +import os + +router = APIRouter(prefix="/analytics", tags=["analytics"]) +db_manager = DuckDBConnectionManager() + +# Placeholder for other models/schemas +class TenantConfig(BaseModel): + tenant_id: str + name: str + tier: TenantTier = TenantTier.STARTER + limits: TenantLimits = Field(default_factory=TenantLimits) + features: Dict[str, bool] = { + "graphql_api": False, + "advanced_analytics": False, + "data_export": True, + "api_webhooks": False + } + created_at: datetime = Field(default_factory=datetime.utcnow) + is_active: bool = True + +async def verify_admin(token: str = Depends(HTTPBearer())): + """Verify admin token""" + try: + payload = jwt.decode( + token.credentials, + os.getenv("JWT_SECRET", "secret"), + algorithms=["HS256"] + ) + if payload.get("role") != "admin": + raise HTTPException(status_code=403, detail="Admin role required") + return payload + except jwt.ExpiredSignatureError: + raise 
HTTPException(status_code=401, detail="Token expired") + except jwt.InvalidTokenError: + raise HTTPException(status_code=401, detail="Invalid token") + +@router.get("/cross-tenant", tags=["admin"]) +async def cross_tenant_analytics(admin_token: str = Depends(verify_admin)): + """Get analytics across all tenants""" + with db_manager.get_connection("system") as conn: + # Get all active tenants + tenants = conn.execute(""" + SELECT tenant_id, config_json + FROM system.tenants + WHERE config_json::json->>'is_active' = 'true' + """).fetchall() + + analytics = [] + for tenant_id, config_json in tenants: + config = TenantConfig.parse_raw(config_json) + + # Query each tenant's schema + stats = conn.execute(f""" + SELECT + COUNT(*) as project_count, + SUM(total_amount) as total_value, + AVG(expected_tri) as avg_tri + FROM tenant_{tenant_id}.projects + WHERE status = 'ACTIVE' + """).fetchone() + + analytics.append({ + "tenant_id": tenant_id, + "tenant_name": config.name, + "tier": config.tier, + "project_count": stats[0] or 0, + "total_value": float(stats[1] or 0), + "avg_tri": float(stats[2] or 0) + }) + + return {"analytics": analytics} +``` + +### 7. Data Isolation Patterns + +```python +# src/utils/tenant_isolation.py +from functools import wraps +from fastapi import HTTPException +from ..middleware.tenant import TenantContext + +def tenant_isolated(func): + """Decorator to ensure queries are tenant-isolated""" + @wraps(func) + async def wrapper(*args, **kwargs): + tenant_id = TenantContext.get_tenant() + if not tenant_id: + raise HTTPException(status_code=401, detail="No tenant context") + + # Add tenant_id to kwargs if function accepts it + if 'tenant_id' in func.__code__.co_varnames: + kwargs['tenant_id'] = tenant_id + + return await func(*args, **kwargs) + return wrapper + +# Usage: +@tenant_isolated +async def get_tenant_projects(tenant_id: str): + # Automatically receives tenant_id from context + pass +``` + +## Deployment Strategies + +### 1. 
Single Instance, Multiple Tenants +```yaml +# docker-compose.yml +services: + data-product: + image: duckdb-spawn:latest + environment: + - MULTI_TENANT_MODE=true + - DEFAULT_TENANT_TIER=starter + volumes: + - ./data:/app/data # All tenants share storage +``` + +### 2. Tenant-Specific Instances +```yaml +# docker-compose.tenant-a.yml +services: + data-product-tenant-a: + image: duckdb-spawn:latest + environment: + - TENANT_ID=tenant-a + - SINGLE_TENANT_MODE=true + volumes: + - ./data/tenant-a:/app/data + ports: + - "8001:8000" +``` + +### 3. Hybrid Approach +- Starter/Professional tiers: Shared instance with schema isolation +- Enterprise tier: Dedicated instance with full isolation + +## Migration Path + +```python +# src/scripts/migrate_to_multitenant.py +import duckdb +from pathlib import Path + +def migrate_existing_data(db_path: str, target_tenant: str = "default"): + """Migrate single-tenant data to multi-tenant structure""" + conn = duckdb.connect(db_path) + + # Create tenant schema + if target_tenant != "default": + schema_name = f"tenant_{target_tenant}" + conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}") + + # Get all tables + tables = conn.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'main' + """).fetchall() + + # Copy tables to tenant schema + for (table_name,) in tables: + print(f"Migrating table: {table_name}") + conn.execute(f""" + CREATE TABLE {schema_name}.{table_name} AS + SELECT * FROM main.{table_name} + """) + + conn.close() + print(f"Migration completed for tenant: {target_tenant}") +``` + +## Security Considerations + +1. **Tenant Isolation**: Schema-level isolation prevents cross-tenant data access +2. **Connection String Validation**: Prevent SQL injection in schema names +3. **Resource Limits**: Implement per-tenant quotas +4. 
**Audit Logging**: Track all cross-tenant operations + +## Monitoring + +```python +# Add to metrics +tenant_request_count = Counter( + 'tenant_request_count', + 'Requests per tenant', + ['tenant_id', 'endpoint'] +) + +tenant_storage_bytes = Gauge( + 'tenant_storage_bytes', + 'Storage used per tenant', + ['tenant_id'] +) +``` + +## Benefits + +- **Simple**: Leverages DuckDB's schema support +- **Efficient**: Shared resources for small tenants +- **Flexible**: Easy to move tenants between isolation levels +- **Backwards Compatible**: Default tenant for existing deployments \ No newline at end of file diff --git a/registry_secrets.html b/registry_secrets.html new file mode 100644 index 0000000..d42054c --- /dev/null +++ b/registry_secrets.html @@ -0,0 +1,109 @@ +Private Container Registry Secrets | Koyeb

Private Container Registry Secrets

+

Koyeb allows you to easily deploy an App using Docker containers. Koyeb supports the deployment of containers hosted on any private Docker registry. This lets you build containers with your continuous delivery pipeline and host them on a secure private registry to protect your intellectual property.

+

We provide an easy-to-use form in the App and Service creation views to automatically create a Secret with the right format. The form supports:

+
    +
  • Azure Container Registry (ACR)
  • +
  • DockerHub private repositories
  • +
  • DigitalOcean Container Registry
  • +
  • GCP Container Registry
  • +
  • GitHub Container Registry (ghcr.io)
  • +
  • GitLab Container Registry
  • +
+

If you're using one of the above registries, you probably don't need this documentation. Simply use the form embedded in the web interface.

+

Refer to how to deploy containers for generic instructions about App deployment from containers.

+

In this guide, we explain how to manually create Secrets containing private registry credentials.

+

The Koyeb registry Secret format

+

To use a private registry, the Koyeb platform needs to be able to access the registry and you will need to create a Secret with the login information for your registry. You will then reference the registry Secret when you deploy your Service. The Secret creation described below is automatically done when you use the form embedded in the web interface.

+

The Secret needs to contain a JSON with the right parameters for your registry:

+
{
+  "auths": {
+    "<YOUR_REGISTRY_URL>": {
+      "auth": "<YOUR_TOKEN>"
+    }
+  }
+}
+

The <YOUR_REGISTRY_URL> string should be replaced by your registry URL.

+

The <YOUR_TOKEN> string is a Base64-encoded authentication string. You can generate a compatible string by typing:

+
echo -n "<USERNAME>:<TOKEN>" | base64
+

Replace the <USERNAME> and <TOKEN> placeholders with your registry credentials.
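
The same Secret can be assembled programmatically. The following is a minimal sketch, assuming Python 3 is available; the function name `build_registry_secret` and the example credentials are hypothetical and not part of any Koyeb tooling. It encodes `USERNAME:TOKEN` exactly as the `echo -n ... | base64` command does and wraps the result in the `auths` JSON shown above.

```python
import base64
import json


def build_registry_secret(registry_url: str, username: str, token: str) -> str:
    """Return registry Secret JSON that uses the Base64 `auth` form."""
    # Same bytes as: echo -n "<USERNAME>:<TOKEN>" | base64
    credentials = f"{username}:{token}".encode("utf-8")
    auth = base64.b64encode(credentials).decode("ascii")
    return json.dumps({"auths": {registry_url: {"auth": auth}}}, indent=2)


if __name__ == "__main__":
    # Placeholder values; substitute your own registry URL and credentials.
    print(build_registry_secret("ghcr.io", "<USERNAME>", "<TOKEN>"))
```

Building the JSON with `json.dumps` rather than by hand avoids quoting mistakes when a token contains characters such as `"` or `\`.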

+

Refer to the provider-specific instructions section if you need help providing the right credentials.

+

As a general rule, the Koyeb Secret will contain the same output as the config.json file generated by the docker login command on Linux or on Windows.

+

The JSON can contain either an auth field (set to USERNAME:TOKEN encoded in Base64) or two fields without encoding: username and password:

+
{
+  "auths": {
+    "<YOUR_REGISTRY_URL>": {
+      "username": "<USERNAME>",
+      "password": "<PASSWORD>"
+    }
+  }
+}
+

Remember to replace the <YOUR_REGISTRY_URL>, <USERNAME>, and <PASSWORD> placeholders with the values associated with your registry.
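
Because either form is accepted, it can help to check a Secret locally before pasting it into Koyeb. The sketch below is an illustration only: `check_registry_secret` is a hypothetical helper, and the rules it applies are simply the two shapes described above (an `auth` field holding Base64 of `USERNAME:TOKEN`, or plain `username` and `password` fields). It does not contact the registry or Koyeb.

```python
import base64
import json


def check_registry_secret(raw: str) -> list[str]:
    """Return the problems found in a registry Secret JSON (empty if none)."""
    doc = json.loads(raw)
    auths = doc.get("auths") if isinstance(doc, dict) else None
    if not isinstance(auths, dict) or not auths:
        return ['missing or empty "auths" object']

    problems = []
    for registry, entry in auths.items():
        if not isinstance(entry, dict):
            problems.append(f"{registry}: entry must be a JSON object")
        elif "auth" in entry:
            try:
                decoded = base64.b64decode(entry["auth"], validate=True).decode("utf-8")
            except (ValueError, TypeError):
                problems.append(f"{registry}: auth is not valid Base64 text")
                continue
            if ":" not in decoded:
                problems.append(f"{registry}: auth does not decode to USERNAME:TOKEN")
        elif not ("username" in entry and "password" in entry):
            problems.append(f"{registry}: needs auth, or username and password")
    return problems
```

An empty list means the document has one of the two expected shapes; it says nothing about whether the credentials themselves are valid.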

+

In both cases, the Secrets will be encrypted server-side by Koyeb.

+

Provider specific instructions

+

Koyeb's implementation is compatible with all Docker-compatible registries. Below, we provide details on how to add private registry secrets for some of the most common registry providers:

+ +

Contact us if you need help or if your registry provider is not yet documented!

+

GitHub Container Registry

+

Please note that we support GitHub Container Registry and not the older GitHub Packages Docker registry. Your registry URL should start with ghcr.io.

+

Build and push your container

+

In GitHub, create a Personal access token with the write:packages permission.

+

Afterwards, log in, build, and push your container:

+

Don't forget to replace <PERSONAL_ACCESS_TOKEN> and <GITHUB_USERNAME> with your own token and GitHub username in the commands below.

+
echo "<PERSONAL_ACCESS_TOKEN>" | docker login ghcr.io -u "<GITHUB_USERNAME>" --password-stdin
+docker build . --tag ghcr.io/koyeb-community/koyeb-debug-container:0.0.1
+docker push ghcr.io/koyeb-community/koyeb-debug-container:0.0.1
+

Create the Koyeb Secret

+

In GitHub, create a Personal access token with the read:packages permission.

+

Then, in Koyeb, create a Secret called my-registry-secret:

+

Replace the <USERNAME> and <PERSONAL_ACCESS_TOKEN> placeholders with your own GitHub username and token in the JSON below:

+
{
+  "auths": {
+    "ghcr.io": {
+      "username": "<USERNAME>",
+      "password": "<PERSONAL_ACCESS_TOKEN>"
+    }
+  }
+}
+

DockerHub

+

Generate an auth string with the following command, replacing <USERNAME> and <TOKEN> with the values associated with your registry:

+
echo -n "<USERNAME>:<TOKEN>" | base64
+

Replace the <GENERATED_TOKEN> placeholder in the JSON below with the output from the previous command to configure authentication to your private registry:

+
{
+  "auths": {
+    "index.docker.io/v1/": {
+      "auth": "<GENERATED_TOKEN>"
+    }
+  }
+}
+

GCP Container Registry

+

First, create a dedicated service account with a JSON key file.

+

To generate a valid auth token, execute the following command where keyfile.json is the file containing your newly created key:

+
echo -n "_json_key:$(cat keyfile.json)" | base64
+

Next, replace the <GENERATED_TOKEN> placeholder in the JSON below with the output from the previous command:

+
{
+  "auths": {
+    "gcr.io": {
+      "auth": "<GENERATED_TOKEN>"
+    }
+  }
+}
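
The GCP steps above can also be scripted. This is a minimal sketch under stated assumptions: `gcr_auth_from_key` and `build_gcr_secret` are hypothetical helper names, and the key file is assumed to be the JSON file downloaded for the service account. It mirrors the shell command, including the fact that `$(cat keyfile.json)` drops trailing newlines.

```python
import base64
import json
from pathlib import Path


def gcr_auth_from_key(key_json: str) -> str:
    """Base64-encode `_json_key:<key file contents>` for the `auth` field."""
    # Shell command substitution strips trailing newlines, so do the same.
    payload = "_json_key:" + key_json.rstrip("\n")
    return base64.b64encode(payload.encode("utf-8")).decode("ascii")


def build_gcr_secret(keyfile: str, registry_url: str = "gcr.io") -> str:
    """Read a service account key file and return the registry Secret JSON."""
    key_json = Path(keyfile).read_text(encoding="utf-8")
    auth = gcr_auth_from_key(key_json)
    return json.dumps({"auths": {registry_url: {"auth": auth}}}, indent=2)


if __name__ == "__main__":
    keyfile = Path("keyfile.json")  # path used in the command above
    if keyfile.exists():
        print(build_gcr_secret(str(keyfile)))
```

One practical reason to script this step: GNU `base64` wraps its output at 76 columns by default, and a service account key is long enough to be wrapped, so the shell output may need `base64 -w 0` before it can be pasted as a single JSON string. `base64.b64encode` in Python returns a single unwrapped line.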

Copyright © 2025 Koyeb
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3851472..f3d2e6c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,5 +11,5 @@ python-json-logger slowapi==0.1.8 pulumi>=3.0.0,<4.0.0 pulumi-docker>=4.0.0,<5.0.0 -pulumi-koyeb>=1.0.0 +pulumi-koyeb==0.1.11 httpx==0.25.2 diff --git a/secrets_docs.txt b/secrets_docs.txt new file mode 100644 index 0000000..cfca21f --- /dev/null +++ b/secrets_docs.txt @@ -0,0 +1 @@ +404: This page could not be found

404

This page could not be found.

\ No newline at end of file diff --git a/simple_catalog_architecture.md b/simple_catalog_architecture.md new file mode 100644 index 0000000..c52f016 --- /dev/null +++ b/simple_catalog_architecture.md @@ -0,0 +1,409 @@ +# Simple Decentralized Catalog for Data Mesh + +## Overview + +A minimal catalog system that leverages your existing DuckDB and FastAPI stack. Each data product maintains its own catalog table and periodically syncs with peers. + +## Core Design + +```mermaid +graph LR + subgraph "Data Product A" + API_A[FastAPI] + DB_A[(DuckDB
catalog table)] + end + + subgraph "Data Product B" + API_B[FastAPI] + DB_B[(DuckDB
catalog table)] + end + + subgraph "Data Product C" + API_C[FastAPI] + DB_C[(DuckDB
catalog table)] + end + + API_A <--> API_B + API_B <--> API_C + API_C <--> API_A +``` + +## Implementation + +### 1. Catalog Table Schema + +Add to your existing `src/database/schema.py`: + +```python +# Additional schema for catalog +CATALOG_SCHEMA = """ + CREATE TABLE IF NOT EXISTS data_product_catalog ( + -- Identity + product_id UUID PRIMARY KEY, + name VARCHAR NOT NULL, + domain VARCHAR NOT NULL, + version VARCHAR NOT NULL, + + -- Access + base_url VARCHAR NOT NULL, + health_endpoint VARCHAR DEFAULT '/health', + + -- Metadata + description TEXT, + owner_email VARCHAR, + quality_score DECIMAL(3,2) DEFAULT 0.5, + + -- Discovery + last_seen TIMESTAMP NOT NULL, + first_discovered TIMESTAMP NOT NULL, + is_self BOOLEAN DEFAULT FALSE, + + -- Technical + supports_sql BOOLEAN DEFAULT TRUE, + supports_graphql BOOLEAN DEFAULT FALSE + ) +""" + +# Add to SCHEMA_DEFINITIONS +SCHEMA_DEFINITIONS["catalog"] = CATALOG_SCHEMA +``` + +### 2. Simple Catalog Model + +Add to a new file `src/models/catalog.py`: + +```python +from pydantic import BaseModel, Field +from datetime import datetime +from typing import Optional +import uuid + +class DataProductInfo(BaseModel): + """Minimal data product information for catalog""" + product_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + name: str + domain: str + version: str = "1.0.0" + base_url: str + description: Optional[str] = None + owner_email: Optional[str] = None + quality_score: float = Field(default=0.5, ge=0, le=1) + supports_sql: bool = True + supports_graphql: bool = False + +class CatalogEntry(DataProductInfo): + """Catalog entry with timestamps""" + last_seen: datetime = Field(default_factory=datetime.utcnow) + first_discovered: datetime = Field(default_factory=datetime.utcnow) + is_self: bool = False +``` + +### 3. 
Catalog Routes + +Add new file `src/routes/catalog.py`: + +```python +from fastapi import APIRouter, HTTPException, BackgroundTasks +from typing import List +import httpx +from datetime import datetime, timedelta +import asyncio +import os + +from ..models.catalog import DataProductInfo, CatalogEntry +from ..database.connection_manager import DuckDBConnectionManager + +router = APIRouter(prefix="/catalog", tags=["catalog"]) +db_manager = DuckDBConnectionManager() + +# Self information +SELF_INFO = DataProductInfo( + product_id=os.getenv("PRODUCT_ID", "550e8400-e29b-41d4-a716-446655440000"), + name=os.getenv("PRODUCT_NAME", "Project Financing Data Product"), + domain="finance", + version="1.2.1", + base_url=os.getenv("BASE_URL", "http://localhost:8000"), + description="Manages project financing data", + owner_email="finance-team@example.com", + quality_score=0.95 +) + +# Seed peers from environment +SEED_PEERS = os.getenv("CATALOG_PEERS", "").split(",") +SEED_PEERS = [p.strip() for p in SEED_PEERS if p.strip()] + +@router.get("/info", response_model=DataProductInfo) +async def get_product_info(): + """Get this data product's information""" + return SELF_INFO + +@router.get("/peers", response_model=List[CatalogEntry]) +async def list_peers(domain: str = None, include_self: bool = True): + """List known data products""" + with db_manager.get_connection() as conn: + query = "SELECT * FROM data_product_catalog WHERE 1=1" + params = [] + + if not include_self: + query += " AND is_self = false" + + if domain: + query += " AND domain = ?" 
+ params.append(domain) + + query += " ORDER BY quality_score DESC, name" + + result = conn.execute(query, params).fetchall() + + return [ + CatalogEntry( + product_id=row[0], + name=row[1], + domain=row[2], + version=row[3], + base_url=row[4], + description=row[6], + owner_email=row[7], + quality_score=row[8], + last_seen=row[9], + first_discovered=row[10], + is_self=row[11], + supports_sql=row[12], + supports_graphql=row[13] + ) + for row in result + ] + +@router.post("/sync") +async def sync_catalog(background_tasks: BackgroundTasks): + """Trigger catalog synchronization with known peers""" + background_tasks.add_task(sync_with_peers) + return {"status": "sync initiated"} + +async def sync_with_peers(): + """Background task to sync with all known peers""" + # Get active peers + with db_manager.get_connection() as conn: + peers = conn.execute(""" + SELECT base_url FROM data_product_catalog + WHERE is_self = false + AND last_seen > datetime('now', '-1 day') + """).fetchall() + + peer_urls = [row[0] for row in peers] + SEED_PEERS + + # Sync with each peer + async with httpx.AsyncClient(timeout=5.0) as client: + tasks = [sync_with_peer(client, url) for url in peer_urls] + await asyncio.gather(*tasks, return_exceptions=True) + +async def sync_with_peer(client: httpx.AsyncClient, peer_url: str): + """Sync catalog with a single peer""" + try: + # Get peer's info + response = await client.get(f"{peer_url}/catalog/info") + if response.status_code == 200: + peer_info = DataProductInfo(**response.json()) + update_catalog_entry(peer_info) + + # Get peer's known peers + response = await client.get(f"{peer_url}/catalog/peers") + if response.status_code == 200: + peers = response.json() + for peer_data in peers[:20]: # Limit to prevent explosion + entry = CatalogEntry(**peer_data) + if entry.product_id != SELF_INFO.product_id: + update_catalog_entry(entry) + except Exception as e: + print(f"Failed to sync with {peer_url}: {e}") + +def update_catalog_entry(entry: 
DataProductInfo): + """Update or insert catalog entry""" + with db_manager.get_connection() as conn: + # Check if exists + existing = conn.execute( + "SELECT product_id FROM data_product_catalog WHERE product_id = ?", + [entry.product_id] + ).fetchone() + + if existing: + # Update + conn.execute(""" + UPDATE data_product_catalog + SET name = ?, domain = ?, version = ?, base_url = ?, + description = ?, owner_email = ?, quality_score = ?, + last_seen = datetime('now'), supports_sql = ?, + supports_graphql = ? + WHERE product_id = ? + """, [ + entry.name, entry.domain, entry.version, entry.base_url, + entry.description, entry.owner_email, entry.quality_score, + entry.supports_sql, entry.supports_graphql, entry.product_id + ]) + else: + # Insert + conn.execute(""" + INSERT INTO data_product_catalog ( + product_id, name, domain, version, base_url, + description, owner_email, quality_score, last_seen, + first_discovered, is_self, supports_sql, supports_graphql + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), + datetime('now'), false, ?, ?) + """, [ + entry.product_id, entry.name, entry.domain, entry.version, + entry.base_url, entry.description, entry.owner_email, + entry.quality_score, entry.supports_sql, entry.supports_graphql + ]) + + conn.commit() + +# Initialize self entry on startup +@router.on_event("startup") +async def initialize_catalog(): + """Initialize catalog with self entry""" + with db_manager.get_connection() as conn: + # Ensure catalog table exists + conn.execute(CATALOG_SCHEMA) + + # Insert or update self + conn.execute(""" + INSERT OR REPLACE INTO data_product_catalog ( + product_id, name, domain, version, base_url, + description, owner_email, quality_score, last_seen, + first_discovered, is_self, supports_sql, supports_graphql + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), + datetime('now'), true, ?, ?) 
+ """, [ + SELF_INFO.product_id, SELF_INFO.name, SELF_INFO.domain, + SELF_INFO.version, SELF_INFO.base_url, SELF_INFO.description, + SELF_INFO.owner_email, SELF_INFO.quality_score, + SELF_INFO.supports_sql, SELF_INFO.supports_graphql + ]) + conn.commit() + +# Periodic sync task +async def periodic_sync(): + """Run sync every 5 minutes""" + while True: + await asyncio.sleep(300) # 5 minutes + await sync_with_peers() + +@router.on_event("startup") +async def start_periodic_sync(): + """Start background sync task""" + asyncio.create_task(periodic_sync()) +``` + +### 4. Add to Main App + +In `src/main.py`, add: + +```python +from .routes import admin, monitoring, operations, catalog + +# Register routes +app.include_router(admin.router) +app.include_router(operations.router) +app.include_router(monitoring.router) +app.include_router(catalog.router) # Add this line +``` + +### 5. Simple Discovery Query + +Add to `src/routes/catalog.py`: + +```python +@router.get("/discover") +async def discover_data_products( + domain: str = None, + min_quality: float = 0.0, + limit: int = 50 +): + """Simple discovery endpoint""" + with db_manager.get_connection() as conn: + query = """ + SELECT * FROM data_product_catalog + WHERE quality_score >= ? + AND last_seen > datetime('now', '-7 days') + """ + params = [min_quality] + + if domain: + query += " AND domain = ?" + params.append(domain) + + query += " ORDER BY quality_score DESC LIMIT ?" 
+ params.append(limit) + + results = conn.execute(query, params).fetchall() + + return { + "count": len(results), + "products": [ + { + "name": row[1], + "domain": row[2], + "url": row[4], + "description": row[6], + "quality_score": row[8] + } + for row in results + ] + } +``` + +## Configuration + +Add to your `.env`: + +```bash +# Catalog Configuration +PRODUCT_ID=550e8400-e29b-41d4-a716-446655440000 +PRODUCT_NAME=Project Financing Data Product +BASE_URL=https://finance-dp.example.com + +# Comma-separated list of peer URLs for bootstrap +CATALOG_PEERS=https://hr-dp.example.com,https://sales-dp.example.com +``` + +## Usage + +1. **Self-registration**: Automatic on startup +2. **Discovery**: `GET /catalog/discover?domain=finance` +3. **List peers**: `GET /catalog/peers` +4. **Manual sync**: `POST /catalog/sync` +5. **Automatic sync**: Every 5 minutes in background + +## Benefits + +- **Dead simple**: Just one table, a few endpoints +- **No new dependencies**: Uses existing DuckDB and FastAPI +- **Resilient**: Works even if peers are down +- **Efficient**: Leverages DuckDB's query capabilities +- **Gradual adoption**: Products can join anytime + +## Monitoring + +Add to existing metrics: + +```python +# In src/utils/metrics.py +catalog_peer_count = Gauge( + 'catalog_peer_count', + 'Number of discovered data products' +) + +catalog_sync_duration = Histogram( + 'catalog_sync_duration_seconds', + 'Time to sync with peers' +) +``` + +## Total Lines of Code + +- Schema addition: ~20 lines +- Model: ~25 lines +- Routes: ~200 lines +- Total: **~245 lines** for a complete decentralized catalog + +This is all you need for a functional decentralized catalog that fits perfectly with your existing architecture! \ No newline at end of file