A powerful abstraction layer for streaming text completions from multiple AI providers through a single unified toolkit.
@bnk/ai (AI Streaming Engine) provides a flexible, plugin-based abstraction for streaming AI-generated text from multiple providers such as OpenAI, Anthropic, Ollama, etc. The goal is for your application to integrate once, then swap or combine providers at will without modifying your client-side streaming logic.
Key features:
- Unified streaming abstraction: use one engine in your client code to handle partial message streaming, done signals, and error handling.
- Plugin-based: each AI provider is implemented as a plugin that knows how to prepare the request and parse the stream.
- Simple SSE-based interface: your app interacts with the engine through a set of handlers (e.g. `onPartial`, `onDone`).
- Extensible: easily add new providers by implementing a `ProviderPlugin`.
How it works:

1. Your application calls `createSSEStream(params)` with:
   - A `userMessage` and, optionally, a `systemMessage`
   - The desired plugin (provider)
   - Options you want to pass to the provider (model, temperature, etc.)
   - Handlers for partial chunks, done signal, errors, etc.

2. Inside `createSSEStream`:
   - The plugin's `prepareRequest()` method is called, returning a readable stream or reader containing the SSE data from the AI provider.
   - The engine reads SSE lines, calls the plugin's `parseServerSentEvent()` to extract text chunks, and forwards them to your handlers (`onPartial`, `onDone`, etc.).
   - The engine also returns a `ReadableStream<Uint8Array>`, which you can pipe back to your clients if desired (for real-time text updates in a browser).

3. Plugins (e.g. `OpenAiLikePlugin`, `AnthropicPlugin`) each implement:

   ```ts
   interface ProviderPlugin {
     prepareRequest(
       params: SSEEngineParams
     ): Promise<ReadableStream<Uint8Array> | ReadableStreamDefaultReader<Uint8Array>>;
     parseServerSentEvent(line: string): string | null;
   }
   ```

   `prepareRequest()` handles how to call the provider's API and get back a streaming SSE response. `parseServerSentEvent()` extracts only the text from each SSE chunk, returning `[DONE]` or `null` when appropriate.
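The engine's internal read loop can be sketched roughly as follows. This is a simplified illustration of the flow described above, not the actual `@bnk/ai` implementation; the function name `runSseLoop` and the inlined handler types are assumptions for the sketch.

```ts
// Simplified sketch of the engine's read loop (not the real implementation):
// decode bytes, split the buffer into SSE lines, and route text chunks to handlers.
async function runSseLoop(
  stream: ReadableStream<Uint8Array>,
  parseServerSentEvent: (line: string) => string | null,
  handlers: {
    onPartial?: (chunk: { content: string }) => void;
    onDone?: (full: { content: string }) => void;
  }
): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let fullText = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done || !value) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are newline-delimited; keep the last partial line buffered.
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      const text = parseServerSentEvent(line);
      if (text === "[DONE]") {
        handlers.onDone?.({ content: fullText });
        return fullText;
      }
      if (text) {
        fullText += text;
        handlers.onPartial?.({ content: text });
      }
    }
  }
  handlers.onDone?.({ content: fullText });
  return fullText;
}
```

Note how the parsing of provider-specific SSE lines is delegated entirely to the plugin, which is what keeps the loop provider-agnostic.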
1. Install the package (private or local reference as appropriate):

   ```bash
   # e.g. if you're using bun or npm
   bun add @bnk/ai
   # or
   npm install @bnk/ai
   ```
2. Import the engine and plugin(s) you want:

   ```ts
   import { createSSEStream, OpenAiLikePlugin } from '@bnk/ai';

   // Initialize provider
   const plugin = new OpenAiLikePlugin(myOpenAiClient, 'gpt-4');
   ```
3. Create the stream and provide handlers:

   ```ts
   const userMessage = "Explain the concept of neural networks.";

   const handlers = {
     onPartial: (chunk) => {
       console.log("Partial chunk:", chunk.content);
     },
     onDone: (final) => {
       console.log("All done:", final.content);
     },
     onError: (error) => {
       console.error("Stream error:", error);
     },
   };

   const stream = await createSSEStream({
     userMessage,
     plugin, // e.g. OpenAiLikePlugin
     handlers,
     options: { model: "gpt-4", temperature: 0.5 }, // any provider-specific options
   });

   // You can also return or pipe this ReadableStream back to your client
   // for in-browser SSE consumption, or handle it server-side.
   ```
`createSSEStream(params)` creates a new SSE stream. The `SSEEngineParams` object includes:

- `userMessage: string` – the user's prompt
- `systemMessage?: string` – an optional system-level instruction
- `plugin: ProviderPlugin` – the provider plugin handling the request
- `options?: Record<string, any>` – any provider-specific options (model, temperature, etc.)
- `handlers: SSEEngineHandlers` – callbacks for partial, done, error, etc.
Example usage:
```ts
await createSSEStream({
  userMessage: "Hello world",
  systemMessage: "You are a helpful assistant",
  plugin: myPlugin,
  options: { model: "myModel", temperature: 0.9 },
  handlers: {
    onPartial: (chunk) => {
      console.log("Partial:", chunk.content);
    },
    onDone: (message) => {
      console.log("Complete:", message.content);
    },
  },
});
```

The handler callbacks are defined by `SSEEngineHandlers`:

```ts
interface SSEEngineHandlers {
  onSystemMessage?: (message: SSEMessage) => void;
  onUserMessage?: (message: SSEMessage) => void;
  onPartial?: (partial: SSEMessage) => void;
  onDone?: (fullContent: SSEMessage) => void;
  onError?: (error: unknown, partialSoFar: SSEMessage) => void;
}
```

- `onSystemMessage`: invoked once if a system message is provided.
- `onUserMessage`: invoked for the user's message.
- `onPartial`: invoked for each streamed chunk of assistant text.
- `onDone`: final callback when the stream is complete or `[DONE]` is encountered.
- `onError`: if an error occurs, you get the error plus the text accumulated so far.
To add a new provider, implement:
```ts
import { ProviderPlugin, SSEEngineParams } from "@bnk/ai";

export class MyProviderPlugin implements ProviderPlugin {
  async prepareRequest(params: SSEEngineParams) {
    // 1) Call your provider's SSE or streaming API
    // 2) Return a ReadableStream or the reader
  }

  parseServerSentEvent(line: string): string | null {
    // Convert SSE line to text chunk or "[DONE]" if the stream ended
  }
}
```

Then pass an instance to the engine via `plugin: new MyProviderPlugin(...)`.
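As a concrete sketch, here is what a plugin for a hypothetical OpenAI-compatible SSE endpoint might look like. The endpoint path, request payload, and response shape are assumptions about the provider, not part of `@bnk/ai`; the `SSEEngineParams` shape is inlined so the sketch is self-contained.

```ts
// Hypothetical plugin for an OpenAI-compatible SSE endpoint.
// URL, payload, and response format are assumptions for illustration.
interface SSEEngineParams {
  userMessage: string;
  systemMessage?: string;
  options?: Record<string, any>;
}

export class OpenAiCompatiblePlugin {
  constructor(private baseUrl: string, private apiKey: string) {}

  async prepareRequest(params: SSEEngineParams): Promise<ReadableStream<Uint8Array>> {
    const res = await fetch(`${this.baseUrl}/v1/chat/completions`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify({
        model: params.options?.model ?? "gpt-4",
        stream: true,
        messages: [
          ...(params.systemMessage
            ? [{ role: "system", content: params.systemMessage }]
            : []),
          { role: "user", content: params.userMessage },
        ],
      }),
    });
    if (!res.ok || !res.body) {
      throw new Error(`Provider request failed: ${res.status}`);
    }
    return res.body;
  }

  // Each SSE data line looks like `data: {...json...}` or `data: [DONE]`.
  parseServerSentEvent(line: string): string | null {
    if (!line.startsWith("data:")) return null; // comments, blank lines, etc.
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") return "[DONE]";
    try {
      return JSON.parse(payload).choices?.[0]?.delta?.content ?? null;
    } catch {
      return null; // ignore malformed chunks
    }
  }
}
```

Keeping `parseServerSentEvent` a pure function of one line makes it trivial to unit-test the parsing logic without a live provider connection.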
**OpenAiLikePlugin**

- Calls OpenAI's streaming Chat Completions API.
- Returns a stream of SSE lines that the engine processes.

**AnthropicPlugin**

- Calls Anthropic's API.
- Parses its SSE format to gather partial text.
You can build your own higher-level logic on top of this engine:
- Save partial responses to a database
- Stream updates to websockets or SSE endpoints
- Provide multiple fallback plugins (e.g., if one fails, switch to another)
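The fallback idea can be sketched as a small wrapper that tries each plugin in order until one yields a stream. The helper name `withFallback` and the `startStream` callback (standing in for a call like `createSSEStream({ ...params, plugin })`) are illustrative, not part of the package:

```ts
// Illustrative fallback chain: try each plugin in order until one succeeds.
// `startStream` is a caller-supplied function, e.g.
//   (plugin) => createSSEStream({ userMessage, handlers, plugin })
async function withFallback<P, R>(
  plugins: P[],
  startStream: (plugin: P) => Promise<R>
): Promise<R> {
  let lastError: unknown;
  for (const plugin of plugins) {
    try {
      return await startStream(plugin);
    } catch (err) {
      lastError = err; // this provider failed; try the next one
    }
  }
  throw lastError; // every provider failed
}
```

Because errors only surface from `prepareRequest()` before streaming begins, this pattern is simplest for connection-time failures; mid-stream failover would additionally need the partial text accumulated so far (which `onError` provides).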
- `src/` – main code for the streaming engine, plugins, and types
- `src/streaming-engine.ts` – core `createSSEStream` function
- `src/streaming-types.ts` – shared SSE engine types (`SSEEngineParams`, `SSEMessage`, etc.)
- `src/plugins/` – plugin implementations for different AI providers
- `package.json` – package metadata
- Install dependencies:

  ```bash
  bun install
  # or
  npm install
  ```

- Build or run tests:

  ```bash
  bun test
  ```
Contributions are welcome! To add a new provider:
- Create a new plugin in `src/plugins/`.
- Implement `prepareRequest()` and `parseServerSentEvent()`.
- Export it in `src/index.ts`.
MIT License (or appropriate license text here)