1818import java .util .concurrent .ExecutorService ;
1919import java .util .concurrent .TimeoutException ;
2020import java .util .concurrent .atomic .AtomicBoolean ;
21+ import java .util .function .Supplier ;
2122import org .slf4j .Logger ;
2223import org .slf4j .LoggerFactory ;
2324
@@ -126,6 +127,7 @@ public class ZerobusStream<RecordType extends Message> {
126127 private static final int CREATE_STREAM_TIMEOUT_MS = 15000 ;
127128
128129 private ZerobusStub stub ;
130+ private final Supplier <ZerobusStub > stubSupplier ;
129131 final TableProperties <RecordType > tableProperties ;
130132 private final ZerobusSdkStubFactory stubFactory ;
131133 private final String serverEndpoint ;
@@ -352,10 +354,10 @@ private CompletableFuture<Void> createStream() {
352354 () -> {
353355 CompletableFuture <Void > createStreamTry = new CompletableFuture <>();
354356
355- // The stub was created once with a token supplier, so we don't recreate it here
356- // The token supplier will provide a fresh token for each gRPC request
357+ // Get a fresh stub from the supplier
358+ stub = stubSupplier . get ();
357359
358- // Create the gRPC stream with the existing stub
360+ // Create the gRPC stream with the fresh stub
359361 streamCreatedEvent = Optional .of (new CompletableFuture <>());
360362 stream =
361363 Optional .of (
@@ -500,6 +502,9 @@ private void closeStream(boolean hardFailure, Optional<ZerobusException> excepti
500502 try {
501503 if (stream .isPresent ()) {
502504 stream .get ().onCompleted ();
505+ if (hardFailure ) {
506+ stream .get ().cancel ("Stream closed" , null );
507+ }
503508 }
504509 } catch (Exception e ) {
505510 // Ignore errors during stream cleanup - stream may already be closed
@@ -528,6 +533,7 @@ private void closeStream(boolean hardFailure, Optional<ZerobusException> excepti
528533 stream = Optional .empty ();
529534 streamCreatedEvent = Optional .empty ();
530535 streamId = Optional .empty ();
536+ stub = null ;
531537
532538 this .notifyAll ();
533539 }
@@ -1073,6 +1079,7 @@ public void onNext(EphemeralStreamResponse response) {
10731079 String .format (
10741080 "Server will close the stream in %.3fms. Triggering stream recovery." ,
10751081 durationMs ));
1082+ streamFailureInfo .resetFailure (StreamFailureType .SERVER_CLOSED_STREAM );
10761083 handleStreamFailed (StreamFailureType .SERVER_CLOSED_STREAM , Optional .empty ());
10771084 }
10781085 break ;
@@ -1085,6 +1092,13 @@ public void onNext(EphemeralStreamResponse response) {
10851092
10861093 @ Override
10871094 public void onError (Throwable t ) {
1095+ synchronized (ZerobusStream .this ) {
1096+ if (state == StreamState .CLOSED && !stream .isPresent ()) {
1097+ logger .debug ("Ignoring error on already closed stream: " + t .getMessage ());
1098+ return ;
1099+ }
1100+ }
1101+
10881102 Optional <Throwable > error = Optional .of (t );
10891103
10901104 if (t instanceof StatusRuntimeException ) {
@@ -1336,7 +1350,7 @@ public void close() throws ZerobusException {
13361350 }
13371351
13381352 public ZerobusStream (
1339- ZerobusStub stub ,
1353+ Supplier < ZerobusStub > stubSupplier ,
13401354 TableProperties <RecordType > tableProperties ,
13411355 ZerobusSdkStubFactory stubFactory ,
13421356 String serverEndpoint ,
@@ -1347,7 +1361,8 @@ public ZerobusStream(
13471361 StreamConfigurationOptions options ,
13481362 ExecutorService zerobusStreamExecutor ,
13491363 ExecutorService ec ) {
1350- this .stub = stub ;
1364+ this .stub = null ;
1365+ this .stubSupplier = stubSupplier ;
13511366 this .tableProperties = tableProperties ;
13521367 this .stubFactory = stubFactory ;
13531368 this .serverEndpoint = serverEndpoint ;
0 commit comments