Skip to content

Commit 3acd912

Browse files
thomasywangmeta-codesync[bot]
authored andcommitted
Create hyperactor_config crate (#2017)
Summary: Pull Request resolved: #2017 We want this to be reusable from any crate without taking a dependency on `hyperactor` to avoid circular dependencies. Reviewed By: dulinriley Differential Revision: D87890698 fbshipit-source-id: 2f821213eb8d544cf0fbc669df90abec16d42afb
1 parent 1dc5a6f commit 3acd912

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+919
-455
lines changed

hyperactor/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ async-trait = "0.1.86"
4242
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
4343
bincode = "1.3.3"
4444
bytes = { version = "1.10", features = ["serde"] }
45-
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
4645
cityhasher = "0.1.0"
4746
clap = { version = "4.5.42", features = ["derive", "env", "string", "unicode", "wrap_help"] }
4847
crc32fast = "1.4"
@@ -55,6 +54,7 @@ fastrand = "2.1.1"
5554
futures = { version = "0.3.31", features = ["async-await", "compat"] }
5655
hostname = "0.3"
5756
humantime = "2.1"
57+
hyperactor_config = { version = "0.0.0", path = "../hyperactor_config" }
5858
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
5959
hyperactor_named = { version = "0.0.0", path = "../hyperactor_named" }
6060
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
@@ -73,8 +73,6 @@ rustls-pemfile = "1.0.0"
7373
serde = { version = "1.0.219", features = ["derive", "rc"] }
7474
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7575
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
76-
serde_yaml = "0.9.25"
77-
shell-quote = "0.7.2"
7876
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
7977
strum = { version = "0.27.1", features = ["derive"] }
8078
thiserror = "2.0.12"
@@ -94,6 +92,7 @@ indoc = "2.0.2"
9492
maplit = "1.0"
9593
proptest = "1.5"
9694
serde_bytes = "0.11"
95+
serde_yaml = "0.9.25"
9796
tempfile = "3.22"
9897
timed_test = { version = "0.0.0", path = "../timed_test" }
9998
tokio-test = "0.4.4"

hyperactor/src/accum.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct ReducerOpts {
5858
impl ReducerOpts {
5959
pub(crate) fn max_update_interval(&self) -> Duration {
6060
self.max_update_interval
61-
.unwrap_or(config::global::get(config::SPLIT_MAX_BUFFER_AGE))
61+
.unwrap_or(hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_AGE))
6262
}
6363
}
6464

hyperactor/src/channel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::str::FromStr;
2222

2323
use async_trait::async_trait;
2424
use enum_as_inner::EnumAsInner;
25+
use hyperactor_config::attrs::AttrValue;
2526
use lazy_static::lazy_static;
2627
use local_ip_address::local_ipv6;
2728
use serde::Deserialize;
@@ -33,7 +34,6 @@ use tokio::sync::watch;
3334
use crate as hyperactor;
3435
use crate::Named;
3536
use crate::RemoteMessage;
36-
use crate::attrs::AttrValue;
3737
use crate::channel::sim::SimAddr;
3838
use crate::simnet::SimNetError;
3939

@@ -1158,7 +1158,7 @@ mod tests {
11581158
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Server(Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" }))
11591159
#[cfg_attr(not(fbcode_build), ignore)]
11601160
async fn test_send() {
1161-
let config = crate::config::global::lock();
1161+
let config = hyperactor_config::global::lock();
11621162

11631163
// Use temporary config for this test
11641164
let _guard1 = config.override_key(

hyperactor/src/channel/net.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,7 @@ mod tests {
10291029
async fn test_tcp_message_size() {
10301030
let default_size_in_bytes = 100 * 1024 * 1024;
10311031
// Use temporary config for this test
1032-
let config = config::global::lock();
1032+
let config = hyperactor_config::global::lock();
10331033
let _guard1 = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(1));
10341034
let _guard2 = config.override_key(config::CODEC_MAX_FRAME_LENGTH, default_size_in_bytes);
10351035

@@ -1057,7 +1057,7 @@ mod tests {
10571057
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })
10581058
#[cfg_attr(not(fbcode_build), ignore)]
10591059
async fn test_ack_flush() {
1060-
let config = config::global::lock();
1060+
let config = hyperactor_config::global::lock();
10611061
// Set a large value to effectively prevent acks from being sent except
10621062
// during shutdown flush.
10631063
let _guard_message_ack =
@@ -1340,7 +1340,7 @@ mod tests {
13401340
}
13411341
}
13421342
}
1343-
let mut fw = FrameWrite::new(writer, data, config::global::get(config::CODEC_MAX_FRAME_LENGTH)).unwrap();
1343+
let mut fw = FrameWrite::new(writer, data, hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH)).unwrap();
13441344
if fw.send().await.is_err() {
13451345
break;
13461346
}
@@ -1365,7 +1365,7 @@ mod tests {
13651365
let (server_r, server_writer) = tokio::io::split(server_relay);
13661366
let (client_r, client_writer) = tokio::io::split(client_relay);
13671367

1368-
let max_len = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
1368+
let max_len = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
13691369
let server_reader = FrameReader::new(server_r, max_len);
13701370
let client_reader = FrameReader::new(client_r, max_len);
13711371

@@ -1469,7 +1469,10 @@ mod tests {
14691469
let join_handle =
14701470
tokio::spawn(async move { manager1.serve(conn, tx, cancel_token_1).await });
14711471
let (r, writer) = tokio::io::split(sender);
1472-
let reader = FrameReader::new(r, config::global::get(config::CODEC_MAX_FRAME_LENGTH));
1472+
let reader = FrameReader::new(
1473+
r,
1474+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1475+
);
14731476
(join_handle, reader, writer, rx, cancel_token)
14741477
}
14751478

@@ -1489,7 +1492,7 @@ mod tests {
14891492
let mut fw = FrameWrite::new(
14901493
writer,
14911494
message.framed(),
1492-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1495+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
14931496
)
14941497
.map_err(|(_w, e)| e)
14951498
.unwrap();
@@ -1504,7 +1507,7 @@ mod tests {
15041507
let mut fw = FrameWrite::new(
15051508
writer,
15061509
message.framed(),
1507-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1510+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
15081511
)
15091512
.map_err(|(_w, e)| e)
15101513
.unwrap();
@@ -1518,7 +1521,7 @@ mod tests {
15181521
#[async_timed_test(timeout_secs = 60)]
15191522
async fn test_persistent_server_session() {
15201523
// Use temporary config for this test
1521-
let config = config::global::lock();
1524+
let config = hyperactor_config::global::lock();
15221525
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
15231526

15241527
async fn verify_ack(reader: &mut FrameReader<ReadHalf<DuplexStream>>, expected_last: u64) {
@@ -1624,7 +1627,7 @@ mod tests {
16241627

16251628
#[async_timed_test(timeout_secs = 60)]
16261629
async fn test_ack_from_server_session() {
1627-
let config = config::global::lock();
1630+
let config = hyperactor_config::global::lock();
16281631
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
16291632
let manager = SessionManager::new();
16301633
let session_id = 123u64;
@@ -1689,7 +1692,7 @@ mod tests {
16891692
let link = MockLink::<u64>::fail_connects();
16901693
let tx = super::dial::<u64>(link);
16911694
// Override the default (1m) for the purposes of this test.
1692-
let config = config::global::lock();
1695+
let config = hyperactor_config::global::lock();
16931696
let _guard = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(1));
16941697
let mut tx_receiver = tx.status().clone();
16951698
let (return_channel, _return_receiver) = oneshot::channel();
@@ -1702,7 +1705,10 @@ mod tests {
17021705
) -> (FrameReader<ReadHalf<DuplexStream>>, WriteHalf<DuplexStream>) {
17031706
let receiver = receiver_storage.take().await;
17041707
let (r, writer) = tokio::io::split(receiver);
1705-
let reader = FrameReader::new(r, config::global::get(config::CODEC_MAX_FRAME_LENGTH));
1708+
let reader = FrameReader::new(
1709+
r,
1710+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1711+
);
17061712
(reader, writer)
17071713
}
17081714

@@ -2057,7 +2063,7 @@ mod tests {
20572063

20582064
async fn verify_ack_exceeded_limit(disconnect_before_ack: bool) {
20592065
// Use temporary config for this test
2060-
let config = config::global::lock();
2066+
let config = hyperactor_config::global::lock();
20612067
let _guard = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(2));
20622068

20632069
let link: MockLink<u64> = MockLink::<u64>::new();
@@ -2075,7 +2081,7 @@ mod tests {
20752081
let _ = FrameWrite::write_frame(
20762082
writer,
20772083
serialize_response(NetRxResponse::Ack(0)).unwrap(),
2078-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
2084+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
20792085
)
20802086
.await
20812087
.map_err(|(_, e)| e)
@@ -2192,7 +2198,7 @@ mod tests {
21922198

21932199
#[async_timed_test(timeout_secs = 60)]
21942200
async fn test_ack_every_n_messages() {
2195-
let config = config::global::lock();
2201+
let config = hyperactor_config::global::lock();
21962202
let _guard_message_ack = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 600);
21972203
let _guard_time_interval =
21982204
config.override_key(config::MESSAGE_ACK_TIME_INTERVAL, Duration::from_secs(1000));
@@ -2201,7 +2207,7 @@ mod tests {
22012207

22022208
#[async_timed_test(timeout_secs = 60)]
22032209
async fn test_ack_every_time_interval() {
2204-
let config = config::global::lock();
2210+
let config = hyperactor_config::global::lock();
22052211
let _guard_message_ack =
22062212
config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 100000000);
22072213
let _guard_time_interval = config.override_key(
@@ -2292,7 +2298,7 @@ mod tests {
22922298
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })
22932299
#[cfg_attr(not(fbcode_build), ignore)]
22942300
async fn test_tcp_throughput() {
2295-
let config = config::global::lock();
2301+
let config = hyperactor_config::global::lock();
22962302
let _guard =
22972303
config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(300));
22982304

@@ -2369,7 +2375,7 @@ mod tests {
23692375

23702376
#[async_timed_test(timeout_secs = 60)]
23712377
async fn test_server_rejects_conn_on_out_of_sequence_message() {
2372-
let config = config::global::lock();
2378+
let config = hyperactor_config::global::lock();
23732379
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
23742380
let manager = SessionManager::new();
23752381
let session_id = 123u64;
@@ -2400,7 +2406,7 @@ mod tests {
24002406
async fn test_stop_net_tx_after_stopping_net_rx() {
24012407
hyperactor_telemetry::initialize_logging_for_test();
24022408

2403-
let config = config::global::lock();
2409+
let config = hyperactor_config::global::lock();
24042410
let _guard =
24052411
config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(300));
24062412
let (addr, mut rx) = tcp::serve::<u64>("[::1]:0".parse().unwrap()).unwrap();

hyperactor/src/channel/net/client.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> {
228228
match self.deque.front() {
229229
None => false,
230230
Some(msg) => {
231-
msg.received_at.elapsed() > config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
231+
msg.received_at.elapsed()
232+
> hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
232233
}
233234
}
234235
}
@@ -436,7 +437,7 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
436437
fn is_expired(&self) -> bool {
437438
matches!(
438439
self.deque.front(),
439-
Some(msg) if msg.received_at.elapsed() > config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
440+
Some(msg) if msg.received_at.elapsed() > hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
440441
)
441442
}
442443

@@ -448,7 +449,8 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
448449
Some(msg) => {
449450
RealClock
450451
.sleep_until(
451-
msg.received_at + config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
452+
msg.received_at
453+
+ hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
452454
)
453455
.await
454456
}
@@ -866,7 +868,7 @@ where
866868
..
867869
},
868870
) if !outbox.is_empty() => {
869-
let max = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
871+
let max = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
870872
let len = outbox.front_size().expect("not empty");
871873
let message = outbox.front_message().expect("not empty");
872874

@@ -1000,7 +1002,7 @@ where
10001002
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
10011003
let error_msg = format!(
10021004
"failed to receive ack within timeout {:?}; link is currently connected",
1003-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
1005+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
10041006
);
10051007
tracing::error!(
10061008
dest = %link.dest(),
@@ -1099,7 +1101,7 @@ where
10991101
if outbox.is_expired() {
11001102
let error_msg = format!(
11011103
"failed to deliver message within timeout {:?}",
1102-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
1104+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
11031105
);
11041106
tracing::error!(
11051107
dest = %link.dest(),
@@ -1116,7 +1118,7 @@ where
11161118
} else if unacked.is_expired() {
11171119
let error_msg = format!(
11181120
"failed to receive ack within timeout {:?}; link is currently broken",
1119-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
1121+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
11201122
);
11211123
tracing::error!(
11221124
dest = %link.dest(),
@@ -1140,7 +1142,7 @@ where
11401142
let mut write = FrameWrite::new(
11411143
stream,
11421144
message.framed(),
1143-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1145+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
11441146
)
11451147
.expect("enough length");
11461148
let initialized = write.send().await.is_ok();
@@ -1183,7 +1185,9 @@ where
11831185
Conn::Connected {
11841186
reader: FrameReader::new(
11851187
reader,
1186-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1188+
hyperactor_config::global::get(
1189+
config::CODEC_MAX_FRAME_LENGTH,
1190+
),
11871191
),
11881192
write_state: WriteState::Idle(writer),
11891193
}

hyperactor/src/channel/net/server.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ impl<S: AsyncRead + AsyncWrite> ServerConn<S> {
5858
pub(super) fn new(stream: S, source: ChannelAddr, dest: ChannelAddr) -> Self {
5959
let (reader, writer) = tokio::io::split(stream);
6060
Self {
61-
reader: FrameReader::new(reader, config::global::get(config::CODEC_MAX_FRAME_LENGTH)),
61+
reader: FrameReader::new(
62+
reader,
63+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
64+
),
6265
write_state: WriteState::Idle(writer),
6366
source,
6467
dest,
@@ -102,8 +105,8 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
102105
let mut rcv_raw_frame_count = 0u64;
103106
let mut last_ack_time = RealClock.now();
104107

105-
let ack_time_interval = config::global::get(config::MESSAGE_ACK_TIME_INTERVAL);
106-
let ack_msg_interval = config::global::get(config::MESSAGE_ACK_EVERY_N_MESSAGES);
108+
let ack_time_interval = hyperactor_config::global::get(config::MESSAGE_ACK_TIME_INTERVAL);
109+
let ack_msg_interval = hyperactor_config::global::get(config::MESSAGE_ACK_EVERY_N_MESSAGES);
107110

108111
let (mut final_next, final_result, reject_conn) = loop {
109112
if self.write_state.is_idle()
@@ -128,7 +131,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
128131
match FrameWrite::new(
129132
writer,
130133
ack,
131-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
134+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
132135
) {
133136
Ok(fw) => {
134137
self.write_state = WriteState::Writing(fw, next.seq);
@@ -371,7 +374,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
371374
let ack = serialize_response(NetRxResponse::Ack(final_next.seq - 1))
372375
.map_err(anyhow::Error::from)?;
373376

374-
let max = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
377+
let max = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
375378
let fw =
376379
FrameWrite::new(writer, ack, max).map_err(|(_, e)| anyhow::Error::from(e))?;
377380
self.write_state = WriteState::Writing(fw, final_next.seq);
@@ -412,7 +415,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
412415
match FrameWrite::new(
413416
writer,
414417
data,
415-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
418+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
416419
) {
417420
Ok(fw) => {
418421
self.write_state = WriteState::Writing(fw, 0);
@@ -464,7 +467,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
464467
permit_result?.send(message);
465468
return Ok(())
466469
}
467-
_ = RealClock.sleep(config::global::get(config::CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL)) => {
470+
_ = RealClock.sleep(hyperactor_config::global::get(config::CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL)) => {
468471
// When buffer is full too long, we log it.
469472
metrics::CHANNEL_NET_RX_BUFFER_FULL.add(
470473
1,

hyperactor/src/channel/sim.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,11 @@ impl<M: RemoteMessage> Rx<M> for SimRx<M> {
397397
mod tests {
398398
use std::iter::zip;
399399

400+
use hyperactor_config::attrs::Attrs;
400401
use ndslice::extent;
401402

402403
use super::*;
403404
use crate::PortId;
404-
use crate::attrs::Attrs;
405405
use crate::clock::Clock;
406406
use crate::clock::RealClock;
407407
use crate::clock::SimClock;

0 commit comments

Comments
 (0)