Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/publisher/flow-control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class FlowControl {
* Do not use externally, it may change without warning.
* @private
*/
async willSend(bytes: number, messages: number): Promise<void> {
willSend(bytes: number, messages: number): Promise<void> {
// Add this to our queue size.
this.bytes += bytes;
this.messages += messages;
Expand All @@ -129,7 +129,10 @@ export class FlowControl {
});

// This will pass through when someone else's this.sent() completes.
await promise.promise;
return promise.promise;
} else {
// This will pass through immediately.
return Promise.resolve();
}
}

Expand Down
40 changes: 17 additions & 23 deletions src/publisher/flow-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import * as tracing from '../telemetry-tracing';
export class FlowControlledPublisher {
private publisher: Publisher;
private flowControl: FlowControl;
private idPromises: Promise<string>[];
private idPromises: Set<Promise<string>>;

constructor(publisher: Publisher) {
this.publisher = publisher;
this.flowControl = this.publisher.flowControl;
this.idPromises = [];
this.idPromises = new Set();
}

/**
Expand Down Expand Up @@ -76,24 +76,20 @@ export class FlowControlledPublisher {
* await flowControlled.publish(data);
* ```
*/
publish(message: PubsubMessage): Promise<void> | null {
publish(message: PubsubMessage): Promise<string> {
const flowSpan = message.parentSpan
? tracing.PubsubSpans.createPublishFlowSpan(message)
: undefined;
const doPublish = () => {
flowSpan?.end();
this.doPublish(message);
return this.doPublish(message);
};

const size = calculateMessageSize(message);
if (this.flowControl.wouldExceed(size, 1)) {
const waitPromise = this.flowControl.willSend(size, 1);
return waitPromise.then(doPublish);
} else {
this.flowControl.willSend(size, 1).then(() => {});
doPublish();
return null;
}

return this.flowControl.willSend(size, 1).then(() => {
return doPublish();
});
}

/**
Expand All @@ -112,23 +108,22 @@ export class FlowControlledPublisher {
* }
* ```
*/
publishNow(message: PubsubMessage): void {
publishNow(message: PubsubMessage): Promise<string> {
this.flowControl.addToCount(calculateMessageSize(message), 1);
this.doPublish(message);
return this.doPublish(message);
}

private doPublish(message: PubsubMessage): void {
let idPromise = this.publisher.publishMessage(message);
private doPublish(message: PubsubMessage): Promise<string> {
const idPromise = this.publisher.publishMessage(message);

// This will defer but not eat any errors.
const publishDone = (id: string) => {
const publishDone = () => {
this.flowControl.sent(calculateMessageSize(message), 1);
return id;
this.idPromises.delete(idPromise);
};
idPromise.catch(publishDone);
idPromise = idPromise.then(publishDone);

this.idPromises.push(idPromise);
idPromise.finally(publishDone);
this.idPromises.add(idPromise);
return idPromise;
}

/**
Expand All @@ -142,7 +137,6 @@ export class FlowControlledPublisher {
*/
all(): Promise<string[]> {
const allPromise = Promise.all(this.idPromises);
this.idPromises = [];
return allPromise;
}
}