Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hyperactor/src/accum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,22 @@ pub struct ReducerOpts {
/// The maximum interval between updates. When unspecified, a default
/// interval is used.
pub max_update_interval: Option<Duration>,
/// The initial interval for the first update. When unspecified, defaults to 1ms.
/// This allows quick flushing of single messages while using exponential backoff
/// to reach max_update_interval for batched messages.
pub initial_update_interval: Option<Duration>,
}

impl ReducerOpts {
pub(crate) fn max_update_interval(&self) -> Duration {
self.max_update_interval
.unwrap_or(hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_AGE))
}

pub(crate) fn initial_update_interval(&self) -> Duration {
self.initial_update_interval
.unwrap_or(Duration::from_millis(1))
}
}

/// Commutative reducer for an accumulator. This is used to coallesce updates.
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/channel/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl<S: Stream> Conn<S> {
fn reconnect_with_default() -> Self {
Self::Disconnected(Box::new(
ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(50))
.with_initial_interval(Duration::from_millis(1))
.with_multiplier(2.0)
.with_randomization_factor(0.1)
.with_max_interval(Duration::from_millis(1000))
Expand Down
25 changes: 18 additions & 7 deletions hyperactor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Mutex;
use std::sync::OnceLock;

use async_trait::async_trait;
use backoff::ExponentialBackoffBuilder;
use backoff::backoff::Backoff;
use dashmap::DashSet;
use hyperactor_config::attrs::Attrs;

Expand Down Expand Up @@ -191,6 +193,19 @@ impl<T: Actor + Send + Sync> MailboxExt for T {

// Default to global configuration if not specified.
let reducer_opts = reducer_opts.unwrap_or_default();
let max_interval = reducer_opts.max_update_interval();
let initial_interval = reducer_opts.initial_update_interval();

// Create exponential backoff for buffer flush interval, starting at
// initial_interval and growing to max_interval
let backoff = Mutex::new(
ExponentialBackoffBuilder::new()
.with_initial_interval(initial_interval)
.with_multiplier(2.0)
.with_max_interval(max_interval)
.with_max_elapsed_time(None)
.build(),
);

Box::new(move |update: Serialized| {
// Hold the lock until messages are sent. This is to avoid another
Expand All @@ -199,16 +214,12 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
//
// We also always acquire alarm *after* the buffer, to avoid deadlocks.
let mut buf = buffer.lock().unwrap();
let was_empty = buf.is_empty();
match buf.push(update) {
None if was_empty => {
alarm
.lock()
.unwrap()
.arm(reducer_opts.max_update_interval());
None => {
let interval = backoff.lock().unwrap().next_backoff().unwrap();
alarm.lock().unwrap().rearm(interval);
Ok(())
}
None => Ok(()),
Some(Ok(reduced)) => {
alarm.lock().unwrap().disarm();
post(&mailbox, port_id.clone(), reduced, return_undeliverable);
Expand Down
14 changes: 13 additions & 1 deletion hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,17 @@ mod tests {
let port_id = port_handle.bind().port_id().clone();
// Split it
let reducer_spec = accum::sum::<u64>().reducer_spec();
let split_port_id = port_id.split(&actor, reducer_spec, None, true).unwrap();
let split_port_id = port_id
.split(
&actor,
reducer_spec,
Some(ReducerOpts {
max_update_interval: Some(Duration::from_mins(10)),
initial_update_interval: Some(Duration::from_mins(10)),
}),
true,
)
.unwrap();

// Send 9 messages.
for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
Expand Down Expand Up @@ -3535,6 +3545,7 @@ mod tests {
Some(accum::sum::<u64>().reducer_spec().unwrap()),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: Some(Duration::from_millis(50)),
}),
)
.await;
Expand Down Expand Up @@ -3578,6 +3589,7 @@ mod tests {
Some(accum::sum::<u64>().reducer_spec().unwrap()),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: Some(Duration::from_millis(50)),
}),
)
.await;
Expand Down
29 changes: 26 additions & 3 deletions hyperactor/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ pub struct Alarm {
}
enum AlarmStatus {
Unarmed,
Armed { version: usize, deadline: Instant },
Armed {
version: usize,
deadline: Instant,
armed_at: Instant,
},
Dropped,
}

Expand All @@ -52,15 +56,31 @@ impl Alarm {
/// Arm the alarm to fire after the provided duration.
pub fn arm(&mut self, duration: Duration) {
let mut status = self.status.lock().unwrap();
let armed_at = RealClock.now();
*status = AlarmStatus::Armed {
version: self.version,
deadline: RealClock.now() + duration,
deadline: armed_at + duration,
armed_at,
};
drop(status);
self.notify.notify_waiters();
self.version += 1;
}

/// Arm the alarm to fire `duration` after the alarm was originally armed.
/// This behaves as if arm was called originally with the new duration.
/// If it has not been armed it behaves the same as arm.
pub fn rearm(&mut self, duration: Duration) {
let remaining = match *self.status.lock().unwrap() {
AlarmStatus::Armed { armed_at, .. } => {
let elapsed = RealClock.now() - armed_at;
duration.saturating_sub(elapsed)
}
AlarmStatus::Unarmed | AlarmStatus::Dropped => duration,
};
self.arm(remaining);
}

/// Disarm the alarm, canceling any pending alarms.
pub fn disarm(&mut self) {
let mut status = self.status.lock().unwrap();
Expand Down Expand Up @@ -123,13 +143,16 @@ impl AlarmSleeper {
AlarmStatus::Dropped => return false,
AlarmStatus::Unarmed => None,
AlarmStatus::Armed { version, .. } if version < self.min_version => None,
AlarmStatus::Armed { version, deadline } if RealClock.now() >= deadline => {
AlarmStatus::Armed {
version, deadline, ..
} if RealClock.now() >= deadline => {
self.min_version = version + 1;
return true;
}
AlarmStatus::Armed {
version: _,
deadline,
..
} => Some(deadline),
};

Expand Down
2 changes: 2 additions & 0 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ impl HostMeshRef {
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: None,
}),
);

Expand Down Expand Up @@ -988,6 +989,7 @@ impl HostMeshRef {
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: None,
}),
);
for proc_id in procs.into_iter() {
Expand Down
2 changes: 2 additions & 0 deletions hyperactor_mesh/src/v1/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ impl ProcMeshRef {
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: None,
}),
);

Expand Down Expand Up @@ -1114,6 +1115,7 @@ impl ProcMeshRef {
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
initial_update_interval: None,
}),
);
agent_mesh.cast(
Expand Down
10 changes: 2 additions & 8 deletions python/monarch/_src/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@
Monarch Actor API
"""

from monarch._src.actor.ipython_check import is_ipython
from monarch.config import configure

# Detect if we're running in IPython/Jupyter
_in_ipython = False
try:
# pyre-ignore[21]
from IPython import get_ipython

_in_ipython = get_ipython() is not None
except ImportError:
pass
_in_ipython: bool = is_ipython()

# Set notebook-friendly defaults for stdio piping when spawning procs.
# These config is read by:
Expand Down
26 changes: 26 additions & 0 deletions python/monarch/_src/actor/ipython_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""
Fast IPython detection by checking sys.modules instead of importing.
"""

import sys


def is_ipython() -> bool:
"""
Check if code is running in an IPython/Jupyter environment.
Avoids slow IPython import by checking sys.modules first.
"""
if "IPython" not in sys.modules:
return False

# pyre-ignore[16]: get_ipython exists in IPython module
get_ipython = sys.modules["IPython"].get_ipython
return get_ipython() is not None
21 changes: 8 additions & 13 deletions python/monarch/_src/actor/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,9 @@
)
from monarch._src.actor.actor_mesh import context
from monarch._src.actor.future import Future
from monarch._src.actor.ipython_check import is_ipython

IN_IPYTHON = False
try:
# Check if we are in ipython environment
# pyre-ignore[21]
from IPython import get_ipython

# pyre-ignore[21]
from IPython.core.interactiveshell import ExecutionResult # noqa: F401

IN_IPYTHON = get_ipython() is not None
except ImportError:
pass
IN_IPYTHON: bool = is_ipython()

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -88,7 +78,12 @@ def register_flusher_if_in_ipython(self) -> None:
with _global_flush_lock:
global _global_flush_registered
if not _global_flush_registered:
get_ipython().events.register(
# pyre-ignore[21]: IPython is already loaded if IN_IPYTHON is True
from IPython import get_ipython

ipython = get_ipython()
assert ipython is not None
ipython.events.register(
"post_run_cell",
lambda _: flush_all_proc_mesh_logs(
self._logging_mesh_client is not None
Expand Down
16 changes: 1 addition & 15 deletions python/monarch/controller/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,8 @@

# pyre-strict
import sys
from typing import Optional

_is_ipython: Optional[bool] = None


def is_ipython() -> bool:
global _is_ipython
if _is_ipython is not None:
return _is_ipython
try:
from IPython import get_ipython

_is_ipython = get_ipython() is not None
except ImportError:
_is_ipython = False
return _is_ipython
from monarch._src.actor.ipython_check import is_ipython


def write(msg: str) -> None:
Expand Down
4 changes: 2 additions & 2 deletions python/tests/_monarch/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ def test_init_initializes_logging_mesh_client_to_none(self) -> None:
# Assert: confirm that _logging_mesh_client is initialized to None
self.assertIsNone(manager._logging_mesh_client)

@pytest.mark.oss_skip # type: ignore: monarch._src.actor.logging.get_ipython doesn't exist in OSS CI
@pytest.mark.oss_skip # type: ignore: IPython.get_ipython doesn't exist in OSS CI
@patch("monarch._src.actor.logging.IN_IPYTHON", True)
@patch("monarch._src.actor.logging.get_ipython")
@patch("IPython.get_ipython")
@patch("monarch._src.actor.logging._global_flush_registered", False)
def test_register_flusher_if_in_ipython_registers_event(
self, mock_get_ipython: Mock
Expand Down
4 changes: 2 additions & 2 deletions python/tests/test_python_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ async def test_flush_called_only_once() -> None:
):
mock_ipython = MockIPython()
with unittest.mock.patch(
"monarch._src.actor.logging.get_ipython",
"IPython.get_ipython",
lambda: mock_ipython,
), unittest.mock.patch(
"monarch._src.actor.logging.IN_IPYTHON", True
Expand Down Expand Up @@ -880,7 +880,7 @@ async def test_flush_logs_ipython() -> None:
mock_ipython = MockIPython()

with unittest.mock.patch(
"monarch._src.actor.logging.get_ipython",
"IPython.get_ipython",
lambda: mock_ipython,
), unittest.mock.patch("monarch._src.actor.logging.IN_IPYTHON", True):
# Make sure we can register and unregister callbacks
Expand Down