Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions javascript/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,49 @@ export class ForeverVM {
async #get(path: string) {
const response = await fetch(`${this.#baseUrl}${path}`, { headers: this.#headers })
if (!response.ok) {
Comment thread
jakelazaroff marked this conversation as resolved.
throw new Error(`HTTP error! status: ${response.status}`)
const text = await response.text().catch(() => 'Unknown error')
throw new Error(`HTTP ${response.status}: ${text}`)
}
return await response.json()
}

async *#getStream(path: string) {
const response = await fetch(`${this.#baseUrl}${path}`, { headers: this.#headers })
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
const text = await response.text().catch(() => 'Unknown error')
throw new Error(`HTTP ${response.status}: ${text}`)
}

if (!response.body) return

const decoder = new TextDecoderStream()
const reader = response.body.pipeThrough(decoder).getReader()

// buffer JSON just in case an object is split across multiple stream chunks
let buffer = ''

while (true) {
const { done, value = '' } = await reader.read()
let { done, value = '' } = await reader.read()
if (done) return
Comment thread
jakelazaroff marked this conversation as resolved.

const lines = value.split('\n').filter((line) => line.trim())
for (const line of lines) {
yield JSON.parse(line)
// loop until we've read all the data in this chunk
while (value) {
// find the next newline character
const newline = value.indexOf('\n')

// if there are no more newlines, add the remaining data to the buffer and break
if (newline === -1) {
buffer += value
break
}

// parse and yield the next line from the data
const line = value.slice(0, newline)
yield JSON.parse(buffer + line)

// remove the just-processed line from the value and reset the buffer
value = value.slice(newline + 1)
buffer = ''
}
}
}
Expand All @@ -73,8 +94,10 @@ export class ForeverVM {
body: body ? JSON.stringify(body) : undefined,
})
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
const text = await response.text().catch(() => 'Unknown error')
throw new Error(`HTTP ${response.status}: ${text}`)
}

return await response.json()
}

Expand Down Expand Up @@ -179,6 +202,24 @@ if (import.meta.vitest) {
}
})

test('execResultStream with image', { timeout: 10000 }, async () => {
const fvm = new ForeverVM({ token: FOREVERVM_TOKEN, baseUrl: FOREVERVM_API_BASE })
const { machine_name } = await fvm.createMachine()

const code = `import matplotlib.pyplot as plt
plt.plot([0, 1, 2], [0, 1, 2])
plt.title('Simple Plot')
plt.show()`

const { instruction_seq } = await fvm.exec(code, machine_name)
expect(instruction_seq).toBe(0)

for await (const _ of fvm.execResultStream(machine_name, instruction_seq as number)) {
}

// if we reach this point, it means all the stream chunks were valid JSON
})

test('createMachine with tags', async () => {
const fvm = new ForeverVM({ token: FOREVERVM_TOKEN, baseUrl: FOREVERVM_API_BASE })

Expand Down
2 changes: 1 addition & 1 deletion javascript/sdk/src/repl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ if (import.meta.vitest) {
expect(error).toMatch('ZeroDivisionError')
})

test.sequential('reconnect', async () => {
test.sequential('reconnect', { timeout: 10000 }, async () => {
const repl = new Repl({ token: FOREVERVM_TOKEN, baseUrl: FOREVERVM_API_BASE })

await repl.exec('1 + 1').result
Expand Down
23 changes: 23 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/forevervm-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ readme = "README.md"
description = "foreverVM SDK. Allows you to start foreverVMs and run a REPL on them."

[dependencies]
async-stream = "0.3.6"
chrono = { version = "0.4.39", features = ["serde"] }
futures-util = "0.3.31"
regex = "1.11.1"
Expand Down
46 changes: 21 additions & 25 deletions rust/forevervm-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,33 +195,29 @@ impl ForeverVMClient {
return Err(parse_error(response).await?);
}

let stream = response
.bytes_stream()
.map(|result| -> Result<String> {
let bytes = result?;
Ok(String::from_utf8_lossy(&bytes).to_string())
})
.flat_map(|result| {
let lines = match result {
Ok(text) => text.lines().map(|s| Ok(s.to_string())).collect::<Vec<_>>(),
Err(err) => vec![Err(err)],
};
futures_util::stream::iter(lines)
})
.filter_map(|line_result| async move {
match line_result {
Ok(line) => {
if line.trim().is_empty() {
return None;
}
match serde_json::from_str::<MessageFromServer>(&line) {
Ok(message) => Some(Ok(message)),
Err(err) => Some(Err(ClientError::from(err))),
}
let stream = async_stream::stream! {
let mut bytes_stream = response.bytes_stream();
let mut buffer = String::new();
while let Some(bytes) = bytes_stream.next().await {
let mut value = String::from_utf8_lossy(&bytes?).to_string();

'chunk: loop {
if let Some((first, rest)) = value.split_once('\n') {
let json = &format!("{buffer}{first}");
yield match serde_json::from_str::<MessageFromServer>(json) {
Ok(message) => Ok(message),
Err(err) => Err(ClientError::from(err)),
};

value = String::from(rest);
buffer = String::new();
} else {
buffer += &value;
break 'chunk;
}
Err(err) => Some(Err(err)),
}
});
}
Comment thread
jakelazaroff marked this conversation as resolved.
};

Ok(Box::pin(stream))
}
Expand Down
95 changes: 95 additions & 0 deletions rust/forevervm-sdk/tests/basic_sdk_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use forevervm_sdk::api::api_types::Instruction;
use forevervm_sdk::api::http_api::{CreateMachineRequest, ListMachinesRequest};
use forevervm_sdk::api::protocol::MessageFromServer;
use forevervm_sdk::{
api::{api_types::ExecResultType, protocol::StandardOutputStream, token::ApiToken},
client::ForeverVMClient,
};
use futures_util::StreamExt;
use std::env;
use url::Url;

Expand Down Expand Up @@ -85,6 +87,99 @@ async fn test_exec() {
);
}

#[tokio::test]
async fn test_exec_stream() {
let (api_base, token) = get_test_credentials();
let client = ForeverVMClient::new(api_base, token);

// Create machine and execute code
let machine = client
.create_machine(CreateMachineRequest::default())
.await
.expect("failed to create machine");
let code = "for i in range(10): print(i)\n'done'";

let result = client
.exec_instruction(
&machine.machine_name,
Instruction {
code: code.to_string(),
timeout_seconds: 10,
},
)
.await
.expect("exec failed");

let mut stream = client
.exec_result_stream(
&machine.machine_name,
result.instruction_seq.expect("instruction seq missing"),
)
.await
.expect("failed to get exec result");

let mut i = 0;
while let Some(msg) = stream.next().await {
match msg {
Ok(MessageFromServer::Output { chunk, .. }) => {
assert_eq!(chunk.data, format!("{}", i));
i += 1;
}
Ok(MessageFromServer::Result(chunk)) => {
assert_eq!(
chunk.result.result,
ExecResultType::Value {
value: Some("'done'".to_string()),
data: None
}
);
}
_ => {
panic!("unexpected message");
}
}
}
}

#[tokio::test]
async fn test_exec_stream_image() {
let (api_base, token) = get_test_credentials();
let client = ForeverVMClient::new(api_base, token);

// Create machine and execute code
let machine = client
.create_machine(CreateMachineRequest::default())
.await
.expect("failed to create machine");
let code = "import matplotlib.pyplot as plt
plt.plot([0, 1, 2], [0, 1, 2])
plt.title('Simple Plot')
plt.show()";

let result = client
.exec_instruction(
&machine.machine_name,
Instruction {
code: code.to_string(),
timeout_seconds: 10,
},
)
.await
.expect("exec failed");

let mut stream = client
.exec_result_stream(
&machine.machine_name,
result.instruction_seq.expect("instruction seq missing"),
)
.await
.expect("failed to get exec result");

while let Some(chunk) = stream.next().await {
assert!(chunk.is_ok(), "chunk should parse as JSON");
}
}

#[tokio::test]
async fn test_repl() {
let (api_base, token) = get_test_credentials();
Expand Down