Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
Expand Down Expand Up @@ -120,9 +119,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
}
}

// Since we have a new set of addresses, we are again at first pass
firstPass = true;

List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);

// We can optionally be configured to shuffle the address list. This can help better distribute
Expand Down Expand Up @@ -298,6 +294,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
}

if (newState == IDLE && subchannelData.state == READY) {
numTf = 0;
helper.refreshNameResolution();
}

Expand Down Expand Up @@ -344,6 +341,8 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
break;

case TRANSIENT_FAILURE:
numTf++;

// If we are looking at current channel, request a connection if possible
if (addressIndex.isValid()
&& subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
Expand All @@ -369,12 +368,9 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo

// Refresh Name Resolution, but only when all 3 conditions are met
// * We are at the end of addressIndex
// * have had status reported for all subchannels.
// * And one of the following conditions:
// * Have had enough TF reported since we completed first pass
// * Just completed the first pass
if (++numTf >= addressIndex.size() || firstPass) {
firstPass = false;
// * Have had status reported for all subchannels.
// * Have had enough TF reported since the last refresh
if (numTf >= addressIndex.size()) {
numTf = 0;
helper.refreshNameResolution();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,7 @@ public void updateAddresses_identical_transient_failure() {
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build());
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
inOrder.verify(mockHelper).createSubchannel(any());
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
Expand All @@ -1949,6 +1950,7 @@ public void updateAddresses_identical_transient_failure() {
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());

// Second connection attempt is unsuccessful
inOrder.verify(mockHelper).createSubchannel(any());
inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture());
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
SubchannelStateListener stateListener2 = stateListenerCaptor.getValue();
Expand All @@ -1961,18 +1963,24 @@ public void updateAddresses_identical_transient_failure() {
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build());

// Verify that no new subchannels were created or started
verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture());
verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture());
verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture());
// Verify that no new subchannels were created or started, nor resolver refresh
inOrder.verify(mockHelper, never()).createSubchannel(createArgsCaptor.capture());
inOrder.verify(mockSubchannel1, never()).start(stateListenerCaptor.capture());
inOrder.verify(mockSubchannel2, never()).start(stateListenerCaptor.capture());
inOrder.verify(mockHelper, never()).refreshNameResolution();
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());

// No new connections are requested, subchannels responsible for completing their own backoff
verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);

// First connection attempt is successful
// Second connection is unsuccessful (e.g., retry after backoff), and there is no name resolver
// refresh
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
inOrder.verify(mockHelper, never()).refreshNameResolution();

// First connection is successful
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
assertEquals(READY, loadBalancer.getConcludedConnectivityState());
inOrder.verify(mockSubchannel2).shutdown();
Expand Down