Skip to content

Commit b16ad36

Browse files
authored
Merge pull request #22 from rivet-dev/nathan/warm-isolate-pool-v2
chore: warm isolate pool
2 parents c462089 + 22cdcde commit b16ad36

17 files changed

Lines changed: 1091 additions & 174 deletions

File tree

crates/v8-runtime/src/ipc_binary.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ const MSG_BRIDGE_RESPONSE: u8 = 0x06;
2525
const MSG_STREAM_EVENT: u8 = 0x07;
2626
const MSG_TERMINATE_EXECUTION: u8 = 0x08;
2727
const MSG_WARM_SNAPSHOT: u8 = 0x09;
28+
const MSG_INIT: u8 = 0x0B;
2829

2930
// Rust → Host message type codes
3031
const MSG_BRIDGE_CALL: u8 = 0x81;
3132
const MSG_EXECUTION_RESULT: u8 = 0x82;
3233
const MSG_LOG: u8 = 0x83;
3334
const MSG_STREAM_CALLBACK: u8 = 0x84;
35+
const MSG_INIT_READY: u8 = 0x8C;
3436

3537
// ExecutionResult flags
3638
const FLAG_HAS_EXPORTS: u8 = 0x01;
@@ -80,6 +82,13 @@ pub enum BinaryFrame {
8082
WarmSnapshot {
8183
bridge_code: String,
8284
},
85+
Init {
86+
bridge_code: String,
87+
warm_pool_size: u32,
88+
default_warm_heap_limit_mb: u32,
89+
default_warm_cpu_time_limit_ms: u32,
90+
wait_for_warm_pool: bool,
91+
},
8392

8493
// Rust → Host
8594
BridgeCall {
@@ -104,6 +113,7 @@ pub enum BinaryFrame {
104113
callback_type: String,
105114
payload: Vec<u8>, // V8-serialized payload
106115
},
116+
InitReady,
107117
}
108118

109119
/// Structured error in binary format.
@@ -176,7 +186,7 @@ pub fn extract_session_id(raw: &[u8]) -> io::Result<Option<&str>> {
176186
return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too short"));
177187
}
178188
let msg_type = raw[0];
179-
if msg_type == MSG_AUTHENTICATE || msg_type == MSG_WARM_SNAPSHOT {
189+
if msg_type == MSG_AUTHENTICATE || msg_type == MSG_WARM_SNAPSHOT || msg_type == MSG_INIT {
180190
return Ok(None);
181191
}
182192
let sid_len = raw[1] as usize;
@@ -277,6 +287,23 @@ fn encode_body(buf: &mut Vec<u8>, frame: &BinaryFrame) -> io::Result<()> {
277287
buf.extend_from_slice(&(bc_bytes.len() as u32).to_be_bytes());
278288
buf.extend_from_slice(bc_bytes);
279289
}
290+
BinaryFrame::Init {
291+
bridge_code,
292+
warm_pool_size,
293+
default_warm_heap_limit_mb,
294+
default_warm_cpu_time_limit_ms,
295+
wait_for_warm_pool,
296+
} => {
297+
buf.push(MSG_INIT);
298+
buf.push(0); // no session_id
299+
let bc_bytes = bridge_code.as_bytes();
300+
buf.extend_from_slice(&(bc_bytes.len() as u32).to_be_bytes());
301+
buf.extend_from_slice(bc_bytes);
302+
buf.extend_from_slice(&warm_pool_size.to_be_bytes());
303+
buf.extend_from_slice(&default_warm_heap_limit_mb.to_be_bytes());
304+
buf.extend_from_slice(&default_warm_cpu_time_limit_ms.to_be_bytes());
305+
buf.push(if *wait_for_warm_pool { 1 } else { 0 });
306+
}
280307
BinaryFrame::BridgeCall {
281308
session_id,
282309
call_id,
@@ -337,6 +364,10 @@ fn encode_body(buf: &mut Vec<u8>, frame: &BinaryFrame) -> io::Result<()> {
337364
write_len_prefixed_u16(buf, callback_type)?;
338365
buf.extend_from_slice(payload);
339366
}
367+
BinaryFrame::InitReady => {
368+
buf.push(MSG_INIT_READY);
369+
buf.push(0); // no session_id
370+
}
340371
}
341372
Ok(())
342373
}
@@ -423,6 +454,21 @@ fn decode_body(buf: &[u8]) -> io::Result<BinaryFrame> {
423454
let bridge_code = read_utf8(buf, &mut pos, bc_len)?;
424455
Ok(BinaryFrame::WarmSnapshot { bridge_code })
425456
}
457+
MSG_INIT => {
458+
let bc_len = read_u32(buf, &mut pos)? as usize;
459+
let bridge_code = read_utf8(buf, &mut pos, bc_len)?;
460+
let warm_pool_size = read_u32(buf, &mut pos)?;
461+
let default_warm_heap_limit_mb = read_u32(buf, &mut pos)?;
462+
let default_warm_cpu_time_limit_ms = read_u32(buf, &mut pos)?;
463+
let wait_flag = read_u8(buf, &mut pos)?;
464+
Ok(BinaryFrame::Init {
465+
bridge_code,
466+
warm_pool_size,
467+
default_warm_heap_limit_mb,
468+
default_warm_cpu_time_limit_ms,
469+
wait_for_warm_pool: wait_flag != 0,
470+
})
471+
}
426472
MSG_BRIDGE_CALL => {
427473
let call_id = read_u64(buf, &mut pos)?;
428474
let m_len = read_u16(buf, &mut pos)? as usize;
@@ -486,6 +532,9 @@ fn decode_body(buf: &[u8]) -> io::Result<BinaryFrame> {
486532
payload,
487533
})
488534
}
535+
MSG_INIT_READY => {
536+
Ok(BinaryFrame::InitReady)
537+
}
489538
_ => Err(io::Error::new(
490539
io::ErrorKind::InvalidData,
491540
format!("unknown message type: 0x{msg_type:02x}"),
@@ -841,6 +890,51 @@ mod tests {
841890
assert_eq!(result, None);
842891
}
843892

893+
// -- Init / InitReady --
894+
895+
#[test]
896+
fn roundtrip_init() {
897+
roundtrip(&BinaryFrame::Init {
898+
bridge_code: "(function(){ /* bridge */ })()".into(),
899+
warm_pool_size: 2,
900+
default_warm_heap_limit_mb: 128,
901+
default_warm_cpu_time_limit_ms: 5000,
902+
wait_for_warm_pool: true,
903+
});
904+
}
905+
906+
#[test]
907+
fn roundtrip_init_no_wait() {
908+
roundtrip(&BinaryFrame::Init {
909+
bridge_code: "bridge()".into(),
910+
warm_pool_size: 0,
911+
default_warm_heap_limit_mb: 0,
912+
default_warm_cpu_time_limit_ms: 0,
913+
wait_for_warm_pool: false,
914+
});
915+
}
916+
917+
#[test]
918+
fn roundtrip_init_ready() {
919+
roundtrip(&BinaryFrame::InitReady);
920+
}
921+
922+
#[test]
923+
fn extract_session_id_init_returns_none() {
924+
let frame = BinaryFrame::Init {
925+
bridge_code: "bridge()".into(),
926+
warm_pool_size: 2,
927+
default_warm_heap_limit_mb: 128,
928+
default_warm_cpu_time_limit_ms: 0,
929+
wait_for_warm_pool: true,
930+
};
931+
let mut buf = Vec::new();
932+
write_frame(&mut buf, &frame).expect("write");
933+
let raw = &buf[4..];
934+
let result = extract_session_id(raw).expect("extract");
935+
assert_eq!(result, None);
936+
}
937+
844938
// -- Edge cases --
845939

846940
#[test]
@@ -1132,6 +1226,16 @@ mod tests {
11321226
},
11331227
0x09,
11341228
),
1229+
(
1230+
BinaryFrame::Init {
1231+
bridge_code: "b()".into(),
1232+
warm_pool_size: 2,
1233+
default_warm_heap_limit_mb: 128,
1234+
default_warm_cpu_time_limit_ms: 0,
1235+
wait_for_warm_pool: true,
1236+
},
1237+
0x0B,
1238+
),
11351239
(
11361240
BinaryFrame::BridgeCall {
11371241
session_id: "s".into(),
@@ -1166,6 +1270,10 @@ mod tests {
11661270
},
11671271
0x84,
11681272
),
1273+
(
1274+
BinaryFrame::InitReady,
1275+
0x8C,
1276+
),
11691277
];
11701278
for (frame, expected_type) in &cases {
11711279
let mut buf = Vec::new();

crates/v8-runtime/src/main.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ fn handle_connection(
166166
connection_id: u64,
167167
session_mgr: Arc<Mutex<SessionManager>>,
168168
snapshot_cache: Arc<SnapshotCache>,
169+
ipc_tx: session::IpcSender,
169170
) {
170171
loop {
171172
// Read next binary frame from connection
@@ -264,6 +265,45 @@ fn handle_connection(
264265
);
265266
}
266267
}
268+
// Handle Init: warm snapshot cache, configure warm pool, send InitReady
269+
BinaryFrame::Init {
270+
bridge_code,
271+
warm_pool_size,
272+
default_warm_heap_limit_mb,
273+
default_warm_cpu_time_limit_ms,
274+
wait_for_warm_pool,
275+
} => {
276+
// Warm snapshot cache
277+
if !bridge_code.is_empty() {
278+
if let Err(e) = snapshot_cache.get_or_create(&bridge_code) {
279+
eprintln!("connection {}: Init snapshot warmup failed: {}", connection_id, e);
280+
}
281+
}
282+
283+
// Configure warm pool
284+
let hlm = if default_warm_heap_limit_mb == 0 { None } else { Some(default_warm_heap_limit_mb) };
285+
let ctl = if default_warm_cpu_time_limit_ms == 0 { None } else { Some(default_warm_cpu_time_limit_ms) };
286+
let ready_signal = {
287+
let mut mgr = session_mgr.lock().unwrap();
288+
mgr.set_warm_pool_config(bridge_code, warm_pool_size as usize, hlm, ctl)
289+
};
290+
291+
// Wait for warm pool to fill if requested
292+
if wait_for_warm_pool && warm_pool_size > 0 {
293+
let (lock, cvar) = &*ready_signal;
294+
let mut count = lock.lock().unwrap();
295+
while *count < warm_pool_size as usize {
296+
count = cvar.wait(count).unwrap();
297+
}
298+
}
299+
300+
// Send InitReady response via the connection's writer thread
301+
if let Ok(bytes) = ipc_binary::frame_to_bytes(&BinaryFrame::InitReady) {
302+
if let Err(e) = ipc_tx.send(bytes) {
303+
eprintln!("connection {}: failed to send InitReady: {}", connection_id, e);
304+
}
305+
}
306+
}
267307
_ => {
268308
eprintln!("connection {}: unexpected frame type", connection_id);
269309
}
@@ -403,6 +443,7 @@ fn main() {
403443
.expect("failed to spawn IPC writer thread");
404444

405445
// Create shared session manager for this connection
446+
let conn_ipc_tx = ipc_tx.clone();
406447
let session_mgr = Arc::new(Mutex::new(SessionManager::new(
407448
max_concurrency,
408449
ipc_tx,
@@ -416,7 +457,7 @@ fn main() {
416457
std::thread::Builder::new()
417458
.name(format!("conn-{}", conn_id))
418459
.spawn(move || {
419-
handle_connection(stream, conn_id, mgr, snap);
460+
handle_connection(stream, conn_id, mgr, snap, conn_ipc_tx);
420461
})
421462
.expect("failed to spawn connection handler");
422463
}

0 commit comments

Comments
 (0)