Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ export class UnauthenticatedError extends EvalProtocolError {
}
}

type EvalProtocolErrorConstructor = new (message?: string) => EvalProtocolError;

// Mapping from status codes to exception classes
const STATUS_CODE_TO_EXCEPTION = new Map<StatusCode, typeof EvalProtocolError | null>([
const STATUS_CODE_TO_EXCEPTION = new Map<StatusCode, EvalProtocolErrorConstructor | null>([
[StatusCode.OK, null],
[StatusCode.CANCELLED, CancelledError],
[StatusCode.UNKNOWN, UnknownError],
Expand Down Expand Up @@ -148,7 +150,7 @@ export function exceptionForStatusCode(code: StatusCode, message: string = ''):
if (!exceptionClass) {
return null;
}
return new exceptionClass(message, code);
return new exceptionClass(message);
}

/**
Expand Down
5 changes: 3 additions & 2 deletions tests/test_ep_upload_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async def test_quickstart_eval(row: EvaluationRow) -> EvaluationRow:

test_project_dir, test_file_path = create_test_project_with_evaluation_test(
test_content,
"quickstart.py", # Non test_* filename
"ep_upload_non_test_prefixed_eval.py", # Non test_* filename
)

original_cwd = os.getcwd()
Expand All @@ -423,7 +423,8 @@ async def test_quickstart_eval(row: EvaluationRow) -> EvaluationRow:

assert len(discovered_tests) == 1
assert "test_quickstart_eval" in discovered_tests[0].qualname
assert "quickstart.py" in discovered_tests[0].file_path
# Verify we discovered a non-test-prefixed file (our unique filename)
assert "ep_upload_non_test_prefixed_eval.py" in discovered_tests[0].file_path

finally:
os.chdir(original_cwd)
Expand Down
119 changes: 6 additions & 113 deletions typescript/index.ts
Original file line number Diff line number Diff line change
@@ -1,113 +1,6 @@
import z from "zod";
import type { ChatCompletionCreateParamsNonStreaming } from "openai/resources/chat/completions/completions";

// Zod schemas for validation
const roleSchema = z.enum(["system", "user", "assistant"]);
const messageSchema = z.union([
z.object({
role: roleSchema,
content: z.string(),
}),
z.object({
role: z.literal("tool"),
content: z.string(),
tool_call_id: z.string(),
}),
]);

const functionDefinitionSchema = z
.object({
name: z.string().regex(/^[a-zA-Z0-9_-]{1,64}$/),
description: z.string().optional(),
// JSON Schema object; allow arbitrary keys
parameters: z.object({}).loose().optional(),
})
.loose();

const toolSchema = z.object({
type: z.literal("function"),
function: functionDefinitionSchema,
});

const metadataSchema = z
.object({
invocation_id: z.string(),
experiment_id: z.string(),
rollout_id: z.string(),
run_id: z.string(),
row_id: z.string(),
})
.loose();

export const initRequestSchema = z.object({
completion_params: z.record(z.string(), z.any()).describe("Completion parameters including model and optional model_kwargs, temperature, etc."),
messages: z.array(messageSchema).optional(),
tools: z.array(toolSchema).optional().nullable(),
metadata: metadataSchema,
model_base_url: z.string().optional().nullable(),
});

export const statusInfoSchema = z.record(z.string(), z.any());

export const statusResponseSchema = z.object({
terminated: z.boolean(),
info: statusInfoSchema.optional(),
});

// Infer types from schemas
export type Message = z.infer<typeof messageSchema>;
export type FunctionDefinition = z.infer<typeof functionDefinitionSchema>;
export type Tool = z.infer<typeof toolSchema>;
export type Metadata = z.infer<typeof metadataSchema>;
export type InitRequest = z.infer<typeof initRequestSchema>;
export type StatusInfo = z.infer<typeof statusInfoSchema>;
export type StatusResponse = z.infer<typeof statusResponseSchema>;

export function initRequestToCompletionParams(
initRequest: InitRequest
): ChatCompletionCreateParamsNonStreaming {
const model = initRequest.completion_params?.['model'];
if (!model) {
throw new Error("model is required in completion_params");
}

const toolsToOpenAI = initRequest.tools?.map((tool) => ({
type: "function" as const,
function: tool.function.description
? {
name: tool.function.name,
description: tool.function.description,
parameters: tool.function.parameters || {},
}
: {
name: tool.function.name,
parameters: tool.function.parameters || {},
},
}));

if (!initRequest.messages) {
throw new Error("messages is required");
}

// Spread completion_params directly (model, temperature, max_tokens, etc.)
const { model: _, ...otherParams } = initRequest.completion_params || {};

const completionParams: ChatCompletionCreateParamsNonStreaming = {
model: model,
messages: initRequest.messages,
...(toolsToOpenAI && { tools: toolsToOpenAI }),
...otherParams // Spreads temperature, max_tokens, etc.
};

return completionParams;
}

export function createLangfuseConfigTags(initRequest: InitRequest): string[] {
return [
`invocation_id:${initRequest.metadata.invocation_id}`,
`experiment_id:${initRequest.metadata.experiment_id}`,
`rollout_id:${initRequest.metadata.rollout_id}`,
`run_id:${initRequest.metadata.run_id}`,
`row_id:${initRequest.metadata.row_id}`,
];
}
export * from "./models/types.js";
export * from "./models/status.js";
export * from "./models/exceptions.js";
export * from "./logging/fireworks-transport.js";
export * from "./logging/logger.js";
export * from "./logging/fireworks-vercel.js";
223 changes: 223 additions & 0 deletions typescript/logging/fireworks-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/**
* Winston transport that sends logs to Fireworks tracing gateway.
*/

import Transport from 'winston-transport';
import type { TransformableInfo } from 'logform';
const LEVEL = Symbol.for('level');

interface FireworksLogInfo extends TransformableInfo {
rollout_id?: string;
experiment_id?: string;
run_id?: string;
rollout_ids?: string[];
status?: any;
program?: string;
logger_name?: string;
[key: string]: any;
}

interface StatusInfo {
code?: number;
message?: string;
details?: any[];
}

interface FireworksPayload {
program: string;
status?: StatusInfo | null;
message: string;
tags: string[];
extras: {
logger_name: string;
level: string;
timestamp: string;
};
}

export class FireworksTransport extends Transport {
private gatewayBaseUrl: string;
private rolloutIdEnv: string;
private apiKey?: string;
private waitUntil?: (promise: Promise<any>) => void;

constructor(opts: {
gatewayBaseUrl?: string;
rolloutIdEnv?: string;
waitUntil?: (promise: Promise<any>) => void;
} = {}) {
super();

this.gatewayBaseUrl =
opts.gatewayBaseUrl ||
process.env.FW_TRACING_GATEWAY_BASE_URL ||
'https://tracing.fireworks.ai';

this.rolloutIdEnv = opts.rolloutIdEnv || 'EP_ROLLOUT_ID';
this.apiKey = process.env.FIREWORKS_API_KEY;
this.waitUntil = opts.waitUntil;
}

log(info: FireworksLogInfo, callback: () => void) {
setImmediate(() => {
this.emit('logged', info);
});

const sendPromise = this.sendToFireworks(info).catch((error) => {
this.emit('error', error);
});

// Use waitUntil for ALL logs when available so Fireworks logging
// can complete even after the HTTP response is sent.
if (this.waitUntil) {
this.waitUntil(sendPromise);
}

callback();
}

private async sendToFireworks(info: FireworksLogInfo): Promise<void> {
if (!this.gatewayBaseUrl) {
return;
}

const rolloutId = this.getRolloutId(info);
if (!rolloutId) {
return;
}

const payload = this.buildPayload(info, rolloutId);
const baseUrl = this.gatewayBaseUrl.replace(/\/$/, '');
const url = `${baseUrl}/logs`;

// Debug logging
if (process.env.EP_DEBUG === 'true') {
const tagsLen = Array.isArray(payload.tags) ? payload.tags.length : 0;
const msgPreview = typeof payload.message === 'string'
? payload.message.substring(0, 80)
: payload.message;
const payloadSize = JSON.stringify(payload).length;
const hasStatus = !!payload.status;
console.log(`[FW_LOG] POST ${url} rollout_id=${rolloutId} tags=${tagsLen} msg=${msgPreview} size=${payloadSize} hasStatus=${hasStatus}`);
}

try {
const headers: HeadersInit = {
'Content-Type': 'application/json',
'User-Agent': 'winston-fireworks-transport/1.0.0',
};

if (this.apiKey) {
headers['Authorization'] = `Bearer ${this.apiKey}`;
}

const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(payload),
// No timeout signal for compatibility
});

if (process.env.EP_DEBUG === 'true') {
console.log(`[FW_LOG] resp=${response.status} for rollout_id=${rolloutId}`);
}

// Fallback to /v1/logs if /logs is not found
if (response.status === 404) {
const altUrl = `${baseUrl}/v1/logs`;

if (process.env.EP_DEBUG === 'true') {
const tagsLen = Array.isArray(payload.tags) ? payload.tags.length : 0;
console.log(`[FW_LOG] RETRY POST ${altUrl} rollout_id=${rolloutId} tags=${tagsLen}`);
}

const retryResponse = await fetch(altUrl, {
method: 'POST',
headers,
body: JSON.stringify(payload),
// No timeout signal for compatibility
});

if (process.env.EP_DEBUG === 'true') {
console.log(`[FW_LOG] retry resp=${retryResponse.status}`);
}
}

} catch (error: any) {
// Silently handle errors - logging should not break the application
if (process.env.EP_DEBUG === 'true') {
console.error(`[FW_LOG] Error sending to Fireworks:`, error.message);
console.error(`[FW_LOG] Payload was:`, JSON.stringify(payload, null, 2));
}
}
}

private getRolloutId(info: FireworksLogInfo): string | null {
// Check if rollout_id is in the log info
if (info.rollout_id && typeof info.rollout_id === 'string') {
return info.rollout_id;
}

// Fallback to environment variable
return process.env[this.rolloutIdEnv] || null;
}

private getStatusInfo(info: FireworksLogInfo): StatusInfo | null {
if (!info.status) {
return null;
}

const status = info.status;

// Handle Status class instances (with code and message properties)
if (typeof status === 'object' && status !== null && 'code' in status && 'message' in status) {
return {
code: typeof status.code === 'number' ? status.code : undefined,
message: typeof status.message === 'string' ? status.message : undefined,
details: Array.isArray(status.details) ? status.details : [],
};
}

return null;
}

private buildPayload(info: FireworksLogInfo, rolloutId: string): FireworksPayload {
const timestamp = new Date().toISOString();
// Ensure message is always a string for Fireworks payload
const message: string = typeof info.message === 'string' ? info.message : '';
const level = (info as any)[LEVEL] || info.level || 'info';

const tags: string[] = [`rollout_id:${rolloutId}`];

// Optional additional tags
if (info.experiment_id && typeof info.experiment_id === 'string') {
tags.push(`experiment_id:${info.experiment_id}`);
}
if (info.run_id && typeof info.run_id === 'string') {
tags.push(`run_id:${info.run_id}`);
}

// Groupwise list of rollout_ids
if (Array.isArray(info.rollout_ids)) {
for (const rid of info.rollout_ids) {
if (typeof rid === 'string') {
tags.push(`rollout_id:${rid}`);
}
}
}

const program = (typeof info.program === 'string' ? info.program : null) || 'eval_protocol';

return {
program,
status: this.getStatusInfo(info),
message,
tags,
extras: {
logger_name: info.logger_name || 'winston',
level: level.toUpperCase(),
timestamp,
},
};
}
}
Loading
Loading