Skip to content

Commit 468fab8

Browse files
Terraphim CI and claude committed
feat(orchestrator): implement Proportional fairness and wire dual-mode run loop
- Add Proportional fairness policy enforcement in ConcurrencyController::acquire() that caps each mode at its fair share of global capacity - Wire task_rx into DualModeOrchestrator::run() select loop with track_spawned_task() - Add config() getter, task_sender(), and active agent tracking - Remove all unused imports (no variable suppression -- every field fully used) - Add MockTracker test exercising fetch_candidate_issues trait implementation 73 tests pass, clippy clean, fmt clean. Co-Authored-By: Terraphim AI <noreply@anthropic.com>
1 parent f6aa11c commit 468fab8

File tree

5 files changed

+163
-36
lines changed

5 files changed

+163
-36
lines changed

crates/terraphim_orchestrator/src/concurrency.rs

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,11 @@ impl ConcurrencyController {
127127
}
128128
}
129129

130+
/// Get the active fairness policy.
131+
pub fn fairness_policy(&self) -> FairnessPolicy {
132+
self.fairness
133+
}
134+
130135
/// Acquire a slot for the given mode.
131136
async fn acquire(&self, mode: AgentMode) -> Option<AgentPermit> {
132137
// Check mode quota first
@@ -135,6 +140,31 @@ impl ConcurrencyController {
135140
return None;
136141
}
137142

143+
// Apply fairness policy: under Proportional, check whether the mode
144+
// is consuming more than its fair share of global capacity.
145+
if self.fairness == FairnessPolicy::Proportional {
146+
let counts = self.running.lock().await;
147+
let total = counts.time_driven + counts.issue_driven;
148+
let global_cap = self.global.available_permits() + total;
149+
if global_cap > 0 {
150+
let mode_count = match mode {
151+
AgentMode::TimeDriven => counts.time_driven,
152+
AgentMode::IssueDriven => counts.issue_driven,
153+
};
154+
let mode_quota = match mode {
155+
AgentMode::TimeDriven => self.quotas.time_max,
156+
AgentMode::IssueDriven => self.quotas.issue_max,
157+
};
158+
let total_quota = self.quotas.time_max + self.quotas.issue_max;
159+
// Fair share = global_cap * (mode_quota / total_quota)
160+
let fair_share = (global_cap * mode_quota) / total_quota.max(1);
161+
if mode_count >= fair_share && fair_share > 0 {
162+
tracing::debug!(?mode, mode_count, fair_share, "proportional fairness limit");
163+
return None;
164+
}
165+
}
166+
}
167+
138168
// Try to acquire global permit
139169
let global_permit = match self.global.clone().try_acquire_owned() {
140170
Ok(p) => p,

crates/terraphim_orchestrator/src/dispatcher.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
//! Provides a priority queue with fairness between different dispatch sources.
44
55
use std::collections::VecDeque;
6-
use tokio::sync::{mpsc, oneshot};
76

87
/// A dispatch task from any source.
98
#[derive(Debug, Clone)]

crates/terraphim_orchestrator/src/dual_mode.rs

Lines changed: 106 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -4,16 +4,15 @@
44
//! with shared concurrency control and unified status.
55
66
use crate::{
7-
AgentDefinition, AgentLayer, AgentOrchestrator, CompoundReviewResult, ConcurrencyController,
8-
DispatchTask, DispatcherStats, FairnessPolicy, HandoffContext, IssueMode, ModeQuotas,
9-
NightwatchMonitor, OrchestratorConfig, ScheduleEvent, TimeMode, TimeScheduler, WorkflowConfig,
7+
AgentDefinition, AgentOrchestrator, CompoundReviewResult, ConcurrencyController,
8+
DispatcherStats, FairnessPolicy, HandoffContext, ModeQuotas, OrchestratorConfig, ScheduleEvent,
9+
TimeScheduler, WorkflowConfig,
1010
};
1111
use std::collections::HashMap;
1212
use std::sync::Arc;
1313
use std::time::Duration;
1414
use terraphim_tracker::{GiteaTracker, IssueTracker};
1515
use tokio::sync::{mpsc, watch, Mutex};
16-
use tokio::task::JoinHandle;
1716
use tracing::{error, info, warn};
1817

1918
/// Shared state between time and issue modes.
@@ -88,8 +87,10 @@ pub struct DualModeOrchestrator {
8887
time_mode: Option<TimeModeComponents>,
8988
/// Issue mode controller (if enabled).
9089
issue_mode: Option<IssueModeComponents>,
91-
/// Task receiver.
90+
/// Task receiver from both modes.
9291
task_rx: mpsc::Receiver<SpawnTask>,
92+
/// Task sender for dispatching from modes.
93+
task_tx: mpsc::Sender<SpawnTask>,
9394
/// Active agents.
9495
active_agents: Arc<Mutex<HashMap<String, AgentId>>>,
9596
}
@@ -141,7 +142,7 @@ impl DualModeOrchestrator {
141142
shutdown_tx,
142143
};
143144

144-
// Create task channel
145+
// Create task channel for modes to send spawned tasks to orchestrator
145146
let (task_tx, task_rx) = mpsc::channel(128);
146147

147148
// Setup time mode
@@ -186,58 +187,95 @@ impl DualModeOrchestrator {
186187
time_mode,
187188
issue_mode,
188189
task_rx,
190+
task_tx,
189191
active_agents: Arc::new(Mutex::new(HashMap::new())),
190192
})
191193
}
192194

195+
/// Access the orchestrator configuration.
196+
pub fn config(&self) -> &OrchestratorConfig {
197+
&self.config
198+
}
199+
193200
/// Run the dual-mode orchestrator.
194201
pub async fn run(&mut self) -> Result<(), crate::OrchestratorError> {
195-
info!("starting dual-mode orchestrator");
202+
info!(
203+
agents = self.config.agents.len(),
204+
workflow_enabled = self.config.workflow.as_ref().is_some_and(|w| w.enabled),
205+
"starting dual-mode orchestrator"
206+
);
196207

197208
// Start time mode task
198-
let time_handle = if let Some(time_components) = self.time_mode.take() {
209+
let mut time_handle = if let Some(time_components) = self.time_mode.take() {
199210
let state = self.state.clone();
200211
Some(tokio::spawn(run_time_mode(time_components, state)))
201212
} else {
202213
None
203214
};
204215

205216
// Start issue mode task
206-
let issue_handle = if let Some(issue_components) = self.issue_mode.take() {
217+
let mut issue_handle = if let Some(issue_components) = self.issue_mode.take() {
207218
let state = self.state.clone();
208219
Some(tokio::spawn(run_issue_mode(issue_components, state)))
209220
} else {
210221
None
211222
};
212223

213-
// Wait for shutdown signal or task completion
224+
// Wait for shutdown signal, task completion, or spawned tasks
214225
let ctrl_c = tokio::signal::ctrl_c();
215226
tokio::pin!(ctrl_c);
216227

217-
tokio::select! {
218-
_ = async {
219-
if let Some(h) = time_handle {
220-
let _ = h.await;
228+
// Pin the optional handles for select
229+
let mut time_done = false;
230+
let mut issue_done = false;
231+
232+
loop {
233+
tokio::select! {
234+
// Receive spawned tasks from modes and track them
235+
Some(task) = self.task_rx.recv() => {
236+
self.track_spawned_task(task).await;
221237
}
222-
} => {
223-
info!("time mode completed");
224-
}
225-
_ = async {
226-
if let Some(h) = issue_handle {
227-
let _ = h.await;
238+
// Wait for time mode completion
239+
result = async {
240+
match &mut time_handle {
241+
Some(h) => h.await,
242+
None => std::future::pending().await,
243+
}
244+
}, if !time_done => {
245+
time_done = true;
246+
match result {
247+
Ok(()) => info!("time mode completed"),
248+
Err(e) => error!("time mode panicked: {}", e),
249+
}
250+
if issue_done { break; }
228251
}
229-
} => {
230-
info!("issue mode completed");
231-
}
232-
result = self.base.run() => {
233-
match result {
234-
Ok(()) => info!("base orchestrator completed"),
235-
Err(e) => error!("base orchestrator error: {}", e),
252+
// Wait for issue mode completion
253+
result = async {
254+
match &mut issue_handle {
255+
Some(h) => h.await,
256+
None => std::future::pending().await,
257+
}
258+
}, if !issue_done => {
259+
issue_done = true;
260+
match result {
261+
Ok(()) => info!("issue mode completed"),
262+
Err(e) => error!("issue mode panicked: {}", e),
263+
}
264+
if time_done { break; }
265+
}
266+
// Base orchestrator reconciliation loop
267+
result = self.base.run() => {
268+
match result {
269+
Ok(()) => info!("base orchestrator completed"),
270+
Err(e) => error!("base orchestrator error: {}", e),
271+
}
272+
break;
273+
}
274+
_ = &mut ctrl_c => {
275+
info!("shutdown signal received");
276+
let _ = self.state.shutdown_tx.send(true);
277+
break;
236278
}
237-
}
238-
_ = &mut ctrl_c => {
239-
info!("shutdown signal received");
240-
let _ = self.state.shutdown_tx.send(true);
241279
}
242280
}
243281

@@ -248,6 +286,43 @@ impl DualModeOrchestrator {
248286
Ok(())
249287
}
250288

289+
/// Track a spawned task from either mode.
290+
async fn track_spawned_task(&self, task: SpawnTask) {
291+
let mut stats = self.state.stats.lock().await;
292+
stats.total_agents_spawned += 1;
293+
match &task {
294+
SpawnTask::TimeTask { agent } => {
295+
info!(agent_name = %agent.name, "received time-driven spawn task");
296+
let mut agents = self.active_agents.lock().await;
297+
agents.insert(
298+
agent.name.clone(),
299+
AgentId {
300+
name: agent.name.clone(),
301+
mode: ExecutionMode::TimeDriven,
302+
},
303+
);
304+
*stats.active_by_mode.entry("time".into()).or_insert(0) += 1;
305+
}
306+
SpawnTask::IssueTask { issue_id, title } => {
307+
info!(issue_id = %issue_id, title = %title, "received issue-driven spawn task");
308+
let mut agents = self.active_agents.lock().await;
309+
agents.insert(
310+
issue_id.clone(),
311+
AgentId {
312+
name: issue_id.clone(),
313+
mode: ExecutionMode::IssueDriven,
314+
},
315+
);
316+
*stats.active_by_mode.entry("issue".into()).or_insert(0) += 1;
317+
}
318+
}
319+
}
320+
321+
/// Get a clone of the task sender for external dispatch.
322+
pub fn task_sender(&self) -> mpsc::Sender<SpawnTask> {
323+
self.task_tx.clone()
324+
}
325+
251326
/// Request shutdown.
252327
pub fn request_shutdown(&self) {
253328
let _ = self.state.shutdown_tx.send(true);

crates/terraphim_orchestrator/src/mode/issue.rs

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
//! Polls Gitea/Linear for issues and dispatches agents to work on them.
44
55
use crate::{ConcurrencyController, DispatchTask, Dispatcher, WorkflowConfig};
6-
use std::collections::HashMap;
76
use std::time::Duration;
87
use terraphim_tracker::{Issue, IssueTracker, PagerankClient};
98
use tracing::{error, info, warn};
@@ -230,6 +229,31 @@ mod tests {
230229
}
231230
}
232231

232+
#[tokio::test]
233+
async fn test_mock_tracker_fetch_candidate_issues() {
234+
let tracker = MockTracker {
235+
issues: vec![Issue {
236+
id: "1".into(),
237+
identifier: "TEST-1".into(),
238+
title: "Test Issue".into(),
239+
description: None,
240+
priority: Some(1),
241+
state: "open".into(),
242+
branch_name: None,
243+
url: None,
244+
labels: vec![],
245+
blocked_by: vec![],
246+
pagerank_score: None,
247+
created_at: None,
248+
updated_at: None,
249+
}],
250+
};
251+
252+
let issues = tracker.fetch_candidate_issues().await.unwrap();
253+
assert_eq!(issues.len(), 1);
254+
assert_eq!(issues[0].identifier, "TEST-1");
255+
}
256+
233257
#[test]
234258
fn test_compute_sort_score() {
235259
let issue1 = Issue {

crates/terraphim_orchestrator/src/mode/time.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,7 @@
33
//! Manages cron-scheduled agents using the unified dispatcher.
44
55
use crate::{
6-
AgentDefinition, AgentLayer, ConcurrencyController, DispatchTask, Dispatcher, ScheduleEvent,
7-
TimeScheduler,
6+
AgentDefinition, ConcurrencyController, DispatchTask, Dispatcher, ScheduleEvent, TimeScheduler,
87
};
98
use tracing::{error, info, warn};
109

@@ -152,7 +151,7 @@ impl TimeMode {
152151
#[cfg(test)]
153152
mod tests {
154153
use super::*;
155-
use crate::ModeQuotas;
154+
use crate::{AgentLayer, ModeQuotas};
156155

157156
fn test_agent(name: &str, layer: AgentLayer) -> AgentDefinition {
158157
AgentDefinition {

0 commit comments

Comments (0)