Skip to content

Commit 00db699

Browse files
: remove dependence on hyperactor_multiprocess
Differential Revision: D88401829
1 parent fd2af8f commit 00db699

File tree

3 files changed

+1
-164
lines changed

3 files changed

+1
-164
lines changed

monarch_hyperactor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ futures = { version = "0.3.31", features = ["async-await", "compat"] }
3434
hyperactor = { version = "0.0.0", path = "../hyperactor" }
3535
hyperactor_config = { version = "0.0.0", path = "../hyperactor_config" }
3636
hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
37-
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
3837
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
3938
inventory = "0.3.21"
4039
lazy_errors = "0.10.1"

monarch_hyperactor/src/actor.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -176,21 +176,6 @@ impl PickledMessageClientActor {
176176
PyList::new(py, messages)
177177
}
178178

179-
fn world_status<'py>(&mut self, py: Python<'py>) -> PyResult<PyObject> {
180-
let instance = Arc::clone(&self.instance);
181-
182-
let worlds = signal_safe_block_on(py, async move {
183-
instance.lock().await.world_status(Default::default()).await
184-
})??;
185-
Python::with_gil(|py| {
186-
let py_dict = PyDict::new(py);
187-
for (world, status) in worlds {
188-
py_dict.set_item(world.to_string(), status.to_string())?;
189-
}
190-
Ok(py_dict.into())
191-
})
192-
}
193-
194179
#[getter]
195180
fn actor_id(&self) -> PyResult<PyActorId> {
196181
let instance = self.instance.blocking_lock();

monarch_hyperactor/src/proc.rs

Lines changed: 1 addition & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,6 @@ use hyperactor::reference::ActorId;
4444
use hyperactor::reference::Index;
4545
use hyperactor::reference::ProcId;
4646
use hyperactor::reference::WorldId;
47-
use hyperactor_multiprocess::proc_actor::ProcActor;
48-
use hyperactor_multiprocess::supervision::ProcStatus;
49-
use hyperactor_multiprocess::supervision::ProcSupervisor;
50-
use hyperactor_multiprocess::supervision::WorldSupervisionMessageClient;
51-
use hyperactor_multiprocess::system_actor::ProcLifecycleMode;
52-
use hyperactor_multiprocess::system_actor::SYSTEM_ACTOR_REF;
53-
use hyperactor_multiprocess::system_actor::SystemMessageClient;
54-
use hyperactor_multiprocess::system_actor::SystemSnapshotFilter;
55-
use hyperactor_multiprocess::system_actor::WorldStatus;
5647
use monarch_types::PickledPyObject;
5748
use pyo3::exceptions::PyRuntimeError;
5849
use pyo3::exceptions::PyValueError;
@@ -180,65 +171,6 @@ impl PyProc {
180171
pub fn new_from_proc(proc: Proc) -> Self {
181172
Self { inner: proc }
182173
}
183-
184-
/// Bootstrap a proc into the system at the provided bootstrap address.
185-
/// The proc will report to the system actor every
186-
/// [`supervision_update_interval_in_sec`] seconds.
187-
async fn bootstrap(
188-
proc_id: &str,
189-
bootstrap_addr: &str,
190-
supervision_update_interval_in_sec: u64,
191-
listen_addr: Option<String>,
192-
) -> Result<Self> {
193-
let proc_id: ProcId = proc_id.parse()?;
194-
let bootstrap_addr: ChannelAddr = bootstrap_addr.parse()?;
195-
let listen_addr = if let Some(listen_addr) = listen_addr {
196-
listen_addr.parse()?
197-
} else {
198-
ChannelAddr::any(bootstrap_addr.transport())
199-
};
200-
let chan = channel::dial(bootstrap_addr.clone())?;
201-
let system_sender = BoxedMailboxSender::new(MailboxClient::new(chan));
202-
let proc_forwarder =
203-
BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
204-
let proc = Proc::new_with_clock(
205-
proc_id.clone(),
206-
proc_forwarder,
207-
ClockKind::for_channel_addr(&bootstrap_addr),
208-
);
209-
210-
let system_supervision_ref: ActorRef<ProcSupervisor> =
211-
ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
212-
213-
let bootstrap = ProcActor::bootstrap_for_proc(
214-
proc.clone().clone(),
215-
proc.clone()
216-
.proc_id()
217-
.world_id()
218-
.expect("proc must be ranked for world id")
219-
.clone(), // REFACTOR(marius): factor out world id
220-
listen_addr,
221-
bootstrap_addr.clone(),
222-
system_supervision_ref,
223-
Duration::from_secs(supervision_update_interval_in_sec),
224-
HashMap::new(),
225-
ProcLifecycleMode::Detached,
226-
)
227-
.await
228-
.inspect_err(|err| {
229-
tracing::error!("could not spawn proc actor for {}: {}", proc.proc_id(), err,);
230-
})?;
231-
232-
tokio::spawn(async move {
233-
tracing::info!(
234-
"proc actor for {} exited with status {}",
235-
proc_id,
236-
bootstrap.proc_actor.await
237-
);
238-
});
239-
240-
Ok(Self { inner: proc })
241-
}
242174
}
243175

244176
#[pyfunction]
@@ -252,19 +184,7 @@ pub fn init_proc(
252184
supervision_update_interval: u64,
253185
listen_addr: Option<String>,
254186
) -> PyResult<PyProc> {
255-
// TODO: support configuring supervision_update_interval in Python binding.
256-
let proc_id = proc_id.to_owned();
257-
let bootstrap_addr = bootstrap_addr.to_owned();
258-
signal_safe_block_on(py, async move {
259-
PyProc::bootstrap(
260-
&proc_id,
261-
&bootstrap_addr,
262-
supervision_update_interval,
263-
listen_addr,
264-
)
265-
.await
266-
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
267-
})?
187+
unimplemented!()
268188
}
269189

270190
#[pyclass(
@@ -548,14 +468,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
548468

549469
if check_staleness > Duration::from_secs(5) {
550470
get_tokio_runtime().spawn(async move {
551-
let _ = check_actor_supervision_state(
552-
checker_instance,
553-
clock,
554-
signal_port,
555-
controller_id.clone(),
556-
controller_error_sender,
557-
)
558-
.await;
559471
let _ = checker_instance_handle.drain_and_stop();
560472
});
561473
self.last_controller_status_check = self.clock.system_time_now();
@@ -623,21 +535,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
623535
Ok(messages)
624536
}
625537

626-
pub async fn world_status(
627-
&self,
628-
filter: SystemSnapshotFilter,
629-
) -> Result<HashMap<WorldId, WorldStatus>> {
630-
let snapshot = SYSTEM_ACTOR_REF.snapshot(&self.instance, filter).await?;
631-
632-
// TODO: pulling snapshot is expensive as it contains all proc details
633-
// We do not need those extra information.
634-
Ok(snapshot
635-
.worlds
636-
.into_iter()
637-
.map(|(k, v)| (k, v.status))
638-
.collect())
639-
}
640-
641538
pub fn instance(&self) -> &Instance<()> {
642539
&self.instance
643540
}
@@ -647,50 +544,6 @@ impl<M: RemoteMessage> InstanceWrapper<M> {
647544
}
648545
}
649546

650-
/// Check the supervision state of given actor from system actor. This will schedule itself to allow
651-
/// for periodic checks.
652-
async fn check_actor_supervision_state(
653-
instance: Instance<()>,
654-
clock: ClockKind,
655-
signal_port: PortHandle<Signal>,
656-
actor_id: ActorId,
657-
controller_error_sender: watch::Sender<String>,
658-
) -> Result<()> {
659-
match clock
660-
.timeout(
661-
// TODO: make the timeout configurable
662-
tokio::time::Duration::from_secs(10),
663-
SYSTEM_ACTOR_REF.state(&instance, WorldId(actor_id.world_name().into())),
664-
)
665-
.await
666-
{
667-
Ok(Ok(Some(world_state))) => {
668-
// Check if the controller has failed supervision heartbeats
669-
if let Some(proc_state) = world_state.procs.get(&actor_id.rank()) {
670-
if !matches!(proc_state.proc_health, ProcStatus::Alive) {
671-
tracing::error!("controller {:?} is not alive, aborting!", actor_id);
672-
// The controller is down, this only affects the mesh for the controller, other meshes
673-
// should be unaffected, so we'll raise a worker error for this failure.
674-
controller_error_sender
675-
.send(format!("controller {:?} is not alive", actor_id))?;
676-
}
677-
}
678-
}
679-
Ok(_) => {
680-
// The world isn't ready yet, we can safely ignore it
681-
}
682-
_ => {
683-
// Timeout happened, system actor is down. As there is only one system actor for all meshes,
684-
// client can't continue when system actor is down, so we stop the client here.
685-
// TODO: should allow for multiple attempts
686-
tracing::error!("system actor is not alive, aborting!");
687-
// Send a signal to the client to abort.
688-
signal_port.send(Signal::Stop).unwrap();
689-
}
690-
}
691-
Ok(())
692-
}
693-
694547
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
695548
let f = wrap_pyfunction!(init_proc, hyperactor_mod)?;
696549
f.setattr(

0 commit comments

Comments
 (0)