feat: scale up connection worker pool based on latency #13384

Open
agrawal-siddharth wants to merge 1 commit into googleapis:main from agrawal-siddharth:latency

@agrawal-siddharth
Contributor

No description provided.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a latency-based load metric, timeSinceLastCallback, to ConnectionWorker to track the duration since the oldest in-flight request was sent. This metric is used to determine if a worker is overwhelmed and is integrated into the load comparators for connection routing, with corresponding unit tests added. The review feedback suggests avoiding lock contention in getLoad() by using a volatile timestamp to make the method lock-free, simplifying the comparator bucket calculation using toSeconds(), and maintaining test assertion consistency by using Truth's assertThat instead of JUnit's assertFalse/assertTrue.

Comment on lines +1884 to +1902
    this.lock.lock();
    try {
      Duration timeSinceLastCallback = Duration.ZERO;
      if (!inflightRequestQueue.isEmpty()) {
        AppendRequestAndResponse head = inflightRequestQueue.peekFirst();
        if (head != null && head.requestSendTimeStamp != null) {
          timeSinceLastCallback = Duration.between(head.requestSendTimeStamp, Instant.now());
        }
      }
      return Load.create(
          timeSinceLastCallback,
          inflightBytes,
          inflightRequests,
          destinationSet.size(),
          maxInflightBytes,
          maxInflightRequests);
    } finally {
      this.lock.unlock();
    }

high

Acquiring this.lock in getLoad() can introduce significant lock contention on the critical path. getLoad() is called frequently by the routing/load-balancing thread to select the best connection worker. Since the worker thread also frequently holds this.lock to serialize requests and responses, locking here can block user threads.

To avoid this, we can maintain a volatile Instant oldestInflightRequestSendTimeStamp field that is updated under the lock when requests are added to or removed from inflightRequestQueue. This allows getLoad() to remain completely lock-free.

    Instant oldestTimestamp = this.oldestInflightRequestSendTimeStamp;
    Duration timeSinceLastCallback = Duration.ZERO;
    if (oldestTimestamp != null) {
      timeSinceLastCallback = Duration.between(oldestTimestamp, Instant.now());
    }
    return Load.create(
        timeSinceLastCallback,
        inflightBytes,
        inflightRequests,
        destinationSet.size(),
        maxInflightBytes,
        maxInflightRequests);
References
  1. In performance-sensitive code, minor race conditions in periodic background tasks (such as error rate monitoring) may be acceptable if the impact is limited to a slightly delayed or stale state transition and it avoids synchronization overhead.
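
The suggestion above can be sketched in isolation. This is a minimal illustration, not the PR's code: `OldestInflightTracker`, `onRequestSent`, and `onResponseReceived` are hypothetical names, the queue holds bare `Instant` values instead of `AppendRequestAndResponse`, and the current time is passed in as a parameter (rather than calling `Instant.now()`) so the behavior is easy to check.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for ConnectionWorker; only the timestamp bookkeeping is modeled. */
class OldestInflightTracker {
  private final ReentrantLock lock = new ReentrantLock();

  // Send times of in-flight requests, oldest first. Guarded by lock.
  private final Deque<Instant> inflightSendTimes = new ArrayDeque<>();

  // Written only while holding lock; read without it.
  private volatile Instant oldestInflightRequestSendTimeStamp;

  void onRequestSent(Instant sendTime) {
    lock.lock();
    try {
      inflightSendTimes.addLast(sendTime);
      oldestInflightRequestSendTimeStamp = inflightSendTimes.peekFirst();
    } finally {
      lock.unlock();
    }
  }

  void onResponseReceived() {
    lock.lock();
    try {
      inflightSendTimes.pollFirst();
      // peekFirst() returns null on an empty queue, which resets the field.
      oldestInflightRequestSendTimeStamp = inflightSendTimes.peekFirst();
    } finally {
      lock.unlock();
    }
  }

  /** Lock-free read: one volatile load, then arithmetic on the local copy. */
  Duration timeSinceLastCallback(Instant now) {
    Instant oldest = oldestInflightRequestSendTimeStamp;
    return oldest == null ? Duration.ZERO : Duration.between(oldest, now);
  }
}
```

The reader may observe a timestamp that is one update behind, which matches the trade-off described in the reference note: a slightly stale load value in exchange for no lock acquisition on the routing path.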

  // Then compare destination count of the two connections.
  public static final Comparator<Load> LOAD_COMPARATOR =
-     Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
+     Comparator.comparing((Load key) -> (int) (key.timeSinceLastCallback().toMillis() / 1000))

medium

We can simplify key.timeSinceLastCallback().toMillis() / 1000 by using toSeconds() directly, which is cleaner and more idiomatic.

Suggested change
-     Comparator.comparing((Load key) -> (int) (key.timeSinceLastCallback().toMillis() / 1000))
+     Comparator.comparing((Load key) -> (int) key.timeSinceLastCallback().toSeconds())
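
One caveat worth checking before applying the suggestion: `Duration.toSeconds()` was added in Java 9, while `Duration.getSeconds()` returns the same value and is available on Java 8. The sketch below compares the two bucket calculations on plain `Duration` values; `LatencyBuckets` and its method names are hypothetical and not part of the PR. The two forms agree for non-negative durations, which is the expected case for a time-since-send measurement; for negative durations they can differ, because integer division truncates toward zero while `getSeconds()` rounds toward negative infinity.

```java
import java.time.Duration;
import java.util.Comparator;

/** Hypothetical helper; compares the two whole-second bucket calculations. */
class LatencyBuckets {
  // Whole-second bucket as written in the original diff line.
  static int bucketViaMillis(Duration d) {
    return (int) (d.toMillis() / 1000);
  }

  // Same bucket via getSeconds(), available since Java 8.
  // Duration.toSeconds() (Java 9+) returns the same value.
  static int bucketViaSeconds(Duration d) {
    return (int) d.getSeconds();
  }

  // Durations in the same whole-second bucket compare as equal,
  // so a later tie-breaker in a comparator chain would decide between them.
  static final Comparator<Duration> BY_SECOND_BUCKET =
      Comparator.comparingInt(LatencyBuckets::bucketViaSeconds);
}
```

With this bucketing, 2100 ms and 2900 ms fall into the same bucket, so sub-second jitter does not reorder workers.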

Comment on lines +903 to +912
Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100);
assertFalse(load2.isOverwhelmed());

// Under threshold (3s) for timeSinceLastCallback
Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100);
assertFalse(load3.isOverwhelmed());

// Over threshold (3s) for timeSinceLastCallback
Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100);
assertTrue(load4.isOverwhelmed());

medium

For consistency with the rest of the test suite (which uses Truth's assertThat), we should use assertThat(...).isFalse() and assertThat(...).isTrue() instead of JUnit's assertFalse and assertTrue.

Suggested change
-     Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100);
-     assertFalse(load2.isOverwhelmed());
-     // Under threshold (3s) for timeSinceLastCallback
-     Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100);
-     assertFalse(load3.isOverwhelmed());
-     // Over threshold (3s) for timeSinceLastCallback
-     Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100);
-     assertTrue(load4.isOverwhelmed());
+     Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100);
+     assertThat(load2.isOverwhelmed()).isFalse();
+     // Under threshold (3s) for timeSinceLastCallback
+     Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100);
+     assertThat(load3.isOverwhelmed()).isFalse();
+     // Over threshold (3s) for timeSinceLastCallback
+     Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100);
+     assertThat(load4.isOverwhelmed()).isTrue();

destinationSet.size(),
maxInflightBytes,
maxInflightRequests);
this.lock.lock();

Can you refactor inflightBytes / inflightRequests / head.requestSendTimeStamp into atomic member variables and read those instead of grabbing the lock? (We don't need to examine the queue to get head.requestSendTimeStamp; we can just update a member variable every time a request is added.)

I think the original code also has a bug around inflightBytes: it should be volatile or atomic, otherwise it can lead to undefined behavior if a read and a write happen at the same time.
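
A rough sketch of what this request could look like, using `java.util.concurrent.atomic.AtomicLong` for the counters and for a send timestamp that is overwritten each time a request is added. Everything here is an assumption for illustration: `InflightCounters` and its methods are hypothetical names, and the timestamp is stored as epoch milliseconds rather than as an `Instant`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for load figures that can be read without a lock. */
class InflightCounters {
  private final AtomicLong inflightBytes = new AtomicLong();
  private final AtomicLong inflightRequests = new AtomicLong();

  // Epoch millis of the most recently added request; 0 means none recorded yet.
  private final AtomicLong lastRequestSendTimeMillis = new AtomicLong();

  void onRequestAdded(long sizeBytes, long sendTimeMillis) {
    inflightBytes.addAndGet(sizeBytes);
    inflightRequests.incrementAndGet();
    lastRequestSendTimeMillis.set(sendTimeMillis);
  }

  void onRequestDone(long sizeBytes) {
    inflightBytes.addAndGet(-sizeBytes);
    inflightRequests.decrementAndGet();
  }

  long inflightBytes() {
    return inflightBytes.get();
  }

  long inflightRequests() {
    return inflightRequests.get();
  }

  long lastRequestSendTimeMillis() {
    return lastRequestSendTimeMillis.get();
  }
}
```

Each getter returns a value that is individually up to date, but the three reads together are not a consistent snapshot: a request may be added between two of them. For a load estimate used in routing, that imprecision is probably tolerable.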
