diff --git a/crates/bindings-typescript/src/server/http_internal.ts b/crates/bindings-typescript/src/server/http_internal.ts index 69cd3a98287..61bc43f7b3f 100644 --- a/crates/bindings-typescript/src/server/http_internal.ts +++ b/crates/bindings-typescript/src/server/http_internal.ts @@ -27,6 +27,12 @@ export interface ResponseInit { const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder('utf-8' /* { fatal: true } */); +function deserializeHeaders(headers: HttpHeaders): Headers { + return new Headers( + headers.entries.map(({ name, value }): [string, string] => [name, textDecoder.decode(value)]) + ); +} + const makeResponse = Symbol('makeResponse'); // based on deno's type of the same name @@ -132,8 +138,43 @@ export interface RequestOptions { // redirect?: RequestRedirect; } +/** + * A streaming HTTP response that yields body chunks via iteration. + * + * **Important:** Each iteration blocks the module's V8 thread until the next + * chunk arrives. Because there is one V8 thread per module instance, all other + * reducers and procedures for this database are stalled while waiting. For + * long-running streams (e.g. LLM token streaming), this means the database is + * unresponsive for the duration. Prefer streaming for large finite downloads + * or cases where you can read a few chunks and exit early. + */ +export interface StreamingResponse extends Disposable { + /** HTTP status code. */ + readonly status: number; + /** HTTP status text. */ + readonly statusText: string; + /** Response headers. */ + readonly headers: Headers; + /** Whether the status is in the 200-299 range. */ + readonly ok: boolean; + /** Iterate over response body chunks. Each iteration blocks until the next chunk arrives. */ + [Symbol.iterator](): Iterator; + /** Close the underlying stream handle, canceling the background reader. 
*/ + [Symbol.dispose](): void; +} + export interface HttpClient { fetch(url: URL | string, init?: RequestOptions): SyncResponse; + /** + * Initiate a streaming HTTP request. The response body can be iterated + * chunk by chunk. + * + * **Important:** Iterating over the response blocks the module's V8 thread + * on each chunk, stalling all other operations on this database until + * iteration finishes or the stream is disposed. See {@link StreamingResponse} + * for details. + */ + fetchStreaming(url: URL | string, init?: RequestOptions): StreamingResponse; } const requestBaseSize = bsatnBaseSize({ types: [] }, HttpRequest.algebraicType); @@ -150,8 +191,8 @@ const methods = new Map([ ['PATCH', { tag: 'Patch' }], ]); -function fetch(url: URL | string, init: RequestOptions = {}) { - const method = methods.get(init.method?.toUpperCase() ?? 'GET') ?? { +function buildRequest(url: URL | string, init: RequestOptions = {}): { request: HttpRequest; uri: string; body: Uint8Array | string } { + const method: HttpMethod = methods.get(init.method?.toUpperCase() ?? 'GET') ?? { tag: 'Extension', value: init.method!, }; @@ -169,16 +210,25 @@ function fetch(url: URL | string, init: RequestOptions = {}) { uri, version: { tag: 'Http11' } as const, }); - const requestBuf = new BinaryWriter(requestBaseSize); - HttpRequest.serialize(requestBuf, request); const body = init.body == null ? new Uint8Array() : typeof init.body === 'string' ? 
init.body : new Uint8Array(init.body as any); + return { request, uri, body }; +} + +function serializeRequest(request: HttpRequest): Uint8Array { + const requestBuf = new BinaryWriter(requestBaseSize); + HttpRequest.serialize(requestBuf, request); + return requestBuf.getBuffer(); +} + +function fetch(url: URL | string, init: RequestOptions = {}) { + const { request, uri, body } = buildRequest(url, init); const [responseBuf, responseBody] = sys.procedure_http_request( - requestBuf.getBuffer(), + serializeRequest(request), body ); const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); @@ -187,11 +237,82 @@ function fetch(url: URL | string, init: RequestOptions = {}) { url: uri, status: response.code, statusText: status(response.code), - headers: new Headers(), + headers: deserializeHeaders(response.headers), aborted: false, }); } +/** Manages the lifecycle of a streaming HTTP response handle. */ +class StreamHandle implements Disposable { + #id: number | -1; + + static #finalizationRegistry = new FinalizationRegistry( + sys.procedure_http_stream_close + ); + + constructor(id: number) { + this.#id = id; + StreamHandle.#finalizationRegistry.register(this, id, this); + } + + /** Read the next chunk. Returns null when the stream is exhausted. 
*/ + next(): Uint8Array | null { + if (this.#id === -1) return null; + const chunk = sys.procedure_http_stream_next(this.#id); + if (chunk === null) { + this.#detach(); + } + return chunk; + } + + #detach(): number { + const id = this.#id; + this.#id = -1; + StreamHandle.#finalizationRegistry.unregister(this); + return id; + } + + [Symbol.dispose]() { + if (this.#id >= 0) { + const id = this.#detach(); + sys.procedure_http_stream_close(id); + } + } +} + +function fetchStreaming(url: URL | string, init: RequestOptions = {}): StreamingResponse { + const { request, body } = buildRequest(url, init); + const [handle, responseBuf] = sys.procedure_http_stream_open( + serializeRequest(request), + body + ); + const stream = new StreamHandle(handle); + const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); + const code = response.code; + const responseHeaders = deserializeHeaders(response.headers); + + return { + get status() { return code; }, + get statusText() { return status(code) as string; }, + headers: responseHeaders, + get ok() { return 200 <= code && code <= 299; }, + *[Symbol.iterator]() { + try { + let chunk: Uint8Array | null; + while ((chunk = stream.next()) !== null) { + yield chunk; + } + } finally { + stream[Symbol.dispose](); + } + }, + [Symbol.dispose]() { + stream[Symbol.dispose](); + }, + }; +} + freeze(fetch); +freeze(fetchStreaming); -export const httpClient: HttpClient = freeze({ fetch }); +export const httpClient: HttpClient = freeze({ fetch, fetchStreaming }); diff --git a/crates/bindings-typescript/src/server/sys.d.ts b/crates/bindings-typescript/src/server/sys.d.ts index d85f23210e2..baccf35262a 100644 --- a/crates/bindings-typescript/src/server/sys.d.ts +++ b/crates/bindings-typescript/src/server/sys.d.ts @@ -94,6 +94,19 @@ declare module 'spacetime:sys@2.0' { body: Uint8Array | string ): [response: Uint8Array, body: Uint8Array]; + export function procedure_http_stream_open( + request: Uint8Array, + body: Uint8Array | string + 
): [handle: number, response: Uint8Array]; + + export function procedure_http_stream_next( + handle: number + ): Uint8Array | null; + + export function procedure_http_stream_close( + handle: number + ): void; + export function procedure_start_mut_tx(): bigint; export function procedure_commit_mut_tx(); diff --git a/crates/bindings-typescript/tests/http_headers.test.ts b/crates/bindings-typescript/tests/http_headers.test.ts new file mode 100644 index 00000000000..7fe5ebecc4b --- /dev/null +++ b/crates/bindings-typescript/tests/http_headers.test.ts @@ -0,0 +1,51 @@ +import { describe, expect, test } from 'vitest'; +import { BinaryReader, BinaryWriter } from '../src'; +import { HttpResponse, HttpHeaders } from '../src/lib/http_types'; + +describe('HttpResponse header round-trip', () => { + test('headers survive BSATN serialize/deserialize', () => { + const textEncoder = new TextEncoder(); + const textDecoder = new TextDecoder('utf-8'); + + const original: HttpResponse = { + headers: { + entries: [ + { name: 'content-type', value: textEncoder.encode('text/event-stream') }, + { name: 'x-request-id', value: textEncoder.encode('abc-123') }, + ], + }, + version: { tag: 'Http11' }, + code: 200, + }; + + const writer = new BinaryWriter(256); + HttpResponse.serialize(writer, original); + const buf = writer.getBuffer(); + + const deserialized = HttpResponse.deserialize(new BinaryReader(buf)); + + expect(deserialized.code).toBe(200); + expect(deserialized.headers.entries).toHaveLength(2); + + expect(deserialized.headers.entries[0].name).toBe('content-type'); + expect(textDecoder.decode(deserialized.headers.entries[0].value)).toBe('text/event-stream'); + + expect(deserialized.headers.entries[1].name).toBe('x-request-id'); + expect(textDecoder.decode(deserialized.headers.entries[1].value)).toBe('abc-123'); + }); + + test('empty headers round-trip correctly', () => { + const original: HttpResponse = { + headers: { entries: [] }, + version: { tag: 'Http11' }, + code: 404, + }; + + 
const writer = new BinaryWriter(64); + HttpResponse.serialize(writer, original); + const deserialized = HttpResponse.deserialize(new BinaryReader(writer.getBuffer())); + + expect(deserialized.code).toBe(404); + expect(deserialized.headers.entries).toHaveLength(0); + }); +}); diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 38517e6eb89..aed265e9657 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -847,94 +847,14 @@ impl InstanceEnv { .with_label_values(self.database_identity()) .inc_scope(); - /// Strip the query part out of the URL in `err`, as query parameters may be sensitive - /// and we'd like it to be safe to directly log errors from this method. - fn strip_query_params_from_reqwest_error(mut err: reqwest::Error) -> reqwest::Error { - if let Some(url) = err.url_mut() { - // `set_query` of `None` clears the query part. - url.set_query(None); - } - err - } - - fn http_error(err: E) -> NodesError { - // Include the full error chain, not just the top-level message. - // `reqwest::Error` wraps underlying causes (DNS failure, connection refused, - // timeout, TLS errors, etc.) which are essential for debugging. - use std::fmt::Write; - let mut message = err.to_string(); - let mut source = err.source(); - while let Some(cause) = source { - write!(message, ": {cause}").unwrap(); - source = cause.source(); - } - NodesError::HttpError(message) - } - - // Then convert the request into an `http::Request`, a semi-standard "lingua franca" type in the Rust ecosystem, - // and map its body into a type `reqwest` will like. - // - // See comments on and in `convert_http_request` for justification that there's no sensitive info in this error. 
- let (request, timeout) = convert_http_request(request).map_err(http_error)?; - - let request = http::Request::from_parts(request, body); + let (client, reqwest_req) = prepare_http_request(request, body)?; - let mut reqwest: reqwest::Request = request - .try_into() - // `reqwest::Error` may contain sensitive info, namely the full URL with query params. - // Strip those out before returning the error. - .map_err(strip_query_params_from_reqwest_error) - .map_err(http_error)?; - - // If the user requested a timeout using our extension, slot it in to reqwest's timeout. - // Clamp to the range `0..HTTP_DEFAULT_TIMEOUT`. - let timeout = timeout.unwrap_or(HTTP_DEFAULT_TIMEOUT).min(HTTP_MAX_TIMEOUT); - - // reqwest's timeout covers from the start of the request to the end of reading the body, - // so there's no need to do our own timeout operation. - *reqwest.timeout_mut() = Some(timeout); - - let reqwest = reqwest; - - // Check if we have a blocked IP address, since IP literals bypass DNS resolution. - if is_blocked_ip_literal(reqwest.url()) { - return Err(NodesError::HttpError(BLOCKED_HTTP_ADDRESS_ERROR.to_string())); - } - - let redirect_policy = reqwest::redirect::Policy::custom(|attempt| { - if is_blocked_ip_literal(attempt.url()) { - attempt.error(BLOCKED_HTTP_ADDRESS_ERROR) - } else { - reqwest::redirect::Policy::default().redirect(attempt) - } - }); - - // TODO(procedure-metrics): record size in bytes of response, time spent awaiting response. - - // Actually execute the HTTP request! - // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call. - let execute_fut = reqwest::Client::builder() - .dns_resolver(Arc::new(FilteredDnsResolver)) - .redirect(redirect_policy) - .build() - .map_err(http_error)? - .execute(reqwest); - - // Run the future that does IO work on a tokio worker thread, where it's more efficent. + // Run the future that does IO work on a tokio worker thread, where it's more efficient. 
+ let execute_fut = client.execute(reqwest_req); let response_fut = tokio::spawn(async { - // `reqwest::Error` may contain sensitive info, namely the full URL with query params. - // We'll strip those with `strip_query_params_from_eqwest_error` - // after `await`ing `response_fut` below. let response = execute_fut.await?; - - // Download the response body, which in all likelihood will be a stream, - // as reqwest seems to prefer that. let (response, body) = http::Response::from(response).into_parts(); - - // This error may also contain the full URL with query params. - // Again, we'll strip them after `await`ing `response_fut` below. let body = http_body_util::BodyExt::collect(body).await?.to_bytes(); - Ok((response, body)) }) .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())); @@ -945,29 +865,13 @@ impl InstanceEnv { let (response, body) = response_fut .await .inspect_err(|err: &reqwest::Error| { - // Report the request's failure in our metrics as either a timeout or a misc. failure, as appropriate. - if err.is_timeout() { - DB_METRICS - .procedure_num_timeout_http_requests - .with_label_values(&database_identity) - .inc(); - } else { - DB_METRICS - .procedure_num_failed_http_requests - .with_label_values(&database_identity) - .inc(); - } + record_http_error_metrics(&database_identity, err); }) - // `response_fut` returns a `reqwest::Error`, which may contain the full URL including query params. - // Strip them out to clean the error of potentially sensitive info. .map_err(strip_query_params_from_reqwest_error) .map_err(http_error)?; - // Transform the `http::Response` into our `spacetimedb_lib::http::Response` type, - // which has a stable BSATN encoding to pass across the WASM boundary. let response = convert_http_response(response); - // Record the response size in bytes. 
DB_METRICS .procedure_http_response_size_bytes .with_label_values(&database_identity) @@ -978,6 +882,202 @@ impl InstanceEnv { } } +/// State for an in-progress streaming HTTP response. +/// +/// The background tokio task reads chunks from the response body +/// and sends them through a bounded channel. The procedure reads +/// chunks one at a time via [`InstanceEnv::http_stream_next`]. +pub struct HttpStreamState { + pub receiver: tokio::sync::mpsc::Receiver>, + pub abort_handle: tokio::task::AbortHandle, +} + +impl Drop for HttpStreamState { + fn drop(&mut self) { + self.abort_handle.abort(); + } +} + +impl InstanceEnv { + /// Initiate a streaming HTTP request. + /// + /// Returns a future that resolves to the response metadata and a chunk receiver. + /// The response body is read in the background and sent through a bounded channel. + /// + /// Security: reuses the same IP filtering, DNS resolution, timeout clamping, + /// and redirect policy as [`InstanceEnv::http_request`]. + pub fn http_stream_open( + &mut self, + request: st_http::Request, + body: bytes::Bytes, + ) -> Result< + impl Future< + Output = Result< + ( + st_http::Response, + tokio::sync::mpsc::Receiver>, + tokio::task::AbortHandle, + ), + NodesError, + >, + > + use<>, + NodesError, + > { + if self.in_tx() { + return Err(NodesError::WouldBlockTransaction(super::AbiCall::ProcedureHttpStreamOpen)); + } + + DB_METRICS + .procedure_num_http_requests + .with_label_values(self.database_identity()) + .inc(); + DB_METRICS + .procedure_http_request_size_bytes + .with_label_values(self.database_identity()) + .inc_by((request.size_in_bytes() + body.len()) as _); + // Create the in-progress metric guard. Moved into the async block so it + // lives until the response headers are received and the chunk reader is spawned. 
+ let in_progress_metric = DB_METRICS + .procedure_num_in_progress_http_requests + .with_label_values(self.database_identity()) + .inc_scope(); + + let (client, reqwest_req) = prepare_http_request(request, body)?; + let execute_fut = client.execute(reqwest_req); + + let database_identity = *self.database_identity(); + + Ok(async move { + // Keep the in-progress metric alive until the response headers arrive + // and the background chunk reader is spawned. + let _in_progress_metric = in_progress_metric; + + let response = tokio::spawn(execute_fut) + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())) + .await + .inspect_err(|err: &reqwest::Error| { + record_http_error_metrics(&database_identity, err); + }) + .map_err(strip_query_params_from_reqwest_error) + .map_err(http_error)?; + + let (parts, body) = http::Response::from(response).into_parts(); + let response = convert_http_response(parts); + + // Spawn a background task that reads chunks from the response body + // and sends them through a bounded channel. + let (tx, rx) = tokio::sync::mpsc::channel::>(8); + let task = tokio::spawn(async move { + use http_body_util::BodyExt; + let mut body = std::pin::pin!(body); + loop { + match body.frame().await { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + if !data.is_empty() { + if tx.send(Ok(data)).await.is_err() { + // Receiver dropped — procedure closed the stream. + break; + } + } + } + // Non-data frames (trailers) are skipped. + } + Some(Err(err)) => { + let _ = tx.send(Err(err.to_string())).await; + break; + } + None => { + // Body stream ended. + break; + } + } + } + }); + + Ok((response, rx, task.abort_handle())) + }) + } +} + +/// Strip the query part out of the URL in `err`, as query parameters may be sensitive +/// and we'd like it to be safe to directly log errors from HTTP methods. 
+fn strip_query_params_from_reqwest_error(mut err: reqwest::Error) -> reqwest::Error { + if let Some(url) = err.url_mut() { + url.set_query(None); + } + err +} + +/// Convert a `std::error::Error` into a `NodesError::HttpError`, +/// preserving the full error chain for debugging. +fn http_error(err: E) -> NodesError { + use std::fmt::Write; + let mut message = err.to_string(); + let mut source = err.source(); + while let Some(cause) = source { + write!(message, ": {cause}").unwrap(); + source = cause.source(); + } + NodesError::HttpError(message) +} + +/// Prepare an HTTP request for execution: convert from our type to reqwest, +/// apply timeout clamping, IP filtering, DNS filtering, and redirect policy. +/// +/// Returns the `reqwest::Client` and `reqwest::Request` ready to execute. +fn prepare_http_request( + request: st_http::Request, + body: bytes::Bytes, +) -> Result<(reqwest::Client, reqwest::Request), NodesError> { + let (request_parts, timeout) = convert_http_request(request).map_err(http_error)?; + let request = http::Request::from_parts(request_parts, body); + + let mut reqwest_req: reqwest::Request = request + .try_into() + .map_err(strip_query_params_from_reqwest_error) + .map_err(http_error)?; + + let timeout = timeout.unwrap_or(HTTP_DEFAULT_TIMEOUT).min(HTTP_MAX_TIMEOUT); + *reqwest_req.timeout_mut() = Some(timeout); + + if is_blocked_ip_literal(reqwest_req.url()) { + return Err(NodesError::HttpError(BLOCKED_HTTP_ADDRESS_ERROR.to_string())); + } + + let redirect_policy = reqwest::redirect::Policy::custom(|attempt| { + if is_blocked_ip_literal(attempt.url()) { + attempt.error(BLOCKED_HTTP_ADDRESS_ERROR) + } else { + reqwest::redirect::Policy::default().redirect(attempt) + } + }); + + // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call. 
+ let client = reqwest::Client::builder() + .dns_resolver(Arc::new(FilteredDnsResolver)) + .redirect(redirect_policy) + .build() + .map_err(http_error)?; + + Ok((client, reqwest_req)) +} + +/// Record error metrics for a failed HTTP request. +fn record_http_error_metrics(database_identity: &Identity, err: &reqwest::Error) { + if err.is_timeout() { + DB_METRICS + .procedure_num_timeout_http_requests + .with_label_values(database_identity) + .inc(); + } else { + DB_METRICS + .procedure_num_failed_http_requests + .with_label_values(database_identity) + .inc(); + } +} + /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. /// /// Applied when the module does not specify a timeout. @@ -2180,4 +2280,77 @@ mod test { assert_eq!(0, tx.metrics.bytes_sent_to_clients); Ok(()) } + + #[test] + fn http_request_blocked_during_tx() -> Result<()> { + let db = relational_db()?; + let (mut env, _rt) = instance_env(db)?; + + env.start_mutable_tx()?; + + let request = spacetimedb_lib::http::Request { + method: spacetimedb_lib::http::Method::Get, + headers: std::iter::empty::<(Option>, Box<[u8]>)>().collect(), + timeout: None, + uri: "http://example.com".into(), + version: spacetimedb_lib::http::Version::Http11, + }; + let result = env.http_request(request, bytes::Bytes::new()); + assert!( + matches!(&result, Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpRequest))), + "expected WouldBlockTransaction(ProcedureHttpRequest)" + ); + + env.abort_mutable_tx()?; + Ok(()) + } + + #[test] + fn http_stream_open_blocked_during_tx() -> Result<()> { + let db = relational_db()?; + let (mut env, _rt) = instance_env(db)?; + + env.start_mutable_tx()?; + + let request = spacetimedb_lib::http::Request { + method: spacetimedb_lib::http::Method::Get, + headers: std::iter::empty::<(Option>, Box<[u8]>)>().collect(), + timeout: None, + uri: "http://example.com".into(), + version: spacetimedb_lib::http::Version::Http11, + }; + let result = 
env.http_stream_open(request, bytes::Bytes::new()); + assert!( + matches!(&result, Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpStreamOpen))), + "expected WouldBlockTransaction(ProcedureHttpStreamOpen)" + ); + + env.abort_mutable_tx()?; + Ok(()) + } + + #[tokio::test] + async fn http_stream_state_drop_aborts_background_task() { + let (tx, rx) = tokio::sync::mpsc::channel::>(8); + let task = tokio::spawn(async move { + // Hold the sender open and block indefinitely. + let _tx = tx; + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + }); + let abort_handle = task.abort_handle(); + + let state = HttpStreamState { + receiver: rx, + abort_handle, + }; + + // Dropping the state should abort the background task via the Drop impl. + drop(state); + + let result = task.await; + assert!( + result.unwrap_err().is_cancelled(), + "background task should have been cancelled by HttpStreamState::drop" + ); + } } diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 0daa9c359bc..db3bf8a4d38 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -194,4 +194,7 @@ pub enum AbiCall { ProcedureCommitMutTransaction, ProcedureAbortMutTransaction, ProcedureHttpRequest, + ProcedureHttpStreamOpen, + ProcedureHttpStreamNext, + ProcedureHttpStreamClose, } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index b611fcaef26..443d5a37d7e 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -25,7 +25,7 @@ use crate::host::wasm_common::module_host_actor::{ InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp, WasmInstance, }; -use crate::host::wasm_common::{RowIters, TimingSpanSet}; +use crate::host::wasm_common::{HttpStreams, RowIters, TimingSpanSet}; use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::ModuleCreationContext; use 
crate::replica_context::ReplicaContext; @@ -215,6 +215,9 @@ struct JsInstanceEnv { /// The slab of `BufferIters` created for this instance. iters: RowIters, + /// The slab of streaming HTTP response handles. + http_streams: HttpStreams, + /// Track time spent in module-defined spans. timing_spans: TimingSpanSet, @@ -237,6 +240,7 @@ impl JsInstanceEnv { module_def: None, call_times: CallTimes::new(), iters: <_>::default(), + http_streams: <_>::default(), chunk_pool: <_>::default(), timing_spans: <_>::default(), } diff --git a/crates/core/src/host/v8/syscall/common.rs b/crates/core/src/host/v8/syscall/common.rs index 5e96ef9a1ab..b865a318f26 100644 --- a/crates/core/src/host/v8/syscall/common.rs +++ b/crates/core/src/host/v8/syscall/common.rs @@ -19,9 +19,11 @@ use crate::host::instance_env::InstanceEnv; use crate::host::wasm_common::module_host_actor::{ deserialize_view_rows, run_query_for_view, AnonymousViewOp, ProcedureOp, ViewOp, ViewResult, ViewReturnData, }; -use crate::host::wasm_common::{RowIterIdx, TimingSpan, TimingSpanIdx}; +use crate::host::instance_env::HttpStreamState; +use crate::host::wasm_common::{HttpStreamIdx, RowIterIdx, TimingSpan, TimingSpanIdx}; use anyhow::Context; use bytes::Bytes; +use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_primitives::{ColId, IndexId, ProcedureId, TableId}; @@ -585,10 +587,14 @@ pub fn identity<'scope>( /// /// Accepts a BSATN-encoded [`spacetimedb_lib::http::Request`] and a request body, and /// returns a BSATN-encoded [`spacetimedb_lib::http::Response`] and the response body. -pub fn procedure_http_request<'scope>( +/// Deserialize an HTTP request and body from V8 syscall arguments. +/// +/// `args.get(0)` must be a `Uint8Array` containing the BSATN-encoded request. 
+/// `args.get(1)` must be a `Uint8Array` or `string` containing the request body. +fn deserialize_http_request_args<'scope>( scope: &mut PinScope<'scope, '_>, - args: FunctionCallbackArguments<'scope>, -) -> SysCallResult> { + args: &FunctionCallbackArguments<'scope>, +) -> SysCallResult<(spacetimedb_lib::http::Request, Bytes)> { use spacetimedb_lib::http as st_http; let request = @@ -611,6 +617,15 @@ pub fn procedure_http_request<'scope>( Bytes::copy_from_slice(bytes.get_contents(&mut [])) }; + Ok((request, request_body)) +} + +pub fn procedure_http_request<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let (request, request_body) = deserialize_http_request_args(scope, &args)?; + let env = get_env(scope)?; let fut = env.instance_env.http_request(request, request_body)?; @@ -847,3 +862,129 @@ fn call_view( ErrorOrException::Exception(exc) => exc.into(), }) } + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_open( +/// request: Uint8Array, +/// body: Uint8Array | string +/// ): [handle: number, response: Uint8Array]; +/// ``` +/// +/// Initiates a streaming HTTP request. Returns a handle for reading chunks +/// and the BSATN-encoded response metadata (status, headers). +pub fn procedure_http_stream_open<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let (request, request_body) = deserialize_http_request_args(scope, &args)?; + + let env = get_env(scope)?; + + let fut = env.instance_env.http_stream_open(request, request_body)?; + + let rt = tokio::runtime::Handle::current(); + let (response, receiver, abort_handle) = rt.block_on(fut)?; + + // Record the response header size; body bytes are recorded per-chunk in stream_next. 
+ DB_METRICS + .procedure_http_response_size_bytes + .with_label_values(env.instance_env.database_identity()) + .inc_by(response.size_in_bytes() as _); + + let handle = env.http_streams.insert(HttpStreamState { receiver, abort_handle }); + + let response = bsatn::to_vec(&response).expect("failed to serialize `HttpResponse`"); + let response = make_uint8array(scope, response); + let handle_val = v8::Integer::new_from_unsigned(scope, handle.0); + + Ok(v8::Array::new_with_elements( + scope, + &[handle_val.into(), response.into()], + )) +} + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_next( +/// handle: number, +/// ): Uint8Array | null; +/// ``` +/// +/// Reads the next chunk from a streaming HTTP response. +/// Returns `null` when the stream is exhausted. +/// +/// **Threading note:** This blocks the V8 worker thread until a chunk arrives. +/// Since there is one V8 worker thread per module instance, all other reducers +/// and procedures for this instance are blocked while waiting. A slow remote +/// server will stall the entire module instance per chunk. +/// +/// **Transaction guard:** Unlike `http_request` and `http_stream_open`, whose +/// guards live in [`InstanceEnv`], the `in_tx()` check here is at the syscall +/// layer because `InstanceEnv` does not own the stream handles — they live in +/// [`JsInstanceEnv`]. +pub fn procedure_http_stream_next<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let handle_u32: u32 = deserialize_js(scope, args.get(0))?; + let handle = HttpStreamIdx(handle_u32); + + let env = get_env(scope)?; + + // Refuse to block on a stream chunk while holding a mutable transaction open. 
+ if env.instance_env.in_tx() { + return Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpStreamNext).into()); + } + + let stream = env.http_streams.get_mut(handle).ok_or(SysCallError::NO_SUCH_ITER)?; + + let rt = tokio::runtime::Handle::current(); + let chunk = rt.block_on(stream.receiver.recv()); + + match chunk { + Some(Ok(bytes)) => { + DB_METRICS + .procedure_http_response_size_bytes + .with_label_values(env.instance_env.database_identity()) + .inc_by(bytes.len() as _); + let arr = match bytes.try_into_mut() { + Ok(bytes_mut) => make_uint8array(scope, Box::new(bytes_mut)), + Err(bytes) => make_uint8array(scope, Vec::from(bytes)), + }; + Ok(arr.into()) + } + Some(Err(err)) => { + // Stream error — clean up and throw. + env.http_streams.take(handle); + Err(TypeError(format!("http stream error: {err}")).throw(scope).into()) + } + None => { + // Stream ended — clean up the handle. + env.http_streams.take(handle); + Ok(v8::null(scope).into()) + } + } +} + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_close(handle: number): void; +/// ``` +/// +/// Closes a streaming HTTP response handle, canceling the background reader. +pub fn procedure_http_stream_close( + scope: &mut PinScope<'_, '_>, + args: FunctionCallbackArguments<'_>, +) -> SysCallResult<()> { + let handle_u32: u32 = deserialize_js(scope, args.get(0))?; + let handle = HttpStreamIdx(handle_u32); + + let env = get_env(scope)?; + // `HttpStreamState::drop` aborts the background reader task. 
+ env.http_streams.take(handle); + Ok(()) +} diff --git a/crates/core/src/host/v8/syscall/v2.rs b/crates/core/src/host/v8/syscall/v2.rs index 5f3f6ed3edb..4ffd9e85e5f 100644 --- a/crates/core/src/host/v8/syscall/v2.rs +++ b/crates/core/src/host/v8/syscall/v2.rs @@ -17,6 +17,7 @@ use super::common::{ console_log, console_timer_end, console_timer_start, datastore_index_scan_range_bsatn_inner, datastore_table_row_count, datastore_table_scan_bsatn, deserialize_row_iter_idx, get_env, identity, index_id_from_name, procedure_abort_mut_tx, procedure_commit_mut_tx, procedure_http_request, + procedure_http_stream_close, procedure_http_stream_next, procedure_http_stream_open, procedure_start_mut_tx, row_iter_bsatn_close, table_id_from_name, volatile_nonatomic_schedule_immediate, }; use super::hooks::get_hook_function; @@ -132,6 +133,9 @@ pub(super) fn sys_v2_0<'scope>(scope: &mut PinScope<'scope, '_>) -> Local<'scope (with_sys_result, AbiCall::Identity, identity), (with_sys_result, AbiCall::GetJwt, get_jwt_payload), (with_sys_result, AbiCall::ProcedureHttpRequest, procedure_http_request), + (with_sys_result, AbiCall::ProcedureHttpStreamOpen, procedure_http_stream_open), + (with_sys_result, AbiCall::ProcedureHttpStreamNext, procedure_http_stream_next), + (with_sys_result, AbiCall::ProcedureHttpStreamClose, procedure_http_stream_close), ( with_sys_result, AbiCall::ProcedureStartMutTransaction, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index a5c737d54d6..7f3b2043ebe 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -326,6 +326,9 @@ impl ResourceSlab { decl_index!(RowIterIdx => std::vec::IntoIter>); pub(super) type RowIters = ResourceSlab; +decl_index!(HttpStreamIdx => crate::host::instance_env::HttpStreamState); +pub(super) type HttpStreams = ResourceSlab; + pub(crate) struct TimingSpan { pub start: Instant, pub name: String, diff --git a/crates/smoketests/tests/smoketests/http_streaming.rs 
b/crates/smoketests/tests/smoketests/http_streaming.rs
new file mode 100644
index 00000000000..cf21d7faa7c
--- /dev/null
+++ b/crates/smoketests/tests/smoketests/http_streaming.rs
@@ -0,0 +1,229 @@
+use std::io::{self, ErrorKind, Read, Write};
+use std::net::{TcpListener, TcpStream};
+use std::thread::JoinHandle;
+use std::time::{Duration, Instant};
+
+use spacetimedb_smoketests::Smoketest;
+
+/// Body chunks sent by the test server, in order.
+const RESPONSE_CHUNKS: [&[u8]; 3] = [b"AAA", b"BBB", b"CCC"];
+
+/// Returns `true` if `err` only indicates that the client closed the connection.
+fn is_client_disconnect(err: &io::Error) -> bool {
+    matches!(
+        err.kind(),
+        ErrorKind::BrokenPipe | ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted
+    )
+}
+
+/// Write the canned `200 OK` chunked response (custom headers, three chunks,
+/// terminating chunk) to `stream`.
+fn write_chunked_response(stream: &mut TcpStream) -> io::Result<()> {
+    stream.write_all(
+        b"HTTP/1.1 200 OK\r\n\
+          Content-Type: text/event-stream\r\n\
+          X-Test-Header: hello-world\r\n\
+          Transfer-Encoding: chunked\r\n\
+          Connection: close\r\n\r\n",
+    )?;
+
+    // Send three chunks, each framed as `<hex length>\r\n<data>\r\n`.
+    for chunk in &RESPONSE_CHUNKS {
+        write!(stream, "{:x}\r\n", chunk.len())?;
+        stream.write_all(chunk)?;
+        stream.write_all(b"\r\n")?;
+    }
+    // Terminating chunk.
+    stream.write_all(b"0\r\n\r\n")?;
+    stream.flush()
+}
+
+/// Spawn a test HTTP server that responds with chunked data and custom headers.
+///
+/// The server binds an ephemeral loopback port, serves up to `max_connections`
+/// requests, and then exits. It also exits successfully if the 30-second
+/// deadline passes while it is still waiting for a connection.
+///
+/// Returns the bound port and the server thread's handle; joining the handle
+/// yields any I/O error the server hit.
+fn spawn_chunked_server(max_connections: usize) -> (u16, JoinHandle<io::Result<()>>) {
+    let listener = TcpListener::bind(("127.0.0.1", 0)).expect("failed to bind test server");
+    listener
+        .set_nonblocking(true)
+        .expect("failed to set test server nonblocking mode");
+    let port = listener
+        .local_addr()
+        .expect("failed to read test server address")
+        .port();
+
+    let handle = std::thread::spawn(move || -> io::Result<()> {
+        let deadline = Instant::now() + Duration::from_secs(30);
+        for _ in 0..max_connections {
+            let (mut stream, _) = loop {
+                match listener.accept() {
+                    Ok(pair) => break pair,
+                    Err(err) if err.kind() == ErrorKind::WouldBlock => {
+                        if Instant::now() >= deadline {
+                            // All expected connections may not arrive (e.g. if a test fails early).
+                            return Ok(());
+                        }
+                        std::thread::sleep(Duration::from_millis(10));
+                    }
+                    Err(err) => return Err(err),
+                }
+            };
+
+            // The listener is non-blocking only so the accept loop can poll the
+            // deadline. On some platforms the accepted socket inherits that mode,
+            // which would make the read below return immediately and let
+            // `write_all` fail with `WouldBlock`; force blocking mode explicitly.
+            stream.set_nonblocking(false)?;
+
+            // Drain the request.
+            let mut buf = [0u8; 4096];
+            stream.set_read_timeout(Some(Duration::from_millis(200))).ok();
+            let _ = stream.read(&mut buf);
+
+            // Respond with chunked encoding and custom headers.
+            match write_chunked_response(&mut stream) {
+                Ok(()) => {}
+                // Callers may dispose the stream before consuming the whole body;
+                // a client hanging up mid-response is not a server failure.
+                Err(err) if is_client_disconnect(&err) => {}
+                Err(err) => return Err(err),
+            }
+        }
+        Ok(())
+    });
+
+    (port, handle)
+}
+
+/// Build the TypeScript module source used by the test, with every procedure
+/// pointed at the local test server on `port`.
+fn ts_module_code(port: u16) -> String {
+    format!(
+        r#"import {{ schema, t, table }} from "spacetimedb/server";
+
+const spacetimedb = schema({{}});
+export default spacetimedb;
+
+// --- Test 1: basic streaming read ---
+export const stream_read = spacetimedb.procedure(
+  {{}},
+  t.string(),
+  (ctx) => {{
+    const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream");
+    const decoder = new TextDecoder();
+    let body = "";
+    for (const chunk of resp) {{
+      body += decoder.decode(chunk);
+    }}
+    return body;
+  }}
+);
+
+// --- Test 2: stream_next inside a transaction must throw ---
+export const stream_next_blocked_in_tx = spacetimedb.procedure(
+  {{}},
+  t.string(),
+  (ctx) => {{
+    // Open the stream *outside* the transaction — this must succeed.
+    const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream");
+    try {{
+      ctx.withTx(_tx => {{
+        // Iterating calls procedure_http_stream_next which should throw
+        // WouldBlockTransaction because a mutable tx is open.
+        for (const _chunk of resp) {{
+          return "ERROR: stream.next() inside tx should have thrown";
+        }}
+      }});
+      return "ERROR: withTx should have thrown";
+    }} catch (e: any) {{
+      // We expect a WouldBlockTransaction error.
+      return "blocked";
+    }} finally {{
+      resp[Symbol.dispose]();
+    }}
+  }}
+);
+
+// --- Test 3: streaming response headers are preserved ---
+export const stream_headers = spacetimedb.procedure(
+  {{}},
+  t.string(),
+  (ctx) => {{
+    const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream");
+    try {{
+      const ct = resp.headers.get("content-type") ?? "MISSING";
+      const xh = resp.headers.get("x-test-header") ?? "MISSING";
+      return ct + "|" + xh;
+    }} finally {{
+      resp[Symbol.dispose]();
+    }}
+  }}
+);
+"#
+    )
+}
+
+/// Test that streaming HTTP responses can be read chunk by chunk,
+/// that iterating a stream inside a transaction is blocked,
+/// and that response headers are preserved.
+///
+/// Requires the server to be built with `allow_loopback_http_for_tests`.
+#[test]
+fn test_http_streaming() {
+    spacetimedb_smoketests::require_pnpm!();
+
+    // 3 connections: one per procedure call.
+    let (port, server) = spawn_chunked_server(3);
+
+    let mut test = Smoketest::builder().autopublish(false).build();
+    test.publish_typescript_module_source("http-streaming", "http-streaming", &ts_module_code(port))
+        .unwrap();
+
+    // Test 1: basic streaming read — all chunks concatenated.
+    let output = test.call_output("stream_read", &[]);
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    assert!(
+        output.status.success(),
+        "stream_read failed.\nstdout:\n{stdout}\nstderr:\n{stderr}"
+    );
+    assert!(
+        stdout.contains("AAABBBCCC"),
+        "expected concatenated chunks AAABBBCCC in output, got:\n{stdout}"
+    );
+
+    // Test 2: stream_next inside a transaction must throw.
+    let output = test.call_output("stream_next_blocked_in_tx", &[]);
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    assert!(
+        output.status.success(),
+        "stream_next_blocked_in_tx failed.\nstdout:\n{stdout}\nstderr:\n{stderr}"
+    );
+    assert!(
+        stdout.contains("blocked"),
+        "expected procedure to catch WouldBlockTransaction, got:\n{stdout}"
+    );
+
+    // Test 3: response headers are preserved.
+    let output = test.call_output("stream_headers", &[]);
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    assert!(
+        output.status.success(),
+        "stream_headers failed.\nstdout:\n{stdout}\nstderr:\n{stderr}"
+    );
+    assert!(
+        stdout.contains("text/event-stream") && stdout.contains("hello-world"),
+        "expected response headers in output, got:\n{stdout}"
+    );
+
+    server
+        .join()
+        .expect("test server thread panicked")
+        .expect("test server failed");
+}
diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs
index f5053652dd3..b03231c4ec8 100644
--- a/crates/smoketests/tests/smoketests/mod.rs
+++ b/crates/smoketests/tests/smoketests/mod.rs
@@ -19,6 +19,7 @@ mod domains;
 mod fail_initial_publish;
 mod filtering;
 mod http_egress;
+mod http_streaming;
 mod logs_level_filter;
 mod module_nested_op;
 mod modules;