Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 128 additions & 7 deletions crates/bindings-typescript/src/server/http_internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ export interface ResponseInit {
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder('utf-8' /* { fatal: true } */);

function deserializeHeaders(headers: HttpHeaders): Headers {
return new Headers(
headers.entries.map(({ name, value }): [string, string] => [name, textDecoder.decode(value)])
);
}

const makeResponse = Symbol('makeResponse');

// based on deno's type of the same name
Expand Down Expand Up @@ -132,8 +138,43 @@ export interface RequestOptions {
// redirect?: RequestRedirect;
}

/**
* A streaming HTTP response that yields body chunks via iteration.
*
* **Important:** Each iteration blocks the module's V8 thread until the next
* chunk arrives. Because there is one V8 thread per module instance, all other
* reducers and procedures for this database are stalled while waiting. For
* long-running streams (e.g. LLM token streaming), this means the database is
* unresponsive for the duration. Prefer streaming for large finite downloads
* or cases where you can read a few chunks and exit early.
*/
export interface StreamingResponse extends Disposable {
/** HTTP status code. */
readonly status: number;
/** HTTP status text. */
readonly statusText: string;
/** Response headers. */
readonly headers: Headers;
/** Whether the status is in the 200-299 range. */
readonly ok: boolean;
/** Iterate over response body chunks. Each iteration blocks until the next chunk arrives. */
[Symbol.iterator](): Iterator<Uint8Array>;
/** Close the underlying stream handle, canceling the background reader. */
[Symbol.dispose](): void;
}

export interface HttpClient {
fetch(url: URL | string, init?: RequestOptions): SyncResponse;
/**
* Initiate a streaming HTTP request. The response body can be iterated
* chunk by chunk.
*
* **Important:** Iterating over the response blocks the module's V8 thread
* on each chunk, stalling all other operations on this database until
* iteration finishes or the stream is disposed. See {@link StreamingResponse}
* for details.
*/
fetchStreaming(url: URL | string, init?: RequestOptions): StreamingResponse;
}

const requestBaseSize = bsatnBaseSize({ types: [] }, HttpRequest.algebraicType);
Expand All @@ -150,8 +191,8 @@ const methods = new Map<string, HttpMethod>([
['PATCH', { tag: 'Patch' }],
]);

function fetch(url: URL | string, init: RequestOptions = {}) {
const method = methods.get(init.method?.toUpperCase() ?? 'GET') ?? {
function buildRequest(url: URL | string, init: RequestOptions = {}): { request: HttpRequest; uri: string; body: Uint8Array | string } {
const method: HttpMethod = methods.get(init.method?.toUpperCase() ?? 'GET') ?? {
tag: 'Extension',
value: init.method!,
};
Expand All @@ -169,16 +210,25 @@ function fetch(url: URL | string, init: RequestOptions = {}) {
uri,
version: { tag: 'Http11' } as const,
});
const requestBuf = new BinaryWriter(requestBaseSize);
HttpRequest.serialize(requestBuf, request);
const body =
init.body == null
? new Uint8Array()
: typeof init.body === 'string'
? init.body
: new Uint8Array<ArrayBuffer>(init.body as any);
return { request, uri, body };
}

function serializeRequest(request: HttpRequest): Uint8Array {
const requestBuf = new BinaryWriter(requestBaseSize);
HttpRequest.serialize(requestBuf, request);
return requestBuf.getBuffer();
}

function fetch(url: URL | string, init: RequestOptions = {}) {
const { request, uri, body } = buildRequest(url, init);
const [responseBuf, responseBody] = sys.procedure_http_request(
requestBuf.getBuffer(),
serializeRequest(request),
body
);
const response = HttpResponse.deserialize(new BinaryReader(responseBuf));
Expand All @@ -187,11 +237,82 @@ function fetch(url: URL | string, init: RequestOptions = {}) {
url: uri,
status: response.code,
statusText: status(response.code),
headers: new Headers(),
headers: deserializeHeaders(response.headers),
aborted: false,
});
}

/** Manages the lifecycle of a streaming HTTP response handle. */
class StreamHandle implements Disposable {
#id: number | -1;

static #finalizationRegistry = new FinalizationRegistry<number>(
sys.procedure_http_stream_close
);

constructor(id: number) {
this.#id = id;
StreamHandle.#finalizationRegistry.register(this, id, this);
}

/** Read the next chunk. Returns null when the stream is exhausted. */
next(): Uint8Array | null {
if (this.#id === -1) return null;
const chunk = sys.procedure_http_stream_next(this.#id);
if (chunk === null) {
this.#detach();
}
return chunk;
}

#detach(): number {
const id = this.#id;
this.#id = -1;
StreamHandle.#finalizationRegistry.unregister(this);
return id;
}

[Symbol.dispose]() {
if (this.#id >= 0) {
const id = this.#detach();
sys.procedure_http_stream_close(id);
}
}
}

function fetchStreaming(url: URL | string, init: RequestOptions = {}): StreamingResponse {
const { request, body } = buildRequest(url, init);
const [handle, responseBuf] = sys.procedure_http_stream_open(
serializeRequest(request),
body
);
const stream = new StreamHandle(handle);
const response = HttpResponse.deserialize(new BinaryReader(responseBuf));
const code = response.code;
const responseHeaders = deserializeHeaders(response.headers);

return {
get status() { return code; },
get statusText() { return status(code) as string; },
headers: responseHeaders,
get ok() { return 200 <= code && code <= 299; },
*[Symbol.iterator]() {
try {
let chunk: Uint8Array | null;
while ((chunk = stream.next()) !== null) {
yield chunk;
}
} finally {
stream[Symbol.dispose]();
}
},
[Symbol.dispose]() {
stream[Symbol.dispose]();
},
};
}

freeze(fetch);
freeze(fetchStreaming);

export const httpClient: HttpClient = freeze({ fetch });
export const httpClient: HttpClient = freeze({ fetch, fetchStreaming });
13 changes: 13 additions & 0 deletions crates/bindings-typescript/src/server/sys.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ declare module 'spacetime:sys@2.0' {
body: Uint8Array | string
): [response: Uint8Array, body: Uint8Array];

export function procedure_http_stream_open(
request: Uint8Array,
body: Uint8Array | string
): [handle: number, response: Uint8Array];

export function procedure_http_stream_next(
handle: number
): Uint8Array | null;

export function procedure_http_stream_close(
handle: number
): void;

export function procedure_start_mut_tx(): bigint;

export function procedure_commit_mut_tx();
Expand Down
51 changes: 51 additions & 0 deletions crates/bindings-typescript/tests/http_headers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { describe, expect, test } from 'vitest';
import { BinaryReader, BinaryWriter } from '../src';
import { HttpResponse, HttpHeaders } from '../src/lib/http_types';

describe('HttpResponse header round-trip', () => {
test('headers survive BSATN serialize/deserialize', () => {
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder('utf-8');

const original: HttpResponse = {
headers: {
entries: [
{ name: 'content-type', value: textEncoder.encode('text/event-stream') },
{ name: 'x-request-id', value: textEncoder.encode('abc-123') },
],
},
version: { tag: 'Http11' },
code: 200,
};

const writer = new BinaryWriter(256);
HttpResponse.serialize(writer, original);
const buf = writer.getBuffer();

const deserialized = HttpResponse.deserialize(new BinaryReader(buf));

expect(deserialized.code).toBe(200);
expect(deserialized.headers.entries).toHaveLength(2);

expect(deserialized.headers.entries[0].name).toBe('content-type');
expect(textDecoder.decode(deserialized.headers.entries[0].value)).toBe('text/event-stream');

expect(deserialized.headers.entries[1].name).toBe('x-request-id');
expect(textDecoder.decode(deserialized.headers.entries[1].value)).toBe('abc-123');
});

test('empty headers round-trip correctly', () => {
const original: HttpResponse = {
headers: { entries: [] },
version: { tag: 'Http11' },
code: 404,
};

const writer = new BinaryWriter(64);
HttpResponse.serialize(writer, original);
const deserialized = HttpResponse.deserialize(new BinaryReader(writer.getBuffer()));

expect(deserialized.code).toBe(404);
expect(deserialized.headers.entries).toHaveLength(0);
});
});
Loading