Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 63 additions & 4 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,12 @@ export class RuntimeManager {
responseType: 'stream',
}
)
.catch(this.httpErrorHandler);
.catch((err) =>
this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);
let genkitVersion: string;
if (response.headers['x-genkit-version']) {
genkitVersion = response.headers['x-genkit-version'];
Expand Down Expand Up @@ -302,7 +307,10 @@ export class RuntimeManager {
responseType: 'stream', // Use stream to get early headers
})
.catch((err) =>
this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);

const traceId = response.headers['x-genkit-trace-id'];
Expand Down Expand Up @@ -479,7 +487,7 @@ export class RuntimeManager {
responseType: 'stream',
})
.catch((err) =>
this.httpErrorHandler(
this.handleStreamError(
err,
`Error streaming trace for traceId='${traceId}'`
)
Expand Down Expand Up @@ -735,7 +743,7 @@ export class RuntimeManager {
/**
* Handles an HTTP error.
*/
private httpErrorHandler(error: AxiosError, message?: string): any {
private httpErrorHandler(error: AxiosError, message?: string): never {
const newError = new GenkitToolsError(message || 'Internal Error');

if (error.response) {
Expand All @@ -753,6 +761,57 @@ export class RuntimeManager {
});
}

/**
* Handles a stream error by reading the stream and then calling httpErrorHandler.
*/
private async handleStreamError(
error: AxiosError,
message: string
): Promise<never> {
if (
error.response &&
error.config?.responseType === 'stream' &&
(error.response.data as any).on
) {
try {
const body = await this.streamToString(error.response.data);
try {
error.response.data = JSON.parse(body);
} catch (e) {
error.response.data = {
message: body || 'Unknown error',
};
}
} catch (e) {
// If stream reading fails, we must replace the stream object with a safe error object
// to prevent circular structure errors during JSON serialization.
error.response.data = {
message: 'Failed to read error response stream',
details: String(e),
};
}
}
this.httpErrorHandler(error, message);
}

/**
* Helper to convert a stream to string.
*/
private streamToString(stream: any): Promise<string> {
return new Promise((resolve, reject) => {
let buffer = '';
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
});
stream.on('end', () => {
resolve(buffer);
});
stream.on('error', (err: Error) => {
reject(err);
});
});
}
Comment on lines +800 to +813
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of streamToString can lead to corruption of multi-byte characters if they are split across data chunks. Calling chunk.toString() on partial character data will result in replacement characters (``).

To fix this, you should collect all Buffer chunks in an array, concatenate them into a single Buffer when the stream ends, and then convert that final buffer to a string. This ensures that multi-byte characters are decoded correctly.

Suggested change
private streamToString(stream: any): Promise<string> {
return new Promise((resolve, reject) => {
let buffer = '';
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
});
stream.on('end', () => {
resolve(buffer);
});
stream.on('error', (err: Error) => {
reject(err);
});
});
}
private streamToString(stream: any): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
stream.on('end', () => {
resolve(Buffer.concat(chunks).toString());
});
stream.on('error', (err: Error) => {
reject(err);
});
});
}


/**
* Performs health checks on all runtimes.
*/
Expand Down
Loading