# Running Burr Applications on Ray Workers

This guide explains the pattern of running entire Burr applications on Ray workers, with actions distributed to specialized Ray actors based on tags.

## Architecture Overview

| 6 | + |
| 7 | +``` |
| 8 | +┌─────────────────────────────────────────────────────────────┐ |
| 9 | +│ Main Process (Orchestrator) │ |
| 10 | +│ │ |
| 11 | +│ ┌────────────────────────────────────────────────────┐ │ |
| 12 | +│ │ Submit applications to Ray workers │ │ |
| 13 | +│ │ run_burr_application_on_worker.remote(...) │ │ |
| 14 | +│ └────────────────────────────────────────────────────┘ │ |
| 15 | +└─────────────────────────────────────────────────────────────┘ |
| 16 | + │ |
| 17 | + │ Ray Remote Function |
| 18 | + │ |
| 19 | + ▼ |
| 20 | +┌─────────────────────────────────────────────────────────────┐ |
| 21 | +│ Ray Worker (Application Execution) │ |
| 22 | +│ │ |
| 23 | +│ ┌────────────────────────────────────────────────────┐ │ |
| 24 | +│ │ Burr Application │ │ |
| 25 | +│ │ - State management │ │ |
| 26 | +│ │ - Workflow orchestration │ │ |
| 27 | +│ │ - Interceptor routing │ │ |
| 28 | +│ └────────────────────────────────────────────────────┘ │ |
| 29 | +│ │ |
| 30 | +│ ┌────────────────────────────────────────────────────┐ │ |
| 31 | +│ │ WorkerLevelInterceptor │ │ |
| 32 | +│ │ - Routes tagged actions to actors │ │ |
| 33 | +│ │ - Executes local actions on worker │ │ |
| 34 | +│ └────────────────────────────────────────────────────┘ │ |
| 35 | +│ │ |
| 36 | +│ Local Actions (tags=["local"]) │ |
| 37 | +│ └─→ Execute directly on Ray worker │ |
| 38 | +│ │ |
| 39 | +│ Tagged Actions (tags=["gpu", "db"]) │ |
| 40 | +│ ├─→ GPU Actions → GPU Actor Pool │ |
| 41 | +│ └─→ DB Actions → DB Actor Pool │ |
| 42 | +└─────────────────────────────────────────────────────────────┘ |
| 43 | + │ |
| 44 | + │ Ray Actor Calls |
| 45 | + │ |
| 46 | + ▼ |
| 47 | +┌─────────────────────────────────────────────────────────────┐ |
| 48 | +│ Specialized Ray Actors │ |
| 49 | +│ │ |
| 50 | +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ |
| 51 | +│ │ GPU Actor 0 │ │ GPU Actor 1 │ │ DB Actor 0 │ │ |
| 52 | +│ │ │ │ │ │ │ │ |
| 53 | +│ │ - GPU Model │ │ - GPU Model │ │ - DB Conn │ │ |
| 54 | +│ │ - CUDA │ │ - CUDA │ │ - Pool │ │ |
| 55 | +│ └──────────────┘ └──────────────┘ └──────────────┘ │ |
| 56 | +│ │ |
| 57 | +│ Actions execute here with specialized resources │ |
| 58 | +└─────────────────────────────────────────────────────────────┘ |
| 59 | +``` |

## Key Components

### 1. Ray Remote Function

The `run_burr_application_on_worker` function is decorated with `@ray.remote`, making it execute on a Ray worker:

```python
@ray.remote
def run_burr_application_on_worker(
    initial_state: dict,
    actor_pool_stats: dict,
    app_config: dict,
) -> dict:
    # Creates and runs the Burr application on a Ray worker
    ...
```

### 2. Worker-Level Interceptor

The `WorkerLevelInterceptor` runs on the Ray worker and routes actions:

- **Tagged actions** (`gpu`, `db`, `specialized`) → Route to specialized actors
- **Local actions** (no matching tags) → Execute directly on the worker

```python
class WorkerLevelInterceptor(ActionExecutionInterceptorHook):
    def should_intercept(self, *, action: Action, **kwargs) -> bool:
        return any(tag in action.tags for tag in ["gpu", "db", "specialized"])

    def intercept_run(self, *, action: Action, state: State, **kwargs):
        # Route to a specialized actor, or execute locally
        ...
```
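The routing decision itself is plain set membership over tags, so it can be sketched and tested without Ray or Burr at all. The `SPECIALIZED_TAGS` constant and `route` helper below are illustrative assumptions of this guide, not part of Burr's API:

```python
# Tag names mirror the example above; they are conventions of this
# guide, not identifiers defined by Burr or Ray.
SPECIALIZED_TAGS = {"gpu", "db", "specialized"}

def should_intercept(action_tags: list) -> bool:
    """True if any of the action's tags matches a specialized tag."""
    return bool(SPECIALIZED_TAGS.intersection(action_tags))

def route(action_tags: list) -> str:
    """Destination for an action: the first matching specialized tag, else 'local'."""
    return next((t for t in action_tags if t in SPECIALIZED_TAGS), "local")
```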

### 3. Specialized Actor Pools

Different actor pools serve different resource types:

```python
actor_pool = SpecializedActorPool()
gpu_actor = actor_pool.get_actor("gpu")  # Round-robin selection
db_actor = actor_pool.get_actor("db")
```
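Round-robin selection itself needs nothing from Ray. A minimal stand-in for the pool (this `RoundRobinPool` is a sketch of the idea; the real `SpecializedActorPool` would hold Ray actor handles instead of plain values):

```python
import itertools

class RoundRobinPool:
    """Hands out members of each named pool in round-robin order."""

    def __init__(self, pools: dict):
        # One independent cycle per pool name ("gpu", "db", ...).
        self._cycles = {name: itertools.cycle(members)
                        for name, members in pools.items()}

    def get_actor(self, pool_name: str):
        # Each call advances that pool's cycle by one member.
        return next(self._cycles[pool_name])
```

Because each pool has its own cycle, heavy use of one pool never skews the rotation of another.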

## Execution Flow

1. **Main Process**: Submits the application to a Ray worker
   ```python
   future = run_burr_application_on_worker.remote(initial_state, ...)
   ```

2. **Ray Worker**: Creates the application with the interceptor attached
   ```python
   app = (
       ApplicationBuilder()
       ...
       .with_hooks(interceptor)
       .build()
   )
   ```

3. **Action Execution**:
   - **Local action** (`tags=["local"]`): Executes directly on the worker
   - **Tagged action** (`tags=["gpu"]`): Interceptor routes it to a GPU actor

4. **Actor Execution**: The action runs on a specialized actor with the required resources

5. **State Management**: State is serialized/deserialized at the boundaries
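Steps 1–5 can be simulated end to end in plain Python. The sketch below models only the routing and state flow; in the real pattern each routed step would be a Ray actor call, and this `run_application` helper is an assumption of this guide, not a Burr function:

```python
def run_application(actions, state):
    """Run (name, tags, fn) triples in order, recording where each ran."""
    specialized = {"gpu", "db"}
    trace = []
    for name, tags, fn in actions:
        # First matching specialized tag wins; otherwise run locally.
        target = next((t for t in tags if t in specialized), "local")
        trace.append((name, target))  # real code: actor.run.remote(...)
        state = fn(state)             # state flows through every step
    return state, trace
```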

## When to Use This Pattern

✅ **Use when:**
- You want to offload entire applications to a Ray cluster
- You need different resource types (GPU, DB, etc.) for different actions
- You want to scale applications horizontally across Ray workers
- You want to keep lightweight actions local (avoiding actor overhead)
- You have multiple applications that can share actor pools

❌ **Don't use when:**
- All actions need the same resources (use a simple actor pool)
- Actions are very lightweight (the overhead is not worth it)
- You need tight coupling with the main process's state
## Benefits

1. **Horizontal Scaling**: Run multiple applications in parallel on different workers
2. **Resource Specialization**: Different actors for different resource needs
3. **Efficiency**: Local actions avoid actor overhead
4. **Resource Sharing**: Multiple applications share actor pools on the same worker
5. **State Isolation**: Each application maintains independent state

## State Serialization

State is serialized and deserialized at each boundary:

- **Worker → Actor**: `state.serialize()` before sending
- **Actor → Worker**: `State.deserialize()` after receiving

This ensures non-serializable objects (DB clients, etc.) are handled via the serde layer.
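A plain-dict stand-in illustrates the boundary handling: serializable fields cross as JSON, while process-local resources are dropped on the way out and reattached on the way in. The field name `db_client` and both helpers are illustrative assumptions, not Burr's actual serde API:

```python
import json

NON_SERIALIZABLE = {"db_client"}  # fields the serde layer must special-case

def serialize_state(state: dict) -> str:
    """Worker -> actor: drop non-serializable fields, then JSON-encode."""
    return json.dumps({k: v for k, v in state.items()
                       if k not in NON_SERIALIZABLE})

def deserialize_state(payload: str, db_client=None) -> dict:
    """Actor -> worker: JSON-decode, then reattach process-local resources."""
    state = json.loads(payload)
    if db_client is not None:
        state["db_client"] = db_client
    return state
```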

## Example Usage

```python
# Submit multiple applications to Ray workers
futures = []
for i in range(10):
    future = run_burr_application_on_worker.remote(
        initial_state={"count": i * 10},
        actor_pool_stats={},
        app_config={"app_id": f"app_{i}"}
    )
    futures.append(future)

# Wait for all to complete
results = ray.get(futures)
```

## Comparison with Other Patterns

| Pattern | Application Location | Action Distribution | Use Case |
|---------|----------------------|---------------------|----------|
| **Basic Interceptor** | Main process | Main → Ray actors | Single app, selective offloading |
| **Actor Multiplexing** | Main process | Main → Shared actor pool | Multiple apps, resource reuse |
| **App on Worker** (this) | Ray worker | Worker → Specialized actors | Scale apps, resource specialization |

## Next Steps

- See `app_on_ray_worker.py` for a complete working example
- Customize actor pools for your resource types
- Add persistence/tracking hooks as needed
- Consider an async version for non-blocking execution