diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9820f9dcbf..0d6bc17d87 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2050,6 +2050,25 @@ importers: specifier: 'catalog:' version: 0.105.0 + tegg/core/agent-runtime: + dependencies: + '@eggjs/controller-decorator': + specifier: workspace:* + version: link:../controller-decorator + '@eggjs/tegg-runtime': + specifier: workspace:* + version: link:../runtime + '@eggjs/tegg-types': + specifier: workspace:* + version: link:../types + devDependencies: + '@types/node': + specifier: 'catalog:' + version: 24.10.2 + typescript: + specifier: 'catalog:' + version: 5.9.3 + tegg/core/ajv-decorator: dependencies: ajv: @@ -2679,6 +2698,9 @@ importers: tegg/core/tegg: dependencies: + '@eggjs/agent-runtime': + specifier: workspace:* + version: link:../agent-runtime '@eggjs/ajv-decorator': specifier: workspace:* version: link:../ajv-decorator @@ -2954,6 +2976,9 @@ importers: tegg/plugin/controller: dependencies: + '@eggjs/agent-runtime': + specifier: workspace:* + version: link:../../core/agent-runtime '@eggjs/controller-decorator': specifier: workspace:* version: link:../../core/controller-decorator diff --git a/tegg/core/agent-runtime/package.json b/tegg/core/agent-runtime/package.json new file mode 100644 index 0000000000..8aad07ebd5 --- /dev/null +++ b/tegg/core/agent-runtime/package.json @@ -0,0 +1,56 @@ +{ + "name": "@eggjs/agent-runtime", + "version": "4.0.2-beta.1", + "description": "Smart default runtime for @AgentController in tegg", + "keywords": [ + "agent", + "egg", + "tegg", + "typescript" + ], + "homepage": "https://github.com/eggjs/egg/tree/next/tegg/core/agent-runtime", + "bugs": { + "url": "https://github.com/eggjs/egg/issues" + }, + "license": "MIT", + "author": "killagu ", + "repository": { + "type": "git", + "url": "git+https://github.com/eggjs/egg.git", + "directory": "tegg/core/agent-runtime" + }, + "files": [ + "dist" + ], + "type": "module", + "main": "./dist/index.js", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": "./src/index.ts", + "./package.json": "./package.json" + }, + "publishConfig": { + "access": "public", + "exports": { + ".": "./dist/index.js", + "./package.json": "./package.json" + } + }, + "scripts": { + "typecheck": "tsgo --noEmit", + "test": "vitest run" + }, + "dependencies": { + "@eggjs/controller-decorator": "workspace:*", + "@eggjs/tegg-runtime": "workspace:*", + "@eggjs/tegg-types": "workspace:*" + }, + "devDependencies": { + "@types/node": "catalog:", + "typescript": "catalog:" + }, + "engines": { + "node": ">=22.18.0" + } +} diff --git a/tegg/core/agent-runtime/src/AgentRuntime.ts b/tegg/core/agent-runtime/src/AgentRuntime.ts new file mode 100644 index 0000000000..8d37b36542 --- /dev/null +++ b/tegg/core/agent-runtime/src/AgentRuntime.ts @@ -0,0 +1,371 @@ +import type { + CreateRunInput, + ThreadObject, + ThreadObjectWithMessages, + RunObject, + MessageObject, + MessageDeltaObject, + MessageContentBlock, + AgentStreamMessage, +} from '@eggjs/controller-decorator'; +import { RunStatus, AgentSSEEvent, AgentObjectType, MessageRole, MessageStatus } from '@eggjs/controller-decorator'; + +import type { AgentStore } from './AgentStore.ts'; +import { AgentConflictError } from './errors.ts'; +import { toContentBlocks, extractFromStreamMessages, toInputMessageObjects } from './MessageConverter.ts'; +import { RunBuilder } from './RunBuilder.ts'; +import type { RunUsage } from './RunBuilder.ts'; +import type { SSEWriter } from './SSEWriter.ts'; +import { nowUnix, newMsgId } from './utils.ts'; + +export const AGENT_RUNTIME: unique symbol = Symbol('agentRuntime'); + +/** + * The host interface — only requires execRun so the runtime can delegate + * execution back through the controller's prototype chain (AOP/mock friendly). + */ +export interface AgentControllerHost { + execRun(input: CreateRunInput, signal?: AbortSignal): AsyncGenerator; +} + +export interface AgentRuntimeLogger { + error(...args: unknown[]): void; +} + +export interface AgentRuntimeOptions { + host: AgentControllerHost; + store: AgentStore; + logger: AgentRuntimeLogger; +} + +export class AgentRuntime { + private static readonly TERMINAL_RUN_STATUSES = new Set([ + RunStatus.Completed, + RunStatus.Failed, + RunStatus.Cancelled, + RunStatus.Expired, + ]); + + private store: AgentStore; + private runningTasks: Map; abortController: AbortController }>; + private host: AgentControllerHost; + private logger: AgentRuntimeLogger; + + constructor(options: AgentRuntimeOptions) { + this.host = options.host; + this.store = options.store; + if (!options.logger) { + throw new Error('AgentRuntimeOptions.logger is required'); + } + this.logger = options.logger; + this.runningTasks = new Map(); + } + + async createThread(): Promise { + const thread = await this.store.createThread(); + return { + id: thread.id, + object: AgentObjectType.Thread, + created_at: thread.created_at, + metadata: thread.metadata ?? {}, + }; + } + + async getThread(threadId: string): Promise { + const thread = await this.store.getThread(threadId); + return { + id: thread.id, + object: AgentObjectType.Thread, + created_at: thread.created_at, + metadata: thread.metadata ?? {}, + messages: thread.messages, + }; + } + + async syncRun(input: CreateRunInput): Promise { + let threadId = input.thread_id; + if (!threadId) { + const thread = await this.store.createThread(); + threadId = thread.id; + input = { ...input, thread_id: threadId }; + } + + const run = await this.store.createRun(input.input.messages, threadId, input.config, input.metadata); + const rb = RunBuilder.create(run, threadId); + + try { + await this.store.updateRun(run.id, rb.start()); + + const streamMessages: AgentStreamMessage[] = []; + for await (const msg of this.host.execRun(input)) { + streamMessages.push(msg); + } + const { output, usage } = extractFromStreamMessages(streamMessages, run.id); + + await this.store.updateRun(run.id, rb.complete(output, usage)); + + await this.store.appendMessages(threadId, [...toInputMessageObjects(input.input.messages, threadId), ...output]); + + return rb.snapshot(); + } catch (err: unknown) { + await this.store.updateRun(run.id, rb.fail(err as Error)); + throw err; + } + } + + async asyncRun(input: CreateRunInput): Promise { + let threadId = input.thread_id; + if (!threadId) { + const thread = await this.store.createThread(); + threadId = thread.id; + input = { ...input, thread_id: threadId }; + } + + const run = await this.store.createRun(input.input.messages, threadId, input.config, input.metadata); + const rb = RunBuilder.create(run, threadId); + + const abortController = new AbortController(); + + // Capture queued snapshot before background task mutates state + const queuedSnapshot = rb.snapshot(); + + const promise = (async () => { + try { + await this.store.updateRun(run.id, rb.start()); + + const streamMessages: AgentStreamMessage[] = []; + for await (const msg of this.host.execRun(input, abortController.signal)) { + if (abortController.signal.aborted) break; + streamMessages.push(msg); + } + + if (abortController.signal.aborted) return; + + const { output, usage } = extractFromStreamMessages(streamMessages, run.id); + + await this.store.updateRun(run.id, rb.complete(output, usage)); + + await this.store.appendMessages(threadId!, [ + ...toInputMessageObjects(input.input.messages, threadId), + ...output, + ]); + } catch (err: unknown) { + if (!abortController.signal.aborted) { + try { + await this.store.updateRun(run.id, rb.fail(err as Error)); + } catch (storeErr) { + this.logger.error('[AgentController] failed to update run status after error:', storeErr); + } + } else { + this.logger.error('[AgentController] execRun error during abort:', err); + } + } finally { + this.runningTasks.delete(run.id); + } + })(); + + this.runningTasks.set(run.id, { promise, abortController }); + + return queuedSnapshot; + } + + async streamRun(input: CreateRunInput, writer: SSEWriter): Promise { + // Abort execRun generator when client disconnects + const abortController = new AbortController(); + writer.onClose(() => abortController.abort()); + + let threadId = input.thread_id; + if (!threadId) { + const thread = await this.store.createThread(); + threadId = thread.id; + input = { ...input, thread_id: threadId }; + } + + const run = await this.store.createRun(input.input.messages, threadId, input.config, input.metadata); + const rb = RunBuilder.create(run, threadId); + + // event: thread.run.created + writer.writeEvent(AgentSSEEvent.ThreadRunCreated, rb.snapshot()); + + // event: thread.run.in_progress + await this.store.updateRun(run.id, rb.start()); + writer.writeEvent(AgentSSEEvent.ThreadRunInProgress, rb.snapshot()); + + const msgId = newMsgId(); + + // event: thread.message.created + const msgObj: MessageObject = { + id: msgId, + object: AgentObjectType.ThreadMessage, + created_at: nowUnix(), + run_id: run.id, + role: MessageRole.Assistant, + status: MessageStatus.InProgress, + content: [], + }; + writer.writeEvent(AgentSSEEvent.ThreadMessageCreated, msgObj); + + try { + const { content, usage, aborted } = await this.consumeStreamMessages( + input, + abortController.signal, + writer, + msgId, + ); + + if (aborted) { + try { + await this.store.updateRun(run.id, rb.cancel()); + } catch { + // Ignore store update failure during abort + } + if (!writer.closed) { + writer.writeEvent(AgentSSEEvent.ThreadRunCancelled, rb.snapshot()); + } + return; + } + + // event: thread.message.completed + msgObj.status = MessageStatus.Completed; + msgObj.content = content; + writer.writeEvent(AgentSSEEvent.ThreadMessageCompleted, msgObj); + + // Persist and emit completion + const output: MessageObject[] = content.length > 0 ? [msgObj] : []; + await this.store.updateRun(run.id, rb.complete(output, usage)); + await this.store.appendMessages(threadId!, [...toInputMessageObjects(input.input.messages, threadId), ...output]); + + // event: thread.run.completed + writer.writeEvent(AgentSSEEvent.ThreadRunCompleted, rb.snapshot()); + } catch (err: unknown) { + try { + await this.store.updateRun(run.id, rb.fail(err as Error)); + } catch (storeErr) { + this.logger.error('[AgentController] failed to update run status after error:', storeErr); + } + + // event: thread.run.failed + if (!writer.closed) { + writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot()); + } + } finally { + // event: done + if (!writer.closed) { + writer.writeEvent(AgentSSEEvent.Done, '[DONE]'); + writer.end(); + } + } + } + + /** + * Consume the execRun async generator, emitting SSE message.delta events + * for each chunk and accumulating content blocks and token usage. + */ + private async consumeStreamMessages( + input: CreateRunInput, + signal: AbortSignal, + writer: SSEWriter, + msgId: string, + ): Promise<{ content: MessageContentBlock[]; usage?: RunUsage; aborted: boolean }> { + const content: MessageContentBlock[] = []; + let promptTokens = 0; + let completionTokens = 0; + let hasUsage = false; + + for await (const msg of this.host.execRun(input, signal)) { + if (signal.aborted) break; + if (msg.message) { + const contentBlocks = toContentBlocks(msg.message); + content.push(...contentBlocks); + + // event: thread.message.delta + const delta: MessageDeltaObject = { + id: msgId, + object: AgentObjectType.ThreadMessageDelta, + delta: { content: contentBlocks }, + }; + writer.writeEvent(AgentSSEEvent.ThreadMessageDelta, delta); + } + if (msg.usage) { + hasUsage = true; + promptTokens += msg.usage.prompt_tokens ?? 0; + completionTokens += msg.usage.completion_tokens ?? 0; + } + } + + return { + content, + usage: hasUsage ? { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens } : undefined, + aborted: signal.aborted, + }; + } + + async getRun(runId: string): Promise { + const run = await this.store.getRun(runId); + return { + id: run.id, + object: AgentObjectType.ThreadRun, + created_at: run.created_at, + thread_id: run.thread_id, + status: run.status, + last_error: run.last_error, + started_at: run.started_at, + completed_at: run.completed_at, + cancelled_at: run.cancelled_at, + failed_at: run.failed_at, + usage: run.usage, + output: run.output, + config: run.config, + metadata: run.metadata, + }; + } + + async cancelRun(runId: string): Promise { + // Abort running task first to prevent it from writing completed status + const task = this.runningTasks.get(runId); + if (task) { + task.abortController.abort(); + // Wait for the background task to finish so it won't race with our update + await task.promise.catch(() => { + /* ignore */ + }); + } + + // Re-read run status after background task has settled + const run = await this.store.getRun(runId); + if (AgentRuntime.TERMINAL_RUN_STATUSES.has(run.status)) { + throw new AgentConflictError(`Cannot cancel run with status '${run.status}'`); + } + + const rb = RunBuilder.create(run, run.thread_id ?? ''); + await this.store.updateRun(runId, rb.cancel()); + + return rb.snapshot(); + } + + /** Wait for all in-flight background tasks to complete naturally (without aborting). */ + async waitForPendingTasks(): Promise { + if (this.runningTasks.size) { + const pending = Array.from(this.runningTasks.values()).map((t) => t.promise); + await Promise.allSettled(pending); + } + } + + async destroy(): Promise { + // Abort all in-flight background tasks, then wait for them to settle + for (const task of this.runningTasks.values()) { + task.abortController.abort(); + } + await this.waitForPendingTasks(); + + // Destroy store + if (this.store.destroy) { + await this.store.destroy(); + } + } +} + +/** Factory function — avoids the spread-arg type issue with dynamic delegation. */ +export function createAgentRuntime(options: AgentRuntimeOptions): AgentRuntime { + return new AgentRuntime(options); +} diff --git a/tegg/core/agent-runtime/src/AgentStore.ts b/tegg/core/agent-runtime/src/AgentStore.ts new file mode 100644 index 0000000000..d30c28023b --- /dev/null +++ b/tegg/core/agent-runtime/src/AgentStore.ts @@ -0,0 +1,49 @@ +import type { + InputMessage, + MessageObject, + AgentRunConfig, + RunStatus, + AgentObjectType, +} from '@eggjs/controller-decorator'; + +export interface ThreadRecord { + id: string; + object: typeof AgentObjectType.Thread; + messages: MessageObject[]; + metadata: Record; + created_at: number; // Unix seconds +} + +export interface RunRecord { + id: string; + object: typeof AgentObjectType.ThreadRun; + thread_id?: string; + status: RunStatus; + input: InputMessage[]; + output?: MessageObject[]; + last_error?: { code: string; message: string } | null; + usage?: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | null; + config?: AgentRunConfig; + metadata?: Record; + created_at: number; + started_at?: number | null; + completed_at?: number | null; + cancelled_at?: number | null; + failed_at?: number | null; +} + +export interface AgentStore { + init?(): Promise; + destroy?(): Promise; + createThread(metadata?: Record): Promise; + getThread(threadId: string): Promise; + appendMessages(threadId: string, messages: MessageObject[]): Promise; + createRun( + input: InputMessage[], + threadId?: string, + config?: AgentRunConfig, + metadata?: Record, + ): Promise; + getRun(runId: string): Promise; + updateRun(runId: string, updates: Partial): Promise; +} diff --git a/tegg/core/agent-runtime/src/FileAgentStore.ts b/tegg/core/agent-runtime/src/FileAgentStore.ts new file mode 100644 index 0000000000..0e6bb5b522 --- /dev/null +++ b/tegg/core/agent-runtime/src/FileAgentStore.ts @@ -0,0 +1,130 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import type { InputMessage, MessageObject, AgentRunConfig } from '@eggjs/controller-decorator'; +import { RunStatus, AgentObjectType } from '@eggjs/controller-decorator'; + +import type { AgentStore, ThreadRecord, RunRecord } from './AgentStore.ts'; +import { AgentNotFoundError } from './errors.ts'; +import { nowUnix } from './utils.ts'; + +export interface FileAgentStoreOptions { + dataDir: string; +} + +export class FileAgentStore implements AgentStore { + private readonly dataDir: string; + private readonly threadsDir: string; + private readonly runsDir: string; + + constructor(options: FileAgentStoreOptions) { + this.dataDir = options.dataDir; + this.threadsDir = path.join(this.dataDir, 'threads'); + this.runsDir = path.join(this.dataDir, 'runs'); + } + + private safePath(baseDir: string, id: string): string { + if (!id) { + throw new Error('Invalid id: id must not be empty'); + } + const filePath = path.join(baseDir, `${id}.json`); + if (!filePath.startsWith(baseDir + path.sep)) { + throw new Error(`Invalid id: ${id}`); + } + return filePath; + } + + async init(): Promise { + await fs.mkdir(this.threadsDir, { recursive: true }); + await fs.mkdir(this.runsDir, { recursive: true }); + } + + async createThread(metadata?: Record): Promise { + const threadId = `thread_${crypto.randomUUID()}`; + const record: ThreadRecord = { + id: threadId, + object: AgentObjectType.Thread, + messages: [], + metadata: metadata ?? {}, + created_at: nowUnix(), + }; + await this.writeFile(this.safePath(this.threadsDir, threadId), record); + return record; + } + + async getThread(threadId: string): Promise { + const filePath = this.safePath(this.threadsDir, threadId); + const data = await this.readFile(filePath); + if (!data) { + throw new AgentNotFoundError(`Thread ${threadId} not found`); + } + return data as ThreadRecord; + } + + // Note: read-modify-write without locking. In cluster mode with multiple workers + // sharing the same dataDir, concurrent operations on the same thread may lose data. + async appendMessages(threadId: string, messages: MessageObject[]): Promise { + const thread = await this.getThread(threadId); + thread.messages.push(...messages); + await this.writeFile(this.safePath(this.threadsDir, threadId), thread); + } + + async createRun( + input: InputMessage[], + threadId?: string, + config?: AgentRunConfig, + metadata?: Record, + ): Promise { + const runId = `run_${crypto.randomUUID()}`; + const record: RunRecord = { + id: runId, + object: AgentObjectType.ThreadRun, + thread_id: threadId, + status: RunStatus.Queued, + input, + config, + metadata, + created_at: nowUnix(), + }; + await this.writeFile(this.safePath(this.runsDir, runId), record); + return record; + } + + async getRun(runId: string): Promise { + const filePath = this.safePath(this.runsDir, runId); + const data = await this.readFile(filePath); + if (!data) { + throw new AgentNotFoundError(`Run ${runId} not found`); + } + return data as RunRecord; + } + + async updateRun(runId: string, updates: Partial): Promise { + const run = await this.getRun(runId); + const { id: _, object: __, ...safeUpdates } = updates; + Object.assign(run, safeUpdates); + await this.writeFile(this.safePath(this.runsDir, runId), run); + } + + private async writeFile(filePath: string, data: unknown): Promise { + // Write to a temp file first, then atomically rename to avoid data corruption + // if the process crashes mid-write. + const tmpPath = filePath + '.tmp'; + await fs.writeFile(tmpPath, JSON.stringify(data), 'utf-8'); + await fs.rename(tmpPath, filePath); + } + + private async readFile(filePath: string): Promise { + let content: string; + try { + content = await fs.readFile(filePath, 'utf-8'); + } catch (err: unknown) { + if ((err as NodeJS.ErrnoException).code === 'ENOENT') { + return null; + } + throw err; + } + return JSON.parse(content); + } +} diff --git a/tegg/core/agent-runtime/src/MessageConverter.ts b/tegg/core/agent-runtime/src/MessageConverter.ts new file mode 100644 index 0000000000..f9804c8d2c --- /dev/null +++ b/tegg/core/agent-runtime/src/MessageConverter.ts @@ -0,0 +1,108 @@ +import type { + CreateRunInput, + MessageObject, + MessageContentBlock, + AgentStreamMessage, + AgentStreamMessagePayload, +} from '@eggjs/controller-decorator'; +import { AgentObjectType, MessageRole, MessageStatus, ContentBlockType } from '@eggjs/controller-decorator'; + +import type { RunUsage } from './RunBuilder.ts'; +import { nowUnix, newMsgId } from './utils.ts'; + +/** + * Convert an AgentStreamMessage's message payload into OpenAI MessageContentBlock[]. + */ +export function toContentBlocks(msg: AgentStreamMessagePayload): MessageContentBlock[] { + if (!msg) return []; + const content = msg.content; + if (typeof content === 'string') { + return [{ type: ContentBlockType.Text, text: { value: content, annotations: [] } }]; + } + if (Array.isArray(content)) { + return content + .filter((part) => part.type === ContentBlockType.Text) + .map((part) => ({ type: ContentBlockType.Text, text: { value: part.text, annotations: [] } })); + } + return []; +} + +/** + * Build a completed MessageObject from an AgentStreamMessage payload. + */ +export function toMessageObject(msg: AgentStreamMessagePayload, runId?: string): MessageObject { + return { + id: newMsgId(), + object: AgentObjectType.ThreadMessage, + created_at: nowUnix(), + run_id: runId, + role: MessageRole.Assistant, + status: MessageStatus.Completed, + content: toContentBlocks(msg), + }; +} + +/** + * Extract MessageObjects and accumulated usage from AgentStreamMessage objects. + * Returns camelCase `RunUsage` for internal use; callers convert to snake_case at boundaries. + */ +export function extractFromStreamMessages( + messages: AgentStreamMessage[], + runId?: string, +): { + output: MessageObject[]; + usage?: RunUsage; +} { + const output: MessageObject[] = []; + let promptTokens = 0; + let completionTokens = 0; + let hasUsage = false; + + for (const msg of messages) { + if (msg.message) { + output.push(toMessageObject(msg.message, runId)); + } + if (msg.usage) { + hasUsage = true; + promptTokens += msg.usage.prompt_tokens ?? 0; + completionTokens += msg.usage.completion_tokens ?? 0; + } + } + + let usage: RunUsage | undefined; + if (hasUsage) { + usage = { + promptTokens, + completionTokens, + totalTokens: promptTokens + completionTokens, + }; + } + + return { output, usage }; +} + +/** + * Convert input messages to MessageObjects for thread history. + * System messages are filtered out — they are transient instructions, not conversation history. + */ +export function toInputMessageObjects( + messages: CreateRunInput['input']['messages'], + threadId?: string, +): MessageObject[] { + return messages + .filter( + (m): m is typeof m & { role: Exclude } => m.role !== MessageRole.System, + ) + .map((m) => ({ + id: newMsgId(), + object: AgentObjectType.ThreadMessage, + created_at: nowUnix(), + thread_id: threadId, + role: m.role, + status: MessageStatus.Completed, + content: + typeof m.content === 'string' + ? [{ type: ContentBlockType.Text, text: { value: m.content, annotations: [] } }] + : m.content.map((p) => ({ type: ContentBlockType.Text, text: { value: p.text, annotations: [] } })), + })); +} diff --git a/tegg/core/agent-runtime/src/RunBuilder.ts b/tegg/core/agent-runtime/src/RunBuilder.ts new file mode 100644 index 0000000000..944f7e76d9 --- /dev/null +++ b/tegg/core/agent-runtime/src/RunBuilder.ts @@ -0,0 +1,126 @@ +import type { MessageObject, RunObject } from '@eggjs/controller-decorator'; +import { RunStatus, AgentErrorCode, AgentObjectType } from '@eggjs/controller-decorator'; + +import type { RunRecord } from './AgentStore.ts'; +import { nowUnix } from './utils.ts'; + +/** + * Accumulated token usage in camelCase for internal use. + * Converted to snake_case at output boundaries (store / API / SSE). + */ +export interface RunUsage { + promptTokens: number; + completionTokens: number; + totalTokens: number; +} + +/** + * Encapsulates run state transitions using camelCase internally. + * + * Mutation methods (`start`, `complete`, `fail`, `cancel`) update internal + * state and return `Partial` (snake_case) for the store. + * + * `snapshot()` converts the full internal state to a snake_case `RunObject` + * suitable for API responses and SSE events. + */ +export class RunBuilder { + private readonly id: string; + private readonly threadId: string; + private readonly createdAt: number; + private readonly metadata?: Record; + + private status: RunStatus = RunStatus.Queued; + private startedAt?: number; + private completedAt?: number; + private cancelledAt?: number; + private failedAt?: number; + private lastError?: { code: string; message: string } | null; + private usage?: RunUsage; + private output?: MessageObject[]; + + private constructor(id: string, threadId: string, createdAt: number, metadata?: Record) { + this.id = id; + this.threadId = threadId; + this.createdAt = createdAt; + this.metadata = metadata; + } + + /** Create a RunBuilder from a store RunRecord. */ + static create(run: RunRecord, threadId: string): RunBuilder { + return new RunBuilder(run.id, threadId, run.created_at, run.metadata); + } + + /** queued → in_progress. Returns store update (snake_case). */ + start(): Partial { + this.status = RunStatus.InProgress; + this.startedAt = nowUnix(); + return { status: this.status, started_at: this.startedAt }; + } + + /** in_progress → completed. Returns store update (snake_case). */ + complete(output: MessageObject[], usage?: RunUsage): Partial { + this.status = RunStatus.Completed; + this.completedAt = nowUnix(); + this.output = output; + this.usage = usage; + return { + status: this.status, + output, + usage: usage + ? { + prompt_tokens: usage.promptTokens, + completion_tokens: usage.completionTokens, + total_tokens: usage.totalTokens, + } + : undefined, + completed_at: this.completedAt, + }; + } + + /** in_progress → failed. Returns store update (snake_case). */ + fail(error: Error): Partial { + this.status = RunStatus.Failed; + this.failedAt = nowUnix(); + this.lastError = { code: AgentErrorCode.ExecError, message: error.message }; + return { + status: this.status, + last_error: this.lastError, + failed_at: this.failedAt, + }; + } + + /** in_progress/queued → cancelled. Returns store update (snake_case). */ + cancel(): Partial { + this.status = RunStatus.Cancelled; + this.cancelledAt = nowUnix(); + return { + status: this.status, + cancelled_at: this.cancelledAt, + }; + } + + /** Convert internal camelCase state to snake_case RunObject for API / SSE. */ + snapshot(): RunObject { + return { + id: this.id, + object: AgentObjectType.ThreadRun, + created_at: this.createdAt, + thread_id: this.threadId, + status: this.status, + last_error: this.lastError, + started_at: this.startedAt ?? null, + completed_at: this.completedAt ?? null, + cancelled_at: this.cancelledAt ?? null, + failed_at: this.failedAt ?? null, + usage: this.usage + ? { + prompt_tokens: this.usage.promptTokens, + completion_tokens: this.usage.completionTokens, + total_tokens: this.usage.totalTokens, + } + : null, + metadata: this.metadata, + output: this.output, + }; + } +} diff --git a/tegg/core/agent-runtime/src/SSEWriter.ts b/tegg/core/agent-runtime/src/SSEWriter.ts new file mode 100644 index 0000000000..2c3d43ba04 --- /dev/null +++ b/tegg/core/agent-runtime/src/SSEWriter.ts @@ -0,0 +1,51 @@ +import type { ServerResponse } from 'node:http'; + +/** + * Abstract interface for writing SSE events. + * Decouples AgentRuntime from HTTP transport details. + */ +export interface SSEWriter { + /** Write an SSE event with the given name and JSON-serializable data. */ + writeEvent(event: string, data: unknown): void; + /** Whether the underlying connection has been closed. */ + readonly closed: boolean; + /** End the SSE stream. */ + end(): void; + /** Register a callback for when the client disconnects. */ + onClose(callback: () => void): void; +} + +/** + * SSEWriter implementation backed by a Node.js http.ServerResponse. + */ +export class NodeSSEWriter implements SSEWriter { + private readonly res: ServerResponse; + + constructor(res: ServerResponse) { + this.res = res; + this.res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + } + + writeEvent(event: string, data: unknown): void { + const payload = typeof data === 'string' ? data : JSON.stringify(data); + this.res.write(`event: ${event}\ndata: ${payload}\n\n`); + } + + get closed(): boolean { + return this.res.writableEnded; + } + + end(): void { + if (!this.res.writableEnded) { + this.res.end(); + } + } + + onClose(callback: () => void): void { + this.res.once('close', callback); + } +} diff --git a/tegg/core/agent-runtime/src/errors.ts b/tegg/core/agent-runtime/src/errors.ts new file mode 100644 index 0000000000..8ad035e0c6 --- /dev/null +++ b/tegg/core/agent-runtime/src/errors.ts @@ -0,0 +1,26 @@ +/** + * Error thrown when a thread or run is not found. + * The `status` property is recognized by Koa/Egg error handling + * to set the corresponding HTTP response status code. + */ +export class AgentNotFoundError extends Error { + status: number = 404; + + constructor(message: string) { + super(message); + this.name = 'AgentNotFoundError'; + } +} + +/** + * Error thrown when an operation conflicts with the current state + * (e.g., cancelling a completed run). + */ +export class AgentConflictError extends Error { + status: number = 409; + + constructor(message: string) { + super(message); + this.name = 'AgentConflictError'; + } +} diff --git a/tegg/core/agent-runtime/src/index.ts b/tegg/core/agent-runtime/src/index.ts new file mode 100644 index 0000000000..5cba75b523 --- /dev/null +++ b/tegg/core/agent-runtime/src/index.ts @@ -0,0 +1,9 @@ +export * from './AgentStore.ts'; +export * from './errors.ts'; +export * from './FileAgentStore.ts'; +export * from './MessageConverter.ts'; +export * from './RunBuilder.ts'; +export * from './SSEWriter.ts'; +export * from './utils.ts'; +export { AgentRuntime, AGENT_RUNTIME, createAgentRuntime } from './AgentRuntime.ts'; +export type { AgentControllerHost, AgentRuntimeOptions, AgentRuntimeLogger } from './AgentRuntime.ts'; diff --git a/tegg/core/agent-runtime/src/utils.ts b/tegg/core/agent-runtime/src/utils.ts new file mode 100644 index 0000000000..eb2f168471 --- /dev/null +++ b/tegg/core/agent-runtime/src/utils.ts @@ -0,0 +1,9 @@ +import crypto from 'node:crypto'; + +export function nowUnix(): number { + return Math.floor(Date.now() / 1000); +} + +export function newMsgId(): string { + return `msg_${crypto.randomUUID()}`; +} diff --git a/tegg/core/agent-runtime/test/AgentRuntime.test.ts b/tegg/core/agent-runtime/test/AgentRuntime.test.ts new file mode 100644 index 0000000000..4d7e32aff2 --- /dev/null +++ b/tegg/core/agent-runtime/test/AgentRuntime.test.ts @@ -0,0 +1,289 @@ +import { strict as assert } from 'node:assert'; +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import { describe, it, beforeEach, afterEach } from 'vitest'; + +import { AgentRuntime } from '../src/AgentRuntime.ts'; +import type { AgentControllerHost } from '../src/AgentRuntime.ts'; +import { AgentNotFoundError } from '../src/errors.ts'; +import { FileAgentStore } from '../src/FileAgentStore.ts'; + +describe('core/agent-runtime/test/AgentRuntime.test.ts', () => { + const dataDir = path.join(import.meta.dirname, '.agent-runtime-test-data'); + let runtime: AgentRuntime; + let store: FileAgentStore; + let host: AgentControllerHost; + + beforeEach(async () => { + store = new FileAgentStore({ dataDir }); + await store.init(); + host = { + async *execRun(input: any) { + const messages = input.input.messages; + yield { + type: 'assistant', + message: { + role: 'assistant', + content: [{ type: 'text', text: `Hello ${messages.length} messages` }], + }, + }; + yield { + type: 'result', + usage: { prompt_tokens: 10, completion_tokens: 5 }, + }; + }, + } as any; + runtime = new AgentRuntime({ + host, + store, + logger: { + error() { + /* noop */ + }, + }, + }); + }); + + afterEach(async () => { + await runtime.destroy(); + await fs.rm(dataDir, { recursive: true, force: true }); + }); + + describe('createThread', () => { + it('should create a thread and return OpenAI ThreadObject', async () => { + const result = await runtime.createThread(); + assert(result.id.startsWith('thread_')); + assert.equal(result.object, 'thread'); + assert(typeof result.created_at === 'number'); + // Unix seconds + assert(result.created_at <= Math.floor(Date.now() / 1000)); + assert(typeof result.metadata === 'object'); + }); + }); + + describe('getThread', () => { + it('should get a thread by id', async () => { + const created = await runtime.createThread(); + + const result = await runtime.getThread(created.id); + assert.equal(result.id, created.id); + assert.equal(result.object, 'thread'); + assert(Array.isArray(result.messages)); + }); + + it('should throw AgentNotFoundError for non-existent thread', async () => { + await assert.rejects( + () => runtime.getThread('thread_xxx'), + (err: unknown) => { + assert(err instanceof AgentNotFoundError); + assert.equal(err.status, 404); + return true; + }, + ); + }); + }); + + describe('syncRun', () => { + it('should collect all chunks and return completed RunObject', async () => { + const result = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + assert(result.id.startsWith('run_')); + assert.equal(result.object, 'thread.run'); + assert.equal(result.status, 'completed'); + assert(result.thread_id); + assert(result.thread_id.startsWith('thread_')); + assert.equal(result.output!.length, 1); + assert.equal(result.output![0].object, 'thread.message'); + assert.equal(result.output![0].role, 'assistant'); + assert.equal(result.output![0].status, 'completed'); + assert.equal(result.output![0].content[0].type, 'text'); + assert.equal(result.output![0].content[0].text.value, 'Hello 1 messages'); + assert(Array.isArray(result.output![0].content[0].text.annotations)); + assert.equal(result.usage!.prompt_tokens, 10); + assert.equal(result.usage!.completion_tokens, 5); + assert.equal(result.usage!.total_tokens, 15); + assert(result.started_at! >= result.created_at, 'started_at should be >= created_at'); + }); + + it('should pass metadata through to store and return it', async () => { + const meta = { user_id: 'u_1', trace: 'xyz' }; + const result = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + metadata: meta, + } as any); + assert.deepEqual(result.metadata, meta); + + // Verify stored in store + const run = await store.getRun(result.id); + assert.deepEqual(run.metadata, meta); + }); + + it('should store the run in the store', async () => { + const result = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + const run = await store.getRun(result.id); + assert.equal(run.status, 'completed'); + assert(run.completed_at); + }); + + it('should append messages to thread when thread_id provided', async () => { + const thread = await runtime.createThread(); + + await runtime.syncRun({ + thread_id: thread.id, + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + + const updated = await runtime.getThread(thread.id); + assert.equal(updated.messages.length, 2); // user + assistant + assert.equal(updated.messages[0].role, 'user'); + assert.equal(updated.messages[1].role, 'assistant'); + }); + + it('should auto-create thread and append messages when thread_id not provided', async () => { + const result = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + assert(result.thread_id); + assert(result.thread_id.startsWith('thread_')); + + // Verify thread was created and messages were appended + const thread = await runtime.getThread(result.thread_id); + assert.equal(thread.messages.length, 2); // user + assistant + assert.equal(thread.messages[0].role, 'user'); + assert.equal(thread.messages[1].role, 'assistant'); + }); + }); + + describe('asyncRun', () => { + it('should return queued status immediately with auto-created thread_id', async () => { + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + assert(result.id.startsWith('run_')); + assert.equal(result.object, 'thread.run'); + assert.equal(result.status, 'queued'); + assert(result.thread_id); + assert(result.thread_id.startsWith('thread_')); + }); + + it('should complete the run in the background', async () => { + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + + // Wait for background task to complete naturally + await runtime.waitForPendingTasks(); + + const run = await store.getRun(result.id); + assert.equal(run.status, 'completed'); + assert.equal(run.output![0].content[0].text.value, 'Hello 1 messages'); + }); + + it('should auto-create thread and append messages when thread_id not provided', async () => { + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + assert(result.thread_id); + + // Wait for background task to complete naturally + await runtime.waitForPendingTasks(); + + // Verify thread was created and messages were appended + const thread = await store.getThread(result.thread_id); + assert.equal(thread.messages.length, 2); // user + assistant + assert.equal(thread.messages[0].role, 'user'); + assert.equal(thread.messages[1].role, 'assistant'); + }); + + it('should pass metadata through to store and return it', async () => { + const meta = { session: 'sess_1' }; + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + metadata: meta, + } as any); + assert.deepEqual(result.metadata, meta); + + // Wait for background task to complete naturally + await runtime.waitForPendingTasks(); + + // Verify stored in store + const run = await store.getRun(result.id); + assert.deepEqual(run.metadata, meta); + }); + }); + + describe('getRun', () => { + it('should get a run by id', async () => { + const syncResult = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + + const result = await runtime.getRun(syncResult.id); + assert.equal(result.id, syncResult.id); + assert.equal(result.object, 'thread.run'); + assert.equal(result.status, 'completed'); + assert(typeof result.created_at === 'number'); + }); + + it('should return metadata from getRun', async () => { + const meta = { source: 'api' }; + const syncResult = await runtime.syncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + metadata: meta, + } as any); + + const result = await runtime.getRun(syncResult.id); + assert.deepEqual(result.metadata, meta); + }); + }); + + describe('cancelRun', () => { + it('should cancel a run', async () => { + // Use a signal-aware execRun so abort takes effect + host.execRun = async function* (_input: any, signal?: AbortSignal) { + yield { + type: 'assistant', + message: { role: 'assistant' as const, content: [{ type: 'text' as const, text: 'start' }] }, + }; + // Wait but check abort signal + await new Promise((resolve, reject) => { + const timer = setTimeout(resolve, 5000); + if (signal) { + signal.addEventListener( + 'abort', + () => { + clearTimeout(timer); + reject(new Error('aborted')); + }, + { once: true }, + ); + } + }); + yield { + type: 'assistant', + message: { role: 'assistant' as const, content: [{ type: 'text' as const, text: 'end' }] }, + }; + } as any; + + const result = await runtime.asyncRun({ + input: { messages: [{ role: 'user', content: 'Hi' }] }, + } as any); + + // Let background task start running + await new Promise((resolve) => setTimeout(resolve, 50)); + + const cancelResult = await runtime.cancelRun(result.id); + assert.equal(cancelResult.id, result.id); + assert.equal(cancelResult.object, 'thread.run'); + assert.equal(cancelResult.status, 'cancelled'); + + const run = await store.getRun(result.id); + assert.equal(run.status, 'cancelled'); + assert(run.cancelled_at); + }); + }); +}); diff --git a/tegg/core/agent-runtime/test/FileAgentStore.test.ts b/tegg/core/agent-runtime/test/FileAgentStore.test.ts new file mode 100644 index 0000000000..288ecff3ee --- /dev/null +++ b/tegg/core/agent-runtime/test/FileAgentStore.test.ts @@ -0,0 +1,181 @@ +import { strict as assert } from 'node:assert'; +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import { describe, it, beforeEach, afterEach } from 'vitest'; + +import { AgentNotFoundError } from '../src/errors.ts'; +import { FileAgentStore } from '../src/FileAgentStore.ts'; + +describe('core/agent-runtime/test/FileAgentStore.test.ts', () => { + const dataDir = path.join(import.meta.dirname, '.agent-test-data'); + let store: FileAgentStore; + + beforeEach(async () => { + store = new FileAgentStore({ dataDir }); + await store.init(); + }); + + afterEach(async () => { + await fs.rm(dataDir, { recursive: true, force: true }); + }); + + describe('threads', () => { + it('should create a thread', async () => { + const thread = await store.createThread(); + assert(thread.id.startsWith('thread_')); + assert.equal(thread.object, 'thread'); + assert(Array.isArray(thread.messages)); + assert.equal(thread.messages.length, 0); + assert(typeof thread.created_at === 'number'); + // Unix seconds + assert(thread.created_at <= Math.floor(Date.now() / 1000)); + }); + + it('should create a thread with metadata', async () => { + const thread = await store.createThread({ key: 'value' }); + assert.deepEqual(thread.metadata, { key: 'value' }); + }); + + it('should create a thread with empty metadata by default', async () => { + const thread = await store.createThread(); + assert.deepEqual(thread.metadata, {}); + }); + + it('should get a thread by id', async () => { + const created = await store.createThread(); + const fetched = await store.getThread(created.id); + assert.equal(fetched.id, created.id); + assert.equal(fetched.object, 'thread'); + assert.equal(fetched.created_at, created.created_at); + }); + + it('should throw AgentNotFoundError for non-existent thread', async () => { + await assert.rejects( + () => store.getThread('thread_non_existent'), + (err: unknown) => { + assert(err instanceof AgentNotFoundError); + assert.equal(err.status, 404); + assert.match(err.message, /Thread thread_non_existent not found/); + return true; + }, + ); + }); + + it('should append messages to a thread', async () => { + const thread = await store.createThread(); + await store.appendMessages(thread.id, [ + { + id: 'msg_1', + object: 'thread.message', + created_at: Math.floor(Date.now() / 1000), + role: 'user', + status: 'completed', + content: [{ type: 'text', text: { value: 'Hello', annotations: [] } }], + }, + { + id: 'msg_2', + object: 'thread.message', + created_at: Math.floor(Date.now() / 1000), + role: 'assistant', + status: 'completed', + content: [{ type: 'text', text: { value: 'Hi!', annotations: [] } }], + }, + ]); + const fetched = await store.getThread(thread.id); + assert.equal(fetched.messages.length, 2); + assert.equal(fetched.messages[0].content[0].text.value, 'Hello'); + assert.equal(fetched.messages[1].content[0].text.value, 'Hi!'); + }); + }); + + describe('runs', () => { + it('should create a run', async () => { + const run = await store.createRun([{ role: 'user', content: 'Hello' }]); + assert(run.id.startsWith('run_')); + assert.equal(run.object, 'thread.run'); + assert.equal(run.status, 'queued'); + assert.equal(run.input.length, 1); + assert(typeof run.created_at === 'number'); + // Unix seconds + assert(run.created_at <= Math.floor(Date.now() / 1000)); + }); + + it('should create a run with thread_id and config', async () => { + const run = await store.createRun([{ role: 'user', content: 'Hello' }], 'thread_123', { timeout_ms: 5000 }); + assert.equal(run.thread_id, 'thread_123'); + assert.deepEqual(run.config, { timeout_ms: 5000 }); + }); + + it('should create a run with metadata', async () => { + const meta = { user_id: 'u_1', session: 'abc' }; + const run = await store.createRun([{ role: 'user', content: 'Hello' }], 'thread_123', undefined, meta); + assert.deepEqual(run.metadata, meta); + + // Verify metadata persists through getRun + const fetched = await store.getRun(run.id); + assert.deepEqual(fetched.metadata, meta); + }); + + it('should preserve metadata across updateRun', async () => { + const meta = { tag: 'test' }; + const run = await store.createRun([{ role: 'user', content: 'Hello' }], undefined, undefined, meta); + await store.updateRun(run.id, { status: 'in_progress', started_at: Math.floor(Date.now() / 1000) }); + const fetched = await store.getRun(run.id); + assert.equal(fetched.status, 'in_progress'); + assert.deepEqual(fetched.metadata, meta); + }); + + it('should get a run by id', async () => { + const created = await store.createRun([{ role: 'user', content: 'Hello' }]); + const fetched = await store.getRun(created.id); + assert.equal(fetched.id, created.id); + assert.equal(fetched.status, 'queued'); + }); + + it('should throw AgentNotFoundError for non-existent run', async () => { + await assert.rejects( + () => store.getRun('run_non_existent'), + (err: unknown) => { + assert(err instanceof AgentNotFoundError); + assert.equal(err.status, 404); + assert.match(err.message, /Run run_non_existent not found/); + return true; + }, + ); + }); + + it('should update a run', async () => { + const run = await store.createRun([{ role: 'user', content: 'Hello' }]); + await store.updateRun(run.id, { + status: 'completed', + output: [ + { + id: 'msg_1', + object: 'thread.message', + created_at: Math.floor(Date.now() / 1000), + role: 'assistant', + status: 'completed', + content: [{ type: 'text', text: { value: 'World', annotations: [] } }], + }, + ], + completed_at: Math.floor(Date.now() / 1000), + }); + const fetched = await store.getRun(run.id); + assert.equal(fetched.status, 'completed'); + assert.equal(fetched.output![0].content[0].text.value, 'World'); + assert(typeof fetched.completed_at === 'number'); + }); + }); + + describe('data directory', () => { + it('should create subdirectories on init', async () => { + const threadsDir = path.join(dataDir, 'threads'); + const runsDir = path.join(dataDir, 'runs'); + const threadsStat = await fs.stat(threadsDir); + const runsStat = await fs.stat(runsDir); + assert(threadsStat.isDirectory()); + assert(runsStat.isDirectory()); + }); + }); +}); diff --git a/tegg/core/agent-runtime/tsconfig.json b/tegg/core/agent-runtime/tsconfig.json new file mode 100644 index 0000000000..618c6c3e97 --- /dev/null +++ b/tegg/core/agent-runtime/tsconfig.json @@ -0,0 +1,3 @@ +{ + "extends": "../../../tsconfig.json" +} diff --git a/tegg/core/controller-decorator/src/decorator/agent/AgentController.ts b/tegg/core/controller-decorator/src/decorator/agent/AgentController.ts new file mode 100644 index 0000000000..d98f56d3ab --- /dev/null +++ b/tegg/core/controller-decorator/src/decorator/agent/AgentController.ts @@ -0,0 +1,146 @@ +import { PrototypeUtil, SingletonProto } from '@eggjs/core-decorator'; +import { StackUtil } from '@eggjs/tegg-common-util'; +import type { EggProtoImplClass } from '@eggjs/tegg-types'; +import { + AccessLevel, + AGENT_CONTROLLER_PROTO_IMPL_TYPE, + ControllerType, + HTTPMethodEnum, + HTTPParamType, +} from '@eggjs/tegg-types'; + +import { AgentInfoUtil } from '../../util/AgentInfoUtil.ts'; +import { ControllerInfoUtil } from '../../util/ControllerInfoUtil.ts'; +import { HTTPInfoUtil } from '../../util/HTTPInfoUtil.ts'; +import { MethodInfoUtil } from '../../util/MethodInfoUtil.ts'; + +interface AgentRouteDefinition { + methodName: string; + httpMethod: HTTPMethodEnum; + path: string; + paramType?: 'body' | 'pathParam'; + paramName?: string; + hasParam: boolean; +} + +// Default implementations for unimplemented methods. +// Methods with hasParam=true need function.length === 1 for param validation. +// Stubs are marked with Symbol.for('AGENT_NOT_IMPLEMENTED') so agent-runtime +// can distinguish them from user-defined methods at enhancement time. +function createNotImplemented(methodName: string, hasParam: boolean) { + let fn; + if (hasParam) { + fn = async function (_arg: unknown) { + throw new Error(`${methodName} not implemented`); + }; + } else { + fn = async function () { + throw new Error(`${methodName} not implemented`); + }; + } + AgentInfoUtil.setNotImplemented(fn); + return fn; +} + +const AGENT_ROUTES: AgentRouteDefinition[] = [ + { + methodName: 'createThread', + httpMethod: HTTPMethodEnum.POST, + path: '/threads', + hasParam: false, + }, + { + methodName: 'getThread', + httpMethod: HTTPMethodEnum.GET, + path: '/threads/:id', + paramType: 'pathParam', + paramName: 'id', + hasParam: true, + }, + { + methodName: 'asyncRun', + httpMethod: HTTPMethodEnum.POST, + path: '/runs', + paramType: 'body', + hasParam: true, + }, + { + methodName: 'streamRun', + httpMethod: HTTPMethodEnum.POST, + path: '/runs/stream', + paramType: 'body', + hasParam: true, + }, + { + methodName: 'syncRun', + httpMethod: HTTPMethodEnum.POST, + path: '/runs/wait', + paramType: 'body', + hasParam: true, + }, + { + methodName: 'getRun', + httpMethod: HTTPMethodEnum.GET, + path: '/runs/:id', + paramType: 'pathParam', + paramName: 'id', + hasParam: true, + }, + { + methodName: 'cancelRun', + httpMethod: HTTPMethodEnum.POST, + path: '/runs/:id/cancel', + paramType: 'pathParam', + paramName: 'id', + hasParam: true, + }, +]; + +export function AgentController() { + return function (constructor: EggProtoImplClass) { + // Set controller type as HTTP so existing infrastructure handles it + ControllerInfoUtil.setControllerType(constructor, ControllerType.HTTP); + + // Set the fixed base HTTP path + HTTPInfoUtil.setHTTPPath('/api/v1', constructor); + + // Apply SingletonProto with custom proto impl type + const func = SingletonProto({ + accessLevel: AccessLevel.PUBLIC, + protoImplType: AGENT_CONTROLLER_PROTO_IMPL_TYPE, + }); + func(constructor); + + // Set file path for prototype + // Stack depth 5: [0] getCalleeFromStack → [1] decorator fn → [2-4] reflect/oxc runtime → [5] user source + PrototypeUtil.setFilePath(constructor, StackUtil.getCalleeFromStack(false, 5)); + + // Register each agent route + for (const route of AGENT_ROUTES) { + // Inject default implementation if method not defined + if (!constructor.prototype[route.methodName]) { + constructor.prototype[route.methodName] = createNotImplemented(route.methodName, route.hasParam); + } + + // Set method controller type + MethodInfoUtil.setMethodControllerType(constructor, route.methodName, ControllerType.HTTP); + + // Set HTTP method (GET/POST) + HTTPInfoUtil.setHTTPMethodMethod(route.httpMethod, constructor, route.methodName); + + // Set HTTP path + HTTPInfoUtil.setHTTPMethodPath(route.path, constructor, route.methodName); + + // Set parameter metadata + if (route.paramType === 'body') { + HTTPInfoUtil.setHTTPMethodParamType(HTTPParamType.BODY, 0, constructor, route.methodName); + } else if (route.paramType === 'pathParam') { + HTTPInfoUtil.setHTTPMethodParamType(HTTPParamType.PARAM, 0, constructor, route.methodName); + HTTPInfoUtil.setHTTPMethodParamName(route.paramName!, 0, constructor, route.methodName); + } + } + + // Mark the class as an AgentController for precise detection + AgentInfoUtil.setIsAgentController(constructor); + }; +} diff --git a/tegg/core/controller-decorator/src/decorator/agent/AgentHandler.ts b/tegg/core/controller-decorator/src/decorator/agent/AgentHandler.ts new file mode 100644 index 0000000000..923b9df36c --- /dev/null +++ b/tegg/core/controller-decorator/src/decorator/agent/AgentHandler.ts @@ -0,0 +1,22 @@ +import type { + ThreadObject, + ThreadObjectWithMessages, + CreateRunInput, + RunObject, + AgentStreamMessage, +} from '../../model/AgentControllerTypes.ts'; + +// Interface for AgentController classes. The `execRun` method is required — +// the framework uses it to auto-wire thread/run management, store persistence, +// SSE streaming, async execution, and cancellation via smart defaults. +export interface AgentHandler { + execRun(input: CreateRunInput, signal?: AbortSignal): AsyncGenerator; + createStore?(): Promise; + createThread?(): Promise; + getThread?(threadId: string): Promise; + asyncRun?(input: CreateRunInput): Promise; + streamRun?(input: CreateRunInput): Promise; + syncRun?(input: CreateRunInput): Promise; + getRun?(runId: string): Promise; + cancelRun?(runId: string): Promise; +} diff --git a/tegg/core/controller-decorator/src/decorator/index.ts b/tegg/core/controller-decorator/src/decorator/index.ts index 27fef617f7..14c0ffa0c1 100644 --- a/tegg/core/controller-decorator/src/decorator/index.ts +++ b/tegg/core/controller-decorator/src/decorator/index.ts @@ -1,5 +1,7 @@ export * from './http/index.ts'; export * from './mcp/index.ts'; +export * from './agent/AgentController.ts'; +export * from './agent/AgentHandler.ts'; export * from './Acl.ts'; export * from './Context.ts'; export * from './Middleware.ts'; diff --git a/tegg/core/controller-decorator/src/model/AgentControllerTypes.ts b/tegg/core/controller-decorator/src/model/AgentControllerTypes.ts new file mode 100644 index 0000000000..7d23410e99 --- /dev/null +++ b/tegg/core/controller-decorator/src/model/AgentControllerTypes.ts @@ -0,0 +1,175 @@ +// ===== Object types ===== + +export const AgentObjectType = { + Thread: 'thread', + ThreadRun: 'thread.run', + ThreadMessage: 'thread.message', + ThreadMessageDelta: 'thread.message.delta', +} as const; +export type AgentObjectType = (typeof AgentObjectType)[keyof typeof AgentObjectType]; + +// ===== Message roles ===== + +export const MessageRole = { + User: 'user', + Assistant: 'assistant', + System: 'system', +} as const; +export type MessageRole = (typeof MessageRole)[keyof typeof MessageRole]; + +// ===== Message statuses ===== + +export const MessageStatus = { + InProgress: 'in_progress', + Incomplete: 'incomplete', + Completed: 'completed', +} as const; +export type MessageStatus = (typeof MessageStatus)[keyof typeof MessageStatus]; + +// ===== Content block types ===== + +export const ContentBlockType = { + Text: 'text', +} as const; +export type ContentBlockType = (typeof ContentBlockType)[keyof typeof ContentBlockType]; + +// ===== Input Message (what clients send in request body) ===== + +export interface InputMessage { + role: MessageRole; + content: string | InputContentPart[]; + metadata?: Record; +} + +export interface InputContentPart { + type: ContentBlockType; + text: string; +} + +// ===== Output Message (OpenAI thread.message object) ===== + +export interface MessageObject { + id: string; // "msg_xxx" + object: typeof AgentObjectType.ThreadMessage; + created_at: number; // Unix seconds + thread_id?: string; + run_id?: string; + role: Exclude; + status: MessageStatus; + content: MessageContentBlock[]; + metadata?: Record; +} + +export interface TextContentBlock { + type: ContentBlockType; + text: { value: string; annotations: unknown[] }; +} + +export type MessageContentBlock = TextContentBlock; + +// ===== Thread types ===== + +export interface ThreadObject { + id: string; // "thread_xxx" + object: typeof AgentObjectType.Thread; + created_at: number; // Unix seconds + metadata: Record; +} + +export interface ThreadObjectWithMessages extends ThreadObject { + messages: MessageObject[]; +} + +// ===== Run types ===== + +export const RunStatus = { + Queued: 'queued', + InProgress: 'in_progress', + Completed: 'completed', + Failed: 'failed', + Cancelled: 'cancelled', + Cancelling: 'cancelling', + Expired: 'expired', +} as const; +export type RunStatus = (typeof RunStatus)[keyof typeof RunStatus]; + +export interface RunObject { + id: string; // "run_xxx" + object: typeof AgentObjectType.ThreadRun; + created_at: number; // Unix seconds + thread_id?: string; + status: RunStatus; + last_error?: { code: string; message: string } | null; + started_at?: number | null; + completed_at?: number | null; + cancelled_at?: number | null; + failed_at?: number | null; + usage?: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | null; + metadata?: Record; + output?: MessageObject[]; + config?: AgentRunConfig; +} + +// ===== Request types ===== + +export interface CreateRunInput { + thread_id?: string; + input: { + messages: InputMessage[]; + }; + config?: AgentRunConfig; + metadata?: Record; +} + +// ===== SSE Delta type ===== + +export interface MessageDeltaObject { + id: string; + object: typeof AgentObjectType.ThreadMessageDelta; + delta: { content: MessageContentBlock[] }; +} + +// ===== SSE Event names ===== + +export const AgentSSEEvent = { + ThreadRunCreated: 'thread.run.created', + ThreadRunInProgress: 'thread.run.in_progress', + ThreadRunCompleted: 'thread.run.completed', + ThreadRunFailed: 'thread.run.failed', + ThreadRunCancelled: 'thread.run.cancelled', + ThreadMessageCreated: 'thread.message.created', + ThreadMessageDelta: 'thread.message.delta', + ThreadMessageCompleted: 'thread.message.completed', + Done: 'done', +} as const; +export type AgentSSEEvent = (typeof AgentSSEEvent)[keyof typeof AgentSSEEvent]; + +// ===== Error codes ===== + +export const AgentErrorCode = { + ExecError: 'EXEC_ERROR', +} as const; +export type AgentErrorCode = (typeof AgentErrorCode)[keyof typeof AgentErrorCode]; + +// ===== Internal types ===== + +export type AgentStreamMessagePayload = AgentStreamMessage['message']; + +export interface AgentRunUsage { + total_tokens?: number; + prompt_tokens?: number; + completion_tokens?: number; + duration_ms?: number; +} + +export interface AgentRunConfig { + max_iterations?: number; + timeout_ms?: number; +} + +export interface AgentStreamMessage { + type: string; + message?: { role: string; content: string | { type: string; text: string }[] }; + usage?: AgentRunUsage; + [key: string]: unknown; +} diff --git a/tegg/core/controller-decorator/src/model/index.ts b/tegg/core/controller-decorator/src/model/index.ts index f637c18ae6..d3280fa4fb 100644 --- a/tegg/core/controller-decorator/src/model/index.ts +++ b/tegg/core/controller-decorator/src/model/index.ts @@ -1,3 +1,4 @@ +export * from './AgentControllerTypes.ts'; export * from './HTTPControllerMeta.ts'; export * from './HTTPCookies.ts'; export * from './HTTPMethodMeta.ts'; diff --git a/tegg/core/controller-decorator/src/util/AgentInfoUtil.ts b/tegg/core/controller-decorator/src/util/AgentInfoUtil.ts new file mode 100644 index 0000000000..3ab5b9d926 --- /dev/null +++ b/tegg/core/controller-decorator/src/util/AgentInfoUtil.ts @@ -0,0 +1,33 @@ +import { MetadataUtil } from '@eggjs/core-decorator'; +import { + CONTROLLER_AGENT_CONTROLLER, + CONTROLLER_AGENT_NOT_IMPLEMENTED, + CONTROLLER_AGENT_ENHANCED, +} from '@eggjs/tegg-types'; +import type { EggProtoImplClass } from '@eggjs/tegg-types'; + +export class AgentInfoUtil { + static setIsAgentController(clazz: EggProtoImplClass): void { + MetadataUtil.defineMetaData(CONTROLLER_AGENT_CONTROLLER, true, clazz); + } + + static isAgentController(clazz: EggProtoImplClass): boolean { + return MetadataUtil.getBooleanMetaData(CONTROLLER_AGENT_CONTROLLER, clazz); + } + + static setNotImplemented(fn: Function): void { + Reflect.defineMetadata(CONTROLLER_AGENT_NOT_IMPLEMENTED, true, fn); + } + + static isNotImplemented(fn: Function): boolean { + return !!Reflect.getMetadata(CONTROLLER_AGENT_NOT_IMPLEMENTED, fn); + } + + static setEnhanced(clazz: EggProtoImplClass): void { + MetadataUtil.defineMetaData(CONTROLLER_AGENT_ENHANCED, true, clazz); + } + + static isEnhanced(clazz: EggProtoImplClass): boolean { + return MetadataUtil.getBooleanMetaData(CONTROLLER_AGENT_ENHANCED, clazz); + } +} diff --git a/tegg/core/controller-decorator/src/util/index.ts b/tegg/core/controller-decorator/src/util/index.ts index c97db88411..15305fc730 100644 --- a/tegg/core/controller-decorator/src/util/index.ts +++ b/tegg/core/controller-decorator/src/util/index.ts @@ -1,4 +1,5 @@ export * from './validator/index.ts'; +export * from './AgentInfoUtil.ts'; export * from './ControllerInfoUtil.ts'; export * from './ControllerMetadataUtil.ts'; export * from './HTTPInfoUtil.ts'; diff --git a/tegg/core/controller-decorator/test/AgentController.test.ts b/tegg/core/controller-decorator/test/AgentController.test.ts new file mode 100644 index 0000000000..c3addeb02f --- /dev/null +++ b/tegg/core/controller-decorator/test/AgentController.test.ts @@ -0,0 +1,206 @@ +import assert from 'node:assert/strict'; + +import { ControllerType, HTTPMethodEnum } from '@eggjs/tegg-types'; +import { describe, it } from 'vitest'; + +import { + AgentInfoUtil, + ControllerMetaBuilderFactory, + BodyParamMeta, + PathParamMeta, + ControllerInfoUtil, + MethodInfoUtil, + HTTPInfoUtil, +} from '../src/index.ts'; +import { HTTPControllerMeta } from '../src/model/index.ts'; +import { AgentFooController } from './fixtures/AgentFooController.js'; + +describe('core/controller-decorator/test/AgentController.test.ts', () => { + describe('decorator metadata', () => { + it('should set ControllerType.HTTP on the class', () => { + const controllerType = ControllerInfoUtil.getControllerType(AgentFooController); + assert.strictEqual(controllerType, ControllerType.HTTP); + }); + + it('should set AGENT_CONTROLLER metadata on the class', () => { + assert.strictEqual(AgentInfoUtil.isAgentController(AgentFooController), true); + }); + + it('should set fixed base path /api/v1', () => { + const httpPath = HTTPInfoUtil.getHTTPPath(AgentFooController); + assert.strictEqual(httpPath, '/api/v1'); + }); + }); + + describe('method HTTP metadata', () => { + const methodRoutes = [ + { methodName: 'createThread', httpMethod: HTTPMethodEnum.POST, path: '/threads' }, + { methodName: 'getThread', httpMethod: HTTPMethodEnum.GET, path: '/threads/:id' }, + { methodName: 'asyncRun', httpMethod: HTTPMethodEnum.POST, path: '/runs' }, + { methodName: 'streamRun', httpMethod: HTTPMethodEnum.POST, path: '/runs/stream' }, + { methodName: 'syncRun', httpMethod: HTTPMethodEnum.POST, path: '/runs/wait' }, + { methodName: 'getRun', httpMethod: HTTPMethodEnum.GET, path: '/runs/:id' }, + { methodName: 'cancelRun', httpMethod: HTTPMethodEnum.POST, path: '/runs/:id/cancel' }, + ]; + + for (const route of methodRoutes) { + it(`should set correct HTTP method for ${route.methodName}`, () => { + const method = HTTPInfoUtil.getHTTPMethodMethod(AgentFooController, route.methodName); + assert.strictEqual(method, route.httpMethod); + }); + + it(`should set correct HTTP path for ${route.methodName}`, () => { + const path = HTTPInfoUtil.getHTTPMethodPath(AgentFooController, route.methodName); + assert.strictEqual(path, route.path); + }); + + it(`should set ControllerType.HTTP on method ${route.methodName}`, () => { + const controllerType = MethodInfoUtil.getMethodControllerType(AgentFooController, route.methodName); + assert.strictEqual(controllerType, ControllerType.HTTP); + }); + } + }); + + describe('parameter metadata', () => { + it('should set BODY param at index 0 for asyncRun', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'asyncRun'); + assert.strictEqual(paramType, 'BODY'); + }); + + it('should set BODY param at index 0 for streamRun', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'streamRun'); + assert.strictEqual(paramType, 'BODY'); + }); + + it('should set BODY param at index 0 for syncRun', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'syncRun'); + assert.strictEqual(paramType, 'BODY'); + }); + + it('should set PARAM at index 0 with name "id" for getThread', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'getThread'); + assert.strictEqual(paramType, 'PARAM'); + const paramName = HTTPInfoUtil.getHTTPMethodParamName(0, AgentFooController, 'getThread'); + assert.strictEqual(paramName, 'id'); + }); + + it('should set PARAM at index 0 with name "id" for getRun', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'getRun'); + assert.strictEqual(paramType, 'PARAM'); + const paramName = HTTPInfoUtil.getHTTPMethodParamName(0, AgentFooController, 'getRun'); + assert.strictEqual(paramName, 'id'); + }); + + it('should set PARAM at index 0 with name "id" for cancelRun', () => { + const paramType = HTTPInfoUtil.getHTTPMethodParamType(0, AgentFooController, 'cancelRun'); + assert.strictEqual(paramType, 'PARAM'); + const paramName = HTTPInfoUtil.getHTTPMethodParamName(0, AgentFooController, 'cancelRun'); + assert.strictEqual(paramName, 'id'); + }); + + it('should not have params for createThread', () => { + const paramIndexList = HTTPInfoUtil.getParamIndexList(AgentFooController, 'createThread'); + assert.strictEqual(paramIndexList.length, 0); + }); + }); + + describe('context index', () => { + it('should not set contextIndex on any method', () => { + const methods = ['createThread', 'getThread', 'asyncRun', 'streamRun', 'syncRun', 'getRun', 'cancelRun']; + for (const methodName of methods) { + const contextIndex = MethodInfoUtil.getMethodContextIndex(AgentFooController, methodName); + assert.strictEqual(contextIndex, undefined, `${methodName} should not have contextIndex`); + } + }); + }); + + describe('default implementations', () => { + it('should inject default stubs for all 7 route methods', () => { + // AgentFooController only implements execRun (smart defaults pattern) + // All 7 route methods should have stub defaults that throw + const proto = AgentFooController.prototype as any; + const routeMethods = ['createThread', 'getThread', 'asyncRun', 'streamRun', 'syncRun', 'getRun', 'cancelRun']; + for (const methodName of routeMethods) { + assert(typeof proto[methodName] === 'function', `${methodName} should be a function`); + assert.strictEqual( + AgentInfoUtil.isNotImplemented(proto[methodName]), + true, + `${methodName} should be marked as AGENT_NOT_IMPLEMENTED`, + ); + } + }); + + const stubMethods = [ + { name: 'createThread', args: [] }, + { name: 'getThread', args: ['thread_1'] }, + { name: 'asyncRun', args: [{ input: { messages: [] } }] }, + { name: 'streamRun', args: [{ input: { messages: [] } }] }, + { name: 'syncRun', args: [{ input: { messages: [] } }] }, + { name: 'getRun', args: ['run_1'] }, + { name: 'cancelRun', args: ['run_1'] }, + ]; + + for (const { name, args } of stubMethods) { + it(`should throw for unimplemented ${name}`, async () => { + const instance = new AgentFooController() as any; + await assert.rejects(() => instance[name](...args), new RegExp(`${name} not implemented`)); + }); + } + }); + + describe('HTTPControllerMetaBuilder integration', () => { + it('should build metadata with 7 HTTPMethodMeta entries', () => { + const meta = ControllerMetaBuilderFactory.build(AgentFooController, ControllerType.HTTP) as HTTPControllerMeta; + assert(meta); + assert.strictEqual(meta.methods.length, 7); + assert.strictEqual(meta.path, '/api/v1'); + }); + + it('should produce correct route metadata for each method', () => { + const meta = ControllerMetaBuilderFactory.build(AgentFooController, ControllerType.HTTP) as HTTPControllerMeta; + + const createThread = meta.methods.find((m) => m.name === 'createThread')!; + assert.strictEqual(createThread.path, '/threads'); + assert.strictEqual(createThread.method, HTTPMethodEnum.POST); + assert.strictEqual(createThread.paramMap.size, 0); + + const getThread = meta.methods.find((m) => m.name === 'getThread')!; + assert.strictEqual(getThread.path, '/threads/:id'); + assert.strictEqual(getThread.method, HTTPMethodEnum.GET); + assert.deepStrictEqual(getThread.paramMap, new Map([[0, new PathParamMeta('id')]])); + + const asyncRun = meta.methods.find((m) => m.name === 'asyncRun')!; + assert.strictEqual(asyncRun.path, '/runs'); + assert.strictEqual(asyncRun.method, HTTPMethodEnum.POST); + assert.deepStrictEqual(asyncRun.paramMap, new Map([[0, new BodyParamMeta()]])); + + const streamRun = meta.methods.find((m) => m.name === 'streamRun')!; + assert.strictEqual(streamRun.path, '/runs/stream'); + assert.strictEqual(streamRun.method, HTTPMethodEnum.POST); + assert.deepStrictEqual(streamRun.paramMap, new Map([[0, new BodyParamMeta()]])); + + const syncRun = meta.methods.find((m) => m.name === 'syncRun')!; + assert.strictEqual(syncRun.path, '/runs/wait'); + assert.strictEqual(syncRun.method, HTTPMethodEnum.POST); + assert.deepStrictEqual(syncRun.paramMap, new Map([[0, new BodyParamMeta()]])); + + const getRun = meta.methods.find((m) => m.name === 'getRun')!; + assert.strictEqual(getRun.path, '/runs/:id'); + assert.strictEqual(getRun.method, HTTPMethodEnum.GET); + assert.deepStrictEqual(getRun.paramMap, new Map([[0, new PathParamMeta('id')]])); + + const cancelRun = meta.methods.find((m) => m.name === 'cancelRun')!; + assert.strictEqual(cancelRun.path, '/runs/:id/cancel'); + assert.strictEqual(cancelRun.method, HTTPMethodEnum.POST); + assert.deepStrictEqual(cancelRun.paramMap, new Map([[0, new PathParamMeta('id')]])); + }); + + it('should have all real paths start with /', () => { + const meta = ControllerMetaBuilderFactory.build(AgentFooController, ControllerType.HTTP) as HTTPControllerMeta; + for (const method of meta.methods) { + const realPath = meta.getMethodRealPath(method); + assert(realPath.startsWith('/'), `${method.name} real path "${realPath}" should start with /`); + } + }); + }); +}); diff --git a/tegg/core/controller-decorator/test/fixtures/AgentFooController.ts b/tegg/core/controller-decorator/test/fixtures/AgentFooController.ts new file mode 100644 index 0000000000..2187c3a962 --- /dev/null +++ b/tegg/core/controller-decorator/test/fixtures/AgentFooController.ts @@ -0,0 +1,18 @@ +import { AgentController } from '../../src/decorator/agent/AgentController.ts'; +import type { AgentHandler } from '../../src/decorator/agent/AgentHandler.ts'; +import type { CreateRunInput, AgentStreamMessage } from '../../src/model/AgentControllerTypes.ts'; + +// AgentController that only implements execRun (smart defaults pattern) +@AgentController() +export class AgentFooController implements AgentHandler { + async *execRun(input: CreateRunInput): AsyncGenerator { + const messages = input.input.messages; + yield { + type: 'assistant', + message: { + role: 'assistant', + content: [{ type: 'text', text: `Processed ${messages.length} messages` }], + }, + }; + } +} diff --git a/tegg/core/tegg/package.json b/tegg/core/tegg/package.json index 39efdb5940..b7e1721927 100644 --- a/tegg/core/tegg/package.json +++ b/tegg/core/tegg/package.json @@ -35,6 +35,7 @@ "./orm": "./src/orm.ts", "./schedule": "./src/schedule.ts", "./standalone": "./src/standalone.ts", + "./agent": "./src/agent.ts", "./transaction": "./src/transaction.ts", "./package.json": "./package.json" }, @@ -42,6 +43,7 @@ "access": "public", "exports": { ".": "./dist/index.js", + "./agent": "./dist/agent.js", "./ajv": "./dist/ajv.js", "./aop": "./dist/aop.js", "./dal": "./dist/dal.js", @@ -57,6 +59,7 @@ "typecheck": "tsgo --noEmit" }, "dependencies": { + "@eggjs/agent-runtime": "workspace:*", "@eggjs/ajv-decorator": "workspace:*", "@eggjs/aop-decorator": "workspace:*", "@eggjs/background-task": "workspace:*", diff --git a/tegg/core/tegg/src/agent.ts b/tegg/core/tegg/src/agent.ts new file mode 100644 index 0000000000..58f7a52054 --- /dev/null +++ b/tegg/core/tegg/src/agent.ts @@ -0,0 +1,30 @@ +// AgentController decorator from controller-decorator (no wrapper needed) +export { AgentController } from '@eggjs/controller-decorator'; + +// Utility types and classes from agent-runtime +export type { AgentStore, ThreadRecord, RunRecord } from '@eggjs/agent-runtime'; +export { AgentNotFoundError, AgentConflictError, FileAgentStore } from '@eggjs/agent-runtime'; + +// Original types and interfaces from controller-decorator +export type { AgentHandler } from '@eggjs/controller-decorator'; +export type { + // Input types + InputMessage, + InputContentPart, + // Output types (OpenAI-aligned) + MessageObject, + MessageContentBlock, + TextContentBlock, + MessageDeltaObject, + ThreadObject, + ThreadObjectWithMessages, + RunObject, + RunStatus, + // Internal types + AgentRunConfig, + AgentRunUsage, + // Request types + CreateRunInput, + // Stream types + AgentStreamMessage, +} from '@eggjs/controller-decorator'; diff --git a/tegg/core/types/src/controller-decorator/MetadataKey.ts b/tegg/core/types/src/controller-decorator/MetadataKey.ts index f0998c364b..11348fa84a 100644 --- a/tegg/core/types/src/controller-decorator/MetadataKey.ts +++ b/tegg/core/types/src/controller-decorator/MetadataKey.ts @@ -38,3 +38,9 @@ export const CONTROLLER_MCP_PROMPT_PARAMS_MAP: symbol = Symbol.for('EggPrototype export const CONTROLLER_MCP_PROMPT_ARGS_INDEX: symbol = Symbol.for('EggPrototype#controller#mcp#prompt#args'); export const METHOD_TIMEOUT_METADATA: symbol = Symbol.for('EggPrototype#method#timeout'); + +export const CONTROLLER_AGENT_CONTROLLER: symbol = Symbol.for('EggPrototype#controller#agent#isAgent'); +export const CONTROLLER_AGENT_NOT_IMPLEMENTED: symbol = Symbol.for('EggPrototype#controller#agent#notImplemented'); +export const CONTROLLER_AGENT_ENHANCED: symbol = Symbol.for('EggPrototype#controller#agent#enhanced'); + +export const AGENT_CONTROLLER_PROTO_IMPL_TYPE = 'AGENT_CONTROLLER_PROTO'; diff --git a/tegg/plugin/controller/package.json b/tegg/plugin/controller/package.json index b3861d837b..bcb96a84a2 100644 --- a/tegg/plugin/controller/package.json +++ b/tegg/plugin/controller/package.json @@ -88,6 +88,7 @@ "typecheck": "tsgo --noEmit" }, "dependencies": { + "@eggjs/agent-runtime": "workspace:*", "@eggjs/controller-decorator": "workspace:*", "@eggjs/core-decorator": "workspace:*", "@eggjs/lifecycle": "workspace:*", diff --git a/tegg/plugin/controller/src/app.ts b/tegg/plugin/controller/src/app.ts index 2990cf8142..837c46fdef 100644 --- a/tegg/plugin/controller/src/app.ts +++ b/tegg/plugin/controller/src/app.ts @@ -3,8 +3,11 @@ import assert from 'node:assert'; import { ControllerMetaBuilderFactory, ControllerType } from '@eggjs/controller-decorator'; import { GlobalGraph, type LoadUnitLifecycleContext } from '@eggjs/metadata'; import { type LoadUnitInstanceLifecycleContext, ModuleLoadUnitInstance } from '@eggjs/tegg-runtime'; +import { AGENT_CONTROLLER_PROTO_IMPL_TYPE } from '@eggjs/tegg-types'; import type { Application, ILifecycleBoot } from 'egg'; +import { AgentControllerObject } from './lib/AgentControllerObject.ts'; +import { AgentControllerProto } from './lib/AgentControllerProto.ts'; import { AppLoadUnitControllerHook } from './lib/AppLoadUnitControllerHook.ts'; import { CONTROLLER_LOAD_UNIT, ControllerLoadUnit } from './lib/ControllerLoadUnit.ts'; import { ControllerLoadUnitHandler } from './lib/ControllerLoadUnitHandler.ts'; @@ -37,11 +40,17 @@ export default class ControllerAppBootHook implements ILifecycleBoot { this.app.controllerMetaBuilderFactory = ControllerMetaBuilderFactory; this.loadUnitHook = new AppLoadUnitControllerHook(this.controllerRegisterFactory, this.app.rootProtoManager); this.controllerPrototypeHook = new EggControllerPrototypeHook(); + this.app.eggPrototypeCreatorFactory.registerPrototypeCreator( + AGENT_CONTROLLER_PROTO_IMPL_TYPE, + AgentControllerProto.createProto, + ); + AgentControllerObject.setLogger(this.app.logger); } configWillLoad(): void { this.app.loadUnitLifecycleUtil.registerLifecycle(this.loadUnitHook); this.app.eggPrototypeLifecycleUtil.registerLifecycle(this.controllerPrototypeHook); + this.app.eggObjectFactory.registerEggObjectCreateMethod(AgentControllerProto, AgentControllerObject.createObject); this.app.loaderFactory.registerLoader(CONTROLLER_LOAD_UNIT, (unitPath) => { return new EggControllerLoader(unitPath); }); diff --git a/tegg/plugin/controller/src/lib/AgentControllerObject.ts b/tegg/plugin/controller/src/lib/AgentControllerObject.ts new file mode 100644 index 0000000000..431ca5207e --- /dev/null +++ b/tegg/plugin/controller/src/lib/AgentControllerObject.ts @@ -0,0 +1,258 @@ +import path from 'node:path'; + +import { + type AgentRuntime, + type AgentControllerHost, + type AgentRuntimeLogger, + AGENT_RUNTIME, + createAgentRuntime, + FileAgentStore, + NodeSSEWriter, +} from '@eggjs/agent-runtime'; +import type { AgentStore } from '@eggjs/agent-runtime'; +import { AgentInfoUtil } from '@eggjs/controller-decorator'; +import type { CreateRunInput } from '@eggjs/controller-decorator'; +import { IdenticalUtil } from '@eggjs/lifecycle'; +import { LoadUnitFactory } from '@eggjs/metadata'; +import { EGG_CONTEXT } from '@eggjs/module-common'; +import { ContextHandler, EggContainerFactory, EggObjectLifecycleUtil, EggObjectUtil } from '@eggjs/tegg-runtime'; +import type { + EggObject, + EggObjectLifeCycleContext, + EggObjectLifecycle, + EggObjectName, + EggPrototype, + EggPrototypeName, +} from '@eggjs/tegg-types'; +import { EggObjectStatus, ObjectInitType } from '@eggjs/tegg-types'; + +import { AgentControllerProto } from './AgentControllerProto.ts'; + +/** Method names that can be delegated to AgentRuntime. */ +type AgentMethodName = 'createThread' | 'getThread' | 'asyncRun' | 'syncRun' | 'getRun' | 'cancelRun'; + +const AGENT_METHOD_NAMES: AgentMethodName[] = [ + 'createThread', + 'getThread', + 'asyncRun', + 'syncRun', + 'getRun', + 'cancelRun', +]; + +/** + * Custom EggObject for @AgentController classes. + * + * Replicates the full EggObjectImpl.initWithInjectProperty lifecycle and + * inserts AgentRuntime delegate installation between postInject and init + * hooks — exactly where the user's `init()` expects runtime to be ready. + */ +export class AgentControllerObject implements EggObject { + private static logger: AgentRuntimeLogger; + + private _obj!: object; + private status: EggObjectStatus = EggObjectStatus.PENDING; + private runtime: AgentRuntime | undefined; + + readonly id: string; + readonly name: EggPrototypeName; + readonly proto: AgentControllerProto; + + /** Inject a logger to be used by all AgentRuntime instances. */ + static setLogger(logger: AgentRuntimeLogger): void { + AgentControllerObject.logger = logger; + } + + constructor(name: EggObjectName, proto: AgentControllerProto) { + this.name = name; + this.proto = proto; + const ctx = ContextHandler.getContext(); + this.id = IdenticalUtil.createObjectId(this.proto.id, ctx?.id); + } + + get obj(): object { + return this._obj; + } + + get isReady(): boolean { + return this.status === EggObjectStatus.READY; + } + + injectProperty(name: EggObjectName, descriptor: PropertyDescriptor): void { + Reflect.defineProperty(this._obj, name, descriptor); + } + + /** + * Full lifecycle sequence mirroring EggObjectImpl.initWithInjectProperty, + * with AgentRuntime installation inserted between postInject and init. + */ + async init(ctx: EggObjectLifeCycleContext): Promise { + try { + // 1. Construct object + this._obj = this.proto.constructEggObject(); + const objLifecycleHook = this._obj as EggObjectLifecycle; + + // 2. Global preCreate hook + await EggObjectLifecycleUtil.objectPreCreate(ctx, this); + + // 3. Self postConstruct hook + const postConstructMethod = + EggObjectLifecycleUtil.getLifecycleHook('postConstruct', this.proto) ?? 'postConstruct'; + if (objLifecycleHook[postConstructMethod]) { + await objLifecycleHook[postConstructMethod](ctx, this); + } + + // 4. Self preInject hook + const preInjectMethod = EggObjectLifecycleUtil.getLifecycleHook('preInject', this.proto) ?? 'preInject'; + if (objLifecycleHook[preInjectMethod]) { + await objLifecycleHook[preInjectMethod](ctx, this); + } + + // 5. Inject dependencies + await Promise.all( + this.proto.injectObjects.map(async (injectObject) => { + const proto = injectObject.proto; + const loadUnit = LoadUnitFactory.getLoadUnitById(proto.loadUnitId); + if (!loadUnit) { + throw new Error(`can not find load unit: ${proto.loadUnitId}`); + } + if ( + this.proto.initType !== ObjectInitType.CONTEXT && + injectObject.proto.initType === ObjectInitType.CONTEXT + ) { + this.injectProperty( + injectObject.refName, + EggObjectUtil.contextEggObjectGetProperty(proto, injectObject.objName), + ); + } else { + const injectObj = await EggContainerFactory.getOrCreateEggObject(proto, injectObject.objName); + this.injectProperty(injectObject.refName, EggObjectUtil.eggObjectGetProperty(injectObj)); + } + }), + ); + + // 6. Global postCreate hook + await EggObjectLifecycleUtil.objectPostCreate(ctx, this); + + // 7. Self postInject hook + const postInjectMethod = EggObjectLifecycleUtil.getLifecycleHook('postInject', this.proto) ?? 'postInject'; + if (objLifecycleHook[postInjectMethod]) { + await objLifecycleHook[postInjectMethod](ctx, this); + } + + // === AgentRuntime installation (before user init) === + await this.installAgentRuntime(); + + // 8. Self init hook (user's init()) + const initMethod = EggObjectLifecycleUtil.getLifecycleHook('init', this.proto) ?? 'init'; + if (objLifecycleHook[initMethod]) { + await objLifecycleHook[initMethod](ctx, this); + } + + // 9. Ready + this.status = EggObjectStatus.READY; + } catch (e) { + this.status = EggObjectStatus.ERROR; + throw e; + } + } + + async destroy(ctx: EggObjectLifeCycleContext): Promise { + if (this.status === EggObjectStatus.READY) { + this.status = EggObjectStatus.DESTROYING; + + // Destroy AgentRuntime first (waits for in-flight tasks) + if (this.runtime) { + await this.runtime.destroy(); + } + + // Global preDestroy hook + await EggObjectLifecycleUtil.objectPreDestroy(ctx, this); + + // Self lifecycle hooks + const objLifecycleHook = this._obj as EggObjectLifecycle; + const preDestroyMethod = EggObjectLifecycleUtil.getLifecycleHook('preDestroy', this.proto) ?? 'preDestroy'; + if (objLifecycleHook[preDestroyMethod]) { + await objLifecycleHook[preDestroyMethod](ctx, this); + } + + const destroyMethod = EggObjectLifecycleUtil.getLifecycleHook('destroy', this.proto) ?? 'destroy'; + if (objLifecycleHook[destroyMethod]) { + await objLifecycleHook[destroyMethod](ctx, this); + } + + this.status = EggObjectStatus.DESTROYED; + } + } + + /** + * Create AgentRuntime and install delegate methods on the instance. + * Logic ported from the removed enhanceAgentController.ts. + */ + private async installAgentRuntime(): Promise { + const instance = this._obj as Record; + + // Determine which methods are stubs vs user-defined + const stubMethods = new Set(); + for (const name of AGENT_METHOD_NAMES) { + const method = instance[name]; + if (typeof method !== 'function' || AgentInfoUtil.isNotImplemented(method)) { + stubMethods.add(name); + } + } + const streamRunFn = instance['streamRun']; + const streamRunIsStub = typeof streamRunFn !== 'function' || AgentInfoUtil.isNotImplemented(streamRunFn); + + // Create store (support user-defined createStore()) + let store: AgentStore; + const createStoreFn = instance['createStore']; + if (typeof createStoreFn === 'function') { + store = (await Reflect.apply(createStoreFn, this._obj, [])) as AgentStore; + } else { + const dataDir = process.env.TEGG_AGENT_DATA_DIR || path.join(process.cwd(), '.agent-data'); + store = new FileAgentStore({ dataDir }); + } + if (store.init) { + await store.init(); + } + + // Create runtime with injected logger + const runtime = createAgentRuntime({ + host: this._obj as AgentControllerHost, + store, + logger: AgentControllerObject.logger, + }); + this.runtime = runtime; + instance[AGENT_RUNTIME] = runtime; + + // Install delegate methods for stubs (type-safe: all names are keys of AgentRuntime) + for (const methodName of stubMethods) { + const runtimeMethod = runtime[methodName].bind(runtime); + instance[methodName] = runtimeMethod; + } + + // streamRun needs special handling: create NodeSSEWriter from request context + if (streamRunIsStub) { + instance['streamRun'] = async (input: CreateRunInput): Promise => { + const runtimeCtx = ContextHandler.getContext(); + if (!runtimeCtx) { + throw new Error('streamRun must be called within a request context'); + } + const eggCtx = runtimeCtx.get(EGG_CONTEXT); + eggCtx.respond = false; + const writer = new NodeSSEWriter(eggCtx.res); + return runtime.streamRun(input, writer); + }; + } + } + + static async createObject( + name: EggObjectName, + proto: EggPrototype, + lifecycleContext: EggObjectLifeCycleContext, + ): Promise { + const obj = new AgentControllerObject(name, proto as AgentControllerProto); + await obj.init(lifecycleContext); + return obj; + } +} diff --git a/tegg/plugin/controller/src/lib/AgentControllerProto.ts b/tegg/plugin/controller/src/lib/AgentControllerProto.ts new file mode 100644 index 0000000000..1a92d26451 --- /dev/null +++ b/tegg/plugin/controller/src/lib/AgentControllerProto.ts @@ -0,0 +1,105 @@ +import { EggPrototypeCreatorFactory } from '@eggjs/metadata'; +import type { + AccessLevel, + EggPrototype, + EggPrototypeCreator, + EggPrototypeLifecycleContext, + EggPrototypeName, + InjectConstructorProto, + InjectObjectProto, + InjectType, + MetaDataKey, + ObjectInitTypeLike, + QualifierAttribute, + QualifierInfo, + QualifierValue, +} from '@eggjs/tegg-types'; +import { DEFAULT_PROTO_IMPL_TYPE } from '@eggjs/tegg-types'; + +/** + * Wraps a standard EggPrototypeImpl (created by the DEFAULT creator) to + * provide a distinct class identity so that EggObjectFactory can dispatch + * to AgentControllerObject.createObject. + * + * All EggPrototype interface members are delegated to the inner proto. + * Symbol-keyed properties (qualifier descriptors set by the runtime) are + * forwarded via a Proxy on `this`. + */ +export class AgentControllerProto implements EggPrototype { + [key: symbol]: PropertyDescriptor; + + private readonly delegate: EggPrototype; + + constructor(delegate: EggPrototype) { + this.delegate = delegate; + + // Copy symbol-keyed properties from delegate (qualifier descriptors, etc.) + for (const sym of Object.getOwnPropertySymbols(delegate)) { + const desc = Object.getOwnPropertyDescriptor(delegate, sym); + if (desc) { + Object.defineProperty(this, sym, desc); + } + } + } + + get id(): string { + return this.delegate.id; + } + get name(): EggPrototypeName { + return this.delegate.name; + } + get initType(): ObjectInitTypeLike { + return this.delegate.initType; + } + get accessLevel(): AccessLevel { + return this.delegate.accessLevel; + } + get loadUnitId(): string { + return this.delegate.loadUnitId; + } + get injectObjects(): Array { + return this.delegate.injectObjects; + } + get injectType(): InjectType | undefined { + return this.delegate.injectType; + } + get className(): string | undefined { + return this.delegate.className; + } + get multiInstanceConstructorIndex(): number | undefined { + return this.delegate.multiInstanceConstructorIndex; + } + get multiInstanceConstructorAttributes(): QualifierAttribute[] | undefined { + return this.delegate.multiInstanceConstructorAttributes; + } + + getMetaData(metadataKey: MetaDataKey): T | undefined { + return this.delegate.getMetaData(metadataKey); + } + + verifyQualifier(qualifier: QualifierInfo): boolean { + return this.delegate.verifyQualifier(qualifier); + } + + verifyQualifiers(qualifiers: QualifierInfo[]): boolean { + return this.delegate.verifyQualifiers(qualifiers); + } + + getQualifier(attribute: QualifierAttribute): QualifierValue | undefined { + return this.delegate.getQualifier(attribute); + } + + constructEggObject(...args: any): object { + return this.delegate.constructEggObject(...args); + } + + static createProto(ctx: EggPrototypeLifecycleContext): AgentControllerProto { + const defaultCreator: EggPrototypeCreator | undefined = + EggPrototypeCreatorFactory.getPrototypeCreator(DEFAULT_PROTO_IMPL_TYPE); + if (!defaultCreator) { + throw new Error(`Default prototype creator (${DEFAULT_PROTO_IMPL_TYPE}) not registered`); + } + const delegate = defaultCreator(ctx); + return new AgentControllerProto(delegate); + } +} diff --git a/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/app/controller/AgentTestController.ts b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/app/controller/AgentTestController.ts new file mode 100644 index 0000000000..6ee026b00d --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/app/controller/AgentTestController.ts @@ -0,0 +1,204 @@ +import { AgentController, AgentNotFoundError } from '@eggjs/tegg/agent'; +import type { + AgentHandler, + CreateRunInput, + RunObject, + ThreadObject, + ThreadObjectWithMessages, + MessageObject, + MessageDeltaObject, + AgentStreamMessage, +} from '@eggjs/tegg/agent'; +import { ContextHandler } from '@eggjs/tegg/helper'; + +// Canonical definition in @eggjs/module-common +const EGG_CONTEXT: symbol = Symbol.for('context#eggContext'); + +// In-memory store for threads and runs +const threads = new Map< + string, + { id: string; messages: MessageObject[]; created_at: number; metadata: Record } +>(); +const runs = new Map< + string, + { id: string; thread_id?: string; status: string; input: any[]; output?: MessageObject[]; created_at: number } +>(); + +let threadCounter = 0; +let runCounter = 0; + +function nowUnix(): number { + return Math.floor(Date.now() / 1000); +} + +@AgentController() +export class AgentTestController implements AgentHandler { + // Required by AgentHandler — noop since all route methods are overridden + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async *execRun(_input: CreateRunInput): AsyncGenerator { + // All routes are manually implemented; this is never called. + } + + async createThread(): Promise { + const threadId = `thread_${++threadCounter}`; + const now = nowUnix(); + threads.set(threadId, { id: threadId, messages: [], created_at: now, metadata: {} }); + return { id: threadId, object: 'thread', created_at: now, metadata: {} }; + } + + async getThread(threadId: string): Promise { + const thread = threads.get(threadId); + if (!thread) { + throw new AgentNotFoundError(`Thread ${threadId} not found`); + } + return { + id: thread.id, + object: 'thread', + messages: thread.messages, + created_at: thread.created_at, + metadata: thread.metadata, + }; + } + + async asyncRun(input: CreateRunInput): Promise { + const runId = `run_${++runCounter}`; + const now = nowUnix(); + runs.set(runId, { + id: runId, + thread_id: input.thread_id, + status: 'queued', + input: input.input.messages, + created_at: now, + }); + return { id: runId, object: 'thread.run', created_at: now, status: 'queued' }; + } + + async streamRun(input: CreateRunInput): Promise { + const runtimeCtx = ContextHandler.getContext()!; + const ctx = runtimeCtx.get(EGG_CONTEXT); + + // Bypass Koa response handling — write SSE directly to the raw response + ctx.respond = false; + const res = ctx.res; + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + + const runId = `run_${++runCounter}`; + const messages = input.input.messages; + const outputContent = `Streamed ${messages.length} messages`; + const now = nowUnix(); + + const runObj: RunObject = { id: runId, object: 'thread.run', created_at: now, status: 'queued' }; + res.write(`event: thread.run.created\ndata: ${JSON.stringify(runObj)}\n\n`); + + runObj.status = 'in_progress'; + res.write(`event: thread.run.in_progress\ndata: ${JSON.stringify(runObj)}\n\n`); + + const msgId = `msg_${runCounter}`; + const msgObj: MessageObject = { + id: msgId, + object: 'thread.message', + created_at: now, + run_id: runId, + role: 'assistant', + status: 'in_progress', + content: [], + }; + res.write(`event: thread.message.created\ndata: ${JSON.stringify(msgObj)}\n\n`); + + const contentBlock = { type: 'text' as const, text: { value: outputContent, annotations: [] as unknown[] } }; + const delta: MessageDeltaObject = { + id: msgId, + object: 'thread.message.delta', + delta: { content: [contentBlock] }, + }; + res.write(`event: thread.message.delta\ndata: ${JSON.stringify(delta)}\n\n`); + + msgObj.status = 'completed'; + msgObj.content = [contentBlock]; + res.write(`event: thread.message.completed\ndata: ${JSON.stringify(msgObj)}\n\n`); + + const outputMsg: MessageObject = { ...msgObj }; + runObj.status = 'completed'; + runObj.output = [outputMsg]; + res.write(`event: thread.run.completed\ndata: ${JSON.stringify(runObj)}\n\n`); + + res.write('event: done\ndata: [DONE]\n\n'); + res.end(); + + runs.set(runId, { + id: runId, + status: 'completed', + input: messages, + output: [outputMsg], + created_at: now, + }); + } + + async syncRun(input: CreateRunInput): Promise { + const runId = `run_${++runCounter}`; + const messages = input.input.messages; + const now = nowUnix(); + const output: MessageObject[] = [ + { + id: `msg_${runCounter}`, + object: 'thread.message', + created_at: now, + role: 'assistant', + status: 'completed', + content: [ + { + type: 'text', + text: { value: `Processed ${messages.length} messages`, annotations: [] }, + }, + ], + }, + ]; + runs.set(runId, { + id: runId, + thread_id: input.thread_id, + status: 'completed', + input: messages, + output, + created_at: now, + }); + return { + id: runId, + object: 'thread.run', + created_at: now, + status: 'completed', + output, + }; + } + + async getRun(runId: string): Promise { + const run = runs.get(runId); + if (!run) { + throw new AgentNotFoundError(`Run ${runId} not found`); + } + return { + id: run.id, + object: 'thread.run', + created_at: run.created_at, + thread_id: run.thread_id, + status: run.status as any, + output: run.output, + }; + } + + async cancelRun(runId: string): Promise { + const run = runs.get(runId); + if (run) { + run.status = 'cancelled'; + } + return { + id: runId, + object: 'thread.run', + created_at: run?.created_at ?? nowUnix(), + status: 'cancelled', + }; + } +} diff --git a/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/config.default.ts b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/config.default.ts new file mode 100644 index 0000000000..39984fbf2e --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/config.default.ts @@ -0,0 +1,9 @@ +export default () => { + const config = { + keys: 'test key', + security: { + csrf: false, + }, + }; + return config; +}; diff --git a/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/plugin.ts b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/plugin.ts new file mode 100644 index 0000000000..31a66b8e4f --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/config/plugin.ts @@ -0,0 +1,18 @@ +export default { + tracer: { + package: '@eggjs/tracer', + enable: true, + }, + tegg: { + package: '@eggjs/tegg-plugin', + enable: true, + }, + teggConfig: { + package: '@eggjs/tegg-config', + enable: true, + }, + teggController: { + package: '@eggjs/controller-plugin', + enable: true, + }, +}; diff --git a/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/package.json b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/package.json new file mode 100644 index 0000000000..0b8e77503f --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/agent-controller-app/package.json @@ -0,0 +1,4 @@ +{ + "name": "agent-controller-app", + "type": "module" +} diff --git a/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/app/controller/BaseAgentController.ts b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/app/controller/BaseAgentController.ts new file mode 100644 index 0000000000..2aa189e670 --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/app/controller/BaseAgentController.ts @@ -0,0 +1,42 @@ +import { AgentController } from '@eggjs/tegg/agent'; +import type { AgentHandler, CreateRunInput, AgentStreamMessage } from '@eggjs/tegg/agent'; + +function sleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(resolve, ms); + signal?.addEventListener( + 'abort', + () => { + clearTimeout(timer); + const err = new Error('Aborted'); + err.name = 'AbortError'; + reject(err); + }, + { once: true }, + ); + }); +} + +@AgentController() +export class BaseAgentController implements AgentHandler { + async *execRun(input: CreateRunInput, signal?: AbortSignal): AsyncGenerator { + const messages = input.input.messages; + + // If the first message asks to cancel, add a delay so cancel tests can catch it + if (messages[0]?.content === 'cancel me') { + await sleep(2000, signal); + } + + yield { + type: 'assistant', + message: { + role: 'assistant', + content: [{ type: 'text', text: `Processed ${messages.length} messages` }], + }, + }; + yield { + type: 'result', + usage: { prompt_tokens: 10, completion_tokens: 5 }, + }; + } +} diff --git a/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/config.default.ts b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/config.default.ts new file mode 100644 index 0000000000..39984fbf2e --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/config.default.ts @@ -0,0 +1,9 @@ +export default () => { + const config = { + keys: 'test key', + security: { + csrf: false, + }, + }; + return config; +}; diff --git a/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/plugin.ts b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/plugin.ts new file mode 100644 index 0000000000..31a66b8e4f --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/config/plugin.ts @@ -0,0 +1,18 @@ +export default { + tracer: { + package: '@eggjs/tracer', + enable: true, + }, + tegg: { + package: '@eggjs/tegg-plugin', + enable: true, + }, + teggConfig: { + package: '@eggjs/tegg-config', + enable: true, + }, + teggController: { + package: '@eggjs/controller-plugin', + enable: true, + }, +}; diff --git a/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/package.json b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/package.json new file mode 100644 index 0000000000..9b54c42a62 --- /dev/null +++ b/tegg/plugin/controller/test/fixtures/apps/base-agent-controller-app/package.json @@ -0,0 +1,4 @@ +{ + "name": "base-agent-controller-app", + "type": "module" +} diff --git a/tegg/plugin/controller/test/http/agent.test.ts b/tegg/plugin/controller/test/http/agent.test.ts new file mode 100644 index 0000000000..371c593204 --- /dev/null +++ b/tegg/plugin/controller/test/http/agent.test.ts @@ -0,0 +1,328 @@ +import { strict as assert } from 'node:assert'; +import { rm } from 'node:fs/promises'; +import path from 'node:path'; + +import { mm, type MockApplication } from '@eggjs/mock'; +import { describe, it, afterEach, beforeAll, afterAll } from 'vitest'; + +import { getFixtures } from '../utils.ts'; + +describe('plugin/controller/test/http/agent.test.ts', () => { + let app: MockApplication; + const agentDataDir = path.join(getFixtures('apps/agent-controller-app'), '.agent-data'); + + afterEach(() => { + return mm.restore(); + }); + + beforeAll(async () => { + mm(process.env, 'TEGG_AGENT_DATA_DIR', agentDataDir); + app = mm.app({ + baseDir: getFixtures('apps/agent-controller-app'), + }); + await app.ready(); + }); + + afterAll(async () => { + await app.close(); + await rm(agentDataDir, { recursive: true, force: true }).catch(() => { + /* ignore */ + }); + }); + + describe('POST /api/v1/threads (createThread)', () => { + it('should create a new thread', async () => { + const res = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + assert(res.body.id); + assert(typeof res.body.id === 'string'); + assert.equal(res.body.object, 'thread'); + assert(typeof res.body.created_at === 'number'); + assert(typeof res.body.metadata === 'object'); + }); + }); + + describe('GET /api/v1/threads/:id (getThread)', () => { + it('should get a thread by id', async () => { + // First create a thread + const createRes = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + const threadId = createRes.body.id; + + // Then get the thread + const res = await app.httpRequest().get(`/api/v1/threads/${threadId}`).expect(200); + assert.equal(res.body.id, threadId); + assert.equal(res.body.object, 'thread'); + assert(Array.isArray(res.body.messages)); + assert(typeof res.body.created_at === 'number'); + }); + + it('should return 404 for non-existent thread', async () => { + await app.httpRequest().get('/api/v1/threads/non_existent').expect(404); + }); + }); + + describe('POST /api/v1/runs (asyncRun)', () => { + it('should create an async run', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello' }], + }, + }) + .expect(200); + assert(res.body.id); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'queued'); + }); + + it('should create an async run with thread_id', async () => { + const createRes = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + + const res = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + thread_id: createRes.body.id, + input: { + messages: [{ role: 'user', content: 'Hello from thread' }], + }, + }) + .expect(200); + assert(res.body.id); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'queued'); + }); + }); + + describe('POST /api/v1/runs/stream (streamRun)', () => { + it('should stream SSE events with OpenAI format', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/stream') + .send({ + input: { + messages: [{ role: 'user', content: 'Stream me' }], + }, + }) + .buffer(true) + .expect(200) + .expect('Content-Type', /text\/event-stream/); + + // Parse SSE events from response text + const events: { event: string; data: any }[] = []; + const rawEvents = res.text.split('\n\n').filter(Boolean); + for (const raw of rawEvents) { + const lines = raw.split('\n'); + let event = ''; + let data = ''; + for (const line of lines) { + if (line.startsWith('event: ')) event = line.slice(7); + if (line.startsWith('data: ')) data = line.slice(6); + } + if (event && data) { + try { + events.push({ event, data: JSON.parse(data) }); + } catch { + events.push({ event, data }); + } + } + } + + // Verify SSE events in OpenAI format + assert(events.length >= 6); // at least: run.created, run.in_progress, msg.created, msg.delta, msg.completed, run.completed, done + + assert.equal(events[0].event, 'thread.run.created'); + assert(events[0].data.id); + assert.equal(events[0].data.object, 'thread.run'); + assert.equal(events[0].data.status, 'queued'); + + assert.equal(events[1].event, 'thread.run.in_progress'); + assert.equal(events[1].data.status, 'in_progress'); + + assert.equal(events[2].event, 'thread.message.created'); + assert.equal(events[2].data.object, 'thread.message'); + assert.equal(events[2].data.status, 'in_progress'); + + assert.equal(events[3].event, 'thread.message.delta'); + assert.equal(events[3].data.object, 'thread.message.delta'); + assert(events[3].data.delta.content[0].text.value.includes('Streamed')); + + assert.equal(events[4].event, 'thread.message.completed'); + assert.equal(events[4].data.status, 'completed'); + + assert.equal(events[5].event, 'thread.run.completed'); + assert.equal(events[5].data.status, 'completed'); + assert(events[5].data.output[0].content[0].text.value.includes('Streamed')); + }); + + it('should stream SSE events with multiple messages', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/stream') + .send({ + input: { + messages: [ + { role: 'system', content: 'You are helpful' }, + { role: 'user', content: 'Hello' }, + { role: 'user', content: 'How are you?' }, + ], + }, + }) + .buffer(true) + .expect(200); + + // Verify message count is reflected in the streamed content + assert(res.text.includes('Streamed 3 messages')); + }); + }); + + describe('POST /api/v1/runs/wait (syncRun)', () => { + it('should create a sync run and wait for completion', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [{ role: 'user', content: 'What is 2+2?' }], + }, + }) + .expect(200); + assert(res.body.id); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'completed'); + assert(Array.isArray(res.body.output)); + assert.equal(res.body.output.length, 1); + assert.equal(res.body.output[0].object, 'thread.message'); + assert.equal(res.body.output[0].role, 'assistant'); + assert.equal(res.body.output[0].content[0].text.value, 'Processed 1 messages'); + }); + + it('should handle multiple messages', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [ + { role: 'system', content: 'You are helpful' }, + { role: 'user', content: 'Hello' }, + { role: 'assistant', content: 'Hi there!' }, + { role: 'user', content: 'How are you?' }, + ], + }, + }) + .expect(200); + assert.equal(res.body.status, 'completed'); + assert.equal(res.body.output[0].content[0].text.value, 'Processed 4 messages'); + }); + }); + + describe('GET /api/v1/runs/:id (getRun)', () => { + it('should get a run by id', async () => { + // First create a run + const createRes = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'test' }], + }, + }) + .expect(200); + const runId = createRes.body.id; + + // Then get the run + const res = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(res.body.id, runId); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'queued'); + assert(typeof res.body.created_at === 'number'); + }); + + it('should return 404 for non-existent run', async () => { + await app.httpRequest().get('/api/v1/runs/non_existent').expect(404); + }); + }); + + describe('POST /api/v1/runs/:id/cancel (cancelRun)', () => { + it('should cancel a run', async () => { + // First create a run + const createRes = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'cancel me' }], + }, + }) + .expect(200); + const runId = createRes.body.id; + + // Cancel it + const res = await app.httpRequest().post(`/api/v1/runs/${runId}/cancel`).expect(200); + assert.equal(res.body.id, runId); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'cancelled'); + + // Verify status changed + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRes.body.status, 'cancelled'); + }); + }); + + describe('full workflow', () => { + it('should support create thread → sync run → get run flow', async () => { + // 1. Create thread + const threadRes = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + const threadId = threadRes.body.id; + + // 2. Run sync with thread + const runRes = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + thread_id: threadId, + input: { + messages: [{ role: 'user', content: 'Hello agent' }], + }, + }) + .expect(200); + assert.equal(runRes.body.status, 'completed'); + const runId = runRes.body.id; + + // 3. Get run details + const getRunRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRunRes.body.id, runId); + assert.equal(getRunRes.body.thread_id, threadId); + assert.equal(getRunRes.body.status, 'completed'); + }); + + it('should support async run → get run → cancel flow', async () => { + // 1. Create async run + const asyncRes = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'async task' }], + }, + }) + .expect(200); + assert.equal(asyncRes.body.status, 'queued'); + const runId = asyncRes.body.id; + + // 2. Get run - should be queued + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRes.body.status, 'queued'); + + // 3. Cancel run + const cancelRes = await app.httpRequest().post(`/api/v1/runs/${runId}/cancel`).expect(200); + assert.equal(cancelRes.body.status, 'cancelled'); + + // 4. Verify cancelled + const verifyRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(verifyRes.body.status, 'cancelled'); + }); + }); +}); diff --git a/tegg/plugin/controller/test/http/base-agent.test.ts b/tegg/plugin/controller/test/http/base-agent.test.ts new file mode 100644 index 0000000000..32990fa300 --- /dev/null +++ b/tegg/plugin/controller/test/http/base-agent.test.ts @@ -0,0 +1,440 @@ +import { strict as assert } from 'node:assert'; +import { rm } from 'node:fs/promises'; +import path from 'node:path'; + +import { mm, type MockApplication } from '@eggjs/mock'; +import { describe, it, afterEach, beforeAll, afterAll } from 'vitest'; + +import { getFixtures } from '../utils.ts'; + +describe('plugin/controller/test/http/base-agent.test.ts', () => { + let app: MockApplication; + const agentDataDir = path.join(getFixtures('apps/base-agent-controller-app'), '.agent-data'); + + afterEach(() => { + return mm.restore(); + }); + + beforeAll(async () => { + mm(process.env, 'TEGG_AGENT_DATA_DIR', agentDataDir); + app = mm.app({ + baseDir: getFixtures('apps/base-agent-controller-app'), + }); + await app.ready(); + }); + + afterAll(async () => { + await app.close(); + await rm(agentDataDir, { recursive: true, force: true }).catch(() => { + /* ignore */ + }); + }); + + describe('POST /api/v1/threads (createThread)', () => { + it('should create a new thread via smart default', async () => { + const res = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + assert(res.body.id); + assert(res.body.id.startsWith('thread_')); + assert.equal(res.body.object, 'thread'); + assert(typeof res.body.created_at === 'number'); + // Unix seconds + assert(res.body.created_at <= Math.floor(Date.now() / 1000)); + assert(typeof res.body.metadata === 'object'); + }); + }); + + describe('GET /api/v1/threads/:id (getThread)', () => { + it('should get a thread by id', async () => { + const createRes = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + const threadId = createRes.body.id; + + const res = await app.httpRequest().get(`/api/v1/threads/${threadId}`).expect(200); + assert.equal(res.body.id, threadId); + assert.equal(res.body.object, 'thread'); + assert(Array.isArray(res.body.messages)); + assert(typeof res.body.created_at === 'number'); + }); + + it('should return 404 for non-existent thread', async () => { + await app.httpRequest().get('/api/v1/threads/non_existent').expect(404); + }); + }); + + describe('POST /api/v1/runs/wait (syncRun)', () => { + it('should process via execRun and return completed RunObject', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [{ role: 'user', content: 'What is 2+2?' }], + }, + }) + .expect(200); + assert(res.body.id); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'completed'); + assert(res.body.thread_id); + assert(res.body.thread_id.startsWith('thread_')); + assert(Array.isArray(res.body.output)); + assert.equal(res.body.output.length, 1); + assert.equal(res.body.output[0].object, 'thread.message'); + assert.equal(res.body.output[0].role, 'assistant'); + assert.equal(res.body.output[0].status, 'completed'); + assert.equal(res.body.output[0].content[0].type, 'text'); + assert.equal(res.body.output[0].content[0].text.value, 'Processed 1 messages'); + assert(Array.isArray(res.body.output[0].content[0].text.annotations)); + }); + + it('should handle multiple messages', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [ + { role: 'system', content: 'You are helpful' }, + { role: 'user', content: 'Hello' }, + { role: 'user', content: 'How are you?' }, + ], + }, + }) + .expect(200); + assert.equal(res.body.status, 'completed'); + assert.equal(res.body.output[0].content[0].text.value, 'Processed 3 messages'); + }); + + it('should pass metadata through syncRun and persist to store', async () => { + const meta = { user_id: 'u_sync', env: 'test' }; + const res = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello' }], + }, + metadata: meta, + }) + .expect(200); + assert.deepEqual(res.body.metadata, meta); + + // Verify persisted via getRun + const getRes = await app.httpRequest().get(`/api/v1/runs/${res.body.id}`).expect(200); + assert.deepEqual(getRes.body.metadata, meta); + }); + + it('should auto-create thread and persist messages when thread_id not provided', async () => { + const runRes = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello agent' }], + }, + }) + .expect(200); + assert(runRes.body.thread_id); + assert(runRes.body.thread_id.startsWith('thread_')); + + // Verify thread was created and messages were appended + const threadRes = await app.httpRequest().get(`/api/v1/threads/${runRes.body.thread_id}`).expect(200); + assert.equal(threadRes.body.messages.length, 2); // user + assistant + assert.equal(threadRes.body.messages[0].role, 'user'); + assert.equal(threadRes.body.messages[1].role, 'assistant'); + }); + }); + + describe('POST /api/v1/runs (asyncRun)', () => { + it('should create an async run and return queued with auto-created thread_id', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello' }], + }, + }) + .expect(200); + assert(res.body.id); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'queued'); + assert(res.body.thread_id); + assert(res.body.thread_id.startsWith('thread_')); + }); + + it('should pass metadata through asyncRun and persist to store', async () => { + const meta = { user_id: 'u_async' }; + const res = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello' }], + }, + metadata: meta, + }) + .expect(200); + assert.deepEqual(res.body.metadata, meta); + + // Wait for background task + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Verify persisted via getRun + const getRes = await app.httpRequest().get(`/api/v1/runs/${res.body.id}`).expect(200); + assert.deepEqual(getRes.body.metadata, meta); + }); + + it('should complete the run in the background', async () => { + const asyncRes = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'Hello' }], + }, + }) + .expect(200); + const runId = asyncRes.body.id; + + // Wait a bit for background task + await new Promise((resolve) => setTimeout(resolve, 500)); + + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRes.body.status, 'completed'); + assert.equal(getRes.body.output[0].content[0].text.value, 'Processed 1 messages'); + }); + }); + + describe('POST /api/v1/runs/stream (streamRun)', () => { + it('should stream SSE events with OpenAI format', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/stream') + .send({ + input: { + messages: [{ role: 'user', content: 'Stream me' }], + }, + }) + .buffer(true) + .expect(200) + .expect('Content-Type', /text\/event-stream/); + + // Parse SSE events from response text + const events: { event: string; data: any }[] = []; + const rawEvents = res.text.split('\n\n').filter(Boolean); + for (const raw of rawEvents) { + const lines = raw.split('\n'); + let event = ''; + let data = ''; + for (const line of lines) { + if (line.startsWith('event: ')) event = line.slice(7); + if (line.startsWith('data: ')) data = line.slice(6); + } + if (event && data) { + try { + events.push({ event, data: JSON.parse(data) }); + } catch { + events.push({ event, data }); + } + } + } + + // Expected events: thread.run.created, thread.run.in_progress, + // thread.message.created, thread.message.delta (assistant msg), + // thread.message.completed, thread.run.completed, done + assert(events.length >= 7); + + assert.equal(events[0].event, 'thread.run.created'); + assert(events[0].data.id); + assert.equal(events[0].data.object, 'thread.run'); + assert.equal(events[0].data.status, 'queued'); + assert(events[0].data.thread_id); + assert(events[0].data.thread_id.startsWith('thread_')); + + assert.equal(events[1].event, 'thread.run.in_progress'); + assert.equal(events[1].data.status, 'in_progress'); + + assert.equal(events[2].event, 'thread.message.created'); + assert.equal(events[2].data.object, 'thread.message'); + assert.equal(events[2].data.role, 'assistant'); + assert.equal(events[2].data.status, 'in_progress'); + assert.deepEqual(events[2].data.content, []); + + // thread.message.delta for the assistant message + assert.equal(events[3].event, 'thread.message.delta'); + assert.equal(events[3].data.object, 'thread.message.delta'); + assert.equal(events[3].data.delta.content[0].text.value, 'Processed 1 messages'); + + // No delta for the usage-only yield (type: 'result') + + assert.equal(events[4].event, 'thread.message.completed'); + assert.equal(events[4].data.status, 'completed'); + assert.equal(events[4].data.content[0].text.value, 'Processed 1 messages'); + + assert.equal(events[5].event, 'thread.run.completed'); + assert.equal(events[5].data.status, 'completed'); + assert.equal(events[5].data.output[0].role, 'assistant'); + assert.equal(events[5].data.output[0].content[0].text.value, 'Processed 1 messages'); + assert.equal(events[5].data.usage.prompt_tokens, 10); + assert.equal(events[5].data.usage.completion_tokens, 5); + assert.equal(events[5].data.usage.total_tokens, 15); + + assert.equal(events[6].event, 'done'); + }); + + it('should persist in_progress and started_at to store during stream', async () => { + const res = await app + .httpRequest() + .post('/api/v1/runs/stream') + .send({ + input: { + messages: [{ role: 'user', content: 'Stream me' }], + }, + }) + .buffer(true) + .expect(200); + + // Extract run id from the first SSE event + const firstEvent = res.text.split('\n\n')[0]; + const dataLine = firstEvent.split('\n').find((l: string) => l.startsWith('data: ')); + const runData = JSON.parse(dataLine!.slice(6)); + const runId = runData.id; + + // After stream completes, verify run was persisted with started_at + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRes.body.status, 'completed'); + assert(typeof getRes.body.started_at === 'number'); + assert(getRes.body.started_at > 0); + }); + + it('should include metadata in SSE events and persist to store', async () => { + const meta = { user_id: 'u_stream', tag: 'test' }; + const res = await app + .httpRequest() + .post('/api/v1/runs/stream') + .send({ + input: { + messages: [{ role: 'user', content: 'Stream me' }], + }, + metadata: meta, + }) + .buffer(true) + .expect(200); + + // Parse SSE events + const events: { event: string; data: any }[] = []; + const rawEvents = res.text.split('\n\n').filter(Boolean); + for (const raw of rawEvents) { + const lines = raw.split('\n'); + let event = ''; + let data = ''; + for (const line of lines) { + if (line.startsWith('event: ')) event = line.slice(7); + if (line.startsWith('data: ')) data = line.slice(6); + } + if (event && data) { + try { + events.push({ event, data: JSON.parse(data) }); + } catch { + events.push({ event, data }); + } + } + } + + // Verify metadata in SSE events + assert.deepEqual(events[0].data.metadata, meta); // thread.run.created + assert.deepEqual(events[1].data.metadata, meta); // thread.run.in_progress + + // Verify metadata persisted in store via getRun + const runId = events[0].data.id; + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.deepEqual(getRes.body.metadata, meta); + }); + }); + + describe('GET /api/v1/runs/:id (getRun)', () => { + it('should get a run by id', async () => { + const createRes = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + input: { + messages: [{ role: 'user', content: 'test' }], + }, + }) + .expect(200); + const runId = createRes.body.id; + + const res = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(res.body.id, runId); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'completed'); + assert(typeof res.body.created_at === 'number'); + }); + + it('should return 404 for non-existent run', async () => { + await app.httpRequest().get('/api/v1/runs/non_existent').expect(404); + }); + }); + + describe('POST /api/v1/runs/:id/cancel (cancelRun)', () => { + it('should cancel a run', async () => { + const createRes = await app + .httpRequest() + .post('/api/v1/runs') + .send({ + input: { + messages: [{ role: 'user', content: 'cancel me' }], + }, + }) + .expect(200); + const runId = createRes.body.id; + + // Wait for background task to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + const res = await app.httpRequest().post(`/api/v1/runs/${runId}/cancel`).expect(200); + assert.equal(res.body.id, runId); + assert.equal(res.body.object, 'thread.run'); + assert.equal(res.body.status, 'cancelled'); + + // Verify status + const getRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRes.body.status, 'cancelled'); + }); + }); + + describe('full workflow', () => { + it('should support create thread → sync run → get thread with messages', async () => { + // 1. Create thread + const threadRes = await app.httpRequest().post('/api/v1/threads').send({}).expect(200); + const threadId = threadRes.body.id; + + // 2. Run sync with thread + const runRes = await app + .httpRequest() + .post('/api/v1/runs/wait') + .send({ + thread_id: threadId, + input: { + messages: [{ role: 'user', content: 'Hello agent' }], + }, + }) + .expect(200); + assert.equal(runRes.body.status, 'completed'); + const runId = runRes.body.id; + + // 3. Get run details + const getRunRes = await app.httpRequest().get(`/api/v1/runs/${runId}`).expect(200); + assert.equal(getRunRes.body.id, runId); + assert.equal(getRunRes.body.thread_id, threadId); + assert.equal(getRunRes.body.status, 'completed'); + + // 4. Thread should have messages appended + const getThreadRes = await app.httpRequest().get(`/api/v1/threads/${threadId}`).expect(200); + assert.equal(getThreadRes.body.messages.length, 2); + assert.equal(getThreadRes.body.messages[0].role, 'user'); + assert.equal(getThreadRes.body.messages[1].role, 'assistant'); + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index b67b58ec46..0a167c7226 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -115,6 +115,9 @@ }, { "path": "./tegg/core/vitest" + }, + { + "path": "./tegg/core/agent-runtime" } ] }