From c52f1947d50cf39c84fef4b3d5c4538eef1e1f2d Mon Sep 17 00:00:00 2001 From: Pranav Joshi Date: Tue, 9 Sep 2025 18:18:45 +0530 Subject: [PATCH 1/3] add deliveryMode support --- src/directLine.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/directLine.ts b/src/directLine.ts index 7cc1b15fc..c8094316f 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -329,7 +329,8 @@ export interface Message extends IActivity { suggestedActions?: { actions: CardAction[], to?: string[] }, speak?: string, inputHint?: string, - value?: object + value?: object, + deliveryMode?: "normal" | "stream" | "expectReplies" } export interface Typing extends IActivity { From 21383aaa9b610b3eeed21b77dfeb3cf7b259b641 Mon Sep 17 00:00:00 2001 From: Pranav Joshi Date: Wed, 29 Oct 2025 15:24:29 +0530 Subject: [PATCH 2/3] deliveryMode support to enable streaming from ABS side --- src/directLine.test.ts | 142 +++++++++++++++++++++++++++++++++++++++++ src/directLine.ts | 27 ++++++-- 2 files changed, 165 insertions(+), 4 deletions(-) diff --git a/src/directLine.test.ts b/src/directLine.test.ts index f280e081c..93755b1bf 100644 --- a/src/directLine.test.ts +++ b/src/directLine.test.ts @@ -243,4 +243,146 @@ describe('MockSuite', () => { expect(actualError.status).toStrictEqual(429); expect(endTime - startTime).toStrictEqual(10); }); + + describe('StreamingMode', () => { + + test('Setting streaming adds deliveryMode=stream to outgoing activity', () => { + // override directline with streaming enabled + directline = new DirectLineExport.DirectLine({ ...services, streaming: true }); + + const streamingActivity = DirectLineMock.mockActivity('streaming-test'); + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(streamingActivity); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + scheduler.flush(); + + expect(streamingActivity.deliveryMode).toStrictEqual('stream'); + expect(actual[0].deliveryMode).toStrictEqual('stream'); + }); + + test('Not setting streaming does not add deliveryMode at all to outgoing activity', () => { + const normalActivity = DirectLineMock.mockActivity('normal-test'); + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(normalActivity); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + scheduler.flush(); + + expect(normalActivity.deliveryMode).toBeUndefined(); + expect(actual[0].deliveryMode).toBeUndefined(); + }); + + test('Setting streaming overrides passed deliveryMode "normal" in activity to "stream"', () => { + directline = new DirectLineExport.DirectLine({ ...services, streaming: true }); + + const presetActivity: DirectLineExport.Message = { + type: 'message', + from: { id: 'sender' }, + text: 'preset', + deliveryMode: 'normal' + }; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(presetActivity); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + scheduler.flush(); + + expect(presetActivity.deliveryMode).toStrictEqual('stream'); + expect(actual[0].deliveryMode).toStrictEqual('stream'); + }); + + test('Not setting streaming preserves passed deliveryMode "normal" in activity', () => { + const presetActivity: DirectLineExport.Message = { + type: 'message', + from: { id: 'sender' }, + text: 'preset-nonstream', + deliveryMode: 'normal' + }; + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(presetActivity); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + scheduler.flush(); + + expect(presetActivity.deliveryMode).toStrictEqual('normal'); + expect(actual[0].deliveryMode).toStrictEqual('normal'); + }); + + test('Streaming + 403 post returns retry and keeps deliveryMode=stream on activity', () => { + services.ajax = DirectLineMock.mockAjax(server, (urlOrRequest) => { + if (typeof urlOrRequest === 'string') { + throw new Error(); + } + + if (urlOrRequest.url && urlOrRequest.url.indexOf('/conversations') > 0 && !/activities/u.test(urlOrRequest.url)) { + // start conversation + const response: Partial = { + response: server.conversation, + status: 201, + xhr: { getResponseHeader: () => 'n/a' } as unknown as XMLHttpRequest + }; + return response as AjaxResponse; + } + + if (urlOrRequest.url && /activities/u.test(urlOrRequest.url)) { + const response: Partial = { + status: 403, + xhr: { getResponseHeader: () => 'n/a' } as unknown as XMLHttpRequest + }; + const error = new Error('Forbidden'); + throw Object.assign(error, response); + } + + throw new Error(); + }); + + directline = new DirectLineExport.DirectLine({ ...services, streaming: true }); + + const retryActivity = DirectLineMock.mockActivity('will-retry'); + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(retryActivity); + }; + + let postResult: string | undefined; + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe({ + next: v => { postResult = v as string; }, + error: () => {}, + complete: () => {} + })); + + scheduler.flush(); + + expect(retryActivity.deliveryMode).toStrictEqual('stream'); + expect(postResult).toStrictEqual('retry'); + }); + + }); }); diff --git a/src/directLine.ts b/src/directLine.ts index c8094316f..66530e22b 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -305,6 +305,8 @@ export interface User { role?: UserRole } +export type DeliveryMode = "normal" | "stream" | "expectReplies"; + export interface IActivity { type: string, channelData?: any, @@ -313,7 +315,8 @@ export interface IActivity { eTag?: string, from: User, id?: string, - timestamp?: string + timestamp?: string, + deliveryMode?: DeliveryMode } export type AttachmentLayout = "list" | "carousel"; @@ -329,8 +332,7 @@ export interface Message extends IActivity { suggestedActions?: { actions: CardAction[], to?: string[] }, speak?: string, inputHint?: string, - value?: object, - deliveryMode?: "normal" | "stream" | "expectReplies" + value?: object } export interface Typing extends IActivity { @@ -373,7 +375,13 @@ export interface DirectLineOptions { timeout?: number, // Attached to all requests to identify requesting agent. botAgent?: string, - conversationStartProperties?: any + conversationStartProperties?: any, + /** + * Per-conversation switch for streaming delivery mode. + * If true, every outgoing activity will include deliveryMode: 'stream'. + * If false/omitted, deliveryMode is not sent (defaults to 'normal' in ABS). + */ + streaming?: boolean } export interface Services { @@ -478,12 +486,17 @@ export class DirectLine implements IBotConnection { private pollingInterval: number = 1000; //ms private tokenRefreshSubscription: Subscription; + private streaming: boolean; constructor(options: DirectLineOptions & Partial) { this.secret = options.secret; this.token = options.secret || options.token; this.webSocket = (options.webSocket === undefined ? true : options.webSocket) && typeof WebSocket !== 'undefined' && WebSocket !== undefined; + if (options.streaming) { + this.streaming = options.streaming; + } + if (options.conversationStartProperties && options.conversationStartProperties.locale) { if (Object.prototype.toString.call(options.conversationStartProperties.locale) === '[object String]') { this.localeOnStartConversation = options.conversationStartProperties.locale; @@ -755,6 +768,12 @@ export class DirectLine implements IBotConnection { } postActivity(activity: Activity) { + // If streaming is enabled for this DirectLine instance, always set deliveryMode to 'stream' + // default would be 'normal' and not passing anything meaning 'normal' as well on ABS side + if (this.streaming) { + activity.deliveryMode = 'stream'; + } + // If user id is set, check if it match activity.from.id and always override it in activity if (this.userIdOnStartConversation && activity.from && activity.from.id !== this.userIdOnStartConversation) { console.warn('DirectLineJS: Activity.from.id does not match with user id, ignoring activity.from.id'); From cc529b8e79c6102d14a157b70dbc7f222438f2dd Mon Sep 17 00:00:00 2001 From: Pranav Joshi Date: Fri, 26 Dec 2025 13:00:41 +0530 Subject: [PATCH 3/3] comment resolved --- src/directLine.test.ts | 13 +++++++++---- src/directLine.ts | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/directLine.test.ts b/src/directLine.test.ts index 93755b1bf..5f6b7e396 100644 --- a/src/directLine.test.ts +++ b/src/directLine.test.ts @@ -335,7 +335,10 @@ describe('MockSuite', () => { expect(actual[0].deliveryMode).toStrictEqual('normal'); }); - test('Streaming + 403 post returns retry and keeps deliveryMode=stream on activity', () => { + test.each([ + { streaming: true, expectedDeliveryMode: 'stream', testName: 'Streaming' }, + { streaming: false, expectedDeliveryMode: undefined, testName: 'Non-streaming' } + ])('$testName + 403 post returns retry and preserves deliveryMode', ({ streaming, expectedDeliveryMode }) => { services.ajax = DirectLineMock.mockAjax(server, (urlOrRequest) => { if (typeof urlOrRequest === 'string') { throw new Error(); @@ -363,7 +366,10 @@ describe('MockSuite', () => { throw new Error(); }); - directline = new DirectLineExport.DirectLine({ ...services, streaming: true }); + directline = new DirectLineExport.DirectLine({ + ...services, + ...(streaming ? { streaming: true } : {}) + }); const retryActivity = DirectLineMock.mockActivity('will-retry'); const scenario = function* (): IterableIterator> { @@ -380,9 +386,8 @@ describe('MockSuite', () => { scheduler.flush(); - expect(retryActivity.deliveryMode).toStrictEqual('stream'); + expect(retryActivity.deliveryMode).toStrictEqual(expectedDeliveryMode); expect(postResult).toStrictEqual('retry'); }); - }); }); diff --git a/src/directLine.ts b/src/directLine.ts index 66530e22b..443e0416d 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -305,7 +305,7 @@ export interface User { role?: UserRole } -export type DeliveryMode = "normal" | "stream" | "expectReplies"; +export type DeliveryMode = "normal" | "stream"; export interface IActivity { type: string,