This guide covers the complete installation and usage workflow for the Progressive Automations desk lifter control system.
- Hardware Setup: Follow the Raspberry Pi Setup Guide for hardware configuration
- Bill of Materials: See Bill of Materials for required components
- Raspberry Pi: Raspberry Pi 5 with Debian Trixie and Python 3.11+
After installation, you can customize duty cycle limits and calibration values by editing the configuration file:
# Location: site-packages/progressive_automations_python/config.py
# Or find it with: python -c "import progressive_automations_python.config as c; print(c.__file__)"
# Key settings to adjust during initial setup:
# - DUTY_CYCLE_PERCENTAGE: Motor duty cycle (default: 0.10 = 10%)
# - MAX_CONTINUOUS_RUNTIME: Maximum single movement time (default: 30s)
# - LOWEST_HEIGHT / HIGHEST_HEIGHT: Your desk's physical range
# - UP_RATE / DOWN_RATE: Measured movement rates (inches/second)The configuration includes validation to prevent invalid values.
pip install progressive-automations-pythonThis installs the package with all dependencies, including Prefect for workflow orchestration.
You need a Prefect Cloud account for remote workflow orchestration.
- Sign up at https://www.prefect.io/
- Get your API key from the Prefect Cloud dashboard
- Login from your Raspberry Pi using Prefect's CLI:
prefect cloud login -k <your-api-key>Create a work pool for the desk lifter using Prefect's CLI:
prefect work-pool create desk-lifter-pool --type processDeploy all desk control flows using Python:
from progressive_automations_python.deployment import create_deployments
create_deployments("desk-lifter-pool")This creates the following deployments:
simple-movement-flow/move-to-position- Move to a specific heightcustom-movements-flow/custom-movements- Execute multiple configured movementstest-sequence-flow/test-sequence- Test sequence (up, wait, down)duty-cycle-monitoring-flow/duty-cycle-monitor- On-demand duty cycle check
On your Raspberry Pi, start a worker to execute flows using Prefect's CLI:
prefect worker start --pool desk-lifter-poolKeep this running in a terminal or set up as a systemd service for automatic startup.
The `progressive_automations_python` CLI is for initial hardware testing and troubleshooting. For production use, trigger flows via `run_deployment()` or Prefect's CLI. The package can also be used standalone, independent of Prefect.
Test UP or DOWN movement for 2 seconds to verify GPIO connections:
progressive_automations_python --test UP
progressive_automations_python --test DOWNView current duty cycle usage during debugging:
progressive_automations_python --statusTrigger movements asynchronously from external systems and poll their status later.
From any Python environment with network access to Prefect Cloud:
from prefect.deployments import run_deployment
# Trigger a movement (returns immediately with timeout=0)
flow_run = run_deployment(
name="simple-movement-flow/move-to-position",
parameters={"target_height": 35.5, "current_height": 24.0},
timeout=0 # Return immediately without waiting
)
print(f"Movement started with flow run ID: {flow_run.id}")
# Continue with other work while the desk moves...Check if the movement has completed:
from prefect import get_client
import asyncio
async def check_movement_status(flow_run_id):
"""Check if the movement has completed"""
async with get_client() as client:
flow_run = await client.read_flow_run(flow_run_id)
print(f"Status: {flow_run.state.type}")
if flow_run.state.type == "COMPLETED":
# Movement completed successfully
result = await flow_run.state.result()
print(f"✅ Movement completed!")
print(f" Final position: {result['movement_result']['end_height']}\"")
print(f" At target: {abs(result['movement_result']['end_height'] - result['movement_result']['start_height']) < 0.1}")
print(f" Duty cycle remaining: {result['final_duty_status']['remaining_capacity']:.1f}s")
return result
elif flow_run.state.type == "FAILED":
print(f"❌ Movement failed: {flow_run.state.message}")
return None
else:
print(f"⏳ Still running... (state: {flow_run.state.type})")
return None
# Check status
result = asyncio.run(check_movement_status(flow_run.id))Wait for a movement to complete with periodic polling:
import asyncio
import time
async def wait_for_movement_completion(flow_run_id, check_interval=5, max_wait=300):
"""
Poll until movement completes or timeout.
Args:
flow_run_id: The flow run ID from run_deployment
check_interval: Seconds between status checks (default: 5)
max_wait: Maximum time to wait in seconds (default: 300)
Returns:
dict with completion status and result
"""
from prefect import get_client
start_time = time.time()
async with get_client() as client:
while time.time() - start_time < max_wait:
flow_run = await client.read_flow_run(flow_run_id)
if flow_run.state.is_final():
if flow_run.state.type == "COMPLETED":
result = await flow_run.state.result()
return {
"completed": True,
"success": True,
"result": result
}
else:
return {
"completed": True,
"success": False,
"error": flow_run.state.message
}
print(f"⏳ Still moving... ({time.time() - start_time:.1f}s elapsed)")
await asyncio.sleep(check_interval)
return {
"completed": False,
"success": False,
"error": "Timeout waiting for movement"
}
# Use it
result = asyncio.run(wait_for_movement_completion(flow_run.id))
if result["completed"] and result["success"]:
print(f"✅ Desk reached target position!")
print(f"Details: {result['result']}")
else:
print(f"❌ Movement did not complete: {result.get('error', 'Unknown error')}")Check if there's enough duty cycle capacity before triggering a movement:
from prefect.deployments import run_deployment
# Check current duty cycle status
status_run = run_deployment(
name="duty-cycle-monitoring-flow/duty-cycle-monitor",
timeout=30 # Wait for result
)
remaining = status_run["status"]["remaining_capacity"]
if remaining > 10: # Need at least 10 seconds
# Safe to trigger movement
flow_run = run_deployment(
name="simple-movement-flow/move-to-position",
parameters={"target_height": 35.5},
timeout=0
)
print(f"Movement triggered: {flow_run.id}")
else:
print(f"⚠️ Insufficient duty cycle capacity ({remaining:.1f}s remaining)")
print("Wait for duty cycle window to reset")You can also trigger flows directly using Prefect's CLI:
# Trigger a movement to 30 inches
prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0
# Run a test sequence
prefect deployment run 'test-sequence-flow/test-sequence' --param movement_distance=0.5 --param rest_time=10.0
# Check duty cycle
prefect deployment run 'duty-cycle-monitoring-flow/duty-cycle-monitor'When integrating with other equipment that depends on the desk position:
async def orchestrate_equipment_workflow():
"""
Example: Move desk, wait for completion, then trigger dependent equipment
"""
from prefect.deployments import run_deployment
from prefect import get_client
import asyncio
# Step 1: Trigger desk movement
print("Step 1: Moving desk to position...")
desk_run = run_deployment(
name="simple-movement-flow/move-to-position",
parameters={"target_height": 30.0},
timeout=0
)
# Step 2: Poll until desk reaches position
print("Step 2: Waiting for desk to reach position...")
result = await wait_for_movement_completion(desk_run.id)
if not (result["completed"] and result["success"]):
raise RuntimeError("Desk movement failed")
print(f"✅ Desk at position: {result['result']['movement_result']['end_height']}\"")
# Step 3: Now safe to trigger dependent equipment
print("Step 3: Triggering dependent equipment...")
# ... trigger your other equipment here ...
return {"desk_movement": result, "equipment_triggered": True}
# Run the orchestration
result = asyncio.run(orchestrate_equipment_workflow())The system enforces a 10% duty cycle (2 minutes on, 18 minutes off) to protect the motor:
- Maximum continuous runtime: 30 seconds
- Maximum usage in 20-minute window: 120 seconds (2 minutes)
- Automatic tracking: All movements are tracked automatically
- Safety enforcement: Movements exceeding limits are rejected
View current usage:
progressive_automations_python --statusOutput example:
=== DUTY CYCLE STATUS ===
Current usage: 15.2s / 120.0s (12.7%)
Remaining capacity: 104.8s
Percentage used: 12.7%
Window period: 1200s (20 minutes)
Current position: 24.0"
Last movement: 2.1s ago
✅ GOOD CAPACITY - Normal operations possible
Generate optimized movement sequences based on current duty cycle:
progressive_automations_python --generate-movementsThis creates movement_configs.json with movements that:
- Respect the 30-second continuous runtime limit
- Use available capacity efficiently
- Demonstrate successful movements within limits
- Show duty cycle protection when limits would be exceeded
Error: Movement would exceed 10% duty cycle limit
Solution: Wait for the duty cycle window to reset. Check status with:
progressive_automations_python --statusError: Permission denied when accessing GPIO
Solution: Ensure your user is in the gpio group:
sudo usermod -a -G gpio $USER
# Then rebootError: Flow triggered but never executes
Solution: Ensure a Prefect worker is running:
prefect worker start --pool default-process-poolConsider setting up a systemd service to keep the worker running.
Error: No current height provided and no last known position
Solution: Provide the current height explicitly:
progressive_automations_python --move 30.0 --current 24.0The system will remember the position for future movements.
# Login to Prefect Cloud
prefect cloud login -k <api-key>
# Create work pool
prefect work-pool create desk-lifter-pool --type process
# Start worker (keep running)
prefect worker start --pool desk-lifter-pool
# Trigger deployments manually
prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0
prefect deployment run 'test-sequence-flow/test-sequence'
prefect deployment run 'duty-cycle-monitoring-flow/duty-cycle-monitor'# Hardware testing (initial setup)
progressive_automations_python --test UP|DOWN
# Status check (debugging)
progressive_automations_python --statusFor viewing complete Python examples for async deployment and polling:
progressive_automations_python --examplesThis displays comprehensive code examples for:
- Async movement triggering
- Status polling
- Polling loops with timeout
- Duty cycle checking
- Equipment workflow orchestration
- Complete hardware setup per Raspberry Pi Setup
- Install package:
pip install progressive-automations-python - Configure Prefect Cloud:
prefect cloud login -k <api-key> - Create work pool:
prefect work-pool create desk-lifter-pool --type process - Deploy flows:
python -c "from progressive_automations_python.deployment import create_deployments; create_deployments('desk-lifter-pool')" - Start worker:
prefect worker start --pool desk-lifter-pool - Test hardware (optional):
progressive_automations_python --test UP - Trigger flows via
run_deployment()from automation code
For more information, see: