diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index f9beb70602..a6f3fddea8 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1029,6 +1029,29 @@ export class StreamManager extends EventEmitter { // Console events are not streamed (appear in final result only) } + private getStreamMode(initialMetadata?: Partial): "plan" | "exec" | undefined { + const rawMode = initialMetadata?.mode; + // Stats schema only accepts "plan" | "exec". + return rawMode === "plan" || rawMode === "exec" ? rawMode : undefined; + } + + private emitStreamStart( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo, + historySequence: number + ): void { + const streamStartMode = this.getStreamMode(streamInfo.initialMetadata); + this.emit("stream-start", { + type: "stream-start", + workspaceId: workspaceId as string, + messageId: streamInfo.messageId, + model: streamInfo.model, + historySequence, + startTime: streamInfo.startTime, + ...(streamStartMode && { mode: streamStartMode }), + } as StreamStartEvent); + } + /** * Processes a stream with guaranteed cleanup, regardless of success or failure */ @@ -1042,18 +1065,7 @@ export class StreamManager extends EventEmitter { streamInfo.state = StreamState.STREAMING; // Emit stream start event (include mode from initialMetadata if available) - // Validate mode - stats schema only accepts "plan" | "exec" for now - const rawMode = streamInfo.initialMetadata?.mode; - const streamStartMode = rawMode === "plan" || rawMode === "exec" ? rawMode : undefined; - this.emit("stream-start", { - type: "stream-start", - workspaceId: workspaceId as string, - messageId: streamInfo.messageId, - model: streamInfo.model, - historySequence, - startTime: streamInfo.startTime, - ...(streamStartMode && { mode: streamStartMode }), - } as StreamStartEvent); + this.emitStreamStart(workspaceId, streamInfo, historySequence); // Initialize token tracker for this model await this.tokenTracker.setModel(streamInfo.model); @@ -1573,7 +1585,18 @@ export class StreamManager extends EventEmitter { errorType = "authentication"; } - // Write error metadata to partial.json for persistence across reloads + await this.persistErrorState(workspaceId, streamInfo, errorMessage, errorType); + } + + /** + * Write error metadata to partial.json and emit the corresponding error event. + */ + private async persistErrorState( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo, + errorMessage: string, + errorType: StreamErrorType + ): Promise { const errorPartialMessage: MuxMessage = { id: streamInfo.messageId, role: "assistant", @@ -1588,22 +1611,24 @@ export class StreamManager extends EventEmitter { }, parts: streamInfo.parts, }; - // Wait for any in-flight partial write to complete before writing error state + + // Wait for any in-flight partial write to complete before writing error state. // This prevents race conditions where the error write and a throttled flush - // write at the same time, causing inconsistent partial.json state + // write at the same time, causing inconsistent partial.json state. if (streamInfo.partialWritePromise) { await streamInfo.partialWritePromise; } - // Write error state to disk - await to ensure consistent state before any resume + + // Write error state to disk - await to ensure consistent state before any resume. await this.partialService.writePartial(workspaceId as string, errorPartialMessage); - // Emit error event + // Emit error event. this.emit("error", { type: "error", workspaceId: workspaceId as string, messageId: streamInfo.messageId, error: errorMessage, - errorType: errorType, + errorType, } as ErrorEvent); } @@ -2283,19 +2308,7 @@ export class StreamManager extends EventEmitter { await this.tokenTracker.setModel(streamInfo.model); // Emit stream-start event (include mode from initialMetadata if available) - // Validate mode - stats schema only accepts "plan" | "exec" for now - const rawReplayMode = streamInfo.initialMetadata?.mode; - const replayMode = - rawReplayMode === "plan" || rawReplayMode === "exec" ? rawReplayMode : undefined; - this.emit("stream-start", { - type: "stream-start", - workspaceId, - messageId: streamInfo.messageId, - model: streamInfo.model, - historySequence: streamInfo.historySequence, - startTime: streamInfo.startTime, - ...(replayMode && { mode: replayMode }), - }); + this.emitStreamStart(typedWorkspaceId, streamInfo, streamInfo.historySequence); // Replay accumulated parts as events using shared emission logic. // IMPORTANT: Snapshot the parts array up-front. @@ -2347,34 +2360,7 @@ export class StreamManager extends EventEmitter { }; // Write error state to partial.json (same as real error handling) - // Wait for any in-flight partial write to complete first - if (streamInfo.partialWritePromise) { - await streamInfo.partialWritePromise; - } - const errorPartialMessage: MuxMessage = { - id: streamInfo.messageId, - role: "assistant", - metadata: { - historySequence: streamInfo.historySequence, - timestamp: streamInfo.startTime, - model: streamInfo.model, - partial: true, - error: errorMessage, - errorType: "network", // Test errors are network-like - ...streamInfo.initialMetadata, - }, - parts: streamInfo.parts, - }; - await this.partialService.writePartial(workspaceId, errorPartialMessage); - - // Emit error event (same as real error handling) - this.emit("error", { - type: "error", - workspaceId, - messageId: streamInfo.messageId, - error: errorMessage, - errorType: "network", - } as ErrorEvent); + await this.persistErrorState(typedWorkspaceId, streamInfo, errorMessage, "network"); // Wait for the stream processing to complete (cleanup) await streamInfo.processingPromise;