Skip to content

Commit 95cbbe8

Browse files
zdevitometa-codesync[bot]
authored andcommitted
Startup 3.5x faster (#2099)
Summary: Pull Request resolved: #2099 Fixing various simple things that claude helped find. 1.7 seconds --> .5 seconds. * Don't start the exponential backoff for initial connections at 50ms. We hit these like 20 times in a row and it adds up to 1/2 second! * We need to rethink the batched aggregation interval. We probably need a simple predictor that assumes the average time between messages in `inf` for the first message, and just sends it, that way we do not add another 50 ms on every process start. * Do not import ipython to check if ipython is active. If it isn't active this adds 1/2 second to startup time because it happens in three processes. ghstack-source-id: 328614676 exported-using-ghexport Reviewed By: mariusae Differential Revision: D88810033 fbshipit-source-id: 0b23387f4f551e706ab1bb86f0b2e88572c44d87
1 parent 0c7e815 commit 95cbbe8

File tree

13 files changed

+112
-52
lines changed

13 files changed

+112
-52
lines changed

hyperactor/src/accum.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,22 @@ pub struct ReducerOpts {
5353
/// The maximum interval between updates. When unspecified, a default
5454
/// interval is used.
5555
pub max_update_interval: Option<Duration>,
56+
/// The initial interval for the first update. When unspecified, defaults to 1ms.
57+
/// This allows quick flushing of single messages while using exponential backoff
58+
/// to reach max_update_interval for batched messages.
59+
pub initial_update_interval: Option<Duration>,
5660
}
5761

5862
impl ReducerOpts {
5963
pub(crate) fn max_update_interval(&self) -> Duration {
6064
self.max_update_interval
6165
.unwrap_or(hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_AGE))
6266
}
67+
68+
pub(crate) fn initial_update_interval(&self) -> Duration {
69+
self.initial_update_interval
70+
.unwrap_or(Duration::from_millis(1))
71+
}
6372
}
6473

6574
/// Commutative reducer for an accumulator. This is used to coallesce updates.

hyperactor/src/channel/net/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ impl<S: Stream> Conn<S> {
542542
fn reconnect_with_default() -> Self {
543543
Self::Disconnected(Box::new(
544544
ExponentialBackoffBuilder::new()
545-
.with_initial_interval(Duration::from_millis(50))
545+
.with_initial_interval(Duration::from_millis(1))
546546
.with_multiplier(2.0)
547547
.with_randomization_factor(0.1)
548548
.with_max_interval(Duration::from_millis(1000))

hyperactor/src/context.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use std::sync::Mutex;
2020
use std::sync::OnceLock;
2121

2222
use async_trait::async_trait;
23+
use backoff::ExponentialBackoffBuilder;
24+
use backoff::backoff::Backoff;
2325
use dashmap::DashSet;
2426
use hyperactor_config::attrs::Attrs;
2527

@@ -191,6 +193,19 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
191193

192194
// Default to global configuration if not specified.
193195
let reducer_opts = reducer_opts.unwrap_or_default();
196+
let max_interval = reducer_opts.max_update_interval();
197+
let initial_interval = reducer_opts.initial_update_interval();
198+
199+
// Create exponential backoff for buffer flush interval, starting at
200+
// initial_interval and growing to max_interval
201+
let backoff = Mutex::new(
202+
ExponentialBackoffBuilder::new()
203+
.with_initial_interval(initial_interval)
204+
.with_multiplier(2.0)
205+
.with_max_interval(max_interval)
206+
.with_max_elapsed_time(None)
207+
.build(),
208+
);
194209

195210
Box::new(move |update: Serialized| {
196211
// Hold the lock until messages are sent. This is to avoid another
@@ -199,16 +214,12 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
199214
//
200215
// We also always acquire alarm *after* the buffer, to avoid deadlocks.
201216
let mut buf = buffer.lock().unwrap();
202-
let was_empty = buf.is_empty();
203217
match buf.push(update) {
204-
None if was_empty => {
205-
alarm
206-
.lock()
207-
.unwrap()
208-
.arm(reducer_opts.max_update_interval());
218+
None => {
219+
let interval = backoff.lock().unwrap().next_backoff().unwrap();
220+
alarm.lock().unwrap().rearm(interval);
209221
Ok(())
210222
}
211-
None => Ok(()),
212223
Some(Ok(reduced)) => {
213224
alarm.lock().unwrap().disarm();
214225
post(&mailbox, port_id.clone(), reduced, return_undeliverable);

hyperactor/src/mailbox.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3497,7 +3497,17 @@ mod tests {
34973497
let port_id = port_handle.bind().port_id().clone();
34983498
// Split it
34993499
let reducer_spec = accum::sum::<u64>().reducer_spec();
3500-
let split_port_id = port_id.split(&actor, reducer_spec, None, true).unwrap();
3500+
let split_port_id = port_id
3501+
.split(
3502+
&actor,
3503+
reducer_spec,
3504+
Some(ReducerOpts {
3505+
max_update_interval: Some(Duration::from_mins(10)),
3506+
initial_update_interval: Some(Duration::from_mins(10)),
3507+
}),
3508+
true,
3509+
)
3510+
.unwrap();
35013511

35023512
// Send 9 messages.
35033513
for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
@@ -3535,6 +3545,7 @@ mod tests {
35353545
Some(accum::sum::<u64>().reducer_spec().unwrap()),
35363546
Some(ReducerOpts {
35373547
max_update_interval: Some(Duration::from_millis(50)),
3548+
initial_update_interval: Some(Duration::from_millis(50)),
35383549
}),
35393550
)
35403551
.await;
@@ -3578,6 +3589,7 @@ mod tests {
35783589
Some(accum::sum::<u64>().reducer_spec().unwrap()),
35793590
Some(ReducerOpts {
35803591
max_update_interval: Some(Duration::from_millis(50)),
3592+
initial_update_interval: Some(Duration::from_millis(50)),
35813593
}),
35823594
)
35833595
.await;

hyperactor/src/time.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ pub struct Alarm {
3535
}
3636
enum AlarmStatus {
3737
Unarmed,
38-
Armed { version: usize, deadline: Instant },
38+
Armed {
39+
version: usize,
40+
deadline: Instant,
41+
armed_at: Instant,
42+
},
3943
Dropped,
4044
}
4145

@@ -52,15 +56,31 @@ impl Alarm {
5256
/// Arm the alarm to fire after the provided duration.
5357
pub fn arm(&mut self, duration: Duration) {
5458
let mut status = self.status.lock().unwrap();
59+
let armed_at = RealClock.now();
5560
*status = AlarmStatus::Armed {
5661
version: self.version,
57-
deadline: RealClock.now() + duration,
62+
deadline: armed_at + duration,
63+
armed_at,
5864
};
5965
drop(status);
6066
self.notify.notify_waiters();
6167
self.version += 1;
6268
}
6369

70+
/// Arm the alarm to fire `duration` after the alarm was originally armed.
71+
/// This behaves as if arm was called originally with the new duration.
72+
/// If it has not been armed it behaves the same as arm.
73+
pub fn rearm(&mut self, duration: Duration) {
74+
let remaining = match *self.status.lock().unwrap() {
75+
AlarmStatus::Armed { armed_at, .. } => {
76+
let elapsed = RealClock.now() - armed_at;
77+
duration.saturating_sub(elapsed)
78+
}
79+
AlarmStatus::Unarmed | AlarmStatus::Dropped => duration,
80+
};
81+
self.arm(remaining);
82+
}
83+
6484
/// Disarm the alarm, canceling any pending alarms.
6585
pub fn disarm(&mut self) {
6686
let mut status = self.status.lock().unwrap();
@@ -123,13 +143,16 @@ impl AlarmSleeper {
123143
AlarmStatus::Dropped => return false,
124144
AlarmStatus::Unarmed => None,
125145
AlarmStatus::Armed { version, .. } if version < self.min_version => None,
126-
AlarmStatus::Armed { version, deadline } if RealClock.now() >= deadline => {
146+
AlarmStatus::Armed {
147+
version, deadline, ..
148+
} if RealClock.now() >= deadline => {
127149
self.min_version = version + 1;
128150
return true;
129151
}
130152
AlarmStatus::Armed {
131153
version: _,
132154
deadline,
155+
..
133156
} => Some(deadline),
134157
};
135158

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,7 @@ impl HostMeshRef {
804804
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
805805
Some(ReducerOpts {
806806
max_update_interval: Some(Duration::from_millis(50)),
807+
initial_update_interval: None,
807808
}),
808809
);
809810

@@ -988,6 +989,7 @@ impl HostMeshRef {
988989
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
989990
Some(ReducerOpts {
990991
max_update_interval: Some(Duration::from_millis(50)),
992+
initial_update_interval: None,
991993
}),
992994
);
993995
for proc_id in procs.into_iter() {

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,7 @@ impl ProcMeshRef {
984984
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
985985
Some(ReducerOpts {
986986
max_update_interval: Some(Duration::from_millis(50)),
987+
initial_update_interval: None,
987988
}),
988989
);
989990

@@ -1114,6 +1115,7 @@ impl ProcMeshRef {
11141115
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
11151116
Some(ReducerOpts {
11161117
max_update_interval: Some(Duration::from_millis(50)),
1118+
initial_update_interval: None,
11171119
}),
11181120
);
11191121
agent_mesh.cast(

python/monarch/_src/actor/__init__.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,11 @@
1010
Monarch Actor API
1111
"""
1212

13+
from monarch._src.actor.ipython_check import is_ipython
1314
from monarch.config import configure
1415

1516
# Detect if we're running in IPython/Jupyter
16-
_in_ipython = False
17-
try:
18-
# pyre-ignore[21]
19-
from IPython import get_ipython
20-
21-
_in_ipython = get_ipython() is not None
22-
except ImportError:
23-
pass
17+
_in_ipython: bool = is_ipython()
2418

2519
# Set notebook-friendly defaults for stdio piping when spawning procs.
2620
# These config is read by:
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-strict
8+
9+
"""
10+
Fast IPython detection by checking sys.modules instead of importing.
11+
"""
12+
13+
import sys
14+
15+
16+
def is_ipython() -> bool:
17+
"""
18+
Check if code is running in an IPython/Jupyter environment.
19+
Avoids slow IPython import by checking sys.modules first.
20+
"""
21+
if "IPython" not in sys.modules:
22+
return False
23+
24+
# pyre-ignore[16]: get_ipython exists in IPython module
25+
get_ipython = sys.modules["IPython"].get_ipython
26+
return get_ipython() is not None

python/monarch/_src/actor/logging.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,9 @@
2121
)
2222
from monarch._src.actor.actor_mesh import context
2323
from monarch._src.actor.future import Future
24+
from monarch._src.actor.ipython_check import is_ipython
2425

25-
IN_IPYTHON = False
26-
try:
27-
# Check if we are in ipython environment
28-
# pyre-ignore[21]
29-
from IPython import get_ipython
30-
31-
# pyre-ignore[21]
32-
from IPython.core.interactiveshell import ExecutionResult # noqa: F401
33-
34-
IN_IPYTHON = get_ipython() is not None
35-
except ImportError:
36-
pass
26+
IN_IPYTHON: bool = is_ipython()
3727

3828
logger: logging.Logger = logging.getLogger(__name__)
3929

@@ -88,7 +78,12 @@ def register_flusher_if_in_ipython(self) -> None:
8878
with _global_flush_lock:
8979
global _global_flush_registered
9080
if not _global_flush_registered:
91-
get_ipython().events.register(
81+
# pyre-ignore[21]: IPython is already loaded if IN_IPYTHON is True
82+
from IPython import get_ipython
83+
84+
ipython = get_ipython()
85+
assert ipython is not None
86+
ipython.events.register(
9287
"post_run_cell",
9388
lambda _: flush_all_proc_mesh_logs(
9489
self._logging_mesh_client is not None

0 commit comments

Comments
 (0)