From 74ebe6b38c37a5fd620bb2b6b2d23eb52ad38bb4 Mon Sep 17 00:00:00 2001 From: Philippe Tremblay Date: Sat, 21 Mar 2026 01:33:33 -0400 Subject: [PATCH 1/3] fix(ts-bindings): populate response headers in fetch() instead of returning empty Headers fetch() was deserializing the HttpResponse (including headers) from BSATN but then discarding them, always returning `new Headers()`. This made it impossible for procedures to inspect response metadata like Content-Type or retry hints. Add a `deserializeHeaders()` helper that converts the BSATN-decoded HttpHeaders entries into a web-standard Headers object, and use it in fetch(). --- .../src/server/http_internal.ts | 8 ++- .../tests/http_headers.test.ts | 51 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 crates/bindings-typescript/tests/http_headers.test.ts diff --git a/crates/bindings-typescript/src/server/http_internal.ts b/crates/bindings-typescript/src/server/http_internal.ts index 69cd3a98287..cdb665857a4 100644 --- a/crates/bindings-typescript/src/server/http_internal.ts +++ b/crates/bindings-typescript/src/server/http_internal.ts @@ -27,6 +27,12 @@ export interface ResponseInit { const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder('utf-8' /* { fatal: true } */); +function deserializeHeaders(headers: HttpHeaders): Headers { + return new Headers( + headers.entries.map(({ name, value }): [string, string] => [name, textDecoder.decode(value)]) + ); +} + const makeResponse = Symbol('makeResponse'); // based on deno's type of the same name @@ -187,7 +193,7 @@ function fetch(url: URL | string, init: RequestOptions = {}) { url: uri, status: response.code, statusText: status(response.code), - headers: new Headers(), + headers: deserializeHeaders(response.headers), aborted: false, }); } diff --git a/crates/bindings-typescript/tests/http_headers.test.ts b/crates/bindings-typescript/tests/http_headers.test.ts new file mode 100644 index 00000000000..7fe5ebecc4b --- 
/dev/null +++ b/crates/bindings-typescript/tests/http_headers.test.ts @@ -0,0 +1,51 @@ +import { describe, expect, test } from 'vitest'; +import { BinaryReader, BinaryWriter } from '../src'; +import { HttpResponse, HttpHeaders } from '../src/lib/http_types'; + +describe('HttpResponse header round-trip', () => { + test('headers survive BSATN serialize/deserialize', () => { + const textEncoder = new TextEncoder(); + const textDecoder = new TextDecoder('utf-8'); + + const original: HttpResponse = { + headers: { + entries: [ + { name: 'content-type', value: textEncoder.encode('text/event-stream') }, + { name: 'x-request-id', value: textEncoder.encode('abc-123') }, + ], + }, + version: { tag: 'Http11' }, + code: 200, + }; + + const writer = new BinaryWriter(256); + HttpResponse.serialize(writer, original); + const buf = writer.getBuffer(); + + const deserialized = HttpResponse.deserialize(new BinaryReader(buf)); + + expect(deserialized.code).toBe(200); + expect(deserialized.headers.entries).toHaveLength(2); + + expect(deserialized.headers.entries[0].name).toBe('content-type'); + expect(textDecoder.decode(deserialized.headers.entries[0].value)).toBe('text/event-stream'); + + expect(deserialized.headers.entries[1].name).toBe('x-request-id'); + expect(textDecoder.decode(deserialized.headers.entries[1].value)).toBe('abc-123'); + }); + + test('empty headers round-trip correctly', () => { + const original: HttpResponse = { + headers: { entries: [] }, + version: { tag: 'Http11' }, + code: 404, + }; + + const writer = new BinaryWriter(64); + HttpResponse.serialize(writer, original); + const deserialized = HttpResponse.deserialize(new BinaryReader(writer.getBuffer())); + + expect(deserialized.code).toBe(404); + expect(deserialized.headers.entries).toHaveLength(0); + }); +}); From f836c3531aa0bd627f8bf85cb44fc33bf932fab5 Mon Sep 17 00:00:00 2001 From: Philippe Tremblay Date: Sat, 21 Mar 2026 01:38:54 -0400 Subject: [PATCH 2/3] feat(procedures): add streaming HTTP response support 
for TypeScript modules Add fetchStreaming() to the procedure HTTP client, allowing TypeScript modules to consume HTTP response bodies chunk by chunk without buffering the entire response in memory. Syscall layer: - procedure_http_stream_open: initiates the request and returns a handle plus BSATN-encoded response metadata (status, headers) - procedure_http_stream_next: blocks until the next chunk arrives or the stream ends; guarded by in_tx() to prevent holding a mutable transaction open across network waits - procedure_http_stream_close: closes the handle and aborts the background reader task Runtime: - HttpStreamState stores the mpsc receiver and a tokio AbortHandle; Drop impl aborts the background reader immediately rather than waiting for the next frame or timeout - Streaming response size is metered via the existing procedure_http_response_size_bytes counter (header size at open, per-chunk body size at read) TypeScript bindings: - StreamHandle class with FinalizationRegistry safety net - StreamingResponse interface with Symbol.iterator / Symbol.dispose - fetchStreaming() uses deserializeHeaders() from the preceding commit to expose response headers Tests: - Unit: WouldBlockTransaction guards on http_request and http_stream_open - Unit: HttpStreamState::drop aborts the background task - Smoketest: end-to-end streaming read, tx-blocked iteration, and header preservation against a local chunked HTTP server --- .../src/server/http_internal.ts | 127 +++++- .../bindings-typescript/src/server/sys.d.ts | 13 + crates/core/src/host/instance_env.rs | 373 +++++++++++++----- crates/core/src/host/mod.rs | 3 + crates/core/src/host/v8/mod.rs | 6 +- crates/core/src/host/v8/syscall/common.rs | 149 ++++++- crates/core/src/host/v8/syscall/v2.rs | 4 + crates/core/src/host/wasm_common.rs | 3 + .../tests/smoketests/http_streaming.rs | 192 +++++++++ crates/smoketests/tests/smoketests/mod.rs | 1 + 10 files changed, 760 insertions(+), 111 deletions(-) create mode 100644 
crates/smoketests/tests/smoketests/http_streaming.rs diff --git a/crates/bindings-typescript/src/server/http_internal.ts b/crates/bindings-typescript/src/server/http_internal.ts index cdb665857a4..43caf7e3716 100644 --- a/crates/bindings-typescript/src/server/http_internal.ts +++ b/crates/bindings-typescript/src/server/http_internal.ts @@ -138,8 +138,43 @@ export interface RequestOptions { // redirect?: RequestRedirect; } +/** + * A streaming HTTP response that yields body chunks via iteration. + * + * **Important:** Each iteration blocks the module's V8 thread until the next + * chunk arrives. Because there is one V8 thread per module instance, all other + * reducers and procedures for this database are stalled while waiting. For + * long-running streams (e.g. LLM token streaming), this means the database is + * unresponsive for the duration. Prefer streaming for large finite downloads + * or cases where you can read a few chunks and exit early. + */ +export interface StreamingResponse extends Disposable { + /** HTTP status code. */ + readonly status: number; + /** HTTP status text. */ + readonly statusText: string; + /** Response headers. */ + readonly headers: Headers; + /** Whether the status is in the 200-299 range. */ + readonly ok: boolean; + /** Iterate over response body chunks. Each iteration blocks until the next chunk arrives. */ + [Symbol.iterator](): Iterator<Uint8Array>; + /** Close the underlying stream handle, canceling the background reader. */ + [Symbol.dispose](): void; +} + export interface HttpClient { fetch(url: URL | string, init?: RequestOptions): SyncResponse; + /** + * Initiate a streaming HTTP request. The response body can be iterated + * chunk by chunk. + * + * **Important:** Iterating over the response blocks the module's V8 thread + * on each chunk, stalling all other operations on this database until + * iteration finishes or the stream is disposed. See {@link StreamingResponse} + * for details. 
+ */ + fetchStreaming(url: URL | string, init?: RequestOptions): StreamingResponse; } const requestBaseSize = bsatnBaseSize({ types: [] }, HttpRequest.algebraicType); @@ -156,8 +191,8 @@ const methods = new Map([ ['PATCH', { tag: 'Patch' }], ]); -function fetch(url: URL | string, init: RequestOptions = {}) { - const method = methods.get(init.method?.toUpperCase() ?? 'GET') ?? { +function buildRequest(url: URL | string, init: RequestOptions = {}): { request: HttpRequest; uri: string; body: Uint8Array | string } { + const method: HttpMethod = methods.get(init.method?.toUpperCase() ?? 'GET') ?? { tag: 'Extension', value: init.method!, }; @@ -175,16 +210,25 @@ function fetch(url: URL | string, init: RequestOptions = {}) { uri, version: { tag: 'Http11' } as const, }); - const requestBuf = new BinaryWriter(requestBaseSize); - HttpRequest.serialize(requestBuf, request); const body = init.body == null ? new Uint8Array() : typeof init.body === 'string' ? init.body : new Uint8Array(init.body as any); + return { request, uri, body }; +} + +function serializeRequest(request: HttpRequest): Uint8Array { + const requestBuf = new BinaryWriter(requestBaseSize); + HttpRequest.serialize(requestBuf, request); + return requestBuf.getBuffer(); +} + +function fetch(url: URL | string, init: RequestOptions = {}) { + const { request, uri, body } = buildRequest(url, init); const [responseBuf, responseBody] = sys.procedure_http_request( - requestBuf.getBuffer(), + serializeRequest(request), body ); const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); @@ -198,6 +242,77 @@ function fetch(url: URL | string, init: RequestOptions = {}) { }); } +/** Manages the lifecycle of a streaming HTTP response handle. 
*/ +class StreamHandle implements Disposable { + #id: number | -1; + + static #finalizationRegistry = new FinalizationRegistry( + sys.procedure_http_stream_close + ); + + constructor(id: number) { + this.#id = id; + StreamHandle.#finalizationRegistry.register(this, id, this); + } + + /** Read the next chunk. Returns null when the stream is exhausted. */ + next(): Uint8Array | null { + if (this.#id === -1) return null; + const chunk = sys.procedure_http_stream_next(this.#id); + if (chunk === null) { + this.#detach(); + } + return chunk; + } + + #detach(): number { + const id = this.#id; + this.#id = -1; + StreamHandle.#finalizationRegistry.unregister(this); + return id; + } + + [Symbol.dispose]() { + if (this.#id >= 0) { + const id = this.#detach(); + sys.procedure_http_stream_close(id); + } + } +} + +function fetchStreaming(url: URL | string, init: RequestOptions = {}): StreamingResponse { + const { request, body } = buildRequest(url, init); + const [handle, responseBuf] = sys.procedure_http_stream_open( + serializeRequest(request), + body + ); + const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); + const stream = new StreamHandle(handle); + const code = response.code; + const responseHeaders = deserializeHeaders(response.headers); + + return { + get status() { return code; }, + get statusText() { return status(code) as string; }, + headers: responseHeaders, + get ok() { return 200 <= code && code <= 299; }, + *[Symbol.iterator]() { + try { + let chunk: Uint8Array | null; + while ((chunk = stream.next()) !== null) { + yield chunk; + } + } finally { + stream[Symbol.dispose](); + } + }, + [Symbol.dispose]() { + stream[Symbol.dispose](); + }, + }; +} + freeze(fetch); +freeze(fetchStreaming); -export const httpClient: HttpClient = freeze({ fetch }); +export const httpClient: HttpClient = freeze({ fetch, fetchStreaming }); diff --git a/crates/bindings-typescript/src/server/sys.d.ts b/crates/bindings-typescript/src/server/sys.d.ts index 
d85f23210e2..baccf35262a 100644 --- a/crates/bindings-typescript/src/server/sys.d.ts +++ b/crates/bindings-typescript/src/server/sys.d.ts @@ -94,6 +94,19 @@ declare module 'spacetime:sys@2.0' { body: Uint8Array | string ): [response: Uint8Array, body: Uint8Array]; + export function procedure_http_stream_open( + request: Uint8Array, + body: Uint8Array | string + ): [handle: number, response: Uint8Array]; + + export function procedure_http_stream_next( + handle: number + ): Uint8Array | null; + + export function procedure_http_stream_close( + handle: number + ): void; + export function procedure_start_mut_tx(): bigint; export function procedure_commit_mut_tx(); diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 38517e6eb89..aed265e9657 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -847,94 +847,14 @@ impl InstanceEnv { .with_label_values(self.database_identity()) .inc_scope(); - /// Strip the query part out of the URL in `err`, as query parameters may be sensitive - /// and we'd like it to be safe to directly log errors from this method. - fn strip_query_params_from_reqwest_error(mut err: reqwest::Error) -> reqwest::Error { - if let Some(url) = err.url_mut() { - // `set_query` of `None` clears the query part. - url.set_query(None); - } - err - } - - fn http_error(err: E) -> NodesError { - // Include the full error chain, not just the top-level message. - // `reqwest::Error` wraps underlying causes (DNS failure, connection refused, - // timeout, TLS errors, etc.) which are essential for debugging. 
- use std::fmt::Write; - let mut message = err.to_string(); - let mut source = err.source(); - while let Some(cause) = source { - write!(message, ": {cause}").unwrap(); - source = cause.source(); - } - NodesError::HttpError(message) - } - - // Then convert the request into an `http::Request`, a semi-standard "lingua franca" type in the Rust ecosystem, - // and map its body into a type `reqwest` will like. - // - // See comments on and in `convert_http_request` for justification that there's no sensitive info in this error. - let (request, timeout) = convert_http_request(request).map_err(http_error)?; - - let request = http::Request::from_parts(request, body); + let (client, reqwest_req) = prepare_http_request(request, body)?; - let mut reqwest: reqwest::Request = request - .try_into() - // `reqwest::Error` may contain sensitive info, namely the full URL with query params. - // Strip those out before returning the error. - .map_err(strip_query_params_from_reqwest_error) - .map_err(http_error)?; - - // If the user requested a timeout using our extension, slot it in to reqwest's timeout. - // Clamp to the range `0..HTTP_DEFAULT_TIMEOUT`. - let timeout = timeout.unwrap_or(HTTP_DEFAULT_TIMEOUT).min(HTTP_MAX_TIMEOUT); - - // reqwest's timeout covers from the start of the request to the end of reading the body, - // so there's no need to do our own timeout operation. - *reqwest.timeout_mut() = Some(timeout); - - let reqwest = reqwest; - - // Check if we have a blocked IP address, since IP literals bypass DNS resolution. - if is_blocked_ip_literal(reqwest.url()) { - return Err(NodesError::HttpError(BLOCKED_HTTP_ADDRESS_ERROR.to_string())); - } - - let redirect_policy = reqwest::redirect::Policy::custom(|attempt| { - if is_blocked_ip_literal(attempt.url()) { - attempt.error(BLOCKED_HTTP_ADDRESS_ERROR) - } else { - reqwest::redirect::Policy::default().redirect(attempt) - } - }); - - // TODO(procedure-metrics): record size in bytes of response, time spent awaiting response. 
- - // Actually execute the HTTP request! - // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call. - let execute_fut = reqwest::Client::builder() - .dns_resolver(Arc::new(FilteredDnsResolver)) - .redirect(redirect_policy) - .build() - .map_err(http_error)? - .execute(reqwest); - - // Run the future that does IO work on a tokio worker thread, where it's more efficent. + // Run the future that does IO work on a tokio worker thread, where it's more efficient. + let execute_fut = client.execute(reqwest_req); let response_fut = tokio::spawn(async { - // `reqwest::Error` may contain sensitive info, namely the full URL with query params. - // We'll strip those with `strip_query_params_from_eqwest_error` - // after `await`ing `response_fut` below. let response = execute_fut.await?; - - // Download the response body, which in all likelihood will be a stream, - // as reqwest seems to prefer that. let (response, body) = http::Response::from(response).into_parts(); - - // This error may also contain the full URL with query params. - // Again, we'll strip them after `await`ing `response_fut` below. let body = http_body_util::BodyExt::collect(body).await?.to_bytes(); - Ok((response, body)) }) .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())); @@ -945,29 +865,13 @@ impl InstanceEnv { let (response, body) = response_fut .await .inspect_err(|err: &reqwest::Error| { - // Report the request's failure in our metrics as either a timeout or a misc. failure, as appropriate. - if err.is_timeout() { - DB_METRICS - .procedure_num_timeout_http_requests - .with_label_values(&database_identity) - .inc(); - } else { - DB_METRICS - .procedure_num_failed_http_requests - .with_label_values(&database_identity) - .inc(); - } + record_http_error_metrics(&database_identity, err); }) - // `response_fut` returns a `reqwest::Error`, which may contain the full URL including query params. 
- // Strip them out to clean the error of potentially sensitive info. .map_err(strip_query_params_from_reqwest_error) .map_err(http_error)?; - // Transform the `http::Response` into our `spacetimedb_lib::http::Response` type, - // which has a stable BSATN encoding to pass across the WASM boundary. let response = convert_http_response(response); - // Record the response size in bytes. DB_METRICS .procedure_http_response_size_bytes .with_label_values(&database_identity) @@ -978,6 +882,202 @@ impl InstanceEnv { } } +/// State for an in-progress streaming HTTP response. +/// +/// The background tokio task reads chunks from the response body +/// and sends them through a bounded channel. The procedure reads +/// chunks one at a time via the `procedure_http_stream_next` syscall. +pub struct HttpStreamState { + pub receiver: tokio::sync::mpsc::Receiver<Result<bytes::Bytes, String>>, + pub abort_handle: tokio::task::AbortHandle, +} + +impl Drop for HttpStreamState { + fn drop(&mut self) { + self.abort_handle.abort(); + } +} + +impl InstanceEnv { + /// Initiate a streaming HTTP request. + /// + /// Returns a future that resolves to the response metadata and a chunk receiver. + /// The response body is read in the background and sent through a bounded channel. + /// + /// Security: reuses the same IP filtering, DNS resolution, timeout clamping, + /// and redirect policy as [`InstanceEnv::http_request`]. 
+ pub fn http_stream_open( + &mut self, + request: st_http::Request, + body: bytes::Bytes, + ) -> Result< + impl Future< + Output = Result< + ( + st_http::Response, + tokio::sync::mpsc::Receiver<Result<bytes::Bytes, String>>, + tokio::task::AbortHandle, + ), + NodesError, + >, + > + use<>, + NodesError, + > { + if self.in_tx() { + return Err(NodesError::WouldBlockTransaction(super::AbiCall::ProcedureHttpStreamOpen)); + } + + DB_METRICS + .procedure_num_http_requests + .with_label_values(self.database_identity()) + .inc(); + DB_METRICS + .procedure_http_request_size_bytes + .with_label_values(self.database_identity()) + .inc_by((request.size_in_bytes() + body.len()) as _); + // Create the in-progress metric guard. Moved into the async block so it + // lives until the response headers are received and the chunk reader is spawned. + let in_progress_metric = DB_METRICS + .procedure_num_in_progress_http_requests + .with_label_values(self.database_identity()) + .inc_scope(); + + let (client, reqwest_req) = prepare_http_request(request, body)?; + let execute_fut = client.execute(reqwest_req); + + let database_identity = *self.database_identity(); + + Ok(async move { + // Keep the in-progress metric alive until the response headers arrive + // and the background chunk reader is spawned. + let _in_progress_metric = in_progress_metric; + + let response = tokio::spawn(execute_fut) + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())) + .await + .inspect_err(|err: &reqwest::Error| { + record_http_error_metrics(&database_identity, err); + }) + .map_err(strip_query_params_from_reqwest_error) + .map_err(http_error)?; + + let (parts, body) = http::Response::from(response).into_parts(); + let response = convert_http_response(parts); + + // Spawn a background task that reads chunks from the response body + // and sends them through a bounded channel. 
+ let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, String>>(8); + let task = tokio::spawn(async move { + use http_body_util::BodyExt; + let mut body = std::pin::pin!(body); + loop { + match body.frame().await { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + if !data.is_empty() { + if tx.send(Ok(data)).await.is_err() { + // Receiver dropped — procedure closed the stream. + break; + } + } + } + // Non-data frames (trailers) are skipped. + } + Some(Err(err)) => { + let _ = tx.send(Err(err.to_string())).await; + break; + } + None => { + // Body stream ended. + break; + } + } + } + }); + + Ok((response, rx, task.abort_handle())) + }) + } +} + +/// Strip the query part out of the URL in `err`, as query parameters may be sensitive +/// and we'd like it to be safe to directly log errors from HTTP methods. +fn strip_query_params_from_reqwest_error(mut err: reqwest::Error) -> reqwest::Error { + if let Some(url) = err.url_mut() { + url.set_query(None); + } + err +} + +/// Convert a `std::error::Error` into a `NodesError::HttpError`, +/// preserving the full error chain for debugging. +fn http_error<E: std::error::Error>(err: E) -> NodesError { + use std::fmt::Write; + let mut message = err.to_string(); + let mut source = err.source(); + while let Some(cause) = source { + write!(message, ": {cause}").unwrap(); + source = cause.source(); + } + NodesError::HttpError(message) +} + +/// Prepare an HTTP request for execution: convert from our type to reqwest, +/// apply timeout clamping, IP filtering, DNS filtering, and redirect policy. +/// +/// Returns the `reqwest::Client` and `reqwest::Request` ready to execute. 
+fn prepare_http_request( + request: st_http::Request, + body: bytes::Bytes, +) -> Result<(reqwest::Client, reqwest::Request), NodesError> { + let (request_parts, timeout) = convert_http_request(request).map_err(http_error)?; + let request = http::Request::from_parts(request_parts, body); + + let mut reqwest_req: reqwest::Request = request + .try_into() + .map_err(strip_query_params_from_reqwest_error) + .map_err(http_error)?; + + let timeout = timeout.unwrap_or(HTTP_DEFAULT_TIMEOUT).min(HTTP_MAX_TIMEOUT); + *reqwest_req.timeout_mut() = Some(timeout); + + if is_blocked_ip_literal(reqwest_req.url()) { + return Err(NodesError::HttpError(BLOCKED_HTTP_ADDRESS_ERROR.to_string())); + } + + let redirect_policy = reqwest::redirect::Policy::custom(|attempt| { + if is_blocked_ip_literal(attempt.url()) { + attempt.error(BLOCKED_HTTP_ADDRESS_ERROR) + } else { + reqwest::redirect::Policy::default().redirect(attempt) + } + }); + + // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call. + let client = reqwest::Client::builder() + .dns_resolver(Arc::new(FilteredDnsResolver)) + .redirect(redirect_policy) + .build() + .map_err(http_error)?; + + Ok((client, reqwest_req)) +} + +/// Record error metrics for a failed HTTP request. +fn record_http_error_metrics(database_identity: &Identity, err: &reqwest::Error) { + if err.is_timeout() { + DB_METRICS + .procedure_num_timeout_http_requests + .with_label_values(database_identity) + .inc(); + } else { + DB_METRICS + .procedure_num_failed_http_requests + .with_label_values(database_identity) + .inc(); + } +} + /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. /// /// Applied when the module does not specify a timeout. 
@@ -2180,4 +2280,77 @@ mod test { assert_eq!(0, tx.metrics.bytes_sent_to_clients); Ok(()) } + + #[test] + fn http_request_blocked_during_tx() -> Result<()> { + let db = relational_db()?; + let (mut env, _rt) = instance_env(db)?; + + env.start_mutable_tx()?; + + let request = spacetimedb_lib::http::Request { + method: spacetimedb_lib::http::Method::Get, + headers: std::iter::empty::<(Option>, Box<[u8]>)>().collect(), + timeout: None, + uri: "http://example.com".into(), + version: spacetimedb_lib::http::Version::Http11, + }; + let result = env.http_request(request, bytes::Bytes::new()); + assert!( + matches!(&result, Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpRequest))), + "expected WouldBlockTransaction(ProcedureHttpRequest)" + ); + + env.abort_mutable_tx()?; + Ok(()) + } + + #[test] + fn http_stream_open_blocked_during_tx() -> Result<()> { + let db = relational_db()?; + let (mut env, _rt) = instance_env(db)?; + + env.start_mutable_tx()?; + + let request = spacetimedb_lib::http::Request { + method: spacetimedb_lib::http::Method::Get, + headers: std::iter::empty::<(Option>, Box<[u8]>)>().collect(), + timeout: None, + uri: "http://example.com".into(), + version: spacetimedb_lib::http::Version::Http11, + }; + let result = env.http_stream_open(request, bytes::Bytes::new()); + assert!( + matches!(&result, Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpStreamOpen))), + "expected WouldBlockTransaction(ProcedureHttpStreamOpen)" + ); + + env.abort_mutable_tx()?; + Ok(()) + } + + #[tokio::test] + async fn http_stream_state_drop_aborts_background_task() { + let (tx, rx) = tokio::sync::mpsc::channel::>(8); + let task = tokio::spawn(async move { + // Hold the sender open and block indefinitely. 
+ let _tx = tx; + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + }); + let abort_handle = task.abort_handle(); + + let state = HttpStreamState { + receiver: rx, + abort_handle, + }; + + // Dropping the state should abort the background task via the Drop impl. + drop(state); + + let result = task.await; + assert!( + result.unwrap_err().is_cancelled(), + "background task should have been cancelled by HttpStreamState::drop" + ); + } } diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 0daa9c359bc..db3bf8a4d38 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -194,4 +194,7 @@ pub enum AbiCall { ProcedureCommitMutTransaction, ProcedureAbortMutTransaction, ProcedureHttpRequest, + ProcedureHttpStreamOpen, + ProcedureHttpStreamNext, + ProcedureHttpStreamClose, } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index b611fcaef26..443d5a37d7e 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -25,7 +25,7 @@ use crate::host::wasm_common::module_host_actor::{ InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp, WasmInstance, }; -use crate::host::wasm_common::{RowIters, TimingSpanSet}; +use crate::host::wasm_common::{HttpStreams, RowIters, TimingSpanSet}; use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; @@ -215,6 +215,9 @@ struct JsInstanceEnv { /// The slab of `BufferIters` created for this instance. iters: RowIters, + /// The slab of streaming HTTP response handles. + http_streams: HttpStreams, + /// Track time spent in module-defined spans. 
timing_spans: TimingSpanSet, @@ -237,6 +240,7 @@ impl JsInstanceEnv { module_def: None, call_times: CallTimes::new(), iters: <_>::default(), + http_streams: <_>::default(), chunk_pool: <_>::default(), timing_spans: <_>::default(), } diff --git a/crates/core/src/host/v8/syscall/common.rs b/crates/core/src/host/v8/syscall/common.rs index 5e96ef9a1ab..b865a318f26 100644 --- a/crates/core/src/host/v8/syscall/common.rs +++ b/crates/core/src/host/v8/syscall/common.rs @@ -19,9 +19,11 @@ use crate::host::instance_env::InstanceEnv; use crate::host::wasm_common::module_host_actor::{ deserialize_view_rows, run_query_for_view, AnonymousViewOp, ProcedureOp, ViewOp, ViewResult, ViewReturnData, }; -use crate::host::wasm_common::{RowIterIdx, TimingSpan, TimingSpanIdx}; +use crate::host::instance_env::HttpStreamState; +use crate::host::wasm_common::{HttpStreamIdx, RowIterIdx, TimingSpan, TimingSpanIdx}; use anyhow::Context; use bytes::Bytes; +use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_primitives::{ColId, IndexId, ProcedureId, TableId}; @@ -585,10 +587,14 @@ pub fn identity<'scope>( /// /// Accepts a BSATN-encoded [`spacetimedb_lib::http::Request`] and a request body, and /// returns a BSATN-encoded [`spacetimedb_lib::http::Response`] and the response body. -pub fn procedure_http_request<'scope>( +/// Deserialize an HTTP request and body from V8 syscall arguments. +/// +/// `args.get(0)` must be a `Uint8Array` containing the BSATN-encoded request. +/// `args.get(1)` must be a `Uint8Array` or `string` containing the request body. 
+fn deserialize_http_request_args<'scope>( scope: &mut PinScope<'scope, '_>, - args: FunctionCallbackArguments<'scope>, -) -> SysCallResult> { + args: &FunctionCallbackArguments<'scope>, +) -> SysCallResult<(spacetimedb_lib::http::Request, Bytes)> { use spacetimedb_lib::http as st_http; let request = @@ -611,6 +617,15 @@ pub fn procedure_http_request<'scope>( Bytes::copy_from_slice(bytes.get_contents(&mut [])) }; + Ok((request, request_body)) +} + +pub fn procedure_http_request<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let (request, request_body) = deserialize_http_request_args(scope, &args)?; + let env = get_env(scope)?; let fut = env.instance_env.http_request(request, request_body)?; @@ -847,3 +862,129 @@ fn call_view( ErrorOrException::Exception(exc) => exc.into(), }) } + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_open( +/// request: Uint8Array, +/// body: Uint8Array | string +/// ): [handle: number, response: Uint8Array]; +/// ``` +/// +/// Initiates a streaming HTTP request. Returns a handle for reading chunks +/// and the BSATN-encoded response metadata (status, headers). +pub fn procedure_http_stream_open<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let (request, request_body) = deserialize_http_request_args(scope, &args)?; + + let env = get_env(scope)?; + + let fut = env.instance_env.http_stream_open(request, request_body)?; + + let rt = tokio::runtime::Handle::current(); + let (response, receiver, abort_handle) = rt.block_on(fut)?; + + // Record the response header size; body bytes are recorded per-chunk in stream_next. 
+ DB_METRICS + .procedure_http_response_size_bytes + .with_label_values(env.instance_env.database_identity()) + .inc_by(response.size_in_bytes() as _); + + let handle = env.http_streams.insert(HttpStreamState { receiver, abort_handle }); + + let response = bsatn::to_vec(&response).expect("failed to serialize `HttpResponse`"); + let response = make_uint8array(scope, response); + let handle_val = v8::Integer::new_from_unsigned(scope, handle.0); + + Ok(v8::Array::new_with_elements( + scope, + &[handle_val.into(), response.into()], + )) +} + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_next( +/// handle: number, +/// ): Uint8Array | null; +/// ``` +/// +/// Reads the next chunk from a streaming HTTP response. +/// Returns `null` when the stream is exhausted. +/// +/// **Threading note:** This blocks the V8 worker thread until a chunk arrives. +/// Since there is one V8 worker thread per module instance, all other reducers +/// and procedures for this instance are blocked while waiting. A slow remote +/// server will stall the entire module instance per chunk. +/// +/// **Transaction guard:** Unlike `http_request` and `http_stream_open`, whose +/// guards live in [`InstanceEnv`], the `in_tx()` check here is at the syscall +/// layer because `InstanceEnv` does not own the stream handles — they live in +/// [`JsInstanceEnv`]. +pub fn procedure_http_stream_next<'scope>( + scope: &mut PinScope<'scope, '_>, + args: FunctionCallbackArguments<'scope>, +) -> SysCallResult> { + let handle_u32: u32 = deserialize_js(scope, args.get(0))?; + let handle = HttpStreamIdx(handle_u32); + + let env = get_env(scope)?; + + // Refuse to block on a stream chunk while holding a mutable transaction open. 
+ if env.instance_env.in_tx() { + return Err(NodesError::WouldBlockTransaction(crate::host::AbiCall::ProcedureHttpStreamNext).into()); + } + + let stream = env.http_streams.get_mut(handle).ok_or(SysCallError::NO_SUCH_ITER)?; + + let rt = tokio::runtime::Handle::current(); + let chunk = rt.block_on(stream.receiver.recv()); + + match chunk { + Some(Ok(bytes)) => { + DB_METRICS + .procedure_http_response_size_bytes + .with_label_values(env.instance_env.database_identity()) + .inc_by(bytes.len() as _); + let arr = match bytes.try_into_mut() { + Ok(bytes_mut) => make_uint8array(scope, Box::new(bytes_mut)), + Err(bytes) => make_uint8array(scope, Vec::from(bytes)), + }; + Ok(arr.into()) + } + Some(Err(err)) => { + // Stream error — clean up and throw. + env.http_streams.take(handle); + Err(TypeError(format!("http stream error: {err}")).throw(scope).into()) + } + None => { + // Stream ended — clean up the handle. + env.http_streams.take(handle); + Ok(v8::null(scope).into()) + } + } +} + +/// # Signature +/// +/// ```ignore +/// function procedure_http_stream_close(handle: number): void; +/// ``` +/// +/// Closes a streaming HTTP response handle, canceling the background reader. +pub fn procedure_http_stream_close( + scope: &mut PinScope<'_, '_>, + args: FunctionCallbackArguments<'_>, +) -> SysCallResult<()> { + let handle_u32: u32 = deserialize_js(scope, args.get(0))?; + let handle = HttpStreamIdx(handle_u32); + + let env = get_env(scope)?; + // `HttpStreamState::drop` aborts the background reader task. 
+ env.http_streams.take(handle); + Ok(()) +} diff --git a/crates/core/src/host/v8/syscall/v2.rs b/crates/core/src/host/v8/syscall/v2.rs index 5f3f6ed3edb..4ffd9e85e5f 100644 --- a/crates/core/src/host/v8/syscall/v2.rs +++ b/crates/core/src/host/v8/syscall/v2.rs @@ -17,6 +17,7 @@ use super::common::{ console_log, console_timer_end, console_timer_start, datastore_index_scan_range_bsatn_inner, datastore_table_row_count, datastore_table_scan_bsatn, deserialize_row_iter_idx, get_env, identity, index_id_from_name, procedure_abort_mut_tx, procedure_commit_mut_tx, procedure_http_request, + procedure_http_stream_close, procedure_http_stream_next, procedure_http_stream_open, procedure_start_mut_tx, row_iter_bsatn_close, table_id_from_name, volatile_nonatomic_schedule_immediate, }; use super::hooks::get_hook_function; @@ -132,6 +133,9 @@ pub(super) fn sys_v2_0<'scope>(scope: &mut PinScope<'scope, '_>) -> Local<'scope (with_sys_result, AbiCall::Identity, identity), (with_sys_result, AbiCall::GetJwt, get_jwt_payload), (with_sys_result, AbiCall::ProcedureHttpRequest, procedure_http_request), + (with_sys_result, AbiCall::ProcedureHttpStreamOpen, procedure_http_stream_open), + (with_sys_result, AbiCall::ProcedureHttpStreamNext, procedure_http_stream_next), + (with_sys_result, AbiCall::ProcedureHttpStreamClose, procedure_http_stream_close), ( with_sys_result, AbiCall::ProcedureStartMutTransaction, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index a5c737d54d6..7f3b2043ebe 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -326,6 +326,9 @@ impl ResourceSlab { decl_index!(RowIterIdx => std::vec::IntoIter>); pub(super) type RowIters = ResourceSlab; +decl_index!(HttpStreamIdx => crate::host::instance_env::HttpStreamState); +pub(super) type HttpStreams = ResourceSlab; + pub(crate) struct TimingSpan { pub start: Instant, pub name: String, diff --git a/crates/smoketests/tests/smoketests/http_streaming.rs 
b/crates/smoketests/tests/smoketests/http_streaming.rs new file mode 100644 index 00000000000..cf21d7faa7c --- /dev/null +++ b/crates/smoketests/tests/smoketests/http_streaming.rs @@ -0,0 +1,192 @@ +use std::io::{Read, Write}; +use std::net::TcpListener; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +use spacetimedb_smoketests::Smoketest; + +/// Spawn a test HTTP server that responds with chunked data and custom headers. +/// Accepts up to `max_connections` requests then exits. +fn spawn_chunked_server(max_connections: usize) -> (u16, JoinHandle>) { + let listener = TcpListener::bind(("127.0.0.1", 0)).expect("failed to bind test server"); + listener + .set_nonblocking(true) + .expect("failed to set test server nonblocking mode"); + let port = listener + .local_addr() + .expect("failed to read test server address") + .port(); + + let handle = std::thread::spawn(move || -> std::io::Result<()> { + let deadline = Instant::now() + Duration::from_secs(30); + for _ in 0..max_connections { + let (mut stream, _) = loop { + match listener.accept() { + Ok(pair) => break pair, + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + if Instant::now() >= deadline { + // All expected connections may not arrive (e.g. if a test fails early). + return Ok(()); + } + std::thread::sleep(Duration::from_millis(10)); + } + Err(err) => return Err(err), + } + }; + + // Drain the request. + let mut buf = [0u8; 4096]; + stream.set_read_timeout(Some(Duration::from_millis(200))).ok(); + let _ = stream.read(&mut buf); + + // Respond with chunked encoding and custom headers. + stream.write_all( + b"HTTP/1.1 200 OK\r\n\ + Content-Type: text/event-stream\r\n\ + X-Test-Header: hello-world\r\n\ + Transfer-Encoding: chunked\r\n\ + Connection: close\r\n\r\n", + )?; + + // Send three chunks. + for chunk in &[b"AAA" as &[u8], b"BBB", b"CCC"] { + write!(stream, "{:x}\r\n", chunk.len())?; + stream.write_all(chunk)?; + stream.write_all(b"\r\n")?; + } + // Terminating chunk. 
+ stream.write_all(b"0\r\n\r\n")?; + stream.flush()?; + } + Ok(()) + }); + + (port, handle) +} + +fn ts_module_code(port: u16) -> String { + format!( + r#"import {{ schema, t, table }} from "spacetimedb/server"; + +const spacetimedb = schema({{}}); +export default spacetimedb; + +// --- Test 1: basic streaming read --- +export const stream_read = spacetimedb.procedure( + {{}}, + t.string(), + (ctx) => {{ + const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream"); + const decoder = new TextDecoder(); + let body = ""; + for (const chunk of resp) {{ + body += decoder.decode(chunk); + }} + return body; + }} +); + +// --- Test 2: stream_next inside a transaction must throw --- +export const stream_next_blocked_in_tx = spacetimedb.procedure( + {{}}, + t.string(), + (ctx) => {{ + // Open the stream *outside* the transaction — this must succeed. + const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream"); + try {{ + ctx.withTx(_tx => {{ + // Iterating calls procedure_http_stream_next which should throw + // WouldBlockTransaction because a mutable tx is open. + for (const _chunk of resp) {{ + return "ERROR: stream.next() inside tx should have thrown"; + }} + }}); + return "ERROR: withTx should have thrown"; + }} catch (e: any) {{ + // We expect a WouldBlockTransaction error. + return "blocked"; + }} finally {{ + resp[Symbol.dispose](); + }} + }} +); + +// --- Test 3: streaming response headers are preserved --- +export const stream_headers = spacetimedb.procedure( + {{}}, + t.string(), + (ctx) => {{ + const resp = ctx.http.fetchStreaming("http://127.0.0.1:{port}/stream"); + try {{ + const ct = resp.headers.get("content-type") ?? "MISSING"; + const xh = resp.headers.get("x-test-header") ?? 
"MISSING"; + return ct + "|" + xh; + }} finally {{ + resp[Symbol.dispose](); + }} + }} +); +"# + ) +} + +/// Test that streaming HTTP responses can be read chunk by chunk, +/// that iterating a stream inside a transaction is blocked, +/// and that response headers are preserved. +/// +/// Requires the server to be built with `allow_loopback_http_for_tests`. +#[test] +fn test_http_streaming() { + spacetimedb_smoketests::require_pnpm!(); + + // 3 connections: one per procedure call. + let (port, server) = spawn_chunked_server(3); + + let mut test = Smoketest::builder().autopublish(false).build(); + test.publish_typescript_module_source("http-streaming", "http-streaming", &ts_module_code(port)) + .unwrap(); + + // Test 1: basic streaming read — all chunks concatenated. + let output = test.call_output("stream_read", &[]); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "stream_read failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + assert!( + stdout.contains("AAABBBCCC"), + "expected concatenated chunks AAABBBCCC in output, got:\n{stdout}" + ); + + // Test 2: stream_next inside a transaction must throw. + let output = test.call_output("stream_next_blocked_in_tx", &[]); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "stream_next_blocked_in_tx failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + assert!( + stdout.contains("blocked"), + "expected procedure to catch WouldBlockTransaction, got:\n{stdout}" + ); + + // Test 3: response headers are preserved. 
+ let output = test.call_output("stream_headers", &[]); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "stream_headers failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + assert!( + stdout.contains("text/event-stream") && stdout.contains("hello-world"), + "expected response headers in output, got:\n{stdout}" + ); + + server + .join() + .expect("test server thread panicked") + .expect("test server failed"); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index f5053652dd3..b03231c4ec8 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -19,6 +19,7 @@ mod domains; mod fail_initial_publish; mod filtering; mod http_egress; +mod http_streaming; mod logs_level_filter; mod module_nested_op; mod modules; From 139e603d5ccb57733ce3ed792e2979d1d9db0829 Mon Sep 17 00:00:00 2001 From: Philippe Tremblay Date: Sun, 22 Mar 2026 21:03:29 -0400 Subject: [PATCH 3/3] fix: construct StreamHandle before deserializing response Ensures the stream handle is registered with FinalizationRegistry before any code that could throw, preventing a handle leak if HttpResponse.deserialize() ever fails. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/bindings-typescript/src/server/http_internal.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bindings-typescript/src/server/http_internal.ts b/crates/bindings-typescript/src/server/http_internal.ts index 43caf7e3716..61bc43f7b3f 100644 --- a/crates/bindings-typescript/src/server/http_internal.ts +++ b/crates/bindings-typescript/src/server/http_internal.ts @@ -286,8 +286,8 @@ function fetchStreaming(url: URL | string, init: RequestOptions = {}): Streaming serializeRequest(request), body ); - const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); const stream = new StreamHandle(handle); + const response = HttpResponse.deserialize(new BinaryReader(responseBuf)); const code = response.code; const responseHeaders = deserializeHeaders(response.headers);