@@ -47,6 +47,7 @@ export class StreamConsumerService implements OnModuleDestroy {
4747 subscriberId : string ;
4848 resumeFromSeq ?: number ;
4949 sessionHandle ?: RuntimeSessionHandle ;
50+ pollOnly ?: boolean ;
5051 } ) : Promise < void > {
5152 if ( this . active . has ( params . runId ) ) return ;
5253 const marker : ActiveStream = {
@@ -116,6 +117,7 @@ export class StreamConsumerService implements OnModuleDestroy {
116117 runtimeSessionId : string ;
117118 subscriberId : string ;
118119 sessionHandle ?: RuntimeSessionHandle ;
120+ pollOnly ?: boolean ;
119121 }
120122 ) : Promise < void > {
121123 const provider = this . runtimeRegistry . get ( params . runtimeKind ) ;
@@ -125,31 +127,29 @@ export class StreamConsumerService implements OnModuleDestroy {
125127 runtimeSessionId : params . runtimeSessionId
126128 } ;
127129
128- let retries = 0 ;
129130 const maxRetries = this . config . streamMaxRetries ;
130- let isFirstIteration = true ;
131131
132- while ( ! marker . aborted ) {
132+ // If we have a session handle and not poll-only, consume the stream first
133+ if ( params . sessionHandle && ! params . pollOnly ) {
133134 try {
134- // First iteration: use the session handle's events if provided
135- // Subsequent iterations (reconnection): fall back to streamSession()
136- const iterable = ( isFirstIteration && params . sessionHandle )
137- ? params . sessionHandle . events
138- : provider . streamSession ( {
139- runId : params . runId ,
140- runtimeSessionId : params . runtimeSessionId ,
141- modeName : params . execution . session . modeName ,
142- subscriberId : params . subscriberId
143- } ) ;
144- isFirstIteration = false ;
145-
146- for await ( const raw of this . withIdleTimeout ( iterable , this . config . streamIdleTimeoutMs ) ) {
135+ for await ( const raw of this . withIdleTimeout ( params . sessionHandle . events , this . config . streamIdleTimeoutMs ) ) {
147136 if ( marker . aborted ) return ;
148137 await this . handleRawEvent ( params . runId , raw , context , params . runtimeSessionId , marker ) ;
149138 if ( marker . finalized ) return ;
150- retries = 0 ;
151139 }
140+ } catch ( error ) {
141+ marker . connected = false ;
142+ this . logger . warn ( `stream error for run ${ params . runId } : ${ error instanceof Error ? error . message : String ( error ) } ` ) ;
143+ }
144+
145+ // Stream ended — check if already finalized
146+ if ( marker . finalized || marker . aborted ) return ;
147+ }
152148
149+ // Polling fallback: poll getSession() until terminal state or max retries
150+ let retries = 0 ;
151+ while ( ! marker . aborted && ! marker . finalized ) {
152+ try {
153153 const snapshot = await provider . getSession ( {
154154 runId : params . runId ,
155155 runtimeSessionId : params . runtimeSessionId ,
@@ -172,72 +172,28 @@ export class StreamConsumerService implements OnModuleDestroy {
172172 await this . finalizeRun ( params . runId , marker , 'failed' , new Error ( 'runtime session expired' ) ) ;
173173 return ;
174174 }
175+ } catch ( pollError ) {
176+ this . logger . warn (
177+ `getSession poll failed for run ${ params . runId } : ${ pollError instanceof Error ? pollError . message : String ( pollError ) } `
178+ ) ;
179+ }
175180
176- retries += 1 ;
177- if ( retries > maxRetries ) {
178- await this . finalizeRun ( params . runId , marker , 'failed' , new Error ( 'stream ended without terminal session state' ) ) ;
179- return ;
180- }
181-
182- await this . eventService . emitControlPlaneEvents ( params . runId , [
183- {
184- ts : new Date ( ) . toISOString ( ) ,
185- type : 'session.stream.opened' ,
186- source : { kind : 'control-plane' , name : 'stream-consumer' } ,
187- subject : { kind : 'session' , id : params . runtimeSessionId } ,
188- data : { status : 'reconnecting' , detail : 'stream ended before terminal state; retrying' }
189- }
190- ] ) ;
191- await new Promise ( ( resolve ) => setTimeout ( resolve , this . backoffMs ( retries ) ) ) ;
192- } catch ( error ) {
193- marker . connected = false ;
194- retries += 1 ;
195- this . logger . warn ( `stream error for run ${ params . runId } : ${ error instanceof Error ? error . message : String ( error ) } ` ) ;
196- await this . eventService . emitControlPlaneEvents ( params . runId , [
197- {
198- ts : new Date ( ) . toISOString ( ) ,
199- type : 'session.stream.opened' ,
200- source : { kind : 'control-plane' , name : 'stream-consumer' } ,
201- subject : { kind : 'session' , id : params . runtimeSessionId } ,
202- data : { status : 'reconnecting' , detail : error instanceof Error ? error . message : String ( error ) }
203- }
204- ] ) ;
205-
206- if ( retries > maxRetries ) {
207- await this . finalizeRun ( params . runId , marker , 'failed' , error ) ;
208- return ;
209- }
181+ retries += 1 ;
182+ if ( retries > maxRetries ) {
183+ await this . finalizeRun ( params . runId , marker , 'failed' , new Error ( 'polling exhausted without terminal session state' ) ) ;
184+ return ;
185+ }
210186
211- try {
212- const snapshot = await provider . getSession ( {
213- runId : params . runId ,
214- runtimeSessionId : params . runtimeSessionId ,
215- requesterId : params . subscriberId
216- } ) ;
217- await this . handleRawEvent (
218- params . runId ,
219- { kind : 'session-snapshot' , receivedAt : new Date ( ) . toISOString ( ) , sessionSnapshot : snapshot } ,
220- context ,
221- params . runtimeSessionId ,
222- marker
223- ) ;
224- if ( marker . finalized ) return ;
225- if ( snapshot . state === 'SESSION_STATE_RESOLVED' ) {
226- await this . finalizeRun ( params . runId , marker , 'completed' ) ;
227- return ;
228- }
229- if ( snapshot . state === 'SESSION_STATE_EXPIRED' ) {
230- await this . finalizeRun ( params . runId , marker , 'failed' , new Error ( 'runtime session expired' ) ) ;
231- return ;
232- }
233- } catch ( snapshotError ) {
234- this . logger . warn (
235- `reconciliation failed for run ${ params . runId } : ${ snapshotError instanceof Error ? snapshotError . message : String ( snapshotError ) } `
236- ) ;
187+ await this . eventService . emitControlPlaneEvents ( params . runId , [
188+ {
189+ ts : new Date ( ) . toISOString ( ) ,
190+ type : 'session.stream.opened' ,
191+ source : { kind : 'control-plane' , name : 'stream-consumer' } ,
192+ subject : { kind : 'session' , id : params . runtimeSessionId } ,
193+ data : { status : 'reconnecting' , detail : 'polling getSession for terminal state' }
237194 }
238-
239- await new Promise ( ( resolve ) => setTimeout ( resolve , this . backoffMs ( retries ) ) ) ;
240- }
195+ ] ) ;
196+ await new Promise ( ( resolve ) => setTimeout ( resolve , this . backoffMs ( retries ) ) ) ;
241197 }
242198 }
243199
0 commit comments