Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 45 additions & 59 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,29 @@ export class StreamManager extends EventEmitter {
// Console events are not streamed (appear in final result only)
}

private getStreamMode(initialMetadata?: Partial<MuxMetadata>): "plan" | "exec" | undefined {
const rawMode = initialMetadata?.mode;
// Stats schema only accepts "plan" | "exec".
return rawMode === "plan" || rawMode === "exec" ? rawMode : undefined;
}

private emitStreamStart(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
historySequence: number
): void {
const streamStartMode = this.getStreamMode(streamInfo.initialMetadata);
this.emit("stream-start", {
type: "stream-start",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
model: streamInfo.model,
historySequence,
startTime: streamInfo.startTime,
...(streamStartMode && { mode: streamStartMode }),
} as StreamStartEvent);
}

/**
* Processes a stream with guaranteed cleanup, regardless of success or failure
*/
Expand All @@ -1042,18 +1065,7 @@ export class StreamManager extends EventEmitter {
streamInfo.state = StreamState.STREAMING;

// Emit stream start event (include mode from initialMetadata if available)
// Validate mode - stats schema only accepts "plan" | "exec" for now
const rawMode = streamInfo.initialMetadata?.mode;
const streamStartMode = rawMode === "plan" || rawMode === "exec" ? rawMode : undefined;
this.emit("stream-start", {
type: "stream-start",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
model: streamInfo.model,
historySequence,
startTime: streamInfo.startTime,
...(streamStartMode && { mode: streamStartMode }),
} as StreamStartEvent);
this.emitStreamStart(workspaceId, streamInfo, historySequence);

// Initialize token tracker for this model
await this.tokenTracker.setModel(streamInfo.model);
Expand Down Expand Up @@ -1573,7 +1585,18 @@ export class StreamManager extends EventEmitter {
errorType = "authentication";
}

// Write error metadata to partial.json for persistence across reloads
await this.persistErrorState(workspaceId, streamInfo, errorMessage, errorType);
}

/**
* Write error metadata to partial.json and emit the corresponding error event.
*/
private async persistErrorState(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
errorMessage: string,
errorType: StreamErrorType
): Promise<void> {
const errorPartialMessage: MuxMessage = {
id: streamInfo.messageId,
role: "assistant",
Expand All @@ -1588,22 +1611,24 @@ export class StreamManager extends EventEmitter {
},
parts: streamInfo.parts,
};
// Wait for any in-flight partial write to complete before writing error state

// Wait for any in-flight partial write to complete before writing error state.
// This prevents race conditions where the error write and a throttled flush
// write at the same time, causing inconsistent partial.json state
// write at the same time, causing inconsistent partial.json state.
if (streamInfo.partialWritePromise) {
await streamInfo.partialWritePromise;
}
// Write error state to disk - await to ensure consistent state before any resume

// Write error state to disk - await to ensure consistent state before any resume.
await this.partialService.writePartial(workspaceId as string, errorPartialMessage);

// Emit error event
// Emit error event.
this.emit("error", {
type: "error",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
error: errorMessage,
errorType: errorType,
errorType,
} as ErrorEvent);
}

Expand Down Expand Up @@ -2283,19 +2308,7 @@ export class StreamManager extends EventEmitter {
await this.tokenTracker.setModel(streamInfo.model);

// Emit stream-start event (include mode from initialMetadata if available)
// Validate mode - stats schema only accepts "plan" | "exec" for now
const rawReplayMode = streamInfo.initialMetadata?.mode;
const replayMode =
rawReplayMode === "plan" || rawReplayMode === "exec" ? rawReplayMode : undefined;
this.emit("stream-start", {
type: "stream-start",
workspaceId,
messageId: streamInfo.messageId,
model: streamInfo.model,
historySequence: streamInfo.historySequence,
startTime: streamInfo.startTime,
...(replayMode && { mode: replayMode }),
});
this.emitStreamStart(typedWorkspaceId, streamInfo, streamInfo.historySequence);

// Replay accumulated parts as events using shared emission logic.
// IMPORTANT: Snapshot the parts array up-front.
Expand Down Expand Up @@ -2347,34 +2360,7 @@ export class StreamManager extends EventEmitter {
};

// Write error state to partial.json (same as real error handling)
// Wait for any in-flight partial write to complete first
if (streamInfo.partialWritePromise) {
await streamInfo.partialWritePromise;
}
const errorPartialMessage: MuxMessage = {
id: streamInfo.messageId,
role: "assistant",
metadata: {
historySequence: streamInfo.historySequence,
timestamp: streamInfo.startTime,
model: streamInfo.model,
partial: true,
error: errorMessage,
errorType: "network", // Test errors are network-like
...streamInfo.initialMetadata,
},
parts: streamInfo.parts,
};
await this.partialService.writePartial(workspaceId, errorPartialMessage);

// Emit error event (same as real error handling)
this.emit("error", {
type: "error",
workspaceId,
messageId: streamInfo.messageId,
error: errorMessage,
errorType: "network",
} as ErrorEvent);
await this.persistErrorState(typedWorkspaceId, streamInfo, errorMessage, "network");

// Wait for the stream processing to complete (cleanup)
await streamInfo.processingPromise;
Expand Down
Loading