From e1efe387aff6aaf32081e310115fa8a0e1ed1088 Mon Sep 17 00:00:00 2001 From: Cheskel Twersky Date: Wed, 13 Nov 2024 02:30:13 +0200 Subject: [PATCH] wip: rework --- src/publisher/flow-control.ts | 7 ++++-- src/publisher/flow-publisher.ts | 40 ++++++++++++++------------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/src/publisher/flow-control.ts b/src/publisher/flow-control.ts index ee467e329..c5bdfc3f6 100644 --- a/src/publisher/flow-control.ts +++ b/src/publisher/flow-control.ts @@ -112,7 +112,7 @@ export class FlowControl { * Do not use externally, it may change without warning. * @private */ - async willSend(bytes: number, messages: number): Promise { + willSend(bytes: number, messages: number): Promise { // Add this to our queue size. this.bytes += bytes; this.messages += messages; @@ -129,7 +129,10 @@ export class FlowControl { }); // This will pass through when someone else's this.sent() completes. - await promise.promise; + return promise.promise; + } else { + // This will pass through immediately. + return Promise.resolve(); } } diff --git a/src/publisher/flow-publisher.ts b/src/publisher/flow-publisher.ts index 74e3b3279..34128d4a1 100644 --- a/src/publisher/flow-publisher.ts +++ b/src/publisher/flow-publisher.ts @@ -32,12 +32,12 @@ import * as tracing from '../telemetry-tracing'; export class FlowControlledPublisher { private publisher: Publisher; private flowControl: FlowControl; - private idPromises: Promise[]; + private idPromises: Set>; constructor(publisher: Publisher) { this.publisher = publisher; this.flowControl = this.publisher.flowControl; - this.idPromises = []; + this.idPromises = new Set(); } /** @@ -76,24 +76,20 @@ export class FlowControlledPublisher { * await flowControlled.publish(data); * ``` */ - publish(message: PubsubMessage): Promise | null { + publish(message: PubsubMessage): Promise { const flowSpan = message.parentSpan ? tracing.PubsubSpans.createPublishFlowSpan(message) : undefined; const doPublish = () => { flowSpan?.end(); - this.doPublish(message); + return this.doPublish(message); }; const size = calculateMessageSize(message); - if (this.flowControl.wouldExceed(size, 1)) { - const waitPromise = this.flowControl.willSend(size, 1); - return waitPromise.then(doPublish); - } else { - this.flowControl.willSend(size, 1).then(() => {}); - doPublish(); - return null; - } + + return this.flowControl.willSend(size, 1).then(() => { + return doPublish(); + }); } /** @@ -112,23 +108,22 @@ export class FlowControlledPublisher { * } * ``` */ - publishNow(message: PubsubMessage): void { + publishNow(message: PubsubMessage): Promise { this.flowControl.addToCount(calculateMessageSize(message), 1); - this.doPublish(message); + return this.doPublish(message); } - private doPublish(message: PubsubMessage): void { - let idPromise = this.publisher.publishMessage(message); + private doPublish(message: PubsubMessage): Promise { + const idPromise = this.publisher.publishMessage(message); // This will defer but not eat any errors. - const publishDone = (id: string) => { + const publishDone = () => { this.flowControl.sent(calculateMessageSize(message), 1); - return id; + this.idPromises.delete(idPromise); }; - idPromise.catch(publishDone); - idPromise = idPromise.then(publishDone); - - this.idPromises.push(idPromise); + idPromise.finally(publishDone); + this.idPromises.add(idPromise); + return idPromise; } /** @@ -142,7 +137,6 @@ export class FlowControlledPublisher { */ all(): Promise { const allPromise = Promise.all(this.idPromises); - this.idPromises = []; return allPromise; } }