Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tiny-houses-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@adobe/alloy": patch
---

Fixed a bug where in Safari the Brand Concierge streams were not parsed
16 changes: 13 additions & 3 deletions packages/core/src/components/BrandConcierge/createStreamParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export default () => {

/**
* Parse SSE stream using callbacks.
* Uses modern async iteration (for await...of) for clean, performant stream processing.
* Uses ReadableStreamDefaultReader for cross-browser compatibility (Safari 11+, Firefox, Chrome).
*
* @param {ReadableStream} stream - The readable stream from fetch response
* @param {Object} callbacks - Callback functions for stream events
Expand All @@ -82,12 +82,20 @@ export default () => {
* @param {Function} callbacks.onComplete - Callback function called when stream ends
*/
return async (stream, { onEvent, onPing, onComplete }) => {
const reader = stream.getReader();
const decoder = new TextDecoder(ENCODING);
let buffer = "";

try {
for await (const chunk of stream) {
buffer += decoder.decode(chunk, { stream: true });
while (true) {
// eslint-disable-next-line no-await-in-loop
const { done, value } = await reader.read();

if (done) {
break;
}

buffer += decoder.decode(value, { stream: true });
const events = buffer.split(EVENT_SEPARATOR_REGEX);
buffer = events.pop() || "";

Expand Down Expand Up @@ -134,6 +142,8 @@ export default () => {
} catch (error) {
onEvent({ error });
onComplete();
} finally {
reader.releaseLock();
}
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export default ({ onStreamResponseCallback, streamTimeout }) => {
timedOut = true;
onStreamResponseCallback({
error: {
message: "Stream timeout: No data received within 10 seconds",
message: `Stream timeout: No data received within ${streamTimeout / 1000} seconds`,
},
});
}, streamTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ governing permissions and limitations under the License.
import { vi, beforeEach, describe, it, expect } from "vitest";
import createSendConversationEvent from "../../../../../src/components/BrandConcierge/createSendConversationEvent.js";
import flushPromiseChains from "../../../helpers/flushPromiseChains.js";
import createMockReadableStream from "./helpers/createMockReadableStream.js";

describe("createSendConversationEvent", () => {
let mockDependencies;
Expand Down Expand Up @@ -77,7 +78,7 @@ describe("createSendConversationEvent", () => {
const mockResponse = {
ok: true,
status: 200,
body: "mock-stream-body"
body: createMockReadableStream([]),
};
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);

Expand Down Expand Up @@ -117,7 +118,7 @@ describe("createSendConversationEvent", () => {
const mockResponse = {
ok: true,
status: 200,
body: "mock-stream-body"
body: createMockReadableStream([]),
};
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);

Expand Down Expand Up @@ -156,7 +157,7 @@ describe("createSendConversationEvent", () => {
const mockResponse = {
ok: true,
status: 200,
body: "mock-stream-body"
body: createMockReadableStream([]),
};
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);

Expand Down Expand Up @@ -196,17 +197,19 @@ describe("createSendConversationEvent", () => {

it("handles stream timeout when no data is received within 10 seconds", async () => {
vi.useFakeTimers();

const mockResponse = {
ok: true,
status: 200,
body: {
// Simulate an async iterator that never yields data
[Symbol.asyncIterator]: async function* () {
// Never yield anything - simulates a hanging stream
await new Promise(() => {}); // Promise that never resolves
}
}
getReader() {
return {
// Simulate a reader that never returns data (hanging stream)
read: () => new Promise(() => {}), // Promise that never resolves
releaseLock: () => {},
};
},
},
};
mockDependencies.sendConversationServiceRequest.mockResolvedValue(mockResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ governing permissions and limitations under the License.
*/
import { vi, beforeEach, describe, it, expect } from "vitest";
import createStreamParser from "../../../../../src/components/BrandConcierge/createStreamParser.js";
import flushPromiseChains from "../../../helpers/flushPromiseChains.js";
import createMockReadableStream from "./helpers/createMockReadableStream.js";

describe("createStreamParser", () => {
let streamParser;
Expand All @@ -31,176 +31,170 @@ describe("createStreamParser", () => {
});

it("parses streaming data chunks", async () => {
// Create a mock stream using async generator
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
yield new TextEncoder().encode('data: {"text": " World"}\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
new TextEncoder().encode('data: {"text": " World"}\n\n'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(2);
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": " World"}' });
});

it("handles parsing errors gracefully", async () => {
// Create a mock stream that yields invalid data
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
yield new TextEncoder().encode('data: invalid json\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
new TextEncoder().encode("data: invalid json\n\n"),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(2);
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
expect(onEvent).toHaveBeenCalledWith({ data: 'invalid json' });
expect(onEvent).toHaveBeenCalledWith({ data: "invalid json" });
});

it("handles stream reading errors", async () => {
// Create a mock stream that throws an error
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
throw new Error("Stream reading failed");
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
new Error("Stream reading failed"),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
expect(onEvent).toHaveBeenCalledWith({ error: expect.any(Error) });
});

it("ignores empty lines and non-ping comments", async () => {
async function* mockStream() {
yield new TextEncoder().encode(': this is a comment\n\n');
yield new TextEncoder().encode('\n\n');
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode(": this is a comment\n\n"),
new TextEncoder().encode("\n\n"),
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
});

it("calls onPing callback when ping comment is received", async () => {
const mockStream = createMockReadableStream([
new TextEncoder().encode(": ping\n\n"),
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
]);

async function* mockStream() {
yield new TextEncoder().encode(': ping\n\n');
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
}

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onPing).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
});

it("handles multiple ping comments", async () => {
async function* mockStream() {
yield new TextEncoder().encode(': ping\n\n');
yield new TextEncoder().encode(': ping\n\n');
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
yield new TextEncoder().encode(': ping\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode(": ping\n\n"),
new TextEncoder().encode(": ping\n\n"),
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
new TextEncoder().encode(": ping\n\n"),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onPing).toHaveBeenCalledTimes(3);
expect(onEvent).toHaveBeenCalledTimes(1);
});

it("only treats ': ping' as ping comment", async () => {
async function* mockStream() {
yield new TextEncoder().encode(': ping\n\n'); // This IS a ping (space after colon)
yield new TextEncoder().encode(':ping\n\n'); // This is NOT (no space after colon)
yield new TextEncoder().encode(': pinging\n\n'); // This IS a ping (startsWith ': ping')
yield new TextEncoder().encode(': PING\n\n'); // This is NOT (uppercase)
}
const mockStream = createMockReadableStream([
new TextEncoder().encode(": ping\n\n"), // This IS a ping (space after colon)
new TextEncoder().encode(":ping\n\n"), // This is NOT (no space after colon)
new TextEncoder().encode(": pinging\n\n"), // This IS a ping (startsWith ': ping')
new TextEncoder().encode(": PING\n\n"), // This is NOT (uppercase)
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onPing).toHaveBeenCalledTimes(2); // ': ping' and ': pinging' count
expect(onPing).toHaveBeenCalledTimes(2); // ': ping' and ': pinging' count
expect(onEvent).toHaveBeenCalledTimes(0); // None have data fields
});

it("handles event types and IDs", async () => {
async function* mockStream() {
yield new TextEncoder().encode('event: message\ndata: {"text": "Hello"}\nid: 123\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('event: message\ndata: {"text": "Hello"}\nid: 123\n\n'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith({
type: 'message',
type: "message",
data: '{"text": "Hello"}',
id: '123'
id: "123",
});
});

it("handles multi-line data", async () => {
async function* mockStream() {
yield new TextEncoder().encode('data: line 1\ndata: line 2\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode("data: line 1\ndata: line 2\n\n"),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith({ data: 'line 1line 2' });
expect(onEvent).toHaveBeenCalledWith({ data: "line 1line 2" });
});

it("processes buffer remainder at end", async () => {
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith({ data: '{"text": "Hello"}' });
});

it("calls onComplete when stream ends with data", async () => {
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onComplete).toHaveBeenCalledTimes(1);
});

it("calls onComplete when stream ends with empty buffer", async () => {
async function* mockStream() {
yield new TextEncoder().encode('data: {"text": "Hello"}\n\n');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode('data: {"text": "Hello"}\n\n'),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onComplete).toHaveBeenCalledTimes(1);
});

it("calls onComplete when stream ends with ping", async () => {
async function* mockStream() {
yield new TextEncoder().encode(': ping');
}
const mockStream = createMockReadableStream([
new TextEncoder().encode(": ping"),
]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onPing).toHaveBeenCalledTimes(1);
expect(onComplete).toHaveBeenCalledTimes(1);
});

it("calls onComplete after onEvent when stream errors", async () => {
async function* mockStream() {
throw new Error("Stream error");
}
const mockStream = createMockReadableStream([new Error("Stream error")]);

await streamParser(mockStream(), { onEvent, onPing, onComplete });
await streamParser(mockStream, { onEvent, onPing, onComplete });

expect(onEvent).toHaveBeenCalledWith({ error: expect.any(Error) });
expect(onComplete).toHaveBeenCalledTimes(1);
Expand Down
Loading