diff --git a/monarch_extension/src/client.rs b/monarch_extension/src/client.rs
index b49f1ea8c..08136bc90 100644
--- a/monarch_extension/src/client.rs
+++ b/monarch_extension/src/client.rs
@@ -756,33 +756,6 @@ impl ClientActor {
         PyList::new(py, messages)
     }
 
-    /// Get the status of all the worlds from the system.
-    #[pyo3(signature = (filter = None))]
-    fn world_status<'py>(
-        &mut self,
-        py: Python<'py>,
-        filter: Option<&PySystemSnapshotFilter>,
-    ) -> PyResult<PyObject> {
-        let instance = self.instance.clone();
-        let filter = filter.cloned();
-        let worlds = signal_safe_block_on(py, async move {
-            instance
-                .lock()
-                .await
-                .world_status(
-                    filter.map_or(SystemSnapshotFilter::all(), SystemSnapshotFilter::from),
-                )
-                .await
-        })??;
-        Python::with_gil(|py| {
-            let py_dict = PyDict::new(py);
-            for (world, status) in worlds {
-                py_dict.set_item(world.to_string(), status.to_string())?;
-            }
-            Ok(py_dict.into())
-        })
-    }
-
     /// Get a list of procs know to this system instance.
     /// world_filter contains a list of world names to filter on. Empty list means match all.
     /// label_filter contains list of actor labels to filter on. Empty list means match all.
diff --git a/monarch_hyperactor/Cargo.toml b/monarch_hyperactor/Cargo.toml
index 42a25e804..4b6087c9d 100644
--- a/monarch_hyperactor/Cargo.toml
+++ b/monarch_hyperactor/Cargo.toml
@@ -34,7 +34,6 @@ futures = { version = "0.3.31", features = ["async-await", "compat"] }
 hyperactor = { version = "0.0.0", path = "../hyperactor" }
 hyperactor_config = { version = "0.0.0", path = "../hyperactor_config" }
 hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
-hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
 hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
 inventory = "0.3.21"
 lazy_errors = "0.10.1"
diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs
index 6d7bb77e5..4d9fdce2d 100644
--- a/monarch_hyperactor/src/actor.rs
+++ b/monarch_hyperactor/src/actor.rs
@@ -182,21 +182,6 @@ impl PickledMessageClientActor {
         PyList::new(py, messages)
     }
 
-    fn world_status<'py>(&mut self, py: Python<'py>) -> PyResult<PyObject> {
-        let instance = Arc::clone(&self.instance);
-
-        let worlds = signal_safe_block_on(py, async move {
-            instance.lock().await.world_status(Default::default()).await
-        })??;
-        Python::with_gil(|py| {
-            let py_dict = PyDict::new(py);
-            for (world, status) in worlds {
-                py_dict.set_item(world.to_string(), status.to_string())?;
-            }
-            Ok(py_dict.into())
-        })
-    }
-
     #[getter]
     fn actor_id(&self) -> PyResult<PyActorId> {
         let instance = self.instance.blocking_lock();
diff --git a/monarch_hyperactor/src/proc.rs b/monarch_hyperactor/src/proc.rs
index d9a6f382d..cf5b2907d 100644
--- a/monarch_hyperactor/src/proc.rs
+++ b/monarch_hyperactor/src/proc.rs
@@ -43,15 +43,6 @@ use hyperactor::reference::ActorId;
 use hyperactor::reference::Index;
 use hyperactor::reference::ProcId;
 use hyperactor::reference::WorldId;
-use hyperactor_multiprocess::proc_actor::ProcActor;
-use hyperactor_multiprocess::supervision::ProcStatus;
-use hyperactor_multiprocess::supervision::ProcSupervisor;
-use hyperactor_multiprocess::supervision::WorldSupervisionMessageClient;
-use hyperactor_multiprocess::system_actor::ProcLifecycleMode;
-use hyperactor_multiprocess::system_actor::SYSTEM_ACTOR_REF;
-use hyperactor_multiprocess::system_actor::SystemMessageClient;
-use hyperactor_multiprocess::system_actor::SystemSnapshotFilter;
-use hyperactor_multiprocess::system_actor::WorldStatus;
 use monarch_types::PickledPyObject;
 use pyo3::exceptions::PyRuntimeError;
 use pyo3::exceptions::PyValueError;
@@ -179,65 +170,6 @@ impl PyProc {
     pub fn new_from_proc(proc: Proc) -> Self {
         Self { inner: proc }
     }
-
-    /// Bootstrap a proc into the system at the provided bootstrap address.
-    /// The proc will report to the system actor every
-    /// [`supervision_update_interval_in_sec`] seconds.
-    async fn bootstrap(
-        proc_id: &str,
-        bootstrap_addr: &str,
-        supervision_update_interval_in_sec: u64,
-        listen_addr: Option<String>,
-    ) -> Result<Self> {
-        let proc_id: ProcId = proc_id.parse()?;
-        let bootstrap_addr: ChannelAddr = bootstrap_addr.parse()?;
-        let listen_addr = if let Some(listen_addr) = listen_addr {
-            listen_addr.parse()?
-        } else {
-            ChannelAddr::any(bootstrap_addr.transport())
-        };
-        let chan = channel::dial(bootstrap_addr.clone())?;
-        let system_sender = BoxedMailboxSender::new(MailboxClient::new(chan));
-        let proc_forwarder =
-            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
-        let proc = Proc::new_with_clock(
-            proc_id.clone(),
-            proc_forwarder,
-            ClockKind::for_channel_addr(&bootstrap_addr),
-        );
-
-        let system_supervision_ref: ActorRef<ProcSupervisor> =
-            ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
-
-        let bootstrap = ProcActor::bootstrap_for_proc(
-            proc.clone().clone(),
-            proc.clone()
-                .proc_id()
-                .world_id()
-                .expect("proc must be ranked for world id")
-                .clone(), // REFACTOR(marius): factor out world id
-            listen_addr,
-            bootstrap_addr.clone(),
-            system_supervision_ref,
-            Duration::from_secs(supervision_update_interval_in_sec),
-            HashMap::new(),
-            ProcLifecycleMode::Detached,
-        )
-        .await
-        .inspect_err(|err| {
-            tracing::error!("could not spawn proc actor for {}: {}", proc.proc_id(), err,);
-        })?;
-
-        tokio::spawn(async move {
-            tracing::info!(
-                "proc actor for {} exited with status {}",
-                proc_id,
-                bootstrap.proc_actor.await
-            );
-        });
-
-        Ok(Self { inner: proc })
-    }
 }
 
 #[pyfunction]
@@ -251,19 +183,7 @@ pub fn init_proc(
     supervision_update_interval: u64,
     listen_addr: Option<String>,
 ) -> PyResult<PyProc> {
-    // TODO: support configuring supervision_update_interval in Python binding.
-    let proc_id = proc_id.to_owned();
-    let bootstrap_addr = bootstrap_addr.to_owned();
-    signal_safe_block_on(py, async move {
-        PyProc::bootstrap(
-            &proc_id,
-            &bootstrap_addr,
-            supervision_update_interval,
-            listen_addr,
-        )
-        .await
-        .map_err(|e| PyRuntimeError::new_err(e.to_string()))
-    })?
+    unimplemented!()
 }
 
 #[pyclass(
@@ -547,14 +467,6 @@ impl InstanceWrapper {
 
         if check_staleness > Duration::from_secs(5) {
             get_tokio_runtime().spawn(async move {
-                let _ = check_actor_supervision_state(
-                    checker_instance,
-                    clock,
-                    signal_port,
-                    controller_id.clone(),
-                    controller_error_sender,
-                )
-                .await;
                 let _ = checker_instance_handle.drain_and_stop();
             });
             self.last_controller_status_check = self.clock.system_time_now();
@@ -622,21 +534,6 @@ impl InstanceWrapper {
         Ok(messages)
     }
 
-    pub async fn world_status(
-        &self,
-        filter: SystemSnapshotFilter,
-    ) -> Result<HashMap<WorldId, WorldStatus>> {
-        let snapshot = SYSTEM_ACTOR_REF.snapshot(&self.instance, filter).await?;
-
-        // TODO: pulling snapshot is expensive as it contains all proc details
-        // We do not need those extra information.
-        Ok(snapshot
-            .worlds
-            .into_iter()
-            .map(|(k, v)| (k, v.status))
-            .collect())
-    }
-
     pub fn instance(&self) -> &Instance<()> {
         &self.instance
     }
@@ -646,50 +543,6 @@ impl InstanceWrapper {
     }
 }
 
-/// Check the supervision state of given actor from system actor. This will schedule itself to allow
-/// for periodic checks.
-async fn check_actor_supervision_state(
-    instance: Instance<()>,
-    clock: ClockKind,
-    signal_port: PortHandle<Signal>,
-    actor_id: ActorId,
-    controller_error_sender: watch::Sender<String>,
-) -> Result<()> {
-    match clock
-        .timeout(
-            // TODO: make the timeout configurable
-            tokio::time::Duration::from_secs(10),
-            SYSTEM_ACTOR_REF.state(&instance, WorldId(actor_id.world_name().into())),
-        )
-        .await
-    {
-        Ok(Ok(Some(world_state))) => {
-            // Check if the controller has failed supervision heartbeats
-            if let Some(proc_state) = world_state.procs.get(&actor_id.rank()) {
-                if !matches!(proc_state.proc_health, ProcStatus::Alive) {
-                    tracing::error!("controller {:?} is not alive, aborting!", actor_id);
-                    // The controller is down, this only affects the mesh for the controller, other meshes
-                    // should be unaffected, so we'll raise a worker error for this failure.
-                    controller_error_sender
-                        .send(format!("controller {:?} is not alive", actor_id))?;
-                }
-            }
-        }
-        Ok(_) => {
-            // The world isn't ready yet, we can safely ignore it
-        }
-        _ => {
-            // Timeout happened, system actor is down. As there is only one system actor for all meshes,
-            // client can't continue when system actor is down, so we stop the client here.
-            // TODO: should allow for multiple attempts
-            tracing::error!("system actor is not alive, aborting!");
-            // Send a signal to the client to abort.
-            signal_port.send(Signal::Stop).unwrap();
-        }
-    }
-    Ok(())
-}
-
 pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
     let f = wrap_pyfunction!(init_proc, hyperactor_mod)?;
     f.setattr(