Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 285 additions & 36 deletions README.md

Large diffs are not rendered by default.

82 changes: 42 additions & 40 deletions src/CBORDecoderStream.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,50 @@
import { Decoder } from "./decodeAsyncIterable.js"
import { DecodeOptions } from "./options.js"
import { CBORValue } from "./types.js"
import { Decoder, type AsyncDecodeOptions } from "./decodeAsyncIterable.js"
import { createTransformWithBackpressure } from "./utils.js"
import type { WithRequired, Flatten, NoInfer } from "./utils.js"
import type { CBORValue } from "./types.js"

/** Decode a Web Streams API ReadableStream */
export class CBORDecoderStream<T extends CBORValue = CBORValue> extends TransformStream<Uint8Array, T> {
constructor(options: DecodeOptions = {}) {
let readableController: ReadableStreamDefaultController<Uint8Array>

const readable = new ReadableStream<Uint8Array>({
start(controller) {
readableController = controller
export class CBORDecoderStream<T = CBORValue> {
readable!: ReadableStream<T>
writable!: WritableStream<Uint8Array>

constructor(...[options = {}]: T extends CBORValue
? []|[AsyncDecodeOptions]
: [WithRequired<AsyncDecodeOptions<Flatten<NoInfer<T>>>, "onValue">]
) {
let transformResolve: (() => void) | null = null
const { writable, readable: readable_ } = new TransformStream<Uint8Array>(
{
transform(chunk, controller) {
return new Promise<void>((resolve) => {
transformResolve = () => {
transformResolve = null
resolve()
}
controller.enqueue(chunk)
})
}
},
})

// We need to track whick chunks have been "processed" and only resolve each
// .transform() promise once all data from each chunk has been enqueued.
const chunks = new WeakMap<Uint8Array, { resolve: () => void }>()

async function pipe(controller: TransformStreamDefaultController<T>) {
const decoder = new Decoder<T>(readable.values(), {
...options,
onFree: (chunk) => chunks.get(chunk)?.resolve(),
})

for await (const value of decoder) {
controller.enqueue(value)
{ highWaterMark: 1 },
{ highWaterMark: 1 }
)

const { readable, writable: writable_ } = createTransformWithBackpressure<undefined, T>(
async function pipe(_, enqueue) {
const decoder = new Decoder(readable_.values(), {
...options,
onPull: () => transformResolve?.()
} as AsyncDecodeOptions)
for await (const value of decoder) {
await enqueue(value as T)
}
writer.close() // Close stream
}
}
)

super({
start(controller) {
pipe(controller).catch((err) => controller.error(err))
},

transform(chunk) {
return new Promise<void>((resolve) => {
chunks.set(chunk, { resolve })
readableController.enqueue(chunk)
})
},
const writer = writable_.getWriter()
writer.write(undefined) // Jump-start stream

flush() {
readableController.close()
},
})
return { readable, writable }
}
}
37 changes: 22 additions & 15 deletions src/CBOREncoderStream.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
import { Encoder } from "./Encoder.js"
import { CBORValue } from "./types.js"
import { EncodeOptions } from "./options.js"
import { createTransformWithBackpressure } from "./utils.js"
import type { CBORValue } from "./types.js"
import type { EncodeOptions } from "./options.js"
import type { Flatten, WithRequired, NoInfer } from "./utils.js"

/**
* Encode a Web Streams API ReadableStream.
* options.chunkRecycling has no effect here.
*/
export class CBOREncoderStream extends TransformStream<CBORValue, Uint8Array> {
constructor(options: EncodeOptions = {}) {
const encoder = new Encoder({ ...options, chunkRecycling: false })
export class CBOREncoderStream<T = CBORValue> {
readable!: ReadableStream<Uint8Array>
writable!: WritableStream<T>

super({
transform(value: CBORValue, controller: TransformStreamDefaultController<Uint8Array>) {
constructor(...[options = {}]: T extends CBORValue
? []|[EncodeOptions]
: [WithRequired<EncodeOptions<Flatten<NoInfer<T>>>, "onValue">]
) {
const encoder = new Encoder({ ...options, chunkRecycling: false } as EncodeOptions)

return createTransformWithBackpressure<T, Uint8Array>(
async (value, enqueue) => {
// Encode the incoming value and push all resulting chunks
for (const chunk of encoder.encodeValue(value)) {
controller.enqueue(chunk)
for (const chunk of encoder.encodeValue(value as CBORValue)) {
await enqueue(chunk)
}
},

flush(controller: TransformStreamDefaultController<Uint8Array>) {
// Push any remaining chunks when the stream is closing
async (enqueue) => {
// Flush any remaining chunks
for (const chunk of encoder.flush()) {
controller.enqueue(chunk)
await enqueue(chunk)
}
},
})
}
)
}
}
Loading