2525import android .os .IBinder ;
2626import android .os .Parcel ;
2727import android .os .Process ;
28+
2829import androidx .annotation .BinderThread ;
2930import androidx .annotation .MainThread ;
31+
32+ import com .google .common .base .Preconditions ;
3033import com .google .common .base .Ticker ;
3134import com .google .common .util .concurrent .FutureCallback ;
3235import com .google .common .util .concurrent .Futures ;
5962import io .grpc .internal .StatsTraceContext ;
6063import java .util .concurrent .Executor ;
6164import java .util .concurrent .ScheduledFuture ;
65+ import java .util .concurrent .atomic .AtomicBoolean ;
66+ import java .util .concurrent .atomic .AtomicInteger ;
6267
6368import javax .annotation .Nullable ;
6469import javax .annotation .concurrent .ThreadSafe ;
@@ -78,12 +83,13 @@ public final class BinderClientTransport extends BinderTransport
7883 private final ClientHandshake handshake ;
7984
8085 /** Number of ongoing calls which keep this transport "in-use". */
81- @ GuardedBy ("this" )
82- private int numInUseStreams ;
86+ private final AtomicInteger numInUseStreams ;
8387
8488 /** Last in-use state that was reported to the listener */
85- @ GuardedBy ("this" )
86- private boolean listenerInUse ;
89+ private final AtomicBoolean listenerInUse ;
90+
91+ /** Synchronizes transport listener callbacks */
92+ private final Object listenerNotifyLock ;
8793
8894 private final long readyTimeoutMillis ;
8995 private final PingTracker pingTracker ;
@@ -124,11 +130,11 @@ public BinderClientTransport(
124130 Boolean preAuthServerOverride = options .getEagAttributes ().get (PRE_AUTH_SERVER_OVERRIDE );
125131 this .preAuthorizeServer =
126132 preAuthServerOverride != null ? preAuthServerOverride : factory .preAuthorizeServers ;
127-
128133 this .handshake =
129134 factory .useLegacyAuthStrategy ? new LegacyClientHandshake () : new V2ClientHandshake ();
130- numInUseStreams = 0 ;
131- listenerInUse = false ;
135+ this .numInUseStreams = new AtomicInteger ();
136+ this .listenerInUse = new AtomicBoolean ();
137+ this .listenerNotifyLock = new Object ();
132138 pingTracker = new PingTracker (Ticker .systemTicker (), (id ) -> sendPing (id ));
133139 serviceBinding =
134140 new ServiceBinding (
@@ -272,7 +278,7 @@ public synchronized ClientStream newStream(
272278 return newFailingClientStream (failure , attributes , headers , tracers );
273279 }
274280
275- updateInUseStreamsIfNeed (inbound .countsForInUse (), 1 );
281+ updateInUseStreamsCountIfNeeded (inbound .countsForInUse (), 1 );
276282 Outbound .ClientOutbound outbound =
277283 new Outbound .ClientOutbound (this , callId , method , headers , statsTraceContext );
278284 if (method .getType ().clientSendsOneMessage ()) {
@@ -284,7 +290,7 @@ public synchronized ClientStream newStream(
284290
285291 @ Override
286292 protected void unregisterInbound (Inbound <?> inbound ) {
287- updateInUseStreamsIfNeed (inbound .countsForInUse (), -1 );
293+ updateInUseStreamsCountIfNeeded (inbound .countsForInUse (), -1 );
288294 super .unregisterInbound (inbound );
289295 }
290296
@@ -314,9 +320,8 @@ void notifyShutdown(Status status) {
314320 @ Override
315321 @ GuardedBy ("this" )
316322 void notifyTerminated () {
317- if (numInUseStreams > 0 ) {
318- numInUseStreams = 0 ;
319- listenerInUse = false ;
323+ if (numInUseStreams .getAndSet (0 ) > 0 ) {
324+ listenerInUse .set (false );
320325 clientTransportListener .transportInUse (false );
321326 }
322327 if (readyTimeoutFuture != null ) {
@@ -457,25 +462,63 @@ private synchronized void handleAuthResult(Throwable t) {
457462 Status .INTERNAL .withDescription ("Could not evaluate SecurityPolicy" ).withCause (t ), true );
458463 }
459464
460- /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */
461- private synchronized void updateInUseStreamsIfNeed (boolean countsForInUse , int delta ) {
462- if (!countsForInUse ) {
465+ /**
466+ * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without
467+ * acquiring the transport lock.
468+ */
469+ private void updateInUseStreamsCountIfNeeded (boolean countsForInUse , int delta ) {
470+ Preconditions .checkArgument (delta == -1 || delta == 1 , "stream count delta must be -1 or +1" );
471+ if (!countsForInUse ) {
463472 return ;
464473 }
474+ int prev , next ;
465475
466- numInUseStreams += delta ;
467- if (numInUseStreams < 0 ) {
468- // Defensive: prevent negative due to unexpected double-decrement
469- numInUseStreams = 0 ;
476+ if (delta > 0 ) {
477+ next = numInUseStreams .incrementAndGet ();
478+ prev = next - 1 ;
479+ } else {
480+ prev = numInUseStreams .get ();
481+ int updated ;
482+
483+ while (true ) {
484+ int current = prev ;
485+ int newValue = current > 0 ? current - 1 : 0 ;
486+ if (numInUseStreams .compareAndSet (current , newValue )) {
487+ updated = newValue ;
488+ break ;
489+ }
490+ prev = numInUseStreams .get ();
491+ }
492+ next = updated ;
470493 }
471494
472- boolean nowInUseStream = numInUseStreams > 0 ;
473- if (nowInUseStream != listenerInUse ) {
474- listenerInUse = nowInUseStream ;
475- clientTransportListener .transportInUse (nowInUseStream );
495+ boolean prevInUse = prev > 0 ;
496+ boolean nextInUse = next > 0 ;
497+
498+ if (prevInUse != nextInUse ) {
499+ if (listenerInUse .compareAndSet (prevInUse , nextInUse )) {
500+ scheduleTransportInUseNotification (nextInUse );
501+ }
476502 }
477503 }
478504
505+ private void scheduleTransportInUseNotification (final boolean inUse ) {
506+ getScheduledExecutorService ()
507+ .execute (
508+ new Runnable () {
509+ @ Override
510+ public void run () {
511+ // Provide external synchronization as required by Listener contract,
512+ // without taking the transport lock to avoid potential deadlocks.
513+ synchronized (listenerNotifyLock ) {
514+ if (listenerInUse .get () == inUse ) {
515+ clientTransportListener .transportInUse (inUse );
516+ }
517+ }
518+ }
519+ });
520+ }
521+
479522 @ GuardedBy ("this" )
480523 @ Override
481524 protected void handlePingResponse (Parcel parcel ) {
0 commit comments