
Operations Guide

Production operations guide for running and maintaining Phlo.

Daily Operations

Monitoring Services

Check all services are running:

phlo services status

Expected output:

SERVICE              STATUS    PORTS
postgres             running   10000
minio                running   10001-10002
nessie               running   10003
trino                running   10005
dagster-webserver    running   10006
dagster-daemon       running

Viewing Logs

Monitor service logs:

# All services
phlo services logs -f

# Specific service
phlo services logs -f dagster-webserver

# Last 100 lines
phlo services logs --tail 100 dagster-daemon

Asset Status

Check asset health and freshness:

# All assets
phlo status

# Only stale assets
phlo status --stale

# Specific group
phlo status --group csv

Manual Materialization

Trigger asset runs manually:

# Single asset
phlo materialize dlt_events

# With downstream
phlo materialize dlt_events+

# Specific partition
phlo materialize dlt_events --partition 2026-05-04

# By tag
phlo materialize --select "tag:csv"

Backup and Recovery

Database Backups

PostgreSQL:

# Backup
docker exec phlo-postgres-1 pg_dump -U postgres cascade | gzip > backup.sql.gz

# Restore
gunzip < backup.sql.gz | docker exec -i phlo-postgres-1 psql -U postgres -d cascade

Automated backups:

#!/bin/bash
# backup-postgres.sh
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups/postgres"
mkdir -p "$BACKUP_DIR"
docker exec phlo-postgres-1 pg_dump -U postgres cascade | \
  gzip > "$BACKUP_DIR/cascade_$DATE.sql.gz"

# Keep only the last 30 days
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +30 -delete

Schedule it via crontab:

# Run daily at 02:00
0 2 * * * /path/to/backup-postgres.sh
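When backups are mirrored to object storage, `find -mtime` is not available; the same 30-day retention policy can be applied in code over a bucket listing. A sketch, operating on `(key, last_modified)` pairs (the function name and listing shape are illustrative, not a Phlo API):

```python
from datetime import datetime, timedelta

def keys_to_delete(listing, now, keep_days=30):
    """Apply the same policy as `find -mtime +30 -delete` to an
    object-store listing of (key, last_modified) pairs."""
    cutoff = now - timedelta(days=keep_days)
    return [key for key, last_modified in listing
            if key.endswith(".sql.gz") and last_modified < cutoff]
```

Feed it the output of your client's list operation (e.g. boto3's `list_objects_v2`) and delete the returned keys.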

Object Storage Backups

MinIO/S3:

# Install mc (MinIO client)
brew install minio/stable/mc

# Configure
mc alias set local http://localhost:10001 minioadmin minioadmin

# Backup bucket
mc mirror local/lake /backups/minio/lake

# Restore
mc mirror /backups/minio/lake local/lake

Migrating Service Data Volumes

Some Phlo service packages persist runtime data in Docker named volumes instead of project-local ./volumes/<service> bind mounts. Before upgrading an existing project that already has bind-mounted data, copy the old directory contents into the named volume once.

Run these commands from the project root, after stopping the Phlo stack:

docker volume create loki-data
docker run --rm -v "$(pwd)/volumes/loki:/source:ro" -v loki-data:/dest \
  alpine sh -c "cp -a /source/. /dest/"

docker volume create grafana-data
docker run --rm -v "$(pwd)/volumes/grafana:/source:ro" -v grafana-data:/dest \
  alpine sh -c "cp -a /source/. /dest/"

docker volume create prometheus-data
docker run --rm -v "$(pwd)/volumes/prometheus:/source:ro" -v prometheus-data:/dest \
  alpine sh -c "cp -a /source/. /dest/"

docker volume create clickstack-data
docker run --rm -v "$(pwd)/volumes/clickstack:/source:ro" -v clickstack-data:/dest \
  alpine sh -c "cp -a /source/. /dest/"

docker volume create superset-home
docker run --rm -v "$(pwd)/volumes/superset:/source:ro" -v superset-home:/dest \
  alpine sh -c "cp -a /source/. /dest/"

Skip any command for a service you have not used or whose source directory does not exist.

Automated S3 sync:

# Using AWS CLI or rclone
rclone sync /backups/minio/lake s3://backup-bucket/lake

Nessie Catalog Backups

Nessie state is stored in PostgreSQL, so backing up Postgres includes catalog metadata.

Export specific branches:

# List branches
phlo branch list > branches_backup.txt

# Export branch commits
curl http://localhost:10003/api/v1/trees/main > main_branch.json
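Branch names may contain `/` (e.g. `feature/xyz`), which must be percent-encoded so the ref resolves as a single path segment. A small helper following the endpoint shape used in the curl example above (the function name is illustrative):

```python
from urllib.parse import quote

def branch_export_url(base_url: str, ref: str) -> str:
    """Build the export URL for a single reference, percent-encoding
    slashes so refs like feature/xyz stay one path segment."""
    return f"{base_url.rstrip('/')}/api/v1/trees/{quote(ref, safe='')}"
```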

Branch Management

Creating Branches

# Development branch
phlo branch create dev

# Feature branch from specific ref
phlo branch create feature-xyz --from main

# With description
phlo branch create experiment --description "Testing new ingestion"

Merging Branches

# Merge dev to main
phlo branch merge dev main

# Force merge commit
phlo branch merge dev main --no-ff

Cleanup Old Branches

# List all branches
phlo branch list

# Delete specific branch
phlo branch delete old-feature

# Automated cleanup (configure in .phlo/.env.local)
BRANCH_CLEANUP_ENABLED=true
BRANCH_RETENTION_DAYS=7
BRANCH_RETENTION_DAYS_FAILED=2

Manual cleanup script:

from phlo_nessie.resource import BranchManagerResource
from datetime import datetime, timedelta

branch_manager = BranchManagerResource()

# Get all pipeline branches
branches = branch_manager.get_all_pipeline_branches()

retention_days = 7
cutoff_date = datetime.now() - timedelta(days=retention_days)

for branch in branches:
    if branch.created_at < cutoff_date:
        print(f"Deleting old branch: {branch.name}")
        branch_manager.cleanup_branch(branch.name)

Performance Optimization

Trino Query Optimization

Enable query profiling:

-- In Trino CLI
EXPLAIN ANALYZE SELECT * FROM bronze.events WHERE date = '2025-01-15';

Partition pruning:

-- Good: uses partition pruning
SELECT * FROM bronze.events WHERE partition_date = '2025-01-15';

-- Bad: full table scan
SELECT * FROM bronze.events WHERE timestamp > '2025-01-15';

Table statistics:

-- Analyze table
ANALYZE iceberg_dev.bronze.events;

-- Show stats
SHOW STATS FOR bronze.events;

Iceberg Maintenance

Optimize files:

from phlo_iceberg import get_iceberg_table

table = get_iceberg_table("bronze.events")

# Compact small files
table.optimize.compact()

# Expire old snapshots
table.expire_snapshots(older_than=30)  # days

Automated maintenance:

# workflows/maintenance/iceberg.py
from dagster import RunRequest, asset, schedule

from phlo_iceberg import get_iceberg_table

@asset
def optimize_iceberg_tables():
    tables = ["bronze.events", "silver.events_cleaned"]
    for table_name in tables:
        table = get_iceberg_table(table_name)
        table.optimize.compact()
        table.expire_snapshots(older_than=30)

@schedule(cron_schedule="0 2 * * 0", job_name="weekly_maintenance")
def weekly_iceberg_maintenance():
    return RunRequest()

Dagster Performance

Use multiprocess executor for production:

# .phlo/.env.local
DAGSTER_EXECUTOR=multiprocess

Configure resource limits:

# dagster.yaml
execution:
  multiprocess:
    max_concurrent: 4
    retries:
      enabled: true
      max_retries: 3

Scaling

Horizontal Scaling

Trino workers:

Add workers in docker-compose.yml:

services:
  trino-worker-1:
    image: trinodb/trino:461
    environment:
      - TRINO_DISCOVERY_URI=http://trino:10005
    depends_on:
      - trino

  trino-worker-2:
    image: trinodb/trino:461
    environment:
      - TRINO_DISCOVERY_URI=http://trino:10005
    depends_on:
      - trino

Dagster daemon replicas:

services:
  dagster-daemon-1:
    # ... configuration

  dagster-daemon-2:
    # ... configuration

Vertical Scaling

Resource limits in docker-compose.yml:

services:
  trino:
    deploy:
      resources:
        limits:
          cpus: "4"
          memory: 8G
        reservations:
          cpus: "2"
          memory: 4G

  postgres:
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G

Storage Scaling

MinIO distributed mode:

services:
  minio-1:
    image: minio/minio
    command: server http://minio-{1...4}/data{1...2}

  minio-2:
    image: minio/minio
    command: server http://minio-{1...4}/data{1...2}

  minio-3:
    image: minio/minio
    command: server http://minio-{1...4}/data{1...2}

  minio-4:
    image: minio/minio
    command: server http://minio-{1...4}/data{1...2}

Security

Access Control

PostgreSQL roles:

-- Read-only role for BI
CREATE ROLE bi_readonly WITH LOGIN PASSWORD 'secure-password';
GRANT CONNECT ON DATABASE cascade TO bi_readonly;
GRANT USAGE ON SCHEMA marts TO bi_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA marts TO bi_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA marts GRANT SELECT ON TABLES TO bi_readonly;

-- Application role with limited write
CREATE ROLE app_writer WITH LOGIN PASSWORD 'secure-password';
GRANT CONNECT ON DATABASE cascade TO app_writer;
GRANT USAGE ON SCHEMA bronze TO app_writer;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA bronze TO app_writer;

MinIO policies:

Define policy.json:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": ["arn:aws:s3:::lake/*"]
    }
  ]
}

Apply it:

# Create read-only policy
mc admin policy create local readonly-policy policy.json

# Create user and assign policy
mc admin user add local readonly secure-password
mc admin policy attach local readonly-policy --user readonly

Network Security

Docker network isolation:

# docker-compose.yml
networks:
  backend:
    driver: bridge
  frontend:
    driver: bridge

services:
  postgres:
    networks:
      - backend

  dagster-webserver:
    networks:
      - backend
      - frontend
    ports:
      - "10006:10006" # Only expose webserver

Firewall rules:

# Allow only specific IPs to access services
iptables -A INPUT -p tcp --dport 10006 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 10006 -j DROP

Secrets Management

Use secret managers:

# AWS Secrets Manager
export POSTGRES_PASSWORD=$(aws secretsmanager get-secret-value \
  --secret-id phlo/postgres/password \
  --query SecretString --output text)

# HashiCorp Vault
export POSTGRES_PASSWORD=$(vault kv get -field=password secret/phlo/postgres)

Docker secrets:

# docker-compose.yml
secrets:
  postgres_password:
    external: true

services:
  postgres:
    secrets:
      - postgres_password
    environment:
      POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password

Monitoring

Prometheus Metrics

Enable Prometheus in Dagster (via environment variables):

# .phlo/.env.local
DAGSTER_TELEMETRY_ENABLED=true
DAGSTER_PROMETHEUS_ENABLED=true
DAGSTER_PROMETHEUS_PORT=9090

Key metrics to monitor:

# Asset materialization success rate
rate(dagster_asset_materializations_total[5m])

# Asset materialization duration
histogram_quantile(0.95, rate(dagster_asset_materialization_duration_seconds_bucket[5m]))

# Failed materializations
rate(dagster_asset_materializations_failed_total[5m])

# Trino query duration
trino_query_execution_time_seconds

# MinIO storage usage
minio_disk_storage_used_bytes

Grafana Dashboards

Import dashboards:

  1. Start the observability profile:

phlo services start --profile observability

  2. Access Grafana at http://localhost:3000.

  3. Import pre-built dashboards:

    • Dagster metrics
    • Trino performance
    • MinIO storage
    • PostgreSQL queries

Alerting

Configure Prometheus alerts:

# prometheus/alerts.yml
groups:
  - name: phlo_alerts
    rules:
      - alert: AssetMaterializationFailed
        expr: rate(dagster_asset_materializations_failed_total[5m]) > 0
        for: 5m
        annotations:
          summary: "Asset materialization failures detected"

      - alert: HighQueryLatency
        expr: histogram_quantile(0.95, rate(trino_query_execution_time_seconds_bucket[5m])) > 30
        for: 10m
        annotations:
          summary: "High Trino query latency"

      - alert: LowStorageSpace
        expr: (minio_disk_storage_free_bytes / minio_disk_storage_total_bytes) < 0.1
        for: 5m
        annotations:
          summary: "Low MinIO storage space"

Slack integration:

# .phlo/.env.local
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
SLACK_CHANNEL=#data-alerts

Disaster Recovery

Recovery Plan

RTO/RPO targets:

  • RTO (Recovery Time Objective): 4 hours
  • RPO (Recovery Point Objective): 24 hours
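The RPO target can be checked mechanically against the backup naming scheme used earlier (`cascade_YYYYmmdd_HHMMSS.sql.gz`). A sketch, suitable for a cron-driven compliance check (the function name is illustrative):

```python
from datetime import datetime, timedelta

def rpo_violated(backup_names, now, rpo_hours=24):
    """True if no parseable backup exists within the RPO window."""
    stamps = []
    for name in backup_names:
        try:
            stamp = name.split("cascade_", 1)[1].removesuffix(".sql.gz")
            stamps.append(datetime.strptime(stamp, "%Y%m%d_%H%M%S"))
        except (IndexError, ValueError):
            continue  # not a backup file; ignore
    return not stamps or now - max(stamps) > timedelta(hours=rpo_hours)
```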

Recovery steps:

  1. Restore PostgreSQL:

# Stop services
phlo services stop

# Restore database
gunzip < backup.sql.gz | docker exec -i phlo-postgres-1 psql -U postgres -d cascade

# Start services
phlo services start

  2. Restore MinIO:

# Sync from backup
mc mirror /backups/minio/lake local/lake

  3. Verify the Nessie catalog:

# Check branches
phlo branch list

# Verify table metadata
curl http://localhost:10003/api/v1/trees/main

  4. Re-materialize recent partitions:

# Last 7 days
for i in {0..6}; do
  date=$(date -d "$i days ago" +%Y-%m-%d)
  phlo materialize --partition $date
done
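The re-materialization loop relies on GNU `date -d`, which BSD/macOS `date` does not support. The same partition list can be generated portably (the function name is illustrative):

```python
from datetime import date, timedelta

def recent_partitions(days: int, today: date) -> list[str]:
    """Last `days` partition keys, newest first, in the YYYY-MM-DD
    format expected by `phlo materialize --partition`."""
    return [(today - timedelta(days=i)).isoformat() for i in range(days)]
```

Pipe the output into `phlo materialize --partition <key>` one key at a time.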

Testing Recovery

Regular DR drills:

#!/bin/bash
# dr-test.sh

# 1. Backup current state
./backup-all.sh

# 2. Destroy services
phlo services stop --volumes

# 3. Restore from backup
./restore-all.sh

# 4. Verify services
phlo services status

# 5. Test asset materialization
phlo materialize --select "tag:critical" --partition $(date -d "yesterday" +%Y-%m-%d)

# 6. Validate data
./validate-data.sh

Release Management

Phlo uses standard git workflows with conventional commits and tags.

Maintenance Windows

Planned Downtime

Communication:

# Announce maintenance
curl -X POST $SLACK_WEBHOOK_URL \
  -H 'Content-Type: application/json' \
  -d '{
    "channel": "#data-alerts",
    "text": "Scheduled maintenance: Phlo will be down 2025-01-15 02:00-04:00 UTC"
  }'

Maintenance tasks:

#!/bin/bash
# maintenance.sh

# 1. Stop Dagster daemon (prevent new runs)
docker stop phlo-dagster-daemon-1

# 2. Wait for running jobs to complete
while [ $(docker exec phlo-dagster-webserver-1 dagster job list --running | wc -l) -gt 0 ]; do
  sleep 60
done

# 3. Backup databases
docker exec phlo-postgres-1 pg_dump -U postgres cascade | gzip > backup_$(date +%Y%m%d).sql.gz
mc mirror local/lake /backups/minio/lake

# 4. Perform maintenance
docker exec phlo-postgres-1 psql -U postgres -d cascade -c "VACUUM ANALYZE;"

# 5. Optimize Iceberg tables
python -m phlo.maintenance.optimize_tables

# 6. Restart services
phlo services stop
phlo services start

# 7. Verify health
./health-check.sh

# 8. Announce completion
curl -X POST $SLACK_WEBHOOK_URL \
  -H 'Content-Type: application/json' \
  -d '{
    "channel": "#data-alerts",
    "text": "Maintenance complete. Phlo is back online."
  }'

Next Steps