From f7a7d21cd9f978397168d64bb0d7d7f0545dc5e8 Mon Sep 17 00:00:00 2001 From: Pranav Joshi Date: Thu, 27 Nov 2025 16:57:21 +0530 Subject: [PATCH 1/2] post voice traffic only to socket --- src/directLine.mock.ts | 12 ++++ src/directLine.test.ts | 148 +++++++++++++++++++++++++++++++++++++++++ src/directLine.ts | 67 +++++++++++++++++-- 3 files changed, 220 insertions(+), 7 deletions(-) diff --git a/src/directLine.mock.ts b/src/directLine.mock.ts index 152fba1af..540452ad7 100644 --- a/src/directLine.mock.ts +++ b/src/directLine.mock.ts @@ -11,6 +11,18 @@ const notImplemented = (): never => { throw new Error('not implemented') }; export const mockActivity = (text: string): DirectLineExport.Activity => ({ type: 'message', from: { id: 'sender' }, text }); +export const mockVoiceActivity = (): DirectLineExport.Activity => ({ + type: 'event', + from: { id: 'sender' }, + name: 'voiceLiveEvent', + value: { + voiceLiveEvent: { + type: 'type', + delta: 'base64AudioChunk' + } + } +}); + // MOCK DirectLine Server (shared state used by Observable.ajax and WebSocket mocks) interface ActivitySocket { diff --git a/src/directLine.test.ts b/src/directLine.test.ts index f280e081c..3a70ab667 100644 --- a/src/directLine.test.ts +++ b/src/directLine.test.ts @@ -243,4 +243,152 @@ describe('MockSuite', () => { expect(actualError.status).toStrictEqual(429); expect(endTime - startTime).toStrictEqual(10); }); + + test('VoiceActivityWebSocket', () => { + const voiceActivity = DirectLineMock.mockVoiceActivity(); + directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + let postActivityCompleted = false; + let postActivityError: any; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(voiceActivity) + .do(() => postActivityCompleted = true) + .catch(error => { + postActivityError = error; + return Observable.empty(); + }); + yield Observable.timer(200, scheduler); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + + // Assert that voice activity was sent successfully without errors + expect(postActivityCompleted).toBe(true); + expect(postActivityError).toBeUndefined(); + }); + + test('VoiceActivityWithoutWebSocket', () => { + const voiceActivity = DirectLineMock.mockVoiceActivity(); + directline = new DirectLineExport.DirectLine({ ...services, webSocket: false }); + + let actualError: any; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(voiceActivity).catch(error => { + actualError = error; + return Observable.empty(); + }); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + + expect(actualError.message).toContain('Voice activities require WebSocket to be enabled'); + }); + + test('VoiceVsTextActivityRouting', () => { + const voiceActivity = DirectLineMock.mockVoiceActivity(); + const textActivity = DirectLineMock.mockActivity('hello'); + + directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + let voiceCompleted = false; + let textCompleted = false; + let voiceError: any; + let textError: any; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + + // Send text activity (should go through HTTP/Ajax) + yield directline.postActivity(textActivity) + .do(() => textCompleted = true) + .catch(error => { + textError = error; + return Observable.empty(); + }); + + yield Observable.timer(100, scheduler); + + // Send voice activity (should go through WebSocket) + yield directline.postActivity(voiceActivity) + .do(() => voiceCompleted = true) + .catch(error => { + voiceError = error; + return Observable.empty(); + }); + + yield Observable.timer(200, scheduler); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + + // Both should complete successfully but through different paths + expect(textCompleted).toBe(true); + expect(voiceCompleted).toBe(true); + expect(textError).toBeUndefined(); + expect(voiceError).toBeUndefined(); + + // Text activity should echo back, voice activity should not + expect(actual).toContainEqual(textActivity); + expect(actual).not.toContainEqual(voiceActivity); + }); + + test('InvalidVoiceActivityStructures', () => { + const invalidStructures: DirectLineExport.Activity[] = [ + { type: 'event', from: { id: 'user' }, value: null } as any, + { type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: null } } as any, + { type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: {} } }, + { type: 'event', from: { id: 'user' }, value: { notVoice: { data: 'test' } } } as any + ]; + + directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + let completedCount = 0; + let errorCount = 0; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + + // Send each invalid structure - should all go through HTTP path + for (const invalidActivity of invalidStructures) { + yield directline.postActivity(invalidActivity) + .do(() => completedCount++) + .catch(error => { + errorCount++; + return Observable.empty(); + }); + yield Observable.timer(100, scheduler); + } + + yield Observable.timer(200, scheduler); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + + // All invalid structures should complete successfully through HTTP path + expect(completedCount).toBe(invalidStructures.length); + expect(errorCount).toBe(0); + + // All invalid structures should echo back (confirming they went through HTTP, not WebSocket) + expect(actual).toHaveLength(invalidStructures.length); + invalidStructures.forEach(invalidActivity => { + expect(actual).toContainEqual(invalidActivity); + }); + }); }); diff --git a/src/directLine.ts b/src/directLine.ts index 7cc1b15fc..be6ca859a 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -470,6 +470,7 @@ export class DirectLine implements IBotConnection { public referenceGrammarId: string; private timeout = 20 * 1000; private retries: number; + private webSocketConnection: WebSocket | null = null; private localeOnStartConversation: string; private userIdOnStartConversation: string; @@ -765,6 +766,32 @@ export class DirectLine implements IBotConnection { if (activity.type === "message" && activity.attachments && activity.attachments.length > 0) return this.postMessageWithAttachments(activity); + // if it is voice activity, send it through webSocket as voice over http is not supported in ABS. + // ABS limitation - client to server push is not being processed over web socket for text. + // Once it is implemented, we can remove this and send all traffic to the webSocket + if (this.isVoiceEventActivity(activity)) { + if (!this.webSocket) { + return Observable.throw(new Error('Voice activities require WebSocket to be enabled'), this.services.scheduler); + } + return this.checkConnection(true) + .flatMap(_ => + Observable.create((subscriber: Subscriber) => { + const envelope = { activities: [activity] }; + try { + if (!this.webSocketConnection || this.webSocketConnection.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket connection not ready for voice activities'); + } + this.webSocketConnection.send(JSON.stringify(envelope)); + subscriber.next(envelope); + subscriber.complete(); + } catch (e) { + subscriber.error(e); + } + }) + ) + .catch(error => this.catchExpiredToken(error)); + } + // If we're not connected to the bot, get connected // Will throw an error if we are not connected konsole.log("postActivity", activity); @@ -786,6 +813,32 @@ export class DirectLine implements IBotConnection { .catch(error => this.catchExpiredToken(error)); } + // Until activity protocol changes for multi-modal output are ratified, this method + // identifies voice event activities using the given activity example below as payload + // to send voice chunks over activity protocol. The activity structure shown serves as + // the current solution for transmitting voice data: + // { "type": "event", "value": { "voiceLiveEvent": { "type": "response.audio.delta", "delta": "" } } } + private isVoiceEventActivity(activity: Activity) { + if (activity.type !== 'event') { + return false; + } + + if (!activity?.value || typeof activity?.value !== 'object') { + return false; + } + + const vle = activity?.value?.voiceLiveEvent; + if (!vle || typeof vle !== 'object') { + return false; + } + + if (Object.keys(vle).length === 0) { + return false; + } + + return true; + } + private postMessageWithAttachments(message: Message) { const { attachments } = message; // We clean the attachments but making sure every attachment has unique name. @@ -938,11 +991,11 @@ export class DirectLine implements IBotConnection { private observableWebSocket() { return Observable.create((subscriber: Subscriber) => { konsole.log("creating WebSocket", this.streamUrl); - const ws = new this.services.WebSocket(this.streamUrl); + this.webSocketConnection = new this.services.WebSocket(this.streamUrl); let sub: Subscription; let closed: boolean; - ws.onopen = open => { + this.webSocketConnection.onopen = open => { konsole.log("WebSocket open", open); // Chrome is pretty bad at noticing when a WebSocket connection is broken. // If we periodically ping the server with empty messages, it helps Chrome @@ -950,14 +1003,14 @@ export class DirectLine implements IBotConnection { // error, and that give us the opportunity to attempt to reconnect. sub = Observable.interval(this.timeout, this.services.scheduler).subscribe(_ => { try { - ws.send("") + this.webSocketConnection.send("") } catch(e) { konsole.log("Ping error", e); } }); } - ws.onclose = close => { + this.webSocketConnection.onclose = close => { konsole.log("WebSocket close", close); if (sub) sub.unsubscribe(); @@ -967,7 +1020,7 @@ export class DirectLine implements IBotConnection { closed = true; } - ws.onerror = error => { + this.webSocketConnection.onerror = error => { konsole.log("WebSocket error", error); if (sub) sub.unsubscribe(); @@ -977,14 +1030,14 @@ export class DirectLine implements IBotConnection { closed = true; } - ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data)); + this.webSocketConnection.onmessage = message => message.data && subscriber.next(JSON.parse(message.data)); // This is the 'unsubscribe' method, which is called when this observable is disposed. // When the WebSocket closes itself, we throw an error, and this function is eventually called. // When the observable is closed first (e.g. when tearing down a WebChat instance) then // we need to manually close the WebSocket. return () => { - if (ws.readyState === 0 || ws.readyState === 1) ws.close(); + if (this.webSocketConnection.readyState === 0 || this.webSocketConnection.readyState === 1) this.webSocketConnection.close(); } }) as Observable } From 19d9e7729a11ea7a781844738cac139e10b0e637 Mon Sep 17 00:00:00 2001 From: Pranav Joshi Date: Fri, 26 Dec 2025 12:35:05 +0530 Subject: [PATCH 2/2] comment resolved --- src/directLine.test.ts | 157 +++++++++++++++++++---------------------- src/directLine.ts | 34 ++++----- 2 files changed, 85 insertions(+), 106 deletions(-) diff --git a/src/directLine.test.ts b/src/directLine.test.ts index 3a70ab667..90ecb8dd6 100644 --- a/src/directLine.test.ts +++ b/src/directLine.test.ts @@ -274,121 +274,110 @@ describe('MockSuite', () => { }); test('VoiceActivityWithoutWebSocket', () => { - const voiceActivity = DirectLineMock.mockVoiceActivity(); - directline = new DirectLineExport.DirectLine({ ...services, webSocket: false }); + const voiceActivity = DirectLineMock.mockVoiceActivity(); + directline = new DirectLineExport.DirectLine({ ...services, webSocket: false }); - let actualError: any; + let actualError: any; - const scenario = function* (): IterableIterator> { - yield Observable.timer(200, scheduler); - yield directline.postActivity(voiceActivity).catch(error => { - actualError = error; - return Observable.empty(); - }); - }; + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(voiceActivity).catch(error => { + actualError = error; + return Observable.empty(); + }); + }; - subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); - scheduler.flush(); + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); - expect(actualError.message).toContain('Voice activities require WebSocket to be enabled'); + expect(actualError.message).toContain('Voice activities require WebSocket to be enabled'); }); test('VoiceVsTextActivityRouting', () => { - const voiceActivity = DirectLineMock.mockVoiceActivity(); - const textActivity = DirectLineMock.mockActivity('hello'); + const voiceActivity = DirectLineMock.mockVoiceActivity(); + const textActivity = DirectLineMock.mockActivity('hello'); - directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); + directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); - const actual: Array = []; - subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); - let voiceCompleted = false; - let textCompleted = false; - let voiceError: any; - let textError: any; + let voiceCompleted = false; + let textCompleted = false; + let voiceError: any; + let textError: any; - const scenario = function* (): IterableIterator> { - yield Observable.timer(200, scheduler); + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); - // Send text activity (should go through HTTP/Ajax) - yield directline.postActivity(textActivity) - .do(() => textCompleted = true) - .catch(error => { - textError = error; - return Observable.empty(); - }); + // Send text activity (should go through HTTP/Ajax) + yield directline.postActivity(textActivity) + .do(() => textCompleted = true) + .catch(error => { + textError = error; + return Observable.empty(); + }); - yield Observable.timer(100, scheduler); + yield Observable.timer(100, scheduler); - // Send voice activity (should go through WebSocket) - yield directline.postActivity(voiceActivity) - .do(() => voiceCompleted = true) - .catch(error => { - voiceError = error; - return Observable.empty(); - }); + // Send voice activity (should go through WebSocket) + yield directline.postActivity(voiceActivity) + .do(() => voiceCompleted = true) + .catch(error => { + voiceError = error; + return Observable.empty(); + }); - yield Observable.timer(200, scheduler); - }; + yield Observable.timer(200, scheduler); + }; - subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); - scheduler.flush(); + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); - // Both should complete successfully but through different paths - expect(textCompleted).toBe(true); - expect(voiceCompleted).toBe(true); - expect(textError).toBeUndefined(); - expect(voiceError).toBeUndefined(); + // Both should complete successfully but through different paths + expect(textCompleted).toBe(true); + expect(voiceCompleted).toBe(true); + expect(textError).toBeUndefined(); + expect(voiceError).toBeUndefined(); - // Text activity should echo back, voice activity should not - expect(actual).toContainEqual(textActivity); - expect(actual).not.toContainEqual(voiceActivity); + // Text activity should echo back, voice activity should not + expect(actual).toContainEqual(textActivity); + expect(actual).not.toContainEqual(voiceActivity); }); - test('InvalidVoiceActivityStructures', () => { - const invalidStructures: DirectLineExport.Activity[] = [ - { type: 'event', from: { id: 'user' }, value: null } as any, - { type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: null } } as any, + test.each([ + { type: 'event', from: { id: 'user' }, value: null }, + { type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: null } }, { type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: {} } }, - { type: 'event', from: { id: 'user' }, value: { notVoice: { data: 'test' } } } as any - ]; - - directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); - - const actual: Array = []; - subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + { type: 'event', from: { id: 'user' }, value: { notVoice: { data: 'test' } } } + ] as DirectLineExport.Activity[])('InvalidVoiceActivityStructure: %p', (invalidActivity) => { + directline = new DirectLineExport.DirectLine({ ...services, webSocket: true }); - let completedCount = 0; - let errorCount = 0; + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); - const scenario = function* (): IterableIterator> { - yield Observable.timer(200, scheduler); + let completed = false; + let activityError: any; - // Send each invalid structure - should all go through HTTP path - for (const invalidActivity of invalidStructures) { + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); yield directline.postActivity(invalidActivity) - .do(() => completedCount++) + .do(() => completed = true) .catch(error => { - errorCount++; + activityError = error; return Observable.empty(); }); - yield Observable.timer(100, scheduler); - } - - yield Observable.timer(200, scheduler); - }; + yield Observable.timer(200, scheduler); + }; - subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); - scheduler.flush(); + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); - // All invalid structures should complete successfully through HTTP path - expect(completedCount).toBe(invalidStructures.length); - expect(errorCount).toBe(0); + // Should complete successfully through HTTP path + expect(completed).toBe(true); + expect(activityError).toBeUndefined(); - // All invalid structures should echo back (confirming they went through HTTP, not WebSocket) - expect(actual).toHaveLength(invalidStructures.length); - invalidStructures.forEach(invalidActivity => { + // Should echo back (confirming it went through HTTP, not WebSocket) expect(actual).toContainEqual(invalidActivity); - }); }); }); diff --git a/src/directLine.ts b/src/directLine.ts index be6ca859a..7f3d0d375 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -769,7 +769,7 @@ export class DirectLine implements IBotConnection { // if it is voice activity, send it through webSocket as voice over http is not supported in ABS. // ABS limitation - client to server push is not being processed over web socket for text. // Once it is implemented, we can remove this and send all traffic to the webSocket - if (this.isVoiceEventActivity(activity)) { + if (DirectLine.isVoiceEventActivity(activity)) { if (!this.webSocket) { return Observable.throw(new Error('Voice activities require WebSocket to be enabled'), this.services.scheduler); } @@ -778,8 +778,8 @@ export class DirectLine implements IBotConnection { Observable.create((subscriber: Subscriber) => { const envelope = { activities: [activity] }; try { - if (!this.webSocketConnection || this.webSocketConnection.readyState !== WebSocket.OPEN) { - throw new Error('WebSocket connection not ready for voice activities'); + if (!this.webSocketConnection || this.webSocketConnection.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket connection not ready for voice activities'); } this.webSocketConnection.send(JSON.stringify(envelope)); subscriber.next(envelope); @@ -818,25 +818,15 @@ export class DirectLine implements IBotConnection { // to send voice chunks over activity protocol. The activity structure shown serves as // the current solution for transmitting voice data: // { "type": "event", "value": { "voiceLiveEvent": { "type": "response.audio.delta", "delta": "" } } } - private isVoiceEventActivity(activity: Activity) { - if (activity.type !== 'event') { - return false; - } - - if (!activity?.value || typeof activity?.value !== 'object') { - return false; - } - - const vle = activity?.value?.voiceLiveEvent; - if (!vle || typeof vle !== 'object') { - return false; - } - - if (Object.keys(vle).length === 0) { - return false; - } - - return true; + private static isVoiceEventActivity(activity: Activity) { + return ( + activity.type === 'event' && + activity?.value && + typeof activity?.value === 'object' && + activity?.value?.voiceLiveEvent && + typeof activity?.value?.voiceLiveEvent === 'object' && + Object.keys(activity?.value?.voiceLiveEvent).length > 0 + ); } private postMessageWithAttachments(message: Message) {