Skip to content

Commit 2c0ad12

Browse files
shayne-fletchermeta-codesync[bot]
authored andcommitted
remove controller health monitoring (#2069)
Summary: Pull Request resolved: #2069 this removes some of the dead code mariusae talks about in D88434964. recover D88441603. cleanup all the dead controller supervision machinery that was left behind after hyperactor_multiprocess crate was deleted. the supervision check was already stubbed out, `ControllerError` was never constructed, and the controller fields were write-only. Reviewed By: vidhyav, mariusae Differential Revision: D88446442 fbshipit-source-id: b61b36d62e8c27dacca180324d15647e7874c64a
1 parent cd32e16 commit 2c0ad12

File tree

3 files changed

+20
-69
lines changed

3 files changed

+20
-69
lines changed

monarch_extension/src/client.rs

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::sync::Arc;
1111
use hyperactor::ActorRef;
1212
use hyperactor::data::Serialized;
1313
use monarch_hyperactor::ndslice::PySlice;
14-
use monarch_hyperactor::proc::ControllerError;
1514
use monarch_hyperactor::proc::InstanceWrapper;
1615
use monarch_hyperactor::proc::PyActorId;
1716
use monarch_hyperactor::proc::PyProc;
@@ -450,8 +449,7 @@ impl ClientActor {
450449

451450
/// Attach the client to a controller actor. This will block until the controller responds.
452451
fn attach(&mut self, py: Python, controller_id: PyActorId) -> PyResult<()> {
453-
let mut instance_wrapper = self.instance.blocking_lock();
454-
instance_wrapper.set_controller((&controller_id).into());
452+
let instance_wrapper = self.instance.blocking_lock();
455453
let actor_id = instance_wrapper.actor_id().clone();
456454
let (instance, _handler) = instance_wrapper
457455
.instance()
@@ -495,44 +493,25 @@ impl ClientActor {
495493
instance.lock().await.next_message(timeout_msec).await
496494
})?;
497495

498-
Python::with_gil(|py| {
499-
match result {
500-
Ok(Some(ClientMessage::Result { seq, result })) => {
501-
WorkerResponse { seq, result }.into_py_any(py)
502-
}
503-
Ok(Some(ClientMessage::Log { level, message })) => LogMessage {
504-
level: PyLogLevel::from(level),
505-
message,
506-
}
507-
.into_py_any(py),
508-
Ok(Some(ClientMessage::DebuggerMessage {
509-
debugger_actor_id,
510-
action,
511-
})) => DebuggerMessage {
512-
debugger_actor_id: debugger_actor_id.into(),
513-
action,
514-
}
515-
.into_py_any(py),
516-
Ok(None) => PyNone::get(py).into_py_any(py),
517-
Err(err) => {
518-
if let Some(ControllerError::Failed(controller_id, err_msg)) =
519-
err.downcast_ref::<ControllerError>()
520-
{
521-
let failure = DeviceFailure {
522-
actor_id: controller_id.clone(),
523-
address: "".to_string(), // Controller is always task 0 for now.
524-
backtrace: err_msg.clone(),
525-
};
526-
WorkerResponse {
527-
seq: Seq::default(),
528-
result: Some(Err(Exception::Failure(failure))),
529-
}
530-
.into_py_any(py)
531-
} else {
532-
Err(PyRuntimeError::new_err(err.to_string()))
533-
}
534-
}
496+
Python::with_gil(|py| match result {
497+
Ok(Some(ClientMessage::Result { seq, result })) => {
498+
WorkerResponse { seq, result }.into_py_any(py)
499+
}
500+
Ok(Some(ClientMessage::Log { level, message })) => LogMessage {
501+
level: PyLogLevel::from(level),
502+
message,
503+
}
504+
.into_py_any(py),
505+
Ok(Some(ClientMessage::DebuggerMessage {
506+
debugger_actor_id,
507+
action,
508+
})) => DebuggerMessage {
509+
debugger_actor_id: debugger_actor_id.into(),
510+
action,
535511
}
512+
.into_py_any(py),
513+
Ok(None) => PyNone::get(py).into_py_any(py),
514+
Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
536515
})
537516
}
538517

monarch_hyperactor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ serde = { version = "1.0.219", features = ["derive", "rc"] }
5151
serde_bytes = "0.11"
5252
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
5353
tempfile = "3.22"
54-
thiserror = "2.0.12"
5554
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
5655
tokio-util = { version = "0.7.15", features = ["full"] }
5756
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }

monarch_hyperactor/src/proc.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@ use std::hash::DefaultHasher;
1010
use std::hash::Hash;
1111
use std::hash::Hasher;
1212
use std::time::Duration;
13-
use std::time::SystemTime;
1413

1514
use anyhow::Result;
1615
use hyperactor::RemoteMessage;
1716
use hyperactor::actor::Signal;
1817
use hyperactor::clock::Clock;
1918
use hyperactor::clock::ClockKind;
2019
use hyperactor::data::Serialized;
21-
use hyperactor::mailbox::PortHandle;
2220
use hyperactor::mailbox::PortReceiver;
2321
use hyperactor::proc::Instance;
2422
use hyperactor::proc::Proc;
@@ -32,8 +30,6 @@ use pyo3::exceptions::PyValueError;
3230
use pyo3::prelude::*;
3331
use pyo3::types::PyList;
3432
use pyo3::types::PyType;
35-
use tokio::sync::OnceCell;
36-
use tokio::sync::watch;
3733

3834
use crate::actor::PythonActor;
3935
use crate::actor::PythonActorHandle;
@@ -317,23 +313,10 @@ pub struct InstanceWrapper<M: RemoteMessage> {
317313
signal_receiver: PortReceiver<Signal>,
318314
status: InstanceStatus,
319315

320-
// TODO(T216450632): merge actor.rs and client.rs in monarch_extension
321-
signal_port: PortHandle<Signal>,
322-
last_controller_status_check: SystemTime,
323-
controller_id: OnceCell<ActorId>,
324-
controller_error_sender: watch::Sender<String>,
325-
controller_error_receiver: watch::Receiver<String>,
326316
clock: ClockKind,
327317
actor_id: ActorId,
328318
}
329319

330-
/// Error that can occur when there is controller supervision error.
331-
#[derive(thiserror::Error, Debug)]
332-
pub enum ControllerError {
333-
#[error("controller actor {0} failed: {1}")]
334-
Failed(ActorId, String),
335-
}
336-
337320
impl<M: RemoteMessage> InstanceWrapper<M> {
338321
pub fn new(proc: &PyProc, actor_name: &str) -> Result<Self> {
339322
InstanceWrapper::new_with_instance_and_clock(
@@ -346,30 +329,20 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
346329
// TEMPORARY: remove after using fixed message ports.
347330
let (_message_port, message_receiver) = instance.bind_actor_port::<M>();
348331

349-
let (signal_port, signal_receiver) = instance.bind_actor_port::<Signal>();
332+
let (_signal_port, signal_receiver) = instance.bind_actor_port::<Signal>();
350333

351-
let (controller_error_sender, controller_error_receiver) = watch::channel("".to_string());
352334
let actor_id = instance.self_id().clone();
353335

354336
Ok(Self {
355337
instance,
356338
message_receiver,
357339
signal_receiver,
358340
status: InstanceStatus::Running,
359-
signal_port,
360-
last_controller_status_check: clock.system_time_now(),
361-
controller_id: OnceCell::new(),
362-
controller_error_sender,
363-
controller_error_receiver,
364341
clock,
365342
actor_id,
366343
})
367344
}
368345

369-
pub fn set_controller(&mut self, controller_id: ActorId) {
370-
self.controller_id.set(controller_id).unwrap();
371-
}
372-
373346
/// Send a message to any actor. It is the responsibility of the caller to ensure the right
374347
/// payload accepted by the target actor has been serialized and provided to this function.
375348
pub fn send(&self, actor_id: &PyActorId, message: &PySerialized) -> PyResult<()> {

0 commit comments

Comments
 (0)