27 changes: 0 additions & 27 deletions monarch_extension/src/client.rs
@@ -756,33 +756,6 @@ impl ClientActor {
PyList::new(py, messages)
}

/// Get the status of all the worlds from the system.
#[pyo3(signature = (filter = None))]
fn world_status<'py>(
&mut self,
py: Python<'py>,
filter: Option<&PySystemSnapshotFilter>,
) -> PyResult<PyObject> {
let instance = self.instance.clone();
let filter = filter.cloned();
let worlds = signal_safe_block_on(py, async move {
instance
.lock()
.await
.world_status(
filter.map_or(SystemSnapshotFilter::all(), SystemSnapshotFilter::from),
)
.await
})??;
Python::with_gil(|py| {
let py_dict = PyDict::new(py);
for (world, status) in worlds {
py_dict.set_item(world.to_string(), status.to_string())?;
}
Ok(py_dict.into())
})
}

/// Get a list of procs known to this system instance.
/// world_filter contains a list of world names to filter on. An empty list matches all.
/// label_filter contains a list of actor labels to filter on. An empty list matches all.
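Note: the binding removed above was a thin wrapper over `InstanceWrapper::world_status`, itself deleted from monarch_hyperactor/src/proc.rs further down in this diff. A minimal sketch of the underlying call, assuming the imports the deleted code used (`SYSTEM_ACTOR_REF`, `SystemSnapshotFilter`, `WorldId`, `WorldStatus` from hyperactor_multiprocess, plus `HashMap`); `world_status_direct` is a hypothetical name, not part of the codebase:

// Sketch: fetch per-world status straight from the system actor's snapshot.
async fn world_status_direct(
    instance: &Instance<()>,
    filter: SystemSnapshotFilter,
) -> anyhow::Result<HashMap<WorldId, WorldStatus>> {
    // The snapshot carries full proc details for every world; keep only the
    // per-world status, as the deleted code did.
    let snapshot = SYSTEM_ACTOR_REF.snapshot(instance, filter).await?;
    Ok(snapshot
        .worlds
        .into_iter()
        .map(|(world_id, world)| (world_id, world.status))
        .collect())
}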
1 change: 0 additions & 1 deletion monarch_hyperactor/Cargo.toml
@@ -34,7 +34,6 @@ futures = { version = "0.3.31", features = ["async-await", "compat"] }
hyperactor = { version = "0.0.0", path = "../hyperactor" }
hyperactor_config = { version = "0.0.0", path = "../hyperactor_config" }
hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
inventory = "0.3.21"
lazy_errors = "0.10.1"
15 changes: 0 additions & 15 deletions monarch_hyperactor/src/actor.rs
@@ -182,21 +182,6 @@ impl PickledMessageClientActor {
PyList::new(py, messages)
}

fn world_status<'py>(&mut self, py: Python<'py>) -> PyResult<PyObject> {
let instance = Arc::clone(&self.instance);

let worlds = signal_safe_block_on(py, async move {
instance.lock().await.world_status(Default::default()).await
})??;
Python::with_gil(|py| {
let py_dict = PyDict::new(py);
for (world, status) in worlds {
py_dict.set_item(world.to_string(), status.to_string())?;
}
Ok(py_dict.into())
})
}

#[getter]
fn actor_id(&self) -> PyResult<PyActorId> {
let instance = self.instance.blocking_lock();
149 changes: 1 addition & 148 deletions monarch_hyperactor/src/proc.rs
@@ -43,15 +43,6 @@ use hyperactor::reference::ActorId;
use hyperactor::reference::Index;
use hyperactor::reference::ProcId;
use hyperactor::reference::WorldId;
use hyperactor_multiprocess::proc_actor::ProcActor;
use hyperactor_multiprocess::supervision::ProcStatus;
use hyperactor_multiprocess::supervision::ProcSupervisor;
use hyperactor_multiprocess::supervision::WorldSupervisionMessageClient;
use hyperactor_multiprocess::system_actor::ProcLifecycleMode;
use hyperactor_multiprocess::system_actor::SYSTEM_ACTOR_REF;
use hyperactor_multiprocess::system_actor::SystemMessageClient;
use hyperactor_multiprocess::system_actor::SystemSnapshotFilter;
use hyperactor_multiprocess::system_actor::WorldStatus;
use monarch_types::PickledPyObject;
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::PyValueError;
@@ -179,65 +170,6 @@ impl PyProc {
pub fn new_from_proc(proc: Proc) -> Self {
Self { inner: proc }
}

/// Bootstrap a proc into the system at the provided bootstrap address.
/// The proc will report to the system actor every
/// [`supervision_update_interval_in_sec`] seconds.
async fn bootstrap(
proc_id: &str,
bootstrap_addr: &str,
supervision_update_interval_in_sec: u64,
listen_addr: Option<String>,
) -> Result<Self> {
let proc_id: ProcId = proc_id.parse()?;
let bootstrap_addr: ChannelAddr = bootstrap_addr.parse()?;
let listen_addr = if let Some(listen_addr) = listen_addr {
listen_addr.parse()?
} else {
ChannelAddr::any(bootstrap_addr.transport())
};
let chan = channel::dial(bootstrap_addr.clone())?;
let system_sender = BoxedMailboxSender::new(MailboxClient::new(chan));
let proc_forwarder =
BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
let proc = Proc::new_with_clock(
proc_id.clone(),
proc_forwarder,
ClockKind::for_channel_addr(&bootstrap_addr),
);

let system_supervision_ref: ActorRef<ProcSupervisor> =
ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());

let bootstrap = ProcActor::bootstrap_for_proc(
proc.clone().clone(),
proc.clone()
.proc_id()
.world_id()
.expect("proc must be ranked for world id")
.clone(), // REFACTOR(marius): factor out world id
listen_addr,
bootstrap_addr.clone(),
system_supervision_ref,
Duration::from_secs(supervision_update_interval_in_sec),
HashMap::new(),
ProcLifecycleMode::Detached,
)
.await
.inspect_err(|err| {
tracing::error!("could not spawn proc actor for {}: {}", proc.proc_id(), err,);
})?;

tokio::spawn(async move {
tracing::info!(
"proc actor for {} exited with status {}",
proc_id,
bootstrap.proc_actor.await
);
});

Ok(Self { inner: proc })
}
}

#[pyfunction]
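Note: the deleted `bootstrap` above was driven from Python through `init_proc` (next hunk). As a plain-Rust usage sketch — the proc-id and channel-address string formats below are assumptions about hyperactor's parsers, and all values are placeholders:

// Sketch: bootstrap a proc that heartbeats the system actor every 5 seconds.
let proc = PyProc::bootstrap(
    "worker[0]",            // proc id; parsed into a ranked ProcId (format assumed)
    "tcp!127.0.0.1:29500",  // bootstrap address of the system actor (format assumed)
    5,                      // supervision_update_interval_in_sec
    None,                   // listen_addr; defaults to ChannelAddr::any on the same transport
)
.await?;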
@@ -251,19 +183,7 @@ pub fn init_proc(
supervision_update_interval: u64,
listen_addr: Option<String>,
) -> PyResult<PyProc> {
// TODO: support configuring supervision_update_interval in Python binding.
let proc_id = proc_id.to_owned();
let bootstrap_addr = bootstrap_addr.to_owned();
signal_safe_block_on(py, async move {
PyProc::bootstrap(
&proc_id,
&bootstrap_addr,
supervision_update_interval,
listen_addr,
)
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})?
unimplemented!()
}

#[pyclass(
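Note: per the file summary, `unimplemented!()` is this hunk's single addition; the entire previous body of `init_proc` is deleted. `unimplemented!()` panics, and pyo3 surfaces that panic to Python as a `PanicException` rather than a catchable error. A hedged alternative (not what this PR does) would keep the `PyResult` contract, e.g.:

// Alternative sketch: fail with a catchable Python exception instead of a panic.
// PyRuntimeError is already imported in this file.
Err(PyRuntimeError::new_err(
    "init_proc: system-based proc bootstrap has been removed",
))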
Expand Down Expand Up @@ -547,14 +467,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {

if check_staleness > Duration::from_secs(5) {
get_tokio_runtime().spawn(async move {
let _ = check_actor_supervision_state(
checker_instance,
clock,
signal_port,
controller_id.clone(),
controller_error_sender,
)
.await;
let _ = checker_instance_handle.drain_and_stop();
});
self.last_controller_status_check = self.clock.system_time_now();
Expand Down Expand Up @@ -622,21 +534,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
Ok(messages)
}

pub async fn world_status(
&self,
filter: SystemSnapshotFilter,
) -> Result<HashMap<WorldId, WorldStatus>> {
let snapshot = SYSTEM_ACTOR_REF.snapshot(&self.instance, filter).await?;

// TODO: pulling a snapshot is expensive, as it contains all proc details.
// We do not need that extra information here.
Ok(snapshot
.worlds
.into_iter()
.map(|(k, v)| (k, v.status))
.collect())
}

pub fn instance(&self) -> &Instance<()> {
&self.instance
}
@@ -646,50 +543,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
}
}

/// Check the supervision state of a given actor from the system actor. This will schedule
/// itself to allow for periodic checks.
async fn check_actor_supervision_state(
instance: Instance<()>,
clock: ClockKind,
signal_port: PortHandle<Signal>,
actor_id: ActorId,
controller_error_sender: watch::Sender<String>,
) -> Result<()> {
match clock
.timeout(
// TODO: make the timeout configurable
tokio::time::Duration::from_secs(10),
SYSTEM_ACTOR_REF.state(&instance, WorldId(actor_id.world_name().into())),
)
.await
{
Ok(Ok(Some(world_state))) => {
// Check if the controller has failed supervision heartbeats
if let Some(proc_state) = world_state.procs.get(&actor_id.rank()) {
if !matches!(proc_state.proc_health, ProcStatus::Alive) {
tracing::error!("controller {:?} is not alive, aborting!", actor_id);
// The controller is down. This only affects the mesh for this controller; other meshes
// should be unaffected, so we raise a worker error for this failure.
controller_error_sender
.send(format!("controller {:?} is not alive", actor_id))?;
}
}
}
Ok(_) => {
// The world isn't ready yet, we can safely ignore it
}
_ => {
// Timeout happened, so the system actor is down. As there is only one system actor for all
// meshes, the client can't continue when it is down, so we stop the client here.
// TODO: should allow for multiple attempts
tracing::error!("system actor is not alive, aborting!");
// Send a signal to the client to abort.
signal_port.send(Signal::Stop).unwrap();
}
}
Ok(())
}

pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
let f = wrap_pyfunction!(init_proc, hyperactor_mod)?;
f.setattr(
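Note: the deleted `check_actor_supervision_state` wraps its status query in a timeout and distinguishes three outcomes: a definitive answer (escalate only if the proc is not `Alive`), a missing or not-yet-ready world (ignore), and a timeout (treat the system actor itself as dead and stop the client). Its skeleton as a standalone sketch, with a hypothetical `query` future standing in for the `SYSTEM_ACTOR_REF.state(...)` call:

// Skeleton of the removed check; `check_once` and the bool payload are hypothetical.
async fn check_once(
    query: impl std::future::Future<Output = anyhow::Result<Option<bool>>>, // Some(alive)
) -> Result<(), String> {
    match tokio::time::timeout(std::time::Duration::from_secs(10), query).await {
        // Definitive answer: escalate only when the controller is not alive.
        Ok(Ok(Some(alive))) if !alive => Err("controller is not alive".into()),
        // Healthy, or the world is not ready yet: nothing to do.
        Ok(_) => Ok(()),
        // Timeout: the system actor itself is unresponsive; the caller must stop.
        Err(_) => Err("system actor is not responding".into()),
    }
}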