5959import io .grpc .internal .StatsTraceContext ;
6060import java .util .concurrent .Executor ;
6161import java .util .concurrent .ScheduledFuture ;
62- import java . util . concurrent . atomic . AtomicInteger ;
62+
6363import javax .annotation .Nullable ;
6464import javax .annotation .concurrent .ThreadSafe ;
6565
@@ -78,7 +78,12 @@ public final class BinderClientTransport extends BinderTransport
7878 private final ClientHandshake handshake ;
7979
8080 /** Number of ongoing calls which keep this transport "in-use". */
81- private final AtomicInteger numInUseStreams ;
81+ @ GuardedBy ("this" )
82+ private int numInUseStreams ;
83+
84+ /** Last in-use state that was reported to the listener */
85+ @ GuardedBy ("this" )
86+ private boolean listenerInUse ;
8287
8388 private final long readyTimeoutMillis ;
8489 private final PingTracker pingTracker ;
@@ -119,9 +124,11 @@ public BinderClientTransport(
119124 Boolean preAuthServerOverride = options .getEagAttributes ().get (PRE_AUTH_SERVER_OVERRIDE );
120125 this .preAuthorizeServer =
121126 preAuthServerOverride != null ? preAuthServerOverride : factory .preAuthorizeServers ;
127+
122128 this .handshake =
123129 factory .useLegacyAuthStrategy ? new LegacyClientHandshake () : new V2ClientHandshake ();
124- numInUseStreams = new AtomicInteger ();
130+ numInUseStreams = 0 ;
131+ listenerInUse = false ;
125132 pingTracker = new PingTracker (Ticker .systemTicker (), (id ) -> sendPing (id ));
126133 serviceBinding =
127134 new ServiceBinding (
@@ -265,9 +272,7 @@ public synchronized ClientStream newStream(
265272 return newFailingClientStream (failure , attributes , headers , tracers );
266273 }
267274
268- if (inbound .countsForInUse () && numInUseStreams .getAndIncrement () == 0 ) {
269- clientTransportListener .transportInUse (true );
270- }
275+ updateInUseStreamsIfNeed (inbound .countsForInUse (), 1 );
271276 Outbound .ClientOutbound outbound =
272277 new Outbound .ClientOutbound (this , callId , method , headers , statsTraceContext );
273278 if (method .getType ().clientSendsOneMessage ()) {
@@ -279,9 +284,7 @@ public synchronized ClientStream newStream(
279284
280285 @ Override
281286 protected void unregisterInbound (Inbound <?> inbound ) {
282- if (inbound .countsForInUse () && numInUseStreams .decrementAndGet () == 0 ) {
283- clientTransportListener .transportInUse (false );
284- }
287+ updateInUseStreamsIfNeed (inbound .countsForInUse (), -1 );
285288 super .unregisterInbound (inbound );
286289 }
287290
@@ -311,7 +314,9 @@ void notifyShutdown(Status status) {
311314 @ Override
312315 @ GuardedBy ("this" )
313316 void notifyTerminated () {
314- if (numInUseStreams .getAndSet (0 ) > 0 ) {
317+ if (numInUseStreams > 0 ) {
318+ numInUseStreams = 0 ;
319+ listenerInUse = false ;
315320 clientTransportListener .transportInUse (false );
316321 }
317322 if (readyTimeoutFuture != null ) {
@@ -452,6 +457,25 @@ private synchronized void handleAuthResult(Throwable t) {
452457 Status .INTERNAL .withDescription ("Could not evaluate SecurityPolicy" ).withCause (t ), true );
453458 }
454459
460+ /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */
461+ private synchronized void updateInUseStreamsIfNeed (boolean countsForInUse , int delta ) {
462+ if (!countsForInUse ) {
463+ return ;
464+ }
465+
466+ numInUseStreams += delta ;
467+ if (numInUseStreams < 0 ) {
468+ // Defensive: prevent negative due to unexpected double-decrement
469+ numInUseStreams = 0 ;
470+ }
471+
472+ boolean nowInUseStream = numInUseStreams > 0 ;
473+ if (nowInUseStream != listenerInUse ) {
474+ listenerInUse = nowInUseStream ;
475+ clientTransportListener .transportInUse (nowInUseStream );
476+ }
477+ }
478+
455479 @ GuardedBy ("this" )
456480 @ Override
457481 protected void handlePingResponse (Parcel parcel ) {
0 commit comments